// François-Régis Colin
// http://www.tls.cena.fr/products/ivy/
//
// (C) CENA
//
namespace IvyBus
{
	using System;
	using System.IO;
	using System.Collections;
	using System.Net;
	using System.Net.Sockets;
	using System.Threading;
	using System.Configuration;
	using System.Windows.Forms;
	using System.Globalization;

	/// <summary>The main bus class.</summary>
	/// <remarks>
	/// An instance owns the TCP service socket, one IvyWatcher per rendezvous
	/// domain, the table of connected IvyClient objects and the table of
	/// regular expressions this application has subscribed to.
	/// </remarks>
	public class Ivy
	{
		/* Event handlers */

		/// <summary>Signature of the handler called when a direct message is received.</summary>
		public delegate void DirectMessageHandler(IvyClient app, int id, string arg);

		/// <summary>Signature of the handler called when a client connects.</summary>
		public delegate void ClientConnectedHandler(IvyClient app);

		/// <summary>Signature of the handler called when a client disconnects.</summary>
		public delegate void ClientDisconnectedHandler(IvyClient app);

		/// <summary>Signature of the handler called when a die request is received.</summary>
		public delegate bool DieHandler(IvyClient app, int id);

		/// <summary>Signature of the callback bound to a regular expression.</summary>
		public delegate void MessageHandler(IvyClient app, string[] args);

		/* Events */

		/// <summary>Fires when a new client connects to the bus.</summary>
		public event ClientConnectedHandler clientConnected;

		/// <summary>Fires when a client disconnects from the bus.</summary>
		public event ClientDisconnectedHandler clientDisconnected;

		/// <summary>Fires when a client receives a direct message from another client.</summary>
		public event DirectMessageHandler directMessageReceived;

		/// <summary>Fires when somebody asks for killing every client on the bus.</summary>
		public event DieHandler dieReceived;

		/// <summary>
		/// The table of connected clients. This is the live synchronized table:
		/// lock its SyncRoot before enumerating it.
		/// </summary>
		public Hashtable IvyClients
		{
			get { return clients; }
		}

		/// <summary>Enables or disables the debug traces written to the console.</summary>
		public bool Debug
		{
			get { return debug; }
			set { debug = value; }
		}

		/// <summary>Builds a human readable list of the names of the connected clients.</summary>
		private String ClientNames
		{
			get
			{
				String s = appName + " clients are: ";
				lock (clients.SyncRoot)
				{
					foreach (IvyClient client in clients.Values)
					{
						s += client.ApplicationName + " ";
					}
				}
				return s;
			}
		}

		/// <summary>The application name on the bus.</summary>
		public String AppName
		{
			get { return appName; }
		}

		/// <summary>
		/// The message classes used by <see cref="CheckRegexp"/> to filter
		/// anchored regular expressions; null disables the filtering.
		/// </summary>
		public String[] Classes
		{
			get { return messages_classes; }
			set { messages_classes = value; }
		}

		/// <summary>the name of the application on the bus</summary>
		internal String appName;

		/// <summary>the protocol version number</summary>
		internal const int PROCOCOLVERSION = 3;

		/// <summary>the port for the UDP rendez vous, if none is supplied</summary>
		internal const int DEFAULT_PORT = 2010;

		/// <summary>the domain for the UDP rendez vous</summary>
		internal static readonly String DEFAULT_DOMAIN = "127.255.255.255:" + DEFAULT_PORT;

		internal const String libVersion = "1.0.0";

		private bool debug;
		private static int serial = 0; /* an unique ID for each regexp */
		private TcpListener app;
		private ArrayList watchers;
		private volatile Thread serverThread; // to ensure quick communication of the end
		private Hashtable callbacks;
		private Hashtable clients_data;
		private Hashtable clients;
		private String[] messages_classes = null;
		// volatile: written by stop() and read by the server thread in Run()
		private volatile bool stopped = false;
		internal int applicationPort; /* Application port number */
		internal Hashtable regexp_out;
		internal String ready_message = null;

		// for synchronous event
		private Control syncControl = null;

		/// <summary>Readies the structures for the software bus connexion.</summary>
		/// <example>
		/// This sample shows how to start working with Ivy.
		/// <code>
		/// Ivy bus = new Ivy("Dummy agent","ready");
		/// bus.bindMsg("(.*)",new Ivy.MessageHandler(myMessageListener));
		/// bus.start(null);
		/// </code>
		/// How to send and receive:
		/// the Ivy agent A performs b.bindMsg("^Hello (.*)",cb);
		/// the Ivy agent B performs b2.sendMsg("Hello world");
		/// a thread in A will run the callback cb with its second argument set
		/// to an array of String, with one single element, "world"
		/// </example>
		/// <remarks>
		/// The real work begins in the start() method. The IVY_DEBUG application
		/// setting, when present and parsable as a boolean, initializes Debug.
		/// </remarks>
		/// <param name="name">The name of your Ivy agent on the software bus</param>
		/// <param name="message">The hello message you will send once ready</param>
		public Ivy(String name, String message)
		{
			callbacks = Hashtable.Synchronized(new Hashtable());
			clients = Hashtable.Synchronized(new Hashtable());
			clients_data = Hashtable.Synchronized(new Hashtable());
			regexp_out = Hashtable.Synchronized(new Hashtable());
			appName = name;
			ready_message = message;
			String debug_str = ConfigurationSettings.AppSettings["IVY_DEBUG"];
			if (debug_str != null)
			{
				try
				{
					debug = bool.Parse(debug_str);
				}
				catch (FormatException)
				{
					// a malformed debugging flag must not prevent the agent from being created
					debug = false;
				}
			}
		}

		/// <summary>
		/// Readies the structures for the software bus connexion and registers
		/// a control used to marshal the callbacks and events.
		/// </summary>
		/// <param name="name">The name of your Ivy agent on the software bus</param>
		/// <param name="message">The hello message you will send once ready</param>
		/// <param name="sync">
		/// When not null, events and message callbacks are raised through
		/// <c>sync.Invoke</c> instead of being called directly.
		/// </param>
		public Ivy(String name, String message, Control sync) : this(name, message)
		{
			syncControl = sync;
		}

		/// <summary>Connects the Ivy bus to a domain or list of domains.</summary>
		/// <remarks>
		/// One thread (IvyWatcher) for each traffic rendezvous (either UDP broadcast or TCP Multicast).
		/// One thread (serverThread/Ivy) to accept incoming connexions on server socket.
		/// A thread for each IvyClient when the connexion has been done.
		/// </remarks>
		/// <param name="domainbus">
		/// a domain of the form 10.0.0:1234, it is similar to the
		/// netmask without the trailing .255. This will determine the meeting point
		/// of the different applications. Right now, this is done with an UDP
		/// broadcast. Beware of routing problems ! You can also use a comma
		/// separated list of domains. When null, the IVYBUS environment variable
		/// is used, then the default domain.
		/// </param>
		/// <exception cref="IvyException">the TCP service socket cannot be opened</exception>
		public void start(String domainbus)
		{
			if (domainbus == null)
				domainbus = Environment.GetEnvironmentVariable("IVYBUS");
			if (domainbus == null)
				domainbus = DEFAULT_DOMAIN;
			try
			{
				// port 0 lets the system pick a free port
				app = new TcpListener(IPAddress.Any, 0);
				app.Start();
				applicationPort = ((IPEndPoint) app.LocalEndpoint).Port;
			}
			catch (SocketException e)
			{
				// TcpListener reports its failures as SocketException
				throw new IvyException("can't open TCP service socket " + e);
			}
			catch (IOException e)
			{
				throw new IvyException("can't open TCP service socket " + e);
			}
			traceDebug("lib: " + libVersion + " protocol: " + PROCOCOLVERSION + " TCP service open on port " + applicationPort);
			watchers = new ArrayList();

			Domain[] d = parseDomains(domainbus);
			// readies the rendezvous : an IvyWatcher (thread) per domain bus
			for (int index = 0; index < d.Length; index++)
			{
				IvyWatcher watcher = new IvyWatcher(this, d[index].Domainaddr, d[index].Port);
				watchers.Add(watcher);
			}
			serverThread = new Thread(new ThreadStart(this.Run));
			serverThread.Name = "Ivy Tcp Server Thread";
			serverThread.Start();
			// sends the broadcasts and listen to incoming connexions
			for (int i = 0; i < watchers.Count; i++)
			{
				((IvyWatcher) watchers[i]).start();
			}
		}

		/// <summary>A small method for debugging purposes: describes the parsed domains.</summary>
		/// <param name="toparse">a comma separated list of domains</param>
		/// <returns>"broadcasting on " followed by each address:port pair</returns>
		public String domains(String toparse)
		{
			String s = "broadcasting on ";
			Ivy.Domain[] d = parseDomains(toparse);
			for (int index = 0; index < d.Length; index++)
			{
				s += d[index].Domainaddr + ":" + d[index].Port + " ";
			}
			return s;
		}

		/// <summary>Splits a comma separated list of domains into Domain objects.</summary>
		/// <param name="domainbus">a comma separated list of domains</param>
		/// <returns>one Domain per element of the list</returns>
		internal Domain[] parseDomains(String domainbus)
		{
			String[] st = domainbus.Split(',');
			Domain[] d = new Domain[st.Length];
			for (int i = 0; i < st.Length; i++)
			{
				d[i] = new Domain(IvyWatcher.getDomain(st[i]), IvyWatcher.getPort(st[i]));
			}
			return d;
		}

		/// <summary>Disconnects from the Ivy bus.</summary>
		/// <remarks>Only the first call does the work; later calls return immediately.</remarks>
		public void stop()
		{
			lock (this)
			{
				if (stopped)
					return;
				stopped = true;
				traceDebug("beginning stopping the bus");
				try
				{
					// closing the listener makes the accept call of the server thread fail;
					// the listener may be null if start() was never called
					if (app != null)
						app.Stop();
					// stopping the serverThread
					Thread server = serverThread;
					if (server != null)
					{
						// never join ourselves: that would block forever
						if (server != Thread.CurrentThread)
							server.Join();
						serverThread = null;
					}
					// stopping the IvyWatchers
					if (watchers != null)
					{
						for (int i = 0; i < watchers.Count; i++)
						{
							((IvyWatcher) watchers[i]).stop();
						}
					}
					// stopping the remaining IvyClients
					// copy the values in a temporary array so that threads removing
					// themselves do not modify the collection being walked; the array
					// is sized inside the lock so that it always matches the content
					IvyClient[] copyClient;
					lock (clients.SyncRoot)
					{
						copyClient = new IvyClient[clients.Count];
						clients.Values.CopyTo(copyClient, 0);
					}
					foreach (IvyClient client in copyClient)
					{
						client.close(true); // will notify the reading thread
						// removeClient(client); already done in the thread
					}
				}
				catch (IOException)
				{
					traceDebug("IOexception Stop ");
				}
				catch (SocketException)
				{
					traceDebug("SocketException Stop ");
				}
				traceDebug("the bus should have stopped so far");
			}
		}

		/// <summary>Send a message to someone on the bus.</summary>
		/// <remarks>
		/// Performs a pattern matching according to everyone's regexps, and sends
		/// the results to the relevant ivy agents.
		/// There is one thread for each client connected, we could also
		/// create another thread each time we send a message.
		/// </remarks>
		/// <param name="message">
		/// A message which will be compared to the regular
		/// expressions of the different clients
		/// </param>
		/// <returns>the number of messages actually sent</returns>
		public int sendMsg(String message)
		{
			int count = 0;
			// an alternate implementation would use one sender thread per client
			// instead of one for all the clients. It might be a performance issue
			lock (clients.SyncRoot)
			{
				foreach (IvyClient client in clients.Values)
				{
					count += client.sendMsg(message);
				}
			}
			return count;
		}

		/// <summary>Send a formatted message to someone on the bus.</summary>
		/// <param name="format">A string message format to build the message</param>
		/// <param name="args">args used in message format</param>
		/// <returns>the number of messages actually sent</returns>
		public int sendMsg(string format, params object[] args)
		{
			return sendMsg(string.Format(format, args));
		}

		/// <summary>Send a formatted message to someone on the bus, using the given culture.</summary>
		/// <param name="culture">the culture used to format the arguments</param>
		/// <param name="format">A string message format to build the message</param>
		/// <param name="args">args used in message format</param>
		/// <returns>the number of messages actually sent</returns>
		public int sendMsg(CultureInfo culture, string format, params object[] args)
		{
			return sendMsg(string.Format(culture, format, args));
		}

		/// <summary>Subscribes to a regular expression.</summary>
		/// <remarks>
		/// The callback will be executed with the saved parameters of the regexp
		/// as arguments when a matching message is sent by another agent.
		/// </remarks>
		/// <param name="regexp">a regular expression, groups are done with parenthesis</param>
		/// <param name="callback">the handler to call</param>
		/// <returns>the id of the regular expression</returns>
		public int bindMsg(String regexp, MessageHandler callback)
		{
			return bindMsg(regexp, callback, null);
		}

		/// <summary>Subscribes to a regular expression and attaches user data to the binding.</summary>
		/// <param name="regexp">a regular expression, groups are done with parenthesis</param>
		/// <param name="callback">the handler to call</param>
		/// <param name="data">an object stored along with the binding</param>
		/// <returns>the id of the regular expression</returns>
		public int bindMsg(String regexp, MessageHandler callback, object data)
		{
			// creates a new binding (regexp,callback)
			// atomic so that two threads binding at the same time never share an id;
			// "- 1" keeps the ids starting at 0
			Int32 key = Interlocked.Increment(ref serial) - 1;
			// TODO Make a unique Object grouping
			regexp_out.Add(key, regexp);
			callbacks.Add(key, callback);
			clients_data.Add(key, data);
			// notifies the other clients this new regexp
			lock (clients.SyncRoot)
			{
				foreach (IvyClient c in clients.Values)
				{
					c.sendRegexp(key, regexp);
				}
			}
			return key;
		}

		/// <summary>Unsubscribes a regular expression.</summary>
		/// <param name="id">the id of the regular expression, returned when it was bound</param>
		/// <exception cref="IvyException">no binding exists for this id</exception>
		public void unBindMsg(int id)
		{
			if ((regexp_out[id] == null) || (callbacks[id] == null))
			{
				throw new IvyException("client wants to remove an unexistant regexp " + id);
			}
			// forget the binding locally so that the tables do not keep it
			regexp_out.Remove(id);
			callbacks.Remove(id);
			clients_data.Remove(id);
			lock (clients.SyncRoot)
			{
				foreach (IvyClient c in clients.Values)
				{
					c.delRegexp(id);
				}
			}
		}

		/// <summary>Unsubscribes a regular expression.</summary>
		/// <param name="re">the string for the regular expression</param>
		/// <returns>
		/// a boolean, true if the regexp existed, false otherwise or
		/// whenever an exception occurred during unbinding
		/// </returns>
		public bool unBindMsg(String re)
		{
			object found = null;
			// the table is only enumerated under its lock; the unbinding itself is
			// done afterwards so that the table is not modified while walked
			lock (regexp_out.SyncRoot)
			{
				foreach (DictionaryEntry entry in regexp_out)
				{
					if (String.Equals((String) entry.Value, re))
					{
						found = entry.Key;
						break;
					}
				}
			}
			if (found == null)
				return false;
			try
			{
				unBindMsg((int) found);
			}
			catch (IvyException)
			{
				return false;
			}
			return true;
		}

		/// <summary>Raises directMessageReceived, through the sync control when one is set.</summary>
		internal void FireDirectMessage(IvyClient app, int id, string arg)
		{
			// local copy: the event may be unsubscribed between the test and the call
			DirectMessageHandler handler = directMessageReceived;
			if (handler == null)
				return;
			if (syncControl != null)
				syncControl.Invoke(handler, new object[] { app, id, arg });
			else
				handler(app, id, arg);
		}

		/// <summary>Raises clientConnected, through the sync control when one is set.</summary>
		internal void FireClientConnected(IvyClient app)
		{
			ClientConnectedHandler handler = clientConnected;
			if (handler == null)
				return;
			if (syncControl != null)
				syncControl.Invoke(handler, new object[] { app });
			else
				handler(app);
		}

		/// <summary>Raises clientDisconnected, through the sync control when one is set.</summary>
		internal void FireClientDisconnected(IvyClient app)
		{
			ClientDisconnectedHandler handler = clientDisconnected;
			if (handler == null)
				return;
			if (syncControl != null)
				syncControl.Invoke(handler, new object[] { app });
			else
				handler(app);
		}

		/// <summary>Raises dieReceived, through the sync control when one is set.</summary>
		/// <returns>the value returned by the handler, false when there is no handler</returns>
		internal bool FireDie(IvyClient app, int id)
		{
			DieHandler handler = dieReceived;
			if (handler == null)
				return false;
			if (syncControl != null)
			{
				// Invoke hands back the boxed return value of the delegate
				object result = syncControl.Invoke(handler, new object[] { app, id });
				return (result is bool) && (bool) result;
			}
			return handler(app, id);
		}

		/// <summary>Removes a client from the list.</summary>
		internal void removeClient(IvyClient c)
		{
			lock (clients.SyncRoot)
			{
				clients.Remove(c.ClientKey);
			}
		}

		/// <summary>Gives a list of IvyClient(s) with the name given in parameter.</summary>
		/// <param name="name">The name of the Ivy agent you're looking for</param>
		/// <returns>the matching clients, an empty list when there is none</returns>
		public ArrayList getIvyClientsByName(String name)
		{
			ArrayList v = new ArrayList();
			lock (clients.SyncRoot)
			{
				foreach (IvyClient ic in clients.Values)
				{
					if (String.Equals(ic.ApplicationName, name))
						v.Add(ic);
				}
			}
			return v;
		}

		/////////////////////////////////////////////////////////////////
		//
		// Protected methods
		//
		/////////////////////////////////////////////////////////////////

		/// <summary>Wraps a connected socket into an IvyClient and registers it.</summary>
		/// <remarks>Does nothing once the bus has been stopped.</remarks>
		internal void addClient(MyTcpClient socket)
		{
			if (stopped)
				return;
			IvyClient client = new IvyClient(this, socket);
			lock (clients.SyncRoot)
			{
				clients.Add(client.ClientKey, client);
			}
			traceDebug(ClientNames);
		}

		/// <summary>Runs the callback bound to a regular expression id.</summary>
		/// <param name="client">the client the message comes from</param>
		/// <param name="key">the id of the binding</param>
		/// <param name="tab">the arguments handed to the callback</param>
		/// <exception cref="IvyException">no callback is bound to this id</exception>
		internal void callCallback(IvyClient client, Int32 key, String[] tab)
		{
			MessageHandler callback = (MessageHandler) callbacks[key];
			if (callback == null)
			{
				throw new IvyException("(callCallback) Not regexp matching id " + key);
			}
			if (syncControl != null)
				syncControl.Invoke(callback, new object[] { client, tab });
			else
				callback(client, tab);
		}

		/// <summary>Splits a string on a separator.</summary>
		/// <param name="s">the string to split</param>
		/// <param name="separator">the separator</param>
		/// <returns>the tokens, an empty array when s is empty</returns>
		private static String[] myTokenize(String s, String separator)
		{
			int index = 0, last = 0, length = s.Length;
			// skip the whole separator, not only its first character;
			// an empty separator still advances by one to guarantee progress
			int step = separator.Length > 0 ? separator.Length : 1;
			ArrayList v = new ArrayList();
			if (length != 0)
			{
				while (true)
				{
					index = s.IndexOf(separator, last);
					if (index == -1)
					{
						v.Add(s.Substring(last, length - last));
						break;
					}
					else if (index < length)
					{
						v.Add(s.Substring(last, index - last));
						last = index + step;
					}
					else
					{
						break;
					}
				}
			}
			String[] tab = new String[v.Count];
			v.CopyTo(tab);
			return tab;
		}

		/// <summary>Resolves the bus domain to use.</summary>
		/// <param name="domainbus">the requested domain, may be null</param>
		/// <returns>
		/// the parameter when not null, else the IVYBUS application setting,
		/// else the IVYBUS environment variable, else the default domain
		/// </returns>
		public static String getDomain(String domainbus)
		{
			if (domainbus == null)
			{
				domainbus = ConfigurationSettings.AppSettings["IVYBUS"];
			}
			if (domainbus == null)
			{
				domainbus = Environment.GetEnvironmentVariable("IVYBUS");
			}
			if (domainbus == null)
				domainbus = DEFAULT_DOMAIN;
			return domainbus;
		}

		/// <summary>Checks the "validity" of a regular expression.</summary>
		/// <remarks>
		/// Expressions that do not start with "^" are always accepted, and so is
		/// everything when no message class is set. Otherwise the text following
		/// the "^" must be equal to one of the message classes.
		/// </remarks>
		internal bool CheckRegexp(String exp)
		{
			if (!exp.StartsWith("^") || messages_classes == null)
				return true;
			String body = exp.Substring(1);
			for (int i = 0; i < messages_classes.Length; i++)
			{
				if (messages_classes[i].Equals(body))
					return true;
			}
			return false;
		}

		/*
		 * prevents two clients from connecting to each other at the same time
		 * there might still be a lingering bug here, that we could avoid with the
		 * SchizoToken.
		 */
		/// <summary>Tells whether another registered client is the same peer as clnt.</summary>
		internal bool checkConnected(IvyClient clnt)
		{
			if (clnt.AppPort == 0)
				return false;
			lock (clients.SyncRoot)
			{
				foreach (IvyClient client in clients.Values)
				{
					if (clnt != client && client.sameClient(clnt))
						return true;
				}
			}
			return false;
		}

		/// <summary>The service socket thread reader main loop.</summary>
		private void Run()
		{
			traceDebug("Ivy service Thread started");
			bool running = true;
			while (running)
			{
				try
				{
					Socket sock = app.AcceptSocket();
					if (stopped)
					{
						// early disconnexion: do not leak the connexion just accepted
						sock.Close();
						break;
					}
					addClient(new MyTcpClient(sock)); // the peer called me
				}
				catch (IOException e)
				{
					traceDebug("Error IvyServer exception: " + e.Message);
					Console.Out.WriteLine("Ivy server socket reader caught an exception " + e.Message);
				}
				catch (SocketException)
				{
					traceDebug("my server socket has been closed");
					running = false;
				}
				catch (InvalidOperationException)
				{
					// the listener was stopped (or disposed) before the accept call
					traceDebug("my server socket has been closed");
					running = false;
				}
			}
			traceDebug("Ivy service Thread stopped");
		}

		/// <summary>Writes a trace on the console when Debug is set.</summary>
		private void traceDebug(String s)
		{
			if (debug)
				Console.Out.WriteLine("-->ivy<-- " + s);
		}

		/// <summary>An address and port pair describing one rendezvous domain.</summary>
		internal class Domain
		{
			public virtual String Domainaddr
			{
				get { return domainaddr; }
			}

			public virtual int Port
			{
				get { return port; }
			}

			private String domainaddr;
			private int port;

			public Domain(String domainaddr, int port)
			{
				this.domainaddr = domainaddr;
				this.port = port;
			}

			/// <returns>the domain formatted as address:port</returns>
			public override String ToString()
			{
				return domainaddr + ":" + port;
			}
		}

		/// <summary>Unitary test: starts the bus, waits about 2 seconds, then stops it.</summary>
		[STAThread]
		public static void Main(String[] args)
		{
			Ivy bus = new Ivy("Test Unitaire", "TU ready");
			try
			{
				bus.start(null);
				try
				{
					// 10000 ticks per millisecond * 2000 ms
					Thread.Sleep(new TimeSpan(10000 * 2000));
				}
				catch (ThreadInterruptedException)
				{
				}
				bus.stop();
			}
			catch (IvyException ie)
			{
				Console.Error.WriteLine(ie.Message);
				Console.Error.WriteLine(ie.StackTrace);
			}
		}
	}
}