/*-------------------------------------------------------------------------
 *
 * nodeHashjoin.c
 *	  Routines to handle hash join nodes
 *
 * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/backend/executor/nodeHashjoin.c
 *
 * HASH JOIN
 *
 * This is based on the "hybrid hash join" algorithm described briefly on the
 * following page
 *
 *   https://en.wikipedia.org/wiki/Hash_join#Hybrid_hash_join
 *
 * and in detail in the referenced paper:
 *
 *   "An Adaptive Hash Join Algorithm for Multiuser Environments"
 *   Hansjörg Zeller; Jim Gray (1990). Proceedings of the 16th VLDB conference.
 *   Brisbane: 186–197.
 *
 * If the inner side tuples of a hash join do not fit in memory, the hash join
 * can be executed in multiple batches.
 *
 * If the statistics on the inner side relation are accurate, the planner
 * chooses a multi-batch strategy and estimates the number of batches.
 *
 * The query executor measures the real size of the hashtable and increases the
 * number of batches if the hashtable grows too large.
 *
 * The number of batches is always a power of two, so an increase in the number
 * of batches doubles it.
 *
 * Serial hash join measures batch size lazily -- waiting until it is loading a
 * batch to determine if it will fit in memory. While inserting tuples into the
 * hashtable, serial hash join will, if adding a tuple would exceed work_mem,
 * dump out the hashtable and reassign its tuples either to other batch files
 * or to the current batch resident in the hashtable.
 *
 * Parallel hash join, on the other hand, completes all changes to the number
 * of batches during the build phase. If it increases the number of batches, it
 * dumps out all the tuples from all batches and reassigns them to entirely new
 * batch files. Then it checks every batch to ensure it will fit in the space
 * budget for the query.
 *
 * In both parallel and serial hash join, the executor currently makes a best
 * effort. If a particular batch will not fit in memory, it tries doubling the
 * number of batches. If after a batch increase, there is a batch which
 * retained all or none of its tuples, the executor disables growth in the
 * number of batches globally. After growth is disabled, all batches that would
 * have previously triggered an increase in the number of batches instead
 * exceed the space allowed.
 *
 * PARALLELISM
 *
 * Hash joins can participate in parallel query execution in several ways. A
 * parallel-oblivious hash join is one where the node is unaware that it is
 * part of a parallel plan. In this case, a copy of the inner plan is used to
 * build a copy of the hash table in every backend, and the outer plan could
 * either be built from a partial or complete path, so that the results of the
 * hash join are correspondingly either partial or complete. A parallel-aware
 * hash join is one that behaves differently, coordinating work between
 * backends, and appears as Parallel Hash Join in EXPLAIN output. A Parallel
 * Hash Join always appears with a Parallel Hash node.
 *
 * Parallel-aware hash joins use the same per-backend state machine to track
 * progress through the hash join algorithm as parallel-oblivious hash joins.
 * In a parallel-aware hash join, there is also a shared state machine that
 * co-operating backends use to synchronize their local state machines and
 * program counters. The shared state machine is managed with a Barrier IPC
 * primitive. When all attached participants arrive at a barrier, the phase
 * advances and all waiting participants are released.
 *
 * When a participant begins working on a parallel hash join, it must first
 * figure out how much progress has already been made, because participants
 * don't wait for each other to begin. For this reason there are switch
 * statements at key points in the code where we have to synchronize our local
 * state machine with the phase, and then jump to the correct part of the
 * algorithm so that we can get started.
 *
 * One barrier called build_barrier is used to coordinate the hashing phases.
 * The phase is represented by an integer which begins at zero and increments
 * one by one, but in the code it is referred to by symbolic names as follows.
 * An asterisk indicates a phase that is performed by a single arbitrarily
 * chosen process.
 *
 *   PHJ_BUILD_ELECT                 -- initial state
 *   PHJ_BUILD_ALLOCATE*             -- one sets up the batches and table 0
 *   PHJ_BUILD_HASH_INNER            -- all hash the inner rel
 *   PHJ_BUILD_HASH_OUTER            -- (multi-batch only) all hash the outer
 *   PHJ_BUILD_RUN                   -- building done, probing can begin
 *   PHJ_BUILD_FREE*                 -- all work complete, one frees batches
 *
 * While in the phase PHJ_BUILD_HASH_INNER a separate pair of barriers may
 * be used repeatedly as required to coordinate expansions in the number of
 * batches or buckets. Their phases are as follows:
 *
 *   PHJ_GROW_BATCHES_ELECT          -- initial state
 *   PHJ_GROW_BATCHES_REALLOCATE*    -- one allocates new batches
 *   PHJ_GROW_BATCHES_REPARTITION    -- all repartition
 *   PHJ_GROW_BATCHES_DECIDE*        -- one detects skew and cleans up
 *   PHJ_GROW_BATCHES_FINISH         -- finished one growth cycle
 *
 *   PHJ_GROW_BUCKETS_ELECT          -- initial state
 *   PHJ_GROW_BUCKETS_REALLOCATE*    -- one allocates new buckets
 *   PHJ_GROW_BUCKETS_REINSERT       -- all insert tuples
 *
 * If the planner got the number of batches and buckets right, those won't be
 * necessary, but on the other hand we might finish up needing to expand the
 * buckets or batches multiple times while hashing the inner relation to stay
 * within our memory budget and load factor target. For that reason it's a
 * separate pair of barriers using circular phases.
 *
 * The PHJ_BUILD_HASH_OUTER phase is required only for multi-batch joins,
 * because we need to divide the outer relation into batches up front in order
 * to be able to process batches entirely independently. In contrast, the
 * parallel-oblivious algorithm simply throws tuples 'forward' to 'later'
 * batches whenever it encounters them while scanning and probing, which it
 * can do because it processes batches in serial order.
 *
 * Once PHJ_BUILD_RUN is reached, backends then split up and process
 * different batches, or gang up and work together on probing batches if there
 * aren't enough to go around. For each batch there is a separate barrier
 * with the following phases:
 *
 *   PHJ_BATCH_ELECT                 -- initial state
 *   PHJ_BATCH_ALLOCATE*             -- one allocates buckets
 *   PHJ_BATCH_LOAD                  -- all load the hash table from disk
 *   PHJ_BATCH_PROBE                 -- all probe
 *   PHJ_BATCH_SCAN*                 -- one does right/right-anti/full unmatched scan
 *   PHJ_BATCH_FREE*                 -- one frees memory
 *
 * Batch 0 is a special case, because it starts out in phase
 * PHJ_BATCH_PROBE; populating batch 0's hash table is done during
 * PHJ_BUILD_HASH_INNER so we can skip loading.
 *
 * Initially we try to plan for a single-batch hash join using the combined
 * hash_mem of all participants to create a large shared hash table. If that
 * turns out either at planning or execution time to be impossible then we
 * fall back to regular hash_mem sized hash tables.
 *
 * To avoid deadlocks, we never wait for any barrier unless it is known that
 * all other backends attached to it are actively executing the node or have
 * finished. Practically, that means that we never emit a tuple while attached
 * to a barrier, unless the barrier has reached a phase that means that no
 * process will wait on it again. We emit tuples while attached to the build
 * barrier in phase PHJ_BUILD_RUN, and to a per-batch barrier in phase
 * PHJ_BATCH_PROBE. These are advanced to PHJ_BUILD_FREE and PHJ_BATCH_SCAN
 * respectively without waiting, using BarrierArriveAndDetach() and
 * BarrierArriveAndDetachExceptLast() respectively. The last to detach
 * receives a different return value so that it knows that it's safe to
 * clean up. Any straggler process that attaches after that phase is reached
 * will see that it's too late to participate or access the relevant shared
 * memory objects.
 *
 *-------------------------------------------------------------------------
 */

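The barrier behaviour described in the header comment can be modelled in a few lines. This is a single-threaded sketch under stated assumptions, not the real Barrier primitive: `SketchBarrier` and the `sketch_*` functions are hypothetical names, and where the real primitive would block waiting participants, the model only reports which arrival completes the phase.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical single-threaded model of a phase barrier (illustration only). */
typedef struct SketchBarrier
{
	int			phase;			/* current phase, starts at 0 */
	int			participants;	/* number of attached participants */
	int			arrived;		/* arrivals counted in the current phase */
} SketchBarrier;

/* Attach a participant; it learns which phase it has to catch up with. */
static int
sketch_attach(SketchBarrier *b)
{
	b->participants++;
	return b->phase;
}

/*
 * Record one arrival.  Returns true for the arrival that completes the
 * phase; a real barrier would keep the earlier arrivals waiting until then.
 */
static bool
sketch_arrive(SketchBarrier *b)
{
	assert(b->participants > 0);
	if (++b->arrived == b->participants)
	{
		b->arrived = 0;
		b->phase++;
		return true;
	}
	return false;
}

/*
 * Detach without waiting.  The phase advances once everyone who is still
 * attached has arrived.  Returns true only for the last participant to
 * detach, which is the one that may safely clean up.
 */
static bool
sketch_arrive_and_detach(SketchBarrier *b)
{
	assert(b->participants > 0);
	b->participants--;
	if (b->arrived == b->participants)
	{
		b->arrived = 0;
		b->phase++;
	}
	return b->participants == 0;
}
```

A participant that attaches late gets the current phase back from `sketch_attach()` and would switch on it to find the right place in the algorithm, which is the pattern the header comment describes.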
#include "postgres.h"

#include "access/htup_details.h"
#include "access/parallel.h"
#include "executor/executor.h"
#include "executor/hashjoin.h"
#include "executor/nodeHash.h"
#include "executor/nodeHashjoin.h"
#include "miscadmin.h"
#include "utils/sharedtuplestore.h"
#include "utils/wait_event.h"


/*
 * States of the ExecHashJoin state machine
 */
#define HJ_BUILD_HASHTABLE		1
#define HJ_NEED_NEW_OUTER		2
#define HJ_SCAN_BUCKET			3
#define HJ_FILL_OUTER_TUPLE		4
#define HJ_FILL_INNER_TUPLES	5
#define HJ_NEED_NEW_BATCH		6

/* Returns true if doing null-fill on outer relation */
#define HJ_FILL_OUTER(hjstate)	((hjstate)->hj_NullInnerTupleSlot != NULL)
/* Returns true if doing null-fill on inner relation */
#define HJ_FILL_INNER(hjstate)	((hjstate)->hj_NullOuterTupleSlot != NULL)

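The two macros above only test whether a null-fill slot exists; which slots exist depends on the join type, as set up in ExecInitHashJoin(). The following is a minimal sketch of that mapping, assuming the seven join types handled there. The `SketchJoinType` enum and both functions are hypothetical stand-ins, not part of the executor.

```c
#include <stdbool.h>

/* Hypothetical stand-ins for the join types handled by the hash join node. */
typedef enum SketchJoinType
{
	SK_JOIN_INNER,
	SK_JOIN_SEMI,
	SK_JOIN_LEFT,
	SK_JOIN_ANTI,
	SK_JOIN_RIGHT,
	SK_JOIN_RIGHT_ANTI,
	SK_JOIN_FULL
} SketchJoinType;

/*
 * True if unmatched outer tuples are emitted with a null inner side,
 * i.e. the cases in which a null inner slot would be created.
 */
static bool
sketch_fill_outer(SketchJoinType t)
{
	return t == SK_JOIN_LEFT || t == SK_JOIN_ANTI || t == SK_JOIN_FULL;
}

/*
 * True if unmatched inner tuples are emitted with a null outer side,
 * i.e. the cases in which a null outer slot would be created.
 */
static bool
sketch_fill_inner(SketchJoinType t)
{
	return t == SK_JOIN_RIGHT || t == SK_JOIN_RIGHT_ANTI || t == SK_JOIN_FULL;
}
```

Only a full join fills both sides; inner and semi joins fill neither.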
static TupleTableSlot *ExecHashJoinOuterGetTuple(PlanState *outerNode,
												 HashJoinState *hjstate,
												 uint32 *hashvalue);
static TupleTableSlot *ExecParallelHashJoinOuterGetTuple(PlanState *outerNode,
														 HashJoinState *hjstate,
														 uint32 *hashvalue);
static TupleTableSlot *ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
												 BufFile *file,
												 uint32 *hashvalue,
												 TupleTableSlot *tupleSlot);
static bool ExecHashJoinNewBatch(HashJoinState *hjstate);
static bool ExecParallelHashJoinNewBatch(HashJoinState *hjstate);
static void ExecParallelHashJoinPartitionOuter(HashJoinState *hjstate);


/* ----------------------------------------------------------------
 *		ExecHashJoinImpl
 *
 *		This function implements the Hybrid Hashjoin algorithm.  It is marked
 *		with an always-inline attribute so that ExecHashJoin() and
 *		ExecParallelHashJoin() can inline it.  Compilers that respect the
 *		attribute should create versions specialized for parallel == true and
 *		parallel == false with unnecessary branches removed.
 *
 *		Note: the relation we build hash table on is the "inner";
 *			  the other one is "outer".
 * ----------------------------------------------------------------
 */
static pg_attribute_always_inline TupleTableSlot *
ExecHashJoinImpl(PlanState *pstate, bool parallel)
{
	HashJoinState *node = castNode(HashJoinState, pstate);
	PlanState  *outerNode;
	HashState  *hashNode;
	ExprState  *joinqual;
	ExprState  *otherqual;
	ExprContext *econtext;
	HashJoinTable hashtable;
	TupleTableSlot *outerTupleSlot;
	uint32		hashvalue;
	int			batchno;
	ParallelHashJoinState *parallel_state;

	/*
	 * get information from HashJoin node
	 */
	joinqual = node->js.joinqual;
	otherqual = node->js.ps.qual;
	hashNode = (HashState *) innerPlanState(node);
	outerNode = outerPlanState(node);
	hashtable = node->hj_HashTable;
	econtext = node->js.ps.ps_ExprContext;
	parallel_state = hashNode->parallel_state;

	/*
	 * Reset per-tuple memory context to free any expression evaluation
	 * storage allocated in the previous tuple cycle.
	 */
	ResetExprContext(econtext);

	/*
	 * run the hash join state machine
	 */
	for (;;)
	{
		/*
		 * It's possible to iterate this loop many times before returning a
		 * tuple, in some pathological cases such as needing to move much of
		 * the current batch to a later batch.  So let's check for interrupts
		 * each time through.
		 */
		CHECK_FOR_INTERRUPTS();

		switch (node->hj_JoinState)
		{
			case HJ_BUILD_HASHTABLE:

				/*
				 * First time through: build hash table for inner relation.
				 */
				Assert(hashtable == NULL);

				/*
				 * If the outer relation is completely empty, and it's not
				 * right/right-anti/full join, we can quit without building
				 * the hash table.  However, for an inner join it is only a
				 * win to check this when the outer relation's startup cost is
				 * less than the projected cost of building the hash table.
				 * Otherwise it's best to build the hash table first and see
				 * if the inner relation is empty.  (When it's a left join, we
				 * should always make this check, since we aren't going to be
				 * able to skip the join on the strength of an empty inner
				 * relation anyway.)
				 *
				 * If we are rescanning the join, we make use of information
				 * gained on the previous scan: don't bother to try the
				 * prefetch if the previous scan found the outer relation
				 * nonempty. This is not 100% reliable since with new
				 * parameters the outer relation might yield different
				 * results, but it's a good heuristic.
				 *
				 * The only way to make the check is to try to fetch a tuple
				 * from the outer plan node.  If we succeed, we have to stash
				 * it away for later consumption by ExecHashJoinOuterGetTuple.
				 */
				if (HJ_FILL_INNER(node))
				{
					/* no chance to not build the hash table */
					node->hj_FirstOuterTupleSlot = NULL;
				}
				else if (parallel)
				{
					/*
					 * The empty-outer optimization is not implemented for
					 * shared hash tables, because no one participant can
					 * determine that there are no outer tuples, and it's not
					 * yet clear that it's worth the synchronization overhead
					 * of reaching consensus to figure that out.  So we have
					 * to build the hash table.
					 */
					node->hj_FirstOuterTupleSlot = NULL;
				}
				else if (HJ_FILL_OUTER(node) ||
						 (outerNode->plan->startup_cost < hashNode->ps.plan->total_cost &&
						  !node->hj_OuterNotEmpty))
				{
					node->hj_FirstOuterTupleSlot = ExecProcNode(outerNode);
					if (TupIsNull(node->hj_FirstOuterTupleSlot))
					{
						node->hj_OuterNotEmpty = false;
						return NULL;
					}
					else
						node->hj_OuterNotEmpty = true;
				}
				else
					node->hj_FirstOuterTupleSlot = NULL;

				/*
				 * Create the hash table.  If using Parallel Hash, then
				 * whoever gets here first will create the hash table and any
				 * later arrivals will merely attach to it.
				 */
				hashtable = ExecHashTableCreate(hashNode,
												node->hj_HashOperators,
												node->hj_Collations,
												HJ_FILL_INNER(node));
				node->hj_HashTable = hashtable;

				/*
				 * Execute the Hash node, to build the hash table.  If using
				 * Parallel Hash, then we'll try to help hashing unless we
				 * arrived too late.
				 */
				hashNode->hashtable = hashtable;
				(void) MultiExecProcNode((PlanState *) hashNode);

				/*
				 * If the inner relation is completely empty, and we're not
				 * doing a left outer join, we can quit without scanning the
				 * outer relation.
				 */
				if (hashtable->totalTuples == 0 && !HJ_FILL_OUTER(node))
				{
					if (parallel)
					{
						/*
						 * Advance the build barrier to PHJ_BUILD_RUN before
						 * proceeding so we can negotiate resource cleanup.
						 */
						Barrier    *build_barrier = &parallel_state->build_barrier;

						while (BarrierPhase(build_barrier) < PHJ_BUILD_RUN)
							BarrierArriveAndWait(build_barrier, 0);
					}
					return NULL;
				}

				/*
				 * need to remember whether nbatch has increased since we
				 * began scanning the outer relation
				 */
				hashtable->nbatch_outstart = hashtable->nbatch;

				/*
				 * Reset OuterNotEmpty for scan.  (It's OK if we fetched a
				 * tuple above, because ExecHashJoinOuterGetTuple will
				 * immediately set it again.)
				 */
				node->hj_OuterNotEmpty = false;

				if (parallel)
				{
					Barrier    *build_barrier;

					build_barrier = &parallel_state->build_barrier;
					Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASH_OUTER ||
						   BarrierPhase(build_barrier) == PHJ_BUILD_RUN ||
						   BarrierPhase(build_barrier) == PHJ_BUILD_FREE);
					if (BarrierPhase(build_barrier) == PHJ_BUILD_HASH_OUTER)
					{
						/*
						 * If multi-batch, we need to hash the outer relation
						 * up front.
						 */
						if (hashtable->nbatch > 1)
							ExecParallelHashJoinPartitionOuter(node);
						BarrierArriveAndWait(build_barrier,
											 WAIT_EVENT_HASH_BUILD_HASH_OUTER);
					}
					else if (BarrierPhase(build_barrier) == PHJ_BUILD_FREE)
					{
						/*
						 * If we attached so late that the job is finished and
						 * the batch state has been freed, we can return
						 * immediately.
						 */
						return NULL;
					}

					/* Each backend should now select a batch to work on. */
					Assert(BarrierPhase(build_barrier) == PHJ_BUILD_RUN);
					hashtable->curbatch = -1;
					node->hj_JoinState = HJ_NEED_NEW_BATCH;

					continue;
				}
				else
					node->hj_JoinState = HJ_NEED_NEW_OUTER;

				/* FALL THRU */

			case HJ_NEED_NEW_OUTER:

				/*
				 * We don't have an outer tuple, try to get the next one
				 */
				if (parallel)
					outerTupleSlot =
						ExecParallelHashJoinOuterGetTuple(outerNode, node,
														  &hashvalue);
				else
					outerTupleSlot =
						ExecHashJoinOuterGetTuple(outerNode, node, &hashvalue);

				if (TupIsNull(outerTupleSlot))
				{
					/* end of batch, or maybe whole join */
					if (HJ_FILL_INNER(node))
					{
						/* set up to scan for unmatched inner tuples */
						if (parallel)
						{
							/*
							 * Only one process is currently allowed to handle
							 * each batch's unmatched tuples, in a parallel
							 * join.
							 */
							if (ExecParallelPrepHashTableForUnmatched(node))
								node->hj_JoinState = HJ_FILL_INNER_TUPLES;
							else
								node->hj_JoinState = HJ_NEED_NEW_BATCH;
						}
						else
						{
							ExecPrepHashTableForUnmatched(node);
							node->hj_JoinState = HJ_FILL_INNER_TUPLES;
						}
					}
					else
						node->hj_JoinState = HJ_NEED_NEW_BATCH;
					continue;
				}

				econtext->ecxt_outertuple = outerTupleSlot;
				node->hj_MatchedOuter = false;

				/*
				 * Find the corresponding bucket for this tuple in the main
				 * hash table or skew hash table.
				 */
				node->hj_CurHashValue = hashvalue;
				ExecHashGetBucketAndBatch(hashtable, hashvalue,
										  &node->hj_CurBucketNo, &batchno);
				node->hj_CurSkewBucketNo = ExecHashGetSkewBucket(hashtable,
																 hashvalue);
				node->hj_CurTuple = NULL;

				/*
				 * The tuple might not belong to the current batch (where
				 * "current batch" includes the skew buckets if any).
				 */
				if (batchno != hashtable->curbatch &&
					node->hj_CurSkewBucketNo == INVALID_SKEW_BUCKET_NO)
				{
					bool		shouldFree;
					MinimalTuple mintuple = ExecFetchSlotMinimalTuple(outerTupleSlot,
																	  &shouldFree);

					/*
					 * Need to postpone this outer tuple to a later batch.
					 * Save it in the corresponding outer-batch file.
					 */
					Assert(parallel_state == NULL);
					Assert(batchno > hashtable->curbatch);
					ExecHashJoinSaveTuple(mintuple, hashvalue,
										  &hashtable->outerBatchFile[batchno],
										  hashtable);

					if (shouldFree)
						heap_free_minimal_tuple(mintuple);

					/* Loop around, staying in HJ_NEED_NEW_OUTER state */
					continue;
				}

				/* OK, let's scan the bucket for matches */
				node->hj_JoinState = HJ_SCAN_BUCKET;

				/* FALL THRU */

			case HJ_SCAN_BUCKET:

				/*
				 * Scan the selected hash bucket for matches to current outer
				 */
				if (parallel)
				{
					if (!ExecParallelScanHashBucket(node, econtext))
					{
						/* out of matches; check for possible outer-join fill */
						node->hj_JoinState = HJ_FILL_OUTER_TUPLE;
						continue;
					}
				}
				else
				{
					if (!ExecScanHashBucket(node, econtext))
					{
						/* out of matches; check for possible outer-join fill */
						node->hj_JoinState = HJ_FILL_OUTER_TUPLE;
						continue;
					}
				}

				/*
				 * We've got a match, but still need to test non-hashed quals.
				 * ExecScanHashBucket already set up all the state needed to
				 * call ExecQual.
				 *
				 * If we pass the qual, then save state for next call and have
				 * ExecProject form the projection, store it in the tuple
				 * table, and return the slot.
				 *
				 * Only the joinquals determine tuple match status, but all
				 * quals must pass to actually return the tuple.
				 */
				if (joinqual == NULL || ExecQual(joinqual, econtext))
				{
					node->hj_MatchedOuter = true;

					/*
					 * This is really only needed if HJ_FILL_INNER(node), but
					 * we'll avoid the branch and just set it always.
					 */
					if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple)))
						HeapTupleHeaderSetMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple));

					/* In an antijoin, we never return a matched tuple */
					if (node->js.jointype == JOIN_ANTI)
					{
						node->hj_JoinState = HJ_NEED_NEW_OUTER;
						continue;
					}

					/*
					 * If we only need to consider the first matching inner
					 * tuple, then advance to next outer tuple after we've
					 * processed this one.
					 */
					if (node->js.single_match)
						node->hj_JoinState = HJ_NEED_NEW_OUTER;

					/*
					 * In a right-antijoin, we never return a matched tuple.
					 * If it's not an inner_unique join, we need to stay on
					 * the current outer tuple to continue scanning the inner
					 * side for matches.
					 */
					if (node->js.jointype == JOIN_RIGHT_ANTI)
						continue;

					if (otherqual == NULL || ExecQual(otherqual, econtext))
						return ExecProject(node->js.ps.ps_ProjInfo);
					else
						InstrCountFiltered2(node, 1);
				}
				else
					InstrCountFiltered1(node, 1);
				break;

			case HJ_FILL_OUTER_TUPLE:

				/*
				 * The current outer tuple has run out of matches, so check
				 * whether to emit a dummy outer-join tuple.  Whether we emit
				 * one or not, the next state is NEED_NEW_OUTER.
				 */
				node->hj_JoinState = HJ_NEED_NEW_OUTER;

				if (!node->hj_MatchedOuter &&
					HJ_FILL_OUTER(node))
				{
					/*
					 * Generate a fake join tuple with nulls for the inner
					 * tuple, and return it if it passes the non-join quals.
					 */
					econtext->ecxt_innertuple = node->hj_NullInnerTupleSlot;

					if (otherqual == NULL || ExecQual(otherqual, econtext))
						return ExecProject(node->js.ps.ps_ProjInfo);
					else
						InstrCountFiltered2(node, 1);
				}
				break;

			case HJ_FILL_INNER_TUPLES:

				/*
				 * We have finished a batch, but we are doing
				 * right/right-anti/full join, so any unmatched inner tuples
				 * in the hashtable have to be emitted before we continue to
				 * the next batch.
				 */
				if (!(parallel ? ExecParallelScanHashTableForUnmatched(node, econtext)
					  : ExecScanHashTableForUnmatched(node, econtext)))
				{
					/* no more unmatched tuples */
					node->hj_JoinState = HJ_NEED_NEW_BATCH;
					continue;
				}

				/*
				 * Generate a fake join tuple with nulls for the outer tuple,
				 * and return it if it passes the non-join quals.
				 */
				econtext->ecxt_outertuple = node->hj_NullOuterTupleSlot;

				if (otherqual == NULL || ExecQual(otherqual, econtext))
					return ExecProject(node->js.ps.ps_ProjInfo);
				else
					InstrCountFiltered2(node, 1);
				break;

			case HJ_NEED_NEW_BATCH:

				/*
				 * Try to advance to next batch.  Done if there are no more.
				 */
				if (parallel)
				{
					if (!ExecParallelHashJoinNewBatch(node))
						return NULL;	/* end of parallel-aware join */
				}
				else
				{
					if (!ExecHashJoinNewBatch(node))
						return NULL;	/* end of parallel-oblivious join */
				}
				node->hj_JoinState = HJ_NEED_NEW_OUTER;
				break;

			default:
				elog(ERROR, "unrecognized hashjoin state: %d",
					 (int) node->hj_JoinState);
		}
	}
}

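The serial path in HJ_NEED_NEW_OUTER asserts that a postponed tuple always belongs to a later batch. A sketch of why that can hold is given here, assuming the bucket number comes from the low hash bits and the batch number from the next bits under a power-of-two mask. The real ExecHashGetBucketAndBatch() is defined in nodeHash.c and may differ in detail; the `sketch_*` names and the choice of bits are assumptions made for illustration.

```c
#include <assert.h>
#include <stdint.h>

/* Rotate a 32-bit value right by n bits, for 0 <= n < 32. */
static uint32_t
sketch_rotate_right32(uint32_t v, int n)
{
	return n == 0 ? v : (v >> n) | (v << (32 - n));
}

/*
 * Hypothetical bucket/batch split: the low log2_nbuckets bits select the
 * bucket and the bits above them select the batch.  nbatch must be a
 * power of two, matching the rule stated in the file header.
 */
static void
sketch_bucket_and_batch(uint32_t hash, int log2_nbuckets, uint32_t nbatch,
						uint32_t *bucketno, uint32_t *batchno)
{
	uint32_t	nbuckets = (uint32_t) 1 << log2_nbuckets;

	assert(log2_nbuckets >= 0 && log2_nbuckets < 32);
	assert(nbatch > 0 && (nbatch & (nbatch - 1)) == 0);

	*bucketno = hash & (nbuckets - 1);
	if (nbatch > 1)
		*batchno = sketch_rotate_right32(hash, log2_nbuckets) & (nbatch - 1);
	else
		*batchno = 0;
}
```

With a mask like this, doubling nbatch exposes one more hash bit, so a tuple in batch b either stays in b or moves to b plus the old nbatch. It never moves to an earlier batch, which is what lets the serial algorithm throw tuples forward while processing batches in order.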
/* ----------------------------------------------------------------
 *		ExecHashJoin
 *
 *		Parallel-oblivious version.
 * ----------------------------------------------------------------
 */
static TupleTableSlot *			/* return: a tuple or NULL */
ExecHashJoin(PlanState *pstate)
{
	/*
	 * On sufficiently smart compilers this should be inlined with the
	 * parallel-aware branches removed.
	 */
	return ExecHashJoinImpl(pstate, false);
}

/* ----------------------------------------------------------------
 *		ExecParallelHashJoin
 *
 *		Parallel-aware version.
 * ----------------------------------------------------------------
 */
static TupleTableSlot *			/* return: a tuple or NULL */
ExecParallelHashJoin(PlanState *pstate)
{
	/*
	 * On sufficiently smart compilers this should be inlined with the
	 * parallel-oblivious branches removed.
	 */
	return ExecHashJoinImpl(pstate, true);
}

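The two thin wrappers above rely on a general specialization pattern: an always-inline implementation takes a flag, and each wrapper passes a compile-time constant so the compiler can drop the unused branch. The sketch below shows the pattern in isolation. `SKETCH_ALWAYS_INLINE` and the `sketch_*` functions are hypothetical, and the attribute spelling assumes GCC or Clang, with a plain `inline` fallback elsewhere.

```c
#include <stdbool.h>

/* Assumed spelling of the attribute; falls back to plain inline. */
#if defined(__GNUC__) || defined(__clang__)
#define SKETCH_ALWAYS_INLINE inline __attribute__((always_inline))
#else
#define SKETCH_ALWAYS_INLINE inline
#endif

/*
 * Shared implementation.  When inlined into a caller that passes a
 * constant for "parallel", the branch not taken can be removed.
 */
static SKETCH_ALWAYS_INLINE int
sketch_step_impl(int x, bool parallel)
{
	if (parallel)
		return x * 2;			/* stands in for the parallel-aware path */
	return x + 1;				/* stands in for the parallel-oblivious path */
}

/* Specialization with parallel == false. */
static int
sketch_step(int x)
{
	return sketch_step_impl(x, false);
}

/* Specialization with parallel == true. */
static int
sketch_parallel_step(int x)
{
	return sketch_step_impl(x, true);
}
```

Either wrapper can then be installed as a function pointer, in the same way the node's ExecProcNode callback is set to one of the two entry points.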
/* ----------------------------------------------------------------
 *		ExecInitHashJoin
 *
 *		Init routine for HashJoin node.
 * ----------------------------------------------------------------
 */
HashJoinState *
ExecInitHashJoin(HashJoin *node, EState *estate, int eflags)
{
	HashJoinState *hjstate;
	Plan	   *outerNode;
	Hash	   *hashNode;
	TupleDesc	outerDesc,
				innerDesc;
	const TupleTableSlotOps *ops;

	/* check for unsupported flags */
	Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));

	/*
	 * create state structure
	 */
	hjstate = makeNode(HashJoinState);
	hjstate->js.ps.plan = (Plan *) node;
	hjstate->js.ps.state = estate;

	/*
	 * See ExecHashJoinInitializeDSM() and ExecHashJoinInitializeWorker()
	 * where this function may be replaced with a parallel version, if we
	 * managed to launch a parallel query.
	 */
	hjstate->js.ps.ExecProcNode = ExecHashJoin;
	hjstate->js.jointype = node->join.jointype;

	/*
	 * Miscellaneous initialization
	 *
	 * create expression context for node
	 */
	ExecAssignExprContext(estate, &hjstate->js.ps);

	/*
	 * initialize child nodes
	 *
	 * Note: we could suppress the REWIND flag for the inner input, which
	 * would amount to betting that the hash will be a single batch.  Not
	 * clear if this would be a win or not.
	 */
	outerNode = outerPlan(node);
	hashNode = (Hash *) innerPlan(node);

	outerPlanState(hjstate) = ExecInitNode(outerNode, estate, eflags);
	outerDesc = ExecGetResultType(outerPlanState(hjstate));
	innerPlanState(hjstate) = ExecInitNode((Plan *) hashNode, estate, eflags);
	innerDesc = ExecGetResultType(innerPlanState(hjstate));

	/*
	 * Initialize result slot, type and projection.
	 */
	ExecInitResultTupleSlotTL(&hjstate->js.ps, &TTSOpsVirtual);
	ExecAssignProjectionInfo(&hjstate->js.ps, NULL);

	/*
	 * tuple table initialization
	 */
	ops = ExecGetResultSlotOps(outerPlanState(hjstate), NULL);
	hjstate->hj_OuterTupleSlot = ExecInitExtraTupleSlot(estate, outerDesc,
														ops);

	/*
	 * detect whether we need only consider the first matching inner tuple
	 */
	hjstate->js.single_match = (node->join.inner_unique ||
								node->join.jointype == JOIN_SEMI);

	/* set up null tuples for outer joins, if needed */
	switch (node->join.jointype)
	{
		case JOIN_INNER:
		case JOIN_SEMI:
			break;
		case JOIN_LEFT:
		case JOIN_ANTI:
			hjstate->hj_NullInnerTupleSlot =
				ExecInitNullTupleSlot(estate, innerDesc, &TTSOpsVirtual);
			break;
		case JOIN_RIGHT:
		case JOIN_RIGHT_ANTI:
			hjstate->hj_NullOuterTupleSlot =
				ExecInitNullTupleSlot(estate, outerDesc, &TTSOpsVirtual);
			break;
		case JOIN_FULL:
			hjstate->hj_NullOuterTupleSlot =
				ExecInitNullTupleSlot(estate, outerDesc, &TTSOpsVirtual);
			hjstate->hj_NullInnerTupleSlot =
				ExecInitNullTupleSlot(estate, innerDesc, &TTSOpsVirtual);
			break;
		default:
			elog(ERROR, "unrecognized join type: %d",
				 (int) node->join.jointype);
	}

	/*
	 * now for some voodoo.  our temporary tuple slot is actually the result
	 * tuple slot of the Hash node (which is our inner plan).  we can do this
	 * because Hash nodes don't return tuples via ExecProcNode() -- instead
	 * the hash join node uses ExecScanHashBucket() to get at the contents of
	 * the hash table.  -cim 6/9/91
	 */
	{
		HashState  *hashstate = (HashState *) innerPlanState(hjstate);
		TupleTableSlot *slot = hashstate->ps.ps_ResultTupleSlot;

		hjstate->hj_HashTupleSlot = slot;
	}

	/*
	 * initialize child expressions
	 */
	hjstate->js.ps.qual =
		ExecInitQual(node->join.plan.qual, (PlanState *) hjstate);
	hjstate->js.joinqual =
		ExecInitQual(node->join.joinqual, (PlanState *) hjstate);
	hjstate->hashclauses =
		ExecInitQual(node->hashclauses, (PlanState *) hjstate);

	/*
	 * initialize hash-specific info
	 */
	hjstate->hj_HashTable = NULL;
	hjstate->hj_FirstOuterTupleSlot = NULL;

	hjstate->hj_CurHashValue = 0;
	hjstate->hj_CurBucketNo = 0;
	hjstate->hj_CurSkewBucketNo = INVALID_SKEW_BUCKET_NO;
	hjstate->hj_CurTuple = NULL;

	hjstate->hj_OuterHashKeys = ExecInitExprList(node->hashkeys,
												 (PlanState *) hjstate);
	hjstate->hj_HashOperators = node->hashoperators;
	hjstate->hj_Collations = node->hashcollations;

	hjstate->hj_JoinState = HJ_BUILD_HASHTABLE;
	hjstate->hj_MatchedOuter = false;
	hjstate->hj_OuterNotEmpty = false;

	return hjstate;
}

/* ----------------------------------------------------------------
 *		ExecEndHashJoin
 *
 *		clean up routine for HashJoin node
 * ----------------------------------------------------------------
 */
void
ExecEndHashJoin(HashJoinState *node)
{
	/*
	 * Free hash table
	 */
	if (node->hj_HashTable)
	{
		ExecHashTableDestroy(node->hj_HashTable);
		node->hj_HashTable = NULL;
	}

	/*
	 * clean up subtrees
	 */
	ExecEndNode(outerPlanState(node));
	ExecEndNode(innerPlanState(node));
}

/*
 * ExecHashJoinOuterGetTuple
 *
 *		get the next outer tuple for a parallel oblivious hashjoin: either by
 *		executing the outer plan node in the first pass, or from the temp
 *		files for the hashjoin batches.
 *
 * Returns a null slot if no more outer tuples (within the current batch).
 *
 * On success, the tuple's hash value is stored at *hashvalue --- this is
 * either originally computed, or re-read from the temp file.
 */
static TupleTableSlot *
ExecHashJoinOuterGetTuple(PlanState *outerNode,
						  HashJoinState *hjstate,
						  uint32 *hashvalue)
{
	HashJoinTable hashtable = hjstate->hj_HashTable;
	int			curbatch = hashtable->curbatch;
	TupleTableSlot *slot;

	if (curbatch == 0)			/* if it is the first pass */
	{
		/*
		 * Check to see if first outer tuple was already fetched by
		 * ExecHashJoin() and not used yet.
		 */
		slot = hjstate->hj_FirstOuterTupleSlot;
		if (!TupIsNull(slot))
			hjstate->hj_FirstOuterTupleSlot = NULL;
		else
			slot = ExecProcNode(outerNode);

		while (!TupIsNull(slot))
		{
			/*
			 * We have to compute the tuple's hash value.
			 */
			ExprContext *econtext = hjstate->js.ps.ps_ExprContext;

			econtext->ecxt_outertuple = slot;
			if (ExecHashGetHashValue(hashtable, econtext,
									 hjstate->hj_OuterHashKeys,
									 true,	/* outer tuple */
									 HJ_FILL_OUTER(hjstate),
									 hashvalue))
			{
				/* remember outer relation is not empty for possible rescan */
				hjstate->hj_OuterNotEmpty = true;

				return slot;
			}

			/*
			 * That tuple couldn't match because of a NULL, so discard it and
			 * continue with the next one.
			 */
			slot = ExecProcNode(outerNode);
		}
	}
	else if (curbatch < hashtable->nbatch)
	{
		BufFile    *file = hashtable->outerBatchFile[curbatch];

		/*
		 * In outer-join cases, we could get here even though the batch file
		 * is empty.
		 */
		if (file == NULL)
			return NULL;

		slot = ExecHashJoinGetSavedTuple(hjstate,
										 file,
										 hashvalue,
										 hjstate->hj_OuterTupleSlot);
		if (!TupIsNull(slot))
			return slot;
	}

	/* End of this batch */
	return NULL;
}

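ExecHashJoinOuterGetTuple() starts by consuming a tuple that may have been stashed in hj_FirstOuterTupleSlot. Whether one is stashed follows the rule in the HJ_BUILD_HASHTABLE case, which can be restated as a pure function. This is a sketch only: `sketch_should_prefetch_outer` and its parameter names are hypothetical, and the cost arguments stand in for the planner's startup and total cost estimates.

```c
#include <stdbool.h>

/*
 * Decide whether to fetch one outer tuple before building the hash table,
 * so that an empty outer relation can end the join early.
 */
static bool
sketch_should_prefetch_outer(bool fill_inner, bool parallel, bool fill_outer,
							 double outer_startup_cost,
							 double hash_total_cost,
							 bool outer_known_nonempty)
{
	/* Unmatched inner tuples must be emitted, so the table is always built. */
	if (fill_inner)
		return false;

	/* No single participant can tell that a shared outer side is empty. */
	if (parallel)
		return false;

	/*
	 * When filling the outer side an empty inner relation cannot end the
	 * join early, so always check.  Otherwise check only if fetching an
	 * outer tuple is cheaper than building the table and a previous scan
	 * did not already find the outer side nonempty.
	 */
	return fill_outer ||
		(outer_startup_cost < hash_total_cost && !outer_known_nonempty);
}
```

For example, a serial inner join with a cheap outer startup cost prefetches on the first scan but skips the prefetch on a rescan once the outer side is known to be nonempty.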
/*
 * ExecHashJoinOuterGetTuple variant for the parallel case.
 */
static TupleTableSlot *
ExecParallelHashJoinOuterGetTuple(PlanState *outerNode,
								  HashJoinState *hjstate,
								  uint32 *hashvalue)
{
	HashJoinTable hashtable = hjstate->hj_HashTable;
	int			curbatch = hashtable->curbatch;
	TupleTableSlot *slot;

	/*
	 * In the Parallel Hash case we only run the outer plan directly for
	 * single-batch hash joins.  Otherwise we have to go to batch files, even
	 * for batch 0.
	 */
	if (curbatch == 0 && hashtable->nbatch == 1)
	{
		slot = ExecProcNode(outerNode);

		while (!TupIsNull(slot))
		{
			ExprContext *econtext = hjstate->js.ps.ps_ExprContext;

			econtext->ecxt_outertuple = slot;
			if (ExecHashGetHashValue(hashtable, econtext,
									 hjstate->hj_OuterHashKeys,
									 true,	/* outer tuple */
									 HJ_FILL_OUTER(hjstate),
									 hashvalue))
				return slot;

			/*
			 * That tuple couldn't match because of a NULL, so discard it and
			 * continue with the next one.
			 */
			slot = ExecProcNode(outerNode);
		}
	}
	else if (curbatch < hashtable->nbatch)
	{
		MinimalTuple tuple;

		tuple = sts_parallel_scan_next(hashtable->batches[curbatch].outer_tuples,
									   hashvalue);
		if (tuple != NULL)
		{
			ExecForceStoreMinimalTuple(tuple,
									   hjstate->hj_OuterTupleSlot,
									   false);
			slot = hjstate->hj_OuterTupleSlot;
			return slot;
		}
		else
			ExecClearTuple(hjstate->hj_OuterTupleSlot);
	}

	/* End of this batch */
	hashtable->batches[curbatch].outer_eof = true;

	return NULL;
}

/*
 * ExecHashJoinNewBatch
 *		switch to a new hashjoin batch
 *
 * Returns true if successful, false if there are no more batches.
 */
static bool
ExecHashJoinNewBatch(HashJoinState *hjstate)
{
	HashJoinTable hashtable = hjstate->hj_HashTable;
	int			nbatch;
	int			curbatch;
	BufFile    *innerFile;
	TupleTableSlot *slot;
	uint32		hashvalue;

	nbatch = hashtable->nbatch;
	curbatch = hashtable->curbatch;

	if (curbatch > 0)
	{
		/*
		 * We no longer need the previous outer batch file; close it right
		 * away to free disk space.
		 */
		if (hashtable->outerBatchFile[curbatch])
			BufFileClose(hashtable->outerBatchFile[curbatch]);
		hashtable->outerBatchFile[curbatch] = NULL;
	}
	else						/* we just finished the first batch */
	{
		/*
		 * Reset some of the skew optimization state variables, since we no
		 * longer need to consider skew tuples after the first batch. The
		 * memory context reset we are about to do will release the skew
		 * hashtable itself.
		 */
		hashtable->skewEnabled = false;
		hashtable->skewBucket = NULL;
		hashtable->skewBucketNums = NULL;
		hashtable->nSkewBuckets = 0;
		hashtable->spaceUsedSkew = 0;
	}

	/*
	 * We can always skip over any batches that are completely empty on both
	 * sides. We can sometimes skip over batches that are empty on only one
	 * side, but there are exceptions:
	 *
	 * 1. In a left/full outer join, we have to process outer batches even if
	 * the inner batch is empty. Similarly, in a right/right-anti/full outer
	 * join, we have to process inner batches even if the outer batch is
	 * empty.
	 *
	 * 2. If we have increased nbatch since the initial estimate, we have to
	 * scan inner batches since they might contain tuples that need to be
	 * reassigned to later inner batches.
	 *
	 * 3. Similarly, if we have increased nbatch since starting the outer
	 * scan, we have to rescan outer batches in case they contain tuples that
	 * need to be reassigned.
	 */
	curbatch++;
	while (curbatch < nbatch &&
		   (hashtable->outerBatchFile[curbatch] == NULL ||
			hashtable->innerBatchFile[curbatch] == NULL))
	{
		if (hashtable->outerBatchFile[curbatch] &&
			HJ_FILL_OUTER(hjstate))
			break;				/* must process due to rule 1 */
		if (hashtable->innerBatchFile[curbatch] &&
			HJ_FILL_INNER(hjstate))
			break;				/* must process due to rule 1 */
		if (hashtable->innerBatchFile[curbatch] &&
			nbatch != hashtable->nbatch_original)
			break;				/* must process due to rule 2 */
		if (hashtable->outerBatchFile[curbatch] &&
			nbatch != hashtable->nbatch_outstart)
			break;				/* must process due to rule 3 */
		/* We can ignore this batch. */
		/* Release associated temp files right away. */
		if (hashtable->innerBatchFile[curbatch])
			BufFileClose(hashtable->innerBatchFile[curbatch]);
		hashtable->innerBatchFile[curbatch] = NULL;
		if (hashtable->outerBatchFile[curbatch])
			BufFileClose(hashtable->outerBatchFile[curbatch]);
		hashtable->outerBatchFile[curbatch] = NULL;
		curbatch++;
	}

	if (curbatch >= nbatch)
		return false;			/* no more batches */

	hashtable->curbatch = curbatch;

	/*
	 * Reload the hash table with the new inner batch (which could be empty)
	 */
	ExecHashTableReset(hashtable);

	innerFile = hashtable->innerBatchFile[curbatch];

	if (innerFile != NULL)
	{
		if (BufFileSeek(innerFile, 0, 0, SEEK_SET))
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not rewind hash-join temporary file")));

		while ((slot = ExecHashJoinGetSavedTuple(hjstate,
												 innerFile,
												 &hashvalue,
												 hjstate->hj_HashTupleSlot)))
		{
			/*
			 * NOTE: some tuples may be sent to future batches. Also, it is
			 * possible for hashtable->nbatch to be increased here!
			 */
			ExecHashTableInsert(hashtable, slot, hashvalue);
		}

		/*
		 * after we build the hash table, the inner batch file is no longer
		 * needed
		 */
		BufFileClose(innerFile);
		hashtable->innerBatchFile[curbatch] = NULL;
	}

	/*
	 * Rewind outer batch file (if present), so that we can start reading it.
	 */
	if (hashtable->outerBatchFile[curbatch] != NULL)
	{
		if (BufFileSeek(hashtable->outerBatchFile[curbatch], 0, 0, SEEK_SET))
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not rewind hash-join temporary file")));
	}

	return true;
}

/*
 * Choose a batch to work on, and attach to it. Returns true if successful,
 * false if there are no more batches.
 */
static bool
ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
{
	HashJoinTable hashtable = hjstate->hj_HashTable;
	int			start_batchno;
	int			batchno;

	/*
	 * If we were already attached to a batch, remember not to bother checking
	 * it again, and detach from it (possibly freeing the hash table if we are
	 * last to detach).
	 */
	if (hashtable->curbatch >= 0)
	{
		hashtable->batches[hashtable->curbatch].done = true;
		ExecHashTableDetachBatch(hashtable);
	}

	/*
	 * Search for a batch that isn't done. We use an atomic counter to start
	 * our search at a different batch in every participant when there are
	 * more batches than participants.
	 */
	batchno = start_batchno =
		pg_atomic_fetch_add_u32(&hashtable->parallel_state->distributor, 1) %
		hashtable->nbatch;
	do
	{
		uint32		hashvalue;
		MinimalTuple tuple;
		TupleTableSlot *slot;

		if (!hashtable->batches[batchno].done)
		{
			SharedTuplestoreAccessor *inner_tuples;
			Barrier    *batch_barrier =
				&hashtable->batches[batchno].shared->batch_barrier;

			switch (BarrierAttach(batch_barrier))
			{
				case PHJ_BATCH_ELECT:

					/* One backend allocates the hash table. */
					if (BarrierArriveAndWait(batch_barrier,
											 WAIT_EVENT_HASH_BATCH_ELECT))
						ExecParallelHashTableAlloc(hashtable, batchno);
					/* Fall through. */

				case PHJ_BATCH_ALLOCATE:
					/* Wait for allocation to complete. */
					BarrierArriveAndWait(batch_barrier,
										 WAIT_EVENT_HASH_BATCH_ALLOCATE);
					/* Fall through. */

				case PHJ_BATCH_LOAD:
					/* Start (or join in) loading tuples. */
					ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
					inner_tuples = hashtable->batches[batchno].inner_tuples;
					sts_begin_parallel_scan(inner_tuples);
					while ((tuple = sts_parallel_scan_next(inner_tuples,
														   &hashvalue)))
					{
						ExecForceStoreMinimalTuple(tuple,
												   hjstate->hj_HashTupleSlot,
												   false);
						slot = hjstate->hj_HashTupleSlot;
						ExecParallelHashTableInsertCurrentBatch(hashtable, slot,
																hashvalue);
					}
					sts_end_parallel_scan(inner_tuples);
					BarrierArriveAndWait(batch_barrier,
										 WAIT_EVENT_HASH_BATCH_LOAD);
					/* Fall through. */

				case PHJ_BATCH_PROBE:

					/*
					 * This batch is ready to probe. Return control to
					 * caller. We stay attached to batch_barrier so that the
					 * hash table stays alive until everyone's finished
					 * probing it, but no participant is allowed to wait at
					 * this barrier again (or else a deadlock could occur).
					 * All attached participants must eventually detach from
					 * the barrier and one worker must advance the phase so
					 * that the final phase is reached.
					 */
					ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
					sts_begin_parallel_scan(hashtable->batches[batchno].outer_tuples);

					return true;
				case PHJ_BATCH_SCAN:

					/*
					 * In principle, we could help scan for unmatched tuples,
					 * since that phase is already underway (the thing we
					 * can't do under current deadlock-avoidance rules is wait
					 * for others to arrive at PHJ_BATCH_SCAN, because
					 * PHJ_BATCH_PROBE emits tuples, but in this case we just
					 * got here without waiting). That is not yet done. For
					 * now, we just detach and go around again. We have to
					 * use ExecHashTableDetachBatch() because there's a small
					 * chance we'll be the last to detach, and then we're
					 * responsible for freeing memory.
					 */
					ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
					hashtable->batches[batchno].done = true;
					ExecHashTableDetachBatch(hashtable);
					break;

				case PHJ_BATCH_FREE:

					/*
					 * Already done. Detach and go around again (if any
					 * remain).
					 */
					BarrierDetach(batch_barrier);
					hashtable->batches[batchno].done = true;
					hashtable->curbatch = -1;
					break;

				default:
					elog(ERROR, "unexpected batch phase %d",
						 BarrierPhase(batch_barrier));
			}
		}
		batchno = (batchno + 1) % hashtable->nbatch;
	} while (batchno != start_batchno);

	return false;
}

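The search order used by ExecParallelHashJoinNewBatch (start at a batch chosen by a shared atomic counter, then wrap around until back at the start) can be tried in isolation. The sketch below uses C11 `<stdatomic.h>` in place of `pg_atomic_fetch_add_u32`, and a plain `done[]` array in place of the per-batch state; `choose_start_batch` and `find_unfinished_batch` are hypothetical names, and the barrier handling is deliberately left out.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

/*
 * Pick the batch at which this participant starts searching. Each call
 * bumps the shared counter, so concurrent callers tend to start at
 * different batches.
 */
static int
choose_start_batch(_Atomic uint32_t *distributor, int nbatch)
{
	assert(nbatch > 0);
	return (int) (atomic_fetch_add(distributor, 1) % (uint32_t) nbatch);
}

/*
 * Visit every batch exactly once, starting at 'start' and wrapping around.
 * Return the first batch not marked done, or -1 if all are done.
 */
static int
find_unfinished_batch(const bool *done, int nbatch, int start)
{
	int			batchno = start;

	assert(nbatch > 0 && start >= 0 && start < nbatch);
	do
	{
		if (!done[batchno])
			return batchno;
		batchno = (batchno + 1) % nbatch;
	} while (batchno != start);

	return -1;
}
```

With four batches, successive callers start at 0, 1, 2, 3, 0, and so on; the modulo keeps the start index in range however far the counter has advanced.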
/*
 * ExecHashJoinSaveTuple
 *		save a tuple to a batch file.
 *
 * The data recorded in the file for each tuple is its hash value,
 * then the tuple in MinimalTuple format.
 *
 * fileptr points to a batch file in one of the hashtable arrays.
 *
 * The batch files (and their buffers) are allocated in the spill context
 * created for the hashtable.
 */
void
ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue,
					  BufFile **fileptr, HashJoinTable hashtable)
{
	BufFile    *file = *fileptr;

	/*
	 * The batch file is lazily created. If this is the first tuple written to
	 * this batch, the batch file is created and its buffer is allocated in
	 * the spillCxt context, NOT in the batchCxt.
	 *
	 * During the build phase, buffered files are created for inner batches.
	 * Each batch's buffered file is closed (and its buffer freed) after the
	 * batch is loaded into memory during the outer side scan. Therefore, it
	 * is necessary to allocate the batch file buffer in a memory context
	 * which outlives the batch itself.
	 *
	 * Also, we use spillCxt instead of hashCxt for a better accounting of the
	 * spilling memory consumption.
	 */
	if (file == NULL)
	{
		MemoryContext oldctx = MemoryContextSwitchTo(hashtable->spillCxt);

		file = BufFileCreateTemp(false);
		*fileptr = file;

		MemoryContextSwitchTo(oldctx);
	}

	BufFileWrite(file, &hashvalue, sizeof(uint32));
	BufFileWrite(file, tuple, tuple->t_len);
}

/*
 * ExecHashJoinGetSavedTuple
 *		read the next tuple from a batch file. Return NULL if no more.
 *
 * On success, *hashvalue is set to the tuple's hash value, and the tuple
 * itself is stored in the given slot.
 */
static TupleTableSlot *
ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
						  BufFile *file,
						  uint32 *hashvalue,
						  TupleTableSlot *tupleSlot)
{
	uint32		header[2];
	size_t		nread;
	MinimalTuple tuple;

	/*
	 * We check for interrupts here because this is typically taken as an
	 * alternative code path to an ExecProcNode() call, which would include
	 * such a check.
	 */
	CHECK_FOR_INTERRUPTS();

	/*
	 * Since both the hash value and the MinimalTuple length word are uint32,
	 * we can read them both in one BufFileRead() call without any type
	 * cheating.
	 */
	nread = BufFileReadMaybeEOF(file, header, sizeof(header), true);
	if (nread == 0)				/* end of file */
	{
		ExecClearTuple(tupleSlot);
		return NULL;
	}
	*hashvalue = header[0];
	tuple = (MinimalTuple) palloc(header[1]);
	tuple->t_len = header[1];
	BufFileReadExact(file,
					 (char *) tuple + sizeof(uint32),
					 header[1] - sizeof(uint32));
	ExecForceStoreMinimalTuple(tuple, tupleSlot, true);
	return tupleSlot;
}

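The record layout shared by ExecHashJoinSaveTuple and ExecHashJoinGetSavedTuple (a uint32 hash value, then a tuple whose first word is its own uint32 length) is what lets the reader fetch both words in a single read. The following is a minimal sketch of that layout using stdio instead of BufFile; `DemoTuple` and the `demo_*` functions are hypothetical stand-ins for MinimalTuple and the executor routines, and unlike the real code a short read is simply reported as "no tuple".

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for MinimalTuple: length word first, then payload. */
typedef struct DemoTuple
{
	uint32_t	t_len;			/* total size in bytes, including this word */
	char		data[];			/* payload (here: a NUL-terminated string) */
} DemoTuple;

/* Build a tuple holding a copy of 'payload'; caller frees the result. */
static DemoTuple *
demo_make_tuple(const char *payload)
{
	size_t		plen = strlen(payload) + 1;
	DemoTuple  *t = malloc(sizeof(uint32_t) + plen);

	if (t == NULL)
		return NULL;
	t->t_len = (uint32_t) (sizeof(uint32_t) + plen);
	memcpy(t->data, payload, plen);
	return t;
}

/* Append one record: hash value, then the whole tuple. Returns 0 on success. */
static int
demo_save_tuple(FILE *f, uint32_t hashvalue, const DemoTuple *t)
{
	if (fwrite(&hashvalue, sizeof(uint32_t), 1, f) != 1)
		return -1;
	if (fwrite(t, t->t_len, 1, f) != 1)
		return -1;
	return 0;
}

/* Read the next record, or return NULL at end of file or on a short read. */
static DemoTuple *
demo_get_saved_tuple(FILE *f, uint32_t *hashvalue)
{
	uint32_t	header[2];		/* [0] = hash value, [1] = tuple length */
	DemoTuple  *t;
	size_t		rest;

	if (fread(header, 1, sizeof(header), f) != sizeof(header))
		return NULL;
	if (header[1] < sizeof(uint32_t))
		return NULL;			/* length must cover its own word */
	t = malloc(header[1]);
	if (t == NULL)
		return NULL;
	t->t_len = header[1];
	rest = header[1] - sizeof(uint32_t);
	if (rest > 0 && fread((char *) t + sizeof(uint32_t), 1, rest, f) != rest)
	{
		free(t);
		return NULL;
	}
	*hashvalue = header[0];
	return t;
}
```

Writing two records to a temporary file, rewinding, and reading them back should return the same hash values and payloads in order, followed by NULL at end of file.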
void
ExecReScanHashJoin(HashJoinState *node)
{
	PlanState  *outerPlan = outerPlanState(node);
	PlanState  *innerPlan = innerPlanState(node);

	/*
	 * In a multi-batch join, we currently have to do rescans the hard way,
	 * primarily because batch temp files may have already been released. But
	 * if it's a single-batch join, and there is no parameter change for the
	 * inner subnode, then we can just re-use the existing hash table without
	 * rebuilding it.
	 */
	if (node->hj_HashTable != NULL)
	{
		if (node->hj_HashTable->nbatch == 1 &&
			innerPlan->chgParam == NULL)
		{
			/*
			 * Okay to reuse the hash table; needn't rescan inner, either.
			 *
			 * However, if it's a right/right-anti/full join, we'd better
			 * reset the inner-tuple match flags contained in the table.
			 */
			if (HJ_FILL_INNER(node))
				ExecHashTableResetMatchFlags(node->hj_HashTable);

			/*
			 * Also, we need to reset our state about the emptiness of the
			 * outer relation, so that the new scan of the outer will update
			 * it correctly if it turns out to be empty this time. (There's no
			 * harm in clearing it now because ExecHashJoin won't need the
			 * info. In the other cases, where the hash table doesn't exist
			 * or we are destroying it, we leave this state alone because
			 * ExecHashJoin will need it the first time through.)
			 */
			node->hj_OuterNotEmpty = false;

			/* ExecHashJoin can skip the BUILD_HASHTABLE step */
			node->hj_JoinState = HJ_NEED_NEW_OUTER;
		}
		else
		{
			/* must destroy and rebuild hash table */
			HashState  *hashNode = castNode(HashState, innerPlan);

			Assert(hashNode->hashtable == node->hj_HashTable);
			/* accumulate stats from old hash table, if wanted */
			/* (this should match ExecShutdownHash) */
			if (hashNode->ps.instrument && !hashNode->hinstrument)
				hashNode->hinstrument = (HashInstrumentation *)
					palloc0(sizeof(HashInstrumentation));
			if (hashNode->hinstrument)
				ExecHashAccumInstrumentation(hashNode->hinstrument,
											 hashNode->hashtable);
			/* for safety, be sure to clear child plan node's pointer too */
			hashNode->hashtable = NULL;

			ExecHashTableDestroy(node->hj_HashTable);
			node->hj_HashTable = NULL;
			node->hj_JoinState = HJ_BUILD_HASHTABLE;

			/*
			 * if chgParam of subnode is not null then plan will be re-scanned
			 * by first ExecProcNode.
			 */
			if (innerPlan->chgParam == NULL)
				ExecReScan(innerPlan);
		}
	}

	/* Always reset intra-tuple state */
	node->hj_CurHashValue = 0;
	node->hj_CurBucketNo = 0;
	node->hj_CurSkewBucketNo = INVALID_SKEW_BUCKET_NO;
	node->hj_CurTuple = NULL;

	node->hj_MatchedOuter = false;
	node->hj_FirstOuterTupleSlot = NULL;

	/*
	 * if chgParam of subnode is not null then plan will be re-scanned by
	 * first ExecProcNode.
	 */
	if (outerPlan->chgParam == NULL)
		ExecReScan(outerPlan);
}

void
ExecShutdownHashJoin(HashJoinState *node)
{
	if (node->hj_HashTable)
	{
		/*
		 * Detach from shared state before DSM memory goes away. This makes
		 * sure that we don't have any pointers into DSM memory by the time
		 * ExecEndHashJoin runs.
		 */
		ExecHashTableDetachBatch(node->hj_HashTable);
		ExecHashTableDetach(node->hj_HashTable);
	}
}

static void
ExecParallelHashJoinPartitionOuter(HashJoinState *hjstate)
{
	PlanState  *outerState = outerPlanState(hjstate);
	ExprContext *econtext = hjstate->js.ps.ps_ExprContext;
	HashJoinTable hashtable = hjstate->hj_HashTable;
	TupleTableSlot *slot;
	uint32		hashvalue;
	int			i;

	Assert(hjstate->hj_FirstOuterTupleSlot == NULL);

	/* Execute outer plan, writing all tuples to shared tuplestores. */
	for (;;)
	{
		slot = ExecProcNode(outerState);
		if (TupIsNull(slot))
			break;
		econtext->ecxt_outertuple = slot;
		if (ExecHashGetHashValue(hashtable, econtext,
								 hjstate->hj_OuterHashKeys,
								 true,	/* outer tuple */
								 HJ_FILL_OUTER(hjstate),
								 &hashvalue))
		{
			int			batchno;
			int			bucketno;
			bool		shouldFree;
			MinimalTuple mintup = ExecFetchSlotMinimalTuple(slot, &shouldFree);

			ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno,
									  &batchno);
			sts_puttuple(hashtable->batches[batchno].outer_tuples,
						 &hashvalue, mintup);

			if (shouldFree)
				heap_free_minimal_tuple(mintup);
		}
		CHECK_FOR_INTERRUPTS();
	}

	/* Make sure all outer partitions are readable by any backend. */
	for (i = 0; i < hashtable->nbatch; ++i)
		sts_end_write(hashtable->batches[i].outer_tuples);
}

void
ExecHashJoinEstimate(HashJoinState *state, ParallelContext *pcxt)
{
	shm_toc_estimate_chunk(&pcxt->estimator, sizeof(ParallelHashJoinState));
	shm_toc_estimate_keys(&pcxt->estimator, 1);
}

void
ExecHashJoinInitializeDSM(HashJoinState *state, ParallelContext *pcxt)
{
	int			plan_node_id = state->js.ps.plan->plan_node_id;
	HashState  *hashNode;
	ParallelHashJoinState *pstate;

	/*
	 * Disable shared hash table mode if we failed to create a real DSM
	 * segment, because that means that we don't have a DSA area to work with.
	 */
	if (pcxt->seg == NULL)
		return;

	ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin);

	/*
	 * Set up the state needed to coordinate access to the shared hash
	 * table(s), using the plan node ID as the toc key.
	 */
	pstate = shm_toc_allocate(pcxt->toc, sizeof(ParallelHashJoinState));
	shm_toc_insert(pcxt->toc, plan_node_id, pstate);

	/*
	 * Set up the shared hash join state with no batches initially.
	 * ExecHashTableCreate() will prepare at least one later and set nbatch
	 * and space_allowed.
	 */
	pstate->nbatch = 0;
	pstate->space_allowed = 0;
	pstate->batches = InvalidDsaPointer;
	pstate->old_batches = InvalidDsaPointer;
	pstate->nbuckets = 0;
	pstate->growth = PHJ_GROWTH_OK;
	pstate->chunk_work_queue = InvalidDsaPointer;
	pg_atomic_init_u32(&pstate->distributor, 0);
	pstate->nparticipants = pcxt->nworkers + 1;
	pstate->total_tuples = 0;
	LWLockInitialize(&pstate->lock,
					 LWTRANCHE_PARALLEL_HASH_JOIN);
	BarrierInit(&pstate->build_barrier, 0);
	BarrierInit(&pstate->grow_batches_barrier, 0);
	BarrierInit(&pstate->grow_buckets_barrier, 0);

	/* Set up the space we'll use for shared temporary files. */
	SharedFileSetInit(&pstate->fileset, pcxt->seg);

	/* Initialize the shared state in the hash node. */
	hashNode = (HashState *) innerPlanState(state);
	hashNode->parallel_state = pstate;
}

/* ----------------------------------------------------------------
 *		ExecHashJoinReInitializeDSM
 *
 *		Reset shared state before beginning a fresh scan.
 * ----------------------------------------------------------------
 */
void
ExecHashJoinReInitializeDSM(HashJoinState *state, ParallelContext *pcxt)
{
	int			plan_node_id = state->js.ps.plan->plan_node_id;
	ParallelHashJoinState *pstate;

	/* Nothing to do if we failed to create a DSM segment. */
	if (pcxt->seg == NULL)
		return;

	pstate = shm_toc_lookup(pcxt->toc, plan_node_id, false);

	/*
	 * It would be possible to reuse the shared hash table in single-batch
	 * cases by resetting and then fast-forwarding build_barrier to
	 * PHJ_BUILD_FREE and batch 0's batch_barrier to PHJ_BATCH_PROBE, but
	 * currently shared hash tables are already freed by now (by the last
	 * participant to detach from the batch). We could consider keeping it
	 * around for single-batch joins. We'd also need to adjust
	 * finalize_plan() so that it doesn't record a dummy dependency for
	 * Parallel Hash nodes, preventing the rescan optimization. For now we
	 * don't try.
	 */

	/* Detach, freeing any remaining shared memory. */
	if (state->hj_HashTable != NULL)
	{
		ExecHashTableDetachBatch(state->hj_HashTable);
		ExecHashTableDetach(state->hj_HashTable);
	}

	/* Clear any shared batch files. */
	SharedFileSetDeleteAll(&pstate->fileset);

	/* Reset build_barrier to PHJ_BUILD_ELECT so we can go around again. */
	BarrierInit(&pstate->build_barrier, 0);
}

void
ExecHashJoinInitializeWorker(HashJoinState *state,
							 ParallelWorkerContext *pwcxt)
{
	HashState  *hashNode;
	int			plan_node_id = state->js.ps.plan->plan_node_id;
	ParallelHashJoinState *pstate =
		shm_toc_lookup(pwcxt->toc, plan_node_id, false);

	/* Attach to the space for shared temporary files. */
	SharedFileSetAttach(&pstate->fileset, pwcxt->seg);

	/* Attach to the shared state in the hash node. */
	hashNode = (HashState *) innerPlanState(state);
	hashNode->parallel_state = pstate;

	ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin);
}