/*
 *	The Unix Channel
 *
 *	by Michel Beaudouin-Lafon
 *
 *	Copyright 1990-1997
 *	Laboratoire de Recherche en Informatique (LRI)
 *
 *	CENA bus access, by Stephane Chatty
 *
 *	$Id$
 *	$CurLog$
 */

#include "BusAccess.h"
#include "Multiplexer.h"
#include "BufStream.h"
#include "ccu/String.h"
#include "dnn/Trigger.h"
#include "dnn/Reaction.h"
/*
 * NOTE(review): the system headers below are the ones required by the
 * library calls made in this file (printf/snprintf/sscanf, va_list,
 * strlen, regcomp/regexec/regfree). cerr and cout are assumed to be
 * declared through the project headers above -- confirm against the build.
 */
#include <stdio.h>
#include <stdarg.h>
#include <string.h>
#include <regex.h>

/* protocol version: sent in the broadcast handshake and checked on reception */
int UchBusAccess::Version = 2;

/* message type, sent as the first integer of every line exchanged between agents */
typedef enum {
	BusBye,		/* quit the application (unused) */
	BusRegexp,	/* a client's regular expression */
	BusMsg,		/* actual message */
	BusError,	/* error message */
	BusDelRegexp,	/* remove a regular expression */
	BusReady
} BusMsgType;

/*
 * A subscription announced by a remote agent: the id that agent gave it,
 * the text of the regexp and its compiled form.
 */
class UchBusSubscription {
private:
	/* declared but not defined: a copy would share, and free twice, the compiled regexp */
	UchBusSubscription (const UchBusSubscription&);
	UchBusSubscription& operator = (const UchBusSubscription&);
protected:
	int Id;
	CcuString Regexp;
	regex_t Compiled;
	bool Valid;	/* true only when regcomp succeeded, i.e. Compiled may be used */
public:
	UchBusSubscription (int, const char*);
	~UchBusSubscription ();
inline	int GetId () const { return Id; }
inline	const char* GetRegexp () const { return Regexp; }
inline	const regex_t* GetCompiled () const { return &Compiled; }
inline	bool IsValid () const { return Valid; }
};

/*
 * A subscription made by the local application: the text of the regexp
 * and the id under which it is stored in UchBusAccess::IdToSubscription
 * (-1 until SetId is called).
 */
class UchBusTrigger : public DnnTrigger {
protected:
	int Id;
	CcuString Regexp;
public:
	UchBusTrigger (const char*);
inline	int GetId () const { return Id; }
inline	void SetId (int id) { Id = id; }
inline	const char* GetRegexp () const { return Regexp; }
};

/* listening stream on which other agents open their connection to us */
class UchBusServer : public UchStream {
public:
	UchBusAccess* MyBus;
	UchBusServer (UchBusAccess*);
	void HandleRead ();
};

/* one connection to a remote agent, together with the subscriptions it sent us */
class UchBusAgent : public UchBufStream {
protected:
	CcuListOf <UchBusSubscription> RemoteSubscriptions;
	UchBusAccess* MyBus;
	void WriteString (const char*);
	bool ProcessLine (const char*);
public:
	UchBusAgent (int, UchBusAccess*);
	UchBusAgent (const char*, sword, UchBusAccess*);
	~UchBusAgent ();
	void HandleRead ();
	void SendLocalSubscription (const UchBusTrigger*);
	void SendLocalSubscriptions ();
	void Emit (const char*);
};

/* this is used for local subscriptions */
UchBusTrigger :: UchBusTrigger (const char* r)
: Id (-1),
  Regexp (r)
{
}

/*
 * this is used for remote subscriptions:
 * compiles r as a case-insensitive extended regexp. A compilation failure
 * is reported on cerr and leaves the subscription marked invalid.
 */
UchBusSubscription :: UchBusSubscription (int id, const char* r)
: Id (id),
  Regexp (r),
  Valid (false)
{
	int reg = regcomp (&Compiled, r, REG_ICASE|REG_EXTENDED);
	if (reg != 0)
		cerr << "bad regexp " << r << "\n";
	else
		Valid = true;
}

/* releases the compiled regexp, if there is one */
UchBusSubscription :: ~UchBusSubscription ()
{
	if (Valid)
		regfree (&Compiled);
}

/* binds to ANYADDR, starts listening and registers with the multiplexer */
UchBusServer :: UchBusServer (UchBusAccess* bus)
: UchStream (new UchInetAddress (ANYADDR)),
  MyBus (bus)
{
	if (Listen () < 0) {
		cerr << "cannot listen on bus server\n";
		return;
	}
	SetMode (IORead);
	Add (UchMpx);
}

/* this happens when another agent opens a connection with us */
void
UchBusServer :: HandleRead ()
{
	/* Accept connection */
	int fd = Accept ();
	if (fd < 0) {
		cerr << "cannot accept connection on server\n";
		return;
	}

	/* create agent handle */
	cout << "Someone was there. Hello!\n";
	UchBusAgent* a = new UchBusAgent (fd, MyBus);

	/* send subscriptions*/
	a->SendLocalSubscriptions ();
}

/* wraps an already accepted connection and registers it with the bus */
UchBusAgent :: UchBusAgent (int fd, UchBusAccess* bus)
: UchBufStream (),
  RemoteSubscriptions (),
  MyBus (bus)
{
	SetMode (IOReadWrite);
	UchChannel::Open (fd);
	Add (UchMpx);
	MyBus->Agents.Append (this);
}

/*
 * connects to host:port and registers the new agent with the bus.
 * When Setup fails the error is reported on cerr and the agent is left
 * unregistered.
 */
UchBusAgent :: UchBusAgent (const char* host, sword port, UchBusAccess *bus)
: UchBufStream (0, new UchInetAddress (host, port)),
  RemoteSubscriptions (),
  MyBus (bus)
{
	if (!Setup ()) {
		cerr << "cannot set up stream on " << host << ":" << port << "\n";
		return;
	}
	SetMode (IOReadWrite);
	Add (UchMpx);
	MyBus->Agents.Append (this);
}

/* unregisters from the bus, closes the connection and frees the remote subscriptions */
UchBusAgent :: ~UchBusAgent ()
{
	MyBus->Agents.Remove (this);
	Remove ();
	Close ();

	/* these were allocated in ProcessLine and are only referenced from this list */
	CcuListIterOf <UchBusSubscription> r = RemoteSubscriptions;
	while (++r)
		delete *r;
}

/* this had to be redefined because UchChannel adds null character
   and we don't want it */
void
UchBusAgent :: WriteString (const char* s)
{
	/* now that this is a BufStream, perhaps we should use Buffer? */
	Write ((byte*) s, strlen (s));
}

/* sends every local subscription of the bus, then a BusReady line */
void
UchBusAgent :: SendLocalSubscriptions ()
{
	cout << "sending subscriptions\n";
	CcuListIterOf <UchBusTrigger> s = MyBus->LocalSubscriptions;
	while (++s)
		SendLocalSubscription (*s);

	/* say I'm ready */
	char buf[32];
	snprintf (buf, sizeof (buf), "%d %d\n", BusReady, 0);
	/* NOTE(review): every other line is sent with WriteString; confirm that
	   operator << does not append the null character WriteString avoids */
	*this << buf;
}

/* sends one subscription as the line "<BusRegexp> <id> <regexp>" */
void
UchBusAgent :: SendLocalSubscription (const UchBusTrigger* s)
{
	char buf[32];
	snprintf (buf, sizeof (buf), "%d %d ", BusRegexp, s->GetId ());
	WriteString (buf);
	WriteString (s->GetRegexp ());
	WriteString ("\n");
	cout << "Sending regexp: " << buf << s->GetRegexp () << '\n';
}

/*
 * this happens when another agent talks:
 * reads into InBuffer and hands every complete line to ProcessLine.
 * The agent deletes itself when reading fails or when the peer says goodbye.
 */
void
UchBusAgent :: HandleRead ()
{
	/* read in buffer */
	if (InBuffer.BufLength () == 0)
		InBuffer.NeedSize (128);
	int n = Read (InBuffer);

	/* handle errors: a result of -2 is answered by growing the buffer and retrying */
	if (n == -2) {
		InBuffer.Grow ();
		n = Read (InBuffer);
	}

	if (n <= 0) {
		delete this;
		return;
	}

	/* slice input */
	char* start = (char*) InBuffer.Buffer ();
	char* stop = start + InBuffer.BufLength ();
	for (char* p = start; p < stop; p++) {
		if (*p == '\n') {
			*p = '\0';
			if (!ProcessLine (start)) {
				/* the peer said goodbye: nothing of this object,
				   InBuffer included, may be used after the delete */
				delete this;
				return;
			}
			start = p+1;
			InBuffer.Flush ((byte*) start);
		}
	}
}

/* disabled code; looks like an earlier, unbuffered body of HandleRead */
#if 0
	byte buf [256];
	int nb_read = Read (buf, 256);
	if (nb_read <= 0) {
		delete this;
		return;
	}
	buf[nb_read] = '\0';
	cout << "agent says:" << (const char*) buf;
}
#endif

/*
 * Decodes one line "<type> <id><rest>" received from the remote agent.
 * Returns false when the agent said goodbye and this object must be
 * destroyed by the caller, true in every other case (bad lines included).
 */
bool
UchBusAgent :: ProcessLine (const char* line)
{
	/* extract */
	int msg_type, id, nb_chars;
	int nb_fields = sscanf (line, "%d %d%n", &msg_type, &id, &nb_chars);
	if (nb_fields != 2) {
		cerr << "Bad format: " << line;
		return true;
	}
	/* p is what follows the two integers */
	const char* p = (const char*) (line+nb_chars);

	switch (msg_type) {
	case BusBye:
		cout << "agent says goodbye\n";
		return false;

	case BusRegexp:
		/* skip the character that separates the id from the regexp,
		   unless the line stops right after the id */
		RemoteSubscriptions.Append (new UchBusSubscription (id, (*p != '\0') ? p+1 : p));
		break;

	case BusMsg:
		MyBus->HandleEvent (id, p);
		break;

	case BusDelRegexp:
		cerr << "not implemented: DelRegexp\n";
		break;

	case BusReady:
		break;

	case BusError:
		break;
	}
	return true;
}

/*
 * Sends msg to the remote agent once for each of its subscriptions that
 * matches: the line holds BusMsg, the subscription id, then the text of
 * the parenthesised sub-expressions, each preceded by a space.
 */
void
UchBusAgent :: Emit (const char* msg)
{
	const int MAX_MATCHING_ARGS = 20;
	regmatch_t pmatch[MAX_MATCHING_ARGS+1];

	/* scan all subscriptions from this agent */
	CcuListIterOf <UchBusSubscription> r = RemoteSubscriptions;
	while (++r) {
		UchBusSubscription* s = *r;

		/* a regexp that did not compile cannot be executed */
		if (!s->IsValid ())
			continue;

		/* does the message match the regexp? */
		if (regexec (s->GetCompiled (), msg, MAX_MATCHING_ARGS, pmatch, 0) != 0)
			continue;

		cerr << msg << " matches " << s->GetRegexp () << "\n";

		/* if yes, emit it as such */
		char buf[64];
		snprintf (buf, sizeof (buf), "%d %d", BusMsg, s->GetId ());
		WriteString (buf);

		/* regexec was given MAX_MATCHING_ARGS entries, so only indices below
		   that are meaningful; entry 0 is the whole match and is not sent.
		   The scan stops at the first sub-expression that did not match. */
		for (int i = 1; i < MAX_MATCHING_ARGS && pmatch[i].rm_so != -1; ++i) {
			WriteString (" ");
			/* the text is written straight from msg: no intermediate
			   buffer, hence no limit on the length of a match */
			long len = (long) (pmatch[i].rm_eo - pmatch[i].rm_so);
			if (len > 0)
				Write ((byte*) (msg + pmatch[i].rm_so), (int) len);
		}
		WriteString ("\n");
	}
}

/*
 * Joins the bus: binds a broadcast-capable datagram socket to port, creates
 * the stream server other agents connect to, broadcasts the handshake
 * "<version> <listen port>" and registers with the multiplexer.
 * Each failure is reported on cerr and ends the construction there.
 */
UchBusAccess :: UchBusAccess (sword port)
: UchDatagram (),
  UchBaseSignalHandler (*UchMpx, SigInt),
  ListenPort (0),
  BroadcastPort (port),
  Server (0),
  Agents (),
  IdToSubscription (256),
  LocalSubscriptions ()
{
	/** set up datagram **/
	UchInetAddress* a = new UchInetAddress (ANYADDR, port);
	BindTo (a);
	if (!Open ()) {
		cerr << "Cannot open socket\n";
		return;
	}
	if (!AllowBroadcast ()) {
		cerr << "socket won't broadcast\n";
		return;
	}
	if (!ReuseAddress ()) {
		cerr << "socket won't let address be reused\n";
		return;
	}
	if (Bind () < 0) {
		cerr << "cannot bind socket to port " << port << "\n";
		return;
	}
	SetMode (IOReadWrite);

	/** set up stream **/
	Server = new UchBusServer (this);
	a = dynamic_cast<UchInetAddress*> (Server->BoundTo ());
	if (!a) {
		cerr << "bad address type?!\n";
		return;
	}
	ListenPort = a->Port ();

	/** emit handshake on datagram **/
	char handshake[32];
	snprintf (handshake, sizeof (handshake), "%d %hu\n", Version, ListenPort);
	UchInetAddress ba (BROADCAST, BroadcastPort);
	Send ((byte*)handshake, strlen (handshake), ba);

	/* add to main loop */
	Add (UchMpx);
}

/* NOTE(review): Server and the registered agents are not released here;
   confirm who owns them */
UchBusAccess :: ~UchBusAccess ()
{
}

/*
 * this happens when someone sends a broadcast on the bus: a newcomer.
 * Decodes the handshake, ignores our own broadcast and other protocol
 * versions, then connects to the newcomer and sends it our subscriptions.
 */
void
UchBusAccess :: HandleRead ()
{
	/* read data, keeping one byte free for the terminator sscanf needs */
	byte buf [256];
	int nb_bytes = Receive (buf, sizeof (buf) - 1);
	if (nb_bytes < 0) {
		cerr << "read error on broadcast port\n";
		return;
	}
	buf [nb_bytes] = '\0';

	/* decode data: should be a handshake */
	int bus_version;
	sword remote_listen_port;
	int nb_infos = sscanf ((char*) buf, "%d %hu", &bus_version, &remote_listen_port);
	if (nb_infos < 2) {
		cerr << "bad data on broadcast port\n";
		return;
	}

	/* where did this come from? */
	UchInetAddress* a = dynamic_cast<UchInetAddress*> (From ());
	if (!a) {
		cerr << "bad address!?\n";
		return;
	}
	const char* remote_host = a->GetHostName ();
	sword remote_port = a->Port ();

	/* sanity checks: is it the same protocol version? is it myself calling? */
	if (remote_listen_port == ListenPort)
		return;
	if (bus_version != Version) {
		cerr << "bad protocol version\n";
		return;
	}
	printf ("Arrivee de %s:%hu port %hu\n", remote_host, remote_port, remote_listen_port );

	/* Create agent handle */
	/* NOTE(review): the agent constructor only reports a failed Setup on
	   cerr; nothing here checks for it before using the agent */
	UchBusAgent* ag = new UchBusAgent (remote_host, remote_listen_port, this);

	/* send subscriptions */
	ag->SendLocalSubscriptions ();
}

/* called for a BusMsg line: looks up the local subscription with this id
   and prints the match on cout, or reports an unknown id on cerr */
void
UchBusAccess :: HandleEvent (int id, const char* msg)
{
	UchBusTrigger* s = IdToSubscription.Get (id);
	if (!s) {
		cerr << "bad regexp id " << id << "\n";
		return;
	}
	cout << "REGEXP " << s->GetId () << " MATCHED: " << msg << "\n";
}

/* this is the handler for SIGINT */
void
UchBusAccess :: DeferredHandle (int)
{
	cerr << "quitting\n";
	UchStop ();
}

/* registers regexp as a new local subscription, announces it to the agents
   currently connected and subscribes reaction r to its trigger */
void
UchBusAccess :: Subscribe (DnnBaseReaction& r, const char* regexp)
{
	/* store new subscription */
	UchBusTrigger* s = new UchBusTrigger (regexp);
	LocalSubscriptions.Append (s);
	int id = IdToSubscription.Store (s);
	s->SetId (id);

	/* emit to current remote agents */
	CcuListIterOf <UchBusAgent> a = Agents;
	while (++a)
		(*a)->SendLocalSubscription (s);

	/* link to reaction */
	r.SubscribeTo (*s);
}

/* formats a message printf-style and offers it to every connected agent;
   a message that does not fit the local buffer is truncated */
void
UchBusAccess :: Emit (const char* fmt, ...)
{
	char message[4096];
	va_list ap;

	va_start (ap, fmt);
	vsnprintf (message, sizeof (message), fmt, ap);
	va_end (ap);

	CcuListIterOf <UchBusAgent> a = Agents;
	while (++a)
		(*a)->Emit (message);
}