// François-Régis Colin
// http://www.tls.cena.fr/products/ivy/
// *
// (C) CENA
// *
namespace IvyBus
{
    using System;
    using System.Collections;
    using System.Collections.Specialized;
    using System.Collections.Generic;
    using System.Threading;
    using System.Text;
    using System.IO;
    using System.Net;
    using System.Net.Sockets;
    using System.Configuration;
    using System.Diagnostics;
    using System.Collections.ObjectModel;
    using IvyBus.Properties;

    /// <summary>
    /// Represents one remote peer on the bus.
    /// </summary>
    /// <remarks>
    /// Each time a connection is made with a remote peer, the regexps are exchanged.
    /// Once ready, a ready message is sent, and then we can send messages,
    /// die messages, direct messages, add or remove regexps, or quit. A reader thread
    /// is created for each remote client.
    /// </remarks>
    public class IvyClient : IvyProtocol, IComparable<IvyClient>, IDisposable
    {
        /// <summary>
        /// Orders clients by descending priority.
        /// </summary>
        /// <param name="other">the client to compare with; may be null</param>
        /// <returns>
        /// a value whose sign is that of <c>other.clientPriority - clientPriority</c>;
        /// when <paramref name="other"/> is null, this client's own priority.
        /// </returns>
        public int CompareTo(IvyClient other)
        {
            if (other == null)
            {
                return clientPriority;
            }
            // int.CompareTo yields the same sign as the subtraction, without overflow.
            return other.clientPriority.CompareTo(clientPriority);
        }

        /// <summary>
        /// the client application Name
        /// </summary>
        public String ApplicationName
        {
            get { return appName; }
        }

        internal int AppPort
        {
            get { return appPort; }
        }

        /// <summary>
        /// address of the client
        /// </summary>
        public IPAddress RemoteAddress
        {
            get { return remoteHost; }
        }

        /// <summary>
        /// port of the client
        /// </summary>
        public int RemotePort
        {
            get { return remotePort; }
        }

        private Ivy bus;
        private int appPort;
        private string clientId;            /* a unique ID for each IvyClient */
        private int clientPriority;         /* client priority */

        private volatile Thread clientThread; // volatile to ensure quick cross-thread visibility
        private bool doping;                // false by runtime default
        private const int PINGTIMEOUT = 5000;
        private volatile Thread pingerThread;
        private int localPort;
        private int remotePort;
        private IPAddress remoteHost;
        private int readyToSend;
        private Object readyToSendLock;

        // variables shared with the rest of the assembly
        internal String appName;
        internal IvyProtocol stream;

        /// <summary>
        /// Wraps an already connected socket, registers the client on the bus and
        /// starts the thread that reads the peer's traffic.
        /// </summary>
        /// <param name="bus">the bus this client belongs to</param>
        /// <param name="socket">the connected TCP socket to the peer</param>
        /// <param name="appname">the peer application name</param>
        /// <param name="appPort">the peer application port</param>
        internal IvyClient(Ivy bus, Socket socket, string appname, int appPort)
        {
            this.appName = appname;
            this.appPort = appPort;
            this.bus = bus;
            readyToSendLock = new Object();
            this.readyToSend = 0;

            // Set TCP_NODELAY to lower latency. NoDelay is a TCP-level option:
            // at the IP level the same numeric option name designates a different option.
            socket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.NoDelay, true);

            localPort = ((IPEndPoint)socket.LocalEndPoint).Port;
            IPEndPoint endpoint = (IPEndPoint)socket.RemoteEndPoint;
            remoteHost = endpoint.Address;
            remotePort = endpoint.Port;

            if (bus.ProtocolVersion == 4)
            {
                stream = new IvyTCPStreamV4(socket, this);
            }
            else
            {
                stream = new IvyTCPStreamV3(socket, this);
            }

            // Spawns a thread to manage the incoming traffic on this
            // socket. We should be ready to receive messages now.
            clientThread = new Thread(new ThreadStart(this.Run));
            clientThread.Name = "Ivy Tcp Client Reader Thread (" + appname + ")";
            bus.addClient(this);
            clientThread.Start();
        }

        /// <summary>
        /// Sends our identity and all our bindings to the peer, then tries to send
        /// the ready message and, if configured, starts the pinger thread.
        /// </summary>
        internal void SendBindings()
        {
            try
            {
                // Sends our ID, whether we initiated the connection or not.
                // The ID is the couple "host name, application port": the host name
                // is in the socket itself, the port is not known if we
                // initiated the connection.
                stream.TokenStartRegexp(bus.applicationPort, bus.appName);

                // Sends our regexps to the peer.
                lock (bus.bindings)
                {
                    foreach (IvyApplicationBinding bind in bus.bindings.Values)
                    {
                        stream.TokenAddBinding(bind.Binding, bind.Key, bind.FormattedExpression);
                    }
                }

                // End of bindings: the peer can now send its ReadyMessage.
                stream.TokenEndRegexp();

                // Try to send the Ready message.
                SendReadyToPeer();

#if (!PocketPC)
                doping = Properties.Settings.Default.IvyPing;
#endif
                if (doping)
                {
                    pingerThread = new Thread(new ThreadStart(PingerRun));
                    pingerThread.Name = "Ivy Pinger Thread";
                    pingerThread.Start();
                }
            }
            catch (NullReferenceException ex)
            {
                // The stream was set to null by a concurrent close():
                // the client pulled the rug out from under us.
                AbandonHandshake(ex);
            }
            catch (ObjectDisposedException ex)
            {
                // The Disconnected listeners are invoked by the receiver thread.
                AbandonHandshake(ex);
            }
            catch (IOException ex)
            {
                // The Disconnected listeners are invoked by the receiver thread.
                AbandonHandshake(ex);
            }
        }

        /// <summary>
        /// Logs that the peer left while bindings were being sent and closes the
        /// connection without sending a Bye.
        /// </summary>
        private void AbandonHandshake(Exception ex)
        {
            Ivy.traceError(Resources.IvyClient, Resources.IvyClientLeft + " " + appName + " " + ex.Message);
            close(false);
        }

        /// <summary>
        /// Logs a failed send, unregisters this client from the bus and closes the
        /// connection without sending a Bye.
        /// </summary>
        private void DropAfterSendFailure(Exception ex)
        {
            Ivy.traceError(Resources.IvyClient, Resources.IvyClientLeft + ex.Message);
            // first, I'm not a first class IvyClient any more
            bus.removeClient(this); // TODO: may disturb an iteration in the caller
            // the Disconnected listeners are invoked by the receiver thread
            close(false);
        }

        /// <summary>
        /// sends a direct message to the peer
        /// </summary>
        /// <param name="id">the numeric value provided to the remote client</param>
        /// <param name="message">the string sent to the remote client</param>
        /// <remarks>
        /// Does nothing when the connection is already closed. On a send failure the
        /// client is removed from the bus and closed.
        /// </remarks>
        public void SendDirectMsg(int id, string message)
        {
            try
            {
                // Read the field once: close() may null it from another thread.
                IvyProtocol peer = stream;
                if (peer != null)
                {
                    peer.TokenDirectMsg(id, message);
                }
            }
            catch (ObjectDisposedException ex)
            {
                DropAfterSendFailure(ex);
            }
            catch (IOException ex)
            {
                DropAfterSendFailure(ex);
            }
        }

        /// <summary>
        /// closes the connection to the peer.
        /// </summary>
        /// <param name="notify">should a Bye message be sent to the peer?</param>
        /// <remarks>
        /// Closing the stream is what makes the reader thread stop.
        /// Throws an IvyException when the Bye message or the stream close fails
        /// with an I/O error other than a connection reset/abort.
        /// </remarks>
        internal void close(bool notify)
        {
            Ivy.traceProtocol(Resources.IvyClient, Resources.Closing + appName);

            if (doping)
            {
                StopPinging();
            }

            if (notify)
            {
                SendBye();
            }

            // Already closed: nothing more to stop.
            if (clientThread == null)
            {
                return;
            }

            IvyProtocol closing = stream;
            if (closing != null)
            {
                try
                {
                    closing.Close(); // should stop the reading client thread
                }
                catch (IOException ioe)
                {
                    // EndOfStreamException derives from IOException and is covered here.
                    throw new IvyException(ioe.Message);
                }
                // The socket itself is closed by the stream.
                stream = null;
            }

            // The reader thread is deliberately not joined here: joining could
            // deadlock when that thread is raising the ClientDisconnected event.
            clientThread = null;
        }

        /// <summary>
        /// Sends the Bye message, tolerating a peer that has already gone away.
        /// </summary>
        private void SendBye()
        {
            try
            {
                IvyProtocol peer = stream;
                if (peer != null)
                {
                    peer.TokenBye(0, Resources.ByeMessage);
                }
            }
            catch (ObjectDisposedException)
            {
                // stream already disposed: nobody left to notify
            }
            catch (IOException ioe)
            {
                SocketException se = ioe.InnerException as SocketException;
                bool peerGone = se != null
                    && (se.SocketErrorCode == SocketError.ConnectionReset
                        || se.SocketErrorCode == SocketError.ConnectionAborted);
                if (!peerGone)
                {
                    throw new IvyException(ioe.Message);
                }
            }
        }

        /// <summary>
        /// sends the captured arguments of a matched message to the peer.
        /// </summary>
        /// <param name="id">the binding identifier the message matched</param>
        /// <param name="args">the arguments to send</param>
        /// <returns>the number of messages sent to the peer (0 or 1)</returns>
        internal int sendMsg(ushort id, string[] args)
        {
            int count = 0;
            try
            {
                IvyProtocol peer = stream;
                if (peer != null)
                {
                    peer.TokenMsg(id, args);
                    count++;
                }
            }
            catch (ObjectDisposedException ex)
            {
                DropAfterSendFailure(ex);
            }
            catch (IOException ex)
            {
                DropAfterSendFailure(ex);
            }
            return count;
        }

        /// <summary>
        /// compares two peers; the id is the couple (host, service port).
        /// </summary>
        /// <param name="clnt">the other peer</param>
        /// <returns>
        /// true if both peers carry the same non-null client ID, or if they have the
        /// same non-zero application port and the same remote address.
        /// </returns>
        internal bool sameIvyClient(IvyClient clnt)
        {
            // clientId is null if the protocol does not send the client ID
            if (clnt.clientId != null && this.clientId != null && clnt.clientId == this.clientId)
            {
                return true;
            }
            return (this.appPort != 0)
                && (this.appPort == clnt.appPort)
                && (this.RemoteAddress.Equals(clnt.RemoteAddress));
        }

        /// <summary>
        /// the code of the thread handling the incoming messages.
        /// </summary>
        private void Run()
        {
            Ivy.traceProtocol(Resources.IvyClient, Resources.Connected + RemoteAddress + ":" + RemotePort);
            Ivy.traceProtocol(Resources.IvyClient, "Thread started");

            while (true)
            {
                // Read the field once per iteration: close() may null it concurrently.
                IvyProtocol reader = stream;
                if (reader == null)
                {
                    break;
                }

                try
                {
                    if (!reader.ReceiveMsg())
                    {
                        Ivy.traceProtocol(Resources.IvyClient, Resources.BadReceive);
                        break;
                    }

                    // NOTE(review): this aborts the pinger after every received
                    // message, so pinging stops after the first one - confirm intent.
                    Thread pinger = pingerThread;
                    if (doping && (pinger != null))
                    {
                        pinger.Abort();
                    }
                }
                catch (ObjectDisposedException ex)
                {
                    Ivy.traceError(Resources.IvyClient, Resources.SocketClosed + ex.Message);
                    break;
                }
                catch (IvyException ie)
                {
                    Ivy.traceError(Resources.IvyClient, Resources.SocketClosed + ie.Message);
                    break;
                }
                catch (SocketException se)
                {
                    Ivy.traceError(Resources.IvyClient, Resources.SocketClosed + se.Message);
                    break;
                }
                catch (IOException ex)
                {
                    if (ex.InnerException is SocketException)
                    {
                        Ivy.traceProtocol(Resources.IvyClient, Resources.SocketClosed);
                    }
                    else
                    {
                        Ivy.traceError(Resources.IvyClient, Resources.AbDisconnect + RemoteAddress + ":" + RemotePort);
                    }
                    break;
                }
            }

            Ivy.traceProtocol(Resources.IvyClient, Resources.Disconnected + appName);
            Ivy.traceProtocol(Resources.IvyClient, "Thread stopped");

            // invokes the Disconnected applicationListeners
            bus.OnClientDisconnected(new IvyEventArgs(this, 0, string.Empty));

            // first, I'm not a first class IvyClient any more
            IvyProtocol leftover = stream;
            if (leftover != null)
            {
                leftover.Close();
                stream = null;
            }
            bus.removeClient(this);
        }

        void IvyProtocol.Close()
        {
            // never called on this side
        }

        bool IvyProtocol.ReceiveMsg()
        {
            // never called on this side
            return false;
        }

        /// <summary>
        /// The peer asks us to die: notifies the listeners, stops the bus, closes this
        /// connection and exits the process when a listener requested it.
        /// </summary>
        void IvyProtocol.TokenDie(int id, string arg)
        {
            Ivy.traceProtocol(Resources.IvyClient, Resources.DieReceive + appName + Resources.Reason + arg);

            // invokes the die applicationListeners
            IvyDieEventArgs ev = new IvyDieEventArgs(this, id, arg);
            bus.OnDie(ev);

            // first, I'm not a first class IvyClient any more
            bus.removeClient(this);

            // makes the bus die
            bus.Stop();
            close(false);

            if (ev.ForceExit)
            {
#if (PocketPC)
                System.Windows.Forms.Application.Exit();
#else
                System.Environment.Exit(0);
#endif
            }
        }

        /// <summary>
        /// The peer quits: closes our side without answering with a Bye.
        /// </summary>
        void IvyProtocol.TokenBye(int err, string arg)
        {
            Ivy.traceProtocol(Resources.IvyClient, Resources.ByeReceive + appName + Resources.Reason + arg);

            // Removal from the bus and the Disconnected event are handled when the
            // reader thread terminates.
            close(false);
        }

        /// <summary>
        /// The peer subscribes to an expression: registers the binding on the bus, or
        /// reports it as filtered / invalid.
        /// </summary>
        void IvyProtocol.TokenAddBinding(BindingType type, int id, string expression)
        {
            if (type == BindingType.RegularExpression && !bus.CheckRegexp(expression))
            {
                bus.OnClientFilterBinding(new IvyEventArgs(this, id, expression));
                return;
            }

            IvyBindingBase bind = null;
            try
            {
                switch (type)
                {
                    case BindingType.RegularExpression:
                        bind = new IvyBindingRegexp(expression);
                        break;
                    case BindingType.Simple:
                        bind = new IvyBindingSimple(expression);
                        break;
                }
                bus.AddBinding(id, this, bind);
            }
            catch (ArgumentException ex)
            {
                // Report the bad expression to the peer instead of throwing.
                IvyProtocol peer = stream;
                if (peer != null)
                {
                    peer.TokenError(7, Resources.BadExpression + " " + ex.Message);
                }
            }
        }

        void IvyProtocol.TokenDelBinding(int id)
        {
            bus.DelBinding(id, this);
        }

        void IvyProtocol.TokenMsg(int id, string[] args)
        {
            bus.OnMessage(new IvyMessageEventArgs(this, id, args));
        }

        void IvyProtocol.TokenError(int id, string arg)
        {
            bus.OnError(new IvyEventArgs(this, id, arg));
            Ivy.traceError(Resources.IvyClient, Resources.ErrorReceive + id + " " + arg);
        }

        void IvyProtocol.TokenEndRegexp()
        {
            bus.OnClientConnected(new IvyEventArgs(this, 0, string.Empty));
            /*
             * the peer is perhaps not ready to handle this message
             * an asymmetric processing should be written
             */
            SendReadyToPeer();
        }

        /// <summary>
        /// Counts one more completed handshake step and sends the bus ready message
        /// when the count reaches two and a ready message is configured.
        /// </summary>
        private void SendReadyToPeer()
        {
            lock (readyToSendLock)
            {
                readyToSend++;
                if (bus.ReadyMessage != null && readyToSend == 2)
                {
                    bus.SendMsgToClient(this, bus.ReadyMessage);
                }
            }
        }

        /// <summary>
        /// The peer announces its application port and name. When the bus reports an
        /// already connected client for the same peer, exactly one of the two
        /// connections is closed.
        /// </summary>
        void IvyProtocol.TokenStartRegexp(int id, string arg)
        {
            appName = arg;
            appPort = id;

            IvyClient client = bus.checkConnected(this);
            if (client == null)
            {
                return;
            }

            // Dilemma: choose the right client to close. The symmetric processing on
            // the peer will try to do the same, and only one side may be closed, so
            // the choice is derived from the port numbers of both connections.
            int otherHighest = Math.Max(client.localPort, client.remotePort);
            int ownHighest = Math.Max(this.localPort, this.remotePort);
            IvyClient target = (otherHighest > ownHighest) ? client : this;

            bus.removeClient(target);
            target.close(false);
        }

        void IvyProtocol.TokenDirectMsg(int id, string arg)
        {
            bus.OnDirectMessage(new IvyEventArgs(this, id, arg));
        }

        void IvyProtocol.TokenPing(int id, string arg)
        {
            // I receive a ping. I can answer a pong.
            Ivy.traceProtocol(Resources.IvyClient, Resources.PingReceive + appName + " : " + arg);
            IvyProtocol peer = stream;
            if (peer != null)
            {
                peer.TokenPong(id, arg);
            }
        }

        void IvyProtocol.TokenPong(int id, string arg)
        {
            // NOTE(review): logs with the PingReceive resource although this is a pong - confirm.
            Ivy.traceProtocol(Resources.IvyClient, Resources.PingReceive + appName + " : " + arg);
        }

        /// <summary>
        /// return full Application name pair
        /// </summary>
        public override String ToString()
        {
            return Resources.IvyClient + " " + bus.appName + ":" + appName;
        }

        /* is the pinging thread running */
        internal volatile bool isPinging;

        /// <summary>
        /// Body of the pinger thread: sends a ping to the peer every PINGTIMEOUT
        /// milliseconds until pinging is stopped or the connection is gone.
        /// </summary>
        private void PingerRun()
        {
            isPinging = true;
            Ivy.traceProtocol(Resources.IvyClient, Resources.PingerThreadStarted);
            while (isPinging)
            {
                try
                {
                    Thread.Sleep(PINGTIMEOUT);

                    IvyProtocol peer = stream;
                    if (peer == null)
                    {
                        // connection closed while we were sleeping
                        isPinging = false;
                        break;
                    }

                    // Only the low 32 bits of the tick count are used as ping id.
                    int id = unchecked((int)DateTime.Now.Ticks);
                    peer.TokenPing(id, Resources.PingerThreadMessage);
                }
                catch (ThreadAbortException ie)
                {
                    // The runtime re-raises the abort after this handler.
                    Ivy.traceError(Resources.IvyClient, Resources.PingerThreadKilled + ie.Message);
                }
                catch (ObjectDisposedException ex)
                {
                    // Peer gone: stop pinging instead of letting the exception escape the thread.
                    Ivy.traceError(Resources.IvyClient, Resources.IvyClientLeft + ex.Message);
                    isPinging = false;
                }
                catch (IOException ex)
                {
                    // Peer gone: stop pinging instead of letting the exception escape the thread.
                    Ivy.traceError(Resources.IvyClient, Resources.IvyClientLeft + ex.Message);
                    isPinging = false;
                }
            }
            Ivy.traceProtocol(Resources.IvyClient, Resources.PingerThreadStopped);
        }

        /// <summary>
        /// stop the pinger Thread
        /// </summary>
        public virtual void StopPinging()
        {
            isPinging = false;
            // The pinger thread may never have been started.
            Thread pinger = pingerThread;
            if (pinger != null)
            {
                pinger.Abort();
            }
        }

        #region IDisposable Members

        /// <summary>
        /// Closes the stream to the peer and suppresses finalization.
        /// </summary>
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Releases the resources held by this client.
        /// </summary>
        /// <param name="disposing">
        /// true when called from <see cref="Dispose()"/>; false when called from the finalizer.
        /// </param>
        protected virtual void Dispose(bool disposing)
        {
            if (disposing)
            {
                // Free managed state.
                IvyProtocol owned = stream;
                if (owned != null)
                {
                    owned.Close();
                    stream = null;
                }
            }
            // No unmanaged state is owned directly by this class.
        }

        /// <summary>
        /// Finalizer: simply calls Dispose(false).
        /// </summary>
        ~IvyClient()
        {
            Dispose(false);
        }

        #endregion
    }
}