/*-------------------------------------------------------------------------
 *
 * nodeAppend.c
 *	  routines to handle append nodes.
 *
 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/backend/executor/nodeAppend.c
 *
 *-------------------------------------------------------------------------
 */
/* INTERFACE ROUTINES
 *		ExecInitAppend	- initialize the append node
 *		ExecAppend		- retrieve the next tuple from the node
 *		ExecEndAppend	- shut down the append node
 *		ExecReScanAppend - rescan the append node
 *
 *	 NOTES
 *		Each append node contains a list of one or more subplans which
 *		must be iteratively processed (forwards or backwards).
 *		Tuples are retrieved by executing the 'whichplan'th subplan
 *		until the subplan stops returning tuples, at which point that
 *		plan is shut down and the next started up.
 *
 *		Append nodes don't make use of their left and right
 *		subtrees, rather they maintain a list of subplans so
 *		a typical append node looks like this in the plan tree:
 *
 *				   ...
 *				   /
 *				Append -------+------+------+--- nil
 *				/	\		  |		 |		|
 *			  nil	nil		 ...	...    ...
 *								 subplans
 *
 *		Append nodes are currently used for unions, and to support
 *		inheritance queries, where several relations need to be scanned.
 *		For example, in our standard person/student/employee/student-emp
 *		example, where student and employee inherit from person
 *		and student-emp inherits from student and employee, the
 *		query:
 *
 *				select name from person
 *
 *		generates the plan:
 *
 *				  |
 *				Append -------+-------+--------+--------+
 *				/	\		  |		  |		   |		|
 *			  nil	nil		 Scan	 Scan	  Scan	   Scan
 *							  |		  |		   |		|
 *							person employee student student-emp
 */
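/*
 *		A present-day example (illustrative sketch, with a hypothetical
 *		partitioned table "pt"): scanning pt yields an Append with one
 *		subplan per unpruned partition, e.g.
 *
 *				Append
 *				  ->  Seq Scan on pt_p1
 *				  ->  Seq Scan on pt_p2
 */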
#include "postgres.h"

#include "executor/execAsync.h"
#include "executor/execPartition.h"
#include "executor/executor.h"
#include "executor/nodeAppend.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "storage/latch.h"
/* Shared state for parallel-aware Append. */
struct ParallelAppendState
{
	LWLock		pa_lock;		/* mutual exclusion to choose next subplan */
	int			pa_next_plan;	/* next plan to choose by any worker */

	/*
	 * pa_finished[i] should be true if no more workers should select subplan
	 * i.  For a non-partial plan, this should be set to true as soon as a
	 * worker selects the plan; for a partial plan, it remains false until
	 * some worker executes the plan to completion.
	 */
	bool		pa_finished[FLEXIBLE_ARRAY_MEMBER];
};
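/*
 * Example: with three subplans where only subplan 2 is partial
 * (first_partial_plan == 2), a worker that selects subplan 0 or 1 sets its
 * pa_finished entry at once, so no second worker ever runs that subplan;
 * pa_finished[2] stays false while any number of workers cooperate on the
 * partial plan, and becomes true only when some worker executes it to
 * completion.
 */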
#define INVALID_SUBPLAN_INDEX		-1
#define EVENT_BUFFER_SIZE			16
static TupleTableSlot *ExecAppend(PlanState *pstate);
static bool choose_next_subplan_locally(AppendState *node);
static bool choose_next_subplan_for_leader(AppendState *node);
static bool choose_next_subplan_for_worker(AppendState *node);
static void mark_invalid_subplans_as_finished(AppendState *node);
static void ExecAppendAsyncBegin(AppendState *node);
static bool ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result);
static bool ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result);
static void ExecAppendAsyncEventWait(AppendState *node);
static void classify_matching_subplans(AppendState *node);
/* ----------------------------------------------------------------
 *		ExecInitAppend
 *
 *		Begin all of the subscans of the append node.
 *
 *	   (This is potentially wasteful, since the entire result of the
 *		append node may not be scanned, but this way all of the
 *		structures get allocated in the executor's top level memory
 *		block instead of that of the call to ExecAppend.)
 * ----------------------------------------------------------------
 */
AppendState *
ExecInitAppend(Append *node, EState *estate, int eflags)
{
	AppendState *appendstate = makeNode(AppendState);
	PlanState **appendplanstates;
	const TupleTableSlotOps *appendops;
	Bitmapset  *validsubplans;
	Bitmapset  *asyncplans;
	int			nplans;
	int			nasyncplans;
	int			firstvalid;
	int			i,
				j;

	/* check for unsupported flags */
	Assert(!(eflags & EXEC_FLAG_MARK));

	/*
	 * create new AppendState for our append node
	 */
	appendstate->ps.plan = (Plan *) node;
	appendstate->ps.state = estate;
	appendstate->ps.ExecProcNode = ExecAppend;

	/* Let choose_next_subplan_* function handle setting the first subplan */
	appendstate->as_whichplan = INVALID_SUBPLAN_INDEX;
	appendstate->as_syncdone = false;
	appendstate->as_begun = false;
	/* If run-time partition pruning is enabled, then set that up now */
	if (node->part_prune_index >= 0)
	{
		PartitionPruneState *prunestate;

		/*
		 * Set up pruning data structure.  This also initializes the set of
		 * subplans to initialize (validsubplans) by taking into account the
		 * result of performing initial pruning if any.
		 */
		prunestate = ExecInitPartitionExecPruning(&appendstate->ps,
												  list_length(node->appendplans),
												  node->part_prune_index,
												  node->apprelids,
												  &validsubplans);
		appendstate->as_prune_state = prunestate;
		nplans = bms_num_members(validsubplans);

		/*
		 * When no run-time pruning is required and there's at least one
		 * subplan, we can fill as_valid_subplans immediately, preventing
		 * later calls to ExecFindMatchingSubPlans.
		 */
		if (!prunestate->do_exec_prune && nplans > 0)
		{
			appendstate->as_valid_subplans = bms_add_range(NULL, 0, nplans - 1);
			appendstate->as_valid_subplans_identified = true;
		}
	}
	else
	{
		nplans = list_length(node->appendplans);

		/*
		 * When run-time partition pruning is not enabled we can just mark all
		 * subplans as valid; they must also all be initialized.
		 */
		Assert(nplans > 0);
		appendstate->as_valid_subplans = validsubplans =
			bms_add_range(NULL, 0, nplans - 1);
		appendstate->as_valid_subplans_identified = true;
		appendstate->as_prune_state = NULL;
	}
	appendplanstates = (PlanState **) palloc(nplans *
											 sizeof(PlanState *));

	/*
	 * call ExecInitNode on each of the valid plans to be executed and save
	 * the results into the appendplanstates array.
	 *
	 * While at it, find out the first valid partial plan.
	 */
	j = 0;
	asyncplans = NULL;
	nasyncplans = 0;
	firstvalid = nplans;
	i = -1;
	while ((i = bms_next_member(validsubplans, i)) >= 0)
	{
		Plan	   *initNode = (Plan *) list_nth(node->appendplans, i);

		/*
		 * Record async subplans.  When executing EvalPlanQual, we treat them
		 * as sync ones; don't do this when initializing an EvalPlanQual plan
		 * tree.
		 */
		if (initNode->async_capable && estate->es_epq_active == NULL)
		{
			asyncplans = bms_add_member(asyncplans, j);
			nasyncplans++;
		}

		/*
		 * Record the lowest appendplans index which is a valid partial plan.
		 */
		if (i >= node->first_partial_plan && j < firstvalid)
			firstvalid = j;

		appendplanstates[j++] = ExecInitNode(initNode, estate, eflags);
	}

	appendstate->as_first_partial_plan = firstvalid;
	appendstate->appendplans = appendplanstates;
	appendstate->as_nplans = nplans;
	/*
	 * Initialize Append's result tuple type and slot.  If the child plans all
	 * produce the same fixed slot type, we can use that slot type; otherwise
	 * make a virtual slot.  (Note that the result slot itself is used only to
	 * return a null tuple at end of execution; real tuples are returned to
	 * the caller in the children's own result slots.  What we are doing here
	 * is allowing the parent plan node to optimize if the Append will return
	 * only one kind of slot.)
	 */
	appendops = ExecGetCommonSlotOps(appendplanstates, j);
	if (appendops != NULL)
	{
		ExecInitResultTupleSlotTL(&appendstate->ps, appendops);
	}
	else
	{
		ExecInitResultTupleSlotTL(&appendstate->ps, &TTSOpsVirtual);
		/* show that the output slot type is not fixed */
		appendstate->ps.resultopsset = true;
		appendstate->ps.resultopsfixed = false;
	}
	/* Initialize async state */
	appendstate->as_asyncplans = asyncplans;
	appendstate->as_nasyncplans = nasyncplans;
	appendstate->as_asyncrequests = NULL;
	appendstate->as_asyncresults = NULL;
	appendstate->as_nasyncresults = 0;
	appendstate->as_nasyncremain = 0;
	appendstate->as_needrequest = NULL;
	appendstate->as_eventset = NULL;
	appendstate->as_valid_asyncplans = NULL;
	if (nasyncplans > 0)
	{
		appendstate->as_asyncrequests = (AsyncRequest **)
			palloc0(nplans * sizeof(AsyncRequest *));

		i = -1;
		while ((i = bms_next_member(asyncplans, i)) >= 0)
		{
			AsyncRequest *areq;

			areq = palloc(sizeof(AsyncRequest));
			areq->requestor = (PlanState *) appendstate;
			areq->requestee = appendplanstates[i];
			areq->request_index = i;
			areq->callback_pending = false;
			areq->request_complete = false;
			areq->result = NULL;

			appendstate->as_asyncrequests[i] = areq;
		}

		appendstate->as_asyncresults = (TupleTableSlot **)
			palloc0(nasyncplans * sizeof(TupleTableSlot *));

		if (appendstate->as_valid_subplans_identified)
			classify_matching_subplans(appendstate);
	}
	/*
	 * Miscellaneous initialization
	 */
	appendstate->ps.ps_ProjInfo = NULL;

	/* For parallel query, this will be overridden later. */
	appendstate->choose_next_subplan = choose_next_subplan_locally;

	return appendstate;
}
/* ----------------------------------------------------------------
 *	   ExecAppend
 *
 *		Handles iteration over multiple subplans.
 * ----------------------------------------------------------------
 */
static TupleTableSlot *
ExecAppend(PlanState *pstate)
{
	AppendState *node = castNode(AppendState, pstate);
	TupleTableSlot *result;

	/*
	 * If this is the first call after Init or ReScan, we need to do the
	 * initialization work.
	 */
	if (!node->as_begun)
	{
		Assert(node->as_whichplan == INVALID_SUBPLAN_INDEX);
		Assert(!node->as_syncdone);

		/* Nothing to do if there are no subplans */
		if (node->as_nplans == 0)
			return ExecClearTuple(node->ps.ps_ResultTupleSlot);

		/* If there are any async subplans, begin executing them. */
		if (node->as_nasyncplans > 0)
			ExecAppendAsyncBegin(node);

		/*
		 * If no sync subplan has been chosen, we must choose one before
		 * proceeding.
		 */
		if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
			return ExecClearTuple(node->ps.ps_ResultTupleSlot);

		Assert(node->as_syncdone ||
			   (node->as_whichplan >= 0 &&
				node->as_whichplan < node->as_nplans));

		/* And we're initialized. */
		node->as_begun = true;
	}
	for (;;)
	{
		PlanState  *subnode;

		CHECK_FOR_INTERRUPTS();

		/*
		 * try to get a tuple from an async subplan if any
		 */
		if (node->as_syncdone || !bms_is_empty(node->as_needrequest))
		{
			if (ExecAppendAsyncGetNext(node, &result))
				return result;
			Assert(!node->as_syncdone);
			Assert(bms_is_empty(node->as_needrequest));
		}

		/*
		 * figure out which sync subplan we are currently processing
		 */
		Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as_nplans);
		subnode = node->appendplans[node->as_whichplan];

		/*
		 * get a tuple from the subplan
		 */
		result = ExecProcNode(subnode);

		if (!TupIsNull(result))
		{
			/*
			 * If the subplan gave us something then return it as-is. We do
			 * NOT make use of the result slot that was set up in
			 * ExecInitAppend; there's no need for it.
			 */
			return result;
		}

		/*
		 * wait or poll for async events if any. We do this before checking
		 * for the end of iteration, because it might drain the remaining
		 * async subplans.
		 */
		if (node->as_nasyncremain > 0)
			ExecAppendAsyncEventWait(node);

		/* choose new sync subplan; if no sync/async subplans, we're done */
		if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
			return ExecClearTuple(node->ps.ps_ResultTupleSlot);
	}
}
/* ----------------------------------------------------------------
 *		ExecEndAppend
 *
 *		Shuts down the subscans of the append node.
 *
 *		Returns nothing of interest.
 * ----------------------------------------------------------------
 */
void
ExecEndAppend(AppendState *node)
{
	PlanState **appendplans;
	int			nplans;
	int			i;

	/*
	 * get information from the node
	 */
	appendplans = node->appendplans;
	nplans = node->as_nplans;

	/*
	 * shut down each of the subscans
	 */
	for (i = 0; i < nplans; i++)
		ExecEndNode(appendplans[i]);
}
void
ExecReScanAppend(AppendState *node)
{
	int			nasyncplans = node->as_nasyncplans;
	int			i;

	/*
	 * If any PARAM_EXEC Params used in pruning expressions have changed, then
	 * we'd better unset the valid subplans so that they are reselected for
	 * the new parameter values.
	 */
	if (node->as_prune_state &&
		bms_overlap(node->ps.chgParam,
					node->as_prune_state->execparamids))
	{
		node->as_valid_subplans_identified = false;
		bms_free(node->as_valid_subplans);
		node->as_valid_subplans = NULL;
		bms_free(node->as_valid_asyncplans);
		node->as_valid_asyncplans = NULL;
	}

	for (i = 0; i < node->as_nplans; i++)
	{
		PlanState  *subnode = node->appendplans[i];

		/*
		 * ExecReScan doesn't know about my subplans, so I have to do
		 * changed-parameter signaling myself.
		 */
		if (node->ps.chgParam != NULL)
			UpdateChangedParamSet(subnode, node->ps.chgParam);

		/*
		 * If chgParam of subnode is not null then plan will be re-scanned by
		 * first ExecProcNode or by first ExecAsyncRequest.
		 */
		if (subnode->chgParam == NULL)
			ExecReScan(subnode);
	}

	/* Reset async state */
	if (nasyncplans > 0)
	{
		i = -1;
		while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
		{
			AsyncRequest *areq = node->as_asyncrequests[i];

			areq->callback_pending = false;
			areq->request_complete = false;
			areq->result = NULL;
		}

		node->as_nasyncresults = 0;
		node->as_nasyncremain = 0;
		bms_free(node->as_needrequest);
		node->as_needrequest = NULL;
	}

	/* Let choose_next_subplan_* function handle setting the first subplan */
	node->as_whichplan = INVALID_SUBPLAN_INDEX;
	node->as_syncdone = false;
	node->as_begun = false;
}
/* ----------------------------------------------------------------
 *						Parallel Append Support
 * ----------------------------------------------------------------
 */

/* ----------------------------------------------------------------
 *		ExecAppendEstimate
 *
 *		Compute the amount of space we'll need in the parallel
 *		query DSM, and inform pcxt->estimator about our needs.
 * ----------------------------------------------------------------
 */
void
ExecAppendEstimate(AppendState *node,
				   ParallelContext *pcxt)
{
	node->pstate_len =
		add_size(offsetof(ParallelAppendState, pa_finished),
				 sizeof(bool) * node->as_nplans);

	shm_toc_estimate_chunk(&pcxt->estimator, node->pstate_len);
	shm_toc_estimate_keys(&pcxt->estimator, 1);
}
/* ----------------------------------------------------------------
 *		ExecAppendInitializeDSM
 *
 *		Set up shared state for Parallel Append.
 * ----------------------------------------------------------------
 */
void
ExecAppendInitializeDSM(AppendState *node,
						ParallelContext *pcxt)
{
	ParallelAppendState *pstate;

	pstate = shm_toc_allocate(pcxt->toc, node->pstate_len);
	memset(pstate, 0, node->pstate_len);
	LWLockInitialize(&pstate->pa_lock, LWTRANCHE_PARALLEL_APPEND);
	shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id, pstate);

	node->as_pstate = pstate;
	node->choose_next_subplan = choose_next_subplan_for_leader;
}
/* ----------------------------------------------------------------
 *		ExecAppendReInitializeDSM
 *
 *		Reset shared state before beginning a fresh scan.
 * ----------------------------------------------------------------
 */
void
ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt)
{
	ParallelAppendState *pstate = node->as_pstate;

	pstate->pa_next_plan = 0;
	memset(pstate->pa_finished, 0, sizeof(bool) * node->as_nplans);
}
/* ----------------------------------------------------------------
 *		ExecAppendInitializeWorker
 *
 *		Copy relevant information from TOC into planstate, and initialize
 *		whatever is required to choose and execute the optimal subplan.
 * ----------------------------------------------------------------
 */
void
ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt)
{
	node->as_pstate = shm_toc_lookup(pwcxt->toc, node->ps.plan->plan_node_id, false);
	node->choose_next_subplan = choose_next_subplan_for_worker;
}
/* ----------------------------------------------------------------
 *		choose_next_subplan_locally
 *
 *		Choose next sync subplan for a non-parallel-aware Append,
 *		returning false if there are no more.
 * ----------------------------------------------------------------
 */
static bool
choose_next_subplan_locally(AppendState *node)
{
	int			whichplan = node->as_whichplan;
	int			nextplan;

	/* We should never be called when there are no subplans */
	Assert(node->as_nplans > 0);

	/* Nothing to do if syncdone */
	if (node->as_syncdone)
		return false;

	/*
	 * If first call then have the bms member function choose the first valid
	 * sync subplan by initializing whichplan to -1.  If there happen to be no
	 * valid sync subplans then the bms member function will handle that by
	 * returning a negative number which will allow us to exit returning a
	 * false value.
	 */
	if (whichplan == INVALID_SUBPLAN_INDEX)
	{
		if (node->as_nasyncplans > 0)
		{
			/* We'd have filled as_valid_subplans already */
			Assert(node->as_valid_subplans_identified);
		}
		else if (!node->as_valid_subplans_identified)
		{
			node->as_valid_subplans =
				ExecFindMatchingSubPlans(node->as_prune_state, false);
			node->as_valid_subplans_identified = true;
		}

		whichplan = -1;
	}

	/* Ensure whichplan is within the expected range */
	Assert(whichplan >= -1 && whichplan <= node->as_nplans);

	if (ScanDirectionIsForward(node->ps.state->es_direction))
		nextplan = bms_next_member(node->as_valid_subplans, whichplan);
	else
		nextplan = bms_prev_member(node->as_valid_subplans, whichplan);

	if (nextplan < 0)
	{
		/* Set as_syncdone if in async mode */
		if (node->as_nasyncplans > 0)
			node->as_syncdone = true;
		return false;
	}

	node->as_whichplan = nextplan;

	return true;
}
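/*
 * For example, if run-time pruning left as_valid_subplans = {1, 3} and the
 * scan direction is forward: the first call moves whichplan from -1 to 1, a
 * later call moves it to 3, and the call after that gets a negative result
 * from bms_next_member and returns false, setting as_syncdone first if there
 * are async subplans.
 */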
/* ----------------------------------------------------------------
 *		choose_next_subplan_for_leader
 *
 *		Try to pick a plan which doesn't commit us to doing much
 *		work locally, so that as much work as possible is done in
 *		the workers.  Cheapest subplans are at the end.
 * ----------------------------------------------------------------
 */
static bool
choose_next_subplan_for_leader(AppendState *node)
{
	ParallelAppendState *pstate = node->as_pstate;

	/* Backward scan is not supported by parallel-aware plans */
	Assert(ScanDirectionIsForward(node->ps.state->es_direction));

	/* We should never be called when there are no subplans */
	Assert(node->as_nplans > 0);

	LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);

	if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
	{
		/* Mark just-completed subplan as finished. */
		node->as_pstate->pa_finished[node->as_whichplan] = true;
	}
	else
	{
		/* Start with last subplan. */
		node->as_whichplan = node->as_nplans - 1;

		/*
		 * If we've yet to determine the valid subplans then do so now.  If
		 * run-time pruning is disabled then the valid subplans will always be
		 * set to all subplans.
		 */
		if (!node->as_valid_subplans_identified)
		{
			node->as_valid_subplans =
				ExecFindMatchingSubPlans(node->as_prune_state, false);
			node->as_valid_subplans_identified = true;

			/*
			 * Mark each invalid plan as finished to allow the loop below to
			 * select the first valid subplan.
			 */
			mark_invalid_subplans_as_finished(node);
		}
	}

	/* Loop until we find a subplan to execute. */
	while (pstate->pa_finished[node->as_whichplan])
	{
		if (node->as_whichplan == 0)
		{
			pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
			node->as_whichplan = INVALID_SUBPLAN_INDEX;
			LWLockRelease(&pstate->pa_lock);
			return false;
		}

		/*
		 * We needn't pay attention to as_valid_subplans here as all invalid
		 * plans have been marked as finished.
		 */
		node->as_whichplan--;
	}

	/* If non-partial, immediately mark as finished. */
	if (node->as_whichplan < node->as_first_partial_plan)
		node->as_pstate->pa_finished[node->as_whichplan] = true;

	LWLockRelease(&pstate->pa_lock);

	return true;
}
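/*
 * For example, with four subplans none of which is yet finished, the leader
 * picks subplan 3 first and walks downward on later calls; since subplans
 * are ordered with the cheapest at the end, the most expensive ones are
 * left for the workers.
 */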
/* ----------------------------------------------------------------
 *		choose_next_subplan_for_worker
 *
 *		Choose next subplan for a parallel-aware Append, returning
 *		false if there are no more.
 *
 *		We start from the first plan and advance through the list;
 *		when we get back to the end, we loop back to the first
 *		partial plan.  This assigns the non-partial plans first in
 *		order of descending cost and then spreads out the workers
 *		as evenly as possible across the remaining partial plans.
 * ----------------------------------------------------------------
 */
static bool
choose_next_subplan_for_worker(AppendState *node)
{
	ParallelAppendState *pstate = node->as_pstate;

	/* Backward scan is not supported by parallel-aware plans */
	Assert(ScanDirectionIsForward(node->ps.state->es_direction));

	/* We should never be called when there are no subplans */
	Assert(node->as_nplans > 0);

	LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);

	/* Mark just-completed subplan as finished. */
	if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
		node->as_pstate->pa_finished[node->as_whichplan] = true;

	/*
	 * If we've yet to determine the valid subplans then do so now.  If
	 * run-time pruning is disabled then the valid subplans will always be set
	 * to all subplans.
	 */
	else if (!node->as_valid_subplans_identified)
	{
		node->as_valid_subplans =
			ExecFindMatchingSubPlans(node->as_prune_state, false);
		node->as_valid_subplans_identified = true;

		mark_invalid_subplans_as_finished(node);
	}

	/* If all the plans are already done, we have nothing to do */
	if (pstate->pa_next_plan == INVALID_SUBPLAN_INDEX)
	{
		LWLockRelease(&pstate->pa_lock);
		return false;
	}

	/* Save the plan from which we are starting the search. */
	node->as_whichplan = pstate->pa_next_plan;

	/* Loop until we find a valid subplan to execute. */
	while (pstate->pa_finished[pstate->pa_next_plan])
	{
		int			nextplan;

		nextplan = bms_next_member(node->as_valid_subplans,
								   pstate->pa_next_plan);
		if (nextplan >= 0)
		{
			/* Advance to the next valid plan. */
			pstate->pa_next_plan = nextplan;
		}
		else if (node->as_whichplan > node->as_first_partial_plan)
		{
			/*
			 * Try looping back to the first valid partial plan, if there is
			 * one.  If there isn't, arrange to bail out below.
			 */
			nextplan = bms_next_member(node->as_valid_subplans,
									   node->as_first_partial_plan - 1);
			pstate->pa_next_plan =
				nextplan < 0 ? node->as_whichplan : nextplan;
		}
		else
		{
			/*
			 * At last plan, and either there are no partial plans or we've
			 * tried them all.  Arrange to bail out.
			 */
			pstate->pa_next_plan = node->as_whichplan;
		}

		if (pstate->pa_next_plan == node->as_whichplan)
		{
			/* We've tried everything! */
			pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
			LWLockRelease(&pstate->pa_lock);
			return false;
		}
	}

	/* Pick the plan we found, and advance pa_next_plan one more time. */
	node->as_whichplan = pstate->pa_next_plan;
	pstate->pa_next_plan = bms_next_member(node->as_valid_subplans,
										   pstate->pa_next_plan);

	/*
	 * If there are no more valid plans then try setting the next plan to the
	 * first valid partial plan.
	 */
	if (pstate->pa_next_plan < 0)
	{
		int			nextplan = bms_next_member(node->as_valid_subplans,
											   node->as_first_partial_plan - 1);

		if (nextplan >= 0)
		{
			pstate->pa_next_plan = nextplan;
		}
		else
		{
			/*
			 * There are no valid partial plans, and we already chose the last
			 * non-partial plan; so flag that there's nothing more for our
			 * fellow workers to do.
			 */
			pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
		}
	}

	/* If non-partial, immediately mark as finished. */
	if (node->as_whichplan < node->as_first_partial_plan)
		node->as_pstate->pa_finished[node->as_whichplan] = true;

	LWLockRelease(&pstate->pa_lock);

	return true;
}
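/*
 * Worked example: as_nplans == 4, as_first_partial_plan == 2, no pruning.
 * The first two choosers receive the non-partial subplans 0 and 1, each
 * marked finished immediately so it runs in exactly one process.  Later
 * choosers receive 2, then 3, after which pa_next_plan wraps back to 2, so
 * further workers spread out across the two partial subplans until each is
 * executed to completion.
 */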
/* ----------------------------------------------------------------
 *		mark_invalid_subplans_as_finished
 *		Marks the ParallelAppendState's pa_finished as true for each invalid
 *		subplan.
 *
 *		This function should only be called for parallel Append with run-time
 *		pruning enabled.
 * ----------------------------------------------------------------
 */
static void
mark_invalid_subplans_as_finished(AppendState *node)
{
	int			i;

	/* Only valid to call this while in parallel Append mode */
	Assert(node->as_pstate);

	/* Shouldn't have been called when run-time pruning is not enabled */
	Assert(node->as_prune_state);

	/* Nothing to do if all plans are valid */
	if (bms_num_members(node->as_valid_subplans) == node->as_nplans)
		return;

	/* Mark all non-valid plans as finished */
	for (i = 0; i < node->as_nplans; i++)
	{
		if (!bms_is_member(i, node->as_valid_subplans))
			node->as_pstate->pa_finished[i] = true;
	}
}
/* ----------------------------------------------------------------
 *						Asynchronous Append Support
 * ----------------------------------------------------------------
 */

/* ----------------------------------------------------------------
 *		ExecAppendAsyncBegin
 *
 *		Begin executing designated async-capable subplans.
 * ----------------------------------------------------------------
 */
static void
ExecAppendAsyncBegin(AppendState *node)
{
	int			i;

	/* Backward scan is not supported by async-aware Appends. */
	Assert(ScanDirectionIsForward(node->ps.state->es_direction));

	/* We should never be called when there are no subplans */
	Assert(node->as_nplans > 0);

	/* We should never be called when there are no async subplans. */
	Assert(node->as_nasyncplans > 0);

	/* If we've yet to determine the valid subplans then do so now. */
	if (!node->as_valid_subplans_identified)
	{
		node->as_valid_subplans =
			ExecFindMatchingSubPlans(node->as_prune_state, false);
		node->as_valid_subplans_identified = true;

		classify_matching_subplans(node);
	}

	/* Initialize state variables. */
	node->as_syncdone = bms_is_empty(node->as_valid_subplans);
	node->as_nasyncremain = bms_num_members(node->as_valid_asyncplans);

	/* Nothing to do if there are no valid async subplans. */
	if (node->as_nasyncremain == 0)
		return;

	/* Make a request for each of the valid async subplans. */
	i = -1;
	while ((i = bms_next_member(node->as_valid_asyncplans, i)) >= 0)
	{
		AsyncRequest *areq = node->as_asyncrequests[i];

		Assert(areq->request_index == i);
		Assert(!areq->callback_pending);

		/* Do the actual work. */
		ExecAsyncRequest(areq);
	}
}
/* ----------------------------------------------------------------
 *		ExecAppendAsyncGetNext
 *
 *		Get the next tuple from any of the asynchronous subplans.
 * ----------------------------------------------------------------
 */
static bool
ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result)
{
	*result = NULL;

	/* We should never be called when there are no valid async subplans. */
	Assert(node->as_nasyncremain > 0);

	/* Request a tuple asynchronously. */
	if (ExecAppendAsyncRequest(node, result))
		return true;

	while (node->as_nasyncremain > 0)
	{
		CHECK_FOR_INTERRUPTS();

		/* Wait or poll for async events. */
		ExecAppendAsyncEventWait(node);

		/* Request a tuple asynchronously. */
		if (ExecAppendAsyncRequest(node, result))
			return true;

		/* Break from loop if there's any sync subplan that isn't complete. */
		if (!node->as_syncdone)
			break;
	}

	/*
	 * If all sync subplans are complete, we're totally done scanning the
	 * given node.  Otherwise, we're done with the asynchronous stuff but must
	 * continue scanning the sync subplans.
	 */
	if (node->as_syncdone)
	{
		Assert(node->as_nasyncremain == 0);
		*result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
		return true;
	}

	return false;
}
/* ----------------------------------------------------------------
 *		ExecAppendAsyncRequest
 *
 *		Request a tuple asynchronously.
 * ----------------------------------------------------------------
 */
static bool
ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result)
{
	Bitmapset  *needrequest;
	int			i;

	/* Nothing to do if there are no async subplans needing a new request. */
	if (bms_is_empty(node->as_needrequest))
	{
		Assert(node->as_nasyncresults == 0);
		return false;
	}

	/*
	 * If there are any asynchronously-generated results that have not yet
	 * been returned, we have nothing to do; just return one of them.
	 */
	if (node->as_nasyncresults > 0)
	{
		--node->as_nasyncresults;
		*result = node->as_asyncresults[node->as_nasyncresults];
		return true;
	}

	/* Make a new request for each of the async subplans that need it. */
	needrequest = node->as_needrequest;
	node->as_needrequest = NULL;
	i = -1;
	while ((i = bms_next_member(needrequest, i)) >= 0)
	{
		AsyncRequest *areq = node->as_asyncrequests[i];

		/* Do the actual work. */
		ExecAsyncRequest(areq);
	}
	bms_free(needrequest);

	/* Return one of the asynchronously-generated results if any. */
	if (node->as_nasyncresults > 0)
	{
		--node->as_nasyncresults;
		*result = node->as_asyncresults[node->as_nasyncresults];
		return true;
	}

	return false;
}
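/*
 * Note that as_asyncresults is used as a simple stack: ExecAsyncAppendResponse
 * pushes completed slots at as_asyncresults[as_nasyncresults++] and the two
 * blocks above pop from the top.  The LIFO order is harmless here because
 * Append makes no ordering promises across its subplans.
 */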
/* ----------------------------------------------------------------
 *		ExecAppendAsyncEventWait
 *
 *		Wait or poll for file descriptor events and fire callbacks.
 * ----------------------------------------------------------------
 */
static void
ExecAppendAsyncEventWait(AppendState *node)
{
	int			nevents = node->as_nasyncplans + 1;
	long		timeout = node->as_syncdone ? -1 : 0;
	WaitEvent	occurred_event[EVENT_BUFFER_SIZE];
	int			noccurred;
	int			i;

	/* We should never be called when there are no valid async subplans. */
	Assert(node->as_nasyncremain > 0);

	Assert(node->as_eventset == NULL);
	node->as_eventset = CreateWaitEventSet(CurrentResourceOwner, nevents);
	AddWaitEventToSet(node->as_eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
					  NULL, NULL);

	/* Give each waiting subplan a chance to add an event. */
	i = -1;
	while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
	{
		AsyncRequest *areq = node->as_asyncrequests[i];

		if (areq->callback_pending)
			ExecAsyncConfigureWait(areq);
	}

	/*
	 * No need for further processing if there are no configured events other
	 * than the postmaster death event.
	 */
	if (GetNumRegisteredWaitEvents(node->as_eventset) == 1)
	{
		FreeWaitEventSet(node->as_eventset);
		node->as_eventset = NULL;
		return;
	}

	/* Return at most EVENT_BUFFER_SIZE events in one call. */
	if (nevents > EVENT_BUFFER_SIZE)
		nevents = EVENT_BUFFER_SIZE;

	/*
	 * If the timeout is -1, wait until at least one event occurs.  If the
	 * timeout is 0, poll for events, but do not wait at all.
	 */
	noccurred = WaitEventSetWait(node->as_eventset, timeout, occurred_event,
								 nevents, WAIT_EVENT_APPEND_READY);
	FreeWaitEventSet(node->as_eventset);
	node->as_eventset = NULL;
	if (noccurred == 0)
		return;

	/* Deliver notifications. */
	for (i = 0; i < noccurred; i++)
	{
		WaitEvent  *w = &occurred_event[i];

		/*
		 * Each waiting subplan should have registered its wait event with
		 * user_data pointing back to its AsyncRequest.
		 */
		if ((w->events & WL_SOCKET_READABLE) != 0)
		{
			AsyncRequest *areq = (AsyncRequest *) w->user_data;

			if (areq->callback_pending)
			{
				/*
				 * Mark it as no longer needing a callback.  We must do this
				 * before dispatching the callback in case the callback resets
				 * the flag.
				 */
				areq->callback_pending = false;

				/* Do the actual work. */
				ExecAsyncNotify(areq);
			}
		}
	}
}
/* ----------------------------------------------------------------
 *		ExecAsyncAppendResponse
 *
 *		Receive a response from an asynchronous request we made.
 * ----------------------------------------------------------------
 */
void
ExecAsyncAppendResponse(AsyncRequest *areq)
{
	AppendState *node = (AppendState *) areq->requestor;
	TupleTableSlot *slot = areq->result;

	/* The result should be a TupleTableSlot or NULL. */
	Assert(slot == NULL || IsA(slot, TupleTableSlot));

	/* Nothing to do if the request is pending. */
	if (!areq->request_complete)
	{
		/* The request would have been pending for a callback. */
		Assert(areq->callback_pending);
		return;
	}

	/* If the result is NULL or an empty slot, there's nothing more to do. */
	if (TupIsNull(slot))
	{
		/* The ending subplan wouldn't have been pending for a callback. */
		Assert(!areq->callback_pending);
		--node->as_nasyncremain;
		return;
	}

	/* Save result so we can return it. */
	Assert(node->as_nasyncresults < node->as_nasyncplans);
	node->as_asyncresults[node->as_nasyncresults++] = slot;

	/*
	 * Mark the subplan that returned a result as ready for a new request.  We
	 * don't launch another one here immediately because it might complete.
	 */
	node->as_needrequest = bms_add_member(node->as_needrequest,
										  areq->request_index);
}
/* ----------------------------------------------------------------
 *		classify_matching_subplans
 *
 *		Classify the node's as_valid_subplans into sync ones and
 *		async ones, adjust it to contain sync ones only, and save
 *		async ones in the node's as_valid_asyncplans.
 * ----------------------------------------------------------------
 */
static void
classify_matching_subplans(AppendState *node)
{
	Bitmapset  *valid_asyncplans;

	Assert(node->as_valid_subplans_identified);
	Assert(node->as_valid_asyncplans == NULL);

	/* Nothing to do if there are no valid subplans. */
	if (bms_is_empty(node->as_valid_subplans))
	{
		node->as_syncdone = true;
		node->as_nasyncremain = 0;
		return;
	}

	/* Nothing to do if there are no valid async subplans. */
	if (!bms_overlap(node->as_valid_subplans, node->as_asyncplans))
	{
		node->as_nasyncremain = 0;
		return;
	}

	/* Get valid async subplans. */
	valid_asyncplans = bms_intersect(node->as_asyncplans,
									 node->as_valid_subplans);

	/* Adjust the valid subplans to contain sync subplans only. */
	node->as_valid_subplans = bms_del_members(node->as_valid_subplans,
											  valid_asyncplans);

	/* Save valid async subplans. */
	node->as_valid_asyncplans = valid_asyncplans;
}