From 66bbcfce2bb9265505f2046ea89a1b33a4a1c212 Mon Sep 17 00:00:00 2001 From: chatty Date: Tue, 10 May 1994 09:56:22 +0000 Subject: replaced TRUE/FALSE by true/false Split into classes BufStream and MsgStream --- comm/MsgStream.cc | 460 ++++++++++++++++++++++++++++++++++-------------------- comm/MsgStream.h | 69 +++++--- 2 files changed, 334 insertions(+), 195 deletions(-) (limited to 'comm') diff --git a/comm/MsgStream.cc b/comm/MsgStream.cc index 5f27e04..ff552cf 100644 --- a/comm/MsgStream.cc +++ b/comm/MsgStream.cc @@ -6,7 +6,7 @@ * Copyright 1990-1993 * Laboratoire de Recherche en Informatique (LRI) * - * UchMessage streams + * Message streams * * $Id$ * $CurLog$ @@ -16,14 +16,196 @@ #include "Message.h" #include "error.h" -// #define DEBUG -#ifdef DEBUG -#include -#define DBG(inst) inst -#else -#define DBG(inst) -#endif +/*? +These constructors are similar to those of class \typ{UchSocket}. +?*/ +UchBufStream :: UchBufStream (UchAddress* bindTo, UchAddress* connectTo) +: UchStream (bindTo, connectTo), + InBuffer (), + OutBuffer (), + OutSize (128), + Sync (false) +{ +} + +// *** this copy constructor might be automatically generated +/*?nodoc?*/ +UchBufStream :: UchBufStream (const UchBufStream& ms) +: UchStream (ms), + InBuffer (), + OutBuffer (), + OutSize (ms.OutSize), + Sync (ms.Sync) +{ +} + +/*?nodoc?*/ +UchBufStream :: ~UchBufStream () +{ + Flush (); + InBuffer.Clear (); + OutBuffer.Clear (); +} + +/*?nextdoc?*/ +void +UchBufStream :: InputBuffer (int min, int grow, int max) +{ + InBuffer.SetSizes (min, grow, max); +} + +/*? +Set the input and output buffer sizes. +Default sizes are used if these functions are not called. +?*/ +void +UchBufStream :: OutputBuffer (int min, int grow, int max) +{ + OutBuffer.SetSizes (min, grow, max); + OutSize = max; +} + +/*? +Flush the output buffer. +This function is called automatically when the buffer exceeds its flush size, +or after each message when this stream is in synchronous mode. +It is also called from \fun{Ask} to wait for the answer. +?*/ +void +UchBufStream :: Flush () +{ + if (OutBuffer.BufLength () == 0) + return; + int n = Write (OutBuffer); + OutBuffer.Flush (n); +} + +/*?nodoc?*/ +void +UchBufStream :: HandleWrite () +{ + Flush (); +} + + +// read stream into input buffer +// delete the stream when an eof is received +// return the number of bytes read +// +/*?hidden?*/ +int +UchBufStream :: ReadInput () +{ + if (InBuffer.BufLength () == 0) + InBuffer.NeedSize (128); + int n = Read (InBuffer); + if (n <= 0) { + if (n == -2) { + InBuffer.Grow (); + n = Read (InBuffer); + } + if (n < 0) + SysError (ErrWarn, "UchBufStream::ReadInput"); + if (n <= 0) + Closing (n < 0 ? false : true); + } + return n; +} + +/*? +This virtual function is called when an end of file is read, +meaning that the communication is terminated. +It is also called if an error occurs while reading. +You can check the value of the global variable \var{errno}: +if it is non zero, \fun{Closing} was called because of an error. % wrong +By default this function does nothing. +?*/ +void +UchBufStream :: Closing (bool) +{ +} + + +void +UchBufStream :: WriteLong (lword l) +{ + OutBuffer << l; +} + +void +UchBufStream :: WriteShort (sword s) +{ + OutBuffer << s; +} + +void +UchBufStream :: WriteByte (byte b) +{ + OutBuffer << b; +} + +void +UchBufStream :: WriteChar (char c) +{ + OutBuffer << c; +} + +void +UchBufStream :: WriteString (const char* s) +{ + OutBuffer << s; +} + +void +UchBufStream :: WriteBuf (const byte* b, int n) +{ + OutBuffer.WriteBuf (b, n); +} + +void +UchBufStream :: ReadLong (lword& l) +{ + InBuffer >> l; +} + +void +UchBufStream :: ReadShort (sword& s) +{ + InBuffer >> s; +} + +void +UchBufStream :: ReadByte (byte& b) +{ + InBuffer >> b; +} + +void +UchBufStream :: ReadChar (char& c) +{ + InBuffer >> c; +} + +void +UchBufStream :: ReadString (char* s) +{ + InBuffer >> s; +} + +void +UchBufStream :: ReadString (CcuString& s) +{ + InBuffer >> s; +} + +void +UchBufStream :: ReadBuf (byte* b, int n) +{ + InBuffer.Get (b, n); +} + + /*?class UchMsgStream An object of class \typ{UchMsgStream} is a stream that sends and receives messages: @@ -90,53 +272,28 @@ redefine the virtual functions \fun{NewMessage} and \fun{ConvertAnswer}. These constructors are similar to those of class \typ{UchSocket}. ?*/ UchMsgStream :: UchMsgStream (UchAddress* bindTo, UchAddress* connectTo) -: UchStream (bindTo, connectTo), InBuffer (), OutBuffer (), Buffered () +: UchBufStream (bindTo, connectTo), + Buffered () { - OutSize = 128; State = WAITING; - BufferedMessages = FALSE; - WaitingForAnswer = FALSE; - WaitingReply = FALSE; - Sync = FALSE; + BufferedMessages = false; + WaitingReply = false; } // *** this copy constructor might be automatically generated /*?nodoc?*/ UchMsgStream :: UchMsgStream (const UchMsgStream& ms) -: UchStream (*(UchMsgStream*)&ms), InBuffer (), OutBuffer (), Buffered (*(UchMsgBuffer*)&ms.Buffered) +: UchBufStream (ms), Buffered (*(UchMsgBuffer*)&ms.Buffered) { - OutSize = ms.OutSize; State = ms.State; BufferedMessages = ms.BufferedMessages; - WaitingForAnswer = ms.WaitingForAnswer; WaitingReply = ms.WaitingReply; - Sync = ms.Sync; } -/*?nextdoc?*/ -void -UchMsgStream :: InputBuffer (int min, int grow, int max) -{ - InBuffer.SetSizes (min, grow, max); -} - -/*? -Set the input and output buffer sizes. -Default sizes are used if these functions are not called. -?*/ -void -UchMsgStream :: OutputBuffer (int min, int grow, int max) -{ - OutBuffer.SetSizes (min, grow, max); - OutSize = max; -} /*?nodoc?*/ UchMsgStream :: ~UchMsgStream () { - Flush (); - InBuffer.Clear (); - OutBuffer.Clear (); Buffered.Clear (); } @@ -148,46 +305,6 @@ UchMsgStream :: Copy () const } -// read stream into input buffer -// delete the stream when an oef is received -// return the number of bytes read -// -/*?hidden?*/ -int -UchMsgStream :: ReadInput () -{ - if (! InBuffer.BufLength ()) - InBuffer.NeedSize (128); - int n = Read (InBuffer); - if (n <= 0) { - if (n == -2) { -//printf ("ReadInput: growing\n", n); - InBuffer.Grow (); - n = Read (InBuffer); - } -//printf ("ReadInput: %d bytes\n", n); - if (n < 0) - SysError (ErrWarn, "UchMsgStream::HandleRead"); - if (n <= 0) - Delete (); - } - return n; -} - -/*? -This virtual function is called when an end of file is read, -meaning that the communication is terminated. -It is also called if an error occurs while reading. -You can check the value of the global variable \var{errno}: -if it is non zero, \fun{Delete} was called because of an error. -By default this function does nothing. -?*/ -void -UchMsgStream :: Delete () -{ - // nothing -} - // process the input buffer // waitAnswer indicates whether we are waiting for an answer // this functions uses a very simple automaton: @@ -199,32 +316,32 @@ UchMsgStream :: Delete () // DONE: a full message is in the buffer // // WaitingReply is true if a question has been received and Reply has not been called yet -// WaitingForAnswer is true when Ask has been called and the answer is not yet there // /*?hidden?*/ -void -UchMsgStream :: ProcessInput (UchMsgBuffer& buf, bool waitAnswer) +/*! +Return a message when it's been fully read. +!*/ +UchMessage* +UchMsgStream :: Process (UchMsgBuffer& buf, bool waitAnswer) { for (;;) { switch (State) { case WAITING: - buf.Get (InType); + buf.ReadByte (InType); if (buf.Error ()) - return; - WaitingReply = FALSE; + return 0; + WaitingReply = false; switch (InType) { - case ASK : - WaitingReply = TRUE; - State = GOT_TYPE; - break; - case ANS : - case MSG : + case ASK: + WaitingReply = true; + case ANS: + case MSG: State = GOT_TYPE; break; - case SYNC : - case ASYNC : - case OK : - default : + case SYNC: + case ASYNC: + case OK: + default: State = WAITING; } if (State != GOT_TYPE) @@ -233,41 +350,41 @@ UchMsgStream :: ProcessInput (UchMsgBuffer& buf, bool waitAnswer) case GOT_TYPE: if (! buf.Peek ((lword*) &InLength)) - return; + return 0; buf.NeedSize ((int) InLength - buf.BufLength ()); State = GOT_LENGTH; // fallthrough case GOT_LENGTH: if (buf.BufLength () < InLength) - return; + return 0; State = DONE; // fallthrough case DONE: if (waitAnswer) { if (InType == ANS) { - WaitingForAnswer = FALSE; - // answer still in the buffer - // the answer is converted and - // the buffer is flushed in Ask - return; + UchMsgBuffer fake (buf, InLength); + UchMessage* ans = ConvertAnswer (fake); + buf.Flush (InLength); + State = WAITING; + return ans; } else { // store incoming message in a separate buffer - BufferedMessages = TRUE; - Buffered.Append (InType); - Buffered.Append (buf.Buffer (), InLength); + BufferedMessages = true; + Buffered.WriteByte (InType); + Buffered.WriteBuf (buf.Buffer (), InLength); } } else { if (InType == MSG || InType == ASK) { // pass a fake buffer to the handler UchMsgBuffer fake (buf, InLength); if (! NewMessage (fake, WaitingReply)) - return; + return 0; // *** this return breaks the assumption that - // *** ProcessInput empties the buffer. + // *** Process empties the buffer. // *** this is assumed in HandleRead/HandleSelect - // *** because BufferedMessages is reset to FALSE; + // *** because BufferedMessages is reset to false; } } buf.Flush (InLength); @@ -282,20 +399,13 @@ void UchMsgStream :: HandleRead () { if (BufferedMessages) { - ProcessInput (Buffered, FALSE); - BufferedMessages = FALSE; + Process (Buffered, false); + BufferedMessages = false; } - int n = ReadInput (); - if (n <= 0) + if (ReadInput () <= 0) return; - ProcessInput (InBuffer, FALSE); -} -/*?nodoc?*/ -void -UchMsgStream :: HandleWrite () -{ - Flush (); + Process (InBuffer, false); } /*?nodoc?*/ @@ -303,52 +413,78 @@ bool UchMsgStream :: HandleSelect () { if (BufferedMessages) { - ProcessInput (Buffered, FALSE); - BufferedMessages = FALSE; + Process (Buffered, false); + BufferedMessages = false; } - return FALSE; + return false; } /*? This virtual function is called whenever a complete message is in the buffer. -The buffer contains exactly one message, so that you can use \com{buf.Get (msg)} +The buffer contains exactly one message, so that you can use \com{buf.ReadMsg (msg)} to extract the message from the buffer. -\var{ask} is TRUE if the message was sent with \fun{Ask}. +\var{ask} is true if the message was sent with \fun{Ask}. In this case an answer must be sent back with \fun{Reply}. -Messages can be sent before replying; they will be buffered by the receiver for later processing; -thus, you cannot use \fun{Ask} before replying. -\fun{NewMessage} function must return TRUE if it handled the message, else FALSE. -If it returns FALSE, it will be called again with the same arguments next time data arrives on this channel. -In the class \typ{UchMsgStream} this function does nothing and returns TRUE; -if \var{ask} is TRUE, it replies immediately with an empty message. +Messages can be sent before replying; they will be buffered by the receiver for later processing. +But you cannot use \fun{Ask} before replying. +\fun{NewMessage} function must return true if it handled the message, else false. +If it returns false, it will be called again with the same arguments next time data arrives on this channel. +In the class \typ{UchMsgStream} this function does nothing and returns true; +if \var{ask} is true, it replies immediately with an empty message. ?*/ bool UchMsgStream :: NewMessage (UchMsgBuffer&, bool ask) { if (ask) { - UchMessage dummy; - + UchMessage dummy; Reply (dummy); } - return TRUE; + return true; } /*? Send a message. -If \var{flush} is TRUE, the output buffer will be flushed. +If \var{flush} is true, the output buffer will be flushed. This also happens when the message stream is in synchronous mode, -or if the output UchMsgBuffer.has exceeded its flush size (see \fun{FlushSize}). +or if the output buffer has exceeded its flush size (see \fun{FlushSize}). ?*/ void UchMsgStream :: Send (UchMessage& msg, bool flush) { - OutBuffer.Append ((byte) MSG); - OutBuffer.Append (msg); + OutBuffer.WriteByte (MSG); + WriteMsg (msg); if (flush || Sync || OutBuffer.BufLength () >= OutSize) Flush (); } +void +UchMsgStream :: WriteMsg (UchMessage& msg) +{ + OutBuffer.WriteMsg (msg); +} + +bool +UchMsgStream :: ReadMsg (UchMessage& msg) +{ + UchMsgBuffer& buf = BufferedMessages ? Buffered : InBuffer; + int l = buf.BufLength (); // store current offset in the buffer + lword msglen; + buf >> msglen; + if (l < msglen) + return false; + msg.ReadFrom (*this, msglen); + /* skip end of msg if necessary */ + int rl = l - buf.BufLength (); + if (rl == msglen) + return true; + else if (rl < msglen) { + buf.Flush ((int)(msglen) - rl); + return true; + } else + return false; +} + /*? Send a message and wait for an answer. Incoming messages that are received while waiting for the answer are kept for later processing. @@ -358,28 +494,21 @@ UchMessage* UchMsgStream :: Ask (UchMessage& msg) { if (WaitingReply) { - Error (ErrWarn, "UchMsgStream::Ask", "cannot ask before replying"); + ::Error (ErrWarn, "UchMsgStream::Ask", "cannot ask before replying"); return 0; } - OutBuffer.Append ((byte) ASK); - OutBuffer.Append (msg); + OutBuffer.WriteByte (ASK); + WriteMsg (msg); Flush (); - WaitingForAnswer = TRUE; + UchMessage* ans = 0; do { - int n = ReadInput (); - if (n <= 0) + if (ReadInput () <= 0) return 0; - ProcessInput (InBuffer, TRUE); - } while (WaitingForAnswer); + ans = Process (InBuffer, true); + } while (!ans); - // ProcessInput did not flush so that we can convert the answer - // *** If ProcessInput would return a void*, we could do all this stuff in it ... - UchMsgBuffer fake (InBuffer, InLength); - UchMessage* res = ConvertAnswer (fake); - InBuffer.Flush (InLength); - State = WAITING; - return res; + return ans; } /*? @@ -389,13 +518,13 @@ void UchMsgStream :: Reply (UchMessage& msg) { if (! WaitingReply) { - Error (ErrWarn, "UchMsgStream::Reply", "out of phase reply discarded"); + ::Error (ErrWarn, "UchMsgStream::Reply", "out of phase reply discarded"); return; } - OutBuffer.Append ((byte) ANS); - OutBuffer.Append (msg); + OutBuffer.WriteByte ((byte) ANS); + WriteMsg (msg); Flush (); - WaitingReply = FALSE; + WaitingReply = false; } @@ -410,23 +539,8 @@ UchMsgStream :: ConvertAnswer (UchMsgBuffer&) } /*? -Flush the output buffer. -This function is called automatically when the buffer exceeds its flush size, -or after each message when this stream is in synchronous mode. -It is also called from \fun{Ask} to wait for the answer. -?*/ -void -UchMsgStream :: Flush () -{ - if (! OutBuffer.BufLength ()) - return; - int n = Write (OutBuffer); - OutBuffer.Flush (n); -} - -/*? Send a buffer containing a message. -If \var{flush} is TRUE, the output buffer will be flushed. +If \var{flush} is true, the output buffer will be flushed. This also happens when the message stream is in synchronous mode, or if the output UchMsgBuffer.has exceeded its flush size (see \fun{FlushSize}). The buffer {\em must} contain a converted message. @@ -437,8 +551,8 @@ the buffer to a message. void UchMsgStream :: Send (UchMsgBuffer& buf, bool flush) { - OutBuffer.Append ((byte) MSG); - OutBuffer.Append (buf.Buffer (), buf.BufLength ()); + OutBuffer.WriteByte ((byte) MSG); + OutBuffer.WriteBuf (buf.Buffer (), buf.BufLength ()); if (flush || Sync || OutBuffer.BufLength () >= OutSize) Flush (); } diff --git a/comm/MsgStream.h b/comm/MsgStream.h index bfa8e78..b33437a 100644 --- a/comm/MsgStream.h +++ b/comm/MsgStream.h @@ -20,51 +20,76 @@ #include "MsgBuffer.h" class UchMessage; -class UchMsgStream : public UchStream { -private: - +class UchBufStream : public UchStream { protected: - UchMsgBuffer InBuffer; - int InSize; - UchMsgBuffer OutBuffer; + UchMsgBuffer InBuffer; + UchMsgBuffer OutBuffer; int OutSize; + bool Sync; + + UchBufStream (const UchBufStream&); + int ReadInput (); + void HandleWrite (); + + void WriteLong (lword); + void WriteShort (sword); + void WriteByte (byte); + void WriteChar (char); + void WriteString (const char*); + void WriteBuf (const byte*, int); + + + void ReadLong (lword&); + void ReadShort (sword&); + void ReadByte (byte&); + void ReadChar (char&); + void ReadString (char*); + void ReadString (CcuString&); + void ReadBuf (byte*, int); + +public: + UchBufStream (UchAddress* = 0, UchAddress* = 0); + ~UchBufStream (); + + void InputBuffer (int min, int grow, int max); + void OutputBuffer (int min, int grow, int max); +inline bool GetSyncMode () { return Sync; } +inline void SetSyncMode (bool s) { Sync = s; Flush (); } +inline void FlushSize (int n) { OutSize = n; } +virtual void Flush (); + +virtual void Closing (bool); +}; + +class UchMsgStream : public UchBufStream { +protected: enum STATE { WAITING, GOT_TYPE, GOT_LENGTH, DONE}; enum TYPE { MSG = 1, ASK, ANS, SYNC, ASYNC, OK }; STATE State; bool BufferedMessages; UchMsgBuffer Buffered; - bool WaitingForAnswer; bool WaitingReply; int InLength; byte InType; - bool Sync; - void ProcessInput (UchMsgBuffer&, bool); - int ReadInput (); + UchMessage* Process (UchMsgBuffer&, bool); + UchMsgStream (const UchMsgStream&); + void WriteMsg (UchMessage&); + bool ReadMsg (UchMessage&); public: - UchMsgStream (const UchMsgStream&); UchMsgStream (UchAddress* = 0, UchAddress* = 0); ~UchMsgStream (); - void InputBuffer (int min, int grow, int max); - void OutputBuffer (int min, int grow, int max); -inline void FlushSize (int n) { OutSize = n; } UchChannel* Copy () const; void HandleRead (); - void HandleWrite (); bool HandleSelect (); -inline bool GetSyncMode () { return Sync; } -inline void SetSyncMode (bool s) { Sync = s; Flush (); } virtual bool NewMessage (UchMsgBuffer&, bool); virtual UchMessage* ConvertAnswer (UchMsgBuffer&); -virtual void Delete (); - void Send (UchMessage&, bool = FALSE); + void Send (UchMessage&, bool = false); UchMessage* Ask (UchMessage&); void Reply (UchMessage&); - void Flush (); - void Send (UchMsgBuffer&, bool = FALSE); + void Send (UchMsgBuffer&, bool = false); }; #endif /* MsgStream_H_ */ - -- cgit v1.1