From ee2e694ebba179f1c75764a7311df717fa3925cd Mon Sep 17 00:00:00 2001 From: bustico Date: Wed, 6 Feb 2008 16:32:54 +0000 Subject: * fix realloc buffer size when big message * complete change of internal structures for performance optimisation * experimental parralelized version for performance optimisation (use and need openmp) which scale well for regexp matching on multicore/multi processor gear. --- src/Makefile | 56 +++- src/ivy.c | 941 +++++++++++++++++++++++++++++++++++++++++++++----------- src/ivy.h | 2 +- src/ivybuffer.c | 4 +- src/ivysocket.c | 100 +++++- src/ivysocket.h | 3 +- src/list.h | 6 + src/timer.c | 2 +- src/version.h | 2 +- 9 files changed, 916 insertions(+), 200 deletions(-) (limited to 'src') diff --git a/src/Makefile b/src/Makefile index 8dba104..66526dc 100644 --- a/src/Makefile +++ b/src/Makefile @@ -1,4 +1,4 @@ -# + # Ivy, C interface # # Copyright (C) 1997-2002 @@ -17,7 +17,7 @@ # change this in version.h too !!!! MAJOR=3 -MINOR=9 +MINOR=10 PERHAPS64 := $(shell uname -m | perl -ne "print /64/ ? '64' : '';") LIB = /lib$(PERHAPS64) @@ -81,6 +81,7 @@ else endif CC=gcc +CPP='g++' LIBTOOL=ar q # linux and solaris #LIBTOOL=libtool -static -o @@ -90,9 +91,12 @@ REGEXP= -DUSE_PCRE_REGEX -DPCRE_OPT=$(PCRE_OPT) # see below CHANNEL = -DTCL_CHANNEL_INTEGRATION -CFLAGS += -g -Wall $(FPIC) +CFLAGS += -g -Wall -Wshadow $(FPIC) +OMPCFLAGS = $(CFLAGS) -fopenmp -DOPENMP=1 # -DDEBUG +OMPLIB = -lgomp -lpthread OBJ = ivyloop.o timer.o ivysocket.o ivy.o ivybuffer.o ivybind.o intervalRegexp.o +OMPOBJ = ivyloop.o timer.o ivysocket_omp.o ivy_omp.o ivybuffer.o ivybind.o intervalRegexp.o GOBJ = ivyloop.o timer.o ivysocket.o givy.o ivybuffer.o ivybind.o intervalRegexp.o XTOBJ = ivyxtloop.o ivysocket.o ivy.o ivybuffer.o ivybind.o intervalRegexp.o GLIBOBJ = ivyglibloop.o ivysocket.o ivy.o ivybuffer.o ivybind.o intervalRegexp.o @@ -100,15 +104,20 @@ GLUTOBJ = ivyglutloop.o ivysocket.o ivy.o ivybuffer.o ivybind.o intervalRegexp.o TCLOBJ = ivytcl.o timer.o ivysocket.o givy.o ivybuffer.o ivybind.o intervalRegexp.o # WINDOWS add ivyloop.o if TCL_CHANNEL_INTEGRATION is not set -TARGETS = ivyprobe ivyperf ivyglibprobe ivyxtprobe +TARGETS = ivyprobe ivythroughput ivyperf ivyglibprobe ivyxtprobe ivyprobe_efence TARGETLIBS=libivy.so.$(MAJOR).$(MINOR) libgivy.so.$(MAJOR).$(MINOR) libxtivy.so.$(MAJOR).$(MINOR) libglibivy.so.$(MAJOR).$(MINOR) libtclivy.so.$(MAJOR).$(MINOR) +OMP_TARGET= libivy_omp.so.$(MAJOR).$(MINOR) libivy_omp.a ivyprobe_omp ivythroughput_omp # not yet need Modified Glut ivyglutprobe .c.o: $(CC) $(CFLAGS) -c $*.c +.cpp.o: + $(CPP) $(CFLAGS) -c $*.cpp all: static-libs commands shared-libs pkgconf +omp: $(OMP_TARGET) + static-libs: libivy.a libgivy.a libxtivy.a libglibivy.a libtclivy.a # not yet need Modified Glut libglutivy.a @@ -127,6 +136,12 @@ ivytcl.o: ivytcl.c givy.o: ivy.c $(CC) -c $(CFLAGS) -DDEBUG -o givy.o ivy.c +ivy_omp.o: ivy.c + $(CC) -c $(OMPCFLAGS) -o ivy_omp.o ivy.c + +ivysocket_omp.o: ivysocket.c + $(CC) -c $(OMPCFLAGS) -o ivysocket_omp.o ivysocket.c + ivyglutloop.o: ivyglutloop.c ivyglutloop.h $(CC) -c $(CFLAGS) $(GLUTINC) ivyglutloop.c @@ -134,7 +149,21 @@ ivyglibloop.o: ivyglibloop.c ivyglibloop.h $(CC) -c $(CFLAGS) $(GLIBINC) ivyglibloop.c ivyprobe: ivyprobe.o libivy.a - $(CC) $(CFLAGS) -o $@ ivyprobe.o -L. -livy $(PCRELIB) $(EXTRALIB) + $(CC) $(CFLAGS) -o $@ ivyprobe.o -L. -livy $(PCRELIB) $(EXTRALIB) + +ivythroughput: ivythroughput.o libivy.a + $(CPP) $(CFLAGS) -o $@ ivythroughput.o -L. -livy -lpcrecpp $(PCRELIB) $(EXTRALIB) + +ivyprobe_efence: ivyprobe.o libivy.a + $(CPP) $(CFLAGS) -o $@ ivyprobe.o -L. -livy -lpcrecpp $(PCRELIB) $(EXTRALIB) -lefence + + + +ivyprobe_omp: ivyprobe.o libivy_omp.a + $(CC) $(CFLAGS) -o $@ ivyprobe.o -L. -livy_omp $(PCRELIB) $(EXTRALIB) $(OMPLIB) + +ivythroughput_omp: ivythroughput.o libivy_omp.a + $(CPP) $(CFLAGS) -o $@ ivythroughput.o -L. -livy_omp -lpcrecpp $(PCRELIB) $(EXTRALIB) $(OMPLIB) ivyprobe.o : ivyprobe.c $(CC) $(CFLAGS) $(REGEXP) -c ivyprobe.c -o $@ @@ -162,7 +191,11 @@ ivyglutprobe: ivyglutprobe.o libglutivy.a libivy.a: $(OBJ) rm -f $@ - $(LIBTOOL) $@ $(OBJ) + $(LIBTOOL) $@ $(OBJ) + +libivy_omp.a: $(OMPOBJ) + rm -f $@ + $(LIBTOOL) $@ $(OMPOBJ) libgivy.a: $(GOBJ) rm -f $@ @@ -190,6 +223,11 @@ libivy.so.$(MAJOR).$(MINOR): $(OBJ) # $(CC) -G -Wl,-h,libivy.so.$(MAJOR) -o $@ $(OBJ) #solaris # libtool -dynamic -o $@ $(OBJ) $(PCRELIB) -lc +libivy_omp.so.$(MAJOR).$(MINOR): $(OMPOBJ) + $(CC) $(LDFLAGS) -shared -Wl,-soname,libivy.so.$(MAJOR) -o $@ $(OMPOBJ) $(PCRELIB) $(OMPLIB) +# $(CC) -G -Wl,-h,libivy.so.$(MAJOR) -o $@ $(OBJ) #solaris +# libtool -dynamic -o $@ $(OBJ) $(PCRELIB) -lc + libgivy.so.$(MAJOR).$(MINOR): $(GOBJ) $(CC) $(LDFLAGS) -shared -Wl,-soname,libgivy.so.$(MAJOR) -o $@ $(GOBJ) $(PCRELIB) # $(CC) -G -Wl,-h,libgivy.so.$(MAJOR) -o $@ $(GOBJ) #solaris @@ -217,7 +255,7 @@ libtclivy.so.$(MAJOR).$(MINOR): $(TCLOBJ) distclean: clean clean: - -rm -f $(TARGETS) $(TARGETLIBS) *.o *.a *.so *.so.* *~ + -rm -f $(TARGETS) $(TARGETLIBS) $(OMP_TARGET) *.o *.a *.so *.so.* *~ -rm -f ivy-glib.pc installlibs: static-libs shared-libs @@ -225,6 +263,7 @@ installlibs: static-libs shared-libs -test -d $(DESTDIR)$(X11_PREFIX)$(LIB) || mkdirhier $(DESTDIR)$(X11_PREFIX)$(LIB) install -m644 libivy.a $(DESTDIR)$(PREFIX)$(LIB) + install -m644 libivy_omp.a $(DESTDIR)$(PREFIX)$(LIB) install -m644 libgivy.a $(DESTDIR)$(PREFIX)$(LIB) -install -m644 libxtivy.a $(DESTDIR)$(X11_PREFIX)$(LIB) install -m644 libtclivy.a $(DESTDIR)$(PREFIX)$(LIB) @@ -235,6 +274,7 @@ installlibs: static-libs shared-libs -install -m644 libxtivy.so.$(MAJOR).$(MINOR) $(DESTDIR)$(X11_PREFIX)$(LIB) install -m644 libtclivy.so.$(MAJOR).$(MINOR) $(DESTDIR)$(PREFIX)$(LIB) install -m644 libglibivy.so.$(MAJOR).$(MINOR) $(DESTDIR)$(PREFIX)$(LIB) + install -m644 libivy_omp.so.$(MAJOR).$(MINOR) $(DESTDIR)$(PREFIX)$(LIB) includes: test -d $(DESTDIR)$(PREFIX)/include/Ivy || mkdirhier $(DESTDIR)$(PREFIX)/include/Ivy @@ -262,10 +302,12 @@ installliblinks: installlibs -ln -fs $(X11_PREFIX)$(LIB)/libxtivy.so.$(MAJOR).$(MINOR) $(DESTDIR)$(X11_PREFIX)$(LIB)/libxtivy.so.$(MAJOR) ln -fs $(PREFIX)$(LIB)/libtclivy.so.$(MAJOR).$(MINOR) $(DESTDIR)$(PREFIX)$(LIB)/libtclivy.so ln -fs $(PREFIX)$(LIB)/libtclivy.so.$(MAJOR).$(MINOR) $(DESTDIR)$(PREFIX)$(LIB)/libtclivy.so.$(MAJOR) + ln -fs $(PREFIX)$(LIB)/libivy_omp.so.$(MAJOR).$(MINOR) $(DESTDIR)$(PREFIX)$(LIB)/libivy_omp.so installbins: commands test -d $(DESTDIR)$(PREFIX)/bin || mkdirhier $(DESTDIR)$(PREFIX)/bin install -m755 ivyprobe $(DESTDIR)$(PREFIX)/bin + install -m755 ivyprobe_omp $(DESTDIR)$(PREFIX)/bin #install -m755 ivyxtprobe $(DESTDIR)$(PREFIX)/bin #install -m755 ivyglibprobe $(DESTDIR)$(PREFIX)/bin #install -m755 ivyglutprobe $(DESTDIR)$(PREFIX)/bin diff --git a/src/ivy.c b/src/ivy.c index 635e006..0d146ee 100644 --- a/src/ivy.c +++ b/src/ivy.c @@ -2,12 +2,12 @@ * * Ivy, C interface * - * Copyright 1997-2000 + * Copyright 1997-2008 * Centre d'Etudes de la Navigation Aerienne * * Main functions * - * Authors: Francois-Regis Colin,Stephane Chatty + * Authors: Francois-Regis Colin,Stephane Chatty, Alexandre Bustico * * $Id$ * @@ -15,6 +15,17 @@ * copyright notice regarding this software */ +/* + TODO : ° version non bloquante + ° outil de chasse aux memleak + ° mesures de perfo + ° compil rejeu en monothread et omp et tests + ° compil sur mac et windows pour portabilité + */ + +#ifdef OPENMP +#include +#endif #include #ifdef WIN32 @@ -28,8 +39,11 @@ #include #include + + #include +#include "uthash.h" #include "intervalRegexp.h" #include "ivychannel.h" #include "ivysocket.h" @@ -56,7 +70,6 @@ static char* DefaultIvyBus = GenerateIvyBus(DEFAULT_DOMAIN,DEFAULT_BUS); typedef enum { - Bye, /* l'application emettrice se termine */ AddRegexp, /* expression reguliere d'un client */ Msg, /* message reel */ @@ -70,29 +83,51 @@ typedef enum { Pong /* ivy doit renvoyer ce message à la reception d'un ping */ } MsgType; -typedef struct _msg_snd *MsgSndPtr; + +typedef struct _global_reg_lst *GlobRegPtr; +typedef struct _msg_snd_dict *MsgSndDictPtr; + struct _msg_rcv { /* requete d'emission d'un client */ MsgRcvPtr next; int id; const char *regexp; /* regexp du message a recevoir */ - MsgCallback callback; /* callback a declanche a la reception */ + MsgCallback callback; /* callback a declancher a la reception */ void *user_data; /* stokage d'info client */ }; -struct _msg_snd { /* requete de reception d'un client */ - MsgSndPtr next; - int id; - char *str_regexp; /* la regexp sous forme inhumaine */ + + +/* liste de regexps source */ +struct _global_reg_lst { /* liste des regexp source */ + GlobRegPtr next; + char *str_regexp; /* la regexp sous forme source */ + int id; /* son id, differente pour chaque client */ +}; + + +/* pour le dictionnaire clef=regexp, valeur = cette struct */ +struct _msg_snd_dict { /* requete de reception d'un client */ + UT_hash_handle hh; /* makes this structure hashable */ + char *regexp_src; /* clef du dictionnaire (hash uthash) */ + IvyClientPtr clientList; /* liste des clients */ IvyBinding binding; /* la regexp sous forme machine */ }; -struct _clnt_lst { +/* liste de clients, champ de la struct _msg_snd_dict qui est valeur du dictionnaire */ +/* typedef IvyClientPtr */ +struct _clnt_lst_dict { IvyClientPtr next; Client client; /* la socket client */ - MsgSndPtr msg_send; /* liste des requetes recues */ char *app_name; /* nom de l'application */ unsigned short app_port; /* port de l'application */ + int id; /* l'id n'est pas liée uniquement + a la regexp, mais au couple + regexp, client */ + GlobRegPtr srcRegList; /* liste de regexp source */ +#ifdef OPENMP + int endRegexpReceived; +#endif // OPENMP }; /* flag pour le debug en cas de Filter de regexp */ @@ -113,35 +148,70 @@ static unsigned short SupervisionPort; /* client pour la socket supervision */ static Client broadcast; -static const char *ApplicationName = 0; -static const char *ApplicationID = 0; +static const char *ApplicationName = NULL; +static const char *ApplicationID = NULL; /* callback appele sur reception d'un message direct */ -static MsgDirectCallback direct_callback = 0; -static void *direct_user_data = 0; +static MsgDirectCallback direct_callback = NULL; +static void *direct_user_data = NULL; /* callback appele sur changement d'etat d'application */ static IvyApplicationCallback application_callback; -static void *application_user_data = 0; +static void *application_user_data = NULL; /* callback appele sur ajout suppression de regexp */ static IvyBindCallback application_bind_callback; -static void *application_bind_data = 0; +static void *application_bind_data = NULL; /* callback appele sur demande de terminaison d'application */ static IvyDieCallback application_die_callback; -static void *application_die_user_data = 0; +static void *application_die_user_data = NULL; /* liste des messages a recevoir */ -static MsgRcvPtr msg_recv = 0; +static MsgRcvPtr msg_recv = NULL; /* liste des clients connectes */ -static IvyClientPtr clients = 0; +static IvyClientPtr allClients = NULL; + +/* dictionnaire clef : regexp, valeur : liste de clients IvyClientPtr */ +static MsgSndDictPtr messSndByRegexp = NULL; -static const char *ready_message = 0; +static const char *ready_message = NULL; static void substituteInterval (IvyBuffer *src); +static int RegexpCall (const MsgSndDictPtr msg, const char * const message); +static int RegexpCallUnique (const MsgSndDictPtr msg, const char * const message, + const Client clientUnique); + +static void freeClient ( IvyClientPtr client); +static void delOneClient (const Client client); + +static void delRegexpForOneClientFromDictionary (const char *regexp, const IvyClientPtr client); +static void delAllRegexpsFromDictionary (); +static void addRegexpToDictionary (const char* regexp, IvyClientPtr client, int id, + IvyBinding bind); +static void changeRegexpInDictionary (const char* regexp, IvyClientPtr client, int id, + IvyBinding bind); + +static char delRegexpForOneClient (const char *regexp, const IvyClientPtr client, int id); +static void addRegexp (const char* regexp, IvyClientPtr client, int id, + IvyBinding bind); +static void changeRegexp (const char* regexp, IvyClientPtr client, int id, IvyBinding bind); +static void addOrChangeRegexp (const char* regexp, IvyClientPtr client, int id, IvyBinding bind); +static int IvyCheckBuffer( const char* buffer ); + +#ifdef OPENMP +static void regenerateRegPtrArrayCache (); +static void addRegToPtrArrayCache (MsgSndDictPtr newReg); +static struct { + MsgSndDictPtr *msgPtrArray; + int size; + int numPtr; +} ompDictCache = {NULL, 0, 0}; +#endif + + /* * function like strok but do not eat consecutive separator * */ @@ -169,70 +239,165 @@ static int MsgSendTo( Client client, MsgType msgtype, int id, const char *messag static void IvyCleanup() { IvyClientPtr clnt,next; + GlobRegPtr regLst; + /* destruction des connexions clients */ - IVY_LIST_EACH_SAFE( clients, clnt, next ) + IVY_LIST_EACH_SAFE( allClients, clnt, next ) { /* on dit au revoir */ MsgSendTo( clnt->client, Bye, 0, "" ); SocketClose( clnt->client ); - IVY_LIST_EMPTY( clnt->msg_send ); + IVY_LIST_EACH (clnt->srcRegList, regLst) { + if (regLst->str_regexp != NULL) { + free (regLst->str_regexp); + regLst->str_regexp = NULL; + } + } + IVY_LIST_EMPTY( clnt->srcRegList ); + IVY_LIST_REMOVE (allClients, clnt); } - IVY_LIST_EMPTY( clients ); + IVY_LIST_EMPTY( allClients ); + delAllRegexpsFromDictionary (); /* destruction des sockets serveur et supervision */ SocketServerClose( server ); SocketClose( broadcast ); } -static int MsgCall (const char *message, MsgSndPtr msg, IvyClientPtr client) + +static int +ClientCall (IvyClientPtr clnt, const char *message) { - int waiting = 0; - static IvyBuffer buffer = { NULL, 0, 0 }; /* Use satic mem to eliminate multiple call to malloc /free */ - int err; - int index; - int arglen; - const char *arg; - - int rc= IvyBindingExec( msg->binding, message ); + int match_count = 0; + + /* pour toutes les regexp */ + MsgSndDictPtr msgSendDict; + + for (msgSendDict=messSndByRegexp; msgSendDict != NULL; + msgSendDict=msgSendDict->hh.next) { + match_count += RegexpCallUnique (msgSendDict, message, clnt->client); + } + + TRACE_IF( match_count == 0, "Warning no recipient for %s\n",message); + /* si le message n'est pas emit et qu'il y a des filtres alors WARNING */ + if ( match_count == 0 && debug_filter ) { + IvyBindindFilterCheck( message ); + } + return match_count; +} + + + +static int +RegexpCall (const MsgSndDictPtr msg, const char * const message) +{ + static IvyBuffer bufferArg = {NULL, 0, 0}; + char bufferId[16]; + int match_count ; + int waiting ; + int indx; + int arglen; + const char *arg; + IvyClientPtr clnt; + int rc; + +#ifdef OPENMP +#pragma omp threadprivate (bufferArg) + /* d'après la doc openmp : + Variables with automatic storage duration which are declared in a scope inside the + construct are private. + Il n'y aurait donc rien à faire pour s'assurer que les variables automatiques soient + privées au thread */ +#endif // OPENMP + + match_count = 0; + waiting = 0; + rc= IvyBindingExec(msg->binding, message ); - if (rc<1) return 0; /* no match */ + if (rc<1) return 0; /* no match */ - TRACE( "Sending message id=%d '%s'\n",msg->id,message); + bufferArg.offset = 0; + // bufferArg.size = bufferId.size = 0; + // bufferArg.data = bufferId.data = NULL; - buffer.offset = 0; - /* il faut essayer d'envoyer le message en une seule fois sur la socket */ - /* pour eviter au maximun de passer dans le select plusieur fois par message du protocole Ivy */ - /* pour eviter la latence ( PB de perfo detecte par ivyperf ping roudtrip ) */ - err = make_message_var( &buffer, "%d %d" ARG_START ,Msg, msg->id); + /* il faut essayer d'envoyer le message en une seule fois sur la socket */ + /* pour eviter au maximun de passer dans le select plusieur fois par message du protocole Ivy */ + /* pour eviter la latence ( PB de perfo detecte par ivyperf ping roudtrip ) */ - TRACE( "Send matching args count %d\n",rc); + TRACE( "Send matching args count %d\n",rc); + + for( indx=1; indx < rc ; indx++ ) + { + IvyBindingMatch (msg->binding, message, indx, &arglen, & arg ); + make_message_var( &bufferArg, "%.*s" ARG_END , arglen, arg ); + } + make_message_var( &bufferArg, "\n"); - for( index=1; index < rc ; index++ ) - { - IvyBindingMatch( msg->binding, message, index, &arglen, & arg ); - err = make_message_var( &buffer, "%.*s" ARG_END , arglen, arg ); - } - err = make_message_var( &buffer, "\n"); - waiting = SocketSendRaw(client->client, buffer.data , buffer.offset); - if ( waiting ) - fprintf(stderr, "Ivy: Slow client : %s\n", client->app_name ); - return 1; -} + IVY_LIST_EACH(msg->clientList, clnt ) { + sprintf (bufferId, "%d %d" ARG_START ,Msg, clnt->id); + waiting = SocketSendRawWithId(clnt->client, bufferId, bufferArg.data , bufferArg.offset); + match_count++; + if ( waiting ) + fprintf(stderr, "Ivy: Slow client : %s\n", clnt->app_name ); + } + return match_count; +} static int -ClientCall (IvyClientPtr clnt, const char *message) +RegexpCallUnique (const MsgSndDictPtr msg, const char * const message, const + Client clientUnique) { - MsgSndPtr msg; - int match_count = 0; - /* recherche dans la liste des requetes recues de ce client */ - IVY_LIST_EACH (clnt->msg_send, msg) { - match_count+= MsgCall (message, msg, clnt); - } - return match_count; + static IvyBuffer bufferArg = {NULL, 0, 0}; + char bufferId[16]; + int match_count ; + int waiting ; + int indx; + int arglen; + const char *arg; + IvyClientPtr clnt; + int rc; + + match_count = 0; + waiting = 0; + rc= IvyBindingExec(msg->binding, message ); + + if (rc<1) return 0; /* no match */ + + bufferArg.offset = 0; + // bufferArg.size = bufferId. + // bufferArg.size = bufferId.size = 0; + // bufferArg.data = bufferId.data = NULL; + + /* il faut essayer d'envoyer le message en une seule fois sur la socket */ + /* pour eviter au maximun de passer dans le select plusieur fois par message du protocole Ivy */ + /* pour eviter la latence ( PB de perfo detecte par ivyperf ping roudtrip ) */ + + TRACE( "Send matching args count %d\n",rc); + + for( indx=1; indx < rc ; indx++ ) + { + IvyBindingMatch (msg->binding, message, indx, &arglen, & arg ); + make_message_var( &bufferArg, "%.*s" ARG_END , arglen, arg ); + } + make_message_var( &bufferArg, "\n"); + + IVY_LIST_EACH(msg->clientList, clnt ) { + if (clientUnique != clnt->client) + continue; + sprintf (bufferId, "%d %d" ARG_START ,Msg, clnt->id); + waiting = SocketSendRawWithId(clnt->client, bufferId, bufferArg.data , bufferArg.offset); + match_count++; + if ( waiting ) + fprintf(stderr, "Ivy: Slow client : %s\n", clnt->app_name ); + } + return match_count; } + + + static int CheckConnected( IvyClientPtr clnt ) { IvyClientPtr client; @@ -242,7 +407,7 @@ static int CheckConnected( IvyClientPtr clnt ) if ( clnt->app_port == 0 ) /* Old Ivy Protocol Dont check */ return 0; /* recherche dans la liste des clients de la presence de clnt */ - IVY_LIST_EACH( clients, client ) + IVY_LIST_EACH( allClients, client ) { /* client different mais port identique */ if ( (client != clnt) && (clnt->app_port == client->app_port) ) @@ -265,18 +430,19 @@ static int CheckConnected( IvyClientPtr clnt ) return 0; } + + static void Receive( Client client, void *data, char *line ) { IvyClientPtr clnt; int err,id; - MsgSndPtr snd; MsgRcvPtr rcv; int argc = 0; char *argv[MAX_MATCHING_ARGS]; char *arg; int kind_of_msg = Bye; - IvyBinding bind; - + IvyBinding ivyBind; + const char *errbuf; int erroffset; @@ -311,7 +477,7 @@ static void Receive( Client client, void *data, char *line ) if ( !IvyBindingFilter( arg ) ) { - TRACE("Warning: regexp '%s' illegal, removing from %s\n",arg,ApplicationName); + TRACE("Warning: regexp '%s' filtered, removing from %s\n",arg,ApplicationName); if ( application_bind_callback ) { @@ -320,58 +486,33 @@ static void Receive( Client client, void *data, char *line ) return; } - bind = IvyBindingCompile( arg, & erroffset, & errbuf ); - if ( bind != NULL ) - { - /* On teste si c'est un change regexp : changement de regexp d'une id existante */ - IVY_LIST_ITER( clnt->msg_send, snd, ( id != snd->id )); - if ( snd ) { - free (snd->str_regexp); - snd->str_regexp = strdup( arg ); - snd->binding = bind; - if ( application_bind_callback ) - { - (*application_bind_callback)( clnt, application_bind_data, id, snd->str_regexp, IvyChangeBind ); - } - } else { - IVY_LIST_ADD_START( clnt->msg_send, snd ) - snd->id = id; - snd->str_regexp = strdup( arg ); - snd->binding = bind; - if ( application_bind_callback ) - { - (*application_bind_callback)( clnt, application_bind_data, id, snd->str_regexp, IvyAddBind ); - } - IVY_LIST_ADD_END( clnt->msg_send, snd ) - - } - } - else - { - printf("Error compiling '%s', %s\n", arg, errbuf); - MsgSendTo( client, Error, erroffset, errbuf ); + ivyBind = IvyBindingCompile( arg, & erroffset, & errbuf ); + if ( ivyBind != NULL ) { + addOrChangeRegexp (arg, clnt, id, ivyBind); + } else { + printf("Error compiling '%s', %s\n", arg, errbuf); + MsgSendTo( client, Error, erroffset, errbuf ); } break; case DelRegexp: - - TRACE("Regexp Delete id=%d\n", id); - - IVY_LIST_ITER( clnt->msg_send, snd, ( id != snd->id )); - if ( snd ) - { - if ( application_bind_callback ) - { - (*application_bind_callback)( clnt, application_bind_data, id, snd->str_regexp, IvyRemoveBind ); - } - IvyBindingFree( snd->binding ); - IVY_LIST_REMOVE( clnt->msg_send, snd ); - } - break; + + TRACE("Regexp Delete id=%d\n", id); + if (delRegexpForOneClient (arg, clnt, id)) { + if ( application_bind_callback ) { + (*application_bind_callback)( clnt, application_bind_data, id, arg, + IvyRemoveBind ); + } + } + break; case StartRegexp: TRACE("Regexp Start id=%d Application='%s'\n", id, arg); +#ifdef OPENMP + clnt->endRegexpReceived=0; +#endif // OPENMP + clnt->app_name = strdup( arg ); clnt->app_port = id; if ( CheckConnected( clnt ) ) @@ -395,9 +536,13 @@ static void Receive( Client client, void *data, char *line ) { int count; count = ClientCall( clnt, ready_message ); - - TRACE(" Sendind ready message %d\n", count); + // count = IvySendMsg ("%s", ready_message ); + // printf ("%s sending READY MESSAGE %d\n", clnt->app_name, count); } +#ifdef OPENMP + clnt->endRegexpReceived=1; + regenerateRegPtrArrayCache(); +#endif // OPENMP break; case Msg: @@ -461,9 +606,7 @@ static IvyClientPtr SendService( Client client, const char *appname ) { IvyClientPtr clnt; MsgRcvPtr msg; - IVY_LIST_ADD_START( clients, clnt ) - - clnt->msg_send = 0; + IVY_LIST_ADD_START( allClients, clnt ) clnt->client = client; clnt->app_name = strdup(appname); clnt->app_port = 0; @@ -474,51 +617,44 @@ static IvyClientPtr SendService( Client client, const char *appname ) } MsgSendTo( client, EndRegexp, 0, ""); - IVY_LIST_ADD_END( clients, clnt ) + IVY_LIST_ADD_END( allClients, clnt ) + //printf ("DBG> SendService addAllClient: name=%s; client->client=%p\n", appname, clnt->client); + return clnt; } static void ClientDelete( Client client, void *data ) { IvyClientPtr clnt; - MsgSndPtr msg; + #ifdef DEBUG char *remotehost; unsigned short remoteport; #endif clnt = (IvyClientPtr)data; - if ( application_callback ) - { - (*application_callback)( clnt, application_user_data, IvyApplicationDisconnected ); - } + if ( application_callback ) { + (*application_callback)( clnt, application_user_data, IvyApplicationDisconnected ); + } #ifdef DEBUG /* probably bogus call, but this is for debug only anyway */ SocketGetRemoteHost( client, &remotehost, &remoteport ); TRACE("Deconnexion de %s:%hu\n", remotehost, remoteport ); #endif /*DEBUG */ - - if ( clnt->app_name ) free( clnt->app_name ); - IVY_LIST_EACH( clnt->msg_send, msg) - { - /*regfree(msg->regexp);*/ - free( msg->str_regexp); - } - IVY_LIST_EMPTY( clnt->msg_send ); - IVY_LIST_REMOVE( clients, clnt ); + delOneClient (client); } static void *ClientCreate( Client client ) { -#ifdef DEBUG - char *remotehost; - unsigned short remoteport; - SocketGetRemoteHost( client, &remotehost, &remoteport ); - TRACE("Connexion de %s:%hu\n", remotehost, remoteport ); -#endif /*DEBUG */ - - return SendService (client, "Unknown"); + // #ifdef DEBUG + char *remotehost; + unsigned short remoteport; + SocketGetRemoteHost( client, &remotehost, &remoteport ); + printf ("%s : Connexion de %s:%hu client=%p\n", + ApplicationName, remotehost, remoteport, client); + // #endif /*DEBUG */ + return SendService (client, "Unknown"); } static void BroadcastReceive( Client client, void *data, char *line ) @@ -529,17 +665,14 @@ static void BroadcastReceive( Client client, void *data, char *line ) unsigned short serviceport; char appid[2048]; char appname[2048]; -#ifdef DEBUG unsigned short remoteport; char *remotehost = 0; -#endif + memset( appid, 0, sizeof( appid ) ); memset( appname, 0, sizeof( appname ) ); err = sscanf (line,"%d %hu %s %[^\n]", &version, &serviceport, appid, appname); if ( err < 2 ) { /* ignore the message */ - unsigned short remoteport; - char *remotehost; SocketGetRemoteHost (client, &remotehost, &remoteport ); printf (" Bad supervision message, expected 'version port' from %s:%d\n", remotehost, remoteport); @@ -547,8 +680,6 @@ static void BroadcastReceive( Client client, void *data, char *line ) } if ( version != VERSION ) { /* ignore the message */ - unsigned short remoteport; - char *remotehost = 0; SocketGetRemoteHost (client, &remotehost, &remoteport ); fprintf (stderr, "Bad Ivy version, expected %d and got %d from %s:%d\n", VERSION, version, remotehost, remoteport); @@ -569,6 +700,9 @@ static void BroadcastReceive( Client client, void *data, char *line ) IvyClientPtr clnt; clnt = SendService( app, appname ); SocketSetData( app, clnt); + } else { + printf ("SocketConnectAddr error .....\n"); + SocketSetData( app, NULL); } } static unsigned long currentTime() @@ -641,7 +775,7 @@ void IvyStart (const char* bus) int error = 0; const char* p = bus; /* used for decoding address list */ const char* q; /* used for decoding port number */ - int port; + unsigned short port; /* @@ -750,7 +884,7 @@ IvyUnbindMsg (MsgRcvPtr msg) { IvyClientPtr clnt; /* Send to already connected clients */ - IVY_LIST_EACH (clients, clnt ) { + IVY_LIST_EACH (allClients, clnt ) { MsgSendTo( clnt->client, DelRegexp,msg->id, ""); } IVY_LIST_REMOVE( msg_recv, msg ); @@ -783,7 +917,7 @@ IvyBindMsg (MsgCallback callback, void *user_data, const char *fmt_regex, ... ) IVY_LIST_ADD_END( msg_recv, msg ) /* Send to already connected clients */ /* recherche dans la liste des requetes recues de mes clients */ - IVY_LIST_EACH( clients, clnt ) { + IVY_LIST_EACH( allClients, clnt ) { MsgSendTo( clnt->client, AddRegexp,msg->id,msg->regexp); } return msg; @@ -809,15 +943,115 @@ IvyChangeMsg (MsgRcvPtr msg, const char *fmt_regex, ... ) /* Send to already connected clients */ /* recherche dans la liste des requetes recues de mes clients */ - IVY_LIST_EACH( clients, clnt ) { + IVY_LIST_EACH( allClients, clnt ) { MsgSendTo( clnt->client, AddRegexp,msg->id,msg->regexp); } return msg; } + + + +int IvySendMsg(const char *fmt, ...) /* version dictionnaire */ +{ + int match_count = 0; + static IvyBuffer buffer = { NULL, 0, 0}; /* Use static mem to eliminate multiple call to malloc /free */ + va_list ap; + + /* construction du buffer message à partir du format et des arguments */ + if( fmt == 0 || strlen(fmt) == 0 ) return 0; + va_start( ap, fmt ); + buffer.offset = 0; + make_message( &buffer, fmt, ap ); + va_end ( ap ); + + /* test du contenu du message */ + if ( debug_binary_msg ) + { + if ( IvyCheckBuffer( buffer.data ) ) + return 0; + } + + /* pour toutes les regexp */ + +#ifdef OPENMP + //#define TABLEAU_PREALABLE_SEQUENTIEL 1 +#define TABLEAU_PREALABLE 1 + //#define SINGLE_NOWAIT 1 + //#define SCHEDULE_GUIDED 1 + //#define SEQUENTIEL_DEBUG 1 + +#ifdef SCHEDULE_GUIDED + int count; +#pragma omp parallel default(none) private(count) shared(ompDictCache, buffer) \ + reduction(+:match_count) + { +#pragma omp for schedule(guided) // après debug mettre schedule(guided, 10) + for(count=0; counthh.next) { +#pragma omp single nowait + match_count += RegexpCall (msgSendDict, buffer.data); + } +#endif // SINGLE_NOWAIT + +#ifdef SEQUENTIEL_DEBUG // OPEMMP LISTE + MsgSndDictPtr msgSendDict; + + for (msgSendDict=messSndByRegexp; msgSendDict ; msgSendDict=msgSendDict->hh.next) { + match_count += RegexpCall (msgSendDict, buffer.data); + } +#endif // SEQUENTIEL_DEBUG + + + +#else // PAS OPENMP + MsgSndDictPtr msgSendDict; + + for (msgSendDict=messSndByRegexp; msgSendDict ; msgSendDict=msgSendDict->hh.next) { + match_count += RegexpCall (msgSendDict, buffer.data); + } +#endif + + TRACE_IF( match_count == 0, "Warning no recipient for %s\n",buffer.data); + /* si le message n'est pas emit et qu'il y a des filtres alors WARNING */ + if ( match_count == 0 && debug_filter ) + { + IvyBindindFilterCheck( buffer.data ); + } + return match_count; +} + + /* teste de la presence de binaire dans les message Ivy */ static int IvyCheckBuffer( const char* buffer ) { - char * ptr = buffer; + const char * ptr = buffer; while ( *ptr ) { if ( *ptr++ < ' ' ) @@ -829,42 +1063,11 @@ static int IvyCheckBuffer( const char* buffer ) } return 0; } -/* emmission d'un message avec formatage a la printf */ -int IvySendMsg(const char *fmt, ...) -{ - IvyClientPtr clnt; - int match_count = 0; - static IvyBuffer buffer = { NULL, 0, 0}; /* Use satic mem to eliminate multiple call to malloc /free */ - va_list ap; - - if( fmt == 0 || strlen(fmt) == 0 ) return 0; - va_start( ap, fmt ); - buffer.offset = 0; - make_message( &buffer, fmt, ap ); - va_end ( ap ); - /* test du contenu du message */ - if ( debug_binary_msg ) - { - if ( IvyCheckBuffer( buffer.data ) ) - return 0; - } - - /* recherche dans la liste des requetes recues de mes clients */ - IVY_LIST_EACH (clients, clnt) { - match_count += ClientCall (clnt, buffer.data); - } - TRACE_IF( match_count == 0, "Warning no recipient for %s\n",buffer.data); - /* si le message n'est pas emit et qu'il y a des filtres alors WARNING */ - if ( match_count == 0 && debug_filter ) - { - IvyBindindFilterCheck( buffer.data ); - } - return match_count; -} + void IvySendError( IvyClientPtr app, int id, const char *fmt, ... ) { - static IvyBuffer buffer = { NULL, 0, 0}; /* Use satic mem to eliminate multiple call to malloc /free */ + static IvyBuffer buffer = { NULL, 0, 0}; /* Use static mem to eliminate multiple call to malloc /free */ va_list ap; va_start( ap, fmt ); @@ -943,7 +1146,7 @@ void IvyDefaultBindCallback( IvyClientPtr app, void *user_data, int id, char* re IvyClientPtr IvyGetApplication( char *name ) { IvyClientPtr app = 0; - IVY_LIST_ITER( clients, app, strcmp(name, app->app_name) != 0 ); + IVY_LIST_ITER( allClients, app, strcmp(name, app->app_name) != 0 ); return app; } @@ -952,7 +1155,7 @@ char *IvyGetApplicationList(const char *sep) static char applist[4096]; /* TODO remove that ugly Thing */ IvyClientPtr app; applist[0] = '\0'; - IVY_LIST_EACH( clients, app ) + IVY_LIST_EACH( allClients, app ) { strcat( applist, app->app_name ); strcat( applist, sep ); @@ -964,11 +1167,11 @@ char **IvyGetApplicationMessages( IvyClientPtr app ) { #define MAX_REGEXP 4096 static char *messagelist[MAX_REGEXP+1];/* TODO remove that ugly Thing */ - MsgSndPtr msg; + GlobRegPtr msg; int msgCount= 0; memset( messagelist, 0 , sizeof( messagelist )); /* recherche dans la liste des requetes recues de ce client */ - IVY_LIST_EACH( app->msg_send, msg ) + IVY_LIST_EACH( app->srcRegList, msg ) { messagelist[msgCount++]= msg->str_regexp; if ( msgCount >= MAX_REGEXP ) @@ -1021,3 +1224,371 @@ static void substituteInterval (IvyBuffer *src) src->data = dst.data; } } + + +static void freeClient ( IvyClientPtr client) +{ + GlobRegPtr srcReg; + + /* on libere la chaine nom de l'appli*/ + if (client->app_name != NULL) { + free (client->app_name); + client->app_name = NULL; + /* on libere la liste des clients */ + IVY_LIST_EACH (client->srcRegList, srcReg) { + if (srcReg->str_regexp != NULL) { + free (srcReg->str_regexp); + srcReg->str_regexp = NULL; + } + } + IVY_LIST_EMPTY (client->srcRegList); + } +} + + + + + + + +static void delRegexpForOneClientFromDictionary (const char *regexp, const IvyClientPtr client) +{ + MsgSndDictPtr msgSendDict = NULL; + IvyClientPtr client_itr, next; + TRACE ("ENTER delRegexpForOneClientFromDictionary clnt=%d, reg='%s'\n", client, regexp); + + HASH_FIND_STR(messSndByRegexp, regexp, msgSendDict); + if (msgSendDict != NULL) { + /* la clef est trouvée, on itere sur la liste de client associée */ + IVY_LIST_EACH_SAFE ( msgSendDict->clientList, client_itr, next) { + /* pour tester 2 IvyClientPtr, on teste la similarité + des pointeur Client qui doivent être uniques */ + if (client_itr->client == client->client) { + /* on a trouve le client : on l'enleve */ + IVY_LIST_REMOVE (msgSendDict->clientList, client_itr); + TRACE ("delRegexpForOneClientFromDictionary : IVY_LIST_REMOVE\n"); + } + } + /* si la liste de clients associée à cette regexp est vide */ + if ((msgSendDict->clientList == NULL) || (IVY_LIST_IS_EMPTY (msgSendDict->clientList))) { + TRACE ("delRegexpForOneClientFromDictionary : IvyBindingFree, free, hash_del\n"); + /* on efface le binding */ + IvyBindingFree (msgSendDict->binding); + /* on enlève l'entrée regexp de la table de hash */ + HASH_DEL (messSndByRegexp, msgSendDict); + /* on efface la clef (regexp source) */ + free (msgSendDict->regexp_src); + } + } +#ifdef OPENMP + regenerateRegPtrArrayCache (); +#endif +} + + + + +static void delAllRegexpsFromDictionary () +{ + MsgSndDictPtr msgSendDict; + IvyClientPtr client; + + /* pour toutes les entrees du dictionnaire des regexps */ + for (msgSendDict=messSndByRegexp; msgSendDict ; msgSendDict=msgSendDict->hh.next) { + /* on efface le binding */ + IvyBindingFree (msgSendDict->binding); + /* on efface la clef (regexp source) */ + free (msgSendDict->regexp_src); + /* pour chaque client abonne a cette regexp */ + IVY_LIST_EACH ( msgSendDict->clientList, client) { + freeClient (client); + } + /* on enleve la liste de regexps */ + IVY_LIST_EMPTY(msgSendDict->clientList); + /* on enleve le couple regexp -> valeur */ + HASH_DEL(messSndByRegexp, msgSendDict); + } + +#ifdef OPENMP + regenerateRegPtrArrayCache (); +#endif +} + + + + + + + + + +// HASH_ADD_KEYPTR (hh_name, head, key_ptr, key_len, item_ptr) +// HASH_ADD_STR ( head, keyfield_name, item_ptr) + +static void addRegexpToDictionary (const char* regexp, IvyClientPtr client, int id, + IvyBinding ivyBind) +{ + MsgSndDictPtr msgSendDict = NULL; + IvyClientPtr newClient = NULL; + /* on cherche si une entrée existe deja pour cette regexp source */ + HASH_FIND_STR(messSndByRegexp, regexp, msgSendDict); + /* l'entree n'existe pas dans le dictionnaire : on la cree */ + if (msgSendDict == NULL) { + msgSendDict = malloc (sizeof (struct _msg_snd_dict)); + msgSendDict->regexp_src = strdup (regexp); + msgSendDict->binding = ivyBind; + msgSendDict->clientList = NULL; + + /* HASH_ADD_STR ne fonctionne que si la clef est un tavbleau de char, si c'est un pointeur + if faut utiliser HASH_ADD_KEYPTR */ + HASH_ADD_KEYPTR(hh, messSndByRegexp, msgSendDict->regexp_src, strlen (msgSendDict->regexp_src), msgSendDict); +#ifdef OPENMP + // On ne regenere le cache qu'après recpetion du endregexp, ça permet d'eviter + // de regenerer inutilement le cache à chaqye nouvelle regexp initiale + // par contre, après le end regexp, il faut regenerer le cache à chaque + // nouvel abonnement + if (client->endRegexpReceived == 1) + addRegToPtrArrayCache (msgSendDict); +#endif + } + + + /* on ajoute le client à la liste des clients abonnés */ + IVY_LIST_ADD_START (msgSendDict->clientList, newClient); + newClient->app_name = strdup (client->app_name); + newClient->app_port = client->app_port; + newClient->client = client->client; + newClient->id = id; + /* au niveau du champ liste de client du dictionnaire, on n'a pas besoin + de la liste des regexps sources (qui n'est necessaire que pour + la liste globale des clients) */ + newClient->srcRegList = NULL; + IVY_LIST_ADD_END (msgSendDict->clientList, newClient); +} + + + +static void changeRegexpInDictionary (const char* regexp, IvyClientPtr client, + int id, IvyBinding ivyBind) +{ + // printf ("ENTER changeRegexpInDictionary\n"); + delRegexpForOneClientFromDictionary (regexp, client); + addRegexpToDictionary (regexp, client, id, ivyBind); +} + + + + +/* met a jour le dictionnaire et la liste globale */ +static void delOneClient (const Client client) +{ + IvyClientPtr client_itr; + GlobRegPtr regxpSrc =NULL, next; + + TRACE ("ENTER delOneClient\n"); + /* on cherche le client dans la liste globale des clients */ + IVY_LIST_ITER (allClients, client_itr, client_itr->client != client ); + /* si on le trouve */ + if (client_itr != NULL) { + TRACE ("DEBUG delOneClient %s, client_itr->client=%p, clientRef=%p\n", + client_itr->app_name, client_itr->client, client); + + + + + /* pour chaque regexp source de ce client */ + IVY_LIST_EACH_SAFE (client_itr->srcRegList, regxpSrc, next) { + /* on met a jour la liste des clients associee a la regexp source */ + delRegexpForOneClient (regxpSrc->str_regexp, client_itr, regxpSrc->id); + /* on libere la memoire associee a la regexp source */ + if (regxpSrc->str_regexp != NULL) { + free (regxpSrc->str_regexp); + regxpSrc->str_regexp = NULL; + } + } + + + /* on libere la liste de regexp source */ + IVY_LIST_EMPTY (client_itr->srcRegList); + /* on enleve l'entree correspondant a ce client dans la liste globale */ + IVY_LIST_REMOVE (allClients, client_itr); + } +} + + +static char delRegexpForOneClient (const char *regexp, const IvyClientPtr client, int id) +{ + IvyClientPtr client_itr = NULL; + GlobRegPtr regxpSrc = NULL, next = NULL; + char removed = 0; + + TRACE ("ENTER delRegexpForOneClient id=%d\n", id); + + /* on enleve du dictionnaire */ + + delRegexpForOneClientFromDictionary (regexp, client); + + /* on enleve de la liste globale */ + /* recherche du client */ + IVY_LIST_ITER (allClients, client_itr, client_itr->client != client->client ); + if (client_itr != NULL) { + /* pour chaque regexp source de ce client */ + IVY_LIST_EACH_SAFE (client_itr->srcRegList, regxpSrc, next) { + /* si on trouve notre regexp, on la supprime */ + if (regxpSrc->id == id) { + removed = 1; + if (regxpSrc->str_regexp != NULL) { + free (regxpSrc->str_regexp); + regxpSrc->str_regexp = NULL; + } + TRACE ("DBG> IVY_LIST_REMOVE (%p, %p)\n", client_itr->srcRegList, regxpSrc); + IVY_LIST_REMOVE (client_itr->srcRegList, regxpSrc); + } + } + } + return (removed); +} + +static void addOrChangeRegexp (const char* regexp, IvyClientPtr client, int id, + IvyBinding ivyBind) +{ + MsgSndDictPtr msgSendDict = NULL; + IvyClientPtr client_itr = NULL; + + // printf ("ENTER addOrChangeRegexp\n"); + /* on teste si la regexp existe deja et si il faut faire un changeRegexp */ + HASH_FIND_STR(messSndByRegexp, regexp, msgSendDict); + /* la regexp n'existe pas du tout */ + if (msgSendDict == NULL) { + addRegexp (regexp, client, id, ivyBind); + } else { + /* la regexp existe, mais l'id existe elle pour le client */ + IVY_LIST_ITER( msgSendDict->clientList, client_itr, ( client_itr->client != client->client)); + if (( client_itr != NULL) && (client_itr->id == id)) { + /* si oui on fait un change regexp */ + changeRegexp (regexp, client, id, ivyBind); + } else { + /* si non on fait un add regexp */ + addRegexp (regexp, client, id, ivyBind); + } + } +} + + +static void addRegexp (const char* regexp, IvyClientPtr client, int id, + IvyBinding ivyBind) +{ + IvyClientPtr client_itr = NULL; + GlobRegPtr regxpSrc = NULL; + + + // printf ("ENTER addRegexp\n"); + + /* on ajoute au dictionnaire */ + addRegexpToDictionary (regexp, client, id, ivyBind); + + /* on ajoute a la liste globale */ + /* recherche du client */ + IVY_LIST_ITER (allClients, client_itr, client_itr->client != client->client ); + + /* si le client n'existe pas, faut le creer */ + if (client_itr == NULL) { +/* IVY_LIST_ADD_START (allClients, client_itr); */ +/* client_itr->app_name = strdup (client->app_name); */ +/* client_itr->app_port = client->app_port; */ +/* client_itr->client = client->client; */ +/* client_itr->srcRegList = NULL; */ +/* IVY_LIST_ADD_END (allClients, client_itr); */ + fprintf(stderr, "addRegexp ERROR\n"); + } + + /* on ajoute la regexp à la liste de regexps */ + IVY_LIST_ADD_START (client_itr->srcRegList, regxpSrc); + regxpSrc->id = id; + regxpSrc->str_regexp = strdup (regexp); + IVY_LIST_ADD_END (client_itr->srcRegList, regxpSrc); + if (application_bind_callback) { + (*application_bind_callback)( client, application_bind_data, id, regexp, IvyAddBind); + } +} + + + +static void changeRegexp (const char* regexp, IvyClientPtr client, int id, + IvyBinding ivyBind) +{ + IvyClientPtr client_itr = NULL; + GlobRegPtr regxpSrc = NULL, next = NULL; + /* on change dans le dictionnaire */ + // printf ("ENTER changeRegexp\n"); + changeRegexpInDictionary (regexp, client, id , ivyBind); + + /* on change dans la liste globale */ + /* recherche du client */ + IVY_LIST_ITER (allClients, client_itr, client_itr->client != client->client ); + if (client_itr != NULL) { + /* pour chaque regexp source de ce client */ + IVY_LIST_EACH_SAFE (client_itr->srcRegList, regxpSrc, next) { + /* si on trouve notre regexp, on la change */ + if (regxpSrc->id == id) { + free (regxpSrc->str_regexp); + regxpSrc->str_regexp = strdup (regexp); + } + } + } + if (application_bind_callback) { + (*application_bind_callback)( client, application_bind_data, id, regexp, IvyChangeBind); + } +} + +#ifdef OPENMP +static void regenerateRegPtrArrayCache () +{ + int count=0; + MsgSndDictPtr msgSendDict; + ompDictCache.numPtr = 0; + + for (msgSendDict=messSndByRegexp; msgSendDict != NULL ; + msgSendDict=msgSendDict->hh.next) { + ompDictCache.numPtr++; + } + + if (ompDictCache.numPtr >= ompDictCache.size) { + ompDictCache.size = (ompDictCache.numPtr*2) + 128; + ompDictCache.msgPtrArray = realloc (ompDictCache.msgPtrArray, + sizeof (MsgSndDictPtr) * ompDictCache.size); + } + + + for (msgSendDict=messSndByRegexp; msgSendDict != NULL ; + msgSendDict=msgSendDict->hh.next) { + ompDictCache.msgPtrArray [count++] = msgSendDict; + } +} + + +static void addRegToPtrArrayCache (MsgSndDictPtr newReg) +{ + int count=0; + MsgSndDictPtr msgSendDict; + ompDictCache.numPtr = 0; + + for (msgSendDict=messSndByRegexp; msgSendDict != NULL ; + msgSendDict=msgSendDict->hh.next) { + ompDictCache.numPtr++; + } + + if (ompDictCache.numPtr >= ompDictCache.size) { + ompDictCache.size = (ompDictCache.numPtr*2) + 128; + ompDictCache.msgPtrArray = realloc (ompDictCache.msgPtrArray, + sizeof (MsgSndDictPtr) * ompDictCache.size); + for (msgSendDict=messSndByRegexp; msgSendDict != NULL ; + msgSendDict=msgSendDict->hh.next) { + ompDictCache.msgPtrArray [count++] = msgSendDict; + } + } else { + // on ajoute juste le nouveau pointeur + ompDictCache.msgPtrArray [ompDictCache.numPtr-1] = newReg; + } +} +#endif // OPENMP diff --git a/src/ivy.h b/src/ivy.h index 0ee4542..e7b28bf 100644 --- a/src/ivy.h +++ b/src/ivy.h @@ -26,7 +26,7 @@ extern "C" { #define DEFAULT_BUS 2010 -typedef struct _clnt_lst *IvyClientPtr; +typedef struct _clnt_lst_dict *IvyClientPtr; typedef enum { IvyApplicationConnected, IvyApplicationDisconnected } IvyApplicationEvent; typedef enum { IvyAddBind, IvyRemoveBind, IvyFilterBind, IvyChangeBind } IvyBindEvent; diff --git a/src/ivybuffer.c b/src/ivybuffer.c index a47eef5..7228f55 100644 --- a/src/ivybuffer.c +++ b/src/ivybuffer.c @@ -25,7 +25,7 @@ #include "ivybuffer.h" -#define BUFFER_SIZE 4096 /* taille buffer initiale on multiple par deux a chaque realloc */ +#define BUFFER_SIZE 4096 /* taille buffer initiale on multiple par deux a chaque realloc */ /* fonction de formtage a la printf d'un buffer avec reallocation dynamique */ int make_message(IvyBuffer* buffer, const char *fmt, va_list ap) @@ -81,5 +81,3 @@ int make_message_var(IvyBuffer* buffer, const char *fmt, ... ) return len; } - - diff --git a/src/ivysocket.c b/src/ivysocket.c index 91aabf3..d0fd60e 100644 --- a/src/ivysocket.c +++ b/src/ivysocket.c @@ -14,6 +14,10 @@ * copyright notice regarding this software */ +#ifdef OPENMP +#include +#endif + #ifdef WIN32 #include #endif @@ -71,6 +75,9 @@ struct _client { char *ptr; /* user data */ void *data; +#ifdef OPENMP + omp_lock_t fdLock; +#endif }; static Server servers_list = NULL; @@ -95,6 +102,10 @@ static void DeleteSocket(void *data) (*client->handle_delete) (client, client->data ); shutdown (client->fd, 2 ); close (client->fd ); +#ifdef OPENMP + omp_destroy_lock (&(client->fdLock)); +#endif + IVY_LIST_REMOVE (clients_list, client ); } @@ -204,6 +215,10 @@ static void HandleServer(Channel channel, HANDLE fd, void *data) client->ptr = client->buffer; client->handle_delete = server->handle_delete; client->data = (*server->create) (client ); +#ifdef OPENMP + omp_init_lock (&(client->fdLock)); +#endif + IVY_LIST_ADD_END (clients_list, client ); @@ -335,13 +350,17 @@ void SocketClose (Client client ) IvyChannelRemove (client->channel ); } -int SocketSendRaw (Client client, char *buffer, int len ) +int SocketSendRaw (const Client client, const char *buffer, const int len ) { int err; int waiting= 0; if (!client) return waiting; + +#ifdef OPENMP + omp_set_lock (&(client->fdLock)); +#endif if ( debug_send ) { @@ -373,6 +392,67 @@ int SocketSendRaw (Client client, char *buffer, int len ) { fprintf(stderr,"... OK\n"); } +#ifdef OPENMP + omp_unset_lock (&(client->fdLock)); +#endif + // GROS DEBUG +/* { */ +/* char *toPrint = strndup (buffer, len); */ +/* printf ("DBG> SocketSendRaw [%d] -> '%s'\n", len, toPrint); */ +/* free (toPrint); */ +/* } */ + // END DEBUG + + return waiting; +} + +int SocketSendRawWithId( const Client client, const char *id, const char *buffer, const int len ) +{ + int err; + int waiting= 0; + + if (!client) + return waiting; + +#ifdef OPENMP + omp_set_lock (&(client->fdLock)); +#endif + + if ( debug_send ) + { + /* try to determine if we are going to block */ + fd_set wrset; + int ready; + struct timeval timeout; + timeout.tv_usec = 0; + timeout.tv_sec = 0; + FD_ZERO(&wrset); + FD_SET( client->fd, &wrset ); + ready = select(client->fd+1, 0, &wrset, 0, &timeout); + /* fprintf(stderr,"Ivy: select ready=%d fd=%d\n",ready,FD_ISSET( client->fd, &wrset ) ); */ + if(ready < 0) { + perror("Ivy: SocketSendRaw select"); + } + if ( !FD_ISSET( client->fd, &wrset ) ) + { + fprintf(stderr,"Ivy: Client Queue Full Waiting.........."); + waiting = 1; + } + + } + + send (client->fd, id, strlen (id), 0 ); + err = send (client->fd, buffer, len, 0 ); + if (err != len ) + perror ("*** send ***"); + if ( debug_send && waiting ) + { + fprintf(stderr,"... OK\n"); + } +#ifdef OPENMP + omp_unset_lock (&(client->fdLock)); +#endif + return waiting; } @@ -386,6 +466,10 @@ int SocketSend (Client client, char *fmt, ... ) { int waiting = 0; static IvyBuffer buffer = {NULL, 0, 0 }; /* Use satic mem to eliminate multiple call to malloc /free */ +#ifdef OPENMP +#pragma omp threadprivate (buffer) +#endif + va_list ap; int len; if (!client) @@ -407,6 +491,9 @@ void SocketBroadcast ( char *fmt, ... ) { Client client; static IvyBuffer buffer = {NULL, 0, 0 }; /* Use satic mem to eliminate multiple call to malloc /free */ +#ifdef OPENMP +#pragma omp threadprivate (buffer) +#endif va_list ap; int len; @@ -480,8 +567,13 @@ Client SocketConnectAddr (struct in_addr * addr, unsigned short port, client->from.sin_family = AF_INET; client->from.sin_addr = *addr; client->from.sin_port = htons (port); + +#ifdef OPENMP + omp_init_lock (&(client->fdLock)); +#endif IVY_LIST_ADD_END(clients_list, client ); + return client; } /* TODO factoriser avec HandleRead !!!! */ @@ -601,6 +693,9 @@ Client SocketBroadcastCreate (unsigned short port, client->interpretation = interpretation; client->ptr = client->buffer; client->data = data; +#ifdef OPENMP + omp_init_lock (&(client->fdLock)); +#endif IVY_LIST_ADD_END(clients_list, client ); return client; @@ -610,6 +705,9 @@ void SocketSendBroadcast (Client client, unsigned long host, unsigned short port { struct sockaddr_in remote; static IvyBuffer buffer = { NULL, 0, 0 }; /* Use satic mem to eliminate multiple call to malloc /free */ +#ifdef OPENMP +#pragma omp threadprivate (buffer) +#endif va_list ap; int err,len; diff --git a/src/ivysocket.h b/src/ivysocket.h index c656312..604e65f 100644 --- a/src/ivysocket.h +++ b/src/ivysocket.h @@ -66,7 +66,8 @@ extern void SocketServerClose( Server server ); extern void SocketClose( Client client ); extern int SocketSend( Client client, char *fmt, ... ); -extern int SocketSendRaw( Client client, char *buffer, int len ); +extern int SocketSendRaw( const Client client, const char *buffer, const int len ); +extern int SocketSendRawWithId( const Client client, const char *id, const char *buffer, const int len ); extern char *SocketGetPeerHost( Client client ); extern void SocketSetData( Client client, void *data ); extern void *SocketGetData( Client client ); diff --git a/src/list.h b/src/list.h index 968f619..2a7ea84 100644 --- a/src/list.h +++ b/src/list.h @@ -54,6 +54,9 @@ on place le code d'initialisation de l'objet entre START et END pour eviter de chainer un objet non initialise */ + + /* printf ("sizeof (*"#p") = %d\n", sizeof( *p )); \ */ \ + #define IVY_LIST_ADD_START(list, p ) \ if ((p = (TYPEOF(p)) (malloc( sizeof( *p ))))) \ { \ @@ -75,6 +78,9 @@ pour eviter de chainer un objet non initialise #define IVY_LIST_EACH_SAFE( list, p, next )\ for ( p = list ; (next = p ? p->next: p ),p ; p = next ) +#define IVY_LIST_IS_EMPTY( list ) \ + list == NULL + #define IVY_LIST_EMPTY( list ) \ { \ TYPEOF(list) p; \ diff --git a/src/timer.c b/src/timer.c index 4e38c52..9718b22 100644 --- a/src/timer.c +++ b/src/timer.c @@ -63,7 +63,7 @@ static long currentTime() static void SetNewTimeout( unsigned long current, unsigned long when ) { unsigned long time; - time = when - current; + time = (when <= current) ? 0 : when - current; nextTimeout = when; selectTimeout.tv_sec = time / MILLISEC; selectTimeout.tv_usec = (time - selectTimeout.tv_sec* MILLISEC) * MILLISEC; diff --git a/src/version.h b/src/version.h index 42d32fd..de934c8 100644 --- a/src/version.h +++ b/src/version.h @@ -26,4 +26,4 @@ * */ #define IVYMAJOR_VERSION 3 -#define IVYMINOR_VERSION 9 +#define IVYMINOR_VERSION 10 -- cgit v1.1