/// A private Class for the the peers on the bus.
/// *
///
/// François-Régis Colin
///
/// http://www.tls.cena.fr/products/ivy/
/// *
/// each time a connexion is made with a remote peer, the regexp are exchanged
/// once ready, a ready message is sent, and then we can send messages,
/// die messages, direct messages, add or remove regexps, or quit. A thread is
/// created for each remote client.
/// *
///
///
namespace IvyBus
{
using System;
using System.Text.RegularExpressions;
using System.Text;
using System.Collections;
using System.Threading;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Configuration;
internal class MyTcpClient : TcpClient
{
public IPAddress RemoteAddress
{
get
{
IPEndPoint ep = (IPEndPoint)Client.RemoteEndPoint;
return ep.Address;
}
}
public int RemotePort
{
get
{
IPEndPoint ep = (IPEndPoint)Client.RemoteEndPoint;
return ep.Port;
}
}
public MyTcpClient(string host,int port):base(host,port)
{
}
public MyTcpClient(Socket sock)
{
Client = sock;
}
}
public class IvyClient
{
public String ApplicationName
{
get
{
return appName;
}
}
internal Int32 ClientKey
{
get
{
return clientKey;
}
}
public ICollection Regexps
{
get
{
return regexp_text.Values;
}
}
internal int AppPort
{
get
{
return appPort;
}
}
private String RemoteAddress
{
get
{
return socket.RemoteAddress.ToString();
}
}
private String RemotePort
{
get
{
return socket.RemotePort.ToString();
}
}
/* the protocol magic numbers */
internal enum MessageType : int
{
Bye = 0, /* end of the peer */
AddRegexp = 1, /* the peer adds a regexp */
Msg = 2, /* the peer sends a message */
Error = 3, /* error message */
DelRegexp = 4, /* the peer removes one of his regex */
EndRegexp = 5, /* no more regexp in the handshake */
StartRegexp = 6, /* avoid race condition in concurrent connexions */
DirectMsg = 7, /* the peer sends a direct message */
Die = 8, /* the peer wants us to quit */
Ping = 9, /* checks the presence of the other */
Pong = 10 /* checks the presence of the other */
};
internal const String MESSAGE_TERMINATOR = "\n"; /* the next protocol will use \r */
internal const String StartArg = "\u0002"; /* begin of arguments */
internal const String EndArg = "\u0003"; /* end of arguments */
private static int clientSerial = 0; /* an unique ID for each IvyClient */
private Ivy bus;
private MyTcpClient socket;
private StreamReader in_stream;
private StreamWriter out_stream;
private Hashtable regexp_in;
private Hashtable regexp_text;
private int appPort;
private bool peerCalling;
private volatile Thread clientThread; // volatile to ensure the quick communication
private Int32 clientKey;
private static bool doping = (ConfigurationSettings.AppSettings["IVY_PING"] != null);
private const int PINGTIMEOUT = 5000;
private volatile Thread pingerThread;
// protected variables
internal String appName;
internal IvyClient(Ivy bus, MyTcpClient socket)
{
regexp_in = new Hashtable();
regexp_text = new Hashtable();
appName = "Unknown";
appPort = 0;
this.bus = bus;
this.socket = socket;
clientKey = clientSerial++;
in_stream = new StreamReader(socket.GetStream(),System.Text.Encoding.ASCII);
out_stream = new StreamWriter(socket.GetStream(),System.Text.Encoding.ASCII);
// sends our ID, whether we initiated the connexion or not
// the ID is the couple "host name,application Port", the host name
// information is in the socket itself, the port is not known if we
// initiate the connexion
send(MessageType.StartRegexp, bus.applicationPort, bus.appName);
// sends our regexps to the peer
foreach (Int32 ikey in bus.regexp_out.Keys )
{
sendRegexp(ikey, (String) bus.regexp_out[ikey]);
}
send(MessageType.EndRegexp, 0, "");
// spawns a thread to manage the incoming traffic on this
// socket. We should be ready to receive messages now.
clientThread = new Thread(new ThreadStart(this.Run));
clientThread.Start();
if (doping)
{
pingerThread = new Thread(new ThreadStart(PingerRun));
pingerThread.Start();
}
}
/// returns the name of the remote agent.
///
/// allow an Ivy package class to access the list of regexps at a
/// given time.
/// perhaps we should implement a new IvyApplicationListener method to
/// allow the notification of regexp addition and deletion
///
internal void sendRegexp(int id, String regexp)
{
send(MessageType.AddRegexp, id, regexp); /* perhaps we should perform some checking here */
}
internal void delRegexp(int id)
{
send(MessageType.DelRegexp, id, "");
}
/// sends a direct message to the peer
///
/// the numeric value provided to the remote client
///
/// the string that will be match-tested
///
///
internal void sendDirectMsg(int id, String message)
{
send(MessageType.DirectMsg, id, message);
}
/// closes the connexion to the peer.
///
/// should I send Bye message ?
/// the thread managing the socket is stopped
///
///
internal void close(bool notify)
{
traceDebug("closing connexion to " + appName);
if (doping )
{
stopPinging();
}
if (notify)
sendBye("hasta la vista");
stopListening();
// bus.clientDisconnect(this);
socket.Close(); // should I also close in and out ?
}
/// sends the substrings of a message to the peer for each matching regexp.
///
/// the string that will be match-tested
///
/// the number of messages sent to the peer
///
///
internal int sendMsg(String message)
{
int count = 0;
foreach (Int32 key in regexp_in.Keys )
{
Regex regexp = (Regex) regexp_in[key];
Match result = regexp.Match(message);
if (result.Success)
{
send(MessageType.Msg, key, result);
count++;
}
}
return count;
}
internal void stopListening()
{
Thread t = clientThread;
if (t == null)
return ;
// we can be summoned to quit from two path at a time
clientThread = null;
t.Interrupt();
}
/// compares two peers the id is the couple (host,service port).
///
/// the other peer
///
/// true if the peers are similir. This should not happen, it is bad
/// © ® (tm)
///
///
internal bool sameClient(IvyClient clnt)
{
return (appPort != 0 && appPort == clnt.appPort) && (RemoteAddress == clnt.RemoteAddress);
}
/// the code of the thread handling the incoming messages.
///
private void Run()
{
String msg = null;
try
{
traceDebug("Connected from " + RemoteAddress + ":" + RemotePort);
}
catch (Exception ie)
{
traceDebug("Interrupted while resolving remote hostname");
}
traceDebug("Thread started");
bool running = true;
while (running)
{
try
{
if ((msg = in_stream.ReadLine()) != null)
{
// early stop during readLine()
if (doping && (pingerThread != null))
pingerThread.Interrupt();
newParseMsg(msg);
}
else
{
traceDebug("readline null ! leaving the thead");
break;
}
}
catch (IvyException ie)
{
Console.Error.WriteLine( ie.Message );
Console.Error.WriteLine( ie.StackTrace);
running = false;
}
catch (IOException e)
{
traceDebug("abnormally Disconnected from " + RemoteAddress + ":" + RemotePort);
Console.Error.WriteLine( e.Message );
Console.Error.WriteLine( e.StackTrace);
running = false;
break;
}
}
traceDebug("normally Disconnected from " + appName);
traceDebug("Thread stopped");
// first, I'm not a first class IvyClient any more
//bus.removeClient(this);
// invokes the Disconnected applicationListeners
bus.FireClientDisconnected(this);
}
private void sendBuffer(String buffer)
{
buffer += "\n";
try
{
out_stream.Write(buffer);
out_stream.Flush();
}
catch (IOException e)
{
traceDebug("I can't send my message to this client. He probably left");
// first, I'm not a first class IvyClient any more
bus.removeClient(this);
// invokes the Disconnected applicationListeners
bus.FireClientDisconnected(this);
try
{
close(false);
}
catch (IOException ioe)
{
throw new IvyException("close failed" + ioe.Message);
}
}
}
private void send(MessageType type, int id, String arg)
{
try
{
sendBuffer((int)type + " " + id + StartArg + arg);
}
catch (IvyException ie)
{
Console.Error.WriteLine("received an exception: " + ie.Message);
Console.Error.WriteLine(ie.StackTrace);
}
}
private void send(MessageType type, Int32 id, Match result)
{
String buffer = (int)type + " " + id + StartArg;
// Start at 1 because group 0 represent entire matching
for (int sub = 1; sub < result.Groups.Count; sub++)
{
buffer += result.Groups[sub] + EndArg;
}
try
{
sendBuffer(buffer);
}
catch (IvyException ie)
{
Console.Error.WriteLine("received an exception: " + ie.Message);
Console.Error.WriteLine(ie.StackTrace);
}
}
private void newParseMsg(String s)
{
int from = 0, to = 0;
MessageType msgType;
int msgId;
while ((to < s.Length) && (s[to] != ' '))
{
to++;
}
if (to >= s.Length)
throw new IvyException("protocol error");
try
{
msgType = (MessageType)Int32.Parse(s.Substring(from, (to) - (from)));
}
catch (FormatException nfe)
{
throw new IvyException("protocol error on msgType");
}
from = to + 1;
while ((to < s.Length) && (s[to] != 2))
{
to++;
}
if (to >= s.Length)
throw new IvyException("protocol error");
try
{
msgId = Int32.Parse(s.Substring(from, (to) - (from)));
}
catch (FormatException nfe)
{
throw new IvyException("protocol error on identifier");
}
from = to + 1;
switch (msgType)
{
case MessageType.Die:
traceDebug("received die Message from " + appName);
// first, I'm not a first class IvyClient any more
bus.removeClient(this);
// invokes the die applicationListeners
bool dontkillapp = bus.FireDie(this, msgId);
// makes the bus die
bus.stop();
try
{
close(false);
}
catch (IOException ioe)
{
throw new IvyException(ioe.Message);
}
if ( !dontkillapp )
Environment.Exit( -1 );
break;
case MessageType.Bye:
// the peer quits
traceDebug("received bye Message from " + appName);
// first, I'm not a first class IvyClient any more
bus.removeClient(this);
// invokes the disconnect applicationListeners
//bus.FireClientDisconnected(this); done in Running Thread
try
{
close(false); // will fire diconnected
}
catch (IOException ioe)
{
throw new IvyException(ioe.Message);
}
break;
case MessageType.AddRegexp:
String regexp = s.Substring(from, (s.Length) - (from));
if (bus.CheckRegexp(regexp))
{
try
{
regexp_in.Add( msgId, new Regex(regexp));
regexp_text.Add( msgId, regexp);
}
catch (ArgumentException e)
{
throw new IvyException("regexp error " + e.Message);
}
}
else
{
throw new IvyException("regexp Warning exp='" + regexp + "' can't match removing from " + appName);
}
break;
case MessageType.DelRegexp:
regexp_in.Remove(msgId);
regexp_text.Remove( msgId );
break;
case MessageType.EndRegexp:
bus.FireClientConnected(this);
/*
* the peer is perhaps not ready to handle this message
* an assymetric processing should be written
*/
if (bus.ready_message != null)
sendMsg(bus.ready_message);
break;
case MessageType.Msg:
ArrayList v = new ArrayList();
while (to < s.Length)
{
while ((to < s.Length) && (s[to] != 3))
{
to++;
}
if (to < s.Length)
{
v.Add(s.Substring(from, (to) - (from)));
to++;
from = to;
}
}
String[] tab = new String[v.Count];
for ( int i2 = 0; i2 < v.Count; i2++)
{
tab[i2] = (String) v[i2];
// for developpemnt purposes
// out.println(" *"+tab[i]+"* "+(tab[i]).length());
}
bus.callCallback(this, msgId, tab);
break;
case MessageType.Pong:
String paramPong = s.Substring(from, (s.Length) - (from));
traceDebug("Ping msg from " + appName + " : " + paramPong);
break;
case MessageType.Ping:
// I receive a ping. I can answer a pong.
String param = s.Substring(from, (s.Length) - (from));
traceDebug("Ping msg from " + appName + " : " + param);
sendPong(param);
break;
case MessageType.Error:
String error = s.Substring(from, (s.Length) - (from));
traceDebug("Error msg " + msgId + " " + error);
break;
case MessageType.StartRegexp:
appName = s.Substring(from, (s.Length) - (from));
appPort = msgId;
if (bus.checkConnected(this))
{
try
{
close(false);
}
catch (IOException ioe)
{
throw new IvyException("io " + ioe.Message);
}
throw new IvyException("Rare ! A concurrent connect occured");
}
break;
case MessageType.DirectMsg:
String direct = s.Substring(from, (s.Length) - (from));
bus.FireDirectMessage(this, msgId, direct);
break;
default:
throw new IvyException("protocol error, unknown message type " + msgType);
}
}
private void sendPong(String s)
{
send(MessageType.Pong, 0, s);
}
private void sendPing(String s)
{
send(MessageType.Ping, 0, s);
}
private void sendBye()
{
send(MessageType.Bye, 0, "");
}
private void sendBye(String message)
{
send(MessageType.Bye, 0, message);
}
private void sendDie()
{
send(MessageType.Die, 0, "");
}
private void sendDie(String message)
{
send(MessageType.Die, 0, message);
}
public override String ToString()
{
return "IvyClient " + bus.appName + ":" + appName;
}
private void traceDebug(String s)
{
if (bus.Debug)
Console.Out.WriteLine("-->IvyClient " + bus.appName + ":" + appName + "<-- " + s);
}
internal bool isPinging = false;
private void PingerRun()
{
isPinging = true;
traceDebug("Pinger Thread started");
while (isPinging)
{
try
{
Thread.Sleep(new TimeSpan(10000 * PINGTIMEOUT));
sendPing("are you here ?");
}
catch (ThreadInterruptedException ie)
{
}
}
traceDebug("Pinger Thread stopped");
}
public virtual void stopPinging()
{
isPinging = false;
pingerThread.Interrupt();
}
}
}