/// François-Régis Colin
/// http://www.tls.cena.fr/products/ivy/
/// *
/// (C) CENA
/// *
namespace IvyBus
{
    using System;
    using System.Collections;
    using System.Threading;
    using System.Text;
    using System.IO;
    using System.Net;
    using System.Net.Sockets;
    using System.Configuration;

    /// <summary>
    /// A class for the peers on the bus.
    /// </summary>
    /// <remarks>
    /// Each time a connexion is made with a remote peer, the regexps are exchanged.
    /// Once ready, a ready message is sent, and then we can send messages,
    /// die messages, direct messages, add or remove regexps, or quit. A thread is
    /// created for each remote client.
    /// </remarks>
    public class IvyClient
    {
        /// <summary>
        /// Orders <see cref="IvyClient"/> instances by descending clientPriority
        /// (the client with the larger priority value sorts first).
        /// </summary>
        public class IvyClientPriority : IComparer
        {
            int IComparer.Compare( Object x, Object y )
            {
                IvyClient c1 = (IvyClient)x;
                IvyClient c2 = (IvyClient)y;
                // CompareTo rather than a subtraction: the difference of two ints
                // can overflow and report the wrong sign for extreme priorities.
                return c2.clientPriority.CompareTo( c1.clientPriority );
            }
        }

        /// <summary>Returns the name of the remote agent.</summary>
        public String ApplicationName
        {
            get { return appName; }
        }

        /// <summary>
        /// Allows an Ivy package class to access the list of regexps at a given time.
        /// </summary>
        /// <remarks>
        /// Perhaps we should implement a new IvyApplicationListener method to
        /// allow the notification of regexp addition and deletion.
        /// </remarks>
        public string[] Regexps
        {
            get
            {
                // Enumerating a synchronized Hashtable is not thread safe by itself;
                // hold SyncRoot (as sendMsg does) so that Count and the enumeration
                // see the same snapshot while the reader thread adds/removes bindings.
                lock( bindings.SyncRoot )
                {
                    String[] tab = new String[bindings.Count];
                    int i = 0;
                    foreach( IvyBindingBase bind in bindings.Values )
                    {
                        tab[i++] = bind.expression;
                    }
                    return tab;
                }
            }
        }

        /// <summary>Port announced by the peer in its StartRegexp message (0 until received).</summary>
        internal int AppPort
        {
            get { return appPort; }
        }

        /// <summary>Host name obtained by a reverse DNS lookup of the socket's remote address.</summary>
        public String RemoteAddress
        {
            get
            {
                IPHostEntry hostInfo = Dns.GetHostByAddress( ((IPEndPoint)socket.RemoteEndPoint).Address );
                return hostInfo.HostName;
            }
        }

        /// <summary>TCP port of the socket's remote end point.</summary>
        public int RemotePort
        {
            get { return ((IPEndPoint)socket.RemoteEndPoint).Port; }
        }

        private Ivy bus;
        private Socket socket;
        private IvyStream stream;
        private Hashtable bindings;
        private int appPort;
        private string clientId; /* an unique ID for each IvyClient */
        private int clientPriority; /* client priority */
        private bool peerCalling;
        private volatile bool running = false;
        private volatile Thread clientThread; // volatile to ensure the quick communication
        private static bool doping = (ConfigurationSettings.AppSettings["IVY_PING"] != null);
        private const int PINGTIMEOUT = 5000;
        private volatile Thread pingerThread;

        // protected variables
        internal String appName;

        /// <summary>
        /// Wraps an established socket: sends our application id, our StartRegexp
        /// header, every binding of the bus and the EndRegexp marker, then starts
        /// the reader thread (and the pinger thread when IVY_PING is configured).
        /// </summary>
        /// <param name="bus">the bus this client belongs to</param>
        /// <param name="socket">the connected socket to the peer</param>
        /// <param name="appname">initial name for the peer</param>
        internal IvyClient(Ivy bus, Socket socket, string appname)
        {
            bindings = Hashtable.Synchronized( new Hashtable() );
            appName = appname;
            appPort = 0;
            this.bus = bus;
            this.socket = socket;
            socket.SetSocketOption( SocketOptionLevel.Tcp, SocketOptionName.KeepAlive, 1 );
            stream = new IvyStream( socket );
            clientPriority = Ivy.DEFAULT_PRIORITY;

            sendApplicationId();

            // sends our ID, whether we initiated the connexion or not
            // the ID is the couple "host name,application Port", the host name
            // information is in the socket itself, the port is not known if we
            // initiate the connexion
            stream.sendMsg( IvyStream.MessageType.StartRegexp, bus.applicationPort, Encoding.ASCII.GetBytes( bus.appName ) );

            // sends our regexps to the peer
            lock( bus.bindings.SyncRoot )
            {
                foreach( Ivy.ApplicationBinding bind in bus.bindings.Values )
                {
                    sendBinding( bind );
                }
            }
            stream.sendMsg( IvyStream.MessageType.EndRegexp, 0, Encoding.ASCII.GetBytes( "" ) );

            // spawns a thread to manage the incoming traffic on this
            // socket. We should be ready to receive messages now.
            // The run flag is raised here, before Start(), so that a stop request
            // issued right after construction cannot be overwritten by the thread.
            running = true;
            clientThread = new Thread( new ThreadStart( this.Run ) );
            clientThread.Name = "Ivy Tcp Client Reader Thread";
            clientThread.Start();

            if ( doping )
            {
                // Same reasoning as for 'running': set the flag before the thread starts.
                isPinging = true;
                pingerThread = new Thread( new ThreadStart( PingerRun ) );
                pingerThread.Name = "Ivy Pinger Thread";
                pingerThread.Start();
            }
        }

        /// <summary>Sends our application id, carrying the bus application priority.</summary>
        internal void sendApplicationId()
        {
            stream.sendMsg( IvyStream.MessageType.ApplicationId, bus.applicationPriority, Encoding.ASCII.GetBytes( bus.AppId ) );
        }

        /// <summary>Announces one of our bindings (regexp or simple binding) to the peer.</summary>
        internal void sendBinding(Ivy.ApplicationBinding bind)
        {
            IvyStream.MessageType msgType = bind.type == Ivy.BindingType.BindRegexp
                ? IvyStream.MessageType.AddRegexp
                : IvyStream.MessageType.AddBinding;
            /* perhaps we should perform some checking here */
            stream.sendMsg( msgType, bind.key, Encoding.ASCII.GetBytes( bind.regexp ) );
        }

        /// <summary>Tells the peer that one of our bindings has been removed.</summary>
        internal void delBinding(Ivy.ApplicationBinding bind)
        {
            IvyStream.MessageType msgType = bind.type == Ivy.BindingType.BindRegexp
                ? IvyStream.MessageType.DelRegexp
                : IvyStream.MessageType.DelBinding;
            stream.sendMsg( msgType, bind.key, Encoding.ASCII.GetBytes( "" ) );
        }

        /// <summary>
        /// Sends a direct message to the peer.
        /// </summary>
        /// <param name="id">the numeric value provided to the remote client</param>
        /// <param name="message">the string sent to the remote client</param>
        /// <remarks>
        /// On an IOException the client is removed from the bus, the disconnected
        /// listeners are fired and the connexion is closed.
        /// </remarks>
        /// <exception cref="IvyException">closing the connexion after a send failure failed</exception>
        public void sendDirectMsg(int id, string message)
        {
            try
            {
                stream.sendMsg( IvyStream.MessageType.DirectMsg, id, Encoding.ASCII.GetBytes( message ) );
            }
            catch ( IOException )
            {
                traceDebug( "I can't send my message to this client. He probably left" );
                // first, I'm not a first class IvyClient any more
                bus.removeClient( this );
                // invokes the Disconnected applicationListeners
                bus.FireClientDisconnected( this );
                try
                {
                    close( false );
                }
                catch ( IOException ioe )
                {
                    throw new IvyException( "close failed" + ioe.Message );
                }
            }
        }

        /// <summary>
        /// Closes the connexion to the peer. The thread managing the socket is stopped.
        /// </summary>
        /// <param name="notify">should I send a Bye message ?</param>
        internal void close(bool notify)
        {
            traceDebug( "closing connexion to " + appName );
            if ( doping )
            {
                stopPinging();
            }
            try
            {
                // Nothing to say goodbye on if the stream is already gone.
                if ( notify && stream != null )
                {
                    sendBye( "hasta la vista" );
                }
            }
            finally
            {
                // Cleanup runs even when the Bye could not be sent, otherwise the
                // reader thread and the stream would be left behind. The exception
                // from sendBye still propagates to the caller afterwards.
                stopListening();
                // bus.clientDisconnect(this);
                // in_stream close in the thread
                if ( stream != null )
                {
                    stream.Close();
                    stream = null;
                    //socket.Close(); // handled by stream ( NetworkStream )
                    socket = null;
                }
            }
        }

        /// <summary>
        /// Sends the substrings of a message to the peer for each matching regexp.
        /// </summary>
        /// <param name="message">the string that will be match-tested</param>
        /// <returns>the number of messages sent to the peer</returns>
        internal int sendMsg(String message)
        {
            int count = 0;
            lock( bindings.SyncRoot )
            {
                IvyBindingSimple.Prepare( message );
                foreach( IvyBindingBase bind in bindings.Values )
                {
                    IvyArgument args = bind.Match( message );
                    if ( args != null )
                    {
                        stream.sendMsg( IvyStream.MessageType.Msg, bind.key, args.Serialize() );
                        count++;
                    }
                }
            }
            return count;
        }

        /// <summary>
        /// Asks the reader thread to stop, closes the stream to unblock it and,
        /// unless called from the reader thread itself, waits for it to end.
        /// </summary>
        internal void stopListening()
        {
            // Work on a snapshot: the field is volatile and may be cleared concurrently.
            Thread reader = clientThread;
            if ( reader == null )
            {
                return;
            }
            // Tell Thread to stop.
            running = false;
            if ( stream != null )
            {
                stream.Close();
            }
            if ( Thread.CurrentThread != reader )
            {
                // Wait for Thread to end.
                reader.Join();
            }
            clientThread = null;
        }

        /// <summary>
        /// Compares two peers; the id is the couple (host, service port).
        /// </summary>
        /// <param name="clnt">the other peer</param>
        /// <returns>
        /// true if the peers are similar. This should not happen, it is bad
        /// © ® (tm)
        /// </returns>
        internal bool sameClient(IvyClient clnt)
        {
            return (appPort != 0 && appPort == clnt.appPort) && (RemoteAddress == clnt.RemoteAddress);
        }

        /// <summary>
        /// Builds "host:port" for trace output without ever throwing; falls back
        /// to the application name when the end point cannot be resolved.
        /// </summary>
        private string describeRemoteEndPoint()
        {
            try
            {
                return RemoteAddress + ":" + RemotePort;
            }
            catch ( Exception )
            {
                return appName;
            }
        }

        /// <summary>
        /// The code of the thread handling the incoming messages.
        /// </summary>
        private void Run()
        {
            IvyStream.MessageType type;
            int id;
            byte[] data;

            // Only pay for the reverse DNS lookup when the trace is actually printed.
            if ( bus.Debug )
            {
                try
                {
                    traceDebug( "Connected from " + RemoteAddress + ":" + RemotePort );
                }
                catch ( Exception )
                {
                    traceDebug( "Interrupted while resolving remote hostname" );
                }
            }
            traceDebug( "Thread started" );

            // 'running' is the volatile field (raised by the constructor), so that
            // stopListening() can really end this loop.
            while ( running && (stream != null) )
            {
                try
                {
                    if ( stream.receiveMsg( out type, out id, out data ) )
                    {
                        // early stop during readLine()
                        if ( doping && (pingerThread != null) )
                        {
                            pingerThread.Interrupt();
                        }
                        DispatchMsg( type, id, data );
                    }
                    else
                    {
                        traceDebug( "receiveMsg false ! leaving the thread" );
                        break;
                    }
                }
                catch ( ObjectDisposedException )
                {
                    traceDebug( "ivyclient socket closed" );
                    running = false;
                    break;
                }
                catch ( IvyException ie )
                {
                    Console.Error.WriteLine( ie.Message );
                    Console.Error.WriteLine( ie.StackTrace );
                    running = false;
                    break;
                }
                catch ( SocketException )
                {
                    traceDebug( "ivyclient socket closed" );
                    running = false;
                    break;
                }
                catch ( IOException e )
                {
                    if ( e.InnerException is SocketException )
                    {
                        traceDebug( "ivyclient socket closed" );
                    }
                    else
                    {
                        // The socket may already be unusable here; the helper cannot throw.
                        traceDebug( "abnormally Disconnected from " + describeRemoteEndPoint() );
                        Console.Error.WriteLine( e.Message );
                        Console.Error.WriteLine( e.StackTrace );
                    }
                    running = false;
                    break;
                }
            }

            if ( stream != null )
            {
                stream.Close();
                stream = null;
            }
            traceDebug( "normally Disconnected from " + appName );
            traceDebug( "Thread stopped" );
            // invokes the Disconnected applicationListeners
            bus.FireClientDisconnected( this );
            // first, I'm not a first class IvyClient any more
            bus.removeClient( this );
        }

        /// <summary>
        /// Handles a Die message: leaves the bus, fires the die listeners, stops
        /// the bus, closes the connexion and exits the process unless a listener
        /// asked not to.
        /// </summary>
        private void recvDie( int id, string arg )
        {
            traceDebug( "received die Message from " + appName );
            // first, I'm not a first class IvyClient any more
            bus.removeClient( this );
            // invokes the die applicationListeners
            bool dontkillapp = bus.FireDie( this, id );
            // makes the bus die
            bus.stop();
            try
            {
                close( false );
            }
            catch ( IOException ioe )
            {
                throw new IvyException( ioe.Message );
            }
            if ( !dontkillapp )
            {
                Environment.Exit( -1 );
            }
        }

        /// <summary>Handles a Bye message: the peer quits.</summary>
        private void recvBye( int id, string arg )
        {
            // the peer quits
            traceDebug( "received bye Message from " + appName + "Raison: " + arg );
            // first, I'm not a first class IvyClient any more
            bus.removeClient( this );
            // invokes the disconnect applicationListeners
            //bus.FireClientDisconnected(this); done in Running Thread
            try
            {
                close( false ); // will fire disconnected
            }
            catch ( IOException ioe )
            {
                throw new IvyException( ioe.Message );
            }
        }

        /// <summary>Registers a regexp binding announced by the peer, if the bus accepts it.</summary>
        private void recvAddRegexp( int id, string regexp )
        {
            if ( !bus.CheckRegexp( regexp ) )
            {
                traceDebug( "regexp Warning exp='" + regexp + "' can't match removing from " + appName );
                return;
            }
            try
            {
                IvyBindingRegexp bind = new IvyBindingRegexp( id, regexp );
                lock( bindings.SyncRoot )
                {
                    bindings.Add( id, bind );
                }
                bus.FireClientAddBinding( this, bind.expression );
            }
            catch ( ArgumentException e )
            {
                throw new IvyException( "regexp error " + e.Message );
            }
        }

        /// <summary>Removes a regexp binding previously announced by the peer.</summary>
        private void recvDelRegexp( int id, string arg )
        {
            removePeerBinding( id );
        }

        /// <summary>Registers a simple binding announced by the peer.</summary>
        private void recvAddBinding( int id, string expression )
        {
            try
            {
                IvyBindingSimple bind = new IvyBindingSimple( id, expression );
                lock( bindings.SyncRoot )
                {
                    bindings.Add( id, bind );
                }
                bus.FireClientAddBinding( this, bind.expression );
            }
            catch ( ArgumentException e )
            {
                throw new IvyException( "regexp error " + e.Message );
            }
        }

        /// <summary>Removes a simple binding previously announced by the peer.</summary>
        private void recvDelBinding( int id, string arg )
        {
            removePeerBinding( id );
        }

        /// <summary>
        /// Shared implementation of the two "delete" messages: notifies the
        /// listeners and forgets the binding registered under <paramref name="id"/>.
        /// An unknown id is traced and ignored instead of crashing the reader thread.
        /// </summary>
        private void removePeerBinding( int id )
        {
            lock( bindings.SyncRoot )
            {
                IvyBindingBase bind = (IvyBindingBase) bindings[id];
                if ( bind == null )
                {
                    traceDebug( "unknown binding id " + id + " to remove from " + appName );
                    return;
                }
                bus.FireClientRemoveBinding( this, bind.expression );
                bindings.Remove( id );
            }
        }

        private void recvMsg( int id, byte[] arg )
        {
            bus.callCallback( this, id, arg );
        }

        private void recvError( int id, string arg )
        {
            traceDebug( "Error msg " + id + " " + arg );
        }

        /// <summary>Stores the peer's id and priority; re-sorts the bus clients when the priority changed.</summary>
        private void recvApplicationId( int id, string arg )
        {
            clientId = arg;
            if ( clientPriority != id )
            {
                clientPriority = id;
                bus.SortClients();
            }
        }

        /// <summary>The peer has sent all its bindings: fire "connected" and send our ready message.</summary>
        private void recvEndRegexp( int id, string arg )
        {
            bus.FireClientConnected( this );
            /*
             * the peer is perhaps not ready to handle this message
             * an asymmetric processing should be written
             */
            if ( bus.ready_message != null )
            {
                sendMsg( bus.ready_message );
            }
        }

        /// <summary>
        /// Records the peer's name and port; if the bus reports that this peer is
        /// already connected, closes the connexion and raises an IvyException.
        /// </summary>
        private void recvStartRegexp( int id, string arg )
        {
            appName = arg;
            appPort = id;
            if ( bus.checkConnected( this ) )
            {
                try
                {
                    close( false );
                }
                catch ( IOException ioe )
                {
                    throw new IvyException( "io " + ioe.Message );
                }
                throw new IvyException( "Rare ! A concurrent connect occured" );
            }
        }

        private void recvDirectMsg( int id, byte[] arg )
        {
            bus.FireDirectMessage( this, id, arg );
        }

        private void recvPing( int id, byte[] arg )
        {
            // I receive a ping. I can answer a pong.
            traceDebug( "Ping msg from " + appName + " : " + Encoding.ASCII.GetString( arg ) );
            sendPong( arg );
        }

        private void recvPong( int id, byte[] arg )
        {
            // Decode the payload: concatenating the byte[] itself only prints its type name.
            traceDebug( "Pong msg from " + appName + " : " + Encoding.ASCII.GetString( arg ) );
        }

        /// <summary>
        /// Routes one received message to its handler. Text handlers get the
        /// payload decoded as ASCII, the others get the raw bytes.
        /// </summary>
        /// <exception cref="IvyException">the message type is not known</exception>
        private void DispatchMsg(IvyStream.MessageType msgType, int msgId, byte[] msgData)
        {
            string strarg = Encoding.ASCII.GetString( msgData );
            switch ( msgType )
            {
                case IvyStream.MessageType.Die:
                    recvDie( msgId, strarg );
                    break;
                case IvyStream.MessageType.Bye:
                    recvBye( msgId, strarg );
                    break;
                case IvyStream.MessageType.AddRegexp:
                    recvAddRegexp( msgId, strarg );
                    break;
                case IvyStream.MessageType.DelRegexp:
                    recvDelRegexp( msgId, strarg );
                    break;
                case IvyStream.MessageType.AddBinding:
                    recvAddBinding( msgId, strarg );
                    break;
                case IvyStream.MessageType.DelBinding:
                    recvDelBinding( msgId, strarg );
                    break;
                case IvyStream.MessageType.EndRegexp:
                    recvEndRegexp( msgId, strarg );
                    break;
                case IvyStream.MessageType.Msg:
                    recvMsg( msgId, msgData );
                    break;
                case IvyStream.MessageType.Pong:
                    recvPong( msgId, msgData );
                    break;
                case IvyStream.MessageType.Ping:
                    recvPing( msgId, msgData );
                    break;
                case IvyStream.MessageType.Error:
                    recvError( msgId, strarg );
                    break;
                case IvyStream.MessageType.StartRegexp:
                    recvStartRegexp( msgId, strarg );
                    break;
                case IvyStream.MessageType.DirectMsg:
                    recvDirectMsg( msgId, msgData );
                    break;
                case IvyStream.MessageType.ApplicationId:
                    recvApplicationId( msgId, strarg );
                    break;
                default:
                    throw new IvyException( "protocol error, unknown message type " + msgType );
            }
        }

        private void sendPong(byte[] s)
        {
            stream.sendMsg( IvyStream.MessageType.Pong, 0, s );
        }

        /// <summary>Sends a Ping message carrying <paramref name="s"/> to the peer.</summary>
        public void sendPing(string s)
        {
            stream.sendMsg( IvyStream.MessageType.Ping, 0, Encoding.ASCII.GetBytes( s ) );
        }

        private void sendBye(string message)
        {
            stream.sendMsg( IvyStream.MessageType.Bye, 0, Encoding.ASCII.GetBytes( message ) );
        }

        /// <summary>Sends a Die message carrying <paramref name="message"/> to the peer.</summary>
        public void sendDie(string message)
        {
            stream.sendMsg( IvyStream.MessageType.Die, 0, Encoding.ASCII.GetBytes( message ) );
        }

        public override String ToString()
        {
            return "IvyClient " + bus.appName + ":" + appName;
        }

        /// <summary>Writes a trace line to standard output when the bus is in debug mode.</summary>
        private void traceDebug(String s)
        {
            if ( bus.Debug )
            {
                Console.Out.WriteLine( "-->IvyClient " + bus.appName + ":" + appName + "<-- " + s );
            }
        }

        internal bool isPinging = false;

        /// <summary>
        /// Body of the pinger thread: sends a ping each time PINGTIMEOUT
        /// milliseconds elapse without the sleep being interrupted. The reader
        /// thread interrupts the sleep on every received message, and
        /// stopPinging() interrupts it to end the loop.
        /// </summary>
        private void PingerRun()
        {
            // isPinging is raised by the constructor before this thread starts, so
            // an early stopPinging() is not undone here.
            traceDebug( "Pinger Thread started" );
            while ( isPinging )
            {
                try
                {
                    Thread.Sleep( PINGTIMEOUT );
                    if ( stream == null )
                    {
                        // connexion already closed: nothing left to ping
                        break;
                    }
                    sendPing( "are you here ?" );
                }
                catch ( ThreadInterruptedException )
                {
                    // Woken up on purpose; the loop condition decides what happens next.
                }
                catch ( IOException )
                {
                    // The peer is unreachable; the reader thread handles the disconnection.
                    traceDebug( "Pinger can't reach the peer, stopping" );
                    isPinging = false;
                }
                catch ( ObjectDisposedException )
                {
                    traceDebug( "Pinger stream closed, stopping" );
                    isPinging = false;
                }
            }
            traceDebug( "Pinger Thread stopped" );
        }

        /// <summary>Asks the pinger thread, if any, to stop and wakes it up.</summary>
        public virtual void stopPinging()
        {
            isPinging = false;
            Thread pinger = pingerThread;
            // No pinger thread exists when IVY_PING is not configured.
            if ( pinger != null )
            {
                pinger.Interrupt();
            }
        }
    }
}