/// François-Régis Colin /// http://www.tls.cena.fr/products/ivy/ /// * /// (C) CENA /// * namespace IvyBus { using System; using System.Threading; using System.IO; using System.Net; using System.Net.Sockets; using System.Text.RegularExpressions; using System.Configuration; using System.Text; /// IvyWatcher, A private Class for the Ivy rendezvous /// /// right now, the rendez vous is either an UDP socket or a TCP multicast. /// The watcher will answer to /// each peer advertising its arrival on the bus. The intrinsics of Unix are so /// that the broadcast is done using the same socket, which is not a good /// thing. /// internal class IvyWatcher { private Ivy bus; /* master bus controler */ private String domainaddr; private int port; private volatile Thread listenThread; private IPAddress group; private IvyUDPStream stream; /// creates an Ivy watcher /// /// the bus /// /// the domain /// /// the port number /// internal IvyWatcher(Ivy bus, String domainaddr, int port) { this.bus = bus; this.domainaddr = domainaddr; this.port = port; listenThread = new Thread(new ThreadStart(this.Run)); listenThread.Name = "Ivy UDP Listener Thread"; try { group = IPAddress.Parse(domainaddr); /* supervision socket */ // To do reuseaddr we must use a Socket not a udp client Socket broadcast = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp); IPEndPoint EPhost = new IPEndPoint(IPAddress.Any, port); broadcast.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.Broadcast,1); broadcast.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress,1); broadcast.Bind(EPhost); //test isMulticastAddress // TODO better check //if ( group.IsIPv6Multicast ) yes but in IPV4 how to do if ((group.Address & 0xf0000000) == 0xe0000000) { broadcast.SetSocketOption(SocketOptionLevel.Udp, SocketOptionName.AddMembership, new MulticastOption( group )); } stream = new IvyUDPStream(broadcast); } catch (IOException e) { throw new IvyException("IvyWatcher I/O error" + e); } } /// the behaviour of each thread watching the UDP socket. /// public void Run() { traceDebug("beginning of a watcher Thread"); try { bool running = true; while (running) { try { int version; int port; string appId; string appName; IPEndPoint remoteEP; stream.receiveMsg(out remoteEP, out version,out port,out appId,out appName); IPAddress remotehost = remoteEP.Address; traceDebug("BUSWATCHER Receive Broadcast from " + Dns.GetHostByAddress(remotehost).HostName+ ":" + remoteEP.Port); //TODO if ( !isInDomain( remotehost ) ) continue; if (version != IvyUDPStream.PROCOCOLVERSION) { Console.Error.WriteLine("Ignoring bad protocol version {0} expected {1}", version, IvyUDPStream.PROCOCOLVERSION); continue; } if ( appId == bus.AppId ) // self Broadcast continue; traceDebug("BUSWATCHER Broadcast de " + Dns.GetHostByAddress(remotehost).HostName + ":" + remoteEP.Port + " port " + port + " version " + version + " id " + appId + " name " + appName); try { Socket socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); IPEndPoint hostEndPoint = new IPEndPoint(remoteEP.Address, port); socket.Connect(hostEndPoint); bus.addClient(socket,appName); } catch (Exception e) { Console.Error.WriteLine("can't connect to " + remotehost + " port " + port + " \n"+ e.Message); } } catch (IOException jii) { Console.Error.WriteLine("UDP listener " + jii.Message); running = false; } // catch (SocketException se) // { // traceDebug( "watcher socket closed: " + se.Message); // } } // while } catch (SocketException se) { traceDebug( "watcher socket closed: " + se.Message); } catch (IOException ioe) { traceDebug( "watcher thread ended: " + ioe.Message); } traceDebug("end of a watcher thread"); } /// stops the thread waiting on the broadcast socket /// internal virtual void stop() { lock(this) { traceDebug("begining stopping an IvyWatcher"); stream.Close(); if (listenThread != null) { listenThread.Join(); listenThread = null; } // it might not even have been created traceDebug("ending stopping an IvyWatcher"); } } internal virtual void start() { lock(this) { listenThread.Start(); IPEndPoint EPhost = new IPEndPoint(group, port); stream.sendMsg(EPhost, bus.applicationPort, bus.AppId, bus.AppName);// notifies our arrival on each domain: protocol version + port } } internal static String getDomain(String net) { int sep_index = net.LastIndexOf(":"); if (sep_index != - 1) { net = net.Substring(0, (sep_index) - (0)); } try { net += ".255.255.255"; Regex exp = new Regex("^(\\d+\\.\\d+\\.\\d+\\.\\d+).*"); net = exp.Replace(net, "$1"); } catch (ArgumentException e) { Console.Out.WriteLine("Bad broascat addr " + net); return null; } // out.println("net: "+net); return net; } internal static int getPort(String net) { int sep_index = net.LastIndexOf(":"); int port = (sep_index == - 1)?Ivy.DEFAULT_PORT:Int32.Parse(net.Substring(sep_index + 1)); // out.println("net: ["+net+"]\nsep_index: "+sep_index+"\nport: "+port); return port; } private void traceDebug(String s) { if (bus.Debug) Console.Out.WriteLine("-->ivywatcher<-- " + s); } } // class IvyWatcher /* EOF */ }