summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbustico2006-06-19 10:55:25 +0000
committerbustico2006-06-19 10:55:25 +0000
commitd2597b1ce78b12461c2bb1c90337972e363a23ab (patch)
tree071906da93057bc233e3c300ddaac3fdba500934
parent06f3b9f78f5fb14c3e2387f79348157283a8f682 (diff)
downloadivy-perl-d2597b1ce78b12461c2bb1c90337972e363a23ab.zip
ivy-perl-d2597b1ce78b12461c2bb1c90337972e363a23ab.tar.gz
ivy-perl-d2597b1ce78b12461c2bb1c90337972e363a23ab.tar.bz2
ivy-perl-d2597b1ce78b12461c2bb1c90337972e363a23ab.tar.xz
1/
une modification du message UDP Hello afin d'y intégrer un identificateur unique d'application et le nom de l'application Ceci afin de corriger les problèmes de connexion multiples en cas d'utilisation de plusieurs réseau ( il arrive que l'on reçoive plusieurs fois le paquet ) L'identifiant permettra d'identifier à coup sur l'application et donc de rejeter les paquets multiples Le protocole reste compatible avec les anciennes versions d'ivy. 2/ notification des congestions : on spécifier une callback qui sera appelée si un client ne consomme pas ses messages assez vite : la callback n'est appelée que lors d'un changement d'état : congestion ou decongestion. my $bus = Ivy->new (-slowAgentFunc=> \&congestionFunc ) sub congestionFunc ($$$) { my ($name, $addr, $state) = @_; if ($state == 1) { printf ("$name [$addr] %s will stop at N=%d\n", $state ? "CONGESTION" : "OK", $numberOfSentMsg+100); $numberOfSentMsgWhenCongestion = $numberOfSentMsg; } else { printf ("$name [$addr] %s\n", $state ? "CONGESTION" : "OK"); } } 3/ possibilité de rendre Ivy non bloquant : si un client ne consomme pas ses messages assez vite les messages seront accumulés en local et le sendMsgs non bloquant rendra la main. my $bus = Ivy->new (-blockOnSlowAgent => 0) 4/ optimisation en terme de vitesse et de conso mémoire.
-rw-r--r--Ivy.pm553
1 files changed, 426 insertions, 127 deletions
diff --git a/Ivy.pm b/Ivy.pm
index 1a0ab4c..ba3a043 100644
--- a/Ivy.pm
+++ b/Ivy.pm
@@ -1,13 +1,13 @@
#
# Ivy, Perl interface
#
-# Copyright 1997-2003
+# Copyright 1997-2006
# Centre d'Études de la Navigation Aérienne
#
-# Authors: Alexandre Bustico <bustico@cena.fr>
-# Stéphane Chatty <chatty@cena.fr>
-# Hervé Damiano <damiano@cena.fr>
-# Christophe Mertz <mertz@cena.fr>
+# Authors: Alexandre Bustico <alexandre.bustico@cena.fr>
+# Stéphane Chatty <chatty@intuilab.com>
+# Hervé Damiano <herve.damiano@aviation-civile.gouv.fr>
+# Christophe Mertz <mertz@intuilab.com>
#
# All functions
#
@@ -30,6 +30,9 @@
#
##################################################################
+# TODO
+#
+
package Ivy ;
use Sys::Hostname;
@@ -40,6 +43,7 @@ use Carp;
use IO::Socket::Multicast;
use vars qw($VERSION);
+use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
# to compute the VERSION from the CVS tag (or if no tag, as the cvs file revision)
my $TAG= q$Name$;
@@ -80,7 +84,7 @@ sub start; # debut de l'integration au bus :
# getConnections
# pour etablir les connections "application"
-sub DESTROY ($); # - envoie un BYE et clot les connections
+sub DESTROY ($); # - envoie un BYE et clôt les connections
#sub bindRegexp ($$$) ; # permet d'associer une regexp avec un callBack
# ou d'annuler une precedente association
@@ -94,7 +98,7 @@ sub sendAppNameMsgs; # envoie une liste de messages precedes
sub sendDirectMsgs; # envoie une liste de messages directs a une appli
sub sendDieTo; # envoie un <<kill>> a une appli
sub ping; # teste qu'une appli soit encore vivante
-sub mainLoop (); # la mainloop locale (sans tk)
+sub mainLoop ($); # la mainloop locale (sans tk)
sub stop (); # methode de classe : on delete les bus, mais
# on reste dans la mainloop
sub exit (); # methode de classe : on delete tous les
@@ -111,7 +115,8 @@ sub repeat ($$;$); # temps en millisecondes, callback
sub afterCancel ($;$); # l'id d'un cancel ou d'un repeat
sub afterResetTimer ($;$); # pour ré-armer un timer non-encore déclenché à sa valeur initiale
sub fileEvent ($$;$); # associe un fd a un callback pour la mainloop locale
-
+sub getUuid ($); # rend un identifiant unique d'application sur le bus, utile
+ # pour faire des genres de rpc avec un abonnement desabonnement dynamique
################ PRIVEE ####################################################
sub _getBonjour ($); # lit le (ou les) bonjour(s) sur le canal de supervision
@@ -134,7 +139,7 @@ sub _sendLastRegexpToAllreadyConnected ($$) ; # envoie la derniere regexp
sub _removeFileDescriptor ($$); # on vire un fd et les structures associees
sub _sendErrorTo ($$$); #(fd, error) envoie un message d'erreur a un fd
sub _sendDieTo ($$); #(fd) envoie un message de demande de suicide a un fd
-sub _sendMsgTo ($$$); # (fd, message)
+sub _sendMsgTo ($$\$); # (fd, message)
sub _pong ($$); # (fd)
sub _tkFileEvent ($$); # associe un fd a un callback pour la mainloop tk
sub _scanAfter () ; # parse si il faut appeler un callback associe a un after
@@ -153,6 +158,9 @@ sub _parseIvyBusParam ($); # prends une adresse de bus de la forme
sub _substituteEscapedChar ($$); #permet de transformer une regexp etendue
# 'perl' en regexp de base
+sub _callCongestionCb ($$$); # appelle la callback de notification de congestion,
+ # si elle a été définie par l'utilisateur
+
#############################################################################
#### CONSTANTES #####
#############################################################################
@@ -177,6 +185,8 @@ use constant IVY_PROTOCOLE_VERSION => 3;
use constant AFTER => 0;
use constant REPEAT => 1;
+use constant TK_MAINLOOP => 0;
+use constant LOCAL_MAINLOOP => 1;
use constant MAX_TIMOUT => 1000;
# pour pouvoir employer les regexps perl. Attention lors de l'utilisation
@@ -240,8 +250,17 @@ my $fileEventFunc;
# dans le cas ou l'on soit dans une mainLoop
# locale, cette var pointe une un objet
-# de type IO::Select;
-my $localLoopSel;
+# de type IO::Select, qui est l'ensemble des file descriptor
+# des sockets que l'on scrute en lecture (attente des messages)
+my $localLoopSelRead;
+
+# dans le cas ou l'on soit dans une mainLoop
+# locale, cette var pointe une un objet
+# de type IO::Select qui est l'ensemble des file descriptor
+# des sockets que l'on scrute en ecriture : les fd congestionnés
+# des "slow agent" que l'on surveille pour ecrire dedans dès que
+# possible (mode non bloquant)
+my $localLoopSelWrite;
# table d'ass. handle -> callback
my %localBindByHandle;
@@ -254,7 +273,7 @@ my $afterId = 0;
# timeout le plus petit pour le select
my $selectTimout = MAX_TIMOUT;
-
+my $loopMode;
# liste des bus actifs
my %allBuses = ();
@@ -275,6 +294,8 @@ my %allBuses = ();
use constant servPort => $constantIndexer++;
use constant neededApp => $constantIndexer++;
use constant statusFunc => $constantIndexer++;
+use constant slowAgentFunc => $constantIndexer++;
+use constant blockOnSlowAgent => $constantIndexer++;
use constant supSock => $constantIndexer++;
use constant connSock => $constantIndexer++;
use constant sockList => $constantIndexer++;
@@ -286,12 +307,15 @@ use constant topicRegexps => $constantIndexer++;
use constant recCbList => $constantIndexer++;
use constant directCbList => $constantIndexer++;
use constant cnnxion => $constantIndexer++;
-use constant buffByConn => $constantIndexer++;
+use constant connectedUuid => $constantIndexer++;
+use constant bufRecByCnnx => $constantIndexer++;
+use constant bufEmiByCnnx => $constantIndexer++;
use constant broadcastPort => $constantIndexer++;
use constant broadcastBuses => $constantIndexer++;
use constant useMulticast => $constantIndexer++;
use constant appName => $constantIndexer++;
use constant messWhenReady => $constantIndexer++;
+use constant uuid => $constantIndexer++;
#############################################################################
#### METHODES PUBLIQUES #####
@@ -303,6 +327,7 @@ sub init
return;
}
+ srand(); # initialisation du generateur aléatoire qui sert à produire un UUID
my $class = shift if (@_ and $_[0] eq __PACKAGE__);
my (%options) = @_;
@@ -340,11 +365,13 @@ sub init
# ps : ne pas faire d'exit dans le callback,
# c'est le bus qui s'en charge
- -pruneRegexp => [],
- # optimisation : si l'on connait les sujets des messages
- # qu'on envoie, on fournit la liste des sujets
- # et les regexps qui ne matchent pas
- # ces sujets sont eliminees.
+
+ -filterRegexp => [],# nouvelle api cohérente avec ivy-c, c++, java
+ -pruneRegexp => [], # obsolete
+ # optimisation : si l'on connait les sujets des messages
+ # qu'on envoie, on fournit la liste des sujets
+ # et les regexps qui ne matchent pas
+ # ces sujets sont eliminees.
) ;
# on examine toutes les options possibles
@@ -367,26 +394,32 @@ sub init
}
}
- my $loopMode = $options{-loopMode};
$ivyBus = $options{-ivyBus};
$appName = $options{-appName} ;
$onDieFunc = $options{-onDieFunc} ;
- @topicRegexps = @{$options{-pruneRegexp}};
+
+ if (scalar (@{$options{-pruneRegexp}})) {
+ carp "-pruneRegexp is *OBSOLETE*. -filterRegexp should be used instead\n";
+ $options{-filterRegexp} = $options{-pruneRegexp} unless defined $options{-filterRegexp};
+ }
+
+ @topicRegexps = @{$options{-filterRegexp}};
$messWhenReady = $options{-messWhenReady} eq "_APP NAME READY" ?
"$appName READY" :
$options{-messWhenReady};
- if ($loopMode =~ /local/i) {
+ if ($options{-loopMode} =~ /local/i) {
# mode boucle d'evenement locale
use IO::Select;
$fileEventFunc = \&fileEvent ;
- $localLoopSel = IO::Select->new ();
-
- } elsif ($loopMode =~ /tk/i) {
+ $localLoopSelRead = IO::Select->new ();
+ $localLoopSelWrite = IO::Select->new ();
+ $loopMode = LOCAL_MAINLOOP;
+ } elsif ($options{-loopMode} =~ /tk/i) {
# mode boucle d'evenement de TK
$fileEventFunc = \&_tkFileEvent ;
-
+ $loopMode = TK_MAINLOOP;
} else {
croak "Error in Ivy::init, argument loopMode must be either TK or LOCAL\n";
}
@@ -421,6 +454,7 @@ sub new ($%)
# - des qu'une appli se connecte
# - lorsqu'une appli se deconnecte
$self->[statusFunc] = '';
+ $self->[slowAgentFunc] = '';
# callback prenant en param 1 refs sur une liste :
# [ref sur fonction, parametres]
@@ -471,12 +505,21 @@ sub new ($%)
# pas sur un service en cas de bonjours repetes
# valeur : nom de l'application
$self->[cnnxion] = {};
+ $self->[connectedUuid] = {};
# tableau associatif, clef => file desc,
# valeur :buffer au cas ou la lecture ne se termine
# pas par \n, de maniere a resegmenter les messages
- $self->[buffByConn] = {};
+ $self->[bufRecByCnnx] = {};
+
+ # tableau associatif, clef => file desc,
+ # valeur :buffer au cas ou l'écriture bloque sur un fd
+ # pour eviter de bloquer la mainloop sur un send on bufferise
+ # les données dans le process
+ $self->[bufEmiByCnnx] = {};
+ # identifiant unique
+ $self->[uuid] = sprintf ("%d%d", time(), rand()*1e15);
my %optionsAndDefaults = (
-appName => $appName,
@@ -506,7 +549,7 @@ sub new ($%)
# toutes les applis necessaires ne sont pas presentes,
# et des que toutes les applis necessaires sont
# presentes, et si une appli necessaire se deconnecte
- # les trois parametres passes sont :
+ # les trois parametres passes à la callback sont :
# [liste des applis presentes],
# [liste des applis absentes],
# [table de hash, clefs = applis presentes,
@@ -515,11 +558,30 @@ sub new ($%)
# ca veut dire que plus d'une appli de meme nom
# tourne sur le meme bus : danger !!
- -pruneRegexp => [@topicRegexps],
- # optimisation : si l'on connait les sujets des messages
- # qu'on envoie, on fournit la liste des sujets
- # et les regexps qui ne matchent pas
- # ces sujets sont eliminees.
+ -blockOnSlowAgent => 1,
+ # comportement lorque un ou plusieurs des agents connectés
+ # se consomme suffisement rapidement ses messages. Ou bien on laisse
+ # le send bloquer, ou bien on accumule les messages localement
+ # en attendant que l'agent congestionné soit capable de les traiter.
+ # cette méthode a l'avantage de ne pas bloquer la mainloop locale,
+ # par contre la consomation mémoire peut devenir problématique.
+
+ -slowAgentFunc => sub {},
+ # fonction de callBack qui sera appelee si l'envoi de messages
+ # à un agent n'est plus possible parce que l'agent en question ne
+ # consomme pas ses messages assez vite, ou si un agent qui etait
+ # dans cette etat retrouve sa capacité à lire les messages.
+ # les paramètres passés à la callback sont
+ # nom de l'appli,
+ # adresse
+ # etat (congestion = 1, decongestion = 0)
+
+ -filterRegexp => [@topicRegexps],
+ -pruneRegexp => [], # obsolete
+ # optimisation : si l'on connait les sujets des messages
+ # qu'on envoie, on fournit la liste des sujets
+ # et les regexps qui ne matchent pas
+ # ces sujets sont eliminees.
) ;
@@ -536,6 +598,11 @@ sub new ($%)
}
}
+ if (scalar (@{$options{-pruneRegexp}})) {
+ carp "-pruneRegexp is *OBSOLETE*. -filterRegexp should be used instead\n";
+ $options{-filterRegexp} = $options{-pruneRegexp} unless defined $options{-filterRegexp};
+ }
+
# on examine toutes les options fournies, pour detecter les inutiles
foreach my $opt (keys %options) {
unless (exists ($optionsAndDefaults{$opt})) {
@@ -543,12 +610,14 @@ sub new ($%)
}
}
-
$self->[appName] = $options{-appName} ;
$self->[messWhenReady] = $options{-messWhenReady} ;
- @{$self->[neededApp]} = @{$options{-neededApp}} ;
- $self->[statusFunc] = $options{-statusFunc} ;
- $self->[topicRegexps] = $options{-pruneRegexp} ;
+ @{$self->[neededApp]} = @{$options{-neededApp}} ;
+ $self->[statusFunc] = $options{-statusFunc} ;
+ $self->[slowAgentFunc] = $options{-slowAgentFunc} ;
+ $self->[blockOnSlowAgent] = $options{-blockOnSlowAgent} ;
+
+ $self->[topicRegexps] = $options{-filterRegexp} ;
$allBuses{$self} = $self;
($self->[useMulticast], $self->[broadcastPort], $self->[broadcastBuses]) =
@@ -606,12 +675,10 @@ sub stop ()
sub exit ()
{
Ivy::stop ();
- if (defined $localLoopSel) {
- # boucle locale, on sait faire
- # printf ("DBG> undefining localLoopSel\n");
- undef $localLoopSel;
- }
- else {
+ if (defined $localLoopSelRead) {
+ undef $localLoopSelRead ;
+ undef $localLoopSelWrite ;
+ } else {
Tk::exit ();
}
} # end exit
@@ -650,7 +717,8 @@ sub start
$self->[cnnxion]->{"$localhostAddr:". $connSock->sockport} = "\004";
# le message de bonjour à envoyer: "no de version no de port"
- my $bonjourMsg = sprintf ("%d %d\n", IVY_PROTOCOLE_VERSION, $connSock->sockport());
+ my $bonjourMsg = sprintf ("%d %d %s %s\n", IVY_PROTOCOLE_VERSION, $connSock->sockport(),
+ $self->[appName], $self->[uuid]);
if (!$self->[useMulticast]) {
# cree la socket de broadcast
@@ -777,58 +845,69 @@ sub bindDirect
}
} # end bindDirect
+
############### PROCEDURE SEND MSGS
sub sendMsgs
{
my $self = ref($_[0]) eq __PACKAGE__ ? shift : $globalIvy;
- my @msgs = @_;
+ my $appSock;
my $total = 0;
# pour tous les messages
- foreach my $msg (@msgs) {
- carp "Warning in Ivy::sendMsgs, a message contains a '\\n'. You should correct it:\n'$msg'" if ($msg =~ /\n/) ;
-
- study ($msg);
-
- # pour routes les connections
- foreach my $fd (keys %{$self->[sockList]}) {
-
- # pour toutes les fonctions de filtrage de regexp
- foreach my $regexpFunc (@{$self->[sendRegList]{$fd}}) {
- $total += &{$regexpFunc} (\$msg) if defined $regexpFunc;
- }
- }
+ foreach my $msg (@_) {
+ study ($msg);
+ carp "Warning in Ivy::sendMsgs, a message contains a '\\n'. " .
+ "You should correct it:\n'$msg'" if ($msg =~ /\n/) ;
+
+ # pour routes les connections
+ foreach $appSock (values %{$self->[sockList]}) {
+ # pour toutes les fonctions de filtrage de regexp
+ $total += _sendMsgTo ($self, $appSock, $msg);
+ }
}
# print "DBG> sended $total times\n";
return $total;
} # end sendMsgs
+
+
+
+
+
+
############### PROCEDURE SEND MSGS
sub sendAppNameMsgs
{
my $self = ref($_[0]) eq __PACKAGE__ ? shift : $globalIvy;
- my @msgs = @_;
- my $total = 0;
+ return $self->sendMsgs (map ("$self->[appName] $_", @_));
+}
- # pour tous les messages
- foreach (@msgs) {
- carp "Warning in Ivy::sendAppNameMsgs, a message contains a '\\n'. Skipping it:\n'$_'" if ($_ =~ /\n/);
- my $msg = "$self->[appName] $_";
- study ($msg);
+# sub sendAppNameMsgs
+# {
+# my $self = ref($_[0]) eq __PACKAGE__ ? shift : $globalIvy;
+# my @msgs = @_;
+# my $total = 0;
- # pour toutes les connections
- foreach my $fd (keys %{$self->[sockList]}) {
+# # pour tous les messages
+# foreach (@msgs) {
+# carp "Warning in Ivy::sendAppNameMsgs, a message contains a '\\n'. Skipping it:\n'$_'" if ($_ =~ /\n/);
- # pour toutes les fonctions de filtrage de regexp
- foreach my $regexpFunc (@{$self->[sendRegList]{$fd}}) {
- $total += &{$regexpFunc} (\$msg) if defined $regexpFunc;
- }
- }
- }
- # print "DBG> sended $total times\n";
- return $total;
-} # end sendAppNameMsgs
+# my $msg = "$self->[appName] $_";
+# study ($msg);
+
+# # pour toutes les connections
+# foreach my $fd (keys %{$self->[sockList]}) {
+
+# # pour toutes les fonctions de filtrage de regexp
+# foreach my $regexpFunc (@{$self->[sendRegList]{$fd}}) {
+# $total += &{$regexpFunc} (\$msg) if defined $regexpFunc;
+# }
+# }
+# }
+# # print "DBG> sended $total times\n";
+# return $total;
+# } # end sendAppNameMsgs
@@ -909,18 +988,25 @@ sub ping
} # end ping
############### METHODE MAINLOOP
-sub mainLoop ()
+sub mainLoop ($)
{
- croak "Error in Ivy::mainLoop, Ivy should have been initialised with LOCAL loop mode \n"
- unless defined $localLoopSel;
+ my $self = ref($_[0]) eq __PACKAGE__ ? shift : $globalIvy;
+ my ($fd, @selRes, @allDesc);
- my ($fd, @ready, @allDesc);
+ if ($loopMode == TK_MAINLOOP) {
+ eval {Tk::MainLoop ()};
+ return;
+ }
+
+ croak "Error in Ivy::mainLoop, Ivy not properly initialized\n"
+ unless defined $localLoopSelRead;
- while (defined $localLoopSel) {
- @ready = IO::Select::can_read ($localLoopSel, $selectTimout) ;
+ while (defined $localLoopSelRead) {
+# READ
+ @selRes = IO::Select::select ($localLoopSelRead, $localLoopSelWrite, undef, $selectTimout) ;
_scanAfter () ;
- foreach my $fd (@ready) {
+ foreach $fd (@{$selRes[0]}) {
if (ref $localBindByHandle{$fd} eq 'CODE') {
&{$localBindByHandle{$fd}} ;
}
@@ -929,6 +1015,25 @@ sub mainLoop ()
&$cb (@arg)
}
}
+
+#WRITE
+ my $bufEmiRef;
+ my $sent;
+
+ foreach $fd (@{$selRes[1]}) {
+ $bufEmiRef = \($self->[bufEmiByCnnx]->{$fd});
+ $sent = send ($fd, $$bufEmiRef, 0);
+ unless (defined $sent) {
+ # y a rien à faire
+ } elsif ($sent == length ($$bufEmiRef)) {
+ $$bufEmiRef = "";
+ _callCongestionCb ($self, $fd, 0);
+ } elsif ($sent >= 0) {
+ substr ($$bufEmiRef, 0, $sent, '');
+ } else {
+ $self->_removeFileDescriptor ($fd) ;
+ }
+ }
}
} # end mainLoop
@@ -1026,22 +1131,30 @@ sub fileEvent ($$;$)
my ($fd, $cb) = @_;
- unless (defined $localLoopSel) {
+ unless (defined $localLoopSelRead) {
croak ("Error in Ivy::fileEvent, Ivy should have been initialised in LOCAL loop mode\n");
}
if ($cb) {
# adding the handler
$localBindByHandle{$fd} = $cb;
- $localLoopSel->add ($fd);
+ $localLoopSelRead->add ($fd);
} else {
# deleting the handler
delete $localBindByHandle{$fd};
# print ("DBG> Ivy::fileEvent : removing fd from the select\n");
- $localLoopSel->remove ($fd);
+ $localLoopSelRead->remove ($fd);
}
} # end fileEvent
+
+
+
+sub getUuid ($)
+{
+ my $self = shift;
+ return $self->[uuid];
+}
#############################################################################
#### METHODES PRIVEES #####
#############################################################################
@@ -1067,7 +1180,13 @@ sub _getBonjour ($)
my $peerName = gethostbyaddr ($addr, AF_INET) || inet_ntoa($addr);
# on force $peerPort a etre vu comme une valeur numerique
- my ($version, $peerPort) = $bonjourMsg =~ /^(\d+)\s+(\d+)/;
+ my ($version, $peerPort, $udpAppName, $uuid) =
+# $bonjourMsg =~ /^(\d+)\s+(\d+)\s+(?:(\w+)\s+(.*))?/;
+ $bonjourMsg =~ /^(\d+)\s+(\d+)(?:\s+(\S+)\s+(.*))?\n/;
+
+ $udpAppName = 1 unless defined $udpAppName;
+# printf ("DEBUG : bonjourMsg = '$bonjourMsg'\n");
+# printf ("DEBUG : reception de $peerName : bonjour $peerPort uuid = $uuid\n");
unless (defined ($version) && defined ($peerPort)) {
carp "Warning in Ivy::_getBonjour, ill-formed Hello message \"$bonjourMsg\"" ;
@@ -1085,10 +1204,18 @@ sub _getBonjour ($)
# on verifie qu'on ne se repond pas et qu'on ne
# se reconnecte pas a un process deja connecte
if (exists ($self->[cnnxion]{"$addr:$peerPort"})) {
- #print "DBG> bonjour de $peerName:$peerPort : DEJA CONNECTE\n" ;
+# print "DBG> bonjour de $peerName:$peerPort : DEJA CONNECTE\n" ;
return ;
+ } elsif ((defined $uuid) && ($uuid eq $self->[uuid])) {
+# print "DBG> bonjour de $peerName:$peerPort : $uuid c'est MOI\n" ;
+ return;
+ } elsif ((defined $uuid) && (exists ($self->[connectedUuid]->{$uuid}))) {
+# print "DBG> bonjour de $peerName:$peerPort:$uuid DEJA CONNECTE\n" ;
} else {
- #print "DBG> reception de $peerName : bonjour $peerPort\n" ;
+# print "DBG> reception de $peerName : bonjour $udpAppName:$peerPort" ;
+# print " uuid=$uuid" if (defined $uuid);
+# print ("\n");
+ $self->[connectedUuid]->{$uuid} = 1 if (defined $uuid);
}
# on verifie que l'adresse fasse partie de l'ensemble de reseau
@@ -1119,11 +1246,16 @@ sub _getBonjour ($)
Proto => 'tcp');
if ($appSock) {
+ my $flags = fcntl($appSock, F_GETFL, 0);
+ $flags = fcntl($appSock, F_SETFL, $flags | O_NONBLOCK)
+ or die "Can't set flags for the socket: $!\n";
+
# on cree une entree pour $appSock dans la liste des regexp
- $self->[cnnxion]{"$addr:$peerPort"} = 1;
+ $self->[cnnxion]{"$addr:$peerPort"} = $udpAppName;
$self->[sendRegList]{$appSock} = [];
$self->[sendRegListSrc]{$appSock} = [];
- $self->[buffByConn]{$appSock} = '';
+ $self->[bufRecByCnnx]{$appSock} = '';
+ $self->[bufEmiByCnnx]{$appSock} = '';
$self->[sockList]{$appSock} = $appSock;
&$fileEventFunc ($appSock, [\&_getMessages, $self, $appSock]) ;
@@ -1143,6 +1275,11 @@ sub _getConnections ($)
my $self = shift;
my $appSock = $self->[connSock]->accept();
+ if ($appSock) {
+ my $flags = fcntl($appSock, F_GETFL, 0);
+ $flags = fcntl($appSock, F_SETFL, $flags | O_NONBLOCK)
+ or die "Can't set flags for the socket: $!\n";
+ }
unless (defined $appSock) {
carp "Warning in Ivy::_getConnections, \$appSock not defined";
@@ -1160,7 +1297,8 @@ sub _getConnections ($)
# on cree une entree pour $appSock dans la liste des regexp
$self->[sendRegList]{$appSock} = [];
$self->[sendRegListSrc]{$appSock} = [];
- $self->[buffByConn]{$appSock} = '';
+ $self->[bufRecByCnnx]{$appSock} = '';
+ $self->[bufEmiByCnnx]{$appSock} = '';
$self->[sockList]{$appSock} = $appSock;
# on balance les regexps qui nous interessent a l'appli distante
$self->_sendWantedRegexp ($appSock);
@@ -1184,7 +1322,7 @@ sub _getMessages ($$)
# Bon la il faudra un jour clarifier ce bordel, lister toutes
# les facons dont un couple d'applis connectee peuevent sortir et
# eviter les dead lock qui doivent subsister.
- if (defined ($localLoopSel)) {
+ if (defined ($localLoopSelRead)) {
$self->_removeFileDescriptor ($appSock);
}
else {
@@ -1194,12 +1332,12 @@ sub _getMessages ($$)
}
- if (length ($self->[buffByConn]{$appSock})) {
- $buffer = $self->[buffByConn]{$appSock} . $buffer ;
- $self->[buffByConn]{$appSock} = '';
+ if (length ($self->[bufRecByCnnx]{$appSock})) {
+ $buffer = $self->[bufRecByCnnx]{$appSock} . $buffer ;
+ $self->[bufRecByCnnx]{$appSock} = '';
}
my @messages = split ('\n', $buffer) ;
- $self->[buffByConn]{$appSock} = pop (@messages) unless
+ $self->[bufRecByCnnx]{$appSock} = pop (@messages) unless
($buffer =~ /\n$/) ;
# if (defined $appSock->peername) {
@@ -1263,22 +1401,33 @@ sub _getMessages ($$)
# si l'id de regexp n'etait pas utilisee c'est tout bon
# on affecte la nouvelle regexp a un id
$self->[sendRegListSrc]{$appSock}->[$id] = $valeurs;
- $self->[sendRegList]{$appSock}->[$id] = eval <<'_EOL_';
- sub {
- use strict;
- if (my @args = ${$_[0]} =~ /($valeurs)/io) {
- shift @args;
- $args[$#args] .= "\003" if @args;
- send ($appSock, sprintf (MSG_FMT,
- MSG, $id, join ("\003",@args)), 0)
- or $self->_removeFileDescriptor ($appSock) ;
- #print join (' ', "DBG> J'envoie MSG", $id, @args, "\n");
- return 1;
- }
- };
-_EOL_
- }
- else {
+ $self->[sendRegList]{$appSock}->[$id] =
+ eval ('sub {@{$_[1]} = ${$_[0]} =~ /($valeurs)/io;}');
+
+
+# $self->[sendRegList]{$appSock}->[$id] = eval <<'_EOL_';
+# sub {
+# use strict;
+# if (my @args = ${$_[0]} =~ /($valeurs)/io) {
+# shift @args;
+# $args[$#args] .= "\003" if @args;
+# $self->[bufEmiByCnnx]->{$appSock} .=
+# sprintf (MSG_FMT, MSG, $id, join ("\003",@args));
+# my $sent = send ($appSock, $self->[bufEmiByCnnx]->{$appSock}, 0);
+# unless (defined $sent) {
+# # y a rien à faire
+# } elsif ($sent == length ($self->[bufEmiByCnnx]->{$appSock})) {
+# $self->[bufEmiByCnnx]->{$appSock} = "";
+# } elsif ($sent >= 0) {
+# substr ($self->[bufEmiByCnnx]->{$appSock}, 0, $sent, '');
+# } else {
+# $self->_removeFileDescriptor ($appSock) ;
+# }
+# }
+# return 1;
+# }
+# _EOL_
+ } else {
# l'id de la regexp etait deja utilise,
# et n'a pas ete libere par un message delregexp,
# on renvoie donc un message d'erreur
@@ -1304,7 +1453,7 @@ _EOL_
elsif ($type == ENDREGEXP) { # E N D R E G E X P
# on envoie le message ready uniquement a celui qui nous
# a envoye le message endregexp
- $self->_sendMsgTo ($appSock, $self->[messWhenReady]);
+ $self->_sendMsgTo ($appSock, \$self->[messWhenReady]);
# on passe de l'etat Connecte a l'etat Ready
$self->[cnnxion]{"$addr:$peerPort"} =~ s/^\004//;
@@ -1428,8 +1577,9 @@ sub _inetAdrByName ($$) {
keys %{$self->[cnnxion]}))[0];
return ("unknow") unless defined $addrInet;
- my ($addr,$port) = $addrInet =~ /(.{4}):(.*)/;
+ my ($port) = $addrInet =~ /:(.*)/;
+ my $addr = substr ($addrInet,0,4);
my $host = (gethostbyaddr ($addr, AF_INET))[0] || inet_ntoa($addr);
return "$host:$port";
} # end _inetAdrByName
@@ -1453,7 +1603,8 @@ sub _removeFileDescriptor ($$)
&$fileEventFunc ($fd, '') ;
delete $self->[sendRegList]{$fd};
delete $self->[sockList]{$fd};
- delete $self->[buffByConn]->{$fd};
+ delete $self->[bufRecByCnnx]->{$fd};
+ delete $self->[bufEmiByCnnx]->{$fd};
$fd->close();
@@ -1530,14 +1681,51 @@ sub _sendDieTo ($$)
############### METHODE SEND MSG TO
-sub _sendMsgTo ($$$)
+sub _sendMsgTo ($$\$)
{
my ($self, $fd, $msg) = @_;
+ my $id = -1;
+ my $total = 0;
+ my @args = (); # tableau passé en reference aux fonctions
+ # anonymes compilées par eval pour receuillir les arguments filtrés
+ # par les regexp à envoyer sur le tuyau
+ my $sent;
+ my $regexpFunc;
+ my $bufEmiRef;
# pour toutes les fonctions de filtrage de regexp
- foreach my $regexpFunc (@{$self->[sendRegList]{$fd}}) {
- &{$regexpFunc} (\$msg) if defined $regexpFunc;
+ foreach $regexpFunc (@{$self->[sendRegList]{$fd}}) {
+ $id++;
+ # $total += &{$regexpFunc} (\$msg) if defined $regexpFunc;
+ next unless defined $regexpFunc;
+ next unless &{$regexpFunc} ($msg, \@args);
+ next unless @args;
+ $total ++;
+ shift @args;
+ $args[$#args] .= "\003" if @args;
+ $bufEmiRef = \($self->[bufEmiByCnnx]->{$fd});
+ my $enCongestion = $$bufEmiRef ? 1 : 0;
+ $$bufEmiRef .= sprintf (MSG_FMT, MSG, $id, join ("\003",@args));
+ next if $enCongestion;
+ $sent = send ($fd, $$bufEmiRef, 0);
+ unless (defined $sent) {
+ # y a rien à faire
+ } elsif ($sent == length ($$bufEmiRef)) {
+ $$bufEmiRef = "";
+ } elsif ($sent >= 0) {
+ substr ($$bufEmiRef, 0, $sent, '');
+ _callCongestionCb ($self, $fd, 1);
+ if ($self->[blockOnSlowAgent]) {
+ my $win = '';
+ vec($win, fileno ($fd), 1) = 1;
+ select (undef, $win, undef, undef);
+ }
+ } else {
+ $self->_removeFileDescriptor ($fd) ;
+ }
}
+
+ return ($total);
} # end _sendMsgTo
@@ -1764,6 +1952,61 @@ sub _substituteEscapedChar ($$)
return $reg;
} # end _substituteEscapedChar
+############# Procedure __CALL CONGESTION CALLBACK
+sub _callCongestionCb ($$$)
+{
+ my ($self, $fd, $congestion) = @_;
+ my $appName;
+ my $addrInet;
+
+ EXT_LOOP:
+ foreach my $name (keys %{$self->[appliList]}) {
+ foreach my $fdp (@{$self->[appliList]{$name}}) {
+ if ($fd == $fdp) {
+ $appName = $name;
+ last EXT_LOOP;
+ }
+ }
+ }
+
+ if ($loopMode == LOCAL_MAINLOOP) {
+ if ($congestion) {
+ $localLoopSelWrite->add ($fd);
+ } else {
+ $localLoopSelWrite->remove ($fd);
+ }
+ } else {
+ if ($congestion) {
+ Tk::fileevent ('', $fd, 'writable',
+ sub {
+ my $bufEmiRef = \($self->[bufEmiByCnnx]->{$fd});
+ my $sent = send ($fd, $$bufEmiRef, 0);
+ unless (defined $sent) {
+ # y a rien à faire
+ } elsif ($sent == length ($$bufEmiRef)) {
+ $$bufEmiRef = "";
+ _callCongestionCb ($self, $fd, 0);
+ } elsif ($sent >= 0) {
+ substr ($$bufEmiRef, 0, $sent, '');
+ } else {
+ $self->_removeFileDescriptor ($fd) ;
+ }
+ });
+ } else {
+ Tk::fileevent ('', $fd, 'writable', '');
+ }
+ }
+
+ if (defined $appName) {
+ $addrInet = $self->_inetAdrByName ($appName);
+ } else {
+ $appName = 'NONAME'; $addrInet = 'undef';
+ }
+
+ &{$self->[slowAgentFunc]} ($appName, $addrInet, $congestion);
+}
+
+
1;
__END__
@@ -1847,15 +2090,17 @@ The prototype of your method must be as follows:
...
}
-=item B<-pruneRegexp =E<gt> ['subject 1', ..., 'subject n']>
+=item B<-filterRegexp =E<gt> ['subject 1', ..., 'subject n']>
Optimize communication using this option. Regexps
which don't match these subjects are removed.
- Example :
- Ivy->init(-loopMode =E<gt> 'TK',
- -appName =E<gt> 'MyWonderfulApp',
- -onDieFunc =E<gt> [\&restorecontext]);
+=item B<Example:>
+
+ Ivy->init(-loopMode => 'TK',
+ -appName => 'MyWonderfulApp',
+ -onDieFunc => [\&restorecontext] ,
+ -filterRegexp => ['MyWonderfulApp', 'ClockStart', 'ClockStop']);
=back
@@ -1906,7 +2151,7 @@ The prototype of your method must be as follows:
...
}
-=item B<-pruneRegexp =E<gt> ['subject 1', ..., 'subject n']>
+=item B<-filterRegexp =E<gt> ['subject 1', ..., 'subject n']>
Optimize communication using this option. Regexps which don't match these subjects are removed.
@@ -1917,7 +2162,22 @@ before running.
=item B<-statusFunc =E<gt> sub {}>
-A callback which is called every time an agent C connects on the bus, disconnects from the bus, subscribes to a regexp, or unsubscribes to a regexp. When the agent A is stopping, this function is also called inside the agent A for every other agents Ci on the bus, as they are disconnecting. The first 3 parameters are a reference to an array of connected agents Ci, a reference to an array of not connected agents (according to the "-neededApp" argument of the new method / function), a reference to a hash table of connected agents Ci (giving the number of each agent). These 3 parameters are maintained for upwards compatibility but should no more be used, since the following three parameters are much easier to use: the name of an appearing / disapearing or subscribing / unsubscribing agent C, its status either "new" or "died" or "subscribing" or "unsubscribing", and the hostname where this agent C is running / dying OR the subscribed / unsubscribed regexp. If the hostname of this agent C is not known, it will be replaced by its IP address.
+A callback which is called every time an agent C connects on the bus,
+disconnects from the bus, subscribes to a regexp, or unsubscribes to a
+regexp. When the agent A is stopping, this function is also called
+inside the agent A for every other agents C on the bus, as they are
+disconnecting. The first 3 parameters are a reference to an array of
+connected agents Ci, a reference to an array of not connected agents
+(according to the "-neededApp" argument of the new method / function),
+a reference to a hash table of connected agents Ci (giving the number
+of each agent). These 3 parameters are maintained for upwards
+compatibility but should no more be used, since the following three
+parameters are much easier to use: the name of an appearing /
+disapearing or subscribing / unsubscribing agent C, its status either
+"new" or "died" or "subscribing" or "unsubscribing", and the hostname
+where this agent C is running / dying OR the subscribed / unsubscribed
+regexp. If the hostname of this agent C is not known, it will be
+replaced by its IP address.
@@ -1950,21 +2210,60 @@ Your callback could be:
}
-Example:
+
+
+=item B<-blockOnSlowAgent =E<gt> 0 or 1>
+
+Behavior when the bus is being congested due to an ivy agent which
+doesn't read messages sufficiently quickly. In blocking mode the local
+app will block on a send, so it won't be interactive until the send
+return, and it will at his turn don't read his pending message,
+leading to a global sluggishness of the entire ivy bus. In non
+blocking mode the messages are stocked until they could be sent, so
+the problem is the uncontrolled memory consumption.
+
+
+=item B<-slowAgentFunc=E<gt> \&congestionFunc >
+
+ A callback which is called every time a congestion event occurs. A
+ congestion event is emitted each time an agent is being congested,
+ or, after being congested is able to read his messages again. The
+ parameters are the name of the app, his address (hostname+port
+ number), and the state, 1 for congested, 0 for able to read.
+
+ Your callback could be:
+
+sub congestionFunc ($$$)
+{
+ my ($name, $addr, $state) = @_;
+ printf ("$name [$addr] %s\n", $state ? "CONGESTION" : "OK");
+}
+
+
+
+=item B<Example:>
Ivy->new(-ivyBus => '156,157:2204',
-onDieFunc => [\&restorecontext],
-neededApp => ["DataServer", "HMI"],
+ -slowAgentFunc=> \&congestionFunc,
+ -blockOnSlowAgent => 1,
-statusFunc => \&MyCallback);
=back
+
+
+
+
+
=item B<mainLoop>
Ivy->mainLoop;
Ivy::mainLoop;
+ $ivyobj->mainLoop;
-Local main events loop. Use it if you don't use the Tk library.
+ main events loop, call local mainloop or Tk::MainLoop according so specified mode
=item B<stop>