From 0d266d5b023baf08554ac528ee70d5a4edffe110 Mon Sep 17 00:00:00 2001 From: damiano Date: Fri, 4 Sep 1998 14:23:41 +0000 Subject: Initial revision --- Ivy.pm | 1026 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 1026 insertions(+) create mode 100644 Ivy.pm (limited to 'Ivy.pm') diff --git a/Ivy.pm b/Ivy.pm new file mode 100644 index 0000000..76ebcb8 --- /dev/null +++ b/Ivy.pm @@ -0,0 +1,1026 @@ +package Bus ; +# +# +# +# +# +# +# +# +require 5.004 ; + +use Sys::Hostname; +use IO::Socket; +use strict ; +use Time::Gettimeofday ; + +############################################################################# +#### PROTOTYPES ##### +############################################################################# +sub start (@); # debut de l'integration au bus : + # - cree la socket d'application, recupere le no de port + # - cree la socket supervision + # - envoie le "no de port" + # - bind le file descriptor de la socket de supervision + # a la fonction getBonjour pour traiter les bonjours + # - bind le fd de connection sur la fonction + # getConnections + # pour etablir les connections "application" + +sub stop (); # - envoie un BYE et clos les connections + +sub bindRegexp ($$) ; # permet d'associer une regexp avec un callBack + # ou d'annuler une precedente association + +sub bindDirect ($$); # permet d'associer un identifiant de msg direct + # avec une fonction de callBack, ou de l'annuler + +sub sendMsgs (@) ; # envoie une liste de messages +sub sendAppNameMsgs (@) ; # envoie une liste de messages precedes + # du nom de l'application +sub sendDirectMsgs ($$@); # envoie une liste de messages directs a une appli +sub sendDieTo ($); # envoie un <> a une appli +sub ping ($$); # teste qu'une appli soit encore vivante +sub mainLoop (); # la mainloop locale (sans tk) +sub after ($$); # temps en millisecondes, callback +sub repeat ($$); # temps en millisecondes, callback +sub fileEvent ($$); # associe un fd a un callback pour la mainloop locale +sub pruneRegexp (@); # optimisation : si l'on connait les sujets des messages + # qu'on envoie, on appelle cette fonction avec une + # liste de sujets, et les regexps qui ne matchent pas + # ce sujet sont eliminees Cette fonction doit etre + # appelee avant le Bus::start + +################ PRIVEE #################################################### +sub _getBonjour (); # lit le (ou les) bonjour(s) sur le canal de supervision + # et se connecte, verifie qu'il ne se reponds pas lui + # meme, ni qu'il ne repond pas a un service deja connecte + +sub _getConnections (); # est appele lors d'une demande de connection : + # accepte la connection et mets a jour @sendRegList + # rajoute le fd du canal aux fd a scruter dans la + # boucle d'evenements + +sub _getMessages ($); # est appele lorqu'un message arrive + +sub _sendWantedRegexp ($); # envoie les regexp a l'appli distante + +sub _sendLastRegexpToAllreadyConnected ($) ; # envoie la derniere regexp + # pushee dans @recCbList + # a toutes les applis deja + # connectees +sub _removeFileDescriptor ($); # on vire un fd et les structures associees +sub _sendErrorTo ($$); #(fd, error) envoie un message d'erreur a un fd +sub _sendDieTo ($); #(fd) envoie un message de demande de suicide a un fd +sub _sendMsgTo ($$); # (fd, message) +sub _pong ($); # (fd) +sub _tkFileEvent ($$); # associe un fd a un callback pour la mainloop tk +sub _scanAfter () ; # parse si il faut appeler un callback associe a un after +sub _myCanRead (); # interface au select +sub _scanConnStatus (); # verifie les connections effectuees et + # appelle la fonction $statusFunc +sub _inetAdrByName ($); # transforme une adresse inet native en chaine $host:$port +sub _toBePruned ($$); +############################################################################# +#### CONSTANTES ##### +############################################################################# +use constant VERSION => 3; +use constant MSG_FMT => "%d %d\002%s\n"; + +# On plante en dur les adresses reseau sur lesquelles on broadcaste +# le bonjour, ATTENTION, ca PLANTERA au SITEF !! +use constant BROADCAST_ADDRS => ("143.196.1.255", "143.196.2.255") ; + +use constant BYE => 0; +use constant REGEXP => 1; +use constant MSG => 2; +use constant ERROR => 3; +use constant DELREGEXP => 4; +use constant ENDREGEXP => 5; +use constant APP_NAME => 6; +use constant DIRECT_MSG => 7; +use constant DIE => 8; +use constant PING => 9; +use constant PONG => 10; + + +use constant AFTER => 0; +use constant REPEAT => 1; +############################################################################# +#### VARIABLES GLOBALES ##### +############################################################################# +my $messWhenReady; # message a envoyer a un canal lorsqu'on + # a recu le message endregexp. + +my $localAddr; # adresse de la machine locale + +my $servPort; # No de port tcp du serveur + +my $appName; # Nom de l'appli locale + +my @neededApp; # liste des applis necessaires a l'appli locale + +my $statusFunc; # callback prenant en param 2 refs sur des listes : + # [applis presentes, appli absentes] + # cette fonction est appelee : + # - tout les pollingTime tant que toutes les applis + # ne sont pas presentes + # - des que toutes les applis sont presentes + # - lorsqu'une appli se deconnecte + +my $onDieFunc; # callback prenant en param 1 refs sur une liste : + # [ref sur fonction, parametres] + +my $supSock ; # socket de supervision en lecture/ecriture +my $connSock; # socket de connexion tcp + +my %sockList = (); # tab ass : nom du fd => fd +my %appliList = (); # tab ass : nom de l'appli => fd + +my %sendRegList = (); # tableau ass de liste du type + # sockId => [fonction, fonction, ...] + # pour savoir quoi envoyer a qui + # les fonctions anonymes sont compilees + # dynamiquement a la reception des messages REGEXP + # et filtrent les mess a envoyer et les envoient + # au besoin + +my @topicRegexps = (); # liste des topics qu'on envoie si on + # les regexps + +my @recCbList = (); # liste de ref sur des couples + # (regexp,callBack) les callbacks + # sont appeles lors de + # la reception de messages en fonction + # du numero de regexp. + +my @directCbList = (); # liste de callBack pour les messages directs + +my %cnnxion = (); # tableau ass : clef = nom:numero_de port + # permet de verifier qu'on ne se connecte pas + # sur nous meme et qu'on ne se reconnecte + # pas sur un service en cas de bonjours repetes + # valeur : nom de l'application + +my %buffByConn = (); # tableau associatif, clef => file desc, + # valeur :buffer au cas ou la lacture ne se termine + # pas par \n + +my $fileEventFunc; # pointeur sur la fonction permettant d'associer + # des callbacks a un file desc, (ainsi que de + # les enlever) + + +my $localLoopSel; # dans le cas ou l'on soit dans une mainLoop + # locale, cette var pointe une un objet + # de type IO::Select; + +my %localBindByHandle; # table d'ass. handle -> callback +my %afterList=(); # tableau d'ass [AFTER ou REPEAT, + # timeTotal, deadLine, [callback, arg, arg, ...]] + +my $afterId = 0; # + +my $smallestTimout = 10000; # timeout le plus petit pour le select + +my $maxInstanceOfApp ; # nombre max d'instances de l'appli utilisant + # le bus, si il y a deja $maxInstanceOfApp + # instances on sort. + +BEGIN {$SIG{'PIPE'} = sub {warn "broken pipe, ignoring ...\n";}} + +############################################################################# +#### PROCEDURES PUBLIQUES ##### +############################################################################# + + + +############### PROCEDURE BUS START +sub start (@) +{ + my %options = @_; + my %defaultOptions = ( #PARAMETRES OBLIGATOIRES + -loopMode => undef, + # TK ou LOCAL + + -broadcastPort => $ENV{"BUS"}, + # No de port UDP Si non specifie la variable + # d'environnement BUS sera examinee + + -appName => undef, + # nom de l'appli + + -messWhenReady => "_APP NAME READY", + # message de synchro a envoyer quand pret + + # PARAMETRES FACULTATIFS (avec valeurs par defaut) + -neededApp => [], + # liste des appplis necessaires + + -statusFunc => sub {}, + # fonction de callBack qui sera appelee tant que + # toutes les applis necessaires ne sont pas presentes, + # et des que toutes les applis necessaires sont + # presentes, et si une appli necessaire se deconnecte + # les deux parametres passes sont : + # [liste des applis presentes], + # [liste des applis absentes] + + -onDieFunc => [sub {}], + # fonction de cb appelee lorsque l'appli a recu l'ordre + # de quitter, on peut dans ce callback fermer + # proprement les ressources avant de sortir. + # ps : ne pas fasire d'exit dans le callback, + # c'est le bus qui s'en charge + ) ; + + + foreach my $opt (keys %defaultOptions) { + # si un parametre n'a pas ete defini + next if exists $options{$opt} ; + # est-il facultatif + if (defined $defaultOptions{$opt}) { + $options{$opt} = $defaultOptions{$opt} ; + } else { + # parametre obligatoire + die "ERREUR Bus::start vous devez specifier ". + "l'option $opt\n"; + } + } + + foreach my $opt (keys %options) { + die "ERREUR Bus::start option $opt inconnue\n" unless + exists ($defaultOptions{$opt}); + } + + my $loopMode = $options{-loopMode}; + my $broadcastPort = $options{-broadcastPort} ; + $appName = $options{-appName} ; + $messWhenReady = $options{-messWhenReady} eq "_APP NAME READY" ? + "$appName READY" : $options{-messWhenReady}; + @neededApp = @{$options{-neededApp}} ; + $statusFunc = $options{-statusFunc} ; + $onDieFunc = $options{-onDieFunc} ; + + if ($loopMode =~ /local/i) { + # mode boucle d'evenement locale + use IO::Select; + $fileEventFunc = \&fileEvent ; + $localLoopSel = IO::Select->new (); + } elsif ($loopMode =~ /tk/i) { + # mode boucle d'evenement de TK + $fileEventFunc = \&_tkFileEvent ; + } else { + die "le premier argument (mainloop mode) doit etre TK ou LOCAL\n"; + } + + my $hostAddr = (gethostbyname (hostname()))[4] ; + + # cree la socket de connexion, recupere le no de port + $connSock = IO::Socket::INET->new(Listen => 128, + Proto => 'tcp', + Reuse => 1) ; + $cnnxion{"$hostAddr:". $connSock->sockport} = "\004"; + + + $supSock = IO::Socket::INET->new(LocalPort => $broadcastPort, + Proto => 'udp', + Type => SOCK_DGRAM, + Reuse => 1); + + $supSock->sockopt (SO_BROADCAST, 1); + + + # envoie le bonjour : "no de version no de port" + my $bonjourMsg = sprintf ("%d %d\n", VERSION, $connSock->sockport()); + + + foreach my $netAddr (BROADCAST_ADDRS) { + my $netAddrInet = inet_aton ($netAddr); + my $netBroadcastAddr = pack_sockaddr_in ($broadcastPort, $netAddrInet); + send ($supSock, $bonjourMsg, 0, $netBroadcastAddr) or + warn "Bus::start envoi du bonjour a echoue sur $netAddr : $!\n"; + } + # callback pour traiter la reception des bonjours + &$fileEventFunc ($supSock, \&_getBonjour) ; + + # callback pour traiter les demandes de cxion + &$fileEventFunc ($connSock, \&_getConnections) ; +} + + +############### PROCEDURE BIND REGEXP +sub bindRegexp ($$) +{ + my ($regexp, $cb) = @_; + + if ($cb) { + my $id; + # on rajoute le couple $regexp, $cb dans la liste des messages + # qu'on prend + + # on commence par tester si on a un id libere dans le tableau + for ($id=0; $id <= ($#recCbList+1); $id++) { + last unless (defined $recCbList[$id]) && @{$recCbList[$id]->[1]}; + } + $recCbList[$id] = [$regexp, $cb]; + + # on envoie les messages regexps aux processus deja connectes + _sendLastRegexpToAllreadyConnected ($id) ; + } else { + # on vire le callback, et on se desabonne de cette regexp + for (my $id=0; $id <= $#recCbList; $id++) { + next unless (defined $recCbList[$id]) && @{$recCbList[$id]->[1]}; + if ($recCbList[$id]->[0] eq $regexp) { + $recCbList[$id]->[1] = []; + # on envoie le mesage delregexp + foreach my $fd (values %sockList) { + send ($fd, sprintf (MSG_FMT, DELREGEXP, $id), 0) + or _removeFileDescriptor ($fd); + } + } + } + } +} + +############### PROCEDURE BIND REGEXP +sub bindDirect ($$) +{ + my ($id, $cb) = @_; + + if ($cb) { + # on rajoute la $cb dans la liste des messages + # qu'on prend + $directCbList[$id] = $cb; + } else { + # on vire le callback + undef $directCbList[$id]; + } +} + + + +############### PROCEDURE SEND MSGS +sub sendMsgs (@) +{ + my @msgs = @_; + my $total = 0; + # pour tous les messages + foreach my $msg (@msgs) { + study ($msg); + + # pour routes les connections + foreach my $fd (keys %sockList) { + + # pour toutes les fonctions de filtrage de regexp + foreach my $regexpFunc (@{$sendRegList{$fd}}) { + $total += &{$regexpFunc} (\$msg) if defined $regexpFunc; + } + } + } +# print "DBG> sended $total times\n"; + return $total; +} + +############### PROCEDURE SEND MSGS +sub sendAppNameMsgs (@) +{ + my @msgs = @_; + my $total = 0; + # pour tous les messages + foreach (@msgs) { + my $msg = "$appName $_"; + study ($msg); + + # pour routes les connections + foreach my $fd (keys %sockList) { + + # pour toutes les fonctions de filtrage de regexp + foreach my $regexpFunc (@{$sendRegList{$fd}}) { + $total += &{$regexpFunc} (\$msg) if defined $regexpFunc; + } + } + } +# print "DBG> sended $total times\n"; + return $total; +} + + + +############### PROCEDURE SEND DIRECT MSGS +sub sendDirectMsgs ($$@) +{ + my ($to, $id, @msgs) = @_; + + if (defined ($appliList{$to})) { + my @fds = @{$appliList{$to}}; + # pour tous les messages + foreach my $msg (@msgs) { + foreach my $fd (@fds) { + send ($fd, sprintf (MSG_FMT, DIRECT_MSG, $id, "$msg"), 0) + or _removeFileDescriptor ($fd); + } + } + return 1; + } else { + warn "Bus::sendDirectMsgs appli $to inconnue\n"; + return 0; + } +} + + +############### PROCEDURE SEND DIE TO +sub sendDieTo ($) +{ + my $to = shift; + + if (defined ($appliList{$to})) { + my @fds = @{$appliList{$to}}; + # pour tous les messages + foreach my $fd (@fds) { + _sendDieTo ($fd); + } + return 1; + } else { + warn "Bus::sendDieTo appli $to inconnue\n"; + return 0; + } +} + + +############### PROCEDURE PING +sub ping ($$) +{ + my ($to, $timeout) = @_; + + if (defined ($appliList{$to})) { + my @fds = @{$appliList{$to}}; + # pour tous les messages + foreach my $fd (@fds) { + send ($fd, sprintf (MSG_FMT, PING, 0, " "), 0) + or _removeFileDescriptor ($fd); + } + } +} + + +############### PROCEDURE BUS STOP +sub stop () +{ + # pour routes les connections + foreach my $fd (values %sockList) { + send ($fd, sprintf (MSG_FMT, BYE, 0, ""), 0) + or _removeFileDescriptor ($fd); + } +} + + +############### PROCEDURE MAINLOOP +sub mainLoop () +{ + for (;;) { + my @ready = $localLoopSel->can_read ($smallestTimout) ; + _scanAfter () ; + foreach my $fd (@ready) { + if (ref $localBindByHandle{$fd} eq 'CODE') { + &{$localBindByHandle{$fd}} ; + } else { + my ($cb, @arg) = @{$localBindByHandle{$fd}} ; + &$cb (@arg) + } + } + } +} + + +############### PROCEDURE AFTER +sub after ($$) +{ + my ($timeAfter, $cbListRef) = @_; + $timeAfter /= 1000; + $smallestTimout = $timeAfter if $timeAfter < $smallestTimout; + + # si la valeur de timout est negative : c'est un after sinon + # c'est un repeat + $afterList{++$afterId} = [AFTER, $timeAfter, + timeofday()+$timeAfter, $cbListRef]; + + return ($afterId); +} + +############### PROCEDURE AFTER +sub repeat ($$) +{ + # on passe le temps en secondes pour le select + my ($timeAfter, $cbListRef) = @_; + $timeAfter /= 1000; + $smallestTimout = $timeAfter if $timeAfter < $smallestTimout; + + $afterList{++$afterId}= [REPEAT, $timeAfter, timeofday()+$timeAfter, + $cbListRef]; + return ($afterId); +} + +############### PROCEDURE AFTER CANCEL +sub afterCancel ($) +{ + my $id = shift; + + if (defined ($id) && defined $afterList{$id}) { + if ($afterList{$id}->[1] <= $smallestTimout) { + delete $afterList{$id} ; + # le timout de l'after/repeat etait le plus petit des timout + # on cherche donc le plus petit parmi ceux qui restent; + $smallestTimout = 10000; + foreach my $af (values %afterList) { + $smallestTimout = $af->[1] if $af->[1] < $smallestTimout ; + } + } else { + delete $afterList{$id} ; + } + } +} + +sub pruneRegexp (@) +{ + @topicRegexps = @_; +} + +############################################################################# +#### PROCEDURES PRIVEE ##### +############################################################################# + + +############### PROCEDURE GET BONJOUR +sub _getBonjour () +{ + my $bonjourMsg = ''; + + # l'hote distant + my $inetAddr = $supSock->recv ($bonjourMsg, 1024, 0); + unless (length $inetAddr) { + warn "recv error, bonjour non traite\n"; + return; + } + my $addr = (unpack_sockaddr_in ($inetAddr))[1]; + my $peerName = gethostbyaddr ($addr, AF_INET); + + # on force $peerPort a etre vu comme une valeur numerique + my ($version, $peerPort) = $bonjourMsg =~ /^(\d+)\s+(\d+)/; + + unless (defined ($version) && defined ($peerPort)) { + warn "ERREUR format du message bonjour incorrect\n". + "message = $bonjourMsg\n" ; + return; + } + if ($version != VERSION) { + warn "ERREUR VERSION: demande de connexion de $peerName\n". + "version courrante : " . VERSION . ", recue : $version\n" ; + return; + } + + # on verifie qu'on ne se repond pas et qu'on ne + # ne reconnecte pas a un process deja connecte + if (exists ($cnnxion{"$addr:$peerPort"})) { + #print "DBG> : bonjour de $peerName:$peerPort : DEJA CONNECTE\n" ; + return ; + } else { + #print "DBG> : reception de $peerName : bonjour $peerPort\n" ; + } + $cnnxion{"$addr:$peerPort"} = 1; + + # ouverture du canal de communication + my $appSock = IO::Socket::INET->new (PeerAddr => $peerName, + PeerPort => $peerPort, + Proto => 'tcp'); + + if ($appSock) { + # on cree une entree pour $appSock dans la liste des regexp + $sendRegList{$appSock} = []; + $buffByConn{$appSock} = ''; + $sockList{$appSock} = $appSock; + &$fileEventFunc ($appSock, [\&_getMessages, $appSock]) ; + + # on balance les regexps qui nous interessent a l'appli distante + _sendWantedRegexp ($appSock); + } else { + warn "Bus::_getBonjour ERROR cannot connect to server " . + "$peerName:$peerPort\n" ; + } +} + + +############### PROCEDURE GET CONNECTIONS +sub _getConnections () +{ + my $appSock = $connSock->accept(); + + unless (defined $appSock) { + warn "ERROR _getConnections, \$appSock not defined\n"; + return; + } else { + #print sprintf ("accepting connection from %s:%d\n", + # (gethostbyaddr ($appSock->peeraddr(),AF_INET))[0], + # $appSock->peerport()); + } + + # callback pour traiter la reception des messages + &$fileEventFunc ($appSock, [\&_getMessages, $appSock]) ; + + # on cree une entree pour $appSock dans la liste des regexp + $sendRegList{$appSock} = []; + $buffByConn{$appSock} = ''; + $sockList{$appSock} = $appSock; + # on balance les regexps qui nous interessent a l'appli distante + _sendWantedRegexp ($appSock); +} + + +############### PROCEDURE GET MESSAGES +sub _getMessages ($) +{ + my $appSock = shift; + my $mess; + my $buffer = ''; + my ($addr, $peerPort, $senderName); + + # on recupere le message + recv ($appSock, $buffer, 2048, 0) ; + unless (length $buffer) { + # message null : broken pipe, ça s'est deconnecte a l'autre bout + # on vire ce fd de la boucle d'evenements + _removeFileDescriptor ($appSock); + return; + } + + if (length ($buffByConn{$appSock})) { + $buffer = $buffByConn{$appSock} . $buffer ; + $buffByConn{$appSock} = ''; + } + my @messages = split ('\n', $buffer) ; + $buffByConn{$appSock} = pop (@messages) unless ($buffer =~ /\n$/) ; + +# if (defined $appSock->peername) { + $addr = $appSock->peeraddr(); + $peerPort = $appSock->peerport() ; + $senderName = $cnnxion{"$addr:$peerPort"} ; + $senderName = "NONAME" unless $senderName; + +# } else { +# warn "\n\nBIG PB : appSock->peername undefined \n\n"; +# } + + foreach my $mess (@messages) { +# print "DBG>mess from $senderName *$mess*\n"; + + # on recupere les 3 champs : le type, le numero de regexp, les valeurs + my ($type, $id, $valeurs) = $mess =~ /^(\d+) + \s+ + (\d+) + \002 + (.*)/x ; + + # si ca a chie on rale + (warn "malformated message $mess\n" and return) unless + defined $type ; + + # sinon on fait en fonction du type de message + if ($type == MSG) { # M S G + # on recupere le couple call back, regexp correspondant + # a l'identifiant et on appelle la fonction avec les parametres + # traites par la regexp + if (my @cb = @{$recCbList[$id]->[1]}) { + my $cb = shift @cb; + # on split sur ETX + &$cb ($senderName, @cb, split ("\003", $valeurs)) ; + } else { + #_sendErrorTo ($appSock, "REEGXP ID $id inconnue"); + warn ("reception d'un message MSG : id $id inconnu de " . + "$senderName :\n«$mess»"); + } + } elsif ($type == BYE) { + #print "reception d'un bye\n"; + _removeFileDescriptor ($appSock); # B Y E + } elsif ($type == REGEXP) { # R E G E X P + # on ajoute une fonction traitant la regexp et envoyant le + # message sur le bon fd dans la liste des fonctions de filtrage + # ca permet de compiler les regexp avec 'once' donc une + # fois pour toute, et ainsi optimiser la vitesse de + # filtrage des messages a envoyer + next if _toBePruned ($senderName, $valeurs); + unless (defined $sendRegList{$appSock}->[$id]) { + # si l'id de regexp n'etait pas utilisee c'est tout bon + # on affecte la nouvelle regexp a un id + $sendRegList{$appSock}->[$id] = sub { + if (my @args = ${$_[0]} =~ /($valeurs)/) { + shift @args; + $args[$#args] .= "\003" if @args; + send ($appSock, sprintf (MSG_FMT, + MSG, $id, join ("\003",@args)), 0) + or _removeFileDescriptor ($appSock) ; + return 1; +# print join (' ', "DBG > J'envoie MSG", $id, @args, "\n"); + } + }; + } else { + # l'id de la regexp etait deja utilise, + # et n'a pas ete libere par un message delregexp, + # on renvoie donc un message d'erreur + _sendErrorTo ($appSock, "ID $id deja utilisee"); + } + } elsif ($type == ERROR) { # E R R O R + warn ("ERREUR de $senderName : «$valeurs»\n") + } elsif ($type == DELREGEXP) { # D E L R E G E X P + # on vire la regexp des regexps vefifier + $sendRegList{$appSock}->[$id] = undef ; + } elsif ($type == ENDREGEXP) { # E N D R E G E X P + # on envoie le message ready uniquement a celui qui nous + # a envoye le message endregexp + _sendMsgTo ($appSock, $messWhenReady); + # on passe de l'etat Connecte a l'etat Ready + $cnnxion{"$addr:$peerPort"} =~ s/^\004//; + my $apName = $cnnxion{"$addr:$peerPort"}; + unless (exists $appliList{$apName}) { + $appliList{$apName} = [$appSock]; + } else { + push @{$appliList{$apName}}, $appSock; + } + _scanConnStatus (); + } elsif ($type == APP_NAME) { + # etat Connecte + if ($appName eq $valeurs){ + warn "\033[1mATTENTION : une instance de $appName ". + "existe deja\033[m\n" ; + } + $senderName = $cnnxion{"$addr:$peerPort"} = "\004$valeurs"; + } elsif ($type == DIRECT_MSG) { + if (defined $directCbList[$id]) { + my @cb = @{$directCbList[$id]}; + my $cb = shift @cb; + &$cb (@cb, $valeurs); + } else { + _sendErrorTo ($appSock, "DIRECT ID $id inconnue"); + warn "reception d'un message DIRECT d'id $id inconnue de " . "$senderName :\n«$mess»"; + } + } elsif ($type == DIE) { + # il faut quitter + # on commence par appeler la callback de fin + my @cb = @{$onDieFunc}; + my $cb = shift @cb; + &$cb (@cb); + # on avertit les autres qu'on se barre + my $adr = _inetAdrByName ($senderName) ; + warn "reception d'un ordre de suicide de $senderName ($adr)". + "... exiting\n"; + stop (); + # adios + exit (); + } elsif ($type == PING) { + # si on recois un ping, on envoie un pong + _pong ($appSock); + } elsif ($type == PONG) { + return PONG; + } else { + _sendErrorTo ($appSock, "TYPE DE MESS $type inconnu"); + warn ("reception d'un message de type $type inconnu de " . + "$senderName :\n«$mess»"); + } + } +return 0; +} + +############### PROCEDURE SEND WANTED REGEXP +sub _sendWantedRegexp ($) +{ + my $appSock = shift; + + # on envoie le message "Nom appli" + send ($appSock, sprintf (MSG_FMT, APP_NAME, 0, $appName), 0) + or _removeFileDescriptor ($appSock) ; + + # on envoie les regexps + for (my $id = 0; $id <= $#recCbList; $id++) { + next unless defined $recCbList[$id] ; + send ($appSock , + sprintf (MSG_FMT, REGEXP, $id, $recCbList[$id]->[0]), + 0) or _removeFileDescriptor ($appSock) ; +# print sprintf ("DBG > %s %d %s\n", +# 'REGEXP', $id, $recCbList[$id]->[0]); + } + # on envoie le message de fin d'envoi de regexps + send ($appSock, sprintf (MSG_FMT, ENDREGEXP, 0, ""), 0) + or _removeFileDescriptor ($appSock) ; +} + +############### PROCEDURE SEND LAST REGEXP TO ALLREADY CONNECTED +sub _sendLastRegexpToAllreadyConnected ($) +{ + my $id = shift; + foreach my $fd (values %sockList) { + send ($fd, sprintf (MSG_FMT, REGEXP, $id, $recCbList[$id]->[0]), + 0) or _removeFileDescriptor ($fd) ; + } +} + +############### PROCEDURE INET ADR BY NAME +sub _inetAdrByName ($) { + my $appName = shift; + + my $addrInet = (grep ($cnnxion{$_} eq $appName, keys %cnnxion))[0]; + + return ("unknow") unless defined $addrInet; + my ($addr,$port) = $addrInet =~ /(.{4}):(.*)/; + + my $host = (gethostbyaddr ($addr, AF_INET))[0] ; + return "$host:$port"; +} + + +############### PROCEDURE REMOVE FILE DESCRIPTOR +sub _removeFileDescriptor ($) +{ + my $fd = shift; + my $diedAppName; + # on vire ce fd des fd a scruter dans la bcle d'evenements + &$fileEventFunc ($fd, '') ; + # on clos la connection + delete $sendRegList{$fd}; + delete $sockList{$fd}; + close ($fd); + + EXT_LOOP: + foreach my $name (keys %appliList) { + foreach my $fdp (@{$appliList{$name}}) { + if ($fd eq $fdp) { + $diedAppName = $name; + @{$appliList{$name}} = grep ($_ ne $fdp, @{$appliList{$name}}); + last EXT_LOOP; + } + } + } + + unless (defined $diedAppName) { + print "_removeFileDescriptor : deconnection de NONAME\n" ; + return; + } + + my $addrInet = (grep ($cnnxion{$_} eq $diedAppName, keys %cnnxion))[0]; + unless (defined $addrInet) { + die "ERREUR _removeFileDescriptor deconnection de $diedAppName ". + "addrInet not defined\n"; + return; + } + #printf "deconnection de %s ($diedAppName)\n", _inetAdrByName ($diedAppName); + delete $cnnxion{$addrInet}; + + # on vire l'entree correspondant a ce canal dans la liste des + # regexps par canal + _scanConnStatus () ; +} + + +############### PROCEDURE SEND ERROR TO +sub _sendErrorTo ($$) +{ + my ($fd, $error) = @_; + + send ($fd, join (' ', ERROR, "0\002$error\n"), 0) + or _removeFileDescriptor ($fd); +} + + +############### PROCEDURE PONG +sub _pong ($) +{ + my $fd = shift; + + send ($fd, join (' ', PONG, "0\002 \n"), 0) + or _removeFileDescriptor ($fd); +} + + +############### PROCEDURE SEND ERROR TO +sub _sendDieTo ($) +{ + my $fd = shift; + + send ($fd, join (' ', DIE, "0\002\n"), 0) + or _removeFileDescriptor ($fd); +} + + +############### PROCEDURE SEND MSG TO +sub _sendMsgTo ($$) +{ + my ($fd, $msg) = @_; + + # pour toutes les fonctions de filtrage de regexp + foreach my $regexpFunc (@{$sendRegList{$fd}}) { + &{$regexpFunc} (\$msg) if defined $regexpFunc; + } +} + + +############### PROCEDURE TK FILE EVENT +sub _tkFileEvent ($$) +{ + my ($fd, $cb) = @_; + + Tk::fileevent ('', $fd, 'readable', $cb) ; +} + + +############### PROCEDURE BUS FILE EVENT +sub fileEvent ($$) +{ + my ($fd, $cb) = @_; + + if ($cb) { + # adding the handler + $localBindByHandle{$fd} = $cb; + $localLoopSel->add ($fd); + } else { + # deleting the handler + delete $localBindByHandle{$fd}; + $localLoopSel->remove ($fd); + } +} + + + +############### PROCEDURE SCAN AFTER +sub _scanAfter () +{ + my $stamp = timeofday (); + foreach my $afk (keys %afterList) { + my $af = $afterList{$afk}; + if ($af->[2] <= $stamp) { + # on traite : le temps de declencher le cb est arrive + if (ref $af->[3] eq 'CODE') { + &{$af->[3]}; + } else { + my ($cb, @args) = @{$af->[3]}; + &$cb (@args); + } + # si c'est un repeat on le reconduit + if ($af->[0]) { + $af->[2] = $stamp + $af->[1] ; + } else { + # si c'est un after on le vire + afterCancel ($afk); + } + } + } +} + + +############### PROCEDURE SCAN CONN STATUS +sub _scanConnStatus () +{ + my (%readyApp, @nonReadyApp); + + foreach (values %cnnxion) { + $readyApp{$_} = 1 unless /^\004/; # connecte mais pas ready + + } + + foreach (@neededApp) { + push (@nonReadyApp, $_) unless exists $readyApp{$_}; + } + + &$statusFunc ([keys %readyApp], \@nonReadyApp); +} + + +############### PROCEDURE TO BE PRUNED +sub _toBePruned ($$) +{ + my ($from, $regexp) = @_; + + + # si il n'y a pas de liste de sujets, on ne + # filtre pas + return 0 unless @topicRegexps; + + unless ($regexp =~ /^\^/) { + #print "DBG> regexp non ANCREE de $from : $regexp\n"; + return (0); + } + + if ($regexp =~ /^\^(\w+)/) { + my $topic = $1; + if (grep (/$topic/, @topicRegexps)) { + # on a trouve ce topic : on ne filtre pas la regexp + #print "DBG> on garde de $from : $regexp\n"; + return (0); + } + #print "DBG> on ELIMINE de $from : $regexp\n"; + return (1); + } else { + #print "DBG> on garde de $from : $regexp\n"; + return (0); + } +} + + +1; -- cgit v1.1