diff options
Diffstat (limited to 'src/IvyClient.java')
-rwxr-xr-x | src/IvyClient.java | 269 |
1 files changed, 184 insertions, 85 deletions
diff --git a/src/IvyClient.java b/src/IvyClient.java index 8aad199..b69c8d3 100755 --- a/src/IvyClient.java +++ b/src/IvyClient.java @@ -10,6 +10,18 @@ * created for each remote client. * * CHANGELOG: + * 1.2.4: + * - sendBuffer goes synchronized + * - sendMsg now has a async parameter, allowing the use of threads to + * delegate the sending of messages + * - API change, IvyException raised when \n or \0x3 are present in bus.sendMsg() + * - breaks the connexion with faulty Ivy clients (either protocol or invalid + * regexps, closes bug J007 (CM)) + * - sendDie now always requires a reason + * - invokes the disconnect applicationListeners at the end of the run() + * loop. + * closes Bug J006 (YJ) + * - changed the access of some functions ( sendRegexp, etc ) to protected * 1.2.3: * - silently stops on InterruptedIOException. * - direct Messages @@ -50,54 +62,64 @@ public class IvyClient implements Runnable { final static int DelRegexp = 4;/* the peer removes one of his regex */ final static int EndRegexp = 5;/* no more regexp in the handshake */ final static int SchizoToken = 6; /* avoid race condition in concurrent connexions */ + /* SchizoToken is aka BeginRegexp in other implementations */ final static int DirectMsg = 7;/* the peer sends a direct message */ final static int Die = 8; /* the peer wants us to quit */ final static int Ping = 9; /* checks the presence of the other */ final static int Pong = 10; /* checks the presence of the other */ - final static String MESSAGE_TERMINATOR = "\n"; /* the next protocol will use \r */ final static String StartArg = "\u0002";/* begin of arguments */ final static String EndArg = "\u0003"; /* end of arguments */ + final static String escape ="\u001A"; + final static char escapeChar = escape.charAt(0); + final static char endArgChar = EndArg.charAt(0); + final static char newLineChar = '\n'; - - private static boolean debug = (System.getProperty("IVY_DEBUG")!=null) ; + // private variables + private String messages_classes[] = null; private Ivy bus; private Socket socket; private BufferedReader in; private OutputStream out; - private Hashtable regexp_in = new Hashtable(); - private Hashtable regexp_text = new Hashtable(); private int appPort; private boolean peerCalling; private volatile Thread clientThread;// volatile to ensure the quick communication private Integer clientKey ; private static boolean doping = (System.getProperty("IVY_PING")!=null) ; - final static int PINGTIMEOUT = 5000; + private final static int PINGTIMEOUT = 5000; private PINGER pinger; private volatile Thread pingerThread; + private boolean discCallbackPerformed = false; // protected variables String appName; + Hashtable regexps = new Hashtable(); + Hashtable regexpsText = new Hashtable(); + static boolean debug = (System.getProperty("IVY_DEBUG")!=null) ; + int protocol; + + IvyClient(){} - IvyClient(Ivy bus, Socket socket,boolean peerCalling,Integer clientKey) throws IOException { + IvyClient(Ivy bus, Socket socket,boolean peerCalling,Integer clientKey,int protocol) throws IOException { appName = "Unknown"; appPort = 0; this.bus = bus; this.socket = socket; this.peerCalling=peerCalling; this.clientKey=clientKey; + this.protocol=protocol; in = new BufferedReader(new InputStreamReader(socket.getInputStream())); out = socket.getOutputStream(); - Hashtable regexps=bus.regexp_out; + Hashtable tosend=bus.selfIvyClient.regexpsText; // sends our ID, whether we initiated the connexion or not // the ID is the couple "host name,application Port", the host name // information is in the socket itself, the port is not known if we // initiate the connexion send(SchizoToken,bus.applicationPort,bus.appName); // sends our regexps to the peer - for (Enumeration e = regexps.keys(); e.hasMoreElements(); ) { + for (Enumeration e = tosend.keys(); e.hasMoreElements(); ) { Integer ikey = (Integer)e.nextElement(); - sendRegexp( ikey.intValue(),(String)regexps.get(ikey)); + sendRegexp(ikey.intValue(),(String)tosend.get(ikey)); } send( EndRegexp,0,""); // spawns a thread to manage the incoming traffic on this @@ -111,88 +133,138 @@ public class IvyClient implements Runnable { } } + public String toString() { return "IvyClient "+bus.appName+":"+appName; } + /** * returns the name of the remote agent. */ public String getApplicationName() { return appName ; } - Integer getClientKey() { return clientKey ; } - /** * allow an Ivy package class to access the list of regexps at a * given time. * perhaps we should implement a new IvyApplicationListener method to * allow the notification of regexp addition and deletion + * The content is not modifyable because String are not mutable, and cannot + * be modified once they are create. + * @see getRegexpsArray to get a String[] result */ - Enumeration getRegexps() { return regexp_text.elements(); } + public Enumeration getRegexps() { return regexpsText.elements(); } - int getAppPort() { return appPort ; } - - void sendRegexp(int id,String regexp) { - send(AddRegexp,id,regexp); /* perhaps we should perform some checking here */ + /** + * allow an Ivy package class to access the list of regexps at a + * given time. + * @since 1.2.4 + */ + public String[] getRegexpsArray() { + String[] s = new String[regexpsText.size()]; + int i=0; + for (Enumeration e=getRegexps();e.hasMoreElements();) + s[i++]=(String)e.nextElement(); + return s; } - public void delRegexp(int id) {send( DelRegexp,id,"");} - /** * sends a direct message to the peer * @param id the numeric value provided to the remote client * @param message the string that will be match-tested */ - public void sendDirectMsg(int id,String message) { send(DirectMsg,id,message); } + public void sendDirectMsg(int id,String message) throws IvyException { + if ( (message.indexOf(IvyClient.newLineChar)!=-1)||(message.indexOf(IvyClient.endArgChar)!=-1)) + throw new IvyException("newline character not allowed in Ivy messages"); + send(DirectMsg,id,message); + } - /** - * closes the connexion to the peer. - * @param notify should I send Bye message ? - * the thread managing the socket is stopped - */ - void close(boolean notify) throws IOException { + /* closes the connexion to the peer */ + protected void close(boolean notify) throws IOException { + bus.waitForAll(); traceDebug("closing connexion to "+appName); if (doping&&(pinger!=null)) { pinger.stopPinging(); } if (notify) sendBye("hasta la vista"); stopListening(); - // bus.clientDisconnect(this); - socket.close(); // should I also close in and out ? + socket.close(); } /** - * sends the substrings of a message to the peer for each matching regexp. - * @param message the string that will be match-tested - * @return the number of messages sent to the peer + * asks the remote client to leave the bus. + * @param message the message that will be carried */ - int sendMsg( String message ) { + public void sendDie(String message) { send(Die,0,message); } + + /** + * checks the "validity" of a regular expression. + * @param exp the string being a regular expression + * @return true if the regexp is valid + * @since 1.2.4 + */ + public boolean CheckRegexp( String exp ) { + boolean ok = true; + if ( exp.startsWith( "^" )&&messages_classes!=null) { + ok=false; + for (int i=0 ; i < messages_classes.length;i++) { + if (messages_classes[i].equals(exp.substring(1))) return true; + } + } + return ok; + } + + /////////////////////////////////////////////////// + // + // PROTECTED METHODS + // + /////////////////////////////////////////////////// + static String decode(String s) { return s.replace(escapeChar,'\n'); } + static String encode(String s) { return s.replace('\n',escapeChar); } + Integer getClientKey() { return clientKey ; } + int getAppPort() { return appPort ; } + void sendRegexp(int id,String regexp) {send(AddRegexp,id,regexp);} + void delRegexp(int id) {send( DelRegexp,id,"");} + + int sendMsg(String message,boolean async) { + if (async) { + new Sender(message); + return -1; + } else { return sendMsg(message); } + } + + private int sendMsg(String message) { int count = 0; - for (Enumeration e = regexp_in.keys();e.hasMoreElements();) { + for (Enumeration e = regexps.keys();e.hasMoreElements();) { Integer key = (Integer)e.nextElement(); - RE regexp = (RE)regexp_in.get(key); + RE regexp = (RE)regexps.get(key); + int nb = regexp.getNumSubs(); REMatch result = regexp.getMatch(message); - if ( result != null ) { - send(Msg,key,regexp.getNumSubs(),result); - count++; - } + if (result==null) continue; // no match + count++; // match + send(Msg,key,regexp.getNumSubs(),result); } return count; } - - void stopListening() { + + /////////////////////////////////////////////////// + // + // PRIVATE METHODS + // + /////////////////////////////////////////////////// + + /* interrupt the listener thread */ + private void stopListening() { Thread t = clientThread; if (t==null) return; // we can be summoned to quit from two path at a time clientThread=null; t.interrupt(); } - /** + /* * compares two peers the id is the couple (host,service port). - * @param clnt the other peer - * @return true if the peers are similir. This should not happen, it is bad - * © ® (tm) + * true if the peers are similir. This should not happen, it is bad */ - boolean sameClient( IvyClient clnt ) { + protected boolean sameClient( IvyClient clnt ) { return ( appPort != 0 && appPort == clnt.appPort ) && ( getRemoteAddress() == clnt.getRemoteAddress() ) ; } - /** + /* * the code of the thread handling the incoming messages. */ public void run() { @@ -209,7 +281,10 @@ public class IvyClient implements Runnable { if ((msg=in.readLine()) != null ) { if (clientThread!=thisThread) break; // early stop during readLine() if (doping && (pingerThread!=null)) pingerThread.interrupt(); - newParseMsg(msg); + if (!newParseMsg(msg)) { + close(true); + break; + } } else { traceDebug("readline null ! leaving the thead"); break; @@ -226,9 +301,12 @@ public class IvyClient implements Runnable { } traceDebug("normally Disconnected from "+ appName); traceDebug("Thread stopped"); + // invokes the disconnect applicationListeners + if (!discCallbackPerformed) bus.clientDisconnects(this); + discCallbackPerformed=true; } - private void sendBuffer( String buffer ) throws IvyException { + private synchronized void sendBuffer( String buffer ) throws IvyException { buffer += "\n"; try { out.write(buffer.getBytes() ); @@ -237,8 +315,9 @@ public class IvyClient implements Runnable { traceDebug("I can't send my message to this client. He probably left"); // first, I'm not a first class IvyClient any more bus.removeClient(this); - // invokes the die applicationListeners - bus.disconnectReceived(this); + // invokes the disconnect applicationListeners + if (!discCallbackPerformed) bus.clientDisconnects(this); + discCallbackPerformed=true; try { close(false); } catch (IOException ioe) { @@ -258,12 +337,8 @@ public class IvyClient implements Runnable { private void send(int type, Integer id, int nbsub, REMatch result) { String buffer = type+" "+id+StartArg; - // Start at 1 because group 0 represent entire matching - for(int sub = 1; sub <= nbsub; sub++) { - if (result.getStartIndex(sub) > -1) { - buffer += result.toString(sub)+EndArg; - } - } + for(int sub = 1; sub <= nbsub; sub++) if (result.getStartIndex(sub) > -1) + buffer += result.toString(sub)+EndArg; try { sendBuffer(buffer); } catch (IvyException ie ) { @@ -293,24 +368,33 @@ public class IvyClient implements Runnable { return s; } - private void newParseMsg(String s) throws IvyException { + private boolean newParseMsg(String s) throws IvyException { byte[] b = s.getBytes(); int from=0,to=0,msgType; Integer msgId; while ((to<b.length)&&(b[to]!=' ')) to++; - if (to>=b.length) throw new IvyException("protocol error"); + // return false au lieu de throw + if (to>=b.length) { + System.out.println("protocol error from "+appName); + return false; + } try { msgType = Integer.parseInt(s.substring(from,to)); } catch (NumberFormatException nfe) { - throw new IvyException("protocol error on msgType"); + System.out.println("protocol error on msgType from "+appName); + return false; } from=to+1; while ((to<b.length)&&(b[to]!=2)) to++; - if (to>=b.length) throw new IvyException("protocol error"); + if (to>=b.length) { + System.out.println("protocol error from "+appName); + return false; + } try { msgId = new Integer(s.substring(from,to)); } catch (NumberFormatException nfe) { - throw new IvyException("protocol error on identifier"); + System.out.println("protocol error from "+appName+" "+s.substring(from,to)+" is not a number"); + return false; } from=to+1; switch (msgType) { @@ -319,7 +403,8 @@ public class IvyClient implements Runnable { // first, I'm not a first class IvyClient any more bus.removeClient(this); // invokes the die applicationListeners - bus.dieReceived(this,msgId.intValue()); + String message=s.substring(from,b.length); + bus.dieReceived(this,msgId.intValue(),message); // makes the bus die bus.stop(); try { @@ -334,7 +419,8 @@ public class IvyClient implements Runnable { // first, I'm not a first class IvyClient any more bus.removeClient(this); // invokes the die applicationListeners - bus.disconnectReceived(this); + if (!discCallbackPerformed) bus.clientDisconnects(this); + discCallbackPerformed=true; try { close(false); } catch (IOException ioe) { @@ -343,24 +429,28 @@ public class IvyClient implements Runnable { break; case AddRegexp: String regexp=s.substring(from,b.length); - if ( bus.CheckRegexp(regexp) ) { + if ( CheckRegexp(regexp) ) { try { - regexp_in.put(msgId,new RE(regexp)); - regexp_text.put(msgId,regexp); + regexps.put(msgId,new RE(regexp)); + regexpsText.put(msgId,regexp); + bus.regexpReceived(this,msgId.intValue(),regexp); } catch (REException e) { - throw new IvyException("regexp error " +e.getMessage()); + // the remote client sent an invalid regexp ! + System.out.println("invalid regexp sent by " +appName+" ("+regexp+"), I will ignore this regexp"); + return true; } } else { throw new IvyException("regexp Warning exp='"+regexp+"' can't match removing from "+appName); } break; case DelRegexp: - regexp_in.remove(msgId); - regexp_text.remove(msgId); + regexps.remove(msgId); + String text=(String)regexpsText.remove(msgId); + bus.regexpDeleted(this,msgId.intValue(),text); break; case EndRegexp: - bus.connect(this); - /* + bus.clientConnects(this); + /* TODO check with the protocol itself. * the peer is perhaps not ready to handle this message * an assymetric processing should be written */ @@ -371,7 +461,7 @@ public class IvyClient implements Runnable { while (to<b.length) { while ( (to<b.length) && (b[to]!=3) ) to++; if (to<b.length) { - v.addElement(s.substring(from,to)); + v.addElement(decode(s.substring(from,to))); to++; from=to; } @@ -382,7 +472,7 @@ public class IvyClient implements Runnable { // for developpemnt purposes // System.out.println(" *"+tab[i]+"* "+(tab[i]).length()); } - bus.callCallback(this,msgId,tab); + bus.selfIvyClient.callCallback(this,msgId,tab); break; case Pong: String paramPong=s.substring(from,b.length); @@ -398,7 +488,7 @@ public class IvyClient implements Runnable { String error=s.substring(from,b.length); traceDebug("Error msg "+msgId+" "+error); break; - case SchizoToken: + case SchizoToken: // aka BeginRegexp in other implementations appName=s.substring(from,b.length); appPort=msgId.intValue(); if ( bus.checkConnected(this) ) { @@ -415,25 +505,19 @@ public class IvyClient implements Runnable { bus.directMessage( this, msgId.intValue(), direct ); break; default: - throw new IvyException("protocol error, unknown message type "+msgType); + System.out.println("protocol error from "+appName+", unknown message type "+msgType); + return false; } + return true; } - void sendPong(String s) {send(Pong,0,s);} - void sendPing(String s) {send(Ping,0,s);} - + protected void sendPong(String s) {send(Pong,0,s);} + protected void sendPing(String s) {send(Ping,0,s);} private void sendBye() {send(Bye,0,"");} private void sendBye(String message) {send(Bye,0,message);} - - public void sendDie() { send(Die,0,""); } - public void sendDie(String message) {send(Die,0,message);} private InetAddress getRemoteAddress() { return socket.getInetAddress(); } - public String toString() { - return "IvyClient "+bus.appName+":"+appName; - } - private void traceDebug(String s){ if (debug) System.out.println("-->IvyClient "+bus.appName+":"+appName+"<-- "+s); } @@ -455,4 +539,19 @@ public class IvyClient implements Runnable { public void stopPinging() { isPinging=false; pingerThread.interrupt();} } + // a class to perform the threaded execution of each new message + // this is an experimental feature introduced in 1.2.4 + class Sender implements Runnable { + String message; + private Thread t; + public Sender(String message) { + this.message=message; + t=new Thread(Sender.this); + bus.registerThread(t); + t.start(); + bus.unRegisterThread(t); + } + public void run() { sendMsg(message); } + } // class Sender + } |