# # Ivy, Perl interface # # Copyright 1997-1999 # Centre d'Etudes de la Navigation Aerienne # # Version 2.0 : api OO, envoi des messages par l'intermediaire # de thread pour ne jamais bloquer sur un envois. # # TODO : # # # - touver un mecanisme permettant de sortir meme quand # un thread d'emission est bloque sur un send. # # # BUGS: # -Si une appli abonnee bloque, son thread d'emission # reste bloque sur un send, le process ne sortira pas # sur un , mais uniquement sur un kill -TERM pid, # voire sous irix sur un kill -KILL pid # # -Sous irix, si on cree dymaniquement des bus et qu'om les # tue a rythme eleve, au bout d'un moment perl plante. # # pour avoir une version debarassee des commentaires et des blancs # pour voir la longueur reelle du code : # perl -ne 'chomp; next if /^\s*$/ || /^\s*#/; s/#.*//; print "$_\n";' Ivy.pm # # pour commenter tous les print de debug : # perl -i.bak -ne 's/(printf?\s*\(?\"DBG)/ \# $1/ unless /^\s*\#/; print $_;' Ivy.pm package Ivy ; use Sys::Hostname; use IO::Socket; use strict ; use Time::Gettimeofday ; use Thread; use Thread::Queue; use Thread::Signal; use Fcntl; use Errno; use vars qw($VERSION); $VERSION = '4.1'; ############################################################################# #### PROTOTYPES ##### ############################################################################# sub init ($%); # methode de classe, permet de renseigner # tous les parametres globaux. Ces parametres # seront utilises par new si ils ne sont pas # donnes lors de l'appel de new. sub new ($%); # verifie la validite de tous les parametres, # cree et retourne un objet Ivy. Les parametres # appName, networks, messWhenReady, peuvent # etre donnes, meme si ils ont deja ete # donnes dans init, dans ce cas, ce sont ceux # de new qui prevalent sub start ($); # debut de l'integration au bus : # - cree la socket d'application, recupere le no # de port # - cree la socket supervision # - envoie le "no de port" # - bind le file descriptor de la socket de # supervision a la fonction getBonjour pour # traiter les bonjours # - bind le fd de connection sur la fonction # getConnections # pour etablir les connections "application" sub DESTROY ($); # - envoie un BYE et clot les connections sub bindRegexp ($$$) ; # permet d'associer une regexp avec un callBack # ou d'annuler une precedente association sub bindDirect ($$$); # permet d'associer un identifiant de msg direct # avec une fonction de callBack, ou de l'annuler sub sendMsgs ($@) ; # envoie une liste de messages sub sendAppNameMsgs ($@) ; # envoie une liste de messages precedes # du nom de l'application sub sendDirectMsgs ($$$@); # envoie une liste de messages directs a une appli sub sendDieTo ($$); # envoie un <> a une appli sub ping ($$$); # teste qu'une appli soit encore vivante sub mainLoop (); # la mainloop locale (sans tk) sub stop (); # methode de classe : on delete les bus, mais # on reste dans la mainloop sub exit (); # methode de classe : on delete tous les # bus (donc on ferme proprement toutes les # connexions). # Si on est en mainloop locale on sort de la # mainloop, le code qui suit l'appel mainLoop # sera execute. # par contre si on est en mainloop Tk, # il faut en plus detruire la mainwindow # pour sortir de la mainloop; sub after ($$;$); # temps en millisecondes, callback sub repeat ($$;$); # temps en millisecondes, callback sub afterCancel ($;$); # l'id d'un cancel ou d'un repeat sub fileEvent ($$;$); # associe un fd a un callback pour la mainloop locale ################ PRIVEE #################################################### sub _getBonjour ($); # lit le (ou les) bonjour(s) sur le canal de supervision # et se connecte, verifie qu'il ne se reponds pas lui # meme, ni qu'il ne repond pas a un service deja connecte sub _getConnections ($); # est appele lors d'une demande de connection : # accepte la connection et mets a jour @sendRegList # rajoute le fd du canal aux fd a scruter dans la # boucle d'evenements sub _getMessages ($$); # est appele lorqu'un message arrive sub _sendWantedRegexp ($$); # envoie les regexp a l'appli distante sub _sendLastRegexpToAllreadyConnected ($$) ; # envoie la derniere regexp # pushee dans @recCbList # a toutes les applis deja # connectees sub _removeFileDescriptor ($$); # on vire un fd et les structures associees sub _sendErrorTo ($$$); #(fd, error) envoie un message d'erreur a un fd sub _sendDieTo ($$); #(fd) envoie un message de demande de suicide a un fd sub _sendMsgTo ($$$); # (fd, message) sub _pong ($$); # (fd) sub _tkFileEvent ($$); # associe un fd a un callback pour la mainloop tk sub _scanAfter () ; # parse si il faut appeler un callback associe a un after sub _myCanRead (); # interface au select sub _scanConnStatus ($); # verifie les connections effectuees et # appelle la fonction $statusFunc sub _inetAdrByName ($$); # transforme une adresse inet native en chaine # $host:$port sub _toBePruned ($$$); sub _parseIvyBusParam ($); # prends une adresse de bus de la forme # 143.196.53,DGAC-CENATLS:2010 et # renvoie une liste de deux elements : # un numero de port et une ref sur une # liste d'adresses addr_inet sub _senderThread ($$$); # procedure executee dans un thread, qui # envoie les donnees sur une socket a travers # une queue. De cette facon, si une appli # distante bloque, ca ne penalise pas # l'appli locale, en dehors du fait que la # queue va grossir jusqu'au not enough memory # au prix de la memoire c'est pas grave :-) # deux arguments : la queue et le fd. sub _closeSenderThread ($$); # procedure d'arret d'un thread : # on vide la queue d'emission, on # place une valeur de sortie dans la queue, # et on ferme le fd associe a la socket # d'emission. sub _substituteEscapedChar ($$); #permet de transormer une regexp etendue # 'perl' en regexp de base ############################################################################# #### CONSTANTES ##### ############################################################################# use constant VERSION => 3; use constant MSG_FMT => "%d %d\002%s\n"; # par defaut, on diffuse le bonjour en local use constant BROADCAST_ADDRS => "127.0.0.1" ; use constant BYE => 0; use constant REGEXP => 1; use constant MSG => 2; use constant ERROR => 3; use constant DELREGEXP => 4; use constant ENDREGEXP => 5; use constant APP_NAME => 6; use constant DIRECT_MSG => 7; use constant DIE => 8; use constant PING => 9; use constant PONG => 10; use constant AFTER => 0; use constant REPEAT => 1; use constant MAX_TIMOUT => 1000; # pour pouvoir employer les regexps perl. Attention lors de l'utilisation # ne pas mettre un \n dans une chaine entre "" car l'\n sera interprete. use constant REG_PERLISSISME => ('w' => '[a-zA-Z0-9_]', 'W' => '[^a-zA-Z0-9_]', 's' => '[ ]', 'S' => '[^ ]', 'd' => '[0-9]', 'D' => '[^0-9]', 'n' => '', # Il ne faut pas mettre d'\n : # c'est un delimiteur pour le bus 'e' => '[]') ; ############################################################################# #### VARIABLES de CLASSE ##### ############################################################################# # optimisation : si l'on connait les sujets des messages # qu'on envoie, cette variable contient une liste de # sujets qui doivent matcher les regexps d'abonnement # pour que celle ci se soient pas eliminees my @topicRegexps; # les adresses de reseau sur lesquelles ont broadcaste # suivies du No de port : # exemples : "143.196.1.255,143.196.2.255:2010" # "DGAC-CENATLS-PII:DGAC-CENATLS:2010" # ":2010" <= dans ce cas c'est la valeur # de reseau de broadcast par defaut qui est prise : 127.255.255.255 # c.a.d local a la machine my $ivyBus ; # le nom de l'appli pour le bus my $appName ; # message a envoyer a un canal lorsqu'on # a recu le message endregexp. my $messWhenReady ; # fonction de cb appelee lorsque l'appli a recu l'ordre # de quitter, on peut dans ce callback fermer # proprement les ressources avant de sortir. # ps : ne pas fasire d'exit dans le callback, # c'est le bus qui s'en charge my $onDieFunc; # permet de donner des valeurs successives aux constantes permettant # d'acceder aux differents champs de l'objet my $constantIndexer =0; # pointeur sur la fonction permettant d'associer # des callbacks a un file desc, (ainsi que de les enlever) my $fileEventFunc; # dans le cas ou l'on soit dans une mainLoop # locale, cette var pointe une un objet # de type IO::Select; my $localLoopSel; # table d'ass. handle -> callback my %localBindByHandle; # tableau d'ass [AFTER ou REPEAT, # timeTotal, deadLine, [callback, arg, arg, ...]] my %afterList=(); my $afterId = 0; # timeout le plus petit pour le select my $selectTimout = MAX_TIMOUT; # liste des bus actifs my %allBuses = (); ############################################################################# #### CLEFS DES VARIABLES D'INSTANCE ##### #### ##### #### l'objet Ivy sera 'blessed' sur une reference sur un array et non ##### #### sur une table de hash comme pratique courament de facon a ##### #### 1/ optimiser au niveau vitesse ##### #### 2/ avoir des clefs sous forme de symboles (use constant...) ##### #### et nom des clefs sous forme de chaines de caracteres ##### #### de facon a eviter des erreurs ##### #### ##### #### ##### ############################################################################# use constant servPort => $constantIndexer++; use constant neededApp => $constantIndexer++; use constant statusFunc => $constantIndexer++; use constant supSock => $constantIndexer++; use constant connSock => $constantIndexer++; use constant sockList => $constantIndexer++; use constant queueList => $constantIndexer++; use constant threadList => $constantIndexer++; use constant appliList => $constantIndexer++; use constant sendRegList => $constantIndexer++; use constant topicRegexps => $constantIndexer++; use constant recCbList => $constantIndexer++; use constant directCbList => $constantIndexer++; use constant cnnxion => $constantIndexer++; use constant buffByConn => $constantIndexer++; use constant broadcastPort => $constantIndexer++; use constant broadcastBuses => $constantIndexer++; use constant appName => $constantIndexer++; use constant messWhenReady => $constantIndexer++; ############################################################################# #### METHODES PUBLIQUES ##### ############################################################################# ############### METHODE DE CLASSE INIT sub init ($%) { my ($class, %options) = @_; # valeurs par defaut pour le parametre : variable d'environnement # ou valeur cablee, a defaut my $default_ivyBus = defined $ENV{"IVYBUS"} ? $ENV{"IVYBUS"} : ""; my %defaultOptions = ( #PARAMETRES OBLIGATOIRES -loopMode => undef, # TK ou LOCAL -appName => undef, # nom de l'appli # PARAMETRES FACULTATIFS (avec valeurs par defaut) # les adresses de reseau sur lesquelles ont broadcaste # suivies du No de port : # exemples : "143.196.1.255,143.196.2.255:2010" # "DGAC-CENATLS-PII:DGAC-CENATLS:2010" # ":2010" <= dans ce cas c'est la valeur # de reseau de broadcast par defaut qui est prise : # 127.255.255.255 c.a.d local a la machine -ivyBus => $default_ivyBus, -messWhenReady => "_APP NAME READY", # message de synchro a envoyer quand pret -onDieFunc => [sub {}], # fonction de cb appelee lorsque l'appli a recu l'ordre # de quitter, on peut dans ce callback fermer # proprement les ressources avant de sortir. # ps : ne pas fasire d'exit dans le callback, # c'est le bus qui s'en charge -pruneRegexp => [], # optimisation : si l'on connait les sujets des messages # qu'on envoie, on fournit la liste des sujets # et les regexps qui ne matchent pas # ces sujets sont eliminees. ) ; foreach my $opt (keys %defaultOptions) { # si un parametre n'a pas ete defini next if defined $options{$opt} ; # est-il facultatif if (defined $defaultOptions{$opt}) { $options{$opt} = $defaultOptions{$opt} ; } else { # parametre obligatoire die "ERREUR Ivy::init vous devez specifier ". "l'option $opt\n"; } } foreach my $opt (keys %options) { die "ERREUR Ivy::init option $opt inconnue\n" unless exists ($defaultOptions{$opt}); } my $loopMode = $options{-loopMode}; $ivyBus = $options{-ivyBus} ne "" ? $options{-ivyBus} : undef; $appName = $options{-appName} ; $messWhenReady = $options{-messWhenReady} eq "_APP NAME READY" ? "$appName READY" : $options{-messWhenReady}; $onDieFunc = $options{-onDieFunc} ; @topicRegexps = @{$options{-pruneRegexp}}; if ($loopMode =~ /local/i) { # mode boucle d'evenement locale use IO::Select; $fileEventFunc = \&fileEvent ; $localLoopSel = IO::Select->new (); } elsif ($loopMode =~ /tk/i) { # mode boucle d'evenement de TK $fileEventFunc = \&_tkFileEvent ; } else { die qq|l'argument "mainloop mode" doit etre TK ou LOCAL\n|; } $SIG{'PIPE'} = 'IGNORE' ; } ############# METHODE DE CLASSE NEW sub new ($%) { my ($class, %options) = @_; my $self = []; $#{$self} = $constantIndexer; # on predimensionne le tableau bless ($self, $class); # on verifie que la methode de classe init ait ete appelee unless ((defined $appName) && ($appName ne '')) { die "ERREUR Ivy::new vous devez initialiser le module via Ivy->init ()"; } # No de port tcp du serveur $self->[servPort] = ''; # liste des applis necessaires a l'appli locale $self->[neededApp] = []; # callback prenant en param 2 refs sur des listes : # [applis presentes, appli absentes] # cette fonction est appelee : # - tout les pollingTime tant que toutes les applis # ne sont pas presentes # - des que toutes les applis sont presentes # - lorsqu'une appli se deconnecte $self->[statusFunc] = ''; # callback prenant en param 1 refs sur une liste : # [ref sur fonction, parametres] # socket de supervision en lecture/ecriture $self->[supSock] = ''; # socket de connexion tcp $self->[connSock] = ''; # tab ass : nom du fd => fd $self->[sockList] = {}; # tab ass : nom du fd => queue de communication entre # l'appli et le thread qui fait les envois $self->[queueList] = {}; # tab ass : nom du fd => thread qui fait les envois $self->[threadList] = {}; # tab ass : nom de l'appli => fd $self->[appliList] = {}; # tableau ass de liste du type # sockId => [fonction, fonction, ...] # pour savoir quoi envoyer a qui # les fonctions anonymes sont compilees # dynamiquement a la reception des messages REGEXP # et filtrent les mess a envoyer et les envoient # au besoin $self->[sendRegList] = {}; # liste des topics qu'on envoie si on # filtre les regexps $self->[topicRegexps] = []; # liste de ref sur des couples # (regexp,callBack) les callbacks # sont appeles lors de # la reception de messages en fonction # du numero de regexp. $self->[recCbList] = []; # liste de callBack pour les messages directs $self->[directCbList] = []; # tableau ass : clef = nom:numero_de port # permet de verifier qu'on ne se connecte pas # sur nous meme et qu'on ne se reconnecte # pas sur un service en cas de bonjours repetes # valeur : nom de l'application $self->[cnnxion] = {}; # tableau associatif, clef => file desc, # valeur :buffer au cas ou la lacture ne se termine # pas par \n, de maniere a resegmenter les messages $self->[buffByConn] = {}; my %defaultOptions = ( -appName => $appName, # nom de l'appli # PARAMETRES FACULTATIFS (avec valeurs par defaut) -messWhenReady => $messWhenReady, # message de synchro a envoyer quand pret # PARAMETRES FACULTATIFS (avec valeurs par defaut) # les adresses de reseau sur lesquelles ont broadcaste # suivies du No de port : # exemples : "143.196.1.255,143.196.2.255:2010" # "DGAC-CENATLS-PII:DGAC-CENATLS:2010" # ":2010" <= dans ce cas c'est la valeur # de reseau de broadcast par defaut qui est prise : # 127.255.255.255 c.a.d local a la machine -ivyBus => $ivyBus, -neededApp => [], # liste des appplis necessaires -statusFunc => sub {}, # fonction de callBack qui sera appelee tant que # toutes les applis necessaires ne sont pas presentes, # et des que toutes les applis necessaires sont # presentes, et si une appli necessaire se deconnecte # les trois parametres passes sont : # [liste des applis presentes], # [liste des applis absentes], # [table de hash, clefs = applis presentes, # valeurs = nombre d'applis . # normalement ce nombre devrait etre 1, sinon # ca veut dire que plus d'une appli de meme nom # tourne sur le meme bus : danger !! -pruneRegexp => [@topicRegexps], # optimisation : si l'on connait les sujets des messages # qu'on envoie, on fournit la liste des sujets # et les regexps qui ne matchent pas # ces sujets sont eliminees. ) ; foreach my $opt (keys %defaultOptions) { # si un parametre n'a pas ete defini next if defined $options{$opt} ; # est-il facultatif if (defined $defaultOptions{$opt}) { $options{$opt} = $defaultOptions{$opt} ; } else { # parametre obligatoire die "ERREUR Ivy::start vous devez specifier ". "l'option $opt\n"; } } foreach my $opt (keys %options) { die "ERREUR Ivy::start option $opt inconnue\n" unless exists ($defaultOptions{$opt}); } $self->[appName] = $options{-appName} ; $self->[messWhenReady] = $options{-messWhenReady} ; @{$self->[neededApp]} = @{$options{-neededApp}} ; $self->[statusFunc] = $options{-statusFunc} ; $self->[topicRegexps] = $options{-pruneRegexp} ; $allBuses{$self} = $self; ($self->[broadcastPort], $self->[broadcastBuses]) = _parseIvyBusParam ($options{-ivyBus}); return ($self); } ############## METHODE DE CLASSE STOP sub stop () { foreach my $bus (values %allBuses) { $bus->DESTROY(); } } ############## METHODE DE CLASSE EXIT sub exit () { Ivy::stop (); if (defined $localLoopSel) { # boucle locale, on sait faire # printf ("DBG> undefining localLoopSel\n"); undef $localLoopSel; } else { # afficher les threads qui restent foreach my $t (Thread->list()) { next if (($t->tid == Thread->self->tid) || $t->tid == 0); printf ("DBG> Thread %d is active, flag = %d\n", $t->tid, $t->flags) if $^W; } Tk::exit (); } } ################ METHODE START sub start ($) { my $self = shift; # cree la socket de connexion, recupere le no de port my $connSock = $self->[connSock] = IO::Socket::INET->new(Listen => 128, Proto => 'tcp', Reuse => 1) ; # print ("DBG> opening TCP fd $connSock\n"); # on memorise tout ca, ce qui evitera par la suite de se # repondre a soi-meme. On le fait sous nos deux noms : # le nom de machine et 'localhost' my $hostAddr = (gethostbyname (hostname()))[4] ; my $localhostAddr = (gethostbyname ('localhost'))[4] ; $self->[cnnxion]->{"$hostAddr:". $connSock->sockport} = "\004"; $self->[cnnxion]->{"$localhostAddr:". $connSock->sockport} = "\004"; # cree la socket de broadcast $self->[supSock] = IO::Socket::INET->new ( LocalPort => $self->[broadcastPort], Proto => 'udp', Type => SOCK_DGRAM, Reuse => 1); $self->[supSock]->sockopt (SO_BROADCAST, 1); fcntl ( $self->[supSock], F_SETFL, O_NDELAY) ; # et on envoie envoie le bonjour : "no de version no de port" my $bonjourMsg = sprintf ("%d %d\n", VERSION, $connSock->sockport()); foreach my $netBroadcastAddr (@{$self->[broadcastBuses]}) { send ($self->[supSock], $bonjourMsg, 0, $netBroadcastAddr) or warn "Attention Ivy::start envoi du bonjour a echoue : $!\n"; } # callback pour traiter la reception des bonjours &$fileEventFunc ($self->[supSock], [\&_getBonjour, $self]) ; # callback pour traiter les demandes de cxion &$fileEventFunc ($connSock, [\&_getConnections, $self]) ; } ############### METHODE BIND REGEXP sub bindRegexp ($$$$) { my ($self, $regexp, $cb, $observer) = @_; # on substitue les meta caracteres des regexps perl : \d, \w, \s, \e # par les classes de caracteres corespondantes de maniere a ce # qu'une appli distante non perl comprenne ces regexp. $regexp =~ s| ( (?[recCbList]}+1); $id++) { last unless (defined $self->[recCbList][$id]) && @{$self->[recCbList][$id]->[1]}; } $self->[recCbList][$id] = [$regexp, $cb, $observer]; # on envoie les messages regexps aux processus deja connectes _sendLastRegexpToAllreadyConnected ($self, $id) ; } else { # on vire le callback, et on se desabonne de cette regexp for (my $id=0; $id <= $#{$self->[recCbList]}; $id++) { next unless (defined $self->[recCbList][$id]) && @{$self->[recCbList][$id]->[1]}; if ($self->[recCbList][$id]->[0] eq $regexp) { $self->[recCbList][$id]->[1] = []; # on envoie le mesage delregexp foreach my $fd (values %{$self->[sockList]}) { Thread::Queue::enqueue ($self->[queueList]->{$fd}, \sprintf (MSG_FMT, DELREGEXP, $id)); } } } } } ############### METHODE BIND DIRECT sub bindDirect ($$$) { my ($self, $id, $cb) = @_; if ($cb) { # on rajoute la $cb dans la liste des messages # qu'on prend $self->[directCbList][$id] = $cb; } else { # on vire le callback undef $self->[directCbList][$id]; } } ############### METHODE SEND MSGS sub sendMsgs ($@) { use attrs qw(locked); my ($self, @msgs) = @_; my $total = 0; # pour tous les messages foreach my $msg (@msgs) { study ($msg); # pour routes les connections foreach my $fd (keys %{$self->[sockList]}) { # pour toutes les fonctions de filtrage de regexp foreach my $regexpFunc (@{$self->[sendRegList]{$fd}}) { $total += &{$regexpFunc} (\$msg) if defined $regexpFunc; } } } # print "DBG> sended $total times\n"; return $total; } ############### METHODE SEND APP NAME MSGS sub sendAppNameMsgs ($@) { use attrs qw(locked); my ($self, @msgs) = @_; my $total = 0; # pour tous les messages foreach (@msgs) { my $msg = "$self->[appName] $_"; study ($msg); # pour routes les connections foreach my $fd (keys %{$self->[sockList]}) { # pour toutes les fonctions de filtrage de regexp foreach my $regexpFunc (@{$self->[sendRegList]{$fd}}) { $total += &{$regexpFunc} (\$msg) if defined $regexpFunc; } } } # print "DBG> sended $total times\n"; return $total; } ############### METHODE SEND DIRECT MSGS sub sendDirectMsgs ($$$@) { use attrs qw(locked); my ($self, $to, $id, @msgs) = @_; if (defined ($self->[appliList]{$to})) { my @fds = @{$self->[appliList]{$to}}; # pour tous les messages foreach my $msg (@msgs) { foreach my $fd (@fds) { Thread::Queue::enqueue ($self->[queueList]->{$fd}, \sprintf (MSG_FMT, DIRECT_MSG, $id, "$msg")); } } return 1; } else { warn "Attention : Ivy::sendDirectMsgs appli $to inconnue\n" if $^W; return 0; } } ############### METHODE SEND DIE TO sub sendDieTo ($$) { use attrs qw(locked); my ($self, $to) = @_; if (defined ($self->[appliList]{$to})) { my @fds = @{$self->[appliList]{$to}}; warn "Attention : Ivy::sendDieTo gros BUG \@fds est vide \n" if (scalar (@fds) == 0); # pour tous les messages foreach my $fd (@fds) { _sendDieTo ($self, $fd); } return 1; } else { warn "Attention : Ivy::sendDieTo appli $to inconnue\n" if $^W; return 0; } } ############### METHODE PING sub ping ($$$) { use attrs qw(locked); my ($self, $to, $timeout) = @_; if (defined ($self->[appliList]{$to})) { my @fds = @{$self->[appliList]{$to}}; # pour tous les messages foreach my $fd (@fds) { Thread::Queue::enqueue ($self->[queueList]->{$fd}, \sprintf (MSG_FMT, PING, 0, " ")); } } } ############### METHODE IVY DESTROY sub DESTROY ($) { my $self = shift; return unless exists $allBuses{$self}; # print ("DBG DESTROY appele sur l'objet $self\n"); # pour toutes les connections foreach my $fd (values %{$self->[sockList]}) { next unless exists ($self->[queueList]->{$fd}); Thread::Queue::enqueue ($self->[queueList]->{$fd}, \sprintf (MSG_FMT, BYE, 0, "")); # on attend un peu avant de fermer la thread, que # le bye ait le temps d'etre envoye si le # thread n'est pas bloque par le send. select (undef, undef, undef, 0.1); # on desactive le thread de reception # et on attend qu'il sorte $self->_closeSenderThread ($fd); } # on clos la socket de signalisation (UDP) # print "DBG> fermeture de supSock\n"; $self->[supSock]->close() if $self->[supSock]; delete $allBuses{$self}; # on clos la socket de connection # print "DBG> fermeture de connSock\n"; $self->[connSock]->close() if $self->[connSock]; undef (@$self); } ############### METHODE MAINLOOP sub mainLoop () { die "Erreur Ivy->mainLoop, Ivy doit etre initialise en mode". " loopMode local\n" unless defined $localLoopSel; my ($fd, @ready, @allDesc); while (defined $localLoopSel) { @ready = IO::Select::can_read ($localLoopSel, $selectTimout) ; _scanAfter () ; foreach $fd (@ready) { if (ref $localBindByHandle{$fd} eq 'CODE') { &{$localBindByHandle{$fd}} ; } else { my ($cb, @arg) = @{$localBindByHandle{$fd}} ; &$cb (@arg) } } } } ############### METHODE AFTER sub after ($$;$) { # test du premier argument au cas ou la fonction soit # appelee de maniere objet : premier argument = class ou une instance # de classe shift if ((ref ($_[0]) eq 'Ivy') || ($_[0] eq 'Ivy')) ; my ($timeAfter, $cbListRef) = @_; $timeAfter /= 1000; $selectTimout = $timeAfter if $timeAfter < $selectTimout; $afterList{++$afterId} = [AFTER, $timeAfter, timeofday()+$timeAfter, $cbListRef]; return ($afterId); } ############### METHODE REPEAT sub repeat ($$;$) { # test du premier argument au cas ou la fonction soit # appelee de maniere objet : premier argument = class ou une instance # de classe shift if ((ref ($_[0]) eq 'Ivy') || ($_[0] eq 'Ivy')) ; # on passe le temps en secondes pour le select my ($timeAfter, $cbListRef) = @_; $timeAfter /= 1000; $selectTimout = $timeAfter if $timeAfter < $selectTimout; $afterList{++$afterId}= [REPEAT, $timeAfter, timeofday()+$timeAfter, $cbListRef]; return ($afterId); } ############### METHODE AFTER CANCEL sub afterCancel ($;$) { # test du premier argument au cas ou la fonction soit # appelee de maniere objet : premier argument = class ou une instance # de classe shift if ((ref ($_[0]) eq 'Ivy') || ($_[0] eq 'Ivy')) ; my $id = shift; if (defined ($id) && defined $afterList{$id}) { if ($afterList{$id}->[1] <= $selectTimout) { delete $afterList{$id} ; # le timout de l'after/repeat etait le plus petit des timout # on cherche donc le plus petit parmi ceux qui restent; $selectTimout = MAX_TIMOUT; foreach my $af (values %afterList) { $selectTimout = $af->[1] if $af->[1] < $selectTimout ; } } else { delete $afterList{$id} ; } } } ############### METHODE FILE EVENT sub fileEvent ($$;$) { # test du premier argument au cas ou la fonction soit # appelee de maniere objet : premier argument = class ou une instance # de classe shift if ((ref ($_[0]) eq 'Ivy') || ($_[0] eq 'Ivy')) ; my ($fd, $cb) = @_; unless (defined $localLoopSel) { die ("Erreur Ivy::fileEvent : Ivy::fileEvent n'est utilisable qu'en ". "mode mainLoop LOCALE\n"); } if ($cb) { # adding the handler $localBindByHandle{$fd} = $cb; $localLoopSel->add ($fd); } else { # deleting the handler delete $localBindByHandle{$fd}; # print ("DBG: Ivy::fileEvent : removing fd from the select\n"); $localLoopSel->remove ($fd); } } ############################################################################# #### METHODES PRIVEE ##### ############################################################################# ############### METHODE GET BONJOUR sub _getBonjour ($) { my $self = shift; my $bonjourMsg = ''; # l'hote distant my $inetAddr = $self->[supSock]->recv ($bonjourMsg, 1024, 0); unless (length $inetAddr) { warn "Attention : Ivy::_getBonjour recv error, bonjour non traite\n"; return; } my $addr = (unpack_sockaddr_in ($inetAddr))[1]; my $peerName = gethostbyaddr ($addr, AF_INET); # on force $peerPort a etre vu comme une valeur numerique my ($version, $peerPort) = $bonjourMsg =~ /^(\d+)\s+(\d+)/; unless (defined ($version) && defined ($peerPort)) { warn "Attention : Ivy::_getBonjour format du message bonjour incorrect\n". "message = $bonjourMsg\n" ; return; } if ($version != VERSION) { warn "Attention : Ivy::_getBonjour VERSION: demande de connexion de ". "$peerName\n version courrante : " . VERSION . ", recue : $version\n" ; return; } # on verifie qu'on ne se repond pas et qu'on ne # se reconnecte pas a un process deja connecte if (exists ($self->[cnnxion]->{"$addr:$peerPort"})) { #print "DBG> : bonjour de $peerName:$peerPort : DEJA CONNECTE\n" ; return ; } else { #print "DBG> : reception de $peerName : bonjour $peerPort\n" ; } # on verifie que l'adresse fasse partie de l'ensemble de reseau # definis par ivybus my $addrInIvyBus = 0; my @ivyBusAddrList = map ( (unpack_sockaddr_in ($_))[1], @{$self->[broadcastBuses]}); # Bon dans cette version on reponds aux bonjour emis par # la machine locale, on ne peut donc pas avoir # une appli qui ne causerait qu'a des machines sur une # autre reseau, si ca embete qqun, qu'il me le dise push (@ivyBusAddrList, pack ("CCCC", 127,255,255,255)); push (@ivyBusAddrList, (gethostbyname (hostname()))[4]); foreach my $ivyBusAddr (@ivyBusAddrList) { $addrInIvyBus = 1 unless (grep ($_ != 0, unpack ("CCCC", ($addr & $ivyBusAddr) ^ $addr))); } if ($addrInIvyBus == 0) { warn "bonjour de $peerName ignore, ne fait pas partie des ivyBus\n" if $^W; return; } # ouverture du canal de communication my $appSock = IO::Socket::INET->new (PeerAddr => $peerName, PeerPort => $peerPort, Proto => 'tcp'); if ($appSock) { # on cree la queue et le thread qui vont interfacer les envois # vers cette appli $self->[queueList]->{$appSock} = Thread::Queue->new(); $self->[threadList]->{$appSock} = Thread->new (\&_senderThread, $self, $self->[queueList]->{$appSock}, $appSock); # print "DBG> new thread ${$self->[threadList]->{$appSock}}\n"; # on cree une entree pour $appSock dans la liste des regexp $self->[cnnxion]->{"$addr:$peerPort"} = 1; $self->[sendRegList]{$appSock} = []; $self->[buffByConn]{$appSock} = ''; $self->[sockList]->{$appSock} = $appSock; &$fileEventFunc ($appSock, [\&_getMessages, $self, $appSock]) ; # on balance les regexps qui nous interessent a l'appli distante _sendWantedRegexp ($self, $appSock); } else { warn "Attention Ivy::_getBonjour impossible de se connecter au serveur " . "$peerName:$peerPort\n" ; } } ############### METHODE GET CONNECTIONS sub _getConnections ($) { my $self = shift; my $appSock = $self->[connSock]->accept(); unless (defined $appSock) { warn "Attention Ivy::_getConnections, \$appSock not defined\n"; return; } else { printf "accepting connection from %s:%d\n", (gethostbyaddr ($appSock->peeraddr(),AF_INET))[0], $appSock->peerport() if $^W; } # callback pour traiter la reception des messages # on cree la queue et le thread qui vont interfacer les envois # vers cette appli $self->[queueList]->{$appSock} = Thread::Queue->new(); $self->[threadList]->{$appSock} = Thread->new (\&_senderThread, $self, $self->[queueList]->{$appSock}, $appSock); # print "DBG> new thread ${$self->[threadList]->{$appSock}}\n"; &$fileEventFunc ($appSock, [\&_getMessages, $self, $appSock]) ; # on cree une entree pour $appSock dans la liste des regexp $self->[sendRegList]{$appSock} = []; $self->[buffByConn]{$appSock} = ''; $self->[sockList]->{$appSock} = $appSock; # on balance les regexps qui nous interessent a l'appli distante _sendWantedRegexp ($self, $appSock); } ############### METHODE GET MESSAGES sub _getMessages ($$) { my ($self, $appSock) = @_; my $mess; my $buffer = ''; my ($addr, $peerPort, $senderName); # on recupere le message recv ($appSock, $buffer, 65536, 0) ; unless (length $buffer) { # message null : broken pipe, ça s'est deconnecte a l'autre bout # on vire ce fd de la boucle d'evenements ainsi que # le thread associe. # print ("DBG : _getMessages, recv err, calling removeFileDesc.\n"); # Bon la il faudra un jour clarifier ce bordel, lister toutes # les facons dont un couple d'applis connectee peuevent sortir et # eviter les dead lock qui doivent subsister. if (defined ($localLoopSel)) { # _removeFileDescriptor ($self, $appSock); _closeSenderThread ($self, $appSock); } else { _closeSenderThread ($self, $appSock); } return; } if (length ($self->[buffByConn]{$appSock})) { $buffer = $self->[buffByConn]{$appSock} . $buffer ; $self->[buffByConn]{$appSock} = ''; } my @messages = split ('\n', $buffer) ; $self->[buffByConn]{$appSock} = pop (@messages) unless ($buffer =~ /\n$/) ; # if (defined $appSock->peername) { $addr = $appSock->peeraddr(); $peerPort = $appSock->peerport() ; $senderName = $self->[cnnxion]->{"$addr:$peerPort"} ; $senderName = "NONAME" unless $senderName; foreach my $mess (@messages) { # print "DBG>mess from $senderName *$mess*\n"; # on recupere les 3 champs : le type, le numero de regexp, les valeurs my ($type, $id, $valeurs) = $mess =~ /^(\d+) \s+ (\d+) \002 (.*)/x ; # si ca a chie on rale (warn "Attention Ivy::_getMessages message mal formatte : $mess\n" and return) unless defined $type ; # sinon on fait en fonction du type de message if ($type == MSG) { # M S G # on recupere le couple call back, regexp correspondant # a l'identifiant et on appelle la fonction avec les parametres # traites par la regexp if (my @cb = @{$self->[recCbList][$id]->[1]}) { my $cb = shift @cb; # on split sur ETX my $observer = $self->[recCbList][$id]->[2]; if (defined $observer) { $observer->$cb ($senderName, @cb, split ("\003", $valeurs)) ; } else { &$cb ($senderName, @cb, split ("\003", $valeurs)) ; } } else { #_sendErrorTo ($appSock, "REEGXP ID $id inconnue"); warn ("Attention Ivy::_getMessages reception d'un message " . "MSG : id $id inconnu de $senderName :\n«$mess»"); } } elsif ($type == BYE) { #print "reception d'un bye\n"; $self->_closeSenderThread ($appSock); } elsif ($type == REGEXP) { # R E G E X P # on ajoute une fonction traitant la regexp et envoyant le # message sur le bon fd dans la liste des fonctions de filtrage # ca permet de compiler les regexp avec 'once' donc une # fois pour toute, et ainsi optimiser la vitesse de # filtrage des messages a envoyer next if _toBePruned ($self, $senderName, $valeurs); unless (defined $self->[sendRegList]{$appSock}->[$id]) { # si l'id de regexp n'etait pas utilisee c'est tout bon # on affecte la nouvelle regexp a un id $self->[sendRegList]{$appSock}->[$id] = eval <<'_EOL_'; sub { use strict; if (my @args = ${$_[0]} =~ /($valeurs)/o) { shift @args; $args[$#args] .= "\003" if @args; my $queue = $self->[queueList]->{$appSock}; return 0 unless defined $queue; $queue->enqueue (\sprintf (MSG_FMT, MSG, $id, join ("\003",@args))); # print join (' ', "DBG > J'envoie MSG", $id, @args, "\n"); return 1; } }; _EOL_ } else { # l'id de la regexp etait deja utilise, # et n'a pas ete libere par un message delregexp, # on renvoie donc un message d'erreur _sendErrorTo ($self, $appSock, "ID $id deja utilisee"); } } elsif ($type == ERROR) { # E R R O R warn ("Attention Ivy::_getMessages ERREUR recue de ". "$senderName : «$valeurs»\n"); } elsif ($type == DELREGEXP) { # D E L R E G E X P # on vire la regexp des regexps vefifier $self->[sendRegList]{$appSock}->[$id] = undef ; } elsif ($type == ENDREGEXP) { # E N D R E G E X P # on envoie le message ready uniquement a celui qui nous # a envoye le message endregexp _sendMsgTo ($self, $appSock, $self->[messWhenReady]); # on passe de l'etat Connecte a l'etat Ready $self->[cnnxion]->{"$addr:$peerPort"} =~ s/^\004//; $senderName = $self->[cnnxion]->{"$addr:$peerPort"}; unless (exists $self->[appliList]{$senderName}) { $self->[appliList]->{$senderName} = [$appSock]; } else { push @{$self->[appliList]->{$senderName}}, $appSock; } _scanConnStatus ($self); } elsif ($type == APP_NAME) { # etat Connecte if (($self->[appName] eq $valeurs) && $^W) { warn "\033[1mATTENTION : Ivy::_getMessages une instance de ". "$self->[appName] existe deja\033[m\n" ; } $senderName = $valeurs; $self->[cnnxion]->{"$addr:$peerPort"} = "\004$valeurs"; } elsif ($type == DIRECT_MSG) { if (defined $self->[directCbList][$id]) { my @cb = @{$self->[directCbList][$id]}; my $cb = shift @cb; &$cb (@cb, $valeurs); } else { _sendErrorTo ($self, $appSock, "DIRECT ID $id inconnue"); warn "Attention Ivy::_getMessages reception d'un message " . "DIRECT d'id $id inconnue de $senderName :\n«$mess»"; } } elsif ($type == DIE) { # il faut quitter # on commence par appeler la callback de fin my @cb = @{$onDieFunc}; my $cb = shift @cb; &$cb (@cb); # on avertit les autres qu'on se barre my $adr = _inetAdrByName ($self, $senderName) ; warn "Attention Ivy::_getMessages reception d'un ordre " . "de suicide de $senderName ($adr)... exiting\n" if $^W; # adios Ivy::exit (); } elsif ($type == PING) { # si on recois un ping, on envoie un pong _pong ($self, $appSock); } elsif ($type == PONG) { return PONG; } else { _sendErrorTo ($self, $appSock, "TYPE DE MESS $type inconnu"); warn ("Attention Ivy::_getMessages reception d'un message " . "de type $type inconnu de $senderName :\n«$mess»"); } } return 0; } ############### METHODE SEND WANTED REGEXP sub _sendWantedRegexp ($$) { my ($self, $appSock) = @_; # on envoie le message "Nom appli" Thread::Queue::enqueue ($self->[queueList]->{$appSock}, \sprintf (MSG_FMT, APP_NAME, 0, $self->[appName])); # on envoie les regexps for (my $id = 0; $id <= $#{$self->[recCbList]}; $id++) { next unless defined $self->[recCbList]->[$id]->[1]->[0]; Thread::Queue::enqueue ($self->[queueList]->{$appSock}, \sprintf (MSG_FMT, REGEXP, $id, $self->[recCbList][$id]->[0])); # print sprintf ("DBG > %s %d %s\n", # 'REGEXP', $id, $self->[recCbList][$id]->[0]); } # on envoie le message de fin d'envoi de regexps Thread::Queue::enqueue ($self->[queueList]->{$appSock}, \sprintf (MSG_FMT, ENDREGEXP, 0, "")); } ############### METHODE SEND LAST REGEXP TO ALLREADY CONNECTED sub _sendLastRegexpToAllreadyConnected ($$) { my ($self, $id) = @_; foreach my $fd (values %{$self->[sockList]}) { Thread::Queue::enqueue ($self->[queueList]->{$fd}, \sprintf (MSG_FMT, REGEXP, $id, $self->[recCbList][$id][0])); } } ############### METHODE INET ADR BY NAME sub _inetAdrByName ($$) { my ($self, $appName) = @_; my $addrInet = (grep ($self->[cnnxion]->{$_} eq $appName, keys %{$self->[cnnxion]}))[0]; return ("unknow") unless defined $addrInet; my ($addr,$port) = $addrInet =~ /(.{4}):(.*)/; my $host = (gethostbyaddr ($addr, AF_INET))[0] ; return "$host:$port"; } ############### METHODE REMOVE FILE DESCRIPTOR sub _removeFileDescriptor ($$) { my ($self, $fd) = @_; my $diedAppName; # on s'est deja occupe de lui return unless exists $self->[sockList]->{$fd}; # printf ("DBG> _removeFileDescriptor IN thread %s\n", ${Thread->self}); # on efface les structures de donnees associees au fd # on vire ce fd des fd a scruter dans la bcle d'evenements # uniquement si on est dans le thread principal # sinon le select merde salement sur ce coup &$fileEventFunc ($fd, ''); delete $self->[sendRegList]->{$fd}; delete $self->[sockList]->{$fd}; delete $self->[buffByConn]->{$fd}; delete $self->[queueList]->{$fd}; # on clos la connection $fd->close (); # EXT_LOOP: # foreach my $name (keys %{$self->[appliList]}) { # @{$self->[appliList]{$name}} = # grep ($_ ne $fd, @{$self->[appliList]{$name}}); # if (length (@{$self->[appliList]{$name}}) == 0) { # delete $self->[appliList]->{$name}} ; # } # } EXT_LOOP: foreach my $name (keys %{$self->[appliList]}) { foreach my $fdp (@{$self->[appliList]{$name}}) { if ($fd eq $fdp) { $diedAppName = $name; @{$self->[appliList]{$name}} = grep ($_ ne $fdp, @{$self->[appliList]{$name}}); if (scalar (@{$self->[appliList]{$name}}) == 0) { delete $self->[appliList]->{$name}} ; last EXT_LOOP; } } } unless (defined $diedAppName) { warn "Ivy::_removeFileDescriptor : deconnection de NONAME\n" if $^W; return; } my $addrInet = (grep ($self->[cnnxion]->{$_} eq $diedAppName, keys %{$self->[cnnxion]}))[0]; unless (defined $addrInet) { die "ERREUR _removeFileDescriptor deconnection de $diedAppName ". "addrInet not defined\n"; return; } # printf "DBG> _removeFileDescriptor : deconnection de %s ($diedAppName)\n", _inetAdrByName ($self, $diedAppName); delete $self->[cnnxion]->{$addrInet}; # on vire l'entree correspondant a ce canal dans la liste des # regexps par canal _scanConnStatus ($self) ; } ############### METHODE SEND ERROR TO sub _sendErrorTo ($$$) { my ($self, $fd, $error) = @_; Thread::Queue::enqueue ($self->[queueList]->{$fd}, \join (' ', ERROR, "0\002$error\n")); } ############### METHODE PONG sub _pong ($$) { my ($self, $fd) = @_; Thread::Queue::enqueue ($self->[queueList]->{$fd}, \join (' ', PONG, "0\002\n")); } ############### METHODE SEND ERROR TO sub _sendDieTo ($$) { my ($self, $fd) = @_; Thread::Queue::enqueue ($self->[queueList]->{$fd}, \join (' ', DIE, "0\002\n")); } ############### METHODE SEND MSG TO sub _sendMsgTo ($$$) { my ($self, $fd, $msg) = @_; # pour toutes les fonctions de filtrage de regexp foreach my $regexpFunc (@{$self->[sendRegList]{$fd}}) { &{$regexpFunc} (\$msg) if defined $regexpFunc; } } ############### METHODE TK FILE EVENT sub _tkFileEvent ($$) { my ($fd, $cb) = @_; Tk::fileevent ('', $fd, 'readable', $cb) ; } ############### METHODE SCAN AFTER sub _scanAfter () { my $stamp = timeofday (); $selectTimout = MAX_TIMOUT; foreach my $afk (keys %afterList) { my $af = $afterList{$afk}; # si ce timer est a declencher if ($af->[2] <= $stamp) { # on traite : le temps de declencher le cb est arrive if (ref $af->[3] eq 'CODE') { &{$af->[3]}; } else { my ($cb, @args) = @{$af->[3]}; &$cb (@args); } # si c'est un repeat on le reconduit if ($af->[0]) { $af->[2] = $stamp + $af->[1] ; $selectTimout = $af->[1] if $af->[1] < $selectTimout; } else { # si c'est un after on le vire afterCancel ($afk); } } else { my $timeTotrigg = $af->[2] - $stamp; $selectTimout = $timeTotrigg if $timeTotrigg < $selectTimout; } } } ############### METHODE SCAN CONN STATUS sub _scanConnStatus ($) { my $self = shift; my (%readyApp, @nonReadyApp); foreach (values %{$self->[cnnxion]}) { next if $_ eq "1"; $readyApp{$_}++ unless /^\004/; # connecte mais pas ready } foreach (@{$self->[neededApp]}) { push (@nonReadyApp, $_) unless exists $readyApp{$_}; } # par compatibilite avec l'ancienne version, on envoie comme # deux premiers arguments une ref sur la liste des applis presentes, # la liste des applis absentes, mais on rajoute comme troisieme # argument une ref sur une table de hash : comme clef les # applis presentes, comme valeur le nombre d'applis ayant ce nom, # de facon a detecter plus facilement quand il y a trop d'applis # de meme nom sur le meme bus. &{$self->[statusFunc]} ([keys %readyApp], \@nonReadyApp, \%readyApp); } ############### METHODE TO BE PRUNED sub _toBePruned ($$$) { my ($self, $from, $regexp) = @_; # si il n'y a pas de liste de sujets, on ne # filtre pas return 0 unless @{$self->[topicRegexps]}; unless ($regexp =~ /^\^/) { #print "DBG> regexp non ANCREE de $from : $regexp\n"; return (0); } if ($regexp =~ /^\^(\w+)/) { my $topic = $1; if (grep (/$topic/, @{$self->[topicRegexps]})) { # on a trouve ce topic : on ne filtre pas la regexp #print "DBG> on garde de $from : $regexp\n"; return (0); } #print "DBG> on ELIMINE de $from : $regexp\n"; return (1); } else { #print "DBG> on garde de $from : $regexp\n"; return (0); } } ############### METHODE PARSE IVY BUS PARAM sub _parseIvyBusParam ($) { my $ivyBus = shift; my ($ivyNetworks, $ivyPort) = $ivyBus =~ /^(.*):(.*)/; die ("Erreur Ivy::_parseIvyBusParam format de l'adresse ou ". "no de port incorrect : $ivyBus\n") unless $ivyPort =~ /^\d+$/; my @ivyAddrInet = (); $ivyNetworks =~ s/ //g; my @broadcastAddrs = split (',', $ivyNetworks); foreach my $netAddr (@broadcastAddrs) { $netAddr = BROADCAST_ADDRS if (($netAddr eq '') || ($netAddr =~ /^127/) || ($netAddr =~ /^loopback/)); # deux cas de figure : on a un nom de sous reseau, ou # une adresse ip de la forme \d+.\d+.... my $netAddrInet; if ($netAddr !~ /^(\d+\.)+\d+/) { # on a un nom de reseau, il faut trouver son adresse ip # on contourne un bug : Si les adresses sont incompletes # dans la map network : 143.196.53 au lieu de 143.196.53.255 # getbyname renvoie une adresse de type 0.143.196.53. # on doit donc faire un decalage des octets vers la gauche, # chaque 0 qui sort a gauche est remplace par un 255 a droite. my $networkAddr = getnetbyname ($netAddr); unless (defined $networkAddr) { warn ("Ivy::_parseIvyBusParam reseau inconnu : $netAddr\n"); next; } my @dummyNetAddr = unpack ("CCCC", pack ('N', $networkAddr)); while (!$dummyNetAddr[0]) { # tant que le premier octet est 0, on decale vers la gauche et # ont fait rentrer un 255 sur la droite shift @dummyNetAddr; push (@dummyNetAddr, 255); } $netAddrInet = pack ("CCCC", @dummyNetAddr); } else { # on a deja une adresse ip, on rajoute les .255 # a la fin s'ils ont etes omis. ($netAddr .= ".255.255.255") =~ s/^((\d+\.){3}\d+).*/$1/; $netAddrInet = inet_aton ($netAddr); } push (@ivyAddrInet, pack_sockaddr_in ($ivyPort, $netAddrInet)); } return ($ivyPort, \@ivyAddrInet); } ############# Methode _ SENDER THREAD sub _senderThread ($$$) { my ($self, $queue, $fd) = @_; my $data; # condition d'arret du thread : # pour l'arreter on enqueue undef. while ($data = $queue->dequeue()) { # si le send echoue, on vire le fd et on sort du thread. # last unless syswrite ($fd, $$data, length ($$data)); last unless send ($fd, $$data, 0); } } ############# Methode _ SENDER THREAD_NON_BLOQUANT # tentative pour pouvoir arreter facilement un thread # bloque sur un send. Mais c'est pas satisfaisant # car quand le fd est de nouveau accessible en ecriture, # les donnees accumulees dans l'accumulateur ne sont # envoyees que lorsque l'on ecrit a nouveau dans # la queue de communication du thread # Je laisse ce code pour info. # sub _senderThread_NON_BLOQUANT ($$$) { # my ($self, $queue, $fd) = @_; # my $data; # my @buff = (); # # on ne bloque plus sur ce fd # fcntl ($fd, F_SETFL, O_NDELAY) ; # EXT_LOOP: # for (;;) { # # condition d'arret du thread : # # pour l'arreter on enqueue undef. # last unless defined ($data = Thread::Queue::dequeue ($queue)); # # on colle les donnees dans un accumulateur # push (@buff, $data); # # tant qu'on peut ecrire sur le fd, on le fait # while ($data = shift @buff) { # unless (syswrite ($fd, $$data, length ($$data))) { # # le write a plante : # if ($! == Errno::EWOULDBLOCK) { # # si c'est parce que le fd est temporairement innacessible # # on remets le message dans l'accumulateur et on se remets # # en attente sur la queue de communication # unshift (@buff, $data); # next EXT_LOOP; # } else { # # sinon c'est que le fd a ete ferme, dans ce cas le thread sort # last EXT_LOOP; # } # } # } # } # print ("DBG> sortie de thread\n"); # } ############# Methode _CLOSE SENDER THREAD sub _closeSenderThread ($$) { my ($self, $fd) = @_; my $queue = $self->[queueList]->{$fd}; # on vide la queue. Oui je sais :-) # printf ("DBG> on vide la queue dans le thread %s\n", ${Thread->self()}); if (defined $queue) { while ($queue->dequeue_nb ()) {}; $queue->enqueue (undef); _removeFileDescriptor ($self, $fd); } # printf ("DBG> demande de join du thread ${$self->[threadList]->{$fd}}\n"); $self->[threadList]->{$fd}->join() if defined $self->[threadList]->{$fd};; # printf ("DBG> done\n"); } ############# Procedure _SUBSTITUTE ESCAPED CHAR sub _substituteEscapedChar ($$) { my ($scope, $reg) = @_; my %escapeRegexp = REG_PERLISSISME; # Si on fait la substitution dans une classe de caractere # on elimine les crochets. grep ($escapeRegexp{$_} =~ s/[\[\]]//g, keys %escapeRegexp) if ($scope eq 'inside') ; $reg =~ s/\\([wWsSdDne])/$escapeRegexp{$1}/ge; return $reg; } 1;