diff options
author | jestin | 2012-04-26 15:26:33 +0000 |
---|---|---|
committer | jestin | 2012-04-26 15:26:33 +0000 |
commit | ac8c6c0d9bb921166697e9b13009f9ff83ba9716 (patch) | |
tree | 1661b2885aa4d4be2f3cf26deda479641d6f6630 /src | |
parent | 934dfab1b2e6571f241facfcadc733d25a411024 (diff) | |
download | ivy-java-ac8c6c0d9bb921166697e9b13009f9ff83ba9716.zip ivy-java-ac8c6c0d9bb921166697e9b13009f9ff83ba9716.tar.gz ivy-java-ac8c6c0d9bb921166697e9b13009f9ff83ba9716.tar.bz2 ivy-java-ac8c6c0d9bb921166697e9b13009f9ff83ba9716.tar.xz |
Goes to a set of synchronized collections (Map, HashMap, ...) to try and avoid
multithreaded issues ...
Diffstat (limited to 'src')
-rwxr-xr-x | src/Ivy.java | 95 | ||||
-rwxr-xr-x | src/IvyClient.java | 52 | ||||
-rwxr-xr-x | src/IvyWatcher.java | 22 | ||||
-rw-r--r-- | src/Probe.java | 16 | ||||
-rw-r--r-- | src/SelfIvyClient.java | 63 | ||||
-rw-r--r-- | src/WaiterClient.java | 6 |
6 files changed, 155 insertions, 99 deletions
diff --git a/src/Ivy.java b/src/Ivy.java index 2196321..b727c2a 100755 --- a/src/Ivy.java +++ b/src/Ivy.java @@ -15,6 +15,8 @@ *</pre> * * CHANGELOG: + * 1.2.16 + * - now uses the synchronized wrappers of the Java API for all collections * 1.2.15 * - allows the fine tuning of the IvyClient socket buffersize using * IVY_BUFFERSIZE property @@ -125,9 +127,14 @@ package fr.dgac.ivy; import java.net.*; import java.io.*; -import java.util.*; import gnu.getopt.Getopt; import java.util.regex.*; +import java.util.Vector; +import java.util.Collections; +import java.util.Map; +import java.util.HashMap; +import java.util.Properties; +import java.util.StringTokenizer; public class Ivy implements Runnable { @@ -157,7 +164,7 @@ public class Ivy implements Runnable { * the library version, useful for development purposes only, when java is * invoked with -DIVY_DEBUG */ - public static final String LIBVERSION ="1.2.15"; + public static final String LIBVERSION ="1.2.16"; public static final int TIMEOUTLENGTH = 1000; private String appName; @@ -171,8 +178,8 @@ public class Ivy implements Runnable { private ServerSocket app; private Vector<IvyWatcher> watchers = new Vector<IvyWatcher>(); private volatile Thread serverThread; // to ensure quick communication of the end - private Hashtable<Integer,IvyClient> clients = new Hashtable<Integer,IvyClient>(); - private Hashtable<Integer,IvyClient> half = new Hashtable<Integer,IvyClient>(); + private Map<Integer,IvyClient> clients = Collections.synchronizedMap(new HashMap<Integer,IvyClient>()); + private Map<Integer,IvyClient> half = Collections.synchronizedMap(new HashMap<Integer,IvyClient>()); private Vector<IvyApplicationListener> ivyApplicationListenerList = new Vector<IvyApplicationListener>(); private Vector<IvyBindListener> ivyBindListenerList = new Vector<IvyBindListener>(); private Vector<Thread> sendThreads = new Vector<Thread>(); @@ -265,9 +272,11 @@ public class Ivy implements Runnable { /* * since 1.2.8 */ - protected static IvyClient alreadyThere(final Hashtable<Integer , IvyClient> c , final String name) { - for (IvyClient ic : c.values()) { - if ((ic != null)&&(name.compareTo(ic.getApplicationName()) == 0)) return ic; + protected static IvyClient alreadyThere(final Map<Integer , IvyClient> c , final String name) { + synchronized (c) { + for (IvyClient ic : c.values()) { + if ((ic != null)&&(name.compareTo(ic.getApplicationName()) == 0)) return ic; + } } return null; } @@ -405,10 +414,12 @@ public class Ivy implements Runnable { for (IvyWatcher iw: watchers) iw.doStop(); watchers.clear(); // stopping the remaining IvyClients - for (IvyClient c : clients.values()) { - if (c != null) { - c.close(true); - removeClient(c); + synchronized (clients) { + for (IvyClient c : clients.values()) { + if (c != null) { + c.close(true); + removeClient(c); + } } } } catch (IOException e) { @@ -476,7 +487,9 @@ public class Ivy implements Runnable { if (doProtectNewlines) msg = IvyClient.encode(message); else if ( (msg.indexOf(IvyClient.newLineChar) != -1)||(msg.indexOf(IvyClient.endArgChar) != -1)) throw new IvyException("newline character not allowed in Ivy messages"); - for ( IvyClient client : clients.values()) if (client != null) count += client.sendMsg(msg); + synchronized (clients) { + for ( IvyClient client : clients.values()) if (client != null) count += client.sendMsg(msg); + } if (doSendToSelf) count+=selfIvyClient.sendSelfMsg(msg); traceDebug("end sending "+message+" to "+count+" clients"); } @@ -554,7 +567,9 @@ public class Ivy implements Runnable { // adds the regexp to our collection in selfIvyClient int key = selfIvyClient.bindMsg(sregexp , callback , type); // notifies the other clients this new regexp - for (IvyClient c : clients.values() ) if (c != null) c.sendRegexp(key , sregexp); + synchronized (clients) { + for (IvyClient c : clients.values() ) if (c != null) c.sendRegexp(key , sregexp); + } return key; } @@ -586,7 +601,9 @@ public class Ivy implements Runnable { */ public final void unBindMsg(final int id) throws IvyException { selfIvyClient.unBindMsg(id); - for (IvyClient ic : clients.values() ) if (ic != null) ic.delRegexp(id ); + synchronized (clients) { + for (IvyClient ic : clients.values() ) if (ic != null) ic.delRegexp(id ); + } } /** @@ -751,7 +768,9 @@ public class Ivy implements Runnable { */ public final Vector<IvyClient> getIvyClients() { Vector<IvyClient> v = new Vector<IvyClient>(); - for (IvyClient ic : clients.values() ) if (ic != null) v.addElement(ic); + synchronized (clients) { + for (IvyClient ic : clients.values() ) if (ic != null) v.addElement(ic); + } return v; } @@ -764,9 +783,11 @@ public class Ivy implements Runnable { public final Vector<IvyClient> getIvyClientsByName(final String name) { Vector<IvyClient> v = new Vector<IvyClient>(); String icname; - for (IvyClient ic : clients.values() ) { - if ( (ic == null)||((icname = ic.getApplicationName()) == null) ) break; - if (icname.compareTo(name) == 0) v.addElement(ic); + synchronized (clients) { + for (IvyClient ic : clients.values() ) { + if ( (ic == null)||((icname = ic.getApplicationName()) == null) ) break; + if (icname.compareTo(name) == 0) v.addElement(ic); + } } return v; } @@ -842,7 +863,9 @@ public class Ivy implements Runnable { protected synchronized void removeClient(IvyClient c) { synchronized(lock) { - clients.remove(c.getClientKey()); + synchronized (clients) { + clients.remove(c.getClientKey()); + } traceDebug("removed " + c + " from clients: " + getClientNames(clients)); } } @@ -854,7 +877,9 @@ public class Ivy implements Runnable { // TODO check if it's not already here ! IvyClient peer = searchPeer(c); if ((peer == null) || peer.distanceTo(c)>0 ){ - clients.put(c.getClientKey() , c); + synchronized (clients) { + clients.put(c.getClientKey() , c); + } setStarting(false); traceDebug("added " + c + " in clients: " + getClientNames(clients)); } else { @@ -869,15 +894,19 @@ public class Ivy implements Runnable { } protected synchronized void addHalf(IvyClient c) { - synchronized(lock){ half.put(c.getClientKey() , c); } + //synchronized(lock){ + synchronized (half) { half.put(c.getClientKey() , c); } + //} traceDebug("added " + c + " in half: " + getClientNames(half)); } protected synchronized void removeHalf(IvyClient c) { - synchronized(lock) { + //synchronized(lock) { if (half == null||c == null) return; - half.remove(c.getClientKey()); - } + synchronized (half) { + half.remove(c.getClientKey()); + } + //} traceDebug("removed " + c + " from half: " + getClientNames(half)); } @@ -894,13 +923,11 @@ public class Ivy implements Runnable { */ private synchronized IvyClient searchPeer(IvyClient ic) { - synchronized(lock) { - //for (Enumeration<IvyClient> e = half.elements(); e.hasMoreElements(); ) { -// peer = e.nextElement(); -// if ((peer != null)&&(peer.equals(ic))) return peer; - // } - for (IvyClient peer : clients.values()) if ((peer != null)&&(peer.equals(ic))) return peer; - } + //synchronized(lock) { + synchronized (clients) { + for (IvyClient peer : clients.values()) if ((peer != null)&&(peer.equals(ic))) return peer; + } + //} return null; } @@ -970,10 +997,12 @@ public class Ivy implements Runnable { } // a small private method for debbugging purposes - private String getClientNames(Hashtable<Integer , IvyClient> t) { + private String getClientNames(Map<Integer , IvyClient> t) { StringBuffer s = new StringBuffer(); s.append("("); - for (IvyClient ic : t.values() ) if (ic != null) s.append(ic.getApplicationName() + ","); + synchronized (t) { + for (IvyClient ic : t.values() ) if (ic != null) s.append(ic.getApplicationName() + ","); + } s.append(")"); return s.toString(); } diff --git a/src/IvyClient.java b/src/IvyClient.java index 8ed25f7..17ce224 100755 --- a/src/IvyClient.java +++ b/src/IvyClient.java @@ -10,6 +10,8 @@ * created for each remote client. * * CHANGELOG: + * 1.2.16 + * - now uses the synchronized wrappers of the Java API for all collections * 1.2.14 * - use autoboxing for the creation of Integer (instead of * new Integer(int). This alows caching, avoids object allocation, and the @@ -89,8 +91,12 @@ package fr.dgac.ivy ; import java.lang.Thread; import java.net.*; import java.io.*; -import java.util.*; +import java.util.Map; +import java.util.HashMap; +import java.util.Collections; +import java.util.Vector; import java.util.regex.*; +import java.util.Collection; public class IvyClient extends Thread { @@ -119,7 +125,8 @@ public class IvyClient extends Thread { private static int pingSerial = 0; private static final Object lock = new Object(); private static int clientSerial=0; /* an unique ID for each IvyClient */ - private Hashtable <Integer,PingCallbackHolder>PingCallbacksTable = new Hashtable<Integer,PingCallbackHolder>(); + private Map <Integer,PingCallbackHolder>PingCallbacksTable = + Collections.synchronizedMap(new HashMap<Integer,PingCallbackHolder>()); private Ivy bus; private Socket socket; @@ -133,8 +140,8 @@ public class IvyClient extends Thread { // protected variables String appName="none"; - Hashtable <Integer,Pattern>regexps = new Hashtable<Integer,Pattern>(); - Hashtable <Integer,String>regexpsText = new Hashtable<Integer,String>(); + Map <Integer,Pattern>regexps = Collections.synchronizedMap(new HashMap<Integer,Pattern>()); + Map <Integer,String>regexpsText = Collections.synchronizedMap(new HashMap<Integer,String>()); static boolean debug = (System.getProperty("IVY_DEBUG")!=null) ; // int protocol; private boolean incoming; @@ -175,7 +182,7 @@ public class IvyClient extends Thread { // initiate the connexion private void sendSchizo() throws IOException { traceDebug("sending our service port "+bus.getAppPort()); - Hashtable<Integer,String> tosend=bus.getSelfIvyClient().regexpsText; + Map<Integer,String> tosend=bus.getSelfIvyClient().regexpsText; sendString(SchizoToken,bus.getAppPort(),bus.getAppName()); for (Integer ikey : tosend.keySet()) sendRegexp(ikey.intValue(),tosend.get(ikey)); sendString( EndRegexp,0,""); @@ -207,7 +214,7 @@ public class IvyClient extends Thread { * The content is not modifyable because String are not mutable, and cannot * be modified once they are create. */ - public Enumeration<String> getRegexps() { return regexpsText.elements(); } + public Collection<String> getRegexps() { return regexpsText.values(); } /** * allow an Ivy package class to access the list of regexps at a @@ -217,8 +224,7 @@ public class IvyClient extends Thread { public String[] getRegexpsArray() { String[] s = new String[regexpsText.size()]; int i=0; - for (Enumeration<String>e=getRegexps();e.hasMoreElements();) - s[i++]=e.nextElement(); + for (String sr : getRegexps()) s[i++]=sr; return s; } @@ -628,24 +634,28 @@ public class IvyClient extends Thread { } void PCHadd(int serial,PingCallback pc) { - PingCallbacksTable.put(serial,new PingCallbackHolder(pc)); - if (PingCallbacksTable.size()>MAXPONGCALLBACKS) { - // more than MAXPONGCALLBACKS callbacks, we ought to limit to prevent a - // memory leak - // TODO remove the first - Integer smallest=(Integer)new TreeSet<Integer>(PingCallbacksTable.keySet()).first(); - PingCallbackHolder pch = (PingCallbackHolder)PingCallbacksTable.remove(smallest); - System.err.println("no response from "+getApplicationName()+" to ping "+smallest+" after "+pch.age()+" ms, discarding"); + synchronized (PingCallbacksTable) { + PingCallbacksTable.put(serial,new PingCallbackHolder(pc)); + if (PingCallbacksTable.size()>MAXPONGCALLBACKS) { + // more than MAXPONGCALLBACKS callbacks, we ought to limit to prevent a + // memory leak + // TODO remove the first + Integer smallest=(Integer)new java.util.TreeSet<Integer>(PingCallbacksTable.keySet()).first(); + PingCallbackHolder pch = (PingCallbackHolder)PingCallbacksTable.remove(smallest); + System.err.println("no response from "+getApplicationName()+" to ping "+smallest+" after "+pch.age()+" ms, discarding"); + } } } void PCHget(Integer serial) { - PingCallbackHolder pc = (PingCallbackHolder)PingCallbacksTable.remove(serial); - if (pc==null) { - System.err.println("warning: pong received for a long lost callback"); - return; + synchronized (PingCallbacksTable) { + PingCallbackHolder pc = (PingCallbackHolder)PingCallbacksTable.remove(serial); + if (pc==null) { + System.err.println("warning: pong received for a long lost callback"); + return; + } + pc.run(); } - pc.run(); } private class PingCallbackHolder { diff --git a/src/IvyWatcher.java b/src/IvyWatcher.java index 1e33302..54eca11 100755 --- a/src/IvyWatcher.java +++ b/src/IvyWatcher.java @@ -14,6 +14,8 @@ * thing. * * CHANGELOG: + * 1.2.16 + * - now uses the synchronized wrappers of the Java API for all collections * 1.2.15 * - allows the fine tuning of the IvyClient socket buffersize using * IVY_BUFFERSIZE property @@ -83,7 +85,9 @@ package fr.dgac.ivy ; import java.lang.Thread; import java.net.*; import java.io.*; -import java.util.Hashtable; +import java.util.Collections; +import java.util.Map; +import java.util.HashMap; import java.util.regex.*; class IvyWatcher extends Thread { @@ -295,17 +299,17 @@ class IvyWatcher extends Thread { * checks if there is already a broadcast received from the same address * on the same port * - * regoes static ... */ - //private static Hashtable alreadySocks=new Hashtable(); - private Hashtable<String,Integer> alreadySocks=new Hashtable<String,Integer>(); - private synchronized boolean alreadyBroadcasted(String s,int port) { + private Map<String,Integer> alreadySocks=Collections.synchronizedMap(new HashMap<String,Integer>()); + private boolean alreadyBroadcasted(String s,int port) { // System.out.println("DEBUUUUUUUG " + s+ ":" + port); if (s==null) return false; - Integer i = alreadySocks.get(s); - if (((i!=null)&&(i.compareTo(port))==0)) return true; - alreadySocks.put(s,port); - return false; + synchronized (alreadySocks) { + Integer i = alreadySocks.get(s); + if (((i!=null)&&(i.compareTo(port))==0)) return true; + alreadySocks.put(s,port); + return false; + } } /* diff --git a/src/Probe.java b/src/Probe.java index 4425dd4..c8021c5 100644 --- a/src/Probe.java +++ b/src/Probe.java @@ -7,6 +7,8 @@ * (c) CENA * * Changelog: + * 1.2.16 + * - now uses the synchronized wrappers of the Java API for all collections * 1.2.14 * - uses the "new" for: loop construct of Java5 * - throws RuntimeException instead of System.exit(), allows code reuse @@ -247,9 +249,8 @@ public class Probe implements IvyApplicationListener, IvyMessageListener, IvyBin int total=0; int boundedtotal=0; for (IvyClient ic: v) { - for (Enumeration<String> e = ic.getRegexps();e.hasMoreElements();) { + for (String r : ic.getRegexps()) { total++; - String r = e.nextElement(); if (r.startsWith("^")) boundedtotal++; println(ic.getApplicationName()+" has subscribed to: "+r); } @@ -262,19 +263,16 @@ public class Probe implements IvyApplicationListener, IvyMessageListener, IvyBin Vector<IvyClient>v=bus.getIvyClientsByName(target); if (v.size()==0) println("no Ivy client with the name \""+target+"\""); for (IvyClient ic:v) { - for (Enumeration<String> e = ic.getRegexps();e.hasMoreElements();) { + for (String r : ic.getRegexps()) { total++; - String r = e.nextElement(); if (r.startsWith("^")) boundedtotal++; - println(target+" has subscribed to: "+(String)e.nextElement()); + println(target+" has subscribed to: "+r); } System.out.println("total: "+total+", unbounded:"+(total-boundedtotal)); } } else if (s.lastIndexOf(".bound")>=0){ println("you have subscribed to:"); - for (Enumeration<String> e = bus.getSelfIvyClient().getRegexps();e.hasMoreElements();) { - println("\t"+e.nextElement()); - } + for (String re : bus.getSelfIvyClient().getRegexps()) println("\t"+re); } else if (s.lastIndexOf(".bind ")>=0){ String regexp=s.substring(6); try { @@ -348,8 +346,6 @@ public class Probe implements IvyApplicationListener, IvyMessageListener, IvyBin public void connect(IvyClient client) { println(client.getApplicationName() + " connected " ); - // for (java.util.Enumeration e=client.getRegexps();e.hasMoreElements();) - // println(client.getApplicationName() + " subscribes to " +e.nextElement() ); } public void disconnect(IvyClient client) { diff --git a/src/SelfIvyClient.java b/src/SelfIvyClient.java index a87c859..3e9ba6a 100644 --- a/src/SelfIvyClient.java +++ b/src/SelfIvyClient.java @@ -6,6 +6,8 @@ * @since 1.2.4 * * CHANGELOG: + * 1.2.16 + * - now uses the synchronized wrappers of the Java API for all collections * 1.2.14 * - uses autoboxing for Boolean * - switch from gnu regexp (deprecated) to the built in java regexp @@ -22,15 +24,19 @@ */ package fr.dgac.ivy ; -import java.util.*; +import java.util.Map; +import java.util.HashMap; +import java.util.Collections; import java.util.regex.*; public class SelfIvyClient extends IvyClient { private Ivy bus; private static int serial=0; /* an unique ID for each regexp */ - private Hashtable<Integer,IvyMessageListener> callbacks=new Hashtable<Integer,IvyMessageListener>(); - private Hashtable<Integer,BindType> threadedFlag=new Hashtable<Integer,BindType>(); + private Map<Integer,IvyMessageListener> callbacks= + Collections.synchronizedMap(new HashMap<Integer,IvyMessageListener>()); + private Map<Integer,BindType> threadedFlag= + Collections.synchronizedMap(new HashMap<Integer,BindType>()); public void sendDirectMsg(int id,String message) { bus.directMessage(this,id,message); @@ -43,42 +49,50 @@ public class SelfIvyClient extends IvyClient { this.appName=appName; } - synchronized protected int bindMsg(String sregexp, IvyMessageListener callback, BindType type ) throws IvyException { + protected int bindMsg(String sregexp, IvyMessageListener callback, BindType type ) throws IvyException { // creates a new binding (regexp,callback) try { Pattern re=Pattern.compile(sregexp,Pattern.DOTALL); Integer key = serial++; regexps.put(key,re); regexpsText.put(key,sregexp); - callbacks.put(key,callback); - threadedFlag.put(key,type); // use autoboxing of boolean + synchronized (callbacks) { + callbacks.put(key,callback); + } + synchronized (threadedFlag) { + threadedFlag.put(key,type); // use autoboxing of boolean + } return key.intValue(); } catch (PatternSyntaxException ree) { throw new IvyException("Invalid regexp " + sregexp); } } - synchronized protected void unBindMsg(int id) throws IvyException { + protected synchronized void unBindMsg(int id) throws IvyException { Integer key = id; - if ( ( regexps.remove(key) == null ) - || (regexpsText.remove(key) == null ) - || (callbacks.remove(key) == null ) - || (threadedFlag.remove(key) == null ) - ) - throw new IvyException("client wants to remove an unexistant regexp "+id); + synchronized (regexps) { synchronized (callbacks) { synchronized (threadedFlag) { + if ( ( regexps.remove(key) == null ) + || (regexpsText.remove(key) == null ) + || (callbacks.remove(key) == null ) + || (threadedFlag.remove(key) == null ) + ) + throw new IvyException("client wants to remove an unexistant regexp "+id); + } } } } // unbinds to the first regexp - synchronized protected boolean unBindMsg(String re) { - if (!regexpsText.contains(re)) return false; - for (Integer k : regexpsText.keySet()) { - if ( (regexpsText.get(k)).compareTo(re) == 0) { - try { - bus.unBindMsg(k.intValue()); - } catch (IvyException ie) { - return false; + protected synchronized boolean unBindMsg(String re) { + synchronized (regexpsText) { + if (regexpsText.get(re) == null) return false; + for (Integer k : regexpsText.keySet()) { + if ( (regexpsText.get(k)).compareTo(re) == 0) { + try { + bus.unBindMsg(k.intValue()); + } catch (IvyException ie) { + return false; + } + return true; } - return true; } } return false; @@ -106,7 +120,10 @@ public class SelfIvyClient extends IvyClient { } protected void callCallback(IvyClient client, Integer key, String[] tab) { - IvyMessageListener callback=callbacks.get(key); + IvyMessageListener callback; + synchronized (callbacks) { + callback=callbacks.get(key); + } if (callback==null) { traceDebug("Not regexp matching id "+key.intValue()+", it must have been unsubscribed concurrently"); return; diff --git a/src/WaiterClient.java b/src/WaiterClient.java index 33196f2..70c3f81 100644 --- a/src/WaiterClient.java +++ b/src/WaiterClient.java @@ -8,7 +8,7 @@ */ package fr.dgac.ivy ; -import java.util.Hashtable; +import java.util.Map; class WaiterClient extends IvyApplicationAdapter implements Runnable { private static final int INCREMENT = 100; @@ -17,9 +17,9 @@ class WaiterClient extends IvyApplicationAdapter implements Runnable { private boolean forever=false; private Thread t; String name; - private Hashtable <Integer,IvyClient>clients ; + private Map <Integer,IvyClient>clients ; - WaiterClient(String n,int timeout,Hashtable <Integer,IvyClient>clients) { + WaiterClient(String n,int timeout,Map <Integer,IvyClient>clients) { this.timeout=timeout; this.clients=clients; name=n; |