Expand PMF_FN_* macros.
[netbsd-mini2440.git] / external / ibm-public / postfix / dist / src / qmgr / qmgr_deliver.c
blobeb700e961389ebb0dea1c2e9734fd44343f12069
1 /* $NetBSD$ */
3 /*++
4 /* NAME
5 /* qmgr_deliver 3
6 /* SUMMARY
7 /* deliver one per-site queue entry to that site
8 /* SYNOPSIS
9 /* #include "qmgr.h"
11 /* int qmgr_deliver_concurrency;
13 /* void qmgr_deliver(transport, fp)
14 /* QMGR_TRANSPORT *transport;
15 /* VSTREAM *fp;
16 /* DESCRIPTION
17 /* This module implements the client side of the `queue manager
18 /* to delivery agent' protocol. The queue manager uses
19 /* asynchronous I/O so that it can drive multiple delivery
20 /* agents in parallel. Depending on the outcome of a delivery
21 /* attempt, the status of messages, queues and transports is
22 /* updated.
24 /* qmgr_deliver_concurrency is a global counter that says how
25 /* many delivery processes are in use. This can be used, for
26 /* example, to control the size of the `active' message queue.
28 /* qmgr_deliver() executes when a delivery process announces its
29 /* availability for the named transport. It arranges for delivery
30 /* of a suitable queue entry. The \fIfp\fR argument specifies a
31 /* stream that is connected to a delivery process, or a null
32 /* pointer if the transport accepts no connection. Upon completion
33 /* of delivery (successful or not), the stream is closed, so that the
34 /* delivery process is released.
35 /* DIAGNOSTICS
36 /* LICENSE
37 /* .ad
38 /* .fi
39 /* The Secure Mailer license must be distributed with this software.
40 /* AUTHOR(S)
41 /* Wietse Venema
42 /* IBM T.J. Watson Research
43 /* P.O. Box 704
44 /* Yorktown Heights, NY 10598, USA
46 /* Preemptive scheduler enhancements:
47 /* Patrik Rak
48 /* Modra 6
49 /* 155 00, Prague, Czech Republic
50 /*--*/
52 /* System library. */
54 #include <sys_defs.h>
55 #include <time.h>
56 #include <string.h>
58 /* Utility library. */
60 #include <msg.h>
61 #include <vstring.h>
62 #include <vstream.h>
63 #include <vstring_vstream.h>
64 #include <events.h>
65 #include <iostuff.h>
66 #include <stringops.h>
67 #include <mymalloc.h>
69 /* Global library. */
71 #include <mail_queue.h>
72 #include <mail_proto.h>
73 #include <recipient_list.h>
74 #include <mail_params.h>
75 #include <deliver_request.h>
76 #include <verp_sender.h>
77 #include <dsn_util.h>
78 #include <dsn_buf.h>
79 #include <dsb_scan.h>
80 #include <rcpt_print.h>
82 /* Application-specific. */
84 #include "qmgr.h"
/*
 * Global counter: the number of delivery processes currently in use.
 */
int     qmgr_deliver_concurrency;

/*
 * Message delivery status codes.
 */
#define DELIVER_STAT_OK     0       /* all recipients delivered */
#define DELIVER_STAT_DEFER  1       /* try some recipients later */
#define DELIVER_STAT_CRASH  2       /* mailer internal problem */
95 /* qmgr_deliver_initial_reply - retrieve initial delivery process response */
97 static int qmgr_deliver_initial_reply(VSTREAM *stream)
99 int stat;
101 if (peekfd(vstream_fileno(stream)) < 0) {
102 msg_warn("%s: premature disconnect", VSTREAM_PATH(stream));
103 return (DELIVER_STAT_CRASH);
104 } else if (attr_scan(stream, ATTR_FLAG_STRICT,
105 ATTR_TYPE_INT, MAIL_ATTR_STATUS, &stat,
106 ATTR_TYPE_END) != 1) {
107 msg_warn("%s: malformed response", VSTREAM_PATH(stream));
108 return (DELIVER_STAT_CRASH);
109 } else {
110 return (stat ? DELIVER_STAT_DEFER : 0);
114 /* qmgr_deliver_final_reply - retrieve final delivery process response */
116 static int qmgr_deliver_final_reply(VSTREAM *stream, DSN_BUF *dsb)
118 int stat;
120 if (peekfd(vstream_fileno(stream)) < 0) {
121 msg_warn("%s: premature disconnect", VSTREAM_PATH(stream));
122 return (DELIVER_STAT_CRASH);
123 } else if (attr_scan(stream, ATTR_FLAG_STRICT,
124 ATTR_TYPE_FUNC, dsb_scan, (void *) dsb,
125 ATTR_TYPE_INT, MAIL_ATTR_STATUS, &stat,
126 ATTR_TYPE_END) != 2) {
127 msg_warn("%s: malformed response", VSTREAM_PATH(stream));
128 return (DELIVER_STAT_CRASH);
129 } else {
130 return (stat ? DELIVER_STAT_DEFER : 0);
134 /* qmgr_deliver_send_request - send delivery request to delivery process */
136 static int qmgr_deliver_send_request(QMGR_ENTRY *entry, VSTREAM *stream)
138 RECIPIENT_LIST list = entry->rcpt_list;
139 RECIPIENT *recipient;
140 QMGR_MESSAGE *message = entry->message;
141 VSTRING *sender_buf = 0;
142 MSG_STATS stats;
143 char *sender;
144 int flags;
147 * If variable envelope return path is requested, change prefix+@origin
148 * into prefix+user=domain@origin. Note that with VERP there is only one
149 * recipient per delivery.
151 if (message->verp_delims == 0) {
152 sender = message->sender;
153 } else {
154 sender_buf = vstring_alloc(100);
155 verp_sender(sender_buf, message->verp_delims,
156 message->sender, list.info);
157 sender = vstring_str(sender_buf);
160 flags = message->tflags
161 | entry->queue->dflags
162 | (message->inspect_xport ? DEL_REQ_FLAG_BOUNCE : DEL_REQ_FLAG_DEFLT);
163 (void) QMGR_MSG_STATS(&stats, message);
164 attr_print(stream, ATTR_FLAG_NONE,
165 ATTR_TYPE_INT, MAIL_ATTR_FLAGS, flags,
166 ATTR_TYPE_STR, MAIL_ATTR_QUEUE, message->queue_name,
167 ATTR_TYPE_STR, MAIL_ATTR_QUEUEID, message->queue_id,
168 ATTR_TYPE_LONG, MAIL_ATTR_OFFSET, message->data_offset,
169 ATTR_TYPE_LONG, MAIL_ATTR_SIZE, message->cont_length,
170 ATTR_TYPE_STR, MAIL_ATTR_NEXTHOP, entry->queue->nexthop,
171 ATTR_TYPE_STR, MAIL_ATTR_ENCODING, message->encoding,
172 ATTR_TYPE_STR, MAIL_ATTR_SENDER, sender,
173 ATTR_TYPE_STR, MAIL_ATTR_DSN_ENVID, message->dsn_envid,
174 ATTR_TYPE_INT, MAIL_ATTR_DSN_RET, message->dsn_ret,
175 ATTR_TYPE_FUNC, msg_stats_print, (void *) &stats,
176 /* XXX Should be encapsulated with ATTR_TYPE_FUNC. */
177 ATTR_TYPE_STR, MAIL_ATTR_LOG_CLIENT_NAME, message->client_name,
178 ATTR_TYPE_STR, MAIL_ATTR_LOG_CLIENT_ADDR, message->client_addr,
179 ATTR_TYPE_STR, MAIL_ATTR_LOG_CLIENT_PORT, message->client_port,
180 ATTR_TYPE_STR, MAIL_ATTR_LOG_PROTO_NAME, message->client_proto,
181 ATTR_TYPE_STR, MAIL_ATTR_LOG_HELO_NAME, message->client_helo,
182 /* XXX Should be encapsulated with ATTR_TYPE_FUNC. */
183 ATTR_TYPE_STR, MAIL_ATTR_SASL_METHOD, message->sasl_method,
184 ATTR_TYPE_STR, MAIL_ATTR_SASL_USERNAME, message->sasl_username,
185 ATTR_TYPE_STR, MAIL_ATTR_SASL_SENDER, message->sasl_sender,
186 /* XXX Ditto if we want to pass TLS certificate info. */
187 ATTR_TYPE_STR, MAIL_ATTR_RWR_CONTEXT, message->rewrite_context,
188 ATTR_TYPE_INT, MAIL_ATTR_RCPT_COUNT, list.len,
189 ATTR_TYPE_END);
190 if (sender_buf != 0)
191 vstring_free(sender_buf);
192 for (recipient = list.info; recipient < list.info + list.len; recipient++)
193 attr_print(stream, ATTR_FLAG_NONE,
194 ATTR_TYPE_FUNC, rcpt_print, (void *) recipient,
195 ATTR_TYPE_END);
196 if (vstream_fflush(stream) != 0) {
197 msg_warn("write to process (%s): %m", entry->queue->transport->name);
198 return (-1);
199 } else {
200 if (msg_verbose)
201 msg_info("qmgr_deliver: site `%s'", entry->queue->name);
202 return (0);
206 /* qmgr_deliver_abort - transport response watchdog */
208 static void qmgr_deliver_abort(int unused_event, char *context)
210 QMGR_ENTRY *entry = (QMGR_ENTRY *) context;
211 QMGR_QUEUE *queue = entry->queue;
212 QMGR_TRANSPORT *transport = queue->transport;
213 QMGR_MESSAGE *message = entry->message;
215 msg_fatal("%s: timeout receiving delivery status from transport: %s",
216 message->queue_id, transport->name);
219 /* qmgr_deliver_update - process delivery status report */
221 static void qmgr_deliver_update(int unused_event, char *context)
223 QMGR_ENTRY *entry = (QMGR_ENTRY *) context;
224 QMGR_QUEUE *queue = entry->queue;
225 QMGR_TRANSPORT *transport = queue->transport;
226 QMGR_MESSAGE *message = entry->message;
227 static DSN_BUF *dsb;
228 int status;
231 * Release the delivery agent from a "hot" queue entry.
233 #define QMGR_DELIVER_RELEASE_AGENT(entry) do { \
234 event_disable_readwrite(vstream_fileno(entry->stream)); \
235 (void) vstream_fclose(entry->stream); \
236 entry->stream = 0; \
237 qmgr_deliver_concurrency--; \
238 } while (0)
240 if (dsb == 0)
241 dsb = dsb_create();
244 * The message transport has responded. Stop the watchdog timer.
246 event_cancel_timer(qmgr_deliver_abort, context);
249 * Retrieve the delivery agent status report. The numerical status code
250 * indicates if delivery should be tried again. The reason text is sent
251 * only when a site should be avoided for a while, so that the queue
252 * manager can log why it does not even try to schedule delivery to the
253 * affected recipients.
255 status = qmgr_deliver_final_reply(entry->stream, dsb);
258 * The mail delivery process failed for some reason (although delivery
259 * may have been successful). Back off with this transport type for a
260 * while. Dispose of queue entries for this transport that await
261 * selection (the todo lists). Stay away from queue entries that have
262 * been selected (the busy lists), or we would have dangling pointers.
263 * The queue itself won't go away before we dispose of the current queue
264 * entry.
266 if (status == DELIVER_STAT_CRASH) {
267 message->flags |= DELIVER_STAT_DEFER;
268 #if 0
269 whatsup = concatenate("unknown ", transport->name,
270 " mail transport error", (char *) 0);
271 qmgr_transport_throttle(transport,
272 DSN_SIMPLE(&dsb->dsn, "4.3.0", whatsup));
273 myfree(whatsup);
274 #else
275 qmgr_transport_throttle(transport,
276 DSN_SIMPLE(&dsb->dsn, "4.3.0",
277 "unknown mail transport error"));
278 #endif
279 msg_warn("transport %s failure -- see a previous warning/fatal/panic logfile record for the problem description",
280 transport->name);
283 * Assume the worst and write a defer logfile record for each
284 * recipient. This omission was already present in the first queue
285 * manager implementation of 199703, and was fixed 200511.
287 * To avoid the synchronous qmgr_defer_recipient() operation for each
288 * recipient of this queue entry, release the delivery process and
289 * move the entry back to the todo queue. Let qmgr_defer_transport()
290 * log the recipient asynchronously if possible, and get out of here.
291 * Note: if asynchronous logging is not possible,
292 * qmgr_defer_transport() eventually invokes qmgr_entry_done() and
293 * the entry becomes a dangling pointer.
295 QMGR_DELIVER_RELEASE_AGENT(entry);
296 qmgr_entry_unselect(entry);
297 qmgr_defer_transport(transport, &dsb->dsn);
298 return;
302 * This message must be tried again.
304 * If we have a problem talking to this site, back off with this site for a
305 * while; dispose of queue entries for this site that await selection
306 * (the todo list); stay away from queue entries that have been selected
307 * (the busy list), or we would have dangling pointers. The queue itself
308 * won't go away before we dispose of the current queue entry.
310 * XXX Caution: DSN_COPY() will panic on empty status or reason.
312 #define SUSPENDED "delivery temporarily suspended: "
314 if (status == DELIVER_STAT_DEFER) {
315 message->flags |= DELIVER_STAT_DEFER;
316 if (VSTRING_LEN(dsb->status)) {
317 /* Sanitize the DSN status/reason from the delivery agent. */
318 if (!dsn_valid(vstring_str(dsb->status)))
319 vstring_strcpy(dsb->status, "4.0.0");
320 if (VSTRING_LEN(dsb->reason) == 0)
321 vstring_strcpy(dsb->reason, "unknown error");
322 vstring_prepend(dsb->reason, SUSPENDED, sizeof(SUSPENDED) - 1);
323 if (QMGR_QUEUE_READY(queue)) {
324 qmgr_queue_throttle(queue, DSN_FROM_DSN_BUF(dsb));
325 if (QMGR_QUEUE_THROTTLED(queue))
326 qmgr_defer_todo(queue, &dsb->dsn);
332 * No problems detected. Mark the transport and queue as alive. The queue
333 * itself won't go away before we dispose of the current queue entry.
335 if (status != DELIVER_STAT_CRASH && VSTRING_LEN(dsb->reason) == 0) {
336 qmgr_transport_unthrottle(transport);
337 qmgr_queue_unthrottle(queue);
341 * Release the delivery process, and give some other queue entry a chance
342 * to be delivered. When all recipients for a message have been tried,
343 * decide what to do next with this message: defer, bounce, delete.
345 QMGR_DELIVER_RELEASE_AGENT(entry);
346 qmgr_entry_done(entry, QMGR_QUEUE_BUSY);
349 /* qmgr_deliver - deliver one per-site queue entry */
351 void qmgr_deliver(QMGR_TRANSPORT *transport, VSTREAM *stream)
353 QMGR_ENTRY *entry;
354 DSN dsn;
357 * Find out if this delivery process is really available. Once elected,
358 * the delivery process is supposed to express its happiness. If there is
359 * a problem, wipe the pending deliveries for this transport. This
360 * routine runs in response to an external event, so it does not run
361 * while some other queue manipulation is happening.
363 if (stream == 0 || qmgr_deliver_initial_reply(stream) != 0) {
364 #if 0
365 whatsup = concatenate(transport->name,
366 " mail transport unavailable", (char *) 0);
367 qmgr_transport_throttle(transport,
368 DSN_SIMPLE(&dsn, "4.3.0", whatsup));
369 myfree(whatsup);
370 #else
371 qmgr_transport_throttle(transport,
372 DSN_SIMPLE(&dsn, "4.3.0",
373 "mail transport unavailable"));
374 #endif
375 qmgr_defer_transport(transport, &dsn);
376 if (stream)
377 (void) vstream_fclose(stream);
378 return;
382 * Find a suitable queue entry. Things may have changed since this
383 * transport was allocated. If no suitable entry is found,
384 * unceremoniously disconnect from the delivery process. The delivery
385 * agent request reading routine is prepared for the queue manager to
386 * change its mind for no apparent reason.
388 if ((entry = qmgr_job_entry_select(transport)) == 0) {
389 (void) vstream_fclose(stream);
390 return;
394 * Send the queue file info and recipient info to the delivery process.
395 * If there is a problem, wipe the pending deliveries for this transport.
396 * This routine runs in response to an external event, so it does not run
397 * while some other queue manipulation is happening.
399 if (qmgr_deliver_send_request(entry, stream) < 0) {
400 qmgr_entry_unselect(entry);
401 #if 0
402 whatsup = concatenate(transport->name,
403 " mail transport unavailable", (char *) 0);
404 qmgr_transport_throttle(transport,
405 DSN_SIMPLE(&dsn, "4.3.0", whatsup));
406 myfree(whatsup);
407 #else
408 qmgr_transport_throttle(transport,
409 DSN_SIMPLE(&dsn, "4.3.0",
410 "mail transport unavailable"));
411 #endif
412 qmgr_defer_transport(transport, &dsn);
413 /* warning: entry may be a dangling pointer here */
414 (void) vstream_fclose(stream);
415 return;
419 * If we get this far, go wait for the delivery status report.
421 qmgr_deliver_concurrency++;
422 entry->stream = stream;
423 event_enable_read(vstream_fileno(stream),
424 qmgr_deliver_update, (char *) entry);
427 * Guard against broken systems.
429 event_request_timer(qmgr_deliver_abort, (char *) entry, var_daemon_timeout);