From 4ffe8b84071babe544086f94c66431380d301d59 Mon Sep 17 00:00:00 2001
From: jestin
Date: Sat, 12 May 2012 14:26:08 +0000
Subject: - minor code cleanup - adds a separate file (Protocol.java)
containing the Enum values in a proper pattern - resolved a synchronization
bug on Ivy.stop()
---
src/IvyClient.java | 115 +++++++++++++++++++++++------------------------------
1 file changed, 49 insertions(+), 66 deletions(-)
(limited to 'src/IvyClient.java')
diff --git a/src/IvyClient.java b/src/IvyClient.java
index 441013f..8e20d2d 100755
--- a/src/IvyClient.java
+++ b/src/IvyClient.java
@@ -1,5 +1,7 @@
/**
- * the peers on the bus.
+ * the local handle to a peer on the bus, a Thread is associated to each
+ * instance, performing the socket handling, the protocol parsing, and the
+ * callback running.
*
* @author Yannick Jestin
* @author http://www.tls.cena.fr/products/ivy/
@@ -91,34 +93,17 @@ package fr.dgac.ivy ;
import java.lang.Thread;
import java.net.*;
import java.io.*;
+import java.util.SortedMap;
+import java.util.TreeMap;
import java.util.Map;
import java.util.HashMap;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.Vector;
import java.util.regex.*;
import java.util.Collection;
public class IvyClient extends Thread {
-
- /* the protocol magic numbers */
- final static int Bye = 0; /* end of the peer */
- final static int AddRegexp = 1;/* the peer adds a regexp */
- final static int Msg = 2 ; /* the peer sends a message */
- final static int Error = 3; /* error message */
- final static int DelRegexp = 4;/* the peer removes one of his regex */
- final static int EndRegexp = 5;/* no more regexp in the handshake */
- final static int SchizoToken = 6; /* avoid race condition in concurrent connexions, aka BeginRegexp in other implementations */
- final static int DirectMsg = 7;/* the peer sends a direct message */
- final static int Die = 8; /* the peer wants us to quit */
- final static int Ping = 9; // from outer space
- final static int Pong = 10;
- final static String MESSAGE_TERMINATOR = "\n"; /* the next protocol will use \r */
- final static String StartArg = "\u0002";/* begin of arguments */
- final static String EndArg = "\u0003"; /* end of arguments */
- final static String escape ="\u001A";
- final static char escapeChar = escape.charAt(0);
- final static char endArgChar = EndArg.charAt(0);
- final static char newLineChar = '\n';
// private variables
private final static int MAXPONGCALLBACKS = 10;
@@ -126,8 +111,8 @@ public class IvyClient extends Thread {
private static int pingSerial = 0;
private static final Object lock = new Object();
private static int clientSerial=0; /* an unique ID for each IvyClient */
- private Map PingCallbacksTable =
- Collections.synchronizedMap(new HashMap());
+ private SortedMap PingCallbacksTable =
+ Collections.synchronizedSortedMap(new TreeMap());
private Ivy bus;
private Socket socket;
@@ -138,6 +123,7 @@ public class IvyClient extends Thread {
private Integer clientKey;
private boolean discCallbackPerformed = false;
private String remoteHostname="unresolved";
+ private static ThreadGroup clientsThreadGroup = new ThreadGroup("Ivy clients threadgroup");
// protected variables
String appName="none";
@@ -167,7 +153,7 @@ public class IvyClient extends Thread {
}
}
remoteHostname = socket.getInetAddress().getHostName();
- clientThread = new Thread(this); // clientThread handles the incoming traffic
+ clientThread = new Thread(clientsThreadGroup, this); // clientThread handles the incoming traffic
clientThread.setName("Ivy client thread to "+remoteHostname+":"+remotePort);
}
@@ -186,10 +172,10 @@ public class IvyClient extends Thread {
traceDebug("sending our service port "+bus.getAppPort());
Map tosend=bus.getSelfIvyClient().regexpsText;
synchronized (tosend) {
- sendString(SchizoToken,bus.getAppPort(),bus.getAppName());
+ sendString(Protocol.SCHIZOTOKEN,bus.getAppPort(),bus.getAppName());
for (Map.Entry me : tosend.entrySet())
sendRegexp( me.getKey().intValue() , me.getValue() );
- sendString( EndRegexp,0,"");
+ sendString( Protocol.ENDREGEXP,0,"");
}
}
@@ -219,7 +205,7 @@ public class IvyClient extends Thread {
* The content is not modifyable because String are not mutable, and cannot
* be modified once they are create.
*/
- public Collection getRegexps() { return regexpsText.values(); }
+ public Collection getRegexps() { return new ArrayList(regexpsText.values()); }
/**
* allow an Ivy package class to access the list of regexps at a
@@ -239,9 +225,9 @@ public class IvyClient extends Thread {
* @param message the string that will be match-tested
*/
public void sendDirectMsg(int id,String message) throws IvyException {
- if ( (message.indexOf(IvyClient.newLineChar)!=-1)||(message.indexOf(IvyClient.endArgChar)!=-1))
+ if ( (message.indexOf(Protocol.NEWLINE)!=-1)||(message.indexOf(Protocol.ENDARG)!=-1))
throw new IvyException("newline character not allowed in Ivy messages");
- sendString(DirectMsg,id,message);
+ sendString(Protocol.DIRECTMSG,id,message);
}
/* closes the connexion to the peer */
@@ -257,7 +243,7 @@ public class IvyClient extends Thread {
* @param message the message that will be carried
*/
public void sendDie(String message) {
- sendString(Die,0,message);
+ sendString(Protocol.DIE,0,message);
}
/**
@@ -267,7 +253,7 @@ public class IvyClient extends Thread {
*/
public void ping(PingCallback pc) throws IvyException {
PCHadd(pingSerial,pc);
- sendString(Ping,pingSerial,"");
+ sendString(Protocol.PING,pingSerial,"");
incSerial();
}
@@ -279,11 +265,11 @@ public class IvyClient extends Thread {
//
///////////////////////////////////////////////////
- static String decode(String s) { return s.replace(escapeChar,'\n'); }
- static String encode(String s) { return s.replace('\n',escapeChar); }
+ static String decode(String s) { return s.replace(Protocol.ESCAPE,'\n'); }
+ static String encode(String s) { return s.replace('\n',Protocol.ESCAPE); }
Integer getClientKey() { return clientKey ; }
- protected void sendRegexp(int id,String regexp) {sendString(AddRegexp,id,regexp);}
- protected void delRegexp(int id) {sendString(DelRegexp,id,"");}
+ protected void sendRegexp(int id,String regexp) {sendString(Protocol.ADDREGEXP,id,regexp);}
+ protected void delRegexp(int id) {sendString(Protocol.DELREGEXP,id,"");}
protected int sendMsg(String message) {
int count = 0;
@@ -293,7 +279,7 @@ public class IvyClient extends Thread {
Matcher m = regexp.matcher(message);
if (m.matches()) {
count++; // match
- sendResult(Msg,key,m);
+ sendResult(Protocol.MSG,key,m);
}
}
}
@@ -323,7 +309,8 @@ public class IvyClient extends Thread {
return (clnt.socket.getPort()-socket.getLocalPort());
}
- protected boolean equals(IvyClient clnt) {
+ //FIXME !!!! @override ? Object ?
+ protected boolean myEquals(IvyClient clnt) {
if (clnt==this) return true;
// TODO go beyond the port number ! add some host processing, cf:
// IvyWatcher ...
@@ -412,25 +399,25 @@ public class IvyClient extends Thread {
}
}
- private void sendString(int type, int id, String arg) {
+ private void sendString(Protocol type, int id, String arg) {
try {
- sendBuffer(type+" "+id+StartArg+arg);
+ sendBuffer(type.value()+" "+id+Protocol.STARTARG+arg);
} catch (IvyException ie ) {
System.err.println("received an exception: " + ie.getMessage());
ie.printStackTrace();
}
}
- private void sendResult(int type,Integer id, Matcher m) {
+ private void sendResult(Protocol type,Integer id, Matcher m) {
try {
StringBuffer buffer = new StringBuffer();
- buffer.append(type);
+ buffer.append(type.value());
buffer.append(" ");
buffer.append(id);
- buffer.append(StartArg);
+ buffer.append(Protocol.STARTARG);
for(int i=1;i<=m.groupCount();i++){
buffer.append(m.group(i));
- buffer.append(EndArg);
+ buffer.append(Protocol.ENDARG);
}
sendBuffer(buffer.toString());
} catch (IvyException ie ) {
@@ -472,7 +459,8 @@ public class IvyClient extends Thread {
protected boolean newParseMsg(String s) throws IvyException {
if (s==null) throw new IvyException("null string to parse in protocol");
byte[] b = s.getBytes();
- int from=0,to=0,msgType;
+ int from=0,to=0;
+ Protocol msgType;
Integer msgId;
while ((to=b.length) {
@@ -500,7 +483,7 @@ public class IvyClient extends Thread {
}
from=to+1;
switch (msgType) {
- case Die:
+ case DIE:
traceDebug("received die Message from " + appName);
// first, I'm not a first class IvyClient any more
bus.removeClient(this);
@@ -515,7 +498,7 @@ public class IvyClient extends Thread {
throw new IvyException(ioe.getMessage());
}
break;
- case Bye:
+ case BYE:
// the peer quits
traceDebug("received bye Message from "+appName);
// first, I'm not a first class IvyClient any more
@@ -529,13 +512,13 @@ public class IvyClient extends Thread {
throw new IvyException(ioe.getMessage());
}
break;
- case Pong:
+ case PONG:
PCHget(msgId);
break;
- case Ping:
- sendString(Pong,msgId.intValue(),"");
+ case PING:
+ sendString(Protocol.PONG,msgId.intValue(),"");
break;
- case AddRegexp:
+ case ADDREGEXP:
String regexp=s.substring(from,b.length);
if ( bus.checkRegexp(regexp) ) {
try {
@@ -545,7 +528,7 @@ public class IvyClient extends Thread {
} catch (PatternSyntaxException e) {
// the remote client sent an invalid regexp !
traceDebug("invalid regexp sent by " +appName+" ("+regexp+"), I will ignore this regexp");
- sendBuffer(Error+e.toString());
+ sendBuffer(Protocol.ERROR.value()+" "+e.toString());
}
} else {
// throw new IvyException("regexp Warning exp='"+regexp+"' can't match removing from "+appName);
@@ -553,17 +536,17 @@ public class IvyClient extends Thread {
bus.regexpReceived(this,msgId.intValue(),regexp);
}
break;
- case DelRegexp:
+ case DELREGEXP:
regexps.remove(msgId);
String text=(String)regexpsText.remove(msgId);
bus.regexpDeleted(this,msgId.intValue(),text);
break;
- case EndRegexp:
+ case ENDREGEXP:
bus.clientConnects(this);
String srm = bus.getReadyMessage();
if (srm!=null) sendMsg(srm);
break;
- case Msg:
+ case MSG:
Vector v = new Vector();
while (to(PingCallbacksTable.keySet()).first();
- PingCallbackHolder pch = (PingCallbackHolder)PingCallbacksTable.remove(smallest);
+ Integer smallest=PingCallbacksTable.firstKey();
+ PingCallbackHolder pch = PingCallbacksTable.remove(smallest);
System.err.println("no response from "+getApplicationName()+" to ping "+smallest+" after "+pch.age()+" ms, discarding");
}
}
--
cgit v1.1