/*-------------------------------------------------------------------------
 *
 * nodeAppend.c
 *    routines to handle append nodes.
 *
 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *    src/backend/executor/nodeAppend.c
 *
 *-------------------------------------------------------------------------
 */
/* INTERFACE ROUTINES
 *      ExecInitAppend   - initialize the append node
 *      ExecAppend       - retrieve the next tuple from the node
 *      ExecEndAppend    - shut down the append node
 *      ExecReScanAppend - rescan the append node
 *
 *   NOTES
 *      Each append node contains a list of one or more subplans which
 *      must be iteratively processed (forwards or backwards).
 *      Tuples are retrieved by executing the 'whichplan'th subplan
 *      until the subplan stops returning tuples, at which point that
 *      plan is shut down and the next started up.
 *
 *      Append nodes don't make use of their left and right
 *      subtrees, rather they maintain a list of subplans so
 *      a typical append node looks like this in the plan tree:
 *
 *                 ...
 *                 /
 *              Append -------+------+------+--- nil
 *              /   \         |      |      |
 *            nil   nil      ...    ...    ...
 *                               subplans
 *
 *      Append nodes are currently used for unions, and to support
 *      inheritance queries, where several relations need to be scanned.
 *      For example, in our standard person/student/employee/student-emp
 *      example, where student and employee inherit from person
 *      and student-emp inherits from student and employee, the
 *      query:
 *
 *              select name from person
 *
 *      generates the plan:
 *
 *                |
 *              Append -------+-------+--------+--------+
 *              /   \         |       |        |        |
 *            nil   nil      Scan    Scan     Scan     Scan
 *                            |       |        |        |
 *                          person employee student student-emp
 */
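/*
 * A more common source of Append nodes today is a scan of a partitioned
 * table.  As a sketch (the table and partition names here are hypothetical,
 * and the exact EXPLAIN output varies by version and settings), a query
 * such as
 *
 *              select * from parted_tab;
 *
 * over a three-partition table produces a plan shaped like:
 *
 *              Append
 *                ->  Seq Scan on parted_tab_p1
 *                ->  Seq Scan on parted_tab_p2
 *                ->  Seq Scan on parted_tab_p3
 */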
#include "postgres.h"

#include "executor/execAsync.h"
#include "executor/execPartition.h"
#include "executor/executor.h"
#include "executor/nodeAppend.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "storage/latch.h"
/* Shared state for parallel-aware Append. */
struct ParallelAppendState
{
    LWLock      pa_lock;        /* mutual exclusion to choose next subplan */
    int         pa_next_plan;   /* next plan to choose by any worker */

    /*
     * pa_finished[i] should be true if no more workers should select subplan
     * i.  For a non-partial plan, this should be set to true as soon as a
     * worker selects the plan; for a partial plan, it remains false until
     * some worker executes the plan to completion.
     */
    bool        pa_finished[FLEXIBLE_ARRAY_MEMBER];
};
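/*
 * A worked example of the pa_finished contract (subplan numbers are
 * illustrative): with subplans 0-1 non-partial and 2-3 partial, the worker
 * that picks subplan 0 sets pa_finished[0] = true immediately, so no other
 * worker ever runs it; subplans 2 and 3 may be run by several workers at
 * once, and pa_finished[2] only becomes true once some worker executes
 * subplan 2 to completion.
 */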
#define INVALID_SUBPLAN_INDEX       -1
#define EVENT_BUFFER_SIZE           16
static TupleTableSlot *ExecAppend(PlanState *pstate);
static bool choose_next_subplan_locally(AppendState *node);
static bool choose_next_subplan_for_leader(AppendState *node);
static bool choose_next_subplan_for_worker(AppendState *node);
static void mark_invalid_subplans_as_finished(AppendState *node);
static void ExecAppendAsyncBegin(AppendState *node);
static bool ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result);
static bool ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result);
static void ExecAppendAsyncEventWait(AppendState *node);
static void classify_matching_subplans(AppendState *node);
/* ----------------------------------------------------------------
 *      ExecInitAppend
 *
 *      Begin all of the subscans of the append node.
 *
 *     (This is potentially wasteful, since the entire result of the
 *      append node may not be scanned, but this way all of the
 *      structures get allocated in the executor's top level memory
 *      block instead of that of the call to ExecAppend.)
 * ----------------------------------------------------------------
 */
AppendState *
ExecInitAppend(Append *node, EState *estate, int eflags)
{
    AppendState *appendstate = makeNode(AppendState);
    PlanState **appendplanstates;
    const TupleTableSlotOps *appendops;
    Bitmapset  *validsubplans;
    Bitmapset  *asyncplans;
    int         nplans;
    int         nasyncplans;
    int         firstvalid;
    int         i,
                j;

    /* check for unsupported flags */
    Assert(!(eflags & EXEC_FLAG_MARK));
    /*
     * create new AppendState for our append node
     */
    appendstate->ps.plan = (Plan *) node;
    appendstate->ps.state = estate;
    appendstate->ps.ExecProcNode = ExecAppend;

    /* Let choose_next_subplan_* function handle setting the first subplan */
    appendstate->as_whichplan = INVALID_SUBPLAN_INDEX;
    appendstate->as_syncdone = false;
    appendstate->as_begun = false;
    /* If run-time partition pruning is enabled, then set that up now */
    if (node->part_prune_info != NULL)
    {
        PartitionPruneState *prunestate;

        /*
         * Set up pruning data structure.  This also initializes the set of
         * subplans to initialize (validsubplans) by taking into account the
         * result of performing initial pruning if any.
         */
        prunestate = ExecInitPartitionPruning(&appendstate->ps,
                                              list_length(node->appendplans),
                                              node->part_prune_info,
                                              &validsubplans);
        appendstate->as_prune_state = prunestate;
        nplans = bms_num_members(validsubplans);

        /*
         * When no run-time pruning is required and there's at least one
         * subplan, we can fill as_valid_subplans immediately, preventing
         * later calls to ExecFindMatchingSubPlans.
         */
        if (!prunestate->do_exec_prune && nplans > 0)
        {
            appendstate->as_valid_subplans = bms_add_range(NULL, 0, nplans - 1);
            appendstate->as_valid_subplans_identified = true;
        }
    }
    else
    {
        nplans = list_length(node->appendplans);

        /*
         * When run-time partition pruning is not enabled we can just mark all
         * subplans as valid; they must also all be initialized.
         */
        Assert(nplans > 0);
        appendstate->as_valid_subplans = validsubplans =
            bms_add_range(NULL, 0, nplans - 1);
        appendstate->as_valid_subplans_identified = true;
        appendstate->as_prune_state = NULL;
    }
    appendplanstates = (PlanState **) palloc(nplans *
                                             sizeof(PlanState *));
    /*
     * call ExecInitNode on each of the valid plans to be executed and save
     * the results into the appendplanstates array.
     *
     * While at it, find out the first valid partial plan.
     */
    j = 0;
    asyncplans = NULL;
    nasyncplans = 0;
    firstvalid = nplans;
    i = -1;
    while ((i = bms_next_member(validsubplans, i)) >= 0)
    {
        Plan       *initNode = (Plan *) list_nth(node->appendplans, i);

        /*
         * Record async subplans.  When executing EvalPlanQual, we treat them
         * as sync ones; don't do this when initializing an EvalPlanQual plan
         * tree.
         */
        if (initNode->async_capable && estate->es_epq_active == NULL)
        {
            asyncplans = bms_add_member(asyncplans, j);
            nasyncplans++;
        }

        /*
         * Record the lowest appendplans index which is a valid partial plan.
         */
        if (i >= node->first_partial_plan && j < firstvalid)
            firstvalid = j;

        appendplanstates[j++] = ExecInitNode(initNode, estate, eflags);
    }
    appendstate->as_first_partial_plan = firstvalid;
    appendstate->appendplans = appendplanstates;
    appendstate->as_nplans = nplans;
    /*
     * Initialize Append's result tuple type and slot.  If the child plans all
     * produce the same fixed slot type, we can use that slot type; otherwise
     * make a virtual slot.  (Note that the result slot itself is used only to
     * return a null tuple at end of execution; real tuples are returned to
     * the caller in the children's own result slots.  What we are doing here
     * is allowing the parent plan node to optimize if the Append will return
     * only one kind of slot.)
     */
    appendops = ExecGetCommonSlotOps(appendplanstates, j);
    if (appendops != NULL)
    {
        ExecInitResultTupleSlotTL(&appendstate->ps, appendops);
    }
    else
    {
        ExecInitResultTupleSlotTL(&appendstate->ps, &TTSOpsVirtual);
        /* show that the output slot type is not fixed */
        appendstate->ps.resultopsset = true;
        appendstate->ps.resultopsfixed = false;
    }
    /* Initialize async state */
    appendstate->as_asyncplans = asyncplans;
    appendstate->as_nasyncplans = nasyncplans;
    appendstate->as_asyncrequests = NULL;
    appendstate->as_asyncresults = NULL;
    appendstate->as_nasyncresults = 0;
    appendstate->as_nasyncremain = 0;
    appendstate->as_needrequest = NULL;
    appendstate->as_eventset = NULL;
    appendstate->as_valid_asyncplans = NULL;
    if (nasyncplans > 0)
    {
        appendstate->as_asyncrequests = (AsyncRequest **)
            palloc0(nplans * sizeof(AsyncRequest *));

        i = -1;
        while ((i = bms_next_member(asyncplans, i)) >= 0)
        {
            AsyncRequest *areq;

            areq = palloc(sizeof(AsyncRequest));
            areq->requestor = (PlanState *) appendstate;
            areq->requestee = appendplanstates[i];
            areq->request_index = i;
            areq->callback_pending = false;
            areq->request_complete = false;
            areq->result = NULL;

            appendstate->as_asyncrequests[i] = areq;
        }

        appendstate->as_asyncresults = (TupleTableSlot **)
            palloc0(nasyncplans * sizeof(TupleTableSlot *));

        if (appendstate->as_valid_subplans_identified)
            classify_matching_subplans(appendstate);
    }
    /*
     * Miscellaneous initialization
     */
    appendstate->ps.ps_ProjInfo = NULL;

    /* For parallel query, this will be overridden later. */
    appendstate->choose_next_subplan = choose_next_subplan_locally;

    return appendstate;
}
/* ----------------------------------------------------------------
 *      ExecAppend
 *
 *      Handles iteration over multiple subplans.
 * ----------------------------------------------------------------
 */
static TupleTableSlot *
ExecAppend(PlanState *pstate)
{
    AppendState *node = castNode(AppendState, pstate);
    TupleTableSlot *result;
    /*
     * If this is the first call after Init or ReScan, we need to do the
     * initialization work.
     */
    if (!node->as_begun)
    {
        Assert(node->as_whichplan == INVALID_SUBPLAN_INDEX);
        Assert(!node->as_syncdone);

        /* Nothing to do if there are no subplans */
        if (node->as_nplans == 0)
            return ExecClearTuple(node->ps.ps_ResultTupleSlot);

        /* If there are any async subplans, begin executing them. */
        if (node->as_nasyncplans > 0)
            ExecAppendAsyncBegin(node);

        /*
         * If no sync subplan has been chosen, we must choose one before
         * proceeding.
         */
        if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
            return ExecClearTuple(node->ps.ps_ResultTupleSlot);

        Assert(node->as_syncdone ||
               (node->as_whichplan >= 0 &&
                node->as_whichplan < node->as_nplans));

        /* And we're initialized. */
        node->as_begun = true;
    }
    for (;;)
    {
        PlanState  *subnode;

        CHECK_FOR_INTERRUPTS();

        /*
         * try to get a tuple from an async subplan if any
         */
        if (node->as_syncdone || !bms_is_empty(node->as_needrequest))
        {
            if (ExecAppendAsyncGetNext(node, &result))
                return result;
            Assert(!node->as_syncdone);
            Assert(bms_is_empty(node->as_needrequest));
        }

        /*
         * figure out which sync subplan we are currently processing
         */
        Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as_nplans);
        subnode = node->appendplans[node->as_whichplan];

        /*
         * get a tuple from the subplan
         */
        result = ExecProcNode(subnode);

        if (!TupIsNull(result))
        {
            /*
             * If the subplan gave us something then return it as-is. We do
             * NOT make use of the result slot that was set up in
             * ExecInitAppend; there's no need for it.
             */
            return result;
        }

        /*
         * wait or poll for async events if any. We do this before checking
         * for the end of iteration, because it might drain the remaining
         * async subplans.
         */
        if (node->as_nasyncremain > 0)
            ExecAppendAsyncEventWait(node);

        /* choose new sync subplan; if no sync/async subplans, we're done */
        if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
            return ExecClearTuple(node->ps.ps_ResultTupleSlot);
    }
}
/* ----------------------------------------------------------------
 *      ExecEndAppend
 *
 *      Shuts down the subscans of the append node.
 *
 *      Returns nothing of interest.
 * ----------------------------------------------------------------
 */
void
ExecEndAppend(AppendState *node)
{
    PlanState **appendplans;
    int         nplans;
    int         i;

    /*
     * get information from the node
     */
    appendplans = node->appendplans;
    nplans = node->as_nplans;

    /*
     * shut down each of the subscans
     */
    for (i = 0; i < nplans; i++)
        ExecEndNode(appendplans[i]);
}
void
ExecReScanAppend(AppendState *node)
{
    int         nasyncplans = node->as_nasyncplans;
    int         i;

    /*
     * If any PARAM_EXEC Params used in pruning expressions have changed, then
     * we'd better unset the valid subplans so that they are reselected for
     * the new parameter values.
     */
    if (node->as_prune_state &&
        bms_overlap(node->ps.chgParam,
                    node->as_prune_state->execparamids))
    {
        node->as_valid_subplans_identified = false;
        bms_free(node->as_valid_subplans);
        node->as_valid_subplans = NULL;
        bms_free(node->as_valid_asyncplans);
        node->as_valid_asyncplans = NULL;
    }

    for (i = 0; i < node->as_nplans; i++)
    {
        PlanState  *subnode = node->appendplans[i];

        /*
         * ExecReScan doesn't know about my subplans, so I have to do
         * changed-parameter signaling myself.
         */
        if (node->ps.chgParam != NULL)
            UpdateChangedParamSet(subnode, node->ps.chgParam);

        /*
         * If chgParam of subnode is not null then plan will be re-scanned by
         * first ExecProcNode or by first ExecAsyncRequest.
         */
        if (subnode->chgParam == NULL)
            ExecReScan(subnode);
    }

    /* Reset async state */
    if (nasyncplans > 0)
    {
        i = -1;
        while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
        {
            AsyncRequest *areq = node->as_asyncrequests[i];

            areq->callback_pending = false;
            areq->request_complete = false;
            areq->result = NULL;
        }

        node->as_nasyncresults = 0;
        node->as_nasyncremain = 0;
        bms_free(node->as_needrequest);
        node->as_needrequest = NULL;
    }

    /* Let choose_next_subplan_* function handle setting the first subplan */
    node->as_whichplan = INVALID_SUBPLAN_INDEX;
    node->as_syncdone = false;
    node->as_begun = false;
}
/* ----------------------------------------------------------------
 *                      Parallel Append Support
 * ----------------------------------------------------------------
 */
/* ----------------------------------------------------------------
 *      ExecAppendEstimate
 *
 *      Compute the amount of space we'll need in the parallel
 *      query DSM, and inform pcxt->estimator about our needs.
 * ----------------------------------------------------------------
 */
void
ExecAppendEstimate(AppendState *node,
                   ParallelContext *pcxt)
{
    node->pstate_len =
        add_size(offsetof(ParallelAppendState, pa_finished),
                 sizeof(bool) * node->as_nplans);

    shm_toc_estimate_chunk(&pcxt->estimator, node->pstate_len);
    shm_toc_estimate_keys(&pcxt->estimator, 1);
}
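/*
 * As a concrete sizing example (the numbers are illustrative): with
 * node->as_nplans == 4 the chunk requested above is
 * offsetof(ParallelAppendState, pa_finished) + 4 * sizeof(bool), i.e. the
 * struct's fixed header plus one flag per subplan, the usual sizing idiom
 * for a flexible array member.
 */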
/* ----------------------------------------------------------------
 *      ExecAppendInitializeDSM
 *
 *      Set up shared state for Parallel Append.
 * ----------------------------------------------------------------
 */
void
ExecAppendInitializeDSM(AppendState *node,
                        ParallelContext *pcxt)
{
    ParallelAppendState *pstate;

    pstate = shm_toc_allocate(pcxt->toc, node->pstate_len);
    memset(pstate, 0, node->pstate_len);
    LWLockInitialize(&pstate->pa_lock, LWTRANCHE_PARALLEL_APPEND);
    shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id, pstate);

    node->as_pstate = pstate;
    node->choose_next_subplan = choose_next_subplan_for_leader;
}
/* ----------------------------------------------------------------
 *      ExecAppendReInitializeDSM
 *
 *      Reset shared state before beginning a fresh scan.
 * ----------------------------------------------------------------
 */
void
ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt)
{
    ParallelAppendState *pstate = node->as_pstate;

    pstate->pa_next_plan = 0;
    memset(pstate->pa_finished, 0, sizeof(bool) * node->as_nplans);
}
/* ----------------------------------------------------------------
 *      ExecAppendInitializeWorker
 *
 *      Copy relevant information from TOC into planstate, and initialize
 *      whatever is required to choose and execute the optimal subplan.
 * ----------------------------------------------------------------
 */
void
ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt)
{
    node->as_pstate = shm_toc_lookup(pwcxt->toc, node->ps.plan->plan_node_id, false);
    node->choose_next_subplan = choose_next_subplan_for_worker;
}
/* ----------------------------------------------------------------
 *      choose_next_subplan_locally
 *
 *      Choose next sync subplan for a non-parallel-aware Append,
 *      returning false if there are no more.
 * ----------------------------------------------------------------
 */
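/*
 * A sketch of the iteration below (plan numbers are illustrative): with
 * as_valid_subplans = {1, 3, 4}, successive calls visit 1, 3, 4 for a
 * forward scan via bms_next_member(), or 4, 3, 1 for a backward scan via
 * bms_prev_member(); the starting value -1 asks either function for the
 * first member from its end of the set, and a negative result means the
 * set is exhausted.
 */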
static bool
choose_next_subplan_locally(AppendState *node)
{
    int         whichplan = node->as_whichplan;
    int         nextplan;

    /* We should never be called when there are no subplans */
    Assert(node->as_nplans > 0);

    /* Nothing to do if syncdone */
    if (node->as_syncdone)
        return false;

    /*
     * If first call then have the bms member function choose the first valid
     * sync subplan by initializing whichplan to -1.  If there happen to be no
     * valid sync subplans then the bms member function will handle that by
     * returning a negative number which will allow us to exit returning a
     * false value.
     */
    if (whichplan == INVALID_SUBPLAN_INDEX)
    {
        if (node->as_nasyncplans > 0)
        {
            /* We'd have filled as_valid_subplans already */
            Assert(node->as_valid_subplans_identified);
        }
        else if (!node->as_valid_subplans_identified)
        {
            node->as_valid_subplans =
                ExecFindMatchingSubPlans(node->as_prune_state, false);
            node->as_valid_subplans_identified = true;
        }

        whichplan = -1;
    }

    /* Ensure whichplan is within the expected range */
    Assert(whichplan >= -1 && whichplan <= node->as_nplans);

    if (ScanDirectionIsForward(node->ps.state->es_direction))
        nextplan = bms_next_member(node->as_valid_subplans, whichplan);
    else
        nextplan = bms_prev_member(node->as_valid_subplans, whichplan);

    if (nextplan < 0)
    {
        /* Set as_syncdone if in async mode */
        if (node->as_nasyncplans > 0)
            node->as_syncdone = true;
        return false;
    }

    node->as_whichplan = nextplan;

    return true;
}
/* ----------------------------------------------------------------
 *      choose_next_subplan_for_leader
 *
 *      Try to pick a plan which doesn't commit us to doing much
 *      work locally, so that as much work as possible is done in
 *      the workers.  Cheapest subplans are at the end.
 * ----------------------------------------------------------------
 */
static bool
choose_next_subplan_for_leader(AppendState *node)
{
    ParallelAppendState *pstate = node->as_pstate;

    /* Backward scan is not supported by parallel-aware plans */
    Assert(ScanDirectionIsForward(node->ps.state->es_direction));

    /* We should never be called when there are no subplans */
    Assert(node->as_nplans > 0);

    LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);

    if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
    {
        /* Mark just-completed subplan as finished. */
        node->as_pstate->pa_finished[node->as_whichplan] = true;
    }
    else
    {
        /* Start with last subplan. */
        node->as_whichplan = node->as_nplans - 1;

        /*
         * If we've yet to determine the valid subplans then do so now.  If
         * run-time pruning is disabled then the valid subplans will always be
         * set to all subplans.
         */
        if (!node->as_valid_subplans_identified)
        {
            node->as_valid_subplans =
                ExecFindMatchingSubPlans(node->as_prune_state, false);
            node->as_valid_subplans_identified = true;

            /*
             * Mark each invalid plan as finished to allow the loop below to
             * select the first valid subplan.
             */
            mark_invalid_subplans_as_finished(node);
        }
    }

    /* Loop until we find a subplan to execute. */
    while (pstate->pa_finished[node->as_whichplan])
    {
        if (node->as_whichplan == 0)
        {
            pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
            node->as_whichplan = INVALID_SUBPLAN_INDEX;
            LWLockRelease(&pstate->pa_lock);
            return false;
        }

        /*
         * We needn't pay attention to as_valid_subplans here as all invalid
         * plans have been marked as finished.
         */
        node->as_whichplan--;
    }

    /* If non-partial, immediately mark as finished. */
    if (node->as_whichplan < node->as_first_partial_plan)
        node->as_pstate->pa_finished[node->as_whichplan] = true;

    LWLockRelease(&pstate->pa_lock);

    return true;
}
/* ----------------------------------------------------------------
 *      choose_next_subplan_for_worker
 *
 *      Choose next subplan for a parallel-aware Append, returning
 *      false if there are no more.
 *
 *      We start from the first plan and advance through the list;
 *      when we get back to the end, we loop back to the first
 *      partial plan.  This assigns the non-partial plans first in
 *      order of descending cost and then spreads out the workers
 *      as evenly as possible across the remaining partial plans.
 * ----------------------------------------------------------------
 */
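/*
 * For example (plan numbers are illustrative): with non-partial subplans
 * 0-2, sorted by descending cost, and partial subplans 3-4, the first three
 * workers take subplans 0, 1 and 2, each marking its plan finished at once;
 * later workers, and those three once they finish, cycle through 3 and 4,
 * wrapping from the end of the list back to the first partial plan.
 */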
static bool
choose_next_subplan_for_worker(AppendState *node)
{
    ParallelAppendState *pstate = node->as_pstate;

    /* Backward scan is not supported by parallel-aware plans */
    Assert(ScanDirectionIsForward(node->ps.state->es_direction));

    /* We should never be called when there are no subplans */
    Assert(node->as_nplans > 0);

    LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);

    /* Mark just-completed subplan as finished. */
    if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
        node->as_pstate->pa_finished[node->as_whichplan] = true;

    /*
     * If we've yet to determine the valid subplans then do so now.  If
     * run-time pruning is disabled then the valid subplans will always be set
     * to all subplans.
     */
    else if (!node->as_valid_subplans_identified)
    {
        node->as_valid_subplans =
            ExecFindMatchingSubPlans(node->as_prune_state, false);
        node->as_valid_subplans_identified = true;

        mark_invalid_subplans_as_finished(node);
    }

    /* If all the plans are already done, we have nothing to do */
    if (pstate->pa_next_plan == INVALID_SUBPLAN_INDEX)
    {
        LWLockRelease(&pstate->pa_lock);
        return false;
    }

    /* Save the plan from which we are starting the search. */
    node->as_whichplan = pstate->pa_next_plan;

    /* Loop until we find a valid subplan to execute. */
    while (pstate->pa_finished[pstate->pa_next_plan])
    {
        int         nextplan;

        nextplan = bms_next_member(node->as_valid_subplans,
                                   pstate->pa_next_plan);
        if (nextplan >= 0)
        {
            /* Advance to the next valid plan. */
            pstate->pa_next_plan = nextplan;
        }
        else if (node->as_whichplan > node->as_first_partial_plan)
        {
            /*
             * Try looping back to the first valid partial plan, if there is
             * one.  If there isn't, arrange to bail out below.
             */
            nextplan = bms_next_member(node->as_valid_subplans,
                                       node->as_first_partial_plan - 1);
            pstate->pa_next_plan =
                nextplan < 0 ? node->as_whichplan : nextplan;
        }
        else
        {
            /*
             * At last plan, and either there are no partial plans or we've
             * tried them all.  Arrange to bail out.
             */
            pstate->pa_next_plan = node->as_whichplan;
        }

        if (pstate->pa_next_plan == node->as_whichplan)
        {
            /* We've tried everything! */
            pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
            LWLockRelease(&pstate->pa_lock);
            return false;
        }
    }

    /* Pick the plan we found, and advance pa_next_plan one more time. */
    node->as_whichplan = pstate->pa_next_plan;
    pstate->pa_next_plan = bms_next_member(node->as_valid_subplans,
                                           pstate->pa_next_plan);

    /*
     * If there are no more valid plans then try setting the next plan to the
     * first valid partial plan.
     */
    if (pstate->pa_next_plan < 0)
    {
        int         nextplan = bms_next_member(node->as_valid_subplans,
                                               node->as_first_partial_plan - 1);

        if (nextplan >= 0)
            pstate->pa_next_plan = nextplan;
        else
        {
            /*
             * There are no valid partial plans, and we already chose the last
             * non-partial plan; so flag that there's nothing more for our
             * fellow workers to do.
             */
            pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
        }
    }

    /* If non-partial, immediately mark as finished. */
    if (node->as_whichplan < node->as_first_partial_plan)
        node->as_pstate->pa_finished[node->as_whichplan] = true;

    LWLockRelease(&pstate->pa_lock);

    return true;
}
/*
 * mark_invalid_subplans_as_finished
 *      Marks the ParallelAppendState's pa_finished as true for each invalid
 *      subplan.
 *
 * This function should only be called for parallel Append with run-time
 * pruning enabled.
 */
static void
mark_invalid_subplans_as_finished(AppendState *node)
{
    int         i;

    /* Only valid to call this while in parallel Append mode */
    Assert(node->as_pstate);

    /* Shouldn't have been called when run-time pruning is not enabled */
    Assert(node->as_prune_state);

    /* Nothing to do if all plans are valid */
    if (bms_num_members(node->as_valid_subplans) == node->as_nplans)
        return;

    /* Mark all non-valid plans as finished */
    for (i = 0; i < node->as_nplans; i++)
    {
        if (!bms_is_member(i, node->as_valid_subplans))
            node->as_pstate->pa_finished[i] = true;
    }
}
/* ----------------------------------------------------------------
 *                      Asynchronous Append Support
 * ----------------------------------------------------------------
 */
/* ----------------------------------------------------------------
 *      ExecAppendAsyncBegin
 *
 *      Begin executing designated async-capable subplans.
 * ----------------------------------------------------------------
 */
static void
ExecAppendAsyncBegin(AppendState *node)
{
    int         i;

    /* Backward scan is not supported by async-aware Appends. */
    Assert(ScanDirectionIsForward(node->ps.state->es_direction));

    /* We should never be called when there are no subplans */
    Assert(node->as_nplans > 0);

    /* We should never be called when there are no async subplans. */
    Assert(node->as_nasyncplans > 0);

    /* If we've yet to determine the valid subplans then do so now. */
    if (!node->as_valid_subplans_identified)
    {
        node->as_valid_subplans =
            ExecFindMatchingSubPlans(node->as_prune_state, false);
        node->as_valid_subplans_identified = true;

        classify_matching_subplans(node);
    }

    /* Initialize state variables. */
    node->as_syncdone = bms_is_empty(node->as_valid_subplans);
    node->as_nasyncremain = bms_num_members(node->as_valid_asyncplans);

    /* Nothing to do if there are no valid async subplans. */
    if (node->as_nasyncremain == 0)
        return;

    /* Make a request for each of the valid async subplans. */
    i = -1;
    while ((i = bms_next_member(node->as_valid_asyncplans, i)) >= 0)
    {
        AsyncRequest *areq = node->as_asyncrequests[i];

        Assert(areq->request_index == i);
        Assert(!areq->callback_pending);

        /* Do the actual work. */
        ExecAsyncRequest(areq);
    }
}
/* ----------------------------------------------------------------
 *      ExecAppendAsyncGetNext
 *
 *      Get the next tuple from any of the asynchronous subplans.
 * ----------------------------------------------------------------
 */
static bool
ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result)
{
    *result = NULL;

    /* We should never be called when there are no valid async subplans. */
    Assert(node->as_nasyncremain > 0);

    /* Request a tuple asynchronously. */
    if (ExecAppendAsyncRequest(node, result))
        return true;

    while (node->as_nasyncremain > 0)
    {
        CHECK_FOR_INTERRUPTS();

        /* Wait or poll for async events. */
        ExecAppendAsyncEventWait(node);

        /* Request a tuple asynchronously. */
        if (ExecAppendAsyncRequest(node, result))
            return true;

        /* Break from loop if there's any sync subplan that isn't complete. */
        if (!node->as_syncdone)
            break;
    }

    /*
     * If all sync subplans are complete, we're totally done scanning the
     * given node.  Otherwise, we're done with the asynchronous stuff but must
     * continue scanning the sync subplans.
     */
    if (node->as_syncdone)
    {
        Assert(node->as_nasyncremain == 0);
        *result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
        return true;
    }

    return false;
}
/* ----------------------------------------------------------------
 *      ExecAppendAsyncRequest
 *
 *      Request a tuple asynchronously.
 * ----------------------------------------------------------------
 */
static bool
ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result)
{
    Bitmapset  *needrequest;
    int         i;

    /* Nothing to do if there are no async subplans needing a new request. */
    if (bms_is_empty(node->as_needrequest))
    {
        Assert(node->as_nasyncresults == 0);
        return false;
    }

    /*
     * If there are any asynchronously-generated results that have not yet
     * been returned, we have nothing to do; just return one of them.
     */
    if (node->as_nasyncresults > 0)
    {
        --node->as_nasyncresults;
        *result = node->as_asyncresults[node->as_nasyncresults];
        return true;
    }

    /* Make a new request for each of the async subplans that need it. */
    needrequest = node->as_needrequest;
    node->as_needrequest = NULL;
    i = -1;
    while ((i = bms_next_member(needrequest, i)) >= 0)
    {
        AsyncRequest *areq = node->as_asyncrequests[i];

        /* Do the actual work. */
        ExecAsyncRequest(areq);
    }
    bms_free(needrequest);

    /* Return one of the asynchronously-generated results if any. */
    if (node->as_nasyncresults > 0)
    {
        --node->as_nasyncresults;
        *result = node->as_asyncresults[node->as_nasyncresults];
        return true;
    }

    return false;
}
/* ----------------------------------------------------------------
 *      ExecAppendAsyncEventWait
 *
 *      Wait or poll for file descriptor events and fire callbacks.
 * ----------------------------------------------------------------
 */
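/*
 * Illustration of the timeout chosen below: once as_syncdone is set, the
 * call blocks (timeout = -1) until some async subplan's descriptor becomes
 * ready; while sync subplans still remain, it merely polls (timeout = 0)
 * and returns at once if nothing is ready, so sync work can continue.
 */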
static void
ExecAppendAsyncEventWait(AppendState *node)
{
    int         nevents = node->as_nasyncplans + 1;
    long        timeout = node->as_syncdone ? -1 : 0;
    WaitEvent   occurred_event[EVENT_BUFFER_SIZE];
    int         noccurred;
    int         i;

    /* We should never be called when there are no valid async subplans. */
    Assert(node->as_nasyncremain > 0);

    Assert(node->as_eventset == NULL);
    node->as_eventset = CreateWaitEventSet(CurrentResourceOwner, nevents);
    AddWaitEventToSet(node->as_eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
                      NULL, NULL);

    /* Give each waiting subplan a chance to add an event. */
    i = -1;
    while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
    {
        AsyncRequest *areq = node->as_asyncrequests[i];

        if (areq->callback_pending)
            ExecAsyncConfigureWait(areq);
    }

    /*
     * No need for further processing if there are no configured events other
     * than the postmaster death event.
     */
    if (GetNumRegisteredWaitEvents(node->as_eventset) == 1)
    {
        FreeWaitEventSet(node->as_eventset);
        node->as_eventset = NULL;
        return;
    }

    /* Return at most EVENT_BUFFER_SIZE events in one call. */
    if (nevents > EVENT_BUFFER_SIZE)
        nevents = EVENT_BUFFER_SIZE;

    /*
     * If the timeout is -1, wait until at least one event occurs.  If the
     * timeout is 0, poll for events, but do not wait at all.
     */
    noccurred = WaitEventSetWait(node->as_eventset, timeout, occurred_event,
                                 nevents, WAIT_EVENT_APPEND_READY);
    FreeWaitEventSet(node->as_eventset);
    node->as_eventset = NULL;
    if (noccurred == 0)
        return;

    /* Deliver notifications. */
    for (i = 0; i < noccurred; i++)
    {
        WaitEvent  *w = &occurred_event[i];

        /*
         * Each waiting subplan should have registered its wait event with
         * user_data pointing back to its AsyncRequest.
         */
        if ((w->events & WL_SOCKET_READABLE) != 0)
        {
            AsyncRequest *areq = (AsyncRequest *) w->user_data;

            if (areq->callback_pending)
            {
                /*
                 * Mark it as no longer needing a callback.  We must do this
                 * before dispatching the callback in case the callback resets
                 * the flag.
                 */
                areq->callback_pending = false;

                /* Do the actual work. */
                ExecAsyncNotify(areq);
            }
        }
    }
}
/* ----------------------------------------------------------------
 *      ExecAsyncAppendResponse
 *
 *      Receive a response from an asynchronous request we made.
 * ----------------------------------------------------------------
 */
void
ExecAsyncAppendResponse(AsyncRequest *areq)
{
    AppendState *node = (AppendState *) areq->requestor;
    TupleTableSlot *slot = areq->result;

    /* The result should be a TupleTableSlot or NULL. */
    Assert(slot == NULL || IsA(slot, TupleTableSlot));

    /* Nothing to do if the request is pending. */
    if (!areq->request_complete)
    {
        /* The request would have been pending for a callback. */
        Assert(areq->callback_pending);
        return;
    }

    /* If the result is NULL or an empty slot, there's nothing more to do. */
    if (TupIsNull(slot))
    {
        /* The ending subplan wouldn't have been pending for a callback. */
        Assert(!areq->callback_pending);
        --node->as_nasyncremain;
        return;
    }

    /* Save result so we can return it. */
    Assert(node->as_nasyncresults < node->as_nasyncplans);
    node->as_asyncresults[node->as_nasyncresults++] = slot;

    /*
     * Mark the subplan that returned a result as ready for a new request.  We
     * don't launch another one here immediately because it might complete.
     */
    node->as_needrequest = bms_add_member(node->as_needrequest,
                                          areq->request_index);
}
/* ----------------------------------------------------------------
 *      classify_matching_subplans
 *
 *      Classify the node's as_valid_subplans into sync ones and
 *      async ones, adjust it to contain sync ones only, and save
 *      async ones in the node's as_valid_asyncplans.
 * ----------------------------------------------------------------
 */
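/*
 * A worked example (plan numbers are illustrative): with as_asyncplans =
 * {0, 2} and as_valid_subplans = {1, 2, 3}, the function below computes
 * valid_asyncplans = {2}, shrinks as_valid_subplans to the sync-only set
 * {1, 3}, and saves {2} in as_valid_asyncplans.
 */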
static void
classify_matching_subplans(AppendState *node)
{
    Bitmapset  *valid_asyncplans;

    Assert(node->as_valid_subplans_identified);
    Assert(node->as_valid_asyncplans == NULL);

    /* Nothing to do if there are no valid subplans. */
    if (bms_is_empty(node->as_valid_subplans))
    {
        node->as_syncdone = true;
        node->as_nasyncremain = 0;
        return;
    }

    /* Nothing to do if there are no valid async subplans. */
    if (!bms_overlap(node->as_valid_subplans, node->as_asyncplans))
    {
        node->as_nasyncremain = 0;
        return;
    }

    /* Get valid async subplans. */
    valid_asyncplans = bms_intersect(node->as_asyncplans,
                                     node->as_valid_subplans);

    /* Adjust the valid subplans to contain sync subplans only. */
    node->as_valid_subplans = bms_del_members(node->as_valid_subplans,
                                              valid_asyncplans);

    /* Save valid async subplans. */
    node->as_valid_asyncplans = valid_asyncplans;
}