Harmonize parameter names in ecpg code.
[pgsql.git] / src / backend / executor / tqueue.c
blob3449b8039cd1436a4a09cf917bad6e41b73edbdb
1 /*-------------------------------------------------------------------------
3 * tqueue.c
4 * Use shm_mq to send & receive tuples between parallel backends
6 * A DestReceiver of type DestTupleQueue, which is a TQueueDestReceiver
7 * under the hood, writes tuples from the executor to a shm_mq.
9 * A TupleQueueReader reads tuples from a shm_mq and returns the tuples.
11 * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
12 * Portions Copyright (c) 1994, Regents of the University of California
14 * IDENTIFICATION
15 * src/backend/executor/tqueue.c
17 *-------------------------------------------------------------------------
20 #include "postgres.h"
22 #include "access/htup_details.h"
23 #include "executor/tqueue.h"
26 * DestReceiver object's private contents
28 * queue is a pointer to data supplied by DestReceiver's caller.
30 typedef struct TQueueDestReceiver
32 DestReceiver pub; /* public fields */
33 shm_mq_handle *queue; /* shm_mq to send to */
34 } TQueueDestReceiver;
37 * TupleQueueReader object's private contents
39 * queue is a pointer to data supplied by reader's caller.
41 * "typedef struct TupleQueueReader TupleQueueReader" is in tqueue.h
43 struct TupleQueueReader
45 shm_mq_handle *queue; /* shm_mq to receive from */
49 * Receive a tuple from a query, and send it to the designated shm_mq.
51 * Returns true if successful, false if shm_mq has been detached.
53 static bool
54 tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
56 TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
57 MinimalTuple tuple;
58 shm_mq_result result;
59 bool should_free;
61 /* Send the tuple itself. */
62 tuple = ExecFetchSlotMinimalTuple(slot, &should_free);
63 result = shm_mq_send(tqueue->queue, tuple->t_len, tuple, false, false);
65 if (should_free)
66 pfree(tuple);
68 /* Check for failure. */
69 if (result == SHM_MQ_DETACHED)
70 return false;
71 else if (result != SHM_MQ_SUCCESS)
72 ereport(ERROR,
73 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
74 errmsg("could not send tuple to shared-memory queue")));
76 return true;
80 * Prepare to receive tuples from executor.
82 static void
83 tqueueStartupReceiver(DestReceiver *self, int operation, TupleDesc typeinfo)
85 /* do nothing */
89 * Clean up at end of an executor run
91 static void
92 tqueueShutdownReceiver(DestReceiver *self)
94 TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
96 if (tqueue->queue != NULL)
97 shm_mq_detach(tqueue->queue);
98 tqueue->queue = NULL;
102 * Destroy receiver when done with it
104 static void
105 tqueueDestroyReceiver(DestReceiver *self)
107 TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
109 /* We probably already detached from queue, but let's be sure */
110 if (tqueue->queue != NULL)
111 shm_mq_detach(tqueue->queue);
112 pfree(self);
116 * Create a DestReceiver that writes tuples to a tuple queue.
118 DestReceiver *
119 CreateTupleQueueDestReceiver(shm_mq_handle *handle)
121 TQueueDestReceiver *self;
123 self = (TQueueDestReceiver *) palloc0(sizeof(TQueueDestReceiver));
125 self->pub.receiveSlot = tqueueReceiveSlot;
126 self->pub.rStartup = tqueueStartupReceiver;
127 self->pub.rShutdown = tqueueShutdownReceiver;
128 self->pub.rDestroy = tqueueDestroyReceiver;
129 self->pub.mydest = DestTupleQueue;
130 self->queue = handle;
132 return (DestReceiver *) self;
136 * Create a tuple queue reader.
138 TupleQueueReader *
139 CreateTupleQueueReader(shm_mq_handle *handle)
141 TupleQueueReader *reader = palloc0(sizeof(TupleQueueReader));
143 reader->queue = handle;
145 return reader;
149 * Destroy a tuple queue reader.
151 * Note: cleaning up the underlying shm_mq is the caller's responsibility.
152 * We won't access it here, as it may be detached already.
154 void
155 DestroyTupleQueueReader(TupleQueueReader *reader)
157 pfree(reader);
161 * Fetch a tuple from a tuple queue reader.
163 * The return value is NULL if there are no remaining tuples or if
164 * nowait = true and no tuple is ready to return. *done, if not NULL,
165 * is set to true when there are no remaining tuples and otherwise to false.
167 * The returned tuple, if any, is either in shared memory or a private buffer
168 * and should not be freed. The pointer is invalid after the next call to
169 * TupleQueueReaderNext().
171 * Even when shm_mq_receive() returns SHM_MQ_WOULD_BLOCK, this can still
172 * accumulate bytes from a partially-read message, so it's useful to call
173 * this with nowait = true even if nothing is returned.
175 MinimalTuple
176 TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done)
178 MinimalTuple tuple;
179 shm_mq_result result;
180 Size nbytes;
181 void *data;
183 if (done != NULL)
184 *done = false;
186 /* Attempt to read a message. */
187 result = shm_mq_receive(reader->queue, &nbytes, &data, nowait);
189 /* If queue is detached, set *done and return NULL. */
190 if (result == SHM_MQ_DETACHED)
192 if (done != NULL)
193 *done = true;
194 return NULL;
197 /* In non-blocking mode, bail out if no message ready yet. */
198 if (result == SHM_MQ_WOULD_BLOCK)
199 return NULL;
200 Assert(result == SHM_MQ_SUCCESS);
203 * Return a pointer to the queue memory directly (which had better be
204 * sufficiently aligned).
206 tuple = (MinimalTuple) data;
207 Assert(tuple->t_len == nbytes);
209 return tuple;