From 2658a3c52a9a0d22ece7f52f87bae7d57ce5f995 Mon Sep 17 00:00:00 2001 From: fcolin Date: Thu, 1 Feb 2007 09:46:37 +0000 Subject: Utilisateur : Fcolin Date : 30/06/05 Heure : 14:21 Archivé dans $/CSharp/Ivy Commentaire: Bug start time < 0 en l'absence de rejeu a voir ajouter 1s de blank au debut (vss 21) --- CSharp/Ivy/Ivy/IvyClient.cs | 534 ++++++++++++++++++++++---------------------- 1 file changed, 261 insertions(+), 273 deletions(-) (limited to 'CSharp') diff --git a/CSharp/Ivy/Ivy/IvyClient.cs b/CSharp/Ivy/Ivy/IvyClient.cs index 37a9fde..de1d413 100644 --- a/CSharp/Ivy/Ivy/IvyClient.cs +++ b/CSharp/Ivy/Ivy/IvyClient.cs @@ -6,8 +6,6 @@ namespace IvyBus { using System; - using System.Text.RegularExpressions; - using System.Text; using System.Collections; using System.Threading; using System.IO; @@ -23,29 +21,39 @@ namespace IvyBus /// die messages, direct messages, add or remove regexps, or quit. A thread is /// created for each remote client. /// - public class IvyClient + public class IvyClient { - public String ApplicationName + public class IvyClientPriority : IComparer { - get + + // Calls CaseInsensitiveComparer.Compare with the parameters reversed. + int IComparer.Compare( Object x, Object y ) { - return appName; + IvyClient c1 = (IvyClient)x; + IvyClient c2 = (IvyClient)y; + return( c2.clientPriority - c1.clientPriority ); } - + } - internal Int32 ClientKey + + public String ApplicationName { get { - return clientKey; + return appName; } } - public ICollection Regexps + + public string[] Regexps { get { - return regexp_text.Values; + int i = 0; + String[] tab = new String[bindings.Count]; + foreach( IvyBindingBase bind in bindings.Values ) + tab[i++] = bind.expression; + return tab; } } @@ -75,39 +83,17 @@ namespace IvyBus } - /* the protocol magic numbers */ - internal enum MessageType : int - { - Bye = 0, /* end of the peer */ - AddRegexp = 1, /* the peer adds a regexp */ - Msg = 2, /* the peer sends a message */ - Error = 3, /* error message */ - DelRegexp = 4, /* the peer removes one of his regex */ - EndRegexp = 5, /* no more regexp in the handshake */ - StartRegexp = 6, /* avoid race condition in concurrent connexions */ - DirectMsg = 7, /* the peer sends a direct message */ - Die = 8, /* the peer wants us to quit */ - Ping = 9, /* checks the presence of the other */ - Pong = 10 /* checks the presence of the other */ - }; - - internal const String MESSAGE_TERMINATOR = "\n"; /* the next protocol will use \r */ - internal const String StartArg = "\u0002"; /* begin of arguments */ - internal const String EndArg = "\u0003"; /* end of arguments */ - - private static int clientSerial = 0; /* an unique ID for each IvyClient */ - private Ivy bus; private Socket socket; - private StreamWriter out_stream; - private StreamReader in_stream; - private Hashtable regexp_in; - private Hashtable regexp_text; + private IvyStream stream; + private Hashtable bindings; private int appPort; + private string clientId; /* an unique ID for each IvyClient */ + private int clientPriority; /* client priority */ + private bool peerCalling; private volatile bool running = false; private volatile Thread clientThread; // volatile to ensure the quick communication - private Int32 clientKey; private static bool doping = (ConfigurationSettings.AppSettings["IVY_PING"] != null); private const int PINGTIMEOUT = 5000; private volatile Thread pingerThread; @@ -117,30 +103,33 @@ namespace IvyBus internal IvyClient(Ivy bus, Socket socket) { - regexp_in = Hashtable.Synchronized(new Hashtable()); - regexp_text = Hashtable.Synchronized(new Hashtable()); + bindings = Hashtable.Synchronized(new Hashtable()); appName = "Unknown"; appPort = 0; this.bus = bus; this.socket = socket; - clientKey = clientSerial++; - NetworkStream net_stream = new NetworkStream( socket ); - out_stream = new StreamWriter(net_stream,System.Text.Encoding.ASCII); - in_stream = new StreamReader(net_stream,System.Text.Encoding.ASCII); + + socket.SetSocketOption( SocketOptionLevel.Tcp, SocketOptionName.KeepAlive, 1 ); + stream = new IvyStream( socket ); + + clientPriority = Ivy.DEFAULT_PRIORITY; + + sendApplicationId(); + // sends our ID, whether we initiated the connexion or not // the ID is the couple "host name,application Port", the host name // information is in the socket itself, the port is not known if we // initiate the connexion - send(MessageType.StartRegexp, bus.applicationPort, bus.appName); + stream.sendMsg(IvyStream.MessageType.StartRegexp, bus.applicationPort, bus.appName); // sends our regexps to the peer - lock( bus.regexp_out.SyncRoot ) + lock( bus.bindings.SyncRoot ) { - foreach (Int32 ikey in bus.regexp_out.Keys ) + foreach (Ivy.ApplicationBinding bind in bus.bindings.Values ) { - sendRegexp(ikey, (String) bus.regexp_out[ikey]); + sendBinding(bind); } } - send(MessageType.EndRegexp, 0, ""); + stream.sendMsg(IvyStream.MessageType.EndRegexp, 0, ""); // spawns a thread to manage the incoming traffic on this // socket. We should be ready to receive messages now. clientThread = new Thread(new ThreadStart(this.Run)); @@ -160,18 +149,20 @@ namespace IvyBus /// perhaps we should implement a new IvyApplicationListener method to /// allow the notification of regexp addition and deletion /// - - - internal void sendRegexp(int id, String regexp) + internal void sendApplicationId() { - send(MessageType.AddRegexp, id, regexp); /* perhaps we should perform some checking here */ + stream.sendMsg( IvyStream.MessageType.ApplicationId, bus.applicationPriority, bus.AppId); } - internal void delRegexp(int id) + internal void sendBinding(Ivy.ApplicationBinding bind) { - send(MessageType.DelRegexp, id, ""); + stream.sendMsg(bind.type == Ivy.BindingType.BindRegexp ? IvyStream.MessageType.AddRegexp: IvyStream.MessageType.AddBinding, bind.key, bind.regexp); /* perhaps we should perform some checking here */ } + internal void delBinding(Ivy.ApplicationBinding bind) + { + stream.sendMsg(bind.type == Ivy.BindingType.BindRegexp ? IvyStream.MessageType.DelRegexp: IvyStream.MessageType.DelBinding, bind.key, ""); + } /// sends a direct message to the peer /// /// the numeric value provided to the remote client @@ -181,7 +172,26 @@ namespace IvyBus /// public void sendDirectMsg(int id, String message) { - send(MessageType.DirectMsg, id, message); + try + { + stream.sendMsg(IvyStream.MessageType.DirectMsg, id, message); + } + catch (IOException e) + { + traceDebug("I can't send my message to this client. He probably left"); + // first, I'm not a first class IvyClient any more + bus.removeClient(this); + // invokes the Disconnected applicationListeners + bus.FireClientDisconnected(this); + try + { + close(false); + } + catch (IOException ioe) + { + throw new IvyException("close failed" + ioe.Message); + } + } } /// closes the connexion to the peer. @@ -202,10 +212,13 @@ namespace IvyBus stopListening(); // bus.clientDisconnect(this); // in_stream close in the thread - out_stream.Close(); - out_stream = null; - socket.Close(); - socket = null; + if ( stream != null ) + { + stream.Close(); + stream = null; + //socket.Close(); // pris en charge par stream ( NetWorkStream ) + socket = null; + } } /// sends the substrings of a message to the peer for each matching regexp. @@ -218,16 +231,16 @@ namespace IvyBus internal int sendMsg(String message) { int count = 0; - lock( regexp_in.SyncRoot ) + + lock( bindings.SyncRoot ) { - IDictionaryEnumerator myEnumerator = regexp_in.GetEnumerator(); - while ( myEnumerator.MoveNext() ) + IvyBindingSimple.Prepare( message ); + foreach (IvyBindingBase bind in bindings.Values ) { - Regex regexp = (Regex) myEnumerator.Value; - Match result = regexp.Match(message); - if (result.Success) + string[] array = bind.Match(message); + if (array!=null) { - send(MessageType.Msg, (int)myEnumerator.Key, result); + stream.sendMsg(IvyStream.MessageType.Msg, bind.key, array); count++; } } @@ -241,8 +254,8 @@ namespace IvyBus return ; // Tell Thread to stop. running = false; - if ( in_stream != null ) - in_stream.Close(); + if ( stream != null ) + stream.Close(); if ( Thread.CurrentThread != clientThread ) { // Wait for Thread to end. @@ -269,7 +282,10 @@ namespace IvyBus /// private void Run() { - String msg = null; + IvyStream.MessageType type; + int id; + string[] args; + try { traceDebug("Connected from " + RemoteAddress + ":" + RemotePort); @@ -281,22 +297,20 @@ namespace IvyBus traceDebug("Thread started"); bool running = true; - while (running) + while ( running && (stream != null) ) { try { - if ((msg = in_stream.ReadLine()) != null) + if ( stream.receiveMsg(out type, out id, out args) ) { // early stop during readLine() if (doping && (pingerThread != null)) pingerThread.Interrupt(); - if ( msg.Length == 0 ) - Console.Error.WriteLine("Should close"); - newParseMsg(msg); + DispatchMsg(type, id, args); } else { - traceDebug("readline null ! leaving the thead"); + traceDebug("readline null ! leaving the thread"); break; } } @@ -336,8 +350,11 @@ namespace IvyBus break; } } - in_stream.Close(); - in_stream = null; + if ( stream != null ) + { + stream.Close(); + stream = null; + } traceDebug("normally Disconnected from " + appName); traceDebug("Thread stopped"); // invokes the Disconnected applicationListeners @@ -347,252 +364,224 @@ namespace IvyBus } - private void sendBuffer(String buffer) + + private void recvDie( int id, string[] arg ) { - buffer += "\n"; + traceDebug("received die Message from " + appName); + // first, I'm not a first class IvyClient any more + bus.removeClient(this); + // invokes the die applicationListeners + bool dontkillapp = bus.FireDie(this, id); + // makes the bus die + bus.stop(); try { - if ( out_stream != null ) - { - out_stream.Write(buffer); - out_stream.Flush(); - } + close(false); } - catch (IOException e) + catch (IOException ioe) { - traceDebug("I can't send my message to this client. He probably left"); - // first, I'm not a first class IvyClient any more - bus.removeClient(this); - // invokes the Disconnected applicationListeners - bus.FireClientDisconnected(this); - try - { - close(false); - } - catch (IOException ioe) - { - throw new IvyException("close failed" + ioe.Message); - } + throw new IvyException(ioe.Message); } + if ( !dontkillapp ) + Environment.Exit( -1 ); } - - private void send(MessageType type, int id, String arg) + private void recvBye( int id, string[] arg ) { + // the peer quits + traceDebug("received bye Message from " + appName); + // first, I'm not a first class IvyClient any more + bus.removeClient(this); + // invokes the disconnect applicationListeners + //bus.FireClientDisconnected(this); done in Running Thread try { - sendBuffer((int)type + " " + id + StartArg + arg); + close(false); // will fire diconnected } - catch (IvyException ie) + catch (IOException ioe) { - Console.Error.WriteLine("received an exception: " + ie.Message); - Console.Error.WriteLine(ie.StackTrace); + throw new IvyException(ioe.Message); } } - - private void send(MessageType type, Int32 id, Match result) + private void recvAddRegexp( int id, string[] arg ) { - String buffer = (int)type + " " + id + StartArg; - // Start at 1 because group 0 represent entire matching - for (int sub = 1; sub < result.Groups.Count; sub++) + string regexp = arg[0]; + if (bus.CheckRegexp(regexp)) { - buffer += result.Groups[sub] + EndArg; - } - try - { - sendBuffer(buffer); + try + { + IvyBindingRegexp bind = new IvyBindingRegexp(id,regexp); + lock( bindings.SyncRoot ) + { + bindings.Add( id, bind); + } + bus.FireClientAddBinding( this, bind.expression ); + } + catch (ArgumentException e) + { + throw new IvyException("regexp error " + e.Message); + } } - catch (IvyException ie) + else { - Console.Error.WriteLine("received an exception: " + ie.Message); - Console.Error.WriteLine(ie.StackTrace); + traceDebug("regexp Warning exp='" + arg + "' can't match removing from " + appName); } } - - private void newParseMsg(String s) + private void recvDelRegexp( int id, string[] arg ) { - int from = 0, to = 0; - MessageType msgType; - int msgId; - while ((to < s.Length) && (s[to] != ' ')) + lock( bindings.SyncRoot ) { - to++; + IvyBindingBase bind = (IvyBindingBase) bindings[id]; + bus.FireClientRemoveBinding( this, bind.expression ); + bindings.Remove(id); } - if (to >= s.Length) - throw new IvyException("protocol error"); + + } + private void recvAddBinding( int id, string[] arg ) + { + string expression = arg[0]; + try { - msgType = (MessageType)Int32.Parse(s.Substring(from, (to) - (from))); + IvyBindingSimple bind = new IvyBindingSimple(id, expression); + + lock( bindings.SyncRoot ) + { + bindings.Add( id, bind); + } + bus.FireClientAddBinding( this, bind.expression ); } - catch (FormatException nfe) + catch (ArgumentException e) { - throw new IvyException("protocol error on msgType"); + throw new IvyException("regexp error " + e.Message); } - from = to + 1; - while ((to < s.Length) && (s[to] != 2)) + + } + private void recvDelBinding( int id, string[] arg ) + { + lock( bindings.SyncRoot ) { - to++; + IvyBindingBase bind = (IvyBindingBase) bindings[id]; + bus.FireClientRemoveBinding( this, bind.expression ); + bindings.Remove(id); } - if (to >= s.Length) - throw new IvyException("protocol error"); - try + } + private void recvMsg( int id, string[] arg ) + { + bus.callCallback(this, id, arg); + } + private void recvError( int id, string[] arg ) + { + traceDebug("Error msg " + id + " " + arg); + } + private void recvApplicationId( int id, string[] arg ) + { + clientId = arg[0]; + if ( clientPriority != id ) { - msgId = Int32.Parse(s.Substring(from, (to) - (from))); + clientPriority = id; + bus.SortClients(); } - catch (FormatException nfe) + } + private void recvEndRegexp( int id, string[] arg ) + { + bus.FireClientConnected(this); + /* + * the peer is perhaps not ready to handle this message + * an assymetric processing should be written + */ + if (bus.ready_message != null) + sendMsg(bus.ready_message); + } + private void recvStartRegexp( int id, string[] arg ) + { + appName = arg[0]; + appPort = id; + if (bus.checkConnected(this)) { - throw new IvyException("protocol error on identifier"); + try + { + close(false); + } + catch (IOException ioe) + { + throw new IvyException("io " + ioe.Message); + } + throw new IvyException("Rare ! A concurrent connect occured"); } - from = to + 1; + + } + private void recvDirectMsg( int id, string[] arg ) + { + bus.FireDirectMessage(this, id, arg[0]); + } + private void recvPing( int id, string[] arg ) + { + // I receive a ping. I can answer a pong. + traceDebug("Ping msg from " + appName + " : " + arg); + sendPong(arg[0]); + } + private void recvPong( int id, string[] arg ) + { + traceDebug("Ping msg from " + appName + " : " + arg); + } + private void DispatchMsg(IvyStream.MessageType msgType, int msgId, string[] msgData) + { + switch (msgType) { - case MessageType.Die: - traceDebug("received die Message from " + appName); - // first, I'm not a first class IvyClient any more - bus.removeClient(this); - // invokes the die applicationListeners - bool dontkillapp = bus.FireDie(this, msgId); - // makes the bus die - bus.stop(); - try - { - close(false); - } - catch (IOException ioe) - { - throw new IvyException(ioe.Message); - } - if ( !dontkillapp ) - Environment.Exit( -1 ); + case IvyStream.MessageType.Die: + recvDie( msgId, msgData ); break; - case MessageType.Bye: - // the peer quits - traceDebug("received bye Message from " + appName); - // first, I'm not a first class IvyClient any more - bus.removeClient(this); - // invokes the disconnect applicationListeners - //bus.FireClientDisconnected(this); done in Running Thread - try - { - close(false); // will fire diconnected - } - catch (IOException ioe) - { - throw new IvyException(ioe.Message); - } + case IvyStream.MessageType.Bye: + recvBye( msgId, msgData ); break; - case MessageType.AddRegexp: - String regexp = s.Substring(from, (s.Length) - (from)); - if (bus.CheckRegexp(regexp)) - { - try - { - lock( regexp_in.SyncRoot ) - { - regexp_in.Add( msgId, new Regex(regexp,RegexOptions.Compiled|RegexOptions.IgnoreCase)); - } - lock( regexp_text.SyncRoot ) - { - regexp_text.Add( msgId, regexp); - } - } - catch (ArgumentException e) - { - throw new IvyException("regexp error " + e.Message); - } - } - else - { - traceDebug("regexp Warning exp='" + regexp + "' can't match removing from " + appName); - } + case IvyStream.MessageType.AddRegexp: + recvAddRegexp( msgId, msgData ); break; - case MessageType.DelRegexp: - lock( regexp_in.SyncRoot ) - { - regexp_in.Remove(msgId); - } - lock( regexp_text ) - { - regexp_text.Remove( msgId ); - } + case IvyStream.MessageType.DelRegexp: + recvDelRegexp( msgId, msgData ); + break; + case IvyStream.MessageType.AddBinding: + recvAddBinding( msgId, msgData ); break; - case MessageType.EndRegexp: - bus.FireClientConnected(this); - /* - * the peer is perhaps not ready to handle this message - * an assymetric processing should be written - */ - if (bus.ready_message != null) - sendMsg(bus.ready_message); + case IvyStream.MessageType.DelBinding: + recvDelBinding( msgId, msgData ); break; - case MessageType.Msg: - ArrayList v = new ArrayList(); - while (to < s.Length) - { - while ((to < s.Length) && (s[to] != 3)) - { - to++; - } - if (to < s.Length) - { - v.Add(s.Substring(from, (to) - (from))); - to++; - from = to; - } - } - String[] tab = new String[v.Count]; - for ( int i2 = 0; i2 < v.Count; i2++) - { - tab[i2] = (String) v[i2]; - // for developpemnt purposes - // out.println(" *"+tab[i]+"* "+(tab[i]).length()); - } - bus.callCallback(this, msgId, tab); + case IvyStream.MessageType.EndRegexp: + recvEndRegexp( msgId, msgData ); break; - case MessageType.Pong: - String paramPong = s.Substring(from, (s.Length) - (from)); - traceDebug("Ping msg from " + appName + " : " + paramPong); + case IvyStream.MessageType.Msg: + recvMsg( msgId, msgData ); break; - case MessageType.Ping: - // I receive a ping. I can answer a pong. - String param = s.Substring(from, (s.Length) - (from)); - traceDebug("Ping msg from " + appName + " : " + param); - sendPong(param); + case IvyStream.MessageType.Pong: + recvPong( msgId, msgData ); break; - case MessageType.Error: - String error = s.Substring(from, (s.Length) - (from)); - traceDebug("Error msg " + msgId + " " + error); + case IvyStream.MessageType.Ping: + recvPing( msgId, msgData ); break; - case MessageType.StartRegexp: - appName = s.Substring(from, (s.Length) - (from)); - appPort = msgId; - if (bus.checkConnected(this)) - { - try - { - close(false); - } - catch (IOException ioe) - { - throw new IvyException("io " + ioe.Message); - } - throw new IvyException("Rare ! A concurrent connect occured"); - } + case IvyStream.MessageType.Error: + recvError( msgId, msgData ); break; - case MessageType.DirectMsg: - String direct = s.Substring(from, (s.Length) - (from)); - bus.FireDirectMessage(this, msgId, direct); + case IvyStream.MessageType.StartRegexp: + recvStartRegexp( msgId, msgData ); break; + case IvyStream.MessageType.DirectMsg: + recvDirectMsg( msgId, msgData ); + break; + case IvyStream.MessageType.ApplicationId: + recvApplicationId( msgId, msgData ); + break; default: throw new IvyException("protocol error, unknown message type " + msgType); @@ -601,29 +590,29 @@ namespace IvyBus private void sendPong(String s) { - send(MessageType.Pong, 0, s); + stream.sendMsg(IvyStream.MessageType.Pong, 0, s); } public void sendPing(String s) { - send(MessageType.Ping, 0, s); + stream.sendMsg(IvyStream.MessageType.Ping, 0, s); } private void sendBye() { - send(MessageType.Bye, 0, ""); + stream.sendMsg(IvyStream.MessageType.Bye, 0, ""); } private void sendBye(String message) { - send(MessageType.Bye, 0, message); + stream.sendMsg(IvyStream.MessageType.Bye, 0, message); } public void sendDie() { - send(MessageType.Die, 0, ""); + stream.sendMsg(IvyStream.MessageType.Die, 0, ""); } private void sendDie(String message) { - send(MessageType.Die, 0, message); + stream.sendMsg(IvyStream.MessageType.Die, 0, message); } @@ -661,6 +650,5 @@ namespace IvyBus isPinging = false; pingerThread.Interrupt(); } - } } \ No newline at end of file -- cgit v1.1