/// François-Régis Colin
/// http://www.tls.cena.fr/products/ivy/
/// *
/// (C) CENA
/// *
namespace IvyBus
{
using System;
using System.IO;
using System.Collections;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Configuration;
using System.Windows.Forms;
using System.Globalization;
using System.Text.RegularExpressions;
/// The Main bus Class
///
public class Ivy
{
/* Delegate types used by the events and message bindings of this class */
/// <summary>Handler type of the <c>directMessageReceived</c> event: receives the client involved, an integer id and a string argument.</summary>
public delegate void DirectMessageHandler (IvyClient app, int id, string arg );
/// <summary>Handler type of the <c>clientConnected</c> event.</summary>
public delegate void ClientConnectedHandler (IvyClient app);
/// <summary>Handler type of the <c>clientDisconnected</c> event.</summary>
public delegate void ClientDisconnectedHandler (IvyClient app);
/// <summary>Handler type of the <c>dieReceived</c> event; the meaning of the boolean result is defined by the code consuming <c>FireDie</c> (not visible in this file).</summary>
public delegate bool DieHandler (IvyClient app, int id );
/// <summary>Handler type of the <c>addBinding</c> event (and, as declared, of <c>removeBinding</c> too).</summary>
public delegate void ClientAddBindingHandler (IvyClient app, string arg );
/// <summary>Handler type intended for binding-removal notifications. NOTE(review): no event in this class is declared with it.</summary>
public delegate void ClientRemoveBindingHandler (IvyClient app, string arg );
/// <summary>Callback type registered through <c>bindMsg</c> / <c>bindSimpleMsg</c>: receives the client and an array of string arguments.</summary>
public delegate void MessageHandler ( IvyClient app, string[] args );
/* Events */
/// <summary>Raised when a new client connects to the bus.</summary>
/// <remarks>Subscribing is optional; nothing is raised while the event has no handler.</remarks>
public event ClientConnectedHandler clientConnected;
/// <summary>Raised when a client disconnects from the bus.</summary>
public event ClientDisconnectedHandler clientDisconnected;
/// <summary>Raised when a client receives a direct message from another client.</summary>
public event DirectMessageHandler directMessageReceived;
/// <summary>Raised when somebody asks for every client on the bus to be killed.</summary>
public event DieHandler dieReceived;
/// <summary>Raised when a client receives an "add binding" from another client.</summary>
public event ClientAddBindingHandler addBinding;
/// <summary>Raised when a client receives a "remove binding" from another client.</summary>
/// <remarks>
/// NOTE(review): declared with ClientAddBindingHandler although a ClientRemoveBindingHandler
/// delegate exists. Both have the same signature; changing the type would break subscribers.
/// </remarks>
public event ClientAddBindingHandler removeBinding;
/// <summary>
/// Gets the list of the connected clients.
/// </summary>
/// <remarks>
/// This is the live internal list (a synchronized <c>ArrayList</c>), not a copy;
/// hold its <c>SyncRoot</c> while enumerating it.
/// </remarks>
public ArrayList IvyClients
{
    get { return this.clients; }
}
/// <summary>
/// Gets or sets whether debug traces are written to the console.
/// </summary>
public bool Debug
{
    get { return this.debug; }
    set { this.debug = value; }
}
/// <summary>
/// Gets a one-line, human readable summary of the connected clients:
/// the application name followed by " clients are: " and each client's
/// application name separated by spaces. Used for debug tracing.
/// </summary>
private string ClientNames
{
    get
    {
        string summary = appName + " clients are: ";
        // The list is shared with the client threads: keep it locked while walking it.
        lock (clients.SyncRoot)
        {
            for (int i = 0; i < clients.Count; i++)
            {
                IvyClient peer = (IvyClient) clients[i];
                summary = summary + peer.ApplicationName + " ";
            }
        }
        return summary;
    }
}
/// <summary>
/// Gets the name of this application on the bus, as given to the constructor.
/// </summary>
public string AppName
{
    get { return this.appName; }
}
/// <summary>
/// Gets the unique identifier of this application. It is built by <c>start</c>
/// from a timestamp, the local address and the TCP port, and is null before that.
/// </summary>
public string AppId
{
    get { return this.applicationUniqueId; }
}
/// <summary>
/// Gets or sets the priority of this application.
/// </summary>
/// <remarks>
/// Setting the value stores it and then asks every connected client to
/// send the application id again (<c>sendApplicationId</c>), with the
/// client list locked for the duration of the loop.
/// </remarks>
public int AppPriority
{
    get { return applicationPriority; }
    set
    {
        applicationPriority = value;
        lock (clients.SyncRoot)
        {
            foreach (IvyClient peer in clients)
            {
                peer.sendApplicationId();
            }
        }
    }
}
/// <summary>
/// Gets or sets the first word tokens of the messages this application sends.
/// </summary>
/// <remarks>
/// When set, <c>CheckRegexp</c> uses the list to filter anchored ("^...")
/// regular expressions, which optimises the parsing process when sending messages.
/// Null (the default) disables the filtering.
/// </remarks>
public string[] SentMessageClasses
{
    get { return this.sent_messageClasses; }
    set { this.sent_messageClasses = value; }
}
/// <summary>
/// A <c>TcpListener</c> that makes the protected <c>Active</c> flag of its
/// base class readable from the outside.
/// </summary>
internal class MyTcpListener : TcpListener
{
    /// <summary>Creates a listener for the given local address and port.</summary>
    public MyTcpListener(IPAddress address, int port)
        : base(address, port)
    {
    }

    /// <summary>Returns the value of the base class <c>Active</c> property.</summary>
    public bool IsActive()
    {
        bool listening = Active;
        return listening;
    }
}
/// <summary>The name of this application on the bus.</summary>
internal string appName;
/// <summary>The protocol version number (the identifier spelling is kept as-is because other code refers to it).</summary>
internal const int PROCOCOLVERSION = 4;
/// <summary>The port for the UDP rendezvous, used when none is supplied.</summary>
internal const int DEFAULT_PORT = 2010;
/// <summary>The default client priority.</summary>
internal const int DEFAULT_PRIORITY = 100;
/// <summary>The default domain ("address:port") for the UDP rendezvous.</summary>
internal static readonly string DEFAULT_DOMAIN = "127.255.255.255:" + DEFAULT_PORT;
/// <summary>Version string of this library; written to the debug trace by start().</summary>
internal const string libVersion = "2.0.0";
/// <summary>When true, traceDebug() writes its messages to the console.</summary>
private bool debug;
private static int serial = 0; /* source of the unique key of each binding; static, hence shared by all Ivy instances */
/// <summary>The TCP listener accepting incoming connections; created by start().</summary>
private MyTcpListener app;
/// <summary>The IvyWatcher objects, one per domain; created by start().</summary>
private ArrayList watchers;
private volatile Thread serverThread; // the thread executing Run(); volatile to ensure quick communication of the end
/// <summary>The two kinds of binding an application can register (see bindMsg and bindSimpleMsg).</summary>
internal enum BindingType { BindRegexp, BindSimple };
/// <summary>One binding registered by this application itself.</summary>
internal struct ApplicationBinding /* Self Applications bindings */
{
// kind of binding: regular expression or simple expression
internal BindingType type;
// unique key of the binding, also its key in the bindings table
internal Int32 key;
// callback run by callCallback() for this binding
internal MessageHandler callback;
// the expression text given when binding
internal string regexp;
// extra arguments given when binding (may be null)
internal object[] args;
}
/// <summary>This application's bindings, keyed by ApplicationBinding.key (a synchronized Hashtable).</summary>
internal Hashtable bindings;
/// <summary>The connected clients (a synchronized ArrayList).</summary>
private ArrayList clients;
/// <summary>Backing field of SentMessageClasses; null means no filtering in CheckRegexp().</summary>
private string[] sent_messageClasses = null;
/// <summary>Set by stop(); once true, new clients are refused and the server loop ends.</summary>
private bool stopped = false;
internal int applicationPort; /* Application port number */
internal string applicationUniqueId; /* unique application identifier: timestamp-ipaddress-port */
/// <summary>Priority of this application, see AppPriority.</summary>
internal int applicationPriority = DEFAULT_PRIORITY;
/// <summary>The message given to the constructor, to be sent once ready.</summary>
internal string ready_message = null;
// Optional control used to run event handlers and callbacks through Control.Invoke (synchronous events)
private Control syncControl = null;
/// <summary>
/// Readies the structures for the software bus connexion.
/// </summary>
/// <remarks>
/// Nothing is opened here: the real work begins in the <c>start</c> method.
/// Typical usage:
/// <code>
/// Ivy bus = new Ivy("Dummy agent", "ready");
/// bus.bindMsg("(.*)", new Ivy.MessageHandler(myMessageListener));
/// bus.start(null);
/// </code>
/// How to send and receive: agent A performs <c>bindMsg("^Hello (.*)", cb)</c>,
/// agent B performs <c>sendMsg("Hello world")</c>; a thread in A then runs
/// <c>cb</c> with an array of strings holding the single element "world".
/// The debug flag is initialised from the "IVY_DEBUG" application setting
/// when it is present (an unparsable value makes <c>bool.Parse</c> throw).
/// </remarks>
/// <param name="name">The name of your Ivy agent on the software bus</param>
/// <param name="message">The hello message you will send once ready</param>
public Ivy(string name, string message)
{
    appName = name;
    ready_message = message;

    // Both collections are shared with other threads, hence the synchronized wrappers.
    bindings = Hashtable.Synchronized(new Hashtable());
    clients = ArrayList.Synchronized(new ArrayList());

    string configuredDebug = ConfigurationSettings.AppSettings["IVY_DEBUG"];
    if (configuredDebug != null)
    {
        debug = bool.Parse(configuredDebug);
    }
}
/// <summary>
/// Same as the two-argument constructor, and additionally remembers a
/// control through which events and message callbacks are invoked
/// (<c>Control.Invoke</c>) instead of being called directly.
/// </summary>
/// <param name="name">The name of your Ivy agent on the software bus</param>
/// <param name="message">The hello message you will send once ready</param>
/// <param name="sync">The control used to invoke the handlers; null means direct calls</param>
public Ivy(string name, string message, Control sync)
    : this(name, message)
{
    this.syncControl = sync;
}
/// <summary>
/// Connects the Ivy bus to a domain or list of domains.
/// </summary>
/// <remarks>
/// Threads involved: one IvyWatcher per traffic rendezvous, one server thread
/// (<c>Run</c>) accepting incoming connexions on the server socket, and a
/// thread for each IvyClient once a connexion has been made.
/// The method blocks, polling every 100 ms, until the server thread is
/// running and the listener is active; only then are the watchers started.
/// </remarks>
/// <param name="domainbus">
/// A domain of the form 10.0.0:1234, similar to the netmask without the
/// trailing .255. It determines the meeting point of the different
/// applications (done with an UDP broadcast; beware of routing problems).
/// A comma separated list of domains is accepted. When null, the IVYBUS
/// environment variable is used, then the built-in default domain.
/// </param>
public void start(string domainbus)
{
    if (domainbus == null)
    {
        domainbus = Environment.GetEnvironmentVariable("IVYBUS");
        if (domainbus == null)
        {
            domainbus = DEFAULT_DOMAIN;
        }
    }

    try
    {
        IPAddress hostAddress = Dns.Resolve(Dns.GetHostName()).AddressList[0];

        // Port 0: the system picks a free port for the service socket.
        app = new MyTcpListener(IPAddress.Any, 0);
        app.Start();
        IPEndPoint boundEndPoint = (IPEndPoint) app.LocalEndpoint;
        applicationPort = boundEndPoint.Port;

        // identifier layout: timestamp(ms)-address without dots-port
        long timestamp = DateTime.Now.Ticks / TimeSpan.TicksPerMillisecond;
        string compactAddress = hostAddress.ToString().Replace(".", "");
        applicationUniqueId = string.Format("{0}-{1}-{2}", timestamp, compactAddress, applicationPort);
    }
    catch (IOException e)
    {
        throw new IvyException("can't open TCP service socket " + e);
    }
    traceDebug("lib: " + libVersion + " protocol: " + PROCOCOLVERSION + " TCP service open on port " + applicationPort);

    // readies the rendezvous : an IvyWatcher (thread) per domain bus
    watchers = new ArrayList();
    foreach (Domain domain in parseDomains(domainbus))
    {
        watchers.Add(new IvyWatcher(this, domain.Domainaddr, domain.Port));
    }

    serverThread = new Thread(new ThreadStart(this.Run));
    serverThread.Name = "Ivy Tcp Server Thread";
    serverThread.Start();

    // Wait for readiness of both the server thread and the listener
    while (!(serverThread.ThreadState == ThreadState.Running && app.IsActive()))
    {
        traceDebug( " Ivy Threading start in progress..." );
        Thread.Sleep(100);
    }

    // sends the broadcasts and listen to incoming connexions
    foreach (IvyWatcher watcher in watchers)
    {
        watcher.start();
    }
}
/// <summary>
/// Sorts the list of connected clients in place with the
/// <c>IvyClient.IvyClientPriority</c> comparer, holding the list lock meanwhile.
/// </summary>
public void SortClients()
{
    lock (clients.SyncRoot)
    {
        IComparer byPriority = new IvyClient.IvyClientPriority();
        clients.Sort(byPriority);
    }
}
/// <summary>
/// Debugging helper: builds the text "broadcasting on " followed by every
/// parsed domain as "address:port " (each entry ends with a space).
/// </summary>
/// <param name="toparse">A comma separated list of domains</param>
/// <returns>The descriptive text</returns>
public string domains(string toparse)
{
    string description = "broadcasting on ";
    foreach (Ivy.Domain domain in parseDomains(toparse))
    {
        description = description + (domain.Domainaddr + ":" + domain.Port + " ");
    }
    return description;
}
/// <summary>
/// Splits a comma separated list of domains and turns every entry into a
/// <c>Domain</c>; the address and the port of an entry are extracted by
/// <c>IvyWatcher.getDomain</c> and <c>IvyWatcher.getPort</c>.
/// </summary>
/// <param name="domainbus">The comma separated list; must not be null</param>
/// <returns>One <c>Domain</c> per entry, in the order of the list</returns>
internal Domain[] parseDomains(string domainbus)
{
    string[] entries = domainbus.Split(',');
    Domain[] parsed = new Domain[entries.Length];
    int slot = 0;
    foreach (string entry in entries)
    {
        string address = IvyWatcher.getDomain(entry);
        int port = IvyWatcher.getPort(entry);
        parsed[slot] = new Domain(address, port);
        slot++;
    }
    return parsed;
}
/// <summary>
/// Disconnects from the Ivy bus.
/// </summary>
/// <remarks>
/// Stops, in this order: the TCP listener and the server thread, the
/// IvyWatchers, then every remaining IvyClient. Only the first call does
/// anything; later calls return immediately. It is safe to call this
/// before <c>start</c>.
/// </remarks>
public void stop()
{
    lock (this)
    {
        if (stopped)
            return;
        stopped = true;
        traceDebug("beginning stopping the bus");
        try
        {
            // stopping the serverThread: closing the listener makes its blocking accept fail
            if (serverThread != null)
            {
                app.Stop();
                serverThread.Join();
                serverThread = null;
            }
            // The serverThread might be stopped even before having been created
            if (app != null)
                app.Stop();
            // stopping the IvyWatchers
            if (watchers != null)
            {
                for (int i = 0; i < watchers.Count; i++)
                {
                    ((IvyWatcher) watchers[i]).stop();
                }
            }
            // stopping the remaining IvyClients.
            // Work on a snapshot because the client threads modify the list while
            // they terminate. The snapshot is sized and filled under the same lock:
            // sizing it beforehand let the list grow (CopyTo throws) or shrink
            // (trailing null entries) in between.
            IvyClient[] snapshot;
            lock (clients.SyncRoot)
            {
                snapshot = new IvyClient[clients.Count];
                clients.CopyTo(snapshot, 0);
            }
            // The lock is released before closing, so that the client threads
            // can remove themselves from the list.
            foreach (IvyClient client in snapshot)
            {
                client.close(true); // will notify the reading thread, which removes the client
            }
        }
        catch (IOException)
        {
            traceDebug("IOexception Stop ");
        }
        traceDebug("the bus should have stopped so far");
    }
}
/// <summary>
/// Sends a message to whoever on the bus is interested in it.
/// </summary>
/// <remarks>
/// The message is handed to every connected client in turn, from the
/// calling thread and with the client list locked; each client reports how
/// many messages it sent as a result. An alternate implementation would use
/// one sender thread per client, which might perform better.
/// </remarks>
/// <param name="message">
/// A message which will be compared to the regular expressions of the different clients
/// </param>
/// <returns>The number of messages actually sent</returns>
public int sendMsg(string message)
{
    int sent = 0;
    lock (clients.SyncRoot)
    {
        foreach (IvyClient peer in clients)
        {
            int sentToPeer = peer.sendMsg(message);
            sent = sent + sentToPeer;
        }
    }
    return sent;
}
/// <summary>
/// Builds a message with <c>string.Format</c> and sends it on the bus.
/// </summary>
/// <param name="format">A string message format to build the message</param>
/// <param name="args">The arguments used in the message format</param>
/// <returns>The number of messages actually sent</returns>
public int sendMsg(string format, params object[] args )
{
    string message = string.Format(format, args);
    return sendMsg(message);
}
/// <summary>
/// Builds a message with <c>string.Format</c>, using the given culture for
/// the formatting, and sends it on the bus.
/// </summary>
/// <param name="culture">The culture used to format the arguments</param>
/// <param name="format">A string message format to build the message</param>
/// <param name="args">The arguments used in the message format</param>
/// <returns>The number of messages actually sent</returns>
public int sendMsg(CultureInfo culture, string format, params object[] args )
{
    string message = string.Format(culture, format, args);
    return sendMsg(message);
}
/// <summary>
/// Subscribes to a regular expression, without extra arguments.
/// </summary>
/// <remarks>
/// The callback will be executed with the saved parameters of the regexp as
/// arguments when a matching message is sent by another agent. A program
/// doesn't receive its own messages.
/// Kept for compatibility with the old Ivy API; forwards to the overload
/// taking extra arguments, with a null argument array.
/// </remarks>
/// <param name="regexp">A regular expression, groups are done with parenthesis</param>
/// <param name="callback">The handler to run when a message matches</param>
/// <returns>The id of the regular expression</returns>
public int bindMsg(String regexp, MessageHandler callback)
{
    object[] noExtraArguments = null;
    return bindMsg(regexp, callback, noExtraArguments);
}
/// <summary>
/// Subscribes to a regular expression.
/// </summary>
/// <remarks>
/// The binding is stored in the bindings table under a fresh key, then
/// announced to every connected client.
/// </remarks>
/// <param name="regexp">A regular expression, groups are done with parenthesis</param>
/// <param name="callback">The handler to run when a message matches</param>
/// <param name="args">Extra arguments stored with the binding (may be null)</param>
/// <returns>The id of the regular expression, to be given to <c>unBindMsg</c></returns>
public int bindMsg(string regexp, MessageHandler callback, params object[] args )
{
    // creates a new binding (regexp,callback)
    ApplicationBinding newbind;
    newbind.type = BindingType.BindRegexp;
    // The counter is static and may be used from several threads at once:
    // increment it atomically so that two bindings never share a key.
    // "- 1" keeps the historical numbering (the first key is 0).
    newbind.key = Interlocked.Increment(ref serial) - 1;
    newbind.regexp = regexp;
    newbind.callback = callback;
    newbind.args = args;
    lock (bindings.SyncRoot) bindings.Add( newbind.key, newbind);
    // notifies the other clients this new regexp
    lock ( clients.SyncRoot )
    {
        foreach (IvyClient c in clients )
        {
            c.sendBinding(newbind);
        }
    }
    return newbind.key;
}
/// <summary>
/// Unsubscribes a regular expression.
/// </summary>
/// <remarks>
/// Every connected client is told to delete the binding, then the binding
/// is removed from the bindings table.
/// </remarks>
/// <param name="id">The id of the regular expression, returned when it was bound</param>
/// <exception cref="IvyException">No binding is registered under <paramref name="id"/></exception>
public void unBindMsg(int id)
{
    // A single lookup instead of ContainsKey followed by the indexer: the
    // entry cannot vanish between the test and the read, and a missing entry
    // is reported as an IvyException rather than failing on the unboxing.
    object entry;
    lock ( bindings.SyncRoot )
    {
        entry = bindings[id];
    }
    if ( entry == null )
    {
        throw new IvyException("client wants to remove an unexistant regexp " + id);
    }
    ApplicationBinding bind = (ApplicationBinding) entry;
    lock( clients.SyncRoot )
    {
        foreach (IvyClient c in clients )
        {
            c.delBinding(bind);
        }
    }
    lock( bindings.SyncRoot ) bindings.Remove( id );
}
/// <summary>
/// Unsubscribes a regular expression, designated by its text.
/// </summary>
/// <remarks>
/// Only one binding is removed per call, even when several bindings were
/// registered with the same text.
/// </remarks>
/// <param name="re">The string of the regular expression</param>
/// <returns>
/// true if the regexp existed and was unbound; false otherwise, or
/// whenever an exception occurred during unbinding
/// </returns>
public bool unBindMsg(string re)
{
    bool found = false;
    int key = 0;
    // The synchronized wrapper does not protect an enumeration: keep the
    // table locked while searching, as is done for the clients list.
    lock ( bindings.SyncRoot )
    {
        foreach (ApplicationBinding bind in bindings.Values )
        {
            if ( bind.regexp == re )
            {
                key = bind.key;
                found = true;
                break;
            }
        }
    }
    if ( !found )
        return false;
    // The removal happens outside of the enumeration and of the lock above.
    try
    {
        unBindMsg(key);
    }
    catch (IvyException)
    {
        return false;
    }
    return true;
}
/// <summary>
/// Subscribes to a simple expression ( msg arg1 arg2 arg3 etc).
/// </summary>
/// <remarks>
/// Works like <c>bindMsg</c>, except that the binding is recorded with the
/// type <c>BindingType.BindSimple</c>; how such an expression is matched is
/// decided by the code handling bindings (not visible here).
/// The callback will be executed when a matching message is sent by
/// another agent. A program doesn't receive its own messages.
/// </remarks>
/// <param name="expression">The simple expression to subscribe to</param>
/// <param name="callback">The handler to run when a message matches</param>
/// <param name="args">Extra arguments stored with the binding (may be null)</param>
/// <returns>The id of the binding, to be given to <c>unBindMsg</c></returns>
public int bindSimpleMsg(string expression, MessageHandler callback, params object[] args )
{
    // creates a new binding (expression,callback)
    ApplicationBinding newbind;
    newbind.type = BindingType.BindSimple;
    // The counter is static and may be used from several threads at once:
    // increment it atomically so that two bindings never share a key.
    // "- 1" keeps the historical numbering (the first key is 0).
    newbind.key = Interlocked.Increment(ref serial) - 1;
    newbind.regexp = expression;
    newbind.callback = callback;
    newbind.args = args;
    lock (bindings.SyncRoot) bindings.Add( newbind.key, newbind);
    // notifies the other clients this new binding
    lock ( clients.SyncRoot )
    {
        foreach (IvyClient c in clients )
        {
            c.sendBinding(newbind);
        }
    }
    return newbind.key;
}
/// <summary>
/// Raises the <c>directMessageReceived</c> event, through the
/// synchronization control when one was given to the constructor.
/// </summary>
/// <param name="app">The client passed to the handlers</param>
/// <param name="id">The id passed to the handlers</param>
/// <param name="arg">The string argument passed to the handlers</param>
internal void FireDirectMessage (IvyClient app, int id, string arg )
{
    // Snapshot of the delegate: a handler unsubscribing on another thread
    // cannot turn the event null between the test and the call.
    DirectMessageHandler handler = directMessageReceived;
    if ( handler == null )
        return;
    if ( syncControl != null )
        syncControl.Invoke( handler, new object[]{app,id,arg});
    else
        handler( app, id, arg);
}
/// <summary>
/// Raises the <c>clientConnected</c> event, through the synchronization
/// control when one was given to the constructor.
/// </summary>
/// <param name="app">The client passed to the handlers</param>
internal void FireClientConnected (IvyClient app)
{
    // Snapshot of the delegate: a handler unsubscribing on another thread
    // cannot turn the event null between the test and the call.
    ClientConnectedHandler handler = clientConnected;
    if ( handler == null )
        return;
    if ( syncControl != null )
        syncControl.Invoke( handler, new object[]{app});
    else
        handler( app);
}
/// <summary>
/// Raises the <c>clientDisconnected</c> event, through the synchronization
/// control when one was given to the constructor.
/// </summary>
/// <param name="app">The client passed to the handlers</param>
internal void FireClientDisconnected (IvyClient app)
{
    // Snapshot of the delegate: a handler unsubscribing on another thread
    // cannot turn the event null between the test and the call.
    ClientDisconnectedHandler handler = clientDisconnected;
    if ( handler == null )
        return;
    if ( syncControl != null )
        syncControl.Invoke( handler, new object[]{app});
    else
        handler( app);
}
/// <summary>
/// Raises the <c>addBinding</c> event, through the synchronization control
/// when one was given to the constructor.
/// </summary>
/// <param name="app">The client passed to the handlers</param>
/// <param name="regexp">The expression text passed to the handlers</param>
internal void FireClientAddBinding (IvyClient app, string regexp)
{
    // Snapshot of the delegate: a handler unsubscribing on another thread
    // cannot turn the event null between the test and the call.
    ClientAddBindingHandler handler = addBinding;
    if ( handler == null )
        return;
    if ( syncControl != null )
        syncControl.Invoke( handler, new object[]{app,regexp});
    else
        handler( app,regexp);
}
/// <summary>
/// Raises the <c>removeBinding</c> event, through the synchronization
/// control when one was given to the constructor.
/// </summary>
/// <param name="app">The client passed to the handlers</param>
/// <param name="regexp">The expression text passed to the handlers</param>
internal void FireClientRemoveBinding (IvyClient app, string regexp)
{
    // Snapshot of the delegate: a handler unsubscribing on another thread
    // cannot turn the event null between the test and the call.
    // (The removeBinding event is declared with the ClientAddBindingHandler type.)
    ClientAddBindingHandler handler = removeBinding;
    if ( handler == null )
        return;
    if ( syncControl != null )
        syncControl.Invoke( handler, new object[]{app,regexp});
    else
        handler( app,regexp);
}
/// <summary>
/// Raises the <c>dieReceived</c> event, through the synchronization control
/// when one was given to the constructor.
/// </summary>
/// <param name="app">The client passed to the handlers</param>
/// <param name="id">The id passed to the handlers</param>
/// <returns>
/// The value returned by the handler, or false when the event has no handler.
/// </returns>
internal bool FireDie (IvyClient app, int id )
{
    // Snapshot of the delegate: a handler unsubscribing on another thread
    // cannot turn the event null between the test and the call.
    DieHandler handler = dieReceived;
    if ( handler == null )
        return false;
    if ( syncControl != null )
    {
        // Control.Invoke hands back the delegate's return value as an object.
        // It used to be dropped, so the answer was always false in this mode.
        return (bool) syncControl.Invoke( handler, new object[]{app,id});
    }
    return handler( app, id);
}
/// <summary>
/// Removes a client from the list of connected clients, holding the list
/// lock while doing so.
/// </summary>
/// <param name="c">The client to remove</param>
internal void removeClient(IvyClient c)
{
    object gate = clients.SyncRoot;
    lock (gate)
    {
        clients.Remove(c);
    }
}
/// <summary>
/// Gives a list of the IvyClient(s) with the name given in parameter.
/// </summary>
/// <param name="name">The name of the Ivy agent you're looking for</param>
/// <returns>
/// A new list holding the matching clients; empty when there is none
/// </returns>
public ArrayList getIvyClientsByName(string name)
{
    ArrayList v = new ArrayList();
    // Clients come and go on other threads: keep the list locked during the
    // enumeration, like every other walk over it in this class.
    lock ( clients.SyncRoot )
    {
        foreach (IvyClient ic in clients )
        {
            if (((ic.ApplicationName).CompareTo(name)) == 0)
                v.Add(ic);
        }
    }
    return v;
}
/////////////////////////////////////////////////////////////////:
//
// Protected methods
//
/////////////////////////////////////////////////////////////////:
/// <summary>
/// Wraps a freshly connected socket into an IvyClient and registers it in
/// the list of connected clients, then traces the client names.
/// Does nothing once the bus has been stopped.
/// </summary>
/// <param name="socket">The socket connected to the peer</param>
internal void addClient(Socket socket)
{
    if (!stopped)
    {
        IvyClient newcomer = new IvyClient(this, socket);
        lock (clients.SyncRoot)
        {
            clients.Add(newcomer);
        }
        traceDebug(ClientNames);
    }
}
/// <summary>
/// Runs the callback of one of this application's bindings, through the
/// synchronization control when one was given to the constructor.
/// </summary>
/// <param name="client">The client passed to the callback</param>
/// <param name="key">The key of the binding whose callback must run</param>
/// <param name="tab">The string arguments passed to the callback</param>
/// <exception cref="IvyException">
/// No binding is registered under <paramref name="key"/>, or it has no callback
/// </exception>
internal void callCallback(IvyClient client, Int32 key, string[] tab)
{
    // Test the table entry before unboxing it: ApplicationBinding is a
    // struct, so casting a missing (null) entry raised a
    // NullReferenceException and the error below was never reported.
    object entry = bindings[key];
    if (entry == null)
    {
        throw new IvyException("(callCallback) Not regexp matching id " + key);
    }
    ApplicationBinding bind = ( ApplicationBinding ) entry;
    if (bind.callback == null)
    {
        throw new IvyException("(callCallback) Not regexp matching id " + key);
    }
    if ( syncControl != null )
        syncControl.Invoke( bind.callback, new object[]{client,tab});
    else
        bind.callback(client, tab);
}
/// <summary>
/// Resolves the domain to use. The first non null value wins among: the
/// argument, the "IVYBUS" application setting, the IVYBUS environment
/// variable and the built-in default domain.
/// </summary>
/// <param name="domainbus">The requested domain, or null to look it up</param>
/// <returns>The domain to use, never null</returns>
public static string getDomain(string domainbus)
{
    if (domainbus != null)
        return domainbus;

    string fromConfiguration = ConfigurationSettings.AppSettings["IVYBUS"];
    if (fromConfiguration != null)
        return fromConfiguration;

    string fromEnvironment = Environment.GetEnvironmentVariable("IVYBUS");
    return (fromEnvironment != null) ? fromEnvironment : DEFAULT_DOMAIN;
}
/// <summary>
/// Checks the "validity" of a regular expression against the classes of
/// messages this application declared it sends.
/// </summary>
/// <remarks>
/// Without declared classes (<c>SentMessageClasses</c> is null), or for an
/// expression which is not anchored with "^", the answer is always true.
/// Otherwise the first word following the "^" is extracted and the answer
/// is true when at least one declared class starts with that word.
/// When no word can be extracted (for instance "^(.*)"), the whole
/// expression is compared instead, which normally matches no class.
/// </remarks>
/// <param name="exp">The regular expression to check</param>
/// <returns>true when the expression may match a message sent by this application</returns>
internal bool CheckRegexp(string exp)
{
    bool regexp_ok = true;
    // TODO Bug
    // ClockStop ClockStart & ^Clock(Start|Pause)
    // should Stop to the first parent
    if ((sent_messageClasses != null) && exp.StartsWith("^") )
    {
        regexp_ok = false;
        // extract first word from regexp...
        // The group must be named "token" for the ${token} substitution to
        // work; the unnamed form "(?[" is not even a valid pattern and made
        // Regex.Replace throw.
        string token = Regex.Replace(exp, @"^\^(?<token>[a-zA-Z_0-9-]+).*" , @"${token}");
        foreach (string exp_class in sent_messageClasses)
        {
            if ( exp_class.StartsWith(token) )
                return true;
        }
    }
    return regexp_ok;
}
/// <summary>
/// Tells whether another entry of the client list designates the same
/// client as the one given (according to <c>sameClient</c>).
/// </summary>
/// <remarks>
/// Prevents two clients from connecting to each other at the same time.
/// There might still be a lingering bug here, that could be avoided with
/// the SchizoToken. A client whose application port is 0 is never
/// reported as connected.
/// </remarks>
/// <param name="clnt">The client to look for</param>
/// <returns>true when a different entry of the list is the same client</returns>
internal bool checkConnected(IvyClient clnt)
{
    bool alreadyConnected = false;
    if (clnt.AppPort != 0)
    {
        lock (clients.SyncRoot)
        {
            foreach (IvyClient other in clients)
            {
                if (clnt != other && other.sameClient(clnt))
                {
                    alreadyConnected = true;
                    break;
                }
            }
        }
    }
    return alreadyConnected;
}
/// <summary>
/// The service socket thread main loop: accepts incoming connexions and
/// registers a client for each of them, until the listener is closed or
/// the bus is stopped.
/// </summary>
private void Run()
{
    traceDebug("Ivy service Thread started");
    bool running = true;
    while (running)
    {
        try
        {
            Socket socket = app.AcceptSocket();
            if (stopped)
            {
                // early disconnexion: the bus is shutting down, nobody will
                // ever own this connexion, so release it instead of leaking it
                socket.Close();
                break;
            }
            addClient(socket); // the peer called me
        }
        catch (IOException e)
        {
            // reported, then the loop goes on accepting
            traceDebug("Error IvyServer exception: " + e.Message);
            Console.Out.WriteLine("Ivy server socket reader caught an exception " + e.Message);
        }
        catch (SocketException)
        {
            // this is how stop() ends the loop: closing the listener makes the accept fail
            traceDebug("my server socket has been closed");
            running = false;
        }
    }
    traceDebug("Ivy service Thread stopped");
}
/// <summary>
/// Writes a debug message to the console, prefixed with "-->ivy<-- ",
/// when the debug flag is set; does nothing otherwise.
/// </summary>
/// <param name="s">The message to trace</param>
private void traceDebug(string s)
{
    if (!debug)
    {
        return;
    }
    Console.Out.WriteLine("-->ivy<-- " + s);
}
/// <summary>
/// An immutable (address, port) pair describing one rendezvous domain.
/// </summary>
internal class Domain
{
    private string domainaddr;
    private int port;

    /// <summary>Creates a domain from its address and its port.</summary>
    public Domain( string domainaddr, int port)
    {
        this.port = port;
        this.domainaddr = domainaddr;
    }

    /// <summary>Gets the address part of the domain.</summary>
    public virtual string Domainaddr
    {
        get { return this.domainaddr; }
    }

    /// <summary>Gets the port part of the domain.</summary>
    public virtual int Port
    {
        get { return this.port; }
    }

    /// <summary>Returns the domain written as "address:port".</summary>
    public override string ToString()
    {
        string text = domainaddr + ":" + port;
        return text;
    }
}
}
}