diff options
-rw-r--r-- | java.mk | 4 | ||||
-rw-r--r-- | src/Ghost.java | 21 | ||||
-rw-r--r-- | src/ProxyClient.java | 77 | ||||
-rw-r--r-- | src/ProxyMaster.java | 63 | ||||
-rw-r--r-- | src/Puppet.java | 121 |
5 files changed, 212 insertions, 74 deletions
@@ -8,14 +8,14 @@ # 1.2.8 first # - GNUPATH = /usr/share/java/gnu-getopt.jar:/usr/share/java/regexp.jar # debian SID + GNUPATH =/usr/share/java/gnu-getopt.jar:/usr/share/java/regexp.jar #GNUPATH = ../bundle # on my MAC #GNUPATH = /usr/lib/jdk1.1/lib/classes.zip:/usr/share/java/gnu.getopt.jar:/usr/share/java/regexp.jar # debian woody #GNUPATH = ${HOME}/java/Jars/gnu.getopt.jar:${HOME}/java/Jars/regexp.jar # Others #RTPATH = /usr/local/jdk1.5.0/jre/lib/rt.jar # for jikes on my box #RTPATH = /usr/local/jdk118_v3/lib/classes.zip:/home/jestin/java/Jars/swingall.jar # for 1.1.8 on my box -CLASSPATH = -classpath .:$(GNUPATH) +CLASSPATH = -classpath $(GNUPATH):. #JAVA = kaffe JAVA = java diff --git a/src/Ghost.java b/src/Ghost.java index fd7465d..f8f76cf 100644 --- a/src/Ghost.java +++ b/src/Ghost.java @@ -18,25 +18,30 @@ class Ghost extends IvyClient { private String id; // given to the Proxy by the Master private ProxyClient pc; - Ghost(Ivy bus, Socket socket,int remotePort,boolean incoming,String id,ProxyClient pc) throws IOException { + Ghost(Ivy bus, Socket socket,int remotePort,boolean incoming,String id,ProxyClient pc) + throws IOException { super(bus,socket,remotePort,incoming); this.id=id; this.pc=pc; - System.out.println("new Ghost: "+id+" pc:"+pc); + System.out.println("Ghost["+id+"] created"); } // ProxyClient -> Ghost -> Ivy Bus - protected synchronized void sendBuffer( String buffer ) throws IvyException { - // System.out.println("out buffer: "+buffer+" for:"+pc+" id:"+id); - super.sendBuffer(buffer); // and to all the agents on the Ghost bus ? I'm not sure + protected synchronized void sendBuffer( String s ) throws IvyException { + System.out.println("Ghost["+id+"] sending ["+s+"]"); + super.sendBuffer(s); // and to all the agents on the Ghost bus ? I'm not sure } // Bus -> Ghost -> ProxyClient -> ProxyMaster -> other buses protected boolean newParseMsg(String s) throws IvyException { // I received a message from an agent on the bus - System.out.println("in buffer to forward from "+id+": "+s); - if (pc!=null) pc.forward(id,s); // forward to all the puppets - return super.newParseMsg(s); // I'll take no action + if (pc!=null) { + System.out.println("Ghost["+id+"] forwarding ["+s+"]"); + pc.forwardPuppet(id,s); // forward to all the puppets + } else { + System.out.println("Warning, Ghost ["+id+"] could not forward ["+s+"] to null pc"); + } + return super.newParseMsg(s); // I'm a normal Ivy citizen } } diff --git a/src/ProxyClient.java b/src/ProxyClient.java index d6c6932..9eb1e41 100644 --- a/src/ProxyClient.java +++ b/src/ProxyClient.java @@ -23,9 +23,9 @@ public class ProxyClient extends Ivy { private BufferedReader in; private boolean isRunning=false; private static boolean debug = (System.getProperty("IVY_DEBUG")!=null) ; - private volatile Thread clientThread;// volatile to ensure the quick communication + private volatile Thread clientThread; // volatile to ensure the quick communication private Hashtable id=new Hashtable(); - private Vector ghosts = new Vector(); + private Hashtable ghosts = new Hashtable(); private Hashtable puppets =new Hashtable(); // key=id value=Puppet String domain=null; @@ -72,9 +72,6 @@ public class ProxyClient extends Ivy { String hostname="localhost"; try { ProxyClient pc = new ProxyClient(hostname,servicePort,domain); - if (!quiet) System.out.println("broadcasting on "+pc.domains(domain)); - if (!quiet) System.out.println("contacting tcp:"+hostname+":"+servicePort); - System.out.println("client running..."); } catch (IvyException ie) { System.out.println("error, can't connect to Ivy"); ie.printStackTrace(); @@ -87,31 +84,35 @@ public class ProxyClient extends Ivy { } public ProxyClient(String hostname,int servicePort,String domain) throws IOException, IvyException { - super(name,name+" ready",null); - clientSocket = new Socket(hostname,servicePort) ; + super(name,name+" ready",null); // I will join the bus + System.out.println("PC contacting tcp:"+hostname+":"+servicePort); + clientSocket = new Socket(hostname,servicePort) ; // contacting hostname:servicePort in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream())); out = new PrintWriter(new OutputStreamWriter(clientSocket.getOutputStream())); - clientThread=new Thread(new Servicer()); + clientThread=new Thread(new Servicer()); // clientThread.start(); this.domain=domain; start(domain); + send("Hello bus="+domain); } RE getId=new RE("^ID id=(.*) value=(.*)"); - RE fwd=new RE("^Forward id=(.*) buffer=(.*)"); + RE fwdGhost=new RE("^ForwardGhost id=(.*) buffer=(.*)"); + RE fwdPuppet=new RE("^ForwardPuppet id=(.*) buffer=(.*)"); RE puppetRe=new RE("^CreatePuppet id=(.*)"); void parseMsg(String msg) { - System.out.println("parsing "+msg); + // System.out.println("PC parsing "+msg); if (getId.match(msg)) { id.put(getId.getParen(1),getId.getParen(2)); - } else if (puppetRe.match(msg)) { - // I must create a puppet - puppets.put(puppetRe.getParen(1),new Puppet(this,domain)); - } else if (fwd.match(msg)) { - // received a forward request - // TODO give it to the puppet - Puppet p = (Puppet)puppets.get(fwd.getParen(1)); - p.parse(fwd.getParen(2)); + } else if (puppetRe.match(msg)) { // I must create a puppet + String puppetId = puppetRe.getParen(1); + puppets.put(puppetId,new Puppet(this,puppetId,domain)); + } else if (fwdGhost.match(msg)) { // I must forward to the ghost + Ghost g = (Ghost)ghosts.get(fwdGhost.getParen(1)); + try { g.sendBuffer(fwdGhost.getParen(2)); } catch( IvyException ie) { ie.printStackTrace(); } + } else if (fwdPuppet.match(msg)) { // I must forward to the puppet + Puppet p = (Puppet)puppets.get(fwdPuppet.getParen(1)); + try { p.parse(fwdPuppet.getParen(2)); } catch( IvyException ie) { ie.printStackTrace(); } } else { System.out.println("unknown message "+msg); } @@ -136,28 +137,44 @@ public class ProxyClient extends Ivy { System.exit(0); } traceDebug("Subreader Thread stopped"); - System.out.println("ProxyClient disconnected"); + System.out.println("connexion to ProxyMaster lost"); + for (Enumeration e=puppets.elements();e.hasMoreElements();) ((Puppet)e.nextElement()).stop(); stop(); // leave the bus TODO: make a disconnexion/reconnexion possible ? } } + void send(String s) { // sends a message to the proxyMaster + out.println(s); + out.flush(); + } + /* * here, I create a ghost instead of a normal Ivy Client, to catch the * protocol and forward everything to the proxies. * TODO: remember everything in case a new proxy client comes ? */ protected IvyClient createIvyClient(Socket s,int port, boolean domachin) throws IOException { + // TODO si c'est un puppet, je ne dois pas créer de Ghost + // voir même me déconnecter du biniou ? + for (Enumeration e=puppets.elements();e.hasMoreElements();) { + if (( ((Puppet)e.nextElement()).bus.getAP() == port ) && !domachin ) { + // this new Ivy agent is in fact one of my puppets ... + System.out.println("not Ghosting this (probable) Puppet Ivy agent"); + return new IvyClient(this,s,port,domachin); + } + } String key = getWBUId(); - System.out.println("asking for a key"); - out.println("GetID id="+key); // asks a centralized ID from ProxyMaster - out.flush(); + String ghostId; + send("GetID id="+key); // asks a centralized ID from ProxyMaster try { // waits for the answer - while (id.get(key)==null) { Thread.sleep(200); } - } catch (InterruptedException ie) { - ie.printStackTrace(); - } - System.out.println("key received"); - return new Ghost(this,s,port,domachin,(String)id.get(key),this); + while ((ghostId=(String)id.get(key))==null) { Thread.sleep(200); } + Ghost g = new Ghost(this,s,port,domachin,ghostId,this); + ghosts.put(ghostId,g); + return g; + } catch (InterruptedException ie) { ie.printStackTrace(); } + System.out.println("error waiting"); + System.exit(0); + return null; } /* @@ -165,8 +182,8 @@ public class ProxyClient extends Ivy { * ProxyMaster * */ - protected void forward(String id,String buffer) { - out.println("Forward id="+id+" buffer="+buffer); + protected void forwardPuppet(String id,String buffer) { + out.println("ForwardPuppet id="+id+" buffer="+buffer); out.flush(); } diff --git a/src/ProxyMaster.java b/src/ProxyMaster.java index 09216ed..f80fa11 100644 --- a/src/ProxyMaster.java +++ b/src/ProxyMaster.java @@ -9,7 +9,8 @@ * changelog: * 1.2.12 */ -package fr.dgac.ivy ; // TODO go into sub tools, and build a shell/.BAT script +package fr.dgac.ivy.tools ; // TODO go into sub tools, and build a shell/.BAT script +import fr.dgac.ivy.* ; import java.io.*; import java.net.*; import java.util.* ; @@ -21,7 +22,9 @@ public class ProxyMaster { private ServerSocket serviceSocket; private boolean isRunning=false; private static boolean debug=false; + private boolean doRun=true; // stops running when set to false private Vector proxyClients = new Vector(); + private Hashtable ghostFathers = new Hashtable(); // key: ghostId value: SubReader private static int serial=0; public static int DEFAULT_SERVICE_PORT = 3456 ; @@ -81,22 +84,27 @@ public class ProxyMaster { class SubReader extends Thread { BufferedReader in; PrintWriter out; + String hostname=null; // I will know from the socket + String busDomain=null; // I will know it from the Hello message + SubReader(Socket socket) throws IOException { proxyClients.addElement(this); - System.out.println("ProxyClient connected"); + hostname = socket.getInetAddress().getHostName(); in=new BufferedReader(new InputStreamReader(socket.getInputStream())); out=new PrintWriter(new OutputStreamWriter(socket.getOutputStream())); start(); } - public void send(String s) { + + public void send(String s) { // sends a message to the SubReader peer ( a ProxyClient ) out.println(s); out.flush(); } + public void run() { traceDebug("Subreader Thread started"); String msg = null; try { - while (true) { + while (doRun) { msg=in.readLine(); if (msg==null) break; parseMsg(msg); @@ -107,28 +115,47 @@ public class ProxyMaster { System.exit(0); } traceDebug("Subreader Thread stopped"); - System.out.println("ProxyClient disconnected"); + System.out.println("ProxyClient on "+hostname+", bus "+busDomain+" disconnected"); proxyClients.removeElement(this); } + RE helloRE=new RE("^Hello bus=(.*)"); RE getId=new RE("^GetID id=(.*)"); - RE fwd=new RE("^Forward id=(.*) buffer=(.*)"); + RE fwdPuppet=new RE("^ForwardPuppet id=(.*) buffer=(.*)"); + RE fwdGhost=new RE("^ForwardGhost id=(.*) buffer=(.*)"); + void parseMsg(String msg) { - System.out.println("parsing "+msg); - if (getId.match(msg)) { - // a new Ghost has appeared - System.out.println("a new Ghost has appeared"); - int newGhostId = serial++; - out.println("ID id="+getId.getParen(1)+" value="+newGhostId); - out.flush(); - // TODO create Puppets in each other ProxyClient + // System.out.println("PM parsing "+msg); + if (helloRE.match(msg)) { + busDomain = helloRE.getParen(1); + System.out.println("PC connected from "+hostname+", on the bus "+busDomain); + } else if (getId.match(msg)) { + // a new Ghost has appeared and requests an Id + System.out.println("PM registering a new Ghost"); + String newGhostId = new Integer(serial++).toString(); + // I give it its ID + send("ID id="+getId.getParen(1)+" value="+newGhostId); + ghostFathers.put(newGhostId,this); // remember the SubReader holding this Ghost + // I ask all other Clients to prepare a puppet + for (Enumeration e=proxyClients.elements();e.hasMoreElements();) { + SubReader sr = (SubReader)e.nextElement(); + if (sr!=SubReader.this) { + // System.out.println("propagate CreatePuppet to "+sr.busDomain); + sr.send("CreatePuppet id="+newGhostId); + } else { + // System.out.println("won't propagate CreatePuppet to "+sr.busDomain); + } + } + } else if (fwdGhost.match(msg)) { + System.out.println("PM forwarding ["+msg+"] to its Ghost"); + SubReader sr = (SubReader) ghostFathers.get(fwdGhost.getParen(1)); + sr.send(msg); + } else if (fwdPuppet.match(msg)) { + System.out.println("PM forwarding ["+msg+"] to all other PCs"); for (Enumeration e=proxyClients.elements();e.hasMoreElements();) { SubReader sr = (SubReader)e.nextElement(); - if (sr!=SubReader.this) sr.send("CreatePuppet id="+newGhostId); + if (sr!=SubReader.this) sr.send(msg); } - } else if (fwd.match(msg)) { - System.out.println("forwarding "+msg); - // TODO forward the message to all relevant puppets } else { System.out.println("error unknown message "+msg); } diff --git a/src/Puppet.java b/src/Puppet.java index d122d52..e2a90ee 100644 --- a/src/Puppet.java +++ b/src/Puppet.java @@ -5,35 +5,124 @@ import java.io.*; import java.util.*; import org.apache.regexp.*; -class Puppet extends Ivy { +class Puppet { - Hashtable bound = new Hashtable(); // clef: l'ID de la regexp choisi par le ghost, valeur: l'Int de l'abonnement local - String myDomain; + // the mapping between Ghost regexp and local bus regexp numbers + Hashtable bound = new Hashtable(); // ghostID localID + Hashtable regexps = new Hashtable(); // ghostID textRegexp + String domain; + String appName; ProxyClient pc; + String id; + boolean started; + PuppetIvy bus; - Puppet(ProxyClient pc,String domain) { - super("noname","noname not ready",null); - myDomain=domain; + Puppet(ProxyClient pc,String id,String domain) { + this.domain=domain; this.pc=pc; + this.id=id; } - void parse(String s){ - System.out.println("the puppet must parse "+s); + void sendGhost(String s) { pc.send("ForwardGhost id="+id+" buffer="+s); } + + class ForwardMessenger implements IvyMessageListener { + String localId,ghostId; + public ForwardMessenger(String ghostId,String re) throws IvyException { + this.ghostId=ghostId; + this.localId = (new Integer(bus.bindMsg(re,ForwardMessenger.this))).toString(); + bound.put(ghostId,localId); + } + public void receive(IvyClient ic,String args[]) { + String tosend = IvyClient.Msg+" "+ghostId+IvyClient.StartArg; + for (int i=0;i<args.length;i++) tosend+=args[i]+IvyClient.EndArg; + sendGhost(tosend); + } + } // ForwardMessenger + + void addRegexp(String ghostId,String re) { + regexps.put(ghostId,re); + try { + if (started) new ForwardMessenger(ghostId,re); + } catch( IvyException ie) { ie.printStackTrace(); } + } + + void removeRegexp(String ghostId) { + try { + bus.unBindMsg(Integer.parseInt((String)bound.remove(ghostId))); + } catch( IvyException ie) { ie.printStackTrace(); } + } + + void stop() { + if (started) bus.stop(); } - protected IvyClient createIvyClient(Socket s,int port, boolean domachin) throws IOException { - return new PuppetClient(); + // ivy forwarded protocol message + static final RE ivyProto = new RE("(\\d+) (\\d+)\\02(.*)"); + void parse(String s) throws IvyException { + if (!ivyProto.match(s)) { System.out.println("Puppet error, can't parse "+s); return; } + int pcode=Integer.parseInt(ivyProto.getParen(1)); + String pid=ivyProto.getParen(2); + String args=ivyProto.getParen(3); + trace("must parse code:"+pcode+" id:"+pid+" args:"+args); + switch (pcode) { + case IvyClient.AddRegexp: // the Ghost's peer subscribes to something + addRegexp(pid,args); + break; + case IvyClient.DelRegexp: // the Ghost's peer unsubscribes to something + removeRegexp(pid); + break; + case IvyClient.Bye: // the Ghost's peer disconnects gracefully + bus.stop(); + // TODO end of the puppet ? + break; + case IvyClient.Die: + // the Ghost's peer wants to ... kill ProxyClient ? + break; + case IvyClient.Msg: + // the Ghost's peer sends a message to ProxyClient, with regard to one + // of our subscriptions + // TODO à qui le faire passer ? + break; + case IvyClient.SchizoToken: + appName = args; + bus = new PuppetIvy(appName,appName+" fakeready",null); + for (Enumeration e = regexps.keys();e.hasMoreElements();) { + String ghostId=(String)e.nextElement(); + String re=(String)regexps.get(ghostId); + new ForwardMessenger(ghostId,re); + } + started=true; + trace("starting the bus on "+domain); + bus.start(domain); + break; + case IvyClient.Error: + case IvyClient.EndRegexp: + case IvyClient.DirectMsg: + case IvyClient.Ping: + case IvyClient.Pong: + default: + trace("unused Ivy protocol code "+pcode); + } } - class PuppetClient extends IvyClient { - protected synchronized void sendBuffer( String buffer ) throws IvyException { - super.sendBuffer(buffer); // and to all the agents on the Ghost bus ? I'm not sure + class PuppetIvy extends Ivy { + PuppetIvy(String name,String ready,IvyApplicationListener ial){super(name,ready,ial);} + protected IvyClient createIvyClient(Socket s,int port, boolean domachin) throws IOException { + return new PuppetIvyClient(PuppetIvy.this,s,port,domachin); } + int getAP() {return applicationPort;} + } + class PuppetIvyClient extends IvyClient { + PuppetIvyClient(Ivy bus,Socket s,int port,boolean b) throws IOException { super(bus,s,port,b); } + protected synchronized void sendBuffer( String s ) throws IvyException { + super.sendBuffer(s); // and to all the agents on the Ghost bus ? I'm not sure + } protected boolean newParseMsg(String s) throws IvyException { - return super.newParseMsg(s); + return super.newParseMsg(s); // I'm a normal Ivy citizen } - } // class PuppetClient + } -} + void trace(String s) { System.out.println("Puppet["+id+"] "+s);} +} |