1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
|
/**
* IvyWatcher, A private Class for the Ivy rendezvous
*
* @author Yannick Jestin
* @author François-Régis Colin
* @author <a href="http://www.tls.cena.fr/products/ivy/">http://www.tls.cena.fr/products/ivy/</a>
*
* (C) CENA
*
* right now, the rendezvous is either a UDP broadcast socket or a UDP multicast socket.
* The watcher will answer to
* each peer advertising its arrival on the bus. The intrinsics of Unix are so
* that the broadcast is done using the same socket, which is not a good
* thing.
*
* CHANGELOG:
* 1.2.6:
* - IOException now goes silent when we asked the bus to stop()
* - use a new buffer for each Datagram received, to prevent an old bug
* 1.2.5:
* - getDomain now sends IvyException for malformed broadcast addresses
* - uses apache jakarta-regexp instead of gnu-regexp
* - throws an IvyException if the broadcast domain cannot be resolved
* 1.2.4:
* - sends the broadcast before listening to the other's broadcasts.
* I can't wait for all the broadcast to be sent before starting the listen
* mode, otherwise another agent behaving likewise could be started
* meanwhile, and one would not "see" each other.
* - (REMOVED) allows the connexion from a remote host with the same port number
* it's too complicated to know if the packet is from ourselves...
* - deals with the protocol errors in a more efficient way. The goal is not
* to lose our connectivity because of a rude agent.
* fixes Bug J005 (YJ + JPI)
* 1.2.3:
* - the packet sending is done in its own thread from now on (PacketSender)
* I don't care stopping it, since it can't be blocked.
* - checks whether I have been interrupted just after the receive (start()
* then stop() immediately).
* 1.2.1:
* - can be Interrupted during the broadcast Send. I catch the
* InterruptedIOException and do nothing with it.
* - changed the fill character from 0 to 10, in order to prevent a nasty bug
* on Windows XP machines
* - fixed a NullPointerException while trying to stop a Thread before having
* created it.
* 1.0.12:
* - setSoTimeout on socket
* - the broadcast reader Thread goes volatile
* 1.0.10:
* - isInDomain() is wrong in multicast. I've removed it
* - there was a remanence effect in the datagrampacket buffer. I clean it up after each message
* - cleaned up the getDomain() and getPort() code
* - close message sends an interruption on all threads for a clean exit
* - removed the timeout bug eating all the CPU resources
* - now handles a Vector of broadcast listeners
*/
package fr.dgac.ivy ;
import java.lang.Thread;
import java.net.*;
import java.io.*;
import java.util.StringTokenizer;
import org.apache.regexp.*;
import java.util.Vector;
import java.util.Enumeration;
/**
 * Watches one rendezvous address of an Ivy bus.
 *
 * Each watcher owns one datagram socket bound to the rendezvous port. On
 * {@link #start()} it announces the local agent ("protocol-version port\n")
 * to the rendezvous address, then listens on the same socket for the
 * announcements of other agents and opens an {@link IvyClient} connection to
 * every peer whose announcement is accepted.
 */
class IvyWatcher implements Runnable {
  private static boolean debug = (System.getProperty("IVY_DEBUG")!=null);
  private boolean isMulticastAddress = false;
  private Ivy bus; /* master bus controler */
  private DatagramSocket broadcast; /* supervision socket */
  private InetAddress localhost,loopback; /* resolved at construction, not read in this class */
  private String domainaddr;
  private int port; /* rendezvous port the socket is bound to and hellos are sent to */
  private volatile Thread listenThread; /* null once stop() has been requested */
  private InetAddress group; /* resolved rendezvous address (broadcast or multicast) */
  private static int serial=0;
  private int myserial=serial++; /* only used to tag the debug traces */

  /**
   * creates an Ivy watcher
   * @param bus the bus
   * @param domainaddr the rendezvous address, either a broadcast or a multicast address
   * @param port the rendezvous port
   * @throws IvyException if the address cannot be resolved or the socket
   * cannot be set up
   */
  IvyWatcher(Ivy bus,String domainaddr,int port) throws IvyException {
    this.bus = bus;
    this.domainaddr=domainaddr;
    this.port=port;
    listenThread = new Thread(this);
    // An unresolvable rendezvous address used to be swallowed silently, which
    // left both group and broadcast null and made stop() fail with a
    // NullPointerException. Report it to the caller instead.
    try {
      group = InetAddress.getByName(domainaddr);
    } catch ( UnknownHostException uhe ) {
      throw new IvyException("IvyWatcher cannot resolve the domain address "+domainaddr+": "+uhe);
    }
    // create the MulticastSocket
    try {
      broadcast = new MulticastSocket(port);
      if (group.isMulticastAddress()) {
        isMulticastAddress = true;
        ((MulticastSocket)broadcast).joinGroup(group);
      }
      // bounded receive() so that run() can notice a stop() request
      broadcast.setSoTimeout(Ivy.TIMEOUTLENGTH);
    } catch ( IOException e ) {
      // do not leak the bound socket when joining the group or setting the timeout fails
      if (broadcast!=null) {
        broadcast.close();
        broadcast=null;
      }
      throw new IvyException("IvyWatcher I/O error" + e );
    }
    // These two addresses are not read anywhere in this class, so failing to
    // resolve them must not prevent the watcher from working.
    try {
      localhost=InetAddress.getLocalHost();
      loopback=InetAddress.getByName(null);
    } catch ( UnknownHostException uhe ) {
      traceDebug("cannot resolve the local host, ignoring: "+uhe);
    }
  }

  /**
   * the behaviour of each thread watching the UDP socket.
   *
   * Loops until stop() clears listenThread. Each received datagram is
   * expected to hold "version port"; malformed or too old announcements, and
   * announcements carrying our own application port, are ignored. For every
   * other announcement an IvyClient is created on a new TCP socket to the
   * sender.
   */
  public void run() {
    traceDebug("Thread started"); // THREADDEBUG
    Thread thisThread=Thread.currentThread();
    traceDebug("beginning of a watcher Thread");
    // The pattern is constant: compile it once instead of once per datagram.
    // NOTE(review): RE.match() appears to search anywhere in the string and
    // both groups may be empty; empty groups end up in the
    // NumberFormatException handler below.
    RE helloFormat;
    try {
      helloFormat = new RE("([0-9]*) ([0-9]*)");
    } catch (RESyntaxException ree) {
      // cannot happen with a constant pattern; give up on this watcher only
      // instead of killing the whole JVM
      ree.printStackTrace();
      return;
    }
    try {
      while( listenThread==thisThread ) {
        InetAddress remotehost=null;
        int remotePort=-1; // port announced by the peer, kept visible for the error messages
        try {
          // a fresh buffer for each datagram, so that no bytes of a previous
          // message can remain in it
          byte buf[] = new byte[256];
          DatagramPacket packet=new DatagramPacket(buf,buf.length);
          broadcast.receive(packet);
          if (listenThread!=thisThread) break; // I was summoned to leave during the receive
          String msg = new String(buf,0,packet.getLength());
          String remotehostname=null;
          try {
            remotehost = packet.getAddress();
            remotehostname = remotehost.getHostName();
            if (!helloFormat.match(msg)) {
              System.err.println("Ignoring bad format broadcast from "+
                  remotehostname+":"+packet.getPort());
              continue;
            }
            int version = Integer.parseInt(helloFormat.getParen(1));
            if ( version < bus.PROTOCOLMINIMUM ) {
              // report the offending version (the address was printed here by mistake)
              System.err.println("Ignoring bad format broadcast from "+
                  remotehostname+":"+packet.getPort()
                  +" protocol version "+version+" we need "+bus.PROTOCOLMINIMUM+" minimum");
              continue;
            }
            remotePort = Integer.parseInt(helloFormat.getParen(2));
            if (bus.applicationPort==remotePort) {
              traceDebug("ignoring a broadcast from "+ remotehostname+":"+packet.getPort()
                  +" on my port number ("+remotePort+"), it's probably me");
              // TODO check better
              // if bus.applicationPort=port
              // parse the list of Watchers and check for each
              // iw.broadcast.getInetAddress().equals(packet().getAddress()
              // if one is true, "continue" ( ignore the broadcast )
              continue;
            }
            traceDebug("broadcast accepted from " +remotehostname
                +":"+packet.getPort()+", port:"+remotePort+", protocol version:"+version);
            new IvyClient(bus,new Socket(remotehost,remotePort),remotePort);
          } catch (NumberFormatException nfe) {
            System.err.println("Ignoring bad format broadcast from "+remotehostname);
            continue;
          } catch ( UnknownHostException e ) {
            System.err.println("Unkonwn host "+remotehost +","+e.getMessage());
          } catch ( IOException e) {
            // print the port we tried to reach, not the rendezvous port of this watcher
            System.err.println("can't connect to "+remotehost+" port "+remotePort+": "+e.getMessage());
          }
        } catch (InterruptedIOException jii ){
          // receive timeout or interruption: leave only if stop() was requested
          if (thisThread!=listenThread) { break ;}
        }
      } // while
    } catch (java.net.SocketException se ){
      // expected when stop() closes the socket; only worth a trace otherwise
      if (thisThread==listenThread) {
        traceDebug("socket exception, continuing anyway on other Ivy domains "+se);
      }
    } catch (IOException ioe ){
      System.out.println("IO Exception, continuing anyway on other Ivy domains "+ioe);
    }
    traceDebug("Thread stopped"); // THREADDEBUG
  }

  /**
   * stops the thread waiting on the broadcast socket
   *
   * Clears listenThread first so that run() leaves its loop, then closes the
   * socket and interrupts the thread.
   */
  synchronized void stop() {
    traceDebug("begining stopping");
    Thread t = listenThread;
    listenThread=null;
    if (broadcast!=null) { broadcast.close(); }
    if (t!=null) { t.interrupt(); } // it might not even have been created
    traceDebug("stopped");
  }

  /**
   * Sends one datagram to the rendezvous address from its own thread. The
   * thread is started by the constructor.
   */
  private class PacketSender implements Runnable {
    DatagramPacket packet;
    String data;

    /**
     * @param data the text to send to group:port
     */
    public PacketSender(String data) {
      this.data=data;
      // the datagram length is a byte count: use the encoded size, not the
      // number of chars of the String
      byte[] payload = data.getBytes();
      packet=new DatagramPacket(payload,payload.length,group,port);
      new Thread((PacketSender.this)).start();
    }

    public void run() {
      traceDebug("PacketSender thread started"); // THREADDEBUG
      try {
        broadcast.send(packet);
      } catch (InterruptedIOException e) {
        // somebody interrupts my IO. Thread, do nothing.
        System.out.println(e.bytesTransferred+" bytes transferred anyway, out of " + packet.getLength());
        e.printStackTrace();
        traceDebug("IO interrupted during the broadcast. Do nothing");
      } catch ( IOException e ) {
        // stay silent when the failure is caused by a stop() request
        if (listenThread!=null) {
          System.out.println("Broadcast Error " + e.getMessage()+" continuing anyway");
          // cannot throw new IvyException in a run ...
          e.printStackTrace();
        }
      }
      traceDebug("PacketSender thread stopped"); // THREADDEBUG
    }
  }

  /**
   * Announces our arrival on the rendezvous address, then starts listening.
   *
   * @throws IvyException if there is no socket, or if the watcher has
   * already been stopped
   */
  synchronized void start() throws IvyException {
    String hello = bus.PROTOCOLVERSION + " " + bus.applicationPort + "\n";
    if (broadcast==null) throw new IvyException("IvyWatcher PacketSender null broadcast address");
    Thread t = listenThread;
    // stop() clears listenThread; report it instead of a NullPointerException
    if (t==null) throw new IvyException("IvyWatcher cannot be started once it has been stopped");
    new PacketSender(hello); // notifies our arrival on each domain: protocol version + port
    t.start();
  }

  /*
  private boolean isInDomain( InetAddress host ){
    return true;
    // TODO check if this function is useful. for now, it always returns true
    // deprecated since we use Multicast. How to check when we are in UDP
    // broadcast ?
    //
    byte rem_addr[] = host.getAddress();
    for ( int i = 0 ; i < domainaddrList.size(); i++ ) {
      byte addr[] = ((InetAddress)domainaddrList.elementAt(i)).getAddress();
      int j ;
      for ( j = 0 ; j < 4 ; j++ )
        if ( (addr[j] != -1) && (addr[j] != rem_addr[j]) ) break;
      if ( j == 4 ) {
        traceDebug( "host " + host + " is in domain\n" );
        return true;
      }
    }
    traceDebug( "host " + host + " Not in domain\n" );
    return false;
  }
  */

  /**
   * Extracts the address part of a "address[:port]" domain specification.
   *
   * Everything from the last ':' on is dropped. When the numeric test below
   * does not match, the remainder is returned unchanged; otherwise it is
   * padded with ".255.255.255" and cut down to its first four dot-separated
   * numbers, so that "10" gives "10.255.255.255".
   *
   * @param net the domain specification
   * @return the address part, completed as described above
   * @throws IvyException if the padded address does not start with four
   * dot-separated numbers
   */
  static String getDomain(String net) throws IvyException {
    // System.out.println("debug: net=[" + net+ "]");
    int sep_index = net.lastIndexOf( ":" );
    if ( sep_index != -1 ) { net = net.substring(0,sep_index); }
    try {
      // NOTE(review): this pattern is not anchored, so it presumably matches
      // any string holding a digit or a dot, not only purely numeric ones;
      // confirm whether host names containing digits are meant to be rejected below.
      RE numbersPoint = new RE("([0-9]|\\.)+");
      if (!numbersPoint.match(net)) {
        // traceDebug("should only have numbers and point ? I won't add anything... " + net);
        return net;
      }
      net += ".255.255.255";
      RE exp = new RE( "^(\\d+\\.\\d+\\.\\d+\\.\\d+).*");
      if (!exp.match(net)) {
        System.out.println("Bad broascat addr " + net);
        throw new IvyException("bad broadcast addr");
      }
      net=exp.getParen(1);
    } catch ( RESyntaxException e ){
      // cannot happen with constant patterns; report it through the declared
      // exception instead of terminating the JVM with a success status
      throw new IvyException("IvyWatcher bad regular expression: " + e);
    }
    // System.out.println("debug: returning net=[" + net+ "]");
    return net;
  }

  /**
   * Extracts the port part of a "address[:port]" domain specification.
   *
   * @param net the domain specification
   * @return the number following the last ':', or 0 if no port is set (no
   * ':' at all, or nothing after it)
   * @throws NumberFormatException if the text after the last ':' is not a number
   */
  static int getPort(String net) { // returns 0 if no port is set
    int sep_index = net.lastIndexOf( ":" );
    // a trailing ':' means that no port is set either
    if ( sep_index == -1 || sep_index == net.length()-1 ) { return 0; }
    // System.out.println("net: ["+net+"]\nsep_index: "+sep_index+"\nport: "+port);
    return Integer.parseInt( net.substring( sep_index +1 ));
  }

  /** Prints a trace tagged with the watcher and bus serial numbers when IVY_DEBUG is set. */
  private void traceDebug(String s){
    if (debug) System.out.println("-->IvyWatcher["+myserial+","+bus.getSerial()+"]<-- "+s);
  }
} // class IvyWatcher
|