summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Ivy.pm553
1 files changed, 426 insertions, 127 deletions
diff --git a/Ivy.pm b/Ivy.pm
index 1a0ab4c..ba3a043 100644
--- a/Ivy.pm
+++ b/Ivy.pm
@@ -1,13 +1,13 @@
#
# Ivy, Perl interface
#
-# Copyright 1997-2003
+# Copyright 1997-2006
# Centre d'Études de la Navigation Aérienne
#
-# Authors: Alexandre Bustico <bustico@cena.fr>
-# Stéphane Chatty <chatty@cena.fr>
-# Hervé Damiano <damiano@cena.fr>
-# Christophe Mertz <mertz@cena.fr>
+# Authors: Alexandre Bustico <alexandre.bustico@cena.fr>
+# Stéphane Chatty <chatty@intuilab.com>
+# Hervé Damiano <herve.damiano@aviation-civile.gouv.fr>
+# Christophe Mertz <mertz@intuilab.com>
#
# All functions
#
@@ -30,6 +30,9 @@
#
##################################################################
+# TODO
+#
+
package Ivy ;
use Sys::Hostname;
@@ -40,6 +43,7 @@ use Carp;
use IO::Socket::Multicast;
use vars qw($VERSION);
+use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
# to compute the VERSION from the CVS tag (or if no tag, as the cvs file revision)
my $TAG= q$Name$;
@@ -80,7 +84,7 @@ sub start; # debut de l'integration au bus :
# getConnections
# pour etablir les connections "application"
-sub DESTROY ($); # - envoie un BYE et clot les connections
+sub DESTROY ($); # - envoie un BYE et clôt les connections
#sub bindRegexp ($$$) ; # permet d'associer une regexp avec un callBack
# ou d'annuler une precedente association
@@ -94,7 +98,7 @@ sub sendAppNameMsgs; # envoie une liste de messages precedes
sub sendDirectMsgs; # envoie une liste de messages directs a une appli
sub sendDieTo; # envoie un <<kill>> a une appli
sub ping; # teste qu'une appli soit encore vivante
-sub mainLoop (); # la mainloop locale (sans tk)
+sub mainLoop ($); # la mainloop locale (sans tk)
sub stop (); # methode de classe : on delete les bus, mais
# on reste dans la mainloop
sub exit (); # methode de classe : on delete tous les
@@ -111,7 +115,8 @@ sub repeat ($$;$); # temps en millisecondes, callback
sub afterCancel ($;$); # l'id d'un cancel ou d'un repeat
sub afterResetTimer ($;$); # pour ré-armer un timer non-encore déclenché à sa valeur initiale
sub fileEvent ($$;$); # associe un fd a un callback pour la mainloop locale
-
+sub getUuid ($); # rend un identifiant unique d'application sur le bus, utile
+ # pour faire des genres de rpc avec un abonnement desabonnement dynamique
################ PRIVEE ####################################################
sub _getBonjour ($); # lit le (ou les) bonjour(s) sur le canal de supervision
@@ -134,7 +139,7 @@ sub _sendLastRegexpToAllreadyConnected ($$) ; # envoie la derniere regexp
sub _removeFileDescriptor ($$); # on vire un fd et les structures associees
sub _sendErrorTo ($$$); #(fd, error) envoie un message d'erreur a un fd
sub _sendDieTo ($$); #(fd) envoie un message de demande de suicide a un fd
-sub _sendMsgTo ($$$); # (fd, message)
+sub _sendMsgTo ($$\$); # (fd, message)
sub _pong ($$); # (fd)
sub _tkFileEvent ($$); # associe un fd a un callback pour la mainloop tk
sub _scanAfter () ; # parse si il faut appeler un callback associe a un after
@@ -153,6 +158,9 @@ sub _parseIvyBusParam ($); # prends une adresse de bus de la forme
sub _substituteEscapedChar ($$); #permet de transformer une regexp etendue
# 'perl' en regexp de base
+sub _callCongestionCb ($$$); # appelle la callback de notification de congestion,
+ # si elle a été définie par l'utilisateur
+
#############################################################################
#### CONSTANTES #####
#############################################################################
@@ -177,6 +185,8 @@ use constant IVY_PROTOCOLE_VERSION => 3;
use constant AFTER => 0;
use constant REPEAT => 1;
+use constant TK_MAINLOOP => 0;
+use constant LOCAL_MAINLOOP => 1;
use constant MAX_TIMOUT => 1000;
# pour pouvoir employer les regexps perl. Attention lors de l'utilisation
@@ -240,8 +250,17 @@ my $fileEventFunc;
# dans le cas ou l'on soit dans une mainLoop
# locale, cette var pointe une un objet
-# de type IO::Select;
-my $localLoopSel;
+# de type IO::Select, qui est l'ensemble des file descriptor
+# des sockets que l'on scrute en lecture (attente des messages)
+my $localLoopSelRead;
+
+# dans le cas ou l'on soit dans une mainLoop
+# locale, cette var pointe une un objet
+# de type IO::Select qui est l'ensemble des file descriptor
+# des sockets que l'on scrute en ecriture : les fd congestionnés
+# des "slow agent" que l'on surveille pour ecrire dedans dès que
+# possible (mode non bloquant)
+my $localLoopSelWrite;
# table d'ass. handle -> callback
my %localBindByHandle;
@@ -254,7 +273,7 @@ my $afterId = 0;
# timeout le plus petit pour le select
my $selectTimout = MAX_TIMOUT;
-
+my $loopMode;
# liste des bus actifs
my %allBuses = ();
@@ -275,6 +294,8 @@ my %allBuses = ();
use constant servPort => $constantIndexer++;
use constant neededApp => $constantIndexer++;
use constant statusFunc => $constantIndexer++;
+use constant slowAgentFunc => $constantIndexer++;
+use constant blockOnSlowAgent => $constantIndexer++;
use constant supSock => $constantIndexer++;
use constant connSock => $constantIndexer++;
use constant sockList => $constantIndexer++;
@@ -286,12 +307,15 @@ use constant topicRegexps => $constantIndexer++;
use constant recCbList => $constantIndexer++;
use constant directCbList => $constantIndexer++;
use constant cnnxion => $constantIndexer++;
-use constant buffByConn => $constantIndexer++;
+use constant connectedUuid => $constantIndexer++;
+use constant bufRecByCnnx => $constantIndexer++;
+use constant bufEmiByCnnx => $constantIndexer++;
use constant broadcastPort => $constantIndexer++;
use constant broadcastBuses => $constantIndexer++;
use constant useMulticast => $constantIndexer++;
use constant appName => $constantIndexer++;
use constant messWhenReady => $constantIndexer++;
+use constant uuid => $constantIndexer++;
#############################################################################
#### METHODES PUBLIQUES #####
@@ -303,6 +327,7 @@ sub init
return;
}
+ srand(); # initialisation du generateur aléatoire qui sert à produire un UUID
my $class = shift if (@_ and $_[0] eq __PACKAGE__);
my (%options) = @_;
@@ -340,11 +365,13 @@ sub init
# ps : ne pas faire d'exit dans le callback,
# c'est le bus qui s'en charge
- -pruneRegexp => [],
- # optimisation : si l'on connait les sujets des messages
- # qu'on envoie, on fournit la liste des sujets
- # et les regexps qui ne matchent pas
- # ces sujets sont eliminees.
+
+ -filterRegexp => [],# nouvelle api cohérente avec ivy-c, c++, java
+ -pruneRegexp => [], # obsolete
+ # optimisation : si l'on connait les sujets des messages
+ # qu'on envoie, on fournit la liste des sujets
+ # et les regexps qui ne matchent pas
+ # ces sujets sont eliminees.
) ;
# on examine toutes les options possibles
@@ -367,26 +394,32 @@ sub init
}
}
- my $loopMode = $options{-loopMode};
$ivyBus = $options{-ivyBus};
$appName = $options{-appName} ;
$onDieFunc = $options{-onDieFunc} ;
- @topicRegexps = @{$options{-pruneRegexp}};
+
+ if (scalar (@{$options{-pruneRegexp}})) {
+ carp "-pruneRegexp is *OBSOLETE*. -filterRegexp should be used instead\n";
+ $options{-filterRegexp} = $options{-pruneRegexp} unless defined $options{-filterRegexp};
+ }
+
+ @topicRegexps = @{$options{-filterRegexp}};
$messWhenReady = $options{-messWhenReady} eq "_APP NAME READY" ?
"$appName READY" :
$options{-messWhenReady};
- if ($loopMode =~ /local/i) {
+ if ($options{-loopMode} =~ /local/i) {
# mode boucle d'evenement locale
use IO::Select;
$fileEventFunc = \&fileEvent ;
- $localLoopSel = IO::Select->new ();
-
- } elsif ($loopMode =~ /tk/i) {
+ $localLoopSelRead = IO::Select->new ();
+ $localLoopSelWrite = IO::Select->new ();
+ $loopMode = LOCAL_MAINLOOP;
+ } elsif ($options{-loopMode} =~ /tk/i) {
# mode boucle d'evenement de TK
$fileEventFunc = \&_tkFileEvent ;
-
+ $loopMode = TK_MAINLOOP;
} else {
croak "Error in Ivy::init, argument loopMode must be either TK or LOCAL\n";
}
@@ -421,6 +454,7 @@ sub new ($%)
# - des qu'une appli se connecte
# - lorsqu'une appli se deconnecte
$self->[statusFunc] = '';
+ $self->[slowAgentFunc] = '';
# callback prenant en param 1 refs sur une liste :
# [ref sur fonction, parametres]
@@ -471,12 +505,21 @@ sub new ($%)
# pas sur un service en cas de bonjours repetes
# valeur : nom de l'application
$self->[cnnxion] = {};
+ $self->[connectedUuid] = {};
# tableau associatif, clef => file desc,
# valeur :buffer au cas ou la lecture ne se termine
# pas par \n, de maniere a resegmenter les messages
- $self->[buffByConn] = {};
+ $self->[bufRecByCnnx] = {};
+
+ # tableau associatif, clef => file desc,
+ # valeur :buffer au cas ou l'écriture bloque sur un fd
+ # pour eviter de bloquer la mainloop sur un send on bufferise
+ # les données dans le process
+ $self->[bufEmiByCnnx] = {};
+ # identifiant unique
+ $self->[uuid] = sprintf ("%d%d", time(), rand()*1e15);
my %optionsAndDefaults = (
-appName => $appName,
@@ -506,7 +549,7 @@ sub new ($%)
# toutes les applis necessaires ne sont pas presentes,
# et des que toutes les applis necessaires sont
# presentes, et si une appli necessaire se deconnecte
- # les trois parametres passes sont :
+ # les trois parametres passes à la callback sont :
# [liste des applis presentes],
# [liste des applis absentes],
# [table de hash, clefs = applis presentes,
@@ -515,11 +558,30 @@ sub new ($%)
# ca veut dire que plus d'une appli de meme nom
# tourne sur le meme bus : danger !!
- -pruneRegexp => [@topicRegexps],
- # optimisation : si l'on connait les sujets des messages
- # qu'on envoie, on fournit la liste des sujets
- # et les regexps qui ne matchent pas
- # ces sujets sont eliminees.
+ -blockOnSlowAgent => 1,
+ # comportement lorque un ou plusieurs des agents connectés
+ # se consomme suffisement rapidement ses messages. Ou bien on laisse
+ # le send bloquer, ou bien on accumule les messages localement
+ # en attendant que l'agent congestionné soit capable de les traiter.
+ # cette méthode a l'avantage de ne pas bloquer la mainloop locale,
+ # par contre la consomation mémoire peut devenir problématique.
+
+ -slowAgentFunc => sub {},
+ # fonction de callBack qui sera appelee si l'envoi de messages
+ # à un agent n'est plus possible parce que l'agent en question ne
+ # consomme pas ses messages assez vite, ou si un agent qui etait
+ # dans cette etat retrouve sa capacité à lire les messages.
+ # les paramètres passés à la callback sont
+ # nom de l'appli,
+ # adresse
+ # etat (congestion = 1, decongestion = 0)
+
+ -filterRegexp => [@topicRegexps],
+ -pruneRegexp => [], # obsolete
+ # optimisation : si l'on connait les sujets des messages
+ # qu'on envoie, on fournit la liste des sujets
+ # et les regexps qui ne matchent pas
+ # ces sujets sont eliminees.
) ;
@@ -536,6 +598,11 @@ sub new ($%)
}
}
+ if (scalar (@{$options{-pruneRegexp}})) {
+ carp "-pruneRegexp is *OBSOLETE*. -filterRegexp should be used instead\n";
+ $options{-filterRegexp} = $options{-pruneRegexp} unless defined $options{-filterRegexp};
+ }
+
# on examine toutes les options fournies, pour detecter les inutiles
foreach my $opt (keys %options) {
unless (exists ($optionsAndDefaults{$opt})) {
@@ -543,12 +610,14 @@ sub new ($%)
}
}
-
$self->[appName] = $options{-appName} ;
$self->[messWhenReady] = $options{-messWhenReady} ;
- @{$self->[neededApp]} = @{$options{-neededApp}} ;
- $self->[statusFunc] = $options{-statusFunc} ;
- $self->[topicRegexps] = $options{-pruneRegexp} ;
+ @{$self->[neededApp]} = @{$options{-neededApp}} ;
+ $self->[statusFunc] = $options{-statusFunc} ;
+ $self->[slowAgentFunc] = $options{-slowAgentFunc} ;
+ $self->[blockOnSlowAgent] = $options{-blockOnSlowAgent} ;
+
+ $self->[topicRegexps] = $options{-filterRegexp} ;
$allBuses{$self} = $self;
($self->[useMulticast], $self->[broadcastPort], $self->[broadcastBuses]) =
@@ -606,12 +675,10 @@ sub stop ()
sub exit ()
{
Ivy::stop ();
- if (defined $localLoopSel) {
- # boucle locale, on sait faire
- # printf ("DBG> undefining localLoopSel\n");
- undef $localLoopSel;
- }
- else {
+ if (defined $localLoopSelRead) {
+ undef $localLoopSelRead ;
+ undef $localLoopSelWrite ;
+ } else {
Tk::exit ();
}
} # end exit
@@ -650,7 +717,8 @@ sub start
$self->[cnnxion]->{"$localhostAddr:". $connSock->sockport} = "\004";
# le message de bonjour à envoyer: "no de version no de port"
- my $bonjourMsg = sprintf ("%d %d\n", IVY_PROTOCOLE_VERSION, $connSock->sockport());
+ my $bonjourMsg = sprintf ("%d %d %s %s\n", IVY_PROTOCOLE_VERSION, $connSock->sockport(),
+ $self->[appName], $self->[uuid]);
if (!$self->[useMulticast]) {
# cree la socket de broadcast
@@ -777,58 +845,69 @@ sub bindDirect
}
} # end bindDirect
+
############### PROCEDURE SEND MSGS
sub sendMsgs
{
my $self = ref($_[0]) eq __PACKAGE__ ? shift : $globalIvy;
- my @msgs = @_;
+ my $appSock;
my $total = 0;
# pour tous les messages
- foreach my $msg (@msgs) {
- carp "Warning in Ivy::sendMsgs, a message contains a '\\n'. You should correct it:\n'$msg'" if ($msg =~ /\n/) ;
-
- study ($msg);
-
- # pour routes les connections
- foreach my $fd (keys %{$self->[sockList]}) {
-
- # pour toutes les fonctions de filtrage de regexp
- foreach my $regexpFunc (@{$self->[sendRegList]{$fd}}) {
- $total += &{$regexpFunc} (\$msg) if defined $regexpFunc;
- }
- }
+ foreach my $msg (@_) {
+ study ($msg);
+ carp "Warning in Ivy::sendMsgs, a message contains a '\\n'. " .
+ "You should correct it:\n'$msg'" if ($msg =~ /\n/) ;
+
+ # pour routes les connections
+ foreach $appSock (values %{$self->[sockList]}) {
+ # pour toutes les fonctions de filtrage de regexp
+ $total += _sendMsgTo ($self, $appSock, $msg);
+ }
}
# print "DBG> sended $total times\n";
return $total;
} # end sendMsgs
+
+
+
+
+
+
############### PROCEDURE SEND MSGS
sub sendAppNameMsgs
{
my $self = ref($_[0]) eq __PACKAGE__ ? shift : $globalIvy;
- my @msgs = @_;
- my $total = 0;
+ return $self->sendMsgs (map ("$self->[appName] $_", @_));
+}
- # pour tous les messages
- foreach (@msgs) {
- carp "Warning in Ivy::sendAppNameMsgs, a message contains a '\\n'. Skipping it:\n'$_'" if ($_ =~ /\n/);
- my $msg = "$self->[appName] $_";
- study ($msg);
+# sub sendAppNameMsgs
+# {
+# my $self = ref($_[0]) eq __PACKAGE__ ? shift : $globalIvy;
+# my @msgs = @_;
+# my $total = 0;
- # pour toutes les connections
- foreach my $fd (keys %{$self->[sockList]}) {
+# # pour tous les messages
+# foreach (@msgs) {
+# carp "Warning in Ivy::sendAppNameMsgs, a message contains a '\\n'. Skipping it:\n'$_'" if ($_ =~ /\n/);
- # pour toutes les fonctions de filtrage de regexp
- foreach my $regexpFunc (@{$self->[sendRegList]{$fd}}) {
- $total += &{$regexpFunc} (\$msg) if defined $regexpFunc;
- }
- }
- }
- # print "DBG> sended $total times\n";
- return $total;
-} # end sendAppNameMsgs
+# my $msg = "$self->[appName] $_";
+# study ($msg);
+
+# # pour toutes les connections
+# foreach my $fd (keys %{$self->[sockList]}) {
+
+# # pour toutes les fonctions de filtrage de regexp
+# foreach my $regexpFunc (@{$self->[sendRegList]{$fd}}) {
+# $total += &{$regexpFunc} (\$msg) if defined $regexpFunc;
+# }
+# }
+# }
+# # print "DBG> sended $total times\n";
+# return $total;
+# } # end sendAppNameMsgs
@@ -909,18 +988,25 @@ sub ping
} # end ping
############### METHODE MAINLOOP
-sub mainLoop ()
+sub mainLoop ($)
{
- croak "Error in Ivy::mainLoop, Ivy should have been initialised with LOCAL loop mode \n"
- unless defined $localLoopSel;
+ my $self = ref($_[0]) eq __PACKAGE__ ? shift : $globalIvy;
+ my ($fd, @selRes, @allDesc);
- my ($fd, @ready, @allDesc);
+ if ($loopMode == TK_MAINLOOP) {
+ eval {Tk::MainLoop ()};
+ return;
+ }
+
+ croak "Error in Ivy::mainLoop, Ivy not properly initialized\n"
+ unless defined $localLoopSelRead;
- while (defined $localLoopSel) {
- @ready = IO::Select::can_read ($localLoopSel, $selectTimout) ;
+ while (defined $localLoopSelRead) {
+# READ
+ @selRes = IO::Select::select ($localLoopSelRead, $localLoopSelWrite, undef, $selectTimout) ;
_scanAfter () ;
- foreach my $fd (@ready) {
+ foreach $fd (@{$selRes[0]}) {
if (ref $localBindByHandle{$fd} eq 'CODE') {
&{$localBindByHandle{$fd}} ;
}
@@ -929,6 +1015,25 @@ sub mainLoop ()
&$cb (@arg)
}
}
+
+#WRITE
+ my $bufEmiRef;
+ my $sent;
+
+ foreach $fd (@{$selRes[1]}) {
+ $bufEmiRef = \($self->[bufEmiByCnnx]->{$fd});
+ $sent = send ($fd, $$bufEmiRef, 0);
+ unless (defined $sent) {
+ # y a rien à faire
+ } elsif ($sent == length ($$bufEmiRef)) {
+ $$bufEmiRef = "";
+ _callCongestionCb ($self, $fd, 0);
+ } elsif ($sent >= 0) {
+ substr ($$bufEmiRef, 0, $sent, '');
+ } else {
+ $self->_removeFileDescriptor ($fd) ;
+ }
+ }
}
} # end mainLoop
@@ -1026,22 +1131,30 @@ sub fileEvent ($$;$)
my ($fd, $cb) = @_;
- unless (defined $localLoopSel) {
+ unless (defined $localLoopSelRead) {
croak ("Error in Ivy::fileEvent, Ivy should have been initialised in LOCAL loop mode\n");
}
if ($cb) {
# adding the handler
$localBindByHandle{$fd} = $cb;
- $localLoopSel->add ($fd);
+ $localLoopSelRead->add ($fd);
} else {
# deleting the handler
delete $localBindByHandle{$fd};
# print ("DBG> Ivy::fileEvent : removing fd from the select\n");
- $localLoopSel->remove ($fd);
+ $localLoopSelRead->remove ($fd);
}
} # end fileEvent
+
+
+
+sub getUuid ($)
+{
+ my $self = shift;
+ return $self->[uuid];
+}
#############################################################################
#### METHODES PRIVEES #####
#############################################################################
@@ -1067,7 +1180,13 @@ sub _getBonjour ($)
my $peerName = gethostbyaddr ($addr, AF_INET) || inet_ntoa($addr);
# on force $peerPort a etre vu comme une valeur numerique
- my ($version, $peerPort) = $bonjourMsg =~ /^(\d+)\s+(\d+)/;
+ my ($version, $peerPort, $udpAppName, $uuid) =
+# $bonjourMsg =~ /^(\d+)\s+(\d+)\s+(?:(\w+)\s+(.*))?/;
+ $bonjourMsg =~ /^(\d+)\s+(\d+)(?:\s+(\S+)\s+(.*))?\n/;
+
+ $udpAppName = 1 unless defined $udpAppName;
+# printf ("DEBUG : bonjourMsg = '$bonjourMsg'\n");
+# printf ("DEBUG : reception de $peerName : bonjour $peerPort uuid = $uuid\n");
unless (defined ($version) && defined ($peerPort)) {
carp "Warning in Ivy::_getBonjour, ill-formed Hello message \"$bonjourMsg\"" ;
@@ -1085,10 +1204,18 @@ sub _getBonjour ($)
# on verifie qu'on ne se repond pas et qu'on ne
# se reconnecte pas a un process deja connecte
if (exists ($self->[cnnxion]{"$addr:$peerPort"})) {
- #print "DBG> bonjour de $peerName:$peerPort : DEJA CONNECTE\n" ;
+# print "DBG> bonjour de $peerName:$peerPort : DEJA CONNECTE\n" ;
return ;
+ } elsif ((defined $uuid) && ($uuid eq $self->[uuid])) {
+# print "DBG> bonjour de $peerName:$peerPort : $uuid c'est MOI\n" ;
+ return;
+ } elsif ((defined $uuid) && (exists ($self->[connectedUuid]->{$uuid}))) {
+# print "DBG> bonjour de $peerName:$peerPort:$uuid DEJA CONNECTE\n" ;
} else {
- #print "DBG> reception de $peerName : bonjour $peerPort\n" ;
+# print "DBG> reception de $peerName : bonjour $udpAppName:$peerPort" ;
+# print " uuid=$uuid" if (defined $uuid);
+# print ("\n");
+ $self->[connectedUuid]->{$uuid} = 1 if (defined $uuid);
}
# on verifie que l'adresse fasse partie de l'ensemble de reseau
@@ -1119,11 +1246,16 @@ sub _getBonjour ($)
Proto => 'tcp');
if ($appSock) {
+ my $flags = fcntl($appSock, F_GETFL, 0);
+ $flags = fcntl($appSock, F_SETFL, $flags | O_NONBLOCK)
+ or die "Can't set flags for the socket: $!\n";
+
# on cree une entree pour $appSock dans la liste des regexp
- $self->[cnnxion]{"$addr:$peerPort"} = 1;
+ $self->[cnnxion]{"$addr:$peerPort"} = $udpAppName;
$self->[sendRegList]{$appSock} = [];
$self->[sendRegListSrc]{$appSock} = [];
- $self->[buffByConn]{$appSock} = '';
+ $self->[bufRecByCnnx]{$appSock} = '';
+ $self->[bufEmiByCnnx]{$appSock} = '';
$self->[sockList]{$appSock} = $appSock;
&$fileEventFunc ($appSock, [\&_getMessages, $self, $appSock]) ;
@@ -1143,6 +1275,11 @@ sub _getConnections ($)
my $self = shift;
my $appSock = $self->[connSock]->accept();
+ if ($appSock) {
+ my $flags = fcntl($appSock, F_GETFL, 0);
+ $flags = fcntl($appSock, F_SETFL, $flags | O_NONBLOCK)
+ or die "Can't set flags for the socket: $!\n";
+ }
unless (defined $appSock) {
carp "Warning in Ivy::_getConnections, \$appSock not defined";
@@ -1160,7 +1297,8 @@ sub _getConnections ($)
# on cree une entree pour $appSock dans la liste des regexp
$self->[sendRegList]{$appSock} = [];
$self->[sendRegListSrc]{$appSock} = [];
- $self->[buffByConn]{$appSock} = '';
+ $self->[bufRecByCnnx]{$appSock} = '';
+ $self->[bufEmiByCnnx]{$appSock} = '';
$self->[sockList]{$appSock} = $appSock;
# on balance les regexps qui nous interessent a l'appli distante
$self->_sendWantedRegexp ($appSock);
@@ -1184,7 +1322,7 @@ sub _getMessages ($$)
# Bon la il faudra un jour clarifier ce bordel, lister toutes
# les facons dont un couple d'applis connectee peuevent sortir et
# eviter les dead lock qui doivent subsister.
- if (defined ($localLoopSel)) {
+ if (defined ($localLoopSelRead)) {
$self->_removeFileDescriptor ($appSock);
}
else {
@@ -1194,12 +1332,12 @@ sub _getMessages ($$)
}
- if (length ($self->[buffByConn]{$appSock})) {
- $buffer = $self->[buffByConn]{$appSock} . $buffer ;
- $self->[buffByConn]{$appSock} = '';
+ if (length ($self->[bufRecByCnnx]{$appSock})) {
+ $buffer = $self->[bufRecByCnnx]{$appSock} . $buffer ;
+ $self->[bufRecByCnnx]{$appSock} = '';
}
my @messages = split ('\n', $buffer) ;
- $self->[buffByConn]{$appSock} = pop (@messages) unless
+ $self->[bufRecByCnnx]{$appSock} = pop (@messages) unless
($buffer =~ /\n$/) ;
# if (defined $appSock->peername) {
@@ -1263,22 +1401,33 @@ sub _getMessages ($$)
# si l'id de regexp n'etait pas utilisee c'est tout bon
# on affecte la nouvelle regexp a un id
$self->[sendRegListSrc]{$appSock}->[$id] = $valeurs;
- $self->[sendRegList]{$appSock}->[$id] = eval <<'_EOL_';
- sub {
- use strict;
- if (my @args = ${$_[0]} =~ /($valeurs)/io) {
- shift @args;
- $args[$#args] .= "\003" if @args;
- send ($appSock, sprintf (MSG_FMT,
- MSG, $id, join ("\003",@args)), 0)
- or $self->_removeFileDescriptor ($appSock) ;
- #print join (' ', "DBG> J'envoie MSG", $id, @args, "\n");
- return 1;
- }
- };
-_EOL_
- }
- else {
+ $self->[sendRegList]{$appSock}->[$id] =
+ eval ('sub {@{$_[1]} = ${$_[0]} =~ /($valeurs)/io;}');
+
+
+# $self->[sendRegList]{$appSock}->[$id] = eval <<'_EOL_';
+# sub {
+# use strict;
+# if (my @args = ${$_[0]} =~ /($valeurs)/io) {
+# shift @args;
+# $args[$#args] .= "\003" if @args;
+# $self->[bufEmiByCnnx]->{$appSock} .=
+# sprintf (MSG_FMT, MSG, $id, join ("\003",@args));
+# my $sent = send ($appSock, $self->[bufEmiByCnnx]->{$appSock}, 0);
+# unless (defined $sent) {
+# # y a rien à faire
+# } elsif ($sent == length ($self->[bufEmiByCnnx]->{$appSock})) {
+# $self->[bufEmiByCnnx]->{$appSock} = "";
+# } elsif ($sent >= 0) {
+# substr ($self->[bufEmiByCnnx]->{$appSock}, 0, $sent, '');
+# } else {
+# $self->_removeFileDescriptor ($appSock) ;
+# }
+# }
+# return 1;
+# }
+# _EOL_
+ } else {
# l'id de la regexp etait deja utilise,
# et n'a pas ete libere par un message delregexp,
# on renvoie donc un message d'erreur
@@ -1304,7 +1453,7 @@ _EOL_
elsif ($type == ENDREGEXP) { # E N D R E G E X P
# on envoie le message ready uniquement a celui qui nous
# a envoye le message endregexp
- $self->_sendMsgTo ($appSock, $self->[messWhenReady]);
+ $self->_sendMsgTo ($appSock, \$self->[messWhenReady]);
# on passe de l'etat Connecte a l'etat Ready
$self->[cnnxion]{"$addr:$peerPort"} =~ s/^\004//;
@@ -1428,8 +1577,9 @@ sub _inetAdrByName ($$) {
keys %{$self->[cnnxion]}))[0];
return ("unknow") unless defined $addrInet;
- my ($addr,$port) = $addrInet =~ /(.{4}):(.*)/;
+ my ($port) = $addrInet =~ /:(.*)/;
+ my $addr = substr ($addrInet,0,4);
my $host = (gethostbyaddr ($addr, AF_INET))[0] || inet_ntoa($addr);
return "$host:$port";
} # end _inetAdrByName
@@ -1453,7 +1603,8 @@ sub _removeFileDescriptor ($$)
&$fileEventFunc ($fd, '') ;
delete $self->[sendRegList]{$fd};
delete $self->[sockList]{$fd};
- delete $self->[buffByConn]->{$fd};
+ delete $self->[bufRecByCnnx]->{$fd};
+ delete $self->[bufEmiByCnnx]->{$fd};
$fd->close();
@@ -1530,14 +1681,51 @@ sub _sendDieTo ($$)
############### METHODE SEND MSG TO
-sub _sendMsgTo ($$$)
+sub _sendMsgTo ($$\$)
{
my ($self, $fd, $msg) = @_;
+ my $id = -1;
+ my $total = 0;
+ my @args = (); # tableau passé en reference aux fonctions
+ # anonymes compilées par eval pour receuillir les arguments filtrés
+ # par les regexp à envoyer sur le tuyau
+ my $sent;
+ my $regexpFunc;
+ my $bufEmiRef;
# pour toutes les fonctions de filtrage de regexp
- foreach my $regexpFunc (@{$self->[sendRegList]{$fd}}) {
- &{$regexpFunc} (\$msg) if defined $regexpFunc;
+ foreach $regexpFunc (@{$self->[sendRegList]{$fd}}) {
+ $id++;
+ # $total += &{$regexpFunc} (\$msg) if defined $regexpFunc;
+ next unless defined $regexpFunc;
+ next unless &{$regexpFunc} ($msg, \@args);
+ next unless @args;
+ $total ++;
+ shift @args;
+ $args[$#args] .= "\003" if @args;
+ $bufEmiRef = \($self->[bufEmiByCnnx]->{$fd});
+ my $enCongestion = $$bufEmiRef ? 1 : 0;
+ $$bufEmiRef .= sprintf (MSG_FMT, MSG, $id, join ("\003",@args));
+ next if $enCongestion;
+ $sent = send ($fd, $$bufEmiRef, 0);
+ unless (defined $sent) {
+ # y a rien à faire
+ } elsif ($sent == length ($$bufEmiRef)) {
+ $$bufEmiRef = "";
+ } elsif ($sent >= 0) {
+ substr ($$bufEmiRef, 0, $sent, '');
+ _callCongestionCb ($self, $fd, 1);
+ if ($self->[blockOnSlowAgent]) {
+ my $win = '';
+ vec($win, fileno ($fd), 1) = 1;
+ select (undef, $win, undef, undef);
+ }
+ } else {
+ $self->_removeFileDescriptor ($fd) ;
+ }
}
+
+ return ($total);
} # end _sendMsgTo
@@ -1764,6 +1952,61 @@ sub _substituteEscapedChar ($$)
return $reg;
} # end _substituteEscapedChar
+############# Procedure __CALL CONGESTION CALLBACK
+sub _callCongestionCb ($$$)
+{
+ my ($self, $fd, $congestion) = @_;
+ my $appName;
+ my $addrInet;
+
+ EXT_LOOP:
+ foreach my $name (keys %{$self->[appliList]}) {
+ foreach my $fdp (@{$self->[appliList]{$name}}) {
+ if ($fd == $fdp) {
+ $appName = $name;
+ last EXT_LOOP;
+ }
+ }
+ }
+
+ if ($loopMode == LOCAL_MAINLOOP) {
+ if ($congestion) {
+ $localLoopSelWrite->add ($fd);
+ } else {
+ $localLoopSelWrite->remove ($fd);
+ }
+ } else {
+ if ($congestion) {
+ Tk::fileevent ('', $fd, 'writable',
+ sub {
+ my $bufEmiRef = \($self->[bufEmiByCnnx]->{$fd});
+ my $sent = send ($fd, $$bufEmiRef, 0);
+ unless (defined $sent) {
+ # y a rien à faire
+ } elsif ($sent == length ($$bufEmiRef)) {
+ $$bufEmiRef = "";
+ _callCongestionCb ($self, $fd, 0);
+ } elsif ($sent >= 0) {
+ substr ($$bufEmiRef, 0, $sent, '');
+ } else {
+ $self->_removeFileDescriptor ($fd) ;
+ }
+ });
+ } else {
+ Tk::fileevent ('', $fd, 'writable', '');
+ }
+ }
+
+ if (defined $appName) {
+ $addrInet = $self->_inetAdrByName ($appName);
+ } else {
+ $appName = 'NONAME'; $addrInet = 'undef';
+ }
+
+ &{$self->[slowAgentFunc]} ($appName, $addrInet, $congestion);
+}
+
+
1;
__END__
@@ -1847,15 +2090,17 @@ The prototype of your method must be as follows:
...
}
-=item B<-pruneRegexp =E<gt> ['subject 1', ..., 'subject n']>
+=item B<-filterRegexp =E<gt> ['subject 1', ..., 'subject n']>
Optimize communication using this option. Regexps
which don't match these subjects are removed.
- Example :
- Ivy->init(-loopMode =E<gt> 'TK',
- -appName =E<gt> 'MyWonderfulApp',
- -onDieFunc =E<gt> [\&restorecontext]);
+=item B<Example:>
+
+ Ivy->init(-loopMode => 'TK',
+ -appName => 'MyWonderfulApp',
+ -onDieFunc => [\&restorecontext] ,
+ -filterRegexp => ['MyWonderfulApp', 'ClockStart', 'ClockStop']);
=back
@@ -1906,7 +2151,7 @@ The prototype of your method must be as follows:
...
}
-=item B<-pruneRegexp =E<gt> ['subject 1', ..., 'subject n']>
+=item B<-filterRegexp =E<gt> ['subject 1', ..., 'subject n']>
Optimize communication using this option. Regexps which don't match these subjects are removed.
@@ -1917,7 +2162,22 @@ before running.
=item B<-statusFunc =E<gt> sub {}>
-A callback which is called every time an agent C connects on the bus, disconnects from the bus, subscribes to a regexp, or unsubscribes to a regexp. When the agent A is stopping, this function is also called inside the agent A for every other agents Ci on the bus, as they are disconnecting. The first 3 parameters are a reference to an array of connected agents Ci, a reference to an array of not connected agents (according to the "-neededApp" argument of the new method / function), a reference to a hash table of connected agents Ci (giving the number of each agent). These 3 parameters are maintained for upwards compatibility but should no more be used, since the following three parameters are much easier to use: the name of an appearing / disapearing or subscribing / unsubscribing agent C, its status either "new" or "died" or "subscribing" or "unsubscribing", and the hostname where this agent C is running / dying OR the subscribed / unsubscribed regexp. If the hostname of this agent C is not known, it will be replaced by its IP address.
+A callback which is called every time an agent C connects on the bus,
+disconnects from the bus, subscribes to a regexp, or unsubscribes to a
+regexp. When the agent A is stopping, this function is also called
+inside the agent A for every other agents C on the bus, as they are
+disconnecting. The first 3 parameters are a reference to an array of
+connected agents Ci, a reference to an array of not connected agents
+(according to the "-neededApp" argument of the new method / function),
+a reference to a hash table of connected agents Ci (giving the number
+of each agent). These 3 parameters are maintained for upwards
+compatibility but should no more be used, since the following three
+parameters are much easier to use: the name of an appearing /
+disapearing or subscribing / unsubscribing agent C, its status either
+"new" or "died" or "subscribing" or "unsubscribing", and the hostname
+where this agent C is running / dying OR the subscribed / unsubscribed
+regexp. If the hostname of this agent C is not known, it will be
+replaced by its IP address.
@@ -1950,21 +2210,60 @@ Your callback could be:
}
-Example:
+
+
+=item B<-blockOnSlowAgent =E<gt> 0 or 1>
+
+Behavior when the bus is being congested due to an ivy agent which
+doesn't read messages sufficiently quickly. In blocking mode the local
+app will block on a send, so it won't be interactive until the send
+return, and it will at his turn don't read his pending message,
+leading to a global sluggishness of the entire ivy bus. In non
+blocking mode the messages are stocked until they could be sent, so
+the problem is the uncontrolled memory consumption.
+
+
+=item B<-slowAgentFunc=E<gt> \&congestionFunc >
+
+ A callback which is called every time a congestion event occurs. A
+ congestion event is emitted each time an agent is being congested,
+ or, after being congested is able to read his messages again. The
+ parameters are the name of the app, his address (hostname+port
+ number), and the state, 1 for congested, 0 for able to read.
+
+ Your callback could be:
+
+sub congestionFunc ($$$)
+{
+ my ($name, $addr, $state) = @_;
+ printf ("$name [$addr] %s\n", $state ? "CONGESTION" : "OK");
+}
+
+
+
+=item B<Example:>
Ivy->new(-ivyBus => '156,157:2204',
-onDieFunc => [\&restorecontext],
-neededApp => ["DataServer", "HMI"],
+ -slowAgentFunc=> \&congestionFunc,
+ -blockOnSlowAgent => 1,
-statusFunc => \&MyCallback);
=back
+
+
+
+
+
=item B<mainLoop>
Ivy->mainLoop;
Ivy::mainLoop;
+ $ivyobj->mainLoop;
-Local main events loop. Use it if you don't use the Tk library.
+ main events loop, call local mainloop or Tk::MainLoop according so specified mode
=item B<stop>