From 68a44941a13f9a63fa78137441fe16644a83309f Mon Sep 17 00:00:00 2001 From: chatty Date: Wed, 25 Jun 1997 13:54:36 +0000 Subject: Initial revision --- comm/BusAccess.cc | 471 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ comm/BusAccess.h | 52 ++++++ 2 files changed, 523 insertions(+) create mode 100644 comm/BusAccess.cc create mode 100644 comm/BusAccess.h (limited to 'comm') diff --git a/comm/BusAccess.cc b/comm/BusAccess.cc new file mode 100644 index 0000000..c92a52c --- /dev/null +++ b/comm/BusAccess.cc @@ -0,0 +1,471 @@ +/* + * The Unix Channel + * + * by Michel Beaudouin-Lafon + * + * Copyright 1990-1997 + * Laboratoire de Recherche en Informatique (LRI) + * + * CENA bus access, by Stephane Chatty + * + * $Id$ + * $CurLog$ + */ + +#include "BusAccess.h" +#include "Multiplexer.h" +#include "BufStream.h" +#include "ccu/String.h" +#include "dnn/Trigger.h" +#include "dnn/Reaction.h" +#include +#include +#include +#include + +int UchBusAccess::Version = 2; + +typedef enum { + BusBye, /* quitte l'application ( non utilise ) */ + BusRegexp, /* expression reguliere d'un client */ + BusMsg, /* message reel */ + BusError, /* error message */ + BusDelRegexp, /* Remove expression reguliere */ + BusReady +} BusMsgType; + + + + +class UchBusSubscription { +protected: + int Id; + CcuString Regexp; + regex_t Compiled; +public: + UchBusSubscription (int, const char*); +inline int GetId () const { return Id; } +inline const char* GetRegexp () const { return Regexp; } +inline const regex_t* GetCompiled () const { return &Compiled; } +}; + +class UchBusTrigger : public DnnTrigger { +protected: + int Id; + CcuString Regexp; +public: + UchBusTrigger (const char*); +inline int GetId () const { return Id; } +inline void SetId (int id) { Id = id; } +inline const char* GetRegexp () const { return Regexp; } +}; + +class UchBusServer : public UchStream { +public: + UchBusAccess* MyBus; + + UchBusServer (UchBusAccess*); + void HandleRead (); +}; + +class UchBusAgent : public UchBufStream { +protected: + CcuListOf RemoteSubscriptions; + UchBusAccess* MyBus; + + void WriteString (const char*); + void ProcessLine (const char*); + +public: + UchBusAgent (int, UchBusAccess*); + UchBusAgent (const char*, sword, UchBusAccess*); + void HandleRead (); + void SendLocalSubscription (const UchBusTrigger*); + void SendLocalSubscriptions (); + void Emit (const char*); +}; + +/* this is used for local subscriptions */ +UchBusTrigger :: UchBusTrigger (const char* r) +: Id (-1), + Regexp (r) +{ +} + +/* this is used for remote subscriptions */ +UchBusSubscription :: UchBusSubscription (int id, const char* r) +: Id (id), + Regexp (r) +{ + int reg = regcomp (&Compiled, r, REG_ICASE|REG_EXTENDED); + if (reg != 0) + cerr << "bad regexp " << r << "\n"; +} + +UchBusServer :: UchBusServer (UchBusAccess* bus) +: UchStream (new UchInetAddress (ANYADDR)), + MyBus (bus) +{ + if (Listen () < 0) { + cerr << "cannot listen on bus server\n"; + return; + } + SetMode (IORead); + Add (UchMpx); +} + +/* this happens when another agent opens a connection with us */ +void +UchBusServer :: HandleRead () +{ + /* Accept connection */ + int fd = Accept (); + if (fd < 0) { + cerr << "cannot accept connection on server\n"; + return; + } + + /* create agent handle */ + cout << "Someone was there. Hello!\n"; + UchBusAgent* a = new UchBusAgent (fd, MyBus); + + /* send subscriptions*/ + a->SendLocalSubscriptions (); +} + +UchBusAgent :: UchBusAgent (int fd, UchBusAccess* bus) +: UchBufStream (), + RemoteSubscriptions (), + MyBus (bus) +{ + SetMode (IOReadWrite); + UchChannel::Open (fd); + Add (UchMpx); + MyBus->Agents.Append (this); +} + + +UchBusAgent :: UchBusAgent (const char* host, sword port, UchBusAccess *bus) +: UchBufStream (0, new UchInetAddress (host, port)), + RemoteSubscriptions (), + MyBus (bus) +{ + if (!Setup ()) { + cerr << "cannot set up stream on " << host << ":" << port << "\n"; + return; + } + SetMode (IOReadWrite); + Add (UchMpx); + MyBus->Agents.Append (this); +} + + +UchBusAgent :: ~UchBusAgent () +{ + MyBus->Agents.Remove (this); + Remove (); + Close (); +} + + +/* this had to be redefined because UchChannel adds null character +and we don't want it */ +void +UchBusAgent :: WriteString (const char* s) +{ + /* now that this is a BufStream, perhaps we should use Buffer? */ + Write ((byte*) s, strlen (s)); +} + +void +UchBusAgent :: SendLocalSubscriptions () +{ + cout << "sending subscriptions\n"; + CcuListIterOf s = MyBus->LocalSubscriptions; + while (++s) + SendLocalSubscription (*s); + + /* say I'm ready */ + char buf[32]; + sprintf (buf, "%d %d\n", BusReady, 0); + *this << buf; +} + +void +UchBusAgent :: SendLocalSubscription (const UchBusTrigger* s) +{ + char buf[32]; + sprintf (buf, "%d %d ", BusRegexp, s->GetId ()); + WriteString (buf); + WriteString (s->GetRegexp ()); + WriteString ("\n"); + cout << "Sending regexp: " << buf << s->GetRegexp () << '\n'; +} + + +/* this happens when another agent talks */ +void +UchBusAgent :: HandleRead () +{ + /* read in buffer */ + if (InBuffer.BufLength () == 0) + InBuffer.NeedSize (128); + int n = Read (InBuffer); + + /* handle errors */ + if (n == -2) { + InBuffer.Grow (); + n = Read (InBuffer); + } + if (n <= 0) { + delete this; + return; + } + + /* slice input */ + char* start = (char*) InBuffer.Buffer (); + char* stop = start + InBuffer.BufLength (); + for (char* p = start; p < stop; p++) { + if (*p == '\n') { + *p = '\0'; + ProcessLine (start); + start = p+1; + InBuffer.Flush ((byte*) start); + } + } +} + + +#if 0 + byte buf [256]; + int nb_read = Read (buf, 256); + if (nb_read <= 0) { + delete this; + return; + } + buf[nb_read] = '\0'; + cout << "agent says:" << (const char*) buf; +} +#endif + +void +UchBusAgent :: ProcessLine (const char* line) +{ + /* extract */ + int msg_type, id, nb_chars; + int nb_fields = sscanf (line,"%d %d%n", &msg_type, &id, &nb_chars); + if (nb_fields != 2) { + cerr << "Bad format: " << line; + return; + } + + const char* p = (const char*) (line+nb_chars); + switch (msg_type) { + case BusBye: + cout << "agent says goodbye\n"; + delete this; + break; + case BusRegexp: + RemoteSubscriptions.Append (new UchBusSubscription (id, p+1)); + break; + case BusMsg: + MyBus->HandleEvent (id, p); + break; + case BusDelRegexp: + cerr << "not implemented: DelRegexp\n"; + break; + case BusReady: + break; + case BusError: + break; + } +} + +void +UchBusAgent :: Emit (const char* msg) +{ + const int MAX_MATCHING_ARGS = 20; + regmatch_t pmatch[MAX_MATCHING_ARGS+1]; + + /* scan all subscriptions from this agent */ + CcuListIterOf r = RemoteSubscriptions; + while (++r) { + UchBusSubscription* s = *r; + /* does the message match the regexp? */ + if (regexec (s->GetCompiled (), msg, MAX_MATCHING_ARGS, pmatch, 0) != 0) + continue; + + cerr << msg << " matches " << s->GetRegexp () << "\n"; + /* if yes, emit it as such */ + char buf[64]; + sprintf (buf, "%d %d", BusMsg, s->GetId ()); + WriteString (buf); + regmatch_t* p = &pmatch[1]; + while (p->rm_so != -1) { + char buf[256]; + sprintf (buf, " %.*s", p->rm_eo - p->rm_so, msg + p->rm_so); + WriteString (buf); + ++p; + } + WriteString ("\n"); + } +} + + +UchBusAccess :: UchBusAccess (sword port) +: UchDatagram (), + UchBaseSignalHandler (*UchMpx, SigInt), + ListenPort (0), + BroadcastPort (port), + Server (0), + Agents (), + IdToSubscription (256), + LocalSubscriptions () +{ + /** set up datagram **/ + UchInetAddress* a = new UchInetAddress (ANYADDR, port); + BindTo (a); + if (!Open ()) { + cerr << "Cannot open socket\n"; + return; + } + if (!AllowBroadcast ()) { + cerr << "socket won't broadcast\n"; + return; + } + if (!ReuseAddress ()) { + cerr << "socket won't let address be reused\n"; + return; + } + + if (Bind () < 0) { + cerr << "cannot bind socket to port " << port << "\n"; + return; + } + SetMode (IOReadWrite); + + /** set up stream **/ + Server = new UchBusServer (this); + a = dynamic_cast(Server->BoundTo ()); + if (!a) { + cerr << "bad address type?!\n"; + return; + } + ListenPort = a->Port (); + + /** emit handshake on datagram **/ + char handshake[32]; + sprintf (handshake, "%d %hu\n", Version, ListenPort); + UchInetAddress ba (BROADCAST, BroadcastPort); + Send ((byte*)handshake, strlen (handshake), ba); + + /* add to main loop */ + Add (UchMpx); +} + + +UchBusAccess :: ~UchBusAccess () +{ +} + +/* this happens when someone sends a broadcast on the bus: a newcomer */ +void +UchBusAccess :: HandleRead () +{ + /* read data */ + byte buf [256]; + int nb_bytes = Receive (buf, 256); + if (nb_bytes < 0) { + cerr << "read error on broadcast port\n"; + return; + } + + /* decode data: should be a handshake */ + int bus_version; + sword remote_listen_port; + int nb_infos = sscanf ((char*) buf, "%d %hu", &bus_version, &remote_listen_port); + if (nb_infos < 2) { + cerr << "bad data on broadcast port\n"; + return; + } + + /* where did this come from? */ + UchInetAddress* a = dynamic_cast (From ()); + if (!a) { + cerr << "bad address!?\n"; + return; + } + const char* remote_host = a->GetHostName (); + sword remote_port = a->Port (); + + /* sanity checks: is it the same protocol version? is it myself calling? */ + if (remote_listen_port == ListenPort) + return; + if (bus_version != Version) { + cerr << "bad protocol version\n"; + return; + } + + printf ("Arrivee de %s:%hu port %hu\n", remote_host, remote_port, remote_listen_port ); + + /* Create agent handle */ + UchBusAgent* ag = new UchBusAgent (remote_host, remote_listen_port, this); + + /* send subscriptions */ + ag->SendLocalSubscriptions (); +} + + +void +UchBusAccess :: HandleEvent (int id, const char* msg) +{ + UchBusTrigger* s = IdToSubscription.Get (id); + if (!s) { + cerr << "bad regexp id " << id << "\n"; + return; + } + cout << "REGEXP " << s->GetId () << " MATCHED: " << msg << "\n"; +} + +/* this is the handler for SIGINT */ +void +UchBusAccess :: DeferredHandle (int) +{ + cerr << "quitting\n"; + UchStop (); +} + +void +UchBusAccess :: Subscribe (DnnBaseReaction& r, const char* regexp) +{ + /* store new subscription */ + UchBusTrigger* s = new UchBusTrigger (regexp); + LocalSubscriptions.Append (s); + int id = IdToSubscription.Store (s); + s->SetId (id); + + /* emit to current remote agents */ + CcuListIterOf a = Agents; + while (++a) + (*a)->SendLocalSubscription (s); + + /* link to reaction */ + r.SubscribeTo (*s); +} + + +void +UchBusAccess :: Emit (const char* fmt, ...) +{ + char message[4096]; + va_list ap; + + va_start (ap, fmt); + vsprintf (message, fmt, ap); + va_end (ap ); + + CcuListIterOf a = Agents; + while (++a) + (*a)->Emit (message); +} + diff --git a/comm/BusAccess.h b/comm/BusAccess.h new file mode 100644 index 0000000..e0bcdf1 --- /dev/null +++ b/comm/BusAccess.h @@ -0,0 +1,52 @@ +/* + * The Unix Channel + * + * by Michel Beaudouin-Lafon + * + * Copyright 1990-1997 + * Laboratoire de Recherche en Informatique (LRI) + * + * Bus access, by Stephane Chatty + * + * $Id$ + * $CurLog$ + */ + + +#ifndef BusAccess_H_ +#define BusAccess_H_ + +#include "Datagram.h" +#include "Stream.h" +#include "SignalHandler.h" +#include "ccu/List.h" +#include "ccu/IdTable.h" + +class UchBusSubscription; +class UchBusTrigger; +class DnnBaseReaction; + +class UchBusAccess : public UchDatagram, public UchBaseSignalHandler { +friend class UchBusAgent; + +protected: +static int Version; + sword ListenPort; + sword BroadcastPort; + UchStream* Server; + CcuListOf Agents; + CcuListOf LocalSubscriptions; + CcuIdTableOf IdToSubscription; + + void HandleRead (); + void DeferredHandle (int); + void HandleEvent (int, const char*); + +public: + UchBusAccess (sword); + ~UchBusAccess (); + void Subscribe (DnnBaseReaction&, const char*); + void Emit (const char*, ...); +}; + +#endif /* BusAccess_H_ */ -- cgit v1.1