package Bus ; # # # # # # # # require 5.004 ; use Sys::Hostname; use IO::Socket; use strict ; use Time::Gettimeofday ; ############################################################################# #### PROTOTYPES ##### ############################################################################# sub start (@); # debut de l'integration au bus : # - cree la socket d'application, recupere le no de port # - cree la socket supervision # - envoie le "no de port" # - bind le file descriptor de la socket de supervision # a la fonction getBonjour pour traiter les bonjours # - bind le fd de connection sur la fonction # getConnections # pour etablir les connections "application" sub stop (); # - envoie un BYE et clos les connections sub bindRegexp ($$) ; # permet d'associer une regexp avec un callBack # ou d'annuler une precedente association sub bindDirect ($$); # permet d'associer un identifiant de msg direct # avec une fonction de callBack, ou de l'annuler sub sendMsgs (@) ; # envoie une liste de messages sub sendAppNameMsgs (@) ; # envoie une liste de messages precedes # du nom de l'application sub sendDirectMsgs ($$@); # envoie une liste de messages directs a une appli sub sendDieTo ($); # envoie un <> a une appli sub ping ($$); # teste qu'une appli soit encore vivante sub mainLoop (); # la mainloop locale (sans tk) sub after ($$); # temps en millisecondes, callback sub repeat ($$); # temps en millisecondes, callback sub fileEvent ($$); # associe un fd a un callback pour la mainloop locale sub pruneRegexp (@); # optimisation : si l'on connait les sujets des messages # qu'on envoie, on appelle cette fonction avec une # liste de sujets, et les regexps qui ne matchent pas # ce sujet sont eliminees Cette fonction doit etre # appelee avant le Bus::start ################ PRIVEE #################################################### sub _getBonjour (); # lit le (ou les) bonjour(s) sur le canal de supervision # et se connecte, verifie qu'il ne se reponds pas lui # meme, ni qu'il ne repond pas a un service deja connecte sub _getConnections (); # est appele lors d'une demande de connection : # accepte la connection et mets a jour @sendRegList # rajoute le fd du canal aux fd a scruter dans la # boucle d'evenements sub _getMessages ($); # est appele lorqu'un message arrive sub _sendWantedRegexp ($); # envoie les regexp a l'appli distante sub _sendLastRegexpToAllreadyConnected ($) ; # envoie la derniere regexp # pushee dans @recCbList # a toutes les applis deja # connectees sub _removeFileDescriptor ($); # on vire un fd et les structures associees sub _sendErrorTo ($$); #(fd, error) envoie un message d'erreur a un fd sub _sendDieTo ($); #(fd) envoie un message de demande de suicide a un fd sub _sendMsgTo ($$); # (fd, message) sub _pong ($); # (fd) sub _tkFileEvent ($$); # associe un fd a un callback pour la mainloop tk sub _scanAfter () ; # parse si il faut appeler un callback associe a un after sub _myCanRead (); # interface au select sub _scanConnStatus (); # verifie les connections effectuees et # appelle la fonction $statusFunc sub _inetAdrByName ($); # transforme une adresse inet native en chaine $host:$port sub _toBePruned ($$); ############################################################################# #### CONSTANTES ##### ############################################################################# use constant VERSION => 3; use constant MSG_FMT => "%d %d\002%s\n"; # On plante en dur les adresses reseau sur lesquelles on broadcaste # le bonjour, ATTENTION, ca PLANTERA au SITEF !! use constant BROADCAST_ADDRS => ("143.196.1.255", "143.196.2.255") ; use constant BYE => 0; use constant REGEXP => 1; use constant MSG => 2; use constant ERROR => 3; use constant DELREGEXP => 4; use constant ENDREGEXP => 5; use constant APP_NAME => 6; use constant DIRECT_MSG => 7; use constant DIE => 8; use constant PING => 9; use constant PONG => 10; use constant AFTER => 0; use constant REPEAT => 1; ############################################################################# #### VARIABLES GLOBALES ##### ############################################################################# my $messWhenReady; # message a envoyer a un canal lorsqu'on # a recu le message endregexp. my $localAddr; # adresse de la machine locale my $servPort; # No de port tcp du serveur my $appName; # Nom de l'appli locale my @neededApp; # liste des applis necessaires a l'appli locale my $statusFunc; # callback prenant en param 2 refs sur des listes : # [applis presentes, appli absentes] # cette fonction est appelee : # - tout les pollingTime tant que toutes les applis # ne sont pas presentes # - des que toutes les applis sont presentes # - lorsqu'une appli se deconnecte my $onDieFunc; # callback prenant en param 1 refs sur une liste : # [ref sur fonction, parametres] my $supSock ; # socket de supervision en lecture/ecriture my $connSock; # socket de connexion tcp my %sockList = (); # tab ass : nom du fd => fd my %appliList = (); # tab ass : nom de l'appli => fd my %sendRegList = (); # tableau ass de liste du type # sockId => [fonction, fonction, ...] # pour savoir quoi envoyer a qui # les fonctions anonymes sont compilees # dynamiquement a la reception des messages REGEXP # et filtrent les mess a envoyer et les envoient # au besoin my @topicRegexps = (); # liste des topics qu'on envoie si on # les regexps my @recCbList = (); # liste de ref sur des couples # (regexp,callBack) les callbacks # sont appeles lors de # la reception de messages en fonction # du numero de regexp. my @directCbList = (); # liste de callBack pour les messages directs my %cnnxion = (); # tableau ass : clef = nom:numero_de port # permet de verifier qu'on ne se connecte pas # sur nous meme et qu'on ne se reconnecte # pas sur un service en cas de bonjours repetes # valeur : nom de l'application my %buffByConn = (); # tableau associatif, clef => file desc, # valeur :buffer au cas ou la lacture ne se termine # pas par \n my $fileEventFunc; # pointeur sur la fonction permettant d'associer # des callbacks a un file desc, (ainsi que de # les enlever) my $localLoopSel; # dans le cas ou l'on soit dans une mainLoop # locale, cette var pointe une un objet # de type IO::Select; my %localBindByHandle; # table d'ass. handle -> callback my %afterList=(); # tableau d'ass [AFTER ou REPEAT, # timeTotal, deadLine, [callback, arg, arg, ...]] my $afterId = 0; # my $smallestTimout = 10000; # timeout le plus petit pour le select my $maxInstanceOfApp ; # nombre max d'instances de l'appli utilisant # le bus, si il y a deja $maxInstanceOfApp # instances on sort. BEGIN {$SIG{'PIPE'} = sub {warn "broken pipe, ignoring ...\n";}} ############################################################################# #### PROCEDURES PUBLIQUES ##### ############################################################################# ############### PROCEDURE BUS START sub start (@) { my %options = @_; my %defaultOptions = ( #PARAMETRES OBLIGATOIRES -loopMode => undef, # TK ou LOCAL -broadcastPort => $ENV{"BUS"}, # No de port UDP Si non specifie la variable # d'environnement BUS sera examinee -appName => undef, # nom de l'appli -messWhenReady => "_APP NAME READY", # message de synchro a envoyer quand pret # PARAMETRES FACULTATIFS (avec valeurs par defaut) -neededApp => [], # liste des appplis necessaires -statusFunc => sub {}, # fonction de callBack qui sera appelee tant que # toutes les applis necessaires ne sont pas presentes, # et des que toutes les applis necessaires sont # presentes, et si une appli necessaire se deconnecte # les deux parametres passes sont : # [liste des applis presentes], # [liste des applis absentes] -onDieFunc => [sub {}], # fonction de cb appelee lorsque l'appli a recu l'ordre # de quitter, on peut dans ce callback fermer # proprement les ressources avant de sortir. # ps : ne pas fasire d'exit dans le callback, # c'est le bus qui s'en charge ) ; foreach my $opt (keys %defaultOptions) { # si un parametre n'a pas ete defini next if exists $options{$opt} ; # est-il facultatif if (defined $defaultOptions{$opt}) { $options{$opt} = $defaultOptions{$opt} ; } else { # parametre obligatoire die "ERREUR Bus::start vous devez specifier ". "l'option $opt\n"; } } foreach my $opt (keys %options) { die "ERREUR Bus::start option $opt inconnue\n" unless exists ($defaultOptions{$opt}); } my $loopMode = $options{-loopMode}; my $broadcastPort = $options{-broadcastPort} ; $appName = $options{-appName} ; $messWhenReady = $options{-messWhenReady} eq "_APP NAME READY" ? "$appName READY" : $options{-messWhenReady}; @neededApp = @{$options{-neededApp}} ; $statusFunc = $options{-statusFunc} ; $onDieFunc = $options{-onDieFunc} ; if ($loopMode =~ /local/i) { # mode boucle d'evenement locale use IO::Select; $fileEventFunc = \&fileEvent ; $localLoopSel = IO::Select->new (); } elsif ($loopMode =~ /tk/i) { # mode boucle d'evenement de TK $fileEventFunc = \&_tkFileEvent ; } else { die "le premier argument (mainloop mode) doit etre TK ou LOCAL\n"; } my $hostAddr = (gethostbyname (hostname()))[4] ; # cree la socket de connexion, recupere le no de port $connSock = IO::Socket::INET->new(Listen => 128, Proto => 'tcp', Reuse => 1) ; $cnnxion{"$hostAddr:". $connSock->sockport} = "\004"; $supSock = IO::Socket::INET->new(LocalPort => $broadcastPort, Proto => 'udp', Type => SOCK_DGRAM, Reuse => 1); $supSock->sockopt (SO_BROADCAST, 1); # envoie le bonjour : "no de version no de port" my $bonjourMsg = sprintf ("%d %d\n", VERSION, $connSock->sockport()); foreach my $netAddr (BROADCAST_ADDRS) { my $netAddrInet = inet_aton ($netAddr); my $netBroadcastAddr = pack_sockaddr_in ($broadcastPort, $netAddrInet); send ($supSock, $bonjourMsg, 0, $netBroadcastAddr) or warn "Bus::start envoi du bonjour a echoue sur $netAddr : $!\n"; } # callback pour traiter la reception des bonjours &$fileEventFunc ($supSock, \&_getBonjour) ; # callback pour traiter les demandes de cxion &$fileEventFunc ($connSock, \&_getConnections) ; } ############### PROCEDURE BIND REGEXP sub bindRegexp ($$) { my ($regexp, $cb) = @_; if ($cb) { my $id; # on rajoute le couple $regexp, $cb dans la liste des messages # qu'on prend # on commence par tester si on a un id libere dans le tableau for ($id=0; $id <= ($#recCbList+1); $id++) { last unless (defined $recCbList[$id]) && @{$recCbList[$id]->[1]}; } $recCbList[$id] = [$regexp, $cb]; # on envoie les messages regexps aux processus deja connectes _sendLastRegexpToAllreadyConnected ($id) ; } else { # on vire le callback, et on se desabonne de cette regexp for (my $id=0; $id <= $#recCbList; $id++) { next unless (defined $recCbList[$id]) && @{$recCbList[$id]->[1]}; if ($recCbList[$id]->[0] eq $regexp) { $recCbList[$id]->[1] = []; # on envoie le mesage delregexp foreach my $fd (values %sockList) { send ($fd, sprintf (MSG_FMT, DELREGEXP, $id), 0) or _removeFileDescriptor ($fd); } } } } } ############### PROCEDURE BIND REGEXP sub bindDirect ($$) { my ($id, $cb) = @_; if ($cb) { # on rajoute la $cb dans la liste des messages # qu'on prend $directCbList[$id] = $cb; } else { # on vire le callback undef $directCbList[$id]; } } ############### PROCEDURE SEND MSGS sub sendMsgs (@) { my @msgs = @_; my $total = 0; # pour tous les messages foreach my $msg (@msgs) { study ($msg); # pour routes les connections foreach my $fd (keys %sockList) { # pour toutes les fonctions de filtrage de regexp foreach my $regexpFunc (@{$sendRegList{$fd}}) { $total += &{$regexpFunc} (\$msg) if defined $regexpFunc; } } } # print "DBG> sended $total times\n"; return $total; } ############### PROCEDURE SEND MSGS sub sendAppNameMsgs (@) { my @msgs = @_; my $total = 0; # pour tous les messages foreach (@msgs) { my $msg = "$appName $_"; study ($msg); # pour routes les connections foreach my $fd (keys %sockList) { # pour toutes les fonctions de filtrage de regexp foreach my $regexpFunc (@{$sendRegList{$fd}}) { $total += &{$regexpFunc} (\$msg) if defined $regexpFunc; } } } # print "DBG> sended $total times\n"; return $total; } ############### PROCEDURE SEND DIRECT MSGS sub sendDirectMsgs ($$@) { my ($to, $id, @msgs) = @_; if (defined ($appliList{$to})) { my @fds = @{$appliList{$to}}; # pour tous les messages foreach my $msg (@msgs) { foreach my $fd (@fds) { send ($fd, sprintf (MSG_FMT, DIRECT_MSG, $id, "$msg"), 0) or _removeFileDescriptor ($fd); } } return 1; } else { warn "Bus::sendDirectMsgs appli $to inconnue\n"; return 0; } } ############### PROCEDURE SEND DIE TO sub sendDieTo ($) { my $to = shift; if (defined ($appliList{$to})) { my @fds = @{$appliList{$to}}; # pour tous les messages foreach my $fd (@fds) { _sendDieTo ($fd); } return 1; } else { warn "Bus::sendDieTo appli $to inconnue\n"; return 0; } } ############### PROCEDURE PING sub ping ($$) { my ($to, $timeout) = @_; if (defined ($appliList{$to})) { my @fds = @{$appliList{$to}}; # pour tous les messages foreach my $fd (@fds) { send ($fd, sprintf (MSG_FMT, PING, 0, " "), 0) or _removeFileDescriptor ($fd); } } } ############### PROCEDURE BUS STOP sub stop () { # pour routes les connections foreach my $fd (values %sockList) { send ($fd, sprintf (MSG_FMT, BYE, 0, ""), 0) or _removeFileDescriptor ($fd); } } ############### PROCEDURE MAINLOOP sub mainLoop () { for (;;) { my @ready = $localLoopSel->can_read ($smallestTimout) ; _scanAfter () ; foreach my $fd (@ready) { if (ref $localBindByHandle{$fd} eq 'CODE') { &{$localBindByHandle{$fd}} ; } else { my ($cb, @arg) = @{$localBindByHandle{$fd}} ; &$cb (@arg) } } } } ############### PROCEDURE AFTER sub after ($$) { my ($timeAfter, $cbListRef) = @_; $timeAfter /= 1000; $smallestTimout = $timeAfter if $timeAfter < $smallestTimout; # si la valeur de timout est negative : c'est un after sinon # c'est un repeat $afterList{++$afterId} = [AFTER, $timeAfter, timeofday()+$timeAfter, $cbListRef]; return ($afterId); } ############### PROCEDURE AFTER sub repeat ($$) { # on passe le temps en secondes pour le select my ($timeAfter, $cbListRef) = @_; $timeAfter /= 1000; $smallestTimout = $timeAfter if $timeAfter < $smallestTimout; $afterList{++$afterId}= [REPEAT, $timeAfter, timeofday()+$timeAfter, $cbListRef]; return ($afterId); } ############### PROCEDURE AFTER CANCEL sub afterCancel ($) { my $id = shift; if (defined ($id) && defined $afterList{$id}) { if ($afterList{$id}->[1] <= $smallestTimout) { delete $afterList{$id} ; # le timout de l'after/repeat etait le plus petit des timout # on cherche donc le plus petit parmi ceux qui restent; $smallestTimout = 10000; foreach my $af (values %afterList) { $smallestTimout = $af->[1] if $af->[1] < $smallestTimout ; } } else { delete $afterList{$id} ; } } } sub pruneRegexp (@) { @topicRegexps = @_; } ############################################################################# #### PROCEDURES PRIVEE ##### ############################################################################# ############### PROCEDURE GET BONJOUR sub _getBonjour () { my $bonjourMsg = ''; # l'hote distant my $inetAddr = $supSock->recv ($bonjourMsg, 1024, 0); unless (length $inetAddr) { warn "recv error, bonjour non traite\n"; return; } my $addr = (unpack_sockaddr_in ($inetAddr))[1]; my $peerName = gethostbyaddr ($addr, AF_INET); # on force $peerPort a etre vu comme une valeur numerique my ($version, $peerPort) = $bonjourMsg =~ /^(\d+)\s+(\d+)/; unless (defined ($version) && defined ($peerPort)) { warn "ERREUR format du message bonjour incorrect\n". "message = $bonjourMsg\n" ; return; } if ($version != VERSION) { warn "ERREUR VERSION: demande de connexion de $peerName\n". "version courrante : " . VERSION . ", recue : $version\n" ; return; } # on verifie qu'on ne se repond pas et qu'on ne # ne reconnecte pas a un process deja connecte if (exists ($cnnxion{"$addr:$peerPort"})) { #print "DBG> : bonjour de $peerName:$peerPort : DEJA CONNECTE\n" ; return ; } else { #print "DBG> : reception de $peerName : bonjour $peerPort\n" ; } $cnnxion{"$addr:$peerPort"} = 1; # ouverture du canal de communication my $appSock = IO::Socket::INET->new (PeerAddr => $peerName, PeerPort => $peerPort, Proto => 'tcp'); if ($appSock) { # on cree une entree pour $appSock dans la liste des regexp $sendRegList{$appSock} = []; $buffByConn{$appSock} = ''; $sockList{$appSock} = $appSock; &$fileEventFunc ($appSock, [\&_getMessages, $appSock]) ; # on balance les regexps qui nous interessent a l'appli distante _sendWantedRegexp ($appSock); } else { warn "Bus::_getBonjour ERROR cannot connect to server " . "$peerName:$peerPort\n" ; } } ############### PROCEDURE GET CONNECTIONS sub _getConnections () { my $appSock = $connSock->accept(); unless (defined $appSock) { warn "ERROR _getConnections, \$appSock not defined\n"; return; } else { #print sprintf ("accepting connection from %s:%d\n", # (gethostbyaddr ($appSock->peeraddr(),AF_INET))[0], # $appSock->peerport()); } # callback pour traiter la reception des messages &$fileEventFunc ($appSock, [\&_getMessages, $appSock]) ; # on cree une entree pour $appSock dans la liste des regexp $sendRegList{$appSock} = []; $buffByConn{$appSock} = ''; $sockList{$appSock} = $appSock; # on balance les regexps qui nous interessent a l'appli distante _sendWantedRegexp ($appSock); } ############### PROCEDURE GET MESSAGES sub _getMessages ($) { my $appSock = shift; my $mess; my $buffer = ''; my ($addr, $peerPort, $senderName); # on recupere le message recv ($appSock, $buffer, 2048, 0) ; unless (length $buffer) { # message null : broken pipe, ça s'est deconnecte a l'autre bout # on vire ce fd de la boucle d'evenements _removeFileDescriptor ($appSock); return; } if (length ($buffByConn{$appSock})) { $buffer = $buffByConn{$appSock} . $buffer ; $buffByConn{$appSock} = ''; } my @messages = split ('\n', $buffer) ; $buffByConn{$appSock} = pop (@messages) unless ($buffer =~ /\n$/) ; # if (defined $appSock->peername) { $addr = $appSock->peeraddr(); $peerPort = $appSock->peerport() ; $senderName = $cnnxion{"$addr:$peerPort"} ; $senderName = "NONAME" unless $senderName; # } else { # warn "\n\nBIG PB : appSock->peername undefined \n\n"; # } foreach my $mess (@messages) { # print "DBG>mess from $senderName *$mess*\n"; # on recupere les 3 champs : le type, le numero de regexp, les valeurs my ($type, $id, $valeurs) = $mess =~ /^(\d+) \s+ (\d+) \002 (.*)/x ; # si ca a chie on rale (warn "malformated message $mess\n" and return) unless defined $type ; # sinon on fait en fonction du type de message if ($type == MSG) { # M S G # on recupere le couple call back, regexp correspondant # a l'identifiant et on appelle la fonction avec les parametres # traites par la regexp if (my @cb = @{$recCbList[$id]->[1]}) { my $cb = shift @cb; # on split sur ETX &$cb ($senderName, @cb, split ("\003", $valeurs)) ; } else { #_sendErrorTo ($appSock, "REEGXP ID $id inconnue"); warn ("reception d'un message MSG : id $id inconnu de " . "$senderName :\n«$mess»"); } } elsif ($type == BYE) { #print "reception d'un bye\n"; _removeFileDescriptor ($appSock); # B Y E } elsif ($type == REGEXP) { # R E G E X P # on ajoute une fonction traitant la regexp et envoyant le # message sur le bon fd dans la liste des fonctions de filtrage # ca permet de compiler les regexp avec 'once' donc une # fois pour toute, et ainsi optimiser la vitesse de # filtrage des messages a envoyer next if _toBePruned ($senderName, $valeurs); unless (defined $sendRegList{$appSock}->[$id]) { # si l'id de regexp n'etait pas utilisee c'est tout bon # on affecte la nouvelle regexp a un id $sendRegList{$appSock}->[$id] = sub { if (my @args = ${$_[0]} =~ /($valeurs)/) { shift @args; $args[$#args] .= "\003" if @args; send ($appSock, sprintf (MSG_FMT, MSG, $id, join ("\003",@args)), 0) or _removeFileDescriptor ($appSock) ; return 1; # print join (' ', "DBG > J'envoie MSG", $id, @args, "\n"); } }; } else { # l'id de la regexp etait deja utilise, # et n'a pas ete libere par un message delregexp, # on renvoie donc un message d'erreur _sendErrorTo ($appSock, "ID $id deja utilisee"); } } elsif ($type == ERROR) { # E R R O R warn ("ERREUR de $senderName : «$valeurs»\n") } elsif ($type == DELREGEXP) { # D E L R E G E X P # on vire la regexp des regexps vefifier $sendRegList{$appSock}->[$id] = undef ; } elsif ($type == ENDREGEXP) { # E N D R E G E X P # on envoie le message ready uniquement a celui qui nous # a envoye le message endregexp _sendMsgTo ($appSock, $messWhenReady); # on passe de l'etat Connecte a l'etat Ready $cnnxion{"$addr:$peerPort"} =~ s/^\004//; my $apName = $cnnxion{"$addr:$peerPort"}; unless (exists $appliList{$apName}) { $appliList{$apName} = [$appSock]; } else { push @{$appliList{$apName}}, $appSock; } _scanConnStatus (); } elsif ($type == APP_NAME) { # etat Connecte if ($appName eq $valeurs){ warn "\033[1mATTENTION : une instance de $appName ". "existe deja\033[m\n" ; } $senderName = $cnnxion{"$addr:$peerPort"} = "\004$valeurs"; } elsif ($type == DIRECT_MSG) { if (defined $directCbList[$id]) { my @cb = @{$directCbList[$id]}; my $cb = shift @cb; &$cb (@cb, $valeurs); } else { _sendErrorTo ($appSock, "DIRECT ID $id inconnue"); warn "reception d'un message DIRECT d'id $id inconnue de " . "$senderName :\n«$mess»"; } } elsif ($type == DIE) { # il faut quitter # on commence par appeler la callback de fin my @cb = @{$onDieFunc}; my $cb = shift @cb; &$cb (@cb); # on avertit les autres qu'on se barre my $adr = _inetAdrByName ($senderName) ; warn "reception d'un ordre de suicide de $senderName ($adr)". "... exiting\n"; stop (); # adios exit (); } elsif ($type == PING) { # si on recois un ping, on envoie un pong _pong ($appSock); } elsif ($type == PONG) { return PONG; } else { _sendErrorTo ($appSock, "TYPE DE MESS $type inconnu"); warn ("reception d'un message de type $type inconnu de " . "$senderName :\n«$mess»"); } } return 0; } ############### PROCEDURE SEND WANTED REGEXP sub _sendWantedRegexp ($) { my $appSock = shift; # on envoie le message "Nom appli" send ($appSock, sprintf (MSG_FMT, APP_NAME, 0, $appName), 0) or _removeFileDescriptor ($appSock) ; # on envoie les regexps for (my $id = 0; $id <= $#recCbList; $id++) { next unless defined $recCbList[$id] ; send ($appSock , sprintf (MSG_FMT, REGEXP, $id, $recCbList[$id]->[0]), 0) or _removeFileDescriptor ($appSock) ; # print sprintf ("DBG > %s %d %s\n", # 'REGEXP', $id, $recCbList[$id]->[0]); } # on envoie le message de fin d'envoi de regexps send ($appSock, sprintf (MSG_FMT, ENDREGEXP, 0, ""), 0) or _removeFileDescriptor ($appSock) ; } ############### PROCEDURE SEND LAST REGEXP TO ALLREADY CONNECTED sub _sendLastRegexpToAllreadyConnected ($) { my $id = shift; foreach my $fd (values %sockList) { send ($fd, sprintf (MSG_FMT, REGEXP, $id, $recCbList[$id]->[0]), 0) or _removeFileDescriptor ($fd) ; } } ############### PROCEDURE INET ADR BY NAME sub _inetAdrByName ($) { my $appName = shift; my $addrInet = (grep ($cnnxion{$_} eq $appName, keys %cnnxion))[0]; return ("unknow") unless defined $addrInet; my ($addr,$port) = $addrInet =~ /(.{4}):(.*)/; my $host = (gethostbyaddr ($addr, AF_INET))[0] ; return "$host:$port"; } ############### PROCEDURE REMOVE FILE DESCRIPTOR sub _removeFileDescriptor ($) { my $fd = shift; my $diedAppName; # on vire ce fd des fd a scruter dans la bcle d'evenements &$fileEventFunc ($fd, '') ; # on clos la connection delete $sendRegList{$fd}; delete $sockList{$fd}; close ($fd); EXT_LOOP: foreach my $name (keys %appliList) { foreach my $fdp (@{$appliList{$name}}) { if ($fd eq $fdp) { $diedAppName = $name; @{$appliList{$name}} = grep ($_ ne $fdp, @{$appliList{$name}}); last EXT_LOOP; } } } unless (defined $diedAppName) { print "_removeFileDescriptor : deconnection de NONAME\n" ; return; } my $addrInet = (grep ($cnnxion{$_} eq $diedAppName, keys %cnnxion))[0]; unless (defined $addrInet) { die "ERREUR _removeFileDescriptor deconnection de $diedAppName ". "addrInet not defined\n"; return; } #printf "deconnection de %s ($diedAppName)\n", _inetAdrByName ($diedAppName); delete $cnnxion{$addrInet}; # on vire l'entree correspondant a ce canal dans la liste des # regexps par canal _scanConnStatus () ; } ############### PROCEDURE SEND ERROR TO sub _sendErrorTo ($$) { my ($fd, $error) = @_; send ($fd, join (' ', ERROR, "0\002$error\n"), 0) or _removeFileDescriptor ($fd); } ############### PROCEDURE PONG sub _pong ($) { my $fd = shift; send ($fd, join (' ', PONG, "0\002 \n"), 0) or _removeFileDescriptor ($fd); } ############### PROCEDURE SEND ERROR TO sub _sendDieTo ($) { my $fd = shift; send ($fd, join (' ', DIE, "0\002\n"), 0) or _removeFileDescriptor ($fd); } ############### PROCEDURE SEND MSG TO sub _sendMsgTo ($$) { my ($fd, $msg) = @_; # pour toutes les fonctions de filtrage de regexp foreach my $regexpFunc (@{$sendRegList{$fd}}) { &{$regexpFunc} (\$msg) if defined $regexpFunc; } } ############### PROCEDURE TK FILE EVENT sub _tkFileEvent ($$) { my ($fd, $cb) = @_; Tk::fileevent ('', $fd, 'readable', $cb) ; } ############### PROCEDURE BUS FILE EVENT sub fileEvent ($$) { my ($fd, $cb) = @_; if ($cb) { # adding the handler $localBindByHandle{$fd} = $cb; $localLoopSel->add ($fd); } else { # deleting the handler delete $localBindByHandle{$fd}; $localLoopSel->remove ($fd); } } ############### PROCEDURE SCAN AFTER sub _scanAfter () { my $stamp = timeofday (); foreach my $afk (keys %afterList) { my $af = $afterList{$afk}; if ($af->[2] <= $stamp) { # on traite : le temps de declencher le cb est arrive if (ref $af->[3] eq 'CODE') { &{$af->[3]}; } else { my ($cb, @args) = @{$af->[3]}; &$cb (@args); } # si c'est un repeat on le reconduit if ($af->[0]) { $af->[2] = $stamp + $af->[1] ; } else { # si c'est un after on le vire afterCancel ($afk); } } } } ############### PROCEDURE SCAN CONN STATUS sub _scanConnStatus () { my (%readyApp, @nonReadyApp); foreach (values %cnnxion) { $readyApp{$_} = 1 unless /^\004/; # connecte mais pas ready } foreach (@neededApp) { push (@nonReadyApp, $_) unless exists $readyApp{$_}; } &$statusFunc ([keys %readyApp], \@nonReadyApp); } ############### PROCEDURE TO BE PRUNED sub _toBePruned ($$) { my ($from, $regexp) = @_; # si il n'y a pas de liste de sujets, on ne # filtre pas return 0 unless @topicRegexps; unless ($regexp =~ /^\^/) { #print "DBG> regexp non ANCREE de $from : $regexp\n"; return (0); } if ($regexp =~ /^\^(\w+)/) { my $topic = $1; if (grep (/$topic/, @topicRegexps)) { # on a trouve ce topic : on ne filtre pas la regexp #print "DBG> on garde de $from : $regexp\n"; return (0); } #print "DBG> on ELIMINE de $from : $regexp\n"; return (1); } else { #print "DBG> on garde de $from : $regexp\n"; return (0); } } 1;