/**
* the peers on the bus.
*
* @author Yannick Jestin
* @author http://www.tls.cena.fr/products/ivy/
*
* each time a connexion is made with a remote peer, the regexp are exchanged
* once ready, a ready message is sent, and then we can send messages,
* die messages, direct messages, add or remove regexps, or quit. A thread is
* created for each remote client.
*
* CHANGELOG:
* 1.2.14
* - use autoboxing for the creation of Integer (instead of
* new Integer(int)). This allows caching, avoids object allocation, and the
* code will be faster
* - removed the synchronized on boxed primitive (Integer(0) for lock, which
* could be cached and reused elsewhere). Lock is now a new Object()
* - remove the Thread.start() from the constructor, to avoid multithread issues
* see http://findbugs.sourceforge.net/bugDescriptions.html#SC_START_IN_CTOR
* now, we have to call IvyClient.start() after it has been created
* - add generic types to declarations
* - remove sendBye(), which is never called
* - switch from gnu regexp (deprecated) to the built in java regexp
* 1.2.12
* - Ping and Pong are back ...
* 1.2.8
* - no CheckRegexp anymore
* - synchronized(regexps) for the match and the getParen():
* quoting http://jakarta.apache.org/regexp/apidocs/org/apache/regexp/RE.html ,
* However, RE and RECompiler are not threadsafe (for efficiency reasons,
* and because requiring thread safety in this class is deemed to be a rare
* requirement), so you will need to construct a separate compiler or
* matcher object for each thread (unless you do thread synchronization
* yourself)
* - reintroduces bugs for multibus connexions. I can't fix a cross
* implementation bug.
* 1.2.6
* - major cleanup to handle simultaneous connections, e.g., between two
* busses within the same process ( AsyncAPI test is very stressful )
* I made an asymmetric processing to elect the client that should
* disconnect based on the socket ports ... might work...
* - jakarta regexp are not meant to be threadsafe, so match() and
* compile() must be enclosed in a synchronized block
* - now sends back an error message when an incorrect regexp is sent
* the message is supposed to be readable
* - sendMsg has no more async parameter
* 1.2.5:
* - no more java ping pong
* 1.2.5:
* - use org.apache.regexp instead of gnu-regexp
* http://jakarta.apache.org/regexp/apidocs/
* 1.2.4:
* - sendBuffer goes synchronized
* - sendMsg now has a async parameter, allowing the use of threads to
* delegate the sending of messages
* - API change, IvyException raised when \n or \0x3 are present in bus.sendMsg()
* - breaks the connexion with faulty Ivy clients (either protocol or invalid
* regexps, closes bug J007 (CM))
* - sendDie now always requires a reason
* - invokes the disconnect applicationListeners at the end of the run()
* loop.
* closes Bug J006 (YJ)
* - changed the access of some functions ( sendRegexp, etc ) to protected
* 1.2.3:
* - silently stops on InterruptedIOException.
* - direct Messages
* - deals with early stops during readline
* 1.2.2:
* - cleared a bug causing the CPU to be eating when a remote client left the
* bus. closes Damien Figarol bug reported on december, 2002. It is handled
* in the readline() thread
* 1.2.1:
* - removes a NullPointerException when stops pinging on a pinger that
* wasn't even started
* 1.0.12:
* - introducing a Ping and Pong in the protocol, in order to detect the loss of
* connection faster. Enabled through the use of -DIVY_PING variable only
* the PINGTIMEOUT value in milliseconds allows me to have a status of the
* socket guaranteed after this timeout
* - right handling of IOExceptions in sendBuffer, the Client is removed from
* the bus
* - sendDie goes public, so does sendDie(String)
* - appName visibility changed from private to protected
* 1.0.10:
* - removed the timeout bug eating all the CPU resources
*/
package fr.dgac.ivy ;
import java.lang.Thread;
import java.net.*;
import java.io.*;
import java.util.*;
import java.util.regex.*;
public class IvyClient extends Thread {
/* the protocol magic numbers */
final static int Bye = 0; /* end of the peer */
final static int AddRegexp = 1;/* the peer adds a regexp */
final static int Msg = 2 ; /* the peer sends a message */
final static int Error = 3; /* error message */
final static int DelRegexp = 4;/* the peer removes one of his regex */
final static int EndRegexp = 5;/* no more regexp in the handshake */
final static int SchizoToken = 6; /* avoid race condition in concurrent connexions, aka BeginRegexp in other implementations */
final static int DirectMsg = 7;/* the peer sends a direct message */
final static int Die = 8; /* the peer wants us to quit */
final static int Ping = 9; // from outer space
final static int Pong = 10;
final static String MESSAGE_TERMINATOR = "\n"; /* the next protocol will use \r */
final static String StartArg = "\u0002";/* begin of arguments */
final static String EndArg = "\u0003"; /* end of arguments */
final static String escape ="\u001A";
final static char escapeChar = escape.charAt(0);
final static char endArgChar = EndArg.charAt(0);
final static char newLineChar = '\n';
// private variables
private final static int MAXPONGCALLBACKS = 10; /* PCHadd() discards the oldest pending ping beyond this */
private static int pingSerial = 0; /* serial number carried by the next Ping */
private static final Object lock = new Object(); /* guards clientSerial in the constructor */
private static int clientSerial=0; /* an unique ID for each IvyClient */
/* pending ping callbacks, keyed by the serial number given to PCHadd() */
private Hashtable<Integer,PingCallbackHolder> PingCallbacksTable = new Hashtable<Integer,PingCallbackHolder>();
private Ivy bus;
private Socket socket;
private BufferedReader in;
private OutputStream out;
private int remotePort=0;
private volatile Thread clientThread;// volatile to ensure the quick communication
private Integer clientKey;
private boolean discCallbackPerformed = false; /* set once bus.clientDisconnects() has been invoked */
private String remoteHostname="unresolved";
// package-visible variables (no access modifier)
String appName="none";
/* compiled subscriptions of the peer, keyed by the regexp id */
Hashtable<Integer,Pattern> regexps = new Hashtable<Integer,Pattern>();
/* source text of the same subscriptions, keyed by the regexp id */
Hashtable<Integer,String> regexpsText = new Hashtable<Integer,String>();
static boolean debug = (System.getProperty("IVY_DEBUG")!=null) ;
// int protocol;
private boolean incoming;
/**
 * Creates a client without bus, socket or streams: every field keeps its
 * declared default.
 */
IvyClient() { }
/**
 * Creates the peer attached to an already opened socket.
 * Takes a fresh clientKey, wraps the socket streams and, for a connexion
 * considered outgoing, registers itself as a half connexion on the bus and
 * sends the handshake. The listener thread is created here but not started;
 * doStart() starts it.
 *
 * @param bus the bus this peer belongs to
 * @param socket the connected socket
 * @param remotePort the service port of the peer, 0 makes the connexion be
 * treated as incoming
 * @param incoming stored in the field of the same name (see the note below)
 * @throws IOException if the socket streams cannot be obtained
 */
IvyClient(Ivy bus, Socket socket,int remotePort,boolean incoming) throws IOException {
// the static lock serialises the allocation of the unique client key
synchronized(lock) { clientKey=clientSerial++; }
this.bus = bus;
this.remotePort = remotePort;
this.incoming = incoming;
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
out = socket.getOutputStream();
// NOTE(review): this assigns the PARAMETER, not the field. The field keeps
// the value passed by the caller, while the rest of this constructor uses
// (remotePort==0) -- confirm which one is intended.
incoming=(remotePort==0);
traceDebug(((incoming)?"incoming":"outgoing")+" connection on "+socket);
this.socket = socket;
if (!incoming) {
synchronized(bus) {
// NOTE(review): "this" is handed to the bus before remoteHostname and
// clientThread are assigned below.
bus.addHalf(this); // register a half connexion
sendSchizo();
// the registering (handShake) will take place at the reception of the regexps...
}
}
remoteHostname = socket.getInetAddress().getHostName();
// this object is used as the Runnable of a separate Thread instance
clientThread = new Thread(this); // clientThread handles the incoming traffic
}
/*
 * Starts the listener thread prepared by the constructor.
 * The start is kept out of the constructor to avoid multithread correctness
 * issues, see
 * http://findbugs.sourceforge.net/bugDescriptions.html#SC_START_IN_CTOR
 */
protected void doStart() {
  final Thread listener = clientThread;
  listener.start();
}
// sends our ID, whether we initiated the connexion or not
// the ID is the couple "host name,application Port", the host name
// information is in the socket itself, the port is not known if we
// initiate the connexion
// The ID is followed by one AddRegexp per subscription of our own client
// (bus.getSelfIvyClient().regexpsText) and closed by an EndRegexp.
private void sendSchizo() throws IOException {
  traceDebug("sending our service port "+bus.getAppPort());
  // typed view of the subscription table (regexp id -> regexp text), so that
  // the ids and texts can be read without casts
  Hashtable<Integer,String> tosend = bus.getSelfIvyClient().regexpsText;
  sendString(SchizoToken,bus.getAppPort(),bus.getAppName());
  for (Map.Entry<Integer,String> subscription : tosend.entrySet()) {
    sendRegexp(subscription.getKey().intValue(), subscription.getValue());
  }
  sendString(EndRegexp,0,"");
}
/**
 * short description: client key, bus serial, local application name, remote
 * application name and remote port.
 */
public String toString() {
  final String identity = "IC["+clientKey+","+bus.getSerial()+"] ";
  final String names = bus.getAppName()+":"+appName+":"+remotePort;
  return identity + names;
}
/**
 * extended description: the socket and the remote port.
 */
public String toStringExt() {
  final String socketPart = "client socket:"+socket;
  final String portPart = ", remoteport:" + remotePort;
  return socketPart + portPart;
}
/**
 * gives the application name recorded for the remote agent.
 * @return the current value of appName ("none" until it is assigned)
 */
public String getApplicationName() {
  return appName;
}
/**
 * gives the host name of the remote agent, as resolved from the socket
 * address when the client was created.
 * @return the remote host name ("unresolved" if it was never assigned)
 * @since 1.2.7
 */
public String getHostName() {
  return remoteHostname;
}
/**
 * allow an Ivy package class to access the list of regexps at a
 * given time.
 * perhaps we should implement a new IvyApplicationListener method to
 * allow the notification of regexp addition and deletion
 * The content is not modifiable because Strings are immutable, and cannot
 * be changed once they are created.
 * @return an enumeration over the text of the regexps currently held in
 * regexpsText
 */
public Enumeration<String> getRegexps() {
  return regexpsText.elements();
}
/**
 * allow an Ivy package class to access the list of regexps at a
 * given time.
 * @return a new array holding the text of every regexp enumerated by
 * getRegexps(), empty if there is none
 * @since 1.2.4
 */
public String[] getRegexpsArray() {
  // collect first and size the array afterwards: sizing it from
  // regexpsText.size() before enumerating could overflow the array if a
  // regexp is added in between
  List<String> texts = new ArrayList<String>();
  for (Enumeration<?> e = getRegexps(); e.hasMoreElements();) {
    texts.add((String) e.nextElement());
  }
  return texts.toArray(new String[texts.size()]);
}
/**
 * sends a direct message to the peer.
 * @param id the numeric value provided to the remote client
 * @param message the text carried by the direct message
 * @throws IvyException if the message contains a newline or an end-of-argument
 * character
 */
public void sendDirectMsg(int id,String message) throws IvyException {
  final boolean hasNewLine = message.indexOf(IvyClient.newLineChar) != -1;
  final boolean hasEndArg = message.indexOf(IvyClient.endArgChar) != -1;
  if (hasNewLine || hasEndArg) {
    throw new IvyException("newline character not allowed in Ivy messages");
  }
  sendString(DirectMsg,id,message);
}
/*
 * closes the connexion to the peer: optionally says bye first, then stops
 * the listener thread and closes the socket.
 * notify: when true, a bye message is sent before closing
 */
protected void close(boolean notify) throws IOException {
  traceDebug("closing connexion to "+appName);
  if (notify) {
    sendBye("hasta la vista");
  }
  stopListening();
  socket.close(); // TODO is it necessary ? trying to fix a deadlock
}
/**
 * asks the remote client to leave the bus, by sending it a Die message
 * with id 0.
 * @param message the message that will be carried
 */
public void sendDie(String message) {
  sendString(Die, 0, message);
}
/**
 * triggers a Ping, and executes the callback
 * @param pc the callback that will be triggered (once) when the pong is
 * received
 * @throws IvyException declared for callers, not thrown by this body
 */
public void ping(PingCallback pc) throws IvyException {
  // pingSerial is static, hence shared by every IvyClient. Take the serial
  // and advance the counter in one step, so that the value registered with
  // the callback is the very one sent to the peer even if another ping() runs
  // at the same time. The static lock is the one already used for clientSerial.
  final int serial;
  synchronized (lock) {
    serial = pingSerial++;
  }
  PCHadd(serial,pc);
  sendString(Ping,serial,"");
}
///////////////////////////////////////////////////
//
// PROTECTED METHODS
//
///////////////////////////////////////////////////
/* gives a copy of s where every escape character is turned into a line feed */
static String decode(String s) {
  final char lineFeed = '\n';
  return s.replace(escapeChar, lineFeed);
}
/* gives a copy of s where every line feed is turned into the escape character */
static String encode(String s) {
  final char lineFeed = '\n';
  return s.replace(lineFeed, escapeChar);
}
/* the key allocated to this client by the constructor */
Integer getClientKey() {
  return clientKey;
}
/* sends an AddRegexp message carrying the id and the text of the regexp */
protected void sendRegexp(int id,String regexp) {
  sendString(AddRegexp, id, regexp);
}
/* sends a DelRegexp message for the given id, with an empty argument */
protected void delRegexp(int id) {
  sendString(DelRegexp, id, "");
}
/*
 * matches the message against every regexp of the peer and sends one Msg
 * result per regexp that matches the whole message.
 * returns the number of regexps that matched
 */
protected int sendMsg(String message) {
  // Work on a copy taken while holding the table's monitor: the table changes
  // when AddRegexp / DelRegexp messages are handled, and walking the live key
  // set could then fail with a ConcurrentModificationException, or find a key
  // whose pattern has just been removed. LinkedHashMap keeps the iteration
  // order of the table.
  final Map<Integer,Pattern> subscriptions;
  synchronized (regexps) {
    subscriptions = new LinkedHashMap<Integer,Pattern>(regexps);
  }
  int count = 0;
  for (Map.Entry<Integer,Pattern> subscription : subscriptions.entrySet()) {
    // a java.util.regex.Pattern is immutable and each call gets its own
    // Matcher, so no lock on the pattern is needed
    Matcher m = subscription.getValue().matcher(message);
    if (m.matches()) {
      count++; // match
      sendResult(Msg,subscription.getKey(),m);
    }
  }
  return count;
}
///////////////////////////////////////////////////
//
// PRIVATE METHODS
//
///////////////////////////////////////////////////
/*
 * interrupts the listener thread and forgets it, so that the loop of run()
 * sees it is no longer the current listener.
 */
private void stopListening() {
  final Thread listener = clientThread;
  // we can be summoned to quit from two paths at a time
  if (listener != null) {
    clientThread = null;
    listener.interrupt();
  }
}
/*
 * gives the difference between the remote port of the other peer's socket
 * and the local port of this peer's socket.
 * (comparing the client keys was tried before and dropped: it is random)
 */
protected int distanceTo(IvyClient clnt) {
  final int peerPort = clnt.socket.getPort();
  final int ownPort = socket.getLocalPort();
  return peerPort - ownPort;
}
/*
 * tells whether the other peer is this very object, or has the same remote
 * port.
 * TODO go beyond the port number ! add some host processing, cf:
 * IvyWatcher ... e.g.
 *   if (socket.getInetAddress()==null) return false;
 *   if (clnt.socket.getInetAddress()==null) return false;
 *   if (!socket.getInetAddress().equals(clnt.socket.getInetAddress())) return false;
 */
protected boolean equals(IvyClient clnt) {
  return clnt == this || remotePort == clnt.remotePort;
}
/*
 * the code of the thread handling the incoming messages.
 * Reads the peer line by line and hands each line to newParseMsg(), as long
 * as clientThread still designates the running thread (stopListening() sets
 * it to null). When the loop ends, the client is removed from the bus and the
 * disconnect callback is invoked unless it has already been performed.
 */
public void run() {
traceDebug("Thread started");
Thread thisThread = Thread.currentThread();
String msg = null;
try {
traceDebug("connection established with "+
socket.getInetAddress().getHostName()+ ":"+socket.getPort());
} catch (Exception ie) {
// any failure while building the trace is only logged
traceDebug("Interrupted while resolving remote hostname");
}
while (clientThread==thisThread) {
try {
if ((msg=in.readLine()) != null ) {
if (clientThread!=thisThread) break; // early stop during readLine()
// a line rejected by the parser ends the connexion, with a notification
if (!newParseMsg(msg)) {
close(true);
break;
}
} else {
// readLine() returned null: nothing more to read, leave the loop
traceDebug("readline null ! leaving the thread");
break;
}
} catch (IvyException ie) {
// logged only: the loop goes on with the next line
traceDebug("caught an IvyException");
ie.printStackTrace();
} catch (InterruptedIOException ioe) {
traceDebug("I have been interrupted. I'm about to leave my thread loop");
// leaves only if this thread is no longer the designated listener
if (thisThread!=clientThread) break;
} catch (IOException e) {
// traced only when the listener has not been stopped on purpose
if (clientThread!=null) {
traceDebug("abnormally Disconnected from "+ socket.getInetAddress().getHostName()+":"+socket.getPort());
}
break;
}
}
// NOTE(review): this trace is emitted whatever the reason the loop ended,
// including the "abnormally Disconnected" path above
traceDebug("normally Disconnected from "+ bus.getAppName());
bus.removeClient(this);
// invokes the disconnect applicationListeners
if (!discCallbackPerformed) bus.clientDisconnects(this);
discCallbackPerformed=true;
traceDebug("Thread stopped");
}
/*
 * interrupts this thread object, then closes the socket if there is one.
 * A failure to close the socket is only printed.
 */
@Override public void interrupt(){
  super.interrupt();
  if (socket == null) {
    return;
  }
  try {
    socket.close();
  } catch (IOException closeFailure) {
    closeFailure.printStackTrace();
  }
}
/*
 * writes the buffer followed by a line feed on the output stream and flushes.
 * If the write fails, the client is removed from the bus, the disconnect
 * callback is invoked (once) and the connexion is closed without
 * notification; an IvyException is raised only if that close fails.
 */
protected synchronized void sendBuffer( String buffer ) throws IvyException {
  final byte[] wire = (buffer + "\n").getBytes();
  try {
    out.write(wire);
    out.flush();
  } catch ( IOException writeFailure ) {
    traceDebug("I can't send my message to this client. He probably left");
    // first, I'm not a first class IvyClient any more
    bus.removeClient(this);
    // invokes the disconnect applicationListeners
    if (!discCallbackPerformed) {
      bus.clientDisconnects(this);
    }
    discCallbackPerformed = true;
    try {
      close(false);
    } catch (IOException closeFailure) {
      throw new IvyException("close failed"+closeFailure.getMessage());
    }
  }
}
/*
 * sends "type id" followed by the start-of-arguments marker and the
 * argument. An IvyException raised while sending is printed, not propagated.
 */
private void sendString(int type, int id, String arg) {
  final String line = type+" "+id+StartArg+arg;
  try {
    sendBuffer(line);
  } catch (IvyException failure ) {
    System.err.println("received an exception: " + failure.getMessage());
    failure.printStackTrace();
  }
}
/*
 * sends "type id", the start-of-arguments marker, then every capture group
 * of the matcher, each followed by the end-of-argument marker.
 * An IvyException raised while sending is printed, not propagated.
 */
private void sendResult(int type,Integer id, Matcher m) {
  try {
    StringBuilder buffer = new StringBuilder();
    buffer.append(type);
    buffer.append(" ");
    buffer.append(id);
    buffer.append(StartArg);
    final int groups = m.groupCount();
    for (int i=1;i<=groups;i++) {
      // Matcher.group() returns null for a group that took no part in the
      // match (e.g. an optional group): send an empty argument then, instead
      // of the four letters "null"
      final String arg = m.group(i);
      if (arg != null) {
        buffer.append(arg);
      }
      buffer.append(EndArg);
    }
    sendBuffer(buffer.toString());
  } catch (IvyException ie ) {
    System.err.println("received an exception: " + ie.getMessage());
    ie.printStackTrace();
  } catch (StringIndexOutOfBoundsException sioobe) {
    System.out.println("arg: "+m.groupCount()+" "+m);
    sioobe.printStackTrace();
  }
}
private String dumpHex(String s) {
byte[] b = s.getBytes();
StringBuffer outDump = new StringBuffer();
StringBuffer zu = new StringBuffer("\t");
for (int i=0;i15) ? c : 'X');
zu.append(" ");
}
outDump.append(zu);
return outDump.toString();
}
private String dumpMsg(String s) {
StringBuffer deb = new StringBuffer(" \""+s+"\" "+s.length()+" cars, ");
for (int i=0;i=b.length) {
System.out.println("Ivy protocol error from "+appName);
return false;
}
try {
msgType = Integer.parseInt(s.substring(from,to));
} catch (NumberFormatException nfe) {
System.out.println("Ivy protocol error on msgType from "+appName);
return false;
}
from=to+1;
while ((to=b.length) {
System.out.println("Ivy protocol error from "+appName);
return false;
}
try {
msgId = new Integer(s.substring(from,to));
} catch (NumberFormatException nfe) {
System.out.println("Ivy protocol error from "+appName+" "+s.substring(from,to)+" is not a number");
return false;
}
from=to+1;
switch (msgType) {
case Die:
traceDebug("received die Message from " + appName);
// first, I'm not a first class IvyClient any more
bus.removeClient(this);
// invokes the die applicationListeners
String message=s.substring(from,b.length);
bus.dieReceived(this,msgId.intValue(),message);
// makes the bus die
bus.stop();
try {
close(false);
} catch (IOException ioe) {
throw new IvyException(ioe.getMessage());
}
break;
case Bye:
// the peer quits
traceDebug("received bye Message from "+appName);
// first, I'm not a first class IvyClient any more
bus.removeClient(this);
// invokes the die applicationListeners
if (!discCallbackPerformed) bus.clientDisconnects(this);
discCallbackPerformed=true;
try {
close(false);
} catch (IOException ioe) {
throw new IvyException(ioe.getMessage());
}
break;
case Pong:
PCHget(msgId);
break;
case Ping:
sendString(Pong,msgId.intValue(),"");
break;
case AddRegexp:
String regexp=s.substring(from,b.length);
if ( bus.checkRegexp(regexp) ) {
try {
regexps.put(msgId,Pattern.compile(regexp,Pattern.DOTALL));
regexpsText.put(msgId,regexp);
bus.regexpReceived(this,msgId.intValue(),regexp);
} catch (PatternSyntaxException e) {
// the remote client sent an invalid regexp !
traceDebug("invalid regexp sent by " +appName+" ("+regexp+"), I will ignore this regexp");
sendBuffer(Error+e.toString());
}
} else {
// throw new IvyException("regexp Warning exp='"+regexp+"' can't match removing from "+appName);
traceDebug("Warning "+appName+" subscribes to '"+regexp+"', it can't match our message filter");
bus.regexpReceived(this,msgId.intValue(),regexp);
}
break;
case DelRegexp:
regexps.remove(msgId);
String text=(String)regexpsText.remove(msgId);
bus.regexpDeleted(this,msgId.intValue(),text);
break;
case EndRegexp:
bus.clientConnects(this);
String srm = bus.getReadyMessage();
if (srm!=null) sendMsg(srm);
break;
case Msg:
Vector v = new Vector();
while (toIvyClient["+clientKey+","+serial+"] "+app+" (remote "+appName+")<-- "+s);
}
/*
 * traces an array of strings: its length, then each element between
 * parentheses.
 */
private void traceDebug(String[] tab){
  StringBuilder text = new StringBuilder(" string array ");
  text.append(tab.length).append(" elements: ");
  for (int i = 0; i < tab.length; i++) {
    text.append("(").append(tab[i]).append(") ");
  }
  traceDebug(text.toString());
}
/*
 * registers the callback under the given serial. When more than
 * MAXPONGCALLBACKS callbacks are pending, the one with the smallest serial is
 * dropped and reported on stderr, to prevent a memory leak.
 */
void PCHadd(int serial,PingCallback pc) {
  PingCallbacksTable.put(serial,new PingCallbackHolder(pc));
  if (PingCallbacksTable.size()<=MAXPONGCALLBACKS) {
    return;
  }
  // TODO remove the first
  final Object oldestKey = new TreeSet(PingCallbacksTable.keySet()).first();
  Integer smallest = (Integer) oldestKey;
  PingCallbackHolder dropped = (PingCallbackHolder)PingCallbacksTable.remove(smallest);
  System.err.println("no response from "+getApplicationName()+" to ping "+smallest+" after "+dropped.age()+" ms, discarding");
}
/*
 * removes the callback registered under the serial and runs it; warns on
 * stderr when no callback is registered under that serial.
 */
void PCHget(Integer serial) {
  PingCallbackHolder holder = (PingCallbackHolder)PingCallbacksTable.remove(serial);
  if (holder != null) {
    holder.run();
    return;
  }
  System.err.println("warning: pong received for a long lost callback");
}
/*
 * keeps a ping callback together with the time it was created, so that the
 * elapsed time can be handed to the callback.
 */
private class PingCallbackHolder {
  PingCallback pc;
  long epoch; // creation time, in milliseconds

  PingCallbackHolder(PingCallback pc) {
    epoch = System.currentTimeMillis();
    this.pc = pc;
  }

  /* milliseconds elapsed since the creation of the holder */
  int age() {
    final long elapsed = System.currentTimeMillis() - epoch;
    return (int) elapsed;
  }

  /* hands the enclosing client and the current age to the callback */
  void run() {
    final int elapsed = age();
    pc.pongReceived(IvyClient.this, elapsed);
  }
}
/*
 * small manual check of encode(): prints a two-line string before and after
 * encoding, then both lengths.
 */
public static void main(String[] args) {
  final String original = "hello\nworld";
  final String encoded = encode(original);
  System.out.println("avant: <"+original+">\naprès: <"+encoded+">");
  System.out.println("tailles: "+original.length()+" "+encoded.length());
}
}