aboutsummaryrefslogtreecommitdiff
path: root/src/IvyWatcher.java
blob: 7698632e24e0234b1dbf2c8defb2d6c1c0d71860 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
/**
 * IvyWatcher, A private Class for the Ivy rendezvous
 *
 * @author	Yannick Jestin
 * @author	François-Régis Colin
 * @author	<a href="http://www.tls.cena.fr/products/ivy/">http://www.tls.cena.fr/products/ivy/</a>
 *
 * (C) CENA
 *
 * right now, the rendez-vous is done over a UDP datagram socket, used either in broadcast or in multicast mode.
 * The watcher will answer to
 * each peer advertising its arrival on the bus. The intrinsics of Unix are so
 * that the broadcast is done using the same socket, which is not a good
 * thing.
 *
 * CHANGELOG:
 * 1.2.5:
 *  - getDomain now sends IvyException for malformed broadcast addresses
 * 1.2.4:
 *  - sends the broadcast before listening to the other's broadcasts.
 *    TODO wait for all the broadcast to be sent before starting the listen
 *    mode
 *  - (REMOVED) allows the connexion from a remote host with the same port number
 *    it's too complicated to know if the packet is from ourselves...
 *  - deals with the protocol errors in a more efficient way. The goal is not
 *    to lose our connectivity because of a rude agent.
 *    fixes Bug J005 (YJ + JPI)
 * 1.2.3:
 *  - the packet sending is done in its own thread from now on (PacketSender)
 *    I don't care stopping it, since it can't be blocked.
 *  - checks whether I have been interrupted just after the receive (start()
 *  then stop() immediately).
 * 1.2.1:
 *  - can be Interrupted during the broadcast Send. I catch the
 *    InterruptedIOException and do nothing with it.
 *  - changed the fill character from 0 to 10, in order to prevent a nasty bug
 *    on Windows XP machines
 *  - fixed a NullPointerException while trying to stop a Thread before having
 *    created it.
 * 1.0.12:
 *  - setSoTimeout on socket
 *  - the broadcast reader Thread goes volatile
 * 1.0.10:
 *  - isInDomain() is wrong  in multicast. I've removed it
 *  - there was a remanence effect in the datagrampacket buffer. I clean it up after each message
 *  - cleaned up the getDomain() and getPort() code 
 *  - close message sends an interruption on all threads for a clean exit
 *  - removed the timeout bug eating all the CPU resources
 *  - now handles a Vector of broadcast listeners
 */
package fr.dgac.ivy ;
import java.lang.Thread;
import java.net.*;
import java.io.*;
import java.util.StringTokenizer;
import gnu.regexp.*;
import java.util.Vector;
import java.util.Enumeration;

/**
 * Listens on the rendez-vous datagram socket for "hello" messages sent by
 * other agents and opens a TCP connection to each advertised agent, handing
 * the resulting socket over to the bus. It also announces the local agent on
 * the same socket when started.
 *
 * A hello message is a line of the form
 * "&lt;protocol version&gt; &lt;application port&gt;".
 */
class IvyWatcher implements Runnable {
  /** size in bytes of the receive buffer used for incoming hello datagrams */
  private static final int BUFFER_SIZE = 256;
  /** highest valid TCP port number */
  private static final int MAX_PORT = 65535;
  /** format of a hello message: protocol version, a space, application port */
  private static final String HELLO_REGEXP = "([0-9]*) ([0-9]*)";

  private static final boolean debug = (System.getProperty("IVY_DEBUG")!=null);
  private boolean isMulticastAddress = false;
  private Ivy bus;                      /* master bus controler */
  private DatagramSocket broadcast;     /* supervision socket */
  private String domainaddr;
  private int port;
  private volatile Thread listenThread; /* set to null by stop() to end run() */
  private InetAddress group;

  /**
   * creates an Ivy watcher
   * @param bus the bus
   * @param domainaddr the broadcast or multicast address of the domain
   * @param port the UDP port of the rendez-vous socket
   * @throws IvyException if the domain address cannot be resolved or if the
   * socket cannot be set up
   */
  IvyWatcher(Ivy bus,String domainaddr,int port) throws IvyException {
    this.bus = bus;
    this.domainaddr = domainaddr;
    this.port = port;
    try {
      group = InetAddress.getByName(domainaddr);
    } catch ( UnknownHostException uhe ) {
      // without a resolved address nothing can be sent: report it instead of
      // leaving a half-initialized watcher behind
      throw new IvyException("IvyWatcher unknown domain address " + domainaddr + " " + uhe );
    }
    // create the MulticastSocket
    MulticastSocket socket = null;
    try {
      socket = new MulticastSocket(port);
      if (group.isMulticastAddress()) {
        isMulticastAddress = true;
        socket.joinGroup(group);
      }
      socket.setSoTimeout(Ivy.TIMEOUTLENGTH);
    } catch ( IOException e ) {
      if (socket != null) { socket.close(); } // do not leak the bound socket
      throw new IvyException("IvyWatcher I/O error" + e );
    }
    broadcast = socket;
    listenThread = new Thread(this);
  }

  /**
   * the behaviour of each thread watching the UDP socket.
   * Loops until stop() clears listenThread, handling one hello per datagram.
   */
  public void run() {
    Thread thisThread = Thread.currentThread();
    traceDebug("beginning of a watcher Thread");
    RE helloFormat;
    try {
      // compiled once per thread instead of once per received packet
      helloFormat = new RE(HELLO_REGEXP);
    } catch (REException ree) {
      // the pattern is a constant, so this should not happen; give up
      // listening rather than terminating the whole JVM
      ree.printStackTrace();
      traceDebug("end of a watcher thread");
      return;
    }
    byte buf[] = new byte[BUFFER_SIZE];
    DatagramPacket packet = new DatagramPacket(buf, buf.length);
    try {
      while ( listenThread==thisThread ) {
        try {
          packet.setLength(buf.length); // offer the whole buffer again
          broadcast.receive(packet);
          if (listenThread!=thisThread) break; // I was summoned to leave during the receive
          // only decode the bytes actually received, so that nothing remains
          // from a previous, longer message
          String msg = new String(packet.getData(), packet.getOffset(), packet.getLength());
          handleHello(helloFormat, msg, packet.getAddress(), packet.getPort());
        } catch (InterruptedIOException jii ){
          // receive timeout or interruption: leave only if asked to stop
          if (thisThread!=listenThread) { break ;}
        }
      } // while
    } catch (java.net.SocketException se ){
      // expected when stop() closes the socket; only report it otherwise
      if (thisThread==listenThread) { se.printStackTrace(); }
    } catch (IOException ioe ){
      ioe.printStackTrace();
    }
    traceDebug("end of a watcher thread");
  }

  /**
   * parses one hello message and, if it is acceptable, connects to the
   * advertised agent and registers the connection on the bus.
   * Malformed or unacceptable messages are reported on System.err and ignored.
   * @param helloFormat the compiled hello pattern
   * @param msg the text received in the datagram
   * @param remotehost the address the datagram came from
   * @param senderPort the UDP port the datagram came from
   */
  private void handleHello(RE helloFormat,String msg,InetAddress remotehost,int senderPort) {
    if (debug) { // getHostName() may trigger a reverse lookup, only pay for it when tracing
      traceDebug("BUSWATCHER Receive Broadcast from "+remotehost.getHostName()+":"+senderPort);
    }
    // TODO if ( !isInDomain( remotehost ) ) return;
    REMatch result = helloFormat.getMatch(msg);
    if (result==null) {
      System.err.println("Ignoring bad format broadcast from "+remotehost);
      return;
    }
    int remotePort = -1; // declared here so that the error handlers can report it
    try {
      int version = Integer.parseInt(result.toString(1));
      if ( version < bus.PROTOCOLMINIMUM ) {
        System.err.println("Ignoring bad protocol version "+remotehost+" we need "+ bus.PROTOCOLMINIMUM+" minimum");
        return;
      }
      remotePort = Integer.parseInt(result.toString(2));
      if ( remotePort > MAX_PORT ) {
        // the Socket constructor would throw an unchecked exception and kill
        // the watcher thread
        System.err.println("Ignoring bad format broadcast from "+remotehost);
        return;
      }
      // a remote host using the same port number is not distinguished from
      // ourselves: it's too complicated to know if the packet is from us
      if (bus.applicationPort==remotePort) {
        traceDebug("ignoring my own broadcast. OK");
        return; // it's me
      }
      if (debug) {
        traceDebug("Broadcast de " +remotehost.getHostName()
          +":"+senderPort+" port "+remotePort+" version "+version);
      }
      Socket socket = new Socket( remotehost, remotePort );
      bus.addClient(socket,false,version);
    } catch (NumberFormatException nfe) {
      System.err.println("Ignoring bad format broadcast from "+remotehost);
    } catch ( UnknownHostException e ) {
      System.err.println("Unkonwn host "+remotehost + e.getMessage());
    } catch ( IOException e) {
      System.err.println("can't connect to "+remotehost+" port "+ remotePort+" "+e.getMessage());
    }
  }

  /**
   * stops the thread waiting on the broadcast socket
   */
  synchronized void stop() {
    traceDebug("begining stopping an IvyWatcher");
    Thread t = listenThread;
    listenThread=null; // tells run() to leave its loop
    broadcast.close();
    if (t!=null) { t.interrupt(); } // already null if stop() was called before
    traceDebug("ending stopping an IvyWatcher");
  }

  /**
   * sends one datagram to the domain address from its own thread, which is
   * started by the constructor.
   */
  private class PacketSender implements Runnable {
    DatagramPacket packet;
    String data;
    public PacketSender(String data) {
      this.data=data;
      byte[] payload = data.getBytes();
      // the datagram length is a number of bytes, not of characters
      packet=new DatagramPacket(payload,payload.length,group,port);
      new Thread(this).start();
    }
    public void run() {
      traceDebug("PacketSender thread started"); // THREADDEBUG
      try {
        broadcast.send(packet);
      } catch (InterruptedIOException e) {
        // somebody interrupts my IO. Thread, do nothing.
        System.out.println(e.bytesTransferred+" bytes transferred anyway, out of " + packet.getLength());
        e.printStackTrace();
        traceDebug("IO interrupted during the broadcast. Do nothing");
      } catch ( IOException e ) {
        if (listenThread==null) {
          // stop() closed the socket before the hello could be sent
          traceDebug("watcher stopped before the broadcast was sent");
        } else {
          // an exception cannot be thrown to the caller from run(); report
          // the failure and carry on rather than terminating the JVM
          System.out.println("Broadcast Error" + e.getMessage());
          e.printStackTrace();
        }
      }
      traceDebug("PacketSender thread stopped"); // THREADDEBUG
    }
  }

  /**
   * announces our arrival on the domain, then starts listening to the
   * announcements of the other agents.
   * @throws IvyException if the watcher has already been stopped
   */
  synchronized void start() throws IvyException {
    Thread t = listenThread;
    if (t==null) {
      throw new IvyException("IvyWatcher already stopped, cannot start it");
    }
    String hello = bus.PROTOCOLVERSION + " " + bus.applicationPort + "\n";
    new PacketSender(hello); // notifies our arrival on each domain: protocol version + port
    t.start();
  }

  /*
   * isInDomain( InetAddress host ) has been removed since we use Multicast.
   * How to check when we are in UDP broadcast ?
   */

  /**
   * extracts the address part of a "address:port" domain string. A partial
   * dotted address is completed with 255 bytes, e.g. "10.1" gives
   * "10.1.255.255".
   * TODO named (non numeric) addresses are not supported
   * @param net the domain, with or without a trailing ":port"
   * @return the four bytes dotted address
   * @throws IvyException if the address part is not a numeric dotted address
   */
  static String getDomain(String net) throws IvyException {
    int sep_index = net.lastIndexOf( ":" );
    String addr = ( sep_index != -1 ) ? net.substring(0,sep_index) : net;
    // pad with broadcast bytes, only the first four are kept by the pattern
    String padded = addr + ".255.255.255";
    try {
      RE exp = new RE( "^(\\d+\\.\\d+\\.\\d+\\.\\d+).*");
      REMatch match = exp.getMatch( padded );
      if (match==null) {
        throw new IvyException("bad broadcast addr " + net);
      }
      return match.toString(1);
    } catch ( REException e ){
      throw new IvyException("bad broadcast addr " + net + " " + e);
    }
  }

  /**
   * extracts the port part of a "address:port" domain string.
   * @param net the domain, with or without a trailing ":port"
   * @return the port following the last ':', or Ivy.DEFAULT_PORT if there is
   * no ':' in the string
   * @throws NumberFormatException if the text after the last ':' is not an
   * integer
   */
  static int getPort(String net) {
    int sep_index = net.lastIndexOf( ":" );
    if ( sep_index == -1 ) { return Ivy.DEFAULT_PORT; }
    return Integer.parseInt( net.substring( sep_index +1 ));
  }


  /** prints a trace on System.out when the IVY_DEBUG property is set */
  private void traceDebug(String s){
    if (debug) System.out.println("-->ivywatcher<-- "+s);
  }


} // class IvyWatcher
/* EOF */