/*-------------------------------------------------------------------------
 *
 *	  routines to handle MergeAppend nodes.
 *
 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *	  src/backend/executor/nodeMergeAppend.c
 *
 *-------------------------------------------------------------------------
 */
/*
 *		ExecInitMergeAppend		- initialize the MergeAppend node
 *		ExecMergeAppend			- retrieve the next tuple from the node
 *		ExecEndMergeAppend		- shut down the MergeAppend node
 *		ExecReScanMergeAppend	- rescan the MergeAppend node
 *
 *		A MergeAppend node contains a list of one or more subplans.
 *		These are each expected to deliver tuples that are sorted according
 *		to a common sort key.  The MergeAppend node merges these streams
 *		to produce output sorted the same way.
 *
 *		MergeAppend nodes don't make use of their left and right
 *		subtrees, rather they maintain a list of subplans so
 *		a typical MergeAppend node looks like this in the plan tree:
 *
 *				MergeAppend---+------+------+--- nil
 */
41 #include "executor/executor.h"
42 #include "executor/execPartition.h"
43 #include "executor/nodeMergeAppend.h"
44 #include "lib/binaryheap.h"
45 #include "miscadmin.h"
48 * We have one slot for each item in the heap array. We use SlotNumber
49 * to store slot indexes. This doesn't actually provide any formal
50 * type-safety, but it makes the code more self-documenting.
52 typedef int32 SlotNumber
;
54 static TupleTableSlot
*ExecMergeAppend(PlanState
*pstate
);
55 static int heap_compare_slots(Datum a
, Datum b
, void *arg
);
58 /* ----------------------------------------------------------------
61 * Begin all of the subscans of the MergeAppend node.
62 * ----------------------------------------------------------------
65 ExecInitMergeAppend(MergeAppend
*node
, EState
*estate
, int eflags
)
67 MergeAppendState
*mergestate
= makeNode(MergeAppendState
);
68 PlanState
**mergeplanstates
;
69 const TupleTableSlotOps
*mergeops
;
70 Bitmapset
*validsubplans
;
75 /* check for unsupported flags */
76 Assert(!(eflags
& (EXEC_FLAG_BACKWARD
| EXEC_FLAG_MARK
)));
79 * create new MergeAppendState for our node
81 mergestate
->ps
.plan
= (Plan
*) node
;
82 mergestate
->ps
.state
= estate
;
83 mergestate
->ps
.ExecProcNode
= ExecMergeAppend
;
85 /* If run-time partition pruning is enabled, then set that up now */
86 if (node
->part_prune_info
!= NULL
)
88 PartitionPruneState
*prunestate
;
91 * Set up pruning data structure. This also initializes the set of
92 * subplans to initialize (validsubplans) by taking into account the
93 * result of performing initial pruning if any.
95 prunestate
= ExecInitPartitionPruning(&mergestate
->ps
,
96 list_length(node
->mergeplans
),
97 node
->part_prune_info
,
99 mergestate
->ms_prune_state
= prunestate
;
100 nplans
= bms_num_members(validsubplans
);
103 * When no run-time pruning is required and there's at least one
104 * subplan, we can fill ms_valid_subplans immediately, preventing
105 * later calls to ExecFindMatchingSubPlans.
107 if (!prunestate
->do_exec_prune
&& nplans
> 0)
108 mergestate
->ms_valid_subplans
= bms_add_range(NULL
, 0, nplans
- 1);
112 nplans
= list_length(node
->mergeplans
);
115 * When run-time partition pruning is not enabled we can just mark all
116 * subplans as valid; they must also all be initialized.
119 mergestate
->ms_valid_subplans
= validsubplans
=
120 bms_add_range(NULL
, 0, nplans
- 1);
121 mergestate
->ms_prune_state
= NULL
;
124 mergeplanstates
= (PlanState
**) palloc(nplans
* sizeof(PlanState
*));
125 mergestate
->mergeplans
= mergeplanstates
;
126 mergestate
->ms_nplans
= nplans
;
128 mergestate
->ms_slots
= (TupleTableSlot
**) palloc0(sizeof(TupleTableSlot
*) * nplans
);
129 mergestate
->ms_heap
= binaryheap_allocate(nplans
, heap_compare_slots
,
133 * call ExecInitNode on each of the valid plans to be executed and save
134 * the results into the mergeplanstates array.
138 while ((i
= bms_next_member(validsubplans
, i
)) >= 0)
140 Plan
*initNode
= (Plan
*) list_nth(node
->mergeplans
, i
);
142 mergeplanstates
[j
++] = ExecInitNode(initNode
, estate
, eflags
);
146 * Initialize MergeAppend's result tuple type and slot. If the child
147 * plans all produce the same fixed slot type, we can use that slot type;
148 * otherwise make a virtual slot. (Note that the result slot itself is
149 * used only to return a null tuple at end of execution; real tuples are
150 * returned to the caller in the children's own result slots. What we are
151 * doing here is allowing the parent plan node to optimize if the
152 * MergeAppend will return only one kind of slot.)
154 mergeops
= ExecGetCommonSlotOps(mergeplanstates
, j
);
155 if (mergeops
!= NULL
)
157 ExecInitResultTupleSlotTL(&mergestate
->ps
, mergeops
);
161 ExecInitResultTupleSlotTL(&mergestate
->ps
, &TTSOpsVirtual
);
162 /* show that the output slot type is not fixed */
163 mergestate
->ps
.resultopsset
= true;
164 mergestate
->ps
.resultopsfixed
= false;
168 * Miscellaneous initialization
170 mergestate
->ps
.ps_ProjInfo
= NULL
;
173 * initialize sort-key information
175 mergestate
->ms_nkeys
= node
->numCols
;
176 mergestate
->ms_sortkeys
= palloc0(sizeof(SortSupportData
) * node
->numCols
);
178 for (i
= 0; i
< node
->numCols
; i
++)
180 SortSupport sortKey
= mergestate
->ms_sortkeys
+ i
;
182 sortKey
->ssup_cxt
= CurrentMemoryContext
;
183 sortKey
->ssup_collation
= node
->collations
[i
];
184 sortKey
->ssup_nulls_first
= node
->nullsFirst
[i
];
185 sortKey
->ssup_attno
= node
->sortColIdx
[i
];
188 * It isn't feasible to perform abbreviated key conversion, since
189 * tuples are pulled into mergestate's binary heap as needed. It
190 * would likely be counter-productive to convert tuples into an
191 * abbreviated representation as they're pulled up, so opt out of that
192 * additional optimization entirely.
194 sortKey
->abbreviate
= false;
196 PrepareSortSupportFromOrderingOp(node
->sortOperators
[i
], sortKey
);
200 * initialize to show we have not run the subplans yet
202 mergestate
->ms_initialized
= false;
207 /* ----------------------------------------------------------------
210 * Handles iteration over multiple subplans.
211 * ----------------------------------------------------------------
213 static TupleTableSlot
*
214 ExecMergeAppend(PlanState
*pstate
)
216 MergeAppendState
*node
= castNode(MergeAppendState
, pstate
);
217 TupleTableSlot
*result
;
220 CHECK_FOR_INTERRUPTS();
222 if (!node
->ms_initialized
)
224 /* Nothing to do if all subplans were pruned */
225 if (node
->ms_nplans
== 0)
226 return ExecClearTuple(node
->ps
.ps_ResultTupleSlot
);
229 * If we've yet to determine the valid subplans then do so now. If
230 * run-time pruning is disabled then the valid subplans will always be
231 * set to all subplans.
233 if (node
->ms_valid_subplans
== NULL
)
234 node
->ms_valid_subplans
=
235 ExecFindMatchingSubPlans(node
->ms_prune_state
, false);
238 * First time through: pull the first tuple from each valid subplan,
239 * and set up the heap.
242 while ((i
= bms_next_member(node
->ms_valid_subplans
, i
)) >= 0)
244 node
->ms_slots
[i
] = ExecProcNode(node
->mergeplans
[i
]);
245 if (!TupIsNull(node
->ms_slots
[i
]))
246 binaryheap_add_unordered(node
->ms_heap
, Int32GetDatum(i
));
248 binaryheap_build(node
->ms_heap
);
249 node
->ms_initialized
= true;
254 * Otherwise, pull the next tuple from whichever subplan we returned
255 * from last time, and reinsert the subplan index into the heap,
256 * because it might now compare differently against the existing
257 * elements of the heap. (We could perhaps simplify the logic a bit
258 * by doing this before returning from the prior call, but it's better
259 * to not pull tuples until necessary.)
261 i
= DatumGetInt32(binaryheap_first(node
->ms_heap
));
262 node
->ms_slots
[i
] = ExecProcNode(node
->mergeplans
[i
]);
263 if (!TupIsNull(node
->ms_slots
[i
]))
264 binaryheap_replace_first(node
->ms_heap
, Int32GetDatum(i
));
266 (void) binaryheap_remove_first(node
->ms_heap
);
269 if (binaryheap_empty(node
->ms_heap
))
271 /* All the subplans are exhausted, and so is the heap */
272 result
= ExecClearTuple(node
->ps
.ps_ResultTupleSlot
);
276 i
= DatumGetInt32(binaryheap_first(node
->ms_heap
));
277 result
= node
->ms_slots
[i
];
284 * Compare the tuples in the two given slots.
287 heap_compare_slots(Datum a
, Datum b
, void *arg
)
289 MergeAppendState
*node
= (MergeAppendState
*) arg
;
290 SlotNumber slot1
= DatumGetInt32(a
);
291 SlotNumber slot2
= DatumGetInt32(b
);
293 TupleTableSlot
*s1
= node
->ms_slots
[slot1
];
294 TupleTableSlot
*s2
= node
->ms_slots
[slot2
];
297 Assert(!TupIsNull(s1
));
298 Assert(!TupIsNull(s2
));
300 for (nkey
= 0; nkey
< node
->ms_nkeys
; nkey
++)
302 SortSupport sortKey
= node
->ms_sortkeys
+ nkey
;
303 AttrNumber attno
= sortKey
->ssup_attno
;
310 datum1
= slot_getattr(s1
, attno
, &isNull1
);
311 datum2
= slot_getattr(s2
, attno
, &isNull2
);
313 compare
= ApplySortComparator(datum1
, isNull1
,
318 INVERT_COMPARE_RESULT(compare
);
325 /* ----------------------------------------------------------------
328 * Shuts down the subscans of the MergeAppend node.
330 * Returns nothing of interest.
331 * ----------------------------------------------------------------
334 ExecEndMergeAppend(MergeAppendState
*node
)
336 PlanState
**mergeplans
;
341 * get information from the node
343 mergeplans
= node
->mergeplans
;
344 nplans
= node
->ms_nplans
;
347 * shut down each of the subscans
349 for (i
= 0; i
< nplans
; i
++)
350 ExecEndNode(mergeplans
[i
]);
354 ExecReScanMergeAppend(MergeAppendState
*node
)
359 * If any PARAM_EXEC Params used in pruning expressions have changed, then
360 * we'd better unset the valid subplans so that they are reselected for
361 * the new parameter values.
363 if (node
->ms_prune_state
&&
364 bms_overlap(node
->ps
.chgParam
,
365 node
->ms_prune_state
->execparamids
))
367 bms_free(node
->ms_valid_subplans
);
368 node
->ms_valid_subplans
= NULL
;
371 for (i
= 0; i
< node
->ms_nplans
; i
++)
373 PlanState
*subnode
= node
->mergeplans
[i
];
376 * ExecReScan doesn't know about my subplans, so I have to do
377 * changed-parameter signaling myself.
379 if (node
->ps
.chgParam
!= NULL
)
380 UpdateChangedParamSet(subnode
, node
->ps
.chgParam
);
383 * If chgParam of subnode is not null then plan will be re-scanned by
384 * first ExecProcNode.
386 if (subnode
->chgParam
== NULL
)
389 binaryheap_reset(node
->ms_heap
);
390 node
->ms_initialized
= false;