/// François-Régis Colin
/// http://www.tls.cena.fr/products/ivy/
/// *
/// (C) CENA
/// *
namespace IvyBus
{
    using System;
    using System.IO;
    using System.Collections;
    using System.Collections.Specialized;
    using System.Collections.Generic;
    using System.Net;
    using System.Net.Sockets;
    using System.Threading;
    using System.Configuration;
    using System.Windows.Forms;
    using System.Globalization;
    using System.Text.RegularExpressions;
    using System.Text;

    /// <summary>
    /// The main bus class: owns the TCP service socket, the rendezvous watchers,
    /// the list of connected clients and the bindings of the local application.
    /// </summary>
    public class Ivy
    {
        /* Event handlers */

        /// <summary>Handles a direct message sent by another client.</summary>
        public delegate void DirectMessageHandler(IvyClient app, int id, string arg);

        /// <summary>Handles the connection of a client to the bus.</summary>
        public delegate void ClientConnectedHandler(IvyClient app);

        /// <summary>Handles the disconnection of a client from the bus.</summary>
        public delegate void ClientDisconnectedHandler(IvyClient app);

        /// <summary>Handles a die request; the boolean result is returned by <see cref="FireDie"/>.</summary>
        public delegate bool DieHandler(IvyClient app, int id, string arg);

        /// <summary>Handles the notification that a client added a binding.</summary>
        public delegate void ClientAddBindingHandler(IvyClient app, string arg);

        /// <summary>Handles the notification that a client removed a binding.</summary>
        public delegate void ClientRemoveBindingHandler(IvyClient app, string arg);

        /// <summary>Callback run when a message matches one of the local bindings.</summary>
        public delegate void MessageHandler(IvyClient app, string[] args);

        /* Events */

        /// <summary>
        /// Fires when a new client connects to the bus.
        /// A callback handling the notification of connections, may be null.
        /// </summary>
        public event ClientConnectedHandler clientConnected;

        /// <summary>Fires when a client disconnects from the bus.</summary>
        public event ClientDisconnectedHandler clientDisconnected;

        /// <summary>Fires when a client receives a direct message from another client.</summary>
        public event DirectMessageHandler directMessageReceived;

        /// <summary>Fires when somebody asks for killing every client on the bus.</summary>
        public event DieHandler dieReceived;

        /// <summary>Fires when a client receives an "add binding" from another client.</summary>
        public event ClientAddBindingHandler addBinding;

        /// <summary>Fires when a client receives a "remove binding" from another client.</summary>
        /// <remarks>
        /// Declared with <see cref="ClientAddBindingHandler"/> (same signature as
        /// <see cref="ClientRemoveBindingHandler"/>); the type is kept as is so that
        /// existing subscribers keep compiling.
        /// </remarks>
        public event ClientAddBindingHandler removeBinding;

        /// <summary>
        /// IvyClients accesses the list of the connected clients. This is the live
        /// internal list: lock it before enumerating it from another thread.
        /// </summary>
        public List<IvyClient> IvyClients
        {
            get { return clients; }
        }

        /// <summary>Enables or disables the debug traces written on the console.</summary>
        public bool Debug
        {
            get { return debug; }
            set { debug = value; }
        }

        /// <summary>ClientNames accesses the name list of the connected clients.</summary>
        private string ClientNames
        {
            get
            {
                StringBuilder s = new StringBuilder();
                s.Append(appName);
                s.Append(" clients are: ");
                lock (clients)
                {
                    foreach (IvyClient client in clients)
                    {
                        s.Append(client.ApplicationName);
                        s.Append(" ");
                    }
                }
                return s.ToString();
            }
        }

        /// <summary>AppName the application name.</summary>
        public string AppName
        {
            get { return appName; }
        }

        /// <summary>The unique identifier of the application, built in <see cref="start"/>.</summary>
        public string AppId
        {
            get { return applicationUniqueId; }
        }

        /// <summary>
        /// The priority of the application. Setting it sends the new priority
        /// (together with <see cref="AppId"/>) to every connected client.
        /// </summary>
        public ushort AppPriority
        {
            get { return applicationPriority; }
            set
            {
                applicationPriority = value;
                lock (clients)
                {
                    foreach (IvyClient client in clients)
                    {
                        client.stream.TokenApplicationId(applicationPriority, AppId);
                    }
                }
            }
        }

        /// <summary>
        /// SentMessageClasses the first word token of sent messages;
        /// optimises the parsing process when sending messages.
        /// </summary>
        public string[] SentMessageClasses
        {
            get { return sent_messageClasses; }
            set { sent_messageClasses = value; }
        }

        /// <summary>A <see cref="TcpListener"/> exposing its protected <c>Active</c> flag.</summary>
        internal class MyTcpListener : TcpListener
        {
            public MyTcpListener(IPAddress address, int port)
                : base(address, port)
            {
            }

            /// <summary>Returns the value of the protected <c>Active</c> property.</summary>
            public bool IsActive()
            {
                return this.Active;
            }
        }

        /// <summary>the name of the application on the bus</summary>
        internal string appName;

        /// <summary>the port for the UDP rendez vous, if none is supplied</summary>
        internal const ushort DEFAULT_PORT = 2010;

        /// <summary>client default priority</summary>
        internal const ushort DEFAULT_PRIORITY = 100;

        /// <summary>the domain for the UDP rendez vous</summary>
        internal static readonly string DEFAULT_DOMAIN = "127.255.255.255:" + DEFAULT_PORT;

        internal const string libVersion = "2.0.0";

        /// <summary>Extracts the first word following the leading '^' of a regular expression.</summary>
        private static readonly Regex firstWordPattern =
            new Regex(@"^\^(?<token>[a-zA-Z_0-9-]+).*");

        private bool debug;

        /* an unique ID for each regexp; NOTE(review): incremented without synchronization */
        private static ushort serial = 0;

        private MyTcpListener app;
        private List<IvyWatcher> watchers;
        private volatile Thread serverThread; // to ensure quick communication of the end

        internal enum BindingType { BindRegexp, BindSimple };

        /// <summary>Self application binding.</summary>
        internal struct ApplicationBinding
        {
            internal BindingType type;
            internal ushort key;
            internal MessageHandler callback;
            internal string regexp;
            internal object[] args;
        }

        internal Dictionary<int, ApplicationBinding> bindings;
        private List<IvyClient> clients;
        private string[] sent_messageClasses = null;

        // written by stop() and read by the server thread, hence volatile
        private volatile bool stopped = false;

        // private gate serializing stop(), so that no outside code can contend on it
        private readonly object stopLock = new object();

        internal ushort applicationPort;        /* Application port number */
        internal IPAddress applicationHost;     /* Application host number */
        internal string applicationUniqueId;    /* identifier Application unique timestamp-ipaddress-port */
        internal ushort applicationPriority = DEFAULT_PRIORITY;
        internal string ready_message = null;

        // for synchronous event: when set, handlers are run through Control.Invoke
        private Control syncControl = null;

        /// <summary>
        /// Readies the structures for the software bus connection.
        /// </summary>
        /// <example>
        /// This sample shows how to start working with Ivy.
        /// <code>
        /// Ivy bus = new Ivy("Dummy agent","ready");
        /// bus.bindMsg("(.*)",new Ivy.MessageHandler(myMessageListener));
        /// bus.start(null);
        /// </code>
        /// How to send and receive:
        /// the Ivy agent A performs b.bindMsg("^Hello (.*)",cb);
        /// the Ivy agent B performs b2.sendMsg("Hello world");
        /// a thread in A will run the callback cb with its second argument set
        /// to an array of string, with one single element, "world".
        /// </example>
        /// <remarks>the real work begins in the start() method</remarks>
        /// <param name="name">The name of your Ivy agent on the software bus</param>
        /// <param name="message">The hello message you will send once ready</param>
        public Ivy(string name, string message)
        {
            clients = new List<IvyClient>();
            bindings = new Dictionary<int, ApplicationBinding>();
            appName = name;
            ready_message = message;
            debug = Properties.Settings.Default.IvyDebug;
        }

        /// <summary>
        /// Same as <see cref="Ivy(string, string)"/>, with a control through which
        /// every event handler and message callback is invoked.
        /// </summary>
        /// <param name="name">The name of your Ivy agent on the software bus</param>
        /// <param name="message">The hello message you will send once ready</param>
        /// <param name="sync">The control whose <c>Invoke</c> method runs the handlers</param>
        public Ivy(string name, string message, Control sync)
            : this(name, message)
        {
            syncControl = sync;
        }

        /// <summary>
        /// Connects the Ivy bus to a domain or list of domains.
        /// </summary>
        /// <remarks>
        /// One thread (IvyWatcher) for each traffic rendezvous (either UDP broadcast or TCP Multicast).
        /// One thread (serverThread/Ivy) to accept incoming connections on server socket.
        /// A thread for each IvyClient when the connection has been done.
        /// </remarks>
        /// <param name="domainbus">
        /// a domain of the form 10.0.0:1234, it is similar to the netmask without the
        /// trailing .255. This will determine the meeting point of the different
        /// applications. Right now, this is done with an UDP broadcast. Beware of
        /// routing problems! You can also use a comma separated list of domains.
        /// When null, the IVYBUS environment variable is used, then
        /// <see cref="DEFAULT_DOMAIN"/>.
        /// </param>
        /// <exception cref="IvyException">an IOException occurred while opening the service socket</exception>
        public void start(string domainbus)
        {
            if (domainbus == null)
            {
                domainbus = Environment.GetEnvironmentVariable("IVYBUS");
            }
            if (domainbus == null)
            {
                domainbus = DEFAULT_DOMAIN;
            }

            try
            {
                IPAddress localaddr = GetLocalIPv4Address();
                // port 0: an available port is assigned when the listener starts
                app = new MyTcpListener(IPAddress.Any, 0);
                app.Start();
                applicationPort = (ushort)((IPEndPoint)app.LocalEndpoint).Port;
                applicationHost = localaddr;
                applicationUniqueId = string.Format("{0}-{1}-{2}",
                    DateTime.Now.Ticks / TimeSpan.TicksPerMillisecond,
                    localaddr.ToString().Replace(".", ""),
                    applicationPort);
            }
            catch (IOException e)
            {
                throw new IvyException("can't open TCP service socket " + e);
            }

            traceDebug("lib: " + libVersion
                + " protocol: " + Properties.Settings.Default.IvyProtocolVersion
                + " TCP service open on port " + applicationPort);

            // readies the rendezvous : an IvyWatcher (thread) per domain bus
            watchers = new List<IvyWatcher>();
            foreach (Domain domain in parseDomains(domainbus))
            {
                watchers.Add(new IvyWatcher(this, domain.Domainaddr, domain.Port));
            }

            serverThread = new Thread(new ThreadStart(this.Run));
            serverThread.Name = "Ivy Tcp Server Thread";
            serverThread.Start();

            // Wait for readiness
            while (serverThread.ThreadState != ThreadState.Running || !app.IsActive())
            {
                traceDebug(" Ivy Threading start in progress...");
                Thread.Sleep(100);
            }

            // sends the broadcasts and listen to incoming connections
            foreach (IvyWatcher watcher in watchers)
            {
                watcher.start();
            }
        }

        /// <summary>
        /// Returns the first IPv4 address of the local host, or the loopback
        /// address when the host has none.
        /// </summary>
        /// <remarks>
        /// Replaces the obsolete Dns.Resolve call. Only IPv4 addresses are considered
        /// because the unique id built in start() strips the dots of the address.
        /// </remarks>
        private static IPAddress GetLocalIPv4Address()
        {
            foreach (IPAddress address in Dns.GetHostEntry(Dns.GetHostName()).AddressList)
            {
                if (address.AddressFamily == AddressFamily.InterNetwork)
                {
                    return address;
                }
            }
            return IPAddress.Loopback;
        }

        /// <summary>Sorts the client list using the default comparer of IvyClient.</summary>
        internal void SortClients()
        {
            lock (clients)
            {
                clients.Sort();
            }
        }

        /// <summary>A small method for debugging purposes: describes the parsed domains.</summary>
        /// <param name="toparse">a comma separated list of domains</param>
        /// <returns>"broadcasting on " followed by each "address:port " pair</returns>
        public string domains(string toparse)
        {
            StringBuilder s = new StringBuilder("broadcasting on ");
            foreach (Domain domain in parseDomains(toparse))
            {
                s.Append(domain.Domainaddr);
                s.Append(":");
                s.Append(domain.Port);
                s.Append(" ");
            }
            return s.ToString();
        }

        /// <summary>Splits a comma separated list of domains into <see cref="Domain"/> objects.</summary>
        /// <param name="domainbus">a comma separated list of domains</param>
        /// <returns>one Domain per element of the list, in the same order</returns>
        internal Domain[] parseDomains(string domainbus)
        {
            string[] st = domainbus.Split(',');
            Domain[] d = new Domain[st.Length];
            for (int i = 0; i < st.Length; i++)
            {
                d[i] = new Domain(IvyWatcher.getDomain(st[i]), IvyWatcher.getPort(st[i]));
            }
            return d;
        }

        /// <summary>
        /// Disconnects from the Ivy bus: stops the service socket, the server thread,
        /// the watchers and closes the remaining clients. Only the first call has an effect.
        /// </summary>
        public void stop()
        {
            lock (stopLock)
            {
                if (stopped)
                {
                    return;
                }
                stopped = true;
                traceDebug("beginning stopping the bus");
                try
                {
                    // Stopping the listener makes the pending AcceptSocket() of Run() fail,
                    // which ends the server thread. The listener may exist even though
                    // the server thread has not been created yet.
                    if (app != null)
                    {
                        app.Stop();
                    }
                    Thread server = serverThread;
                    if (server != null)
                    {
                        server.Join();
                        serverThread = null;
                    }

                    // stopping the IvyWatchers
                    if (watchers != null)
                    {
                        foreach (IvyWatcher watcher in watchers)
                        {
                            watcher.stop();
                        }
                    }

                    // stopping the remaining IvyClients: work on a snapshot taken under
                    // the lock, since closing a client lets its thread modify the list
                    IvyClient[] remaining;
                    lock (clients)
                    {
                        remaining = clients.ToArray();
                    }
                    foreach (IvyClient client in remaining)
                    {
                        client.close(true); // will notify the reading thread
                    }
                }
                catch (IOException e)
                {
                    traceDebug("IOexception Stop " + e.Message);
                }
                traceDebug("the bus should have stopped so far");
            }
        }

        /// <summary>
        /// Send a message to someone on the bus.
        /// </summary>
        /// <remarks>
        /// Performs a pattern matching according to everyone's regexps, and sends
        /// the results to the relevant ivy agents.
        /// There is one thread for each client connected, we could also
        /// create another thread each time we send a message.
        /// </remarks>
        /// <param name="message">
        /// A message which will be compared to the regular expressions of the different clients
        /// </param>
        /// <returns>the number of messages actually sent</returns>
        public int sendMsg(string message)
        {
            int count = 0;
            // an alternate implementation would use one sender thread per client
            // instead of one for all the clients. It might be a performance issue
            lock (clients)
            {
                foreach (IvyClient client in clients)
                {
                    count += client.sendMsg(message);
                }
            }
            return count;
        }

        /// <summary>
        /// Send a formatted message to someone on the bus.
        /// </summary>
        /// <param name="format">A string message format to build the message</param>
        /// <param name="args">args used in message format</param>
        /// <returns>the number of messages actually sent</returns>
        public int sendMsg(string format, params object[] args)
        {
            return sendMsg(string.Format(format, args));
        }

        /// <summary>
        /// Send a formatted message, built with the given culture, to someone on the bus.
        /// </summary>
        /// <param name="culture">the culture used to format the arguments</param>
        /// <param name="format">A string message format to build the message</param>
        /// <param name="args">args used in message format</param>
        /// <returns>the number of messages actually sent</returns>
        public int sendMsg(CultureInfo culture, string format, params object[] args)
        {
            return sendMsg(string.Format(culture, format, args));
        }

        /// <summary>
        /// Subscribes to a regular expression (kept for compatibility with old IVY).
        /// </summary>
        /// <remarks>
        /// The callback will be executed with the saved parameters of the regexp as
        /// arguments when a message is sent by another agent.
        /// A program doesn't receive its own messages.
        /// </remarks>
        /// <param name="regexp">a regular expression, groups are done with parenthesis</param>
        /// <param name="callback">the handler run when a message matches</param>
        /// <returns>the id of the regular expression</returns>
        public int bindMsg(string regexp, MessageHandler callback)
        {
            return bindMsg(regexp, callback, null);
        }

        /// <summary>
        /// Subscribes to a regular expression and tells every connected client about it.
        /// </summary>
        /// <param name="regexp">a regular expression, groups are done with parenthesis</param>
        /// <param name="callback">the handler run when a message matches</param>
        /// <param name="args">extra objects stored with the binding</param>
        /// <returns>the id of the regular expression</returns>
        public int bindMsg(string regexp, MessageHandler callback, params object[] args)
        {
            ApplicationBinding newbind = RegisterBinding(BindingType.BindRegexp, regexp, callback, args);
            // notifies the other clients this new regexp
            lock (clients)
            {
                foreach (IvyClient c in clients)
                {
                    c.stream.TokenAddRegexp(newbind.key, newbind.regexp);
                }
            }
            return newbind.key;
        }

        /// <summary>
        /// Unsubscribes a regular expression.
        /// </summary>
        /// <param name="id">the id of the regular expression, returned when it was bound</param>
        /// <exception cref="IvyException">no binding has this id</exception>
        public void unBindMsg(ushort id)
        {
            bool known;
            lock (bindings)
            {
                known = bindings.ContainsKey(id);
            }
            if (!known)
            {
                throw new IvyException("client wants to remove an unexistant regexp " + id);
            }

            // the peers are told first, the binding is dropped afterwards, so a message
            // already on its way still finds its callback
            lock (clients)
            {
                foreach (IvyClient c in clients)
                {
                    c.stream.TokenDelBinding(id);
                }
            }
            lock (bindings)
            {
                bindings.Remove(id);
            }
        }

        /// <summary>
        /// Unsubscribes the first binding whose expression equals the given string.
        /// </summary>
        /// <param name="re">the string for the regular expression</param>
        /// <returns>
        /// true if the regexp existed, false otherwise or whenever an
        /// IvyException occurred during unbinding
        /// </returns>
        public bool unBindMsg(string re)
        {
            ushort key = 0;
            bool found = false;
            // look the key up under the lock, unbind outside of the enumeration
            lock (bindings)
            {
                foreach (ApplicationBinding bind in bindings.Values)
                {
                    if (bind.regexp == re)
                    {
                        key = bind.key;
                        found = true;
                        break;
                    }
                }
            }
            if (!found)
            {
                return false;
            }

            try
            {
                unBindMsg(key);
            }
            catch (IvyException)
            {
                return false;
            }
            return true;
        }

        /// <summary>
        /// Subscribes to a simple expression (msg arg1 arg2 arg3 etc) and tells every
        /// connected client about it.
        /// </summary>
        /// <remarks>
        /// The callback will be executed when a message is sent by another agent.
        /// A program doesn't receive its own messages.
        /// </remarks>
        /// <param name="expression">the simple expression</param>
        /// <param name="callback">the handler run when a message matches</param>
        /// <param name="args">extra objects stored with the binding</param>
        /// <returns>the id of the binding</returns>
        public int bindSimpleMsg(string expression, MessageHandler callback, params object[] args)
        {
            ApplicationBinding newbind = RegisterBinding(BindingType.BindSimple, expression, callback, args);
            // notifies the other clients this new binding
            lock (clients)
            {
                foreach (IvyClient c in clients)
                {
                    c.stream.TokenAddBinding(newbind.key, newbind.regexp);
                }
            }
            return newbind.key;
        }

        /// <summary>Creates a binding with a fresh key and stores it in the binding table.</summary>
        private ApplicationBinding RegisterBinding(BindingType type, string expression, MessageHandler callback, object[] args)
        {
            ApplicationBinding newbind;
            newbind.type = type;
            newbind.key = serial++;
            newbind.regexp = expression;
            newbind.callback = callback;
            newbind.args = args;
            lock (bindings)
            {
                bindings.Add(newbind.key, newbind);
            }
            return newbind;
        }

        /// <summary>Sends a die token to every connected client with the given name.</summary>
        /// <param name="target">the application name of the clients to reach</param>
        /// <param name="message">the message sent along with the token</param>
        /// <returns>the number of clients the token was sent to</returns>
        public int Die(string target, string message)
        {
            List<IvyClient> v = getIvyClientsByName(target);
            foreach (IvyClient client in v)
            {
                client.stream.TokenDie(0, message);
            }
            return v.Count;
        }

        /// <summary>Sends a ping token to every connected client with the given name.</summary>
        /// <param name="target">the application name of the clients to reach</param>
        /// <param name="message">the message sent along with the token</param>
        /// <returns>the number of clients the token was sent to</returns>
        public int Ping(string target, string message)
        {
            List<IvyClient> v = getIvyClientsByName(target);
            foreach (IvyClient client in v)
            {
                client.stream.TokenPing(message);
            }
            return v.Count;
        }

        /// <summary>Raises <see cref="directMessageReceived"/>, through the sync control when one is set.</summary>
        internal void FireDirectMessage(IvyClient app, int id, string arg)
        {
            // local copy: the event may be unsubscribed by another thread after the null test
            DirectMessageHandler handler = directMessageReceived;
            if (handler == null)
            {
                return;
            }
            if (syncControl != null)
            {
                syncControl.Invoke(handler, new object[] { app, id, arg });
            }
            else
            {
                handler(app, id, arg);
            }
        }

        /// <summary>Raises <see cref="clientConnected"/>, through the sync control when one is set.</summary>
        internal void FireClientConnected(IvyClient app)
        {
            ClientConnectedHandler handler = clientConnected;
            if (handler == null)
            {
                return;
            }
            if (syncControl != null)
            {
                syncControl.Invoke(handler, new object[] { app });
            }
            else
            {
                handler(app);
            }
        }

        /// <summary>Raises <see cref="clientDisconnected"/>, through the sync control when one is set.</summary>
        internal void FireClientDisconnected(IvyClient app)
        {
            ClientDisconnectedHandler handler = clientDisconnected;
            if (handler == null)
            {
                return;
            }
            if (syncControl != null)
            {
                syncControl.Invoke(handler, new object[] { app });
            }
            else
            {
                handler(app);
            }
        }

        /// <summary>Raises <see cref="addBinding"/>, through the sync control when one is set.</summary>
        internal void FireClientAddBinding(IvyClient app, string regexp)
        {
            ClientAddBindingHandler handler = addBinding;
            if (handler == null)
            {
                return;
            }
            if (syncControl != null)
            {
                syncControl.Invoke(handler, new object[] { app, regexp });
            }
            else
            {
                handler(app, regexp);
            }
        }

        /// <summary>Raises <see cref="removeBinding"/>, through the sync control when one is set.</summary>
        internal void FireClientRemoveBinding(IvyClient app, string regexp)
        {
            ClientAddBindingHandler handler = removeBinding;
            if (handler == null)
            {
                return;
            }
            if (syncControl != null)
            {
                syncControl.Invoke(handler, new object[] { app, regexp });
            }
            else
            {
                handler(app, regexp);
            }
        }

        /// <summary>Raises <see cref="dieReceived"/>, through the sync control when one is set.</summary>
        /// <returns>the value returned by the handler, or false when nobody is subscribed</returns>
        internal bool FireDie(IvyClient app, int id, string arg)
        {
            DieHandler handler = dieReceived;
            if (handler == null)
            {
                return false;
            }
            if (syncControl != null)
            {
                // Control.Invoke hands back the delegate's return value boxed as object;
                // it used to be dropped, which made this path always answer false
                return (bool)syncControl.Invoke(handler, new object[] { app, id, arg });
            }
            return handler(app, id, arg);
        }

        /// <summary>Removes a client from the list.</summary>
        internal void removeClient(IvyClient c)
        {
            lock (clients)
            {
                clients.Remove(c);
            }
        }

        /// <summary>
        /// Gives a list of IvyClient(s) with the name given in parameter.
        /// </summary>
        /// <param name="name">The name of the Ivy agent you're looking for</param>
        /// <returns>a new list holding the matching clients, possibly empty</returns>
        public List<IvyClient> getIvyClientsByName(string name)
        {
            List<IvyClient> v = new List<IvyClient>();
            // the list is modified by the connection threads: enumerate it under its lock
            lock (clients)
            {
                foreach (IvyClient ic in clients)
                {
                    if (ic.ApplicationName.CompareTo(name) == 0)
                    {
                        v.Add(ic);
                    }
                }
            }
            return v;
        }

        /////////////////////////////////////////////////////////////////:
        //
        // Protected methods
        //
        /////////////////////////////////////////////////////////////////:

        /// <summary>
        /// Wraps a connected socket into a new IvyClient and adds it to the client
        /// list. Does nothing once the bus has been stopped.
        /// </summary>
        internal void addClient(Socket socket, string appname)
        {
            if (stopped)
            {
                return;
            }
            IvyClient client = new IvyClient(this, socket, appname);
            lock (clients)
            {
                clients.Add(client);
            }
            traceDebug(ClientNames);
        }

        /// <summary>
        /// Runs the callback of the binding identified by <paramref name="key"/>,
        /// through the sync control when one is set.
        /// </summary>
        /// <exception cref="IvyException">no binding, or no callback, is registered for the key</exception>
        internal void FireCallback(IvyClient client, int key, string[] arg)
        {
            ApplicationBinding bind;
            bool known;
            lock (bindings)
            {
                known = bindings.TryGetValue(key, out bind);
            }
            // an unknown key is reported with the same IvyException as a missing callback
            if (!known || bind.callback == null)
            {
                throw new IvyException("(callCallback) Not regexp matching id " + key);
            }
            if (syncControl != null)
            {
                syncControl.Invoke(bind.callback, new object[] { client, arg });
            }
            else
            {
                bind.callback(client, arg);
            }
        }

        /// <summary>
        /// Resolves the bus domain: the argument when not null, else the IVYBUS
        /// environment variable, else the IvyBus application setting, else
        /// <see cref="DEFAULT_DOMAIN"/>.
        /// </summary>
        public static string getDomain(string domainbus)
        {
            if (domainbus == null)
            {
                domainbus = Environment.GetEnvironmentVariable("IVYBUS");
            }
            if (domainbus == null)
            {
                domainbus = Properties.Settings.Default.IvyBus;
            }
            if (domainbus == null)
            {
                domainbus = DEFAULT_DOMAIN;
            }
            return domainbus;
        }

        /// <summary>
        /// Checks the "validity" of a regular expression against
        /// <see cref="SentMessageClasses"/>.
        /// </summary>
        /// <returns>
        /// true when no message class is declared or when the expression does not
        /// start with '^'; otherwise true only if a declared class starts with the
        /// first word of the expression
        /// </returns>
        //TODO put in IvyBinding
        internal bool CheckRegexp(string exp)
        {
            // TODO Bug
            // ClockStop ClockStart & ^Clock(Start|Pause)
            // should Stop to the first parent
            if (sent_messageClasses == null || !exp.StartsWith("^", StringComparison.Ordinal))
            {
                return true;
            }

            // extract first word from regexp...
            // (when the pattern does not match, Replace returns the expression unchanged)
            string token = firstWordPattern.Replace(exp, "${token}");
            foreach (string exp_class in sent_messageClasses)
            {
                if (exp_class.StartsWith(token, StringComparison.Ordinal))
                {
                    return true;
                }
            }
            return false;
        }

        /*
         * prevents two clients from connecting to each other at the same time
         * there might still be a lingering bug here, that we could avoid with the
         * SchizoToken.
         */
        /// <summary>Tells whether another client of the list is the same as the given one.</summary>
        /// <returns>false when the client's port is 0 or when no other client matches</returns>
        internal bool checkConnected(IvyClient clnt)
        {
            if (clnt.AppPort == 0)
            {
                return false;
            }
            lock (clients)
            {
                foreach (IvyClient client in clients)
                {
                    if (clnt != client && client.sameClient(clnt))
                    {
                        return true;
                    }
                }
            }
            return false;
        }

        /// <summary>The service socket thread reader main loop.</summary>
        private void Run()
        {
            traceDebug("Ivy service Thread started");
            bool running = true;
            while (running)
            {
                try
                {
                    Socket socket = app.AcceptSocket();
                    if (stopped)
                    {
                        break; // early disconnection
                    }
                    addClient(socket, "Unkown(waiting for name reception)"); // the peer called me
                }
                catch (IOException e)
                {
                    traceDebug("Error IvyServer exception: " + e.Message);
                    Console.Error.WriteLine("Ivy server socket reader caught an exception " + e.Message);
                }
                catch (SocketException)
                {
                    traceDebug("my server socket has been closed");
                    running = false;
                }
                catch (InvalidOperationException)
                {
                    // AcceptSocket() was called on a listener that stop() already stopped
                    traceDebug("my server socket has been closed");
                    running = false;
                }
            }
            traceDebug("Ivy service Thread stopped");
        }

        /// <summary>Writes a trace on the console when <see cref="Debug"/> is on.</summary>
        private void traceDebug(string s)
        {
            if (debug)
            {
                Console.WriteLine("-->ivy<-- " + s);
            }
        }

        /// <summary>An address and port pair describing one rendezvous domain.</summary>
        internal class Domain
        {
            public virtual string Domainaddr
            {
                get { return domainaddr; }
            }

            public virtual int Port
            {
                get { return port; }
            }

            private string domainaddr;
            private int port;

            public Domain(string domainaddr, int port)
            {
                this.domainaddr = domainaddr;
                this.port = port;
            }

            /// <summary>Returns "address:port".</summary>
            public override string ToString()
            {
                return domainaddr + ":" + port;
            }
        }
    }
}