/**
* IvyWatcher, A private Class for the Ivy rendezvous
*
* @author Yannick Jestin
* @author Francois-Regis Colin
* @author http://www.tls.cena.fr/products/ivy/
*
* (C) CENA
*
* right now, the rendez vous is either an UDP socket or a TCP multicast.
* The watcher will answer to
* each peer advertising its arrival on the bus. The intrinsics of Unix are so
* that the broadcast is done using the same socket, which is not a good
* thing.
*
* CHANGELOG:
* 1.2.16
* - now uses the synchronized wrappers of the Java API for all collections
* - move out the Domain related-code to the Domain class
* 1.2.15
* - allows the fine tuning of the IvyClient socket buffersize using
* IVY_BUFFERSIZE property
* 1.2.14
* - tries to fix a lock on accept() by becoming a Thread instead of
* runnalbe (see tests/test2)
* - removed unread field (domainaddr, e.g.)
* - throws RuntimeException instead of System.exit(), allows code reuse
* - switch from gnu regexp (deprecated) to the built in java regexp
* - add generic types to declarations
* 1.2.13:
* - TCP_NO_DELAY to disable Nagle's algorithm
* - private static ?! pour already present ...
* 1.2.9:
* - added an application Id in the UDP broadcast. It seems to be ok with
* most implementations ( VERSION PORT APPID APPNAME \n) is compatible with (VERSION
* APPID). If I receive a broadcast with with the same TCP port number,
* I ignore the first and accept the new ones
* 1.2.8:
* - alreadyBroadcasted was static, thus Ivy Agents within the same JVM used
* to share the list of agents already connected. A nasty bug.
* 1.2.7:
* - better handling of multiple connexions from the same remote agent when
* there are different broadcast addresses ( introduced the alreadyBroadcasted
* function )
* 1.2.6:
* - IOException now goes silent when we asked the bus to stop()
* - use a new buffer for each Datagram received, to prevent an old bug
* 1.2.5:
* - getDomain now sends IvyException for malformed broadcast addresses
* - uses apache jakarta-regexp instead of gnu-regexp
* - throws an IvyException if the broadcast domain cannot be resolved
* 1.2.4:
* - sends the broadcast before listening to the other's broadcasts.
* I can't wait for all the broadcast to be sent before starting the listen
* mode, otherwise another agent behaving likewise could be started
* meanwhile, and one would not "see" each other.
* - (REMOVED) allows the connexion from a remote host with the same port number
* it's too complicated to know if the packet is from ourselves...
* - deals with the protocol errors in a more efficient way. The goal is not
* to loose our connectivity because of a rude agent.
* fixes Bug J005 (YJ + JPI)
* 1.2.3:
* - the packet sending is done in its own thread from now on (PacketSender)
* I don't care stopping it, since it can't be blocked.
* - checks whether I have been interrupted just after the receive (start()
* then stop() immediately).
* 1.2.1:
* - can be Interrupted during the broadcast Send. I catch the
* and do nothing with it. InterruptedIOException
* - changed the fill character from 0 to 10, in order to prevent a nasty bug
* on Windows XP machines
* - fixed a NullPointerException while trying to stop a Thread before having
* created it.
* 1.0.12:
* - setSoTimeout on socket
* - the broadcast reader Thread goes volatile
* 1.0.10:
* - isInDomain() is wrong in multicast. I've removed it
* - there was a remanence effect in the datagrampacket buffer. I clean it up after each message
* - cleaned up the getDomain() and getPort() code
* - close message sends an interruption on all threads for a clean exit
* - removed the timeout bug eating all the CPU resources
* - now handles a Vector of broadcast listeners
*/
package fr.dgac.ivy ;
import java.lang.Thread;
import java.net.*;
import java.io.*;
import java.util.Collections;
import java.util.Map;
import java.util.HashMap;
import java.util.regex.*;
class IvyWatcher implements Runnable {
private static boolean debug = (System.getProperty("IVY_DEBUG")!=null);
private boolean alreadyIgnored = false;
private Ivy bus; /* master bus controler */
private DatagramSocket broadcast; /* supervision socket */
private int port;
private String domainaddr;
private volatile boolean keeprunning = false;
private Thread listenThread = null;
private InetAddress group;
// FIXME should not be static ? (findbugs)
private static int serial=0;
private int myserial=serial++;
private String busWatcherId = null;
private static Pattern recoucou;
/**
* creates an Ivy watcher
* @param bus the bus
* @param net the domain
*/
IvyWatcher(Ivy bus,String domainaddr,int port) throws IvyException {
this.bus = bus;
this.port = port;
this.domainaddr = domainaddr;
busWatcherId=bus.getWatcherId();
keeprunning = true ;
// create the MulticastSocket
try {
group = InetAddress.getByName(domainaddr);
broadcast = new MulticastSocket(port);
if (group.isMulticastAddress()) ((MulticastSocket)broadcast).joinGroup(group);
broadcast.setSoTimeout(Ivy.TIMEOUTLENGTH);
} catch ( UnknownHostException uhe ) {
} catch ( IOException e ) {
throw new IvyException("IvyWatcher I/O error" + e );
}
}
/**
* the behaviour of each thread watching the UDP socket.
*/
public void run() {
traceDebug("Thread started"); // THREADDEBUG
listenThread=Thread.currentThread();
listenThread.setName("Ivy Watcher thread for "+domainaddr+":"+port);
// listenThread.setDaemon(true); // not possible in the treadpool ? FIXME
traceDebug("beginning of a watcher Thread");
try {
while( keeprunning ) {
try {
byte buf[] = new byte[256];
DatagramPacket packet=new DatagramPacket(buf,buf.length);
broadcast.receive(packet);
bus.pushThread("UDP packet received");
if ( !keeprunning ) break; // I was summoned to leave during the receive
String msg = new String(buf,0,packet.getLength());
boolean b = (parsePacket(packet, msg));
bus.popThread("UDP packet processed");
if (b) continue; else break;
} catch (InterruptedIOException jii ){
// System.out.println("WTF UDP packet interrupted");
// another thread took place, not important
if ( !keeprunning ) { break ;}
}
} // while
} catch (java.net.SocketException se ){
if ( keeprunning ) {
traceDebug("socket exception, continuing anyway on other Ivy domains "+se);
}
} catch (IOException ioe ){
System.out.println("IO Exception, continuing anyway on other Ivy domains "+ioe);
}
traceDebug("Thread stopped"); // THREADDEBUG
}
/**
* parses the content of a received packet.
* @return true if the watcher can keep looping (continue), false if there's
* a big problem (break).
*
* first checks if the message structure is correct
* then checks if it's our own broadcasts
* then checks if there's already a concurrent connexion in progress
* if it's ok, creates a new IvyClient
*
*/
private boolean parsePacket(DatagramPacket packet, String msg) {
// System.out.println("TODO Y - parse In");
int remotePort=0;
InetAddress remotehost = null;
String remotehostname = null;
try {
remotehost=packet.getAddress();
remotehostname=remotehost.getHostName();
Matcher m = recoucou.matcher(msg);
// is it a correct broadcast packet ?
if (!m.matches()) {
System.err.println("Ignoring bad format broadcast from "+ remotehostname+":"+packet.getPort());
return true;
}
// is it the correct protocol version ?
int version = Integer.parseInt(m.group(1));
if ( version < Protocol.PROTOCOLMINIMUM ) {
System.err.println("Ignoring bad format broadcast from "+
remotehostname+":"+packet.getPort()
+" protocol version "+remotehost+" we need "+Protocol.PROTOCOLMINIMUM+" minimum");
return true;
}
// is it my own broadcast ?
remotePort = Integer.parseInt(m.group(2));
if (bus.getAppPort()==remotePort) { // if (same port number)
if (busWatcherId!=null) {
traceDebug("there's an appId: "+m.group(3));
String otherId=m.group(3);
String otherName=m.group(4);
if (busWatcherId.compareTo(otherId)==0) {
// same port #, same bus Id, It's me, I'm outta here
traceDebug("ignoring my own broadcast");
return true;
} else {
// same port #, different bus Id, it's another agent
// implementing the Oh Soooo Cool watcherId undocumented
// unprotocolar Ivy add on
traceDebug("accepting a broadcast from a same port by "+otherName);
}
} else {
// there's no watcherId in the broacast. I fall back to a
// crude strategy: I ignore the first broadcast with the same
// port number, and accept the following ones
if (alreadyIgnored) {
traceDebug("received another broadcast from "+ remotehostname+":"+packet.getPort()
+" on my port number ("+remotePort+"), it's probably someone else");
} else {
alreadyIgnored=true;
traceDebug("ignoring a broadcast from "+ remotehostname+":"+packet.getPort()
+" on my port number ("+remotePort+"), it's probably me");
return true;
}
}
} // end if (same port #)
// it's definitively not me, let's shake hands !
traceDebug("broadcast accepted from " +remotehostname
+":"+packet.getPort()+", port:"+remotePort+", protocol version:"+version);
if (!alreadyBroadcasted(remotehost.toString(),remotePort)) {
traceDebug("no known agent originating from " + remotehost + ":" + remotePort);
try {
Socket s = new Socket(remotehost,remotePort);
s.setReceiveBufferSize(bus.getBufferSize());
s.setTcpNoDelay(true);
if (!bus.createIvyClient(s,remotePort,false)) return false ;
} catch ( java.net.ConnectException jnc ) {
traceDebug("cannot connect to "+remotehostname+":"+remotePort+", he probably stopped his bus");
}
} else {
traceDebug("there is already a request originating from " + remotehost + ":" + remotePort);
}
} catch (NumberFormatException nfe) {
System.err.println("Ignoring bad format broadcast from "+remotehostname);
return true;
} catch ( UnknownHostException e ) {
System.err.println("Unkonwn host "+remotehost +","+e.getMessage());
} catch ( IOException e) {
System.err.println("can't connect to "+remotehost+" port "+ remotePort+e.getMessage());
e.printStackTrace();
}
return true;
}
/**
* stops the thread waiting on the broadcast socket
*/
synchronized void doStop() {
traceDebug("begining stopping");
keeprunning = false;
if (listenThread != null) listenThread.interrupt();
broadcast.close();
traceDebug("stopped");
}
private class PacketSender implements Runnable {
// do I need multiple packetsenders ? Well, there is one PacketSender per
// domain.
DatagramPacket packet;
String data;
Ivy bus;
public PacketSender(String data, Ivy b) {
this.data=data;
bus = b;
packet=new DatagramPacket(data.getBytes(),data.length(),group,port);
bus.getPool().execute( PacketSender.this );
}
public void run() {
bus.pushThread("packet sender started");
traceDebug("PacketSender thread started"); // THREADDEBUG
Thread.currentThread().setName("Ivy Packet sender");
try {
broadcast.send(packet);
} catch (InterruptedIOException e) {
// somebody interrupts my IO. Thread, do nothing.
System.out.println(e.bytesTransferred+" bytes transferred anyway, out of " + data.length());
e.printStackTrace();
traceDebug("IO interrupted during the broadcast. Do nothing");
} catch ( IOException e ) {
System.out.println("Broadcast Error " + e.getMessage()+" continuing anyway");
// cannot throw new IvyException in a run ...
e.printStackTrace();
}
try { Thread.sleep(100); } catch (InterruptedException ie ){ }
traceDebug("PacketSender thread stopped"); // THREADDEBUG
bus.popThread("packet sender finished"); // one of the senders has finished its work, plus extra time
}
}
synchronized void doStart() throws IvyException {
// String hello = Ivy.PROTOCOLVERSION + " " + bus.getAppPort() + "\n";
String hello = Protocol.PROTOCOLVERSION + " " + bus.getAppPort() + " "+busWatcherId+" "+bus.getSelfIvyClient().getApplicationName()+"\n";
if (broadcast==null) throw new IvyException("IvyWatcher PacketSender null broadcast address");
bus.getPool().execute(this);
new PacketSender(hello,bus); // notifies our arrival on each domain: protocol version + port
}
/*
* since 1.2.7 pre ....
* went local instead of static ! fixed a nasty bug in 1.2.8
* checks if there is already a broadcast received from the same address
* on the same port
*
*/
private Map alreadySocks=Collections.synchronizedMap(new HashMap());
private boolean alreadyBroadcasted(String s,int port) {
// System.out.println("DEBUUUUUUUG " + s+ ":" + port);
if (s==null) return false;
synchronized (alreadySocks) {
Integer i = alreadySocks.get(s);
if (((i!=null)&&(i.compareTo(port))==0)) return true;
alreadySocks.put(s,port);
return false;
}
}
private void traceDebug(String s){
if (debug) System.out.println("-->IvyWatcher["+myserial+","+bus.getSerial()+"]<-- "+s);
}
static {
try {
recoucou = Pattern.compile("([0-9]+) ([0-9]+) ([^ ]*) (.*)",Pattern.DOTALL);
} catch (PatternSyntaxException res) {
res.printStackTrace();
System.out.println("Regular Expression bug in Ivy source code ... bailing out");
throw new RuntimeException();
}
}
} // class IvyWatcher