aboutsummaryrefslogtreecommitdiff
path: root/src/IvyWatcher.java
blob: 2fe0bfd64dabf29759d0542ebe6d578f22c09e84 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
package fr.dgac.ivy ;

import java.lang.Thread;
import java.net.*;
import java.io.*;
import java.util.StringTokenizer;
import gnu.regexp.*;
import java.util.Vector;

/**
 * A private Class for the Ivy rendezvous
 *
 * @author	François-Régis Colin
 * @author	Yannick Jestin
 * @author	<a href="http://www.tls.cena.fr/products/ivy/">http://www.tls.cena.fr/products/ivy/</a>
 *
 * right now, the rendez vous is on a UDP socket. The watcher will answer to
 * each peer advertising its arrival on the bus. The intrinsics of Unix are so
 * that the broadcast is done using the same socket, which is not a good
 * thing.
 */

class IvyWatcher implements Runnable {
  private static boolean debug = (System.getProperty("IVY_DEBUG")!=null);
  private Vector domainaddrList;
  private boolean watcherrunning = false;
  private boolean isMulticastAddress = false;
  private Thread broadcastListener ;
  private Ivy  bus;			/* master bus controler */
  private DatagramSocket broadcast;	/* supervision socket */
    // it can also be a MulticastSocket, which inherits from the previous
  /**
   * creates an Ivy watcher.
   * @param bus the bus
   */
  IvyWatcher(Ivy bus) throws IvyException {
    this.bus = bus;
    domainaddrList = new Vector();
  }
  
  /**
   * the behaviour of the thread watching the UDP socket.
   * this thread will stop either when the bus stops or when the
   * watcherrunning will be set to false
   */
  public void run() {
    traceDebug("IvyWatcher waiting for Broadcast");
    while( watcherrunning && bus.ivyRunning ) try {
      byte buf[] = new byte[256];
      DatagramPacket packet=new DatagramPacket(buf, 256);
      int port;
      broadcast.receive(packet);
      String msg = new String(packet.getData()) ;
      InetAddress remotehost = packet.getAddress();
      traceDebug("BUSWATCHER Receive Broadcast from "+
		      remotehost.getHostName()+":"+packet.getPort());
      // check if remoteaddr is in our broadcast domain list
      // otherwise we ignore the broadcast
      if ( !isInDomain( remotehost ) ) continue;
      StringTokenizer st = new StringTokenizer(msg);
      if ( !st.hasMoreTokens()) {
        System.err.println("Bad format "+msg);
	continue;
      }
      int version  = Integer.parseInt( st.nextToken() );
      if ( version != bus.PROCOCOLVERSION ) {
        System.err.println("Ignoring bad protocol version broadcast");
	continue;
      }
      if  ( ! st.hasMoreTokens()) {
        System.err.println("Bad format "+msg);
	continue;
      }
      port = Integer.parseInt( st.nextToken() );
      if ( (bus.applicationPort == port) ) continue;
      traceDebug("BUSWATCHER Broadcast de "
		      +packet.getAddress().getHostName()
		      +":"+packet.getPort()+" port "+port+" version "+version);
      try {
        Socket socket = new Socket( remotehost, port );
	bus.addClient(socket,false);
      } catch ( UnknownHostException e ) {
        System.err.println("Unkonwn host "+remotehost + e.getMessage());
      } catch ( IOException e) {
        System.err.println("can't connect to "+remotehost+" port "+
	  port+e.getMessage());
      }
    } catch ( IOException e ) {
      watcherrunning=false;
      traceDebug("broadcast listener crashed "  +  e.getMessage());
    }
    broadcast.close();
    traceDebug("broadcast listener normal shutdown");
  }
  
  /**
   * stops the thread waiting on the broadcast socket
   */
  void stop() { watcherrunning=false; }

  private void sendBroadcast(String data, String net) throws IvyException {
    try {
      // simple trick to expand to 255 (Alex Bustico)
      net += ".255.255.255";
      RE exp = new RE( "^(\\d+\\.\\d+\\.\\d+\\.\\d+).*");
      net = exp.substitute( net , "$1" );
    } catch ( REException e ){
      throw new IvyException("Bad broascat addr");
    }
    try {
      InetAddress iaddr = InetAddress.getByName(net);
      domainaddrList.addElement(iaddr);
      DatagramPacket packet = new DatagramPacket(
		      data.getBytes(),
		      data.length(),
		      iaddr,
		      broadcast.getLocalPort() );
      broadcast.send(packet);
    } catch ( UnknownHostException e ) {
      throw new IvyException("Broadcast sent on unknown network "+
        e.getMessage());
    } catch ( IOException e ) {
      throw new IvyException("Broadcast error " + e.getMessage() );
    }
  }

  void start(String domain) throws IvyException {
    String domainaddr;
    // parse Domain to get port number
    int port;
    int sep_index = domain.lastIndexOf( ":" );
    if ( sep_index == -1 ) {
      port = bus.DEFAULT_PORT;
      domainaddr = domain;
    } else { 
      port = Integer.parseInt( domain.substring( sep_index +1 ));
      domainaddr = domain.substring(0,sep_index);
    }
    // Handling of multicast address
    try {
      InetAddress group = InetAddress.getByName(domainaddr);
      if (group.isMulticastAddress()) {
        isMulticastAddress = true;
        broadcast = new MulticastSocket(port ); // create the UDP socket
	((MulticastSocket)broadcast).joinGroup(group);
      } else {
        broadcast = new MulticastSocket(port ); // create the UDP socket
      }
    } catch ( IOException e ) {
      throw new IvyException("IvyWatcher I/O error" + e );
    } 
    // starts a Thread listening on the socket
    watcherrunning=true;
    broadcastListener = new Thread(this);
    broadcastListener.start();
    // notifies our arrival on each domain: protocol version + port
    String hello = bus.PROCOCOLVERSION + " " + bus.applicationPort + "\n";
    StringTokenizer st = new StringTokenizer(domainaddr," \t:,");
    while  ( st.hasMoreTokens()) { sendBroadcast( hello, st.nextToken() ); }
  }

  private boolean isInDomain( InetAddress host ){
    byte rem_addr[] = host.getAddress();
    for ( int i = 0 ; i < domainaddrList.size(); i++ ) {
      byte addr[] = ((InetAddress)domainaddrList.elementAt(i)).getAddress();
      int j ;
      for (  j = 0 ; j < 4 ; j++  )
        if ( (addr[j] != -1) && (addr[j] != rem_addr[j]) ) break;
      if ( j == 4 ) {
        traceDebug( "host " + host + " is in domain\n" );
	return true;
      }
    }
    traceDebug( "host " + host + " Not in domain\n" );
    return false;
  }

  private void traceDebug(String s){
    if (debug) System.out.println("-->ivywatcher<-- "+s);
  }
  

} // class IvyWatcher
/* EOF */