aboutsummaryrefslogtreecommitdiff
path: root/src/IvyWatcher.java
blob: 4b06b1bd142cf20b3e202402fd0e1a6c6437fabc (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
/**
 * IvyWatcher, A private Class for the Ivy rendezvous
 *
 * @author	Yannick Jestin
 * @author	François-Régis Colin
 * @author	<a href="http://www.tls.cena.fr/products/ivy/">http://www.tls.cena.fr/products/ivy/</a>
 *
 * (C) CENA
 *
 * Right now, the rendezvous is either a UDP broadcast or a UDP multicast.
 * The watcher answers each peer advertising its arrival on the bus. Because
 * of the way Unix sockets work, the broadcast is sent through the same
 * socket that is used for listening, which is not a good thing.
 *
 * CHANGELOG:
 * 1.2.9:
 *  - added an application Id in the UDP broadcast. It seems to be ok with
 *  most implementations: ( VERSION PORT APPID APPNAME \n) is compatible with
 *  (VERSION PORT). If I receive a broadcast with the same TCP port number
 *  and no application Id, I ignore the first one and accept the following ones
 * 1.2.8:
 *  - alreadyBroadcasted was static, thus Ivy Agents within the same JVM used
 *    to share the list of agents already connected. A nasty bug.
 * 1.2.7:
 *  - better handling of multiple connexions from the same remote agent when
 *    there are different broadcast addresses ( introduced the alreadyBroadcasted
 *    function )
 * 1.2.6:
 *  - IOException now goes silent when we asked the bus to stop()
 *  - use a new buffer for each Datagram received, to prevent an old bug
 * 1.2.5:
 *  - getDomain now sends IvyException for malformed broadcast addresses
 *  - uses apache jakarta-regexp instead of gnu-regexp
 *  - throws an IvyException if the broadcast domain cannot be resolved
 * 1.2.4:
 *  - sends the broadcast before listening to the other's broadcasts.
 *    I can't wait for all the broadcast to be sent before starting the listen
 *    mode, otherwise another agent behaving likewise could be started
 *    meanwhile, and one would not "see" each other.
 *  - (REMOVED) allows the connexion from a remote host with the same port number
 *    it's too complicated to know if the packet is from ourselves...
 *  - deals with the protocol errors in a more efficient way. The goal is not
 *    to lose our connectivity because of a rude agent.
 *    fixes Bug J005 (YJ + JPI)
 * 1.2.3:
 *  - the packet sending is done in its own thread from now on (PacketSender)
 *    I don't care stopping it, since it can't be blocked.
 *  - checks whether I have been interrupted just after the receive (start()
 *  then stop() immediately).
 * 1.2.1:
 *  - can be interrupted during the broadcast send. I catch the
 *    InterruptedIOException and do nothing with it.
 *  - changed the fill character from 0 to 10, in order to prevent a nasty bug
 *    on Windows XP machines
 *  - fixed a NullPointerException while trying to stop a Thread before having
 *    created it.
 * 1.0.12:
 *  - setSoTimeout on socket
 *  - the broadcast reader Thread goes volatile
 * 1.0.10:
 *  - isInDomain() is wrong  in multicast. I've removed it
 *  - there was a remanence effect in the datagrampacket buffer. I clean it up after each message
 *  - cleaned up the getDomain() and getPort() code 
 *  - close message sends an interruption on all threads for a clean exit
 *  - removed the timeout bug eating all the CPU resources
 *  - now handles a Vector of broadcast listeners
 */
package fr.dgac.ivy ;
import java.lang.Thread;
import java.net.*;
import java.io.*;
import org.apache.regexp.*;
import java.util.Hashtable;

/**
 * Rendezvous watcher for one Ivy domain.
 *
 * One instance owns one UDP socket bound to the domain port. It announces the
 * local agent on that domain (see {@link #start()}), then listens, in its own
 * thread, for the announcements of the other agents and opens a TCP
 * connection to each newly seen one (see {@link #run()}).
 */
class IvyWatcher implements Runnable {
  private static final boolean debug = (System.getProperty("IVY_DEBUG")!=null);
  private boolean isMulticastAddress = false;
  /* set once the first id-less broadcast carrying our own port was skipped */
  private boolean alreadyIgnored = false;
  private Ivy bus;			/* master bus controler */
  private DatagramSocket broadcast;	/* supervision socket, null if the setup failed */
  private InetAddress localhost,loopback;
  private String domainaddr;
  private int port;
  /* the thread allowed to listen; set to null by stop() to ask it to leave */
  private volatile Thread listenThread = null;
  private InetAddress group;
  private static int serial=0;
  private int myserial=serial++;
  private String busWatcherId = null;

  /**
   * creates an Ivy watcher. The listening thread is created here but only
   * started by {@link #start()}.
   *
   * If a host name cannot be resolved, no exception is thrown here: the
   * problem is traced and, when the socket could not be created because of
   * it, reported by {@link #start()}.
   *
   * @param bus the bus
   * @param domainaddr the address of the domain (broadcast or multicast address)
   * @param port the UDP port of the domain
   * @throws IvyException if the socket cannot be created or configured
   */
  IvyWatcher(Ivy bus,String domainaddr,int port) throws IvyException {
    this.bus = bus;
    this.domainaddr=domainaddr;
    this.port=port;
    busWatcherId=bus.getWatcherId();
    listenThread = new Thread(this);
    // create the MulticastSocket
    try {
      group = InetAddress.getByName(domainaddr);
      MulticastSocket socket = new MulticastSocket(port);
      try {
        if (group.isMulticastAddress()) {
          isMulticastAddress = true;
          socket.joinGroup(group);
        }
        socket.setSoTimeout(Ivy.TIMEOUTLENGTH);
      } catch ( IOException e ) {
        // do not leave the port bound when the socket cannot be set up
        socket.close();
        throw e;
      }
      broadcast = socket;
      localhost=InetAddress.getLocalHost();
      loopback=InetAddress.getByName(null);
    } catch ( UnknownHostException uhe ) {
      // best effort: if the socket was not created, start() will complain
      traceDebug("cannot resolve a host name while watching " + domainaddr + ": " + uhe);
    } catch ( IOException e ) {
      throw new IvyException("IvyWatcher I/O error" + e );
    }
  }

  /**
   * the behaviour of each thread watching the UDP socket.
   *
   * Loops while this thread is still the registered listenThread: receives a
   * datagram, validates it, skips our own announcement, and connects to the
   * announced TCP port unless a request for the same address and port was
   * already recorded. A receive timeout simply loops again.
   */
  public void run() {
    traceDebug("Thread started"); // THREADDEBUG
    Thread thisThread=Thread.currentThread();
    traceDebug("beginning of a watcher Thread");
    // the patterns are constant: compile them once per thread, not per packet
    RE re;
    RE reId;
    try {
      re = new RE("([0-9]*) ([0-9]*)");
      reId = new RE("([0-9]*) ([0-9]*) ([^ ]*) (.*)");
    } catch (RESyntaxException ree) {
      // cannot happen with constant patterns; never kill the whole JVM for it
      ree.printStackTrace();
      traceDebug("Thread stopped"); // THREADDEBUG
      return;
    }
    InetAddress remotehost=null;
    try {
      int remotePort=0;
      while( listenThread==thisThread ) {
        try {
          // a new buffer for each datagram, so nothing remains from the previous one
          byte buf[] = new byte[256];
          DatagramPacket packet=new DatagramPacket(buf,buf.length);
          broadcast.receive(packet);
          if (listenThread!=thisThread) break; // I was summoned to leave during the receive
          String msg = new String(buf,0,packet.getLength());
          String remotehostname=null;
          try {
            remotehost = packet.getAddress();
            remotehostname = remotehost.getHostName();
            if (!(re.match(msg))) {
              System.err.println("Ignoring bad format broadcast from "+
                  remotehostname+":"+packet.getPort());
              continue;
            }
            int version = Integer.parseInt(re.getParen(1));
            if ( version < Ivy.PROTOCOLMINIMUM ) {
              System.err.println("Ignoring bad format broadcast from "+
                  remotehostname+":"+packet.getPort()
                  +" protocol version "+version+" we need "+Ivy.PROTOCOLMINIMUM+" minimum");
              continue;
            }
            remotePort = Integer.parseInt(re.getParen(2));
            if (bus.applicationPort==remotePort
                && isOwnBroadcast(reId,msg,remotehostname,packet.getPort(),remotePort)) {
              continue;
            }
            traceDebug("broadcast accepted from " +remotehostname
              +":"+packet.getPort()+", port:"+remotePort+", protocol version:"+version);
            if (!alreadyBroadcasted(remotehost.toString(),remotePort)) {
              traceDebug("no known agent originating from " + remotehost + ":" + remotePort);
              try {
                Socket s = new Socket(remotehost,remotePort);
                new IvyClient(bus,s,remotePort,false);
              } catch ( java.net.ConnectException jnc ) {
                traceDebug("cannot connect to "+remotehostname+":"+remotePort+", he probably stopped his bus");
              }
            } else {
              traceDebug("there is already a request originating from " + remotehost + ":" + remotePort);
            }
          } catch (NumberFormatException nfe) {
            System.err.println("Ignoring bad format broadcast from "+remotehostname);
            continue;
          } catch ( UnknownHostException e ) {
            System.err.println("Unkonwn host "+remotehost +","+e.getMessage());
          } catch ( IOException e) {
            System.err.println("can't connect to "+remotehost+" port "+ remotePort+e.getMessage());
            e.printStackTrace();
          }
        } catch (InterruptedIOException jii ){
          // receive timeout or interruption: leave only if stop() was called
          if (thisThread!=listenThread) { break ;}
        }
      } // while
    } catch (java.net.SocketException se ){
      // expected when stop() closes the socket; only worth a trace otherwise
      if (thisThread==listenThread) {
        traceDebug("socket exception, continuing anyway on other Ivy domains "+se);
      }
    } catch (IOException ioe ){
      System.out.println("IO Exception, continuing anyway on other Ivy domains "+ioe);
    }
    traceDebug("Thread stopped"); // THREADDEBUG
  }

  /*
   * decides whether a broadcast announcing our own TCP port number must be
   * skipped. Returns true when the broadcast has to be ignored.
   */
  private boolean isOwnBroadcast(RE reId,String msg,String remotehostname,int senderPort,int remotePort) {
    if (reId.match(msg)&&(busWatcherId!=null)) {
      String otherId=reId.getParen(3);
      String otherName=reId.getParen(4);
      traceDebug("there's an appId: "+otherId);
      if (busWatcherId.equals(otherId)) {
        // same port #, same bus Id, It's me, I'm outta here
        traceDebug("ignoring my own broadcast");
        return true;
      }
      // same port #, different bus Id, it's another agent
      // implementing the watcherId add on
      traceDebug("accepting a broadcast from a same port by "+otherName);
      return false;
    }
    // there's no watcherId in the broadcast. I fall back to a
    // crude strategy: I ignore the first broadcast with the same
    // port number, and accept the following ones
    if (alreadyIgnored) {
      traceDebug("received another broadcast from "+ remotehostname+":"+senderPort
          +" on my port number ("+remotePort+"), it's probably someone else");
      return false;
    }
    alreadyIgnored=true;
    traceDebug("ignoring a broadcast from "+ remotehostname+":"+senderPort
        +" on my port number ("+remotePort+"), it's probably me");
    return true;
  }

  /**
   * stops the thread waiting on the broadcast socket
   */
  synchronized void stop() {
    traceDebug("begining stopping");
    Thread t = listenThread;
    listenThread=null;
    // the socket does not exist if the domain could not be resolved
    if (broadcast!=null) { broadcast.close(); }
    if (t!=null) { t.interrupt(); } // it might not even have been created
    traceDebug("stopped");
  }

  /*
   * sends one datagram to the domain from its own thread. The thread is
   * started by the constructor. There is one PacketSender per domain.
   */
  private class PacketSender implements Runnable {
    DatagramPacket packet;
    String data;
    public PacketSender(String data) {
      this.data=data;
      // the datagram length is a number of bytes, not a number of characters
      byte[] payload = data.getBytes();
      packet=new DatagramPacket(payload,payload.length,group,port);
      new Thread((PacketSender.this)).start();
    }
    public void run() {
      traceDebug("PacketSender thread started"); // THREADDEBUG
      try {
        broadcast.send(packet);
      } catch (InterruptedIOException e) {
        // somebody interrupts my IO. Thread, do nothing.
        System.out.println(e.bytesTransferred+" bytes transferred anyway, out of " + packet.getLength());
        e.printStackTrace();
        traceDebug("IO interrupted during the broadcast. Do nothing");
      } catch ( IOException e ) {
        // stay silent when the bus was asked to stop
        if (listenThread!=null) {
           System.out.println("Broadcast Error " + e.getMessage()+" continuing anyway");
           // cannot throw new IvyException in a run ...
           e.printStackTrace();
        }
      }
      traceDebug("PacketSender thread stopped"); // THREADDEBUG
    }
  }

  /**
   * announces our arrival on the domain, then starts listening. The
   * announcement is "VERSION PORT WATCHERID APPNAME\n" and is sent before the
   * listening thread is started.
   *
   * @throws IvyException if the socket could not be created, or if the
   * watcher has already been stopped
   */
  synchronized void start() throws IvyException {
    String hello = Ivy.PROTOCOLVERSION + " " + bus.applicationPort + " "+busWatcherId+" "+bus.selfIvyClient.getApplicationName()+"\n";
    if (broadcast==null) throw new IvyException("IvyWatcher PacketSender null broadcast address");
    Thread t = listenThread;
    if (t==null) throw new IvyException("IvyWatcher cannot be started after it has been stopped");
    new PacketSender(hello); // notifies our arrival on each domain: protocol version + port
    t.start();
  }

  /*
   * since 1.2.7 pre ....
   * went local instead of static ! fixed a nasty bug in 1.2.8
   * checks if there is already a broadcast received from the same address
   * on the same port
   *
   * regoes static ...
   *
   * The table is shared by every watcher, so the lookup and the update are
   * done under the lock of the table itself: the lock of one watcher would
   * not prevent another watcher from recording the same agent concurrently.
   * Only the last port seen for an address is remembered.
   */
  private static Hashtable alreadySocks=new Hashtable();
  private boolean alreadyBroadcasted(String s,int port) {
    if (s==null) return false;
    synchronized (alreadySocks) {
      Integer i = (Integer)alreadySocks.get(s);
      if ((i!=null)&&(i.intValue()==port)) return true;
      alreadySocks.put(s,new Integer(port));
      return false;
    }
  }

  /**
   * extracts the domain address from a "address:port" specification.
   *
   * A purely numeric, possibly incomplete, dotted address is completed with
   * 255 to get four numbers ("10.0" gives "10.0.255.255"). Anything else is
   * taken as a host name and returned without the port.
   *
   * @param net the domain specification, with or without a ":port" suffix
   * @return the address part, completed if needed
   * @throws IvyException if a numeric address is malformed
   */
  static String getDomain(String net) throws IvyException {
    int sep_index = net.lastIndexOf( ":" );
    if ( sep_index != -1 ) { net = net.substring(0,sep_index); }
    try {
      // anchored: RE.match() looks for a match anywhere in the string, and a
      // host name containing a digit or a dot is not a numeric address
      RE numbersPoint = new RE("^([0-9]|\\.)+$");
      if (!numbersPoint.match(net)) {
        // not only numbers and points, I won't add anything
        return net;
      }
      net += ".255.255.255";
      RE exp = new RE( "^(\\d+\\.\\d+\\.\\d+\\.\\d+).*");
      if (!exp.match(net)) {
        System.out.println("Bad broascat addr " + net);
        throw new IvyException("bad broadcast addr");
      }
      net=exp.getParen(1);
    } catch ( RESyntaxException e ){
      // cannot happen with constant patterns; report it instead of exiting the JVM
      throw new IvyException("IvyWatcher bad regular expression " + e);
    }
    return net;
  }

  /**
   * extracts the port from a "address:port" specification.
   *
   * @param net the domain specification
   * @return the port, or 0 if no port is set
   * @throws NumberFormatException if the text after the last ':' is not an integer
   */
  static int getPort(String net) { // returns 0 if no port is set
    int sep_index = net.lastIndexOf( ":" );
    if ( sep_index == -1 ) { return 0; }
    return Integer.parseInt( net.substring( sep_index +1 ));
  }

  /* prints a trace on the standard output when IVY_DEBUG is set */
  private void traceDebug(String s){
    if (debug) System.out.println("-->IvyWatcher["+myserial+","+bus.getSerial()+"]<-- "+s);
  }

} // class IvyWatcher