diff options
author | jestin | 2012-08-23 19:39:32 +0000 |
---|---|---|
committer | jestin | 2012-08-23 19:39:32 +0000 |
commit | e3b0fb9534d783d62245be8d7777f35d34e6d59d (patch) | |
tree | 1216b70c18c478eb1e132c749b4ce4c09ae1b786 /src/Ivy.java | |
parent | e854a58a81ec90e419a4b3effa5a83caac05df90 (diff) | |
download | ivy-java-e3b0fb9534d783d62245be8d7777f35d34e6d59d.zip ivy-java-e3b0fb9534d783d62245be8d7777f35d34e6d59d.tar.gz ivy-java-e3b0fb9534d783d62245be8d7777f35d34e6d59d.tar.bz2 ivy-java-e3b0fb9534d783d62245be8d7777f35d34e6d59d.tar.xz |
Diffstat (limited to 'src/Ivy.java')
-rwxr-xr-x | src/Ivy.java | 146 |
1 files changed, 78 insertions, 68 deletions
diff --git a/src/Ivy.java b/src/Ivy.java index f894ee4..4c6f8ff 100755 --- a/src/Ivy.java +++ b/src/Ivy.java @@ -16,6 +16,8 @@ * * CHANGELOG: * 1.2.16 + * - uses a ThreadPoolExecutor + * - sendMsg goes synchronized * - API break: getIvyClients now returns a Collection, instead of a Vector * - fixes a concurent exception in the stop() method (no more * removeClient , triggered in the SendNow test) @@ -142,6 +144,9 @@ import java.util.HashMap; import java.util.ArrayList; import java.util.Properties; import java.util.StringTokenizer; +import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; public class Ivy implements Runnable { @@ -157,7 +162,7 @@ public class Ivy implements Runnable { * the library version, useful for development purposes only, when java is * invoked with -DIVY_DEBUG */ - private static final String LIBVERSION ="1.2.16"; + private static final String LIBVERSION ="1.2.17"; /* * private fields @@ -176,17 +181,22 @@ public class Ivy implements Runnable { private boolean debug; private ServerSocket app; private Collection<IvyWatcher> watchers = new ArrayList<IvyWatcher>(); - private volatile Thread serverThread; // to ensure quick communication of the end + private volatile boolean keeprunning = false ; + private Thread serverThread = null ; + private Map<Integer,IvyClient> clients = Collections.synchronizedMap(new HashMap<Integer,IvyClient>()); private Map<Integer,IvyClient> half = Collections.synchronizedMap(new HashMap<Integer,IvyClient>()); + private Vector<IvyApplicationListener> ivyApplicationListenerList = new Vector<IvyApplicationListener>(); private Vector<IvyBindListener> ivyBindListenerList = new Vector<IvyBindListener>(); private Vector<Thread> sendThreads = new Vector<Thread>(); private String[] filter = null; private boolean stopped = true; - private boolean starting = false; + // private boolean starting = false; + private volatile int nbThreads = 0; protected Object readyToSend = new Object(); private boolean doSendToSelf = false; + private ExecutorService pool = null; // FIXME should not be static ? (findbugs) private static int serial = 0; @@ -332,7 +342,7 @@ public class Ivy implements Runnable { public final void start(final String domainbus) throws IvyException { if (!stopped) throw new IvyException("cannot start a bus that's already started"); - setStarting(true); // this will remain true entil one of the PacketSenders has finished + pool = Executors.newCachedThreadPool(); stopped=false; String db = domainbus; if (db == null) db = Domain.getDomain(null); @@ -376,20 +386,19 @@ public class Ivy implements Runnable { watcherId = getWBUId().replace(' ' , '*'); // no space in the watcherId // readies the rendezvous : an IvyWatcher (thread) per domain bus for (Domain dom: d) watchers.add(new IvyWatcher(this , dom.getDomainaddr() , dom.getPort())); - serverThread = new Thread(this); - serverThread.setName("Ivy TCP server Thread"); - serverThread.setDaemon(true); - serverThread.start(); + keeprunning = true ; + pool.execute(this); + // sends the broadcasts and listen to incoming connexions for (IvyWatcher iw: watchers) iw.doStart(); } - private void waitForRemote(String s) { try { - while (starting==true) { + while (nbThreads > 0) { + traceDebug("I'm waiting before "+s+", a connecting tread is in progress"); Thread.sleep(GRACEDELAY); - traceDebug("I'm waiting before "+s+", a starting tread is in progress"); + traceDebug("I'm done waiting before "+s); } } catch (InterruptedException ie) { // should not happen, and it's not a problem anyway @@ -403,34 +412,22 @@ public class Ivy implements Runnable { waitForRemote("stopping"); if (stopped) return; stopped = true; - serverThread = null; + keeprunning = false; traceDebug("beginning stopping"); try { - // stopping the serverThread - Thread t = serverThread; - if (t != null) { - t.interrupt(); // The serverThread might be stopped even before having been created - // System.out.println("IZZZ joining " + t); - try { t.join(); } catch ( InterruptedException ie ) { - ie.printStackTrace(); - } - } synchronized (lockApp) { app.close(); } + // stopping the IvyWatchers for (IvyWatcher iw: watchers) iw.doStop(); watchers.clear(); // stopping the remaining IvyClients synchronized (clients) { - for (IvyClient c : clients.values()) { - if (c != null) { - c.close(true); - // removeClient(c); // useless ? - } - } + for (IvyClient c : clients.values()) if (c != null) c.close(true); } } catch (IOException e) { traceDebug("IOexception Stop "); } + pool.shutdown(); traceDebug("end stopping"); } @@ -483,8 +480,10 @@ public class Ivy implements Runnable { * @param message A String which will be compared to the regular * expressions of the different clients * @return returns the number of messages actually sent + * + * since 1.2.16 goes synchronized to avoid concurrent access */ - public final int sendMsg(final String message) throws IvyException { + synchronized public final int sendMsg(final String message) throws IvyException { int count = 0; waitForRemote("sending"); synchronized (lock) { @@ -811,6 +810,12 @@ public class Ivy implements Runnable { public final String getWBUId() { return "ID<" + appName + myserial + ":" + nextId() + ":" + generator.nextInt() + ">"; } + + + @Override public String toString() { + return "bus <"+appName+">[port:"+applicationPort+",serial:"+myserial+"]"; + } + private synchronized long nextId() { return current++; } /////////////////////////////////////////////////////////////////: @@ -819,11 +824,19 @@ public class Ivy implements Runnable { // /////////////////////////////////////////////////////////////////: - protected IvyClient createIvyClient(Socket s , int port, boolean domachin) throws IOException { - setStarting(true); // this one will stop when the client has finished starting + /** + * @return false if the client has not been created, true otherwise + */ + protected boolean createIvyClient(Socket s , int port, boolean domachin) throws IOException { IvyClient i = new IvyClient(this , s , port , domachin); - i.doStart(); - return i; + try { + pool.execute(i); + } catch (RejectedExecutionException ree) { + // in another thread, the pool is shut down + traceDebug("in another thread, the pool is shut down"); + return false; + } + return true; } @@ -846,7 +859,6 @@ public class Ivy implements Runnable { synchronized (clients) { clients.put(c.getClientKey() , c); } - setStarting(false); traceDebug("added " + c + " in clients: " + getClientNames(clients)); } else { traceDebug("not adding "+c+" in clients, double connexion detected, removing lowest one"); @@ -860,25 +872,21 @@ public class Ivy implements Runnable { } protected synchronized void addHalf(IvyClient c) { - //synchronized(lock){ - synchronized (half) { half.put(c.getClientKey() , c); } - //} + synchronized (half) { half.put(c.getClientKey() , c); } traceDebug("added " + c + " in half: " + getClientNames(half)); } protected synchronized void removeHalf(IvyClient c) { - //synchronized(lock) { - if (half == null||c == null) return; - synchronized (half) { - half.remove(c.getClientKey()); - } - //} + if (half == null||c == null) return; + synchronized (half) { + half.remove(c.getClientKey()); + } traceDebug("removed " + c + " from half: " + getClientNames(half)); } private synchronized IvyClient searchPeer(IvyClient ic) { synchronized (clients) { - for (IvyClient peer : clients.values()) if ((peer != null)&&(peer.equals(ic))) return peer; + for (IvyClient peer : clients.values()) if ((peer != null)&&(peer.myEquals(ic))) return peer; } return null; } @@ -888,26 +896,29 @@ public class Ivy implements Runnable { */ public void run() { traceDebug("service thread started"); // THREADDEBUG - Thread thisThread = Thread.currentThread(); + serverThread = Thread.currentThread(); + serverThread.setName("Ivy TCP server Thread"); + //serverThread.setDaemon(true); Socket socket = null; - while ( thisThread == serverThread ){ + while ( keeprunning ){ try { synchronized (this) { //System.out.println("DEBUG stopped: "+stopped); - if ((thisThread != serverThread)||stopped) break; // early disconnexion + if ( (!keeprunning) || stopped ) break; // early disconnexion } synchronized (lockApp) { socket = app.accept(); // TODO I can't synchronize on (this) in the run } synchronized (this) { - if ((thisThread != serverThread)||stopped) break; // early disconnexion - createIvyClient(socket , 0 , true); // the peer called me + if ( (!keeprunning) || stopped ) break; // early disconnexion + // the peer called me + if ( ! createIvyClient(socket , 0 , true) ) break; } } catch (InterruptedIOException ie) { // traceDebug("server socket was interrupted. good"); - if (thisThread != serverThread) break; + if ( !keeprunning ) break; } catch( IOException e ) { - if (serverThread == thisThread) { + if ( keeprunning ) { traceDebug("Error IvyServer exception: " + e.getMessage()); System.out.println("Ivy server socket reader caught an exception " + e.getMessage()); System.out.println("this is probably a bug in your JVM ! (e.g. blackdown jdk1.1.8 linux)"); @@ -920,34 +931,33 @@ public class Ivy implements Runnable { traceDebug("service thread stopped"); // THREADDEBUG } - protected String getAppName() { return appName; } - protected int getAppPort() { return applicationPort; } - protected String getReadyMessage() { return ready_message; } - protected boolean getProtectNewlines() { return doProtectNewlines; } + String getAppName() { return appName; } + int getAppPort() { return applicationPort; } + String getReadyMessage() { return ready_message; } + boolean getProtectNewlines() { return doProtectNewlines; } + String getWatcherId() { return watcherId; } + int getBufferSize() { return bufferSize; } + int getSerial() { return myserial; } + ExecutorService getPool() { return pool; } - protected void setStarting(boolean s) { + protected void pushThread(String reason) { synchronized(readyToSend) { - traceDebug("setStarting "+s); - starting = s; + nbThreads++ ; + //System.out.println("DEBUG PUSH "+this+" -- threads: "+nbThreads + "; reason: "+reason); } } - protected String getWatcherId() { return watcherId; } - protected int getBufferSize() { return bufferSize; } + protected void popThread(String reason) { + synchronized(readyToSend) { + nbThreads-- ; + //System.out.println("DEBUG POP "+this+" -- threads: "+nbThreads + "reason: "+reason); + } + } - protected int getSerial() { return myserial; } private void traceDebug(String s){ if (debug) System.out.println("-->Ivy[" + myserial + "]<-- " + s); } - // stuff to guarantee that all the treads have left - synchronized void registerThread(Thread t) { sendThreads.addElement(t); } - synchronized void unRegisterThread(Thread t) { sendThreads.removeElement(t); } - synchronized Thread getOneThread() { - if (sendThreads.size() == 0) return null; - return (Thread) sendThreads.firstElement(); - } - // a small private method for debbugging purposes private String getClientNames(Map<Integer , IvyClient> t) { StringBuffer s = new StringBuffer(); |