/*-------------------------------------------------------------------------
 *
 * tqueue.c
 *	  Use shm_mq to send & receive tuples between parallel backends
 *
 * A DestReceiver of type DestTupleQueue, which is a TQueueDestReceiver
 * under the hood, writes tuples from the executor to a shm_mq.
 *
 * A TupleQueueReader reads tuples from a shm_mq and returns the tuples.
 *
 * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *	  src/backend/executor/tqueue.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/htup_details.h"
#include "executor/tqueue.h"
26 * DestReceiver object's private contents
28 * queue is a pointer to data supplied by DestReceiver's caller.
30 typedef struct TQueueDestReceiver
32 DestReceiver pub
; /* public fields */
33 shm_mq_handle
*queue
; /* shm_mq to send to */
37 * TupleQueueReader object's private contents
39 * queue is a pointer to data supplied by reader's caller.
41 * "typedef struct TupleQueueReader TupleQueueReader" is in tqueue.h
43 struct TupleQueueReader
45 shm_mq_handle
*queue
; /* shm_mq to receive from */
49 * Receive a tuple from a query, and send it to the designated shm_mq.
51 * Returns true if successful, false if shm_mq has been detached.
54 tqueueReceiveSlot(TupleTableSlot
*slot
, DestReceiver
*self
)
56 TQueueDestReceiver
*tqueue
= (TQueueDestReceiver
*) self
;
61 /* Send the tuple itself. */
62 tuple
= ExecFetchSlotMinimalTuple(slot
, &should_free
);
63 result
= shm_mq_send(tqueue
->queue
, tuple
->t_len
, tuple
, false, false);
68 /* Check for failure. */
69 if (result
== SHM_MQ_DETACHED
)
71 else if (result
!= SHM_MQ_SUCCESS
)
73 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE
),
74 errmsg("could not send tuple to shared-memory queue")));
80 * Prepare to receive tuples from executor.
83 tqueueStartupReceiver(DestReceiver
*self
, int operation
, TupleDesc typeinfo
)
89 * Clean up at end of an executor run
92 tqueueShutdownReceiver(DestReceiver
*self
)
94 TQueueDestReceiver
*tqueue
= (TQueueDestReceiver
*) self
;
96 if (tqueue
->queue
!= NULL
)
97 shm_mq_detach(tqueue
->queue
);
102 * Destroy receiver when done with it
105 tqueueDestroyReceiver(DestReceiver
*self
)
107 TQueueDestReceiver
*tqueue
= (TQueueDestReceiver
*) self
;
109 /* We probably already detached from queue, but let's be sure */
110 if (tqueue
->queue
!= NULL
)
111 shm_mq_detach(tqueue
->queue
);
116 * Create a DestReceiver that writes tuples to a tuple queue.
119 CreateTupleQueueDestReceiver(shm_mq_handle
*handle
)
121 TQueueDestReceiver
*self
;
123 self
= (TQueueDestReceiver
*) palloc0(sizeof(TQueueDestReceiver
));
125 self
->pub
.receiveSlot
= tqueueReceiveSlot
;
126 self
->pub
.rStartup
= tqueueStartupReceiver
;
127 self
->pub
.rShutdown
= tqueueShutdownReceiver
;
128 self
->pub
.rDestroy
= tqueueDestroyReceiver
;
129 self
->pub
.mydest
= DestTupleQueue
;
130 self
->queue
= handle
;
132 return (DestReceiver
*) self
;
136 * Create a tuple queue reader.
139 CreateTupleQueueReader(shm_mq_handle
*handle
)
141 TupleQueueReader
*reader
= palloc0(sizeof(TupleQueueReader
));
143 reader
->queue
= handle
;
149 * Destroy a tuple queue reader.
151 * Note: cleaning up the underlying shm_mq is the caller's responsibility.
152 * We won't access it here, as it may be detached already.
155 DestroyTupleQueueReader(TupleQueueReader
*reader
)
/*
 * Fetch a tuple from a tuple queue reader.
 *
 * The return value is NULL if there are no remaining tuples or if
 * nowait = true and no tuple is ready to return.  *done, if not NULL,
 * is set to true when there are no remaining tuples and otherwise to false.
 *
 * The returned tuple, if any, is either in shared memory or a private buffer
 * and should not be freed.  The pointer is invalid after the next call to
 * TupleQueueReaderNext().
 *
 * Even when shm_mq_receive() returns SHM_MQ_WOULD_BLOCK, this can still
 * accumulate bytes from a partially-read message, so it's useful to call
 * this with nowait = true even if nothing is returned.
 */
176 TupleQueueReaderNext(TupleQueueReader
*reader
, bool nowait
, bool *done
)
179 shm_mq_result result
;
186 /* Attempt to read a message. */
187 result
= shm_mq_receive(reader
->queue
, &nbytes
, &data
, nowait
);
189 /* If queue is detached, set *done and return NULL. */
190 if (result
== SHM_MQ_DETACHED
)
197 /* In non-blocking mode, bail out if no message ready yet. */
198 if (result
== SHM_MQ_WOULD_BLOCK
)
200 Assert(result
== SHM_MQ_SUCCESS
);
203 * Return a pointer to the queue memory directly (which had better be
204 * sufficiently aligned).
206 tuple
= (MinimalTuple
) data
;
207 Assert(tuple
->t_len
== nbytes
);