aboutsummaryrefslogtreecommitdiff
path: root/src/IvyWatcher.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/IvyWatcher.java')
-rwxr-xr-xsrc/IvyWatcher.java203
1 files changed, 91 insertions, 112 deletions
diff --git a/src/IvyWatcher.java b/src/IvyWatcher.java
index 14fee3d..004c704 100755
--- a/src/IvyWatcher.java
+++ b/src/IvyWatcher.java
@@ -22,6 +22,9 @@ import java.util.Enumeration;
* thing.
*
* CHANGELOG:
+ * 1.0.12:
+ * - setSoTimeout on socket
+ * - the broadcast reader Thread goes volatile
* 1.0.10:
* - isInDomain() is wrong in multicast. I've removed it
* - there was a remanence effect in the datagrampacket buffer. I clean it up after each message
@@ -33,146 +36,122 @@ import java.util.Enumeration;
class IvyWatcher implements Runnable {
private static boolean debug = (System.getProperty("IVY_DEBUG")!=null);
- //private Vector domainaddrList;
- private boolean watcherrunning = false;
private boolean isMulticastAddress = false;
- private Vector broadcastListener ;
private Ivy bus; /* master bus controler */
private DatagramSocket broadcast; /* supervision socket */
- // it can also be a MulticastSocket, which inherits from the previous
+ private String domainaddr;
+ private int port;
+ private volatile Thread listenThread;
+ private InetAddress group;
+
/**
- * creates an Ivy watcher.
+ * creates an Ivy watcher
* @param bus the bus
+ * @param net the domain
*/
- IvyWatcher(Ivy bus) throws IvyException {
+ IvyWatcher(Ivy bus,String domainaddr,int port) throws IvyException {
this.bus = bus;
- //domainaddrList = new Vector();
+ this.domainaddr=domainaddr;
+ this.port=port;
+ listenThread = new Thread(this);
+ // create the MulticastSocket
+ try {
+ group = InetAddress.getByName(domainaddr);
+ broadcast = new MulticastSocket(port);
+ if (group.isMulticastAddress()) {
+ isMulticastAddress = true;
+ ((MulticastSocket)broadcast).joinGroup(group);
+ }
+ broadcast.setSoTimeout(Ivy.TIMEOUTLENGTH);
+ } catch ( IOException e ) {
+ throw new IvyException("IvyWatcher I/O error" + e );
+ }
}
/**
- * the behaviour of the thread watching the UDP socket.
- * this thread will stop either when the bus stops or when the
- * watcherrunning will be set to false
- *
- * TODO: better handling of exceptions, because we juste System.err.println
- * here, run cannot throw IvyException ...
+ * the behaviour of each thread watching the UDP socket.
*/
- public void run() {
+ public void run() {
+ Thread thisThread=Thread.currentThread();
+ traceDebug("beginning of a watcher Thread");
byte buf[] = new byte[256];
DatagramPacket packet=new DatagramPacket(buf, 256);
- int port;
- traceDebug("IvyWatcher waiting for Broadcast");
- while( watcherrunning && bus.ivyRunning ) try {
- broadcast.receive(packet);
- String msg = new String(packet.getData()) ;
- // clean up the buffer after each message
- for (int i=0;i<buf.length;i++) { buf[i]=0; }
- InetAddress remotehost = packet.getAddress();
- traceDebug("BUSWATCHER Receive Broadcast from "+
- remotehost.getHostName()+":"+packet.getPort());
- // we used to check if remoteaddr is in our broadcast domain list otherwise we
- // ignore the broadcast
- // if ( !isInDomain( remotehost ) ) continue;
- StringTokenizer st = new StringTokenizer(msg);
- if ( !st.hasMoreTokens()) {
- System.err.println("Bad format "+msg);
- continue;
- }
- int version = Integer.parseInt( st.nextToken() );
- if ( version != bus.PROCOCOLVERSION ) {
- System.err.println("Ignoring bad protocol version broadcast");
- continue;
- }
- if ( ! st.hasMoreTokens()) {
- System.err.println("Bad format "+msg);
- continue;
- }
- port = Integer.parseInt( st.nextToken() );
- if ( (bus.applicationPort == port) ) continue;
- traceDebug("BUSWATCHER Broadcast de "
- +packet.getAddress().getHostName()
- +":"+packet.getPort()+" port "+port+" version "+version);
+ try {
+ while( listenThread==thisThread ) {
+ int port;
try {
- Socket socket = new Socket( remotehost, port );
- bus.addClient(socket,false);
- } catch ( UnknownHostException e ) {
- System.err.println("Unkonwn host "+remotehost + e.getMessage());
- } catch ( IOException e) {
- System.err.println("can't connect to "+remotehost+" port "+
- port+e.getMessage());
+ broadcast.receive(packet);
+ String msg = new String(packet.getData()) ;
+ for (int i=0;i<buf.length;i++) { buf[i]=0; } // clean up the buffer after each message
+ InetAddress remotehost = packet.getAddress();
+ traceDebug("BUSWATCHER Receive Broadcast from "+ remotehost.getHostName()+":"+packet.getPort());
+ // TODO if ( !isInDomain( remotehost ) ) continue;
+ // TODO get rid of the StringTokenizer ?
+ StringTokenizer st = new StringTokenizer(msg);
+ if ( !st.hasMoreTokens()) {
+ System.err.println("Bad format "+msg);
+ continue;
+ }
+ int version = Integer.parseInt( st.nextToken() );
+ if ( version != bus.PROCOCOLVERSION ) {
+ System.err.println("Ignoring bad protocol version broadcast");
+ continue;
+ }
+ if ( ! st.hasMoreTokens()) {
+ System.err.println("Bad format "+msg);
+ continue;
+ }
+ port = Integer.parseInt( st.nextToken() );
+ if ( (bus.applicationPort == port) ) continue;
+ traceDebug("BUSWATCHER Broadcast de "
+ +packet.getAddress().getHostName()
+ +":"+packet.getPort()+" port "+port+" version "+version);
+ try {
+ Socket socket = new Socket( remotehost, port );
+ bus.addClient(socket,false);
+ } catch ( UnknownHostException e ) {
+ System.err.println("Unkonwn host "+remotehost + e.getMessage());
+ } catch ( IOException e) {
+ System.err.println("can't connect to "+remotehost+" port "+
+ port+e.getMessage());
+ }
+ } catch (InterruptedIOException jii ){
+ if (thisThread!=listenThread) { break ;}
}
- } catch (java.io.InterruptedIOException jii ){
- if (!watcherrunning) break;
- System.out.println("DEBUG IvyClient: I have been interrupted");
- } catch (java.io.IOException ioe ){
- System.err.println("IvyWatcher IOException "+ ioe.getMessage() );
+ } // while
+ } catch (java.net.SocketException se ){
+ if (thisThread==listenThread) { se.printStackTrace(); }
+ } catch (IOException ioe ){
+ ioe.printStackTrace();
}
- stop();
- } // while
-
+ traceDebug("end of a watcher thread");
+ }
+
/**
* stops the thread waiting on the broadcast socket
*/
void stop() {
- traceDebug("broadcast listener normal shutdown");
- watcherrunning=false;
- for (Enumeration e = broadcastListener.elements();e.hasMoreElements();) {
- Thread t = (Thread) e.nextElement();
- t.interrupt();
- }
+ traceDebug("begining stopping an IvyWatcher");
+ Thread t = listenThread;
+ listenThread=null;
+ broadcast.close();
+ t.interrupt();
+ traceDebug("ending stopping an IvyWatcher");
}
- private static void sendBroadcast(String data, String domain, int port) throws IvyException {
- MulticastSocket send ;
+ private void sendBroadcast(String data, String domain, int port) throws IvyException {
try {
- InetAddress group = InetAddress.getByName(domain);
- send = new MulticastSocket(port);
- if (group.isMulticastAddress()) { ((MulticastSocket)send).joinGroup(group); }
DatagramPacket packet = new DatagramPacket( data.getBytes(), data.length(), group, port );
- send.send(packet);
- } catch ( UnknownHostException e ) {
- throw new IvyException("Broadcast sent on unknown network "+
- e.getMessage());
+ broadcast.send(packet);
} catch ( IOException e ) {
throw new IvyException("Broadcast error " + e.getMessage() );
}
}
- void start(String net) throws IvyException {
+ void start() throws IvyException {
String hello = bus.PROCOCOLVERSION + " " + bus.applicationPort + "\n";
- StringTokenizer st = new StringTokenizer(net,",");
- broadcastListener = new Vector();
- while ( st.hasMoreTokens()) {
- String s = st.nextToken() ;
- String domainaddr=getDomain(s);
- int port=getPort(s);
- // System.out.println("Domaine: "+domainaddr+" : "+port);
- try {
- InetAddress group = InetAddress.getByName(domainaddr);
- broadcast = new MulticastSocket(port);
- // Handling of multicast address
- if (group.isMulticastAddress()) {
- isMulticastAddress = true;
- ((MulticastSocket)broadcast).joinGroup(group);
- }
- } catch ( IOException e ) {
- throw new IvyException("IvyWatcher I/O error" + e );
- }
- /*
- try {
- broadcast.setSoTimeout(100);
- } catch ( java.net.SocketException jns ) {
- throw new IvyException("IvyWatcher setSoTimeout error" + jns.getMessage() );
- }
- */
- // starts a Thread listening on the socket
- watcherrunning=true;
- Thread t = new Thread(this);
- broadcastListener.addElement(t);
- t.start();
- // notifies our arrival on each domain: protocol version + port
- sendBroadcast(hello,domainaddr,port);
- }
+ listenThread.start();
+ sendBroadcast(hello,domainaddr,port); // notifies our arrival on each domain: protocol version + port
}
/*
@@ -195,7 +174,7 @@ class IvyWatcher implements Runnable {
}
*/
- private static String getDomain(String net) {
+ static String getDomain(String net) {
int sep_index = net.lastIndexOf( ":" );
if ( sep_index != -1 ) { net = net.substring(0,sep_index); }
try {
@@ -210,7 +189,7 @@ class IvyWatcher implements Runnable {
return net;
}
- private static int getPort(String net) {
+ static int getPort(String net) {
int sep_index = net.lastIndexOf( ":" );
int port= ( sep_index == -1 ) ? Ivy.DEFAULT_PORT :Integer.parseInt( net.substring( sep_index +1 ));
// System.out.println("net: ["+net+"]\nsep_index: "+sep_index+"\nport: "+port);