/// François-Régis Colin
/// http://www.tls.cena.fr/products/ivy/
/// *
/// (C) CENA
/// *
namespace IvyBus
{
    using System;
    using System.Collections;
    using System.Collections.Specialized;
    using System.Collections.Generic;
    using System.Threading;
    using System.Text;
    using System.IO;
    using System.Net;
    using System.Net.Sockets;
    using System.Configuration;
    using System.Diagnostics;

    /// <summary>A class for the peers on the bus.</summary>
    /// <remarks>
    /// Each time a connexion is made with a remote peer, the regexps are exchanged.
    /// Once ready, a ready message is sent, and then we can send messages,
    /// die messages, direct messages, add or remove regexps, or quit. A thread is
    /// created for each remote client.
    /// </remarks>
    public class IvyClient : IvyProtocol, IComparable<IvyClient>
    {
        /// <summary>
        /// Orders clients by priority, in descending order of <c>clientPriority</c>.
        /// </summary>
        /// <param name="y">the other client; a null reference sorts before any instance</param>
        /// <returns>
        /// <c>y.clientPriority - clientPriority</c>, or 1 when <paramref name="y"/> is null
        /// </returns>
        public int CompareTo(IvyClient y)
        {
            if (y == null)
            {
                // IComparable<T> convention: any instance is greater than null.
                return 1;
            }
            return (y.clientPriority - clientPriority);
        }

        /// <summary>returns the name of the remote agent.</summary>
        public String ApplicationName
        {
            get { return appName; }
        }

        /// <summary>
        /// allow an Ivy package class to access the list of regexps at a
        /// given time.
        /// </summary>
        /// <remarks>
        /// perhaps we should implement a new IvyApplicationListener method to
        /// allow the notification of regexp addition and deletion
        /// </remarks>
        public StringCollection Regexps
        {
            get
            {
                StringCollection tab = new StringCollection();
                // The reader thread adds and removes bindings under this same lock;
                // take it so the enumeration cannot observe a concurrent modification.
                lock (bindings)
                {
                    foreach (IvyBindingBase bind in bindings.Values)
                    {
                        tab.Add(bind.expression);
                    }
                }
                return tab;
            }
        }

        /// <summary>Application port announced by the peer (0 until its StartRegexp token is received).</summary>
        internal int AppPort
        {
            get { return appPort; }
        }

        /// <summary>Address of the remote end of the socket.</summary>
        public IPAddress RemoteAddress
        {
            get { return remoteHost; }
        }

        /// <summary>Port of the remote end of the socket.</summary>
        public int RemotePort
        {
            get { return remotePort; }
        }

        private Ivy bus;
        // Bindings subscribed by the peer, indexed by binding id. Always accessed under lock(bindings).
        private Dictionary<ushort, IvyBindingBase> bindings;
        private int appPort;
        private string clientId; /* an unique ID for each IvyClient */
        private int clientPriority; /* client priority */

        private volatile Thread clientThread; // volatile to ensure the quick communication
        private bool doping; // false by runtime default
        private const int PINGTIMEOUT = 5000;
        private volatile Thread pingerThread;
        private int remotePort;
        private IPAddress remoteHost;

        // protected variables
        internal String appName;
        internal IvyProtocol stream;

        /// <summary>
        /// Wraps a connected socket: sends our application id and bindings to the
        /// peer, then starts the reader thread (and the pinger thread when pinging
        /// is enabled in the settings).
        /// </summary>
        /// <param name="bus">the bus this client belongs to</param>
        /// <param name="socket">the connected socket to the peer</param>
        /// <param name="appname">initial name for the peer, replaced when its StartRegexp token arrives</param>
        internal IvyClient(Ivy bus, Socket socket, string appname)
        {
            bindings = new Dictionary<ushort, IvyBindingBase>();
            appName = appname;
            appPort = 0;
            this.bus = bus;

            IPEndPoint endpoint = (IPEndPoint)socket.RemoteEndPoint;
            remoteHost = endpoint.Address;
            remotePort = endpoint.Port;

#if (!PocketPC )
            doping = Properties.Settings.Default.IvyPing;
            // KeepAlive is a socket-level option (SO_KEEPALIVE); it must be set with
            // SocketOptionLevel.Socket, not SocketOptionLevel.Tcp.
            socket.SetSocketOption( SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true );
#endif
            if ( bus.ProtocolVersion == 4 )
                stream = new IvyTCPStreamV4( socket, this );
            else
                stream = new IvyTCPStreamV3(socket, this);

            clientPriority = Ivy.DEFAULT_PRIORITY;

            stream.TokenApplicationId(bus.applicationPriority, bus.AppId);

            // sends our ID, whether we initiated the connexion or not
            // the ID is the couple "host name,application Port", the host name
            // information is in the socket itself, the port is not known if we
            // initiate the connexion
            stream.TokenStartRegexp( bus.applicationPort, bus.appName );
            // sends our regexps to the peer
            lock( bus.bindings )
            {
                foreach (Ivy.ApplicationBinding bind in bus.bindings.Values )
                {
                    stream.TokenAddBinding(bind);
                }
            }
            stream.TokenEndRegexp();

            // spawns a thread to manage the incoming traffic on this
            // socket. We should be ready to receive messages now.
            clientThread = new Thread(new ThreadStart(this.Run));
            clientThread.Name = "Ivy Tcp Client Reader Thread";
            clientThread.Start();

            if (doping)
            {
                pingerThread = new Thread(new ThreadStart(PingerRun));
                pingerThread.Name = "Ivy Pinger Thread";
                pingerThread.Start();
            }
        }

        /// <summary>sends a direct message to the peer</summary>
        /// <param name="id">the numeric value provided to the remote client</param>
        /// <param name="message">the string that will be match-tested</param>
        /// <remarks>
        /// On an IOException the client is removed from the bus, the Disconnected
        /// listeners are invoked and the connexion is closed without a Bye message.
        /// </remarks>
        public void sendDirectMsg(ushort id, string message)
        {
            try
            {
                stream.TokenDirectMsg( id, message);
            }
            catch (IOException e)
            {
                traceDebug("I can't send my message to this client. He probably left "+e.Message);
                // first, I'm not a first class IvyClient any more
                bus.removeClient(this);
                // invokes the Disconnected applicationListeners
                bus.FireClientDisconnected(this);
                close(false);
            }
        }

        /// <summary>closes the connexion to the peer.</summary>
        /// <param name="notify">should I send Bye message ?</param>
        /// <remarks>
        /// Closing the stream is what makes the thread managing the socket stop.
        /// </remarks>
        /// <exception cref="IvyException">an IOException occurred while sending Bye or closing the stream</exception>
        internal void close(bool notify)
        {
            traceDebug("closing connexion to " + appName);
            if (doping )
            {
                stopPinging();
            }

            // Snapshot the field: the reader thread may null it concurrently.
            IvyProtocol peer = stream;
            if (notify && peer != null)
            {
                try
                {
                    peer.TokenBye(0, "hasta la vista");
                }
                catch (IOException ioe)
                {
                    throw new IvyException(ioe.Message);
                }
            }

            // stop the thread and close the stream
            if (clientThread == null)
                return;

            // Tell Thread to stop.
            if (peer != null)
            {
                try
                {
                    peer.Close(); // should stop the Reading Client Thread
                }
                catch (IOException ioe)
                {
                    throw new IvyException(ioe.Message);
                }
                // the socket itself is closed by the stream ( NetworkStream )
                stream = null;
            }

            // The reader thread is deliberately not joined (nor aborted) here:
            // a potential deadlock exists when that thread is itself issuing the
            // ClientDisconnected event.
            clientThread = null;
        }

        /// <summary>sends the substrings of a message to the peer for each matching regexp.</summary>
        /// <param name="message">the string that will be match-tested</param>
        /// <returns>the number of messages sent to the peer</returns>
        internal int sendMsg(String message)
        {
            int count = 0;

            lock( bindings )
            {
                // hash message in V4 protocol only
                if (bus.ProtocolVersion == 4)
                    IvyBindingSimple.Prepare(message);

                foreach (IvyBindingBase bind in bindings.Values )
                {
                    string[] args = bind.Match(message);
                    // Read the field once so a concurrent close() cannot null it
                    // between the test and the call.
                    IvyProtocol peer = stream;
                    if ( peer != null && args != null )
                    {
                        peer.TokenMsg(bind.key, args);
                        count++;
                    }
                }
            }
            return count;
        }

        /// <summary>compares two peers; the id is the couple (host, service port).</summary>
        /// <param name="clnt">the other peer</param>
        /// <returns>
        /// true if the peers are similar (same non-zero application port and same
        /// remote address). This should not happen, it is bad.
        /// </returns>
        internal bool sameClient(IvyClient clnt)
        {
            if (clnt == null)
            {
                return false;
            }
            // IPAddress does not overload ==, so the operator would compare references;
            // Equals compares the address values.
            return (appPort != 0 && appPort == clnt.appPort)
                && object.Equals(RemoteAddress, clnt.RemoteAddress);
        }

        /// <summary>the code of the thread handling the incoming messages.</summary>
        private void Run()
        {
            traceDebug("Connected from " + RemoteAddress + ":" + RemotePort);
            traceDebug("Thread started");

            while (true)
            {
                // Snapshot the field: close() may null it from another thread.
                IvyProtocol peer = stream;
                if (peer == null)
                    break;

                try
                {
                    if ( peer.receiveMsg() )
                    {
                        // early stop during readLine()
                        // NOTE(review): ThreadAbortException is re-raised at the end of the
                        // catch block in PingerRun, so this ends the pinger thread after the
                        // first received message instead of re-arming it - confirm intent.
                        Thread pinger = pingerThread;
                        if (doping && (pinger != null))
                            pinger.Abort();
                    }
                    else
                    {
                        traceDebug("receiveMsg false ! leaving the thread");
                        break;
                    }
                }
                catch ( ObjectDisposedException e )
                {
                    traceDebug( "socket closed "+e.Message );
                    break;
                }
                catch (IvyException ie)
                {
                    traceDebug("socket closed IvyException" + ie.Message);
                    break;
                }
                catch (SocketException se)
                {
                    traceDebug( "socket closed "+se.Message );
                    break;
                }
                catch (IOException e)
                {
                    if ( e.InnerException is SocketException )
                    {
                        traceDebug( "socket closed" );
                    }
                    else
                    {
                        traceDebug("abnormally Disconnected from " + RemoteAddress + ":" + RemotePort);
                    }
                    break;
                }
            }

            traceDebug("normally Disconnected from " + appName);
            traceDebug("Thread stopped");
            // invokes the Disconnected applicationListeners
            bus.FireClientDisconnected(this);

            IvyProtocol leftover = stream;
            if (leftover != null)
            {
                leftover.Close();
                stream = null;
            }
            // I'm not a first class IvyClient any more
            bus.removeClient(this);
        }

        /// <summary>Not used on the receiving side of the protocol.</summary>
        void IvyProtocol.Close()
        {
            // never called on this side
        }

        /// <summary>Not used on the receiving side of the protocol.</summary>
        /// <returns>always false</returns>
        bool IvyProtocol.receiveMsg()
        {
            // never called on this side
            return false;
        }

        /// <summary>
        /// The peer asks us to die: fires the die listeners, leaves the bus, stops it
        /// and closes the connexion; exits the application when a listener requests it.
        /// </summary>
        void IvyProtocol.TokenDie(ushort id, string arg)
        {
            traceDebug("received die Message from " + appName + "Raison: "+ arg);
            // invokes the die applicationListeners
            Ivy.ApplicationExit killapp = bus.FireDie(this, id, arg);
            // first, I'm not a first class IvyClient any more
            bus.removeClient(this);
            // makes the bus die
            bus.stop();
            close(false);
            if (killapp == Ivy.ApplicationExit.FORCE_EXIT)
                System.Windows.Forms.Application.Exit();
        }

        /// <summary>The peer quits: leaves the bus and closes the connexion without answering Bye.</summary>
        void IvyProtocol.TokenBye(ushort err, string arg)
        {
            // the peer quits
            traceDebug("received bye Message from " + appName + "Raison: "+ arg);
            // first, I'm not a first class IvyClient any more
            bus.removeClient(this);
            // the disconnect applicationListeners are invoked by the reader thread
            // (see Run), once closing the stream makes it leave its loop
            close(false);
        }

        /// <summary>
        /// The peer subscribes to a new binding. Regexp bindings rejected by
        /// <c>bus.CheckRegexp</c> are reported through FireClientFilterBinding and ignored.
        /// </summary>
        /// <exception cref="IvyException">the binding could not be created or registered (ArgumentException)</exception>
        void IvyProtocol.TokenAddBinding(Ivy.ApplicationBinding bind)
        {
            try
            {
                if (bind.type == Ivy.BindingType.BindRegexp && !bus.CheckRegexp(bind.expression) )
                {
                    bus.FireClientFilterBinding(this, bind.expression);
                    return;
                }

                lock (bindings)
                {
                    IvyBindingBase binding;
                    if (bind.type == Ivy.BindingType.BindRegexp)
                    {
                        binding = new IvyBindingRegexp(bind.key, bind.expression);
                    }
                    else
                    {
                        binding = new IvyBindingSimple(bind.key, bind.expression);
                    }
                    bindings.Add(bind.key, binding);
                }

                bus.FireClientAddBinding(this, bind.expression);
            }
            catch (ArgumentException e)
            {
                throw new IvyException("binding expression error " + e.Message);
            }
        }

        /// <summary>The peer removes one of its bindings; unknown ids are only traced.</summary>
        void IvyProtocol.TokenDelBinding(ushort id)
        {
            lock( bindings )
            {
                IvyBindingBase bind;
                if (bindings.TryGetValue(id, out bind))
                {
                    bus.FireClientRemoveBinding(this, bind.expression);
                    bindings.Remove(id);
                }
                else
                {
                    traceDebug("DelBinding unknown binding id " + id);
                }
            }
        }

        /// <summary>A message matching one of our bindings: forwarded to the bus callbacks.</summary>
        void IvyProtocol.TokenMsg(ushort id, string[] arg)
        {
            bus.FireCallback(this, id, arg);
        }

        /// <summary>An error reported by the peer: forwarded to the bus error listeners.</summary>
        void IvyProtocol.TokenError(ushort id, string arg)
        {
            bus.FireError(this, id, arg);
            traceDebug("Error msg " + id + " " + arg);
        }

        /// <summary>
        /// The peer announces its id and priority; the bus client list is re-sorted
        /// when the priority changes.
        /// </summary>
        void IvyProtocol.TokenApplicationId(ushort id, string arg)
        {
            clientId = arg;
            if ( clientPriority != id )
            {
                clientPriority = id;
                bus.SortClients();
            }
        }

        /// <summary>
        /// The peer has sent all its regexps: fires the Connected listeners and sends
        /// the bus ready message, if any.
        /// </summary>
        void IvyProtocol.TokenEndRegexp()
        {
            bus.FireClientConnected(this);
            /*
             * the peer is perhaps not ready to handle this message
             * an asymmetric processing should be written
             */
            if (bus.ReadyMessage != null)
                sendMsg(bus.ReadyMessage);
        }

        /// <summary>
        /// The peer announces its name and application port.
        /// </summary>
        /// <exception cref="IvyException">the bus reports this peer as already connected</exception>
        void IvyProtocol.TokenStartRegexp(ushort id, string arg)
        {
            appName = arg;
            appPort = id;
            if (bus.checkConnected(this))
            {
                close(false);
                throw new IvyException("Rare ! A concurrent connect occured");
            }
        }

        /// <summary>A direct message from the peer: forwarded to the bus listeners.</summary>
        void IvyProtocol.TokenDirectMsg(ushort id, string arg)
        {
            bus.FireDirectMessage(this, id, arg );
        }

        /// <summary>A ping from the peer: answered with a pong carrying the same argument.</summary>
        void IvyProtocol.TokenPing(string arg)
        {
            // I receive a ping. I can answer a pong.
            traceDebug("Ping msg from " + appName + " : " + arg );
            stream.TokenPong(arg);
        }

        /// <summary>A pong from the peer: only traced.</summary>
        void IvyProtocol.TokenPong(string arg)
        {
            traceDebug("Pong msg from " + appName + " : " + arg);
        }

        /// <summary>Returns "IvyClient " followed by the bus application name and the peer name.</summary>
        public override String ToString()
        {
            return "IvyClient " + bus.appName + ":" + appName;
        }

        /// <summary>
        /// Debug trace, compiled only in DEBUG builds. The message is emitted through
        /// <c>Trace.Assert</c>, whose condition is the negation of <c>Ivy.VerboseDebug</c>,
        /// so it is reported only when verbose debugging is on.
        /// </summary>
        [Conditional("DEBUG")]
        private void traceDebug(String s)
        {
            Trace.Assert(!Ivy.VerboseDebug, "-->IvyClient " + this.bus.appName + ":" + appName + "<-- " + s);
        }

        internal bool isPinging = false;

        /// <summary>
        /// Code of the pinger thread: sends a ping to the peer every PINGTIMEOUT
        /// milliseconds while <c>isPinging</c> is set. Stops by itself when the
        /// stream is gone or can no longer be written to.
        /// </summary>
        private void PingerRun()
        {
            isPinging = true;
            traceDebug("Pinger Thread started");
            while (isPinging)
            {
                try
                {
                    Thread.Sleep(PINGTIMEOUT);
                    // The reader thread or close() may have released the stream while
                    // we were sleeping.
                    IvyProtocol peer = stream;
                    if (peer == null)
                    {
                        isPinging = false;
                    }
                    else
                    {
                        peer.TokenPing("are you here ?");
                    }
                }
                catch (ThreadAbortException ie)
                {
                    // The runtime re-raises the abort at the end of this block.
                    traceDebug("Pinger Thread killed "+ie.Message);
                }
                catch (ObjectDisposedException ode)
                {
                    // An unhandled exception here would escape the thread; stop pinging instead.
                    traceDebug("Pinger Thread: stream closed " + ode.Message);
                    isPinging = false;
                }
                catch (IOException ioe)
                {
                    traceDebug("Pinger Thread: can't ping the peer " + ioe.Message);
                    isPinging = false;
                }
            }
            traceDebug("Pinger Thread stopped");
        }

        /// <summary>Stops the pinger thread, if one was started.</summary>
        public virtual void stopPinging()
        {
            isPinging = false;
            // The pinger thread is created after the reader thread is started, so a
            // close() issued very early by the reader may find it still unset.
            Thread pinger = pingerThread;
            if (pinger != null)
            {
                pinger.Abort();
            }
        }
    }
}