diff options
author | jestin | 2004-07-29 17:33:08 +0000 |
---|---|---|
committer | jestin | 2004-07-29 17:33:08 +0000 |
commit | dc931b33bf8fa61c8eb4d7297e42379543274745 (patch) | |
tree | eb96e1299e8a8a6c1c46ab95010e8206c7a79824 /src/IvyClient.java | |
parent | f1b701b31a59d905cae82a752a56a3ed87a1b668 (diff) | |
download | ivy-java-dc931b33bf8fa61c8eb4d7297e42379543274745.zip ivy-java-dc931b33bf8fa61c8eb4d7297e42379543274745.tar.gz ivy-java-dc931b33bf8fa61c8eb4d7297e42379543274745.tar.bz2 ivy-java-dc931b33bf8fa61c8eb4d7297e42379543274745.tar.xz |
bugfixes majeurs, voir le changelog 1.2.6
Diffstat (limited to 'src/IvyClient.java')
-rwxr-xr-x | src/IvyClient.java | 223 |
1 files changed, 125 insertions, 98 deletions
diff --git a/src/IvyClient.java b/src/IvyClient.java index d3deb8b..753184a 100755 --- a/src/IvyClient.java +++ b/src/IvyClient.java @@ -11,10 +11,17 @@ * * CHANGELOG: * 1.2.6 + * - major cleanup to handle simultaneous connections, e.g., between two + * busses within the same process ( AsyncAPI test is very stressful ) + * I made an assymetric processing to elect the client that should + * disconnect based on the socket ports ... might work... + * - jakarta regexp are not meant to be threadsafe, so for match() and + * compile() must be enclaused in a synchronized block * - now sends back an error message when an incorrect regexp is sent * the message is supposed to be readable + * - sendMsg has no more async parameter * 1.2.5: - * - no more java ping ... + * - no more java ping pong * 1.2.5: * - use org.apache.regexp instead of gnu-regexp * http://jakarta.apache.org/regexp/apidocs/ @@ -58,7 +65,6 @@ import java.lang.Thread; import java.net.*; import java.io.*; import java.util.*; -/* import gnu.regexp.*; GNURETOAPACHERE */ import org.apache.regexp.*; public class IvyClient implements Runnable { @@ -70,8 +76,7 @@ public class IvyClient implements Runnable { final static int Error = 3; /* error message */ final static int DelRegexp = 4;/* the peer removes one of his regex */ final static int EndRegexp = 5;/* no more regexp in the handshake */ - final static int SchizoToken = 6; /* avoid race condition in concurrent connexions */ - /* SchizoToken is aka BeginRegexp in other implementations */ + final static int SchizoToken = 6; /* avoid race condition in concurrent connexions, aka BeginRegexp in other implementations */ final static int DirectMsg = 7;/* the peer sends a direct message */ final static int Die = 8; /* the peer wants us to quit */ final static String MESSAGE_TERMINATOR = "\n"; /* the next protocol will use \r */ @@ -83,55 +88,81 @@ public class IvyClient implements Runnable { final static char newLineChar = '\n'; // private variables + private static Integer csMutex=new Integer(0); + private static int clientSerial=0; /* an unique ID for each IvyClient */ + private String messages_classes[] = null; private Ivy bus; private Socket socket; private BufferedReader in; private OutputStream out; - private int appPort; - private boolean peerCalling; + private int remotePort=0; private volatile Thread clientThread;// volatile to ensure the quick communication - private Integer clientKey ; + private Integer clientKey; private boolean discCallbackPerformed = false; // protected variables - String appName; + String appName="none"; Hashtable regexps = new Hashtable(); Hashtable regexpsText = new Hashtable(); static boolean debug = (System.getProperty("IVY_DEBUG")!=null) ; int protocol; + private boolean incoming; - IvyClient(){} + IvyClient() { } - IvyClient(Ivy bus, Socket socket,boolean peerCalling,Integer clientKey,int protocol) throws IOException { - appName = "Unknown"; - appPort = 0; + IvyClient(Ivy bus, Socket socket,int remotePort) throws IOException { + synchronized(csMutex) { clientKey=new Integer(clientSerial++); } this.bus = bus; - this.socket = socket; - this.peerCalling=peerCalling; - this.clientKey=clientKey; - this.protocol=protocol; + this.remotePort = remotePort; in = new BufferedReader(new InputStreamReader(socket.getInputStream())); out = socket.getOutputStream(); + incoming=(remotePort==0); + traceDebug(((incoming)?"incoming":"outgoing")+" connection on "+socket); + this.socket = socket; + if (!incoming) { // outgoing connexion + synchronized(bus) { + bus.addHalf(this); // register a half connexion + if (bus.shouldIleave(this)) { + traceDebug(toStringExt()+" should leave ..."); + close(false); + bus.removeHalf(this); + return; + } + sendSchizo(); + // the registering will take place at the reception of the regexps... + } + } + clientThread = new Thread(this); // clientThread handles the incoming traffic + clientThread.start(); + } + + // sends our ID, whether we initiated the connexion or not + // the ID is the couple "host name,application Port", the host name + // information is in the socket itself, the port is not known if we + // initiate the connexion + private void sendSchizo() throws IOException { + traceDebug("sending our service port "+bus.applicationPort); Hashtable tosend=bus.selfIvyClient.regexpsText; - // sends our ID, whether we initiated the connexion or not - // the ID is the couple "host name,application Port", the host name - // information is in the socket itself, the port is not known if we - // initiate the connexion sendString(SchizoToken,bus.applicationPort,bus.appName); - // sends our regexps to the peer for (Enumeration e = tosend.keys(); e.hasMoreElements(); ) { Integer ikey = (Integer)e.nextElement(); sendRegexp(ikey.intValue(),(String)tosend.get(ikey)); } sendString( EndRegexp,0,""); - // spawns a thread to manage the incoming traffic on this - // socket. We should be ready to receive messages now. - clientThread = new Thread(this); - clientThread.start(); } - public String toString() { return "IvyClient "+bus.appName+":"+appName; } + synchronized private void handShake() { + synchronized(bus) { + bus.removeHalf(this); + bus.addClient(this); + } + } + + public String toString() { return "IC["+clientKey+","+bus.getSerial()+"] "+bus.appName+":"+appName+":"+remotePort; } + public String toStringExt() { + return "client socket:"+socket+", remoteport:" + remotePort; + } /** * returns the name of the remote agent. @@ -175,7 +206,6 @@ public class IvyClient implements Runnable { /* closes the connexion to the peer */ protected void close(boolean notify) throws IOException { - bus.waitForAll(); traceDebug("closing connexion to "+appName); if (notify) sendBye("hasta la vista"); stopListening(); @@ -214,32 +244,19 @@ public class IvyClient implements Runnable { static String decode(String s) { return s.replace(escapeChar,'\n'); } static String encode(String s) { return s.replace('\n',escapeChar); } Integer getClientKey() { return clientKey ; } - int getAppPort() { return appPort ; } - void sendRegexp(int id,String regexp) {sendString(AddRegexp,id,regexp);} - void delRegexp(int id) {sendString(DelRegexp,id,"");} - - int sendMsg(String message,boolean async) { - if (async) { - new Sender(message); - return -1; - } else { return sendMsg(message); } - } + protected void sendRegexp(int id,String regexp) {sendString(AddRegexp,id,regexp);} + protected void delRegexp(int id) {sendString(DelRegexp,id,"");} - private int sendMsg(String message) { + protected int sendMsg(String message) { int count = 0; for (Enumeration e = regexps.keys();e.hasMoreElements();) { Integer key = (Integer)e.nextElement(); RE regexp = (RE)regexps.get(key); - /* - * GNURETOAPACHERE - int nb = regexp.getNumSubs(); - REMatch result = regexp.getMatch(message); - if (result==null) continue; // no match - count++; // match - send(Msg,key,regexp.getNumSubs(),result); - * - */ - if (!regexp.match(message)) continue; // no match + // re.match fails sometimes when it is called concurrently .. + // see 28412 on jakarta regexp bugzilla + synchronized (regexp) { + if (!regexp.match(message)) continue; // no match + } count++; // match sendResult(Msg,key,regexp); } @@ -262,25 +279,40 @@ public class IvyClient implements Runnable { /* * compares two peers the id is the couple (host,service port). - * true if the peers are similir. This should not happen, it is bad + * true if the peers are similar. This should not happen, it is bad */ - protected boolean sameClient( IvyClient clnt ) { - return ( appPort != 0 && appPort == clnt.appPort ) - && ( getRemoteAddress() == clnt.getRemoteAddress() ) ; + protected int compareTo(IvyClient clnt) { + // return clnt.clientKey.compareTo(clientKey); // Wrong. it's random... + return (clnt.socket.getPort()-socket.getLocalPort()); + } + + protected boolean equals(IvyClient clnt) { + if (clnt==this) return false; + // TODO go beyond the port number ! add some host processing, cf: + // IvyWatcher ... + if (remotePort==clnt.remotePort) return true; + /* + e.g. + if (socket.getInetAddress()==null) return false; + if (clnt.socket.getInetAddress()==null) return false; + if (!socket.getInetAddress().equals(clnt.socket.getInetAddress())) return false; + */ + return false; } /* * the code of the thread handling the incoming messages. */ public void run() { + traceDebug("Thread started"); Thread thisThread = Thread.currentThread(); String msg = null; try { - traceDebug("Connected from "+ socket.getInetAddress().getHostName()+ ":"+socket.getPort()); + traceDebug("connection established with "+ + socket.getInetAddress().getHostName()+ ":"+socket.getPort()); } catch (Exception ie) { traceDebug("Interrupted while resolving remote hostname"); } - traceDebug("Thread started"); while (clientThread==thisThread) { try { if ((msg=in.readLine()) != null ) { @@ -294,21 +326,23 @@ public class IvyClient implements Runnable { break; } } catch (IvyException ie) { - traceDebug("IvyClient caught an exception"); + traceDebug("caught an IvyException"); ie.printStackTrace(); } catch (InterruptedIOException ioe) { traceDebug("I have been interrupted. I'm about to leave my thread loop"); if (thisThread!=clientThread) break; } catch (IOException e) { - traceDebug("abnormally Disconnected from "+ socket.getInetAddress().getHostName()+":"+socket.getPort()); + if (clientThread!=null) { + traceDebug("abnormally Disconnected from "+ socket.getInetAddress().getHostName()+":"+socket.getPort()); + } break; } } traceDebug("normally Disconnected from "+ appName); - traceDebug("Thread stopped"); // invokes the disconnect applicationListeners if (!discCallbackPerformed) bus.clientDisconnects(this); discCallbackPerformed=true; + traceDebug("Thread stopped"); } private synchronized void sendBuffer( String buffer ) throws IvyException { @@ -340,21 +374,18 @@ public class IvyClient implements Runnable { } } - /* GNURETOAPACHERE - private void send(int type, Integer id, int nbsub, REMatch result) { - */ - private void sendResult(int type,Integer id, RE regexp) { - String buffer = type+" "+id+StartArg; - /* - for(int sub = 1; sub <= nbsub; sub++) if (result.getStartIndex(sub) > -1) - */ - for(int i=1;i<regexp.getParenCount();i++) buffer+=regexp.getParen(i)+EndArg; try { + String buffer = type+" "+id+StartArg; + for(int i=1;i<regexp.getParenCount();i++) + buffer+=regexp.getParen(i)+EndArg; sendBuffer(buffer); } catch (IvyException ie ) { System.err.println("received an exception: " + ie.getMessage()); ie.printStackTrace(); + } catch (StringIndexOutOfBoundsException sioobe) { + System.out.println("arg: "+regexp.getParenCount()+" "+regexp); + sioobe.printStackTrace(); } } @@ -461,10 +492,6 @@ public class IvyClient implements Runnable { break; case EndRegexp: bus.clientConnects(this); - /* TODO check with the protocol itself. - * the peer is perhaps not ready to handle this message - * an assymetric processing should be written - */ if (bus.ready_message!=null) sendMsg(bus.ready_message); break; case Msg: @@ -487,16 +514,33 @@ public class IvyClient implements Runnable { String error=s.substring(from,b.length); traceDebug("Error msg "+msgId+" "+error); break; - case SchizoToken: // aka BeginRegexp in other implementations + case SchizoToken: // aka BeginRegexp in other implementations, or MsgSync appName=s.substring(from,b.length); - appPort=msgId.intValue(); - if ( bus.checkConnected(this) ) { - try { - close(false); - } catch (IOException ioe) { - throw new IvyException("io " + ioe.getMessage()); + remotePort=msgId.intValue(); + traceDebug("the peer sent his service port: "+remotePort); + if (incoming) { + // incoming connexion, I wait for his token to send him mine ... + synchronized(bus) { + bus.addHalf(this); + try { + // there is another connexion. Should I leave ? + // Assymetric processing to prevent concurrent disconnexions + if (bus.shouldIleave(this)) { + traceDebug(toStringExt()+" should leave ..."); + close(false); + bus.removeHalf(this); + return false; + } + sendSchizo(); + handShake(); + } catch (IOException ioe) { + throw new IvyException(ioe.toString()); + } } - throw new IvyException("Rare ! A concurrent connect occured"); + } else { + // outgoing connexion + // I already have sent him a token + handShake(); } break; case DirectMsg: @@ -513,31 +557,14 @@ public class IvyClient implements Runnable { private void sendBye() {sendString(Bye,0,"");} private void sendBye(String message) {sendString(Bye,0,message);} - private InetAddress getRemoteAddress() { return socket.getInetAddress(); } - private void traceDebug(String s){ - if (debug) System.out.println("-->IvyClient "+bus.appName+":"+appName+"<-- "+s); + if (debug) System.out.println("-->IvyClient["+clientKey+","+bus.getSerial()+"] "+bus.appName+" (remote "+appName+")<-- "+s); } private void traceDebug(String[] tab){ - String s = "String array " + tab.length + " elements: "; + String s = " string array " + tab.length + " elements: "; for (int i=0;i<tab.length;i++) s+="("+tab[i]+") "; - if (debug) System.out.println("-->IvyClient "+bus.appName+":"+appName+"<-- "+s); + traceDebug(s); } - // a class to perform the threaded execution of each new message - // this is an experimental feature introduced in 1.2.4 - class Sender implements Runnable { - String message; - private Thread t; - public Sender(String message) { - this.message=message; - t=new Thread(Sender.this); - bus.registerThread(t); - t.start(); - bus.unRegisterThread(t); - } - public void run() { sendMsg(message); } - } // class Sender - } |