// BufferedSocket.cpp: implementation of the CBufferedSocket class. // ////////////////////////////////////////////////////////////////////// #include "IvyStdAfx.h" #include "BufferedSocket.h" #define BUFFER_SIZE 4096 ////////////////////////////////////////////////////////////////////// // Construction/Destruction ////////////////////////////////////////////////////////////////////// CBufferedSocket::CBufferedSocket() { separator = '\n'; buffer_size = BUFFER_SIZE; buffer = (char*)malloc( buffer_size ); current_ptr = buffer; connected = false; InitializeCriticalSection( &m_CritSection ); } CBufferedSocket::~CBufferedSocket() { free( buffer ); } void CBufferedSocket::Accept(CBufferedSocket& rConnectedSocket, SOCKADDR* lpSockAddr , int* lpSockAddrLen ) { CThreadedSocket::Accept(rConnectedSocket, lpSockAddr , lpSockAddrLen ); rConnectedSocket.connected = true; } void CBufferedSocket::OnReceive(int nErrorCode) { char *ptr_sep; char *ptr; size_t nb_to_read = 0; size_t nb; char *tmp_buf; SOCKADDR addr; size_t len = sizeof( addr ); /* limitation taille buffer */ nb_to_read = buffer_size - (current_ptr - buffer ); if( nb_to_read == 0 ) { buffer_size *= 2; /* twice old size */ tmp_buf = (char*)realloc( buffer, buffer_size ); if (!tmp_buf ) { TRACE("HandleSocket Buffer Memory Alloc Error\n"); exit(0); } buffer = tmp_buf; TRACE( "Buffer Limit reached realloc new size %ld\n", buffer_size ); nb_to_read = buffer_size - (current_ptr - buffer ); } nb = ReceiveFrom( current_ptr, nb_to_read, &addr, &len, 0/*MSG_PARTIAL*/ ); ASSERT ( memchr( current_ptr, 0, nb) == NULL ); if ( nb == SOCKET_ERROR ) { int err = this->GetLastError(); if ( err != WSAESHUTDOWN ) // shutdown by remote side ? TRACE("error Receive %d socket %d\n",this->GetLastError(),m_hSocket); Close(); return; } if ( nb == 0 ) // shutdown by remote side ? { Close(); return; } current_ptr += nb; ptr = buffer; while ((ptr_sep = (char*)memchr (ptr, separator, current_ptr - ptr ))) { *ptr_sep = '\0'; //TRACE("message %s\n", ptr ); OnReceive( ptr ); ptr = ++ptr_sep; //ASSERT( ptr < (buffer + buffer_size) ); } if (ptr < current_ptr ) { /* recopie ligne incomplete au debut du buffer */ len = current_ptr - ptr; memcpy (buffer, ptr, len ); current_ptr = buffer + len; } else { current_ptr = buffer; } } void CBufferedSocket::OnReceive( char *line ) { } void CBufferedSocket::SetSeparator( char sep ) { separator = sep; } void CBufferedSocket::OnSend( int nErrorCode ) { ivy::string msg; bool empty; // on essaye de garder la section critique a l'interieur de la boucle // pour permettre l'entrelacement avec la partie emetrice do { EnterCriticalSection( &m_CritSection ); empty = buf_out.empty(); if ( !empty ) msg = buf_out.front(); LeaveCriticalSection( &m_CritSection ); if ( !empty ) { // TRACE("CBufferedSocket::OnSend Sending buffer %s\n",msg.c_str()); size_t lg = msg.length(); size_t sent = CThreadedSocket::Send( msg.c_str(), lg ); if ( sent == lg ) { // emission correcte on enleve le msg de la file EnterCriticalSection( &m_CritSection ); buf_out.pop_front(); LeaveCriticalSection( &m_CritSection ); } else { // erreur int err = this->GetLastError(); switch ( err ){ case WSAEWOULDBLOCK: // si la file est pleine on sort en silence ! break; case WSAECONNABORTED: // broken pipe on sort en silence break; default: TRACE("CBufferedSocket::OnWakeup error %d Sending buffer %s \n",err,msg.c_str()); break; } break; } } } while ( !empty ); } void CBufferedSocket::OnConnect( int nErrorCode ) { connected = true; StartWriter(); // TRACE("CBufferedSocket::OnConnect buffer size %d\n",buf_out.size()); } void CBufferedSocket::Send ( const char * data ) { //BOOL toBeSignaled; EnterCriticalSection( &m_CritSection ); //toBeSignaled = buf_out.IsEmpty() && connected; buf_out.push_back( ivy::string(data) ); LeaveCriticalSection( &m_CritSection ); //TRACE("CBufferedSocket::Send Adding buffer to send count %d\n",buf_out.size()); // if ( connected ) // { bool ok = SignalWriter(); if ( !ok ) TRACE( "CBufferedSocket::SignalWriter Error %d\n", this->GetLastError()); // } }