From 6ca781b8a38474ab428d5fcb3b489dfe3e974334 Mon Sep 17 00:00:00 2001 From: bustico Date: Fri, 21 Mar 2008 09:03:34 +0000 Subject: - numerous fixes - socket in non blocking mode (resolve some deadlock, and agent are immune to another agent beeing blocked) --- src/ivysocket.c | 323 ++++++++++++++++++++++++++++++++------------------------ 1 file changed, 185 insertions(+), 138 deletions(-) (limited to 'src/ivysocket.c') diff --git a/src/ivysocket.c b/src/ivysocket.c index f9f5bd1..74bd15e 100644 --- a/src/ivysocket.c +++ b/src/ivysocket.c @@ -26,7 +26,7 @@ #include #include #include -//#include +#include #ifdef WIN32 #define close closesocket @@ -42,15 +42,15 @@ #include #endif +#include "param.h" #include "list.h" #include "ivychannel.h" #include "ivysocket.h" #include "ivyloop.h" #include "ivybuffer.h" +#include "ivyfifo.h" #include "ivydebug.h" -#define BUFFER_SIZE 4096 /* taille buffer initiale on multiple pas deux a chaque realloc */ - struct _server { Server next; HANDLE fd; @@ -58,6 +58,7 @@ struct _server { unsigned short port; void *(*create)(Client client); void (*handle_delete)(Client client, void *data); + void (*handle_decongestion)(Client client, void *data); SocketInterpretation interpretation; }; @@ -66,14 +67,18 @@ struct _client { HANDLE fd; Channel channel; unsigned short port; + char app_uuid[128]; struct sockaddr_in from; SocketInterpretation interpretation; void (*handle_delete)(Client client, void *data); + void (*handle_decongestion)(Client client, void *data); char terminator; /* character delimiter of the message */ /* Buffer de reception */ long buffer_size; char *buffer; /* dynamicaly reallocated */ char *ptr; + /* Buffer d'emission */ + IvyFifoBuffer *ifb; /* le buffer circulaire en cas de congestion */ /* user data */ void *data; #ifdef OPENMP @@ -90,6 +95,10 @@ static int debug_send = 0; WSADATA WsaData; #endif + +static SendState BufferizedSocketSendRaw (const Client client, const char *buffer, const int len ); + + void SocketInit() { if ( getenv( "IVY_DEBUG_SEND" )) debug_send = 1; @@ -106,10 +115,15 @@ static void DeleteSocket(void *data) #ifdef OPENMP omp_destroy_lock (&(client->fdLock)); #endif - + if (client->ifb != NULL) { + IvyFifoDelete (client->ifb); + client->ifb = NULL; + } + IVY_LIST_REMOVE (clients_list, client ); } + static void DeleteServerSocket(void *data) { Server server = (Server )data; @@ -121,6 +135,8 @@ static void DeleteServerSocket(void *data) close (server->fd ); IVY_LIST_REMOVE (servers_list, server); } + + static void HandleSocket (Channel channel, HANDLE fd, void *data) { Client client = (Client)data; @@ -171,7 +187,7 @@ static void HandleSocket (Channel channel, HANDLE fd, void *data) if (ptr < client->ptr ) { /* recopie ligne incomplete au debut du buffer */ len = client->ptr - ptr; - memcpy (client->buffer, ptr, len ); + memmove (client->buffer, ptr, len ); client->ptr = client->buffer + len; } else @@ -180,6 +196,25 @@ static void HandleSocket (Channel channel, HANDLE fd, void *data) } } + + +static void HandleCongestionWrite (Channel channel, HANDLE fd, void *data) +{ + Client client = (Client)data; + + if (IvyFifoSendSocket (client->ifb, fd) == 0) { + // Not congestionned anymore + IvyChannelClearWritableEvent (channel); + // printf ("DBG> Socket *DE*congestionnee\n"); + IvyFifoDelete (client->ifb); + client->ifb = NULL; + if (client->handle_decongestion ) + (*client->handle_decongestion) (client, client->data ); + + } +} + + static void HandleServer(Channel channel, HANDLE fd, void *data) { Server server = (Server ) data; @@ -187,7 +222,7 @@ static void HandleServer(Channel channel, HANDLE fd, void *data) HANDLE ns; socklen_t addrlen; struct sockaddr_in remote2; - // long socketFlag; + long socketFlag; TRACE( "Accepting Connection...\n"); @@ -202,7 +237,7 @@ static void HandleServer(Channel channel, HANDLE fd, void *data) IVY_LIST_ADD_START (clients_list, client ); - client->buffer_size = BUFFER_SIZE; + client->buffer_size = IVY_BUFFER_SIZE; client->buffer = malloc( client->buffer_size ); if (!client->buffer ) { @@ -212,17 +247,21 @@ static void HandleServer(Channel channel, HANDLE fd, void *data) client->terminator = '\n'; client->from = remote2; client->fd = ns; + client->ifb = NULL; + strcpy (client->app_uuid, "init by HandleServer"); -/* socketFlag = fcntl (client->fd, F_GETFD); */ -/* if (fcntl (client->fd, F_SETFD, socketFlag|O_NONBLOCK)) { */ -/* fprintf(stderr,"Warning : Setting socket in nonblock mode FAILED\n"); */ -/* } */ + socketFlag = fcntl (client->fd, F_GETFD); + if (fcntl (client->fd, F_SETFD, socketFlag|O_NONBLOCK)) { + fprintf(stderr,"Warning : Setting socket in nonblock mode FAILED\n"); + } - client->channel = IvyChannelAdd (ns, client, DeleteSocket, HandleSocket ); + client->channel = IvyChannelAdd (ns, client, DeleteSocket, HandleSocket, + HandleCongestionWrite); client->interpretation = server->interpretation; client->ptr = client->buffer; client->handle_delete = server->handle_delete; + client->handle_decongestion = server->handle_decongestion; client->data = (*server->create) (client ); #ifdef OPENMP omp_init_lock (&(client->fdLock)); @@ -236,6 +275,7 @@ static void HandleServer(Channel channel, HANDLE fd, void *data) Server SocketServer(unsigned short port, void*(*create)(Client client), void(*handle_delete)(Client client, void *data), + void(*handle_decongestion)(Client client, void *data), void(*interpretation) (Client client, void *data, char *ligne)) { Server server; @@ -255,24 +295,27 @@ Server SocketServer(unsigned short port, local.sin_addr.s_addr = INADDR_ANY; local.sin_port = htons (port); + if (setsockopt(fd,SOL_SOCKET,SO_REUSEADDR,(char*)&one,sizeof(one)) < 0) - { + { #ifdef WIN32 - fprintf(stderr," setsockopt %d\n",WSAGetLastError()); + fprintf(stderr," setsockopt %d\n",WSAGetLastError()); #endif - perror ("*** set socket option SO_REUSEADDR ***"); - exit(0); - } + perror ("*** set socket option SO_REUSEADDR ***"); + exit(0); + } #ifdef SO_REUSEPORT if (setsockopt (fd, SOL_SOCKET, SO_REUSEPORT, (char *)&one, sizeof (one)) < 0) - { - perror ("*** set socket option REUSEPORT ***"); - exit(0); - } + { + perror ("*** set socket option REUSEPORT ***"); + exit(0); + } #endif + + if (bind(fd, (struct sockaddr *)&local, sizeof(local)) < 0) { perror ("*** bind ***"); @@ -294,9 +337,11 @@ Server SocketServer(unsigned short port, IVY_LIST_ADD_START (servers_list, server ); server->fd = fd; - server->channel = IvyChannelAdd (fd, server, DeleteServerSocket, HandleServer ); + server->channel = IvyChannelAdd (fd, server, DeleteServerSocket, + HandleServer, NULL); server->create = create; server->handle_delete = handle_delete; + server->handle_decongestion = handle_decongestion; server->interpretation = interpretation; server->port = ntohs(local.sin_port); IVY_LIST_ADD_END (servers_list, server ); @@ -359,136 +404,111 @@ void SocketClose (Client client ) IvyChannelRemove (client->channel ); } -int SocketSendRaw (const Client client, const char *buffer, const int len ) +SendState SocketSendRaw (const Client client, const char *buffer, const int len ) { - int err; - int waiting= 0; - - if (!client) - return waiting; - + SendState state; + + if (!client) + return SendParamError; + #ifdef OPENMP - omp_set_lock (&(client->fdLock)); + omp_set_lock (&(client->fdLock)); #endif + + state = BufferizedSocketSendRaw (client, buffer, len); - if ( debug_send ) - { - /* try to determine if we are going to block */ - fd_set wrset; - int ready; - struct timeval timeout; - timeout.tv_usec = 0; - timeout.tv_sec = 0; - FD_ZERO(&wrset); - FD_SET( client->fd, &wrset ); - ready = select(client->fd+1, 0, &wrset, 0, &timeout); - /* fprintf(stderr,"Ivy: select ready=%d fd=%d\n",ready,FD_ISSET( client->fd, &wrset ) ); */ - if(ready < 0) { - perror("Ivy: SocketSendRaw select"); - } - if ( !FD_ISSET( client->fd, &wrset ) ) - { - fprintf(stderr,"Ivy: Client Queue Full Waiting.........."); - waiting = 1; - } - - } - - err = send (client->fd, buffer, len, 0 ); - if (err != len ) - perror ("*** send ***"); - if ( debug_send && waiting ) - { - fprintf(stderr,"... OK\n"); - } #ifdef OPENMP - omp_unset_lock (&(client->fdLock)); + omp_unset_lock (&(client->fdLock)); #endif - // GROS DEBUG -/* { */ -/* char *toPrint = strndup (buffer, len); */ -/* printf ("DBG> SocketSendRaw [%d] -> '%s'\n", len, toPrint); */ -/* free (toPrint); */ -/* } */ - // END DEBUG - - return waiting; + + return state; } -int SocketSendRawWithId( const Client client, const char *id, const char *buffer, const int len ) + + + +static SendState BufferizedSocketSendRaw (const Client client, const char *buffer, const int len ) { - int err; - int waiting= 0; + ssize_t reallySent; + SendState state; + + if (client->ifb != NULL) { + // Socket en congestion : on rajoute juste le flux dans le buffer, + // quand la socket sera dispo en ecriture, le select appellera la callback + // pour vider ce buffer + IvyFifoWrite (client->ifb, buffer, len); + state = IvyFifoIsFull (client->ifb) ? SendStateFifoFull : SendStillCongestion; + } else { + // on tente d'ecrire direct dans la socket + reallySent = send (client->fd, buffer, len, MSG_DONTWAIT); + if (reallySent == len) { + state = SendOk; // PAS CONGESTIONNEE + } else if (reallySent == -1) { + state = SendError; // ERREUR + } else { + // socket congestionnée + // on initialise une fifo pour accumuler les données + client->ifb = IvyFifoNew (); + IvyFifoWrite (client->ifb, &(buffer[reallySent]), len-reallySent); + // on ajoute un fdset pour que le select appelle une callback pour vider + // le buffer quand la socket sera à nouveau libre + IvyChannelAddWritableEvent (client->channel); + state = SendStateChangeToCongestion; + } + } + return (state); +} - if (!client) - return waiting; - -#ifdef OPENMP - omp_set_lock (&(client->fdLock)); -#endif - if ( debug_send ) - { - /* try to determine if we are going to block */ - fd_set wrset; - int ready; - struct timeval timeout; - timeout.tv_usec = 0; - timeout.tv_sec = 0; - FD_ZERO(&wrset); - FD_SET( client->fd, &wrset ); - ready = select(client->fd+1, 0, &wrset, 0, &timeout); - /* fprintf(stderr,"Ivy: select ready=%d fd=%d\n",ready,FD_ISSET( client->fd, &wrset ) ); */ - if(ready < 0) { - perror("Ivy: SocketSendRaw select"); - } - if ( !FD_ISSET( client->fd, &wrset ) ) - { - fprintf(stderr,"Ivy: Client Queue Full Waiting.........."); - waiting = 1; - } - } +SendState SocketSendRawWithId( const Client client, const char *id, const char *buffer, const int len ) +{ + SendState s1, s2; + +#ifdef OPENMP + omp_set_lock (&(client->fdLock)); +#endif + + s1 = BufferizedSocketSendRaw (client, id, strlen (id)); - send (client->fd, id, strlen (id), 0 ); - err = send (client->fd, buffer, len, 0 ); - if (err != len ) - perror ("*** send ***"); - if ( debug_send && waiting ) - { - fprintf(stderr,"... OK\n"); - } + s2 = BufferizedSocketSendRaw (client, buffer, len); + #ifdef OPENMP - omp_unset_lock (&(client->fdLock)); + omp_unset_lock (&(client->fdLock)); #endif + + if (s1 == SendStateChangeToCongestion) { + // si le passage en congestion s'est fait sur l'envoi de l'id + s2 = s1; + } - return waiting; + return (s2); } + void SocketSetData (Client client, void *data ) { - if (client) - client->data = data; + if (client) { + client->data = data; + } } -int SocketSend (Client client, char *fmt, ... ) +SendState SocketSend (Client client, char *fmt, ... ) { - int waiting = 0; - static IvyBuffer buffer = {NULL, 0, 0 }; /* Use satic mem to eliminate multiple call to malloc /free */ + SendState state; + static IvyBuffer buffer = {NULL, 0, 0 }; /* Use static mem to eliminate multiple call to malloc /free */ #ifdef OPENMP #pragma omp threadprivate (buffer) #endif - va_list ap; - int len; - if (!client) - return waiting; - va_start (ap, fmt ); - buffer.offset = 0; - len = make_message (&buffer, fmt, ap ); - waiting = SocketSendRaw (client, buffer.data, len ); - va_end (ap ); - return waiting; + va_list ap; + int len; + va_start (ap, fmt ); + buffer.offset = 0; + len = make_message (&buffer, fmt, ap ); + state = SocketSendRaw (client, buffer.data, len ); + va_end (ap ); + return state; } void *SocketGetData (Client client ) @@ -499,7 +519,7 @@ void *SocketGetData (Client client ) void SocketBroadcast ( char *fmt, ... ) { Client client; - static IvyBuffer buffer = {NULL, 0, 0 }; /* Use satic mem to eliminate + static IvyBuffer buffer = {NULL, 0, 0 }; /* Use static mem to eliminate multiple call to malloc /free */ #ifdef OPENMP #pragma omp threadprivate (buffer) @@ -522,7 +542,8 @@ Ouverture d'un canal TCP/IP en mode client Client SocketConnect (char * host, unsigned short port, void *data, SocketInterpretation interpretation, - void (*handle_delete)(Client client, void *data) + void (*handle_delete)(Client client, void *data), + void(*handle_decongestion)(Client client, void *data) ) { struct hostent *rhost; @@ -532,19 +553,20 @@ Client SocketConnect (char * host, unsigned short port, return NULL; } return SocketConnectAddr ((struct in_addr*)(rhost->h_addr), port, data, - interpretation, handle_delete); + interpretation, handle_delete, handle_decongestion); } Client SocketConnectAddr (struct in_addr * addr, unsigned short port, void *data, SocketInterpretation interpretation, - void (*handle_delete)(Client client, void *data) + void (*handle_delete)(Client client, void *data), + void(*handle_decongestion)(Client client, void *data) ) { HANDLE handle; Client client; struct sockaddr_in remote; - // long socketFlag; + long socketFlag; remote.sin_family = AF_INET; remote.sin_addr = *addr; @@ -559,14 +581,14 @@ Client SocketConnectAddr (struct in_addr * addr, unsigned short port, perror ("*** client connect ***"); return NULL; }; -/* socketFlag = fcntl (handle, F_GETFD); */ -/* if (fcntl (handle, F_SETFD, socketFlag|O_NONBLOCK)) { */ -/* fprintf(stderr,"Warning : Setting socket in nonblock mode FAILED\n"); */ -/* } */ + socketFlag = fcntl (handle, F_GETFD); + if (fcntl (handle, F_SETFD, socketFlag|O_NONBLOCK)) { + fprintf(stderr,"Warning : Setting socket in nonblock mode FAILED\n"); + } IVY_LIST_ADD_START(clients_list, client ); - client->buffer_size = BUFFER_SIZE; + client->buffer_size = IVY_BUFFER_SIZE; client->buffer = malloc( client->buffer_size ); if (!client->buffer ) { @@ -575,14 +597,19 @@ Client SocketConnectAddr (struct in_addr * addr, unsigned short port, } client->terminator = '\n'; client->fd = handle; - client->channel = IvyChannelAdd (handle, client, DeleteSocket, HandleSocket ); + client->channel = IvyChannelAdd (handle, client, DeleteSocket, + HandleSocket, HandleCongestionWrite ); client->interpretation = interpretation; client->ptr = client->buffer; client->data = data; client->handle_delete = handle_delete; + client->handle_decongestion = handle_decongestion; client->from.sin_family = AF_INET; client->from.sin_addr = *addr; client->from.sin_port = htons (port); + client->ifb = NULL; + strcpy (client->app_uuid, "init by SocketConnectAddr"); + #ifdef OPENMP omp_init_lock (&(client->fdLock)); @@ -696,7 +723,7 @@ Client SocketBroadcastCreate (unsigned short port, IVY_LIST_ADD_START(clients_list, client ); - client->buffer_size = BUFFER_SIZE; + client->buffer_size = IVY_BUFFER_SIZE; client->buffer = malloc( client->buffer_size ); if (!client->buffer ) { @@ -705,10 +732,14 @@ Client SocketBroadcastCreate (unsigned short port, } client->terminator = '\n'; client->fd = handle; - client->channel = IvyChannelAdd (handle, client, DeleteSocket, HandleSocket ); + client->channel = IvyChannelAdd (handle, client, DeleteSocket, + HandleSocket, HandleCongestionWrite); client->interpretation = interpretation; client->ptr = client->buffer; client->data = data; + client->ifb = NULL; + strcpy (client->app_uuid, "init by SocketBroadcastCreate"); + #ifdef OPENMP omp_init_lock (&(client->fdLock)); #endif @@ -780,3 +811,19 @@ Multicast datagrams with initial TTL 255 are unrestricted in scope. return 1; } +extern void SocketSetUuid (Client client, const char *uuid) +{ + strncpy (client->app_uuid, uuid, sizeof (client->app_uuid)); +} + +const char* SocketGetUuid (const Client client) +{ + return client->app_uuid; +} + +extern int SocketCmpUuid (const Client c1, const Client c2) +{ + return strncmp (c1->app_uuid, c2->app_uuid, sizeof (c1->app_uuid)); +} + + -- cgit v1.1