diff options
-rwxr-xr-x | src/Ivy.java | 150 | ||||
-rwxr-xr-x | src/IvyClient.java | 223 | ||||
-rwxr-xr-x | src/IvyWatcher.java | 78 | ||||
-rw-r--r-- | src/Makefile | 5 | ||||
-rw-r--r-- | src/Probe.java | 1 | ||||
-rw-r--r-- | src/SelfIvyClient.java | 17 |
6 files changed, 243 insertions, 231 deletions
diff --git a/src/Ivy.java b/src/Ivy.java index 3486765..196c3b4 100755 --- a/src/Ivy.java +++ b/src/Ivy.java @@ -14,11 +14,14 @@ * * CHANGELOG: * 1.2.6: + * - added serial numbers for traceDebug * - changed the semantic of -b a,b:port,c:otherport if no port is * specified for a, it take the port from the next one. If none is * specified, it takes DEFAULT_PORT * - no more asynchronous sending of message ( async bind is ok though ) * because the tests are sooooo unsuccessful + * - use addElement/removeElement instead of add/remove is registering + * threads ( jdk1.1 backward compatibility ) * 1.2.5: * - protection of newlines * 1.2.4: @@ -93,11 +96,11 @@ public class Ivy implements Runnable { public static final String libVersion ="1.2.6"; private boolean debug; - private static int clientSerial=0; /* an unique ID for each IvyClient */ private ServerSocket app; private Vector watchers; private volatile Thread serverThread; // to ensure quick communication of the end private Hashtable clients = new Hashtable(); + private Hashtable half = new Hashtable(); private Vector ivyApplicationListenerList = new Vector(); private Vector ivyBindListenerList = new Vector(); private Vector sendThreads = new Vector(); @@ -108,6 +111,8 @@ public class Ivy implements Runnable { private boolean doSendToSelf = false ; protected SelfIvyClient selfIvyClient ; public final static int TIMEOUTLENGTH = 3000; + private static int serial=0; + private int myserial=serial++; /** * Readies the structures for the software bus connexion. @@ -237,7 +242,7 @@ public class Ivy implements Runnable { public synchronized void stop() { if (stopped) return; stopped=true; - traceDebug("beginning stopping the bus"); + traceDebug("beginning stopping"); try { // stopping the serverThread Thread t=serverThread; @@ -255,7 +260,7 @@ public class Ivy implements Runnable { } catch (IOException e) { traceDebug("IOexception Stop "); } - traceDebug("the bus should have stopped so far"); + traceDebug("end stopping"); } /** @@ -291,51 +296,21 @@ public class Ivy implements Runnable { /** * Performs a pattern matching according to everyone's regexps, and sends - * the results to the relevant ivy agents sequentially - * - * @param message A String which will be compared to the regular - * expressions of the different clients - * @return the number of messages actually sent - */ - public int sendMsg(String message) throws IvyException { - return sendMsg(message,false); - } - - /* - * Performs a pattern matching according to everyone's regexps, and sends - * the results to the relevant ivy agents, using as many threads as needed. - * - * disappeared in 1.2.6 - * @since 1.2.4 - * @param message A String which will be compared to the regular - * expressions of the different clients - * @return always returns -1 - public int sendAsyncMsg(String message,boolean async) throws IvyException { - return sendMsg(message,true); - } - */ - - /* - * Performs a pattern matching according to everyone's regexps, and sends * the results to the relevant ivy agents. * - * @since 1.2.4 * @param message A String which will be compared to the regular * expressions of the different clients - * @param async if true, the sending will be performed in a separate thread, - * default is false - * @return if async is true, always returns -1, else returns the number of messages actually sent + * @return returns the number of messages actually sent */ - protected int sendMsg(String message,boolean async) throws IvyException { + public int sendMsg(String message) throws IvyException { int count = 0; - if (async) throw new IvyException("Async sending not supported anymore"); if (doProtectNewlines) message=IvyClient.encode(message); else if ( (message.indexOf(IvyClient.newLineChar)!=-1)||(message.indexOf(IvyClient.endArgChar)!=-1)) throw new IvyException("newline character not allowed in Ivy messages"); for ( Enumeration e=clients.elements();e.hasMoreElements();) { IvyClient client = (IvyClient)e.nextElement(); - count += client.sendMsg(message, async); + count += client.sendMsg(message); } if (doSendToSelf) count+=selfIvyClient.sendSelfMsg(message); return count; @@ -508,10 +483,6 @@ public class Ivy implements Runnable { } } - /* - * removes a client from the list - */ - void removeClient(IvyClient c) { clients.remove(c.getClientKey()); } /* * invokes the application listeners when we are summoned to die @@ -560,14 +531,6 @@ public class Ivy implements Runnable { // Protected methods // /////////////////////////////////////////////////////////////////: - - synchronized void addClient(Socket socket,boolean peerCalling,int protocolVersion) throws IOException { - if (stopped) return; - IvyClient client = new IvyClient( this, socket,peerCalling,new Integer(clientSerial++),protocolVersion); - clients.put(client.getClientKey(),client); - traceDebug(getClientNames()); - } - private static String[] myTokenize(String s,String separator) { int index=0, last=0, length=s.length(); Vector v = new Vector(); @@ -595,70 +558,87 @@ public class Ivy implements Runnable { } - /* - * prevents two clients from connecting to each other at the same time - * there might still be a lingering bug here, that we could avoid with the - * SchizoToken. - */ - boolean checkConnected( IvyClient clnt ) { - if ( clnt.getAppPort() == 0 ) return false; - for (Enumeration e=clients.elements();e.hasMoreElements();) { - IvyClient client = (IvyClient)e.nextElement(); - if ( clnt != client && client.sameClient( clnt ) ) return true; - } - return false; + void addClient(IvyClient c) { + clients.put(c.getClientKey(),c); + traceDebug("added "+c+" in clients: "+getClientNames(clients)); + } + void removeClient(IvyClient c) { + clients.remove(c.getClientKey()); + traceDebug("removed "+c+" from clients: "+getClientNames(clients)); + } + void addHalf(IvyClient c) { + half.put(c.getClientKey(),c); + traceDebug("added "+c+" in half: "+getClientNames(half)); + } + void removeHalf(IvyClient c) { + half.remove(c.getClientKey()); + traceDebug("removed "+c+" from half: "+getClientNames(half)); + } + + boolean shouldIleave(IvyClient ic) { + traceDebug("looking for "+ic+" in "+getClientNames(half)+" and "+getClientNames(clients)); + IvyClient peer=searchPeer(ic); + if (peer==null) return false; + boolean shoulda=peer.compareTo(ic)>0; + traceDebug(ic+" "+ic.toStringExt()+((shoulda)?" must leave ":" must not leave")); + traceDebug(peer+" "+peer.toStringExt()+((!shoulda)?" must leave ":" must not leave")); + return shoulda; + } + + private IvyClient searchPeer(IvyClient ic) { + IvyClient peer; + for (Enumeration e=half.elements();e.hasMoreElements();) + if ((peer=(IvyClient)e.nextElement()).equals(ic)) return peer; + for (Enumeration e=clients.elements();e.hasMoreElements();) + if ((peer=(IvyClient)e.nextElement()).equals(ic)) return peer; + return null; } /* * the service socket thread reader main loop */ public void run() { - traceDebug("Ivy service Thread started"); // THREADDEBUG + traceDebug("service thread started"); // THREADDEBUG Thread thisThread=Thread.currentThread(); while(thisThread==serverThread){ try { Socket socket = app.accept(); if ((thisThread!=serverThread)||stopped) break; // early disconnexion - addClient(socket,true,0); // the peer called me TODO I can't know his protocol version + new IvyClient(this,socket,0); // the peer called me } catch (InterruptedIOException ie) { if (thisThread!=serverThread) break; } catch( IOException e ) { if (serverThread==thisThread) { traceDebug("Error IvyServer exception: " + e.getMessage()); System.out.println("Ivy server socket reader caught an exception " + e.getMessage()); - System.out.println("this is probably a bug in your JVM !"); - // e.printStackTrace(); + System.out.println("this is probably a bug in your JVM ! (e.g. blackdown jdk1.1.8 linux)"); + System.exit(0); } else { traceDebug("my server socket has been closed"); } } } - traceDebug("Ivy service Thread stopped"); // THREADDEBUG + traceDebug("service thread stopped"); // THREADDEBUG } + protected int getSerial() { return myserial ; } + private void traceDebug(String s){ + if (debug) System.out.println("-->Ivy["+myserial+"]<-- "+s); + } - private void traceDebug(String s){ if (debug) System.out.println("-->ivy<-- "+s); } - - synchronized void registerThread(Thread t) { sendThreads.add(t); } - synchronized void unRegisterThread(Thread t) { sendThreads.remove(t); } + // stuff to guarantee that all the treads have left + synchronized void registerThread(Thread t) { sendThreads.addElement(t); } + synchronized void unRegisterThread(Thread t) { sendThreads.removeElement(t); } synchronized Thread getOneThread() { if (sendThreads.size()==0) return null; return (Thread) sendThreads.firstElement(); } - void waitForAll() { - Thread t; - traceDebug("DEVELOPMENT WAITFORALL sendThreads size : " + sendThreads.size()); - try { while ((t=getOneThread())!=null) { t.join(); } } - catch (InterruptedException ie) { System.out.println("waitForAll Interrupted"); } - traceDebug("DEVELOPMENT END WAITFORALL"); - } - - /* a small private method for debbugging purposes */ - private String getClientNames() { - String s = appName+" clients are: "; - for (Enumeration e=clients.elements();e.hasMoreElements();){ - s+=((IvyClient)e.nextElement()).getApplicationName()+" "; + // a small private method for debbugging purposes + private String getClientNames(Hashtable t) { + String s = "("; + for (Enumeration e=t.elements();e.hasMoreElements();){ + s+=((IvyClient)e.nextElement()).getApplicationName()+","; } - return s; + return s+")"; } public String domains(String toparse) { @@ -681,9 +661,7 @@ public class Ivy implements Runnable { } - /* - * unitary test. Normally, running java fr.dgac.ivy.Ivy should stop in 2.3 seconds :) - */ + // test. Normally, running java fr.dgac.ivy.Ivy should stop in 2.3 seconds :) public static void main(String[] args) { Ivy bus = new Ivy("Test Unitaire","TU ready",null); try { diff --git a/src/IvyClient.java b/src/IvyClient.java index d3deb8b..753184a 100755 --- a/src/IvyClient.java +++ b/src/IvyClient.java @@ -11,10 +11,17 @@ * * CHANGELOG: * 1.2.6 + * - major cleanup to handle simultaneous connections, e.g., between two + * busses within the same process ( AsyncAPI test is very stressful ) + * I made an assymetric processing to elect the client that should + * disconnect based on the socket ports ... might work... + * - jakarta regexp are not meant to be threadsafe, so for match() and + * compile() must be enclaused in a synchronized block * - now sends back an error message when an incorrect regexp is sent * the message is supposed to be readable + * - sendMsg has no more async parameter * 1.2.5: - * - no more java ping ... + * - no more java ping pong * 1.2.5: * - use org.apache.regexp instead of gnu-regexp * http://jakarta.apache.org/regexp/apidocs/ @@ -58,7 +65,6 @@ import java.lang.Thread; import java.net.*; import java.io.*; import java.util.*; -/* import gnu.regexp.*; GNURETOAPACHERE */ import org.apache.regexp.*; public class IvyClient implements Runnable { @@ -70,8 +76,7 @@ public class IvyClient implements Runnable { final static int Error = 3; /* error message */ final static int DelRegexp = 4;/* the peer removes one of his regex */ final static int EndRegexp = 5;/* no more regexp in the handshake */ - final static int SchizoToken = 6; /* avoid race condition in concurrent connexions */ - /* SchizoToken is aka BeginRegexp in other implementations */ + final static int SchizoToken = 6; /* avoid race condition in concurrent connexions, aka BeginRegexp in other implementations */ final static int DirectMsg = 7;/* the peer sends a direct message */ final static int Die = 8; /* the peer wants us to quit */ final static String MESSAGE_TERMINATOR = "\n"; /* the next protocol will use \r */ @@ -83,55 +88,81 @@ public class IvyClient implements Runnable { final static char newLineChar = '\n'; // private variables + private static Integer csMutex=new Integer(0); + private static int clientSerial=0; /* an unique ID for each IvyClient */ + private String messages_classes[] = null; private Ivy bus; private Socket socket; private BufferedReader in; private OutputStream out; - private int appPort; - private boolean peerCalling; + private int remotePort=0; private volatile Thread clientThread;// volatile to ensure the quick communication - private Integer clientKey ; + private Integer clientKey; private boolean discCallbackPerformed = false; // protected variables - String appName; + String appName="none"; Hashtable regexps = new Hashtable(); Hashtable regexpsText = new Hashtable(); static boolean debug = (System.getProperty("IVY_DEBUG")!=null) ; int protocol; + private boolean incoming; - IvyClient(){} + IvyClient() { } - IvyClient(Ivy bus, Socket socket,boolean peerCalling,Integer clientKey,int protocol) throws IOException { - appName = "Unknown"; - appPort = 0; + IvyClient(Ivy bus, Socket socket,int remotePort) throws IOException { + synchronized(csMutex) { clientKey=new Integer(clientSerial++); } this.bus = bus; - this.socket = socket; - this.peerCalling=peerCalling; - this.clientKey=clientKey; - this.protocol=protocol; + this.remotePort = remotePort; in = new BufferedReader(new InputStreamReader(socket.getInputStream())); out = socket.getOutputStream(); + incoming=(remotePort==0); + traceDebug(((incoming)?"incoming":"outgoing")+" connection on "+socket); + this.socket = socket; + if (!incoming) { // outgoing connexion + synchronized(bus) { + bus.addHalf(this); // register a half connexion + if (bus.shouldIleave(this)) { + traceDebug(toStringExt()+" should leave ..."); + close(false); + bus.removeHalf(this); + return; + } + sendSchizo(); + // the registering will take place at the reception of the regexps... + } + } + clientThread = new Thread(this); // clientThread handles the incoming traffic + clientThread.start(); + } + + // sends our ID, whether we initiated the connexion or not + // the ID is the couple "host name,application Port", the host name + // information is in the socket itself, the port is not known if we + // initiate the connexion + private void sendSchizo() throws IOException { + traceDebug("sending our service port "+bus.applicationPort); Hashtable tosend=bus.selfIvyClient.regexpsText; - // sends our ID, whether we initiated the connexion or not - // the ID is the couple "host name,application Port", the host name - // information is in the socket itself, the port is not known if we - // initiate the connexion sendString(SchizoToken,bus.applicationPort,bus.appName); - // sends our regexps to the peer for (Enumeration e = tosend.keys(); e.hasMoreElements(); ) { Integer ikey = (Integer)e.nextElement(); sendRegexp(ikey.intValue(),(String)tosend.get(ikey)); } sendString( EndRegexp,0,""); - // spawns a thread to manage the incoming traffic on this - // socket. We should be ready to receive messages now. - clientThread = new Thread(this); - clientThread.start(); } - public String toString() { return "IvyClient "+bus.appName+":"+appName; } + synchronized private void handShake() { + synchronized(bus) { + bus.removeHalf(this); + bus.addClient(this); + } + } + + public String toString() { return "IC["+clientKey+","+bus.getSerial()+"] "+bus.appName+":"+appName+":"+remotePort; } + public String toStringExt() { + return "client socket:"+socket+", remoteport:" + remotePort; + } /** * returns the name of the remote agent. @@ -175,7 +206,6 @@ public class IvyClient implements Runnable { /* closes the connexion to the peer */ protected void close(boolean notify) throws IOException { - bus.waitForAll(); traceDebug("closing connexion to "+appName); if (notify) sendBye("hasta la vista"); stopListening(); @@ -214,32 +244,19 @@ public class IvyClient implements Runnable { static String decode(String s) { return s.replace(escapeChar,'\n'); } static String encode(String s) { return s.replace('\n',escapeChar); } Integer getClientKey() { return clientKey ; } - int getAppPort() { return appPort ; } - void sendRegexp(int id,String regexp) {sendString(AddRegexp,id,regexp);} - void delRegexp(int id) {sendString(DelRegexp,id,"");} - - int sendMsg(String message,boolean async) { - if (async) { - new Sender(message); - return -1; - } else { return sendMsg(message); } - } + protected void sendRegexp(int id,String regexp) {sendString(AddRegexp,id,regexp);} + protected void delRegexp(int id) {sendString(DelRegexp,id,"");} - private int sendMsg(String message) { + protected int sendMsg(String message) { int count = 0; for (Enumeration e = regexps.keys();e.hasMoreElements();) { Integer key = (Integer)e.nextElement(); RE regexp = (RE)regexps.get(key); - /* - * GNURETOAPACHERE - int nb = regexp.getNumSubs(); - REMatch result = regexp.getMatch(message); - if (result==null) continue; // no match - count++; // match - send(Msg,key,regexp.getNumSubs(),result); - * - */ - if (!regexp.match(message)) continue; // no match + // re.match fails sometimes when it is called concurrently .. + // see 28412 on jakarta regexp bugzilla + synchronized (regexp) { + if (!regexp.match(message)) continue; // no match + } count++; // match sendResult(Msg,key,regexp); } @@ -262,25 +279,40 @@ public class IvyClient implements Runnable { /* * compares two peers the id is the couple (host,service port). - * true if the peers are similir. This should not happen, it is bad + * true if the peers are similar. This should not happen, it is bad */ - protected boolean sameClient( IvyClient clnt ) { - return ( appPort != 0 && appPort == clnt.appPort ) - && ( getRemoteAddress() == clnt.getRemoteAddress() ) ; + protected int compareTo(IvyClient clnt) { + // return clnt.clientKey.compareTo(clientKey); // Wrong. it's random... + return (clnt.socket.getPort()-socket.getLocalPort()); + } + + protected boolean equals(IvyClient clnt) { + if (clnt==this) return false; + // TODO go beyond the port number ! add some host processing, cf: + // IvyWatcher ... + if (remotePort==clnt.remotePort) return true; + /* + e.g. + if (socket.getInetAddress()==null) return false; + if (clnt.socket.getInetAddress()==null) return false; + if (!socket.getInetAddress().equals(clnt.socket.getInetAddress())) return false; + */ + return false; } /* * the code of the thread handling the incoming messages. */ public void run() { + traceDebug("Thread started"); Thread thisThread = Thread.currentThread(); String msg = null; try { - traceDebug("Connected from "+ socket.getInetAddress().getHostName()+ ":"+socket.getPort()); + traceDebug("connection established with "+ + socket.getInetAddress().getHostName()+ ":"+socket.getPort()); } catch (Exception ie) { traceDebug("Interrupted while resolving remote hostname"); } - traceDebug("Thread started"); while (clientThread==thisThread) { try { if ((msg=in.readLine()) != null ) { @@ -294,21 +326,23 @@ public class IvyClient implements Runnable { break; } } catch (IvyException ie) { - traceDebug("IvyClient caught an exception"); + traceDebug("caught an IvyException"); ie.printStackTrace(); } catch (InterruptedIOException ioe) { traceDebug("I have been interrupted. I'm about to leave my thread loop"); if (thisThread!=clientThread) break; } catch (IOException e) { - traceDebug("abnormally Disconnected from "+ socket.getInetAddress().getHostName()+":"+socket.getPort()); + if (clientThread!=null) { + traceDebug("abnormally Disconnected from "+ socket.getInetAddress().getHostName()+":"+socket.getPort()); + } break; } } traceDebug("normally Disconnected from "+ appName); - traceDebug("Thread stopped"); // invokes the disconnect applicationListeners if (!discCallbackPerformed) bus.clientDisconnects(this); discCallbackPerformed=true; + traceDebug("Thread stopped"); } private synchronized void sendBuffer( String buffer ) throws IvyException { @@ -340,21 +374,18 @@ public class IvyClient implements Runnable { } } - /* GNURETOAPACHERE - private void send(int type, Integer id, int nbsub, REMatch result) { - */ - private void sendResult(int type,Integer id, RE regexp) { - String buffer = type+" "+id+StartArg; - /* - for(int sub = 1; sub <= nbsub; sub++) if (result.getStartIndex(sub) > -1) - */ - for(int i=1;i<regexp.getParenCount();i++) buffer+=regexp.getParen(i)+EndArg; try { + String buffer = type+" "+id+StartArg; + for(int i=1;i<regexp.getParenCount();i++) + buffer+=regexp.getParen(i)+EndArg; sendBuffer(buffer); } catch (IvyException ie ) { System.err.println("received an exception: " + ie.getMessage()); ie.printStackTrace(); + } catch (StringIndexOutOfBoundsException sioobe) { + System.out.println("arg: "+regexp.getParenCount()+" "+regexp); + sioobe.printStackTrace(); } } @@ -461,10 +492,6 @@ public class IvyClient implements Runnable { break; case EndRegexp: bus.clientConnects(this); - /* TODO check with the protocol itself. - * the peer is perhaps not ready to handle this message - * an assymetric processing should be written - */ if (bus.ready_message!=null) sendMsg(bus.ready_message); break; case Msg: @@ -487,16 +514,33 @@ public class IvyClient implements Runnable { String error=s.substring(from,b.length); traceDebug("Error msg "+msgId+" "+error); break; - case SchizoToken: // aka BeginRegexp in other implementations + case SchizoToken: // aka BeginRegexp in other implementations, or MsgSync appName=s.substring(from,b.length); - appPort=msgId.intValue(); - if ( bus.checkConnected(this) ) { - try { - close(false); - } catch (IOException ioe) { - throw new IvyException("io " + ioe.getMessage()); + remotePort=msgId.intValue(); + traceDebug("the peer sent his service port: "+remotePort); + if (incoming) { + // incoming connexion, I wait for his token to send him mine ... + synchronized(bus) { + bus.addHalf(this); + try { + // there is another connexion. Should I leave ? + // Assymetric processing to prevent concurrent disconnexions + if (bus.shouldIleave(this)) { + traceDebug(toStringExt()+" should leave ..."); + close(false); + bus.removeHalf(this); + return false; + } + sendSchizo(); + handShake(); + } catch (IOException ioe) { + throw new IvyException(ioe.toString()); + } } - throw new IvyException("Rare ! A concurrent connect occured"); + } else { + // outgoing connexion + // I already have sent him a token + handShake(); } break; case DirectMsg: @@ -513,31 +557,14 @@ public class IvyClient implements Runnable { private void sendBye() {sendString(Bye,0,"");} private void sendBye(String message) {sendString(Bye,0,message);} - private InetAddress getRemoteAddress() { return socket.getInetAddress(); } - private void traceDebug(String s){ - if (debug) System.out.println("-->IvyClient "+bus.appName+":"+appName+"<-- "+s); + if (debug) System.out.println("-->IvyClient["+clientKey+","+bus.getSerial()+"] "+bus.appName+" (remote "+appName+")<-- "+s); } private void traceDebug(String[] tab){ - String s = "String array " + tab.length + " elements: "; + String s = " string array " + tab.length + " elements: "; for (int i=0;i<tab.length;i++) s+="("+tab[i]+") "; - if (debug) System.out.println("-->IvyClient "+bus.appName+":"+appName+"<-- "+s); + traceDebug(s); } - // a class to perform the threaded execution of each new message - // this is an experimental feature introduced in 1.2.4 - class Sender implements Runnable { - String message; - private Thread t; - public Sender(String message) { - this.message=message; - t=new Thread(Sender.this); - bus.registerThread(t); - t.start(); - bus.unRegisterThread(t); - } - public void run() { sendMsg(message); } - } // class Sender - } diff --git a/src/IvyWatcher.java b/src/IvyWatcher.java index dafa6f9..3ac5ef9 100755 --- a/src/IvyWatcher.java +++ b/src/IvyWatcher.java @@ -14,6 +14,9 @@ * thing. * * CHANGELOG: + * 1.2.6: + * - IOException now goes silent when we asked the bus to stop() + * - use a new buffer for each Datagram received, to prevent an old bug * 1.2.5: * - getDomain now sends IvyException for malformed broadcast addresses * - uses apache jakarta-regexp instead of gnu-regexp @@ -56,7 +59,6 @@ import java.lang.Thread; import java.net.*; import java.io.*; import java.util.StringTokenizer; -/* import gnu.regexp.*; GNURETOAPACHERE */ import org.apache.regexp.*; import java.util.Vector; import java.util.Enumeration; @@ -71,6 +73,8 @@ class IvyWatcher implements Runnable { private int port; private volatile Thread listenThread; private InetAddress group; + private static int serial=0; + private int myserial=serial++; /** * creates an Ivy watcher @@ -103,55 +107,57 @@ class IvyWatcher implements Runnable { * the behaviour of each thread watching the UDP socket. */ public void run() { - traceDebug("IvyWatcher Thread started"); // THREADDEBUG + traceDebug("Thread started"); // THREADDEBUG Thread thisThread=Thread.currentThread(); traceDebug("beginning of a watcher Thread"); - byte buf[] = new byte[256]; - DatagramPacket packet=new DatagramPacket(buf, 256); InetAddress remotehost=null; try { while( listenThread==thisThread ) { try { + byte buf[] = new byte[256]; + DatagramPacket packet=new DatagramPacket(buf,buf.length); broadcast.receive(packet); if (listenThread!=thisThread) break; // I was summoned to leave during the receive - String msg = new String(packet.getData()) ; - for (int i=0;i<buf.length;i++) { buf[i]=10; } - // clean up the buffer after each message - // BUGFIX ? I change 0 to 10 in order to avoid a bug - remotehost = packet.getAddress(); - traceDebug("BUSWATCHER Receive Broadcast from "+remotehost.getHostName()+":"+packet.getPort()); - // if ( !isInDomain( remotehost ) ) continue; + String msg = new String(buf,0,packet.getLength()); + String remotehostname=null; try { + remotehost = packet.getAddress(); + remotehostname = remotehost.getHostName(); RE re = new RE("([0-9]*) ([0-9]*)"); if (!(re.match(msg))) { - System.err.println("Ignoring bad format broadcast from "+remotehost); + System.err.println("Ignoring bad format broadcast from "+ + remotehostname+":"+packet.getPort()); continue; } int version = Integer.parseInt(re.getParen(1)); if ( version < bus.PROTOCOLMINIMUM ) { - System.err.println("Ignoring bad protocol version "+remotehost+" we need "+ bus.PROTOCOLMINIMUM+" minimum"); + System.err.println("Ignoring bad format broadcast from "+ + remotehostname+":"+packet.getPort() + +" protocol version "+remotehost+" we need "+bus.PROTOCOLMINIMUM+" minimum"); continue; } int port = Integer.parseInt(re.getParen(2)); - // allows the connexion from a remote host with the same port number - // if ( ( (remotehost.equals(localhost)) || (remotehost.equals(loopback)) ) - // && (bus.applicationPort==port)) { if (bus.applicationPort==port) { - traceDebug("ignoring a broadcast on my port number, it's *probably* me"); - continue; // it's me + traceDebug("ignoring a broadcast from "+ remotehostname+":"+packet.getPort() + +" on my port number ("+port+"), it's probably me"); + // TODO check better + // if bus.applicationPort=port + // parse the list of Watchers and check for each + // iw.broadcast.getInetAddress().equals(packet().getAddress() + // if one is true, "continue" ( ignore the broadcast ) + continue; } - traceDebug("Broadcast de " +packet.getAddress().getHostName() - +":"+packet.getPort()+" port "+port+" version "+version); - Socket socket = new Socket( remotehost, port ); - bus.addClient(socket,false,version); + traceDebug("broadcast accepted from " +remotehostname + +":"+packet.getPort()+", port:"+port+", protocol version:"+version); + new IvyClient(bus,new Socket(remotehost,port),port); } catch (RESyntaxException ree) { ree.printStackTrace(); System.exit(-1); } catch (NumberFormatException nfe) { - System.err.println("Ignoring bad format broadcast from "+remotehost); + System.err.println("Ignoring bad format broadcast from "+remotehostname); continue; } catch ( UnknownHostException e ) { - System.err.println("Unkonwn host "+remotehost + e.getMessage()); + System.err.println("Unkonwn host "+remotehost +","+e.getMessage()); } catch ( IOException e) { System.err.println("can't connect to "+remotehost+" port "+ port+e.getMessage()); } @@ -161,26 +167,24 @@ class IvyWatcher implements Runnable { } // while } catch (java.net.SocketException se ){ if (thisThread==listenThread) { - System.out.println("IvyWatcher error, continuing anyway"); - se.printStackTrace(); + traceDebug("socket exception, continuing anyway on other Ivy domains "+se); } } catch (IOException ioe ){ - System.out.println("IvyWatcher IO Exception, continuing anyway"); - ioe.printStackTrace(); + System.out.println("IO Exception, continuing anyway on other Ivy domains "+ioe); } - traceDebug("IvyWatcher Thread stopped"); // THREADDEBUG + traceDebug("Thread stopped"); // THREADDEBUG } /** * stops the thread waiting on the broadcast socket */ synchronized void stop() { - traceDebug("begining stopping an IvyWatcher"); + traceDebug("begining stopping"); Thread t = listenThread; listenThread=null; broadcast.close(); if (t!=null) { t.interrupt(); } // it might not even have been created - traceDebug("ending stopping an IvyWatcher"); + traceDebug("stopped"); } private class PacketSender implements Runnable { @@ -201,10 +205,11 @@ class IvyWatcher implements Runnable { e.printStackTrace(); traceDebug("IO interrupted during the broadcast. Do nothing"); } catch ( IOException e ) { - System.out.println("Broadcast Error" + e.getMessage()); - e.printStackTrace(); - // throw new IvyException("Broadcast error " + e.getMessage() ); - System.exit(0); + if (listenThread!=null) { + System.out.println("Broadcast Error " + e.getMessage()+" continuing anyway"); + // cannot throw new IvyException in a run ... + e.printStackTrace(); + } } traceDebug("PacketSender thread stopped"); // THREADDEBUG } @@ -274,9 +279,8 @@ class IvyWatcher implements Runnable { private void traceDebug(String s){ - if (debug) System.out.println("-->ivywatcher<-- "+s); + if (debug) System.out.println("-->IvyWatcher["+myserial+","+bus.getSerial()+"]<-- "+s); } } // class IvyWatcher -/* EOF */ diff --git a/src/Makefile b/src/Makefile index 8a12da1..5d0dbe2 100644 --- a/src/Makefile +++ b/src/Makefile @@ -1,6 +1,7 @@ -GNUPATH=/usr/share/java/gnu-getopt.jar:/usr/share/java/regexp.jar -#GNUPATH=/usr/share/java/repository:/usr/share/java/gnu.getopt.0.9.jar:/usr/share/java/gnu-regexp-1.1.3.jar +GNUPATH=/usr/share/java/gnu-getopt.jar:/usr/share/java/regexp.jar # debian SID +#GNUPATH=/usr/share/java/repository:/usr/share/java/gnu.getopt.jar:/usr/share/java/regexp.jar # debian woody #GNUPATH=${HOME}/java/jars/gnu-getopt.jar:${HOME}/java/jars/regexp.jar +#GNUPATH=../bundle ####################################### # generic setup diff --git a/src/Probe.java b/src/Probe.java index c83b9ac..e086560 100644 --- a/src/Probe.java +++ b/src/Probe.java @@ -45,7 +45,6 @@ package fr.dgac.ivy ; import java.io.*; import java.util.*; import gnu.getopt.Getopt; -/* import gnu.regexp.*; GNURETOAPACHERE */ import org.apache.regexp.*; public class Probe implements IvyApplicationListener, IvyMessageListener, IvyBindListener, Runnable { diff --git a/src/SelfIvyClient.java b/src/SelfIvyClient.java index d6e6f1e..a96e0d2 100644 --- a/src/SelfIvyClient.java +++ b/src/SelfIvyClient.java @@ -6,6 +6,8 @@ * @since 1.2.4 * * CHANGELOG: + * 1.2.6: + * - jakarta regexp are not threadsafe, adding extra synch blocks * 1.2.5: * - uses apache regexp instead of gnu regexp * 1.2.4: @@ -15,7 +17,6 @@ package fr.dgac.ivy ; import java.util.*; -/* import gnu.regexp.*; GNURETOAPACHERE */ import org.apache.regexp.*; class SelfIvyClient extends IvyClient { @@ -83,9 +84,11 @@ class SelfIvyClient extends IvyClient { Integer key = (Integer)e.nextElement(); RE regexp = (RE)regexps.get(key); String sre = (String)regexpsText.get(key); - if (!regexp.match(message)) continue; - count++; - callCallback(this,key,toArgs(regexp)); + synchronized(regexp) { + if (!regexp.match(message)) continue; + count++; + callCallback(this,key,toArgs(regexp)); + } } return count; } @@ -101,7 +104,7 @@ class SelfIvyClient extends IvyClient { // runs the callback in the same thread callback.receive(client, tab); } else { - // starts a new Thread for each callback ... + // starts a new Thread for each callback ... ( Async API ) new Runner(callback,client,tab); } } @@ -124,10 +127,10 @@ class SelfIvyClient extends IvyClient { IvyClient c; String[] args; private Thread t; - public Runner(IvyMessageListener cb,IvyClient c,String[] args) { + public Runner(IvyMessageListener cb,IvyClient c,String[] a) { this.cb=cb; - this.args=args; this.c=c; + args=a; t=new Thread(Runner.this); bus.registerThread(t); t.start(); |