From a22fbfd2903894b44b871d88c3f1c8a52e13bc23 Mon Sep 17 00:00:00 2001 From: bustico Date: Tue, 27 Jun 2006 11:14:22 +0000 Subject: ° fix a bug : not all the send where completed in non blocking mode when non blocking mode was requested ° Optimisation : remove buffer copy when it's possible ivyprobe.pl : ° use non blocking mode, ° fix bug with the use of gnu readline : now editing previous entries is possible ° add -regexpFile regexpfile option : ivyprobe.pl will bind qll the regexp which are in the file given in argument. ° add -filter class1,classe2,...,classeN : add the possibility to filter messages for test/debug purpose. testCongestionTk.pl : simple demo which demonstrate non blocking mode with Tk --- example/ivyprobe.pl | 212 +++++++++++++++++++++++++++++--------------- example/testCongestionTk.pl | 140 +++++++++++++++++++++++++++++ 2 files changed, 279 insertions(+), 73 deletions(-) create mode 100755 example/testCongestionTk.pl (limited to 'example') diff --git a/example/ivyprobe.pl b/example/ivyprobe.pl index 2dd9b0e..d920f24 100755 --- a/example/ivyprobe.pl +++ b/example/ivyprobe.pl @@ -9,7 +9,7 @@ # modify it under the terms of the GNU GPL General Public License # as published by the Free Software Foundation; either version 2 # of the License, or (at your option) any later version. -# +# # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the @@ -25,17 +25,16 @@ use strict; use Ivy; use Getopt::Long; use Term::ReadLine; -#use Tk; -use Carp; -my $term = Term::ReadLine->new("ivyprobe.pl"); -#$term->tkRunning; - -my $OUT = $term->OUT || *STDOUT{IO}; +use Carp; my $appliname = "IVYPROBE.PL"; my $bus; my $timestamp = 0; +my $noReadLineMode ; +my $regexpFile; +my $classes; +my @classes = (); # for each application gives the number of running instances my %connected_applications; @@ -45,45 +44,87 @@ my %where_applications; &check_options; +if (defined $classes) { + @classes =split(/:/, $classes); + printf ("DBG> CLASSES = %s\n", join (" , ", @classes)); +} + + +unless (defined $noReadLineMode) { + my $pid; + pipe (PIPE_READ, PIPE_WRITE); + select PIPE_WRITE; $| = 1; + select STDOUT; $| = 1; + + if (($pid = fork() == 0)) { + # code du fils qui lit dans le pipe + close (PIPE_WRITE); + open (STDIN, "<&PIPE_READ") or die "Can't dup STDIN on PIPE_READ: $!"; + } else { + close (PIPE_READ); + my $term = Term::ReadLine->new("ivyprobe.pl"); + $term->SetHistory (); + + while (defined ($_ = $term->readline("> "))) { + chomp; + print PIPE_WRITE "$_\n" ; + #$term->addhistory($_) if /\S/; + } + kill (15, $pid); + waitpid ($pid, 0); + exit (0); + } +} + + Ivy->init (-ivyBus => (defined $bus) ? $bus : undef, -appName => $appliname, -# -loopMode => 'TK', -loopMode => 'LOCAL', -messWhenReady => "$appliname READY", + -filterRegexp => \@classes ); my $Ivyobj = Ivy->new(-statusFunc => \&statusFunc, + -slowAgentFunc=> \&congestionCallback, + -blockOnSlowAgent => 0, ); foreach my $regexp (@ARGV) { - print $OUT "binding to $regexp\n"; + print "binding to $regexp\n"; if ($regexp =~ /'(.*)'/) { $regexp = $1; } - $Ivyobj->bindRegexp($regexp, [ "unused", \&callback] ); + $Ivyobj->bindRegexp($regexp, [ \&callback] ); } -$Ivyobj->start; +if (defined ($regexpFile)) { + open (RF, $regexpFile) || die "could not read rexp file $regexpFile\n"; + while ($_ = ) { + last if eof (RF); + chomp; + next unless (length ($_) > 4); + $Ivyobj->bindRegexp($_, [ \&callback] ) ; +# printf ("DBG> subscribe to '$_'\n"); + } + close (RF); +} -my $IN = *STDIN{IO}; + +$Ivyobj->start; sub cb { - my $line = $term->readline(""); - $term->addhistory($line); + my $line = ; chomp $line; exit if (&interpret_line ($line)); } -Ivy->fileEvent($IN, \&cb); -Ivy->mainLoop; -#my $mw = MainWindow->new(); -#$mw->fileevent($IN, 'readable' => \&cb); -#MainLoop; +$Ivyobj->fileEvent(*STDIN, \&cb); +$Ivyobj->mainLoop(); sub printtime { return if (!$timestamp); my ($sec,$min,$hour) = localtime(); - printf $OUT "[%02d:%02d:%02d] ", $hour, $min, $sec; + printf "[%02d:%02d:%02d] ", $hour, $min, $sec; } # this function has 3 additionnal parameters till Ivy Version 4.6 @@ -94,27 +135,31 @@ sub statusFunc { if ($status eq "new") { &printtime; - print $OUT "$appname connected from $host_or_regexp\n"; + print "$appname connected from $host_or_regexp\n"; $where_applications{"$appname:$host_or_regexp"}++; } elsif ($status eq "died") { &printtime; - print $OUT "$appname disconnected from $host_or_regexp\n"; + print "$appname disconnected from $host_or_regexp\n"; $where_applications{"$appname:$host_or_regexp"}--; } elsif ($status eq 'subscribing') { &printtime; - print $OUT "$appname subscribed to '$host_or_regexp'\n"; + print "$appname subscribed to '$host_or_regexp'\n"; } elsif ($status eq 'unsubscribing') { &printtime; - print $OUT "$appname unsubscribed to '$host_or_regexp'\n"; + print "$appname unsubscribed to '$host_or_regexp'\n"; + } + elsif ($status eq 'filtered') { + &printtime; + print "$appname subscribed to *FILTERED* '$host_or_regexp'\n"; } else { &printtime; - print $OUT "Bug: unkown status; $status in &statusFunc\n"; + print "Bug: unkown status; $status in &statusFunc\n"; } - + %connected_applications = %$ref_hashReady; } @@ -127,9 +172,9 @@ sub interpret_line { if ($str =~ /^([^\.])/ or $str =~ /^\\/) { my $count=$Ivyobj->sendMsgs($str); &printtime; - print $OUT "-> Sent to $count peer"; - if ($count > 1) { print $OUT "s" } - print $OUT "\n"; + print "-> Sent to $count peer"; + if ($count > 1) { print "s" } + print "\n"; return 0; } if ($str =~ /^\.q(uit)?\s*$/) { @@ -152,22 +197,22 @@ sub interpret_line { if ($str =~ /^.b(ind)?\s+(.*)$/) { my $regexp = $2; if ($regexp =~ /'(.*)'/) { $regexp = $1; } - print $OUT "binding $regexp\n"; - $Ivyobj->bindRegexp($regexp, [ "unused", \&callback] ); + print "binding $regexp\n"; + $Ivyobj->bindRegexp($regexp, [ \&callback] ); return 0; } if ($str =~ /^.u(nbind)?\s+(.*)$/) { my $regexp = $2; if ($regexp =~ /'(.*)'/) { $regexp = $1; } - print $OUT "unbinding $regexp\n"; + print "unbinding $regexp\n"; $Ivyobj->bindRegexp($regexp); return 0; } if ($str =~ /^.db(ind)?\s+(.*)$/) { my $id = $2; - print $OUT "direct binding id $id\n"; + print "direct binding id $id\n"; $Ivyobj->bindDirect($id, [\&directCallback] ); return 0; } @@ -177,7 +222,7 @@ sub interpret_line { my $id = $3; my $data = $4; &printtime; - print $OUT "send direct to $appname id=$id $data\n"; + print "send direct to $appname id=$id $data\n"; $Ivyobj->sendDirectMsgs($appname, $id, $data); return 0; } @@ -186,20 +231,20 @@ sub interpret_line { my $appname = $2; my $timeout = $3; &printtime; - print $OUT "ping $appname timeout=$timeout\n"; + print "ping $appname timeout=$timeout\n"; my $res = $Ivyobj->ping($appname, $timeout); - print $OUT "$res\n"; + print "$res\n"; return 0; } if ($str =~ /^.who\s*$/) { - print $OUT "Apps:"; + print "Apps:"; foreach my $app (sort keys %connected_applications) { for (my $i=0; $i<$connected_applications{$app} ; $i++) { - print $OUT " $app"; + print " $app"; } } - print $OUT "\n"; + print "\n"; return 0; } @@ -210,26 +255,25 @@ sub interpret_line { my ($app,$host) = $app_host =~ /(.+):(.*)/ ; if ($app eq $appli) { for (my $i=0; $i<$where_applications{$app_host}; $i++) { - print $OUT "Application $app on $host\n"; + print "Application $app on $host\n"; $found = 1; } } } - print $OUT "No Application $appli\n" unless ($found); + print "No Application $appli\n" unless ($found); return 0; } - print $OUT "bad command. Type '.help' for a list of commands\n"; + print "bad command. Type '.help' for a list of commands\n"; return 0; - } sub callback { - my ($unused, $appname, @param) = @_; + my ($appname, @param) = @_; my $paramString = ""; if (scalar @param) { $paramString = join ("' '", @param); } - print $OUT "$appname sent '", $paramString, "'\n"; + print "$appname sent '", $paramString, "'\n"; } sub directCallback { @@ -237,10 +281,18 @@ sub directCallback { my $paramString = ""; if (scalar @param) { $paramString = join ("|", @param); } - print $OUT "directMessage received '", $paramString, "'\n"; + print "directMessage received '", $paramString, "'\n"; +} + +sub congestionCallback ($$$) +{ + my ($name, $addr, $state) = @_; + + printf ("\033[1m $name [$addr] %s\033[m\n", $state ? "CONGESTION" : "OK"); } + sub check_options { # on traite la ligne de commande my ($opt_help, $opt_appliname); @@ -248,51 +300,65 @@ sub check_options { "b:s" => \$bus, "name:s" => \$opt_appliname, "t" => \$timestamp, + "stdio" => \$noReadLineMode, + "filter:s" => \$classes, + "regexpFile:s" => \$regexpFile ); if (defined $opt_appliname and $opt_appliname =~ /\s/) { - print $OUT "-name value should not contains blanck\n"; + print "-name value should not contains blanck\n"; &usage; } - &usage if ($opt_help && $opt_help); + &usage if (defined $opt_help && $opt_help); $appliname = $opt_appliname if (defined $opt_appliname); } sub usage { - print $OUT "ivyprobe.pl [-h] [ -b : ] ['regexp']*\n"; - print $OUT " ivyprobe.pl is a simple test application for the ivy-perl library\n"; - print $OUT " Its is based on a similar appplication available with ivy-c\n"; - print $OUT " It waits for messages on the bus, messages writtten on the command line\n"; - print $OUT " or commands issued on the comnand line\n"; - print $OUT " Help for the command line is available through the command\n"; - print $OUT " .help or .h\n"; - print $OUT " -h print this help\n"; - print $OUT " -b :\n"; - print $OUT " to defined the network adress and the port number\n"; - print $OUT " defaulted to 127:2010\n"; - print $OUT " -t print a time stamp when a message is send or received\n"; - print $OUT " -name \n"; - print $OUT " to change the default appliname \'$appliname\'\n"; + print "ivyprobe.pl [-h] [ -b : ] ['regexp']*\n"; + print " ivyprobe.pl is a simple test application for the ivy-perl library\n"; + print " Its is based on a similar appplication available with ivy-c\n"; + print " It waits for messages on the bus, messages writtten on the command line\n"; + print " or commands issued on the comnand line\n"; + print " Help for the command line is available through the command\n"; + print " .help or .h\n"; + print " -h print this help\n"; + print " -b :\n"; + print " to defined the network adress and the port number\n"; + print " defaulted to 127:2010\n"; + print " -t print a time stamp when a message is send or received\n"; + print " -name \n"; + print " -stdio don't use gnu readline which permits to recall/edit \n"; + print " entries with arrow keys\n"; + print " This options permits to redirect ivyprobe.pl output to file\n"; + print " -filter classe1,classe2,classe3,...,classeN\n"; + print " filter messages so that we could send only messages\n"; + print " beginning by classe1 or classe2 or classe3 or ... or classeN\n"; + print " -regexpFile file bind to all regexps which are in the geregexp file\n"; + print " \n"; + print " \n"; exit; } sub line_command_usage { print "tutu\n"; - print $OUT "Commands list:\n"; - print $OUT " .h[elp] - this help\n"; - print $OUT " .q[uit] - terminate this application\n"; - print $OUT " .b[ind] regexp - add a msg to receive\n"; - print $OUT " .u[nbind] regexp - remove a msg to receive\n"; - print $OUT " .die appname1 appname2 ... - send die msg to appnameN\n"; - print $OUT " .db[ind] id - add a direct msg to receive\n"; - print $OUT " .d[irect] appname id args - send direct msg to appname\n"; - print $OUT " .p[ing] appname timeout - ping appname with a delay of timeout ms NYI\n"; - print $OUT " .where appname - on which host is/are appname\n"; - print $OUT " .who - who is on the bus\n"; + print "Commands list:\n"; + print " .h[elp] - this help\n"; + print " .q[uit] - terminate this application\n"; + print " .b[ind] regexp - add a msg to receive\n"; + print " .u[nbind] regexp - remove a msg to receive\n"; + print " .die appname1 appname2 ... - send die msg to appnameN\n"; + print " .db[ind] id - add a direct msg to receive\n"; + print " .d[irect] appname id args - send direct msg to appname\n"; + print " .p[ing] appname timeout - ping appname with a delay of timeout ms NYI\n"; + print " .where appname - on which host is/are appname\n"; + print " .who - who is on the bus\n"; } + + + __END__ diff --git a/example/testCongestionTk.pl b/example/testCongestionTk.pl new file mode 100755 index 0000000..aa3d21a --- /dev/null +++ b/example/testCongestionTk.pl @@ -0,0 +1,140 @@ +#!/usr/bin/perl -w + +use strict; +use Getopt::Long; +use Ivy; +use Time::HiRes; +use Tk; + +sub defaultOption ($$); + +my %options; +my $numberOfSentMsg = 0; +my $numberOfSentMsgWhenCongestion = 1e6; +END {Ivy::stop ();} + + +# cet exemple lance deux agents un qui envoie vite de gros messages, et un autre +# qui les reçoit. Lors des 10 premières receptions il attend une seconde après +# chaque message, ensuite il depile aussi vite qu'il peut, cet exemple permet +# de tester le bon fonctionnement du mode non bloquant. + + + +#OPTIONS +GetOptions (\%options, "send", "receive"); + +unless ((exists $options{send}) || (exists $options{receive})) { + if (fork () == 0) { + sleep (1); + exec (qw (./testCongestionTk.pl -send)); + } else { + exec (qw (./testCongestionTk.pl -receive)); + } +} + +defaultOption ("bus", $ENV{IVYBUS}); +if (exists ($options{send})) { + defaultOption ("ivyname", "TESTSEND"); +} else { + defaultOption ("ivyname", "TESTRECEIVE"); +} + +my $t0; +my $cbAppelee = 0; +# IVY + +Ivy->init (-loopMode => 'TK', + -appName => $options{ivyname}, + -ivyBus => $options{bus}, + -filterRegexp => [$options{ivyname}] + ) ; + +my $bus = Ivy->new (-statusFunc => \&statusFunc, + -slowAgentFunc=> \&congestionFunc, + -blockOnSlowAgent => 0, + -neededApp => exists $options{send} ? + ["TESTRECEIVE"] : ["TESTSEND"]); + +my $mw = MainWindow->new; +my $tx1 = $mw->Text; +my $tx2 = $mw->Text (-height => 3); +$tx2->pack (-fill => 'both', -expand => 'false'); +$tx1->pack (-fill => 'both', -expand => 'true'); +$mw->title ($options{ivyname}); + +unless (exists ($options{send})) { + $bus->bindRegexp ('TESTSEND SEND (\d+) (.*)', [\&receiveSend]); +} + +if (exists ($options{send})) { + $mw->repeat (10, [\&send]); +} + +$bus->start (); + +$bus->mainLoop (); +#Tk::MainLoop (); + + +# PROCEDURES + + +sub receiveSend ($$) +{ + my ($app, $iter) = @_; + $tx1->insert ('end', "RECEIVE $iter\n"); + $tx1->yviewScroll (1, 'units'); + $tx1->idletasks(); + sleep (1) if ($cbAppelee++ < 10); +} + + +sub send () +{ + $t0 = Time::HiRes::gettimeofday; + #print ("DBG> send $t0\n"); + + if ($numberOfSentMsg++ < ($numberOfSentMsgWhenCongestion+100)) { + $tx1->insert ('end', "SEND $numberOfSentMsg\n"); + $tx1->yviewScroll (1, 'units'); + $bus->sendAppNameMsgs ("SEND $numberOfSentMsg " . 'a' x 1020); + } +} + + +sub defaultOption ($$) +{ + my ($option, $default) = @_; + unless (defined $options{$option}) { +# warn "option $option non spécifiéee : utilision de $default\n"; + $options{$option} = $default; + } +} + + +sub statusFunc ($$) +{ + my ($ready, $notReady) = @_; + + if (@{$notReady}) { + printf "appli manquantes : %s\n", join (' ', @{$notReady}); + } else { + printf ("Toutes applis OK !!\n"); + } +} + +sub congestionFunc ($$$) +{ + my ($name, $addr, $state) = @_; + + if ($state == 1) { + $tx2->insert ('end', sprintf ("$name [$addr] %s will stop at N=%d\n", $state ? "CONGESTION" : "OK", + $numberOfSentMsg+100)); + $numberOfSentMsgWhenCongestion = $numberOfSentMsg; + } else { + $tx2->insert ('end', sprintf ("$name [$addr] %s\n", $state ? "CONGESTION" : "OK")); + } + $tx2->update(); +} + -- cgit v1.1