/** * A private Class for the the peers on the bus. * * @author Yannick Jestin * @author http://www.tls.cena.fr/products/ivy/ * * each time a connexion is made with a remote peer, the regexp are exchanged * once ready, a ready message is sent, and then we can send messages, * die messages, direct messages, add or remove regexps, or quit. A thread is * created for each remote client. * * CHANGELOG: * 1.2.4: * - sendBuffer goes synchronized * - sendMsg now has a async parameter, allowing the use of threads to * delegate the sending of messages * - API change, IvyException raised when \n or \0x3 are present in bus.sendMsg() * - breaks the connexion with faulty Ivy clients (either protocol or invalid * regexps, closes bug J007 (CM)) * - sendDie now always requires a reason * - invokes the disconnect applicationListeners at the end of the run() * loop. * closes Bug J006 (YJ) * - changed the access of some functions ( sendRegexp, etc ) to protected * 1.2.3: * - silently stops on InterruptedIOException. * - direct Messages * - deals with early stops during readline * 1.2.2: * - cleared a bug causing the CPU to be eating when a remote client left the * bus. closes Damien Figarol bug reported on december, 2002. It is handled * in the readline() thread * 1.2.1: * - removes a NullPointerException when stops pinging on a pinger that * wasn't even started * 1.0.12: * - introducing a Ping and Pong in the protocol, in order to detect the loss of * connection faster. Enabled through the use of -DIVY_PING variable only * the PINGTIMEOUT value in milliseconds allows me to have a status of the * socket guaranteed after this timeout * - right handling of IOExceptions in sendBuffer, the Client is removed from * the bus * - sendDie goes public, so does sendDie(String) * - appName visibility changed from private to protected * 1.0.10: * - removed the timeout bug eating all the CPU resources */ package fr.dgac.ivy ; import java.lang.Thread; import java.net.*; import java.io.*; import java.util.*; import gnu.regexp.*; public class IvyClient implements Runnable { /* the protocol magic numbers */ final static int Bye = 0; /* end of the peer */ final static int AddRegexp = 1;/* the peer adds a regexp */ final static int Msg = 2 ; /* the peer sends a message */ final static int Error = 3; /* error message */ final static int DelRegexp = 4;/* the peer removes one of his regex */ final static int EndRegexp = 5;/* no more regexp in the handshake */ final static int SchizoToken = 6; /* avoid race condition in concurrent connexions */ /* SchizoToken is aka BeginRegexp in other implementations */ final static int DirectMsg = 7;/* the peer sends a direct message */ final static int Die = 8; /* the peer wants us to quit */ final static int Ping = 9; /* checks the presence of the other */ final static int Pong = 10; /* checks the presence of the other */ final static String MESSAGE_TERMINATOR = "\n"; /* the next protocol will use \r */ final static String StartArg = "\u0002";/* begin of arguments */ final static String EndArg = "\u0003"; /* end of arguments */ final static String escape ="\u001A"; final static char escapeChar = escape.charAt(0); final static char endArgChar = EndArg.charAt(0); final static char newLineChar = '\n'; // private variables private String messages_classes[] = null; private Ivy bus; private Socket socket; private BufferedReader in; private OutputStream out; private int appPort; private boolean peerCalling; private volatile Thread clientThread;// volatile to ensure the quick communication private Integer clientKey ; private static boolean doping = (System.getProperty("IVY_PING")!=null) ; private final static int PINGTIMEOUT = 5000; private PINGER pinger; private volatile Thread pingerThread; private boolean discCallbackPerformed = false; // protected variables String appName; Hashtable regexps = new Hashtable(); Hashtable regexpsText = new Hashtable(); static boolean debug = (System.getProperty("IVY_DEBUG")!=null) ; int protocol; IvyClient(){} IvyClient(Ivy bus, Socket socket,boolean peerCalling,Integer clientKey,int protocol) throws IOException { appName = "Unknown"; appPort = 0; this.bus = bus; this.socket = socket; this.peerCalling=peerCalling; this.clientKey=clientKey; this.protocol=protocol; in = new BufferedReader(new InputStreamReader(socket.getInputStream())); out = socket.getOutputStream(); Hashtable tosend=bus.selfIvyClient.regexpsText; // sends our ID, whether we initiated the connexion or not // the ID is the couple "host name,application Port", the host name // information is in the socket itself, the port is not known if we // initiate the connexion send(SchizoToken,bus.applicationPort,bus.appName); // sends our regexps to the peer for (Enumeration e = tosend.keys(); e.hasMoreElements(); ) { Integer ikey = (Integer)e.nextElement(); sendRegexp(ikey.intValue(),(String)tosend.get(ikey)); } send( EndRegexp,0,""); // spawns a thread to manage the incoming traffic on this // socket. We should be ready to receive messages now. clientThread = new Thread(this); clientThread .start(); if (doping) { pinger = new PINGER(); pingerThread=new Thread(pinger); pingerThread.start(); } } public String toString() { return "IvyClient "+bus.appName+":"+appName; } /** * returns the name of the remote agent. */ public String getApplicationName() { return appName ; } /** * allow an Ivy package class to access the list of regexps at a * given time. * perhaps we should implement a new IvyApplicationListener method to * allow the notification of regexp addition and deletion * The content is not modifyable because String are not mutable, and cannot * be modified once they are create. * @see getRegexpsArray to get a String[] result */ public Enumeration getRegexps() { return regexpsText.elements(); } /** * allow an Ivy package class to access the list of regexps at a * given time. * @since 1.2.4 */ public String[] getRegexpsArray() { String[] s = new String[regexpsText.size()]; int i=0; for (Enumeration e=getRegexps();e.hasMoreElements();) s[i++]=(String)e.nextElement(); return s; } /** * sends a direct message to the peer * @param id the numeric value provided to the remote client * @param message the string that will be match-tested */ public void sendDirectMsg(int id,String message) throws IvyException { if ( (message.indexOf(IvyClient.newLineChar)!=-1)||(message.indexOf(IvyClient.endArgChar)!=-1)) throw new IvyException("newline character not allowed in Ivy messages"); send(DirectMsg,id,message); } /* closes the connexion to the peer */ protected void close(boolean notify) throws IOException { bus.waitForAll(); traceDebug("closing connexion to "+appName); if (doping&&(pinger!=null)) { pinger.stopPinging(); } if (notify) sendBye("hasta la vista"); stopListening(); socket.close(); } /** * asks the remote client to leave the bus. * @param message the message that will be carried */ public void sendDie(String message) { send(Die,0,message); } /** * checks the "validity" of a regular expression. * @param exp the string being a regular expression * @return true if the regexp is valid * @since 1.2.4 */ public boolean CheckRegexp( String exp ) { boolean ok = true; if ( exp.startsWith( "^" )&&messages_classes!=null) { ok=false; for (int i=0 ; i < messages_classes.length;i++) { if (messages_classes[i].equals(exp.substring(1))) return true; } } return ok; } /////////////////////////////////////////////////// // // PROTECTED METHODS // /////////////////////////////////////////////////// static String decode(String s) { return s.replace(escapeChar,'\n'); } static String encode(String s) { return s.replace('\n',escapeChar); } Integer getClientKey() { return clientKey ; } int getAppPort() { return appPort ; } void sendRegexp(int id,String regexp) {send(AddRegexp,id,regexp);} void delRegexp(int id) {send( DelRegexp,id,"");} int sendMsg(String message,boolean async) { if (async) { new Sender(message); return -1; } else { return sendMsg(message); } } private int sendMsg(String message) { int count = 0; for (Enumeration e = regexps.keys();e.hasMoreElements();) { Integer key = (Integer)e.nextElement(); RE regexp = (RE)regexps.get(key); int nb = regexp.getNumSubs(); REMatch result = regexp.getMatch(message); if (result==null) continue; // no match count++; // match send(Msg,key,regexp.getNumSubs(),result); } return count; } /////////////////////////////////////////////////// // // PRIVATE METHODS // /////////////////////////////////////////////////// /* interrupt the listener thread */ private void stopListening() { Thread t = clientThread; if (t==null) return; // we can be summoned to quit from two path at a time clientThread=null; t.interrupt(); } /* * compares two peers the id is the couple (host,service port). * true if the peers are similir. This should not happen, it is bad */ protected boolean sameClient( IvyClient clnt ) { return ( appPort != 0 && appPort == clnt.appPort ) && ( getRemoteAddress() == clnt.getRemoteAddress() ) ; } /* * the code of the thread handling the incoming messages. */ public void run() { Thread thisThread = Thread.currentThread(); String msg = null; try { traceDebug("Connected from "+ socket.getInetAddress().getHostName()+ ":"+socket.getPort()); } catch (Exception ie) { traceDebug("Interrupted while resolving remote hostname"); } traceDebug("Thread started"); while (clientThread==thisThread) { try { if ((msg=in.readLine()) != null ) { if (clientThread!=thisThread) break; // early stop during readLine() if (doping && (pingerThread!=null)) pingerThread.interrupt(); if (!newParseMsg(msg)) { close(true); break; } } else { traceDebug("readline null ! leaving the thead"); break; } } catch (IvyException ie) { ie.printStackTrace(); } catch (InterruptedIOException ioe) { traceDebug("I have been interrupted. I'm about to leave my thread loop"); if (thisThread!=clientThread) break; } catch (IOException e) { traceDebug("abnormally Disconnected from "+ socket.getInetAddress().getHostName()+":"+socket.getPort()); break; } } traceDebug("normally Disconnected from "+ appName); traceDebug("Thread stopped"); // invokes the disconnect applicationListeners if (!discCallbackPerformed) bus.clientDisconnects(this); discCallbackPerformed=true; } private synchronized void sendBuffer( String buffer ) throws IvyException { buffer += "\n"; try { out.write(buffer.getBytes() ); out.flush(); } catch ( IOException e ) { traceDebug("I can't send my message to this client. He probably left"); // first, I'm not a first class IvyClient any more bus.removeClient(this); // invokes the disconnect applicationListeners if (!discCallbackPerformed) bus.clientDisconnects(this); discCallbackPerformed=true; try { close(false); } catch (IOException ioe) { throw new IvyException("close failed"+ioe.getMessage()); } } } private void send(int type, int id, String arg) { try { sendBuffer(type+" "+id+StartArg+arg); } catch (IvyException ie ) { System.err.println("received an exception: " + ie.getMessage()); ie.printStackTrace(); } } private void send(int type, Integer id, int nbsub, REMatch result) { String buffer = type+" "+id+StartArg; for(int sub = 1; sub <= nbsub; sub++) if (result.getStartIndex(sub) > -1) buffer += result.toString(sub)+EndArg; try { sendBuffer(buffer); } catch (IvyException ie ) { System.err.println("received an exception: " + ie.getMessage()); ie.printStackTrace(); } } private String dumpHex(String s) { byte[] b = s.getBytes(); String out = ""; String zu = "\t"; for (int i=0;i15) ? c : 'X')+" "; } out += zu; return out; } private String dumpMsg(String s) { String deb = " \""+s+"\" "+s.length()+" cars, "; for (int i=0;i=b.length) { System.out.println("protocol error from "+appName); return false; } try { msgType = Integer.parseInt(s.substring(from,to)); } catch (NumberFormatException nfe) { System.out.println("protocol error on msgType from "+appName); return false; } from=to+1; while ((to=b.length) { System.out.println("protocol error from "+appName); return false; } try { msgId = new Integer(s.substring(from,to)); } catch (NumberFormatException nfe) { System.out.println("protocol error from "+appName+" "+s.substring(from,to)+" is not a number"); return false; } from=to+1; switch (msgType) { case Die: traceDebug("received die Message from " + appName); // first, I'm not a first class IvyClient any more bus.removeClient(this); // invokes the die applicationListeners String message=s.substring(from,b.length); bus.dieReceived(this,msgId.intValue(),message); // makes the bus die bus.stop(); try { close(false); } catch (IOException ioe) { throw new IvyException(ioe.getMessage()); } break; case Bye: // the peer quits traceDebug("received bye Message from "+appName); // first, I'm not a first class IvyClient any more bus.removeClient(this); // invokes the die applicationListeners if (!discCallbackPerformed) bus.clientDisconnects(this); discCallbackPerformed=true; try { close(false); } catch (IOException ioe) { throw new IvyException(ioe.getMessage()); } break; case AddRegexp: String regexp=s.substring(from,b.length); if ( CheckRegexp(regexp) ) { try { regexps.put(msgId,new RE(regexp)); regexpsText.put(msgId,regexp); bus.regexpReceived(this,msgId.intValue(),regexp); } catch (REException e) { // the remote client sent an invalid regexp ! System.out.println("invalid regexp sent by " +appName+" ("+regexp+"), I will ignore this regexp"); return true; } } else { throw new IvyException("regexp Warning exp='"+regexp+"' can't match removing from "+appName); } break; case DelRegexp: regexps.remove(msgId); String text=(String)regexpsText.remove(msgId); bus.regexpDeleted(this,msgId.intValue(),text); break; case EndRegexp: bus.clientConnects(this); /* TODO check with the protocol itself. * the peer is perhaps not ready to handle this message * an assymetric processing should be written */ if (bus.ready_message!=null) sendMsg(bus.ready_message); break; case Msg: Vector v = new Vector(); while (toIvyClient "+bus.appName+":"+appName+"<-- "+s); } class PINGER implements Runnable { boolean isPinging = false; public void run() { isPinging=true; traceDebug("Pinger Thread started"); while (isPinging) { try { Thread.sleep(PINGTIMEOUT); sendPing("are you here ?"); } catch (InterruptedException ie) { } } traceDebug("Pinger Thread stopped"); } public void stopPinging() { isPinging=false; pingerThread.interrupt();} } // a class to perform the threaded execution of each new message // this is an experimental feature introduced in 1.2.4 class Sender implements Runnable { String message; private Thread t; public Sender(String message) { this.message=message; t=new Thread(Sender.this); bus.registerThread(t); t.start(); bus.unRegisterThread(t); } public void run() { sendMsg(message); } } // class Sender }