summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbustico2006-06-27 11:08:23 +0000
committerbustico2006-06-27 11:08:23 +0000
commit158f28aebb59ed0487d856589c2b9fab0ff25cc2 (patch)
treed246583cdded9a5e2b5309c694449689fc362214
parentd2597b1ce78b12461c2bb1c90337972e363a23ab (diff)
downloadivy-perl-158f28aebb59ed0487d856589c2b9fab0ff25cc2.zip
ivy-perl-158f28aebb59ed0487d856589c2b9fab0ff25cc2.tar.gz
ivy-perl-158f28aebb59ed0487d856589c2b9fab0ff25cc2.tar.bz2
ivy-perl-158f28aebb59ed0487d856589c2b9fab0ff25cc2.tar.xz
° fix a bug : not all the send where completed in non blocking mode when non blocking
mode was requested ° Optimisation : remove buffer copy when it's possible
-rw-r--r--Ivy.pm220
1 files changed, 120 insertions, 100 deletions
diff --git a/Ivy.pm b/Ivy.pm
index ba3a043..638c6e0 100644
--- a/Ivy.pm
+++ b/Ivy.pm
@@ -30,9 +30,6 @@
#
##################################################################
-# TODO
-#
-
package Ivy ;
use Sys::Hostname;
@@ -48,6 +45,7 @@ use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
# to compute the VERSION from the CVS tag (or if no tag, as the cvs file revision)
my $TAG= q$Name$;
my $REVISION = q$Revision$ ;
+$VERSION = '1.39' ; # for Makefile.PL
($VERSION) = $TAG =~ /^\D*([\d_]+)/ ;
if (defined $VERSION and $VERSION ne "_") {
$VERSION =~ s/_/\./g;
@@ -86,7 +84,7 @@ sub start; # debut de l'integration au bus :
sub DESTROY ($); # - envoie un BYE et clôt les connections
-#sub bindRegexp ($$$) ; # permet d'associer une regexp avec un callBack
+sub bindRegexp ($$$) ; # permet d'associer une regexp avec un callBack
# ou d'annuler une precedente association
sub bindDirect; # permet d'associer un identifiant de msg direct
@@ -161,6 +159,10 @@ sub _substituteEscapedChar ($$); #permet de transformer une regexp etendue
sub _callCongestionCb ($$$); # appelle la callback de notification de congestion,
# si elle a été définie par l'utilisateur
+sub _getNameByFileDes ($$); # retourne le nom de l'appi en fonction du filedes
+ # de la socket
+sub _univSend ($$$); # effectue les send de manière bloquante ou non bloquante
+ # et accumule les messages si la socket est bloquée
#############################################################################
#### CONSTANTES #####
#############################################################################
@@ -641,10 +643,10 @@ sub DESTROY ($)
# the 2 previous lines seems to works with other ivy-perl applis
# but DO NOT work with ivy-c api.
# the 2 next lines works. This has to been validated! CM 21/12/2000
- if (defined $fd) {
- send ($fd, sprintf (MSG_FMT, BYE, 0, ""), 0);
- $self->_removeFileDescriptor ($fd);
- }
+ if (defined $fd) {
+ _univSend ($self, $fd, sprintf (MSG_FMT, BYE, 0, ""));
+ $self->_removeFileDescriptor ($fd);
+ }
}
# on clot la socket de signalisation (UDP)
@@ -693,7 +695,7 @@ sub start
init (@_);
$self = $globalIvy = new Ivy;
} else {
- $self = shift;
+ $globalIvy = $self = shift;
}
if ($self->[connSock]) {
@@ -769,7 +771,7 @@ sub start
############### PROCEDURE BIND REGEXP
-sub bindRegexp
+sub bindRegexp ($$$)
{
my $self = ref($_[0]) eq __PACKAGE__ ? shift : $globalIvy;
my ($regexp, $cb) = @_;
@@ -791,8 +793,13 @@ sub bindRegexp
$regexp = _substituteEscapedChar ('outside', $regexp);
# print ("DBG> regexp = $regexp\n");
- eval {my $test = "a" =~ /$regexp/ } ; # testing the regexp for avoiding
- if ($@) { carp "Warning in Ivy::bindRegexp, ill-formed regexp: '$original_regexp'" ; return };
+ if ($^W) {
+ eval {my $test = "a" =~ /$regexp/ } ; # testing the regexp for avoiding
+ if ($@) {
+ carp "Warning in Ivy::bindRegexp, ill-formed regexp: '$original_regexp'" ;
+ return
+ };
+ }
if ($cb) {
my $id;
@@ -821,8 +828,7 @@ sub bindRegexp
$self->[recCbList][$id]->[1] = [];
# on envoie le mesage delregexp
foreach my $fd (values %{$self->[sockList]}) {
- send ($fd, sprintf (MSG_FMT, DELREGEXP, $id,""), 0)
- or $self->_removeFileDescriptor ($fd);
+ _univSend ($self, $fd, sprintf (MSG_FMT, DELREGEXP, $id,""));
}
}
}
@@ -857,7 +863,7 @@ sub sendMsgs
foreach my $msg (@_) {
study ($msg);
carp "Warning in Ivy::sendMsgs, a message contains a '\\n'. " .
- "You should correct it:\n'$msg'" if ($msg =~ /\n/) ;
+ "You should correct it:\n'$msg'" if ($^W && ($msg =~ /\n/)) ;
# pour routes les connections
foreach $appSock (values %{$self->[sockList]}) {
@@ -924,8 +930,7 @@ sub sendDirectMsgs
carp "Warning in Ivy::sendDirectMsgs, a message contains a '\\n'. Skipping it:\n'$msg'" if ($msg =~ /\n/);
foreach my $fd (@fds) {
- send ($fd, sprintf (MSG_FMT, DIRECT_MSG, $id, "$msg"), 0)
- or $self->_removeFileDescriptor ($fd);
+ _univSend ($self, $fd, sprintf (MSG_FMT, DIRECT_MSG, $id, "$msg"));;
}
}
return 1;
@@ -976,8 +981,7 @@ sub ping
# pour tous les messages
foreach my $fd (@fds) {
- send ($fd, sprintf (MSG_FMT, PING, 0, " "), 0)
- or $self->_removeFileDescriptor ($fd);
+ _univSend ($self, $fd, sprintf (MSG_FMT, PING, 0, " "));
}
}
else {
@@ -993,6 +997,7 @@ sub mainLoop ($)
my $self = ref($_[0]) eq __PACKAGE__ ? shift : $globalIvy;
my ($fd, @selRes, @allDesc);
+
if ($loopMode == TK_MAINLOOP) {
eval {Tk::MainLoop ()};
return;
@@ -1003,7 +1008,8 @@ sub mainLoop ($)
while (defined $localLoopSelRead) {
# READ
- @selRes = IO::Select::select ($localLoopSelRead, $localLoopSelWrite, undef, $selectTimout) ;
+ @selRes = IO::Select::select ($localLoopSelRead, $localLoopSelWrite, undef ,
+ $selectTimout) ;
_scanAfter () ;
foreach $fd (@{$selRes[0]}) {
@@ -1144,6 +1150,7 @@ sub fileEvent ($$;$)
delete $localBindByHandle{$fd};
# print ("DBG> Ivy::fileEvent : removing fd from the select\n");
$localLoopSelRead->remove ($fd);
+ $localLoopSelWrite->remove ($fd);
}
} # end fileEvent
@@ -1309,46 +1316,37 @@ sub _getConnections ($)
sub _getMessages ($$)
{
my ($self, $appSock) = @_;
- my $mess;
- my $buffer = '';
+ my $bufferRef = \$self->[bufRecByCnnx]{$appSock};
my ($addr, $peerPort, $senderName);
+ my $nlIndex;
+ my $mess;
# on recupere le message
- recv ($appSock, $buffer, 65536, 0) ;
- unless (length $buffer) {
+# my $status = recv ($appSock, $buffer, 1048576, 0) ;
+
+ unless (sysread ($appSock, $$bufferRef, 65536, length ($$bufferRef))) {
# message null : broken pipe, ça s'est deconnecte a l'autre bout
# on vire ce fd de la boucle d'evenements
# print ("DBG> _getMessages, recv err, calling removeFileDesc.\n");
# Bon la il faudra un jour clarifier ce bordel, lister toutes
- # les facons dont un couple d'applis connectee peuevent sortir et
+ # les facons dont un couple d'applis connectées peuvent sortir et
# eviter les dead lock qui doivent subsister.
- if (defined ($localLoopSelRead)) {
- $self->_removeFileDescriptor ($appSock);
- }
- else {
- $self->_removeFileDescriptor ($appSock);
- }
+ $self->_removeFileDescriptor ($appSock);
return;
}
-
- if (length ($self->[bufRecByCnnx]{$appSock})) {
- $buffer = $self->[bufRecByCnnx]{$appSock} . $buffer ;
- $self->[bufRecByCnnx]{$appSock} = '';
- }
- my @messages = split ('\n', $buffer) ;
- $self->[bufRecByCnnx]{$appSock} = pop (@messages) unless
- ($buffer =~ /\n$/) ;
-
# if (defined $appSock->peername) {
$addr = $appSock->peeraddr();
$peerPort = $appSock->peerport() ;
$senderName = $self->[cnnxion]{"$addr:$peerPort"} ;
$senderName = "NONAME" unless $senderName;
- foreach my $mess (@messages) {
-# print "DBG> mess from $senderName '$mess'\n";
-
+# my @messages = split ('\n', $$bufferRef) ;
+# $$bufferRef = ($$bufferRef =~ /\n$/) ? '': pop (@messages) ;
+# foreach $mess (@messages) {
+ while (($nlIndex= index ($$bufferRef, "\n")) > 0) {
+ $mess = substr ($$bufferRef, 0, $nlIndex, '');
+ substr ($$bufferRef, 0, 1, '');
# on recupere les 3 champs : le type, le numero de regexp, les valeurs
my ($type, $id, $valeurs) = $mess =~ /^(\d+)
\s+
@@ -1396,7 +1394,12 @@ sub _getMessages ($$)
# fois pour toute, et ainsi optimiser la vitesse de
# filtrage des messages a envoyer
# print "DBG> REGEXP from $senderName '$id' '$valeurs'\n";
- next if $self->_toBePruned ($senderName, $valeurs);
+ if ($self->_toBePruned ($senderName, $valeurs)) {
+ &_scanConnStatus ($self, $senderName, 'filtered', $valeurs);
+ next;
+ } else {
+ &_scanConnStatus ($self, $senderName, 'subscribing', $valeurs);
+ }
unless (defined $self->[sendRegList]{$appSock}->[$id]) {
# si l'id de regexp n'etait pas utilisee c'est tout bon
# on affecte la nouvelle regexp a un id
@@ -1537,34 +1540,29 @@ sub _sendWantedRegexp ($$)
{
my ($self, $appSock) = @_;
my $connSock = $self->[connSock] ;
-
+ my $msg;
# on envoie le message "Nom appli"
- send ($appSock, sprintf (MSG_FMT, APP_NAME, $connSock->sockport, $self->[appName]), 0)
- or $self->_removeFileDescriptor ($appSock) ;
-
+ _univSend ($self, $appSock, sprintf (MSG_FMT, APP_NAME, $connSock->sockport,
+ $self->[appName]));
# on envoie les regexps
for (my $id = 0; $id <= $#{$self->[recCbList]}; $id++) {
next unless defined $self->[recCbList][$id]->[1]->[0];
-
- send ($appSock,
- sprintf (MSG_FMT, REGEXP, $id, $self->[recCbList][$id]->[0]),
- 0) or $self->_removeFileDescriptor ($appSock) ;
+ $msg = sprintf (MSG_FMT, REGEXP, $id, $self->[recCbList][$id]->[0]);
+ _univSend ($self, $appSock, \$msg);
# print sprintf ("DBG> %s %d %s\n",
# 'REGEXP', $id, $self->[recCbList][$id]->[0]);
}
# on envoie le message de fin d'envoi de regexps
- send ($appSock, sprintf (MSG_FMT, ENDREGEXP, 0, ""), 0)
- or $self->_removeFileDescriptor ($appSock) ;
+ _univSend ($self, $appSock, sprintf (MSG_FMT, ENDREGEXP, 0, ""));
} # end _sendWantedRegexp
############### METHODE SEND LAST REGEXP TO ALLREADY CONNECTED
sub _sendLastRegexpToAllreadyConnected ($$)
{
my ($self, $id) = @_;
-
+ my $msg = sprintf (MSG_FMT, REGEXP, $id, $self->[recCbList][$id]->[0]);
foreach my $fd (values %{$self->[sockList]}) {
- send ($fd, sprintf (MSG_FMT, REGEXP, $id, $self->[recCbList][$id]->[0]),
- 0) or $self->_removeFileDescriptor ($fd) ;
+ _univSend ($self, $fd, \$msg);
}
} # end _sendLastRegexpToAllreadyConnected
@@ -1590,11 +1588,9 @@ sub _removeFileDescriptor ($$)
{
my ($self, $fd) = @_;
- my $diedAppName;
-
# on s'est deja occupe de lui
return unless exists $self->[sockList]->{$fd};
- # printf ("DBG> _removeFileDescriptor IN thread %s\n", ${Thread->self});
+ my $diedAppName = _getNameByFileDes ($self, $fd);
# on efface les structures de donnees associees au fd
# on vire ce fd des fd a scruter dans la bcle d'evenements
@@ -1608,21 +1604,6 @@ sub _removeFileDescriptor ($$)
$fd->close();
- EXT_LOOP:
- foreach my $name (keys %{$self->[appliList]}) {
- foreach my $fdp (@{$self->[appliList]{$name}}) {
- if ($fd eq $fdp) {
- $diedAppName = $name;
- @{$self->[appliList]{$name}} =
- grep ($_ ne $fdp, @{$self->[appliList]{$name}});
- if (scalar (@{$self->[appliList]{$name}}) == 0) {
- delete $self->[appliList]->{$name}
- }
- last EXT_LOOP;
- }
- }
- }
-
unless (defined $diedAppName) {
warn "Ivy::__removeFileDescriptor : disconnection of NONAME\n" if $^W;
return;
@@ -1655,8 +1636,7 @@ sub _sendErrorTo ($$$)
{
my ($self, $fd, $error) = @_;
- send ($fd, join (' ', ERROR, "0\002$error\n"), 0)
- or $self->_removeFileDescriptor ($fd);
+ _univSend ($self, $fd, join (' ', ERROR, "0\002$error\n"));
} # end _sendErrorTo
@@ -1665,8 +1645,7 @@ sub _pong ($$)
{
my ($self, $fd) = @_;
- send ($fd, join (' ', PONG, "0\002 \n"), 0)
- or $self->_removeFileDescriptor ($fd);
+ _univSend ($self, $fd, join (' ', PONG, "0\002 \n"));
} # end _pong
@@ -1675,8 +1654,7 @@ sub _sendDieTo ($$)
{
my ($self, $fd) = @_;
- send ($fd, join (' ', DIE, "0\002\n"), 0)
- or $self->_removeFileDescriptor ($fd);
+ _univSend ($self, $fd, join (' ', DIE, "0\002\n"));
} # end _sendDieTo
@@ -1729,6 +1707,39 @@ sub _sendMsgTo ($$\$)
} # end _sendMsgTo
+
+############### METHODE UNIV SEND
+sub _univSend ($$$)
+{
+ my ($self, $fd, $msg) = @_;
+
+ my $bufEmiRef = \($self->[bufEmiByCnnx]->{$fd});
+ my $enCongestion = $$bufEmiRef ? 1 : 0;
+ if (ref $msg) {
+ $$bufEmiRef .= $$msg;
+ } else {
+ $$bufEmiRef .= $msg;
+ }
+ return if $enCongestion;
+ my $sent = send ($fd, $$bufEmiRef, 0);
+ unless (defined $sent) {
+ # y a rien à faire
+ } elsif ($sent == length ($$bufEmiRef)) {
+ $$bufEmiRef = "";
+ } elsif ($sent >= 0) {
+ substr ($$bufEmiRef, 0, $sent, '');
+ _callCongestionCb ($self, $fd, 1);
+ if ($self->[blockOnSlowAgent]) {
+ my $win = '';
+ vec($win, fileno ($fd), 1) = 1;
+ select (undef, $win, undef, undef);
+ }
+ } else {
+ $self->_removeFileDescriptor ($fd) ;
+ }
+}
+
+
############### PROCEDURE TK FILE EVENT
sub _tkFileEvent ($$)
{
@@ -1798,7 +1809,7 @@ sub _scanConnStatus ($$$$)
# de meme nom sur le meme bus.
# les nouveaux arguments sont:
# le 4eme arg est l'appli nouvelle, deconnecté, qui s'abonne ou se desabonne
- # le 5eme arg est le statut (actuellement: 'subscribing'|'unsubscribing'|'died'|'new')
+ # le 5eme arg est le statut (actuellement: 'subscribing'|'filtered'|'unsubscribing'|'died'|'new')
# le 6eme arg est l'addresse de la machine sur laquelle tourne l'agent
&{$self->[statusFunc]} ([keys %readyApp], \@nonReadyApp, \%readyApp, $appname, $status, $addr);
} # end _scanConnStatus
@@ -1821,8 +1832,6 @@ sub _toBePruned ($$$)
return 1};
- &_scanConnStatus ($self, $cleaned_from, 'subscribing', $regexp);
-
# si il n'y a pas de liste de sujets, on ne
# filtre pas
return 0 unless @{$self->[topicRegexps]};
@@ -1956,18 +1965,8 @@ sub _substituteEscapedChar ($$)
sub _callCongestionCb ($$$)
{
my ($self, $fd, $congestion) = @_;
- my $appName;
my $addrInet;
-
- EXT_LOOP:
- foreach my $name (keys %{$self->[appliList]}) {
- foreach my $fdp (@{$self->[appliList]{$name}}) {
- if ($fd == $fdp) {
- $appName = $name;
- last EXT_LOOP;
- }
- }
- }
+ my $appName = _getNameByFileDes ($self, $fd);
if ($loopMode == LOCAL_MAINLOOP) {
if ($congestion) {
@@ -2006,6 +2005,23 @@ sub _callCongestionCb ($$$)
&{$self->[slowAgentFunc]} ($appName, $addrInet, $congestion);
}
+############# Procedure __GET NAME BY FILEDES
+sub _getNameByFileDes ($$)
+{
+ my ($self, $fd) = @_;
+ my $appName = 'NONAME';
+ EXT_LOOP:
+ foreach my $name (keys %{$self->[appliList]}) {
+ foreach my $fdp (@{$self->[appliList]{$name}}) {
+ if ($fd == $fdp) {
+ $appName = $name;
+ last EXT_LOOP;
+ }
+ }
+ }
+ return $appName;
+}
+
1;
@@ -2173,7 +2189,7 @@ a reference to a hash table of connected agents Ci (giving the number
of each agent). These 3 parameters are maintained for upwards
compatibility but should no more be used, since the following three
parameters are much easier to use: the name of an appearing /
-disapearing or subscribing / unsubscribing agent C, its status either
+disapearing or subscribing / filtered / unsubscribing agent C, its status either
"new" or "died" or "subscribing" or "unsubscribing", and the hostname
where this agent C is running / dying OR the subscribed / unsubscribed
regexp. If the hostname of this agent C is not known, it will be
@@ -2204,6 +2220,9 @@ Your callback could be:
elsif ($status eq "subscribing") {
print "$appname subscribes to $host_or_regexp\n";
}
+ elsif ($status eq "filtered") {
+ print "$appname subscribes to FILTERED $host_or_regexp check -filterRegexp option\n";
+ }
elsif ($status eq "unsubscribing") {
print "$appname unsubscribed to $host_or_regexp\n";
}
@@ -2448,13 +2467,14 @@ No other known bugs at this time. If you find one, please report them to the aut
perl(1), perlre(1), ivyprobe.pl(1)
=head1 AUTHORS
-
-Alexandre Bustico <bustico@cena.fr>, Herve Damiano <damiano@cena.fr>,
-Stephane Chatty <chatty@cena.fr>, Christophe Mertz <mertz@cena.fr>
+ Alexandre Bustico <alexandre.bustico@cena.fr>
+ Stéphane Chatty <chatty@intuilab.com>
+ Hervé Damiano <herve.damiano@aviation-civile.gouv.fr>
+ Christophe Mertz <mertz@intuilab.com>
=head1 COPYRIGHT
-CENA (C) 1997-2002
+CENA (C) 1997-2006
=head1 HISTORY