From ba066c34dde204aa192d03a23a81356374d93731 Mon Sep 17 00:00:00 2001 From: chatty Date: Wed, 7 Apr 1993 11:50:31 +0000 Subject: Initial revision --- comm/OLD/dgram.cc | 717 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 717 insertions(+) create mode 100644 comm/OLD/dgram.cc (limited to 'comm/OLD/dgram.cc') diff --git a/comm/OLD/dgram.cc b/comm/OLD/dgram.cc new file mode 100644 index 0000000..d6e5c50 --- /dev/null +++ b/comm/OLD/dgram.cc @@ -0,0 +1,717 @@ +/* + * The Unix Channel + * + * by Michel Beaudouin-Lafon + * + * Copyright 1990-1993 + * Laboratoire de Recherche en Informatique (LRI) + * + * Reliable datagrams - to be updated + * + * $Id$ + * $CurLog$ + */ + +#include "MsgBuffer.h" +#include "dgram.h" +#include "error.h" + +#include +#include +#include +#include + +// needed only if DEBUG is active +#include +#define DEBUG //**// +bool DGTrace = FALSE; + +/*?class UchDGRAM +The class \typ{UchDGRAM} is derived from \typ{UchDatagram}. +It adds to a datagram a simple protocol to ensure that all messages sent are received. +It does not prevent message duplication, neither does he ensure that the messages are +received in the order they are sent. + +Each message is sent along with a tag number. The sender keeps each message it sends +until it receives an acknowledge for it. +When a message is received, an acknowledge is sent back. +Unacknowledged messages are resent at regular intervals. +If a message is not acknowledged after a given number of retries, it is simply discarded. +?*/ + +struct PENDING { + UchMsgBuffer* outBuf; + pUchAddress toAddr; + short retries; // for pending + lword id; // for input + + PENDING (UchMsgBuffer* b, UchAddress* a, int r = 0) { outBuf = b; toAddr = a; retries = r; id = 0;} + ~PENDING () { delete outBuf; toAddr = 0; } +}; + +static void +DelPending (void* p) +{ + delete ((PENDING*) p); +} + +/*?hidden?*/ +void +UchDGRAM_TIMER :: Handle (Millisecond) +{ +if (DGTrace) printf ("*** TimeOut ***\n"); + dgram.Expired (); +} + +/*?nodoc?*/ +UchDGRAM_TIMER :: UchDGRAM_TIMER (UchDGRAM& dg) +: CcuBaseTimer (0), + dgram (dg) +{ +} + +UchDGRAM_TIMER :: ~UchDGRAM_TIMER () +{ +} + +/*?hidden?*/ +void +UchDGRAM :: Init () +{ + npending = 0; + ninput = 0; + fromAddr = 0; + retry = 5; + resend = FALSE; + locked = 0; + timeout = 1000; + sync = FALSE; +} + +/*?nextdoc?*/ +UchDGRAM :: UchDGRAM () +: pending (2), input (), timer (*this) +{ + Init (); +} + +/*? +Same constructors as for the class \typ{UchDatagram}; +?*/ +UchDGRAM :: UchDGRAM (UchAddress* b, UchAddress* c) +: UchDatagram (b, c), pending (2), input (), timer (*this) +{ + Init (); +} + +/*? +The destructor of \typ{UchDGRAM} discards all pending and input messages. +A derived class could call \fun{Drain} in its destructor to send all pending messages, +and it could call \fun{Receive} while \fun{NumInput} is non null. +?*/ +UchDGRAM :: ~UchDGRAM () +{ + timer.Stop (); + if (! npending) + return; + for (CcuIdIter iter (pending); iter (); ++iter) { + RemovePending (iter.CurId ()); + if (! npending) + break; + } + if (ninput) { + CcuListIter li (input); + while (++li) + DelPending (*li); + input.Clear (); + } +} + +static const byte DGRAM_ERR = 0; +static const byte DGRAM_SEND = 1; +static const byte DGRAM_ACK = 2; + +// GetInput reads an incoming message +// if its an ack, it is processed +// if it is a normal message, it is stored in the input list +// CheckInput calls GetInput wile there is something to read +// WaitInput returns when a message is in the input list +// Wait waits for the acknowledge of a particular id + +/*?hidden?*/ +int +UchDGRAM :: GetInput (int len) +{ + UchMsgBuffer* buf = new UchMsgBuffer (len); + int n; + while ((n = UchDatagram :: Receive (*buf)) == -1 && errno == EINTR) +DEBUG SysError (ErrWarn, "UchDGRAM::GetInput") + ; + if (n < 0) { + SysError (ErrWarn, "UchDGRAM::GetInput"); + delete buf; + return DGRAM_ERR; + } +DEBUG if (DGTrace) printf ("UchDGRAM :: GetInput : received %d bytes\n", buf->BufLength ()); + + lword id; + buf->Get (&id); + byte s; + buf->Get (&s); +DEBUG if (DGTrace) printf ("UchDGRAM :: GetInput : id %x byte %d\n", id, s); + + if (s == DGRAM_ACK) { +DEBUG if (DGTrace) printf ("UchDGRAM :: GetInput : acknowledged %x\n", id); + RemovePending (id); + delete buf; + return DGRAM_ACK; + } + + PENDING* p = new PENDING (buf, FAddr); + p->id = id; + input.Append (p); + ninput++; + return DGRAM_SEND; +} + +/*?hidden?*/ +bool +UchDGRAM :: CheckInput () +{ + // get any pending messages + for (;;) { + lword np; + if (ioctl (FilDes (), FIONREAD, (char*) &np) < 0) + return FALSE; +DEBUG if (DGTrace) if (np) printf ("UchDGRAM :: CheckInput : FIONREAD says %d\n", np); + if (np == 0) + break; + if (GetInput (int (np)) == DGRAM_ERR) + return FALSE; + } + return TRUE; +} + +/*?hidden?*/ +bool +UchDGRAM :: WaitInput () +{ + // check the input queue + if (ninput) + return TRUE; + + // check pending input + if (! CheckInput ()) + return FALSE; + + // block until something + while (! ninput) + if (GetInput (2048) == DGRAM_ERR) // *** arbitrary max size + return FALSE; + return TRUE; +} + +/*?hidden?*/ +bool +UchDGRAM :: Wait (lword id) +{ +DEBUG if (DGTrace) printf ("UchDGRAM::Wait %x\n", id); + bool ret = FALSE; + bool loop = TRUE; + Lock (); + + PENDING* pend; + while (loop) { + if (! CheckInput ()) + break; + + // did we receive the ack ? + pend = (PENDING*) pending.Get (id); + if (! pend) { + ret = TRUE; + break; + } + + if (resend) { +DEBUG if (DGTrace) printf ("UchDGRAM::Wait : resending\n"); + if (pend->retries <= 0) { + RemovePending (id); + break; + } + + int n = SendBuffer (* pend->outBuf, *pend->toAddr); + if (n < 0) + break; + --pend->retries; + resend = FALSE; + } else +DEBUG if (DGTrace) printf ("UchDGRAM::Wait : waiting\n"), + timer.Wait (); // sets resend to TRUE + } + Unlock (); +DEBUG if (DGTrace) printf ("UchDGRAM::Wait : %s\n", ret ? "done" : "failed"); + return ret; +} + +// SendAck sends an acknowledge +// Expired is called when the resend timer has expired +// PrepareToSend returns a buffer with the header set, +// and adds it to the pending set +// SendBuffer sends a buffer, handling interrupted sys calls +// RemovePending removes a pending message from the table +/*?hidden?*/ +void +UchDGRAM :: SendAck (lword id, UchAddress& addr) +{ + UchMsgBuffer buf; + buf.Append (id); + buf.Append (DGRAM_ACK); + SendBuffer (buf, addr); + fromAddr = & addr; +DEBUG if (DGTrace) printf ("UchDGRAM :: SendAck : acknowledging %x\n", id); +} + +/*?hidden?*/ +void +UchDGRAM :: Expired () +{ + if (locked) + resend = TRUE; + else + Resend (); +} + +/*?hidden?*/ +UchMsgBuffer* +UchDGRAM :: PrepareToSend (UchAddress& to, int retries) +{ + UchMsgBuffer* obuf = new UchMsgBuffer; +/* +UchInetAddress* ia = (UchInetAddress*) &to; +UchAddress* toCopy = new UchInetAddress (ia->Host (), ia->Port ()); +*/ + if (retries == 0) + retries = retry; + PENDING* out = new PENDING (obuf, &to /*toCopy*/, retries); + outId = pending.Store (out); + +DEBUG if (DGTrace) printf ("UchDGRAM :: PrepareToSend : id %x\n", outId); + obuf->Append (outId); + obuf->Append (DGRAM_SEND); + + if (! npending) { + timer.ChangePeriod (timeout); + timer.Restart (); + } + npending++; + + return obuf; +} + +/*?hidden?*/ +int +UchDGRAM :: SendBuffer (UchMsgBuffer& buf, UchAddress& addr) +{ + int n; + while ((n = UchDatagram :: Send (buf, addr, TRUE)) == -1 && errno == EINTR) +DEBUG SysError (ErrWarn, "UchDGRAM::SendBuffer") + ; + return n; +} + +/*?hidden?*/ +void +UchDGRAM :: RemovePending (lword id) +{ + PENDING* pend = (PENDING*) pending.Get (id); + if (! pend) { + Error (ErrWarn, "Receive", "unrecognized ACK"); + return; + } + pending.Remove (id); + delete pend; + npending--; + if (npending == 0) + timer.Stop (); +} + +//---------------- the public functions + +/*?nextdoc?*/ +int +UchDGRAM :: Send (byte* buf, int len, UchAddress& to, bool ack, int retries) +{ +DEBUG if (DGTrace) printf ("UchDGRAM :: Send\n"); + Lock (); + UchMsgBuffer* obuf = PrepareToSend (to, retries); + obuf->Append (buf, len); + int n = SendBuffer (*obuf, to); + CheckInput (); + int ret = (ack || sync) ? (Wait (outId) ? n : -1) : n; + Unlock (); + return ret; +} + +/*?nextdoc?*/ +int +UchDGRAM :: Receive (byte* buf, int len) +{ +DEBUG if (DGTrace) printf ("UchDGRAM :: Receive\n"); + if (! WaitInput ()) + return -1; + PENDING* p = (PENDING*) input.RemoveFirst (); + ninput--; + int n = p->outBuf->BufLength (); + if (len < n) + n = len; + memcpy (buf, p->outBuf->Buffer (), n); + SendAck (p->id, *p->toAddr); + delete p; +DEBUG if (DGTrace) printf ("UchDGRAM :: Receive : received %d bytes\n", n); + return n; +} + +/*?nextdoc?*/ +int +UchDGRAM :: Reply (byte* buf, int len, bool ack, int retries) +{ + if (! fromAddr) + return -1; + return Send (buf, len, *fromAddr, ack, retries); +} + +/*?nextdoc?*/ +int +UchDGRAM :: Send (UchMsgBuffer& buf, UchAddress& to, bool peek, bool ack, int retries) +{ +DEBUG if (DGTrace) printf ("UchDGRAM :: Send\n"); + Lock (); + UchMsgBuffer* obuf = PrepareToSend (to, retries); + obuf->Append (buf.Buffer (), buf.BufLength ()); + if (! peek) + buf.Flush (); + int n = SendBuffer (*obuf, to); + CheckInput (); + int ret = (ack || sync) ? (Wait (outId) ? n : -1) : n; + Unlock (); + return ret; +} + +/*?nextdoc?*/ +int +UchDGRAM :: Receive (UchMsgBuffer& buf) +{ +DEBUG if (DGTrace) printf ("UchDGRAM :: Receive\n"); + if (! WaitInput ()) + return -1; + PENDING* p = (PENDING*) input.RemoveFirst (); + ninput--; + int n = p->outBuf->BufLength (); + buf.Append (p->outBuf->Buffer (), n); + SendAck (p->id, *p->toAddr); + delete p; +DEBUG if (DGTrace) printf ("UchDGRAM :: Receive : received buffer %d bytes\n", n); + return n; +} + +/*? +These functions are similar to the same functions in the class \typ{UchDatagram}, +except that they manage messages acknowledgement. +\fun{Send} saves the message in a buffer so that it can be resent if no acknowledge +is received. +\fun{Receive} acknowledges the received message. +\fun{Reply} sends the message to the sender of the last received message. +If \var{ack} is TRUE, the acknowledge of the message is waited for. +In that case, \fun{Send} returns -1 if the acknowledge is not received. +If \var{retries} is non zero, it specifies a number of retries for this message +different from the default retry number of this dgram. +?*/ +int +UchDGRAM :: Reply (UchMsgBuffer& buf, bool peek, bool ack, int retries) +{ + if (! fromAddr) + return -1; + return Send (buf, *fromAddr, peek, ack, retries); +} + +/*?nextdoc?*/ +bool +UchDGRAM :: Send (UchMessage& msg, UchAddress& to, bool ack, int retries) +{ +DEBUG if (DGTrace) printf ("UchDGRAM :: Send message\n"); + Lock (); + UchMsgBuffer* obuf = PrepareToSend (to, retries); + obuf->Append (msg); +// int l; +// printf ("outBuffer is %d bytes long\n", l = obuf->BufLength ()); +// for (int i = 0; i < l; i++) printf ("%02x ", obuf->Buffer () [i]); +// printf ("\n"); + int n = SendBuffer (*obuf, to); + CheckInput (); + bool ret; + if (ack || sync) + ret = Wait (outId) ? TRUE : FALSE; + else + ret = bool (n != obuf->BufLength ()); + Unlock (); + return ret; +} + +/*?nextdoc?*/ +bool +UchDGRAM :: Receive (UchMessage* msg) +{ +DEBUG if (DGTrace) printf ("UchDGRAM :: Receive message\n"); + if (! WaitInput ()) + return FALSE; + PENDING* p = (PENDING*) input.RemoveFirst (); + ninput--; +// int n; +// printf ("inBuffer is %d bytes long\n", n = p->outBuf->BufLength ()); +// for (int i = 0; i < n; i++) printf ("%02x ", p->outBuf->Buffer () [i]); +// printf ("\n"); + if (! p->outBuf->Get (msg)) + return FALSE; + SendAck (p->id, *p->toAddr); + delete p; +DEBUG if (DGTrace) printf ("UchDGRAM :: Receive : received message\n"); + return TRUE; +} + +/*? +These functions are similar to the previous functions except that they take +a \typ{UchMessage} as argument. +\fun{Send} converts the message in the output buffer. +If \var{ack} is TRUE, the acknowledge of the message is waited for. +In that case, \fun{Send} returns FALSE if the acknowledge is not received. +If \var{retries} is non zero, it specifies a number of retries for this message +different from the default retry number of this dgram. +\fun{Receive} converts the incoming data into the message passed as argument. +\fun{Reply} sends the message to the sender of the last received message. +See also the function \fun{NewMessage} to handle incoming messages. +?*/ +bool +UchDGRAM :: Reply (UchMessage& msg, bool ack, int retries) +{ + if (! fromAddr) + return FALSE; + return Send (msg, *fromAddr, ack, retries); +} + +//----------------------- +#if 0 +bool +UchDGRAM :: Ask (UchMessage& msg, UchAddress& to) +{ +DEBUG if (DGTrace) printf ("UchDGRAM :: Ask\n"); + Lock (); + UchMsgBuffer* obuf = PrepareToSend (to, DGRAM_ASK *******); + obuf->Append (msg); + int n = SendBuffer (*obuf, to); + CheckInput (); + bool ret = Wait (outId) ? TRUE : FALSE; + if (ret) { + // wait answer + for (;;) { + if (! WaitInput ()) + continue; + PENDING* pend = (PENDING*) input.Last (); + if (pend->toAddr == to............. + } + } + Unlock (); + return ret; +} +#endif +//----------------------- + +/*? +Resend all messages for which no acknowledge has been received. +If a UchMessage.has been resent more than the retry number, it is discarded. +When successive messages are to be sent to the same address, only the first +is resent, to avoid connection overflow. +?*/ +void +UchDGRAM :: Resend () +{ +if (DGTrace) printf ("UchDGRAM::Resend\n"); + Lock (); + if (npending) + CheckInput (); + if (! npending) { + resend = FALSE; + Unlock (); + return; + } + + UchAddress* addr = 0; + + PENDING* pend; + for (CcuIdIter iter (pending); pend = (PENDING*) iter (); ++iter) { + if (pend->retries <= 0) { + // the output buffer contains the leading id and type + UchMsgBuffer fake (*pend->outBuf); + lword id; + byte typ; + fake.Get (&id); + fake.Get (&typ); + if (! DiscardNotify (fake, *pend->toAddr)) { + pend->retries = retry; // *** should be controllable ? + continue; + } +if (DGTrace) printf ("UchDGRAM::Resend : abandonning %x\n", iter.CurId ()); + RemovePending (iter.CurId ()); + if (! npending) + break; + } else + if (addr != (UchAddress*) pend->toAddr) { +DEBUG printf ("UchDGRAM::Resend : resending %x\n", iter.CurId ()); + SendBuffer (* pend->outBuf, *pend->toAddr); + --pend->retries; + addr = pend->toAddr; + CheckInput (); + } +DEBUG else printf ("UchDGRAM::Resend : skipping %x\n", iter.CurId ()); + } + + resend = FALSE; + Unlock (); +} + +/*? +This virtual function is called whenever a pending message is about +to be discarded because no acknowledge has been received after +the the default number of retries. +If it returns FALSE, the message is not discarded and its retry count is reset to zero. +Returning FALSE is not very social. +The default action is to return TRUE, thus discarding the pending message. +?*/ +bool +UchDGRAM :: DiscardNotify (UchMsgBuffer&, UchAddress&) +{ + return TRUE; +} + +/*? +Set the interval between retries. +The default value is 1000 (1 second). +?*/ +void +UchDGRAM :: SetRetryTime (Millisecond m) +{ + timeout = m; + if (npending) { + timer.ChangePeriod (m); + timer.Restart (); + } +} + +/*? +Wait until all acknowledges have been received, +or until all messages have been discarded. +?*/ +void +UchDGRAM :: Drain () +{ + if (npending) + CheckInput (); + while (npending) + timer.Wait (); +} + +/*? +This virtual function is called by \fun{HandleRead}. +It must be redefined in a derived class if \fun{HandleRead} is to be used +(for instance if this \typ{UchDGRAM} is put in a channel set). +This function should convert the contents of the buffer into a message, and handle it. +It should return TRUE if the message was correctly handled, else FALSE. +Note that if it returns FALSE, the message will not be acknowledged, and thus it will +be resent later. +The default behaviour of this virtual function is to issue an error message and +to return TRUE. +?*/ +bool +UchDGRAM :: NewMessage (UchMsgBuffer&) +{ + Error (ErrWarn, "UchDGRAM :: NewMessage", "should be defined in derived class"); + return TRUE; +} + +/*? +This is an instance of the virtual function \fun{HandleRead} of class \typ{UchChannel}. +It calls the virtual function \fun{NewMessage} when a message is received. +If \fun{NewMessage} returns TRUE, an acknowledge is sent back. +This functions also handles incoming acknowledges. +?*/ +void +UchDGRAM :: HandleRead () +{ + if (! CheckInput ()) + return; + if (! ninput) + return; + +DEBUG if (DGTrace) printf (">>UchDGRAM :: HandleRead\n"); + PENDING* p = (PENDING*) input.RemoveFirst (); + ninput--; + if (NewMessage (* p->outBuf)) + SendAck (p->id, *p->toAddr); +DEBUG if (DGTrace) printf ("<>UchDGRAM :: HandleSelect\n"); + PENDING* p = (PENDING*) input.RemoveFirst (); + ninput--; + if (NewMessage (* p->outBuf)) + SendAck (p->id, *p->toAddr); +DEBUG if (DGTrace) printf ("<