Fix a compiler warning in initStringInfo().
[pgsql.git] / src / backend / executor / nodeAppend.c
blob0bd0e4e54d3f2a40575141130320c9ae17de19a6
/*-------------------------------------------------------------------------
 *
 * nodeAppend.c
 *	  routines to handle append nodes.
 *
 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/backend/executor/nodeAppend.c
 *
 *-------------------------------------------------------------------------
 */
/* INTERFACE ROUTINES
 *		ExecInitAppend	- initialize the append node
 *		ExecAppend		- retrieve the next tuple from the node
 *		ExecEndAppend	- shut down the append node
 *		ExecReScanAppend - rescan the append node
 *
 *	 NOTES
 *		Each append node contains a list of one or more subplans which
 *		must be iteratively processed (forwards or backwards).
 *		Tuples are retrieved by executing the 'whichplan'th subplan
 *		until the subplan stops returning tuples, at which point that
 *		plan is shut down and the next started up.
 *
 *		Append nodes don't make use of their left and right
 *		subtrees, rather they maintain a list of subplans so
 *		a typical append node looks like this in the plan tree:
 *
 *				   ...
 *				   /
 *				Append -------+------+------+--- nil
 *				/	\		  |		 |		|
 *			  nil	nil		 ...    ...    ...
 *								 subplans
 *
 *		Append nodes are currently used for unions, and to support
 *		inheritance queries, where several relations need to be scanned.
 *		For example, in our standard person/student/employee/student-emp
 *		example, where student and employee inherit from person
 *		and student-emp inherits from student and employee, the
 *		query:
 *
 *				select name from person
 *
 *		generates the plan:
 *
 *				  |
 *				Append -------+-------+--------+--------+
 *				/	\		  |		  |		   |		|
 *			  nil	nil		 Scan	 Scan	  Scan	   Scan
 *							  |		  |		   |		|
 *							person employee student student-emp
 */
58 #include "postgres.h"
60 #include "executor/execAsync.h"
61 #include "executor/execPartition.h"
62 #include "executor/executor.h"
63 #include "executor/nodeAppend.h"
64 #include "miscadmin.h"
65 #include "pgstat.h"
66 #include "storage/latch.h"
68 /* Shared state for parallel-aware Append. */
69 struct ParallelAppendState
71 LWLock pa_lock; /* mutual exclusion to choose next subplan */
72 int pa_next_plan; /* next plan to choose by any worker */
75 * pa_finished[i] should be true if no more workers should select subplan
76 * i. for a non-partial plan, this should be set to true as soon as a
77 * worker selects the plan; for a partial plan, it remains false until
78 * some worker executes the plan to completion.
80 bool pa_finished[FLEXIBLE_ARRAY_MEMBER];
/*
 * Sentinel subplan index: "no subplan chosen yet" for as_whichplan, and
 * "nothing left to choose" for the shared pa_next_plan.  Parenthesized so
 * the negative literal can never combine with a neighbouring operator.
 */
#define INVALID_SUBPLAN_INDEX	(-1)
/* Maximum number of wait events fetched by one WaitEventSetWait() call. */
#define EVENT_BUFFER_SIZE		16
86 static TupleTableSlot *ExecAppend(PlanState *pstate);
87 static bool choose_next_subplan_locally(AppendState *node);
88 static bool choose_next_subplan_for_leader(AppendState *node);
89 static bool choose_next_subplan_for_worker(AppendState *node);
90 static void mark_invalid_subplans_as_finished(AppendState *node);
91 static void ExecAppendAsyncBegin(AppendState *node);
92 static bool ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result);
93 static bool ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result);
94 static void ExecAppendAsyncEventWait(AppendState *node);
95 static void classify_matching_subplans(AppendState *node);
97 /* ----------------------------------------------------------------
98 * ExecInitAppend
100 * Begin all of the subscans of the append node.
102 * (This is potentially wasteful, since the entire result of the
103 * append node may not be scanned, but this way all of the
104 * structures get allocated in the executor's top level memory
105 * block instead of that of the call to ExecAppend.)
106 * ----------------------------------------------------------------
108 AppendState *
109 ExecInitAppend(Append *node, EState *estate, int eflags)
111 AppendState *appendstate = makeNode(AppendState);
112 PlanState **appendplanstates;
113 const TupleTableSlotOps *appendops;
114 Bitmapset *validsubplans;
115 Bitmapset *asyncplans;
116 int nplans;
117 int nasyncplans;
118 int firstvalid;
119 int i,
122 /* check for unsupported flags */
123 Assert(!(eflags & EXEC_FLAG_MARK));
126 * create new AppendState for our append node
128 appendstate->ps.plan = (Plan *) node;
129 appendstate->ps.state = estate;
130 appendstate->ps.ExecProcNode = ExecAppend;
132 /* Let choose_next_subplan_* function handle setting the first subplan */
133 appendstate->as_whichplan = INVALID_SUBPLAN_INDEX;
134 appendstate->as_syncdone = false;
135 appendstate->as_begun = false;
137 /* If run-time partition pruning is enabled, then set that up now */
138 if (node->part_prune_info != NULL)
140 PartitionPruneState *prunestate;
143 * Set up pruning data structure. This also initializes the set of
144 * subplans to initialize (validsubplans) by taking into account the
145 * result of performing initial pruning if any.
147 prunestate = ExecInitPartitionPruning(&appendstate->ps,
148 list_length(node->appendplans),
149 node->part_prune_info,
150 &validsubplans);
151 appendstate->as_prune_state = prunestate;
152 nplans = bms_num_members(validsubplans);
155 * When no run-time pruning is required and there's at least one
156 * subplan, we can fill as_valid_subplans immediately, preventing
157 * later calls to ExecFindMatchingSubPlans.
159 if (!prunestate->do_exec_prune && nplans > 0)
161 appendstate->as_valid_subplans = bms_add_range(NULL, 0, nplans - 1);
162 appendstate->as_valid_subplans_identified = true;
165 else
167 nplans = list_length(node->appendplans);
170 * When run-time partition pruning is not enabled we can just mark all
171 * subplans as valid; they must also all be initialized.
173 Assert(nplans > 0);
174 appendstate->as_valid_subplans = validsubplans =
175 bms_add_range(NULL, 0, nplans - 1);
176 appendstate->as_valid_subplans_identified = true;
177 appendstate->as_prune_state = NULL;
180 appendplanstates = (PlanState **) palloc(nplans *
181 sizeof(PlanState *));
184 * call ExecInitNode on each of the valid plans to be executed and save
185 * the results into the appendplanstates array.
187 * While at it, find out the first valid partial plan.
189 j = 0;
190 asyncplans = NULL;
191 nasyncplans = 0;
192 firstvalid = nplans;
193 i = -1;
194 while ((i = bms_next_member(validsubplans, i)) >= 0)
196 Plan *initNode = (Plan *) list_nth(node->appendplans, i);
199 * Record async subplans. When executing EvalPlanQual, we treat them
200 * as sync ones; don't do this when initializing an EvalPlanQual plan
201 * tree.
203 if (initNode->async_capable && estate->es_epq_active == NULL)
205 asyncplans = bms_add_member(asyncplans, j);
206 nasyncplans++;
210 * Record the lowest appendplans index which is a valid partial plan.
212 if (i >= node->first_partial_plan && j < firstvalid)
213 firstvalid = j;
215 appendplanstates[j++] = ExecInitNode(initNode, estate, eflags);
218 appendstate->as_first_partial_plan = firstvalid;
219 appendstate->appendplans = appendplanstates;
220 appendstate->as_nplans = nplans;
223 * Initialize Append's result tuple type and slot. If the child plans all
224 * produce the same fixed slot type, we can use that slot type; otherwise
225 * make a virtual slot. (Note that the result slot itself is used only to
226 * return a null tuple at end of execution; real tuples are returned to
227 * the caller in the children's own result slots. What we are doing here
228 * is allowing the parent plan node to optimize if the Append will return
229 * only one kind of slot.)
231 appendops = ExecGetCommonSlotOps(appendplanstates, j);
232 if (appendops != NULL)
234 ExecInitResultTupleSlotTL(&appendstate->ps, appendops);
236 else
238 ExecInitResultTupleSlotTL(&appendstate->ps, &TTSOpsVirtual);
239 /* show that the output slot type is not fixed */
240 appendstate->ps.resultopsset = true;
241 appendstate->ps.resultopsfixed = false;
244 /* Initialize async state */
245 appendstate->as_asyncplans = asyncplans;
246 appendstate->as_nasyncplans = nasyncplans;
247 appendstate->as_asyncrequests = NULL;
248 appendstate->as_asyncresults = NULL;
249 appendstate->as_nasyncresults = 0;
250 appendstate->as_nasyncremain = 0;
251 appendstate->as_needrequest = NULL;
252 appendstate->as_eventset = NULL;
253 appendstate->as_valid_asyncplans = NULL;
255 if (nasyncplans > 0)
257 appendstate->as_asyncrequests = (AsyncRequest **)
258 palloc0(nplans * sizeof(AsyncRequest *));
260 i = -1;
261 while ((i = bms_next_member(asyncplans, i)) >= 0)
263 AsyncRequest *areq;
265 areq = palloc(sizeof(AsyncRequest));
266 areq->requestor = (PlanState *) appendstate;
267 areq->requestee = appendplanstates[i];
268 areq->request_index = i;
269 areq->callback_pending = false;
270 areq->request_complete = false;
271 areq->result = NULL;
273 appendstate->as_asyncrequests[i] = areq;
276 appendstate->as_asyncresults = (TupleTableSlot **)
277 palloc0(nasyncplans * sizeof(TupleTableSlot *));
279 if (appendstate->as_valid_subplans_identified)
280 classify_matching_subplans(appendstate);
284 * Miscellaneous initialization
287 appendstate->ps.ps_ProjInfo = NULL;
289 /* For parallel query, this will be overridden later. */
290 appendstate->choose_next_subplan = choose_next_subplan_locally;
292 return appendstate;
295 /* ----------------------------------------------------------------
296 * ExecAppend
298 * Handles iteration over multiple subplans.
299 * ----------------------------------------------------------------
301 static TupleTableSlot *
302 ExecAppend(PlanState *pstate)
304 AppendState *node = castNode(AppendState, pstate);
305 TupleTableSlot *result;
308 * If this is the first call after Init or ReScan, we need to do the
309 * initialization work.
311 if (!node->as_begun)
313 Assert(node->as_whichplan == INVALID_SUBPLAN_INDEX);
314 Assert(!node->as_syncdone);
316 /* Nothing to do if there are no subplans */
317 if (node->as_nplans == 0)
318 return ExecClearTuple(node->ps.ps_ResultTupleSlot);
320 /* If there are any async subplans, begin executing them. */
321 if (node->as_nasyncplans > 0)
322 ExecAppendAsyncBegin(node);
325 * If no sync subplan has been chosen, we must choose one before
326 * proceeding.
328 if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
329 return ExecClearTuple(node->ps.ps_ResultTupleSlot);
331 Assert(node->as_syncdone ||
332 (node->as_whichplan >= 0 &&
333 node->as_whichplan < node->as_nplans));
335 /* And we're initialized. */
336 node->as_begun = true;
339 for (;;)
341 PlanState *subnode;
343 CHECK_FOR_INTERRUPTS();
346 * try to get a tuple from an async subplan if any
348 if (node->as_syncdone || !bms_is_empty(node->as_needrequest))
350 if (ExecAppendAsyncGetNext(node, &result))
351 return result;
352 Assert(!node->as_syncdone);
353 Assert(bms_is_empty(node->as_needrequest));
357 * figure out which sync subplan we are currently processing
359 Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as_nplans);
360 subnode = node->appendplans[node->as_whichplan];
363 * get a tuple from the subplan
365 result = ExecProcNode(subnode);
367 if (!TupIsNull(result))
370 * If the subplan gave us something then return it as-is. We do
371 * NOT make use of the result slot that was set up in
372 * ExecInitAppend; there's no need for it.
374 return result;
378 * wait or poll for async events if any. We do this before checking
379 * for the end of iteration, because it might drain the remaining
380 * async subplans.
382 if (node->as_nasyncremain > 0)
383 ExecAppendAsyncEventWait(node);
385 /* choose new sync subplan; if no sync/async subplans, we're done */
386 if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
387 return ExecClearTuple(node->ps.ps_ResultTupleSlot);
391 /* ----------------------------------------------------------------
392 * ExecEndAppend
394 * Shuts down the subscans of the append node.
396 * Returns nothing of interest.
397 * ----------------------------------------------------------------
399 void
400 ExecEndAppend(AppendState *node)
402 PlanState **appendplans;
403 int nplans;
404 int i;
407 * get information from the node
409 appendplans = node->appendplans;
410 nplans = node->as_nplans;
413 * shut down each of the subscans
415 for (i = 0; i < nplans; i++)
416 ExecEndNode(appendplans[i]);
419 void
420 ExecReScanAppend(AppendState *node)
422 int nasyncplans = node->as_nasyncplans;
423 int i;
426 * If any PARAM_EXEC Params used in pruning expressions have changed, then
427 * we'd better unset the valid subplans so that they are reselected for
428 * the new parameter values.
430 if (node->as_prune_state &&
431 bms_overlap(node->ps.chgParam,
432 node->as_prune_state->execparamids))
434 node->as_valid_subplans_identified = false;
435 bms_free(node->as_valid_subplans);
436 node->as_valid_subplans = NULL;
437 bms_free(node->as_valid_asyncplans);
438 node->as_valid_asyncplans = NULL;
441 for (i = 0; i < node->as_nplans; i++)
443 PlanState *subnode = node->appendplans[i];
446 * ExecReScan doesn't know about my subplans, so I have to do
447 * changed-parameter signaling myself.
449 if (node->ps.chgParam != NULL)
450 UpdateChangedParamSet(subnode, node->ps.chgParam);
453 * If chgParam of subnode is not null then plan will be re-scanned by
454 * first ExecProcNode or by first ExecAsyncRequest.
456 if (subnode->chgParam == NULL)
457 ExecReScan(subnode);
460 /* Reset async state */
461 if (nasyncplans > 0)
463 i = -1;
464 while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
466 AsyncRequest *areq = node->as_asyncrequests[i];
468 areq->callback_pending = false;
469 areq->request_complete = false;
470 areq->result = NULL;
473 node->as_nasyncresults = 0;
474 node->as_nasyncremain = 0;
475 bms_free(node->as_needrequest);
476 node->as_needrequest = NULL;
479 /* Let choose_next_subplan_* function handle setting the first subplan */
480 node->as_whichplan = INVALID_SUBPLAN_INDEX;
481 node->as_syncdone = false;
482 node->as_begun = false;
485 /* ----------------------------------------------------------------
486 * Parallel Append Support
487 * ----------------------------------------------------------------
490 /* ----------------------------------------------------------------
491 * ExecAppendEstimate
493 * Compute the amount of space we'll need in the parallel
494 * query DSM, and inform pcxt->estimator about our needs.
495 * ----------------------------------------------------------------
497 void
498 ExecAppendEstimate(AppendState *node,
499 ParallelContext *pcxt)
501 node->pstate_len =
502 add_size(offsetof(ParallelAppendState, pa_finished),
503 sizeof(bool) * node->as_nplans);
505 shm_toc_estimate_chunk(&pcxt->estimator, node->pstate_len);
506 shm_toc_estimate_keys(&pcxt->estimator, 1);
510 /* ----------------------------------------------------------------
511 * ExecAppendInitializeDSM
513 * Set up shared state for Parallel Append.
514 * ----------------------------------------------------------------
516 void
517 ExecAppendInitializeDSM(AppendState *node,
518 ParallelContext *pcxt)
520 ParallelAppendState *pstate;
522 pstate = shm_toc_allocate(pcxt->toc, node->pstate_len);
523 memset(pstate, 0, node->pstate_len);
524 LWLockInitialize(&pstate->pa_lock, LWTRANCHE_PARALLEL_APPEND);
525 shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id, pstate);
527 node->as_pstate = pstate;
528 node->choose_next_subplan = choose_next_subplan_for_leader;
531 /* ----------------------------------------------------------------
532 * ExecAppendReInitializeDSM
534 * Reset shared state before beginning a fresh scan.
535 * ----------------------------------------------------------------
537 void
538 ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt)
540 ParallelAppendState *pstate = node->as_pstate;
542 pstate->pa_next_plan = 0;
543 memset(pstate->pa_finished, 0, sizeof(bool) * node->as_nplans);
546 /* ----------------------------------------------------------------
547 * ExecAppendInitializeWorker
549 * Copy relevant information from TOC into planstate, and initialize
550 * whatever is required to choose and execute the optimal subplan.
551 * ----------------------------------------------------------------
553 void
554 ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt)
556 node->as_pstate = shm_toc_lookup(pwcxt->toc, node->ps.plan->plan_node_id, false);
557 node->choose_next_subplan = choose_next_subplan_for_worker;
560 /* ----------------------------------------------------------------
561 * choose_next_subplan_locally
563 * Choose next sync subplan for a non-parallel-aware Append,
564 * returning false if there are no more.
565 * ----------------------------------------------------------------
567 static bool
568 choose_next_subplan_locally(AppendState *node)
570 int whichplan = node->as_whichplan;
571 int nextplan;
573 /* We should never be called when there are no subplans */
574 Assert(node->as_nplans > 0);
576 /* Nothing to do if syncdone */
577 if (node->as_syncdone)
578 return false;
581 * If first call then have the bms member function choose the first valid
582 * sync subplan by initializing whichplan to -1. If there happen to be no
583 * valid sync subplans then the bms member function will handle that by
584 * returning a negative number which will allow us to exit returning a
585 * false value.
587 if (whichplan == INVALID_SUBPLAN_INDEX)
589 if (node->as_nasyncplans > 0)
591 /* We'd have filled as_valid_subplans already */
592 Assert(node->as_valid_subplans_identified);
594 else if (!node->as_valid_subplans_identified)
596 node->as_valid_subplans =
597 ExecFindMatchingSubPlans(node->as_prune_state, false);
598 node->as_valid_subplans_identified = true;
601 whichplan = -1;
604 /* Ensure whichplan is within the expected range */
605 Assert(whichplan >= -1 && whichplan <= node->as_nplans);
607 if (ScanDirectionIsForward(node->ps.state->es_direction))
608 nextplan = bms_next_member(node->as_valid_subplans, whichplan);
609 else
610 nextplan = bms_prev_member(node->as_valid_subplans, whichplan);
612 if (nextplan < 0)
614 /* Set as_syncdone if in async mode */
615 if (node->as_nasyncplans > 0)
616 node->as_syncdone = true;
617 return false;
620 node->as_whichplan = nextplan;
622 return true;
625 /* ----------------------------------------------------------------
626 * choose_next_subplan_for_leader
628 * Try to pick a plan which doesn't commit us to doing much
629 * work locally, so that as much work as possible is done in
630 * the workers. Cheapest subplans are at the end.
631 * ----------------------------------------------------------------
633 static bool
634 choose_next_subplan_for_leader(AppendState *node)
636 ParallelAppendState *pstate = node->as_pstate;
638 /* Backward scan is not supported by parallel-aware plans */
639 Assert(ScanDirectionIsForward(node->ps.state->es_direction));
641 /* We should never be called when there are no subplans */
642 Assert(node->as_nplans > 0);
644 LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);
646 if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
648 /* Mark just-completed subplan as finished. */
649 node->as_pstate->pa_finished[node->as_whichplan] = true;
651 else
653 /* Start with last subplan. */
654 node->as_whichplan = node->as_nplans - 1;
657 * If we've yet to determine the valid subplans then do so now. If
658 * run-time pruning is disabled then the valid subplans will always be
659 * set to all subplans.
661 if (!node->as_valid_subplans_identified)
663 node->as_valid_subplans =
664 ExecFindMatchingSubPlans(node->as_prune_state, false);
665 node->as_valid_subplans_identified = true;
668 * Mark each invalid plan as finished to allow the loop below to
669 * select the first valid subplan.
671 mark_invalid_subplans_as_finished(node);
675 /* Loop until we find a subplan to execute. */
676 while (pstate->pa_finished[node->as_whichplan])
678 if (node->as_whichplan == 0)
680 pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
681 node->as_whichplan = INVALID_SUBPLAN_INDEX;
682 LWLockRelease(&pstate->pa_lock);
683 return false;
687 * We needn't pay attention to as_valid_subplans here as all invalid
688 * plans have been marked as finished.
690 node->as_whichplan--;
693 /* If non-partial, immediately mark as finished. */
694 if (node->as_whichplan < node->as_first_partial_plan)
695 node->as_pstate->pa_finished[node->as_whichplan] = true;
697 LWLockRelease(&pstate->pa_lock);
699 return true;
702 /* ----------------------------------------------------------------
703 * choose_next_subplan_for_worker
705 * Choose next subplan for a parallel-aware Append, returning
706 * false if there are no more.
708 * We start from the first plan and advance through the list;
709 * when we get back to the end, we loop back to the first
710 * partial plan. This assigns the non-partial plans first in
711 * order of descending cost and then spreads out the workers
712 * as evenly as possible across the remaining partial plans.
713 * ----------------------------------------------------------------
715 static bool
716 choose_next_subplan_for_worker(AppendState *node)
718 ParallelAppendState *pstate = node->as_pstate;
720 /* Backward scan is not supported by parallel-aware plans */
721 Assert(ScanDirectionIsForward(node->ps.state->es_direction));
723 /* We should never be called when there are no subplans */
724 Assert(node->as_nplans > 0);
726 LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);
728 /* Mark just-completed subplan as finished. */
729 if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
730 node->as_pstate->pa_finished[node->as_whichplan] = true;
733 * If we've yet to determine the valid subplans then do so now. If
734 * run-time pruning is disabled then the valid subplans will always be set
735 * to all subplans.
737 else if (!node->as_valid_subplans_identified)
739 node->as_valid_subplans =
740 ExecFindMatchingSubPlans(node->as_prune_state, false);
741 node->as_valid_subplans_identified = true;
743 mark_invalid_subplans_as_finished(node);
746 /* If all the plans are already done, we have nothing to do */
747 if (pstate->pa_next_plan == INVALID_SUBPLAN_INDEX)
749 LWLockRelease(&pstate->pa_lock);
750 return false;
753 /* Save the plan from which we are starting the search. */
754 node->as_whichplan = pstate->pa_next_plan;
756 /* Loop until we find a valid subplan to execute. */
757 while (pstate->pa_finished[pstate->pa_next_plan])
759 int nextplan;
761 nextplan = bms_next_member(node->as_valid_subplans,
762 pstate->pa_next_plan);
763 if (nextplan >= 0)
765 /* Advance to the next valid plan. */
766 pstate->pa_next_plan = nextplan;
768 else if (node->as_whichplan > node->as_first_partial_plan)
771 * Try looping back to the first valid partial plan, if there is
772 * one. If there isn't, arrange to bail out below.
774 nextplan = bms_next_member(node->as_valid_subplans,
775 node->as_first_partial_plan - 1);
776 pstate->pa_next_plan =
777 nextplan < 0 ? node->as_whichplan : nextplan;
779 else
782 * At last plan, and either there are no partial plans or we've
783 * tried them all. Arrange to bail out.
785 pstate->pa_next_plan = node->as_whichplan;
788 if (pstate->pa_next_plan == node->as_whichplan)
790 /* We've tried everything! */
791 pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
792 LWLockRelease(&pstate->pa_lock);
793 return false;
797 /* Pick the plan we found, and advance pa_next_plan one more time. */
798 node->as_whichplan = pstate->pa_next_plan;
799 pstate->pa_next_plan = bms_next_member(node->as_valid_subplans,
800 pstate->pa_next_plan);
803 * If there are no more valid plans then try setting the next plan to the
804 * first valid partial plan.
806 if (pstate->pa_next_plan < 0)
808 int nextplan = bms_next_member(node->as_valid_subplans,
809 node->as_first_partial_plan - 1);
811 if (nextplan >= 0)
812 pstate->pa_next_plan = nextplan;
813 else
816 * There are no valid partial plans, and we already chose the last
817 * non-partial plan; so flag that there's nothing more for our
818 * fellow workers to do.
820 pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
824 /* If non-partial, immediately mark as finished. */
825 if (node->as_whichplan < node->as_first_partial_plan)
826 node->as_pstate->pa_finished[node->as_whichplan] = true;
828 LWLockRelease(&pstate->pa_lock);
830 return true;
834 * mark_invalid_subplans_as_finished
835 * Marks the ParallelAppendState's pa_finished as true for each invalid
836 * subplan.
838 * This function should only be called for parallel Append with run-time
839 * pruning enabled.
841 static void
842 mark_invalid_subplans_as_finished(AppendState *node)
844 int i;
846 /* Only valid to call this while in parallel Append mode */
847 Assert(node->as_pstate);
849 /* Shouldn't have been called when run-time pruning is not enabled */
850 Assert(node->as_prune_state);
852 /* Nothing to do if all plans are valid */
853 if (bms_num_members(node->as_valid_subplans) == node->as_nplans)
854 return;
856 /* Mark all non-valid plans as finished */
857 for (i = 0; i < node->as_nplans; i++)
859 if (!bms_is_member(i, node->as_valid_subplans))
860 node->as_pstate->pa_finished[i] = true;
864 /* ----------------------------------------------------------------
865 * Asynchronous Append Support
866 * ----------------------------------------------------------------
869 /* ----------------------------------------------------------------
870 * ExecAppendAsyncBegin
872 * Begin executing designed async-capable subplans.
873 * ----------------------------------------------------------------
875 static void
876 ExecAppendAsyncBegin(AppendState *node)
878 int i;
880 /* Backward scan is not supported by async-aware Appends. */
881 Assert(ScanDirectionIsForward(node->ps.state->es_direction));
883 /* We should never be called when there are no subplans */
884 Assert(node->as_nplans > 0);
886 /* We should never be called when there are no async subplans. */
887 Assert(node->as_nasyncplans > 0);
889 /* If we've yet to determine the valid subplans then do so now. */
890 if (!node->as_valid_subplans_identified)
892 node->as_valid_subplans =
893 ExecFindMatchingSubPlans(node->as_prune_state, false);
894 node->as_valid_subplans_identified = true;
896 classify_matching_subplans(node);
899 /* Initialize state variables. */
900 node->as_syncdone = bms_is_empty(node->as_valid_subplans);
901 node->as_nasyncremain = bms_num_members(node->as_valid_asyncplans);
903 /* Nothing to do if there are no valid async subplans. */
904 if (node->as_nasyncremain == 0)
905 return;
907 /* Make a request for each of the valid async subplans. */
908 i = -1;
909 while ((i = bms_next_member(node->as_valid_asyncplans, i)) >= 0)
911 AsyncRequest *areq = node->as_asyncrequests[i];
913 Assert(areq->request_index == i);
914 Assert(!areq->callback_pending);
916 /* Do the actual work. */
917 ExecAsyncRequest(areq);
921 /* ----------------------------------------------------------------
922 * ExecAppendAsyncGetNext
924 * Get the next tuple from any of the asynchronous subplans.
925 * ----------------------------------------------------------------
927 static bool
928 ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result)
930 *result = NULL;
932 /* We should never be called when there are no valid async subplans. */
933 Assert(node->as_nasyncremain > 0);
935 /* Request a tuple asynchronously. */
936 if (ExecAppendAsyncRequest(node, result))
937 return true;
939 while (node->as_nasyncremain > 0)
941 CHECK_FOR_INTERRUPTS();
943 /* Wait or poll for async events. */
944 ExecAppendAsyncEventWait(node);
946 /* Request a tuple asynchronously. */
947 if (ExecAppendAsyncRequest(node, result))
948 return true;
950 /* Break from loop if there's any sync subplan that isn't complete. */
951 if (!node->as_syncdone)
952 break;
956 * If all sync subplans are complete, we're totally done scanning the
957 * given node. Otherwise, we're done with the asynchronous stuff but must
958 * continue scanning the sync subplans.
960 if (node->as_syncdone)
962 Assert(node->as_nasyncremain == 0);
963 *result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
964 return true;
967 return false;
970 /* ----------------------------------------------------------------
971 * ExecAppendAsyncRequest
973 * Request a tuple asynchronously.
974 * ----------------------------------------------------------------
976 static bool
977 ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result)
979 Bitmapset *needrequest;
980 int i;
982 /* Nothing to do if there are no async subplans needing a new request. */
983 if (bms_is_empty(node->as_needrequest))
985 Assert(node->as_nasyncresults == 0);
986 return false;
990 * If there are any asynchronously-generated results that have not yet
991 * been returned, we have nothing to do; just return one of them.
993 if (node->as_nasyncresults > 0)
995 --node->as_nasyncresults;
996 *result = node->as_asyncresults[node->as_nasyncresults];
997 return true;
1000 /* Make a new request for each of the async subplans that need it. */
1001 needrequest = node->as_needrequest;
1002 node->as_needrequest = NULL;
1003 i = -1;
1004 while ((i = bms_next_member(needrequest, i)) >= 0)
1006 AsyncRequest *areq = node->as_asyncrequests[i];
1008 /* Do the actual work. */
1009 ExecAsyncRequest(areq);
1011 bms_free(needrequest);
1013 /* Return one of the asynchronously-generated results if any. */
1014 if (node->as_nasyncresults > 0)
1016 --node->as_nasyncresults;
1017 *result = node->as_asyncresults[node->as_nasyncresults];
1018 return true;
1021 return false;
1024 /* ----------------------------------------------------------------
1025 * ExecAppendAsyncEventWait
1027 * Wait or poll for file descriptor events and fire callbacks.
1028 * ----------------------------------------------------------------
1030 static void
1031 ExecAppendAsyncEventWait(AppendState *node)
1033 int nevents = node->as_nasyncplans + 1;
1034 long timeout = node->as_syncdone ? -1 : 0;
1035 WaitEvent occurred_event[EVENT_BUFFER_SIZE];
1036 int noccurred;
1037 int i;
1039 /* We should never be called when there are no valid async subplans. */
1040 Assert(node->as_nasyncremain > 0);
1042 Assert(node->as_eventset == NULL);
1043 node->as_eventset = CreateWaitEventSet(CurrentResourceOwner, nevents);
1044 AddWaitEventToSet(node->as_eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
1045 NULL, NULL);
1047 /* Give each waiting subplan a chance to add an event. */
1048 i = -1;
1049 while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
1051 AsyncRequest *areq = node->as_asyncrequests[i];
1053 if (areq->callback_pending)
1054 ExecAsyncConfigureWait(areq);
1058 * No need for further processing if there are no configured events other
1059 * than the postmaster death event.
1061 if (GetNumRegisteredWaitEvents(node->as_eventset) == 1)
1063 FreeWaitEventSet(node->as_eventset);
1064 node->as_eventset = NULL;
1065 return;
1068 /* Return at most EVENT_BUFFER_SIZE events in one call. */
1069 if (nevents > EVENT_BUFFER_SIZE)
1070 nevents = EVENT_BUFFER_SIZE;
1073 * If the timeout is -1, wait until at least one event occurs. If the
1074 * timeout is 0, poll for events, but do not wait at all.
1076 noccurred = WaitEventSetWait(node->as_eventset, timeout, occurred_event,
1077 nevents, WAIT_EVENT_APPEND_READY);
1078 FreeWaitEventSet(node->as_eventset);
1079 node->as_eventset = NULL;
1080 if (noccurred == 0)
1081 return;
1083 /* Deliver notifications. */
1084 for (i = 0; i < noccurred; i++)
1086 WaitEvent *w = &occurred_event[i];
1089 * Each waiting subplan should have registered its wait event with
1090 * user_data pointing back to its AsyncRequest.
1092 if ((w->events & WL_SOCKET_READABLE) != 0)
1094 AsyncRequest *areq = (AsyncRequest *) w->user_data;
1096 if (areq->callback_pending)
1099 * Mark it as no longer needing a callback. We must do this
1100 * before dispatching the callback in case the callback resets
1101 * the flag.
1103 areq->callback_pending = false;
1105 /* Do the actual work. */
1106 ExecAsyncNotify(areq);
1112 /* ----------------------------------------------------------------
1113 * ExecAsyncAppendResponse
1115 * Receive a response from an asynchronous request we made.
1116 * ----------------------------------------------------------------
1118 void
1119 ExecAsyncAppendResponse(AsyncRequest *areq)
1121 AppendState *node = (AppendState *) areq->requestor;
1122 TupleTableSlot *slot = areq->result;
1124 /* The result should be a TupleTableSlot or NULL. */
1125 Assert(slot == NULL || IsA(slot, TupleTableSlot));
1127 /* Nothing to do if the request is pending. */
1128 if (!areq->request_complete)
1130 /* The request would have been pending for a callback. */
1131 Assert(areq->callback_pending);
1132 return;
1135 /* If the result is NULL or an empty slot, there's nothing more to do. */
1136 if (TupIsNull(slot))
1138 /* The ending subplan wouldn't have been pending for a callback. */
1139 Assert(!areq->callback_pending);
1140 --node->as_nasyncremain;
1141 return;
1144 /* Save result so we can return it. */
1145 Assert(node->as_nasyncresults < node->as_nasyncplans);
1146 node->as_asyncresults[node->as_nasyncresults++] = slot;
1149 * Mark the subplan that returned a result as ready for a new request. We
1150 * don't launch another one here immediately because it might complete.
1152 node->as_needrequest = bms_add_member(node->as_needrequest,
1153 areq->request_index);
1156 /* ----------------------------------------------------------------
1157 * classify_matching_subplans
1159 * Classify the node's as_valid_subplans into sync ones and
1160 * async ones, adjust it to contain sync ones only, and save
1161 * async ones in the node's as_valid_asyncplans.
1162 * ----------------------------------------------------------------
1164 static void
1165 classify_matching_subplans(AppendState *node)
1167 Bitmapset *valid_asyncplans;
1169 Assert(node->as_valid_subplans_identified);
1170 Assert(node->as_valid_asyncplans == NULL);
1172 /* Nothing to do if there are no valid subplans. */
1173 if (bms_is_empty(node->as_valid_subplans))
1175 node->as_syncdone = true;
1176 node->as_nasyncremain = 0;
1177 return;
1180 /* Nothing to do if there are no valid async subplans. */
1181 if (!bms_overlap(node->as_valid_subplans, node->as_asyncplans))
1183 node->as_nasyncremain = 0;
1184 return;
1187 /* Get valid async subplans. */
1188 valid_asyncplans = bms_intersect(node->as_asyncplans,
1189 node->as_valid_subplans);
1191 /* Adjust the valid subplans to contain sync subplans only. */
1192 node->as_valid_subplans = bms_del_members(node->as_valid_subplans,
1193 valid_asyncplans);
1195 /* Save valid async subplans. */
1196 node->as_valid_asyncplans = valid_asyncplans;