summaryrefslogtreecommitdiff
path: root/comm/MsgStream.cc
diff options
context:
space:
mode:
authorchatty1993-04-07 11:50:31 +0000
committerchatty1993-04-07 11:50:31 +0000
commitba066c34dde204aa192d03a23a81356374d93731 (patch)
tree39391f6235d2cf8a59a0634ac5ea430cdd21f5d4 /comm/MsgStream.cc
parent05ab076e1c2a9ca16472f9a6b47b8d22914b3783 (diff)
downloadivy-league-ba066c34dde204aa192d03a23a81356374d93731.zip
ivy-league-ba066c34dde204aa192d03a23a81356374d93731.tar.gz
ivy-league-ba066c34dde204aa192d03a23a81356374d93731.tar.bz2
ivy-league-ba066c34dde204aa192d03a23a81356374d93731.tar.xz
Initial revision
Diffstat (limited to 'comm/MsgStream.cc')
-rw-r--r--comm/MsgStream.cc506
1 files changed, 506 insertions, 0 deletions
diff --git a/comm/MsgStream.cc b/comm/MsgStream.cc
new file mode 100644
index 0000000..07cb5c1
--- /dev/null
+++ b/comm/MsgStream.cc
@@ -0,0 +1,506 @@
+/*
+ * The Unix Channel
+ *
+ * by Michel Beaudouin-Lafon
+ *
+ * Copyright 1990-1993
+ * Laboratoire de Recherche en Informatique (LRI)
+ *
+ * UchMessage streams
+ *
+ * $Id$
+ * $CurLog$
+ */
+
+#include "MsgStream.h"
+#include "Message.h"
+#include "error.h"
+
+// #define DEBUG
+
+#ifdef DEBUG
+#include <stdio.h>
+#define DBG(inst) inst
+#else
+#define DBG(inst)
+#endif
+
+/*?class UchMsgStream
+An object of class \typ{UchMsgStream} is a stream that sends and receives messages:
+we call it a message stream.
+
+Messages are sent with the function \fun{Send}.
+Output messages are normally buffered to increase performance, but synchronous mode is available.
+The output buffer can be flushed by the application, or by the stream itself if needed
+(for instance when the buffer is full, or when a synchronous communication is needed).
+
+Incoming messages are handled by the virtual function \fun{HandleRead} of class \typ{UchChannel}:
+it is redefined so that incoming bytes are packed into messages.
+When a full message is available, \fun{HandleRead} calls the virtual
+function \fun{NewMessage}. In the class \typ{UchMsgStream}, this virtual function does
+nothing but discarding the message.
+
+Messages that need an answer are handled in the following way:
+the sender calls the function\fun{Ask}, and is blocked until the answer is received;
+any incoming messages are stored for later processing.
+When the answer arrives, \fun{Ask} calls the virtual function \fun{ConvertAnswer}.
+The returned value of \fun{ConvertAnswer} is then returned by \fun{Ask},
+thus returning to the application the reply to its question.
+
+On the receiver's side, the following happens:
+when a message sent with \fun{Ask} is received, \fun{NewMessage} is called as usual,
+but an argument indicates that it is a question that needs an answer.
+The receiver must then use \fun{Reply} to send its answer.
+Messages can be sent before replying, but it is not possible to send a question
+on a message stream that is waiting for an answer:
+this would result in a deadlock since the other party is already waiting for an answer
+and is buffering other incoming messages.
+
+The default output mode is buffered (i.e. asynchronous).
+The buffered output is automatically flushed when a question is sent,
+or when the output buffer is full, or explicitly with \fun{Flush}, or with the second argument of \fun{Send}.
+The output mode can be switched to a synchronous mode where each message is sent immediately.
+% It can also be switched to a locked synchronous mode: this is a synchronous mode where
+% the sender waits for each message to be processed before proceeding its execution.
+% This is mainly useful for debugging purposes.
+
+\medskip
+To use a message stream, a program needs to derive the class \typ{UchMsgStream}
+in order to redefine the virtual functions \fun{NewMessage} and \fun{ConvertAnswer}.
+\fun{NewMessage} treats the incoming message, and will probably call \fun{Send} and \fun{Ask}.
+The top level of the program needs just call \fun{HandleRead} in a forever loop.
+
+Most of the time, a program will use several message streams
+(for instance to manage several clients).
+In this case a channel set is the best way to implement the application:
+the definitions in the class \typ{UchMsgStream} of the functions
+\fun{HandleRead}, \fun{HandleWrite} and \fun{HandleSelect} make it easy
+to use this class in combination with the class \typ{UchMultiplexer}.
+The virtual function \fun{HandeWrite} of channels is redefined to flush the output buffer.
+The virtual function \fun{HandleSelect} of channels is redefined to handle the incoming messages
+that were buffered while waiting for an answer.
+As for the single stream situation, you need just derive the class \fun{UchMsgStream} to
+redefine the virtual functions \fun{NewMessage} and \fun{ConvertAnswer}.
+?*/
+
+// ---- no byte swapping ...
+// ---- no locked sync mode ...
+
+/*?nextdoc?*/
+UchMsgStream :: UchMsgStream ()
+: UchStream (), InBuffer (), OutBuffer (), Buffered ()
+{
+ OutSize = 128;
+ State = WAITING;
+ BufferedMessages = FALSE;
+ WaitingForAnswer = FALSE;
+ WaitingReply = FALSE;
+ Sync = FALSE;
+}
+
+/*?
+These constructors are similar to those of class \typ{UchSocket}.
+?*/
+UchMsgStream :: UchMsgStream (UchAddress* bindTo, UchAddress* connectTo)
+: UchStream (bindTo, connectTo), InBuffer (), OutBuffer (), Buffered ()
+{
+ OutSize = 128;
+ State = WAITING;
+ BufferedMessages = FALSE;
+ WaitingForAnswer = FALSE;
+ WaitingReply = FALSE;
+ Sync = FALSE;
+}
+
+// *** this copy constructor might be automatically generated
+/*?nodoc?*/
+UchMsgStream :: UchMsgStream (const UchMsgStream& ms)
+: UchStream (*(UchMsgStream*)&ms), InBuffer (), OutBuffer (), Buffered (*(UchMsgBuffer*)&ms.Buffered)
+{
+ OutSize = ms.OutSize;
+ State = ms.State;
+ BufferedMessages = ms.BufferedMessages;
+ WaitingForAnswer = ms.WaitingForAnswer;
+ WaitingReply = ms.WaitingReply;
+ Sync = ms.Sync;
+}
+
+/*?nextdoc?*/
+void
+UchMsgStream :: InputBuffer (int min, int grow, int max)
+{
+ InBuffer.SetSizes (min, grow, max);
+}
+
+/*?
+Set the input and output buffer sizes.
+Default sizes are used if these functions are not called.
+?*/
+void
+UchMsgStream :: OutputBuffer (int min, int grow, int max)
+{
+ InBuffer.SetSizes (min, grow, max);
+ OutSize = max;
+}
+
+/*?nodoc?*/
+UchMsgStream :: ~UchMsgStream ()
+{
+ Flush ();
+ InBuffer.Delete ();
+ OutBuffer.Delete ();
+ Buffered.Delete ();
+}
+
+/*?nodoc?*/
+UchChannel*
+UchMsgStream :: Copy () const
+{
+ return new UchMsgStream (*this);
+}
+
+
+// read stream into input buffer
+// delete the stream when an oef is received
+// return the number of bytes read
+//
+/*?hidden?*/
+int
+UchMsgStream :: ReadInput ()
+{
+ if (! InBuffer.BufLength ())
+ InBuffer.NeedSize (128);
+ int n = Read (InBuffer);
+ if (n <= 0) {
+ if (n == -2) {
+//printf ("ReadInput: growing\n", n);
+ InBuffer.Grow ();
+ n = Read (InBuffer);
+ }
+//printf ("ReadInput: %d bytes\n", n);
+ if (n < 0)
+ SysError (ErrWarn, "UchMsgStream::HandleRead");
+ if (n <= 0)
+ Delete ();
+ }
+ return n;
+}
+
+/*?
+This virtual function is called when an end of file is read,
+meaning that the communication is terminated.
+It is also called if an error occurs while reading.
+You can check the value of the global variable \var{errno}:
+if it is non zero, \fun{Delete} was called because of an error.
+By default this function does nothing.
+?*/
+void
+UchMsgStream :: Delete ()
+{
+ // nothing
+}
+
+// process the input buffer
+// waitAnswer indicates whether we are waiting for an answer
+// this functions uses a very simple automaton:
+// WAITING: nothing in the buffer
+// GOT_TYPE: read the 1 byte header mark of a message
+// MSG / ASK / ANS for messages, questions, answers
+// SYNC / ASYNC / OK for sync management
+// GOT_LENGTH: read the 4 byte header fo a message
+// DONE: a full message is in the buffer
+//
+// WaitingReply is true if a question has been received and Reply has not been called yet
+// WaitingForAnswer is true when Ask has been called and the answer is not yet there
+//
+/*?hidden?*/
+void
+UchMsgStream :: ProcessInput (UchMsgBuffer& buf, bool waitAnswer)
+{
+DBG (printf ("ProcessInput: ");)
+ for (;;) {
+ switch (State) {
+ case WAITING:
+DBG (printf ("WAITING\n");)
+ buf.Get ((byte*) &InType);
+DBG (if (buf.Error ()) printf (" buf empty\n");)
+ if (buf.Error ())
+ return;
+ WaitingReply = FALSE;
+ switch (InType) {
+ case ASK :
+DBG (printf (" waiting reply\n");)
+ WaitingReply = TRUE;
+ State = GOT_TYPE;
+ break;
+ case ANS :
+DBG (printf (" answer\n");)
+ case MSG :
+DBG (if (InType != ANS) printf (" msg\n");)
+DBG (printf (" got type\n");)
+ State = GOT_TYPE;
+ break;
+ case SYNC :
+ case ASYNC :
+ case OK :
+ default :
+DBG (printf (" unknown !!\n");)
+ State = WAITING;
+ }
+ if (State != GOT_TYPE)
+ break;
+ // fallthrough
+
+ case GOT_TYPE:
+DBG (printf ("GOT_TYPE\n");)
+ if (! buf.Peek ((lword*) &InLength))
+ return;
+ buf.NeedSize ((int) InLength - buf.BufLength ());
+ State = GOT_LENGTH;
+ // fallthrough
+
+ case GOT_LENGTH:
+DBG (printf ("GOT_LENGTH\n");)
+ if (buf.BufLength () < InLength)
+ return;
+ State = DONE;
+ // fallthrough
+
+ case DONE:
+DBG (printf ("DONE\n");)
+ if (waitAnswer) {
+ if (InType == ANS) {
+DBG (printf ("got answer\n");)
+ WaitingForAnswer = FALSE;
+ // answer still in the buffer
+ // the answer is converted and
+ // the buffer is flushed in Ask
+ return;
+ } else {
+ // store incoming message in a separate buffer
+ BufferedMessages = TRUE;
+ Buffered.Append (InType);
+ Buffered.Append (buf.Buffer (), InLength);
+DBG (printf ("buffering\n");)
+ }
+ } else {
+ if (InType == MSG || InType == ASK) {
+DBG (printf ("new message\n");)
+ // pass a fake buffer to the handler
+ UchMsgBuffer fake (buf, InLength);
+ if (! NewMessage (fake, WaitingReply))
+ return;
+ // *** this return breaks the assumption that
+ // *** ProcessInput empties the buffer.
+ // *** this is assumed in HandleRead/HandleSelect
+ // *** because BufferedMessages is reset to FALSE;
+ }
+DBG (else printf ("discarding\n");)
+ }
+ buf.Flush (InLength);
+ State = WAITING;
+ // fallthrough
+ }
+ }
+}
+
+/*?nodoc?*/
+void
+UchMsgStream :: HandleRead ()
+{
+ if (BufferedMessages) {
+ ProcessInput (Buffered, FALSE);
+ BufferedMessages = FALSE;
+ }
+ int n = ReadInput ();
+ if (n <= 0)
+ return;
+ ProcessInput (InBuffer, FALSE);
+}
+
+/*?nodoc?*/
+void
+UchMsgStream :: HandleWrite ()
+{
+ Flush ();
+}
+
+/*?nodoc?*/
+bool
+UchMsgStream :: HandleSelect ()
+{
+ if (BufferedMessages) {
+ ProcessInput (Buffered, FALSE);
+ BufferedMessages = FALSE;
+ }
+ return FALSE;
+}
+
+
+/*?
+This virtual function is called whenever a complete message is in the buffer.
+The buffer contains exactly one message, so that you can use \com{buf.Get(&msg)}
+to extract the message from the buffer.
+\var{ask} is TRUE if the message was sent with \fun{Ask}.
+In this case an answer must be sent back with \fun{Reply}.
+Messages can be sent before replying; they will be buffered by the receiver for later processing;
+thus, you cannot use \fun{Ask} before replying.
+\fun{NewMessage} function must return TRUE if it handled the message, else FALSE.
+If it returns FALSE, it will be called again with the same arguments next time data arrives on this channel.
+In the class \typ{UchMsgStream} this function does nothing and returns TRUE;
+if \var{ask} is TRUE, it replies immediately with an empty message.
+?*/
+bool
+UchMsgStream :: NewMessage (UchMsgBuffer&, bool ask)
+{
+ if (ask) {
+ UchMessage dummy;
+
+ Reply (dummy);
+ }
+ return TRUE;
+}
+
+/*?
+Send a message.
+If \var{flush} is TRUE, the output buffer will be flushed.
+This also happens when the message stream is in synchronous mode,
+or if the output UchMsgBuffer.has exceeded its flush size (see \fun{FlushSize}).
+?*/
+void
+UchMsgStream :: Send (UchMessage& msg, bool flush)
+{
+ OutBuffer.Append ((byte) MSG);
+ OutBuffer.Append (msg);
+ if (flush || Sync || OutBuffer.BufLength () >= OutSize)
+ Flush ();
+}
+
+/*?
+Send a message and wait for an answer.
+Incoming messages that are received while waiting for the answer are kept for later processing.
+The answer message is returned by calling the virtual function \fun{ConvertAnswer}.
+?*/
+void*
+UchMsgStream :: Ask (UchMessage& msg)
+{
+ if (WaitingReply) {
+ Error (ErrWarn, "UchMsgStream::Ask", "cannot ask before replying");
+ return 0;
+ }
+ OutBuffer.Append ((byte) ASK);
+ OutBuffer.Append (msg);
+ Flush ();
+ WaitingForAnswer = TRUE;
+
+ do {
+#ifdef DEBUG
+ printf ("waiting for answer\n");
+#endif
+ int n = ReadInput ();
+#ifdef DEBUG
+ printf (" got %d bytes\n", n);
+#endif
+ if (n <= 0)
+ return 0;
+ ProcessInput (InBuffer, TRUE);
+ } while (WaitingForAnswer);
+
+ // ProcessInput did not flush so that we can convert the answer
+ // *** If ProcessInput would return a void*, we could do all this stuff in it ...
+ UchMsgBuffer fake (InBuffer, InLength);
+ void* res = ConvertAnswer (fake);
+ InBuffer.Flush (InLength);
+ State = WAITING;
+ return res;
+}
+
+/*?
+This function must be used instead of \fun{Send} to send a reply to a message sent by \fun{Ask}.
+?*/
+void
+UchMsgStream :: Reply (UchMessage& msg)
+{
+ if (! WaitingReply) {
+ Error (ErrWarn, "UchMsgStream::Reply", "out of phase reply discarded");
+ return;
+ }
+ OutBuffer.Append ((byte) ANS);
+ OutBuffer.Append (msg);
+ Flush ();
+ WaitingReply = FALSE;
+}
+
+
+/*?
+This function is called by \fun{Ask} when the answer is in the buffer,
+in order to convert it into an object usable by the application.
+?*/
+void*
+UchMsgStream :: ConvertAnswer (UchMsgBuffer&)
+{
+ return 0;
+}
+
+/*?
+Flush the output buffer.
+This function is called automatically when the buffer exceeds its flush size,
+or after each message when this stream is in synchronous mode.
+It is also called from \fun{Ask} to wait for the answer.
+?*/
+void
+UchMsgStream :: Flush ()
+{
+ if (! OutBuffer.BufLength ())
+ return;
+ int n = Write (OutBuffer);
+ OutBuffer.Flush (n);
+}
+
+/*?
+Send a buffer containing a message.
+If \var{flush} is TRUE, the output buffer will be flushed.
+This also happens when the message stream is in synchronous mode,
+or if the output UchMsgBuffer.has exceeded its flush size (see \fun{FlushSize}).
+The buffer {\em must} contain a converted message.
+This can be used for instance from inside \fun{NewMessage} to resend
+the incoming message to another client, without having to convert
+the buffer to a message.
+?*/
+void
+UchMsgStream :: Send (UchMsgBuffer& buf, bool flush)
+{
+ OutBuffer.Append ((byte) MSG);
+ OutBuffer.Append (buf.Buffer (), buf.BufLength ());
+ if (flush || Sync || OutBuffer.BufLength () >= OutSize)
+ Flush ();
+}
+
+#ifdef DOC
+/*?
+This function defines the size of the output buffer that triggers automatic flushing
+in asynchronous mode. By default the flush size is the maximum size of the
+output buffer. As a consequence, it is changed by \fun{OutBuffer}.
+?*/
+void
+UchMsgStream :: FlushSize (int n)
+{ }
+
+/*?nextdoc?*/
+void
+UchMsgStream :: SetSyncMode (bool s)
+{ }
+
+/*?
+A message stream can be in synchronous or asynchronous mode.
+In asynchronous mode output is buffered while in synchronous mode it is not.
+Synchronous mode is usually less efficient than asynchronous mode
+because it makes more system calls to transfer data;
+however synchronous mode can be useful for debugging applications.
+?*/
+bool
+UchMsgStream :: GetSyncMode ()
+{ }
+
+#endif /* DOC */
+