summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Ivy.pm1492
1 files changed, 819 insertions, 673 deletions
diff --git a/Ivy.pm b/Ivy.pm
index 11d667f..735d632 100644
--- a/Ivy.pm
+++ b/Ivy.pm
@@ -1,53 +1,36 @@
#
-# Ivy, Perl interface
+# Ivy, Perl interface
#
-# Copyright 1997-1999
-# Centre d'Etudes de la Navigation Aerienne
+# Copyright 1997-1999
+# Centre d'Études de la Navigation Aérienne
#
-# Version 2.0 : api OO, envoi des messages par l'intermediaire
-# de thread pour ne jamais bloquer sur un envois.
+# Authors: Alexandre Bustico <bustico@cenatoulouse.dgac.fr>
+# Stéphane Chatty <chatty@cenatoulouse.dgac.fr>
#
-# TODO :
+# All functions
#
+# $Id$
#
-# - touver un mecanisme permettant de sortir meme quand
-# un thread d'emission est bloque sur un send.
+# Please refer to file Version.pm for the
+# copyright notice regarding this software
#
-#
-# BUGS:
-# -Si une appli abonnee bloque, son thread d'emission
-# reste bloque sur un send, le process ne sortira pas
-# sur un <ctrl C>, mais uniquement sur un kill -TERM pid,
-# voire sous irix sur un kill -KILL pid
-#
-# -Sous irix, si on cree dymaniquement des bus et qu'om les
-# tue a rythme eleve, au bout d'un moment perl plante.
-#
-# pour avoir une version debarassee des commentaires et des blancs
-# pour voir la longueur reelle du code :
-# perl -ne 'chomp; next if /^\s*$/ || /^\s*#/; s/#.*//; print "$_\n";' Ivy.pm
-#
-# pour commenter tous les print de debug :
-# perl -i.bak -ne 's/(printf?\s*\(?\"DBG)/ \# $1/ unless /^\s*\#/; print $_;' Ivy.pm
package Ivy ;
+require 5.005 ;
+
use Sys::Hostname;
use IO::Socket;
use strict ;
use Time::Gettimeofday ;
-use Thread;
-use Thread::Queue;
-use Thread::Signal;
-use Fcntl;
-use Errno;
+
+
use vars qw($VERSION);
-$VERSION = '4.1';
+$VERSION = '4.2';
#############################################################################
#### PROTOTYPES #####
#############################################################################
-
sub init ($%); # methode de classe, permet de renseigner
# tous les parametres globaux. Ces parametres
# seront utilises par new si ils ne sont pas
@@ -142,31 +125,17 @@ sub _parseIvyBusParam ($); # prends une adresse de bus de la forme
# un numero de port et une ref sur une
# liste d'adresses addr_inet
-sub _senderThread ($$$); # procedure executee dans un thread, qui
- # envoie les donnees sur une socket a travers
- # une queue. De cette facon, si une appli
- # distante bloque, ca ne penalise pas
- # l'appli locale, en dehors du fait que la
- # queue va grossir jusqu'au not enough memory
- # au prix de la memoire c'est pas grave :-)
- # deux arguments : la queue et le fd.
-
-sub _closeSenderThread ($$); # procedure d'arret d'un thread :
- # on vide la queue d'emission, on
- # place une valeur de sortie dans la queue,
- # et on ferme le fd associe a la socket
- # d'emission.
-
sub _substituteEscapedChar ($$); #permet de transormer une regexp etendue
# 'perl' en regexp de base
+
#############################################################################
#### CONSTANTES #####
#############################################################################
-use constant VERSION => 3;
use constant MSG_FMT => "%d %d\002%s\n";
# par defaut, on diffuse le bonjour en local
-use constant BROADCAST_ADDRS => "127.0.0.1" ;
+use constant BROADCAST_ADDRS => "127.255.255.255" ;
+use constant BROADCAST_PORT => "2010";
use constant BYE => 0;
use constant REGEXP => 1;
@@ -197,7 +166,6 @@ use constant REG_PERLISSISME => ('w' => '[a-zA-Z0-9_]',
# c'est un delimiteur pour le bus
'e' => '[]') ;
-
#############################################################################
#### VARIABLES de CLASSE #####
#############################################################################
@@ -295,15 +263,9 @@ use constant broadcastBuses => $constantIndexer++;
use constant appName => $constantIndexer++;
use constant messWhenReady => $constantIndexer++;
-
-
#############################################################################
#### METHODES PUBLIQUES #####
#############################################################################
-
-
-
-############### METHODE DE CLASSE INIT
sub init ($%)
{
my ($class, %options) = @_;
@@ -354,7 +316,8 @@ sub init ($%)
# est-il facultatif
if (defined $defaultOptions{$opt}) {
$options{$opt} = $defaultOptions{$opt} ;
- } else {
+ }
+ else {
# parametre obligatoire
die "ERREUR Ivy::init vous devez specifier ".
"l'option $opt\n";
@@ -385,13 +348,11 @@ sub init ($%)
# mode boucle d'evenement de TK
$fileEventFunc = \&_tkFileEvent ;
} else {
- die qq|l'argument "mainloop mode" doit etre TK ou LOCAL\n|;
+ die "l'argument \"mainloop mode\" doit etre TK ou LOCAL\n";
}
$SIG{'PIPE'} = 'IGNORE' ;
}
-
-
############# METHODE DE CLASSE NEW
sub new ($%)
@@ -433,13 +394,6 @@ sub new ($%)
# tab ass : nom du fd => fd
$self->[sockList] = {};
- # tab ass : nom du fd => queue de communication entre
- # l'appli et le thread qui fait les envois
- $self->[queueList] = {};
-
- # tab ass : nom du fd => thread qui fait les envois
- $self->[threadList] = {};
-
# tab ass : nom de l'appli => fd
$self->[appliList] = {};
@@ -532,7 +486,7 @@ sub new ($%)
$options{$opt} = $defaultOptions{$opt} ;
} else {
# parametre obligatoire
- die "ERREUR Ivy::start vous devez specifier ".
+ die "ERREUR Ivy::new vous devez specifier ".
"l'option $opt\n";
}
}
@@ -557,15 +511,42 @@ sub new ($%)
return ($self);
}
-
+############### METHODE IVY DESTROY
+sub DESTROY ($)
+{
+ my $self = shift;
+ return unless exists $allBuses{$self};
+
+ # print ("DBG DESTROY appele sur l'objet $self\n");
-############## METHODE DE CLASSE STOP
-sub stop ()
+ # pour toutes les connections
+ foreach my $fd (values %{$self->[sockList]}) {
+ next unless exists ($self->[queueList]->{$fd});
+
+ foreach my $fd (values %{$self->[sockList]}) {
+ send ($fd, sprintf (MSG_FMT, BYE, 0, ""), 0)
+ or $self->_removeFileDescriptor ($fd);
+ }
+ }
+
+ # on clos la socket de signalisation (UDP)
+ # print "DBG> fermeture de supSock\n";
+ $self->[supSock]->close() if $self->[supSock];
+ delete $allBuses{$self};
+
+ # on clos la socket de connection
+ # print "DBG> fermeture de connSock\n";
+ $self->[connSock]->close() if $self->[connSock];
+ undef (@$self);
+}
+
+############### METHODE DE CLASSE STOP
+sub stop ()
{
foreach my $bus (values %allBuses) {
$bus->DESTROY();
- }
+ } # pour toutes les connections
}
@@ -575,123 +556,116 @@ sub exit ()
Ivy::stop ();
if (defined $localLoopSel) {
# boucle locale, on sait faire
- # printf ("DBG> undefining localLoopSel\n");
+ # printf ("DBG> undefining localLoopSel\n");
undef $localLoopSel;
- } else {
-
- # afficher les threads qui restent
- foreach my $t (Thread->list()) {
- next if (($t->tid == Thread->self->tid) || $t->tid == 0);
- printf ("DBG> Thread %d is active, flag = %d\n", $t->tid, $t->flags)
- if $^W;
- }
+ }
+ else {
Tk::exit ();
}
}
-
-
-################ METHODE START
-sub start ($)
+############### PROCEDURE BUS START
+sub start ($)
{
my $self = shift;
- # cree la socket de connexion, recupere le no de port
- my $connSock = $self->[connSock] = IO::Socket::INET->new(Listen => 128,
- Proto => 'tcp',
- Reuse => 1) ;
- # print ("DBG> opening TCP fd $connSock\n");
- # on memorise tout ca, ce qui evitera par la suite de se
- # repondre a soi-meme. On le fait sous nos deux noms :
- # le nom de machine et 'localhost'
- my $hostAddr = (gethostbyname (hostname()))[4] ;
- my $localhostAddr = (gethostbyname ('localhost'))[4] ;
- $self->[cnnxion]->{"$hostAddr:". $connSock->sockport} = "\004";
- $self->[cnnxion]->{"$localhostAddr:". $connSock->sockport} = "\004";
-
- # cree la socket de broadcast
- $self->[supSock] = IO::Socket::INET->new (
- LocalPort => $self->[broadcastPort],
- Proto => 'udp',
- Type => SOCK_DGRAM,
- Reuse => 1);
-
- $self->[supSock]->sockopt (SO_BROADCAST, 1);
- fcntl ( $self->[supSock], F_SETFL, O_NDELAY) ;
-
- # et on envoie envoie le bonjour : "no de version no de port"
- my $bonjourMsg = sprintf ("%d %d\n", VERSION, $connSock->sockport());
-
- foreach my $netBroadcastAddr (@{$self->[broadcastBuses]}) {
+ # cree la socket de connexion, recupere le no de port
+ my $connSock = $self->[connSock] = IO::Socket::INET->new(Listen => 128,
+ Proto => 'tcp',
+ Reuse => 1) ;
+ # on memorise tout ca, ce qui evitera par la suite de se
+ # repondre a soi-meme. On le fait sous nos deux noms :
+ # le nom de machine et 'localhost'
+ my $hostAddr = (gethostbyname (hostname()))[4] ;
+ my $localhostAddr = (gethostbyname ('localhost'))[4] ;
+ $self->[cnnxion]->{"$hostAddr:". $connSock->sockport} = "\004";
+ $self->[cnnxion]->{"$localhostAddr:". $connSock->sockport} = "\004";
+
+ # cree la socket de broadcast
+ $self->[supSock] = IO::Socket::INET->new
+ (LocalPort => $self->[broadcastPort],
+ Proto => 'udp',
+ Type => SOCK_DGRAM,
+ Reuse => 1);
+
+ $self->[supSock]->sockopt (SO_BROADCAST, 1);
- send ($self->[supSock], $bonjourMsg, 0, $netBroadcastAddr) or
- warn "Attention Ivy::start envoi du bonjour a echoue : $!\n";
- }
- # callback pour traiter la reception des bonjours
- &$fileEventFunc ($self->[supSock], [\&_getBonjour, $self]) ;
-
- # callback pour traiter les demandes de cxion
- &$fileEventFunc ($connSock, [\&_getConnections, $self]) ;
+
+ # et on envoie envoie le bonjour : "no de version no de port"
+ my $bonjourMsg = sprintf ("%d %d\n", VERSION, $connSock->sockport());
+
+ foreach my $netBroadcastAddr (@{$self->[broadcastBuses]}) {
+ send ($self->[supSock], $bonjourMsg, 0, $netBroadcastAddr) or
+ warn "Ivy::start envoi du bonjour a echoue sur : $!\n";
+ }
+ # callback pour traiter la reception des bonjours
+ &$fileEventFunc ($self->[supSock], [\&_getBonjour, $self]) ;
+
+ # callback pour traiter les demandes de cxion
+ &$fileEventFunc ($self->[connSock], [\&_getConnections, $self]) ;
}
-############### METHODE BIND REGEXP
-sub bindRegexp ($$$$)
-{
- my ($self, $regexp, $cb, $observer) = @_;
-
- # on substitue les meta caracteres des regexps perl : \d, \w, \s, \e
- # par les classes de caracteres corespondantes de maniere a ce
- # qu'une appli distante non perl comprenne ces regexp.
- $regexp =~ s|
- (
- (?<!\\) \[ # le premier crochet ouvrant non precede d'un \
- .*? # ce qu'il y a dans le crochet, en mode frugal
- (?<!\\) \] # le premier crochet fermant non precede d'un \
- )
- |
- _substituteEscapedChar ('inside', $1)
- |xge;
+############### PROCEDURE BIND REGEXP
+sub bindRegexp ($$$) {
+ my ($self, $regexp, $cb) = @_;
+
+ # on substitue les meta caracteres des regexps perl : \d, \w, \s, \e
+ # par les classes de caracteres corespondantes de maniere a ce
+ # qu'une appli distante non perl comprenne ces regexp.
+ $regexp =~ s|
+ (
+ (?<!\\) \[ # le premier crochet ouvrant non precede d'un \
+ .*? # ce qu'il y a dans le crochet, en mode frugal
+ (?<!\\) \] # le premier crochet fermant non precede d'un \
+ )
+ |
+ _substituteEscapedChar ('inside', $1)
+ |xge;
- $regexp = _substituteEscapedChar ('outside', $regexp);
- # print ("DBG regexp = $regexp\n");
-
- if ($cb) {
- my $id;
- # on rajoute le couple $regexp, $cb dans la liste des messages
- # qu'on prend
-
- # on commence par tester si on a un id libere dans le tableau
- for ($id=0; $id <= ($#{$self->[recCbList]}+1); $id++) {
- last unless (defined $self->[recCbList][$id]) &&
- @{$self->[recCbList][$id]->[1]};
- }
- $self->[recCbList][$id] = [$regexp, $cb, $observer];
+ $regexp = _substituteEscapedChar ('outside', $regexp);
+ # print ("DBG regexp = $regexp\n");
+
+ if ($cb) {
+ my $id;
+ # on rajoute le couple $regexp, $cb dans la liste des messages
+ # qu'on prend
+
+ # on commence par tester si on a un id libere dans le tableau
+ for ($id=0; $id <= ($#{$self->[recCbList]}+1); $id++) {
+ last unless (defined $self->[recCbList][$id]) && @{$self->[recCbList][$id]->[1]};
+ }
+ $self->[recCbList][$id] = [$regexp, $cb];
- # on envoie les messages regexps aux processus deja connectes
- _sendLastRegexpToAllreadyConnected ($self, $id) ;
- } else {
- # on vire le callback, et on se desabonne de cette regexp
- for (my $id=0; $id <= $#{$self->[recCbList]}; $id++) {
- next unless (defined $self->[recCbList][$id]) &&
- @{$self->[recCbList][$id]->[1]};
- if ($self->[recCbList][$id]->[0] eq $regexp) {
- $self->[recCbList][$id]->[1] = [];
- # on envoie le mesage delregexp
- foreach my $fd (values %{$self->[sockList]}) {
- Thread::Queue::enqueue ($self->[queueList]->{$fd},
- \sprintf (MSG_FMT, DELREGEXP, $id));
- }
- }
+ # on envoie les messages regexps aux processus deja connectes
+ _sendLastRegexpToAllreadyConnected ($self, $id) ;
+ }
+ else {
+
+ # on vire le callback, et on se desabonne de cette regexp
+ for (my $id=0; $id <= $#{$self->[recCbList]}; $id++) {
+
+ next unless (defined $self->[recCbList][$id]) &&
+ @{$self->[recCbList][$id]->[1]};
+
+ if ($self->[recCbList][$id]->[0] eq $regexp) {
+
+ $self->[recCbList][$id]->[1] = [];
+ # on envoie le mesage delregexp
+ foreach my $fd (values %{$self->[sockList]}) {
+ send ($fd, sprintf (MSG_FMT, DELREGEXP, $id), 0)
+ or $self->_removeFileDescriptor ($fd);
}
+ }
}
+ }
}
-############### METHODE BIND DIRECT
+############### METHODE BIND REGEXP
sub bindDirect ($$$)
{
my ($self, $id, $cb) = @_;
-
+
if ($cb) {
# on rajoute la $cb dans la liste des messages
# qu'on prend
@@ -701,48 +675,48 @@ sub bindDirect ($$$)
undef $self->[directCbList][$id];
}
}
-
-
-############### METHODE SEND MSGS
+############### PROCEDURE SEND MSGS
sub sendMsgs ($@)
{
use attrs qw(locked);
my ($self, @msgs) = @_;
- my $total = 0;
- # pour tous les messages
- foreach my $msg (@msgs) {
- study ($msg);
+ my $total = 0;
- # pour routes les connections
- foreach my $fd (keys %{$self->[sockList]}) {
+ # pour tous les messages
+ foreach my $msg (@msgs) {
+ study ($msg);
- # pour toutes les fonctions de filtrage de regexp
- foreach my $regexpFunc (@{$self->[sendRegList]{$fd}}) {
- $total += &{$regexpFunc} (\$msg) if defined $regexpFunc;
- }
+ # pour routes les connections
+ foreach my $fd (keys %{$self->[sockList]}) {
+
+ # pour toutes les fonctions de filtrage de regexp
+ foreach my $regexpFunc (@{$self->[sendRegList]{$fd}}) {
+ $total += &{$regexpFunc} (\$msg) if defined $regexpFunc;
+ }
}
}
-# print "DBG> sended $total times\n";
- return $total;
+ # print "DBG> sended $total times\n";
+ return $total;
}
-############### METHODE SEND APP NAME MSGS
+############### PROCEDURE SEND MSGS
sub sendAppNameMsgs ($@)
{
use attrs qw(locked);
-
+
my ($self, @msgs) = @_;
my $total = 0;
+
# pour tous les messages
foreach (@msgs) {
my $msg = "$self->[appName] $_";
study ($msg);
-
+
# pour routes les connections
foreach my $fd (keys %{$self->[sockList]}) {
-
+
# pour toutes les fonctions de filtrage de regexp
foreach my $regexpFunc (@{$self->[sendRegList]{$fd}}) {
$total += &{$regexpFunc} (\$msg) if defined $regexpFunc;
@@ -755,129 +729,96 @@ sub sendAppNameMsgs ($@)
-############### METHODE SEND DIRECT MSGS
+############### PROCEDURE SEND DIRECT MSGS
sub sendDirectMsgs ($$$@)
{
- use attrs qw(locked);
my ($self, $to, $id, @msgs) = @_;
-
+
if (defined ($self->[appliList]{$to})) {
my @fds = @{$self->[appliList]{$to}};
# pour tous les messages
foreach my $msg (@msgs) {
foreach my $fd (@fds) {
- Thread::Queue::enqueue ($self->[queueList]->{$fd},
- \sprintf (MSG_FMT, DIRECT_MSG, $id, "$msg"));
+ send ($fd, sprintf (MSG_FMT, DIRECT_MSG, $id, "$msg"), 0)
+ or $self->_removeFileDescriptor ($fd);
}
}
return 1;
} else {
- warn "Attention : Ivy::sendDirectMsgs appli $to inconnue\n" if $^W;
+ warn "Ivy::sendDirectMsgs appli $to inconnue\n";
return 0;
}
}
-############### METHODE SEND DIE TO
+############### METHOD SEND DIE TO
sub sendDieTo ($$)
{
use attrs qw(locked);
my ($self, $to) = @_;
-
+
if (defined ($self->[appliList]{$to})) {
my @fds = @{$self->[appliList]{$to}};
+
warn "Attention : Ivy::sendDieTo gros BUG \@fds est vide \n"
if (scalar (@fds) == 0);
-
+
# pour tous les messages
foreach my $fd (@fds) {
- _sendDieTo ($self, $fd);
+ $self->_sendDieTo($fd);
}
return 1;
- } else {
- warn "Attention : Ivy::sendDieTo appli $to inconnue\n" if $^W;
+ }
+ else {
+ warn "Ivy::sendDieTo appli $to inconnue\n" if $^W;
return 0;
}
}
-############### METHODE PING
+############### METHOD PING
sub ping ($$$)
{
use attrs qw(locked);
+
my ($self, $to, $timeout) = @_;
-
+
if (defined ($self->[appliList]{$to})) {
+
my @fds = @{$self->[appliList]{$to}};
+
# pour tous les messages
foreach my $fd (@fds) {
- Thread::Queue::enqueue ($self->[queueList]->{$fd},
- \sprintf (MSG_FMT, PING, 0, " "));
+ send ($fd, sprintf (MSG_FMT, PING, 0, " "), 0)
+ or $self->_removeFileDescriptor ($fd);
}
}
}
-
-############### METHODE IVY DESTROY
-sub DESTROY ($)
-{
- my $self = shift;
- return unless exists $allBuses{$self};
-
- # print ("DBG DESTROY appele sur l'objet $self\n");
-
- # pour toutes les connections
- foreach my $fd (values %{$self->[sockList]}) {
- next unless exists ($self->[queueList]->{$fd});
-
- Thread::Queue::enqueue ($self->[queueList]->{$fd},
- \sprintf (MSG_FMT, BYE, 0, ""));
-
- # on attend un peu avant de fermer la thread, que
- # le bye ait le temps d'etre envoye si le
- # thread n'est pas bloque par le send.
- select (undef, undef, undef, 0.1);
-
- # on desactive le thread de reception
- # et on attend qu'il sorte
- $self->_closeSenderThread ($fd);
- }
-
- # on clos la socket de signalisation (UDP)
-# print "DBG> fermeture de supSock\n";
- $self->[supSock]->close() if $self->[supSock];
- delete $allBuses{$self};
-
- # on clos la socket de connection
-# print "DBG> fermeture de connSock\n";
- $self->[connSock]->close() if $self->[connSock];
- undef (@$self);
-}
-
-
-############### METHODE MAINLOOP
+############### METHODE MAINLOOP
sub mainLoop ()
{
die "Erreur Ivy->mainLoop, Ivy doit etre initialise en mode".
" loopMode local\n" unless defined $localLoopSel;
-
- my ($fd, @ready, @allDesc);
+ my ($fd, @ready, @allDesc);
+
while (defined $localLoopSel) {
@ready = IO::Select::can_read ($localLoopSel, $selectTimout) ;
_scanAfter () ;
- foreach $fd (@ready) {
+ foreach my $fd (@ready) {
if (ref $localBindByHandle{$fd} eq 'CODE') {
&{$localBindByHandle{$fd}} ;
- } else {
+ }
+ else {
my ($cb, @arg) = @{$localBindByHandle{$fd}} ;
&$cb (@arg)
}
}
}
}
-
+
############### METHODE AFTER
sub after ($$;$)
@@ -886,14 +827,16 @@ sub after ($$;$)
# appelee de maniere objet : premier argument = class ou une instance
# de classe
shift if ((ref ($_[0]) eq 'Ivy') || ($_[0] eq 'Ivy')) ;
-
+
my ($timeAfter, $cbListRef) = @_;
$timeAfter /= 1000;
$selectTimout = $timeAfter if $timeAfter < $selectTimout;
-
+
+ # si la valeur de timout est negative : c'est un after sinon
+ # c'est un repeat
$afterList{++$afterId} = [AFTER, $timeAfter,
timeofday()+$timeAfter, $cbListRef];
-
+
return ($afterId);
}
@@ -903,13 +846,13 @@ sub repeat ($$;$)
# test du premier argument au cas ou la fonction soit
# appelee de maniere objet : premier argument = class ou une instance
# de classe
+
shift if ((ref ($_[0]) eq 'Ivy') || ($_[0] eq 'Ivy')) ;
-
# on passe le temps en secondes pour le select
my ($timeAfter, $cbListRef) = @_;
$timeAfter /= 1000;
$selectTimout = $timeAfter if $timeAfter < $selectTimout;
-
+
$afterList{++$afterId}= [REPEAT, $timeAfter, timeofday()+$timeAfter,
$cbListRef];
return ($afterId);
@@ -922,9 +865,9 @@ sub afterCancel ($;$)
# appelee de maniere objet : premier argument = class ou une instance
# de classe
shift if ((ref ($_[0]) eq 'Ivy') || ($_[0] eq 'Ivy')) ;
-
- my $id = shift;
+ my $id = shift;
+
if (defined ($id) && defined $afterList{$id}) {
if ($afterList{$id}->[1] <= $selectTimout) {
delete $afterList{$id} ;
@@ -934,13 +877,13 @@ sub afterCancel ($;$)
foreach my $af (values %afterList) {
$selectTimout = $af->[1] if $af->[1] < $selectTimout ;
}
- } else {
+ }
+ else {
delete $afterList{$id} ;
}
}
}
-
############### METHODE FILE EVENT
sub fileEvent ($$;$)
{
@@ -969,9 +912,8 @@ sub fileEvent ($$;$)
}
}
-
#############################################################################
-#### METHODES PRIVEE #####
+#### METHODES PRIVEES #####
#############################################################################
@@ -979,44 +921,47 @@ sub fileEvent ($$;$)
sub _getBonjour ($)
{
my $self = shift;
-
+
my $bonjourMsg = '';
-
+
# l'hote distant
my $inetAddr = $self->[supSock]->recv ($bonjourMsg, 1024, 0);
+
unless (length $inetAddr) {
warn "Attention : Ivy::_getBonjour recv error, bonjour non traite\n";
return;
}
+
my $addr = (unpack_sockaddr_in ($inetAddr))[1];
-
- my $peerName = gethostbyaddr ($addr, AF_INET);
+ my $peerName = gethostbyaddr ($addr, AF_INET);
+
# on force $peerPort a etre vu comme une valeur numerique
my ($version, $peerPort) = $bonjourMsg =~ /^(\d+)\s+(\d+)/;
-
+
unless (defined ($version) && defined ($peerPort)) {
warn "Attention : Ivy::_getBonjour format du message bonjour incorrect\n".
"message = $bonjourMsg\n" ;
return;
}
+
if ($version != VERSION) {
warn "Attention : Ivy::_getBonjour VERSION: demande de connexion de ".
"$peerName\n version courrante : " . VERSION . ", recue : $version\n" ;
return;
}
-
+
# on verifie qu'on ne se repond pas et qu'on ne
# se reconnecte pas a un process deja connecte
- if (exists ($self->[cnnxion]->{"$addr:$peerPort"})) {
- #print "DBG> : bonjour de $peerName:$peerPort : DEJA CONNECTE\n" ;
+ if (exists ($self->[cnnxion]{"$addr:$peerPort"})) {
+ #print "DBG> : bonjour de $peerName:$peerPort : DEJA CONNECTE\n" ;
return ;
- } else {
- #print "DBG> : reception de $peerName : bonjour $peerPort\n" ;
}
-
-
- # on verifie que l'adresse fasse partie de l'ensemble de reseau
+ else {
+ #print "DBG> : reception de $peerName : bonjour $peerPort\n" ;
+ }
+
+# on verifie que l'adresse fasse partie de l'ensemble de reseau
# definis par ivybus
my $addrInIvyBus = 0;
my @ivyBusAddrList = map ( (unpack_sockaddr_in ($_))[1],
@@ -1036,73 +981,59 @@ sub _getBonjour ($)
warn "bonjour de $peerName ignore, ne fait pas partie des ivyBus\n" if $^W;
return;
}
-
-
+
# ouverture du canal de communication
my $appSock = IO::Socket::INET->new (PeerAddr => $peerName,
PeerPort => $peerPort,
Proto => 'tcp');
if ($appSock) {
- # on cree la queue et le thread qui vont interfacer les envois
- # vers cette appli
- $self->[queueList]->{$appSock} = Thread::Queue->new();
- $self->[threadList]->{$appSock} =
- Thread->new (\&_senderThread, $self, $self->[queueList]->{$appSock},
- $appSock);
- # print "DBG> new thread ${$self->[threadList]->{$appSock}}\n";
# on cree une entree pour $appSock dans la liste des regexp
- $self->[cnnxion]->{"$addr:$peerPort"} = 1;
+ $self->[cnnxion]{"$addr:$peerPort"} = 1;
$self->[sendRegList]{$appSock} = [];
$self->[buffByConn]{$appSock} = '';
- $self->[sockList]->{$appSock} = $appSock;
+ $self->[sockList]{$appSock} = $appSock;
&$fileEventFunc ($appSock, [\&_getMessages, $self, $appSock]) ;
-
+
# on balance les regexps qui nous interessent a l'appli distante
- _sendWantedRegexp ($self, $appSock);
- } else {
- warn "Attention Ivy::_getBonjour impossible de se connecter au serveur " .
+ $self->_sendWantedRegexp ($appSock);
+ }
+ else {
+ warn "Attention Ivy::_getBonjour impossible de se connecter au serveur" .
"$peerName:$peerPort\n" ;
- }
+ }
}
-############### METHODE GET CONNECTIONS
+############### PROCEDURE GET CONNECTIONS
sub _getConnections ($)
{
my $self = shift;
my $appSock = $self->[connSock]->accept();
-
+
unless (defined $appSock) {
warn "Attention Ivy::_getConnections, \$appSock not defined\n";
return;
- } else {
+ }
+ else {
printf "accepting connection from %s:%d\n",
- (gethostbyaddr ($appSock->peeraddr(),AF_INET))[0],
- $appSock->peerport() if $^W;
+ (gethostbyaddr ($appSock->peeraddr(),AF_INET))[0],
+ $appSock->peerport() if $^W;
}
-
+
# callback pour traiter la reception des messages
- # on cree la queue et le thread qui vont interfacer les envois
- # vers cette appli
- $self->[queueList]->{$appSock} = Thread::Queue->new();
- $self->[threadList]->{$appSock} =
- Thread->new (\&_senderThread, $self, $self->[queueList]->{$appSock},
- $appSock);
- # print "DBG> new thread ${$self->[threadList]->{$appSock}}\n";
&$fileEventFunc ($appSock, [\&_getMessages, $self, $appSock]) ;
-
+
# on cree une entree pour $appSock dans la liste des regexp
$self->[sendRegList]{$appSock} = [];
$self->[buffByConn]{$appSock} = '';
- $self->[sockList]->{$appSock} = $appSock;
+ $self->[sockList]{$appSock} = $appSock;
# on balance les regexps qui nous interessent a l'appli distante
- _sendWantedRegexp ($self, $appSock);
+ $self->_sendWantedRegexp ($appSock);
}
-
############### METHODE GET MESSAGES
sub _getMessages ($$)
{
@@ -1115,17 +1046,16 @@ sub _getMessages ($$)
recv ($appSock, $buffer, 65536, 0) ;
unless (length $buffer) {
# message null : broken pipe, ça s'est deconnecte a l'autre bout
- # on vire ce fd de la boucle d'evenements ainsi que
- # le thread associe.
+ # on vire ce fd de la boucle d'evenements
# print ("DBG : _getMessages, recv err, calling removeFileDesc.\n");
# Bon la il faudra un jour clarifier ce bordel, lister toutes
# les facons dont un couple d'applis connectee peuevent sortir et
# eviter les dead lock qui doivent subsister.
if (defined ($localLoopSel)) {
-# _removeFileDescriptor ($self, $appSock);
- _closeSenderThread ($self, $appSock);
- } else {
- _closeSenderThread ($self, $appSock);
+ $self->_removeFileDescriptor ($self, $appSock);
+ }
+ else {
+ $self->_removeFileDescriptor ($self, $appSock);
}
return;
}
@@ -1135,146 +1065,178 @@ sub _getMessages ($$)
$self->[buffByConn]{$appSock} = '';
}
my @messages = split ('\n', $buffer) ;
- $self->[buffByConn]{$appSock} = pop (@messages) unless
+ $self->[buffByConn]{$appSock} = pop (@messages) unless
($buffer =~ /\n$/) ;
-# if (defined $appSock->peername) {
+ # if (defined $appSock->peername) {
$addr = $appSock->peeraddr();
$peerPort = $appSock->peerport() ;
- $senderName = $self->[cnnxion]->{"$addr:$peerPort"} ;
+ $senderName = $self->[cnnxion]{"$addr:$peerPort"} ;
$senderName = "NONAME" unless $senderName;
-
foreach my $mess (@messages) {
-# print "DBG>mess from $senderName *$mess*\n";
+ # print "DBG>mess from $senderName *$mess*\n";
- # on recupere les 3 champs : le type, le numero de regexp, les valeurs
- my ($type, $id, $valeurs) = $mess =~ /^(\d+)
- \s+
- (\d+)
- \002
- (.*)/x ;
+ # on recupere les 3 champs : le type, le numero de regexp, les valeurs
+ my ($type, $id, $valeurs) = $mess =~ /^(\d+)
+ \s+
+ (\d+)
+ \002
+ (.*)/x ;
# si ca a chie on rale
- (warn "Attention Ivy::_getMessages message mal formatte : $mess\n"
- and return) unless defined $type ;
-
- # sinon on fait en fonction du type de message
- if ($type == MSG) { # M S G
- # on recupere le couple call back, regexp correspondant
- # a l'identifiant et on appelle la fonction avec les parametres
- # traites par la regexp
- if (my @cb = @{$self->[recCbList][$id]->[1]}) {
- my $cb = shift @cb;
- # on split sur ETX
- my $observer = $self->[recCbList][$id]->[2];
- if (defined $observer) {
- $observer->$cb ($senderName, @cb, split ("\003", $valeurs)) ;
- }
- else {
- &$cb ($senderName, @cb, split ("\003", $valeurs)) ;
- }
- } else {
- #_sendErrorTo ($appSock, "REEGXP ID $id inconnue");
- warn ("Attention Ivy::_getMessages reception d'un message " .
- "MSG : id $id inconnu de $senderName :\n«$mess»");
+ (warn "Attention Ivy::_getMessages malformated message $mess\n" and return) unless defined $type ;
+
+ # sinon on fait en fonction du type de message
+ if ($type == MSG) { # M S G
+ # on recupere le couple call back, regexp correspondant
+ # a l'identifiant et on appelle la fonction avec les parametres
+ # traites par la regexp
+ if (my @cb = @{$self->[recCbList][$id]->[1]}) {
+ my $cb = shift @cb;
+ my $refcb = ref($cb);
+ if ($refcb ne 'CODE') {
+ my $method = shift @cb;
+ # on split sur ETX
+ $cb->$method($senderName, @cb, split ("\003", $valeurs)) ;
+ }
+ else {
+ &$cb ($senderName, @cb, split ("\003", $valeurs)) ;
+ }
+ }
+ else {
+ #_sendErrorTo ($appSock, "REEGXP ID $id inconnue");
+ warn ("Attention Ivy::_getMessages reception d'un message ".
+ "MSG : id $id inconnu de $senderName :\n«$mess»");
+ }
+ }
+ elsif ($type == BYE) {
+ #print "reception d'un bye\n";
+ $self->_removeFileDescriptor ($appSock); # B Y E
+ }
+ elsif ($type == REGEXP) { # R E G E X P
+ # on ajoute une fonction traitant la regexp et envoyant le
+ # message sur le bon fd dans la liste des fonctions de filtrage
+ # ca permet de compiler les regexp avec 'once' donc une
+ # fois pour toute, et ainsi optimiser la vitesse de
+ # filtrage des messages a envoyer
+ next if $self->_toBePruned ($senderName, $valeurs);
+ unless (defined $self->[sendRegList]{$appSock}->[$id]) {
+ # si l'id de regexp n'etait pas utilisee c'est tout bon
+ # on affecte la nouvelle regexp a un id
+ $self->[sendRegList]{$appSock}->[$id] = eval <<'_EOL_';
+ sub {
+ use strict;
+ if (my @args = ${$_[0]} =~ /($valeurs)/o) {
+ shift @args;
+ $args[$#args] .= "\003" if @args;
+ send ($appSock, sprintf (MSG_FMT,
+ MSG, $id, join ("\003",@args)), 0)
+ or $self->_removeFileDescriptor ($appSock) ;
+ #print join (' ', "DBG > J'envoie MSG", $id, @args, "\n");
+ return 1;
}
- } elsif ($type == BYE) {
- #print "reception d'un bye\n";
- $self->_closeSenderThread ($appSock);
- } elsif ($type == REGEXP) { # R E G E X P
- # on ajoute une fonction traitant la regexp et envoyant le
- # message sur le bon fd dans la liste des fonctions de filtrage
- # ca permet de compiler les regexp avec 'once' donc une
- # fois pour toute, et ainsi optimiser la vitesse de
- # filtrage des messages a envoyer
- next if _toBePruned ($self, $senderName, $valeurs);
- unless (defined $self->[sendRegList]{$appSock}->[$id]) {
- # si l'id de regexp n'etait pas utilisee c'est tout bon
- # on affecte la nouvelle regexp a un id
- $self->[sendRegList]{$appSock}->[$id] = eval <<'_EOL_';
- sub {
- use strict;
- if (my @args = ${$_[0]} =~ /($valeurs)/o) {
- shift @args;
- $args[$#args] .= "\003" if @args;
- my $queue = $self->[queueList]->{$appSock};
- return 0 unless defined $queue;
- $queue->enqueue (\sprintf (MSG_FMT, MSG, $id,
- join ("\003",@args)));
- # print join (' ', "DBG > J'envoie MSG", $id, @args, "\n");
- return 1;
- }
- };
+ };
_EOL_
- } else {
- # l'id de la regexp etait deja utilise,
- # et n'a pas ete libere par un message delregexp,
- # on renvoie donc un message d'erreur
- _sendErrorTo ($self, $appSock, "ID $id deja utilisee");
+ }
+ else {
+ # l'id de la regexp etait deja utilise,
+ # et n'a pas ete libere par un message delregexp,
+ # on renvoie donc un message d'erreur
+ $self->_sendErrorTo($appSock, "ID $id deja utilisee");
}
- } elsif ($type == ERROR) { # E R R O R
+ }
+ elsif ($type == ERROR) { # E R R O R
warn ("Attention Ivy::_getMessages ERREUR recue de ".
- "$senderName : «$valeurs»\n");
- } elsif ($type == DELREGEXP) { # D E L R E G E X P
- # on vire la regexp des regexps vefifier
- $self->[sendRegList]{$appSock}->[$id] = undef ;
- } elsif ($type == ENDREGEXP) { # E N D R E G E X P
- # on envoie le message ready uniquement a celui qui nous
- # a envoye le message endregexp
- _sendMsgTo ($self, $appSock, $self->[messWhenReady]);
- # on passe de l'etat Connecte a l'etat Ready
- $self->[cnnxion]->{"$addr:$peerPort"} =~ s/^\004//;
- $senderName = $self->[cnnxion]->{"$addr:$peerPort"};
- unless (exists $self->[appliList]{$senderName}) {
- $self->[appliList]->{$senderName} = [$appSock];
- } else {
- push @{$self->[appliList]->{$senderName}}, $appSock;
- }
- _scanConnStatus ($self);
- } elsif ($type == APP_NAME) {
- # etat Connecte
- if (($self->[appName] eq $valeurs) && $^W) {
- warn "\033[1mATTENTION : Ivy::_getMessages une instance de ".
- "$self->[appName] existe deja\033[m\n" ;
- }
- $senderName = $valeurs;
- $self->[cnnxion]->{"$addr:$peerPort"} = "\004$valeurs";
- } elsif ($type == DIRECT_MSG) {
- if (defined $self->[directCbList][$id]) {
- my @cb = @{$self->[directCbList][$id]};
- my $cb = shift @cb;
- &$cb (@cb, $valeurs);
- } else {
- _sendErrorTo ($self, $appSock, "DIRECT ID $id inconnue");
- warn "Attention Ivy::_getMessages reception d'un message " .
- "DIRECT d'id $id inconnue de $senderName :\n«$mess»";
- }
- } elsif ($type == DIE) {
- # il faut quitter
- # on commence par appeler la callback de fin
- my @cb = @{$onDieFunc};
- my $cb = shift @cb;
- &$cb (@cb);
- # on avertit les autres qu'on se barre
- my $adr = _inetAdrByName ($self, $senderName) ;
- warn "Attention Ivy::_getMessages reception d'un ordre " .
- "de suicide de $senderName ($adr)... exiting\n" if $^W;
- # adios
- Ivy::exit ();
- } elsif ($type == PING) {
- # si on recois un ping, on envoie un pong
- _pong ($self, $appSock);
- } elsif ($type == PONG) {
- return PONG;
- } else {
- _sendErrorTo ($self, $appSock, "TYPE DE MESS $type inconnu");
- warn ("Attention Ivy::_getMessages reception d'un message " .
- "de type $type inconnu de $senderName :\n«$mess»");
+ "$senderName : «$valeurs»\n");
+ }
+ elsif ($type == DELREGEXP) { # D E L R E G E X P
+ # on vire la regexp des regexps vefifier
+ $self->[sendRegList]{$appSock}->[$id] = undef ;
+ }
+ elsif ($type == ENDREGEXP) { # E N D R E G E X P
+ # on envoie le message ready uniquement a celui qui nous
+ # a envoye le message endregexp
+ $self->_sendMsgTo ($appSock, $self->[messWhenReady]);
+
+ # on passe de l'etat Connecte a l'etat Ready
+ $self->[cnnxion]{"$addr:$peerPort"} =~ s/^\004//;
+ $senderName = $self->[cnnxion]{"$addr:$peerPort"};
+
+ unless (exists $self->[appliList]{$senderName}) {
+ $self->[appliList]{$senderName} = [$appSock];
+ }
+ else {
+ push @{$self->[appliList]{$senderName}}, $appSock;
+ }
+
+ $self->_scanConnStatus ();
+ }
+ elsif ($type == APP_NAME) {
+ # etat Connecte
+ if (($self->[appName] eq $valeurs) && $^W) {
+ warn "\033[1mATTENTION : Ivy::_getMessages une instance de ".
+ "$self->[appName] existe deja\033[m\n" ;
}
- }
-return 0;
-}
+
+ $senderName = $valeurs;
+ $self->[cnnxion]{"$addr:$peerPort"} = "\004$valeurs";
+ }
+ elsif ($type == DIRECT_MSG) {
+
+ if (defined $self->[directCbList][$id]) {
+ my @cb = @{$self->[directCbList][$id]};
+ my $cb = shift @cb;
+ my $refcb = ref($cb);
+ if ($refcb ne 'CODE') {
+ my $method = shift @cb;
+ $cb->$method(@cb, $valeurs);
+ }
+ else {
+ &$cb (@cb, $valeurs);
+ }
+ }
+ else {
+ $self->_sendErrorTo ($appSock, "DIRECT ID $id inconnue");
+ warn "Attention Ivy::_getMessages reception d'un message ".
+ "DIRECT d'id $id inconnue de $senderName :\n«$mess»";
+ }
+ } elsif ($type == DIE) {
+ # il faut quitter
+ # on commence par appeler la callback de fin
+ my @cb = @{$onDieFunc};
+ my $cb = shift @cb;
+ my $refcb = ref($cb);
+ if ($refcb ne 'CODE') {
+ my $method = shift @cb;
+ $cb->$method(@cb);
+ }
+ else {
+ &$cb (@cb);
+ }
+ # on avertit les autres qu'on se barre
+ my $adr = $self->_inetAdrByName ($senderName) ;
+ warn "Attention Ivy::_getMessages reception d'un ordre " .
+ "de suicide de $senderName ($adr) ... exiting\n" if $^W;
+ # adios
+ Ivy::exit ();
+
+ }
+ elsif ($type == PING) {
+ # si on recois un ping, on envoie un pong
+ $self->_pong ($appSock);
+ }
+ elsif ($type == PONG) {
+ return PONG;
+ }
+ else {
+ _$self->sendErrorTo ($appSock, "TYPE DE MESS $type inconnu");
+ warn ("reception d'un message de type $type inconnu de " .
+ "$senderName :\n«$mess»");
+ }
+ }
+ return 0;
+ }
############### METHODE SEND WANTED REGEXP
sub _sendWantedRegexp ($$)
@@ -1282,116 +1244,109 @@ sub _sendWantedRegexp ($$)
my ($self, $appSock) = @_;
# on envoie le message "Nom appli"
- Thread::Queue::enqueue ($self->[queueList]->{$appSock},
- \sprintf (MSG_FMT, APP_NAME, 0, $self->[appName]));
+ send ($appSock, sprintf (MSG_FMT, APP_NAME, 0, $self->[appName]), 0)
+ or $self->_removeFileDescriptor ($appSock) ;
# on envoie les regexps
for (my $id = 0; $id <= $#{$self->[recCbList]}; $id++) {
- next unless defined $self->[recCbList]->[$id]->[1]->[0];
-
- Thread::Queue::enqueue ($self->[queueList]->{$appSock},
- \sprintf (MSG_FMT, REGEXP, $id, $self->[recCbList][$id]->[0]));
+ next unless defined $self->[recCbList][$id]->[1]->[0];
+
+ send ($appSock,
+ sprintf (MSG_FMT, REGEXP, $id, $self->[recCbList][$id]->[0]),
+ 0) or $self->_removeFileDescriptor ($appSock) ;
# print sprintf ("DBG > %s %d %s\n",
# 'REGEXP', $id, $self->[recCbList][$id]->[0]);
}
# on envoie le message de fin d'envoi de regexps
- Thread::Queue::enqueue ($self->[queueList]->{$appSock},
- \sprintf (MSG_FMT, ENDREGEXP, 0, ""));
+ send ($appSock, sprintf (MSG_FMT, ENDREGEXP, 0, ""), 0)
+ or $self->_removeFileDescriptor ($appSock) ;
}
############### METHODE SEND LAST REGEXP TO ALLREADY CONNECTED
sub _sendLastRegexpToAllreadyConnected ($$)
{
- my ($self, $id) = @_;
- foreach my $fd (values %{$self->[sockList]}) {
- Thread::Queue::enqueue ($self->[queueList]->{$fd},
- \sprintf (MSG_FMT, REGEXP, $id, $self->[recCbList][$id][0]));
- }
+ my ($self, $id) = @_;
+
+ foreach my $fd (values %{$self->[sockList]}) {
+ send ($fd, sprintf (MSG_FMT, REGEXP, $id, $self->[recCbList][$id]->[0]),
+ 0) or $self->_removeFileDescriptor ($fd) ;
+ }
}
############### METHODE INET ADR BY NAME
sub _inetAdrByName ($$) {
- my ($self, $appName) = @_;
- my $addrInet = (grep ($self->[cnnxion]->{$_} eq $appName,
- keys %{$self->[cnnxion]}))[0];
-
- return ("unknow") unless defined $addrInet;
- my ($addr,$port) = $addrInet =~ /(.{4}):(.*)/;
-
- my $host = (gethostbyaddr ($addr, AF_INET))[0] ;
- return "$host:$port";
+ my ($self, $appName) = @_;
+
+ my $addrInet = (grep ($self->[cnnxion]{$_} eq $appName,
+ keys %{$self->[cnnxion]}))[0];
+
+ return ("unknow") unless defined $addrInet;
+ my ($addr,$port) = $addrInet =~ /(.{4}):(.*)/;
+
+ my $host = (gethostbyaddr ($addr, AF_INET))[0] ;
+ return "$host:$port";
}
-############### METHODE REMOVE FILE DESCRIPTOR
+############### PROCEDURE REMOVE FILE DESCRIPTOR
sub _removeFileDescriptor ($$)
{
- my ($self, $fd) = @_;
- my $diedAppName;
+ my ($self, $fd) = @_;
-
-
- # on s'est deja occupe de lui
- return unless exists $self->[sockList]->{$fd};
- # printf ("DBG> _removeFileDescriptor IN thread %s\n", ${Thread->self});
-
- # on efface les structures de donnees associees au fd
- # on vire ce fd des fd a scruter dans la bcle d'evenements
- # uniquement si on est dans le thread principal
- # sinon le select merde salement sur ce coup
- &$fileEventFunc ($fd, '');
- delete $self->[sendRegList]->{$fd};
- delete $self->[sockList]->{$fd};
- delete $self->[buffByConn]->{$fd};
- delete $self->[queueList]->{$fd};
-
- # on clos la connection
- $fd->close ();
-
-# EXT_LOOP:
-# foreach my $name (keys %{$self->[appliList]}) {
-# @{$self->[appliList]{$name}} =
-# grep ($_ ne $fd, @{$self->[appliList]{$name}});
-# if (length (@{$self->[appliList]{$name}}) == 0) {
-# delete $self->[appliList]->{$name}} ;
-# }
-# }
-
- EXT_LOOP:
- foreach my $name (keys %{$self->[appliList]}) {
- foreach my $fdp (@{$self->[appliList]{$name}}) {
- if ($fd eq $fdp) {
- $diedAppName = $name;
- @{$self->[appliList]{$name}} =
- grep ($_ ne $fdp, @{$self->[appliList]{$name}});
- if (scalar (@{$self->[appliList]{$name}}) == 0) {
- delete $self->[appliList]->{$name}} ;
- last EXT_LOOP;
+ my $diedAppName;
+
+ # on s'est deja occupe de lui
+ return unless exists $self->[sockList]->{$fd};
+ # printf ("DBG> _removeFileDescriptor IN thread %s\n", ${Thread->self});
+
+ # on efface les structures de donnees associees au fd
+ # on vire ce fd des fd a scruter dans la bcle d'evenements
+ # uniquement si on est dans le thread principal
+ # sinon le select merde salement sur ce coup
+ &$fileEventFunc ($fd, '') ;
+ delete $self->[sendRegList]{$fd};
+ delete $self->[sockList]{$fd};
+ delete $self->[buffByConn]->{$fd};
+
+ $fd->close();
+
+ EXT_LOOP:
+ foreach my $name (keys %{$self->[appliList]}) {
+ foreach my $fdp (@{$self->[appliList]{$name}}) {
+ if ($fd eq $fdp) {
+ $diedAppName = $name;
+ @{$self->[appliList]{$name}} =
+ grep ($_ ne $fdp, @{$self->[appliList]{$name}});
+ if (scalar (@{$self->[appliList]{$name}}) == 0) {
+ delete $self->[appliList]->{$name}
+ }
+ last EXT_LOOP;
+ }
}
}
- }
-
-
- unless (defined $diedAppName) {
- warn "Ivy::_removeFileDescriptor : deconnection de NONAME\n" if $^W;
- return;
- }
-
- my $addrInet = (grep ($self->[cnnxion]->{$_} eq $diedAppName,
- keys %{$self->[cnnxion]}))[0];
- unless (defined $addrInet) {
- die "ERREUR _removeFileDescriptor deconnection de $diedAppName ".
- "addrInet not defined\n";
- return;
- }
- # printf "DBG> _removeFileDescriptor : deconnection de %s ($diedAppName)\n", _inetAdrByName ($self, $diedAppName);
+
+ unless (defined $diedAppName) {
+ warn "Ivy::__removeFileDescriptor : deconnection de NONAME\n" if $^W;
+ return;
+ }
- delete $self->[cnnxion]->{$addrInet};
-
- # on vire l'entree correspondant a ce canal dans la liste des
- # regexps par canal
- _scanConnStatus ($self) ;
+ my $addrInet = (grep ($self->[cnnxion]{$_} eq $diedAppName,
+ keys %{$self->[cnnxion]}))[0];
+
+ unless (defined $addrInet) {
+ die "ERREUR _removeFileDescriptor deconnection de $diedAppName ".
+ "addrInet not defined\n";
+ return;
+ }
+
+ #printf "DBG> _removeFileDescriptor : deconnection de %s ($diedAppName)\n", _inetAdrByName ($diedAppName);
+
+ delete $self->[cnnxion]{$addrInet};
+
+ # on vire l'entree correspondant a ce canal dans la liste des
+ # regexps par canal
+ $self->_scanConnStatus () ;
}
@@ -1400,8 +1355,8 @@ sub _sendErrorTo ($$$)
{
my ($self, $fd, $error) = @_;
- Thread::Queue::enqueue ($self->[queueList]->{$fd},
- \join (' ', ERROR, "0\002$error\n"));
+ send ($fd, join (' ', ERROR, "0\002$error\n"), 0)
+ or $self->_removeFileDescriptor ($fd);
}
@@ -1409,9 +1364,9 @@ sub _sendErrorTo ($$$)
sub _pong ($$)
{
my ($self, $fd) = @_;
-
- Thread::Queue::enqueue ($self->[queueList]->{$fd},
- \join (' ', PONG, "0\002\n"));
+
+ send ($fd, join (' ', PONG, "0\002 \n"), 0)
+ or $self->_removeFileDescriptor ($fd);
}
@@ -1420,63 +1375,64 @@ sub _sendDieTo ($$)
{
my ($self, $fd) = @_;
- Thread::Queue::enqueue ($self->[queueList]->{$fd},
- \join (' ', DIE, "0\002\n"));
+ send ($fd, join (' ', DIE, "0\002\n"), 0)
+ or $self->_removeFileDescriptor ($fd);
}
############### METHODE SEND MSG TO
sub _sendMsgTo ($$$)
{
- my ($self, $fd, $msg) = @_;
+ my ($self, $fd, $msg) = @_;
- # pour toutes les fonctions de filtrage de regexp
- foreach my $regexpFunc (@{$self->[sendRegList]{$fd}}) {
- &{$regexpFunc} (\$msg) if defined $regexpFunc;
- }
+ # pour toutes les fonctions de filtrage de regexp
+ foreach my $regexpFunc (@{$self->[sendRegList]{$fd}}) {
+ &{$regexpFunc} (\$msg) if defined $regexpFunc;
+ }
}
-############### METHODE TK FILE EVENT
+############### PROCEDURE TK FILE EVENT
sub _tkFileEvent ($$)
-{
+{
my ($fd, $cb) = @_;
Tk::fileevent ('', $fd, 'readable', $cb) ;
-}
-
-
+}
-############### METHODE SCAN AFTER
+############### PROCEDURE SCAN AFTER
sub _scanAfter ()
{
- my $stamp = timeofday ();
- $selectTimout = MAX_TIMOUT;
- foreach my $afk (keys %afterList) {
- my $af = $afterList{$afk};
- # si ce timer est a declencher
- if ($af->[2] <= $stamp) {
- # on traite : le temps de declencher le cb est arrive
- if (ref $af->[3] eq 'CODE') {
- &{$af->[3]};
- } else {
- my ($cb, @args) = @{$af->[3]};
- &$cb (@args);
- }
- # si c'est un repeat on le reconduit
- if ($af->[0]) {
- $af->[2] = $stamp + $af->[1] ;
- $selectTimout = $af->[1] if $af->[1] < $selectTimout;
- } else {
- # si c'est un after on le vire
- afterCancel ($afk);
- }
- } else {
- my $timeTotrigg = $af->[2] - $stamp;
- $selectTimout = $timeTotrigg if $timeTotrigg < $selectTimout;
- }
+ my $stamp = timeofday ();
+ $selectTimout = MAX_TIMOUT;
+ foreach my $afk (keys %afterList) {
+ my $af = $afterList{$afk};
+ # si ce timer est a declencher
+ if ($af->[2] <= $stamp) {
+ # on traite : le temps de declencher le cb est arrive
+ if (ref $af->[3] eq 'CODE') {
+ &{$af->[3]};
+ }
+ else {
+ my ($cb, @args) = @{$af->[3]};
+ &$cb (@args);
+ }
+ # si c'est un repeat on le reconduit
+ if ($af->[0]) {
+ $af->[2] = $stamp + $af->[1] ;
+ $selectTimout = $af->[1] if $af->[1] < $selectTimout;
+ }
+ else {
+ # si c'est un after on le vire
+ afterCancel ($afk);
+ }
+ }
+ else {
+ my $timeTotrigg = $af->[2] - $stamp;
+ $selectTimout = $timeTotrigg if $timeTotrigg < $selectTimout;
}
+ }
}
@@ -1491,7 +1447,7 @@ sub _scanConnStatus ($)
next if $_ eq "1";
$readyApp{$_}++ unless /^\004/; # connecte mais pas ready
}
-
+
foreach (@{$self->[neededApp]}) {
push (@nonReadyApp, $_) unless exists $readyApp{$_};
}
@@ -1510,48 +1466,50 @@ sub _scanConnStatus ($)
############### METHODE TO BE PRUNED
sub _toBePruned ($$$)
{
- my ($self, $from, $regexp) = @_;
-
-
- # si il n'y a pas de liste de sujets, on ne
- # filtre pas
- return 0 unless @{$self->[topicRegexps]};
+ my ($self, $from, $regexp) = @_;
- unless ($regexp =~ /^\^/) {
- #print "DBG> regexp non ANCREE de $from : $regexp\n";
- return (0);
- }
-
- if ($regexp =~ /^\^(\w+)/) {
- my $topic = $1;
- if (grep (/$topic/, @{$self->[topicRegexps]})) {
- # on a trouve ce topic : on ne filtre pas la regexp
- #print "DBG> on garde de $from : $regexp\n";
- return (0);
- }
- #print "DBG> on ELIMINE de $from : $regexp\n";
- return (1);
- } else {
- #print "DBG> on garde de $from : $regexp\n";
- return (0);
+ # si il n'y a pas de liste de sujets, on ne
+ # filtre pas
+ return 0 unless @{$self->[topicRegexps]};
+
+ unless ($regexp =~ /^\^/) {
+ #print "DBG> regexp non ANCREE de $from : $regexp\n";
+ return (0);
+ }
+
+ if ($regexp =~ /^\^(\w+)/) {
+ my $topic = $1;
+ if (grep (/$topic/, @{$self->[topicRegexps]})) {
+ # on a trouve ce topic : on ne filtre pas la regexp
+ #print "DBG> on garde de $from : $regexp\n";
+ return (0);
}
+ #print "DBG> on ELIMINE de $from : $regexp\n";
+ return (1);
+ }
+ else {
+ #print "DBG> on garde de $from : $regexp\n";
+ return (0);
+ }
}
-############### METHODE PARSE IVY BUS PARAM
+############### PROCEDURE PARSE IVY BUS PARAM
sub _parseIvyBusParam ($)
{
my $ivyBus = shift;
+
my ($ivyNetworks, $ivyPort) = $ivyBus =~ /^(.*):(.*)/;
+
die ("Erreur Ivy::_parseIvyBusParam format de l'adresse ou ".
"no de port incorrect : $ivyBus\n")
unless $ivyPort =~ /^\d+$/;
my @ivyAddrInet = ();
- $ivyNetworks =~ s/ //g;
+ $ivyNetworks =~ s/ //g;
my @broadcastAddrs = split (',', $ivyNetworks);
-
+
foreach my $netAddr (@broadcastAddrs) {
$netAddr = BROADCAST_ADDRS if
(($netAddr eq '') || ($netAddr =~ /^127/) ||
@@ -1594,107 +1552,295 @@ sub _parseIvyBusParam ($)
return ($ivyPort, \@ivyAddrInet);
}
+############# Procedure _SUBSTITUTE ESCAPED CHAR
+sub _substituteEscapedChar ($$)
+{
+ my ($scope, $reg) = @_;
-############# Methode _ SENDER THREAD
-sub _senderThread ($$$) {
- my ($self, $queue, $fd) = @_;
- my $data;
- # condition d'arret du thread :
- # pour l'arreter on enqueue undef.
- while ($data = $queue->dequeue()) {
- # si le send echoue, on vire le fd et on sort du thread.
-# last unless syswrite ($fd, $$data, length ($$data));
- last unless send ($fd, $$data, 0);
- }
+ my %escapeRegexp = REG_PERLISSISME;
+ # Si on fait la substitution dans une classe de caractere
+ # on elimine les crochets.
+ grep ($escapeRegexp{$_} =~ s/[\[\]]//g, keys %escapeRegexp)
+ if ($scope eq 'inside') ;
+
+ $reg =~ s/\\([wWsSdDne])/$escapeRegexp{$1}/ge;
+ return $reg;
}
+1;
-############# Methode _ SENDER THREAD_NON_BLOQUANT
+__END__
-# tentative pour pouvoir arreter facilement un thread
-# bloque sur un send. Mais c'est pas satisfaisant
-# car quand le fd est de nouveau accessible en ecriture,
-# les donnees accumulees dans l'accumulateur ne sont
-# envoyees que lorsque l'on ecrit a nouveau dans
-# la queue de communication du thread
-# Je laisse ce code pour info.
-# sub _senderThread_NON_BLOQUANT ($$$) {
-# my ($self, $queue, $fd) = @_;
-# my $data;
-# my @buff = ();
-
-# # on ne bloque plus sur ce fd
-# fcntl ($fd, F_SETFL, O_NDELAY) ;
-
-# EXT_LOOP:
-# for (;;) {
-# # condition d'arret du thread :
-# # pour l'arreter on enqueue undef.
-# last unless defined ($data = Thread::Queue::dequeue ($queue));
-
-# # on colle les donnees dans un accumulateur
-# push (@buff, $data);
-
-# # tant qu'on peut ecrire sur le fd, on le fait
-# while ($data = shift @buff) {
-# unless (syswrite ($fd, $$data, length ($$data))) {
-# # le write a plante :
-# if ($! == Errno::EWOULDBLOCK) {
-# # si c'est parce que le fd est temporairement innacessible
-# # on remets le message dans l'accumulateur et on se remets
-# # en attente sur la queue de communication
-# unshift (@buff, $data);
-# next EXT_LOOP;
-# } else {
-# # sinon c'est que le fd a ete ferme, dans ce cas le thread sort
-# last EXT_LOOP;
-# }
-# }
-# }
-
-# }
-# print ("DBG> sortie de thread\n");
-# }
+=head1 NAME
+Ivy - Perl extension for implementing a software bus
+=head1 SYNOPSIS
+use Ivy;
+=head1 DESCRIPTION
-############# Methode _CLOSE SENDER THREAD
-sub _closeSenderThread ($$)
-{
- my ($self, $fd) = @_;
- my $queue = $self->[queueList]->{$fd};
-
- # on vide la queue. Oui je sais :-)
- # printf ("DBG> on vide la queue dans le thread %s\n", ${Thread->self()});
- if (defined $queue) {
- while ($queue->dequeue_nb ()) {};
- $queue->enqueue (undef);
- _removeFileDescriptor ($self, $fd);
- }
+The Ivy perl module implements a software bus to provide with an easy
+communication between applications. Messages are broadcasted as ASCII strings
+over a network defined by a list of domains and a port.
+Messages are received if they match a regular expressions and if your application
+is on the same network as remote ones.
+Before receive or send message you must call 'init', and 'new' class methods,
+followed by 'start' method.
+When you quit your application don't forget to call 'exit' class methods.
- # printf ("DBG> demande de join du thread ${$self->[threadList]->{$fd}}\n");
- $self->[threadList]->{$fd}->join() if defined $self->[threadList]->{$fd};;
- # printf ("DBG> done\n");
-}
+=head1 CLASS METHODS
+=head2 Ivy->init(...)
+Allows one to define global parameters which may be used as default ones
+at object creation time.
-############# Procedure _SUBSTITUTE ESCAPED CHAR
-sub _substituteEscapedChar ($$)
-{
- my ($scope, $reg) = @_;
+Parameters are :
- my %escapeRegexp = REG_PERLISSISME;
- # Si on fait la substitution dans une classe de caractere
- # on elimine les crochets.
- grep ($escapeRegexp{$_} =~ s/[\[\]]//g, keys %escapeRegexp)
- if ($scope eq 'inside') ;
+=over 4
- $reg =~ s/\\([wWsSdDne])/$escapeRegexp{$1}/ge;
- return $reg;
+=item B<-loopMode =E<gt> 'TK'|'LOCAL'>
+
+Mode of events loop among TK or LOCAL. According to this mode, you must
+use Ivy->mainLoop or Tk::MainLoop(3)
+
+=item B<-appName =E<gt> 'your app ivy name'>
+
+Name of your application used to identify on ivy bus.
+
+=item B<-ivyBus =E<gt> 'domain 1,...,domain n:port number'>
+
+A list of domains, followed by port number where to broadcast messages.
+Default is 127.255.255.255:2010
+
+=item B<-messWhenReady =E<gt> 'your message when ready'>
+
+Synchronisation message sent when application is ready to receive and send
+messages.
+
+=item B<-onDieFunc =E<gt> [\&yourdiefunc, @parameters]>
+
+=item B<-onDieFunc =E<gt> [$an_object, \&a_method, @parameters]>
+
+A callback or method to call when application receive die message.
+Don't make an exit in callback, ivy we'll do it for you.
+
+Prototype of your callback must be :
+
+sub MyCallback {
+ my @parameters = @_;
+
+ ...
}
+Prototype of your method must be :
-1;
+sub MyMethod {
+ my ($self, @parameters) = @_;
+
+ ...
+}
+
+=item B<-pruneRegexp =E<gt> ['subject 1', ..., 'subject n']>
+
+Optimize communication using this option. Regexps which don't match these subjects are removed.
+
+Example :
+
+ Ivy->init(-loopMode =E<gt> 'TK',
+ -appName =E<gt> 'MyWonderfulApp',
+ -onDieFunc =E<gt> [\&restorecontext]);
+
+=back
+
+=head2 Ivy->new(...);
+
+Check parameters, and create an ivy bus object. You must call Ivy->init before
+this one.
+
+Parameters are :
+
+=over 4
+
+=item B<-appName =E<gt> 'your app ivy name'>
+
+Name of your application used to identify on ivy bus.
+
+=item B<-ivyBus =E<gt> 'domain 1,...,domain n:port number'>
+
+A list of domains, followed by port number where to broadcast messages.
+Default is 127.255.255.255:2010
+
+=item B<-messWhenReady =E<gt> 'your message when ready'>
+
+Synchronisation message sent when application is ready to receive and send
+messages.
+
+=item B<-onDieFunc =E<gt> [\&yourdiefunc, @parameters]>
+
+=item B<-onDieFunc =E<gt> [$an_object, \&a_method, @parameters]>
+
+A callback or method to call when application receive die message.
+Don't make an exit in callback, ivy we'll do it for you.
+Prototype of your callback must be :
+
+ sub MyCallback {
+ my @parameters = @_;
+ ...
+ }
+
+Prototype of your method must be :
+
+ sub MyMethod {
+ my ($self, @parameters) = @_;
+ ...
+ }
+
+=item B<-pruneRegexp =E<gt> ['subject 1', ..., 'subject n']>
+
+Optimize communication using this option. Regexps which don't match these subjects are removed.
+
+=item B<-neededApp =E<gt> ['app 1', ..., 'app n']>
+
+A list of application your own one needs to correctly run.
+
+=item B<-statusFunc =E<gt> sub {}>
+
+A callback which will be called until every needed app is present on bus.
+
+Your callback could be :
+
+ sub MyCallback {
+ my ($present, $absent, %present) = @_;
+
+ foreach my $remoteapp (keys %present) {
+ if ($present{$remoteapp} > 1) {
+ print "n apps $remoteapp are presents on bus\n";
+ }
+ }
+
+Example :
+
+ Ivy->new(-ivyBus => '156.255.255.255,157.255.255.255:2204',
+ -onDieFunc => [\&restorecontext],
+ -neededApp => ["DataServer", "HMI"],
+ -statusFunc => \&startwhenpresents);
+
+=back
+
+=head2 Ivy->mainLoop;
+
+Local main events loop. Use it if you don't use Tk library.
+
+=head2 $ivyobj->stop;
+
+=head1 OBJECT METHODS
+
+=head2 $ivyobj->start;
+
+You must call it after you are ready to communicate through ivy bus
+and before you really communicate.
+
+=head2 $ivyobj->sendMsgs(@messages);
+
+Send a list of messages
+
+Example :
+
+ $ivyobj->sendMsgs("Hello World", "Don't Bother", "Y2K is behind us");
+
+=head2 $ivyobj->sendAppNameMsgs(@messages);
+
+Send a list of messages precedeed by ivy application name.
+
+Example :
+
+ $ivyobj->sendMsgs("Hello World");
+ # it will send "$appName Hello World" over ivy bus
+
+=head2 $ivyobject->bindRegexp($regexp, [\&callback, @cb_parameters]);
+
+=head2 $ivyobject->bindRegexp($regexp, [$an_obj, \&method, @cb_parameters]);
+
+Allow one to associate a message which matches a regular expression and a
+callback or method. See perlre(1), to find how to write regexps.
+Use the bracketing construct ( ... ), so that ivy perl will call
+callback with matched patterns as parameters.
+
+Example :
+
+ $ivyobject->bindRegexp("\w+ (\d+)", [\&callback, @cb_parameters]);
+ # You callback will be called with one more parameter which will be
+ # a number precedeed by a word, because of bracketed regexp.
+
+Your callback and method protos must be :
+
+ sub cb {
+ my ($sendername, @cb_parameters,
+ @matched_regexps_in_brackets) = @_;
+ ...
+ }
+
+ sub method {
+ my ($self, $sendername, @cb_parameters,
+ @matched_regexps_in_brackets) = @_;
+ ...
+ }
+
+=head2 $ivyobj->sendDirectMsgs($to, $id, @msgs);
+
+Send a message but Ask Alex to find what are $to and $id
+
+=head2 $ivyobj->bindDirect($regexp, $id, [\&callback, @cb_parameters]);
+
+=head2 $ivyobj->bindDirect($regexp, $id, [$an_obj, \&method, @cb_parameters]);
+
+Same as bindRegexp method but Ask Alex to find what is $id.
+
+=head2 $ivyobj->sendDieTo($to)
+
+send a die message to $to application name.
+
+=head2 $ivyobj->ping($to, $timeout);
+
+send a ping message and wait until timeout to receive a pong.
+
+=head2 $after_id = $ivyobj->after($timeAfter, \@callbacks_list);
+
+Call a list of callbacks after $timeAfter mseconds.
+
+=head2 $repeat_id = $ivyobj->repeat($timeAfter, \@callbacks_list);
+
+Repeat calls of a list of callbacks after $timeAfter mseconds.
+
+=head2 $ivyobj->afterCancel($after_or_repeat_id);
+
+Cancel an after callback call.
+
+=head2 $ivyobj->fileEvent($fd, $cb);
+
+Ask Alex
+
+=head2 $ivyobj->DESTROY;
+
+=head1 BUGS
+
+No know bugs at this time. Report them to author.
+
+=head1 SEE ALSO
+
+perl(1), perlre(1)
+
+=head1 AUTHORS
+
+Alexandre Bustico <bustico@tls.cena.fr>, Herve Damiano <damiano@tls.cena.fr>
+
+=head1 COPYRIGHT
+
+CENA
+
+=head1 HISTORY
+
+=cut