/// François-Régis Colin
/// http://www.tls.cena.fr/products/ivy/
/// *
/// (C) CENA
/// *
namespace IvyBus
{
using System;
using System.Threading;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Text.RegularExpressions;
using System.Configuration;
using System.Text;
using System.Diagnostics;
/// <summary>
/// IvyWatcher, a private class implementing the Ivy rendezvous.
///
/// The rendezvous uses a single UDP datagram socket bound to the bus port.
/// The socket has broadcast enabled and, when the domain address is an IPv4
/// multicast (class D) address, is also joined to that multicast group.
/// The watcher answers each peer advertising its arrival on the bus by
/// opening a TCP connection back to it. The same socket is used both for
/// listening and for sending our own arrival notification.
/// </summary>
internal class IvyWatcher
{
    /// <summary>Time, in milliseconds, stop() waits for the listener thread before aborting it.</summary>
    private const int StopTimeoutMs = 10000;

    private readonly Ivy bus; /* master bus controller */
    private readonly int port;
    private readonly IPAddress group;
    private readonly IvyUDPStream stream;
    /* private gate serialising start() and stop(); never exposed outside this class */
    private readonly object stateLock = new object();
    private volatile Thread listenThread;

    /// <summary>
    /// Creates an Ivy watcher: prepares (but does not start) the listener
    /// thread and binds the UDP rendezvous socket.
    /// </summary>
    /// <param name="bus">the bus this watcher reports new peers to</param>
    /// <param name="domainaddr">the domain address (broadcast or multicast), in dotted notation</param>
    /// <param name="port">the UDP port number of the domain</param>
    /// <exception cref="IvyException">an I/O error occurred while setting up the stream</exception>
    internal IvyWatcher(Ivy bus, String domainaddr, int port)
    {
        this.bus = bus;
        this.port = port;
        listenThread = new Thread(new ThreadStart(this.Run));
        listenThread.Name = "Ivy UDP Listener Thread";

        Socket broadcast = null;
        bool initialized = false;
        try
        {
            group = IPAddress.Parse(domainaddr);
            /* supervision socket */
            // To do reuseaddr we must use a Socket, not a UdpClient
            broadcast = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
            broadcast.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.Broadcast, 1);
            broadcast.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, 1);
            broadcast.Bind(new IPEndPoint(IPAddress.Any, port));

            if (IsIPv4Multicast(group))
            {
                // Group membership is an IP-level option: the MulticastOption
                // overload of SetSocketOption only accepts SocketOptionLevel.IP.
                broadcast.SetSocketOption(SocketOptionLevel.IP, SocketOptionName.AddMembership, new MulticastOption(group));
            }

            // TODO support the two protocols simultaneously
            if (bus.protocolVersion == 4)
                stream = new IvyUDPStreamV4(broadcast);
            else
                stream = new IvyUDPStreamV3(broadcast);

            initialized = true;
        }
        catch (IOException e)
        {
            throw new IvyException("IvyWatcher I/O error" + e);
        }
        finally
        {
            // Whatever went wrong (and whatever exception propagates), do not
            // leave a bound socket behind when construction did not complete.
            if (!initialized && broadcast != null)
                broadcast.Close();
        }
    }

    /// <summary>
    /// Tells whether an address lies in the IPv4 multicast range (224.0.0.0/4).
    /// </summary>
    private static bool IsIPv4Multicast(IPAddress address)
    {
        if (address.AddressFamily != AddressFamily.InterNetwork)
            return false;
        byte[] addr = address.GetAddressBytes();
        return (addr[0] & 0xf0) == 0xe0;
    }

    /// <summary>
    /// Returns the DNS host name of an address for tracing purposes, falling
    /// back to the textual address when the reverse lookup fails, so that a
    /// DNS problem can never be mistaken for a failure of the watcher socket.
    /// </summary>
    private static string HostNameOf(IPAddress address)
    {
        try
        {
            return Dns.GetHostEntry(address).HostName;
        }
        catch (SocketException)
        {
            return address.ToString();
        }
        catch (ArgumentException)
        {
            return address.ToString();
        }
    }

    /// <summary>
    /// The behaviour of the thread watching the UDP socket: receives arrival
    /// notifications, filters out bad versions and our own messages, then
    /// connects back to the announcing peer and hands the socket to the bus.
    /// The loop only ends when receiving on the stream throws.
    /// </summary>
    public void Run()
    {
        Ivy.traceProtocol("IvyWatcher", "beginning of a watcher Thread");
        try
        {
            while (true)
            {
                ushort version;
                ushort appPort;
                string appId;
                string appName;
                IPEndPoint remoteEP;
                stream.receiveMsg(out remoteEP, out version, out appPort, out appId, out appName);
                IPAddress remotehost = remoteEP.Address;

                // The lookup is kept inside the trace arguments so it is only
                // performed when the trace call itself is evaluated.
                Ivy.traceProtocol("IvyWatcher", "Receive Broadcast from " + HostNameOf(remotehost) + ":" + remoteEP.Port);

                //TODO if ( !isInDomain( remotehost ) ) continue;
                if (version != stream.ProtocolVersion)
                {
                    Ivy.traceError("IvyWatcher","Ignoring bad protocol version " + version + " expected " + stream.ProtocolVersion);
                    continue;
                }

                // filter out our own broadcasts
                if (appId == bus.AppId)
                    continue;
                if ((appPort == bus.applicationPort) && (remotehost.Equals(bus.applicationHost)))
                    continue;

                Ivy.traceProtocol("IvyWatcher", "reponse au Broadcast de " + HostNameOf(remotehost) + ":" + remoteEP.Port + " port " + appPort +
                    " version " + version +
                    " id " + appId +
                    " name " + appName);

                Socket socket = null;
                bool handedOver = false;
                try
                {
                    socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
                    socket.Blocking = true;
                    socket.Connect(new IPEndPoint(remotehost, appPort));
                    // From here on the bus is responsible for the socket.
                    handedOver = true;
                    bus.addClient(socket, appName);
                }
                catch (Exception e)
                {
                    // Release the socket if it never reached the bus.
                    if (!handedOver && socket != null)
                        socket.Close();
                    Ivy.traceError("IvyWatcher","can't connect to " + remotehost + " port " + appPort + " \n" + e.Message);
                }
            } // while
        }
        catch (SocketException se)
        {
            Ivy.traceError("IvyWatcher","watcher socket closed: " + se.Message);
        }
        catch (ObjectDisposedException ode)
        {
            // Presumably raised when stop() closes the stream between two
            // receives (depends on IvyUDPStream); treated as a normal end.
            Ivy.traceError("IvyWatcher","watcher socket closed: " + ode.Message);
        }
        catch (IOException ioe)
        {
            Ivy.traceError("IvyWatcher","watcher thread ended: " + ioe.Message);
        }
        Ivy.traceProtocol("IvyWatcher", "end of a watcher thread");
    }

    /// <summary>
    /// Stops the thread waiting on the broadcast socket: closes the stream,
    /// waits up to ten seconds for the listener thread to end and aborts it
    /// if it has not. Safe to call even if start() was never called.
    /// </summary>
    internal virtual void stop()
    {
        lock (stateLock)
        {
            Ivy.traceProtocol("IvyWatcher", "begining stopping an IvyWatcher");
            stream.Close();

            // Work on a local copy of the volatile field.
            Thread thread = listenThread;
            if (thread != null)
            {
                // Join throws ThreadStateException on a thread that was never
                // started, so only wait for a thread that is actually running.
                if (thread.IsAlive && !thread.Join(StopTimeoutMs))
                    thread.Abort();
                listenThread = null;
            }
            Ivy.traceProtocol("IvyWatcher", "ending stopping an IvyWatcher");
        }
    }

    /// <summary>
    /// Starts the listener thread, then notifies our arrival on the domain
    /// (protocol version + port) through the rendezvous stream.
    /// </summary>
    /// <exception cref="InvalidOperationException">the watcher has already been stopped</exception>
    internal virtual void start()
    {
        lock (stateLock)
        {
            Thread thread = listenThread;
            if (thread == null)
                throw new InvalidOperationException("IvyWatcher cannot be started after it has been stopped");

            thread.Start();
            // notifies our arrival on the domain: protocol version + port
            stream.sendMsg(new IPEndPoint(group, port), bus.applicationPort, bus.AppId, bus.AppName);
        }
    }
}
}