1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
|
/**
* Ivy java Async API tester: bindAsyncMsg, sendAsyncMsg.
*
* @author Yannick Jestin <mailto:jestin@cena.fr>
*
* (c) CENA
*
* usage: java AsyncAPI -h
*
* This program tests the asynchronous reception of messages (each callback
* is performed in a separate thread). It also exhibits the behaviour of the
* library with regard to concurrent connections. To stress the test, try it
* with different JVMs (kaffe is especially hard to pass) and on SMP
* machines.
*
* changelog
*
* 1.2.6 : async sending seems utterly buggy ...
*
*/
import fr.dgac.ivy.*;
import gnu.getopt.*;
/**
 * Tester for the Ivy message binding API.
 *
 * <p>{@link #main(String[])} creates a receiver agent (an instance of this
 * class, named {@link #RECEIVENAME}) and a sender agent (named
 * {@link #SENDNAME}) in the same JVM. The sender emits numbered packets
 * {@code ASYNCPACKET<i> <payload>}; the receiver counts them, checks that the
 * i-th packet received carries the number i, and finally checks that the sum
 * of the received numbers equals 1 + 2 + ... + n.
 */
class AsyncAPI {

  /** Default payload size, in characters. */
  public static final int MSGSIZE = 10;
  /** Default number of packets to send. */
  public static final int NBITER = 100;
  /** Default time, in milliseconds, the receiver sleeps in each callback. */
  public static final int DELAYMS = 1000;
  /** Prefix of every packet sent on the bus. */
  public static final String HEADER = "ASYNCPACKET";
  /** Subscription regexp: group 1 is the packet number, group 2 the payload. */
  public static final String TOSUBSCRIBE = "^"+HEADER+"([0-9]+) (.*)";
  /** Ivy agent name of the receiver. */
  public static final String RECEIVENAME = "MSreceive";
  /** Ivy agent name of the sender; also used as program name for getopt and help. */
  public static final String SENDNAME = "MSsend";

  // NOTE(review): epoch and name are never read in this class.
  private static long epoch = System.currentTimeMillis();
  private Ivy bus;
  DelayAnswer delay;
  /** Value returned by the bind call (kept, but not used again in this class). */
  int re;
  private String name;
  boolean verbose;
  /** Number of packets the receiver expects. */
  private int nbpacket;
  /** Sleep duration, in milliseconds, applied in each callback. */
  private int wait=0;

  /**
   * Creates the receiver agent, binds it to {@link #TOSUBSCRIBE} and starts
   * it on the given bus.
   *
   * @param nb number of packets expected
   * @param domain bus domain handed to {@code Ivy.start}; may be null
   * @param d time in milliseconds to sleep in each callback
   * @param v true to print one line per received packet
   * @param async true to bind with {@code bindAsyncMsg}, false for {@code bindMsg}
   * @throws IvyException propagated from the Ivy bind/start calls
   */
  public AsyncAPI(int nb,String domain,int d, boolean v,boolean async) throws IvyException {
    verbose=v;
    nbpacket=nb;
    bus = new Ivy(RECEIVENAME,null, null);
    wait = d;
    delay=new DelayAnswer();
    if (async) {
      re = bus.bindAsyncMsg(TOSUBSCRIBE,delay,BindType.ASYNC);
    } else {
      re = bus.bindMsg(TOSUBSCRIBE,delay);
    }
    bus.start(domain);
  }

  private static java.text.DateFormat df = java.text.DateFormat.getTimeInstance();

  /**
   * Returns the current time formatted as {@code "[time] "}.
   *
   * <p>Synchronized because the shared {@code DateFormat} is not thread-safe
   * and this method is called both from the sender's main thread and from the
   * receiver's callback threads.
   */
  private static synchronized String date() {
    return "["+df.format(new java.util.Date())+"] ";
  }

  // NOTE(review): truc and status are not used anywhere in this class.
  static Object truc = new Object();
  /** Packets received so far, and running sum of their numbers. */
  volatile int count=0,total=0;
  int status = 0;

  /**
   * Accounts for one received packet.
   *
   * <p>The packet number is always parsed and accumulated; only the progress
   * line depends on {@code verbose}. If the packet number differs from the
   * running count, the known clients are listed, a die message is sent to the
   * emitter, the bus is stopped and the JVM exits with -1. Once
   * {@code nbpacket} packets have arrived, the sum of the numbers is compared
   * with n(n+1)/2 and the receiver leaves the bus on success.
   *
   * @param ic the client that emitted the message
   * @param args captured groups; {@code args[0]} is the packet number
   */
  synchronized void huh(IvyClient ic, String[] args) {
    count++;
    if (verbose) {
      System.out.println(date()+"RECEIVE "+ count+"/"+nbpacket+" packets received arg:("+args[0]+")");
    }
    // Accounting must not depend on verbosity, otherwise the final check can
    // never succeed in quiet mode.
    int nb = Integer.parseInt(args[0]);
    total+=nb;
    if (nb!=count) {
      System.out.println("RECEIVE *** ERROR *** "+count+"!="+nb+ " - probable double connexion");
      for (IvyClient i : bus.getIvyClients() ) {
        System.out.println("client: "+i);
      }
      ic.sendDie("nok, bye");
      bus.stop();
      System.exit(-1);
    }
    if (count<nbpacket) {
      return;
    }
    // Compute n(n+1)/2 in long so the product cannot overflow before the
    // division; narrowing to int then matches the wrap-around of `total`.
    int expected = (int) ((((long) nbpacket + 1) * nbpacket) / 2);
    if (total==expected) {
      System.out.println("RECEIVE receiver quitting the bus normally");
      ic.sendDie("ok, bye");
      bus.stop();
    } else {
      System.out.println("RECEIVE wrong count and total, hit ^C to exit");
      //System.exit(-1);
    }
  }

  /**
   * Message listener of the receiver: accounts for the packet through
   * {@link #huh(IvyClient, String[])}, then sleeps {@code wait} milliseconds.
   */
  class DelayAnswer implements IvyMessageListener {
    public void receive(IvyClient ic, String[] args) {
      huh(ic,args);
      try {
        System.out.println("RECEIVE Sleeping "+wait+"ms");
        Thread.sleep(wait);
        System.out.println("RECEIVE Finished Sleeping");
      } catch (InterruptedException ie) {
        System.out.println("RECEIVE Sleeping interrupted, not a problem");
      }
    }
  }

  /** Usage text printed for -h and for unknown options. */
  public static final String helpmsg = "usage: java "+SENDNAME+" [options]\n\t-b domain\n\t-r\t to enable async reception\n\t-x to enable async sending\n\t-l loop count (default "+NBITER+")\n\t-s msgsize\t int value (default "+MSGSIZE+")\n\t-d delay in milliseconds (default "+DELAYMS+")\n\t-q \tquiet\n\t-e\tsend bus stop at the end\n\t-h\thelp\n\n";

  /**
   * Parses the command line, starts the receiver and the sender on the same
   * bus, and sends the packets.
   *
   * @param args command line options, see {@link #helpmsg}
   * @throws IvyException propagated from the Ivy calls
   */
  public static void main(String[] args) throws IvyException {
    Getopt opt = new Getopt(SENDNAME,args,"b:l:d:s:xrqeh");
    String domain=null;
    int nb = NBITER, delay = DELAYMS, c, size = MSGSIZE;
    // NOTE(review): `exit` (-e) is parsed but not acted upon in this method.
    boolean doasyncSend=false, doasyncBind=false, verbose=true, exit=false;
    while ((c = opt.getopt()) != -1) {
      switch (c) {
      case 'b':
        domain=opt.getOptarg();
        break;
      case 's':
        size=Integer.parseInt(opt.getOptarg());
        break;
      case 'l':
        nb=Integer.parseInt(opt.getOptarg());
        break;
      case 'd':
        delay=Integer.parseInt(opt.getOptarg());
        break;
      case 'e':
        exit=true;
        break;
      case 'q':
        verbose=false;
        break;
      case 'x':
        // Async sending is disabled: the program exits before the flag is set.
        System.out.println("async sending is not robust enough. end of test.");
        System.exit(-1);
        doasyncSend=true;
        break;
      case 'r':
        doasyncBind=true;
        break;
      case 'h':
      default:
        System.out.println(helpmsg);
        System.exit(0);
      }
    }
    System.out.println("bus:"+domain+" loop:"+nb+" delay:"+delay+" verbose:"+verbose+" asyncBind:"+
        doasyncBind+" asyncSend:"+doasyncSend+" msgsize:"+size);
    // The receiver is created for its side effects (it joins the bus).
    AsyncAPI receiver = new AsyncAPI(nb,domain,delay,verbose,doasyncBind);
    Ivy mainbus = new Ivy(SENDNAME,null, null);
    mainbus.start(domain);
    // presumably 5000 is a timeout in milliseconds -- confirm against the Ivy API
    if ((mainbus.waitForClient(RECEIVENAME,5000))==null) {
      System.out.println(RECEIVENAME+" did not join the bus. Quitting");
      System.exit(-1);
    }
    System.out.println(RECEIVENAME+" is here, sending packets");
    // Build the payload once; it is the same for every packet.
    StringBuilder tosend = new StringBuilder(size);
    for (int i=0;i<size;i++) {
      tosend.append("a");
    }
    String payload = tosend.toString();
    for (int i=1;i<=nb;i++) {
      if (verbose) {
        System.out.println(date()+"SENDER sending packet "+i);
      }
      mainbus.sendMsg(HEADER+i+" "+payload);
      // TODO mainbus.sendMsg(HEADER+i+" "+payload,doasyncSend);
    }
    System.out.println(date()+"SENDER sender has sent all its packets, waiting for a die message");
    // The sender's bus is not stopped here, otherwise some packets
    // could still be unprocessed.
    // TODO regression test for Ivy.stop()
  }
}
|