/// François-Régis Colin
/// http://www.tls.cena.fr/products/ivy/
/// *
/// (C) CENA
/// *
namespace IvyBus
{
using System;
using System.IO;
using System.Collections;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Configuration;
using System.Windows.Forms;
/// The Main bus Class
///
public class Ivy
{
/* Delegate types used by the events below and by message bindings */
/// <summary>Signature of a direct-message handler: the client involved, an integer id and a string argument.</summary>
public delegate void DirectMessageHandler (IvyClient app, int id, string arg );
/// <summary>Signature of a handler notified with the client that connected.</summary>
public delegate void ClientConnectedHandler (IvyClient app);
/// <summary>Signature of a handler notified with the client that disconnected.</summary>
public delegate void ClientDisconnectedHandler (IvyClient app);
/// <summary>
/// Signature of a handler for a die request. The boolean result is handed back to the
/// caller of FireDie; how it is interpreted is decided outside this file.
/// </summary>
public delegate bool DieHandler (IvyClient app, int id );
/// <summary>
/// Signature of a callback registered with bindMsg; it is invoked by callCallback with the
/// client and an array of strings.
/// </summary>
public delegate void MessageHandler (IvyClient app, string[] args );
/* Events */
/// <summary>
/// Fires when a new client connects to the bus.
/// May have no subscriber, in which case nothing is raised.
/// </summary>
public event ClientConnectedHandler clientConnected;
/// <summary>Fires when a client disconnects from the bus.</summary>
public event ClientDisconnectedHandler clientDisconnected;
/// <summary>Fires when a client receives a direct message from another client.</summary>
public event DirectMessageHandler directMessageReceived;
/// <summary>Fires when somebody asks for killing every client on the bus.</summary>
public event DieHandler dieReceived;
/// <summary>
/// Gets the table of connected clients. The table returned is the live, synchronized
/// Hashtable used internally (keyed by each client's ClientKey), not a copy.
/// </summary>
public Hashtable IvyClients
{
    get { return this.clients; }
}
/// <summary>
/// Gets or sets whether debug traces are enabled. When true, traceDebug writes its
/// messages to the console.
/// </summary>
public bool Debug
{
    get { return this.debug; }
    set { this.debug = value; }
}
/// <summary>
/// Builds a one-line description of the connected clients: the application name, the text
/// " clients are: ", then each client's ApplicationName followed by a space.
/// </summary>
private String ClientNames
{
    get
    {
        String listing = appName + " clients are: ";
        // Hold the table lock for the whole enumeration so it cannot change under us.
        lock( clients.SyncRoot )
        {
            foreach (IvyClient connected in clients.Values )
            {
                listing = listing + connected.ApplicationName + " ";
            }
        }
        return listing;
    }
}
/// <summary>Gets the application name given to the constructor.</summary>
public String AppName
{
    get { return this.appName; }
}
/// <summary>The name of the application on the bus (set by the constructor).</summary>
internal String appName;
/// <summary>The protocol version number (the identifier's spelling is kept as-is for compatibility).</summary>
internal const int PROCOCOLVERSION = 3;
/// <summary>The port for the UDP rendezvous, if none is supplied.</summary>
internal const int DEFAULT_PORT = 2010;
/// <summary>The default domain for the UDP rendezvous: "127.255.255.255:" followed by DEFAULT_PORT.</summary>
internal static readonly String DEFAULT_DOMAIN = "127.255.255.255:" + DEFAULT_PORT;
/// <summary>Library version string, printed in the start-up debug trace.</summary>
internal const String libVersion = "1.0.0";
/// <summary>Debug flag; read by traceDebug, set from the IVY_DEBUG setting or the Debug property.</summary>
private bool debug;
private static int serial = 0; /* source of unique ids for bound regexps; shared by all Ivy instances (static) */
/// <summary>Listening TCP socket, created in start().</summary>
private TcpListener app;
/// <summary>The IvyWatcher objects created in start(), one per parsed domain.</summary>
private ArrayList watchers;
private volatile Thread serverThread; // volatile so that stop() clearing it is seen promptly by other threads
/// <summary>Binding id to MessageHandler table; this Hashtable is NOT a synchronized wrapper.</summary>
private Hashtable callbacks;
/// <summary>Synchronized table of connected clients, keyed by IvyClient.ClientKey.</summary>
private Hashtable clients;
/// <summary>Optional list consulted by CheckRegexp; never assigned in this class, so it stays null here.</summary>
private String[] messages_classes = null;
/// <summary>Set to true by stop(); never reset afterwards.</summary>
private bool stopped = false;
internal int applicationPort; /* local port of the listening socket, assigned in start() */
/// <summary>Synchronized table mapping binding id to the regular expression string.</summary>
internal Hashtable regexp_out;
/// <summary>The ready message supplied to the constructor.</summary>
internal String ready_message = null;
// When non-null, callbacks and events are marshalled through this control's Invoke
// instead of being called directly on the calling thread.
private Control syncControl = null;
/// <summary>
/// Readies the structures for the software bus connexion. No network activity happens
/// here; the real work begins in start().
/// </summary>
/// <remarks>
/// Typical usage:
/// <code>
/// Ivy bus = new Ivy("Dummy agent","ready");
/// bus.bindMsg("(.*)",new Ivy.MessageHandler(myMessageListener));
/// bus.start(null);
/// </code>
/// Sending and receiving: agent A performs bindMsg("^Hello (.*)",cb); agent B performs
/// sendMsg("Hello world"); a thread in A then runs cb with its second argument set to an
/// array of String holding the single element "world".
/// If the application configuration contains an IVY_DEBUG setting, it is parsed with
/// bool.Parse and used as the initial debug flag (a malformed value makes bool.Parse throw).
/// </remarks>
/// <param name="name">The name of your Ivy agent on the software bus.</param>
/// <param name="message">The hello message you will send once ready.</param>
public Ivy(String name, String message)
{
    appName = name;
    ready_message = message;

    // Client and outgoing-regexp tables are shared between threads: use synchronized wrappers.
    regexp_out = Hashtable.Synchronized( new Hashtable() );
    clients = Hashtable.Synchronized( new Hashtable() );
    callbacks = new Hashtable();

    String debugSetting = ConfigurationSettings.AppSettings["IVY_DEBUG"];
    if ( debugSetting != null )
    {
        debug = bool.Parse(debugSetting);
    }
}
/// <summary>
/// Same as the two-argument constructor, and additionally records a control through which
/// callbacks and events will be invoked (via Control.Invoke) instead of being called directly.
/// </summary>
/// <param name="name">The name of your Ivy agent on the software bus.</param>
/// <param name="message">The hello message you will send once ready.</param>
/// <param name="sync">The control used to marshal callbacks; may be null for direct calls.</param>
public Ivy(String name, String message, Control sync) : this(name, message)
{
    this.syncControl = sync;
}
/// <summary>
/// Connects the Ivy bus to a domain or list of domains.
/// </summary>
/// <remarks>
/// Opens a listening TCP socket on a system-chosen port, creates one IvyWatcher per parsed
/// domain, starts the server thread (which runs Run() to accept incoming connexions), and
/// finally starts every watcher.
/// According to the original notes, each IvyWatcher handles one rendezvous (UDP broadcast)
/// and each IvyClient gets its own thread once connected.
/// </remarks>
/// <param name="domainbus">
/// A domain of the form 10.0.0:1234, similar to the netmask without the trailing .255, or a
/// comma separated list of such domains. When null, the IVYBUS environment variable is used,
/// and failing that DEFAULT_DOMAIN. Beware of routing problems with UDP broadcast.
/// </param>
/// <exception cref="IvyException">The TCP service socket could not be opened.</exception>
public void start(String domainbus)
{
    if (domainbus == null)
        domainbus = Environment.GetEnvironmentVariable("IVYBUS");
    if (domainbus == null)
        domainbus = DEFAULT_DOMAIN;

    try
    {
        // Port 0 lets the system pick a free port; the actual one is read back below.
        app = new TcpListener(0);
        app.Start();
        applicationPort = ((IPEndPoint) app.LocalEndpoint).Port;
    }
    catch (IOException e)
    {
        throw new IvyException("can't open TCP service socket " + e);
    }
    catch (SocketException e)
    {
        // TcpListener reports socket failures as SocketException, not IOException;
        // without this clause the IvyException above could never be produced.
        throw new IvyException("can't open TCP service socket " + e);
    }
    traceDebug("lib: " + libVersion + " protocol: " + PROCOCOLVERSION + " TCP service open on port " + applicationPort);

    // readies the rendezvous : one IvyWatcher per domain bus
    watchers = new ArrayList();
    Domain[] d = parseDomains(domainbus);
    for (int index = 0; index < d.Length; index++)
    {
        IvyWatcher watcher = new IvyWatcher(this, d[index].Domainaddr, d[index].Port);
        watchers.Add(watcher);
    }

    // Start accepting connexions before the watchers announce us.
    serverThread = new Thread(new ThreadStart(this.Run));
    serverThread.Start();

    // sends the broadcasts and listen to incoming connexions
    for (int i = 0; i < watchers.Count; i++)
    {
        ((IvyWatcher) watchers[i]).start();
    }
}
/// <summary>
/// Debugging helper: formats the domains found in <paramref name="toparse"/> as
/// "broadcasting on " followed by "address:port " for each parsed domain.
/// </summary>
/// <param name="toparse">A domain or comma separated list of domains.</param>
/// <returns>The human readable description.</returns>
public String domains(String toparse)
{
    String description = "broadcasting on ";
    foreach (Ivy.Domain domain in parseDomains(toparse))
    {
        description += domain.Domainaddr + ":" + domain.Port + " ";
    }
    return description;
}
/// <summary>
/// Splits a comma separated domain list and turns each element into a Domain, using
/// IvyWatcher.getDomain for the address part and IvyWatcher.getPort for the port part.
/// </summary>
/// <param name="domainbus">The comma separated domain list; must not be null.</param>
/// <returns>One Domain per element of the list, in the same order.</returns>
internal Domain[] parseDomains(String domainbus)
{
    String[] parts = domainbus.Split(',');
    Domain[] parsed = new Domain[parts.Length];
    int slot = 0;
    foreach (String part in parts)
    {
        parsed[slot] = new Domain(IvyWatcher.getDomain(part), IvyWatcher.getPort(part));
        slot++;
    }
    return parsed;
}
/// <summary>
/// Disconnects from the Ivy bus: interrupts the server thread, stops the listening socket,
/// stops every IvyWatcher and closes every remaining IvyClient.
/// </summary>
/// <remarks>
/// Only the first call does anything; later calls return immediately because the
/// stopped flag is never reset.
/// </remarks>
public void stop()
{
    lock(this)
    {
        if (stopped)
            return ;
        stopped = true;
        traceDebug("beginning stopping the bus");
        try
        {
            // stopping the serverThread
            Thread t = serverThread;
            serverThread = null;
            if (t != null)
                t.Interrupt();
            // stop() may be called before start(), in which case there is no listener yet
            if (app != null)
                app.Stop();
            // stopping the IvyWatchers
            if ( watchers != null )
            {
                for (int i = 0; i < watchers.Count; i++)
                {
                    ((IvyWatcher) watchers[i]).stop();
                }
            }
            // Stopping the remaining IvyClients. Work on a snapshot so that client threads
            // removing themselves do not modify the collection being walked. The array must
            // be sized under the same lock as the copy: sizing it beforehand lets a client
            // added in between make CopyTo fail.
            IvyClient[] copyClient;
            lock( clients.SyncRoot )
            {
                copyClient = new IvyClient[clients.Count];
                clients.Values.CopyTo( copyClient,0 );
            }
            foreach (IvyClient client in copyClient )
            {
                client.close(true); // original note: this notifies the reading thread,
                                    // which removes the client from the table itself
            }
        }
        catch (IOException)
        {
            traceDebug("IOexception Stop ");
        }
        traceDebug("the bus should have stopped so far");
    }
}
/// <summary>
/// Sends a message to the clients on the bus.
/// </summary>
/// <remarks>
/// The message is handed to every connected client's sendMsg; per the original notes, each
/// client matches it against its own regular expressions and forwards the results to the
/// relevant agents. All clients are served sequentially from the calling thread while the
/// client table is locked; one sender thread per client would be an alternative if this
/// becomes a performance issue.
/// </remarks>
/// <param name="message">A message which will be compared to the regular expressions of the different clients.</param>
/// <returns>The number of messages actually sent (sum of the per-client counts).</returns>
public int sendMsg(String message)
{
    int delivered = 0;
    lock ( clients.SyncRoot )
    {
        foreach (IvyClient recipient in clients.Values )
        {
            int sentToRecipient = recipient.sendMsg(message);
            delivered += sentToRecipient;
        }
    }
    return delivered;
}
/// <summary>
/// Sends a formatted message to the clients on the bus: the text is built with
/// string.Format and then sent exactly like sendMsg(String).
/// </summary>
/// <param name="format">A string message format used to build the message.</param>
/// <param name="args">The arguments substituted into the message format.</param>
/// <returns>The number of messages actually sent.</returns>
public int sendMsg(string format, params object[] args )
{
    string formatted = string.Format( format, args );
    return sendMsg( formatted );
}
/// <summary>
/// Subscribes to a regular expression.
/// </summary>
/// <remarks>
/// The callback will be executed with the saved parameters of the regexp as arguments when
/// a matching message is sent by another agent. A program doesn't receive its own messages.
/// The new binding is announced to every currently connected client.
/// </remarks>
/// <param name="regexp">A regular expression; groups are done with parentheses.</param>
/// <param name="callback">The handler to run for matching messages; must not be null.</param>
/// <returns>The id of the regular expression, to be passed to unBindMsg.</returns>
/// <exception cref="ArgumentNullException"><paramref name="callback"/> is null.</exception>
public int bindMsg(String regexp, MessageHandler callback)
{
    // Reject a null callback before touching any table. The previous code failed with the
    // same exception type, but only after regexp_out had already been modified.
    if (callback == null)
        throw new ArgumentNullException("callback");

    // serial is static and shared by every Ivy instance: take the next id atomically.
    // Increment returns the new value, so subtract one to keep ids starting at 0.
    int key = Interlocked.Increment(ref serial) - 1;

    // creates a new binding (regexp,callback)
    lock (regexp_out.SyncRoot)
    {
        regexp_out.Add( key, regexp);
    }
    // Guard the callbacks table itself (it is not a synchronized Hashtable); locking the
    // callback argument, as before, protected nothing.
    lock (callbacks)
    {
        callbacks.Add( key, callback);
    }

    // notifies the other clients this new regexp
    lock ( clients.SyncRoot )
    {
        foreach (IvyClient c in clients.Values )
        {
            c.sendRegexp(key, regexp);
        }
    }
    return key;
}
/// <summary>
/// Unsubscribes a regular expression: tells every connected client to drop it, then
/// forgets the regular expression and its callback locally.
/// </summary>
/// <param name="id">The id of the regular expression, returned when it was bound.</param>
/// <exception cref="IvyException">No binding exists for <paramref name="id"/>.</exception>
public void unBindMsg(int id)
{
    if ((regexp_out[id] == null) || (callbacks[id] == null))
    {
        throw new IvyException("client wants to remove an unexistant regexp " + id);
    }
    lock( clients.SyncRoot )
    {
        foreach (IvyClient c in clients.Values )
        {
            c.delRegexp(id);
        }
    }
    // Actually remove the binding. Previously the entries were left in both tables, so an
    // unbound id stayed registered forever and could be "unbound" again without error.
    lock (regexp_out.SyncRoot)
    {
        regexp_out.Remove(id);
    }
    lock (callbacks)
    {
        callbacks.Remove(id);
    }
}
/// <summary>
/// Unsubscribes the first binding whose regular expression text is exactly
/// <paramref name="re"/>.
/// </summary>
/// <param name="re">The string for the regular expression.</param>
/// <returns>
/// True if the regexp existed and was unbound; false if it was not found or if an
/// IvyException occurred during unbinding.
/// </returns>
public bool unBindMsg(String re)
{
    bool found = false;
    int id = 0;

    // Enumerating a Hashtable is not thread safe even through a synchronized wrapper, so
    // hold SyncRoot while searching. Only the lookup happens under the lock; the unbinding
    // itself is done afterwards, once the enumeration is over.
    lock (regexp_out.SyncRoot)
    {
        foreach (DictionaryEntry entry in regexp_out)
        {
            // Regular expressions are compared character by character (ordinal), not
            // with culture-sensitive collation rules.
            if (String.Equals((String) entry.Value, re))
            {
                id = (int) entry.Key;
                found = true;
                break;
            }
        }
    }

    if (!found)
        return false;

    try
    {
        unBindMsg(id);
    }
    catch (IvyException)
    {
        return false;
    }
    return true;
}
/// <summary>
/// Raises directMessageReceived, through syncControl.Invoke when a synchronisation
/// control was supplied, directly otherwise. Does nothing when nobody is subscribed.
/// </summary>
/// <param name="app">The client passed to the handlers.</param>
/// <param name="id">The id passed to the handlers.</param>
/// <param name="arg">The string argument passed to the handlers.</param>
internal void FireDirectMessage (IvyClient app, int id, string arg )
{
    // Read the event once: a subscriber removed by another thread between the null check
    // and the call would otherwise cause a NullReferenceException.
    DirectMessageHandler handler = directMessageReceived;
    if ( handler == null )
        return;

    if ( syncControl != null )
        syncControl.Invoke( handler, new object[]{app,id,arg});
    else
        handler( app, id, arg);
}
/// <summary>
/// Raises clientConnected, through syncControl.Invoke when a synchronisation control was
/// supplied, directly otherwise. Does nothing when nobody is subscribed.
/// </summary>
/// <param name="app">The client passed to the handlers.</param>
internal void FireClientConnected (IvyClient app)
{
    // Read the event once: a subscriber removed by another thread between the null check
    // and the call would otherwise cause a NullReferenceException.
    ClientConnectedHandler handler = clientConnected;
    if ( handler == null )
        return;

    if ( syncControl != null )
        syncControl.Invoke( handler, new object[]{app});
    else
        handler( app);
}
/// <summary>
/// Raises clientDisconnected, through syncControl.Invoke when a synchronisation control
/// was supplied, directly otherwise. Does nothing when nobody is subscribed.
/// </summary>
/// <param name="app">The client passed to the handlers.</param>
internal void FireClientDisconnected (IvyClient app)
{
    // Read the event once: a subscriber removed by another thread between the null check
    // and the call would otherwise cause a NullReferenceException.
    ClientDisconnectedHandler handler = clientDisconnected;
    if ( handler == null )
        return;

    if ( syncControl != null )
        syncControl.Invoke( handler, new object[]{app});
    else
        handler( app);
}
/// <summary>
/// Raises dieReceived and reports the handler's answer.
/// </summary>
/// <param name="app">The client passed to the handlers.</param>
/// <param name="id">The id passed to the handlers.</param>
/// <returns>
/// The value returned by the dieReceived handler, or false when nobody is subscribed.
/// </returns>
internal bool FireDie (IvyClient app, int id )
{
    // Read the event once: a subscriber removed by another thread between the null check
    // and the call would otherwise cause a NullReferenceException.
    DieHandler handler = dieReceived;
    if ( handler == null )
        return false;

    if ( syncControl == null )
        return handler( app, id);

    // Control.Invoke hands back the delegate's return value as an object. It used to be
    // discarded, so this path always answered false whatever the handler decided.
    object answer = syncControl.Invoke( handler, new object[]{app,id});
    return (answer is bool) && (bool) answer;
}
/// <summary>
/// Removes a client from the table of connected clients, using its ClientKey.
/// </summary>
/// <param name="c">The client to remove.</param>
internal void removeClient(IvyClient c)
{
    object tableLock = clients.SyncRoot;
    lock( tableLock )
    {
        clients.Remove(c.ClientKey);
    }
}
/// <summary>
/// Gives a list of the IvyClient(s) whose ApplicationName equals the name given in
/// parameter.
/// </summary>
/// <param name="name">The name of the Ivy agent you're looking for.</param>
/// <returns>A new ArrayList holding the matching clients; empty when there is none.</returns>
public ArrayList getIvyClientsByName(String name)
{
    ArrayList v = new ArrayList();
    // Like every other walk over the client table in this class, enumerate under SyncRoot:
    // client threads add and remove entries concurrently, and an unprotected enumeration
    // fails if the table changes meanwhile.
    lock( clients.SyncRoot )
    {
        foreach (IvyClient ic in clients.Values )
        {
            if (((ic.ApplicationName).CompareTo(name)) == 0)
                v.Add(ic);
        }
    }
    return v;
}
/////////////////////////////////////////////////////////////////:
//
// Protected methods
//
/////////////////////////////////////////////////////////////////:
/// <summary>
/// Wraps a freshly connected socket in an IvyClient and registers it in the client table
/// under its ClientKey, then traces the list of client names. Ignored once the bus has
/// been stopped.
/// </summary>
/// <param name="socket">The connection to the peer.</param>
internal void addClient(MyTcpClient socket)
{
    if (!stopped)
    {
        IvyClient newcomer = new IvyClient(this,socket);
        lock ( clients.SyncRoot )
        {
            clients.Add( newcomer.ClientKey, newcomer);
        }
        traceDebug(ClientNames);
    }
}
/// <summary>
/// Runs the callback bound under <paramref name="key"/>, through syncControl.Invoke when a
/// synchronisation control was supplied, directly otherwise.
/// </summary>
/// <param name="client">The client passed to the callback.</param>
/// <param name="key">The binding id, as returned by bindMsg.</param>
/// <param name="tab">The string array passed to the callback.</param>
/// <exception cref="IvyException">No callback is registered under <paramref name="key"/>.</exception>
internal void callCallback(IvyClient client, Int32 key, String[] tab)
{
    MessageHandler bound = (MessageHandler) callbacks[key];
    if (bound == null)
    {
        throw new IvyException("(callCallback) Not regexp matching id " + key);
    }

    if ( syncControl == null )
    {
        bound(client, tab);
        return;
    }
    syncControl.Invoke( bound, new object[]{client,tab});
}
/// <summary>
/// Splits <paramref name="s"/> around every occurrence of <paramref name="separator"/>.
/// </summary>
/// <remarks>
/// Empty tokens are kept (for instance "a,b," with "," gives "a", "b" and ""). An empty
/// input gives an empty array. A null or empty separator cannot split anything, so the
/// whole input is returned as a single token.
/// </remarks>
/// <param name="s">The string to split; must not be null.</param>
/// <param name="separator">The separator, which may be longer than one character.</param>
/// <returns>The tokens, in order of appearance.</returns>
private static String[] myTokenize(String s, String separator)
{
    ArrayList tokens = new ArrayList();
    if (s.Length != 0)
    {
        if (separator == null || separator.Length == 0)
        {
            tokens.Add(s);
        }
        else
        {
            int last = 0;
            while (true)
            {
                int index = s.IndexOf(separator, last);
                if (index == -1)
                {
                    // No more separator: the rest of the string is the final token.
                    tokens.Add(s.Substring(last));
                    break;
                }
                tokens.Add(s.Substring(last, index - last));
                // Skip the whole separator. Advancing by one character only, as before,
                // left the tail of a multi-character separator inside the next token.
                last = index + separator.Length;
            }
        }
    }
    String[] tab = new String[tokens.Count];
    tokens.CopyTo(tab);
    return tab;
}
/// <summary>
/// Resolves the bus domain to use. The first non-null value wins, in this order: the
/// argument, the IVYBUS application setting, the IVYBUS environment variable, and
/// finally DEFAULT_DOMAIN.
/// </summary>
/// <param name="domainbus">An explicit domain, or null to fall back on the configuration.</param>
/// <returns>The resolved domain string; never null.</returns>
public static String getDomain(String domainbus)
{
    if (domainbus != null)
        return domainbus;

    String fromSettings = ConfigurationSettings.AppSettings["IVYBUS"];
    if (fromSettings != null)
        return fromSettings;

    String fromEnvironment = Environment.GetEnvironmentVariable("IVYBUS");
    if (fromEnvironment != null)
        return fromEnvironment;

    return DEFAULT_DOMAIN;
}
/// <summary>
/// Checks the "validity" of a regular expression against messages_classes.
/// </summary>
/// <remarks>
/// Every expression is accepted unless it starts with "^" and messages_classes is set; in
/// that case it is accepted only when the text after the leading character equals one of
/// the messages_classes entries.
/// </remarks>
/// <param name="exp">The regular expression to check; must not be null.</param>
/// <returns>True when the expression is accepted.</returns>
internal bool CheckRegexp(String exp)
{
    if (!exp.StartsWith("^") || messages_classes == null)
        return true;

    String withoutAnchor = exp.Substring(1);
    foreach (String messageClass in messages_classes)
    {
        if (messageClass.Equals(withoutAnchor))
            return true;
    }
    return false;
}
/// <summary>
/// Tells whether another registered client is the same peer as <paramref name="clnt"/>.
/// Meant to prevent two clients from connecting to each other at the same time; the
/// original author notes that a lingering bug may remain here.
/// </summary>
/// <param name="clnt">The client to check.</param>
/// <returns>
/// False when the client's AppPort is 0; otherwise true if some other entry of the client
/// table answers true to sameClient(clnt), false if none does.
/// </returns>
internal bool checkConnected(IvyClient clnt)
{
    if (clnt.AppPort != 0)
    {
        lock( clients.SyncRoot )
        {
            foreach (IvyClient registered in clients.Values )
            {
                bool isAnotherClient = clnt != registered;
                if (isAnotherClient && registered.sameClient(clnt))
                {
                    return true;
                }
            }
        }
    }
    return false;
}
/// <summary>
/// Main loop of the service socket thread: accepts incoming connexions on the listening
/// socket and registers each one with addClient, until the bus is stopped, the thread is
/// interrupted or the listening socket is closed.
/// </summary>
private void Run()
{
    traceDebug("Ivy service Thread started");
    bool running = true;
    while (running)
    {
        try
        {
            Socket sock = app.AcceptSocket();
            if (stopped)
            {
                // Early disconnexion: the bus was stopped while we were accepting.
                // Close the connexion instead of abandoning it still open.
                sock.Close();
                break;
            }
            MyTcpClient socket = new MyTcpClient(sock);
            addClient(socket); // the peer called me
        }
        catch (IOException e)
        {
            traceDebug("Error IvyServer exception: " + e.Message);
            Console.Out.WriteLine("Ivy server socket reader caught an exception " + e.Message);
        }
        catch (ThreadInterruptedException te)
        {
            Console.Error.WriteLine( te.Message );
            Console.Error.WriteLine( te.StackTrace);
            running = false;
        }
        catch (SocketException)
        {
            traceDebug("my server socket has been closed");
            running = false;
        }
        catch (InvalidOperationException)
        {
            // AcceptSocket on a listener that has already been stopped (stop() ran between
            // two iterations) is reported this way rather than as a SocketException.
            // Left unhandled, it would terminate the thread with an exception.
            traceDebug("my server socket has been closed");
            running = false;
        }
    }
    traceDebug("Ivy service Thread stopped");
}
/// <summary>
/// Writes a debug trace, prefixed with "-->ivy<-- ", to the standard output when the
/// debug flag is set; does nothing otherwise.
/// </summary>
/// <param name="s">The text to trace.</param>
private void traceDebug(String s)
{
    if (!debug)
        return;
    Console.Out.WriteLine("-->ivy<-- " + s);
}
/// <summary>
/// A rendezvous domain: an address string paired with a port number. The values are fixed
/// at construction; ToString gives "address:port".
/// </summary>
internal class Domain
{
    private String address;
    private int portNumber;

    /// <summary>Creates a domain from its address part and its port.</summary>
    /// <param name="domainaddr">The address part of the domain.</param>
    /// <param name="port">The port of the domain.</param>
    public Domain( String domainaddr, int port)
    {
        address = domainaddr;
        portNumber = port;
    }

    /// <summary>Gets the address part given at construction.</summary>
    public virtual String Domainaddr
    {
        get { return address; }
    }

    /// <summary>Gets the port given at construction.</summary>
    public virtual int Port
    {
        get { return portNumber; }
    }

    /// <summary>Formats the domain as the address, a colon, then the port.</summary>
    public override String ToString()
    {
        return String.Concat(address, ":", portNumber.ToString());
    }
}
/// <summary>
/// Unitary test: creates a bus named "Test Unitaire", starts it on the default domain,
/// waits two seconds and stops it. An IvyException is reported on the standard error.
/// </summary>
/// <param name="args">Command line arguments; not used.</param>
[STAThread]
public static void Main(String[] args)
{
    Ivy bus = new Ivy("Test Unitaire", "TU ready");
    try
    {
        bus.start(null);
        try
        {
            Thread.Sleep(TimeSpan.FromSeconds(2));
        }
        catch (ThreadInterruptedException)
        {
            // An interrupted wait only shortens the pause: carry on and stop the bus.
        }
        bus.stop();
    }
    catch (IvyException failure)
    {
        Console.Error.WriteLine( failure.Message );
        Console.Error.WriteLine( failure.StackTrace );
    }
}
}
}