aboutsummaryrefslogtreecommitdiff
path: root/src/IvyClient.java
diff options
context:
space:
mode:
authorjestin2012-05-12 14:26:08 +0000
committerjestin2012-05-12 14:26:08 +0000
commit4ffe8b84071babe544086f94c66431380d301d59 (patch)
tree12c9c3a4d6f00c071d8cd298f041dc383e887ff7 /src/IvyClient.java
parent6fbefad24ec7e8783365db61b03357d50ee0dd56 (diff)
downloadivy-java-4ffe8b84071babe544086f94c66431380d301d59.zip
ivy-java-4ffe8b84071babe544086f94c66431380d301d59.tar.gz
ivy-java-4ffe8b84071babe544086f94c66431380d301d59.tar.bz2
ivy-java-4ffe8b84071babe544086f94c66431380d301d59.tar.xz
- minor code cleanup
- adds a separate file (Protocol.java) containing the Enum values in a proper pattern - resolved a synchronization bug on Ivy.stop()
Diffstat (limited to 'src/IvyClient.java')
-rwxr-xr-xsrc/IvyClient.java115
1 files changed, 49 insertions, 66 deletions
diff --git a/src/IvyClient.java b/src/IvyClient.java
index 441013f..8e20d2d 100755
--- a/src/IvyClient.java
+++ b/src/IvyClient.java
@@ -1,5 +1,7 @@
/**
- * the peers on the bus.
+ * the local handle to a peer on the bus, a Thread is associated to each
+ * instance, performing the socket handling, the protocol parsing, and the
+ * callback running.
*
* @author Yannick Jestin
* @author <a href="http://www.tls.cena.fr/products/ivy/">http://www.tls.cena.fr/products/ivy/</a>
@@ -91,34 +93,17 @@ package fr.dgac.ivy ;
import java.lang.Thread;
import java.net.*;
import java.io.*;
+import java.util.SortedMap;
+import java.util.TreeMap;
import java.util.Map;
import java.util.HashMap;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.Vector;
import java.util.regex.*;
import java.util.Collection;
public class IvyClient extends Thread {
-
- /* the protocol magic numbers */
- final static int Bye = 0; /* end of the peer */
- final static int AddRegexp = 1;/* the peer adds a regexp */
- final static int Msg = 2 ; /* the peer sends a message */
- final static int Error = 3; /* error message */
- final static int DelRegexp = 4;/* the peer removes one of his regex */
- final static int EndRegexp = 5;/* no more regexp in the handshake */
- final static int SchizoToken = 6; /* avoid race condition in concurrent connexions, aka BeginRegexp in other implementations */
- final static int DirectMsg = 7;/* the peer sends a direct message */
- final static int Die = 8; /* the peer wants us to quit */
- final static int Ping = 9; // from outer space
- final static int Pong = 10;
- final static String MESSAGE_TERMINATOR = "\n"; /* the next protocol will use \r */
- final static String StartArg = "\u0002";/* begin of arguments */
- final static String EndArg = "\u0003"; /* end of arguments */
- final static String escape ="\u001A";
- final static char escapeChar = escape.charAt(0);
- final static char endArgChar = EndArg.charAt(0);
- final static char newLineChar = '\n';
// private variables
private final static int MAXPONGCALLBACKS = 10;
@@ -126,8 +111,8 @@ public class IvyClient extends Thread {
private static int pingSerial = 0;
private static final Object lock = new Object();
private static int clientSerial=0; /* an unique ID for each IvyClient */
- private Map <Integer,PingCallbackHolder>PingCallbacksTable =
- Collections.synchronizedMap(new HashMap<Integer,PingCallbackHolder>());
+ private SortedMap <Integer,PingCallbackHolder>PingCallbacksTable =
+ Collections.synchronizedSortedMap(new TreeMap<Integer,PingCallbackHolder>());
private Ivy bus;
private Socket socket;
@@ -138,6 +123,7 @@ public class IvyClient extends Thread {
private Integer clientKey;
private boolean discCallbackPerformed = false;
private String remoteHostname="unresolved";
+ private static ThreadGroup clientsThreadGroup = new ThreadGroup("Ivy clients threadgroup");
// protected variables
String appName="none";
@@ -167,7 +153,7 @@ public class IvyClient extends Thread {
}
}
remoteHostname = socket.getInetAddress().getHostName();
- clientThread = new Thread(this); // clientThread handles the incoming traffic
+ clientThread = new Thread(clientsThreadGroup, this); // clientThread handles the incoming traffic
clientThread.setName("Ivy client thread to "+remoteHostname+":"+remotePort);
}
@@ -186,10 +172,10 @@ public class IvyClient extends Thread {
traceDebug("sending our service port "+bus.getAppPort());
Map<Integer,String> tosend=bus.getSelfIvyClient().regexpsText;
synchronized (tosend) {
- sendString(SchizoToken,bus.getAppPort(),bus.getAppName());
+ sendString(Protocol.SCHIZOTOKEN,bus.getAppPort(),bus.getAppName());
for (Map.Entry<Integer,String> me : tosend.entrySet())
sendRegexp( me.getKey().intValue() , me.getValue() );
- sendString( EndRegexp,0,"");
+ sendString( Protocol.ENDREGEXP,0,"");
}
}
@@ -219,7 +205,7 @@ public class IvyClient extends Thread {
* The content is not modifyable because String are not mutable, and cannot
* be modified once they are create.
*/
- public Collection<String> getRegexps() { return regexpsText.values(); }
+ public Collection<String> getRegexps() { return new ArrayList<String>(regexpsText.values()); }
/**
* allow an Ivy package class to access the list of regexps at a
@@ -239,9 +225,9 @@ public class IvyClient extends Thread {
* @param message the string that will be match-tested
*/
public void sendDirectMsg(int id,String message) throws IvyException {
- if ( (message.indexOf(IvyClient.newLineChar)!=-1)||(message.indexOf(IvyClient.endArgChar)!=-1))
+ if ( (message.indexOf(Protocol.NEWLINE)!=-1)||(message.indexOf(Protocol.ENDARG)!=-1))
throw new IvyException("newline character not allowed in Ivy messages");
- sendString(DirectMsg,id,message);
+ sendString(Protocol.DIRECTMSG,id,message);
}
/* closes the connexion to the peer */
@@ -257,7 +243,7 @@ public class IvyClient extends Thread {
* @param message the message that will be carried
*/
public void sendDie(String message) {
- sendString(Die,0,message);
+ sendString(Protocol.DIE,0,message);
}
/**
@@ -267,7 +253,7 @@ public class IvyClient extends Thread {
*/
public void ping(PingCallback pc) throws IvyException {
PCHadd(pingSerial,pc);
- sendString(Ping,pingSerial,"");
+ sendString(Protocol.PING,pingSerial,"");
incSerial();
}
@@ -279,11 +265,11 @@ public class IvyClient extends Thread {
//
///////////////////////////////////////////////////
- static String decode(String s) { return s.replace(escapeChar,'\n'); }
- static String encode(String s) { return s.replace('\n',escapeChar); }
+ static String decode(String s) { return s.replace(Protocol.ESCAPE,'\n'); }
+ static String encode(String s) { return s.replace('\n',Protocol.ESCAPE); }
Integer getClientKey() { return clientKey ; }
- protected void sendRegexp(int id,String regexp) {sendString(AddRegexp,id,regexp);}
- protected void delRegexp(int id) {sendString(DelRegexp,id,"");}
+ protected void sendRegexp(int id,String regexp) {sendString(Protocol.ADDREGEXP,id,regexp);}
+ protected void delRegexp(int id) {sendString(Protocol.DELREGEXP,id,"");}
protected int sendMsg(String message) {
int count = 0;
@@ -293,7 +279,7 @@ public class IvyClient extends Thread {
Matcher m = regexp.matcher(message);
if (m.matches()) {
count++; // match
- sendResult(Msg,key,m);
+ sendResult(Protocol.MSG,key,m);
}
}
}
@@ -323,7 +309,8 @@ public class IvyClient extends Thread {
return (clnt.socket.getPort()-socket.getLocalPort());
}
- protected boolean equals(IvyClient clnt) {
+ //FIXME !!!! @override ? Object ?
+ protected boolean myEquals(IvyClient clnt) {
if (clnt==this) return true;
// TODO go beyond the port number ! add some host processing, cf:
// IvyWatcher ...
@@ -412,25 +399,25 @@ public class IvyClient extends Thread {
}
}
- private void sendString(int type, int id, String arg) {
+ private void sendString(Protocol type, int id, String arg) {
try {
- sendBuffer(type+" "+id+StartArg+arg);
+ sendBuffer(type.value()+" "+id+Protocol.STARTARG+arg);
} catch (IvyException ie ) {
System.err.println("received an exception: " + ie.getMessage());
ie.printStackTrace();
}
}
- private void sendResult(int type,Integer id, Matcher m) {
+ private void sendResult(Protocol type,Integer id, Matcher m) {
try {
StringBuffer buffer = new StringBuffer();
- buffer.append(type);
+ buffer.append(type.value());
buffer.append(" ");
buffer.append(id);
- buffer.append(StartArg);
+ buffer.append(Protocol.STARTARG);
for(int i=1;i<=m.groupCount();i++){
buffer.append(m.group(i));
- buffer.append(EndArg);
+ buffer.append(Protocol.ENDARG);
}
sendBuffer(buffer.toString());
} catch (IvyException ie ) {
@@ -472,7 +459,8 @@ public class IvyClient extends Thread {
protected boolean newParseMsg(String s) throws IvyException {
if (s==null) throw new IvyException("null string to parse in protocol");
byte[] b = s.getBytes();
- int from=0,to=0,msgType;
+ int from=0,to=0;
+ Protocol msgType;
Integer msgId;
while ((to<b.length)&&(b[to]!=' ')) to++;
// return false au lieu de throw
@@ -480,12 +468,7 @@ public class IvyClient extends Thread {
System.out.println("Ivy protocol error from "+appName);
return false;
}
- try {
- msgType = Integer.parseInt(s.substring(from,to));
- } catch (NumberFormatException nfe) {
- System.out.println("Ivy protocol error on msgType from "+appName);
- return false;
- }
+ msgType = Protocol.fromString(s.substring(from,to));
from=to+1;
while ((to<b.length)&&(b[to]!=2)) to++;
if (to>=b.length) {
@@ -500,7 +483,7 @@ public class IvyClient extends Thread {
}
from=to+1;
switch (msgType) {
- case Die:
+ case DIE:
traceDebug("received die Message from " + appName);
// first, I'm not a first class IvyClient any more
bus.removeClient(this);
@@ -515,7 +498,7 @@ public class IvyClient extends Thread {
throw new IvyException(ioe.getMessage());
}
break;
- case Bye:
+ case BYE:
// the peer quits
traceDebug("received bye Message from "+appName);
// first, I'm not a first class IvyClient any more
@@ -529,13 +512,13 @@ public class IvyClient extends Thread {
throw new IvyException(ioe.getMessage());
}
break;
- case Pong:
+ case PONG:
PCHget(msgId);
break;
- case Ping:
- sendString(Pong,msgId.intValue(),"");
+ case PING:
+ sendString(Protocol.PONG,msgId.intValue(),"");
break;
- case AddRegexp:
+ case ADDREGEXP:
String regexp=s.substring(from,b.length);
if ( bus.checkRegexp(regexp) ) {
try {
@@ -545,7 +528,7 @@ public class IvyClient extends Thread {
} catch (PatternSyntaxException e) {
// the remote client sent an invalid regexp !
traceDebug("invalid regexp sent by " +appName+" ("+regexp+"), I will ignore this regexp");
- sendBuffer(Error+e.toString());
+ sendBuffer(Protocol.ERROR.value()+" "+e.toString());
}
} else {
// throw new IvyException("regexp Warning exp='"+regexp+"' can't match removing from "+appName);
@@ -553,17 +536,17 @@ public class IvyClient extends Thread {
bus.regexpReceived(this,msgId.intValue(),regexp);
}
break;
- case DelRegexp:
+ case DELREGEXP:
regexps.remove(msgId);
String text=(String)regexpsText.remove(msgId);
bus.regexpDeleted(this,msgId.intValue(),text);
break;
- case EndRegexp:
+ case ENDREGEXP:
bus.clientConnects(this);
String srm = bus.getReadyMessage();
if (srm!=null) sendMsg(srm);
break;
- case Msg:
+ case MSG:
Vector <String>v = new Vector<String>();
while (to<b.length) {
while ( (to<b.length) && (b[to]!=3) ) to++;
@@ -580,11 +563,11 @@ public class IvyClient extends Thread {
traceDebug(tab);
bus.getSelfIvyClient().callCallback(this,msgId,tab);
break;
- case Error:
+ case ERROR:
String error=s.substring(from,b.length);
traceDebug("Error msg "+msgId+" "+error);
break;
- case SchizoToken: // aka BeginRegexp in other implementations, or MsgSync
+ case SCHIZOTOKEN: // aka BeginRegexp in other implementations, or MsgSync
appName=s.substring(from,b.length);
remotePort=msgId.intValue();
traceDebug("the peer sent his service port: "+remotePort);
@@ -605,7 +588,7 @@ public class IvyClient extends Thread {
bus.handShake(this);
}
break;
- case DirectMsg:
+ case DIRECTMSG:
String direct=s.substring(from,b.length);
bus.directMessage( this, msgId.intValue(), direct );
break;
@@ -617,7 +600,7 @@ public class IvyClient extends Thread {
}
//private void sendBye() {sendString(Bye,0,"");}
- private void sendBye(String message) {sendString(Bye,0,message);}
+ private void sendBye(String message) {sendString(Protocol.BYE,0,message);}
private void traceDebug(String s){
String app="noname";
@@ -648,8 +631,8 @@ public class IvyClient extends Thread {
// more than MAXPONGCALLBACKS callbacks, we ought to limit to prevent a
// memory leak
// TODO remove the first
- Integer smallest=(Integer)new java.util.TreeSet<Integer>(PingCallbacksTable.keySet()).first();
- PingCallbackHolder pch = (PingCallbackHolder)PingCallbacksTable.remove(smallest);
+ Integer smallest=PingCallbacksTable.firstKey();
+ PingCallbackHolder pch = PingCallbacksTable.remove(smallest);
System.err.println("no response from "+getApplicationName()+" to ping "+smallest+" after "+pch.age()+" ms, discarding");
}
}