From 64c95fd653177361f7e691ab345ed54d42ef6aed Mon Sep 17 00:00:00 2001
From: jestin
Date: Mon, 12 Jan 2004 09:48:49 +0000
Subject: 1.2.5 pre
---
src/Ivy.java | 358 ++++++++++++++++++++++++++++++----------
src/IvyApplicationAdapter.java | 4 +-
src/IvyApplicationListener.java | 8 +-
src/IvyBindListener.java | 30 ++++
src/IvyClient.java | 269 ++++++++++++++++++++----------
src/IvyDaemon.java | 6 +-
src/IvyWatcher.java | 91 ++++++----
src/Makefile | 25 ++-
src/Probe.java | 73 +++++---
src/SelfIvyClient.java | 144 ++++++++++++++++
10 files changed, 768 insertions(+), 240 deletions(-)
create mode 100644 src/IvyBindListener.java
create mode 100644 src/SelfIvyClient.java
(limited to 'src')
diff --git a/src/Ivy.java b/src/Ivy.java
index b11d546..e3cc0bb 100755
--- a/src/Ivy.java
+++ b/src/Ivy.java
@@ -7,10 +7,28 @@
*
*Ivy bus = new Ivy("Dummy agent","ready",null);
*bus.bindMsg("(.*)",myMessageListener);
- *bus.start(null);
+ *bus.start(getDomain(null));
*
*
* CHANGELOG:
+ * 1.2.4:
+ * - added an accessor for doSendToSelf
+ * - waitForMsg() and waitForClient() to make the synchronization with
+ * other Ivy agents easier
+ * - with the bindAsyncMsg() to subscribe and perform each callback in a
+ * new Thread
+ * - bindMsg(regexp,messagelistener,boolean) allow to subscribe with a
+ * synchrone/asynch exectution
+ * - API change, IvyException raised when \n or \0x3 are present in bus.sendMsg()
+ * - bindListener are now handled
+ * - removeApplicationListener can throw IvyException
+ * - bus.start(null) now starts on getDomain(null), first the IVYBUS
+ * property, then the DEFAULT_DOMAIN, 127:2010
+ * - bindMsg() now throws an IvyException if the regexp is invalid !!!
+ * BEWARE, this can impact lots of programs ! (fixes J007)
+ * - no more includes the "broadcasting on " in the domain(String) method
+ * - new function sendToSelf(boolean) allow us to send messages to
+ * ourselves
* 1.2.3:
* - adds a IVYBUS property to propagate the domain once set. This way,
* children forked through Ivy java can inherit from the current value.
@@ -19,7 +37,7 @@
* added the String domains(String d) function, in order to display the
* domain list
* 1.2.1:
- * bus.start(null) now starts on DEFAULT_DOMAIN
+ * bus.start(null) now starts on DEFAULT_DOMAIN. ( correction 1.2.4 This was not true.)
* added the getDomains in order to correctly display the domain list
* checks if the serverThread exists before interrupting it
* no has unBindMsg(String)
@@ -44,11 +62,12 @@ public class Ivy implements Runnable {
/**
* the name of the application on the bus
*/
- String appName;
+ protected String appName;
/**
* the protocol version number
*/
- public static final int PROCOCOLVERSION = 3 ;
+ public static final int PROTOCOLVERSION = 3 ;
+ public static final int PROTOCOLMINIMUM = 3 ;
/**
* the port for the UDP rendez vous, if none is supplied
*/
@@ -61,24 +80,23 @@ public class Ivy implements Runnable {
* the library version, useful for development purposes only, when java is
* invoked with -DIVY_DEBUG
*/
- public static final String libVersion ="1.2.3";
+ public static final String libVersion ="1.2.4";
private boolean debug;
- private static int serial=0; /* an unique ID for each regexp */
private static int clientSerial=0; /* an unique ID for each IvyClient */
private ServerSocket app;
private Vector watchers;
private volatile Thread serverThread; // to ensure quick communication of the end
- private Hashtable callbacks = new Hashtable();
private Hashtable clients = new Hashtable();
private Vector ivyApplicationListenerList = new Vector();
- private String messages_classes[] = null;
- private boolean sendToSelf = false ;
+ private Vector ivyBindListenerList = new Vector();
+ private Vector sendThreads = new Vector();
private boolean stopped = false;
- int applicationPort; /* Application port number */
- Hashtable regexp_out = new Hashtable();
- String ready_message = null;
-
+ protected int applicationPort; /* Application port number */
+ protected String ready_message = null;
+ protected boolean doProtectNewlines = false ;
+ private boolean doSendToSelf = false ;
+ protected SelfIvyClient selfIvyClient ;
public final static int TIMEOUTLENGTH = 3000;
/**
@@ -96,6 +114,49 @@ public class Ivy implements Runnable {
ready_message = message;
debug = (System.getProperty("IVY_DEBUG")!=null);
if ( appcb != null ) ivyApplicationListenerList.addElement( appcb );
+ selfIvyClient=new SelfIvyClient(this,name);
+ }
+
+ /**
+ * Waits for a message to be received
+ *
+ * @since 1.2.4
+ * @param regexp the message we're waiting for to continue the main thread.
+ * @param timeout in millisecond, 0 if infinite
+ * @return the IvyClient who sent the message, or null if the timeout is
+ * reached
+ */
+ public IvyClient waitForMsg(String regexp,int timeout) throws IvyException {
+ Waiter w = new Waiter(timeout);
+ int re = bindMsg(regexp,w);
+ IvyClient ic=w.waitFor();
+ unBindMsg(re);
+ return ic;
+ }
+
+ /**
+ * Waits for an other IvyClient to join the bus
+ *
+ * @since 1.2.4
+ * @param name the name of the client we're waiting for to continue the main thread.
+ * @param timeout in millisecond, 0 if infinite
+ * @return the first IvyClient with the name or null if the timeout is
+ * reached
+ */
+ public IvyClient waitForClient(String name,int timeout) throws IvyException {
+ IvyClient ic;
+ if (name==null) throw new IvyException("null name given to waitForClient");
+ // first check if client with the same name is on the bus
+ for (Enumeration e=clients.elements();e.hasMoreElements();) {
+ ic = (IvyClient)e.nextElement();
+ if (name.compareTo(ic.getApplicationName())==0) return ic;
+ }
+ // if not enter the waiting loop
+ WaiterClient w = new WaiterClient(name,timeout);
+ int i = addApplicationListener(w);
+ ic=w.waitForClient();
+ removeApplicationListener(i);
+ return ic;
}
/**
@@ -113,7 +174,7 @@ public class Ivy implements Runnable {
*
*/
public void start(String domainbus) throws IvyException {
- if (domainbus==null) domainbus=DEFAULT_DOMAIN;
+ if (domainbus==null) domainbus=getDomain(null);
Properties sysProp = System.getProperties();
sysProp.put("IVYBUS",domainbus);
try {
@@ -123,7 +184,7 @@ public class Ivy implements Runnable {
} catch (IOException e) {
throw new IvyException("can't open TCP service socket " + e );
}
- traceDebug("lib: "+libVersion+" protocol: "+PROCOCOLVERSION+" TCP service open on port "+applicationPort);
+ traceDebug("lib: "+libVersion+" protocol: "+PROTOCOLVERSION+" TCP service open on port "+applicationPort);
watchers = new Vector();
Domain[] d = parseDomains(domainbus);
@@ -144,7 +205,11 @@ public class Ivy implements Runnable {
int index=0;
while ( st.hasMoreTokens()) {
String s = st.nextToken() ;
- d[index++]=new Domain(IvyWatcher.getDomain(s),IvyWatcher.getPort(s));
+ try {
+ d[index++]=new Domain(IvyWatcher.getDomain(s),IvyWatcher.getPort(s));
+ } catch (IvyException ie) {
+ // do nothing
+ }
}
return d;
}
@@ -177,31 +242,83 @@ public class Ivy implements Runnable {
}
/**
- * Toggles the sending of messages to oneself
+ * Toggles the sending of messages to oneself, the remote client's
+ * IvyMessageListeners are processed first, and ourself afterwards.
+ * @param boolean true if you want to send the message to yourself. Default
+ * is false
+ * @since 1.2.4
+ */
+ public void sendToSelf(boolean b) {doSendToSelf=b;}
+
+ /**
+ * @param boolean do I send message to myself ?
+ * @since 1.2.4
+ */
+ public boolean isSendToSelf() {return doSendToSelf;}
+
+ /**
+ * returns our self IvyClient.
+ * @since 1.2.4
+ */
+ public IvyClient getSelfIvyClient() {return selfIvyClient;}
+
+ /**
+ * Toggles the encoding/decoding of messages to prevent bugs related to the
+ * presence of a "\n"
+ * @param boolean true if you want to send the message to yourself. Default
+ * is false.
+ * @since 1.2.5?
+ * The default escape character is a ESC 0x1A
+ */
+ private void protectNewlines(boolean b) {doProtectNewlines=b;}
+
+ /**
+ * Performs a pattern matching according to everyone's regexps, and sends
+ * the results to the relevant ivy agents sequentially
+ *
+ * @param message A String which will be compared to the regular
+ * expressions of the different clients
+ * @return the number of messages actually sent
+ */
+ public int sendMsg(String message) throws IvyException {
+ return sendMsg(message,false);
+ }
+
+ /**
+ * Performs a pattern matching according to everyone's regexps, and sends
+ * the results to the relevant ivy agents, using as many threads as needed.
*
+ * @since 1.2.4
+ * @param message A String which will be compared to the regular
+ * expressions of the different clients
+ * @return always returns -1
*/
- public void sendToSelf(boolean b) {sendToSelf=b;}
+ public int sendAsyncMsg(String message,boolean async) throws IvyException {
+ return sendMsg(message,true);
+ }
/**
* Performs a pattern matching according to everyone's regexps, and sends
* the results to the relevant ivy agents.
- * There is one thread for each client connected, we could also
- * create another thread each time we send a message.
+ *
+ * @since 1.2.4
* @param message A String which will be compared to the regular
* expressions of the different clients
- * @return the number of messages actually sent
+ * @param async if true, the sending will be performed in a separate thread,
+ * default is false
+ * @return if async is true, always returns -1, else returns the number of messages actually sent
*/
- public int sendMsg( String message ) {
+ public int sendMsg(String message,boolean async) throws IvyException {
int count = 0;
- // an alternate implementation would one sender thread per client
- // instead of one for all the clients. It might be a performance issue
+ if (doProtectNewlines)
+ message=IvyClient.encode(message);
+ else if ( (message.indexOf(IvyClient.newLineChar)!=-1)||(message.indexOf(IvyClient.endArgChar)!=-1))
+ throw new IvyException("newline character not allowed in Ivy messages");
for ( Enumeration e=clients.elements();e.hasMoreElements();) {
IvyClient client = (IvyClient)e.nextElement();
- count += client.sendMsg( message );
- }
- if (sendToSelf) {
- // TODO
+ count += client.sendMsg(message, async);
}
+ if (doSendToSelf) count+=selfIvyClient.sendSelfMsg(message);
return count;
}
@@ -221,17 +338,57 @@ public class Ivy implements Runnable {
* interface, on the AWT/Swing framework
* @return the id of the regular expression
*/
- public int bindMsg(String regexp, IvyMessageListener callback ) {
- // creates a new binding (regexp,callback)
- Integer key = new Integer(serial++);
- regexp_out.put(key,regexp);
- callbacks.put(key,callback );
+ public int bindMsg(String sregexp, IvyMessageListener callback ) throws IvyException {
+ return bindMsg(sregexp,callback,false);
+ }
+
+ /**
+ * Subscribes to a regular expression with asyncrhonous callback execution.
+ *
+ * Same as bindMsg, except that the callback will be executed in a separate
+ * thread each time.
+ * WARNING : there is no way to predict the order of execution
+ * of the * callbacks, i.e. a message received might trigger a callback before
+ * another one sent before
+ *
+ * @since 1.2.4
+ * @param regexp a perl regular expression, groups are done with parenthesis
+ * @param callback any objects implementing the IvyMessageListener
+ * interface, on the AWT/Swing framework
+ * @return the int ID of the regular expression.
+ */
+ public int bindAsyncMsg(String sregexp, IvyMessageListener callback ) throws IvyException {
+ return bindMsg(sregexp,callback,true);
+ }
+
+ /**
+ * Subscribes to a regular expression.
+ *
+ * The callback will be executed with
+ * the saved parameters of the regexp as arguments when a message will sent
+ * by another agent. A program doesn't receive its own messages.
+ *
Example:
+ *
the Ivy agent A performs
b.bindMsg("^Hello (*)",cb);
+ *
the Ivy agent B performs b2.sendMsg("Hello world");
+ *
a thread in A will uun the callback cb with its second argument set
+ * to a array of String, with one single element, "world"
+ * @since 1.2.4
+ * @param regexp a perl regular expression, groups are done with parenthesis
+ * @param callback any objects implementing the IvyMessageListener
+ * interface, on the AWT/Swing framework
+ * @param async if true, each callback will be run in a separate thread,
+ * default is false
+ * @return the id of the regular expression
+ */
+ public int bindMsg(String sregexp, IvyMessageListener callback,boolean async ) throws IvyException {
+ // adds the regexp to our collection in selfIvyClient
+ int key=selfIvyClient.bindMsg(sregexp,callback,async);
// notifies the other clients this new regexp
for (Enumeration e=clients.elements();e.hasMoreElements();) {
IvyClient c = (IvyClient)e.nextElement();
- c.sendRegexp(key.intValue(),regexp);
+ c.sendRegexp(key,sregexp);
}
- return key.intValue();
+ return key;
}
/**
@@ -240,14 +397,9 @@ public class Ivy implements Runnable {
* @param id the id of the regular expression, returned when it was bound
*/
public void unBindMsg(int id) throws IvyException {
- Integer key = new Integer(id);
- if ( ( regexp_out.remove(key) == null )
- || (callbacks.remove(key) == null ) ) {
- throw new IvyException("client wants to remove an unexistant regexp "+id);
- }
- for (Enumeration e=clients.elements();e.hasMoreElements();) {
+ selfIvyClient.unBindMsg(id);
+ for (Enumeration e=clients.elements();e.hasMoreElements();)
((IvyClient)e.nextElement()).delRegexp(id );
- }
}
/**
@@ -258,18 +410,31 @@ public class Ivy implements Runnable {
* @param String the string for the regular expression
*/
public boolean unBindMsg(String re) {
- for (Enumeration e=regexp_out.keys();e.hasMoreElements();) {
- Integer k = (Integer)e.nextElement();
- if ( ((String)regexp_out.get(k)).compareTo(re) == 0) {
- try {
- unBindMsg(k.intValue());
- } catch (IvyException ie) {
- return false;
- }
- return true;
- }
+ return selfIvyClient.unBindMsg(re);
+ }
+
+ /**
+ * adds a bind listener to a bus
+ * @param callback is an object implementing the IvyBindListener interface
+ * @return the id of the bind listener, useful if you wish to remove it later
+ * @since 1.2.4
+ */
+ public int addBindListener(IvyBindListener callback){
+ ivyBindListenerList.addElement(callback);
+ return ivyBindListenerList.indexOf(callback);
+ }
+
+ /**
+ * removes a bind listener
+ * @param id the id of the bind listener to remove
+ * @since 1.2.4
+ */
+ public void removeBindListener(int id) throws IvyException {
+ try {
+ ivyBindListenerList.removeElementAt(id);
+ } catch (ArrayIndexOutOfBoundsException aie) {
+ throw new IvyException(id+" is not a valid Id");
}
- return false;
}
/**
@@ -281,32 +446,49 @@ public class Ivy implements Runnable {
*/
public int addApplicationListener(IvyApplicationListener callback){
ivyApplicationListenerList.addElement(callback);
- int id = ivyApplicationListenerList.indexOf( callback );
- return id;
+ return ivyApplicationListenerList.indexOf( callback );
}
/**
* removes an application listener
* @param id the id of the application listener to remove
*/
- public void removeApplicationListener(int id){
- ivyApplicationListenerList.removeElementAt(id);
+ public void removeApplicationListener(int id) throws IvyException {
+ try {
+ ivyApplicationListenerList.removeElementAt(id);
+ } catch (ArrayIndexOutOfBoundsException aie) {
+ throw new IvyException(id+" is not a valid Id");
+ }
}
/* invokes the application listeners upon arrival of a new Ivy client */
- public void connect(IvyClient client){
+ protected void clientConnects(IvyClient client){
for ( int i = 0 ; i < ivyApplicationListenerList.size(); i++ ) {
((IvyApplicationListener)ivyApplicationListenerList.elementAt(i)).connect(client);
}
}
- /* invokes the application listeners upon arrival of a new Ivy client */
- void disconnectReceived(IvyClient client){
+ /* invokes the application listeners upon the departure of an Ivy client */
+ protected void clientDisconnects(IvyClient client){
for ( int i = 0 ; i < ivyApplicationListenerList.size(); i++ ) {
((IvyApplicationListener)ivyApplicationListenerList.elementAt(i)).disconnect(client);
}
}
+ /* invokes the bind listeners */
+ protected void regexpReceived(IvyClient client,int id,String sregexp){
+ for ( int i = 0 ; i < ivyBindListenerList.size(); i++ ) {
+ ((IvyBindListener)ivyBindListenerList.elementAt(i)).bindPerformed(client,id,sregexp);
+ }
+ }
+
+ /* invokes the bind listeners */
+ protected void regexpDeleted(IvyClient client,int id,String sregexp){
+ for ( int i = 0 ; i < ivyBindListenerList.size(); i++ ) {
+ ((IvyBindListener)ivyBindListenerList.elementAt(i)).unbindPerformed(client,id,sregexp);
+ }
+ }
+
/*
* removes a client from the list
*/
@@ -316,9 +498,9 @@ public class Ivy implements Runnable {
* invokes the application listeners when we are summoned to die
* then stops
*/
- public void dieReceived(IvyClient client, int id){
+ public void dieReceived(IvyClient client, int id,String message){
for ( int i=0 ;iivy<-- "+s); }
+ synchronized void registerThread(Thread t) { sendThreads.add(t); }
+ synchronized void unRegisterThread(Thread t) { sendThreads.remove(t); }
+ synchronized Thread getOneThread() {
+ if (sendThreads.size()==0) return null;
+ return (Thread) sendThreads.firstElement();
+ }
+
+ void waitForAll() {
+ Thread t;
+ System.out.println("DEVELOPMENT WAITFORALL sendThreads size : " + sendThreads.size());
+ try { while ((t=getOneThread())!=null) { t.join(); } }
+ catch (InterruptedException ie) { System.out.println("waitForAll Interrupted"); }
+ System.out.println("DEVELOPMENT END WAITFORALL");
+ }
+
/* a small private method for debbugging purposes */
private String getClientNames() {
String s = appName+" clients are: ";
@@ -469,7 +644,7 @@ public class Ivy implements Runnable {
}
public String domains(String toparse) {
- String s="broadcasting on ";
+ String s="";
Ivy.Domain[] d = parseDomains(toparse);
for (int index=0;indexhttp://www.tls.cena.fr/products/ivy/
*
- * an ApplicatinListener class for handling application-level request on the
+ * an ApplicationListener class for handling application-level request on the
* Ivy bus. The methods in this class are empty. This class exists as a
* convenience for implementing a subset of the methods of the
* applicationlistener. See the AWT 1.1 framework for further information on
@@ -19,6 +19,6 @@ package fr.dgac.ivy;
public abstract class IvyApplicationAdapter implements IvyApplicationListener {
public void connect( IvyClient client ) { }
public void disconnect( IvyClient client ) { }
- public void die( IvyClient client, int id ) { }
+ public void die( IvyClient client, int id, String msgarg) { }
public void directMessage( IvyClient client, int id,String msgarg ) {}
}
diff --git a/src/IvyApplicationListener.java b/src/IvyApplicationListener.java
index bdfb12c..456a85f 100755
--- a/src/IvyApplicationListener.java
+++ b/src/IvyApplicationListener.java
@@ -9,6 +9,11 @@ package fr.dgac.ivy;
*
* The ApplicatinListenr for receiving application level events on the Ivy
* bus: connexion, disconnexion, direct messages or requests to quit.
+ *
+ * Changelog:
+ * 1.2.4
+ * - sendDie now requires a String argument ! It is MANDATORY, and could
+ * impact your implementations !
*/
public interface IvyApplicationListener extends java.util.EventListener {
@@ -26,13 +31,12 @@ public interface IvyApplicationListener extends java.util.EventListener {
* invoked when a peer request us to leave the bus
* @param client the peer
*/
- public abstract void die(IvyClient client, int id);
+ public abstract void die(IvyClient client, int id,String msgarg);
/**
* invoked when a peer sends us a direct message
* @param client the peer
* @param id
* @param msgarg the message itself
- * this is not yet implemented in java. I believe it has no real use :)
*/
public abstract void directMessage( IvyClient client, int id,String msgarg );
}
diff --git a/src/IvyBindListener.java b/src/IvyBindListener.java
new file mode 100644
index 0000000..96096f2
--- /dev/null
+++ b/src/IvyBindListener.java
@@ -0,0 +1,30 @@
+package fr.dgac.ivy;
+
+/**
+ * this interface specifies the methods of a BindListener
+ *
+ * @author Yannick Jestin
+ * @author http://www.tls.cena.fr/products/ivy/
+ *
+ * Changelog:
+ */
+
+public interface IvyBindListener extends java.util.EventListener {
+
+ /**
+ * invoked when a Ivy Client performs a bind
+ * @param client the peer
+ * @param int the regexp ID
+ * @param regexp the regexp
+ */
+ public abstract void bindPerformed(IvyClient client,int id, String regexp);
+
+ /**
+ * invoked when a Ivy Client performs a unbind
+ * @param client the peer
+ * @param int the regexp ID
+ * @param regexp the regexp
+ */
+ public abstract void unbindPerformed(IvyClient client,int id,String regexp);
+
+}
diff --git a/src/IvyClient.java b/src/IvyClient.java
index 8aad199..b69c8d3 100755
--- a/src/IvyClient.java
+++ b/src/IvyClient.java
@@ -10,6 +10,18 @@
* created for each remote client.
*
* CHANGELOG:
+ * 1.2.4:
+ * - sendBuffer goes synchronized
+ * - sendMsg now has a async parameter, allowing the use of threads to
+ * delegate the sending of messages
+ * - API change, IvyException raised when \n or \0x3 are present in bus.sendMsg()
+ * - breaks the connexion with faulty Ivy clients (either protocol or invalid
+ * regexps, closes bug J007 (CM))
+ * - sendDie now always requires a reason
+ * - invokes the disconnect applicationListeners at the end of the run()
+ * loop.
+ * closes Bug J006 (YJ)
+ * - changed the access of some functions ( sendRegexp, etc ) to protected
* 1.2.3:
* - silently stops on InterruptedIOException.
* - direct Messages
@@ -50,54 +62,64 @@ public class IvyClient implements Runnable {
final static int DelRegexp = 4;/* the peer removes one of his regex */
final static int EndRegexp = 5;/* no more regexp in the handshake */
final static int SchizoToken = 6; /* avoid race condition in concurrent connexions */
+ /* SchizoToken is aka BeginRegexp in other implementations */
final static int DirectMsg = 7;/* the peer sends a direct message */
final static int Die = 8; /* the peer wants us to quit */
final static int Ping = 9; /* checks the presence of the other */
final static int Pong = 10; /* checks the presence of the other */
-
final static String MESSAGE_TERMINATOR = "\n"; /* the next protocol will use \r */
final static String StartArg = "\u0002";/* begin of arguments */
final static String EndArg = "\u0003"; /* end of arguments */
+ final static String escape ="\u001A";
+ final static char escapeChar = escape.charAt(0);
+ final static char endArgChar = EndArg.charAt(0);
+ final static char newLineChar = '\n';
-
- private static boolean debug = (System.getProperty("IVY_DEBUG")!=null) ;
+ // private variables
+ private String messages_classes[] = null;
private Ivy bus;
private Socket socket;
private BufferedReader in;
private OutputStream out;
- private Hashtable regexp_in = new Hashtable();
- private Hashtable regexp_text = new Hashtable();
private int appPort;
private boolean peerCalling;
private volatile Thread clientThread;// volatile to ensure the quick communication
private Integer clientKey ;
private static boolean doping = (System.getProperty("IVY_PING")!=null) ;
- final static int PINGTIMEOUT = 5000;
+ private final static int PINGTIMEOUT = 5000;
private PINGER pinger;
private volatile Thread pingerThread;
+ private boolean discCallbackPerformed = false;
// protected variables
String appName;
+ Hashtable regexps = new Hashtable();
+ Hashtable regexpsText = new Hashtable();
+ static boolean debug = (System.getProperty("IVY_DEBUG")!=null) ;
+ int protocol;
+
+ IvyClient(){}
- IvyClient(Ivy bus, Socket socket,boolean peerCalling,Integer clientKey) throws IOException {
+ IvyClient(Ivy bus, Socket socket,boolean peerCalling,Integer clientKey,int protocol) throws IOException {
appName = "Unknown";
appPort = 0;
this.bus = bus;
this.socket = socket;
this.peerCalling=peerCalling;
this.clientKey=clientKey;
+ this.protocol=protocol;
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
out = socket.getOutputStream();
- Hashtable regexps=bus.regexp_out;
+ Hashtable tosend=bus.selfIvyClient.regexpsText;
// sends our ID, whether we initiated the connexion or not
// the ID is the couple "host name,application Port", the host name
// information is in the socket itself, the port is not known if we
// initiate the connexion
send(SchizoToken,bus.applicationPort,bus.appName);
// sends our regexps to the peer
- for (Enumeration e = regexps.keys(); e.hasMoreElements(); ) {
+ for (Enumeration e = tosend.keys(); e.hasMoreElements(); ) {
Integer ikey = (Integer)e.nextElement();
- sendRegexp( ikey.intValue(),(String)regexps.get(ikey));
+ sendRegexp(ikey.intValue(),(String)tosend.get(ikey));
}
send( EndRegexp,0,"");
// spawns a thread to manage the incoming traffic on this
@@ -111,88 +133,138 @@ public class IvyClient implements Runnable {
}
}
+ public String toString() { return "IvyClient "+bus.appName+":"+appName; }
+
/**
* returns the name of the remote agent.
*/
public String getApplicationName() { return appName ; }
- Integer getClientKey() { return clientKey ; }
-
/**
* allow an Ivy package class to access the list of regexps at a
* given time.
* perhaps we should implement a new IvyApplicationListener method to
* allow the notification of regexp addition and deletion
+ * The content is not modifyable because String are not mutable, and cannot
+ * be modified once they are create.
+ * @see getRegexpsArray to get a String[] result
*/
- Enumeration getRegexps() { return regexp_text.elements(); }
+ public Enumeration getRegexps() { return regexpsText.elements(); }
- int getAppPort() { return appPort ; }
-
- void sendRegexp(int id,String regexp) {
- send(AddRegexp,id,regexp); /* perhaps we should perform some checking here */
+ /**
+ * allow an Ivy package class to access the list of regexps at a
+ * given time.
+ * @since 1.2.4
+ */
+ public String[] getRegexpsArray() {
+ String[] s = new String[regexpsText.size()];
+ int i=0;
+ for (Enumeration e=getRegexps();e.hasMoreElements();)
+ s[i++]=(String)e.nextElement();
+ return s;
}
- public void delRegexp(int id) {send( DelRegexp,id,"");}
-
/**
* sends a direct message to the peer
* @param id the numeric value provided to the remote client
* @param message the string that will be match-tested
*/
- public void sendDirectMsg(int id,String message) { send(DirectMsg,id,message); }
+ public void sendDirectMsg(int id,String message) throws IvyException {
+ if ( (message.indexOf(IvyClient.newLineChar)!=-1)||(message.indexOf(IvyClient.endArgChar)!=-1))
+ throw new IvyException("newline character not allowed in Ivy messages");
+ send(DirectMsg,id,message);
+ }
- /**
- * closes the connexion to the peer.
- * @param notify should I send Bye message ?
- * the thread managing the socket is stopped
- */
- void close(boolean notify) throws IOException {
+ /* closes the connexion to the peer */
+ protected void close(boolean notify) throws IOException {
+ bus.waitForAll();
traceDebug("closing connexion to "+appName);
if (doping&&(pinger!=null)) { pinger.stopPinging(); }
if (notify) sendBye("hasta la vista");
stopListening();
- // bus.clientDisconnect(this);
- socket.close(); // should I also close in and out ?
+ socket.close();
}
/**
- * sends the substrings of a message to the peer for each matching regexp.
- * @param message the string that will be match-tested
- * @return the number of messages sent to the peer
+ * asks the remote client to leave the bus.
+ * @param message the message that will be carried
*/
- int sendMsg( String message ) {
+ public void sendDie(String message) { send(Die,0,message); }
+
+ /**
+ * checks the "validity" of a regular expression.
+ * @param exp the string being a regular expression
+ * @return true if the regexp is valid
+ * @since 1.2.4
+ */
+ public boolean CheckRegexp( String exp ) {
+ boolean ok = true;
+ if ( exp.startsWith( "^" )&&messages_classes!=null) {
+ ok=false;
+ for (int i=0 ; i < messages_classes.length;i++) {
+ if (messages_classes[i].equals(exp.substring(1))) return true;
+ }
+ }
+ return ok;
+ }
+
+ ///////////////////////////////////////////////////
+ //
+ // PROTECTED METHODS
+ //
+ ///////////////////////////////////////////////////
+ static String decode(String s) { return s.replace(escapeChar,'\n'); }
+ static String encode(String s) { return s.replace('\n',escapeChar); }
+ Integer getClientKey() { return clientKey ; }
+ int getAppPort() { return appPort ; }
+ void sendRegexp(int id,String regexp) {send(AddRegexp,id,regexp);}
+ void delRegexp(int id) {send( DelRegexp,id,"");}
+
+ int sendMsg(String message,boolean async) {
+ if (async) {
+ new Sender(message);
+ return -1;
+ } else { return sendMsg(message); }
+ }
+
+ private int sendMsg(String message) {
int count = 0;
- for (Enumeration e = regexp_in.keys();e.hasMoreElements();) {
+ for (Enumeration e = regexps.keys();e.hasMoreElements();) {
Integer key = (Integer)e.nextElement();
- RE regexp = (RE)regexp_in.get(key);
+ RE regexp = (RE)regexps.get(key);
+ int nb = regexp.getNumSubs();
REMatch result = regexp.getMatch(message);
- if ( result != null ) {
- send(Msg,key,regexp.getNumSubs(),result);
- count++;
- }
+ if (result==null) continue; // no match
+ count++; // match
+ send(Msg,key,regexp.getNumSubs(),result);
}
return count;
}
-
- void stopListening() {
+
+ ///////////////////////////////////////////////////
+ //
+ // PRIVATE METHODS
+ //
+ ///////////////////////////////////////////////////
+
+ /* interrupt the listener thread */
+ private void stopListening() {
Thread t = clientThread;
if (t==null) return; // we can be summoned to quit from two path at a time
clientThread=null;
t.interrupt();
}
- /**
+ /*
* compares two peers the id is the couple (host,service port).
- * @param clnt the other peer
- * @return true if the peers are similir. This should not happen, it is bad
- * © ® (tm)
+ * true if the peers are similir. This should not happen, it is bad
*/
- boolean sameClient( IvyClient clnt ) {
+ protected boolean sameClient( IvyClient clnt ) {
return ( appPort != 0 && appPort == clnt.appPort )
&& ( getRemoteAddress() == clnt.getRemoteAddress() ) ;
}
- /**
+ /*
* the code of the thread handling the incoming messages.
*/
public void run() {
@@ -209,7 +281,10 @@ public class IvyClient implements Runnable {
if ((msg=in.readLine()) != null ) {
if (clientThread!=thisThread) break; // early stop during readLine()
if (doping && (pingerThread!=null)) pingerThread.interrupt();
- newParseMsg(msg);
+ if (!newParseMsg(msg)) {
+ close(true);
+ break;
+ }
} else {
traceDebug("readline null ! leaving the thead");
break;
@@ -226,9 +301,12 @@ public class IvyClient implements Runnable {
}
traceDebug("normally Disconnected from "+ appName);
traceDebug("Thread stopped");
+ // invokes the disconnect applicationListeners
+ if (!discCallbackPerformed) bus.clientDisconnects(this);
+ discCallbackPerformed=true;
}
- private void sendBuffer( String buffer ) throws IvyException {
+ private synchronized void sendBuffer( String buffer ) throws IvyException {
buffer += "\n";
try {
out.write(buffer.getBytes() );
@@ -237,8 +315,9 @@ public class IvyClient implements Runnable {
traceDebug("I can't send my message to this client. He probably left");
// first, I'm not a first class IvyClient any more
bus.removeClient(this);
- // invokes the die applicationListeners
- bus.disconnectReceived(this);
+ // invokes the disconnect applicationListeners
+ if (!discCallbackPerformed) bus.clientDisconnects(this);
+ discCallbackPerformed=true;
try {
close(false);
} catch (IOException ioe) {
@@ -258,12 +337,8 @@ public class IvyClient implements Runnable {
private void send(int type, Integer id, int nbsub, REMatch result) {
String buffer = type+" "+id+StartArg;
- // Start at 1 because group 0 represent entire matching
- for(int sub = 1; sub <= nbsub; sub++) {
- if (result.getStartIndex(sub) > -1) {
- buffer += result.toString(sub)+EndArg;
- }
- }
+ for(int sub = 1; sub <= nbsub; sub++) if (result.getStartIndex(sub) > -1)
+ buffer += result.toString(sub)+EndArg;
try {
sendBuffer(buffer);
} catch (IvyException ie ) {
@@ -293,24 +368,33 @@ public class IvyClient implements Runnable {
return s;
}
- private void newParseMsg(String s) throws IvyException {
+ private boolean newParseMsg(String s) throws IvyException {
byte[] b = s.getBytes();
int from=0,to=0,msgType;
Integer msgId;
while ((to=b.length) throw new IvyException("protocol error");
+ // return false au lieu de throw
+ if (to>=b.length) {
+ System.out.println("protocol error from "+appName);
+ return false;
+ }
try {
msgType = Integer.parseInt(s.substring(from,to));
} catch (NumberFormatException nfe) {
- throw new IvyException("protocol error on msgType");
+ System.out.println("protocol error on msgType from "+appName);
+ return false;
}
from=to+1;
while ((to=b.length) throw new IvyException("protocol error");
+ if (to>=b.length) {
+ System.out.println("protocol error from "+appName);
+ return false;
+ }
try {
msgId = new Integer(s.substring(from,to));
} catch (NumberFormatException nfe) {
- throw new IvyException("protocol error on identifier");
+ System.out.println("protocol error from "+appName+" "+s.substring(from,to)+" is not a number");
+ return false;
}
from=to+1;
switch (msgType) {
@@ -319,7 +403,8 @@ public class IvyClient implements Runnable {
// first, I'm not a first class IvyClient any more
bus.removeClient(this);
// invokes the die applicationListeners
- bus.dieReceived(this,msgId.intValue());
+ String message=s.substring(from,b.length);
+ bus.dieReceived(this,msgId.intValue(),message);
// makes the bus die
bus.stop();
try {
@@ -334,7 +419,8 @@ public class IvyClient implements Runnable {
// first, I'm not a first class IvyClient any more
bus.removeClient(this);
// invokes the die applicationListeners
- bus.disconnectReceived(this);
+ if (!discCallbackPerformed) bus.clientDisconnects(this);
+ discCallbackPerformed=true;
try {
close(false);
} catch (IOException ioe) {
@@ -343,24 +429,28 @@ public class IvyClient implements Runnable {
break;
case AddRegexp:
String regexp=s.substring(from,b.length);
- if ( bus.CheckRegexp(regexp) ) {
+ if ( CheckRegexp(regexp) ) {
try {
- regexp_in.put(msgId,new RE(regexp));
- regexp_text.put(msgId,regexp);
+ regexps.put(msgId,new RE(regexp));
+ regexpsText.put(msgId,regexp);
+ bus.regexpReceived(this,msgId.intValue(),regexp);
} catch (REException e) {
- throw new IvyException("regexp error " +e.getMessage());
+ // the remote client sent an invalid regexp !
+ System.out.println("invalid regexp sent by " +appName+" ("+regexp+"), I will ignore this regexp");
+ return true;
}
} else {
throw new IvyException("regexp Warning exp='"+regexp+"' can't match removing from "+appName);
}
break;
case DelRegexp:
- regexp_in.remove(msgId);
- regexp_text.remove(msgId);
+ regexps.remove(msgId);
+ String text=(String)regexpsText.remove(msgId);
+ bus.regexpDeleted(this,msgId.intValue(),text);
break;
case EndRegexp:
- bus.connect(this);
- /*
+ bus.clientConnects(this);
+ /* TODO check with the protocol itself.
* the peer is perhaps not ready to handle this message
* an assymetric processing should be written
*/
@@ -371,7 +461,7 @@ public class IvyClient implements Runnable {
while (toIvyClient "+bus.appName+":"+appName+"<-- "+s);
}
@@ -455,4 +539,19 @@ public class IvyClient implements Runnable {
public void stopPinging() { isPinging=false; pingerThread.interrupt();}
}
+ // a class to perform the threaded execution of each new message
+ // this is an experimental feature introduced in 1.2.4
+ class Sender implements Runnable {
+ String message;
+ private Thread t;
+ public Sender(String message) {
+ this.message=message;
+ t=new Thread(Sender.this);
+ bus.registerThread(t);
+ t.start();
+ bus.unRegisterThread(t);
+ }
+ public void run() { sendMsg(message); }
+ } // class Sender
+
}
diff --git a/src/IvyDaemon.java b/src/IvyDaemon.java
index 56019c1..b843a63 100644
--- a/src/IvyDaemon.java
+++ b/src/IvyDaemon.java
@@ -119,7 +119,11 @@ public class IvyDaemon implements Runnable {
while (true) {
msg=in.readLine();
if (msg==null) break;
- bus.sendMsg(msg);
+ try {
+ bus.sendMsg(msg);
+ } catch (IvyException ie) {
+ System.out.println("incorrect characters whithin the message. Not sent");
+ }
}
} catch (IOException ioe) {
traceDebug("Subreader exception ...");
diff --git a/src/IvyWatcher.java b/src/IvyWatcher.java
index 98956ea..7698632 100755
--- a/src/IvyWatcher.java
+++ b/src/IvyWatcher.java
@@ -14,6 +14,17 @@
* thing.
*
* CHANGELOG:
+ * 1.2.5:
+ * - getDomain now sends IvyException for malformed broadcast addresses
+ * 1.2.4:
+ * - sends the broadcast before listening to the other's broadcasts.
+ * TODO wait for all the broadcast to be sent before starting the listen
+ * mode
+ * - (REMOVED) allows the connexion from a remote host with the same port number
+ * it's too complicated to know if the packet is from ourselves...
+ * - deals with the protocol errors in a more efficient way. The goal is not
+ * to loose our connectivity because of a rude agent.
+ * fixes Bug J005 (YJ + JPI)
* 1.2.3:
* - the packet sending is done in its own thread from now on (PacketSender)
* I don't care stopping it, since it can't be blocked.
@@ -51,6 +62,7 @@ class IvyWatcher implements Runnable {
private boolean isMulticastAddress = false;
private Ivy bus; /* master bus controler */
private DatagramSocket broadcast; /* supervision socket */
+ private InetAddress localhost,loopback;
private String domainaddr;
private int port;
private volatile Thread listenThread;
@@ -75,6 +87,9 @@ class IvyWatcher implements Runnable {
((MulticastSocket)broadcast).joinGroup(group);
}
broadcast.setSoTimeout(Ivy.TIMEOUTLENGTH);
+ localhost=InetAddress.getLocalHost();
+ loopback=InetAddress.getByName(null);
+ } catch ( UnknownHostException uhe ) {
} catch ( IOException e ) {
throw new IvyException("IvyWatcher I/O error" + e );
}
@@ -89,9 +104,9 @@ class IvyWatcher implements Runnable {
traceDebug("beginning of a watcher Thread");
byte buf[] = new byte[256];
DatagramPacket packet=new DatagramPacket(buf, 256);
+ InetAddress remotehost=null;
try {
while( listenThread==thisThread ) {
- int port;
try {
broadcast.receive(packet);
if (listenThread!=thisThread) break; // I was summoned to leave during the receive
@@ -99,37 +114,43 @@ class IvyWatcher implements Runnable {
for (int i=0;i Sent to " +bus.sendMsg(s)+" peers");
+ try {
+ println("-> Sent to " +bus.sendMsg(s)+" peers");
+ } catch (IvyException ie) {
+ println("-> not sent, the message contains incorrect characters");
+ }
} else if ((result=directMsgRE.getMatch(s))!=null) {
String target = result.toString(1);
int id = Integer.parseInt(result.toString(2));
String message = result.toString(3);
Vector v=bus.getIvyClientsByName(target);
if (v.size()==0) println("no Ivy client with the name \""+target+"\"");
- for (int i=0;i not sent, the message contains incorrect characters");
+ }
return;
} else if (s.lastIndexOf(".die ")>=0){
String target=s.substring(5);
Vector v=bus.getIvyClientsByName(target);
if (v.size()==0) println("no Ivy client with the name \""+target+"\"");
- for (int i=0;i=0){
String regexp=s.substring(8);
if (bus.unBindMsg(regexp)) {
- println("you want to unsubscribe to " + regexp);
+ println("you have unsubscribed to " + regexp);
} else {
println("you can't unsubscribe to " + regexp + ", your're not subscribed to it");
}
} else if (s.lastIndexOf(".bind ")>=0){
String regexp=s.substring(6);
- println("you want to subscribe to " + regexp);
- bus.bindMsg(regexp,this);
+ try {
+ bus.bindMsg(regexp,this);
+ println("you have now subscribed to " + regexp);
+ } catch (IvyException ie) {
+ System.out.println("warning, the regular expression '" + regexp + "' is invalid. Not bound !");
+ }
} else if (s.lastIndexOf(".ping ")>=0){
String target=s.substring(6);
Vector v=bus.getIvyClientsByName(target);
@@ -187,28 +208,40 @@ public class Probe implements IvyApplicationListener, IvyMessageListener, Runnab
println("this command is not recognized");
println(helpCommands);
} else {
- println("-> Sent to " +bus.sendMsg(s)+" peers");
+ try {
+ println("-> Sent to " +bus.sendMsg(s)+" peers");
+ } catch (IvyException ie) {
+ println("-> not sent, the line contains incorrect characters");
+ }
}
} // parseCommand
+ public void bindPerformed(IvyClient client,int id,String re) {
+ println(client.getApplicationName() + " subscribes to " +re );
+ }
+
+ public void unbindPerformed(IvyClient client,int id,String re) {
+ println(client.getApplicationName() + " unsubscribes to " +re );
+ }
+
public void connect(IvyClient client) {
println(client.getApplicationName() + " connected " );
- for (java.util.Enumeration e=client.getRegexps();e.hasMoreElements();)
- println(client.getApplicationName() + " subscribes to " +e.nextElement() );
+ // for (java.util.Enumeration e=client.getRegexps();e.hasMoreElements();)
+ // println(client.getApplicationName() + " subscribes to " +e.nextElement() );
}
public void disconnect(IvyClient client) {
println(client.getApplicationName() + " disconnected " );
}
- public void die(IvyClient client, int id) {
- println("received die msg from " + client.getApplicationName() +" good bye");
+ public void die(IvyClient client, int id,String msgarg) {
+ println("received die msg from " + client.getApplicationName() +" with the message: "+msgarg+", good bye");
/* I cannot stop the readLine(), because it is native code */
if (exitOnDie) System.exit(0);
}
public void directMessage(IvyClient client, int id, String arg) {
- println(client.getApplicationName() + " direct Message "+ id + arg );
+ println(client.getApplicationName() + " sent the direct Message, id: "+ id + ", arg: "+ arg );
}
public void receive(IvyClient client, String[] args) {
diff --git a/src/SelfIvyClient.java b/src/SelfIvyClient.java
new file mode 100644
index 0000000..09d9f80
--- /dev/null
+++ b/src/SelfIvyClient.java
@@ -0,0 +1,144 @@
+/**
+ * A private Class for ourself on the bus
+ *
+ * @author Yannick Jestin
+ * @author http://www.tls.cena.fr/products/ivy/
+ * @since 1.2.4
+ *
+ * CHANGELOG:
+ * 1.2.4:
+ * - adds a the threaded option for callbacks
+ * - Matthieu's bugreport on unBindMsg()
+ */
+
+package fr.dgac.ivy ;
+import java.util.*;
+import gnu.regexp.*;
+
+class SelfIvyClient extends IvyClient {
+
+ private Ivy bus;
+ private static int serial=0; /* an unique ID for each regexp */
+ private Hashtable callbacks=new Hashtable();
+ private Hashtable threadedFlag=new Hashtable();
+ private boolean massThreaded=false;
+
+ public void sendDirectMsg(int id,String message) {
+ bus.directMessage(this,id,message);
+ }
+ public void sendDie(String message) { bus.dieReceived(this,0,message); }
+
+ protected SelfIvyClient(Ivy bus,String appName) {
+ this.bus=bus;
+ this.protocol=Ivy.PROTOCOLVERSION;
+ this.appName=appName;
+ }
+
+ synchronized protected int bindMsg(String sregexp, IvyMessageListener callback, boolean threaded ) throws IvyException {
+ // creates a new binding (regexp,callback)
+ try {
+ RE re=new RE(sregexp);
+ Integer key = new Integer(serial++);
+ regexps.put(key,re);
+ regexpsText.put(key,sregexp);
+ callbacks.put(key,callback);
+ threadedFlag.put(key,new Boolean(threaded));
+ return key.intValue();
+ } catch (REException ree) {
+ throw new IvyException("Invalid regexp " + sregexp);
+ }
+ }
+
+ synchronized protected void unBindMsg(int id) throws IvyException {
+ Integer key = new Integer(id);
+ if ( ( regexps.remove(key) == null )
+ || (regexpsText.remove(key) == null )
+ || (callbacks.remove(key) == null )
+ || (threadedFlag.remove(key) == null )
+ )
+ throw new IvyException("client wants to remove an unexistant regexp "+id);
+ }
+
+ synchronized protected boolean unBindMsg(String re) {
+ for (Enumeration e=regexpsText.keys();e.hasMoreElements();) {
+ Integer k = (Integer)e.nextElement();
+ if ( ((String)regexps.get(k)).compareTo(re) == 0) {
+ try {
+ bus.unBindMsg(k.intValue());
+ } catch (IvyException ie) {
+ return false;
+ }
+ return true;
+ }
+ }
+ return false;
+ }
+
+ protected int sendSelfMsg(String message) {
+ int count = 0;
+ for (Enumeration e = regexps.keys();e.hasMoreElements();) {
+ Integer key = (Integer)e.nextElement();
+ RE regexp = (RE)regexps.get(key);
+ String sre = (String)regexpsText.get(key);
+ int nb = regexp.getNumSubs();
+ REMatch result = regexp.getMatch(message);
+ if (result==null) continue;
+ count++;
+ callCallback(this,key,toArgs(nb,result));
+ }
+ return count;
+ }
+
+ protected void callCallback(IvyClient client, Integer key, String[] tab) {
+ IvyMessageListener callback=(IvyMessageListener)callbacks.get(key);
+ boolean threaded=((Boolean)threadedFlag.get(key)).booleanValue();
+ if (callback==null) {
+ System.out.println("(callCallback) Not regexp matching id "+key.intValue());
+ System.exit(0);
+ }
+ if (!threaded) {
+ // runs the callback in the same thread
+ callback.receive(client, tab);
+ } else {
+ // starts a new Thread for each callback ...
+ new Runner(callback,client,tab);
+ }
+ }
+
+ private String[] toArgs(int nb,REMatch result) {
+ String[] args = new String[nb];
+ for(int sub=1;sub<=nb;sub++) {
+ args[sub-1]=result.toString(sub);
+ if (bus.doProtectNewlines) args[sub-1]=decode(args[sub-1]);
+ }
+ return args;
+ }
+
+ public String toString() { return "IvyClient (ourself)"+bus.appName+":"+appName; }
+
+ // a class to perform the threaded execution of each new message
+ // this is an experimental feature introduced in 1.2.4
+ class Runner implements Runnable {
+ IvyMessageListener cb;
+ IvyClient c;
+ String[] args;
+ private Thread t;
+ public Runner(IvyMessageListener cb,IvyClient c,String[] args) {
+ this.cb=cb;
+ this.args=args;
+ this.c=c;
+ t=new Thread(Runner.this);
+ bus.registerThread(t);
+ t.start();
+ bus.unRegisterThread(t);
+ }
+ public void run() { cb.receive(c,args); }
+ } // class Runner
+
+ private void traceDebug(String s){
+ if (debug)
+ System.out.println("-->SelfIvyClient "+bus.appName+":"+appName+"<-- "+s);
+ }
+
+}
+/* EOF */
--
cgit v1.1