// IvyClient: representation of the remote peers on the Ivy bus.
//
// Author: François-Régis Colin
// http://www.tls.cena.fr/products/ivy/
//
// Each time a connection is made with a remote peer, the regexps are
// exchanged. Once ready, a ready message is sent, and then we can send
// messages, die messages, direct messages, add or remove regexps, or quit.
// A thread is created for each remote client.
namespace IvyBus
{
    using System;
    using System.Text.RegularExpressions;
    using System.Text;
    using System.Collections;
    using System.Threading;
    using System.IO;
    using System.Net;
    using System.Net.Sockets;
    using System.Configuration;

    /// <summary>
    /// A <see cref="TcpClient"/> that exposes the address and port of the
    /// remote end point of its underlying socket.
    /// </summary>
    internal class MyTcpClient : TcpClient
    {
        /// <summary>IP address of the remote end point.</summary>
        public IPAddress RemoteAddress
        {
            get
            {
                IPEndPoint ep = (IPEndPoint)Client.RemoteEndPoint;
                return ep.Address;
            }
        }

        /// <summary>TCP port of the remote end point.</summary>
        public int RemotePort
        {
            get
            {
                IPEndPoint ep = (IPEndPoint)Client.RemoteEndPoint;
                return ep.Port;
            }
        }

        /// <summary>Connects to <paramref name="host"/>:<paramref name="port"/>.</summary>
        public MyTcpClient(string host, int port) : base(host, port)
        {
        }

        /// <summary>Wraps an already existing socket.</summary>
        public MyTcpClient(Socket sock)
        {
            Client = sock;
        }
    }

    /// <summary>
    /// A peer on the bus.
    /// </summary>
    /// <remarks>
    /// The constructor sends our identity and regexps to the peer and starts
    /// a thread that reads and dispatches the peer's messages. When the
    /// IVY_PING application setting is present, a second thread periodically
    /// sends ping messages.
    /// </remarks>
    public class IvyClient
    {
        /// <summary>Name announced by the remote peer ("Unknown" until it is received).</summary>
        public String ApplicationName
        {
            get { return appName; }
        }

        /// <summary>Serial number assigned to this client at construction.</summary>
        internal Int32 ClientKey
        {
            get { return clientKey; }
        }

        /// <summary>Texts of the regexps the peer has subscribed to.</summary>
        public ICollection Regexps
        {
            get { return regexp_text.Values; }
        }

        /// <summary>Application port announced by the peer (0 until it is received).</summary>
        internal int AppPort
        {
            get { return appPort; }
        }

        private String RemoteAddress
        {
            get { return socket.RemoteAddress.ToString(); }
        }

        private String RemotePort
        {
            get { return socket.RemotePort.ToString(); }
        }

        /* the protocol magic numbers */
        internal enum MessageType : int
        {
            Bye = 0,         /* end of the peer */
            AddRegexp = 1,   /* the peer adds a regexp */
            Msg = 2,         /* the peer sends a message */
            Error = 3,       /* error message */
            DelRegexp = 4,   /* the peer removes one of his regex */
            EndRegexp = 5,   /* no more regexp in the handshake */
            StartRegexp = 6, /* avoid race condition in concurrent connexions */
            DirectMsg = 7,   /* the peer sends a direct message */
            Die = 8,         /* the peer wants us to quit */
            Ping = 9,        /* checks the presence of the other */
            Pong = 10        /* answer to a ping */
        }

        internal const String MESSAGE_TERMINATOR = "\n"; /* the next protocol will use \r */
        internal const String StartArg = "\u0002";       /* begin of arguments */
        internal const String EndArg = "\u0003";         /* end of arguments */

        private static int clientSerial = 0; /* an unique ID for each IvyClient */

        private Ivy bus;
        private MyTcpClient socket;
        private StreamReader in_stream;
        private StreamWriter out_stream;
        private Hashtable regexp_in;
        private Hashtable regexp_text;
        private int appPort;
        private bool peerCalling;
        private volatile Thread clientThread; // volatile to ensure the quick communication
        private Int32 clientKey;
        private static bool doping = (ConfigurationSettings.AppSettings["IVY_PING"] != null);
        private const int PINGTIMEOUT = 5000; // milliseconds
        private volatile Thread pingerThread;

        // Serializes writes to out_stream: messages are sent from the reader
        // thread (pong, ready message), the pinger thread and the callers of
        // the internal send methods.
        private readonly object sendLock = new object();

        // protected variables
        internal String appName;

        /// <summary>
        /// Builds the peer on top of a connected socket, performs our half of
        /// the handshake and starts the reader (and optionally pinger) thread.
        /// </summary>
        /// <param name="bus">the bus this peer belongs to</param>
        /// <param name="socket">the connected socket to the peer</param>
        internal IvyClient(Ivy bus, MyTcpClient socket)
        {
            regexp_in = new Hashtable();
            regexp_text = new Hashtable();
            appName = "Unknown";
            appPort = 0;
            this.bus = bus;
            this.socket = socket;
            clientKey = clientSerial++;
            in_stream = new StreamReader(socket.GetStream(), System.Text.Encoding.ASCII);
            out_stream = new StreamWriter(socket.GetStream(), System.Text.Encoding.ASCII);

            // sends our ID, whether we initiated the connexion or not
            // the ID is the couple "host name,application Port", the host name
            // information is in the socket itself, the port is not known if we
            // initiate the connexion
            send(MessageType.StartRegexp, bus.applicationPort, bus.appName);

            // sends our regexps to the peer
            foreach (Int32 ikey in bus.regexp_out.Keys)
            {
                sendRegexp(ikey, (String) bus.regexp_out[ikey]);
            }
            send(MessageType.EndRegexp, 0, "");

            // spawns a thread to manage the incoming traffic on this
            // socket. We should be ready to receive messages now.
            clientThread = new Thread(new ThreadStart(this.Run));
            clientThread.Start();
            if (doping)
            {
                pingerThread = new Thread(new ThreadStart(PingerRun));
                pingerThread.Start();
            }
        }

        /// <summary>Tells the peer that we subscribe to a regexp.</summary>
        /// <param name="id">the identifier of the regexp</param>
        /// <param name="regexp">the text of the regexp</param>
        internal void sendRegexp(int id, String regexp)
        {
            send(MessageType.AddRegexp, id, regexp); /* perhaps we should perform some checking here */
        }

        /// <summary>Tells the peer that we no longer subscribe to a regexp.</summary>
        /// <param name="id">the identifier of the regexp</param>
        internal void delRegexp(int id)
        {
            send(MessageType.DelRegexp, id, "");
        }

        /// <summary>sends a direct message to the peer</summary>
        /// <param name="id">the numeric value provided to the remote client</param>
        /// <param name="message">the string sent to the remote client</param>
        internal void sendDirectMsg(int id, String message)
        {
            send(MessageType.DirectMsg, id, message);
        }

        /// <summary>
        /// closes the connexion to the peer; the thread managing the socket
        /// is stopped.
        /// </summary>
        /// <param name="notify">should I send a Bye message ?</param>
        internal void close(bool notify)
        {
            traceDebug("closing connexion to " + appName);
            if (doping)
            {
                stopPinging();
            }
            if (notify)
            {
                sendBye("hasta la vista");
            }
            stopListening();
            socket.Close(); // should I also close in and out ?
        }

        /// <summary>
        /// sends the substrings of a message to the peer for each matching regexp.
        /// </summary>
        /// <param name="message">the string that will be match-tested</param>
        /// <returns>the number of messages sent to the peer</returns>
        internal int sendMsg(String message)
        {
            int count = 0;
            foreach (DictionaryEntry entry in regexp_in)
            {
                Regex regexp = (Regex) entry.Value;
                Match result = regexp.Match(message);
                if (result.Success)
                {
                    send(MessageType.Msg, (Int32) entry.Key, result);
                    count++;
                }
            }
            return count;
        }

        /// <summary>
        /// Asks the reader thread to stop. Safe to call more than once.
        /// </summary>
        internal void stopListening()
        {
            Thread t = clientThread;
            if (t == null)
            {
                return; // we can be summoned to quit from two path at a time
            }
            clientThread = null; // Run() leaves its loop when it sees this
            // close() is often called by the reader thread itself (Bye, Die,
            // concurrent connect). Interrupting the current thread would only
            // leave a pending interrupt that fires in a later blocking call.
            if (t != Thread.CurrentThread)
            {
                t.Interrupt();
            }
        }

        /// <summary>
        /// compares two peers; the id is the couple (host, service port).
        /// </summary>
        /// <param name="clnt">the other peer</param>
        /// <returns>true if the peers are similar. This should not happen, it is bad</returns>
        internal bool sameClient(IvyClient clnt)
        {
            return (appPort != 0 && appPort == clnt.appPort) && (RemoteAddress == clnt.RemoteAddress);
        }

        /// <summary>
        /// the code of the thread handling the incoming messages.
        /// </summary>
        private void Run()
        {
            Thread self = Thread.CurrentThread;

            // Resolved once: the socket may already be closed when we need
            // this text again in the error path below.
            String remote = "unknown";
            try
            {
                remote = RemoteAddress + ":" + RemotePort;
                traceDebug("Connected from " + remote);
            }
            catch (Exception)
            {
                traceDebug("Interrupted while resolving remote hostname");
            }
            traceDebug("Thread started");

            bool running = true;
            // stopListening() clears clientThread; without this test the loop
            // would read again from a socket that close() has just disposed.
            while (running && clientThread == self)
            {
                try
                {
                    String msg = in_stream.ReadLine();
                    if (msg == null)
                    {
                        traceDebug("readline null ! leaving the thead");
                        break;
                    }
                    // the peer is alive: restart the pinger's waiting period
                    if (doping && (pingerThread != null))
                    {
                        pingerThread.Interrupt();
                    }
                    newParseMsg(msg);
                }
                catch (IvyException ie)
                {
                    Console.Error.WriteLine(ie.Message);
                    Console.Error.WriteLine(ie.StackTrace);
                    running = false;
                }
                catch (IOException e)
                {
                    traceDebug("abnormally Disconnected from " + remote);
                    Console.Error.WriteLine(e.Message);
                    Console.Error.WriteLine(e.StackTrace);
                    running = false;
                }
                catch (ObjectDisposedException)
                {
                    // the socket was closed by another thread while we were reading
                    running = false;
                }
                catch (ThreadInterruptedException)
                {
                    // stopListening() was called from another thread
                    running = false;
                }
            }
            traceDebug("normally Disconnected from " + appName);
            traceDebug("Thread stopped");
            // first, I'm not a first class IvyClient any more
            //bus.removeClient(this);
            // invokes the Disconnected applicationListeners
            bus.FireClientDisconnected(this);
        }

        /// <summary>
        /// Writes one terminated protocol line to the peer. On an I/O error
        /// the client is removed from the bus and the connexion is closed.
        /// </summary>
        /// <param name="buffer">the line to send, without terminator</param>
        private void sendBuffer(String buffer)
        {
            buffer += MESSAGE_TERMINATOR;
            try
            {
                lock (sendLock)
                {
                    out_stream.Write(buffer);
                    out_stream.Flush();
                }
            }
            catch (IOException)
            {
                traceDebug("I can't send my message to this client. He probably left");
                // first, I'm not a first class IvyClient any more
                bus.removeClient(this);
                // invokes the Disconnected applicationListeners
                bus.FireClientDisconnected(this);
                try
                {
                    close(false);
                }
                catch (IOException ioe)
                {
                    throw new IvyException("close failed" + ioe.Message);
                }
            }
        }

        /// <summary>Sends a message whose argument is a single string.</summary>
        private void send(MessageType type, int id, String arg)
        {
            try
            {
                sendBuffer((int)type + " " + id + StartArg + arg);
            }
            catch (IvyException ie)
            {
                Console.Error.WriteLine("received an exception: " + ie.Message);
                Console.Error.WriteLine(ie.StackTrace);
            }
        }

        /// <summary>
        /// Sends a message whose arguments are the captured groups of a
        /// match, each one followed by <see cref="EndArg"/>.
        /// </summary>
        private void send(MessageType type, Int32 id, Match result)
        {
            StringBuilder buffer = new StringBuilder();
            buffer.Append((int)type).Append(" ").Append(id).Append(StartArg);
            // Start at 1 because group 0 represent entire matching
            for (int sub = 1; sub < result.Groups.Count; sub++)
            {
                buffer.Append(result.Groups[sub].Value).Append(EndArg);
            }
            try
            {
                sendBuffer(buffer.ToString());
            }
            catch (IvyException ie)
            {
                Console.Error.WriteLine("received an exception: " + ie.Message);
                Console.Error.WriteLine(ie.StackTrace);
            }
        }

        /// <summary>
        /// Parses a numeric protocol field, turning any malformed or
        /// out-of-range number into a protocol error.
        /// </summary>
        private static int parseField(String text, String error)
        {
            try
            {
                return Int32.Parse(text);
            }
            catch (FormatException)
            {
                throw new IvyException(error);
            }
            catch (OverflowException)
            {
                throw new IvyException(error);
            }
        }

        /// <summary>
        /// Decodes one line received from the peer and dispatches it. The
        /// line has the form: type, a space, identifier, StartArg, arguments.
        /// </summary>
        /// <param name="s">the line, without its terminator</param>
        private void newParseMsg(String s)
        {
            int start = 0, pos = 0;
            MessageType msgType;
            int msgId;

            // message type: everything up to the first space
            while ((pos < s.Length) && (s[pos] != ' '))
            {
                pos++;
            }
            if (pos >= s.Length)
            {
                throw new IvyException("protocol error");
            }
            msgType = (MessageType) parseField(s.Substring(start, pos - start), "protocol error on msgType");
            start = pos + 1;

            // identifier: everything up to the StartArg marker
            while ((pos < s.Length) && (s[pos] != StartArg[0]))
            {
                pos++;
            }
            if (pos >= s.Length)
            {
                throw new IvyException("protocol error");
            }
            msgId = parseField(s.Substring(start, pos - start), "protocol error on identifier");
            start = pos + 1;

            switch (msgType)
            {
                case MessageType.Die:
                    traceDebug("received die Message from " + appName);
                    // first, I'm not a first class IvyClient any more
                    bus.removeClient(this);
                    // invokes the die applicationListeners
                    bool dontkillapp = bus.FireDie(this, msgId);
                    // makes the bus die
                    bus.stop();
                    try
                    {
                        close(false);
                    }
                    catch (IOException ioe)
                    {
                        throw new IvyException(ioe.Message);
                    }
                    if (!dontkillapp)
                    {
                        Environment.Exit(-1);
                    }
                    break;

                case MessageType.Bye:
                    // the peer quits
                    traceDebug("received bye Message from " + appName);
                    // first, I'm not a first class IvyClient any more
                    bus.removeClient(this);
                    // the disconnect applicationListeners are invoked at the
                    // end of the reader thread
                    try
                    {
                        close(false);
                    }
                    catch (IOException ioe)
                    {
                        throw new IvyException(ioe.Message);
                    }
                    break;

                case MessageType.AddRegexp:
                    String regexp = s.Substring(start, s.Length - start);
                    if (bus.CheckRegexp(regexp))
                    {
                        try
                        {
                            regexp_in.Add(msgId, new Regex(regexp));
                            regexp_text.Add(msgId, regexp);
                        }
                        catch (ArgumentException e)
                        {
                            throw new IvyException("regexp error " + e.Message);
                        }
                    }
                    else
                    {
                        throw new IvyException("regexp Warning exp='" + regexp + "' can't match removing from " + appName);
                    }
                    break;

                case MessageType.DelRegexp:
                    regexp_in.Remove(msgId);
                    regexp_text.Remove(msgId);
                    break;

                case MessageType.EndRegexp:
                    bus.FireClientConnected(this);
                    /*
                     * the peer is perhaps not ready to handle this message
                     * an assymetric processing should be written
                     */
                    if (bus.ready_message != null)
                    {
                        sendMsg(bus.ready_message);
                    }
                    break;

                case MessageType.Msg:
                    // arguments are a sequence of strings, each one ended by EndArg
                    ArrayList v = new ArrayList();
                    while (pos < s.Length)
                    {
                        while ((pos < s.Length) && (s[pos] != EndArg[0]))
                        {
                            pos++;
                        }
                        if (pos < s.Length)
                        {
                            v.Add(s.Substring(start, pos - start));
                            pos++;
                            start = pos;
                        }
                    }
                    String[] tab = (String[]) v.ToArray(typeof(String));
                    bus.callCallback(this, msgId, tab);
                    break;

                case MessageType.Pong:
                    String paramPong = s.Substring(start, s.Length - start);
                    traceDebug("Pong msg from " + appName + " : " + paramPong);
                    break;

                case MessageType.Ping:
                    // I receive a ping. I can answer a pong.
                    String param = s.Substring(start, s.Length - start);
                    traceDebug("Ping msg from " + appName + " : " + param);
                    sendPong(param);
                    break;

                case MessageType.Error:
                    String error = s.Substring(start, s.Length - start);
                    traceDebug("Error msg " + msgId + " " + error);
                    break;

                case MessageType.StartRegexp:
                    appName = s.Substring(start, s.Length - start);
                    appPort = msgId;
                    if (bus.checkConnected(this))
                    {
                        try
                        {
                            close(false);
                        }
                        catch (IOException ioe)
                        {
                            throw new IvyException("io " + ioe.Message);
                        }
                        throw new IvyException("Rare ! A concurrent connect occured");
                    }
                    break;

                case MessageType.DirectMsg:
                    String direct = s.Substring(start, s.Length - start);
                    bus.FireDirectMessage(this, msgId, direct);
                    break;

                default:
                    throw new IvyException("protocol error, unknown message type " + msgType);
            }
        }

        private void sendPong(String s)
        {
            send(MessageType.Pong, 0, s);
        }

        private void sendPing(String s)
        {
            send(MessageType.Ping, 0, s);
        }

        private void sendBye()
        {
            send(MessageType.Bye, 0, "");
        }

        private void sendBye(String message)
        {
            send(MessageType.Bye, 0, message);
        }

        private void sendDie()
        {
            send(MessageType.Die, 0, "");
        }

        private void sendDie(String message)
        {
            send(MessageType.Die, 0, message);
        }

        public override String ToString()
        {
            return "IvyClient " + bus.appName + ":" + appName;
        }

        /// <summary>Writes a trace line to standard output when the bus is in debug mode.</summary>
        private void traceDebug(String s)
        {
            if (bus.Debug)
            {
                Console.Out.WriteLine("-->IvyClient " + bus.appName + ":" + appName + "<-- " + s);
            }
        }

        internal bool isPinging = false;

        /// <summary>
        /// the code of the pinger thread: sends a ping each time
        /// PINGTIMEOUT milliseconds elapse without the wait being interrupted.
        /// </summary>
        private void PingerRun()
        {
            isPinging = true;
            traceDebug("Pinger Thread started");
            while (isPinging)
            {
                try
                {
                    Thread.Sleep(PINGTIMEOUT);
                    sendPing("are you here ?");
                }
                catch (ThreadInterruptedException)
                {
                    // deliberate: the reader thread interrupts us on each
                    // received message and stopPinging() interrupts us to
                    // quit; in both cases just re-evaluate the loop condition
                }
            }
            traceDebug("Pinger Thread stopped");
        }

        /// <summary>
        /// Asks the pinger thread to stop. Does nothing more than clearing
        /// the flag when no pinger thread was started.
        /// </summary>
        public virtual void stopPinging()
        {
            isPinging = false;
            Thread t = pingerThread;
            // no pinger exists when pinging is disabled or when a send failed
            // inside the constructor; never interrupt the calling thread
            if (t != null && t != Thread.CurrentThread)
            {
                t.Interrupt();
            }
        }
    }
}