7 /* per-site queue entries
11 /* QMGR_ENTRY *qmgr_entry_create(peer, message)
13 /* QMGR_MESSAGE *message;
15 /* void qmgr_entry_done(entry, which)
19 /* QMGR_ENTRY *qmgr_entry_select(queue)
22 /* void qmgr_entry_unselect(queue, entry)
26 /* void qmgr_entry_move_todo(dst, entry)
30 /* These routines add/delete/manipulate per-site message
33 /* qmgr_entry_create() creates an entry for the named peer and message,
34 /* and appends the entry to the peer's list and its queue's todo list.
35 /* Filling in and cleaning up the recipients is the responsibility
38 /* qmgr_entry_done() discards a per-site queue entry. The
39 /* \fIwhich\fR argument is either QMGR_QUEUE_BUSY for an entry
40 /* of the site's `busy' list (i.e. queue entries that have been
41 /* selected for actual delivery), or QMGR_QUEUE_TODO for an entry
42 /* of the site's `todo' list (i.e. queue entries awaiting selection
43 /* for actual delivery).
45 /* qmgr_entry_done() discards its peer structure when the peer
46 /* is not referenced anymore.
48 /* qmgr_entry_done() triggers cleanup of the per-site queue when
49 /* the site has no pending deliveries, and the site is either
50 /* alive, or the site is dead and the number of in-core queues
51 /* exceeds a configurable limit (see qmgr_queue_done()).
53 /* qmgr_entry_done() triggers special action when the last in-core
54 /* queue entry for a message is done with: either read more
55 /* recipients from the queue file, delete the queue file, or move
56 /* the queue file to the deferred queue; send bounce reports to the
57 /* message originator (see qmgr_active_done()).
59 /* qmgr_entry_select() selects the first entry from the named
60 /* per-site queue's `todo' list for actual delivery. The entry is
61 /* moved to the queue's `busy' list: the list of messages being
62 /* delivered. The entry is also removed from its peer list.
64 /* qmgr_entry_unselect() takes the named entry off the named
65 /* per-site queue's `busy' list and moves it to the queue's
66 /* `todo' list. The entry is also prepended to its peer list again.
68 /* qmgr_entry_move_todo() moves the specified "todo" queue entry
69 /* to the specified "todo" queue.
71 /* Panic: interface violations, internal inconsistencies.
75 /* The Secure Mailer license must be distributed with this software.
78 /* IBM T.J. Watson Research
80 /* Yorktown Heights, NY 10598, USA
82 /* Preemptive scheduler enhancements:
85 /* 155 00, Prague, Czech Republic
94 /* Utility library. */
101 /* Global library. */
103 #include <mail_params.h>
104 #include <deliver_request.h> /* opportunistic session caching */
106 /* Application-specific. */
110 /* qmgr_entry_select - select queue entry for delivery */
112 QMGR_ENTRY
*qmgr_entry_select(QMGR_PEER
*peer
)
114 const char *myname
= "qmgr_entry_select";
118 if ((entry
= peer
->entry_list
.next
) != 0) {
119 queue
= entry
->queue
;
120 QMGR_LIST_UNLINK(queue
->todo
, QMGR_ENTRY
*, entry
, queue_peers
);
121 queue
->todo_refcount
--;
122 QMGR_LIST_APPEND(queue
->busy
, entry
, queue_peers
);
123 queue
->busy_refcount
++;
124 QMGR_LIST_UNLINK(peer
->entry_list
, QMGR_ENTRY
*, entry
, peer_peers
);
125 peer
->job
->selected_entries
++;
128 * With opportunistic session caching, the delivery agent must not
129 * only 1) save a session upon completion, but also 2) reuse a cached
130 * session upon the next delivery request. In order to not miss out
131 * on 2), we have to make caching sticky or else we get silly
132 * behavior when the in-memory queue drains. Specifically, new
133 * connections must not be made as long as cached connections exist.
135 * Safety: don't enable opportunistic session caching unless the queue
136 * manager is able to schedule concurrent or back-to-back deliveries
137 * (we need to recognize back-to-back deliveries for transports with
140 * If caching has previously been enabled, but is not now, fetch any
141 * existing entries from the cache, but don't add new ones.
143 #define CONCURRENT_OR_BACK_TO_BACK_DELIVERY() \
144 (queue->busy_refcount > 1 || BACK_TO_BACK_DELIVERY())
146 #define BACK_TO_BACK_DELIVERY() \
147 (queue->last_done + 1 >= event_time())
150 * Turn on session caching after we get up to speed. Don't enable
151 * session caching just because we have concurrent deliveries. This
152 * prevents unnecessary session caching when we have a burst of mail
153 * <= the initial concurrency limit.
155 if ((queue
->dflags
& DEL_REQ_FLAG_CONN_STORE
) == 0) {
156 if (BACK_TO_BACK_DELIVERY()) {
158 msg_info("%s: allowing on-demand session caching for %s",
159 myname
, queue
->name
);
160 queue
->dflags
|= DEL_REQ_FLAG_CONN_MASK
;
165 * Turn off session caching when concurrency drops and we're running
166 * out of steam. This is what prevents from turning off session
167 * caching too early, and from making new connections while old ones
171 if (!CONCURRENT_OR_BACK_TO_BACK_DELIVERY()) {
173 msg_info("%s: disallowing on-demand session caching for %s",
174 myname
, queue
->name
);
175 queue
->dflags
&= ~DEL_REQ_FLAG_CONN_STORE
;
182 /* qmgr_entry_unselect - unselect queue entry for delivery */
184 void qmgr_entry_unselect(QMGR_ENTRY
*entry
)
186 QMGR_PEER
*peer
= entry
->peer
;
187 QMGR_QUEUE
*queue
= entry
->queue
;
190 * Move the entry back to the todo lists. In case of the peer list, put
191 * it back to the beginning, so the select()/unselect() does not reorder
192 * entries. We use this in qmgr_message_assign() to put recipients into
193 * existing entries when possible.
195 QMGR_LIST_UNLINK(queue
->busy
, QMGR_ENTRY
*, entry
, queue_peers
);
196 queue
->busy_refcount
--;
197 QMGR_LIST_APPEND(queue
->todo
, entry
, queue_peers
);
198 queue
->todo_refcount
++;
199 QMGR_LIST_PREPEND(peer
->entry_list
, entry
, peer_peers
);
200 peer
->job
->selected_entries
--;
203 /* qmgr_entry_move_todo - move entry between todo queues */
205 void qmgr_entry_move_todo(QMGR_QUEUE
*dst_queue
, QMGR_ENTRY
*entry
)
207 const char *myname
= "qmgr_entry_move_todo";
208 QMGR_TRANSPORT
*dst_transport
= dst_queue
->transport
;
209 QMGR_MESSAGE
*message
= entry
->message
;
210 QMGR_QUEUE
*src_queue
= entry
->queue
;
211 QMGR_PEER
*dst_peer
, *src_peer
= entry
->peer
;
212 QMGR_JOB
*dst_job
, *src_job
= src_peer
->job
;
213 QMGR_ENTRY
*new_entry
;
214 int rcpt_count
= entry
->rcpt_list
.len
;
216 if (entry
->stream
!= 0)
217 msg_panic("%s: queue %s entry is busy", myname
, src_queue
->name
);
218 if (QMGR_QUEUE_THROTTLED(dst_queue
))
219 msg_panic("%s: destination queue %s is throttled", myname
, dst_queue
->name
);
220 if (QMGR_TRANSPORT_THROTTLED(dst_transport
))
221 msg_panic("%s: destination transport %s is throttled",
222 myname
, dst_transport
->name
);
225 * Create new entry, swap the recipients between the two entries,
226 * adjusting the job counters accordingly, then dispose of the old entry.
228 * Note that qmgr_entry_done() will also take care of adjusting the
229 * recipient limits of all the message jobs, so we do not have to do that
230 * explicitly for the new job here.
232 * XXX This does not enforce the per-entry recipient limit, but that is not
233 * a problem as long as qmgr_entry_move_todo() is called only to bounce
236 dst_job
= qmgr_job_obtain(message
, dst_transport
);
237 dst_peer
= qmgr_peer_obtain(dst_job
, dst_queue
);
239 new_entry
= qmgr_entry_create(dst_peer
, message
);
241 recipient_list_swap(&entry
->rcpt_list
, &new_entry
->rcpt_list
);
243 src_job
->rcpt_count
-= rcpt_count
;
244 dst_job
->rcpt_count
+= rcpt_count
;
246 qmgr_entry_done(entry
, QMGR_QUEUE_TODO
);
249 /* qmgr_entry_done - dispose of queue entry */
251 void qmgr_entry_done(QMGR_ENTRY
*entry
, int which
)
253 const char *myname
= "qmgr_entry_done";
254 QMGR_QUEUE
*queue
= entry
->queue
;
255 QMGR_MESSAGE
*message
= entry
->message
;
256 QMGR_PEER
*peer
= entry
->peer
;
257 QMGR_JOB
*sponsor
, *job
= peer
->job
;
258 QMGR_TRANSPORT
*transport
= job
->transport
;
261 * Take this entry off the in-core queue.
263 if (entry
->stream
!= 0)
264 msg_panic("%s: file is open", myname
);
265 if (which
== QMGR_QUEUE_BUSY
) {
266 QMGR_LIST_UNLINK(queue
->busy
, QMGR_ENTRY
*, entry
, queue_peers
);
267 queue
->busy_refcount
--;
268 } else if (which
== QMGR_QUEUE_TODO
) {
269 QMGR_LIST_UNLINK(peer
->entry_list
, QMGR_ENTRY
*, entry
, peer_peers
);
270 job
->selected_entries
++;
271 QMGR_LIST_UNLINK(queue
->todo
, QMGR_ENTRY
*, entry
, queue_peers
);
272 queue
->todo_refcount
--;
274 msg_panic("%s: bad queue spec: %d", myname
, which
);
278 * Decrease the in-core recipient counts and free the recipient list and
279 * the structure itself.
281 job
->rcpt_count
-= entry
->rcpt_list
.len
;
282 message
->rcpt_count
-= entry
->rcpt_list
.len
;
283 qmgr_recipient_count
-= entry
->rcpt_list
.len
;
284 recipient_list_free(&entry
->rcpt_list
);
285 myfree((char *) entry
);
288 * Make sure that the transport of any retired or finishing job that
289 * donated recipient slots to this message gets them back first. Then, if
290 * possible, pass the remaining unused recipient slots to the next job on
293 for (sponsor
= message
->job_list
.next
; sponsor
; sponsor
= sponsor
->message_peers
.next
) {
294 if (sponsor
->rcpt_count
>= sponsor
->rcpt_limit
|| sponsor
== job
)
296 if (sponsor
->stack_level
< 0 || message
->rcpt_offset
== 0)
297 qmgr_job_move_limits(sponsor
);
299 if (message
->rcpt_offset
== 0) {
300 qmgr_job_move_limits(job
);
304 * We implement a rate-limited queue by emulating a slow delivery
305 * channel. We insert the artificial delays with qmgr_queue_suspend().
307 * When a queue is suspended, we must postpone any job scheduling decisions
308 * until the queue is resumed. Otherwise, we make those decisions now.
309 * The job scheduling decisions are made by qmgr_job_blocker_update().
311 if (which
== QMGR_QUEUE_BUSY
&& transport
->rate_delay
> 0) {
312 if (queue
->window
> 1)
313 msg_panic("%s: queue %s/%s: window %d > 1 on rate-limited service",
314 myname
, transport
->name
, queue
->name
, queue
->window
);
315 if (QMGR_QUEUE_THROTTLED(queue
)) /* XXX */
316 qmgr_queue_unthrottle(queue
);
317 if (QMGR_QUEUE_READY(queue
))
318 qmgr_queue_suspend(queue
, transport
->rate_delay
);
320 if (!QMGR_QUEUE_SUSPENDED(queue
)
321 && queue
->blocker_tag
== transport
->blocker_tag
)
322 qmgr_job_blocker_update(queue
);
325 * When there are no more entries for this peer, discard the peer
329 if (peer
->refcount
== 0)
330 qmgr_peer_free(peer
);
333 * Maintain back-to-back delivery status.
335 if (which
== QMGR_QUEUE_BUSY
)
336 queue
->last_done
= event_time();
339 * When the in-core queue for this site is empty and when this site is
340 * not dead or suspended, discard the in-core queue. When this site is
341 * dead, but the number of in-core queues exceeds some threshold, get rid
342 * of this in-core queue anyway, in order to avoid running out of memory.
344 if (queue
->todo
.next
== 0 && queue
->busy
.next
== 0) {
345 if (QMGR_QUEUE_THROTTLED(queue
) && qmgr_queue_count
> 2 * var_qmgr_rcpt_limit
)
346 qmgr_queue_unthrottle(queue
);
347 if (QMGR_QUEUE_READY(queue
))
348 qmgr_queue_done(queue
);
352 * Update the in-core message reference count. When the in-core message
353 * structure has no more references, dispose of the message.
356 if (message
->refcount
== 0)
357 qmgr_active_done(message
);
360 /* qmgr_entry_create - create queue todo entry */
362 QMGR_ENTRY
*qmgr_entry_create(QMGR_PEER
*peer
, QMGR_MESSAGE
*message
)
365 QMGR_QUEUE
*queue
= peer
->queue
;
370 if (QMGR_QUEUE_THROTTLED(queue
))
371 msg_panic("qmgr_entry_create: dead queue: %s", queue
->name
);
374 * Create the delivery request.
376 entry
= (QMGR_ENTRY
*) mymalloc(sizeof(QMGR_ENTRY
));
378 entry
->message
= message
;
379 recipient_list_init(&entry
->rcpt_list
, RCPT_LIST_INIT_QUEUE
);
382 QMGR_LIST_APPEND(peer
->entry_list
, entry
, peer_peers
);
384 entry
->queue
= queue
;
385 QMGR_LIST_APPEND(queue
->todo
, entry
, queue_peers
);
386 queue
->todo_refcount
++;
387 peer
->job
->read_entries
++;
390 * Warn if a destination is falling behind while the active queue
391 * contains a non-trivial amount of single-recipient email. When a
392 * destination takes up more and more space in the active queue, then
393 * other mail will not get through and delivery performance will suffer.
395 * XXX At this point in the code, the busy reference count is still less
396 * than the concurrency limit (otherwise this code would not be invoked
397 * in the first place) so we have to make some awkward adjustments
400 * XXX The queue length test below looks at the active queue share of an
401 * individual destination. This catches the case where mail for one
402 * destination is falling behind because it has to round-robin compete
403 * with many other destinations. However, Postfix will also perform
404 * poorly when most of the active queue is tied up by a small number of
405 * concurrency limited destinations. The queue length test below detects
406 * such conditions only indirectly.
408 * XXX This code does not detect the case that the active queue is being
409 * starved because incoming mail is pounding the disk.
411 if (var_helpful_warnings
&& var_qmgr_clog_warn_time
> 0) {
412 int queue_length
= queue
->todo_refcount
+ queue
->busy_refcount
;
414 QMGR_TRANSPORT
*transport
;
417 if (queue_length
> var_qmgr_active_limit
/ 5
418 && (now
= event_time()) >= queue
->clog_time_to_warn
) {
419 active_share
= queue_length
/ (double) qmgr_message_count
;
420 msg_warn("mail for %s is using up %d of %d active queue entries",
421 queue
->nexthop
, queue_length
, qmgr_message_count
);
422 if (active_share
< 0.9)
423 msg_warn("this may slow down other mail deliveries");
424 transport
= queue
->transport
;
425 if (transport
->dest_concurrency_limit
> 0
426 && transport
->dest_concurrency_limit
<= queue
->busy_refcount
+ 1)
427 msg_warn("you may need to increase the main.cf %s%s from %d",
428 transport
->name
, _DEST_CON_LIMIT
,
429 transport
->dest_concurrency_limit
);
430 else if (queue
->window
> var_qmgr_active_limit
* active_share
)
431 msg_warn("you may need to increase the main.cf %s from %d",
432 VAR_QMGR_ACT_LIMIT
, var_qmgr_active_limit
);
433 else if (queue
->peers
.next
!= queue
->peers
.prev
)
434 msg_warn("you may need a separate master.cf transport for %s",
437 msg_warn("you may need to reduce %s connect and helo timeouts",
439 msg_warn("so that Postfix quickly skips unavailable hosts");
440 msg_warn("you may need to increase the main.cf %s and %s",
441 VAR_MIN_BACKOFF_TIME
, VAR_MAX_BACKOFF_TIME
);
442 msg_warn("so that Postfix wastes less time on undeliverable mail");
443 msg_warn("you may need to increase the master.cf %s process limit",
446 msg_warn("please avoid flushing the whole queue when you have");
447 msg_warn("lots of deferred mail, that is bad for performance");
448 msg_warn("to turn off these warnings specify: %s = 0",
449 VAR_QMGR_CLOG_WARN_TIME
);
450 queue
->clog_time_to_warn
= now
+ var_qmgr_clog_warn_time
;