1 /*-------------------------------------------------------------------------
4 * Use the frontend/backend protocol for communication over a shm_mq
6 * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
9 * src/backend/libpq/pqmq.c
11 *-------------------------------------------------------------------------
16 #include "access/parallel.h"
17 #include "libpq/libpq.h"
18 #include "libpq/pqformat.h"
19 #include "libpq/pqmq.h"
20 #include "miscadmin.h"
22 #include "replication/logicalworker.h"
23 #include "tcop/tcopprot.h"
24 #include "utils/builtins.h"
26 static shm_mq_handle
*pq_mq_handle
;
27 static bool pq_mq_busy
= false;
28 static pid_t pq_mq_parallel_leader_pid
= 0;
29 static pid_t pq_mq_parallel_leader_proc_number
= INVALID_PROC_NUMBER
;
31 static void pq_cleanup_redirect_to_shm_mq(dsm_segment
*seg
, Datum arg
);
32 static void mq_comm_reset(void);
33 static int mq_flush(void);
34 static int mq_flush_if_writable(void);
35 static bool mq_is_send_pending(void);
36 static int mq_putmessage(char msgtype
, const char *s
, size_t len
);
37 static void mq_putmessage_noblock(char msgtype
, const char *s
, size_t len
);
/*
 * Callback table for the shm_mq transport.  pq_redirect_to_shm_mq() installs
 * the address of this table in PqCommMethods, and pq_set_parallel_leader()
 * asserts that it is the active table.
 *
 * NOTE(review): no .flush entry is visible in this view even though
 * mq_flush() is declared earlier in the file -- confirm that the initializer
 * sets it, otherwise that callback slot would be left NULL.
 */
39 static const PQcommMethods PqCommMqMethods
= {
40 .comm_reset
= mq_comm_reset
,
42 .flush_if_writable
= mq_flush_if_writable
,
43 .is_send_pending
= mq_is_send_pending
,
44 .putmessage
= mq_putmessage
,
45 .putmessage_noblock
= mq_putmessage_noblock
/*
 * pq_redirect_to_shm_mq
 *
 * seg: DSM segment on which the cleanup callback is registered.
 * mqh: handle of the message queue to send protocol messages to.
 *
 * NOTE(review): no statement visible in this view uses mqh; presumably it is
 * stored in pq_mq_handle -- confirm against the complete file.
 */
49 * Arrange to redirect frontend/backend protocol messages to a shared-memory
53 pq_redirect_to_shm_mq(dsm_segment
*seg
, shm_mq_handle
*mqh
)
/* Switch the active comm-method table to the shm_mq implementations. */
55 PqCommMethods
= &PqCommMqMethods
;
/* Set the output destination to DestRemote and the protocol to the latest version. */
57 whereToSendOutput
= DestRemote
;
58 FrontendProtocol
= PG_PROTOCOL_LATEST
;
/* Register the cleanup callback on seg; it takes no argument, hence (Datum) 0. */
59 on_dsm_detach(seg
, pq_cleanup_redirect_to_shm_mq
, (Datum
) 0);
/*
 * pq_cleanup_redirect_to_shm_mq
 *
 * Callback registered via on_dsm_detach() in pq_redirect_to_shm_mq().
 * Neither seg nor arg is referenced by the statement visible here.
 *
 * NOTE(review): presumably pq_mq_handle is also reset here; that statement
 * is not visible in this view -- confirm.
 */
63 * When the DSM that contains our shm_mq goes away, we need to stop sending
67 pq_cleanup_redirect_to_shm_mq(dsm_segment
*seg
, Datum arg
)
/* Switch the output destination to DestNone. */
70 whereToSendOutput
= DestNone
;
/*
 * pq_set_parallel_leader
 *
 * pid:        PID of the leader; mq_putmessage() passes it to SendProcSignal()
 *             and skips signalling while it is 0.
 * procNumber: proc number of the leader, passed to SendProcSignal() as well.
 *
 * Must be called after pq_redirect_to_shm_mq(), which is what makes
 * PqCommMqMethods the active method table (asserted below).
 */
74 * Arrange to SendProcSignal() to the parallel leader each time we transmit
75 * message data via the shm_mq.
78 pq_set_parallel_leader(pid_t pid
, ProcNumber procNumber
)
80 Assert(PqCommMethods
== &PqCommMqMethods
);
/* Remember whom to signal; both values are read by mq_putmessage(). */
81 pq_mq_parallel_leader_pid
= pid
;
82 pq_mq_parallel_leader_proc_number
= procNumber
;
/*
 * mq_flush_if_writable
 *
 * The .flush_if_writable entry of PqCommMqMethods; declared earlier in the
 * file as returning int.
 *
 * NOTE(review): the body is not visible in this view.
 */
99 mq_flush_if_writable(void)
/*
 * mq_is_send_pending
 *
 * The .is_send_pending entry of PqCommMqMethods; declared earlier in the
 * file as returning bool.
 *
 * NOTE(review): the return statement is not visible in this view; given the
 * comment below it presumably always reports "nothing pending" -- confirm.
 */
106 mq_is_send_pending(void)
108 /* There's never anything pending. */
/*
 * mq_putmessage
 *
 * The .putmessage entry of PqCommMqMethods.
 *
 * msgtype: message type byte; its address becomes the data of the first
 *          iovec element.
 * s, len:  message payload.  NOTE(review): their use is not visible in this
 *          view; presumably they fill the second iovec element, since
 *          shm_mq_sendv() is called with 2 elements -- confirm.
 *
 * Returns int per the forward declaration.  NOTE(review): the return
 * statements, the pq_mq_busy handling, the declaration of iov, and the
 * enclosing retry loop are not visible in this view -- confirm them against
 * the complete file before relying on this summary.
 */
113 * Transmit a libpq protocol message to the shared memory message queue
114 * selected via pq_mq_handle. We don't include a length word, because the
115 * receiver will know the length of the message from shm_mq_receive().
118 mq_putmessage(char msgtype
, const char *s
, size_t len
)
121 shm_mq_result result
;
124 * If we're sending a message, and we have to wait because the queue is
125 * full, and then we get interrupted, and that interrupt results in trying
126 * to send another message, we respond by detaching the queue. There's no
127 * way to return to the original context, but even if there were, just
128 * queueing the message would amount to indefinitely postponing the
129 * response to the interrupt. So we do this instead.
/* Only detach if there is still a queue to detach from. */
133 if (pq_mq_handle
!= NULL
)
134 shm_mq_detach(pq_mq_handle
);
140 * If the message queue is already gone, just ignore the message. This
141 * doesn't necessarily indicate a problem; for example, DEBUG messages can
142 * be generated late in the shutdown sequence, after all DSMs have already
145 if (pq_mq_handle
== NULL
)
/* The first iovec element carries the single message-type byte. */
150 iov
[0].data
= &msgtype
;
155 Assert(pq_mq_handle
!= NULL
);
160 * Immediately notify the receiver by passing force_flush as true so
161 * that the shared memory value is updated before we send the parallel
162 * message signal right after this.
/*
 * NOTE(review): the other "true" argument is presumably nowait, given that
 * SHM_MQ_WOULD_BLOCK is handled further down -- confirm against shm_mq.h.
 */
164 result
= shm_mq_sendv(pq_mq_handle
, iov
, 2, true, true);
/*
 * Signal the leader only if pq_set_parallel_leader() recorded a nonzero PID.
 * A logical parallel apply worker uses PROCSIG_PARALLEL_APPLY_MESSAGE; the
 * other path asserts that we are a parallel worker and uses
 * PROCSIG_PARALLEL_MESSAGE.
 */
166 if (pq_mq_parallel_leader_pid
!= 0)
168 if (IsLogicalParallelApplyWorker())
169 SendProcSignal(pq_mq_parallel_leader_pid
,
170 PROCSIG_PARALLEL_APPLY_MESSAGE
,
171 pq_mq_parallel_leader_proc_number
);
174 Assert(IsParallelWorker());
175 SendProcSignal(pq_mq_parallel_leader_pid
,
176 PROCSIG_PARALLEL_MESSAGE
,
177 pq_mq_parallel_leader_proc_number
);
/* Any result other than "would block" ends the send attempt. */
181 if (result
!= SHM_MQ_WOULD_BLOCK
)
/*
 * The send would have blocked: wait for our latch to be set, then process
 * any pending interrupts (presumably before retrying the send).
 */
184 (void) WaitLatch(MyLatch
, WL_LATCH_SET
| WL_EXIT_ON_PM_DEATH
, 0,
185 WAIT_EVENT_MESSAGE_QUEUE_PUT_MESSAGE
);
187 CHECK_FOR_INTERRUPTS();
/* Would-block was handled above, so only success or detach can remain. */
192 Assert(result
== SHM_MQ_SUCCESS
|| result
== SHM_MQ_DETACHED
);
193 if (result
!= SHM_MQ_SUCCESS
)
/*
 * mq_putmessage_noblock
 *
 * The .putmessage_noblock entry of PqCommMqMethods.  The only statement
 * visible here raises ERROR "not currently supported"; msgtype, s and len
 * are not referenced by it.
 */
199 mq_putmessage_noblock(char msgtype
, const char *s
, size_t len
)
202 * While the shm_mq machinery does support sending a message in
203 * non-blocking mode, there's currently no way to try sending beginning to
204 * send the message that doesn't also commit us to completing the
205 * transmission. This could be improved in the future, but for now we
208 elog(ERROR
, "not currently supported");
212 * Parse an ErrorResponse or NoticeResponse payload and populate an ErrorData
213 * structure with the results.
216 pq_parse_errornotice(StringInfo msg
, ErrorData
*edata
)
218 /* Initialize edata with reasonable defaults. */
219 MemSet(edata
, 0, sizeof(ErrorData
));
220 edata
->elevel
= ERROR
;
221 edata
->assoc_context
= CurrentMemoryContext
;
223 /* Loop over fields and extract each one. */
226 char code
= pq_getmsgbyte(msg
);
234 value
= pq_getmsgrawstring(msg
);
238 case PG_DIAG_SEVERITY
:
239 /* ignore, trusting we'll get a nonlocalized version */
241 case PG_DIAG_SEVERITY_NONLOCALIZED
:
242 if (strcmp(value
, "DEBUG") == 0)
245 * We can't reconstruct the exact DEBUG level, but
246 * presumably it was >= client_min_messages, so select
247 * DEBUG1 to ensure we'll pass it on to the client.
249 edata
->elevel
= DEBUG1
;
251 else if (strcmp(value
, "LOG") == 0)
254 * It can't be LOG_SERVER_ONLY, or the worker wouldn't
255 * have sent it to us; so LOG is the correct value.
259 else if (strcmp(value
, "INFO") == 0)
260 edata
->elevel
= INFO
;
261 else if (strcmp(value
, "NOTICE") == 0)
262 edata
->elevel
= NOTICE
;
263 else if (strcmp(value
, "WARNING") == 0)
264 edata
->elevel
= WARNING
;
265 else if (strcmp(value
, "ERROR") == 0)
266 edata
->elevel
= ERROR
;
267 else if (strcmp(value
, "FATAL") == 0)
268 edata
->elevel
= FATAL
;
269 else if (strcmp(value
, "PANIC") == 0)
270 edata
->elevel
= PANIC
;
272 elog(ERROR
, "unrecognized error severity: \"%s\"", value
);
274 case PG_DIAG_SQLSTATE
:
275 if (strlen(value
) != 5)
276 elog(ERROR
, "invalid SQLSTATE: \"%s\"", value
);
277 edata
->sqlerrcode
= MAKE_SQLSTATE(value
[0], value
[1], value
[2],
280 case PG_DIAG_MESSAGE_PRIMARY
:
281 edata
->message
= pstrdup(value
);
283 case PG_DIAG_MESSAGE_DETAIL
:
284 edata
->detail
= pstrdup(value
);
286 case PG_DIAG_MESSAGE_HINT
:
287 edata
->hint
= pstrdup(value
);
289 case PG_DIAG_STATEMENT_POSITION
:
290 edata
->cursorpos
= pg_strtoint32(value
);
292 case PG_DIAG_INTERNAL_POSITION
:
293 edata
->internalpos
= pg_strtoint32(value
);
295 case PG_DIAG_INTERNAL_QUERY
:
296 edata
->internalquery
= pstrdup(value
);
298 case PG_DIAG_CONTEXT
:
299 edata
->context
= pstrdup(value
);
301 case PG_DIAG_SCHEMA_NAME
:
302 edata
->schema_name
= pstrdup(value
);
304 case PG_DIAG_TABLE_NAME
:
305 edata
->table_name
= pstrdup(value
);
307 case PG_DIAG_COLUMN_NAME
:
308 edata
->column_name
= pstrdup(value
);
310 case PG_DIAG_DATATYPE_NAME
:
311 edata
->datatype_name
= pstrdup(value
);
313 case PG_DIAG_CONSTRAINT_NAME
:
314 edata
->constraint_name
= pstrdup(value
);
316 case PG_DIAG_SOURCE_FILE
:
317 edata
->filename
= pstrdup(value
);
319 case PG_DIAG_SOURCE_LINE
:
320 edata
->lineno
= pg_strtoint32(value
);
322 case PG_DIAG_SOURCE_FUNCTION
:
323 edata
->funcname
= pstrdup(value
);
326 elog(ERROR
, "unrecognized error field code: %d", (int) code
);