From 53d9a90ba69b8eab217fab6c7f9d3cb742b55a8e Mon Sep 17 00:00:00 2001 From: damiano Date: Wed, 22 Dec 1999 13:37:01 +0000 Subject: *** empty log message *** --- Ivy.pm | 1694 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 1694 insertions(+) create mode 100644 Ivy.pm (limited to 'Ivy.pm') diff --git a/Ivy.pm b/Ivy.pm new file mode 100644 index 0000000..1b4d760 --- /dev/null +++ b/Ivy.pm @@ -0,0 +1,1694 @@ +# +# Ivy, Perl interface +# +# Copyright 1997-1999 +# Centre d'Etudes de la Navigation Aerienne +# +# Version 2.0 : api OO, envoi des messages par l'intermediaire +# de thread pour ne jamais bloquer sur un envois. +# +# TODO : +# +# +# - touver un mecanisme permettant de sortir meme quand +# un thread d'emission est bloque sur un send. +# +# +# BUGS: +# -Si une appli abonnee bloque, son thread d'emission +# reste bloque sur un send, le process ne sortira pas +# sur un , mais uniquement sur un kill -TERM pid, +# voire sous irix sur un kill -KILL pid +# +# -Sous irix, si on cree dymaniquement des bus et qu'om les +# tue a rythme eleve, au bout d'un moment perl plante. +# +# pour avoir une version debarassee des commentaires et des blancs +# pour voir la longueur reelle du code : +# perl -ne 'chomp; next if /^\s*$/ || /^\s*#/; s/#.*//; print "$_\n";' Ivy.pm +# +# pour commenter tous les print de debug : +# perl -i.bak -ne 's/(printf?\s*\(?\"DBG)/ \# $1/ unless /^\s*\#/; print $_;' Ivy.pm + +package Ivy ; + +use Sys::Hostname; +use IO::Socket; +use strict ; +use Time::Gettimeofday ; +use Thread; +use Thread::Queue; +use Thread::Signal; +use Fcntl; +use Errno; +use vars qw($VERSION); +$VERSION = '4.0'; + +############################################################################# +#### PROTOTYPES ##### +############################################################################# + +sub init ($%); # methode de classe, permet de renseigner + # tous les parametres globaux. Ces parametres + # seront utilises par new si ils ne sont pas + # donnes lors de l'appel de new. + + +sub new ($%); # verifie la validite de tous les parametres, + # cree et retourne un objet Ivy. Les parametres + # appName, networks, messWhenReady, peuvent + # etre donnes, meme si ils ont deja ete + # donnes dans init, dans ce cas, ce sont ceux + # de new qui prevalent + +sub start ($); # debut de l'integration au bus : + # - cree la socket d'application, recupere le no + # de port + # - cree la socket supervision + # - envoie le "no de port" + # - bind le file descriptor de la socket de + # supervision a la fonction getBonjour pour + # traiter les bonjours + # - bind le fd de connection sur la fonction + # getConnections + # pour etablir les connections "application" + +sub DESTROY ($); # - envoie un BYE et clot les connections + +sub bindRegexp ($$$) ; # permet d'associer une regexp avec un callBack + # ou d'annuler une precedente association + +sub bindDirect ($$$); # permet d'associer un identifiant de msg direct + # avec une fonction de callBack, ou de l'annuler + +sub sendMsgs ($@) ; # envoie une liste de messages +sub sendAppNameMsgs ($@) ; # envoie une liste de messages precedes + # du nom de l'application +sub sendDirectMsgs ($$$@); # envoie une liste de messages directs a une appli +sub sendDieTo ($$); # envoie un <> a une appli +sub ping ($$$); # teste qu'une appli soit encore vivante +sub mainLoop (); # la mainloop locale (sans tk) +sub stop (); # methode de classe : on delete les bus, mais + # on reste dans la mainloop +sub exit (); # methode de classe : on delete tous les + # bus (donc on ferme proprement toutes les + # connexions). + # Si on est en mainloop locale on sort de la + # mainloop, le code qui suit l'appel mainLoop + # sera execute. + # par contre si on est en mainloop Tk, + # il faut en plus detruire la mainwindow + # pour sortir de la mainloop; +sub after ($$;$); # temps en millisecondes, callback +sub repeat ($$;$); # temps en millisecondes, callback +sub afterCancel ($;$); # l'id d'un cancel ou d'un repeat +sub fileEvent ($$;$); # associe un fd a un callback pour la mainloop locale + + +################ PRIVEE #################################################### +sub _getBonjour ($); # lit le (ou les) bonjour(s) sur le canal de supervision + # et se connecte, verifie qu'il ne se reponds pas lui + # meme, ni qu'il ne repond pas a un service deja connecte + +sub _getConnections ($); # est appele lors d'une demande de connection : + # accepte la connection et mets a jour @sendRegList + # rajoute le fd du canal aux fd a scruter dans la + # boucle d'evenements + +sub _getMessages ($$); # est appele lorqu'un message arrive + +sub _sendWantedRegexp ($$); # envoie les regexp a l'appli distante + +sub _sendLastRegexpToAllreadyConnected ($$) ; # envoie la derniere regexp + # pushee dans @recCbList + # a toutes les applis deja + # connectees +sub _removeFileDescriptor ($$); # on vire un fd et les structures associees +sub _sendErrorTo ($$$); #(fd, error) envoie un message d'erreur a un fd +sub _sendDieTo ($$); #(fd) envoie un message de demande de suicide a un fd +sub _sendMsgTo ($$$); # (fd, message) +sub _pong ($$); # (fd) +sub _tkFileEvent ($$); # associe un fd a un callback pour la mainloop tk +sub _scanAfter () ; # parse si il faut appeler un callback associe a un after +sub _myCanRead (); # interface au select +sub _scanConnStatus ($); # verifie les connections effectuees et + # appelle la fonction $statusFunc +sub _inetAdrByName ($$); # transforme une adresse inet native en chaine + # $host:$port +sub _toBePruned ($$$); +sub _parseIvyBusParam ($); # prends une adresse de bus de la forme + # 143.196.53,DGAC-CENATLS:2010 et + # renvoie une liste de deux elements : + # un numero de port et une ref sur une + # liste d'adresses addr_inet + +sub _senderThread ($$$); # procedure executee dans un thread, qui + # envoie les donnees sur une socket a travers + # une queue. De cette facon, si une appli + # distante bloque, ca ne penalise pas + # l'appli locale, en dehors du fait que la + # queue va grossir jusqu'au not enough memory + # au prix de la memoire c'est pas grave :-) + # deux arguments : la queue et le fd. + +sub _closeSenderThread ($$); # procedure d'arret d'un thread : + # on vide la queue d'emission, on + # place une valeur de sortie dans la queue, + # et on ferme le fd associe a la socket + # d'emission. + +sub _substituteEscapedChar ($$); #permet de transormer une regexp etendue + # 'perl' en regexp de base +############################################################################# +#### CONSTANTES ##### +############################################################################# +use constant VERSION => 3; +use constant MSG_FMT => "%d %d\002%s\n"; + +# par defaut, on diffuse le bonjour en local +use constant BROADCAST_ADDRS => "127.0.0.1" ; + +use constant BYE => 0; +use constant REGEXP => 1; +use constant MSG => 2; +use constant ERROR => 3; +use constant DELREGEXP => 4; +use constant ENDREGEXP => 5; +use constant APP_NAME => 6; +use constant DIRECT_MSG => 7; +use constant DIE => 8; +use constant PING => 9; +use constant PONG => 10; + + +use constant AFTER => 0; +use constant REPEAT => 1; +use constant MAX_TIMOUT => 1000; + +# pour pouvoir employer les regexps perl. Attention lors de l'utilisation +# ne pas mettre un \n dans une chaine entre "" car l'\n sera interprete. +use constant REG_PERLISSISME => ('w' => '[a-zA-Z0-9_]', + 'W' => '[^a-zA-Z0-9_]', + 's' => '[ ]', + 'S' => '[^ ]', + 'd' => '[0-9]', + 'D' => '[^0-9]', + 'n' => '', # Il ne faut pas mettre d'\n : + # c'est un delimiteur pour le bus + 'e' => '[]') ; + + +############################################################################# +#### VARIABLES de CLASSE ##### +############################################################################# + + +# optimisation : si l'on connait les sujets des messages +# qu'on envoie, cette variable contient une liste de +# sujets qui doivent matcher les regexps d'abonnement +# pour que celle ci se soient pas eliminees +my @topicRegexps; + +# les adresses de reseau sur lesquelles ont broadcaste +# suivies du No de port : +# exemples : "143.196.1.255,143.196.2.255:2010" +# "DGAC-CENATLS-PII:DGAC-CENATLS:2010" +# ":2010" <= dans ce cas c'est la valeur +# de reseau de broadcast par defaut qui est prise : 127.255.255.255 +# c.a.d local a la machine +my $ivyBus ; + +# le nom de l'appli pour le bus +my $appName ; + +# message a envoyer a un canal lorsqu'on +# a recu le message endregexp. +my $messWhenReady ; + +# fonction de cb appelee lorsque l'appli a recu l'ordre +# de quitter, on peut dans ce callback fermer +# proprement les ressources avant de sortir. +# ps : ne pas fasire d'exit dans le callback, +# c'est le bus qui s'en charge +my $onDieFunc; + + +# permet de donner des valeurs successives aux constantes permettant +# d'acceder aux differents champs de l'objet +my $constantIndexer =0; + +# pointeur sur la fonction permettant d'associer +# des callbacks a un file desc, (ainsi que de les enlever) +my $fileEventFunc; + +# dans le cas ou l'on soit dans une mainLoop +# locale, cette var pointe une un objet +# de type IO::Select; +my $localLoopSel; + +# table d'ass. handle -> callback +my %localBindByHandle; + +# tableau d'ass [AFTER ou REPEAT, +# timeTotal, deadLine, [callback, arg, arg, ...]] +my %afterList=(); + +my $afterId = 0; + +# timeout le plus petit pour le select +my $selectTimout = MAX_TIMOUT; + + +# liste des bus actifs +my %allBuses = (); + + +############################################################################# +#### CLEFS DES VARIABLES D'INSTANCE ##### +#### ##### +#### l'objet Ivy sera 'blessed' sur une reference sur un array et non ##### +#### sur une table de hash comme pratique courament de facon a ##### +#### 1/ optimiser au niveau vitesse ##### +#### 2/ avoir des clefs sous forme de symboles (use constant...) ##### +#### et nom des clefs sous forme de chaines de caracteres ##### +#### de facon a eviter des erreurs ##### +#### ##### +#### ##### +############################################################################# +use constant servPort => $constantIndexer++; +use constant neededApp => $constantIndexer++; +use constant statusFunc => $constantIndexer++; +use constant supSock => $constantIndexer++; +use constant connSock => $constantIndexer++; +use constant sockList => $constantIndexer++; +use constant queueList => $constantIndexer++; +use constant threadList => $constantIndexer++; +use constant appliList => $constantIndexer++; +use constant sendRegList => $constantIndexer++; +use constant topicRegexps => $constantIndexer++; +use constant recCbList => $constantIndexer++; +use constant directCbList => $constantIndexer++; +use constant cnnxion => $constantIndexer++; +use constant buffByConn => $constantIndexer++; +use constant broadcastPort => $constantIndexer++; +use constant broadcastBuses => $constantIndexer++; +use constant appName => $constantIndexer++; +use constant messWhenReady => $constantIndexer++; + + + +############################################################################# +#### METHODES PUBLIQUES ##### +############################################################################# + + + +############### METHODE DE CLASSE INIT +sub init ($%) +{ + my ($class, %options) = @_; + + # valeurs par defaut pour le parametre : variable d'environnement + # ou valeur cablee, a defaut + my $default_ivyBus = defined $ENV{"IVYBUS"} ? $ENV{"IVYBUS"} : ""; + + my %defaultOptions = ( #PARAMETRES OBLIGATOIRES + -loopMode => undef, + # TK ou LOCAL + + -appName => undef, + # nom de l'appli + + # PARAMETRES FACULTATIFS (avec valeurs par defaut) + + # les adresses de reseau sur lesquelles ont broadcaste + # suivies du No de port : + # exemples : "143.196.1.255,143.196.2.255:2010" + # "DGAC-CENATLS-PII:DGAC-CENATLS:2010" + # ":2010" <= dans ce cas c'est la valeur + # de reseau de broadcast par defaut qui est prise : + # 127.255.255.255 c.a.d local a la machine + -ivyBus => $default_ivyBus, + + -messWhenReady => "_APP NAME READY", + # message de synchro a envoyer quand pret + + -onDieFunc => [sub {}], + # fonction de cb appelee lorsque l'appli a recu l'ordre + # de quitter, on peut dans ce callback fermer + # proprement les ressources avant de sortir. + # ps : ne pas fasire d'exit dans le callback, + # c'est le bus qui s'en charge + + -pruneRegexp => [], + # optimisation : si l'on connait les sujets des messages + # qu'on envoie, on fournit la liste des sujets + # et les regexps qui ne matchent pas + # ces sujets sont eliminees. + ) ; + + + foreach my $opt (keys %defaultOptions) { + # si un parametre n'a pas ete defini + next if defined $options{$opt} ; + # est-il facultatif + if (defined $defaultOptions{$opt}) { + $options{$opt} = $defaultOptions{$opt} ; + } else { + # parametre obligatoire + die "ERREUR Ivy::init vous devez specifier ". + "l'option $opt\n"; + } + } + + foreach my $opt (keys %options) { + die "ERREUR Ivy::init option $opt inconnue\n" unless + exists ($defaultOptions{$opt}); + } + + my $loopMode = $options{-loopMode}; + $ivyBus = $options{-ivyBus} ne "" ? $options{-ivyBus} : undef; + $appName = $options{-appName} ; + $messWhenReady = + $options{-messWhenReady} eq "_APP NAME READY" ? + "$appName READY" : $options{-messWhenReady}; + + $onDieFunc = $options{-onDieFunc} ; + @topicRegexps = @{$options{-pruneRegexp}}; + + if ($loopMode =~ /local/i) { + # mode boucle d'evenement locale + use IO::Select; + $fileEventFunc = \&fileEvent ; + $localLoopSel = IO::Select->new (); + } elsif ($loopMode =~ /tk/i) { + # mode boucle d'evenement de TK + $fileEventFunc = \&_tkFileEvent ; + } else { + die qq|l'argument "mainloop mode" doit etre TK ou LOCAL\n|; + } + + $SIG{'PIPE'} = 'IGNORE' ; +} + + + +############# METHODE DE CLASSE NEW +sub new ($%) +{ + my ($class, %options) = @_; + my $self = []; + $#{$self} = $constantIndexer; # on predimensionne le tableau + bless ($self, $class); + + # on verifie que la methode de classe init ait ete appelee + unless ((defined $appName) && ($appName ne '')) { + die "ERREUR Ivy::new vous devez initialiser le module via Ivy->init ()"; + } + + # No de port tcp du serveur + $self->[servPort] = ''; + + # liste des applis necessaires a l'appli locale + $self->[neededApp] = []; + + # callback prenant en param 2 refs sur des listes : + # [applis presentes, appli absentes] + # cette fonction est appelee : + # - tout les pollingTime tant que toutes les applis + # ne sont pas presentes + # - des que toutes les applis sont presentes + # - lorsqu'une appli se deconnecte + $self->[statusFunc] = ''; + + # callback prenant en param 1 refs sur une liste : + # [ref sur fonction, parametres] + + # socket de supervision en lecture/ecriture + $self->[supSock] = ''; + + # socket de connexion tcp + $self->[connSock] = ''; + + # tab ass : nom du fd => fd + $self->[sockList] = {}; + + # tab ass : nom du fd => queue de communication entre + # l'appli et le thread qui fait les envois + $self->[queueList] = {}; + + # tab ass : nom du fd => thread qui fait les envois + $self->[threadList] = {}; + + # tab ass : nom de l'appli => fd + $self->[appliList] = {}; + + # tableau ass de liste du type + # sockId => [fonction, fonction, ...] + # pour savoir quoi envoyer a qui + # les fonctions anonymes sont compilees + # dynamiquement a la reception des messages REGEXP + # et filtrent les mess a envoyer et les envoient + # au besoin + $self->[sendRegList] = {}; + + # liste des topics qu'on envoie si on + # filtre les regexps + $self->[topicRegexps] = []; + + # liste de ref sur des couples + # (regexp,callBack) les callbacks + # sont appeles lors de + # la reception de messages en fonction + # du numero de regexp. + $self->[recCbList] = []; + + # liste de callBack pour les messages directs + $self->[directCbList] = []; + + # tableau ass : clef = nom:numero_de port + # permet de verifier qu'on ne se connecte pas + # sur nous meme et qu'on ne se reconnecte + # pas sur un service en cas de bonjours repetes + # valeur : nom de l'application + $self->[cnnxion] = {}; + + # tableau associatif, clef => file desc, + # valeur :buffer au cas ou la lacture ne se termine + # pas par \n, de maniere a resegmenter les messages + $self->[buffByConn] = {}; + + + my %defaultOptions = ( + -appName => $appName, + # nom de l'appli + + # PARAMETRES FACULTATIFS (avec valeurs par defaut) + -messWhenReady => $messWhenReady, + # message de synchro a envoyer quand pret + + + # PARAMETRES FACULTATIFS (avec valeurs par defaut) + + # les adresses de reseau sur lesquelles ont broadcaste + # suivies du No de port : + # exemples : "143.196.1.255,143.196.2.255:2010" + # "DGAC-CENATLS-PII:DGAC-CENATLS:2010" + # ":2010" <= dans ce cas c'est la valeur + # de reseau de broadcast par defaut qui est prise : + # 127.255.255.255 c.a.d local a la machine + -ivyBus => $ivyBus, + + -neededApp => [], + # liste des appplis necessaires + + -statusFunc => sub {}, + # fonction de callBack qui sera appelee tant que + # toutes les applis necessaires ne sont pas presentes, + # et des que toutes les applis necessaires sont + # presentes, et si une appli necessaire se deconnecte + # les trois parametres passes sont : + # [liste des applis presentes], + # [liste des applis absentes], + # [table de hash, clefs = applis presentes, + # valeurs = nombre d'applis . + # normalement ce nombre devrait etre 1, sinon + # ca veut dire que plus d'une appli de meme nom + # tourne sur le meme bus : danger !! + + -pruneRegexp => [@topicRegexps], + # optimisation : si l'on connait les sujets des messages + # qu'on envoie, on fournit la liste des sujets + # et les regexps qui ne matchent pas + # ces sujets sont eliminees. + ) ; + + + foreach my $opt (keys %defaultOptions) { + # si un parametre n'a pas ete defini + next if defined $options{$opt} ; + # est-il facultatif + if (defined $defaultOptions{$opt}) { + $options{$opt} = $defaultOptions{$opt} ; + } else { + # parametre obligatoire + die "ERREUR Ivy::start vous devez specifier ". + "l'option $opt\n"; + } + } + + foreach my $opt (keys %options) { + die "ERREUR Ivy::start option $opt inconnue\n" unless + exists ($defaultOptions{$opt}); + } + + + + $self->[appName] = $options{-appName} ; + $self->[messWhenReady] = $options{-messWhenReady} ; + @{$self->[neededApp]} = @{$options{-neededApp}} ; + $self->[statusFunc] = $options{-statusFunc} ; + $self->[topicRegexps] = $options{-pruneRegexp} ; + $allBuses{$self} = $self; + + ($self->[broadcastPort], $self->[broadcastBuses]) = + _parseIvyBusParam ($options{-ivyBus}); + + + return ($self); +} + + + +############## METHODE DE CLASSE STOP +sub stop () +{ + foreach my $bus (values %allBuses) { + $bus->DESTROY(); + } +} + + +############## METHODE DE CLASSE EXIT +sub exit () +{ + Ivy::stop (); + if (defined $localLoopSel) { + # boucle locale, on sait faire + # printf ("DBG> undefining localLoopSel\n"); + undef $localLoopSel; + } else { + + # afficher les threads qui restent + foreach my $t (Thread->list()) { + next if (($t->tid == Thread->self->tid) || $t->tid == 0); + printf ("DBG> Thread %d is active, flag = %d\n", $t->tid, $t->flags) + if $^W; + } + Tk::exit (); + } +} + + + +################ METHODE START +sub start ($) +{ + my $self = shift; + + # cree la socket de connexion, recupere le no de port + my $connSock = $self->[connSock] = IO::Socket::INET->new(Listen => 128, + Proto => 'tcp', + Reuse => 1) ; + # print ("DBG> opening TCP fd $connSock\n"); + # on memorise tout ca, ce qui evitera par la suite de se + # repondre a soi-meme. On le fait sous nos deux noms : + # le nom de machine et 'localhost' + my $hostAddr = (gethostbyname (hostname()))[4] ; + my $localhostAddr = (gethostbyname ('localhost'))[4] ; + $self->[cnnxion]->{"$hostAddr:". $connSock->sockport} = "\004"; + $self->[cnnxion]->{"$localhostAddr:". $connSock->sockport} = "\004"; + + # cree la socket de broadcast + $self->[supSock] = IO::Socket::INET->new ( + LocalPort => $self->[broadcastPort], + Proto => 'udp', + Type => SOCK_DGRAM, + Reuse => 1); + + $self->[supSock]->sockopt (SO_BROADCAST, 1); + fcntl ( $self->[supSock], F_SETFL, O_NDELAY) ; + + # et on envoie envoie le bonjour : "no de version no de port" + my $bonjourMsg = sprintf ("%d %d\n", VERSION, $connSock->sockport()); + + foreach my $netBroadcastAddr (@{$self->[broadcastBuses]}) { + + send ($self->[supSock], $bonjourMsg, 0, $netBroadcastAddr) or + warn "Attention Ivy::start envoi du bonjour a echoue : $!\n"; + } + # callback pour traiter la reception des bonjours + &$fileEventFunc ($self->[supSock], [\&_getBonjour, $self]) ; + + # callback pour traiter les demandes de cxion + &$fileEventFunc ($connSock, [\&_getConnections, $self]) ; +} + + +############### METHODE BIND REGEXP +sub bindRegexp ($$$) +{ + my ($self, $regexp, $cb) = @_; + + # on substitue les meta caracteres des regexps perl : \d, \w, \s, \e + # par les classes de caracteres corespondantes de maniere a ce + # qu'une appli distante non perl comprenne ces regexp. + $regexp =~ s| + ( + (?[recCbList]}+1); $id++) { + last unless (defined $self->[recCbList][$id]) && + @{$self->[recCbList][$id]->[1]}; + } + $self->[recCbList][$id] = [$regexp, $cb]; + + # on envoie les messages regexps aux processus deja connectes + _sendLastRegexpToAllreadyConnected ($self, $id) ; + } else { + # on vire le callback, et on se desabonne de cette regexp + for (my $id=0; $id <= $#{$self->[recCbList]}; $id++) { + next unless (defined $self->[recCbList][$id]) && + @{$self->[recCbList][$id]->[1]}; + if ($self->[recCbList][$id]->[0] eq $regexp) { + $self->[recCbList][$id]->[1] = []; + # on envoie le mesage delregexp + foreach my $fd (values %{$self->[sockList]}) { + Thread::Queue::enqueue ($self->[queueList]->{$fd}, + \sprintf (MSG_FMT, DELREGEXP, $id)); + } + } + } + } +} + +############### METHODE BIND DIRECT +sub bindDirect ($$$) +{ + my ($self, $id, $cb) = @_; + + if ($cb) { + # on rajoute la $cb dans la liste des messages + # qu'on prend + $self->[directCbList][$id] = $cb; + } else { + # on vire le callback + undef $self->[directCbList][$id]; + } +} + + + +############### METHODE SEND MSGS +sub sendMsgs ($@) +{ + use attrs qw(locked); + + my ($self, @msgs) = @_; + my $total = 0; + # pour tous les messages + foreach my $msg (@msgs) { + study ($msg); + + # pour routes les connections + foreach my $fd (keys %{$self->[sockList]}) { + + # pour toutes les fonctions de filtrage de regexp + foreach my $regexpFunc (@{$self->[sendRegList]{$fd}}) { + $total += &{$regexpFunc} (\$msg) if defined $regexpFunc; + } + } + } +# print "DBG> sended $total times\n"; + return $total; +} + +############### METHODE SEND APP NAME MSGS +sub sendAppNameMsgs ($@) +{ + use attrs qw(locked); + + my ($self, @msgs) = @_; + my $total = 0; + # pour tous les messages + foreach (@msgs) { + my $msg = "$self->[appName] $_"; + study ($msg); + + # pour routes les connections + foreach my $fd (keys %{$self->[sockList]}) { + + # pour toutes les fonctions de filtrage de regexp + foreach my $regexpFunc (@{$self->[sendRegList]{$fd}}) { + $total += &{$regexpFunc} (\$msg) if defined $regexpFunc; + } + } + } + # print "DBG> sended $total times\n"; + return $total; +} + + + +############### METHODE SEND DIRECT MSGS +sub sendDirectMsgs ($$$@) +{ + use attrs qw(locked); + my ($self, $to, $id, @msgs) = @_; + + if (defined ($self->[appliList]{$to})) { + my @fds = @{$self->[appliList]{$to}}; + # pour tous les messages + foreach my $msg (@msgs) { + foreach my $fd (@fds) { + Thread::Queue::enqueue ($self->[queueList]->{$fd}, + \sprintf (MSG_FMT, DIRECT_MSG, $id, "$msg")); + } + } + return 1; + } else { + warn "Attention : Ivy::sendDirectMsgs appli $to inconnue\n" if $^W; + return 0; + } +} + + +############### METHODE SEND DIE TO +sub sendDieTo ($$) +{ + use attrs qw(locked); + my ($self, $to) = @_; + + if (defined ($self->[appliList]{$to})) { + my @fds = @{$self->[appliList]{$to}}; + warn "Attention : Ivy::sendDieTo gros BUG \@fds est vide \n" + if (scalar (@fds) == 0); + + # pour tous les messages + foreach my $fd (@fds) { + _sendDieTo ($self, $fd); + } + return 1; + } else { + warn "Attention : Ivy::sendDieTo appli $to inconnue\n" if $^W; + return 0; + } +} + + +############### METHODE PING +sub ping ($$$) +{ + use attrs qw(locked); + my ($self, $to, $timeout) = @_; + + if (defined ($self->[appliList]{$to})) { + my @fds = @{$self->[appliList]{$to}}; + # pour tous les messages + foreach my $fd (@fds) { + Thread::Queue::enqueue ($self->[queueList]->{$fd}, + \sprintf (MSG_FMT, PING, 0, " ")); + } + } +} + + +############### METHODE IVY DESTROY +sub DESTROY ($) +{ + my $self = shift; + return unless exists $allBuses{$self}; + + # print ("DBG DESTROY appele sur l'objet $self\n"); + + # pour toutes les connections + foreach my $fd (values %{$self->[sockList]}) { + next unless exists ($self->[queueList]->{$fd}); + + Thread::Queue::enqueue ($self->[queueList]->{$fd}, + \sprintf (MSG_FMT, BYE, 0, "")); + + # on attend un peu avant de fermer la thread, que + # le bye ait le temps d'etre envoye si le + # thread n'est pas bloque par le send. + select (undef, undef, undef, 0.1); + + # on desactive le thread de reception + # et on attend qu'il sorte + $self->_closeSenderThread ($fd); + } + + # on clos la socket de signalisation (UDP) +# print "DBG> fermeture de supSock\n"; + $self->[supSock]->close() if $self->[supSock]; + delete $allBuses{$self}; + + # on clos la socket de connection +# print "DBG> fermeture de connSock\n"; + $self->[connSock]->close() if $self->[connSock]; + undef (@$self); +} + + +############### METHODE MAINLOOP +sub mainLoop () +{ + die "Erreur Ivy->mainLoop, Ivy doit etre initialise en mode". + " loopMode local\n" unless defined $localLoopSel; + + my ($fd, @ready, @allDesc); + + while (defined $localLoopSel) { + @ready = IO::Select::can_read ($localLoopSel, $selectTimout) ; + _scanAfter () ; + + foreach $fd (@ready) { + if (ref $localBindByHandle{$fd} eq 'CODE') { + &{$localBindByHandle{$fd}} ; + } else { + my ($cb, @arg) = @{$localBindByHandle{$fd}} ; + &$cb (@arg) + } + } + } +} + + +############### METHODE AFTER +sub after ($$;$) +{ + # test du premier argument au cas ou la fonction soit + # appelee de maniere objet : premier argument = class ou une instance + # de classe + shift if ((ref ($_[0]) eq 'Ivy') || ($_[0] eq 'Ivy')) ; + + my ($timeAfter, $cbListRef) = @_; + $timeAfter /= 1000; + $selectTimout = $timeAfter if $timeAfter < $selectTimout; + + $afterList{++$afterId} = [AFTER, $timeAfter, + timeofday()+$timeAfter, $cbListRef]; + + return ($afterId); +} + +############### METHODE REPEAT +sub repeat ($$;$) +{ + # test du premier argument au cas ou la fonction soit + # appelee de maniere objet : premier argument = class ou une instance + # de classe + shift if ((ref ($_[0]) eq 'Ivy') || ($_[0] eq 'Ivy')) ; + + # on passe le temps en secondes pour le select + my ($timeAfter, $cbListRef) = @_; + $timeAfter /= 1000; + $selectTimout = $timeAfter if $timeAfter < $selectTimout; + + $afterList{++$afterId}= [REPEAT, $timeAfter, timeofday()+$timeAfter, + $cbListRef]; + return ($afterId); +} + +############### METHODE AFTER CANCEL +sub afterCancel ($;$) +{ + # test du premier argument au cas ou la fonction soit + # appelee de maniere objet : premier argument = class ou une instance + # de classe + shift if ((ref ($_[0]) eq 'Ivy') || ($_[0] eq 'Ivy')) ; + + my $id = shift; + + if (defined ($id) && defined $afterList{$id}) { + if ($afterList{$id}->[1] <= $selectTimout) { + delete $afterList{$id} ; + # le timout de l'after/repeat etait le plus petit des timout + # on cherche donc le plus petit parmi ceux qui restent; + $selectTimout = MAX_TIMOUT; + foreach my $af (values %afterList) { + $selectTimout = $af->[1] if $af->[1] < $selectTimout ; + } + } else { + delete $afterList{$id} ; + } + } +} + + +############### METHODE FILE EVENT +sub fileEvent ($$;$) +{ + # test du premier argument au cas ou la fonction soit + # appelee de maniere objet : premier argument = class ou une instance + # de classe + shift if ((ref ($_[0]) eq 'Ivy') || ($_[0] eq 'Ivy')) ; + + my ($fd, $cb) = @_; + + + unless (defined $localLoopSel) { + die ("Erreur Ivy::fileEvent : Ivy::fileEvent n'est utilisable qu'en ". + "mode mainLoop LOCALE\n"); + } + + if ($cb) { + # adding the handler + $localBindByHandle{$fd} = $cb; + $localLoopSel->add ($fd); + } else { + # deleting the handler + delete $localBindByHandle{$fd}; + # print ("DBG: Ivy::fileEvent : removing fd from the select\n"); + $localLoopSel->remove ($fd); + } +} + + +############################################################################# +#### METHODES PRIVEE ##### +############################################################################# + + +############### METHODE GET BONJOUR +sub _getBonjour ($) +{ + my $self = shift; + + my $bonjourMsg = ''; + + # l'hote distant + my $inetAddr = $self->[supSock]->recv ($bonjourMsg, 1024, 0); + unless (length $inetAddr) { + warn "Attention : Ivy::_getBonjour recv error, bonjour non traite\n"; + return; + } + my $addr = (unpack_sockaddr_in ($inetAddr))[1]; + + my $peerName = gethostbyaddr ($addr, AF_INET); + + # on force $peerPort a etre vu comme une valeur numerique + my ($version, $peerPort) = $bonjourMsg =~ /^(\d+)\s+(\d+)/; + + unless (defined ($version) && defined ($peerPort)) { + warn "Attention : Ivy::_getBonjour format du message bonjour incorrect\n". + "message = $bonjourMsg\n" ; + return; + } + if ($version != VERSION) { + warn "Attention : Ivy::_getBonjour VERSION: demande de connexion de ". + "$peerName\n version courrante : " . VERSION . ", recue : $version\n" ; + return; + } + + # on verifie qu'on ne se repond pas et qu'on ne + # se reconnecte pas a un process deja connecte + if (exists ($self->[cnnxion]->{"$addr:$peerPort"})) { + #print "DBG> : bonjour de $peerName:$peerPort : DEJA CONNECTE\n" ; + return ; + } else { + #print "DBG> : reception de $peerName : bonjour $peerPort\n" ; + } + + + # on verifie que l'adresse fasse partie de l'ensemble de reseau + # definis par ivybus + my $addrInIvyBus = 0; + my @ivyBusAddrList = map ( (unpack_sockaddr_in ($_))[1], + @{$self->[broadcastBuses]}); + # Bon dans cette version on reponds aux bonjour emis par + # la machine locale, on ne peut donc pas avoir + # une appli qui ne causerait qu'a des machines sur une + # autre reseau, si ca embete qqun, qu'il me le dise + push (@ivyBusAddrList, pack ("CCCC", 127,255,255,255)); + push (@ivyBusAddrList, (gethostbyname (hostname()))[4]); + foreach my $ivyBusAddr (@ivyBusAddrList) { + $addrInIvyBus = 1 unless (grep ($_ != 0, unpack ("CCCC", + ($addr & $ivyBusAddr) ^ $addr))); + } + + if ($addrInIvyBus == 0) { + warn "bonjour de $peerName ignore, ne fait pas partie des ivyBus\n" if $^W; + return; + } + + + # ouverture du canal de communication + my $appSock = IO::Socket::INET->new (PeerAddr => $peerName, + PeerPort => $peerPort, + Proto => 'tcp'); + + if ($appSock) { + # on cree la queue et le thread qui vont interfacer les envois + # vers cette appli + $self->[queueList]->{$appSock} = Thread::Queue->new(); + $self->[threadList]->{$appSock} = + Thread->new (\&_senderThread, $self, $self->[queueList]->{$appSock}, + $appSock); + # print "DBG> new thread ${$self->[threadList]->{$appSock}}\n"; + # on cree une entree pour $appSock dans la liste des regexp + $self->[cnnxion]->{"$addr:$peerPort"} = 1; + $self->[sendRegList]{$appSock} = []; + $self->[buffByConn]{$appSock} = ''; + $self->[sockList]->{$appSock} = $appSock; + &$fileEventFunc ($appSock, [\&_getMessages, $self, $appSock]) ; + + # on balance les regexps qui nous interessent a l'appli distante + _sendWantedRegexp ($self, $appSock); + } else { + warn "Attention Ivy::_getBonjour impossible de se connecter au serveur " . + "$peerName:$peerPort\n" ; + } +} + + +############### METHODE GET CONNECTIONS +sub _getConnections ($) +{ + my $self = shift; + + my $appSock = $self->[connSock]->accept(); + + unless (defined $appSock) { + warn "Attention Ivy::_getConnections, \$appSock not defined\n"; + return; + } else { + printf "accepting connection from %s:%d\n", + (gethostbyaddr ($appSock->peeraddr(),AF_INET))[0], + $appSock->peerport() if $^W; + } + + # callback pour traiter la reception des messages + # on cree la queue et le thread qui vont interfacer les envois + # vers cette appli + $self->[queueList]->{$appSock} = Thread::Queue->new(); + $self->[threadList]->{$appSock} = + Thread->new (\&_senderThread, $self, $self->[queueList]->{$appSock}, + $appSock); + # print "DBG> new thread ${$self->[threadList]->{$appSock}}\n"; + &$fileEventFunc ($appSock, [\&_getMessages, $self, $appSock]) ; + + # on cree une entree pour $appSock dans la liste des regexp + $self->[sendRegList]{$appSock} = []; + $self->[buffByConn]{$appSock} = ''; + $self->[sockList]->{$appSock} = $appSock; + # on balance les regexps qui nous interessent a l'appli distante + _sendWantedRegexp ($self, $appSock); +} + + + +############### METHODE GET MESSAGES +sub _getMessages ($$) +{ + my ($self, $appSock) = @_; + my $mess; + my $buffer = ''; + my ($addr, $peerPort, $senderName); + + # on recupere le message + recv ($appSock, $buffer, 65536, 0) ; + unless (length $buffer) { + # message null : broken pipe, ça s'est deconnecte a l'autre bout + # on vire ce fd de la boucle d'evenements ainsi que + # le thread associe. + # print ("DBG : _getMessages, recv err, calling removeFileDesc.\n"); + # Bon la il faudra un jour clarifier ce bordel, lister toutes + # les facons dont un couple d'applis connectee peuevent sortir et + # eviter les dead lock qui doivent subsister. + if (defined ($localLoopSel)) { +# _removeFileDescriptor ($self, $appSock); + _closeSenderThread ($self, $appSock); + } else { + _closeSenderThread ($self, $appSock); + } + return; + } + + if (length ($self->[buffByConn]{$appSock})) { + $buffer = $self->[buffByConn]{$appSock} . $buffer ; + $self->[buffByConn]{$appSock} = ''; + } + my @messages = split ('\n', $buffer) ; + $self->[buffByConn]{$appSock} = pop (@messages) unless + ($buffer =~ /\n$/) ; + +# if (defined $appSock->peername) { + $addr = $appSock->peeraddr(); + $peerPort = $appSock->peerport() ; + $senderName = $self->[cnnxion]->{"$addr:$peerPort"} ; + $senderName = "NONAME" unless $senderName; + + + foreach my $mess (@messages) { +# print "DBG>mess from $senderName *$mess*\n"; + + # on recupere les 3 champs : le type, le numero de regexp, les valeurs + my ($type, $id, $valeurs) = $mess =~ /^(\d+) + \s+ + (\d+) + \002 + (.*)/x ; + + # si ca a chie on rale + (warn "Attention Ivy::_getMessages message mal formatte : $mess\n" + and return) unless defined $type ; + + # sinon on fait en fonction du type de message + if ($type == MSG) { # M S G + # on recupere le couple call back, regexp correspondant + # a l'identifiant et on appelle la fonction avec les parametres + # traites par la regexp + if (my @cb = @{$self->[recCbList][$id]->[1]}) { + my $cb = shift @cb; + # on split sur ETX + &$cb ($senderName, @cb, split ("\003", $valeurs)) ; + } else { + #_sendErrorTo ($appSock, "REEGXP ID $id inconnue"); + warn ("Attention Ivy::_getMessages reception d'un message " . + "MSG : id $id inconnu de $senderName :\n«$mess»"); + } + } elsif ($type == BYE) { + #print "reception d'un bye\n"; + $self->_closeSenderThread ($appSock); + } elsif ($type == REGEXP) { # R E G E X P + # on ajoute une fonction traitant la regexp et envoyant le + # message sur le bon fd dans la liste des fonctions de filtrage + # ca permet de compiler les regexp avec 'once' donc une + # fois pour toute, et ainsi optimiser la vitesse de + # filtrage des messages a envoyer + next if _toBePruned ($self, $senderName, $valeurs); + unless (defined $self->[sendRegList]{$appSock}->[$id]) { + # si l'id de regexp n'etait pas utilisee c'est tout bon + # on affecte la nouvelle regexp a un id + $self->[sendRegList]{$appSock}->[$id] = eval <<'_EOL_'; + sub { + use strict; + if (my @args = ${$_[0]} =~ /($valeurs)/o) { + shift @args; + $args[$#args] .= "\003" if @args; + my $queue = $self->[queueList]->{$appSock}; + return 0 unless defined $queue; + $queue->enqueue (\sprintf (MSG_FMT, MSG, $id, + join ("\003",@args))); + # print join (' ', "DBG > J'envoie MSG", $id, @args, "\n"); + return 1; + } + }; +_EOL_ + } else { + # l'id de la regexp etait deja utilise, + # et n'a pas ete libere par un message delregexp, + # on renvoie donc un message d'erreur + _sendErrorTo ($self, $appSock, "ID $id deja utilisee"); + } + } elsif ($type == ERROR) { # E R R O R + warn ("Attention Ivy::_getMessages ERREUR recue de ". + "$senderName : «$valeurs»\n"); + } elsif ($type == DELREGEXP) { # D E L R E G E X P + # on vire la regexp des regexps vefifier + $self->[sendRegList]{$appSock}->[$id] = undef ; + } elsif ($type == ENDREGEXP) { # E N D R E G E X P + # on envoie le message ready uniquement a celui qui nous + # a envoye le message endregexp + _sendMsgTo ($self, $appSock, $self->[messWhenReady]); + # on passe de l'etat Connecte a l'etat Ready + $self->[cnnxion]->{"$addr:$peerPort"} =~ s/^\004//; + $senderName = $self->[cnnxion]->{"$addr:$peerPort"}; + unless (exists $self->[appliList]{$senderName}) { + $self->[appliList]->{$senderName} = [$appSock]; + } else { + push @{$self->[appliList]->{$senderName}}, $appSock; + } + _scanConnStatus ($self); + } elsif ($type == APP_NAME) { + # etat Connecte + if (($self->[appName] eq $valeurs) && $^W) { + warn "\033[1mATTENTION : Ivy::_getMessages une instance de ". + "$self->[appName] existe deja\033[m\n" ; + } + $senderName = $valeurs; + $self->[cnnxion]->{"$addr:$peerPort"} = "\004$valeurs"; + } elsif ($type == DIRECT_MSG) { + if (defined $self->[directCbList][$id]) { + my @cb = @{$self->[directCbList][$id]}; + my $cb = shift @cb; + &$cb (@cb, $valeurs); + } else { + _sendErrorTo ($self, $appSock, "DIRECT ID $id inconnue"); + warn "Attention Ivy::_getMessages reception d'un message " . + "DIRECT d'id $id inconnue de $senderName :\n«$mess»"; + } + } elsif ($type == DIE) { + # il faut quitter + # on commence par appeler la callback de fin + my @cb = @{$onDieFunc}; + my $cb = shift @cb; + &$cb (@cb); + # on avertit les autres qu'on se barre + my $adr = _inetAdrByName ($self, $senderName) ; + warn "Attention Ivy::_getMessages reception d'un ordre " . + "de suicide de $senderName ($adr)... exiting\n" if $^W; + # adios + Ivy::exit (); + } elsif ($type == PING) { + # si on recois un ping, on envoie un pong + _pong ($self, $appSock); + } elsif ($type == PONG) { + return PONG; + } else { + _sendErrorTo ($self, $appSock, "TYPE DE MESS $type inconnu"); + warn ("Attention Ivy::_getMessages reception d'un message " . + "de type $type inconnu de $senderName :\n«$mess»"); + } + } +return 0; +} + +############### METHODE SEND WANTED REGEXP +sub _sendWantedRegexp ($$) +{ + my ($self, $appSock) = @_; + + # on envoie le message "Nom appli" + Thread::Queue::enqueue ($self->[queueList]->{$appSock}, + \sprintf (MSG_FMT, APP_NAME, 0, $self->[appName])); + + # on envoie les regexps + for (my $id = 0; $id <= $#{$self->[recCbList]}; $id++) { + next unless defined $self->[recCbList]->[$id]->[1]->[0]; + + Thread::Queue::enqueue ($self->[queueList]->{$appSock}, + \sprintf (MSG_FMT, REGEXP, $id, $self->[recCbList][$id]->[0])); + # print sprintf ("DBG > %s %d %s\n", + # 'REGEXP', $id, $self->[recCbList][$id]->[0]); + } + # on envoie le message de fin d'envoi de regexps + Thread::Queue::enqueue ($self->[queueList]->{$appSock}, + \sprintf (MSG_FMT, ENDREGEXP, 0, "")); +} + +############### METHODE SEND LAST REGEXP TO ALLREADY CONNECTED +sub _sendLastRegexpToAllreadyConnected ($$) +{ + my ($self, $id) = @_; + foreach my $fd (values %{$self->[sockList]}) { + Thread::Queue::enqueue ($self->[queueList]->{$fd}, + \sprintf (MSG_FMT, REGEXP, $id, $self->[recCbList][$id][0])); + } +} + +############### METHODE INET ADR BY NAME +sub _inetAdrByName ($$) { + my ($self, $appName) = @_; + + my $addrInet = (grep ($self->[cnnxion]->{$_} eq $appName, + keys %{$self->[cnnxion]}))[0]; + + return ("unknow") unless defined $addrInet; + my ($addr,$port) = $addrInet =~ /(.{4}):(.*)/; + + my $host = (gethostbyaddr ($addr, AF_INET))[0] ; + return "$host:$port"; +} + + +############### METHODE REMOVE FILE DESCRIPTOR +sub _removeFileDescriptor ($$) +{ + my ($self, $fd) = @_; + my $diedAppName; + + + + # on s'est deja occupe de lui + return unless exists $self->[sockList]->{$fd}; + # printf ("DBG> _removeFileDescriptor IN thread %s\n", ${Thread->self}); + + # on efface les structures de donnees associees au fd + # on vire ce fd des fd a scruter dans la bcle d'evenements + # uniquement si on est dans le thread principal + # sinon le select merde salement sur ce coup + &$fileEventFunc ($fd, ''); + delete $self->[sendRegList]->{$fd}; + delete $self->[sockList]->{$fd}; + delete $self->[buffByConn]->{$fd}; + delete $self->[queueList]->{$fd}; + + # on clos la connection + $fd->close (); + +# EXT_LOOP: +# foreach my $name (keys %{$self->[appliList]}) { +# @{$self->[appliList]{$name}} = +# grep ($_ ne $fd, @{$self->[appliList]{$name}}); +# if (length (@{$self->[appliList]{$name}}) == 0) { +# delete $self->[appliList]->{$name}} ; +# } +# } + + EXT_LOOP: + foreach my $name (keys %{$self->[appliList]}) { + foreach my $fdp (@{$self->[appliList]{$name}}) { + if ($fd eq $fdp) { + $diedAppName = $name; + @{$self->[appliList]{$name}} = + grep ($_ ne $fdp, @{$self->[appliList]{$name}}); + if (scalar (@{$self->[appliList]{$name}}) == 0) { + delete $self->[appliList]->{$name}} ; + last EXT_LOOP; + } + } + } + + + unless (defined $diedAppName) { + warn "Ivy::_removeFileDescriptor : deconnection de NONAME\n" if $^W; + return; + } + + my $addrInet = (grep ($self->[cnnxion]->{$_} eq $diedAppName, + keys %{$self->[cnnxion]}))[0]; + unless (defined $addrInet) { + die "ERREUR _removeFileDescriptor deconnection de $diedAppName ". + "addrInet not defined\n"; + return; + } + # printf "DBG> _removeFileDescriptor : deconnection de %s ($diedAppName)\n", _inetAdrByName ($self, $diedAppName); + + delete $self->[cnnxion]->{$addrInet}; + + # on vire l'entree correspondant a ce canal dans la liste des + # regexps par canal + _scanConnStatus ($self) ; +} + + +############### METHODE SEND ERROR TO +sub _sendErrorTo ($$$) +{ + my ($self, $fd, $error) = @_; + + Thread::Queue::enqueue ($self->[queueList]->{$fd}, + \join (' ', ERROR, "0\002$error\n")); +} + + +############### METHODE PONG +sub _pong ($$) +{ + my ($self, $fd) = @_; + + Thread::Queue::enqueue ($self->[queueList]->{$fd}, + \join (' ', PONG, "0\002\n")); +} + + +############### METHODE SEND ERROR TO +sub _sendDieTo ($$) +{ + my ($self, $fd) = @_; + + Thread::Queue::enqueue ($self->[queueList]->{$fd}, + \join (' ', DIE, "0\002\n")); +} + + +############### METHODE SEND MSG TO +sub _sendMsgTo ($$$) +{ + my ($self, $fd, $msg) = @_; + + # pour toutes les fonctions de filtrage de regexp + foreach my $regexpFunc (@{$self->[sendRegList]{$fd}}) { + &{$regexpFunc} (\$msg) if defined $regexpFunc; + } +} + + +############### METHODE TK FILE EVENT +sub _tkFileEvent ($$) +{ + my ($fd, $cb) = @_; + + Tk::fileevent ('', $fd, 'readable', $cb) ; +} + + + + +############### METHODE SCAN AFTER +sub _scanAfter () +{ + my $stamp = timeofday (); + $selectTimout = MAX_TIMOUT; + foreach my $afk (keys %afterList) { + my $af = $afterList{$afk}; + # si ce timer est a declencher + if ($af->[2] <= $stamp) { + # on traite : le temps de declencher le cb est arrive + if (ref $af->[3] eq 'CODE') { + &{$af->[3]}; + } else { + my ($cb, @args) = @{$af->[3]}; + &$cb (@args); + } + # si c'est un repeat on le reconduit + if ($af->[0]) { + $af->[2] = $stamp + $af->[1] ; + $selectTimout = $af->[1] if $af->[1] < $selectTimout; + } else { + # si c'est un after on le vire + afterCancel ($afk); + } + } else { + my $timeTotrigg = $af->[2] - $stamp; + $selectTimout = $timeTotrigg if $timeTotrigg < $selectTimout; + } + } +} + + +############### METHODE SCAN CONN STATUS +sub _scanConnStatus ($) +{ + my $self = shift; + + my (%readyApp, @nonReadyApp); + + foreach (values %{$self->[cnnxion]}) { + next if $_ eq "1"; + $readyApp{$_}++ unless /^\004/; # connecte mais pas ready + } + + foreach (@{$self->[neededApp]}) { + push (@nonReadyApp, $_) unless exists $readyApp{$_}; + } + + # par compatibilite avec l'ancienne version, on envoie comme + # deux premiers arguments une ref sur la liste des applis presentes, + # la liste des applis absentes, mais on rajoute comme troisieme + # argument une ref sur une table de hash : comme clef les + # applis presentes, comme valeur le nombre d'applis ayant ce nom, + # de facon a detecter plus facilement quand il y a trop d'applis + # de meme nom sur le meme bus. + &{$self->[statusFunc]} ([keys %readyApp], \@nonReadyApp, \%readyApp); +} + + +############### METHODE TO BE PRUNED +sub _toBePruned ($$$) +{ + my ($self, $from, $regexp) = @_; + + + # si il n'y a pas de liste de sujets, on ne + # filtre pas + return 0 unless @{$self->[topicRegexps]}; + + unless ($regexp =~ /^\^/) { + #print "DBG> regexp non ANCREE de $from : $regexp\n"; + return (0); + } + + if ($regexp =~ /^\^(\w+)/) { + my $topic = $1; + if (grep (/$topic/, @{$self->[topicRegexps]})) { + # on a trouve ce topic : on ne filtre pas la regexp + #print "DBG> on garde de $from : $regexp\n"; + return (0); + } + #print "DBG> on ELIMINE de $from : $regexp\n"; + return (1); + } else { + #print "DBG> on garde de $from : $regexp\n"; + return (0); + } +} + + +############### METHODE PARSE IVY BUS PARAM +sub _parseIvyBusParam ($) +{ + my $ivyBus = shift; + my ($ivyNetworks, $ivyPort) = $ivyBus =~ /^(.*):(.*)/; + die ("Erreur Ivy::_parseIvyBusParam format de l'adresse ou ". + "no de port incorrect : $ivyBus\n") + unless $ivyPort =~ /^\d+$/; + + my @ivyAddrInet = (); + + $ivyNetworks =~ s/ //g; + my @broadcastAddrs = split (',', $ivyNetworks); + + foreach my $netAddr (@broadcastAddrs) { + $netAddr = BROADCAST_ADDRS if + (($netAddr eq '') || ($netAddr =~ /^127/) || + ($netAddr =~ /^loopback/)); + + # deux cas de figure : on a un nom de sous reseau, ou + # une adresse ip de la forme \d+.\d+.... + my $netAddrInet; + + if ($netAddr !~ /^(\d+\.)+\d+/) { + # on a un nom de reseau, il faut trouver son adresse ip + # on contourne un bug : Si les adresses sont incompletes + # dans la map network : 143.196.53 au lieu de 143.196.53.255 + # getbyname renvoie une adresse de type 0.143.196.53. + # on doit donc faire un decalage des octets vers la gauche, + # chaque 0 qui sort a gauche est remplace par un 255 a droite. + my $networkAddr = getnetbyname ($netAddr); + unless (defined $networkAddr) { + warn ("Ivy::_parseIvyBusParam reseau inconnu : $netAddr\n"); + next; + } + + my @dummyNetAddr = unpack ("CCCC", pack ('N', $networkAddr)); + while (!$dummyNetAddr[0]) { + # tant que le premier octet est 0, on decale vers la gauche et + # ont fait rentrer un 255 sur la droite + shift @dummyNetAddr; + push (@dummyNetAddr, 255); + } + $netAddrInet = pack ("CCCC", @dummyNetAddr); + } else { + # on a deja une adresse ip, on rajoute les .255 + # a la fin s'ils ont etes omis. + ($netAddr .= ".255.255.255") =~ s/^((\d+\.){3}\d+).*/$1/; + $netAddrInet = inet_aton ($netAddr); + } + + push (@ivyAddrInet, pack_sockaddr_in ($ivyPort, $netAddrInet)); + } + return ($ivyPort, \@ivyAddrInet); +} + + +############# Methode _ SENDER THREAD +sub _senderThread ($$$) { + my ($self, $queue, $fd) = @_; + my $data; + # condition d'arret du thread : + # pour l'arreter on enqueue undef. + while ($data = $queue->dequeue()) { + # si le send echoue, on vire le fd et on sort du thread. +# last unless syswrite ($fd, $$data, length ($$data)); + last unless send ($fd, $$data, 0); + } +} + + +############# Methode _ SENDER THREAD_NON_BLOQUANT + +# tentative pour pouvoir arreter facilement un thread +# bloque sur un send. Mais c'est pas satisfaisant +# car quand le fd est de nouveau accessible en ecriture, +# les donnees accumulees dans l'accumulateur ne sont +# envoyees que lorsque l'on ecrit a nouveau dans +# la queue de communication du thread +# Je laisse ce code pour info. +# sub _senderThread_NON_BLOQUANT ($$$) { +# my ($self, $queue, $fd) = @_; +# my $data; +# my @buff = (); + +# # on ne bloque plus sur ce fd +# fcntl ($fd, F_SETFL, O_NDELAY) ; + +# EXT_LOOP: +# for (;;) { +# # condition d'arret du thread : +# # pour l'arreter on enqueue undef. +# last unless defined ($data = Thread::Queue::dequeue ($queue)); + +# # on colle les donnees dans un accumulateur +# push (@buff, $data); + +# # tant qu'on peut ecrire sur le fd, on le fait +# while ($data = shift @buff) { +# unless (syswrite ($fd, $$data, length ($$data))) { +# # le write a plante : +# if ($! == Errno::EWOULDBLOCK) { +# # si c'est parce que le fd est temporairement innacessible +# # on remets le message dans l'accumulateur et on se remets +# # en attente sur la queue de communication +# unshift (@buff, $data); +# next EXT_LOOP; +# } else { +# # sinon c'est que le fd a ete ferme, dans ce cas le thread sort +# last EXT_LOOP; +# } +# } +# } + +# } +# print ("DBG> sortie de thread\n"); +# } + + + + + +############# Methode _CLOSE SENDER THREAD +sub _closeSenderThread ($$) +{ + my ($self, $fd) = @_; + my $queue = $self->[queueList]->{$fd}; + + # on vide la queue. Oui je sais :-) + # printf ("DBG> on vide la queue dans le thread %s\n", ${Thread->self()}); + if (defined $queue) { + while ($queue->dequeue_nb ()) {}; + $queue->enqueue (undef); + _removeFileDescriptor ($self, $fd); + } + + # printf ("DBG> demande de join du thread ${$self->[threadList]->{$fd}}\n"); + $self->[threadList]->{$fd}->join() if defined $self->[threadList]->{$fd};; + # printf ("DBG> done\n"); +} + + + +############# Procedure _SUBSTITUTE ESCAPED CHAR +sub _substituteEscapedChar ($$) +{ + my ($scope, $reg) = @_; + + my %escapeRegexp = REG_PERLISSISME; + # Si on fait la substitution dans une classe de caractere + # on elimine les crochets. + grep ($escapeRegexp{$_} =~ s/[\[\]]//g, keys %escapeRegexp) + if ($scope eq 'inside') ; + + $reg =~ s/\\([wWsSdDne])/$escapeRegexp{$1}/ge; + return $reg; +} + + +1; -- cgit v1.1