/* queue manager data structures */
#include <recipient_list.h>
/*
 * The queue manager is built around lots of mutually-referring structures.
 * These typedefs save some typing.
 */
/*
 * Forward typedefs for the queue manager structures. Each name aliases the
 * struct tag of the same name, so that structures can point at each other
 * before their definitions are complete.
 */
typedef struct QMGR_TRANSPORT QMGR_TRANSPORT;
typedef struct QMGR_QUEUE QMGR_QUEUE;
typedef struct QMGR_ENTRY QMGR_ENTRY;
typedef struct QMGR_MESSAGE QMGR_MESSAGE;
typedef struct QMGR_JOB QMGR_JOB;
typedef struct QMGR_PEER QMGR_PEER;
typedef struct QMGR_TRANSPORT_LIST QMGR_TRANSPORT_LIST;
typedef struct QMGR_QUEUE_LIST QMGR_QUEUE_LIST;
typedef struct QMGR_ENTRY_LIST QMGR_ENTRY_LIST;
typedef struct QMGR_JOB_LIST QMGR_JOB_LIST;
typedef struct QMGR_PEER_LIST QMGR_PEER_LIST;
typedef struct QMGR_SCAN QMGR_SCAN;
typedef struct QMGR_FEEDBACK QMGR_FEEDBACK;
/*
 * Hairy macros to update doubly-linked lists.
 */
/*
 * QMGR_LIST_ROTATE - move the leading part of a list, up to and including
 * "object", to the end of the list.
 *
 * head: list head structure (an lvalue with "next" and "prev" members).
 * object: pointer to a list member. peers: name of the linkage member
 * inside the object.
 *
 * The list is first closed into a ring, then re-opened right after
 * "object", which thereby becomes the last element. The list must not be
 * empty and "object" must be on it. Arguments are evaluated more than once.
 */
#define QMGR_LIST_ROTATE(head, object, peers) { \
    (head).next->peers.prev = (head).prev; \
    (head).prev->peers.next = (head).next; \
    (head).next = (object)->peers.next; \
    (head).next->peers.prev = 0; \
    (head).prev = (object); \
    (object)->peers.next = 0; \
}
/*
 * QMGR_LIST_UNLINK - remove "object" from a doubly-linked list.
 *
 * head: list head structure (an lvalue with "next" and "prev" members).
 * type: pointer type of the list members. object: pointer to the member to
 * remove. peers: name of the linkage member inside the object.
 *
 * The neighbors (or the list head, at either end) are re-connected around
 * the object, and the object's own links are reset to null. Arguments are
 * evaluated more than once.
 */
#define QMGR_LIST_UNLINK(head, type, object, peers) { \
    type   _next = (object)->peers.next; \
    type   _prev = (object)->peers.prev; \
    if (_prev) _prev->peers.next = _next; \
    else (head).next = _next; \
    if (_next) _next->peers.prev = _prev; \
    else (head).prev = _prev; \
    (object)->peers.next = (object)->peers.prev = 0; \
}
/*
 * QMGR_LIST_LINK - insert "object" between "pred" and "succ".
 *
 * head: list head structure (an lvalue with "next" and "prev" members).
 * pred, succ: pointers to the intended neighbors; a null pointer means that
 * the object becomes the first, respectively the last, list element.
 * object: pointer to the member to insert. peers: name of the linkage
 * member inside the object.
 *
 * The macro does not verify that "pred" and "succ" are adjacent. Arguments
 * are evaluated more than once.
 */
#define QMGR_LIST_LINK(head, pred, object, succ, peers) { \
    (object)->peers.prev = (pred); \
    (object)->peers.next = (succ); \
    if (pred) (pred)->peers.next = (object); \
    else (head).next = (object); \
    if (succ) (succ)->peers.prev = (object); \
    else (head).prev = (object); \
}
/*
 * QMGR_LIST_PREPEND - insert "object" at the front of a list.
 *
 * head: list head structure (an lvalue with "next" and "prev" members).
 * object: pointer to the member to insert. peers: name of the linkage
 * member inside the object.
 *
 * When the list is empty the object also becomes the last element.
 * Arguments are evaluated more than once.
 */
#define QMGR_LIST_PREPEND(head, object, peers) { \
    (object)->peers.next = (head).next; \
    (object)->peers.prev = 0; \
    if ((head).next) { \
        (head).next->peers.prev = (object); \
    } else { \
        (head).prev = (object); \
    } \
    (head).next = (object); \
}
/*
 * QMGR_LIST_APPEND - insert "object" at the end of a list.
 *
 * head: list head structure (an lvalue with "next" and "prev" members).
 * object: pointer to the member to insert. peers: name of the linkage
 * member inside the object.
 *
 * When the list is empty the object also becomes the first element.
 * Arguments are evaluated more than once.
 */
#define QMGR_LIST_APPEND(head, object, peers) { \
    (object)->peers.prev = (head).prev; \
    (object)->peers.next = 0; \
    if ((head).prev) { \
        (head).prev->peers.next = (object); \
    } else { \
        (head).next = (object); \
    } \
    (head).prev = (object); \
}
/*
 * QMGR_LIST_INIT - make a list head describe the empty list.
 *
 * head: list head structure (an lvalue with "next" and "prev" members).
 * Both ends are set to null; any elements previously linked are not
 * touched.
 */
#define QMGR_LIST_INIT(head) { \
    (head).prev = 0; \
    (head).next = 0; \
}
/*
 * Transports are looked up by name (when we have resolved a message), or
 * round-robin wise (when we want to distribute resources fairly).
 */
111 struct QMGR_TRANSPORT_LIST
{
112 QMGR_TRANSPORT
*next
;
113 QMGR_TRANSPORT
*prev
;
116 extern struct HTABLE
*qmgr_transport_byname
; /* transport by name */
117 extern QMGR_TRANSPORT_LIST qmgr_transport_list
; /* transports, round robin */
/*
 * Delivery agents provide feedback, as hints that Postfix should expend
 * more or fewer resources on a specific destination domain. The main.cf
 * file specifies how feedback affects delivery concurrency: add/subtract a
 * constant, a ratio of constants, or a constant divided by the delivery
 * concurrency; and it specifies how much feedback must accumulate between
 * concurrency updates.
 */
/*
 * Concurrency feedback control parameters. The "index" member holds one of
 * the QMGR_FEEDBACK_IDX_XXX values and selects how "base" is scaled by the
 * concurrency window.
 */
struct QMGR_FEEDBACK {
    int     hysteresis;                 /* to pass, need to be this tall */
    double  base;                       /* pre-computed from main.cf */
    int     index;                      /* none, window, sqrt(window) */
};
/*
 * Values for the QMGR_FEEDBACK "index" member.
 */
#define QMGR_FEEDBACK_IDX_NONE      0   /* no window dependence */
#define QMGR_FEEDBACK_IDX_WIN       1   /* 1/window dependence */
#define QMGR_FEEDBACK_IDX_SQRT_WIN  2   /* 1/sqrt(window) dependence */

/* sqrt() is needed only when 1/sqrt(window) feedback is compiled in. */
#ifdef QMGR_FEEDBACK_IDX_SQRT_WIN
#include <math.h>
#endif
143 extern void qmgr_feedback_init(QMGR_FEEDBACK
*, const char *, const char *, const char *, const char *);
/*
 * QMGR_FEEDBACK_VAL - feedback amount for the given concurrency window.
 *
 * fb: a QMGR_FEEDBACK structure (not a pointer). win: the window size.
 * The result is fb.base, fb.base/win, or (when QMGR_FEEDBACK_IDX_SQRT_WIN
 * is defined) fb.base/sqrt(win), depending on fb.index. Arguments are
 * evaluated more than once.
 */
#ifndef QMGR_FEEDBACK_IDX_SQRT_WIN
#define QMGR_FEEDBACK_VAL(fb, win) \
    ((fb).index == QMGR_FEEDBACK_IDX_NONE ? (fb).base : (fb).base / (win))
#else
#define QMGR_FEEDBACK_VAL(fb, win) \
    ((fb).index == QMGR_FEEDBACK_IDX_NONE ? (fb).base : \
    (fb).index == QMGR_FEEDBACK_IDX_WIN ? (fb).base / (win) : \
    (fb).base / sqrt(win))
#endif
/*
 * Each transport (local, smtp-out, bounce) can have one queue per next hop
 * name. Queues are looked up by next hop name (when we have resolved a
 * message destination), or round-robin wise (when we want to deliver
 * messages fairly).
 */
161 struct QMGR_QUEUE_LIST
{
166 struct QMGR_JOB_LIST
{
171 struct QMGR_TRANSPORT
{
172 int flags
; /* blocked, etc. */
173 int pending
; /* incomplete DA connections */
174 char *name
; /* transport name */
175 int dest_concurrency_limit
; /* concurrency per domain */
176 int init_dest_concurrency
; /* init. per-domain concurrency */
177 int recipient_limit
; /* recipients per transaction */
178 int rcpt_per_stack
; /* extra slots reserved for jobs put
179 * on the job stack */
180 int rcpt_unused
; /* available in-core recipient slots */
181 int refill_limit
; /* recipient batch size for message
183 int refill_delay
; /* delay before message refill */
184 int slot_cost
; /* cost of new preemption slot (# of
185 * selected entries) */
186 int slot_loan
; /* preemption boost offset and */
187 int slot_loan_factor
; /* factor, see qmgr_job_preempt() */
188 int min_slots
; /* when preemption can take effect at
190 struct HTABLE
*queue_byname
; /* queues indexed by domain */
191 QMGR_QUEUE_LIST queue_list
; /* queues, round robin order */
192 struct HTABLE
*job_byname
; /* jobs indexed by queue id */
193 QMGR_JOB_LIST job_list
; /* list of message jobs (1 per
194 * message) ordered by scheduler */
195 QMGR_JOB_LIST job_bytime
; /* jobs ordered by time since queued */
196 QMGR_JOB
*job_current
; /* keeps track of the current job */
197 QMGR_JOB
*job_next_unread
; /* next job with unread recipients */
198 QMGR_JOB
*candidate_cache
; /* cached result from
199 * qmgr_job_candidate() */
200 QMGR_JOB
*candidate_cache_current
; /* current job tied to the candidate */
201 time_t candidate_cache_time
; /* when candidate_cache was last
203 int blocker_tag
; /* for marking blocker jobs */
204 QMGR_TRANSPORT_LIST peers
; /* linkage */
205 DSN
*dsn
; /* why unavailable */
206 QMGR_FEEDBACK pos_feedback
; /* positive feedback control */
207 QMGR_FEEDBACK neg_feedback
; /* negative feedback control */
208 int fail_cohort_limit
; /* flow shutdown control */
209 int rate_delay
; /* suspend per delivery */
/* Bit in the QMGR_TRANSPORT "flags" member; see QMGR_TRANSPORT_THROTTLED(). */
#define QMGR_TRANSPORT_STAT_DEAD    (1<<1)
214 typedef void (*QMGR_TRANSPORT_ALLOC_NOTIFY
) (QMGR_TRANSPORT
*, VSTREAM
*);
215 extern QMGR_TRANSPORT
*qmgr_transport_select(void);
216 extern void qmgr_transport_alloc(QMGR_TRANSPORT
*, QMGR_TRANSPORT_ALLOC_NOTIFY
);
217 extern void qmgr_transport_throttle(QMGR_TRANSPORT
*, DSN
*);
218 extern void qmgr_transport_unthrottle(QMGR_TRANSPORT
*);
219 extern QMGR_TRANSPORT
*qmgr_transport_create(const char *);
220 extern QMGR_TRANSPORT
*qmgr_transport_find(const char *);
/*
 * QMGR_TRANSPORT_THROTTLED - non-zero when the QMGR_TRANSPORT_STAT_DEAD bit
 * is set in the flags of the transport that "t" points to.
 */
#define QMGR_TRANSPORT_THROTTLED(t) ((t)->flags & QMGR_TRANSPORT_STAT_DEAD)
/*
 * Each next hop (e.g., a domain name) has its own queue of pending message
 * transactions. The "todo" queue contains messages that are to be delivered
 * to this next hop. When a message is elected for transmission, it is moved
 * from the "todo" queue to the "busy" queue. Messages are taken from the
 * "todo" queue in round-robin order.
 */
231 struct QMGR_ENTRY_LIST
{
237 int dflags
; /* delivery request options */
238 time_t last_done
; /* last delivery completion */
239 char *name
; /* domain name or address */
240 char *nexthop
; /* domain name */
241 int todo_refcount
; /* queue entries (todo list) */
242 int busy_refcount
; /* queue entries (busy list) */
243 int window
; /* slow open algorithm */
244 double success
; /* accumulated positive feedback */
245 double failure
; /* accumulated negative feedback */
246 double fail_cohorts
; /* pseudo-cohort failure count */
247 QMGR_TRANSPORT
*transport
; /* transport linkage */
248 QMGR_ENTRY_LIST todo
; /* todo queue entries */
249 QMGR_ENTRY_LIST busy
; /* messages on the wire */
250 QMGR_QUEUE_LIST peers
; /* neighbor queues */
251 DSN
*dsn
; /* why unavailable */
252 time_t clog_time_to_warn
; /* time of last warning */
253 int blocker_tag
; /* tagged if blocks job list */
/* Selectors for the two per-queue entry lists. */
#define QMGR_QUEUE_TODO 1               /* waiting for service */
#define QMGR_QUEUE_BUSY 2               /* recipients on the wire */
259 extern int qmgr_queue_count
;
261 extern QMGR_QUEUE
*qmgr_queue_create(QMGR_TRANSPORT
*, const char *, const char *);
262 extern void qmgr_queue_done(QMGR_QUEUE
*);
263 extern void qmgr_queue_throttle(QMGR_QUEUE
*, DSN
*);
264 extern void qmgr_queue_unthrottle(QMGR_QUEUE
*);
265 extern QMGR_QUEUE
*qmgr_queue_find(QMGR_TRANSPORT
*, const char *);
266 extern void qmgr_queue_suspend(QMGR_QUEUE
*, int);
/*
 * Exclusive queue states. Originally there were only two: "throttled" and
 * "not throttled". It was natural to encode these in the queue window size.
 * After 10 years it's not practical to rip out all the working code and
 * change representations, so we just clean up the names a little.
 *
 * Note: only the "ready" state can reach every state (including itself);
 * non-ready states can reach only the "ready" state. Other transitions are
 * forbidden, because they would result in dangling event handlers.
 */
/*
 * Non-ready queue states, stored in the queue's "window" member. Any
 * positive window value means "ready".
 */
#define QMGR_QUEUE_STAT_THROTTLED   0       /* back-off timer */
#define QMGR_QUEUE_STAT_SUSPENDED   (-1)    /* voluntary delay timer */
#define QMGR_QUEUE_STAT_SAVED       (-2)    /* delayed cleanup timer */
#define QMGR_QUEUE_STAT_BAD         (-3)    /* can't happen */

/*
 * State predicates; "q" is a pointer to a structure with a "window"
 * member. Every value at or below QMGR_QUEUE_STAT_BAD counts as "bad".
 */
#define QMGR_QUEUE_READY(q)     ((q)->window > 0)
#define QMGR_QUEUE_THROTTLED(q) ((q)->window == QMGR_QUEUE_STAT_THROTTLED)
#define QMGR_QUEUE_SUSPENDED(q) ((q)->window == QMGR_QUEUE_STAT_SUSPENDED)
#define QMGR_QUEUE_SAVED(q)     ((q)->window == QMGR_QUEUE_STAT_SAVED)
#define QMGR_QUEUE_BAD(q)       ((q)->window <= QMGR_QUEUE_STAT_BAD)

/*
 * QMGR_QUEUE_STATUS - the queue state as a string constant. "q" is
 * evaluated more than once.
 */
#define QMGR_QUEUE_STATUS(q) ( \
            QMGR_QUEUE_READY(q) ? "ready" : \
            QMGR_QUEUE_THROTTLED(q) ? "throttled" : \
            QMGR_QUEUE_SUSPENDED(q) ? "suspended" : \
            QMGR_QUEUE_SAVED(q) ? "saved" : \
            "invalid queue status" \
        )
/*
 * Structure of one next-hop queue entry. In order to save some copying
 * effort we allow multiple recipients per transaction.
 */
302 VSTREAM
*stream
; /* delivery process */
303 QMGR_MESSAGE
*message
; /* message info */
304 RECIPIENT_LIST rcpt_list
; /* as many as it takes */
305 QMGR_QUEUE
*queue
; /* parent linkage */
306 QMGR_PEER
*peer
; /* parent linkage */
307 QMGR_ENTRY_LIST queue_peers
; /* per queue neighbor entries */
308 QMGR_ENTRY_LIST peer_peers
; /* per peer neighbor entries */
311 extern QMGR_ENTRY
*qmgr_entry_select(QMGR_PEER
*);
312 extern void qmgr_entry_unselect(QMGR_ENTRY
*);
313 extern void qmgr_entry_move_todo(QMGR_QUEUE
*, QMGR_ENTRY
*);
314 extern void qmgr_entry_done(QMGR_ENTRY
*, int);
315 extern QMGR_ENTRY
*qmgr_entry_create(QMGR_PEER
*, QMGR_MESSAGE
*);
/*
 * All common in-core information about a message is kept here. When all
 * recipients have been tried the message file is linked to the "deferred"
 * queue (some hosts not reachable), to the "bounce" queue (some recipients
 * were rejected), and is then removed from the "active" queue.
 */
323 struct QMGR_MESSAGE
{
324 int flags
; /* delivery problems */
325 int qflags
; /* queuing flags */
326 int tflags
; /* tracing flags */
327 long tflags_offset
; /* offset for killing */
328 int rflags
; /* queue file read flags */
329 VSTREAM
*fp
; /* open queue file or null */
330 int refcount
; /* queue entries */
331 int single_rcpt
; /* send one rcpt at a time */
332 struct timeval arrival_time
; /* start of receive transaction */
333 time_t create_time
; /* queue file create time */
334 struct timeval active_time
; /* time of entry into active queue */
335 time_t queued_time
; /* sanitized time when moved to the
337 time_t refill_time
; /* sanitized time of last message
339 long warn_offset
; /* warning bounce flag offset */
340 time_t warn_time
; /* time next warning to be sent */
341 long data_offset
; /* data seek offset */
342 char *queue_name
; /* queue name */
343 char *queue_id
; /* queue file */
344 char *encoding
; /* content encoding */
345 char *sender
; /* complete address */
346 char *dsn_envid
; /* DSN envelope ID */
347 int dsn_ret
; /* DSN headers/full */
348 char *verp_delims
; /* VERP delimiters */
349 char *filter_xport
; /* filtering transport */
350 char *inspect_xport
; /* inspecting transport */
351 char *redirect_addr
; /* info@spammer.tld */
352 long data_size
; /* data segment size */
353 long cont_length
; /* message content length */
354 long rcpt_offset
; /* more recipients here */
355 char *client_name
; /* client hostname */
356 char *client_addr
; /* client address */
357 char *client_port
; /* client port */
358 char *client_proto
; /* client protocol */
359 char *client_helo
; /* helo parameter */
360 char *sasl_method
; /* SASL method */
361 char *sasl_username
; /* SASL user name */
362 char *sasl_sender
; /* SASL sender */
363 char *rewrite_context
; /* address qualification */
364 RECIPIENT_LIST rcpt_list
; /* complete addresses */
365 int rcpt_count
; /* used recipient slots */
366 int rcpt_limit
; /* maximum read in-core */
367 int rcpt_unread
; /* # of recipients left in queue file */
368 QMGR_JOB_LIST job_list
; /* jobs delivering this message (1
/*
 * Flags 0-15 are reserved for qmgr_user.h.
 */
375 #define QMGR_READ_FLAG_SEEN_ALL_NON_RCPT (1<<16)
377 #define QMGR_MESSAGE_LOCKED ((QMGR_MESSAGE *) 1)
379 extern int qmgr_message_count
;
380 extern int qmgr_recipient_count
;
382 extern void qmgr_message_free(QMGR_MESSAGE
*);
383 extern void qmgr_message_update_warn(QMGR_MESSAGE
*);
384 extern void qmgr_message_kill_record(QMGR_MESSAGE
*, long);
385 extern QMGR_MESSAGE
*qmgr_message_alloc(const char *, const char *, int, mode_t
);
386 extern QMGR_MESSAGE
*qmgr_message_realloc(QMGR_MESSAGE
*);
/*
 * QMGR_MSG_STATS - expand to MSG_STATS_INIT2() with the message's
 * arrival_time as "incoming_arrival" and its active_time as
 * "active_arrival". "message" is a pointer and is evaluated twice.
 */
#define QMGR_MSG_STATS(stats, message) \
    MSG_STATS_INIT2(stats, \
                    incoming_arrival, message->arrival_time, \
                    active_arrival, message->active_time)
/*
 * Sometimes it's required to access the transport queues and entries on per
 * message basis. That's what the QMGR_JOB structure is for - it groups all
 * per message information within each transport using a list of QMGR_PEER
 * structures. These structures in turn correspond with per message
 * QMGR_QUEUE structure and list all per message QMGR_ENTRY structures.
 */
400 struct QMGR_PEER_LIST
{
406 QMGR_MESSAGE
*message
; /* message delivered by this job */
407 QMGR_TRANSPORT
*transport
; /* transport this job belongs to */
408 QMGR_JOB_LIST message_peers
; /* per message neighbor linkage */
409 QMGR_JOB_LIST transport_peers
; /* per transport neighbor linkage */
410 QMGR_JOB_LIST time_peers
; /* by time neighbor linkage */
411 QMGR_JOB
*stack_parent
; /* stack parent */
412 QMGR_JOB_LIST stack_children
; /* all stack children */
413 QMGR_JOB_LIST stack_siblings
; /* stack children linkage */
414 int stack_level
; /* job stack nesting level (-1 means
415 * it's not on the lists at all) */
416 int blocker_tag
; /* tagged if blocks the job list */
417 struct HTABLE
*peer_byname
; /* message job peers, indexed by
419 QMGR_PEER_LIST peer_list
; /* list of message job peers */
420 int slots_used
; /* slots used during preemption */
421 int slots_available
; /* slots available for preemption (in
422 * multiples of slot_cost) */
423 int selected_entries
; /* # of entries selected for delivery
425 int read_entries
; /* # of entries read in-core so far */
426 int rcpt_count
; /* used recipient slots */
427 int rcpt_limit
; /* available recipient slots */
431 QMGR_JOB
*job
; /* job handling this peer */
432 QMGR_QUEUE
*queue
; /* queue corresponding with this peer */
433 int refcount
; /* peer entries */
434 QMGR_ENTRY_LIST entry_list
; /* todo message entries queued for
436 QMGR_PEER_LIST peers
; /* neighbor linkage */
439 extern QMGR_ENTRY
*qmgr_job_entry_select(QMGR_TRANSPORT
*);
440 extern QMGR_PEER
*qmgr_peer_select(QMGR_JOB
*);
441 extern void qmgr_job_blocker_update(QMGR_QUEUE
*);
443 extern QMGR_JOB
*qmgr_job_obtain(QMGR_MESSAGE
*, QMGR_TRANSPORT
*);
444 extern void qmgr_job_free(QMGR_JOB
*);
445 extern void qmgr_job_move_limits(QMGR_JOB
*);
447 extern QMGR_PEER
*qmgr_peer_create(QMGR_JOB
*, QMGR_QUEUE
*);
448 extern QMGR_PEER
*qmgr_peer_find(QMGR_JOB
*, QMGR_QUEUE
*);
449 extern QMGR_PEER
*qmgr_peer_obtain(QMGR_JOB
*, QMGR_QUEUE
*);
450 extern void qmgr_peer_free(QMGR_PEER
*);
455 extern void qmgr_defer_transport(QMGR_TRANSPORT
*, DSN
*);
456 extern void qmgr_defer_todo(QMGR_QUEUE
*, DSN
*);
457 extern void qmgr_defer_recipient(QMGR_MESSAGE
*, RECIPIENT
*, DSN
*);
462 extern void qmgr_bounce_recipient(QMGR_MESSAGE
*, RECIPIENT
*, DSN
*);
467 extern int qmgr_deliver_concurrency
;
468 extern void qmgr_deliver(QMGR_TRANSPORT
*, VSTREAM
*);
473 extern int qmgr_active_feed(QMGR_SCAN
*, const char *);
474 extern void qmgr_active_drain(void);
475 extern void qmgr_active_done(QMGR_MESSAGE
*);
480 extern void qmgr_move(const char *, const char *, time_t);
485 extern void qmgr_enable_all(void);
486 extern void qmgr_enable_transport(QMGR_TRANSPORT
*);
487 extern void qmgr_enable_queue(QMGR_QUEUE
*);
/*
 * Queue scan context.
 */
/*
 * State of one queue scan: the queue name, the flags for the current and
 * for the next run, and the directory scan handle.
 */
struct QMGR_SCAN {
    char   *queue;                      /* queue name */
    int     flags;                      /* private, this run */
    int     nflags;                     /* private, next run */
    struct SCAN_DIR *handle;            /* scan */
};
/*
 * Flags that control queue scans or destination selection. These are
 * similar to the QMGR_REQ_XXX request codes.
 */
/* Queue scan and flush control bits; each flag occupies its own bit. */
#define QMGR_SCAN_START (1<<0)          /* start now/restart when done */
#define QMGR_SCAN_ALL   (1<<1)          /* all queue file time stamps */
#define QMGR_FLUSH_ONCE (1<<2)          /* unthrottle once */
#define QMGR_FLUSH_DFXP (1<<3)          /* override defer_transports */
#define QMGR_FLUSH_EACH (1<<4)          /* unthrottle per message */
512 extern QMGR_SCAN
*qmgr_scan_create(const char *);
513 extern void qmgr_scan_request(QMGR_SCAN
*, int);
514 extern char *qmgr_scan_next(QMGR_SCAN
*);
519 extern QMGR_TRANSPORT
*qmgr_error_transport(const char *);
520 extern QMGR_QUEUE
*qmgr_error_queue(const char *, DSN
*);
521 extern char *qmgr_error_nexthop(DSN
*);
/*
 * The Secure Mailer license must be distributed with this software.
 *
 * IBM T.J. Watson Research
 * Yorktown Heights, NY 10598, USA
 *
 * Preemptive scheduler enhancements:
 * 155 00, Prague, Czech Republic
 */