/*-------------------------------------------------------------------------
 *
 * nodeHashjoin.c
 *	  Routines to handle hash join nodes
 *
 * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/backend/executor/nodeHashjoin.c
 *
 * PARALLELISM
 *
 * Hash joins can participate in parallel query execution in several ways.  A
 * parallel-oblivious hash join is one where the node is unaware that it is
 * part of a parallel plan.  In this case, a copy of the inner plan is used to
 * build a copy of the hash table in every backend, and the outer plan could
 * either be built from a partial or complete path, so that the results of the
 * hash join are correspondingly either partial or complete.  A parallel-aware
 * hash join is one that behaves differently, coordinating work between
 * backends, and appears as Parallel Hash Join in EXPLAIN output.  A Parallel
 * Hash Join always appears with a Parallel Hash node.
 *
 * Parallel-aware hash joins use the same per-backend state machine to track
 * progress through the hash join algorithm as parallel-oblivious hash joins.
 * In a parallel-aware hash join, there is also a shared state machine that
 * co-operating backends use to synchronize their local state machines and
 * program counters.  The shared state machine is managed with a Barrier IPC
 * primitive.  When all attached participants arrive at a barrier, the phase
 * advances and all waiting participants are released.
 *
 * When a participant begins working on a parallel hash join, it must first
 * figure out how much progress has already been made, because participants
 * don't wait for each other to begin.  For this reason there are switch
 * statements at key points in the code where we have to synchronize our local
 * state machine with the phase, and then jump to the correct part of the
 * algorithm so that we can get started.
 *
 * One barrier called build_barrier is used to coordinate the hashing phases.
 * The phase is represented by an integer which begins at zero and increments
 * one by one, but in the code it is referred to by symbolic names as follows:
 *
 *   PHJ_BUILD_ELECTING              -- initial state
 *   PHJ_BUILD_ALLOCATING            -- one sets up the batches and table 0
 *   PHJ_BUILD_HASHING_INNER         -- all hash the inner rel
 *   PHJ_BUILD_HASHING_OUTER         -- (multi-batch only) all hash the outer
 *   PHJ_BUILD_DONE                  -- building done, probing can begin
 *
 * While in the phase PHJ_BUILD_HASHING_INNER a separate pair of barriers may
 * be used repeatedly as required to coordinate expansions in the number of
 * batches or buckets.  Their phases are as follows:
 *
 *   PHJ_GROW_BATCHES_ELECTING       -- initial state
 *   PHJ_GROW_BATCHES_ALLOCATING     -- one allocates new batches
 *   PHJ_GROW_BATCHES_REPARTITIONING -- all repartition
 *   PHJ_GROW_BATCHES_FINISHING      -- one cleans up, detects skew
 *
 *   PHJ_GROW_BUCKETS_ELECTING       -- initial state
 *   PHJ_GROW_BUCKETS_ALLOCATING     -- one allocates new buckets
 *   PHJ_GROW_BUCKETS_REINSERTING    -- all insert tuples
 *
 * If the planner got the number of batches and buckets right, those won't be
 * necessary, but on the other hand we might finish up needing to expand the
 * buckets or batches multiple times while hashing the inner relation to stay
 * within our memory budget and load factor target.  For that reason it's a
 * separate pair of barriers using circular phases.
 *
 * The PHJ_BUILD_HASHING_OUTER phase is required only for multi-batch joins,
 * because we need to divide the outer relation into batches up front in order
 * to be able to process batches entirely independently.  In contrast, the
 * parallel-oblivious algorithm simply throws tuples 'forward' to 'later'
 * batches whenever it encounters them while scanning and probing, which it
 * can do because it processes batches in serial order.
 *
 * Once PHJ_BUILD_DONE is reached, backends then split up and process
 * different batches, or gang up and work together on probing batches if there
 * aren't enough to go around.  For each batch there is a separate barrier
 * with the following phases:
 *
 *   PHJ_BATCH_ELECTING       -- initial state
 *   PHJ_BATCH_ALLOCATING     -- one allocates buckets
 *   PHJ_BATCH_LOADING        -- all load the hash table from disk
 *   PHJ_BATCH_PROBING        -- all probe
 *   PHJ_BATCH_DONE           -- end
 *
 * Batch 0 is a special case, because it starts out in phase
 * PHJ_BATCH_PROBING; populating batch 0's hash table is done during
 * PHJ_BUILD_HASHING_INNER so we can skip loading.
 *
 * Initially we try to plan for a single-batch hash join using the combined
 * hash_mem of all participants to create a large shared hash table.  If that
 * turns out either at planning or execution time to be impossible then we
 * fall back to regular hash_mem sized hash tables.
 *
 * To avoid deadlocks, we never wait for any barrier unless it is known that
 * all other backends attached to it are actively executing the node or have
 * already arrived.  Practically, that means that we never return a tuple
 * while attached to a barrier, unless the barrier has reached its final
 * state.  In the slightly special case of the per-batch barrier, we return
 * tuples while in PHJ_BATCH_PROBING phase, but that's OK because we use
 * BarrierArriveAndDetach() to advance it to PHJ_BATCH_DONE without waiting.
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/htup_details.h"
#include "access/parallel.h"
#include "executor/executor.h"
#include "executor/hashjoin.h"
#include "executor/nodeHash.h"
#include "executor/nodeHashjoin.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "utils/memutils.h"
#include "utils/sharedtuplestore.h"
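
/*
 * Illustrative sketch, not part of the original file: the Barrier
 * synchronization pattern described in the header comment above, reduced to
 * its skeleton.  A late starter learns the current phase from BarrierAttach()
 * and jumps to the matching step.  The helper functions and the zero
 * wait-event value are hypothetical stand-ins; the Barrier calls themselves
 * are the real API from storage/barrier.h.
 */
#ifdef NODE_HASHJOIN_BARRIER_SKETCH
#include "storage/barrier.h"

extern void do_serial_setup(void);	/* hypothetical one-backend step */
extern void do_parallel_work(void); /* hypothetical all-backends step */

static void
example_synchronize(Barrier *barrier)
{
    switch (BarrierAttach(barrier))
    {
        case 0:                 /* "electing", cf. PHJ_GROW_BATCHES_ELECTING */
            /* BarrierArriveAndWait() returns true in exactly one backend */
            if (BarrierArriveAndWait(barrier, 0 /* wait event */ ))
                do_serial_setup();
            /* FALL THRU */
        case 1:                 /* "allocating": wait for the elected one */
            BarrierArriveAndWait(barrier, 0);
            /* FALL THRU */
        case 2:                 /* "working": everyone pitches in */
            do_parallel_work();
            BarrierArriveAndWait(barrier, 0);
            break;
    }
    BarrierDetach(barrier);
}
#endif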
/*
 * States of the ExecHashJoin state machine
 */
#define HJ_BUILD_HASHTABLE		1
#define HJ_NEED_NEW_OUTER		2
#define HJ_SCAN_BUCKET			3
#define HJ_FILL_OUTER_TUPLE		4
#define HJ_FILL_INNER_TUPLES	5
#define HJ_NEED_NEW_BATCH		6

/* Returns true if doing null-fill on outer relation */
#define HJ_FILL_OUTER(hjstate)	((hjstate)->hj_NullInnerTupleSlot != NULL)
/* Returns true if doing null-fill on inner relation */
#define HJ_FILL_INNER(hjstate)	((hjstate)->hj_NullOuterTupleSlot != NULL)
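
/*
 * Added summary, not in the original file: the transitions driven by
 * ExecHashJoinImpl() below are, roughly,
 *
 *   HJ_BUILD_HASHTABLE   -> HJ_NEED_NEW_OUTER (serial) or
 *                           HJ_NEED_NEW_BATCH (parallel-aware)
 *   HJ_NEED_NEW_OUTER    -> HJ_SCAN_BUCKET, or at end of batch
 *                           HJ_FILL_INNER_TUPLES / HJ_NEED_NEW_BATCH
 *   HJ_SCAN_BUCKET       -> HJ_FILL_OUTER_TUPLE once the bucket is exhausted
 *   HJ_FILL_OUTER_TUPLE  -> HJ_NEED_NEW_OUTER
 *   HJ_FILL_INNER_TUPLES -> HJ_NEED_NEW_BATCH when no unmatched tuples remain
 *   HJ_NEED_NEW_BATCH    -> HJ_NEED_NEW_OUTER, or the join is done
 */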
static TupleTableSlot *ExecHashJoinOuterGetTuple(PlanState *outerNode,
                                                 HashJoinState *hjstate,
                                                 uint32 *hashvalue);
static TupleTableSlot *ExecParallelHashJoinOuterGetTuple(PlanState *outerNode,
                                                         HashJoinState *hjstate,
                                                         uint32 *hashvalue);
static TupleTableSlot *ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
                                                 BufFile *file,
                                                 uint32 *hashvalue,
                                                 TupleTableSlot *tupleSlot);
static bool ExecHashJoinNewBatch(HashJoinState *hjstate);
static bool ExecParallelHashJoinNewBatch(HashJoinState *hjstate);
static void ExecParallelHashJoinPartitionOuter(HashJoinState *hjstate);

/* ----------------------------------------------------------------
 *		ExecHashJoinImpl
 *
 *		This function implements the Hybrid Hashjoin algorithm.  It is marked
 *		with an always-inline attribute so that ExecHashJoin() and
 *		ExecParallelHashJoin() can inline it.  Compilers that respect the
 *		attribute should create versions specialized for parallel == true and
 *		parallel == false with unnecessary branches removed.
 *
 *		Note: the relation we build hash table on is the "inner"
 *			  the other one is "outer".
 * ----------------------------------------------------------------
 */
static pg_attribute_always_inline TupleTableSlot *
ExecHashJoinImpl(PlanState *pstate, bool parallel)
{
    HashJoinState *node = castNode(HashJoinState, pstate);
    PlanState  *outerNode;
    HashState  *hashNode;
    ExprState  *joinqual;
    ExprState  *otherqual;
    ExprContext *econtext;
    HashJoinTable hashtable;
    TupleTableSlot *outerTupleSlot;
    uint32      hashvalue;
    int         batchno;
    ParallelHashJoinState *parallel_state;

    /*
     * get information from HashJoin node
     */
    joinqual = node->js.joinqual;
    otherqual = node->js.ps.qual;
    hashNode = (HashState *) innerPlanState(node);
    outerNode = outerPlanState(node);
    hashtable = node->hj_HashTable;
    econtext = node->js.ps.ps_ExprContext;
    parallel_state = hashNode->parallel_state;

    /*
     * Reset per-tuple memory context to free any expression evaluation
     * storage allocated in the previous tuple cycle.
     */
    ResetExprContext(econtext);

    /*
     * run the hash join state machine
     */
    for (;;)
    {
        /*
         * It's possible to iterate this loop many times before returning a
         * tuple, in some pathological cases such as needing to move much of
         * the current batch to a later batch.  So let's check for interrupts
         * each time through.
         */
        CHECK_FOR_INTERRUPTS();

        switch (node->hj_JoinState)
        {
            case HJ_BUILD_HASHTABLE:

                /*
                 * First time through: build hash table for inner relation.
                 */
                Assert(hashtable == NULL);

                /*
                 * If the outer relation is completely empty, and it's not
                 * right/full join, we can quit without building the hash
                 * table.  However, for an inner join it is only a win to
                 * check this when the outer relation's startup cost is less
                 * than the projected cost of building the hash table.
                 * Otherwise it's best to build the hash table first and see
                 * if the inner relation is empty.  (When it's a left join, we
                 * should always make this check, since we aren't going to be
                 * able to skip the join on the strength of an empty inner
                 * relation anyway.)
                 *
                 * If we are rescanning the join, we make use of information
                 * gained on the previous scan: don't bother to try the
                 * prefetch if the previous scan found the outer relation
                 * nonempty.  This is not 100% reliable since with new
                 * parameters the outer relation might yield different
                 * results, but it's a good heuristic.
                 *
                 * The only way to make the check is to try to fetch a tuple
                 * from the outer plan node.  If we succeed, we have to stash
                 * it away for later consumption by ExecHashJoinOuterGetTuple.
                 */
                if (HJ_FILL_INNER(node))
                {
                    /* no chance to not build the hash table */
                    node->hj_FirstOuterTupleSlot = NULL;
                }
                else if (parallel)
                {
                    /*
                     * The empty-outer optimization is not implemented for
                     * shared hash tables, because no one participant can
                     * determine that there are no outer tuples, and it's not
                     * yet clear that it's worth the synchronization overhead
                     * of reaching consensus to figure that out.  So we have
                     * to build the hash table.
                     */
                    node->hj_FirstOuterTupleSlot = NULL;
                }
                else if (HJ_FILL_OUTER(node) ||
                         (outerNode->plan->startup_cost < hashNode->ps.plan->total_cost &&
                          !node->hj_OuterNotEmpty))
                {
                    node->hj_FirstOuterTupleSlot = ExecProcNode(outerNode);
                    if (TupIsNull(node->hj_FirstOuterTupleSlot))
                    {
                        node->hj_OuterNotEmpty = false;
                        return NULL;
                    }
                    else
                        node->hj_OuterNotEmpty = true;
                }
                else
                    node->hj_FirstOuterTupleSlot = NULL;

                /*
                 * Create the hash table.  If using Parallel Hash, then
                 * whoever gets here first will create the hash table and any
                 * later arrivals will merely attach to it.
                 */
                hashtable = ExecHashTableCreate(hashNode,
                                                node->hj_HashOperators,
                                                node->hj_Collations,
                                                HJ_FILL_INNER(node));
                node->hj_HashTable = hashtable;

                /*
                 * Execute the Hash node, to build the hash table.  If using
                 * Parallel Hash, then we'll try to help hashing unless we
                 * arrived too late.
                 */
                hashNode->hashtable = hashtable;
                (void) MultiExecProcNode((PlanState *) hashNode);

                /*
                 * If the inner relation is completely empty, and we're not
                 * doing a left outer join, we can quit without scanning the
                 * outer relation.
                 */
                if (hashtable->totalTuples == 0 && !HJ_FILL_OUTER(node))
                    return NULL;

                /*
                 * need to remember whether nbatch has increased since we
                 * began scanning the outer relation
                 */
                hashtable->nbatch_outstart = hashtable->nbatch;

                /*
                 * Reset OuterNotEmpty for scan.  (It's OK if we fetched a
                 * tuple above, because ExecHashJoinOuterGetTuple will
                 * immediately set it again.)
                 */
                node->hj_OuterNotEmpty = false;

                if (parallel)
                {
                    Barrier    *build_barrier;

                    build_barrier = &parallel_state->build_barrier;
                    Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER ||
                           BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
                    if (BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER)
                    {
                        /*
                         * If multi-batch, we need to hash the outer relation
                         * up front.
                         */
                        if (hashtable->nbatch > 1)
                            ExecParallelHashJoinPartitionOuter(node);
                        BarrierArriveAndWait(build_barrier,
                                             WAIT_EVENT_HASH_BUILD_HASH_OUTER);
                    }
                    Assert(BarrierPhase(build_barrier) == PHJ_BUILD_DONE);

                    /* Each backend should now select a batch to work on. */
                    hashtable->curbatch = -1;
                    node->hj_JoinState = HJ_NEED_NEW_BATCH;

                    continue;
                }
                else
                    node->hj_JoinState = HJ_NEED_NEW_OUTER;

                /* FALL THRU */

            case HJ_NEED_NEW_OUTER:

                /*
                 * We don't have an outer tuple, try to get the next one
                 */
                if (parallel)
                    outerTupleSlot =
                        ExecParallelHashJoinOuterGetTuple(outerNode, node,
                                                          &hashvalue);
                else
                    outerTupleSlot =
                        ExecHashJoinOuterGetTuple(outerNode, node, &hashvalue);

                if (TupIsNull(outerTupleSlot))
                {
                    /* end of batch, or maybe whole join */
                    if (HJ_FILL_INNER(node))
                    {
                        /* set up to scan for unmatched inner tuples */
                        ExecPrepHashTableForUnmatched(node);
                        node->hj_JoinState = HJ_FILL_INNER_TUPLES;
                    }
                    else
                        node->hj_JoinState = HJ_NEED_NEW_BATCH;
                    continue;
                }

                econtext->ecxt_outertuple = outerTupleSlot;
                node->hj_MatchedOuter = false;

                /*
                 * Find the corresponding bucket for this tuple in the main
                 * hash table or skew hash table.
                 */
                node->hj_CurHashValue = hashvalue;
                ExecHashGetBucketAndBatch(hashtable, hashvalue,
                                          &node->hj_CurBucketNo, &batchno);
                node->hj_CurSkewBucketNo = ExecHashGetSkewBucket(hashtable,
                                                                 hashvalue);
                node->hj_CurTuple = NULL;

                /*
                 * The tuple might not belong to the current batch (where
                 * "current batch" includes the skew buckets if any).
                 */
                if (batchno != hashtable->curbatch &&
                    node->hj_CurSkewBucketNo == INVALID_SKEW_BUCKET_NO)
                {
                    bool        shouldFree;
                    MinimalTuple mintuple = ExecFetchSlotMinimalTuple(outerTupleSlot,
                                                                      &shouldFree);

                    /*
                     * Need to postpone this outer tuple to a later batch.
                     * Save it in the corresponding outer-batch file.
                     */
                    Assert(parallel_state == NULL);
                    Assert(batchno > hashtable->curbatch);
                    ExecHashJoinSaveTuple(mintuple, hashvalue,
                                          &hashtable->outerBatchFile[batchno]);

                    if (shouldFree)
                        heap_free_minimal_tuple(mintuple);

                    /* Loop around, staying in HJ_NEED_NEW_OUTER state */
                    continue;
                }

                /* OK, let's scan the bucket for matches */
                node->hj_JoinState = HJ_SCAN_BUCKET;

                /* FALL THRU */

            case HJ_SCAN_BUCKET:

                /*
                 * Scan the selected hash bucket for matches to current outer
                 */
                if (parallel)
                {
                    if (!ExecParallelScanHashBucket(node, econtext))
                    {
                        /* out of matches; check for possible outer-join fill */
                        node->hj_JoinState = HJ_FILL_OUTER_TUPLE;
                        continue;
                    }
                }
                else
                {
                    if (!ExecScanHashBucket(node, econtext))
                    {
                        /* out of matches; check for possible outer-join fill */
                        node->hj_JoinState = HJ_FILL_OUTER_TUPLE;
                        continue;
                    }
                }

                /*
                 * We've got a match, but still need to test non-hashed quals.
                 * ExecScanHashBucket already set up all the state needed to
                 * call ExecQual.
                 *
                 * If we pass the qual, then save state for next call and have
                 * ExecProject form the projection, store it in the tuple
                 * table, and return the slot.
                 *
                 * Only the joinquals determine tuple match status, but all
                 * quals must pass to actually return the tuple.
                 */
                if (joinqual == NULL || ExecQual(joinqual, econtext))
                {
                    node->hj_MatchedOuter = true;

                    if (parallel)
                    {
                        /*
                         * Full/right outer joins are currently not supported
                         * for parallel joins, so we don't need to set the
                         * match bit.  Experiments show that it's worth
                         * avoiding the shared memory traffic on large
                         * systems.
                         */
                        Assert(!HJ_FILL_INNER(node));
                    }
                    else
                    {
                        /*
                         * This is really only needed if HJ_FILL_INNER(node),
                         * but we'll avoid the branch and just set it always.
                         */
                        HeapTupleHeaderSetMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple));
                    }

                    /* In an antijoin, we never return a matched tuple */
                    if (node->js.jointype == JOIN_ANTI)
                    {
                        node->hj_JoinState = HJ_NEED_NEW_OUTER;
                        continue;
                    }

                    /*
                     * If we only need to join to the first matching inner
                     * tuple, then consider returning this one, but after that
                     * continue with next outer tuple.
                     */
                    if (node->js.single_match)
                        node->hj_JoinState = HJ_NEED_NEW_OUTER;

                    if (otherqual == NULL || ExecQual(otherqual, econtext))
                        return ExecProject(node->js.ps.ps_ProjInfo);
                    else
                        InstrCountFiltered2(node, 1);
                }
                else
                    InstrCountFiltered1(node, 1);
                break;

            case HJ_FILL_OUTER_TUPLE:

                /*
                 * The current outer tuple has run out of matches, so check
                 * whether to emit a dummy outer-join tuple.  Whether we emit
                 * one or not, the next state is NEED_NEW_OUTER.
                 */
                node->hj_JoinState = HJ_NEED_NEW_OUTER;

                if (!node->hj_MatchedOuter &&
                    HJ_FILL_OUTER(node))
                {
                    /*
                     * Generate a fake join tuple with nulls for the inner
                     * tuple, and return it if it passes the non-join quals.
                     */
                    econtext->ecxt_innertuple = node->hj_NullInnerTupleSlot;

                    if (otherqual == NULL || ExecQual(otherqual, econtext))
                        return ExecProject(node->js.ps.ps_ProjInfo);
                    else
                        InstrCountFiltered2(node, 1);
                }
                break;

            case HJ_FILL_INNER_TUPLES:

                /*
                 * We have finished a batch, but we are doing right/full join,
                 * so any unmatched inner tuples in the hashtable have to be
                 * emitted before we continue to the next batch.
                 */
                if (!ExecScanHashTableForUnmatched(node, econtext))
                {
                    /* no more unmatched tuples */
                    node->hj_JoinState = HJ_NEED_NEW_BATCH;
                    continue;
                }

                /*
                 * Generate a fake join tuple with nulls for the outer tuple,
                 * and return it if it passes the non-join quals.
                 */
                econtext->ecxt_outertuple = node->hj_NullOuterTupleSlot;

                if (otherqual == NULL || ExecQual(otherqual, econtext))
                    return ExecProject(node->js.ps.ps_ProjInfo);
                else
                    InstrCountFiltered2(node, 1);
                break;

            case HJ_NEED_NEW_BATCH:

                /*
                 * Try to advance to next batch.  Done if there are no more.
                 */
                if (parallel)
                {
                    if (!ExecParallelHashJoinNewBatch(node))
                        return NULL;    /* end of parallel-aware join */
                }
                else
                {
                    if (!ExecHashJoinNewBatch(node))
                        return NULL;    /* end of parallel-oblivious join */
                }
                node->hj_JoinState = HJ_NEED_NEW_OUTER;
                break;

            default:
                elog(ERROR, "unrecognized hashjoin state: %d",
                     (int) node->hj_JoinState);
        }
    }
}

/* ----------------------------------------------------------------
 *		ExecHashJoin
 *
 *		Parallel-oblivious version.
 * ----------------------------------------------------------------
 */
static TupleTableSlot *			/* return: a tuple or NULL */
ExecHashJoin(PlanState *pstate)
{
    /*
     * On sufficiently smart compilers this should be inlined with the
     * parallel-aware branches removed.
     */
    return ExecHashJoinImpl(pstate, false);
}

/* ----------------------------------------------------------------
 *		ExecParallelHashJoin
 *
 *		Parallel-aware version.
 * ----------------------------------------------------------------
 */
static TupleTableSlot *			/* return: a tuple or NULL */
ExecParallelHashJoin(PlanState *pstate)
{
    /*
     * On sufficiently smart compilers this should be inlined with the
     * parallel-oblivious branches removed.
     */
    return ExecHashJoinImpl(pstate, true);
}
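
/*
 * Illustrative sketch, not part of the original file: the specialization
 * trick used by the two wrappers above.  Because the boolean argument is a
 * compile-time constant at each call site of the always-inline function, an
 * optimizing compiler can drop the untaken branch entirely.  All names here
 * are hypothetical.
 */
#ifdef NODE_HASHJOIN_SPECIALIZATION_SKETCH
static pg_attribute_always_inline int
example_impl(int x, bool flag)
{
    if (flag)
        return x * 2;           /* branch kept only in example_true() */
    else
        return x + 1;           /* branch kept only in example_false() */
}

static int
example_true(int x)
{
    return example_impl(x, true);
}

static int
example_false(int x)
{
    return example_impl(x, false);
}
#endif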

/* ----------------------------------------------------------------
 *		ExecInitHashJoin
 *
 *		Init routine for HashJoin node.
 * ----------------------------------------------------------------
 */
HashJoinState *
ExecInitHashJoin(HashJoin *node, EState *estate, int eflags)
{
    HashJoinState *hjstate;
    Plan       *outerNode;
    Hash       *hashNode;
    TupleDesc   outerDesc,
                innerDesc;
    const TupleTableSlotOps *ops;

    /* check for unsupported flags */
    Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));

    /*
     * create state structure
     */
    hjstate = makeNode(HashJoinState);
    hjstate->js.ps.plan = (Plan *) node;
    hjstate->js.ps.state = estate;

    /*
     * See ExecHashJoinInitializeDSM() and ExecHashJoinInitializeWorker()
     * where this function may be replaced with a parallel version, if we
     * managed to launch a parallel query.
     */
    hjstate->js.ps.ExecProcNode = ExecHashJoin;
    hjstate->js.jointype = node->join.jointype;

    /*
     * Miscellaneous initialization
     *
     * create expression context for node
     */
    ExecAssignExprContext(estate, &hjstate->js.ps);

    /*
     * initialize child nodes
     *
     * Note: we could suppress the REWIND flag for the inner input, which
     * would amount to betting that the hash will be a single batch.  Not
     * clear if this would be a win or not.
     */
    outerNode = outerPlan(node);
    hashNode = (Hash *) innerPlan(node);

    outerPlanState(hjstate) = ExecInitNode(outerNode, estate, eflags);
    outerDesc = ExecGetResultType(outerPlanState(hjstate));
    innerPlanState(hjstate) = ExecInitNode((Plan *) hashNode, estate, eflags);
    innerDesc = ExecGetResultType(innerPlanState(hjstate));

    /*
     * Initialize result slot, type and projection.
     */
    ExecInitResultTupleSlotTL(&hjstate->js.ps, &TTSOpsVirtual);
    ExecAssignProjectionInfo(&hjstate->js.ps, NULL);

    /*
     * tuple table initialization
     */
    ops = ExecGetResultSlotOps(outerPlanState(hjstate), NULL);
    hjstate->hj_OuterTupleSlot = ExecInitExtraTupleSlot(estate, outerDesc,
                                                        ops);

    /*
     * detect whether we need only consider the first matching inner tuple
     */
    hjstate->js.single_match = (node->join.inner_unique ||
                                node->join.jointype == JOIN_SEMI);

    /* set up null tuples for outer joins, if needed */
    switch (node->join.jointype)
    {
        case JOIN_INNER:
        case JOIN_SEMI:
            break;
        case JOIN_LEFT:
        case JOIN_ANTI:
            hjstate->hj_NullInnerTupleSlot =
                ExecInitNullTupleSlot(estate, innerDesc, &TTSOpsVirtual);
            break;
        case JOIN_RIGHT:
            hjstate->hj_NullOuterTupleSlot =
                ExecInitNullTupleSlot(estate, outerDesc, &TTSOpsVirtual);
            break;
        case JOIN_FULL:
            hjstate->hj_NullOuterTupleSlot =
                ExecInitNullTupleSlot(estate, outerDesc, &TTSOpsVirtual);
            hjstate->hj_NullInnerTupleSlot =
                ExecInitNullTupleSlot(estate, innerDesc, &TTSOpsVirtual);
            break;
        default:
            elog(ERROR, "unrecognized join type: %d",
                 (int) node->join.jointype);
    }

    /*
     * now for some voodoo.  our temporary tuple slot is actually the result
     * tuple slot of the Hash node (which is our inner plan).  we can do this
     * because Hash nodes don't return tuples via ExecProcNode() -- instead
     * the hash join node uses ExecScanHashBucket() to get at the contents of
     * the hash table.  -cim 6/9/91
     */
    {
        HashState  *hashstate = (HashState *) innerPlanState(hjstate);
        TupleTableSlot *slot = hashstate->ps.ps_ResultTupleSlot;

        hjstate->hj_HashTupleSlot = slot;
    }

    /*
     * initialize child expressions
     */
    hjstate->js.ps.qual =
        ExecInitQual(node->join.plan.qual, (PlanState *) hjstate);
    hjstate->js.joinqual =
        ExecInitQual(node->join.joinqual, (PlanState *) hjstate);
    hjstate->hashclauses =
        ExecInitQual(node->hashclauses, (PlanState *) hjstate);

    /*
     * initialize hash-specific info
     */
    hjstate->hj_HashTable = NULL;
    hjstate->hj_FirstOuterTupleSlot = NULL;

    hjstate->hj_CurHashValue = 0;
    hjstate->hj_CurBucketNo = 0;
    hjstate->hj_CurSkewBucketNo = INVALID_SKEW_BUCKET_NO;
    hjstate->hj_CurTuple = NULL;

    hjstate->hj_OuterHashKeys = ExecInitExprList(node->hashkeys,
                                                 (PlanState *) hjstate);
    hjstate->hj_HashOperators = node->hashoperators;
    hjstate->hj_Collations = node->hashcollations;

    hjstate->hj_JoinState = HJ_BUILD_HASHTABLE;
    hjstate->hj_MatchedOuter = false;
    hjstate->hj_OuterNotEmpty = false;

    return hjstate;
}

/* ----------------------------------------------------------------
 *		ExecEndHashJoin
 *
 *		clean up routine for HashJoin node
 * ----------------------------------------------------------------
 */
void
ExecEndHashJoin(HashJoinState *node)
{
    /*
     * Free hash table
     */
    if (node->hj_HashTable)
    {
        ExecHashTableDestroy(node->hj_HashTable);
        node->hj_HashTable = NULL;
    }

    /*
     * Free the exprcontext
     */
    ExecFreeExprContext(&node->js.ps);

    /*
     * clean out the tuple table
     */
    ExecClearTuple(node->js.ps.ps_ResultTupleSlot);
    ExecClearTuple(node->hj_OuterTupleSlot);
    ExecClearTuple(node->hj_HashTupleSlot);

    /*
     * clean up subtrees
     */
    ExecEndNode(outerPlanState(node));
    ExecEndNode(innerPlanState(node));
}

/*
 * ExecHashJoinOuterGetTuple
 *
 *		get the next outer tuple for a parallel oblivious hashjoin: either by
 *		executing the outer plan node in the first pass, or from the temp
 *		files for the hashjoin batches.
 *
 * Returns a null slot if no more outer tuples (within the current batch).
 *
 * On success, the tuple's hash value is stored at *hashvalue --- this is
 * either originally computed, or re-read from the temp file.
 */
static TupleTableSlot *
ExecHashJoinOuterGetTuple(PlanState *outerNode,
                          HashJoinState *hjstate,
                          uint32 *hashvalue)
{
    HashJoinTable hashtable = hjstate->hj_HashTable;
    int         curbatch = hashtable->curbatch;
    TupleTableSlot *slot;

    if (curbatch == 0)          /* if it is the first pass */
    {
        /*
         * Check to see if first outer tuple was already fetched by
         * ExecHashJoin() and not used yet.
         */
        slot = hjstate->hj_FirstOuterTupleSlot;
        if (!TupIsNull(slot))
            hjstate->hj_FirstOuterTupleSlot = NULL;
        else
            slot = ExecProcNode(outerNode);

        while (!TupIsNull(slot))
        {
            /*
             * We have to compute the tuple's hash value.
             */
            ExprContext *econtext = hjstate->js.ps.ps_ExprContext;

            econtext->ecxt_outertuple = slot;
            if (ExecHashGetHashValue(hashtable, econtext,
                                     hjstate->hj_OuterHashKeys,
                                     true,  /* outer tuple */
                                     HJ_FILL_OUTER(hjstate),
                                     hashvalue))
            {
                /* remember outer relation is not empty for possible rescan */
                hjstate->hj_OuterNotEmpty = true;

                return slot;
            }

            /*
             * That tuple couldn't match because of a NULL, so discard it and
             * continue with the next one.
             */
            slot = ExecProcNode(outerNode);
        }
    }
    else if (curbatch < hashtable->nbatch)
    {
        BufFile    *file = hashtable->outerBatchFile[curbatch];

        /*
         * In outer-join cases, we could get here even though the batch file
         * is empty.
         */
        if (file == NULL)
            return NULL;

        slot = ExecHashJoinGetSavedTuple(hjstate,
                                         file,
                                         hashvalue,
                                         hjstate->hj_OuterTupleSlot);
        if (!TupIsNull(slot))
            return slot;
    }

    /* End of this batch */
    return NULL;
}

/*
 * ExecHashJoinOuterGetTuple variant for the parallel case.
 */
static TupleTableSlot *
ExecParallelHashJoinOuterGetTuple(PlanState *outerNode,
                                  HashJoinState *hjstate,
                                  uint32 *hashvalue)
{
    HashJoinTable hashtable = hjstate->hj_HashTable;
    int         curbatch = hashtable->curbatch;
    TupleTableSlot *slot;

    /*
     * In the Parallel Hash case we only run the outer plan directly for
     * single-batch hash joins.  Otherwise we have to go to batch files, even
     * for batch 0.
     */
    if (curbatch == 0 && hashtable->nbatch == 1)
    {
        slot = ExecProcNode(outerNode);

        while (!TupIsNull(slot))
        {
            ExprContext *econtext = hjstate->js.ps.ps_ExprContext;

            econtext->ecxt_outertuple = slot;
            if (ExecHashGetHashValue(hashtable, econtext,
                                     hjstate->hj_OuterHashKeys,
                                     true,  /* outer tuple */
                                     HJ_FILL_OUTER(hjstate),
                                     hashvalue))
                return slot;

            /*
             * That tuple couldn't match because of a NULL, so discard it and
             * continue with the next one.
             */
            slot = ExecProcNode(outerNode);
        }
    }
    else if (curbatch < hashtable->nbatch)
    {
        MinimalTuple tuple;

        tuple = sts_parallel_scan_next(hashtable->batches[curbatch].outer_tuples,
                                       hashvalue);
        if (tuple != NULL)
        {
            ExecForceStoreMinimalTuple(tuple,
                                       hjstate->hj_OuterTupleSlot,
                                       false);
            slot = hjstate->hj_OuterTupleSlot;
            return slot;
        }
        else
            ExecClearTuple(hjstate->hj_OuterTupleSlot);
    }

    /* End of this batch */
    return NULL;
}

/*
 * ExecHashJoinNewBatch
 *		switch to a new hashjoin batch
 *
 * Returns true if successful, false if there are no more batches.
 */
static bool
ExecHashJoinNewBatch(HashJoinState *hjstate)
{
    HashJoinTable hashtable = hjstate->hj_HashTable;
    int         nbatch;
    int         curbatch;
    BufFile    *innerFile;
    TupleTableSlot *slot;
    uint32      hashvalue;

    nbatch = hashtable->nbatch;
    curbatch = hashtable->curbatch;

    if (curbatch > 0)
    {
        /*
         * We no longer need the previous outer batch file; close it right
         * away to free disk space.
         */
        if (hashtable->outerBatchFile[curbatch])
            BufFileClose(hashtable->outerBatchFile[curbatch]);
        hashtable->outerBatchFile[curbatch] = NULL;
    }
    else                        /* we just finished the first batch */
    {
        /*
         * Reset some of the skew optimization state variables, since we no
         * longer need to consider skew tuples after the first batch.  The
         * memory context reset we are about to do will release the skew
         * hashtable itself.
         */
        hashtable->skewEnabled = false;
        hashtable->skewBucket = NULL;
        hashtable->skewBucketNums = NULL;
        hashtable->nSkewBuckets = 0;
        hashtable->spaceUsedSkew = 0;
    }

    /*
     * We can always skip over any batches that are completely empty on both
     * sides.  We can sometimes skip over batches that are empty on only one
     * side, but there are exceptions:
     *
     * 1. In a left/full outer join, we have to process outer batches even if
     * the inner batch is empty.  Similarly, in a right/full outer join, we
     * have to process inner batches even if the outer batch is empty.
     *
     * 2. If we have increased nbatch since the initial estimate, we have to
     * scan inner batches since they might contain tuples that need to be
     * reassigned to later inner batches.
     *
     * 3. Similarly, if we have increased nbatch since starting the outer
     * scan, we have to rescan outer batches in case they contain tuples that
     * need to be reassigned.
     */
    curbatch++;
    while (curbatch < nbatch &&
           (hashtable->outerBatchFile[curbatch] == NULL ||
            hashtable->innerBatchFile[curbatch] == NULL))
    {
        if (hashtable->outerBatchFile[curbatch] &&
            HJ_FILL_OUTER(hjstate))
            break;              /* must process due to rule 1 */
        if (hashtable->innerBatchFile[curbatch] &&
            HJ_FILL_INNER(hjstate))
            break;              /* must process due to rule 1 */
        if (hashtable->innerBatchFile[curbatch] &&
            nbatch != hashtable->nbatch_original)
            break;              /* must process due to rule 2 */
        if (hashtable->outerBatchFile[curbatch] &&
            nbatch != hashtable->nbatch_outstart)
            break;              /* must process due to rule 3 */
        /* We can ignore this batch. */
        /* Release associated temp files right away. */
        if (hashtable->innerBatchFile[curbatch])
            BufFileClose(hashtable->innerBatchFile[curbatch]);
        hashtable->innerBatchFile[curbatch] = NULL;
        if (hashtable->outerBatchFile[curbatch])
            BufFileClose(hashtable->outerBatchFile[curbatch]);
        hashtable->outerBatchFile[curbatch] = NULL;
        curbatch++;
    }

    if (curbatch >= nbatch)
        return false;           /* no more batches */

    hashtable->curbatch = curbatch;

    /*
     * Reload the hash table with the new inner batch (which could be empty)
     */
    ExecHashTableReset(hashtable);

    innerFile = hashtable->innerBatchFile[curbatch];

    if (innerFile != NULL)
    {
        if (BufFileSeek(innerFile, 0, 0L, SEEK_SET))
            ereport(ERROR,
                    (errcode_for_file_access(),
                     errmsg("could not rewind hash-join temporary file")));

        while ((slot = ExecHashJoinGetSavedTuple(hjstate,
                                                 innerFile,
                                                 &hashvalue,
                                                 hjstate->hj_HashTupleSlot)))
        {
            /*
             * NOTE: some tuples may be sent to future batches.  Also, it is
             * possible for hashtable->nbatch to be increased here!
             */
            ExecHashTableInsert(hashtable, slot, hashvalue);
        }

        /*
         * after we build the hash table, the inner batch file is no longer
         * needed
         */
        BufFileClose(innerFile);
        hashtable->innerBatchFile[curbatch] = NULL;
    }

    /*
     * Rewind outer batch file (if present), so that we can start reading it.
     */
    if (hashtable->outerBatchFile[curbatch] != NULL)
    {
        if (BufFileSeek(hashtable->outerBatchFile[curbatch], 0, 0L, SEEK_SET))
            ereport(ERROR,
                    (errcode_for_file_access(),
                     errmsg("could not rewind hash-join temporary file")));
    }

    return true;
}

/*
 * Choose a batch to work on, and attach to it.  Returns true if successful,
 * false if there are no more batches.
 */
static bool
ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
{
    HashJoinTable hashtable = hjstate->hj_HashTable;
    int         start_batchno;
    int         batchno;

    /*
     * If we started up so late that the batch tracking array has been freed
     * already by ExecHashTableDetach(), then we are finished.  See also
     * ExecParallelHashEnsureBatchAccessors().
     */
    if (hashtable->batches == NULL)
        return false;

    /*
     * If we were already attached to a batch, remember not to bother checking
     * it again, and detach from it (possibly freeing the hash table if we are
     * last to detach).
     */
    if (hashtable->curbatch >= 0)
    {
        hashtable->batches[hashtable->curbatch].done = true;
        ExecHashTableDetachBatch(hashtable);
    }

    /*
     * Search for a batch that isn't done.  We use an atomic counter to start
     * our search at a different batch in every participant when there are
     * more batches than participants.
     */
    batchno = start_batchno =
        pg_atomic_fetch_add_u32(&hashtable->parallel_state->distributor, 1) %
        hashtable->nbatch;
    do
    {
        uint32      hashvalue;
        MinimalTuple tuple;
        TupleTableSlot *slot;

        if (!hashtable->batches[batchno].done)
        {
            SharedTuplestoreAccessor *inner_tuples;
            Barrier    *batch_barrier =
                &hashtable->batches[batchno].shared->batch_barrier;

            switch (BarrierAttach(batch_barrier))
            {
                case PHJ_BATCH_ELECTING:

                    /* One backend allocates the hash table. */
                    if (BarrierArriveAndWait(batch_barrier,
                                             WAIT_EVENT_HASH_BATCH_ELECT))
                        ExecParallelHashTableAlloc(hashtable, batchno);
                    /* Fall through. */

                case PHJ_BATCH_ALLOCATING:
                    /* Wait for allocation to complete. */
                    BarrierArriveAndWait(batch_barrier,
                                         WAIT_EVENT_HASH_BATCH_ALLOCATE);
                    /* Fall through. */

                case PHJ_BATCH_LOADING:
                    /* Start (or join in) loading tuples. */
                    ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
                    inner_tuples = hashtable->batches[batchno].inner_tuples;
                    sts_begin_parallel_scan(inner_tuples);
                    while ((tuple = sts_parallel_scan_next(inner_tuples,
                                                           &hashvalue)))
                    {
                        ExecForceStoreMinimalTuple(tuple,
                                                   hjstate->hj_HashTupleSlot,
                                                   false);
                        slot = hjstate->hj_HashTupleSlot;
                        ExecParallelHashTableInsertCurrentBatch(hashtable, slot,
                                                                hashvalue);
                    }
                    sts_end_parallel_scan(inner_tuples);
                    BarrierArriveAndWait(batch_barrier,
                                         WAIT_EVENT_HASH_BATCH_LOAD);
                    /* Fall through. */

                case PHJ_BATCH_PROBING:

                    /*
                     * This batch is ready to probe.  Return control to
                     * caller.  We stay attached to batch_barrier so that the
                     * hash table stays alive until everyone's finished
                     * probing it, but no participant is allowed to wait at
                     * this barrier again (or else a deadlock could occur).
                     * All attached participants must eventually call
                     * BarrierArriveAndDetach() so that the final phase
                     * PHJ_BATCH_DONE can be reached.
                     */
                    ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
                    sts_begin_parallel_scan(hashtable->batches[batchno].outer_tuples);
                    return true;

                case PHJ_BATCH_DONE:

                    /*
                     * Already done.  Detach and go around again (if any
                     * remain).
                     */
                    BarrierDetach(batch_barrier);
                    hashtable->batches[batchno].done = true;
                    hashtable->curbatch = -1;
                    break;

                default:
                    elog(ERROR, "unexpected batch phase %d",
                         BarrierPhase(batch_barrier));
            }
        }
        batchno = (batchno + 1) % hashtable->nbatch;
    } while (batchno != start_batchno);

    return false;
}

/*
 * ExecHashJoinSaveTuple
 *		save a tuple to a batch file.
 *
 * The data recorded in the file for each tuple is its hash value,
 * then the tuple in MinimalTuple format.
 *
 * Note: it is important always to call this in the regular executor
 * context, not in a shorter-lived context; else the temp file buffers
 * will get messed up.
 */
void
ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue,
                      BufFile **fileptr)
{
    BufFile    *file = *fileptr;

    if (file == NULL)
    {
        /* First write to this batch file, so open it. */
        file = BufFileCreateTemp(false);
        *fileptr = file;
    }

    BufFileWrite(file, (void *) &hashvalue, sizeof(uint32));
    BufFileWrite(file, (void *) tuple, tuple->t_len);
}
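
/*
 * Added annotation, not in the original file: the resulting batch-file
 * record layout is
 *
 *   +-----------+-----------+--------------------------------------+
 *   | hashvalue |   t_len   | rest of the MinimalTuple (t_len      |
 *   |  (uint32) |  (uint32) | bytes in total, including t_len)     |
 *   +-----------+-----------+--------------------------------------+
 *
 * because a MinimalTuple begins with its uint32 t_len field.  That is why
 * ExecHashJoinGetSavedTuple() below can fetch both the hash value and the
 * tuple length with a single read of two uint32s.
 */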

/*
 * ExecHashJoinGetSavedTuple
 *		read the next tuple from a batch file.  Return NULL if no more.
 *
 * On success, *hashvalue is set to the tuple's hash value, and the tuple
 * itself is stored in the given slot.
 */
static TupleTableSlot *
ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
                          BufFile *file,
                          uint32 *hashvalue,
                          TupleTableSlot *tupleSlot)
{
    uint32      header[2];
    size_t      nread;
    MinimalTuple tuple;

    /*
     * We check for interrupts here because this is typically taken as an
     * alternative code path to an ExecProcNode() call, which would include
     * such a check.
     */
    CHECK_FOR_INTERRUPTS();

    /*
     * Since both the hash value and the MinimalTuple length word are uint32,
     * we can read them both in one BufFileRead() call without any type
     * cheating.
     */
    nread = BufFileRead(file, (void *) header, sizeof(header));
    if (nread == 0)             /* end of file */
    {
        ExecClearTuple(tupleSlot);
        return NULL;
    }
    if (nread != sizeof(header))
        ereport(ERROR,
                (errcode_for_file_access(),
                 errmsg("could not read from hash-join temporary file: read only %zu of %zu bytes",
                        nread, sizeof(header))));
    *hashvalue = header[0];
    tuple = (MinimalTuple) palloc(header[1]);
    tuple->t_len = header[1];
    nread = BufFileRead(file,
                        (void *) ((char *) tuple + sizeof(uint32)),
                        header[1] - sizeof(uint32));
    if (nread != header[1] - sizeof(uint32))
        ereport(ERROR,
                (errcode_for_file_access(),
                 errmsg("could not read from hash-join temporary file: read only %zu of %zu bytes",
                        nread, header[1] - sizeof(uint32))));
    ExecForceStoreMinimalTuple(tuple, tupleSlot, true);
    return tupleSlot;
}

void
ExecReScanHashJoin(HashJoinState *node)
{
    PlanState  *outerPlan = outerPlanState(node);
    PlanState  *innerPlan = innerPlanState(node);

    /*
     * In a multi-batch join, we currently have to do rescans the hard way,
     * primarily because batch temp files may have already been released.  But
     * if it's a single-batch join, and there is no parameter change for the
     * inner subnode, then we can just re-use the existing hash table without
     * rebuilding it.
     */
    if (node->hj_HashTable != NULL)
    {
        if (node->hj_HashTable->nbatch == 1 &&
            innerPlan->chgParam == NULL)
        {
            /*
             * Okay to reuse the hash table; needn't rescan inner, either.
             *
             * However, if it's a right/full join, we'd better reset the
             * inner-tuple match flags contained in the table.
             */
            if (HJ_FILL_INNER(node))
                ExecHashTableResetMatchFlags(node->hj_HashTable);

            /*
             * Also, we need to reset our state about the emptiness of the
             * outer relation, so that the new scan of the outer will update
             * it correctly if it turns out to be empty this time. (There's no
             * harm in clearing it now because ExecHashJoin won't need the
             * info.  In the other cases, where the hash table doesn't exist
             * or we are destroying it, we leave this state alone because
             * ExecHashJoin will need it the first time through.)
             */
            node->hj_OuterNotEmpty = false;

            /* ExecHashJoin can skip the BUILD_HASHTABLE step */
            node->hj_JoinState = HJ_NEED_NEW_OUTER;
        }
        else
        {
            /* must destroy and rebuild hash table */
            HashState  *hashNode = castNode(HashState, innerPlan);

            Assert(hashNode->hashtable == node->hj_HashTable);
            /* accumulate stats from old hash table, if wanted */
            /* (this should match ExecShutdownHash) */
            if (hashNode->ps.instrument && !hashNode->hinstrument)
                hashNode->hinstrument = (HashInstrumentation *)
                    palloc0(sizeof(HashInstrumentation));
            if (hashNode->hinstrument)
                ExecHashAccumInstrumentation(hashNode->hinstrument,
                                             hashNode->hashtable);
            /* for safety, be sure to clear child plan node's pointer too */
            hashNode->hashtable = NULL;

            ExecHashTableDestroy(node->hj_HashTable);
            node->hj_HashTable = NULL;
            node->hj_JoinState = HJ_BUILD_HASHTABLE;

            /*
             * if chgParam of subnode is not null then plan will be re-scanned
             * by first ExecProcNode.
             */
            if (innerPlan->chgParam == NULL)
                ExecReScan(innerPlan);
        }
    }

    /* Always reset intra-tuple state */
    node->hj_CurHashValue = 0;
    node->hj_CurBucketNo = 0;
    node->hj_CurSkewBucketNo = INVALID_SKEW_BUCKET_NO;
    node->hj_CurTuple = NULL;

    node->hj_MatchedOuter = false;
    node->hj_FirstOuterTupleSlot = NULL;

    /*
     * if chgParam of subnode is not null then plan will be re-scanned by
     * first ExecProcNode.
     */
    if (outerPlan->chgParam == NULL)
        ExecReScan(outerPlan);
}

void
ExecShutdownHashJoin(HashJoinState *node)
{
    if (node->hj_HashTable)
    {
        /*
         * Detach from shared state before DSM memory goes away.  This makes
         * sure that we don't have any pointers into DSM memory by the time
         * ExecEndHashJoin runs.
         */
        ExecHashTableDetachBatch(node->hj_HashTable);
        ExecHashTableDetach(node->hj_HashTable);
    }
}

static void
ExecParallelHashJoinPartitionOuter(HashJoinState *hjstate)
{
    PlanState  *outerState = outerPlanState(hjstate);
    ExprContext *econtext = hjstate->js.ps.ps_ExprContext;
    HashJoinTable hashtable = hjstate->hj_HashTable;
    TupleTableSlot *slot;
    uint32      hashvalue;
    int         i;

    Assert(hjstate->hj_FirstOuterTupleSlot == NULL);

    /* Execute outer plan, writing all tuples to shared tuplestores. */
    for (;;)
    {
        slot = ExecProcNode(outerState);
        if (TupIsNull(slot))
            break;
        econtext->ecxt_outertuple = slot;
        if (ExecHashGetHashValue(hashtable, econtext,
                                 hjstate->hj_OuterHashKeys,
                                 true,  /* outer tuple */
                                 HJ_FILL_OUTER(hjstate),
                                 &hashvalue))
        {
            int         batchno;
            int         bucketno;
            bool        shouldFree;
            MinimalTuple mintup = ExecFetchSlotMinimalTuple(slot, &shouldFree);

            ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno,
                                      &batchno);
            sts_puttuple(hashtable->batches[batchno].outer_tuples,
                         &hashvalue, mintup);

            if (shouldFree)
                heap_free_minimal_tuple(mintup);
        }
        CHECK_FOR_INTERRUPTS();
    }

    /* Make sure all outer partitions are readable by any backend. */
    for (i = 0; i < hashtable->nbatch; ++i)
        sts_end_write(hashtable->batches[i].outer_tuples);
}
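
/*
 * Added annotation, not in the original file: the parallel setup entry
 * points below are invoked by the generic parallel-query machinery in
 * execParallel.c, roughly in this order:
 *
 *   leader:  ExecHashJoinEstimate()          reserve shm_toc space
 *            ExecHashJoinInitializeDSM()     create the shared state
 *   workers: ExecHashJoinInitializeWorker()  attach to the shared state
 *   rescan:  ExecHashJoinReInitializeDSM()   reset shared state in the leader
 */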

void
ExecHashJoinEstimate(HashJoinState *state, ParallelContext *pcxt)
{
    shm_toc_estimate_chunk(&pcxt->estimator, sizeof(ParallelHashJoinState));
    shm_toc_estimate_keys(&pcxt->estimator, 1);
}

void
ExecHashJoinInitializeDSM(HashJoinState *state, ParallelContext *pcxt)
{
    int         plan_node_id = state->js.ps.plan->plan_node_id;
    HashState  *hashNode;
    ParallelHashJoinState *pstate;

    /*
     * Disable shared hash table mode if we failed to create a real DSM
     * segment, because that means that we don't have a DSA area to work with.
     */
    if (pcxt->seg == NULL)
        return;

    ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin);

    /*
     * Set up the state needed to coordinate access to the shared hash
     * table(s), using the plan node ID as the toc key.
     */
    pstate = shm_toc_allocate(pcxt->toc, sizeof(ParallelHashJoinState));
    shm_toc_insert(pcxt->toc, plan_node_id, pstate);

    /*
     * Set up the shared hash join state with no batches initially.
     * ExecHashTableCreate() will prepare at least one later and set nbatch
     * and space_allowed.
     */
    pstate->nbatch = 0;
    pstate->space_allowed = 0;
    pstate->batches = InvalidDsaPointer;
    pstate->old_batches = InvalidDsaPointer;
    pstate->nbuckets = 0;
    pstate->growth = PHJ_GROWTH_OK;
    pstate->chunk_work_queue = InvalidDsaPointer;
    pg_atomic_init_u32(&pstate->distributor, 0);
    pstate->nparticipants = pcxt->nworkers + 1;
    pstate->total_tuples = 0;
    LWLockInitialize(&pstate->lock,
                     LWTRANCHE_PARALLEL_HASH_JOIN);
    BarrierInit(&pstate->build_barrier, 0);
    BarrierInit(&pstate->grow_batches_barrier, 0);
    BarrierInit(&pstate->grow_buckets_barrier, 0);

    /* Set up the space we'll use for shared temporary files. */
    SharedFileSetInit(&pstate->fileset, pcxt->seg);

    /* Initialize the shared state in the hash node. */
    hashNode = (HashState *) innerPlanState(state);
    hashNode->parallel_state = pstate;
}

/* ----------------------------------------------------------------
 *		ExecHashJoinReInitializeDSM
 *
 *		Reset shared state before beginning a fresh scan.
 * ----------------------------------------------------------------
 */
void
ExecHashJoinReInitializeDSM(HashJoinState *state, ParallelContext *pcxt)
{
    int         plan_node_id = state->js.ps.plan->plan_node_id;
    ParallelHashJoinState *pstate =
        shm_toc_lookup(pcxt->toc, plan_node_id, false);

    /*
     * It would be possible to reuse the shared hash table in single-batch
     * cases by resetting and then fast-forwarding build_barrier to
     * PHJ_BUILD_DONE and batch 0's batch_barrier to PHJ_BATCH_PROBING, but
     * currently shared hash tables are already freed by now (by the last
     * participant to detach from the batch).  We could consider keeping it
     * around for single-batch joins.  We'd also need to adjust
     * finalize_plan() so that it doesn't record a dummy dependency for
     * Parallel Hash nodes, preventing the rescan optimization.  For now we
     * don't try.
     */

    /* Detach, freeing any remaining shared memory. */
    if (state->hj_HashTable != NULL)
    {
        ExecHashTableDetachBatch(state->hj_HashTable);
        ExecHashTableDetach(state->hj_HashTable);
    }

    /* Clear any shared batch files. */
    SharedFileSetDeleteAll(&pstate->fileset);

    /* Reset build_barrier to PHJ_BUILD_ELECTING so we can go around again. */
    BarrierInit(&pstate->build_barrier, 0);
}

void
ExecHashJoinInitializeWorker(HashJoinState *state,
                             ParallelWorkerContext *pwcxt)
{
    HashState  *hashNode;
    int         plan_node_id = state->js.ps.plan->plan_node_id;
    ParallelHashJoinState *pstate =
        shm_toc_lookup(pwcxt->toc, plan_node_id, false);

    /* Attach to the space for shared temporary files. */
    SharedFileSetAttach(&pstate->fileset, pwcxt->seg);

    /* Attach to the shared state in the hash node. */
    hashNode = (HashState *) innerPlanState(state);
    hashNode->parallel_state = pstate;

    ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin);
}