aboutsummaryrefslogtreecommitdiff
path: root/src/IvyWatcher.java
blob: 7157c18864db674becab81031fbb970c41e6f55a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
package fr.dgac.ivy ;

import java.lang.Thread;
import java.net.*;
import java.io.*;
import java.util.StringTokenizer;
import gnu.regexp.*;
import java.util.Vector;
import java.util.Enumeration;

/**
 * A private Class for the Ivy rendezvous
 *
 * @author	François-Régis Colin
 * @author	Yannick Jestin
 * @author	<a href="http://www.tls.cena.fr/products/ivy/">http://www.tls.cena.fr/products/ivy/</a>
 *
 * right now, the rendez vous is either an UDP socket or a TCP multicast.
 * The watcher will answer to
 * each peer advertising its arrival on the bus. The intrinsics of Unix are so
 * that the broadcast is done using the same socket, which is not a good
 * thing.
 *
 * CHANGELOG:
 * 1.2.1:
 *  - changed the fill character from 0 to 10, in order to prevent a nasty bug
 *    on Windows XP machines
 *  - fixed a NullPointerException while trying to stop a Thread before having
 *    created it.
 * 1.0.12:
 *  - setSoTimeout on socket
 *  - the broadcast reader Thread goes volatile
 * 1.0.10:
 *  - isInDomain() is wrong  in multicast. I've removed it
 *  - there was a remanence effect in the datagrampacket buffer. I clean it up after each message
 *  - cleaned up the getDomain() and getPort() code 
 *  - close message sends an interruption on all threads for a clean exit
 *  - removed the timeout bug eating all the CPU resources
 *  - now handles a Vector of broadcast listeners
 */

class IvyWatcher implements Runnable {
  private static boolean debug = (System.getProperty("IVY_DEBUG")!=null);
  private boolean isMulticastAddress = false;
  private Ivy  bus;			/* master bus controler */
  private DatagramSocket broadcast;	/* supervision socket */
  private String domainaddr;
  private int port;
  private volatile Thread listenThread;
  private InetAddress group;

  /**
   * creates an Ivy watcher
   * @param bus the bus
   * @param net the domain
   */
  IvyWatcher(Ivy bus,String domainaddr,int port) throws IvyException {
    this.bus = bus;
    this.domainaddr=domainaddr;
    this.port=port;
    listenThread = new Thread(this);
    // create the MulticastSocket
    try {
      group = InetAddress.getByName(domainaddr);
      broadcast = new MulticastSocket(port);
      if (group.isMulticastAddress()) {
	isMulticastAddress = true;
	((MulticastSocket)broadcast).joinGroup(group);
      } 
      broadcast.setSoTimeout(Ivy.TIMEOUTLENGTH);
    } catch ( IOException e ) {
      throw new IvyException("IvyWatcher I/O error" + e );
    }
  }
  
  /**
   * the behaviour of each thread watching the UDP socket.
   */
  public void run() {
    // System.out.println("IvyWatcher Thread started"); // THREADDEBUG
    Thread thisThread=Thread.currentThread();
    traceDebug("beginning of a watcher Thread");
    byte buf[] = new byte[256];
    DatagramPacket packet=new DatagramPacket(buf, 256);
    try {
      while( listenThread==thisThread ) {
	int port;
	try {
	  broadcast.receive(packet);
	  String msg = new String(packet.getData()) ;
	  for (int i=0;i<buf.length;i++) { buf[i]=10; }
	    // clean up the buffer after each message
	    // BUGFIX ? I change 0 to 10 in order to avoid a bug
	  InetAddress remotehost = packet.getAddress();
	  traceDebug("BUSWATCHER Receive Broadcast from "+ remotehost.getHostName()+":"+packet.getPort());
	  // TODO if ( !isInDomain( remotehost ) ) continue;
	  // TODO get rid of the StringTokenizer ?
	  StringTokenizer st = new StringTokenizer(msg);
	  if ( !st.hasMoreTokens()) {
	    System.err.println("Bad format "+msg);
	    continue;
	  }
	  int version  = Integer.parseInt( st.nextToken() );
	  if ( version != bus.PROCOCOLVERSION ) {
	    System.err.println("Ignoring bad protocol version broadcast");
	    continue;
	  }
	  if  ( ! st.hasMoreTokens()) {
	    System.err.println("Bad format "+msg);
	    continue;
	  }
	  port = Integer.parseInt( st.nextToken() );
	  if ( (bus.applicationPort == port) ) continue;
	  traceDebug("BUSWATCHER Broadcast de "
	      +packet.getAddress().getHostName()
	      +":"+packet.getPort()+" port "+port+" version "+version);
	  try {
	    Socket socket = new Socket( remotehost, port );
	    bus.addClient(socket,false);
	  } catch ( UnknownHostException e ) {
	    System.err.println("Unkonwn host "+remotehost + e.getMessage());
	  } catch ( IOException e) {
	    System.err.println("can't connect to "+remotehost+" port "+
		port+e.getMessage());
	  }
	} catch (InterruptedIOException jii ){
	  if (thisThread!=listenThread) { break ;}
	}
      } // while
    } catch (java.net.SocketException se ){
      if (thisThread==listenThread) { se.printStackTrace(); }
    } catch (IOException ioe ){
      ioe.printStackTrace();
    }
    traceDebug("end of a watcher thread");
    // System.out.println("IvyWatcher Thread stopped"); // THREADDEBUG
  } 

  /**
   * stops the thread waiting on the broadcast socket
   */
  void stop() {
    traceDebug("begining stopping an IvyWatcher");
    Thread t = listenThread;
    listenThread=null;
    broadcast.close();
    if (t!=null) { t.interrupt(); } // it might not even have been created
    traceDebug("ending stopping an IvyWatcher");
  }

  private void sendBroadcast(String data, String domain, int port) throws IvyException {
    try {
      DatagramPacket packet = new DatagramPacket( data.getBytes(), data.length(), group, port );
      broadcast.send(packet);
    } catch ( IOException e ) {
      throw new IvyException("Broadcast error " + e.getMessage() );
    }
  }

  void start() throws IvyException {
    String hello = bus.PROCOCOLVERSION + " " + bus.applicationPort + "\n";
    listenThread.start();
    sendBroadcast(hello,domainaddr,port); // notifies our arrival on each domain: protocol version + port
  }

  /*
   * deprecated since we use Multicast. How to check when we are in UDP
   * broadcast ?
  private boolean isInDomain( InetAddress host ){
    byte rem_addr[] = host.getAddress();
    for ( int i = 0 ; i < domainaddrList.size(); i++ ) {
      byte addr[] = ((InetAddress)domainaddrList.elementAt(i)).getAddress();
      int j ;
      for (  j = 0 ; j < 4 ; j++  )
        if ( (addr[j] != -1) && (addr[j] != rem_addr[j]) ) break;
      if ( j == 4 ) {
        traceDebug( "host " + host + " is in domain\n" );
	return true;
      }
    }
    traceDebug( "host " + host + " Not in domain\n" );
    return false;
  }
   */

  static String getDomain(String net) {
    int sep_index = net.lastIndexOf( ":" );
    if ( sep_index != -1 ) { net = net.substring(0,sep_index); }
    try {
      net += ".255.255.255";
      RE exp = new RE( "^(\\d+\\.\\d+\\.\\d+\\.\\d+).*");
      net = exp.substitute( net , "$1" );
    } catch ( REException e ){
      System.out.println("Bad broascat addr " + net);
      return null;
    }
    // System.out.println("net: "+net);
    return net;
  }

  static int getPort(String net) {
    int sep_index = net.lastIndexOf( ":" );
    int port= ( sep_index == -1 ) ? Ivy.DEFAULT_PORT :Integer.parseInt( net.substring( sep_index +1 ));
    // System.out.println("net: ["+net+"]\nsep_index: "+sep_index+"\nport: "+port);
    return port;
  }


  private void traceDebug(String s){
    if (debug) System.out.println("-->ivywatcher<-- "+s);
  }
  

} // class IvyWatcher
/* EOF */