summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorbustico2008-02-06 16:32:54 +0000
committerbustico2008-02-06 16:32:54 +0000
commitee2e694ebba179f1c75764a7311df717fa3925cd (patch)
treea9da6c6d525241725597c7d641c90436239c91d9 /src
parentc50b5d38c1fc5491e8c99c8a86b043d39e074acc (diff)
downloadivy-c-ee2e694ebba179f1c75764a7311df717fa3925cd.zip
ivy-c-ee2e694ebba179f1c75764a7311df717fa3925cd.tar.gz
ivy-c-ee2e694ebba179f1c75764a7311df717fa3925cd.tar.bz2
ivy-c-ee2e694ebba179f1c75764a7311df717fa3925cd.tar.xz
* fix realloc buffer size when big message
* complete change of internal structures for performance optimisation * experimental parralelized version for performance optimisation (use and need openmp) which scale well for regexp matching on multicore/multi processor gear.
Diffstat (limited to 'src')
-rw-r--r--src/Makefile56
-rw-r--r--src/ivy.c941
-rw-r--r--src/ivy.h2
-rw-r--r--src/ivybuffer.c4
-rw-r--r--src/ivysocket.c100
-rw-r--r--src/ivysocket.h3
-rw-r--r--src/list.h6
-rw-r--r--src/timer.c2
-rw-r--r--src/version.h2
9 files changed, 916 insertions, 200 deletions
diff --git a/src/Makefile b/src/Makefile
index 8dba104..66526dc 100644
--- a/src/Makefile
+++ b/src/Makefile
@@ -1,4 +1,4 @@
-#
+
# Ivy, C interface
#
# Copyright (C) 1997-2002
@@ -17,7 +17,7 @@
# change this in version.h too !!!!
MAJOR=3
-MINOR=9
+MINOR=10
PERHAPS64 := $(shell uname -m | perl -ne "print /64/ ? '64' : '';")
LIB = /lib$(PERHAPS64)
@@ -81,6 +81,7 @@ else
endif
CC=gcc
+CPP='g++'
LIBTOOL=ar q # linux and solaris
#LIBTOOL=libtool -static -o
@@ -90,9 +91,12 @@ REGEXP= -DUSE_PCRE_REGEX -DPCRE_OPT=$(PCRE_OPT)
# see below
CHANNEL = -DTCL_CHANNEL_INTEGRATION
-CFLAGS += -g -Wall $(FPIC)
+CFLAGS += -g -Wall -Wshadow $(FPIC)
+OMPCFLAGS = $(CFLAGS) -fopenmp -DOPENMP=1 # -DDEBUG
+OMPLIB = -lgomp -lpthread
OBJ = ivyloop.o timer.o ivysocket.o ivy.o ivybuffer.o ivybind.o intervalRegexp.o
+OMPOBJ = ivyloop.o timer.o ivysocket_omp.o ivy_omp.o ivybuffer.o ivybind.o intervalRegexp.o
GOBJ = ivyloop.o timer.o ivysocket.o givy.o ivybuffer.o ivybind.o intervalRegexp.o
XTOBJ = ivyxtloop.o ivysocket.o ivy.o ivybuffer.o ivybind.o intervalRegexp.o
GLIBOBJ = ivyglibloop.o ivysocket.o ivy.o ivybuffer.o ivybind.o intervalRegexp.o
@@ -100,15 +104,20 @@ GLUTOBJ = ivyglutloop.o ivysocket.o ivy.o ivybuffer.o ivybind.o intervalRegexp.o
TCLOBJ = ivytcl.o timer.o ivysocket.o givy.o ivybuffer.o ivybind.o intervalRegexp.o
# WINDOWS add ivyloop.o if TCL_CHANNEL_INTEGRATION is not set
-TARGETS = ivyprobe ivyperf ivyglibprobe ivyxtprobe
+TARGETS = ivyprobe ivythroughput ivyperf ivyglibprobe ivyxtprobe ivyprobe_efence
TARGETLIBS=libivy.so.$(MAJOR).$(MINOR) libgivy.so.$(MAJOR).$(MINOR) libxtivy.so.$(MAJOR).$(MINOR) libglibivy.so.$(MAJOR).$(MINOR) libtclivy.so.$(MAJOR).$(MINOR)
+OMP_TARGET= libivy_omp.so.$(MAJOR).$(MINOR) libivy_omp.a ivyprobe_omp ivythroughput_omp
# not yet need Modified Glut ivyglutprobe
.c.o:
$(CC) $(CFLAGS) -c $*.c
+.cpp.o:
+ $(CPP) $(CFLAGS) -c $*.cpp
all: static-libs commands shared-libs pkgconf
+omp: $(OMP_TARGET)
+
static-libs: libivy.a libgivy.a libxtivy.a libglibivy.a libtclivy.a
# not yet need Modified Glut libglutivy.a
@@ -127,6 +136,12 @@ ivytcl.o: ivytcl.c
givy.o: ivy.c
$(CC) -c $(CFLAGS) -DDEBUG -o givy.o ivy.c
+ivy_omp.o: ivy.c
+ $(CC) -c $(OMPCFLAGS) -o ivy_omp.o ivy.c
+
+ivysocket_omp.o: ivysocket.c
+ $(CC) -c $(OMPCFLAGS) -o ivysocket_omp.o ivysocket.c
+
ivyglutloop.o: ivyglutloop.c ivyglutloop.h
$(CC) -c $(CFLAGS) $(GLUTINC) ivyglutloop.c
@@ -134,7 +149,21 @@ ivyglibloop.o: ivyglibloop.c ivyglibloop.h
$(CC) -c $(CFLAGS) $(GLIBINC) ivyglibloop.c
ivyprobe: ivyprobe.o libivy.a
- $(CC) $(CFLAGS) -o $@ ivyprobe.o -L. -livy $(PCRELIB) $(EXTRALIB)
+ $(CC) $(CFLAGS) -o $@ ivyprobe.o -L. -livy $(PCRELIB) $(EXTRALIB)
+
+ivythroughput: ivythroughput.o libivy.a
+ $(CPP) $(CFLAGS) -o $@ ivythroughput.o -L. -livy -lpcrecpp $(PCRELIB) $(EXTRALIB)
+
+ivyprobe_efence: ivyprobe.o libivy.a
+ $(CPP) $(CFLAGS) -o $@ ivyprobe.o -L. -livy -lpcrecpp $(PCRELIB) $(EXTRALIB) -lefence
+
+
+
+ivyprobe_omp: ivyprobe.o libivy_omp.a
+ $(CC) $(CFLAGS) -o $@ ivyprobe.o -L. -livy_omp $(PCRELIB) $(EXTRALIB) $(OMPLIB)
+
+ivythroughput_omp: ivythroughput.o libivy_omp.a
+ $(CPP) $(CFLAGS) -o $@ ivythroughput.o -L. -livy_omp -lpcrecpp $(PCRELIB) $(EXTRALIB) $(OMPLIB)
ivyprobe.o : ivyprobe.c
$(CC) $(CFLAGS) $(REGEXP) -c ivyprobe.c -o $@
@@ -162,7 +191,11 @@ ivyglutprobe: ivyglutprobe.o libglutivy.a
libivy.a: $(OBJ)
rm -f $@
- $(LIBTOOL) $@ $(OBJ)
+ $(LIBTOOL) $@ $(OBJ)
+
+libivy_omp.a: $(OMPOBJ)
+ rm -f $@
+ $(LIBTOOL) $@ $(OMPOBJ)
libgivy.a: $(GOBJ)
rm -f $@
@@ -190,6 +223,11 @@ libivy.so.$(MAJOR).$(MINOR): $(OBJ)
# $(CC) -G -Wl,-h,libivy.so.$(MAJOR) -o $@ $(OBJ) #solaris
# libtool -dynamic -o $@ $(OBJ) $(PCRELIB) -lc
+libivy_omp.so.$(MAJOR).$(MINOR): $(OMPOBJ)
+ $(CC) $(LDFLAGS) -shared -Wl,-soname,libivy.so.$(MAJOR) -o $@ $(OMPOBJ) $(PCRELIB) $(OMPLIB)
+# $(CC) -G -Wl,-h,libivy.so.$(MAJOR) -o $@ $(OBJ) #solaris
+# libtool -dynamic -o $@ $(OBJ) $(PCRELIB) -lc
+
libgivy.so.$(MAJOR).$(MINOR): $(GOBJ)
$(CC) $(LDFLAGS) -shared -Wl,-soname,libgivy.so.$(MAJOR) -o $@ $(GOBJ) $(PCRELIB)
# $(CC) -G -Wl,-h,libgivy.so.$(MAJOR) -o $@ $(GOBJ) #solaris
@@ -217,7 +255,7 @@ libtclivy.so.$(MAJOR).$(MINOR): $(TCLOBJ)
distclean: clean
clean:
- -rm -f $(TARGETS) $(TARGETLIBS) *.o *.a *.so *.so.* *~
+ -rm -f $(TARGETS) $(TARGETLIBS) $(OMP_TARGET) *.o *.a *.so *.so.* *~
-rm -f ivy-glib.pc
installlibs: static-libs shared-libs
@@ -225,6 +263,7 @@ installlibs: static-libs shared-libs
-test -d $(DESTDIR)$(X11_PREFIX)$(LIB) || mkdirhier $(DESTDIR)$(X11_PREFIX)$(LIB)
install -m644 libivy.a $(DESTDIR)$(PREFIX)$(LIB)
+ install -m644 libivy_omp.a $(DESTDIR)$(PREFIX)$(LIB)
install -m644 libgivy.a $(DESTDIR)$(PREFIX)$(LIB)
-install -m644 libxtivy.a $(DESTDIR)$(X11_PREFIX)$(LIB)
install -m644 libtclivy.a $(DESTDIR)$(PREFIX)$(LIB)
@@ -235,6 +274,7 @@ installlibs: static-libs shared-libs
-install -m644 libxtivy.so.$(MAJOR).$(MINOR) $(DESTDIR)$(X11_PREFIX)$(LIB)
install -m644 libtclivy.so.$(MAJOR).$(MINOR) $(DESTDIR)$(PREFIX)$(LIB)
install -m644 libglibivy.so.$(MAJOR).$(MINOR) $(DESTDIR)$(PREFIX)$(LIB)
+ install -m644 libivy_omp.so.$(MAJOR).$(MINOR) $(DESTDIR)$(PREFIX)$(LIB)
includes:
test -d $(DESTDIR)$(PREFIX)/include/Ivy || mkdirhier $(DESTDIR)$(PREFIX)/include/Ivy
@@ -262,10 +302,12 @@ installliblinks: installlibs
-ln -fs $(X11_PREFIX)$(LIB)/libxtivy.so.$(MAJOR).$(MINOR) $(DESTDIR)$(X11_PREFIX)$(LIB)/libxtivy.so.$(MAJOR)
ln -fs $(PREFIX)$(LIB)/libtclivy.so.$(MAJOR).$(MINOR) $(DESTDIR)$(PREFIX)$(LIB)/libtclivy.so
ln -fs $(PREFIX)$(LIB)/libtclivy.so.$(MAJOR).$(MINOR) $(DESTDIR)$(PREFIX)$(LIB)/libtclivy.so.$(MAJOR)
+ ln -fs $(PREFIX)$(LIB)/libivy_omp.so.$(MAJOR).$(MINOR) $(DESTDIR)$(PREFIX)$(LIB)/libivy_omp.so
installbins: commands
test -d $(DESTDIR)$(PREFIX)/bin || mkdirhier $(DESTDIR)$(PREFIX)/bin
install -m755 ivyprobe $(DESTDIR)$(PREFIX)/bin
+ install -m755 ivyprobe_omp $(DESTDIR)$(PREFIX)/bin
#install -m755 ivyxtprobe $(DESTDIR)$(PREFIX)/bin
#install -m755 ivyglibprobe $(DESTDIR)$(PREFIX)/bin
#install -m755 ivyglutprobe $(DESTDIR)$(PREFIX)/bin
diff --git a/src/ivy.c b/src/ivy.c
index 635e006..0d146ee 100644
--- a/src/ivy.c
+++ b/src/ivy.c
@@ -2,12 +2,12 @@
*
* Ivy, C interface
*
- * Copyright 1997-2000
+ * Copyright 1997-2008
* Centre d'Etudes de la Navigation Aerienne
*
* Main functions
*
- * Authors: Francois-Regis Colin,Stephane Chatty
+ * Authors: Francois-Regis Colin,Stephane Chatty, Alexandre Bustico
*
* $Id$
*
@@ -15,6 +15,17 @@
* copyright notice regarding this software
*/
+/*
+ TODO : ° version non bloquante
+ ° outil de chasse aux memleak
+ ° mesures de perfo
+ ° compil rejeu en monothread et omp et tests
+ ° compil sur mac et windows pour portabilité
+ */
+
+#ifdef OPENMP
+#include <omp.h>
+#endif
#include <stdlib.h>
#ifdef WIN32
@@ -28,8 +39,11 @@
#include <stdarg.h>
#include <ctype.h>
+
+
#include <fcntl.h>
+#include "uthash.h"
#include "intervalRegexp.h"
#include "ivychannel.h"
#include "ivysocket.h"
@@ -56,7 +70,6 @@
static char* DefaultIvyBus = GenerateIvyBus(DEFAULT_DOMAIN,DEFAULT_BUS);
typedef enum {
-
Bye, /* l'application emettrice se termine */
AddRegexp, /* expression reguliere d'un client */
Msg, /* message reel */
@@ -70,29 +83,51 @@ typedef enum {
Pong /* ivy doit renvoyer ce message à la reception d'un ping */
} MsgType;
-typedef struct _msg_snd *MsgSndPtr;
+
+typedef struct _global_reg_lst *GlobRegPtr;
+typedef struct _msg_snd_dict *MsgSndDictPtr;
+
struct _msg_rcv { /* requete d'emission d'un client */
MsgRcvPtr next;
int id;
const char *regexp; /* regexp du message a recevoir */
- MsgCallback callback; /* callback a declanche a la reception */
+ MsgCallback callback; /* callback a declancher a la reception */
void *user_data; /* stokage d'info client */
};
-struct _msg_snd { /* requete de reception d'un client */
- MsgSndPtr next;
- int id;
- char *str_regexp; /* la regexp sous forme inhumaine */
+
+
+/* liste de regexps source */
+struct _global_reg_lst { /* liste des regexp source */
+ GlobRegPtr next;
+ char *str_regexp; /* la regexp sous forme source */
+ int id; /* son id, differente pour chaque client */
+};
+
+
+/* pour le dictionnaire clef=regexp, valeur = cette struct */
+struct _msg_snd_dict { /* requete de reception d'un client */
+ UT_hash_handle hh; /* makes this structure hashable */
+ char *regexp_src; /* clef du dictionnaire (hash uthash) */
+ IvyClientPtr clientList; /* liste des clients */
IvyBinding binding; /* la regexp sous forme machine */
};
-struct _clnt_lst {
+/* liste de clients, champ de la struct _msg_snd_dict qui est valeur du dictionnaire */
+/* typedef IvyClientPtr */
+struct _clnt_lst_dict {
IvyClientPtr next;
Client client; /* la socket client */
- MsgSndPtr msg_send; /* liste des requetes recues */
char *app_name; /* nom de l'application */
unsigned short app_port; /* port de l'application */
+ int id; /* l'id n'est pas liée uniquement
+ a la regexp, mais au couple
+ regexp, client */
+ GlobRegPtr srcRegList; /* liste de regexp source */
+#ifdef OPENMP
+ int endRegexpReceived;
+#endif // OPENMP
};
/* flag pour le debug en cas de Filter de regexp */
@@ -113,35 +148,70 @@ static unsigned short SupervisionPort;
/* client pour la socket supervision */
static Client broadcast;
-static const char *ApplicationName = 0;
-static const char *ApplicationID = 0;
+static const char *ApplicationName = NULL;
+static const char *ApplicationID = NULL;
/* callback appele sur reception d'un message direct */
-static MsgDirectCallback direct_callback = 0;
-static void *direct_user_data = 0;
+static MsgDirectCallback direct_callback = NULL;
+static void *direct_user_data = NULL;
/* callback appele sur changement d'etat d'application */
static IvyApplicationCallback application_callback;
-static void *application_user_data = 0;
+static void *application_user_data = NULL;
/* callback appele sur ajout suppression de regexp */
static IvyBindCallback application_bind_callback;
-static void *application_bind_data = 0;
+static void *application_bind_data = NULL;
/* callback appele sur demande de terminaison d'application */
static IvyDieCallback application_die_callback;
-static void *application_die_user_data = 0;
+static void *application_die_user_data = NULL;
/* liste des messages a recevoir */
-static MsgRcvPtr msg_recv = 0;
+static MsgRcvPtr msg_recv = NULL;
/* liste des clients connectes */
-static IvyClientPtr clients = 0;
+static IvyClientPtr allClients = NULL;
+
+/* dictionnaire clef : regexp, valeur : liste de clients IvyClientPtr */
+static MsgSndDictPtr messSndByRegexp = NULL;
-static const char *ready_message = 0;
+static const char *ready_message = NULL;
static void substituteInterval (IvyBuffer *src);
+static int RegexpCall (const MsgSndDictPtr msg, const char * const message);
+static int RegexpCallUnique (const MsgSndDictPtr msg, const char * const message,
+ const Client clientUnique);
+
+static void freeClient ( IvyClientPtr client);
+static void delOneClient (const Client client);
+
+static void delRegexpForOneClientFromDictionary (const char *regexp, const IvyClientPtr client);
+static void delAllRegexpsFromDictionary ();
+static void addRegexpToDictionary (const char* regexp, IvyClientPtr client, int id,
+ IvyBinding bind);
+static void changeRegexpInDictionary (const char* regexp, IvyClientPtr client, int id,
+ IvyBinding bind);
+
+static char delRegexpForOneClient (const char *regexp, const IvyClientPtr client, int id);
+static void addRegexp (const char* regexp, IvyClientPtr client, int id,
+ IvyBinding bind);
+static void changeRegexp (const char* regexp, IvyClientPtr client, int id, IvyBinding bind);
+static void addOrChangeRegexp (const char* regexp, IvyClientPtr client, int id, IvyBinding bind);
+static int IvyCheckBuffer( const char* buffer );
+
+#ifdef OPENMP
+static void regenerateRegPtrArrayCache ();
+static void addRegToPtrArrayCache (MsgSndDictPtr newReg);
+static struct {
+ MsgSndDictPtr *msgPtrArray;
+ int size;
+ int numPtr;
+} ompDictCache = {NULL, 0, 0};
+#endif
+
+
/*
* function like strok but do not eat consecutive separator
* */
@@ -169,70 +239,165 @@ static int MsgSendTo( Client client, MsgType msgtype, int id, const char *messag
static void IvyCleanup()
{
IvyClientPtr clnt,next;
+ GlobRegPtr regLst;
+
/* destruction des connexions clients */
- IVY_LIST_EACH_SAFE( clients, clnt, next )
+ IVY_LIST_EACH_SAFE( allClients, clnt, next )
{
/* on dit au revoir */
MsgSendTo( clnt->client, Bye, 0, "" );
SocketClose( clnt->client );
- IVY_LIST_EMPTY( clnt->msg_send );
+ IVY_LIST_EACH (clnt->srcRegList, regLst) {
+ if (regLst->str_regexp != NULL) {
+ free (regLst->str_regexp);
+ regLst->str_regexp = NULL;
+ }
+ }
+ IVY_LIST_EMPTY( clnt->srcRegList );
+ IVY_LIST_REMOVE (allClients, clnt);
}
- IVY_LIST_EMPTY( clients );
+ IVY_LIST_EMPTY( allClients );
+ delAllRegexpsFromDictionary ();
/* destruction des sockets serveur et supervision */
SocketServerClose( server );
SocketClose( broadcast );
}
-static int MsgCall (const char *message, MsgSndPtr msg, IvyClientPtr client)
+
+static int
+ClientCall (IvyClientPtr clnt, const char *message)
{
- int waiting = 0;
- static IvyBuffer buffer = { NULL, 0, 0 }; /* Use satic mem to eliminate multiple call to malloc /free */
- int err;
- int index;
- int arglen;
- const char *arg;
-
- int rc= IvyBindingExec( msg->binding, message );
+ int match_count = 0;
+
+ /* pour toutes les regexp */
+ MsgSndDictPtr msgSendDict;
+
+ for (msgSendDict=messSndByRegexp; msgSendDict != NULL;
+ msgSendDict=msgSendDict->hh.next) {
+ match_count += RegexpCallUnique (msgSendDict, message, clnt->client);
+ }
+
+ TRACE_IF( match_count == 0, "Warning no recipient for %s\n",message);
+ /* si le message n'est pas emit et qu'il y a des filtres alors WARNING */
+ if ( match_count == 0 && debug_filter ) {
+ IvyBindindFilterCheck( message );
+ }
+ return match_count;
+}
+
+
+
+static int
+RegexpCall (const MsgSndDictPtr msg, const char * const message)
+{
+ static IvyBuffer bufferArg = {NULL, 0, 0};
+ char bufferId[16];
+ int match_count ;
+ int waiting ;
+ int indx;
+ int arglen;
+ const char *arg;
+ IvyClientPtr clnt;
+ int rc;
+
+#ifdef OPENMP
+#pragma omp threadprivate (bufferArg)
+ /* d'après la doc openmp :
+ Variables with automatic storage duration which are declared in a scope inside the
+ construct are private.
+ Il n'y aurait donc rien à faire pour s'assurer que les variables automatiques soient
+ privées au thread */
+#endif // OPENMP
+
+ match_count = 0;
+ waiting = 0;
+ rc= IvyBindingExec(msg->binding, message );
- if (rc<1) return 0; /* no match */
+ if (rc<1) return 0; /* no match */
- TRACE( "Sending message id=%d '%s'\n",msg->id,message);
+ bufferArg.offset = 0;
+ // bufferArg.size = bufferId.size = 0;
+ // bufferArg.data = bufferId.data = NULL;
- buffer.offset = 0;
- /* il faut essayer d'envoyer le message en une seule fois sur la socket */
- /* pour eviter au maximun de passer dans le select plusieur fois par message du protocole Ivy */
- /* pour eviter la latence ( PB de perfo detecte par ivyperf ping roudtrip ) */
- err = make_message_var( &buffer, "%d %d" ARG_START ,Msg, msg->id);
+ /* il faut essayer d'envoyer le message en une seule fois sur la socket */
+ /* pour eviter au maximun de passer dans le select plusieur fois par message du protocole Ivy */
+ /* pour eviter la latence ( PB de perfo detecte par ivyperf ping roudtrip ) */
- TRACE( "Send matching args count %d\n",rc);
+ TRACE( "Send matching args count %d\n",rc);
+
+ for( indx=1; indx < rc ; indx++ )
+ {
+ IvyBindingMatch (msg->binding, message, indx, &arglen, & arg );
+ make_message_var( &bufferArg, "%.*s" ARG_END , arglen, arg );
+ }
+ make_message_var( &bufferArg, "\n");
- for( index=1; index < rc ; index++ )
- {
- IvyBindingMatch( msg->binding, message, index, &arglen, & arg );
- err = make_message_var( &buffer, "%.*s" ARG_END , arglen, arg );
- }
- err = make_message_var( &buffer, "\n");
- waiting = SocketSendRaw(client->client, buffer.data , buffer.offset);
- if ( waiting )
- fprintf(stderr, "Ivy: Slow client : %s\n", client->app_name );
- return 1;
-}
+ IVY_LIST_EACH(msg->clientList, clnt ) {
+ sprintf (bufferId, "%d %d" ARG_START ,Msg, clnt->id);
+ waiting = SocketSendRawWithId(clnt->client, bufferId, bufferArg.data , bufferArg.offset);
+ match_count++;
+ if ( waiting )
+ fprintf(stderr, "Ivy: Slow client : %s\n", clnt->app_name );
+ }
+ return match_count;
+}
static int
-ClientCall (IvyClientPtr clnt, const char *message)
+RegexpCallUnique (const MsgSndDictPtr msg, const char * const message, const
+ Client clientUnique)
{
- MsgSndPtr msg;
- int match_count = 0;
- /* recherche dans la liste des requetes recues de ce client */
- IVY_LIST_EACH (clnt->msg_send, msg) {
- match_count+= MsgCall (message, msg, clnt);
- }
- return match_count;
+ static IvyBuffer bufferArg = {NULL, 0, 0};
+ char bufferId[16];
+ int match_count ;
+ int waiting ;
+ int indx;
+ int arglen;
+ const char *arg;
+ IvyClientPtr clnt;
+ int rc;
+
+ match_count = 0;
+ waiting = 0;
+ rc= IvyBindingExec(msg->binding, message );
+
+ if (rc<1) return 0; /* no match */
+
+ bufferArg.offset = 0;
+ // bufferArg.size = bufferId.
+ // bufferArg.size = bufferId.size = 0;
+ // bufferArg.data = bufferId.data = NULL;
+
+ /* il faut essayer d'envoyer le message en une seule fois sur la socket */
+ /* pour eviter au maximun de passer dans le select plusieur fois par message du protocole Ivy */
+ /* pour eviter la latence ( PB de perfo detecte par ivyperf ping roudtrip ) */
+
+ TRACE( "Send matching args count %d\n",rc);
+
+ for( indx=1; indx < rc ; indx++ )
+ {
+ IvyBindingMatch (msg->binding, message, indx, &arglen, & arg );
+ make_message_var( &bufferArg, "%.*s" ARG_END , arglen, arg );
+ }
+ make_message_var( &bufferArg, "\n");
+
+ IVY_LIST_EACH(msg->clientList, clnt ) {
+ if (clientUnique != clnt->client)
+ continue;
+ sprintf (bufferId, "%d %d" ARG_START ,Msg, clnt->id);
+ waiting = SocketSendRawWithId(clnt->client, bufferId, bufferArg.data , bufferArg.offset);
+ match_count++;
+ if ( waiting )
+ fprintf(stderr, "Ivy: Slow client : %s\n", clnt->app_name );
+ }
+ return match_count;
}
+
+
+
static int CheckConnected( IvyClientPtr clnt )
{
IvyClientPtr client;
@@ -242,7 +407,7 @@ static int CheckConnected( IvyClientPtr clnt )
if ( clnt->app_port == 0 ) /* Old Ivy Protocol Dont check */
return 0;
/* recherche dans la liste des clients de la presence de clnt */
- IVY_LIST_EACH( clients, client )
+ IVY_LIST_EACH( allClients, client )
{
/* client different mais port identique */
if ( (client != clnt) && (clnt->app_port == client->app_port) )
@@ -265,18 +430,19 @@ static int CheckConnected( IvyClientPtr clnt )
return 0;
}
+
+
static void Receive( Client client, void *data, char *line )
{
IvyClientPtr clnt;
int err,id;
- MsgSndPtr snd;
MsgRcvPtr rcv;
int argc = 0;
char *argv[MAX_MATCHING_ARGS];
char *arg;
int kind_of_msg = Bye;
- IvyBinding bind;
-
+ IvyBinding ivyBind;
+
const char *errbuf;
int erroffset;
@@ -311,7 +477,7 @@ static void Receive( Client client, void *data, char *line )
if ( !IvyBindingFilter( arg ) )
{
- TRACE("Warning: regexp '%s' illegal, removing from %s\n",arg,ApplicationName);
+ TRACE("Warning: regexp '%s' filtered, removing from %s\n",arg,ApplicationName);
if ( application_bind_callback )
{
@@ -320,58 +486,33 @@ static void Receive( Client client, void *data, char *line )
return;
}
- bind = IvyBindingCompile( arg, & erroffset, & errbuf );
- if ( bind != NULL )
- {
- /* On teste si c'est un change regexp : changement de regexp d'une id existante */
- IVY_LIST_ITER( clnt->msg_send, snd, ( id != snd->id ));
- if ( snd ) {
- free (snd->str_regexp);
- snd->str_regexp = strdup( arg );
- snd->binding = bind;
- if ( application_bind_callback )
- {
- (*application_bind_callback)( clnt, application_bind_data, id, snd->str_regexp, IvyChangeBind );
- }
- } else {
- IVY_LIST_ADD_START( clnt->msg_send, snd )
- snd->id = id;
- snd->str_regexp = strdup( arg );
- snd->binding = bind;
- if ( application_bind_callback )
- {
- (*application_bind_callback)( clnt, application_bind_data, id, snd->str_regexp, IvyAddBind );
- }
- IVY_LIST_ADD_END( clnt->msg_send, snd )
-
- }
- }
- else
- {
- printf("Error compiling '%s', %s\n", arg, errbuf);
- MsgSendTo( client, Error, erroffset, errbuf );
+ ivyBind = IvyBindingCompile( arg, & erroffset, & errbuf );
+ if ( ivyBind != NULL ) {
+ addOrChangeRegexp (arg, clnt, id, ivyBind);
+ } else {
+ printf("Error compiling '%s', %s\n", arg, errbuf);
+ MsgSendTo( client, Error, erroffset, errbuf );
}
break;
case DelRegexp:
-
- TRACE("Regexp Delete id=%d\n", id);
-
- IVY_LIST_ITER( clnt->msg_send, snd, ( id != snd->id ));
- if ( snd )
- {
- if ( application_bind_callback )
- {
- (*application_bind_callback)( clnt, application_bind_data, id, snd->str_regexp, IvyRemoveBind );
- }
- IvyBindingFree( snd->binding );
- IVY_LIST_REMOVE( clnt->msg_send, snd );
- }
- break;
+
+ TRACE("Regexp Delete id=%d\n", id);
+ if (delRegexpForOneClient (arg, clnt, id)) {
+ if ( application_bind_callback ) {
+ (*application_bind_callback)( clnt, application_bind_data, id, arg,
+ IvyRemoveBind );
+ }
+ }
+ break;
case StartRegexp:
TRACE("Regexp Start id=%d Application='%s'\n", id, arg);
+#ifdef OPENMP
+ clnt->endRegexpReceived=0;
+#endif // OPENMP
+
clnt->app_name = strdup( arg );
clnt->app_port = id;
if ( CheckConnected( clnt ) )
@@ -395,9 +536,13 @@ static void Receive( Client client, void *data, char *line )
{
int count;
count = ClientCall( clnt, ready_message );
-
- TRACE(" Sendind ready message %d\n", count);
+ // count = IvySendMsg ("%s", ready_message );
+ // printf ("%s sending READY MESSAGE %d\n", clnt->app_name, count);
}
+#ifdef OPENMP
+ clnt->endRegexpReceived=1;
+ regenerateRegPtrArrayCache();
+#endif // OPENMP
break;
case Msg:
@@ -461,9 +606,7 @@ static IvyClientPtr SendService( Client client, const char *appname )
{
IvyClientPtr clnt;
MsgRcvPtr msg;
- IVY_LIST_ADD_START( clients, clnt )
-
- clnt->msg_send = 0;
+ IVY_LIST_ADD_START( allClients, clnt )
clnt->client = client;
clnt->app_name = strdup(appname);
clnt->app_port = 0;
@@ -474,51 +617,44 @@ static IvyClientPtr SendService( Client client, const char *appname )
}
MsgSendTo( client, EndRegexp, 0, "");
- IVY_LIST_ADD_END( clients, clnt )
+ IVY_LIST_ADD_END( allClients, clnt )
+ //printf ("DBG> SendService addAllClient: name=%s; client->client=%p\n", appname, clnt->client);
+
return clnt;
}
static void ClientDelete( Client client, void *data )
{
IvyClientPtr clnt;
- MsgSndPtr msg;
+
#ifdef DEBUG
char *remotehost;
unsigned short remoteport;
#endif
clnt = (IvyClientPtr)data;
- if ( application_callback )
- {
- (*application_callback)( clnt, application_user_data, IvyApplicationDisconnected );
- }
+ if ( application_callback ) {
+ (*application_callback)( clnt, application_user_data, IvyApplicationDisconnected );
+ }
#ifdef DEBUG
/* probably bogus call, but this is for debug only anyway */
SocketGetRemoteHost( client, &remotehost, &remoteport );
TRACE("Deconnexion de %s:%hu\n", remotehost, remoteport );
#endif /*DEBUG */
-
- if ( clnt->app_name ) free( clnt->app_name );
- IVY_LIST_EACH( clnt->msg_send, msg)
- {
- /*regfree(msg->regexp);*/
- free( msg->str_regexp);
- }
- IVY_LIST_EMPTY( clnt->msg_send );
- IVY_LIST_REMOVE( clients, clnt );
+ delOneClient (client);
}
static void *ClientCreate( Client client )
{
-#ifdef DEBUG
- char *remotehost;
- unsigned short remoteport;
- SocketGetRemoteHost( client, &remotehost, &remoteport );
- TRACE("Connexion de %s:%hu\n", remotehost, remoteport );
-#endif /*DEBUG */
-
- return SendService (client, "Unknown");
+ // #ifdef DEBUG
+ char *remotehost;
+ unsigned short remoteport;
+ SocketGetRemoteHost( client, &remotehost, &remoteport );
+ printf ("%s : Connexion de %s:%hu client=%p\n",
+ ApplicationName, remotehost, remoteport, client);
+ // #endif /*DEBUG */
+ return SendService (client, "Unknown");
}
static void BroadcastReceive( Client client, void *data, char *line )
@@ -529,17 +665,14 @@ static void BroadcastReceive( Client client, void *data, char *line )
unsigned short serviceport;
char appid[2048];
char appname[2048];
-#ifdef DEBUG
unsigned short remoteport;
char *remotehost = 0;
-#endif
+
memset( appid, 0, sizeof( appid ) );
memset( appname, 0, sizeof( appname ) );
err = sscanf (line,"%d %hu %s %[^\n]", &version, &serviceport, appid, appname);
if ( err < 2 ) {
/* ignore the message */
- unsigned short remoteport;
- char *remotehost;
SocketGetRemoteHost (client, &remotehost, &remoteport );
printf (" Bad supervision message, expected 'version port' from %s:%d\n",
remotehost, remoteport);
@@ -547,8 +680,6 @@ static void BroadcastReceive( Client client, void *data, char *line )
}
if ( version != VERSION ) {
/* ignore the message */
- unsigned short remoteport;
- char *remotehost = 0;
SocketGetRemoteHost (client, &remotehost, &remoteport );
fprintf (stderr, "Bad Ivy version, expected %d and got %d from %s:%d\n",
VERSION, version, remotehost, remoteport);
@@ -569,6 +700,9 @@ static void BroadcastReceive( Client client, void *data, char *line )
IvyClientPtr clnt;
clnt = SendService( app, appname );
SocketSetData( app, clnt);
+ } else {
+ printf ("SocketConnectAddr error .....\n");
+ SocketSetData( app, NULL);
}
}
static unsigned long currentTime()
@@ -641,7 +775,7 @@ void IvyStart (const char* bus)
int error = 0;
const char* p = bus; /* used for decoding address list */
const char* q; /* used for decoding port number */
- int port;
+ unsigned short port;
/*
@@ -750,7 +884,7 @@ IvyUnbindMsg (MsgRcvPtr msg)
{
IvyClientPtr clnt;
/* Send to already connected clients */
- IVY_LIST_EACH (clients, clnt ) {
+ IVY_LIST_EACH (allClients, clnt ) {
MsgSendTo( clnt->client, DelRegexp,msg->id, "");
}
IVY_LIST_REMOVE( msg_recv, msg );
@@ -783,7 +917,7 @@ IvyBindMsg (MsgCallback callback, void *user_data, const char *fmt_regex, ... )
IVY_LIST_ADD_END( msg_recv, msg )
/* Send to already connected clients */
/* recherche dans la liste des requetes recues de mes clients */
- IVY_LIST_EACH( clients, clnt ) {
+ IVY_LIST_EACH( allClients, clnt ) {
MsgSendTo( clnt->client, AddRegexp,msg->id,msg->regexp);
}
return msg;
@@ -809,15 +943,115 @@ IvyChangeMsg (MsgRcvPtr msg, const char *fmt_regex, ... )
/* Send to already connected clients */
/* recherche dans la liste des requetes recues de mes clients */
- IVY_LIST_EACH( clients, clnt ) {
+ IVY_LIST_EACH( allClients, clnt ) {
MsgSendTo( clnt->client, AddRegexp,msg->id,msg->regexp);
}
return msg;
}
+
+
+
+int IvySendMsg(const char *fmt, ...) /* version dictionnaire */
+{
+ int match_count = 0;
+ static IvyBuffer buffer = { NULL, 0, 0}; /* Use static mem to eliminate multiple call to malloc /free */
+ va_list ap;
+
+ /* construction du buffer message à partir du format et des arguments */
+ if( fmt == 0 || strlen(fmt) == 0 ) return 0;
+ va_start( ap, fmt );
+ buffer.offset = 0;
+ make_message( &buffer, fmt, ap );
+ va_end ( ap );
+
+ /* test du contenu du message */
+ if ( debug_binary_msg )
+ {
+ if ( IvyCheckBuffer( buffer.data ) )
+ return 0;
+ }
+
+ /* pour toutes les regexp */
+
+#ifdef OPENMP
+ //#define TABLEAU_PREALABLE_SEQUENTIEL 1
+#define TABLEAU_PREALABLE 1
+ //#define SINGLE_NOWAIT 1
+ //#define SCHEDULE_GUIDED 1
+ //#define SEQUENTIEL_DEBUG 1
+
+#ifdef SCHEDULE_GUIDED
+ int count;
+#pragma omp parallel default(none) private(count) shared(ompDictCache, buffer) \
+ reduction(+:match_count)
+ {
+#pragma omp for schedule(guided) // après debug mettre schedule(guided, 10)
+ for(count=0; count<ompDictCache.numPtr; count++) {
+ match_count += RegexpCall (ompDictCache.msgPtrArray[count], buffer.data);
+ }
+}
+#endif // SCHEDULE_GUIDED
+
+
+#ifdef TABLEAU_PREALABLE
+ int count; // PARALLEL FOR
+#pragma omp parallel for default(none) private(count) shared(ompDictCache, buffer) \
+ reduction(+:match_count)
+ for(count=0; count<ompDictCache.numPtr; count++) {
+ match_count += RegexpCall (ompDictCache.msgPtrArray[count], buffer.data);
+ }
+#endif // TABLEAU_PREALABLE
+
+
+#ifdef TABLEAU_PREALABLE_SEQUENTIEL
+ int count;
+ for(count=0; count<ompDictCache.numPtr; count++) {
+ match_count += RegexpCall (ompDictCache.msgPtrArray[count], buffer.data);
+ }
+#endif // TABLEAU_PREALABLE_SEQUENTIEL
+
+
+#ifdef SINGLE_NOWAIT // OPEMMP LISTE
+ MsgSndDictPtr msgSendDict;
+#pragma omp parallel default(shared) private(msgSendDict) reduction(+:match_count)
+ for (msgSendDict=messSndByRegexp; msgSendDict ; msgSendDict=msgSendDict->hh.next) {
+#pragma omp single nowait
+ match_count += RegexpCall (msgSendDict, buffer.data);
+ }
+#endif // SINGLE_NOWAIT
+
+#ifdef SEQUENTIEL_DEBUG // OPEMMP LISTE
+ MsgSndDictPtr msgSendDict;
+
+ for (msgSendDict=messSndByRegexp; msgSendDict ; msgSendDict=msgSendDict->hh.next) {
+ match_count += RegexpCall (msgSendDict, buffer.data);
+ }
+#endif // SEQUENTIEL_DEBUG
+
+
+
+#else // PAS OPENMP
+ MsgSndDictPtr msgSendDict;
+
+ for (msgSendDict=messSndByRegexp; msgSendDict ; msgSendDict=msgSendDict->hh.next) {
+ match_count += RegexpCall (msgSendDict, buffer.data);
+ }
+#endif
+
+ TRACE_IF( match_count == 0, "Warning no recipient for %s\n",buffer.data);
+ /* si le message n'est pas emit et qu'il y a des filtres alors WARNING */
+ if ( match_count == 0 && debug_filter )
+ {
+ IvyBindindFilterCheck( buffer.data );
+ }
+ return match_count;
+}
+
+
/* teste de la presence de binaire dans les message Ivy */
static int IvyCheckBuffer( const char* buffer )
{
- char * ptr = buffer;
+ const char * ptr = buffer;
while ( *ptr )
{
if ( *ptr++ < ' ' )
@@ -829,42 +1063,11 @@ static int IvyCheckBuffer( const char* buffer )
}
return 0;
}
-/* emmission d'un message avec formatage a la printf */
-int IvySendMsg(const char *fmt, ...)
-{
- IvyClientPtr clnt;
- int match_count = 0;
- static IvyBuffer buffer = { NULL, 0, 0}; /* Use satic mem to eliminate multiple call to malloc /free */
- va_list ap;
-
- if( fmt == 0 || strlen(fmt) == 0 ) return 0;
- va_start( ap, fmt );
- buffer.offset = 0;
- make_message( &buffer, fmt, ap );
- va_end ( ap );
- /* test du contenu du message */
- if ( debug_binary_msg )
- {
- if ( IvyCheckBuffer( buffer.data ) )
- return 0;
- }
-
- /* recherche dans la liste des requetes recues de mes clients */
- IVY_LIST_EACH (clients, clnt) {
- match_count += ClientCall (clnt, buffer.data);
- }
- TRACE_IF( match_count == 0, "Warning no recipient for %s\n",buffer.data);
- /* si le message n'est pas emit et qu'il y a des filtres alors WARNING */
- if ( match_count == 0 && debug_filter )
- {
- IvyBindindFilterCheck( buffer.data );
- }
- return match_count;
-}
+
void IvySendError( IvyClientPtr app, int id, const char *fmt, ... )
{
- static IvyBuffer buffer = { NULL, 0, 0}; /* Use satic mem to eliminate multiple call to malloc /free */
+ static IvyBuffer buffer = { NULL, 0, 0}; /* Use static mem to eliminate multiple call to malloc /free */
va_list ap;
va_start( ap, fmt );
@@ -943,7 +1146,7 @@ void IvyDefaultBindCallback( IvyClientPtr app, void *user_data, int id, char* re
IvyClientPtr IvyGetApplication( char *name )
{
IvyClientPtr app = 0;
- IVY_LIST_ITER( clients, app, strcmp(name, app->app_name) != 0 );
+ IVY_LIST_ITER( allClients, app, strcmp(name, app->app_name) != 0 );
return app;
}
@@ -952,7 +1155,7 @@ char *IvyGetApplicationList(const char *sep)
static char applist[4096]; /* TODO remove that ugly Thing */
IvyClientPtr app;
applist[0] = '\0';
- IVY_LIST_EACH( clients, app )
+ IVY_LIST_EACH( allClients, app )
{
strcat( applist, app->app_name );
strcat( applist, sep );
@@ -964,11 +1167,11 @@ char **IvyGetApplicationMessages( IvyClientPtr app )
{
#define MAX_REGEXP 4096
static char *messagelist[MAX_REGEXP+1];/* TODO remove that ugly Thing */
- MsgSndPtr msg;
+ GlobRegPtr msg;
int msgCount= 0;
memset( messagelist, 0 , sizeof( messagelist ));
/* recherche dans la liste des requetes recues de ce client */
- IVY_LIST_EACH( app->msg_send, msg )
+ IVY_LIST_EACH( app->srcRegList, msg )
{
messagelist[msgCount++]= msg->str_regexp;
if ( msgCount >= MAX_REGEXP )
@@ -1021,3 +1224,371 @@ static void substituteInterval (IvyBuffer *src)
src->data = dst.data;
}
}
+
+
+static void freeClient ( IvyClientPtr client)
+{
+ GlobRegPtr srcReg;
+
+ /* on libere la chaine nom de l'appli*/
+ if (client->app_name != NULL) {
+ free (client->app_name);
+ client->app_name = NULL;
+ /* on libere la liste des clients */
+ IVY_LIST_EACH (client->srcRegList, srcReg) {
+ if (srcReg->str_regexp != NULL) {
+ free (srcReg->str_regexp);
+ srcReg->str_regexp = NULL;
+ }
+ }
+ IVY_LIST_EMPTY (client->srcRegList);
+ }
+}
+
+
+
+
+
+
+
+static void delRegexpForOneClientFromDictionary (const char *regexp, const IvyClientPtr client)
+{
+ MsgSndDictPtr msgSendDict = NULL;
+ IvyClientPtr client_itr, next;
+ TRACE ("ENTER delRegexpForOneClientFromDictionary clnt=%d, reg='%s'\n", client, regexp);
+
+ HASH_FIND_STR(messSndByRegexp, regexp, msgSendDict);
+ if (msgSendDict != NULL) {
+ /* la clef est trouvée, on itere sur la liste de client associée */
+ IVY_LIST_EACH_SAFE ( msgSendDict->clientList, client_itr, next) {
+ /* pour tester 2 IvyClientPtr, on teste la similarité
+ des pointeur Client qui doivent être uniques */
+ if (client_itr->client == client->client) {
+ /* on a trouve le client : on l'enleve */
+ IVY_LIST_REMOVE (msgSendDict->clientList, client_itr);
+ TRACE ("delRegexpForOneClientFromDictionary : IVY_LIST_REMOVE\n");
+ }
+ }
+ /* si la liste de clients associée à cette regexp est vide */
+ if ((msgSendDict->clientList == NULL) || (IVY_LIST_IS_EMPTY (msgSendDict->clientList))) {
+ TRACE ("delRegexpForOneClientFromDictionary : IvyBindingFree, free, hash_del\n");
+ /* on efface le binding */
+ IvyBindingFree (msgSendDict->binding);
+ /* on enlève l'entrée regexp de la table de hash */
+ HASH_DEL (messSndByRegexp, msgSendDict);
+ /* on efface la clef (regexp source) */
+ free (msgSendDict->regexp_src);
+ }
+ }
+#ifdef OPENMP
+ regenerateRegPtrArrayCache ();
+#endif
+}
+
+
+
+
+static void delAllRegexpsFromDictionary ()
+{
+ MsgSndDictPtr msgSendDict;
+ IvyClientPtr client;
+
+ /* pour toutes les entrees du dictionnaire des regexps */
+ for (msgSendDict=messSndByRegexp; msgSendDict ; msgSendDict=msgSendDict->hh.next) {
+ /* on efface le binding */
+ IvyBindingFree (msgSendDict->binding);
+ /* on efface la clef (regexp source) */
+ free (msgSendDict->regexp_src);
+ /* pour chaque client abonne a cette regexp */
+ IVY_LIST_EACH ( msgSendDict->clientList, client) {
+ freeClient (client);
+ }
+ /* on enleve la liste de regexps */
+ IVY_LIST_EMPTY(msgSendDict->clientList);
+ /* on enleve le couple regexp -> valeur */
+ HASH_DEL(messSndByRegexp, msgSendDict);
+ }
+
+#ifdef OPENMP
+ regenerateRegPtrArrayCache ();
+#endif
+}
+
+
+
+
+
+
+
+
+
+// HASH_ADD_KEYPTR (hh_name, head, key_ptr, key_len, item_ptr)
+// HASH_ADD_STR ( head, keyfield_name, item_ptr)
+
+static void addRegexpToDictionary (const char* regexp, IvyClientPtr client, int id,
+ IvyBinding ivyBind)
+{
+ MsgSndDictPtr msgSendDict = NULL;
+ IvyClientPtr newClient = NULL;
+ /* on cherche si une entrée existe deja pour cette regexp source */
+ HASH_FIND_STR(messSndByRegexp, regexp, msgSendDict);
+ /* l'entree n'existe pas dans le dictionnaire : on la cree */
+ if (msgSendDict == NULL) {
+ msgSendDict = malloc (sizeof (struct _msg_snd_dict));
+ msgSendDict->regexp_src = strdup (regexp);
+ msgSendDict->binding = ivyBind;
+ msgSendDict->clientList = NULL;
+
+ /* HASH_ADD_STR ne fonctionne que si la clef est un tavbleau de char, si c'est un pointeur
+ if faut utiliser HASH_ADD_KEYPTR */
+ HASH_ADD_KEYPTR(hh, messSndByRegexp, msgSendDict->regexp_src, strlen (msgSendDict->regexp_src), msgSendDict);
+#ifdef OPENMP
+ // On ne regenere le cache qu'après recpetion du endregexp, ça permet d'eviter
+ // de regenerer inutilement le cache à chaqye nouvelle regexp initiale
+ // par contre, après le end regexp, il faut regenerer le cache à chaque
+ // nouvel abonnement
+ if (client->endRegexpReceived == 1)
+ addRegToPtrArrayCache (msgSendDict);
+#endif
+ }
+
+
+ /* on ajoute le client à la liste des clients abonnés */
+ IVY_LIST_ADD_START (msgSendDict->clientList, newClient);
+ newClient->app_name = strdup (client->app_name);
+ newClient->app_port = client->app_port;
+ newClient->client = client->client;
+ newClient->id = id;
+ /* au niveau du champ liste de client du dictionnaire, on n'a pas besoin
+ de la liste des regexps sources (qui n'est necessaire que pour
+ la liste globale des clients) */
+ newClient->srcRegList = NULL;
+ IVY_LIST_ADD_END (msgSendDict->clientList, newClient);
+}
+
+
+
+static void changeRegexpInDictionary (const char* regexp, IvyClientPtr client,
+ int id, IvyBinding ivyBind)
+{
+ // printf ("ENTER changeRegexpInDictionary\n");
+ delRegexpForOneClientFromDictionary (regexp, client);
+ addRegexpToDictionary (regexp, client, id, ivyBind);
+}
+
+
+
+
+/* met a jour le dictionnaire et la liste globale */
+static void delOneClient (const Client client)
+{
+ IvyClientPtr client_itr;
+ GlobRegPtr regxpSrc =NULL, next;
+
+ TRACE ("ENTER delOneClient\n");
+ /* on cherche le client dans la liste globale des clients */
+ IVY_LIST_ITER (allClients, client_itr, client_itr->client != client );
+ /* si on le trouve */
+ if (client_itr != NULL) {
+ TRACE ("DEBUG delOneClient %s, client_itr->client=%p, clientRef=%p\n",
+ client_itr->app_name, client_itr->client, client);
+
+
+
+
+ /* pour chaque regexp source de ce client */
+ IVY_LIST_EACH_SAFE (client_itr->srcRegList, regxpSrc, next) {
+ /* on met a jour la liste des clients associee a la regexp source */
+ delRegexpForOneClient (regxpSrc->str_regexp, client_itr, regxpSrc->id);
+ /* on libere la memoire associee a la regexp source */
+ if (regxpSrc->str_regexp != NULL) {
+ free (regxpSrc->str_regexp);
+ regxpSrc->str_regexp = NULL;
+ }
+ }
+
+
+ /* on libere la liste de regexp source */
+ IVY_LIST_EMPTY (client_itr->srcRegList);
+ /* on enleve l'entree correspondant a ce client dans la liste globale */
+ IVY_LIST_REMOVE (allClients, client_itr);
+ }
+}
+
+
+static char delRegexpForOneClient (const char *regexp, const IvyClientPtr client, int id)
+{
+ IvyClientPtr client_itr = NULL;
+ GlobRegPtr regxpSrc = NULL, next = NULL;
+ char removed = 0;
+
+ TRACE ("ENTER delRegexpForOneClient id=%d\n", id);
+
+ /* on enleve du dictionnaire */
+
+ delRegexpForOneClientFromDictionary (regexp, client);
+
+ /* on enleve de la liste globale */
+ /* recherche du client */
+ IVY_LIST_ITER (allClients, client_itr, client_itr->client != client->client );
+ if (client_itr != NULL) {
+ /* pour chaque regexp source de ce client */
+ IVY_LIST_EACH_SAFE (client_itr->srcRegList, regxpSrc, next) {
+ /* si on trouve notre regexp, on la supprime */
+ if (regxpSrc->id == id) {
+ removed = 1;
+ if (regxpSrc->str_regexp != NULL) {
+ free (regxpSrc->str_regexp);
+ regxpSrc->str_regexp = NULL;
+ }
+ TRACE ("DBG> IVY_LIST_REMOVE (%p, %p)\n", client_itr->srcRegList, regxpSrc);
+ IVY_LIST_REMOVE (client_itr->srcRegList, regxpSrc);
+ }
+ }
+ }
+ return (removed);
+}
+
+static void addOrChangeRegexp (const char* regexp, IvyClientPtr client, int id,
+ IvyBinding ivyBind)
+{
+ MsgSndDictPtr msgSendDict = NULL;
+ IvyClientPtr client_itr = NULL;
+
+ // printf ("ENTER addOrChangeRegexp\n");
+ /* on teste si la regexp existe deja et si il faut faire un changeRegexp */
+ HASH_FIND_STR(messSndByRegexp, regexp, msgSendDict);
+ /* la regexp n'existe pas du tout */
+ if (msgSendDict == NULL) {
+ addRegexp (regexp, client, id, ivyBind);
+ } else {
+ /* la regexp existe, mais l'id existe elle pour le client */
+ IVY_LIST_ITER( msgSendDict->clientList, client_itr, ( client_itr->client != client->client));
+ if (( client_itr != NULL) && (client_itr->id == id)) {
+ /* si oui on fait un change regexp */
+ changeRegexp (regexp, client, id, ivyBind);
+ } else {
+ /* si non on fait un add regexp */
+ addRegexp (regexp, client, id, ivyBind);
+ }
+ }
+}
+
+
+static void addRegexp (const char* regexp, IvyClientPtr client, int id,
+ IvyBinding ivyBind)
+{
+ IvyClientPtr client_itr = NULL;
+ GlobRegPtr regxpSrc = NULL;
+
+
+ // printf ("ENTER addRegexp\n");
+
+ /* on ajoute au dictionnaire */
+ addRegexpToDictionary (regexp, client, id, ivyBind);
+
+ /* on ajoute a la liste globale */
+ /* recherche du client */
+ IVY_LIST_ITER (allClients, client_itr, client_itr->client != client->client );
+
+ /* si le client n'existe pas, faut le creer */
+ if (client_itr == NULL) {
+/* IVY_LIST_ADD_START (allClients, client_itr); */
+/* client_itr->app_name = strdup (client->app_name); */
+/* client_itr->app_port = client->app_port; */
+/* client_itr->client = client->client; */
+/* client_itr->srcRegList = NULL; */
+/* IVY_LIST_ADD_END (allClients, client_itr); */
+ fprintf(stderr, "addRegexp ERROR\n");
+ }
+
+ /* on ajoute la regexp à la liste de regexps */
+ IVY_LIST_ADD_START (client_itr->srcRegList, regxpSrc);
+ regxpSrc->id = id;
+ regxpSrc->str_regexp = strdup (regexp);
+ IVY_LIST_ADD_END (client_itr->srcRegList, regxpSrc);
+ if (application_bind_callback) {
+ (*application_bind_callback)( client, application_bind_data, id, regexp, IvyAddBind);
+ }
+}
+
+
+
+static void changeRegexp (const char* regexp, IvyClientPtr client, int id,
+ IvyBinding ivyBind)
+{
+ IvyClientPtr client_itr = NULL;
+ GlobRegPtr regxpSrc = NULL, next = NULL;
+ /* on change dans le dictionnaire */
+ // printf ("ENTER changeRegexp\n");
+ changeRegexpInDictionary (regexp, client, id , ivyBind);
+
+ /* on change dans la liste globale */
+ /* recherche du client */
+ IVY_LIST_ITER (allClients, client_itr, client_itr->client != client->client );
+ if (client_itr != NULL) {
+ /* pour chaque regexp source de ce client */
+ IVY_LIST_EACH_SAFE (client_itr->srcRegList, regxpSrc, next) {
+ /* si on trouve notre regexp, on la change */
+ if (regxpSrc->id == id) {
+ free (regxpSrc->str_regexp);
+ regxpSrc->str_regexp = strdup (regexp);
+ }
+ }
+ }
+ if (application_bind_callback) {
+ (*application_bind_callback)( client, application_bind_data, id, regexp, IvyChangeBind);
+ }
+}
+
+#ifdef OPENMP
+static void regenerateRegPtrArrayCache ()
+{
+ int count=0;
+ MsgSndDictPtr msgSendDict;
+ ompDictCache.numPtr = 0;
+
+ for (msgSendDict=messSndByRegexp; msgSendDict != NULL ;
+ msgSendDict=msgSendDict->hh.next) {
+ ompDictCache.numPtr++;
+ }
+
+ if (ompDictCache.numPtr >= ompDictCache.size) {
+ ompDictCache.size = (ompDictCache.numPtr*2) + 128;
+ ompDictCache.msgPtrArray = realloc (ompDictCache.msgPtrArray,
+ sizeof (MsgSndDictPtr) * ompDictCache.size);
+ }
+
+
+ for (msgSendDict=messSndByRegexp; msgSendDict != NULL ;
+ msgSendDict=msgSendDict->hh.next) {
+ ompDictCache.msgPtrArray [count++] = msgSendDict;
+ }
+}
+
+
+static void addRegToPtrArrayCache (MsgSndDictPtr newReg)
+{
+ int count=0;
+ MsgSndDictPtr msgSendDict;
+ ompDictCache.numPtr = 0;
+
+ for (msgSendDict=messSndByRegexp; msgSendDict != NULL ;
+ msgSendDict=msgSendDict->hh.next) {
+ ompDictCache.numPtr++;
+ }
+
+ if (ompDictCache.numPtr >= ompDictCache.size) {
+ ompDictCache.size = (ompDictCache.numPtr*2) + 128;
+ ompDictCache.msgPtrArray = realloc (ompDictCache.msgPtrArray,
+ sizeof (MsgSndDictPtr) * ompDictCache.size);
+ for (msgSendDict=messSndByRegexp; msgSendDict != NULL ;
+ msgSendDict=msgSendDict->hh.next) {
+ ompDictCache.msgPtrArray [count++] = msgSendDict;
+ }
+ } else {
+ // on ajoute juste le nouveau pointeur
+ ompDictCache.msgPtrArray [ompDictCache.numPtr-1] = newReg;
+ }
+}
+#endif // OPENMP
diff --git a/src/ivy.h b/src/ivy.h
index 0ee4542..e7b28bf 100644
--- a/src/ivy.h
+++ b/src/ivy.h
@@ -26,7 +26,7 @@ extern "C" {
#define DEFAULT_BUS 2010
-typedef struct _clnt_lst *IvyClientPtr;
+typedef struct _clnt_lst_dict *IvyClientPtr;
typedef enum { IvyApplicationConnected, IvyApplicationDisconnected } IvyApplicationEvent;
typedef enum { IvyAddBind, IvyRemoveBind, IvyFilterBind, IvyChangeBind } IvyBindEvent;
diff --git a/src/ivybuffer.c b/src/ivybuffer.c
index a47eef5..7228f55 100644
--- a/src/ivybuffer.c
+++ b/src/ivybuffer.c
@@ -25,7 +25,7 @@
#include "ivybuffer.h"
-#define BUFFER_SIZE 4096 /* taille buffer initiale on multiple par deux a chaque realloc */
+#define BUFFER_SIZE 4096 /* taille buffer initiale on multiple par deux a chaque realloc */
/* fonction de formtage a la printf d'un buffer avec reallocation dynamique */
int make_message(IvyBuffer* buffer, const char *fmt, va_list ap)
@@ -81,5 +81,3 @@ int make_message_var(IvyBuffer* buffer, const char *fmt, ... )
return len;
}
-
-
diff --git a/src/ivysocket.c b/src/ivysocket.c
index 91aabf3..d0fd60e 100644
--- a/src/ivysocket.c
+++ b/src/ivysocket.c
@@ -14,6 +14,10 @@
* copyright notice regarding this software
*/
+#ifdef OPENMP
+#include <omp.h>
+#endif
+
#ifdef WIN32
#include <windows.h>
#endif
@@ -71,6 +75,9 @@ struct _client {
char *ptr;
/* user data */
void *data;
+#ifdef OPENMP
+ omp_lock_t fdLock;
+#endif
};
static Server servers_list = NULL;
@@ -95,6 +102,10 @@ static void DeleteSocket(void *data)
(*client->handle_delete) (client, client->data );
shutdown (client->fd, 2 );
close (client->fd );
+#ifdef OPENMP
+ omp_destroy_lock (&(client->fdLock));
+#endif
+
IVY_LIST_REMOVE (clients_list, client );
}
@@ -204,6 +215,10 @@ static void HandleServer(Channel channel, HANDLE fd, void *data)
client->ptr = client->buffer;
client->handle_delete = server->handle_delete;
client->data = (*server->create) (client );
+#ifdef OPENMP
+ omp_init_lock (&(client->fdLock));
+#endif
+
IVY_LIST_ADD_END (clients_list, client );
@@ -335,13 +350,17 @@ void SocketClose (Client client )
IvyChannelRemove (client->channel );
}
-int SocketSendRaw (Client client, char *buffer, int len )
+int SocketSendRaw (const Client client, const char *buffer, const int len )
{
int err;
int waiting= 0;
if (!client)
return waiting;
+
+#ifdef OPENMP
+ omp_set_lock (&(client->fdLock));
+#endif
if ( debug_send )
{
@@ -373,6 +392,67 @@ int SocketSendRaw (Client client, char *buffer, int len )
{
fprintf(stderr,"... OK\n");
}
+#ifdef OPENMP
+ omp_unset_lock (&(client->fdLock));
+#endif
+ // GROS DEBUG
+/* { */
+/* char *toPrint = strndup (buffer, len); */
+/* printf ("DBG> SocketSendRaw [%d] -> '%s'\n", len, toPrint); */
+/* free (toPrint); */
+/* } */
+ // END DEBUG
+
+ return waiting;
+}
+
+int SocketSendRawWithId( const Client client, const char *id, const char *buffer, const int len )
+{
+ int err;
+ int waiting= 0;
+
+ if (!client)
+ return waiting;
+
+#ifdef OPENMP
+ omp_set_lock (&(client->fdLock));
+#endif
+
+ if ( debug_send )
+ {
+ /* try to determine if we are going to block */
+ fd_set wrset;
+ int ready;
+ struct timeval timeout;
+ timeout.tv_usec = 0;
+ timeout.tv_sec = 0;
+ FD_ZERO(&wrset);
+ FD_SET( client->fd, &wrset );
+ ready = select(client->fd+1, 0, &wrset, 0, &timeout);
+ /* fprintf(stderr,"Ivy: select ready=%d fd=%d\n",ready,FD_ISSET( client->fd, &wrset ) ); */
+ if(ready < 0) {
+ perror("Ivy: SocketSendRaw select");
+ }
+ if ( !FD_ISSET( client->fd, &wrset ) )
+ {
+ fprintf(stderr,"Ivy: Client Queue Full Waiting..........");
+ waiting = 1;
+ }
+
+ }
+
+ send (client->fd, id, strlen (id), 0 );
+ err = send (client->fd, buffer, len, 0 );
+ if (err != len )
+ perror ("*** send ***");
+ if ( debug_send && waiting )
+ {
+ fprintf(stderr,"... OK\n");
+ }
+#ifdef OPENMP
+ omp_unset_lock (&(client->fdLock));
+#endif
+
return waiting;
}
@@ -386,6 +466,10 @@ int SocketSend (Client client, char *fmt, ... )
{
int waiting = 0;
static IvyBuffer buffer = {NULL, 0, 0 }; /* Use satic mem to eliminate multiple call to malloc /free */
+#ifdef OPENMP
+#pragma omp threadprivate (buffer)
+#endif
+
va_list ap;
int len;
if (!client)
@@ -407,6 +491,9 @@ void SocketBroadcast ( char *fmt, ... )
{
Client client;
static IvyBuffer buffer = {NULL, 0, 0 }; /* Use satic mem to eliminate multiple call to malloc /free */
+#ifdef OPENMP
+#pragma omp threadprivate (buffer)
+#endif
va_list ap;
int len;
@@ -480,8 +567,13 @@ Client SocketConnectAddr (struct in_addr * addr, unsigned short port,
client->from.sin_family = AF_INET;
client->from.sin_addr = *addr;
client->from.sin_port = htons (port);
+
+#ifdef OPENMP
+ omp_init_lock (&(client->fdLock));
+#endif
IVY_LIST_ADD_END(clients_list, client );
+
return client;
}
/* TODO factoriser avec HandleRead !!!! */
@@ -601,6 +693,9 @@ Client SocketBroadcastCreate (unsigned short port,
client->interpretation = interpretation;
client->ptr = client->buffer;
client->data = data;
+#ifdef OPENMP
+ omp_init_lock (&(client->fdLock));
+#endif
IVY_LIST_ADD_END(clients_list, client );
return client;
@@ -610,6 +705,9 @@ void SocketSendBroadcast (Client client, unsigned long host, unsigned short port
{
struct sockaddr_in remote;
static IvyBuffer buffer = { NULL, 0, 0 }; /* Use satic mem to eliminate multiple call to malloc /free */
+#ifdef OPENMP
+#pragma omp threadprivate (buffer)
+#endif
va_list ap;
int err,len;
diff --git a/src/ivysocket.h b/src/ivysocket.h
index c656312..604e65f 100644
--- a/src/ivysocket.h
+++ b/src/ivysocket.h
@@ -66,7 +66,8 @@ extern void SocketServerClose( Server server );
extern void SocketClose( Client client );
extern int SocketSend( Client client, char *fmt, ... );
-extern int SocketSendRaw( Client client, char *buffer, int len );
+extern int SocketSendRaw( const Client client, const char *buffer, const int len );
+extern int SocketSendRawWithId( const Client client, const char *id, const char *buffer, const int len );
extern char *SocketGetPeerHost( Client client );
extern void SocketSetData( Client client, void *data );
extern void *SocketGetData( Client client );
diff --git a/src/list.h b/src/list.h
index 968f619..2a7ea84 100644
--- a/src/list.h
+++ b/src/list.h
@@ -54,6 +54,9 @@
on place le code d'initialisation de l'objet entre START et END
pour eviter de chainer un objet non initialise
*/
+
+ /* printf ("sizeof (*"#p") = %d\n", sizeof( *p )); \ */ \
+
#define IVY_LIST_ADD_START(list, p ) \
if ((p = (TYPEOF(p)) (malloc( sizeof( *p ))))) \
{ \
@@ -75,6 +78,9 @@ pour eviter de chainer un objet non initialise
#define IVY_LIST_EACH_SAFE( list, p, next )\
for ( p = list ; (next = p ? p->next: p ),p ; p = next )
+#define IVY_LIST_IS_EMPTY( list ) \
+ list == NULL
+
#define IVY_LIST_EMPTY( list ) \
{ \
TYPEOF(list) p; \
diff --git a/src/timer.c b/src/timer.c
index 4e38c52..9718b22 100644
--- a/src/timer.c
+++ b/src/timer.c
@@ -63,7 +63,7 @@ static long currentTime()
static void SetNewTimeout( unsigned long current, unsigned long when )
{
unsigned long time;
- time = when - current;
+ time = (when <= current) ? 0 : when - current;
nextTimeout = when;
selectTimeout.tv_sec = time / MILLISEC;
selectTimeout.tv_usec = (time - selectTimeout.tv_sec* MILLISEC) * MILLISEC;
diff --git a/src/version.h b/src/version.h
index 42d32fd..de934c8 100644
--- a/src/version.h
+++ b/src/version.h
@@ -26,4 +26,4 @@
*
*/
#define IVYMAJOR_VERSION 3
-#define IVYMINOR_VERSION 9
+#define IVYMINOR_VERSION 10