/*-------------------------------------------------------------------------
 *
 * nodeGather.c
 *	  Support routines for scanning a plan via multiple workers.
 *
 * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * A Gather executor launches parallel workers to run multiple copies of a
 * plan.  It can also run the plan itself, if the workers are not available
 * or have not started up yet.  It then merges all of the results it produces
 * and the results from the workers into a single output stream.  Therefore,
 * it will normally be used with a plan where running multiple copies of the
 * same plan does not produce duplicate output, such as parallel-aware
 * SeqScan.
 *
 * Alternatively, a Gather node can be configured to use just one worker
 * and the single-copy flag can be set.  In this case, the Gather node will
 * run the plan in one worker and will not execute the plan itself.  In
 * this case, it simply returns whatever tuples were returned by the worker.
 * If a worker cannot be obtained, then it will run the plan itself and
 * return the results.  Therefore, a plan used with a single-copy Gather
 * node need not be parallel-aware.
 *
 * IDENTIFICATION
 *	  src/backend/executor/nodeGather.c
 *
 *-------------------------------------------------------------------------
 */
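
/*
 * Illustrative sketch (with a hypothetical table "t"): a typical Gather
 * plan looks like this in EXPLAIN output,
 *
 *		Gather
 *		  Workers Planned: 2
 *		  ->  Parallel Seq Scan on t
 *
 * where each launched worker, and normally the leader as well, runs the
 * parallel-aware scan, and Gather merges the resulting tuple streams.
 */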

#include "postgres.h"

#include "access/relscan.h"
#include "access/xact.h"
#include "executor/execdebug.h"
#include "executor/execParallel.h"
#include "executor/nodeGather.h"
#include "executor/nodeSubplan.h"
#include "executor/tqueue.h"
#include "miscadmin.h"
#include "optimizer/optimizer.h"
#include "pgstat.h"
#include "utils/memutils.h"
#include "utils/rel.h"

static TupleTableSlot *ExecGather(PlanState *pstate);
static TupleTableSlot *gather_getnext(GatherState *gatherstate);
static MinimalTuple gather_readnext(GatherState *gatherstate);
static void ExecShutdownGatherWorkers(GatherState *node);

/* ----------------------------------------------------------------
 *		ExecInitGather
 * ----------------------------------------------------------------
 */
GatherState *
ExecInitGather(Gather *node, EState *estate, int eflags)
{
    GatherState *gatherstate;
    Plan       *outerNode;
    TupleDesc   tupDesc;

    /* Gather node doesn't have innerPlan node. */
    Assert(innerPlan(node) == NULL);

    /*
     * create state structure
     */
    gatherstate = makeNode(GatherState);
    gatherstate->ps.plan = (Plan *) node;
    gatherstate->ps.state = estate;
    gatherstate->ps.ExecProcNode = ExecGather;

    gatherstate->initialized = false;
    gatherstate->need_to_scan_locally =
        !node->single_copy && parallel_leader_participation;
    gatherstate->tuples_needed = -1;

    /*
     * Miscellaneous initialization
     *
     * create expression context for node
     */
    ExecAssignExprContext(estate, &gatherstate->ps);

    /*
     * now initialize outer plan
     */
    outerNode = outerPlan(node);
    outerPlanState(gatherstate) = ExecInitNode(outerNode, estate, eflags);
    tupDesc = ExecGetResultType(outerPlanState(gatherstate));

    /*
     * Leader may access ExecProcNode result directly (if
     * need_to_scan_locally), or from workers via tuple queue.  So we can't
     * trivially rely on the slot type being fixed for expressions evaluated
     * within this node.
     */
    gatherstate->ps.outeropsset = true;
    gatherstate->ps.outeropsfixed = false;

    /*
     * Initialize result type and projection.
     */
    ExecInitResultTypeTL(&gatherstate->ps);
    ExecConditionalAssignProjectionInfo(&gatherstate->ps, tupDesc, OUTER_VAR);

    /*
     * Without projections result slot type is not trivially known, see
     * comment above.
     */
    if (gatherstate->ps.ps_ProjInfo == NULL)
    {
        gatherstate->ps.resultopsset = true;
        gatherstate->ps.resultopsfixed = false;
    }

    /*
     * Initialize funnel slot to same tuple descriptor as outer plan.
     */
    gatherstate->funnel_slot = ExecInitExtraTupleSlot(estate, tupDesc,
                                                      &TTSOpsMinimalTuple);

    /*
     * Gather doesn't support checking a qual (it's always more efficient to
     * do it in the child node).
     */
    Assert(!node->plan.qual);

    return gatherstate;
}

/* ----------------------------------------------------------------
 *		ExecGather(node)
 *
 *		Scans the relation via multiple workers and returns
 *		the next qualifying tuple.
 * ----------------------------------------------------------------
 */
static TupleTableSlot *
ExecGather(PlanState *pstate)
{
    GatherState *node = castNode(GatherState, pstate);
    TupleTableSlot *slot;
    ExprContext *econtext;

    CHECK_FOR_INTERRUPTS();

    /*
     * Initialize the parallel context and workers on first execution. We do
     * this on first execution rather than during node initialization, as it
     * needs to allocate a large dynamic segment, so it is better to do it
     * only if it is really needed.
     */
    if (!node->initialized)
    {
        EState     *estate = node->ps.state;
        Gather     *gather = (Gather *) node->ps.plan;

        /*
         * Sometimes we might have to run without parallelism; but if parallel
         * mode is active then we can try to fire up some workers.
         */
        if (gather->num_workers > 0 && estate->es_use_parallel_mode)
        {
            ParallelContext *pcxt;

            /* Initialize, or re-initialize, shared state needed by workers. */
            if (!node->pei)
                node->pei = ExecInitParallelPlan(outerPlanState(node),
                                                 estate,
                                                 gather->initParam,
                                                 gather->num_workers,
                                                 node->tuples_needed);
            else
                ExecParallelReinitialize(outerPlanState(node),
                                         node->pei,
                                         gather->initParam);

            /*
             * Register backend workers. We might not get as many as we
             * requested, or indeed any at all.
             */
            pcxt = node->pei->pcxt;
            LaunchParallelWorkers(pcxt);
            /* We save # workers launched for the benefit of EXPLAIN */
            node->nworkers_launched = pcxt->nworkers_launched;

            /* Set up tuple queue readers to read the results. */
            if (pcxt->nworkers_launched > 0)
            {
                ExecParallelCreateReaders(node->pei);
                /* Make a working array showing the active readers */
                node->nreaders = pcxt->nworkers_launched;
                node->reader = (TupleQueueReader **)
                    palloc(node->nreaders * sizeof(TupleQueueReader *));
                memcpy(node->reader, node->pei->reader,
                       node->nreaders * sizeof(TupleQueueReader *));
            }
            else
            {
                /* No workers?  Then never mind. */
                node->nreaders = 0;
                node->reader = NULL;
            }
            node->nextreader = 0;
        }

        /* Run plan locally if no workers or enabled and not single-copy. */
        node->need_to_scan_locally = (node->nreaders == 0)
            || (!gather->single_copy && parallel_leader_participation);
        node->initialized = true;
    }

    /*
     * Reset per-tuple memory context to free any expression evaluation
     * storage allocated in the previous tuple cycle.
     */
    econtext = node->ps.ps_ExprContext;
    ResetExprContext(econtext);

    /*
     * Get next tuple, either from one of our workers, or by running the plan
     * ourselves.
     */
    slot = gather_getnext(node);
    if (TupIsNull(slot))
        return NULL;

    /* If no projection is required, we're done. */
    if (node->ps.ps_ProjInfo == NULL)
        return slot;

    /*
     * Form the result tuple using ExecProject(), and return it.
     */
    econtext->ecxt_outertuple = slot;
    return ExecProject(node->ps.ps_ProjInfo);
}

/* ----------------------------------------------------------------
 *		ExecEndGather
 *
 *		frees any storage allocated through C routines.
 * ----------------------------------------------------------------
 */
void
ExecEndGather(GatherState *node)
{
    ExecEndNode(outerPlanState(node));  /* let children clean up first */
    ExecShutdownGather(node);
    ExecFreeExprContext(&node->ps);
    if (node->ps.ps_ResultTupleSlot)
        ExecClearTuple(node->ps.ps_ResultTupleSlot);
}

/*
 * Read the next tuple.  We might fetch a tuple from one of the tuple queues
 * using gather_readnext, or if no tuple queue contains a tuple and the
 * single_copy flag is not set, we might generate one locally instead.
 */
static TupleTableSlot *
gather_getnext(GatherState *gatherstate)
{
    PlanState  *outerPlan = outerPlanState(gatherstate);
    TupleTableSlot *outerTupleSlot;
    TupleTableSlot *fslot = gatherstate->funnel_slot;
    MinimalTuple tup;

    while (gatherstate->nreaders > 0 || gatherstate->need_to_scan_locally)
    {
        CHECK_FOR_INTERRUPTS();

        if (gatherstate->nreaders > 0)
        {
            tup = gather_readnext(gatherstate);

            if (HeapTupleIsValid(tup))
            {
                ExecStoreMinimalTuple(tup,  /* tuple to store */
                                      fslot,    /* slot to store the tuple */
                                      false);   /* don't pfree tuple */
                return fslot;
            }
        }

        if (gatherstate->need_to_scan_locally)
        {
            EState     *estate = gatherstate->ps.state;

            /* Install our DSA area while executing the plan. */
            estate->es_query_dsa =
                gatherstate->pei ? gatherstate->pei->area : NULL;
            outerTupleSlot = ExecProcNode(outerPlan);
            estate->es_query_dsa = NULL;

            if (!TupIsNull(outerTupleSlot))
                return outerTupleSlot;

            gatherstate->need_to_scan_locally = false;
        }
    }

    return ExecClearTuple(fslot);
}
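
/*
 * Illustration of the reader bookkeeping in gather_readnext() below (an
 * explanatory sketch with made-up worker labels A, B and C): with three
 * active readers [A, B, C] and nextreader = 1, we keep fetching from B for
 * as long as it can supply tuples without blocking; only then does
 * nextreader advance, round-robin, to C.  If B's worker is done, nreaders
 * drops to 2, memmove() compacts the array to [A, C], and nextreader
 * (still 1) now points at C.
 */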

/*
 * Attempt to read a tuple from one of our parallel workers.
 */
static MinimalTuple
gather_readnext(GatherState *gatherstate)
{
    int         nvisited = 0;

    for (;;)
    {
        TupleQueueReader *reader;
        MinimalTuple tup;
        bool        readerdone;

        /* Check for async events, particularly messages from workers. */
        CHECK_FOR_INTERRUPTS();

        /*
         * Attempt to read a tuple, but don't block if none is available.
         *
         * Note that TupleQueueReaderNext will just return NULL for a worker
         * which fails to initialize.  We'll treat that worker as having
         * produced no tuples; WaitForParallelWorkersToFinish will error out
         * when we get there.
         */
        Assert(gatherstate->nextreader < gatherstate->nreaders);
        reader = gatherstate->reader[gatherstate->nextreader];
        tup = TupleQueueReaderNext(reader, true, &readerdone);

        /*
         * If this reader is done, remove it from our working array of active
         * readers.  If all readers are done, we're outta here.
         */
        if (readerdone)
        {
            Assert(!tup);
            --gatherstate->nreaders;
            if (gatherstate->nreaders == 0)
            {
                ExecShutdownGatherWorkers(gatherstate);
                return NULL;
            }
            memmove(&gatherstate->reader[gatherstate->nextreader],
                    &gatherstate->reader[gatherstate->nextreader + 1],
                    sizeof(TupleQueueReader *)
                    * (gatherstate->nreaders - gatherstate->nextreader));
            if (gatherstate->nextreader >= gatherstate->nreaders)
                gatherstate->nextreader = 0;
            continue;
        }

        /* If we got a tuple, return it. */
        if (tup)
            return tup;

        /*
         * Advance nextreader pointer in round-robin fashion.  Note that we
         * only reach this code if we weren't able to get a tuple from the
         * current worker.  We used to advance the nextreader pointer after
         * every tuple, but it turns out to be much more efficient to keep
         * reading from the same queue until that would require blocking.
         */
        gatherstate->nextreader++;
        if (gatherstate->nextreader >= gatherstate->nreaders)
            gatherstate->nextreader = 0;

        /* Have we visited every (surviving) TupleQueueReader? */
        nvisited++;
        if (nvisited >= gatherstate->nreaders)
        {
            /*
             * If (still) running plan locally, return NULL so caller can
             * generate another tuple from the local copy of the plan.
             */
            if (gatherstate->need_to_scan_locally)
                return NULL;

            /* Nothing to do except wait for developments. */
            (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
                             WAIT_EVENT_EXECUTE_GATHER);
            ResetLatch(MyLatch);
            nvisited = 0;
        }
    }
}

/* ----------------------------------------------------------------
 *		ExecShutdownGatherWorkers
 *
 *		Stop all the parallel workers.
 * ----------------------------------------------------------------
 */
static void
ExecShutdownGatherWorkers(GatherState *node)
{
    if (node->pei != NULL)
        ExecParallelFinish(node->pei);

    /* Flush local copy of reader array */
    if (node->reader)
        pfree(node->reader);
    node->reader = NULL;
}

/* ----------------------------------------------------------------
 *		ExecShutdownGather
 *
 *		Destroy the setup for parallel workers including parallel context.
 * ----------------------------------------------------------------
 */
void
ExecShutdownGather(GatherState *node)
{
    ExecShutdownGatherWorkers(node);

    /* Now destroy the parallel context. */
    if (node->pei != NULL)
    {
        ExecParallelCleanup(node->pei);
        node->pei = NULL;
    }
}

/* ----------------------------------------------------------------
 *						Join Support
 * ----------------------------------------------------------------
 */
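
/*
 * A Gather node may be rescanned, for instance when it sits inside a
 * subplan that gets re-evaluated or below a plan node that restarts its
 * child.  ExecReScanGather() below shuts down any existing workers and
 * clears "initialized", so the next ExecGather() call sets up a fresh
 * parallel scan.
 */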

/* ----------------------------------------------------------------
 *		ExecReScanGather
 *
 *		Prepare to re-scan the result of a Gather.
 * ----------------------------------------------------------------
 */
void
ExecReScanGather(GatherState *node)
{
    Gather     *gather = (Gather *) node->ps.plan;
    PlanState  *outerPlan = outerPlanState(node);

    /* Make sure any existing workers are gracefully shut down */
    ExecShutdownGatherWorkers(node);

    /* Mark node so that shared state will be rebuilt at next call */
    node->initialized = false;

    /*
     * Set child node's chgParam to tell it that the next scan might deliver a
     * different set of rows within the leader process.  (The overall rowset
     * shouldn't change, but the leader process's subset might; hence nodes
     * between here and the parallel table scan node mustn't optimize on the
     * assumption of an unchanging rowset.)
     */
    if (gather->rescan_param >= 0)
        outerPlan->chgParam = bms_add_member(outerPlan->chgParam,
                                             gather->rescan_param);

    /*
     * If chgParam of subnode is not null then plan will be re-scanned by
     * first ExecProcNode.  Note: because this does nothing if we have a
     * rescan_param, it's currently guaranteed that parallel-aware child nodes
     * will not see a ReScan call until after they get a ReInitializeDSM call.
     * That ordering might not be something to rely on, though.  A good rule
     * of thumb is that ReInitializeDSM should reset only shared state, ReScan
     * should reset only local state, and anything that depends on both of
     * those steps being finished must wait until the first ExecProcNode call.
     */
    if (outerPlan->chgParam == NULL)
        ExecReScan(outerPlan);
}