/* * Ivy League * * Ivy bus access * * Copyright 1997-2000 * Centre d'Etudes de la Navigation Aerienne (CENA) * * by Stephane Chatty * * $Id$ * */ #include "BusAccess.h" #include "Scheduler.h" #include "BufStream.h" #include "InetAddress.h" #include "ivl/String.h" #include "ivl/Trigger.h" #include "ivl/Reaction.h" #include "ivl/Signal.h" #include #include #include #include #include #include int IvlBusAccess::Version = 3; const char* const IvlBusAccess::DefaultBus = "127:2010"; const sword IvlBusAccess::DefaultBusPort = 2010; const char* const IvlBusAccess::DefaultBusNet = "127"; static int _BusDebug = 0; const unsigned char STX = '\002'; const unsigned char ETX = '\003'; typedef enum { BusBye, /* quitte l'application */ BusRegexp, /* expression reguliere d'un client */ BusMsg, /* message reel */ BusError, /* error message */ BusDelRegexp, /* Remove expression reguliere */ BusReady, BusHello, BusDirectMsg, BusDie } BusMsgType; class IvlBusSubscription { protected: int Id; IvlString Regexp; regex_t Compiled; public: IvlBusSubscription (int, const char*); inline int GetId () const { return Id; } inline const char* GetRegexp () const { return Regexp; } inline const regex_t* GetCompiled () const { return &Compiled; } }; class IvlBusTrigger : public IvlTrigger { protected: int Id; IvlString Regexp; public: IvlBusTrigger (const char*); inline int GetId () const { return Id; } inline void SetId (int id) { Id = id; } inline const char* GetRegexp () const { return Regexp; } }; class IvlBusServer : public IvlStream { public: IvlBusAccess* MyBus; IvlBusServer (IvlBusAccess*); void HandleRead (); }; /* this is used for local subscriptions */ IvlBusTrigger :: IvlBusTrigger (const char* r) : Id (-1), Regexp (r) { } /* this is used for remote subscriptions */ IvlBusSubscription :: IvlBusSubscription (int id, const char* r) : Id (id), Regexp (r) { int reg = regcomp (&Compiled, r, REG_ICASE|REG_EXTENDED); if (reg != 0) cerr << "Ivy warning: bad regexp " << r << "\n"; } IvlBusServer :: IvlBusServer (IvlBusAccess* bus) : IvlStream (new IvlInetAddress (ANYADDR)), MyBus (bus) { if (Listen () < 0) { perror ("Ivy warning. Cannot listen on bus server"); return; } SetMode (IORead); Add (IvlScd); } /* this happens when another agent opens a connection with us */ void IvlBusServer :: HandleRead () { /* Accept connection */ int fd = Accept (); if (fd < 0) { perror ("Ivy warning. Cannot accept connection on server"); return; } /* create agent handle */ if (_BusDebug) cout << "Someone was there. Hello!\n"; IvlBusAgent* a = new IvlBusAgent (fd, MyBus); /* send name and subscriptions */ a->SayHello (); } IvlBusAgent :: IvlBusAgent (int fd, IvlBusAccess* bus) : IvlBufStream (), RemoteSubscriptions (), MyBus (bus), DirectMsg (16) { SetMode (IORead); IvlChannel::Open (fd); Add (IvlScd); MyBus->Agents.Append (this); } IvlBusAgent :: IvlBusAgent (lword host, sword port, IvlBusAccess *bus) : IvlBufStream (0, new IvlInetAddress (host, port)), RemoteSubscriptions (), MyBus (bus), DirectMsg (16) { if (!Setup ()) { cerr << "Ivy warning. Cannot set up stream on"; perror (""); return; } SetMode (IORead); Add (IvlScd); MyBus->Agents.Append (this); } IvlBusAgent :: ~IvlBusAgent () { MyBus->Agents.Remove (this); Remove (); Close (); } /* this had to be redefined because IvlChannel adds null character and we don't want it */ void IvlBusAgent :: WriteString (const char* s) { /* now that this is a BufStream, perhaps we should use Buffer? */ Write ((byte*) s, strlen (s)); } void IvlBusAgent :: SayHello () { IvlSignalBlocker b (SigPipe); char buf[32]; /* Say hello */ sprintf (buf, "%d %d%c", BusHello, MyBus->ListenPort, STX); WriteString (buf); WriteString (MyBus->Name); WriteString ("\n"); if (_BusDebug) cout << "sending subscriptions\n"; IvlListIterOf s = MyBus->LocalSubscriptions; while (++s) SendLocalSubscription (*s); /* say I'm ready */ sprintf (buf, "%d 0%c\n", BusReady, STX); WriteString (buf); } void IvlBusAgent :: SendLocalSubscription (const IvlBusTrigger* s) { char buf[32]; sprintf (buf, "%d %d%c", BusRegexp, s->GetId (), STX); WriteString (buf); WriteString (s->GetRegexp ()); WriteString ("\n"); if (_BusDebug) cout << "Sending regexp: " << buf << s->GetRegexp () << '\n'; } void IvlBusAgent :: SayBye () { IvlSignalBlocker b (SigPipe); char buf[32]; /* Say bye */ sprintf (buf, "%d 0%c\n", BusBye, STX); WriteString (buf); } /* this happens when another agent talks to us */ void IvlBusAgent :: HandleRead () { /* read in buffer */ if (InBuffer.BufLength () == 0) InBuffer.NeedSize (128); int n = Read (InBuffer); /* handle errors */ if (n == -2) { InBuffer.Grow (); n = Read (InBuffer); } if (n <= 0) { delete this; return; } /* slice input */ char* start = (char*) InBuffer.Buffer (); char* stop = start + InBuffer.BufLength (); for (char* p = start; p < stop; p++) { if (*p == '\n') { *p = '\0'; ProcessLine (start); start = p+1; InBuffer.Flush ((byte*) start); } } } static int id = 0; static int has_id (IvlBusSubscription* s) { return (s->GetId () == id); } void IvlBusAgent :: ProcessLine (const char* line) { /* message structure is: Bye: "0 0\002" Regexp: "1 23\002b(la)bl(a*)b(la)" Msg: "2 23\002match1\003match2\003match3" Error: "3 0\002error!" DelRegexp: "4 0\002" Ready: "5 0\002" Hello: "6 12526\002TESTAGENT" DirectMsg: "7 23\002blablabla" */ /* extract message header */ int msg_type, id, nb_chars; int nb_fields = sscanf (line,"%d %d%n", &msg_type, &id, &nb_chars); if (nb_fields != 2) { cerr << "Ivy warning: bad message format: " << line; return; } const char* p = (const char*) (line+nb_chars); IvlEvent* ev; switch (msg_type) { case BusBye: if (_BusDebug) cout << "[bye " << Name << "]\n"; ev = new IvlBusAgentEvent (this, IvlBusAgentEvent::BusAgentBye); Bye.Dispatch (*ev); delete this; break; case BusRegexp: if (_BusDebug) cout << "[regexp " << Name << " " << p+1 << "]\n"; RemoteSubscriptions.Append (new IvlBusSubscription (id, p+1)); break; case BusMsg: if (_BusDebug) cout << "[event " << Name << " " << p+1 << "]\n"; MyBus->HandleEvent (id, p+1); break; case BusDelRegexp: if (_BusDebug) cout << "[del " << Name << " " << id << "]\n"; ::id = id; RemoteSubscriptions.Remove (has_id); break; case BusReady: if (_BusDebug) cout << "[ready " << Name << "]\n"; ev = new IvlBusAgentEvent (this, IvlBusAgentEvent::BusNewAgent); MyBus->NewAgents.Dispatch (*ev); break; case BusDirectMsg: { if (_BusDebug) cout << "[direct " << Name << id << " " << p+1 << "]\n"; ev = new IvlDirectEvent (this, p+1); IvlTrigger* t = DirectMsg[id]; t->Dispatch (*ev); break; } case BusHello: Name = p+1; if (_BusDebug) cout << "[hello " << Name << "]\n"; if (id && !MyBus->CheckUnicity (this, id)) delete this; else RemoteId = id; break; case BusError: break; case BusDie: /* too violent */ if (_BusDebug) cout << "[die]\n"; exit (0); } } void IvlBusAgent :: EmitEvent (const char* msg) { IvlSignalBlocker b (SigPipe); const int MAX_MATCHING_ARGS = 20; regmatch_t pmatch[MAX_MATCHING_ARGS+1]; /* scan all subscriptions from this agent */ IvlListIterOf r = RemoteSubscriptions; while (++r) { IvlBusSubscription* s = *r; /* does the message match the regexp? */ if (regexec (s->GetCompiled (), msg, MAX_MATCHING_ARGS, pmatch, 0) != 0) continue; /* if yes, emit it as such */ if (_BusDebug) cout << msg << " matches " << s->GetRegexp () << "\n"; char buf[64]; sprintf (buf, "%d %d%c", BusMsg, s->GetId (), STX); WriteString (buf); #ifdef GNU_REGEXP regmatch_t* p = &pmatch[1]; while (p->rm_so != -1) { char buf[4096]; sprintf (buf, "%.*s%c", p->rm_eo - p->rm_so, msg + p->rm_so, ETX); WriteString (buf); ++p; } #else for (int i = 1; i < s->GetCompiled ()->re_nsub+1; i ++ ) { if (pmatch[i].rm_so != -1 ) { sprintf (buf, "%.*s%c", pmatch[i].rm_eo - pmatch[i].rm_so, msg + pmatch[i].rm_so, ETX); } else { sprintf (buf, "%c", ETX); } WriteString (buf); } #endif WriteString ("\n"); } } /* emit direct message */ void IvlBusAgent :: Emit (int id, const char* fmt, ...) { /* build message */ char message[4096]; va_list ap; va_start (ap, fmt); vsprintf (message, fmt, ap); va_end (ap ); /* send it */ IvlSignalBlocker b (SigPipe); char buf[32]; sprintf (buf, "%d %d%c", BusDirectMsg, id, STX); WriteString (buf); WriteString (message); WriteString ("\n"); } IvlBusAccess :: IvlBusAccess (const char* name, const char* bus) : IvlDatagram (), IvlBaseScheduledHandler (*IvlScd, SigInt), Name (name), ListenPort (0), BroadcastPort (0), Server (0), Agents (), IdToSubscription (256), LocalSubscriptions () { /** determine network list as well as broadcast port **/ /* (we accept addresses such as 123.231,123.123:2000 or 123.231 or :2000) */ /* first, let's find something to parse */ if (!bus || !*bus) bus = getenv ("IVYBUS"); if (!bus || !*bus) bus = DefaultBus; /* then, let's get a port number. We'll parse network list later.*/ sword port; char* q = strchr (bus, ':'); if (bus && (port = atoi (q+1))) BroadcastPort = port; else BroadcastPort = DefaultBusPort; /** set up UDP broadcast socket **/ IvlInetAddress* a = new IvlInetAddress (ANYADDR, BroadcastPort); BindTo (a); if (!Open ()) { perror ("Ivy warning. Cannot open socket"); return; } if (!AllowBroadcast ()) { perror ("Ivy warning. Socket won't broadcast"); return; } if (!ReuseAddress ()) { perror ("Ivy warning. Socket won't let address be reused"); return; } if (Bind () < 0) { cerr << "Ivy warning. Cannot bind socket to port " << BroadcastPort; perror (""); return; } SetMode (IORead); /** set up stream **/ Server = new IvlBusServer (this); a = dynamic_cast(Server->BoundTo ()); if (!a) { cerr << "Ivy warning: bad address type?!\n"; return; } ListenPort = a->Port (); /** broadcast handshakes on every network **/ char handshake[32]; sprintf (handshake, "%d %hu\n", Version, ListenPort); /* if we only have a port number, resort to default value for network */ if (bus == q) bus = DefaultBusNet; /* parse network list and send broadcast handshakes. This is long and painful but inet_aton is sloppy. If someone knows other builtin routines that do that... */ unsigned long mask = 0xffffffff; unsigned char elem = 0; int numdigit = 0; int numelem = 0; int error = 0; for (;;) { /* address elements are up to 3 digits... */ if (!error && isdigit (*bus)) { if (numdigit < 3 && numelem < 4) { elem = 10 * elem + *bus - '0'; } else { error = 1; } /* ... terminated by a point, a comma or a colon, or the end of string */ } else if (!error && (*bus == '.' || *bus == ',' || *bus == ':' || *bus == '\0')) { mask = (mask ^ (0xff << (8*(3-numelem)))) | (elem << (8*(3-numelem))); /* after a point, expect next address element */ if (*bus == '.') { numelem++; /* addresses are terminated by a comma or end of string */ } else { printf ("Broadcasting on network %lx, port %d\n", mask, BroadcastPort); IvlInetAddress ba (mask, BroadcastPort); if (Send ((byte*)handshake, strlen (handshake), ba) < 0) { cerr << "Ivy warning. Broadcast to 0x" << hex (mask) << ":" << BroadcastPort; perror (" failed"); } numelem = 0; mask = 0xffffffff; } numdigit = 0; elem = 0; /* recover from bad addresses at next comma or colon or at end of string */ } else if (*bus == ',' || *bus == ':' || *bus == '\0') { cerr << "Ivy warning: bad broadcast address\n"; elem = 0; numelem = 0; numdigit = 0; mask = 0xffffffff; error = 0; /* ignore spaces */ } else if (*bus == ' ') { /* everything else is illegal */ } else { error = 1; } /* end of string or colon */ if (*bus == '\0' || *bus == ':') break; ++bus; } /* add to main loop */ Add (IvlScd); } void IvlBusAccess :: Clear () { /* say bye to all remote agents, remove and delete them */ IvlBusAgent* a; while (a = Agents.RemoveFirst ()) { a->SayBye (); delete a; } } IvlBusAccess :: ~IvlBusAccess () { } /* this happens when someone sends a broadcast on the bus: a newcomer */ void IvlBusAccess :: HandleRead () { /* read data */ byte buf [256]; int nb_bytes = Receive (buf, 256); if (nb_bytes < 0) { perror ("Ivy warning. Read error on broadcast port"); return; } /* decode data: should be a handshake */ int bus_version; sword remote_listen_port; int nb_infos = sscanf ((char*) buf, "%d %hu", &bus_version, &remote_listen_port); if (nb_infos < 2) { cerr << "Ivy warning: bad data on broadcast port\n"; return; } /* where did this come from? */ IvlInetAddress* a = dynamic_cast (From ()); if (!a) { cerr << "Ivy warning: bad address!?\n"; return; } /* sanity checks: is it the same protocol version? is it myself calling? */ if (remote_listen_port == ListenPort) return; if (bus_version != Version) { cerr << "Ivy warning: bad protocol version\n"; return; } if (_BusDebug) { const char* remote_host = a->GetHostName (); sword remote_port = a->Port (); printf ("Arrivee de %s:%hu port %hu\n", remote_host, remote_port, remote_listen_port ); } /* Create agent handle */ // IvlBusAgent* ag = new IvlBusAgent (remote_host, remote_listen_port, this); IvlBusAgent* ag = new IvlBusAgent (a->Host (), remote_listen_port, this); /* send name and subscriptions */ ag->SayHello (); } void IvlBusAccess :: HandleEvent (int id, const char* msg) { IvlBusTrigger* s = IdToSubscription.Get (id); if (!s) { cerr << "Ivy warning: bad regexp id " << id << "\n"; return; } if (_BusDebug) cout << "REGEXP " << s->GetId () << " MATCHED: " << msg << "\n"; IvlBusEvent* ev = new IvlBusEvent (s->GetRegexp (), msg); s->Dispatch (*ev); } /* this is the handler for SIGINT */ void IvlBusAccess :: DeferredHandle (int) { cerr << "quitting\n"; Clear (); IvlStop (); } bool IvlBusAccess :: CheckUnicity (IvlBusAgent* a, sword id) { IvlListIterOf li = Agents; while (++li) { if ((*li)->RemoteId == id && a != (*li)) return false; } return true; } void IvlBusAccess :: Subscribe (IvlBaseReaction& r, const char* fmt, ...) { /* build regexp */ char regexp[4096]; va_list ap; va_start (ap, fmt); vsprintf (regexp, fmt, ap); va_end (ap ); /* store new subscription */ IvlBusTrigger* s = new IvlBusTrigger (regexp); LocalSubscriptions.Append (s); int id = IdToSubscription.Store (s); s->SetId (id); /* emit to current remote agents */ IvlListIterOf a = Agents; while (++a) (*a)->SendLocalSubscription (s); /* link to reaction */ r.SubscribeTo (*s); } void IvlBusAccess :: Emit (const char* fmt, ...) { /* build message */ char message[4096]; va_list ap; va_start (ap, fmt); vsprintf (message, fmt, ap); va_end (ap ); IvlListIterOf a = Agents; while (++a) (*a)->EmitEvent (message); } IvlEventType* IvlBusAgentEvent::BusNewAgent = new IvlEventType ("new agent", 0, 0); IvlEventType* IvlBusAgentEvent::BusAgentBye = new IvlEventType ("agent bye", 0, 0); IvlBusEvent :: IvlBusEvent (const char* r, const char* m) : IvlEvent (), Regexp (r), NbMatches (0), Matches (m), MatchList () { register const char* p = m; register char* q = (char*) m; bool end = false; while (!end) { switch (*q) { case '\0': end = true; if (q == p) break; /* else fall through */ case ETX: NbMatches++; *q = '\0'; MatchList.Append (new IvlString (p)); p = q + 1; /* fall through */ default: ++q; } } } IvlBusEvent :: ~IvlBusEvent () { IvlListIterOf li = MatchList; while (++li) delete (*li); } IvlBusAgentEvent :: IvlBusAgentEvent (IvlBusAgent* a, IvlEventType* t) : IvlEvent (t), Agent (a) { } IvlBusAgentEvent :: ~IvlBusAgentEvent () { } IvlDirectEvent :: IvlDirectEvent (IvlBusAgent* a, const char* m) : IvlEvent (), Agent (a), Msg (m) { } IvlDirectEvent::~IvlDirectEvent () { }