/*-------------------------------------------------------------------------
 *
 * shm_mq.c
 *    single-reader, single-writer shared memory message queue
 *
 * Both the sender and the receiver must have a PGPROC; their respective
 * process latches are used for synchronization.  Only the sender may send,
 * and only the receiver may receive.  This is intended to allow a user
 * backend to communicate with worker backends that it has registered.
 *
 * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * src/backend/storage/ipc/shm_mq.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "miscadmin.h"
#include "pgstat.h"
#include "port/pg_bitutils.h"
#include "postmaster/bgworker.h"
#include "storage/shm_mq.h"
#include "storage/spin.h"
#include "utils/memutils.h"

/*
 * This structure represents the actual queue, stored in shared memory.
 *
 * Some notes on synchronization:
 *
 * mq_receiver and mq_bytes_read can only be changed by the receiver; and
 * mq_sender and mq_bytes_written can only be changed by the sender.
 * mq_receiver and mq_sender are protected by mq_mutex, although, importantly,
 * they cannot change once set, and thus may be read without a lock once this
 * is known to be the case.
 *
 * mq_bytes_read and mq_bytes_written are not protected by the mutex.  Instead,
 * they are written atomically using 8 byte loads and stores.  Memory barriers
 * must be carefully used to synchronize reads and writes of these values with
 * reads and writes of the actual data in mq_ring.
 *
 * mq_detached needs no locking.  It can be set by either the sender or the
 * receiver, but only ever from false to true, so redundant writes don't
 * matter.  It is important that if we set mq_detached and then set the
 * counterparty's latch, the counterparty must be certain to see the change
 * after waking up.  Since SetLatch begins with a memory barrier and ResetLatch
 * ends with one, this should be OK.
 *
 * mq_ring_size and mq_ring_offset never change after initialization, and
 * can therefore be read without the lock.
 *
 * Importantly, mq_ring can be safely read and written without a lock.
 * At any given time, the difference between mq_bytes_read and
 * mq_bytes_written defines the number of bytes within mq_ring that contain
 * unread data, and mq_bytes_read defines the position where those bytes
 * begin.  The sender can increase the number of unread bytes at any time,
 * but only the receiver can give license to overwrite those bytes, by
 * incrementing mq_bytes_read.  Therefore, it's safe for the receiver to read
 * the unread bytes it knows to be present without the lock.  Conversely,
 * the sender can write to the unused portion of the ring buffer without
 * the lock, because nobody else can be reading or writing those bytes.  The
 * receiver could be making more bytes unused by incrementing mq_bytes_read,
 * but that's OK.  Note that it would be unsafe for the receiver to read any
 * data it's already marked as read, or to write any data; and it would be
 * unsafe for the sender to reread any data after incrementing
 * mq_bytes_written, but fortunately there's no need for any of that.
 */
struct shm_mq
{
    slock_t     mq_mutex;
    PGPROC     *mq_receiver;
    PGPROC     *mq_sender;
    pg_atomic_uint64 mq_bytes_read;
    pg_atomic_uint64 mq_bytes_written;
    Size        mq_ring_size;
    bool        mq_detached;
    uint8       mq_ring_offset;
    char        mq_ring[FLEXIBLE_ARRAY_MEMBER];
};

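/*
 * Illustrative sketch of the accounting described above: the amount of
 * unread data is simply the difference between the two counters, and it
 * never exceeds the ring size.  The helper name here is hypothetical and
 * exists only to make the arithmetic concrete.
 */
#ifdef NOT_USED
static inline Size
shm_mq_unread_bytes_example(shm_mq *mq)
{
    uint64      rb = pg_atomic_read_u64(&mq->mq_bytes_read);
    uint64      wb = pg_atomic_read_u64(&mq->mq_bytes_written);

    /* wb only grows, and never runs more than mq_ring_size ahead of rb. */
    Assert(wb >= rb && wb - rb <= mq->mq_ring_size);

    /* Unread data begins at rb % mq_ring_size and spans wb - rb bytes. */
    return (Size) (wb - rb);
}
#endif
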
/*
 * This structure is a backend-private handle for access to a queue.
 *
 * mqh_queue is a pointer to the queue we've attached, and mqh_segment is
 * an optional pointer to the dynamic shared memory segment that contains it.
 * (If mqh_segment is provided, we register an on_dsm_detach callback to
 * make sure we detach from the queue before detaching from DSM.)
 *
 * If this queue is intended to connect the current process with a background
 * worker that started it, the user can pass a pointer to the worker handle
 * to shm_mq_attach(), and we'll store it in mqh_handle.  The point of this
 * is to allow us to begin sending to or receiving from that queue before the
 * process we'll be communicating with has even been started.  If it fails
 * to start, the handle will allow us to notice that and fail cleanly, rather
 * than waiting forever; see shm_mq_wait_internal.  This is mostly useful in
 * simple cases - e.g. where there are just 2 processes communicating; in
 * more complex scenarios, every process may not have a BackgroundWorkerHandle
 * available, or may need to watch for the failure of more than one other
 * process at a time.
 *
 * When a message exists as a contiguous chunk of bytes in the queue - that is,
 * it is smaller than the size of the ring buffer and does not wrap around
 * the end - we return the message to the caller as a pointer into the buffer.
 * For messages that are larger or happen to wrap, we reassemble the message
 * locally by copying the chunks into a backend-local buffer.  mqh_buffer is
 * the buffer, and mqh_buflen is the number of bytes allocated for it.
 *
 * mqh_send_pending is the number of bytes that have been written to the
 * queue but not yet reflected in the shared mq_bytes_written counter.  We
 * don't update the shared counter until the pending data amounts to more
 * than 1/4th of the ring size or the caller forces a flush.  This prevents
 * frequent CPU cache misses, and it also avoids frequent SetLatch() calls,
 * which are quite expensive.
 *
 * mqh_partial_bytes, mqh_expected_bytes, and mqh_length_word_complete
 * are used to track the state of non-blocking operations.  When the caller
 * attempts a non-blocking operation that returns SHM_MQ_WOULD_BLOCK, they
 * are expected to retry the call at a later time with the same argument;
 * we need to retain enough state to pick up where we left off.
 * mqh_length_word_complete tracks whether we are done sending or receiving
 * (whichever we're doing) the entire length word.  mqh_partial_bytes tracks
 * the number of bytes read or written for either the length word or the
 * message itself, and mqh_expected_bytes - which is used only for reads -
 * tracks the expected total size of the payload.
 *
 * mqh_counterparty_attached tracks whether we know the counterparty to have
 * attached to the queue at some previous point.  This lets us avoid some
 * mutex acquisitions.
 *
 * mqh_context is the memory context in effect at the time we attached to
 * the shm_mq.  The shm_mq_handle itself is allocated in this context, and
 * we make sure any other allocations we do happen in this context as well,
 * to avoid nasty surprises.
 */
struct shm_mq_handle
{
    shm_mq     *mqh_queue;
    dsm_segment *mqh_segment;
    BackgroundWorkerHandle *mqh_handle;
    char       *mqh_buffer;
    Size        mqh_buflen;
    Size        mqh_consume_pending;
    Size        mqh_send_pending;
    Size        mqh_partial_bytes;
    Size        mqh_expected_bytes;
    bool        mqh_length_word_complete;
    bool        mqh_counterparty_attached;
    MemoryContext mqh_context;
};

static void shm_mq_detach_internal(shm_mq *mq);
static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes,
                                       const void *data, bool nowait,
                                       Size *bytes_written);
static shm_mq_result shm_mq_receive_bytes(shm_mq_handle *mqh,
                                          Size bytes_needed, bool nowait,
                                          Size *nbytesp, void **datap);
static bool shm_mq_counterparty_gone(shm_mq *mq,
                                     BackgroundWorkerHandle *handle);
static bool shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr,
                                 BackgroundWorkerHandle *handle);
static void shm_mq_inc_bytes_read(shm_mq *mq, Size n);
static void shm_mq_inc_bytes_written(shm_mq *mq, Size n);
static void shm_mq_detach_callback(dsm_segment *seg, Datum arg);

/* Minimum queue size is enough for header and at least one chunk of data. */
const Size  shm_mq_minimum_size =
MAXALIGN(offsetof(shm_mq, mq_ring)) + MAXIMUM_ALIGNOF;

#define MQH_INITIAL_BUFSIZE 8192

/*
 * Initialize a new shared message queue.
 */
shm_mq *
shm_mq_create(void *address, Size size)
{
    shm_mq     *mq = address;
    Size        data_offset = MAXALIGN(offsetof(shm_mq, mq_ring));

    /* If the size isn't MAXALIGN'd, just discard the odd bytes. */
    size = MAXALIGN_DOWN(size);

    /* Queue size must be large enough to hold some data. */
    Assert(size > data_offset);

    /* Initialize queue header. */
    SpinLockInit(&mq->mq_mutex);
    mq->mq_receiver = NULL;
    mq->mq_sender = NULL;
    pg_atomic_init_u64(&mq->mq_bytes_read, 0);
    pg_atomic_init_u64(&mq->mq_bytes_written, 0);
    mq->mq_ring_size = size - data_offset;
    mq->mq_detached = false;
    mq->mq_ring_offset = data_offset - offsetof(shm_mq, mq_ring);

    return mq;
}

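/*
 * Illustrative sketch of typical receiver-side setup: create a DSM segment,
 * lay the queue out at its start, declare ourselves the receiver, and attach.
 * The function name, the segment layout, and the lack of error handling are
 * assumptions made for the example, not requirements of this module.
 */
#ifdef NOT_USED
static shm_mq_handle *
shm_mq_setup_example(Size queue_size, dsm_segment **segp)
{
    dsm_segment *seg;
    shm_mq     *mq;

    Assert(queue_size >= shm_mq_minimum_size);

    seg = dsm_create(queue_size, 0);
    mq = shm_mq_create(dsm_segment_address(seg), queue_size);

    /* We intend to read from this queue; the worker will be the sender. */
    shm_mq_set_receiver(mq, MyProc);

    *segp = seg;
    return shm_mq_attach(mq, seg, NULL);
}
#endif
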
/*
 * Set the identity of the process that will receive from a shared message
 * queue.
 */
void
shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)
{
    PGPROC     *sender;

    SpinLockAcquire(&mq->mq_mutex);
    Assert(mq->mq_receiver == NULL);
    mq->mq_receiver = proc;
    sender = mq->mq_sender;
    SpinLockRelease(&mq->mq_mutex);

    if (sender != NULL)
        SetLatch(&sender->procLatch);
}

/*
 * Set the identity of the process that will send to a shared message queue.
 */
void
shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
{
    PGPROC     *receiver;

    SpinLockAcquire(&mq->mq_mutex);
    Assert(mq->mq_sender == NULL);
    mq->mq_sender = proc;
    receiver = mq->mq_receiver;
    SpinLockRelease(&mq->mq_mutex);

    if (receiver != NULL)
        SetLatch(&receiver->procLatch);
}

/*
 * Get the configured receiver.
 */
PGPROC *
shm_mq_get_receiver(shm_mq *mq)
{
    PGPROC     *receiver;

    SpinLockAcquire(&mq->mq_mutex);
    receiver = mq->mq_receiver;
    SpinLockRelease(&mq->mq_mutex);

    return receiver;
}

/*
 * Get the configured sender.
 */
PGPROC *
shm_mq_get_sender(shm_mq *mq)
{
    PGPROC     *sender;

    SpinLockAcquire(&mq->mq_mutex);
    sender = mq->mq_sender;
    SpinLockRelease(&mq->mq_mutex);

    return sender;
}

/*
 * Attach to a shared message queue so we can send or receive messages.
 *
 * The memory context in effect at the time this function is called should
 * be one which will last for at least as long as the message queue itself.
 * We'll allocate the handle in that context, and future allocations that
 * are needed to buffer incoming data will happen in that context as well.
 *
 * If seg != NULL, the queue will be automatically detached when that dynamic
 * shared memory segment is detached.
 *
 * If handle != NULL, the queue can be read or written even before the
 * other process has attached.  We'll wait for it to do so if needed.  The
 * handle must be for a background worker initialized with bgw_notify_pid
 * equal to our PID.
 *
 * shm_mq_detach() should be called when done.  This will free the
 * shm_mq_handle and mark the queue itself as detached, so that our
 * counterpart won't get stuck waiting for us to fill or drain the queue
 * after we've already lost interest.
 */
shm_mq_handle *
shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
{
    shm_mq_handle *mqh = palloc(sizeof(shm_mq_handle));

    Assert(mq->mq_receiver == MyProc || mq->mq_sender == MyProc);
    mqh->mqh_queue = mq;
    mqh->mqh_segment = seg;
    mqh->mqh_handle = handle;
    mqh->mqh_buffer = NULL;
    mqh->mqh_buflen = 0;
    mqh->mqh_consume_pending = 0;
    mqh->mqh_send_pending = 0;
    mqh->mqh_partial_bytes = 0;
    mqh->mqh_expected_bytes = 0;
    mqh->mqh_length_word_complete = false;
    mqh->mqh_counterparty_attached = false;
    mqh->mqh_context = CurrentMemoryContext;

    if (seg != NULL)
        on_dsm_detach(seg, shm_mq_detach_callback, PointerGetDatum(mq));

    return mqh;
}

/*
 * Associate a BackgroundWorkerHandle with a shm_mq_handle just as if it had
 * been passed to shm_mq_attach.
 */
void
shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
{
    Assert(mqh->mqh_handle == NULL);
    mqh->mqh_handle = handle;
}

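/*
 * Illustrative sketch of the worker side of the same arrangement: attach to
 * the segment handed to us at startup, find the queue at its start, declare
 * ourselves the sender, and attach.  How the dsm_handle reaches the worker
 * (and whether the queue really sits at the start of the segment) is up to
 * the caller; the layout here just mirrors the receiver-side example above.
 */
#ifdef NOT_USED
static shm_mq_handle *
shm_mq_worker_attach_example(dsm_handle handle)
{
    dsm_segment *seg;
    shm_mq     *mq;

    seg = dsm_attach(handle);
    if (seg == NULL)
        ereport(ERROR,
                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                 errmsg("could not map dynamic shared memory segment")));
    mq = (shm_mq *) dsm_segment_address(seg);

    /* We intend to write to this queue. */
    shm_mq_set_sender(mq, MyProc);

    return shm_mq_attach(mq, seg, NULL);
}
#endif
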
/*
 * Write a message into a shared message queue.
 */
shm_mq_result
shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait,
            bool force_flush)
{
    shm_mq_iovec iov;

    iov.data = data;
    iov.len = nbytes;

    return shm_mq_sendv(mqh, &iov, 1, nowait, force_flush);
}

/*
 * Write a message into a shared message queue, gathered from multiple
 * addresses.
 *
 * When nowait = false, we'll wait on our process latch when the ring buffer
 * fills up, and then continue writing once the receiver has drained some data.
 * The process latch is reset after each wait.
 *
 * When nowait = true, we do not manipulate the state of the process latch;
 * instead, if the buffer becomes full, we return SHM_MQ_WOULD_BLOCK.  In
 * this case, the caller should call this function again, with the same
 * arguments, each time the process latch is set.  (Once begun, the sending
 * of a message cannot be aborted except by detaching from the queue; changing
 * the length or payload will corrupt the queue.)
 *
 * When force_flush = true, we immediately update the shm_mq's mq_bytes_written
 * and notify the receiver (if it is already attached).  Otherwise, we don't
 * update it until we have written an amount of data greater than 1/4th of the
 * ring size.
 */
shm_mq_result
shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait,
             bool force_flush)
{
    shm_mq_result res;
    shm_mq     *mq = mqh->mqh_queue;
    PGPROC     *receiver;
    Size        nbytes = 0;
    Size        bytes_written;
    int         i;
    int         which_iov = 0;
    Size        offset;

    Assert(mq->mq_sender == MyProc);

    /* Compute total size of write. */
    for (i = 0; i < iovcnt; ++i)
        nbytes += iov[i].len;

    /* Prevent writing messages that would overwhelm the receiver. */
    if (nbytes > MaxAllocSize)
        ereport(ERROR,
                (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
                 errmsg("cannot send a message of size %zu via shared memory queue",
                        nbytes)));

    /* Try to write, or finish writing, the length word into the buffer. */
    while (!mqh->mqh_length_word_complete)
    {
        Assert(mqh->mqh_partial_bytes < sizeof(Size));
        res = shm_mq_send_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
                                ((char *) &nbytes) + mqh->mqh_partial_bytes,
                                nowait, &bytes_written);

        if (res == SHM_MQ_DETACHED)
        {
            /* Reset state in case caller tries to send another message. */
            mqh->mqh_partial_bytes = 0;
            mqh->mqh_length_word_complete = false;
            return res;
        }
        mqh->mqh_partial_bytes += bytes_written;

        if (mqh->mqh_partial_bytes >= sizeof(Size))
        {
            Assert(mqh->mqh_partial_bytes == sizeof(Size));

            mqh->mqh_partial_bytes = 0;
            mqh->mqh_length_word_complete = true;
        }

        if (res != SHM_MQ_SUCCESS)
            return res;

        /* Length word can't be split unless bigger than required alignment. */
        Assert(mqh->mqh_length_word_complete || sizeof(Size) > MAXIMUM_ALIGNOF);
    }

    /* Write the actual data bytes into the buffer. */
    Assert(mqh->mqh_partial_bytes <= nbytes);
    offset = mqh->mqh_partial_bytes;
    do
    {
        Size        chunksize;

        /* Figure out which bytes need to be sent next. */
        if (offset >= iov[which_iov].len)
        {
            offset -= iov[which_iov].len;
            ++which_iov;
            if (which_iov >= iovcnt)
                break;
            continue;
        }

        /*
         * We want to avoid copying the data if at all possible, but every
         * chunk of bytes we write into the queue has to be MAXALIGN'd, except
         * the last.  Thus, if a chunk other than the last one ends on a
         * non-MAXALIGN'd boundary, we have to combine the tail end of its
         * data with data from one or more following chunks until we either
         * reach the last chunk or accumulate a number of bytes which is
         * MAXALIGN'd.
         */
        if (which_iov + 1 < iovcnt &&
            offset + MAXIMUM_ALIGNOF > iov[which_iov].len)
        {
            char        tmpbuf[MAXIMUM_ALIGNOF];
            int         j = 0;

            for (;;)
            {
                if (offset < iov[which_iov].len)
                {
                    tmpbuf[j] = iov[which_iov].data[offset];
                    j++;
                    offset++;
                    if (j == MAXIMUM_ALIGNOF)
                        break;
                }
                else
                {
                    offset -= iov[which_iov].len;
                    which_iov++;
                    if (which_iov >= iovcnt)
                        break;
                }
            }

            res = shm_mq_send_bytes(mqh, j, tmpbuf, nowait, &bytes_written);

            if (res == SHM_MQ_DETACHED)
            {
                /* Reset state in case caller tries to send another message. */
                mqh->mqh_partial_bytes = 0;
                mqh->mqh_length_word_complete = false;
                return res;
            }

            mqh->mqh_partial_bytes += bytes_written;
            if (res != SHM_MQ_SUCCESS)
                return res;
            continue;
        }

        /*
         * If this is the last chunk, we can write all the data, even if it
         * isn't a multiple of MAXIMUM_ALIGNOF.  Otherwise, we need to
         * MAXALIGN_DOWN the write size.
         */
        chunksize = iov[which_iov].len - offset;
        if (which_iov + 1 < iovcnt)
            chunksize = MAXALIGN_DOWN(chunksize);
        res = shm_mq_send_bytes(mqh, chunksize, &iov[which_iov].data[offset],
                                nowait, &bytes_written);

        if (res == SHM_MQ_DETACHED)
        {
            /* Reset state in case caller tries to send another message. */
            mqh->mqh_length_word_complete = false;
            mqh->mqh_partial_bytes = 0;
            return res;
        }

        mqh->mqh_partial_bytes += bytes_written;
        offset += bytes_written;
        if (res != SHM_MQ_SUCCESS)
            return res;
    } while (mqh->mqh_partial_bytes < nbytes);

    /* Reset for next message. */
    mqh->mqh_partial_bytes = 0;
    mqh->mqh_length_word_complete = false;

    /* If queue has been detached, let caller know. */
    if (mq->mq_detached)
        return SHM_MQ_DETACHED;

    /*
     * If the counterparty is known to have attached, we can read mq_receiver
     * without acquiring the spinlock.  Otherwise, more caution is needed.
     */
    if (mqh->mqh_counterparty_attached)
        receiver = mq->mq_receiver;
    else
    {
        SpinLockAcquire(&mq->mq_mutex);
        receiver = mq->mq_receiver;
        SpinLockRelease(&mq->mq_mutex);
        if (receiver != NULL)
            mqh->mqh_counterparty_attached = true;
    }

    /*
     * If the caller has requested force flush or we have written more than
     * 1/4 of the ring size, mark it as written in shared memory and notify
     * the receiver.
     */
    if (force_flush || mqh->mqh_send_pending > (mq->mq_ring_size >> 2))
    {
        shm_mq_inc_bytes_written(mq, mqh->mqh_send_pending);
        if (receiver != NULL)
            SetLatch(&receiver->procLatch);
        mqh->mqh_send_pending = 0;
    }

    return SHM_MQ_SUCCESS;
}

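/*
 * Illustrative sketch of the nowait protocol described above: on
 * SHM_MQ_WOULD_BLOCK, wait on our latch and retry the call with the same
 * arguments until the message is fully sent or the queue is detached.  The
 * function name is hypothetical.
 */
#ifdef NOT_USED
static shm_mq_result
shm_mq_send_retry_example(shm_mq_handle *mqh, Size nbytes, const void *data)
{
    for (;;)
    {
        shm_mq_result res;

        res = shm_mq_send(mqh, nbytes, data, true, false);
        if (res != SHM_MQ_WOULD_BLOCK)
            return res;         /* SHM_MQ_SUCCESS or SHM_MQ_DETACHED */

        /* Queue is full; wait for the receiver to drain some data. */
        (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
                         WAIT_EVENT_MESSAGE_QUEUE_SEND);
        ResetLatch(MyLatch);
        CHECK_FOR_INTERRUPTS();
    }
}
#endif
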
/*
 * Receive a message from a shared message queue.
 *
 * We set *nbytesp to the message length and *datap to point to the message
 * payload.  If the entire message exists in the queue as a single,
 * contiguous chunk, *datap will point directly into shared memory; otherwise,
 * it will point to a temporary buffer.  This mostly avoids data copying in
 * the hoped-for case where messages are short compared to the buffer size,
 * while still allowing longer messages.  In either case, the return value
 * remains valid until the next receive operation is performed on the queue.
 *
 * When nowait = false, we'll wait on our process latch when the ring buffer
 * is empty and we have not yet received a full message.  The sender will
 * set our process latch after more data has been written, and we'll resume
 * processing.  Each call will therefore return a complete message
 * (unless the sender detaches the queue).
 *
 * When nowait = true, we do not manipulate the state of the process latch;
 * instead, whenever the buffer is empty and we need to read from it, we
 * return SHM_MQ_WOULD_BLOCK.  In this case, the caller should call this
 * function again after the process latch has been set.
 */
shm_mq_result
shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
{
    shm_mq     *mq = mqh->mqh_queue;
    shm_mq_result res;
    Size        rb = 0;
    Size        nbytes;
    void       *rawdata;

    Assert(mq->mq_receiver == MyProc);

    /* We can't receive data until the sender has attached. */
    if (!mqh->mqh_counterparty_attached)
    {
        if (nowait)
        {
            int         counterparty_gone;

            /*
             * We shouldn't return at this point at all unless the sender
             * hasn't attached yet.  However, the correct return value depends
             * on whether the sender is still attached.  If we first test
             * whether the sender has ever attached and then test whether the
             * sender has detached, there's a race condition: a sender that
             * attaches and detaches very quickly might fool us into thinking
             * the sender never attached at all.  So, test whether our
             * counterparty is definitively gone first, and only afterwards
             * check whether the sender ever attached in the first place.
             */
            counterparty_gone = shm_mq_counterparty_gone(mq, mqh->mqh_handle);
            if (shm_mq_get_sender(mq) == NULL)
            {
                if (counterparty_gone)
                    return SHM_MQ_DETACHED;
                else
                    return SHM_MQ_WOULD_BLOCK;
            }
        }
        else if (!shm_mq_wait_internal(mq, &mq->mq_sender, mqh->mqh_handle)
                 && shm_mq_get_sender(mq) == NULL)
        {
            mq->mq_detached = true;
            return SHM_MQ_DETACHED;
        }
        mqh->mqh_counterparty_attached = true;
    }

    /*
     * If we've consumed an amount of data greater than 1/4th of the ring
     * size, mark it consumed in shared memory.  We try to avoid doing this
     * unnecessarily when only a small amount of data has been consumed,
     * because SetLatch() is fairly expensive and we don't want to do it too
     * often.
     */
    if (mqh->mqh_consume_pending > mq->mq_ring_size / 4)
    {
        shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
        mqh->mqh_consume_pending = 0;
    }

    /* Try to read, or finish reading, the length word from the buffer. */
    while (!mqh->mqh_length_word_complete)
    {
        /* Try to receive the message length word. */
        Assert(mqh->mqh_partial_bytes < sizeof(Size));
        res = shm_mq_receive_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
                                   nowait, &rb, &rawdata);
        if (res != SHM_MQ_SUCCESS)
            return res;

        /*
         * Hopefully, we'll receive the entire message length word at once.
         * But if sizeof(Size) > MAXIMUM_ALIGNOF, then it might be split over
         * multiple reads.
         */
        if (mqh->mqh_partial_bytes == 0 && rb >= sizeof(Size))
        {
            Size        needed;

            nbytes = *(Size *) rawdata;

            /* If we've already got the whole message, we're done. */
            needed = MAXALIGN(sizeof(Size)) + MAXALIGN(nbytes);
            if (rb >= needed)
            {
                mqh->mqh_consume_pending += needed;
                *nbytesp = nbytes;
                *datap = ((char *) rawdata) + MAXALIGN(sizeof(Size));
                return SHM_MQ_SUCCESS;
            }

            /*
             * We don't have the whole message, but we at least have the whole
             * length word.
             */
            mqh->mqh_expected_bytes = nbytes;
            mqh->mqh_length_word_complete = true;
            mqh->mqh_consume_pending += MAXALIGN(sizeof(Size));
            rb -= MAXALIGN(sizeof(Size));
        }
        else
        {
            Size        lengthbytes;

            /* Can't be split unless bigger than required alignment. */
            Assert(sizeof(Size) > MAXIMUM_ALIGNOF);

            /* Message word is split; need buffer to reassemble. */
            if (mqh->mqh_buffer == NULL)
            {
                mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context,
                                                     MQH_INITIAL_BUFSIZE);
                mqh->mqh_buflen = MQH_INITIAL_BUFSIZE;
            }
            Assert(mqh->mqh_buflen >= sizeof(Size));

            /* Copy partial length word; remember to consume it. */
            if (mqh->mqh_partial_bytes + rb > sizeof(Size))
                lengthbytes = sizeof(Size) - mqh->mqh_partial_bytes;
            else
                lengthbytes = rb;
            memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata,
                   lengthbytes);
            mqh->mqh_partial_bytes += lengthbytes;
            mqh->mqh_consume_pending += MAXALIGN(lengthbytes);
            rb -= lengthbytes;

            /* If we now have the whole word, we're ready to read payload. */
            if (mqh->mqh_partial_bytes >= sizeof(Size))
            {
                Assert(mqh->mqh_partial_bytes == sizeof(Size));
                mqh->mqh_expected_bytes = *(Size *) mqh->mqh_buffer;
                mqh->mqh_length_word_complete = true;
                mqh->mqh_partial_bytes = 0;
            }
        }
    }
    nbytes = mqh->mqh_expected_bytes;

    /*
     * Should be disallowed on the sending side already, but better check and
     * error out on the receiver side as well rather than trying to read a
     * prohibitively large message.
     */
    if (nbytes > MaxAllocSize)
        ereport(ERROR,
                (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
                 errmsg("invalid message size %zu in shared memory queue",
                        nbytes)));

    if (mqh->mqh_partial_bytes == 0)
    {
        /*
         * Try to obtain the whole message in a single chunk.  If this works,
         * we need not copy the data and can return a pointer directly into
         * shared memory.
         */
        res = shm_mq_receive_bytes(mqh, nbytes, nowait, &rb, &rawdata);
        if (res != SHM_MQ_SUCCESS)
            return res;
        if (rb >= nbytes)
        {
            mqh->mqh_length_word_complete = false;
            mqh->mqh_consume_pending += MAXALIGN(nbytes);
            *nbytesp = nbytes;
            *datap = rawdata;
            return SHM_MQ_SUCCESS;
        }

        /*
         * The message has wrapped the buffer.  We'll need to copy it in order
         * to return it to the client in one chunk.  First, make sure we have
         * a large enough buffer available.
         */
        if (mqh->mqh_buflen < nbytes)
        {
            Size        newbuflen;

            /*
             * Increase size to the next power of 2 that's >= nbytes, but
             * limit to MaxAllocSize.
             */
            newbuflen = pg_nextpower2_size_t(nbytes);
            newbuflen = Min(newbuflen, MaxAllocSize);

            if (mqh->mqh_buffer != NULL)
            {
                pfree(mqh->mqh_buffer);
                mqh->mqh_buffer = NULL;
                mqh->mqh_buflen = 0;
            }
            mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context, newbuflen);
            mqh->mqh_buflen = newbuflen;
        }
    }

    /* Loop until we've copied the entire message. */
    for (;;)
    {
        Size        still_needed;

        /* Copy as much as we can. */
        Assert(mqh->mqh_partial_bytes + rb <= nbytes);
        if (rb > 0)
        {
            memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata, rb);
            mqh->mqh_partial_bytes += rb;
        }

        /*
         * Update count of bytes that can be consumed, accounting for
         * alignment padding.  Note that this will never actually insert any
         * padding except at the end of a message, because the buffer size is
         * a multiple of MAXIMUM_ALIGNOF, and each read and write is as well.
         */
        Assert(mqh->mqh_partial_bytes == nbytes || rb == MAXALIGN(rb));
        mqh->mqh_consume_pending += MAXALIGN(rb);

        /* If we got all the data, exit the loop. */
        if (mqh->mqh_partial_bytes >= nbytes)
            break;

        /* Wait for some more data. */
        still_needed = nbytes - mqh->mqh_partial_bytes;
        res = shm_mq_receive_bytes(mqh, still_needed, nowait, &rb, &rawdata);
        if (res != SHM_MQ_SUCCESS)
            return res;
        if (rb > still_needed)
            rb = still_needed;
    }

    /* Return the complete message, and reset for next message. */
    *nbytesp = nbytes;
    *datap = mqh->mqh_buffer;
    mqh->mqh_length_word_complete = false;
    mqh->mqh_partial_bytes = 0;
    return SHM_MQ_SUCCESS;
}

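/*
 * Illustrative sketch of the corresponding nowait receive loop: on
 * SHM_MQ_WOULD_BLOCK, wait on our latch and call again.  Note that the
 * returned pointer is only valid until the next receive from this queue.
 * The function name is hypothetical.
 */
#ifdef NOT_USED
static shm_mq_result
shm_mq_receive_retry_example(shm_mq_handle *mqh, Size *nbytesp, void **datap)
{
    for (;;)
    {
        shm_mq_result res;

        res = shm_mq_receive(mqh, nbytesp, datap, true);
        if (res != SHM_MQ_WOULD_BLOCK)
            return res;         /* SHM_MQ_SUCCESS or SHM_MQ_DETACHED */

        /* Nothing more to read yet; wait for the sender to write data. */
        (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
                         WAIT_EVENT_MESSAGE_QUEUE_RECEIVE);
        ResetLatch(MyLatch);
        CHECK_FOR_INTERRUPTS();
    }
}
#endif
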
/*
 * Wait for the other process that's supposed to use this queue to attach
 * to it.
 *
 * The return value is SHM_MQ_DETACHED if the worker has already detached or
 * if it dies; it is SHM_MQ_SUCCESS if we detect that the worker has attached.
 * Note that we will only be able to detect that the worker has died before
 * attaching if a background worker handle was passed to shm_mq_attach().
 */
shm_mq_result
shm_mq_wait_for_attach(shm_mq_handle *mqh)
{
    shm_mq     *mq = mqh->mqh_queue;
    PGPROC    **victim;

    if (shm_mq_get_receiver(mq) == MyProc)
        victim = &mq->mq_sender;
    else
    {
        Assert(shm_mq_get_sender(mq) == MyProc);
        victim = &mq->mq_receiver;
    }

    if (shm_mq_wait_internal(mq, victim, mqh->mqh_handle))
        return SHM_MQ_SUCCESS;
    else
        return SHM_MQ_DETACHED;
}

/*
 * Detach from a shared message queue, and destroy the shm_mq_handle.
 */
void
shm_mq_detach(shm_mq_handle *mqh)
{
    /* Before detaching, notify the receiver about any already-written data. */
    if (mqh->mqh_send_pending > 0)
    {
        shm_mq_inc_bytes_written(mqh->mqh_queue, mqh->mqh_send_pending);
        mqh->mqh_send_pending = 0;
    }

    /* Notify counterparty that we're outta here. */
    shm_mq_detach_internal(mqh->mqh_queue);

    /* Cancel on_dsm_detach callback, if any. */
    if (mqh->mqh_segment)
        cancel_on_dsm_detach(mqh->mqh_segment,
                             shm_mq_detach_callback,
                             PointerGetDatum(mqh->mqh_queue));

    /* Release local memory associated with handle. */
    if (mqh->mqh_buffer != NULL)
        pfree(mqh->mqh_buffer);
    pfree(mqh);
}

/*
 * Notify counterparty that we're detaching from shared message queue.
 *
 * The purpose of this function is to make sure that the process
 * with which we're communicating doesn't block forever waiting for us to
 * fill or drain the queue once we've lost interest.  When the sender
 * detaches, the receiver can read any messages remaining in the queue;
 * further reads will return SHM_MQ_DETACHED.  If the receiver detaches,
 * further attempts to send messages will likewise return SHM_MQ_DETACHED.
 *
 * This is separated out from shm_mq_detach() because if the on_dsm_detach
 * callback fires, we only want to do this much.  We do not try to touch
 * the local shm_mq_handle, as it may have been pfree'd already.
 */
static void
shm_mq_detach_internal(shm_mq *mq)
{
    PGPROC     *victim;

    SpinLockAcquire(&mq->mq_mutex);
    if (mq->mq_sender == MyProc)
        victim = mq->mq_receiver;
    else
    {
        Assert(mq->mq_receiver == MyProc);
        victim = mq->mq_sender;
    }
    mq->mq_detached = true;
    SpinLockRelease(&mq->mq_mutex);

    if (victim != NULL)
        SetLatch(&victim->procLatch);
}

/*
 * Get the shm_mq from handle.
 */
shm_mq *
shm_mq_get_queue(shm_mq_handle *mqh)
{
    return mqh->mqh_queue;
}

/*
 * Write bytes into a shared message queue.
 */
static shm_mq_result
shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
                  bool nowait, Size *bytes_written)
{
    shm_mq     *mq = mqh->mqh_queue;
    Size        sent = 0;
    uint64      used;
    Size        ringsize = mq->mq_ring_size;
    Size        available;

    while (sent < nbytes)
    {
        uint64      rb;
        uint64      wb;

        /* Compute number of ring buffer bytes used and available. */
        rb = pg_atomic_read_u64(&mq->mq_bytes_read);
        wb = pg_atomic_read_u64(&mq->mq_bytes_written) + mqh->mqh_send_pending;
        Assert(wb >= rb);
        used = wb - rb;
        Assert(used <= ringsize);
        available = Min(ringsize - used, nbytes - sent);

        /*
         * Bail out if the queue has been detached.  Note that we would be in
         * trouble if the compiler decided to cache the value of
         * mq->mq_detached in a register or on the stack across loop
         * iterations.  It probably shouldn't do that anyway since we'll
         * always return, call an external function that performs a system
         * call, or reach a memory barrier at some point later in the loop,
         * but just to be sure, insert a compiler barrier here.
         */
        pg_compiler_barrier();
        if (mq->mq_detached)
        {
            *bytes_written = sent;
            return SHM_MQ_DETACHED;
        }

        if (available == 0 && !mqh->mqh_counterparty_attached)
        {
            /*
             * The queue is full, so if the receiver isn't yet known to be
             * attached, we must wait for that to happen.
             */
            if (nowait)
            {
                if (shm_mq_counterparty_gone(mq, mqh->mqh_handle))
                {
                    *bytes_written = sent;
                    return SHM_MQ_DETACHED;
                }
                if (shm_mq_get_receiver(mq) == NULL)
                {
                    *bytes_written = sent;
                    return SHM_MQ_WOULD_BLOCK;
                }
            }
            else if (!shm_mq_wait_internal(mq, &mq->mq_receiver,
                                           mqh->mqh_handle))
            {
                mq->mq_detached = true;
                *bytes_written = sent;
                return SHM_MQ_DETACHED;
            }
            mqh->mqh_counterparty_attached = true;

            /*
             * The receiver may have read some data after attaching, so we
             * must not wait without rechecking the queue state.
             */
        }
        else if (available == 0)
        {
            /* Update the pending send bytes in the shared memory. */
            shm_mq_inc_bytes_written(mq, mqh->mqh_send_pending);

            /*
             * Since mqh->mqh_counterparty_attached is known to be true at
             * this point, mq_receiver has been set, and it can't change once
             * set.  Therefore, we can read it without acquiring the spinlock.
             */
            Assert(mqh->mqh_counterparty_attached);
            SetLatch(&mq->mq_receiver->procLatch);

            /*
             * We have just updated the mqh_send_pending bytes in the shared
             * memory so reset it.
             */
            mqh->mqh_send_pending = 0;

            /* Skip manipulation of our latch if nowait = true. */
            if (nowait)
            {
                *bytes_written = sent;
                return SHM_MQ_WOULD_BLOCK;
            }

            /*
             * Wait for our latch to be set.  It might already be set for
             * some unrelated reason, but that'll just result in one extra
             * trip through the loop.  It's worth it to avoid resetting the
             * latch at top of loop, because setting an already-set latch is
             * much cheaper than setting one that has been reset.
             */
            (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
                             WAIT_EVENT_MESSAGE_QUEUE_SEND);

            /* Reset the latch so we don't spin. */
            ResetLatch(MyLatch);

            /* An interrupt may have occurred while we were waiting. */
            CHECK_FOR_INTERRUPTS();
        }
        else
        {
            Size        offset;
            Size        sendnow;

            offset = wb % (uint64) ringsize;
            sendnow = Min(available, ringsize - offset);

            /*
             * Write as much data as we can via a single memcpy().  Make sure
             * these writes happen after the read of mq_bytes_read, above.
             * This barrier pairs with the one in shm_mq_inc_bytes_read.
             * (Since we're separating the read of mq_bytes_read from a
             * subsequent write to mq_ring, we need a full barrier here.)
             */
            pg_memory_barrier();
            memcpy(&mq->mq_ring[mq->mq_ring_offset + offset],
                   (char *) data + sent, sendnow);
            sent += sendnow;

            /*
             * Update count of bytes written, with alignment padding.  Note
             * that this will never actually insert any padding except at the
             * end of a run of bytes, because the buffer size is a multiple of
             * MAXIMUM_ALIGNOF, and each read is as well.
             */
            Assert(sent == nbytes || sendnow == MAXALIGN(sendnow));

            /*
             * For efficiency, we don't update the bytes written in the shared
             * memory and also don't set the reader's latch here.  Refer to
             * the comments atop the shm_mq_handle structure for more
             * information.
             */
            mqh->mqh_send_pending += MAXALIGN(sendnow);
        }
    }

    *bytes_written = sent;
    return SHM_MQ_SUCCESS;
}

/*
 * Wait until at least bytes_needed bytes are available to be read from the
 * shared message queue, or until the buffer wraps around.  If the queue is
 * detached, returns SHM_MQ_DETACHED.  If nowait is specified and a wait
 * would be required, returns SHM_MQ_WOULD_BLOCK.  Otherwise, *datap is set
 * to the location at which data bytes can be read, *nbytesp is set to the
 * number of bytes which can be read at that address, and the return value
 * is SHM_MQ_SUCCESS.
 */
static shm_mq_result
shm_mq_receive_bytes(shm_mq_handle *mqh, Size bytes_needed, bool nowait,
                     Size *nbytesp, void **datap)
{
    shm_mq     *mq = mqh->mqh_queue;
    Size        ringsize = mq->mq_ring_size;
    uint64      used;
    uint64      written;

    for (;;)
    {
        Size        offset;
        uint64      read;

        /* Get bytes written, so we can compute what's available to read. */
        written = pg_atomic_read_u64(&mq->mq_bytes_written);

        /*
         * Get bytes read.  Include bytes we could consume but have not yet
         * consumed.
         */
        read = pg_atomic_read_u64(&mq->mq_bytes_read) +
            mqh->mqh_consume_pending;
        used = written - read;
        Assert(used <= ringsize);
        offset = read % (uint64) ringsize;

        /* If we have enough data or buffer has wrapped, we're done. */
        if (used >= bytes_needed || offset + used >= ringsize)
        {
            *nbytesp = Min(used, ringsize - offset);
            *datap = &mq->mq_ring[mq->mq_ring_offset + offset];

            /*
             * Separate the read of mq_bytes_written, above, from caller's
             * attempt to read the data itself.  Pairs with the barrier in
             * shm_mq_inc_bytes_written.
             */
            pg_read_barrier();
            return SHM_MQ_SUCCESS;
        }

        /*
         * Fall out before waiting if the queue has been detached.
         *
         * Note that we don't check for this until *after* considering whether
         * the data already available is enough, since the receiver can finish
         * receiving a message stored in the buffer even after the sender has
         * detached.
         */
        if (mq->mq_detached)
        {
            /*
             * If the writer advanced mq_bytes_written and then set
             * mq_detached, we might not have read the final value of
             * mq_bytes_written above.  Insert a read barrier and then check
             * again if mq_bytes_written has advanced.
             */
            pg_read_barrier();
            if (written != pg_atomic_read_u64(&mq->mq_bytes_written))
                continue;

            return SHM_MQ_DETACHED;
        }

        /*
         * We didn't get enough data to satisfy the request, so mark any data
         * previously-consumed as read to make more buffer space.
         */
        if (mqh->mqh_consume_pending > 0)
        {
            shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
            mqh->mqh_consume_pending = 0;
        }

        /* Skip manipulation of our latch if nowait = true. */
        if (nowait)
            return SHM_MQ_WOULD_BLOCK;

        /*
         * Wait for our latch to be set.  It might already be set for some
         * unrelated reason, but that'll just result in one extra trip through
         * the loop.  It's worth it to avoid resetting the latch at top of
         * loop, because setting an already-set latch is much cheaper than
         * setting one that has been reset.
         */
        (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
                         WAIT_EVENT_MESSAGE_QUEUE_RECEIVE);

        /* Reset the latch so we don't spin. */
        ResetLatch(MyLatch);

        /* An interrupt may have occurred while we were waiting. */
        CHECK_FOR_INTERRUPTS();
    }
}

/*
 * Test whether a counterparty who may not even be alive yet is definitely
 * gone.
 */
static bool
shm_mq_counterparty_gone(shm_mq *mq, BackgroundWorkerHandle *handle)
{
    pid_t       pid;

    /* If the queue has been detached, counterparty is definitely gone. */
    if (mq->mq_detached)
        return true;

    /* If there's a handle, check worker status. */
    if (handle != NULL)
    {
        BgwHandleStatus status;

        /* Check for unexpected worker death. */
        status = GetBackgroundWorkerPid(handle, &pid);
        if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
        {
            /* Mark it detached, just to make it official. */
            mq->mq_detached = true;
            return true;
        }
    }

    /* Counterparty is not definitively gone. */
    return false;
}

/*
 * This is used when a process is waiting for its counterpart to attach to the
 * queue.  We exit when the other process attaches as expected, or, if
 * handle != NULL, when the referenced background process or the postmaster
 * dies.  Note that if handle == NULL, and the process fails to attach, we'll
 * potentially get stuck here forever waiting for a process that may never
 * start.  We do check for interrupts, though.
 *
 * ptr is a pointer to the memory address that we're expecting to become
 * non-NULL when our counterpart attaches to the queue.
 */
static bool
shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr, BackgroundWorkerHandle *handle)
{
    bool        result = false;

    for (;;)
    {
        BgwHandleStatus status;
        pid_t       pid;

        /* Acquire the lock just long enough to check the pointer. */
        SpinLockAcquire(&mq->mq_mutex);
        result = (*ptr != NULL);
        SpinLockRelease(&mq->mq_mutex);

        /* Fail if detached; else succeed if initialized. */
        if (mq->mq_detached)
        {
            result = false;
            break;
        }
        if (result)
            break;

        if (handle != NULL)
        {
            /* Check for unexpected worker death. */
            status = GetBackgroundWorkerPid(handle, &pid);
            if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
            {
                result = false;
                break;
            }
        }

        /* Wait to be signaled. */
        (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
                         WAIT_EVENT_MESSAGE_QUEUE_INTERNAL);

        /* Reset the latch so we don't spin. */
        ResetLatch(MyLatch);

        /* An interrupt may have occurred while we were waiting. */
        CHECK_FOR_INTERRUPTS();
    }

    return result;
}

/*
 * Increment the number of bytes read.
 */
static void
shm_mq_inc_bytes_read(shm_mq *mq, Size n)
{
    PGPROC     *sender;

    /*
     * Separate prior reads of mq_ring from the increment of mq_bytes_read
     * which follows.  This pairs with the full barrier in
     * shm_mq_send_bytes().  We only need a read barrier here because the
     * increment of mq_bytes_read is actually a read followed by a dependent
     * write.
     */
    pg_read_barrier();

    /*
     * There's no need to use pg_atomic_fetch_add_u64 here, because nobody
     * else can be changing this value.  This method should be cheaper.
     */
    pg_atomic_write_u64(&mq->mq_bytes_read,
                        pg_atomic_read_u64(&mq->mq_bytes_read) + n);

    /*
     * We shouldn't have any bytes to read without a sender, so we can read
     * mq_sender here without a lock.  Once it's initialized, it can't change.
     */
    sender = mq->mq_sender;
    Assert(sender != NULL);
    SetLatch(&sender->procLatch);
}

/*
 * Increment the number of bytes written.
 */
static void
shm_mq_inc_bytes_written(shm_mq *mq, Size n)
{
    /*
     * Separate prior writes to mq_ring from the write of mq_bytes_written
     * which we're about to do.  Pairs with the read barrier found in
     * shm_mq_receive_bytes.
     */
    pg_write_barrier();

    /*
     * There's no need to use pg_atomic_fetch_add_u64 here, because nobody
     * else can be changing this value.  This method avoids taking the bus
     * lock unnecessarily.
     */
    pg_atomic_write_u64(&mq->mq_bytes_written,
                        pg_atomic_read_u64(&mq->mq_bytes_written) + n);
}

/* Shim for on_dsm_detach callback. */
static void
shm_mq_detach_callback(dsm_segment *seg, Datum arg)
{
    shm_mq     *mq = (shm_mq *) DatumGetPointer(arg);

    shm_mq_detach_internal(mq);
}