diff options
Diffstat (limited to 'src/Ivy.java')
-rwxr-xr-x | src/Ivy.java | 811 |
1 files changed, 439 insertions, 372 deletions
diff --git a/src/Ivy.java b/src/Ivy.java index 5e3362f..b147af3 100755 --- a/src/Ivy.java +++ b/src/Ivy.java @@ -1,10 +1,12 @@ /** * a software bus package * - * @author Yannick Jestin - * @author <a href="http://www.tls.cena.fr/products/ivy/">http://www.tls.cena.fr/products/ivy/</a> + * @author Yannick Jestin <a + * href="mailto:yannick.jestin@enac.fr">yannick.jestin&enac.fr</a> + * @author <a href="http://www2.tls.cena.fr/products/ivy/">http://www.tls.cena.fr/products/ivy/</a> * * (c) CENA 1998-2004 + * (c) ENAC 2005-2011 * *<pre> *Ivy bus = new Ivy("Dummy agent","ready",null); @@ -13,6 +15,37 @@ *</pre> * * CHANGELOG: + * 1.2.14 + * - added a lock mechanism to be sure that once a connexion has been + * initiated, the ready message will be sent before stopping the bus + * now: Ivy b = new Ivy(...); b.sendMsg("coucou"); b.stop(); should + * send messages (at least the ready message) if there is a connexion + * attempt made before b.stop() is effective. To be sure, there is a 200ms + * delay before b.stop() can be effective (the Threads stopped, the sockets + * closed) + * - reintroduced a mechanism to allow the drop of a double connexion + * attempt + * - removed protected methods from javadoc + * - switch to apache fop + docbook for documentation + * - added sypport to the Swing Dispatch Thread in the bindAsyncMsg api + * (this breaks the former API, this is BAAAAAD). Use BindType.SWING as the + * latter argument + * - javadoc updated + * - appName gone private, with a protected accessor + * - add a lockApp synchronization for application socket control + * - use of stringbuffers to concatenate strings, instead of using +, which + * could lead to a quadractic cost in the number of iteraction (the growing + * string was recopied in each iteration) + * - throws RuntimeException instead of System.exit(), allows code reuse + * - ready message is set to appName + " READY" if null has been provided + * - switch from gnu regexp (deprecated) to the built in java regexp + * - when possible, move the regexp Pattern.compile in static areas, to avoid multiple + * calls + * - add generic types to declarations + * - fxed a potential null pointer dereference on quit + * - lowercase CheckRegexp to checkRegexp (bad practice, thanks to FindBugs) + * - recopy the filter String[] in setfilter, to avoid exposing internal + * representation (unsafe operation) * 1.2.13: * - adds support for RESyntaxException * 1.2.12: @@ -86,114 +119,131 @@ * adding the sendToSelf feature * fixed the printStackTrace upon closing of the ServerSocket after a close() */ -package fr.dgac.ivy ; +package fr.dgac.ivy; import java.net.*; import java.io.*; import java.util.*; import gnu.getopt.Getopt; -import org.apache.regexp.*; +import java.util.regex.*; public class Ivy implements Runnable { /** - * the name of the application on the bus + * the protocol version number. */ - String appName; + public static final int PROTOCOLVERSION = 3; + public static final int PROTOCOLMINIMUM = 3; + private static final int GRACEDELAY = 200; // in milliseconds /** - * the protocol version number + * the port for the UDP rendez vous, if none is supplied. */ - public static final int PROTOCOLVERSION = 3 ; - public static final int PROTOCOLMINIMUM = 3 ; + public static final int DEFAULT_PORT = 2010; /** - * the port for the UDP rendez vous, if none is supplied + * the domain for the UDP rendez vous. */ - public static final int DEFAULT_PORT = 2010 ; - /** - * the domain for the UDP rendez vous - */ - public static final String DEFAULT_DOMAIN = "127.255.255.255:"+DEFAULT_PORT; + public static final String DEFAULT_DOMAIN = "127.255.255.255:" + DEFAULT_PORT; /** * the library version, useful for development purposes only, when java is * invoked with -DIVY_DEBUG */ - public static final String libVersion ="1.2.13"; + public static final String LIBVERSION ="1.2.14"; + public static final int TIMEOUTLENGTH = 1000; + private String appName; + private int applicationPort; /* Application port number */ + private String ready_message = null; + private boolean doProtectNewlines = false; + private SelfIvyClient selfIvyClient; + private Object lockApp = new Object(); private boolean debug; private ServerSocket app; - private Vector watchers = new Vector(); + private Vector<IvyWatcher> watchers = new Vector<IvyWatcher>(); private volatile Thread serverThread; // to ensure quick communication of the end - private Hashtable clients = new Hashtable(); - private Hashtable half = new Hashtable(); - private Vector ivyApplicationListenerList = new Vector(); - private Vector ivyBindListenerList = new Vector(); - private Vector sendThreads = new Vector(); + private Hashtable<Integer,IvyClient> clients = new Hashtable<Integer,IvyClient>(); + private Hashtable<Integer,IvyClient> half = new Hashtable<Integer,IvyClient>(); + private Vector<IvyApplicationListener> ivyApplicationListenerList = new Vector<IvyApplicationListener>(); + private Vector<IvyBindListener> ivyBindListenerList = new Vector<IvyBindListener>(); + private Vector<Thread> sendThreads = new Vector<Thread>(); private String[] filter = null; - private boolean stopped=true; - protected int applicationPort; /* Application port number */ - protected String ready_message = null; - protected boolean doProtectNewlines = false ; - private boolean doSendToSelf = false ; - protected SelfIvyClient selfIvyClient ; - public final static int TIMEOUTLENGTH = 1000; - private static int serial=0; - private int myserial=serial++; - static long current = System.currentTimeMillis(); - private static java.util.Random generator = new java.util.Random(current*(serial+1)); - private String watcherId=null; - + private boolean stopped = true; + private boolean starting = false; + protected Object readyToSend = new Object(); + private boolean doSendToSelf = false; + private static int serial = 0; + private int myserial = serial++; + private static long current = System.currentTimeMillis(); + private static java.util.Random generator = new java.util.Random(current*(serial + 1)); + private String watcherId = null; + private static Pattern rangeRE; // tcp range min and max + private static Pattern bounded; + + private static final Object lock = new Object(); + /** * Readies the structures for the software bus connexion. * * All the dirty work is done un the start() method * @see #start * @param name The name of your Ivy agent on the software bus - * @param message The hellow message you will send once ready + * @param message The hellow message you will send once ready. It can be + * null, in which case "appname READY" will be the default * @param appcb A callback handling the notification of connexions and - * disconnections, may be null + * disconnections, (may be null for most agents) */ - public Ivy(String name, String message, IvyApplicationListener appcb) { + public Ivy(final String name, final String message, final IvyApplicationListener appcb) { appName = name; - ready_message = message; - debug = (System.getProperty("IVY_DEBUG")!=null); - if ( appcb != null ) ivyApplicationListenerList.addElement( appcb ); - selfIvyClient=new SelfIvyClient(this,name); + ready_message = (message == null) ? name + " READY" : message; + debug = + (System.getProperty("IVY_DEBUG") != null) + || (System.getProperty("IVY_DEBUG") != null); + if ( appcb != null ) { + ivyApplicationListenerList.addElement( appcb ); + } + selfIvyClient = new SelfIvyClient(this , name); } /** - * Waits for a message to be received + * Waits for a message to be received. * * @since 1.2.4 * @param regexp the message we're waiting for to continue the main thread. * @param timeout in millisecond, 0 if infinite + * @throws IvyException if something bad happens * @return the IvyClient who sent the message, or null if the timeout is * reached */ - public IvyClient waitForMsg(String regexp,int timeout) throws IvyException { + public final IvyClient waitForMsg(final String regexp , final int timeout) throws IvyException { Waiter w = new Waiter(timeout); - int re = bindMsg(regexp,w); - IvyClient ic=w.waitFor(); + int re = bindMsg(regexp , w); + IvyClient ic = w.waitFor(); unBindMsg(re); return ic; } /** - * Waits for an other IvyClient to join the bus + * Waits for an other IvyClient to join the bus. * * @since 1.2.4 * @param name the name of the client we're waiting for to continue the main thread. * @param timeout in millisecond, 0 if infinite + * @throws IvyException if something bad happens * @return the first IvyClient with the name or null if the timeout is * reached */ - public IvyClient waitForClient(String name,int timeout) throws IvyException { + public final IvyClient waitForClient(final String name , final int timeout) throws IvyException { IvyClient ic; - if (name==null) throw new IvyException("null name given to waitForClient"); + if (name == null) { + throw new IvyException("null name given to waitForClient"); + } // first check if client with the same name is on the bus - if ((ic=alreadyThere(clients,name))!=null) return ic; + ic = alreadyThere(clients , name); + if (ic != null) { + return ic; + } // if not enter the waiting loop - WaiterClient w = new WaiterClient(name,timeout,clients); + WaiterClient w = new WaiterClient(name , timeout , clients); int i = addApplicationListener(w); - ic=w.waitForClient(); + ic = w.waitForClient(); removeApplicationListener(i); return ic; } @@ -201,15 +251,9 @@ public class Ivy implements Runnable { /* * since 1.2.8 */ - static protected IvyClient alreadyThere(Hashtable c,String name) { - IvyClient ic; - for (Enumeration e=c.elements();e.hasMoreElements();) { - try { - ic = (IvyClient)e.nextElement(); - } catch (ArrayIndexOutOfBoundsException _ ) { - return null; // with gij, it ... can happen - } - if ((ic!=null)&&(name.compareTo(ic.getApplicationName())==0)) return ic; + protected static IvyClient alreadyThere(final Hashtable<Integer , IvyClient> c , final String name) { + for (IvyClient ic : c.values()) { + if ((ic != null)&&(name.compareTo(ic.getApplicationName()) == 0)) return ic; } return null; } @@ -220,8 +264,11 @@ public class Ivy implements Runnable { * <li>One thread (IvyWatcher) for each traffic rendezvous (either UDP broadcast or TCPMulticast) * <li>One thread (serverThread/Ivy) to accept incoming connexions on server socket * <li>a thread for each IvyClient when the connexion has been done - * - * @param domainbus a domain of the form 10.0.0:1234, it is similar to the + * @throws IvyException if there is a problem joining the bus + * @param domainbus a domain of the form 10.0.0:1234, A good practice is to + * sick to a null value, so that your agent will honor the IVY_BUS parameter + * given to the jvm (java -DIVYBUS= ... . Otherwise, you can provide some + * hard-coded value, similar to the * netmask without the trailing .255. This will determine the meeting point * of the different applications. Right now, this is done with an UDP * broadcast. Beware of routing problems ! You can also use a comma @@ -230,108 +277,124 @@ public class Ivy implements Runnable { * 1.2.8: goes synchronized. I don't know if it's really useful * */ - public synchronized void start(String domainbus) throws IvyException { + + public final void start(final String domainbus) throws IvyException { if (!stopped) throw new IvyException("cannot start a bus that's already started"); + setStarting(true); // this will remain true entil one of the PacketSenders has finished stopped=false; - if (domainbus==null) domainbus=getDomain(null); - Properties sysProp = System.getProperties(); - sysProp.put("IVYBUS",domainbus); - String range=(String)sysProp.get("IVYRANGE"); - RE rangeRE; // tcp range min and max - try { - rangeRE = new RE("(\\d+)-(\\d+)"); // tcp range min and max - } catch ( RESyntaxException res ) { - throw new IvyException("Regular Expression bug in Ivy source code ... bailing out"); + String db = domainbus; + if (db == null) { + db = getDomain(null); } - if ((range!=null)&&rangeRE.match(range)) { - int rangeMin=Integer.parseInt(rangeRE.getParen(1)); - int rangeMax=Integer.parseInt(rangeRE.getParen(2)); - int index=rangeMin; - traceDebug("trying to allocate a TCP port between "+rangeMin+" and "+rangeMax); - boolean allocated=false; + Properties sysProp = System.getProperties(); + sysProp.put("IVYBUS" , db); + String range = (String)sysProp.get("IVYRANGE"); + Matcher match; + if ((range != null)&&(match = rangeRE.matcher(range)).matches()) { + int rangeMin = Integer.parseInt(match.group(1)); + int rangeMax = Integer.parseInt(match.group(2)); + int index = rangeMin; + traceDebug("trying to allocate a TCP port between " + rangeMin + " and " + rangeMax); + boolean allocated = false; while (!allocated) try { - if (index>rangeMax) throw new IvyException("no available port in IVYRANGE" + range ); - app = new ServerSocket(index); - app.setSoTimeout(TIMEOUTLENGTH); - applicationPort = app.getLocalPort(); - allocated=true; + if (index>rangeMax) throw new IvyException("no available port in IVYRANGE" + range ); + synchronized (lockApp) { + app = new ServerSocket(index); + app.setSoTimeout(TIMEOUTLENGTH); + applicationPort = app.getLocalPort(); + } + allocated = true; } catch (BindException e) { index++; } catch (IOException e) { - throw new IvyException("can't open TCP service socket " + e ); + throw new IvyException("can't open TCP service socket " + e ); } } else try { - app = new ServerSocket(0); - app.setSoTimeout(TIMEOUTLENGTH); - applicationPort = app.getLocalPort(); + synchronized (lockApp) { + app = new ServerSocket(0); + app.setSoTimeout(TIMEOUTLENGTH); + applicationPort = app.getLocalPort(); + } } catch (IOException e) { throw new IvyException("can't open TCP service socket " + e ); } - // app.getInetAddress().getHostName()) is always 0.0.0.0 - traceDebug("lib: "+libVersion+" protocol: "+PROTOCOLVERSION+" TCP service open on port "+applicationPort); + traceDebug("lib: " + LIBVERSION + " protocol: " + PROTOCOLVERSION + " TCP service open on port " + applicationPort); - Domain[] d = parseDomains(domainbus); - if (d.length==0) throw new IvyException("no domain found in "+domainbus); - watcherId=getWBUId().replace(' ','*'); // no space in the watcherId + Domain[] d = parseDomains(db); + if (d.length == 0) throw new IvyException("no domain found in " + db); + watcherId = getWBUId().replace(' ' , '*'); // no space in the watcherId // readies the rendezvous : an IvyWatcher (thread) per domain bus - for (int index=0;index<d.length;index++) - watchers.addElement(new IvyWatcher(this,d[index].domainaddr,d[index].port)); + for (Domain dom: d) watchers.addElement(new IvyWatcher(this , dom.domainaddr , dom.port)); serverThread = new Thread(this); serverThread.start(); // sends the broadcasts and listen to incoming connexions - for (int i=0;i<watchers.size();i++){ ((IvyWatcher)watchers.elementAt(i)).start(); } + for (IvyWatcher iw: watchers) iw.doStart(); } - public Domain[] parseDomains(String domainbus) { - StringTokenizer st = new StringTokenizer(domainbus,","); + protected final Domain[] parseDomains(final String domainbus) { + StringTokenizer st = new StringTokenizer(domainbus , ","); Domain[] d = new Domain[st.countTokens()]; - int index=0; + int index = 0; while ( st.hasMoreTokens()) { - String s = st.nextToken() ; + String s = st.nextToken(); try { - d[index++]=new Domain(IvyWatcher.getDomain(s),IvyWatcher.getPort(s)); + d[index++] = new Domain(IvyWatcher.getDomain(s) , IvyWatcher.getPort(s)); } catch (IvyException ie) { - // do nothing - ie.printStackTrace(); + // do nothing + ie.printStackTrace(); } } // fixes the port values ... int lastport = Ivy.DEFAULT_PORT; - for (index--;index>=0;index--) { - Domain dom=d[index]; - if (dom.port==0) dom.port=lastport; - lastport=dom.port; + for (index--; index >= 0; index--) { + Domain dom = d[index]; + if (dom.port == 0) dom.port = lastport; + lastport = dom.port; } return d; } + + private void waitForRemote(String s) { + try { + while (starting==true) { + Thread.sleep(GRACEDELAY); + traceDebug("I'm waiting before "+s+", a starting tread is in progress"); + } + } catch (InterruptedException ie) { + // should not happen, and it's not a problem anyway + } + } + /** - * disconnects from the Ivy bus + * disconnects from the Ivy bus. */ - public void stop() { + public final void stop() { + waitForRemote("stopping"); if (stopped) return; - stopped=true; + stopped = true; + serverThread = null; traceDebug("beginning stopping"); try { // stopping the serverThread - Thread t=serverThread; - serverThread=null; - if (t!=null) t.interrupt(); // The serverThread might be stopped even before having been created - // System.out.println("IZZZ joining "+t); - try { t.join(); } catch ( InterruptedException _ ) { } - // TODO BUG avec gcj+kaffe, le close() reste pendu et ne rend pas la main - app.close(); + Thread t = serverThread; + if (t != null) { + t.interrupt(); // The serverThread might be stopped even before having been created + // System.out.println("IZZZ joining " + t); + try { t.join(); } catch ( InterruptedException ie ) { + ie.printStackTrace(); + } + } + synchronized (lockApp) { app.close(); } // stopping the IvyWatchers - for (int i=0;i<watchers.size();i++){ ((IvyWatcher)watchers.elementAt(i)).stop(); } + for (IvyWatcher iw: watchers) iw.doStop(); watchers.clear(); // stopping the remaining IvyClients - for (Enumeration e=clients.elements();e.hasMoreElements();) { - try { - IvyClient c = (IvyClient)e.nextElement(); - if (c!=null) { c.close(true);removeClient(c); } - } catch (ArrayIndexOutOfBoundsException _ ) { - continue; + for (IvyClient c : clients.values()) { + if (c != null) { + c.close(true); + removeClient(c); } } } catch (IOException e) { @@ -347,53 +410,62 @@ public class Ivy implements Runnable { * is false * @since 1.2.4 */ - public void sendToSelf(boolean b) {doSendToSelf=b;} + public final void sendToSelf(final boolean b) { + doSendToSelf = b; + } /** * do I send messsages to myself ? + * @return a boolean * @since 1.2.4 */ - public boolean isSendToSelf() {return doSendToSelf;} + public final boolean isSendToSelf() { + return doSendToSelf; + } /** - * returns our self IvyClient. + * selfIvyClient accesssor. + * @return our selfIvyClient + * @since 1.2.4 * @since 1.2.4 */ - public IvyClient getSelfIvyClient() {return selfIvyClient;} + public final SelfIvyClient getSelfIvyClient() { + return selfIvyClient; + } /** * Toggles the encoding/decoding of messages to prevent bugs related to the - * presence of a "\n" + * presence of a "\n". * @param b true if you want to enforce encoding of newlines. Default * is false. Every receiver will have to decode newlines * @since 1.2.5 * The default escape character is a ESC 0x1A */ - public void protectNewlines(boolean b) {doProtectNewlines=b;} + public final void protectNewlines(final boolean b) { + doProtectNewlines = b; + } /** * Performs a pattern matching according to everyone's regexps, and sends * the results to the relevant ivy agents. - * + * @throws IvyException if there is a problem sending the message * @param message A String which will be compared to the regular * expressions of the different clients * @return returns the number of messages actually sent */ - public int sendMsg(String message) throws IvyException { + public final int sendMsg(final String message) throws IvyException { int count = 0; - if (doProtectNewlines) - message=IvyClient.encode(message); - else if ( (message.indexOf(IvyClient.newLineChar)!=-1)||(message.indexOf(IvyClient.endArgChar)!=-1)) - throw new IvyException("newline character not allowed in Ivy messages"); - for ( Enumeration e=clients.elements();e.hasMoreElements();) { - try { - IvyClient client = (IvyClient)e.nextElement(); - if (client!=null) count += client.sendMsg(message); - } catch (ArrayIndexOutOfBoundsException _ ) { - continue; // gij problem - } + waitForRemote("sending"); + synchronized (lock) { + traceDebug("sending "+message); + String msg = message; + if (doProtectNewlines) msg = IvyClient.encode(message); + else if ( (msg.indexOf(IvyClient.newLineChar) != -1)||(msg.indexOf(IvyClient.endArgChar) != -1)) + throw new IvyException("newline character not allowed in Ivy messages"); + for ( IvyClient client : clients.values()) if (client != null) count += client.sendMsg(msg); + if (doSendToSelf) count+=selfIvyClient.sendSelfMsg(msg); + traceDebug("end sending "+message+" to "+count+" clients"); } - if (doSendToSelf) count+=selfIvyClient.sendSelfMsg(message); return count; } @@ -411,10 +483,12 @@ public class Ivy implements Runnable { * @param sregexp a perl regular expression, groups are done with parenthesis * @param callback any objects implementing the IvyMessageListener * interface, on the AWT/Swing framework + * @throws IvyException if there is a problem in the binding, be it regexp + * or network * @return the id of the regular expression */ - public int bindMsg(String sregexp, IvyMessageListener callback ) throws IvyException { - return bindMsg(sregexp,callback,false); + public final int bindMsg(final String sregexp , final IvyMessageListener callback ) throws IvyException { + return bindMsg(sregexp , callback , BindType.NORMAL); } /** @@ -430,10 +504,15 @@ public class Ivy implements Runnable { * @param sregexp a perl compatible regular expression, groups are done with parenthesis * @param callback any objects implementing the IvyMessageListener * interface, on the AWT/Swing framework + * @param type if set to NORMAL, it's a normal bind, if it's ASYNC, the + * callback will be created in a newly spawned Thread (Heavy ressources), if + * it's SWING, the callback will be deferred to the Swing Event Dispatch + * Tread + * @throws IvyException if there is a problem binding (network, regexp...) * @return the int ID of the regular expression. */ - public int bindAsyncMsg(String sregexp, IvyMessageListener callback ) throws IvyException { - return bindMsg(sregexp,callback,true); + public final int bindAsyncMsg(final String sregexp, final IvyMessageListener callback, BindType type ) throws IvyException { + return bindMsg(sregexp , callback , type); } /** @@ -452,28 +531,22 @@ public class Ivy implements Runnable { * @param sregexp a perl regular expression, groups are done with parenthesis * @param callback any objects implementing the IvyMessageListener * interface, on the AWT/Swing framework - * @param async if true, each callback will be run in a separate thread, - * default is false + * @param type if NORMAL (default) it's a normal bind, if ASYNC, each callback will be run in a separate thread, if SWING, the callback will be deferred to the Swing Event Dispatch Thread + * default is NORMAL + * @throws IvyException if there is a problem binding (regexp, network) * @return the id of the regular expression */ - public int bindMsg(String sregexp, IvyMessageListener callback,boolean async ) throws IvyException { + public final int bindMsg(final String sregexp , final IvyMessageListener callback , final BindType type ) throws IvyException { // adds the regexp to our collection in selfIvyClient - int key=selfIvyClient.bindMsg(sregexp,callback,async); + int key = selfIvyClient.bindMsg(sregexp , callback , type); // notifies the other clients this new regexp - for (Enumeration e=clients.elements();e.hasMoreElements();) { - try { - IvyClient c = (IvyClient)e.nextElement(); - if (c!=null) c.sendRegexp(key,sregexp); - } catch (ArrayIndexOutOfBoundsException _ ) { - continue; // gij problem - } - } + for (IvyClient c : clients.values() ) if (c != null) c.sendRegexp(key , sregexp); return key; } /** * Subscribes to a regular expression for one time only, useful for - * requests, in cunjunction with getWBUId() + * requests, in cunjunction with getWBUId(). * * The callback will be executed once and only once, and the agent will * unsubscribe @@ -481,250 +554,227 @@ public class Ivy implements Runnable { * @param sregexp a perl regular expression, groups are done with parenthesis * @param callback any objects implementing the IvyMessageListener * interface, on the AWT/Swing framework + * @throws IvyException if there is a problem during the binding * @return the id of the regular expression */ - public int bindMsgOnce(String sregexp, IvyMessageListener callback ) throws IvyException { + public final int bindMsgOnce(final String sregexp, final IvyMessageListener callback ) throws IvyException { Once once = new Once(callback); - int id = bindMsg(sregexp,once); + int id = bindMsg(sregexp , once); once.setRegexpId(id); return id; } /** - * unsubscribes a regular expression + * unsubscribes a regular expression using the id provided at bind time. * * @param id the id of the regular expression, returned when it was bound + * @throws IvyException if the id is not valid anymore */ - public void unBindMsg(int id) throws IvyException { + public final void unBindMsg(final int id) throws IvyException { selfIvyClient.unBindMsg(id); - for (Enumeration e=clients.elements();e.hasMoreElements();) { - try { - IvyClient ic=(IvyClient)e.nextElement(); - if (ic!=null) ic.delRegexp(id ); - } catch (ArrayIndexOutOfBoundsException _ ) { - continue; - } - - } + for (IvyClient ic : clients.values() ) if (ic != null) ic.delRegexp(id ); } /** - * unsubscribes a regular expression + * unsubscribes a regular expression based on its string. * * @return a boolean, true if the regexp existed, false otherwise or * whenever an exception occured during unbinding * @param re the string for the regular expression */ - public boolean unBindMsg(String re) { return selfIvyClient.unBindMsg(re); } + public final boolean unBindMsg(final String re) { return selfIvyClient.unBindMsg(re); } /** - * adds a bind listener to a bus + * adds a bind listener to a bus. * @param callback is an object implementing the IvyBindListener interface * @return the id of the bind listener, useful if you wish to remove it later * @since 1.2.4 */ - public int addBindListener(IvyBindListener callback){ + public final int addBindListener(final IvyBindListener callback){ ivyBindListenerList.addElement(callback); return ivyBindListenerList.indexOf(callback); } /** - * removes a bind listener + * removes a bind listener. * @param id the id of the bind listener to remove + * @throws IvyException if id is not known * @since 1.2.4 */ - public void removeBindListener(int id) throws IvyException { + public final void removeBindListener(final int id) throws IvyException { try { ivyBindListenerList.removeElementAt(id); } catch (ArrayIndexOutOfBoundsException aie) { - throw new IvyException(id+" is not a valid Id"); + throw new IvyException(id + " is not a valid Id"); } } /** - * adds an application listener to a bus + * adds an application listener to a bus. * @param callback is an object implementing the IvyApplicationListener * interface * @return the id of the application listener, useful if you wish to remove * it later */ - public int addApplicationListener(IvyApplicationListener callback){ + public synchronized final int addApplicationListener(final IvyApplicationListener callback){ ivyApplicationListenerList.addElement(callback); return ivyApplicationListenerList.indexOf( callback ); } /** - * removes an application listener + * removes an application listener. * @param id the id of the application listener to remove + * @throws IvyException if there is no such id */ - public void removeApplicationListener(int id) throws IvyException { + public synchronized final void removeApplicationListener(final int id) throws IvyException { try { ivyApplicationListenerList.removeElementAt(id); } catch (ArrayIndexOutOfBoundsException aie) { - throw new IvyException(id+" is not a valid Id"); + throw new IvyException(id + " is not a valid Id"); } } /** - * sets the filter expression - * @param filter the extensive list of strings beginning the messages + * sets the filter expression. + * @param f the extensive list of strings beginning the messages * @since 1.2.9 * * once this filter is set, when a client subscribes to a regexp of the * form "^dummystring...", there is a check against the filter list. If no * keyword is found to match, the binding is just ignored. */ - public void setFilter(String[] filter){ this.filter=filter; } - - /** - * checks the "validity" of a regular expression if a filter has been set - * @since 1.2.9 - * @param exp a string regular expression - * must be synchronized ( RE is not threadsafe ) - */ - private static RE bounded ; + public final synchronized void setFilter(final String[] f){ + filter = java.util.Arrays.copyOf(f , f.length); + } static { + // compiles the static regexps try { - bounded = new RE("^\\^([a-zA-Z0-9_-]+).*"); - } catch ( RESyntaxException res ) { + rangeRE = Pattern.compile("(\\d+)-(\\d+)"); // tcp range min and max + bounded = Pattern.compile("^\\^([a-zA-Z0-9_-]+).*"); + } catch ( PatternSyntaxException res ) { res.printStackTrace(); System.out.println("Regular Expression bug in Ivy source code ... bailing out"); - System.exit(0); } } - public synchronized boolean CheckRegexp( String exp ) { - if (filter==null) return true; // there's no message filter - if (!bounded.match(exp)) return true; // the regexp is not bounded + + /** + * checks the "validity" of a regular expression if a filter has been set. + * @since 1.2.9 + * @param exp a string regular expression + * TODO must it be synchronized ( RE was not threadsafe, java regexp is ) + */ + public final boolean checkRegexp(final String exp) { + if (filter == null) return true; // there's no message filter + Matcher m = bounded.matcher(exp); + if (!m.matches()) return true; // the regexp is not bounded //System.out.println("the regexp is bounded, "+bounded.getParen(1)); // else the regexp is bounded. The matching string *must* be in the filter - for (int i=0;i<filter.length;i++) { - String prems = bounded.getParen(1); - // traceDebug(" classFilter ["+filter[i]+"] vs regexp ["+prems+"]"); - if (filter[i].compareTo(prems)==0) return true; - } + String prems = m.group(1); + for (String f: filter) if (f.compareTo(prems) == 0) return true; + // traceDebug(" classFilter ["+filter[i]+"] vs regexp ["+prems+"]"); return false; } // a private class used by bindMsgOnce, to ensure that a callback will be // executed once, and only once private class Once implements IvyMessageListener { - boolean received=false; - int id=-1; - IvyMessageListener callback=null; - Once(IvyMessageListener callback){this.callback=callback;} - void setRegexpId(int id){this.id=id;} - public void receive(IvyClient ic,String[] args){ - synchronized(this) { - // synchronized because it will most likely be called - // concurrently, and I *do* want to ensure that it won't - // execute twice - if (received||(callback==null)||(id==-1)) return; - received=true; - try {Ivy.this.unBindMsg(id);} catch (IvyException ie) { ie.printStackTrace(); } - callback.receive(ic,args); + private boolean received = false; + private int id = -1; + private IvyMessageListener ocallback = null; + Once(final IvyMessageListener callback){ ocallback = callback; } + synchronized void setRegexpId(final int fid){ id = fid; } + public void receive(final IvyClient ic , final String[] args){ + synchronized(Once.this) { + // synchronized because it will most likely be called + // concurrently, and I *do* want to ensure that it won't + // execute twice + if (received||(ocallback == null)||(id == -1)) return; + received = true; + try { Ivy.this.unBindMsg(id); } catch (IvyException ie) { ie.printStackTrace(); } + ocallback.receive(ic , args); } } } - + /* invokes the application listeners upon arrival of a new Ivy client */ - protected void clientConnects(IvyClient client){ - for ( int i = 0 ; i < ivyApplicationListenerList.size(); i++ ) { - ((IvyApplicationListener)ivyApplicationListenerList.elementAt(i)).connect(client); - } + protected synchronized final void clientConnects(final IvyClient client){ + for (IvyApplicationListener ial : ivyApplicationListenerList) ial.connect(client); } /* invokes the application listeners upon the departure of an Ivy client */ - protected void clientDisconnects(IvyClient client){ - for ( int i = 0 ; i < ivyApplicationListenerList.size(); i++ ) { - ((IvyApplicationListener)ivyApplicationListenerList.elementAt(i)).disconnect(client); - } + protected synchronized final void clientDisconnects(final IvyClient client){ + for (IvyApplicationListener ial : ivyApplicationListenerList) ial.disconnect(client); } /* invokes the bind listeners */ - protected void regexpReceived(IvyClient client,int id,String sregexp){ - for ( int i = 0 ; i < ivyBindListenerList.size(); i++ ) { - ((IvyBindListener)ivyBindListenerList.elementAt(i)).bindPerformed(client,id,sregexp); - } + protected final void regexpReceived(final IvyClient client , final int id , final String sregexp){ + for (IvyBindListener ibl : ivyBindListenerList) ibl.bindPerformed(client , id , sregexp); } /* invokes the bind listeners */ - protected void regexpDeleted(IvyClient client,int id,String sregexp){ - for ( int i = 0 ; i < ivyBindListenerList.size(); i++ ) { - ((IvyBindListener)ivyBindListenerList.elementAt(i)).unbindPerformed(client,id,sregexp); - } + protected final void regexpDeleted(final IvyClient client , final int id , final String sregexp){ + for (IvyBindListener ibl : ivyBindListenerList) ibl.unbindPerformed(client , id , sregexp); } /* * invokes the application listeners when we are summoned to die * then stops */ - protected void dieReceived(IvyClient client, int id,String message){ - for ( int i=0 ;i<ivyApplicationListenerList.size();i++ ) { - ((IvyApplicationListener)ivyApplicationListenerList.elementAt(i)).die(client,id,message); - } + protected synchronized final void dieReceived(final IvyClient client , final int id , final String message){ + for (IvyApplicationListener ial : ivyApplicationListenerList) ial.die(client , id , message); } /* invokes the direct message callbacks */ - protected void directMessage( IvyClient client, int id,String msgarg ){ - for ( int i = 0 ; i < ivyApplicationListenerList.size(); i++ ) { - ((IvyApplicationListener)ivyApplicationListenerList.elementAt(i)).directMessage(client,id, msgarg); - } + protected synchronized final void directMessage(final IvyClient client , final int id , final String msgarg ){ + for (IvyApplicationListener ial : ivyApplicationListenerList) ial.directMessage(client , id, msgarg); } /** - * gives the (Vectored) list of IvyClient at a given instant + * gives the (Vectored) list of IvyClient at a given instant. + * @return a vector of IvyClients */ - public Vector getIvyClients() { - Vector v=new Vector(); - for (Enumeration e=clients.elements();e.hasMoreElements();) { - try { - IvyClient ic=(IvyClient)e.nextElement(); - if (ic!=null) v.addElement(ic); - } catch (ArrayIndexOutOfBoundsException _) { - continue; - } - } + public final Vector<IvyClient> getIvyClients() { + Vector<IvyClient> v = new Vector<IvyClient>(); + for (IvyClient ic : clients.values() ) if (ic != null) v.addElement(ic); return v; } /** - * gives a list of IvyClient with the name given in parameter + * gives a list of IvyClient with the name given in parameter. * * @param name The name of the Ivy agent you're looking for + * @return a vector of IvyClients */ - public Vector getIvyClientsByName(String name) { - Vector v=new Vector(); + public final Vector<IvyClient> getIvyClientsByName(final String name) { + Vector<IvyClient> v = new Vector<IvyClient>(); String icname; - for (Enumeration e=clients.elements();e.hasMoreElements();) { - try { - IvyClient ic = (IvyClient)e.nextElement(); - if ( (ic==null)||((icname=ic.getApplicationName())==null) ) break; - if (icname.compareTo(name)==0) v.addElement(ic); - } catch (ArrayIndexOutOfBoundsException _ ) { - continue; - } + for (IvyClient ic : clients.values() ) { + if ( (ic == null)||((icname = ic.getApplicationName()) == null) ) break; + if (icname.compareTo(name) == 0) v.addElement(ic); } return v; } /** - * returns the domain bus + * returns the domain bus. * * @param domainbus if non null, returns the argument * @return It returns domainbus, if non null, * otherwise it returns the IVYBUS property if non null, otherwise it * returns Ivy.DEFAULT_DOMAIN */ - public static String getDomain(String domainbus) { - if ( domainbus == null ) domainbus = System.getProperty("IVYBUS"); - if ( domainbus == null ) domainbus = DEFAULT_DOMAIN; - return domainbus; + public static final String getDomain(final String domainbus) { + String db = null; + db = domainbus; + if ( db == null ) db = System.getProperty("IVYBUS"); + if ( db == null ) db = DEFAULT_DOMAIN; + return db; } /** - * returns the domain bus + * returns the domain bus. * * @since 1.2.8 * @param progname The name of your program, for error message @@ -732,37 +782,35 @@ public class Ivy implements Runnable { * @return returns the domain bus, ascending priority : ivy default bus, IVY_BUS * property, -b domain on the command line */ - public static String getDomainArgs(String progname, String[] args) { - Getopt opt = new Getopt(progname,args,"b:"); + public static final String getDomainArgs(final String progname, final String[] args) { + Getopt opt = new Getopt(progname , args , "b:"); int c; - if ( ((c=opt.getopt())!=-1) && c=='b' ) return opt.getOptarg(); + if ( ((c = opt.getopt()) != -1) && c == 'b' ) return opt.getOptarg(); return getDomain(null); } /** - * returns a "wana be unique" ID to make requests on the bus + * returns a "wana be unique" ID to make requests on the bus. * * @since 1.2.8 * @return returns a string wich is meant to be noisy enough to be unique */ - public String getWBUId() { - return "ID<"+appName+myserial+":"+nextId()+":"+generator.nextInt()+">"; + public final String getWBUId() { + return "ID<" + appName + myserial + ":" + nextId() + ":" + generator.nextInt() + ">"; } private synchronized long nextId() { return current++; } /** - * prints a human readable representation of the list of domains + * prints a human readable representation of the list of domains. * * @since 1.2.9 */ public String domains(String toparse) { - String s=""; + StringBuffer s = new StringBuffer(); Ivy.Domain[] d = parseDomains(toparse); - for (int index=0;index<d.length;index++) { - s+=d[index].getDomainaddr()+":"+d[index].getPort()+" "; - } - return s; + for (Ivy.Domain dd : d) s.append(dd.getDomainaddr() + ":" + dd.getPort() + " "); + return s.toString(); } /////////////////////////////////////////////////////////////////: // @@ -770,57 +818,74 @@ public class Ivy implements Runnable { // /////////////////////////////////////////////////////////////////: - protected IvyClient createIvyClient(Socket s,int port, boolean domachin) throws IOException { - return new IvyClient(this,s,port,domachin); + protected IvyClient createIvyClient(Socket s , int port, boolean domachin) throws IOException { + setStarting(true); // this one will stop when the client has finished starting + IvyClient i = new IvyClient(this , s , port , domachin); + i.doStart(); + return i; } - protected synchronized void addClient(IvyClient c) { - if (clients==null||c==null) return; - synchronized (clients) { - clients.put(c.getClientKey(),c); - traceDebug("added "+c+" in clients: "+getClientNames(clients)); - } - } protected synchronized void removeClient(IvyClient c) { - synchronized (clients) { + synchronized(lock) { clients.remove(c.getClientKey()); - traceDebug("removed "+c+" from clients: "+getClientNames(clients)); + traceDebug("removed " + c + " from clients: " + getClientNames(clients)); } } - protected void addHalf(IvyClient c) { - synchronized(half){half.put(c.getClientKey(),c);} - traceDebug("added "+c+" in half: "+getClientNames(half)); + protected synchronized void handShake(IvyClient c) { + synchronized(lock) { + removeHalf(c); + if (clients == null||c == null) return; + // TODO check if it's not already here ! + IvyClient peer = searchPeer(c); + if ((peer == null) || peer.distanceTo(c)>0 ){ + clients.put(c.getClientKey() , c); + setStarting(false); + traceDebug("added " + c + " in clients: " + getClientNames(clients)); + } else { + traceDebug("not adding "+c+" in clients, double connexion detected, removing lowest one"); + try { + c.close(false); + } catch (IOException ioe) { + // TODO + } + } + } } - protected void removeHalf(IvyClient c) { - if (half==null||c==null) return; - synchronized(half){half.remove(c.getClientKey());} - traceDebug("removed "+c+" from half: "+getClientNames(half)); + protected synchronized void addHalf(IvyClient c) { + synchronized(lock){ half.put(c.getClientKey() , c); } + traceDebug("added " + c + " in half: " + getClientNames(half)); } - private boolean shouldIleave(IvyClient ic) { - traceDebug("looking for "+ic+" in "+getClientNames(half)+" and "+getClientNames(clients)); - IvyClient peer=searchPeer(ic); - if (peer==null) return false; - boolean shoulda=peer.compareTo(ic)>0; - traceDebug(ic+" "+ic.toStringExt()+((shoulda)?" must leave ":" must not leave")); - traceDebug(peer+" "+peer.toStringExt()+((!shoulda)?" must leave ":" must not leave")); + protected synchronized void removeHalf(IvyClient c) { + synchronized(lock) { + if (half == null||c == null) return; + half.remove(c.getClientKey()); + } + traceDebug("removed " + c + " from half: " + getClientNames(half)); + } + + /* + private synchronized boolean shouldIleave(IvyClient ic) { + traceDebug("looking for " + ic + " in " + getClientNames(half) + " and " + getClientNames(clients)); + IvyClient peer = searchPeer(ic); + if (peer == null) return false; + boolean shoulda = peer.distanceTo(ic)>0; + traceDebug(ic + " " + ic.toStringExt() + ((shoulda) ? " must leave " : " must not leave")); + traceDebug(peer + " " + peer.toStringExt() + ((!shoulda) ? " must leave " : " must not leave")); return shoulda; } + */ - private IvyClient searchPeer(IvyClient ic) { - IvyClient peer; - for (Enumeration e=half.elements();e.hasMoreElements();) { - peer=(IvyClient)e.nextElement(); - if ((peer!=null)&&(peer.equals(ic))) return peer; - } - synchronized (clients) { - for (Enumeration e=clients.elements();e.hasMoreElements();){ - peer=(IvyClient)e.nextElement(); - if ((peer!=null)&&(peer.equals(ic))) return peer; - } + private synchronized IvyClient searchPeer(IvyClient ic) { + synchronized(lock) { + //for (Enumeration<IvyClient> e = half.elements(); e.hasMoreElements(); ) { +// peer = e.nextElement(); +// if ((peer != null)&&(peer.equals(ic))) return peer; + // } + for (IvyClient peer : clients.values()) if ((peer != null)&&(peer.equals(ic))) return peer; } return null; } @@ -830,78 +895,80 @@ public class Ivy implements Runnable { */ public void run() { traceDebug("service thread started"); // THREADDEBUG - Thread thisThread=Thread.currentThread(); - while(thisThread==serverThread){ + Thread thisThread = Thread.currentThread(); + Socket socket = null; + while ( thisThread == serverThread ){ try { - Socket socket = app.accept(); - if ((thisThread!=serverThread)||stopped) break; // early disconnexion - createIvyClient(socket,0,true); // the peer called me + synchronized (this) { + //System.out.println("DEBUG stopped: "+stopped); + if ((thisThread != serverThread)||stopped) break; // early disconnexion + } + synchronized (lockApp) { + socket = app.accept(); // TODO I can't synchronize on (this) in the run + } + synchronized (this) { + if ((thisThread != serverThread)||stopped) break; // early disconnexion + createIvyClient(socket , 0 , true); // the peer called me + } } catch (InterruptedIOException ie) { - // traceDebug("server socket was interrupted. good"); - if (thisThread!=serverThread) break; + // traceDebug("server socket was interrupted. good"); + if (thisThread != serverThread) break; } catch( IOException e ) { - if (serverThread==thisThread) { + if (serverThread == thisThread) { traceDebug("Error IvyServer exception: " + e.getMessage()); - System.out.println("Ivy server socket reader caught an exception " + e.getMessage()); - System.out.println("this is probably a bug in your JVM ! (e.g. blackdown jdk1.1.8 linux)"); - System.exit(0); - } else { - traceDebug("my server socket has been closed"); - } + System.out.println("Ivy server socket reader caught an exception " + e.getMessage()); + System.out.println("this is probably a bug in your JVM ! (e.g. blackdown jdk1.1.8 linux)"); + throw new RuntimeException(); + } else { + traceDebug("my server socket has been closed"); + } } } traceDebug("service thread stopped"); // THREADDEBUG } - protected String getWatcherId() { return watcherId ; } + protected String getAppName() { return appName; } + protected int getAppPort() { return applicationPort; } + protected String getReadyMessage() { return ready_message; } + protected boolean getProtectNewlines() { return doProtectNewlines; } + + protected void setStarting(boolean s) { + synchronized(readyToSend) { + traceDebug("setStarting "+s); + starting = s; + } + } + protected String getWatcherId() { return watcherId; } - protected int getSerial() { return myserial ; } + protected int getSerial() { return myserial; } private void traceDebug(String s){ - if (debug) System.out.println("-->Ivy["+myserial+"]<-- "+s); + if (debug) System.out.println("-->Ivy[" + myserial + "]<-- " + s); } // stuff to guarantee that all the treads have left synchronized void registerThread(Thread t) { sendThreads.addElement(t); } synchronized void unRegisterThread(Thread t) { sendThreads.removeElement(t); } synchronized Thread getOneThread() { - if (sendThreads.size()==0) return null; + if (sendThreads.size() == 0) return null; return (Thread) sendThreads.firstElement(); } // a small private method for debbugging purposes - private String getClientNames(Hashtable t) { - String s = "("; - for (Enumeration e=t.elements();e.hasMoreElements();){ - IvyClient ic = (IvyClient)e.nextElement(); - if (ic!=null) s+=ic.getApplicationName()+","; - } - return s+")"; - } - - private class Domain { - String domainaddr; - int port; - public Domain(String domainaddr,int port) {this.domainaddr=domainaddr;this.port=port;} - public String toString() {return domainaddr+":"+port;} + private String getClientNames(Hashtable<Integer , IvyClient> t) { + StringBuffer s = new StringBuffer(); + s.append("("); + for (IvyClient ic : t.values() ) if (ic != null) s.append(ic.getApplicationName() + ","); + s.append(")"); + return s.toString(); + } + + private static class Domain { + private String domainaddr; + private int port; + public Domain(String ddomainaddr , int dport) { this.domainaddr = ddomainaddr;this.port = dport; } + public String toString() { return domainaddr + ":" + port; } public String getDomainaddr() { return domainaddr; } public int getPort() { return port; } } - // test. Normally, running java fr.dgac.ivy.Ivy should stop in 2.3 seconds :) - public static void main(String[] args) { - Ivy bus = new Ivy("Test Unitaire","TU ready",null); - try { - bus.start(Ivy.getDomainArgs("IvyTest",args)); - System.out.println("waiting 5 seconds for a coucou"); - System.out.println(((bus.waitForMsg("^coucou",5000))!=null)?"coucou received":"coucou not received"); - System.out.println("waiting 5 seconds for IvyProbe"); - System.out.println(((bus.waitForClient("IVYPROBE",5000))!=null)?"Ivyprobe joined the bus":"nobody came"); - System.out.println("random values: "+bus.getWBUId()+", "+bus.getWBUId()+", "+bus.getWBUId()); - bus.stop(); - } catch (IvyException ie) { - System.out.println("Ivy main test error"); - ie.printStackTrace(); - } - } - } // class Ivy |