/*
 *	The Unix Channel
 *
 *	by Michel Beaudouin-Lafon
 *
 *	Copyright 1990-1993
 *	Laboratoire de Recherche en Informatique (LRI)
 *
 *	Reliable datagrams - to be updated
 *
 *	$Id$
 *	$CurLog$
 */

#include "MsgBuffer.h"
#include "dgram.h"
#include "error.h"
#include "Message.h"

// NOTE(review): the system header names were missing from the bare #include
// directives. They are restored here from the symbols this file uses
// (errno/EINTR, memcpy, ioctl/FIONREAD, printf) - confirm against the build.
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <sys/ioctl.h>
// needed only if DEBUG is active
#include <stdio.h>
#define DEBUG	//**//

// Global switch for the trace output printed by this file.
bool DGTrace = false;

#if defined(sun) && (defined(__svr4__) || defined(__SVR4))
// NOTE(review): presumably the header declaring FIONREAD on SVR4 Suns - confirm.
#include <sys/filio.h>
#endif

/*?class UchDGRAM
The class \typ{UchDGRAM} is derived from \typ{UchDatagram}.
It adds to a datagram a simple protocol to ensure that all messages sent are received.
It does not prevent message duplication, nor does it ensure that the messages
are received in the order they are sent.

Each message is sent along with a tag number.
The sender keeps each message it sends until it receives an acknowledge for it.
When a message is received, an acknowledge is sent back.
Unacknowledged messages are resent at regular intervals.
If a message is not acknowledged after a given number of retries, it is simply discarded.
?*/

// Record used both for outgoing messages waiting for an acknowledge
// (the `pending' table) and for received messages waiting to be read
// (the `input' list).
struct PENDING {
	UchMsgBuffer*	outBuf;		// message data; deleted by the destructor
	pUchAddress	toAddr;		// peer address
	short		retries;	// for pending
	lword		id;		// for input

	PENDING (UchMsgBuffer* b, UchAddress* a, int r = 0) { outBuf = b; toAddr = a; retries = r; id = 0;}
	~PENDING () { delete outBuf; toAddr = 0; }

private:
	// Not copyable: a copy would delete outBuf a second time.
	// Declared but intentionally never defined.
	PENDING (const PENDING&);
	PENDING& operator = (const PENDING&);
};

/*?hidden?*/
void
UchDGRAM_TIMER :: Handle (Millisecond)
{
	// The resend timer fired: let the owning dgram decide what to do.
	if (DGTrace) printf ("*** TimeOut ***\n");
	dgram.Expired ();
}

/*?nodoc?*/
UchDGRAM_TIMER :: UchDGRAM_TIMER (UchDGRAM& dg)
: CcuBaseTimer (0),
  dgram (dg)
{
}

UchDGRAM_TIMER :: ~UchDGRAM_TIMER ()
{
}

/*?hidden?*/
void
UchDGRAM :: Init ()
{
	// Defaults shared by all constructors.
	npending = 0;		// no message waiting for an acknowledge
	ninput = 0;		// no received message waiting to be read
	fromAddr = 0;		// no message received yet, so Reply has no target
	retry = 5;		// default number of retries per message
	resend = false;
	locked = 0;
	timeout = 1000;		// interval between retries (see SetRetryTime)
	sync = false;
}

/*?nextdoc?*/
UchDGRAM :: UchDGRAM ()
: pending (2),
  input (),
  timer (*this)
{
	Init ();
}

/*?
Same constructors as for the class \typ{UchDatagram}; ?*/ UchDGRAM :: UchDGRAM (UchAddress* b, UchAddress* c) : UchDatagram (b, c), pending (2), input (), timer (*this) { Init (); } /*? The destructor of \typ{UchDGRAM} discards all pending and input messages. A derived class could call \fun{Drain} in its destructor to send all pending messages, and it could call \fun{Receive} while \fun{NumInput} is non null. ?*/ UchDGRAM :: ~UchDGRAM () { timer.Stop (); if (! npending) return; CcuIdIterOf iter = pending; while (++iter) { RemovePending (iter.CurId ()); if (! npending) break; } if (ninput) { CcuListIterOf li (input); while (++li) delete *li; input.Clear (); } } static const byte DGRAM_ERR = 0; static const byte DGRAM_SEND = 1; static const byte DGRAM_ACK = 2; // GetInput reads an incoming message // if its an ack, it is processed // if it is a normal message, it is stored in the input list // CheckInput calls GetInput wile there is something to read // WaitInput returns when a message is in the input list // Wait waits for the acknowledge of a particular id /*?hidden?*/ int UchDGRAM :: GetInput (int len) { UchMsgBuffer* buf = new UchMsgBuffer (len); int n; while ((n = UchDatagram :: Receive (*buf)) == -1 && errno == EINTR) DEBUG SysError (ErrWarn, "UchDGRAM::GetInput") ; if (n < 0) { SysError (ErrWarn, "UchDGRAM::GetInput"); delete buf; return DGRAM_ERR; } DEBUG if (DGTrace) printf ("UchDGRAM :: GetInput : received %d bytes\n", buf->BufLength ()); lword id; buf->ReadLong (id); byte s; buf->ReadByte (s); DEBUG if (DGTrace) printf ("UchDGRAM :: GetInput : id %x byte %d\n", id, s); if (s == DGRAM_ACK) { DEBUG if (DGTrace) printf ("UchDGRAM :: GetInput : acknowledged %x\n", id); RemovePending (id); delete buf; return DGRAM_ACK; } PENDING* p = new PENDING (buf, FAddr); p->id = id; input.Append (p); ninput++; return DGRAM_SEND; } /*?hidden?*/ bool UchDGRAM :: CheckInput () { // get any pending messages for (;;) { lword np; if (ioctl (FilDes (), FIONREAD, (char*) &np) < 0) 
return false; DEBUG if (DGTrace) if (np) printf ("UchDGRAM :: CheckInput : FIONREAD says %d\n", np); if (np == 0) break; if (GetInput (int (np)) == DGRAM_ERR) return false; } return true; } /*?hidden?*/ bool UchDGRAM :: WaitInput () { // check the input queue if (ninput) return true; // check pending input if (! CheckInput ()) return false; // block until something while (! ninput) if (GetInput (2048) == DGRAM_ERR) // *** arbitrary max size return false; return true; } /*?hidden?*/ bool UchDGRAM :: Wait (lword id) { DEBUG if (DGTrace) printf ("UchDGRAM::Wait %x\n", id); bool ret = false; bool loop = true; Lock (); PENDING* pend; while (loop) { if (! CheckInput ()) break; // did we receive the ack ? pend = pending.Get (id); if (! pend) { ret = true; break; } if (resend) { DEBUG if (DGTrace) printf ("UchDGRAM::Wait : resending\n"); if (pend->retries <= 0) { RemovePending (id); break; } int n = SendBuffer (* pend->outBuf, *pend->toAddr); if (n < 0) break; --pend->retries; resend = false; } else DEBUG if (DGTrace) printf ("UchDGRAM::Wait : waiting\n"), timer.Wait (); // sets resend to true } Unlock (); DEBUG if (DGTrace) printf ("UchDGRAM::Wait : %s\n", ret ? 
"done" : "failed"); return ret; } // SendAck sends an acknowledge // Expired is called when the resend timer has expired // PrepareToSend returns a buffer with the header set, // and adds it to the pending set // SendBuffer sends a buffer, handling interrupted sys calls // RemovePending removes a pending message from the table /*?hidden?*/ void UchDGRAM :: SendAck (lword id, UchAddress& addr) { UchMsgBuffer buf; buf.WriteLong (id); buf.WriteByte (DGRAM_ACK); SendBuffer (buf, addr); fromAddr = & addr; DEBUG if (DGTrace) printf ("UchDGRAM :: SendAck : acknowledging %x\n", id); } /*?hidden?*/ void UchDGRAM :: Expired () { if (locked) resend = true; else Resend (); } /*?hidden?*/ UchMsgBuffer* UchDGRAM :: PrepareToSend (UchAddress& to, int retries) { UchMsgBuffer* obuf = new UchMsgBuffer; /* UchInetAddress* ia = (UchInetAddress*) &to; UchAddress* toCopy = new UchInetAddress (ia->Host (), ia->Port ()); */ if (retries == 0) retries = retry; PENDING* out = new PENDING (obuf, &to /*toCopy*/, retries); outId = pending.Store (out); DEBUG if (DGTrace) printf ("UchDGRAM :: PrepareToSend : id %x\n", outId); obuf->WriteLong (outId); obuf->WriteByte (DGRAM_SEND); if (! npending) { timer.ChangePeriod (timeout); timer.Restart (); } npending++; return obuf; } /*?hidden?*/ int UchDGRAM :: SendBuffer (UchMsgBuffer& buf, UchAddress& addr) { int n; while ((n = UchDatagram :: Send (buf, addr, true)) == -1 && errno == EINTR) DEBUG SysError (ErrWarn, "UchDGRAM::SendBuffer") ; return n; } /*?hidden?*/ void UchDGRAM :: RemovePending (lword id) { PENDING* pend = pending.Get (id); if (! 
pend) { ::Error (ErrWarn, "Receive", "unrecognized ACK"); return; } pending.Remove (id); delete pend; npending--; if (npending == 0) timer.Stop (); } //---------------- the public functions /*?nextdoc?*/ int UchDGRAM :: Send (byte* buf, int len, UchAddress& to, bool ack, int retries) { DEBUG if (DGTrace) printf ("UchDGRAM :: Send\n"); Lock (); UchMsgBuffer* obuf = PrepareToSend (to, retries); obuf->WriteBuf (buf, len); int n = SendBuffer (*obuf, to); CheckInput (); int ret = (ack || sync) ? (Wait (outId) ? n : -1) : n; Unlock (); return ret; } /*?nextdoc?*/ int UchDGRAM :: Receive (byte* buf, int len) { DEBUG if (DGTrace) printf ("UchDGRAM :: Receive\n"); if (! WaitInput ()) return -1; PENDING* p = input.RemoveFirst (); ninput--; int n = p->outBuf->BufLength (); if (len < n) n = len; memcpy (buf, p->outBuf->Buffer (), n); SendAck (p->id, *p->toAddr); delete p; DEBUG if (DGTrace) printf ("UchDGRAM :: Receive : received %d bytes\n", n); return n; } /*?nextdoc?*/ int UchDGRAM :: Reply (byte* buf, int len, bool ack, int retries) { if (! fromAddr) return -1; return Send (buf, len, *fromAddr, ack, retries); } /*?nextdoc?*/ int UchDGRAM :: Send (UchMsgBuffer& buf, UchAddress& to, bool peek, bool ack, int retries) { DEBUG if (DGTrace) printf ("UchDGRAM :: Send\n"); Lock (); UchMsgBuffer* obuf = PrepareToSend (to, retries); obuf->WriteBuf (buf.Buffer (), buf.BufLength ()); if (! peek) buf.Flush (); int n = SendBuffer (*obuf, to); CheckInput (); int ret = (ack || sync) ? (Wait (outId) ? n : -1) : n; Unlock (); return ret; } /*?nextdoc?*/ int UchDGRAM :: Receive (UchMsgBuffer& buf) { DEBUG if (DGTrace) printf ("UchDGRAM :: Receive\n"); if (! WaitInput ()) return -1; PENDING* p = input.RemoveFirst (); ninput--; int n = p->outBuf->BufLength (); buf.WriteBuf (p->outBuf->Buffer (), n); SendAck (p->id, *p->toAddr); delete p; DEBUG if (DGTrace) printf ("UchDGRAM :: Receive : received buffer %d bytes\n", n); return n; } /*? 
These functions are similar to the same functions in the class \typ{UchDatagram}, except that they manage messages acknowledgement. \fun{Send} saves the message in a buffer so that it can be resent if no acknowledge is received. \fun{Receive} acknowledges the received message. \fun{Reply} sends the message to the sender of the last received message. If \var{ack} is true, the acknowledge of the message is waited for. In that case, \fun{Send} returns -1 if the acknowledge is not received. If \var{retries} is non zero, it specifies a number of retries for this message different from the default retry number of this dgram. ?*/ int UchDGRAM :: Reply (UchMsgBuffer& buf, bool peek, bool ack, int retries) { if (! fromAddr) return -1; return Send (buf, *fromAddr, peek, ack, retries); } /*?nextdoc?*/ bool UchDGRAM :: Send (UchMessage& msg, UchAddress& to, bool ack, int retries) { DEBUG if (DGTrace) printf ("UchDGRAM :: Send message\n"); Lock (); UchMsgBuffer* obuf = PrepareToSend (to, retries); obuf->WriteMsg (msg); // int l; // printf ("outBuffer is %d bytes long\n", l = obuf->BufLength ()); // for (int i = 0; i < l; i++) printf ("%02x ", obuf->Buffer () [i]); // printf ("\n"); int n = SendBuffer (*obuf, to); CheckInput (); bool ret; if (ack || sync) ret = Wait (outId) ? true : false; else ret = bool (n != obuf->BufLength ()); Unlock (); return ret; } /*?nextdoc?*/ bool UchDGRAM :: Receive (UchMessage* msg) { DEBUG if (DGTrace) printf ("UchDGRAM :: Receive message\n"); if (! WaitInput ()) return false; PENDING* p = input.RemoveFirst (); ninput--; // int n; // printf ("inBuffer is %d bytes long\n", n = p->outBuf->BufLength ()); // for (int i = 0; i < n; i++) printf ("%02x ", p->outBuf->Buffer () [i]); // printf ("\n"); if (! p->outBuf->ReadMsg (*msg)) return false; SendAck (p->id, *p->toAddr); delete p; DEBUG if (DGTrace) printf ("UchDGRAM :: Receive : received message\n"); return true; } /*? 
These functions are similar to the previous functions except that they take a \typ{UchMessage} as argument. \fun{Send} converts the message in the output buffer. If \var{ack} is true, the acknowledge of the message is waited for. In that case, \fun{Send} returns false if the acknowledge is not received. If \var{retries} is non zero, it specifies a number of retries for this message different from the default retry number of this dgram. \fun{Receive} converts the incoming data into the message passed as argument. \fun{Reply} sends the message to the sender of the last received message. See also the function \fun{NewMessage} to handle incoming messages. ?*/ bool UchDGRAM :: Reply (UchMessage& msg, bool ack, int retries) { if (! fromAddr) return false; return Send (msg, *fromAddr, ack, retries); } //----------------------- #if 0 bool UchDGRAM :: Ask (UchMessage& msg, UchAddress& to) { DEBUG if (DGTrace) printf ("UchDGRAM :: Ask\n"); Lock (); UchMsgBuffer* obuf = PrepareToSend (to, DGRAM_ASK *******); obuf->WriteMsg (msg); int n = SendBuffer (*obuf, to); CheckInput (); bool ret = Wait (outId) ? true : false; if (ret) { // wait answer for (;;) { if (! WaitInput ()) continue; PENDING* pend = input.Last (); if (pend->toAddr == to............. } } Unlock (); return ret; } #endif //----------------------- /*? Resend all messages for which no acknowledge has been received. If a UchMessage.has been resent more than the retry number, it is discarded. When successive messages are to be sent to the same address, only the first is resent, to avoid connection overflow. ?*/ void UchDGRAM :: Resend () { if (DGTrace) printf ("UchDGRAM::Resend\n"); Lock (); if (npending) CheckInput (); if (! 
npending) { resend = false; Unlock (); return; } UchAddress* addr = 0; CcuIdIterOf iter = pending; while ( ++iter) { PENDING* pend = *iter; if (pend->retries <= 0) { // the output buffer contains the leading id and type UchMsgBuffer fake (*pend->outBuf); lword id; byte typ; fake.ReadLong (id); fake.ReadByte (typ); if (! DiscardNotify (fake, *pend->toAddr)) { pend->retries = retry; // *** should be controllable ? continue; } if (DGTrace) printf ("UchDGRAM::Resend : abandonning %x\n", iter.CurId ()); RemovePending (iter.CurId ()); if (! npending) break; } else if (addr != (UchAddress*) pend->toAddr) { DEBUG printf ("UchDGRAM::Resend : resending %x\n", iter.CurId ()); SendBuffer (* pend->outBuf, *pend->toAddr); --pend->retries; addr = pend->toAddr; CheckInput (); } DEBUG else printf ("UchDGRAM::Resend : skipping %x\n", iter.CurId ()); } resend = false; Unlock (); } /*? This virtual function is called whenever a pending message is about to be discarded because no acknowledge has been received after the the default number of retries. If it returns false, the message is not discarded and its retry count is reset to zero. Returning false is not very social. The default action is to return true, thus discarding the pending message. ?*/ bool UchDGRAM :: DiscardNotify (UchMsgBuffer&, UchAddress&) { return true; } /*? Set the interval between retries. The default value is 1000 (1 second). ?*/ void UchDGRAM :: SetRetryTime (Millisecond m) { timeout = m; if (npending) { timer.ChangePeriod (m); timer.Restart (); } } /*? Wait until all acknowledges have been received, or until all messages have been discarded. ?*/ void UchDGRAM :: Drain () { if (npending) CheckInput (); while (npending) timer.Wait (); } /*? This virtual function is called by \fun{HandleRead}. It must be redefined in a derived class if \fun{HandleRead} is to be used (for instance if this \typ{UchDGRAM} is put in a channel set). This function should convert the contents of the buffer into a message, and handle it. 
It should return true if the message was correctly handled, else false. Note that if it returns false, the message will not be acknowledged, and thus it will be resent later. The default behaviour of this virtual function is to issue an error message and to return true. ?*/ bool UchDGRAM :: NewMessage (UchMsgBuffer&) { ::Error (ErrWarn, "UchDGRAM :: NewMessage", "should be defined in derived class"); return true; } /*? This is an instance of the virtual function \fun{HandleRead} of class \typ{UchChannel}. It calls the virtual function \fun{NewMessage} when a message is received. If \fun{NewMessage} returns true, an acknowledge is sent back. This functions also handles incoming acknowledges. ?*/ void UchDGRAM :: HandleRead () { if (! CheckInput ()) return; if (! ninput) return; DEBUG if (DGTrace) printf (">>UchDGRAM :: HandleRead\n"); PENDING* p = input.RemoveFirst (); ninput--; if (NewMessage (* p->outBuf)) SendAck (p->id, *p->toAddr); DEBUG if (DGTrace) printf ("<>UchDGRAM :: HandleSelect\n"); PENDING* p = input.RemoveFirst (); ninput--; if (NewMessage (* p->outBuf)) SendAck (p->id, *p->toAddr); DEBUG if (DGTrace) printf ("<