From ba066c34dde204aa192d03a23a81356374d93731 Mon Sep 17 00:00:00 2001 From: chatty Date: Wed, 7 Apr 1993 11:50:31 +0000 Subject: Initial revision --- comm/MsgStream.cc | 506 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 506 insertions(+) create mode 100644 comm/MsgStream.cc (limited to 'comm/MsgStream.cc') diff --git a/comm/MsgStream.cc b/comm/MsgStream.cc new file mode 100644 index 0000000..07cb5c1 --- /dev/null +++ b/comm/MsgStream.cc @@ -0,0 +1,506 @@ +/* + * The Unix Channel + * + * by Michel Beaudouin-Lafon + * + * Copyright 1990-1993 + * Laboratoire de Recherche en Informatique (LRI) + * + * UchMessage streams + * + * $Id$ + * $CurLog$ + */ + +#include "MsgStream.h" +#include "Message.h" +#include "error.h" + +// #define DEBUG + +#ifdef DEBUG +#include +#define DBG(inst) inst +#else +#define DBG(inst) +#endif + +/*?class UchMsgStream +An object of class \typ{UchMsgStream} is a stream that sends and receives messages: +we call it a message stream. + +Messages are sent with the function \fun{Send}. +Output messages are normally buffered to increase performance, but synchronous mode is available. +The output buffer can be flushed by the application, or by the stream itself if needed +(for instance when the buffer is full, or when a synchronous communication is needed). + +Incoming messages are handled by the virtual function \fun{HandleRead} of class \typ{UchChannel}: +it is redefined so that incoming bytes are packed into messages. +When a full message is available, \fun{HandleRead} calls the virtual +function \fun{NewMessage}. In the class \typ{UchMsgStream}, this virtual function does +nothing but discarding the message. + +Messages that need an answer are handled in the following way: +the sender calls the function\fun{Ask}, and is blocked until the answer is received; +any incoming messages are stored for later processing. +When the answer arrives, \fun{Ask} calls the virtual function \fun{ConvertAnswer}. +The returned value of \fun{ConvertAnswer} is then returned by \fun{Ask}, +thus returning to the application the reply to its question. + +On the receiver's side, the following happens: +when a message sent with \fun{Ask} is received, \fun{NewMessage} is called as usual, +but an argument indicates that it is a question that needs an answer. +The receiver must then use \fun{Reply} to send its answer. +Messages can be sent before replying, but it is not possible to send a question +on a message stream that is waiting for an answer: +this would result in a deadlock since the other party is already waiting for an answer +and is buffering other incoming messages. + +The default output mode is buffered (i.e. asynchronous). +The buffered output is automatically flushed when a question is sent, +or when the output buffer is full, or explicitly with \fun{Flush}, or with the second argument of \fun{Send}. +The output mode can be switched to a synchronous mode where each message is sent immediately. +% It can also be switched to a locked synchronous mode: this is a synchronous mode where +% the sender waits for each message to be processed before proceeding its execution. +% This is mainly useful for debugging purposes. + +\medskip +To use a message stream, a program needs to derive the class \typ{UchMsgStream} +in order to redefine the virtual functions \fun{NewMessage} and \fun{ConvertAnswer}. +\fun{NewMessage} treats the incoming message, and will probably call \fun{Send} and \fun{Ask}. +The top level of the program needs just call \fun{HandleRead} in a forever loop. + +Most of the time, a program will use several message streams +(for instance to manage several clients). +In this case a channel set is the best way to implement the application: +the definitions in the class \typ{UchMsgStream} of the functions +\fun{HandleRead}, \fun{HandleWrite} and \fun{HandleSelect} make it easy +to use this class in combination with the class \typ{UchMultiplexer}. +The virtual function \fun{HandeWrite} of channels is redefined to flush the output buffer. +The virtual function \fun{HandleSelect} of channels is redefined to handle the incoming messages +that were buffered while waiting for an answer. +As for the single stream situation, you need just derive the class \fun{UchMsgStream} to +redefine the virtual functions \fun{NewMessage} and \fun{ConvertAnswer}. +?*/ + +// ---- no byte swapping ... +// ---- no locked sync mode ... + +/*?nextdoc?*/ +UchMsgStream :: UchMsgStream () +: UchStream (), InBuffer (), OutBuffer (), Buffered () +{ + OutSize = 128; + State = WAITING; + BufferedMessages = FALSE; + WaitingForAnswer = FALSE; + WaitingReply = FALSE; + Sync = FALSE; +} + +/*? +These constructors are similar to those of class \typ{UchSocket}. +?*/ +UchMsgStream :: UchMsgStream (UchAddress* bindTo, UchAddress* connectTo) +: UchStream (bindTo, connectTo), InBuffer (), OutBuffer (), Buffered () +{ + OutSize = 128; + State = WAITING; + BufferedMessages = FALSE; + WaitingForAnswer = FALSE; + WaitingReply = FALSE; + Sync = FALSE; +} + +// *** this copy constructor might be automatically generated +/*?nodoc?*/ +UchMsgStream :: UchMsgStream (const UchMsgStream& ms) +: UchStream (*(UchMsgStream*)&ms), InBuffer (), OutBuffer (), Buffered (*(UchMsgBuffer*)&ms.Buffered) +{ + OutSize = ms.OutSize; + State = ms.State; + BufferedMessages = ms.BufferedMessages; + WaitingForAnswer = ms.WaitingForAnswer; + WaitingReply = ms.WaitingReply; + Sync = ms.Sync; +} + +/*?nextdoc?*/ +void +UchMsgStream :: InputBuffer (int min, int grow, int max) +{ + InBuffer.SetSizes (min, grow, max); +} + +/*? +Set the input and output buffer sizes. +Default sizes are used if these functions are not called. +?*/ +void +UchMsgStream :: OutputBuffer (int min, int grow, int max) +{ + InBuffer.SetSizes (min, grow, max); + OutSize = max; +} + +/*?nodoc?*/ +UchMsgStream :: ~UchMsgStream () +{ + Flush (); + InBuffer.Delete (); + OutBuffer.Delete (); + Buffered.Delete (); +} + +/*?nodoc?*/ +UchChannel* +UchMsgStream :: Copy () const +{ + return new UchMsgStream (*this); +} + + +// read stream into input buffer +// delete the stream when an oef is received +// return the number of bytes read +// +/*?hidden?*/ +int +UchMsgStream :: ReadInput () +{ + if (! InBuffer.BufLength ()) + InBuffer.NeedSize (128); + int n = Read (InBuffer); + if (n <= 0) { + if (n == -2) { +//printf ("ReadInput: growing\n", n); + InBuffer.Grow (); + n = Read (InBuffer); + } +//printf ("ReadInput: %d bytes\n", n); + if (n < 0) + SysError (ErrWarn, "UchMsgStream::HandleRead"); + if (n <= 0) + Delete (); + } + return n; +} + +/*? +This virtual function is called when an end of file is read, +meaning that the communication is terminated. +It is also called if an error occurs while reading. +You can check the value of the global variable \var{errno}: +if it is non zero, \fun{Delete} was called because of an error. +By default this function does nothing. +?*/ +void +UchMsgStream :: Delete () +{ + // nothing +} + +// process the input buffer +// waitAnswer indicates whether we are waiting for an answer +// this functions uses a very simple automaton: +// WAITING: nothing in the buffer +// GOT_TYPE: read the 1 byte header mark of a message +// MSG / ASK / ANS for messages, questions, answers +// SYNC / ASYNC / OK for sync management +// GOT_LENGTH: read the 4 byte header fo a message +// DONE: a full message is in the buffer +// +// WaitingReply is true if a question has been received and Reply has not been called yet +// WaitingForAnswer is true when Ask has been called and the answer is not yet there +// +/*?hidden?*/ +void +UchMsgStream :: ProcessInput (UchMsgBuffer& buf, bool waitAnswer) +{ +DBG (printf ("ProcessInput: ");) + for (;;) { + switch (State) { + case WAITING: +DBG (printf ("WAITING\n");) + buf.Get ((byte*) &InType); +DBG (if (buf.Error ()) printf (" buf empty\n");) + if (buf.Error ()) + return; + WaitingReply = FALSE; + switch (InType) { + case ASK : +DBG (printf (" waiting reply\n");) + WaitingReply = TRUE; + State = GOT_TYPE; + break; + case ANS : +DBG (printf (" answer\n");) + case MSG : +DBG (if (InType != ANS) printf (" msg\n");) +DBG (printf (" got type\n");) + State = GOT_TYPE; + break; + case SYNC : + case ASYNC : + case OK : + default : +DBG (printf (" unknown !!\n");) + State = WAITING; + } + if (State != GOT_TYPE) + break; + // fallthrough + + case GOT_TYPE: +DBG (printf ("GOT_TYPE\n");) + if (! buf.Peek ((lword*) &InLength)) + return; + buf.NeedSize ((int) InLength - buf.BufLength ()); + State = GOT_LENGTH; + // fallthrough + + case GOT_LENGTH: +DBG (printf ("GOT_LENGTH\n");) + if (buf.BufLength () < InLength) + return; + State = DONE; + // fallthrough + + case DONE: +DBG (printf ("DONE\n");) + if (waitAnswer) { + if (InType == ANS) { +DBG (printf ("got answer\n");) + WaitingForAnswer = FALSE; + // answer still in the buffer + // the answer is converted and + // the buffer is flushed in Ask + return; + } else { + // store incoming message in a separate buffer + BufferedMessages = TRUE; + Buffered.Append (InType); + Buffered.Append (buf.Buffer (), InLength); +DBG (printf ("buffering\n");) + } + } else { + if (InType == MSG || InType == ASK) { +DBG (printf ("new message\n");) + // pass a fake buffer to the handler + UchMsgBuffer fake (buf, InLength); + if (! NewMessage (fake, WaitingReply)) + return; + // *** this return breaks the assumption that + // *** ProcessInput empties the buffer. + // *** this is assumed in HandleRead/HandleSelect + // *** because BufferedMessages is reset to FALSE; + } +DBG (else printf ("discarding\n");) + } + buf.Flush (InLength); + State = WAITING; + // fallthrough + } + } +} + +/*?nodoc?*/ +void +UchMsgStream :: HandleRead () +{ + if (BufferedMessages) { + ProcessInput (Buffered, FALSE); + BufferedMessages = FALSE; + } + int n = ReadInput (); + if (n <= 0) + return; + ProcessInput (InBuffer, FALSE); +} + +/*?nodoc?*/ +void +UchMsgStream :: HandleWrite () +{ + Flush (); +} + +/*?nodoc?*/ +bool +UchMsgStream :: HandleSelect () +{ + if (BufferedMessages) { + ProcessInput (Buffered, FALSE); + BufferedMessages = FALSE; + } + return FALSE; +} + + +/*? +This virtual function is called whenever a complete message is in the buffer. +The buffer contains exactly one message, so that you can use \com{buf.Get(&msg)} +to extract the message from the buffer. +\var{ask} is TRUE if the message was sent with \fun{Ask}. +In this case an answer must be sent back with \fun{Reply}. +Messages can be sent before replying; they will be buffered by the receiver for later processing; +thus, you cannot use \fun{Ask} before replying. +\fun{NewMessage} function must return TRUE if it handled the message, else FALSE. +If it returns FALSE, it will be called again with the same arguments next time data arrives on this channel. +In the class \typ{UchMsgStream} this function does nothing and returns TRUE; +if \var{ask} is TRUE, it replies immediately with an empty message. +?*/ +bool +UchMsgStream :: NewMessage (UchMsgBuffer&, bool ask) +{ + if (ask) { + UchMessage dummy; + + Reply (dummy); + } + return TRUE; +} + +/*? +Send a message. +If \var{flush} is TRUE, the output buffer will be flushed. +This also happens when the message stream is in synchronous mode, +or if the output UchMsgBuffer.has exceeded its flush size (see \fun{FlushSize}). +?*/ +void +UchMsgStream :: Send (UchMessage& msg, bool flush) +{ + OutBuffer.Append ((byte) MSG); + OutBuffer.Append (msg); + if (flush || Sync || OutBuffer.BufLength () >= OutSize) + Flush (); +} + +/*? +Send a message and wait for an answer. +Incoming messages that are received while waiting for the answer are kept for later processing. +The answer message is returned by calling the virtual function \fun{ConvertAnswer}. +?*/ +void* +UchMsgStream :: Ask (UchMessage& msg) +{ + if (WaitingReply) { + Error (ErrWarn, "UchMsgStream::Ask", "cannot ask before replying"); + return 0; + } + OutBuffer.Append ((byte) ASK); + OutBuffer.Append (msg); + Flush (); + WaitingForAnswer = TRUE; + + do { +#ifdef DEBUG + printf ("waiting for answer\n"); +#endif + int n = ReadInput (); +#ifdef DEBUG + printf (" got %d bytes\n", n); +#endif + if (n <= 0) + return 0; + ProcessInput (InBuffer, TRUE); + } while (WaitingForAnswer); + + // ProcessInput did not flush so that we can convert the answer + // *** If ProcessInput would return a void*, we could do all this stuff in it ... + UchMsgBuffer fake (InBuffer, InLength); + void* res = ConvertAnswer (fake); + InBuffer.Flush (InLength); + State = WAITING; + return res; +} + +/*? +This function must be used instead of \fun{Send} to send a reply to a message sent by \fun{Ask}. +?*/ +void +UchMsgStream :: Reply (UchMessage& msg) +{ + if (! WaitingReply) { + Error (ErrWarn, "UchMsgStream::Reply", "out of phase reply discarded"); + return; + } + OutBuffer.Append ((byte) ANS); + OutBuffer.Append (msg); + Flush (); + WaitingReply = FALSE; +} + + +/*? +This function is called by \fun{Ask} when the answer is in the buffer, +in order to convert it into an object usable by the application. +?*/ +void* +UchMsgStream :: ConvertAnswer (UchMsgBuffer&) +{ + return 0; +} + +/*? +Flush the output buffer. +This function is called automatically when the buffer exceeds its flush size, +or after each message when this stream is in synchronous mode. +It is also called from \fun{Ask} to wait for the answer. +?*/ +void +UchMsgStream :: Flush () +{ + if (! OutBuffer.BufLength ()) + return; + int n = Write (OutBuffer); + OutBuffer.Flush (n); +} + +/*? +Send a buffer containing a message. +If \var{flush} is TRUE, the output buffer will be flushed. +This also happens when the message stream is in synchronous mode, +or if the output UchMsgBuffer.has exceeded its flush size (see \fun{FlushSize}). +The buffer {\em must} contain a converted message. +This can be used for instance from inside \fun{NewMessage} to resend +the incoming message to another client, without having to convert +the buffer to a message. +?*/ +void +UchMsgStream :: Send (UchMsgBuffer& buf, bool flush) +{ + OutBuffer.Append ((byte) MSG); + OutBuffer.Append (buf.Buffer (), buf.BufLength ()); + if (flush || Sync || OutBuffer.BufLength () >= OutSize) + Flush (); +} + +#ifdef DOC +/*? +This function defines the size of the output buffer that triggers automatic flushing +in asynchronous mode. By default the flush size is the maximum size of the +output buffer. As a consequence, it is changed by \fun{OutBuffer}. +?*/ +void +UchMsgStream :: FlushSize (int n) +{ } + +/*?nextdoc?*/ +void +UchMsgStream :: SetSyncMode (bool s) +{ } + +/*? +A message stream can be in synchronous or asynchronous mode. +In asynchronous mode output is buffered while in synchronous mode it is not. +Synchronous mode is usually less efficient than asynchronous mode +because it makes more system calls to transfer data; +however synchronous mode can be useful for debugging applications. +?*/ +bool +UchMsgStream :: GetSyncMode () +{ } + +#endif /* DOC */ + -- cgit v1.1