/// François-Régis Colin /// http://www.tls.cena.fr/products/ivy/ /// * /// (C) CENA /// * namespace IvyBus { using System; using System.Collections; using System.Collections.Specialized; using System.Collections.Generic; using System.Threading; using System.Text; using System.IO; using System.Net; using System.Net.Sockets; using System.Configuration; using System.Diagnostics; /// A Class for the the peers on the bus. /// /// /// each time a connexion is made with a remote peer, the regexp are exchanged /// once ready, a ready message is sent, and then we can send messages, /// die messages, direct messages, add or remove regexps, or quit. A thread is /// created for each remote client. /// public class IvyClient : IvyProtocol, IComparable { public int CompareTo(IvyClient y) { return (y.clientPriority - clientPriority); } public String ApplicationName { get { return appName; } } public string[] Regexps { get { int i = 0; String[] tab = new String[bindings.Count]; foreach( IvyBindingBase bind in bindings.Values ) tab[i++] = bind.expression; return tab; } } internal int AppPort { get { return appPort; } } public String RemoteAddress { get { return remoteHost; } } public int RemotePort { get { return remotePort; } } private Ivy bus; private Socket socket; private Dictionary bindings; private int appPort; private string clientId; /* an unique ID for each IvyClient */ private int clientPriority; /* client priority */ private volatile Thread clientThread; // volatile to ensure the quick communication private bool doping = false; private const int PINGTIMEOUT = 5000; private volatile Thread pingerThread; private int remotePort; private string remoteHost; // protected variables internal String appName; internal IvyTCPStream stream; internal IvyClient(Ivy bus, Socket socket, string appname) { bindings = new Dictionary(); appName = appname; appPort = 0; this.bus = bus; this.socket = socket; IPHostEntry hostInfo = Dns.GetHostEntry(((IPEndPoint)socket.RemoteEndPoint).Address); remoteHost = hostInfo.HostName; remotePort = ((IPEndPoint)socket.RemoteEndPoint).Port; #if (!PocketPC ) doping = Properties.Settings.Default.IvyPing; #endif #if (!PocketPC ) socket.SetSocketOption( SocketOptionLevel.Tcp, SocketOptionName.KeepAlive, 1 ); #endif if ( bus.ProtocolVersion == 4 ) stream = new IvyTCPStreamV4( socket, this ); else stream = new IvyTCPStreamV3(socket, this); clientPriority = Ivy.DEFAULT_PRIORITY; stream.TokenApplicationId(bus.applicationPriority, bus.AppId); // sends our ID, whether we initiated the connexion or not // the ID is the couple "host name,application Port", the host name // information is in the socket itself, the port is not known if we // initiate the connexion stream.TokenStartRegexp( bus.applicationPort, bus.appName ); // sends our regexps to the peer lock( bus.bindings ) { foreach (Ivy.ApplicationBinding bind in bus.bindings.Values ) { stream.TokenAddBinding(bind); } } stream.TokenEndRegexp(); // spawns a thread to manage the incoming traffic on this // socket. We should be ready to receive messages now. clientThread = new Thread(new ThreadStart(this.Run)); clientThread.Name = "Ivy Tcp Client Reader Thread"; clientThread.Start(); if (doping) { pingerThread = new Thread(new ThreadStart(PingerRun)); pingerThread.Name = "Ivy Pinger Thread"; pingerThread.Start(); } } /// returns the name of the remote agent. /// allow an Ivy package class to access the list of regexps at a /// given time. /// perhaps we should implement a new IvyApplicationListener method to /// allow the notification of regexp addition and deletion /// /// sends a direct message to the peer /// /// the numeric value provided to the remote client /// /// the string that will be match-tested /// /// public void sendDirectMsg(ushort id, string message) { try { stream.TokenDirectMsg( id, message); } catch (IOException e) { traceDebug("I can't send my message to this client. He probably left "+e.Message); // first, I'm not a first class IvyClient any more bus.removeClient(this); // invokes the Disconnected applicationListeners bus.FireClientDisconnected(this); try { close(false); } catch (IOException ioe) { throw new IvyException("close failed" + ioe.Message); } } } /// closes the connexion to the peer. /// /// should I send Bye message ? /// the thread managing the socket is stopped /// /// internal void close(bool notify) { traceDebug("closing connexion to " + appName); if (doping ) { stopPinging(); } if (notify) stream.TokenBye(0,"hasta la vista"); stopListening(); // bus.clientDisconnect(this); // in_stream close in the thread if ( stream != null ) { stream.Close(); stream = null; //socket.Close(); // pris en charge par stream ( NetWorkStream ) socket = null; } } /// sends the substrings of a message to the peer for each matching regexp. /// /// the string that will be match-tested /// /// the number of messages sent to the peer /// /// internal int sendMsg(String message) { int count = 0; lock( bindings ) { // hash message in V4 protocol only if (bus.ProtocolVersion == 4) IvyBindingSimple.Prepare(message); foreach (IvyBindingBase bind in bindings.Values ) { string[] args = bind.Match(message); if (args!=null) { stream.TokenMsg(bind.key, args); count++; } } } return count; } internal void stopListening() { if (clientThread == null) return ; // Tell Thread to stop. if ( stream != null ) stream.Close(); if (Thread.CurrentThread != clientThread && (clientThread != null) ) { // Wait for Thread to end. bool term = clientThread.Join(10000); if (!term && (clientThread != null) ) clientThread.Abort(); } clientThread = null; } /// compares two peers the id is the couple (host,service port). /// /// the other peer /// /// true if the peers are similir. This should not happen, it is bad /// © ® (tm) /// /// internal bool sameClient(IvyClient clnt) { return (appPort != 0 && appPort == clnt.appPort) && (RemoteAddress == clnt.RemoteAddress); } /// the code of the thread handling the incoming messages. /// private void Run() { try { traceDebug("Connected from " + RemoteAddress + ":" + RemotePort); } catch (Exception ie) { traceDebug("Interrupted while resolving remote hostname "+ie.Message); } traceDebug("Thread started"); bool running = true; while ( running && (stream != null) ) { try { if ( stream.receiveMsg() ) { // early stop during readLine() if (doping && (pingerThread != null)) pingerThread.Abort(); } else { traceDebug("receiveMsg false ! leaving the thread"); break; } } catch ( ObjectDisposedException e ) { traceDebug( "socket closed "+e.Message ); running = false; break; } catch (IvyException ie) { traceDebug("socket closed IvyException" + ie.Message); running = false; break; } catch (SocketException se) { traceDebug( "socket closed "+se.Message ); running = false; break; } catch (IOException e) { if ( e.InnerException is SocketException ) { traceDebug( "socket closed" ); } else { traceDebug("abnormally Disconnected from " + RemoteAddress + ":" + RemotePort); } running = false; break; } } traceDebug("normally Disconnected from " + appName); traceDebug("Thread stopped"); // invokes the Disconnected applicationListeners bus.FireClientDisconnected(this); // first, I'm not a first class IvyClient any more if (stream != null) { stream.Close(); stream = null; } bus.removeClient(this); } override internal void TokenDie(ushort id, string arg) { traceDebug("received die Message from " + appName); // first, I'm not a first class IvyClient any more bus.removeClient(this); // invokes the die applicationListeners bool dontkillapp = bus.FireDie(this, id, arg); // makes the bus die bus.stop(); try { close(false); } catch (IOException ioe) { throw new IvyException(ioe.Message); } if ( !dontkillapp ) System.Windows.Forms.Application.Exit(); } override internal void TokenBye(ushort err, string arg) { // the peer quits traceDebug("received bye Message from " + appName + "Raison: "+ arg); // first, I'm not a first class IvyClient any more bus.removeClient(this); // invokes the disconnect applicationListeners //bus.FireClientDisconnected(this); done in Running Thread try { close(false); // will fire diconnected } catch (IOException ioe) { throw new IvyException(ioe.Message); } } override internal void TokenAddBinding(Ivy.ApplicationBinding bind) { try { lock( bindings ) { IvyBindingBase binding; if (bind.type == Ivy.BindingType.BindRegexp) binding = new IvyBindingRegexp(bind.key, bind.expression); else binding = new IvyBindingSimple(bind.key, bind.expression); bindings.Add( bind.key, binding); } bus.FireClientAddBinding( this, bind.expression ); } catch (ArgumentException e) { throw new IvyException("binding expression error " + e.Message); } } override internal void TokenDelBinding(ushort id) { lock( bindings ) { try { IvyBindingBase bind = (IvyBindingBase)bindings[id]; bus.FireClientRemoveBinding(this, bind.expression); bindings.Remove(id); } catch (KeyNotFoundException ex) { traceDebug("DelBinding " + ex.Message); } } } override internal void TokenMsg(ushort id, string[] arg) { bus.FireCallback(this, id, arg); } override internal void TokenError(ushort id, string arg) { traceDebug("Error msg " + id + " " + arg); } override internal void TokenApplicationId(ushort id, string arg) { clientId = arg; if ( clientPriority != id ) { clientPriority = id; bus.SortClients(); } } override internal void TokenEndRegexp() { bus.FireClientConnected(this); /* * the peer is perhaps not ready to handle this message * an assymetric processing should be written */ if (bus.ReadyMessage != null) sendMsg(bus.ReadyMessage); } override internal void TokenStartRegexp(ushort id, string arg) { appName = arg; appPort = id; if (bus.checkConnected(this)) { try { close(false); } catch (IOException ioe) { throw new IvyException("io " + ioe.Message); } throw new IvyException("Rare ! A concurrent connect occured"); } } override internal void TokenDirectMsg(ushort id, string arg) { bus.FireDirectMessage(this, id, arg ); } override internal void TokenPing(string arg) { // I receive a ping. I can answer a pong. traceDebug("Ping msg from " + appName + " : " + arg ); stream.TokenPong(arg); } override internal void TokenPong(string arg) { traceDebug("Ping msg from " + appName + " : " + arg); } public override String ToString() { return "IvyClient " + bus.appName + ":" + appName; } [Conditional("DEBUG")] private void traceDebug(String s) { Trace.Assert(!Ivy.VerboseDebug, "-->IvyClient " + bus.appName + ":" + appName + "<-- " + s); } internal bool isPinging = false; private void PingerRun() { isPinging = true; traceDebug("Pinger Thread started"); while (isPinging) { try { Thread.Sleep(PINGTIMEOUT); stream.TokenPing("are you here ?"); } catch (ThreadAbortException ie) { traceDebug("Pinger Thread killed "+ie.Message); } } traceDebug("Pinger Thread stopped"); } public virtual void stopPinging() { isPinging = false; //pingerThread.Interrupt(); pingerThread.Abort(); } } }