diff options
Diffstat (limited to 'src/IvyWatcher.java')
-rwxr-xr-x | src/IvyWatcher.java | 231 |
1 files changed, 125 insertions, 106 deletions
diff --git a/src/IvyWatcher.java b/src/IvyWatcher.java index 1dcaab0..c9a661f 100755 --- a/src/IvyWatcher.java +++ b/src/IvyWatcher.java @@ -91,13 +91,15 @@ import java.util.Map; import java.util.HashMap; import java.util.regex.*; -class IvyWatcher extends Thread { +class IvyWatcher implements Runnable { private static boolean debug = (System.getProperty("IVY_DEBUG")!=null); private boolean alreadyIgnored = false; private Ivy bus; /* master bus controler */ private DatagramSocket broadcast; /* supervision socket */ private int port; - private volatile Thread listenThread = null; + private String domainaddr; + private volatile boolean keeprunning = false; + private Thread listenThread = null; private InetAddress group; // FIXME should not be static ? (findbugs) private static int serial=0; @@ -113,11 +115,10 @@ class IvyWatcher extends Thread { */ IvyWatcher(Ivy bus,String domainaddr,int port) throws IvyException { this.bus = bus; - this.port=port; + this.port = port; + this.domainaddr = domainaddr; busWatcherId=bus.getWatcherId(); - listenThread = new Thread(this); - listenThread.setName("Ivy Watcher thread for "+domainaddr+":"+port); - listenThread.setDaemon(true); + keeprunning = true ; // create the MulticastSocket try { group = InetAddress.getByName(domainaddr); @@ -135,98 +136,30 @@ class IvyWatcher extends Thread { */ public void run() { traceDebug("Thread started"); // THREADDEBUG - Thread thisThread=Thread.currentThread(); + listenThread=Thread.currentThread(); + listenThread.setName("Ivy Watcher thread for "+domainaddr+":"+port); + // listenThread.setDaemon(true); // not possible in the treadpool ? FIXME traceDebug("beginning of a watcher Thread"); - InetAddress remotehost=null; try { - int remotePort=0; - while( listenThread==thisThread ) { + while( keeprunning ) { try { byte buf[] = new byte[256]; DatagramPacket packet=new DatagramPacket(buf,buf.length); broadcast.receive(packet); - bus.setStarting(true); // someone's coming - if (listenThread!=thisThread) break; // I was summoned to leave during the receive + bus.pushThread("UDP packet received"); + if ( !keeprunning ) break; // I was summoned to leave during the receive String msg = new String(buf,0,packet.getLength()); - String remotehostname=null; - try { - remotehost = packet.getAddress(); - remotehostname = remotehost.getHostName(); - Matcher m = recoucou.matcher(msg); - if (!m.matches()) { - System.err.println("Ignoring bad format broadcast from "+ remotehostname+":"+packet.getPort()); - continue; - } - int version = Integer.parseInt(m.group(1)); - if ( version < Protocol.PROTOCOLMINIMUM ) { - System.err.println("Ignoring bad format broadcast from "+ - remotehostname+":"+packet.getPort() - +" protocol version "+remotehost+" we need "+Protocol.PROTOCOLMINIMUM+" minimum"); - continue; - } - remotePort = Integer.parseInt(m.group(2)); - if (bus.getAppPort()==remotePort) { // if (same port number) - if (busWatcherId!=null) { - traceDebug("there's an appId: "+m.group(3)); - String otherId=m.group(3); - String otherName=m.group(4); - if (busWatcherId.compareTo(otherId)==0) { - // same port #, same bus Id, It's me, I'm outta here - traceDebug("ignoring my own broadcast"); - bus.setStarting(false); - continue; - } else { - // same port #, different bus Id, it's another agent - // implementing the Oh Soooo Cool watcherId undocumented - // unprotocolar Ivy add on - traceDebug("accepting a broadcast from a same port by "+otherName); - } - } else { - // there's no watcherId in the broacast. I fall back to a - // crude strategy: I ignore the first broadcast with the same - // port number, and accept the following ones - if (alreadyIgnored) { - traceDebug("received another broadcast from "+ remotehostname+":"+packet.getPort() - +" on my port number ("+remotePort+"), it's probably someone else"); - } else { - alreadyIgnored=true; - traceDebug("ignoring a broadcast from "+ remotehostname+":"+packet.getPort() - +" on my port number ("+remotePort+"), it's probably me"); - continue; - } - } - } // end if (same port #) - traceDebug("broadcast accepted from " +remotehostname - +":"+packet.getPort()+", port:"+remotePort+", protocol version:"+version); - if (!alreadyBroadcasted(remotehost.toString(),remotePort)) { - traceDebug("no known agent originating from " + remotehost + ":" + remotePort); - try { - Socket s = new Socket(remotehost,remotePort); - s.setReceiveBufferSize(bus.getBufferSize()); - //System.out.println("MY DEBUG - buffer size="+s.getReceiveBufferSize()); - s.setTcpNoDelay(true); - bus.createIvyClient(s,remotePort,false); - } catch ( java.net.ConnectException jnc ) { - traceDebug("cannot connect to "+remotehostname+":"+remotePort+", he probably stopped his bus"); - } - } else { - traceDebug("there is already a request originating from " + remotehost + ":" + remotePort); - } - } catch (NumberFormatException nfe) { - System.err.println("Ignoring bad format broadcast from "+remotehostname); - continue; - } catch ( UnknownHostException e ) { - System.err.println("Unkonwn host "+remotehost +","+e.getMessage()); - } catch ( IOException e) { - System.err.println("can't connect to "+remotehost+" port "+ remotePort+e.getMessage()); - e.printStackTrace(); - } + boolean b = (parsePacket(packet, msg)); + bus.popThread("UDP packet processed"); + if (b) continue; else break; } catch (InterruptedIOException jii ){ - if (thisThread!=listenThread) { break ;} + // System.out.println("WTF UDP packet interrupted"); + // another thread took place, not important + if ( !keeprunning ) { break ;} } } // while } catch (java.net.SocketException se ){ - if (thisThread==listenThread) { + if ( keeprunning ) { traceDebug("socket exception, continuing anyway on other Ivy domains "+se); } } catch (IOException ioe ){ @@ -235,9 +168,99 @@ class IvyWatcher extends Thread { traceDebug("Thread stopped"); // THREADDEBUG } - @Override public void interrupt(){ - super.interrupt(); - broadcast.close(); + /** + * parses the content of a received packet. + * @return true if the watcher can keep looping (continue), false if there's + * a big problem (break). + * + * first checks if the message structure is correct + * then checks if it's our own broadcasts + * then checks if there's already a concurrent connexion in progress + * if it's ok, creates a new IvyClient + * + */ + private boolean parsePacket(DatagramPacket packet, String msg) { + // System.out.println("TODO Y - parse In"); + int remotePort=0; + InetAddress remotehost = null; + String remotehostname = null; + try { + remotehost=packet.getAddress(); + remotehostname=remotehost.getHostName(); + Matcher m = recoucou.matcher(msg); + // is it a correct broadcast packet ? + if (!m.matches()) { + System.err.println("Ignoring bad format broadcast from "+ remotehostname+":"+packet.getPort()); + return true; + } + // is it the correct protocol version ? + int version = Integer.parseInt(m.group(1)); + if ( version < Protocol.PROTOCOLMINIMUM ) { + System.err.println("Ignoring bad format broadcast from "+ + remotehostname+":"+packet.getPort() + +" protocol version "+remotehost+" we need "+Protocol.PROTOCOLMINIMUM+" minimum"); + return true; + } + // is it my own broadcast ? + remotePort = Integer.parseInt(m.group(2)); + if (bus.getAppPort()==remotePort) { // if (same port number) + if (busWatcherId!=null) { + traceDebug("there's an appId: "+m.group(3)); + String otherId=m.group(3); + String otherName=m.group(4); + if (busWatcherId.compareTo(otherId)==0) { + // same port #, same bus Id, It's me, I'm outta here + traceDebug("ignoring my own broadcast"); + return true; + } else { + // same port #, different bus Id, it's another agent + // implementing the Oh Soooo Cool watcherId undocumented + // unprotocolar Ivy add on + traceDebug("accepting a broadcast from a same port by "+otherName); + } + } else { + // there's no watcherId in the broacast. I fall back to a + // crude strategy: I ignore the first broadcast with the same + // port number, and accept the following ones + if (alreadyIgnored) { + traceDebug("received another broadcast from "+ remotehostname+":"+packet.getPort() + +" on my port number ("+remotePort+"), it's probably someone else"); + } else { + alreadyIgnored=true; + traceDebug("ignoring a broadcast from "+ remotehostname+":"+packet.getPort() + +" on my port number ("+remotePort+"), it's probably me"); + return true; + } + } + } // end if (same port #) + + // it's definitively not me, let's shake hands ! + traceDebug("broadcast accepted from " +remotehostname + +":"+packet.getPort()+", port:"+remotePort+", protocol version:"+version); + + if (!alreadyBroadcasted(remotehost.toString(),remotePort)) { + traceDebug("no known agent originating from " + remotehost + ":" + remotePort); + try { + Socket s = new Socket(remotehost,remotePort); + s.setReceiveBufferSize(bus.getBufferSize()); + s.setTcpNoDelay(true); + if (!bus.createIvyClient(s,remotePort,false)) return false ; + } catch ( java.net.ConnectException jnc ) { + traceDebug("cannot connect to "+remotehostname+":"+remotePort+", he probably stopped his bus"); + } + } else { + traceDebug("there is already a request originating from " + remotehost + ":" + remotePort); + } + } catch (NumberFormatException nfe) { + System.err.println("Ignoring bad format broadcast from "+remotehostname); + return true; + } catch ( UnknownHostException e ) { + System.err.println("Unkonwn host "+remotehost +","+e.getMessage()); + } catch ( IOException e) { + System.err.println("can't connect to "+remotehost+" port "+ remotePort+e.getMessage()); + e.printStackTrace(); + } + return true; } /** @@ -245,13 +268,9 @@ class IvyWatcher extends Thread { */ synchronized void doStop() { traceDebug("begining stopping"); - bus.setStarting(true); - Thread t = listenThread; - listenThread=null; - interrupt(); + keeprunning = false; + if (listenThread != null) listenThread.interrupt(); broadcast.close(); - if (t!=null) { t.interrupt(); } // it might not even have been created - bus.setStarting(false); traceDebug("stopped"); } @@ -261,16 +280,18 @@ class IvyWatcher extends Thread { DatagramPacket packet; String data; Ivy bus; + public PacketSender(String data, Ivy b) { this.data=data; bus = b; packet=new DatagramPacket(data.getBytes(),data.length(),group,port); - Thread t = new Thread((PacketSender.this)); - t.setName("Ivy Packet sender"); - t.start(); + bus.getPool().execute( PacketSender.this ); } + public void run() { + bus.pushThread("packet sender started"); traceDebug("PacketSender thread started"); // THREADDEBUG + Thread.currentThread().setName("Ivy Packet sender"); try { broadcast.send(packet); } catch (InterruptedIOException e) { @@ -279,15 +300,13 @@ class IvyWatcher extends Thread { e.printStackTrace(); traceDebug("IO interrupted during the broadcast. Do nothing"); } catch ( IOException e ) { - if (listenThread!=null) { - System.out.println("Broadcast Error " + e.getMessage()+" continuing anyway"); - // cannot throw new IvyException in a run ... - e.printStackTrace(); - } + System.out.println("Broadcast Error " + e.getMessage()+" continuing anyway"); + // cannot throw new IvyException in a run ... + e.printStackTrace(); } try { Thread.sleep(100); } catch (InterruptedException ie ){ } - bus.setStarting(false); // one of the senders has finished its work, plus extra time traceDebug("PacketSender thread stopped"); // THREADDEBUG + bus.popThread("packet sender finished"); // one of the senders has finished its work, plus extra time } } @@ -295,8 +314,8 @@ class IvyWatcher extends Thread { // String hello = Ivy.PROTOCOLVERSION + " " + bus.getAppPort() + "\n"; String hello = Protocol.PROTOCOLVERSION + " " + bus.getAppPort() + " "+busWatcherId+" "+bus.getSelfIvyClient().getApplicationName()+"\n"; if (broadcast==null) throw new IvyException("IvyWatcher PacketSender null broadcast address"); + bus.getPool().execute(this); new PacketSender(hello,bus); // notifies our arrival on each domain: protocol version + port - listenThread.start(); } /* |