From 4ffe8b84071babe544086f94c66431380d301d59 Mon Sep 17 00:00:00 2001 From: jestin Date: Sat, 12 May 2012 14:26:08 +0000 Subject: - minor code cleanup - adds a separate file (Protocol.java) containing the Enum values in a proper pattern - resolved a synchronization bug on Ivy.stop() --- src/IvyClient.java | 115 +++++++++++++++++++++++------------------------------ 1 file changed, 49 insertions(+), 66 deletions(-) (limited to 'src/IvyClient.java') diff --git a/src/IvyClient.java b/src/IvyClient.java index 441013f..8e20d2d 100755 --- a/src/IvyClient.java +++ b/src/IvyClient.java @@ -1,5 +1,7 @@ /** - * the peers on the bus. + * the local handle to a peer on the bus, a Thread is associated to each + * instance, performing the socket handling, the protocol parsing, and the + * callback running. * * @author Yannick Jestin * @author http://www.tls.cena.fr/products/ivy/ @@ -91,34 +93,17 @@ package fr.dgac.ivy ; import java.lang.Thread; import java.net.*; import java.io.*; +import java.util.SortedMap; +import java.util.TreeMap; import java.util.Map; import java.util.HashMap; +import java.util.ArrayList; import java.util.Collections; import java.util.Vector; import java.util.regex.*; import java.util.Collection; public class IvyClient extends Thread { - - /* the protocol magic numbers */ - final static int Bye = 0; /* end of the peer */ - final static int AddRegexp = 1;/* the peer adds a regexp */ - final static int Msg = 2 ; /* the peer sends a message */ - final static int Error = 3; /* error message */ - final static int DelRegexp = 4;/* the peer removes one of his regex */ - final static int EndRegexp = 5;/* no more regexp in the handshake */ - final static int SchizoToken = 6; /* avoid race condition in concurrent connexions, aka BeginRegexp in other implementations */ - final static int DirectMsg = 7;/* the peer sends a direct message */ - final static int Die = 8; /* the peer wants us to quit */ - final static int Ping = 9; // from outer space - final static int Pong = 10; - final static String MESSAGE_TERMINATOR = "\n"; /* the next protocol will use \r */ - final static String StartArg = "\u0002";/* begin of arguments */ - final static String EndArg = "\u0003"; /* end of arguments */ - final static String escape ="\u001A"; - final static char escapeChar = escape.charAt(0); - final static char endArgChar = EndArg.charAt(0); - final static char newLineChar = '\n'; // private variables private final static int MAXPONGCALLBACKS = 10; @@ -126,8 +111,8 @@ public class IvyClient extends Thread { private static int pingSerial = 0; private static final Object lock = new Object(); private static int clientSerial=0; /* an unique ID for each IvyClient */ - private Map PingCallbacksTable = - Collections.synchronizedMap(new HashMap()); + private SortedMap PingCallbacksTable = + Collections.synchronizedSortedMap(new TreeMap()); private Ivy bus; private Socket socket; @@ -138,6 +123,7 @@ public class IvyClient extends Thread { private Integer clientKey; private boolean discCallbackPerformed = false; private String remoteHostname="unresolved"; + private static ThreadGroup clientsThreadGroup = new ThreadGroup("Ivy clients threadgroup"); // protected variables String appName="none"; @@ -167,7 +153,7 @@ public class IvyClient extends Thread { } } remoteHostname = socket.getInetAddress().getHostName(); - clientThread = new Thread(this); // clientThread handles the incoming traffic + clientThread = new Thread(clientsThreadGroup, this); // clientThread handles the incoming traffic clientThread.setName("Ivy client thread to "+remoteHostname+":"+remotePort); } @@ -186,10 +172,10 @@ public class IvyClient extends Thread { traceDebug("sending our service port "+bus.getAppPort()); Map tosend=bus.getSelfIvyClient().regexpsText; synchronized (tosend) { - sendString(SchizoToken,bus.getAppPort(),bus.getAppName()); + sendString(Protocol.SCHIZOTOKEN,bus.getAppPort(),bus.getAppName()); for (Map.Entry me : tosend.entrySet()) sendRegexp( me.getKey().intValue() , me.getValue() ); - sendString( EndRegexp,0,""); + sendString( Protocol.ENDREGEXP,0,""); } } @@ -219,7 +205,7 @@ public class IvyClient extends Thread { * The content is not modifyable because String are not mutable, and cannot * be modified once they are create. */ - public Collection getRegexps() { return regexpsText.values(); } + public Collection getRegexps() { return new ArrayList(regexpsText.values()); } /** * allow an Ivy package class to access the list of regexps at a @@ -239,9 +225,9 @@ public class IvyClient extends Thread { * @param message the string that will be match-tested */ public void sendDirectMsg(int id,String message) throws IvyException { - if ( (message.indexOf(IvyClient.newLineChar)!=-1)||(message.indexOf(IvyClient.endArgChar)!=-1)) + if ( (message.indexOf(Protocol.NEWLINE)!=-1)||(message.indexOf(Protocol.ENDARG)!=-1)) throw new IvyException("newline character not allowed in Ivy messages"); - sendString(DirectMsg,id,message); + sendString(Protocol.DIRECTMSG,id,message); } /* closes the connexion to the peer */ @@ -257,7 +243,7 @@ public class IvyClient extends Thread { * @param message the message that will be carried */ public void sendDie(String message) { - sendString(Die,0,message); + sendString(Protocol.DIE,0,message); } /** @@ -267,7 +253,7 @@ public class IvyClient extends Thread { */ public void ping(PingCallback pc) throws IvyException { PCHadd(pingSerial,pc); - sendString(Ping,pingSerial,""); + sendString(Protocol.PING,pingSerial,""); incSerial(); } @@ -279,11 +265,11 @@ public class IvyClient extends Thread { // /////////////////////////////////////////////////// - static String decode(String s) { return s.replace(escapeChar,'\n'); } - static String encode(String s) { return s.replace('\n',escapeChar); } + static String decode(String s) { return s.replace(Protocol.ESCAPE,'\n'); } + static String encode(String s) { return s.replace('\n',Protocol.ESCAPE); } Integer getClientKey() { return clientKey ; } - protected void sendRegexp(int id,String regexp) {sendString(AddRegexp,id,regexp);} - protected void delRegexp(int id) {sendString(DelRegexp,id,"");} + protected void sendRegexp(int id,String regexp) {sendString(Protocol.ADDREGEXP,id,regexp);} + protected void delRegexp(int id) {sendString(Protocol.DELREGEXP,id,"");} protected int sendMsg(String message) { int count = 0; @@ -293,7 +279,7 @@ public class IvyClient extends Thread { Matcher m = regexp.matcher(message); if (m.matches()) { count++; // match - sendResult(Msg,key,m); + sendResult(Protocol.MSG,key,m); } } } @@ -323,7 +309,8 @@ public class IvyClient extends Thread { return (clnt.socket.getPort()-socket.getLocalPort()); } - protected boolean equals(IvyClient clnt) { + //FIXME !!!! @override ? Object ? + protected boolean myEquals(IvyClient clnt) { if (clnt==this) return true; // TODO go beyond the port number ! add some host processing, cf: // IvyWatcher ... @@ -412,25 +399,25 @@ public class IvyClient extends Thread { } } - private void sendString(int type, int id, String arg) { + private void sendString(Protocol type, int id, String arg) { try { - sendBuffer(type+" "+id+StartArg+arg); + sendBuffer(type.value()+" "+id+Protocol.STARTARG+arg); } catch (IvyException ie ) { System.err.println("received an exception: " + ie.getMessage()); ie.printStackTrace(); } } - private void sendResult(int type,Integer id, Matcher m) { + private void sendResult(Protocol type,Integer id, Matcher m) { try { StringBuffer buffer = new StringBuffer(); - buffer.append(type); + buffer.append(type.value()); buffer.append(" "); buffer.append(id); - buffer.append(StartArg); + buffer.append(Protocol.STARTARG); for(int i=1;i<=m.groupCount();i++){ buffer.append(m.group(i)); - buffer.append(EndArg); + buffer.append(Protocol.ENDARG); } sendBuffer(buffer.toString()); } catch (IvyException ie ) { @@ -472,7 +459,8 @@ public class IvyClient extends Thread { protected boolean newParseMsg(String s) throws IvyException { if (s==null) throw new IvyException("null string to parse in protocol"); byte[] b = s.getBytes(); - int from=0,to=0,msgType; + int from=0,to=0; + Protocol msgType; Integer msgId; while ((to=b.length) { @@ -500,7 +483,7 @@ public class IvyClient extends Thread { } from=to+1; switch (msgType) { - case Die: + case DIE: traceDebug("received die Message from " + appName); // first, I'm not a first class IvyClient any more bus.removeClient(this); @@ -515,7 +498,7 @@ public class IvyClient extends Thread { throw new IvyException(ioe.getMessage()); } break; - case Bye: + case BYE: // the peer quits traceDebug("received bye Message from "+appName); // first, I'm not a first class IvyClient any more @@ -529,13 +512,13 @@ public class IvyClient extends Thread { throw new IvyException(ioe.getMessage()); } break; - case Pong: + case PONG: PCHget(msgId); break; - case Ping: - sendString(Pong,msgId.intValue(),""); + case PING: + sendString(Protocol.PONG,msgId.intValue(),""); break; - case AddRegexp: + case ADDREGEXP: String regexp=s.substring(from,b.length); if ( bus.checkRegexp(regexp) ) { try { @@ -545,7 +528,7 @@ public class IvyClient extends Thread { } catch (PatternSyntaxException e) { // the remote client sent an invalid regexp ! traceDebug("invalid regexp sent by " +appName+" ("+regexp+"), I will ignore this regexp"); - sendBuffer(Error+e.toString()); + sendBuffer(Protocol.ERROR.value()+" "+e.toString()); } } else { // throw new IvyException("regexp Warning exp='"+regexp+"' can't match removing from "+appName); @@ -553,17 +536,17 @@ public class IvyClient extends Thread { bus.regexpReceived(this,msgId.intValue(),regexp); } break; - case DelRegexp: + case DELREGEXP: regexps.remove(msgId); String text=(String)regexpsText.remove(msgId); bus.regexpDeleted(this,msgId.intValue(),text); break; - case EndRegexp: + case ENDREGEXP: bus.clientConnects(this); String srm = bus.getReadyMessage(); if (srm!=null) sendMsg(srm); break; - case Msg: + case MSG: Vector v = new Vector(); while (to(PingCallbacksTable.keySet()).first(); - PingCallbackHolder pch = (PingCallbackHolder)PingCallbacksTable.remove(smallest); + Integer smallest=PingCallbacksTable.firstKey(); + PingCallbackHolder pch = PingCallbacksTable.remove(smallest); System.err.println("no response from "+getApplicationName()+" to ping "+smallest+" after "+pch.age()+" ms, discarding"); } } -- cgit v1.1