/// a software bus package /// * /// /// François-Régis Colin /// /// http://www.tls.cena.fr/products/ivy/ /// * ///
/// Ivy bus = new Ivy("Dummy agent","ready");
/// bus.bindMsg("(.*)",new IvyMessageHandler(myMessageListener));
/// bus.start(null);
/// 
/// ///
namespace IvyBus { using System; using System.IO; using System.Collections; using System.Net; using System.Net.Sockets; using System.Threading; using System.Configuration; public class Ivy { /* Event handler */ public delegate void DirectMessageHandler (IvyClient app, int id, string arg ); public delegate void ClientConnectedHandler (IvyClient app); public delegate void ClientDisconnectedHandler (IvyClient app); public delegate bool DieHandler (IvyClient app, int id ); public delegate void MessageHandler (IvyClient app, string[] args ); /* Event */ public event ClientConnectedHandler clientConnected; public event ClientDisconnectedHandler clientDisconnected; public event DirectMessageHandler directMessageReceived; public event DieHandler dieReceived; public Hashtable IvyClients { get { return clients; } } public bool Debug { get { return debug; } } private String ClientNames { get { String s = appName + " clients are: "; lock( clients.SyncRoot ) { foreach (IvyClient client in clients.Values ) { s += client.ApplicationName + " "; } } return s; } } public String AppName { get { return appName; } } /// the name of the application on the bus /// internal String appName; /// the protocol version number /// public const int PROCOCOLVERSION = 3; /// the port for the UDP rendez vous, if none is supplied /// public const int DEFAULT_PORT = 2010; /// the domain for the UDP rendez vous /// public static readonly String DEFAULT_DOMAIN = "127.255.255.255:" + DEFAULT_PORT; /// the library version, useful for development purposes only, when java is /// invoked with -DIVY_DEBUG /// public const String libVersion = "1.0.0"; private bool debug; private static int serial = 0; /* an unique ID for each regexp */ private TcpListener app; private ArrayList watchers; private volatile Thread serverThread; // to ensure quick communication of the end private Hashtable callbacks; private Hashtable clients; private String[] messages_classes = null; private bool stopped = false; internal int applicationPort; /* Application port number */ internal Hashtable regexp_out; internal String ready_message = null; public const int TIMEOUTLENGTH = 3000; /// Readies the structures for the software bus connexion. /// * /// All the dirty work is done un the start() method /// /// /// The name of your Ivy agent on the software bus /// /// The hellow message you will send once ready /// /// A callback handling the notification of connexions and /// disconnections, may be null /// /// public Ivy(String name, String message) { callbacks = new Hashtable(); clients = Hashtable.Synchronized( new Hashtable() ); regexp_out = new Hashtable(); appName = name; ready_message = message; String debug_str = ConfigurationSettings.AppSettings["IVY_DEBUG"]; if ( debug_str != null ) debug = bool.Parse(debug_str); } /// connects the Ivy bus to a domain or list of domains. /// * ///
  • One thread (IvyWatcher) for each traffic rendezvous (either UDP broadcast or TCPMulticast) ///
  • One thread (serverThread/Ivy) to accept incoming connexions on server socket ///
  • a thread for each IvyClient when the connexion has been done /// * ///
  • /// a domain of the form 10.0.0:1234, it is similar to the /// netmask without the trailing .255. This will determine the meeting point /// of the different applications. Right now, this is done with an UDP /// broadcast. Beware of routing problems ! You can also use a comma /// separated list of domains. /// * /// /// public void start(String domainbus) { if (domainbus == null) domainbus = Environment.GetEnvironmentVariable("IVYBUS"); if (domainbus == null) domainbus = DEFAULT_DOMAIN; try { app = new TcpListener(0); app.Start(); applicationPort = ((IPEndPoint) app.LocalEndpoint).Port; } catch (IOException e) { throw new IvyException("can't open TCP service socket " + e); } traceDebug("lib: " + libVersion + " protocol: " + PROCOCOLVERSION + " TCP service open on port " + applicationPort); watchers = new ArrayList(); Domain[] d = parseDomains(domainbus); // readies the rendezvous : an IvyWatcher (thread) per domain bus for (int index = 0; index < d.Length; index++) { IvyWatcher watcher = new IvyWatcher(this, d[index].Domainaddr, d[index].Port); watchers.Add(watcher); } serverThread = new Thread(new ThreadStart(this.Run)); serverThread.Start(); // sends the broadcasts and listen to incoming connexions for (int i = 0; i < watchers.Count; i++) { ((IvyWatcher) watchers[i]).start(); } } internal Domain[] parseDomains(String domainbus) { String[] st = domainbus.Split(','); Domain[] d = new Domain[st.Length]; for (int i = 0; i < st.Length; i++) { d[i] = new Domain(IvyWatcher.getDomain(st[i]), IvyWatcher.getPort(st[i])); } return d; } /// disconnects from the Ivy bus /// public void stop() { lock(this) { if (stopped) return ; stopped = true; traceDebug("beginning stopping the bus"); try { // stopping the serverThread Thread t = serverThread; serverThread = null; if (t != null) t.Interrupt(); // The serverThread might be stopped even before having been created if (app != null) app.Stop(); // stopping the IvyWatchers if ( watchers != null ) for (int i = 0; i < watchers.Count; i++) { ((IvyWatcher) watchers[i]).stop(); } // stopping the remaining IvyClients lock(clients.SyncRoot) { foreach (IvyClient c in clients.Values ) { c.close(true); // will notify the reading thread //removeClient(c); already donne in the thread } } } catch (IOException e) { traceDebug("IOexception Stop "); } traceDebug("the bus should have stopped so far"); } } /// Performs a pattern matching according to everyone's regexps, and sends /// the results to the relevant ivy agents. ///

    There is one thread for each client connected, we could also /// create another thread each time we send a message. ///

    /// A String which will be compared to the regular /// expressions of the different clients /// /// the number of messages actually sent /// /// public int sendMsg(String message) { int count = 0; // an alternate implementation would one sender thread per client // instead of one for all the clients. It might be a performance issue lock ( clients.SyncRoot ) { foreach (IvyClient client in clients.Values ) { count += client.sendMsg(message); } } return count; } /// Subscribes to a regular expression. /// * /// The callback will be executed with /// the saved parameters of the regexp as arguments when a message will sent /// by another agent. A program doesn't receive its own messages. ///

    Example: ///
    the Ivy agent A performs

    b.bindMsg("^Hello (*)",cb);
    ///
    the Ivy agent B performs
    b2.sendMsg("Hello world");
    ///
    a thread in A will uun the callback cb with its second argument set /// to a array of String, with one single element, "world" ///
    /// a perl regular expression, groups are done with parenthesis /// /// any objects implementing the IvyMessageListener /// interface, on the AWT/Swing framework /// /// the id of the regular expression /// /// public int bindMsg(String regexp, MessageHandler callback) { // creates a new binding (regexp,callback) Int32 key = serial++; lock (regexp_out) regexp_out.Add( key, regexp); lock (callback) callbacks.Add( key, callback); // notifies the other clients this new regexp lock ( clients.SyncRoot ) { foreach (IvyClient c in clients.Values ) { c.sendRegexp(key, regexp); } } return key; } /// unsubscribes a regular expression /// * /// /// the id of the regular expression, returned when it was bound /// /// public void unBindMsg(int id) { if ((regexp_out[id] == null) || (callbacks[id] == null)) { throw new IvyException("client wants to remove an unexistant regexp " + id); } lock( clients.SyncRoot ) { foreach (IvyClient c in clients.Values ) { c.delRegexp(id); } } } /// unsubscribes a regular expression /// * /// /// a boolean, true if the regexp existed, false otherwise or /// whenever an exception occured during unbinding /// /// the string for the regular expression /// /// public bool unBindMsg(String re) { foreach (Int32 k in regexp_out.Keys ) { if (((String) regexp_out[k]).CompareTo(re) == 0) { try { unBindMsg(k); } catch (IvyException ie) { return false; } return true; } } return false; } internal void FireDirectMessage (IvyClient app, int id, string arg ) { if ( directMessageReceived != null ) directMessageReceived( app, id, arg); } internal void FireClientConnected (IvyClient app) { if ( clientConnected != null ) clientConnected( app); } internal void FireClientDisconnected (IvyClient app) { if ( clientDisconnected != null ) clientDisconnected( app); } internal bool FireDie (IvyClient app, int id ) { if ( dieReceived != null ) return dieReceived( app, id); else return false; } /* * removes a client from the list */ internal void removeClient(IvyClient c) { lock( clients.SyncRoot ) { clients.Remove(c.ClientKey); } } /// gives the names of IvyClient(s) /// /// gives a list of IvyClient(s) with the name given in parameter /// * /// /// The name of the Ivy agent you're looking for /// /// public ArrayList getIvyClientsByName(String name) { ArrayList v = new ArrayList(); foreach (IvyClient ic in clients.Values ) { if (((ic.ApplicationName).CompareTo(name)) == 0) v.Add(ic); } return v; } /////////////////////////////////////////////////////////////////: // // Protected methods // /////////////////////////////////////////////////////////////////: internal void addClient(MyTcpClient socket) { lock(this) { if (stopped) return ; IvyClient client = new IvyClient(this,socket); lock ( clients.SyncRoot ) { clients.Add( client.ClientKey, client); } traceDebug(ClientNames); } } internal void callCallback(IvyClient client, Int32 key, String[] tab) { MessageHandler callback = (MessageHandler) callbacks[key]; if (callback == null) { throw new IvyException("(callCallback) Not regexp matching id " + key); } callback(client, tab); } private static String[] myTokenize(String s, String separator) { int index = 0, last = 0, length = s.Length; ArrayList v = new ArrayList(); if (length != 0) while (true) { index = s.IndexOf(separator, last); if (index == - 1) { v.Add(s.Substring(last, (length) - (last))); break; } else if (index < s.Length) { v.Add(s.Substring(last, (index) - (last))); last = index + 1; } else { break; } } String[] tab = new String[v.Count]; v.CopyTo(tab); return tab; } public static String getDomain(String domainbus) { if (domainbus == null) { domainbus = ConfigurationSettings.AppSettings["IVYBUS"]; } if (domainbus == null) { domainbus = Environment.GetEnvironmentVariable("IVYBUS"); } if (domainbus == null) domainbus = DEFAULT_DOMAIN; return domainbus; } /// checks the "validity" of a regular expression. /// internal bool CheckRegexp(String exp) { bool regexp_ok = true; if (exp.StartsWith("^") && messages_classes != null) { regexp_ok = false; for (int i = 0; i < messages_classes.Length; i++) { if (messages_classes[i].Equals(exp.Substring(1))) return true; } } return regexp_ok; } /* * prevents two clients from connecting to each other at the same time * there might still be a lingering bug here, that we could avoid with the * SchizoToken. */ internal bool checkConnected(IvyClient clnt) { if (clnt.AppPort == 0) return false; lock( clients.SyncRoot ) { foreach (IvyClient client in clients.Values ) { if (clnt != client && client.sameClient(clnt)) return true; } } return false; } /* * the service socket thread reader main loop */ private void Run() { traceDebug("Ivy service Thread started"); bool running = true; while (running) { try { Socket sock = app.AcceptSocket(); MyTcpClient socket = new MyTcpClient(sock); if (stopped) break; // early disconnexion addClient(socket); // the peer called me } catch (IOException e) { traceDebug("Error IvyServer exception: " + e.Message); Console.Out.WriteLine("Ivy server socket reader caught an exception " + e.Message); // e.printStackTrace(); } catch (SocketException e) { traceDebug("my server socket has been closed"); running = false; } } traceDebug("Ivy service Thread stopped"); } private void traceDebug(String s) { if (debug) Console.Out.WriteLine("-->ivy<-- " + s); } /* a small private method for debbugging purposes */ public String domains(String toparse) { String s = "broadcasting on "; Ivy.Domain[] d = parseDomains(toparse); for (int index = 0; index < d.Length; index++) { s += d[index].Domainaddr + ":" + d[index].Port + " "; } return s; } internal class Domain { public virtual String Domainaddr { get { return domainaddr; } } public virtual int Port { get { return port; } } private String domainaddr; private int port; public Domain( String domainaddr, int port) { this.domainaddr = domainaddr; this.port = port; } public override String ToString() { return domainaddr + ":" + port; } } /* * unitary test. Normally, running Ivy.Ivy should stop in 2.3 seconds :) */ [STAThread] public static void Main(String[] args) { Ivy bus = new Ivy("Test Unitaire", "TU ready"); try { bus.start(null); try { Thread.Sleep(new TimeSpan(10000 * 2000)); } catch (ThreadInterruptedException ie) { } bus.stop(); } catch (IvyException ie) { Console.Error.WriteLine( ie.Message ); Console.Error.WriteLine( ie.StackTrace ); } } } // class Ivy /* EOF */ }