summaryrefslogtreecommitdiff
path: root/comm
diff options
context:
space:
mode:
authorchatty1994-05-10 09:56:22 +0000
committerchatty1994-05-10 09:56:22 +0000
commit66bbcfce2bb9265505f2046ea89a1b33a4a1c212 (patch)
tree87556ff1c18565c4f2a3c5d41d675418d6cb5a9e /comm
parente2a9c878c42fc5653f53cc98a9f43aa2488fe961 (diff)
downloadivy-league-66bbcfce2bb9265505f2046ea89a1b33a4a1c212.zip
ivy-league-66bbcfce2bb9265505f2046ea89a1b33a4a1c212.tar.gz
ivy-league-66bbcfce2bb9265505f2046ea89a1b33a4a1c212.tar.bz2
ivy-league-66bbcfce2bb9265505f2046ea89a1b33a4a1c212.tar.xz
replaced TRUE/FALSE by true/false
Split into classes BufStream and MsgStream
Diffstat (limited to 'comm')
-rw-r--r--comm/MsgStream.cc460
-rw-r--r--comm/MsgStream.h69
2 files changed, 334 insertions, 195 deletions
diff --git a/comm/MsgStream.cc b/comm/MsgStream.cc
index 5f27e04..ff552cf 100644
--- a/comm/MsgStream.cc
+++ b/comm/MsgStream.cc
@@ -6,7 +6,7 @@
* Copyright 1990-1993
* Laboratoire de Recherche en Informatique (LRI)
*
- * UchMessage streams
+ * Message streams
*
* $Id$
* $CurLog$
@@ -16,14 +16,196 @@
#include "Message.h"
#include "error.h"
-// #define DEBUG
-#ifdef DEBUG
-#include <stdio.h>
-#define DBG(inst) inst
-#else
-#define DBG(inst)
-#endif
+/*?
+These constructors are similar to those of class \typ{UchSocket}.
+?*/
+UchBufStream :: UchBufStream (UchAddress* bindTo, UchAddress* connectTo)
+: UchStream (bindTo, connectTo),
+ InBuffer (),
+ OutBuffer (),
+ OutSize (128),
+ Sync (false)
+{
+}
+
+// *** this copy constructor might be automatically generated
+/*?nodoc?*/
+UchBufStream :: UchBufStream (const UchBufStream& ms)
+: UchStream (ms),
+ InBuffer (),
+ OutBuffer (),
+ OutSize (ms.OutSize),
+ Sync (ms.Sync)
+{
+}
+
+/*?nodoc?*/
+UchBufStream :: ~UchBufStream ()
+{
+ Flush ();
+ InBuffer.Clear ();
+ OutBuffer.Clear ();
+}
+
+/*?nextdoc?*/
+void
+UchBufStream :: InputBuffer (int min, int grow, int max)
+{
+ InBuffer.SetSizes (min, grow, max);
+}
+
+/*?
+Set the input and output buffer sizes.
+Default sizes are used if these functions are not called.
+?*/
+void
+UchBufStream :: OutputBuffer (int min, int grow, int max)
+{
+ OutBuffer.SetSizes (min, grow, max);
+ OutSize = max;
+}
+
+/*?
+Flush the output buffer.
+This function is called automatically when the buffer exceeds its flush size,
+or after each message when this stream is in synchronous mode.
+It is also called from \fun{Ask} to wait for the answer.
+?*/
+void
+UchBufStream :: Flush ()
+{
+ if (OutBuffer.BufLength () == 0)
+ return;
+ int n = Write (OutBuffer);
+ OutBuffer.Flush (n);
+}
+
+/*?nodoc?*/
+void
+UchBufStream :: HandleWrite ()
+{
+ Flush ();
+}
+
+
+// read stream into input buffer
+// delete the stream when an eof is received
+// return the number of bytes read
+//
+/*?hidden?*/
+int
+UchBufStream :: ReadInput ()
+{
+ if (InBuffer.BufLength () == 0)
+ InBuffer.NeedSize (128);
+ int n = Read (InBuffer);
+ if (n <= 0) {
+ if (n == -2) {
+ InBuffer.Grow ();
+ n = Read (InBuffer);
+ }
+ if (n < 0)
+ SysError (ErrWarn, "UchBufStream::ReadInput");
+ if (n <= 0)
+ Closing (n < 0 ? false : true);
+ }
+ return n;
+}
+
+/*?
+This virtual function is called when an end of file is read,
+meaning that the communication is terminated.
+It is also called if an error occurs while reading.
+You can check the value of the global variable \var{errno}:
+if it is non zero, \fun{Closing} was called because of an error. % wrong
+By default this function does nothing.
+?*/
+void
+UchBufStream :: Closing (bool)
+{
+}
+
+
+void
+UchBufStream :: WriteLong (lword l)
+{
+ OutBuffer << l;
+}
+
+void
+UchBufStream :: WriteShort (sword s)
+{
+ OutBuffer << s;
+}
+
+void
+UchBufStream :: WriteByte (byte b)
+{
+ OutBuffer << b;
+}
+
+void
+UchBufStream :: WriteChar (char c)
+{
+ OutBuffer << c;
+}
+
+void
+UchBufStream :: WriteString (const char* s)
+{
+ OutBuffer << s;
+}
+
+void
+UchBufStream :: WriteBuf (const byte* b, int n)
+{
+ OutBuffer.WriteBuf (b, n);
+}
+
+void
+UchBufStream :: ReadLong (lword& l)
+{
+ InBuffer >> l;
+}
+
+void
+UchBufStream :: ReadShort (sword& s)
+{
+ InBuffer >> s;
+}
+
+void
+UchBufStream :: ReadByte (byte& b)
+{
+ InBuffer >> b;
+}
+
+void
+UchBufStream :: ReadChar (char& c)
+{
+ InBuffer >> c;
+}
+
+void
+UchBufStream :: ReadString (char* s)
+{
+ InBuffer >> s;
+}
+
+void
+UchBufStream :: ReadString (CcuString& s)
+{
+ InBuffer >> s;
+}
+
+void
+UchBufStream :: ReadBuf (byte* b, int n)
+{
+ InBuffer.Get (b, n);
+}
+
+
/*?class UchMsgStream
An object of class \typ{UchMsgStream} is a stream that sends and receives messages:
@@ -90,53 +272,28 @@ redefine the virtual functions \fun{NewMessage} and \fun{ConvertAnswer}.
These constructors are similar to those of class \typ{UchSocket}.
?*/
UchMsgStream :: UchMsgStream (UchAddress* bindTo, UchAddress* connectTo)
-: UchStream (bindTo, connectTo), InBuffer (), OutBuffer (), Buffered ()
+: UchBufStream (bindTo, connectTo),
+ Buffered ()
{
- OutSize = 128;
State = WAITING;
- BufferedMessages = FALSE;
- WaitingForAnswer = FALSE;
- WaitingReply = FALSE;
- Sync = FALSE;
+ BufferedMessages = false;
+ WaitingReply = false;
}
// *** this copy constructor might be automatically generated
/*?nodoc?*/
UchMsgStream :: UchMsgStream (const UchMsgStream& ms)
-: UchStream (*(UchMsgStream*)&ms), InBuffer (), OutBuffer (), Buffered (*(UchMsgBuffer*)&ms.Buffered)
+: UchBufStream (ms), Buffered (*(UchMsgBuffer*)&ms.Buffered)
{
- OutSize = ms.OutSize;
State = ms.State;
BufferedMessages = ms.BufferedMessages;
- WaitingForAnswer = ms.WaitingForAnswer;
WaitingReply = ms.WaitingReply;
- Sync = ms.Sync;
}
-/*?nextdoc?*/
-void
-UchMsgStream :: InputBuffer (int min, int grow, int max)
-{
- InBuffer.SetSizes (min, grow, max);
-}
-
-/*?
-Set the input and output buffer sizes.
-Default sizes are used if these functions are not called.
-?*/
-void
-UchMsgStream :: OutputBuffer (int min, int grow, int max)
-{
- OutBuffer.SetSizes (min, grow, max);
- OutSize = max;
-}
/*?nodoc?*/
UchMsgStream :: ~UchMsgStream ()
{
- Flush ();
- InBuffer.Clear ();
- OutBuffer.Clear ();
Buffered.Clear ();
}
@@ -148,46 +305,6 @@ UchMsgStream :: Copy () const
}
-// read stream into input buffer
-// delete the stream when an oef is received
-// return the number of bytes read
-//
-/*?hidden?*/
-int
-UchMsgStream :: ReadInput ()
-{
- if (! InBuffer.BufLength ())
- InBuffer.NeedSize (128);
- int n = Read (InBuffer);
- if (n <= 0) {
- if (n == -2) {
-//printf ("ReadInput: growing\n", n);
- InBuffer.Grow ();
- n = Read (InBuffer);
- }
-//printf ("ReadInput: %d bytes\n", n);
- if (n < 0)
- SysError (ErrWarn, "UchMsgStream::HandleRead");
- if (n <= 0)
- Delete ();
- }
- return n;
-}
-
-/*?
-This virtual function is called when an end of file is read,
-meaning that the communication is terminated.
-It is also called if an error occurs while reading.
-You can check the value of the global variable \var{errno}:
-if it is non zero, \fun{Delete} was called because of an error.
-By default this function does nothing.
-?*/
-void
-UchMsgStream :: Delete ()
-{
- // nothing
-}
-
// process the input buffer
// waitAnswer indicates whether we are waiting for an answer
// this functions uses a very simple automaton:
@@ -199,32 +316,32 @@ UchMsgStream :: Delete ()
// DONE: a full message is in the buffer
//
// WaitingReply is true if a question has been received and Reply has not been called yet
-// WaitingForAnswer is true when Ask has been called and the answer is not yet there
//
/*?hidden?*/
-void
-UchMsgStream :: ProcessInput (UchMsgBuffer& buf, bool waitAnswer)
+/*!
+Return a message when it's been fully read.
+!*/
+UchMessage*
+UchMsgStream :: Process (UchMsgBuffer& buf, bool waitAnswer)
{
for (;;) {
switch (State) {
case WAITING:
- buf.Get (InType);
+ buf.ReadByte (InType);
if (buf.Error ())
- return;
- WaitingReply = FALSE;
+ return 0;
+ WaitingReply = false;
switch (InType) {
- case ASK :
- WaitingReply = TRUE;
- State = GOT_TYPE;
- break;
- case ANS :
- case MSG :
+ case ASK:
+ WaitingReply = true;
+ case ANS:
+ case MSG:
State = GOT_TYPE;
break;
- case SYNC :
- case ASYNC :
- case OK :
- default :
+ case SYNC:
+ case ASYNC:
+ case OK:
+ default:
State = WAITING;
}
if (State != GOT_TYPE)
@@ -233,41 +350,41 @@ UchMsgStream :: ProcessInput (UchMsgBuffer& buf, bool waitAnswer)
case GOT_TYPE:
if (! buf.Peek ((lword*) &InLength))
- return;
+ return 0;
buf.NeedSize ((int) InLength - buf.BufLength ());
State = GOT_LENGTH;
// fallthrough
case GOT_LENGTH:
if (buf.BufLength () < InLength)
- return;
+ return 0;
State = DONE;
// fallthrough
case DONE:
if (waitAnswer) {
if (InType == ANS) {
- WaitingForAnswer = FALSE;
- // answer still in the buffer
- // the answer is converted and
- // the buffer is flushed in Ask
- return;
+ UchMsgBuffer fake (buf, InLength);
+ UchMessage* ans = ConvertAnswer (fake);
+ buf.Flush (InLength);
+ State = WAITING;
+ return ans;
} else {
// store incoming message in a separate buffer
- BufferedMessages = TRUE;
- Buffered.Append (InType);
- Buffered.Append (buf.Buffer (), InLength);
+ BufferedMessages = true;
+ Buffered.WriteByte (InType);
+ Buffered.WriteBuf (buf.Buffer (), InLength);
}
} else {
if (InType == MSG || InType == ASK) {
// pass a fake buffer to the handler
UchMsgBuffer fake (buf, InLength);
if (! NewMessage (fake, WaitingReply))
- return;
+ return 0;
// *** this return breaks the assumption that
- // *** ProcessInput empties the buffer.
+ // *** Process empties the buffer.
// *** this is assumed in HandleRead/HandleSelect
- // *** because BufferedMessages is reset to FALSE;
+ // *** because BufferedMessages is reset to false;
}
}
buf.Flush (InLength);
@@ -282,20 +399,13 @@ void
UchMsgStream :: HandleRead ()
{
if (BufferedMessages) {
- ProcessInput (Buffered, FALSE);
- BufferedMessages = FALSE;
+ Process (Buffered, false);
+ BufferedMessages = false;
}
- int n = ReadInput ();
- if (n <= 0)
+ if (ReadInput () <= 0)
return;
- ProcessInput (InBuffer, FALSE);
-}
-/*?nodoc?*/
-void
-UchMsgStream :: HandleWrite ()
-{
- Flush ();
+ Process (InBuffer, false);
}
/*?nodoc?*/
@@ -303,52 +413,78 @@ bool
UchMsgStream :: HandleSelect ()
{
if (BufferedMessages) {
- ProcessInput (Buffered, FALSE);
- BufferedMessages = FALSE;
+ Process (Buffered, false);
+ BufferedMessages = false;
}
- return FALSE;
+ return false;
}
/*?
This virtual function is called whenever a complete message is in the buffer.
-The buffer contains exactly one message, so that you can use \com{buf.Get (msg)}
+The buffer contains exactly one message, so that you can use \com{buf.ReadMsg (msg)}
to extract the message from the buffer.
-\var{ask} is TRUE if the message was sent with \fun{Ask}.
+\var{ask} is true if the message was sent with \fun{Ask}.
In this case an answer must be sent back with \fun{Reply}.
-Messages can be sent before replying; they will be buffered by the receiver for later processing;
-thus, you cannot use \fun{Ask} before replying.
-\fun{NewMessage} function must return TRUE if it handled the message, else FALSE.
-If it returns FALSE, it will be called again with the same arguments next time data arrives on this channel.
-In the class \typ{UchMsgStream} this function does nothing and returns TRUE;
-if \var{ask} is TRUE, it replies immediately with an empty message.
+Messages can be sent before replying; they will be buffered by the receiver for later processing.
+But you cannot use \fun{Ask} before replying.
+\fun{NewMessage} function must return true if it handled the message, else false.
+If it returns false, it will be called again with the same arguments next time data arrives on this channel.
+In the class \typ{UchMsgStream} this function does nothing and returns true;
+if \var{ask} is true, it replies immediately with an empty message.
?*/
bool
UchMsgStream :: NewMessage (UchMsgBuffer&, bool ask)
{
if (ask) {
- UchMessage dummy;
-
+ UchMessage dummy;
Reply (dummy);
}
- return TRUE;
+ return true;
}
/*?
Send a message.
-If \var{flush} is TRUE, the output buffer will be flushed.
+If \var{flush} is true, the output buffer will be flushed.
This also happens when the message stream is in synchronous mode,
-or if the output UchMsgBuffer.has exceeded its flush size (see \fun{FlushSize}).
+or if the output buffer has exceeded its flush size (see \fun{FlushSize}).
?*/
void
UchMsgStream :: Send (UchMessage& msg, bool flush)
{
- OutBuffer.Append ((byte) MSG);
- OutBuffer.Append (msg);
+ OutBuffer.WriteByte (MSG);
+ WriteMsg (msg);
if (flush || Sync || OutBuffer.BufLength () >= OutSize)
Flush ();
}
+void
+UchMsgStream :: WriteMsg (UchMessage& msg)
+{
+ OutBuffer.WriteMsg (msg);
+}
+
+bool
+UchMsgStream :: ReadMsg (UchMessage& msg)
+{
+ UchMsgBuffer& buf = BufferedMessages ? Buffered : InBuffer;
+ int l = buf.BufLength (); // store current offset in the buffer
+ lword msglen;
+ buf >> msglen;
+ if (l < msglen)
+ return false;
+ msg.ReadFrom (*this, msglen);
+ /* skip end of msg if necessary */
+ int rl = l - buf.BufLength ();
+ if (rl == msglen)
+ return true;
+ else if (rl < msglen) {
+ buf.Flush ((int)(msglen) - rl);
+ return true;
+ } else
+ return false;
+}
+
/*?
Send a message and wait for an answer.
Incoming messages that are received while waiting for the answer are kept for later processing.
@@ -358,28 +494,21 @@ UchMessage*
UchMsgStream :: Ask (UchMessage& msg)
{
if (WaitingReply) {
- Error (ErrWarn, "UchMsgStream::Ask", "cannot ask before replying");
+ ::Error (ErrWarn, "UchMsgStream::Ask", "cannot ask before replying");
return 0;
}
- OutBuffer.Append ((byte) ASK);
- OutBuffer.Append (msg);
+ OutBuffer.WriteByte (ASK);
+ WriteMsg (msg);
Flush ();
- WaitingForAnswer = TRUE;
+ UchMessage* ans = 0;
do {
- int n = ReadInput ();
- if (n <= 0)
+ if (ReadInput () <= 0)
return 0;
- ProcessInput (InBuffer, TRUE);
- } while (WaitingForAnswer);
+ ans = Process (InBuffer, true);
+ } while (!ans);
- // ProcessInput did not flush so that we can convert the answer
- // *** If ProcessInput would return a void*, we could do all this stuff in it ...
- UchMsgBuffer fake (InBuffer, InLength);
- UchMessage* res = ConvertAnswer (fake);
- InBuffer.Flush (InLength);
- State = WAITING;
- return res;
+ return ans;
}
/*?
@@ -389,13 +518,13 @@ void
UchMsgStream :: Reply (UchMessage& msg)
{
if (! WaitingReply) {
- Error (ErrWarn, "UchMsgStream::Reply", "out of phase reply discarded");
+ ::Error (ErrWarn, "UchMsgStream::Reply", "out of phase reply discarded");
return;
}
- OutBuffer.Append ((byte) ANS);
- OutBuffer.Append (msg);
+ OutBuffer.WriteByte ((byte) ANS);
+ WriteMsg (msg);
Flush ();
- WaitingReply = FALSE;
+ WaitingReply = false;
}
@@ -410,23 +539,8 @@ UchMsgStream :: ConvertAnswer (UchMsgBuffer&)
}
/*?
-Flush the output buffer.
-This function is called automatically when the buffer exceeds its flush size,
-or after each message when this stream is in synchronous mode.
-It is also called from \fun{Ask} to wait for the answer.
-?*/
-void
-UchMsgStream :: Flush ()
-{
- if (! OutBuffer.BufLength ())
- return;
- int n = Write (OutBuffer);
- OutBuffer.Flush (n);
-}
-
-/*?
Send a buffer containing a message.
-If \var{flush} is TRUE, the output buffer will be flushed.
+If \var{flush} is true, the output buffer will be flushed.
This also happens when the message stream is in synchronous mode,
or if the output UchMsgBuffer.has exceeded its flush size (see \fun{FlushSize}).
The buffer {\em must} contain a converted message.
@@ -437,8 +551,8 @@ the buffer to a message.
void
UchMsgStream :: Send (UchMsgBuffer& buf, bool flush)
{
- OutBuffer.Append ((byte) MSG);
- OutBuffer.Append (buf.Buffer (), buf.BufLength ());
+ OutBuffer.WriteByte ((byte) MSG);
+ OutBuffer.WriteBuf (buf.Buffer (), buf.BufLength ());
if (flush || Sync || OutBuffer.BufLength () >= OutSize)
Flush ();
}
diff --git a/comm/MsgStream.h b/comm/MsgStream.h
index bfa8e78..b33437a 100644
--- a/comm/MsgStream.h
+++ b/comm/MsgStream.h
@@ -20,51 +20,76 @@
#include "MsgBuffer.h"
class UchMessage;
-class UchMsgStream : public UchStream {
-private:
-
+class UchBufStream : public UchStream {
protected:
- UchMsgBuffer InBuffer;
- int InSize;
- UchMsgBuffer OutBuffer;
+ UchMsgBuffer InBuffer;
+ UchMsgBuffer OutBuffer;
int OutSize;
+ bool Sync;
+
+ UchBufStream (const UchBufStream&);
+ int ReadInput ();
+ void HandleWrite ();
+
+ void WriteLong (lword);
+ void WriteShort (sword);
+ void WriteByte (byte);
+ void WriteChar (char);
+ void WriteString (const char*);
+ void WriteBuf (const byte*, int);
+
+
+ void ReadLong (lword&);
+ void ReadShort (sword&);
+ void ReadByte (byte&);
+ void ReadChar (char&);
+ void ReadString (char*);
+ void ReadString (CcuString&);
+ void ReadBuf (byte*, int);
+
+public:
+ UchBufStream (UchAddress* = 0, UchAddress* = 0);
+ ~UchBufStream ();
+
+ void InputBuffer (int min, int grow, int max);
+ void OutputBuffer (int min, int grow, int max);
+inline bool GetSyncMode () { return Sync; }
+inline void SetSyncMode (bool s) { Sync = s; Flush (); }
+inline void FlushSize (int n) { OutSize = n; }
+virtual void Flush ();
+
+virtual void Closing (bool);
+};
+
+class UchMsgStream : public UchBufStream {
+protected:
enum STATE { WAITING, GOT_TYPE, GOT_LENGTH, DONE};
enum TYPE { MSG = 1, ASK, ANS, SYNC, ASYNC, OK };
STATE State;
bool BufferedMessages;
UchMsgBuffer Buffered;
- bool WaitingForAnswer;
bool WaitingReply;
int InLength;
byte InType;
- bool Sync;
- void ProcessInput (UchMsgBuffer&, bool);
- int ReadInput ();
+ UchMessage* Process (UchMsgBuffer&, bool);
+ UchMsgStream (const UchMsgStream&);
+ void WriteMsg (UchMessage&);
+ bool ReadMsg (UchMessage&);
public:
- UchMsgStream (const UchMsgStream&);
UchMsgStream (UchAddress* = 0, UchAddress* = 0);
~UchMsgStream ();
- void InputBuffer (int min, int grow, int max);
- void OutputBuffer (int min, int grow, int max);
-inline void FlushSize (int n) { OutSize = n; }
UchChannel* Copy () const;
void HandleRead ();
- void HandleWrite ();
bool HandleSelect ();
-inline bool GetSyncMode () { return Sync; }
-inline void SetSyncMode (bool s) { Sync = s; Flush (); }
virtual bool NewMessage (UchMsgBuffer&, bool);
virtual UchMessage* ConvertAnswer (UchMsgBuffer&);
-virtual void Delete ();
- void Send (UchMessage&, bool = FALSE);
+ void Send (UchMessage&, bool = false);
UchMessage* Ask (UchMessage&);
void Reply (UchMessage&);
- void Flush ();
- void Send (UchMsgBuffer&, bool = FALSE);
+ void Send (UchMsgBuffer&, bool = false);
};
#endif /* MsgStream_H_ */
-