From bfe5acb474ef27c02e76522d8df33736168be762 Mon Sep 17 00:00:00 2001 From: fcolin Date: Thu, 1 Feb 2007 13:04:58 +0000 Subject: Utilisateur : Fcolin Date : 2/02/01 Heure : 18:30 Archivé dans $/Ivy Commentaire: win CE compile not finished (vss 6) --- Bus/Ivy/ThreadedSocket.cxx | 357 ++++++++++++++++++++++++++------------------- 1 file changed, 209 insertions(+), 148 deletions(-) diff --git a/Bus/Ivy/ThreadedSocket.cxx b/Bus/Ivy/ThreadedSocket.cxx index 955af12..7f36f07 100644 --- a/Bus/Ivy/ThreadedSocket.cxx +++ b/Bus/Ivy/ThreadedSocket.cxx @@ -3,29 +3,15 @@ ////////////////////////////////////////////////////////////////////// #include "stdafx.h" -#include + +#ifdef _DEBUG +#define DEBUG_NEW new(__FILE__, __LINE__) +#define new DEBUG_NEW +#endif #include "ThreadedSocket.h" -CThreadedSocketException::CThreadedSocketException(char* pchMessage) -{ - m_strMessage = pchMessage; - m_nError = WSAGetLastError(); -} - -bool CThreadedSocketException::GetErrorMessage(string & lpstrError, UINT nMaxError, - PUINT pnHelpContext /*= NULL*/) -{ - lpstrError = m_strMessage; - lpstrError += "error"; - if(m_nError != 0) - { - lpstrError += " #"; - lpstrError += m_nError; - } - return true; -} ////////////////////////////////////////////////////////////////////// // Construction/Destruction @@ -35,22 +21,11 @@ CThreadedSocket::CThreadedSocket() { m_hSocket = INVALID_SOCKET; - h_thread = NULL; - // Create events to wait on - // for receivier part - m_hEvent[0] = WSACreateEvent(); - if ( m_hEvent[0] == WSA_INVALID_EVENT ) - { - TRACE( "CThreadedSocketException(WSACreateEvent )\n"); - throw( new CThreadedSocketException( "WSACreateEvent" )); - } - // for transmitter part - m_hEvent[1] = WSACreateEvent(); - if ( m_hEvent[1] == WSA_INVALID_EVENT ) - { - TRACE( "CThreadedSocketException(WSACreateEvent )\n"); - throw( new CThreadedSocketException( "WSACreateEvent" )); - } + h_reader = NULL; + h_writer = NULL; + listen_mode = false; + connect_pending = true; + send_pending = false; } CThreadedSocket::~CThreadedSocket() @@ -59,14 +34,17 @@ CThreadedSocket::~CThreadedSocket() Close(); //if ( thread ) // On fait de l'auto delete mais dans le cas de terminaison anormale l'object reste ????!!! // delete thread; - WSACloseEvent( m_hEvent[0] ); - WSACloseEvent( m_hEvent[1] ); } -void CThreadedSocket::Create(UINT nSocketPort, int nSocketType, - long lEvent, const char * lpszSocketAddress) +int CThreadedSocket::Create(UINT nSocketPort, int nSocketType, const char * lpszSocketAddress) { - Socket(nSocketType, lEvent); - Bind(nSocketPort,lpszSocketAddress); + if ( Socket(nSocketType) < 0 ) + return SOCKET_ERROR; + + send_count = CreateSemaphore( NULL, 0, 100, NULL); // unnamed semaphore + if (send_count == NULL) + return SOCKET_ERROR; + + return Bind(nSocketPort,lpszSocketAddress); } ///////////////////////////////////////////////////////////////////////////// @@ -99,27 +77,31 @@ void CThreadedSocket::GetSockName(string & rSocketAddress, UINT& rSocketPort) ///////////////////////////////////////////////////////////////////////////// // CAscynSocket Operations -void CThreadedSocket::Accept(CThreadedSocket& rConnectedSocket, +int CThreadedSocket::Accept(CThreadedSocket& rConnectedSocket, SOCKADDR* lpSockAddr, int* lpSockAddrLen) { ASSERT(rConnectedSocket.m_hSocket == INVALID_SOCKET); - SOCKET hTemp = accept(m_hSocket, lpSockAddr, lpSockAddrLen); if (hTemp == INVALID_SOCKET) { rConnectedSocket.m_hSocket = INVALID_SOCKET; - throw( new CThreadedSocketException( "accept" ) ); + return SOCKET_ERROR; } else { rConnectedSocket.m_hSocket = hTemp; - rConnectedSocket.StartListener(m_EventMask); + rConnectedSocket.send_count = CreateSemaphore( NULL, 0, 100, NULL); // unnamed semaphore + if (rConnectedSocket.send_count == NULL) + return SOCKET_ERROR; + + rConnectedSocket.StartListener(); + return hTemp; } } -void CThreadedSocket::Bind(UINT nSocketPort, const char * lpszSocketAddress) +int CThreadedSocket::Bind(UINT nSocketPort, const char * lpszSocketAddress) { SOCKADDR_IN sockAddr; @@ -135,51 +117,57 @@ void CThreadedSocket::Bind(UINT nSocketPort, const char * lpszSocketAddress) if (lResult == INADDR_NONE) { WSASetLastError(WSAEINVAL); - throw( new CThreadedSocketException( "bind" ) ); + return SOCKET_ERROR; } sockAddr.sin_addr.s_addr = lResult; } sockAddr.sin_port = htons((u_short)nSocketPort); - Bind((SOCKADDR*)&sockAddr, sizeof(sockAddr)); + return Bind((SOCKADDR*)&sockAddr, sizeof(sockAddr)); } void CThreadedSocket::Close() { if (m_hSocket != INVALID_SOCKET) { - AsyncSelect(0); ASSERT(SOCKET_ERROR != closesocket(m_hSocket)); m_hSocket = INVALID_SOCKET; - //WSASetEvent( m_hEventObject ); + } } -bool CThreadedSocket::Connect(const SOCKADDR* lpSockAddr, int nSockAddrLen) +int CThreadedSocket::Connect(const SOCKADDR* lpSockAddr, int nSockAddrLen) { - if ( !connect(m_hSocket, lpSockAddr, nSockAddrLen) ) + int ok; + fd_set writefds; + + FD_ZERO(&writefds); + + FD_SET(m_hSocket, &writefds); + + ok = connect(m_hSocket, lpSockAddr, nSockAddrLen); + if ( !ok ) { - DWORD err = WSAGetLastError(); - if ( err == 0 ) return true; + int err = GetLastError(); + if ( err == 0 ) return err; TRACE( "***************************************Error connect %d\n", err); if ( err != WSAEWOULDBLOCK ) - return false; + return -1; // Wait for connection to complete TRACE( "***************************************Waiting for connection to complete\n"); - err = WSAWaitForMultipleEvents( 1, m_hEvent, false, WSA_INFINITE, true); + FD_SET(m_hSocket, &writefds); + err = select( 1, NULL, &writefds, NULL, NULL ); //TRACE( "Thread socket %d wait return %d\n", m_hSocket, err ); - if ( err == WSA_WAIT_FAILED ) - { - return false; - } - - return true; + if ( !err ) + connect_pending = true; + return err; } - return true; + connect_pending = true; + return 0; } -bool CThreadedSocket::Connect(const char * lpszHostAddress, UINT nHostPort) +int CThreadedSocket::Connect(const char * lpszHostAddress, UINT nHostPort) { ASSERT(lpszHostAddress != NULL); @@ -199,7 +187,7 @@ bool CThreadedSocket::Connect(const char * lpszHostAddress, UINT nHostPort) else { WSASetLastError(WSAEINVAL); - return false; + return SOCKET_ERROR; } } @@ -211,8 +199,6 @@ bool CThreadedSocket::Connect(const char * lpszHostAddress, UINT nHostPort) int CThreadedSocket::Receive(void* lpBuf, int nBufLen, int nFlags) { int lg = recv(m_hSocket, (LPSTR)lpBuf, nBufLen, nFlags); - if ( lg == SOCKET_ERROR ) - throw( new CThreadedSocketException( "recv" ) ); return lg; } @@ -230,13 +216,12 @@ int CThreadedSocket::ReceiveFrom(void* lpBuf, int nBufLen, string & rSocketAddre return nResult; } -void CThreadedSocket::Send(const void* lpBuf, int nBufLen, int nFlags) +int CThreadedSocket::Send(const void* lpBuf, int nBufLen, int nFlags) { - if ( send(m_hSocket, (LPSTR)lpBuf, nBufLen, nFlags) != nBufLen ) - throw( new CThreadedSocketException( "send" ) ); + return send(m_hSocket, (LPSTR)lpBuf, nBufLen, nFlags); } -void CThreadedSocket::SendTo(const void* lpBuf, int nBufLen, UINT nHostPort, const char * lpszHostAddress, int nFlags) +int CThreadedSocket::SendTo(const void* lpBuf, int nBufLen, UINT nHostPort, const char * lpszHostAddress, int nFlags) { SOCKADDR_IN sockAddr; @@ -258,17 +243,16 @@ void CThreadedSocket::SendTo(const void* lpBuf, int nBufLen, UINT nHostPort, con else { WSASetLastError(WSAEINVAL); - throw( new CThreadedSocketException( "SendTo" )); - return; + return SOCKET_ERROR; } } } sockAddr.sin_port = htons((u_short)nHostPort); - SendTo(lpBuf, nBufLen, (SOCKADDR*)&sockAddr, sizeof(sockAddr), nFlags); + return SendTo(lpBuf, nBufLen, (SOCKADDR*)&sockAddr, sizeof(sockAddr), nFlags); } -void CThreadedSocket::AddMember( const char * lpszHostAddress ) +int CThreadedSocket::AddMember( const char * lpszHostAddress ) { int multicast_ttl = 64; // region struct ip_mreq imr; @@ -289,8 +273,7 @@ void CThreadedSocket::AddMember( const char * lpszHostAddress ) else { WSASetLastError(WSAEINVAL); - throw( new CThreadedSocketException( "AddMenber" )); - return; + return SOCKET_ERROR; } } } @@ -299,19 +282,11 @@ void CThreadedSocket::AddMember( const char * lpszHostAddress ) SetSockOpt( IP_MULTICAST_TTL, &multicast_ttl, sizeof( multicast_ttl ), IPPROTO_IP ); SetSockOpt( IP_ADD_MEMBERSHIP, &imr, sizeof( imr ), IPPROTO_IP ); } - -} -void CThreadedSocket::AsyncSelect(long lEvent) -{ - ASSERT(m_hSocket != INVALID_SOCKET); - if ( WSAEventSelect(m_hSocket, m_hEvent[0], lEvent) ) - throw( new CThreadedSocketException( "WSAEventSelect" )); + return 0; } + ///////////////////////////////////////////////////////////////////////////// // CThreadedSocket Overridable callbacks -void CThreadedSocket::OnWakeup() -{ -} void CThreadedSocket::OnReceive(int /*nErrorCode*/) { } @@ -340,108 +315,194 @@ void CThreadedSocket::OnClose(int /*nErrorCode*/) // CThreadedSocket Implementation -void CThreadedSocket::Socket(int nSocketType, long lEvent, int nProtocolType, int nAddressFormat) +int CThreadedSocket::Socket(int nSocketType, int nProtocolType, int nAddressFormat) { ASSERT(m_hSocket == INVALID_SOCKET); - m_hSocket = socket(nAddressFormat,nSocketType,nProtocolType); -// m_hSocket = WSASocket ( nAddressFormat, nSocketType,nProtocolType, NULL, 0, 0 ); - + m_hSocket = socket(nAddressFormat,nSocketType,nProtocolType); - if (m_hSocket == INVALID_SOCKET) + if ( m_hSocket < 0 ) { - throw( new CThreadedSocketException( "socket" )); + return m_hSocket; } - StartListener(lEvent); + return StartListener(); +} +int CThreadedSocket::StartListener() +{ + h_reader = CreateThread(NULL,0,SocketReaderProc, this, 0, &reader_id); + return h_reader != NULL ? 0 : SOCKET_ERROR; +} +int CThreadedSocket::StartWriter() +{ + h_writer = CreateThread(NULL,0,SocketWriterProc, this, 0, &writer_id); + return h_writer != NULL ? 0 : SOCKET_ERROR; +} +bool CThreadedSocket::SignalWriter() +{ + long PreviousCount = 0; + int err; + err = ReleaseSemaphore(send_count, 1, &PreviousCount); + TRACE("CThreadedSocket::SignalWriter() PreviousCount = %ld \n", PreviousCount ); + return (err != 0); } -void CThreadedSocket::StartListener(long lEvent) +// Implementation +DWORD WINAPI CThreadedSocket::SocketReaderProc( LPVOID pParam ) { - m_EventMask = lEvent; - h_thread = CreateThread(NULL,0,SocketThreadProc, this, 0, &thread_id); - if ( ! h_thread ) - throw( new CThreadedSocketException( "socket listener" )); + CThreadedSocket* pObject = (CThreadedSocket*)pParam; + + if (pObject == NULL ) + { + return SOCKET_ERROR; // if pObject is not valid + } + + return pObject->SocketReader(); } // Implementation -DWORD WINAPI CThreadedSocket::SocketThreadProc( LPVOID pParam ) +DWORD WINAPI CThreadedSocket::SocketWriterProc( LPVOID pParam ) { CThreadedSocket* pObject = (CThreadedSocket*)pParam; if (pObject == NULL ) { - throw( new CThreadedSocketException( "bad socket proc" )); - return -1; // if pObject is not valid + return SOCKET_ERROR; // if pObject is not valid } - return pObject->SocketThread(); + return pObject->SocketWriter(); } -UINT CThreadedSocket::SocketThread( ) +UINT CThreadedSocket::SocketReader( ) { - DWORD err; - WSANETWORKEVENTS NetworkEvents; + int err; + int sock_err; + unsigned long readcount; + fd_set readfds; + fd_set writefds; + fd_set exceptfds; + + FD_ZERO(&readfds); + FD_ZERO(&writefds); + FD_ZERO(&exceptfds); - AsyncSelect(m_EventMask) ; while ( m_hSocket != INVALID_SOCKET ) { - err = WSAWaitForMultipleEvents( 2, m_hEvent, false, WSA_INFINITE, true); + FD_SET(m_hSocket, &readfds); + if ( connect_pending ) + FD_SET(m_hSocket, &writefds); + FD_SET(m_hSocket, &exceptfds); + + err = select( 1, &readfds, &writefds, &exceptfds, NULL ); + sock_err = GetLastError(); + switch ( err ) { - case WSA_WAIT_FAILED: - TRACE( "CThreadedSocketException(WSAWaitForMultipleEvents )\n"); - throw( new CThreadedSocketException( "WSAWaitForMultipleEvents" )); - return err; + case 0: + TRACE( "CThreadedSocket::SocketThread Unhandled Timeout Event !\n"); break; - case WSA_WAIT_EVENT_0: - - NetworkEvents.lNetworkEvents=0; - - if ( WSAEnumNetworkEvents ( m_hSocket, m_hEvent[0], &NetworkEvents )) - { - TRACE( "CThreadedSocketException(WSAEnumNetworkEvents )\n"); - throw( new CThreadedSocketException( "WSAEnumNetworkEvents" )); - return -1; - } + case SOCKET_ERROR: - - if ( NetworkEvents.lNetworkEvents & FD_READ ) - OnReceive(NetworkEvents.iErrorCode[ FD_READ_BIT ]); - - if ( NetworkEvents.lNetworkEvents & FD_WRITE ) - OnSend( NetworkEvents.iErrorCode[ FD_WRITE_BIT ] ); - - if ( NetworkEvents.lNetworkEvents & FD_ACCEPT ) - OnAccept( NetworkEvents.iErrorCode[ FD_ACCEPT_BIT ] ); - - - if ( NetworkEvents.lNetworkEvents & FD_CONNECT ) - OnConnect( NetworkEvents.iErrorCode[ FD_CONNECT_BIT ] ); + TRACE( "CThreadedSocketException( select )\n"); + Close(); + h_reader = NULL; + return sock_err; + break; + default: - if ( NetworkEvents.lNetworkEvents & FD_OOB ) - OnOutOfBandData( NetworkEvents.iErrorCode[ FD_OOB_BIT ] ); - if ( NetworkEvents.lNetworkEvents & FD_CLOSE ) + if ( FD_ISSET(m_hSocket, &readfds) ) { - OnClose( NetworkEvents.iErrorCode[ FD_CLOSE_BIT ] ); + IOCtl( FIONREAD, &readcount ); + if ( listen_mode ) + OnAccept( sock_err ); + else if ( ! readcount ) + OnClose( sock_err ); + else OnReceive(sock_err); } - break; - case WSA_WAIT_EVENT_0 +1 : - if ( m_hSocket != INVALID_SOCKET ) + if ( FD_ISSET(m_hSocket, &writefds) ) { - OnWakeup(); + if ( connect_pending ) + { + OnConnect( sock_err ); + connect_pending = false; + } } - // else Close by other Thread do silent terminaison - WSAResetEvent( m_hEvent[1] ); - + + if ( FD_ISSET(m_hSocket, &exceptfds) ) + OnOutOfBandData( sock_err ); + + + break; + } + } + Close(); + h_reader = NULL; + return 0; +} + +UINT CThreadedSocket::SocketWriter( ) +{ + int err; + int sock_err; + fd_set writefds; + + FD_ZERO(&writefds); + + + while ( m_hSocket != INVALID_SOCKET ) + { + DWORD dwWaitResult; + + // Wait for message to send + + dwWaitResult = WaitForSingleObject( send_count, INFINITE ); + + switch (dwWaitResult) { + + // The semaphore object was signaled. + case WAIT_OBJECT_0: + // OK to really send the message. + break; + + // Semaphore was nonsignaled, so a time-out occurred. + case WAIT_TIMEOUT: + case WAIT_FAILED: + default: + TRACE( "CThreadedSocketException( SocketWriter WaitForSingleObject )\n"); + Close(); + h_writer = NULL; + return GetLastError(); + break; + } + + FD_SET(m_hSocket, &writefds); + + err = select( 1, NULL, &writefds, NULL, NULL ); + sock_err = GetLastError(); + + switch ( err ) + { + case 0: + TRACE( "CThreadedSocket::SocketThread Unhandled Timeout Event !\n"); + break; + case SOCKET_ERROR: + TRACE( "CThreadedSocketException( select )\n"); + Close(); + h_reader = NULL; + return sock_err; break; default: - TRACE( "CThreadedSocket::SocketThread Unhandled Events !\n"); + + if ( FD_ISSET(m_hSocket, &writefds) ) + { + OnSend( sock_err ); + } + break; } } Close(); - h_thread = NULL; + h_writer = NULL; return 0; } -- cgit v1.1