aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rwxr-xr-xsrc/Ivy.java358
-rwxr-xr-xsrc/IvyApplicationAdapter.java4
-rwxr-xr-xsrc/IvyApplicationListener.java8
-rw-r--r--src/IvyBindListener.java30
-rwxr-xr-xsrc/IvyClient.java269
-rw-r--r--src/IvyDaemon.java6
-rwxr-xr-xsrc/IvyWatcher.java91
-rw-r--r--src/Makefile25
-rw-r--r--src/Probe.java73
-rw-r--r--src/SelfIvyClient.java144
10 files changed, 768 insertions, 240 deletions
diff --git a/src/Ivy.java b/src/Ivy.java
index b11d546..e3cc0bb 100755
--- a/src/Ivy.java
+++ b/src/Ivy.java
@@ -7,10 +7,28 @@
*<pre>
*Ivy bus = new Ivy("Dummy agent","ready",null);
*bus.bindMsg("(.*)",myMessageListener);
- *bus.start(null);
+ *bus.start(getDomain(null));
*</pre>
*
* CHANGELOG:
+ * 1.2.4:
+ * - added an accessor for doSendToSelf
+ * - waitForMsg() and waitForClient() to make the synchronization with
+ * other Ivy agents easier
+ * - with the bindAsyncMsg() to subscribe and perform each callback in a
+ * new Thread
+ * - bindMsg(regexp,messagelistener,boolean) allow to subscribe with a
+ * synchrone/asynch exectution
+ * - API change, IvyException raised when \n or \0x3 are present in bus.sendMsg()
+ * - bindListener are now handled
+ * - removeApplicationListener can throw IvyException
+ * - bus.start(null) now starts on getDomain(null), first the IVYBUS
+ * property, then the DEFAULT_DOMAIN, 127:2010
+ * - bindMsg() now throws an IvyException if the regexp is invalid !!!
+ * BEWARE, this can impact lots of programs ! (fixes J007)
+ * - no more includes the "broadcasting on " in the domain(String) method
+ * - new function sendToSelf(boolean) allow us to send messages to
+ * ourselves
* 1.2.3:
* - adds a IVYBUS property to propagate the domain once set. This way,
* children forked through Ivy java can inherit from the current value.
@@ -19,7 +37,7 @@
* added the String domains(String d) function, in order to display the
* domain list
* 1.2.1:
- * bus.start(null) now starts on DEFAULT_DOMAIN
+ * bus.start(null) now starts on DEFAULT_DOMAIN. ( correction 1.2.4 This was not true.)
* added the getDomains in order to correctly display the domain list
* checks if the serverThread exists before interrupting it
* no has unBindMsg(String)
@@ -44,11 +62,12 @@ public class Ivy implements Runnable {
/**
* the name of the application on the bus
*/
- String appName;
+ protected String appName;
/**
* the protocol version number
*/
- public static final int PROCOCOLVERSION = 3 ;
+ public static final int PROTOCOLVERSION = 3 ;
+ public static final int PROTOCOLMINIMUM = 3 ;
/**
* the port for the UDP rendez vous, if none is supplied
*/
@@ -61,24 +80,23 @@ public class Ivy implements Runnable {
* the library version, useful for development purposes only, when java is
* invoked with -DIVY_DEBUG
*/
- public static final String libVersion ="1.2.3";
+ public static final String libVersion ="1.2.4";
private boolean debug;
- private static int serial=0; /* an unique ID for each regexp */
private static int clientSerial=0; /* an unique ID for each IvyClient */
private ServerSocket app;
private Vector watchers;
private volatile Thread serverThread; // to ensure quick communication of the end
- private Hashtable callbacks = new Hashtable();
private Hashtable clients = new Hashtable();
private Vector ivyApplicationListenerList = new Vector();
- private String messages_classes[] = null;
- private boolean sendToSelf = false ;
+ private Vector ivyBindListenerList = new Vector();
+ private Vector sendThreads = new Vector();
private boolean stopped = false;
- int applicationPort; /* Application port number */
- Hashtable regexp_out = new Hashtable();
- String ready_message = null;
-
+ protected int applicationPort; /* Application port number */
+ protected String ready_message = null;
+ protected boolean doProtectNewlines = false ;
+ private boolean doSendToSelf = false ;
+ protected SelfIvyClient selfIvyClient ;
public final static int TIMEOUTLENGTH = 3000;
/**
@@ -96,6 +114,49 @@ public class Ivy implements Runnable {
ready_message = message;
debug = (System.getProperty("IVY_DEBUG")!=null);
if ( appcb != null ) ivyApplicationListenerList.addElement( appcb );
+ selfIvyClient=new SelfIvyClient(this,name);
+ }
+
+ /**
+ * Waits for a message to be received
+ *
+ * @since 1.2.4
+ * @param regexp the message we're waiting for to continue the main thread.
+ * @param timeout in millisecond, 0 if infinite
+ * @return the IvyClient who sent the message, or null if the timeout is
+ * reached
+ */
+ public IvyClient waitForMsg(String regexp,int timeout) throws IvyException {
+ Waiter w = new Waiter(timeout);
+ int re = bindMsg(regexp,w);
+ IvyClient ic=w.waitFor();
+ unBindMsg(re);
+ return ic;
+ }
+
+ /**
+ * Waits for an other IvyClient to join the bus
+ *
+ * @since 1.2.4
+ * @param name the name of the client we're waiting for to continue the main thread.
+ * @param timeout in millisecond, 0 if infinite
+ * @return the first IvyClient with the name or null if the timeout is
+ * reached
+ */
+ public IvyClient waitForClient(String name,int timeout) throws IvyException {
+ IvyClient ic;
+ if (name==null) throw new IvyException("null name given to waitForClient");
+ // first check if client with the same name is on the bus
+ for (Enumeration e=clients.elements();e.hasMoreElements();) {
+ ic = (IvyClient)e.nextElement();
+ if (name.compareTo(ic.getApplicationName())==0) return ic;
+ }
+ // if not enter the waiting loop
+ WaiterClient w = new WaiterClient(name,timeout);
+ int i = addApplicationListener(w);
+ ic=w.waitForClient();
+ removeApplicationListener(i);
+ return ic;
}
/**
@@ -113,7 +174,7 @@ public class Ivy implements Runnable {
*
*/
public void start(String domainbus) throws IvyException {
- if (domainbus==null) domainbus=DEFAULT_DOMAIN;
+ if (domainbus==null) domainbus=getDomain(null);
Properties sysProp = System.getProperties();
sysProp.put("IVYBUS",domainbus);
try {
@@ -123,7 +184,7 @@ public class Ivy implements Runnable {
} catch (IOException e) {
throw new IvyException("can't open TCP service socket " + e );
}
- traceDebug("lib: "+libVersion+" protocol: "+PROCOCOLVERSION+" TCP service open on port "+applicationPort);
+ traceDebug("lib: "+libVersion+" protocol: "+PROTOCOLVERSION+" TCP service open on port "+applicationPort);
watchers = new Vector();
Domain[] d = parseDomains(domainbus);
@@ -144,7 +205,11 @@ public class Ivy implements Runnable {
int index=0;
while ( st.hasMoreTokens()) {
String s = st.nextToken() ;
- d[index++]=new Domain(IvyWatcher.getDomain(s),IvyWatcher.getPort(s));
+ try {
+ d[index++]=new Domain(IvyWatcher.getDomain(s),IvyWatcher.getPort(s));
+ } catch (IvyException ie) {
+ // do nothing
+ }
}
return d;
}
@@ -177,31 +242,83 @@ public class Ivy implements Runnable {
}
/**
- * Toggles the sending of messages to oneself
+ * Toggles the sending of messages to oneself, the remote client's
+ * IvyMessageListeners are processed first, and ourself afterwards.
+ * @param boolean true if you want to send the message to yourself. Default
+ * is false
+ * @since 1.2.4
+ */
+ public void sendToSelf(boolean b) {doSendToSelf=b;}
+
+ /**
+ * @param boolean do I send message to myself ?
+ * @since 1.2.4
+ */
+ public boolean isSendToSelf() {return doSendToSelf;}
+
+ /**
+ * returns our self IvyClient.
+ * @since 1.2.4
+ */
+ public IvyClient getSelfIvyClient() {return selfIvyClient;}
+
+ /**
+ * Toggles the encoding/decoding of messages to prevent bugs related to the
+ * presence of a "\n"
+ * @param boolean true if you want to send the message to yourself. Default
+ * is false.
+ * @since 1.2.5?
+ * The default escape character is a ESC 0x1A
+ */
+ private void protectNewlines(boolean b) {doProtectNewlines=b;}
+
+ /**
+ * Performs a pattern matching according to everyone's regexps, and sends
+ * the results to the relevant ivy agents sequentially
+ *
+ * @param message A String which will be compared to the regular
+ * expressions of the different clients
+ * @return the number of messages actually sent
+ */
+ public int sendMsg(String message) throws IvyException {
+ return sendMsg(message,false);
+ }
+
+ /**
+ * Performs a pattern matching according to everyone's regexps, and sends
+ * the results to the relevant ivy agents, using as many threads as needed.
*
+ * @since 1.2.4
+ * @param message A String which will be compared to the regular
+ * expressions of the different clients
+ * @return always returns -1
*/
- public void sendToSelf(boolean b) {sendToSelf=b;}
+ public int sendAsyncMsg(String message,boolean async) throws IvyException {
+ return sendMsg(message,true);
+ }
/**
* Performs a pattern matching according to everyone's regexps, and sends
* the results to the relevant ivy agents.
- * <p><em>There is one thread for each client connected, we could also
- * create another thread each time we send a message.</em>
+ *
+ * @since 1.2.4
* @param message A String which will be compared to the regular
* expressions of the different clients
- * @return the number of messages actually sent
+ * @param async if true, the sending will be performed in a separate thread,
+ * default is false
+ * @return if async is true, always returns -1, else returns the number of messages actually sent
*/
- public int sendMsg( String message ) {
+ public int sendMsg(String message,boolean async) throws IvyException {
int count = 0;
- // an alternate implementation would one sender thread per client
- // instead of one for all the clients. It might be a performance issue
+ if (doProtectNewlines)
+ message=IvyClient.encode(message);
+ else if ( (message.indexOf(IvyClient.newLineChar)!=-1)||(message.indexOf(IvyClient.endArgChar)!=-1))
+ throw new IvyException("newline character not allowed in Ivy messages");
for ( Enumeration e=clients.elements();e.hasMoreElements();) {
IvyClient client = (IvyClient)e.nextElement();
- count += client.sendMsg( message );
- }
- if (sendToSelf) {
- // TODO
+ count += client.sendMsg(message, async);
}
+ if (doSendToSelf) count+=selfIvyClient.sendSelfMsg(message);
return count;
}
@@ -221,17 +338,57 @@ public class Ivy implements Runnable {
* interface, on the AWT/Swing framework
* @return the id of the regular expression
*/
- public int bindMsg(String regexp, IvyMessageListener callback ) {
- // creates a new binding (regexp,callback)
- Integer key = new Integer(serial++);
- regexp_out.put(key,regexp);
- callbacks.put(key,callback );
+ public int bindMsg(String sregexp, IvyMessageListener callback ) throws IvyException {
+ return bindMsg(sregexp,callback,false);
+ }
+
+ /**
+ * Subscribes to a regular expression with asyncrhonous callback execution.
+ *
+ * Same as bindMsg, except that the callback will be executed in a separate
+ * thread each time.
+ * WARNING : there is no way to predict the order of execution
+ * of the * callbacks, i.e. a message received might trigger a callback before
+ * another one sent before
+ *
+ * @since 1.2.4
+ * @param regexp a perl regular expression, groups are done with parenthesis
+ * @param callback any objects implementing the IvyMessageListener
+ * interface, on the AWT/Swing framework
+ * @return the int ID of the regular expression.
+ */
+ public int bindAsyncMsg(String sregexp, IvyMessageListener callback ) throws IvyException {
+ return bindMsg(sregexp,callback,true);
+ }
+
+ /**
+ * Subscribes to a regular expression.
+ *
+ * The callback will be executed with
+ * the saved parameters of the regexp as arguments when a message will sent
+ * by another agent. A program <em>doesn't</em> receive its own messages.
+ * <p>Example:
+ * <br>the Ivy agent A performs <pre>b.bindMsg("^Hello (*)",cb);</pre>
+ * <br>the Ivy agent B performs <pre>b2.sendMsg("Hello world");</pre>
+ * <br>a thread in A will uun the callback cb with its second argument set
+ * to a array of String, with one single element, "world"
+ * @since 1.2.4
+ * @param regexp a perl regular expression, groups are done with parenthesis
+ * @param callback any objects implementing the IvyMessageListener
+ * interface, on the AWT/Swing framework
+ * @param async if true, each callback will be run in a separate thread,
+ * default is false
+ * @return the id of the regular expression
+ */
+ public int bindMsg(String sregexp, IvyMessageListener callback,boolean async ) throws IvyException {
+ // adds the regexp to our collection in selfIvyClient
+ int key=selfIvyClient.bindMsg(sregexp,callback,async);
// notifies the other clients this new regexp
for (Enumeration e=clients.elements();e.hasMoreElements();) {
IvyClient c = (IvyClient)e.nextElement();
- c.sendRegexp(key.intValue(),regexp);
+ c.sendRegexp(key,sregexp);
}
- return key.intValue();
+ return key;
}
/**
@@ -240,14 +397,9 @@ public class Ivy implements Runnable {
* @param id the id of the regular expression, returned when it was bound
*/
public void unBindMsg(int id) throws IvyException {
- Integer key = new Integer(id);
- if ( ( regexp_out.remove(key) == null )
- || (callbacks.remove(key) == null ) ) {
- throw new IvyException("client wants to remove an unexistant regexp "+id);
- }
- for (Enumeration e=clients.elements();e.hasMoreElements();) {
+ selfIvyClient.unBindMsg(id);
+ for (Enumeration e=clients.elements();e.hasMoreElements();)
((IvyClient)e.nextElement()).delRegexp(id );
- }
}
/**
@@ -258,18 +410,31 @@ public class Ivy implements Runnable {
* @param String the string for the regular expression
*/
public boolean unBindMsg(String re) {
- for (Enumeration e=regexp_out.keys();e.hasMoreElements();) {
- Integer k = (Integer)e.nextElement();
- if ( ((String)regexp_out.get(k)).compareTo(re) == 0) {
- try {
- unBindMsg(k.intValue());
- } catch (IvyException ie) {
- return false;
- }
- return true;
- }
+ return selfIvyClient.unBindMsg(re);
+ }
+
+ /**
+ * adds a bind listener to a bus
+ * @param callback is an object implementing the IvyBindListener interface
+ * @return the id of the bind listener, useful if you wish to remove it later
+ * @since 1.2.4
+ */
+ public int addBindListener(IvyBindListener callback){
+ ivyBindListenerList.addElement(callback);
+ return ivyBindListenerList.indexOf(callback);
+ }
+
+ /**
+ * removes a bind listener
+ * @param id the id of the bind listener to remove
+ * @since 1.2.4
+ */
+ public void removeBindListener(int id) throws IvyException {
+ try {
+ ivyBindListenerList.removeElementAt(id);
+ } catch (ArrayIndexOutOfBoundsException aie) {
+ throw new IvyException(id+" is not a valid Id");
}
- return false;
}
/**
@@ -281,32 +446,49 @@ public class Ivy implements Runnable {
*/
public int addApplicationListener(IvyApplicationListener callback){
ivyApplicationListenerList.addElement(callback);
- int id = ivyApplicationListenerList.indexOf( callback );
- return id;
+ return ivyApplicationListenerList.indexOf( callback );
}
/**
* removes an application listener
* @param id the id of the application listener to remove
*/
- public void removeApplicationListener(int id){
- ivyApplicationListenerList.removeElementAt(id);
+ public void removeApplicationListener(int id) throws IvyException {
+ try {
+ ivyApplicationListenerList.removeElementAt(id);
+ } catch (ArrayIndexOutOfBoundsException aie) {
+ throw new IvyException(id+" is not a valid Id");
+ }
}
/* invokes the application listeners upon arrival of a new Ivy client */
- public void connect(IvyClient client){
+ protected void clientConnects(IvyClient client){
for ( int i = 0 ; i < ivyApplicationListenerList.size(); i++ ) {
((IvyApplicationListener)ivyApplicationListenerList.elementAt(i)).connect(client);
}
}
- /* invokes the application listeners upon arrival of a new Ivy client */
- void disconnectReceived(IvyClient client){
+ /* invokes the application listeners upon the departure of an Ivy client */
+ protected void clientDisconnects(IvyClient client){
for ( int i = 0 ; i < ivyApplicationListenerList.size(); i++ ) {
((IvyApplicationListener)ivyApplicationListenerList.elementAt(i)).disconnect(client);
}
}
+ /* invokes the bind listeners */
+ protected void regexpReceived(IvyClient client,int id,String sregexp){
+ for ( int i = 0 ; i < ivyBindListenerList.size(); i++ ) {
+ ((IvyBindListener)ivyBindListenerList.elementAt(i)).bindPerformed(client,id,sregexp);
+ }
+ }
+
+ /* invokes the bind listeners */
+ protected void regexpDeleted(IvyClient client,int id,String sregexp){
+ for ( int i = 0 ; i < ivyBindListenerList.size(); i++ ) {
+ ((IvyBindListener)ivyBindListenerList.elementAt(i)).unbindPerformed(client,id,sregexp);
+ }
+ }
+
/*
* removes a client from the list
*/
@@ -316,9 +498,9 @@ public class Ivy implements Runnable {
* invokes the application listeners when we are summoned to die
* then stops
*/
- public void dieReceived(IvyClient client, int id){
+ public void dieReceived(IvyClient client, int id,String message){
for ( int i=0 ;i<ivyApplicationListenerList.size();i++ ) {
- ((IvyApplicationListener)ivyApplicationListenerList.elementAt(i)).die(client,id);
+ ((IvyApplicationListener)ivyApplicationListenerList.elementAt(i)).die(client,id,message);
}
}
@@ -360,21 +542,13 @@ public class Ivy implements Runnable {
//
/////////////////////////////////////////////////////////////////:
- synchronized void addClient(Socket socket,boolean peerCalling) throws IOException {
+ synchronized void addClient(Socket socket,boolean peerCalling,int protocolVersion) throws IOException {
if (stopped) return;
- IvyClient client = new IvyClient( this, socket,peerCalling,new Integer(clientSerial++));
+ IvyClient client = new IvyClient( this, socket,peerCalling,new Integer(clientSerial++),protocolVersion);
clients.put(client.getClientKey(),client);
traceDebug(getClientNames());
}
- void callCallback(IvyClient client, Integer key, String[] tab) throws IvyException {
- IvyMessageListener callback=(IvyMessageListener)callbacks.get(key);
- if (callback==null){
- throw new IvyException("(callCallback) Not regexp matching id "+key.intValue());
- }
- callback.receive( client, tab);
- }
-
private static String[] myTokenize(String s,String separator) {
int index=0, last=0, length=s.length();
Vector v = new Vector();
@@ -402,20 +576,6 @@ public class Ivy implements Runnable {
}
- /**
- * checks the "validity" of a regular expression.
- */
- boolean CheckRegexp( String exp ) {
- boolean regexp_ok = true;
- if ( exp.startsWith( "^" )&&messages_classes!=null) {
- regexp_ok=false;
- for (int i=0 ; i < messages_classes.length;i++) {
- if (messages_classes[i].equals(exp.substring(1))) return true;
- }
- }
- return regexp_ok;
- }
-
/*
* prevents two clients from connecting to each other at the same time
* there might still be a lingering bug here, that we could avoid with the
@@ -433,14 +593,14 @@ public class Ivy implements Runnable {
/*
* the service socket thread reader main loop
*/
- public void run() {
+ public void run() {
// System.out.println("Ivy service Thread started"); // THREADDEBUG
Thread thisThread=Thread.currentThread();
while(thisThread==serverThread){
try {
Socket socket = app.accept();
if ((thisThread!=serverThread)||stopped) break; // early disconnexion
- addClient(socket,true); // the peer called me
+ addClient(socket,true,0); // the peer called me TODO I don't know his protocol version
} catch (InterruptedIOException ie) {
if (thisThread!=serverThread) break;
} catch( IOException e ) {
@@ -459,6 +619,21 @@ public class Ivy implements Runnable {
private void traceDebug(String s){ if (debug) System.out.println("-->ivy<-- "+s); }
+ synchronized void registerThread(Thread t) { sendThreads.add(t); }
+ synchronized void unRegisterThread(Thread t) { sendThreads.remove(t); }
+ synchronized Thread getOneThread() {
+ if (sendThreads.size()==0) return null;
+ return (Thread) sendThreads.firstElement();
+ }
+
+ void waitForAll() {
+ Thread t;
+ System.out.println("DEVELOPMENT WAITFORALL sendThreads size : " + sendThreads.size());
+ try { while ((t=getOneThread())!=null) { t.join(); } }
+ catch (InterruptedException ie) { System.out.println("waitForAll Interrupted"); }
+ System.out.println("DEVELOPMENT END WAITFORALL");
+ }
+
/* a small private method for debbugging purposes */
private String getClientNames() {
String s = appName+" clients are: ";
@@ -469,7 +644,7 @@ public class Ivy implements Runnable {
}
public String domains(String toparse) {
- String s="broadcasting on ";
+ String s="";
Ivy.Domain[] d = parseDomains(toparse);
for (int index=0;index<d.length;index++) {
s+=d[index].getDomainaddr()+":"+d[index].getPort()+" ";
@@ -495,7 +670,10 @@ public class Ivy implements Runnable {
Ivy bus = new Ivy("Test Unitaire","TU ready",null);
try {
bus.start(null);
- try { Thread.sleep(2000); } catch (InterruptedException ie) { }
+ System.out.println("waiting 5 seconds for a coucou");
+ System.out.println(((bus.waitForMsg("^coucou",5000))!=null)?"coucou received":"coucou not received");
+ System.out.println("waiting 5 seconds for IvyProbe");
+ System.out.println(((bus.waitForClient("IVYPROBE",5000))!=null)?"Ivyprobe joined the bus":"nobody came");
bus.stop();
} catch (IvyException ie) {
ie.printStackTrace();
diff --git a/src/IvyApplicationAdapter.java b/src/IvyApplicationAdapter.java
index 376ee9a..0a82486 100755
--- a/src/IvyApplicationAdapter.java
+++ b/src/IvyApplicationAdapter.java
@@ -5,7 +5,7 @@
* @author Yannick Jestin
* @author <a href="http://www.tls.cena.fr/products/ivy/">http://www.tls.cena.fr/products/ivy/</a>
*
- * an ApplicatinListener class for handling application-level request on the
+ * an ApplicationListener class for handling application-level request on the
* Ivy bus. The methods in this class are empty. This class exists as a
* convenience for implementing a subset of the methods of the
* applicationlistener. See the AWT 1.1 framework for further information on
@@ -19,6 +19,6 @@ package fr.dgac.ivy;
public abstract class IvyApplicationAdapter implements IvyApplicationListener {
public void connect( IvyClient client ) { }
public void disconnect( IvyClient client ) { }
- public void die( IvyClient client, int id ) { }
+ public void die( IvyClient client, int id, String msgarg) { }
public void directMessage( IvyClient client, int id,String msgarg ) {}
}
diff --git a/src/IvyApplicationListener.java b/src/IvyApplicationListener.java
index bdfb12c..456a85f 100755
--- a/src/IvyApplicationListener.java
+++ b/src/IvyApplicationListener.java
@@ -9,6 +9,11 @@ package fr.dgac.ivy;
*
* The ApplicatinListenr for receiving application level events on the Ivy
* bus: connexion, disconnexion, direct messages or requests to quit.
+ *
+ * Changelog:
+ * 1.2.4
+ * - sendDie now requires a String argument ! It is MANDATORY, and could
+ * impact your implementations !
*/
public interface IvyApplicationListener extends java.util.EventListener {
@@ -26,13 +31,12 @@ public interface IvyApplicationListener extends java.util.EventListener {
* invoked when a peer request us to leave the bus
* @param client the peer
*/
- public abstract void die(IvyClient client, int id);
+ public abstract void die(IvyClient client, int id,String msgarg);
/**
* invoked when a peer sends us a direct message
* @param client the peer
* @param id
* @param msgarg the message itself
- * this is not yet implemented in java. I believe it has no real use :)
*/
public abstract void directMessage( IvyClient client, int id,String msgarg );
}
diff --git a/src/IvyBindListener.java b/src/IvyBindListener.java
new file mode 100644
index 0000000..96096f2
--- /dev/null
+++ b/src/IvyBindListener.java
@@ -0,0 +1,30 @@
+package fr.dgac.ivy;
+
+/**
+ * this interface specifies the methods of a BindListener
+ *
+ * @author Yannick Jestin
+ * @author <a href="http://www.tls.cena.fr/products/ivy/">http://www.tls.cena.fr/products/ivy/</a>
+ *
+ * Changelog:
+ */
+
+public interface IvyBindListener extends java.util.EventListener {
+
+ /**
+ * invoked when a Ivy Client performs a bind
+ * @param client the peer
+ * @param int the regexp ID
+ * @param regexp the regexp
+ */
+ public abstract void bindPerformed(IvyClient client,int id, String regexp);
+
+ /**
+ * invoked when a Ivy Client performs a unbind
+ * @param client the peer
+ * @param int the regexp ID
+ * @param regexp the regexp
+ */
+ public abstract void unbindPerformed(IvyClient client,int id,String regexp);
+
+}
diff --git a/src/IvyClient.java b/src/IvyClient.java
index 8aad199..b69c8d3 100755
--- a/src/IvyClient.java
+++ b/src/IvyClient.java
@@ -10,6 +10,18 @@
* created for each remote client.
*
* CHANGELOG:
+ * 1.2.4:
+ * - sendBuffer goes synchronized
+ * - sendMsg now has a async parameter, allowing the use of threads to
+ * delegate the sending of messages
+ * - API change, IvyException raised when \n or \0x3 are present in bus.sendMsg()
+ * - breaks the connexion with faulty Ivy clients (either protocol or invalid
+ * regexps, closes bug J007 (CM))
+ * - sendDie now always requires a reason
+ * - invokes the disconnect applicationListeners at the end of the run()
+ * loop.
+ * closes Bug J006 (YJ)
+ * - changed the access of some functions ( sendRegexp, etc ) to protected
* 1.2.3:
* - silently stops on InterruptedIOException.
* - direct Messages
@@ -50,54 +62,64 @@ public class IvyClient implements Runnable {
final static int DelRegexp = 4;/* the peer removes one of his regex */
final static int EndRegexp = 5;/* no more regexp in the handshake */
final static int SchizoToken = 6; /* avoid race condition in concurrent connexions */
+ /* SchizoToken is aka BeginRegexp in other implementations */
final static int DirectMsg = 7;/* the peer sends a direct message */
final static int Die = 8; /* the peer wants us to quit */
final static int Ping = 9; /* checks the presence of the other */
final static int Pong = 10; /* checks the presence of the other */
-
final static String MESSAGE_TERMINATOR = "\n"; /* the next protocol will use \r */
final static String StartArg = "\u0002";/* begin of arguments */
final static String EndArg = "\u0003"; /* end of arguments */
+ final static String escape ="\u001A";
+ final static char escapeChar = escape.charAt(0);
+ final static char endArgChar = EndArg.charAt(0);
+ final static char newLineChar = '\n';
-
- private static boolean debug = (System.getProperty("IVY_DEBUG")!=null) ;
+ // private variables
+ private String messages_classes[] = null;
private Ivy bus;
private Socket socket;
private BufferedReader in;
private OutputStream out;
- private Hashtable regexp_in = new Hashtable();
- private Hashtable regexp_text = new Hashtable();
private int appPort;
private boolean peerCalling;
private volatile Thread clientThread;// volatile to ensure the quick communication
private Integer clientKey ;
private static boolean doping = (System.getProperty("IVY_PING")!=null) ;
- final static int PINGTIMEOUT = 5000;
+ private final static int PINGTIMEOUT = 5000;
private PINGER pinger;
private volatile Thread pingerThread;
+ private boolean discCallbackPerformed = false;
// protected variables
String appName;
+ Hashtable regexps = new Hashtable();
+ Hashtable regexpsText = new Hashtable();
+ static boolean debug = (System.getProperty("IVY_DEBUG")!=null) ;
+ int protocol;
+
+ IvyClient(){}
- IvyClient(Ivy bus, Socket socket,boolean peerCalling,Integer clientKey) throws IOException {
+ IvyClient(Ivy bus, Socket socket,boolean peerCalling,Integer clientKey,int protocol) throws IOException {
appName = "Unknown";
appPort = 0;
this.bus = bus;
this.socket = socket;
this.peerCalling=peerCalling;
this.clientKey=clientKey;
+ this.protocol=protocol;
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
out = socket.getOutputStream();
- Hashtable regexps=bus.regexp_out;
+ Hashtable tosend=bus.selfIvyClient.regexpsText;
// sends our ID, whether we initiated the connexion or not
// the ID is the couple "host name,application Port", the host name
// information is in the socket itself, the port is not known if we
// initiate the connexion
send(SchizoToken,bus.applicationPort,bus.appName);
// sends our regexps to the peer
- for (Enumeration e = regexps.keys(); e.hasMoreElements(); ) {
+ for (Enumeration e = tosend.keys(); e.hasMoreElements(); ) {
Integer ikey = (Integer)e.nextElement();
- sendRegexp( ikey.intValue(),(String)regexps.get(ikey));
+ sendRegexp(ikey.intValue(),(String)tosend.get(ikey));
}
send( EndRegexp,0,"");
// spawns a thread to manage the incoming traffic on this
@@ -111,88 +133,138 @@ public class IvyClient implements Runnable {
}
}
+ public String toString() { return "IvyClient "+bus.appName+":"+appName; }
+
/**
* returns the name of the remote agent.
*/
public String getApplicationName() { return appName ; }
- Integer getClientKey() { return clientKey ; }
-
/**
* allow an Ivy package class to access the list of regexps at a
* given time.
* perhaps we should implement a new IvyApplicationListener method to
* allow the notification of regexp addition and deletion
+ * The content is not modifyable because String are not mutable, and cannot
+ * be modified once they are create.
+ * @see getRegexpsArray to get a String[] result
*/
- Enumeration getRegexps() { return regexp_text.elements(); }
+ public Enumeration getRegexps() { return regexpsText.elements(); }
- int getAppPort() { return appPort ; }
-
- void sendRegexp(int id,String regexp) {
- send(AddRegexp,id,regexp); /* perhaps we should perform some checking here */
+ /**
+ * allow an Ivy package class to access the list of regexps at a
+ * given time.
+ * @since 1.2.4
+ */
+ public String[] getRegexpsArray() {
+ String[] s = new String[regexpsText.size()];
+ int i=0;
+ for (Enumeration e=getRegexps();e.hasMoreElements();)
+ s[i++]=(String)e.nextElement();
+ return s;
}
- public void delRegexp(int id) {send( DelRegexp,id,"");}
-
/**
* sends a direct message to the peer
* @param id the numeric value provided to the remote client
* @param message the string that will be match-tested
*/
- public void sendDirectMsg(int id,String message) { send(DirectMsg,id,message); }
+ public void sendDirectMsg(int id,String message) throws IvyException {
+ if ( (message.indexOf(IvyClient.newLineChar)!=-1)||(message.indexOf(IvyClient.endArgChar)!=-1))
+ throw new IvyException("newline character not allowed in Ivy messages");
+ send(DirectMsg,id,message);
+ }
- /**
- * closes the connexion to the peer.
- * @param notify should I send Bye message ?
- * the thread managing the socket is stopped
- */
- void close(boolean notify) throws IOException {
+ /* closes the connexion to the peer */
+ protected void close(boolean notify) throws IOException {
+ bus.waitForAll();
traceDebug("closing connexion to "+appName);
if (doping&&(pinger!=null)) { pinger.stopPinging(); }
if (notify) sendBye("hasta la vista");
stopListening();
- // bus.clientDisconnect(this);
- socket.close(); // should I also close in and out ?
+ socket.close();
}
/**
- * sends the substrings of a message to the peer for each matching regexp.
- * @param message the string that will be match-tested
- * @return the number of messages sent to the peer
+ * asks the remote client to leave the bus.
+ * @param message the message that will be carried
*/
- int sendMsg( String message ) {
+ public void sendDie(String message) { send(Die,0,message); }
+
+ /**
+ * checks the "validity" of a regular expression.
+ * @param exp the string being a regular expression
+ * @return true if the regexp is valid
+ * @since 1.2.4
+ */
+ public boolean CheckRegexp( String exp ) {
+ boolean ok = true;
+ if ( exp.startsWith( "^" )&&messages_classes!=null) {
+ ok=false;
+ for (int i=0 ; i < messages_classes.length;i++) {
+ if (messages_classes[i].equals(exp.substring(1))) return true;
+ }
+ }
+ return ok;
+ }
+
+ ///////////////////////////////////////////////////
+ //
+ // PROTECTED METHODS
+ //
+ ///////////////////////////////////////////////////
+ static String decode(String s) { return s.replace(escapeChar,'\n'); }
+ static String encode(String s) { return s.replace('\n',escapeChar); }
+ Integer getClientKey() { return clientKey ; }
+ int getAppPort() { return appPort ; }
+ void sendRegexp(int id,String regexp) {send(AddRegexp,id,regexp);}
+ void delRegexp(int id) {send( DelRegexp,id,"");}
+
+ int sendMsg(String message,boolean async) {
+ if (async) {
+ new Sender(message);
+ return -1;
+ } else { return sendMsg(message); }
+ }
+
+ private int sendMsg(String message) {
int count = 0;
- for (Enumeration e = regexp_in.keys();e.hasMoreElements();) {
+ for (Enumeration e = regexps.keys();e.hasMoreElements();) {
Integer key = (Integer)e.nextElement();
- RE regexp = (RE)regexp_in.get(key);
+ RE regexp = (RE)regexps.get(key);
+ int nb = regexp.getNumSubs();
REMatch result = regexp.getMatch(message);
- if ( result != null ) {
- send(Msg,key,regexp.getNumSubs(),result);
- count++;
- }
+ if (result==null) continue; // no match
+ count++; // match
+ send(Msg,key,regexp.getNumSubs(),result);
}
return count;
}
-
- void stopListening() {
+
+ ///////////////////////////////////////////////////
+ //
+ // PRIVATE METHODS
+ //
+ ///////////////////////////////////////////////////
+
+ /* interrupt the listener thread */
+ private void stopListening() {
Thread t = clientThread;
if (t==null) return; // we can be summoned to quit from two path at a time
clientThread=null;
t.interrupt();
}
- /**
+ /*
* compares two peers the id is the couple (host,service port).
- * @param clnt the other peer
- * @return true if the peers are similir. This should not happen, it is bad
- * © ® (tm)
+ * true if the peers are similir. This should not happen, it is bad
*/
- boolean sameClient( IvyClient clnt ) {
+ protected boolean sameClient( IvyClient clnt ) {
return ( appPort != 0 && appPort == clnt.appPort )
&& ( getRemoteAddress() == clnt.getRemoteAddress() ) ;
}
- /**
+ /*
* the code of the thread handling the incoming messages.
*/
public void run() {
@@ -209,7 +281,10 @@ public class IvyClient implements Runnable {
if ((msg=in.readLine()) != null ) {
if (clientThread!=thisThread) break; // early stop during readLine()
if (doping && (pingerThread!=null)) pingerThread.interrupt();
- newParseMsg(msg);
+ if (!newParseMsg(msg)) {
+ close(true);
+ break;
+ }
} else {
traceDebug("readline null ! leaving the thead");
break;
@@ -226,9 +301,12 @@ public class IvyClient implements Runnable {
}
traceDebug("normally Disconnected from "+ appName);
traceDebug("Thread stopped");
+ // invokes the disconnect applicationListeners
+ if (!discCallbackPerformed) bus.clientDisconnects(this);
+ discCallbackPerformed=true;
}
- private void sendBuffer( String buffer ) throws IvyException {
+ private synchronized void sendBuffer( String buffer ) throws IvyException {
buffer += "\n";
try {
out.write(buffer.getBytes() );
@@ -237,8 +315,9 @@ public class IvyClient implements Runnable {
traceDebug("I can't send my message to this client. He probably left");
// first, I'm not a first class IvyClient any more
bus.removeClient(this);
- // invokes the die applicationListeners
- bus.disconnectReceived(this);
+ // invokes the disconnect applicationListeners
+ if (!discCallbackPerformed) bus.clientDisconnects(this);
+ discCallbackPerformed=true;
try {
close(false);
} catch (IOException ioe) {
@@ -258,12 +337,8 @@ public class IvyClient implements Runnable {
private void send(int type, Integer id, int nbsub, REMatch result) {
String buffer = type+" "+id+StartArg;
- // Start at 1 because group 0 represent entire matching
- for(int sub = 1; sub <= nbsub; sub++) {
- if (result.getStartIndex(sub) > -1) {
- buffer += result.toString(sub)+EndArg;
- }
- }
+ for(int sub = 1; sub <= nbsub; sub++) if (result.getStartIndex(sub) > -1)
+ buffer += result.toString(sub)+EndArg;
try {
sendBuffer(buffer);
} catch (IvyException ie ) {
@@ -293,24 +368,33 @@ public class IvyClient implements Runnable {
return s;
}
- private void newParseMsg(String s) throws IvyException {
+ private boolean newParseMsg(String s) throws IvyException {
byte[] b = s.getBytes();
int from=0,to=0,msgType;
Integer msgId;
while ((to<b.length)&&(b[to]!=' ')) to++;
- if (to>=b.length) throw new IvyException("protocol error");
+ // return false au lieu de throw
+ if (to>=b.length) {
+ System.out.println("protocol error from "+appName);
+ return false;
+ }
try {
msgType = Integer.parseInt(s.substring(from,to));
} catch (NumberFormatException nfe) {
- throw new IvyException("protocol error on msgType");
+ System.out.println("protocol error on msgType from "+appName);
+ return false;
}
from=to+1;
while ((to<b.length)&&(b[to]!=2)) to++;
- if (to>=b.length) throw new IvyException("protocol error");
+ if (to>=b.length) {
+ System.out.println("protocol error from "+appName);
+ return false;
+ }
try {
msgId = new Integer(s.substring(from,to));
} catch (NumberFormatException nfe) {
- throw new IvyException("protocol error on identifier");
+ System.out.println("protocol error from "+appName+" "+s.substring(from,to)+" is not a number");
+ return false;
}
from=to+1;
switch (msgType) {
@@ -319,7 +403,8 @@ public class IvyClient implements Runnable {
// first, I'm not a first class IvyClient any more
bus.removeClient(this);
// invokes the die applicationListeners
- bus.dieReceived(this,msgId.intValue());
+ String message=s.substring(from,b.length);
+ bus.dieReceived(this,msgId.intValue(),message);
// makes the bus die
bus.stop();
try {
@@ -334,7 +419,8 @@ public class IvyClient implements Runnable {
// first, I'm not a first class IvyClient any more
bus.removeClient(this);
// invokes the die applicationListeners
- bus.disconnectReceived(this);
+ if (!discCallbackPerformed) bus.clientDisconnects(this);
+ discCallbackPerformed=true;
try {
close(false);
} catch (IOException ioe) {
@@ -343,24 +429,28 @@ public class IvyClient implements Runnable {
break;
case AddRegexp:
String regexp=s.substring(from,b.length);
- if ( bus.CheckRegexp(regexp) ) {
+ if ( CheckRegexp(regexp) ) {
try {
- regexp_in.put(msgId,new RE(regexp));
- regexp_text.put(msgId,regexp);
+ regexps.put(msgId,new RE(regexp));
+ regexpsText.put(msgId,regexp);
+ bus.regexpReceived(this,msgId.intValue(),regexp);
} catch (REException e) {
- throw new IvyException("regexp error " +e.getMessage());
+ // the remote client sent an invalid regexp !
+ System.out.println("invalid regexp sent by " +appName+" ("+regexp+"), I will ignore this regexp");
+ return true;
}
} else {
throw new IvyException("regexp Warning exp='"+regexp+"' can't match removing from "+appName);
}
break;
case DelRegexp:
- regexp_in.remove(msgId);
- regexp_text.remove(msgId);
+ regexps.remove(msgId);
+ String text=(String)regexpsText.remove(msgId);
+ bus.regexpDeleted(this,msgId.intValue(),text);
break;
case EndRegexp:
- bus.connect(this);
- /*
+ bus.clientConnects(this);
+ /* TODO check with the protocol itself.
* the peer is perhaps not ready to handle this message
* an assymetric processing should be written
*/
@@ -371,7 +461,7 @@ public class IvyClient implements Runnable {
while (to<b.length) {
while ( (to<b.length) && (b[to]!=3) ) to++;
if (to<b.length) {
- v.addElement(s.substring(from,to));
+ v.addElement(decode(s.substring(from,to)));
to++;
from=to;
}
@@ -382,7 +472,7 @@ public class IvyClient implements Runnable {
// for developpemnt purposes
// System.out.println(" *"+tab[i]+"* "+(tab[i]).length());
}
- bus.callCallback(this,msgId,tab);
+ bus.selfIvyClient.callCallback(this,msgId,tab);
break;
case Pong:
String paramPong=s.substring(from,b.length);
@@ -398,7 +488,7 @@ public class IvyClient implements Runnable {
String error=s.substring(from,b.length);
traceDebug("Error msg "+msgId+" "+error);
break;
- case SchizoToken:
+ case SchizoToken: // aka BeginRegexp in other implementations
appName=s.substring(from,b.length);
appPort=msgId.intValue();
if ( bus.checkConnected(this) ) {
@@ -415,25 +505,19 @@ public class IvyClient implements Runnable {
bus.directMessage( this, msgId.intValue(), direct );
break;
default:
- throw new IvyException("protocol error, unknown message type "+msgType);
+ System.out.println("protocol error from "+appName+", unknown message type "+msgType);
+ return false;
}
+ return true;
}
- void sendPong(String s) {send(Pong,0,s);}
- void sendPing(String s) {send(Ping,0,s);}
-
+ protected void sendPong(String s) {send(Pong,0,s);}
+ protected void sendPing(String s) {send(Ping,0,s);}
private void sendBye() {send(Bye,0,"");}
private void sendBye(String message) {send(Bye,0,message);}
-
- public void sendDie() { send(Die,0,""); }
- public void sendDie(String message) {send(Die,0,message);}
private InetAddress getRemoteAddress() { return socket.getInetAddress(); }
- public String toString() {
- return "IvyClient "+bus.appName+":"+appName;
- }
-
private void traceDebug(String s){
if (debug) System.out.println("-->IvyClient "+bus.appName+":"+appName+"<-- "+s);
}
@@ -455,4 +539,19 @@ public class IvyClient implements Runnable {
public void stopPinging() { isPinging=false; pingerThread.interrupt();}
}
+ // a class to perform the threaded execution of each new message
+ // this is an experimental feature introduced in 1.2.4
+ class Sender implements Runnable {
+ String message;
+ private Thread t;
+ public Sender(String message) {
+ this.message=message;
+ t=new Thread(Sender.this);
+ bus.registerThread(t);
+ t.start();
+ bus.unRegisterThread(t);
+ }
+ public void run() { sendMsg(message); }
+ } // class Sender
+
}
diff --git a/src/IvyDaemon.java b/src/IvyDaemon.java
index 56019c1..b843a63 100644
--- a/src/IvyDaemon.java
+++ b/src/IvyDaemon.java
@@ -119,7 +119,11 @@ public class IvyDaemon implements Runnable {
while (true) {
msg=in.readLine();
if (msg==null) break;
- bus.sendMsg(msg);
+ try {
+ bus.sendMsg(msg);
+ } catch (IvyException ie) {
+ System.out.println("incorrect characters whithin the message. Not sent");
+ }
}
} catch (IOException ioe) {
traceDebug("Subreader exception ...");
diff --git a/src/IvyWatcher.java b/src/IvyWatcher.java
index 98956ea..7698632 100755
--- a/src/IvyWatcher.java
+++ b/src/IvyWatcher.java
@@ -14,6 +14,17 @@
* thing.
*
* CHANGELOG:
+ * 1.2.5:
+ * - getDomain now sends IvyException for malformed broadcast addresses
+ * 1.2.4:
+ * - sends the broadcast before listening to the other's broadcasts.
+ * TODO wait for all the broadcast to be sent before starting the listen
+ * mode
+ * - (REMOVED) allows the connexion from a remote host with the same port number
+ * it's too complicated to know if the packet is from ourselves...
+ * - deals with the protocol errors in a more efficient way. The goal is not
+ * to loose our connectivity because of a rude agent.
+ * fixes Bug J005 (YJ + JPI)
* 1.2.3:
* - the packet sending is done in its own thread from now on (PacketSender)
* I don't care stopping it, since it can't be blocked.
@@ -51,6 +62,7 @@ class IvyWatcher implements Runnable {
private boolean isMulticastAddress = false;
private Ivy bus; /* master bus controler */
private DatagramSocket broadcast; /* supervision socket */
+ private InetAddress localhost,loopback;
private String domainaddr;
private int port;
private volatile Thread listenThread;
@@ -75,6 +87,9 @@ class IvyWatcher implements Runnable {
((MulticastSocket)broadcast).joinGroup(group);
}
broadcast.setSoTimeout(Ivy.TIMEOUTLENGTH);
+ localhost=InetAddress.getLocalHost();
+ loopback=InetAddress.getByName(null);
+ } catch ( UnknownHostException uhe ) {
} catch ( IOException e ) {
throw new IvyException("IvyWatcher I/O error" + e );
}
@@ -89,9 +104,9 @@ class IvyWatcher implements Runnable {
traceDebug("beginning of a watcher Thread");
byte buf[] = new byte[256];
DatagramPacket packet=new DatagramPacket(buf, 256);
+ InetAddress remotehost=null;
try {
while( listenThread==thisThread ) {
- int port;
try {
broadcast.receive(packet);
if (listenThread!=thisThread) break; // I was summoned to leave during the receive
@@ -99,37 +114,43 @@ class IvyWatcher implements Runnable {
for (int i=0;i<buf.length;i++) { buf[i]=10; }
// clean up the buffer after each message
// BUGFIX ? I change 0 to 10 in order to avoid a bug
- InetAddress remotehost = packet.getAddress();
- traceDebug("BUSWATCHER Receive Broadcast from "+ remotehost.getHostName()+":"+packet.getPort());
+ remotehost = packet.getAddress();
+ traceDebug("BUSWATCHER Receive Broadcast from "+remotehost.getHostName()+":"+packet.getPort());
// TODO if ( !isInDomain( remotehost ) ) continue;
- // TODO get rid of the StringTokenizer ?
- StringTokenizer st = new StringTokenizer(msg);
- if ( !st.hasMoreTokens()) {
- System.err.println("Bad format "+msg);
- continue;
- }
- int version = Integer.parseInt( st.nextToken() );
- if ( version != bus.PROCOCOLVERSION ) {
- System.err.println("Ignoring bad protocol version broadcast");
- continue;
- }
- if ( ! st.hasMoreTokens()) {
- System.err.println("Bad format "+msg);
- continue;
- }
- port = Integer.parseInt( st.nextToken() );
- if ( (bus.applicationPort == port) ) continue;
- traceDebug("BUSWATCHER Broadcast de "
- +packet.getAddress().getHostName()
- +":"+packet.getPort()+" port "+port+" version "+version);
try {
+ RE re = new RE("([0-9]*) ([0-9]*)");
+ REMatch result = re.getMatch(msg);
+ if (result==null) {
+ System.err.println("Ignoring bad format broadcast from "+remotehost);
+ continue;
+ }
+ int version = Integer.parseInt(result.toString(1));
+ if ( version < bus.PROTOCOLMINIMUM ) {
+ System.err.println("Ignoring bad protocol version "+remotehost+" we need "+ bus.PROTOCOLMINIMUM+" minimum");
+ continue;
+ }
+ int port = Integer.parseInt(result.toString(2));
+ // allows the connexion from a remote host with the same port number
+ // if ( ( (remotehost.equals(localhost)) || (remotehost.equals(loopback)) )
+ // && (bus.applicationPort==port)) {
+ if (bus.applicationPort==port) {
+ traceDebug("ignoring my own broadcast. OK");
+ continue; // it's me
+ }
+ traceDebug("Broadcast de " +packet.getAddress().getHostName()
+ +":"+packet.getPort()+" port "+port+" version "+version);
Socket socket = new Socket( remotehost, port );
- bus.addClient(socket,false);
+ bus.addClient(socket,false,version);
+ } catch (REException ree) {
+ ree.printStackTrace();
+ System.exit(-1);
+ } catch (NumberFormatException nfe) {
+ System.err.println("Ignoring bad format broadcast from "+remotehost);
+ continue;
} catch ( UnknownHostException e ) {
System.err.println("Unkonwn host "+remotehost + e.getMessage());
} catch ( IOException e) {
- System.err.println("can't connect to "+remotehost+" port "+
- port+e.getMessage());
+ System.err.println("can't connect to "+remotehost+" port "+ port+e.getMessage());
}
} catch (InterruptedIOException jii ){
if (thisThread!=listenThread) { break ;}
@@ -184,9 +205,9 @@ class IvyWatcher implements Runnable {
}
synchronized void start() throws IvyException {
- String hello = bus.PROCOCOLVERSION + " " + bus.applicationPort + "\n";
- listenThread.start();
+ String hello = bus.PROTOCOLVERSION + " " + bus.applicationPort + "\n";
new PacketSender(hello); // notifies our arrival on each domain: protocol version + port
+ listenThread.start();
}
/*
@@ -209,18 +230,24 @@ class IvyWatcher implements Runnable {
}
*/
- static String getDomain(String net) {
+ // TODO this is buggy :-\ try it on a named multicast address just to see
+ static String getDomain(String net) throws IvyException {
+ // System.out.println("debug: net=[" + net+ "]");
int sep_index = net.lastIndexOf( ":" );
if ( sep_index != -1 ) { net = net.substring(0,sep_index); }
try {
net += ".255.255.255";
RE exp = new RE( "^(\\d+\\.\\d+\\.\\d+\\.\\d+).*");
net = exp.substitute( net , "$1" );
+ if (net==null) {
+ System.out.println("Bad broascat addr " + net);
+ throw new IvyException("bad broadcast addr");
+ }
} catch ( REException e ){
- System.out.println("Bad broascat addr " + net);
- return null;
+ System.out.println(e);
+ System.exit(0);
}
- // System.out.println("net: "+net);
+ // System.out.println("debug: returning net=[" + net+ "]");
return net;
}
diff --git a/src/Makefile b/src/Makefile
index 82fa423..54cfdec 100644
--- a/src/Makefile
+++ b/src/Makefile
@@ -1,19 +1,28 @@
-GNUPATH=/usr/share/java/repository:/usr/share/java/gnu.getopt.0.9.jar:/usr/share/java/gnu-regexp-1.1.3.jar
+GNUPATH=/usr/share/java/repository:/usr/share/java/gnu-regexp-1.1.3.jar
+#GNUPATH=/usr/share/java/repository:/usr/share/java/gnu.getopt.0.9.jar:/usr/share/java/gnu-regexp-1.1.3.jar
+#GNUPATH=${HOME}/JavaFactory
#######################################
# generic setup
#######################################
- JAVAC = javac
-JAVACOPTS = -d . -deprecation
-CLASSPATH = -classpath .:$(GNUPATH)
+# JAVAC = javac
+#JAVACOPTS = -d . -deprecation
+#CLASSPATH = -classpath .:$(GNUPATH)
#######################################
-# jikes setup on my box
+# gcj setup
#######################################
-# RTPATH = /usr/local/j2sdk1.4.1/jre/lib/rt.jar
+# JAVAC = gcj
#JAVACOPTS = -d . -deprecation
-# JAVAC = jikes
-#CLASSPATH = -classpath .:$(RTPATH)
+#CLASSPATH = -classpath .:$(GNUPATH)
+
+#######################################
+# jikes setup on my box
+#######################################
+ RTPATH = /usr/lib/j2se/1.4/jre/lib/rt.jar
+JAVACOPTS = -d . -deprecation
+ JAVAC = jikes
+CLASSPATH = -classpath .:$(RTPATH):$(GNUPATH)
#######################################
# blackdown jdk118 setup
diff --git a/src/Probe.java b/src/Probe.java
index bd37089..8e4f9c2 100644
--- a/src/Probe.java
+++ b/src/Probe.java
@@ -7,6 +7,9 @@
* (c) CENA
*
* Changelog:
+ * 1.2.4
+ * - now uses the bindListener paradigm to display the binding/unbinding dynamically
+ * - adds the -s (send to self) command line switch
* 1.2.3
* - now allows directMessages with the .direct command
* - the parseMsg is being rewritten with regexps
@@ -40,17 +43,18 @@ import java.util.*;
import gnu.getopt.Getopt;
import gnu.regexp.*;
-public class Probe implements IvyApplicationListener, IvyMessageListener, Runnable {
+public class Probe implements IvyApplicationListener, IvyMessageListener, IvyBindListener, Runnable {
public static final String helpCommands = "Available commands:\n.die CLIENTNAME sends a die message\n.direct CLIENTNAME ID MESSAGE sends the direct message to the client, with a message id set to the numerical ID\n.bye quits the application\n.quit idem\n.list lists the available clients\n.ping sends a ping request if IVY_PING is enabled\n.bind REGEXP binds to a regexp at runtime\n.unbind REGEXP unbinds to a regexp at runtime";
- public static final String helpmsg = "usage: java fr.dgac.ivy.Probe [options] [regexp]\n\t-b BUS\tspecifies the Ivy bus domain\n\t-n ivyname (default JPROBE)\n\t-q\tquiet, no tty output\n\t-d\tdebug\n\t-t\ttime stamp each message\n\t-h\thelp\n\n\t regexp is a Perl5 compatible regular expression";
+ public static final String helpmsg = "usage: java fr.dgac.ivy.Probe [options] [regexp]\n\t-b BUS\tspecifies the Ivy bus domain\n\t-n ivyname (default JPROBE)\n\t-q\tquiet, no tty output\n\t-d\tdebug\n\t-t\ttime stamp each message\n\t-s\tsends to self\n\t-h\thelp\n\n\t regexp is a Perl5 compatible regular expression";
public static void main(String[] args) throws IvyException {
- Getopt opt = new Getopt("Probe",args,"n:b:dqht");
+ Getopt opt = new Getopt("Probe",args,"n:b:dqsht");
int c;
boolean timestamp=false;
boolean quiet=false;
+ boolean sendsToSelf=false;
String domain=Ivy.getDomain(null);
String name="JPROBE";
while ((c = opt.getopt()) != -1) switch (c) {
@@ -62,17 +66,24 @@ public class Probe implements IvyApplicationListener, IvyMessageListener, Runnab
case 'n': name=opt.getOptarg(); break;
case 'q': quiet=true; break;
case 't': timestamp=true; break;
+ case 's': sendsToSelf=true; break;
case 'h':
default: System.out.println(helpmsg); System.exit(0);
}
Probe p = new Probe(new BufferedReader(new InputStreamReader(System.in)),timestamp,quiet,System.getProperty("IVY_DEBUG")!=null);
p.setExitOnDie(true);
Ivy bus=new Ivy(name,name+" ready",null);
+ bus.addBindListener(p);
+ bus.sendToSelf(sendsToSelf);
for (int i=opt.getOptind();i<args.length;i++) {
- if (!quiet) System.out.println("you want to subscribe to " + args[i]);
- bus.bindMsg(args[i],p);
+ try {
+ bus.bindMsg(args[i],p);
+ if (!quiet) System.out.println("you have subscribed to " + args[i]);
+ } catch (IvyException ie) {
+ System.out.println("you have not subscribed to " + args[i]+ ", this regexp is invalid");
+ }
}
- if (!quiet) System.out.println(bus.domains(domain));
+ if (!quiet) System.out.println("broadcasting on "+bus.domains(domain));
bus.start(domain);
p.start(bus);
}
@@ -135,34 +146,44 @@ public class Probe implements IvyApplicationListener, IvyMessageListener, Runnab
REMatch result;
traceDebug("parsing the ["+s+"] (length "+s.length()+") string");
// crude parsing of the ".xyz" commands
- // TODO use regexps instends of String.lastIndexOf(String). Example
- // provided with .direct !
if (s.length()==0) {
- println("-> Sent to " +bus.sendMsg(s)+" peers");
+ try {
+ println("-> Sent to " +bus.sendMsg(s)+" peers");
+ } catch (IvyException ie) {
+ println("-> not sent, the message contains incorrect characters");
+ }
} else if ((result=directMsgRE.getMatch(s))!=null) {
String target = result.toString(1);
int id = Integer.parseInt(result.toString(2));
String message = result.toString(3);
Vector v=bus.getIvyClientsByName(target);
if (v.size()==0) println("no Ivy client with the name \""+target+"\"");
- for (int i=0;i<v.size();i++) ((IvyClient)v.elementAt(i)).sendDirectMsg(id,message);
+ try {
+ for (int i=0;i<v.size();i++) ((IvyClient)v.elementAt(i)).sendDirectMsg(id,message);
+ } catch (IvyException ie) {
+ println("-> not sent, the message contains incorrect characters");
+ }
return;
} else if (s.lastIndexOf(".die ")>=0){
String target=s.substring(5);
Vector v=bus.getIvyClientsByName(target);
if (v.size()==0) println("no Ivy client with the name \""+target+"\"");
- for (int i=0;i<v.size();i++) ((IvyClient)v.elementAt(i)).sendDie();
+ for (int i=0;i<v.size();i++) ((IvyClient)v.elementAt(i)).sendDie("java probe wants you to leave the bus");
} else if (s.lastIndexOf(".unbind ")>=0){
String regexp=s.substring(8);
if (bus.unBindMsg(regexp)) {
- println("you want to unsubscribe to " + regexp);
+ println("you have unsubscribed to " + regexp);
} else {
println("you can't unsubscribe to " + regexp + ", your're not subscribed to it");
}
} else if (s.lastIndexOf(".bind ")>=0){
String regexp=s.substring(6);
- println("you want to subscribe to " + regexp);
- bus.bindMsg(regexp,this);
+ try {
+ bus.bindMsg(regexp,this);
+ println("you have now subscribed to " + regexp);
+ } catch (IvyException ie) {
+ System.out.println("warning, the regular expression '" + regexp + "' is invalid. Not bound !");
+ }
} else if (s.lastIndexOf(".ping ")>=0){
String target=s.substring(6);
Vector v=bus.getIvyClientsByName(target);
@@ -187,28 +208,40 @@ public class Probe implements IvyApplicationListener, IvyMessageListener, Runnab
println("this command is not recognized");
println(helpCommands);
} else {
- println("-> Sent to " +bus.sendMsg(s)+" peers");
+ try {
+ println("-> Sent to " +bus.sendMsg(s)+" peers");
+ } catch (IvyException ie) {
+ println("-> not sent, the line contains incorrect characters");
+ }
}
} // parseCommand
+ public void bindPerformed(IvyClient client,int id,String re) {
+ println(client.getApplicationName() + " subscribes to " +re );
+ }
+
+ public void unbindPerformed(IvyClient client,int id,String re) {
+ println(client.getApplicationName() + " unsubscribes to " +re );
+ }
+
public void connect(IvyClient client) {
println(client.getApplicationName() + " connected " );
- for (java.util.Enumeration e=client.getRegexps();e.hasMoreElements();)
- println(client.getApplicationName() + " subscribes to " +e.nextElement() );
+ // for (java.util.Enumeration e=client.getRegexps();e.hasMoreElements();)
+ // println(client.getApplicationName() + " subscribes to " +e.nextElement() );
}
public void disconnect(IvyClient client) {
println(client.getApplicationName() + " disconnected " );
}
- public void die(IvyClient client, int id) {
- println("received die msg from " + client.getApplicationName() +" good bye");
+ public void die(IvyClient client, int id,String msgarg) {
+ println("received die msg from " + client.getApplicationName() +" with the message: "+msgarg+", good bye");
/* I cannot stop the readLine(), because it is native code */
if (exitOnDie) System.exit(0);
}
public void directMessage(IvyClient client, int id, String arg) {
- println(client.getApplicationName() + " direct Message "+ id + arg );
+ println(client.getApplicationName() + " sent the direct Message, id: "+ id + ", arg: "+ arg );
}
public void receive(IvyClient client, String[] args) {
diff --git a/src/SelfIvyClient.java b/src/SelfIvyClient.java
new file mode 100644
index 0000000..09d9f80
--- /dev/null
+++ b/src/SelfIvyClient.java
@@ -0,0 +1,144 @@
+/**
+ * A private Class for ourself on the bus
+ *
+ * @author Yannick Jestin
+ * @author <a href="http://www.tls.cena.fr/products/ivy/">http://www.tls.cena.fr/products/ivy/</a>
+ * @since 1.2.4
+ *
+ * CHANGELOG:
+ * 1.2.4:
+ * - adds a the threaded option for callbacks
+ * - Matthieu's bugreport on unBindMsg()
+ */
+
+package fr.dgac.ivy ;
+import java.util.*;
+import gnu.regexp.*;
+
+class SelfIvyClient extends IvyClient {
+
+ private Ivy bus;
+ private static int serial=0; /* an unique ID for each regexp */
+ private Hashtable callbacks=new Hashtable();
+ private Hashtable threadedFlag=new Hashtable();
+ private boolean massThreaded=false;
+
+ public void sendDirectMsg(int id,String message) {
+ bus.directMessage(this,id,message);
+ }
+ public void sendDie(String message) { bus.dieReceived(this,0,message); }
+
+ protected SelfIvyClient(Ivy bus,String appName) {
+ this.bus=bus;
+ this.protocol=Ivy.PROTOCOLVERSION;
+ this.appName=appName;
+ }
+
+ synchronized protected int bindMsg(String sregexp, IvyMessageListener callback, boolean threaded ) throws IvyException {
+ // creates a new binding (regexp,callback)
+ try {
+ RE re=new RE(sregexp);
+ Integer key = new Integer(serial++);
+ regexps.put(key,re);
+ regexpsText.put(key,sregexp);
+ callbacks.put(key,callback);
+ threadedFlag.put(key,new Boolean(threaded));
+ return key.intValue();
+ } catch (REException ree) {
+ throw new IvyException("Invalid regexp " + sregexp);
+ }
+ }
+
+ synchronized protected void unBindMsg(int id) throws IvyException {
+ Integer key = new Integer(id);
+ if ( ( regexps.remove(key) == null )
+ || (regexpsText.remove(key) == null )
+ || (callbacks.remove(key) == null )
+ || (threadedFlag.remove(key) == null )
+ )
+ throw new IvyException("client wants to remove an unexistant regexp "+id);
+ }
+
+ synchronized protected boolean unBindMsg(String re) {
+ for (Enumeration e=regexpsText.keys();e.hasMoreElements();) {
+ Integer k = (Integer)e.nextElement();
+ if ( ((String)regexps.get(k)).compareTo(re) == 0) {
+ try {
+ bus.unBindMsg(k.intValue());
+ } catch (IvyException ie) {
+ return false;
+ }
+ return true;
+ }
+ }
+ return false;
+ }
+
+ protected int sendSelfMsg(String message) {
+ int count = 0;
+ for (Enumeration e = regexps.keys();e.hasMoreElements();) {
+ Integer key = (Integer)e.nextElement();
+ RE regexp = (RE)regexps.get(key);
+ String sre = (String)regexpsText.get(key);
+ int nb = regexp.getNumSubs();
+ REMatch result = regexp.getMatch(message);
+ if (result==null) continue;
+ count++;
+ callCallback(this,key,toArgs(nb,result));
+ }
+ return count;
+ }
+
+ protected void callCallback(IvyClient client, Integer key, String[] tab) {
+ IvyMessageListener callback=(IvyMessageListener)callbacks.get(key);
+ boolean threaded=((Boolean)threadedFlag.get(key)).booleanValue();
+ if (callback==null) {
+ System.out.println("(callCallback) Not regexp matching id "+key.intValue());
+ System.exit(0);
+ }
+ if (!threaded) {
+ // runs the callback in the same thread
+ callback.receive(client, tab);
+ } else {
+ // starts a new Thread for each callback ...
+ new Runner(callback,client,tab);
+ }
+ }
+
+ private String[] toArgs(int nb,REMatch result) {
+ String[] args = new String[nb];
+ for(int sub=1;sub<=nb;sub++) {
+ args[sub-1]=result.toString(sub);
+ if (bus.doProtectNewlines) args[sub-1]=decode(args[sub-1]);
+ }
+ return args;
+ }
+
+ public String toString() { return "IvyClient (ourself)"+bus.appName+":"+appName; }
+
+ // a class to perform the threaded execution of each new message
+ // this is an experimental feature introduced in 1.2.4
+ class Runner implements Runnable {
+ IvyMessageListener cb;
+ IvyClient c;
+ String[] args;
+ private Thread t;
+ public Runner(IvyMessageListener cb,IvyClient c,String[] args) {
+ this.cb=cb;
+ this.args=args;
+ this.c=c;
+ t=new Thread(Runner.this);
+ bus.registerThread(t);
+ t.start();
+ bus.unRegisterThread(t);
+ }
+ public void run() { cb.receive(c,args); }
+ } // class Runner
+
+ private void traceDebug(String s){
+ if (debug)
+ System.out.println("-->SelfIvyClient "+bus.appName+":"+appName+"<-- "+s);
+ }
+
+}
+/* EOF */