From 158f28aebb59ed0487d856589c2b9fab0ff25cc2 Mon Sep 17 00:00:00 2001 From: bustico Date: Tue, 27 Jun 2006 11:08:23 +0000 Subject: ° fix a bug : not all the send where completed in non blocking mode when non blocking mode was requested ° Optimisation : remove buffer copy when it's possible --- Ivy.pm | 220 +++++++++++++++++++++++++++++++++++------------------------------ 1 file changed, 120 insertions(+), 100 deletions(-) diff --git a/Ivy.pm b/Ivy.pm index ba3a043..638c6e0 100644 --- a/Ivy.pm +++ b/Ivy.pm @@ -30,9 +30,6 @@ # ################################################################## -# TODO -# - package Ivy ; use Sys::Hostname; @@ -48,6 +45,7 @@ use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK); # to compute the VERSION from the CVS tag (or if no tag, as the cvs file revision) my $TAG= q$Name$; my $REVISION = q$Revision$ ; +$VERSION = '1.39' ; # for Makefile.PL ($VERSION) = $TAG =~ /^\D*([\d_]+)/ ; if (defined $VERSION and $VERSION ne "_") { $VERSION =~ s/_/\./g; @@ -86,7 +84,7 @@ sub start; # debut de l'integration au bus : sub DESTROY ($); # - envoie un BYE et clôt les connections -#sub bindRegexp ($$$) ; # permet d'associer une regexp avec un callBack +sub bindRegexp ($$$) ; # permet d'associer une regexp avec un callBack # ou d'annuler une precedente association sub bindDirect; # permet d'associer un identifiant de msg direct @@ -161,6 +159,10 @@ sub _substituteEscapedChar ($$); #permet de transformer une regexp etendue sub _callCongestionCb ($$$); # appelle la callback de notification de congestion, # si elle a été définie par l'utilisateur +sub _getNameByFileDes ($$); # retourne le nom de l'appi en fonction du filedes + # de la socket +sub _univSend ($$$); # effectue les send de manière bloquante ou non bloquante + # et accumule les messages si la socket est bloquée ############################################################################# #### CONSTANTES ##### ############################################################################# @@ -641,10 +643,10 @@ sub DESTROY ($) # the 2 previous lines seems to works with other ivy-perl applis # but DO NOT work with ivy-c api. # the 2 next lines works. This has to been validated! CM 21/12/2000 - if (defined $fd) { - send ($fd, sprintf (MSG_FMT, BYE, 0, ""), 0); - $self->_removeFileDescriptor ($fd); - } + if (defined $fd) { + _univSend ($self, $fd, sprintf (MSG_FMT, BYE, 0, "")); + $self->_removeFileDescriptor ($fd); + } } # on clot la socket de signalisation (UDP) @@ -693,7 +695,7 @@ sub start init (@_); $self = $globalIvy = new Ivy; } else { - $self = shift; + $globalIvy = $self = shift; } if ($self->[connSock]) { @@ -769,7 +771,7 @@ sub start ############### PROCEDURE BIND REGEXP -sub bindRegexp +sub bindRegexp ($$$) { my $self = ref($_[0]) eq __PACKAGE__ ? shift : $globalIvy; my ($regexp, $cb) = @_; @@ -791,8 +793,13 @@ sub bindRegexp $regexp = _substituteEscapedChar ('outside', $regexp); # print ("DBG> regexp = $regexp\n"); - eval {my $test = "a" =~ /$regexp/ } ; # testing the regexp for avoiding - if ($@) { carp "Warning in Ivy::bindRegexp, ill-formed regexp: '$original_regexp'" ; return }; + if ($^W) { + eval {my $test = "a" =~ /$regexp/ } ; # testing the regexp for avoiding + if ($@) { + carp "Warning in Ivy::bindRegexp, ill-formed regexp: '$original_regexp'" ; + return + }; + } if ($cb) { my $id; @@ -821,8 +828,7 @@ sub bindRegexp $self->[recCbList][$id]->[1] = []; # on envoie le mesage delregexp foreach my $fd (values %{$self->[sockList]}) { - send ($fd, sprintf (MSG_FMT, DELREGEXP, $id,""), 0) - or $self->_removeFileDescriptor ($fd); + _univSend ($self, $fd, sprintf (MSG_FMT, DELREGEXP, $id,"")); } } } @@ -857,7 +863,7 @@ sub sendMsgs foreach my $msg (@_) { study ($msg); carp "Warning in Ivy::sendMsgs, a message contains a '\\n'. " . - "You should correct it:\n'$msg'" if ($msg =~ /\n/) ; + "You should correct it:\n'$msg'" if ($^W && ($msg =~ /\n/)) ; # pour routes les connections foreach $appSock (values %{$self->[sockList]}) { @@ -924,8 +930,7 @@ sub sendDirectMsgs carp "Warning in Ivy::sendDirectMsgs, a message contains a '\\n'. Skipping it:\n'$msg'" if ($msg =~ /\n/); foreach my $fd (@fds) { - send ($fd, sprintf (MSG_FMT, DIRECT_MSG, $id, "$msg"), 0) - or $self->_removeFileDescriptor ($fd); + _univSend ($self, $fd, sprintf (MSG_FMT, DIRECT_MSG, $id, "$msg"));; } } return 1; @@ -976,8 +981,7 @@ sub ping # pour tous les messages foreach my $fd (@fds) { - send ($fd, sprintf (MSG_FMT, PING, 0, " "), 0) - or $self->_removeFileDescriptor ($fd); + _univSend ($self, $fd, sprintf (MSG_FMT, PING, 0, " ")); } } else { @@ -993,6 +997,7 @@ sub mainLoop ($) my $self = ref($_[0]) eq __PACKAGE__ ? shift : $globalIvy; my ($fd, @selRes, @allDesc); + if ($loopMode == TK_MAINLOOP) { eval {Tk::MainLoop ()}; return; @@ -1003,7 +1008,8 @@ sub mainLoop ($) while (defined $localLoopSelRead) { # READ - @selRes = IO::Select::select ($localLoopSelRead, $localLoopSelWrite, undef, $selectTimout) ; + @selRes = IO::Select::select ($localLoopSelRead, $localLoopSelWrite, undef , + $selectTimout) ; _scanAfter () ; foreach $fd (@{$selRes[0]}) { @@ -1144,6 +1150,7 @@ sub fileEvent ($$;$) delete $localBindByHandle{$fd}; # print ("DBG> Ivy::fileEvent : removing fd from the select\n"); $localLoopSelRead->remove ($fd); + $localLoopSelWrite->remove ($fd); } } # end fileEvent @@ -1309,46 +1316,37 @@ sub _getConnections ($) sub _getMessages ($$) { my ($self, $appSock) = @_; - my $mess; - my $buffer = ''; + my $bufferRef = \$self->[bufRecByCnnx]{$appSock}; my ($addr, $peerPort, $senderName); + my $nlIndex; + my $mess; # on recupere le message - recv ($appSock, $buffer, 65536, 0) ; - unless (length $buffer) { +# my $status = recv ($appSock, $buffer, 1048576, 0) ; + + unless (sysread ($appSock, $$bufferRef, 65536, length ($$bufferRef))) { # message null : broken pipe, ça s'est deconnecte a l'autre bout # on vire ce fd de la boucle d'evenements # print ("DBG> _getMessages, recv err, calling removeFileDesc.\n"); # Bon la il faudra un jour clarifier ce bordel, lister toutes - # les facons dont un couple d'applis connectee peuevent sortir et + # les facons dont un couple d'applis connectées peuvent sortir et # eviter les dead lock qui doivent subsister. - if (defined ($localLoopSelRead)) { - $self->_removeFileDescriptor ($appSock); - } - else { - $self->_removeFileDescriptor ($appSock); - } + $self->_removeFileDescriptor ($appSock); return; } - - if (length ($self->[bufRecByCnnx]{$appSock})) { - $buffer = $self->[bufRecByCnnx]{$appSock} . $buffer ; - $self->[bufRecByCnnx]{$appSock} = ''; - } - my @messages = split ('\n', $buffer) ; - $self->[bufRecByCnnx]{$appSock} = pop (@messages) unless - ($buffer =~ /\n$/) ; - # if (defined $appSock->peername) { $addr = $appSock->peeraddr(); $peerPort = $appSock->peerport() ; $senderName = $self->[cnnxion]{"$addr:$peerPort"} ; $senderName = "NONAME" unless $senderName; - foreach my $mess (@messages) { -# print "DBG> mess from $senderName '$mess'\n"; - +# my @messages = split ('\n', $$bufferRef) ; +# $$bufferRef = ($$bufferRef =~ /\n$/) ? '': pop (@messages) ; +# foreach $mess (@messages) { + while (($nlIndex= index ($$bufferRef, "\n")) > 0) { + $mess = substr ($$bufferRef, 0, $nlIndex, ''); + substr ($$bufferRef, 0, 1, ''); # on recupere les 3 champs : le type, le numero de regexp, les valeurs my ($type, $id, $valeurs) = $mess =~ /^(\d+) \s+ @@ -1396,7 +1394,12 @@ sub _getMessages ($$) # fois pour toute, et ainsi optimiser la vitesse de # filtrage des messages a envoyer # print "DBG> REGEXP from $senderName '$id' '$valeurs'\n"; - next if $self->_toBePruned ($senderName, $valeurs); + if ($self->_toBePruned ($senderName, $valeurs)) { + &_scanConnStatus ($self, $senderName, 'filtered', $valeurs); + next; + } else { + &_scanConnStatus ($self, $senderName, 'subscribing', $valeurs); + } unless (defined $self->[sendRegList]{$appSock}->[$id]) { # si l'id de regexp n'etait pas utilisee c'est tout bon # on affecte la nouvelle regexp a un id @@ -1537,34 +1540,29 @@ sub _sendWantedRegexp ($$) { my ($self, $appSock) = @_; my $connSock = $self->[connSock] ; - + my $msg; # on envoie le message "Nom appli" - send ($appSock, sprintf (MSG_FMT, APP_NAME, $connSock->sockport, $self->[appName]), 0) - or $self->_removeFileDescriptor ($appSock) ; - + _univSend ($self, $appSock, sprintf (MSG_FMT, APP_NAME, $connSock->sockport, + $self->[appName])); # on envoie les regexps for (my $id = 0; $id <= $#{$self->[recCbList]}; $id++) { next unless defined $self->[recCbList][$id]->[1]->[0]; - - send ($appSock, - sprintf (MSG_FMT, REGEXP, $id, $self->[recCbList][$id]->[0]), - 0) or $self->_removeFileDescriptor ($appSock) ; + $msg = sprintf (MSG_FMT, REGEXP, $id, $self->[recCbList][$id]->[0]); + _univSend ($self, $appSock, \$msg); # print sprintf ("DBG> %s %d %s\n", # 'REGEXP', $id, $self->[recCbList][$id]->[0]); } # on envoie le message de fin d'envoi de regexps - send ($appSock, sprintf (MSG_FMT, ENDREGEXP, 0, ""), 0) - or $self->_removeFileDescriptor ($appSock) ; + _univSend ($self, $appSock, sprintf (MSG_FMT, ENDREGEXP, 0, "")); } # end _sendWantedRegexp ############### METHODE SEND LAST REGEXP TO ALLREADY CONNECTED sub _sendLastRegexpToAllreadyConnected ($$) { my ($self, $id) = @_; - + my $msg = sprintf (MSG_FMT, REGEXP, $id, $self->[recCbList][$id]->[0]); foreach my $fd (values %{$self->[sockList]}) { - send ($fd, sprintf (MSG_FMT, REGEXP, $id, $self->[recCbList][$id]->[0]), - 0) or $self->_removeFileDescriptor ($fd) ; + _univSend ($self, $fd, \$msg); } } # end _sendLastRegexpToAllreadyConnected @@ -1590,11 +1588,9 @@ sub _removeFileDescriptor ($$) { my ($self, $fd) = @_; - my $diedAppName; - # on s'est deja occupe de lui return unless exists $self->[sockList]->{$fd}; - # printf ("DBG> _removeFileDescriptor IN thread %s\n", ${Thread->self}); + my $diedAppName = _getNameByFileDes ($self, $fd); # on efface les structures de donnees associees au fd # on vire ce fd des fd a scruter dans la bcle d'evenements @@ -1608,21 +1604,6 @@ sub _removeFileDescriptor ($$) $fd->close(); - EXT_LOOP: - foreach my $name (keys %{$self->[appliList]}) { - foreach my $fdp (@{$self->[appliList]{$name}}) { - if ($fd eq $fdp) { - $diedAppName = $name; - @{$self->[appliList]{$name}} = - grep ($_ ne $fdp, @{$self->[appliList]{$name}}); - if (scalar (@{$self->[appliList]{$name}}) == 0) { - delete $self->[appliList]->{$name} - } - last EXT_LOOP; - } - } - } - unless (defined $diedAppName) { warn "Ivy::__removeFileDescriptor : disconnection of NONAME\n" if $^W; return; @@ -1655,8 +1636,7 @@ sub _sendErrorTo ($$$) { my ($self, $fd, $error) = @_; - send ($fd, join (' ', ERROR, "0\002$error\n"), 0) - or $self->_removeFileDescriptor ($fd); + _univSend ($self, $fd, join (' ', ERROR, "0\002$error\n")); } # end _sendErrorTo @@ -1665,8 +1645,7 @@ sub _pong ($$) { my ($self, $fd) = @_; - send ($fd, join (' ', PONG, "0\002 \n"), 0) - or $self->_removeFileDescriptor ($fd); + _univSend ($self, $fd, join (' ', PONG, "0\002 \n")); } # end _pong @@ -1675,8 +1654,7 @@ sub _sendDieTo ($$) { my ($self, $fd) = @_; - send ($fd, join (' ', DIE, "0\002\n"), 0) - or $self->_removeFileDescriptor ($fd); + _univSend ($self, $fd, join (' ', DIE, "0\002\n")); } # end _sendDieTo @@ -1729,6 +1707,39 @@ sub _sendMsgTo ($$\$) } # end _sendMsgTo + +############### METHODE UNIV SEND +sub _univSend ($$$) +{ + my ($self, $fd, $msg) = @_; + + my $bufEmiRef = \($self->[bufEmiByCnnx]->{$fd}); + my $enCongestion = $$bufEmiRef ? 1 : 0; + if (ref $msg) { + $$bufEmiRef .= $$msg; + } else { + $$bufEmiRef .= $msg; + } + return if $enCongestion; + my $sent = send ($fd, $$bufEmiRef, 0); + unless (defined $sent) { + # y a rien à faire + } elsif ($sent == length ($$bufEmiRef)) { + $$bufEmiRef = ""; + } elsif ($sent >= 0) { + substr ($$bufEmiRef, 0, $sent, ''); + _callCongestionCb ($self, $fd, 1); + if ($self->[blockOnSlowAgent]) { + my $win = ''; + vec($win, fileno ($fd), 1) = 1; + select (undef, $win, undef, undef); + } + } else { + $self->_removeFileDescriptor ($fd) ; + } +} + + ############### PROCEDURE TK FILE EVENT sub _tkFileEvent ($$) { @@ -1798,7 +1809,7 @@ sub _scanConnStatus ($$$$) # de meme nom sur le meme bus. # les nouveaux arguments sont: # le 4eme arg est l'appli nouvelle, deconnecté, qui s'abonne ou se desabonne - # le 5eme arg est le statut (actuellement: 'subscribing'|'unsubscribing'|'died'|'new') + # le 5eme arg est le statut (actuellement: 'subscribing'|'filtered'|'unsubscribing'|'died'|'new') # le 6eme arg est l'addresse de la machine sur laquelle tourne l'agent &{$self->[statusFunc]} ([keys %readyApp], \@nonReadyApp, \%readyApp, $appname, $status, $addr); } # end _scanConnStatus @@ -1821,8 +1832,6 @@ sub _toBePruned ($$$) return 1}; - &_scanConnStatus ($self, $cleaned_from, 'subscribing', $regexp); - # si il n'y a pas de liste de sujets, on ne # filtre pas return 0 unless @{$self->[topicRegexps]}; @@ -1956,18 +1965,8 @@ sub _substituteEscapedChar ($$) sub _callCongestionCb ($$$) { my ($self, $fd, $congestion) = @_; - my $appName; my $addrInet; - - EXT_LOOP: - foreach my $name (keys %{$self->[appliList]}) { - foreach my $fdp (@{$self->[appliList]{$name}}) { - if ($fd == $fdp) { - $appName = $name; - last EXT_LOOP; - } - } - } + my $appName = _getNameByFileDes ($self, $fd); if ($loopMode == LOCAL_MAINLOOP) { if ($congestion) { @@ -2006,6 +2005,23 @@ sub _callCongestionCb ($$$) &{$self->[slowAgentFunc]} ($appName, $addrInet, $congestion); } +############# Procedure __GET NAME BY FILEDES +sub _getNameByFileDes ($$) +{ + my ($self, $fd) = @_; + my $appName = 'NONAME'; + EXT_LOOP: + foreach my $name (keys %{$self->[appliList]}) { + foreach my $fdp (@{$self->[appliList]{$name}}) { + if ($fd == $fdp) { + $appName = $name; + last EXT_LOOP; + } + } + } + return $appName; +} + 1; @@ -2173,7 +2189,7 @@ a reference to a hash table of connected agents Ci (giving the number of each agent). These 3 parameters are maintained for upwards compatibility but should no more be used, since the following three parameters are much easier to use: the name of an appearing / -disapearing or subscribing / unsubscribing agent C, its status either +disapearing or subscribing / filtered / unsubscribing agent C, its status either "new" or "died" or "subscribing" or "unsubscribing", and the hostname where this agent C is running / dying OR the subscribed / unsubscribed regexp. If the hostname of this agent C is not known, it will be @@ -2204,6 +2220,9 @@ Your callback could be: elsif ($status eq "subscribing") { print "$appname subscribes to $host_or_regexp\n"; } + elsif ($status eq "filtered") { + print "$appname subscribes to FILTERED $host_or_regexp check -filterRegexp option\n"; + } elsif ($status eq "unsubscribing") { print "$appname unsubscribed to $host_or_regexp\n"; } @@ -2448,13 +2467,14 @@ No other known bugs at this time. If you find one, please report them to the aut perl(1), perlre(1), ivyprobe.pl(1) =head1 AUTHORS - -Alexandre Bustico , Herve Damiano , -Stephane Chatty , Christophe Mertz + Alexandre Bustico + Stéphane Chatty + Hervé Damiano + Christophe Mertz =head1 COPYRIGHT -CENA (C) 1997-2002 +CENA (C) 1997-2006 =head1 HISTORY -- cgit v1.1