/* * The Unix Channel * * by Michel Beaudouin-Lafon * * Copyright 1990-1997 * Laboratoire de Recherche en Informatique (LRI) * * CENA bus access, by Stephane Chatty * * $Id$ * $CurLog$ */ #include "BusAccess.h" #include "Multiplexer.h" #include "BufStream.h" #include "ccu/String.h" #include "dnn/Trigger.h" #include "dnn/Reaction.h" #include "ccu/Signal.h" #include #include #include #include #include #include int UchBusAccess::Version = 3; const char* const UchBusAccess::DefaultBus = "127:2010"; const sword UchBusAccess::DefaultBusPort = 2010; const char* const UchBusAccess::DefaultBusNet = "127"; static int _BusDebug = 0; const unsigned char STX = '\002'; const unsigned char ETX = '\003'; typedef enum { BusBye, /* quitte l'application */ BusRegexp, /* expression reguliere d'un client */ BusMsg, /* message reel */ BusError, /* error message */ BusDelRegexp, /* Remove expression reguliere */ BusReady, BusHello, BusDirectMsg, BusDie } BusMsgType; class UchBusSubscription { protected: int Id; CcuString Regexp; regex_t Compiled; public: UchBusSubscription (int, const char*); inline int GetId () const { return Id; } inline const char* GetRegexp () const { return Regexp; } inline const regex_t* GetCompiled () const { return &Compiled; } }; class UchBusTrigger : public DnnTrigger { protected: int Id; CcuString Regexp; public: UchBusTrigger (const char*); inline int GetId () const { return Id; } inline void SetId (int id) { Id = id; } inline const char* GetRegexp () const { return Regexp; } }; class UchBusServer : public UchStream { public: UchBusAccess* MyBus; UchBusServer (UchBusAccess*); void HandleRead (); }; /* this is used for local subscriptions */ UchBusTrigger :: UchBusTrigger (const char* r) : Id (-1), Regexp (r) { } /* this is used for remote subscriptions */ UchBusSubscription :: UchBusSubscription (int id, const char* r) : Id (id), Regexp (r) { int reg = regcomp (&Compiled, r, REG_ICASE|REG_EXTENDED); if (reg != 0) cerr << "bad regexp " << r << "\n"; } UchBusServer :: UchBusServer (UchBusAccess* bus) : UchStream (new UchInetAddress (ANYADDR)), MyBus (bus) { if (Listen () < 0) { cerr << "cannot listen on bus server\n"; return; } SetMode (IORead); Add (UchMpx); } /* this happens when another agent opens a connection with us */ void UchBusServer :: HandleRead () { /* Accept connection */ int fd = Accept (); if (fd < 0) { cerr << "cannot accept connection on server\n"; return; } /* create agent handle */ if (_BusDebug) cout << "Someone was there. Hello!\n"; UchBusAgent* a = new UchBusAgent (fd, MyBus); /* send name and subscriptions */ a->SayHello (); } UchBusAgent :: UchBusAgent (int fd, UchBusAccess* bus) : UchBufStream (), RemoteSubscriptions (), MyBus (bus), DirectMsg (16) { // SetMode (IOReadWrite); SetMode (IORead); UchChannel::Open (fd); Add (UchMpx); MyBus->Agents.Append (this); } UchBusAgent :: UchBusAgent (const char* host, sword port, UchBusAccess *bus) : UchBufStream (0, new UchInetAddress (host, port)), RemoteSubscriptions (), MyBus (bus), DirectMsg (16) { if (!Setup ()) { cerr << "cannot set up stream on " << host << ":" << port << "\n"; return; } SetMode (IORead); Add (UchMpx); MyBus->Agents.Append (this); } UchBusAgent :: ~UchBusAgent () { MyBus->Agents.Remove (this); Remove (); Close (); } /* this had to be redefined because UchChannel adds null character and we don't want it */ void UchBusAgent :: WriteString (const char* s) { /* now that this is a BufStream, perhaps we should use Buffer? */ Write ((byte*) s, strlen (s)); } void UchBusAgent :: SayHello () { CcuSignalBlocker b (SigPipe); char buf[32]; /* Say hello */ sprintf (buf, "%d %d%c", BusHello, MyBus->ListenPort, STX); WriteString (buf); WriteString (MyBus->Name); WriteString ("\n"); if (_BusDebug) cout << "sending subscriptions\n"; CcuListIterOf s = MyBus->LocalSubscriptions; while (++s) SendLocalSubscription (*s); /* say I'm ready */ sprintf (buf, "%d 0%c\n", BusReady, STX); WriteString (buf); } void UchBusAgent :: SendLocalSubscription (const UchBusTrigger* s) { char buf[32]; sprintf (buf, "%d %d%c", BusRegexp, s->GetId (), STX); WriteString (buf); WriteString (s->GetRegexp ()); WriteString ("\n"); if (_BusDebug) cout << "Sending regexp: " << buf << s->GetRegexp () << '\n'; } void UchBusAgent :: SayBye () { CcuSignalBlocker b (SigPipe); char buf[32]; /* Say bye */ sprintf (buf, "%d 0%c\n", BusBye, STX); WriteString (buf); } /* this happens when another agent talks */ void UchBusAgent :: HandleRead () { /* read in buffer */ if (InBuffer.BufLength () == 0) InBuffer.NeedSize (128); int n = Read (InBuffer); /* handle errors */ if (n == -2) { InBuffer.Grow (); n = Read (InBuffer); } if (n <= 0) { delete this; return; } /* slice input */ char* start = (char*) InBuffer.Buffer (); char* stop = start + InBuffer.BufLength (); for (char* p = start; p < stop; p++) { if (*p == '\n') { *p = '\0'; ProcessLine (start); start = p+1; InBuffer.Flush ((byte*) start); } } } static int id = 0; static int has_id (UchBusSubscription* s) { return (s->GetId () == id); } void UchBusAgent :: ProcessLine (const char* line) { /* message structure is: Bye: "0 0\002" Regexp: "1 23\002b(la)bl(a*)b(la)" Msg: "2 23\002match1\003match2\003match3" Error: "3 0\002error!" DelRegexp: "4 0\002" Ready: "5 0\002" Hello: "6 12526\002TESTAGENT" DirectMsg: "7 23\002blablabla" */ /* extract message header */ int msg_type, id, nb_chars; int nb_fields = sscanf (line,"%d %d%n", &msg_type, &id, &nb_chars); if (nb_fields != 2) { cerr << "Bad format: " << line; return; } const char* p = (const char*) (line+nb_chars); DnnEvent* ev; switch (msg_type) { case BusBye: if (_BusDebug) cout << "[bye " << Name << "]\n"; ev = new UchAgentEvent (this, UchAgentEvent::BusAgentBye); Bye.Dispatch (*ev); delete this; break; case BusRegexp: if (_BusDebug) cout << "[regexp " << Name << " " << p+1 << "]\n"; RemoteSubscriptions.Append (new UchBusSubscription (id, p+1)); break; case BusMsg: if (_BusDebug) cout << "[event " << Name << " " << p+1 << "]\n"; MyBus->HandleEvent (id, p+1); break; case BusDelRegexp: if (_BusDebug) cout << "[del " << Name << " " << id << "]\n"; ::id = id; RemoteSubscriptions.Remove (has_id); break; case BusReady: if (_BusDebug) cout << "[ready " << Name << "]\n"; ev = new UchAgentEvent (this, UchAgentEvent::BusNewAgent); MyBus->NewAgents.Dispatch (*ev); break; case BusDirectMsg: { if (_BusDebug) cout << "[direct " << Name << id << " " << p+1 << "]\n"; ev = new UchDirectEvent (this, p+1); DnnTrigger* t = DirectMsg[id]; t->Dispatch (*ev); break; } case BusHello: Name = p+1; if (_BusDebug) cout << "[hello " << Name << "]\n"; if (id && !MyBus->CheckUnicity (this, id)) delete this; else RemoteId = id; break; case BusError: break; case BusDie: /* too violent */ if (_BusDebug) cout << "[die]\n"; exit (0); } } void UchBusAgent :: EmitEvent (const char* msg) { CcuSignalBlocker b (SigPipe); const int MAX_MATCHING_ARGS = 20; regmatch_t pmatch[MAX_MATCHING_ARGS+1]; /* scan all subscriptions from this agent */ CcuListIterOf r = RemoteSubscriptions; while (++r) { UchBusSubscription* s = *r; /* does the message match the regexp? */ if (regexec (s->GetCompiled (), msg, MAX_MATCHING_ARGS, pmatch, 0) != 0) continue; /* if yes, emit it as such */ if (_BusDebug) cout << msg << " matches " << s->GetRegexp () << "\n"; char buf[64]; sprintf (buf, "%d %d%c", BusMsg, s->GetId (), STX); WriteString (buf); #ifdef GNU_REGEXP regmatch_t* p = &pmatch[1]; while (p->rm_so != -1) { char buf[4096]; sprintf (buf, "%.*s%c", p->rm_eo - p->rm_so, msg + p->rm_so, ETX); WriteString (buf); ++p; } #else for (int i = 1; i < s->GetCompiled ()->re_nsub+1; i ++ ) { if (pmatch[i].rm_so != -1 ) { sprintf (buf, "%.*s%c", pmatch[i].rm_eo - pmatch[i].rm_so, msg + pmatch[i].rm_so, ETX); } else { sprintf (buf, "%c", ETX); } WriteString (buf); } #endif WriteString ("\n"); } } /* emit direct message */ void UchBusAgent :: Emit (int id, const char* fmt, ...) { /* build message */ char message[4096]; va_list ap; va_start (ap, fmt); vsprintf (message, fmt, ap); va_end (ap ); /* send it */ CcuSignalBlocker b (SigPipe); char buf[32]; sprintf (buf, "%d %d%c", BusDirectMsg, id, STX); WriteString (buf); WriteString (message); WriteString ("\n"); } UchBusAccess :: UchBusAccess (const char* name, const char* bus) : UchDatagram (), UchBaseSignalHandler (*UchMpx, SigInt), Name (name), ListenPort (0), BroadcastPort (0), Server (0), Agents (), IdToSubscription (256), LocalSubscriptions () { /** determine network list as well as broadcast port **/ /* (we accept addresses such as 123.231,123.123:2000 or 123.231 or :2000) */ /* first, let's find something to parse */ if (!bus || !*bus) bus = getenv ("IVYBUS"); if (!bus || !*bus) bus = DefaultBus; /* then, let's get a port number. We'll parse network list later.*/ sword port; char* q = strchr (bus, ':'); if (bus && (port = atoi (q+1))) BroadcastPort = port; else BroadcastPort = DefaultBusPort; /** set up UDP broadcast socket **/ UchInetAddress* a = new UchInetAddress (ANYADDR, BroadcastPort); BindTo (a); if (!Open ()) { perror ("Cannot open socket: "); return; } if (!AllowBroadcast ()) { perror ("socket won't broadcast: "); return; } if (!ReuseAddress ()) { perror ("socket won't let address be reused: "); return; } if (Bind () < 0) { cerr << "cannot bind socket to port " << BroadcastPort << "\n"; return; } // SetMode (IOReadWrite); SetMode (IORead); /** set up stream **/ Server = new UchBusServer (this); a = dynamic_cast(Server->BoundTo ()); if (!a) { cerr << "bad address type?!\n"; return; } ListenPort = a->Port (); /** broadcast handshakes on every network **/ char handshake[32]; sprintf (handshake, "%d %hu\n", Version, ListenPort); /* if we only have a port number, resort to default value for network */ if (bus == q) bus = DefaultBusNet; /* parse network list and send broadcast handshakes. This is long and painful but inet_aton is sloppy. If someone knows other builtin routines that do that... */ unsigned long mask = 0xffffffff; unsigned char elem = 0; int numdigit = 0; int numelem = 0; int error = 0; for (;;) { /* address elements are up to 3 digits... */ if (!error && isdigit (*bus)) { if (numdigit < 3 && numelem < 4) { elem = 10 * elem + *bus -'0'; } else { error = 1; } /* ... terminated by a point, a comma or a colon, or the end of string */ } else if (!error && (*bus == '.' || *bus == ',' || *bus == ':' || *bus == '\0')) { mask = (mask ^ (0xff << (8*(3-numelem)))) | (elem << (8*(3-numelem))); /* after a point, expect next address element */ if (*bus == '.') { numelem++; /* addresses are terminated by a comma or end of string */ } else { printf ("Broadcasting on network %lx, port %d\n", mask, BroadcastPort); UchInetAddress ba (mask, BroadcastPort); Send ((byte*)handshake, strlen (handshake), ba); numelem = 0; mask = 0xffffffff; } numdigit = 0; elem = 0; /* recover from bad addresses at next comma or colon or at end of string */ } else if (*bus == ',' || *bus == ':' || *bus == '\0') { fprintf (stderr, "bad broadcast address\n"); elem = 0; numelem = 0; numdigit = 0; mask = 0xffffffff; error = 0; /* ignore spaces */ } else if (*bus == ' ') { /* everything else is illegal */ } else { error = 1; } /* end of string or colon */ if (*bus == '\0' || *bus == ':') break; ++bus; } /* add to main loop */ Add (UchMpx); } void UchBusAccess :: Clear () { /* say bye to all remote agents, remove and delete them */ UchBusAgent* a; while (a = Agents.RemoveFirst ()) { a->SayBye (); delete a; } } UchBusAccess :: ~UchBusAccess () { } /* this happens when someone sends a broadcast on the bus: a newcomer */ void UchBusAccess :: HandleRead () { /* read data */ byte buf [256]; int nb_bytes = Receive (buf, 256); if (nb_bytes < 0) { cerr << "read error on broadcast port\n"; return; } /* decode data: should be a handshake */ int bus_version; sword remote_listen_port; int nb_infos = sscanf ((char*) buf, "%d %hu", &bus_version, &remote_listen_port); if (nb_infos < 2) { cerr << "bad data on broadcast port\n"; return; } /* where did this come from? */ UchInetAddress* a = dynamic_cast (From ()); if (!a) { cerr << "bad address!?\n"; return; } const char* remote_host = a->GetHostName (); sword remote_port = a->Port (); /* sanity checks: is it the same protocol version? is it myself calling? */ if (remote_listen_port == ListenPort) return; if (bus_version != Version) { cerr << "bad protocol version\n"; return; } if (_BusDebug) printf ("Arrivee de %s:%hu port %hu\n", remote_host, remote_port, remote_listen_port ); /* Create agent handle */ UchBusAgent* ag = new UchBusAgent (remote_host, remote_listen_port, this); /* send name and subscriptions */ ag->SayHello (); } void UchBusAccess :: HandleEvent (int id, const char* msg) { UchBusTrigger* s = IdToSubscription.Get (id); if (!s) { cerr << "bad regexp id " << id << "\n"; return; } if (_BusDebug) cout << "REGEXP " << s->GetId () << " MATCHED: " << msg << "\n"; UchBusEvent* ev = new UchBusEvent (s->GetRegexp (), msg); s->Dispatch (*ev); } /* this is the handler for SIGINT */ void UchBusAccess :: DeferredHandle (int) { cerr << "quitting\n"; Clear (); UchStop (); } bool UchBusAccess :: CheckUnicity (UchBusAgent* a, sword id) { CcuListIterOf li = Agents; while (++li) { if ((*li)->RemoteId == id && a != (*li)) return false; } return true; } void UchBusAccess :: Subscribe (DnnBaseReaction& r, const char* fmt, ...) { /* build regexp */ char regexp[4096]; va_list ap; va_start (ap, fmt); vsprintf (regexp, fmt, ap); va_end (ap ); /* store new subscription */ UchBusTrigger* s = new UchBusTrigger (regexp); LocalSubscriptions.Append (s); int id = IdToSubscription.Store (s); s->SetId (id); /* emit to current remote agents */ CcuListIterOf a = Agents; while (++a) (*a)->SendLocalSubscription (s); /* link to reaction */ r.SubscribeTo (*s); } void UchBusAccess :: Emit (const char* fmt, ...) { /* build message */ char message[4096]; va_list ap; va_start (ap, fmt); vsprintf (message, fmt, ap); va_end (ap ); CcuListIterOf a = Agents; while (++a) (*a)->EmitEvent (message); } DnnEventType* UchAgentEvent::BusNewAgent = new DnnEventType ("new agent", 0, 0); DnnEventType* UchAgentEvent::BusAgentBye = new DnnEventType ("agent bye", 0, 0); UchBusEvent :: UchBusEvent (const char* r, const char* m) : DnnEvent (), Regexp (r), NbMatches (0), Matches (m), MatchList () { register const char* p = m; register char* q = (char*) m; bool end = false; while (!end) { switch (*q) { case '\0': end = true; if (q == p) break; /* else fall through */ case ETX: NbMatches++; *q = '\0'; MatchList.Append (new CcuString (p)); p = q + 1; /* fall through */ default: ++q; } } } UchBusEvent :: ~UchBusEvent () { CcuListIterOf li = MatchList; while (++li) delete (*li); } UchAgentEvent :: UchAgentEvent (UchBusAgent* a, DnnEventType* t) : DnnEvent (t), Agent (a) { } UchAgentEvent :: ~UchAgentEvent () { } UchDirectEvent :: UchDirectEvent (UchBusAgent* a, const char* m) : DnnEvent (), Agent (a), Msg (m) { } UchDirectEvent::~UchDirectEvent () { }