diff options
Diffstat (limited to 'src')
-rwxr-xr-x | src/Ivy.java | 358 | ||||
-rwxr-xr-x | src/IvyApplicationAdapter.java | 4 | ||||
-rwxr-xr-x | src/IvyApplicationListener.java | 8 | ||||
-rw-r--r-- | src/IvyBindListener.java | 30 | ||||
-rwxr-xr-x | src/IvyClient.java | 269 | ||||
-rw-r--r-- | src/IvyDaemon.java | 6 | ||||
-rwxr-xr-x | src/IvyWatcher.java | 91 | ||||
-rw-r--r-- | src/Makefile | 25 | ||||
-rw-r--r-- | src/Probe.java | 73 | ||||
-rw-r--r-- | src/SelfIvyClient.java | 144 |
10 files changed, 768 insertions, 240 deletions
diff --git a/src/Ivy.java b/src/Ivy.java index b11d546..e3cc0bb 100755 --- a/src/Ivy.java +++ b/src/Ivy.java @@ -7,10 +7,28 @@ *<pre> *Ivy bus = new Ivy("Dummy agent","ready",null); *bus.bindMsg("(.*)",myMessageListener); - *bus.start(null); + *bus.start(getDomain(null)); *</pre> * * CHANGELOG: + * 1.2.4: + * - added an accessor for doSendToSelf + * - waitForMsg() and waitForClient() to make the synchronization with + * other Ivy agents easier + * - with the bindAsyncMsg() to subscribe and perform each callback in a + * new Thread + * - bindMsg(regexp,messagelistener,boolean) allow to subscribe with a + * synchrone/asynch exectution + * - API change, IvyException raised when \n or \0x3 are present in bus.sendMsg() + * - bindListener are now handled + * - removeApplicationListener can throw IvyException + * - bus.start(null) now starts on getDomain(null), first the IVYBUS + * property, then the DEFAULT_DOMAIN, 127:2010 + * - bindMsg() now throws an IvyException if the regexp is invalid !!! + * BEWARE, this can impact lots of programs ! (fixes J007) + * - no more includes the "broadcasting on " in the domain(String) method + * - new function sendToSelf(boolean) allow us to send messages to + * ourselves * 1.2.3: * - adds a IVYBUS property to propagate the domain once set. This way, * children forked through Ivy java can inherit from the current value. @@ -19,7 +37,7 @@ * added the String domains(String d) function, in order to display the * domain list * 1.2.1: - * bus.start(null) now starts on DEFAULT_DOMAIN + * bus.start(null) now starts on DEFAULT_DOMAIN. ( correction 1.2.4 This was not true.) * added the getDomains in order to correctly display the domain list * checks if the serverThread exists before interrupting it * no has unBindMsg(String) @@ -44,11 +62,12 @@ public class Ivy implements Runnable { /** * the name of the application on the bus */ - String appName; + protected String appName; /** * the protocol version number */ - public static final int PROCOCOLVERSION = 3 ; + public static final int PROTOCOLVERSION = 3 ; + public static final int PROTOCOLMINIMUM = 3 ; /** * the port for the UDP rendez vous, if none is supplied */ @@ -61,24 +80,23 @@ public class Ivy implements Runnable { * the library version, useful for development purposes only, when java is * invoked with -DIVY_DEBUG */ - public static final String libVersion ="1.2.3"; + public static final String libVersion ="1.2.4"; private boolean debug; - private static int serial=0; /* an unique ID for each regexp */ private static int clientSerial=0; /* an unique ID for each IvyClient */ private ServerSocket app; private Vector watchers; private volatile Thread serverThread; // to ensure quick communication of the end - private Hashtable callbacks = new Hashtable(); private Hashtable clients = new Hashtable(); private Vector ivyApplicationListenerList = new Vector(); - private String messages_classes[] = null; - private boolean sendToSelf = false ; + private Vector ivyBindListenerList = new Vector(); + private Vector sendThreads = new Vector(); private boolean stopped = false; - int applicationPort; /* Application port number */ - Hashtable regexp_out = new Hashtable(); - String ready_message = null; - + protected int applicationPort; /* Application port number */ + protected String ready_message = null; + protected boolean doProtectNewlines = false ; + private boolean doSendToSelf = false ; + protected SelfIvyClient selfIvyClient ; public final static int TIMEOUTLENGTH = 3000; /** @@ -96,6 +114,49 @@ public class Ivy implements Runnable { ready_message = message; debug = (System.getProperty("IVY_DEBUG")!=null); if ( appcb != null ) ivyApplicationListenerList.addElement( appcb ); + selfIvyClient=new SelfIvyClient(this,name); + } + + /** + * Waits for a message to be received + * + * @since 1.2.4 + * @param regexp the message we're waiting for to continue the main thread. + * @param timeout in millisecond, 0 if infinite + * @return the IvyClient who sent the message, or null if the timeout is + * reached + */ + public IvyClient waitForMsg(String regexp,int timeout) throws IvyException { + Waiter w = new Waiter(timeout); + int re = bindMsg(regexp,w); + IvyClient ic=w.waitFor(); + unBindMsg(re); + return ic; + } + + /** + * Waits for an other IvyClient to join the bus + * + * @since 1.2.4 + * @param name the name of the client we're waiting for to continue the main thread. + * @param timeout in millisecond, 0 if infinite + * @return the first IvyClient with the name or null if the timeout is + * reached + */ + public IvyClient waitForClient(String name,int timeout) throws IvyException { + IvyClient ic; + if (name==null) throw new IvyException("null name given to waitForClient"); + // first check if client with the same name is on the bus + for (Enumeration e=clients.elements();e.hasMoreElements();) { + ic = (IvyClient)e.nextElement(); + if (name.compareTo(ic.getApplicationName())==0) return ic; + } + // if not enter the waiting loop + WaiterClient w = new WaiterClient(name,timeout); + int i = addApplicationListener(w); + ic=w.waitForClient(); + removeApplicationListener(i); + return ic; } /** @@ -113,7 +174,7 @@ public class Ivy implements Runnable { * */ public void start(String domainbus) throws IvyException { - if (domainbus==null) domainbus=DEFAULT_DOMAIN; + if (domainbus==null) domainbus=getDomain(null); Properties sysProp = System.getProperties(); sysProp.put("IVYBUS",domainbus); try { @@ -123,7 +184,7 @@ public class Ivy implements Runnable { } catch (IOException e) { throw new IvyException("can't open TCP service socket " + e ); } - traceDebug("lib: "+libVersion+" protocol: "+PROCOCOLVERSION+" TCP service open on port "+applicationPort); + traceDebug("lib: "+libVersion+" protocol: "+PROTOCOLVERSION+" TCP service open on port "+applicationPort); watchers = new Vector(); Domain[] d = parseDomains(domainbus); @@ -144,7 +205,11 @@ public class Ivy implements Runnable { int index=0; while ( st.hasMoreTokens()) { String s = st.nextToken() ; - d[index++]=new Domain(IvyWatcher.getDomain(s),IvyWatcher.getPort(s)); + try { + d[index++]=new Domain(IvyWatcher.getDomain(s),IvyWatcher.getPort(s)); + } catch (IvyException ie) { + // do nothing + } } return d; } @@ -177,31 +242,83 @@ public class Ivy implements Runnable { } /** - * Toggles the sending of messages to oneself + * Toggles the sending of messages to oneself, the remote client's + * IvyMessageListeners are processed first, and ourself afterwards. + * @param boolean true if you want to send the message to yourself. Default + * is false + * @since 1.2.4 + */ + public void sendToSelf(boolean b) {doSendToSelf=b;} + + /** + * @param boolean do I send message to myself ? + * @since 1.2.4 + */ + public boolean isSendToSelf() {return doSendToSelf;} + + /** + * returns our self IvyClient. + * @since 1.2.4 + */ + public IvyClient getSelfIvyClient() {return selfIvyClient;} + + /** + * Toggles the encoding/decoding of messages to prevent bugs related to the + * presence of a "\n" + * @param boolean true if you want to send the message to yourself. Default + * is false. + * @since 1.2.5? + * The default escape character is a ESC 0x1A + */ + private void protectNewlines(boolean b) {doProtectNewlines=b;} + + /** + * Performs a pattern matching according to everyone's regexps, and sends + * the results to the relevant ivy agents sequentially + * + * @param message A String which will be compared to the regular + * expressions of the different clients + * @return the number of messages actually sent + */ + public int sendMsg(String message) throws IvyException { + return sendMsg(message,false); + } + + /** + * Performs a pattern matching according to everyone's regexps, and sends + * the results to the relevant ivy agents, using as many threads as needed. * + * @since 1.2.4 + * @param message A String which will be compared to the regular + * expressions of the different clients + * @return always returns -1 */ - public void sendToSelf(boolean b) {sendToSelf=b;} + public int sendAsyncMsg(String message,boolean async) throws IvyException { + return sendMsg(message,true); + } /** * Performs a pattern matching according to everyone's regexps, and sends * the results to the relevant ivy agents. - * <p><em>There is one thread for each client connected, we could also - * create another thread each time we send a message.</em> + * + * @since 1.2.4 * @param message A String which will be compared to the regular * expressions of the different clients - * @return the number of messages actually sent + * @param async if true, the sending will be performed in a separate thread, + * default is false + * @return if async is true, always returns -1, else returns the number of messages actually sent */ - public int sendMsg( String message ) { + public int sendMsg(String message,boolean async) throws IvyException { int count = 0; - // an alternate implementation would one sender thread per client - // instead of one for all the clients. It might be a performance issue + if (doProtectNewlines) + message=IvyClient.encode(message); + else if ( (message.indexOf(IvyClient.newLineChar)!=-1)||(message.indexOf(IvyClient.endArgChar)!=-1)) + throw new IvyException("newline character not allowed in Ivy messages"); for ( Enumeration e=clients.elements();e.hasMoreElements();) { IvyClient client = (IvyClient)e.nextElement(); - count += client.sendMsg( message ); - } - if (sendToSelf) { - // TODO + count += client.sendMsg(message, async); } + if (doSendToSelf) count+=selfIvyClient.sendSelfMsg(message); return count; } @@ -221,17 +338,57 @@ public class Ivy implements Runnable { * interface, on the AWT/Swing framework * @return the id of the regular expression */ - public int bindMsg(String regexp, IvyMessageListener callback ) { - // creates a new binding (regexp,callback) - Integer key = new Integer(serial++); - regexp_out.put(key,regexp); - callbacks.put(key,callback ); + public int bindMsg(String sregexp, IvyMessageListener callback ) throws IvyException { + return bindMsg(sregexp,callback,false); + } + + /** + * Subscribes to a regular expression with asyncrhonous callback execution. + * + * Same as bindMsg, except that the callback will be executed in a separate + * thread each time. + * WARNING : there is no way to predict the order of execution + * of the * callbacks, i.e. a message received might trigger a callback before + * another one sent before + * + * @since 1.2.4 + * @param regexp a perl regular expression, groups are done with parenthesis + * @param callback any objects implementing the IvyMessageListener + * interface, on the AWT/Swing framework + * @return the int ID of the regular expression. + */ + public int bindAsyncMsg(String sregexp, IvyMessageListener callback ) throws IvyException { + return bindMsg(sregexp,callback,true); + } + + /** + * Subscribes to a regular expression. + * + * The callback will be executed with + * the saved parameters of the regexp as arguments when a message will sent + * by another agent. A program <em>doesn't</em> receive its own messages. + * <p>Example: + * <br>the Ivy agent A performs <pre>b.bindMsg("^Hello (*)",cb);</pre> + * <br>the Ivy agent B performs <pre>b2.sendMsg("Hello world");</pre> + * <br>a thread in A will uun the callback cb with its second argument set + * to a array of String, with one single element, "world" + * @since 1.2.4 + * @param regexp a perl regular expression, groups are done with parenthesis + * @param callback any objects implementing the IvyMessageListener + * interface, on the AWT/Swing framework + * @param async if true, each callback will be run in a separate thread, + * default is false + * @return the id of the regular expression + */ + public int bindMsg(String sregexp, IvyMessageListener callback,boolean async ) throws IvyException { + // adds the regexp to our collection in selfIvyClient + int key=selfIvyClient.bindMsg(sregexp,callback,async); // notifies the other clients this new regexp for (Enumeration e=clients.elements();e.hasMoreElements();) { IvyClient c = (IvyClient)e.nextElement(); - c.sendRegexp(key.intValue(),regexp); + c.sendRegexp(key,sregexp); } - return key.intValue(); + return key; } /** @@ -240,14 +397,9 @@ public class Ivy implements Runnable { * @param id the id of the regular expression, returned when it was bound */ public void unBindMsg(int id) throws IvyException { - Integer key = new Integer(id); - if ( ( regexp_out.remove(key) == null ) - || (callbacks.remove(key) == null ) ) { - throw new IvyException("client wants to remove an unexistant regexp "+id); - } - for (Enumeration e=clients.elements();e.hasMoreElements();) { + selfIvyClient.unBindMsg(id); + for (Enumeration e=clients.elements();e.hasMoreElements();) ((IvyClient)e.nextElement()).delRegexp(id ); - } } /** @@ -258,18 +410,31 @@ public class Ivy implements Runnable { * @param String the string for the regular expression */ public boolean unBindMsg(String re) { - for (Enumeration e=regexp_out.keys();e.hasMoreElements();) { - Integer k = (Integer)e.nextElement(); - if ( ((String)regexp_out.get(k)).compareTo(re) == 0) { - try { - unBindMsg(k.intValue()); - } catch (IvyException ie) { - return false; - } - return true; - } + return selfIvyClient.unBindMsg(re); + } + + /** + * adds a bind listener to a bus + * @param callback is an object implementing the IvyBindListener interface + * @return the id of the bind listener, useful if you wish to remove it later + * @since 1.2.4 + */ + public int addBindListener(IvyBindListener callback){ + ivyBindListenerList.addElement(callback); + return ivyBindListenerList.indexOf(callback); + } + + /** + * removes a bind listener + * @param id the id of the bind listener to remove + * @since 1.2.4 + */ + public void removeBindListener(int id) throws IvyException { + try { + ivyBindListenerList.removeElementAt(id); + } catch (ArrayIndexOutOfBoundsException aie) { + throw new IvyException(id+" is not a valid Id"); } - return false; } /** @@ -281,32 +446,49 @@ public class Ivy implements Runnable { */ public int addApplicationListener(IvyApplicationListener callback){ ivyApplicationListenerList.addElement(callback); - int id = ivyApplicationListenerList.indexOf( callback ); - return id; + return ivyApplicationListenerList.indexOf( callback ); } /** * removes an application listener * @param id the id of the application listener to remove */ - public void removeApplicationListener(int id){ - ivyApplicationListenerList.removeElementAt(id); + public void removeApplicationListener(int id) throws IvyException { + try { + ivyApplicationListenerList.removeElementAt(id); + } catch (ArrayIndexOutOfBoundsException aie) { + throw new IvyException(id+" is not a valid Id"); + } } /* invokes the application listeners upon arrival of a new Ivy client */ - public void connect(IvyClient client){ + protected void clientConnects(IvyClient client){ for ( int i = 0 ; i < ivyApplicationListenerList.size(); i++ ) { ((IvyApplicationListener)ivyApplicationListenerList.elementAt(i)).connect(client); } } - /* invokes the application listeners upon arrival of a new Ivy client */ - void disconnectReceived(IvyClient client){ + /* invokes the application listeners upon the departure of an Ivy client */ + protected void clientDisconnects(IvyClient client){ for ( int i = 0 ; i < ivyApplicationListenerList.size(); i++ ) { ((IvyApplicationListener)ivyApplicationListenerList.elementAt(i)).disconnect(client); } } + /* invokes the bind listeners */ + protected void regexpReceived(IvyClient client,int id,String sregexp){ + for ( int i = 0 ; i < ivyBindListenerList.size(); i++ ) { + ((IvyBindListener)ivyBindListenerList.elementAt(i)).bindPerformed(client,id,sregexp); + } + } + + /* invokes the bind listeners */ + protected void regexpDeleted(IvyClient client,int id,String sregexp){ + for ( int i = 0 ; i < ivyBindListenerList.size(); i++ ) { + ((IvyBindListener)ivyBindListenerList.elementAt(i)).unbindPerformed(client,id,sregexp); + } + } + /* * removes a client from the list */ @@ -316,9 +498,9 @@ public class Ivy implements Runnable { * invokes the application listeners when we are summoned to die * then stops */ - public void dieReceived(IvyClient client, int id){ + public void dieReceived(IvyClient client, int id,String message){ for ( int i=0 ;i<ivyApplicationListenerList.size();i++ ) { - ((IvyApplicationListener)ivyApplicationListenerList.elementAt(i)).die(client,id); + ((IvyApplicationListener)ivyApplicationListenerList.elementAt(i)).die(client,id,message); } } @@ -360,21 +542,13 @@ public class Ivy implements Runnable { // /////////////////////////////////////////////////////////////////: - synchronized void addClient(Socket socket,boolean peerCalling) throws IOException { + synchronized void addClient(Socket socket,boolean peerCalling,int protocolVersion) throws IOException { if (stopped) return; - IvyClient client = new IvyClient( this, socket,peerCalling,new Integer(clientSerial++)); + IvyClient client = new IvyClient( this, socket,peerCalling,new Integer(clientSerial++),protocolVersion); clients.put(client.getClientKey(),client); traceDebug(getClientNames()); } - void callCallback(IvyClient client, Integer key, String[] tab) throws IvyException { - IvyMessageListener callback=(IvyMessageListener)callbacks.get(key); - if (callback==null){ - throw new IvyException("(callCallback) Not regexp matching id "+key.intValue()); - } - callback.receive( client, tab); - } - private static String[] myTokenize(String s,String separator) { int index=0, last=0, length=s.length(); Vector v = new Vector(); @@ -402,20 +576,6 @@ public class Ivy implements Runnable { } - /** - * checks the "validity" of a regular expression. - */ - boolean CheckRegexp( String exp ) { - boolean regexp_ok = true; - if ( exp.startsWith( "^" )&&messages_classes!=null) { - regexp_ok=false; - for (int i=0 ; i < messages_classes.length;i++) { - if (messages_classes[i].equals(exp.substring(1))) return true; - } - } - return regexp_ok; - } - /* * prevents two clients from connecting to each other at the same time * there might still be a lingering bug here, that we could avoid with the @@ -433,14 +593,14 @@ public class Ivy implements Runnable { /* * the service socket thread reader main loop */ - public void run() { + public void run() { // System.out.println("Ivy service Thread started"); // THREADDEBUG Thread thisThread=Thread.currentThread(); while(thisThread==serverThread){ try { Socket socket = app.accept(); if ((thisThread!=serverThread)||stopped) break; // early disconnexion - addClient(socket,true); // the peer called me + addClient(socket,true,0); // the peer called me TODO I don't know his protocol version } catch (InterruptedIOException ie) { if (thisThread!=serverThread) break; } catch( IOException e ) { @@ -459,6 +619,21 @@ public class Ivy implements Runnable { private void traceDebug(String s){ if (debug) System.out.println("-->ivy<-- "+s); } + synchronized void registerThread(Thread t) { sendThreads.add(t); } + synchronized void unRegisterThread(Thread t) { sendThreads.remove(t); } + synchronized Thread getOneThread() { + if (sendThreads.size()==0) return null; + return (Thread) sendThreads.firstElement(); + } + + void waitForAll() { + Thread t; + System.out.println("DEVELOPMENT WAITFORALL sendThreads size : " + sendThreads.size()); + try { while ((t=getOneThread())!=null) { t.join(); } } + catch (InterruptedException ie) { System.out.println("waitForAll Interrupted"); } + System.out.println("DEVELOPMENT END WAITFORALL"); + } + /* a small private method for debbugging purposes */ private String getClientNames() { String s = appName+" clients are: "; @@ -469,7 +644,7 @@ public class Ivy implements Runnable { } public String domains(String toparse) { - String s="broadcasting on "; + String s=""; Ivy.Domain[] d = parseDomains(toparse); for (int index=0;index<d.length;index++) { s+=d[index].getDomainaddr()+":"+d[index].getPort()+" "; @@ -495,7 +670,10 @@ public class Ivy implements Runnable { Ivy bus = new Ivy("Test Unitaire","TU ready",null); try { bus.start(null); - try { Thread.sleep(2000); } catch (InterruptedException ie) { } + System.out.println("waiting 5 seconds for a coucou"); + System.out.println(((bus.waitForMsg("^coucou",5000))!=null)?"coucou received":"coucou not received"); + System.out.println("waiting 5 seconds for IvyProbe"); + System.out.println(((bus.waitForClient("IVYPROBE",5000))!=null)?"Ivyprobe joined the bus":"nobody came"); bus.stop(); } catch (IvyException ie) { ie.printStackTrace(); diff --git a/src/IvyApplicationAdapter.java b/src/IvyApplicationAdapter.java index 376ee9a..0a82486 100755 --- a/src/IvyApplicationAdapter.java +++ b/src/IvyApplicationAdapter.java @@ -5,7 +5,7 @@ * @author Yannick Jestin * @author <a href="http://www.tls.cena.fr/products/ivy/">http://www.tls.cena.fr/products/ivy/</a> * - * an ApplicatinListener class for handling application-level request on the + * an ApplicationListener class for handling application-level request on the * Ivy bus. The methods in this class are empty. This class exists as a * convenience for implementing a subset of the methods of the * applicationlistener. See the AWT 1.1 framework for further information on @@ -19,6 +19,6 @@ package fr.dgac.ivy; public abstract class IvyApplicationAdapter implements IvyApplicationListener { public void connect( IvyClient client ) { } public void disconnect( IvyClient client ) { } - public void die( IvyClient client, int id ) { } + public void die( IvyClient client, int id, String msgarg) { } public void directMessage( IvyClient client, int id,String msgarg ) {} } diff --git a/src/IvyApplicationListener.java b/src/IvyApplicationListener.java index bdfb12c..456a85f 100755 --- a/src/IvyApplicationListener.java +++ b/src/IvyApplicationListener.java @@ -9,6 +9,11 @@ package fr.dgac.ivy; * * The ApplicatinListenr for receiving application level events on the Ivy * bus: connexion, disconnexion, direct messages or requests to quit. + * + * Changelog: + * 1.2.4 + * - sendDie now requires a String argument ! It is MANDATORY, and could + * impact your implementations ! */ public interface IvyApplicationListener extends java.util.EventListener { @@ -26,13 +31,12 @@ public interface IvyApplicationListener extends java.util.EventListener { * invoked when a peer request us to leave the bus * @param client the peer */ - public abstract void die(IvyClient client, int id); + public abstract void die(IvyClient client, int id,String msgarg); /** * invoked when a peer sends us a direct message * @param client the peer * @param id * @param msgarg the message itself - * this is not yet implemented in java. I believe it has no real use :) */ public abstract void directMessage( IvyClient client, int id,String msgarg ); } diff --git a/src/IvyBindListener.java b/src/IvyBindListener.java new file mode 100644 index 0000000..96096f2 --- /dev/null +++ b/src/IvyBindListener.java @@ -0,0 +1,30 @@ +package fr.dgac.ivy; + +/** + * this interface specifies the methods of a BindListener + * + * @author Yannick Jestin + * @author <a href="http://www.tls.cena.fr/products/ivy/">http://www.tls.cena.fr/products/ivy/</a> + * + * Changelog: + */ + +public interface IvyBindListener extends java.util.EventListener { + + /** + * invoked when a Ivy Client performs a bind + * @param client the peer + * @param int the regexp ID + * @param regexp the regexp + */ + public abstract void bindPerformed(IvyClient client,int id, String regexp); + + /** + * invoked when a Ivy Client performs a unbind + * @param client the peer + * @param int the regexp ID + * @param regexp the regexp + */ + public abstract void unbindPerformed(IvyClient client,int id,String regexp); + +} diff --git a/src/IvyClient.java b/src/IvyClient.java index 8aad199..b69c8d3 100755 --- a/src/IvyClient.java +++ b/src/IvyClient.java @@ -10,6 +10,18 @@ * created for each remote client. * * CHANGELOG: + * 1.2.4: + * - sendBuffer goes synchronized + * - sendMsg now has a async parameter, allowing the use of threads to + * delegate the sending of messages + * - API change, IvyException raised when \n or \0x3 are present in bus.sendMsg() + * - breaks the connexion with faulty Ivy clients (either protocol or invalid + * regexps, closes bug J007 (CM)) + * - sendDie now always requires a reason + * - invokes the disconnect applicationListeners at the end of the run() + * loop. + * closes Bug J006 (YJ) + * - changed the access of some functions ( sendRegexp, etc ) to protected * 1.2.3: * - silently stops on InterruptedIOException. * - direct Messages @@ -50,54 +62,64 @@ public class IvyClient implements Runnable { final static int DelRegexp = 4;/* the peer removes one of his regex */ final static int EndRegexp = 5;/* no more regexp in the handshake */ final static int SchizoToken = 6; /* avoid race condition in concurrent connexions */ + /* SchizoToken is aka BeginRegexp in other implementations */ final static int DirectMsg = 7;/* the peer sends a direct message */ final static int Die = 8; /* the peer wants us to quit */ final static int Ping = 9; /* checks the presence of the other */ final static int Pong = 10; /* checks the presence of the other */ - final static String MESSAGE_TERMINATOR = "\n"; /* the next protocol will use \r */ final static String StartArg = "\u0002";/* begin of arguments */ final static String EndArg = "\u0003"; /* end of arguments */ + final static String escape ="\u001A"; + final static char escapeChar = escape.charAt(0); + final static char endArgChar = EndArg.charAt(0); + final static char newLineChar = '\n'; - - private static boolean debug = (System.getProperty("IVY_DEBUG")!=null) ; + // private variables + private String messages_classes[] = null; private Ivy bus; private Socket socket; private BufferedReader in; private OutputStream out; - private Hashtable regexp_in = new Hashtable(); - private Hashtable regexp_text = new Hashtable(); private int appPort; private boolean peerCalling; private volatile Thread clientThread;// volatile to ensure the quick communication private Integer clientKey ; private static boolean doping = (System.getProperty("IVY_PING")!=null) ; - final static int PINGTIMEOUT = 5000; + private final static int PINGTIMEOUT = 5000; private PINGER pinger; private volatile Thread pingerThread; + private boolean discCallbackPerformed = false; // protected variables String appName; + Hashtable regexps = new Hashtable(); + Hashtable regexpsText = new Hashtable(); + static boolean debug = (System.getProperty("IVY_DEBUG")!=null) ; + int protocol; + + IvyClient(){} - IvyClient(Ivy bus, Socket socket,boolean peerCalling,Integer clientKey) throws IOException { + IvyClient(Ivy bus, Socket socket,boolean peerCalling,Integer clientKey,int protocol) throws IOException { appName = "Unknown"; appPort = 0; this.bus = bus; this.socket = socket; this.peerCalling=peerCalling; this.clientKey=clientKey; + this.protocol=protocol; in = new BufferedReader(new InputStreamReader(socket.getInputStream())); out = socket.getOutputStream(); - Hashtable regexps=bus.regexp_out; + Hashtable tosend=bus.selfIvyClient.regexpsText; // sends our ID, whether we initiated the connexion or not // the ID is the couple "host name,application Port", the host name // information is in the socket itself, the port is not known if we // initiate the connexion send(SchizoToken,bus.applicationPort,bus.appName); // sends our regexps to the peer - for (Enumeration e = regexps.keys(); e.hasMoreElements(); ) { + for (Enumeration e = tosend.keys(); e.hasMoreElements(); ) { Integer ikey = (Integer)e.nextElement(); - sendRegexp( ikey.intValue(),(String)regexps.get(ikey)); + sendRegexp(ikey.intValue(),(String)tosend.get(ikey)); } send( EndRegexp,0,""); // spawns a thread to manage the incoming traffic on this @@ -111,88 +133,138 @@ public class IvyClient implements Runnable { } } + public String toString() { return "IvyClient "+bus.appName+":"+appName; } + /** * returns the name of the remote agent. */ public String getApplicationName() { return appName ; } - Integer getClientKey() { return clientKey ; } - /** * allow an Ivy package class to access the list of regexps at a * given time. * perhaps we should implement a new IvyApplicationListener method to * allow the notification of regexp addition and deletion + * The content is not modifyable because String are not mutable, and cannot + * be modified once they are create. + * @see getRegexpsArray to get a String[] result */ - Enumeration getRegexps() { return regexp_text.elements(); } + public Enumeration getRegexps() { return regexpsText.elements(); } - int getAppPort() { return appPort ; } - - void sendRegexp(int id,String regexp) { - send(AddRegexp,id,regexp); /* perhaps we should perform some checking here */ + /** + * allow an Ivy package class to access the list of regexps at a + * given time. + * @since 1.2.4 + */ + public String[] getRegexpsArray() { + String[] s = new String[regexpsText.size()]; + int i=0; + for (Enumeration e=getRegexps();e.hasMoreElements();) + s[i++]=(String)e.nextElement(); + return s; } - public void delRegexp(int id) {send( DelRegexp,id,"");} - /** * sends a direct message to the peer * @param id the numeric value provided to the remote client * @param message the string that will be match-tested */ - public void sendDirectMsg(int id,String message) { send(DirectMsg,id,message); } + public void sendDirectMsg(int id,String message) throws IvyException { + if ( (message.indexOf(IvyClient.newLineChar)!=-1)||(message.indexOf(IvyClient.endArgChar)!=-1)) + throw new IvyException("newline character not allowed in Ivy messages"); + send(DirectMsg,id,message); + } - /** - * closes the connexion to the peer. - * @param notify should I send Bye message ? - * the thread managing the socket is stopped - */ - void close(boolean notify) throws IOException { + /* closes the connexion to the peer */ + protected void close(boolean notify) throws IOException { + bus.waitForAll(); traceDebug("closing connexion to "+appName); if (doping&&(pinger!=null)) { pinger.stopPinging(); } if (notify) sendBye("hasta la vista"); stopListening(); - // bus.clientDisconnect(this); - socket.close(); // should I also close in and out ? + socket.close(); } /** - * sends the substrings of a message to the peer for each matching regexp. - * @param message the string that will be match-tested - * @return the number of messages sent to the peer + * asks the remote client to leave the bus. + * @param message the message that will be carried */ - int sendMsg( String message ) { + public void sendDie(String message) { send(Die,0,message); } + + /** + * checks the "validity" of a regular expression. + * @param exp the string being a regular expression + * @return true if the regexp is valid + * @since 1.2.4 + */ + public boolean CheckRegexp( String exp ) { + boolean ok = true; + if ( exp.startsWith( "^" )&&messages_classes!=null) { + ok=false; + for (int i=0 ; i < messages_classes.length;i++) { + if (messages_classes[i].equals(exp.substring(1))) return true; + } + } + return ok; + } + + /////////////////////////////////////////////////// + // + // PROTECTED METHODS + // + /////////////////////////////////////////////////// + static String decode(String s) { return s.replace(escapeChar,'\n'); } + static String encode(String s) { return s.replace('\n',escapeChar); } + Integer getClientKey() { return clientKey ; } + int getAppPort() { return appPort ; } + void sendRegexp(int id,String regexp) {send(AddRegexp,id,regexp);} + void delRegexp(int id) {send( DelRegexp,id,"");} + + int sendMsg(String message,boolean async) { + if (async) { + new Sender(message); + return -1; + } else { return sendMsg(message); } + } + + private int sendMsg(String message) { int count = 0; - for (Enumeration e = regexp_in.keys();e.hasMoreElements();) { + for (Enumeration e = regexps.keys();e.hasMoreElements();) { Integer key = (Integer)e.nextElement(); - RE regexp = (RE)regexp_in.get(key); + RE regexp = (RE)regexps.get(key); + int nb = regexp.getNumSubs(); REMatch result = regexp.getMatch(message); - if ( result != null ) { - send(Msg,key,regexp.getNumSubs(),result); - count++; - } + if (result==null) continue; // no match + count++; // match + send(Msg,key,regexp.getNumSubs(),result); } return count; } - - void stopListening() { + + /////////////////////////////////////////////////// + // + // PRIVATE METHODS + // + /////////////////////////////////////////////////// + + /* interrupt the listener thread */ + private void stopListening() { Thread t = clientThread; if (t==null) return; // we can be summoned to quit from two path at a time clientThread=null; t.interrupt(); } - /** + /* * compares two peers the id is the couple (host,service port). - * @param clnt the other peer - * @return true if the peers are similir. This should not happen, it is bad - * © ® (tm) + * true if the peers are similir. This should not happen, it is bad */ - boolean sameClient( IvyClient clnt ) { + protected boolean sameClient( IvyClient clnt ) { return ( appPort != 0 && appPort == clnt.appPort ) && ( getRemoteAddress() == clnt.getRemoteAddress() ) ; } - /** + /* * the code of the thread handling the incoming messages. */ public void run() { @@ -209,7 +281,10 @@ public class IvyClient implements Runnable { if ((msg=in.readLine()) != null ) { if (clientThread!=thisThread) break; // early stop during readLine() if (doping && (pingerThread!=null)) pingerThread.interrupt(); - newParseMsg(msg); + if (!newParseMsg(msg)) { + close(true); + break; + } } else { traceDebug("readline null ! leaving the thead"); break; @@ -226,9 +301,12 @@ public class IvyClient implements Runnable { } traceDebug("normally Disconnected from "+ appName); traceDebug("Thread stopped"); + // invokes the disconnect applicationListeners + if (!discCallbackPerformed) bus.clientDisconnects(this); + discCallbackPerformed=true; } - private void sendBuffer( String buffer ) throws IvyException { + private synchronized void sendBuffer( String buffer ) throws IvyException { buffer += "\n"; try { out.write(buffer.getBytes() ); @@ -237,8 +315,9 @@ public class IvyClient implements Runnable { traceDebug("I can't send my message to this client. He probably left"); // first, I'm not a first class IvyClient any more bus.removeClient(this); - // invokes the die applicationListeners - bus.disconnectReceived(this); + // invokes the disconnect applicationListeners + if (!discCallbackPerformed) bus.clientDisconnects(this); + discCallbackPerformed=true; try { close(false); } catch (IOException ioe) { @@ -258,12 +337,8 @@ public class IvyClient implements Runnable { private void send(int type, Integer id, int nbsub, REMatch result) { String buffer = type+" "+id+StartArg; - // Start at 1 because group 0 represent entire matching - for(int sub = 1; sub <= nbsub; sub++) { - if (result.getStartIndex(sub) > -1) { - buffer += result.toString(sub)+EndArg; - } - } + for(int sub = 1; sub <= nbsub; sub++) if (result.getStartIndex(sub) > -1) + buffer += result.toString(sub)+EndArg; try { sendBuffer(buffer); } catch (IvyException ie ) { @@ -293,24 +368,33 @@ public class IvyClient implements Runnable { return s; } - private void newParseMsg(String s) throws IvyException { + private boolean newParseMsg(String s) throws IvyException { byte[] b = s.getBytes(); int from=0,to=0,msgType; Integer msgId; while ((to<b.length)&&(b[to]!=' ')) to++; - if (to>=b.length) throw new IvyException("protocol error"); + // return false au lieu de throw + if (to>=b.length) { + System.out.println("protocol error from "+appName); + return false; + } try { msgType = Integer.parseInt(s.substring(from,to)); } catch (NumberFormatException nfe) { - throw new IvyException("protocol error on msgType"); + System.out.println("protocol error on msgType from "+appName); + return false; } from=to+1; while ((to<b.length)&&(b[to]!=2)) to++; - if (to>=b.length) throw new IvyException("protocol error"); + if (to>=b.length) { + System.out.println("protocol error from "+appName); + return false; + } try { msgId = new Integer(s.substring(from,to)); } catch (NumberFormatException nfe) { - throw new IvyException("protocol error on identifier"); + System.out.println("protocol error from "+appName+" "+s.substring(from,to)+" is not a number"); + return false; } from=to+1; switch (msgType) { @@ -319,7 +403,8 @@ public class IvyClient implements Runnable { // first, I'm not a first class IvyClient any more bus.removeClient(this); // invokes the die applicationListeners - bus.dieReceived(this,msgId.intValue()); + String message=s.substring(from,b.length); + bus.dieReceived(this,msgId.intValue(),message); // makes the bus die bus.stop(); try { @@ -334,7 +419,8 @@ public class IvyClient implements Runnable { // first, I'm not a first class IvyClient any more bus.removeClient(this); // invokes the die applicationListeners - bus.disconnectReceived(this); + if (!discCallbackPerformed) bus.clientDisconnects(this); + discCallbackPerformed=true; try { close(false); } catch (IOException ioe) { @@ -343,24 +429,28 @@ public class IvyClient implements Runnable { break; case AddRegexp: String regexp=s.substring(from,b.length); - if ( bus.CheckRegexp(regexp) ) { + if ( CheckRegexp(regexp) ) { try { - regexp_in.put(msgId,new RE(regexp)); - regexp_text.put(msgId,regexp); + regexps.put(msgId,new RE(regexp)); + regexpsText.put(msgId,regexp); + bus.regexpReceived(this,msgId.intValue(),regexp); } catch (REException e) { - throw new IvyException("regexp error " +e.getMessage()); + // the remote client sent an invalid regexp ! + System.out.println("invalid regexp sent by " +appName+" ("+regexp+"), I will ignore this regexp"); + return true; } } else { throw new IvyException("regexp Warning exp='"+regexp+"' can't match removing from "+appName); } break; case DelRegexp: - regexp_in.remove(msgId); - regexp_text.remove(msgId); + regexps.remove(msgId); + String text=(String)regexpsText.remove(msgId); + bus.regexpDeleted(this,msgId.intValue(),text); break; case EndRegexp: - bus.connect(this); - /* + bus.clientConnects(this); + /* TODO check with the protocol itself. * the peer is perhaps not ready to handle this message * an assymetric processing should be written */ @@ -371,7 +461,7 @@ public class IvyClient implements Runnable { while (to<b.length) { while ( (to<b.length) && (b[to]!=3) ) to++; if (to<b.length) { - v.addElement(s.substring(from,to)); + v.addElement(decode(s.substring(from,to))); to++; from=to; } @@ -382,7 +472,7 @@ public class IvyClient implements Runnable { // for developpemnt purposes // System.out.println(" *"+tab[i]+"* "+(tab[i]).length()); } - bus.callCallback(this,msgId,tab); + bus.selfIvyClient.callCallback(this,msgId,tab); break; case Pong: String paramPong=s.substring(from,b.length); @@ -398,7 +488,7 @@ public class IvyClient implements Runnable { String error=s.substring(from,b.length); traceDebug("Error msg "+msgId+" "+error); break; - case SchizoToken: + case SchizoToken: // aka BeginRegexp in other implementations appName=s.substring(from,b.length); appPort=msgId.intValue(); if ( bus.checkConnected(this) ) { @@ -415,25 +505,19 @@ public class IvyClient implements Runnable { bus.directMessage( this, msgId.intValue(), direct ); break; default: - throw new IvyException("protocol error, unknown message type "+msgType); + System.out.println("protocol error from "+appName+", unknown message type "+msgType); + return false; } + return true; } - void sendPong(String s) {send(Pong,0,s);} - void sendPing(String s) {send(Ping,0,s);} - + protected void sendPong(String s) {send(Pong,0,s);} + protected void sendPing(String s) {send(Ping,0,s);} private void sendBye() {send(Bye,0,"");} private void sendBye(String message) {send(Bye,0,message);} - - public void sendDie() { send(Die,0,""); } - public void sendDie(String message) {send(Die,0,message);} private InetAddress getRemoteAddress() { return socket.getInetAddress(); } - public String toString() { - return "IvyClient "+bus.appName+":"+appName; - } - private void traceDebug(String s){ if (debug) System.out.println("-->IvyClient "+bus.appName+":"+appName+"<-- "+s); } @@ -455,4 +539,19 @@ public class IvyClient implements Runnable { public void stopPinging() { isPinging=false; pingerThread.interrupt();} } + // a class to perform the threaded execution of each new message + // this is an experimental feature introduced in 1.2.4 + class Sender implements Runnable { + String message; + private Thread t; + public Sender(String message) { + this.message=message; + t=new Thread(Sender.this); + bus.registerThread(t); + t.start(); + bus.unRegisterThread(t); + } + public void run() { sendMsg(message); } + } // class Sender + } diff --git a/src/IvyDaemon.java b/src/IvyDaemon.java index 56019c1..b843a63 100644 --- a/src/IvyDaemon.java +++ b/src/IvyDaemon.java @@ -119,7 +119,11 @@ public class IvyDaemon implements Runnable { while (true) { msg=in.readLine(); if (msg==null) break; - bus.sendMsg(msg); + try { + bus.sendMsg(msg); + } catch (IvyException ie) { + System.out.println("incorrect characters whithin the message. Not sent"); + } } } catch (IOException ioe) { traceDebug("Subreader exception ..."); diff --git a/src/IvyWatcher.java b/src/IvyWatcher.java index 98956ea..7698632 100755 --- a/src/IvyWatcher.java +++ b/src/IvyWatcher.java @@ -14,6 +14,17 @@ * thing. * * CHANGELOG: + * 1.2.5: + * - getDomain now sends IvyException for malformed broadcast addresses + * 1.2.4: + * - sends the broadcast before listening to the other's broadcasts. + * TODO wait for all the broadcast to be sent before starting the listen + * mode + * - (REMOVED) allows the connexion from a remote host with the same port number + * it's too complicated to know if the packet is from ourselves... + * - deals with the protocol errors in a more efficient way. The goal is not + * to loose our connectivity because of a rude agent. + * fixes Bug J005 (YJ + JPI) * 1.2.3: * - the packet sending is done in its own thread from now on (PacketSender) * I don't care stopping it, since it can't be blocked. @@ -51,6 +62,7 @@ class IvyWatcher implements Runnable { private boolean isMulticastAddress = false; private Ivy bus; /* master bus controler */ private DatagramSocket broadcast; /* supervision socket */ + private InetAddress localhost,loopback; private String domainaddr; private int port; private volatile Thread listenThread; @@ -75,6 +87,9 @@ class IvyWatcher implements Runnable { ((MulticastSocket)broadcast).joinGroup(group); } broadcast.setSoTimeout(Ivy.TIMEOUTLENGTH); + localhost=InetAddress.getLocalHost(); + loopback=InetAddress.getByName(null); + } catch ( UnknownHostException uhe ) { } catch ( IOException e ) { throw new IvyException("IvyWatcher I/O error" + e ); } @@ -89,9 +104,9 @@ class IvyWatcher implements Runnable { traceDebug("beginning of a watcher Thread"); byte buf[] = new byte[256]; DatagramPacket packet=new DatagramPacket(buf, 256); + InetAddress remotehost=null; try { while( listenThread==thisThread ) { - int port; try { broadcast.receive(packet); if (listenThread!=thisThread) break; // I was summoned to leave during the receive @@ -99,37 +114,43 @@ class IvyWatcher implements Runnable { for (int i=0;i<buf.length;i++) { buf[i]=10; } // clean up the buffer after each message // BUGFIX ? I change 0 to 10 in order to avoid a bug - InetAddress remotehost = packet.getAddress(); - traceDebug("BUSWATCHER Receive Broadcast from "+ remotehost.getHostName()+":"+packet.getPort()); + remotehost = packet.getAddress(); + traceDebug("BUSWATCHER Receive Broadcast from "+remotehost.getHostName()+":"+packet.getPort()); // TODO if ( !isInDomain( remotehost ) ) continue; - // TODO get rid of the StringTokenizer ? - StringTokenizer st = new StringTokenizer(msg); - if ( !st.hasMoreTokens()) { - System.err.println("Bad format "+msg); - continue; - } - int version = Integer.parseInt( st.nextToken() ); - if ( version != bus.PROCOCOLVERSION ) { - System.err.println("Ignoring bad protocol version broadcast"); - continue; - } - if ( ! st.hasMoreTokens()) { - System.err.println("Bad format "+msg); - continue; - } - port = Integer.parseInt( st.nextToken() ); - if ( (bus.applicationPort == port) ) continue; - traceDebug("BUSWATCHER Broadcast de " - +packet.getAddress().getHostName() - +":"+packet.getPort()+" port "+port+" version "+version); try { + RE re = new RE("([0-9]*) ([0-9]*)"); + REMatch result = re.getMatch(msg); + if (result==null) { + System.err.println("Ignoring bad format broadcast from "+remotehost); + continue; + } + int version = Integer.parseInt(result.toString(1)); + if ( version < bus.PROTOCOLMINIMUM ) { + System.err.println("Ignoring bad protocol version "+remotehost+" we need "+ bus.PROTOCOLMINIMUM+" minimum"); + continue; + } + int port = Integer.parseInt(result.toString(2)); + // allows the connexion from a remote host with the same port number + // if ( ( (remotehost.equals(localhost)) || (remotehost.equals(loopback)) ) + // && (bus.applicationPort==port)) { + if (bus.applicationPort==port) { + traceDebug("ignoring my own broadcast. OK"); + continue; // it's me + } + traceDebug("Broadcast de " +packet.getAddress().getHostName() + +":"+packet.getPort()+" port "+port+" version "+version); Socket socket = new Socket( remotehost, port ); - bus.addClient(socket,false); + bus.addClient(socket,false,version); + } catch (REException ree) { + ree.printStackTrace(); + System.exit(-1); + } catch (NumberFormatException nfe) { + System.err.println("Ignoring bad format broadcast from "+remotehost); + continue; } catch ( UnknownHostException e ) { System.err.println("Unkonwn host "+remotehost + e.getMessage()); } catch ( IOException e) { - System.err.println("can't connect to "+remotehost+" port "+ - port+e.getMessage()); + System.err.println("can't connect to "+remotehost+" port "+ port+e.getMessage()); } } catch (InterruptedIOException jii ){ if (thisThread!=listenThread) { break ;} @@ -184,9 +205,9 @@ class IvyWatcher implements Runnable { } synchronized void start() throws IvyException { - String hello = bus.PROCOCOLVERSION + " " + bus.applicationPort + "\n"; - listenThread.start(); + String hello = bus.PROTOCOLVERSION + " " + bus.applicationPort + "\n"; new PacketSender(hello); // notifies our arrival on each domain: protocol version + port + listenThread.start(); } /* @@ -209,18 +230,24 @@ class IvyWatcher implements Runnable { } */ - static String getDomain(String net) { + // TODO this is buggy :-\ try it on a named multicast address just to see + static String getDomain(String net) throws IvyException { + // System.out.println("debug: net=[" + net+ "]"); int sep_index = net.lastIndexOf( ":" ); if ( sep_index != -1 ) { net = net.substring(0,sep_index); } try { net += ".255.255.255"; RE exp = new RE( "^(\\d+\\.\\d+\\.\\d+\\.\\d+).*"); net = exp.substitute( net , "$1" ); + if (net==null) { + System.out.println("Bad broascat addr " + net); + throw new IvyException("bad broadcast addr"); + } } catch ( REException e ){ - System.out.println("Bad broascat addr " + net); - return null; + System.out.println(e); + System.exit(0); } - // System.out.println("net: "+net); + // System.out.println("debug: returning net=[" + net+ "]"); return net; } diff --git a/src/Makefile b/src/Makefile index 82fa423..54cfdec 100644 --- a/src/Makefile +++ b/src/Makefile @@ -1,19 +1,28 @@ -GNUPATH=/usr/share/java/repository:/usr/share/java/gnu.getopt.0.9.jar:/usr/share/java/gnu-regexp-1.1.3.jar +GNUPATH=/usr/share/java/repository:/usr/share/java/gnu-regexp-1.1.3.jar +#GNUPATH=/usr/share/java/repository:/usr/share/java/gnu.getopt.0.9.jar:/usr/share/java/gnu-regexp-1.1.3.jar +#GNUPATH=${HOME}/JavaFactory ####################################### # generic setup ####################################### - JAVAC = javac -JAVACOPTS = -d . -deprecation -CLASSPATH = -classpath .:$(GNUPATH) +# JAVAC = javac +#JAVACOPTS = -d . -deprecation +#CLASSPATH = -classpath .:$(GNUPATH) ####################################### -# jikes setup on my box +# gcj setup ####################################### -# RTPATH = /usr/local/j2sdk1.4.1/jre/lib/rt.jar +# JAVAC = gcj #JAVACOPTS = -d . -deprecation -# JAVAC = jikes -#CLASSPATH = -classpath .:$(RTPATH) +#CLASSPATH = -classpath .:$(GNUPATH) + +####################################### +# jikes setup on my box +####################################### + RTPATH = /usr/lib/j2se/1.4/jre/lib/rt.jar +JAVACOPTS = -d . -deprecation + JAVAC = jikes +CLASSPATH = -classpath .:$(RTPATH):$(GNUPATH) ####################################### # blackdown jdk118 setup diff --git a/src/Probe.java b/src/Probe.java index bd37089..8e4f9c2 100644 --- a/src/Probe.java +++ b/src/Probe.java @@ -7,6 +7,9 @@ * (c) CENA * * Changelog: + * 1.2.4 + * - now uses the bindListener paradigm to display the binding/unbinding dynamically + * - adds the -s (send to self) command line switch * 1.2.3 * - now allows directMessages with the .direct command * - the parseMsg is being rewritten with regexps @@ -40,17 +43,18 @@ import java.util.*; import gnu.getopt.Getopt; import gnu.regexp.*; -public class Probe implements IvyApplicationListener, IvyMessageListener, Runnable { +public class Probe implements IvyApplicationListener, IvyMessageListener, IvyBindListener, Runnable { public static final String helpCommands = "Available commands:\n.die CLIENTNAME sends a die message\n.direct CLIENTNAME ID MESSAGE sends the direct message to the client, with a message id set to the numerical ID\n.bye quits the application\n.quit idem\n.list lists the available clients\n.ping sends a ping request if IVY_PING is enabled\n.bind REGEXP binds to a regexp at runtime\n.unbind REGEXP unbinds to a regexp at runtime"; - public static final String helpmsg = "usage: java fr.dgac.ivy.Probe [options] [regexp]\n\t-b BUS\tspecifies the Ivy bus domain\n\t-n ivyname (default JPROBE)\n\t-q\tquiet, no tty output\n\t-d\tdebug\n\t-t\ttime stamp each message\n\t-h\thelp\n\n\t regexp is a Perl5 compatible regular expression"; + public static final String helpmsg = "usage: java fr.dgac.ivy.Probe [options] [regexp]\n\t-b BUS\tspecifies the Ivy bus domain\n\t-n ivyname (default JPROBE)\n\t-q\tquiet, no tty output\n\t-d\tdebug\n\t-t\ttime stamp each message\n\t-s\tsends to self\n\t-h\thelp\n\n\t regexp is a Perl5 compatible regular expression"; public static void main(String[] args) throws IvyException { - Getopt opt = new Getopt("Probe",args,"n:b:dqht"); + Getopt opt = new Getopt("Probe",args,"n:b:dqsht"); int c; boolean timestamp=false; boolean quiet=false; + boolean sendsToSelf=false; String domain=Ivy.getDomain(null); String name="JPROBE"; while ((c = opt.getopt()) != -1) switch (c) { @@ -62,17 +66,24 @@ public class Probe implements IvyApplicationListener, IvyMessageListener, Runnab case 'n': name=opt.getOptarg(); break; case 'q': quiet=true; break; case 't': timestamp=true; break; + case 's': sendsToSelf=true; break; case 'h': default: System.out.println(helpmsg); System.exit(0); } Probe p = new Probe(new BufferedReader(new InputStreamReader(System.in)),timestamp,quiet,System.getProperty("IVY_DEBUG")!=null); p.setExitOnDie(true); Ivy bus=new Ivy(name,name+" ready",null); + bus.addBindListener(p); + bus.sendToSelf(sendsToSelf); for (int i=opt.getOptind();i<args.length;i++) { - if (!quiet) System.out.println("you want to subscribe to " + args[i]); - bus.bindMsg(args[i],p); + try { + bus.bindMsg(args[i],p); + if (!quiet) System.out.println("you have subscribed to " + args[i]); + } catch (IvyException ie) { + System.out.println("you have not subscribed to " + args[i]+ ", this regexp is invalid"); + } } - if (!quiet) System.out.println(bus.domains(domain)); + if (!quiet) System.out.println("broadcasting on "+bus.domains(domain)); bus.start(domain); p.start(bus); } @@ -135,34 +146,44 @@ public class Probe implements IvyApplicationListener, IvyMessageListener, Runnab REMatch result; traceDebug("parsing the ["+s+"] (length "+s.length()+") string"); // crude parsing of the ".xyz" commands - // TODO use regexps instends of String.lastIndexOf(String). Example - // provided with .direct ! if (s.length()==0) { - println("-> Sent to " +bus.sendMsg(s)+" peers"); + try { + println("-> Sent to " +bus.sendMsg(s)+" peers"); + } catch (IvyException ie) { + println("-> not sent, the message contains incorrect characters"); + } } else if ((result=directMsgRE.getMatch(s))!=null) { String target = result.toString(1); int id = Integer.parseInt(result.toString(2)); String message = result.toString(3); Vector v=bus.getIvyClientsByName(target); if (v.size()==0) println("no Ivy client with the name \""+target+"\""); - for (int i=0;i<v.size();i++) ((IvyClient)v.elementAt(i)).sendDirectMsg(id,message); + try { + for (int i=0;i<v.size();i++) ((IvyClient)v.elementAt(i)).sendDirectMsg(id,message); + } catch (IvyException ie) { + println("-> not sent, the message contains incorrect characters"); + } return; } else if (s.lastIndexOf(".die ")>=0){ String target=s.substring(5); Vector v=bus.getIvyClientsByName(target); if (v.size()==0) println("no Ivy client with the name \""+target+"\""); - for (int i=0;i<v.size();i++) ((IvyClient)v.elementAt(i)).sendDie(); + for (int i=0;i<v.size();i++) ((IvyClient)v.elementAt(i)).sendDie("java probe wants you to leave the bus"); } else if (s.lastIndexOf(".unbind ")>=0){ String regexp=s.substring(8); if (bus.unBindMsg(regexp)) { - println("you want to unsubscribe to " + regexp); + println("you have unsubscribed to " + regexp); } else { println("you can't unsubscribe to " + regexp + ", your're not subscribed to it"); } } else if (s.lastIndexOf(".bind ")>=0){ String regexp=s.substring(6); - println("you want to subscribe to " + regexp); - bus.bindMsg(regexp,this); + try { + bus.bindMsg(regexp,this); + println("you have now subscribed to " + regexp); + } catch (IvyException ie) { + System.out.println("warning, the regular expression '" + regexp + "' is invalid. Not bound !"); + } } else if (s.lastIndexOf(".ping ")>=0){ String target=s.substring(6); Vector v=bus.getIvyClientsByName(target); @@ -187,28 +208,40 @@ public class Probe implements IvyApplicationListener, IvyMessageListener, Runnab println("this command is not recognized"); println(helpCommands); } else { - println("-> Sent to " +bus.sendMsg(s)+" peers"); + try { + println("-> Sent to " +bus.sendMsg(s)+" peers"); + } catch (IvyException ie) { + println("-> not sent, the line contains incorrect characters"); + } } } // parseCommand + public void bindPerformed(IvyClient client,int id,String re) { + println(client.getApplicationName() + " subscribes to " +re ); + } + + public void unbindPerformed(IvyClient client,int id,String re) { + println(client.getApplicationName() + " unsubscribes to " +re ); + } + public void connect(IvyClient client) { println(client.getApplicationName() + " connected " ); - for (java.util.Enumeration e=client.getRegexps();e.hasMoreElements();) - println(client.getApplicationName() + " subscribes to " +e.nextElement() ); + // for (java.util.Enumeration e=client.getRegexps();e.hasMoreElements();) + // println(client.getApplicationName() + " subscribes to " +e.nextElement() ); } public void disconnect(IvyClient client) { println(client.getApplicationName() + " disconnected " ); } - public void die(IvyClient client, int id) { - println("received die msg from " + client.getApplicationName() +" good bye"); + public void die(IvyClient client, int id,String msgarg) { + println("received die msg from " + client.getApplicationName() +" with the message: "+msgarg+", good bye"); /* I cannot stop the readLine(), because it is native code */ if (exitOnDie) System.exit(0); } public void directMessage(IvyClient client, int id, String arg) { - println(client.getApplicationName() + " direct Message "+ id + arg ); + println(client.getApplicationName() + " sent the direct Message, id: "+ id + ", arg: "+ arg ); } public void receive(IvyClient client, String[] args) { diff --git a/src/SelfIvyClient.java b/src/SelfIvyClient.java new file mode 100644 index 0000000..09d9f80 --- /dev/null +++ b/src/SelfIvyClient.java @@ -0,0 +1,144 @@ +/** + * A private Class for ourself on the bus + * + * @author Yannick Jestin + * @author <a href="http://www.tls.cena.fr/products/ivy/">http://www.tls.cena.fr/products/ivy/</a> + * @since 1.2.4 + * + * CHANGELOG: + * 1.2.4: + * - adds a the threaded option for callbacks + * - Matthieu's bugreport on unBindMsg() + */ + +package fr.dgac.ivy ; +import java.util.*; +import gnu.regexp.*; + +class SelfIvyClient extends IvyClient { + + private Ivy bus; + private static int serial=0; /* an unique ID for each regexp */ + private Hashtable callbacks=new Hashtable(); + private Hashtable threadedFlag=new Hashtable(); + private boolean massThreaded=false; + + public void sendDirectMsg(int id,String message) { + bus.directMessage(this,id,message); + } + public void sendDie(String message) { bus.dieReceived(this,0,message); } + + protected SelfIvyClient(Ivy bus,String appName) { + this.bus=bus; + this.protocol=Ivy.PROTOCOLVERSION; + this.appName=appName; + } + + synchronized protected int bindMsg(String sregexp, IvyMessageListener callback, boolean threaded ) throws IvyException { + // creates a new binding (regexp,callback) + try { + RE re=new RE(sregexp); + Integer key = new Integer(serial++); + regexps.put(key,re); + regexpsText.put(key,sregexp); + callbacks.put(key,callback); + threadedFlag.put(key,new Boolean(threaded)); + return key.intValue(); + } catch (REException ree) { + throw new IvyException("Invalid regexp " + sregexp); + } + } + + synchronized protected void unBindMsg(int id) throws IvyException { + Integer key = new Integer(id); + if ( ( regexps.remove(key) == null ) + || (regexpsText.remove(key) == null ) + || (callbacks.remove(key) == null ) + || (threadedFlag.remove(key) == null ) + ) + throw new IvyException("client wants to remove an unexistant regexp "+id); + } + + synchronized protected boolean unBindMsg(String re) { + for (Enumeration e=regexpsText.keys();e.hasMoreElements();) { + Integer k = (Integer)e.nextElement(); + if ( ((String)regexps.get(k)).compareTo(re) == 0) { + try { + bus.unBindMsg(k.intValue()); + } catch (IvyException ie) { + return false; + } + return true; + } + } + return false; + } + + protected int sendSelfMsg(String message) { + int count = 0; + for (Enumeration e = regexps.keys();e.hasMoreElements();) { + Integer key = (Integer)e.nextElement(); + RE regexp = (RE)regexps.get(key); + String sre = (String)regexpsText.get(key); + int nb = regexp.getNumSubs(); + REMatch result = regexp.getMatch(message); + if (result==null) continue; + count++; + callCallback(this,key,toArgs(nb,result)); + } + return count; + } + + protected void callCallback(IvyClient client, Integer key, String[] tab) { + IvyMessageListener callback=(IvyMessageListener)callbacks.get(key); + boolean threaded=((Boolean)threadedFlag.get(key)).booleanValue(); + if (callback==null) { + System.out.println("(callCallback) Not regexp matching id "+key.intValue()); + System.exit(0); + } + if (!threaded) { + // runs the callback in the same thread + callback.receive(client, tab); + } else { + // starts a new Thread for each callback ... + new Runner(callback,client,tab); + } + } + + private String[] toArgs(int nb,REMatch result) { + String[] args = new String[nb]; + for(int sub=1;sub<=nb;sub++) { + args[sub-1]=result.toString(sub); + if (bus.doProtectNewlines) args[sub-1]=decode(args[sub-1]); + } + return args; + } + + public String toString() { return "IvyClient (ourself)"+bus.appName+":"+appName; } + + // a class to perform the threaded execution of each new message + // this is an experimental feature introduced in 1.2.4 + class Runner implements Runnable { + IvyMessageListener cb; + IvyClient c; + String[] args; + private Thread t; + public Runner(IvyMessageListener cb,IvyClient c,String[] args) { + this.cb=cb; + this.args=args; + this.c=c; + t=new Thread(Runner.this); + bus.registerThread(t); + t.start(); + bus.unRegisterThread(t); + } + public void run() { cb.receive(c,args); } + } // class Runner + + private void traceDebug(String s){ + if (debug) + System.out.println("-->SelfIvyClient "+bus.appName+":"+appName+"<-- "+s); + } + +} +/* EOF */ |