7 /* per-transport data structures
11 /* QMGR_TRANSPORT *qmgr_transport_create(name)
14 /* QMGR_TRANSPORT *qmgr_transport_find(name)
17 /* QMGR_TRANSPORT *qmgr_transport_select()
19 /* void qmgr_transport_alloc(transport, notify)
20 /* QMGR_TRANSPORT *transport;
21 /* void (*notify)(QMGR_TRANSPORT *transport, VSTREAM *fp);
23 /* void qmgr_transport_throttle(transport, dsn)
24 /* QMGR_TRANSPORT *transport;
27 /* void qmgr_transport_unthrottle(transport)
28 /* QMGR_TRANSPORT *transport;
30 /* This module organizes the world by message transport type.
31 /* Each transport can have zero or more destination queues
32 /* associated with it.
34 /* qmgr_transport_create() instantiates a data structure for the
35 /* named transport type.
37 /* qmgr_transport_find() looks up an existing message transport
40 /* qmgr_transport_select() attempts to find a transport that
41 /* has messages pending delivery. This routine implements
42 /* round-robin search among transports.
44 /* qmgr_transport_alloc() allocates a delivery process for the
45 /* specified transport type. Allocation is performed asynchronously.
46 /* When a process becomes available, the application callback routine
47 /* is invoked with two arguments: the transport, and a stream that
48 /* is connected to a delivery process. It is an error to call
49 /* qmgr_transport_alloc() while delivery process allocation for
50 /* the same transport is in progress.
52 /* qmgr_transport_throttle() blocks further allocation of delivery
53 /* processes for the named transport. Attempts to throttle a
54 /* throttled transport are ignored.
56 /* qmgr_transport_unthrottle() undoes qmgr_transport_throttle().
57 /* Attempts to unthrottle a non-throttled transport are ignored.
59 /* Panic: consistency check failure. Fatal: out of memory.
63 /* The Secure Mailer license must be distributed with this software.
66 /* IBM T.J. Watson Research
68 /* Yorktown Heights, NY 10598, USA
70 /* Preemptive scheduler enhancements:
73 /* 155 00, Prague, Czech Republic
81 #include <sys/time.h> /* FD_SETSIZE */
82 #include <sys/types.h> /* FD_SETSIZE */
83 #include <unistd.h> /* FD_SETSIZE */
85 #ifdef USE_SYS_SELECT_H
86 #include <sys/select.h> /* FD_SETSIZE */
89 /* Utility library. */
100 #include <mail_proto.h>
101 #include <recipient_list.h>
102 #include <mail_conf.h>
103 #include <mail_params.h>
105 /* Application-specific. */
109 HTABLE
*qmgr_transport_byname
; /* transport by name */
110 QMGR_TRANSPORT_LIST qmgr_transport_list
;/* transports, round robin */
113 * A local structure to remember a delivery process allocation request.
115 typedef struct QMGR_TRANSPORT_ALLOC QMGR_TRANSPORT_ALLOC
;
117 struct QMGR_TRANSPORT_ALLOC
{
118 QMGR_TRANSPORT
*transport
; /* transport context */
119 VSTREAM
*stream
; /* delivery service stream */
120 QMGR_TRANSPORT_ALLOC_NOTIFY notify
; /* application call-back routine */
124 * Connections to delivery agents are managed asynchronously. Each delivery
125 * agent connection goes through multiple wait states:
127 * - With Linux/Solaris and old queue manager implementations only, wait for
128 * the server to invoke accept().
130 * - Wait for the delivery agent's announcement that it is ready to receive a
133 * - Wait for the delivery request completion status.
135 * Older queue manager implementations had only one pending delivery agent
136 * connection per transport. With low-latency destinations, the output rates
137 * were reduced on Linux/Solaris systems that had the extra wait state.
139 * To maximize delivery agent output rates with low-latency destinations, the
140 * following changes were made to the queue manager by the end of the 2.4
143 * - The Linux/Solaris accept() wait state was eliminated.
145 * - A pipeline was implemented for pending delivery agent connections. The
146 * number of pending delivery agent connections was increased from one to
147 * two: the number of before-delivery wait states, plus one extra pipeline
148 * slot to prevent the pipeline from stalling easily. Increasing the
149 * pipeline much further actually hurt performance.
151 * - To reduce queue manager disk competition with delivery agents, the queue
152 * scanning algorithm was modified to import only one message per interrupt.
153 * The incoming and deferred queue scans now happen on alternate interrupts.
155 * Simplistically reasoned, a non-zero (incoming + active) queue length is
156 * equivalent to a time shift for mail deliveries; this is undesirable when
157 * delivery agents are not fully utilized.
159 * On the other hand a non-empty active queue is what allows us to do clever
160 * things such as queue file prefetch, concurrency windows, and connection
161 * caching; the idea is that such "thinking time" is affordable only after
162 * the output channels are maxed out.
164 #ifndef QMGR_TRANSPORT_MAX_PEND
165 #define QMGR_TRANSPORT_MAX_PEND 2
168 /* qmgr_transport_unthrottle_wrapper - in case (char *) != (struct *) */
170 static void qmgr_transport_unthrottle_wrapper(int unused_event
, char *context
)
172 qmgr_transport_unthrottle((QMGR_TRANSPORT
*) context
);
175 /* qmgr_transport_unthrottle - open the throttle */
177 void qmgr_transport_unthrottle(QMGR_TRANSPORT
*transport
)
179 const char *myname
= "qmgr_transport_unthrottle";
182 * This routine runs after expiration of the timer set by
183 * qmgr_transport_throttle(), or whenever a delivery transport has been
184 * used without malfunction. In either case, we enable delivery again if
185 * the transport was blocked, otherwise the request is ignored.
187 if ((transport
->flags
& QMGR_TRANSPORT_STAT_DEAD
) != 0) {
189 msg_info("%s: transport %s", myname
, transport
->name
);
190 transport
->flags
&= ~QMGR_TRANSPORT_STAT_DEAD
;
191 if (transport
->dsn
== 0)
192 msg_panic("%s: transport %s: null reason",
193 myname
, transport
->name
);
194 dsn_free(transport
->dsn
);
196 event_cancel_timer(qmgr_transport_unthrottle_wrapper
,
201 /* qmgr_transport_throttle - disable delivery process allocation */
203 void qmgr_transport_throttle(QMGR_TRANSPORT
*transport
, DSN
*dsn
)
205 const char *myname
= "qmgr_transport_throttle";
208 * We are unable to connect to a delivery process for this type of message
209 * transport. Instead of hosing the system by retrying in a tight loop,
210 * back off and disable this transport type for a while.
212 if ((transport
->flags
& QMGR_TRANSPORT_STAT_DEAD
) == 0) {
214 msg_info("%s: transport %s: status: %s reason: %s",
215 myname
, transport
->name
, dsn
->status
, dsn
->reason
);
216 transport
->flags
|= QMGR_TRANSPORT_STAT_DEAD
;
218 msg_panic("%s: transport %s: spurious reason: %s",
219 myname
, transport
->name
, transport
->dsn
->reason
);
220 transport
->dsn
= DSN_COPY(dsn
);
221 event_request_timer(qmgr_transport_unthrottle_wrapper
,
222 (char *) transport
, var_transport_retry_time
);
226 /* qmgr_transport_abort - transport connect watchdog */
228 static void qmgr_transport_abort(int unused_event
, char *context
)
230 QMGR_TRANSPORT_ALLOC
*alloc
= (QMGR_TRANSPORT_ALLOC
*) context
;
232 msg_fatal("timeout connecting to transport: %s", alloc
->transport
->name
);
235 /* qmgr_transport_event - delivery process availability notice */
237 static void qmgr_transport_event(int unused_event
, char *context
)
239 QMGR_TRANSPORT_ALLOC
*alloc
= (QMGR_TRANSPORT_ALLOC
*) context
;
242 * This routine notifies the application when the request given to
243 * qmgr_transport_alloc() completes.
246 msg_info("transport_event: %s", alloc
->transport
->name
);
249 * Connection request completed. Stop the watchdog timer.
251 event_cancel_timer(qmgr_transport_abort
, context
);
254 * Disable further read events that end up calling this function, and
255 * free up this pending connection pipeline slot.
258 event_disable_readwrite(vstream_fileno(alloc
->stream
));
259 non_blocking(vstream_fileno(alloc
->stream
), BLOCKING
);
261 alloc
->transport
->pending
-= 1;
264 * Notify the requestor.
266 alloc
->notify(alloc
->transport
, alloc
->stream
);
267 myfree((char *) alloc
);
270 /* qmgr_transport_select - select transport for allocation */
272 QMGR_TRANSPORT
*qmgr_transport_select(void)
274 QMGR_TRANSPORT
*xport
;
279 * If we find a suitable transport, rotate the list of transports to
280 * effectuate round-robin selection. See similar selection code in
281 * qmgr_peer_select().
283 * This function is called repeatedly until all transports have maxed out
284 * the number of pending delivery agent connections, until all delivery
285 * agent concurrency windows are maxed out, or until we run out of "todo"
288 #define MIN5af51743e4eef(x, y) ((x) < (y) ? (x) : (y))
290 for (xport
= qmgr_transport_list
.next
; xport
; xport
= xport
->peers
.next
) {
291 if ((xport
->flags
& QMGR_TRANSPORT_STAT_DEAD
) != 0
292 || xport
->pending
>= QMGR_TRANSPORT_MAX_PEND
)
294 need
= xport
->pending
+ 1;
295 for (queue
= xport
->queue_list
.next
; queue
; queue
= queue
->peers
.next
) {
296 if (QMGR_QUEUE_READY(queue
) == 0)
298 if ((need
-= MIN5af51743e4eef(queue
->window
- queue
->busy_refcount
,
299 queue
->todo_refcount
)) <= 0) {
300 QMGR_LIST_ROTATE(qmgr_transport_list
, xport
, peers
);
302 msg_info("qmgr_transport_select: %s", xport
->name
);
310 /* qmgr_transport_alloc - allocate delivery process */
312 void qmgr_transport_alloc(QMGR_TRANSPORT
*transport
, QMGR_TRANSPORT_ALLOC_NOTIFY notify
)
314 QMGR_TRANSPORT_ALLOC
*alloc
;
319 if (transport
->flags
& QMGR_TRANSPORT_STAT_DEAD
)
320 msg_panic("qmgr_transport: dead transport: %s", transport
->name
);
321 if (transport
->pending
>= QMGR_TRANSPORT_MAX_PEND
)
322 msg_panic("qmgr_transport: excess allocation: %s", transport
->name
);
325 * Connect to the well-known port for this delivery service, and wake up
326 * when a process announces its availability. Allow only a limited number
327 * of delivery process allocation attempts for this transport. In case of
328 * problems, back off. Do not hose the system when it is in trouble
331 * Use non-blocking connect(), so that Linux won't block the queue manager
332 * until the delivery agent calls accept().
334 * When the connection to delivery agent cannot be completed, notify the
335 * event handler so that it can throttle the transport and defer the todo
336 * queues, just like it does when communication fails *after* connection
339 * Before Postfix 2.4, the event handler was not invoked after connect()
340 * error, and mail was not deferred. Because of this, mail would be stuck
341 * in the active queue after triggering a "connection refused" condition.
343 alloc
= (QMGR_TRANSPORT_ALLOC
*) mymalloc(sizeof(*alloc
));
344 alloc
->transport
= transport
;
345 alloc
->notify
= notify
;
346 transport
->pending
+= 1;
347 if ((alloc
->stream
= mail_connect(MAIL_CLASS_PRIVATE
, transport
->name
,
348 NON_BLOCKING
)) == 0) {
349 msg_warn("connect to transport %s/%s: %m",
350 MAIL_CLASS_PRIVATE
, transport
->name
);
351 event_request_timer(qmgr_transport_event
, (char *) alloc
, 0);
354 #if (EVENTS_STYLE != EVENTS_STYLE_SELECT) && defined(VSTREAM_CTL_DUPFD)
355 #ifndef THRESHOLD_FD_WORKAROUND
356 #define THRESHOLD_FD_WORKAROUND 128
358 vstream_control(alloc
->stream
,
359 VSTREAM_CTL_DUPFD
, THRESHOLD_FD_WORKAROUND
,
362 event_enable_read(vstream_fileno(alloc
->stream
), qmgr_transport_event
,
366 * Guard against broken systems.
368 event_request_timer(qmgr_transport_abort
, (char *) alloc
,
372 /* qmgr_transport_create - create transport instance */
374 QMGR_TRANSPORT
*qmgr_transport_create(const char *name
)
376 QMGR_TRANSPORT
*transport
;
378 if (htable_find(qmgr_transport_byname
, name
) != 0)
379 msg_panic("qmgr_transport_create: transport exists: %s", name
);
380 transport
= (QMGR_TRANSPORT
*) mymalloc(sizeof(QMGR_TRANSPORT
));
381 transport
->flags
= 0;
382 transport
->pending
= 0;
383 transport
->name
= mystrdup(name
);
386 * Use global configuration settings or transport-specific settings.
388 transport
->dest_concurrency_limit
=
389 get_mail_conf_int2(name
, _DEST_CON_LIMIT
,
390 var_dest_con_limit
, 0, 0);
391 transport
->recipient_limit
=
392 get_mail_conf_int2(name
, _DEST_RCPT_LIMIT
,
393 var_dest_rcpt_limit
, 0, 0);
394 transport
->init_dest_concurrency
=
395 get_mail_conf_int2(name
, _INIT_DEST_CON
,
396 var_init_dest_concurrency
, 1, 0);
397 transport
->rate_delay
= get_mail_conf_time2(name
, _DEST_RATE_DELAY
,
401 if (transport
->rate_delay
> 0)
402 transport
->dest_concurrency_limit
= 1;
403 if (transport
->dest_concurrency_limit
!= 0
404 && transport
->dest_concurrency_limit
< transport
->init_dest_concurrency
)
405 transport
->init_dest_concurrency
= transport
->dest_concurrency_limit
;
407 transport
->slot_cost
= get_mail_conf_int2(name
, _DELIVERY_SLOT_COST
,
408 var_delivery_slot_cost
, 0, 0);
409 transport
->slot_loan
= get_mail_conf_int2(name
, _DELIVERY_SLOT_LOAN
,
410 var_delivery_slot_loan
, 0, 0);
411 transport
->slot_loan_factor
=
412 100 - get_mail_conf_int2(name
, _DELIVERY_SLOT_DISCOUNT
,
413 var_delivery_slot_discount
, 0, 100);
414 transport
->min_slots
= get_mail_conf_int2(name
, _MIN_DELIVERY_SLOTS
,
415 var_min_delivery_slots
, 0, 0);
416 transport
->rcpt_unused
= get_mail_conf_int2(name
, _XPORT_RCPT_LIMIT
,
417 var_xport_rcpt_limit
, 0, 0);
418 transport
->rcpt_per_stack
= get_mail_conf_int2(name
, _STACK_RCPT_LIMIT
,
419 var_stack_rcpt_limit
, 0, 0);
420 transport
->refill_limit
= get_mail_conf_int2(name
, _XPORT_REFILL_LIMIT
,
421 var_xport_refill_limit
, 1, 0);
422 transport
->refill_delay
= get_mail_conf_time2(name
, _XPORT_REFILL_DELAY
,
423 var_xport_refill_delay
, 's', 1, 0);
425 transport
->queue_byname
= htable_create(0);
426 QMGR_LIST_INIT(transport
->queue_list
);
427 transport
->job_byname
= htable_create(0);
428 QMGR_LIST_INIT(transport
->job_list
);
429 QMGR_LIST_INIT(transport
->job_bytime
);
430 transport
->job_current
= 0;
431 transport
->job_next_unread
= 0;
432 transport
->candidate_cache
= 0;
433 transport
->candidate_cache_current
= 0;
434 transport
->candidate_cache_time
= (time_t) 0;
435 transport
->blocker_tag
= 1;
437 qmgr_feedback_init(&transport
->pos_feedback
, name
, _CONC_POS_FDBACK
,
438 VAR_CONC_POS_FDBACK
, var_conc_pos_feedback
);
439 qmgr_feedback_init(&transport
->neg_feedback
, name
, _CONC_NEG_FDBACK
,
440 VAR_CONC_NEG_FDBACK
, var_conc_neg_feedback
);
441 transport
->fail_cohort_limit
=
442 get_mail_conf_int2(name
, _CONC_COHORT_LIM
,
443 var_conc_cohort_limit
, 0, 0);
444 if (qmgr_transport_byname
== 0)
445 qmgr_transport_byname
= htable_create(10);
446 htable_enter(qmgr_transport_byname
, name
, (char *) transport
);
447 QMGR_LIST_PREPEND(qmgr_transport_list
, transport
, peers
);
449 msg_info("qmgr_transport_create: %s concurrency %d recipients %d",
450 transport
->name
, transport
->dest_concurrency_limit
,
451 transport
->recipient_limit
);
455 /* qmgr_transport_find - find transport instance */
457 QMGR_TRANSPORT
*qmgr_transport_find(const char *name
)
459 return ((QMGR_TRANSPORT
*) htable_find(qmgr_transport_byname
, name
));