Expand PMF_FN_* macros.
[netbsd-mini2440.git] / external / ibm-public / postfix / dist / src / oqmgr / qmgr_queue.c
blobfe8727fdc9da03065d0a21fb231389725ef5e1af
1 /* $NetBSD$ */
3 /*++
4 /* NAME
5 /* qmgr_queue 3
6 /* SUMMARY
7 /* per-destination queues
8 /* SYNOPSIS
9 /* #include "qmgr.h"
11 /* int qmgr_queue_count;
13 /* QMGR_QUEUE *qmgr_queue_create(transport, name, nexthop)
14 /* QMGR_TRANSPORT *transport;
15 /* const char *name;
16 /* const char *nexthop;
18 /* void qmgr_queue_done(queue)
19 /* QMGR_QUEUE *queue;
21 /* QMGR_QUEUE *qmgr_queue_find(transport, name)
22 /* QMGR_TRANSPORT *transport;
23 /* const char *name;
25 /* QMGR_QUEUE *qmgr_queue_select(transport)
26 /* QMGR_TRANSPORT *transport;
28 /* void qmgr_queue_throttle(queue, dsn)
29 /* QMGR_QUEUE *queue;
30 /* DSN *dsn;
32 /* void qmgr_queue_unthrottle(queue)
33 /* QMGR_QUEUE *queue;
35 /* void qmgr_queue_suspend(queue, delay)
36 /* QMGR_QUEUE *queue;
37 /* int delay;
38 /* DESCRIPTION
39 /* These routines add/delete/manipulate per-destination queues.
40 /* Each queue corresponds to a specific transport and destination.
41 /* Each queue has a `todo' list of delivery requests for that
42 /* destination, and a `busy' list of delivery requests in progress.
44 /* qmgr_queue_count is a global counter for the total number
45 /* of in-core queue structures.
47 /* qmgr_queue_create() creates an empty named queue for the named
48 /* transport and destination. The queue is given an initial
49 /* concurrency limit as specified with the
50 /* \fIinitial_destination_concurrency\fR configuration parameter,
51 /* provided that it does not exceed the transport-specific
52 /* concurrency limit.
54 /* qmgr_queue_done() disposes of a per-destination queue after all
55 /* its entries have been taken care of. It is an error to dispose
56 /* of a dead queue.
58 /* qmgr_queue_find() looks up the named queue for the named
59 /* transport. A null result means that the queue was not found.
61 /* qmgr_queue_select() uses a round-robin strategy to select
62 /* from the named transport one per-destination queue with a
63 /* non-empty `todo' list.
65 /* qmgr_queue_throttle() handles a delivery error, and decrements the
66 /* concurrency limit for the destination, with a lower bound of 1.
67 /* When the cohort failure bound is reached, qmgr_queue_throttle()
68 /* sets the concurrency limit to zero and starts a timer
69 /* to re-enable delivery to the destination after a configurable delay.
71 /* qmgr_queue_unthrottle() undoes qmgr_queue_throttle()'s effects.
72 /* The concurrency limit for the destination is incremented,
73 /* provided that it does not exceed the destination concurrency
74 /* limit specified for the transport. This routine implements
75 /* "slow open" mode, and eliminates the "thundering herd" problem.
77 /* qmgr_queue_suspend() suspends delivery for this destination
78 /* briefly.
79 /* DIAGNOSTICS
80 /* Panic: consistency check failure.
81 /* LICENSE
82 /* .ad
83 /* .fi
84 /* The Secure Mailer license must be distributed with this software.
85 /* AUTHOR(S)
86 /* Wietse Venema
87 /* IBM T.J. Watson Research
88 /* P.O. Box 704
89 /* Yorktown Heights, NY 10598, USA
90 /*--*/
92 /* System library. */
94 #include <sys_defs.h>
95 #include <time.h>
97 /* Utility library. */
99 #include <msg.h>
100 #include <mymalloc.h>
101 #include <events.h>
102 #include <htable.h>
104 /* Global library. */
106 #include <mail_params.h>
107 #include <recipient_list.h>
108 #include <mail_proto.h> /* QMGR_LOG_WINDOW */
110 /* Application-specific. */
112 #include "qmgr.h"
114 int qmgr_queue_count;
116 #define QMGR_ERROR_OR_RETRY_QUEUE(queue) \
117 (strcmp(queue->transport->name, MAIL_SERVICE_RETRY) == 0 \
118 || strcmp(queue->transport->name, MAIL_SERVICE_ERROR) == 0)
120 #define QMGR_LOG_FEEDBACK(feedback) \
121 if (var_conc_feedback_debug && !QMGR_ERROR_OR_RETRY_QUEUE(queue)) \
122 msg_info("%s: feedback %g", myname, feedback);
124 #define QMGR_LOG_WINDOW(queue) \
125 if (var_conc_feedback_debug && !QMGR_ERROR_OR_RETRY_QUEUE(queue)) \
126 msg_info("%s: queue %s: limit %d window %d success %g failure %g fail_cohorts %g", \
127 myname, queue->name, queue->transport->dest_concurrency_limit, \
128 queue->window, queue->success, queue->failure, queue->fail_cohorts);
130 /* qmgr_queue_resume - resume delivery to destination */
132 static void qmgr_queue_resume(int event, char *context)
134 QMGR_QUEUE *queue = (QMGR_QUEUE *) context;
135 const char *myname = "qmgr_queue_resume";
138 * Sanity checks.
140 if (!QMGR_QUEUE_SUSPENDED(queue))
141 msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue));
144 * We can't simply force delivery on this queue: the transport's pending
145 * count may already be maxed out, and there may be other constraints
146 * that definitely should be none of our business. The best we can do is
147 * to play by the same rules as everyone else: let qmgr_active_drain()
148 * and round-robin selection take care of message selection.
150 queue->window = 1;
153 * Every event handler that leaves a queue in the "ready" state should
154 * remove the queue when it is empty.
156 if (QMGR_QUEUE_READY(queue) && queue->todo.next == 0 && queue->busy.next == 0)
157 qmgr_queue_done(queue);
160 /* qmgr_queue_suspend - briefly suspend a destination */
162 void qmgr_queue_suspend(QMGR_QUEUE *queue, int delay)
164 const char *myname = "qmgr_queue_suspend";
167 * Sanity checks.
169 if (!QMGR_QUEUE_READY(queue))
170 msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue));
171 if (queue->busy_refcount > 0)
172 msg_panic("%s: queue is busy", myname);
175 * Set the queue status to "suspended". No-one is supposed to remove a
176 * queue in suspended state.
178 queue->window = QMGR_QUEUE_STAT_SUSPENDED;
179 event_request_timer(qmgr_queue_resume, (char *) queue, delay);
182 /* qmgr_queue_unthrottle_wrapper - in case (char *) != (struct *) */
184 static void qmgr_queue_unthrottle_wrapper(int unused_event, char *context)
186 QMGR_QUEUE *queue = (QMGR_QUEUE *) context;
189 * This routine runs when a wakeup timer goes off; it does not run in the
190 * context of some queue manipulation. Therefore, it is safe to discard
191 * this in-core queue when it is empty and when this site is not dead.
193 qmgr_queue_unthrottle(queue);
194 if (QMGR_QUEUE_READY(queue) && queue->todo.next == 0 && queue->busy.next == 0)
195 qmgr_queue_done(queue);
198 /* qmgr_queue_unthrottle - give this destination another chance */
200 void qmgr_queue_unthrottle(QMGR_QUEUE *queue)
202 const char *myname = "qmgr_queue_unthrottle";
203 QMGR_TRANSPORT *transport = queue->transport;
204 double feedback;
206 if (msg_verbose)
207 msg_info("%s: queue %s", myname, queue->name);
210 * Sanity checks.
212 if (!QMGR_QUEUE_THROTTLED(queue) && !QMGR_QUEUE_READY(queue))
213 msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue));
216 * Don't restart the negative feedback hysteresis cycle with every
217 * positive feedback. Restart it only when we make a positive concurrency
218 * adjustment (i.e. at the end of a positive feedback hysteresis cycle).
219 * Otherwise negative feedback would be too aggressive: negative feedback
220 * takes effect immediately at the start of its hysteresis cycle.
222 queue->fail_cohorts = 0;
225 * Special case when this site was dead.
227 if (QMGR_QUEUE_THROTTLED(queue)) {
228 event_cancel_timer(qmgr_queue_unthrottle_wrapper, (char *) queue);
229 if (queue->dsn == 0)
230 msg_panic("%s: queue %s: window 0 status 0", myname, queue->name);
231 dsn_free(queue->dsn);
232 queue->dsn = 0;
233 /* Back from the almost grave, best concurrency is anyone's guess. */
234 if (queue->busy_refcount > 0)
235 queue->window = queue->busy_refcount;
236 else
237 queue->window = transport->init_dest_concurrency;
238 queue->success = queue->failure = 0;
239 QMGR_LOG_WINDOW(queue);
240 return;
244 * Increase the destination's concurrency limit until we reach the
245 * transport's concurrency limit. Allow for a margin the size of the
246 * initial destination concurrency, so that we're not too gentle.
248 * Why is the concurrency increment based on preferred concurrency and not
249 * on the number of outstanding delivery requests? The latter fluctuates
250 * wildly when deliveries complete in bursts (artificial benchmark
251 * measurements), and does not account for cached connections.
253 * Keep the window within reasonable distance from actual concurrency
254 * otherwise negative feedback will be ineffective. This expression
255 * assumes that busy_refcount changes gradually. This is invalid when
256 * deliveries complete in bursts (artificial benchmark measurements).
258 if (transport->dest_concurrency_limit == 0
259 || transport->dest_concurrency_limit > queue->window)
260 if (queue->window < queue->busy_refcount + transport->init_dest_concurrency) {
261 feedback = QMGR_FEEDBACK_VAL(transport->pos_feedback, queue->window);
262 QMGR_LOG_FEEDBACK(feedback);
263 queue->success += feedback;
264 /* Prepare for overshoot (feedback > hysteresis, rounding error). */
265 while (queue->success + feedback / 2 >= transport->pos_feedback.hysteresis) {
266 queue->window += transport->pos_feedback.hysteresis;
267 queue->success -= transport->pos_feedback.hysteresis;
268 queue->failure = 0;
270 /* Prepare for overshoot. */
271 if (transport->dest_concurrency_limit > 0
272 && queue->window > transport->dest_concurrency_limit)
273 queue->window = transport->dest_concurrency_limit;
275 QMGR_LOG_WINDOW(queue);
278 /* qmgr_queue_throttle - handle destination delivery failure */
280 void qmgr_queue_throttle(QMGR_QUEUE *queue, DSN *dsn)
282 const char *myname = "qmgr_queue_throttle";
283 QMGR_TRANSPORT *transport = queue->transport;
284 double feedback;
287 * Sanity checks.
289 if (!QMGR_QUEUE_READY(queue))
290 msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue));
291 if (queue->dsn)
292 msg_panic("%s: queue %s: spurious reason %s",
293 myname, queue->name, queue->dsn->reason);
294 if (msg_verbose)
295 msg_info("%s: queue %s: %s %s",
296 myname, queue->name, dsn->status, dsn->reason);
299 * Don't restart the positive feedback hysteresis cycle with every
300 * negative feedback. Restart it only when we make a negative concurrency
301 * adjustment (i.e. at the start of a negative feedback hysteresis
302 * cycle). Otherwise positive feedback would be too weak (positive
303 * feedback does not take effect until the end of its hysteresis cycle).
307 * This queue is declared dead after a configurable number of
308 * pseudo-cohort failures.
310 if (QMGR_QUEUE_READY(queue)) {
311 queue->fail_cohorts += 1.0 / queue->window;
312 if (transport->fail_cohort_limit > 0
313 && queue->fail_cohorts >= transport->fail_cohort_limit)
314 queue->window = QMGR_QUEUE_STAT_THROTTLED;
318 * Decrease the destination's concurrency limit until we reach 1. Base
319 * adjustments on the concurrency limit itself, instead of using the
320 * actual concurrency. The latter fluctuates wildly when deliveries
321 * complete in bursts (artificial benchmark measurements).
323 * Even after reaching 1, we maintain the negative hysteresis cycle so that
324 * negative feedback can cancel out positive feedback.
326 if (QMGR_QUEUE_READY(queue)) {
327 feedback = QMGR_FEEDBACK_VAL(transport->neg_feedback, queue->window);
328 QMGR_LOG_FEEDBACK(feedback);
329 queue->failure -= feedback;
330 /* Prepare for overshoot (feedback > hysteresis, rounding error). */
331 while (queue->failure - feedback / 2 < 0) {
332 queue->window -= transport->neg_feedback.hysteresis;
333 queue->success = 0;
334 queue->failure += transport->neg_feedback.hysteresis;
336 /* Prepare for overshoot. */
337 if (queue->window < 1)
338 queue->window = 1;
342 * Special case for a site that just was declared dead.
344 if (QMGR_QUEUE_THROTTLED(queue)) {
345 queue->dsn = DSN_COPY(dsn);
346 event_request_timer(qmgr_queue_unthrottle_wrapper,
347 (char *) queue, var_min_backoff_time);
348 queue->dflags = 0;
350 QMGR_LOG_WINDOW(queue);
353 /* qmgr_queue_select - select in-core queue for delivery */
355 QMGR_QUEUE *qmgr_queue_select(QMGR_TRANSPORT *transport)
357 QMGR_QUEUE *queue;
360 * If we find a suitable site, rotate the list to enforce round-robin
361 * selection. See similar selection code in qmgr_transport_select().
363 for (queue = transport->queue_list.next; queue; queue = queue->peers.next) {
364 if (queue->window > queue->busy_refcount && queue->todo.next != 0) {
365 QMGR_LIST_ROTATE(transport->queue_list, queue);
366 if (msg_verbose)
367 msg_info("qmgr_queue_select: %s", queue->name);
368 return (queue);
371 return (0);
374 /* qmgr_queue_done - delete in-core queue for site */
376 void qmgr_queue_done(QMGR_QUEUE *queue)
378 const char *myname = "qmgr_queue_done";
379 QMGR_TRANSPORT *transport = queue->transport;
382 * Sanity checks. It is an error to delete an in-core queue with pending
383 * messages or timers.
385 if (queue->busy_refcount != 0 || queue->todo_refcount != 0)
386 msg_panic("%s: refcount: %d", myname,
387 queue->busy_refcount + queue->todo_refcount);
388 if (queue->todo.next || queue->busy.next)
389 msg_panic("%s: queue not empty: %s", myname, queue->name);
390 if (!QMGR_QUEUE_READY(queue))
391 msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue));
392 if (queue->dsn)
393 msg_panic("%s: queue %s: spurious reason %s",
394 myname, queue->name, queue->dsn->reason);
397 * Clean up this in-core queue.
399 QMGR_LIST_UNLINK(transport->queue_list, QMGR_QUEUE *, queue);
400 htable_delete(transport->queue_byname, queue->name, (void (*) (char *)) 0);
401 myfree(queue->name);
402 myfree(queue->nexthop);
403 qmgr_queue_count--;
404 myfree((char *) queue);
407 /* qmgr_queue_create - create in-core queue for site */
409 QMGR_QUEUE *qmgr_queue_create(QMGR_TRANSPORT *transport, const char *name,
410 const char *nexthop)
412 QMGR_QUEUE *queue;
415 * If possible, choose an initial concurrency of > 1 so that one bad
416 * message or one bad network won't slow us down unnecessarily.
419 queue = (QMGR_QUEUE *) mymalloc(sizeof(QMGR_QUEUE));
420 qmgr_queue_count++;
421 queue->dflags = 0;
422 queue->last_done = 0;
423 queue->name = mystrdup(name);
424 queue->nexthop = mystrdup(nexthop);
425 queue->todo_refcount = 0;
426 queue->busy_refcount = 0;
427 queue->transport = transport;
428 queue->window = transport->init_dest_concurrency;
429 queue->success = queue->failure = queue->fail_cohorts = 0;
430 QMGR_LIST_INIT(queue->todo);
431 QMGR_LIST_INIT(queue->busy);
432 queue->dsn = 0;
433 queue->clog_time_to_warn = 0;
434 QMGR_LIST_PREPEND(transport->queue_list, queue);
435 htable_enter(transport->queue_byname, name, (char *) queue);
436 return (queue);
439 /* qmgr_queue_find - find in-core named queue */
441 QMGR_QUEUE *qmgr_queue_find(QMGR_TRANSPORT *transport, const char *name)
443 return ((QMGR_QUEUE *) htable_find(transport->queue_byname, name));