aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rwxr-xr-xsrc/Ivy.java150
-rwxr-xr-xsrc/IvyClient.java223
-rwxr-xr-xsrc/IvyWatcher.java78
-rw-r--r--src/Makefile5
-rw-r--r--src/Probe.java1
-rw-r--r--src/SelfIvyClient.java17
6 files changed, 243 insertions, 231 deletions
diff --git a/src/Ivy.java b/src/Ivy.java
index 3486765..196c3b4 100755
--- a/src/Ivy.java
+++ b/src/Ivy.java
@@ -14,11 +14,14 @@
*
* CHANGELOG:
* 1.2.6:
+ * - added serial numbers for traceDebug
* - changed the semantic of -b a,b:port,c:otherport if no port is
* specified for a, it take the port from the next one. If none is
* specified, it takes DEFAULT_PORT
* - no more asynchronous sending of message ( async bind is ok though )
* because the tests are sooooo unsuccessful
+ * - use addElement/removeElement instead of add/remove is registering
+ * threads ( jdk1.1 backward compatibility )
* 1.2.5:
* - protection of newlines
* 1.2.4:
@@ -93,11 +96,11 @@ public class Ivy implements Runnable {
public static final String libVersion ="1.2.6";
private boolean debug;
- private static int clientSerial=0; /* an unique ID for each IvyClient */
private ServerSocket app;
private Vector watchers;
private volatile Thread serverThread; // to ensure quick communication of the end
private Hashtable clients = new Hashtable();
+ private Hashtable half = new Hashtable();
private Vector ivyApplicationListenerList = new Vector();
private Vector ivyBindListenerList = new Vector();
private Vector sendThreads = new Vector();
@@ -108,6 +111,8 @@ public class Ivy implements Runnable {
private boolean doSendToSelf = false ;
protected SelfIvyClient selfIvyClient ;
public final static int TIMEOUTLENGTH = 3000;
+ private static int serial=0;
+ private int myserial=serial++;
/**
* Readies the structures for the software bus connexion.
@@ -237,7 +242,7 @@ public class Ivy implements Runnable {
public synchronized void stop() {
if (stopped) return;
stopped=true;
- traceDebug("beginning stopping the bus");
+ traceDebug("beginning stopping");
try {
// stopping the serverThread
Thread t=serverThread;
@@ -255,7 +260,7 @@ public class Ivy implements Runnable {
} catch (IOException e) {
traceDebug("IOexception Stop ");
}
- traceDebug("the bus should have stopped so far");
+ traceDebug("end stopping");
}
/**
@@ -291,51 +296,21 @@ public class Ivy implements Runnable {
/**
* Performs a pattern matching according to everyone's regexps, and sends
- * the results to the relevant ivy agents sequentially
- *
- * @param message A String which will be compared to the regular
- * expressions of the different clients
- * @return the number of messages actually sent
- */
- public int sendMsg(String message) throws IvyException {
- return sendMsg(message,false);
- }
-
- /*
- * Performs a pattern matching according to everyone's regexps, and sends
- * the results to the relevant ivy agents, using as many threads as needed.
- *
- * disappeared in 1.2.6
- * @since 1.2.4
- * @param message A String which will be compared to the regular
- * expressions of the different clients
- * @return always returns -1
- public int sendAsyncMsg(String message,boolean async) throws IvyException {
- return sendMsg(message,true);
- }
- */
-
- /*
- * Performs a pattern matching according to everyone's regexps, and sends
* the results to the relevant ivy agents.
*
- * @since 1.2.4
* @param message A String which will be compared to the regular
* expressions of the different clients
- * @param async if true, the sending will be performed in a separate thread,
- * default is false
- * @return if async is true, always returns -1, else returns the number of messages actually sent
+ * @return returns the number of messages actually sent
*/
- protected int sendMsg(String message,boolean async) throws IvyException {
+ public int sendMsg(String message) throws IvyException {
int count = 0;
- if (async) throw new IvyException("Async sending not supported anymore");
if (doProtectNewlines)
message=IvyClient.encode(message);
else if ( (message.indexOf(IvyClient.newLineChar)!=-1)||(message.indexOf(IvyClient.endArgChar)!=-1))
throw new IvyException("newline character not allowed in Ivy messages");
for ( Enumeration e=clients.elements();e.hasMoreElements();) {
IvyClient client = (IvyClient)e.nextElement();
- count += client.sendMsg(message, async);
+ count += client.sendMsg(message);
}
if (doSendToSelf) count+=selfIvyClient.sendSelfMsg(message);
return count;
@@ -508,10 +483,6 @@ public class Ivy implements Runnable {
}
}
- /*
- * removes a client from the list
- */
- void removeClient(IvyClient c) { clients.remove(c.getClientKey()); }
/*
* invokes the application listeners when we are summoned to die
@@ -560,14 +531,6 @@ public class Ivy implements Runnable {
// Protected methods
//
/////////////////////////////////////////////////////////////////:
-
- synchronized void addClient(Socket socket,boolean peerCalling,int protocolVersion) throws IOException {
- if (stopped) return;
- IvyClient client = new IvyClient( this, socket,peerCalling,new Integer(clientSerial++),protocolVersion);
- clients.put(client.getClientKey(),client);
- traceDebug(getClientNames());
- }
-
private static String[] myTokenize(String s,String separator) {
int index=0, last=0, length=s.length();
Vector v = new Vector();
@@ -595,70 +558,87 @@ public class Ivy implements Runnable {
}
- /*
- * prevents two clients from connecting to each other at the same time
- * there might still be a lingering bug here, that we could avoid with the
- * SchizoToken.
- */
- boolean checkConnected( IvyClient clnt ) {
- if ( clnt.getAppPort() == 0 ) return false;
- for (Enumeration e=clients.elements();e.hasMoreElements();) {
- IvyClient client = (IvyClient)e.nextElement();
- if ( clnt != client && client.sameClient( clnt ) ) return true;
- }
- return false;
+ void addClient(IvyClient c) {
+ clients.put(c.getClientKey(),c);
+ traceDebug("added "+c+" in clients: "+getClientNames(clients));
+ }
+ void removeClient(IvyClient c) {
+ clients.remove(c.getClientKey());
+ traceDebug("removed "+c+" from clients: "+getClientNames(clients));
+ }
+ void addHalf(IvyClient c) {
+ half.put(c.getClientKey(),c);
+ traceDebug("added "+c+" in half: "+getClientNames(half));
+ }
+ void removeHalf(IvyClient c) {
+ half.remove(c.getClientKey());
+ traceDebug("removed "+c+" from half: "+getClientNames(half));
+ }
+
+ boolean shouldIleave(IvyClient ic) {
+ traceDebug("looking for "+ic+" in "+getClientNames(half)+" and "+getClientNames(clients));
+ IvyClient peer=searchPeer(ic);
+ if (peer==null) return false;
+ boolean shoulda=peer.compareTo(ic)>0;
+ traceDebug(ic+" "+ic.toStringExt()+((shoulda)?" must leave ":" must not leave"));
+ traceDebug(peer+" "+peer.toStringExt()+((!shoulda)?" must leave ":" must not leave"));
+ return shoulda;
+ }
+
+ private IvyClient searchPeer(IvyClient ic) {
+ IvyClient peer;
+ for (Enumeration e=half.elements();e.hasMoreElements();)
+ if ((peer=(IvyClient)e.nextElement()).equals(ic)) return peer;
+ for (Enumeration e=clients.elements();e.hasMoreElements();)
+ if ((peer=(IvyClient)e.nextElement()).equals(ic)) return peer;
+ return null;
}
/*
* the service socket thread reader main loop
*/
public void run() {
- traceDebug("Ivy service Thread started"); // THREADDEBUG
+ traceDebug("service thread started"); // THREADDEBUG
Thread thisThread=Thread.currentThread();
while(thisThread==serverThread){
try {
Socket socket = app.accept();
if ((thisThread!=serverThread)||stopped) break; // early disconnexion
- addClient(socket,true,0); // the peer called me TODO I can't know his protocol version
+ new IvyClient(this,socket,0); // the peer called me
} catch (InterruptedIOException ie) {
if (thisThread!=serverThread) break;
} catch( IOException e ) {
if (serverThread==thisThread) {
traceDebug("Error IvyServer exception: " + e.getMessage());
System.out.println("Ivy server socket reader caught an exception " + e.getMessage());
- System.out.println("this is probably a bug in your JVM !");
- // e.printStackTrace();
+ System.out.println("this is probably a bug in your JVM ! (e.g. blackdown jdk1.1.8 linux)");
+ System.exit(0);
} else { traceDebug("my server socket has been closed"); }
}
}
- traceDebug("Ivy service Thread stopped"); // THREADDEBUG
+ traceDebug("service thread stopped"); // THREADDEBUG
}
+ protected int getSerial() { return myserial ; }
+ private void traceDebug(String s){
+ if (debug) System.out.println("-->Ivy["+myserial+"]<-- "+s);
+ }
- private void traceDebug(String s){ if (debug) System.out.println("-->ivy<-- "+s); }
-
- synchronized void registerThread(Thread t) { sendThreads.add(t); }
- synchronized void unRegisterThread(Thread t) { sendThreads.remove(t); }
+ // stuff to guarantee that all the treads have left
+ synchronized void registerThread(Thread t) { sendThreads.addElement(t); }
+ synchronized void unRegisterThread(Thread t) { sendThreads.removeElement(t); }
synchronized Thread getOneThread() {
if (sendThreads.size()==0) return null;
return (Thread) sendThreads.firstElement();
}
- void waitForAll() {
- Thread t;
- traceDebug("DEVELOPMENT WAITFORALL sendThreads size : " + sendThreads.size());
- try { while ((t=getOneThread())!=null) { t.join(); } }
- catch (InterruptedException ie) { System.out.println("waitForAll Interrupted"); }
- traceDebug("DEVELOPMENT END WAITFORALL");
- }
-
- /* a small private method for debbugging purposes */
- private String getClientNames() {
- String s = appName+" clients are: ";
- for (Enumeration e=clients.elements();e.hasMoreElements();){
- s+=((IvyClient)e.nextElement()).getApplicationName()+" ";
+ // a small private method for debbugging purposes
+ private String getClientNames(Hashtable t) {
+ String s = "(";
+ for (Enumeration e=t.elements();e.hasMoreElements();){
+ s+=((IvyClient)e.nextElement()).getApplicationName()+",";
}
- return s;
+ return s+")";
}
public String domains(String toparse) {
@@ -681,9 +661,7 @@ public class Ivy implements Runnable {
}
- /*
- * unitary test. Normally, running java fr.dgac.ivy.Ivy should stop in 2.3 seconds :)
- */
+ // test. Normally, running java fr.dgac.ivy.Ivy should stop in 2.3 seconds :)
public static void main(String[] args) {
Ivy bus = new Ivy("Test Unitaire","TU ready",null);
try {
diff --git a/src/IvyClient.java b/src/IvyClient.java
index d3deb8b..753184a 100755
--- a/src/IvyClient.java
+++ b/src/IvyClient.java
@@ -11,10 +11,17 @@
*
* CHANGELOG:
* 1.2.6
+ * - major cleanup to handle simultaneous connections, e.g., between two
+ * busses within the same process ( AsyncAPI test is very stressful )
+ * I made an assymetric processing to elect the client that should
+ * disconnect based on the socket ports ... might work...
+ * - jakarta regexp are not meant to be threadsafe, so for match() and
+ * compile() must be enclaused in a synchronized block
* - now sends back an error message when an incorrect regexp is sent
* the message is supposed to be readable
+ * - sendMsg has no more async parameter
* 1.2.5:
- * - no more java ping ...
+ * - no more java ping pong
* 1.2.5:
* - use org.apache.regexp instead of gnu-regexp
* http://jakarta.apache.org/regexp/apidocs/
@@ -58,7 +65,6 @@ import java.lang.Thread;
import java.net.*;
import java.io.*;
import java.util.*;
-/* import gnu.regexp.*; GNURETOAPACHERE */
import org.apache.regexp.*;
public class IvyClient implements Runnable {
@@ -70,8 +76,7 @@ public class IvyClient implements Runnable {
final static int Error = 3; /* error message */
final static int DelRegexp = 4;/* the peer removes one of his regex */
final static int EndRegexp = 5;/* no more regexp in the handshake */
- final static int SchizoToken = 6; /* avoid race condition in concurrent connexions */
- /* SchizoToken is aka BeginRegexp in other implementations */
+ final static int SchizoToken = 6; /* avoid race condition in concurrent connexions, aka BeginRegexp in other implementations */
final static int DirectMsg = 7;/* the peer sends a direct message */
final static int Die = 8; /* the peer wants us to quit */
final static String MESSAGE_TERMINATOR = "\n"; /* the next protocol will use \r */
@@ -83,55 +88,81 @@ public class IvyClient implements Runnable {
final static char newLineChar = '\n';
// private variables
+ private static Integer csMutex=new Integer(0);
+ private static int clientSerial=0; /* an unique ID for each IvyClient */
+
private String messages_classes[] = null;
private Ivy bus;
private Socket socket;
private BufferedReader in;
private OutputStream out;
- private int appPort;
- private boolean peerCalling;
+ private int remotePort=0;
private volatile Thread clientThread;// volatile to ensure the quick communication
- private Integer clientKey ;
+ private Integer clientKey;
private boolean discCallbackPerformed = false;
// protected variables
- String appName;
+ String appName="none";
Hashtable regexps = new Hashtable();
Hashtable regexpsText = new Hashtable();
static boolean debug = (System.getProperty("IVY_DEBUG")!=null) ;
int protocol;
+ private boolean incoming;
- IvyClient(){}
+ IvyClient() { }
- IvyClient(Ivy bus, Socket socket,boolean peerCalling,Integer clientKey,int protocol) throws IOException {
- appName = "Unknown";
- appPort = 0;
+ IvyClient(Ivy bus, Socket socket,int remotePort) throws IOException {
+ synchronized(csMutex) { clientKey=new Integer(clientSerial++); }
this.bus = bus;
- this.socket = socket;
- this.peerCalling=peerCalling;
- this.clientKey=clientKey;
- this.protocol=protocol;
+ this.remotePort = remotePort;
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
out = socket.getOutputStream();
+ incoming=(remotePort==0);
+ traceDebug(((incoming)?"incoming":"outgoing")+" connection on "+socket);
+ this.socket = socket;
+ if (!incoming) { // outgoing connexion
+ synchronized(bus) {
+ bus.addHalf(this); // register a half connexion
+ if (bus.shouldIleave(this)) {
+ traceDebug(toStringExt()+" should leave ...");
+ close(false);
+ bus.removeHalf(this);
+ return;
+ }
+ sendSchizo();
+ // the registering will take place at the reception of the regexps...
+ }
+ }
+ clientThread = new Thread(this); // clientThread handles the incoming traffic
+ clientThread.start();
+ }
+
+ // sends our ID, whether we initiated the connexion or not
+ // the ID is the couple "host name,application Port", the host name
+ // information is in the socket itself, the port is not known if we
+ // initiate the connexion
+ private void sendSchizo() throws IOException {
+ traceDebug("sending our service port "+bus.applicationPort);
Hashtable tosend=bus.selfIvyClient.regexpsText;
- // sends our ID, whether we initiated the connexion or not
- // the ID is the couple "host name,application Port", the host name
- // information is in the socket itself, the port is not known if we
- // initiate the connexion
sendString(SchizoToken,bus.applicationPort,bus.appName);
- // sends our regexps to the peer
for (Enumeration e = tosend.keys(); e.hasMoreElements(); ) {
Integer ikey = (Integer)e.nextElement();
sendRegexp(ikey.intValue(),(String)tosend.get(ikey));
}
sendString( EndRegexp,0,"");
- // spawns a thread to manage the incoming traffic on this
- // socket. We should be ready to receive messages now.
- clientThread = new Thread(this);
- clientThread.start();
}
- public String toString() { return "IvyClient "+bus.appName+":"+appName; }
+ synchronized private void handShake() {
+ synchronized(bus) {
+ bus.removeHalf(this);
+ bus.addClient(this);
+ }
+ }
+
+ public String toString() { return "IC["+clientKey+","+bus.getSerial()+"] "+bus.appName+":"+appName+":"+remotePort; }
+ public String toStringExt() {
+ return "client socket:"+socket+", remoteport:" + remotePort;
+ }
/**
* returns the name of the remote agent.
@@ -175,7 +206,6 @@ public class IvyClient implements Runnable {
/* closes the connexion to the peer */
protected void close(boolean notify) throws IOException {
- bus.waitForAll();
traceDebug("closing connexion to "+appName);
if (notify) sendBye("hasta la vista");
stopListening();
@@ -214,32 +244,19 @@ public class IvyClient implements Runnable {
static String decode(String s) { return s.replace(escapeChar,'\n'); }
static String encode(String s) { return s.replace('\n',escapeChar); }
Integer getClientKey() { return clientKey ; }
- int getAppPort() { return appPort ; }
- void sendRegexp(int id,String regexp) {sendString(AddRegexp,id,regexp);}
- void delRegexp(int id) {sendString(DelRegexp,id,"");}
-
- int sendMsg(String message,boolean async) {
- if (async) {
- new Sender(message);
- return -1;
- } else { return sendMsg(message); }
- }
+ protected void sendRegexp(int id,String regexp) {sendString(AddRegexp,id,regexp);}
+ protected void delRegexp(int id) {sendString(DelRegexp,id,"");}
- private int sendMsg(String message) {
+ protected int sendMsg(String message) {
int count = 0;
for (Enumeration e = regexps.keys();e.hasMoreElements();) {
Integer key = (Integer)e.nextElement();
RE regexp = (RE)regexps.get(key);
- /*
- * GNURETOAPACHERE
- int nb = regexp.getNumSubs();
- REMatch result = regexp.getMatch(message);
- if (result==null) continue; // no match
- count++; // match
- send(Msg,key,regexp.getNumSubs(),result);
- *
- */
- if (!regexp.match(message)) continue; // no match
+ // re.match fails sometimes when it is called concurrently ..
+ // see 28412 on jakarta regexp bugzilla
+ synchronized (regexp) {
+ if (!regexp.match(message)) continue; // no match
+ }
count++; // match
sendResult(Msg,key,regexp);
}
@@ -262,25 +279,40 @@ public class IvyClient implements Runnable {
/*
* compares two peers the id is the couple (host,service port).
- * true if the peers are similir. This should not happen, it is bad
+ * true if the peers are similar. This should not happen, it is bad
*/
- protected boolean sameClient( IvyClient clnt ) {
- return ( appPort != 0 && appPort == clnt.appPort )
- && ( getRemoteAddress() == clnt.getRemoteAddress() ) ;
+ protected int compareTo(IvyClient clnt) {
+ // return clnt.clientKey.compareTo(clientKey); // Wrong. it's random...
+ return (clnt.socket.getPort()-socket.getLocalPort());
+ }
+
+ protected boolean equals(IvyClient clnt) {
+ if (clnt==this) return false;
+ // TODO go beyond the port number ! add some host processing, cf:
+ // IvyWatcher ...
+ if (remotePort==clnt.remotePort) return true;
+ /*
+ e.g.
+ if (socket.getInetAddress()==null) return false;
+ if (clnt.socket.getInetAddress()==null) return false;
+ if (!socket.getInetAddress().equals(clnt.socket.getInetAddress())) return false;
+ */
+ return false;
}
/*
* the code of the thread handling the incoming messages.
*/
public void run() {
+ traceDebug("Thread started");
Thread thisThread = Thread.currentThread();
String msg = null;
try {
- traceDebug("Connected from "+ socket.getInetAddress().getHostName()+ ":"+socket.getPort());
+ traceDebug("connection established with "+
+ socket.getInetAddress().getHostName()+ ":"+socket.getPort());
} catch (Exception ie) {
traceDebug("Interrupted while resolving remote hostname");
}
- traceDebug("Thread started");
while (clientThread==thisThread) {
try {
if ((msg=in.readLine()) != null ) {
@@ -294,21 +326,23 @@ public class IvyClient implements Runnable {
break;
}
} catch (IvyException ie) {
- traceDebug("IvyClient caught an exception");
+ traceDebug("caught an IvyException");
ie.printStackTrace();
} catch (InterruptedIOException ioe) {
traceDebug("I have been interrupted. I'm about to leave my thread loop");
if (thisThread!=clientThread) break;
} catch (IOException e) {
- traceDebug("abnormally Disconnected from "+ socket.getInetAddress().getHostName()+":"+socket.getPort());
+ if (clientThread!=null) {
+ traceDebug("abnormally Disconnected from "+ socket.getInetAddress().getHostName()+":"+socket.getPort());
+ }
break;
}
}
traceDebug("normally Disconnected from "+ appName);
- traceDebug("Thread stopped");
// invokes the disconnect applicationListeners
if (!discCallbackPerformed) bus.clientDisconnects(this);
discCallbackPerformed=true;
+ traceDebug("Thread stopped");
}
private synchronized void sendBuffer( String buffer ) throws IvyException {
@@ -340,21 +374,18 @@ public class IvyClient implements Runnable {
}
}
- /* GNURETOAPACHERE
- private void send(int type, Integer id, int nbsub, REMatch result) {
- */
-
private void sendResult(int type,Integer id, RE regexp) {
- String buffer = type+" "+id+StartArg;
- /*
- for(int sub = 1; sub <= nbsub; sub++) if (result.getStartIndex(sub) > -1)
- */
- for(int i=1;i<regexp.getParenCount();i++) buffer+=regexp.getParen(i)+EndArg;
try {
+ String buffer = type+" "+id+StartArg;
+ for(int i=1;i<regexp.getParenCount();i++)
+ buffer+=regexp.getParen(i)+EndArg;
sendBuffer(buffer);
} catch (IvyException ie ) {
System.err.println("received an exception: " + ie.getMessage());
ie.printStackTrace();
+ } catch (StringIndexOutOfBoundsException sioobe) {
+ System.out.println("arg: "+regexp.getParenCount()+" "+regexp);
+ sioobe.printStackTrace();
}
}
@@ -461,10 +492,6 @@ public class IvyClient implements Runnable {
break;
case EndRegexp:
bus.clientConnects(this);
- /* TODO check with the protocol itself.
- * the peer is perhaps not ready to handle this message
- * an assymetric processing should be written
- */
if (bus.ready_message!=null) sendMsg(bus.ready_message);
break;
case Msg:
@@ -487,16 +514,33 @@ public class IvyClient implements Runnable {
String error=s.substring(from,b.length);
traceDebug("Error msg "+msgId+" "+error);
break;
- case SchizoToken: // aka BeginRegexp in other implementations
+ case SchizoToken: // aka BeginRegexp in other implementations, or MsgSync
appName=s.substring(from,b.length);
- appPort=msgId.intValue();
- if ( bus.checkConnected(this) ) {
- try {
- close(false);
- } catch (IOException ioe) {
- throw new IvyException("io " + ioe.getMessage());
+ remotePort=msgId.intValue();
+ traceDebug("the peer sent his service port: "+remotePort);
+ if (incoming) {
+ // incoming connexion, I wait for his token to send him mine ...
+ synchronized(bus) {
+ bus.addHalf(this);
+ try {
+ // there is another connexion. Should I leave ?
+ // Assymetric processing to prevent concurrent disconnexions
+ if (bus.shouldIleave(this)) {
+ traceDebug(toStringExt()+" should leave ...");
+ close(false);
+ bus.removeHalf(this);
+ return false;
+ }
+ sendSchizo();
+ handShake();
+ } catch (IOException ioe) {
+ throw new IvyException(ioe.toString());
+ }
}
- throw new IvyException("Rare ! A concurrent connect occured");
+ } else {
+ // outgoing connexion
+ // I already have sent him a token
+ handShake();
}
break;
case DirectMsg:
@@ -513,31 +557,14 @@ public class IvyClient implements Runnable {
private void sendBye() {sendString(Bye,0,"");}
private void sendBye(String message) {sendString(Bye,0,message);}
- private InetAddress getRemoteAddress() { return socket.getInetAddress(); }
-
private void traceDebug(String s){
- if (debug) System.out.println("-->IvyClient "+bus.appName+":"+appName+"<-- "+s);
+ if (debug) System.out.println("-->IvyClient["+clientKey+","+bus.getSerial()+"] "+bus.appName+" (remote "+appName+")<-- "+s);
}
private void traceDebug(String[] tab){
- String s = "String array " + tab.length + " elements: ";
+ String s = " string array " + tab.length + " elements: ";
for (int i=0;i<tab.length;i++) s+="("+tab[i]+") ";
- if (debug) System.out.println("-->IvyClient "+bus.appName+":"+appName+"<-- "+s);
+ traceDebug(s);
}
- // a class to perform the threaded execution of each new message
- // this is an experimental feature introduced in 1.2.4
- class Sender implements Runnable {
- String message;
- private Thread t;
- public Sender(String message) {
- this.message=message;
- t=new Thread(Sender.this);
- bus.registerThread(t);
- t.start();
- bus.unRegisterThread(t);
- }
- public void run() { sendMsg(message); }
- } // class Sender
-
}
diff --git a/src/IvyWatcher.java b/src/IvyWatcher.java
index dafa6f9..3ac5ef9 100755
--- a/src/IvyWatcher.java
+++ b/src/IvyWatcher.java
@@ -14,6 +14,9 @@
* thing.
*
* CHANGELOG:
+ * 1.2.6:
+ * - IOException now goes silent when we asked the bus to stop()
+ * - use a new buffer for each Datagram received, to prevent an old bug
* 1.2.5:
* - getDomain now sends IvyException for malformed broadcast addresses
* - uses apache jakarta-regexp instead of gnu-regexp
@@ -56,7 +59,6 @@ import java.lang.Thread;
import java.net.*;
import java.io.*;
import java.util.StringTokenizer;
-/* import gnu.regexp.*; GNURETOAPACHERE */
import org.apache.regexp.*;
import java.util.Vector;
import java.util.Enumeration;
@@ -71,6 +73,8 @@ class IvyWatcher implements Runnable {
private int port;
private volatile Thread listenThread;
private InetAddress group;
+ private static int serial=0;
+ private int myserial=serial++;
/**
* creates an Ivy watcher
@@ -103,55 +107,57 @@ class IvyWatcher implements Runnable {
* the behaviour of each thread watching the UDP socket.
*/
public void run() {
- traceDebug("IvyWatcher Thread started"); // THREADDEBUG
+ traceDebug("Thread started"); // THREADDEBUG
Thread thisThread=Thread.currentThread();
traceDebug("beginning of a watcher Thread");
- byte buf[] = new byte[256];
- DatagramPacket packet=new DatagramPacket(buf, 256);
InetAddress remotehost=null;
try {
while( listenThread==thisThread ) {
try {
+ byte buf[] = new byte[256];
+ DatagramPacket packet=new DatagramPacket(buf,buf.length);
broadcast.receive(packet);
if (listenThread!=thisThread) break; // I was summoned to leave during the receive
- String msg = new String(packet.getData()) ;
- for (int i=0;i<buf.length;i++) { buf[i]=10; }
- // clean up the buffer after each message
- // BUGFIX ? I change 0 to 10 in order to avoid a bug
- remotehost = packet.getAddress();
- traceDebug("BUSWATCHER Receive Broadcast from "+remotehost.getHostName()+":"+packet.getPort());
- // if ( !isInDomain( remotehost ) ) continue;
+ String msg = new String(buf,0,packet.getLength());
+ String remotehostname=null;
try {
+ remotehost = packet.getAddress();
+ remotehostname = remotehost.getHostName();
RE re = new RE("([0-9]*) ([0-9]*)");
if (!(re.match(msg))) {
- System.err.println("Ignoring bad format broadcast from "+remotehost);
+ System.err.println("Ignoring bad format broadcast from "+
+ remotehostname+":"+packet.getPort());
continue;
}
int version = Integer.parseInt(re.getParen(1));
if ( version < bus.PROTOCOLMINIMUM ) {
- System.err.println("Ignoring bad protocol version "+remotehost+" we need "+ bus.PROTOCOLMINIMUM+" minimum");
+ System.err.println("Ignoring bad format broadcast from "+
+ remotehostname+":"+packet.getPort()
+ +" protocol version "+remotehost+" we need "+bus.PROTOCOLMINIMUM+" minimum");
continue;
}
int port = Integer.parseInt(re.getParen(2));
- // allows the connexion from a remote host with the same port number
- // if ( ( (remotehost.equals(localhost)) || (remotehost.equals(loopback)) )
- // && (bus.applicationPort==port)) {
if (bus.applicationPort==port) {
- traceDebug("ignoring a broadcast on my port number, it's *probably* me");
- continue; // it's me
+ traceDebug("ignoring a broadcast from "+ remotehostname+":"+packet.getPort()
+ +" on my port number ("+port+"), it's probably me");
+ // TODO check better
+ // if bus.applicationPort=port
+ // parse the list of Watchers and check for each
+ // iw.broadcast.getInetAddress().equals(packet().getAddress()
+ // if one is true, "continue" ( ignore the broadcast )
+ continue;
}
- traceDebug("Broadcast de " +packet.getAddress().getHostName()
- +":"+packet.getPort()+" port "+port+" version "+version);
- Socket socket = new Socket( remotehost, port );
- bus.addClient(socket,false,version);
+ traceDebug("broadcast accepted from " +remotehostname
+ +":"+packet.getPort()+", port:"+port+", protocol version:"+version);
+ new IvyClient(bus,new Socket(remotehost,port),port);
} catch (RESyntaxException ree) {
ree.printStackTrace();
System.exit(-1);
} catch (NumberFormatException nfe) {
- System.err.println("Ignoring bad format broadcast from "+remotehost);
+ System.err.println("Ignoring bad format broadcast from "+remotehostname);
continue;
} catch ( UnknownHostException e ) {
- System.err.println("Unkonwn host "+remotehost + e.getMessage());
+ System.err.println("Unkonwn host "+remotehost +","+e.getMessage());
} catch ( IOException e) {
System.err.println("can't connect to "+remotehost+" port "+ port+e.getMessage());
}
@@ -161,26 +167,24 @@ class IvyWatcher implements Runnable {
} // while
} catch (java.net.SocketException se ){
if (thisThread==listenThread) {
- System.out.println("IvyWatcher error, continuing anyway");
- se.printStackTrace();
+ traceDebug("socket exception, continuing anyway on other Ivy domains "+se);
}
} catch (IOException ioe ){
- System.out.println("IvyWatcher IO Exception, continuing anyway");
- ioe.printStackTrace();
+ System.out.println("IO Exception, continuing anyway on other Ivy domains "+ioe);
}
- traceDebug("IvyWatcher Thread stopped"); // THREADDEBUG
+ traceDebug("Thread stopped"); // THREADDEBUG
}
/**
* stops the thread waiting on the broadcast socket
*/
synchronized void stop() {
- traceDebug("begining stopping an IvyWatcher");
+ traceDebug("begining stopping");
Thread t = listenThread;
listenThread=null;
broadcast.close();
if (t!=null) { t.interrupt(); } // it might not even have been created
- traceDebug("ending stopping an IvyWatcher");
+ traceDebug("stopped");
}
private class PacketSender implements Runnable {
@@ -201,10 +205,11 @@ class IvyWatcher implements Runnable {
e.printStackTrace();
traceDebug("IO interrupted during the broadcast. Do nothing");
} catch ( IOException e ) {
- System.out.println("Broadcast Error" + e.getMessage());
- e.printStackTrace();
- // throw new IvyException("Broadcast error " + e.getMessage() );
- System.exit(0);
+ if (listenThread!=null) {
+ System.out.println("Broadcast Error " + e.getMessage()+" continuing anyway");
+ // cannot throw new IvyException in a run ...
+ e.printStackTrace();
+ }
}
traceDebug("PacketSender thread stopped"); // THREADDEBUG
}
@@ -274,9 +279,8 @@ class IvyWatcher implements Runnable {
private void traceDebug(String s){
- if (debug) System.out.println("-->ivywatcher<-- "+s);
+ if (debug) System.out.println("-->IvyWatcher["+myserial+","+bus.getSerial()+"]<-- "+s);
}
} // class IvyWatcher
-/* EOF */
diff --git a/src/Makefile b/src/Makefile
index 8a12da1..5d0dbe2 100644
--- a/src/Makefile
+++ b/src/Makefile
@@ -1,6 +1,7 @@
-GNUPATH=/usr/share/java/gnu-getopt.jar:/usr/share/java/regexp.jar
-#GNUPATH=/usr/share/java/repository:/usr/share/java/gnu.getopt.0.9.jar:/usr/share/java/gnu-regexp-1.1.3.jar
+GNUPATH=/usr/share/java/gnu-getopt.jar:/usr/share/java/regexp.jar # debian SID
+#GNUPATH=/usr/share/java/repository:/usr/share/java/gnu.getopt.jar:/usr/share/java/regexp.jar # debian woody
#GNUPATH=${HOME}/java/jars/gnu-getopt.jar:${HOME}/java/jars/regexp.jar
+#GNUPATH=../bundle
#######################################
# generic setup
diff --git a/src/Probe.java b/src/Probe.java
index c83b9ac..e086560 100644
--- a/src/Probe.java
+++ b/src/Probe.java
@@ -45,7 +45,6 @@ package fr.dgac.ivy ;
import java.io.*;
import java.util.*;
import gnu.getopt.Getopt;
-/* import gnu.regexp.*; GNURETOAPACHERE */
import org.apache.regexp.*;
public class Probe implements IvyApplicationListener, IvyMessageListener, IvyBindListener, Runnable {
diff --git a/src/SelfIvyClient.java b/src/SelfIvyClient.java
index d6e6f1e..a96e0d2 100644
--- a/src/SelfIvyClient.java
+++ b/src/SelfIvyClient.java
@@ -6,6 +6,8 @@
* @since 1.2.4
*
* CHANGELOG:
+ * 1.2.6:
+ * - jakarta regexp are not threadsafe, adding extra synch blocks
* 1.2.5:
* - uses apache regexp instead of gnu regexp
* 1.2.4:
@@ -15,7 +17,6 @@
package fr.dgac.ivy ;
import java.util.*;
-/* import gnu.regexp.*; GNURETOAPACHERE */
import org.apache.regexp.*;
class SelfIvyClient extends IvyClient {
@@ -83,9 +84,11 @@ class SelfIvyClient extends IvyClient {
Integer key = (Integer)e.nextElement();
RE regexp = (RE)regexps.get(key);
String sre = (String)regexpsText.get(key);
- if (!regexp.match(message)) continue;
- count++;
- callCallback(this,key,toArgs(regexp));
+ synchronized(regexp) {
+ if (!regexp.match(message)) continue;
+ count++;
+ callCallback(this,key,toArgs(regexp));
+ }
}
return count;
}
@@ -101,7 +104,7 @@ class SelfIvyClient extends IvyClient {
// runs the callback in the same thread
callback.receive(client, tab);
} else {
- // starts a new Thread for each callback ...
+ // starts a new Thread for each callback ... ( Async API )
new Runner(callback,client,tab);
}
}
@@ -124,10 +127,10 @@ class SelfIvyClient extends IvyClient {
IvyClient c;
String[] args;
private Thread t;
- public Runner(IvyMessageListener cb,IvyClient c,String[] args) {
+ public Runner(IvyMessageListener cb,IvyClient c,String[] a) {
this.cb=cb;
- this.args=args;
this.c=c;
+ args=a;
t=new Thread(Runner.this);
bus.registerThread(t);
t.start();