/// François-Régis Colin
/// http://www.tls.cena.fr/products/ivy/
/// *
/// (C) CENA
/// *
namespace IvyBus
{
using System;
using System.Collections;
using System.Threading;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Configuration;
/// A Class for the the peers on the bus.
///
///
/// each time a connexion is made with a remote peer, the regexp are exchanged
/// once ready, a ready message is sent, and then we can send messages,
/// die messages, direct messages, add or remove regexps, or quit. A thread is
/// created for each remote client.
///
public class IvyClient
{
public class IvyClientPriority : IComparer
{
// Calls CaseInsensitiveComparer.Compare with the parameters reversed.
int IComparer.Compare( Object x, Object y )
{
IvyClient c1 = (IvyClient)x;
IvyClient c2 = (IvyClient)y;
return( c2.clientPriority - c1.clientPriority );
}
}
public String ApplicationName
{
get
{
return appName;
}
}
public string[] Regexps
{
get
{
int i = 0;
String[] tab = new String[bindings.Count];
foreach( IvyBindingBase bind in bindings.Values )
tab[i++] = bind.expression;
return tab;
}
}
internal int AppPort
{
get
{
return appPort;
}
}
public String RemoteAddress
{
get
{
IPHostEntry hostInfo = Dns.GetHostByAddress(((IPEndPoint)socket.RemoteEndPoint).Address);
return hostInfo.HostName;
}
}
public int RemotePort
{
get
{
return ((IPEndPoint)socket.RemoteEndPoint).Port;
}
}
private Ivy bus;
private Socket socket;
private IvyStream stream;
private Hashtable bindings;
private int appPort;
private string clientId; /* an unique ID for each IvyClient */
private int clientPriority; /* client priority */
private bool peerCalling;
private volatile bool running = false;
private volatile Thread clientThread; // volatile to ensure the quick communication
private static bool doping = (ConfigurationSettings.AppSettings["IVY_PING"] != null);
private const int PINGTIMEOUT = 5000;
private volatile Thread pingerThread;
// protected variables
internal String appName;
internal IvyClient(Ivy bus, Socket socket)
{
bindings = Hashtable.Synchronized(new Hashtable());
appName = "Unknown";
appPort = 0;
this.bus = bus;
this.socket = socket;
socket.SetSocketOption( SocketOptionLevel.Tcp, SocketOptionName.KeepAlive, 1 );
stream = new IvyStream( socket );
clientPriority = Ivy.DEFAULT_PRIORITY;
sendApplicationId();
// sends our ID, whether we initiated the connexion or not
// the ID is the couple "host name,application Port", the host name
// information is in the socket itself, the port is not known if we
// initiate the connexion
stream.sendMsg(IvyStream.MessageType.StartRegexp, bus.applicationPort, bus.appName);
// sends our regexps to the peer
lock( bus.bindings.SyncRoot )
{
foreach (Ivy.ApplicationBinding bind in bus.bindings.Values )
{
sendBinding(bind);
}
}
stream.sendMsg(IvyStream.MessageType.EndRegexp, 0, "");
// spawns a thread to manage the incoming traffic on this
// socket. We should be ready to receive messages now.
clientThread = new Thread(new ThreadStart(this.Run));
clientThread.Name = "Ivy Tcp Client Reader Thread";
clientThread.Start();
if (doping)
{
pingerThread = new Thread(new ThreadStart(PingerRun));
pingerThread.Name = "Ivy Pinger Thread";
pingerThread.Start();
}
}
/// returns the name of the remote agent.
/// allow an Ivy package class to access the list of regexps at a
/// given time.
/// perhaps we should implement a new IvyApplicationListener method to
/// allow the notification of regexp addition and deletion
///
internal void sendApplicationId()
{
stream.sendMsg( IvyStream.MessageType.ApplicationId, bus.applicationPriority, bus.AppId);
}
internal void sendBinding(Ivy.ApplicationBinding bind)
{
stream.sendMsg(bind.type == Ivy.BindingType.BindRegexp ? IvyStream.MessageType.AddRegexp: IvyStream.MessageType.AddBinding, bind.key, bind.regexp); /* perhaps we should perform some checking here */
}
internal void delBinding(Ivy.ApplicationBinding bind)
{
stream.sendMsg(bind.type == Ivy.BindingType.BindRegexp ? IvyStream.MessageType.DelRegexp: IvyStream.MessageType.DelBinding, bind.key, "");
}
/// sends a direct message to the peer
///
/// the numeric value provided to the remote client
///
/// the string that will be match-tested
///
///
public void sendDirectMsg(int id, String message)
{
try
{
stream.sendMsg(IvyStream.MessageType.DirectMsg, id, message);
}
catch (IOException e)
{
traceDebug("I can't send my message to this client. He probably left");
// first, I'm not a first class IvyClient any more
bus.removeClient(this);
// invokes the Disconnected applicationListeners
bus.FireClientDisconnected(this);
try
{
close(false);
}
catch (IOException ioe)
{
throw new IvyException("close failed" + ioe.Message);
}
}
}
/// closes the connexion to the peer.
///
/// should I send Bye message ?
/// the thread managing the socket is stopped
///
///
internal void close(bool notify)
{
traceDebug("closing connexion to " + appName);
if (doping )
{
stopPinging();
}
if (notify)
sendBye("hasta la vista");
stopListening();
// bus.clientDisconnect(this);
// in_stream close in the thread
if ( stream != null )
{
stream.Close();
stream = null;
//socket.Close(); // pris en charge par stream ( NetWorkStream )
socket = null;
}
}
/// sends the substrings of a message to the peer for each matching regexp.
///
/// the string that will be match-tested
///
/// the number of messages sent to the peer
///
///
internal int sendMsg(String message)
{
int count = 0;
lock( bindings.SyncRoot )
{
IvyBindingSimple.Prepare( message );
foreach (IvyBindingBase bind in bindings.Values )
{
string[] array = bind.Match(message);
if (array!=null)
{
stream.sendMsg(IvyStream.MessageType.Msg, bind.key, array);
count++;
}
}
}
return count;
}
internal void stopListening()
{
if (clientThread == null)
return ;
// Tell Thread to stop.
running = false;
if ( stream != null )
stream.Close();
if ( Thread.CurrentThread != clientThread )
{
// Wait for Thread to end.
clientThread.Join();
}
clientThread = null;
}
/// compares two peers the id is the couple (host,service port).
///
/// the other peer
///
/// true if the peers are similir. This should not happen, it is bad
/// © ® (tm)
///
///
internal bool sameClient(IvyClient clnt)
{
return (appPort != 0 && appPort == clnt.appPort) && (RemoteAddress == clnt.RemoteAddress);
}
/// the code of the thread handling the incoming messages.
///
private void Run()
{
IvyStream.MessageType type;
int id;
string[] args;
try
{
traceDebug("Connected from " + RemoteAddress + ":" + RemotePort);
}
catch (Exception ie)
{
traceDebug("Interrupted while resolving remote hostname");
}
traceDebug("Thread started");
bool running = true;
while ( running && (stream != null) )
{
try
{
if ( stream.receiveMsg(out type, out id, out args) )
{
// early stop during readLine()
if (doping && (pingerThread != null))
pingerThread.Interrupt();
DispatchMsg(type, id, args);
}
else
{
traceDebug("readline null ! leaving the thread");
break;
}
}
catch ( ObjectDisposedException e )
{
traceDebug( "ivyclient socket closed" );
running = false;
break;
}
catch (IvyException ie)
{
Console.Error.WriteLine( ie.Message );
Console.Error.WriteLine( ie.StackTrace);
running = false;
break;
}
catch (SocketException se)
{
traceDebug( "ivyclient socket closed" );
running = false;
break;
}
catch (IOException e)
{
if ( e.InnerException is SocketException )
{
traceDebug( "ivyclient socket closed" );
}
else
{
traceDebug("abnormally Disconnected from " + RemoteAddress + ":" + RemotePort);
Console.Error.WriteLine( e.Message );
Console.Error.WriteLine( e.StackTrace);
}
running = false;
break;
}
}
if ( stream != null )
{
stream.Close();
stream = null;
}
traceDebug("normally Disconnected from " + appName);
traceDebug("Thread stopped");
// invokes the Disconnected applicationListeners
bus.FireClientDisconnected(this);
// first, I'm not a first class IvyClient any more
bus.removeClient(this);
}
private void recvDie( int id, string[] arg )
{
traceDebug("received die Message from " + appName);
// first, I'm not a first class IvyClient any more
bus.removeClient(this);
// invokes the die applicationListeners
bool dontkillapp = bus.FireDie(this, id);
// makes the bus die
bus.stop();
try
{
close(false);
}
catch (IOException ioe)
{
throw new IvyException(ioe.Message);
}
if ( !dontkillapp )
Environment.Exit( -1 );
}
private void recvBye( int id, string[] arg )
{
// the peer quits
traceDebug("received bye Message from " + appName);
// first, I'm not a first class IvyClient any more
bus.removeClient(this);
// invokes the disconnect applicationListeners
//bus.FireClientDisconnected(this); done in Running Thread
try
{
close(false); // will fire diconnected
}
catch (IOException ioe)
{
throw new IvyException(ioe.Message);
}
}
private void recvAddRegexp( int id, string[] arg )
{
string regexp = arg[0];
if (bus.CheckRegexp(regexp))
{
try
{
IvyBindingRegexp bind = new IvyBindingRegexp(id,regexp);
lock( bindings.SyncRoot )
{
bindings.Add( id, bind);
}
bus.FireClientAddBinding( this, bind.expression );
}
catch (ArgumentException e)
{
throw new IvyException("regexp error " + e.Message);
}
}
else
{
traceDebug("regexp Warning exp='" + arg + "' can't match removing from " + appName);
}
}
private void recvDelRegexp( int id, string[] arg )
{
lock( bindings.SyncRoot )
{
IvyBindingBase bind = (IvyBindingBase) bindings[id];
bus.FireClientRemoveBinding( this, bind.expression );
bindings.Remove(id);
}
}
private void recvAddBinding( int id, string[] arg )
{
string expression = arg[0];
try
{
IvyBindingSimple bind = new IvyBindingSimple(id, expression);
lock( bindings.SyncRoot )
{
bindings.Add( id, bind);
}
bus.FireClientAddBinding( this, bind.expression );
}
catch (ArgumentException e)
{
throw new IvyException("regexp error " + e.Message);
}
}
private void recvDelBinding( int id, string[] arg )
{
lock( bindings.SyncRoot )
{
IvyBindingBase bind = (IvyBindingBase) bindings[id];
bus.FireClientRemoveBinding( this, bind.expression );
bindings.Remove(id);
}
}
private void recvMsg( int id, string[] arg )
{
bus.callCallback(this, id, arg);
}
private void recvError( int id, string[] arg )
{
traceDebug("Error msg " + id + " " + arg);
}
private void recvApplicationId( int id, string[] arg )
{
clientId = arg[0];
if ( clientPriority != id )
{
clientPriority = id;
bus.SortClients();
}
}
private void recvEndRegexp( int id, string[] arg )
{
bus.FireClientConnected(this);
/*
* the peer is perhaps not ready to handle this message
* an assymetric processing should be written
*/
if (bus.ready_message != null)
sendMsg(bus.ready_message);
}
private void recvStartRegexp( int id, string[] arg )
{
appName = arg[0];
appPort = id;
if (bus.checkConnected(this))
{
try
{
close(false);
}
catch (IOException ioe)
{
throw new IvyException("io " + ioe.Message);
}
throw new IvyException("Rare ! A concurrent connect occured");
}
}
private void recvDirectMsg( int id, string[] arg )
{
bus.FireDirectMessage(this, id, arg[0]);
}
private void recvPing( int id, string[] arg )
{
// I receive a ping. I can answer a pong.
traceDebug("Ping msg from " + appName + " : " + arg);
sendPong(arg[0]);
}
private void recvPong( int id, string[] arg )
{
traceDebug("Ping msg from " + appName + " : " + arg);
}
private void DispatchMsg(IvyStream.MessageType msgType, int msgId, string[] msgData)
{
switch (msgType)
{
case IvyStream.MessageType.Die:
recvDie( msgId, msgData );
break;
case IvyStream.MessageType.Bye:
recvBye( msgId, msgData );
break;
case IvyStream.MessageType.AddRegexp:
recvAddRegexp( msgId, msgData );
break;
case IvyStream.MessageType.DelRegexp:
recvDelRegexp( msgId, msgData );
break;
case IvyStream.MessageType.AddBinding:
recvAddBinding( msgId, msgData );
break;
case IvyStream.MessageType.DelBinding:
recvDelBinding( msgId, msgData );
break;
case IvyStream.MessageType.EndRegexp:
recvEndRegexp( msgId, msgData );
break;
case IvyStream.MessageType.Msg:
recvMsg( msgId, msgData );
break;
case IvyStream.MessageType.Pong:
recvPong( msgId, msgData );
break;
case IvyStream.MessageType.Ping:
recvPing( msgId, msgData );
break;
case IvyStream.MessageType.Error:
recvError( msgId, msgData );
break;
case IvyStream.MessageType.StartRegexp:
recvStartRegexp( msgId, msgData );
break;
case IvyStream.MessageType.DirectMsg:
recvDirectMsg( msgId, msgData );
break;
case IvyStream.MessageType.ApplicationId:
recvApplicationId( msgId, msgData );
break;
default:
throw new IvyException("protocol error, unknown message type " + msgType);
}
}
private void sendPong(String s)
{
stream.sendMsg(IvyStream.MessageType.Pong, 0, s);
}
public void sendPing(String s)
{
stream.sendMsg(IvyStream.MessageType.Ping, 0, s);
}
private void sendBye()
{
stream.sendMsg(IvyStream.MessageType.Bye, 0, "");
}
private void sendBye(String message)
{
stream.sendMsg(IvyStream.MessageType.Bye, 0, message);
}
public void sendDie()
{
stream.sendMsg(IvyStream.MessageType.Die, 0, "");
}
private void sendDie(String message)
{
stream.sendMsg(IvyStream.MessageType.Die, 0, message);
}
public override String ToString()
{
return "IvyClient " + bus.appName + ":" + appName;
}
private void traceDebug(String s)
{
if (bus.Debug)
Console.Out.WriteLine("-->IvyClient " + bus.appName + ":" + appName + "<-- " + s);
}
internal bool isPinging = false;
private void PingerRun()
{
isPinging = true;
traceDebug("Pinger Thread started");
while (isPinging)
{
try
{
Thread.Sleep(new TimeSpan(10000 * PINGTIMEOUT));
sendPing("are you here ?");
}
catch (ThreadInterruptedException ie)
{
}
}
traceDebug("Pinger Thread stopped");
}
public virtual void stopPinging()
{
isPinging = false;
pingerThread.Interrupt();
}
}
}