From b3978cb627b616e50da87972a2fe593712e728c0 Mon Sep 17 00:00:00 2001 From: fcolin Date: Thu, 1 Feb 2007 09:45:58 +0000 Subject: Utilisateur : Fcolin Date : 19/05/03 Heure : 15:31 Créé Commentaire: (vss 1) --- CSharp/Ivy/Ivy/IvyClient.cs | 633 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 633 insertions(+) create mode 100644 CSharp/Ivy/Ivy/IvyClient.cs (limited to 'CSharp') diff --git a/CSharp/Ivy/Ivy/IvyClient.cs b/CSharp/Ivy/Ivy/IvyClient.cs new file mode 100644 index 0000000..532fdd5 --- /dev/null +++ b/CSharp/Ivy/Ivy/IvyClient.cs @@ -0,0 +1,633 @@ +/// A private Class for the the peers on the bus. +/// * +/// +/// François-Régis Colin +/// +/// http://www.tls.cena.fr/products/ivy/ +/// * +/// each time a connexion is made with a remote peer, the regexp are exchanged +/// once ready, a ready message is sent, and then we can send messages, +/// die messages, direct messages, add or remove regexps, or quit. A thread is +/// created for each remote client. +/// * +/// +/// +namespace IvyBus +{ + using System; + using System.Text.RegularExpressions; + using System.Text; + using System.Collections; + using System.Threading; + using System.IO; + using System.Net; + using System.Net.Sockets; + using System.Configuration; + + public class MyTcpClient : TcpClient + { + public IPAddress RemoteAddress + { + get + { + IPEndPoint ep = (IPEndPoint)Client.RemoteEndPoint; + return ep.Address; + } + + } + public int RemotePort + { + get + { + IPEndPoint ep = (IPEndPoint)Client.RemoteEndPoint; + return ep.Port; + } + + } + public MyTcpClient(string host,int port):base(host,port) + { + } + public MyTcpClient(Socket sock) + { + Client = sock; + } + } + + public class IvyClient + { + public virtual String ApplicationName + { + get + { + return appName; + } + + } + internal Int32 ClientKey + { + get + { + return clientKey; + } + + } + public ICollection Regexps + { + get + { + return regexp_text.Values; + } + + } + internal int AppPort + { + get + { + return appPort; + } + + } + private String RemoteAddress + { + get + { + return socket.RemoteAddress.ToString(); + } + + } + private String RemotePort + { + get + { + return socket.RemotePort.ToString(); + } + + } + + /* the protocol magic numbers */ + internal enum MessageType : int + { + Bye = 0, /* end of the peer */ + AddRegexp = 1, /* the peer adds a regexp */ + Msg = 2, /* the peer sends a message */ + Error = 3, /* error message */ + DelRegexp = 4, /* the peer removes one of his regex */ + EndRegexp = 5, /* no more regexp in the handshake */ + StartRegexp = 6, /* avoid race condition in concurrent connexions */ + DirectMsg = 7, /* the peer sends a direct message */ + Die = 8, /* the peer wants us to quit */ + Ping = 9, /* checks the presence of the other */ + Pong = 10 /* checks the presence of the other */ + }; + + internal const String MESSAGE_TERMINATOR = "\n"; /* the next protocol will use \r */ + internal const String StartArg = "\u0002"; /* begin of arguments */ + internal const String EndArg = "\u0003"; /* end of arguments */ + + private static int clientSerial = 0; /* an unique ID for each IvyClient */ + + private Ivy bus; + private MyTcpClient socket; + private StreamReader in_stream; + private StreamWriter out_stream; + private Hashtable regexp_in; + private Hashtable regexp_text; + private int appPort; + private bool peerCalling; + private volatile Thread clientThread; // volatile to ensure the quick communication + private Int32 clientKey; + private static bool doping = (ConfigurationSettings.AppSettings["IVY_PING"] != null); + internal const int PINGTIMEOUT = 5000; + private volatile Thread pingerThread; + + // protected variables + internal String appName; + + internal IvyClient(Ivy bus, MyTcpClient socket) + { + regexp_in = new Hashtable(); + regexp_text = new Hashtable(); + appName = "Unknown"; + appPort = 0; + this.bus = bus; + this.socket = socket; + clientKey = clientSerial++; + in_stream = new StreamReader(socket.GetStream(),System.Text.Encoding.ASCII); + out_stream = new StreamWriter(socket.GetStream(),System.Text.Encoding.ASCII); + Hashtable regexps = bus.regexp_out; + // sends our ID, whether we initiated the connexion or not + // the ID is the couple "host name,application Port", the host name + // information is in the socket itself, the port is not known if we + // initiate the connexion + send(MessageType.StartRegexp, bus.applicationPort, bus.appName); + // sends our regexps to the peer + foreach (Int32 ikey in regexps.Keys ) + { + sendRegexp(ikey, (String) regexps[ikey]); + } + send(MessageType.EndRegexp, 0, ""); + // spawns a thread to manage the incoming traffic on this + // socket. We should be ready to receive messages now. + clientThread = new Thread(new ThreadStart(this.Run)); + clientThread.Start(); + if (doping) + { + pingerThread = new Thread(new ThreadStart(PingerRun)); + pingerThread.Start(); + } + } + + /// returns the name of the remote agent. + /// + + + /// allow an Ivy package class to access the list of regexps at a + /// given time. + /// perhaps we should implement a new IvyApplicationListener method to + /// allow the notification of regexp addition and deletion + /// + + + internal void sendRegexp(int id, String regexp) + { + send(MessageType.AddRegexp, id, regexp); /* perhaps we should perform some checking here */ + } + + public virtual void delRegexp(int id) + { + send(MessageType.DelRegexp, id, ""); + } + + /// sends a direct message to the peer + /// + /// the numeric value provided to the remote client + /// + /// the string that will be match-tested + /// + /// + public void sendDirectMsg(int id, String message) + { + send(MessageType.DirectMsg, id, message); + } + + /// closes the connexion to the peer. + /// + /// should I send Bye message ? + /// the thread managing the socket is stopped + /// + /// + internal void close(bool notify) + { + traceDebug("closing connexion to " + appName); + if (doping ) + { + stopPinging(); + } + if (notify) + sendBye("hasta la vista"); + stopListening(); + // bus.clientDisconnect(this); + socket.Close(); // should I also close in and out ? + } + + /// sends the substrings of a message to the peer for each matching regexp. + /// + /// the string that will be match-tested + /// + /// the number of messages sent to the peer + /// + /// + internal int sendMsg(String message) + { + int count = 0; + foreach (Int32 key in regexp_in.Keys ) + { + Regex regexp = (Regex) regexp_in[key]; + Match result = regexp.Match(message); + if (result.Success) + { + send(MessageType.Msg, key, result); + count++; + } + } + return count; + } + + internal void stopListening() + { + Thread t = clientThread; + if (t == null) + return ; + // we can be summoned to quit from two path at a time + clientThread = null; + t.Interrupt(); + } + + /// compares two peers the id is the couple (host,service port). + /// + /// the other peer + /// + /// true if the peers are similir. This should not happen, it is bad + /// © ® (tm) + /// + /// + internal bool sameClient(IvyClient clnt) + { + return (appPort != 0 && appPort == clnt.appPort) && (RemoteAddress == clnt.RemoteAddress); + } + + /// the code of the thread handling the incoming messages. + /// + public void Run() + { + String msg = null; + try + { + traceDebug("Connected from " + RemoteAddress + ":" + RemotePort); + } + catch (Exception ie) + { + traceDebug("Interrupted while resolving remote hostname"); + } + traceDebug("Thread started"); + bool running = true; + while (running) + { + try + { + if ((msg = in_stream.ReadLine()) != null) + { + // early stop during readLine() + if (doping && (pingerThread != null)) + pingerThread.Interrupt(); + newParseMsg(msg); + } + else + { + traceDebug("readline null ! leaving the thead"); + break; + } + } + catch (IvyException ie) + { + Console.Error.WriteLine( ie.Message ); + Console.Error.WriteLine( ie.StackTrace); + running = false; + } + catch (IOException e) + { + traceDebug("abnormally Disconnected from " + RemoteAddress + ":" + RemotePort); + Console.Error.WriteLine( e.Message ); + Console.Error.WriteLine( e.StackTrace); + running = false; + break; + } + } + traceDebug("normally Disconnected from " + appName); + traceDebug("Thread stopped"); + // first, I'm not a first class IvyClient any more + bus.removeClient(this); + // invokes the Disconnected applicationListeners + bus.FireClientDisconnected(this); + } + + private void sendBuffer(String buffer) + { + buffer += "\n"; + try + { + out_stream.Write(buffer); + out_stream.Flush(); + } + catch (IOException e) + { + traceDebug("I can't send my message to this client. He probably left"); + // first, I'm not a first class IvyClient any more + bus.removeClient(this); + // invokes the Disconnected applicationListeners + bus.FireClientDisconnected(this); + try + { + close(false); + } + catch (IOException ioe) + { + throw new IvyException("close failed" + ioe.Message); + } + } + } + + private void send(MessageType type, int id, String arg) + { + try + { + sendBuffer((int)type + " " + id + StartArg + arg); + } + catch (IvyException ie) + { + Console.Error.WriteLine("received an exception: " + ie.Message); + Console.Error.WriteLine(ie.StackTrace); + } + } + + private void send(MessageType type, Int32 id, Match result) + { + String buffer = (int)type + " " + id + StartArg; + // Start at 1 because group 0 represent entire matching + for (int sub = 1; sub < result.Groups.Count; sub++) + { + buffer += result.Groups[sub] + EndArg; + } + try + { + sendBuffer(buffer); + } + catch (IvyException ie) + { + Console.Error.WriteLine("received an exception: " + ie.Message); + Console.Error.WriteLine(ie.StackTrace); + } + } + + private void newParseMsg(String s) + { + int from = 0, to = 0; + MessageType msgType; + int msgId; + while ((to < s.Length) && (s[to] != ' ')) + { + to++; + } + if (to >= s.Length) + throw new IvyException("protocol error"); + try + { + msgType = (MessageType)Int32.Parse(s.Substring(from, (to) - (from))); + } + catch (FormatException nfe) + { + throw new IvyException("protocol error on msgType"); + } + from = to + 1; + while ((to < s.Length) && (s[to] != 2)) + { + to++; + } + if (to >= s.Length) + throw new IvyException("protocol error"); + try + { + msgId = Int32.Parse(s.Substring(from, (to) - (from))); + } + catch (FormatException nfe) + { + throw new IvyException("protocol error on identifier"); + } + from = to + 1; + switch (msgType) + { + case MessageType.Die: + traceDebug("received die Message from " + appName); + // first, I'm not a first class IvyClient any more + bus.removeClient(this); + // invokes the die applicationListeners + bus.FireDie(this, msgId); + // makes the bus die + bus.stop(); + try + { + close(false); + } + catch (IOException ioe) + { + throw new IvyException(ioe.Message); + } + break; + + case MessageType.Bye: + // the peer quits + traceDebug("received bye Message from " + appName); + // first, I'm not a first class IvyClient any more + bus.removeClient(this); + // invokes the disconnect applicationListeners + //bus.FireClientDisconnected(this); done in Running Thread + try + { + close(false); // will fire diconnected + } + catch (IOException ioe) + { + throw new IvyException(ioe.Message); + } + break; + + case MessageType.AddRegexp: + String regexp = s.Substring(from, (s.Length) - (from)); + if (bus.CheckRegexp(regexp)) + { + try + { + regexp_in.Add( msgId, new Regex(regexp)); + regexp_text.Add( msgId, regexp); + } + catch (ArgumentException e) + { + throw new IvyException("regexp error " + e.Message); + } + } + else + { + throw new IvyException("regexp Warning exp='" + regexp + "' can't match removing from " + appName); + } + break; + + case MessageType.DelRegexp: + regexp_in.Remove(msgId); + regexp_text.Remove( msgId ); + break; + + case MessageType.EndRegexp: + bus.FireClientConnected(this); + /* + * the peer is perhaps not ready to handle this message + * an assymetric processing should be written + */ + if (bus.ready_message != null) + sendMsg(bus.ready_message); + break; + + case MessageType.Msg: + ArrayList v = new ArrayList(); + while (to < s.Length) + { + while ((to < s.Length) && (s[to] != 3)) + { + to++; + } + if (to < s.Length) + { + v.Add(s.Substring(from, (to) - (from))); + to++; + from = to; + } + } + String[] tab = new String[v.Count]; + for ( int i2 = 0; i2 < v.Count; i2++) + { + tab[i2] = (String) v[i2]; + // for developpemnt purposes + // out.println(" *"+tab[i]+"* "+(tab[i]).length()); + } + bus.callCallback(this, msgId, tab); + break; + + case MessageType.Pong: + String paramPong = s.Substring(from, (s.Length) - (from)); + traceDebug("Ping msg from " + appName + " : " + paramPong); + break; + + case MessageType.Ping: + // I receive a ping. I can answer a pong. + String param = s.Substring(from, (s.Length) - (from)); + traceDebug("Ping msg from " + appName + " : " + param); + sendPong(param); + break; + + case MessageType.Error: + String error = s.Substring(from, (s.Length) - (from)); + traceDebug("Error msg " + msgId + " " + error); + break; + + case MessageType.StartRegexp: + appName = s.Substring(from, (s.Length) - (from)); + appPort = msgId; + if (bus.checkConnected(this)) + { + try + { + close(false); + } + catch (IOException ioe) + { + throw new IvyException("io " + ioe.Message); + } + throw new IvyException("Rare ! A concurrent connect occured"); + } + break; + + case MessageType.DirectMsg: + String direct = s.Substring(from, (s.Length) - (from)); + bus.FireDirectMessage(this, msgId, direct); + break; + + default: + throw new IvyException("protocol error, unknown message type " + msgType); + + } + } + + internal void sendPong(String s) + { + send(MessageType.Pong, 0, s); + } + public void sendPing(String s) + { + send(MessageType.Ping, 0, s); + } + + private void sendBye() + { + send(MessageType.Bye, 0, ""); + } + private void sendBye(String message) + { + send(MessageType.Bye, 0, message); + } + + public virtual void sendDie() + { + send(MessageType.Die, 0, ""); + } + public virtual void sendDie(String message) + { + send(MessageType.Die, 0, message); + } + + + public override String ToString() + { + return "IvyClient " + bus.appName + ":" + appName; + } + + private void traceDebug(String s) + { + if (bus.Debug) + Console.Out.WriteLine("-->IvyClient " + bus.appName + ":" + appName + "<-- " + s); + } + + internal bool isPinging = false; + public void PingerRun() + { + isPinging = true; + traceDebug("Pinger Thread started"); + while (isPinging) + { + try + { + Thread.Sleep(new TimeSpan(10000 * PINGTIMEOUT)); + sendPing("are you here ?"); + } + catch (ThreadInterruptedException ie) + { + } + } + traceDebug("Pinger Thread stopped"); + } + public virtual void stopPinging() + { + isPinging = false; + pingerThread.Interrupt(); + } + + } +} \ No newline at end of file -- cgit v1.1