summaryrefslogtreecommitdiff
path: root/src/ivysocket.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/ivysocket.c')
-rw-r--r--src/ivysocket.c323
1 files changed, 185 insertions, 138 deletions
diff --git a/src/ivysocket.c b/src/ivysocket.c
index f9f5bd1..74bd15e 100644
--- a/src/ivysocket.c
+++ b/src/ivysocket.c
@@ -26,7 +26,7 @@
#include <stdio.h>
#include <stdarg.h>
#include <string.h>
-//#include <fcntl.h>
+#include <fcntl.h>
#ifdef WIN32
#define close closesocket
@@ -42,15 +42,15 @@
#include <signal.h>
#endif
+#include "param.h"
#include "list.h"
#include "ivychannel.h"
#include "ivysocket.h"
#include "ivyloop.h"
#include "ivybuffer.h"
+#include "ivyfifo.h"
#include "ivydebug.h"
-#define BUFFER_SIZE 4096 /* taille buffer initiale on multiple pas deux a chaque realloc */
-
struct _server {
Server next;
HANDLE fd;
@@ -58,6 +58,7 @@ struct _server {
unsigned short port;
void *(*create)(Client client);
void (*handle_delete)(Client client, void *data);
+ void (*handle_decongestion)(Client client, void *data);
SocketInterpretation interpretation;
};
@@ -66,14 +67,18 @@ struct _client {
HANDLE fd;
Channel channel;
unsigned short port;
+ char app_uuid[128];
struct sockaddr_in from;
SocketInterpretation interpretation;
void (*handle_delete)(Client client, void *data);
+ void (*handle_decongestion)(Client client, void *data);
char terminator; /* character delimiter of the message */
/* Buffer de reception */
long buffer_size;
char *buffer; /* dynamicaly reallocated */
char *ptr;
+ /* Buffer d'emission */
+ IvyFifoBuffer *ifb; /* le buffer circulaire en cas de congestion */
/* user data */
void *data;
#ifdef OPENMP
@@ -90,6 +95,10 @@ static int debug_send = 0;
WSADATA WsaData;
#endif
+
+static SendState BufferizedSocketSendRaw (const Client client, const char *buffer, const int len );
+
+
void SocketInit()
{
if ( getenv( "IVY_DEBUG_SEND" )) debug_send = 1;
@@ -106,10 +115,15 @@ static void DeleteSocket(void *data)
#ifdef OPENMP
omp_destroy_lock (&(client->fdLock));
#endif
-
+ if (client->ifb != NULL) {
+ IvyFifoDelete (client->ifb);
+ client->ifb = NULL;
+ }
+
IVY_LIST_REMOVE (clients_list, client );
}
+
static void DeleteServerSocket(void *data)
{
Server server = (Server )data;
@@ -121,6 +135,8 @@ static void DeleteServerSocket(void *data)
close (server->fd );
IVY_LIST_REMOVE (servers_list, server);
}
+
+
static void HandleSocket (Channel channel, HANDLE fd, void *data)
{
Client client = (Client)data;
@@ -171,7 +187,7 @@ static void HandleSocket (Channel channel, HANDLE fd, void *data)
if (ptr < client->ptr )
{ /* recopie ligne incomplete au debut du buffer */
len = client->ptr - ptr;
- memcpy (client->buffer, ptr, len );
+ memmove (client->buffer, ptr, len );
client->ptr = client->buffer + len;
}
else
@@ -180,6 +196,25 @@ static void HandleSocket (Channel channel, HANDLE fd, void *data)
}
}
+
+
+static void HandleCongestionWrite (Channel channel, HANDLE fd, void *data)
+{
+ Client client = (Client)data;
+
+ if (IvyFifoSendSocket (client->ifb, fd) == 0) {
+ // Not congestionned anymore
+ IvyChannelClearWritableEvent (channel);
+ // printf ("DBG> Socket *DE*congestionnee\n");
+ IvyFifoDelete (client->ifb);
+ client->ifb = NULL;
+ if (client->handle_decongestion )
+ (*client->handle_decongestion) (client, client->data );
+
+ }
+}
+
+
static void HandleServer(Channel channel, HANDLE fd, void *data)
{
Server server = (Server ) data;
@@ -187,7 +222,7 @@ static void HandleServer(Channel channel, HANDLE fd, void *data)
HANDLE ns;
socklen_t addrlen;
struct sockaddr_in remote2;
- // long socketFlag;
+ long socketFlag;
TRACE( "Accepting Connection...\n");
@@ -202,7 +237,7 @@ static void HandleServer(Channel channel, HANDLE fd, void *data)
IVY_LIST_ADD_START (clients_list, client );
- client->buffer_size = BUFFER_SIZE;
+ client->buffer_size = IVY_BUFFER_SIZE;
client->buffer = malloc( client->buffer_size );
if (!client->buffer )
{
@@ -212,17 +247,21 @@ static void HandleServer(Channel channel, HANDLE fd, void *data)
client->terminator = '\n';
client->from = remote2;
client->fd = ns;
+ client->ifb = NULL;
+ strcpy (client->app_uuid, "init by HandleServer");
-/* socketFlag = fcntl (client->fd, F_GETFD); */
-/* if (fcntl (client->fd, F_SETFD, socketFlag|O_NONBLOCK)) { */
-/* fprintf(stderr,"Warning : Setting socket in nonblock mode FAILED\n"); */
-/* } */
+ socketFlag = fcntl (client->fd, F_GETFD);
+ if (fcntl (client->fd, F_SETFD, socketFlag|O_NONBLOCK)) {
+ fprintf(stderr,"Warning : Setting socket in nonblock mode FAILED\n");
+ }
- client->channel = IvyChannelAdd (ns, client, DeleteSocket, HandleSocket );
+ client->channel = IvyChannelAdd (ns, client, DeleteSocket, HandleSocket,
+ HandleCongestionWrite);
client->interpretation = server->interpretation;
client->ptr = client->buffer;
client->handle_delete = server->handle_delete;
+ client->handle_decongestion = server->handle_decongestion;
client->data = (*server->create) (client );
#ifdef OPENMP
omp_init_lock (&(client->fdLock));
@@ -236,6 +275,7 @@ static void HandleServer(Channel channel, HANDLE fd, void *data)
Server SocketServer(unsigned short port,
void*(*create)(Client client),
void(*handle_delete)(Client client, void *data),
+ void(*handle_decongestion)(Client client, void *data),
void(*interpretation) (Client client, void *data, char *ligne))
{
Server server;
@@ -255,24 +295,27 @@ Server SocketServer(unsigned short port,
local.sin_addr.s_addr = INADDR_ANY;
local.sin_port = htons (port);
+
if (setsockopt(fd,SOL_SOCKET,SO_REUSEADDR,(char*)&one,sizeof(one)) < 0)
- {
+ {
#ifdef WIN32
- fprintf(stderr," setsockopt %d\n",WSAGetLastError());
+ fprintf(stderr," setsockopt %d\n",WSAGetLastError());
#endif
- perror ("*** set socket option SO_REUSEADDR ***");
- exit(0);
- }
+ perror ("*** set socket option SO_REUSEADDR ***");
+ exit(0);
+ }
#ifdef SO_REUSEPORT
if (setsockopt (fd, SOL_SOCKET, SO_REUSEPORT, (char *)&one, sizeof (one)) < 0)
- {
- perror ("*** set socket option REUSEPORT ***");
- exit(0);
- }
+ {
+ perror ("*** set socket option REUSEPORT ***");
+ exit(0);
+ }
#endif
+
+
if (bind(fd, (struct sockaddr *)&local, sizeof(local)) < 0)
{
perror ("*** bind ***");
@@ -294,9 +337,11 @@ Server SocketServer(unsigned short port,
IVY_LIST_ADD_START (servers_list, server );
server->fd = fd;
- server->channel = IvyChannelAdd (fd, server, DeleteServerSocket, HandleServer );
+ server->channel = IvyChannelAdd (fd, server, DeleteServerSocket,
+ HandleServer, NULL);
server->create = create;
server->handle_delete = handle_delete;
+ server->handle_decongestion = handle_decongestion;
server->interpretation = interpretation;
server->port = ntohs(local.sin_port);
IVY_LIST_ADD_END (servers_list, server );
@@ -359,136 +404,111 @@ void SocketClose (Client client )
IvyChannelRemove (client->channel );
}
-int SocketSendRaw (const Client client, const char *buffer, const int len )
+SendState SocketSendRaw (const Client client, const char *buffer, const int len )
{
- int err;
- int waiting= 0;
-
- if (!client)
- return waiting;
-
+ SendState state;
+
+ if (!client)
+ return SendParamError;
+
#ifdef OPENMP
- omp_set_lock (&(client->fdLock));
+ omp_set_lock (&(client->fdLock));
#endif
+
+ state = BufferizedSocketSendRaw (client, buffer, len);
- if ( debug_send )
- {
- /* try to determine if we are going to block */
- fd_set wrset;
- int ready;
- struct timeval timeout;
- timeout.tv_usec = 0;
- timeout.tv_sec = 0;
- FD_ZERO(&wrset);
- FD_SET( client->fd, &wrset );
- ready = select(client->fd+1, 0, &wrset, 0, &timeout);
- /* fprintf(stderr,"Ivy: select ready=%d fd=%d\n",ready,FD_ISSET( client->fd, &wrset ) ); */
- if(ready < 0) {
- perror("Ivy: SocketSendRaw select");
- }
- if ( !FD_ISSET( client->fd, &wrset ) )
- {
- fprintf(stderr,"Ivy: Client Queue Full Waiting..........");
- waiting = 1;
- }
-
- }
-
- err = send (client->fd, buffer, len, 0 );
- if (err != len )
- perror ("*** send ***");
- if ( debug_send && waiting )
- {
- fprintf(stderr,"... OK\n");
- }
#ifdef OPENMP
- omp_unset_lock (&(client->fdLock));
+ omp_unset_lock (&(client->fdLock));
#endif
- // GROS DEBUG
-/* { */
-/* char *toPrint = strndup (buffer, len); */
-/* printf ("DBG> SocketSendRaw [%d] -> '%s'\n", len, toPrint); */
-/* free (toPrint); */
-/* } */
- // END DEBUG
-
- return waiting;
+
+ return state;
}
-int SocketSendRawWithId( const Client client, const char *id, const char *buffer, const int len )
+
+
+
+static SendState BufferizedSocketSendRaw (const Client client, const char *buffer, const int len )
{
- int err;
- int waiting= 0;
+ ssize_t reallySent;
+ SendState state;
+
+ if (client->ifb != NULL) {
+ // Socket en congestion : on rajoute juste le flux dans le buffer,
+ // quand la socket sera dispo en ecriture, le select appellera la callback
+ // pour vider ce buffer
+ IvyFifoWrite (client->ifb, buffer, len);
+ state = IvyFifoIsFull (client->ifb) ? SendStateFifoFull : SendStillCongestion;
+ } else {
+ // on tente d'ecrire direct dans la socket
+ reallySent = send (client->fd, buffer, len, MSG_DONTWAIT);
+ if (reallySent == len) {
+ state = SendOk; // PAS CONGESTIONNEE
+ } else if (reallySent == -1) {
+ state = SendError; // ERREUR
+ } else {
+ // socket congestionnée
+ // on initialise une fifo pour accumuler les données
+ client->ifb = IvyFifoNew ();
+ IvyFifoWrite (client->ifb, &(buffer[reallySent]), len-reallySent);
+ // on ajoute un fdset pour que le select appelle une callback pour vider
+ // le buffer quand la socket sera à nouveau libre
+ IvyChannelAddWritableEvent (client->channel);
+ state = SendStateChangeToCongestion;
+ }
+ }
+ return (state);
+}
- if (!client)
- return waiting;
-
-#ifdef OPENMP
- omp_set_lock (&(client->fdLock));
-#endif
- if ( debug_send )
- {
- /* try to determine if we are going to block */
- fd_set wrset;
- int ready;
- struct timeval timeout;
- timeout.tv_usec = 0;
- timeout.tv_sec = 0;
- FD_ZERO(&wrset);
- FD_SET( client->fd, &wrset );
- ready = select(client->fd+1, 0, &wrset, 0, &timeout);
- /* fprintf(stderr,"Ivy: select ready=%d fd=%d\n",ready,FD_ISSET( client->fd, &wrset ) ); */
- if(ready < 0) {
- perror("Ivy: SocketSendRaw select");
- }
- if ( !FD_ISSET( client->fd, &wrset ) )
- {
- fprintf(stderr,"Ivy: Client Queue Full Waiting..........");
- waiting = 1;
- }
- }
+SendState SocketSendRawWithId( const Client client, const char *id, const char *buffer, const int len )
+{
+ SendState s1, s2;
+
+#ifdef OPENMP
+ omp_set_lock (&(client->fdLock));
+#endif
+
+ s1 = BufferizedSocketSendRaw (client, id, strlen (id));
- send (client->fd, id, strlen (id), 0 );
- err = send (client->fd, buffer, len, 0 );
- if (err != len )
- perror ("*** send ***");
- if ( debug_send && waiting )
- {
- fprintf(stderr,"... OK\n");
- }
+ s2 = BufferizedSocketSendRaw (client, buffer, len);
+
#ifdef OPENMP
- omp_unset_lock (&(client->fdLock));
+ omp_unset_lock (&(client->fdLock));
#endif
+
+ if (s1 == SendStateChangeToCongestion) {
+ // si le passage en congestion s'est fait sur l'envoi de l'id
+ s2 = s1;
+ }
- return waiting;
+ return (s2);
}
+
void SocketSetData (Client client, void *data )
{
- if (client)
- client->data = data;
+ if (client) {
+ client->data = data;
+ }
}
-int SocketSend (Client client, char *fmt, ... )
+SendState SocketSend (Client client, char *fmt, ... )
{
- int waiting = 0;
- static IvyBuffer buffer = {NULL, 0, 0 }; /* Use satic mem to eliminate multiple call to malloc /free */
+ SendState state;
+ static IvyBuffer buffer = {NULL, 0, 0 }; /* Use static mem to eliminate multiple call to malloc /free */
#ifdef OPENMP
#pragma omp threadprivate (buffer)
#endif
- va_list ap;
- int len;
- if (!client)
- return waiting;
- va_start (ap, fmt );
- buffer.offset = 0;
- len = make_message (&buffer, fmt, ap );
- waiting = SocketSendRaw (client, buffer.data, len );
- va_end (ap );
- return waiting;
+ va_list ap;
+ int len;
+ va_start (ap, fmt );
+ buffer.offset = 0;
+ len = make_message (&buffer, fmt, ap );
+ state = SocketSendRaw (client, buffer.data, len );
+ va_end (ap );
+ return state;
}
void *SocketGetData (Client client )
@@ -499,7 +519,7 @@ void *SocketGetData (Client client )
void SocketBroadcast ( char *fmt, ... )
{
Client client;
- static IvyBuffer buffer = {NULL, 0, 0 }; /* Use satic mem to eliminate
+ static IvyBuffer buffer = {NULL, 0, 0 }; /* Use static mem to eliminate
multiple call to malloc /free */
#ifdef OPENMP
#pragma omp threadprivate (buffer)
@@ -522,7 +542,8 @@ Ouverture d'un canal TCP/IP en mode client
Client SocketConnect (char * host, unsigned short port,
void *data,
SocketInterpretation interpretation,
- void (*handle_delete)(Client client, void *data)
+ void (*handle_delete)(Client client, void *data),
+ void(*handle_decongestion)(Client client, void *data)
)
{
struct hostent *rhost;
@@ -532,19 +553,20 @@ Client SocketConnect (char * host, unsigned short port,
return NULL;
}
return SocketConnectAddr ((struct in_addr*)(rhost->h_addr), port, data,
- interpretation, handle_delete);
+ interpretation, handle_delete, handle_decongestion);
}
Client SocketConnectAddr (struct in_addr * addr, unsigned short port,
void *data,
SocketInterpretation interpretation,
- void (*handle_delete)(Client client, void *data)
+ void (*handle_delete)(Client client, void *data),
+ void(*handle_decongestion)(Client client, void *data)
)
{
HANDLE handle;
Client client;
struct sockaddr_in remote;
- // long socketFlag;
+ long socketFlag;
remote.sin_family = AF_INET;
remote.sin_addr = *addr;
@@ -559,14 +581,14 @@ Client SocketConnectAddr (struct in_addr * addr, unsigned short port,
perror ("*** client connect ***");
return NULL;
};
-/* socketFlag = fcntl (handle, F_GETFD); */
-/* if (fcntl (handle, F_SETFD, socketFlag|O_NONBLOCK)) { */
-/* fprintf(stderr,"Warning : Setting socket in nonblock mode FAILED\n"); */
-/* } */
+ socketFlag = fcntl (handle, F_GETFD);
+ if (fcntl (handle, F_SETFD, socketFlag|O_NONBLOCK)) {
+ fprintf(stderr,"Warning : Setting socket in nonblock mode FAILED\n");
+ }
IVY_LIST_ADD_START(clients_list, client );
- client->buffer_size = BUFFER_SIZE;
+ client->buffer_size = IVY_BUFFER_SIZE;
client->buffer = malloc( client->buffer_size );
if (!client->buffer )
{
@@ -575,14 +597,19 @@ Client SocketConnectAddr (struct in_addr * addr, unsigned short port,
}
client->terminator = '\n';
client->fd = handle;
- client->channel = IvyChannelAdd (handle, client, DeleteSocket, HandleSocket );
+ client->channel = IvyChannelAdd (handle, client, DeleteSocket,
+ HandleSocket, HandleCongestionWrite );
client->interpretation = interpretation;
client->ptr = client->buffer;
client->data = data;
client->handle_delete = handle_delete;
+ client->handle_decongestion = handle_decongestion;
client->from.sin_family = AF_INET;
client->from.sin_addr = *addr;
client->from.sin_port = htons (port);
+ client->ifb = NULL;
+ strcpy (client->app_uuid, "init by SocketConnectAddr");
+
#ifdef OPENMP
omp_init_lock (&(client->fdLock));
@@ -696,7 +723,7 @@ Client SocketBroadcastCreate (unsigned short port,
IVY_LIST_ADD_START(clients_list, client );
- client->buffer_size = BUFFER_SIZE;
+ client->buffer_size = IVY_BUFFER_SIZE;
client->buffer = malloc( client->buffer_size );
if (!client->buffer )
{
@@ -705,10 +732,14 @@ Client SocketBroadcastCreate (unsigned short port,
}
client->terminator = '\n';
client->fd = handle;
- client->channel = IvyChannelAdd (handle, client, DeleteSocket, HandleSocket );
+ client->channel = IvyChannelAdd (handle, client, DeleteSocket,
+ HandleSocket, HandleCongestionWrite);
client->interpretation = interpretation;
client->ptr = client->buffer;
client->data = data;
+ client->ifb = NULL;
+ strcpy (client->app_uuid, "init by SocketBroadcastCreate");
+
#ifdef OPENMP
omp_init_lock (&(client->fdLock));
#endif
@@ -780,3 +811,19 @@ Multicast datagrams with initial TTL 255 are unrestricted in scope.
return 1;
}
+extern void SocketSetUuid (Client client, const char *uuid)
+{
+ strncpy (client->app_uuid, uuid, sizeof (client->app_uuid));
+}
+
+const char* SocketGetUuid (const Client client)
+{
+ return client->app_uuid;
+}
+
+extern int SocketCmpUuid (const Client c1, const Client c2)
+{
+ return strncmp (c1->app_uuid, c2->app_uuid, sizeof (c1->app_uuid));
+}
+
+