aboutsummaryrefslogtreecommitdiff
path: root/src/IvyWatcher.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/IvyWatcher.java')
-rwxr-xr-xsrc/IvyWatcher.java231
1 files changed, 125 insertions, 106 deletions
diff --git a/src/IvyWatcher.java b/src/IvyWatcher.java
index 1dcaab0..c9a661f 100755
--- a/src/IvyWatcher.java
+++ b/src/IvyWatcher.java
@@ -91,13 +91,15 @@ import java.util.Map;
import java.util.HashMap;
import java.util.regex.*;
-class IvyWatcher extends Thread {
+class IvyWatcher implements Runnable {
private static boolean debug = (System.getProperty("IVY_DEBUG")!=null);
private boolean alreadyIgnored = false;
private Ivy bus; /* master bus controler */
private DatagramSocket broadcast; /* supervision socket */
private int port;
- private volatile Thread listenThread = null;
+ private String domainaddr;
+ private volatile boolean keeprunning = false;
+ private Thread listenThread = null;
private InetAddress group;
// FIXME should not be static ? (findbugs)
private static int serial=0;
@@ -113,11 +115,10 @@ class IvyWatcher extends Thread {
*/
IvyWatcher(Ivy bus,String domainaddr,int port) throws IvyException {
this.bus = bus;
- this.port=port;
+ this.port = port;
+ this.domainaddr = domainaddr;
busWatcherId=bus.getWatcherId();
- listenThread = new Thread(this);
- listenThread.setName("Ivy Watcher thread for "+domainaddr+":"+port);
- listenThread.setDaemon(true);
+ keeprunning = true ;
// create the MulticastSocket
try {
group = InetAddress.getByName(domainaddr);
@@ -135,98 +136,30 @@ class IvyWatcher extends Thread {
*/
public void run() {
traceDebug("Thread started"); // THREADDEBUG
- Thread thisThread=Thread.currentThread();
+ listenThread=Thread.currentThread();
+ listenThread.setName("Ivy Watcher thread for "+domainaddr+":"+port);
+ // listenThread.setDaemon(true); // not possible in the treadpool ? FIXME
traceDebug("beginning of a watcher Thread");
- InetAddress remotehost=null;
try {
- int remotePort=0;
- while( listenThread==thisThread ) {
+ while( keeprunning ) {
try {
byte buf[] = new byte[256];
DatagramPacket packet=new DatagramPacket(buf,buf.length);
broadcast.receive(packet);
- bus.setStarting(true); // someone's coming
- if (listenThread!=thisThread) break; // I was summoned to leave during the receive
+ bus.pushThread("UDP packet received");
+ if ( !keeprunning ) break; // I was summoned to leave during the receive
String msg = new String(buf,0,packet.getLength());
- String remotehostname=null;
- try {
- remotehost = packet.getAddress();
- remotehostname = remotehost.getHostName();
- Matcher m = recoucou.matcher(msg);
- if (!m.matches()) {
- System.err.println("Ignoring bad format broadcast from "+ remotehostname+":"+packet.getPort());
- continue;
- }
- int version = Integer.parseInt(m.group(1));
- if ( version < Protocol.PROTOCOLMINIMUM ) {
- System.err.println("Ignoring bad format broadcast from "+
- remotehostname+":"+packet.getPort()
- +" protocol version "+remotehost+" we need "+Protocol.PROTOCOLMINIMUM+" minimum");
- continue;
- }
- remotePort = Integer.parseInt(m.group(2));
- if (bus.getAppPort()==remotePort) { // if (same port number)
- if (busWatcherId!=null) {
- traceDebug("there's an appId: "+m.group(3));
- String otherId=m.group(3);
- String otherName=m.group(4);
- if (busWatcherId.compareTo(otherId)==0) {
- // same port #, same bus Id, It's me, I'm outta here
- traceDebug("ignoring my own broadcast");
- bus.setStarting(false);
- continue;
- } else {
- // same port #, different bus Id, it's another agent
- // implementing the Oh Soooo Cool watcherId undocumented
- // unprotocolar Ivy add on
- traceDebug("accepting a broadcast from a same port by "+otherName);
- }
- } else {
- // there's no watcherId in the broacast. I fall back to a
- // crude strategy: I ignore the first broadcast with the same
- // port number, and accept the following ones
- if (alreadyIgnored) {
- traceDebug("received another broadcast from "+ remotehostname+":"+packet.getPort()
- +" on my port number ("+remotePort+"), it's probably someone else");
- } else {
- alreadyIgnored=true;
- traceDebug("ignoring a broadcast from "+ remotehostname+":"+packet.getPort()
- +" on my port number ("+remotePort+"), it's probably me");
- continue;
- }
- }
- } // end if (same port #)
- traceDebug("broadcast accepted from " +remotehostname
- +":"+packet.getPort()+", port:"+remotePort+", protocol version:"+version);
- if (!alreadyBroadcasted(remotehost.toString(),remotePort)) {
- traceDebug("no known agent originating from " + remotehost + ":" + remotePort);
- try {
- Socket s = new Socket(remotehost,remotePort);
- s.setReceiveBufferSize(bus.getBufferSize());
- //System.out.println("MY DEBUG - buffer size="+s.getReceiveBufferSize());
- s.setTcpNoDelay(true);
- bus.createIvyClient(s,remotePort,false);
- } catch ( java.net.ConnectException jnc ) {
- traceDebug("cannot connect to "+remotehostname+":"+remotePort+", he probably stopped his bus");
- }
- } else {
- traceDebug("there is already a request originating from " + remotehost + ":" + remotePort);
- }
- } catch (NumberFormatException nfe) {
- System.err.println("Ignoring bad format broadcast from "+remotehostname);
- continue;
- } catch ( UnknownHostException e ) {
- System.err.println("Unkonwn host "+remotehost +","+e.getMessage());
- } catch ( IOException e) {
- System.err.println("can't connect to "+remotehost+" port "+ remotePort+e.getMessage());
- e.printStackTrace();
- }
+ boolean b = (parsePacket(packet, msg));
+ bus.popThread("UDP packet processed");
+ if (b) continue; else break;
} catch (InterruptedIOException jii ){
- if (thisThread!=listenThread) { break ;}
+ // System.out.println("WTF UDP packet interrupted");
+ // another thread took place, not important
+ if ( !keeprunning ) { break ;}
}
} // while
} catch (java.net.SocketException se ){
- if (thisThread==listenThread) {
+ if ( keeprunning ) {
traceDebug("socket exception, continuing anyway on other Ivy domains "+se);
}
} catch (IOException ioe ){
@@ -235,9 +168,99 @@ class IvyWatcher extends Thread {
traceDebug("Thread stopped"); // THREADDEBUG
}
- @Override public void interrupt(){
- super.interrupt();
- broadcast.close();
+ /**
+ * parses the content of a received packet.
+ * @return true if the watcher can keep looping (continue), false if there's
+ * a big problem (break).
+ *
+ * first checks if the message structure is correct
+ * then checks if it's our own broadcasts
+ * then checks if there's already a concurrent connexion in progress
+ * if it's ok, creates a new IvyClient
+ *
+ */
+ private boolean parsePacket(DatagramPacket packet, String msg) {
+ // System.out.println("TODO Y - parse In");
+ int remotePort=0;
+ InetAddress remotehost = null;
+ String remotehostname = null;
+ try {
+ remotehost=packet.getAddress();
+ remotehostname=remotehost.getHostName();
+ Matcher m = recoucou.matcher(msg);
+ // is it a correct broadcast packet ?
+ if (!m.matches()) {
+ System.err.println("Ignoring bad format broadcast from "+ remotehostname+":"+packet.getPort());
+ return true;
+ }
+ // is it the correct protocol version ?
+ int version = Integer.parseInt(m.group(1));
+ if ( version < Protocol.PROTOCOLMINIMUM ) {
+ System.err.println("Ignoring bad format broadcast from "+
+ remotehostname+":"+packet.getPort()
+ +" protocol version "+remotehost+" we need "+Protocol.PROTOCOLMINIMUM+" minimum");
+ return true;
+ }
+ // is it my own broadcast ?
+ remotePort = Integer.parseInt(m.group(2));
+ if (bus.getAppPort()==remotePort) { // if (same port number)
+ if (busWatcherId!=null) {
+ traceDebug("there's an appId: "+m.group(3));
+ String otherId=m.group(3);
+ String otherName=m.group(4);
+ if (busWatcherId.compareTo(otherId)==0) {
+ // same port #, same bus Id, It's me, I'm outta here
+ traceDebug("ignoring my own broadcast");
+ return true;
+ } else {
+ // same port #, different bus Id, it's another agent
+ // implementing the Oh Soooo Cool watcherId undocumented
+ // unprotocolar Ivy add on
+ traceDebug("accepting a broadcast from a same port by "+otherName);
+ }
+ } else {
+ // there's no watcherId in the broacast. I fall back to a
+ // crude strategy: I ignore the first broadcast with the same
+ // port number, and accept the following ones
+ if (alreadyIgnored) {
+ traceDebug("received another broadcast from "+ remotehostname+":"+packet.getPort()
+ +" on my port number ("+remotePort+"), it's probably someone else");
+ } else {
+ alreadyIgnored=true;
+ traceDebug("ignoring a broadcast from "+ remotehostname+":"+packet.getPort()
+ +" on my port number ("+remotePort+"), it's probably me");
+ return true;
+ }
+ }
+ } // end if (same port #)
+
+ // it's definitively not me, let's shake hands !
+ traceDebug("broadcast accepted from " +remotehostname
+ +":"+packet.getPort()+", port:"+remotePort+", protocol version:"+version);
+
+ if (!alreadyBroadcasted(remotehost.toString(),remotePort)) {
+ traceDebug("no known agent originating from " + remotehost + ":" + remotePort);
+ try {
+ Socket s = new Socket(remotehost,remotePort);
+ s.setReceiveBufferSize(bus.getBufferSize());
+ s.setTcpNoDelay(true);
+ if (!bus.createIvyClient(s,remotePort,false)) return false ;
+ } catch ( java.net.ConnectException jnc ) {
+ traceDebug("cannot connect to "+remotehostname+":"+remotePort+", he probably stopped his bus");
+ }
+ } else {
+ traceDebug("there is already a request originating from " + remotehost + ":" + remotePort);
+ }
+ } catch (NumberFormatException nfe) {
+ System.err.println("Ignoring bad format broadcast from "+remotehostname);
+ return true;
+ } catch ( UnknownHostException e ) {
+ System.err.println("Unkonwn host "+remotehost +","+e.getMessage());
+ } catch ( IOException e) {
+ System.err.println("can't connect to "+remotehost+" port "+ remotePort+e.getMessage());
+ e.printStackTrace();
+ }
+ return true;
}
/**
@@ -245,13 +268,9 @@ class IvyWatcher extends Thread {
*/
synchronized void doStop() {
traceDebug("begining stopping");
- bus.setStarting(true);
- Thread t = listenThread;
- listenThread=null;
- interrupt();
+ keeprunning = false;
+ if (listenThread != null) listenThread.interrupt();
broadcast.close();
- if (t!=null) { t.interrupt(); } // it might not even have been created
- bus.setStarting(false);
traceDebug("stopped");
}
@@ -261,16 +280,18 @@ class IvyWatcher extends Thread {
DatagramPacket packet;
String data;
Ivy bus;
+
public PacketSender(String data, Ivy b) {
this.data=data;
bus = b;
packet=new DatagramPacket(data.getBytes(),data.length(),group,port);
- Thread t = new Thread((PacketSender.this));
- t.setName("Ivy Packet sender");
- t.start();
+ bus.getPool().execute( PacketSender.this );
}
+
public void run() {
+ bus.pushThread("packet sender started");
traceDebug("PacketSender thread started"); // THREADDEBUG
+ Thread.currentThread().setName("Ivy Packet sender");
try {
broadcast.send(packet);
} catch (InterruptedIOException e) {
@@ -279,15 +300,13 @@ class IvyWatcher extends Thread {
e.printStackTrace();
traceDebug("IO interrupted during the broadcast. Do nothing");
} catch ( IOException e ) {
- if (listenThread!=null) {
- System.out.println("Broadcast Error " + e.getMessage()+" continuing anyway");
- // cannot throw new IvyException in a run ...
- e.printStackTrace();
- }
+ System.out.println("Broadcast Error " + e.getMessage()+" continuing anyway");
+ // cannot throw new IvyException in a run ...
+ e.printStackTrace();
}
try { Thread.sleep(100); } catch (InterruptedException ie ){ }
- bus.setStarting(false); // one of the senders has finished its work, plus extra time
traceDebug("PacketSender thread stopped"); // THREADDEBUG
+ bus.popThread("packet sender finished"); // one of the senders has finished its work, plus extra time
}
}
@@ -295,8 +314,8 @@ class IvyWatcher extends Thread {
// String hello = Ivy.PROTOCOLVERSION + " " + bus.getAppPort() + "\n";
String hello = Protocol.PROTOCOLVERSION + " " + bus.getAppPort() + " "+busWatcherId+" "+bus.getSelfIvyClient().getApplicationName()+"\n";
if (broadcast==null) throw new IvyException("IvyWatcher PacketSender null broadcast address");
+ bus.getPool().execute(this);
new PacketSender(hello,bus); // notifies our arrival on each domain: protocol version + port
- listenThread.start();
}
/*