From 6cea474b7fbf7606d0ab92b24e7ab9f3a7f7d460 Mon Sep 17 00:00:00 2001 From: chatty Date: Tue, 28 Nov 2000 17:07:43 +0000 Subject: Now handle Die Added events Protocol version 3 Fixed IOReadWrite --- comm/BusAccess.cc | 329 +++++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 264 insertions(+), 65 deletions(-) (limited to 'comm/BusAccess.cc') diff --git a/comm/BusAccess.cc b/comm/BusAccess.cc index c92a52c..826146a 100644 --- a/comm/BusAccess.cc +++ b/comm/BusAccess.cc @@ -18,20 +18,30 @@ #include "ccu/String.h" #include "dnn/Trigger.h" #include "dnn/Reaction.h" +#include "ccu/Signal.h" +#include #include #include #include #include -int UchBusAccess::Version = 2; +int UchBusAccess::Version = 3; + +static int _BusDebug = 0; + +const unsigned char STX = '\002'; +const unsigned char ETX = '\003'; typedef enum { - BusBye, /* quitte l'application ( non utilise ) */ + BusBye, /* quitte l'application */ BusRegexp, /* expression reguliere d'un client */ BusMsg, /* message reel */ BusError, /* error message */ BusDelRegexp, /* Remove expression reguliere */ - BusReady + BusReady, + BusHello, + BusDirectMsg, + BusDie } BusMsgType; @@ -68,23 +78,6 @@ public: void HandleRead (); }; -class UchBusAgent : public UchBufStream { -protected: - CcuListOf RemoteSubscriptions; - UchBusAccess* MyBus; - - void WriteString (const char*); - void ProcessLine (const char*); - -public: - UchBusAgent (int, UchBusAccess*); - UchBusAgent (const char*, sword, UchBusAccess*); - void HandleRead (); - void SendLocalSubscription (const UchBusTrigger*); - void SendLocalSubscriptions (); - void Emit (const char*); -}; - /* this is used for local subscriptions */ UchBusTrigger :: UchBusTrigger (const char* r) : Id (-1), @@ -126,19 +119,22 @@ UchBusServer :: HandleRead () } /* create agent handle */ - cout << "Someone was there. Hello!\n"; + if (_BusDebug) + cout << "Someone was there. Hello!\n"; UchBusAgent* a = new UchBusAgent (fd, MyBus); - - /* send subscriptions*/ - a->SendLocalSubscriptions (); + + /* send name and subscriptions */ + a->SayHello (); } UchBusAgent :: UchBusAgent (int fd, UchBusAccess* bus) : UchBufStream (), RemoteSubscriptions (), - MyBus (bus) + MyBus (bus), + DirectMsg (16) { - SetMode (IOReadWrite); +// SetMode (IOReadWrite); + SetMode (IORead); UchChannel::Open (fd); Add (UchMpx); MyBus->Agents.Append (this); @@ -148,13 +144,14 @@ UchBusAgent :: UchBusAgent (int fd, UchBusAccess* bus) UchBusAgent :: UchBusAgent (const char* host, sword port, UchBusAccess *bus) : UchBufStream (0, new UchInetAddress (host, port)), RemoteSubscriptions (), - MyBus (bus) + MyBus (bus), + DirectMsg (16) { if (!Setup ()) { cerr << "cannot set up stream on " << host << ":" << port << "\n"; return; } - SetMode (IOReadWrite); + SetMode (IORead); Add (UchMpx); MyBus->Agents.Append (this); } @@ -178,28 +175,49 @@ UchBusAgent :: WriteString (const char* s) } void -UchBusAgent :: SendLocalSubscriptions () +UchBusAgent :: SayHello () { - cout << "sending subscriptions\n"; + CcuSignalBlocker b (SigPipe); + char buf[32]; + + /* Say hello */ + sprintf (buf, "%d %d%c", BusHello, MyBus->ListenPort, STX); + WriteString (buf); + WriteString (MyBus->Name); + WriteString ("\n"); + + if (_BusDebug) + cout << "sending subscriptions\n"; CcuListIterOf s = MyBus->LocalSubscriptions; while (++s) SendLocalSubscription (*s); /* say I'm ready */ - char buf[32]; - sprintf (buf, "%d %d\n", BusReady, 0); - *this << buf; + sprintf (buf, "%d 0%c\n", BusReady, STX); + WriteString (buf); } void UchBusAgent :: SendLocalSubscription (const UchBusTrigger* s) { char buf[32]; - sprintf (buf, "%d %d ", BusRegexp, s->GetId ()); + sprintf (buf, "%d %d%c", BusRegexp, s->GetId (), STX); WriteString (buf); WriteString (s->GetRegexp ()); WriteString ("\n"); - cout << "Sending regexp: " << buf << s->GetRegexp () << '\n'; + if (_BusDebug) + cout << "Sending regexp: " << buf << s->GetRegexp () << '\n'; +} + +void +UchBusAgent :: SayBye () +{ + CcuSignalBlocker b (SigPipe); + char buf[32]; + + /* Say bye */ + sprintf (buf, "%d 0%c\n", BusBye, STX); + WriteString (buf); } @@ -235,23 +253,29 @@ UchBusAgent :: HandleRead () } } +static int id = 0; -#if 0 - byte buf [256]; - int nb_read = Read (buf, 256); - if (nb_read <= 0) { - delete this; - return; - } - buf[nb_read] = '\0'; - cout << "agent says:" << (const char*) buf; +static int +has_id (UchBusSubscription* s) +{ + return (s->GetId () == id); } -#endif void UchBusAgent :: ProcessLine (const char* line) { - /* extract */ + /* message structure is: + Bye: "0 0\002" + Regexp: "1 23\002b(la)bl(a*)b(la)" + Msg: "2 23\002match1\003match2\003match3" + Error: "3 0\002error!" + DelRegexp: "4 0\002" + Ready: "5 0\002" + Hello: "6 12526\002TESTAGENT" + DirectMsg: "7 23\002blablabla" + */ + + /* extract message header */ int msg_type, id, nb_chars; int nb_fields = sscanf (line,"%d %d%n", &msg_type, &id, &nb_chars); if (nb_fields != 2) { @@ -260,30 +284,69 @@ UchBusAgent :: ProcessLine (const char* line) } const char* p = (const char*) (line+nb_chars); + DnnEvent* ev; switch (msg_type) { case BusBye: - cout << "agent says goodbye\n"; + if (_BusDebug) + cout << "[bye " << Name << "]\n"; + ev = new UchAgentEvent (this, UchAgentEvent::BusAgentBye); + Bye.Dispatch (*ev); delete this; break; case BusRegexp: + if (_BusDebug) + cout << "[regexp " << Name << " " << p+1 << "]\n"; RemoteSubscriptions.Append (new UchBusSubscription (id, p+1)); break; case BusMsg: - MyBus->HandleEvent (id, p); + if (_BusDebug) + cout << "[event " << Name << " " << p+1 << "]\n"; + MyBus->HandleEvent (id, p+1); break; case BusDelRegexp: - cerr << "not implemented: DelRegexp\n"; + if (_BusDebug) + cout << "[del " << Name << " " << id << "]\n"; + ::id = id; + RemoteSubscriptions.Remove (has_id); break; case BusReady: + if (_BusDebug) + cout << "[ready " << Name << "]\n"; + ev = new UchAgentEvent (this, UchAgentEvent::BusNewAgent); + MyBus->NewAgents.Dispatch (*ev); + break; + case BusDirectMsg: + { + if (_BusDebug) + cout << "[direct " << Name << id << " " << p+1 << "]\n"; + ev = new UchDirectEvent (this, p+1); + DnnTrigger* t = DirectMsg[id]; + t->Dispatch (*ev); + break; + } + case BusHello: + Name = p+1; + if (_BusDebug) + cout << "[hello " << Name << "]\n"; + if (id && !MyBus->CheckUnicity (this, id)) + delete this; + else + RemoteId = id; break; case BusError: break; + case BusDie: + /* too violent */ + if (_BusDebug) + cout << "[die]\n"; + exit (0); } } void -UchBusAgent :: Emit (const char* msg) +UchBusAgent :: EmitEvent (const char* msg) { + CcuSignalBlocker b (SigPipe); const int MAX_MATCHING_ARGS = 20; regmatch_t pmatch[MAX_MATCHING_ARGS+1]; @@ -295,26 +358,59 @@ UchBusAgent :: Emit (const char* msg) if (regexec (s->GetCompiled (), msg, MAX_MATCHING_ARGS, pmatch, 0) != 0) continue; - cerr << msg << " matches " << s->GetRegexp () << "\n"; /* if yes, emit it as such */ + if (_BusDebug) + cout << msg << " matches " << s->GetRegexp () << "\n"; char buf[64]; - sprintf (buf, "%d %d", BusMsg, s->GetId ()); + sprintf (buf, "%d %d%c", BusMsg, s->GetId (), STX); WriteString (buf); +#ifdef GNU_REGEXP regmatch_t* p = &pmatch[1]; while (p->rm_so != -1) { - char buf[256]; - sprintf (buf, " %.*s", p->rm_eo - p->rm_so, msg + p->rm_so); + char buf[4096]; + sprintf (buf, "%.*s%c", p->rm_eo - p->rm_so, msg + p->rm_so, ETX); WriteString (buf); ++p; } +#else + for (int i = 1; i < s->GetCompiled ()->re_nsub+1; i ++ ) { + if (pmatch[i].rm_so != -1 ) { + sprintf (buf, "%.*s%c", pmatch[i].rm_eo - pmatch[i].rm_so, msg + pmatch[i].rm_so, ETX); + } else { + sprintf (buf, "%c", ETX); + } + WriteString (buf); + } +#endif WriteString ("\n"); } } +/* emit direct message */ +void +UchBusAgent :: Emit (int id, const char* fmt, ...) +{ + /* build message */ + char message[4096]; + va_list ap; + + va_start (ap, fmt); + vsprintf (message, fmt, ap); + va_end (ap ); + + /* send it */ + CcuSignalBlocker b (SigPipe); + char buf[32]; + sprintf (buf, "%d %d%c", BusDirectMsg, id, STX); + WriteString (buf); + WriteString (message); + WriteString ("\n"); +} -UchBusAccess :: UchBusAccess (sword port) +UchBusAccess :: UchBusAccess (const char* name, sword port) : UchDatagram (), UchBaseSignalHandler (*UchMpx, SigInt), + Name (name), ListenPort (0), BroadcastPort (port), Server (0), @@ -326,15 +422,15 @@ UchBusAccess :: UchBusAccess (sword port) UchInetAddress* a = new UchInetAddress (ANYADDR, port); BindTo (a); if (!Open ()) { - cerr << "Cannot open socket\n"; + perror ("Cannot open socket: "); return; } if (!AllowBroadcast ()) { - cerr << "socket won't broadcast\n"; + perror ("socket won't broadcast: "); return; } if (!ReuseAddress ()) { - cerr << "socket won't let address be reused\n"; + perror ("socket won't let address be reused: "); return; } @@ -342,7 +438,8 @@ UchBusAccess :: UchBusAccess (sword port) cerr << "cannot bind socket to port " << port << "\n"; return; } - SetMode (IOReadWrite); +// SetMode (IOReadWrite); + SetMode (IORead); /** set up stream **/ Server = new UchBusServer (this); @@ -363,9 +460,21 @@ UchBusAccess :: UchBusAccess (sword port) Add (UchMpx); } +void +UchBusAccess :: Clear () +{ + /* say bye to all remote agents, remove and delete them */ + + UchBusAgent* a; + while (a = Agents.RemoveFirst ()) { + a->SayBye (); + delete a; + } +} UchBusAccess :: ~UchBusAccess () { + } /* this happens when someone sends a broadcast on the bus: a newcomer */ @@ -406,13 +515,14 @@ UchBusAccess :: HandleRead () return; } - printf ("Arrivee de %s:%hu port %hu\n", remote_host, remote_port, remote_listen_port ); + if (_BusDebug) + printf ("Arrivee de %s:%hu port %hu\n", remote_host, remote_port, remote_listen_port ); /* Create agent handle */ UchBusAgent* ag = new UchBusAgent (remote_host, remote_listen_port, this); - /* send subscriptions */ - ag->SendLocalSubscriptions (); + /* send name and subscriptions */ + ag->SayHello (); } @@ -424,7 +534,11 @@ UchBusAccess :: HandleEvent (int id, const char* msg) cerr << "bad regexp id " << id << "\n"; return; } - cout << "REGEXP " << s->GetId () << " MATCHED: " << msg << "\n"; + + if (_BusDebug) + cout << "REGEXP " << s->GetId () << " MATCHED: " << msg << "\n"; + UchBusEvent* ev = new UchBusEvent (s->GetRegexp (), msg); + s->Dispatch (*ev); } /* this is the handler for SIGINT */ @@ -432,12 +546,32 @@ void UchBusAccess :: DeferredHandle (int) { cerr << "quitting\n"; + Clear (); UchStop (); } +bool +UchBusAccess :: CheckUnicity (UchBusAgent* a, sword id) +{ + CcuListIterOf li = Agents; + while (++li) { + if ((*li)->RemoteId == id && a != (*li)) + return false; + } + return true; +} + void -UchBusAccess :: Subscribe (DnnBaseReaction& r, const char* regexp) +UchBusAccess :: Subscribe (DnnBaseReaction& r, const char* fmt, ...) { + /* build regexp */ + char regexp[4096]; + va_list ap; + + va_start (ap, fmt); + vsprintf (regexp, fmt, ap); + va_end (ap ); + /* store new subscription */ UchBusTrigger* s = new UchBusTrigger (regexp); LocalSubscriptions.Append (s); @@ -457,6 +591,7 @@ UchBusAccess :: Subscribe (DnnBaseReaction& r, const char* regexp) void UchBusAccess :: Emit (const char* fmt, ...) { + /* build message */ char message[4096]; va_list ap; @@ -466,6 +601,70 @@ UchBusAccess :: Emit (const char* fmt, ...) CcuListIterOf a = Agents; while (++a) - (*a)->Emit (message); + (*a)->EmitEvent (message); } + +DnnEventType* UchAgentEvent::BusNewAgent = new DnnEventType ("new agent", 0, 0); +DnnEventType* UchAgentEvent::BusAgentBye = new DnnEventType ("agent bye", 0, 0); + +UchBusEvent :: UchBusEvent (const char* r, const char* m) +: DnnEvent (), + Regexp (r), + NbMatches (0), + Matches (m), + MatchList () +{ + register const char* p = m; + register char* q = (char*) m; + bool end = false; + while (!end) { + switch (*q) { + case '\0': + end = true; + if (q == p) + break; + /* else fall through */ + case ETX: + NbMatches++; + *q = '\0'; + MatchList.Append (new CcuString (p)); + p = q + 1; + /* fall through */ + default: + ++q; + } + } +} + + +UchBusEvent :: ~UchBusEvent () +{ + CcuListIterOf li = MatchList; + while (++li) + delete (*li); +} + +UchAgentEvent :: UchAgentEvent (UchBusAgent* a, DnnEventType* t) +: DnnEvent (t), + Agent (a) +{ +} + + +UchAgentEvent :: ~UchAgentEvent () +{ +} + +UchDirectEvent :: UchDirectEvent (UchBusAgent* a, const char* m) +: DnnEvent (), + Agent (a), + Msg (m) +{ +} + + +UchDirectEvent::~UchDirectEvent () +{ +} + -- cgit v1.1