/* QMGR_JOB *qmgr_job_obtain(message, transport)
/* QMGR_MESSAGE *message;
/* QMGR_TRANSPORT *transport;
/*
/* void qmgr_job_free(job)
/* QMGR_JOB *job;
/*
/* void qmgr_job_move_limits(job)
/* QMGR_JOB *job;
/*
/* QMGR_ENTRY *qmgr_job_entry_select(transport)
/* QMGR_TRANSPORT *transport;
/*
/* void qmgr_job_blocker_update(queue)
/* QMGR_QUEUE *queue;
/* DESCRIPTION
/* These routines add/delete/manipulate per-transport jobs.
/* Each job corresponds to a specific transport and message.
/* Each job has a peer list containing all pending delivery
/* requests for that message.
/*
/* qmgr_job_obtain() finds an existing job for the named message
/* and transport combination. A new empty job is created if none
/* can be found. In either case, the job is prepared for the
/* assignment of (more) message recipients.
/*
/* qmgr_job_free() disposes of a per-transport job after all
/* its entries have been taken care of. It is an error to dispose
/* of a job that is still in use.
/*
/* qmgr_job_entry_select() attempts to find the next entry suitable
/* for delivery. The job preempting algorithm is also exercised.
/* If necessary, an attempt to read more recipients into core is made.
/* This can result in the creation of more job, queue, and entry
/* structures.
/*
/* qmgr_job_blocker_update() updates the status of blocked
/* jobs after a decrease in the queue's concurrency level,
/* after the queue is throttled, or after the queue is resumed.
/*
/* qmgr_job_move_limits() takes care of the proper distribution of
/* the per-transport recipient limit among the per-transport jobs.
/* It should be called whenever a job's recipient slot becomes
/* available.
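/*
/* An illustrative call sequence (a sketch only, with error handling
/* and recipient assignment omitted; the actual call sites are in
/* other queue manager modules):
/*
/*	job = qmgr_job_obtain(message, transport);
/*	while ((entry = qmgr_job_entry_select(transport)) != 0)
/*	    ... deliver the selected entry ...
/*	qmgr_job_free(job);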
/* DIAGNOSTICS
/* Panic: consistency check failure.
/* LICENSE
/* The Secure Mailer license must be distributed with this software.
/*--*/

/* System library. */

#include <sys_defs.h>

/* Utility library. */

#include <msg.h>
#include <htable.h>
#include <mymalloc.h>
#include <sane_time.h>

/* Application-specific. */

#include "qmgr.h"
/* Forward declarations */

static void qmgr_job_pop(QMGR_JOB *);

#define HAS_ENTRIES(job) ((job)->selected_entries < (job)->read_entries)

/*
 * The MIN_ENTRIES macro may underestimate a lot, but we can't use
 * message->rcpt_unread because we don't know whether all those unread
 * recipients go to our transport yet.
 */
#define MIN_ENTRIES(job) ((job)->read_entries)
#define MAX_ENTRIES(job) ((job)->read_entries + (job)->message->rcpt_unread)
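
/*
 * Example with made-up numbers: a job that has read in 50 entries while
 * its message still has 200 unread recipients is bounded by
 * MIN_ENTRIES = 50 and MAX_ENTRIES = 250; the true entry count lies
 * somewhere in between, because an unknown share of the 200 unread
 * recipients belongs to other transports.
 */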

#define RESET_CANDIDATE_CACHE(transport) ((transport)->candidate_cache_current = 0)

#define IS_BLOCKER(job,transport) ((job)->blocker_tag == (transport)->blocker_tag)

/* qmgr_job_create - create and initialize message job structure */

static QMGR_JOB *qmgr_job_create(QMGR_MESSAGE *message, QMGR_TRANSPORT *transport)
{
    QMGR_JOB *job;

    job = (QMGR_JOB *) mymalloc(sizeof(QMGR_JOB));
    job->message = message;
    QMGR_LIST_APPEND(message->job_list, job, message_peers);
    htable_enter(transport->job_byname, message->queue_id, (char *) job);
    job->transport = transport;
    QMGR_LIST_INIT(job->transport_peers);
    QMGR_LIST_INIT(job->time_peers);
    job->stack_parent = 0;
    QMGR_LIST_INIT(job->stack_children);
    QMGR_LIST_INIT(job->stack_siblings);
    job->stack_level = -1;
    job->blocker_tag = 0;
    job->peer_byname = htable_create(0);
    QMGR_LIST_INIT(job->peer_list);
    job->slots_available = 0;
    job->selected_entries = 0;
    job->read_entries = 0;
    job->rcpt_count = 0;
    job->rcpt_limit = 0;
    job->slots_used = 0;
    return (job);
}

/* qmgr_job_link - append the job to the job lists based on the time it was queued */

static void qmgr_job_link(QMGR_JOB *job)
{
    QMGR_TRANSPORT *transport = job->transport;
    QMGR_MESSAGE *message = job->message;
    QMGR_JOB *prev, *next, *list_prev, *list_next, *unread, *current;
    long    delay;

    /*
     * Sanity checks.
     */
    if (job->stack_level >= 0)
        msg_panic("qmgr_job_link: already on the job lists (%d)", job->stack_level);

    /*
     * Traverse the time list and the scheduler list from the end and stop
     * when we find a job older than the one being linked.
     *
     * During the traversals, keep track of whether we have come across
     * either the current job or the first unread job on the job list. If
     * so, these pointers will be adjusted below as required.
     *
     * Although both lists are exactly the same when only jobs on stack
     * level zero are considered, it's easier to traverse them separately.
     * Otherwise it's impossible to keep track of the current job pointer.
     *
     * This may look inefficient, but under normal operation it is expected
     * that the loops will stop right away, resulting in normal list appends
     * below. However, this code is necessary for reviving retired jobs and
     * for jobs which are created long after the first chunk of recipients
     * was read in-core (either of these can happen only for multi-transport
     * messages).
     */
    current = transport->job_current;
    for (next = 0, prev = transport->job_list.prev; prev;
         next = prev, prev = prev->transport_peers.prev) {
        if (prev->stack_parent == 0) {
            delay = message->queued_time - prev->message->queued_time;
            if (delay >= 0)
                break;
        }
        if (current == prev)
            current = 0;
    }
    list_prev = prev;
    list_next = next;

    unread = transport->job_next_unread;
    for (next = 0, prev = transport->job_bytime.prev; prev;
         next = prev, prev = prev->time_peers.prev) {
        delay = message->queued_time - prev->message->queued_time;
        if (delay >= 0)
            break;
        if (unread == prev)
            unread = 0;
    }

    /*
     * Link the job into the proper place on the job lists and mark it so we
     * know it has been linked.
     */
    job->stack_level = 0;
    QMGR_LIST_LINK(transport->job_list, list_prev, job, list_next, transport_peers);
    QMGR_LIST_LINK(transport->job_bytime, prev, job, next, time_peers);

    /*
     * Update the current job pointer if necessary.
     */
    if (current == 0)
        transport->job_current = job;

    /*
     * Update the pointer to the first unread job on the job list and steal
     * the unused recipient slots from the old one.
     */
    if (unread == 0) {
        unread = transport->job_next_unread;
        transport->job_next_unread = job;
        if (unread != 0)
            qmgr_job_move_limits(unread);
    }

    /*
     * Get as many recipient slots as possible. The excess will be returned
     * to the transport pool as soon as the exact amount required is known
     * (which is usually after all recipients have been read in core).
     */
    if (transport->rcpt_unused > 0) {
        job->rcpt_limit += transport->rcpt_unused;
        message->rcpt_limit += transport->rcpt_unused;
        transport->rcpt_unused = 0;
    }
}

/* qmgr_job_find - lookup job associated with named message and transport */

static QMGR_JOB *qmgr_job_find(QMGR_MESSAGE *message, QMGR_TRANSPORT *transport)
{

    /*
     * Instead of traversing the message job list, we use a single
     * per-transport hash table. This is better (at least with respect to
     * memory usage) than having a single hash table (usually almost empty)
     * for each message.
     */
    return ((QMGR_JOB *) htable_find(transport->job_byname, message->queue_id));
}

/* qmgr_job_obtain - find/create the appropriate job and make it ready for new recipients */

QMGR_JOB *qmgr_job_obtain(QMGR_MESSAGE *message, QMGR_TRANSPORT *transport)
{
    QMGR_JOB *job;

    /*
     * Try finding an existing job, reviving it if it was already retired.
     * Create a new job for this transport/message combination otherwise. In
     * either case, the job ends up linked on the job lists.
     */
    if ((job = qmgr_job_find(message, transport)) == 0)
        job = qmgr_job_create(message, transport);
    if (job->stack_level < 0)
        qmgr_job_link(job);

    /*
     * Reset the candidate cache because of the new expected recipients.
     * Make sure the job is not marked as a blocker for the same reason.
     * Note that this can result in a non-blocker followed by more blockers.
     * Consequently, we can't just update the current job pointer, we have
     * to reset it. Fortunately qmgr_job_entry_select() will easily deal
     * with this and will look up the real current job for us.
     */
    RESET_CANDIDATE_CACHE(transport);
    if (IS_BLOCKER(job, transport)) {
        job->blocker_tag = 0;
        transport->job_current = transport->job_list.next;
    }
    return (job);
}

/* qmgr_job_move_limits - move unused recipient slots to the next unread job */

void qmgr_job_move_limits(QMGR_JOB *job)
{
    QMGR_TRANSPORT *transport = job->transport;
    QMGR_MESSAGE *message = job->message;
    QMGR_JOB *next = transport->job_next_unread;
    int     rcpt_unused, msg_rcpt_unused;

    /*
     * Find the next unread job on the job list if necessary. Cache it for
     * later. This makes the amortized efficiency of this routine O(1) per
     * job. Note that we use the time list whose ordering doesn't change
     * over time.
     */
    if (job == next) {
        for (next = next->time_peers.next; next; next = next->time_peers.next)
            if (next->message->rcpt_offset != 0)
                break;
        transport->job_next_unread = next;
    }

    /*
     * Calculate the number of available unused slots.
     */
    rcpt_unused = job->rcpt_limit - job->rcpt_count;
    msg_rcpt_unused = message->rcpt_limit - message->rcpt_count;
    if (msg_rcpt_unused < rcpt_unused)
        rcpt_unused = msg_rcpt_unused;

    /*
     * Transfer the unused recipient slots back to the transport pool and to
     * the next not-fully-read job. The job's message limits are adjusted
     * accordingly. Note that the transport pool can be negative if we used
     * some of the rcpt_per_stack slots.
     */
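    /*
     * Illustrative numbers (not from the sources): if job->rcpt_limit is
     * 1000 with rcpt_count 300, and message->rcpt_limit is 900 with
     * rcpt_count 400, then rcpt_unused = min(700, 500) = 500; both limits
     * drop by 500 and the transport pool grows by 500, which is then
     * handed in full to the next unread job, if there is one.
     */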
    if (rcpt_unused > 0) {
        job->rcpt_limit -= rcpt_unused;
        message->rcpt_limit -= rcpt_unused;
        transport->rcpt_unused += rcpt_unused;
        if (next != 0 && (rcpt_unused = transport->rcpt_unused) > 0) {
            next->rcpt_limit += rcpt_unused;
            next->message->rcpt_limit += rcpt_unused;
            transport->rcpt_unused = 0;
        }
    }
}

/* qmgr_job_parent_gone - take care of orphaned stack children */

static void qmgr_job_parent_gone(QMGR_JOB *job, QMGR_JOB *parent)
{
    QMGR_JOB *child;

    while ((child = job->stack_children.next) != 0) {
        QMGR_LIST_UNLINK(job->stack_children, QMGR_JOB *, child, stack_siblings);
        if (parent != 0)
            QMGR_LIST_APPEND(parent->stack_children, child, stack_siblings);
        child->stack_parent = parent;
    }
}

/* qmgr_job_unlink - unlink the job from the job lists */

static void qmgr_job_unlink(QMGR_JOB *job)
{
    const char *myname = "qmgr_job_unlink";
    QMGR_TRANSPORT *transport = job->transport;

    /*
     * Sanity checks.
     */
    if (job->stack_level != 0)
        msg_panic("%s: non-zero stack level (%d)", myname, job->stack_level);
    if (job->stack_parent != 0)
        msg_panic("%s: parent present", myname);
    if (job->stack_siblings.next != 0)
        msg_panic("%s: siblings present", myname);

    /*
     * Make sure that the children of a job on stack level zero are informed
     * that their parent is gone too.
     */
    qmgr_job_parent_gone(job, 0);

    /*
     * Update the current job pointer if necessary.
     */
    if (transport->job_current == job)
        transport->job_current = job->transport_peers.next;

    /*
     * Invalidate the candidate selection cache if necessary.
     */
    if (job == transport->candidate_cache
        || job == transport->candidate_cache_current)
        RESET_CANDIDATE_CACHE(transport);

    /*
     * Remove the job from the job lists and mark it as unlinked.
     */
    QMGR_LIST_UNLINK(transport->job_list, QMGR_JOB *, job, transport_peers);
    QMGR_LIST_UNLINK(transport->job_bytime, QMGR_JOB *, job, time_peers);
    job->stack_level = -1;
}

/* qmgr_job_retire - remove the job from the job lists while waiting for recipients to deliver */

static void qmgr_job_retire(QMGR_JOB *job)
{
    if (msg_verbose)
        msg_info("qmgr_job_retire: %s", job->message->queue_id);

    /*
     * Pop the job from the job stack if necessary.
     */
    if (job->stack_level > 0)
        qmgr_job_pop(job);

    /*
     * Make sure this job is not cached as the next unread job for this
     * transport. The qmgr_entry_done() will make sure that the slots
     * donated by this job are moved back to the transport pool as soon as
     * possible.
     */
    qmgr_job_move_limits(job);

    /*
     * Remove the job from the job lists. Note that it remains on the
     * message job list, though, and that it can be revived by using
     * qmgr_job_obtain(). Also note that the available slot counter is left
     * intact.
     */
    qmgr_job_unlink(job);
}

/* qmgr_job_free - release the job structure */

void qmgr_job_free(QMGR_JOB *job)
{
    const char *myname = "qmgr_job_free";
    QMGR_MESSAGE *message = job->message;
    QMGR_TRANSPORT *transport = job->transport;

    if (msg_verbose)
        msg_info("%s: %s %s", myname, message->queue_id, transport->name);

    /*
     * Sanity checks.
     */
    if (job->rcpt_count)
        msg_panic("%s: non-zero recipient count (%d)", myname, job->rcpt_count);

    /*
     * Pop the job from the job stack if necessary.
     */
    if (job->stack_level > 0)
        qmgr_job_pop(job);

    /*
     * Return any remaining recipient slots back to the recipient slots
     * pool.
     */
    qmgr_job_move_limits(job);
    if (job->rcpt_limit)
        msg_panic("%s: recipient slots leak (%d)", myname, job->rcpt_limit);

    /*
     * Unlink and discard the structure. Check whether the job is still
     * linked on the job lists or whether it was already retired before
     * unlinking it.
     */
    if (job->stack_level >= 0)
        qmgr_job_unlink(job);
    QMGR_LIST_UNLINK(message->job_list, QMGR_JOB *, job, message_peers);
    htable_delete(transport->job_byname, message->queue_id, (void (*) (char *)) 0);
    htable_free(job->peer_byname, (void (*) (char *)) 0);
    myfree((char *) job);
}

/* qmgr_job_count_slots - maintain the delivery slot counters */

static void qmgr_job_count_slots(QMGR_JOB *job)
{

    /*
     * Count the number of delivery slots used during the delivery of the
     * selected job. Also count the number of delivery slots available for
     * its preemption.
     *
     * Despite its trivial look, this is one of the key parts of the theory
     * behind this preempting scheduler.
     */
    job->slots_available++;
    job->slots_used++;

    /*
     * If the selected job is not the original current job, reset the
     * candidate cache because the change above has slightly increased the
     * chance of this job becoming a candidate next time.
     *
     * Don't expect that the change of the current jobs this turn will
     * render the candidate cache invalid the next turn - it can happen
     * that the next turn the original current job will be selected again
     * and the cache would be considered valid in such case.
     */
    if (job != job->transport->candidate_cache_current)
        RESET_CANDIDATE_CACHE(job->transport);
}

/* qmgr_job_candidate - find the best job candidate for preempting the given job */

static QMGR_JOB *qmgr_job_candidate(QMGR_JOB *current)
{
    QMGR_TRANSPORT *transport = current->transport;
    QMGR_JOB *job, *best_job = 0;
    double  score, best_score = 0.0;
    int     max_slots, max_needed_entries, max_total_entries;
    int     delay;
    time_t  now = sane_time();

    /*
     * Fetch the result directly from the cache if the cache is still valid.
     *
     * Note that we cache negative results too, so the cache must be
     * invalidated by resetting the cached current job pointer, not the
     * candidate pointer itself.
     *
     * In case the cache is valid and contains no candidate, we can ignore
     * the time change, as it affects only which candidate is the best, not
     * whether one exists. However, this feature requires that we no longer
     * relax the cache resetting rules, depending on the automatic cache
     * timeout.
     */
    if (transport->candidate_cache_current == current
        && (transport->candidate_cache_time == now
            || transport->candidate_cache == 0))
        return (transport->candidate_cache);

    /*
     * Estimate the minimum amount of delivery slots that can ever be
     * accumulated for the given job. All jobs that won't fit into these
     * slots are excluded from the candidate selection.
     */
    max_slots = (MIN_ENTRIES(current) - current->selected_entries
                 + current->slots_available) / transport->slot_cost;

    /*
     * Select the candidate with the best time_since_queued/total_recipients
     * score. In addition to jobs which don't meet the max_slots limit, skip
     * also jobs which don't have any selectable entries at the moment.
     *
     * Instead of traversing the whole job list, we traverse it just from
     * the current job forward. This has several advantages. First, we skip
     * some of the blocker jobs and the current job itself right away. But
     * the really important advantage is that we are sure that we don't
     * consider any jobs that are already stack children of the current
     * job. Thanks to this we can easily include all encountered jobs which
     * are leaf children of some of the preempting stacks as valid
     * candidates. All we need to do is to make sure we do not include any
     * of the stack parents. And, because the leaf children are not ordered
     * by the time since queued, we have to exclude them from the early
     * loop end test.
     *
     * However, don't bother searching if we can't find anything suitable
     * anyway.
     */
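    /*
     * Worked example with made-up numbers: with slot_cost 5, a current job
     * that has read 100 entries, selected 40, and accumulated 15 slots
     * yields max_slots = (100 - 40 + 15) / 5 = 15. A candidate queued 600
     * seconds ago with 10 total entries scores 600/10 = 60, while one
     * queued 1200 seconds ago with 100 entries scores only 12, so the
     * small old job wins - provided it needs at most 15 more entries.
     */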
    if (max_slots > 0) {
        for (job = current->transport_peers.next; job;
             job = job->transport_peers.next) {
            if (job->stack_children.next != 0 || IS_BLOCKER(job, transport))
                continue;
            max_total_entries = MAX_ENTRIES(job);
            max_needed_entries = max_total_entries - job->selected_entries;
            delay = now - job->message->queued_time + 1;
            if (max_needed_entries > 0 && max_needed_entries <= max_slots) {
                score = (double) delay / max_total_entries;
                if (score > best_score) {
                    best_score = score;
                    best_job = job;
                }
            }

            /*
             * Stop early if the best score is as good as it can get.
             */
            if (delay <= best_score && job->stack_level == 0)
                break;
        }
    }

    /*
     * Cache the result for later use.
     */
    transport->candidate_cache = best_job;
    transport->candidate_cache_current = current;
    transport->candidate_cache_time = now;

    return (best_job);
}

/* qmgr_job_preempt - preempt large message with smaller one */

static QMGR_JOB *qmgr_job_preempt(QMGR_JOB *current)
{
    const char *myname = "qmgr_job_preempt";
    QMGR_TRANSPORT *transport = current->transport;
    QMGR_JOB *job, *prev;
    int     expected_slots;
    int     rcpt_slots;

    /*
     * Suppress preempting completely if the current job is not big enough
     * to accumulate even the minimal number of slots required.
     *
     * Also, don't look for a better job candidate if there are no available
     * slots yet (the count can get negative due to the slot loans below).
     */
    if (current->slots_available <= 0
        || MAX_ENTRIES(current) < transport->min_slots * transport->slot_cost)
        return (current);

    /*
     * Find the best candidate for preempting the current job.
     *
     * Note that the function also takes care that the candidate fits within
     * the number of delivery slots which the current job is still able to
     * accumulate.
     */
    if ((job = qmgr_job_candidate(current)) == 0)
        return (current);

    /*
     * Sanity checks.
     */
    if (job == current)
        msg_panic("%s: attempt to preempt itself", myname);
    if (job->stack_children.next != 0)
        msg_panic("%s: already on the job stack (%d)", myname, job->stack_level);
    if (job->stack_level < 0)
        msg_panic("%s: not on the job list (%d)", myname, job->stack_level);

    /*
     * Check whether there are enough available delivery slots accumulated
     * to preempt the current job.
     *
     * The slot loaning scheme improves the average message response time.
     * Note that the loan only allows the preemption to happen earlier,
     * though. It doesn't affect how many slots have to be "paid" - in
     * either case the full number of slots required has to be accumulated
     * later before the current job can be preempted again.
     */
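    /*
     * Illustrative numbers (not from the sources): with slot_cost 5,
     * slot_loan 3, slot_loan_factor 75 and a candidate expected to need 12
     * entries, the test below requires slots_available / 5 + 3 >= 12 *
     * 0.75 = 9, i.e. at least 30 accumulated slots instead of the 60 that
     * a full 12 * 5 payment would demand - the preemption merely happens
     * earlier, the remaining slots are still paid off afterwards.
     */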
    expected_slots = MAX_ENTRIES(job) - job->selected_entries;
    if (current->slots_available / transport->slot_cost + transport->slot_loan
        < expected_slots * transport->slot_loan_factor / 100.0)
        return (current);

    /*
     * Preempt the current job.
     *
     * This involves placing the selected candidate in front of the current
     * job on the job list and updating the stack parent/child/sibling
     * pointers appropriately. But first we need to make sure that the
     * candidate is taken from its previous job stack which it might be on
     * top of.
     */
    if (job->stack_level > 0)
        qmgr_job_pop(job);
    QMGR_LIST_UNLINK(transport->job_list, QMGR_JOB *, job, transport_peers);
    prev = current->transport_peers.prev;
    QMGR_LIST_LINK(transport->job_list, prev, job, current, transport_peers);
    job->stack_parent = current;
    QMGR_LIST_APPEND(current->stack_children, job, stack_siblings);
    job->stack_level = current->stack_level + 1;

    /*
     * Update the current job pointer and explicitly reset the candidate
     * cache.
     */
    transport->job_current = job;
    RESET_CANDIDATE_CACHE(transport);

    /*
     * Since a single job can be preempted by several jobs at the same
     * time, we have to adjust the available slot count now to prevent
     * using the same slots multiple times. To do that we subtract the
     * number of slots the preempting job will supposedly use. This number
     * will be corrected later when that job is popped from the stack to
     * reflect the number of slots really used.
     *
     * As long as we don't need to keep track of how many slots were really
     * used, we can (ab)use the slots_used counter for counting the
     * difference between the real and expected amounts instead of the
     * real amount itself.
     */
    current->slots_available -= expected_slots * transport->slot_cost;
    job->slots_used = -expected_slots;

    /*
     * Add part of the extra recipient slots reserved for preempting jobs
     * to the new current job if necessary.
     *
     * Note that transport->rcpt_unused is within <-rcpt_per_stack, 0> in
     * such case.
     */
    if (job->message->rcpt_offset != 0) {
        rcpt_slots = (transport->rcpt_per_stack + transport->rcpt_unused + 1) / 2;
        job->rcpt_limit += rcpt_slots;
        job->message->rcpt_limit += rcpt_slots;
        transport->rcpt_unused -= rcpt_slots;
    }

    if (msg_verbose)
        msg_info("%s: %s by %s, level %d", myname, current->message->queue_id,
                 job->message->queue_id, job->stack_level);

    return (job);
}

/* qmgr_job_pop - remove the job from its job preemption stack */

static void qmgr_job_pop(QMGR_JOB *job)
{
    const char *myname = "qmgr_job_pop";
    QMGR_TRANSPORT *transport = job->transport;
    QMGR_JOB *parent;

    if (msg_verbose)
        msg_info("%s: %s", myname, job->message->queue_id);

    /*
     * Sanity checks.
     */
    if (job->stack_level <= 0)
        msg_panic("%s: not on the job stack (%d)", myname, job->stack_level);

    /*
     * Adjust the number of delivery slots available to preempt the job's
     * parent.
     *
     * Note that we intentionally do not adjust slots_used of the parent.
     * Doing so would decrease the maximum per-message inflation factor if
     * the preemption appeared near the end of the parent delivery.
     *
     * For the same reason we do not adjust the parent's slots_available if
     * the parent is not the original parent that was preempted by this job
     * (i.e., the original parent job has already completed).
     *
     * This is another key part of the theory behind this preempting
     * scheduler.
     */
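    /*
     * Example with made-up numbers: a job that was expected to use 12
     * slots (slots_used started at -12) but delivered only 8 entries ends
     * with slots_used == -4. With slot_cost 5, the adjustment below then
     * gives 4 * 5 = 20 slots back to the parent, compensating for the
     * over-subtraction made at preemption time.
     */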
    if ((parent = job->stack_parent) != 0
        && job->stack_level == parent->stack_level + 1)
        parent->slots_available -= job->slots_used * transport->slot_cost;

    /*
     * Remove the job from its parent's children list.
     */
    if (parent != 0) {
        QMGR_LIST_UNLINK(parent->stack_children, QMGR_JOB *, job, stack_siblings);
        job->stack_parent = 0;
    }

    /*
     * If there is a parent, let it adopt all those orphaned children.
     * Otherwise at least notify the children that their parent is gone.
     */
    qmgr_job_parent_gone(job, parent);

    /*
     * Put the job back to stack level zero.
     */
    job->stack_level = 0;

    /*
     * Explicitly reset the candidate cache. It's not worth trying to skip
     * this under some complicated conditions - in most cases the popped
     * job is the current job, so we would have to reset it anyway.
     */
    RESET_CANDIDATE_CACHE(transport);

    /*
     * Here we leave the remaining work involving the proper placement on
     * the job list to the caller. The most important reason for this is
     * that it allows us not to look up where exactly to place the job.
     *
     * The caller is also made responsible for invalidating the current job
     * cache if necessary.
     */
#if 0
    QMGR_LIST_UNLINK(transport->job_list, QMGR_JOB *, job, transport_peers);
    QMGR_LIST_LINK(transport->job_list, some_prev, job, some_next, transport_peers);

    if (transport->job_current == job)
        transport->job_current = job->transport_peers.next;
#endif
}

/* qmgr_job_peer_select - select next peer suitable for delivery */

static QMGR_PEER *qmgr_job_peer_select(QMGR_JOB *job)
{
    QMGR_PEER *peer;
    QMGR_MESSAGE *message = job->message;

    /*
     * Try reading in more recipients. We do that as soon as possible
     * (almost, see below), to make sure there is enough new blood pouring
     * in. Otherwise a single recipient for a slow destination might starve
     * the entire message delivery, leaving lots of fast destination
     * recipients sitting idle in the queue file.
     *
     * Ideally we would like to read in recipients whenever there is space,
     * but to prevent excessive I/O, we read them only when enough time has
     * passed or when we can read enough of them at once.
     *
     * Note that even if we read the recipients a few at a time, the message
     * loading code tries to put them into existing recipient entries
     * whenever possible, so the per-destination recipient grouping is not
     * grossly affected.
     *
     * XXX Workaround for logic mismatch. The message->refcount test needs
     * explanation. If the refcount is zero, it means that qmgr_active_done()
     * is being completed asynchronously. In such case, we can't read in
     * more recipients, as bad things would happen after qmgr_active_done()
     * continues processing. Note that this results in the given job being
     * stalled for some time, but fortunately this particular situation is
     * so rare that it is not critical. Still, we seek a better solution.
     */
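    /*
     * In other words (illustrative numbers only): with refill_limit 100
     * and refill_delay 5, more recipients are read in either as soon as
     * there is room for at least 100 of them, or once 5 seconds have
     * passed since the last refill and there is room for at least one.
     */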
    if (message->rcpt_offset != 0
        && message->refcount > 0
        && (message->rcpt_limit - message->rcpt_count >= job->transport->refill_limit
            || (message->rcpt_limit > message->rcpt_count
                && sane_time() - message->refill_time >= job->transport->refill_delay)))
        qmgr_message_realloc(message);

    /*
     * Get the next suitable peer, if there is any.
     */
    if (HAS_ENTRIES(job) && (peer = qmgr_peer_select(job)) != 0)
        return (peer);

    /*
     * There is no suitable peer in-core, so try reading in more recipients
     * if possible. This is our last chance to get a suitable peer before
     * giving up on this job for now.
     *
     * XXX For message->refcount, see above.
     */
    if (message->rcpt_offset != 0
        && message->refcount > 0
        && message->rcpt_limit > message->rcpt_count) {
        qmgr_message_realloc(message);
        if (HAS_ENTRIES(job))
            return (qmgr_peer_select(job));
    }
    return (0);
}

/* qmgr_job_entry_select - select next entry suitable for delivery */

QMGR_ENTRY *qmgr_job_entry_select(QMGR_TRANSPORT *transport)
{
    QMGR_JOB *job, *next;
    QMGR_PEER *peer;
    QMGR_ENTRY *entry;

    /*
     * Get the current job if there is one.
     */
    if ((job = transport->job_current) == 0)
        return (0);

    /*
     * Exercise the preempting algorithm if enabled.
     *
     * A slot_cost equal to 1 causes the algorithm to degenerate and is
     * therefore disabled too.
     */
    if (transport->slot_cost >= 2)
        job = qmgr_job_preempt(job);

    /*
     * Select the next entry suitable for delivery. In case the current job
     * can't provide one because of the per-destination concurrency limits,
     * we mark it as a "blocker" job and continue with the next job on the
     * job list.
     *
     * Note that the loop also takes care of getting the "stall" jobs (jobs
     * with no entries currently available) out of the way if necessary.
     * Stall jobs can appear in case of multi-transport messages whose
     * recipients don't fit in-core at once. Some jobs created by such a
     * message may have only a few recipients and would stay on the job
     * list until all other jobs of that message are delivered, blocking
     * precious recipient slots available to this transport. Or it can
     * happen that the job has some more entries but suddenly they all get
     * deferred. Whatever the reason, we retire such jobs below if we
     * happen to come across some.
     */
    for ( /* empty */ ; job; job = next) {
        next = job->transport_peers.next;

        /*
         * Don't bother if the job is known to have no available entries
         * because of the per-destination concurrency limits.
         */
        if (IS_BLOCKER(job, transport))
            continue;

        if ((peer = qmgr_job_peer_select(job)) != 0) {

            /*
             * We have found a suitable peer. Select one of its entries and
             * adjust the delivery slot counters.
             */
            entry = qmgr_entry_select(peer);
            qmgr_job_count_slots(job);

            /*
             * Remember the current job for the next time so we don't have
             * to crawl over all those blockers again. They will be
             * reconsidered when the concurrency limit permits.
             */
            transport->job_current = job;

            /*
             * In case we selected the very last job entry, remove the job
             * from the job lists right now.
             *
             * This action uses the assumption that once the job entry has
             * been selected, it can be unselected only before the message
             * itself is deferred. Thus a job with all entries selected
             * can't re-appear with more entries available for selection
             * again (without reading in more entries from the queue file,
             * which in turn invokes qmgr_job_obtain(), which re-links the
             * job back on the lists if necessary).
             *
             * Note that qmgr_job_move_limits() transfers the recipient
             * slots correctly even if the job is unlinked from the job
             * list, thanks to the job_next_unread caching.
             */
            if (!HAS_ENTRIES(job) && job->message->rcpt_offset == 0)
                qmgr_job_retire(job);

            /*
             * Finally. Hand back the fruit of our tedious effort.
             */
            return (entry);
        } else if (HAS_ENTRIES(job)) {

            /*
             * The job can't be selected due to the concurrency limits.
             * Mark it together with its queues so we know they are
             * blocking the job list and they get the appropriate
             * treatment. In particular, all blockers will be reconsidered
             * when one of the problematic queues accepts more deliveries.
             * And the job itself will be reconsidered if it is assigned
             * some more entries.
             */
            job->blocker_tag = transport->blocker_tag;
            for (peer = job->peer_list.next; peer; peer = peer->peers.next)
                if (peer->entry_list.next != 0)
                    peer->queue->blocker_tag = transport->blocker_tag;
        } else {

            /*
             * The job is "stalled". Retire it until it either gets freed
             * or gets more entries later.
             */
            qmgr_job_retire(job);
        }
    }

    /*
     * We have not found any entry we could use for delivery. Well, things
     * must have changed since this transport was selected for asynchronous
     * allocation. Never mind. Clear the current job pointer and reluctantly
     * report back that we have failed in our task.
     */
    transport->job_current = 0;
    return (0);
}

/* qmgr_job_blocker_update - update "blocked job" status */

void qmgr_job_blocker_update(QMGR_QUEUE *queue)
{
    QMGR_TRANSPORT *transport = queue->transport;

    /*
     * If the queue was blocking some of the jobs on the job list, check
     * whether the concurrency limit has lifted. If there are still some
     * pending deliveries, give it a try and unmark all transport blockers
     * at once. The qmgr_job_entry_select() will do the rest. In either
     * case, make sure the queue is not marked as a blocker anymore, with
     * extra handling of queues which were declared dead.
     *
     * Note that changing the blocker status also affects the candidate
     * cache. Most of the cases would be automatically recognized by the
     * current job change, but we play it safe and reset the cache
     * explicitly below.
     *
     * Keeping the transport blocker tag odd is an easy way to make sure
     * the tag never matches jobs that are not explicitly marked as
     * blockers.
     */
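    /*
     * For example (assuming the tag starts out odd): advancing it by 2
     * below keeps it odd forever, while job->blocker_tag is initialized
     * to 0 in qmgr_job_create() and thus can match only after an explicit
     * assignment in qmgr_job_entry_select().
     */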
    if (queue->blocker_tag == transport->blocker_tag) {
        if (queue->window > queue->busy_refcount && queue->todo.next != 0) {
            transport->blocker_tag += 2;
            transport->job_current = transport->job_list.next;
            transport->candidate_cache_current = 0;
        }
        if (queue->window > queue->busy_refcount || QMGR_QUEUE_THROTTLED(queue))
            queue->blocker_tag = 0;
    }
}