aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rwxr-xr-xsrc/Ivy.java146
-rwxr-xr-xsrc/IvyClient.java48
-rw-r--r--src/IvyDaemon.java63
-rwxr-xr-xsrc/IvyWatcher.java231
-rw-r--r--src/Probe.java34
-rw-r--r--src/Protocol.java26
-rw-r--r--src/ProxyClient.java8
-rw-r--r--src/Puppet.java5
-rw-r--r--src/SelfIvyClient.java15
9 files changed, 320 insertions, 256 deletions
diff --git a/src/Ivy.java b/src/Ivy.java
index f894ee4..4c6f8ff 100755
--- a/src/Ivy.java
+++ b/src/Ivy.java
@@ -16,6 +16,8 @@
*
* CHANGELOG:
* 1.2.16
+ * - uses a ThreadPoolExecutor
+ * - sendMsg goes synchronized
* - API break: getIvyClients now returns a Collection, instead of a Vector
* - fixes a concurent exception in the stop() method (no more
* removeClient , triggered in the SendNow test)
@@ -142,6 +144,9 @@ import java.util.HashMap;
import java.util.ArrayList;
import java.util.Properties;
import java.util.StringTokenizer;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
public class Ivy implements Runnable {
@@ -157,7 +162,7 @@ public class Ivy implements Runnable {
* the library version, useful for development purposes only, when java is
* invoked with -DIVY_DEBUG
*/
- private static final String LIBVERSION ="1.2.16";
+ private static final String LIBVERSION ="1.2.17";
/*
* private fields
@@ -176,17 +181,22 @@ public class Ivy implements Runnable {
private boolean debug;
private ServerSocket app;
private Collection<IvyWatcher> watchers = new ArrayList<IvyWatcher>();
- private volatile Thread serverThread; // to ensure quick communication of the end
+ private volatile boolean keeprunning = false ;
+ private Thread serverThread = null ;
+
private Map<Integer,IvyClient> clients = Collections.synchronizedMap(new HashMap<Integer,IvyClient>());
private Map<Integer,IvyClient> half = Collections.synchronizedMap(new HashMap<Integer,IvyClient>());
+
private Vector<IvyApplicationListener> ivyApplicationListenerList = new Vector<IvyApplicationListener>();
private Vector<IvyBindListener> ivyBindListenerList = new Vector<IvyBindListener>();
private Vector<Thread> sendThreads = new Vector<Thread>();
private String[] filter = null;
private boolean stopped = true;
- private boolean starting = false;
+ // private boolean starting = false;
+ private volatile int nbThreads = 0;
protected Object readyToSend = new Object();
private boolean doSendToSelf = false;
+ private ExecutorService pool = null;
// FIXME should not be static ? (findbugs)
private static int serial = 0;
@@ -332,7 +342,7 @@ public class Ivy implements Runnable {
public final void start(final String domainbus) throws IvyException {
if (!stopped) throw new IvyException("cannot start a bus that's already started");
- setStarting(true); // this will remain true entil one of the PacketSenders has finished
+ pool = Executors.newCachedThreadPool();
stopped=false;
String db = domainbus;
if (db == null) db = Domain.getDomain(null);
@@ -376,20 +386,19 @@ public class Ivy implements Runnable {
watcherId = getWBUId().replace(' ' , '*'); // no space in the watcherId
// readies the rendezvous : an IvyWatcher (thread) per domain bus
for (Domain dom: d) watchers.add(new IvyWatcher(this , dom.getDomainaddr() , dom.getPort()));
- serverThread = new Thread(this);
- serverThread.setName("Ivy TCP server Thread");
- serverThread.setDaemon(true);
- serverThread.start();
+ keeprunning = true ;
+ pool.execute(this);
+
// sends the broadcasts and listen to incoming connexions
for (IvyWatcher iw: watchers) iw.doStart();
}
-
private void waitForRemote(String s) {
try {
- while (starting==true) {
+ while (nbThreads > 0) {
+ traceDebug("I'm waiting before "+s+", a connecting tread is in progress");
Thread.sleep(GRACEDELAY);
- traceDebug("I'm waiting before "+s+", a starting tread is in progress");
+ traceDebug("I'm done waiting before "+s);
}
} catch (InterruptedException ie) {
// should not happen, and it's not a problem anyway
@@ -403,34 +412,22 @@ public class Ivy implements Runnable {
waitForRemote("stopping");
if (stopped) return;
stopped = true;
- serverThread = null;
+ keeprunning = false;
traceDebug("beginning stopping");
try {
- // stopping the serverThread
- Thread t = serverThread;
- if (t != null) {
- t.interrupt(); // The serverThread might be stopped even before having been created
- // System.out.println("IZZZ joining " + t);
- try { t.join(); } catch ( InterruptedException ie ) {
- ie.printStackTrace();
- }
- }
synchronized (lockApp) { app.close(); }
+
// stopping the IvyWatchers
for (IvyWatcher iw: watchers) iw.doStop();
watchers.clear();
// stopping the remaining IvyClients
synchronized (clients) {
- for (IvyClient c : clients.values()) {
- if (c != null) {
- c.close(true);
- // removeClient(c); // useless ?
- }
- }
+ for (IvyClient c : clients.values()) if (c != null) c.close(true);
}
} catch (IOException e) {
traceDebug("IOexception Stop ");
}
+ pool.shutdown();
traceDebug("end stopping");
}
@@ -483,8 +480,10 @@ public class Ivy implements Runnable {
* @param message A String which will be compared to the regular
* expressions of the different clients
* @return returns the number of messages actually sent
+ *
+ * since 1.2.16 goes synchronized to avoid concurrent access
*/
- public final int sendMsg(final String message) throws IvyException {
+ synchronized public final int sendMsg(final String message) throws IvyException {
int count = 0;
waitForRemote("sending");
synchronized (lock) {
@@ -811,6 +810,12 @@ public class Ivy implements Runnable {
public final String getWBUId() {
return "ID<" + appName + myserial + ":" + nextId() + ":" + generator.nextInt() + ">";
}
+
+
+ @Override public String toString() {
+ return "bus <"+appName+">[port:"+applicationPort+",serial:"+myserial+"]";
+ }
+
private synchronized long nextId() { return current++; }
/////////////////////////////////////////////////////////////////:
@@ -819,11 +824,19 @@ public class Ivy implements Runnable {
//
/////////////////////////////////////////////////////////////////:
- protected IvyClient createIvyClient(Socket s , int port, boolean domachin) throws IOException {
- setStarting(true); // this one will stop when the client has finished starting
+ /**
+ * @return false if the client has not been created, true otherwise
+ */
+ protected boolean createIvyClient(Socket s , int port, boolean domachin) throws IOException {
IvyClient i = new IvyClient(this , s , port , domachin);
- i.doStart();
- return i;
+ try {
+ pool.execute(i);
+ } catch (RejectedExecutionException ree) {
+ // in another thread, the pool is shut down
+ traceDebug("in another thread, the pool is shut down");
+ return false;
+ }
+ return true;
}
@@ -846,7 +859,6 @@ public class Ivy implements Runnable {
synchronized (clients) {
clients.put(c.getClientKey() , c);
}
- setStarting(false);
traceDebug("added " + c + " in clients: " + getClientNames(clients));
} else {
traceDebug("not adding "+c+" in clients, double connexion detected, removing lowest one");
@@ -860,25 +872,21 @@ public class Ivy implements Runnable {
}
protected synchronized void addHalf(IvyClient c) {
- //synchronized(lock){
- synchronized (half) { half.put(c.getClientKey() , c); }
- //}
+ synchronized (half) { half.put(c.getClientKey() , c); }
traceDebug("added " + c + " in half: " + getClientNames(half));
}
protected synchronized void removeHalf(IvyClient c) {
- //synchronized(lock) {
- if (half == null||c == null) return;
- synchronized (half) {
- half.remove(c.getClientKey());
- }
- //}
+ if (half == null||c == null) return;
+ synchronized (half) {
+ half.remove(c.getClientKey());
+ }
traceDebug("removed " + c + " from half: " + getClientNames(half));
}
private synchronized IvyClient searchPeer(IvyClient ic) {
synchronized (clients) {
- for (IvyClient peer : clients.values()) if ((peer != null)&&(peer.equals(ic))) return peer;
+ for (IvyClient peer : clients.values()) if ((peer != null)&&(peer.myEquals(ic))) return peer;
}
return null;
}
@@ -888,26 +896,29 @@ public class Ivy implements Runnable {
*/
public void run() {
traceDebug("service thread started"); // THREADDEBUG
- Thread thisThread = Thread.currentThread();
+ serverThread = Thread.currentThread();
+ serverThread.setName("Ivy TCP server Thread");
+ //serverThread.setDaemon(true);
Socket socket = null;
- while ( thisThread == serverThread ){
+ while ( keeprunning ){
try {
synchronized (this) {
//System.out.println("DEBUG stopped: "+stopped);
- if ((thisThread != serverThread)||stopped) break; // early disconnexion
+ if ( (!keeprunning) || stopped ) break; // early disconnexion
}
synchronized (lockApp) {
socket = app.accept(); // TODO I can't synchronize on (this) in the run
}
synchronized (this) {
- if ((thisThread != serverThread)||stopped) break; // early disconnexion
- createIvyClient(socket , 0 , true); // the peer called me
+ if ( (!keeprunning) || stopped ) break; // early disconnexion
+ // the peer called me
+ if ( ! createIvyClient(socket , 0 , true) ) break;
}
} catch (InterruptedIOException ie) {
// traceDebug("server socket was interrupted. good");
- if (thisThread != serverThread) break;
+ if ( !keeprunning ) break;
} catch( IOException e ) {
- if (serverThread == thisThread) {
+ if ( keeprunning ) {
traceDebug("Error IvyServer exception: " + e.getMessage());
System.out.println("Ivy server socket reader caught an exception " + e.getMessage());
System.out.println("this is probably a bug in your JVM ! (e.g. blackdown jdk1.1.8 linux)");
@@ -920,34 +931,33 @@ public class Ivy implements Runnable {
traceDebug("service thread stopped"); // THREADDEBUG
}
- protected String getAppName() { return appName; }
- protected int getAppPort() { return applicationPort; }
- protected String getReadyMessage() { return ready_message; }
- protected boolean getProtectNewlines() { return doProtectNewlines; }
+ String getAppName() { return appName; }
+ int getAppPort() { return applicationPort; }
+ String getReadyMessage() { return ready_message; }
+ boolean getProtectNewlines() { return doProtectNewlines; }
+ String getWatcherId() { return watcherId; }
+ int getBufferSize() { return bufferSize; }
+ int getSerial() { return myserial; }
+ ExecutorService getPool() { return pool; }
- protected void setStarting(boolean s) {
+ protected void pushThread(String reason) {
synchronized(readyToSend) {
- traceDebug("setStarting "+s);
- starting = s;
+ nbThreads++ ;
+ //System.out.println("DEBUG PUSH "+this+" -- threads: "+nbThreads + "; reason: "+reason);
}
}
- protected String getWatcherId() { return watcherId; }
- protected int getBufferSize() { return bufferSize; }
+ protected void popThread(String reason) {
+ synchronized(readyToSend) {
+ nbThreads-- ;
+ //System.out.println("DEBUG POP "+this+" -- threads: "+nbThreads + "reason: "+reason);
+ }
+ }
- protected int getSerial() { return myserial; }
private void traceDebug(String s){
if (debug) System.out.println("-->Ivy[" + myserial + "]<-- " + s);
}
- // stuff to guarantee that all the treads have left
- synchronized void registerThread(Thread t) { sendThreads.addElement(t); }
- synchronized void unRegisterThread(Thread t) { sendThreads.removeElement(t); }
- synchronized Thread getOneThread() {
- if (sendThreads.size() == 0) return null;
- return (Thread) sendThreads.firstElement();
- }
-
// a small private method for debbugging purposes
private String getClientNames(Map<Integer , IvyClient> t) {
StringBuffer s = new StringBuffer();
diff --git a/src/IvyClient.java b/src/IvyClient.java
index aadbd0b..3cef803 100755
--- a/src/IvyClient.java
+++ b/src/IvyClient.java
@@ -12,6 +12,8 @@
* created for each remote client.
*
* CHANGELOG:
+ * 1.2.17
+ * - fixes a synchronization issue in sendMsg
* 1.2.16
* - now uses the synchronized wrappers of the Java API for all collections
* 1.2.14
@@ -103,7 +105,7 @@ import java.util.Vector;
import java.util.regex.*;
import java.util.Collection;
-public class IvyClient extends Thread {
+public class IvyClient implements Runnable {
// private variables
private final static int MAXPONGCALLBACKS = 10;
@@ -118,11 +120,10 @@ public class IvyClient extends Thread {
private BufferedReader in;
private OutputStream out;
private int remotePort=0;
- private volatile Thread clientThread;// volatile to ensure the quick communication
+ private volatile boolean keepgoing = true;// volatile to ensure the atomicity
private Integer clientKey;
private boolean discCallbackPerformed = false;
private String remoteHostname="unresolved";
- private static ThreadGroup clientsThreadGroup = new ThreadGroup("Ivy clients threadgroup");
// protected variables
String appName="none";
@@ -149,26 +150,17 @@ public class IvyClient extends Thread {
synchronized(bus) {
bus.addHalf(this); // register a half connexion
sendSchizo();
- // the registering (handShake) will take place at the reception of the regexps...
+ // the handShake will take place at the reception of the regexps.
}
}
remoteHostname = socket.getInetAddress().getHostName();
- clientThread = new Thread(clientsThreadGroup, this); // clientThread handles the incoming traffic
- clientThread.setName("Ivy client thread to "+remoteHostname+":"+remotePort);
- }
-
- /* removed from the constructor, to avoid Mulithread correctnaess issuses
- * see http://findbugs.sourceforge.net/bugDescriptions.html#SC_START_IN_CTOR
- */
- protected void doStart() {
- clientThread.start();
}
// sends our ID, whether we initiated the connexion or not
// the ID is the couple "host name,application Port", the host name
// information is in the socket itself, the port is not known if we
// initiate the connexion
- private void sendSchizo() throws IOException {
+ synchronized private void sendSchizo() throws IOException {
traceDebug("sending our service port "+bus.getAppPort());
Map<Integer,String> tosend=bus.getSelfIvyClient().regexpsText;
synchronized (tosend) {
@@ -273,9 +265,9 @@ public class IvyClient extends Thread {
protected int sendMsg(String message) {
int count = 0;
- for (Integer key : regexps.keySet()) {
- Pattern regexp = regexps.get(key);
- synchronized (regexp) {
+ synchronized (regexps) {
+ for (Integer key : regexps.keySet()) {
+ Pattern regexp = regexps.get(key);
Matcher m = regexp.matcher(message);
if (m.matches()) {
count++; // match
@@ -294,10 +286,9 @@ public class IvyClient extends Thread {
/* interrupt the listener thread */
private void stopListening() {
- Thread t = clientThread;
- if (t==null) return; // we can be summoned to quit from two path at a time
- clientThread=null;
- t.interrupt();
+ if ( !keepgoing ) return; // we can be summoned to quit from two path at a time
+ keepgoing = false;
+ interrupt();
}
/*
@@ -330,6 +321,7 @@ public class IvyClient extends Thread {
public void run() {
traceDebug("Thread started");
Thread thisThread = Thread.currentThread();
+ thisThread.setName("Ivy client thread to "+remoteHostname+":"+remotePort);
String msg = null;
try {
traceDebug("connection established with "+
@@ -337,10 +329,10 @@ public class IvyClient extends Thread {
} catch (Exception ie) {
traceDebug("Interrupted while resolving remote hostname");
}
- while (clientThread==thisThread) {
+ while ( keepgoing ) {
try {
if ((msg=in.readLine()) != null ) {
- if (clientThread!=thisThread) break; // early stop during readLine()
+ if ( !keepgoing ) break; // early stop during readLine()
if (!newParseMsg(msg)) {
close(true);
break;
@@ -354,9 +346,9 @@ public class IvyClient extends Thread {
ie.printStackTrace();
} catch (InterruptedIOException ioe) {
traceDebug("I have been interrupted. I'm about to leave my thread loop");
- if (thisThread!=clientThread) break;
+ if ( !keepgoing) break;
} catch (IOException e) {
- if (clientThread!=null) {
+ if ( !keepgoing ) {
traceDebug("abnormally Disconnected from "+ socket.getInetAddress().getHostName()+":"+socket.getPort());
}
break;
@@ -370,8 +362,8 @@ public class IvyClient extends Thread {
traceDebug("Thread stopped");
}
- @Override public void interrupt(){
- super.interrupt();
+ void interrupt(){
+ Thread.currentThread().interrupt();
try {
if (socket!=null) socket.close();
} catch (IOException ioe) {
@@ -577,7 +569,7 @@ public class IvyClient extends Thread {
try {
bus.addHalf(this);
sendSchizo();
- bus.handShake(this);
+ bus.handShake(this); //
} catch (IOException ioe) {
throw new IvyException(ioe.toString());
}
diff --git a/src/IvyDaemon.java b/src/IvyDaemon.java
index f99202d..40d0535 100644
--- a/src/IvyDaemon.java
+++ b/src/IvyDaemon.java
@@ -12,6 +12,10 @@
* (c) CENA
*
* changelog:
+ * 1.2.16
+ * - now uses a Thread Pool Executor
+ * - now parses the messages: if the message is ".die IvyDaemon", quits the
+ * bus
* 1.2.14
* - remove the Thread.start() from the constructor, to avoid mulithread issues
* see * http://findbugs.sourceforge.net/bugDescriptions.html#SC_START_IN_CTOR
@@ -35,14 +39,19 @@ import fr.dgac.ivy.* ;
import java.io.*;
import java.net.*;
import java.util.Properties ;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
import gnu.getopt.Getopt;
-public class IvyDaemon implements Runnable {
+public class IvyDaemon implements Runnable, IvyApplicationListener {
private ServerSocket serviceSocket;
private static boolean debug = (System.getProperty("IVY_DEBUG")!=null) ;
- private volatile Thread daemonThread;// volatile to ensure the quick communication
+ private Thread daemonThread;// volatile to ensure the quick communication
+ private volatile boolean keeprunning = false ;// volatile to ensure the quick communication
private Ivy bus;
+ private ExecutorService pool = null;
+ private static volatile int serial = 0;
public static final int DEFAULT_SERVICE_PORT = 3456 ;
public static final String DEFAULTNAME = "IvyDaemon";
@@ -88,52 +97,56 @@ public class IvyDaemon implements Runnable {
if (!quiet) System.out.println("broadcasting on "+Domain.domains(domain));
bus.start(domain);
if (!quiet) System.out.println("listening on "+servicePort);
- new IvyDaemon(bus,servicePort).doStart();
+ new IvyDaemon(bus,servicePort);
}
public IvyDaemon(Ivy bus,int servicePort) throws IOException {
this.bus=bus;
+ bus.addApplicationListener(this);
serviceSocket = new ServerSocket(servicePort) ;
- daemonThread=new Thread(this);
- daemonThread.setName("Ivy Daemon tool thread");
+ pool = Executors.newCachedThreadPool();
+ keeprunning = true ;
+ pool.execute(this);
}
- protected void doStart() {
- daemonThread.start();
- }
/*
* the service socket reader.
* it could be a thread, but as long as we've got one ....
*/
public void run() {
- Thread thisThread = Thread.currentThread();
+ daemonThread = Thread.currentThread();
+ daemonThread.setName("Ivy Daemon tool thread");
traceDebug("Thread started");
- while ( daemonThread==thisThread ) {
+ while ( keeprunning ) {
+ /* there is no way out of here, except ^C */
try {
new SubReader(serviceSocket.accept());
} catch( IOException e ) {
traceDebug("TCP socket reader caught an exception " + e.getMessage());
}
}
+ System.out.println("outta here");
traceDebug("Thread stopped");
+ pool.shutdown();
}
- class SubReader extends Thread {
+ class SubReader implements Runnable {
BufferedReader in;
SubReader(Socket socket) throws IOException {
in=new BufferedReader(new InputStreamReader(socket.getInputStream()));
- SubReader.this.start();
+ // setName("Subreader "+serial++);
+ pool.execute(SubReader.this);
}
public void run() {
traceDebug("Subreader Thread started");
- String msg = null;
try {
while (true) {
- msg=in.readLine();
+ String msg=in.readLine();
if (msg==null) break;
+ if (msg.compareTo(".die IvyDaemon") == 0) break;
try {
bus.sendMsg(msg);
} catch (IvyException ie) {
@@ -146,9 +159,31 @@ public class IvyDaemon implements Runnable {
throw new RuntimeException();
}
traceDebug("Subreader Thread stopped");
+ try {
+ in.close();
+ } catch (IOException ioe) {
+ // do nothing
+ }
+ pool.shutdown();
+ bus.stop();
+ System.exit(0);
}
}
+ public void connect( IvyClient client ) { }
+ public void disconnect( IvyClient client ) { }
+ public void die( IvyClient client, int id, String msgarg) {
+ keeprunning = false;
+ if ( daemonThread != null ) daemonThread.interrupt();
+ try {
+ serviceSocket.close();
+ } catch (IOException ioe) {
+ // I don't care
+ }
+ }
+
+ public void directMessage( IvyClient client, int id,String msgarg ) {}
+
private static void traceDebug(String s){
if (debug) System.out.println("-->IvyDaemon "+name+"<-- "+s);
}
diff --git a/src/IvyWatcher.java b/src/IvyWatcher.java
index 1dcaab0..c9a661f 100755
--- a/src/IvyWatcher.java
+++ b/src/IvyWatcher.java
@@ -91,13 +91,15 @@ import java.util.Map;
import java.util.HashMap;
import java.util.regex.*;
-class IvyWatcher extends Thread {
+class IvyWatcher implements Runnable {
private static boolean debug = (System.getProperty("IVY_DEBUG")!=null);
private boolean alreadyIgnored = false;
private Ivy bus; /* master bus controler */
private DatagramSocket broadcast; /* supervision socket */
private int port;
- private volatile Thread listenThread = null;
+ private String domainaddr;
+ private volatile boolean keeprunning = false;
+ private Thread listenThread = null;
private InetAddress group;
// FIXME should not be static ? (findbugs)
private static int serial=0;
@@ -113,11 +115,10 @@ class IvyWatcher extends Thread {
*/
IvyWatcher(Ivy bus,String domainaddr,int port) throws IvyException {
this.bus = bus;
- this.port=port;
+ this.port = port;
+ this.domainaddr = domainaddr;
busWatcherId=bus.getWatcherId();
- listenThread = new Thread(this);
- listenThread.setName("Ivy Watcher thread for "+domainaddr+":"+port);
- listenThread.setDaemon(true);
+ keeprunning = true ;
// create the MulticastSocket
try {
group = InetAddress.getByName(domainaddr);
@@ -135,98 +136,30 @@ class IvyWatcher extends Thread {
*/
public void run() {
traceDebug("Thread started"); // THREADDEBUG
- Thread thisThread=Thread.currentThread();
+ listenThread=Thread.currentThread();
+ listenThread.setName("Ivy Watcher thread for "+domainaddr+":"+port);
+ // listenThread.setDaemon(true); // not possible in the treadpool ? FIXME
traceDebug("beginning of a watcher Thread");
- InetAddress remotehost=null;
try {
- int remotePort=0;
- while( listenThread==thisThread ) {
+ while( keeprunning ) {
try {
byte buf[] = new byte[256];
DatagramPacket packet=new DatagramPacket(buf,buf.length);
broadcast.receive(packet);
- bus.setStarting(true); // someone's coming
- if (listenThread!=thisThread) break; // I was summoned to leave during the receive
+ bus.pushThread("UDP packet received");
+ if ( !keeprunning ) break; // I was summoned to leave during the receive
String msg = new String(buf,0,packet.getLength());
- String remotehostname=null;
- try {
- remotehost = packet.getAddress();
- remotehostname = remotehost.getHostName();
- Matcher m = recoucou.matcher(msg);
- if (!m.matches()) {
- System.err.println("Ignoring bad format broadcast from "+ remotehostname+":"+packet.getPort());
- continue;
- }
- int version = Integer.parseInt(m.group(1));
- if ( version < Protocol.PROTOCOLMINIMUM ) {
- System.err.println("Ignoring bad format broadcast from "+
- remotehostname+":"+packet.getPort()
- +" protocol version "+remotehost+" we need "+Protocol.PROTOCOLMINIMUM+" minimum");
- continue;
- }
- remotePort = Integer.parseInt(m.group(2));
- if (bus.getAppPort()==remotePort) { // if (same port number)
- if (busWatcherId!=null) {
- traceDebug("there's an appId: "+m.group(3));
- String otherId=m.group(3);
- String otherName=m.group(4);
- if (busWatcherId.compareTo(otherId)==0) {
- // same port #, same bus Id, It's me, I'm outta here
- traceDebug("ignoring my own broadcast");
- bus.setStarting(false);
- continue;
- } else {
- // same port #, different bus Id, it's another agent
- // implementing the Oh Soooo Cool watcherId undocumented
- // unprotocolar Ivy add on
- traceDebug("accepting a broadcast from a same port by "+otherName);
- }
- } else {
- // there's no watcherId in the broacast. I fall back to a
- // crude strategy: I ignore the first broadcast with the same
- // port number, and accept the following ones
- if (alreadyIgnored) {
- traceDebug("received another broadcast from "+ remotehostname+":"+packet.getPort()
- +" on my port number ("+remotePort+"), it's probably someone else");
- } else {
- alreadyIgnored=true;
- traceDebug("ignoring a broadcast from "+ remotehostname+":"+packet.getPort()
- +" on my port number ("+remotePort+"), it's probably me");
- continue;
- }
- }
- } // end if (same port #)
- traceDebug("broadcast accepted from " +remotehostname
- +":"+packet.getPort()+", port:"+remotePort+", protocol version:"+version);
- if (!alreadyBroadcasted(remotehost.toString(),remotePort)) {
- traceDebug("no known agent originating from " + remotehost + ":" + remotePort);
- try {
- Socket s = new Socket(remotehost,remotePort);
- s.setReceiveBufferSize(bus.getBufferSize());
- //System.out.println("MY DEBUG - buffer size="+s.getReceiveBufferSize());
- s.setTcpNoDelay(true);
- bus.createIvyClient(s,remotePort,false);
- } catch ( java.net.ConnectException jnc ) {
- traceDebug("cannot connect to "+remotehostname+":"+remotePort+", he probably stopped his bus");
- }
- } else {
- traceDebug("there is already a request originating from " + remotehost + ":" + remotePort);
- }
- } catch (NumberFormatException nfe) {
- System.err.println("Ignoring bad format broadcast from "+remotehostname);
- continue;
- } catch ( UnknownHostException e ) {
- System.err.println("Unkonwn host "+remotehost +","+e.getMessage());
- } catch ( IOException e) {
- System.err.println("can't connect to "+remotehost+" port "+ remotePort+e.getMessage());
- e.printStackTrace();
- }
+ boolean b = (parsePacket(packet, msg));
+ bus.popThread("UDP packet processed");
+ if (b) continue; else break;
} catch (InterruptedIOException jii ){
- if (thisThread!=listenThread) { break ;}
+ // System.out.println("WTF UDP packet interrupted");
+ // another thread took place, not important
+ if ( !keeprunning ) { break ;}
}
} // while
} catch (java.net.SocketException se ){
- if (thisThread==listenThread) {
+ if ( keeprunning ) {
traceDebug("socket exception, continuing anyway on other Ivy domains "+se);
}
} catch (IOException ioe ){
@@ -235,9 +168,99 @@ class IvyWatcher extends Thread {
traceDebug("Thread stopped"); // THREADDEBUG
}
- @Override public void interrupt(){
- super.interrupt();
- broadcast.close();
+ /**
+ * parses the content of a received packet.
+ * @return true if the watcher can keep looping (continue), false if there's
+ * a big problem (break).
+ *
+ * first checks if the message structure is correct
+ * then checks if it's our own broadcasts
+ * then checks if there's already a concurrent connexion in progress
+ * if it's ok, creates a new IvyClient
+ *
+ */
+ private boolean parsePacket(DatagramPacket packet, String msg) {
+ // System.out.println("TODO Y - parse In");
+ int remotePort=0;
+ InetAddress remotehost = null;
+ String remotehostname = null;
+ try {
+ remotehost=packet.getAddress();
+ remotehostname=remotehost.getHostName();
+ Matcher m = recoucou.matcher(msg);
+ // is it a correct broadcast packet ?
+ if (!m.matches()) {
+ System.err.println("Ignoring bad format broadcast from "+ remotehostname+":"+packet.getPort());
+ return true;
+ }
+ // is it the correct protocol version ?
+ int version = Integer.parseInt(m.group(1));
+ if ( version < Protocol.PROTOCOLMINIMUM ) {
+ System.err.println("Ignoring bad format broadcast from "+
+ remotehostname+":"+packet.getPort()
+ +" protocol version "+remotehost+" we need "+Protocol.PROTOCOLMINIMUM+" minimum");
+ return true;
+ }
+ // is it my own broadcast ?
+ remotePort = Integer.parseInt(m.group(2));
+ if (bus.getAppPort()==remotePort) { // if (same port number)
+ if (busWatcherId!=null) {
+ traceDebug("there's an appId: "+m.group(3));
+ String otherId=m.group(3);
+ String otherName=m.group(4);
+ if (busWatcherId.compareTo(otherId)==0) {
+ // same port #, same bus Id, It's me, I'm outta here
+ traceDebug("ignoring my own broadcast");
+ return true;
+ } else {
+ // same port #, different bus Id, it's another agent
+ // implementing the Oh Soooo Cool watcherId undocumented
+ // unprotocolar Ivy add on
+ traceDebug("accepting a broadcast from a same port by "+otherName);
+ }
+ } else {
+ // there's no watcherId in the broacast. I fall back to a
+ // crude strategy: I ignore the first broadcast with the same
+ // port number, and accept the following ones
+ if (alreadyIgnored) {
+ traceDebug("received another broadcast from "+ remotehostname+":"+packet.getPort()
+ +" on my port number ("+remotePort+"), it's probably someone else");
+ } else {
+ alreadyIgnored=true;
+ traceDebug("ignoring a broadcast from "+ remotehostname+":"+packet.getPort()
+ +" on my port number ("+remotePort+"), it's probably me");
+ return true;
+ }
+ }
+ } // end if (same port #)
+
+ // it's definitively not me, let's shake hands !
+ traceDebug("broadcast accepted from " +remotehostname
+ +":"+packet.getPort()+", port:"+remotePort+", protocol version:"+version);
+
+ if (!alreadyBroadcasted(remotehost.toString(),remotePort)) {
+ traceDebug("no known agent originating from " + remotehost + ":" + remotePort);
+ try {
+ Socket s = new Socket(remotehost,remotePort);
+ s.setReceiveBufferSize(bus.getBufferSize());
+ s.setTcpNoDelay(true);
+ if (!bus.createIvyClient(s,remotePort,false)) return false ;
+ } catch ( java.net.ConnectException jnc ) {
+ traceDebug("cannot connect to "+remotehostname+":"+remotePort+", he probably stopped his bus");
+ }
+ } else {
+ traceDebug("there is already a request originating from " + remotehost + ":" + remotePort);
+ }
+ } catch (NumberFormatException nfe) {
+ System.err.println("Ignoring bad format broadcast from "+remotehostname);
+ return true;
+ } catch ( UnknownHostException e ) {
+ System.err.println("Unkonwn host "+remotehost +","+e.getMessage());
+ } catch ( IOException e) {
+ System.err.println("can't connect to "+remotehost+" port "+ remotePort+e.getMessage());
+ e.printStackTrace();
+ }
+ return true;
}
/**
@@ -245,13 +268,9 @@ class IvyWatcher extends Thread {
*/
synchronized void doStop() {
traceDebug("begining stopping");
- bus.setStarting(true);
- Thread t = listenThread;
- listenThread=null;
- interrupt();
+ keeprunning = false;
+ if (listenThread != null) listenThread.interrupt();
broadcast.close();
- if (t!=null) { t.interrupt(); } // it might not even have been created
- bus.setStarting(false);
traceDebug("stopped");
}
@@ -261,16 +280,18 @@ class IvyWatcher extends Thread {
DatagramPacket packet;
String data;
Ivy bus;
+
public PacketSender(String data, Ivy b) {
this.data=data;
bus = b;
packet=new DatagramPacket(data.getBytes(),data.length(),group,port);
- Thread t = new Thread((PacketSender.this));
- t.setName("Ivy Packet sender");
- t.start();
+ bus.getPool().execute( PacketSender.this );
}
+
public void run() {
+ bus.pushThread("packet sender started");
traceDebug("PacketSender thread started"); // THREADDEBUG
+ Thread.currentThread().setName("Ivy Packet sender");
try {
broadcast.send(packet);
} catch (InterruptedIOException e) {
@@ -279,15 +300,13 @@ class IvyWatcher extends Thread {
e.printStackTrace();
traceDebug("IO interrupted during the broadcast. Do nothing");
} catch ( IOException e ) {
- if (listenThread!=null) {
- System.out.println("Broadcast Error " + e.getMessage()+" continuing anyway");
- // cannot throw new IvyException in a run ...
- e.printStackTrace();
- }
+ System.out.println("Broadcast Error " + e.getMessage()+" continuing anyway");
+ // cannot throw new IvyException in a run ...
+ e.printStackTrace();
}
try { Thread.sleep(100); } catch (InterruptedException ie ){ }
- bus.setStarting(false); // one of the senders has finished its work, plus extra time
traceDebug("PacketSender thread stopped"); // THREADDEBUG
+ bus.popThread("packet sender finished"); // one of the senders has finished its work, plus extra time
}
}
@@ -295,8 +314,8 @@ class IvyWatcher extends Thread {
// String hello = Ivy.PROTOCOLVERSION + " " + bus.getAppPort() + "\n";
String hello = Protocol.PROTOCOLVERSION + " " + bus.getAppPort() + " "+busWatcherId+" "+bus.getSelfIvyClient().getApplicationName()+"\n";
if (broadcast==null) throw new IvyException("IvyWatcher PacketSender null broadcast address");
+ bus.getPool().execute(this);
new PacketSender(hello,bus); // notifies our arrival on each domain: protocol version + port
- listenThread.start();
}
/*
diff --git a/src/Probe.java b/src/Probe.java
index f362b38..a3816f6 100644
--- a/src/Probe.java
+++ b/src/Probe.java
@@ -9,6 +9,8 @@
* Changelog:
* 1.2.16
* - now uses the synchronized wrappers of the Java API for all collections
+ * - gets read of Thread mumbo jumbo, and no more System.exit() on die, we
+ * prefer System.in.close() !
* 1.2.14
* - uses the "new" for: loop construct of Java5
* - throws RuntimeException instead of System.exit(), allows code reuse
@@ -67,7 +69,7 @@ import java.util.*;
import gnu.getopt.Getopt;
import java.util.regex.*;
-public class Probe implements IvyApplicationListener, IvyMessageListener, IvyBindListener, Runnable {
+public class Probe implements IvyApplicationListener, IvyMessageListener, IvyBindListener {
public static final String helpCommands = "Available commands:\n"+
".die CLIENT\t\t\t* sends a die message\n"+
@@ -146,7 +148,6 @@ public class Probe implements IvyApplicationListener, IvyMessageListener, IvyBin
}
private BufferedReader in;
- private volatile Thread looperThread;
private Ivy bus;
private boolean timestamp,quiet,debug,exitOnDie=false, encore=true;
private static Pattern directMsgRE, timeCountRE;
@@ -169,25 +170,18 @@ public class Probe implements IvyApplicationListener, IvyMessageListener, IvyBin
this.debug = debug;
}
- public void start(Ivy bus) throws IvyException {
- if (looperThread!=null) throw new IvyException("Probe already started");
- this.bus=bus;
- bus.addApplicationListener(this);
- looperThread=new Thread(this);
- looperThread.setName("Ivy Probe looper thread on readline");
- looperThread.start();
- }
public void setExitOnDie(boolean b) { exitOnDie=b; }
- public void run() {
- traceDebug("Probe Thread started");
- Thread thisThread=Thread.currentThread();
+ public void start(Ivy bus) throws IvyException {
+ this.bus=bus;
+ bus.addApplicationListener(this);
+ traceDebug("Probe Loop started");
String s;
SelfIvyClient sic = bus.getSelfIvyClient();
println(sic.getApplicationName()+ " ready, type .help and return to get help");
// "infinite" loop on keyboard input
- while (encore && looperThread==thisThread) {
+ while ( encore ) {
try {
s=in.readLine();
if (s==null) break;
@@ -202,7 +196,7 @@ public class Probe implements IvyApplicationListener, IvyMessageListener, IvyBin
}
println("End of input. Good bye !");
bus.stop();
- traceDebug("Probe Thread stopped");
+ traceDebug("Probe Loop stopped");
}
boolean parseCommand(String s) throws IOException {
@@ -351,7 +345,15 @@ public class Probe implements IvyApplicationListener, IvyMessageListener, IvyBin
public void die(IvyClient client, int id,String msgarg) {
println("received die msg from " + client.getApplicationName() +" with the message: "+msgarg+", good bye");
/* I cannot stop the readLine(), because it is native code ?! */
- if (exitOnDie) System.exit(0);
+ if (exitOnDie) {
+ encore = false;
+ try {
+ System.in.close();
+ } catch (IOException ie) {
+ // perfect !
+ }
+ //System.exit(0);
+ }
}
public void directMessage(IvyClient client, int id, String arg) {
diff --git a/src/Protocol.java b/src/Protocol.java
index 9751661..ddd0161 100644
--- a/src/Protocol.java
+++ b/src/Protocol.java
@@ -13,17 +13,17 @@ package fr.dgac.ivy;
enum Protocol {
- BYE(0), /* end of the peer */
- ADDREGEXP(1), /* the peer adds a regexp */
- MSG(2), /* the peer sends a message */
- ERROR(3), /* error message */
- DELREGEXP(4), /* the peer removes one of his regex */
- ENDREGEXP(5), /* no more regexp in the handshake */
- SCHIZOTOKEN(6), /* avoid race condition in concurrent connexions, aka BeginRegexp in other implementations */
- DIRECTMSG(7), /* the peer sends a direct message */
- DIE(8), /* the peer wants us to quit */
- PING(9),
- PONG(10);
+ BYE(0), /* end of the peer */
+ ADDREGEXP(1), /* the peer adds a regexp */
+ MSG(2), /* the peer sends a message */
+ ERROR(3), /* error message */
+ DELREGEXP(4), /* the peer removes one of his regex */
+ ENDREGEXP(5), /* no more regexp in the handshake */
+ SCHIZOTOKEN(6), /* avoid race condition in concurrent connexions, aka BeginRegexp in other implementations */
+ DIRECTMSG(7), /* the peer sends a direct message */
+ DIE(8), /* the peer wants us to quit */
+ PING(9),
+ PONG(10);
final static char STARTARG = '\u0002';/* begin of arguments */
final static char ENDARG = '\u0003'; /* end of arguments */
@@ -50,5 +50,7 @@ enum Protocol {
if (p.value() == i) return p;
throw new IvyException("protocol magic number "+i+" not known");
}
-
+
+ @Override public String toString() { return ""+value; }
+
}
diff --git a/src/ProxyClient.java b/src/ProxyClient.java
index 92f2c75..e6e8606 100644
--- a/src/ProxyClient.java
+++ b/src/ProxyClient.java
@@ -176,7 +176,7 @@ class ProxyClient extends Ivy {
* protocol and forward everything to the proxies.
* TODO: remember everything in case a new proxy client comes ?
*/
- protected IvyClient createIvyClient(Socket s,int port, boolean domachin) throws IOException {
+ protected boolean createIvyClient(Socket s,int port, boolean domachin) throws IOException {
IvyClient i;
// TODO si c'est un puppet, je ne dois pas creer de Ghost
// voir meme me deconnecter du biniou ?
@@ -185,8 +185,8 @@ class ProxyClient extends Ivy {
// this new Ivy agent is in fact one of my puppets ...
System.out.println("not Ghosting this (probable) Puppet Ivy agent");
i= new IvyClient(this,s,port,domachin);
- i.start();
- return i;
+ p.bus.getPool().execute(i);
+ return true;
}
}
String key = getWBUId();
@@ -196,7 +196,7 @@ class ProxyClient extends Ivy {
while ((ghostId=id.get(key))==null) { Thread.sleep(200); }
Ghost g = new Ghost(this,s,port,domachin,ghostId,this);
ghosts.put(ghostId,g);
- return g;
+ return true;
} catch (InterruptedException ie) { ie.printStackTrace(); }
System.out.println("error waiting");
throw new RuntimeException();
diff --git a/src/Puppet.java b/src/Puppet.java
index c307e01..ee26cb2 100644
--- a/src/Puppet.java
+++ b/src/Puppet.java
@@ -139,8 +139,9 @@ class Puppet {
static class PuppetIvy extends Ivy {
PuppetIvy(String name,String ready,IvyApplicationListener ial){super(name,ready,ial);}
- protected IvyClient createIvyClient(Socket s,int port, boolean domachin) throws IOException {
- return new PuppetIvyClient(PuppetIvy.this,s,port,domachin);
+ protected boolean createIvyClient(Socket s,int port, boolean domachin) throws IOException {
+ new PuppetIvyClient(PuppetIvy.this,s,port,domachin);
+ return true;
}
int getAP() {return getAppPort();}
}
diff --git a/src/SelfIvyClient.java b/src/SelfIvyClient.java
index bd06ee1..2751c92 100644
--- a/src/SelfIvyClient.java
+++ b/src/SelfIvyClient.java
@@ -192,13 +192,16 @@ public class SelfIvyClient extends IvyClient {
this.cb=cb;
this.c=c;
args=a;
- t=new Thread(Runner.this);
- bus.registerThread(t);
- t.setName("Ivy Runner Thread to execute an async callback");
- t.start();
- bus.unRegisterThread(t);
+ //t=new Thread(Runner.this);
+ //bus.registerThread(t);
+ //t.start();
+ bus.getPool().execute(Runner.this);
+ //bus.unRegisterThread(t);
+ }
+ public void run() {
+ Thread.currentThread().setName("Ivy Runner Thread to execute an async callback");
+ cb.receive(c,args);
}
- public void run() { cb.receive(c,args); }
} // class Runner
private void traceDebug(String s){