From 4dd3bd225629bc0542b0c747d7c1add35ca912a1 Mon Sep 17 00:00:00 2001 From: bustico Date: Fri, 28 Jul 2006 11:29:39 +0000 Subject: Implementation des messages ping et pong --- Ivy.pm | 75 +++++++++++++++++++++++++++++++++++++++++++++++------------------- 1 file changed, 54 insertions(+), 21 deletions(-) (limited to 'Ivy.pm') diff --git a/Ivy.pm b/Ivy.pm index d77b703..518254b 100644 --- a/Ivy.pm +++ b/Ivy.pm @@ -30,6 +30,7 @@ # ################################################################## + package Ivy ; use Sys::Hostname; @@ -95,7 +96,7 @@ sub sendAppNameMsgs; # envoie une liste de messages precedes # du nom de l'application sub sendDirectMsgs; # envoie une liste de messages directs a une appli sub sendDieTo; # envoie un <> a une appli -sub ping; # teste qu'une appli soit encore vivante +sub ping ($$\&); # teste qu'une appli soit encore vivante sub mainLoop ($); # la mainloop locale (sans tk) sub stop (); # methode de classe : on delete les bus, mais # on reste dans la mainloop @@ -138,7 +139,7 @@ sub _removeFileDescriptor ($$); # on vire un fd et les structures associees sub _sendErrorTo ($$$); #(fd, error) envoie un message d'erreur a un fd sub _sendDieTo ($$); #(fd) envoie un message de demande de suicide a un fd sub _sendMsgTo ($$\$); # (fd, message) -sub _pong ($$); # (fd) +sub _pong ($$$); # (fd) sub _tkFileEvent ($$); # associe un fd a un callback pour la mainloop tk sub _scanAfter () ; # parse si il faut appeler un callback associe a un after sub _myCanRead (); # interface au select @@ -161,7 +162,7 @@ sub _callCongestionCb ($$$); # appelle la callback de notification de congestion # si elle a été définie par l'utilisateur sub _getNameByFileDes ($$); # retourne le nom de l'appi en fonction du filedes - # de la socket + # de la socket sub _univSend ($$$); # effectue les send de manière bloquante ou non bloquante # et accumule les messages si la socket est bloquée ############################################################################# @@ -268,6 +269,9 @@ my $localLoopSelWrite; # table d'ass. handle -> callback my %localBindByHandle; +# table d'ass. fhd -> nom appli +my %nameByHandle; + # tableau d'ass [AFTER ou REPEAT, # timeTotal, deadLine, [callback, arg, arg, ...]] my %afterList=(); @@ -284,6 +288,8 @@ my %allBuses = (); # cache des nom retournés par gethostbyaddr pour _getHostByAddr my %hostNameByAddr = (); +my $pingId = 1; # identifiant d'un ping (renvoyé par le pong) + ############################################################################# #### CLEFS DES VARIABLES D'INSTANCE ##### #### ##### @@ -321,6 +327,7 @@ use constant useMulticast => $constantIndexer++; use constant appName => $constantIndexer++; use constant messWhenReady => $constantIndexer++; use constant uuid => $constantIndexer++; +use constant pongQueue => $constantIndexer++; ############################################################################# #### METHODES PUBLIQUES ##### @@ -512,6 +519,7 @@ sub new ($%) $self->[cnnxion] = {}; $self->[connectedUuid] = {}; + # tableau associatif, clef => file desc, # valeur :buffer au cas ou la lecture ne se termine # pas par \n, de maniere a resegmenter les messages @@ -526,6 +534,10 @@ sub new ($%) # identifiant unique $self->[uuid] = sprintf ("%d%d", time(), rand()*1e15); + # queue de gestion des pings : + # clef : socket fd, valeur :liste [timestamp, machine:port, callBack] + $self->[pongQueue] = {}; + my %optionsAndDefaults = ( -appName => $appName, # nom de l'appli @@ -978,25 +990,34 @@ sub sendDieTo ############### METHOD PING -sub ping +sub ping ($$\&) { my $self = ref($_[0]) eq __PACKAGE__ ? shift : $globalIvy; - my ($to, $timeout) = @_; + my ($to, $pongCbRef) = @_; + my @fds; - if (defined $to and defined ($self->[appliList]{$to})) { + return unless defined $to; - my @fds = @{$self->[appliList]{$to}}; - - # pour tous les messages - foreach my $fd (@fds) { - _univSend ($self, $fd, sprintf (MSG_FMT, PING, 0, " ")); + if (defined ($self->[appliList]{$to})) { + @fds = @{$self->[appliList]{$to}}; + } else { + my %handleByName = reverse %nameByHandle; +# printf "DBG>>> all names : %s\n", join (', ', keys %handleByName); + if (exists $handleByName{$to}) { + @fds = ($handleByName{$to}); + } else { + carp "Warning in Ivy::ping, application '$to' is unknown" if $^W; + return 0; } } - else { - my $to_appli = (defined $to) ? $to : ''; - carp "Warning in Ivy::ping, application '$to_appli' is unknown" if $^W; - return 0; + + # pour tous les messages + foreach my $fd (@fds) { +# print ("DBG>> ping : send to fd $fd\n"); + $self->[pongQueue]->{$fd} = [$pingId, Time::HiRes::time(), $pongCbRef]; + _univSend ($self, $self->[sockList]->{$fd}, sprintf (MSG_FMT, PING, $pingId, "")); } + return ($pingId++); } # end ping ############### METHODE MAINLOOP @@ -1266,6 +1287,7 @@ sub _getBonjour ($) or die "Can't set flags for the socket: $!\n"; # on cree une entree pour $appSock dans la liste des regexp + $nameByHandle{$appSock}=_getHostByAddr($addr) .":$peerPort"; $self->[cnnxion]{"$addr:$peerPort"} = $udpAppName; $self->[sendRegList]{$appSock} = []; $self->[sendRegListSrc]{$appSock} = []; @@ -1486,7 +1508,7 @@ sub _getMessages ($$) $self->_scanConnStatus ($senderName, "new", "$host:$peerPort", undef); } elsif ($type == APP_NAME) { - # etat Connecte + # etat Connecte1558 if (($self->[appName] eq $valeurs) && $^W) { carp "\033[1mWarning in Ivy::_getMessages, there is already an instance of ". "$self->[appName] \033[m" ; @@ -1494,6 +1516,7 @@ sub _getMessages ($$) $senderName = $valeurs; $self->[cnnxion]{"$addr:$peerPort"} = "\004$valeurs"; + $nameByHandle{$appSock}=_getHostByAddr($addr) .":$peerPort"; } elsif ($type == DIRECT_MSG) { @@ -1534,10 +1557,16 @@ sub _getMessages ($$) } elsif ($type == PING) { # si on recois un ping, on envoie un pong - $self->_pong ($appSock); + $self->_pong ($appSock, $id); } elsif ($type == PONG) { - return PONG; + if (exists $self->[pongQueue]->{$appSock}) { + my ($pingid, $time, $funcRef) = @{$self->[pongQueue]->{$appSock}}; +# printf ("DBG>>> stocked Id = $pingid;; message id = $id\n"); + &$funcRef ((Time::HiRes::time()-$time)*1000, $nameByHandle{$appSock}) + if ($pingid == $id); + delete $self->[pongQueue]->{$appSock}; + } } else { _$self->sendErrorTo ($appSock, "TYPE DE MESS $type inconnu"); @@ -1635,6 +1664,7 @@ sub _removeFileDescriptor ($$) #printf "DBG> _removeFileDescriptor : deconnection de %s ($diedAppName)\n", _inetAdrByName ($diedAppName); delete $self->[cnnxion]{$addrInet}; + delete $nameByHandle{$fd}; # on vire l'entree correspondant a ce canal dans la liste des # regexps par canal @@ -1655,11 +1685,12 @@ sub _sendErrorTo ($$$) ############### METHODE PONG -sub _pong ($$) +sub _pong ($$$) { - my ($self, $fd) = @_; + my ($self, $fd, $pongId) = @_; - _univSend ($self, $fd, join (' ', PONG, "0\002 \n")); +# printf ("DBG>>> PONG Id = $pongId\n"); + _univSend ($self, $fd, sprintf (MSG_FMT, PONG, $pongId, "")); } # end _pong @@ -2021,6 +2052,7 @@ sub _callCongestionCb ($$$) } ############# Procedure __GET NAME BY FILEDES + sub _getNameByFileDes ($$) { my ($self, $fd) = @_; @@ -2039,6 +2071,7 @@ sub _getNameByFileDes ($$) + sub _getHostByAddr ($) { my $addr = shift; -- cgit v1.1