diff options
-rw-r--r-- | Ivy.pm | 1492 |
1 files changed, 819 insertions, 673 deletions
@@ -1,53 +1,36 @@ # -# Ivy, Perl interface +# Ivy, Perl interface # -# Copyright 1997-1999 -# Centre d'Etudes de la Navigation Aerienne +# Copyright 1997-1999 +# Centre d'Études de la Navigation Aérienne # -# Version 2.0 : api OO, envoi des messages par l'intermediaire -# de thread pour ne jamais bloquer sur un envois. +# Authors: Alexandre Bustico <bustico@cenatoulouse.dgac.fr> +# Stéphane Chatty <chatty@cenatoulouse.dgac.fr> # -# TODO : +# All functions # +# $Id$ # -# - touver un mecanisme permettant de sortir meme quand -# un thread d'emission est bloque sur un send. +# Please refer to file Version.pm for the +# copyright notice regarding this software # -# -# BUGS: -# -Si une appli abonnee bloque, son thread d'emission -# reste bloque sur un send, le process ne sortira pas -# sur un <ctrl C>, mais uniquement sur un kill -TERM pid, -# voire sous irix sur un kill -KILL pid -# -# -Sous irix, si on cree dymaniquement des bus et qu'om les -# tue a rythme eleve, au bout d'un moment perl plante. -# -# pour avoir une version debarassee des commentaires et des blancs -# pour voir la longueur reelle du code : -# perl -ne 'chomp; next if /^\s*$/ || /^\s*#/; s/#.*//; print "$_\n";' Ivy.pm -# -# pour commenter tous les print de debug : -# perl -i.bak -ne 's/(printf?\s*\(?\"DBG)/ \# $1/ unless /^\s*\#/; print $_;' Ivy.pm package Ivy ; +require 5.005 ; + use Sys::Hostname; use IO::Socket; use strict ; use Time::Gettimeofday ; -use Thread; -use Thread::Queue; -use Thread::Signal; -use Fcntl; -use Errno; + + use vars qw($VERSION); -$VERSION = '4.1'; +$VERSION = '4.2'; ############################################################################# #### PROTOTYPES ##### ############################################################################# - sub init ($%); # methode de classe, permet de renseigner # tous les parametres globaux. Ces parametres # seront utilises par new si ils ne sont pas @@ -142,31 +125,17 @@ sub _parseIvyBusParam ($); # prends une adresse de bus de la forme # un numero de port et une ref sur une # liste d'adresses addr_inet -sub _senderThread ($$$); # procedure executee dans un thread, qui - # envoie les donnees sur une socket a travers - # une queue. De cette facon, si une appli - # distante bloque, ca ne penalise pas - # l'appli locale, en dehors du fait que la - # queue va grossir jusqu'au not enough memory - # au prix de la memoire c'est pas grave :-) - # deux arguments : la queue et le fd. - -sub _closeSenderThread ($$); # procedure d'arret d'un thread : - # on vide la queue d'emission, on - # place une valeur de sortie dans la queue, - # et on ferme le fd associe a la socket - # d'emission. - sub _substituteEscapedChar ($$); #permet de transormer une regexp etendue # 'perl' en regexp de base + ############################################################################# #### CONSTANTES ##### ############################################################################# -use constant VERSION => 3; use constant MSG_FMT => "%d %d\002%s\n"; # par defaut, on diffuse le bonjour en local -use constant BROADCAST_ADDRS => "127.0.0.1" ; +use constant BROADCAST_ADDRS => "127.255.255.255" ; +use constant BROADCAST_PORT => "2010"; use constant BYE => 0; use constant REGEXP => 1; @@ -197,7 +166,6 @@ use constant REG_PERLISSISME => ('w' => '[a-zA-Z0-9_]', # c'est un delimiteur pour le bus 'e' => '[]') ; - ############################################################################# #### VARIABLES de CLASSE ##### ############################################################################# @@ -295,15 +263,9 @@ use constant broadcastBuses => $constantIndexer++; use constant appName => $constantIndexer++; use constant messWhenReady => $constantIndexer++; - - ############################################################################# #### METHODES PUBLIQUES ##### ############################################################################# - - - -############### METHODE DE CLASSE INIT sub init ($%) { my ($class, %options) = @_; @@ -354,7 +316,8 @@ sub init ($%) # est-il facultatif if (defined $defaultOptions{$opt}) { $options{$opt} = $defaultOptions{$opt} ; - } else { + } + else { # parametre obligatoire die "ERREUR Ivy::init vous devez specifier ". "l'option $opt\n"; @@ -385,13 +348,11 @@ sub init ($%) # mode boucle d'evenement de TK $fileEventFunc = \&_tkFileEvent ; } else { - die qq|l'argument "mainloop mode" doit etre TK ou LOCAL\n|; + die "l'argument \"mainloop mode\" doit etre TK ou LOCAL\n"; } $SIG{'PIPE'} = 'IGNORE' ; } - - ############# METHODE DE CLASSE NEW sub new ($%) @@ -433,13 +394,6 @@ sub new ($%) # tab ass : nom du fd => fd $self->[sockList] = {}; - # tab ass : nom du fd => queue de communication entre - # l'appli et le thread qui fait les envois - $self->[queueList] = {}; - - # tab ass : nom du fd => thread qui fait les envois - $self->[threadList] = {}; - # tab ass : nom de l'appli => fd $self->[appliList] = {}; @@ -532,7 +486,7 @@ sub new ($%) $options{$opt} = $defaultOptions{$opt} ; } else { # parametre obligatoire - die "ERREUR Ivy::start vous devez specifier ". + die "ERREUR Ivy::new vous devez specifier ". "l'option $opt\n"; } } @@ -557,15 +511,42 @@ sub new ($%) return ($self); } - +############### METHODE IVY DESTROY +sub DESTROY ($) +{ + my $self = shift; + return unless exists $allBuses{$self}; + + # print ("DBG DESTROY appele sur l'objet $self\n"); -############## METHODE DE CLASSE STOP -sub stop () + # pour toutes les connections + foreach my $fd (values %{$self->[sockList]}) { + next unless exists ($self->[queueList]->{$fd}); + + foreach my $fd (values %{$self->[sockList]}) { + send ($fd, sprintf (MSG_FMT, BYE, 0, ""), 0) + or $self->_removeFileDescriptor ($fd); + } + } + + # on clos la socket de signalisation (UDP) + # print "DBG> fermeture de supSock\n"; + $self->[supSock]->close() if $self->[supSock]; + delete $allBuses{$self}; + + # on clos la socket de connection + # print "DBG> fermeture de connSock\n"; + $self->[connSock]->close() if $self->[connSock]; + undef (@$self); +} + +############### METHODE DE CLASSE STOP +sub stop () { foreach my $bus (values %allBuses) { $bus->DESTROY(); - } + } # pour toutes les connections } @@ -575,123 +556,116 @@ sub exit () Ivy::stop (); if (defined $localLoopSel) { # boucle locale, on sait faire - # printf ("DBG> undefining localLoopSel\n"); + # printf ("DBG> undefining localLoopSel\n"); undef $localLoopSel; - } else { - - # afficher les threads qui restent - foreach my $t (Thread->list()) { - next if (($t->tid == Thread->self->tid) || $t->tid == 0); - printf ("DBG> Thread %d is active, flag = %d\n", $t->tid, $t->flags) - if $^W; - } + } + else { Tk::exit (); } } - - -################ METHODE START -sub start ($) +############### PROCEDURE BUS START +sub start ($) { my $self = shift; - # cree la socket de connexion, recupere le no de port - my $connSock = $self->[connSock] = IO::Socket::INET->new(Listen => 128, - Proto => 'tcp', - Reuse => 1) ; - # print ("DBG> opening TCP fd $connSock\n"); - # on memorise tout ca, ce qui evitera par la suite de se - # repondre a soi-meme. On le fait sous nos deux noms : - # le nom de machine et 'localhost' - my $hostAddr = (gethostbyname (hostname()))[4] ; - my $localhostAddr = (gethostbyname ('localhost'))[4] ; - $self->[cnnxion]->{"$hostAddr:". $connSock->sockport} = "\004"; - $self->[cnnxion]->{"$localhostAddr:". $connSock->sockport} = "\004"; - - # cree la socket de broadcast - $self->[supSock] = IO::Socket::INET->new ( - LocalPort => $self->[broadcastPort], - Proto => 'udp', - Type => SOCK_DGRAM, - Reuse => 1); - - $self->[supSock]->sockopt (SO_BROADCAST, 1); - fcntl ( $self->[supSock], F_SETFL, O_NDELAY) ; - - # et on envoie envoie le bonjour : "no de version no de port" - my $bonjourMsg = sprintf ("%d %d\n", VERSION, $connSock->sockport()); - - foreach my $netBroadcastAddr (@{$self->[broadcastBuses]}) { + # cree la socket de connexion, recupere le no de port + my $connSock = $self->[connSock] = IO::Socket::INET->new(Listen => 128, + Proto => 'tcp', + Reuse => 1) ; + # on memorise tout ca, ce qui evitera par la suite de se + # repondre a soi-meme. On le fait sous nos deux noms : + # le nom de machine et 'localhost' + my $hostAddr = (gethostbyname (hostname()))[4] ; + my $localhostAddr = (gethostbyname ('localhost'))[4] ; + $self->[cnnxion]->{"$hostAddr:". $connSock->sockport} = "\004"; + $self->[cnnxion]->{"$localhostAddr:". $connSock->sockport} = "\004"; + + # cree la socket de broadcast + $self->[supSock] = IO::Socket::INET->new + (LocalPort => $self->[broadcastPort], + Proto => 'udp', + Type => SOCK_DGRAM, + Reuse => 1); + + $self->[supSock]->sockopt (SO_BROADCAST, 1); - send ($self->[supSock], $bonjourMsg, 0, $netBroadcastAddr) or - warn "Attention Ivy::start envoi du bonjour a echoue : $!\n"; - } - # callback pour traiter la reception des bonjours - &$fileEventFunc ($self->[supSock], [\&_getBonjour, $self]) ; - - # callback pour traiter les demandes de cxion - &$fileEventFunc ($connSock, [\&_getConnections, $self]) ; + + # et on envoie envoie le bonjour : "no de version no de port" + my $bonjourMsg = sprintf ("%d %d\n", VERSION, $connSock->sockport()); + + foreach my $netBroadcastAddr (@{$self->[broadcastBuses]}) { + send ($self->[supSock], $bonjourMsg, 0, $netBroadcastAddr) or + warn "Ivy::start envoi du bonjour a echoue sur : $!\n"; + } + # callback pour traiter la reception des bonjours + &$fileEventFunc ($self->[supSock], [\&_getBonjour, $self]) ; + + # callback pour traiter les demandes de cxion + &$fileEventFunc ($self->[connSock], [\&_getConnections, $self]) ; } -############### METHODE BIND REGEXP -sub bindRegexp ($$$$) -{ - my ($self, $regexp, $cb, $observer) = @_; - - # on substitue les meta caracteres des regexps perl : \d, \w, \s, \e - # par les classes de caracteres corespondantes de maniere a ce - # qu'une appli distante non perl comprenne ces regexp. - $regexp =~ s| - ( - (?<!\\) \[ # le premier crochet ouvrant non precede d'un \ - .*? # ce qu'il y a dans le crochet, en mode frugal - (?<!\\) \] # le premier crochet fermant non precede d'un \ - ) - | - _substituteEscapedChar ('inside', $1) - |xge; +############### PROCEDURE BIND REGEXP +sub bindRegexp ($$$) { + my ($self, $regexp, $cb) = @_; + + # on substitue les meta caracteres des regexps perl : \d, \w, \s, \e + # par les classes de caracteres corespondantes de maniere a ce + # qu'une appli distante non perl comprenne ces regexp. + $regexp =~ s| + ( + (?<!\\) \[ # le premier crochet ouvrant non precede d'un \ + .*? # ce qu'il y a dans le crochet, en mode frugal + (?<!\\) \] # le premier crochet fermant non precede d'un \ + ) + | + _substituteEscapedChar ('inside', $1) + |xge; - $regexp = _substituteEscapedChar ('outside', $regexp); - # print ("DBG regexp = $regexp\n"); - - if ($cb) { - my $id; - # on rajoute le couple $regexp, $cb dans la liste des messages - # qu'on prend - - # on commence par tester si on a un id libere dans le tableau - for ($id=0; $id <= ($#{$self->[recCbList]}+1); $id++) { - last unless (defined $self->[recCbList][$id]) && - @{$self->[recCbList][$id]->[1]}; - } - $self->[recCbList][$id] = [$regexp, $cb, $observer]; + $regexp = _substituteEscapedChar ('outside', $regexp); + # print ("DBG regexp = $regexp\n"); + + if ($cb) { + my $id; + # on rajoute le couple $regexp, $cb dans la liste des messages + # qu'on prend + + # on commence par tester si on a un id libere dans le tableau + for ($id=0; $id <= ($#{$self->[recCbList]}+1); $id++) { + last unless (defined $self->[recCbList][$id]) && @{$self->[recCbList][$id]->[1]}; + } + $self->[recCbList][$id] = [$regexp, $cb]; - # on envoie les messages regexps aux processus deja connectes - _sendLastRegexpToAllreadyConnected ($self, $id) ; - } else { - # on vire le callback, et on se desabonne de cette regexp - for (my $id=0; $id <= $#{$self->[recCbList]}; $id++) { - next unless (defined $self->[recCbList][$id]) && - @{$self->[recCbList][$id]->[1]}; - if ($self->[recCbList][$id]->[0] eq $regexp) { - $self->[recCbList][$id]->[1] = []; - # on envoie le mesage delregexp - foreach my $fd (values %{$self->[sockList]}) { - Thread::Queue::enqueue ($self->[queueList]->{$fd}, - \sprintf (MSG_FMT, DELREGEXP, $id)); - } - } + # on envoie les messages regexps aux processus deja connectes + _sendLastRegexpToAllreadyConnected ($self, $id) ; + } + else { + + # on vire le callback, et on se desabonne de cette regexp + for (my $id=0; $id <= $#{$self->[recCbList]}; $id++) { + + next unless (defined $self->[recCbList][$id]) && + @{$self->[recCbList][$id]->[1]}; + + if ($self->[recCbList][$id]->[0] eq $regexp) { + + $self->[recCbList][$id]->[1] = []; + # on envoie le mesage delregexp + foreach my $fd (values %{$self->[sockList]}) { + send ($fd, sprintf (MSG_FMT, DELREGEXP, $id), 0) + or $self->_removeFileDescriptor ($fd); } + } } + } } -############### METHODE BIND DIRECT +############### METHODE BIND REGEXP sub bindDirect ($$$) { my ($self, $id, $cb) = @_; - + if ($cb) { # on rajoute la $cb dans la liste des messages # qu'on prend @@ -701,48 +675,48 @@ sub bindDirect ($$$) undef $self->[directCbList][$id]; } } - - -############### METHODE SEND MSGS +############### PROCEDURE SEND MSGS sub sendMsgs ($@) { use attrs qw(locked); my ($self, @msgs) = @_; - my $total = 0; - # pour tous les messages - foreach my $msg (@msgs) { - study ($msg); + my $total = 0; - # pour routes les connections - foreach my $fd (keys %{$self->[sockList]}) { + # pour tous les messages + foreach my $msg (@msgs) { + study ($msg); - # pour toutes les fonctions de filtrage de regexp - foreach my $regexpFunc (@{$self->[sendRegList]{$fd}}) { - $total += &{$regexpFunc} (\$msg) if defined $regexpFunc; - } + # pour routes les connections + foreach my $fd (keys %{$self->[sockList]}) { + + # pour toutes les fonctions de filtrage de regexp + foreach my $regexpFunc (@{$self->[sendRegList]{$fd}}) { + $total += &{$regexpFunc} (\$msg) if defined $regexpFunc; + } } } -# print "DBG> sended $total times\n"; - return $total; + # print "DBG> sended $total times\n"; + return $total; } -############### METHODE SEND APP NAME MSGS +############### PROCEDURE SEND MSGS sub sendAppNameMsgs ($@) { use attrs qw(locked); - + my ($self, @msgs) = @_; my $total = 0; + # pour tous les messages foreach (@msgs) { my $msg = "$self->[appName] $_"; study ($msg); - + # pour routes les connections foreach my $fd (keys %{$self->[sockList]}) { - + # pour toutes les fonctions de filtrage de regexp foreach my $regexpFunc (@{$self->[sendRegList]{$fd}}) { $total += &{$regexpFunc} (\$msg) if defined $regexpFunc; @@ -755,129 +729,96 @@ sub sendAppNameMsgs ($@) -############### METHODE SEND DIRECT MSGS +############### PROCEDURE SEND DIRECT MSGS sub sendDirectMsgs ($$$@) { - use attrs qw(locked); my ($self, $to, $id, @msgs) = @_; - + if (defined ($self->[appliList]{$to})) { my @fds = @{$self->[appliList]{$to}}; # pour tous les messages foreach my $msg (@msgs) { foreach my $fd (@fds) { - Thread::Queue::enqueue ($self->[queueList]->{$fd}, - \sprintf (MSG_FMT, DIRECT_MSG, $id, "$msg")); + send ($fd, sprintf (MSG_FMT, DIRECT_MSG, $id, "$msg"), 0) + or $self->_removeFileDescriptor ($fd); } } return 1; } else { - warn "Attention : Ivy::sendDirectMsgs appli $to inconnue\n" if $^W; + warn "Ivy::sendDirectMsgs appli $to inconnue\n"; return 0; } } -############### METHODE SEND DIE TO +############### METHOD SEND DIE TO sub sendDieTo ($$) { use attrs qw(locked); my ($self, $to) = @_; - + if (defined ($self->[appliList]{$to})) { my @fds = @{$self->[appliList]{$to}}; + warn "Attention : Ivy::sendDieTo gros BUG \@fds est vide \n" if (scalar (@fds) == 0); - + # pour tous les messages foreach my $fd (@fds) { - _sendDieTo ($self, $fd); + $self->_sendDieTo($fd); } return 1; - } else { - warn "Attention : Ivy::sendDieTo appli $to inconnue\n" if $^W; + } + else { + warn "Ivy::sendDieTo appli $to inconnue\n" if $^W; return 0; } } -############### METHODE PING +############### METHOD PING sub ping ($$$) { use attrs qw(locked); + my ($self, $to, $timeout) = @_; - + if (defined ($self->[appliList]{$to})) { + my @fds = @{$self->[appliList]{$to}}; + # pour tous les messages foreach my $fd (@fds) { - Thread::Queue::enqueue ($self->[queueList]->{$fd}, - \sprintf (MSG_FMT, PING, 0, " ")); + send ($fd, sprintf (MSG_FMT, PING, 0, " "), 0) + or $self->_removeFileDescriptor ($fd); } } } - -############### METHODE IVY DESTROY -sub DESTROY ($) -{ - my $self = shift; - return unless exists $allBuses{$self}; - - # print ("DBG DESTROY appele sur l'objet $self\n"); - - # pour toutes les connections - foreach my $fd (values %{$self->[sockList]}) { - next unless exists ($self->[queueList]->{$fd}); - - Thread::Queue::enqueue ($self->[queueList]->{$fd}, - \sprintf (MSG_FMT, BYE, 0, "")); - - # on attend un peu avant de fermer la thread, que - # le bye ait le temps d'etre envoye si le - # thread n'est pas bloque par le send. - select (undef, undef, undef, 0.1); - - # on desactive le thread de reception - # et on attend qu'il sorte - $self->_closeSenderThread ($fd); - } - - # on clos la socket de signalisation (UDP) -# print "DBG> fermeture de supSock\n"; - $self->[supSock]->close() if $self->[supSock]; - delete $allBuses{$self}; - - # on clos la socket de connection -# print "DBG> fermeture de connSock\n"; - $self->[connSock]->close() if $self->[connSock]; - undef (@$self); -} - - -############### METHODE MAINLOOP +############### METHODE MAINLOOP sub mainLoop () { die "Erreur Ivy->mainLoop, Ivy doit etre initialise en mode". " loopMode local\n" unless defined $localLoopSel; - - my ($fd, @ready, @allDesc); + my ($fd, @ready, @allDesc); + while (defined $localLoopSel) { @ready = IO::Select::can_read ($localLoopSel, $selectTimout) ; _scanAfter () ; - foreach $fd (@ready) { + foreach my $fd (@ready) { if (ref $localBindByHandle{$fd} eq 'CODE') { &{$localBindByHandle{$fd}} ; - } else { + } + else { my ($cb, @arg) = @{$localBindByHandle{$fd}} ; &$cb (@arg) } } } } - + ############### METHODE AFTER sub after ($$;$) @@ -886,14 +827,16 @@ sub after ($$;$) # appelee de maniere objet : premier argument = class ou une instance # de classe shift if ((ref ($_[0]) eq 'Ivy') || ($_[0] eq 'Ivy')) ; - + my ($timeAfter, $cbListRef) = @_; $timeAfter /= 1000; $selectTimout = $timeAfter if $timeAfter < $selectTimout; - + + # si la valeur de timout est negative : c'est un after sinon + # c'est un repeat $afterList{++$afterId} = [AFTER, $timeAfter, timeofday()+$timeAfter, $cbListRef]; - + return ($afterId); } @@ -903,13 +846,13 @@ sub repeat ($$;$) # test du premier argument au cas ou la fonction soit # appelee de maniere objet : premier argument = class ou une instance # de classe + shift if ((ref ($_[0]) eq 'Ivy') || ($_[0] eq 'Ivy')) ; - # on passe le temps en secondes pour le select my ($timeAfter, $cbListRef) = @_; $timeAfter /= 1000; $selectTimout = $timeAfter if $timeAfter < $selectTimout; - + $afterList{++$afterId}= [REPEAT, $timeAfter, timeofday()+$timeAfter, $cbListRef]; return ($afterId); @@ -922,9 +865,9 @@ sub afterCancel ($;$) # appelee de maniere objet : premier argument = class ou une instance # de classe shift if ((ref ($_[0]) eq 'Ivy') || ($_[0] eq 'Ivy')) ; - - my $id = shift; + my $id = shift; + if (defined ($id) && defined $afterList{$id}) { if ($afterList{$id}->[1] <= $selectTimout) { delete $afterList{$id} ; @@ -934,13 +877,13 @@ sub afterCancel ($;$) foreach my $af (values %afterList) { $selectTimout = $af->[1] if $af->[1] < $selectTimout ; } - } else { + } + else { delete $afterList{$id} ; } } } - ############### METHODE FILE EVENT sub fileEvent ($$;$) { @@ -969,9 +912,8 @@ sub fileEvent ($$;$) } } - ############################################################################# -#### METHODES PRIVEE ##### +#### METHODES PRIVEES ##### ############################################################################# @@ -979,44 +921,47 @@ sub fileEvent ($$;$) sub _getBonjour ($) { my $self = shift; - + my $bonjourMsg = ''; - + # l'hote distant my $inetAddr = $self->[supSock]->recv ($bonjourMsg, 1024, 0); + unless (length $inetAddr) { warn "Attention : Ivy::_getBonjour recv error, bonjour non traite\n"; return; } + my $addr = (unpack_sockaddr_in ($inetAddr))[1]; - - my $peerName = gethostbyaddr ($addr, AF_INET); + my $peerName = gethostbyaddr ($addr, AF_INET); + # on force $peerPort a etre vu comme une valeur numerique my ($version, $peerPort) = $bonjourMsg =~ /^(\d+)\s+(\d+)/; - + unless (defined ($version) && defined ($peerPort)) { warn "Attention : Ivy::_getBonjour format du message bonjour incorrect\n". "message = $bonjourMsg\n" ; return; } + if ($version != VERSION) { warn "Attention : Ivy::_getBonjour VERSION: demande de connexion de ". "$peerName\n version courrante : " . VERSION . ", recue : $version\n" ; return; } - + # on verifie qu'on ne se repond pas et qu'on ne # se reconnecte pas a un process deja connecte - if (exists ($self->[cnnxion]->{"$addr:$peerPort"})) { - #print "DBG> : bonjour de $peerName:$peerPort : DEJA CONNECTE\n" ; + if (exists ($self->[cnnxion]{"$addr:$peerPort"})) { + #print "DBG> : bonjour de $peerName:$peerPort : DEJA CONNECTE\n" ; return ; - } else { - #print "DBG> : reception de $peerName : bonjour $peerPort\n" ; } - - - # on verifie que l'adresse fasse partie de l'ensemble de reseau + else { + #print "DBG> : reception de $peerName : bonjour $peerPort\n" ; + } + +# on verifie que l'adresse fasse partie de l'ensemble de reseau # definis par ivybus my $addrInIvyBus = 0; my @ivyBusAddrList = map ( (unpack_sockaddr_in ($_))[1], @@ -1036,73 +981,59 @@ sub _getBonjour ($) warn "bonjour de $peerName ignore, ne fait pas partie des ivyBus\n" if $^W; return; } - - + # ouverture du canal de communication my $appSock = IO::Socket::INET->new (PeerAddr => $peerName, PeerPort => $peerPort, Proto => 'tcp'); if ($appSock) { - # on cree la queue et le thread qui vont interfacer les envois - # vers cette appli - $self->[queueList]->{$appSock} = Thread::Queue->new(); - $self->[threadList]->{$appSock} = - Thread->new (\&_senderThread, $self, $self->[queueList]->{$appSock}, - $appSock); - # print "DBG> new thread ${$self->[threadList]->{$appSock}}\n"; # on cree une entree pour $appSock dans la liste des regexp - $self->[cnnxion]->{"$addr:$peerPort"} = 1; + $self->[cnnxion]{"$addr:$peerPort"} = 1; $self->[sendRegList]{$appSock} = []; $self->[buffByConn]{$appSock} = ''; - $self->[sockList]->{$appSock} = $appSock; + $self->[sockList]{$appSock} = $appSock; &$fileEventFunc ($appSock, [\&_getMessages, $self, $appSock]) ; - + # on balance les regexps qui nous interessent a l'appli distante - _sendWantedRegexp ($self, $appSock); - } else { - warn "Attention Ivy::_getBonjour impossible de se connecter au serveur " . + $self->_sendWantedRegexp ($appSock); + } + else { + warn "Attention Ivy::_getBonjour impossible de se connecter au serveur" . "$peerName:$peerPort\n" ; - } + } } -############### METHODE GET CONNECTIONS +############### PROCEDURE GET CONNECTIONS sub _getConnections ($) { my $self = shift; my $appSock = $self->[connSock]->accept(); - + unless (defined $appSock) { warn "Attention Ivy::_getConnections, \$appSock not defined\n"; return; - } else { + } + else { printf "accepting connection from %s:%d\n", - (gethostbyaddr ($appSock->peeraddr(),AF_INET))[0], - $appSock->peerport() if $^W; + (gethostbyaddr ($appSock->peeraddr(),AF_INET))[0], + $appSock->peerport() if $^W; } - + # callback pour traiter la reception des messages - # on cree la queue et le thread qui vont interfacer les envois - # vers cette appli - $self->[queueList]->{$appSock} = Thread::Queue->new(); - $self->[threadList]->{$appSock} = - Thread->new (\&_senderThread, $self, $self->[queueList]->{$appSock}, - $appSock); - # print "DBG> new thread ${$self->[threadList]->{$appSock}}\n"; &$fileEventFunc ($appSock, [\&_getMessages, $self, $appSock]) ; - + # on cree une entree pour $appSock dans la liste des regexp $self->[sendRegList]{$appSock} = []; $self->[buffByConn]{$appSock} = ''; - $self->[sockList]->{$appSock} = $appSock; + $self->[sockList]{$appSock} = $appSock; # on balance les regexps qui nous interessent a l'appli distante - _sendWantedRegexp ($self, $appSock); + $self->_sendWantedRegexp ($appSock); } - ############### METHODE GET MESSAGES sub _getMessages ($$) { @@ -1115,17 +1046,16 @@ sub _getMessages ($$) recv ($appSock, $buffer, 65536, 0) ; unless (length $buffer) { # message null : broken pipe, ça s'est deconnecte a l'autre bout - # on vire ce fd de la boucle d'evenements ainsi que - # le thread associe. + # on vire ce fd de la boucle d'evenements # print ("DBG : _getMessages, recv err, calling removeFileDesc.\n"); # Bon la il faudra un jour clarifier ce bordel, lister toutes # les facons dont un couple d'applis connectee peuevent sortir et # eviter les dead lock qui doivent subsister. if (defined ($localLoopSel)) { -# _removeFileDescriptor ($self, $appSock); - _closeSenderThread ($self, $appSock); - } else { - _closeSenderThread ($self, $appSock); + $self->_removeFileDescriptor ($self, $appSock); + } + else { + $self->_removeFileDescriptor ($self, $appSock); } return; } @@ -1135,146 +1065,178 @@ sub _getMessages ($$) $self->[buffByConn]{$appSock} = ''; } my @messages = split ('\n', $buffer) ; - $self->[buffByConn]{$appSock} = pop (@messages) unless + $self->[buffByConn]{$appSock} = pop (@messages) unless ($buffer =~ /\n$/) ; -# if (defined $appSock->peername) { + # if (defined $appSock->peername) { $addr = $appSock->peeraddr(); $peerPort = $appSock->peerport() ; - $senderName = $self->[cnnxion]->{"$addr:$peerPort"} ; + $senderName = $self->[cnnxion]{"$addr:$peerPort"} ; $senderName = "NONAME" unless $senderName; - foreach my $mess (@messages) { -# print "DBG>mess from $senderName *$mess*\n"; + # print "DBG>mess from $senderName *$mess*\n"; - # on recupere les 3 champs : le type, le numero de regexp, les valeurs - my ($type, $id, $valeurs) = $mess =~ /^(\d+) - \s+ - (\d+) - \002 - (.*)/x ; + # on recupere les 3 champs : le type, le numero de regexp, les valeurs + my ($type, $id, $valeurs) = $mess =~ /^(\d+) + \s+ + (\d+) + \002 + (.*)/x ; # si ca a chie on rale - (warn "Attention Ivy::_getMessages message mal formatte : $mess\n" - and return) unless defined $type ; - - # sinon on fait en fonction du type de message - if ($type == MSG) { # M S G - # on recupere le couple call back, regexp correspondant - # a l'identifiant et on appelle la fonction avec les parametres - # traites par la regexp - if (my @cb = @{$self->[recCbList][$id]->[1]}) { - my $cb = shift @cb; - # on split sur ETX - my $observer = $self->[recCbList][$id]->[2]; - if (defined $observer) { - $observer->$cb ($senderName, @cb, split ("\003", $valeurs)) ; - } - else { - &$cb ($senderName, @cb, split ("\003", $valeurs)) ; - } - } else { - #_sendErrorTo ($appSock, "REEGXP ID $id inconnue"); - warn ("Attention Ivy::_getMessages reception d'un message " . - "MSG : id $id inconnu de $senderName :\n«$mess»"); + (warn "Attention Ivy::_getMessages malformated message $mess\n" and return) unless defined $type ; + + # sinon on fait en fonction du type de message + if ($type == MSG) { # M S G + # on recupere le couple call back, regexp correspondant + # a l'identifiant et on appelle la fonction avec les parametres + # traites par la regexp + if (my @cb = @{$self->[recCbList][$id]->[1]}) { + my $cb = shift @cb; + my $refcb = ref($cb); + if ($refcb ne 'CODE') { + my $method = shift @cb; + # on split sur ETX + $cb->$method($senderName, @cb, split ("\003", $valeurs)) ; + } + else { + &$cb ($senderName, @cb, split ("\003", $valeurs)) ; + } + } + else { + #_sendErrorTo ($appSock, "REEGXP ID $id inconnue"); + warn ("Attention Ivy::_getMessages reception d'un message ". + "MSG : id $id inconnu de $senderName :\n«$mess»"); + } + } + elsif ($type == BYE) { + #print "reception d'un bye\n"; + $self->_removeFileDescriptor ($appSock); # B Y E + } + elsif ($type == REGEXP) { # R E G E X P + # on ajoute une fonction traitant la regexp et envoyant le + # message sur le bon fd dans la liste des fonctions de filtrage + # ca permet de compiler les regexp avec 'once' donc une + # fois pour toute, et ainsi optimiser la vitesse de + # filtrage des messages a envoyer + next if $self->_toBePruned ($senderName, $valeurs); + unless (defined $self->[sendRegList]{$appSock}->[$id]) { + # si l'id de regexp n'etait pas utilisee c'est tout bon + # on affecte la nouvelle regexp a un id + $self->[sendRegList]{$appSock}->[$id] = eval <<'_EOL_'; + sub { + use strict; + if (my @args = ${$_[0]} =~ /($valeurs)/o) { + shift @args; + $args[$#args] .= "\003" if @args; + send ($appSock, sprintf (MSG_FMT, + MSG, $id, join ("\003",@args)), 0) + or $self->_removeFileDescriptor ($appSock) ; + #print join (' ', "DBG > J'envoie MSG", $id, @args, "\n"); + return 1; } - } elsif ($type == BYE) { - #print "reception d'un bye\n"; - $self->_closeSenderThread ($appSock); - } elsif ($type == REGEXP) { # R E G E X P - # on ajoute une fonction traitant la regexp et envoyant le - # message sur le bon fd dans la liste des fonctions de filtrage - # ca permet de compiler les regexp avec 'once' donc une - # fois pour toute, et ainsi optimiser la vitesse de - # filtrage des messages a envoyer - next if _toBePruned ($self, $senderName, $valeurs); - unless (defined $self->[sendRegList]{$appSock}->[$id]) { - # si l'id de regexp n'etait pas utilisee c'est tout bon - # on affecte la nouvelle regexp a un id - $self->[sendRegList]{$appSock}->[$id] = eval <<'_EOL_'; - sub { - use strict; - if (my @args = ${$_[0]} =~ /($valeurs)/o) { - shift @args; - $args[$#args] .= "\003" if @args; - my $queue = $self->[queueList]->{$appSock}; - return 0 unless defined $queue; - $queue->enqueue (\sprintf (MSG_FMT, MSG, $id, - join ("\003",@args))); - # print join (' ', "DBG > J'envoie MSG", $id, @args, "\n"); - return 1; - } - }; + }; _EOL_ - } else { - # l'id de la regexp etait deja utilise, - # et n'a pas ete libere par un message delregexp, - # on renvoie donc un message d'erreur - _sendErrorTo ($self, $appSock, "ID $id deja utilisee"); + } + else { + # l'id de la regexp etait deja utilise, + # et n'a pas ete libere par un message delregexp, + # on renvoie donc un message d'erreur + $self->_sendErrorTo($appSock, "ID $id deja utilisee"); } - } elsif ($type == ERROR) { # E R R O R + } + elsif ($type == ERROR) { # E R R O R warn ("Attention Ivy::_getMessages ERREUR recue de ". - "$senderName : «$valeurs»\n"); - } elsif ($type == DELREGEXP) { # D E L R E G E X P - # on vire la regexp des regexps vefifier - $self->[sendRegList]{$appSock}->[$id] = undef ; - } elsif ($type == ENDREGEXP) { # E N D R E G E X P - # on envoie le message ready uniquement a celui qui nous - # a envoye le message endregexp - _sendMsgTo ($self, $appSock, $self->[messWhenReady]); - # on passe de l'etat Connecte a l'etat Ready - $self->[cnnxion]->{"$addr:$peerPort"} =~ s/^\004//; - $senderName = $self->[cnnxion]->{"$addr:$peerPort"}; - unless (exists $self->[appliList]{$senderName}) { - $self->[appliList]->{$senderName} = [$appSock]; - } else { - push @{$self->[appliList]->{$senderName}}, $appSock; - } - _scanConnStatus ($self); - } elsif ($type == APP_NAME) { - # etat Connecte - if (($self->[appName] eq $valeurs) && $^W) { - warn "\033[1mATTENTION : Ivy::_getMessages une instance de ". - "$self->[appName] existe deja\033[m\n" ; - } - $senderName = $valeurs; - $self->[cnnxion]->{"$addr:$peerPort"} = "\004$valeurs"; - } elsif ($type == DIRECT_MSG) { - if (defined $self->[directCbList][$id]) { - my @cb = @{$self->[directCbList][$id]}; - my $cb = shift @cb; - &$cb (@cb, $valeurs); - } else { - _sendErrorTo ($self, $appSock, "DIRECT ID $id inconnue"); - warn "Attention Ivy::_getMessages reception d'un message " . - "DIRECT d'id $id inconnue de $senderName :\n«$mess»"; - } - } elsif ($type == DIE) { - # il faut quitter - # on commence par appeler la callback de fin - my @cb = @{$onDieFunc}; - my $cb = shift @cb; - &$cb (@cb); - # on avertit les autres qu'on se barre - my $adr = _inetAdrByName ($self, $senderName) ; - warn "Attention Ivy::_getMessages reception d'un ordre " . - "de suicide de $senderName ($adr)... exiting\n" if $^W; - # adios - Ivy::exit (); - } elsif ($type == PING) { - # si on recois un ping, on envoie un pong - _pong ($self, $appSock); - } elsif ($type == PONG) { - return PONG; - } else { - _sendErrorTo ($self, $appSock, "TYPE DE MESS $type inconnu"); - warn ("Attention Ivy::_getMessages reception d'un message " . - "de type $type inconnu de $senderName :\n«$mess»"); + "$senderName : «$valeurs»\n"); + } + elsif ($type == DELREGEXP) { # D E L R E G E X P + # on vire la regexp des regexps vefifier + $self->[sendRegList]{$appSock}->[$id] = undef ; + } + elsif ($type == ENDREGEXP) { # E N D R E G E X P + # on envoie le message ready uniquement a celui qui nous + # a envoye le message endregexp + $self->_sendMsgTo ($appSock, $self->[messWhenReady]); + + # on passe de l'etat Connecte a l'etat Ready + $self->[cnnxion]{"$addr:$peerPort"} =~ s/^\004//; + $senderName = $self->[cnnxion]{"$addr:$peerPort"}; + + unless (exists $self->[appliList]{$senderName}) { + $self->[appliList]{$senderName} = [$appSock]; + } + else { + push @{$self->[appliList]{$senderName}}, $appSock; + } + + $self->_scanConnStatus (); + } + elsif ($type == APP_NAME) { + # etat Connecte + if (($self->[appName] eq $valeurs) && $^W) { + warn "\033[1mATTENTION : Ivy::_getMessages une instance de ". + "$self->[appName] existe deja\033[m\n" ; } - } -return 0; -} + + $senderName = $valeurs; + $self->[cnnxion]{"$addr:$peerPort"} = "\004$valeurs"; + } + elsif ($type == DIRECT_MSG) { + + if (defined $self->[directCbList][$id]) { + my @cb = @{$self->[directCbList][$id]}; + my $cb = shift @cb; + my $refcb = ref($cb); + if ($refcb ne 'CODE') { + my $method = shift @cb; + $cb->$method(@cb, $valeurs); + } + else { + &$cb (@cb, $valeurs); + } + } + else { + $self->_sendErrorTo ($appSock, "DIRECT ID $id inconnue"); + warn "Attention Ivy::_getMessages reception d'un message ". + "DIRECT d'id $id inconnue de $senderName :\n«$mess»"; + } + } elsif ($type == DIE) { + # il faut quitter + # on commence par appeler la callback de fin + my @cb = @{$onDieFunc}; + my $cb = shift @cb; + my $refcb = ref($cb); + if ($refcb ne 'CODE') { + my $method = shift @cb; + $cb->$method(@cb); + } + else { + &$cb (@cb); + } + # on avertit les autres qu'on se barre + my $adr = $self->_inetAdrByName ($senderName) ; + warn "Attention Ivy::_getMessages reception d'un ordre " . + "de suicide de $senderName ($adr) ... exiting\n" if $^W; + # adios + Ivy::exit (); + + } + elsif ($type == PING) { + # si on recois un ping, on envoie un pong + $self->_pong ($appSock); + } + elsif ($type == PONG) { + return PONG; + } + else { + _$self->sendErrorTo ($appSock, "TYPE DE MESS $type inconnu"); + warn ("reception d'un message de type $type inconnu de " . + "$senderName :\n«$mess»"); + } + } + return 0; + } ############### METHODE SEND WANTED REGEXP sub _sendWantedRegexp ($$) @@ -1282,116 +1244,109 @@ sub _sendWantedRegexp ($$) my ($self, $appSock) = @_; # on envoie le message "Nom appli" - Thread::Queue::enqueue ($self->[queueList]->{$appSock}, - \sprintf (MSG_FMT, APP_NAME, 0, $self->[appName])); + send ($appSock, sprintf (MSG_FMT, APP_NAME, 0, $self->[appName]), 0) + or $self->_removeFileDescriptor ($appSock) ; # on envoie les regexps for (my $id = 0; $id <= $#{$self->[recCbList]}; $id++) { - next unless defined $self->[recCbList]->[$id]->[1]->[0]; - - Thread::Queue::enqueue ($self->[queueList]->{$appSock}, - \sprintf (MSG_FMT, REGEXP, $id, $self->[recCbList][$id]->[0])); + next unless defined $self->[recCbList][$id]->[1]->[0]; + + send ($appSock, + sprintf (MSG_FMT, REGEXP, $id, $self->[recCbList][$id]->[0]), + 0) or $self->_removeFileDescriptor ($appSock) ; # print sprintf ("DBG > %s %d %s\n", # 'REGEXP', $id, $self->[recCbList][$id]->[0]); } # on envoie le message de fin d'envoi de regexps - Thread::Queue::enqueue ($self->[queueList]->{$appSock}, - \sprintf (MSG_FMT, ENDREGEXP, 0, "")); + send ($appSock, sprintf (MSG_FMT, ENDREGEXP, 0, ""), 0) + or $self->_removeFileDescriptor ($appSock) ; } ############### METHODE SEND LAST REGEXP TO ALLREADY CONNECTED sub _sendLastRegexpToAllreadyConnected ($$) { - my ($self, $id) = @_; - foreach my $fd (values %{$self->[sockList]}) { - Thread::Queue::enqueue ($self->[queueList]->{$fd}, - \sprintf (MSG_FMT, REGEXP, $id, $self->[recCbList][$id][0])); - } + my ($self, $id) = @_; + + foreach my $fd (values %{$self->[sockList]}) { + send ($fd, sprintf (MSG_FMT, REGEXP, $id, $self->[recCbList][$id]->[0]), + 0) or $self->_removeFileDescriptor ($fd) ; + } } ############### METHODE INET ADR BY NAME sub _inetAdrByName ($$) { - my ($self, $appName) = @_; - my $addrInet = (grep ($self->[cnnxion]->{$_} eq $appName, - keys %{$self->[cnnxion]}))[0]; - - return ("unknow") unless defined $addrInet; - my ($addr,$port) = $addrInet =~ /(.{4}):(.*)/; - - my $host = (gethostbyaddr ($addr, AF_INET))[0] ; - return "$host:$port"; + my ($self, $appName) = @_; + + my $addrInet = (grep ($self->[cnnxion]{$_} eq $appName, + keys %{$self->[cnnxion]}))[0]; + + return ("unknow") unless defined $addrInet; + my ($addr,$port) = $addrInet =~ /(.{4}):(.*)/; + + my $host = (gethostbyaddr ($addr, AF_INET))[0] ; + return "$host:$port"; } -############### METHODE REMOVE FILE DESCRIPTOR +############### PROCEDURE REMOVE FILE DESCRIPTOR sub _removeFileDescriptor ($$) { - my ($self, $fd) = @_; - my $diedAppName; + my ($self, $fd) = @_; - - - # on s'est deja occupe de lui - return unless exists $self->[sockList]->{$fd}; - # printf ("DBG> _removeFileDescriptor IN thread %s\n", ${Thread->self}); - - # on efface les structures de donnees associees au fd - # on vire ce fd des fd a scruter dans la bcle d'evenements - # uniquement si on est dans le thread principal - # sinon le select merde salement sur ce coup - &$fileEventFunc ($fd, ''); - delete $self->[sendRegList]->{$fd}; - delete $self->[sockList]->{$fd}; - delete $self->[buffByConn]->{$fd}; - delete $self->[queueList]->{$fd}; - - # on clos la connection - $fd->close (); - -# EXT_LOOP: -# foreach my $name (keys %{$self->[appliList]}) { -# @{$self->[appliList]{$name}} = -# grep ($_ ne $fd, @{$self->[appliList]{$name}}); -# if (length (@{$self->[appliList]{$name}}) == 0) { -# delete $self->[appliList]->{$name}} ; -# } -# } - - EXT_LOOP: - foreach my $name (keys %{$self->[appliList]}) { - foreach my $fdp (@{$self->[appliList]{$name}}) { - if ($fd eq $fdp) { - $diedAppName = $name; - @{$self->[appliList]{$name}} = - grep ($_ ne $fdp, @{$self->[appliList]{$name}}); - if (scalar (@{$self->[appliList]{$name}}) == 0) { - delete $self->[appliList]->{$name}} ; - last EXT_LOOP; + my $diedAppName; + + # on s'est deja occupe de lui + return unless exists $self->[sockList]->{$fd}; + # printf ("DBG> _removeFileDescriptor IN thread %s\n", ${Thread->self}); + + # on efface les structures de donnees associees au fd + # on vire ce fd des fd a scruter dans la bcle d'evenements + # uniquement si on est dans le thread principal + # sinon le select merde salement sur ce coup + &$fileEventFunc ($fd, '') ; + delete $self->[sendRegList]{$fd}; + delete $self->[sockList]{$fd}; + delete $self->[buffByConn]->{$fd}; + + $fd->close(); + + EXT_LOOP: + foreach my $name (keys %{$self->[appliList]}) { + foreach my $fdp (@{$self->[appliList]{$name}}) { + if ($fd eq $fdp) { + $diedAppName = $name; + @{$self->[appliList]{$name}} = + grep ($_ ne $fdp, @{$self->[appliList]{$name}}); + if (scalar (@{$self->[appliList]{$name}}) == 0) { + delete $self->[appliList]->{$name} + } + last EXT_LOOP; + } } } - } - - - unless (defined $diedAppName) { - warn "Ivy::_removeFileDescriptor : deconnection de NONAME\n" if $^W; - return; - } - - my $addrInet = (grep ($self->[cnnxion]->{$_} eq $diedAppName, - keys %{$self->[cnnxion]}))[0]; - unless (defined $addrInet) { - die "ERREUR _removeFileDescriptor deconnection de $diedAppName ". - "addrInet not defined\n"; - return; - } - # printf "DBG> _removeFileDescriptor : deconnection de %s ($diedAppName)\n", _inetAdrByName ($self, $diedAppName); + + unless (defined $diedAppName) { + warn "Ivy::__removeFileDescriptor : deconnection de NONAME\n" if $^W; + return; + } - delete $self->[cnnxion]->{$addrInet}; - - # on vire l'entree correspondant a ce canal dans la liste des - # regexps par canal - _scanConnStatus ($self) ; + my $addrInet = (grep ($self->[cnnxion]{$_} eq $diedAppName, + keys %{$self->[cnnxion]}))[0]; + + unless (defined $addrInet) { + die "ERREUR _removeFileDescriptor deconnection de $diedAppName ". + "addrInet not defined\n"; + return; + } + + #printf "DBG> _removeFileDescriptor : deconnection de %s ($diedAppName)\n", _inetAdrByName ($diedAppName); + + delete $self->[cnnxion]{$addrInet}; + + # on vire l'entree correspondant a ce canal dans la liste des + # regexps par canal + $self->_scanConnStatus () ; } @@ -1400,8 +1355,8 @@ sub _sendErrorTo ($$$) { my ($self, $fd, $error) = @_; - Thread::Queue::enqueue ($self->[queueList]->{$fd}, - \join (' ', ERROR, "0\002$error\n")); + send ($fd, join (' ', ERROR, "0\002$error\n"), 0) + or $self->_removeFileDescriptor ($fd); } @@ -1409,9 +1364,9 @@ sub _sendErrorTo ($$$) sub _pong ($$) { my ($self, $fd) = @_; - - Thread::Queue::enqueue ($self->[queueList]->{$fd}, - \join (' ', PONG, "0\002\n")); + + send ($fd, join (' ', PONG, "0\002 \n"), 0) + or $self->_removeFileDescriptor ($fd); } @@ -1420,63 +1375,64 @@ sub _sendDieTo ($$) { my ($self, $fd) = @_; - Thread::Queue::enqueue ($self->[queueList]->{$fd}, - \join (' ', DIE, "0\002\n")); + send ($fd, join (' ', DIE, "0\002\n"), 0) + or $self->_removeFileDescriptor ($fd); } ############### METHODE SEND MSG TO sub _sendMsgTo ($$$) { - my ($self, $fd, $msg) = @_; + my ($self, $fd, $msg) = @_; - # pour toutes les fonctions de filtrage de regexp - foreach my $regexpFunc (@{$self->[sendRegList]{$fd}}) { - &{$regexpFunc} (\$msg) if defined $regexpFunc; - } + # pour toutes les fonctions de filtrage de regexp + foreach my $regexpFunc (@{$self->[sendRegList]{$fd}}) { + &{$regexpFunc} (\$msg) if defined $regexpFunc; + } } -############### METHODE TK FILE EVENT +############### PROCEDURE TK FILE EVENT sub _tkFileEvent ($$) -{ +{ my ($fd, $cb) = @_; Tk::fileevent ('', $fd, 'readable', $cb) ; -} - - +} -############### METHODE SCAN AFTER +############### PROCEDURE SCAN AFTER sub _scanAfter () { - my $stamp = timeofday (); - $selectTimout = MAX_TIMOUT; - foreach my $afk (keys %afterList) { - my $af = $afterList{$afk}; - # si ce timer est a declencher - if ($af->[2] <= $stamp) { - # on traite : le temps de declencher le cb est arrive - if (ref $af->[3] eq 'CODE') { - &{$af->[3]}; - } else { - my ($cb, @args) = @{$af->[3]}; - &$cb (@args); - } - # si c'est un repeat on le reconduit - if ($af->[0]) { - $af->[2] = $stamp + $af->[1] ; - $selectTimout = $af->[1] if $af->[1] < $selectTimout; - } else { - # si c'est un after on le vire - afterCancel ($afk); - } - } else { - my $timeTotrigg = $af->[2] - $stamp; - $selectTimout = $timeTotrigg if $timeTotrigg < $selectTimout; - } + my $stamp = timeofday (); + $selectTimout = MAX_TIMOUT; + foreach my $afk (keys %afterList) { + my $af = $afterList{$afk}; + # si ce timer est a declencher + if ($af->[2] <= $stamp) { + # on traite : le temps de declencher le cb est arrive + if (ref $af->[3] eq 'CODE') { + &{$af->[3]}; + } + else { + my ($cb, @args) = @{$af->[3]}; + &$cb (@args); + } + # si c'est un repeat on le reconduit + if ($af->[0]) { + $af->[2] = $stamp + $af->[1] ; + $selectTimout = $af->[1] if $af->[1] < $selectTimout; + } + else { + # si c'est un after on le vire + afterCancel ($afk); + } + } + else { + my $timeTotrigg = $af->[2] - $stamp; + $selectTimout = $timeTotrigg if $timeTotrigg < $selectTimout; } + } } @@ -1491,7 +1447,7 @@ sub _scanConnStatus ($) next if $_ eq "1"; $readyApp{$_}++ unless /^\004/; # connecte mais pas ready } - + foreach (@{$self->[neededApp]}) { push (@nonReadyApp, $_) unless exists $readyApp{$_}; } @@ -1510,48 +1466,50 @@ sub _scanConnStatus ($) ############### METHODE TO BE PRUNED sub _toBePruned ($$$) { - my ($self, $from, $regexp) = @_; - - - # si il n'y a pas de liste de sujets, on ne - # filtre pas - return 0 unless @{$self->[topicRegexps]}; + my ($self, $from, $regexp) = @_; - unless ($regexp =~ /^\^/) { - #print "DBG> regexp non ANCREE de $from : $regexp\n"; - return (0); - } - - if ($regexp =~ /^\^(\w+)/) { - my $topic = $1; - if (grep (/$topic/, @{$self->[topicRegexps]})) { - # on a trouve ce topic : on ne filtre pas la regexp - #print "DBG> on garde de $from : $regexp\n"; - return (0); - } - #print "DBG> on ELIMINE de $from : $regexp\n"; - return (1); - } else { - #print "DBG> on garde de $from : $regexp\n"; - return (0); + # si il n'y a pas de liste de sujets, on ne + # filtre pas + return 0 unless @{$self->[topicRegexps]}; + + unless ($regexp =~ /^\^/) { + #print "DBG> regexp non ANCREE de $from : $regexp\n"; + return (0); + } + + if ($regexp =~ /^\^(\w+)/) { + my $topic = $1; + if (grep (/$topic/, @{$self->[topicRegexps]})) { + # on a trouve ce topic : on ne filtre pas la regexp + #print "DBG> on garde de $from : $regexp\n"; + return (0); } + #print "DBG> on ELIMINE de $from : $regexp\n"; + return (1); + } + else { + #print "DBG> on garde de $from : $regexp\n"; + return (0); + } } -############### METHODE PARSE IVY BUS PARAM +############### PROCEDURE PARSE IVY BUS PARAM sub _parseIvyBusParam ($) { my $ivyBus = shift; + my ($ivyNetworks, $ivyPort) = $ivyBus =~ /^(.*):(.*)/; + die ("Erreur Ivy::_parseIvyBusParam format de l'adresse ou ". "no de port incorrect : $ivyBus\n") unless $ivyPort =~ /^\d+$/; my @ivyAddrInet = (); - $ivyNetworks =~ s/ //g; + $ivyNetworks =~ s/ //g; my @broadcastAddrs = split (',', $ivyNetworks); - + foreach my $netAddr (@broadcastAddrs) { $netAddr = BROADCAST_ADDRS if (($netAddr eq '') || ($netAddr =~ /^127/) || @@ -1594,107 +1552,295 @@ sub _parseIvyBusParam ($) return ($ivyPort, \@ivyAddrInet); } +############# Procedure _SUBSTITUTE ESCAPED CHAR +sub _substituteEscapedChar ($$) +{ + my ($scope, $reg) = @_; -############# Methode _ SENDER THREAD -sub _senderThread ($$$) { - my ($self, $queue, $fd) = @_; - my $data; - # condition d'arret du thread : - # pour l'arreter on enqueue undef. - while ($data = $queue->dequeue()) { - # si le send echoue, on vire le fd et on sort du thread. -# last unless syswrite ($fd, $$data, length ($$data)); - last unless send ($fd, $$data, 0); - } + my %escapeRegexp = REG_PERLISSISME; + # Si on fait la substitution dans une classe de caractere + # on elimine les crochets. + grep ($escapeRegexp{$_} =~ s/[\[\]]//g, keys %escapeRegexp) + if ($scope eq 'inside') ; + + $reg =~ s/\\([wWsSdDne])/$escapeRegexp{$1}/ge; + return $reg; } +1; -############# Methode _ SENDER THREAD_NON_BLOQUANT +__END__ -# tentative pour pouvoir arreter facilement un thread -# bloque sur un send. Mais c'est pas satisfaisant -# car quand le fd est de nouveau accessible en ecriture, -# les donnees accumulees dans l'accumulateur ne sont -# envoyees que lorsque l'on ecrit a nouveau dans -# la queue de communication du thread -# Je laisse ce code pour info. -# sub _senderThread_NON_BLOQUANT ($$$) { -# my ($self, $queue, $fd) = @_; -# my $data; -# my @buff = (); - -# # on ne bloque plus sur ce fd -# fcntl ($fd, F_SETFL, O_NDELAY) ; - -# EXT_LOOP: -# for (;;) { -# # condition d'arret du thread : -# # pour l'arreter on enqueue undef. -# last unless defined ($data = Thread::Queue::dequeue ($queue)); - -# # on colle les donnees dans un accumulateur -# push (@buff, $data); - -# # tant qu'on peut ecrire sur le fd, on le fait -# while ($data = shift @buff) { -# unless (syswrite ($fd, $$data, length ($$data))) { -# # le write a plante : -# if ($! == Errno::EWOULDBLOCK) { -# # si c'est parce que le fd est temporairement innacessible -# # on remets le message dans l'accumulateur et on se remets -# # en attente sur la queue de communication -# unshift (@buff, $data); -# next EXT_LOOP; -# } else { -# # sinon c'est que le fd a ete ferme, dans ce cas le thread sort -# last EXT_LOOP; -# } -# } -# } - -# } -# print ("DBG> sortie de thread\n"); -# } +=head1 NAME +Ivy - Perl extension for implementing a software bus +=head1 SYNOPSIS +use Ivy; +=head1 DESCRIPTION -############# Methode _CLOSE SENDER THREAD -sub _closeSenderThread ($$) -{ - my ($self, $fd) = @_; - my $queue = $self->[queueList]->{$fd}; - - # on vide la queue. Oui je sais :-) - # printf ("DBG> on vide la queue dans le thread %s\n", ${Thread->self()}); - if (defined $queue) { - while ($queue->dequeue_nb ()) {}; - $queue->enqueue (undef); - _removeFileDescriptor ($self, $fd); - } +The Ivy perl module implements a software bus to provide with an easy +communication between applications. Messages are broadcasted as ASCII strings +over a network defined by a list of domains and a port. +Messages are received if they match a regular expressions and if your application +is on the same network as remote ones. +Before receive or send message you must call 'init', and 'new' class methods, +followed by 'start' method. +When you quit your application don't forget to call 'exit' class methods. - # printf ("DBG> demande de join du thread ${$self->[threadList]->{$fd}}\n"); - $self->[threadList]->{$fd}->join() if defined $self->[threadList]->{$fd};; - # printf ("DBG> done\n"); -} +=head1 CLASS METHODS +=head2 Ivy->init(...) +Allows one to define global parameters which may be used as default ones +at object creation time. -############# Procedure _SUBSTITUTE ESCAPED CHAR -sub _substituteEscapedChar ($$) -{ - my ($scope, $reg) = @_; +Parameters are : - my %escapeRegexp = REG_PERLISSISME; - # Si on fait la substitution dans une classe de caractere - # on elimine les crochets. - grep ($escapeRegexp{$_} =~ s/[\[\]]//g, keys %escapeRegexp) - if ($scope eq 'inside') ; +=over 4 - $reg =~ s/\\([wWsSdDne])/$escapeRegexp{$1}/ge; - return $reg; +=item B<-loopMode =E<gt> 'TK'|'LOCAL'> + +Mode of events loop among TK or LOCAL. According to this mode, you must +use Ivy->mainLoop or Tk::MainLoop(3) + +=item B<-appName =E<gt> 'your app ivy name'> + +Name of your application used to identify on ivy bus. + +=item B<-ivyBus =E<gt> 'domain 1,...,domain n:port number'> + +A list of domains, followed by port number where to broadcast messages. +Default is 127.255.255.255:2010 + +=item B<-messWhenReady =E<gt> 'your message when ready'> + +Synchronisation message sent when application is ready to receive and send +messages. + +=item B<-onDieFunc =E<gt> [\&yourdiefunc, @parameters]> + +=item B<-onDieFunc =E<gt> [$an_object, \&a_method, @parameters]> + +A callback or method to call when application receive die message. +Don't make an exit in callback, ivy we'll do it for you. + +Prototype of your callback must be : + +sub MyCallback { + my @parameters = @_; + + ... } +Prototype of your method must be : -1; +sub MyMethod { + my ($self, @parameters) = @_; + + ... +} + +=item B<-pruneRegexp =E<gt> ['subject 1', ..., 'subject n']> + +Optimize communication using this option. Regexps which don't match these subjects are removed. + +Example : + + Ivy->init(-loopMode =E<gt> 'TK', + -appName =E<gt> 'MyWonderfulApp', + -onDieFunc =E<gt> [\&restorecontext]); + +=back + +=head2 Ivy->new(...); + +Check parameters, and create an ivy bus object. You must call Ivy->init before +this one. + +Parameters are : + +=over 4 + +=item B<-appName =E<gt> 'your app ivy name'> + +Name of your application used to identify on ivy bus. + +=item B<-ivyBus =E<gt> 'domain 1,...,domain n:port number'> + +A list of domains, followed by port number where to broadcast messages. +Default is 127.255.255.255:2010 + +=item B<-messWhenReady =E<gt> 'your message when ready'> + +Synchronisation message sent when application is ready to receive and send +messages. + +=item B<-onDieFunc =E<gt> [\&yourdiefunc, @parameters]> + +=item B<-onDieFunc =E<gt> [$an_object, \&a_method, @parameters]> + +A callback or method to call when application receive die message. +Don't make an exit in callback, ivy we'll do it for you. +Prototype of your callback must be : + + sub MyCallback { + my @parameters = @_; + ... + } + +Prototype of your method must be : + + sub MyMethod { + my ($self, @parameters) = @_; + ... + } + +=item B<-pruneRegexp =E<gt> ['subject 1', ..., 'subject n']> + +Optimize communication using this option. Regexps which don't match these subjects are removed. + +=item B<-neededApp =E<gt> ['app 1', ..., 'app n']> + +A list of application your own one needs to correctly run. + +=item B<-statusFunc =E<gt> sub {}> + +A callback which will be called until every needed app is present on bus. + +Your callback could be : + + sub MyCallback { + my ($present, $absent, %present) = @_; + + foreach my $remoteapp (keys %present) { + if ($present{$remoteapp} > 1) { + print "n apps $remoteapp are presents on bus\n"; + } + } + +Example : + + Ivy->new(-ivyBus => '156.255.255.255,157.255.255.255:2204', + -onDieFunc => [\&restorecontext], + -neededApp => ["DataServer", "HMI"], + -statusFunc => \&startwhenpresents); + +=back + +=head2 Ivy->mainLoop; + +Local main events loop. Use it if you don't use Tk library. + +=head2 $ivyobj->stop; + +=head1 OBJECT METHODS + +=head2 $ivyobj->start; + +You must call it after you are ready to communicate through ivy bus +and before you really communicate. + +=head2 $ivyobj->sendMsgs(@messages); + +Send a list of messages + +Example : + + $ivyobj->sendMsgs("Hello World", "Don't Bother", "Y2K is behind us"); + +=head2 $ivyobj->sendAppNameMsgs(@messages); + +Send a list of messages precedeed by ivy application name. + +Example : + + $ivyobj->sendMsgs("Hello World"); + # it will send "$appName Hello World" over ivy bus + +=head2 $ivyobject->bindRegexp($regexp, [\&callback, @cb_parameters]); + +=head2 $ivyobject->bindRegexp($regexp, [$an_obj, \&method, @cb_parameters]); + +Allow one to associate a message which matches a regular expression and a +callback or method. See perlre(1), to find how to write regexps. +Use the bracketing construct ( ... ), so that ivy perl will call +callback with matched patterns as parameters. + +Example : + + $ivyobject->bindRegexp("\w+ (\d+)", [\&callback, @cb_parameters]); + # You callback will be called with one more parameter which will be + # a number precedeed by a word, because of bracketed regexp. + +Your callback and method protos must be : + + sub cb { + my ($sendername, @cb_parameters, + @matched_regexps_in_brackets) = @_; + ... + } + + sub method { + my ($self, $sendername, @cb_parameters, + @matched_regexps_in_brackets) = @_; + ... + } + +=head2 $ivyobj->sendDirectMsgs($to, $id, @msgs); + +Send a message but Ask Alex to find what are $to and $id + +=head2 $ivyobj->bindDirect($regexp, $id, [\&callback, @cb_parameters]); + +=head2 $ivyobj->bindDirect($regexp, $id, [$an_obj, \&method, @cb_parameters]); + +Same as bindRegexp method but Ask Alex to find what is $id. + +=head2 $ivyobj->sendDieTo($to) + +send a die message to $to application name. + +=head2 $ivyobj->ping($to, $timeout); + +send a ping message and wait until timeout to receive a pong. + +=head2 $after_id = $ivyobj->after($timeAfter, \@callbacks_list); + +Call a list of callbacks after $timeAfter mseconds. + +=head2 $repeat_id = $ivyobj->repeat($timeAfter, \@callbacks_list); + +Repeat calls of a list of callbacks after $timeAfter mseconds. + +=head2 $ivyobj->afterCancel($after_or_repeat_id); + +Cancel an after callback call. + +=head2 $ivyobj->fileEvent($fd, $cb); + +Ask Alex + +=head2 $ivyobj->DESTROY; + +=head1 BUGS + +No know bugs at this time. Report them to author. + +=head1 SEE ALSO + +perl(1), perlre(1) + +=head1 AUTHORS + +Alexandre Bustico <bustico@tls.cena.fr>, Herve Damiano <damiano@tls.cena.fr> + +=head1 COPYRIGHT + +CENA + +=head1 HISTORY + +=cut |