7 /* queue manager data structures
28 #include <recipient_list.h>
32 * The queue manager is built around lots of mutually-referring structures.
33 * These typedefs save some typing.
35 typedef struct QMGR_TRANSPORT QMGR_TRANSPORT
;
36 typedef struct QMGR_QUEUE QMGR_QUEUE
;
37 typedef struct QMGR_ENTRY QMGR_ENTRY
;
38 typedef struct QMGR_MESSAGE QMGR_MESSAGE
;
39 typedef struct QMGR_TRANSPORT_LIST QMGR_TRANSPORT_LIST
;
40 typedef struct QMGR_QUEUE_LIST QMGR_QUEUE_LIST
;
41 typedef struct QMGR_ENTRY_LIST QMGR_ENTRY_LIST
;
42 typedef struct QMGR_SCAN QMGR_SCAN
;
43 typedef struct QMGR_FEEDBACK QMGR_FEEDBACK
;
46 * Hairy macros to update doubly-linked lists.
48 #define QMGR_LIST_ROTATE(head, object) { \
49 head.next->peers.prev = head.prev; \
50 head.prev->peers.next = head.next; \
51 head.next = object->peers.next; \
52 if (object->peers.next) \
53 head.next->peers.prev = 0; \
55 object->peers.next = 0; \
58 #define QMGR_LIST_UNLINK(head, type, object) { \
59 type next = object->peers.next; \
60 type prev = object->peers.prev; \
61 if (prev) prev->peers.next = next; \
62 else head.next = next; \
63 if (next) next->peers.prev = prev; \
64 else head.prev = prev; \
65 object->peers.next = object->peers.prev = 0; \
68 #define QMGR_LIST_APPEND(head, object) { \
69 object->peers.next = head.next; \
70 object->peers.prev = 0; \
72 head.next->peers.prev = object; \
79 #define QMGR_LIST_PREPEND(head, object) { \
80 object->peers.prev = head.prev; \
81 object->peers.next = 0; \
83 head.prev->peers.next = object; \
90 #define QMGR_LIST_INIT(head) { \
96 * Transports are looked up by name (when we have resolved a message), or
97 * round-robin wise (when we want to distribute resources fairly).
99 struct QMGR_TRANSPORT_LIST
{
100 QMGR_TRANSPORT
*next
;
101 QMGR_TRANSPORT
*prev
;
104 extern struct HTABLE
*qmgr_transport_byname
; /* transport by name */
105 extern QMGR_TRANSPORT_LIST qmgr_transport_list
; /* transports, round robin */
108 * Delivery agents provide feedback, as hints that Postfix should expend
109 * more or fewer resources on a specific destination domain. The main.cf
110 * file specifies how feedback affects delivery concurrency: add/subtract a
111 * constant, a ratio of constants, or a constant divided by the delivery
112 * concurrency; and it specifies how much feedback must accumulate between
113 * concurrency updates.
115 struct QMGR_FEEDBACK
{
116 int hysteresis
; /* to pass, need to be this tall */
117 double base
; /* pre-computed from main.cf */
118 int index
; /* none, window, sqrt(window) */
121 #define QMGR_FEEDBACK_IDX_NONE 0 /* no window dependence */
122 #define QMGR_FEEDBACK_IDX_WIN 1 /* 1/window dependence */
124 #define QMGR_FEEDBACK_IDX_SQRT_WIN 2 /* 1/sqrt(window) dependence */
127 #ifdef QMGR_FEEDBACK_IDX_SQRT_WIN
131 extern void qmgr_feedback_init(QMGR_FEEDBACK
*, const char *, const char *, const char *, const char *);
133 #ifndef QMGR_FEEDBACK_IDX_SQRT_WIN
134 #define QMGR_FEEDBACK_VAL(fb, win) \
135 ((fb).index == QMGR_FEEDBACK_IDX_NONE ? (fb).base : (fb).base / (win))
137 #define QMGR_FEEDBACK_VAL(fb, win) \
138 ((fb).index == QMGR_FEEDBACK_IDX_NONE ? (fb).base : \
139 (fb).index == QMGR_FEEDBACK_IDX_WIN ? (fb).base / (win) : \
140 (fb).base / sqrt(win))
144 * Each transport (local, smtp-out, bounce) can have one queue per next hop
145 * name. Queues are looked up by next hop name (when we have resolved a
146 * message destination), or round-robin wise (when we want to deliver
149 struct QMGR_QUEUE_LIST
{
154 struct QMGR_TRANSPORT
{
155 int flags
; /* blocked, etc. */
156 int pending
; /* incomplete DA connections */
157 char *name
; /* transport name */
158 int dest_concurrency_limit
; /* concurrency per domain */
159 int init_dest_concurrency
; /* init. per-domain concurrency */
160 int recipient_limit
; /* recipients per transaction */
161 struct HTABLE
*queue_byname
; /* queues indexed by domain */
162 QMGR_QUEUE_LIST queue_list
; /* queues, round robin order */
163 QMGR_TRANSPORT_LIST peers
; /* linkage */
164 DSN
*dsn
; /* why unavailable */
165 QMGR_FEEDBACK pos_feedback
; /* positive feedback control */
166 QMGR_FEEDBACK neg_feedback
; /* negative feedback control */
167 int fail_cohort_limit
; /* flow shutdown control */
168 int rate_delay
; /* suspend per delivery */
171 #define QMGR_TRANSPORT_STAT_DEAD (1<<1)
173 typedef void (*QMGR_TRANSPORT_ALLOC_NOTIFY
) (QMGR_TRANSPORT
*, VSTREAM
*);
174 extern QMGR_TRANSPORT
*qmgr_transport_select(void);
175 extern void qmgr_transport_alloc(QMGR_TRANSPORT
*, QMGR_TRANSPORT_ALLOC_NOTIFY
);
176 extern void qmgr_transport_throttle(QMGR_TRANSPORT
*, DSN
*);
177 extern void qmgr_transport_unthrottle(QMGR_TRANSPORT
*);
178 extern QMGR_TRANSPORT
*qmgr_transport_create(const char *);
179 extern QMGR_TRANSPORT
*qmgr_transport_find(const char *);
181 #define QMGR_TRANSPORT_THROTTLED(t) ((t)->flags & QMGR_TRANSPORT_STAT_DEAD)
184 * Each next hop (e.g., a domain name) has its own queue of pending message
185 * transactions. The "todo" queue contains messages that are to be delivered
186 * to this next hop. When a message is elected for transmission, it is moved
187 * from the "todo" queue to the "busy" queue. Messages are taken from the
188 * "todo" queue in sequence. An initial destination delivery concurrency > 1
189 * ensures that one problematic message will not block all other traffic to
192 struct QMGR_ENTRY_LIST
{
198 int dflags
; /* delivery request options */
199 time_t last_done
; /* last delivery completion */
200 char *name
; /* domain name or address */
201 char *nexthop
; /* domain name */
202 int todo_refcount
; /* queue entries (todo list) */
203 int busy_refcount
; /* queue entries (busy list) */
204 int window
; /* slow open algorithm */
205 double success
; /* accumulated positive feedback */
206 double failure
; /* accumulated negative feedback */
207 double fail_cohorts
; /* pseudo-cohort failure count */
208 QMGR_TRANSPORT
*transport
; /* transport linkage */
209 QMGR_ENTRY_LIST todo
; /* todo queue entries */
210 QMGR_ENTRY_LIST busy
; /* messages on the wire */
211 QMGR_QUEUE_LIST peers
; /* neighbor queues */
212 DSN
*dsn
; /* why unavailable */
213 time_t clog_time_to_warn
; /* time of next warning */
216 #define QMGR_QUEUE_TODO 1 /* waiting for service */
217 #define QMGR_QUEUE_BUSY 2 /* recipients on the wire */
219 extern int qmgr_queue_count
;
221 extern QMGR_QUEUE
*qmgr_queue_create(QMGR_TRANSPORT
*, const char *, const char *);
222 extern QMGR_QUEUE
*qmgr_queue_select(QMGR_TRANSPORT
*);
223 extern void qmgr_queue_done(QMGR_QUEUE
*);
224 extern void qmgr_queue_throttle(QMGR_QUEUE
*, DSN
*);
225 extern void qmgr_queue_unthrottle(QMGR_QUEUE
*);
226 extern QMGR_QUEUE
*qmgr_queue_find(QMGR_TRANSPORT
*, const char *);
227 extern void qmgr_queue_suspend(QMGR_QUEUE
*, int);
230 * Exclusive queue states. Originally there were only two: "throttled" and
231 * "not throttled". It was natural to encode these in the queue window size.
232 * After 10 years it's not practical to rip out all the working code and
233 * change representations, so we just clean up the names a little.
235 * Note: only the "ready" state can reach every state (including itself);
236 * non-ready states can reach only the "ready" state. Other transitions are
237 * forbidden, because they would result in dangling event handlers.
239 #define QMGR_QUEUE_STAT_THROTTLED 0 /* back-off timer */
240 #define QMGR_QUEUE_STAT_SUSPENDED -1 /* voluntary delay timer */
241 #define QMGR_QUEUE_STAT_SAVED -2 /* delayed cleanup timer */
242 #define QMGR_QUEUE_STAT_BAD -3 /* can't happen */
244 #define QMGR_QUEUE_READY(q) ((q)->window > 0)
245 #define QMGR_QUEUE_THROTTLED(q) ((q)->window == QMGR_QUEUE_STAT_THROTTLED)
246 #define QMGR_QUEUE_SUSPENDED(q) ((q)->window == QMGR_QUEUE_STAT_SUSPENDED)
247 #define QMGR_QUEUE_SAVED(q) ((q)->window == QMGR_QUEUE_STAT_SAVED)
248 #define QMGR_QUEUE_BAD(q) ((q)->window <= QMGR_QUEUE_STAT_BAD)
250 #define QMGR_QUEUE_STATUS(q) ( \
251 QMGR_QUEUE_READY(q) ? "ready" : \
252 QMGR_QUEUE_THROTTLED(q) ? "throttled" : \
253 QMGR_QUEUE_SUSPENDED(q) ? "suspended" : \
254 QMGR_QUEUE_SAVED(q) ? "saved" : \
255 "invalid queue status" \
259 * Structure of one next-hop queue entry. In order to save some copying
260 * effort we allow multiple recipients per transaction.
263 VSTREAM
*stream
; /* delivery process */
264 QMGR_MESSAGE
*message
; /* message info */
265 RECIPIENT_LIST rcpt_list
; /* as many as it takes */
266 QMGR_QUEUE
*queue
; /* parent linkage */
267 QMGR_ENTRY_LIST peers
; /* neighbor entries */
270 extern QMGR_ENTRY
*qmgr_entry_select(QMGR_QUEUE
*);
271 extern void qmgr_entry_unselect(QMGR_QUEUE
*, QMGR_ENTRY
*);
272 extern void qmgr_entry_move_todo(QMGR_QUEUE
*, QMGR_ENTRY
*);
273 extern void qmgr_entry_done(QMGR_ENTRY
*, int);
274 extern QMGR_ENTRY
*qmgr_entry_create(QMGR_QUEUE
*, QMGR_MESSAGE
*);
277 * All common in-core information about a message is kept here. When all
278 * recipients have been tried the message file is linked to the "deferred"
279 * queue (some hosts not reachable), to the "bounce" queue (some recipients
280 * were rejected), and is then removed from the "active" queue.
282 struct QMGR_MESSAGE
{
283 int flags
; /* delivery problems */
284 int qflags
; /* queuing flags */
285 int tflags
; /* tracing flags */
286 long tflags_offset
; /* offset for killing */
287 int rflags
; /* queue file read flags */
288 VSTREAM
*fp
; /* open queue file or null */
289 int refcount
; /* queue entries */
290 int single_rcpt
; /* send one rcpt at a time */
291 struct timeval arrival_time
; /* start of receive transaction */
292 time_t create_time
; /* queue file create time */
293 struct timeval active_time
; /* time of entry into active queue */
294 long warn_offset
; /* warning bounce flag offset */
295 time_t warn_time
; /* time next warning to be sent */
296 long data_offset
; /* data seek offset */
297 char *queue_name
; /* queue name */
298 char *queue_id
; /* queue file */
299 char *encoding
; /* content encoding */
300 char *sender
; /* complete address */
301 char *dsn_envid
; /* DSN envelope ID */
302 int dsn_ret
; /* DSN headers/full */
303 char *verp_delims
; /* VERP delimiters */
304 char *filter_xport
; /* filtering transport */
305 char *inspect_xport
; /* inspecting transport */
306 char *redirect_addr
; /* info@spammer.tld */
307 long data_size
; /* data segment size */
308 long cont_length
; /* message content length */
309 long rcpt_offset
; /* more recipients here */
310 char *client_name
; /* client hostname */
311 char *client_addr
; /* client address */
312 char *client_port
; /* client port */
313 char *client_proto
; /* client protocol */
314 char *client_helo
; /* helo parameter */
315 char *sasl_method
; /* SASL method */
316 char *sasl_username
; /* SASL user name */
317 char *sasl_sender
; /* SASL sender */
318 char *rewrite_context
; /* address qualification */
319 RECIPIENT_LIST rcpt_list
; /* complete addresses */
323 * Flags 0-15 are reserved for qmgr_user.h.
325 #define QMGR_READ_FLAG_SEEN_ALL_NON_RCPT (1<<16)
327 #define QMGR_MESSAGE_LOCKED ((QMGR_MESSAGE *) 1)
329 extern int qmgr_message_count
;
330 extern int qmgr_recipient_count
;
332 extern void qmgr_message_free(QMGR_MESSAGE
*);
333 extern void qmgr_message_update_warn(QMGR_MESSAGE
*);
334 extern void qmgr_message_kill_record(QMGR_MESSAGE
*, long);
335 extern QMGR_MESSAGE
*qmgr_message_alloc(const char *, const char *, int, mode_t
);
336 extern QMGR_MESSAGE
*qmgr_message_realloc(QMGR_MESSAGE
*);
338 #define QMGR_MSG_STATS(stats, message) \
339 MSG_STATS_INIT2(stats, \
340 incoming_arrival, message->arrival_time, \
341 active_arrival, message->active_time)
346 extern void qmgr_defer_transport(QMGR_TRANSPORT
*, DSN
*);
347 extern void qmgr_defer_todo(QMGR_QUEUE
*, DSN
*);
348 extern void qmgr_defer_recipient(QMGR_MESSAGE
*, RECIPIENT
*, DSN
*);
353 extern void qmgr_bounce_recipient(QMGR_MESSAGE
*, RECIPIENT
*, DSN
*);
358 extern int qmgr_deliver_concurrency
;
359 extern void qmgr_deliver(QMGR_TRANSPORT
*, VSTREAM
*);
364 extern int qmgr_active_feed(QMGR_SCAN
*, const char *);
365 extern void qmgr_active_drain(void);
366 extern void qmgr_active_done(QMGR_MESSAGE
*);
371 extern void qmgr_move(const char *, const char *, time_t);
376 extern void qmgr_enable_all(void);
377 extern void qmgr_enable_transport(QMGR_TRANSPORT
*);
378 extern void qmgr_enable_queue(QMGR_QUEUE
*);
381 * Queue scan context.
384 char *queue
; /* queue name */
385 int flags
; /* private, this run */
386 int nflags
; /* private, next run */
387 struct SCAN_DIR
*handle
; /* scan */
391 * Flags that control queue scans or destination selection. These are
392 * similar to the QMGR_REQ_XXX request codes.
394 #define QMGR_SCAN_START (1<<0) /* start now/restart when done */
395 #define QMGR_SCAN_ALL (1<<1) /* all queue file time stamps */
396 #define QMGR_FLUSH_ONCE (1<<2) /* unthrottle once */
397 #define QMGR_FLUSH_DFXP (1<<3) /* override defer_transports */
398 #define QMGR_FLUSH_EACH (1<<4) /* unthrottle per message */
403 extern QMGR_SCAN
*qmgr_scan_create(const char *);
404 extern void qmgr_scan_request(QMGR_SCAN
*, int);
405 extern char *qmgr_scan_next(QMGR_SCAN
*);
410 extern QMGR_TRANSPORT
*qmgr_error_transport(const char *);
411 extern QMGR_QUEUE
*qmgr_error_queue(const char *, DSN
*);
412 extern char *qmgr_error_nexthop(DSN
*);
417 /* The Secure Mailer license must be distributed with this software.
420 /* IBM T.J. Watson Research
422 /* Yorktown Heights, NY 10598, USA