diff options
Diffstat (limited to 'Bus/Ivy')
-rw-r--r-- | Bus/Ivy/ThreadedSocket.cxx | 357 |
1 files changed, 209 insertions, 148 deletions
diff --git a/Bus/Ivy/ThreadedSocket.cxx b/Bus/Ivy/ThreadedSocket.cxx index 955af12..7f36f07 100644 --- a/Bus/Ivy/ThreadedSocket.cxx +++ b/Bus/Ivy/ThreadedSocket.cxx @@ -3,29 +3,15 @@ //////////////////////////////////////////////////////////////////////
#include "stdafx.h"
-#include <stddef.h>
+
+#ifdef _DEBUG
+#define DEBUG_NEW new(__FILE__, __LINE__)
+#define new DEBUG_NEW
+#endif
#include "ThreadedSocket.h"
-CThreadedSocketException::CThreadedSocketException(char* pchMessage)
-{
- m_strMessage = pchMessage;
- m_nError = WSAGetLastError();
-}
-
-bool CThreadedSocketException::GetErrorMessage(string & lpstrError, UINT nMaxError,
- PUINT pnHelpContext /*= NULL*/)
-{
- lpstrError = m_strMessage;
- lpstrError += "error";
- if(m_nError != 0)
- {
- lpstrError += " #";
- lpstrError += m_nError;
- }
- return true;
-}
//////////////////////////////////////////////////////////////////////
// Construction/Destruction
@@ -35,22 +21,11 @@ CThreadedSocket::CThreadedSocket() {
m_hSocket = INVALID_SOCKET;
- h_thread = NULL;
- // Create events to wait on
- // for receivier part
- m_hEvent[0] = WSACreateEvent();
- if ( m_hEvent[0] == WSA_INVALID_EVENT )
- {
- TRACE( "CThreadedSocketException(WSACreateEvent )\n");
- throw( new CThreadedSocketException( "WSACreateEvent" ));
- }
- // for transmitter part
- m_hEvent[1] = WSACreateEvent();
- if ( m_hEvent[1] == WSA_INVALID_EVENT )
- {
- TRACE( "CThreadedSocketException(WSACreateEvent )\n");
- throw( new CThreadedSocketException( "WSACreateEvent" ));
- }
+ h_reader = NULL;
+ h_writer = NULL;
+ listen_mode = false;
+ connect_pending = true;
+ send_pending = false;
}
CThreadedSocket::~CThreadedSocket()
@@ -59,14 +34,17 @@ CThreadedSocket::~CThreadedSocket() Close();
//if ( thread ) // On fait de l'auto delete mais dans le cas de terminaison anormale l'object reste ????!!!
// delete thread;
- WSACloseEvent( m_hEvent[0] );
- WSACloseEvent( m_hEvent[1] );
}
-void CThreadedSocket::Create(UINT nSocketPort, int nSocketType,
- long lEvent, const char * lpszSocketAddress)
+int CThreadedSocket::Create(UINT nSocketPort, int nSocketType, const char * lpszSocketAddress)
{
- Socket(nSocketType, lEvent);
- Bind(nSocketPort,lpszSocketAddress);
+ if ( Socket(nSocketType) < 0 )
+ return SOCKET_ERROR;
+
+ send_count = CreateSemaphore( NULL, 0, 100, NULL); // unnamed semaphore
+ if (send_count == NULL)
+ return SOCKET_ERROR;
+
+ return Bind(nSocketPort,lpszSocketAddress);
}
/////////////////////////////////////////////////////////////////////////////
@@ -99,27 +77,31 @@ void CThreadedSocket::GetSockName(string & rSocketAddress, UINT& rSocketPort) /////////////////////////////////////////////////////////////////////////////
// CAscynSocket Operations
-void CThreadedSocket::Accept(CThreadedSocket& rConnectedSocket,
+int CThreadedSocket::Accept(CThreadedSocket& rConnectedSocket,
SOCKADDR* lpSockAddr, int* lpSockAddrLen)
{
ASSERT(rConnectedSocket.m_hSocket == INVALID_SOCKET);
-
SOCKET hTemp = accept(m_hSocket, lpSockAddr, lpSockAddrLen);
if (hTemp == INVALID_SOCKET)
{
rConnectedSocket.m_hSocket = INVALID_SOCKET;
- throw( new CThreadedSocketException( "accept" ) );
+ return SOCKET_ERROR;
}
else
{
rConnectedSocket.m_hSocket = hTemp;
- rConnectedSocket.StartListener(m_EventMask);
+ rConnectedSocket.send_count = CreateSemaphore( NULL, 0, 100, NULL); // unnamed semaphore
+ if (rConnectedSocket.send_count == NULL)
+ return SOCKET_ERROR;
+
+ rConnectedSocket.StartListener();
+ return hTemp;
}
}
-void CThreadedSocket::Bind(UINT nSocketPort, const char * lpszSocketAddress)
+int CThreadedSocket::Bind(UINT nSocketPort, const char * lpszSocketAddress)
{
SOCKADDR_IN sockAddr;
@@ -135,51 +117,57 @@ void CThreadedSocket::Bind(UINT nSocketPort, const char * lpszSocketAddress) if (lResult == INADDR_NONE)
{
WSASetLastError(WSAEINVAL);
- throw( new CThreadedSocketException( "bind" ) );
+ return SOCKET_ERROR;
}
sockAddr.sin_addr.s_addr = lResult;
}
sockAddr.sin_port = htons((u_short)nSocketPort);
- Bind((SOCKADDR*)&sockAddr, sizeof(sockAddr));
+ return Bind((SOCKADDR*)&sockAddr, sizeof(sockAddr));
}
void CThreadedSocket::Close()
{
if (m_hSocket != INVALID_SOCKET)
{
- AsyncSelect(0);
ASSERT(SOCKET_ERROR != closesocket(m_hSocket));
m_hSocket = INVALID_SOCKET;
- //WSASetEvent( m_hEventObject );
+
}
}
-bool CThreadedSocket::Connect(const SOCKADDR* lpSockAddr, int nSockAddrLen)
+int CThreadedSocket::Connect(const SOCKADDR* lpSockAddr, int nSockAddrLen)
{
- if ( !connect(m_hSocket, lpSockAddr, nSockAddrLen) )
+ int ok;
+ fd_set writefds;
+
+ FD_ZERO(&writefds);
+
+ FD_SET(m_hSocket, &writefds);
+
+ ok = connect(m_hSocket, lpSockAddr, nSockAddrLen);
+ if ( !ok )
{
- DWORD err = WSAGetLastError();
- if ( err == 0 ) return true;
+ int err = GetLastError();
+ if ( err == 0 ) return err;
TRACE( "***************************************Error connect %d\n", err);
if ( err != WSAEWOULDBLOCK )
- return false;
+ return -1;
// Wait for connection to complete
TRACE( "***************************************Waiting for connection to complete\n");
- err = WSAWaitForMultipleEvents( 1, m_hEvent, false, WSA_INFINITE, true);
+ FD_SET(m_hSocket, &writefds);
+ err = select( 1, NULL, &writefds, NULL, NULL );
//TRACE( "Thread socket %d wait return %d\n", m_hSocket, err );
- if ( err == WSA_WAIT_FAILED )
- {
- return false;
- }
-
- return true;
+ if ( !err )
+ connect_pending = true;
+ return err;
}
- return true;
+ connect_pending = true;
+ return 0;
}
-bool CThreadedSocket::Connect(const char * lpszHostAddress, UINT nHostPort)
+int CThreadedSocket::Connect(const char * lpszHostAddress, UINT nHostPort)
{
ASSERT(lpszHostAddress != NULL);
@@ -199,7 +187,7 @@ bool CThreadedSocket::Connect(const char * lpszHostAddress, UINT nHostPort) else
{
WSASetLastError(WSAEINVAL);
- return false;
+ return SOCKET_ERROR;
}
}
@@ -211,8 +199,6 @@ bool CThreadedSocket::Connect(const char * lpszHostAddress, UINT nHostPort) int CThreadedSocket::Receive(void* lpBuf, int nBufLen, int nFlags)
{
int lg = recv(m_hSocket, (LPSTR)lpBuf, nBufLen, nFlags);
- if ( lg == SOCKET_ERROR )
- throw( new CThreadedSocketException( "recv" ) );
return lg;
}
@@ -230,13 +216,12 @@ int CThreadedSocket::ReceiveFrom(void* lpBuf, int nBufLen, string & rSocketAddre return nResult;
}
-void CThreadedSocket::Send(const void* lpBuf, int nBufLen, int nFlags)
+int CThreadedSocket::Send(const void* lpBuf, int nBufLen, int nFlags)
{
- if ( send(m_hSocket, (LPSTR)lpBuf, nBufLen, nFlags) != nBufLen )
- throw( new CThreadedSocketException( "send" ) );
+ return send(m_hSocket, (LPSTR)lpBuf, nBufLen, nFlags);
}
-void CThreadedSocket::SendTo(const void* lpBuf, int nBufLen, UINT nHostPort, const char * lpszHostAddress, int nFlags)
+int CThreadedSocket::SendTo(const void* lpBuf, int nBufLen, UINT nHostPort, const char * lpszHostAddress, int nFlags)
{
SOCKADDR_IN sockAddr;
@@ -258,17 +243,16 @@ void CThreadedSocket::SendTo(const void* lpBuf, int nBufLen, UINT nHostPort, con else
{
WSASetLastError(WSAEINVAL);
- throw( new CThreadedSocketException( "SendTo" ));
- return;
+ return SOCKET_ERROR;
}
}
}
sockAddr.sin_port = htons((u_short)nHostPort);
- SendTo(lpBuf, nBufLen, (SOCKADDR*)&sockAddr, sizeof(sockAddr), nFlags);
+ return SendTo(lpBuf, nBufLen, (SOCKADDR*)&sockAddr, sizeof(sockAddr), nFlags);
}
-void CThreadedSocket::AddMember( const char * lpszHostAddress )
+int CThreadedSocket::AddMember( const char * lpszHostAddress )
{
int multicast_ttl = 64; // region
struct ip_mreq imr;
@@ -289,8 +273,7 @@ void CThreadedSocket::AddMember( const char * lpszHostAddress ) else
{
WSASetLastError(WSAEINVAL);
- throw( new CThreadedSocketException( "AddMenber" ));
- return;
+ return SOCKET_ERROR;
}
}
}
@@ -299,19 +282,11 @@ void CThreadedSocket::AddMember( const char * lpszHostAddress ) SetSockOpt( IP_MULTICAST_TTL, &multicast_ttl, sizeof( multicast_ttl ), IPPROTO_IP );
SetSockOpt( IP_ADD_MEMBERSHIP, &imr, sizeof( imr ), IPPROTO_IP );
}
-
-}
-void CThreadedSocket::AsyncSelect(long lEvent)
-{
- ASSERT(m_hSocket != INVALID_SOCKET);
- if ( WSAEventSelect(m_hSocket, m_hEvent[0], lEvent) )
- throw( new CThreadedSocketException( "WSAEventSelect" ));
+ return 0;
}
+
/////////////////////////////////////////////////////////////////////////////
// CThreadedSocket Overridable callbacks
-void CThreadedSocket::OnWakeup()
-{
-}
void CThreadedSocket::OnReceive(int /*nErrorCode*/)
{
}
@@ -340,108 +315,194 @@ void CThreadedSocket::OnClose(int /*nErrorCode*/) // CThreadedSocket Implementation
-void CThreadedSocket::Socket(int nSocketType, long lEvent, int nProtocolType, int nAddressFormat)
+int CThreadedSocket::Socket(int nSocketType, int nProtocolType, int nAddressFormat)
{
ASSERT(m_hSocket == INVALID_SOCKET);
- m_hSocket = socket(nAddressFormat,nSocketType,nProtocolType);
-// m_hSocket = WSASocket ( nAddressFormat, nSocketType,nProtocolType, NULL, 0, 0 );
-
+ m_hSocket = socket(nAddressFormat,nSocketType,nProtocolType);
- if (m_hSocket == INVALID_SOCKET)
+ if ( m_hSocket < 0 )
{
- throw( new CThreadedSocketException( "socket" ));
+ return m_hSocket;
}
- StartListener(lEvent);
+ return StartListener();
+}
+int CThreadedSocket::StartListener()
+{
+ h_reader = CreateThread(NULL,0,SocketReaderProc, this, 0, &reader_id);
+ return h_reader != NULL ? 0 : SOCKET_ERROR;
+}
+int CThreadedSocket::StartWriter()
+{
+ h_writer = CreateThread(NULL,0,SocketWriterProc, this, 0, &writer_id);
+ return h_writer != NULL ? 0 : SOCKET_ERROR;
+}
+bool CThreadedSocket::SignalWriter()
+{
+ long PreviousCount = 0;
+ int err;
+ err = ReleaseSemaphore(send_count, 1, &PreviousCount);
+ TRACE("CThreadedSocket::SignalWriter() PreviousCount = %ld \n", PreviousCount );
+ return (err != 0);
}
-void CThreadedSocket::StartListener(long lEvent)
+// Implementation
+DWORD WINAPI CThreadedSocket::SocketReaderProc( LPVOID pParam )
{
- m_EventMask = lEvent;
- h_thread = CreateThread(NULL,0,SocketThreadProc, this, 0, &thread_id);
- if ( ! h_thread )
- throw( new CThreadedSocketException( "socket listener" ));
+ CThreadedSocket* pObject = (CThreadedSocket*)pParam;
+
+ if (pObject == NULL )
+ {
+ return SOCKET_ERROR; // if pObject is not valid
+ }
+
+ return pObject->SocketReader();
}
// Implementation
-DWORD WINAPI CThreadedSocket::SocketThreadProc( LPVOID pParam )
+DWORD WINAPI CThreadedSocket::SocketWriterProc( LPVOID pParam )
{
CThreadedSocket* pObject = (CThreadedSocket*)pParam;
if (pObject == NULL )
{
- throw( new CThreadedSocketException( "bad socket proc" ));
- return -1; // if pObject is not valid
+ return SOCKET_ERROR; // if pObject is not valid
}
- return pObject->SocketThread();
+ return pObject->SocketWriter();
}
-UINT CThreadedSocket::SocketThread( )
+UINT CThreadedSocket::SocketReader( )
{
- DWORD err;
- WSANETWORKEVENTS NetworkEvents;
+ int err;
+ int sock_err;
+ unsigned long readcount;
+ fd_set readfds;
+ fd_set writefds;
+ fd_set exceptfds;
+
+ FD_ZERO(&readfds);
+ FD_ZERO(&writefds);
+ FD_ZERO(&exceptfds);
- AsyncSelect(m_EventMask) ;
while ( m_hSocket != INVALID_SOCKET )
{
- err = WSAWaitForMultipleEvents( 2, m_hEvent, false, WSA_INFINITE, true);
+ FD_SET(m_hSocket, &readfds);
+ if ( connect_pending )
+ FD_SET(m_hSocket, &writefds);
+ FD_SET(m_hSocket, &exceptfds);
+
+ err = select( 1, &readfds, &writefds, &exceptfds, NULL );
+ sock_err = GetLastError();
+
switch ( err )
{
- case WSA_WAIT_FAILED:
- TRACE( "CThreadedSocketException(WSAWaitForMultipleEvents )\n");
- throw( new CThreadedSocketException( "WSAWaitForMultipleEvents" ));
- return err;
+ case 0:
+ TRACE( "CThreadedSocket::SocketThread Unhandled Timeout Event !\n");
break;
- case WSA_WAIT_EVENT_0:
-
- NetworkEvents.lNetworkEvents=0;
-
- if ( WSAEnumNetworkEvents ( m_hSocket, m_hEvent[0], &NetworkEvents ))
- {
- TRACE( "CThreadedSocketException(WSAEnumNetworkEvents )\n");
- throw( new CThreadedSocketException( "WSAEnumNetworkEvents" ));
- return -1;
- }
+ case SOCKET_ERROR:
-
- if ( NetworkEvents.lNetworkEvents & FD_READ )
- OnReceive(NetworkEvents.iErrorCode[ FD_READ_BIT ]);
-
- if ( NetworkEvents.lNetworkEvents & FD_WRITE )
- OnSend( NetworkEvents.iErrorCode[ FD_WRITE_BIT ] );
-
- if ( NetworkEvents.lNetworkEvents & FD_ACCEPT )
- OnAccept( NetworkEvents.iErrorCode[ FD_ACCEPT_BIT ] );
-
-
- if ( NetworkEvents.lNetworkEvents & FD_CONNECT )
- OnConnect( NetworkEvents.iErrorCode[ FD_CONNECT_BIT ] );
+ TRACE( "CThreadedSocketException( select )\n");
+ Close();
+ h_reader = NULL;
+ return sock_err;
+ break;
+ default:
- if ( NetworkEvents.lNetworkEvents & FD_OOB )
- OnOutOfBandData( NetworkEvents.iErrorCode[ FD_OOB_BIT ] );
- if ( NetworkEvents.lNetworkEvents & FD_CLOSE )
+ if ( FD_ISSET(m_hSocket, &readfds) )
{
- OnClose( NetworkEvents.iErrorCode[ FD_CLOSE_BIT ] );
+ IOCtl( FIONREAD, &readcount );
+ if ( listen_mode )
+ OnAccept( sock_err );
+ else if ( ! readcount )
+ OnClose( sock_err );
+ else OnReceive(sock_err);
}
- break;
- case WSA_WAIT_EVENT_0 +1 :
- if ( m_hSocket != INVALID_SOCKET )
+ if ( FD_ISSET(m_hSocket, &writefds) )
{
- OnWakeup();
+ if ( connect_pending )
+ {
+ OnConnect( sock_err );
+ connect_pending = false;
+ }
}
- // else Close by other Thread do silent terminaison
- WSAResetEvent( m_hEvent[1] );
-
+
+ if ( FD_ISSET(m_hSocket, &exceptfds) )
+ OnOutOfBandData( sock_err );
+
+
+ break;
+ }
+ }
+ Close();
+ h_reader = NULL;
+ return 0;
+}
+
+UINT CThreadedSocket::SocketWriter( )
+{
+ int err;
+ int sock_err;
+ fd_set writefds;
+
+ FD_ZERO(&writefds);
+
+
+ while ( m_hSocket != INVALID_SOCKET )
+ {
+ DWORD dwWaitResult;
+
+ // Wait for message to send
+
+ dwWaitResult = WaitForSingleObject( send_count, INFINITE );
+
+ switch (dwWaitResult) {
+
+ // The semaphore object was signaled.
+ case WAIT_OBJECT_0:
+ // OK to really send the message.
+ break;
+
+ // Semaphore was nonsignaled, so a time-out occurred.
+ case WAIT_TIMEOUT:
+ case WAIT_FAILED:
+ default:
+ TRACE( "CThreadedSocketException( SocketWriter WaitForSingleObject )\n");
+ Close();
+ h_writer = NULL;
+ return GetLastError();
+ break;
+ }
+
+ FD_SET(m_hSocket, &writefds);
+
+ err = select( 1, NULL, &writefds, NULL, NULL );
+ sock_err = GetLastError();
+
+ switch ( err )
+ {
+ case 0:
+ TRACE( "CThreadedSocket::SocketThread Unhandled Timeout Event !\n");
+ break;
+ case SOCKET_ERROR:
+ TRACE( "CThreadedSocketException( select )\n");
+ Close();
+ h_reader = NULL;
+ return sock_err;
break;
default:
- TRACE( "CThreadedSocket::SocketThread Unhandled Events !\n");
+
+ if ( FD_ISSET(m_hSocket, &writefds) )
+ {
+ OnSend( sock_err );
+ }
+
break;
}
}
Close();
- h_thread = NULL;
+ h_writer = NULL;
return 0;
}
|