7 /* deliver one per-site queue entry to that site
11 /* int qmgr_deliver_concurrency;
13 /* int qmgr_deliver(transport, fp)
14 /* QMGR_TRANSPORT *transport;
17 /* This module implements the client side of the `queue manager
18 /* to delivery agent' protocol. The queue manager uses
19 /* asynchronous I/O so that it can drive multiple delivery
20 /* agents in parallel. Depending on the outcome of a delivery
21 /* attempt, the status of messages, queues and transports is updated.
24 /* qmgr_deliver_concurrency is a global counter that says how
25 /* many delivery processes are in use. This can be used, for
26 /* example, to control the size of the `active' message queue.
28 /* qmgr_deliver() executes when a delivery process announces its
29 /* availability for the named transport. It arranges for delivery
30 /* of a suitable queue entry. The \fIfp\fR argument specifies a
31 /* stream that is connected to the delivery process, or a null
32 /* pointer if the transport accepts no connection. Upon completion
33 /* of delivery (successful or not), the stream is closed, so that the
34 /* delivery process is released.
39 /* The Secure Mailer license must be distributed with this software.
42 /* IBM T.J. Watson Research
44 /* Yorktown Heights, NY 10598, USA
53 /* Utility library. */
58 #include <vstring_vstream.h>
61 #include <stringops.h>
66 #include <mail_queue.h>
67 #include <mail_proto.h>
68 #include <recipient_list.h>
69 #include <mail_params.h>
70 #include <deliver_request.h>
71 #include <verp_sender.h>
75 #include <rcpt_print.h>
77 /* Application-specific. */
81 int qmgr_deliver_concurrency
;
84 * Message delivery status codes.
86 #define DELIVER_STAT_OK 0 /* all recipients delivered */
87 #define DELIVER_STAT_DEFER 1 /* try some recipients later */
88 #define DELIVER_STAT_CRASH 2 /* mailer internal problem */
90 /* qmgr_deliver_initial_reply - retrieve initial delivery process response */
92 static int qmgr_deliver_initial_reply(VSTREAM
*stream
)
96 if (peekfd(vstream_fileno(stream
)) < 0) {
97 msg_warn("%s: premature disconnect", VSTREAM_PATH(stream
));
98 return (DELIVER_STAT_CRASH
);
99 } else if (attr_scan(stream
, ATTR_FLAG_STRICT
,
100 ATTR_TYPE_INT
, MAIL_ATTR_STATUS
, &stat
,
101 ATTR_TYPE_END
) != 1) {
102 msg_warn("%s: malformed response", VSTREAM_PATH(stream
));
103 return (DELIVER_STAT_CRASH
);
105 return (stat
? DELIVER_STAT_DEFER
: 0);
109 /* qmgr_deliver_final_reply - retrieve final delivery process response */
111 static int qmgr_deliver_final_reply(VSTREAM
*stream
, DSN_BUF
*dsb
)
115 if (peekfd(vstream_fileno(stream
)) < 0) {
116 msg_warn("%s: premature disconnect", VSTREAM_PATH(stream
));
117 return (DELIVER_STAT_CRASH
);
118 } else if (attr_scan(stream
, ATTR_FLAG_STRICT
,
119 ATTR_TYPE_FUNC
, dsb_scan
, (void *) dsb
,
120 ATTR_TYPE_INT
, MAIL_ATTR_STATUS
, &stat
,
121 ATTR_TYPE_END
) != 2) {
122 msg_warn("%s: malformed response", VSTREAM_PATH(stream
));
123 return (DELIVER_STAT_CRASH
);
125 return (stat
? DELIVER_STAT_DEFER
: 0);
129 /* qmgr_deliver_send_request - send delivery request to delivery process */
131 static int qmgr_deliver_send_request(QMGR_ENTRY
*entry
, VSTREAM
*stream
)
133 RECIPIENT_LIST list
= entry
->rcpt_list
;
134 RECIPIENT
*recipient
;
135 QMGR_MESSAGE
*message
= entry
->message
;
136 VSTRING
*sender_buf
= 0;
142 * If variable envelope return path is requested, change prefix+@origin
143 * into prefix+user=domain@origin. Note that with VERP there is only one
144 * recipient per delivery.
146 if (message
->verp_delims
== 0) {
147 sender
= message
->sender
;
149 sender_buf
= vstring_alloc(100);
150 verp_sender(sender_buf
, message
->verp_delims
,
151 message
->sender
, list
.info
);
152 sender
= vstring_str(sender_buf
);
155 flags
= message
->tflags
156 | entry
->queue
->dflags
157 | (message
->inspect_xport
? DEL_REQ_FLAG_BOUNCE
: DEL_REQ_FLAG_DEFLT
);
158 (void) QMGR_MSG_STATS(&stats
, message
);
159 attr_print(stream
, ATTR_FLAG_NONE
,
160 ATTR_TYPE_INT
, MAIL_ATTR_FLAGS
, flags
,
161 ATTR_TYPE_STR
, MAIL_ATTR_QUEUE
, message
->queue_name
,
162 ATTR_TYPE_STR
, MAIL_ATTR_QUEUEID
, message
->queue_id
,
163 ATTR_TYPE_LONG
, MAIL_ATTR_OFFSET
, message
->data_offset
,
164 ATTR_TYPE_LONG
, MAIL_ATTR_SIZE
, message
->cont_length
,
165 ATTR_TYPE_STR
, MAIL_ATTR_NEXTHOP
, entry
->queue
->nexthop
,
166 ATTR_TYPE_STR
, MAIL_ATTR_ENCODING
, message
->encoding
,
167 ATTR_TYPE_STR
, MAIL_ATTR_SENDER
, sender
,
168 ATTR_TYPE_STR
, MAIL_ATTR_DSN_ENVID
, message
->dsn_envid
,
169 ATTR_TYPE_INT
, MAIL_ATTR_DSN_RET
, message
->dsn_ret
,
170 ATTR_TYPE_FUNC
, msg_stats_print
, (void *) &stats
,
171 /* XXX Should be encapsulated with ATTR_TYPE_FUNC. */
172 ATTR_TYPE_STR
, MAIL_ATTR_LOG_CLIENT_NAME
, message
->client_name
,
173 ATTR_TYPE_STR
, MAIL_ATTR_LOG_CLIENT_ADDR
, message
->client_addr
,
174 ATTR_TYPE_STR
, MAIL_ATTR_LOG_CLIENT_PORT
, message
->client_port
,
175 ATTR_TYPE_STR
, MAIL_ATTR_LOG_PROTO_NAME
, message
->client_proto
,
176 ATTR_TYPE_STR
, MAIL_ATTR_LOG_HELO_NAME
, message
->client_helo
,
177 /* XXX Should be encapsulated with ATTR_TYPE_FUNC. */
178 ATTR_TYPE_STR
, MAIL_ATTR_SASL_METHOD
, message
->sasl_method
,
179 ATTR_TYPE_STR
, MAIL_ATTR_SASL_USERNAME
, message
->sasl_username
,
180 ATTR_TYPE_STR
, MAIL_ATTR_SASL_SENDER
, message
->sasl_sender
,
181 /* XXX Ditto if we want to pass TLS certificate info. */
182 ATTR_TYPE_STR
, MAIL_ATTR_RWR_CONTEXT
, message
->rewrite_context
,
183 ATTR_TYPE_INT
, MAIL_ATTR_RCPT_COUNT
, list
.len
,
186 vstring_free(sender_buf
);
187 for (recipient
= list
.info
; recipient
< list
.info
+ list
.len
; recipient
++)
188 attr_print(stream
, ATTR_FLAG_NONE
,
189 ATTR_TYPE_FUNC
, rcpt_print
, (void *) recipient
,
191 if (vstream_fflush(stream
) != 0) {
192 msg_warn("write to process (%s): %m", entry
->queue
->transport
->name
);
196 msg_info("qmgr_deliver: site `%s'", entry
->queue
->name
);
201 /* qmgr_deliver_abort - transport response watchdog */
203 static void qmgr_deliver_abort(int unused_event
, char *context
)
205 QMGR_ENTRY
*entry
= (QMGR_ENTRY
*) context
;
206 QMGR_QUEUE
*queue
= entry
->queue
;
207 QMGR_TRANSPORT
*transport
= queue
->transport
;
208 QMGR_MESSAGE
*message
= entry
->message
;
210 msg_fatal("%s: timeout receiving delivery status from transport: %s",
211 message
->queue_id
, transport
->name
);
214 /* qmgr_deliver_update - process delivery status report */
216 static void qmgr_deliver_update(int unused_event
, char *context
)
218 QMGR_ENTRY
*entry
= (QMGR_ENTRY
*) context
;
219 QMGR_QUEUE
*queue
= entry
->queue
;
220 QMGR_TRANSPORT
*transport
= queue
->transport
;
221 QMGR_MESSAGE
*message
= entry
->message
;
226 * Release the delivery agent from a "hot" queue entry.
228 #define QMGR_DELIVER_RELEASE_AGENT(entry) do { \
229 event_disable_readwrite(vstream_fileno(entry->stream)); \
230 (void) vstream_fclose(entry->stream); \
232 qmgr_deliver_concurrency--; \
239 * The message transport has responded. Stop the watchdog timer.
241 event_cancel_timer(qmgr_deliver_abort
, context
);
244 * Retrieve the delivery agent status report. The numerical status code
245 * indicates if delivery should be tried again. The reason text is sent
246 * only when a site should be avoided for a while, so that the queue
247 * manager can log why it does not even try to schedule delivery to the
248 * affected recipients.
250 status
= qmgr_deliver_final_reply(entry
->stream
, dsb
);
253 * The mail delivery process failed for some reason (although delivery
254 * may have been successful). Back off with this transport type for a
255 * while. Dispose of queue entries for this transport that await
256 * selection (the todo lists). Stay away from queue entries that have
257 * been selected (the busy lists), or we would have dangling pointers.
258 * The queue itself won't go away before we dispose of the current queue
261 if (status
== DELIVER_STAT_CRASH
) {
262 message
->flags
|= DELIVER_STAT_DEFER
;
264 whatsup
= concatenate("unknown ", transport
->name
,
265 " mail transport error", (char *) 0);
266 qmgr_transport_throttle(transport
,
267 DSN_SIMPLE(&dsb
->dsn
, "4.3.0", whatsup
));
270 qmgr_transport_throttle(transport
,
271 DSN_SIMPLE(&dsb
->dsn
, "4.3.0",
272 "unknown mail transport error"));
274 msg_warn("transport %s failure -- see a previous warning/fatal/panic logfile record for the problem description",
278 * Assume the worst and write a defer logfile record for each
279 * recipient. This omission was already present in the first queue
280 * manager implementation of 199703, and was fixed 200511.
282 * To avoid the synchronous qmgr_defer_recipient() operation for each
283 * recipient of this queue entry, release the delivery process and
284 * move the entry back to the todo queue. Let qmgr_defer_transport()
285 * log the recipient asynchronously if possible, and get out of here.
286 * Note: if asynchronous logging is not possible,
287 * qmgr_defer_transport() eventually invokes qmgr_entry_done() and
288 * the entry becomes a dangling pointer.
290 QMGR_DELIVER_RELEASE_AGENT(entry
);
291 qmgr_entry_unselect(queue
, entry
);
292 qmgr_defer_transport(transport
, &dsb
->dsn
);
297 * This message must be tried again.
299 * If we have a problem talking to this site, back off with this site for a
300 * while; dispose of queue entries for this site that await selection
301 * (the todo list); stay away from queue entries that have been selected
302 * (the busy list), or we would have dangling pointers. The queue itself
303 * won't go away before we dispose of the current queue entry.
305 * XXX Caution: DSN_COPY() will panic on empty status or reason.
307 #define SUSPENDED "delivery temporarily suspended: "
309 if (status
== DELIVER_STAT_DEFER
) {
310 message
->flags
|= DELIVER_STAT_DEFER
;
311 if (VSTRING_LEN(dsb
->status
)) {
312 /* Sanitize the DSN status/reason from the delivery agent. */
313 if (!dsn_valid(vstring_str(dsb
->status
)))
314 vstring_strcpy(dsb
->status
, "4.0.0");
315 if (VSTRING_LEN(dsb
->reason
) == 0)
316 vstring_strcpy(dsb
->reason
, "unknown error");
317 vstring_prepend(dsb
->reason
, SUSPENDED
, sizeof(SUSPENDED
) - 1);
318 if (QMGR_QUEUE_READY(queue
)) {
319 qmgr_queue_throttle(queue
, DSN_FROM_DSN_BUF(dsb
));
320 if (QMGR_QUEUE_THROTTLED(queue
))
321 qmgr_defer_todo(queue
, &dsb
->dsn
);
327 * No problems detected. Mark the transport and queue as alive. The queue
328 * itself won't go away before we dispose of the current queue entry.
330 if (status
!= DELIVER_STAT_CRASH
&& VSTRING_LEN(dsb
->reason
) == 0) {
331 qmgr_transport_unthrottle(transport
);
332 qmgr_queue_unthrottle(queue
);
336 * Release the delivery process, and give some other queue entry a chance
337 * to be delivered. When all recipients for a message have been tried,
338 * decide what to do next with this message: defer, bounce, delete.
340 QMGR_DELIVER_RELEASE_AGENT(entry
);
341 qmgr_entry_done(entry
, QMGR_QUEUE_BUSY
);
344 /* qmgr_deliver - deliver one per-site queue entry */
346 void qmgr_deliver(QMGR_TRANSPORT
*transport
, VSTREAM
*stream
)
353 * Find out if this delivery process is really available. Once elected,
354 * the delivery process is supposed to express its happiness. If there is
355 * a problem, wipe the pending deliveries for this transport. This
356 * routine runs in response to an external event, so it does not run
357 * while some other queue manipulation is happening.
359 if (stream
== 0 || qmgr_deliver_initial_reply(stream
) != 0) {
361 whatsup
= concatenate(transport
->name
,
362 " mail transport unavailable", (char *) 0);
363 qmgr_transport_throttle(transport
,
364 DSN_SIMPLE(&dsn
, "4.3.0", whatsup
));
367 qmgr_transport_throttle(transport
,
368 DSN_SIMPLE(&dsn
, "4.3.0",
369 "mail transport unavailable"));
371 qmgr_defer_transport(transport
, &dsn
);
373 (void) vstream_fclose(stream
);
378 * Find a suitable queue entry. Things may have changed since this
379 * transport was allocated. If no suitable entry is found,
380 * unceremoniously disconnect from the delivery process. The delivery
381 * agent request reading routine is prepared for the queue manager to
382 * change its mind for no apparent reason.
384 if ((queue
= qmgr_queue_select(transport
)) == 0
385 || (entry
= qmgr_entry_select(queue
)) == 0) {
386 (void) vstream_fclose(stream
);
391 * Send the queue file info and recipient info to the delivery process.
392 * If there is a problem, wipe the pending deliveries for this transport.
393 * This routine runs in response to an external event, so it does not run
394 * while some other queue manipulation is happening.
396 if (qmgr_deliver_send_request(entry
, stream
) < 0) {
397 qmgr_entry_unselect(queue
, entry
);
399 whatsup
= concatenate(transport
->name
,
400 " mail transport unavailable", (char *) 0);
401 qmgr_transport_throttle(transport
,
402 DSN_SIMPLE(&dsn
, "4.3.0", whatsup
));
405 qmgr_transport_throttle(transport
,
406 DSN_SIMPLE(&dsn
, "4.3.0",
407 "mail transport unavailable"));
409 qmgr_defer_transport(transport
, &dsn
);
410 /* warning: entry and queue may be dangling pointers here */
411 (void) vstream_fclose(stream
);
416 * If we get this far, go wait for the delivery status report.
418 qmgr_deliver_concurrency
++;
419 entry
->stream
= stream
;
420 event_enable_read(vstream_fileno(stream
),
421 qmgr_deliver_update
, (char *) entry
);
424 * Guard against broken systems.
426 event_request_timer(qmgr_deliver_abort
, (char *) entry
, var_daemon_timeout
);