/// François-Régis Colin
/// http://www.tls.cena.fr/products/ivy/
/// *
/// (C) CENA
/// *
namespace IvyBus
{
using System;
using System.Threading;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Text.RegularExpressions;
using System.Configuration;
using System.Text;
using System.Diagnostics;
/// <summary>
/// IvyWatcher, a private class for the Ivy rendezvous.
///
/// The rendezvous is done over a single UDP datagram socket, used either in
/// broadcast mode or, when the domain address is an IPv4 multicast group,
/// in multicast mode. The watcher answers each peer advertising its arrival
/// on the bus by opening a TCP connection back to it. The intrinsics of Unix
/// are so that the broadcast is done using the same socket, which is not a
/// good thing.
/// </summary>
internal class IvyWatcher
{
    /// <summary>Keeps the leading dotted quad of a string and drops whatever follows it.</summary>
    private static readonly Regex dottedQuadPrefix = new Regex("^(\\d+\\.\\d+\\.\\d+\\.\\d+).*");

    /// <summary>Private gate serialising start() and stop(); avoids locking on <c>this</c>.</summary>
    private readonly object stateLock = new object();

    private readonly Ivy bus; /* master bus controller */
    private readonly String domainaddr;
    private readonly int port;
    private volatile Thread listenThread;
    private readonly IPAddress group;
    private readonly IvyUDPStream stream;

    /// <summary>
    /// Creates an Ivy watcher: prepares (but does not start) the listener
    /// thread and opens the UDP rendezvous socket bound to <paramref name="port"/>.
    /// </summary>
    /// <param name="bus">the bus</param>
    /// <param name="domainaddr">the domain, as a textual IP address</param>
    /// <param name="port">the port number</param>
    /// <exception cref="IvyException">an IOException occurred while setting up the socket</exception>
    /// <remarks>
    /// NOTE(review): SocketException (socket setup) and FormatException (address
    /// parsing) are deliberately left propagating unwrapped, exactly as before,
    /// because callers outside this file may depend on those types.
    /// </remarks>
    internal IvyWatcher(Ivy bus, String domainaddr, int port)
    {
        this.bus = bus;
        this.domainaddr = domainaddr;
        this.port = port;
        listenThread = new Thread(new ThreadStart(this.Run));
        listenThread.Name = "Ivy UDP Listener Thread";
        try
        {
            group = IPAddress.Parse(domainaddr);
            stream = openStream(bus, group, port);
        }
        catch (IOException e)
        {
            throw new IvyException("IvyWatcher I/O error" + e);
        }
    }

    /// <summary>
    /// Opens the supervision socket, joins the multicast group when needed and
    /// wraps the socket in the stream matching the bus protocol version.
    /// The socket is closed again if any step fails, so nothing leaks.
    /// </summary>
    private static IvyUDPStream openStream(Ivy bus, IPAddress group, int port)
    {
        // To do reuseaddr we must use a Socket not a udp client
        Socket broadcast = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
        bool handedOver = false;
        try
        {
            broadcast.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.Broadcast, 1);
            // ReuseAddress has to be set before Bind to have any effect.
            broadcast.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, 1);
            broadcast.Bind(new IPEndPoint(IPAddress.Any, port));

            if (isIPv4Multicast(group))
            {
                // Group membership is an IP-level option; requesting it at the
                // UDP level is rejected by the runtime.
                broadcast.SetSocketOption(SocketOptionLevel.IP, SocketOptionName.AddMembership, new MulticastOption(group));
            }

            // TODO support the Two protocol
            IvyUDPStream created;
            if (bus.protocolVersion == 4)
            {
                created = new IvyUDPStreamV4(broadcast);
            }
            else
            {
                created = new IvyUDPStreamV3(broadcast);
            }
            handedOver = true;
            return created;
        }
        finally
        {
            if (!handedOver)
            {
                broadcast.Close();
            }
        }
    }

    /// <summary>
    /// Tells whether the address is an IPv4 multicast group, i.e. its first
    /// byte has the high nibble 0xE (224.0.0.0 to 239.255.255.255).
    /// </summary>
    private static bool isIPv4Multicast(IPAddress address)
    {
        if (address.AddressFamily != AddressFamily.InterNetwork)
        {
            return false;
        }
        byte[] addr = address.GetAddressBytes();
        return (addr[0] & 0xf0) == 0xe0;
    }

    /// <summary>
    /// The behaviour of the thread watching the UDP socket: receives
    /// announcements in a loop until the stream fails or is closed.
    /// </summary>
    public void Run()
    {
        traceDebug("beginning of a watcher Thread");
        try
        {
            while (true)
            {
                try
                {
                    ushort version;
                    ushort port;
                    string appId;
                    string appName;
                    IPEndPoint remoteEP;
                    stream.receiveMsg(out remoteEP, out version, out port, out appId, out appName);
                    handleAnnouncement(remoteEP, version, port, appId, appName);
                }
                catch (IOException jii)
                {
                    traceDebug("UDP listener " + jii.Message);
                    break;
                }
            } // while
        }
        catch (SocketException se)
        {
            traceDebug("watcher socket closed: " + se.Message);
        }
        catch (ObjectDisposedException ode)
        {
            // stop() may close the stream while this thread is between two
            // receives; the next receive then fails on a disposed socket.
            traceDebug("watcher socket closed: " + ode.Message);
        }
        catch (IOException ioe)
        {
            traceDebug("watcher thread ended: " + ioe.Message);
        }
        traceDebug("end of a watcher thread");
    }

    /// <summary>
    /// Processes one received announcement: ignores mismatching protocol
    /// versions and our own announcements, otherwise connects back to the
    /// announcing peer over TCP and hands the socket to the bus.
    /// </summary>
    private void handleAnnouncement(IPEndPoint remoteEP, ushort version, ushort peerPort, string appId, string appName)
    {
        IPAddress remotehost = remoteEP.Address;
        traceDebug("BUSWATCHER Receive Broadcast from " + describeHost(remotehost) + ":" + remoteEP.Port);
        //TODO if ( !isInDomain( remotehost ) ) return;
        if (version != stream.ProtocolVersion)
        {
            traceDebug("Ignoring bad protocol version " + version + " expected " + stream.ProtocolVersion);
            return;
        }
        // filter out our own broadcasts
        if (appId == bus.AppId)
        {
            return;
        }
        if ((peerPort == bus.applicationPort) && (remotehost.Equals(bus.applicationHost)))
        {
            return;
        }
        traceDebug("BUSWATCHER reponse au Broadcast de " + describeHost(remotehost) + ":" + remoteEP.Port + " port " + peerPort +
            " version " + version +
            " id " + appId +
            " name " + appName);

        Socket socket = null;
        bool connected = false;
        try
        {
            socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
            socket.Connect(new IPEndPoint(remoteEP.Address, peerPort));
            connected = true;
            bus.addClient(socket, appName);
        }
        catch (Exception e)
        {
            // Release the socket when the connection was never established;
            // once connected it has been offered to the bus and is left alone.
            if (socket != null && !connected)
            {
                socket.Close();
            }
            traceDebug("can't connect to " + remotehost + " port " + peerPort + " \n" + e.Message);
        }
    }

    /// <summary>
    /// Returns the DNS host name of the address for trace output, falling back
    /// to the numeric address when the reverse lookup fails, so that a missing
    /// DNS entry cannot terminate the listener thread.
    /// </summary>
    private static string describeHost(IPAddress address)
    {
        try
        {
            return Dns.GetHostEntry(address).HostName;
        }
        catch (SocketException)
        {
            return address.ToString();
        }
        catch (ArgumentException)
        {
            return address.ToString();
        }
    }

    /// <summary>
    /// Stops the thread waiting on the broadcast socket: closes the stream and
    /// waits for the listener thread to end.
    /// </summary>
    internal virtual void stop()
    {
        lock (stateLock)
        {
            traceDebug("begining stopping an IvyWatcher");
            stream.Close();
            Thread worker = listenThread;
            if (worker != null)
            {
                // Join throws on a thread that was never started and would
                // block forever if called from the listener thread itself.
                // (Fully qualified: System.Diagnostics also declares ThreadState.)
                bool wasStarted = (worker.ThreadState & System.Threading.ThreadState.Unstarted) == 0;
                if (wasStarted && worker != Thread.CurrentThread)
                {
                    worker.Join();
                }
                listenThread = null;
            }
            traceDebug("ending stopping an IvyWatcher");
        }
    }

    /// <summary>
    /// Starts the listener thread, then announces our arrival on the domain
    /// by sending our application port, id and name to the group address.
    /// </summary>
    internal virtual void start()
    {
        lock (stateLock)
        {
            listenThread.Start();
            IPEndPoint EPhost = new IPEndPoint(group, port);
            stream.sendMsg(EPhost, bus.applicationPort, bus.AppId, bus.AppName);// notifies our arrival on each domain: protocol version + port
        }
    }

    /// <summary>
    /// Extracts the broadcast address from a "net[:port]" specification.
    /// The text after the last ':' is dropped, ".255.255.255" is appended and
    /// only the leading dotted quad is kept, so "10" gives "10.255.255.255"
    /// and "10.1.2.3:2010" gives "10.1.2.3". Input that does not start with a
    /// dotted quad after padding is returned padded but otherwise unchanged.
    /// </summary>
    /// <param name="net">the domain specification</param>
    /// <returns>the domain address as text</returns>
    internal static String getDomain(String net)
    {
        int sep_index = net.LastIndexOf(':');
        if (sep_index != -1)
        {
            net = net.Substring(0, sep_index);
        }
        return dottedQuadPrefix.Replace(net + ".255.255.255", "$1");
    }

    /// <summary>
    /// Extracts the port from a "net[:port]" specification.
    /// </summary>
    /// <param name="net">the domain specification</param>
    /// <returns>the number after the last ':', or Ivy.DEFAULT_PORT when there is no ':'</returns>
    internal static int getPort(String net)
    {
        int sep_index = net.LastIndexOf(':');
        if (sep_index == -1)
        {
            return Ivy.DEFAULT_PORT;
        }
        return Int32.Parse(net.Substring(sep_index + 1));
    }

    /// <summary>
    /// Debug trace helper, compiled only into DEBUG builds. The message is
    /// reported through Trace.Assert, i.e. as a failed assertion, whenever
    /// Ivy.VerboseDebug is true.
    /// </summary>
    [Conditional("DEBUG")]
    private static void traceDebug(string s)
    {
        Trace.Assert(!Ivy.VerboseDebug, "-->ivywatcher<-- " + s);
    }
}
// class IvyWatcher
/* EOF */
}