summaryrefslogtreecommitdiff
path: root/src/ivy.c
diff options
context:
space:
mode:
authorbustico2008-02-06 16:32:54 +0000
committerbustico2008-02-06 16:32:54 +0000
commitee2e694ebba179f1c75764a7311df717fa3925cd (patch)
treea9da6c6d525241725597c7d641c90436239c91d9 /src/ivy.c
parentc50b5d38c1fc5491e8c99c8a86b043d39e074acc (diff)
downloadivy-c-ee2e694ebba179f1c75764a7311df717fa3925cd.zip
ivy-c-ee2e694ebba179f1c75764a7311df717fa3925cd.tar.gz
ivy-c-ee2e694ebba179f1c75764a7311df717fa3925cd.tar.bz2
ivy-c-ee2e694ebba179f1c75764a7311df717fa3925cd.tar.xz
* fix realloc buffer size when big message
* complete change of internal structures for performance optimisation * experimental parralelized version for performance optimisation (use and need openmp) which scale well for regexp matching on multicore/multi processor gear.
Diffstat (limited to 'src/ivy.c')
-rw-r--r--src/ivy.c941
1 files changed, 756 insertions, 185 deletions
diff --git a/src/ivy.c b/src/ivy.c
index 635e006..0d146ee 100644
--- a/src/ivy.c
+++ b/src/ivy.c
@@ -2,12 +2,12 @@
*
* Ivy, C interface
*
- * Copyright 1997-2000
+ * Copyright 1997-2008
* Centre d'Etudes de la Navigation Aerienne
*
* Main functions
*
- * Authors: Francois-Regis Colin,Stephane Chatty
+ * Authors: Francois-Regis Colin,Stephane Chatty, Alexandre Bustico
*
* $Id$
*
@@ -15,6 +15,17 @@
* copyright notice regarding this software
*/
+/*
+ TODO : ° version non bloquante
+ ° outil de chasse aux memleak
+ ° mesures de perfo
+ ° compil rejeu en monothread et omp et tests
+ ° compil sur mac et windows pour portabilité
+ */
+
+#ifdef OPENMP
+#include <omp.h>
+#endif
#include <stdlib.h>
#ifdef WIN32
@@ -28,8 +39,11 @@
#include <stdarg.h>
#include <ctype.h>
+
+
#include <fcntl.h>
+#include "uthash.h"
#include "intervalRegexp.h"
#include "ivychannel.h"
#include "ivysocket.h"
@@ -56,7 +70,6 @@
static char* DefaultIvyBus = GenerateIvyBus(DEFAULT_DOMAIN,DEFAULT_BUS);
typedef enum {
-
Bye, /* l'application emettrice se termine */
AddRegexp, /* expression reguliere d'un client */
Msg, /* message reel */
@@ -70,29 +83,51 @@ typedef enum {
Pong /* ivy doit renvoyer ce message à la reception d'un ping */
} MsgType;
-typedef struct _msg_snd *MsgSndPtr;
+
+typedef struct _global_reg_lst *GlobRegPtr;
+typedef struct _msg_snd_dict *MsgSndDictPtr;
+
struct _msg_rcv { /* requete d'emission d'un client */
MsgRcvPtr next;
int id;
const char *regexp; /* regexp du message a recevoir */
- MsgCallback callback; /* callback a declanche a la reception */
+ MsgCallback callback; /* callback a declancher a la reception */
void *user_data; /* stokage d'info client */
};
-struct _msg_snd { /* requete de reception d'un client */
- MsgSndPtr next;
- int id;
- char *str_regexp; /* la regexp sous forme inhumaine */
+
+
+/* liste de regexps source */
+struct _global_reg_lst { /* liste des regexp source */
+ GlobRegPtr next;
+ char *str_regexp; /* la regexp sous forme source */
+ int id; /* son id, differente pour chaque client */
+};
+
+
+/* pour le dictionnaire clef=regexp, valeur = cette struct */
+struct _msg_snd_dict { /* requete de reception d'un client */
+ UT_hash_handle hh; /* makes this structure hashable */
+ char *regexp_src; /* clef du dictionnaire (hash uthash) */
+ IvyClientPtr clientList; /* liste des clients */
IvyBinding binding; /* la regexp sous forme machine */
};
-struct _clnt_lst {
+/* liste de clients, champ de la struct _msg_snd_dict qui est valeur du dictionnaire */
+/* typedef IvyClientPtr */
+struct _clnt_lst_dict {
IvyClientPtr next;
Client client; /* la socket client */
- MsgSndPtr msg_send; /* liste des requetes recues */
char *app_name; /* nom de l'application */
unsigned short app_port; /* port de l'application */
+ int id; /* l'id n'est pas liée uniquement
+ a la regexp, mais au couple
+ regexp, client */
+ GlobRegPtr srcRegList; /* liste de regexp source */
+#ifdef OPENMP
+ int endRegexpReceived;
+#endif // OPENMP
};
/* flag pour le debug en cas de Filter de regexp */
@@ -113,35 +148,70 @@ static unsigned short SupervisionPort;
/* client pour la socket supervision */
static Client broadcast;
-static const char *ApplicationName = 0;
-static const char *ApplicationID = 0;
+static const char *ApplicationName = NULL;
+static const char *ApplicationID = NULL;
/* callback appele sur reception d'un message direct */
-static MsgDirectCallback direct_callback = 0;
-static void *direct_user_data = 0;
+static MsgDirectCallback direct_callback = NULL;
+static void *direct_user_data = NULL;
/* callback appele sur changement d'etat d'application */
static IvyApplicationCallback application_callback;
-static void *application_user_data = 0;
+static void *application_user_data = NULL;
/* callback appele sur ajout suppression de regexp */
static IvyBindCallback application_bind_callback;
-static void *application_bind_data = 0;
+static void *application_bind_data = NULL;
/* callback appele sur demande de terminaison d'application */
static IvyDieCallback application_die_callback;
-static void *application_die_user_data = 0;
+static void *application_die_user_data = NULL;
/* liste des messages a recevoir */
-static MsgRcvPtr msg_recv = 0;
+static MsgRcvPtr msg_recv = NULL;
/* liste des clients connectes */
-static IvyClientPtr clients = 0;
+static IvyClientPtr allClients = NULL;
+
+/* dictionnaire clef : regexp, valeur : liste de clients IvyClientPtr */
+static MsgSndDictPtr messSndByRegexp = NULL;
-static const char *ready_message = 0;
+static const char *ready_message = NULL;
static void substituteInterval (IvyBuffer *src);
+static int RegexpCall (const MsgSndDictPtr msg, const char * const message);
+static int RegexpCallUnique (const MsgSndDictPtr msg, const char * const message,
+ const Client clientUnique);
+
+static void freeClient ( IvyClientPtr client);
+static void delOneClient (const Client client);
+
+static void delRegexpForOneClientFromDictionary (const char *regexp, const IvyClientPtr client);
+static void delAllRegexpsFromDictionary ();
+static void addRegexpToDictionary (const char* regexp, IvyClientPtr client, int id,
+ IvyBinding bind);
+static void changeRegexpInDictionary (const char* regexp, IvyClientPtr client, int id,
+ IvyBinding bind);
+
+static char delRegexpForOneClient (const char *regexp, const IvyClientPtr client, int id);
+static void addRegexp (const char* regexp, IvyClientPtr client, int id,
+ IvyBinding bind);
+static void changeRegexp (const char* regexp, IvyClientPtr client, int id, IvyBinding bind);
+static void addOrChangeRegexp (const char* regexp, IvyClientPtr client, int id, IvyBinding bind);
+static int IvyCheckBuffer( const char* buffer );
+
+#ifdef OPENMP
+static void regenerateRegPtrArrayCache ();
+static void addRegToPtrArrayCache (MsgSndDictPtr newReg);
+static struct {
+ MsgSndDictPtr *msgPtrArray;
+ int size;
+ int numPtr;
+} ompDictCache = {NULL, 0, 0};
+#endif
+
+
/*
* function like strok but do not eat consecutive separator
* */
@@ -169,70 +239,165 @@ static int MsgSendTo( Client client, MsgType msgtype, int id, const char *messag
static void IvyCleanup()
{
IvyClientPtr clnt,next;
+ GlobRegPtr regLst;
+
/* destruction des connexions clients */
- IVY_LIST_EACH_SAFE( clients, clnt, next )
+ IVY_LIST_EACH_SAFE( allClients, clnt, next )
{
/* on dit au revoir */
MsgSendTo( clnt->client, Bye, 0, "" );
SocketClose( clnt->client );
- IVY_LIST_EMPTY( clnt->msg_send );
+ IVY_LIST_EACH (clnt->srcRegList, regLst) {
+ if (regLst->str_regexp != NULL) {
+ free (regLst->str_regexp);
+ regLst->str_regexp = NULL;
+ }
+ }
+ IVY_LIST_EMPTY( clnt->srcRegList );
+ IVY_LIST_REMOVE (allClients, clnt);
}
- IVY_LIST_EMPTY( clients );
+ IVY_LIST_EMPTY( allClients );
+ delAllRegexpsFromDictionary ();
/* destruction des sockets serveur et supervision */
SocketServerClose( server );
SocketClose( broadcast );
}
-static int MsgCall (const char *message, MsgSndPtr msg, IvyClientPtr client)
+
+static int
+ClientCall (IvyClientPtr clnt, const char *message)
{
- int waiting = 0;
- static IvyBuffer buffer = { NULL, 0, 0 }; /* Use satic mem to eliminate multiple call to malloc /free */
- int err;
- int index;
- int arglen;
- const char *arg;
-
- int rc= IvyBindingExec( msg->binding, message );
+ int match_count = 0;
+
+ /* pour toutes les regexp */
+ MsgSndDictPtr msgSendDict;
+
+ for (msgSendDict=messSndByRegexp; msgSendDict != NULL;
+ msgSendDict=msgSendDict->hh.next) {
+ match_count += RegexpCallUnique (msgSendDict, message, clnt->client);
+ }
+
+ TRACE_IF( match_count == 0, "Warning no recipient for %s\n",message);
+ /* si le message n'est pas emit et qu'il y a des filtres alors WARNING */
+ if ( match_count == 0 && debug_filter ) {
+ IvyBindindFilterCheck( message );
+ }
+ return match_count;
+}
+
+
+
+static int
+RegexpCall (const MsgSndDictPtr msg, const char * const message)
+{
+ static IvyBuffer bufferArg = {NULL, 0, 0};
+ char bufferId[16];
+ int match_count ;
+ int waiting ;
+ int indx;
+ int arglen;
+ const char *arg;
+ IvyClientPtr clnt;
+ int rc;
+
+#ifdef OPENMP
+#pragma omp threadprivate (bufferArg)
+ /* d'après la doc openmp :
+ Variables with automatic storage duration which are declared in a scope inside the
+ construct are private.
+ Il n'y aurait donc rien à faire pour s'assurer que les variables automatiques soient
+ privées au thread */
+#endif // OPENMP
+
+ match_count = 0;
+ waiting = 0;
+ rc= IvyBindingExec(msg->binding, message );
- if (rc<1) return 0; /* no match */
+ if (rc<1) return 0; /* no match */
- TRACE( "Sending message id=%d '%s'\n",msg->id,message);
+ bufferArg.offset = 0;
+ // bufferArg.size = bufferId.size = 0;
+ // bufferArg.data = bufferId.data = NULL;
- buffer.offset = 0;
- /* il faut essayer d'envoyer le message en une seule fois sur la socket */
- /* pour eviter au maximun de passer dans le select plusieur fois par message du protocole Ivy */
- /* pour eviter la latence ( PB de perfo detecte par ivyperf ping roudtrip ) */
- err = make_message_var( &buffer, "%d %d" ARG_START ,Msg, msg->id);
+ /* il faut essayer d'envoyer le message en une seule fois sur la socket */
+ /* pour eviter au maximun de passer dans le select plusieur fois par message du protocole Ivy */
+ /* pour eviter la latence ( PB de perfo detecte par ivyperf ping roudtrip ) */
- TRACE( "Send matching args count %d\n",rc);
+ TRACE( "Send matching args count %d\n",rc);
+
+ for( indx=1; indx < rc ; indx++ )
+ {
+ IvyBindingMatch (msg->binding, message, indx, &arglen, & arg );
+ make_message_var( &bufferArg, "%.*s" ARG_END , arglen, arg );
+ }
+ make_message_var( &bufferArg, "\n");
- for( index=1; index < rc ; index++ )
- {
- IvyBindingMatch( msg->binding, message, index, &arglen, & arg );
- err = make_message_var( &buffer, "%.*s" ARG_END , arglen, arg );
- }
- err = make_message_var( &buffer, "\n");
- waiting = SocketSendRaw(client->client, buffer.data , buffer.offset);
- if ( waiting )
- fprintf(stderr, "Ivy: Slow client : %s\n", client->app_name );
- return 1;
-}
+ IVY_LIST_EACH(msg->clientList, clnt ) {
+ sprintf (bufferId, "%d %d" ARG_START ,Msg, clnt->id);
+ waiting = SocketSendRawWithId(clnt->client, bufferId, bufferArg.data , bufferArg.offset);
+ match_count++;
+ if ( waiting )
+ fprintf(stderr, "Ivy: Slow client : %s\n", clnt->app_name );
+ }
+ return match_count;
+}
static int
-ClientCall (IvyClientPtr clnt, const char *message)
+RegexpCallUnique (const MsgSndDictPtr msg, const char * const message, const
+ Client clientUnique)
{
- MsgSndPtr msg;
- int match_count = 0;
- /* recherche dans la liste des requetes recues de ce client */
- IVY_LIST_EACH (clnt->msg_send, msg) {
- match_count+= MsgCall (message, msg, clnt);
- }
- return match_count;
+ static IvyBuffer bufferArg = {NULL, 0, 0};
+ char bufferId[16];
+ int match_count ;
+ int waiting ;
+ int indx;
+ int arglen;
+ const char *arg;
+ IvyClientPtr clnt;
+ int rc;
+
+ match_count = 0;
+ waiting = 0;
+ rc= IvyBindingExec(msg->binding, message );
+
+ if (rc<1) return 0; /* no match */
+
+ bufferArg.offset = 0;
+ // bufferArg.size = bufferId.
+ // bufferArg.size = bufferId.size = 0;
+ // bufferArg.data = bufferId.data = NULL;
+
+ /* il faut essayer d'envoyer le message en une seule fois sur la socket */
+ /* pour eviter au maximun de passer dans le select plusieur fois par message du protocole Ivy */
+ /* pour eviter la latence ( PB de perfo detecte par ivyperf ping roudtrip ) */
+
+ TRACE( "Send matching args count %d\n",rc);
+
+ for( indx=1; indx < rc ; indx++ )
+ {
+ IvyBindingMatch (msg->binding, message, indx, &arglen, & arg );
+ make_message_var( &bufferArg, "%.*s" ARG_END , arglen, arg );
+ }
+ make_message_var( &bufferArg, "\n");
+
+ IVY_LIST_EACH(msg->clientList, clnt ) {
+ if (clientUnique != clnt->client)
+ continue;
+ sprintf (bufferId, "%d %d" ARG_START ,Msg, clnt->id);
+ waiting = SocketSendRawWithId(clnt->client, bufferId, bufferArg.data , bufferArg.offset);
+ match_count++;
+ if ( waiting )
+ fprintf(stderr, "Ivy: Slow client : %s\n", clnt->app_name );
+ }
+ return match_count;
}
+
+
+
static int CheckConnected( IvyClientPtr clnt )
{
IvyClientPtr client;
@@ -242,7 +407,7 @@ static int CheckConnected( IvyClientPtr clnt )
if ( clnt->app_port == 0 ) /* Old Ivy Protocol Dont check */
return 0;
/* recherche dans la liste des clients de la presence de clnt */
- IVY_LIST_EACH( clients, client )
+ IVY_LIST_EACH( allClients, client )
{
/* client different mais port identique */
if ( (client != clnt) && (clnt->app_port == client->app_port) )
@@ -265,18 +430,19 @@ static int CheckConnected( IvyClientPtr clnt )
return 0;
}
+
+
static void Receive( Client client, void *data, char *line )
{
IvyClientPtr clnt;
int err,id;
- MsgSndPtr snd;
MsgRcvPtr rcv;
int argc = 0;
char *argv[MAX_MATCHING_ARGS];
char *arg;
int kind_of_msg = Bye;
- IvyBinding bind;
-
+ IvyBinding ivyBind;
+
const char *errbuf;
int erroffset;
@@ -311,7 +477,7 @@ static void Receive( Client client, void *data, char *line )
if ( !IvyBindingFilter( arg ) )
{
- TRACE("Warning: regexp '%s' illegal, removing from %s\n",arg,ApplicationName);
+ TRACE("Warning: regexp '%s' filtered, removing from %s\n",arg,ApplicationName);
if ( application_bind_callback )
{
@@ -320,58 +486,33 @@ static void Receive( Client client, void *data, char *line )
return;
}
- bind = IvyBindingCompile( arg, & erroffset, & errbuf );
- if ( bind != NULL )
- {
- /* On teste si c'est un change regexp : changement de regexp d'une id existante */
- IVY_LIST_ITER( clnt->msg_send, snd, ( id != snd->id ));
- if ( snd ) {
- free (snd->str_regexp);
- snd->str_regexp = strdup( arg );
- snd->binding = bind;
- if ( application_bind_callback )
- {
- (*application_bind_callback)( clnt, application_bind_data, id, snd->str_regexp, IvyChangeBind );
- }
- } else {
- IVY_LIST_ADD_START( clnt->msg_send, snd )
- snd->id = id;
- snd->str_regexp = strdup( arg );
- snd->binding = bind;
- if ( application_bind_callback )
- {
- (*application_bind_callback)( clnt, application_bind_data, id, snd->str_regexp, IvyAddBind );
- }
- IVY_LIST_ADD_END( clnt->msg_send, snd )
-
- }
- }
- else
- {
- printf("Error compiling '%s', %s\n", arg, errbuf);
- MsgSendTo( client, Error, erroffset, errbuf );
+ ivyBind = IvyBindingCompile( arg, & erroffset, & errbuf );
+ if ( ivyBind != NULL ) {
+ addOrChangeRegexp (arg, clnt, id, ivyBind);
+ } else {
+ printf("Error compiling '%s', %s\n", arg, errbuf);
+ MsgSendTo( client, Error, erroffset, errbuf );
}
break;
case DelRegexp:
-
- TRACE("Regexp Delete id=%d\n", id);
-
- IVY_LIST_ITER( clnt->msg_send, snd, ( id != snd->id ));
- if ( snd )
- {
- if ( application_bind_callback )
- {
- (*application_bind_callback)( clnt, application_bind_data, id, snd->str_regexp, IvyRemoveBind );
- }
- IvyBindingFree( snd->binding );
- IVY_LIST_REMOVE( clnt->msg_send, snd );
- }
- break;
+
+ TRACE("Regexp Delete id=%d\n", id);
+ if (delRegexpForOneClient (arg, clnt, id)) {
+ if ( application_bind_callback ) {
+ (*application_bind_callback)( clnt, application_bind_data, id, arg,
+ IvyRemoveBind );
+ }
+ }
+ break;
case StartRegexp:
TRACE("Regexp Start id=%d Application='%s'\n", id, arg);
+#ifdef OPENMP
+ clnt->endRegexpReceived=0;
+#endif // OPENMP
+
clnt->app_name = strdup( arg );
clnt->app_port = id;
if ( CheckConnected( clnt ) )
@@ -395,9 +536,13 @@ static void Receive( Client client, void *data, char *line )
{
int count;
count = ClientCall( clnt, ready_message );
-
- TRACE(" Sendind ready message %d\n", count);
+ // count = IvySendMsg ("%s", ready_message );
+ // printf ("%s sending READY MESSAGE %d\n", clnt->app_name, count);
}
+#ifdef OPENMP
+ clnt->endRegexpReceived=1;
+ regenerateRegPtrArrayCache();
+#endif // OPENMP
break;
case Msg:
@@ -461,9 +606,7 @@ static IvyClientPtr SendService( Client client, const char *appname )
{
IvyClientPtr clnt;
MsgRcvPtr msg;
- IVY_LIST_ADD_START( clients, clnt )
-
- clnt->msg_send = 0;
+ IVY_LIST_ADD_START( allClients, clnt )
clnt->client = client;
clnt->app_name = strdup(appname);
clnt->app_port = 0;
@@ -474,51 +617,44 @@ static IvyClientPtr SendService( Client client, const char *appname )
}
MsgSendTo( client, EndRegexp, 0, "");
- IVY_LIST_ADD_END( clients, clnt )
+ IVY_LIST_ADD_END( allClients, clnt )
+ //printf ("DBG> SendService addAllClient: name=%s; client->client=%p\n", appname, clnt->client);
+
return clnt;
}
static void ClientDelete( Client client, void *data )
{
IvyClientPtr clnt;
- MsgSndPtr msg;
+
#ifdef DEBUG
char *remotehost;
unsigned short remoteport;
#endif
clnt = (IvyClientPtr)data;
- if ( application_callback )
- {
- (*application_callback)( clnt, application_user_data, IvyApplicationDisconnected );
- }
+ if ( application_callback ) {
+ (*application_callback)( clnt, application_user_data, IvyApplicationDisconnected );
+ }
#ifdef DEBUG
/* probably bogus call, but this is for debug only anyway */
SocketGetRemoteHost( client, &remotehost, &remoteport );
TRACE("Deconnexion de %s:%hu\n", remotehost, remoteport );
#endif /*DEBUG */
-
- if ( clnt->app_name ) free( clnt->app_name );
- IVY_LIST_EACH( clnt->msg_send, msg)
- {
- /*regfree(msg->regexp);*/
- free( msg->str_regexp);
- }
- IVY_LIST_EMPTY( clnt->msg_send );
- IVY_LIST_REMOVE( clients, clnt );
+ delOneClient (client);
}
static void *ClientCreate( Client client )
{
-#ifdef DEBUG
- char *remotehost;
- unsigned short remoteport;
- SocketGetRemoteHost( client, &remotehost, &remoteport );
- TRACE("Connexion de %s:%hu\n", remotehost, remoteport );
-#endif /*DEBUG */
-
- return SendService (client, "Unknown");
+ // #ifdef DEBUG
+ char *remotehost;
+ unsigned short remoteport;
+ SocketGetRemoteHost( client, &remotehost, &remoteport );
+ printf ("%s : Connexion de %s:%hu client=%p\n",
+ ApplicationName, remotehost, remoteport, client);
+ // #endif /*DEBUG */
+ return SendService (client, "Unknown");
}
static void BroadcastReceive( Client client, void *data, char *line )
@@ -529,17 +665,14 @@ static void BroadcastReceive( Client client, void *data, char *line )
unsigned short serviceport;
char appid[2048];
char appname[2048];
-#ifdef DEBUG
unsigned short remoteport;
char *remotehost = 0;
-#endif
+
memset( appid, 0, sizeof( appid ) );
memset( appname, 0, sizeof( appname ) );
err = sscanf (line,"%d %hu %s %[^\n]", &version, &serviceport, appid, appname);
if ( err < 2 ) {
/* ignore the message */
- unsigned short remoteport;
- char *remotehost;
SocketGetRemoteHost (client, &remotehost, &remoteport );
printf (" Bad supervision message, expected 'version port' from %s:%d\n",
remotehost, remoteport);
@@ -547,8 +680,6 @@ static void BroadcastReceive( Client client, void *data, char *line )
}
if ( version != VERSION ) {
/* ignore the message */
- unsigned short remoteport;
- char *remotehost = 0;
SocketGetRemoteHost (client, &remotehost, &remoteport );
fprintf (stderr, "Bad Ivy version, expected %d and got %d from %s:%d\n",
VERSION, version, remotehost, remoteport);
@@ -569,6 +700,9 @@ static void BroadcastReceive( Client client, void *data, char *line )
IvyClientPtr clnt;
clnt = SendService( app, appname );
SocketSetData( app, clnt);
+ } else {
+ printf ("SocketConnectAddr error .....\n");
+ SocketSetData( app, NULL);
}
}
static unsigned long currentTime()
@@ -641,7 +775,7 @@ void IvyStart (const char* bus)
int error = 0;
const char* p = bus; /* used for decoding address list */
const char* q; /* used for decoding port number */
- int port;
+ unsigned short port;
/*
@@ -750,7 +884,7 @@ IvyUnbindMsg (MsgRcvPtr msg)
{
IvyClientPtr clnt;
/* Send to already connected clients */
- IVY_LIST_EACH (clients, clnt ) {
+ IVY_LIST_EACH (allClients, clnt ) {
MsgSendTo( clnt->client, DelRegexp,msg->id, "");
}
IVY_LIST_REMOVE( msg_recv, msg );
@@ -783,7 +917,7 @@ IvyBindMsg (MsgCallback callback, void *user_data, const char *fmt_regex, ... )
IVY_LIST_ADD_END( msg_recv, msg )
/* Send to already connected clients */
/* recherche dans la liste des requetes recues de mes clients */
- IVY_LIST_EACH( clients, clnt ) {
+ IVY_LIST_EACH( allClients, clnt ) {
MsgSendTo( clnt->client, AddRegexp,msg->id,msg->regexp);
}
return msg;
@@ -809,15 +943,115 @@ IvyChangeMsg (MsgRcvPtr msg, const char *fmt_regex, ... )
/* Send to already connected clients */
/* recherche dans la liste des requetes recues de mes clients */
- IVY_LIST_EACH( clients, clnt ) {
+ IVY_LIST_EACH( allClients, clnt ) {
MsgSendTo( clnt->client, AddRegexp,msg->id,msg->regexp);
}
return msg;
}
+
+
+
+int IvySendMsg(const char *fmt, ...) /* version dictionnaire */
+{
+ int match_count = 0;
+ static IvyBuffer buffer = { NULL, 0, 0}; /* Use static mem to eliminate multiple call to malloc /free */
+ va_list ap;
+
+ /* construction du buffer message à partir du format et des arguments */
+ if( fmt == 0 || strlen(fmt) == 0 ) return 0;
+ va_start( ap, fmt );
+ buffer.offset = 0;
+ make_message( &buffer, fmt, ap );
+ va_end ( ap );
+
+ /* test du contenu du message */
+ if ( debug_binary_msg )
+ {
+ if ( IvyCheckBuffer( buffer.data ) )
+ return 0;
+ }
+
+ /* pour toutes les regexp */
+
+#ifdef OPENMP
+ //#define TABLEAU_PREALABLE_SEQUENTIEL 1
+#define TABLEAU_PREALABLE 1
+ //#define SINGLE_NOWAIT 1
+ //#define SCHEDULE_GUIDED 1
+ //#define SEQUENTIEL_DEBUG 1
+
+#ifdef SCHEDULE_GUIDED
+ int count;
+#pragma omp parallel default(none) private(count) shared(ompDictCache, buffer) \
+ reduction(+:match_count)
+ {
+#pragma omp for schedule(guided) // après debug mettre schedule(guided, 10)
+ for(count=0; count<ompDictCache.numPtr; count++) {
+ match_count += RegexpCall (ompDictCache.msgPtrArray[count], buffer.data);
+ }
+}
+#endif // SCHEDULE_GUIDED
+
+
+#ifdef TABLEAU_PREALABLE
+ int count; // PARALLEL FOR
+#pragma omp parallel for default(none) private(count) shared(ompDictCache, buffer) \
+ reduction(+:match_count)
+ for(count=0; count<ompDictCache.numPtr; count++) {
+ match_count += RegexpCall (ompDictCache.msgPtrArray[count], buffer.data);
+ }
+#endif // TABLEAU_PREALABLE
+
+
+#ifdef TABLEAU_PREALABLE_SEQUENTIEL
+ int count;
+ for(count=0; count<ompDictCache.numPtr; count++) {
+ match_count += RegexpCall (ompDictCache.msgPtrArray[count], buffer.data);
+ }
+#endif // TABLEAU_PREALABLE_SEQUENTIEL
+
+
+#ifdef SINGLE_NOWAIT // OPEMMP LISTE
+ MsgSndDictPtr msgSendDict;
+#pragma omp parallel default(shared) private(msgSendDict) reduction(+:match_count)
+ for (msgSendDict=messSndByRegexp; msgSendDict ; msgSendDict=msgSendDict->hh.next) {
+#pragma omp single nowait
+ match_count += RegexpCall (msgSendDict, buffer.data);
+ }
+#endif // SINGLE_NOWAIT
+
+#ifdef SEQUENTIEL_DEBUG // OPEMMP LISTE
+ MsgSndDictPtr msgSendDict;
+
+ for (msgSendDict=messSndByRegexp; msgSendDict ; msgSendDict=msgSendDict->hh.next) {
+ match_count += RegexpCall (msgSendDict, buffer.data);
+ }
+#endif // SEQUENTIEL_DEBUG
+
+
+
+#else // PAS OPENMP
+ MsgSndDictPtr msgSendDict;
+
+ for (msgSendDict=messSndByRegexp; msgSendDict ; msgSendDict=msgSendDict->hh.next) {
+ match_count += RegexpCall (msgSendDict, buffer.data);
+ }
+#endif
+
+ TRACE_IF( match_count == 0, "Warning no recipient for %s\n",buffer.data);
+ /* si le message n'est pas emit et qu'il y a des filtres alors WARNING */
+ if ( match_count == 0 && debug_filter )
+ {
+ IvyBindindFilterCheck( buffer.data );
+ }
+ return match_count;
+}
+
+
/* teste de la presence de binaire dans les message Ivy */
static int IvyCheckBuffer( const char* buffer )
{
- char * ptr = buffer;
+ const char * ptr = buffer;
while ( *ptr )
{
if ( *ptr++ < ' ' )
@@ -829,42 +1063,11 @@ static int IvyCheckBuffer( const char* buffer )
}
return 0;
}
-/* emmission d'un message avec formatage a la printf */
-int IvySendMsg(const char *fmt, ...)
-{
- IvyClientPtr clnt;
- int match_count = 0;
- static IvyBuffer buffer = { NULL, 0, 0}; /* Use satic mem to eliminate multiple call to malloc /free */
- va_list ap;
-
- if( fmt == 0 || strlen(fmt) == 0 ) return 0;
- va_start( ap, fmt );
- buffer.offset = 0;
- make_message( &buffer, fmt, ap );
- va_end ( ap );
- /* test du contenu du message */
- if ( debug_binary_msg )
- {
- if ( IvyCheckBuffer( buffer.data ) )
- return 0;
- }
-
- /* recherche dans la liste des requetes recues de mes clients */
- IVY_LIST_EACH (clients, clnt) {
- match_count += ClientCall (clnt, buffer.data);
- }
- TRACE_IF( match_count == 0, "Warning no recipient for %s\n",buffer.data);
- /* si le message n'est pas emit et qu'il y a des filtres alors WARNING */
- if ( match_count == 0 && debug_filter )
- {
- IvyBindindFilterCheck( buffer.data );
- }
- return match_count;
-}
+
void IvySendError( IvyClientPtr app, int id, const char *fmt, ... )
{
- static IvyBuffer buffer = { NULL, 0, 0}; /* Use satic mem to eliminate multiple call to malloc /free */
+ static IvyBuffer buffer = { NULL, 0, 0}; /* Use static mem to eliminate multiple call to malloc /free */
va_list ap;
va_start( ap, fmt );
@@ -943,7 +1146,7 @@ void IvyDefaultBindCallback( IvyClientPtr app, void *user_data, int id, char* re
IvyClientPtr IvyGetApplication( char *name )
{
IvyClientPtr app = 0;
- IVY_LIST_ITER( clients, app, strcmp(name, app->app_name) != 0 );
+ IVY_LIST_ITER( allClients, app, strcmp(name, app->app_name) != 0 );
return app;
}
@@ -952,7 +1155,7 @@ char *IvyGetApplicationList(const char *sep)
static char applist[4096]; /* TODO remove that ugly Thing */
IvyClientPtr app;
applist[0] = '\0';
- IVY_LIST_EACH( clients, app )
+ IVY_LIST_EACH( allClients, app )
{
strcat( applist, app->app_name );
strcat( applist, sep );
@@ -964,11 +1167,11 @@ char **IvyGetApplicationMessages( IvyClientPtr app )
{
#define MAX_REGEXP 4096
static char *messagelist[MAX_REGEXP+1];/* TODO remove that ugly Thing */
- MsgSndPtr msg;
+ GlobRegPtr msg;
int msgCount= 0;
memset( messagelist, 0 , sizeof( messagelist ));
/* recherche dans la liste des requetes recues de ce client */
- IVY_LIST_EACH( app->msg_send, msg )
+ IVY_LIST_EACH( app->srcRegList, msg )
{
messagelist[msgCount++]= msg->str_regexp;
if ( msgCount >= MAX_REGEXP )
@@ -1021,3 +1224,371 @@ static void substituteInterval (IvyBuffer *src)
src->data = dst.data;
}
}
+
+
+static void freeClient ( IvyClientPtr client)
+{
+ GlobRegPtr srcReg;
+
+ /* on libere la chaine nom de l'appli*/
+ if (client->app_name != NULL) {
+ free (client->app_name);
+ client->app_name = NULL;
+ /* on libere la liste des clients */
+ IVY_LIST_EACH (client->srcRegList, srcReg) {
+ if (srcReg->str_regexp != NULL) {
+ free (srcReg->str_regexp);
+ srcReg->str_regexp = NULL;
+ }
+ }
+ IVY_LIST_EMPTY (client->srcRegList);
+ }
+}
+
+
+
+
+
+
+
+static void delRegexpForOneClientFromDictionary (const char *regexp, const IvyClientPtr client)
+{
+ MsgSndDictPtr msgSendDict = NULL;
+ IvyClientPtr client_itr, next;
+ TRACE ("ENTER delRegexpForOneClientFromDictionary clnt=%d, reg='%s'\n", client, regexp);
+
+ HASH_FIND_STR(messSndByRegexp, regexp, msgSendDict);
+ if (msgSendDict != NULL) {
+ /* la clef est trouvée, on itere sur la liste de client associée */
+ IVY_LIST_EACH_SAFE ( msgSendDict->clientList, client_itr, next) {
+ /* pour tester 2 IvyClientPtr, on teste la similarité
+ des pointeur Client qui doivent être uniques */
+ if (client_itr->client == client->client) {
+ /* on a trouve le client : on l'enleve */
+ IVY_LIST_REMOVE (msgSendDict->clientList, client_itr);
+ TRACE ("delRegexpForOneClientFromDictionary : IVY_LIST_REMOVE\n");
+ }
+ }
+ /* si la liste de clients associée à cette regexp est vide */
+ if ((msgSendDict->clientList == NULL) || (IVY_LIST_IS_EMPTY (msgSendDict->clientList))) {
+ TRACE ("delRegexpForOneClientFromDictionary : IvyBindingFree, free, hash_del\n");
+ /* on efface le binding */
+ IvyBindingFree (msgSendDict->binding);
+ /* on enlève l'entrée regexp de la table de hash */
+ HASH_DEL (messSndByRegexp, msgSendDict);
+ /* on efface la clef (regexp source) */
+ free (msgSendDict->regexp_src);
+ }
+ }
+#ifdef OPENMP
+ regenerateRegPtrArrayCache ();
+#endif
+}
+
+
+
+
+static void delAllRegexpsFromDictionary ()
+{
+ MsgSndDictPtr msgSendDict;
+ IvyClientPtr client;
+
+ /* pour toutes les entrees du dictionnaire des regexps */
+ for (msgSendDict=messSndByRegexp; msgSendDict ; msgSendDict=msgSendDict->hh.next) {
+ /* on efface le binding */
+ IvyBindingFree (msgSendDict->binding);
+ /* on efface la clef (regexp source) */
+ free (msgSendDict->regexp_src);
+ /* pour chaque client abonne a cette regexp */
+ IVY_LIST_EACH ( msgSendDict->clientList, client) {
+ freeClient (client);
+ }
+ /* on enleve la liste de regexps */
+ IVY_LIST_EMPTY(msgSendDict->clientList);
+ /* on enleve le couple regexp -> valeur */
+ HASH_DEL(messSndByRegexp, msgSendDict);
+ }
+
+#ifdef OPENMP
+ regenerateRegPtrArrayCache ();
+#endif
+}
+
+
+
+
+
+
+
+
+
+// HASH_ADD_KEYPTR (hh_name, head, key_ptr, key_len, item_ptr)
+// HASH_ADD_STR ( head, keyfield_name, item_ptr)
+
+static void addRegexpToDictionary (const char* regexp, IvyClientPtr client, int id,
+ IvyBinding ivyBind)
+{
+ MsgSndDictPtr msgSendDict = NULL;
+ IvyClientPtr newClient = NULL;
+ /* on cherche si une entrée existe deja pour cette regexp source */
+ HASH_FIND_STR(messSndByRegexp, regexp, msgSendDict);
+ /* l'entree n'existe pas dans le dictionnaire : on la cree */
+ if (msgSendDict == NULL) {
+ msgSendDict = malloc (sizeof (struct _msg_snd_dict));
+ msgSendDict->regexp_src = strdup (regexp);
+ msgSendDict->binding = ivyBind;
+ msgSendDict->clientList = NULL;
+
+ /* HASH_ADD_STR ne fonctionne que si la clef est un tavbleau de char, si c'est un pointeur
+ if faut utiliser HASH_ADD_KEYPTR */
+ HASH_ADD_KEYPTR(hh, messSndByRegexp, msgSendDict->regexp_src, strlen (msgSendDict->regexp_src), msgSendDict);
+#ifdef OPENMP
+ // On ne regenere le cache qu'après recpetion du endregexp, ça permet d'eviter
+ // de regenerer inutilement le cache à chaqye nouvelle regexp initiale
+ // par contre, après le end regexp, il faut regenerer le cache à chaque
+ // nouvel abonnement
+ if (client->endRegexpReceived == 1)
+ addRegToPtrArrayCache (msgSendDict);
+#endif
+ }
+
+
+ /* on ajoute le client à la liste des clients abonnés */
+ IVY_LIST_ADD_START (msgSendDict->clientList, newClient);
+ newClient->app_name = strdup (client->app_name);
+ newClient->app_port = client->app_port;
+ newClient->client = client->client;
+ newClient->id = id;
+ /* au niveau du champ liste de client du dictionnaire, on n'a pas besoin
+ de la liste des regexps sources (qui n'est necessaire que pour
+ la liste globale des clients) */
+ newClient->srcRegList = NULL;
+ IVY_LIST_ADD_END (msgSendDict->clientList, newClient);
+}
+
+
+
+static void changeRegexpInDictionary (const char* regexp, IvyClientPtr client,
+ int id, IvyBinding ivyBind)
+{
+ // printf ("ENTER changeRegexpInDictionary\n");
+ delRegexpForOneClientFromDictionary (regexp, client);
+ addRegexpToDictionary (regexp, client, id, ivyBind);
+}
+
+
+
+
+/* met a jour le dictionnaire et la liste globale */
+static void delOneClient (const Client client)
+{
+ IvyClientPtr client_itr;
+ GlobRegPtr regxpSrc =NULL, next;
+
+ TRACE ("ENTER delOneClient\n");
+ /* on cherche le client dans la liste globale des clients */
+ IVY_LIST_ITER (allClients, client_itr, client_itr->client != client );
+ /* si on le trouve */
+ if (client_itr != NULL) {
+ TRACE ("DEBUG delOneClient %s, client_itr->client=%p, clientRef=%p\n",
+ client_itr->app_name, client_itr->client, client);
+
+
+
+
+ /* pour chaque regexp source de ce client */
+ IVY_LIST_EACH_SAFE (client_itr->srcRegList, regxpSrc, next) {
+ /* on met a jour la liste des clients associee a la regexp source */
+ delRegexpForOneClient (regxpSrc->str_regexp, client_itr, regxpSrc->id);
+ /* on libere la memoire associee a la regexp source */
+ if (regxpSrc->str_regexp != NULL) {
+ free (regxpSrc->str_regexp);
+ regxpSrc->str_regexp = NULL;
+ }
+ }
+
+
+ /* on libere la liste de regexp source */
+ IVY_LIST_EMPTY (client_itr->srcRegList);
+ /* on enleve l'entree correspondant a ce client dans la liste globale */
+ IVY_LIST_REMOVE (allClients, client_itr);
+ }
+}
+
+
+static char delRegexpForOneClient (const char *regexp, const IvyClientPtr client, int id)
+{
+ IvyClientPtr client_itr = NULL;
+ GlobRegPtr regxpSrc = NULL, next = NULL;
+ char removed = 0;
+
+ TRACE ("ENTER delRegexpForOneClient id=%d\n", id);
+
+ /* on enleve du dictionnaire */
+
+ delRegexpForOneClientFromDictionary (regexp, client);
+
+ /* on enleve de la liste globale */
+ /* recherche du client */
+ IVY_LIST_ITER (allClients, client_itr, client_itr->client != client->client );
+ if (client_itr != NULL) {
+ /* pour chaque regexp source de ce client */
+ IVY_LIST_EACH_SAFE (client_itr->srcRegList, regxpSrc, next) {
+ /* si on trouve notre regexp, on la supprime */
+ if (regxpSrc->id == id) {
+ removed = 1;
+ if (regxpSrc->str_regexp != NULL) {
+ free (regxpSrc->str_regexp);
+ regxpSrc->str_regexp = NULL;
+ }
+ TRACE ("DBG> IVY_LIST_REMOVE (%p, %p)\n", client_itr->srcRegList, regxpSrc);
+ IVY_LIST_REMOVE (client_itr->srcRegList, regxpSrc);
+ }
+ }
+ }
+ return (removed);
+}
+
+static void addOrChangeRegexp (const char* regexp, IvyClientPtr client, int id,
+ IvyBinding ivyBind)
+{
+ MsgSndDictPtr msgSendDict = NULL;
+ IvyClientPtr client_itr = NULL;
+
+ // printf ("ENTER addOrChangeRegexp\n");
+ /* on teste si la regexp existe deja et si il faut faire un changeRegexp */
+ HASH_FIND_STR(messSndByRegexp, regexp, msgSendDict);
+ /* la regexp n'existe pas du tout */
+ if (msgSendDict == NULL) {
+ addRegexp (regexp, client, id, ivyBind);
+ } else {
+ /* la regexp existe, mais l'id existe elle pour le client */
+ IVY_LIST_ITER( msgSendDict->clientList, client_itr, ( client_itr->client != client->client));
+ if (( client_itr != NULL) && (client_itr->id == id)) {
+ /* si oui on fait un change regexp */
+ changeRegexp (regexp, client, id, ivyBind);
+ } else {
+ /* si non on fait un add regexp */
+ addRegexp (regexp, client, id, ivyBind);
+ }
+ }
+}
+
+
+static void addRegexp (const char* regexp, IvyClientPtr client, int id,
+ IvyBinding ivyBind)
+{
+ IvyClientPtr client_itr = NULL;
+ GlobRegPtr regxpSrc = NULL;
+
+
+ // printf ("ENTER addRegexp\n");
+
+ /* on ajoute au dictionnaire */
+ addRegexpToDictionary (regexp, client, id, ivyBind);
+
+ /* on ajoute a la liste globale */
+ /* recherche du client */
+ IVY_LIST_ITER (allClients, client_itr, client_itr->client != client->client );
+
+ /* si le client n'existe pas, faut le creer */
+ if (client_itr == NULL) {
+/* IVY_LIST_ADD_START (allClients, client_itr); */
+/* client_itr->app_name = strdup (client->app_name); */
+/* client_itr->app_port = client->app_port; */
+/* client_itr->client = client->client; */
+/* client_itr->srcRegList = NULL; */
+/* IVY_LIST_ADD_END (allClients, client_itr); */
+ fprintf(stderr, "addRegexp ERROR\n");
+ }
+
+ /* on ajoute la regexp à la liste de regexps */
+ IVY_LIST_ADD_START (client_itr->srcRegList, regxpSrc);
+ regxpSrc->id = id;
+ regxpSrc->str_regexp = strdup (regexp);
+ IVY_LIST_ADD_END (client_itr->srcRegList, regxpSrc);
+ if (application_bind_callback) {
+ (*application_bind_callback)( client, application_bind_data, id, regexp, IvyAddBind);
+ }
+}
+
+
+
+static void changeRegexp (const char* regexp, IvyClientPtr client, int id,
+ IvyBinding ivyBind)
+{
+ IvyClientPtr client_itr = NULL;
+ GlobRegPtr regxpSrc = NULL, next = NULL;
+ /* on change dans le dictionnaire */
+ // printf ("ENTER changeRegexp\n");
+ changeRegexpInDictionary (regexp, client, id , ivyBind);
+
+ /* on change dans la liste globale */
+ /* recherche du client */
+ IVY_LIST_ITER (allClients, client_itr, client_itr->client != client->client );
+ if (client_itr != NULL) {
+ /* pour chaque regexp source de ce client */
+ IVY_LIST_EACH_SAFE (client_itr->srcRegList, regxpSrc, next) {
+ /* si on trouve notre regexp, on la change */
+ if (regxpSrc->id == id) {
+ free (regxpSrc->str_regexp);
+ regxpSrc->str_regexp = strdup (regexp);
+ }
+ }
+ }
+ if (application_bind_callback) {
+ (*application_bind_callback)( client, application_bind_data, id, regexp, IvyChangeBind);
+ }
+}
+
+#ifdef OPENMP
+static void regenerateRegPtrArrayCache ()
+{
+ int count=0;
+ MsgSndDictPtr msgSendDict;
+ ompDictCache.numPtr = 0;
+
+ for (msgSendDict=messSndByRegexp; msgSendDict != NULL ;
+ msgSendDict=msgSendDict->hh.next) {
+ ompDictCache.numPtr++;
+ }
+
+ if (ompDictCache.numPtr >= ompDictCache.size) {
+ ompDictCache.size = (ompDictCache.numPtr*2) + 128;
+ ompDictCache.msgPtrArray = realloc (ompDictCache.msgPtrArray,
+ sizeof (MsgSndDictPtr) * ompDictCache.size);
+ }
+
+
+ for (msgSendDict=messSndByRegexp; msgSendDict != NULL ;
+ msgSendDict=msgSendDict->hh.next) {
+ ompDictCache.msgPtrArray [count++] = msgSendDict;
+ }
+}
+
+
+static void addRegToPtrArrayCache (MsgSndDictPtr newReg)
+{
+ int count=0;
+ MsgSndDictPtr msgSendDict;
+ ompDictCache.numPtr = 0;
+
+ for (msgSendDict=messSndByRegexp; msgSendDict != NULL ;
+ msgSendDict=msgSendDict->hh.next) {
+ ompDictCache.numPtr++;
+ }
+
+ if (ompDictCache.numPtr >= ompDictCache.size) {
+ ompDictCache.size = (ompDictCache.numPtr*2) + 128;
+ ompDictCache.msgPtrArray = realloc (ompDictCache.msgPtrArray,
+ sizeof (MsgSndDictPtr) * ompDictCache.size);
+ for (msgSendDict=messSndByRegexp; msgSendDict != NULL ;
+ msgSendDict=msgSendDict->hh.next) {
+ ompDictCache.msgPtrArray [count++] = msgSendDict;
+ }
+ } else {
+ // on ajoute juste le nouveau pointeur
+ ompDictCache.msgPtrArray [ompDictCache.numPtr-1] = newReg;
+ }
+}
+#endif // OPENMP