summaryrefslogtreecommitdiff
path: root/comm/OLD/dgram.cc
diff options
context:
space:
mode:
authorchatty1993-04-07 11:50:31 +0000
committerchatty1993-04-07 11:50:31 +0000
commitba066c34dde204aa192d03a23a81356374d93731 (patch)
tree39391f6235d2cf8a59a0634ac5ea430cdd21f5d4 /comm/OLD/dgram.cc
parent05ab076e1c2a9ca16472f9a6b47b8d22914b3783 (diff)
downloadivy-league-ba066c34dde204aa192d03a23a81356374d93731.zip
ivy-league-ba066c34dde204aa192d03a23a81356374d93731.tar.gz
ivy-league-ba066c34dde204aa192d03a23a81356374d93731.tar.bz2
ivy-league-ba066c34dde204aa192d03a23a81356374d93731.tar.xz
Initial revision
Diffstat (limited to 'comm/OLD/dgram.cc')
-rw-r--r--comm/OLD/dgram.cc717
1 files changed, 717 insertions, 0 deletions
diff --git a/comm/OLD/dgram.cc b/comm/OLD/dgram.cc
new file mode 100644
index 0000000..d6e5c50
--- /dev/null
+++ b/comm/OLD/dgram.cc
@@ -0,0 +1,717 @@
+/*
+ * The Unix Channel
+ *
+ * by Michel Beaudouin-Lafon
+ *
+ * Copyright 1990-1993
+ * Laboratoire de Recherche en Informatique (LRI)
+ *
+ * Reliable datagrams - to be updated
+ *
+ * $Id$
+ * $CurLog$
+ */
+
+#include "MsgBuffer.h"
+#include "dgram.h"
+#include "error.h"
+
+#include <sys/ioctl.h>
+#include <errno.h>
+#include <stdlib.h>
+#include <memory.h>
+
+// needed only if DEBUG is active
+#include <stdio.h>
+#define DEBUG //**//
+bool DGTrace = FALSE;
+
+/*?class UchDGRAM
+The class \typ{UchDGRAM} is derived from \typ{UchDatagram}.
+It adds to a datagram a simple protocol to ensure that all messages sent are received.
+It does not prevent message duplication, neither does he ensure that the messages are
+received in the order they are sent.
+
+Each message is sent along with a tag number. The sender keeps each message it sends
+until it receives an acknowledge for it.
+When a message is received, an acknowledge is sent back.
+Unacknowledged messages are resent at regular intervals.
+If a message is not acknowledged after a given number of retries, it is simply discarded.
+?*/
+
+struct PENDING {
+ UchMsgBuffer* outBuf;
+ pUchAddress toAddr;
+ short retries; // for pending
+ lword id; // for input
+
+ PENDING (UchMsgBuffer* b, UchAddress* a, int r = 0) { outBuf = b; toAddr = a; retries = r; id = 0;}
+ ~PENDING () { delete outBuf; toAddr = 0; }
+};
+
+static void
+DelPending (void* p)
+{
+ delete ((PENDING*) p);
+}
+
+/*?hidden?*/
+void
+UchDGRAM_TIMER :: Handle (Millisecond)
+{
+if (DGTrace) printf ("*** TimeOut ***\n");
+ dgram.Expired ();
+}
+
+/*?nodoc?*/
+UchDGRAM_TIMER :: UchDGRAM_TIMER (UchDGRAM& dg)
+: CcuBaseTimer (0),
+ dgram (dg)
+{
+}
+
+UchDGRAM_TIMER :: ~UchDGRAM_TIMER ()
+{
+}
+
+/*?hidden?*/
+void
+UchDGRAM :: Init ()
+{
+ npending = 0;
+ ninput = 0;
+ fromAddr = 0;
+ retry = 5;
+ resend = FALSE;
+ locked = 0;
+ timeout = 1000;
+ sync = FALSE;
+}
+
+/*?nextdoc?*/
+UchDGRAM :: UchDGRAM ()
+: pending (2), input (), timer (*this)
+{
+ Init ();
+}
+
+/*?
+Same constructors as for the class \typ{UchDatagram};
+?*/
+UchDGRAM :: UchDGRAM (UchAddress* b, UchAddress* c)
+: UchDatagram (b, c), pending (2), input (), timer (*this)
+{
+ Init ();
+}
+
+/*?
+The destructor of \typ{UchDGRAM} discards all pending and input messages.
+A derived class could call \fun{Drain} in its destructor to send all pending messages,
+and it could call \fun{Receive} while \fun{NumInput} is non null.
+?*/
+UchDGRAM :: ~UchDGRAM ()
+{
+ timer.Stop ();
+ if (! npending)
+ return;
+ for (CcuIdIter iter (pending); iter (); ++iter) {
+ RemovePending (iter.CurId ());
+ if (! npending)
+ break;
+ }
+ if (ninput) {
+ CcuListIter li (input);
+ while (++li)
+ DelPending (*li);
+ input.Clear ();
+ }
+}
+
+static const byte DGRAM_ERR = 0;
+static const byte DGRAM_SEND = 1;
+static const byte DGRAM_ACK = 2;
+
+// GetInput reads an incoming message
+// if its an ack, it is processed
+// if it is a normal message, it is stored in the input list
+// CheckInput calls GetInput wile there is something to read
+// WaitInput returns when a message is in the input list
+// Wait waits for the acknowledge of a particular id
+
+/*?hidden?*/
+int
+UchDGRAM :: GetInput (int len)
+{
+ UchMsgBuffer* buf = new UchMsgBuffer (len);
+ int n;
+ while ((n = UchDatagram :: Receive (*buf)) == -1 && errno == EINTR)
+DEBUG SysError (ErrWarn, "UchDGRAM::GetInput")
+ ;
+ if (n < 0) {
+ SysError (ErrWarn, "UchDGRAM::GetInput");
+ delete buf;
+ return DGRAM_ERR;
+ }
+DEBUG if (DGTrace) printf ("UchDGRAM :: GetInput : received %d bytes\n", buf->BufLength ());
+
+ lword id;
+ buf->Get (&id);
+ byte s;
+ buf->Get (&s);
+DEBUG if (DGTrace) printf ("UchDGRAM :: GetInput : id %x byte %d\n", id, s);
+
+ if (s == DGRAM_ACK) {
+DEBUG if (DGTrace) printf ("UchDGRAM :: GetInput : acknowledged %x\n", id);
+ RemovePending (id);
+ delete buf;
+ return DGRAM_ACK;
+ }
+
+ PENDING* p = new PENDING (buf, FAddr);
+ p->id = id;
+ input.Append (p);
+ ninput++;
+ return DGRAM_SEND;
+}
+
+/*?hidden?*/
+bool
+UchDGRAM :: CheckInput ()
+{
+ // get any pending messages
+ for (;;) {
+ lword np;
+ if (ioctl (FilDes (), FIONREAD, (char*) &np) < 0)
+ return FALSE;
+DEBUG if (DGTrace) if (np) printf ("UchDGRAM :: CheckInput : FIONREAD says %d\n", np);
+ if (np == 0)
+ break;
+ if (GetInput (int (np)) == DGRAM_ERR)
+ return FALSE;
+ }
+ return TRUE;
+}
+
+/*?hidden?*/
+bool
+UchDGRAM :: WaitInput ()
+{
+ // check the input queue
+ if (ninput)
+ return TRUE;
+
+ // check pending input
+ if (! CheckInput ())
+ return FALSE;
+
+ // block until something
+ while (! ninput)
+ if (GetInput (2048) == DGRAM_ERR) // *** arbitrary max size
+ return FALSE;
+ return TRUE;
+}
+
+/*?hidden?*/
+bool
+UchDGRAM :: Wait (lword id)
+{
+DEBUG if (DGTrace) printf ("UchDGRAM::Wait %x\n", id);
+ bool ret = FALSE;
+ bool loop = TRUE;
+ Lock ();
+
+ PENDING* pend;
+ while (loop) {
+ if (! CheckInput ())
+ break;
+
+ // did we receive the ack ?
+ pend = (PENDING*) pending.Get (id);
+ if (! pend) {
+ ret = TRUE;
+ break;
+ }
+
+ if (resend) {
+DEBUG if (DGTrace) printf ("UchDGRAM::Wait : resending\n");
+ if (pend->retries <= 0) {
+ RemovePending (id);
+ break;
+ }
+
+ int n = SendBuffer (* pend->outBuf, *pend->toAddr);
+ if (n < 0)
+ break;
+ --pend->retries;
+ resend = FALSE;
+ } else
+DEBUG if (DGTrace) printf ("UchDGRAM::Wait : waiting\n"),
+ timer.Wait (); // sets resend to TRUE
+ }
+ Unlock ();
+DEBUG if (DGTrace) printf ("UchDGRAM::Wait : %s\n", ret ? "done" : "failed");
+ return ret;
+}
+
+// SendAck sends an acknowledge
+// Expired is called when the resend timer has expired
+// PrepareToSend returns a buffer with the header set,
+// and adds it to the pending set
+// SendBuffer sends a buffer, handling interrupted sys calls
+// RemovePending removes a pending message from the table
+/*?hidden?*/
+void
+UchDGRAM :: SendAck (lword id, UchAddress& addr)
+{
+ UchMsgBuffer buf;
+ buf.Append (id);
+ buf.Append (DGRAM_ACK);
+ SendBuffer (buf, addr);
+ fromAddr = & addr;
+DEBUG if (DGTrace) printf ("UchDGRAM :: SendAck : acknowledging %x\n", id);
+}
+
+/*?hidden?*/
+void
+UchDGRAM :: Expired ()
+{
+ if (locked)
+ resend = TRUE;
+ else
+ Resend ();
+}
+
+/*?hidden?*/
+UchMsgBuffer*
+UchDGRAM :: PrepareToSend (UchAddress& to, int retries)
+{
+ UchMsgBuffer* obuf = new UchMsgBuffer;
+/*
+UchInetAddress* ia = (UchInetAddress*) &to;
+UchAddress* toCopy = new UchInetAddress (ia->Host (), ia->Port ());
+*/
+ if (retries == 0)
+ retries = retry;
+ PENDING* out = new PENDING (obuf, &to /*toCopy*/, retries);
+ outId = pending.Store (out);
+
+DEBUG if (DGTrace) printf ("UchDGRAM :: PrepareToSend : id %x\n", outId);
+ obuf->Append (outId);
+ obuf->Append (DGRAM_SEND);
+
+ if (! npending) {
+ timer.ChangePeriod (timeout);
+ timer.Restart ();
+ }
+ npending++;
+
+ return obuf;
+}
+
+/*?hidden?*/
+int
+UchDGRAM :: SendBuffer (UchMsgBuffer& buf, UchAddress& addr)
+{
+ int n;
+ while ((n = UchDatagram :: Send (buf, addr, TRUE)) == -1 && errno == EINTR)
+DEBUG SysError (ErrWarn, "UchDGRAM::SendBuffer")
+ ;
+ return n;
+}
+
+/*?hidden?*/
+void
+UchDGRAM :: RemovePending (lword id)
+{
+ PENDING* pend = (PENDING*) pending.Get (id);
+ if (! pend) {
+ Error (ErrWarn, "Receive", "unrecognized ACK");
+ return;
+ }
+ pending.Remove (id);
+ delete pend;
+ npending--;
+ if (npending == 0)
+ timer.Stop ();
+}
+
+//---------------- the public functions
+
+/*?nextdoc?*/
+int
+UchDGRAM :: Send (byte* buf, int len, UchAddress& to, bool ack, int retries)
+{
+DEBUG if (DGTrace) printf ("UchDGRAM :: Send\n");
+ Lock ();
+ UchMsgBuffer* obuf = PrepareToSend (to, retries);
+ obuf->Append (buf, len);
+ int n = SendBuffer (*obuf, to);
+ CheckInput ();
+ int ret = (ack || sync) ? (Wait (outId) ? n : -1) : n;
+ Unlock ();
+ return ret;
+}
+
+/*?nextdoc?*/
+int
+UchDGRAM :: Receive (byte* buf, int len)
+{
+DEBUG if (DGTrace) printf ("UchDGRAM :: Receive\n");
+ if (! WaitInput ())
+ return -1;
+ PENDING* p = (PENDING*) input.RemoveFirst ();
+ ninput--;
+ int n = p->outBuf->BufLength ();
+ if (len < n)
+ n = len;
+ memcpy (buf, p->outBuf->Buffer (), n);
+ SendAck (p->id, *p->toAddr);
+ delete p;
+DEBUG if (DGTrace) printf ("UchDGRAM :: Receive : received %d bytes\n", n);
+ return n;
+}
+
+/*?nextdoc?*/
+int
+UchDGRAM :: Reply (byte* buf, int len, bool ack, int retries)
+{
+ if (! fromAddr)
+ return -1;
+ return Send (buf, len, *fromAddr, ack, retries);
+}
+
+/*?nextdoc?*/
+int
+UchDGRAM :: Send (UchMsgBuffer& buf, UchAddress& to, bool peek, bool ack, int retries)
+{
+DEBUG if (DGTrace) printf ("UchDGRAM :: Send\n");
+ Lock ();
+ UchMsgBuffer* obuf = PrepareToSend (to, retries);
+ obuf->Append (buf.Buffer (), buf.BufLength ());
+ if (! peek)
+ buf.Flush ();
+ int n = SendBuffer (*obuf, to);
+ CheckInput ();
+ int ret = (ack || sync) ? (Wait (outId) ? n : -1) : n;
+ Unlock ();
+ return ret;
+}
+
+/*?nextdoc?*/
+int
+UchDGRAM :: Receive (UchMsgBuffer& buf)
+{
+DEBUG if (DGTrace) printf ("UchDGRAM :: Receive\n");
+ if (! WaitInput ())
+ return -1;
+ PENDING* p = (PENDING*) input.RemoveFirst ();
+ ninput--;
+ int n = p->outBuf->BufLength ();
+ buf.Append (p->outBuf->Buffer (), n);
+ SendAck (p->id, *p->toAddr);
+ delete p;
+DEBUG if (DGTrace) printf ("UchDGRAM :: Receive : received buffer %d bytes\n", n);
+ return n;
+}
+
+/*?
+These functions are similar to the same functions in the class \typ{UchDatagram},
+except that they manage messages acknowledgement.
+\fun{Send} saves the message in a buffer so that it can be resent if no acknowledge
+is received.
+\fun{Receive} acknowledges the received message.
+\fun{Reply} sends the message to the sender of the last received message.
+If \var{ack} is TRUE, the acknowledge of the message is waited for.
+In that case, \fun{Send} returns -1 if the acknowledge is not received.
+If \var{retries} is non zero, it specifies a number of retries for this message
+different from the default retry number of this dgram.
+?*/
+int
+UchDGRAM :: Reply (UchMsgBuffer& buf, bool peek, bool ack, int retries)
+{
+ if (! fromAddr)
+ return -1;
+ return Send (buf, *fromAddr, peek, ack, retries);
+}
+
+/*?nextdoc?*/
+bool
+UchDGRAM :: Send (UchMessage& msg, UchAddress& to, bool ack, int retries)
+{
+DEBUG if (DGTrace) printf ("UchDGRAM :: Send message\n");
+ Lock ();
+ UchMsgBuffer* obuf = PrepareToSend (to, retries);
+ obuf->Append (msg);
+// int l;
+// printf ("outBuffer is %d bytes long\n", l = obuf->BufLength ());
+// for (int i = 0; i < l; i++) printf ("%02x ", obuf->Buffer () [i]);
+// printf ("\n");
+ int n = SendBuffer (*obuf, to);
+ CheckInput ();
+ bool ret;
+ if (ack || sync)
+ ret = Wait (outId) ? TRUE : FALSE;
+ else
+ ret = bool (n != obuf->BufLength ());
+ Unlock ();
+ return ret;
+}
+
+/*?nextdoc?*/
+bool
+UchDGRAM :: Receive (UchMessage* msg)
+{
+DEBUG if (DGTrace) printf ("UchDGRAM :: Receive message\n");
+ if (! WaitInput ())
+ return FALSE;
+ PENDING* p = (PENDING*) input.RemoveFirst ();
+ ninput--;
+// int n;
+// printf ("inBuffer is %d bytes long\n", n = p->outBuf->BufLength ());
+// for (int i = 0; i < n; i++) printf ("%02x ", p->outBuf->Buffer () [i]);
+// printf ("\n");
+ if (! p->outBuf->Get (msg))
+ return FALSE;
+ SendAck (p->id, *p->toAddr);
+ delete p;
+DEBUG if (DGTrace) printf ("UchDGRAM :: Receive : received message\n");
+ return TRUE;
+}
+
+/*?
+These functions are similar to the previous functions except that they take
+a \typ{UchMessage} as argument.
+\fun{Send} converts the message in the output buffer.
+If \var{ack} is TRUE, the acknowledge of the message is waited for.
+In that case, \fun{Send} returns FALSE if the acknowledge is not received.
+If \var{retries} is non zero, it specifies a number of retries for this message
+different from the default retry number of this dgram.
+\fun{Receive} converts the incoming data into the message passed as argument.
+\fun{Reply} sends the message to the sender of the last received message.
+See also the function \fun{NewMessage} to handle incoming messages.
+?*/
+bool
+UchDGRAM :: Reply (UchMessage& msg, bool ack, int retries)
+{
+ if (! fromAddr)
+ return FALSE;
+ return Send (msg, *fromAddr, ack, retries);
+}
+
+//-----------------------
+#if 0
+bool
+UchDGRAM :: Ask (UchMessage& msg, UchAddress& to)
+{
+DEBUG if (DGTrace) printf ("UchDGRAM :: Ask\n");
+ Lock ();
+ UchMsgBuffer* obuf = PrepareToSend (to, DGRAM_ASK *******);
+ obuf->Append (msg);
+ int n = SendBuffer (*obuf, to);
+ CheckInput ();
+ bool ret = Wait (outId) ? TRUE : FALSE;
+ if (ret) {
+ // wait answer
+ for (;;) {
+ if (! WaitInput ())
+ continue;
+ PENDING* pend = (PENDING*) input.Last ();
+ if (pend->toAddr == to.............
+ }
+ }
+ Unlock ();
+ return ret;
+}
+#endif
+//-----------------------
+
+/*?
+Resend all messages for which no acknowledge has been received.
+If a UchMessage.has been resent more than the retry number, it is discarded.
+When successive messages are to be sent to the same address, only the first
+is resent, to avoid connection overflow.
+?*/
+void
+UchDGRAM :: Resend ()
+{
+if (DGTrace) printf ("UchDGRAM::Resend\n");
+ Lock ();
+ if (npending)
+ CheckInput ();
+ if (! npending) {
+ resend = FALSE;
+ Unlock ();
+ return;
+ }
+
+ UchAddress* addr = 0;
+
+ PENDING* pend;
+ for (CcuIdIter iter (pending); pend = (PENDING*) iter (); ++iter) {
+ if (pend->retries <= 0) {
+ // the output buffer contains the leading id and type
+ UchMsgBuffer fake (*pend->outBuf);
+ lword id;
+ byte typ;
+ fake.Get (&id);
+ fake.Get (&typ);
+ if (! DiscardNotify (fake, *pend->toAddr)) {
+ pend->retries = retry; // *** should be controllable ?
+ continue;
+ }
+if (DGTrace) printf ("UchDGRAM::Resend : abandonning %x\n", iter.CurId ());
+ RemovePending (iter.CurId ());
+ if (! npending)
+ break;
+ } else
+ if (addr != (UchAddress*) pend->toAddr) {
+DEBUG printf ("UchDGRAM::Resend : resending %x\n", iter.CurId ());
+ SendBuffer (* pend->outBuf, *pend->toAddr);
+ --pend->retries;
+ addr = pend->toAddr;
+ CheckInput ();
+ }
+DEBUG else printf ("UchDGRAM::Resend : skipping %x\n", iter.CurId ());
+ }
+
+ resend = FALSE;
+ Unlock ();
+}
+
+/*?
+This virtual function is called whenever a pending message is about
+to be discarded because no acknowledge has been received after
+the the default number of retries.
+If it returns FALSE, the message is not discarded and its retry count is reset to zero.
+Returning FALSE is not very social.
+The default action is to return TRUE, thus discarding the pending message.
+?*/
+bool
+UchDGRAM :: DiscardNotify (UchMsgBuffer&, UchAddress&)
+{
+ return TRUE;
+}
+
+/*?
+Set the interval between retries.
+The default value is 1000 (1 second).
+?*/
+void
+UchDGRAM :: SetRetryTime (Millisecond m)
+{
+ timeout = m;
+ if (npending) {
+ timer.ChangePeriod (m);
+ timer.Restart ();
+ }
+}
+
+/*?
+Wait until all acknowledges have been received,
+or until all messages have been discarded.
+?*/
+void
+UchDGRAM :: Drain ()
+{
+ if (npending)
+ CheckInput ();
+ while (npending)
+ timer.Wait ();
+}
+
+/*?
+This virtual function is called by \fun{HandleRead}.
+It must be redefined in a derived class if \fun{HandleRead} is to be used
+(for instance if this \typ{UchDGRAM} is put in a channel set).
+This function should convert the contents of the buffer into a message, and handle it.
+It should return TRUE if the message was correctly handled, else FALSE.
+Note that if it returns FALSE, the message will not be acknowledged, and thus it will
+be resent later.
+The default behaviour of this virtual function is to issue an error message and
+to return TRUE.
+?*/
+bool
+UchDGRAM :: NewMessage (UchMsgBuffer&)
+{
+ Error (ErrWarn, "UchDGRAM :: NewMessage", "should be defined in derived class");
+ return TRUE;
+}
+
+/*?
+This is an instance of the virtual function \fun{HandleRead} of class \typ{UchChannel}.
+It calls the virtual function \fun{NewMessage} when a message is received.
+If \fun{NewMessage} returns TRUE, an acknowledge is sent back.
+This functions also handles incoming acknowledges.
+?*/
+void
+UchDGRAM :: HandleRead ()
+{
+ if (! CheckInput ())
+ return;
+ if (! ninput)
+ return;
+
+DEBUG if (DGTrace) printf (">>UchDGRAM :: HandleRead\n");
+ PENDING* p = (PENDING*) input.RemoveFirst ();
+ ninput--;
+ if (NewMessage (* p->outBuf))
+ SendAck (p->id, *p->toAddr);
+DEBUG if (DGTrace) printf ("<<UchDGRAM :: HandleRead\n");
+}
+
+/*?
+This is an instance of the virtual function \fun{HandleSelect} of class \typ{UchChannel}.
+It calls the virtual function \fun{NewMessage} when a message is in the input buffer.
+If \fun{NewMessage} returns TRUE, an acknowledge is sent back.
+This functions also handles incoming acknowledges.
+?*/
+bool
+UchDGRAM :: HandleSelect ()
+{
+ if (! ninput)
+ return FALSE;
+DEBUG if (DGTrace) printf (">>UchDGRAM :: HandleSelect\n");
+ PENDING* p = (PENDING*) input.RemoveFirst ();
+ ninput--;
+ if (NewMessage (* p->outBuf))
+ SendAck (p->id, *p->toAddr);
+DEBUG if (DGTrace) printf ("<<UchDGRAM :: HandleSelect\n");
+ return TRUE;
+}
+
+#ifdef DOC
+// fake entries for documentation
+
+/*?
+Return the number of pending messages
+?*/
+int
+UchDGRAM :: NumPending ()
+{ }
+
+/*?
+Return the number of waiting input messages
+?*/
+int
+UchDGRAM :: NumInput ()
+{ }
+
+/*?
+Set the number of times a message is resent until it is discarded.
+The default value is 5.
+?*/
+void
+UchDGRAM :: SetRetry (short r)
+{ }
+
+/*?
+Set the synchronous mode of this \typ{UchDGRAM}.
+If \var{s} is TRUE, the \fun{Send} functions always wait for the acknowledge of the messages.
+The default value is FALSE.
+?*/
+void
+UchDGRAM :: SetSync (bool s)
+{ }
+
+#endif /* DOC */
+