diff options
-rw-r--r-- | README | 14 | ||||
-rw-r--r-- | build.xml | 2 | ||||
-rwxr-xr-x | src/Ivy.java | 146 | ||||
-rwxr-xr-x | src/IvyClient.java | 48 | ||||
-rw-r--r-- | src/IvyDaemon.java | 63 | ||||
-rwxr-xr-x | src/IvyWatcher.java | 231 | ||||
-rw-r--r-- | src/Probe.java | 34 | ||||
-rw-r--r-- | src/Protocol.java | 26 | ||||
-rw-r--r-- | src/ProxyClient.java | 8 | ||||
-rw-r--r-- | src/Puppet.java | 5 | ||||
-rw-r--r-- | src/SelfIvyClient.java | 15 | ||||
-rw-r--r-- | tests/AsyncAPI.java | 7 | ||||
-rw-r--r-- | tests/Makefile | 9 | ||||
-rw-r--r-- | tests/SendNow.java | 2 |
14 files changed, 337 insertions, 273 deletions
@@ -1,7 +1,7 @@ ivy-java README -Last modification, lun jan 6 16:25:35 CET 2003 , Yannick +Last modification, Dim 13 mai 2012 15:21:10 CEST, Yannick ----------------------------------------------------------------------------- Ivy java is open source software distributed under the terms of the GNU @@ -40,14 +40,14 @@ DOCUMENTATION and in pdf format: doc/programmersguide.pdf If any of those file is missing, see the tar.gz archive on the Ivy java - web page ( http://www.tls.cena.fr/products/ivy/ivy-java.html ) + web page ( http://www2.tls.cena.fr/products/ivy/ivy-java.html ) UTILITIES ivy-java comes with a simple utility program intended to test and demonstrate its features. It is compiled into the Java archive - file. To run fr.dgac.ivy.Probe, you will need gnu.regexp and gnu.getopt, - which are available at http://www.urbanophile.com/~arenn/hacking/download.html, + file. To run fr.dgac.ivy.tools.Probe, you will need gnu.getopt, + which is available at http://www.urbanophile.com/~arenn/hacking/download.html, and put those class files in your classpath as well. Running java fr.dgac.ivy.Probe successfully is the key to knowing whether @@ -55,7 +55,7 @@ UTILITIES Ivy also comes with a simple TCP relay, allowing any script application to send text messages onto an Ivy bus. To run the relay, launch - $ java fr.dgac.ivy.IvyDaemon + $ java fr.dgac.ivy.tools.IvyDaemon Then any line sent to the local port 3456 will be forwarded as an ivy message. It can be used in shell scripts in conjunction with netcat $ echo "hello world" | nc -q 0 localhost 3456 @@ -71,7 +71,7 @@ HACKING BUG REPORTS - Send bug reports to <jestin@cena.fr>, or join the ivy mailing list + Send bug reports to <yannick.jestin@enac.fr>, or join the ivy mailing list by sending a "subscribe" message to <ivy-request@tls.cena.fr>. It helps if you can send a code sample showing the messages you were using and how you were using it. @@ -87,4 +87,4 @@ LATEST VERSION Thanks! -- - Yannick Jestin <jestin@cena.fr> + Yannick Jestin <yannick.jestin@enac.fr> @@ -1,6 +1,6 @@ <project name="ivy-java" basedir="." default="main"> - <property name="version" value="1.2.16"/> + <property name="version" value="1.2.17"/> <property name="defaultbus" value="-DIVYBUS=224.5.6.7:8910" /> <property name="src.dir" value="src"/> diff --git a/src/Ivy.java b/src/Ivy.java index f894ee4..4c6f8ff 100755 --- a/src/Ivy.java +++ b/src/Ivy.java @@ -16,6 +16,8 @@ * * CHANGELOG: * 1.2.16 + * - uses a ThreadPoolExecutor + * - sendMsg goes synchronized * - API break: getIvyClients now returns a Collection, instead of a Vector * - fixes a concurent exception in the stop() method (no more * removeClient , triggered in the SendNow test) @@ -142,6 +144,9 @@ import java.util.HashMap; import java.util.ArrayList; import java.util.Properties; import java.util.StringTokenizer; +import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; public class Ivy implements Runnable { @@ -157,7 +162,7 @@ public class Ivy implements Runnable { * the library version, useful for development purposes only, when java is * invoked with -DIVY_DEBUG */ - private static final String LIBVERSION ="1.2.16"; + private static final String LIBVERSION ="1.2.17"; /* * private fields @@ -176,17 +181,22 @@ public class Ivy implements Runnable { private boolean debug; private ServerSocket app; private Collection<IvyWatcher> watchers = new ArrayList<IvyWatcher>(); - private volatile Thread serverThread; // to ensure quick communication of the end + private volatile boolean keeprunning = false ; + private Thread serverThread = null ; + private Map<Integer,IvyClient> clients = Collections.synchronizedMap(new HashMap<Integer,IvyClient>()); private Map<Integer,IvyClient> half = Collections.synchronizedMap(new HashMap<Integer,IvyClient>()); + private Vector<IvyApplicationListener> ivyApplicationListenerList = new Vector<IvyApplicationListener>(); private Vector<IvyBindListener> ivyBindListenerList = new Vector<IvyBindListener>(); private Vector<Thread> sendThreads = new Vector<Thread>(); private String[] filter = null; private boolean stopped = true; - private boolean starting = false; + // private boolean starting = false; + private volatile int nbThreads = 0; protected Object readyToSend = new Object(); private boolean doSendToSelf = false; + private ExecutorService pool = null; // FIXME should not be static ? (findbugs) private static int serial = 0; @@ -332,7 +342,7 @@ public class Ivy implements Runnable { public final void start(final String domainbus) throws IvyException { if (!stopped) throw new IvyException("cannot start a bus that's already started"); - setStarting(true); // this will remain true entil one of the PacketSenders has finished + pool = Executors.newCachedThreadPool(); stopped=false; String db = domainbus; if (db == null) db = Domain.getDomain(null); @@ -376,20 +386,19 @@ public class Ivy implements Runnable { watcherId = getWBUId().replace(' ' , '*'); // no space in the watcherId // readies the rendezvous : an IvyWatcher (thread) per domain bus for (Domain dom: d) watchers.add(new IvyWatcher(this , dom.getDomainaddr() , dom.getPort())); - serverThread = new Thread(this); - serverThread.setName("Ivy TCP server Thread"); - serverThread.setDaemon(true); - serverThread.start(); + keeprunning = true ; + pool.execute(this); + // sends the broadcasts and listen to incoming connexions for (IvyWatcher iw: watchers) iw.doStart(); } - private void waitForRemote(String s) { try { - while (starting==true) { + while (nbThreads > 0) { + traceDebug("I'm waiting before "+s+", a connecting tread is in progress"); Thread.sleep(GRACEDELAY); - traceDebug("I'm waiting before "+s+", a starting tread is in progress"); + traceDebug("I'm done waiting before "+s); } } catch (InterruptedException ie) { // should not happen, and it's not a problem anyway @@ -403,34 +412,22 @@ public class Ivy implements Runnable { waitForRemote("stopping"); if (stopped) return; stopped = true; - serverThread = null; + keeprunning = false; traceDebug("beginning stopping"); try { - // stopping the serverThread - Thread t = serverThread; - if (t != null) { - t.interrupt(); // The serverThread might be stopped even before having been created - // System.out.println("IZZZ joining " + t); - try { t.join(); } catch ( InterruptedException ie ) { - ie.printStackTrace(); - } - } synchronized (lockApp) { app.close(); } + // stopping the IvyWatchers for (IvyWatcher iw: watchers) iw.doStop(); watchers.clear(); // stopping the remaining IvyClients synchronized (clients) { - for (IvyClient c : clients.values()) { - if (c != null) { - c.close(true); - // removeClient(c); // useless ? - } - } + for (IvyClient c : clients.values()) if (c != null) c.close(true); } } catch (IOException e) { traceDebug("IOexception Stop "); } + pool.shutdown(); traceDebug("end stopping"); } @@ -483,8 +480,10 @@ public class Ivy implements Runnable { * @param message A String which will be compared to the regular * expressions of the different clients * @return returns the number of messages actually sent + * + * since 1.2.16 goes synchronized to avoid concurrent access */ - public final int sendMsg(final String message) throws IvyException { + synchronized public final int sendMsg(final String message) throws IvyException { int count = 0; waitForRemote("sending"); synchronized (lock) { @@ -811,6 +810,12 @@ public class Ivy implements Runnable { public final String getWBUId() { return "ID<" + appName + myserial + ":" + nextId() + ":" + generator.nextInt() + ">"; } + + + @Override public String toString() { + return "bus <"+appName+">[port:"+applicationPort+",serial:"+myserial+"]"; + } + private synchronized long nextId() { return current++; } /////////////////////////////////////////////////////////////////: @@ -819,11 +824,19 @@ public class Ivy implements Runnable { // /////////////////////////////////////////////////////////////////: - protected IvyClient createIvyClient(Socket s , int port, boolean domachin) throws IOException { - setStarting(true); // this one will stop when the client has finished starting + /** + * @return false if the client has not been created, true otherwise + */ + protected boolean createIvyClient(Socket s , int port, boolean domachin) throws IOException { IvyClient i = new IvyClient(this , s , port , domachin); - i.doStart(); - return i; + try { + pool.execute(i); + } catch (RejectedExecutionException ree) { + // in another thread, the pool is shut down + traceDebug("in another thread, the pool is shut down"); + return false; + } + return true; } @@ -846,7 +859,6 @@ public class Ivy implements Runnable { synchronized (clients) { clients.put(c.getClientKey() , c); } - setStarting(false); traceDebug("added " + c + " in clients: " + getClientNames(clients)); } else { traceDebug("not adding "+c+" in clients, double connexion detected, removing lowest one"); @@ -860,25 +872,21 @@ public class Ivy implements Runnable { } protected synchronized void addHalf(IvyClient c) { - //synchronized(lock){ - synchronized (half) { half.put(c.getClientKey() , c); } - //} + synchronized (half) { half.put(c.getClientKey() , c); } traceDebug("added " + c + " in half: " + getClientNames(half)); } protected synchronized void removeHalf(IvyClient c) { - //synchronized(lock) { - if (half == null||c == null) return; - synchronized (half) { - half.remove(c.getClientKey()); - } - //} + if (half == null||c == null) return; + synchronized (half) { + half.remove(c.getClientKey()); + } traceDebug("removed " + c + " from half: " + getClientNames(half)); } private synchronized IvyClient searchPeer(IvyClient ic) { synchronized (clients) { - for (IvyClient peer : clients.values()) if ((peer != null)&&(peer.equals(ic))) return peer; + for (IvyClient peer : clients.values()) if ((peer != null)&&(peer.myEquals(ic))) return peer; } return null; } @@ -888,26 +896,29 @@ public class Ivy implements Runnable { */ public void run() { traceDebug("service thread started"); // THREADDEBUG - Thread thisThread = Thread.currentThread(); + serverThread = Thread.currentThread(); + serverThread.setName("Ivy TCP server Thread"); + //serverThread.setDaemon(true); Socket socket = null; - while ( thisThread == serverThread ){ + while ( keeprunning ){ try { synchronized (this) { //System.out.println("DEBUG stopped: "+stopped); - if ((thisThread != serverThread)||stopped) break; // early disconnexion + if ( (!keeprunning) || stopped ) break; // early disconnexion } synchronized (lockApp) { socket = app.accept(); // TODO I can't synchronize on (this) in the run } synchronized (this) { - if ((thisThread != serverThread)||stopped) break; // early disconnexion - createIvyClient(socket , 0 , true); // the peer called me + if ( (!keeprunning) || stopped ) break; // early disconnexion + // the peer called me + if ( ! createIvyClient(socket , 0 , true) ) break; } } catch (InterruptedIOException ie) { // traceDebug("server socket was interrupted. good"); - if (thisThread != serverThread) break; + if ( !keeprunning ) break; } catch( IOException e ) { - if (serverThread == thisThread) { + if ( keeprunning ) { traceDebug("Error IvyServer exception: " + e.getMessage()); System.out.println("Ivy server socket reader caught an exception " + e.getMessage()); System.out.println("this is probably a bug in your JVM ! (e.g. blackdown jdk1.1.8 linux)"); @@ -920,34 +931,33 @@ public class Ivy implements Runnable { traceDebug("service thread stopped"); // THREADDEBUG } - protected String getAppName() { return appName; } - protected int getAppPort() { return applicationPort; } - protected String getReadyMessage() { return ready_message; } - protected boolean getProtectNewlines() { return doProtectNewlines; } + String getAppName() { return appName; } + int getAppPort() { return applicationPort; } + String getReadyMessage() { return ready_message; } + boolean getProtectNewlines() { return doProtectNewlines; } + String getWatcherId() { return watcherId; } + int getBufferSize() { return bufferSize; } + int getSerial() { return myserial; } + ExecutorService getPool() { return pool; } - protected void setStarting(boolean s) { + protected void pushThread(String reason) { synchronized(readyToSend) { - traceDebug("setStarting "+s); - starting = s; + nbThreads++ ; + //System.out.println("DEBUG PUSH "+this+" -- threads: "+nbThreads + "; reason: "+reason); } } - protected String getWatcherId() { return watcherId; } - protected int getBufferSize() { return bufferSize; } + protected void popThread(String reason) { + synchronized(readyToSend) { + nbThreads-- ; + //System.out.println("DEBUG POP "+this+" -- threads: "+nbThreads + "reason: "+reason); + } + } - protected int getSerial() { return myserial; } private void traceDebug(String s){ if (debug) System.out.println("-->Ivy[" + myserial + "]<-- " + s); } - // stuff to guarantee that all the treads have left - synchronized void registerThread(Thread t) { sendThreads.addElement(t); } - synchronized void unRegisterThread(Thread t) { sendThreads.removeElement(t); } - synchronized Thread getOneThread() { - if (sendThreads.size() == 0) return null; - return (Thread) sendThreads.firstElement(); - } - // a small private method for debbugging purposes private String getClientNames(Map<Integer , IvyClient> t) { StringBuffer s = new StringBuffer(); diff --git a/src/IvyClient.java b/src/IvyClient.java index aadbd0b..3cef803 100755 --- a/src/IvyClient.java +++ b/src/IvyClient.java @@ -12,6 +12,8 @@ * created for each remote client. * * CHANGELOG: + * 1.2.17 + * - fixes a synchronization issue in sendMsg * 1.2.16 * - now uses the synchronized wrappers of the Java API for all collections * 1.2.14 @@ -103,7 +105,7 @@ import java.util.Vector; import java.util.regex.*; import java.util.Collection; -public class IvyClient extends Thread { +public class IvyClient implements Runnable { // private variables private final static int MAXPONGCALLBACKS = 10; @@ -118,11 +120,10 @@ public class IvyClient extends Thread { private BufferedReader in; private OutputStream out; private int remotePort=0; - private volatile Thread clientThread;// volatile to ensure the quick communication + private volatile boolean keepgoing = true;// volatile to ensure the atomicity private Integer clientKey; private boolean discCallbackPerformed = false; private String remoteHostname="unresolved"; - private static ThreadGroup clientsThreadGroup = new ThreadGroup("Ivy clients threadgroup"); // protected variables String appName="none"; @@ -149,26 +150,17 @@ public class IvyClient extends Thread { synchronized(bus) { bus.addHalf(this); // register a half connexion sendSchizo(); - // the registering (handShake) will take place at the reception of the regexps... + // the handShake will take place at the reception of the regexps. } } remoteHostname = socket.getInetAddress().getHostName(); - clientThread = new Thread(clientsThreadGroup, this); // clientThread handles the incoming traffic - clientThread.setName("Ivy client thread to "+remoteHostname+":"+remotePort); - } - - /* removed from the constructor, to avoid Mulithread correctnaess issuses - * see http://findbugs.sourceforge.net/bugDescriptions.html#SC_START_IN_CTOR - */ - protected void doStart() { - clientThread.start(); } // sends our ID, whether we initiated the connexion or not // the ID is the couple "host name,application Port", the host name // information is in the socket itself, the port is not known if we // initiate the connexion - private void sendSchizo() throws IOException { + synchronized private void sendSchizo() throws IOException { traceDebug("sending our service port "+bus.getAppPort()); Map<Integer,String> tosend=bus.getSelfIvyClient().regexpsText; synchronized (tosend) { @@ -273,9 +265,9 @@ public class IvyClient extends Thread { protected int sendMsg(String message) { int count = 0; - for (Integer key : regexps.keySet()) { - Pattern regexp = regexps.get(key); - synchronized (regexp) { + synchronized (regexps) { + for (Integer key : regexps.keySet()) { + Pattern regexp = regexps.get(key); Matcher m = regexp.matcher(message); if (m.matches()) { count++; // match @@ -294,10 +286,9 @@ public class IvyClient extends Thread { /* interrupt the listener thread */ private void stopListening() { - Thread t = clientThread; - if (t==null) return; // we can be summoned to quit from two path at a time - clientThread=null; - t.interrupt(); + if ( !keepgoing ) return; // we can be summoned to quit from two path at a time + keepgoing = false; + interrupt(); } /* @@ -330,6 +321,7 @@ public class IvyClient extends Thread { public void run() { traceDebug("Thread started"); Thread thisThread = Thread.currentThread(); + thisThread.setName("Ivy client thread to "+remoteHostname+":"+remotePort); String msg = null; try { traceDebug("connection established with "+ @@ -337,10 +329,10 @@ public class IvyClient extends Thread { } catch (Exception ie) { traceDebug("Interrupted while resolving remote hostname"); } - while (clientThread==thisThread) { + while ( keepgoing ) { try { if ((msg=in.readLine()) != null ) { - if (clientThread!=thisThread) break; // early stop during readLine() + if ( !keepgoing ) break; // early stop during readLine() if (!newParseMsg(msg)) { close(true); break; @@ -354,9 +346,9 @@ public class IvyClient extends Thread { ie.printStackTrace(); } catch (InterruptedIOException ioe) { traceDebug("I have been interrupted. I'm about to leave my thread loop"); - if (thisThread!=clientThread) break; + if ( !keepgoing) break; } catch (IOException e) { - if (clientThread!=null) { + if ( !keepgoing ) { traceDebug("abnormally Disconnected from "+ socket.getInetAddress().getHostName()+":"+socket.getPort()); } break; @@ -370,8 +362,8 @@ public class IvyClient extends Thread { traceDebug("Thread stopped"); } - @Override public void interrupt(){ - super.interrupt(); + void interrupt(){ + Thread.currentThread().interrupt(); try { if (socket!=null) socket.close(); } catch (IOException ioe) { @@ -577,7 +569,7 @@ public class IvyClient extends Thread { try { bus.addHalf(this); sendSchizo(); - bus.handShake(this); + bus.handShake(this); // } catch (IOException ioe) { throw new IvyException(ioe.toString()); } diff --git a/src/IvyDaemon.java b/src/IvyDaemon.java index f99202d..40d0535 100644 --- a/src/IvyDaemon.java +++ b/src/IvyDaemon.java @@ -12,6 +12,10 @@ * (c) CENA * * changelog: + * 1.2.16 + * - now uses a Thread Pool Executor + * - now parses the messages: if the message is ".die IvyDaemon", quits the + * bus * 1.2.14 * - remove the Thread.start() from the constructor, to avoid mulithread issues * see * http://findbugs.sourceforge.net/bugDescriptions.html#SC_START_IN_CTOR @@ -35,14 +39,19 @@ import fr.dgac.ivy.* ; import java.io.*; import java.net.*; import java.util.Properties ; +import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; import gnu.getopt.Getopt; -public class IvyDaemon implements Runnable { +public class IvyDaemon implements Runnable, IvyApplicationListener { private ServerSocket serviceSocket; private static boolean debug = (System.getProperty("IVY_DEBUG")!=null) ; - private volatile Thread daemonThread;// volatile to ensure the quick communication + private Thread daemonThread;// volatile to ensure the quick communication + private volatile boolean keeprunning = false ;// volatile to ensure the quick communication private Ivy bus; + private ExecutorService pool = null; + private static volatile int serial = 0; public static final int DEFAULT_SERVICE_PORT = 3456 ; public static final String DEFAULTNAME = "IvyDaemon"; @@ -88,52 +97,56 @@ public class IvyDaemon implements Runnable { if (!quiet) System.out.println("broadcasting on "+Domain.domains(domain)); bus.start(domain); if (!quiet) System.out.println("listening on "+servicePort); - new IvyDaemon(bus,servicePort).doStart(); + new IvyDaemon(bus,servicePort); } public IvyDaemon(Ivy bus,int servicePort) throws IOException { this.bus=bus; + bus.addApplicationListener(this); serviceSocket = new ServerSocket(servicePort) ; - daemonThread=new Thread(this); - daemonThread.setName("Ivy Daemon tool thread"); + pool = Executors.newCachedThreadPool(); + keeprunning = true ; + pool.execute(this); } - protected void doStart() { - daemonThread.start(); - } /* * the service socket reader. * it could be a thread, but as long as we've got one .... */ public void run() { - Thread thisThread = Thread.currentThread(); + daemonThread = Thread.currentThread(); + daemonThread.setName("Ivy Daemon tool thread"); traceDebug("Thread started"); - while ( daemonThread==thisThread ) { + while ( keeprunning ) { + /* there is no way out of here, except ^C */ try { new SubReader(serviceSocket.accept()); } catch( IOException e ) { traceDebug("TCP socket reader caught an exception " + e.getMessage()); } } + System.out.println("outta here"); traceDebug("Thread stopped"); + pool.shutdown(); } - class SubReader extends Thread { + class SubReader implements Runnable { BufferedReader in; SubReader(Socket socket) throws IOException { in=new BufferedReader(new InputStreamReader(socket.getInputStream())); - SubReader.this.start(); + // setName("Subreader "+serial++); + pool.execute(SubReader.this); } public void run() { traceDebug("Subreader Thread started"); - String msg = null; try { while (true) { - msg=in.readLine(); + String msg=in.readLine(); if (msg==null) break; + if (msg.compareTo(".die IvyDaemon") == 0) break; try { bus.sendMsg(msg); } catch (IvyException ie) { @@ -146,9 +159,31 @@ public class IvyDaemon implements Runnable { throw new RuntimeException(); } traceDebug("Subreader Thread stopped"); + try { + in.close(); + } catch (IOException ioe) { + // do nothing + } + pool.shutdown(); + bus.stop(); + System.exit(0); } } + public void connect( IvyClient client ) { } + public void disconnect( IvyClient client ) { } + public void die( IvyClient client, int id, String msgarg) { + keeprunning = false; + if ( daemonThread != null ) daemonThread.interrupt(); + try { + serviceSocket.close(); + } catch (IOException ioe) { + // I don't care + } + } + + public void directMessage( IvyClient client, int id,String msgarg ) {} + private static void traceDebug(String s){ if (debug) System.out.println("-->IvyDaemon "+name+"<-- "+s); } diff --git a/src/IvyWatcher.java b/src/IvyWatcher.java index 1dcaab0..c9a661f 100755 --- a/src/IvyWatcher.java +++ b/src/IvyWatcher.java @@ -91,13 +91,15 @@ import java.util.Map; import java.util.HashMap; import java.util.regex.*; -class IvyWatcher extends Thread { +class IvyWatcher implements Runnable { private static boolean debug = (System.getProperty("IVY_DEBUG")!=null); private boolean alreadyIgnored = false; private Ivy bus; /* master bus controler */ private DatagramSocket broadcast; /* supervision socket */ private int port; - private volatile Thread listenThread = null; + private String domainaddr; + private volatile boolean keeprunning = false; + private Thread listenThread = null; private InetAddress group; // FIXME should not be static ? (findbugs) private static int serial=0; @@ -113,11 +115,10 @@ class IvyWatcher extends Thread { */ IvyWatcher(Ivy bus,String domainaddr,int port) throws IvyException { this.bus = bus; - this.port=port; + this.port = port; + this.domainaddr = domainaddr; busWatcherId=bus.getWatcherId(); - listenThread = new Thread(this); - listenThread.setName("Ivy Watcher thread for "+domainaddr+":"+port); - listenThread.setDaemon(true); + keeprunning = true ; // create the MulticastSocket try { group = InetAddress.getByName(domainaddr); @@ -135,98 +136,30 @@ class IvyWatcher extends Thread { */ public void run() { traceDebug("Thread started"); // THREADDEBUG - Thread thisThread=Thread.currentThread(); + listenThread=Thread.currentThread(); + listenThread.setName("Ivy Watcher thread for "+domainaddr+":"+port); + // listenThread.setDaemon(true); // not possible in the treadpool ? FIXME traceDebug("beginning of a watcher Thread"); - InetAddress remotehost=null; try { - int remotePort=0; - while( listenThread==thisThread ) { + while( keeprunning ) { try { byte buf[] = new byte[256]; DatagramPacket packet=new DatagramPacket(buf,buf.length); broadcast.receive(packet); - bus.setStarting(true); // someone's coming - if (listenThread!=thisThread) break; // I was summoned to leave during the receive + bus.pushThread("UDP packet received"); + if ( !keeprunning ) break; // I was summoned to leave during the receive String msg = new String(buf,0,packet.getLength()); - String remotehostname=null; - try { - remotehost = packet.getAddress(); - remotehostname = remotehost.getHostName(); - Matcher m = recoucou.matcher(msg); - if (!m.matches()) { - System.err.println("Ignoring bad format broadcast from "+ remotehostname+":"+packet.getPort()); - continue; - } - int version = Integer.parseInt(m.group(1)); - if ( version < Protocol.PROTOCOLMINIMUM ) { - System.err.println("Ignoring bad format broadcast from "+ - remotehostname+":"+packet.getPort() - +" protocol version "+remotehost+" we need "+Protocol.PROTOCOLMINIMUM+" minimum"); - continue; - } - remotePort = Integer.parseInt(m.group(2)); - if (bus.getAppPort()==remotePort) { // if (same port number) - if (busWatcherId!=null) { - traceDebug("there's an appId: "+m.group(3)); - String otherId=m.group(3); - String otherName=m.group(4); - if (busWatcherId.compareTo(otherId)==0) { - // same port #, same bus Id, It's me, I'm outta here - traceDebug("ignoring my own broadcast"); - bus.setStarting(false); - continue; - } else { - // same port #, different bus Id, it's another agent - // implementing the Oh Soooo Cool watcherId undocumented - // unprotocolar Ivy add on - traceDebug("accepting a broadcast from a same port by "+otherName); - } - } else { - // there's no watcherId in the broacast. I fall back to a - // crude strategy: I ignore the first broadcast with the same - // port number, and accept the following ones - if (alreadyIgnored) { - traceDebug("received another broadcast from "+ remotehostname+":"+packet.getPort() - +" on my port number ("+remotePort+"), it's probably someone else"); - } else { - alreadyIgnored=true; - traceDebug("ignoring a broadcast from "+ remotehostname+":"+packet.getPort() - +" on my port number ("+remotePort+"), it's probably me"); - continue; - } - } - } // end if (same port #) - traceDebug("broadcast accepted from " +remotehostname - +":"+packet.getPort()+", port:"+remotePort+", protocol version:"+version); - if (!alreadyBroadcasted(remotehost.toString(),remotePort)) { - traceDebug("no known agent originating from " + remotehost + ":" + remotePort); - try { - Socket s = new Socket(remotehost,remotePort); - s.setReceiveBufferSize(bus.getBufferSize()); - //System.out.println("MY DEBUG - buffer size="+s.getReceiveBufferSize()); - s.setTcpNoDelay(true); - bus.createIvyClient(s,remotePort,false); - } catch ( java.net.ConnectException jnc ) { - traceDebug("cannot connect to "+remotehostname+":"+remotePort+", he probably stopped his bus"); - } - } else { - traceDebug("there is already a request originating from " + remotehost + ":" + remotePort); - } - } catch (NumberFormatException nfe) { - System.err.println("Ignoring bad format broadcast from "+remotehostname); - continue; - } catch ( UnknownHostException e ) { - System.err.println("Unkonwn host "+remotehost +","+e.getMessage()); - } catch ( IOException e) { - System.err.println("can't connect to "+remotehost+" port "+ remotePort+e.getMessage()); - e.printStackTrace(); - } + boolean b = (parsePacket(packet, msg)); + bus.popThread("UDP packet processed"); + if (b) continue; else break; } catch (InterruptedIOException jii ){ - if (thisThread!=listenThread) { break ;} + // System.out.println("WTF UDP packet interrupted"); + // another thread took place, not important + if ( !keeprunning ) { break ;} } } // while } catch (java.net.SocketException se ){ - if (thisThread==listenThread) { + if ( keeprunning ) { traceDebug("socket exception, continuing anyway on other Ivy domains "+se); } } catch (IOException ioe ){ @@ -235,9 +168,99 @@ class IvyWatcher extends Thread { traceDebug("Thread stopped"); // THREADDEBUG } - @Override public void interrupt(){ - super.interrupt(); - broadcast.close(); + /** + * parses the content of a received packet. + * @return true if the watcher can keep looping (continue), false if there's + * a big problem (break). + * + * first checks if the message structure is correct + * then checks if it's our own broadcasts + * then checks if there's already a concurrent connexion in progress + * if it's ok, creates a new IvyClient + * + */ + private boolean parsePacket(DatagramPacket packet, String msg) { + // System.out.println("TODO Y - parse In"); + int remotePort=0; + InetAddress remotehost = null; + String remotehostname = null; + try { + remotehost=packet.getAddress(); + remotehostname=remotehost.getHostName(); + Matcher m = recoucou.matcher(msg); + // is it a correct broadcast packet ? + if (!m.matches()) { + System.err.println("Ignoring bad format broadcast from "+ remotehostname+":"+packet.getPort()); + return true; + } + // is it the correct protocol version ? + int version = Integer.parseInt(m.group(1)); + if ( version < Protocol.PROTOCOLMINIMUM ) { + System.err.println("Ignoring bad format broadcast from "+ + remotehostname+":"+packet.getPort() + +" protocol version "+remotehost+" we need "+Protocol.PROTOCOLMINIMUM+" minimum"); + return true; + } + // is it my own broadcast ? + remotePort = Integer.parseInt(m.group(2)); + if (bus.getAppPort()==remotePort) { // if (same port number) + if (busWatcherId!=null) { + traceDebug("there's an appId: "+m.group(3)); + String otherId=m.group(3); + String otherName=m.group(4); + if (busWatcherId.compareTo(otherId)==0) { + // same port #, same bus Id, It's me, I'm outta here + traceDebug("ignoring my own broadcast"); + return true; + } else { + // same port #, different bus Id, it's another agent + // implementing the Oh Soooo Cool watcherId undocumented + // unprotocolar Ivy add on + traceDebug("accepting a broadcast from a same port by "+otherName); + } + } else { + // there's no watcherId in the broacast. I fall back to a + // crude strategy: I ignore the first broadcast with the same + // port number, and accept the following ones + if (alreadyIgnored) { + traceDebug("received another broadcast from "+ remotehostname+":"+packet.getPort() + +" on my port number ("+remotePort+"), it's probably someone else"); + } else { + alreadyIgnored=true; + traceDebug("ignoring a broadcast from "+ remotehostname+":"+packet.getPort() + +" on my port number ("+remotePort+"), it's probably me"); + return true; + } + } + } // end if (same port #) + + // it's definitively not me, let's shake hands ! + traceDebug("broadcast accepted from " +remotehostname + +":"+packet.getPort()+", port:"+remotePort+", protocol version:"+version); + + if (!alreadyBroadcasted(remotehost.toString(),remotePort)) { + traceDebug("no known agent originating from " + remotehost + ":" + remotePort); + try { + Socket s = new Socket(remotehost,remotePort); + s.setReceiveBufferSize(bus.getBufferSize()); + s.setTcpNoDelay(true); + if (!bus.createIvyClient(s,remotePort,false)) return false ; + } catch ( java.net.ConnectException jnc ) { + traceDebug("cannot connect to "+remotehostname+":"+remotePort+", he probably stopped his bus"); + } + } else { + traceDebug("there is already a request originating from " + remotehost + ":" + remotePort); + } + } catch (NumberFormatException nfe) { + System.err.println("Ignoring bad format broadcast from "+remotehostname); + return true; + } catch ( UnknownHostException e ) { + System.err.println("Unkonwn host "+remotehost +","+e.getMessage()); + } catch ( IOException e) { + System.err.println("can't connect to "+remotehost+" port "+ remotePort+e.getMessage()); + e.printStackTrace(); + } + return true; } /** @@ -245,13 +268,9 @@ class IvyWatcher extends Thread { */ synchronized void doStop() { traceDebug("begining stopping"); - bus.setStarting(true); - Thread t = listenThread; - listenThread=null; - interrupt(); + keeprunning = false; + if (listenThread != null) listenThread.interrupt(); broadcast.close(); - if (t!=null) { t.interrupt(); } // it might not even have been created - bus.setStarting(false); traceDebug("stopped"); } @@ -261,16 +280,18 @@ class IvyWatcher extends Thread { DatagramPacket packet; String data; Ivy bus; + public PacketSender(String data, Ivy b) { this.data=data; bus = b; packet=new DatagramPacket(data.getBytes(),data.length(),group,port); - Thread t = new Thread((PacketSender.this)); - t.setName("Ivy Packet sender"); - t.start(); + bus.getPool().execute( PacketSender.this ); } + public void run() { + bus.pushThread("packet sender started"); traceDebug("PacketSender thread started"); // THREADDEBUG + Thread.currentThread().setName("Ivy Packet sender"); try { broadcast.send(packet); } catch (InterruptedIOException e) { @@ -279,15 +300,13 @@ class IvyWatcher extends Thread { e.printStackTrace(); traceDebug("IO interrupted during the broadcast. Do nothing"); } catch ( IOException e ) { - if (listenThread!=null) { - System.out.println("Broadcast Error " + e.getMessage()+" continuing anyway"); - // cannot throw new IvyException in a run ... - e.printStackTrace(); - } + System.out.println("Broadcast Error " + e.getMessage()+" continuing anyway"); + // cannot throw new IvyException in a run ... + e.printStackTrace(); } try { Thread.sleep(100); } catch (InterruptedException ie ){ } - bus.setStarting(false); // one of the senders has finished its work, plus extra time traceDebug("PacketSender thread stopped"); // THREADDEBUG + bus.popThread("packet sender finished"); // one of the senders has finished its work, plus extra time } } @@ -295,8 +314,8 @@ class IvyWatcher extends Thread { // String hello = Ivy.PROTOCOLVERSION + " " + bus.getAppPort() + "\n"; String hello = Protocol.PROTOCOLVERSION + " " + bus.getAppPort() + " "+busWatcherId+" "+bus.getSelfIvyClient().getApplicationName()+"\n"; if (broadcast==null) throw new IvyException("IvyWatcher PacketSender null broadcast address"); + bus.getPool().execute(this); new PacketSender(hello,bus); // notifies our arrival on each domain: protocol version + port - listenThread.start(); } /* diff --git a/src/Probe.java b/src/Probe.java index f362b38..a3816f6 100644 --- a/src/Probe.java +++ b/src/Probe.java @@ -9,6 +9,8 @@ * Changelog: * 1.2.16 * - now uses the synchronized wrappers of the Java API for all collections + * - gets read of Thread mumbo jumbo, and no more System.exit() on die, we + * prefer System.in.close() ! * 1.2.14 * - uses the "new" for: loop construct of Java5 * - throws RuntimeException instead of System.exit(), allows code reuse @@ -67,7 +69,7 @@ import java.util.*; import gnu.getopt.Getopt; import java.util.regex.*; -public class Probe implements IvyApplicationListener, IvyMessageListener, IvyBindListener, Runnable { +public class Probe implements IvyApplicationListener, IvyMessageListener, IvyBindListener { public static final String helpCommands = "Available commands:\n"+ ".die CLIENT\t\t\t* sends a die message\n"+ @@ -146,7 +148,6 @@ public class Probe implements IvyApplicationListener, IvyMessageListener, IvyBin } private BufferedReader in; - private volatile Thread looperThread; private Ivy bus; private boolean timestamp,quiet,debug,exitOnDie=false, encore=true; private static Pattern directMsgRE, timeCountRE; @@ -169,25 +170,18 @@ public class Probe implements IvyApplicationListener, IvyMessageListener, IvyBin this.debug = debug; } - public void start(Ivy bus) throws IvyException { - if (looperThread!=null) throw new IvyException("Probe already started"); - this.bus=bus; - bus.addApplicationListener(this); - looperThread=new Thread(this); - looperThread.setName("Ivy Probe looper thread on readline"); - looperThread.start(); - } public void setExitOnDie(boolean b) { exitOnDie=b; } - public void run() { - traceDebug("Probe Thread started"); - Thread thisThread=Thread.currentThread(); + public void start(Ivy bus) throws IvyException { + this.bus=bus; + bus.addApplicationListener(this); + traceDebug("Probe Loop started"); String s; SelfIvyClient sic = bus.getSelfIvyClient(); println(sic.getApplicationName()+ " ready, type .help and return to get help"); // "infinite" loop on keyboard input - while (encore && looperThread==thisThread) { + while ( encore ) { try { s=in.readLine(); if (s==null) break; @@ -202,7 +196,7 @@ public class Probe implements IvyApplicationListener, IvyMessageListener, IvyBin } println("End of input. Good bye !"); bus.stop(); - traceDebug("Probe Thread stopped"); + traceDebug("Probe Loop stopped"); } boolean parseCommand(String s) throws IOException { @@ -351,7 +345,15 @@ public class Probe implements IvyApplicationListener, IvyMessageListener, IvyBin public void die(IvyClient client, int id,String msgarg) { println("received die msg from " + client.getApplicationName() +" with the message: "+msgarg+", good bye"); /* I cannot stop the readLine(), because it is native code ?! */ - if (exitOnDie) System.exit(0); + if (exitOnDie) { + encore = false; + try { + System.in.close(); + } catch (IOException ie) { + // perfect ! + } + //System.exit(0); + } } public void directMessage(IvyClient client, int id, String arg) { diff --git a/src/Protocol.java b/src/Protocol.java index 9751661..ddd0161 100644 --- a/src/Protocol.java +++ b/src/Protocol.java @@ -13,17 +13,17 @@ package fr.dgac.ivy; enum Protocol { - BYE(0), /* end of the peer */ - ADDREGEXP(1), /* the peer adds a regexp */ - MSG(2), /* the peer sends a message */ - ERROR(3), /* error message */ - DELREGEXP(4), /* the peer removes one of his regex */ - ENDREGEXP(5), /* no more regexp in the handshake */ - SCHIZOTOKEN(6), /* avoid race condition in concurrent connexions, aka BeginRegexp in other implementations */ - DIRECTMSG(7), /* the peer sends a direct message */ - DIE(8), /* the peer wants us to quit */ - PING(9), - PONG(10); + BYE(0), /* end of the peer */ + ADDREGEXP(1), /* the peer adds a regexp */ + MSG(2), /* the peer sends a message */ + ERROR(3), /* error message */ + DELREGEXP(4), /* the peer removes one of his regex */ + ENDREGEXP(5), /* no more regexp in the handshake */ + SCHIZOTOKEN(6), /* avoid race condition in concurrent connexions, aka BeginRegexp in other implementations */ + DIRECTMSG(7), /* the peer sends a direct message */ + DIE(8), /* the peer wants us to quit */ + PING(9), + PONG(10); final static char STARTARG = '\u0002';/* begin of arguments */ final static char ENDARG = '\u0003'; /* end of arguments */ @@ -50,5 +50,7 @@ enum Protocol { if (p.value() == i) return p; throw new IvyException("protocol magic number "+i+" not known"); } - + + @Override public String toString() { return ""+value; } + } diff --git a/src/ProxyClient.java b/src/ProxyClient.java index 92f2c75..e6e8606 100644 --- a/src/ProxyClient.java +++ b/src/ProxyClient.java @@ -176,7 +176,7 @@ class ProxyClient extends Ivy { * protocol and forward everything to the proxies. * TODO: remember everything in case a new proxy client comes ? */ - protected IvyClient createIvyClient(Socket s,int port, boolean domachin) throws IOException { + protected boolean createIvyClient(Socket s,int port, boolean domachin) throws IOException { IvyClient i; // TODO si c'est un puppet, je ne dois pas creer de Ghost // voir meme me deconnecter du biniou ? @@ -185,8 +185,8 @@ class ProxyClient extends Ivy { // this new Ivy agent is in fact one of my puppets ... System.out.println("not Ghosting this (probable) Puppet Ivy agent"); i= new IvyClient(this,s,port,domachin); - i.start(); - return i; + p.bus.getPool().execute(i); + return true; } } String key = getWBUId(); @@ -196,7 +196,7 @@ class ProxyClient extends Ivy { while ((ghostId=id.get(key))==null) { Thread.sleep(200); } Ghost g = new Ghost(this,s,port,domachin,ghostId,this); ghosts.put(ghostId,g); - return g; + return true; } catch (InterruptedException ie) { ie.printStackTrace(); } System.out.println("error waiting"); throw new RuntimeException(); diff --git a/src/Puppet.java b/src/Puppet.java index c307e01..ee26cb2 100644 --- a/src/Puppet.java +++ b/src/Puppet.java @@ -139,8 +139,9 @@ class Puppet { static class PuppetIvy extends Ivy { PuppetIvy(String name,String ready,IvyApplicationListener ial){super(name,ready,ial);} - protected IvyClient createIvyClient(Socket s,int port, boolean domachin) throws IOException { - return new PuppetIvyClient(PuppetIvy.this,s,port,domachin); + protected boolean createIvyClient(Socket s,int port, boolean domachin) throws IOException { + new PuppetIvyClient(PuppetIvy.this,s,port,domachin); + return true; } int getAP() {return getAppPort();} } diff --git a/src/SelfIvyClient.java b/src/SelfIvyClient.java index bd06ee1..2751c92 100644 --- a/src/SelfIvyClient.java +++ b/src/SelfIvyClient.java @@ -192,13 +192,16 @@ public class SelfIvyClient extends IvyClient { this.cb=cb; this.c=c; args=a; - t=new Thread(Runner.this); - bus.registerThread(t); - t.setName("Ivy Runner Thread to execute an async callback"); - t.start(); - bus.unRegisterThread(t); + //t=new Thread(Runner.this); + //bus.registerThread(t); + //t.start(); + bus.getPool().execute(Runner.this); + //bus.unRegisterThread(t); + } + public void run() { + Thread.currentThread().setName("Ivy Runner Thread to execute an async callback"); + cb.receive(c,args); } - public void run() { cb.receive(c,args); } } // class Runner private void traceDebug(String s){ diff --git a/tests/AsyncAPI.java b/tests/AsyncAPI.java index e174929..1f07565 100644 --- a/tests/AsyncAPI.java +++ b/tests/AsyncAPI.java @@ -43,8 +43,7 @@ class AsyncAPI { public AsyncAPI(int nb,String domain,int d, boolean v,boolean async) throws IvyException { verbose=v; nbpacket=nb; - name = "MSreceive"; - bus = new Ivy(name,null, null); + bus = new Ivy(RECEIVENAME,null, null); wait = d; delay=new DelayAnswer(); if (async) re = bus.bindAsyncMsg(TOSUBSCRIBE,delay,BindType.ASYNC); @@ -59,7 +58,7 @@ class AsyncAPI { static Object truc = new Object(); - int count=0,total=0; + volatile int count=0,total=0; int status = 0; synchronized void huh(IvyClient ic, String[] args) { @@ -95,7 +94,7 @@ class AsyncAPI { Thread.sleep(wait); System.out.println("RECEIVE Finished Sleeping"); } catch (InterruptedException ie) { - ie.printStackTrace(); + System.out.println("RECEIVE Sleeping interrupted, not a problem"); } } } diff --git a/tests/Makefile b/tests/Makefile index b313a50..d7dd85b 100644 --- a/tests/Makefile +++ b/tests/Makefile @@ -26,13 +26,14 @@ all: $(JAVAC) -d classes $(JAVACOPTS) $(CLASSPATH) $(SRC) @echo "all modules built. run make run" -run: sendnow sendnowself nl api filter unitaires probe stop request test1 async test2 +run: sendnowself nl api filter unitaires probe stop request test1 test2 async +fail: sendnow request: @echo "*****************************" @echo "TEST REQUEST" @echo "*****************************" - $(JAVA) $(DEBUG) $(DEBUG) $(JAVAOPTS) $(CLASSPATH) Request $(DOMAIN) + $(JAVA) $(DEBUG) $(JAVAOPTS) $(CLASSPATH) Request $(DOMAIN) @echo "*****************************" @echo "TEST REQUEST successful" @echo "*****************************" @@ -42,7 +43,7 @@ sendnowself: @echo "*****************************" @echo "TEST SENDNOW SELF" @echo "*****************************" - $(JAVA) $(DEBUG) $(DEBUG) $(JAVAOPTS) $(CLASSPATH) SendNowSelf $(DOMAIN) + $(JAVA) $(DEBUG) $(JAVAOPTS) $(CLASSPATH) SendNowSelf $(DOMAIN) @echo "*****************************" @echo "TEST SENDNOW SELF successful" @echo "*****************************" @@ -52,7 +53,7 @@ sendnow: @echo "*****************************" @echo "TEST SENDNOW" @echo "*****************************" - $(JAVA) $(DEBUG) $(DEBUG) $(JAVAOPTS) $(CLASSPATH) SendNow $(DOMAIN) + $(JAVA) $(DEBUG) $(JAVAOPTS) $(CLASSPATH) SendNow $(DOMAIN) @echo "*****************************" @echo "TEST SENDNOW successful" @echo "*****************************" diff --git a/tests/SendNow.java b/tests/SendNow.java index 093ee19..739bc41 100644 --- a/tests/SendNow.java +++ b/tests/SendNow.java @@ -49,7 +49,7 @@ public class SendNow { // else is joining the bus // we could add something at the JVM level, but it's no use for inter // process anyway. 200ms before close + lock is long enough - System.out.println("race condition: the Receiver has not received our message, quitting anyway"); + System.out.println("the Receiver has not received our message, quitting anyway"); System.exit(-1); } } catch (IvyException ie) { |