summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--CSharp/Ivy/Ivy/IvyClient.cs534
1 files changed, 261 insertions, 273 deletions
diff --git a/CSharp/Ivy/Ivy/IvyClient.cs b/CSharp/Ivy/Ivy/IvyClient.cs
index 37a9fde..de1d413 100644
--- a/CSharp/Ivy/Ivy/IvyClient.cs
+++ b/CSharp/Ivy/Ivy/IvyClient.cs
@@ -6,8 +6,6 @@
namespace IvyBus
{
using System;
- using System.Text.RegularExpressions;
- using System.Text;
using System.Collections;
using System.Threading;
using System.IO;
@@ -23,29 +21,39 @@ namespace IvyBus
/// die messages, direct messages, add or remove regexps, or quit. A thread is
/// created for each remote client.
/// </remarks>
- public class IvyClient
+ public class IvyClient
{
- public String ApplicationName
+ public class IvyClientPriority : IComparer
{
- get
+
+ // Calls CaseInsensitiveComparer.Compare with the parameters reversed.
+ int IComparer.Compare( Object x, Object y )
{
- return appName;
+ IvyClient c1 = (IvyClient)x;
+ IvyClient c2 = (IvyClient)y;
+ return( c2.clientPriority - c1.clientPriority );
}
-
+
}
- internal Int32 ClientKey
+
+ public String ApplicationName
{
get
{
- return clientKey;
+ return appName;
}
}
- public ICollection Regexps
+
+ public string[] Regexps
{
get
{
- return regexp_text.Values;
+ int i = 0;
+ String[] tab = new String[bindings.Count];
+ foreach( IvyBindingBase bind in bindings.Values )
+ tab[i++] = bind.expression;
+ return tab;
}
}
@@ -75,39 +83,17 @@ namespace IvyBus
}
- /* the protocol magic numbers */
- internal enum MessageType : int
- {
- Bye = 0, /* end of the peer */
- AddRegexp = 1, /* the peer adds a regexp */
- Msg = 2, /* the peer sends a message */
- Error = 3, /* error message */
- DelRegexp = 4, /* the peer removes one of his regex */
- EndRegexp = 5, /* no more regexp in the handshake */
- StartRegexp = 6, /* avoid race condition in concurrent connexions */
- DirectMsg = 7, /* the peer sends a direct message */
- Die = 8, /* the peer wants us to quit */
- Ping = 9, /* checks the presence of the other */
- Pong = 10 /* checks the presence of the other */
- };
-
- internal const String MESSAGE_TERMINATOR = "\n"; /* the next protocol will use \r */
- internal const String StartArg = "\u0002"; /* begin of arguments */
- internal const String EndArg = "\u0003"; /* end of arguments */
-
- private static int clientSerial = 0; /* an unique ID for each IvyClient */
-
private Ivy bus;
private Socket socket;
- private StreamWriter out_stream;
- private StreamReader in_stream;
- private Hashtable regexp_in;
- private Hashtable regexp_text;
+ private IvyStream stream;
+ private Hashtable bindings;
private int appPort;
+ private string clientId; /* an unique ID for each IvyClient */
+ private int clientPriority; /* client priority */
+
private bool peerCalling;
private volatile bool running = false;
private volatile Thread clientThread; // volatile to ensure the quick communication
- private Int32 clientKey;
private static bool doping = (ConfigurationSettings.AppSettings["IVY_PING"] != null);
private const int PINGTIMEOUT = 5000;
private volatile Thread pingerThread;
@@ -117,30 +103,33 @@ namespace IvyBus
internal IvyClient(Ivy bus, Socket socket)
{
- regexp_in = Hashtable.Synchronized(new Hashtable());
- regexp_text = Hashtable.Synchronized(new Hashtable());
+ bindings = Hashtable.Synchronized(new Hashtable());
appName = "Unknown";
appPort = 0;
this.bus = bus;
this.socket = socket;
- clientKey = clientSerial++;
- NetworkStream net_stream = new NetworkStream( socket );
- out_stream = new StreamWriter(net_stream,System.Text.Encoding.ASCII);
- in_stream = new StreamReader(net_stream,System.Text.Encoding.ASCII);
+
+ socket.SetSocketOption( SocketOptionLevel.Tcp, SocketOptionName.KeepAlive, 1 );
+ stream = new IvyStream( socket );
+
+ clientPriority = Ivy.DEFAULT_PRIORITY;
+
+ sendApplicationId();
+
// sends our ID, whether we initiated the connexion or not
// the ID is the couple "host name,application Port", the host name
// information is in the socket itself, the port is not known if we
// initiate the connexion
- send(MessageType.StartRegexp, bus.applicationPort, bus.appName);
+ stream.sendMsg(IvyStream.MessageType.StartRegexp, bus.applicationPort, bus.appName);
// sends our regexps to the peer
- lock( bus.regexp_out.SyncRoot )
+ lock( bus.bindings.SyncRoot )
{
- foreach (Int32 ikey in bus.regexp_out.Keys )
+ foreach (Ivy.ApplicationBinding bind in bus.bindings.Values )
{
- sendRegexp(ikey, (String) bus.regexp_out[ikey]);
+ sendBinding(bind);
}
}
- send(MessageType.EndRegexp, 0, "");
+ stream.sendMsg(IvyStream.MessageType.EndRegexp, 0, "");
// spawns a thread to manage the incoming traffic on this
// socket. We should be ready to receive messages now.
clientThread = new Thread(new ThreadStart(this.Run));
@@ -160,18 +149,20 @@ namespace IvyBus
/// perhaps we should implement a new IvyApplicationListener method to
/// allow the notification of regexp addition and deletion
/// </summary>
-
-
- internal void sendRegexp(int id, String regexp)
+ internal void sendApplicationId()
{
- send(MessageType.AddRegexp, id, regexp); /* perhaps we should perform some checking here */
+ stream.sendMsg( IvyStream.MessageType.ApplicationId, bus.applicationPriority, bus.AppId);
}
- internal void delRegexp(int id)
+ internal void sendBinding(Ivy.ApplicationBinding bind)
{
- send(MessageType.DelRegexp, id, "");
+ stream.sendMsg(bind.type == Ivy.BindingType.BindRegexp ? IvyStream.MessageType.AddRegexp: IvyStream.MessageType.AddBinding, bind.key, bind.regexp); /* perhaps we should perform some checking here */
}
+ internal void delBinding(Ivy.ApplicationBinding bind)
+ {
+ stream.sendMsg(bind.type == Ivy.BindingType.BindRegexp ? IvyStream.MessageType.DelRegexp: IvyStream.MessageType.DelBinding, bind.key, "");
+ }
/// <summary> sends a direct message to the peer
/// </summary>
/// <param name='id'>the numeric value provided to the remote client
@@ -181,7 +172,26 @@ namespace IvyBus
/// </param>
public void sendDirectMsg(int id, String message)
{
- send(MessageType.DirectMsg, id, message);
+ try
+ {
+ stream.sendMsg(IvyStream.MessageType.DirectMsg, id, message);
+ }
+ catch (IOException e)
+ {
+ traceDebug("I can't send my message to this client. He probably left");
+ // first, I'm not a first class IvyClient any more
+ bus.removeClient(this);
+ // invokes the Disconnected applicationListeners
+ bus.FireClientDisconnected(this);
+ try
+ {
+ close(false);
+ }
+ catch (IOException ioe)
+ {
+ throw new IvyException("close failed" + ioe.Message);
+ }
+ }
}
/// <summary> closes the connexion to the peer.
@@ -202,10 +212,13 @@ namespace IvyBus
stopListening();
// bus.clientDisconnect(this);
// in_stream close in the thread
- out_stream.Close();
- out_stream = null;
- socket.Close();
- socket = null;
+ if ( stream != null )
+ {
+ stream.Close();
+ stream = null;
+ //socket.Close(); // pris en charge par stream ( NetWorkStream )
+ socket = null;
+ }
}
/// <summary> sends the substrings of a message to the peer for each matching regexp.
@@ -218,16 +231,16 @@ namespace IvyBus
internal int sendMsg(String message)
{
int count = 0;
- lock( regexp_in.SyncRoot )
+
+ lock( bindings.SyncRoot )
{
- IDictionaryEnumerator myEnumerator = regexp_in.GetEnumerator();
- while ( myEnumerator.MoveNext() )
+ IvyBindingSimple.Prepare( message );
+ foreach (IvyBindingBase bind in bindings.Values )
{
- Regex regexp = (Regex) myEnumerator.Value;
- Match result = regexp.Match(message);
- if (result.Success)
+ string[] array = bind.Match(message);
+ if (array!=null)
{
- send(MessageType.Msg, (int)myEnumerator.Key, result);
+ stream.sendMsg(IvyStream.MessageType.Msg, bind.key, array);
count++;
}
}
@@ -241,8 +254,8 @@ namespace IvyBus
return ;
// Tell Thread to stop.
running = false;
- if ( in_stream != null )
- in_stream.Close();
+ if ( stream != null )
+ stream.Close();
if ( Thread.CurrentThread != clientThread )
{
// Wait for Thread to end.
@@ -269,7 +282,10 @@ namespace IvyBus
/// </summary>
private void Run()
{
- String msg = null;
+ IvyStream.MessageType type;
+ int id;
+ string[] args;
+
try
{
traceDebug("Connected from " + RemoteAddress + ":" + RemotePort);
@@ -281,22 +297,20 @@ namespace IvyBus
traceDebug("Thread started");
bool running = true;
- while (running)
+ while ( running && (stream != null) )
{
try
{
- if ((msg = in_stream.ReadLine()) != null)
+ if ( stream.receiveMsg(out type, out id, out args) )
{
// early stop during readLine()
if (doping && (pingerThread != null))
pingerThread.Interrupt();
- if ( msg.Length == 0 )
- Console.Error.WriteLine("Should close");
- newParseMsg(msg);
+ DispatchMsg(type, id, args);
}
else
{
- traceDebug("readline null ! leaving the thead");
+ traceDebug("readline null ! leaving the thread");
break;
}
}
@@ -336,8 +350,11 @@ namespace IvyBus
break;
}
}
- in_stream.Close();
- in_stream = null;
+ if ( stream != null )
+ {
+ stream.Close();
+ stream = null;
+ }
traceDebug("normally Disconnected from " + appName);
traceDebug("Thread stopped");
// invokes the Disconnected applicationListeners
@@ -347,252 +364,224 @@ namespace IvyBus
}
- private void sendBuffer(String buffer)
+
+ private void recvDie( int id, string[] arg )
{
- buffer += "\n";
+ traceDebug("received die Message from " + appName);
+ // first, I'm not a first class IvyClient any more
+ bus.removeClient(this);
+ // invokes the die applicationListeners
+ bool dontkillapp = bus.FireDie(this, id);
+ // makes the bus die
+ bus.stop();
try
{
- if ( out_stream != null )
- {
- out_stream.Write(buffer);
- out_stream.Flush();
- }
+ close(false);
}
- catch (IOException e)
+ catch (IOException ioe)
{
- traceDebug("I can't send my message to this client. He probably left");
- // first, I'm not a first class IvyClient any more
- bus.removeClient(this);
- // invokes the Disconnected applicationListeners
- bus.FireClientDisconnected(this);
- try
- {
- close(false);
- }
- catch (IOException ioe)
- {
- throw new IvyException("close failed" + ioe.Message);
- }
+ throw new IvyException(ioe.Message);
}
+ if ( !dontkillapp )
+ Environment.Exit( -1 );
}
-
- private void send(MessageType type, int id, String arg)
+ private void recvBye( int id, string[] arg )
{
+ // the peer quits
+ traceDebug("received bye Message from " + appName);
+ // first, I'm not a first class IvyClient any more
+ bus.removeClient(this);
+ // invokes the disconnect applicationListeners
+ //bus.FireClientDisconnected(this); done in Running Thread
try
{
- sendBuffer((int)type + " " + id + StartArg + arg);
+ close(false); // will fire diconnected
}
- catch (IvyException ie)
+ catch (IOException ioe)
{
- Console.Error.WriteLine("received an exception: " + ie.Message);
- Console.Error.WriteLine(ie.StackTrace);
+ throw new IvyException(ioe.Message);
}
}
-
- private void send(MessageType type, Int32 id, Match result)
+ private void recvAddRegexp( int id, string[] arg )
{
- String buffer = (int)type + " " + id + StartArg;
- // Start at 1 because group 0 represent entire matching
- for (int sub = 1; sub < result.Groups.Count; sub++)
+ string regexp = arg[0];
+ if (bus.CheckRegexp(regexp))
{
- buffer += result.Groups[sub] + EndArg;
- }
- try
- {
- sendBuffer(buffer);
+ try
+ {
+ IvyBindingRegexp bind = new IvyBindingRegexp(id,regexp);
+ lock( bindings.SyncRoot )
+ {
+ bindings.Add( id, bind);
+ }
+ bus.FireClientAddBinding( this, bind.expression );
+ }
+ catch (ArgumentException e)
+ {
+ throw new IvyException("regexp error " + e.Message);
+ }
}
- catch (IvyException ie)
+ else
{
- Console.Error.WriteLine("received an exception: " + ie.Message);
- Console.Error.WriteLine(ie.StackTrace);
+ traceDebug("regexp Warning exp='" + arg + "' can't match removing from " + appName);
}
}
-
- private void newParseMsg(String s)
+ private void recvDelRegexp( int id, string[] arg )
{
- int from = 0, to = 0;
- MessageType msgType;
- int msgId;
- while ((to < s.Length) && (s[to] != ' '))
+ lock( bindings.SyncRoot )
{
- to++;
+ IvyBindingBase bind = (IvyBindingBase) bindings[id];
+ bus.FireClientRemoveBinding( this, bind.expression );
+ bindings.Remove(id);
}
- if (to >= s.Length)
- throw new IvyException("protocol error");
+
+ }
+ private void recvAddBinding( int id, string[] arg )
+ {
+ string expression = arg[0];
+
try
{
- msgType = (MessageType)Int32.Parse(s.Substring(from, (to) - (from)));
+ IvyBindingSimple bind = new IvyBindingSimple(id, expression);
+
+ lock( bindings.SyncRoot )
+ {
+ bindings.Add( id, bind);
+ }
+ bus.FireClientAddBinding( this, bind.expression );
}
- catch (FormatException nfe)
+ catch (ArgumentException e)
{
- throw new IvyException("protocol error on msgType");
+ throw new IvyException("regexp error " + e.Message);
}
- from = to + 1;
- while ((to < s.Length) && (s[to] != 2))
+
+ }
+ private void recvDelBinding( int id, string[] arg )
+ {
+ lock( bindings.SyncRoot )
{
- to++;
+ IvyBindingBase bind = (IvyBindingBase) bindings[id];
+ bus.FireClientRemoveBinding( this, bind.expression );
+ bindings.Remove(id);
}
- if (to >= s.Length)
- throw new IvyException("protocol error");
- try
+ }
+ private void recvMsg( int id, string[] arg )
+ {
+ bus.callCallback(this, id, arg);
+ }
+ private void recvError( int id, string[] arg )
+ {
+ traceDebug("Error msg " + id + " " + arg);
+ }
+ private void recvApplicationId( int id, string[] arg )
+ {
+ clientId = arg[0];
+ if ( clientPriority != id )
{
- msgId = Int32.Parse(s.Substring(from, (to) - (from)));
+ clientPriority = id;
+ bus.SortClients();
}
- catch (FormatException nfe)
+ }
+ private void recvEndRegexp( int id, string[] arg )
+ {
+ bus.FireClientConnected(this);
+ /*
+ * the peer is perhaps not ready to handle this message
+ * an assymetric processing should be written
+ */
+ if (bus.ready_message != null)
+ sendMsg(bus.ready_message);
+ }
+ private void recvStartRegexp( int id, string[] arg )
+ {
+ appName = arg[0];
+ appPort = id;
+ if (bus.checkConnected(this))
{
- throw new IvyException("protocol error on identifier");
+ try
+ {
+ close(false);
+ }
+ catch (IOException ioe)
+ {
+ throw new IvyException("io " + ioe.Message);
+ }
+ throw new IvyException("Rare ! A concurrent connect occured");
}
- from = to + 1;
+
+ }
+ private void recvDirectMsg( int id, string[] arg )
+ {
+ bus.FireDirectMessage(this, id, arg[0]);
+ }
+ private void recvPing( int id, string[] arg )
+ {
+ // I receive a ping. I can answer a pong.
+ traceDebug("Ping msg from " + appName + " : " + arg);
+ sendPong(arg[0]);
+ }
+ private void recvPong( int id, string[] arg )
+ {
+ traceDebug("Ping msg from " + appName + " : " + arg);
+ }
+ private void DispatchMsg(IvyStream.MessageType msgType, int msgId, string[] msgData)
+ {
+
switch (msgType)
{
- case MessageType.Die:
- traceDebug("received die Message from " + appName);
- // first, I'm not a first class IvyClient any more
- bus.removeClient(this);
- // invokes the die applicationListeners
- bool dontkillapp = bus.FireDie(this, msgId);
- // makes the bus die
- bus.stop();
- try
- {
- close(false);
- }
- catch (IOException ioe)
- {
- throw new IvyException(ioe.Message);
- }
- if ( !dontkillapp )
- Environment.Exit( -1 );
+ case IvyStream.MessageType.Die:
+ recvDie( msgId, msgData );
break;
- case MessageType.Bye:
- // the peer quits
- traceDebug("received bye Message from " + appName);
- // first, I'm not a first class IvyClient any more
- bus.removeClient(this);
- // invokes the disconnect applicationListeners
- //bus.FireClientDisconnected(this); done in Running Thread
- try
- {
- close(false); // will fire diconnected
- }
- catch (IOException ioe)
- {
- throw new IvyException(ioe.Message);
- }
+ case IvyStream.MessageType.Bye:
+ recvBye( msgId, msgData );
break;
- case MessageType.AddRegexp:
- String regexp = s.Substring(from, (s.Length) - (from));
- if (bus.CheckRegexp(regexp))
- {
- try
- {
- lock( regexp_in.SyncRoot )
- {
- regexp_in.Add( msgId, new Regex(regexp,RegexOptions.Compiled|RegexOptions.IgnoreCase));
- }
- lock( regexp_text.SyncRoot )
- {
- regexp_text.Add( msgId, regexp);
- }
- }
- catch (ArgumentException e)
- {
- throw new IvyException("regexp error " + e.Message);
- }
- }
- else
- {
- traceDebug("regexp Warning exp='" + regexp + "' can't match removing from " + appName);
- }
+ case IvyStream.MessageType.AddRegexp:
+ recvAddRegexp( msgId, msgData );
break;
- case MessageType.DelRegexp:
- lock( regexp_in.SyncRoot )
- {
- regexp_in.Remove(msgId);
- }
- lock( regexp_text )
- {
- regexp_text.Remove( msgId );
- }
+ case IvyStream.MessageType.DelRegexp:
+ recvDelRegexp( msgId, msgData );
+ break;
+ case IvyStream.MessageType.AddBinding:
+ recvAddBinding( msgId, msgData );
break;
- case MessageType.EndRegexp:
- bus.FireClientConnected(this);
- /*
- * the peer is perhaps not ready to handle this message
- * an assymetric processing should be written
- */
- if (bus.ready_message != null)
- sendMsg(bus.ready_message);
+ case IvyStream.MessageType.DelBinding:
+ recvDelBinding( msgId, msgData );
break;
- case MessageType.Msg:
- ArrayList v = new ArrayList();
- while (to < s.Length)
- {
- while ((to < s.Length) && (s[to] != 3))
- {
- to++;
- }
- if (to < s.Length)
- {
- v.Add(s.Substring(from, (to) - (from)));
- to++;
- from = to;
- }
- }
- String[] tab = new String[v.Count];
- for ( int i2 = 0; i2 < v.Count; i2++)
- {
- tab[i2] = (String) v[i2];
- // for developpemnt purposes
- // out.println(" *"+tab[i]+"* "+(tab[i]).length());
- }
- bus.callCallback(this, msgId, tab);
+ case IvyStream.MessageType.EndRegexp:
+ recvEndRegexp( msgId, msgData );
break;
- case MessageType.Pong:
- String paramPong = s.Substring(from, (s.Length) - (from));
- traceDebug("Ping msg from " + appName + " : " + paramPong);
+ case IvyStream.MessageType.Msg:
+ recvMsg( msgId, msgData );
break;
- case MessageType.Ping:
- // I receive a ping. I can answer a pong.
- String param = s.Substring(from, (s.Length) - (from));
- traceDebug("Ping msg from " + appName + " : " + param);
- sendPong(param);
+ case IvyStream.MessageType.Pong:
+ recvPong( msgId, msgData );
break;
- case MessageType.Error:
- String error = s.Substring(from, (s.Length) - (from));
- traceDebug("Error msg " + msgId + " " + error);
+ case IvyStream.MessageType.Ping:
+ recvPing( msgId, msgData );
break;
- case MessageType.StartRegexp:
- appName = s.Substring(from, (s.Length) - (from));
- appPort = msgId;
- if (bus.checkConnected(this))
- {
- try
- {
- close(false);
- }
- catch (IOException ioe)
- {
- throw new IvyException("io " + ioe.Message);
- }
- throw new IvyException("Rare ! A concurrent connect occured");
- }
+ case IvyStream.MessageType.Error:
+ recvError( msgId, msgData );
break;
- case MessageType.DirectMsg:
- String direct = s.Substring(from, (s.Length) - (from));
- bus.FireDirectMessage(this, msgId, direct);
+ case IvyStream.MessageType.StartRegexp:
+ recvStartRegexp( msgId, msgData );
break;
+ case IvyStream.MessageType.DirectMsg:
+ recvDirectMsg( msgId, msgData );
+ break;
+ case IvyStream.MessageType.ApplicationId:
+ recvApplicationId( msgId, msgData );
+ break;
default:
throw new IvyException("protocol error, unknown message type " + msgType);
@@ -601,29 +590,29 @@ namespace IvyBus
private void sendPong(String s)
{
- send(MessageType.Pong, 0, s);
+ stream.sendMsg(IvyStream.MessageType.Pong, 0, s);
}
public void sendPing(String s)
{
- send(MessageType.Ping, 0, s);
+ stream.sendMsg(IvyStream.MessageType.Ping, 0, s);
}
private void sendBye()
{
- send(MessageType.Bye, 0, "");
+ stream.sendMsg(IvyStream.MessageType.Bye, 0, "");
}
private void sendBye(String message)
{
- send(MessageType.Bye, 0, message);
+ stream.sendMsg(IvyStream.MessageType.Bye, 0, message);
}
public void sendDie()
{
- send(MessageType.Die, 0, "");
+ stream.sendMsg(IvyStream.MessageType.Die, 0, "");
}
private void sendDie(String message)
{
- send(MessageType.Die, 0, message);
+ stream.sendMsg(IvyStream.MessageType.Die, 0, message);
}
@@ -661,6 +650,5 @@ namespace IvyBus
isPinging = false;
pingerThread.Interrupt();
}
-
}
} \ No newline at end of file