summaryrefslogtreecommitdiff
path: root/comm
diff options
context:
space:
mode:
authorchatty2000-11-28 17:07:43 +0000
committerchatty2000-11-28 17:07:43 +0000
commit6cea474b7fbf7606d0ab92b24e7ab9f3a7f7d460 (patch)
tree030b95692a3f0eabe4e4d6a19ae83fee7f4bdc43 /comm
parent66caeca4d381a66409c3fc4e812a3e2e99fae081 (diff)
downloadivy-league-6cea474b7fbf7606d0ab92b24e7ab9f3a7f7d460.zip
ivy-league-6cea474b7fbf7606d0ab92b24e7ab9f3a7f7d460.tar.gz
ivy-league-6cea474b7fbf7606d0ab92b24e7ab9f3a7f7d460.tar.bz2
ivy-league-6cea474b7fbf7606d0ab92b24e7ab9f3a7f7d460.tar.xz
Now handle Die
Added events Protocol version 3 Fixed IOReadWrite
Diffstat (limited to 'comm')
-rw-r--r--comm/BusAccess.cc329
1 files changed, 264 insertions, 65 deletions
diff --git a/comm/BusAccess.cc b/comm/BusAccess.cc
index c92a52c..826146a 100644
--- a/comm/BusAccess.cc
+++ b/comm/BusAccess.cc
@@ -18,20 +18,30 @@
#include "ccu/String.h"
#include "dnn/Trigger.h"
#include "dnn/Reaction.h"
+#include "ccu/Signal.h"
+#include <stdlib.h>
#include <stdio.h>
#include <ostream.h>
#include <regex.h>
#include <stdarg.h>
-int UchBusAccess::Version = 2;
+int UchBusAccess::Version = 3;
+
+static int _BusDebug = 0;
+
+const unsigned char STX = '\002';
+const unsigned char ETX = '\003';
typedef enum {
- BusBye, /* quitte l'application ( non utilise ) */
+ BusBye, /* quitte l'application */
BusRegexp, /* expression reguliere d'un client */
BusMsg, /* message reel */
BusError, /* error message */
BusDelRegexp, /* Remove expression reguliere */
- BusReady
+ BusReady,
+ BusHello,
+ BusDirectMsg,
+ BusDie
} BusMsgType;
@@ -68,23 +78,6 @@ public:
void HandleRead ();
};
-class UchBusAgent : public UchBufStream {
-protected:
- CcuListOf<UchBusSubscription> RemoteSubscriptions;
- UchBusAccess* MyBus;
-
- void WriteString (const char*);
- void ProcessLine (const char*);
-
-public:
- UchBusAgent (int, UchBusAccess*);
- UchBusAgent (const char*, sword, UchBusAccess*);
- void HandleRead ();
- void SendLocalSubscription (const UchBusTrigger*);
- void SendLocalSubscriptions ();
- void Emit (const char*);
-};
-
/* this is used for local subscriptions */
UchBusTrigger :: UchBusTrigger (const char* r)
: Id (-1),
@@ -126,19 +119,22 @@ UchBusServer :: HandleRead ()
}
/* create agent handle */
- cout << "Someone was there. Hello!\n";
+ if (_BusDebug)
+ cout << "Someone was there. Hello!\n";
UchBusAgent* a = new UchBusAgent (fd, MyBus);
-
- /* send subscriptions*/
- a->SendLocalSubscriptions ();
+
+ /* send name and subscriptions */
+ a->SayHello ();
}
UchBusAgent :: UchBusAgent (int fd, UchBusAccess* bus)
: UchBufStream (),
RemoteSubscriptions (),
- MyBus (bus)
+ MyBus (bus),
+ DirectMsg (16)
{
- SetMode (IOReadWrite);
+// SetMode (IOReadWrite);
+ SetMode (IORead);
UchChannel::Open (fd);
Add (UchMpx);
MyBus->Agents.Append (this);
@@ -148,13 +144,14 @@ UchBusAgent :: UchBusAgent (int fd, UchBusAccess* bus)
UchBusAgent :: UchBusAgent (const char* host, sword port, UchBusAccess *bus)
: UchBufStream (0, new UchInetAddress (host, port)),
RemoteSubscriptions (),
- MyBus (bus)
+ MyBus (bus),
+ DirectMsg (16)
{
if (!Setup ()) {
cerr << "cannot set up stream on " << host << ":" << port << "\n";
return;
}
- SetMode (IOReadWrite);
+ SetMode (IORead);
Add (UchMpx);
MyBus->Agents.Append (this);
}
@@ -178,28 +175,49 @@ UchBusAgent :: WriteString (const char* s)
}
void
-UchBusAgent :: SendLocalSubscriptions ()
+UchBusAgent :: SayHello ()
{
- cout << "sending subscriptions\n";
+ CcuSignalBlocker b (SigPipe);
+ char buf[32];
+
+ /* Say hello */
+ sprintf (buf, "%d %d%c", BusHello, MyBus->ListenPort, STX);
+ WriteString (buf);
+ WriteString (MyBus->Name);
+ WriteString ("\n");
+
+ if (_BusDebug)
+ cout << "sending subscriptions\n";
CcuListIterOf<UchBusTrigger> s = MyBus->LocalSubscriptions;
while (++s)
SendLocalSubscription (*s);
/* say I'm ready */
- char buf[32];
- sprintf (buf, "%d %d\n", BusReady, 0);
- *this << buf;
+ sprintf (buf, "%d 0%c\n", BusReady, STX);
+ WriteString (buf);
}
void
UchBusAgent :: SendLocalSubscription (const UchBusTrigger* s)
{
char buf[32];
- sprintf (buf, "%d %d ", BusRegexp, s->GetId ());
+ sprintf (buf, "%d %d%c", BusRegexp, s->GetId (), STX);
WriteString (buf);
WriteString (s->GetRegexp ());
WriteString ("\n");
- cout << "Sending regexp: " << buf << s->GetRegexp () << '\n';
+ if (_BusDebug)
+ cout << "Sending regexp: " << buf << s->GetRegexp () << '\n';
+}
+
+void
+UchBusAgent :: SayBye ()
+{
+ CcuSignalBlocker b (SigPipe);
+ char buf[32];
+
+ /* Say bye */
+ sprintf (buf, "%d 0%c\n", BusBye, STX);
+ WriteString (buf);
}
@@ -235,23 +253,29 @@ UchBusAgent :: HandleRead ()
}
}
+static int id = 0;
-#if 0
- byte buf [256];
- int nb_read = Read (buf, 256);
- if (nb_read <= 0) {
- delete this;
- return;
- }
- buf[nb_read] = '\0';
- cout << "agent says:" << (const char*) buf;
+static int
+has_id (UchBusSubscription* s)
+{
+ return (s->GetId () == id);
}
-#endif
void
UchBusAgent :: ProcessLine (const char* line)
{
- /* extract */
+ /* message structure is:
+ Bye: "0 0\002"
+ Regexp: "1 23\002b(la)bl(a*)b(la)"
+ Msg: "2 23\002match1\003match2\003match3"
+ Error: "3 0\002error!"
+ DelRegexp: "4 0\002"
+ Ready: "5 0\002"
+ Hello: "6 12526\002TESTAGENT"
+ DirectMsg: "7 23\002blablabla"
+ */
+
+ /* extract message header */
int msg_type, id, nb_chars;
int nb_fields = sscanf (line,"%d %d%n", &msg_type, &id, &nb_chars);
if (nb_fields != 2) {
@@ -260,30 +284,69 @@ UchBusAgent :: ProcessLine (const char* line)
}
const char* p = (const char*) (line+nb_chars);
+ DnnEvent* ev;
switch (msg_type) {
case BusBye:
- cout << "agent says goodbye\n";
+ if (_BusDebug)
+ cout << "[bye " << Name << "]\n";
+ ev = new UchAgentEvent (this, UchAgentEvent::BusAgentBye);
+ Bye.Dispatch (*ev);
delete this;
break;
case BusRegexp:
+ if (_BusDebug)
+ cout << "[regexp " << Name << " " << p+1 << "]\n";
RemoteSubscriptions.Append (new UchBusSubscription (id, p+1));
break;
case BusMsg:
- MyBus->HandleEvent (id, p);
+ if (_BusDebug)
+ cout << "[event " << Name << " " << p+1 << "]\n";
+ MyBus->HandleEvent (id, p+1);
break;
case BusDelRegexp:
- cerr << "not implemented: DelRegexp\n";
+ if (_BusDebug)
+ cout << "[del " << Name << " " << id << "]\n";
+ ::id = id;
+ RemoteSubscriptions.Remove (has_id);
break;
case BusReady:
+ if (_BusDebug)
+ cout << "[ready " << Name << "]\n";
+ ev = new UchAgentEvent (this, UchAgentEvent::BusNewAgent);
+ MyBus->NewAgents.Dispatch (*ev);
+ break;
+ case BusDirectMsg:
+ {
+ if (_BusDebug)
+ cout << "[direct " << Name << id << " " << p+1 << "]\n";
+ ev = new UchDirectEvent (this, p+1);
+ DnnTrigger* t = DirectMsg[id];
+ t->Dispatch (*ev);
+ break;
+ }
+ case BusHello:
+ Name = p+1;
+ if (_BusDebug)
+ cout << "[hello " << Name << "]\n";
+ if (id && !MyBus->CheckUnicity (this, id))
+ delete this;
+ else
+ RemoteId = id;
break;
case BusError:
break;
+ case BusDie:
+ /* too violent */
+ if (_BusDebug)
+ cout << "[die]\n";
+ exit (0);
}
}
void
-UchBusAgent :: Emit (const char* msg)
+UchBusAgent :: EmitEvent (const char* msg)
{
+ CcuSignalBlocker b (SigPipe);
const int MAX_MATCHING_ARGS = 20;
regmatch_t pmatch[MAX_MATCHING_ARGS+1];
@@ -295,26 +358,59 @@ UchBusAgent :: Emit (const char* msg)
if (regexec (s->GetCompiled (), msg, MAX_MATCHING_ARGS, pmatch, 0) != 0)
continue;
- cerr << msg << " matches " << s->GetRegexp () << "\n";
/* if yes, emit it as such */
+ if (_BusDebug)
+ cout << msg << " matches " << s->GetRegexp () << "\n";
char buf[64];
- sprintf (buf, "%d %d", BusMsg, s->GetId ());
+ sprintf (buf, "%d %d%c", BusMsg, s->GetId (), STX);
WriteString (buf);
+#ifdef GNU_REGEXP
regmatch_t* p = &pmatch[1];
while (p->rm_so != -1) {
- char buf[256];
- sprintf (buf, " %.*s", p->rm_eo - p->rm_so, msg + p->rm_so);
+ char buf[4096];
+ sprintf (buf, "%.*s%c", p->rm_eo - p->rm_so, msg + p->rm_so, ETX);
WriteString (buf);
++p;
}
+#else
+ for (int i = 1; i < s->GetCompiled ()->re_nsub+1; i ++ ) {
+ if (pmatch[i].rm_so != -1 ) {
+ sprintf (buf, "%.*s%c", pmatch[i].rm_eo - pmatch[i].rm_so, msg + pmatch[i].rm_so, ETX);
+ } else {
+ sprintf (buf, "%c", ETX);
+ }
+ WriteString (buf);
+ }
+#endif
WriteString ("\n");
}
}
+/* emit direct message */
+void
+UchBusAgent :: Emit (int id, const char* fmt, ...)
+{
+ /* build message */
+ char message[4096];
+ va_list ap;
+
+ va_start (ap, fmt);
+ vsprintf (message, fmt, ap);
+ va_end (ap );
+
+ /* send it */
+ CcuSignalBlocker b (SigPipe);
+ char buf[32];
+ sprintf (buf, "%d %d%c", BusDirectMsg, id, STX);
+ WriteString (buf);
+ WriteString (message);
+ WriteString ("\n");
+}
-UchBusAccess :: UchBusAccess (sword port)
+UchBusAccess :: UchBusAccess (const char* name, sword port)
: UchDatagram (),
UchBaseSignalHandler (*UchMpx, SigInt),
+ Name (name),
ListenPort (0),
BroadcastPort (port),
Server (0),
@@ -326,15 +422,15 @@ UchBusAccess :: UchBusAccess (sword port)
UchInetAddress* a = new UchInetAddress (ANYADDR, port);
BindTo (a);
if (!Open ()) {
- cerr << "Cannot open socket\n";
+ perror ("Cannot open socket: ");
return;
}
if (!AllowBroadcast ()) {
- cerr << "socket won't broadcast\n";
+ perror ("socket won't broadcast: ");
return;
}
if (!ReuseAddress ()) {
- cerr << "socket won't let address be reused\n";
+ perror ("socket won't let address be reused: ");
return;
}
@@ -342,7 +438,8 @@ UchBusAccess :: UchBusAccess (sword port)
cerr << "cannot bind socket to port " << port << "\n";
return;
}
- SetMode (IOReadWrite);
+// SetMode (IOReadWrite);
+ SetMode (IORead);
/** set up stream **/
Server = new UchBusServer (this);
@@ -363,9 +460,21 @@ UchBusAccess :: UchBusAccess (sword port)
Add (UchMpx);
}
+void
+UchBusAccess :: Clear ()
+{
+ /* say bye to all remote agents, remove and delete them */
+
+ UchBusAgent* a;
+ while (a = Agents.RemoveFirst ()) {
+ a->SayBye ();
+ delete a;
+ }
+}
UchBusAccess :: ~UchBusAccess ()
{
+
}
/* this happens when someone sends a broadcast on the bus: a newcomer */
@@ -406,13 +515,14 @@ UchBusAccess :: HandleRead ()
return;
}
- printf ("Arrivee de %s:%hu port %hu\n", remote_host, remote_port, remote_listen_port );
+ if (_BusDebug)
+ printf ("Arrivee de %s:%hu port %hu\n", remote_host, remote_port, remote_listen_port );
/* Create agent handle */
UchBusAgent* ag = new UchBusAgent (remote_host, remote_listen_port, this);
- /* send subscriptions */
- ag->SendLocalSubscriptions ();
+ /* send name and subscriptions */
+ ag->SayHello ();
}
@@ -424,7 +534,11 @@ UchBusAccess :: HandleEvent (int id, const char* msg)
cerr << "bad regexp id " << id << "\n";
return;
}
- cout << "REGEXP " << s->GetId () << " MATCHED: " << msg << "\n";
+
+ if (_BusDebug)
+ cout << "REGEXP " << s->GetId () << " MATCHED: " << msg << "\n";
+ UchBusEvent* ev = new UchBusEvent (s->GetRegexp (), msg);
+ s->Dispatch (*ev);
}
/* this is the handler for SIGINT */
@@ -432,12 +546,32 @@ void
UchBusAccess :: DeferredHandle (int)
{
cerr << "quitting\n";
+ Clear ();
UchStop ();
}
+bool
+UchBusAccess :: CheckUnicity (UchBusAgent* a, sword id)
+{
+ CcuListIterOf<UchBusAgent> li = Agents;
+ while (++li) {
+ if ((*li)->RemoteId == id && a != (*li))
+ return false;
+ }
+ return true;
+}
+
void
-UchBusAccess :: Subscribe (DnnBaseReaction& r, const char* regexp)
+UchBusAccess :: Subscribe (DnnBaseReaction& r, const char* fmt, ...)
{
+ /* build regexp */
+ char regexp[4096];
+ va_list ap;
+
+ va_start (ap, fmt);
+ vsprintf (regexp, fmt, ap);
+ va_end (ap );
+
/* store new subscription */
UchBusTrigger* s = new UchBusTrigger (regexp);
LocalSubscriptions.Append (s);
@@ -457,6 +591,7 @@ UchBusAccess :: Subscribe (DnnBaseReaction& r, const char* regexp)
void
UchBusAccess :: Emit (const char* fmt, ...)
{
+ /* build message */
char message[4096];
va_list ap;
@@ -466,6 +601,70 @@ UchBusAccess :: Emit (const char* fmt, ...)
CcuListIterOf <UchBusAgent> a = Agents;
while (++a)
- (*a)->Emit (message);
+ (*a)->EmitEvent (message);
}
+
+DnnEventType* UchAgentEvent::BusNewAgent = new DnnEventType ("new agent", 0, 0);
+DnnEventType* UchAgentEvent::BusAgentBye = new DnnEventType ("agent bye", 0, 0);
+
+UchBusEvent :: UchBusEvent (const char* r, const char* m)
+: DnnEvent (),
+ Regexp (r),
+ NbMatches (0),
+ Matches (m),
+ MatchList ()
+{
+ register const char* p = m;
+ register char* q = (char*) m;
+ bool end = false;
+ while (!end) {
+ switch (*q) {
+ case '\0':
+ end = true;
+ if (q == p)
+ break;
+ /* else fall through */
+ case ETX:
+ NbMatches++;
+ *q = '\0';
+ MatchList.Append (new CcuString (p));
+ p = q + 1;
+ /* fall through */
+ default:
+ ++q;
+ }
+ }
+}
+
+
+UchBusEvent :: ~UchBusEvent ()
+{
+ CcuListIterOf<CcuString> li = MatchList;
+ while (++li)
+ delete (*li);
+}
+
+UchAgentEvent :: UchAgentEvent (UchBusAgent* a, DnnEventType* t)
+: DnnEvent (t),
+ Agent (a)
+{
+}
+
+
+UchAgentEvent :: ~UchAgentEvent ()
+{
+}
+
+UchDirectEvent :: UchDirectEvent (UchBusAgent* a, const char* m)
+: DnnEvent (),
+ Agent (a),
+ Msg (m)
+{
+}
+
+
+UchDirectEvent::~UchDirectEvent ()
+{
+}
+