summaryrefslogtreecommitdiff
path: root/comm
diff options
context:
space:
mode:
authorchatty1997-06-25 13:54:36 +0000
committerchatty1997-06-25 13:54:36 +0000
commit68a44941a13f9a63fa78137441fe16644a83309f (patch)
treec0a02a2f3d730e3922e0a80f281db02bf83d16ca /comm
parentc3f953864d5a37997a7e32df76e33bc23e163911 (diff)
downloadivy-league-68a44941a13f9a63fa78137441fe16644a83309f.zip
ivy-league-68a44941a13f9a63fa78137441fe16644a83309f.tar.gz
ivy-league-68a44941a13f9a63fa78137441fe16644a83309f.tar.bz2
ivy-league-68a44941a13f9a63fa78137441fe16644a83309f.tar.xz
Initial revision
Diffstat (limited to 'comm')
-rw-r--r--comm/BusAccess.cc471
-rw-r--r--comm/BusAccess.h52
2 files changed, 523 insertions, 0 deletions
diff --git a/comm/BusAccess.cc b/comm/BusAccess.cc
new file mode 100644
index 0000000..c92a52c
--- /dev/null
+++ b/comm/BusAccess.cc
@@ -0,0 +1,471 @@
+/*
+ * The Unix Channel
+ *
+ * by Michel Beaudouin-Lafon
+ *
+ * Copyright 1990-1997
+ * Laboratoire de Recherche en Informatique (LRI)
+ *
+ * CENA bus access, by Stephane Chatty
+ *
+ * $Id$
+ * $CurLog$
+ */
+
+#include "BusAccess.h"
+#include "Multiplexer.h"
+#include "BufStream.h"
+#include "ccu/String.h"
+#include "dnn/Trigger.h"
+#include "dnn/Reaction.h"
+#include <stdio.h>
+#include <ostream.h>
+#include <regex.h>
+#include <stdarg.h>
+
+int UchBusAccess::Version = 2;
+
+typedef enum {
+ BusBye, /* quitte l'application ( non utilise ) */
+ BusRegexp, /* expression reguliere d'un client */
+ BusMsg, /* message reel */
+ BusError, /* error message */
+ BusDelRegexp, /* Remove expression reguliere */
+ BusReady
+} BusMsgType;
+
+
+
+
+class UchBusSubscription {
+protected:
+ int Id;
+ CcuString Regexp;
+ regex_t Compiled;
+public:
+ UchBusSubscription (int, const char*);
+inline int GetId () const { return Id; }
+inline const char* GetRegexp () const { return Regexp; }
+inline const regex_t* GetCompiled () const { return &Compiled; }
+};
+
+class UchBusTrigger : public DnnTrigger {
+protected:
+ int Id;
+ CcuString Regexp;
+public:
+ UchBusTrigger (const char*);
+inline int GetId () const { return Id; }
+inline void SetId (int id) { Id = id; }
+inline const char* GetRegexp () const { return Regexp; }
+};
+
+class UchBusServer : public UchStream {
+public:
+ UchBusAccess* MyBus;
+
+ UchBusServer (UchBusAccess*);
+ void HandleRead ();
+};
+
+class UchBusAgent : public UchBufStream {
+protected:
+ CcuListOf<UchBusSubscription> RemoteSubscriptions;
+ UchBusAccess* MyBus;
+
+ void WriteString (const char*);
+ void ProcessLine (const char*);
+
+public:
+ UchBusAgent (int, UchBusAccess*);
+ UchBusAgent (const char*, sword, UchBusAccess*);
+ void HandleRead ();
+ void SendLocalSubscription (const UchBusTrigger*);
+ void SendLocalSubscriptions ();
+ void Emit (const char*);
+};
+
+/* this is used for local subscriptions */
+UchBusTrigger :: UchBusTrigger (const char* r)
+: Id (-1),
+ Regexp (r)
+{
+}
+
+/* this is used for remote subscriptions */
+UchBusSubscription :: UchBusSubscription (int id, const char* r)
+: Id (id),
+ Regexp (r)
+{
+ int reg = regcomp (&Compiled, r, REG_ICASE|REG_EXTENDED);
+ if (reg != 0)
+ cerr << "bad regexp " << r << "\n";
+}
+
+UchBusServer :: UchBusServer (UchBusAccess* bus)
+: UchStream (new UchInetAddress (ANYADDR)),
+ MyBus (bus)
+{
+ if (Listen () < 0) {
+ cerr << "cannot listen on bus server\n";
+ return;
+ }
+ SetMode (IORead);
+ Add (UchMpx);
+}
+
+/* this happens when another agent opens a connection with us */
+void
+UchBusServer :: HandleRead ()
+{
+ /* Accept connection */
+ int fd = Accept ();
+ if (fd < 0) {
+ cerr << "cannot accept connection on server\n";
+ return;
+ }
+
+ /* create agent handle */
+ cout << "Someone was there. Hello!\n";
+ UchBusAgent* a = new UchBusAgent (fd, MyBus);
+
+ /* send subscriptions*/
+ a->SendLocalSubscriptions ();
+}
+
+UchBusAgent :: UchBusAgent (int fd, UchBusAccess* bus)
+: UchBufStream (),
+ RemoteSubscriptions (),
+ MyBus (bus)
+{
+ SetMode (IOReadWrite);
+ UchChannel::Open (fd);
+ Add (UchMpx);
+ MyBus->Agents.Append (this);
+}
+
+
+UchBusAgent :: UchBusAgent (const char* host, sword port, UchBusAccess *bus)
+: UchBufStream (0, new UchInetAddress (host, port)),
+ RemoteSubscriptions (),
+ MyBus (bus)
+{
+ if (!Setup ()) {
+ cerr << "cannot set up stream on " << host << ":" << port << "\n";
+ return;
+ }
+ SetMode (IOReadWrite);
+ Add (UchMpx);
+ MyBus->Agents.Append (this);
+}
+
+
+UchBusAgent :: ~UchBusAgent ()
+{
+ MyBus->Agents.Remove (this);
+ Remove ();
+ Close ();
+}
+
+
+/* this had to be redefined because UchChannel adds null character
+and we don't want it */
+void
+UchBusAgent :: WriteString (const char* s)
+{
+ /* now that this is a BufStream, perhaps we should use Buffer? */
+ Write ((byte*) s, strlen (s));
+}
+
+void
+UchBusAgent :: SendLocalSubscriptions ()
+{
+ cout << "sending subscriptions\n";
+ CcuListIterOf<UchBusTrigger> s = MyBus->LocalSubscriptions;
+ while (++s)
+ SendLocalSubscription (*s);
+
+ /* say I'm ready */
+ char buf[32];
+ sprintf (buf, "%d %d\n", BusReady, 0);
+ *this << buf;
+}
+
+void
+UchBusAgent :: SendLocalSubscription (const UchBusTrigger* s)
+{
+ char buf[32];
+ sprintf (buf, "%d %d ", BusRegexp, s->GetId ());
+ WriteString (buf);
+ WriteString (s->GetRegexp ());
+ WriteString ("\n");
+ cout << "Sending regexp: " << buf << s->GetRegexp () << '\n';
+}
+
+
+/* this happens when another agent talks */
+void
+UchBusAgent :: HandleRead ()
+{
+ /* read in buffer */
+ if (InBuffer.BufLength () == 0)
+ InBuffer.NeedSize (128);
+ int n = Read (InBuffer);
+
+ /* handle errors */
+ if (n == -2) {
+ InBuffer.Grow ();
+ n = Read (InBuffer);
+ }
+ if (n <= 0) {
+ delete this;
+ return;
+ }
+
+ /* slice input */
+ char* start = (char*) InBuffer.Buffer ();
+ char* stop = start + InBuffer.BufLength ();
+ for (char* p = start; p < stop; p++) {
+ if (*p == '\n') {
+ *p = '\0';
+ ProcessLine (start);
+ start = p+1;
+ InBuffer.Flush ((byte*) start);
+ }
+ }
+}
+
+
+#if 0
+ byte buf [256];
+ int nb_read = Read (buf, 256);
+ if (nb_read <= 0) {
+ delete this;
+ return;
+ }
+ buf[nb_read] = '\0';
+ cout << "agent says:" << (const char*) buf;
+}
+#endif
+
+void
+UchBusAgent :: ProcessLine (const char* line)
+{
+ /* extract */
+ int msg_type, id, nb_chars;
+ int nb_fields = sscanf (line,"%d %d%n", &msg_type, &id, &nb_chars);
+ if (nb_fields != 2) {
+ cerr << "Bad format: " << line;
+ return;
+ }
+
+ const char* p = (const char*) (line+nb_chars);
+ switch (msg_type) {
+ case BusBye:
+ cout << "agent says goodbye\n";
+ delete this;
+ break;
+ case BusRegexp:
+ RemoteSubscriptions.Append (new UchBusSubscription (id, p+1));
+ break;
+ case BusMsg:
+ MyBus->HandleEvent (id, p);
+ break;
+ case BusDelRegexp:
+ cerr << "not implemented: DelRegexp\n";
+ break;
+ case BusReady:
+ break;
+ case BusError:
+ break;
+ }
+}
+
+void
+UchBusAgent :: Emit (const char* msg)
+{
+ const int MAX_MATCHING_ARGS = 20;
+ regmatch_t pmatch[MAX_MATCHING_ARGS+1];
+
+ /* scan all subscriptions from this agent */
+ CcuListIterOf<UchBusSubscription> r = RemoteSubscriptions;
+ while (++r) {
+ UchBusSubscription* s = *r;
+ /* does the message match the regexp? */
+ if (regexec (s->GetCompiled (), msg, MAX_MATCHING_ARGS, pmatch, 0) != 0)
+ continue;
+
+ cerr << msg << " matches " << s->GetRegexp () << "\n";
+ /* if yes, emit it as such */
+ char buf[64];
+ sprintf (buf, "%d %d", BusMsg, s->GetId ());
+ WriteString (buf);
+ regmatch_t* p = &pmatch[1];
+ while (p->rm_so != -1) {
+ char buf[256];
+ sprintf (buf, " %.*s", p->rm_eo - p->rm_so, msg + p->rm_so);
+ WriteString (buf);
+ ++p;
+ }
+ WriteString ("\n");
+ }
+}
+
+
+UchBusAccess :: UchBusAccess (sword port)
+: UchDatagram (),
+ UchBaseSignalHandler (*UchMpx, SigInt),
+ ListenPort (0),
+ BroadcastPort (port),
+ Server (0),
+ Agents (),
+ IdToSubscription (256),
+ LocalSubscriptions ()
+{
+ /** set up datagram **/
+ UchInetAddress* a = new UchInetAddress (ANYADDR, port);
+ BindTo (a);
+ if (!Open ()) {
+ cerr << "Cannot open socket\n";
+ return;
+ }
+ if (!AllowBroadcast ()) {
+ cerr << "socket won't broadcast\n";
+ return;
+ }
+ if (!ReuseAddress ()) {
+ cerr << "socket won't let address be reused\n";
+ return;
+ }
+
+ if (Bind () < 0) {
+ cerr << "cannot bind socket to port " << port << "\n";
+ return;
+ }
+ SetMode (IOReadWrite);
+
+ /** set up stream **/
+ Server = new UchBusServer (this);
+ a = dynamic_cast<UchInetAddress*>(Server->BoundTo ());
+ if (!a) {
+ cerr << "bad address type?!\n";
+ return;
+ }
+ ListenPort = a->Port ();
+
+ /** emit handshake on datagram **/
+ char handshake[32];
+ sprintf (handshake, "%d %hu\n", Version, ListenPort);
+ UchInetAddress ba (BROADCAST, BroadcastPort);
+ Send ((byte*)handshake, strlen (handshake), ba);
+
+ /* add to main loop */
+ Add (UchMpx);
+}
+
+
+UchBusAccess :: ~UchBusAccess ()
+{
+}
+
+/* this happens when someone sends a broadcast on the bus: a newcomer */
+void
+UchBusAccess :: HandleRead ()
+{
+ /* read data */
+ byte buf [256];
+ int nb_bytes = Receive (buf, 256);
+ if (nb_bytes < 0) {
+ cerr << "read error on broadcast port\n";
+ return;
+ }
+
+ /* decode data: should be a handshake */
+ int bus_version;
+ sword remote_listen_port;
+ int nb_infos = sscanf ((char*) buf, "%d %hu", &bus_version, &remote_listen_port);
+ if (nb_infos < 2) {
+ cerr << "bad data on broadcast port\n";
+ return;
+ }
+
+ /* where did this come from? */
+ UchInetAddress* a = dynamic_cast<UchInetAddress*> (From ());
+ if (!a) {
+ cerr << "bad address!?\n";
+ return;
+ }
+ const char* remote_host = a->GetHostName ();
+ sword remote_port = a->Port ();
+
+ /* sanity checks: is it the same protocol version? is it myself calling? */
+ if (remote_listen_port == ListenPort)
+ return;
+ if (bus_version != Version) {
+ cerr << "bad protocol version\n";
+ return;
+ }
+
+ printf ("Arrivee de %s:%hu port %hu\n", remote_host, remote_port, remote_listen_port );
+
+ /* Create agent handle */
+ UchBusAgent* ag = new UchBusAgent (remote_host, remote_listen_port, this);
+
+ /* send subscriptions */
+ ag->SendLocalSubscriptions ();
+}
+
+
+void
+UchBusAccess :: HandleEvent (int id, const char* msg)
+{
+ UchBusTrigger* s = IdToSubscription.Get (id);
+ if (!s) {
+ cerr << "bad regexp id " << id << "\n";
+ return;
+ }
+ cout << "REGEXP " << s->GetId () << " MATCHED: " << msg << "\n";
+}
+
+/* this is the handler for SIGINT */
+void
+UchBusAccess :: DeferredHandle (int)
+{
+ cerr << "quitting\n";
+ UchStop ();
+}
+
+void
+UchBusAccess :: Subscribe (DnnBaseReaction& r, const char* regexp)
+{
+ /* store new subscription */
+ UchBusTrigger* s = new UchBusTrigger (regexp);
+ LocalSubscriptions.Append (s);
+ int id = IdToSubscription.Store (s);
+ s->SetId (id);
+
+ /* emit to current remote agents */
+ CcuListIterOf<UchBusAgent> a = Agents;
+ while (++a)
+ (*a)->SendLocalSubscription (s);
+
+ /* link to reaction */
+ r.SubscribeTo (*s);
+}
+
+
+void
+UchBusAccess :: Emit (const char* fmt, ...)
+{
+ char message[4096];
+ va_list ap;
+
+ va_start (ap, fmt);
+ vsprintf (message, fmt, ap);
+ va_end (ap );
+
+ CcuListIterOf <UchBusAgent> a = Agents;
+ while (++a)
+ (*a)->Emit (message);
+}
+
diff --git a/comm/BusAccess.h b/comm/BusAccess.h
new file mode 100644
index 0000000..e0bcdf1
--- /dev/null
+++ b/comm/BusAccess.h
@@ -0,0 +1,52 @@
+/*
+ * The Unix Channel
+ *
+ * by Michel Beaudouin-Lafon
+ *
+ * Copyright 1990-1997
+ * Laboratoire de Recherche en Informatique (LRI)
+ *
+ * Bus access, by Stephane Chatty
+ *
+ * $Id$
+ * $CurLog$
+ */
+
+
+#ifndef BusAccess_H_
+#define BusAccess_H_
+
+#include "Datagram.h"
+#include "Stream.h"
+#include "SignalHandler.h"
+#include "ccu/List.h"
+#include "ccu/IdTable.h"
+
+class UchBusSubscription;
+class UchBusTrigger;
+class DnnBaseReaction;
+
+class UchBusAccess : public UchDatagram, public UchBaseSignalHandler {
+friend class UchBusAgent;
+
+protected:
+static int Version;
+ sword ListenPort;
+ sword BroadcastPort;
+ UchStream* Server;
+ CcuListOf<UchBusAgent> Agents;
+ CcuListOf<UchBusTrigger> LocalSubscriptions;
+ CcuIdTableOf<UchBusTrigger> IdToSubscription;
+
+ void HandleRead ();
+ void DeferredHandle (int);
+ void HandleEvent (int, const char*);
+
+public:
+ UchBusAccess (sword);
+ ~UchBusAccess ();
+ void Subscribe (DnnBaseReaction&, const char*);
+ void Emit (const char*, ...);
+};
+
+#endif /* BusAccess_H_ */