diff options
-rw-r--r-- | Ivy.pm | 553 |
1 files changed, 426 insertions, 127 deletions
@@ -1,13 +1,13 @@ # # Ivy, Perl interface # -# Copyright 1997-2003 +# Copyright 1997-2006 # Centre d'Études de la Navigation Aérienne # -# Authors: Alexandre Bustico <bustico@cena.fr> -# Stéphane Chatty <chatty@cena.fr> -# Hervé Damiano <damiano@cena.fr> -# Christophe Mertz <mertz@cena.fr> +# Authors: Alexandre Bustico <alexandre.bustico@cena.fr> +# Stéphane Chatty <chatty@intuilab.com> +# Hervé Damiano <herve.damiano@aviation-civile.gouv.fr> +# Christophe Mertz <mertz@intuilab.com> # # All functions # @@ -30,6 +30,9 @@ # ################################################################## +# TODO +# + package Ivy ; use Sys::Hostname; @@ -40,6 +43,7 @@ use Carp; use IO::Socket::Multicast; use vars qw($VERSION); +use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK); # to compute the VERSION from the CVS tag (or if no tag, as the cvs file revision) my $TAG= q$Name$; @@ -80,7 +84,7 @@ sub start; # debut de l'integration au bus : # getConnections # pour etablir les connections "application" -sub DESTROY ($); # - envoie un BYE et clot les connections +sub DESTROY ($); # - envoie un BYE et clôt les connections #sub bindRegexp ($$$) ; # permet d'associer une regexp avec un callBack # ou d'annuler une precedente association @@ -94,7 +98,7 @@ sub sendAppNameMsgs; # envoie une liste de messages precedes sub sendDirectMsgs; # envoie une liste de messages directs a une appli sub sendDieTo; # envoie un <<kill>> a une appli sub ping; # teste qu'une appli soit encore vivante -sub mainLoop (); # la mainloop locale (sans tk) +sub mainLoop ($); # la mainloop locale (sans tk) sub stop (); # methode de classe : on delete les bus, mais # on reste dans la mainloop sub exit (); # methode de classe : on delete tous les @@ -111,7 +115,8 @@ sub repeat ($$;$); # temps en millisecondes, callback sub afterCancel ($;$); # l'id d'un cancel ou d'un repeat sub afterResetTimer ($;$); # pour ré-armer un timer non-encore déclenché à sa valeur initiale sub fileEvent ($$;$); # associe un fd a un callback pour la mainloop locale - +sub getUuid ($); # rend un identifiant unique d'application sur le bus, utile + # pour faire des genres de rpc avec un abonnement desabonnement dynamique ################ PRIVEE #################################################### sub _getBonjour ($); # lit le (ou les) bonjour(s) sur le canal de supervision @@ -134,7 +139,7 @@ sub _sendLastRegexpToAllreadyConnected ($$) ; # envoie la derniere regexp sub _removeFileDescriptor ($$); # on vire un fd et les structures associees sub _sendErrorTo ($$$); #(fd, error) envoie un message d'erreur a un fd sub _sendDieTo ($$); #(fd) envoie un message de demande de suicide a un fd -sub _sendMsgTo ($$$); # (fd, message) +sub _sendMsgTo ($$\$); # (fd, message) sub _pong ($$); # (fd) sub _tkFileEvent ($$); # associe un fd a un callback pour la mainloop tk sub _scanAfter () ; # parse si il faut appeler un callback associe a un after @@ -153,6 +158,9 @@ sub _parseIvyBusParam ($); # prends une adresse de bus de la forme sub _substituteEscapedChar ($$); #permet de transformer une regexp etendue # 'perl' en regexp de base +sub _callCongestionCb ($$$); # appelle la callback de notification de congestion, + # si elle a été définie par l'utilisateur + ############################################################################# #### CONSTANTES ##### ############################################################################# @@ -177,6 +185,8 @@ use constant IVY_PROTOCOLE_VERSION => 3; use constant AFTER => 0; use constant REPEAT => 1; +use constant TK_MAINLOOP => 0; +use constant LOCAL_MAINLOOP => 1; use constant MAX_TIMOUT => 1000; # pour pouvoir employer les regexps perl. Attention lors de l'utilisation @@ -240,8 +250,17 @@ my $fileEventFunc; # dans le cas ou l'on soit dans une mainLoop # locale, cette var pointe une un objet -# de type IO::Select; -my $localLoopSel; +# de type IO::Select, qui est l'ensemble des file descriptor +# des sockets que l'on scrute en lecture (attente des messages) +my $localLoopSelRead; + +# dans le cas ou l'on soit dans une mainLoop +# locale, cette var pointe une un objet +# de type IO::Select qui est l'ensemble des file descriptor +# des sockets que l'on scrute en ecriture : les fd congestionnés +# des "slow agent" que l'on surveille pour ecrire dedans dès que +# possible (mode non bloquant) +my $localLoopSelWrite; # table d'ass. handle -> callback my %localBindByHandle; @@ -254,7 +273,7 @@ my $afterId = 0; # timeout le plus petit pour le select my $selectTimout = MAX_TIMOUT; - +my $loopMode; # liste des bus actifs my %allBuses = (); @@ -275,6 +294,8 @@ my %allBuses = (); use constant servPort => $constantIndexer++; use constant neededApp => $constantIndexer++; use constant statusFunc => $constantIndexer++; +use constant slowAgentFunc => $constantIndexer++; +use constant blockOnSlowAgent => $constantIndexer++; use constant supSock => $constantIndexer++; use constant connSock => $constantIndexer++; use constant sockList => $constantIndexer++; @@ -286,12 +307,15 @@ use constant topicRegexps => $constantIndexer++; use constant recCbList => $constantIndexer++; use constant directCbList => $constantIndexer++; use constant cnnxion => $constantIndexer++; -use constant buffByConn => $constantIndexer++; +use constant connectedUuid => $constantIndexer++; +use constant bufRecByCnnx => $constantIndexer++; +use constant bufEmiByCnnx => $constantIndexer++; use constant broadcastPort => $constantIndexer++; use constant broadcastBuses => $constantIndexer++; use constant useMulticast => $constantIndexer++; use constant appName => $constantIndexer++; use constant messWhenReady => $constantIndexer++; +use constant uuid => $constantIndexer++; ############################################################################# #### METHODES PUBLIQUES ##### @@ -303,6 +327,7 @@ sub init return; } + srand(); # initialisation du generateur aléatoire qui sert à produire un UUID my $class = shift if (@_ and $_[0] eq __PACKAGE__); my (%options) = @_; @@ -340,11 +365,13 @@ sub init # ps : ne pas faire d'exit dans le callback, # c'est le bus qui s'en charge - -pruneRegexp => [], - # optimisation : si l'on connait les sujets des messages - # qu'on envoie, on fournit la liste des sujets - # et les regexps qui ne matchent pas - # ces sujets sont eliminees. + + -filterRegexp => [],# nouvelle api cohérente avec ivy-c, c++, java + -pruneRegexp => [], # obsolete + # optimisation : si l'on connait les sujets des messages + # qu'on envoie, on fournit la liste des sujets + # et les regexps qui ne matchent pas + # ces sujets sont eliminees. ) ; # on examine toutes les options possibles @@ -367,26 +394,32 @@ sub init } } - my $loopMode = $options{-loopMode}; $ivyBus = $options{-ivyBus}; $appName = $options{-appName} ; $onDieFunc = $options{-onDieFunc} ; - @topicRegexps = @{$options{-pruneRegexp}}; + + if (scalar (@{$options{-pruneRegexp}})) { + carp "-pruneRegexp is *OBSOLETE*. -filterRegexp should be used instead\n"; + $options{-filterRegexp} = $options{-pruneRegexp} unless defined $options{-filterRegexp}; + } + + @topicRegexps = @{$options{-filterRegexp}}; $messWhenReady = $options{-messWhenReady} eq "_APP NAME READY" ? "$appName READY" : $options{-messWhenReady}; - if ($loopMode =~ /local/i) { + if ($options{-loopMode} =~ /local/i) { # mode boucle d'evenement locale use IO::Select; $fileEventFunc = \&fileEvent ; - $localLoopSel = IO::Select->new (); - - } elsif ($loopMode =~ /tk/i) { + $localLoopSelRead = IO::Select->new (); + $localLoopSelWrite = IO::Select->new (); + $loopMode = LOCAL_MAINLOOP; + } elsif ($options{-loopMode} =~ /tk/i) { # mode boucle d'evenement de TK $fileEventFunc = \&_tkFileEvent ; - + $loopMode = TK_MAINLOOP; } else { croak "Error in Ivy::init, argument loopMode must be either TK or LOCAL\n"; } @@ -421,6 +454,7 @@ sub new ($%) # - des qu'une appli se connecte # - lorsqu'une appli se deconnecte $self->[statusFunc] = ''; + $self->[slowAgentFunc] = ''; # callback prenant en param 1 refs sur une liste : # [ref sur fonction, parametres] @@ -471,12 +505,21 @@ sub new ($%) # pas sur un service en cas de bonjours repetes # valeur : nom de l'application $self->[cnnxion] = {}; + $self->[connectedUuid] = {}; # tableau associatif, clef => file desc, # valeur :buffer au cas ou la lecture ne se termine # pas par \n, de maniere a resegmenter les messages - $self->[buffByConn] = {}; + $self->[bufRecByCnnx] = {}; + + # tableau associatif, clef => file desc, + # valeur :buffer au cas ou l'écriture bloque sur un fd + # pour eviter de bloquer la mainloop sur un send on bufferise + # les données dans le process + $self->[bufEmiByCnnx] = {}; + # identifiant unique + $self->[uuid] = sprintf ("%d%d", time(), rand()*1e15); my %optionsAndDefaults = ( -appName => $appName, @@ -506,7 +549,7 @@ sub new ($%) # toutes les applis necessaires ne sont pas presentes, # et des que toutes les applis necessaires sont # presentes, et si une appli necessaire se deconnecte - # les trois parametres passes sont : + # les trois parametres passes à la callback sont : # [liste des applis presentes], # [liste des applis absentes], # [table de hash, clefs = applis presentes, @@ -515,11 +558,30 @@ sub new ($%) # ca veut dire que plus d'une appli de meme nom # tourne sur le meme bus : danger !! - -pruneRegexp => [@topicRegexps], - # optimisation : si l'on connait les sujets des messages - # qu'on envoie, on fournit la liste des sujets - # et les regexps qui ne matchent pas - # ces sujets sont eliminees. + -blockOnSlowAgent => 1, + # comportement lorque un ou plusieurs des agents connectés + # se consomme suffisement rapidement ses messages. Ou bien on laisse + # le send bloquer, ou bien on accumule les messages localement + # en attendant que l'agent congestionné soit capable de les traiter. + # cette méthode a l'avantage de ne pas bloquer la mainloop locale, + # par contre la consomation mémoire peut devenir problématique. + + -slowAgentFunc => sub {}, + # fonction de callBack qui sera appelee si l'envoi de messages + # à un agent n'est plus possible parce que l'agent en question ne + # consomme pas ses messages assez vite, ou si un agent qui etait + # dans cette etat retrouve sa capacité à lire les messages. + # les paramètres passés à la callback sont + # nom de l'appli, + # adresse + # etat (congestion = 1, decongestion = 0) + + -filterRegexp => [@topicRegexps], + -pruneRegexp => [], # obsolete + # optimisation : si l'on connait les sujets des messages + # qu'on envoie, on fournit la liste des sujets + # et les regexps qui ne matchent pas + # ces sujets sont eliminees. ) ; @@ -536,6 +598,11 @@ sub new ($%) } } + if (scalar (@{$options{-pruneRegexp}})) { + carp "-pruneRegexp is *OBSOLETE*. -filterRegexp should be used instead\n"; + $options{-filterRegexp} = $options{-pruneRegexp} unless defined $options{-filterRegexp}; + } + # on examine toutes les options fournies, pour detecter les inutiles foreach my $opt (keys %options) { unless (exists ($optionsAndDefaults{$opt})) { @@ -543,12 +610,14 @@ sub new ($%) } } - $self->[appName] = $options{-appName} ; $self->[messWhenReady] = $options{-messWhenReady} ; - @{$self->[neededApp]} = @{$options{-neededApp}} ; - $self->[statusFunc] = $options{-statusFunc} ; - $self->[topicRegexps] = $options{-pruneRegexp} ; + @{$self->[neededApp]} = @{$options{-neededApp}} ; + $self->[statusFunc] = $options{-statusFunc} ; + $self->[slowAgentFunc] = $options{-slowAgentFunc} ; + $self->[blockOnSlowAgent] = $options{-blockOnSlowAgent} ; + + $self->[topicRegexps] = $options{-filterRegexp} ; $allBuses{$self} = $self; ($self->[useMulticast], $self->[broadcastPort], $self->[broadcastBuses]) = @@ -606,12 +675,10 @@ sub stop () sub exit () { Ivy::stop (); - if (defined $localLoopSel) { - # boucle locale, on sait faire - # printf ("DBG> undefining localLoopSel\n"); - undef $localLoopSel; - } - else { + if (defined $localLoopSelRead) { + undef $localLoopSelRead ; + undef $localLoopSelWrite ; + } else { Tk::exit (); } } # end exit @@ -650,7 +717,8 @@ sub start $self->[cnnxion]->{"$localhostAddr:". $connSock->sockport} = "\004"; # le message de bonjour à envoyer: "no de version no de port" - my $bonjourMsg = sprintf ("%d %d\n", IVY_PROTOCOLE_VERSION, $connSock->sockport()); + my $bonjourMsg = sprintf ("%d %d %s %s\n", IVY_PROTOCOLE_VERSION, $connSock->sockport(), + $self->[appName], $self->[uuid]); if (!$self->[useMulticast]) { # cree la socket de broadcast @@ -777,58 +845,69 @@ sub bindDirect } } # end bindDirect + ############### PROCEDURE SEND MSGS sub sendMsgs { my $self = ref($_[0]) eq __PACKAGE__ ? shift : $globalIvy; - my @msgs = @_; + my $appSock; my $total = 0; # pour tous les messages - foreach my $msg (@msgs) { - carp "Warning in Ivy::sendMsgs, a message contains a '\\n'. You should correct it:\n'$msg'" if ($msg =~ /\n/) ; - - study ($msg); - - # pour routes les connections - foreach my $fd (keys %{$self->[sockList]}) { - - # pour toutes les fonctions de filtrage de regexp - foreach my $regexpFunc (@{$self->[sendRegList]{$fd}}) { - $total += &{$regexpFunc} (\$msg) if defined $regexpFunc; - } - } + foreach my $msg (@_) { + study ($msg); + carp "Warning in Ivy::sendMsgs, a message contains a '\\n'. " . + "You should correct it:\n'$msg'" if ($msg =~ /\n/) ; + + # pour routes les connections + foreach $appSock (values %{$self->[sockList]}) { + # pour toutes les fonctions de filtrage de regexp + $total += _sendMsgTo ($self, $appSock, $msg); + } } # print "DBG> sended $total times\n"; return $total; } # end sendMsgs + + + + + + ############### PROCEDURE SEND MSGS sub sendAppNameMsgs { my $self = ref($_[0]) eq __PACKAGE__ ? shift : $globalIvy; - my @msgs = @_; - my $total = 0; + return $self->sendMsgs (map ("$self->[appName] $_", @_)); +} - # pour tous les messages - foreach (@msgs) { - carp "Warning in Ivy::sendAppNameMsgs, a message contains a '\\n'. Skipping it:\n'$_'" if ($_ =~ /\n/); - my $msg = "$self->[appName] $_"; - study ($msg); +# sub sendAppNameMsgs +# { +# my $self = ref($_[0]) eq __PACKAGE__ ? shift : $globalIvy; +# my @msgs = @_; +# my $total = 0; - # pour toutes les connections - foreach my $fd (keys %{$self->[sockList]}) { +# # pour tous les messages +# foreach (@msgs) { +# carp "Warning in Ivy::sendAppNameMsgs, a message contains a '\\n'. Skipping it:\n'$_'" if ($_ =~ /\n/); - # pour toutes les fonctions de filtrage de regexp - foreach my $regexpFunc (@{$self->[sendRegList]{$fd}}) { - $total += &{$regexpFunc} (\$msg) if defined $regexpFunc; - } - } - } - # print "DBG> sended $total times\n"; - return $total; -} # end sendAppNameMsgs +# my $msg = "$self->[appName] $_"; +# study ($msg); + +# # pour toutes les connections +# foreach my $fd (keys %{$self->[sockList]}) { + +# # pour toutes les fonctions de filtrage de regexp +# foreach my $regexpFunc (@{$self->[sendRegList]{$fd}}) { +# $total += &{$regexpFunc} (\$msg) if defined $regexpFunc; +# } +# } +# } +# # print "DBG> sended $total times\n"; +# return $total; +# } # end sendAppNameMsgs @@ -909,18 +988,25 @@ sub ping } # end ping ############### METHODE MAINLOOP -sub mainLoop () +sub mainLoop ($) { - croak "Error in Ivy::mainLoop, Ivy should have been initialised with LOCAL loop mode \n" - unless defined $localLoopSel; + my $self = ref($_[0]) eq __PACKAGE__ ? shift : $globalIvy; + my ($fd, @selRes, @allDesc); - my ($fd, @ready, @allDesc); + if ($loopMode == TK_MAINLOOP) { + eval {Tk::MainLoop ()}; + return; + } + + croak "Error in Ivy::mainLoop, Ivy not properly initialized\n" + unless defined $localLoopSelRead; - while (defined $localLoopSel) { - @ready = IO::Select::can_read ($localLoopSel, $selectTimout) ; + while (defined $localLoopSelRead) { +# READ + @selRes = IO::Select::select ($localLoopSelRead, $localLoopSelWrite, undef, $selectTimout) ; _scanAfter () ; - foreach my $fd (@ready) { + foreach $fd (@{$selRes[0]}) { if (ref $localBindByHandle{$fd} eq 'CODE') { &{$localBindByHandle{$fd}} ; } @@ -929,6 +1015,25 @@ sub mainLoop () &$cb (@arg) } } + +#WRITE + my $bufEmiRef; + my $sent; + + foreach $fd (@{$selRes[1]}) { + $bufEmiRef = \($self->[bufEmiByCnnx]->{$fd}); + $sent = send ($fd, $$bufEmiRef, 0); + unless (defined $sent) { + # y a rien à faire + } elsif ($sent == length ($$bufEmiRef)) { + $$bufEmiRef = ""; + _callCongestionCb ($self, $fd, 0); + } elsif ($sent >= 0) { + substr ($$bufEmiRef, 0, $sent, ''); + } else { + $self->_removeFileDescriptor ($fd) ; + } + } } } # end mainLoop @@ -1026,22 +1131,30 @@ sub fileEvent ($$;$) my ($fd, $cb) = @_; - unless (defined $localLoopSel) { + unless (defined $localLoopSelRead) { croak ("Error in Ivy::fileEvent, Ivy should have been initialised in LOCAL loop mode\n"); } if ($cb) { # adding the handler $localBindByHandle{$fd} = $cb; - $localLoopSel->add ($fd); + $localLoopSelRead->add ($fd); } else { # deleting the handler delete $localBindByHandle{$fd}; # print ("DBG> Ivy::fileEvent : removing fd from the select\n"); - $localLoopSel->remove ($fd); + $localLoopSelRead->remove ($fd); } } # end fileEvent + + + +sub getUuid ($) +{ + my $self = shift; + return $self->[uuid]; +} ############################################################################# #### METHODES PRIVEES ##### ############################################################################# @@ -1067,7 +1180,13 @@ sub _getBonjour ($) my $peerName = gethostbyaddr ($addr, AF_INET) || inet_ntoa($addr); # on force $peerPort a etre vu comme une valeur numerique - my ($version, $peerPort) = $bonjourMsg =~ /^(\d+)\s+(\d+)/; + my ($version, $peerPort, $udpAppName, $uuid) = +# $bonjourMsg =~ /^(\d+)\s+(\d+)\s+(?:(\w+)\s+(.*))?/; + $bonjourMsg =~ /^(\d+)\s+(\d+)(?:\s+(\S+)\s+(.*))?\n/; + + $udpAppName = 1 unless defined $udpAppName; +# printf ("DEBUG : bonjourMsg = '$bonjourMsg'\n"); +# printf ("DEBUG : reception de $peerName : bonjour $peerPort uuid = $uuid\n"); unless (defined ($version) && defined ($peerPort)) { carp "Warning in Ivy::_getBonjour, ill-formed Hello message \"$bonjourMsg\"" ; @@ -1085,10 +1204,18 @@ sub _getBonjour ($) # on verifie qu'on ne se repond pas et qu'on ne # se reconnecte pas a un process deja connecte if (exists ($self->[cnnxion]{"$addr:$peerPort"})) { - #print "DBG> bonjour de $peerName:$peerPort : DEJA CONNECTE\n" ; +# print "DBG> bonjour de $peerName:$peerPort : DEJA CONNECTE\n" ; return ; + } elsif ((defined $uuid) && ($uuid eq $self->[uuid])) { +# print "DBG> bonjour de $peerName:$peerPort : $uuid c'est MOI\n" ; + return; + } elsif ((defined $uuid) && (exists ($self->[connectedUuid]->{$uuid}))) { +# print "DBG> bonjour de $peerName:$peerPort:$uuid DEJA CONNECTE\n" ; } else { - #print "DBG> reception de $peerName : bonjour $peerPort\n" ; +# print "DBG> reception de $peerName : bonjour $udpAppName:$peerPort" ; +# print " uuid=$uuid" if (defined $uuid); +# print ("\n"); + $self->[connectedUuid]->{$uuid} = 1 if (defined $uuid); } # on verifie que l'adresse fasse partie de l'ensemble de reseau @@ -1119,11 +1246,16 @@ sub _getBonjour ($) Proto => 'tcp'); if ($appSock) { + my $flags = fcntl($appSock, F_GETFL, 0); + $flags = fcntl($appSock, F_SETFL, $flags | O_NONBLOCK) + or die "Can't set flags for the socket: $!\n"; + # on cree une entree pour $appSock dans la liste des regexp - $self->[cnnxion]{"$addr:$peerPort"} = 1; + $self->[cnnxion]{"$addr:$peerPort"} = $udpAppName; $self->[sendRegList]{$appSock} = []; $self->[sendRegListSrc]{$appSock} = []; - $self->[buffByConn]{$appSock} = ''; + $self->[bufRecByCnnx]{$appSock} = ''; + $self->[bufEmiByCnnx]{$appSock} = ''; $self->[sockList]{$appSock} = $appSock; &$fileEventFunc ($appSock, [\&_getMessages, $self, $appSock]) ; @@ -1143,6 +1275,11 @@ sub _getConnections ($) my $self = shift; my $appSock = $self->[connSock]->accept(); + if ($appSock) { + my $flags = fcntl($appSock, F_GETFL, 0); + $flags = fcntl($appSock, F_SETFL, $flags | O_NONBLOCK) + or die "Can't set flags for the socket: $!\n"; + } unless (defined $appSock) { carp "Warning in Ivy::_getConnections, \$appSock not defined"; @@ -1160,7 +1297,8 @@ sub _getConnections ($) # on cree une entree pour $appSock dans la liste des regexp $self->[sendRegList]{$appSock} = []; $self->[sendRegListSrc]{$appSock} = []; - $self->[buffByConn]{$appSock} = ''; + $self->[bufRecByCnnx]{$appSock} = ''; + $self->[bufEmiByCnnx]{$appSock} = ''; $self->[sockList]{$appSock} = $appSock; # on balance les regexps qui nous interessent a l'appli distante $self->_sendWantedRegexp ($appSock); @@ -1184,7 +1322,7 @@ sub _getMessages ($$) # Bon la il faudra un jour clarifier ce bordel, lister toutes # les facons dont un couple d'applis connectee peuevent sortir et # eviter les dead lock qui doivent subsister. - if (defined ($localLoopSel)) { + if (defined ($localLoopSelRead)) { $self->_removeFileDescriptor ($appSock); } else { @@ -1194,12 +1332,12 @@ sub _getMessages ($$) } - if (length ($self->[buffByConn]{$appSock})) { - $buffer = $self->[buffByConn]{$appSock} . $buffer ; - $self->[buffByConn]{$appSock} = ''; + if (length ($self->[bufRecByCnnx]{$appSock})) { + $buffer = $self->[bufRecByCnnx]{$appSock} . $buffer ; + $self->[bufRecByCnnx]{$appSock} = ''; } my @messages = split ('\n', $buffer) ; - $self->[buffByConn]{$appSock} = pop (@messages) unless + $self->[bufRecByCnnx]{$appSock} = pop (@messages) unless ($buffer =~ /\n$/) ; # if (defined $appSock->peername) { @@ -1263,22 +1401,33 @@ sub _getMessages ($$) # si l'id de regexp n'etait pas utilisee c'est tout bon # on affecte la nouvelle regexp a un id $self->[sendRegListSrc]{$appSock}->[$id] = $valeurs; - $self->[sendRegList]{$appSock}->[$id] = eval <<'_EOL_'; - sub { - use strict; - if (my @args = ${$_[0]} =~ /($valeurs)/io) { - shift @args; - $args[$#args] .= "\003" if @args; - send ($appSock, sprintf (MSG_FMT, - MSG, $id, join ("\003",@args)), 0) - or $self->_removeFileDescriptor ($appSock) ; - #print join (' ', "DBG> J'envoie MSG", $id, @args, "\n"); - return 1; - } - }; -_EOL_ - } - else { + $self->[sendRegList]{$appSock}->[$id] = + eval ('sub {@{$_[1]} = ${$_[0]} =~ /($valeurs)/io;}'); + + +# $self->[sendRegList]{$appSock}->[$id] = eval <<'_EOL_'; +# sub { +# use strict; +# if (my @args = ${$_[0]} =~ /($valeurs)/io) { +# shift @args; +# $args[$#args] .= "\003" if @args; +# $self->[bufEmiByCnnx]->{$appSock} .= +# sprintf (MSG_FMT, MSG, $id, join ("\003",@args)); +# my $sent = send ($appSock, $self->[bufEmiByCnnx]->{$appSock}, 0); +# unless (defined $sent) { +# # y a rien à faire +# } elsif ($sent == length ($self->[bufEmiByCnnx]->{$appSock})) { +# $self->[bufEmiByCnnx]->{$appSock} = ""; +# } elsif ($sent >= 0) { +# substr ($self->[bufEmiByCnnx]->{$appSock}, 0, $sent, ''); +# } else { +# $self->_removeFileDescriptor ($appSock) ; +# } +# } +# return 1; +# } +# _EOL_ + } else { # l'id de la regexp etait deja utilise, # et n'a pas ete libere par un message delregexp, # on renvoie donc un message d'erreur @@ -1304,7 +1453,7 @@ _EOL_ elsif ($type == ENDREGEXP) { # E N D R E G E X P # on envoie le message ready uniquement a celui qui nous # a envoye le message endregexp - $self->_sendMsgTo ($appSock, $self->[messWhenReady]); + $self->_sendMsgTo ($appSock, \$self->[messWhenReady]); # on passe de l'etat Connecte a l'etat Ready $self->[cnnxion]{"$addr:$peerPort"} =~ s/^\004//; @@ -1428,8 +1577,9 @@ sub _inetAdrByName ($$) { keys %{$self->[cnnxion]}))[0]; return ("unknow") unless defined $addrInet; - my ($addr,$port) = $addrInet =~ /(.{4}):(.*)/; + my ($port) = $addrInet =~ /:(.*)/; + my $addr = substr ($addrInet,0,4); my $host = (gethostbyaddr ($addr, AF_INET))[0] || inet_ntoa($addr); return "$host:$port"; } # end _inetAdrByName @@ -1453,7 +1603,8 @@ sub _removeFileDescriptor ($$) &$fileEventFunc ($fd, '') ; delete $self->[sendRegList]{$fd}; delete $self->[sockList]{$fd}; - delete $self->[buffByConn]->{$fd}; + delete $self->[bufRecByCnnx]->{$fd}; + delete $self->[bufEmiByCnnx]->{$fd}; $fd->close(); @@ -1530,14 +1681,51 @@ sub _sendDieTo ($$) ############### METHODE SEND MSG TO -sub _sendMsgTo ($$$) +sub _sendMsgTo ($$\$) { my ($self, $fd, $msg) = @_; + my $id = -1; + my $total = 0; + my @args = (); # tableau passé en reference aux fonctions + # anonymes compilées par eval pour receuillir les arguments filtrés + # par les regexp à envoyer sur le tuyau + my $sent; + my $regexpFunc; + my $bufEmiRef; # pour toutes les fonctions de filtrage de regexp - foreach my $regexpFunc (@{$self->[sendRegList]{$fd}}) { - &{$regexpFunc} (\$msg) if defined $regexpFunc; + foreach $regexpFunc (@{$self->[sendRegList]{$fd}}) { + $id++; + # $total += &{$regexpFunc} (\$msg) if defined $regexpFunc; + next unless defined $regexpFunc; + next unless &{$regexpFunc} ($msg, \@args); + next unless @args; + $total ++; + shift @args; + $args[$#args] .= "\003" if @args; + $bufEmiRef = \($self->[bufEmiByCnnx]->{$fd}); + my $enCongestion = $$bufEmiRef ? 1 : 0; + $$bufEmiRef .= sprintf (MSG_FMT, MSG, $id, join ("\003",@args)); + next if $enCongestion; + $sent = send ($fd, $$bufEmiRef, 0); + unless (defined $sent) { + # y a rien à faire + } elsif ($sent == length ($$bufEmiRef)) { + $$bufEmiRef = ""; + } elsif ($sent >= 0) { + substr ($$bufEmiRef, 0, $sent, ''); + _callCongestionCb ($self, $fd, 1); + if ($self->[blockOnSlowAgent]) { + my $win = ''; + vec($win, fileno ($fd), 1) = 1; + select (undef, $win, undef, undef); + } + } else { + $self->_removeFileDescriptor ($fd) ; + } } + + return ($total); } # end _sendMsgTo @@ -1764,6 +1952,61 @@ sub _substituteEscapedChar ($$) return $reg; } # end _substituteEscapedChar +############# Procedure __CALL CONGESTION CALLBACK +sub _callCongestionCb ($$$) +{ + my ($self, $fd, $congestion) = @_; + my $appName; + my $addrInet; + + EXT_LOOP: + foreach my $name (keys %{$self->[appliList]}) { + foreach my $fdp (@{$self->[appliList]{$name}}) { + if ($fd == $fdp) { + $appName = $name; + last EXT_LOOP; + } + } + } + + if ($loopMode == LOCAL_MAINLOOP) { + if ($congestion) { + $localLoopSelWrite->add ($fd); + } else { + $localLoopSelWrite->remove ($fd); + } + } else { + if ($congestion) { + Tk::fileevent ('', $fd, 'writable', + sub { + my $bufEmiRef = \($self->[bufEmiByCnnx]->{$fd}); + my $sent = send ($fd, $$bufEmiRef, 0); + unless (defined $sent) { + # y a rien à faire + } elsif ($sent == length ($$bufEmiRef)) { + $$bufEmiRef = ""; + _callCongestionCb ($self, $fd, 0); + } elsif ($sent >= 0) { + substr ($$bufEmiRef, 0, $sent, ''); + } else { + $self->_removeFileDescriptor ($fd) ; + } + }); + } else { + Tk::fileevent ('', $fd, 'writable', ''); + } + } + + if (defined $appName) { + $addrInet = $self->_inetAdrByName ($appName); + } else { + $appName = 'NONAME'; $addrInet = 'undef'; + } + + &{$self->[slowAgentFunc]} ($appName, $addrInet, $congestion); +} + + 1; __END__ @@ -1847,15 +2090,17 @@ The prototype of your method must be as follows: ... } -=item B<-pruneRegexp =E<gt> ['subject 1', ..., 'subject n']> +=item B<-filterRegexp =E<gt> ['subject 1', ..., 'subject n']> Optimize communication using this option. Regexps which don't match these subjects are removed. - Example : - Ivy->init(-loopMode =E<gt> 'TK', - -appName =E<gt> 'MyWonderfulApp', - -onDieFunc =E<gt> [\&restorecontext]); +=item B<Example:> + + Ivy->init(-loopMode => 'TK', + -appName => 'MyWonderfulApp', + -onDieFunc => [\&restorecontext] , + -filterRegexp => ['MyWonderfulApp', 'ClockStart', 'ClockStop']); =back @@ -1906,7 +2151,7 @@ The prototype of your method must be as follows: ... } -=item B<-pruneRegexp =E<gt> ['subject 1', ..., 'subject n']> +=item B<-filterRegexp =E<gt> ['subject 1', ..., 'subject n']> Optimize communication using this option. Regexps which don't match these subjects are removed. @@ -1917,7 +2162,22 @@ before running. =item B<-statusFunc =E<gt> sub {}> -A callback which is called every time an agent C connects on the bus, disconnects from the bus, subscribes to a regexp, or unsubscribes to a regexp. When the agent A is stopping, this function is also called inside the agent A for every other agents Ci on the bus, as they are disconnecting. The first 3 parameters are a reference to an array of connected agents Ci, a reference to an array of not connected agents (according to the "-neededApp" argument of the new method / function), a reference to a hash table of connected agents Ci (giving the number of each agent). These 3 parameters are maintained for upwards compatibility but should no more be used, since the following three parameters are much easier to use: the name of an appearing / disapearing or subscribing / unsubscribing agent C, its status either "new" or "died" or "subscribing" or "unsubscribing", and the hostname where this agent C is running / dying OR the subscribed / unsubscribed regexp. If the hostname of this agent C is not known, it will be replaced by its IP address. +A callback which is called every time an agent C connects on the bus, +disconnects from the bus, subscribes to a regexp, or unsubscribes to a +regexp. When the agent A is stopping, this function is also called +inside the agent A for every other agents C on the bus, as they are +disconnecting. The first 3 parameters are a reference to an array of +connected agents Ci, a reference to an array of not connected agents +(according to the "-neededApp" argument of the new method / function), +a reference to a hash table of connected agents Ci (giving the number +of each agent). These 3 parameters are maintained for upwards +compatibility but should no more be used, since the following three +parameters are much easier to use: the name of an appearing / +disapearing or subscribing / unsubscribing agent C, its status either +"new" or "died" or "subscribing" or "unsubscribing", and the hostname +where this agent C is running / dying OR the subscribed / unsubscribed +regexp. If the hostname of this agent C is not known, it will be +replaced by its IP address. @@ -1950,21 +2210,60 @@ Your callback could be: } -Example: + + +=item B<-blockOnSlowAgent =E<gt> 0 or 1> + +Behavior when the bus is being congested due to an ivy agent which +doesn't read messages sufficiently quickly. In blocking mode the local +app will block on a send, so it won't be interactive until the send +return, and it will at his turn don't read his pending message, +leading to a global sluggishness of the entire ivy bus. In non +blocking mode the messages are stocked until they could be sent, so +the problem is the uncontrolled memory consumption. + + +=item B<-slowAgentFunc=E<gt> \&congestionFunc > + + A callback which is called every time a congestion event occurs. A + congestion event is emitted each time an agent is being congested, + or, after being congested is able to read his messages again. The + parameters are the name of the app, his address (hostname+port + number), and the state, 1 for congested, 0 for able to read. + + Your callback could be: + +sub congestionFunc ($$$) +{ + my ($name, $addr, $state) = @_; + printf ("$name [$addr] %s\n", $state ? "CONGESTION" : "OK"); +} + + + +=item B<Example:> Ivy->new(-ivyBus => '156,157:2204', -onDieFunc => [\&restorecontext], -neededApp => ["DataServer", "HMI"], + -slowAgentFunc=> \&congestionFunc, + -blockOnSlowAgent => 1, -statusFunc => \&MyCallback); =back + + + + + =item B<mainLoop> Ivy->mainLoop; Ivy::mainLoop; + $ivyobj->mainLoop; -Local main events loop. Use it if you don't use the Tk library. + main events loop, call local mainloop or Tk::MainLoop according so specified mode =item B<stop> |