// ThreadedSocket.cpp: implementation of the CThreadedSocket class. // ////////////////////////////////////////////////////////////////////// #include "IvyStdAfx.h" #include "ThreadedSocket.h" ////////////////////////////////////////////////////////////////////// // Construction/Destruction ////////////////////////////////////////////////////////////////////// CThreadedSocket::CThreadedSocket() { m_hSocket = INVALID_SOCKET; h_reader = NULL; h_writer = NULL; reader_id = 0; writer_id = 0; listen_mode = false; connect_pending = true; send_pending = false; } CThreadedSocket::~CThreadedSocket() { if (m_hSocket != INVALID_SOCKET) Close(); } int CThreadedSocket::Create(UINT nSocketPort, int nSocketType, const char * lpszSocketAddress) { if ( Socket(nSocketType) < 0 ) return SOCKET_ERROR; send_count = CreateSemaphore( NULL, 0, 100, NULL); // unnamed semaphore if (send_count == NULL) return SOCKET_ERROR; return Bind(nSocketPort,lpszSocketAddress); } ///////////////////////////////////////////////////////////////////////////// // CThreadedSocket Attributes void CThreadedSocket::GetPeerName(ivy::string & rPeerAddress, UINT& rPeerPort) { SOCKADDR_IN sockAddr; memset(&sockAddr, 0, sizeof(sockAddr)); int nSockAddrLen = sizeof(sockAddr); GetPeerName((SOCKADDR*)&sockAddr, &nSockAddrLen); rPeerPort = ntohs(sockAddr.sin_port); rPeerAddress = inet_ntoa(sockAddr.sin_addr); } void CThreadedSocket::GetSockName(ivy::string & rSocketAddress, UINT& rSocketPort) { SOCKADDR_IN sockAddr; memset(&sockAddr, 0, sizeof(sockAddr)); int nSockAddrLen = sizeof(sockAddr); GetSockName((SOCKADDR*)&sockAddr, &nSockAddrLen); rSocketPort = ntohs(sockAddr.sin_port); rSocketAddress = inet_ntoa(sockAddr.sin_addr); } unsigned short int CThreadedSocket::GetLocalPort() { int err; struct sockaddr_in name; socklen_t len = sizeof(name); err = getsockname (m_hSocket, (struct sockaddr *)&name, &len ); if (err < 0 ) return 0; return name.sin_port; } unsigned short int CThreadedSocket::GetRemotePort() { int err; struct sockaddr_in name; socklen_t len = sizeof(name); err = getpeername (m_hSocket, (struct sockaddr *)&name, &len ); if (err < 0 ) return 0; return name.sin_port; } ///////////////////////////////////////////////////////////////////////////// // CAscynSocket Operations SOCKET CThreadedSocket::Accept(CThreadedSocket& rConnectedSocket, SOCKADDR* lpSockAddr, int* lpSockAddrLen) { ASSERT(rConnectedSocket.m_hSocket == INVALID_SOCKET); SOCKET hTemp = accept(m_hSocket, lpSockAddr, lpSockAddrLen); if (hTemp == INVALID_SOCKET) { rConnectedSocket.m_hSocket = INVALID_SOCKET; return SOCKET_ERROR; } else { rConnectedSocket.m_hSocket = hTemp; rConnectedSocket.send_count = CreateSemaphore( NULL, 0, 200, NULL); // unnamed semaphore if (rConnectedSocket.send_count == NULL) return SOCKET_ERROR; rConnectedSocket.StartListener(); return hTemp; } } int CThreadedSocket::Bind(UINT nSocketPort, const char * lpszSocketAddress) { SOCKADDR_IN sockAddr; memset(&sockAddr,0,sizeof(sockAddr)); sockAddr.sin_family = AF_INET; if (lpszSocketAddress == NULL) sockAddr.sin_addr.s_addr = htonl(INADDR_ANY); else { DWORD lResult = inet_addr(lpszSocketAddress); if (lResult == INADDR_NONE) { WSASetLastError(WSAEINVAL); return SOCKET_ERROR; } sockAddr.sin_addr.s_addr = lResult; } sockAddr.sin_port = htons((u_short)nSocketPort); return Bind((SOCKADDR*)&sockAddr, sizeof(sockAddr)); } void CThreadedSocket::Close() { long PreviousCount = 0; int err; DWORD currentThreadId; if (m_hSocket != INVALID_SOCKET) { currentThreadId = GetCurrentThreadId(); //ASSERT(SOCKET_ERROR != closesocket(m_hSocket)); SOCKET temp = m_hSocket; // Thread ACK m_hSocket = INVALID_SOCKET; TRACE( "CThreadedSocket::Close (current=0x%0lx) (reader=0x%0lx) (writer=0x%0lx)\n", currentThreadId, reader_id, writer_id ); closesocket(temp); // close silently //if ( thread ) // On fait de l'auto delete mais dans le cas de terminaison anormale l'object reste ????!!! // delete thread; if ( h_reader && currentThreadId != reader_id ) { TRACE("CThreadedSocket waiting for thread end (reader=0x%0lx)...\n",reader_id); DWORD res = WaitForSingleObject( h_reader, INFINITE ); if ( res != WAIT_OBJECT_0 ) { TRACE("CThreadedSocket waiting for thread end (reader=0x%0lx) error 0x%0lx lasterror 0x%0lx\n",reader_id,res, ::GetLastError()); } } // wake up writer if ( h_writer && currentThreadId != writer_id ) { TRACE("CThreadedSocket waiting for thread end (writer=0x%0lx)...\n",writer_id); err = ReleaseSemaphore(send_count, 1, &PreviousCount); DWORD res = WaitForSingleObject( h_writer, INFINITE ); if ( res != WAIT_OBJECT_0 ) { TRACE("CThreadedSocket waiting for thread end (writer=0x%0lx) error 0x%0lx lasterror 0x%0lx\n",writer_id,res, ::GetLastError()); } } TRACE("CThreadedSocket all thread ended (reader=0x%0lx) (writer=0x%0lx)\n", reader_id, writer_id); } } int CThreadedSocket::Listen(int nConnectionBacklog) { int err = listen(m_hSocket, nConnectionBacklog); if ( !err ) listen_mode = true; return StartListener(); } int CThreadedSocket::Connect(const SOCKADDR* lpSockAddr, int nSockAddrLen) { int ok; fd_set writefds; FD_ZERO(&writefds); FD_SET(m_hSocket, &writefds); ok = connect(m_hSocket, lpSockAddr, nSockAddrLen); if ( ok != 0 ) { int err = this->GetLastError(); if ( err == 0 ) return err; TRACE( "***************************************Error connect %d\n", err); if ( err != WSAEWOULDBLOCK ) return -1; // Wait for connection to complete TRACE( "***************************************Waiting for connection to complete\n"); FD_SET(m_hSocket, &writefds); err = select( 1, NULL, &writefds, NULL, NULL ); //TRACE( "Thread socket %d wait return %d\n", m_hSocket, err ); if ( !err ) connect_pending = true; return err; } connect_pending = true; return StartListener(); } int CThreadedSocket::Connect(const char * lpszHostAddress, UINT nHostPort) { ASSERT(lpszHostAddress != NULL); SOCKADDR_IN sockAddr; memset(&sockAddr,0,sizeof(sockAddr)); sockAddr.sin_family = AF_INET; sockAddr.sin_addr.s_addr = inet_addr(lpszHostAddress); if (sockAddr.sin_addr.s_addr == INADDR_NONE) { LPHOSTENT lphost; lphost = gethostbyname(lpszHostAddress); if (lphost != NULL) sockAddr.sin_addr.s_addr = ((LPIN_ADDR)lphost->h_addr)->s_addr; else { WSASetLastError(WSAEINVAL); return SOCKET_ERROR; } } sockAddr.sin_port = htons((u_short)nHostPort); return Connect((SOCKADDR*)&sockAddr, sizeof(sockAddr)); } size_t CThreadedSocket::Receive(void* lpBuf, size_t nBufLen, int nFlags) { int lg = recv(m_hSocket, (LPSTR)lpBuf, (int)nBufLen, nFlags); return lg; } size_t CThreadedSocket::ReceiveFrom(void* lpBuf, size_t nBufLen, ivy::string & rSocketAddress, UINT& rSocketPort, int nFlags) { SOCKADDR_IN sockAddr; memset(&sockAddr, 0, sizeof(sockAddr)); size_t nSockAddrLen = sizeof(sockAddr); size_t nResult = ReceiveFrom(lpBuf, nBufLen, (SOCKADDR*)&sockAddr, &nSockAddrLen, nFlags); rSocketPort = ntohs(sockAddr.sin_port); rSocketAddress = inet_ntoa(sockAddr.sin_addr); return nResult; } size_t CThreadedSocket::Send(const void* lpBuf, size_t nBufLen, int nFlags) { return send(m_hSocket, (LPSTR)lpBuf, (int)nBufLen, nFlags); } size_t CThreadedSocket::SendTo(const void* lpBuf, size_t nBufLen, UINT nHostPort, const char * lpszHostAddress, int nFlags) { SOCKADDR_IN sockAddr; memset(&sockAddr,0,sizeof(sockAddr)); sockAddr.sin_family = AF_INET; if (lpszHostAddress == NULL) sockAddr.sin_addr.s_addr = htonl(INADDR_BROADCAST); else { sockAddr.sin_addr.s_addr = inet_addr(lpszHostAddress); if (sockAddr.sin_addr.s_addr == INADDR_NONE) { LPHOSTENT lphost; lphost = gethostbyname(lpszHostAddress); if (lphost != NULL) sockAddr.sin_addr.s_addr = ((LPIN_ADDR)lphost->h_addr)->s_addr; else { WSASetLastError(WSAEINVAL); return SOCKET_ERROR; } } } sockAddr.sin_port = htons((u_short)nHostPort); return SendTo(lpBuf, nBufLen, (SOCKADDR*)&sockAddr, sizeof(sockAddr), nFlags); } int CThreadedSocket::AddMember( const char * lpszHostAddress ) { int multicast_ttl = 64; // region struct ip_mreq imr; TRACE("CThreadedSocket::AddMember adding %s\n", lpszHostAddress ); imr.imr_multiaddr.s_addr = INADDR_ANY; imr.imr_interface.s_addr = INADDR_ANY; if (lpszHostAddress ) { imr.imr_multiaddr.s_addr = inet_addr(lpszHostAddress); if (imr.imr_multiaddr.s_addr == INADDR_NONE) { LPHOSTENT lphost; lphost = gethostbyname(lpszHostAddress); if (lphost != NULL) imr.imr_multiaddr.s_addr = ((LPIN_ADDR)lphost->h_addr)->s_addr; else { WSASetLastError(WSAEINVAL); return SOCKET_ERROR; } } } if ( IN_CLASSD( htonl(imr.imr_multiaddr.s_addr) ) ) { int err; err = SetSockOpt( IP_MULTICAST_TTL, &multicast_ttl, sizeof( multicast_ttl ), IPPROTO_IP ); if ( err != 0 ) { TRACE("CThreadedSocket::AddMember IP_MULTICAST_TTL error %d\n", GetLastError()); return err; } err = SetSockOpt( IP_ADD_MEMBERSHIP, &imr, sizeof( imr ), IPPROTO_IP ); if ( err != 0 ) { TRACE("CThreadedSocket::AddMember IP_ADD_MEMBERSHIP error %d\n", GetLastError()); return err; } } return 0; } ///////////////////////////////////////////////////////////////////////////// // CThreadedSocket Overridable callbacks void CThreadedSocket::OnReceive(int /*nErrorCode*/) { } void CThreadedSocket::OnSend(int /*nErrorCode*/) { } void CThreadedSocket::OnOutOfBandData(int /*nErrorCode*/) { } void CThreadedSocket::OnAccept(int /*nErrorCode*/) { } void CThreadedSocket::OnConnect(int /*nErrorCode*/) { } void CThreadedSocket::OnClose(int /*nErrorCode*/) { } ///////////////////////////////////////////////////////////////////////////// // CThreadedSocket Implementation SOCKET CThreadedSocket::Socket(int nSocketType, int nProtocolType, int nAddressFormat) { ASSERT(m_hSocket == INVALID_SOCKET); m_hSocket = socket(nAddressFormat,nSocketType,nProtocolType); return m_hSocket; //return StartListener(); } int CThreadedSocket::StartListener() { h_reader = CreateThread(NULL,0,SocketReaderProc, this, 0, &reader_id); return h_reader != NULL ? 0 : SOCKET_ERROR; } int CThreadedSocket::StartWriter() { h_writer = CreateThread(NULL,0,SocketWriterProc, this, 0, &writer_id); return h_writer != NULL ? 0 : SOCKET_ERROR; } bool CThreadedSocket::SignalWriter() { long PreviousCount = 0; BOOL err; err = ReleaseSemaphore(send_count, 1, &PreviousCount); // TRACE("CThreadedSocket::SignalWriter() PreviousCount = %ld \n", PreviousCount ); return (err != 0); } // Implementation DWORD WINAPI CThreadedSocket::SocketReaderProc( LPVOID pParam ) { CThreadedSocket* pObject = (CThreadedSocket*)pParam; if (pObject == NULL ) { return SOCKET_ERROR; // if pObject is not valid } return pObject->SocketReader(); } // Implementation DWORD WINAPI CThreadedSocket::SocketWriterProc( LPVOID pParam ) { CThreadedSocket* pObject = (CThreadedSocket*)pParam; if (pObject == NULL ) { return SOCKET_ERROR; // if pObject is not valid } return pObject->SocketWriter(); } UINT CThreadedSocket::SocketReader( ) { int err; int sock_err; unsigned long readcount = 0; fd_set readfds; fd_set writefds; fd_set exceptfds; TRACE( "CThreadedSocket::SocketReader( START thread_id =( 0x%x) )\n",reader_id); FD_ZERO(&readfds); FD_ZERO(&writefds); FD_ZERO(&exceptfds); while ( m_hSocket != INVALID_SOCKET ) { FD_SET(m_hSocket, &readfds); if ( connect_pending ) FD_SET(m_hSocket, &writefds); else FD_CLR(m_hSocket, &writefds); FD_SET(m_hSocket, &exceptfds); err = select( 1, &readfds, &writefds, &exceptfds, NULL ); if (m_hSocket == INVALID_SOCKET) { TRACE( "CThreadedSocket::SocketThread no more Socket *********** !thread_id =( 0x%x) \n",reader_id); return -1; } sock_err = this->GetLastError(); switch ( err ) { case 0: TRACE( "CThreadedSocket::SocketThread Unhandled Timeout Event !\n"); break; case SOCKET_ERROR: TRACE( "CThreadedSocket::SocketReader( select error thread_id =( 0x%x) )\n",reader_id); if ( (sock_err != WSAENOTSOCK ) && ( m_hSocket != INVALID_SOCKET )) // could be Invalid if close when in select Close(); h_reader = NULL; return sock_err; break; default: if ( FD_ISSET(m_hSocket, &readfds) ) { readcount = 0; IOCtl( FIONREAD, &readcount ); if ( listen_mode ) OnAccept( sock_err ); else if ( ! readcount ) OnClose( sock_err ); else OnReceive(sock_err); } if ( FD_ISSET(m_hSocket, &writefds) ) { if ( connect_pending ) { OnConnect( sock_err ); connect_pending = false; } } if ( FD_ISSET(m_hSocket, &exceptfds) ) OnOutOfBandData( sock_err ); break; } } if (m_hSocket != INVALID_SOCKET) Close(); h_reader = NULL; TRACE( "CThreadedSocket::SocketReader( END thread_id =( 0x%x) )\n",reader_id); return 0; } UINT CThreadedSocket::SocketWriter( ) { int err; int sock_err; fd_set writefds; TRACE( "CThreadedSocket::SocketWriter( START thread_id =( 0x%x) )\n",writer_id); FD_ZERO(&writefds); while ( m_hSocket != INVALID_SOCKET ) { DWORD dwWaitResult; // Wait for message to send dwWaitResult = WaitForSingleObject( send_count, INFINITE ); switch (dwWaitResult) { // The semaphore object was signaled. case WAIT_OBJECT_0: // OK to really send the message. break; // Semaphore was nonsignaled, so a time-out occurred. case WAIT_TIMEOUT: case WAIT_FAILED: default: TRACE( "CThreadedSocket::SocketWriter( WaitForSingleObject error thread_id =( %d) )\n",writer_id); if (m_hSocket != INVALID_SOCKET) Close(); h_writer = NULL; return this->GetLastError(); break; } if (m_hSocket != INVALID_SOCKET) { FD_SET(m_hSocket, &writefds); err = select( 1, NULL, &writefds, NULL, NULL ); sock_err = this->GetLastError(); switch ( err ) { case 0: TRACE( "CThreadedSocket::SocketThread Unhandled Timeout Event !\n"); break; case SOCKET_ERROR: TRACE( "CThreadedSocketException( select )\n"); if ( (sock_err != WSAENOTSOCK ) && ( m_hSocket != INVALID_SOCKET )) // could be Invalid if close when in select Close(); h_reader = NULL; return sock_err; break; default: if ( FD_ISSET(m_hSocket, &writefds) ) { OnSend( sock_err ); } break; } } } Close(); h_writer = NULL; TRACE( "CThreadedSocket::SocketWriter( END thread_id =( 0x%x) )\n",writer_id); return 0; }