aboutsummaryrefslogtreecommitdiff
path: root/src/IvyWatcher.java
blob: e371ea6a4634996d49d79971fbbde27a121c5ed0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
package fr.dgac.ivy ;

import java.lang.Thread;
import java.net.*;
import java.io.*;
import java.util.StringTokenizer;
import gnu.regexp.*;
import java.util.Vector;
import java.util.Enumeration;

/**
 * A private Class for the Ivy rendezvous
 *
 * @author	François-Régis Colin
 * @author	Yannick Jestin
 * @author	<a href="http://www.tls.cena.fr/products/ivy/">http://www.tls.cena.fr/products/ivy/</a>
 *
 * Right now, the rendezvous is either a UDP broadcast socket or a UDP
 * multicast socket. The watcher will answer
 * each peer advertising its arrival on the bus. The intrinsics of Unix are such
 * that the broadcast is done using the same socket, which is not a good
 * thing.
 *
 * CHANGELOG:
 * 1.0.10:
 *  - isInDomain() is wrong  in multicast. I've removed it
 *  - there was a remanence effect in the datagrampacket buffer. I clean it up after each message
 *  - cleaned up the getDomain() and getPort() code 
 *  - close message sends an interruption on all threads for a clean exit
 *  - removed the timeout bug eating all the CPU resources
 *  - now handles a Vector of broadcast listeners
 */

class IvyWatcher implements Runnable {
  private static boolean debug = (System.getProperty("IVY_DEBUG")!=null);

  /** size in bytes of the receive buffer used for rendezvous datagrams */
  private static final int BUFFER_SIZE = 256;

  /* written by start()/stop(), polled by every listener thread */
  private volatile boolean watcherrunning = false;
  /* set as soon as one of the configured domains is a multicast group */
  private boolean isMulticastAddress = false;
  /* the listener threads, one per configured domain */
  private Vector broadcastListener = new Vector();
  /* the sockets the listener threads are blocked on, closed by stop() */
  private Vector broadcastSockets = new Vector();
  private Ivy  bus;			/* master bus controler */
  /* supervision socket: the most recently opened one, used by run() */
  private DatagramSocket broadcast;
    // it can also be a MulticastSocket, which inherits from the previous

  /**
   * Listener bound to one rendezvous socket. Each domain gets its own
   * instance so that a listener thread never reads a socket that start()
   * has since replaced while opening the next domain.
   */
  private class Listener implements Runnable {
    private final DatagramSocket socket;

    Listener(DatagramSocket socket) {
      this.socket = socket;
    }

    public void run() {
      listen(socket);
    }
  }

  /**
   * creates an Ivy watcher.
   * @param bus the bus
   */
  IvyWatcher(Ivy bus) throws IvyException {
    this.bus = bus;
  }

  /**
   * the behaviour of a thread watching the most recently opened rendezvous
   * socket. The threads created by start() do not go through this method:
   * each of them listens on its own socket. It is kept because the class
   * implements Runnable.
   *
   * TODO: better handling of exceptions, because we just System.err.println
   * here, run cannot throw IvyException ...
   */
  public void run()  {
    DatagramSocket socket = broadcast;
    if ( socket == null ) {
      traceDebug("no broadcast socket, the watcher has not been started");
      return;
    }
    listen(socket);
  }

  /**
   * waits for rendezvous datagrams on the given socket and handles them
   * until the bus stops or watcherrunning is set to false, then stops the
   * whole watcher.
   *
   * @param socket the socket to receive the datagrams from
   */
  private void listen(DatagramSocket socket) {
    byte buf[] = new byte[BUFFER_SIZE];
    DatagramPacket packet = new DatagramPacket(buf, buf.length);
    traceDebug("IvyWatcher waiting for Broadcast");
    while ( watcherrunning && bus.ivyRunning ) {
      try {
        // receive() shrinks the packet length to the size of the datagram
        // it got: restore the full capacity before every call
        packet.setLength(buf.length);
        socket.receive(packet);
        handleBroadcast(packet);
      } catch (java.io.InterruptedIOException jii ){
        if (!watcherrunning) break;
        System.out.println("DEBUG IvyClient: I have been interrupted");
      } catch (java.io.IOException ioe ){
        // stop() closes the socket to wake us up: this is not an error
        if (!watcherrunning) break;
        System.err.println("IvyWatcher IOException "+ ioe.getMessage() );
      }
    }
    stop();
  }

  /**
   * parses one rendezvous datagram ("version port") and connects to the
   * advertising peer. Malformed datagrams are reported on System.err and
   * dropped: they must never terminate the listener thread.
   *
   * @param packet the datagram that has just been received
   */
  private void handleBroadcast(DatagramPacket packet) {
    // only decode the bytes actually received, so that nothing left over
    // from a previous, longer message can leak into this one
    String msg = new String(packet.getData(), 0, packet.getLength());
    InetAddress remotehost = packet.getAddress();
    // getHostName() may trigger a reverse DNS lookup: only pay for it
    // when the trace is really going to be printed
    if (debug) {
      traceDebug("BUSWATCHER Receive Broadcast from "+
          remotehost.getHostName()+":"+packet.getPort());
    }
    // we used to check if remoteaddr is in our broadcast domain list
    // otherwise we ignore the broadcast
    StringTokenizer st = new StringTokenizer(msg);
    if ( !st.hasMoreTokens()) {
      System.err.println("Bad format "+msg);
      return;
    }
    int version;
    int port;
    try {
      version = Integer.parseInt( st.nextToken() );
      if ( version != bus.PROCOCOLVERSION ) {
        System.err.println("Ignoring bad protocol version broadcast");
        return;
      }
      if ( !st.hasMoreTokens()) {
        System.err.println("Bad format "+msg);
        return;
      }
      port = Integer.parseInt( st.nextToken() );
    } catch ( NumberFormatException nfe ) {
      // the datagram comes from the network: garbage is to be expected
      System.err.println("Bad format "+msg);
      return;
    }
    if ( port <= 0 || port > 65535 ) {
      // new Socket() would throw an unchecked exception on such a port
      System.err.println("Bad format "+msg);
      return;
    }
    // a datagram advertising our own application port is ignored
    if ( bus.applicationPort == port ) return;
    if (debug) {
      traceDebug("BUSWATCHER Broadcast de "
          +remotehost.getHostName()
          +":"+packet.getPort()+" port "+port+" version "+version);
    }
    try {
      Socket socket = new Socket( remotehost, port );
      bus.addClient(socket,false);
    } catch ( UnknownHostException e ) {
      System.err.println("Unkonwn host "+remotehost + e.getMessage());
    } catch ( IOException e) {
      System.err.println("can't connect to "+remotehost+" port "+
          port+e.getMessage());
    }
  }

  /**
   * stops the threads waiting on the broadcast sockets.
   * The threads are interrupted and the sockets are closed, closing being
   * what makes a blocked receive() return. Calling it before start(), or
   * several times in a row, is harmless.
   */
  void stop() {
    traceDebug("broadcast listener normal shutdown");
    watcherrunning=false;
    for (Enumeration e = broadcastListener.elements();e.hasMoreElements();) {
      Thread t = (Thread) e.nextElement();
      t.interrupt();
    }
    for (Enumeration e = broadcastSockets.elements();e.hasMoreElements();) {
      DatagramSocket s = (DatagramSocket) e.nextElement();
      s.close();
    }
  }

  /**
   * sends one datagram to a domain, through a temporary socket that is
   * closed before returning.
   *
   * @param data the message to send
   * @param domain the address of the broadcast domain or multicast group
   * @param port the UDP port, used both to bind the socket and as the
   * destination port
   * @throws IvyException if the domain cannot be resolved or if the
   * datagram cannot be sent
   */
  private static void sendBroadcast(String data, String domain, int port) throws IvyException {
    MulticastSocket send = null;
    try {
      InetAddress group = InetAddress.getByName(domain);
      send = new MulticastSocket(port);
      if (group.isMulticastAddress()) { send.joinGroup(group); }
      // the packet length is a number of bytes, not a number of chars
      byte payload[] = data.getBytes();
      DatagramPacket packet = new DatagramPacket( payload, payload.length, group, port );
      send.send(packet);
    } catch ( UnknownHostException e ) {
      throw new IvyException("Broadcast sent on unknown network "+
        e.getMessage());
    } catch ( IOException e ) {
      throw new IvyException("Broadcast error " + e.getMessage() );
    } finally {
      if ( send != null ) send.close();
    }
  }

  /**
   * starts watching the given domains. For each domain, a socket is opened,
   * a listener thread is started on it, and our arrival is advertised.
   *
   * @param net a comma separated list of domains, each one being an
   * address optionally followed by ":port"
   * @throws IvyException if a port is not a number, if a socket cannot be
   * set up, or if our arrival cannot be advertised
   */
  void start(String net) throws IvyException {
    String hello = bus.PROCOCOLVERSION + " " + bus.applicationPort + "\n";
    StringTokenizer st = new StringTokenizer(net,",");
    broadcastListener = new Vector();
    broadcastSockets = new Vector();
    while  ( st.hasMoreTokens()) {
      String s = st.nextToken() ;
      String domainaddr=getDomain(s);
      int port;
      try {
        port=getPort(s);
      } catch ( NumberFormatException nfe ) {
        throw new IvyException("IvyWatcher bad port in domain " + s );
      }
      MulticastSocket socket = null;
      try {
        InetAddress group = InetAddress.getByName(domainaddr);
        socket = new MulticastSocket(port);
        // Handling of multicast address
        if (group.isMulticastAddress()) {
          isMulticastAddress = true;
          socket.joinGroup(group);
        }
      } catch ( IOException e ) {
        // do not leave a half configured socket bound to the port
        if ( socket != null ) socket.close();
        throw new IvyException("IvyWatcher I/O error" + e );
      }
      broadcast = socket;
      broadcastSockets.addElement(socket);
      // starts a Thread listening on its own socket
      watcherrunning=true;
      Thread t = new Thread(new Listener(socket));
      broadcastListener.addElement(t);
      t.start();
      // notifies our arrival on each domain: protocol version + port
      sendBroadcast(hello,domainaddr,port);
    }
  }

  /*
   * TODO
   * isInDomain() has been removed since we use Multicast. How to check
   * that a peer is in our domain when we are in UDP broadcast ?
   */

  /**
   * extracts the address part of a domain, dropping the ":port" suffix.
   * ".255.255.255" is appended and only the first four dot separated
   * numbers are kept, so that a partial address such as "10.0" is
   * completed with 255s.
   *
   * @param net the domain, an address optionally followed by ":port"
   * @return the address, or null if the regular expression is rejected
   */
  private static String getDomain(String net) {
    int sep_index = net.lastIndexOf( ":" );
    if ( sep_index != -1 ) { net = net.substring(0,sep_index); }
    try {
      net += ".255.255.255";
      RE exp = new RE( "^(\\d+\\.\\d+\\.\\d+\\.\\d+).*");
      net = exp.substitute( net , "$1" );
    } catch ( REException e ){
      System.out.println("Bad broascat addr " + net);
      return null;
    }
    return net;
  }

  /**
   * extracts the port part of a domain.
   *
   * @param net the domain, an address optionally followed by ":port"
   * @return the port following the last ":", or Ivy.DEFAULT_PORT if there
   * is no ":"
   * @throws NumberFormatException if what follows the ":" is not a number
   */
  private static int getPort(String net) {
    int sep_index = net.lastIndexOf( ":" );
    if ( sep_index == -1 ) {
      return Ivy.DEFAULT_PORT;
    }
    return Integer.parseInt( net.substring( sep_index +1 ));
  }

  /**
   * prints a trace on the standard output when the IVY_DEBUG property is set.
   *
   * @param s the message to trace
   */
  private void traceDebug(String s){
    if (debug) System.out.println("-->ivywatcher<-- "+s);
  }

} // class IvyWatcher
/* EOF */