aboutsummaryrefslogtreecommitdiff
path: root/src/IvyClient.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/IvyClient.java')
-rwxr-xr-xsrc/IvyClient.java223
1 files changed, 125 insertions, 98 deletions
diff --git a/src/IvyClient.java b/src/IvyClient.java
index d3deb8b..753184a 100755
--- a/src/IvyClient.java
+++ b/src/IvyClient.java
@@ -11,10 +11,17 @@
*
* CHANGELOG:
* 1.2.6
+ * - major cleanup to handle simultaneous connections, e.g., between two
+ * busses within the same process ( AsyncAPI test is very stressful )
+ * I made an assymetric processing to elect the client that should
+ * disconnect based on the socket ports ... might work...
+ * - jakarta regexp are not meant to be threadsafe, so for match() and
+ * compile() must be enclaused in a synchronized block
* - now sends back an error message when an incorrect regexp is sent
* the message is supposed to be readable
+ * - sendMsg has no more async parameter
* 1.2.5:
- * - no more java ping ...
+ * - no more java ping pong
* 1.2.5:
* - use org.apache.regexp instead of gnu-regexp
* http://jakarta.apache.org/regexp/apidocs/
@@ -58,7 +65,6 @@ import java.lang.Thread;
import java.net.*;
import java.io.*;
import java.util.*;
-/* import gnu.regexp.*; GNURETOAPACHERE */
import org.apache.regexp.*;
public class IvyClient implements Runnable {
@@ -70,8 +76,7 @@ public class IvyClient implements Runnable {
final static int Error = 3; /* error message */
final static int DelRegexp = 4;/* the peer removes one of his regex */
final static int EndRegexp = 5;/* no more regexp in the handshake */
- final static int SchizoToken = 6; /* avoid race condition in concurrent connexions */
- /* SchizoToken is aka BeginRegexp in other implementations */
+ final static int SchizoToken = 6; /* avoid race condition in concurrent connexions, aka BeginRegexp in other implementations */
final static int DirectMsg = 7;/* the peer sends a direct message */
final static int Die = 8; /* the peer wants us to quit */
final static String MESSAGE_TERMINATOR = "\n"; /* the next protocol will use \r */
@@ -83,55 +88,81 @@ public class IvyClient implements Runnable {
final static char newLineChar = '\n';
// private variables
+ private static Integer csMutex=new Integer(0);
+ private static int clientSerial=0; /* an unique ID for each IvyClient */
+
private String messages_classes[] = null;
private Ivy bus;
private Socket socket;
private BufferedReader in;
private OutputStream out;
- private int appPort;
- private boolean peerCalling;
+ private int remotePort=0;
private volatile Thread clientThread;// volatile to ensure the quick communication
- private Integer clientKey ;
+ private Integer clientKey;
private boolean discCallbackPerformed = false;
// protected variables
- String appName;
+ String appName="none";
Hashtable regexps = new Hashtable();
Hashtable regexpsText = new Hashtable();
static boolean debug = (System.getProperty("IVY_DEBUG")!=null) ;
int protocol;
+ private boolean incoming;
- IvyClient(){}
+ IvyClient() { }
- IvyClient(Ivy bus, Socket socket,boolean peerCalling,Integer clientKey,int protocol) throws IOException {
- appName = "Unknown";
- appPort = 0;
+ IvyClient(Ivy bus, Socket socket,int remotePort) throws IOException {
+ synchronized(csMutex) { clientKey=new Integer(clientSerial++); }
this.bus = bus;
- this.socket = socket;
- this.peerCalling=peerCalling;
- this.clientKey=clientKey;
- this.protocol=protocol;
+ this.remotePort = remotePort;
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
out = socket.getOutputStream();
+ incoming=(remotePort==0);
+ traceDebug(((incoming)?"incoming":"outgoing")+" connection on "+socket);
+ this.socket = socket;
+ if (!incoming) { // outgoing connexion
+ synchronized(bus) {
+ bus.addHalf(this); // register a half connexion
+ if (bus.shouldIleave(this)) {
+ traceDebug(toStringExt()+" should leave ...");
+ close(false);
+ bus.removeHalf(this);
+ return;
+ }
+ sendSchizo();
+ // the registering will take place at the reception of the regexps...
+ }
+ }
+ clientThread = new Thread(this); // clientThread handles the incoming traffic
+ clientThread.start();
+ }
+
+ // sends our ID, whether we initiated the connexion or not
+ // the ID is the couple "host name,application Port", the host name
+ // information is in the socket itself, the port is not known if we
+ // initiate the connexion
+ private void sendSchizo() throws IOException {
+ traceDebug("sending our service port "+bus.applicationPort);
Hashtable tosend=bus.selfIvyClient.regexpsText;
- // sends our ID, whether we initiated the connexion or not
- // the ID is the couple "host name,application Port", the host name
- // information is in the socket itself, the port is not known if we
- // initiate the connexion
sendString(SchizoToken,bus.applicationPort,bus.appName);
- // sends our regexps to the peer
for (Enumeration e = tosend.keys(); e.hasMoreElements(); ) {
Integer ikey = (Integer)e.nextElement();
sendRegexp(ikey.intValue(),(String)tosend.get(ikey));
}
sendString( EndRegexp,0,"");
- // spawns a thread to manage the incoming traffic on this
- // socket. We should be ready to receive messages now.
- clientThread = new Thread(this);
- clientThread.start();
}
- public String toString() { return "IvyClient "+bus.appName+":"+appName; }
+ synchronized private void handShake() {
+ synchronized(bus) {
+ bus.removeHalf(this);
+ bus.addClient(this);
+ }
+ }
+
+ public String toString() { return "IC["+clientKey+","+bus.getSerial()+"] "+bus.appName+":"+appName+":"+remotePort; }
+ public String toStringExt() {
+ return "client socket:"+socket+", remoteport:" + remotePort;
+ }
/**
* returns the name of the remote agent.
@@ -175,7 +206,6 @@ public class IvyClient implements Runnable {
/* closes the connexion to the peer */
protected void close(boolean notify) throws IOException {
- bus.waitForAll();
traceDebug("closing connexion to "+appName);
if (notify) sendBye("hasta la vista");
stopListening();
@@ -214,32 +244,19 @@ public class IvyClient implements Runnable {
static String decode(String s) { return s.replace(escapeChar,'\n'); }
static String encode(String s) { return s.replace('\n',escapeChar); }
Integer getClientKey() { return clientKey ; }
- int getAppPort() { return appPort ; }
- void sendRegexp(int id,String regexp) {sendString(AddRegexp,id,regexp);}
- void delRegexp(int id) {sendString(DelRegexp,id,"");}
-
- int sendMsg(String message,boolean async) {
- if (async) {
- new Sender(message);
- return -1;
- } else { return sendMsg(message); }
- }
+ protected void sendRegexp(int id,String regexp) {sendString(AddRegexp,id,regexp);}
+ protected void delRegexp(int id) {sendString(DelRegexp,id,"");}
- private int sendMsg(String message) {
+ protected int sendMsg(String message) {
int count = 0;
for (Enumeration e = regexps.keys();e.hasMoreElements();) {
Integer key = (Integer)e.nextElement();
RE regexp = (RE)regexps.get(key);
- /*
- * GNURETOAPACHERE
- int nb = regexp.getNumSubs();
- REMatch result = regexp.getMatch(message);
- if (result==null) continue; // no match
- count++; // match
- send(Msg,key,regexp.getNumSubs(),result);
- *
- */
- if (!regexp.match(message)) continue; // no match
+ // re.match fails sometimes when it is called concurrently ..
+ // see 28412 on jakarta regexp bugzilla
+ synchronized (regexp) {
+ if (!regexp.match(message)) continue; // no match
+ }
count++; // match
sendResult(Msg,key,regexp);
}
@@ -262,25 +279,40 @@ public class IvyClient implements Runnable {
/*
* compares two peers the id is the couple (host,service port).
- * true if the peers are similir. This should not happen, it is bad
+ * true if the peers are similar. This should not happen, it is bad
*/
- protected boolean sameClient( IvyClient clnt ) {
- return ( appPort != 0 && appPort == clnt.appPort )
- && ( getRemoteAddress() == clnt.getRemoteAddress() ) ;
+ protected int compareTo(IvyClient clnt) {
+ // return clnt.clientKey.compareTo(clientKey); // Wrong. it's random...
+ return (clnt.socket.getPort()-socket.getLocalPort());
+ }
+
+ protected boolean equals(IvyClient clnt) {
+ if (clnt==this) return false;
+ // TODO go beyond the port number ! add some host processing, cf:
+ // IvyWatcher ...
+ if (remotePort==clnt.remotePort) return true;
+ /*
+ e.g.
+ if (socket.getInetAddress()==null) return false;
+ if (clnt.socket.getInetAddress()==null) return false;
+ if (!socket.getInetAddress().equals(clnt.socket.getInetAddress())) return false;
+ */
+ return false;
}
/*
* the code of the thread handling the incoming messages.
*/
public void run() {
+ traceDebug("Thread started");
Thread thisThread = Thread.currentThread();
String msg = null;
try {
- traceDebug("Connected from "+ socket.getInetAddress().getHostName()+ ":"+socket.getPort());
+ traceDebug("connection established with "+
+ socket.getInetAddress().getHostName()+ ":"+socket.getPort());
} catch (Exception ie) {
traceDebug("Interrupted while resolving remote hostname");
}
- traceDebug("Thread started");
while (clientThread==thisThread) {
try {
if ((msg=in.readLine()) != null ) {
@@ -294,21 +326,23 @@ public class IvyClient implements Runnable {
break;
}
} catch (IvyException ie) {
- traceDebug("IvyClient caught an exception");
+ traceDebug("caught an IvyException");
ie.printStackTrace();
} catch (InterruptedIOException ioe) {
traceDebug("I have been interrupted. I'm about to leave my thread loop");
if (thisThread!=clientThread) break;
} catch (IOException e) {
- traceDebug("abnormally Disconnected from "+ socket.getInetAddress().getHostName()+":"+socket.getPort());
+ if (clientThread!=null) {
+ traceDebug("abnormally Disconnected from "+ socket.getInetAddress().getHostName()+":"+socket.getPort());
+ }
break;
}
}
traceDebug("normally Disconnected from "+ appName);
- traceDebug("Thread stopped");
// invokes the disconnect applicationListeners
if (!discCallbackPerformed) bus.clientDisconnects(this);
discCallbackPerformed=true;
+ traceDebug("Thread stopped");
}
private synchronized void sendBuffer( String buffer ) throws IvyException {
@@ -340,21 +374,18 @@ public class IvyClient implements Runnable {
}
}
- /* GNURETOAPACHERE
- private void send(int type, Integer id, int nbsub, REMatch result) {
- */
-
private void sendResult(int type,Integer id, RE regexp) {
- String buffer = type+" "+id+StartArg;
- /*
- for(int sub = 1; sub <= nbsub; sub++) if (result.getStartIndex(sub) > -1)
- */
- for(int i=1;i<regexp.getParenCount();i++) buffer+=regexp.getParen(i)+EndArg;
try {
+ String buffer = type+" "+id+StartArg;
+ for(int i=1;i<regexp.getParenCount();i++)
+ buffer+=regexp.getParen(i)+EndArg;
sendBuffer(buffer);
} catch (IvyException ie ) {
System.err.println("received an exception: " + ie.getMessage());
ie.printStackTrace();
+ } catch (StringIndexOutOfBoundsException sioobe) {
+ System.out.println("arg: "+regexp.getParenCount()+" "+regexp);
+ sioobe.printStackTrace();
}
}
@@ -461,10 +492,6 @@ public class IvyClient implements Runnable {
break;
case EndRegexp:
bus.clientConnects(this);
- /* TODO check with the protocol itself.
- * the peer is perhaps not ready to handle this message
- * an assymetric processing should be written
- */
if (bus.ready_message!=null) sendMsg(bus.ready_message);
break;
case Msg:
@@ -487,16 +514,33 @@ public class IvyClient implements Runnable {
String error=s.substring(from,b.length);
traceDebug("Error msg "+msgId+" "+error);
break;
- case SchizoToken: // aka BeginRegexp in other implementations
+ case SchizoToken: // aka BeginRegexp in other implementations, or MsgSync
appName=s.substring(from,b.length);
- appPort=msgId.intValue();
- if ( bus.checkConnected(this) ) {
- try {
- close(false);
- } catch (IOException ioe) {
- throw new IvyException("io " + ioe.getMessage());
+ remotePort=msgId.intValue();
+ traceDebug("the peer sent his service port: "+remotePort);
+ if (incoming) {
+ // incoming connexion, I wait for his token to send him mine ...
+ synchronized(bus) {
+ bus.addHalf(this);
+ try {
+ // there is another connexion. Should I leave ?
+ // Assymetric processing to prevent concurrent disconnexions
+ if (bus.shouldIleave(this)) {
+ traceDebug(toStringExt()+" should leave ...");
+ close(false);
+ bus.removeHalf(this);
+ return false;
+ }
+ sendSchizo();
+ handShake();
+ } catch (IOException ioe) {
+ throw new IvyException(ioe.toString());
+ }
}
- throw new IvyException("Rare ! A concurrent connect occured");
+ } else {
+ // outgoing connexion
+ // I already have sent him a token
+ handShake();
}
break;
case DirectMsg:
@@ -513,31 +557,14 @@ public class IvyClient implements Runnable {
private void sendBye() {sendString(Bye,0,"");}
private void sendBye(String message) {sendString(Bye,0,message);}
- private InetAddress getRemoteAddress() { return socket.getInetAddress(); }
-
private void traceDebug(String s){
- if (debug) System.out.println("-->IvyClient "+bus.appName+":"+appName+"<-- "+s);
+ if (debug) System.out.println("-->IvyClient["+clientKey+","+bus.getSerial()+"] "+bus.appName+" (remote "+appName+")<-- "+s);
}
private void traceDebug(String[] tab){
- String s = "String array " + tab.length + " elements: ";
+ String s = " string array " + tab.length + " elements: ";
for (int i=0;i<tab.length;i++) s+="("+tab[i]+") ";
- if (debug) System.out.println("-->IvyClient "+bus.appName+":"+appName+"<-- "+s);
+ traceDebug(s);
}
- // a class to perform the threaded execution of each new message
- // this is an experimental feature introduced in 1.2.4
- class Sender implements Runnable {
- String message;
- private Thread t;
- public Sender(String message) {
- this.message=message;
- t=new Thread(Sender.this);
- bus.registerThread(t);
- t.start();
- bus.unRegisterThread(t);
- }
- public void run() { sendMsg(message); }
- } // class Sender
-
}