No empty .Rs/.Re
[netbsd-mini2440.git] / external / ibm-public / postfix / dist / src / qmgr / qmgr_queue.c
blob6069ef39538bc4ce57e0a2f4524e85a2c49c139e
1 /* $NetBSD$ */
3 /*++
4 /* NAME
5 /* qmgr_queue 3
6 /* SUMMARY
7 /* per-destination queues
8 /* SYNOPSIS
9 /* #include "qmgr.h"
11 /* int qmgr_queue_count;
13 /* QMGR_QUEUE *qmgr_queue_create(transport, name, nexthop)
14 /* QMGR_TRANSPORT *transport;
15 /* const char *name;
16 /* const char *nexthop;
18 /* void qmgr_queue_done(queue)
19 /* QMGR_QUEUE *queue;
21 /* QMGR_QUEUE *qmgr_queue_find(transport, name)
22 /* QMGR_TRANSPORT *transport;
23 /* const char *name;
25 /* void qmgr_queue_throttle(queue, dsn)
26 /* QMGR_QUEUE *queue;
27 /* DSN *dsn;
29 /* void qmgr_queue_unthrottle(queue)
30 /* QMGR_QUEUE *queue;
32 /* void qmgr_queue_suspend(queue, delay)
33 /* QMGR_QUEUE *queue;
34 /* int delay;
35 /* DESCRIPTION
36 /* These routines add/delete/manipulate per-destination queues.
37 /* Each queue corresponds to a specific transport and destination.
38 /* Each queue has a `todo' list of delivery requests for that
39 /* destination, and a `busy' list of delivery requests in progress.
41 /* qmgr_queue_count is a global counter for the total number
42 /* of in-core queue structures.
44 /* qmgr_queue_create() creates an empty named queue for the named
45 /* transport and destination. The queue is given an initial
46 /* concurrency limit as specified with the
47 /* \fIinitial_destination_concurrency\fR configuration parameter,
48 /* provided that it does not exceed the transport-specific
49 /* concurrency limit.
51 /* qmgr_queue_done() disposes of a per-destination queue after all
52 /* its entries have been taken care of. It is an error to dispose
53 /* of a dead queue.
55 /* qmgr_queue_find() looks up the named queue for the named
56 /* transport. A null result means that the queue was not found.
58 /* qmgr_queue_throttle() handles a delivery error, and decrements the
59 /* concurrency limit for the destination, with a lower bound of 1.
60 /* When the cohort failure bound is reached, qmgr_queue_throttle()
61 /* sets the concurrency limit to zero and starts a timer
62 /* to re-enable delivery to the destination after a configurable delay.
64 /* qmgr_queue_unthrottle() undoes qmgr_queue_throttle()'s effects.
65 /* The concurrency limit for the destination is incremented,
66 /* provided that it does not exceed the destination concurrency
67 /* limit specified for the transport. This routine implements
68 /* "slow open" mode, and eliminates the "thundering herd" problem.
70 /* qmgr_queue_suspend() suspends delivery for this destination
71 /* briefly. This function invalidates any scheduling decisions
72 /* that are based on the present queue's concurrency window.
73 /* To compensate for work skipped by qmgr_entry_done(), the
74 /* status of blocker jobs is re-evaluated after the queue is
75 /* resumed.
76 /* DIAGNOSTICS
77 /* Panic: consistency check failure.
78 /* LICENSE
79 /* .ad
80 /* .fi
81 /* The Secure Mailer license must be distributed with this software.
82 /* AUTHOR(S)
83 /* Wietse Venema
84 /* IBM T.J. Watson Research
85 /* P.O. Box 704
86 /* Yorktown Heights, NY 10598, USA
88 /* Pre-emptive scheduler enhancements:
89 /* Patrik Rak
90 /* Modra 6
91 /* 155 00, Prague, Czech Republic
93 /* Concurrency scheduler enhancements with:
94 /* Victor Duchovni
95 /* Morgan Stanley
96 /*--*/
98 /* System library. */
100 #include <sys_defs.h>
101 #include <time.h>
103 /* Utility library. */
105 #include <msg.h>
106 #include <mymalloc.h>
107 #include <events.h>
108 #include <htable.h>
110 /* Global library. */
112 #include <mail_params.h>
113 #include <recipient_list.h>
114 #include <mail_proto.h> /* QMGR_LOG_WINDOW */
116 /* Application-specific. */
118 #include "qmgr.h"
/* Number of in-core queue structures; maintained by qmgr_queue_create() and qmgr_queue_done(). */
int     qmgr_queue_count;
/*
 * QMGR_ERROR_OR_RETRY_QUEUE - true when the queue's transport is named
 * MAIL_SERVICE_RETRY or MAIL_SERVICE_ERROR.
 *
 * The argument is parenthesized so that any pointer-valued expression (for
 * example "p + 1" or "&array[i]") can be passed. Note that the argument may
 * be evaluated twice, so it must not have side effects.
 */
#define QMGR_ERROR_OR_RETRY_QUEUE(queue) \
	(strcmp((queue)->transport->name, MAIL_SERVICE_RETRY) == 0 \
	 || strcmp((queue)->transport->name, MAIL_SERVICE_ERROR) == 0)
/*
 * QMGR_LOG_FEEDBACK - log a concurrency feedback value.
 *
 * Logging happens only when var_conc_feedback_debug is set and the queue
 * does not belong to the error or retry transport. The macro relies on the
 * names "queue" and "myname" being in scope at the point of use.
 *
 * The body is wrapped in do { } while (0) and carries no trailing semicolon,
 * so that "QMGR_LOG_FEEDBACK(x);" is exactly one statement and can be used
 * safely as the unbraced body of an if/else.
 */
#define QMGR_LOG_FEEDBACK(feedback) \
	do { \
	    if (var_conc_feedback_debug && !QMGR_ERROR_OR_RETRY_QUEUE(queue)) \
		msg_info("%s: feedback %g", myname, (feedback)); \
	} while (0)
/*
 * QMGR_LOG_WINDOW - log the concurrency state of a queue.
 *
 * Reports the transport's destination concurrency limit together with the
 * queue's window, success, failure and fail_cohorts values. Logging happens
 * only when var_conc_feedback_debug is set and the queue does not belong to
 * the error or retry transport. The macro relies on the name "myname" being
 * in scope at the point of use.
 *
 * The argument is parenthesized, and the body is wrapped in do { } while (0)
 * without a trailing semicolon, so that "QMGR_LOG_WINDOW(q);" is exactly one
 * statement. The argument is evaluated more than once and must not have
 * side effects.
 */
#define QMGR_LOG_WINDOW(queue) \
	do { \
	    if (var_conc_feedback_debug && !QMGR_ERROR_OR_RETRY_QUEUE(queue)) \
		msg_info("%s: queue %s: limit %d window %d success %g failure %g fail_cohorts %g", \
			 myname, (queue)->name, \
			 (queue)->transport->dest_concurrency_limit, \
			 (queue)->window, (queue)->success, \
			 (queue)->failure, (queue)->fail_cohorts); \
	} while (0)
136 /* qmgr_queue_resume - resume delivery to destination */
138 static void qmgr_queue_resume(int event, char *context)
140 QMGR_QUEUE *queue = (QMGR_QUEUE *) context;
141 const char *myname = "qmgr_queue_resume";
144 * Sanity checks.
146 if (!QMGR_QUEUE_SUSPENDED(queue))
147 msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue));
150 * We can't simply force delivery on this queue: the transport's pending
151 * count may already be maxed out, and there may be other constraints
152 * that definitely should be none of our business. The best we can do is
153 * to play by the same rules as everyone else: let qmgr_active_drain()
154 * and round-robin selection take care of message selection.
156 queue->window = 1;
159 * Every event handler that leaves a queue in the "ready" state should
160 * remove the queue when it is empty.
162 * XXX Do not omit the redundant test below. It is here to simplify code
163 * consistency checks. The check is trivially eliminated by the compiler
164 * optimizer. There is no need to sacrifice code clarity for the sake of
165 * performance.
167 * XXX Do not expose the blocker job logic here. Rate-limited queues are not
168 * a performance-critical feature. Here, too, there is no need to sacrifice
169 * code clarity for the sake of performance.
171 if (QMGR_QUEUE_READY(queue) && queue->todo.next == 0 && queue->busy.next == 0)
172 qmgr_queue_done(queue);
173 else
174 qmgr_job_blocker_update(queue);
177 /* qmgr_queue_suspend - briefly suspend a destination */
179 void qmgr_queue_suspend(QMGR_QUEUE *queue, int delay)
181 const char *myname = "qmgr_queue_suspend";
184 * Sanity checks.
186 if (!QMGR_QUEUE_READY(queue))
187 msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue));
188 if (queue->busy_refcount > 0)
189 msg_panic("%s: queue is busy", myname);
192 * Set the queue status to "suspended". No-one is supposed to remove a
193 * queue in suspended state.
195 queue->window = QMGR_QUEUE_STAT_SUSPENDED;
196 event_request_timer(qmgr_queue_resume, (char *) queue, delay);
199 /* qmgr_queue_unthrottle_wrapper - in case (char *) != (struct *) */
201 static void qmgr_queue_unthrottle_wrapper(int unused_event, char *context)
203 QMGR_QUEUE *queue = (QMGR_QUEUE *) context;
206 * This routine runs when a wakeup timer goes off; it does not run in the
207 * context of some queue manipulation. Therefore, it is safe to discard
208 * this in-core queue when it is empty and when this site is not dead.
210 qmgr_queue_unthrottle(queue);
211 if (QMGR_QUEUE_READY(queue) && queue->todo.next == 0 && queue->busy.next == 0)
212 qmgr_queue_done(queue);
215 /* qmgr_queue_unthrottle - give this destination another chance */
217 void qmgr_queue_unthrottle(QMGR_QUEUE *queue)
219 const char *myname = "qmgr_queue_unthrottle";
220 QMGR_TRANSPORT *transport = queue->transport;
221 double feedback;
223 if (msg_verbose)
224 msg_info("%s: queue %s", myname, queue->name);
227 * Sanity checks.
229 if (!QMGR_QUEUE_READY(queue) && !QMGR_QUEUE_THROTTLED(queue))
230 msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue));
233 * Don't restart the negative feedback hysteresis cycle with every
234 * positive feedback. Restart it only when we make a positive concurrency
235 * adjustment (i.e. at the end of a positive feedback hysteresis cycle).
236 * Otherwise negative feedback would be too aggressive: negative feedback
237 * takes effect immediately at the start of its hysteresis cycle.
239 queue->fail_cohorts = 0;
242 * Special case when this site was dead.
244 if (QMGR_QUEUE_THROTTLED(queue)) {
245 event_cancel_timer(qmgr_queue_unthrottle_wrapper, (char *) queue);
246 if (queue->dsn == 0)
247 msg_panic("%s: queue %s: window 0 status 0", myname, queue->name);
248 dsn_free(queue->dsn);
249 queue->dsn = 0;
250 /* Back from the almost grave, best concurrency is anyone's guess. */
251 if (queue->busy_refcount > 0)
252 queue->window = queue->busy_refcount;
253 else
254 queue->window = transport->init_dest_concurrency;
255 queue->success = queue->failure = 0;
256 QMGR_LOG_WINDOW(queue);
257 return;
261 * Increase the destination's concurrency limit until we reach the
262 * transport's concurrency limit. Allow for a margin the size of the
263 * initial destination concurrency, so that we're not too gentle.
265 * Why is the concurrency increment based on preferred concurrency and not
266 * on the number of outstanding delivery requests? The latter fluctuates
267 * wildly when deliveries complete in bursts (artificial benchmark
268 * measurements), and does not account for cached connections.
270 * Keep the window within reasonable distance from actual concurrency
271 * otherwise negative feedback will be ineffective. This expression
272 * assumes that busy_refcount changes gradually. This is invalid when
273 * deliveries complete in bursts (artificial benchmark measurements).
275 if (transport->dest_concurrency_limit == 0
276 || transport->dest_concurrency_limit > queue->window)
277 if (queue->window < queue->busy_refcount + transport->init_dest_concurrency) {
278 feedback = QMGR_FEEDBACK_VAL(transport->pos_feedback, queue->window);
279 QMGR_LOG_FEEDBACK(feedback);
280 queue->success += feedback;
281 /* Prepare for overshoot (feedback > hysteresis, rounding error). */
282 while (queue->success + feedback / 2 >= transport->pos_feedback.hysteresis) {
283 queue->window += transport->pos_feedback.hysteresis;
284 queue->success -= transport->pos_feedback.hysteresis;
285 queue->failure = 0;
287 /* Prepare for overshoot. */
288 if (transport->dest_concurrency_limit > 0
289 && queue->window > transport->dest_concurrency_limit)
290 queue->window = transport->dest_concurrency_limit;
292 QMGR_LOG_WINDOW(queue);
295 /* qmgr_queue_throttle - handle destination delivery failure */
297 void qmgr_queue_throttle(QMGR_QUEUE *queue, DSN *dsn)
299 const char *myname = "qmgr_queue_throttle";
300 QMGR_TRANSPORT *transport = queue->transport;
301 double feedback;
304 * Sanity checks.
306 if (!QMGR_QUEUE_READY(queue))
307 msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue));
308 if (queue->dsn)
309 msg_panic("%s: queue %s: spurious reason %s",
310 myname, queue->name, queue->dsn->reason);
311 if (msg_verbose)
312 msg_info("%s: queue %s: %s %s",
313 myname, queue->name, dsn->status, dsn->reason);
316 * Don't restart the positive feedback hysteresis cycle with every
317 * negative feedback. Restart it only when we make a negative concurrency
318 * adjustment (i.e. at the start of a negative feedback hysteresis
319 * cycle). Otherwise positive feedback would be too weak (positive
320 * feedback does not take effect until the end of its hysteresis cycle).
324 * This queue is declared dead after a configurable number of
325 * pseudo-cohort failures.
327 if (QMGR_QUEUE_READY(queue)) {
328 queue->fail_cohorts += 1.0 / queue->window;
329 if (transport->fail_cohort_limit > 0
330 && queue->fail_cohorts >= transport->fail_cohort_limit)
331 queue->window = QMGR_QUEUE_STAT_THROTTLED;
335 * Decrease the destination's concurrency limit until we reach 1. Base
336 * adjustments on the concurrency limit itself, instead of using the
337 * actual concurrency. The latter fluctuates wildly when deliveries
338 * complete in bursts (artificial benchmark measurements).
340 * Even after reaching 1, we maintain the negative hysteresis cycle so that
341 * negative feedback can cancel out positive feedback.
343 if (QMGR_QUEUE_READY(queue)) {
344 feedback = QMGR_FEEDBACK_VAL(transport->neg_feedback, queue->window);
345 QMGR_LOG_FEEDBACK(feedback);
346 queue->failure -= feedback;
347 /* Prepare for overshoot (feedback > hysteresis, rounding error). */
348 while (queue->failure - feedback / 2 < 0) {
349 queue->window -= transport->neg_feedback.hysteresis;
350 queue->success = 0;
351 queue->failure += transport->neg_feedback.hysteresis;
353 /* Prepare for overshoot. */
354 if (queue->window < 1)
355 queue->window = 1;
359 * Special case for a site that just was declared dead.
361 if (QMGR_QUEUE_THROTTLED(queue)) {
362 queue->dsn = DSN_COPY(dsn);
363 event_request_timer(qmgr_queue_unthrottle_wrapper,
364 (char *) queue, var_min_backoff_time);
365 queue->dflags = 0;
367 QMGR_LOG_WINDOW(queue);
370 /* qmgr_queue_done - delete in-core queue for site */
372 void qmgr_queue_done(QMGR_QUEUE *queue)
374 const char *myname = "qmgr_queue_done";
375 QMGR_TRANSPORT *transport = queue->transport;
378 * Sanity checks. It is an error to delete an in-core queue with pending
379 * messages or timers.
381 if (queue->busy_refcount != 0 || queue->todo_refcount != 0)
382 msg_panic("%s: refcount: %d", myname,
383 queue->busy_refcount + queue->todo_refcount);
384 if (queue->todo.next || queue->busy.next)
385 msg_panic("%s: queue not empty: %s", myname, queue->name);
386 if (!QMGR_QUEUE_READY(queue))
387 msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue));
388 if (queue->dsn)
389 msg_panic("%s: queue %s: spurious reason %s",
390 myname, queue->name, queue->dsn->reason);
393 * Clean up this in-core queue.
395 QMGR_LIST_UNLINK(transport->queue_list, QMGR_QUEUE *, queue, peers);
396 htable_delete(transport->queue_byname, queue->name, (void (*) (char *)) 0);
397 myfree(queue->name);
398 myfree(queue->nexthop);
399 qmgr_queue_count--;
400 myfree((char *) queue);
403 /* qmgr_queue_create - create in-core queue for site */
405 QMGR_QUEUE *qmgr_queue_create(QMGR_TRANSPORT *transport, const char *name,
406 const char *nexthop)
408 QMGR_QUEUE *queue;
411 * If possible, choose an initial concurrency of > 1 so that one bad
412 * message or one bad network won't slow us down unnecessarily.
415 queue = (QMGR_QUEUE *) mymalloc(sizeof(QMGR_QUEUE));
416 qmgr_queue_count++;
417 queue->dflags = 0;
418 queue->last_done = 0;
419 queue->name = mystrdup(name);
420 queue->nexthop = mystrdup(nexthop);
421 queue->todo_refcount = 0;
422 queue->busy_refcount = 0;
423 queue->transport = transport;
424 queue->window = transport->init_dest_concurrency;
425 queue->success = queue->failure = queue->fail_cohorts = 0;
426 QMGR_LIST_INIT(queue->todo);
427 QMGR_LIST_INIT(queue->busy);
428 queue->dsn = 0;
429 queue->clog_time_to_warn = 0;
430 queue->blocker_tag = 0;
431 QMGR_LIST_APPEND(transport->queue_list, queue, peers);
432 htable_enter(transport->queue_byname, name, (char *) queue);
433 return (queue);
436 /* qmgr_queue_find - find in-core named queue */
438 QMGR_QUEUE *qmgr_queue_find(QMGR_TRANSPORT *transport, const char *name)
440 return ((QMGR_QUEUE *) htable_find(transport->queue_byname, name));