aboutsummaryrefslogtreecommitdiff
path: root/src/IvyClient.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/IvyClient.java')
-rwxr-xr-xsrc/IvyClient.java269
1 files changed, 184 insertions, 85 deletions
diff --git a/src/IvyClient.java b/src/IvyClient.java
index 8aad199..b69c8d3 100755
--- a/src/IvyClient.java
+++ b/src/IvyClient.java
@@ -10,6 +10,18 @@
* created for each remote client.
*
* CHANGELOG:
+ * 1.2.4:
+ * - sendBuffer goes synchronized
+ * - sendMsg now has a async parameter, allowing the use of threads to
+ * delegate the sending of messages
+ * - API change, IvyException raised when \n or \0x3 are present in bus.sendMsg()
+ * - breaks the connexion with faulty Ivy clients (either protocol or invalid
+ * regexps, closes bug J007 (CM))
+ * - sendDie now always requires a reason
+ * - invokes the disconnect applicationListeners at the end of the run()
+ * loop.
+ * closes Bug J006 (YJ)
+ * - changed the access of some functions ( sendRegexp, etc ) to protected
* 1.2.3:
* - silently stops on InterruptedIOException.
* - direct Messages
@@ -50,54 +62,64 @@ public class IvyClient implements Runnable {
final static int DelRegexp = 4;/* the peer removes one of his regex */
final static int EndRegexp = 5;/* no more regexp in the handshake */
final static int SchizoToken = 6; /* avoid race condition in concurrent connexions */
+ /* SchizoToken is aka BeginRegexp in other implementations */
final static int DirectMsg = 7;/* the peer sends a direct message */
final static int Die = 8; /* the peer wants us to quit */
final static int Ping = 9; /* checks the presence of the other */
final static int Pong = 10; /* checks the presence of the other */
-
final static String MESSAGE_TERMINATOR = "\n"; /* the next protocol will use \r */
final static String StartArg = "\u0002";/* begin of arguments */
final static String EndArg = "\u0003"; /* end of arguments */
+ final static String escape ="\u001A";
+ final static char escapeChar = escape.charAt(0);
+ final static char endArgChar = EndArg.charAt(0);
+ final static char newLineChar = '\n';
-
- private static boolean debug = (System.getProperty("IVY_DEBUG")!=null) ;
+ // private variables
+ private String messages_classes[] = null;
private Ivy bus;
private Socket socket;
private BufferedReader in;
private OutputStream out;
- private Hashtable regexp_in = new Hashtable();
- private Hashtable regexp_text = new Hashtable();
private int appPort;
private boolean peerCalling;
private volatile Thread clientThread;// volatile to ensure the quick communication
private Integer clientKey ;
private static boolean doping = (System.getProperty("IVY_PING")!=null) ;
- final static int PINGTIMEOUT = 5000;
+ private final static int PINGTIMEOUT = 5000;
private PINGER pinger;
private volatile Thread pingerThread;
+ private boolean discCallbackPerformed = false;
// protected variables
String appName;
+ Hashtable regexps = new Hashtable();
+ Hashtable regexpsText = new Hashtable();
+ static boolean debug = (System.getProperty("IVY_DEBUG")!=null) ;
+ int protocol;
+
+ IvyClient(){}
- IvyClient(Ivy bus, Socket socket,boolean peerCalling,Integer clientKey) throws IOException {
+ IvyClient(Ivy bus, Socket socket,boolean peerCalling,Integer clientKey,int protocol) throws IOException {
appName = "Unknown";
appPort = 0;
this.bus = bus;
this.socket = socket;
this.peerCalling=peerCalling;
this.clientKey=clientKey;
+ this.protocol=protocol;
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
out = socket.getOutputStream();
- Hashtable regexps=bus.regexp_out;
+ Hashtable tosend=bus.selfIvyClient.regexpsText;
// sends our ID, whether we initiated the connexion or not
// the ID is the couple "host name,application Port", the host name
// information is in the socket itself, the port is not known if we
// initiate the connexion
send(SchizoToken,bus.applicationPort,bus.appName);
// sends our regexps to the peer
- for (Enumeration e = regexps.keys(); e.hasMoreElements(); ) {
+ for (Enumeration e = tosend.keys(); e.hasMoreElements(); ) {
Integer ikey = (Integer)e.nextElement();
- sendRegexp( ikey.intValue(),(String)regexps.get(ikey));
+ sendRegexp(ikey.intValue(),(String)tosend.get(ikey));
}
send( EndRegexp,0,"");
// spawns a thread to manage the incoming traffic on this
@@ -111,88 +133,138 @@ public class IvyClient implements Runnable {
}
}
+ public String toString() { return "IvyClient "+bus.appName+":"+appName; }
+
/**
* returns the name of the remote agent.
*/
public String getApplicationName() { return appName ; }
- Integer getClientKey() { return clientKey ; }
-
/**
* allow an Ivy package class to access the list of regexps at a
* given time.
* perhaps we should implement a new IvyApplicationListener method to
* allow the notification of regexp addition and deletion
+ * The content is not modifyable because String are not mutable, and cannot
+ * be modified once they are create.
+ * @see getRegexpsArray to get a String[] result
*/
- Enumeration getRegexps() { return regexp_text.elements(); }
+ public Enumeration getRegexps() { return regexpsText.elements(); }
- int getAppPort() { return appPort ; }
-
- void sendRegexp(int id,String regexp) {
- send(AddRegexp,id,regexp); /* perhaps we should perform some checking here */
+ /**
+ * allow an Ivy package class to access the list of regexps at a
+ * given time.
+ * @since 1.2.4
+ */
+ public String[] getRegexpsArray() {
+ String[] s = new String[regexpsText.size()];
+ int i=0;
+ for (Enumeration e=getRegexps();e.hasMoreElements();)
+ s[i++]=(String)e.nextElement();
+ return s;
}
- public void delRegexp(int id) {send( DelRegexp,id,"");}
-
/**
* sends a direct message to the peer
* @param id the numeric value provided to the remote client
* @param message the string that will be match-tested
*/
- public void sendDirectMsg(int id,String message) { send(DirectMsg,id,message); }
+ public void sendDirectMsg(int id,String message) throws IvyException {
+ if ( (message.indexOf(IvyClient.newLineChar)!=-1)||(message.indexOf(IvyClient.endArgChar)!=-1))
+ throw new IvyException("newline character not allowed in Ivy messages");
+ send(DirectMsg,id,message);
+ }
- /**
- * closes the connexion to the peer.
- * @param notify should I send Bye message ?
- * the thread managing the socket is stopped
- */
- void close(boolean notify) throws IOException {
+ /* closes the connexion to the peer */
+ protected void close(boolean notify) throws IOException {
+ bus.waitForAll();
traceDebug("closing connexion to "+appName);
if (doping&&(pinger!=null)) { pinger.stopPinging(); }
if (notify) sendBye("hasta la vista");
stopListening();
- // bus.clientDisconnect(this);
- socket.close(); // should I also close in and out ?
+ socket.close();
}
/**
- * sends the substrings of a message to the peer for each matching regexp.
- * @param message the string that will be match-tested
- * @return the number of messages sent to the peer
+ * asks the remote client to leave the bus.
+ * @param message the message that will be carried
*/
- int sendMsg( String message ) {
+ public void sendDie(String message) { send(Die,0,message); }
+
+ /**
+ * checks the "validity" of a regular expression.
+ * @param exp the string being a regular expression
+ * @return true if the regexp is valid
+ * @since 1.2.4
+ */
+ public boolean CheckRegexp( String exp ) {
+ boolean ok = true;
+ if ( exp.startsWith( "^" )&&messages_classes!=null) {
+ ok=false;
+ for (int i=0 ; i < messages_classes.length;i++) {
+ if (messages_classes[i].equals(exp.substring(1))) return true;
+ }
+ }
+ return ok;
+ }
+
+ ///////////////////////////////////////////////////
+ //
+ // PROTECTED METHODS
+ //
+ ///////////////////////////////////////////////////
+ static String decode(String s) { return s.replace(escapeChar,'\n'); }
+ static String encode(String s) { return s.replace('\n',escapeChar); }
+ Integer getClientKey() { return clientKey ; }
+ int getAppPort() { return appPort ; }
+ void sendRegexp(int id,String regexp) {send(AddRegexp,id,regexp);}
+ void delRegexp(int id) {send( DelRegexp,id,"");}
+
+ int sendMsg(String message,boolean async) {
+ if (async) {
+ new Sender(message);
+ return -1;
+ } else { return sendMsg(message); }
+ }
+
+ private int sendMsg(String message) {
int count = 0;
- for (Enumeration e = regexp_in.keys();e.hasMoreElements();) {
+ for (Enumeration e = regexps.keys();e.hasMoreElements();) {
Integer key = (Integer)e.nextElement();
- RE regexp = (RE)regexp_in.get(key);
+ RE regexp = (RE)regexps.get(key);
+ int nb = regexp.getNumSubs();
REMatch result = regexp.getMatch(message);
- if ( result != null ) {
- send(Msg,key,regexp.getNumSubs(),result);
- count++;
- }
+ if (result==null) continue; // no match
+ count++; // match
+ send(Msg,key,regexp.getNumSubs(),result);
}
return count;
}
-
- void stopListening() {
+
+ ///////////////////////////////////////////////////
+ //
+ // PRIVATE METHODS
+ //
+ ///////////////////////////////////////////////////
+
+ /* interrupt the listener thread */
+ private void stopListening() {
Thread t = clientThread;
if (t==null) return; // we can be summoned to quit from two path at a time
clientThread=null;
t.interrupt();
}
- /**
+ /*
* compares two peers the id is the couple (host,service port).
- * @param clnt the other peer
- * @return true if the peers are similir. This should not happen, it is bad
- * © ® (tm)
+ * true if the peers are similir. This should not happen, it is bad
*/
- boolean sameClient( IvyClient clnt ) {
+ protected boolean sameClient( IvyClient clnt ) {
return ( appPort != 0 && appPort == clnt.appPort )
&& ( getRemoteAddress() == clnt.getRemoteAddress() ) ;
}
- /**
+ /*
* the code of the thread handling the incoming messages.
*/
public void run() {
@@ -209,7 +281,10 @@ public class IvyClient implements Runnable {
if ((msg=in.readLine()) != null ) {
if (clientThread!=thisThread) break; // early stop during readLine()
if (doping && (pingerThread!=null)) pingerThread.interrupt();
- newParseMsg(msg);
+ if (!newParseMsg(msg)) {
+ close(true);
+ break;
+ }
} else {
traceDebug("readline null ! leaving the thead");
break;
@@ -226,9 +301,12 @@ public class IvyClient implements Runnable {
}
traceDebug("normally Disconnected from "+ appName);
traceDebug("Thread stopped");
+ // invokes the disconnect applicationListeners
+ if (!discCallbackPerformed) bus.clientDisconnects(this);
+ discCallbackPerformed=true;
}
- private void sendBuffer( String buffer ) throws IvyException {
+ private synchronized void sendBuffer( String buffer ) throws IvyException {
buffer += "\n";
try {
out.write(buffer.getBytes() );
@@ -237,8 +315,9 @@ public class IvyClient implements Runnable {
traceDebug("I can't send my message to this client. He probably left");
// first, I'm not a first class IvyClient any more
bus.removeClient(this);
- // invokes the die applicationListeners
- bus.disconnectReceived(this);
+ // invokes the disconnect applicationListeners
+ if (!discCallbackPerformed) bus.clientDisconnects(this);
+ discCallbackPerformed=true;
try {
close(false);
} catch (IOException ioe) {
@@ -258,12 +337,8 @@ public class IvyClient implements Runnable {
private void send(int type, Integer id, int nbsub, REMatch result) {
String buffer = type+" "+id+StartArg;
- // Start at 1 because group 0 represent entire matching
- for(int sub = 1; sub <= nbsub; sub++) {
- if (result.getStartIndex(sub) > -1) {
- buffer += result.toString(sub)+EndArg;
- }
- }
+ for(int sub = 1; sub <= nbsub; sub++) if (result.getStartIndex(sub) > -1)
+ buffer += result.toString(sub)+EndArg;
try {
sendBuffer(buffer);
} catch (IvyException ie ) {
@@ -293,24 +368,33 @@ public class IvyClient implements Runnable {
return s;
}
- private void newParseMsg(String s) throws IvyException {
+ private boolean newParseMsg(String s) throws IvyException {
byte[] b = s.getBytes();
int from=0,to=0,msgType;
Integer msgId;
while ((to<b.length)&&(b[to]!=' ')) to++;
- if (to>=b.length) throw new IvyException("protocol error");
+ // return false au lieu de throw
+ if (to>=b.length) {
+ System.out.println("protocol error from "+appName);
+ return false;
+ }
try {
msgType = Integer.parseInt(s.substring(from,to));
} catch (NumberFormatException nfe) {
- throw new IvyException("protocol error on msgType");
+ System.out.println("protocol error on msgType from "+appName);
+ return false;
}
from=to+1;
while ((to<b.length)&&(b[to]!=2)) to++;
- if (to>=b.length) throw new IvyException("protocol error");
+ if (to>=b.length) {
+ System.out.println("protocol error from "+appName);
+ return false;
+ }
try {
msgId = new Integer(s.substring(from,to));
} catch (NumberFormatException nfe) {
- throw new IvyException("protocol error on identifier");
+ System.out.println("protocol error from "+appName+" "+s.substring(from,to)+" is not a number");
+ return false;
}
from=to+1;
switch (msgType) {
@@ -319,7 +403,8 @@ public class IvyClient implements Runnable {
// first, I'm not a first class IvyClient any more
bus.removeClient(this);
// invokes the die applicationListeners
- bus.dieReceived(this,msgId.intValue());
+ String message=s.substring(from,b.length);
+ bus.dieReceived(this,msgId.intValue(),message);
// makes the bus die
bus.stop();
try {
@@ -334,7 +419,8 @@ public class IvyClient implements Runnable {
// first, I'm not a first class IvyClient any more
bus.removeClient(this);
// invokes the die applicationListeners
- bus.disconnectReceived(this);
+ if (!discCallbackPerformed) bus.clientDisconnects(this);
+ discCallbackPerformed=true;
try {
close(false);
} catch (IOException ioe) {
@@ -343,24 +429,28 @@ public class IvyClient implements Runnable {
break;
case AddRegexp:
String regexp=s.substring(from,b.length);
- if ( bus.CheckRegexp(regexp) ) {
+ if ( CheckRegexp(regexp) ) {
try {
- regexp_in.put(msgId,new RE(regexp));
- regexp_text.put(msgId,regexp);
+ regexps.put(msgId,new RE(regexp));
+ regexpsText.put(msgId,regexp);
+ bus.regexpReceived(this,msgId.intValue(),regexp);
} catch (REException e) {
- throw new IvyException("regexp error " +e.getMessage());
+ // the remote client sent an invalid regexp !
+ System.out.println("invalid regexp sent by " +appName+" ("+regexp+"), I will ignore this regexp");
+ return true;
}
} else {
throw new IvyException("regexp Warning exp='"+regexp+"' can't match removing from "+appName);
}
break;
case DelRegexp:
- regexp_in.remove(msgId);
- regexp_text.remove(msgId);
+ regexps.remove(msgId);
+ String text=(String)regexpsText.remove(msgId);
+ bus.regexpDeleted(this,msgId.intValue(),text);
break;
case EndRegexp:
- bus.connect(this);
- /*
+ bus.clientConnects(this);
+ /* TODO check with the protocol itself.
* the peer is perhaps not ready to handle this message
* an assymetric processing should be written
*/
@@ -371,7 +461,7 @@ public class IvyClient implements Runnable {
while (to<b.length) {
while ( (to<b.length) && (b[to]!=3) ) to++;
if (to<b.length) {
- v.addElement(s.substring(from,to));
+ v.addElement(decode(s.substring(from,to)));
to++;
from=to;
}
@@ -382,7 +472,7 @@ public class IvyClient implements Runnable {
// for developpemnt purposes
// System.out.println(" *"+tab[i]+"* "+(tab[i]).length());
}
- bus.callCallback(this,msgId,tab);
+ bus.selfIvyClient.callCallback(this,msgId,tab);
break;
case Pong:
String paramPong=s.substring(from,b.length);
@@ -398,7 +488,7 @@ public class IvyClient implements Runnable {
String error=s.substring(from,b.length);
traceDebug("Error msg "+msgId+" "+error);
break;
- case SchizoToken:
+ case SchizoToken: // aka BeginRegexp in other implementations
appName=s.substring(from,b.length);
appPort=msgId.intValue();
if ( bus.checkConnected(this) ) {
@@ -415,25 +505,19 @@ public class IvyClient implements Runnable {
bus.directMessage( this, msgId.intValue(), direct );
break;
default:
- throw new IvyException("protocol error, unknown message type "+msgType);
+ System.out.println("protocol error from "+appName+", unknown message type "+msgType);
+ return false;
}
+ return true;
}
- void sendPong(String s) {send(Pong,0,s);}
- void sendPing(String s) {send(Ping,0,s);}
-
+ protected void sendPong(String s) {send(Pong,0,s);}
+ protected void sendPing(String s) {send(Ping,0,s);}
private void sendBye() {send(Bye,0,"");}
private void sendBye(String message) {send(Bye,0,message);}
-
- public void sendDie() { send(Die,0,""); }
- public void sendDie(String message) {send(Die,0,message);}
private InetAddress getRemoteAddress() { return socket.getInetAddress(); }
- public String toString() {
- return "IvyClient "+bus.appName+":"+appName;
- }
-
private void traceDebug(String s){
if (debug) System.out.println("-->IvyClient "+bus.appName+":"+appName+"<-- "+s);
}
@@ -455,4 +539,19 @@ public class IvyClient implements Runnable {
public void stopPinging() { isPinging=false; pingerThread.interrupt();}
}
+ // a class to perform the threaded execution of each new message
+ // this is an experimental feature introduced in 1.2.4
+ class Sender implements Runnable {
+ String message;
+ private Thread t;
+ public Sender(String message) {
+ this.message=message;
+ t=new Thread(Sender.this);
+ bus.registerThread(t);
+ t.start();
+ bus.unRegisterThread(t);
+ }
+ public void run() { sendMsg(message); }
+ } // class Sender
+
}