summaryrefslogtreecommitdiff
path: root/Ivy/ThreadedSocket.cxx
diff options
context:
space:
mode:
Diffstat (limited to 'Ivy/ThreadedSocket.cxx')
-rw-r--r--Ivy/ThreadedSocket.cxx547
1 files changed, 547 insertions, 0 deletions
diff --git a/Ivy/ThreadedSocket.cxx b/Ivy/ThreadedSocket.cxx
new file mode 100644
index 0000000..ee1e03b
--- /dev/null
+++ b/Ivy/ThreadedSocket.cxx
@@ -0,0 +1,547 @@
+// ThreadedSocket.cpp: implementation of the CThreadedSocket class.
+//
+//////////////////////////////////////////////////////////////////////
+
+#include "IvyStdAfx.h"
+
+
+#include "ThreadedSocket.h"
+
+
+
+//////////////////////////////////////////////////////////////////////
+// Construction/Destruction
+//////////////////////////////////////////////////////////////////////
+
+CThreadedSocket::CThreadedSocket()
+{
+
+ m_hSocket = INVALID_SOCKET;
+ h_reader = NULL;
+ h_writer = NULL;
+ reader_id = 0;
+ writer_id = 0;
+ listen_mode = false;
+ connect_pending = true;
+ send_pending = false;
+}
+
+CThreadedSocket::~CThreadedSocket()
+{
+ if (m_hSocket != INVALID_SOCKET)
+ Close();
+}
+int CThreadedSocket::Create(UINT nSocketPort, int nSocketType, const char * lpszSocketAddress)
+{
+ if ( Socket(nSocketType) < 0 )
+ return SOCKET_ERROR;
+
+ send_count = CreateSemaphore( NULL, 0, 100, NULL); // unnamed semaphore
+ if (send_count == NULL)
+ return SOCKET_ERROR;
+
+ return Bind(nSocketPort,lpszSocketAddress);
+}
+
+/////////////////////////////////////////////////////////////////////////////
+// CThreadedSocket Attributes
+
+
+
+void CThreadedSocket::GetPeerName(ivy::string & rPeerAddress, UINT& rPeerPort)
+{
+ SOCKADDR_IN sockAddr;
+ memset(&sockAddr, 0, sizeof(sockAddr));
+
+ int nSockAddrLen = sizeof(sockAddr);
+ GetPeerName((SOCKADDR*)&sockAddr, &nSockAddrLen);
+ rPeerPort = ntohs(sockAddr.sin_port);
+ rPeerAddress = inet_ntoa(sockAddr.sin_addr);
+}
+
+void CThreadedSocket::GetSockName(ivy::string & rSocketAddress, UINT& rSocketPort)
+{
+ SOCKADDR_IN sockAddr;
+ memset(&sockAddr, 0, sizeof(sockAddr));
+
+ int nSockAddrLen = sizeof(sockAddr);
+ GetSockName((SOCKADDR*)&sockAddr, &nSockAddrLen);
+ rSocketPort = ntohs(sockAddr.sin_port);
+ rSocketAddress = inet_ntoa(sockAddr.sin_addr);
+}
+
+/////////////////////////////////////////////////////////////////////////////
+// CAscynSocket Operations
+
+SOCKET CThreadedSocket::Accept(CThreadedSocket& rConnectedSocket,
+ SOCKADDR* lpSockAddr, int* lpSockAddrLen)
+{
+ ASSERT(rConnectedSocket.m_hSocket == INVALID_SOCKET);
+
+ SOCKET hTemp = accept(m_hSocket, lpSockAddr, lpSockAddrLen);
+
+ if (hTemp == INVALID_SOCKET)
+ {
+ rConnectedSocket.m_hSocket = INVALID_SOCKET;
+ return SOCKET_ERROR;
+ }
+ else
+ {
+ rConnectedSocket.m_hSocket = hTemp;
+ rConnectedSocket.send_count = CreateSemaphore( NULL, 0, 100, NULL); // unnamed semaphore
+ if (rConnectedSocket.send_count == NULL)
+ return SOCKET_ERROR;
+
+ rConnectedSocket.StartListener();
+ return hTemp;
+ }
+}
+
+int CThreadedSocket::Bind(UINT nSocketPort, const char * lpszSocketAddress)
+{
+
+ SOCKADDR_IN sockAddr;
+ memset(&sockAddr,0,sizeof(sockAddr));
+
+ sockAddr.sin_family = AF_INET;
+
+ if (lpszSocketAddress == NULL)
+ sockAddr.sin_addr.s_addr = htonl(INADDR_ANY);
+ else
+ {
+ DWORD lResult = inet_addr(lpszSocketAddress);
+ if (lResult == INADDR_NONE)
+ {
+ WSASetLastError(WSAEINVAL);
+ return SOCKET_ERROR;
+ }
+ sockAddr.sin_addr.s_addr = lResult;
+ }
+
+ sockAddr.sin_port = htons((u_short)nSocketPort);
+
+ return Bind((SOCKADDR*)&sockAddr, sizeof(sockAddr));
+}
+
+void CThreadedSocket::Close()
+{
+ long PreviousCount = 0;
+ int err;
+ DWORD currentThreadId;
+
+
+ if (m_hSocket != INVALID_SOCKET)
+ {
+ currentThreadId = GetCurrentThreadId();
+
+ //ASSERT(SOCKET_ERROR != closesocket(m_hSocket));
+ SOCKET temp = m_hSocket; // Thread ACK
+ m_hSocket = INVALID_SOCKET;
+ TRACE( "CThreadedSocket::Close (reader=0x%0lx) (writer=0x%0lx)\n", reader_id, writer_id );
+ closesocket(temp); // close silently
+
+ //if ( thread ) // On fait de l'auto delete mais dans le cas de terminaison anormale l'object reste ????!!!
+ // delete thread;
+// TRACE("CThreadedSocket waiting for thread end ...\n");
+ if ( h_reader && currentThreadId != reader_id )
+ WaitForSingleObject( h_reader, 5000 );
+ // wake up writer
+ if ( h_writer && currentThreadId != writer_id )
+ {
+ err = ReleaseSemaphore(send_count, 1, &PreviousCount);
+ WaitForSingleObject( h_writer, 5000 );
+ }
+// TRACE("CThreadedSocket all thread ended\n");
+ }
+}
+int CThreadedSocket::Listen(int nConnectionBacklog)
+{
+ int err = listen(m_hSocket, nConnectionBacklog);
+ if ( !err )
+ listen_mode = true;
+ return StartListener();
+}
+
+int CThreadedSocket::Connect(const SOCKADDR* lpSockAddr, int nSockAddrLen)
+{
+ int ok;
+ fd_set writefds;
+
+ FD_ZERO(&writefds);
+
+ FD_SET(m_hSocket, &writefds);
+
+ ok = connect(m_hSocket, lpSockAddr, nSockAddrLen);
+ if ( ok != 0 )
+ {
+ int err = this->GetLastError();
+ if ( err == 0 ) return err;
+ TRACE( "***************************************Error connect %d\n", err);
+ if ( err != WSAEWOULDBLOCK )
+ return -1;
+ // Wait for connection to complete
+ TRACE( "***************************************Waiting for connection to complete\n");
+ FD_SET(m_hSocket, &writefds);
+ err = select( 1, NULL, &writefds, NULL, NULL );
+ //TRACE( "Thread socket %d wait return %d\n", m_hSocket, err );
+ if ( !err )
+ connect_pending = true;
+ return err;
+ }
+ connect_pending = true;
+ return StartListener();
+}
+
+int CThreadedSocket::Connect(const char * lpszHostAddress, UINT nHostPort)
+{
+
+ ASSERT(lpszHostAddress != NULL);
+
+ SOCKADDR_IN sockAddr;
+ memset(&sockAddr,0,sizeof(sockAddr));
+
+ sockAddr.sin_family = AF_INET;
+ sockAddr.sin_addr.s_addr = inet_addr(lpszHostAddress);
+
+ if (sockAddr.sin_addr.s_addr == INADDR_NONE)
+ {
+ LPHOSTENT lphost;
+ lphost = gethostbyname(lpszHostAddress);
+ if (lphost != NULL)
+ sockAddr.sin_addr.s_addr = ((LPIN_ADDR)lphost->h_addr)->s_addr;
+ else
+ {
+ WSASetLastError(WSAEINVAL);
+ return SOCKET_ERROR;
+ }
+ }
+
+ sockAddr.sin_port = htons((u_short)nHostPort);
+
+ return Connect((SOCKADDR*)&sockAddr, sizeof(sockAddr));
+}
+
+size_t CThreadedSocket::Receive(void* lpBuf, size_t nBufLen, int nFlags)
+{
+ int lg = recv(m_hSocket, (LPSTR)lpBuf, (int)nBufLen, nFlags);
+ return lg;
+}
+
+size_t CThreadedSocket::ReceiveFrom(void* lpBuf, size_t nBufLen, ivy::string & rSocketAddress, UINT& rSocketPort, int nFlags)
+{
+ SOCKADDR_IN sockAddr;
+
+ memset(&sockAddr, 0, sizeof(sockAddr));
+
+ size_t nSockAddrLen = sizeof(sockAddr);
+ size_t nResult = ReceiveFrom(lpBuf, nBufLen, (SOCKADDR*)&sockAddr, &nSockAddrLen, nFlags);
+ rSocketPort = ntohs(sockAddr.sin_port);
+ rSocketAddress = inet_ntoa(sockAddr.sin_addr);
+
+ return nResult;
+}
+
+size_t CThreadedSocket::Send(const void* lpBuf, size_t nBufLen, int nFlags)
+{
+ return send(m_hSocket, (LPSTR)lpBuf, (int)nBufLen, nFlags);
+}
+
+size_t CThreadedSocket::SendTo(const void* lpBuf, size_t nBufLen, UINT nHostPort, const char * lpszHostAddress, int nFlags)
+{
+ SOCKADDR_IN sockAddr;
+
+ memset(&sockAddr,0,sizeof(sockAddr));
+
+ sockAddr.sin_family = AF_INET;
+
+ if (lpszHostAddress == NULL)
+ sockAddr.sin_addr.s_addr = htonl(INADDR_BROADCAST);
+ else
+ {
+ sockAddr.sin_addr.s_addr = inet_addr(lpszHostAddress);
+ if (sockAddr.sin_addr.s_addr == INADDR_NONE)
+ {
+ LPHOSTENT lphost;
+ lphost = gethostbyname(lpszHostAddress);
+ if (lphost != NULL)
+ sockAddr.sin_addr.s_addr = ((LPIN_ADDR)lphost->h_addr)->s_addr;
+ else
+ {
+ WSASetLastError(WSAEINVAL);
+ return SOCKET_ERROR;
+ }
+ }
+ }
+
+ sockAddr.sin_port = htons((u_short)nHostPort);
+
+ return SendTo(lpBuf, nBufLen, (SOCKADDR*)&sockAddr, sizeof(sockAddr), nFlags);
+ }
+int CThreadedSocket::AddMember( const char * lpszHostAddress )
+{
+ int multicast_ttl = 64; // region
+ struct ip_mreq imr;
+
+ TRACE("CThreadedSocket::AddMember adding %s\n", lpszHostAddress );
+ imr.imr_multiaddr.s_addr = INADDR_ANY;
+ imr.imr_interface.s_addr = INADDR_ANY;
+
+ if (lpszHostAddress )
+ {
+ imr.imr_multiaddr.s_addr = inet_addr(lpszHostAddress);
+ if (imr.imr_multiaddr.s_addr == INADDR_NONE)
+ {
+ LPHOSTENT lphost;
+ lphost = gethostbyname(lpszHostAddress);
+ if (lphost != NULL)
+ imr.imr_multiaddr.s_addr = ((LPIN_ADDR)lphost->h_addr)->s_addr;
+ else
+ {
+ WSASetLastError(WSAEINVAL);
+ return SOCKET_ERROR;
+ }
+ }
+ }
+ if ( IN_CLASSD( htonl(imr.imr_multiaddr.s_addr) ) )
+ {
+ SetSockOpt( IP_MULTICAST_TTL, &multicast_ttl, sizeof( multicast_ttl ), IPPROTO_IP );
+ SetSockOpt( IP_ADD_MEMBERSHIP, &imr, sizeof( imr ), IPPROTO_IP );
+ }
+ return 0;
+}
+
+/////////////////////////////////////////////////////////////////////////////
+// CThreadedSocket Overridable callbacks
+void CThreadedSocket::OnReceive(int /*nErrorCode*/)
+{
+}
+
+void CThreadedSocket::OnSend(int /*nErrorCode*/)
+{
+}
+
+void CThreadedSocket::OnOutOfBandData(int /*nErrorCode*/)
+{
+}
+
+void CThreadedSocket::OnAccept(int /*nErrorCode*/)
+{
+}
+
+void CThreadedSocket::OnConnect(int /*nErrorCode*/)
+{
+}
+
+void CThreadedSocket::OnClose(int /*nErrorCode*/)
+{
+}
+
+/////////////////////////////////////////////////////////////////////////////
+// CThreadedSocket Implementation
+
+
+SOCKET CThreadedSocket::Socket(int nSocketType, int nProtocolType, int nAddressFormat)
+{
+ ASSERT(m_hSocket == INVALID_SOCKET);
+
+ m_hSocket = socket(nAddressFormat,nSocketType,nProtocolType);
+
+ return m_hSocket;
+ //return StartListener();
+}
+
+int CThreadedSocket::StartListener()
+{
+ h_reader = CreateThread(NULL,0,SocketReaderProc, this, 0, &reader_id);
+ return h_reader != NULL ? 0 : SOCKET_ERROR;
+}
+int CThreadedSocket::StartWriter()
+{
+ h_writer = CreateThread(NULL,0,SocketWriterProc, this, 0, &writer_id);
+ return h_writer != NULL ? 0 : SOCKET_ERROR;
+}
+bool CThreadedSocket::SignalWriter()
+{
+ long PreviousCount = 0;
+ BOOL err;
+ err = ReleaseSemaphore(send_count, 1, &PreviousCount);
+// TRACE("CThreadedSocket::SignalWriter() PreviousCount = %ld \n", PreviousCount );
+ return (err != 0);
+}
+
+// Implementation
+DWORD WINAPI CThreadedSocket::SocketReaderProc( LPVOID pParam )
+{
+ CThreadedSocket* pObject = (CThreadedSocket*)pParam;
+
+ if (pObject == NULL )
+ {
+ return SOCKET_ERROR; // if pObject is not valid
+ }
+
+ return pObject->SocketReader();
+}
+// Implementation
+DWORD WINAPI CThreadedSocket::SocketWriterProc( LPVOID pParam )
+{
+ CThreadedSocket* pObject = (CThreadedSocket*)pParam;
+
+ if (pObject == NULL )
+ {
+ return SOCKET_ERROR; // if pObject is not valid
+ }
+
+ return pObject->SocketWriter();
+}
+
+UINT CThreadedSocket::SocketReader( )
+{
+ int err;
+ int sock_err;
+ unsigned long readcount = 0;
+ fd_set readfds;
+ fd_set writefds;
+ fd_set exceptfds;
+
+ TRACE( "CThreadedSocket::SocketReader( START thread_id =( 0x%x) )\n",reader_id);
+
+ FD_ZERO(&readfds);
+ FD_ZERO(&writefds);
+ FD_ZERO(&exceptfds);
+
+ while ( m_hSocket != INVALID_SOCKET )
+ {
+ FD_SET(m_hSocket, &readfds);
+ if ( connect_pending )
+ FD_SET(m_hSocket, &writefds);
+ else
+ FD_CLR(m_hSocket, &writefds);
+ FD_SET(m_hSocket, &exceptfds);
+
+ err = select( 1, &readfds, &writefds, &exceptfds, NULL );
+ sock_err = this->GetLastError();
+
+ switch ( err )
+ {
+ case 0:
+ TRACE( "CThreadedSocket::SocketThread Unhandled Timeout Event !\n");
+ break;
+ case SOCKET_ERROR:
+
+ TRACE( "CThreadedSocket::SocketReader( select error thread_id =( 0x%x) )\n",reader_id);
+ if ( (sock_err != WSAENOTSOCK ) && ( m_hSocket != INVALID_SOCKET )) // could be Invalid if close when in select
+ Close();
+ h_reader = NULL;
+ return sock_err;
+ break;
+ default:
+
+
+ if ( FD_ISSET(m_hSocket, &readfds) )
+ {
+ readcount = 0;
+ IOCtl( FIONREAD, &readcount );
+ if ( listen_mode )
+ OnAccept( sock_err );
+ else if ( ! readcount )
+ OnClose( sock_err );
+ else OnReceive(sock_err);
+ }
+
+ if ( FD_ISSET(m_hSocket, &writefds) )
+ {
+ if ( connect_pending )
+ {
+ OnConnect( sock_err );
+ connect_pending = false;
+ }
+ }
+
+ if ( FD_ISSET(m_hSocket, &exceptfds) )
+ OnOutOfBandData( sock_err );
+
+
+ break;
+ }
+ }
+ if (m_hSocket != INVALID_SOCKET)
+ Close();
+ h_reader = NULL;
+ TRACE( "CThreadedSocket::SocketReader( END thread_id =( 0x%x) )\n",reader_id);
+
+ return 0;
+}
+
+UINT CThreadedSocket::SocketWriter( )
+{
+ int err;
+ int sock_err;
+ fd_set writefds;
+
+ TRACE( "CThreadedSocket::SocketWriter( START thread_id =( 0x%x) )\n",writer_id);
+
+ FD_ZERO(&writefds);
+
+
+ while ( m_hSocket != INVALID_SOCKET )
+ {
+ DWORD dwWaitResult;
+
+ // Wait for message to send
+
+ dwWaitResult = WaitForSingleObject( send_count, INFINITE );
+
+ switch (dwWaitResult) {
+
+ // The semaphore object was signaled.
+ case WAIT_OBJECT_0:
+ // OK to really send the message.
+ break;
+
+ // Semaphore was nonsignaled, so a time-out occurred.
+ case WAIT_TIMEOUT:
+ case WAIT_FAILED:
+ default:
+ TRACE( "CThreadedSocket::SocketWriter( WaitForSingleObject error thread_id =( %d) )\n",writer_id);
+ if (m_hSocket != INVALID_SOCKET)
+ Close();
+ h_writer = NULL;
+ return this->GetLastError();
+ break;
+ }
+ if (m_hSocket != INVALID_SOCKET)
+ {
+ FD_SET(m_hSocket, &writefds);
+
+ err = select( 1, NULL, &writefds, NULL, NULL );
+ sock_err = this->GetLastError();
+
+ switch ( err )
+ {
+ case 0:
+ TRACE( "CThreadedSocket::SocketThread Unhandled Timeout Event !\n");
+ break;
+ case SOCKET_ERROR:
+ TRACE( "CThreadedSocketException( select )\n");
+ if ( (sock_err != WSAENOTSOCK ) && ( m_hSocket != INVALID_SOCKET )) // could be Invalid if close when in select
+ Close();
+ h_reader = NULL;
+ return sock_err;
+ break;
+ default:
+
+ if ( FD_ISSET(m_hSocket, &writefds) )
+ {
+ OnSend( sock_err );
+ }
+
+ break;
+ }
+ }
+ }
+ Close();
+ h_writer = NULL;
+ TRACE( "CThreadedSocket::SocketWriter( END thread_id =( 0x%x) )\n",writer_id);
+
+ return 0;
+}