From ee2e694ebba179f1c75764a7311df717fa3925cd Mon Sep 17 00:00:00 2001 From: bustico Date: Wed, 6 Feb 2008 16:32:54 +0000 Subject: * fix realloc buffer size when big message * complete change of internal structures for performance optimisation * experimental parralelized version for performance optimisation (use and need openmp) which scale well for regexp matching on multicore/multi processor gear. --- src/ivy.c | 941 ++++++++++++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 756 insertions(+), 185 deletions(-) (limited to 'src/ivy.c') diff --git a/src/ivy.c b/src/ivy.c index 635e006..0d146ee 100644 --- a/src/ivy.c +++ b/src/ivy.c @@ -2,12 +2,12 @@ * * Ivy, C interface * - * Copyright 1997-2000 + * Copyright 1997-2008 * Centre d'Etudes de la Navigation Aerienne * * Main functions * - * Authors: Francois-Regis Colin,Stephane Chatty + * Authors: Francois-Regis Colin,Stephane Chatty, Alexandre Bustico * * $Id$ * @@ -15,6 +15,17 @@ * copyright notice regarding this software */ +/* + TODO : ° version non bloquante + ° outil de chasse aux memleak + ° mesures de perfo + ° compil rejeu en monothread et omp et tests + ° compil sur mac et windows pour portabilité + */ + +#ifdef OPENMP +#include +#endif #include #ifdef WIN32 @@ -28,8 +39,11 @@ #include #include + + #include +#include "uthash.h" #include "intervalRegexp.h" #include "ivychannel.h" #include "ivysocket.h" @@ -56,7 +70,6 @@ static char* DefaultIvyBus = GenerateIvyBus(DEFAULT_DOMAIN,DEFAULT_BUS); typedef enum { - Bye, /* l'application emettrice se termine */ AddRegexp, /* expression reguliere d'un client */ Msg, /* message reel */ @@ -70,29 +83,51 @@ typedef enum { Pong /* ivy doit renvoyer ce message à la reception d'un ping */ } MsgType; -typedef struct _msg_snd *MsgSndPtr; + +typedef struct _global_reg_lst *GlobRegPtr; +typedef struct _msg_snd_dict *MsgSndDictPtr; + struct _msg_rcv { /* requete d'emission d'un client */ MsgRcvPtr next; int id; const char *regexp; /* regexp du message a recevoir */ - MsgCallback callback; /* callback a declanche a la reception */ + MsgCallback callback; /* callback a declancher a la reception */ void *user_data; /* stokage d'info client */ }; -struct _msg_snd { /* requete de reception d'un client */ - MsgSndPtr next; - int id; - char *str_regexp; /* la regexp sous forme inhumaine */ + + +/* liste de regexps source */ +struct _global_reg_lst { /* liste des regexp source */ + GlobRegPtr next; + char *str_regexp; /* la regexp sous forme source */ + int id; /* son id, differente pour chaque client */ +}; + + +/* pour le dictionnaire clef=regexp, valeur = cette struct */ +struct _msg_snd_dict { /* requete de reception d'un client */ + UT_hash_handle hh; /* makes this structure hashable */ + char *regexp_src; /* clef du dictionnaire (hash uthash) */ + IvyClientPtr clientList; /* liste des clients */ IvyBinding binding; /* la regexp sous forme machine */ }; -struct _clnt_lst { +/* liste de clients, champ de la struct _msg_snd_dict qui est valeur du dictionnaire */ +/* typedef IvyClientPtr */ +struct _clnt_lst_dict { IvyClientPtr next; Client client; /* la socket client */ - MsgSndPtr msg_send; /* liste des requetes recues */ char *app_name; /* nom de l'application */ unsigned short app_port; /* port de l'application */ + int id; /* l'id n'est pas liée uniquement + a la regexp, mais au couple + regexp, client */ + GlobRegPtr srcRegList; /* liste de regexp source */ +#ifdef OPENMP + int endRegexpReceived; +#endif // OPENMP }; /* flag pour le debug en cas de Filter de regexp */ @@ -113,35 +148,70 @@ static unsigned short SupervisionPort; /* client pour la socket supervision */ static Client broadcast; -static const char *ApplicationName = 0; -static const char *ApplicationID = 0; +static const char *ApplicationName = NULL; +static const char *ApplicationID = NULL; /* callback appele sur reception d'un message direct */ -static MsgDirectCallback direct_callback = 0; -static void *direct_user_data = 0; +static MsgDirectCallback direct_callback = NULL; +static void *direct_user_data = NULL; /* callback appele sur changement d'etat d'application */ static IvyApplicationCallback application_callback; -static void *application_user_data = 0; +static void *application_user_data = NULL; /* callback appele sur ajout suppression de regexp */ static IvyBindCallback application_bind_callback; -static void *application_bind_data = 0; +static void *application_bind_data = NULL; /* callback appele sur demande de terminaison d'application */ static IvyDieCallback application_die_callback; -static void *application_die_user_data = 0; +static void *application_die_user_data = NULL; /* liste des messages a recevoir */ -static MsgRcvPtr msg_recv = 0; +static MsgRcvPtr msg_recv = NULL; /* liste des clients connectes */ -static IvyClientPtr clients = 0; +static IvyClientPtr allClients = NULL; + +/* dictionnaire clef : regexp, valeur : liste de clients IvyClientPtr */ +static MsgSndDictPtr messSndByRegexp = NULL; -static const char *ready_message = 0; +static const char *ready_message = NULL; static void substituteInterval (IvyBuffer *src); +static int RegexpCall (const MsgSndDictPtr msg, const char * const message); +static int RegexpCallUnique (const MsgSndDictPtr msg, const char * const message, + const Client clientUnique); + +static void freeClient ( IvyClientPtr client); +static void delOneClient (const Client client); + +static void delRegexpForOneClientFromDictionary (const char *regexp, const IvyClientPtr client); +static void delAllRegexpsFromDictionary (); +static void addRegexpToDictionary (const char* regexp, IvyClientPtr client, int id, + IvyBinding bind); +static void changeRegexpInDictionary (const char* regexp, IvyClientPtr client, int id, + IvyBinding bind); + +static char delRegexpForOneClient (const char *regexp, const IvyClientPtr client, int id); +static void addRegexp (const char* regexp, IvyClientPtr client, int id, + IvyBinding bind); +static void changeRegexp (const char* regexp, IvyClientPtr client, int id, IvyBinding bind); +static void addOrChangeRegexp (const char* regexp, IvyClientPtr client, int id, IvyBinding bind); +static int IvyCheckBuffer( const char* buffer ); + +#ifdef OPENMP +static void regenerateRegPtrArrayCache (); +static void addRegToPtrArrayCache (MsgSndDictPtr newReg); +static struct { + MsgSndDictPtr *msgPtrArray; + int size; + int numPtr; +} ompDictCache = {NULL, 0, 0}; +#endif + + /* * function like strok but do not eat consecutive separator * */ @@ -169,70 +239,165 @@ static int MsgSendTo( Client client, MsgType msgtype, int id, const char *messag static void IvyCleanup() { IvyClientPtr clnt,next; + GlobRegPtr regLst; + /* destruction des connexions clients */ - IVY_LIST_EACH_SAFE( clients, clnt, next ) + IVY_LIST_EACH_SAFE( allClients, clnt, next ) { /* on dit au revoir */ MsgSendTo( clnt->client, Bye, 0, "" ); SocketClose( clnt->client ); - IVY_LIST_EMPTY( clnt->msg_send ); + IVY_LIST_EACH (clnt->srcRegList, regLst) { + if (regLst->str_regexp != NULL) { + free (regLst->str_regexp); + regLst->str_regexp = NULL; + } + } + IVY_LIST_EMPTY( clnt->srcRegList ); + IVY_LIST_REMOVE (allClients, clnt); } - IVY_LIST_EMPTY( clients ); + IVY_LIST_EMPTY( allClients ); + delAllRegexpsFromDictionary (); /* destruction des sockets serveur et supervision */ SocketServerClose( server ); SocketClose( broadcast ); } -static int MsgCall (const char *message, MsgSndPtr msg, IvyClientPtr client) + +static int +ClientCall (IvyClientPtr clnt, const char *message) { - int waiting = 0; - static IvyBuffer buffer = { NULL, 0, 0 }; /* Use satic mem to eliminate multiple call to malloc /free */ - int err; - int index; - int arglen; - const char *arg; - - int rc= IvyBindingExec( msg->binding, message ); + int match_count = 0; + + /* pour toutes les regexp */ + MsgSndDictPtr msgSendDict; + + for (msgSendDict=messSndByRegexp; msgSendDict != NULL; + msgSendDict=msgSendDict->hh.next) { + match_count += RegexpCallUnique (msgSendDict, message, clnt->client); + } + + TRACE_IF( match_count == 0, "Warning no recipient for %s\n",message); + /* si le message n'est pas emit et qu'il y a des filtres alors WARNING */ + if ( match_count == 0 && debug_filter ) { + IvyBindindFilterCheck( message ); + } + return match_count; +} + + + +static int +RegexpCall (const MsgSndDictPtr msg, const char * const message) +{ + static IvyBuffer bufferArg = {NULL, 0, 0}; + char bufferId[16]; + int match_count ; + int waiting ; + int indx; + int arglen; + const char *arg; + IvyClientPtr clnt; + int rc; + +#ifdef OPENMP +#pragma omp threadprivate (bufferArg) + /* d'après la doc openmp : + Variables with automatic storage duration which are declared in a scope inside the + construct are private. + Il n'y aurait donc rien à faire pour s'assurer que les variables automatiques soient + privées au thread */ +#endif // OPENMP + + match_count = 0; + waiting = 0; + rc= IvyBindingExec(msg->binding, message ); - if (rc<1) return 0; /* no match */ + if (rc<1) return 0; /* no match */ - TRACE( "Sending message id=%d '%s'\n",msg->id,message); + bufferArg.offset = 0; + // bufferArg.size = bufferId.size = 0; + // bufferArg.data = bufferId.data = NULL; - buffer.offset = 0; - /* il faut essayer d'envoyer le message en une seule fois sur la socket */ - /* pour eviter au maximun de passer dans le select plusieur fois par message du protocole Ivy */ - /* pour eviter la latence ( PB de perfo detecte par ivyperf ping roudtrip ) */ - err = make_message_var( &buffer, "%d %d" ARG_START ,Msg, msg->id); + /* il faut essayer d'envoyer le message en une seule fois sur la socket */ + /* pour eviter au maximun de passer dans le select plusieur fois par message du protocole Ivy */ + /* pour eviter la latence ( PB de perfo detecte par ivyperf ping roudtrip ) */ - TRACE( "Send matching args count %d\n",rc); + TRACE( "Send matching args count %d\n",rc); + + for( indx=1; indx < rc ; indx++ ) + { + IvyBindingMatch (msg->binding, message, indx, &arglen, & arg ); + make_message_var( &bufferArg, "%.*s" ARG_END , arglen, arg ); + } + make_message_var( &bufferArg, "\n"); - for( index=1; index < rc ; index++ ) - { - IvyBindingMatch( msg->binding, message, index, &arglen, & arg ); - err = make_message_var( &buffer, "%.*s" ARG_END , arglen, arg ); - } - err = make_message_var( &buffer, "\n"); - waiting = SocketSendRaw(client->client, buffer.data , buffer.offset); - if ( waiting ) - fprintf(stderr, "Ivy: Slow client : %s\n", client->app_name ); - return 1; -} + IVY_LIST_EACH(msg->clientList, clnt ) { + sprintf (bufferId, "%d %d" ARG_START ,Msg, clnt->id); + waiting = SocketSendRawWithId(clnt->client, bufferId, bufferArg.data , bufferArg.offset); + match_count++; + if ( waiting ) + fprintf(stderr, "Ivy: Slow client : %s\n", clnt->app_name ); + } + return match_count; +} static int -ClientCall (IvyClientPtr clnt, const char *message) +RegexpCallUnique (const MsgSndDictPtr msg, const char * const message, const + Client clientUnique) { - MsgSndPtr msg; - int match_count = 0; - /* recherche dans la liste des requetes recues de ce client */ - IVY_LIST_EACH (clnt->msg_send, msg) { - match_count+= MsgCall (message, msg, clnt); - } - return match_count; + static IvyBuffer bufferArg = {NULL, 0, 0}; + char bufferId[16]; + int match_count ; + int waiting ; + int indx; + int arglen; + const char *arg; + IvyClientPtr clnt; + int rc; + + match_count = 0; + waiting = 0; + rc= IvyBindingExec(msg->binding, message ); + + if (rc<1) return 0; /* no match */ + + bufferArg.offset = 0; + // bufferArg.size = bufferId. + // bufferArg.size = bufferId.size = 0; + // bufferArg.data = bufferId.data = NULL; + + /* il faut essayer d'envoyer le message en une seule fois sur la socket */ + /* pour eviter au maximun de passer dans le select plusieur fois par message du protocole Ivy */ + /* pour eviter la latence ( PB de perfo detecte par ivyperf ping roudtrip ) */ + + TRACE( "Send matching args count %d\n",rc); + + for( indx=1; indx < rc ; indx++ ) + { + IvyBindingMatch (msg->binding, message, indx, &arglen, & arg ); + make_message_var( &bufferArg, "%.*s" ARG_END , arglen, arg ); + } + make_message_var( &bufferArg, "\n"); + + IVY_LIST_EACH(msg->clientList, clnt ) { + if (clientUnique != clnt->client) + continue; + sprintf (bufferId, "%d %d" ARG_START ,Msg, clnt->id); + waiting = SocketSendRawWithId(clnt->client, bufferId, bufferArg.data , bufferArg.offset); + match_count++; + if ( waiting ) + fprintf(stderr, "Ivy: Slow client : %s\n", clnt->app_name ); + } + return match_count; } + + + static int CheckConnected( IvyClientPtr clnt ) { IvyClientPtr client; @@ -242,7 +407,7 @@ static int CheckConnected( IvyClientPtr clnt ) if ( clnt->app_port == 0 ) /* Old Ivy Protocol Dont check */ return 0; /* recherche dans la liste des clients de la presence de clnt */ - IVY_LIST_EACH( clients, client ) + IVY_LIST_EACH( allClients, client ) { /* client different mais port identique */ if ( (client != clnt) && (clnt->app_port == client->app_port) ) @@ -265,18 +430,19 @@ static int CheckConnected( IvyClientPtr clnt ) return 0; } + + static void Receive( Client client, void *data, char *line ) { IvyClientPtr clnt; int err,id; - MsgSndPtr snd; MsgRcvPtr rcv; int argc = 0; char *argv[MAX_MATCHING_ARGS]; char *arg; int kind_of_msg = Bye; - IvyBinding bind; - + IvyBinding ivyBind; + const char *errbuf; int erroffset; @@ -311,7 +477,7 @@ static void Receive( Client client, void *data, char *line ) if ( !IvyBindingFilter( arg ) ) { - TRACE("Warning: regexp '%s' illegal, removing from %s\n",arg,ApplicationName); + TRACE("Warning: regexp '%s' filtered, removing from %s\n",arg,ApplicationName); if ( application_bind_callback ) { @@ -320,58 +486,33 @@ static void Receive( Client client, void *data, char *line ) return; } - bind = IvyBindingCompile( arg, & erroffset, & errbuf ); - if ( bind != NULL ) - { - /* On teste si c'est un change regexp : changement de regexp d'une id existante */ - IVY_LIST_ITER( clnt->msg_send, snd, ( id != snd->id )); - if ( snd ) { - free (snd->str_regexp); - snd->str_regexp = strdup( arg ); - snd->binding = bind; - if ( application_bind_callback ) - { - (*application_bind_callback)( clnt, application_bind_data, id, snd->str_regexp, IvyChangeBind ); - } - } else { - IVY_LIST_ADD_START( clnt->msg_send, snd ) - snd->id = id; - snd->str_regexp = strdup( arg ); - snd->binding = bind; - if ( application_bind_callback ) - { - (*application_bind_callback)( clnt, application_bind_data, id, snd->str_regexp, IvyAddBind ); - } - IVY_LIST_ADD_END( clnt->msg_send, snd ) - - } - } - else - { - printf("Error compiling '%s', %s\n", arg, errbuf); - MsgSendTo( client, Error, erroffset, errbuf ); + ivyBind = IvyBindingCompile( arg, & erroffset, & errbuf ); + if ( ivyBind != NULL ) { + addOrChangeRegexp (arg, clnt, id, ivyBind); + } else { + printf("Error compiling '%s', %s\n", arg, errbuf); + MsgSendTo( client, Error, erroffset, errbuf ); } break; case DelRegexp: - - TRACE("Regexp Delete id=%d\n", id); - - IVY_LIST_ITER( clnt->msg_send, snd, ( id != snd->id )); - if ( snd ) - { - if ( application_bind_callback ) - { - (*application_bind_callback)( clnt, application_bind_data, id, snd->str_regexp, IvyRemoveBind ); - } - IvyBindingFree( snd->binding ); - IVY_LIST_REMOVE( clnt->msg_send, snd ); - } - break; + + TRACE("Regexp Delete id=%d\n", id); + if (delRegexpForOneClient (arg, clnt, id)) { + if ( application_bind_callback ) { + (*application_bind_callback)( clnt, application_bind_data, id, arg, + IvyRemoveBind ); + } + } + break; case StartRegexp: TRACE("Regexp Start id=%d Application='%s'\n", id, arg); +#ifdef OPENMP + clnt->endRegexpReceived=0; +#endif // OPENMP + clnt->app_name = strdup( arg ); clnt->app_port = id; if ( CheckConnected( clnt ) ) @@ -395,9 +536,13 @@ static void Receive( Client client, void *data, char *line ) { int count; count = ClientCall( clnt, ready_message ); - - TRACE(" Sendind ready message %d\n", count); + // count = IvySendMsg ("%s", ready_message ); + // printf ("%s sending READY MESSAGE %d\n", clnt->app_name, count); } +#ifdef OPENMP + clnt->endRegexpReceived=1; + regenerateRegPtrArrayCache(); +#endif // OPENMP break; case Msg: @@ -461,9 +606,7 @@ static IvyClientPtr SendService( Client client, const char *appname ) { IvyClientPtr clnt; MsgRcvPtr msg; - IVY_LIST_ADD_START( clients, clnt ) - - clnt->msg_send = 0; + IVY_LIST_ADD_START( allClients, clnt ) clnt->client = client; clnt->app_name = strdup(appname); clnt->app_port = 0; @@ -474,51 +617,44 @@ static IvyClientPtr SendService( Client client, const char *appname ) } MsgSendTo( client, EndRegexp, 0, ""); - IVY_LIST_ADD_END( clients, clnt ) + IVY_LIST_ADD_END( allClients, clnt ) + //printf ("DBG> SendService addAllClient: name=%s; client->client=%p\n", appname, clnt->client); + return clnt; } static void ClientDelete( Client client, void *data ) { IvyClientPtr clnt; - MsgSndPtr msg; + #ifdef DEBUG char *remotehost; unsigned short remoteport; #endif clnt = (IvyClientPtr)data; - if ( application_callback ) - { - (*application_callback)( clnt, application_user_data, IvyApplicationDisconnected ); - } + if ( application_callback ) { + (*application_callback)( clnt, application_user_data, IvyApplicationDisconnected ); + } #ifdef DEBUG /* probably bogus call, but this is for debug only anyway */ SocketGetRemoteHost( client, &remotehost, &remoteport ); TRACE("Deconnexion de %s:%hu\n", remotehost, remoteport ); #endif /*DEBUG */ - - if ( clnt->app_name ) free( clnt->app_name ); - IVY_LIST_EACH( clnt->msg_send, msg) - { - /*regfree(msg->regexp);*/ - free( msg->str_regexp); - } - IVY_LIST_EMPTY( clnt->msg_send ); - IVY_LIST_REMOVE( clients, clnt ); + delOneClient (client); } static void *ClientCreate( Client client ) { -#ifdef DEBUG - char *remotehost; - unsigned short remoteport; - SocketGetRemoteHost( client, &remotehost, &remoteport ); - TRACE("Connexion de %s:%hu\n", remotehost, remoteport ); -#endif /*DEBUG */ - - return SendService (client, "Unknown"); + // #ifdef DEBUG + char *remotehost; + unsigned short remoteport; + SocketGetRemoteHost( client, &remotehost, &remoteport ); + printf ("%s : Connexion de %s:%hu client=%p\n", + ApplicationName, remotehost, remoteport, client); + // #endif /*DEBUG */ + return SendService (client, "Unknown"); } static void BroadcastReceive( Client client, void *data, char *line ) @@ -529,17 +665,14 @@ static void BroadcastReceive( Client client, void *data, char *line ) unsigned short serviceport; char appid[2048]; char appname[2048]; -#ifdef DEBUG unsigned short remoteport; char *remotehost = 0; -#endif + memset( appid, 0, sizeof( appid ) ); memset( appname, 0, sizeof( appname ) ); err = sscanf (line,"%d %hu %s %[^\n]", &version, &serviceport, appid, appname); if ( err < 2 ) { /* ignore the message */ - unsigned short remoteport; - char *remotehost; SocketGetRemoteHost (client, &remotehost, &remoteport ); printf (" Bad supervision message, expected 'version port' from %s:%d\n", remotehost, remoteport); @@ -547,8 +680,6 @@ static void BroadcastReceive( Client client, void *data, char *line ) } if ( version != VERSION ) { /* ignore the message */ - unsigned short remoteport; - char *remotehost = 0; SocketGetRemoteHost (client, &remotehost, &remoteport ); fprintf (stderr, "Bad Ivy version, expected %d and got %d from %s:%d\n", VERSION, version, remotehost, remoteport); @@ -569,6 +700,9 @@ static void BroadcastReceive( Client client, void *data, char *line ) IvyClientPtr clnt; clnt = SendService( app, appname ); SocketSetData( app, clnt); + } else { + printf ("SocketConnectAddr error .....\n"); + SocketSetData( app, NULL); } } static unsigned long currentTime() @@ -641,7 +775,7 @@ void IvyStart (const char* bus) int error = 0; const char* p = bus; /* used for decoding address list */ const char* q; /* used for decoding port number */ - int port; + unsigned short port; /* @@ -750,7 +884,7 @@ IvyUnbindMsg (MsgRcvPtr msg) { IvyClientPtr clnt; /* Send to already connected clients */ - IVY_LIST_EACH (clients, clnt ) { + IVY_LIST_EACH (allClients, clnt ) { MsgSendTo( clnt->client, DelRegexp,msg->id, ""); } IVY_LIST_REMOVE( msg_recv, msg ); @@ -783,7 +917,7 @@ IvyBindMsg (MsgCallback callback, void *user_data, const char *fmt_regex, ... ) IVY_LIST_ADD_END( msg_recv, msg ) /* Send to already connected clients */ /* recherche dans la liste des requetes recues de mes clients */ - IVY_LIST_EACH( clients, clnt ) { + IVY_LIST_EACH( allClients, clnt ) { MsgSendTo( clnt->client, AddRegexp,msg->id,msg->regexp); } return msg; @@ -809,15 +943,115 @@ IvyChangeMsg (MsgRcvPtr msg, const char *fmt_regex, ... ) /* Send to already connected clients */ /* recherche dans la liste des requetes recues de mes clients */ - IVY_LIST_EACH( clients, clnt ) { + IVY_LIST_EACH( allClients, clnt ) { MsgSendTo( clnt->client, AddRegexp,msg->id,msg->regexp); } return msg; } + + + +int IvySendMsg(const char *fmt, ...) /* version dictionnaire */ +{ + int match_count = 0; + static IvyBuffer buffer = { NULL, 0, 0}; /* Use static mem to eliminate multiple call to malloc /free */ + va_list ap; + + /* construction du buffer message à partir du format et des arguments */ + if( fmt == 0 || strlen(fmt) == 0 ) return 0; + va_start( ap, fmt ); + buffer.offset = 0; + make_message( &buffer, fmt, ap ); + va_end ( ap ); + + /* test du contenu du message */ + if ( debug_binary_msg ) + { + if ( IvyCheckBuffer( buffer.data ) ) + return 0; + } + + /* pour toutes les regexp */ + +#ifdef OPENMP + //#define TABLEAU_PREALABLE_SEQUENTIEL 1 +#define TABLEAU_PREALABLE 1 + //#define SINGLE_NOWAIT 1 + //#define SCHEDULE_GUIDED 1 + //#define SEQUENTIEL_DEBUG 1 + +#ifdef SCHEDULE_GUIDED + int count; +#pragma omp parallel default(none) private(count) shared(ompDictCache, buffer) \ + reduction(+:match_count) + { +#pragma omp for schedule(guided) // après debug mettre schedule(guided, 10) + for(count=0; counthh.next) { +#pragma omp single nowait + match_count += RegexpCall (msgSendDict, buffer.data); + } +#endif // SINGLE_NOWAIT + +#ifdef SEQUENTIEL_DEBUG // OPEMMP LISTE + MsgSndDictPtr msgSendDict; + + for (msgSendDict=messSndByRegexp; msgSendDict ; msgSendDict=msgSendDict->hh.next) { + match_count += RegexpCall (msgSendDict, buffer.data); + } +#endif // SEQUENTIEL_DEBUG + + + +#else // PAS OPENMP + MsgSndDictPtr msgSendDict; + + for (msgSendDict=messSndByRegexp; msgSendDict ; msgSendDict=msgSendDict->hh.next) { + match_count += RegexpCall (msgSendDict, buffer.data); + } +#endif + + TRACE_IF( match_count == 0, "Warning no recipient for %s\n",buffer.data); + /* si le message n'est pas emit et qu'il y a des filtres alors WARNING */ + if ( match_count == 0 && debug_filter ) + { + IvyBindindFilterCheck( buffer.data ); + } + return match_count; +} + + /* teste de la presence de binaire dans les message Ivy */ static int IvyCheckBuffer( const char* buffer ) { - char * ptr = buffer; + const char * ptr = buffer; while ( *ptr ) { if ( *ptr++ < ' ' ) @@ -829,42 +1063,11 @@ static int IvyCheckBuffer( const char* buffer ) } return 0; } -/* emmission d'un message avec formatage a la printf */ -int IvySendMsg(const char *fmt, ...) -{ - IvyClientPtr clnt; - int match_count = 0; - static IvyBuffer buffer = { NULL, 0, 0}; /* Use satic mem to eliminate multiple call to malloc /free */ - va_list ap; - - if( fmt == 0 || strlen(fmt) == 0 ) return 0; - va_start( ap, fmt ); - buffer.offset = 0; - make_message( &buffer, fmt, ap ); - va_end ( ap ); - /* test du contenu du message */ - if ( debug_binary_msg ) - { - if ( IvyCheckBuffer( buffer.data ) ) - return 0; - } - - /* recherche dans la liste des requetes recues de mes clients */ - IVY_LIST_EACH (clients, clnt) { - match_count += ClientCall (clnt, buffer.data); - } - TRACE_IF( match_count == 0, "Warning no recipient for %s\n",buffer.data); - /* si le message n'est pas emit et qu'il y a des filtres alors WARNING */ - if ( match_count == 0 && debug_filter ) - { - IvyBindindFilterCheck( buffer.data ); - } - return match_count; -} + void IvySendError( IvyClientPtr app, int id, const char *fmt, ... ) { - static IvyBuffer buffer = { NULL, 0, 0}; /* Use satic mem to eliminate multiple call to malloc /free */ + static IvyBuffer buffer = { NULL, 0, 0}; /* Use static mem to eliminate multiple call to malloc /free */ va_list ap; va_start( ap, fmt ); @@ -943,7 +1146,7 @@ void IvyDefaultBindCallback( IvyClientPtr app, void *user_data, int id, char* re IvyClientPtr IvyGetApplication( char *name ) { IvyClientPtr app = 0; - IVY_LIST_ITER( clients, app, strcmp(name, app->app_name) != 0 ); + IVY_LIST_ITER( allClients, app, strcmp(name, app->app_name) != 0 ); return app; } @@ -952,7 +1155,7 @@ char *IvyGetApplicationList(const char *sep) static char applist[4096]; /* TODO remove that ugly Thing */ IvyClientPtr app; applist[0] = '\0'; - IVY_LIST_EACH( clients, app ) + IVY_LIST_EACH( allClients, app ) { strcat( applist, app->app_name ); strcat( applist, sep ); @@ -964,11 +1167,11 @@ char **IvyGetApplicationMessages( IvyClientPtr app ) { #define MAX_REGEXP 4096 static char *messagelist[MAX_REGEXP+1];/* TODO remove that ugly Thing */ - MsgSndPtr msg; + GlobRegPtr msg; int msgCount= 0; memset( messagelist, 0 , sizeof( messagelist )); /* recherche dans la liste des requetes recues de ce client */ - IVY_LIST_EACH( app->msg_send, msg ) + IVY_LIST_EACH( app->srcRegList, msg ) { messagelist[msgCount++]= msg->str_regexp; if ( msgCount >= MAX_REGEXP ) @@ -1021,3 +1224,371 @@ static void substituteInterval (IvyBuffer *src) src->data = dst.data; } } + + +static void freeClient ( IvyClientPtr client) +{ + GlobRegPtr srcReg; + + /* on libere la chaine nom de l'appli*/ + if (client->app_name != NULL) { + free (client->app_name); + client->app_name = NULL; + /* on libere la liste des clients */ + IVY_LIST_EACH (client->srcRegList, srcReg) { + if (srcReg->str_regexp != NULL) { + free (srcReg->str_regexp); + srcReg->str_regexp = NULL; + } + } + IVY_LIST_EMPTY (client->srcRegList); + } +} + + + + + + + +static void delRegexpForOneClientFromDictionary (const char *regexp, const IvyClientPtr client) +{ + MsgSndDictPtr msgSendDict = NULL; + IvyClientPtr client_itr, next; + TRACE ("ENTER delRegexpForOneClientFromDictionary clnt=%d, reg='%s'\n", client, regexp); + + HASH_FIND_STR(messSndByRegexp, regexp, msgSendDict); + if (msgSendDict != NULL) { + /* la clef est trouvée, on itere sur la liste de client associée */ + IVY_LIST_EACH_SAFE ( msgSendDict->clientList, client_itr, next) { + /* pour tester 2 IvyClientPtr, on teste la similarité + des pointeur Client qui doivent être uniques */ + if (client_itr->client == client->client) { + /* on a trouve le client : on l'enleve */ + IVY_LIST_REMOVE (msgSendDict->clientList, client_itr); + TRACE ("delRegexpForOneClientFromDictionary : IVY_LIST_REMOVE\n"); + } + } + /* si la liste de clients associée à cette regexp est vide */ + if ((msgSendDict->clientList == NULL) || (IVY_LIST_IS_EMPTY (msgSendDict->clientList))) { + TRACE ("delRegexpForOneClientFromDictionary : IvyBindingFree, free, hash_del\n"); + /* on efface le binding */ + IvyBindingFree (msgSendDict->binding); + /* on enlève l'entrée regexp de la table de hash */ + HASH_DEL (messSndByRegexp, msgSendDict); + /* on efface la clef (regexp source) */ + free (msgSendDict->regexp_src); + } + } +#ifdef OPENMP + regenerateRegPtrArrayCache (); +#endif +} + + + + +static void delAllRegexpsFromDictionary () +{ + MsgSndDictPtr msgSendDict; + IvyClientPtr client; + + /* pour toutes les entrees du dictionnaire des regexps */ + for (msgSendDict=messSndByRegexp; msgSendDict ; msgSendDict=msgSendDict->hh.next) { + /* on efface le binding */ + IvyBindingFree (msgSendDict->binding); + /* on efface la clef (regexp source) */ + free (msgSendDict->regexp_src); + /* pour chaque client abonne a cette regexp */ + IVY_LIST_EACH ( msgSendDict->clientList, client) { + freeClient (client); + } + /* on enleve la liste de regexps */ + IVY_LIST_EMPTY(msgSendDict->clientList); + /* on enleve le couple regexp -> valeur */ + HASH_DEL(messSndByRegexp, msgSendDict); + } + +#ifdef OPENMP + regenerateRegPtrArrayCache (); +#endif +} + + + + + + + + + +// HASH_ADD_KEYPTR (hh_name, head, key_ptr, key_len, item_ptr) +// HASH_ADD_STR ( head, keyfield_name, item_ptr) + +static void addRegexpToDictionary (const char* regexp, IvyClientPtr client, int id, + IvyBinding ivyBind) +{ + MsgSndDictPtr msgSendDict = NULL; + IvyClientPtr newClient = NULL; + /* on cherche si une entrée existe deja pour cette regexp source */ + HASH_FIND_STR(messSndByRegexp, regexp, msgSendDict); + /* l'entree n'existe pas dans le dictionnaire : on la cree */ + if (msgSendDict == NULL) { + msgSendDict = malloc (sizeof (struct _msg_snd_dict)); + msgSendDict->regexp_src = strdup (regexp); + msgSendDict->binding = ivyBind; + msgSendDict->clientList = NULL; + + /* HASH_ADD_STR ne fonctionne que si la clef est un tavbleau de char, si c'est un pointeur + if faut utiliser HASH_ADD_KEYPTR */ + HASH_ADD_KEYPTR(hh, messSndByRegexp, msgSendDict->regexp_src, strlen (msgSendDict->regexp_src), msgSendDict); +#ifdef OPENMP + // On ne regenere le cache qu'après recpetion du endregexp, ça permet d'eviter + // de regenerer inutilement le cache à chaqye nouvelle regexp initiale + // par contre, après le end regexp, il faut regenerer le cache à chaque + // nouvel abonnement + if (client->endRegexpReceived == 1) + addRegToPtrArrayCache (msgSendDict); +#endif + } + + + /* on ajoute le client à la liste des clients abonnés */ + IVY_LIST_ADD_START (msgSendDict->clientList, newClient); + newClient->app_name = strdup (client->app_name); + newClient->app_port = client->app_port; + newClient->client = client->client; + newClient->id = id; + /* au niveau du champ liste de client du dictionnaire, on n'a pas besoin + de la liste des regexps sources (qui n'est necessaire que pour + la liste globale des clients) */ + newClient->srcRegList = NULL; + IVY_LIST_ADD_END (msgSendDict->clientList, newClient); +} + + + +static void changeRegexpInDictionary (const char* regexp, IvyClientPtr client, + int id, IvyBinding ivyBind) +{ + // printf ("ENTER changeRegexpInDictionary\n"); + delRegexpForOneClientFromDictionary (regexp, client); + addRegexpToDictionary (regexp, client, id, ivyBind); +} + + + + +/* met a jour le dictionnaire et la liste globale */ +static void delOneClient (const Client client) +{ + IvyClientPtr client_itr; + GlobRegPtr regxpSrc =NULL, next; + + TRACE ("ENTER delOneClient\n"); + /* on cherche le client dans la liste globale des clients */ + IVY_LIST_ITER (allClients, client_itr, client_itr->client != client ); + /* si on le trouve */ + if (client_itr != NULL) { + TRACE ("DEBUG delOneClient %s, client_itr->client=%p, clientRef=%p\n", + client_itr->app_name, client_itr->client, client); + + + + + /* pour chaque regexp source de ce client */ + IVY_LIST_EACH_SAFE (client_itr->srcRegList, regxpSrc, next) { + /* on met a jour la liste des clients associee a la regexp source */ + delRegexpForOneClient (regxpSrc->str_regexp, client_itr, regxpSrc->id); + /* on libere la memoire associee a la regexp source */ + if (regxpSrc->str_regexp != NULL) { + free (regxpSrc->str_regexp); + regxpSrc->str_regexp = NULL; + } + } + + + /* on libere la liste de regexp source */ + IVY_LIST_EMPTY (client_itr->srcRegList); + /* on enleve l'entree correspondant a ce client dans la liste globale */ + IVY_LIST_REMOVE (allClients, client_itr); + } +} + + +static char delRegexpForOneClient (const char *regexp, const IvyClientPtr client, int id) +{ + IvyClientPtr client_itr = NULL; + GlobRegPtr regxpSrc = NULL, next = NULL; + char removed = 0; + + TRACE ("ENTER delRegexpForOneClient id=%d\n", id); + + /* on enleve du dictionnaire */ + + delRegexpForOneClientFromDictionary (regexp, client); + + /* on enleve de la liste globale */ + /* recherche du client */ + IVY_LIST_ITER (allClients, client_itr, client_itr->client != client->client ); + if (client_itr != NULL) { + /* pour chaque regexp source de ce client */ + IVY_LIST_EACH_SAFE (client_itr->srcRegList, regxpSrc, next) { + /* si on trouve notre regexp, on la supprime */ + if (regxpSrc->id == id) { + removed = 1; + if (regxpSrc->str_regexp != NULL) { + free (regxpSrc->str_regexp); + regxpSrc->str_regexp = NULL; + } + TRACE ("DBG> IVY_LIST_REMOVE (%p, %p)\n", client_itr->srcRegList, regxpSrc); + IVY_LIST_REMOVE (client_itr->srcRegList, regxpSrc); + } + } + } + return (removed); +} + +static void addOrChangeRegexp (const char* regexp, IvyClientPtr client, int id, + IvyBinding ivyBind) +{ + MsgSndDictPtr msgSendDict = NULL; + IvyClientPtr client_itr = NULL; + + // printf ("ENTER addOrChangeRegexp\n"); + /* on teste si la regexp existe deja et si il faut faire un changeRegexp */ + HASH_FIND_STR(messSndByRegexp, regexp, msgSendDict); + /* la regexp n'existe pas du tout */ + if (msgSendDict == NULL) { + addRegexp (regexp, client, id, ivyBind); + } else { + /* la regexp existe, mais l'id existe elle pour le client */ + IVY_LIST_ITER( msgSendDict->clientList, client_itr, ( client_itr->client != client->client)); + if (( client_itr != NULL) && (client_itr->id == id)) { + /* si oui on fait un change regexp */ + changeRegexp (regexp, client, id, ivyBind); + } else { + /* si non on fait un add regexp */ + addRegexp (regexp, client, id, ivyBind); + } + } +} + + +static void addRegexp (const char* regexp, IvyClientPtr client, int id, + IvyBinding ivyBind) +{ + IvyClientPtr client_itr = NULL; + GlobRegPtr regxpSrc = NULL; + + + // printf ("ENTER addRegexp\n"); + + /* on ajoute au dictionnaire */ + addRegexpToDictionary (regexp, client, id, ivyBind); + + /* on ajoute a la liste globale */ + /* recherche du client */ + IVY_LIST_ITER (allClients, client_itr, client_itr->client != client->client ); + + /* si le client n'existe pas, faut le creer */ + if (client_itr == NULL) { +/* IVY_LIST_ADD_START (allClients, client_itr); */ +/* client_itr->app_name = strdup (client->app_name); */ +/* client_itr->app_port = client->app_port; */ +/* client_itr->client = client->client; */ +/* client_itr->srcRegList = NULL; */ +/* IVY_LIST_ADD_END (allClients, client_itr); */ + fprintf(stderr, "addRegexp ERROR\n"); + } + + /* on ajoute la regexp à la liste de regexps */ + IVY_LIST_ADD_START (client_itr->srcRegList, regxpSrc); + regxpSrc->id = id; + regxpSrc->str_regexp = strdup (regexp); + IVY_LIST_ADD_END (client_itr->srcRegList, regxpSrc); + if (application_bind_callback) { + (*application_bind_callback)( client, application_bind_data, id, regexp, IvyAddBind); + } +} + + + +static void changeRegexp (const char* regexp, IvyClientPtr client, int id, + IvyBinding ivyBind) +{ + IvyClientPtr client_itr = NULL; + GlobRegPtr regxpSrc = NULL, next = NULL; + /* on change dans le dictionnaire */ + // printf ("ENTER changeRegexp\n"); + changeRegexpInDictionary (regexp, client, id , ivyBind); + + /* on change dans la liste globale */ + /* recherche du client */ + IVY_LIST_ITER (allClients, client_itr, client_itr->client != client->client ); + if (client_itr != NULL) { + /* pour chaque regexp source de ce client */ + IVY_LIST_EACH_SAFE (client_itr->srcRegList, regxpSrc, next) { + /* si on trouve notre regexp, on la change */ + if (regxpSrc->id == id) { + free (regxpSrc->str_regexp); + regxpSrc->str_regexp = strdup (regexp); + } + } + } + if (application_bind_callback) { + (*application_bind_callback)( client, application_bind_data, id, regexp, IvyChangeBind); + } +} + +#ifdef OPENMP +static void regenerateRegPtrArrayCache () +{ + int count=0; + MsgSndDictPtr msgSendDict; + ompDictCache.numPtr = 0; + + for (msgSendDict=messSndByRegexp; msgSendDict != NULL ; + msgSendDict=msgSendDict->hh.next) { + ompDictCache.numPtr++; + } + + if (ompDictCache.numPtr >= ompDictCache.size) { + ompDictCache.size = (ompDictCache.numPtr*2) + 128; + ompDictCache.msgPtrArray = realloc (ompDictCache.msgPtrArray, + sizeof (MsgSndDictPtr) * ompDictCache.size); + } + + + for (msgSendDict=messSndByRegexp; msgSendDict != NULL ; + msgSendDict=msgSendDict->hh.next) { + ompDictCache.msgPtrArray [count++] = msgSendDict; + } +} + + +static void addRegToPtrArrayCache (MsgSndDictPtr newReg) +{ + int count=0; + MsgSndDictPtr msgSendDict; + ompDictCache.numPtr = 0; + + for (msgSendDict=messSndByRegexp; msgSendDict != NULL ; + msgSendDict=msgSendDict->hh.next) { + ompDictCache.numPtr++; + } + + if (ompDictCache.numPtr >= ompDictCache.size) { + ompDictCache.size = (ompDictCache.numPtr*2) + 128; + ompDictCache.msgPtrArray = realloc (ompDictCache.msgPtrArray, + sizeof (MsgSndDictPtr) * ompDictCache.size); + for (msgSendDict=messSndByRegexp; msgSendDict != NULL ; + msgSendDict=msgSendDict->hh.next) { + ompDictCache.msgPtrArray [count++] = msgSendDict; + } + } else { + // on ajoute juste le nouveau pointeur + ompDictCache.msgPtrArray [ompDictCache.numPtr-1] = newReg; + } +} +#endif // OPENMP -- cgit v1.1