summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbustico2006-07-28 11:29:39 +0000
committerbustico2006-07-28 11:29:39 +0000
commit4dd3bd225629bc0542b0c747d7c1add35ca912a1 (patch)
tree016924989945e74a9c6d9ff444b52dd5abd2cc65
parentd90f29a556caff198063aa747ace9b73dd96513c (diff)
downloadivy-perl-4dd3bd225629bc0542b0c747d7c1add35ca912a1.zip
ivy-perl-4dd3bd225629bc0542b0c747d7c1add35ca912a1.tar.gz
ivy-perl-4dd3bd225629bc0542b0c747d7c1add35ca912a1.tar.bz2
ivy-perl-4dd3bd225629bc0542b0c747d7c1add35ca912a1.tar.xz
Implementation des messages ping et pong
-rw-r--r--Ivy.pm75
1 files changed, 54 insertions, 21 deletions
diff --git a/Ivy.pm b/Ivy.pm
index d77b703..518254b 100644
--- a/Ivy.pm
+++ b/Ivy.pm
@@ -30,6 +30,7 @@
#
##################################################################
+
package Ivy ;
use Sys::Hostname;
@@ -95,7 +96,7 @@ sub sendAppNameMsgs; # envoie une liste de messages precedes
# du nom de l'application
sub sendDirectMsgs; # envoie une liste de messages directs a une appli
sub sendDieTo; # envoie un <<kill>> a une appli
-sub ping; # teste qu'une appli soit encore vivante
+sub ping ($$\&); # teste qu'une appli soit encore vivante
sub mainLoop ($); # la mainloop locale (sans tk)
sub stop (); # methode de classe : on delete les bus, mais
# on reste dans la mainloop
@@ -138,7 +139,7 @@ sub _removeFileDescriptor ($$); # on vire un fd et les structures associees
sub _sendErrorTo ($$$); #(fd, error) envoie un message d'erreur a un fd
sub _sendDieTo ($$); #(fd) envoie un message de demande de suicide a un fd
sub _sendMsgTo ($$\$); # (fd, message)
-sub _pong ($$); # (fd)
+sub _pong ($$$); # (fd)
sub _tkFileEvent ($$); # associe un fd a un callback pour la mainloop tk
sub _scanAfter () ; # parse si il faut appeler un callback associe a un after
sub _myCanRead (); # interface au select
@@ -161,7 +162,7 @@ sub _callCongestionCb ($$$); # appelle la callback de notification de congestion
# si elle a été définie par l'utilisateur
sub _getNameByFileDes ($$); # retourne le nom de l'appi en fonction du filedes
- # de la socket
+ # de la socket
sub _univSend ($$$); # effectue les send de manière bloquante ou non bloquante
# et accumule les messages si la socket est bloquée
#############################################################################
@@ -268,6 +269,9 @@ my $localLoopSelWrite;
# table d'ass. handle -> callback
my %localBindByHandle;
+# table d'ass. fhd -> nom appli
+my %nameByHandle;
+
# tableau d'ass [AFTER ou REPEAT,
# timeTotal, deadLine, [callback, arg, arg, ...]]
my %afterList=();
@@ -284,6 +288,8 @@ my %allBuses = ();
# cache des nom retournés par gethostbyaddr pour _getHostByAddr
my %hostNameByAddr = ();
+my $pingId = 1; # identifiant d'un ping (renvoyé par le pong)
+
#############################################################################
#### CLEFS DES VARIABLES D'INSTANCE #####
#### #####
@@ -321,6 +327,7 @@ use constant useMulticast => $constantIndexer++;
use constant appName => $constantIndexer++;
use constant messWhenReady => $constantIndexer++;
use constant uuid => $constantIndexer++;
+use constant pongQueue => $constantIndexer++;
#############################################################################
#### METHODES PUBLIQUES #####
@@ -512,6 +519,7 @@ sub new ($%)
$self->[cnnxion] = {};
$self->[connectedUuid] = {};
+
# tableau associatif, clef => file desc,
# valeur :buffer au cas ou la lecture ne se termine
# pas par \n, de maniere a resegmenter les messages
@@ -526,6 +534,10 @@ sub new ($%)
# identifiant unique
$self->[uuid] = sprintf ("%d%d", time(), rand()*1e15);
+ # queue de gestion des pings :
+ # clef : socket fd, valeur :liste [timestamp, machine:port, callBack]
+ $self->[pongQueue] = {};
+
my %optionsAndDefaults = (
-appName => $appName,
# nom de l'appli
@@ -978,25 +990,34 @@ sub sendDieTo
############### METHOD PING
-sub ping
+sub ping ($$\&)
{
my $self = ref($_[0]) eq __PACKAGE__ ? shift : $globalIvy;
- my ($to, $timeout) = @_;
+ my ($to, $pongCbRef) = @_;
+ my @fds;
- if (defined $to and defined ($self->[appliList]{$to})) {
+ return unless defined $to;
- my @fds = @{$self->[appliList]{$to}};
-
- # pour tous les messages
- foreach my $fd (@fds) {
- _univSend ($self, $fd, sprintf (MSG_FMT, PING, 0, " "));
+ if (defined ($self->[appliList]{$to})) {
+ @fds = @{$self->[appliList]{$to}};
+ } else {
+ my %handleByName = reverse %nameByHandle;
+# printf "DBG>>> all names : %s\n", join (', ', keys %handleByName);
+ if (exists $handleByName{$to}) {
+ @fds = ($handleByName{$to});
+ } else {
+ carp "Warning in Ivy::ping, application '$to' is unknown" if $^W;
+ return 0;
}
}
- else {
- my $to_appli = (defined $to) ? $to : '';
- carp "Warning in Ivy::ping, application '$to_appli' is unknown" if $^W;
- return 0;
+
+ # pour tous les messages
+ foreach my $fd (@fds) {
+# print ("DBG>> ping : send to fd $fd\n");
+ $self->[pongQueue]->{$fd} = [$pingId, Time::HiRes::time(), $pongCbRef];
+ _univSend ($self, $self->[sockList]->{$fd}, sprintf (MSG_FMT, PING, $pingId, ""));
}
+ return ($pingId++);
} # end ping
############### METHODE MAINLOOP
@@ -1266,6 +1287,7 @@ sub _getBonjour ($)
or die "Can't set flags for the socket: $!\n";
# on cree une entree pour $appSock dans la liste des regexp
+ $nameByHandle{$appSock}=_getHostByAddr($addr) .":$peerPort";
$self->[cnnxion]{"$addr:$peerPort"} = $udpAppName;
$self->[sendRegList]{$appSock} = [];
$self->[sendRegListSrc]{$appSock} = [];
@@ -1486,7 +1508,7 @@ sub _getMessages ($$)
$self->_scanConnStatus ($senderName, "new", "$host:$peerPort", undef);
}
elsif ($type == APP_NAME) {
- # etat Connecte
+ # etat Connecte1558
if (($self->[appName] eq $valeurs) && $^W) {
carp "\033[1mWarning in Ivy::_getMessages, there is already an instance of ".
"$self->[appName] \033[m" ;
@@ -1494,6 +1516,7 @@ sub _getMessages ($$)
$senderName = $valeurs;
$self->[cnnxion]{"$addr:$peerPort"} = "\004$valeurs";
+ $nameByHandle{$appSock}=_getHostByAddr($addr) .":$peerPort";
}
elsif ($type == DIRECT_MSG) {
@@ -1534,10 +1557,16 @@ sub _getMessages ($$)
}
elsif ($type == PING) {
# si on recois un ping, on envoie un pong
- $self->_pong ($appSock);
+ $self->_pong ($appSock, $id);
}
elsif ($type == PONG) {
- return PONG;
+ if (exists $self->[pongQueue]->{$appSock}) {
+ my ($pingid, $time, $funcRef) = @{$self->[pongQueue]->{$appSock}};
+# printf ("DBG>>> stocked Id = $pingid;; message id = $id\n");
+ &$funcRef ((Time::HiRes::time()-$time)*1000, $nameByHandle{$appSock})
+ if ($pingid == $id);
+ delete $self->[pongQueue]->{$appSock};
+ }
}
else {
_$self->sendErrorTo ($appSock, "TYPE DE MESS $type inconnu");
@@ -1635,6 +1664,7 @@ sub _removeFileDescriptor ($$)
#printf "DBG> _removeFileDescriptor : deconnection de %s ($diedAppName)\n", _inetAdrByName ($diedAppName);
delete $self->[cnnxion]{$addrInet};
+ delete $nameByHandle{$fd};
# on vire l'entree correspondant a ce canal dans la liste des
# regexps par canal
@@ -1655,11 +1685,12 @@ sub _sendErrorTo ($$$)
############### METHODE PONG
-sub _pong ($$)
+sub _pong ($$$)
{
- my ($self, $fd) = @_;
+ my ($self, $fd, $pongId) = @_;
- _univSend ($self, $fd, join (' ', PONG, "0\002 \n"));
+# printf ("DBG>>> PONG Id = $pongId\n");
+ _univSend ($self, $fd, sprintf (MSG_FMT, PONG, $pongId, ""));
} # end _pong
@@ -2021,6 +2052,7 @@ sub _callCongestionCb ($$$)
}
############# Procedure __GET NAME BY FILEDES
+
sub _getNameByFileDes ($$)
{
my ($self, $fd) = @_;
@@ -2039,6 +2071,7 @@ sub _getNameByFileDes ($$)
+
sub _getHostByAddr ($)
{
my $addr = shift;