7 /* active queue management
11 /* void qmgr_active_feed(scan_info, queue_id)
12 /* QMGR_SCAN *scan_info;
13 /* const char *queue_id;
15 /* void qmgr_active_drain()
17 /* int qmgr_active_done(message)
18 /* QMGR_MESSAGE *message;
20 /* These functions maintain the active message queue: the set
21 /* of messages that the queue manager is actually working on.
22 /* The active queue is limited in size. Messages are drained
23 /* from the active queue by allocating a delivery process and
24 /* by delivering mail via that process. Messages leak into the
25 /* active queue only when the active queue is small enough.
26 /* Damaged message files are saved to the "corrupt" directory.
28 /* qmgr_active_feed() inserts the named message file into
29 /* the active queue. Message files with the wrong name or
30 /* with other wrong properties are skipped but not removed.
31 /* The following queue flags are recognized, other flags being
34 /* Examine all queue files. Normally, deferred queue files with
35 /* future time stamps are ignored, and incoming queue files with
36 /* future time stamps are frowned upon.
38 /* qmgr_active_drain() allocates one delivery process.
39 /* Process allocation is asynchronous. Once the delivery
40 /* process is available, an attempt is made to deliver
41 /* a message via it. Message delivery is asynchronous, too.
43 /* qmgr_active_done() deals with a message after delivery
44 /* has been tried for all in-core recipients. If the message
45 /* was bounced, a bounce message is sent to the sender, or
46 /* to the Errors-To: address if one was specified.
47 /* If there are more on-file recipients, a new batch of
48 /* in-core recipients is read from the queue file. Otherwise,
49 /* if a delivery agent marked the queue file as corrupt,
50 /* the queue file is moved to the "corrupt" queue (surprise);
51 /* if at least one delivery failed, the message is moved
52 /* to the deferred queue. The time stamps of a deferred queue
53 /* file are set to the nearest wakeup time of its recipient
54 /* sites (if delivery failed due to a problem with a next-hop
55 /* host), are set into the future by the amount of time the
56 /* message was queued (per-message exponential backoff), or are set
57 /* into the future by a minimal backoff time, whichever is more.
58 /* The minimal_backoff_time parameter specifies the minimal
59 /* amount of time between delivery attempts; maximal_backoff_time
60 /* specifies an upper limit.
62 /* Fatal: queue file access failures, out of memory.
63 /* Panic: interface violations, internal consistency errors.
64 /* Warnings: corrupt message file. A corrupt message is saved
65 /* to the "corrupt" queue for further inspection.
69 /* The Secure Mailer license must be distributed with this software.
72 /* IBM T.J. Watson Research
74 /* Yorktown Heights, NY 10598, USA
88 #ifndef S_IRWXU /* What? no POSIX system? */
92 /* Utility library. */
101 #include <mail_params.h>
102 #include <mail_open_ok.h>
103 #include <mail_queue.h>
104 #include <recipient_list.h>
109 #include <rec_type.h>
110 #include <qmgr_user.h>
112 /* Application-specific. */
117 * A bunch of call-back routines.
119 static void qmgr_active_done_2_bounce_flush(int, char *);
120 static void qmgr_active_done_2_generic(QMGR_MESSAGE
*);
121 static void qmgr_active_done_3_defer_flush(int, char *);
122 static void qmgr_active_done_3_defer_warn(int, char *);
123 static void qmgr_active_done_3_generic(QMGR_MESSAGE
*);
125 /* qmgr_active_corrupt - move corrupted file out of the way */
127 static void qmgr_active_corrupt(const char *queue_id
)
129 const char *myname
= "qmgr_active_corrupt";
131 if (mail_queue_rename(queue_id
, MAIL_QUEUE_ACTIVE
, MAIL_QUEUE_CORRUPT
)) {
133 msg_fatal("%s: save corrupt file queue %s id %s: %m",
134 myname
, MAIL_QUEUE_ACTIVE
, queue_id
);
136 msg_warn("saving corrupt file \"%s\" from queue \"%s\" to queue \"%s\"",
137 queue_id
, MAIL_QUEUE_ACTIVE
, MAIL_QUEUE_CORRUPT
);
141 /* qmgr_active_defer - defer queue file */
143 static void qmgr_active_defer(const char *queue_name
, const char *queue_id
,
144 const char *dest_queue
, int delay
)
146 const char *myname
= "qmgr_active_defer";
151 msg_info("wakeup %s after %ld secs", queue_id
, (long) delay
);
153 tbuf
.actime
= tbuf
.modtime
= event_time() + delay
;
154 path
= mail_queue_path((VSTRING
*) 0, queue_name
, queue_id
);
155 if (utime(path
, &tbuf
) < 0 && errno
!= ENOENT
)
156 msg_fatal("%s: update %s time stamps: %m", myname
, path
);
157 if (mail_queue_rename(queue_id
, queue_name
, dest_queue
)) {
159 msg_fatal("%s: rename %s from %s to %s: %m", myname
,
160 queue_id
, queue_name
, dest_queue
);
161 msg_warn("%s: rename %s from %s to %s: %m", myname
,
162 queue_id
, queue_name
, dest_queue
);
163 } else if (msg_verbose
) {
164 msg_info("%s: defer %s", myname
, queue_id
);
168 /* qmgr_active_feed - feed one message into active queue */
170 int qmgr_active_feed(QMGR_SCAN
*scan_info
, const char *queue_id
)
172 const char *myname
= "qmgr_active_feed";
173 QMGR_MESSAGE
*message
;
177 if (strcmp(scan_info
->queue
, MAIL_QUEUE_ACTIVE
) == 0)
178 msg_panic("%s: bad queue %s", myname
, scan_info
->queue
);
180 msg_info("%s: queue %s", myname
, scan_info
->queue
);
183 * Make sure this is something we are willing to open.
185 if (mail_open_ok(scan_info
->queue
, queue_id
, &st
, &path
) == MAIL_OPEN_NO
)
189 msg_info("%s: %s", myname
, path
);
192 * Skip files that have time stamps into the future. They need to cool
193 * down. Incoming and deferred files can have future time stamps.
195 if ((scan_info
->flags
& QMGR_SCAN_ALL
) == 0
196 && st
.st_mtime
> time((time_t *) 0) + 1) {
198 msg_info("%s: skip %s (%ld seconds)", myname
, queue_id
,
199 (long) (st
.st_mtime
- event_time()));
204 * Move the message to the active queue. File access errors are fatal.
206 if (mail_queue_rename(queue_id
, scan_info
->queue
, MAIL_QUEUE_ACTIVE
)) {
208 msg_fatal("%s: %s: rename from %s to %s: %m", myname
,
209 queue_id
, scan_info
->queue
, MAIL_QUEUE_ACTIVE
);
210 msg_warn("%s: %s: rename from %s to %s: %m", myname
,
211 queue_id
, scan_info
->queue
, MAIL_QUEUE_ACTIVE
);
216 * Extract envelope information: sender and recipients. At this point,
217 * mail addresses have been processed by the cleanup service so they
218 * should be in canonical form. Generate requests to deliver this
221 * Throwing away queue files seems bad, especially when they made it this
222 * far into the mail system. Therefore we save bad files to a separate
223 * directory for further inspection.
225 * After queue manager restart it is possible that a queue file is still
226 * being delivered. In that case (the file is locked), defer delivery by
227 * a minimal amount of time.
229 #define QMGR_FLUSH_AFTER (QMGR_FLUSH_EACH | QMGR_FLUSH_DFXP)
231 if ((message
= qmgr_message_alloc(MAIL_QUEUE_ACTIVE
, queue_id
,
232 (st
.st_mode
& MAIL_QUEUE_STAT_UNTHROTTLE
) ?
233 scan_info
->flags
| QMGR_FLUSH_AFTER
:
235 (st
.st_mode
& MAIL_QUEUE_STAT_UNTHROTTLE
) ?
236 st
.st_mode
& ~MAIL_QUEUE_STAT_UNTHROTTLE
:
238 qmgr_active_corrupt(queue_id
);
240 } else if (message
== QMGR_MESSAGE_LOCKED
) {
241 qmgr_active_defer(MAIL_QUEUE_ACTIVE
, queue_id
, MAIL_QUEUE_INCOMING
, 60);
246 * Special case if all recipients were already delivered. Send any
247 * bounces and clean up.
249 if (message
->refcount
== 0)
250 qmgr_active_done(message
);
255 /* qmgr_active_done - dispose of message after recipients have been tried */
257 void qmgr_active_done(QMGR_MESSAGE
*message
)
259 const char *myname
= "qmgr_active_done";
263 msg_info("%s: %s", myname
, message
->queue_id
);
266 * During a previous iteration, an attempt to bounce this message may
267 * have failed, so there may still be a bounce log lying around. XXX By
268 * groping around in the bounce queue, we're trespassing on the bounce
269 * service's territory. But doing so is more robust than depending on the
270 * bounce daemon to do the lookup for us, and for us to do the deleting
271 * after we have received a successful status from the bounce service.
272 * The bounce queue directory blocks are most likely in memory anyway. If
273 * these lookups become a performance problem we will have to build an
274 * in-core cache into the bounce daemon.
276 * Don't bounce when the bounce log is empty. The bounce process obviously
277 * failed, and the delivery agent will have requested that the message be
280 * Bounces are sent asynchronously to avoid stalling while the cleanup
281 * daemon waits for the qmgr to accept the "new mail" trigger.
283 * See also code in cleanup_bounce.c.
285 if (stat(mail_queue_path((VSTRING
*) 0, MAIL_QUEUE_BOUNCE
, message
->queue_id
), &st
) == 0) {
286 if (st
.st_size
== 0) {
287 if (mail_queue_remove(MAIL_QUEUE_BOUNCE
, message
->queue_id
))
288 msg_fatal("remove %s %s: %m",
289 MAIL_QUEUE_BOUNCE
, message
->queue_id
);
292 msg_info("%s: bounce %s", myname
, message
->queue_id
);
293 if (message
->verp_delims
== 0 || var_verp_bounce_off
)
294 abounce_flush(BOUNCE_FLAG_KEEP
,
301 qmgr_active_done_2_bounce_flush
,
304 abounce_flush_verp(BOUNCE_FLAG_KEEP
,
311 message
->verp_delims
,
312 qmgr_active_done_2_bounce_flush
,
319 * Asynchronous processing does not reach this point.
321 qmgr_active_done_2_generic(message
);
324 /* qmgr_active_done_2_bounce_flush - process abounce_flush() status */
326 static void qmgr_active_done_2_bounce_flush(int status
, char *context
)
328 QMGR_MESSAGE
*message
= (QMGR_MESSAGE
*) context
;
331 * Process abounce_flush() status and continue processing.
333 message
->flags
|= status
;
334 qmgr_active_done_2_generic(message
);
337 /* qmgr_active_done_2_generic - continue processing */
339 static void qmgr_active_done_2_generic(QMGR_MESSAGE
*message
)
341 const char *myname
= "qmgr_active_done_2_generic";
347 * A delivery agent marks a queue file as corrupt by changing its
348 * attributes, and by pretending that delivery was deferred.
351 && mail_open_ok(MAIL_QUEUE_ACTIVE
, message
->queue_id
, &st
, &path
) == MAIL_OPEN_NO
) {
352 qmgr_active_corrupt(message
->queue_id
);
353 qmgr_message_free(message
);
358 * If we did not read all recipients from this file, go read some more,
359 * but remember whether some recipients have to be tried again.
361 * Throwing away queue files seems bad, especially when they made it this
362 * far into the mail system. Therefore we save bad files to a separate
363 * directory for further inspection by a human being.
365 if (message
->rcpt_offset
> 0) {
366 if (qmgr_message_realloc(message
) == 0) {
367 qmgr_active_corrupt(message
->queue_id
);
368 qmgr_message_free(message
);
370 if (message
->refcount
== 0)
371 qmgr_active_done(message
); /* recurse for consistency */
377 * As a temporary implementation, synchronously inform the sender of
378 * trace information. This will block for 10 seconds when the qmgr FIFO
381 * XXX With multi-recipient mail, some recipients may have NOTIFY=SUCCESS
382 * and others not. Depending on what subset of recipients are delivered,
383 * a trace file may or may not be created. Even when the last partial
384 * delivery attempt had no NOTIFY=SUCCESS recipients, a trace file may
385 * still exist from a previous partial delivery attempt. So as long as
386 * any recipient has NOTIFY=SUCCESS we have to always look for the trace
387 * file and be prepared for the file not to exist.
389 * See also comments in bounce/bounce_notify_util.c.
391 if ((message
->tflags
& (DEL_REQ_FLAG_USR_VRFY
| DEL_REQ_FLAG_RECORD
))
392 || (message
->rflags
& QMGR_READ_FLAG_NOTIFY_SUCCESS
)) {
393 status
= trace_flush(message
->tflags
,
400 if (status
== 0 && message
->tflags_offset
)
401 qmgr_message_kill_record(message
, message
->tflags_offset
);
402 message
->flags
|= status
;
406 * If we get to this point we have tried all recipients for this message.
407 * If the message is too old, try to bounce it.
409 * Bounces are sent asynchronously to avoid stalling while the cleanup
410 * daemon waits for the qmgr to accept the "new mail" trigger.
412 if (message
->flags
) {
413 if (event_time() >= message
->create_time
+
414 (*message
->sender
? var_max_queue_time
: var_dsn_queue_time
)) {
415 msg_info("%s: from=<%s>, status=expired, returned to sender",
416 message
->queue_id
, message
->sender
);
417 if (message
->verp_delims
== 0 || var_verp_bounce_off
)
418 adefer_flush(BOUNCE_FLAG_KEEP
,
425 qmgr_active_done_3_defer_flush
,
428 adefer_flush_verp(BOUNCE_FLAG_KEEP
,
435 message
->verp_delims
,
436 qmgr_active_done_3_defer_flush
,
439 } else if (message
->warn_time
> 0
440 && event_time() >= message
->warn_time
- 1) {
442 msg_info("%s: sending defer warning for %s", myname
, message
->queue_id
);
443 adefer_warn(BOUNCE_FLAG_KEEP
,
450 qmgr_active_done_3_defer_warn
,
457 * Asynchronous processing does not reach this point.
459 qmgr_active_done_3_generic(message
);
462 /* qmgr_active_done_3_defer_warn - continue after adefer_warn() completion */
464 static void qmgr_active_done_3_defer_warn(int status
, char *context
)
466 QMGR_MESSAGE
*message
= (QMGR_MESSAGE
*) context
;
469 * Process adefer_warn() completion status and continue processing.
472 qmgr_message_update_warn(message
);
473 qmgr_active_done_3_generic(message
);
476 /* qmgr_active_done_3_defer_flush - continue after adefer_flush() completion */
478 static void qmgr_active_done_3_defer_flush(int status
, char *context
)
480 QMGR_MESSAGE
*message
= (QMGR_MESSAGE
*) context
;
483 * Process adefer_flush() status and continue processing.
485 message
->flags
= status
;
486 qmgr_active_done_3_generic(message
);
489 /* qmgr_active_done_3_generic - continue processing */
491 static void qmgr_active_done_3_generic(QMGR_MESSAGE
*message
)
493 const char *myname
= "qmgr_active_done_3_generic";
497 * Some recipients need to be tried again. Move the queue file time
498 * stamps into the future by the amount of time that the message is
499 * delayed, and move the message to the deferred queue. Impose minimal
500 * and maximal backoff times.
502 * Since we look at actual time in queue, not time since last delivery
503 * attempt, backoff times will be distributed. However, we can still see
504 * spikes in delivery activity because the interval between deferred
505 * queue scans is finite.
507 if (message
->flags
) {
508 if (message
->create_time
> 0) {
509 delay
= event_time() - message
->create_time
;
510 if (delay
> var_max_backoff_time
)
511 delay
= var_max_backoff_time
;
512 if (delay
< var_min_backoff_time
)
513 delay
= var_min_backoff_time
;
515 delay
= var_min_backoff_time
;
517 qmgr_active_defer(message
->queue_name
, message
->queue_id
,
518 MAIL_QUEUE_DEFERRED
, delay
);
522 * All recipients done. Remove the queue file.
525 if (mail_queue_remove(message
->queue_name
, message
->queue_id
)) {
527 msg_fatal("%s: remove %s from %s: %m", myname
,
528 message
->queue_id
, message
->queue_name
);
529 msg_warn("%s: remove %s from %s: %m", myname
,
530 message
->queue_id
, message
->queue_name
);
532 /* Same format as logged by postsuper. */
533 msg_info("%s: removed", message
->queue_id
);
538 * Finally, delete the in-core message structure.
540 qmgr_message_free(message
);
543 /* qmgr_active_drain - drain active queue by allocating a delivery process */
545 void qmgr_active_drain(void)
547 QMGR_TRANSPORT
*transport
;
550 * Allocate one delivery process for every transport with pending mail.
551 * The process allocation completes asynchronously.
553 while ((transport
= qmgr_transport_select()) != 0) {
555 msg_info("qmgr_active_drain: allocate %s", transport
->name
);
556 qmgr_transport_alloc(transport
, qmgr_deliver
);