/** * IvyWatcher, A private Class for the Ivy rendezvous * * @author Yannick Jestin * @author Francois-Regis Colin * @author http://www.tls.cena.fr/products/ivy/ * * (C) CENA * * right now, the rendez vous is either an UDP socket or a TCP multicast. * The watcher will answer to * each peer advertising its arrival on the bus. The intrinsics of Unix are so * that the broadcast is done using the same socket, which is not a good * thing. * * CHANGELOG: * 1.2.14 * - tries to fix a lock on accept() by becoming a Thread instead of * runnalbe (see tests/test2) * - removed unread field (domainaddr, e.g.) * - throws RuntimeException instead of System.exit(), allows code reuse * - switch from gnu regexp (deprecated) to the built in java regexp * - add generic types to declarations * 1.2.13: * - TCP_NO_DELAY to disable Nagle's algorithm * - private static ?! pour already present ... * 1.2.9: * - added an application Id in the UDP broadcast. It seems to be ok with * most implementations ( VERSION PORT APPID APPNAME \n) is compatible with (VERSION * APPID). If I receive a broadcast with with the same TCP port number, * I ignore the first and accept the new ones * 1.2.8: * - alreadyBroadcasted was static, thus Ivy Agents within the same JVM used * to share the list of agents already connected. A nasty bug. * 1.2.7: * - better handling of multiple connexions from the same remote agent when * there are different broadcast addresses ( introduced the alreadyBroadcasted * function ) * 1.2.6: * - IOException now goes silent when we asked the bus to stop() * - use a new buffer for each Datagram received, to prevent an old bug * 1.2.5: * - getDomain now sends IvyException for malformed broadcast addresses * - uses apache jakarta-regexp instead of gnu-regexp * - throws an IvyException if the broadcast domain cannot be resolved * 1.2.4: * - sends the broadcast before listening to the other's broadcasts. * I can't wait for all the broadcast to be sent before starting the listen * mode, otherwise another agent behaving likewise could be started * meanwhile, and one would not "see" each other. * - (REMOVED) allows the connexion from a remote host with the same port number * it's too complicated to know if the packet is from ourselves... * - deals with the protocol errors in a more efficient way. The goal is not * to loose our connectivity because of a rude agent. * fixes Bug J005 (YJ + JPI) * 1.2.3: * - the packet sending is done in its own thread from now on (PacketSender) * I don't care stopping it, since it can't be blocked. * - checks whether I have been interrupted just after the receive (start() * then stop() immediately). * 1.2.1: * - can be Interrupted during the broadcast Send. I catch the * and do nothing with it. InterruptedIOException * - changed the fill character from 0 to 10, in order to prevent a nasty bug * on Windows XP machines * - fixed a NullPointerException while trying to stop a Thread before having * created it. * 1.0.12: * - setSoTimeout on socket * - the broadcast reader Thread goes volatile * 1.0.10: * - isInDomain() is wrong in multicast. I've removed it * - there was a remanence effect in the datagrampacket buffer. I clean it up after each message * - cleaned up the getDomain() and getPort() code * - close message sends an interruption on all threads for a clean exit * - removed the timeout bug eating all the CPU resources * - now handles a Vector of broadcast listeners */ package fr.dgac.ivy ; import java.lang.Thread; import java.net.*; import java.io.*; import java.util.Hashtable; import java.util.regex.*; class IvyWatcher extends Thread { private static boolean debug = (System.getProperty("IVY_DEBUG")!=null); private boolean alreadyIgnored = false; private Ivy bus; /* master bus controler */ private DatagramSocket broadcast; /* supervision socket */ private int port; private volatile Thread listenThread = null; private InetAddress group; private static int serial=0; private int myserial=serial++; private String busWatcherId = null; private static Pattern recoucou, numbersPoint, exp; /** * creates an Ivy watcher * @param bus the bus * @param net the domain */ IvyWatcher(Ivy bus,String domainaddr,int port) throws IvyException { this.bus = bus; this.port=port; busWatcherId=bus.getWatcherId(); listenThread = new Thread(this); // create the MulticastSocket try { group = InetAddress.getByName(domainaddr); broadcast = new MulticastSocket(port); if (group.isMulticastAddress()) ((MulticastSocket)broadcast).joinGroup(group); broadcast.setSoTimeout(Ivy.TIMEOUTLENGTH); } catch ( UnknownHostException uhe ) { } catch ( IOException e ) { throw new IvyException("IvyWatcher I/O error" + e ); } } /** * the behaviour of each thread watching the UDP socket. */ public void run() { traceDebug("Thread started"); // THREADDEBUG Thread thisThread=Thread.currentThread(); traceDebug("beginning of a watcher Thread"); InetAddress remotehost=null; try { int remotePort=0; while( listenThread==thisThread ) { try { byte buf[] = new byte[256]; DatagramPacket packet=new DatagramPacket(buf,buf.length); broadcast.receive(packet); bus.setStarting(true); // someone's coming if (listenThread!=thisThread) break; // I was summoned to leave during the receive String msg = new String(buf,0,packet.getLength()); String remotehostname=null; try { remotehost = packet.getAddress(); remotehostname = remotehost.getHostName(); Matcher m = recoucou.matcher(msg); if (!m.matches()) { System.err.println("Ignoring bad format broadcast from "+ remotehostname+":"+packet.getPort()); continue; } int version = Integer.parseInt(m.group(1)); if ( version < Ivy.PROTOCOLMINIMUM ) { System.err.println("Ignoring bad format broadcast from "+ remotehostname+":"+packet.getPort() +" protocol version "+remotehost+" we need "+Ivy.PROTOCOLMINIMUM+" minimum"); continue; } remotePort = Integer.parseInt(m.group(2)); if (bus.getAppPort()==remotePort) { // if (same port number) if (busWatcherId!=null) { traceDebug("there's an appId: "+m.group(3)); String otherId=m.group(3); String otherName=m.group(4); if (busWatcherId.compareTo(otherId)==0) { // same port #, same bus Id, It's me, I'm outta here traceDebug("ignoring my own broadcast"); bus.setStarting(false); continue; } else { // same port #, different bus Id, it's another agent // implementing the Oh Soooo Cool watcherId undocumented // unprotocolar Ivy add on traceDebug("accepting a broadcast from a same port by "+otherName); } } else { // there's no watcherId in the broacast. I fall back to a // crude strategy: I ignore the first broadcast with the same // port number, and accept the following ones if (alreadyIgnored) { traceDebug("received another broadcast from "+ remotehostname+":"+packet.getPort() +" on my port number ("+remotePort+"), it's probably someone else"); } else { alreadyIgnored=true; traceDebug("ignoring a broadcast from "+ remotehostname+":"+packet.getPort() +" on my port number ("+remotePort+"), it's probably me"); continue; } } } // end if (same port #) traceDebug("broadcast accepted from " +remotehostname +":"+packet.getPort()+", port:"+remotePort+", protocol version:"+version); if (!alreadyBroadcasted(remotehost.toString(),remotePort)) { traceDebug("no known agent originating from " + remotehost + ":" + remotePort); try { Socket s = new Socket(remotehost,remotePort); s.setTcpNoDelay(true); bus.createIvyClient(s,remotePort,false); } catch ( java.net.ConnectException jnc ) { traceDebug("cannot connect to "+remotehostname+":"+remotePort+", he probably stopped his bus"); } } else { traceDebug("there is already a request originating from " + remotehost + ":" + remotePort); } } catch (NumberFormatException nfe) { System.err.println("Ignoring bad format broadcast from "+remotehostname); continue; } catch ( UnknownHostException e ) { System.err.println("Unkonwn host "+remotehost +","+e.getMessage()); } catch ( IOException e) { System.err.println("can't connect to "+remotehost+" port "+ remotePort+e.getMessage()); e.printStackTrace(); } } catch (InterruptedIOException jii ){ if (thisThread!=listenThread) { break ;} } } // while } catch (java.net.SocketException se ){ if (thisThread==listenThread) { traceDebug("socket exception, continuing anyway on other Ivy domains "+se); } } catch (IOException ioe ){ System.out.println("IO Exception, continuing anyway on other Ivy domains "+ioe); } traceDebug("Thread stopped"); // THREADDEBUG } @Override public void interrupt(){ super.interrupt(); broadcast.close(); } /** * stops the thread waiting on the broadcast socket */ synchronized void doStop() { traceDebug("begining stopping"); bus.setStarting(true); Thread t = listenThread; listenThread=null; interrupt(); broadcast.close(); if (t!=null) { t.interrupt(); } // it might not even have been created bus.setStarting(false); traceDebug("stopped"); } private class PacketSender implements Runnable { // do I need multiple packetsenders ? Well, there is one PacketSender per // domain. DatagramPacket packet; String data; Ivy bus; public PacketSender(String data, Ivy b) { this.data=data; bus = b; packet=new DatagramPacket(data.getBytes(),data.length(),group,port); new Thread((PacketSender.this)).start(); } public void run() { traceDebug("PacketSender thread started"); // THREADDEBUG try { broadcast.send(packet); } catch (InterruptedIOException e) { // somebody interrupts my IO. Thread, do nothing. System.out.println(e.bytesTransferred+" bytes transferred anyway, out of " + data.length()); e.printStackTrace(); traceDebug("IO interrupted during the broadcast. Do nothing"); } catch ( IOException e ) { if (listenThread!=null) { System.out.println("Broadcast Error " + e.getMessage()+" continuing anyway"); // cannot throw new IvyException in a run ... e.printStackTrace(); } } try { Thread.sleep(100); } catch (InterruptedException ie ){ } bus.setStarting(false); // one of the senders has finished its work, plus extra time traceDebug("PacketSender thread stopped"); // THREADDEBUG } } synchronized void doStart() throws IvyException { // String hello = Ivy.PROTOCOLVERSION + " " + bus.getAppPort() + "\n"; String hello = Ivy.PROTOCOLVERSION + " " + bus.getAppPort() + " "+busWatcherId+" "+bus.getSelfIvyClient().getApplicationName()+"\n"; if (broadcast==null) throw new IvyException("IvyWatcher PacketSender null broadcast address"); new PacketSender(hello,bus); // notifies our arrival on each domain: protocol version + port listenThread.start(); } /* * since 1.2.7 pre .... * went local instead of static ! fixed a nasty bug in 1.2.8 * checks if there is already a broadcast received from the same address * on the same port * * regoes static ... */ //private static Hashtable alreadySocks=new Hashtable(); private Hashtable alreadySocks=new Hashtable(); private synchronized boolean alreadyBroadcasted(String s,int port) { // System.out.println("DEBUUUUUUUG " + s+ ":" + port); if (s==null) return false; Integer i = alreadySocks.get(s); if (((i!=null)&&(i.compareTo(port))==0)) return true; alreadySocks.put(s,port); return false; } /* private boolean isInDomain( InetAddress host ){ return true; // TODO check if this function is useful. for now, it always returns true // deprecated since we use Multicast. How to check when we are in UDP // broadcast ? // byte rem_addr[] = host.getAddress(); for ( int i = 0 ; i < domainaddrList.size(); i++ ) { byte addr[] = ((InetAddress)domainaddrList.elementAt(i)).getAddress(); int j ; for ( j = 0 ; j < 4 ; j++ ) if ( (addr[j] != -1) && (addr[j] != rem_addr[j]) ) break; if ( j == 4 ) { traceDebug( "host " + host + " is in domain\n" ); return true; } } traceDebug( "host " + host + " Not in domain\n" ); return false; } */ static String getDomain(String net) throws IvyException { // System.out.println("debug: net=[" + net+ "]"); int sep_index = net.lastIndexOf( ":" ); if ( sep_index != -1 ) { net = net.substring(0,sep_index); } try { Matcher m = numbersPoint.matcher(net); if (!m.matches()) { // traceDebug("should only have numbers and point ? I won't add anything... " + net); return "127.255.255.255"; // return net; } net += ".255.255.255"; Matcher mm= exp.matcher(net); if (!mm.matches()) { System.out.println("Bad broascat addr " + net); throw new IvyException("bad broadcast addr"); } net=mm.group(1); } catch ( PatternSyntaxException e ){ e.printStackTrace(); throw new RuntimeException(); } //System.out.println("next domain: "+net); return net; } static int getPort(String net) { // returns 0 if no port is set int sep_index = net.lastIndexOf( ":" ); int port= ( sep_index == -1 ) ? 0 :Integer.parseInt( net.substring( sep_index +1 )); // System.out.println("net: ["+net+"]\nsep_index: "+sep_index+"\nport: "+port); //System.out.println("next port: "+port); return port; } private void traceDebug(String s){ if (debug) System.out.println("-->IvyWatcher["+myserial+","+bus.getSerial()+"]<-- "+s); } static { try { numbersPoint = Pattern.compile("([0-9]|\\.)+"); recoucou = Pattern.compile("([0-9]+) ([0-9]+) ([^ ]*) (.*)",Pattern.DOTALL); exp = Pattern.compile( "^(\\d+\\.\\d+\\.\\d+\\.\\d+).*"); } catch (PatternSyntaxException res) { res.printStackTrace(); System.out.println("Regular Expression bug in Ivy source code ... bailing out"); throw new RuntimeException(); } } } // class IvyWatcher