diff options
-rw-r--r-- | doc/ivy-java.1 | 9 | ||||
-rw-r--r-- | examples/TestIvySwing.java | 1 | ||||
-rwxr-xr-x | src/Ivy.java | 227 | ||||
-rwxr-xr-x | src/IvyApplicationAdapter.java | 5 | ||||
-rwxr-xr-x | src/IvyClient.java | 188 | ||||
-rw-r--r-- | src/IvyDaemon.java | 2 | ||||
-rwxr-xr-x | src/IvyException.java | 4 | ||||
-rwxr-xr-x | src/IvyWatcher.java | 203 | ||||
-rw-r--r-- | src/Makefile | 2 | ||||
-rw-r--r-- | src/Probe.java | 130 | ||||
-rw-r--r-- | tests/Makefile | 35 |
11 files changed, 528 insertions, 278 deletions
diff --git a/doc/ivy-java.1 b/doc/ivy-java.1 index 76b8a94..896c898 100644 --- a/doc/ivy-java.1 +++ b/doc/ivy-java.1 @@ -62,13 +62,13 @@ changed on this point since 1.3 . You can still use it on a previous JDK. .nf .SH "FILES" -.I /usr/share/java/repository/fr/dgac.ivy/*.class +.I /usr/share/java/repository/fr/dgac/ivy/*.class .nf .I /usr/share/doc/ivy\-java/ .SH "EXAMPLES" .nf -java \-DIVYBUS=143.196.53.255:2011 fr.dgac.ivy.Probe +java \-DIVYBUS=143.196.53.255:2011 fr.dgac.ivy.Probe -help .nf java \-DIVYBUS=127.0.0.1:3042,10.192.36:2021,228.5.6.7:54321 fr.dgac.ivy.Probe '^coucou (.*)' .nf @@ -81,7 +81,7 @@ java fr.dgac.ivy.Probe \-b 10.192.36:2021 '^coucou (.*)' .nf .I libgnu\-getopt\-java version 1.0.9 .nf -.I for TestIvySwing, swing if it is not bundled with your jdk1.1 +.I If you want to run the TestIvySwing example with a 1.1 VM, you will need a swing jar .SH "BUGS" So far, there are problems with old java VMs ( 1.1.7A ) @@ -103,7 +103,6 @@ ivyprobe (1) For further details, please refer to the Ivy html page at http://www.tls.cena.fr/products/ivy/ .SH "NOTES" -In case of any comment or bug report on this library, please contact -jestin@cena.fr +In case of any comment or bug report on this library, please contact jestin@cena.fr .nf Special thanks to Michelle Jacomi (jacomi@cena.fr) for her kind support diff --git a/examples/TestIvySwing.java b/examples/TestIvySwing.java index 07b226e..1d5a25a 100644 --- a/examples/TestIvySwing.java +++ b/examples/TestIvySwing.java @@ -223,6 +223,7 @@ class TestIvySwing extends JPanel implements IvyApplicationListener { f.dispose(); // I leave when the last TestIvySwing exits if (--nbTIS == 0) System.exit(0); + System.out.println("closed"); } public void windowActivated(WindowEvent e) {tfSend.grabFocus();} } diff --git a/src/Ivy.java b/src/Ivy.java index 08f3c6c..15926bc 100755 --- a/src/Ivy.java +++ b/src/Ivy.java @@ -10,8 +10,7 @@ package fr.dgac.ivy ; import java.net.*; import java.io.*; -import java.util.Vector; -import java.util.Hashtable; +import java.util.*; /** * A class connecting to the Ivy software bus. @@ -24,15 +23,22 @@ import java.util.Hashtable; * * CHANGELOG: * 1.0.12: + * - setSoTimeout is back on the server socket + * - added a regression test main() + * - clients is now a Hashtable. the deletion now works better + * - getIvyClientsByName allows the research of IvyClient by name + * - getDomain doesnt throw IvyException anymore * - removed the close() disconnect(IvyClient c). Fixes a big badaboum bug * - getDomain becomes public + * - adding the sendToSelf feature + * - fixed the printStackTrace upon closing of the ServerSocket after a close() */ -public class Ivy implements Runnable, IvyApplicationListener { +public class Ivy implements Runnable { /** * the name of the application on the bus */ - public String appName; + String appName; /** * the protocol version number */ @@ -49,22 +55,25 @@ public class Ivy implements Runnable, IvyApplicationListener { * the library version, useful for development purposes only, when java is * invoked with -DIVY_DEBUG */ - public static final String libVersion ="1.0.12"; + public static final String libVersion ="1.2.0"; private boolean debug; - private static int serial=0; /* an unique ID for each regexp */ + private static int serial=0; /* an unique ID for each regexp */ + private static int clientSerial=0; /* an unique ID for each IvyClient */ private ServerSocket app; - private IvyWatcher watch; - private Thread server; + private Vector watchers; + private volatile Thread serverThread; // to ensure quick communication of the end private Hashtable callbacks = new Hashtable(); - private Vector clients = new Vector(); + private Hashtable clients = new Hashtable(); private Vector ivyApplicationListenerList = new Vector(); private String messages_classes[] = null; - + private boolean sendToSelf = false ; + private boolean stopped = false; int applicationPort; /* Application port number */ - boolean ivyRunning = false; Hashtable regexp_out = new Hashtable(); String ready_message = null; + + public final static int TIMEOUTLENGTH = 3000; /** * Readies the structures for the software bus connexion. @@ -86,8 +95,8 @@ public class Ivy implements Runnable, IvyApplicationListener { /** * connects the Ivy bus to a domain or list of domains. * - * <li>One thread (IvyWatcher) to watch rendezvous traffic (UDP or TCPMulticast) - * <li>One thread (server/Ivy) to accept incoming connexions on server socket + * <li>One thread (IvyWatcher) for each traffic rendezvous (either UDP broadcast or TCPMulticast) + * <li>One thread (serverThread/Ivy) to accept incoming connexions on server socket * <li>a thread for each IvyClient when the connexion has been done * * @param domainbus a domain of the form 10.0.0:1234, it is similar to the @@ -100,41 +109,63 @@ public class Ivy implements Runnable, IvyApplicationListener { public void start(String domainbus) throws IvyException { try { app = new ServerSocket(0); + app.setSoTimeout(TIMEOUTLENGTH); applicationPort = app.getLocalPort(); } catch (IOException e) { throw new IvyException("can't open TCP service socket " + e ); } traceDebug("lib: "+libVersion+" protocol: "+PROCOCOLVERSION+" TCP service open on port "+applicationPort); - watch = new IvyWatcher(this); - ivyRunning = true; - server = new Thread(this); - server.start(); - watch.start(getDomain(domainbus)); + watchers = new Vector(); + + // readies the rendezvous : an IvyWatcher (thread) per domain bus + StringTokenizer st = new StringTokenizer(domainbus,","); + while ( st.hasMoreTokens()) { + String s = st.nextToken() ; + String domainaddr=IvyWatcher.getDomain(s); + int port=IvyWatcher.getPort(s); + IvyWatcher watcher =new IvyWatcher(this,domainaddr,port); + watchers.addElement(watcher); + } + serverThread = new Thread(this); + serverThread.start(); + // sends the broadcasts and listen to incoming connexions + for (int i=0;i<watchers.size();i++){ ((IvyWatcher)watchers.elementAt(i)).start(); } } /** - * disconnects from the Ivy bus. + * disconnects from the Ivy bus */ public void stop() { - if (!ivyRunning) { - traceDebug("was already stropped ..."); - return; - } + if (stopped ) return; + traceDebug("beginning stopping the bus"); try { - ivyRunning = false; + // stopping the serverThread + Thread t=serverThread; + serverThread=null; + t.interrupt(); app.close(); - watch.stop(); - for (int i=0;i<clients.size();i++) { - IvyClient client = (IvyClient)clients.elementAt(i); - client.close("normal Ivy Stopping..."); + // stopping the IvyWatchers + for (int i=0;i<watchers.size();i++){ ((IvyWatcher)watchers.elementAt(i)).stop(); } + // stopping the remaining IvyClients + for (Enumeration e=clients.elements();e.hasMoreElements();) { + IvyClient c = (IvyClient)e.nextElement(); + c.close(true); + removeClient(c); } } catch (IOException e) { traceDebug("IOexception Stop "); } - clients.removeAllElements(); + traceDebug("the bus should have stopped so far"); + stopped = true; } /** + * Toggles the sending of messages to oneself + * + */ + public void sendToSelf(boolean b) {sendToSelf=b;} + + /** * Performs a pattern matching according to everyone's regexps, and sends * the results to the relevant ivy agents. * <p><em>There is one thread for each client connected, we could also @@ -147,10 +178,13 @@ public class Ivy implements Runnable, IvyApplicationListener { int count = 0; // an alternate implementation would one sender thread per client // instead of one for all the clients. It might be a performance issue - for ( int i = 0 ; i < clients.size(); i++ ) { - IvyClient client = (IvyClient)clients.elementAt(i); + for ( Enumeration e=clients.elements();e.hasMoreElements();) { + IvyClient client = (IvyClient)e.nextElement(); count += client.sendMsg( message ); } + if (sendToSelf) { + // TODO + } return count; } @@ -176,8 +210,9 @@ public class Ivy implements Runnable, IvyApplicationListener { regexp_out.put(key,regexp); callbacks.put(key,callback ); // notifies the other clients this new regexp - for (int i=0;i<clients.size();i++){ - ((IvyClient)clients.elementAt(i)).sendRegexp(key.intValue(),regexp); + for (Enumeration e=clients.elements();e.hasMoreElements();) { + IvyClient c = (IvyClient)e.nextElement(); + c.sendRegexp(key.intValue(),regexp); } return key.intValue(); } @@ -193,8 +228,8 @@ public class Ivy implements Runnable, IvyApplicationListener { || (callbacks.remove(key) == null ) ) { throw new IvyException("client wants to remove an unexistant regexp "+id); } - for (int i=0;i<clients.size();i++ ) { - ((IvyClient)clients.elementAt(i)).delRegexp(id ); + for (Enumeration e=clients.elements();e.hasMoreElements();) { + ((IvyClient)e.nextElement()).delRegexp(id ); } } @@ -219,44 +254,67 @@ public class Ivy implements Runnable, IvyApplicationListener { ivyApplicationListenerList.removeElementAt(id); } - /* invokes the application listeners upon arrival of a new Ivy client - * it *might* be considered poor style to invoke them as the same level - * as the others applicationListeners. This is part of the interface - */ + /* invokes the application listeners upon arrival of a new Ivy client */ public void connect(IvyClient client){ for ( int i = 0 ; i < ivyApplicationListenerList.size(); i++ ) { ((IvyApplicationListener)ivyApplicationListenerList.elementAt(i)).connect(client); } } - /* invokes the application listeners upon departure of an Ivy client - * ibid. - */ - public void disconnect(IvyClient client){ + /* invokes the application listeners upon arrival of a new Ivy client */ + void disconnectReceived(IvyClient client){ for ( int i = 0 ; i < ivyApplicationListenerList.size(); i++ ) { ((IvyApplicationListener)ivyApplicationListenerList.elementAt(i)).disconnect(client); } } - /* invokes the application listeners upon death of an Ivy client - * ibid + /* + * removes a client from the list */ - public void die(IvyClient client, int id){ - for ( int i = 0 ; i < ivyApplicationListenerList.size(); i++ ) { - ((IvyApplicationListener)ivyApplicationListenerList.elementAt(i)).die(client, id); + void removeClient(IvyClient c) { clients.remove(c.getClientKey()); } + + /** + * invokes the application listeners when we are summoned to die + * then stops + */ + public void dieReceived(IvyClient client, int id){ + for ( int i=0 ;i<ivyApplicationListenerList.size();i++ ) { + ((IvyApplicationListener)ivyApplicationListenerList.elementAt(i)).die(client,id); } - stop(); } - /* invokes the direct message callbacks - * ibid - */ + /* invokes the direct message callbacks */ public void directMessage( IvyClient client, int id,String msgarg ){ for ( int i = 0 ; i < ivyApplicationListenerList.size(); i++ ) { ((IvyApplicationListener)ivyApplicationListenerList.elementAt(i)).directMessage(client,id, msgarg); } } + /** + * gives the names of IvyClient(s) + */ + public Vector getIvyClients() { + Vector v=new Vector(); + for (Enumeration e=clients.elements();e.hasMoreElements();) { + v.add(e.nextElement()); + } + return v; + } + + /** + * gives a list of IvyClient(s) with the name given in parameter + * + * @param name The name of the Ivy agent you're looking for + */ + public Vector getIvyClientsByName(String name) { + Vector v=new Vector(); + for (Enumeration e=clients.elements();e.hasMoreElements();) { + IvyClient ic = (IvyClient)e.nextElement(); + if ( ((ic.getApplicationName()).compareTo(name))==0 ) v.addElement(ic); + } + return v; + } + /////////////////////////////////////////////////////////////////: // // Protected methods @@ -264,12 +322,10 @@ public class Ivy implements Runnable, IvyApplicationListener { /////////////////////////////////////////////////////////////////: void addClient(Socket socket,boolean peerCalling) throws IOException { - IvyClient client = new IvyClient(this, socket,peerCalling); - clients.addElement(client); - } - - void removeClient( IvyClient client ) { - clients.removeElement( client ); + IvyClient client = new IvyClient( + this, socket,peerCalling,new Integer(clientSerial++)); + clients.put(client.getClientKey(),client); + traceDebug(getClientNames()); } void callCallback(IvyClient client, Integer key, String[] tab) throws IvyException { @@ -300,8 +356,7 @@ public class Ivy implements Runnable, IvyApplicationListener { return tab; } - - public static String getDomain(String domainbus) throws IvyException { + public static String getDomain(String domainbus) { if ( domainbus == null ) domainbus = System.getProperty("IVYBUS"); if ( domainbus == null ) domainbus = DEFAULT_DOMAIN; return domainbus; @@ -329,39 +384,59 @@ public class Ivy implements Runnable, IvyApplicationListener { */ boolean checkConnected( IvyClient clnt ) { if ( clnt.getAppPort() == 0 ) return false; - for ( int i = 0 ; i < clients.size(); i++ ) { - IvyClient client = (IvyClient)clients.elementAt(i); + for (Enumeration e=clients.elements();e.hasMoreElements();) { + IvyClient client = (IvyClient)e.nextElement(); if ( clnt != client && client.sameClient( clnt ) ) return true; } return false; } /* - * the service socket thread reader. + * the service socket thread reader main loop */ public void run() { - // traceDebug("IvyServer beginning transmission"); - while(ivyRunning){ + Thread thisThread=Thread.currentThread(); + while(thisThread==serverThread){ try { Socket socket = app.accept(); addClient(socket,true); // the peer called me + } catch (InterruptedIOException ie) { + if (thisThread!=serverThread) break; } catch( IOException e ) { - traceDebug("Error IvyServer exception: " + e.getMessage()); - System.out.println("DEBUG TCP socket reader caught an exception " + e.getMessage()); - e.printStackTrace(); + if (serverThread==thisThread) { + traceDebug("Error IvyServer exception: " + e.getMessage()); + System.out.println("Ivy server socket reader caught an exception " + e.getMessage()); + e.printStackTrace(); + } else { traceDebug("my server socket has been closed"); } } } - // traceDebug("IvyServer end of transmission"); + traceDebug("stopping the server Thread"); } - /////////////////////////////////////////////////////////////////: - // - // Private methods - // - /////////////////////////////////////////////////////////////////: - private void traceDebug(String s){ - if (debug) System.out.println("-->ivy<-- "+s); + private void traceDebug(String s){ if (debug) System.out.println("-->ivy<-- "+s); } + + /* a small private method for debbugging purposes */ + private String getClientNames() { + String s = appName+" clients are: "; + for (Enumeration e=clients.elements();e.hasMoreElements();){ + s+=((IvyClient)e.nextElement()).getApplicationName()+" "; + } + return s; + } + + /* + * unitary test. Normally, running java fr.dgac.ivy.Ivy should stop in 2.3 seconds :) + */ + public static void main(String[] args) { + Ivy bus = new Ivy("Test Unitaire","TU ready",null); + try { + bus.start(DEFAULT_DOMAIN); + try { Thread.sleep(2000); } catch (InterruptedException ie) { } + bus.stop(); + } catch (IvyException ie) { + ie.printStackTrace(); + } } } // class Ivy diff --git a/src/IvyApplicationAdapter.java b/src/IvyApplicationAdapter.java index af198dd..a07020f 100755 --- a/src/IvyApplicationAdapter.java +++ b/src/IvyApplicationAdapter.java @@ -13,11 +13,14 @@ package fr.dgac.ivy; * convenience for implementing a subset of the methods of the * applicationlistener. See the AWT 1.1 framework for further information on * this. + * + * changelog: + * 1.0.12: fixed a missing id in the parameters */ public abstract class IvyApplicationAdapter implements IvyApplicationListener { public void connect( IvyClient client ) { } public void disconnect( IvyClient client ) { } - public void die( IvyClient client ) { } + public void die( IvyClient client, int id ) { } public void directMessage( IvyClient client, int id,String msgarg ) {} } diff --git a/src/IvyClient.java b/src/IvyClient.java index 18a50f2..3857631 100755 --- a/src/IvyClient.java +++ b/src/IvyClient.java @@ -20,8 +20,14 @@ import gnu.regexp.*; * * CHANGELOG: * 1.0.12: + * - introducing a Ping and Pong in the protocol, in order to detect the loss of + * connection faster. Enabled through the use of -DIVY_PING variable only + * the PINGTIMEOUT value in milliseconds allows me to have a status of the + * socket guaranteed after this timeout * - right handling of IOExceptions in sendBuffer, the Client is removed from - * the bus + * the bus + * - sendDie goes public, so does sendDie(String) + * - appName visibility changed from private to protected * 1.0.10: * - removed the timeout bug eating all the CPU resources */ @@ -38,10 +44,14 @@ public class IvyClient implements Runnable { final static int SchizoToken = 6; /* avoid race condition in concurrent connexions */ final static int DirectMsg = 7;/* the peer sends a direct message */ final static int Die = 8; /* the peer wants us to quit */ + final static int Ping = 9; /* checks the presence of the other */ + final static int Pong = 10; /* checks the presence of the other */ + final static String MESSAGE_TERMINATOR = "\n"; /* the next protocol will use \r */ final static String StartArg = "\u0002";/* begin of arguments */ final static String EndArg = "\u0003"; /* end of arguments */ + private static boolean debug = (System.getProperty("IVY_DEBUG")!=null) ; private Ivy bus; private Socket socket; @@ -49,19 +59,25 @@ public class IvyClient implements Runnable { private OutputStream out; private Hashtable regexp_in = new Hashtable(); private Hashtable regexp_text = new Hashtable(); - private String appName; private int appPort; - private boolean gardefou=true; private boolean peerCalling; - private Thread client; + private volatile Thread clientThread;// volatile to ensure the quick communication + private Integer clientKey ; + private static boolean doping = (System.getProperty("IVY_PING")!=null) ; + final static int PINGTIMEOUT = 5000; + private PINGER pinger; + private volatile Thread pingerThread; + + // protected variables + String appName; - IvyClient(Ivy bus, Socket socket,boolean peerCalling) throws IOException { + IvyClient(Ivy bus, Socket socket,boolean peerCalling,Integer clientKey) throws IOException { appName = "Unknown"; appPort = 0; this.bus = bus; this.socket = socket; this.peerCalling=peerCalling; - // CHANGE: socket.setSoTimeout(100); + this.clientKey=clientKey; in = new BufferedReader(new InputStreamReader(socket.getInputStream())); out = socket.getOutputStream(); Hashtable regexps=bus.regexp_out; @@ -72,14 +88,19 @@ public class IvyClient implements Runnable { send(SchizoToken,bus.applicationPort,bus.appName); // sends our regexps to the peer for (Enumeration e = regexps.keys(); e.hasMoreElements(); ) { - Integer key = (Integer)e.nextElement(); - sendRegexp( key.intValue(),(String)regexps.get(key)); + Integer ikey = (Integer)e.nextElement(); + sendRegexp( ikey.intValue(),(String)regexps.get(ikey)); } send( EndRegexp,0,""); // spawns a thread to manage the incoming traffic on this // socket. We should be ready to receive messages now. - client= new Thread(this); - client.start(); + clientThread = new Thread(this); + clientThread .start(); + if (doping) { + pinger = new PINGER(); + pingerThread=new Thread(pinger); + pingerThread.start(); + } } /** @@ -87,6 +108,8 @@ public class IvyClient implements Runnable { */ public String getApplicationName() { return appName ; } + Integer getClientKey() { return clientKey ; } + /** * allow an Ivy package class to access the list of regexps at a * given time. @@ -98,7 +121,7 @@ public class IvyClient implements Runnable { int getAppPort() { return appPort ; } void sendRegexp(int id,String regexp) { - send(AddRegexp,id,regexp); /* perhaps we should perform some checking here */ + send(AddRegexp,id,regexp); /* perhaps we should perform some checking here */ } public void delRegexp(int id) {send( DelRegexp,id,"");} @@ -124,14 +147,23 @@ public class IvyClient implements Runnable { /** * closes the connexion to the peer. - * @param msg the debug information + * @param notify should I send Bye message ? * the thread managing the socket is stopped */ - void close(String msg) throws IOException { - traceDebug("(closing) "+msg); - gardefou=false; - client.interrupt(); - // socket.close(); // should I ? + void close(boolean notify) throws IOException { + traceDebug("closing connexion to "+appName); + if (doping) { pinger.stopPinging(); } + if (notify) sendBye("hasta la vista"); + stopListening(); + // bus.clientDisconnect(this); + socket.close(); // should I also close in and out ? + } + + void stopListening() { + Thread t = clientThread; + if (t==null) return; // we can be summoned to quit from two path at a time + clientThread=null; + t.interrupt(); } /** @@ -149,31 +181,27 @@ public class IvyClient implements Runnable { * the code of the thread handling the incoming messages. */ public void run() { + Thread thisThread = Thread.currentThread(); + traceDebug("Connected from "+ socket.getInetAddress().getHostName()+ ":"+socket.getPort()); String msg = null; - try { - traceDebug("Connected from "+ socket.getInetAddress().getHostName()+ ":"+socket.getPort()); - while ( gardefou ) { - try { - if ((msg=in.readLine()) != null ) { - newParseMsg(msg); - } - } catch (IvyException ie) { - ie.printStackTrace(); - } catch (InterruptedIOException ioe) { - System.out.println("I have been interrupted. I'm about to leave my thread loop"); - if (!gardefou) break; + while ( clientThread==thisThread ) { + try { + if ((msg=in.readLine()) != null ) { + if (doping && (pingerThread!=null)) pingerThread.interrupt(); + newParseMsg(msg); } + } catch (IvyException ie) { + ie.printStackTrace(); + } catch (InterruptedIOException ioe) { + System.out.println("I have been interrupted. I'm about to leave my thread loop"); + if (thisThread!=clientThread) break; + } catch (IOException e) { + if (clientThread!=thisThread) break; + traceDebug("abnormally Disconnected from "+ + socket.getInetAddress().getHostName()+":"+socket.getPort()); } - traceDebug("normally Disconnected from "+ socket.getInetAddress().getHostName()+":"+socket.getPort()); - socket.close(); - out.close(); - in.close(); - } catch (IOException e) { - traceDebug("abnormally Disconnected from "+ - socket.getInetAddress().getHostName()+":"+socket.getPort()); - } - bus.disconnect(this); - bus.removeClient(this); + } // while + traceDebug("normally Disconnected from "+ appName); } private void sendBuffer( String buffer ) throws IvyException { @@ -183,8 +211,12 @@ public class IvyClient implements Runnable { out.flush(); } catch ( IOException e ) { traceDebug("I can't send my message to this client. He probably left"); + // first, I'm not a first class IvyClient any more + bus.removeClient(this); + // invokes the die applicationListeners + bus.disconnectReceived(this); try { - close("IO Exception"); + close(false); } catch (IOException ioe) { throw new IvyException("close failed"+ioe.getMessage()); } @@ -258,9 +290,32 @@ public class IvyClient implements Runnable { } from=to+1; switch (msgType) { + case Die: + traceDebug("received die Message from " + appName); + // first, I'm not a first class IvyClient any more + bus.removeClient(this); + // invokes the die applicationListeners + bus.dieReceived(this,msgId.intValue()); + // makes the bus die + bus.stop(); + try { + close(false); + } catch (IOException ioe) { + throw new IvyException(ioe.getMessage()); + } + break; case Bye: - bus.die(this,msgId.intValue()); - gardefou=false; + // the peer quits + traceDebug("received bye Message from "+appName); + // first, I'm not a first class IvyClient any more + bus.removeClient(this); + // invokes the die applicationListeners + bus.disconnectReceived(this); + try { + close(false); + } catch (IOException ioe) { + throw new IvyException(ioe.getMessage()); + } break; case AddRegexp: String regexp=s.substring(from,b.length); @@ -305,6 +360,16 @@ public class IvyClient implements Runnable { } bus.callCallback(this,msgId,tab); break; + case Pong: + String paramPong=s.substring(from,b.length); + traceDebug("Ping msg from "+appName+" : "+paramPong); + break; + case Ping: + // I receive a ping. I can answer a pong. + String param=s.substring(from,b.length); + traceDebug("Ping msg from "+appName+" : "+param); + sendPong(param); + break; case Error: String error=s.substring(from,b.length); traceDebug("Error msg "+msgId+" "+error); @@ -314,7 +379,7 @@ public class IvyClient implements Runnable { appPort=msgId.intValue(); if ( bus.checkConnected(this) ) { try { - close("Quitting Application already connected"); + close(false); } catch (IOException ioe) { throw new IvyException("io " + ioe.getMessage()); } @@ -325,20 +390,43 @@ public class IvyClient implements Runnable { String direct=s.substring(from,b.length); bus.directMessage( this, msgId.intValue(), direct ); break; - case Die: - gardefou=false; - bus.die(this,msgId.intValue()); - break; default: throw new IvyException("protocol error, unknown message type "+msgType); } } - private void sendDie() {send(Die,0,"");} - private void sendDie(String message) {send(Die,0,message);} + void sendPong(String s) {send(Pong,0,s);} + void sendPing(String s) {send(Ping,0,s);} + + private void sendBye() {send(Bye,0,"");} + private void sendBye(String message) {send(Bye,0,message);} + + public void sendDie() { send(Die,0,""); } + public void sendDie(String message) {send(Die,0,message);} + private InetAddress getRemoteAddress() { return socket.getInetAddress(); } + + public String toString() { + return "IvyClient "+bus.appName+":"+appName; + } private void traceDebug(String s){ - if (debug) System.out.println("-->IvyClient "+appName+"<-- "+s); + if (debug) System.out.println("-->IvyClient "+bus.appName+":"+appName+"<-- "+s); + } + + class PINGER implements Runnable { + boolean isPinging = false; + public void run() { + isPinging=true; + while (isPinging) { + try { + Thread.sleep(PINGTIMEOUT); + sendPing("are you here ?"); + } catch (InterruptedException ie) { + } + } + } + public void stopPinging() { isPinging=false; pingerThread.interrupt();} } + } // class IvyClient /* EOF */ diff --git a/src/IvyDaemon.java b/src/IvyDaemon.java index db4623e..a0560f5 100644 --- a/src/IvyDaemon.java +++ b/src/IvyDaemon.java @@ -83,7 +83,7 @@ public class IvyDaemon implements Runnable { new SubReader( new BufferedReader(new InputStreamReader(socket.getInputStream()))); } catch( IOException e ) { - System.out.println("DEBUG TCP socket reader caught an exception " + e.getMessage()); + System.out.println("IvyDaemon DEBUG TCP socket reader caught an exception " + e.getMessage()); } } } diff --git a/src/IvyException.java b/src/IvyException.java index e467771..de739ad 100755 --- a/src/IvyException.java +++ b/src/IvyException.java @@ -7,8 +7,10 @@ package fr.dgac.ivy; * @author Yannick Jestin * @author <a href="http://www.tls.cena.fr/products/ivy/">http://www.tls.cena.fr/products/ivy/</a> * + * changelog: + * 1.0.12 changed default access constructor to public access */ public class IvyException extends Exception { - IvyException(String s) { super(s); } + public IvyException(String s) { super(s); } } diff --git a/src/IvyWatcher.java b/src/IvyWatcher.java index 14fee3d..004c704 100755 --- a/src/IvyWatcher.java +++ b/src/IvyWatcher.java @@ -22,6 +22,9 @@ import java.util.Enumeration; * thing. * * CHANGELOG: + * 1.0.12: + * - setSoTimeout on socket + * - the broadcast reader Thread goes volatile * 1.0.10: * - isInDomain() is wrong in multicast. I've removed it * - there was a remanence effect in the datagrampacket buffer. I clean it up after each message @@ -33,146 +36,122 @@ import java.util.Enumeration; class IvyWatcher implements Runnable { private static boolean debug = (System.getProperty("IVY_DEBUG")!=null); - //private Vector domainaddrList; - private boolean watcherrunning = false; private boolean isMulticastAddress = false; - private Vector broadcastListener ; private Ivy bus; /* master bus controler */ private DatagramSocket broadcast; /* supervision socket */ - // it can also be a MulticastSocket, which inherits from the previous + private String domainaddr; + private int port; + private volatile Thread listenThread; + private InetAddress group; + /** - * creates an Ivy watcher. + * creates an Ivy watcher * @param bus the bus + * @param net the domain */ - IvyWatcher(Ivy bus) throws IvyException { + IvyWatcher(Ivy bus,String domainaddr,int port) throws IvyException { this.bus = bus; - //domainaddrList = new Vector(); + this.domainaddr=domainaddr; + this.port=port; + listenThread = new Thread(this); + // create the MulticastSocket + try { + group = InetAddress.getByName(domainaddr); + broadcast = new MulticastSocket(port); + if (group.isMulticastAddress()) { + isMulticastAddress = true; + ((MulticastSocket)broadcast).joinGroup(group); + } + broadcast.setSoTimeout(Ivy.TIMEOUTLENGTH); + } catch ( IOException e ) { + throw new IvyException("IvyWatcher I/O error" + e ); + } } /** - * the behaviour of the thread watching the UDP socket. - * this thread will stop either when the bus stops or when the - * watcherrunning will be set to false - * - * TODO: better handling of exceptions, because we juste System.err.println - * here, run cannot throw IvyException ... + * the behaviour of each thread watching the UDP socket. */ - public void run() { + public void run() { + Thread thisThread=Thread.currentThread(); + traceDebug("beginning of a watcher Thread"); byte buf[] = new byte[256]; DatagramPacket packet=new DatagramPacket(buf, 256); - int port; - traceDebug("IvyWatcher waiting for Broadcast"); - while( watcherrunning && bus.ivyRunning ) try { - broadcast.receive(packet); - String msg = new String(packet.getData()) ; - // clean up the buffer after each message - for (int i=0;i<buf.length;i++) { buf[i]=0; } - InetAddress remotehost = packet.getAddress(); - traceDebug("BUSWATCHER Receive Broadcast from "+ - remotehost.getHostName()+":"+packet.getPort()); - // we used to check if remoteaddr is in our broadcast domain list otherwise we - // ignore the broadcast - // if ( !isInDomain( remotehost ) ) continue; - StringTokenizer st = new StringTokenizer(msg); - if ( !st.hasMoreTokens()) { - System.err.println("Bad format "+msg); - continue; - } - int version = Integer.parseInt( st.nextToken() ); - if ( version != bus.PROCOCOLVERSION ) { - System.err.println("Ignoring bad protocol version broadcast"); - continue; - } - if ( ! st.hasMoreTokens()) { - System.err.println("Bad format "+msg); - continue; - } - port = Integer.parseInt( st.nextToken() ); - if ( (bus.applicationPort == port) ) continue; - traceDebug("BUSWATCHER Broadcast de " - +packet.getAddress().getHostName() - +":"+packet.getPort()+" port "+port+" version "+version); + try { + while( listenThread==thisThread ) { + int port; try { - Socket socket = new Socket( remotehost, port ); - bus.addClient(socket,false); - } catch ( UnknownHostException e ) { - System.err.println("Unkonwn host "+remotehost + e.getMessage()); - } catch ( IOException e) { - System.err.println("can't connect to "+remotehost+" port "+ - port+e.getMessage()); + broadcast.receive(packet); + String msg = new String(packet.getData()) ; + for (int i=0;i<buf.length;i++) { buf[i]=0; } // clean up the buffer after each message + InetAddress remotehost = packet.getAddress(); + traceDebug("BUSWATCHER Receive Broadcast from "+ remotehost.getHostName()+":"+packet.getPort()); + // TODO if ( !isInDomain( remotehost ) ) continue; + // TODO get rid of the StringTokenizer ? + StringTokenizer st = new StringTokenizer(msg); + if ( !st.hasMoreTokens()) { + System.err.println("Bad format "+msg); + continue; + } + int version = Integer.parseInt( st.nextToken() ); + if ( version != bus.PROCOCOLVERSION ) { + System.err.println("Ignoring bad protocol version broadcast"); + continue; + } + if ( ! st.hasMoreTokens()) { + System.err.println("Bad format "+msg); + continue; + } + port = Integer.parseInt( st.nextToken() ); + if ( (bus.applicationPort == port) ) continue; + traceDebug("BUSWATCHER Broadcast de " + +packet.getAddress().getHostName() + +":"+packet.getPort()+" port "+port+" version "+version); + try { + Socket socket = new Socket( remotehost, port ); + bus.addClient(socket,false); + } catch ( UnknownHostException e ) { + System.err.println("Unkonwn host "+remotehost + e.getMessage()); + } catch ( IOException e) { + System.err.println("can't connect to "+remotehost+" port "+ + port+e.getMessage()); + } + } catch (InterruptedIOException jii ){ + if (thisThread!=listenThread) { break ;} } - } catch (java.io.InterruptedIOException jii ){ - if (!watcherrunning) break; - System.out.println("DEBUG IvyClient: I have been interrupted"); - } catch (java.io.IOException ioe ){ - System.err.println("IvyWatcher IOException "+ ioe.getMessage() ); + } // while + } catch (java.net.SocketException se ){ + if (thisThread==listenThread) { se.printStackTrace(); } + } catch (IOException ioe ){ + ioe.printStackTrace(); } - stop(); - } // while - + traceDebug("end of a watcher thread"); + } + /** * stops the thread waiting on the broadcast socket */ void stop() { - traceDebug("broadcast listener normal shutdown"); - watcherrunning=false; - for (Enumeration e = broadcastListener.elements();e.hasMoreElements();) { - Thread t = (Thread) e.nextElement(); - t.interrupt(); - } + traceDebug("begining stopping an IvyWatcher"); + Thread t = listenThread; + listenThread=null; + broadcast.close(); + t.interrupt(); + traceDebug("ending stopping an IvyWatcher"); } - private static void sendBroadcast(String data, String domain, int port) throws IvyException { - MulticastSocket send ; + private void sendBroadcast(String data, String domain, int port) throws IvyException { try { - InetAddress group = InetAddress.getByName(domain); - send = new MulticastSocket(port); - if (group.isMulticastAddress()) { ((MulticastSocket)send).joinGroup(group); } DatagramPacket packet = new DatagramPacket( data.getBytes(), data.length(), group, port ); - send.send(packet); - } catch ( UnknownHostException e ) { - throw new IvyException("Broadcast sent on unknown network "+ - e.getMessage()); + broadcast.send(packet); } catch ( IOException e ) { throw new IvyException("Broadcast error " + e.getMessage() ); } } - void start(String net) throws IvyException { + void start() throws IvyException { String hello = bus.PROCOCOLVERSION + " " + bus.applicationPort + "\n"; - StringTokenizer st = new StringTokenizer(net,","); - broadcastListener = new Vector(); - while ( st.hasMoreTokens()) { - String s = st.nextToken() ; - String domainaddr=getDomain(s); - int port=getPort(s); - // System.out.println("Domaine: "+domainaddr+" : "+port); - try { - InetAddress group = InetAddress.getByName(domainaddr); - broadcast = new MulticastSocket(port); - // Handling of multicast address - if (group.isMulticastAddress()) { - isMulticastAddress = true; - ((MulticastSocket)broadcast).joinGroup(group); - } - } catch ( IOException e ) { - throw new IvyException("IvyWatcher I/O error" + e ); - } - /* - try { - broadcast.setSoTimeout(100); - } catch ( java.net.SocketException jns ) { - throw new IvyException("IvyWatcher setSoTimeout error" + jns.getMessage() ); - } - */ - // starts a Thread listening on the socket - watcherrunning=true; - Thread t = new Thread(this); - broadcastListener.addElement(t); - t.start(); - // notifies our arrival on each domain: protocol version + port - sendBroadcast(hello,domainaddr,port); - } + listenThread.start(); + sendBroadcast(hello,domainaddr,port); // notifies our arrival on each domain: protocol version + port } /* @@ -195,7 +174,7 @@ class IvyWatcher implements Runnable { } */ - private static String getDomain(String net) { + static String getDomain(String net) { int sep_index = net.lastIndexOf( ":" ); if ( sep_index != -1 ) { net = net.substring(0,sep_index); } try { @@ -210,7 +189,7 @@ class IvyWatcher implements Runnable { return net; } - private static int getPort(String net) { + static int getPort(String net) { int sep_index = net.lastIndexOf( ":" ); int port= ( sep_index == -1 ) ? Ivy.DEFAULT_PORT :Integer.parseInt( net.substring( sep_index +1 )); // System.out.println("net: ["+net+"]\nsep_index: "+sep_index+"\nport: "+port); diff --git a/src/Makefile b/src/Makefile index 17bd123..501cd24 100644 --- a/src/Makefile +++ b/src/Makefile @@ -4,7 +4,7 @@ JAVACOPTS = -d . -deprecation .SUFFIXES: .java .class SRCS = *.java OBJS = $(SRCS:.java=.class) - JAVAC = jikes + JAVAC = jikes -classpath $(JIKESPATH) #JAVAC = javac DOCS = ../doc/html/api diff --git a/src/Probe.java b/src/Probe.java index fdb102e..a0a2ed0 100644 --- a/src/Probe.java +++ b/src/Probe.java @@ -1,5 +1,6 @@ package fr.dgac.ivy ; import java.io.*; +import java.util.*; import gnu.getopt.Getopt; /** @@ -11,25 +12,38 @@ import gnu.getopt.Getopt; * * * Changelog: + * 1.0.12 + * - Probe can now send empty strings on keyboard input + * - rewritten with a looping thread on stdin to allow a cleaner exit on die + * message : not very good + * - processes .help, .die , .quit and .bye commands + * - it is possible to rename the JPROBE on the bus with the -n switch, it can + * circumvent name collisions during tests + * e.g: java fr.dgac.ivy.Probe -n JPROBE2 * 1.0.10 * exits on end of user input * handles multiple domains - it was a IvyWatcher problem - */ -class Probe implements IvyApplicationListener, IvyMessageListener { +class Probe implements IvyApplicationListener, IvyMessageListener, Runnable { /** * help message for the standalone program */ - public static final String helpmsg = "usage: java fr.dgac.ivy.Probe [options] [regexp]\n\t-b BUS\tspecifies the Ivy bus domain\n\t-q\tquiet, no tty output\n\t-d\tdebug\n\t-h\thelp\n\n\t regexp is a Perl5 compatible regular expression"; + public static final String helpCommands = "Available commands:\n.die CLIENTNAME sends a die message\n.bye quits the application\n.quit idem\n.list lists the available clients\n.ping sends a ping request if IVY_PING is enabled"; + public static final String helpmsg = "usage: java fr.dgac.ivy.Probe [options] [regexp]\n\t-b BUS\tspecifies the Ivy bus domain\n\t-n ivyname (default JPROBE)\n\t-q\tquiet, no tty output\n\t-d\tdebug\n\t-h\thelp\n\n\t regexp is a Perl5 compatible regular expression"; public static void main(String[] args) throws IvyException { - Getopt opt = new Getopt("Probe",args,"b:dh"); + Getopt opt = new Getopt("Probe",args,"n:b:dh"); int c; String domain=Ivy.getDomain(null); + String name="JPROBE"; while ((c = opt.getopt()) != -1) switch (c) { case 'b': domain=opt.getOptarg(); break; + case 'n': + name=opt.getOptarg(); + break; case 'd': System.setProperty("IVY_DEBUG","yes"); break; @@ -39,39 +53,95 @@ class Probe implements IvyApplicationListener, IvyMessageListener { System.exit(0); } // getopt Probe p = new Probe(); - Ivy bus=new Ivy("JPROBE","JPROBE ready",p); - try { - for (int i=opt.getOptind();i<args.length;i++) { - System.out.println("you want to subscribe to " + args[i]); - bus.bindMsg(args[i],p); - } - System.out.println("broadcasting on "+domain); - bus.start(domain); - String s; - BufferedReader in = new BufferedReader(new InputStreamReader(System.in)); + Ivy bus=new Ivy(name,name+" ready",p); + for (int i=opt.getOptind();i<args.length;i++) { + System.out.println("you want to subscribe to " + args[i]); + bus.bindMsg(args[i],p); + } + System.out.println("broadcasting on "+domain); + bus.start(domain); + p.start(bus); + } + + private BufferedReader in; + private volatile Thread looperThread; + private Ivy bus; + + Probe() { + in = new BufferedReader(new InputStreamReader(System.in)); + looperThread=new Thread(this); + } + + public void start(Ivy bus) { + this.bus=bus; + looperThread.start(); + } + + public void run() { + Thread thisThread=Thread.currentThread(); + String s; + while (looperThread==thisThread) { // infinite loop on keyboard input try { - while (p.looping()) { - if ( (s=in.readLine()).length()!=0) { - System.out.println("-> Sent to " +bus.sendMsg(s)+" peers"); - } - } + s=in.readLine(); + parseCommand(s); } catch (NullPointerException e) { // EOF triggered by a ^D, for instance - bus.stop(); + bus.stop(); } catch (IOException e) { System.out.println("ioe ?"); e.printStackTrace(); - bus.stop(); + bus.stop(); + } catch (InterruptedException ie) { + System.out.println("allo ?"); } - } catch (IvyException ie) { - System.out.println("Caught an exception. quitting. "+ie.getMessage()); - } - System.exit(0); // quit ! + } //while + System.out.println("End of looping"); + System.exit(0); } - private boolean looping=true; - public boolean looping() { return looping ; } // accessor + void parseCommand(String s) throws IOException, InterruptedException { + // System.out.println("parsing the ["+s+"] (length "+s.length()+") string"); + // crude parsing of the ".xyz" commands + // TODO use regexps instends of String.lastIndexOf(String) + if (s.length()==0) { + System.out.println("-> Sent to " +bus.sendMsg(s)+" peers"); + } else if (s.lastIndexOf(".die ")>=0){ + String target=s.substring(5); + Vector v=bus.getIvyClientsByName(target); + if (v.size()==0) { + System.out.println("no Ivy client with the name \""+target+"\""); + } + for (int i=0;i<v.size();i++) { + ((IvyClient)v.elementAt(i)).sendDie(); + } + } else if (s.lastIndexOf(".ping ")>=0){ + String target=s.substring(6); + Vector v=bus.getIvyClientsByName(target); + if (v.size()==0) { + System.out.println("no Ivy client with the name \""+target+"\""); + } + for (int i=0;i<v.size();i++) { + ((IvyClient)v.elementAt(i)).sendPing("test"); + } + } else if ( (s.lastIndexOf(".quit")>=0)||(s.lastIndexOf(".bye")>=0)){ + bus.stop(); + System.exit(0); + } else if (s.lastIndexOf(".list")>=0) { + Vector v = bus.getIvyClients(); + System.out.println(v.size()+" clients on the bus"); + for (int i=0;i<v.size();i++) { + System.out.println(" -> "+((IvyClient)v.elementAt(i)).getApplicationName()); + } + } else if ( s.lastIndexOf(".help")>=0) { + System.out.println(helpCommands); + } else if ( s.charAt(0)=='.') { + System.out.println("this command is not recognized"); + System.out.println(helpCommands); + } else { + System.out.println("-> Sent to " +bus.sendMsg(s)+" peers"); + } + } // parseCommand public void connect(IvyClient client) { System.out.println(client.getApplicationName() + " connected " ); @@ -85,9 +155,11 @@ class Probe implements IvyApplicationListener, IvyMessageListener { } public void die(IvyClient client, int id) { - looping=false; - System.out.println("received die msg from " + client.getApplicationName() ); - System.exit(0); + System.out.println("received die msg from " + client.getApplicationName() ); + Thread t = looperThread; + looperThread = null; + t.interrupt(); // TODO does'nt work + System.exit(0); } public void directMessage(IvyClient client, int id, String arg) { diff --git a/tests/Makefile b/tests/Makefile index bb72f81..2dfa6c9 100644 --- a/tests/Makefile +++ b/tests/Makefile @@ -1,13 +1,44 @@ JIKESPATH=/usr/lib/j2re1.3/lib/rt.jar:../lib/ivy-java.jar JAVAC= jikes -classpath $(JIKESPATH) .SUFFIXES: .java .class -SRC = TestNet.java Bug.java BugTok.java +SRC = TestNet.java Bug.java BugTok.java BenchLocal.java OBJ = $(SRC:.java=.class) +JAVAOPTS = -DIVY_PING +JAVA = java $(JAVAOPTS) +JDK118=/usr/local/jdk118_v3/bin/java +JDK13=java +JDK12= /usr/local/jdk1.2.2/bin/java +JDK14= /usr/local/j2sdk1.4.0/bin/java + +#LOCALNET=10.0.0.255:3456 +LOCALNET=10.192.36.255:3456 +MULTICAST=228.1.2.3:4567 .java.class: $(JAVAC) $< all: $(OBJ) + @echo 'run make alltests' + +alltests: $(OBJ) unitaires test1 test2 jvm + +unitaires: + $(JAVA) $(JAVAOPTS) -classpath ../lib/ivy-java.jar:. fr.dgac.ivy.Ivy + +test1: $(OBJ) + $(JAVA) $(JAVAOPTS) -classpath ../lib/ivy-java.jar:. BenchLocal + $(JAVA) $(JAVAOPTS) -classpath ../lib/ivy-java.jar:. BenchLocal -b $(LOCALNET) + $(JAVA) $(JAVAOPTS) -classpath ../lib/ivy-java.jar:. BenchLocal -b $(MULTICAST) + $(JAVA) $(JAVAOPTS) -classpath ../lib/ivy-java.jar:. BenchLocal -d 100 + $(JAVA) $(JAVAOPTS) -classpath ../lib/ivy-java.jar:. BenchLocal -d 0 + +test2: $(OBJ) + $(JAVA) $(JAVAOPTS) -classpath ../lib/ivy-java.jar:. BenchLocal -t 2 -d 100 + $(JAVA) $(JAVAOPTS) -classpath ../lib/ivy-java.jar:. BenchLocal -t 2 -d 0 + +jvm: + make JAVA=$(JDK14) test1 + make JAVA=$(JDK14) test2 clean: - /bin/rm -f $(OBJ) + /bin/rm -f $(OBJ) *.class |