7 /* multi-threaded QMQP test generator
10 /* \fBqmqp-source\fR [\fIoptions\fR] [\fBinet:\fR]\fIhost\fR[:\fIport\fR]
12 /* \fBqmqp-source\fR [\fIoptions\fR] \fBunix:\fIpathname\fR
14 /* \fBqmqp-source\fR connects to the named host and TCP port (default 628)
15 /* and sends one or more messages to it, either sequentially
16 /* or in parallel. The program speaks the QMQP protocol.
17 /* Connections can be made to UNIX-domain and IPv4 or IPv6 servers.
18 /* IPv4 and IPv6 are the default.
20 /* Note: this is an unsupported test program. No attempt is made
21 /* to maintain compatibility between successive versions.
25 /* Connect to the server with IPv4. This option has no effect when
26 /* Postfix is built without IPv6 support.
28 /* Connect to the server with IPv6. This option is not available when
29 /* Postfix is built without IPv6 support.
31 /* Display a running counter that is incremented each time
32 /* a delivery completes.
33 /* .IP "\fB-C \fIcount\fR"
34 /* When a host sends RESET instead of SYN|ACK, try \fIcount\fR times
35 /* before giving up. The default count is 1. Specify a larger count in
36 /* order to work around a problem with TCP/IP stacks that send RESET
37 /* when the listen queue is full.
38 /* .IP "\fB-f \fIfrom\fR"
39 /* Use the specified sender address (default: <foo@myhostname>).
40 /* .IP "\fB-l \fIlength\fR"
41 /* Send \fIlength\fR bytes as message payload. The length
42 /* includes the message headers.
43 /* .IP "\fB-m \fImessage_count\fR"
44 /* Send the specified number of messages (default: 1).
45 /* .IP "\fB-M \fImyhostname\fR"
46 /* Use the specified hostname or [address] in the default
47 /* sender and recipient addresses, instead of the machine
49 /* .IP "\fB-r \fIrecipient_count\fR"
50 /* Send the specified number of recipients per transaction (default: 1).
51 /* Recipient names are generated by prepending a number to the
53 /* .IP "\fB-s \fIsession_count\fR"
54 /* Run the specified number of QMQP sessions in parallel (default: 1).
55 /* .IP "\fB-t \fIto\fR"
56 /* Use the specified recipient address (default: <foo@myhostname>).
57 /* .IP "\fB-R \fIinterval\fR"
58 /* Wait for a random period of time 0 <= n <= interval between messages.
59 /* Suspending one thread does not affect other delivery threads.
61 /* Make the program more verbose, for debugging purposes.
62 /* .IP "\fB-w \fIinterval\fR"
63 /* Wait a fixed time between messages.
64 /* Suspending one thread does not affect other delivery threads.
66 /* qmqp-sink(1), QMQP message dump
70 /* The Secure Mailer license must be distributed with this software.
73 /* IBM T.J. Watson Research
75 /* Yorktown Heights, NY 10598, USA
81 #include <sys/socket.h>
83 #include <netinet/in.h>
91 /* Utility library. */
94 #include <msg_vstream.h>
97 #include <get_hostname.h>
100 #include <mymalloc.h>
103 #include <netstring.h>
104 #include <sane_connect.h>
105 #include <host_port.h>
106 #include <myaddrinfo.h>
107 #include <inet_proto.h>
108 #include <valid_hostname.h>
109 #include <valid_mailhost_addr.h>
111 /* Global library. */
113 #include <mail_date.h>
114 #include <qmqp_proto.h>
115 #include <mail_version.h>
117 /* Application-specific. */
120 * Per-session data structure with state.
122 * This software can maintain multiple parallel connections to the same QMQP
123 * server. However, it makes no more than one connection request at a time
124 * to avoid overwhelming the server with SYN packets and having to back off.
125 * Back-off would screw up the benchmark. Pending connection requests are
126 * kept in a linear list.
128 typedef struct SESSION
{
129 int xfer_count
; /* # of xfers in session */
130 int rcpt_done
; /* # of recipients done */
131 int rcpt_count
; /* # of recipients to go */
132 VSTREAM
*stream
; /* open connection */
133 int connect_count
; /* # of connect()s to retry */
134 struct SESSION
*next
; /* connect() queue linkage */
137 static SESSION
*last_session
; /* connect() queue tail */
139 static VSTRING
*buffer
;
140 static int var_line_limit
= 10240;
141 static int var_timeout
= 300;
142 static const char *var_myhostname
;
143 static int session_count
;
144 static int message_count
= 1;
145 static struct sockaddr_storage ss
;
148 static struct sockaddr_un sun
;
149 static struct sockaddr
*sa
;
150 static int sa_length
;
151 static int recipients
= 1;
152 static char *defaddr
;
153 static char *recipient
;
155 static int message_length
= 1024;
156 static int count
= 0;
157 static int counter
= 0;
158 static int connect_count
= 1;
159 static int random_delay
= 0;
160 static int fixed_delay
= 0;
161 static const char *mydate
;
164 static void enqueue_connect(SESSION
*);
165 static void start_connect(SESSION
*);
166 static void connect_done(int, char *);
168 static void send_data(SESSION
*);
169 static void receive_reply(int, char *);
171 static VSTRING
*message_buffer
;
172 static VSTRING
*sender_buffer
;
173 static VSTRING
*recipient_buffer
;
/* Shorthand for the string pointer and the length of a VSTRING. */

#define STR(vp)         vstring_str(vp)
#define LEN(vp)         VSTRING_LEN(vp)
/* random_interval - generate a random value in 0 .. (small) interval */

static int random_interval(int interval)
{

    /*
     * A non-positive interval leaves nothing to randomize. Returning 0 here
     * also avoids the division by zero that "interval + 1" would cause for
     * interval == -1.
     */
    if (interval <= 0)
        return (0);

    /*
     * rand() never exceeds RAND_MAX, so for interval >= RAND_MAX the modulo
     * is a no-op. Skipping it keeps "interval + 1" from overflowing when
     * interval == INT_MAX.
     */
    if (interval >= RAND_MAX)
        return (rand());

    return (rand() % (interval + 1));
}
187 /* socket_error - look up and reset the last socket error */
189 static int socket_error(int sock
)
192 SOCKOPT_SIZE error_len
;
195 * Some Solaris 2 versions have getsockopt() itself return the error,
196 * instead of returning it via the parameter list.
199 error_len
= sizeof(error
);
200 if (getsockopt(sock
, SOL_SOCKET
, SO_ERROR
, (char *) &error
, &error_len
) < 0)
213 /* exception_text - translate exceptions from the netstring module */
215 static char *exception_text(int except
)
220 case NETSTRING_ERR_EOF
:
221 return ("lost connection");
222 case NETSTRING_ERR_TIME
:
224 case NETSTRING_ERR_FORMAT
:
225 return ("netstring format error");
226 case NETSTRING_ERR_SIZE
:
227 return ("netstring size exceeds limit");
229 msg_panic("exception_text: unknown exception %d", except
);
234 /* startup - connect to server but do not wait */
236 static void startup(SESSION
*session
)
238 if (message_count
-- <= 0) {
239 myfree((char *) session
);
243 enqueue_connect(session
);
246 /* start_event - invoke startup from timer context */
248 static void start_event(int unused_event
, char *context
)
250 SESSION
*session
= (SESSION
*) context
;
255 /* start_another - start another session */
257 static void start_another(SESSION
*session
)
259 if (random_delay
> 0) {
260 event_request_timer(start_event
, (char *) session
,
261 random_interval(random_delay
));
262 } else if (fixed_delay
> 0) {
263 event_request_timer(start_event
, (char *) session
, fixed_delay
);
269 /* enqueue_connect - queue a connection request */
271 static void enqueue_connect(SESSION
*session
)
274 if (last_session
== 0) {
275 last_session
= session
;
276 start_connect(session
);
278 last_session
->next
= session
;
279 last_session
= session
;
283 /* dequeue_connect - connection request completed */
285 static void dequeue_connect(SESSION
*session
)
287 if (session
== last_session
) {
288 if (session
->next
!= 0)
289 msg_panic("dequeue_connect: queue ends after last");
292 if (session
->next
== 0)
293 msg_panic("dequeue_connect: queue ends before last");
294 start_connect(session
->next
);
298 /* fail_connect - handle failed startup */
300 static void fail_connect(SESSION
*session
)
302 if (session
->connect_count
-- == 1)
303 msg_fatal("connect: %m");
304 msg_warn("connect: %m");
305 event_disable_readwrite(vstream_fileno(session
->stream
));
306 vstream_fclose(session
->stream
);
308 #ifdef MISSING_USLEEP
313 start_connect(session
);
316 /* start_connect - start TCP handshake */
318 static void start_connect(SESSION
*session
)
321 struct linger linger
;
324 * Some systems don't set the socket error when connect() fails early
325 * (loopback) so we must deal with the error immediately, rather than
326 * retrieving it later with getsockopt(). We can't use MSG_PEEK to
327 * distinguish between server disconnect and connection refused.
329 if ((fd
= socket(sa
->sa_family
, SOCK_STREAM
, 0)) < 0)
330 msg_fatal("socket: %m");
331 (void) non_blocking(fd
, NON_BLOCKING
);
334 if (setsockopt(fd
, SOL_SOCKET
, SO_LINGER
, (char *) &linger
,
336 msg_warn("setsockopt SO_LINGER %d: %m", linger
.l_linger
);
337 session
->stream
= vstream_fdopen(fd
, O_RDWR
);
338 event_enable_write(fd
, connect_done
, (char *) session
);
339 netstring_setup(session
->stream
, var_timeout
);
340 if (sane_connect(fd
, sa
, sa_length
) < 0 && errno
!= EINPROGRESS
)
341 fail_connect(session
);
344 /* connect_done - send message sender info */
346 static void connect_done(int unused_event
, char *context
)
348 SESSION
*session
= (SESSION
*) context
;
349 int fd
= vstream_fileno(session
->stream
);
352 * Try again after some delay when the connection failed, in case they
353 * run a Mickey Mouse protocol stack.
355 if (socket_error(fd
) < 0) {
356 fail_connect(session
);
358 dequeue_connect(session
);
359 non_blocking(fd
, BLOCKING
);
360 event_disable_readwrite(fd
);
361 /* Avoid poor performance when TCP MSS > VSTREAM_BUFSIZE. */
362 if (sa
->sa_family
== AF_INET
364 || sa
->sa_family
== AF_INET6
367 vstream_tweak_tcp(session
->stream
);
372 /* send_data - send message+sender+recipients */
374 static void send_data(SESSION
*session
)
376 int fd
= vstream_fileno(session
->stream
);
380 * Prepare for disaster.
382 if ((except
= vstream_setjmp(session
->stream
)) != 0)
383 msg_fatal("%s while sending message", exception_text(except
));
386 * Send the message content, by wrapping three netstrings into an
387 * over-all netstring.
389 * XXX This should be done more carefully to avoid blocking when sending
390 * large messages over slow networks.
392 netstring_put_multi(session
->stream
,
393 STR(message_buffer
), LEN(message_buffer
),
394 STR(sender_buffer
), LEN(sender_buffer
),
395 STR(recipient_buffer
), LEN(recipient_buffer
),
397 netstring_fflush(session
->stream
);
400 * Wake me up when the server replies or when something bad happens.
402 event_enable_read(fd
, receive_reply
, (char *) session
);
405 /* receive_reply - read server reply */
407 static void receive_reply(int unused_event
, char *context
)
409 SESSION
*session
= (SESSION
*) context
;
413 * Prepare for disaster.
415 if ((except
= vstream_setjmp(session
->stream
)) != 0)
416 msg_fatal("%s while receiving server reply", exception_text(except
));
419 * Receive and process the server reply.
421 netstring_get(session
->stream
, buffer
, var_line_limit
);
423 vstream_printf("<< %.*s\n", (int) LEN(buffer
), STR(buffer
));
424 if (STR(buffer
)[0] != QMQP_STAT_OK
)
425 msg_fatal("%s error: %.*s",
426 STR(buffer
)[0] == QMQP_STAT_RETRY
? "recoverable" :
427 STR(buffer
)[0] == QMQP_STAT_HARD
? "unrecoverable" :
428 "unknown", (int) LEN(buffer
) - 1, STR(buffer
) + 1);
431 * Update the optional running counter.
435 vstream_printf("%d\r", counter
);
436 vstream_fflush(VSTREAM_OUT
);
440 * Finish this session. QMQP sends only one message per session.
442 event_disable_readwrite(vstream_fileno(session
->stream
));
443 vstream_fclose(session
->stream
);
445 start_another(session
);
/* usage - explain */

static void usage(char *myname)
{

    /*
     * Keep this text in sync with the getopt() option string in main():
     * every accepted option letter is listed, together with both endpoint
     * forms (TCP host[:port] and unix:pathname). Reporting goes through
     * msg_fatal().
     */
    msg_fatal("usage: %s -46cv -s sess -l msglen -m msgs -C count -M myhostname -f from -t to -r rcpts -R delay -w delay [inet:]host[:port] | unix:pathname", myname);
}
455 MAIL_VERSION_STAMP_DECLARE
;
457 /* main - parse JCL and start the machine */
459 int main(int argc
, char **argv
)
472 const char *parse_err
;
473 struct addrinfo
*res
;
475 const char *protocols
= INET_PROTO_NAME_ALL
;
476 INET_PROTO_INFO
*proto_info
;
479 * Fingerprint executables and core dumps.
481 MAIL_VERSION_STAMP_ALLOCATE
;
483 signal(SIGPIPE
, SIG_IGN
);
484 msg_vstream_init(argv
[0], VSTREAM_ERR
);
489 while ((ch
= GETOPT(argc
, argv
, "46cC:f:l:m:M:r:R:s:t:vw:")) > 0) {
492 protocols
= INET_PROTO_NAME_IPV4
;
495 protocols
= INET_PROTO_NAME_IPV6
;
501 if ((connect_count
= atoi(optarg
)) <= 0)
508 if ((message_length
= atoi(optarg
)) <= 0)
512 if ((message_count
= atoi(optarg
)) <= 0)
516 if (*optarg
== '[') {
517 if (!valid_mailhost_literal(optarg
, DO_GRIPE
))
518 msg_fatal("bad address literal: %s", optarg
);
520 if (!valid_hostname(optarg
, DO_GRIPE
))
521 msg_fatal("bad hostname: %s", optarg
);
523 var_myhostname
= optarg
;
526 if ((recipients
= atoi(optarg
)) <= 0)
530 if (fixed_delay
> 0 || (random_delay
= atoi(optarg
)) <= 0)
534 if ((sessions
= atoi(optarg
)) <= 0)
544 if (random_delay
> 0 || (fixed_delay
= atoi(optarg
)) <= 0)
551 if (argc
- optind
!= 1)
554 if (random_delay
> 0)
558 * Translate endpoint address to internal form.
560 proto_info
= inet_proto_init("protocols", protocols
);
561 if (strncmp(argv
[optind
], "unix:", 5) == 0) {
562 path
= argv
[optind
] + 5;
563 path_len
= strlen(path
);
564 if (path_len
>= (int) sizeof(sun
.sun_path
))
565 msg_fatal("unix-domain name too long: %s", path
);
566 memset((char *) &sun
, 0, sizeof(sun
));
567 sun
.sun_family
= AF_UNIX
;
569 sun
.sun_len
= path_len
+ 1;
571 memcpy(sun
.sun_path
, path
, path_len
);
572 sa
= (struct sockaddr
*) & sun
;
573 sa_length
= sizeof(sun
);
575 if (strncmp(argv
[optind
], "inet:", 5) == 0)
577 buf
= mystrdup(argv
[optind
]);
578 if ((parse_err
= host_port(buf
, &host
, (char *) 0, &port
, "628")) != 0)
579 msg_fatal("%s: %s", argv
[optind
], parse_err
);
580 if ((aierr
= hostname_to_sockaddr(host
, port
, SOCK_STREAM
, &res
)) != 0)
581 msg_fatal("%s: %s", argv
[optind
], MAI_STRERROR(aierr
));
583 sa
= (struct sockaddr
*) & ss
;
584 if (res
->ai_addrlen
> sizeof(ss
))
585 msg_fatal("address length %d > buffer length %d",
586 (int) res
->ai_addrlen
, (int) sizeof(ss
));
587 memcpy((char *) sa
, res
->ai_addr
, res
->ai_addrlen
);
588 sa_length
= res
->ai_addrlen
;
590 sa
->sa_len
= sa_length
;
596 * Allocate space for temporary buffer.
598 buffer
= vstring_alloc(100);
601 * Make sure we have sender and recipient addresses.
603 if (var_myhostname
== 0)
604 var_myhostname
= get_hostname();
605 if (sender
== 0 || recipient
== 0) {
606 vstring_sprintf(buffer
, "foo@%s", var_myhostname
);
607 defaddr
= mystrdup(vstring_str(buffer
));
615 * Prepare some results that may be used multiple times: the message
616 * content netstring, the sender netstring, and the recipient netstrings.
618 mydate
= mail_date(time((time_t *) 0));
621 message_buffer
= vstring_alloc(message_length
+ 200);
622 vstring_sprintf(buffer
,
623 "From: <%s>\nTo: <%s>\nDate: %s\nMessage-Id: <%d@%s>\n\n",
624 sender
, recipient
, mydate
, mypid
, var_myhostname
);
625 for (n
= 1; LEN(buffer
) < message_length
; n
++) {
626 for (i
= 0; i
< n
&& i
< 79; i
++)
627 VSTRING_ADDCH(buffer
, 'X');
628 VSTRING_ADDCH(buffer
, '\n');
630 STR(buffer
)[message_length
- 1] = '\n';
631 netstring_memcpy(message_buffer
, STR(buffer
), message_length
);
633 len
= strlen(sender
);
634 sender_buffer
= vstring_alloc(len
);
635 netstring_memcpy(sender_buffer
, sender
, len
);
637 if (recipients
== 1) {
638 len
= strlen(recipient
);
639 recipient_buffer
= vstring_alloc(len
);
640 netstring_memcpy(recipient_buffer
, recipient
, len
);
642 recipient_buffer
= vstring_alloc(100);
643 for (n
= 0; n
< recipients
; n
++) {
644 vstring_sprintf(buffer
, "%d%s", n
, recipient
);
645 netstring_memcat(recipient_buffer
, STR(buffer
), LEN(buffer
));
652 while (sessions
-- > 0) {
653 session
= (SESSION
*) mymalloc(sizeof(*session
));
655 session
->xfer_count
= 0;
656 session
->connect_count
= connect_count
;
663 if (session_count
<= 0 && message_count
<= 0) {
665 VSTREAM_PUTC('\n', VSTREAM_OUT
);
666 vstream_fflush(VSTREAM_OUT
);