aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--java.mk4
-rw-r--r--src/Ghost.java21
-rw-r--r--src/ProxyClient.java77
-rw-r--r--src/ProxyMaster.java63
-rw-r--r--src/Puppet.java121
5 files changed, 212 insertions, 74 deletions
diff --git a/java.mk b/java.mk
index c1840c5..a847e2c 100644
--- a/java.mk
+++ b/java.mk
@@ -8,14 +8,14 @@
# 1.2.8 first
#
- GNUPATH = /usr/share/java/gnu-getopt.jar:/usr/share/java/regexp.jar # debian SID
+ GNUPATH =/usr/share/java/gnu-getopt.jar:/usr/share/java/regexp.jar
#GNUPATH = ../bundle # on my MAC
#GNUPATH = /usr/lib/jdk1.1/lib/classes.zip:/usr/share/java/gnu.getopt.jar:/usr/share/java/regexp.jar # debian woody
#GNUPATH = ${HOME}/java/Jars/gnu.getopt.jar:${HOME}/java/Jars/regexp.jar # Others
#RTPATH = /usr/local/jdk1.5.0/jre/lib/rt.jar # for jikes on my box
#RTPATH = /usr/local/jdk118_v3/lib/classes.zip:/home/jestin/java/Jars/swingall.jar # for 1.1.8 on my box
-CLASSPATH = -classpath .:$(GNUPATH)
+CLASSPATH = -classpath $(GNUPATH):.
#JAVA = kaffe
JAVA = java
diff --git a/src/Ghost.java b/src/Ghost.java
index fd7465d..f8f76cf 100644
--- a/src/Ghost.java
+++ b/src/Ghost.java
@@ -18,25 +18,30 @@ class Ghost extends IvyClient {
private String id; // given to the Proxy by the Master
private ProxyClient pc;
- Ghost(Ivy bus, Socket socket,int remotePort,boolean incoming,String id,ProxyClient pc) throws IOException {
+ Ghost(Ivy bus, Socket socket,int remotePort,boolean incoming,String id,ProxyClient pc)
+ throws IOException {
super(bus,socket,remotePort,incoming);
this.id=id;
this.pc=pc;
- System.out.println("new Ghost: "+id+" pc:"+pc);
+ System.out.println("Ghost["+id+"] created");
}
// ProxyClient -> Ghost -> Ivy Bus
- protected synchronized void sendBuffer( String buffer ) throws IvyException {
- // System.out.println("out buffer: "+buffer+" for:"+pc+" id:"+id);
- super.sendBuffer(buffer); // and to all the agents on the Ghost bus ? I'm not sure
+ protected synchronized void sendBuffer( String s ) throws IvyException {
+ System.out.println("Ghost["+id+"] sending ["+s+"]");
+ super.sendBuffer(s); // and to all the agents on the Ghost bus ? I'm not sure
}
// Bus -> Ghost -> ProxyClient -> ProxyMaster -> other buses
protected boolean newParseMsg(String s) throws IvyException {
// I received a message from an agent on the bus
- System.out.println("in buffer to forward from "+id+": "+s);
- if (pc!=null) pc.forward(id,s); // forward to all the puppets
- return super.newParseMsg(s); // I'll take no action
+ if (pc!=null) {
+ System.out.println("Ghost["+id+"] forwarding ["+s+"]");
+ pc.forwardPuppet(id,s); // forward to all the puppets
+ } else {
+ System.out.println("Warning, Ghost ["+id+"] could not forward ["+s+"] to null pc");
+ }
+ return super.newParseMsg(s); // I'm a normal Ivy citizen
}
}
diff --git a/src/ProxyClient.java b/src/ProxyClient.java
index d6c6932..9eb1e41 100644
--- a/src/ProxyClient.java
+++ b/src/ProxyClient.java
@@ -23,9 +23,9 @@ public class ProxyClient extends Ivy {
private BufferedReader in;
private boolean isRunning=false;
private static boolean debug = (System.getProperty("IVY_DEBUG")!=null) ;
- private volatile Thread clientThread;// volatile to ensure the quick communication
+ private volatile Thread clientThread; // volatile to ensure the quick communication
private Hashtable id=new Hashtable();
- private Vector ghosts = new Vector();
+ private Hashtable ghosts = new Hashtable();
private Hashtable puppets =new Hashtable(); // key=id value=Puppet
String domain=null;
@@ -72,9 +72,6 @@ public class ProxyClient extends Ivy {
String hostname="localhost";
try {
ProxyClient pc = new ProxyClient(hostname,servicePort,domain);
- if (!quiet) System.out.println("broadcasting on "+pc.domains(domain));
- if (!quiet) System.out.println("contacting tcp:"+hostname+":"+servicePort);
- System.out.println("client running...");
} catch (IvyException ie) {
System.out.println("error, can't connect to Ivy");
ie.printStackTrace();
@@ -87,31 +84,35 @@ public class ProxyClient extends Ivy {
}
public ProxyClient(String hostname,int servicePort,String domain) throws IOException, IvyException {
- super(name,name+" ready",null);
- clientSocket = new Socket(hostname,servicePort) ;
+ super(name,name+" ready",null); // I will join the bus
+ System.out.println("PC contacting tcp:"+hostname+":"+servicePort);
+ clientSocket = new Socket(hostname,servicePort) ; // contacting hostname:servicePort
in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
out = new PrintWriter(new OutputStreamWriter(clientSocket.getOutputStream()));
- clientThread=new Thread(new Servicer());
+ clientThread=new Thread(new Servicer()); //
clientThread.start();
this.domain=domain;
start(domain);
+ send("Hello bus="+domain);
}
RE getId=new RE("^ID id=(.*) value=(.*)");
- RE fwd=new RE("^Forward id=(.*) buffer=(.*)");
+ RE fwdGhost=new RE("^ForwardGhost id=(.*) buffer=(.*)");
+ RE fwdPuppet=new RE("^ForwardPuppet id=(.*) buffer=(.*)");
RE puppetRe=new RE("^CreatePuppet id=(.*)");
void parseMsg(String msg) {
- System.out.println("parsing "+msg);
+ // System.out.println("PC parsing "+msg);
if (getId.match(msg)) {
id.put(getId.getParen(1),getId.getParen(2));
- } else if (puppetRe.match(msg)) {
- // I must create a puppet
- puppets.put(puppetRe.getParen(1),new Puppet(this,domain));
- } else if (fwd.match(msg)) {
- // received a forward request
- // TODO give it to the puppet
- Puppet p = (Puppet)puppets.get(fwd.getParen(1));
- p.parse(fwd.getParen(2));
+ } else if (puppetRe.match(msg)) { // I must create a puppet
+ String puppetId = puppetRe.getParen(1);
+ puppets.put(puppetId,new Puppet(this,puppetId,domain));
+ } else if (fwdGhost.match(msg)) { // I must forward to the ghost
+ Ghost g = (Ghost)ghosts.get(fwdGhost.getParen(1));
+ try { g.sendBuffer(fwdGhost.getParen(2)); } catch( IvyException ie) { ie.printStackTrace(); }
+ } else if (fwdPuppet.match(msg)) { // I must forward to the puppet
+ Puppet p = (Puppet)puppets.get(fwdPuppet.getParen(1));
+ try { p.parse(fwdPuppet.getParen(2)); } catch( IvyException ie) { ie.printStackTrace(); }
} else {
System.out.println("unknown message "+msg);
}
@@ -136,28 +137,44 @@ public class ProxyClient extends Ivy {
System.exit(0);
}
traceDebug("Subreader Thread stopped");
- System.out.println("ProxyClient disconnected");
+ System.out.println("connexion to ProxyMaster lost");
+ for (Enumeration e=puppets.elements();e.hasMoreElements();) ((Puppet)e.nextElement()).stop();
stop(); // leave the bus TODO: make a disconnexion/reconnexion possible ?
}
}
+ void send(String s) { // sends a message to the proxyMaster
+ out.println(s);
+ out.flush();
+ }
+
/*
* here, I create a ghost instead of a normal Ivy Client, to catch the
* protocol and forward everything to the proxies.
* TODO: remember everything in case a new proxy client comes ?
*/
protected IvyClient createIvyClient(Socket s,int port, boolean domachin) throws IOException {
+ // TODO si c'est un puppet, je ne dois pas créer de Ghost
+ // voir même me déconnecter du biniou ?
+ for (Enumeration e=puppets.elements();e.hasMoreElements();) {
+ if (( ((Puppet)e.nextElement()).bus.getAP() == port ) && !domachin ) {
+ // this new Ivy agent is in fact one of my puppets ...
+ System.out.println("not Ghosting this (probable) Puppet Ivy agent");
+ return new IvyClient(this,s,port,domachin);
+ }
+ }
String key = getWBUId();
- System.out.println("asking for a key");
- out.println("GetID id="+key); // asks a centralized ID from ProxyMaster
- out.flush();
+ String ghostId;
+ send("GetID id="+key); // asks a centralized ID from ProxyMaster
try { // waits for the answer
- while (id.get(key)==null) { Thread.sleep(200); }
- } catch (InterruptedException ie) {
- ie.printStackTrace();
- }
- System.out.println("key received");
- return new Ghost(this,s,port,domachin,(String)id.get(key),this);
+ while ((ghostId=(String)id.get(key))==null) { Thread.sleep(200); }
+ Ghost g = new Ghost(this,s,port,domachin,ghostId,this);
+ ghosts.put(ghostId,g);
+ return g;
+ } catch (InterruptedException ie) { ie.printStackTrace(); }
+ System.out.println("error waiting");
+ System.exit(0);
+ return null;
}
/*
@@ -165,8 +182,8 @@ public class ProxyClient extends Ivy {
* ProxyMaster
*
*/
- protected void forward(String id,String buffer) {
- out.println("Forward id="+id+" buffer="+buffer);
+ protected void forwardPuppet(String id,String buffer) {
+ out.println("ForwardPuppet id="+id+" buffer="+buffer);
out.flush();
}
diff --git a/src/ProxyMaster.java b/src/ProxyMaster.java
index 09216ed..f80fa11 100644
--- a/src/ProxyMaster.java
+++ b/src/ProxyMaster.java
@@ -9,7 +9,8 @@
* changelog:
* 1.2.12
*/
-package fr.dgac.ivy ; // TODO go into sub tools, and build a shell/.BAT script
+package fr.dgac.ivy.tools ; // TODO go into sub tools, and build a shell/.BAT script
+import fr.dgac.ivy.* ;
import java.io.*;
import java.net.*;
import java.util.* ;
@@ -21,7 +22,9 @@ public class ProxyMaster {
private ServerSocket serviceSocket;
private boolean isRunning=false;
private static boolean debug=false;
+ private boolean doRun=true; // stops running when set to false
private Vector proxyClients = new Vector();
+ private Hashtable ghostFathers = new Hashtable(); // key: ghostId value: SubReader
private static int serial=0;
public static int DEFAULT_SERVICE_PORT = 3456 ;
@@ -81,22 +84,27 @@ public class ProxyMaster {
class SubReader extends Thread {
BufferedReader in;
PrintWriter out;
+ String hostname=null; // I will know from the socket
+ String busDomain=null; // I will know it from the Hello message
+
SubReader(Socket socket) throws IOException {
proxyClients.addElement(this);
- System.out.println("ProxyClient connected");
+ hostname = socket.getInetAddress().getHostName();
in=new BufferedReader(new InputStreamReader(socket.getInputStream()));
out=new PrintWriter(new OutputStreamWriter(socket.getOutputStream()));
start();
}
- public void send(String s) {
+
+ public void send(String s) { // sends a message to the SubReader peer ( a ProxyClient )
out.println(s);
out.flush();
}
+
public void run() {
traceDebug("Subreader Thread started");
String msg = null;
try {
- while (true) {
+ while (doRun) {
msg=in.readLine();
if (msg==null) break;
parseMsg(msg);
@@ -107,28 +115,47 @@ public class ProxyMaster {
System.exit(0);
}
traceDebug("Subreader Thread stopped");
- System.out.println("ProxyClient disconnected");
+ System.out.println("ProxyClient on "+hostname+", bus "+busDomain+" disconnected");
proxyClients.removeElement(this);
}
+ RE helloRE=new RE("^Hello bus=(.*)");
RE getId=new RE("^GetID id=(.*)");
- RE fwd=new RE("^Forward id=(.*) buffer=(.*)");
+ RE fwdPuppet=new RE("^ForwardPuppet id=(.*) buffer=(.*)");
+ RE fwdGhost=new RE("^ForwardGhost id=(.*) buffer=(.*)");
+
void parseMsg(String msg) {
- System.out.println("parsing "+msg);
- if (getId.match(msg)) {
- // a new Ghost has appeared
- System.out.println("a new Ghost has appeared");
- int newGhostId = serial++;
- out.println("ID id="+getId.getParen(1)+" value="+newGhostId);
- out.flush();
- // TODO create Puppets in each other ProxyClient
+ // System.out.println("PM parsing "+msg);
+ if (helloRE.match(msg)) {
+ busDomain = helloRE.getParen(1);
+ System.out.println("PC connected from "+hostname+", on the bus "+busDomain);
+ } else if (getId.match(msg)) {
+ // a new Ghost has appeared and requests an Id
+ System.out.println("PM registering a new Ghost");
+ String newGhostId = new Integer(serial++).toString();
+ // I give it its ID
+ send("ID id="+getId.getParen(1)+" value="+newGhostId);
+ ghostFathers.put(newGhostId,this); // remember the SubReader holding this Ghost
+ // I ask all other Clients to prepare a puppet
+ for (Enumeration e=proxyClients.elements();e.hasMoreElements();) {
+ SubReader sr = (SubReader)e.nextElement();
+ if (sr!=SubReader.this) {
+ // System.out.println("propagate CreatePuppet to "+sr.busDomain);
+ sr.send("CreatePuppet id="+newGhostId);
+ } else {
+ // System.out.println("won't propagate CreatePuppet to "+sr.busDomain);
+ }
+ }
+ } else if (fwdGhost.match(msg)) {
+ System.out.println("PM forwarding ["+msg+"] to its Ghost");
+ SubReader sr = (SubReader) ghostFathers.get(fwdGhost.getParen(1));
+ sr.send(msg);
+ } else if (fwdPuppet.match(msg)) {
+ System.out.println("PM forwarding ["+msg+"] to all other PCs");
for (Enumeration e=proxyClients.elements();e.hasMoreElements();) {
SubReader sr = (SubReader)e.nextElement();
- if (sr!=SubReader.this) sr.send("CreatePuppet id="+newGhostId);
+ if (sr!=SubReader.this) sr.send(msg);
}
- } else if (fwd.match(msg)) {
- System.out.println("forwarding "+msg);
- // TODO forward the message to all relevant puppets
} else {
System.out.println("error unknown message "+msg);
}
diff --git a/src/Puppet.java b/src/Puppet.java
index d122d52..e2a90ee 100644
--- a/src/Puppet.java
+++ b/src/Puppet.java
@@ -5,35 +5,124 @@ import java.io.*;
import java.util.*;
import org.apache.regexp.*;
-class Puppet extends Ivy {
+class Puppet {
- Hashtable bound = new Hashtable(); // clef: l'ID de la regexp choisi par le ghost, valeur: l'Int de l'abonnement local
- String myDomain;
+ // the mapping between Ghost regexp and local bus regexp numbers
+ Hashtable bound = new Hashtable(); // ghostID localID
+ Hashtable regexps = new Hashtable(); // ghostID textRegexp
+ String domain;
+ String appName;
ProxyClient pc;
+ String id;
+ boolean started;
+ PuppetIvy bus;
- Puppet(ProxyClient pc,String domain) {
- super("noname","noname not ready",null);
- myDomain=domain;
+ Puppet(ProxyClient pc,String id,String domain) {
+ this.domain=domain;
this.pc=pc;
+ this.id=id;
}
- void parse(String s){
- System.out.println("the puppet must parse "+s);
+ void sendGhost(String s) { pc.send("ForwardGhost id="+id+" buffer="+s); }
+
+ class ForwardMessenger implements IvyMessageListener {
+ String localId,ghostId;
+ public ForwardMessenger(String ghostId,String re) throws IvyException {
+ this.ghostId=ghostId;
+ this.localId = (new Integer(bus.bindMsg(re,ForwardMessenger.this))).toString();
+ bound.put(ghostId,localId);
+ }
+ public void receive(IvyClient ic,String args[]) {
+ String tosend = IvyClient.Msg+" "+ghostId+IvyClient.StartArg;
+ for (int i=0;i<args.length;i++) tosend+=args[i]+IvyClient.EndArg;
+ sendGhost(tosend);
+ }
+ } // ForwardMessenger
+
+ void addRegexp(String ghostId,String re) {
+ regexps.put(ghostId,re);
+ try {
+ if (started) new ForwardMessenger(ghostId,re);
+ } catch( IvyException ie) { ie.printStackTrace(); }
+ }
+
+ void removeRegexp(String ghostId) {
+ try {
+ bus.unBindMsg(Integer.parseInt((String)bound.remove(ghostId)));
+ } catch( IvyException ie) { ie.printStackTrace(); }
+ }
+
+ void stop() {
+ if (started) bus.stop();
}
- protected IvyClient createIvyClient(Socket s,int port, boolean domachin) throws IOException {
- return new PuppetClient();
+ // ivy forwarded protocol message
+ static final RE ivyProto = new RE("(\\d+) (\\d+)\\02(.*)");
+ void parse(String s) throws IvyException {
+ if (!ivyProto.match(s)) { System.out.println("Puppet error, can't parse "+s); return; }
+ int pcode=Integer.parseInt(ivyProto.getParen(1));
+ String pid=ivyProto.getParen(2);
+ String args=ivyProto.getParen(3);
+ trace("must parse code:"+pcode+" id:"+pid+" args:"+args);
+ switch (pcode) {
+ case IvyClient.AddRegexp: // the Ghost's peer subscribes to something
+ addRegexp(pid,args);
+ break;
+ case IvyClient.DelRegexp: // the Ghost's peer unsubscribes to something
+ removeRegexp(pid);
+ break;
+ case IvyClient.Bye: // the Ghost's peer disconnects gracefully
+ bus.stop();
+ // TODO end of the puppet ?
+ break;
+ case IvyClient.Die:
+ // the Ghost's peer wants to ... kill ProxyClient ?
+ break;
+ case IvyClient.Msg:
+ // the Ghost's peer sends a message to ProxyClient, with regard to one
+ // of our subscriptions
+ // TODO à qui le faire passer ?
+ break;
+ case IvyClient.SchizoToken:
+ appName = args;
+ bus = new PuppetIvy(appName,appName+" fakeready",null);
+ for (Enumeration e = regexps.keys();e.hasMoreElements();) {
+ String ghostId=(String)e.nextElement();
+ String re=(String)regexps.get(ghostId);
+ new ForwardMessenger(ghostId,re);
+ }
+ started=true;
+ trace("starting the bus on "+domain);
+ bus.start(domain);
+ break;
+ case IvyClient.Error:
+ case IvyClient.EndRegexp:
+ case IvyClient.DirectMsg:
+ case IvyClient.Ping:
+ case IvyClient.Pong:
+ default:
+ trace("unused Ivy protocol code "+pcode);
+ }
}
- class PuppetClient extends IvyClient {
- protected synchronized void sendBuffer( String buffer ) throws IvyException {
- super.sendBuffer(buffer); // and to all the agents on the Ghost bus ? I'm not sure
+ class PuppetIvy extends Ivy {
+ PuppetIvy(String name,String ready,IvyApplicationListener ial){super(name,ready,ial);}
+ protected IvyClient createIvyClient(Socket s,int port, boolean domachin) throws IOException {
+ return new PuppetIvyClient(PuppetIvy.this,s,port,domachin);
}
+ int getAP() {return applicationPort;}
+ }
+ class PuppetIvyClient extends IvyClient {
+ PuppetIvyClient(Ivy bus,Socket s,int port,boolean b) throws IOException { super(bus,s,port,b); }
+ protected synchronized void sendBuffer( String s ) throws IvyException {
+ super.sendBuffer(s); // and to all the agents on the Ghost bus ? I'm not sure
+ }
protected boolean newParseMsg(String s) throws IvyException {
- return super.newParseMsg(s);
+ return super.newParseMsg(s); // I'm a normal Ivy citizen
}
- } // class PuppetClient
+ }
-}
+ void trace(String s) { System.out.println("Puppet["+id+"] "+s);}
+}