d = Domain.parseDomains(db);
if (d.size() == 0) throw new IvyException("no domain found in " + db);
watcherId = getWBUId().replace(' ' , '*'); // no space in the watcherId
// readies the rendezvous : an IvyWatcher (thread) per domain bus
for (Domain dom: d) watchers.add(new IvyWatcher(this , dom.getDomainaddr() , dom.getPort()));
keeprunning = true ;
pool.execute(this);
// sends the broadcasts and listen to incoming connexions
for (IvyWatcher iw: watchers) iw.doStart();
}
/**
 * Blocks the calling thread for as long as {@code nbThreads} is positive,
 * sleeping {@code GRACEDELAY} between two checks.
 *
 * If the thread is interrupted while sleeping, the wait is abandoned and
 * the interrupt status is restored so that callers can still observe it.
 *
 * @param s label of the operation about to be performed, used only in
 * debug traces
 */
private void waitForRemote(String s) {
    try {
        while (nbThreads > 0) {
            traceDebug("I'm waiting before "+s+", a connecting tread is in progress");
            Thread.sleep(GRACEDELAY);
            traceDebug("I'm done waiting before "+s);
        }
    } catch (InterruptedException ie) {
        // give up waiting, but do not hide the interruption from the caller
        Thread.currentThread().interrupt();
    }
}
/**
 * Disconnects from the Ivy bus.
 *
 * Waits for connecting threads (see waitForRemote), then, unless the bus is
 * already stopped: marks it stopped, closes the server socket, stops and
 * forgets every IvyWatcher, closes every remaining IvyClient and finally
 * shuts the thread pool down.
 *
 * A failure to close the server socket is traced but no longer prevents
 * the watchers and the clients from being stopped.
 */
public final void stop() {
    waitForRemote("stopping");
    if (stopped) return;
    stopped = true;
    keeprunning = false;
    traceDebug("beginning stopping");
    // closing the server socket, on its own so that a failure here does
    // not skip the rest of the shutdown sequence
    try {
        synchronized (lockApp) { app.close(); }
    } catch (IOException e) {
        traceDebug("IOexception Stop ");
    }
    try {
        // stopping the IvyWatchers
        for (IvyWatcher iw : watchers) iw.doStop();
        watchers.clear();
        // stopping the remaining IvyClients
        synchronized (clients) {
            for (IvyClient c : clients.values()) {
                if (c != null) c.close(true);
            }
        }
    } catch (IOException e) {
        traceDebug("IOexception Stop ");
    }
    pool.shutdown();
    traceDebug("end stopping");
}
/**
 * Enables or disables the delivery of our own messages to ourselves.
 * When enabled, sendMsg first sends to the remote clients and then to
 * our own subscriptions.
 *
 * @param b true to also receive the messages we send; the documented
 * default is false
 * @since 1.2.4
 */
public final void sendToSelf(final boolean b) {
    this.doSendToSelf = b;
}
/**
 * Tells whether messages sent on this bus are also delivered to ourselves.
 *
 * @return the current value of the send-to-self flag
 * @since 1.2.4
 */
public final boolean isSendToSelf() {
    return this.doSendToSelf;
}
/**
 * Accessor for the selfIvyClient.
 *
 * @return our selfIvyClient
 * @since 1.2.4
 */
public final SelfIvyClient getSelfIvyClient() {
    return this.selfIvyClient;
}
/**
 * Toggles the encoding/decoding of messages, to prevent bugs related to
 * the presence of a "\n" in a message.
 *
 * When enabled, sendMsg encodes each message with IvyClient.encode before
 * sending it; every receiver then has to decode newlines. The original
 * documentation names the default escape character as "ESC 0x1A".
 *
 * @param b true to enforce the encoding of newlines; the documented
 * default is false
 * @since 1.2.5
 */
public final void protectNewlines(final boolean b) {
    this.doProtectNewlines = b;
}
/**
 * Performs a pattern matching according to everyone's regexps, and sends
 * the results to the relevant Ivy agents.
 *
 * Synchronized since 1.2.16 to avoid concurrent access.
 *
 * @param message a String which will be compared to the regular
 * expressions of the different clients
 * @return the number of messages actually sent
 * @throws IvyException if newline protection is off and the message
 * contains Protocol.NEWLINE or Protocol.ENDARG, or if there is a problem
 * sending the message
 */
synchronized public final int sendMsg(final String message) throws IvyException {
    waitForRemote("sending");
    int sent = 0;
    synchronized (lock) {
        traceDebug("sending "+message);
        String payload = message;
        if (doProtectNewlines) {
            payload = IvyClient.encode(message);
        } else if (payload.indexOf(Protocol.NEWLINE) >= 0 || payload.indexOf(Protocol.ENDARG) >= 0) {
            throw new IvyException("newline character not allowed in Ivy messages");
        }
        // remote clients first ...
        synchronized (clients) {
            for (IvyClient peer : clients.values()) {
                if (peer == null) continue;
                sent += peer.sendMsg(payload);
            }
        }
        // ... then ourselves, if requested
        if (doSendToSelf) {
            sent += selfIvyClient.sendSelfMsg(payload);
        }
        traceDebug("end sending "+message+" to "+sent+" clients");
    }
    return sent;
}
/**
 * Subscribes to a regular expression, as a normal (BindType.NORMAL) bind.
 *
 * The callback will be executed with the captured groups of the regexp as
 * arguments when a matching message is sent by another agent. A program
 * doesn't receive its own messages.
 *
 * Example: the Ivy agent A performs {@code b.bindMsg("^Hello (.*)",cb);}
 * and the Ivy agent B performs {@code b2.sendMsg("Hello world");}. A
 * thread in A will then run the callback cb with its second argument set
 * to an array of String with one single element, "world".
 *
 * @param sregexp a perl regular expression, groups are done with parenthesis
 * @param callback any object implementing the IvyMessageListener interface
 * @throws IvyException if there is a problem in the binding, be it regexp
 * or network
 * @return the id of the regular expression
 */
public final int bindMsg(final String sregexp , final IvyMessageListener callback ) throws IvyException {
    final int regexpId = bindMsg(sregexp , callback , BindType.NORMAL);
    return regexpId;
}
/**
 * Subscribes to a regular expression with asynchronous callback execution.
 *
 * Same as bindMsg, with the bind type chosen by the caller. WARNING: with
 * asynchronous execution there is no way to predict the order of execution
 * of the callbacks, i.e. a message received might trigger a callback
 * before another one sent earlier.
 *
 * @since 1.2.4
 * @param sregexp a perl compatible regular expression, groups are done
 * with parenthesis
 * @param callback any object implementing the IvyMessageListener interface
 * @param type if set to NORMAL, it's a normal bind; if it's ASYNC, the
 * callback will be run in a newly spawned thread (heavy on resources); if
 * it's SWING, the callback will be deferred to the Swing Event Dispatch
 * Thread
 * @throws IvyException if there is a problem binding (network, regexp...)
 * @return the int ID of the regular expression
 */
public final int bindAsyncMsg(final String sregexp, final IvyMessageListener callback, BindType type ) throws IvyException {
    final int regexpId = bindMsg(sregexp , callback , type);
    return regexpId;
}
/**
 * Subscribes to a regular expression with an explicit bind type.
 *
 * The regexp is first registered in our selfIvyClient, then advertised to
 * every connected client. The callback will be executed with the captured
 * groups of the regexp as arguments when a matching message is sent by
 * another agent. A program doesn't receive its own messages, except if
 * sendToSelf() is set to true.
 *
 * Example: the Ivy agent A performs {@code b.bindMsg("^Hello (.*)",cb);}
 * and the Ivy agent B performs {@code b2.sendMsg("Hello world");}. A
 * thread in A will then run the callback cb with its second argument set
 * to an array of String with one single element, "world".
 *
 * @since 1.2.4
 * @param sregexp a perl regular expression, groups are done with parenthesis
 * @param callback any object implementing the IvyMessageListener interface
 * @param type if NORMAL (default) it's a normal bind, if ASYNC, each
 * callback will be run in a separate thread, if SWING, the callback will
 * be deferred to the Swing Event Dispatch Thread
 * @throws IvyException if there is a problem binding (regexp, network)
 * @return the id of the regular expression
 */
public final int bindMsg(final String sregexp , final IvyMessageListener callback , final BindType type ) throws IvyException {
    // local registration comes first: it provides the id of the regexp
    final int regexpId = selfIvyClient.bindMsg(sregexp , callback , type);
    // then every connected client is told about the new regexp
    synchronized (clients) {
        for (IvyClient peer : clients.values()) {
            if (peer != null) {
                peer.sendRegexp(regexpId , sregexp);
            }
        }
    }
    return regexpId;
}
/**
 * Subscribes to a regular expression for one time only, useful for
 * requests, in conjunction with getWBUId().
 *
 * The callback is wrapped in a Once listener, which unbinds the regexp
 * and forwards the message the first time it is received.
 *
 * @since 1.2.8
 * @param sregexp a perl regular expression, groups are done with parenthesis
 * @param callback any object implementing the IvyMessageListener interface
 * @throws IvyException if there is a problem during the binding
 * @return the id of the regular expression
 */
public final int bindMsgOnce(final String sregexp, final IvyMessageListener callback ) throws IvyException {
    final Once oneShot = new Once(callback);
    final int regexpId = bindMsg(sregexp , oneShot);
    // the wrapper needs the id to be able to unbind itself
    oneShot.setRegexpId(regexpId);
    return regexpId;
}
/**
 * Unsubscribes a regular expression using the id provided at bind time.
 * The regexp is removed from our selfIvyClient first, then every
 * connected client is asked to delete it.
 *
 * @param id the id of the regular expression, returned when it was bound
 * @throws IvyException if the id is not valid anymore
 */
public final void unBindMsg(final int id) throws IvyException {
    selfIvyClient.unBindMsg(id);
    synchronized (clients) {
        for (IvyClient peer : clients.values()) {
            if (peer == null) continue;
            peer.delRegexp(id );
        }
    }
}
/**
 * Unsubscribes a regular expression based on its string. The work is
 * entirely delegated to our selfIvyClient.
 *
 * @param re the string for the regular expression
 * @return a boolean, true if the regexp existed, false otherwise or
 * whenever an exception occurred during unbinding
 */
public final boolean unBindMsg(final String re) {
    final boolean existed = selfIvyClient.unBindMsg(re);
    return existed;
}
/**
 * Adds a bind listener to a bus.
 *
 * @param callback is an object implementing the IvyBindListener interface
 * @return the id of the bind listener (its index in the listener list),
 * useful if you wish to remove it later
 * @since 1.2.4
 */
public final int addBindListener(final IvyBindListener callback){
    ivyBindListenerList.addElement(callback);
    final int listenerId = ivyBindListenerList.indexOf(callback);
    return listenerId;
}
/**
 * Removes a bind listener.
 *
 * @param id the id of the bind listener to remove, used as an index in
 * the listener list
 * @throws IvyException if id is not known
 * @since 1.2.4
 */
public final void removeBindListener(final int id) throws IvyException {
    try {
        ivyBindListenerList.removeElementAt(id);
    } catch (ArrayIndexOutOfBoundsException aie) {
        final String complaint = id + " is not a valid Id";
        throw new IvyException(complaint);
    }
}
/**
 * Adds an application listener to a bus.
 *
 * @param callback is an object implementing the IvyApplicationListener
 * interface
 * @return the id of the application listener (its index in the listener
 * list), useful if you wish to remove it later
 */
public synchronized final int addApplicationListener(final IvyApplicationListener callback){
    ivyApplicationListenerList.addElement(callback);
    final int listenerId = ivyApplicationListenerList.indexOf( callback );
    return listenerId;
}
/**
 * Removes an application listener.
 *
 * @param id the id of the application listener to remove, used as an
 * index in the listener list
 * @throws IvyException if there is no such id
 */
public synchronized final void removeApplicationListener(final int id) throws IvyException {
    try {
        ivyApplicationListenerList.removeElementAt(id);
    } catch (ArrayIndexOutOfBoundsException aie) {
        final String complaint = id + " is not a valid Id";
        throw new IvyException(complaint);
    }
}
/**
 * Sets the filter expression. A private copy of the array is kept.
 *
 * Once this filter is set, when a client subscribes to a regexp of the
 * form "^dummystring...", there is a check against the filter list (see
 * checkRegexp). If no keyword is found to match, the binding is just
 * ignored.
 *
 * @param f the extensive list of strings beginning the messages
 * @since 1.2.9
 * @throws IvyException if a filter is already set or the bus is already
 * started
 */
public final synchronized void setFilter(final String[] f) throws IvyException {
    if (filter != null) {
        throw new IvyException("only one filter can be set");
    }
    if (!stopped) {
        throw new IvyException("cannot set a filter on a bus that's already started");
    }
    // defensive copy: later changes to the caller's array must not leak in
    filter = f.clone();
}
static {
// compiles the static regexps
try {
rangeRE = Pattern.compile("(\\d+)-(\\d+)"); // tcp range min and max
bounded = Pattern.compile("^\\^([a-zA-Z0-9_-]+).*");
} catch ( PatternSyntaxException res ) {
res.printStackTrace();
System.out.println("Regular Expression bug in Ivy source code ... bailing out");
}
}
/**
 * Checks the "validity" of a regular expression if a filter has been set.
 *
 * A regexp is accepted when no filter is set, when it is not "bounded"
 * (it does not start with '^' followed by a keyword made of letters,
 * digits, '_' or '-'), or when its leading keyword is one of the strings
 * of the filter.
 *
 * @since 1.2.9
 * @param exp a string regular expression
 * @return true if the regexp is accepted, false if it is bounded and its
 * keyword is not in the filter
 */
public final boolean checkRegexp(final String exp) {
    // there's no message filter
    if (filter == null) {
        return true;
    }
    final Matcher boundedMatch = bounded.matcher(exp);
    // the regexp is not bounded
    if (!boundedMatch.matches()) {
        return true;
    }
    // the regexp is bounded: its keyword *must* be in the filter
    final String keyword = boundedMatch.group(1);
    for (String allowed : filter) {
        if (allowed.equals(keyword)) {
            return true;
        }
    }
    return false;
}
// A private listener wrapper used by bindMsgOnce, to ensure that the
// wrapped callback is executed once, and only once.
private class Once implements IvyMessageListener {
    private boolean received = false;
    private int id = -1;
    private IvyMessageListener ocallback = null;

    Once(final IvyMessageListener callback){
        ocallback = callback;
    }

    // records the id of the regexp, needed to unbind it upon reception
    synchronized void setRegexpId(final int fid){
        id = fid;
    }

    // Synchronized on this Once instance because it will most likely be
    // called concurrently, and the callback must not execute twice.
    public synchronized void receive(final IvyClient ic , final String[] args){
        // already delivered
        if (received) return;
        // nothing to forward to
        if (ocallback == null) return;
        // the regexp id has not been recorded yet
        if (id == -1) return;
        received = true;
        try {
            Ivy.this.unBindMsg(id);
        } catch (IvyException ie) {
            ie.printStackTrace();
        }
        ocallback.receive(ic , args);
    }
}
/**
 * Invokes the application listeners upon arrival of a new Ivy client.
 *
 * @param client the client which has just connected
 */
protected synchronized final void clientConnects(final IvyClient client){
    for (IvyApplicationListener listener : ivyApplicationListenerList) {
        listener.connect(client);
    }
}
/**
 * Invokes the application listeners upon the departure of an Ivy client.
 *
 * @param client the client which has just left
 */
protected synchronized final void clientDisconnects(final IvyClient client){
    for (IvyApplicationListener listener : ivyApplicationListenerList) {
        listener.disconnect(client);
    }
}
/**
 * Invokes bindPerformed on every bind listener.
 *
 * @param client the client passed on to the listeners
 * @param id the id of the regexp
 * @param sregexp the text of the regexp
 */
protected final void regexpReceived(final IvyClient client , final int id , final String sregexp){
    for (IvyBindListener listener : ivyBindListenerList) {
        listener.bindPerformed(client , id , sregexp);
    }
}
/**
 * Invokes unbindPerformed on every bind listener.
 *
 * @param client the client passed on to the listeners
 * @param id the id of the regexp
 * @param sregexp the text of the regexp
 */
protected final void regexpDeleted(final IvyClient client , final int id , final String sregexp){
    for (IvyBindListener listener : ivyBindListenerList) {
        listener.unbindPerformed(client , id , sregexp);
    }
}
/**
 * Invokes the application listeners when we are summoned to die.
 * This method only notifies the listeners; it does not stop the bus
 * itself.
 *
 * @param client the client passed on to the listeners
 * @param id the id passed on to the listeners
 * @param message the message passed on to the listeners
 */
protected synchronized final void dieReceived(final IvyClient client , final int id , final String message){
    for (IvyApplicationListener listener : ivyApplicationListenerList) {
        listener.die(client , id , message);
    }
}
/**
 * Invokes the direct message callbacks of the application listeners.
 *
 * @param client the client passed on to the listeners
 * @param id the id passed on to the listeners
 * @param msgarg the content of the direct message
 */
protected synchronized final void directMessage(final IvyClient client , final int id , final String msgarg ){
    for (IvyApplicationListener listener : ivyApplicationListenerList) {
        listener.directMessage(client , id, msgarg);
    }
}
/**
 * Gives a list of IvyClient at a given instant.
 *
 * The returned collection is a fresh copy built while holding the lock
 * on the clients map; null entries are left out.
 *
 * @return a collection of IvyClients
 */
public final Collection<IvyClient> getIvyClients() {
    Collection<IvyClient> snapshot = new ArrayList<IvyClient>();
    synchronized (clients) {
        for (IvyClient ic : clients.values()) {
            if (ic != null) snapshot.add(ic);
        }
    }
    return snapshot;
}
/**
 * Gives a list of IvyClient with the name given in parameter.
 *
 * Entries that are null or that have no application name are skipped;
 * they no longer interrupt the search, so clients placed after them in
 * the map are still found.
 *
 * @param name The name of the Ivy agent you're looking for
 * @return a collection of the IvyClients bearing that name, possibly empty
 */
public final Collection<IvyClient> getIvyClientsByName(final String name) {
    Collection<IvyClient> matches = new ArrayList<IvyClient>();
    synchronized (clients) {
        for (IvyClient ic : clients.values()) {
            if (ic == null) continue;
            String icname = ic.getApplicationName();
            // skip this client only, not the rest of the map
            if (icname == null) continue;
            if (icname.equals(name)) matches.add(ic);
        }
    }
    return matches;
}
/**
 * Returns a "wanna be unique" ID to make requests on the bus.
 *
 * The ID is built from the application name, the serial of this bus, the
 * next value of an internal counter and a random integer.
 *
 * @since 1.2.8
 * @return a string which is meant to be noisy enough to be unique
 */
public final String getWBUId() {
    final StringBuilder wbuId = new StringBuilder();
    wbuId.append("ID<").append(appName).append(myserial);
    wbuId.append(":").append(nextId());
    wbuId.append(":").append(generator.nextInt());
    wbuId.append(">");
    return wbuId.toString();
}
/**
 * Describes this bus by its application name, port and serial.
 *
 * @return a string of the form {@code bus <name>[port:P,serial:S]}
 */
@Override public String toString() {
    final StringBuilder description = new StringBuilder("bus <");
    description.append(appName).append(">[port:").append(applicationPort);
    description.append(",serial:").append(myserial).append("]");
    return description.toString();
}
// hands out the current value of the counter, then increments it
private synchronized long nextId() {
    final long issued = current++;
    return issued;
}
/////////////////////////////////////////////////////////////////:
//
// Protected methods
//
/////////////////////////////////////////////////////////////////:
/**
 * Creates an IvyClient for the given socket and hands it over to the
 * thread pool.
 *
 * @param s the socket handed to the new IvyClient
 * @param port passed as is to the IvyClient constructor
 * @param domachin passed as is to the IvyClient constructor
 * @return false if the pool rejected the client (it has been shut down in
 * another thread), true otherwise
 * @throws IOException declared for failures while creating the IvyClient
 */
protected boolean createIvyClient(Socket s , int port, boolean domachin) throws IOException {
    final IvyClient newcomer = new IvyClient(this , s , port , domachin);
    boolean accepted = true;
    try {
        pool.execute(newcomer);
    } catch (RejectedExecutionException ree) {
        // in another thread, the pool is shut down
        traceDebug("in another thread, the pool is shut down");
        accepted = false;
    }
    return accepted;
}
/**
 * Removes a client from the clients map, using its client key.
 *
 * @param c the client to remove
 */
protected synchronized void removeClient(IvyClient c) {
    synchronized(lock) {
        synchronized (clients) {
            clients.remove(c.getClientKey());
        }
        final String trace = "removed " + c + " from clients: " + getClientNames(clients);
        traceDebug(trace);
    }
}
/**
 * Promotes a client from the "half" map to the clients map.
 *
 * The client is first removed from the half map. It is then registered
 * under its client key, unless searchPeer finds an equivalent client
 * already registered and {@code peer.distanceTo(c)} is not positive; in
 * that case the connection is considered a double one and {@code c} is
 * closed instead.
 *
 * @param c the client to register; a null value only triggers the (no-op)
 * removal from the half map
 */
protected synchronized void handShake(IvyClient c) {
    synchronized(lock) {
        removeHalf(c);
        if (clients == null||c == null) return;
        // TODO check if it's not already here !
        IvyClient peer = searchPeer(c);
        if ((peer == null) || peer.distanceTo(c)>0 ){
            synchronized (clients) {
                clients.put(c.getClientKey() , c);
            }
            traceDebug("added " + c + " in clients: " + getClientNames(clients));
        } else {
            traceDebug("not adding "+c+" in clients, double connexion detected, removing lowest one");
            try {
                c.close(false);
            } catch (IOException ioe) {
                // nothing more can be done for this connexion, but leave a trace
                traceDebug("could not close the double connexion " + c + ": " + ioe.getMessage());
            }
        }
    }
}
/**
 * Registers a client in the "half" map, under its client key.
 *
 * @param c the client to register
 */
protected synchronized void addHalf(IvyClient c) {
    synchronized (half) {
        half.put(c.getClientKey() , c);
    }
    final String trace = "added " + c + " in half: " + getClientNames(half);
    traceDebug(trace);
}
/**
 * Removes a client from the "half" map, using its client key. Does
 * nothing when the map or the client is null.
 *
 * @param c the client to remove, may be null
 */
protected synchronized void removeHalf(IvyClient c) {
    final boolean nothingToRemove = (half == null) || (c == null);
    if (nothingToRemove) {
        return;
    }
    synchronized (half) {
        half.remove(c.getClientKey());
    }
    final String trace = "removed " + c + " from half: " + getClientNames(half);
    traceDebug(trace);
}
/**
 * Looks in the clients map for a registered client for which
 * {@code myEquals(ic)} holds.
 *
 * @param ic the client to compare the registered clients with
 * @return the first matching registered client, or null if there is none
 */
private synchronized IvyClient searchPeer(IvyClient ic) {
    IvyClient found = null;
    synchronized (clients) {
        for (IvyClient candidate : clients.values()) {
            if (candidate != null && candidate.myEquals(ic)) {
                found = candidate;
                break;
            }
        }
    }
    return found;
}
/**
 * The service socket thread reader main loop.
 *
 * Accepts incoming connections on the server socket and creates an
 * IvyClient for each of them, until the bus is stopped, {@code keeprunning}
 * is cleared, or a client cannot be handed to the pool. An IOException
 * received while the bus is still supposed to run is reported and
 * rethrown as a RuntimeException carrying the original exception as its
 * cause.
 */
public void run() {
    traceDebug("service thread started"); // THREADDEBUG
    serverThread = Thread.currentThread();
    serverThread.setName("Ivy TCP server Thread");
    Socket socket = null;
    while ( keeprunning ){
        try {
            synchronized (this) {
                if ( (!keeprunning) || stopped ) break; // early disconnexion
            }
            synchronized (lockApp) {
                socket = app.accept(); // TODO I can't synchronize on (this) in the run
            }
            synchronized (this) {
                if ( (!keeprunning) || stopped ) break; // early disconnexion
                // the peer called me
                if ( ! createIvyClient(socket , 0 , true) ) break;
            }
        } catch (InterruptedIOException ie) {
            // the accept was interrupted: leave only if we were asked to stop
            if ( !keeprunning ) break;
        } catch( IOException e ) {
            if ( keeprunning ) {
                traceDebug("Error IvyServer exception: " + e.getMessage());
                System.out.println("Ivy server socket reader caught an exception " + e.getMessage());
                System.out.println("this is probably a bug in your JVM ! (e.g. blackdown jdk1.1.8 linux)");
                // keep the original exception so that the failure can be diagnosed
                throw new RuntimeException("Ivy server socket reader caught an exception", e);
            } else {
                traceDebug("my server socket has been closed");
            }
        }
    }
    traceDebug("service thread stopped"); // THREADDEBUG
}
/** @return the application name of this bus */
String getAppName() {
    return this.appName;
}
/** @return the application port of this bus */
int getAppPort() {
    return this.applicationPort;
}
/** @return the ready message of this bus */
String getReadyMessage() {
    return this.ready_message;
}
/** @return true if the encoding of newlines is enforced, see protectNewlines */
boolean getProtectNewlines() {
    return this.doProtectNewlines;
}
/** @return the watcher id of this bus */
String getWatcherId() {
    return this.watcherId;
}
/** @return the buffer size configured for this bus */
int getBufferSize() {
    return this.bufferSize;
}
/** @return the serial of this bus */
int getSerial() {
    return this.myserial;
}
/** @return the executor service used by this bus to run its tasks */
ExecutorService getPool() {
    return this.pool;
}
/**
 * Increments the counter of threads in progress, under the readyToSend
 * lock. waitForRemote blocks while this counter is positive.
 *
 * @param reason a free-form label, currently unused
 */
protected void pushThread(String reason) {
    synchronized(readyToSend) {
        nbThreads += 1;
    }
}
/**
 * Decrements the counter of threads in progress, under the readyToSend
 * lock. waitForRemote blocks while this counter is positive.
 *
 * @param reason a free-form label, currently unused
 */
protected void popThread(String reason) {
    synchronized(readyToSend) {
        nbThreads -= 1;
    }
}
/**
 * Prints a debug trace on the standard output, prefixed with the serial
 * of this bus, when the debug flag is set.
 *
 * @param s the text of the trace
 */
private void traceDebug(String s){
    if (!debug) {
        return;
    }
    System.out.println("-->Ivy[" + myserial + "]<-- " + s);
}
/**
 * A small private method for debugging purposes: lists the application
 * names of the non-null clients of a map, as {@code (name1,name2,)}.
 *
 * The map is locked while it is iterated.
 *
 * @param t a map whose values are IvyClients
 * @return the comma-terminated names, between parentheses
 */
private String getClientNames(Map<?, ? extends IvyClient> t) {
    StringBuilder names = new StringBuilder("(");
    synchronized (t) {
        for (IvyClient ic : t.values()) {
            if (ic != null) names.append(ic.getApplicationName()).append(",");
        }
    }
    return names.append(")").toString();
}
}