aboutsummaryrefslogtreecommitdiff
path: root/src/Ivy.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/Ivy.java')
-rwxr-xr-xsrc/Ivy.java146
1 files changed, 78 insertions, 68 deletions
diff --git a/src/Ivy.java b/src/Ivy.java
index f894ee4..4c6f8ff 100755
--- a/src/Ivy.java
+++ b/src/Ivy.java
@@ -16,6 +16,8 @@
*
* CHANGELOG:
* 1.2.16
+ * - uses a ThreadPoolExecutor
+ * - sendMsg goes synchronized
* - API break: getIvyClients now returns a Collection, instead of a Vector
* - fixes a concurent exception in the stop() method (no more
* removeClient , triggered in the SendNow test)
@@ -142,6 +144,9 @@ import java.util.HashMap;
import java.util.ArrayList;
import java.util.Properties;
import java.util.StringTokenizer;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
public class Ivy implements Runnable {
@@ -157,7 +162,7 @@ public class Ivy implements Runnable {
* the library version, useful for development purposes only, when java is
* invoked with -DIVY_DEBUG
*/
- private static final String LIBVERSION ="1.2.16";
+ private static final String LIBVERSION ="1.2.17";
/*
* private fields
@@ -176,17 +181,22 @@ public class Ivy implements Runnable {
private boolean debug;
private ServerSocket app;
private Collection<IvyWatcher> watchers = new ArrayList<IvyWatcher>();
- private volatile Thread serverThread; // to ensure quick communication of the end
+ private volatile boolean keeprunning = false ;
+ private Thread serverThread = null ;
+
private Map<Integer,IvyClient> clients = Collections.synchronizedMap(new HashMap<Integer,IvyClient>());
private Map<Integer,IvyClient> half = Collections.synchronizedMap(new HashMap<Integer,IvyClient>());
+
private Vector<IvyApplicationListener> ivyApplicationListenerList = new Vector<IvyApplicationListener>();
private Vector<IvyBindListener> ivyBindListenerList = new Vector<IvyBindListener>();
private Vector<Thread> sendThreads = new Vector<Thread>();
private String[] filter = null;
private boolean stopped = true;
- private boolean starting = false;
+ // private boolean starting = false;
+ private volatile int nbThreads = 0;
protected Object readyToSend = new Object();
private boolean doSendToSelf = false;
+ private ExecutorService pool = null;
// FIXME should not be static ? (findbugs)
private static int serial = 0;
@@ -332,7 +342,7 @@ public class Ivy implements Runnable {
public final void start(final String domainbus) throws IvyException {
if (!stopped) throw new IvyException("cannot start a bus that's already started");
- setStarting(true); // this will remain true entil one of the PacketSenders has finished
+ pool = Executors.newCachedThreadPool();
stopped=false;
String db = domainbus;
if (db == null) db = Domain.getDomain(null);
@@ -376,20 +386,19 @@ public class Ivy implements Runnable {
watcherId = getWBUId().replace(' ' , '*'); // no space in the watcherId
// readies the rendezvous : an IvyWatcher (thread) per domain bus
for (Domain dom: d) watchers.add(new IvyWatcher(this , dom.getDomainaddr() , dom.getPort()));
- serverThread = new Thread(this);
- serverThread.setName("Ivy TCP server Thread");
- serverThread.setDaemon(true);
- serverThread.start();
+ keeprunning = true ;
+ pool.execute(this);
+
// sends the broadcasts and listen to incoming connexions
for (IvyWatcher iw: watchers) iw.doStart();
}
-
private void waitForRemote(String s) {
try {
- while (starting==true) {
+ while (nbThreads > 0) {
+ traceDebug("I'm waiting before "+s+", a connecting tread is in progress");
Thread.sleep(GRACEDELAY);
- traceDebug("I'm waiting before "+s+", a starting tread is in progress");
+ traceDebug("I'm done waiting before "+s);
}
} catch (InterruptedException ie) {
// should not happen, and it's not a problem anyway
@@ -403,34 +412,22 @@ public class Ivy implements Runnable {
waitForRemote("stopping");
if (stopped) return;
stopped = true;
- serverThread = null;
+ keeprunning = false;
traceDebug("beginning stopping");
try {
- // stopping the serverThread
- Thread t = serverThread;
- if (t != null) {
- t.interrupt(); // The serverThread might be stopped even before having been created
- // System.out.println("IZZZ joining " + t);
- try { t.join(); } catch ( InterruptedException ie ) {
- ie.printStackTrace();
- }
- }
synchronized (lockApp) { app.close(); }
+
// stopping the IvyWatchers
for (IvyWatcher iw: watchers) iw.doStop();
watchers.clear();
// stopping the remaining IvyClients
synchronized (clients) {
- for (IvyClient c : clients.values()) {
- if (c != null) {
- c.close(true);
- // removeClient(c); // useless ?
- }
- }
+ for (IvyClient c : clients.values()) if (c != null) c.close(true);
}
} catch (IOException e) {
traceDebug("IOexception Stop ");
}
+ pool.shutdown();
traceDebug("end stopping");
}
@@ -483,8 +480,10 @@ public class Ivy implements Runnable {
* @param message A String which will be compared to the regular
* expressions of the different clients
* @return returns the number of messages actually sent
+ *
+ * since 1.2.16 goes synchronized to avoid concurrent access
*/
- public final int sendMsg(final String message) throws IvyException {
+ synchronized public final int sendMsg(final String message) throws IvyException {
int count = 0;
waitForRemote("sending");
synchronized (lock) {
@@ -811,6 +810,12 @@ public class Ivy implements Runnable {
public final String getWBUId() {
return "ID<" + appName + myserial + ":" + nextId() + ":" + generator.nextInt() + ">";
}
+
+
+ @Override public String toString() {
+ return "bus <"+appName+">[port:"+applicationPort+",serial:"+myserial+"]";
+ }
+
private synchronized long nextId() { return current++; }
/////////////////////////////////////////////////////////////////:
@@ -819,11 +824,19 @@ public class Ivy implements Runnable {
//
/////////////////////////////////////////////////////////////////:
- protected IvyClient createIvyClient(Socket s , int port, boolean domachin) throws IOException {
- setStarting(true); // this one will stop when the client has finished starting
+ /**
+ * @return false if the client has not been created, true otherwise
+ */
+ protected boolean createIvyClient(Socket s , int port, boolean domachin) throws IOException {
IvyClient i = new IvyClient(this , s , port , domachin);
- i.doStart();
- return i;
+ try {
+ pool.execute(i);
+ } catch (RejectedExecutionException ree) {
+ // in another thread, the pool is shut down
+ traceDebug("in another thread, the pool is shut down");
+ return false;
+ }
+ return true;
}
@@ -846,7 +859,6 @@ public class Ivy implements Runnable {
synchronized (clients) {
clients.put(c.getClientKey() , c);
}
- setStarting(false);
traceDebug("added " + c + " in clients: " + getClientNames(clients));
} else {
traceDebug("not adding "+c+" in clients, double connexion detected, removing lowest one");
@@ -860,25 +872,21 @@ public class Ivy implements Runnable {
}
protected synchronized void addHalf(IvyClient c) {
- //synchronized(lock){
- synchronized (half) { half.put(c.getClientKey() , c); }
- //}
+ synchronized (half) { half.put(c.getClientKey() , c); }
traceDebug("added " + c + " in half: " + getClientNames(half));
}
protected synchronized void removeHalf(IvyClient c) {
- //synchronized(lock) {
- if (half == null||c == null) return;
- synchronized (half) {
- half.remove(c.getClientKey());
- }
- //}
+ if (half == null||c == null) return;
+ synchronized (half) {
+ half.remove(c.getClientKey());
+ }
traceDebug("removed " + c + " from half: " + getClientNames(half));
}
private synchronized IvyClient searchPeer(IvyClient ic) {
synchronized (clients) {
- for (IvyClient peer : clients.values()) if ((peer != null)&&(peer.equals(ic))) return peer;
+ for (IvyClient peer : clients.values()) if ((peer != null)&&(peer.myEquals(ic))) return peer;
}
return null;
}
@@ -888,26 +896,29 @@ public class Ivy implements Runnable {
*/
public void run() {
traceDebug("service thread started"); // THREADDEBUG
- Thread thisThread = Thread.currentThread();
+ serverThread = Thread.currentThread();
+ serverThread.setName("Ivy TCP server Thread");
+ //serverThread.setDaemon(true);
Socket socket = null;
- while ( thisThread == serverThread ){
+ while ( keeprunning ){
try {
synchronized (this) {
//System.out.println("DEBUG stopped: "+stopped);
- if ((thisThread != serverThread)||stopped) break; // early disconnexion
+ if ( (!keeprunning) || stopped ) break; // early disconnexion
}
synchronized (lockApp) {
socket = app.accept(); // TODO I can't synchronize on (this) in the run
}
synchronized (this) {
- if ((thisThread != serverThread)||stopped) break; // early disconnexion
- createIvyClient(socket , 0 , true); // the peer called me
+ if ( (!keeprunning) || stopped ) break; // early disconnexion
+ // the peer called me
+ if ( ! createIvyClient(socket , 0 , true) ) break;
}
} catch (InterruptedIOException ie) {
// traceDebug("server socket was interrupted. good");
- if (thisThread != serverThread) break;
+ if ( !keeprunning ) break;
} catch( IOException e ) {
- if (serverThread == thisThread) {
+ if ( keeprunning ) {
traceDebug("Error IvyServer exception: " + e.getMessage());
System.out.println("Ivy server socket reader caught an exception " + e.getMessage());
System.out.println("this is probably a bug in your JVM ! (e.g. blackdown jdk1.1.8 linux)");
@@ -920,34 +931,33 @@ public class Ivy implements Runnable {
traceDebug("service thread stopped"); // THREADDEBUG
}
- protected String getAppName() { return appName; }
- protected int getAppPort() { return applicationPort; }
- protected String getReadyMessage() { return ready_message; }
- protected boolean getProtectNewlines() { return doProtectNewlines; }
+ String getAppName() { return appName; }
+ int getAppPort() { return applicationPort; }
+ String getReadyMessage() { return ready_message; }
+ boolean getProtectNewlines() { return doProtectNewlines; }
+ String getWatcherId() { return watcherId; }
+ int getBufferSize() { return bufferSize; }
+ int getSerial() { return myserial; }
+ ExecutorService getPool() { return pool; }
- protected void setStarting(boolean s) {
+ protected void pushThread(String reason) {
synchronized(readyToSend) {
- traceDebug("setStarting "+s);
- starting = s;
+ nbThreads++ ;
+ //System.out.println("DEBUG PUSH "+this+" -- threads: "+nbThreads + "; reason: "+reason);
}
}
- protected String getWatcherId() { return watcherId; }
- protected int getBufferSize() { return bufferSize; }
+ protected void popThread(String reason) {
+ synchronized(readyToSend) {
+ nbThreads-- ;
+ //System.out.println("DEBUG POP "+this+" -- threads: "+nbThreads + "reason: "+reason);
+ }
+ }
- protected int getSerial() { return myserial; }
private void traceDebug(String s){
if (debug) System.out.println("-->Ivy[" + myserial + "]<-- " + s);
}
- // stuff to guarantee that all the treads have left
- synchronized void registerThread(Thread t) { sendThreads.addElement(t); }
- synchronized void unRegisterThread(Thread t) { sendThreads.removeElement(t); }
- synchronized Thread getOneThread() {
- if (sendThreads.size() == 0) return null;
- return (Thread) sendThreads.firstElement();
- }
-
// a small private method for debbugging purposes
private String getClientNames(Map<Integer , IvyClient> t) {
StringBuffer s = new StringBuffer();