/*-------------------------------------------------------------------------
 *
 * nodeHashjoin.c
 *    Routines to handle hash join nodes
 *
 * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *    src/backend/executor/nodeHashjoin.c
 *
 * PARALLELISM
 *
 * Hash joins can participate in parallel query execution in several ways.  A
 * parallel-oblivious hash join is one where the node is unaware that it is
 * part of a parallel plan.  In this case, a copy of the inner plan is used to
 * build a copy of the hash table in every backend, and the outer plan could
 * either be built from a partial or complete path, so that the results of the
 * hash join are correspondingly either partial or complete.  A parallel-aware
 * hash join is one that behaves differently, coordinating work between
 * backends, and appears as Parallel Hash Join in EXPLAIN output.  A Parallel
 * Hash Join always appears with a Parallel Hash node.
 *
 * Parallel-aware hash joins use the same per-backend state machine to track
 * progress through the hash join algorithm as parallel-oblivious hash joins.
 * In a parallel-aware hash join, there is also a shared state machine that
 * co-operating backends use to synchronize their local state machines and
 * program counters.  The shared state machine is managed with a Barrier IPC
 * primitive.  When all attached participants arrive at a barrier, the phase
 * advances and all waiting participants are released.
 *
 * When a participant begins working on a parallel hash join, it must first
 * figure out how much progress has already been made, because participants
 * don't wait for each other to begin.  For this reason there are switch
 * statements at key points in the code where we have to synchronize our local
 * state machine with the phase, and then jump to the correct part of the
 * algorithm so that we can get started.
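 *
 * A simplified sketch of that pattern (the real sites, such as
 * ExecParallelHashJoinNewBatch() below, differ in detail):
 *
 *     switch (BarrierAttach(barrier))
 *     {
 *         case PHASE_ONE:
 *             ... do, or help with, phase one's work ...
 *             BarrierArriveAndWait(barrier, ...);
 *             ... fall through ...
 *         case PHASE_TWO:
 *             ... do, or help with, phase two's work ...
 *     }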
 *
 * One barrier called build_barrier is used to coordinate the hashing phases.
 * The phase is represented by an integer which begins at zero and increments
 * one by one, but in the code it is referred to by symbolic names as follows:
 *
 *   PHJ_BUILD_ELECTING              -- initial state
 *   PHJ_BUILD_ALLOCATING            -- one sets up the batches and table 0
 *   PHJ_BUILD_HASHING_INNER         -- all hash the inner rel
 *   PHJ_BUILD_HASHING_OUTER         -- (multi-batch only) all hash the outer
 *   PHJ_BUILD_DONE                  -- building done, probing can begin
 *
 * While in the phase PHJ_BUILD_HASHING_INNER a separate pair of barriers may
 * be used repeatedly as required to coordinate expansions in the number of
 * batches or buckets.  Their phases are as follows:
 *
 *   PHJ_GROW_BATCHES_ELECTING       -- initial state
 *   PHJ_GROW_BATCHES_ALLOCATING     -- one allocates new batches
 *   PHJ_GROW_BATCHES_REPARTITIONING -- all repartition
 *   PHJ_GROW_BATCHES_FINISHING      -- one cleans up, detects skew
 *
 *   PHJ_GROW_BUCKETS_ELECTING       -- initial state
 *   PHJ_GROW_BUCKETS_ALLOCATING     -- one allocates new buckets
 *   PHJ_GROW_BUCKETS_REINSERTING    -- all insert tuples
 *
 * If the planner got the number of batches and buckets right, those won't be
 * necessary, but on the other hand we might finish up needing to expand the
 * buckets or batches multiple times while hashing the inner relation to stay
 * within our memory budget and load factor target.  For that reason it's a
 * separate pair of barriers using circular phases.
 *
 * The PHJ_BUILD_HASHING_OUTER phase is required only for multi-batch joins,
 * because we need to divide the outer relation into batches up front in order
 * to be able to process batches entirely independently.  In contrast, the
 * parallel-oblivious algorithm simply throws tuples 'forward' to 'later'
 * batches whenever it encounters them while scanning and probing, which it
 * can do because it processes batches in serial order.
 *
 * Once PHJ_BUILD_DONE is reached, backends then split up and process
 * different batches, or gang up and work together on probing batches if there
 * aren't enough to go around.  For each batch there is a separate barrier
 * with the following phases:
 *
 *   PHJ_BATCH_ELECTING      -- initial state
 *   PHJ_BATCH_ALLOCATING    -- one allocates buckets
 *   PHJ_BATCH_LOADING       -- all load the hash table from disk
 *   PHJ_BATCH_PROBING       -- all probe
 *   PHJ_BATCH_DONE          -- end
 *
 * Batch 0 is a special case, because it starts out in phase
 * PHJ_BATCH_PROBING; populating batch 0's hash table is done during
 * PHJ_BUILD_HASHING_INNER so we can skip loading.
 *
 * Initially we try to plan for a single-batch hash join using the combined
 * hash_mem of all participants to create a large shared hash table.  If that
 * turns out either at planning or execution time to be impossible then we
 * fall back to regular hash_mem sized hash tables.
 *
 * To avoid deadlocks, we never wait for any barrier unless it is known that
 * all other backends attached to it are actively executing the node or have
 * already arrived.  Practically, that means that we never return a tuple
 * while attached to a barrier, unless the barrier has reached its final
 * state.  In the slightly special case of the per-batch barrier, we return
 * tuples while in PHJ_BATCH_PROBING phase, but that's OK because we use
 * BarrierArriveAndDetach() to advance it to PHJ_BATCH_DONE without waiting.
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/htup_details.h"
#include "access/parallel.h"
#include "executor/executor.h"
#include "executor/hashjoin.h"
#include "executor/nodeHash.h"
#include "executor/nodeHashjoin.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "utils/memutils.h"
#include "utils/sharedtuplestore.h"


/*
 * States of the ExecHashJoin state machine
 */
#define HJ_BUILD_HASHTABLE      1
#define HJ_NEED_NEW_OUTER       2
#define HJ_SCAN_BUCKET          3
#define HJ_FILL_OUTER_TUPLE     4
#define HJ_FILL_INNER_TUPLES    5
#define HJ_NEED_NEW_BATCH       6
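
/*
 * A rough sketch of the transitions driven below (per backend):
 *
 *   HJ_BUILD_HASHTABLE   -> HJ_NEED_NEW_OUTER (serial), or
 *                           HJ_NEED_NEW_BATCH (parallel)
 *   HJ_NEED_NEW_OUTER    -> HJ_SCAN_BUCKET; at end of batch,
 *                           HJ_FILL_INNER_TUPLES or HJ_NEED_NEW_BATCH
 *   HJ_SCAN_BUCKET       -> HJ_FILL_OUTER_TUPLE once matches run out
 *   HJ_FILL_OUTER_TUPLE  -> HJ_NEED_NEW_OUTER
 *   HJ_FILL_INNER_TUPLES -> HJ_NEED_NEW_BATCH once no unmatched remain
 *   HJ_NEED_NEW_BATCH    -> HJ_NEED_NEW_OUTER, or the join is complete
 */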

/* Returns true if doing null-fill on outer relation */
#define HJ_FILL_OUTER(hjstate)  ((hjstate)->hj_NullInnerTupleSlot != NULL)
/* Returns true if doing null-fill on inner relation */
#define HJ_FILL_INNER(hjstate)  ((hjstate)->hj_NullOuterTupleSlot != NULL)
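
/*
 * Note the cross-over in the macros above: null-filling unmatched outer
 * tuples (left/full joins) requires a null slot for the *inner* side to
 * project from, and null-filling unmatched inner tuples (right/full joins)
 * requires a null slot for the *outer* side.
 */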

static TupleTableSlot *ExecHashJoinOuterGetTuple(PlanState *outerNode,
                                                 HashJoinState *hjstate,
                                                 uint32 *hashvalue);
static TupleTableSlot *ExecParallelHashJoinOuterGetTuple(PlanState *outerNode,
                                                         HashJoinState *hjstate,
                                                         uint32 *hashvalue);
static TupleTableSlot *ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
                                                 BufFile *file,
                                                 uint32 *hashvalue,
                                                 TupleTableSlot *tupleSlot);
static bool ExecHashJoinNewBatch(HashJoinState *hjstate);
static bool ExecParallelHashJoinNewBatch(HashJoinState *hjstate);
static void ExecParallelHashJoinPartitionOuter(HashJoinState *hjstate);


/* ----------------------------------------------------------------
 *      ExecHashJoinImpl
 *
 *      This function implements the Hybrid Hashjoin algorithm.  It is marked
 *      with an always-inline attribute so that ExecHashJoin() and
 *      ExecParallelHashJoin() can inline it.  Compilers that respect the
 *      attribute should create versions specialized for parallel == true and
 *      parallel == false with unnecessary branches removed.
 *
 *      Note: the relation we build hash table on is the "inner"
 *            the other one is "outer".
 * ----------------------------------------------------------------
 */
static pg_attribute_always_inline TupleTableSlot *
ExecHashJoinImpl(PlanState *pstate, bool parallel)
{
    HashJoinState *node = castNode(HashJoinState, pstate);
    PlanState  *outerNode;
    HashState  *hashNode;
    ExprState  *joinqual;
    ExprState  *otherqual;
    ExprContext *econtext;
    HashJoinTable hashtable;
    TupleTableSlot *outerTupleSlot;
    uint32      hashvalue;
    int         batchno;
    ParallelHashJoinState *parallel_state;

    /*
     * get information from HashJoin node
     */
    joinqual = node->js.joinqual;
    otherqual = node->js.ps.qual;
    hashNode = (HashState *) innerPlanState(node);
    outerNode = outerPlanState(node);
    hashtable = node->hj_HashTable;
    econtext = node->js.ps.ps_ExprContext;
    parallel_state = hashNode->parallel_state;

    /*
     * Reset per-tuple memory context to free any expression evaluation
     * storage allocated in the previous tuple cycle.
     */
    ResetExprContext(econtext);

    /*
     * run the hash join state machine
     */
    for (;;)
    {
        /*
         * It's possible to iterate this loop many times before returning a
         * tuple, in some pathological cases such as needing to move much of
         * the current batch to a later batch.  So let's check for interrupts
         * each time through.
         */
        CHECK_FOR_INTERRUPTS();

        switch (node->hj_JoinState)
        {
            case HJ_BUILD_HASHTABLE:

                /*
                 * First time through: build hash table for inner relation.
                 */
                Assert(hashtable == NULL);

                /*
                 * If the outer relation is completely empty, and it's not
                 * right/full join, we can quit without building the hash
                 * table.  However, for an inner join it is only a win to
                 * check this when the outer relation's startup cost is less
                 * than the projected cost of building the hash table.
                 * Otherwise it's best to build the hash table first and see
                 * if the inner relation is empty.  (When it's a left join, we
                 * should always make this check, since we aren't going to be
                 * able to skip the join on the strength of an empty inner
                 * relation anyway.)
                 *
                 * If we are rescanning the join, we make use of information
                 * gained on the previous scan: don't bother to try the
                 * prefetch if the previous scan found the outer relation
                 * nonempty.  This is not 100% reliable since with new
                 * parameters the outer relation might yield different
                 * results, but it's a good heuristic.
                 *
                 * The only way to make the check is to try to fetch a tuple
                 * from the outer plan node.  If we succeed, we have to stash
                 * it away for later consumption by ExecHashJoinOuterGetTuple.
                 */
                if (HJ_FILL_INNER(node))
                {
                    /* no chance to not build the hash table */
                    node->hj_FirstOuterTupleSlot = NULL;
                }
                else if (parallel)
                {
                    /*
                     * The empty-outer optimization is not implemented for
                     * shared hash tables, because no one participant can
                     * determine that there are no outer tuples, and it's not
                     * yet clear that it's worth the synchronization overhead
                     * of reaching consensus to figure that out.  So we have
                     * to build the hash table.
                     */
                    node->hj_FirstOuterTupleSlot = NULL;
                }
                else if (HJ_FILL_OUTER(node) ||
                         (outerNode->plan->startup_cost < hashNode->ps.plan->total_cost &&
                          !node->hj_OuterNotEmpty))
                {
                    node->hj_FirstOuterTupleSlot = ExecProcNode(outerNode);
                    if (TupIsNull(node->hj_FirstOuterTupleSlot))
                    {
                        node->hj_OuterNotEmpty = false;
                        return NULL;
                    }
                    else
                        node->hj_OuterNotEmpty = true;
                }
                else
                    node->hj_FirstOuterTupleSlot = NULL;

                /*
                 * Create the hash table.  If using Parallel Hash, then
                 * whoever gets here first will create the hash table and any
                 * later arrivals will merely attach to it.
                 */
                hashtable = ExecHashTableCreate(hashNode,
                                                node->hj_HashOperators,
                                                node->hj_Collations,
                                                HJ_FILL_INNER(node));
                node->hj_HashTable = hashtable;

                /*
                 * Execute the Hash node, to build the hash table.  If using
                 * Parallel Hash, then we'll try to help hashing unless we
                 * arrived too late.
                 */
                hashNode->hashtable = hashtable;
                (void) MultiExecProcNode((PlanState *) hashNode);

                /*
                 * If the inner relation is completely empty, and we're not
                 * doing a left outer join, we can quit without scanning the
                 * outer relation.
                 */
                if (hashtable->totalTuples == 0 && !HJ_FILL_OUTER(node))
                    return NULL;

                /*
                 * need to remember whether nbatch has increased since we
                 * began scanning the outer relation
                 */
                hashtable->nbatch_outstart = hashtable->nbatch;

                /*
                 * Reset OuterNotEmpty for scan.  (It's OK if we fetched a
                 * tuple above, because ExecHashJoinOuterGetTuple will
                 * immediately set it again.)
                 */
                node->hj_OuterNotEmpty = false;

                if (parallel)
                {
                    Barrier    *build_barrier;

                    build_barrier = &parallel_state->build_barrier;
                    Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER ||
                           BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
                    if (BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER)
                    {
                        /*
                         * If multi-batch, we need to hash the outer relation
                         * up front.
                         */
                        if (hashtable->nbatch > 1)
                            ExecParallelHashJoinPartitionOuter(node);
                        BarrierArriveAndWait(build_barrier,
                                             WAIT_EVENT_HASH_BUILD_HASH_OUTER);
                    }
                    Assert(BarrierPhase(build_barrier) == PHJ_BUILD_DONE);

                    /* Each backend should now select a batch to work on. */
                    hashtable->curbatch = -1;
                    node->hj_JoinState = HJ_NEED_NEW_BATCH;

                    continue;
                }
                else
                    node->hj_JoinState = HJ_NEED_NEW_OUTER;

                /* FALL THRU */

            case HJ_NEED_NEW_OUTER:

                /*
                 * We don't have an outer tuple, try to get the next one
                 */
                if (parallel)
                    outerTupleSlot =
                        ExecParallelHashJoinOuterGetTuple(outerNode, node,
                                                          &hashvalue);
                else
                    outerTupleSlot =
                        ExecHashJoinOuterGetTuple(outerNode, node, &hashvalue);

                if (TupIsNull(outerTupleSlot))
                {
                    /* end of batch, or maybe whole join */
                    if (HJ_FILL_INNER(node))
                    {
                        /* set up to scan for unmatched inner tuples */
                        ExecPrepHashTableForUnmatched(node);
                        node->hj_JoinState = HJ_FILL_INNER_TUPLES;
                    }
                    else
                        node->hj_JoinState = HJ_NEED_NEW_BATCH;
                    continue;
                }

                econtext->ecxt_outertuple = outerTupleSlot;
                node->hj_MatchedOuter = false;

                /*
                 * Find the corresponding bucket for this tuple in the main
                 * hash table or skew hash table.
                 */
                node->hj_CurHashValue = hashvalue;
                ExecHashGetBucketAndBatch(hashtable, hashvalue,
                                          &node->hj_CurBucketNo, &batchno);
                node->hj_CurSkewBucketNo = ExecHashGetSkewBucket(hashtable,
                                                                 hashvalue);
                node->hj_CurTuple = NULL;
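
                /*
                 * (ExecHashGetBucketAndBatch, in nodeHash.c, derives both
                 * numbers from the hash value: roughly, the bucket number
                 * comes from the low-order bits and the batch number from
                 * higher-order bits, so the two are independent.)
                 */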

                /*
                 * The tuple might not belong to the current batch (where
                 * "current batch" includes the skew buckets if any).
                 */
                if (batchno != hashtable->curbatch &&
                    node->hj_CurSkewBucketNo == INVALID_SKEW_BUCKET_NO)
                {
                    bool        shouldFree;
                    MinimalTuple mintuple = ExecFetchSlotMinimalTuple(outerTupleSlot,
                                                                      &shouldFree);

                    /*
                     * Need to postpone this outer tuple to a later batch.
                     * Save it in the corresponding outer-batch file.
                     */
                    Assert(parallel_state == NULL);
                    Assert(batchno > hashtable->curbatch);
                    ExecHashJoinSaveTuple(mintuple, hashvalue,
                                          &hashtable->outerBatchFile[batchno]);

                    if (shouldFree)
                        heap_free_minimal_tuple(mintuple);

                    /* Loop around, staying in HJ_NEED_NEW_OUTER state */
                    continue;
                }

                /* OK, let's scan the bucket for matches */
                node->hj_JoinState = HJ_SCAN_BUCKET;

                /* FALL THRU */

            case HJ_SCAN_BUCKET:

                /*
                 * Scan the selected hash bucket for matches to current outer
                 */
                if (parallel)
                {
                    if (!ExecParallelScanHashBucket(node, econtext))
                    {
                        /* out of matches; check for possible outer-join fill */
                        node->hj_JoinState = HJ_FILL_OUTER_TUPLE;
                        continue;
                    }
                }
                else
                {
                    if (!ExecScanHashBucket(node, econtext))
                    {
                        /* out of matches; check for possible outer-join fill */
                        node->hj_JoinState = HJ_FILL_OUTER_TUPLE;
                        continue;
                    }
                }

                /*
                 * We've got a match, but still need to test non-hashed quals.
                 * ExecScanHashBucket already set up all the state needed to
                 * call ExecQual.
                 *
                 * If we pass the qual, then save state for next call and have
                 * ExecProject form the projection, store it in the tuple
                 * table, and return the slot.
                 *
                 * Only the joinquals determine tuple match status, but all
                 * quals must pass to actually return the tuple.
                 */
                if (joinqual == NULL || ExecQual(joinqual, econtext))
                {
                    node->hj_MatchedOuter = true;

                    if (parallel)
                    {
                        /*
                         * Full/right outer joins are currently not supported
                         * for parallel joins, so we don't need to set the
                         * match bit.  Experiments show that it's worth
                         * avoiding the shared memory traffic on large
                         * systems.
                         */
                        Assert(!HJ_FILL_INNER(node));
                    }
                    else
                    {
                        /*
                         * This is really only needed if HJ_FILL_INNER(node),
                         * but we'll avoid the branch and just set it always.
                         */
                        HeapTupleHeaderSetMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple));
                    }

                    /* In an antijoin, we never return a matched tuple */
                    if (node->js.jointype == JOIN_ANTI)
                    {
                        node->hj_JoinState = HJ_NEED_NEW_OUTER;
                        continue;
                    }

                    /*
                     * If we only need to join to the first matching inner
                     * tuple, then consider returning this one, but after that
                     * continue with next outer tuple.
                     */
                    if (node->js.single_match)
                        node->hj_JoinState = HJ_NEED_NEW_OUTER;

                    if (otherqual == NULL || ExecQual(otherqual, econtext))
                        return ExecProject(node->js.ps.ps_ProjInfo);
                    else
                        InstrCountFiltered2(node, 1);
                }
                else
                    InstrCountFiltered1(node, 1);
                break;

            case HJ_FILL_OUTER_TUPLE:

                /*
                 * The current outer tuple has run out of matches, so check
                 * whether to emit a dummy outer-join tuple.  Whether we emit
                 * one or not, the next state is NEED_NEW_OUTER.
                 */
                node->hj_JoinState = HJ_NEED_NEW_OUTER;

                if (!node->hj_MatchedOuter &&
                    HJ_FILL_OUTER(node))
                {
                    /*
                     * Generate a fake join tuple with nulls for the inner
                     * tuple, and return it if it passes the non-join quals.
                     */
                    econtext->ecxt_innertuple = node->hj_NullInnerTupleSlot;

                    if (otherqual == NULL || ExecQual(otherqual, econtext))
                        return ExecProject(node->js.ps.ps_ProjInfo);
                    else
                        InstrCountFiltered2(node, 1);
                }
                break;

            case HJ_FILL_INNER_TUPLES:

                /*
                 * We have finished a batch, but we are doing right/full join,
                 * so any unmatched inner tuples in the hashtable have to be
                 * emitted before we continue to the next batch.
                 */
                if (!ExecScanHashTableForUnmatched(node, econtext))
                {
                    /* no more unmatched tuples */
                    node->hj_JoinState = HJ_NEED_NEW_BATCH;
                    continue;
                }

                /*
                 * Generate a fake join tuple with nulls for the outer tuple,
                 * and return it if it passes the non-join quals.
                 */
                econtext->ecxt_outertuple = node->hj_NullOuterTupleSlot;

                if (otherqual == NULL || ExecQual(otherqual, econtext))
                    return ExecProject(node->js.ps.ps_ProjInfo);
                else
                    InstrCountFiltered2(node, 1);
                break;

            case HJ_NEED_NEW_BATCH:

                /*
                 * Try to advance to next batch.  Done if there are no more.
                 */
                if (parallel)
                {
                    if (!ExecParallelHashJoinNewBatch(node))
                        return NULL;    /* end of parallel-aware join */
                }
                else
                {
                    if (!ExecHashJoinNewBatch(node))
                        return NULL;    /* end of parallel-oblivious join */
                }
                node->hj_JoinState = HJ_NEED_NEW_OUTER;
                break;

            default:
                elog(ERROR, "unrecognized hashjoin state: %d",
                     (int) node->hj_JoinState);
        }
    }
}

/* ----------------------------------------------------------------
 *      ExecHashJoin
 *
 *      Parallel-oblivious version.
 * ----------------------------------------------------------------
 */
static TupleTableSlot *         /* return: a tuple or NULL */
ExecHashJoin(PlanState *pstate)
{
    /*
     * On sufficiently smart compilers this should be inlined with the
     * parallel-aware branches removed.
     */
    return ExecHashJoinImpl(pstate, false);
}

/* ----------------------------------------------------------------
 *      ExecParallelHashJoin
 *
 *      Parallel-aware version.
 * ----------------------------------------------------------------
 */
static TupleTableSlot *         /* return: a tuple or NULL */
ExecParallelHashJoin(PlanState *pstate)
{
    /*
     * On sufficiently smart compilers this should be inlined with the
     * parallel-oblivious branches removed.
     */
    return ExecHashJoinImpl(pstate, true);
}

/* ----------------------------------------------------------------
 *      ExecInitHashJoin
 *
 *      Init routine for HashJoin node.
 * ----------------------------------------------------------------
 */
HashJoinState *
ExecInitHashJoin(HashJoin *node, EState *estate, int eflags)
{
    HashJoinState *hjstate;
    Plan       *outerNode;
    Hash       *hashNode;
    TupleDesc   outerDesc,
                innerDesc;
    const TupleTableSlotOps *ops;

    /* check for unsupported flags */
    Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));

    /*
     * create state structure
     */
    hjstate = makeNode(HashJoinState);
    hjstate->js.ps.plan = (Plan *) node;
    hjstate->js.ps.state = estate;

    /*
     * See ExecHashJoinInitializeDSM() and ExecHashJoinInitializeWorker()
     * where this function may be replaced with a parallel version, if we
     * managed to launch a parallel query.
     */
    hjstate->js.ps.ExecProcNode = ExecHashJoin;
    hjstate->js.jointype = node->join.jointype;

    /*
     * Miscellaneous initialization
     *
     * create expression context for node
     */
    ExecAssignExprContext(estate, &hjstate->js.ps);

    /*
     * initialize child nodes
     *
     * Note: we could suppress the REWIND flag for the inner input, which
     * would amount to betting that the hash will be a single batch.  Not
     * clear if this would be a win or not.
     */
    outerNode = outerPlan(node);
    hashNode = (Hash *) innerPlan(node);

    outerPlanState(hjstate) = ExecInitNode(outerNode, estate, eflags);
    outerDesc = ExecGetResultType(outerPlanState(hjstate));
    innerPlanState(hjstate) = ExecInitNode((Plan *) hashNode, estate, eflags);
    innerDesc = ExecGetResultType(innerPlanState(hjstate));

    /*
     * Initialize result slot, type and projection.
     */
    ExecInitResultTupleSlotTL(&hjstate->js.ps, &TTSOpsVirtual);
    ExecAssignProjectionInfo(&hjstate->js.ps, NULL);

    /*
     * tuple table initialization
     */
    ops = ExecGetResultSlotOps(outerPlanState(hjstate), NULL);
    hjstate->hj_OuterTupleSlot = ExecInitExtraTupleSlot(estate, outerDesc,
                                                        ops);

    /*
     * detect whether we need only consider the first matching inner tuple
     */
    hjstate->js.single_match = (node->join.inner_unique ||
                                node->join.jointype == JOIN_SEMI);
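
    /*
     * (JOIN_SEMI arises e.g. from an EXISTS() sublink; inner_unique is set
     * when the planner has proven that each outer tuple can match at most
     * one inner tuple.  Either way, we may move on to the next outer tuple
     * as soon as one match has been found.)
     */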

    /* set up null tuples for outer joins, if needed */
    switch (node->join.jointype)
    {
        case JOIN_INNER:
        case JOIN_SEMI:
            break;
        case JOIN_LEFT:
        case JOIN_ANTI:
            hjstate->hj_NullInnerTupleSlot =
                ExecInitNullTupleSlot(estate, innerDesc, &TTSOpsVirtual);
            break;
        case JOIN_RIGHT:
            hjstate->hj_NullOuterTupleSlot =
                ExecInitNullTupleSlot(estate, outerDesc, &TTSOpsVirtual);
            break;
        case JOIN_FULL:
            hjstate->hj_NullOuterTupleSlot =
                ExecInitNullTupleSlot(estate, outerDesc, &TTSOpsVirtual);
            hjstate->hj_NullInnerTupleSlot =
                ExecInitNullTupleSlot(estate, innerDesc, &TTSOpsVirtual);
            break;
        default:
            elog(ERROR, "unrecognized join type: %d",
                 (int) node->join.jointype);
    }

    /*
     * now for some voodoo.  our temporary tuple slot is actually the result
     * tuple slot of the Hash node (which is our inner plan).  we can do this
     * because Hash nodes don't return tuples via ExecProcNode() -- instead
     * the hash join node uses ExecScanHashBucket() to get at the contents of
     * the hash table.  -cim 6/9/91
     */
    {
        HashState  *hashstate = (HashState *) innerPlanState(hjstate);
        TupleTableSlot *slot = hashstate->ps.ps_ResultTupleSlot;

        hjstate->hj_HashTupleSlot = slot;
    }

    /*
     * initialize child expressions
     */
    hjstate->js.ps.qual =
        ExecInitQual(node->join.plan.qual, (PlanState *) hjstate);
    hjstate->js.joinqual =
        ExecInitQual(node->join.joinqual, (PlanState *) hjstate);
    hjstate->hashclauses =
        ExecInitQual(node->hashclauses, (PlanState *) hjstate);

    /*
     * initialize hash-specific info
     */
    hjstate->hj_HashTable = NULL;
    hjstate->hj_FirstOuterTupleSlot = NULL;

    hjstate->hj_CurHashValue = 0;
    hjstate->hj_CurBucketNo = 0;
    hjstate->hj_CurSkewBucketNo = INVALID_SKEW_BUCKET_NO;
    hjstate->hj_CurTuple = NULL;

    hjstate->hj_OuterHashKeys = ExecInitExprList(node->hashkeys,
                                                 (PlanState *) hjstate);
    hjstate->hj_HashOperators = node->hashoperators;
    hjstate->hj_Collations = node->hashcollations;

    hjstate->hj_JoinState = HJ_BUILD_HASHTABLE;
    hjstate->hj_MatchedOuter = false;
    hjstate->hj_OuterNotEmpty = false;

    return hjstate;
}

/* ----------------------------------------------------------------
 *      ExecEndHashJoin
 *
 *      clean up routine for HashJoin node
 * ----------------------------------------------------------------
 */
void
ExecEndHashJoin(HashJoinState *node)
{
    /*
     * Free hash table
     */
    if (node->hj_HashTable)
    {
        ExecHashTableDestroy(node->hj_HashTable);
        node->hj_HashTable = NULL;
    }

    /*
     * Free the exprcontext
     */
    ExecFreeExprContext(&node->js.ps);

    /*
     * clean out the tuple table
     */
    ExecClearTuple(node->js.ps.ps_ResultTupleSlot);
    ExecClearTuple(node->hj_OuterTupleSlot);
    ExecClearTuple(node->hj_HashTupleSlot);

    /*
     * clean up subtrees
     */
    ExecEndNode(outerPlanState(node));
    ExecEndNode(innerPlanState(node));
}

/*
 * ExecHashJoinOuterGetTuple
 *
 *      get the next outer tuple for a parallel oblivious hashjoin: either by
 *      executing the outer plan node in the first pass, or from the temp
 *      files for the hashjoin batches.
 *
 * Returns a null slot if no more outer tuples (within the current batch).
 *
 * On success, the tuple's hash value is stored at *hashvalue --- this is
 * either originally computed, or re-read from the temp file.
 */
static TupleTableSlot *
ExecHashJoinOuterGetTuple(PlanState *outerNode,
                          HashJoinState *hjstate,
                          uint32 *hashvalue)
{
    HashJoinTable hashtable = hjstate->hj_HashTable;
    int         curbatch = hashtable->curbatch;
    TupleTableSlot *slot;

    if (curbatch == 0)          /* if it is the first pass */
    {
        /*
         * Check to see if first outer tuple was already fetched by
         * ExecHashJoin() and not used yet.
         */
        slot = hjstate->hj_FirstOuterTupleSlot;
        if (!TupIsNull(slot))
            hjstate->hj_FirstOuterTupleSlot = NULL;
        else
            slot = ExecProcNode(outerNode);

        while (!TupIsNull(slot))
        {
            /*
             * We have to compute the tuple's hash value.
             */
            ExprContext *econtext = hjstate->js.ps.ps_ExprContext;

            econtext->ecxt_outertuple = slot;
            if (ExecHashGetHashValue(hashtable, econtext,
                                     hjstate->hj_OuterHashKeys,
                                     true,  /* outer tuple */
                                     HJ_FILL_OUTER(hjstate),
                                     hashvalue))
            {
                /* remember outer relation is not empty for possible rescan */
                hjstate->hj_OuterNotEmpty = true;

                return slot;
            }

            /*
             * That tuple couldn't match because of a NULL, so discard it and
             * continue with the next one.
             */
            slot = ExecProcNode(outerNode);
        }
    }
    else if (curbatch < hashtable->nbatch)
    {
        BufFile    *file = hashtable->outerBatchFile[curbatch];

        /*
         * In outer-join cases, we could get here even though the batch file
         * is empty.
         */
        if (file == NULL)
            return NULL;

        slot = ExecHashJoinGetSavedTuple(hjstate,
                                         file,
                                         hashvalue,
                                         hjstate->hj_OuterTupleSlot);
        if (!TupIsNull(slot))
            return slot;
    }

    /* End of this batch */
    return NULL;
}

/*
 * ExecHashJoinOuterGetTuple variant for the parallel case.
 */
static TupleTableSlot *
ExecParallelHashJoinOuterGetTuple(PlanState *outerNode,
                                  HashJoinState *hjstate,
                                  uint32 *hashvalue)
{
    HashJoinTable hashtable = hjstate->hj_HashTable;
    int         curbatch = hashtable->curbatch;
    TupleTableSlot *slot;

    /*
     * In the Parallel Hash case we only run the outer plan directly for
     * single-batch hash joins.  Otherwise we have to go to batch files, even
     * for batch 0.
     */
    if (curbatch == 0 && hashtable->nbatch == 1)
    {
        slot = ExecProcNode(outerNode);

        while (!TupIsNull(slot))
        {
            ExprContext *econtext = hjstate->js.ps.ps_ExprContext;

            econtext->ecxt_outertuple = slot;
            if (ExecHashGetHashValue(hashtable, econtext,
                                     hjstate->hj_OuterHashKeys,
                                     true,  /* outer tuple */
                                     HJ_FILL_OUTER(hjstate),
                                     hashvalue))
                return slot;

            /*
             * That tuple couldn't match because of a NULL, so discard it and
             * continue with the next one.
             */
            slot = ExecProcNode(outerNode);
        }
    }
    else if (curbatch < hashtable->nbatch)
    {
        MinimalTuple tuple;

        tuple = sts_parallel_scan_next(hashtable->batches[curbatch].outer_tuples,
                                       hashvalue);
        if (tuple != NULL)
        {
            ExecForceStoreMinimalTuple(tuple,
                                       hjstate->hj_OuterTupleSlot,
                                       false);
            slot = hjstate->hj_OuterTupleSlot;
            return slot;
        }
        else
            ExecClearTuple(hjstate->hj_OuterTupleSlot);
    }

    /* End of this batch */
    return NULL;
}

/*
 * ExecHashJoinNewBatch
 *      switch to a new hashjoin batch
 *
 * Returns true if successful, false if there are no more batches.
 */
static bool
ExecHashJoinNewBatch(HashJoinState *hjstate)
{
    HashJoinTable hashtable = hjstate->hj_HashTable;
    int         nbatch;
    int         curbatch;
    BufFile    *innerFile;
    TupleTableSlot *slot;
    uint32      hashvalue;

    nbatch = hashtable->nbatch;
    curbatch = hashtable->curbatch;

    if (curbatch > 0)
    {
        /*
         * We no longer need the previous outer batch file; close it right
         * away to free disk space.
         */
        if (hashtable->outerBatchFile[curbatch])
            BufFileClose(hashtable->outerBatchFile[curbatch]);
        hashtable->outerBatchFile[curbatch] = NULL;
    }
    else                        /* we just finished the first batch */
    {
        /*
         * Reset some of the skew optimization state variables, since we no
         * longer need to consider skew tuples after the first batch.  The
         * memory context reset we are about to do will release the skew
         * hashtable itself.
         */
        hashtable->skewEnabled = false;
        hashtable->skewBucket = NULL;
        hashtable->skewBucketNums = NULL;
        hashtable->nSkewBuckets = 0;
        hashtable->spaceUsedSkew = 0;
    }

    /*
     * We can always skip over any batches that are completely empty on both
     * sides.  We can sometimes skip over batches that are empty on only one
     * side, but there are exceptions:
     *
     * 1. In a left/full outer join, we have to process outer batches even if
     * the inner batch is empty.  Similarly, in a right/full outer join, we
     * have to process inner batches even if the outer batch is empty.
     *
     * 2. If we have increased nbatch since the initial estimate, we have to
     * scan inner batches since they might contain tuples that need to be
     * reassigned to later inner batches.
     *
     * 3. Similarly, if we have increased nbatch since starting the outer
     * scan, we have to rescan outer batches in case they contain tuples that
     * need to be reassigned.
     */
    curbatch++;
    while (curbatch < nbatch &&
           (hashtable->outerBatchFile[curbatch] == NULL ||
            hashtable->innerBatchFile[curbatch] == NULL))
    {
        if (hashtable->outerBatchFile[curbatch] &&
            HJ_FILL_OUTER(hjstate))
            break;              /* must process due to rule 1 */
        if (hashtable->innerBatchFile[curbatch] &&
            HJ_FILL_INNER(hjstate))
            break;              /* must process due to rule 1 */
        if (hashtable->innerBatchFile[curbatch] &&
            nbatch != hashtable->nbatch_original)
            break;              /* must process due to rule 2 */
        if (hashtable->outerBatchFile[curbatch] &&
            nbatch != hashtable->nbatch_outstart)
            break;              /* must process due to rule 3 */
        /* We can ignore this batch. */
        /* Release associated temp files right away. */
        if (hashtable->innerBatchFile[curbatch])
            BufFileClose(hashtable->innerBatchFile[curbatch]);
        hashtable->innerBatchFile[curbatch] = NULL;
        if (hashtable->outerBatchFile[curbatch])
            BufFileClose(hashtable->outerBatchFile[curbatch]);
        hashtable->outerBatchFile[curbatch] = NULL;
        curbatch++;
    }

    if (curbatch >= nbatch)
        return false;           /* no more batches */

    hashtable->curbatch = curbatch;

    /*
     * Reload the hash table with the new inner batch (which could be empty)
     */
    ExecHashTableReset(hashtable);

    innerFile = hashtable->innerBatchFile[curbatch];

    if (innerFile != NULL)
    {
        if (BufFileSeek(innerFile, 0, 0L, SEEK_SET))
            ereport(ERROR,
                    (errcode_for_file_access(),
                     errmsg("could not rewind hash-join temporary file")));

        while ((slot = ExecHashJoinGetSavedTuple(hjstate,
                                                 innerFile,
                                                 &hashvalue,
                                                 hjstate->hj_HashTupleSlot)))
        {
            /*
             * NOTE: some tuples may be sent to future batches.  Also, it is
             * possible for hashtable->nbatch to be increased here!
             */
            ExecHashTableInsert(hashtable, slot, hashvalue);
        }

        /*
         * after we build the hash table, the inner batch file is no longer
         * needed
         */
        BufFileClose(innerFile);
        hashtable->innerBatchFile[curbatch] = NULL;
    }

    /*
     * Rewind outer batch file (if present), so that we can start reading it.
     */
    if (hashtable->outerBatchFile[curbatch] != NULL)
    {
        if (BufFileSeek(hashtable->outerBatchFile[curbatch], 0, 0L, SEEK_SET))
            ereport(ERROR,
                    (errcode_for_file_access(),
                     errmsg("could not rewind hash-join temporary file")));
    }

    return true;
}

/*
 * Choose a batch to work on, and attach to it.  Returns true if successful,
 * false if there are no more batches.
 */
static bool
ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
{
    HashJoinTable hashtable = hjstate->hj_HashTable;
    int         start_batchno;
    int         batchno;

    /*
     * If we started up so late that the batch tracking array has been freed
     * already by ExecHashTableDetach(), then we are finished.  See also
     * ExecParallelHashEnsureBatchAccessors().
     */
    if (hashtable->batches == NULL)
        return false;

    /*
     * If we were already attached to a batch, remember not to bother checking
     * it again, and detach from it (possibly freeing the hash table if we are
     * last to detach).
     */
    if (hashtable->curbatch >= 0)
    {
        hashtable->batches[hashtable->curbatch].done = true;
        ExecHashTableDetachBatch(hashtable);
    }

    /*
     * Search for a batch that isn't done.  We use an atomic counter to start
     * our search at a different batch in every participant when there are
     * more batches than participants.
     */
    batchno = start_batchno =
        pg_atomic_fetch_add_u32(&hashtable->parallel_state->distributor, 1) %
        hashtable->nbatch;
    do
    {
        uint32      hashvalue;
        MinimalTuple tuple;
        TupleTableSlot *slot;

        if (!hashtable->batches[batchno].done)
        {
            SharedTuplestoreAccessor *inner_tuples;
            Barrier    *batch_barrier =
                &hashtable->batches[batchno].shared->batch_barrier;

            switch (BarrierAttach(batch_barrier))
            {
                case PHJ_BATCH_ELECTING:

                    /* One backend allocates the hash table. */
                    if (BarrierArriveAndWait(batch_barrier,
                                             WAIT_EVENT_HASH_BATCH_ELECT))
                        ExecParallelHashTableAlloc(hashtable, batchno);
                    /* Fall through. */

                case PHJ_BATCH_ALLOCATING:
                    /* Wait for allocation to complete. */
                    BarrierArriveAndWait(batch_barrier,
                                         WAIT_EVENT_HASH_BATCH_ALLOCATE);
                    /* Fall through. */

                case PHJ_BATCH_LOADING:
                    /* Start (or join in) loading tuples. */
                    ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
                    inner_tuples = hashtable->batches[batchno].inner_tuples;
                    sts_begin_parallel_scan(inner_tuples);
                    while ((tuple = sts_parallel_scan_next(inner_tuples,
                                                           &hashvalue)))
                    {
                        ExecForceStoreMinimalTuple(tuple,
                                                   hjstate->hj_HashTupleSlot,
                                                   false);
                        slot = hjstate->hj_HashTupleSlot;
                        ExecParallelHashTableInsertCurrentBatch(hashtable, slot,
                                                                hashvalue);
                    }
                    sts_end_parallel_scan(inner_tuples);
                    BarrierArriveAndWait(batch_barrier,
                                         WAIT_EVENT_HASH_BATCH_LOAD);
                    /* Fall through. */

                case PHJ_BATCH_PROBING:

                    /*
                     * This batch is ready to probe.  Return control to
                     * caller.  We stay attached to batch_barrier so that the
                     * hash table stays alive until everyone's finished
                     * probing it, but no participant is allowed to wait at
                     * this barrier again (or else a deadlock could occur).
                     * All attached participants must eventually call
                     * BarrierArriveAndDetach() so that the final phase
                     * PHJ_BATCH_DONE can be reached.
                     */
                    ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
                    sts_begin_parallel_scan(hashtable->batches[batchno].outer_tuples);
                    return true;

                case PHJ_BATCH_DONE:

                    /*
                     * Already done.  Detach and go around again (if any
                     * remain).
                     */
                    BarrierDetach(batch_barrier);
                    hashtable->batches[batchno].done = true;
                    hashtable->curbatch = -1;
                    break;

                default:
                    elog(ERROR, "unexpected batch phase %d",
                         BarrierPhase(batch_barrier));
            }
        }
        batchno = (batchno + 1) % hashtable->nbatch;
    } while (batchno != start_batchno);

    return false;
}

/*
 * ExecHashJoinSaveTuple
 *      save a tuple to a batch file.
 *
 * The data recorded in the file for each tuple is its hash value,
 * then the tuple in MinimalTuple format.
 *
 * Note: it is important always to call this in the regular executor
 * context, not in a shorter-lived context; else the temp file buffers
 * will get messed up.
 */
void
ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue,
                      BufFile **fileptr)
{
    BufFile    *file = *fileptr;

    if (file == NULL)
    {
        /* First write to this batch file, so open it. */
        file = BufFileCreateTemp(false);
        *fileptr = file;
    }
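
    /*
     * The on-disk record is the uint32 hash value immediately followed by
     * the MinimalTuple, whose own leading field is its uint32 t_len;
     * ExecHashJoinGetSavedTuple relies on that layout to read the hash
     * value and tuple length back in a single BufFileRead() call.
     */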
    BufFileWrite(file, (void *) &hashvalue, sizeof(uint32));
    BufFileWrite(file, (void *) tuple, tuple->t_len);
}

/*
 * ExecHashJoinGetSavedTuple
 *      read the next tuple from a batch file.  Return NULL if no more.
 *
 * On success, *hashvalue is set to the tuple's hash value, and the tuple
 * itself is stored in the given slot.
 */
static TupleTableSlot *
ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
                          BufFile *file,
                          uint32 *hashvalue,
                          TupleTableSlot *tupleSlot)
{
    uint32      header[2];
    size_t      nread;
    MinimalTuple tuple;

    /*
     * We check for interrupts here because this is typically taken as an
     * alternative code path to an ExecProcNode() call, which would include
     * such a check.
     */
    CHECK_FOR_INTERRUPTS();

    /*
     * Since both the hash value and the MinimalTuple length word are uint32,
     * we can read them both in one BufFileRead() call without any type
     * cheating.
     */
    nread = BufFileRead(file, (void *) header, sizeof(header));
    if (nread == 0)             /* end of file */
    {
        ExecClearTuple(tupleSlot);
        return NULL;
    }
    if (nread != sizeof(header))
        ereport(ERROR,
                (errcode_for_file_access(),
                 errmsg("could not read from hash-join temporary file: read only %zu of %zu bytes",
                        nread, sizeof(header))));
    *hashvalue = header[0];
    tuple = (MinimalTuple) palloc(header[1]);
    tuple->t_len = header[1];
    nread = BufFileRead(file,
                        (void *) ((char *) tuple + sizeof(uint32)),
                        header[1] - sizeof(uint32));
    if (nread != header[1] - sizeof(uint32))
        ereport(ERROR,
                (errcode_for_file_access(),
                 errmsg("could not read from hash-join temporary file: read only %zu of %zu bytes",
                        nread, header[1] - sizeof(uint32))));
    ExecForceStoreMinimalTuple(tuple, tupleSlot, true);
    return tupleSlot;
}
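
/*
 * ExecReScanHashJoin
 *      prepare to rescan the join.  If the existing hash table can be
 *      reused (single batch, no parameter change for the inner subplan),
 *      keep it; otherwise destroy it so that the next ExecProcNode() call
 *      rebuilds it.
 */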
void
ExecReScanHashJoin(HashJoinState *node)
{
    PlanState  *outerPlan = outerPlanState(node);
    PlanState  *innerPlan = innerPlanState(node);

    /*
     * In a multi-batch join, we currently have to do rescans the hard way,
     * primarily because batch temp files may have already been released.  But
     * if it's a single-batch join, and there is no parameter change for the
     * inner subnode, then we can just re-use the existing hash table without
     * rebuilding it.
     */
    if (node->hj_HashTable != NULL)
    {
        if (node->hj_HashTable->nbatch == 1 &&
            innerPlan->chgParam == NULL)
        {
            /*
             * Okay to reuse the hash table; needn't rescan inner, either.
             *
             * However, if it's a right/full join, we'd better reset the
             * inner-tuple match flags contained in the table.
             */
            if (HJ_FILL_INNER(node))
                ExecHashTableResetMatchFlags(node->hj_HashTable);

            /*
             * Also, we need to reset our state about the emptiness of the
             * outer relation, so that the new scan of the outer will update
             * it correctly if it turns out to be empty this time.  (There's
             * no harm in clearing it now because ExecHashJoin won't need the
             * info.  In the other cases, where the hash table doesn't exist
             * or we are destroying it, we leave this state alone because
             * ExecHashJoin will need it the first time through.)
             */
            node->hj_OuterNotEmpty = false;

            /* ExecHashJoin can skip the BUILD_HASHTABLE step */
            node->hj_JoinState = HJ_NEED_NEW_OUTER;
        }
        else
        {
            /* must destroy and rebuild hash table */
            HashState  *hashNode = castNode(HashState, innerPlan);

            Assert(hashNode->hashtable == node->hj_HashTable);
            /* accumulate stats from old hash table, if wanted */
            /* (this should match ExecShutdownHash) */
            if (hashNode->ps.instrument && !hashNode->hinstrument)
                hashNode->hinstrument = (HashInstrumentation *)
                    palloc0(sizeof(HashInstrumentation));
            if (hashNode->hinstrument)
                ExecHashAccumInstrumentation(hashNode->hinstrument,
                                             hashNode->hashtable);
            /* for safety, be sure to clear child plan node's pointer too */
            hashNode->hashtable = NULL;

            ExecHashTableDestroy(node->hj_HashTable);
            node->hj_HashTable = NULL;
            node->hj_JoinState = HJ_BUILD_HASHTABLE;

            /*
             * if chgParam of subnode is not null then plan will be re-scanned
             * by first ExecProcNode.
             */
            if (innerPlan->chgParam == NULL)
                ExecReScan(innerPlan);
        }
    }

    /* Always reset intra-tuple state */
    node->hj_CurHashValue = 0;
    node->hj_CurBucketNo = 0;
    node->hj_CurSkewBucketNo = INVALID_SKEW_BUCKET_NO;
    node->hj_CurTuple = NULL;

    node->hj_MatchedOuter = false;
    node->hj_FirstOuterTupleSlot = NULL;

    /*
     * if chgParam of subnode is not null then plan will be re-scanned by
     * first ExecProcNode.
     */
    if (outerPlan->chgParam == NULL)
        ExecReScan(outerPlan);
}
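
/*
 * ExecShutdownHashJoin
 *      detach from shared parallel state before the DSM segment goes away.
 */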
void
ExecShutdownHashJoin(HashJoinState *node)
{
    if (node->hj_HashTable)
    {
        /*
         * Detach from shared state before DSM memory goes away.  This makes
         * sure that we don't have any pointers into DSM memory by the time
         * ExecEndHashJoin runs.
         */
        ExecHashTableDetachBatch(node->hj_HashTable);
        ExecHashTableDetach(node->hj_HashTable);
    }
}
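
/*
 * ExecParallelHashJoinPartitionOuter
 *      scan the entire outer relation, writing each tuple (with its hash
 *      value) to the shared tuplestore for its batch.  Used only by
 *      multi-batch parallel hash joins, which must partition the outer
 *      relation up front (the PHJ_BUILD_HASHING_OUTER phase).
 */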
static void
ExecParallelHashJoinPartitionOuter(HashJoinState *hjstate)
{
    PlanState  *outerState = outerPlanState(hjstate);
    ExprContext *econtext = hjstate->js.ps.ps_ExprContext;
    HashJoinTable hashtable = hjstate->hj_HashTable;
    TupleTableSlot *slot;
    uint32      hashvalue;
    int         i;

    Assert(hjstate->hj_FirstOuterTupleSlot == NULL);

    /* Execute outer plan, writing all tuples to shared tuplestores. */
    for (;;)
    {
        slot = ExecProcNode(outerState);
        if (TupIsNull(slot))
            break;
        econtext->ecxt_outertuple = slot;
        if (ExecHashGetHashValue(hashtable, econtext,
                                 hjstate->hj_OuterHashKeys,
                                 true,  /* outer tuple */
                                 HJ_FILL_OUTER(hjstate),
                                 &hashvalue))
        {
            int         batchno;
            int         bucketno;
            bool        shouldFree;
            MinimalTuple mintup = ExecFetchSlotMinimalTuple(slot, &shouldFree);

            ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno,
                                      &batchno);
            sts_puttuple(hashtable->batches[batchno].outer_tuples,
                         &hashvalue, mintup);

            if (shouldFree)
                heap_free_minimal_tuple(mintup);
        }
        CHECK_FOR_INTERRUPTS();
    }

    /* Make sure all outer partitions are readable by any backend. */
    for (i = 0; i < hashtable->nbatch; ++i)
        sts_end_write(hashtable->batches[i].outer_tuples);
}
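
/*
 * ExecHashJoinEstimate
 *      estimate the shared memory needed for parallel hash join state.
 */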
void
ExecHashJoinEstimate(HashJoinState *state, ParallelContext *pcxt)
{
    shm_toc_estimate_chunk(&pcxt->estimator, sizeof(ParallelHashJoinState));
    shm_toc_estimate_keys(&pcxt->estimator, 1);
}
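
/*
 * ExecHashJoinInitializeDSM
 *      set up the shared state for a Parallel Hash Join and switch this
 *      node to the parallel-aware ExecProcNode implementation.
 */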
void
ExecHashJoinInitializeDSM(HashJoinState *state, ParallelContext *pcxt)
{
    int         plan_node_id = state->js.ps.plan->plan_node_id;
    HashState  *hashNode;
    ParallelHashJoinState *pstate;

    /*
     * Disable shared hash table mode if we failed to create a real DSM
     * segment, because that means that we don't have a DSA area to work with.
     */
    if (pcxt->seg == NULL)
        return;

    ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin);

    /*
     * Set up the state needed to coordinate access to the shared hash
     * table(s), using the plan node ID as the toc key.
     */
    pstate = shm_toc_allocate(pcxt->toc, sizeof(ParallelHashJoinState));
    shm_toc_insert(pcxt->toc, plan_node_id, pstate);

    /*
     * Set up the shared hash join state with no batches initially.
     * ExecHashTableCreate() will prepare at least one later and set nbatch
     * and space_allowed.
     */
    pstate->nbatch = 0;
    pstate->space_allowed = 0;
    pstate->batches = InvalidDsaPointer;
    pstate->old_batches = InvalidDsaPointer;
    pstate->nbuckets = 0;
    pstate->growth = PHJ_GROWTH_OK;
    pstate->chunk_work_queue = InvalidDsaPointer;
    pg_atomic_init_u32(&pstate->distributor, 0);
    pstate->nparticipants = pcxt->nworkers + 1;
    pstate->total_tuples = 0;
    LWLockInitialize(&pstate->lock,
                     LWTRANCHE_PARALLEL_HASH_JOIN);
    BarrierInit(&pstate->build_barrier, 0);
    BarrierInit(&pstate->grow_batches_barrier, 0);
    BarrierInit(&pstate->grow_buckets_barrier, 0);

    /* Set up the space we'll use for shared temporary files. */
    SharedFileSetInit(&pstate->fileset, pcxt->seg);

    /* Initialize the shared state in the hash node. */
    hashNode = (HashState *) innerPlanState(state);
    hashNode->parallel_state = pstate;
}

/* ----------------------------------------------------------------
 *      ExecHashJoinReInitializeDSM
 *
 *      Reset shared state before beginning a fresh scan.
 * ----------------------------------------------------------------
 */
void
ExecHashJoinReInitializeDSM(HashJoinState *state, ParallelContext *pcxt)
{
    int         plan_node_id = state->js.ps.plan->plan_node_id;
    ParallelHashJoinState *pstate =
        shm_toc_lookup(pcxt->toc, plan_node_id, false);

    /*
     * It would be possible to reuse the shared hash table in single-batch
     * cases by resetting and then fast-forwarding build_barrier to
     * PHJ_BUILD_DONE and batch 0's batch_barrier to PHJ_BATCH_PROBING, but
     * currently shared hash tables are already freed by now (by the last
     * participant to detach from the batch).  We could consider keeping it
     * around for single-batch joins.  We'd also need to adjust
     * finalize_plan() so that it doesn't record a dummy dependency for
     * Parallel Hash nodes, preventing the rescan optimization.  For now we
     * don't try.
     */

    /* Detach, freeing any remaining shared memory. */
    if (state->hj_HashTable != NULL)
    {
        ExecHashTableDetachBatch(state->hj_HashTable);
        ExecHashTableDetach(state->hj_HashTable);
    }

    /* Clear any shared batch files. */
    SharedFileSetDeleteAll(&pstate->fileset);

    /* Reset build_barrier to PHJ_BUILD_ELECTING so we can go around again. */
    BarrierInit(&pstate->build_barrier, 0);
}
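
/*
 * ExecHashJoinInitializeWorker
 *      attach a parallel worker to the shared state created by
 *      ExecHashJoinInitializeDSM.
 */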
void
ExecHashJoinInitializeWorker(HashJoinState *state,
                             ParallelWorkerContext *pwcxt)
{
    HashState  *hashNode;
    int         plan_node_id = state->js.ps.plan->plan_node_id;
    ParallelHashJoinState *pstate =
        shm_toc_lookup(pwcxt->toc, plan_node_id, false);

    /* Attach to the space for shared temporary files. */
    SharedFileSetAttach(&pstate->fileset, pwcxt->seg);

    /* Attach to the shared state in the hash node. */
    hashNode = (HashState *) innerPlanState(state);
    hashNode->parallel_state = pstate;

    ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin);
}