summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbustico2006-06-27 11:14:22 +0000
committerbustico2006-06-27 11:14:22 +0000
commita22fbfd2903894b44b871d88c3f1c8a52e13bc23 (patch)
treefdfadad8c189033f2469147b365d498a83db33d8
parent158f28aebb59ed0487d856589c2b9fab0ff25cc2 (diff)
downloadivy-perl-a22fbfd2903894b44b871d88c3f1c8a52e13bc23.zip
ivy-perl-a22fbfd2903894b44b871d88c3f1c8a52e13bc23.tar.gz
ivy-perl-a22fbfd2903894b44b871d88c3f1c8a52e13bc23.tar.bz2
ivy-perl-a22fbfd2903894b44b871d88c3f1c8a52e13bc23.tar.xz
° fix a bug : not all the send where completed in non blocking mode when non blocking
mode was requested ° Optimisation : remove buffer copy when it's possible ivyprobe.pl : ° use non blocking mode, ° fix bug with the use of gnu readline : now editing previous entries is possible ° add -regexpFile regexpfile option : ivyprobe.pl will bind qll the regexp which are in the file given in argument. ° add -filter class1,classe2,...,classeN : add the possibility to filter messages for test/debug purpose. testCongestionTk.pl : simple demo which demonstrate non blocking mode with Tk
-rwxr-xr-xexample/ivyprobe.pl212
-rwxr-xr-xexample/testCongestionTk.pl140
2 files changed, 279 insertions, 73 deletions
diff --git a/example/ivyprobe.pl b/example/ivyprobe.pl
index 2dd9b0e..d920f24 100755
--- a/example/ivyprobe.pl
+++ b/example/ivyprobe.pl
@@ -9,7 +9,7 @@
# modify it under the terms of the GNU GPL General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
-#
+#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
@@ -25,17 +25,16 @@ use strict;
use Ivy;
use Getopt::Long;
use Term::ReadLine;
-#use Tk;
-use Carp;
-my $term = Term::ReadLine->new("ivyprobe.pl");
-#$term->tkRunning;
-
-my $OUT = $term->OUT || *STDOUT{IO};
+use Carp;
my $appliname = "IVYPROBE.PL";
my $bus;
my $timestamp = 0;
+my $noReadLineMode ;
+my $regexpFile;
+my $classes;
+my @classes = ();
# for each application gives the number of running instances
my %connected_applications;
@@ -45,45 +44,87 @@ my %where_applications;
&check_options;
+if (defined $classes) {
+ @classes =split(/:/, $classes);
+ printf ("DBG> CLASSES = %s\n", join (" , ", @classes));
+}
+
+
+unless (defined $noReadLineMode) {
+ my $pid;
+ pipe (PIPE_READ, PIPE_WRITE);
+ select PIPE_WRITE; $| = 1;
+ select STDOUT; $| = 1;
+
+ if (($pid = fork() == 0)) {
+ # code du fils qui lit dans le pipe
+ close (PIPE_WRITE);
+ open (STDIN, "<&PIPE_READ") or die "Can't dup STDIN on PIPE_READ: $!";
+ } else {
+ close (PIPE_READ);
+ my $term = Term::ReadLine->new("ivyprobe.pl");
+ $term->SetHistory ();
+
+ while (defined ($_ = $term->readline("> "))) {
+ chomp;
+ print PIPE_WRITE "$_\n" ;
+ #$term->addhistory($_) if /\S/;
+ }
+ kill (15, $pid);
+ waitpid ($pid, 0);
+ exit (0);
+ }
+}
+
+
Ivy->init (-ivyBus => (defined $bus) ? $bus : undef,
-appName => $appliname,
-# -loopMode => 'TK',
-loopMode => 'LOCAL',
-messWhenReady => "$appliname READY",
+ -filterRegexp => \@classes
);
my $Ivyobj = Ivy->new(-statusFunc => \&statusFunc,
+ -slowAgentFunc=> \&congestionCallback,
+ -blockOnSlowAgent => 0,
);
foreach my $regexp (@ARGV) {
- print $OUT "binding to $regexp\n";
+ print "binding to $regexp\n";
if ($regexp =~ /'(.*)'/) { $regexp = $1; }
- $Ivyobj->bindRegexp($regexp, [ "unused", \&callback] );
+ $Ivyobj->bindRegexp($regexp, [ \&callback] );
}
-$Ivyobj->start;
+if (defined ($regexpFile)) {
+ open (RF, $regexpFile) || die "could not read rexp file $regexpFile\n";
+ while ($_ = <RF>) {
+ last if eof (RF);
+ chomp;
+ next unless (length ($_) > 4);
+ $Ivyobj->bindRegexp($_, [ \&callback] ) ;
+# printf ("DBG> subscribe to '$_'\n");
+ }
+ close (RF);
+}
-my $IN = *STDIN{IO};
+
+$Ivyobj->start;
sub cb {
- my $line = $term->readline("");
- $term->addhistory($line);
+ my $line = <STDIN>;
chomp $line;
exit if (&interpret_line ($line));
}
-Ivy->fileEvent($IN, \&cb);
-Ivy->mainLoop;
-#my $mw = MainWindow->new();
-#$mw->fileevent($IN, 'readable' => \&cb);
-#MainLoop;
+$Ivyobj->fileEvent(*STDIN, \&cb);
+$Ivyobj->mainLoop();
sub printtime {
return if (!$timestamp);
my ($sec,$min,$hour) = localtime();
- printf $OUT "[%02d:%02d:%02d] ", $hour, $min, $sec;
+ printf "[%02d:%02d:%02d] ", $hour, $min, $sec;
}
# this function has 3 additionnal parameters till Ivy Version 4.6
@@ -94,27 +135,31 @@ sub statusFunc {
if ($status eq "new") {
&printtime;
- print $OUT "$appname connected from $host_or_regexp\n";
+ print "$appname connected from $host_or_regexp\n";
$where_applications{"$appname:$host_or_regexp"}++;
}
elsif ($status eq "died") {
&printtime;
- print $OUT "$appname disconnected from $host_or_regexp\n";
+ print "$appname disconnected from $host_or_regexp\n";
$where_applications{"$appname:$host_or_regexp"}--;
}
elsif ($status eq 'subscribing') {
&printtime;
- print $OUT "$appname subscribed to '$host_or_regexp'\n";
+ print "$appname subscribed to '$host_or_regexp'\n";
}
elsif ($status eq 'unsubscribing') {
&printtime;
- print $OUT "$appname unsubscribed to '$host_or_regexp'\n";
+ print "$appname unsubscribed to '$host_or_regexp'\n";
+ }
+ elsif ($status eq 'filtered') {
+ &printtime;
+ print "$appname subscribed to *FILTERED* '$host_or_regexp'\n";
}
else {
&printtime;
- print $OUT "Bug: unkown status; $status in &statusFunc\n";
+ print "Bug: unkown status; $status in &statusFunc\n";
}
-
+
%connected_applications = %$ref_hashReady;
}
@@ -127,9 +172,9 @@ sub interpret_line {
if ($str =~ /^([^\.])/ or $str =~ /^\\/) {
my $count=$Ivyobj->sendMsgs($str);
&printtime;
- print $OUT "-> Sent to $count peer";
- if ($count > 1) { print $OUT "s" }
- print $OUT "\n";
+ print "-> Sent to $count peer";
+ if ($count > 1) { print "s" }
+ print "\n";
return 0;
}
if ($str =~ /^\.q(uit)?\s*$/) {
@@ -152,22 +197,22 @@ sub interpret_line {
if ($str =~ /^.b(ind)?\s+(.*)$/) {
my $regexp = $2;
if ($regexp =~ /'(.*)'/) { $regexp = $1; }
- print $OUT "binding $regexp\n";
- $Ivyobj->bindRegexp($regexp, [ "unused", \&callback] );
+ print "binding $regexp\n";
+ $Ivyobj->bindRegexp($regexp, [ \&callback] );
return 0;
}
if ($str =~ /^.u(nbind)?\s+(.*)$/) {
my $regexp = $2;
if ($regexp =~ /'(.*)'/) { $regexp = $1; }
- print $OUT "unbinding $regexp\n";
+ print "unbinding $regexp\n";
$Ivyobj->bindRegexp($regexp);
return 0;
}
if ($str =~ /^.db(ind)?\s+(.*)$/) {
my $id = $2;
- print $OUT "direct binding id $id\n";
+ print "direct binding id $id\n";
$Ivyobj->bindDirect($id, [\&directCallback] );
return 0;
}
@@ -177,7 +222,7 @@ sub interpret_line {
my $id = $3;
my $data = $4;
&printtime;
- print $OUT "send direct to $appname id=$id $data\n";
+ print "send direct to $appname id=$id $data\n";
$Ivyobj->sendDirectMsgs($appname, $id, $data);
return 0;
}
@@ -186,20 +231,20 @@ sub interpret_line {
my $appname = $2;
my $timeout = $3;
&printtime;
- print $OUT "ping $appname timeout=$timeout\n";
+ print "ping $appname timeout=$timeout\n";
my $res = $Ivyobj->ping($appname, $timeout);
- print $OUT "$res\n";
+ print "$res\n";
return 0;
}
if ($str =~ /^.who\s*$/) {
- print $OUT "Apps:";
+ print "Apps:";
foreach my $app (sort keys %connected_applications) {
for (my $i=0; $i<$connected_applications{$app} ; $i++) {
- print $OUT " $app";
+ print " $app";
}
}
- print $OUT "\n";
+ print "\n";
return 0;
}
@@ -210,26 +255,25 @@ sub interpret_line {
my ($app,$host) = $app_host =~ /(.+):(.*)/ ;
if ($app eq $appli) {
for (my $i=0; $i<$where_applications{$app_host}; $i++) {
- print $OUT "Application $app on $host\n";
+ print "Application $app on $host\n";
$found = 1;
}
}
}
- print $OUT "No Application $appli\n" unless ($found);
+ print "No Application $appli\n" unless ($found);
return 0;
}
- print $OUT "bad command. Type '.help' for a list of commands\n";
+ print "bad command. Type '.help' for a list of commands\n";
return 0;
-
}
sub callback {
- my ($unused, $appname, @param) = @_;
+ my ($appname, @param) = @_;
my $paramString = "";
if (scalar @param) { $paramString = join ("' '", @param); }
- print $OUT "$appname sent '", $paramString, "'\n";
+ print "$appname sent '", $paramString, "'\n";
}
sub directCallback {
@@ -237,10 +281,18 @@ sub directCallback {
my $paramString = "";
if (scalar @param) { $paramString = join ("|", @param); }
- print $OUT "directMessage received '", $paramString, "'\n";
+ print "directMessage received '", $paramString, "'\n";
+}
+
+sub congestionCallback ($$$)
+{
+ my ($name, $addr, $state) = @_;
+
+ printf ("\033[1m $name [$addr] %s\033[m\n", $state ? "CONGESTION" : "OK");
}
+
sub check_options {
# on traite la ligne de commande
my ($opt_help, $opt_appliname);
@@ -248,51 +300,65 @@ sub check_options {
"b:s" => \$bus,
"name:s" => \$opt_appliname,
"t" => \$timestamp,
+ "stdio" => \$noReadLineMode,
+ "filter:s" => \$classes,
+ "regexpFile:s" => \$regexpFile
);
if (defined $opt_appliname and $opt_appliname =~ /\s/) {
- print $OUT "-name value should not contains blanck\n";
+ print "-name value should not contains blanck\n";
&usage;
}
- &usage if ($opt_help && $opt_help);
+ &usage if (defined $opt_help && $opt_help);
$appliname = $opt_appliname if (defined $opt_appliname);
}
sub usage {
- print $OUT "ivyprobe.pl [-h] [ -b <network>:<port> ] ['regexp']*\n";
- print $OUT " ivyprobe.pl is a simple test application for the ivy-perl library\n";
- print $OUT " Its is based on a similar appplication available with ivy-c\n";
- print $OUT " It waits for messages on the bus, messages writtten on the command line\n";
- print $OUT " or commands issued on the comnand line\n";
- print $OUT " Help for the command line is available through the command\n";
- print $OUT " .help or .h\n";
- print $OUT " -h print this help\n";
- print $OUT " -b <network>:<port>\n";
- print $OUT " to defined the network adress and the port number\n";
- print $OUT " defaulted to 127:2010\n";
- print $OUT " -t print a time stamp when a message is send or received\n";
- print $OUT " -name <this_appli_name>\n";
- print $OUT " to change the default appliname \'$appliname\'\n";
+ print "ivyprobe.pl [-h] [ -b <network>:<port> ] ['regexp']*\n";
+ print " ivyprobe.pl is a simple test application for the ivy-perl library\n";
+ print " Its is based on a similar appplication available with ivy-c\n";
+ print " It waits for messages on the bus, messages writtten on the command line\n";
+ print " or commands issued on the comnand line\n";
+ print " Help for the command line is available through the command\n";
+ print " .help or .h\n";
+ print " -h print this help\n";
+ print " -b <network>:<port>\n";
+ print " to defined the network adress and the port number\n";
+ print " defaulted to 127:2010\n";
+ print " -t print a time stamp when a message is send or received\n";
+ print " -name <this_appli_name>\n";
+ print " -stdio don't use gnu readline which permits to recall/edit \n";
+ print " entries with arrow keys\n";
+ print " This options permits to redirect ivyprobe.pl output to file\n";
+ print " -filter classe1,classe2,classe3,...,classeN\n";
+ print " filter messages so that we could send only messages\n";
+ print " beginning by classe1 or classe2 or classe3 or ... or classeN\n";
+ print " -regexpFile file bind to all regexps which are in the geregexp file\n";
+ print " \n";
+ print " \n";
exit;
}
sub line_command_usage { print "tutu\n";
- print $OUT "Commands list:\n";
- print $OUT " .h[elp] - this help\n";
- print $OUT " .q[uit] - terminate this application\n";
- print $OUT " .b[ind] regexp - add a msg to receive\n";
- print $OUT " .u[nbind] regexp - remove a msg to receive\n";
- print $OUT " .die appname1 appname2 ... - send die msg to appnameN\n";
- print $OUT " .db[ind] id - add a direct msg to receive\n";
- print $OUT " .d[irect] appname id args - send direct msg to appname\n";
- print $OUT " .p[ing] appname timeout - ping appname with a delay of timeout ms NYI\n";
- print $OUT " .where appname - on which host is/are appname\n";
- print $OUT " .who - who is on the bus\n";
+ print "Commands list:\n";
+ print " .h[elp] - this help\n";
+ print " .q[uit] - terminate this application\n";
+ print " .b[ind] regexp - add a msg to receive\n";
+ print " .u[nbind] regexp - remove a msg to receive\n";
+ print " .die appname1 appname2 ... - send die msg to appnameN\n";
+ print " .db[ind] id - add a direct msg to receive\n";
+ print " .d[irect] appname id args - send direct msg to appname\n";
+ print " .p[ing] appname timeout - ping appname with a delay of timeout ms NYI\n";
+ print " .where appname - on which host is/are appname\n";
+ print " .who - who is on the bus\n";
}
+
+
+
__END__
diff --git a/example/testCongestionTk.pl b/example/testCongestionTk.pl
new file mode 100755
index 0000000..aa3d21a
--- /dev/null
+++ b/example/testCongestionTk.pl
@@ -0,0 +1,140 @@
+#!/usr/bin/perl -w
+
+use strict;
+use Getopt::Long;
+use Ivy;
+use Time::HiRes;
+use Tk;
+
+sub defaultOption ($$);
+
+my %options;
+my $numberOfSentMsg = 0;
+my $numberOfSentMsgWhenCongestion = 1e6;
+END {Ivy::stop ();}
+
+
+# cet exemple lance deux agents un qui envoie vite de gros messages, et un autre
+# qui les reçoit. Lors des 10 premières receptions il attend une seconde après
+# chaque message, ensuite il depile aussi vite qu'il peut, cet exemple permet
+# de tester le bon fonctionnement du mode non bloquant.
+
+
+
+#OPTIONS
+GetOptions (\%options, "send", "receive");
+
+unless ((exists $options{send}) || (exists $options{receive})) {
+ if (fork () == 0) {
+ sleep (1);
+ exec (qw (./testCongestionTk.pl -send));
+ } else {
+ exec (qw (./testCongestionTk.pl -receive));
+ }
+}
+
+defaultOption ("bus", $ENV{IVYBUS});
+if (exists ($options{send})) {
+ defaultOption ("ivyname", "TESTSEND");
+} else {
+ defaultOption ("ivyname", "TESTRECEIVE");
+}
+
+my $t0;
+my $cbAppelee = 0;
+# IVY
+
+Ivy->init (-loopMode => 'TK',
+ -appName => $options{ivyname},
+ -ivyBus => $options{bus},
+ -filterRegexp => [$options{ivyname}]
+ ) ;
+
+my $bus = Ivy->new (-statusFunc => \&statusFunc,
+ -slowAgentFunc=> \&congestionFunc,
+ -blockOnSlowAgent => 0,
+ -neededApp => exists $options{send} ?
+ ["TESTRECEIVE"] : ["TESTSEND"]);
+
+my $mw = MainWindow->new;
+my $tx1 = $mw->Text;
+my $tx2 = $mw->Text (-height => 3);
+$tx2->pack (-fill => 'both', -expand => 'false');
+$tx1->pack (-fill => 'both', -expand => 'true');
+$mw->title ($options{ivyname});
+
+unless (exists ($options{send})) {
+ $bus->bindRegexp ('TESTSEND SEND (\d+) (.*)', [\&receiveSend]);
+}
+
+if (exists ($options{send})) {
+ $mw->repeat (10, [\&send]);
+}
+
+$bus->start ();
+
+$bus->mainLoop ();
+#Tk::MainLoop ();
+
+
+# PROCEDURES
+
+
+sub receiveSend ($$)
+{
+ my ($app, $iter) = @_;
+ $tx1->insert ('end', "RECEIVE $iter\n");
+ $tx1->yviewScroll (1, 'units');
+ $tx1->idletasks();
+ sleep (1) if ($cbAppelee++ < 10);
+}
+
+
+sub send ()
+{
+ $t0 = Time::HiRes::gettimeofday;
+ #print ("DBG> send $t0\n");
+
+ if ($numberOfSentMsg++ < ($numberOfSentMsgWhenCongestion+100)) {
+ $tx1->insert ('end', "SEND $numberOfSentMsg\n");
+ $tx1->yviewScroll (1, 'units');
+ $bus->sendAppNameMsgs ("SEND $numberOfSentMsg " . 'a' x 1020);
+ }
+}
+
+
+sub defaultOption ($$)
+{
+ my ($option, $default) = @_;
+ unless (defined $options{$option}) {
+# warn "option $option non spécifiéee : utilision de $default\n";
+ $options{$option} = $default;
+ }
+}
+
+
+sub statusFunc ($$)
+{
+ my ($ready, $notReady) = @_;
+
+ if (@{$notReady}) {
+ printf "appli manquantes : %s\n", join (' ', @{$notReady});
+ } else {
+ printf ("Toutes applis OK !!\n");
+ }
+}
+
+sub congestionFunc ($$$)
+{
+ my ($name, $addr, $state) = @_;
+
+ if ($state == 1) {
+ $tx2->insert ('end', sprintf ("$name [$addr] %s will stop at N=%d\n", $state ? "CONGESTION" : "OK",
+ $numberOfSentMsg+100));
+ $numberOfSentMsgWhenCongestion = $numberOfSentMsg;
+ } else {
+ $tx2->insert ('end', sprintf ("$name [$addr] %s\n", $state ? "CONGESTION" : "OK"));
+ }
+ $tx2->update();
+}
+