/*-------------------------------------------------------------------------
 *
 * nodeHashjoin.c
 *	  Routines to handle hash join nodes
 *
 * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/backend/executor/nodeHashjoin.c
 *
 * HASH JOIN
 *
 * This is based on the "hybrid hash join" algorithm described briefly on
 * the following page
 *
 *   https://en.wikipedia.org/wiki/Hash_join#Hybrid_hash_join
 *
 * and in detail in the referenced paper:
 *
 *   "An Adaptive Hash Join Algorithm for Multiuser Environments"
 *   Hansjörg Zeller; Jim Gray (1990). Proceedings of the 16th VLDB
 *   conference.
 *
 * If the inner side tuples of a hash join do not fit in memory, the hash
 * join can be executed in multiple batches.
 *
 * If the statistics on the inner side relation are accurate, the planner
 * chooses a multi-batch strategy and estimates the number of batches.
 *
 * The query executor measures the real size of the hashtable and increases
 * the number of batches if the hashtable grows too large.
 *
 * The number of batches is always a power of two, so each increase doubles
 * the number of batches.
 *
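 * To see why doubling is convenient, note that the batch number is derived
 * from bits of the tuple's hash value.  As a simplified sketch (the
 * authoritative version is ExecHashGetBucketAndBatch() in nodeHash.c):
 *
 *		batchno = (hashvalue >> hashtable->log2_nbuckets) & (nbatch - 1);
 *
 * With nbatch a power of two, doubling it just consults one more hash bit,
 * so each old batch splits cleanly in two and a tuple can only move to a
 * higher-numbered batch, never a lower one.
 *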
 * Serial hash join measures batch size lazily -- waiting until it is
 * loading a batch to determine if it will fit in memory.  While inserting
 * tuples into the hashtable, if adding a tuple would exceed work_mem,
 * serial hash join dumps tuples out of the hashtable and reassigns each of
 * them either to another batch file or to the current batch resident in the
 * hashtable.
 *
 * Parallel hash join, on the other hand, completes all changes to the
 * number of batches during the build phase.  If it increases the number of
 * batches, it dumps out all the tuples from all batches and reassigns them
 * to entirely new batch files.  Then it checks every batch to ensure it
 * will fit in the space budget for the query.
 *
 * In both parallel and serial hash join, the executor currently makes a
 * best effort.  If a particular batch will not fit in memory, it tries
 * doubling the number of batches.  If, after an increase, there is a batch
 * which retained all or none of its tuples, the executor disables growth in
 * the number of batches globally.  After growth is disabled, all batches
 * that would have previously triggered an increase in the number of batches
 * instead exceed the space allowed.
 *
 * PARALLELISM
 *
 * Hash joins can participate in parallel query execution in several ways.  A
 * parallel-oblivious hash join is one where the node is unaware that it is
 * part of a parallel plan.  In this case, a copy of the inner plan is used to
 * build a copy of the hash table in every backend, and the outer plan could
 * either be built from a partial or complete path, so that the results of the
 * hash join are correspondingly either partial or complete.  A parallel-aware
 * hash join is one that behaves differently, coordinating work between
 * backends, and appears as Parallel Hash Join in EXPLAIN output.  A Parallel
 * Hash Join always appears with a Parallel Hash node.
 *
 * Parallel-aware hash joins use the same per-backend state machine to track
 * progress through the hash join algorithm as parallel-oblivious hash joins.
 * In a parallel-aware hash join, there is also a shared state machine that
 * co-operating backends use to synchronize their local state machines and
 * program counters.  The shared state machine is managed with a Barrier IPC
 * primitive.  When all attached participants arrive at a barrier, the phase
 * advances and all waiting participants are released.
 *
 * When a participant begins working on a parallel hash join, it must first
 * figure out how much progress has already been made, because participants
 * don't wait for each other to begin.  For this reason there are switch
 * statements at key points in the code where we have to synchronize our local
 * state machine with the phase, and then jump to the correct part of the
 * algorithm so that we can get started.
 *
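 * A typical interaction with a barrier therefore looks something like this
 * (a simplified sketch; see storage/ipc/barrier.c for the real API
 * contract):
 *
 *		phase = BarrierAttach(barrier);
 *		switch (phase) { ... jump in at the right step ... }
 *		if (BarrierArriveAndWait(barrier, wait_event_info))
 *			... exactly one participant is "elected" to do serial work ...
 *		BarrierDetach(barrier);
 *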
 * One barrier called build_barrier is used to coordinate the hashing phases.
 * The phase is represented by an integer which begins at zero and increments
 * one by one, but in the code it is referred to by symbolic names as follows.
 * An asterisk indicates a phase that is performed by a single arbitrarily
 * chosen process.
 *
 *   PHJ_BUILD_ELECT                 -- initial state
 *   PHJ_BUILD_ALLOCATE*             -- one sets up the batches and table 0
 *   PHJ_BUILD_HASH_INNER            -- all hash the inner rel
 *   PHJ_BUILD_HASH_OUTER            -- (multi-batch only) all hash the outer
 *   PHJ_BUILD_RUN                   -- building done, probing can begin
 *   PHJ_BUILD_FREE*                 -- all work complete, one frees batches
 *
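 * (These symbolic names are small consecutive integer constants defined in
 * executor/hashjoin.h, roughly along these lines:
 *
 *		#define PHJ_BUILD_ELECT			0
 *		#define PHJ_BUILD_ALLOCATE		1
 *		...
 *
 * so a barrier's current phase can be compared against them directly.)
 *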
 * While in the phase PHJ_BUILD_HASH_INNER a separate pair of barriers may
 * be used repeatedly as required to coordinate expansions in the number of
 * batches or buckets.  Their phases are as follows:
 *
 *   PHJ_GROW_BATCHES_ELECT          -- initial state
 *   PHJ_GROW_BATCHES_REALLOCATE*    -- one allocates new batches
 *   PHJ_GROW_BATCHES_REPARTITION    -- all repartition
 *   PHJ_GROW_BATCHES_DECIDE*        -- one detects skew and cleans up
 *   PHJ_GROW_BATCHES_FINISH         -- finished one growth cycle
 *
 *   PHJ_GROW_BUCKETS_ELECT          -- initial state
 *   PHJ_GROW_BUCKETS_REALLOCATE*    -- one allocates new buckets
 *   PHJ_GROW_BUCKETS_REINSERT       -- all insert tuples
 *
 * If the planner got the number of batches and buckets right, those won't be
 * necessary, but on the other hand we might end up needing to expand the
 * buckets or batches multiple times while hashing the inner relation to stay
 * within our memory budget and load factor target.  For that reason it's a
 * separate pair of barriers using circular phases.
 *
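 * Because the phases cycle, the code compares a barrier's ever-increasing
 * phase number modulo the cycle length, roughly like this (the real macros
 * live in executor/hashjoin.h):
 *
 *		#define PHJ_GROW_BATCHES_PHASE(n)	((n) % 5)	/* circular phases */
 *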
 * The PHJ_BUILD_HASH_OUTER phase is required only for multi-batch joins,
 * because we need to divide the outer relation into batches up front in order
 * to be able to process batches entirely independently.  In contrast, the
 * parallel-oblivious algorithm simply throws tuples 'forward' to 'later'
 * batches whenever it encounters them while scanning and probing, which it
 * can do because it processes batches in serial order.
 *
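 * (For the parallel-oblivious case, see the HJ_NEED_NEW_OUTER state in
 * ExecHashJoinImpl() below, where a tuple belonging to a later batch is
 * postponed with ExecHashJoinSaveTuple().)
 *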
 * Once PHJ_BUILD_RUN is reached, backends then split up and process
 * different batches, or gang up and work together on probing batches if there
 * aren't enough to go around.  For each batch there is a separate barrier
 * with the following phases:
 *
 *   PHJ_BATCH_ELECT                 -- initial state
 *   PHJ_BATCH_ALLOCATE*             -- one allocates buckets
 *   PHJ_BATCH_LOAD                  -- all load the hash table from disk
 *   PHJ_BATCH_PROBE                 -- all probe
 *   PHJ_BATCH_SCAN*                 -- one does right/right-anti/full unmatched scan
 *   PHJ_BATCH_FREE*                 -- one frees memory
 *
 * Batch 0 is a special case, because it starts out in phase
 * PHJ_BATCH_PROBE; populating batch 0's hash table is done during
 * PHJ_BUILD_HASH_INNER so we can skip loading.
 *
 * Initially we try to plan for a single-batch hash join using the combined
 * hash_mem of all participants to create a large shared hash table.  If that
 * turns out either at planning or execution time to be impossible then we
 * fall back to regular hash_mem sized hash tables.
 *
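 * For example (numbers for illustration only): with hash_mem at 64MB and
 * five participants, we would initially plan on the basis of a 320MB shared
 * hash table, and fall back to a regular 64MB budget only if that turns out
 * not to work.
 *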
 * To avoid deadlocks, we never wait for any barrier unless it is known that
 * all other backends attached to it are actively executing the node or have
 * finished.  Practically, that means that we never emit a tuple while
 * attached to a barrier, unless the barrier has reached a phase that means
 * that no process will wait on it again.  We emit tuples while attached to
 * the build barrier in phase PHJ_BUILD_RUN, and to a per-batch barrier in
 * phase PHJ_BATCH_PROBE.  These are advanced to PHJ_BUILD_FREE and
 * PHJ_BATCH_SCAN respectively without waiting, using BarrierArriveAndDetach()
 * and BarrierArriveAndDetachExceptLast() respectively.  The last to detach
 * receives a different return value so that it knows that it's safe to
 * clean up.  Any straggler process that attaches after that phase is reached
 * will see that it's too late to participate or access the relevant shared
 * memory objects.
 *
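 * Schematically, the last-to-detach handshake looks something like this (a
 * simplified sketch; the real calls are in nodeHash.c):
 *
 *		if (BarrierArriveAndDetachExceptLast(batch_barrier))
 *			... we are the last one attached: safe to clean up ...
 *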
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/htup_details.h"
#include "access/parallel.h"
#include "executor/executor.h"
#include "executor/hashjoin.h"
#include "executor/nodeHash.h"
#include "executor/nodeHashjoin.h"
#include "miscadmin.h"
#include "utils/sharedtuplestore.h"
#include "utils/wait_event.h"
/*
 * States of the ExecHashJoin state machine
 */
#define HJ_BUILD_HASHTABLE      1
#define HJ_NEED_NEW_OUTER       2
#define HJ_SCAN_BUCKET          3
#define HJ_FILL_OUTER_TUPLE     4
#define HJ_FILL_INNER_TUPLES    5
#define HJ_NEED_NEW_BATCH       6

/* Returns true if doing null-fill on outer relation */
#define HJ_FILL_OUTER(hjstate)  ((hjstate)->hj_NullInnerTupleSlot != NULL)
/* Returns true if doing null-fill on inner relation */
#define HJ_FILL_INNER(hjstate)  ((hjstate)->hj_NullOuterTupleSlot != NULL)
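/*
 * For example, in a LEFT join ExecInitHashJoin() creates only
 * hj_NullInnerTupleSlot, so HJ_FILL_OUTER() is true and HJ_FILL_INNER() is
 * false; a FULL join gets both null slots, making both macros return true.
 */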
static TupleTableSlot *ExecHashJoinOuterGetTuple(PlanState *outerNode,
                                                 HashJoinState *hjstate,
                                                 uint32 *hashvalue);
static TupleTableSlot *ExecParallelHashJoinOuterGetTuple(PlanState *outerNode,
                                                         HashJoinState *hjstate,
                                                         uint32 *hashvalue);
static TupleTableSlot *ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
                                                 BufFile *file,
                                                 uint32 *hashvalue,
                                                 TupleTableSlot *tupleSlot);
static bool ExecHashJoinNewBatch(HashJoinState *hjstate);
static bool ExecParallelHashJoinNewBatch(HashJoinState *hjstate);
static void ExecParallelHashJoinPartitionOuter(HashJoinState *hjstate);
/* ----------------------------------------------------------------
 *		ExecHashJoinImpl
 *
 *		This function implements the Hybrid Hashjoin algorithm.  It is marked
 *		with an always-inline attribute so that ExecHashJoin() and
 *		ExecParallelHashJoin() can inline it.  Compilers that respect the
 *		attribute should create versions specialized for parallel == true and
 *		parallel == false with unnecessary branches removed.
 *
 *		Note: the relation we build the hash table on is the "inner" one;
 *		the other one is the "outer".
 * ----------------------------------------------------------------
 */
static pg_attribute_always_inline TupleTableSlot *
ExecHashJoinImpl(PlanState *pstate, bool parallel)
{
    HashJoinState *node = castNode(HashJoinState, pstate);
    PlanState  *outerNode;
    HashState  *hashNode;
    ExprState  *joinqual;
    ExprState  *otherqual;
    ExprContext *econtext;
    HashJoinTable hashtable;
    TupleTableSlot *outerTupleSlot;
    uint32      hashvalue;
    int         batchno;
    ParallelHashJoinState *parallel_state;

    /*
     * get information from HashJoin node
     */
    joinqual = node->js.joinqual;
    otherqual = node->js.ps.qual;
    hashNode = (HashState *) innerPlanState(node);
    outerNode = outerPlanState(node);
    hashtable = node->hj_HashTable;
    econtext = node->js.ps.ps_ExprContext;
    parallel_state = hashNode->parallel_state;

    /*
     * Reset per-tuple memory context to free any expression evaluation
     * storage allocated in the previous tuple cycle.
     */
    ResetExprContext(econtext);

    /*
     * run the hash join state machine
     */
    for (;;)
    {
        /*
         * It's possible to iterate this loop many times before returning a
         * tuple, in some pathological cases such as needing to move much of
         * the current batch to a later batch.  So let's check for interrupts
         * each time through.
         */
        CHECK_FOR_INTERRUPTS();

        switch (node->hj_JoinState)
        {
            case HJ_BUILD_HASHTABLE:

                /*
                 * First time through: build hash table for inner relation.
                 */
                Assert(hashtable == NULL);

                /*
                 * If the outer relation is completely empty, and it's not
                 * right/right-anti/full join, we can quit without building
                 * the hash table.  However, for an inner join it is only a
                 * win to check this when the outer relation's startup cost is
                 * less than the projected cost of building the hash table.
                 * Otherwise it's best to build the hash table first and see
                 * if the inner relation is empty.  (When it's a left join, we
                 * should always make this check, since we aren't going to be
                 * able to skip the join on the strength of an empty inner
                 * relation anyway.)
                 *
                 * If we are rescanning the join, we make use of information
                 * gained on the previous scan: don't bother to try the
                 * prefetch if the previous scan found the outer relation
                 * nonempty.  This is not 100% reliable since with new
                 * parameters the outer relation might yield different
                 * results, but it's a good heuristic.
                 *
                 * The only way to make the check is to try to fetch a tuple
                 * from the outer plan node.  If we succeed, we have to stash
                 * it away for later consumption by ExecHashJoinOuterGetTuple.
                 */
                if (HJ_FILL_INNER(node))
                {
                    /* no chance to not build the hash table */
                    node->hj_FirstOuterTupleSlot = NULL;
                }
                else if (parallel)
                {
                    /*
                     * The empty-outer optimization is not implemented for
                     * shared hash tables, because no one participant can
                     * determine that there are no outer tuples, and it's not
                     * yet clear that it's worth the synchronization overhead
                     * of reaching consensus to figure that out.  So we have
                     * to build the hash table.
                     */
                    node->hj_FirstOuterTupleSlot = NULL;
                }
                else if (HJ_FILL_OUTER(node) ||
                         (outerNode->plan->startup_cost < hashNode->ps.plan->total_cost &&
                          !node->hj_OuterNotEmpty))
                {
                    node->hj_FirstOuterTupleSlot = ExecProcNode(outerNode);
                    if (TupIsNull(node->hj_FirstOuterTupleSlot))
                    {
                        node->hj_OuterNotEmpty = false;
                        return NULL;
                    }
                    else
                        node->hj_OuterNotEmpty = true;
                }
                else
                    node->hj_FirstOuterTupleSlot = NULL;

                /*
                 * Create the hash table.  If using Parallel Hash, then
                 * whoever gets here first will create the hash table and any
                 * later arrivals will merely attach to it.
                 */
                hashtable = ExecHashTableCreate(hashNode,
                                                node->hj_HashOperators,
                                                node->hj_Collations,
                                                HJ_FILL_INNER(node));
                node->hj_HashTable = hashtable;

                /*
                 * Execute the Hash node, to build the hash table.  If using
                 * Parallel Hash, then we'll try to help hashing unless we
                 * arrived too late.
                 */
                hashNode->hashtable = hashtable;
                (void) MultiExecProcNode((PlanState *) hashNode);

                /*
                 * If the inner relation is completely empty, and we're not
                 * doing a left outer join, we can quit without scanning the
                 * outer relation.
                 */
                if (hashtable->totalTuples == 0 && !HJ_FILL_OUTER(node))
                {
                    if (parallel)
                    {
                        /*
                         * Advance the build barrier to PHJ_BUILD_RUN before
                         * proceeding so we can negotiate resource cleanup.
                         */
                        Barrier    *build_barrier = &parallel_state->build_barrier;

                        while (BarrierPhase(build_barrier) < PHJ_BUILD_RUN)
                            BarrierArriveAndWait(build_barrier, 0);
                    }
                    return NULL;
                }

                /*
                 * need to remember whether nbatch has increased since we
                 * began scanning the outer relation
                 */
                hashtable->nbatch_outstart = hashtable->nbatch;

                /*
                 * Reset OuterNotEmpty for scan.  (It's OK if we fetched a
                 * tuple above, because ExecHashJoinOuterGetTuple will
                 * immediately set it again.)
                 */
                node->hj_OuterNotEmpty = false;

                if (parallel)
                {
                    Barrier    *build_barrier;

                    build_barrier = &parallel_state->build_barrier;
                    Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASH_OUTER ||
                           BarrierPhase(build_barrier) == PHJ_BUILD_RUN ||
                           BarrierPhase(build_barrier) == PHJ_BUILD_FREE);
                    if (BarrierPhase(build_barrier) == PHJ_BUILD_HASH_OUTER)
                    {
                        /*
                         * If multi-batch, we need to hash the outer relation
                         * up front.
                         */
                        if (hashtable->nbatch > 1)
                            ExecParallelHashJoinPartitionOuter(node);
                        BarrierArriveAndWait(build_barrier,
                                             WAIT_EVENT_HASH_BUILD_HASH_OUTER);
                    }
                    else if (BarrierPhase(build_barrier) == PHJ_BUILD_FREE)
                    {
                        /*
                         * If we attached so late that the job is finished and
                         * the batch state has been freed, we can return
                         * immediately.
                         */
                        return NULL;
                    }

                    /* Each backend should now select a batch to work on. */
                    Assert(BarrierPhase(build_barrier) == PHJ_BUILD_RUN);
                    hashtable->curbatch = -1;
                    node->hj_JoinState = HJ_NEED_NEW_BATCH;

                    continue;
                }
                else
                    node->hj_JoinState = HJ_NEED_NEW_OUTER;

                /* FALL THRU */

            case HJ_NEED_NEW_OUTER:

                /*
                 * We don't have an outer tuple, try to get the next one
                 */
                if (parallel)
                    outerTupleSlot =
                        ExecParallelHashJoinOuterGetTuple(outerNode, node,
                                                          &hashvalue);
                else
                    outerTupleSlot =
                        ExecHashJoinOuterGetTuple(outerNode, node, &hashvalue);

                if (TupIsNull(outerTupleSlot))
                {
                    /* end of batch, or maybe whole join */
                    if (HJ_FILL_INNER(node))
                    {
                        /* set up to scan for unmatched inner tuples */
                        if (parallel)
                        {
                            /*
                             * Only one process is currently allowed to handle
                             * each batch's unmatched tuples, in a parallel
                             * join.
                             */
                            if (ExecParallelPrepHashTableForUnmatched(node))
                                node->hj_JoinState = HJ_FILL_INNER_TUPLES;
                            else
                                node->hj_JoinState = HJ_NEED_NEW_BATCH;
                        }
                        else
                        {
                            ExecPrepHashTableForUnmatched(node);
                            node->hj_JoinState = HJ_FILL_INNER_TUPLES;
                        }
                    }
                    else
                        node->hj_JoinState = HJ_NEED_NEW_BATCH;
                    continue;
                }

                econtext->ecxt_outertuple = outerTupleSlot;
                node->hj_MatchedOuter = false;

                /*
                 * Find the corresponding bucket for this tuple in the main
                 * hash table or skew hash table.
                 */
                node->hj_CurHashValue = hashvalue;
                ExecHashGetBucketAndBatch(hashtable, hashvalue,
                                          &node->hj_CurBucketNo, &batchno);
                node->hj_CurSkewBucketNo = ExecHashGetSkewBucket(hashtable,
                                                                 hashvalue);
                node->hj_CurTuple = NULL;

                /*
                 * The tuple might not belong to the current batch (where
                 * "current batch" includes the skew buckets if any).
                 */
                if (batchno != hashtable->curbatch &&
                    node->hj_CurSkewBucketNo == INVALID_SKEW_BUCKET_NO)
                {
                    bool        shouldFree;
                    MinimalTuple mintuple = ExecFetchSlotMinimalTuple(outerTupleSlot,
                                                                      &shouldFree);

                    /*
                     * Need to postpone this outer tuple to a later batch.
                     * Save it in the corresponding outer-batch file.
                     */
                    Assert(parallel_state == NULL);
                    Assert(batchno > hashtable->curbatch);
                    ExecHashJoinSaveTuple(mintuple, hashvalue,
                                          &hashtable->outerBatchFile[batchno],
                                          hashtable);

                    if (shouldFree)
                        heap_free_minimal_tuple(mintuple);

                    /* Loop around, staying in HJ_NEED_NEW_OUTER state */
                    continue;
                }

                /* OK, let's scan the bucket for matches */
                node->hj_JoinState = HJ_SCAN_BUCKET;

                /* FALL THRU */

            case HJ_SCAN_BUCKET:
                /*
                 * Scan the selected hash bucket for matches to current outer
                 */
                if (parallel)
                {
                    if (!ExecParallelScanHashBucket(node, econtext))
                    {
                        /* out of matches; check for possible outer-join fill */
                        node->hj_JoinState = HJ_FILL_OUTER_TUPLE;
                        continue;
                    }
                }
                else
                {
                    if (!ExecScanHashBucket(node, econtext))
                    {
                        /* out of matches; check for possible outer-join fill */
                        node->hj_JoinState = HJ_FILL_OUTER_TUPLE;
                        continue;
                    }
                }

                /*
                 * We've got a match, but still need to test non-hashed quals.
                 * ExecScanHashBucket already set up all the state needed to
                 * call ExecQual.
                 *
                 * If we pass the qual, then save state for next call and have
                 * ExecProject form the projection, store it in the tuple
                 * table, and return the slot.
                 *
                 * Only the joinquals determine tuple match status, but all
                 * quals must pass to actually return the tuple.
                 */
                if (joinqual == NULL || ExecQual(joinqual, econtext))
                {
                    node->hj_MatchedOuter = true;

                    /*
                     * This is really only needed if HJ_FILL_INNER(node), but
                     * we'll avoid the branch and just set it always.
                     */
                    if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple)))
                        HeapTupleHeaderSetMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple));

                    /* In an antijoin, we never return a matched tuple */
                    if (node->js.jointype == JOIN_ANTI)
                    {
                        node->hj_JoinState = HJ_NEED_NEW_OUTER;
                        continue;
                    }

                    /*
                     * If we only need to consider the first matching inner
                     * tuple, then advance to next outer tuple after we've
                     * processed this one.
                     */
                    if (node->js.single_match)
                        node->hj_JoinState = HJ_NEED_NEW_OUTER;

                    /*
                     * In a right-antijoin, we never return a matched tuple.
                     * If it's not an inner_unique join, we need to stay on
                     * the current outer tuple to continue scanning the inner
                     * side.
                     */
                    if (node->js.jointype == JOIN_RIGHT_ANTI)
                        continue;

                    if (otherqual == NULL || ExecQual(otherqual, econtext))
                        return ExecProject(node->js.ps.ps_ProjInfo);
                    else
                        InstrCountFiltered2(node, 1);
                }
                else
                    InstrCountFiltered1(node, 1);
                break;
            case HJ_FILL_OUTER_TUPLE:

                /*
                 * The current outer tuple has run out of matches, so check
                 * whether to emit a dummy outer-join tuple.  Whether we emit
                 * one or not, the next state is NEED_NEW_OUTER.
                 */
                node->hj_JoinState = HJ_NEED_NEW_OUTER;

                if (!node->hj_MatchedOuter &&
                    HJ_FILL_OUTER(node))
                {
                    /*
                     * Generate a fake join tuple with nulls for the inner
                     * tuple, and return it if it passes the non-join quals.
                     */
                    econtext->ecxt_innertuple = node->hj_NullInnerTupleSlot;

                    if (otherqual == NULL || ExecQual(otherqual, econtext))
                        return ExecProject(node->js.ps.ps_ProjInfo);
                    else
                        InstrCountFiltered2(node, 1);
                }
                break;
            case HJ_FILL_INNER_TUPLES:

                /*
                 * We have finished a batch, but we are doing
                 * right/right-anti/full join, so any unmatched inner tuples
                 * in the hashtable have to be emitted before we continue to
                 * the next batch.
                 */
                if (!(parallel ? ExecParallelScanHashTableForUnmatched(node, econtext)
                      : ExecScanHashTableForUnmatched(node, econtext)))
                {
                    /* no more unmatched tuples */
                    node->hj_JoinState = HJ_NEED_NEW_BATCH;
                    continue;
                }

                /*
                 * Generate a fake join tuple with nulls for the outer tuple,
                 * and return it if it passes the non-join quals.
                 */
                econtext->ecxt_outertuple = node->hj_NullOuterTupleSlot;

                if (otherqual == NULL || ExecQual(otherqual, econtext))
                    return ExecProject(node->js.ps.ps_ProjInfo);
                else
                    InstrCountFiltered2(node, 1);
                break;
            case HJ_NEED_NEW_BATCH:

                /*
                 * Try to advance to next batch.  Done if there are no more.
                 */
                if (parallel)
                {
                    if (!ExecParallelHashJoinNewBatch(node))
                        return NULL;    /* end of parallel-aware join */
                }
                else
                {
                    if (!ExecHashJoinNewBatch(node))
                        return NULL;    /* end of parallel-oblivious join */
                }
                node->hj_JoinState = HJ_NEED_NEW_OUTER;
                break;

            default:
                elog(ERROR, "unrecognized hashjoin state: %d",
                     (int) node->hj_JoinState);
        }
    }
}
/* ----------------------------------------------------------------
 *		ExecHashJoin
 *
 *		Parallel-oblivious version.
 * ----------------------------------------------------------------
 */
static TupleTableSlot *         /* return: a tuple or NULL */
ExecHashJoin(PlanState *pstate)
{
    /*
     * On sufficiently smart compilers this should be inlined with the
     * parallel-aware branches removed.
     */
    return ExecHashJoinImpl(pstate, false);
}
/* ----------------------------------------------------------------
 *		ExecParallelHashJoin
 *
 *		Parallel-aware version.
 * ----------------------------------------------------------------
 */
static TupleTableSlot *         /* return: a tuple or NULL */
ExecParallelHashJoin(PlanState *pstate)
{
    /*
     * On sufficiently smart compilers this should be inlined with the
     * parallel-oblivious branches removed.
     */
    return ExecHashJoinImpl(pstate, true);
}
/* ----------------------------------------------------------------
 *		ExecInitHashJoin
 *
 *		Init routine for HashJoin node.
 * ----------------------------------------------------------------
 */
HashJoinState *
ExecInitHashJoin(HashJoin *node, EState *estate, int eflags)
{
    HashJoinState *hjstate;
    Plan       *outerNode;
    Hash       *hashNode;
    TupleDesc   outerDesc,
                innerDesc;
    const TupleTableSlotOps *ops;

    /* check for unsupported flags */
    Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));

    /*
     * create state structure
     */
    hjstate = makeNode(HashJoinState);
    hjstate->js.ps.plan = (Plan *) node;
    hjstate->js.ps.state = estate;

    /*
     * See ExecHashJoinInitializeDSM() and ExecHashJoinInitializeWorker()
     * where this function may be replaced with a parallel version, if we
     * managed to launch a parallel query.
     */
    hjstate->js.ps.ExecProcNode = ExecHashJoin;
    hjstate->js.jointype = node->join.jointype;

    /*
     * Miscellaneous initialization
     *
     * create expression context for node
     */
    ExecAssignExprContext(estate, &hjstate->js.ps);

    /*
     * initialize child nodes
     *
     * Note: we could suppress the REWIND flag for the inner input, which
     * would amount to betting that the hash will be a single batch.  Not
     * clear if this would be a win or not.
     */
    outerNode = outerPlan(node);
    hashNode = (Hash *) innerPlan(node);

    outerPlanState(hjstate) = ExecInitNode(outerNode, estate, eflags);
    outerDesc = ExecGetResultType(outerPlanState(hjstate));
    innerPlanState(hjstate) = ExecInitNode((Plan *) hashNode, estate, eflags);
    innerDesc = ExecGetResultType(innerPlanState(hjstate));

    /*
     * Initialize result slot, type and projection.
     */
    ExecInitResultTupleSlotTL(&hjstate->js.ps, &TTSOpsVirtual);
    ExecAssignProjectionInfo(&hjstate->js.ps, NULL);

    /*
     * tuple table initialization
     */
    ops = ExecGetResultSlotOps(outerPlanState(hjstate), NULL);
    hjstate->hj_OuterTupleSlot = ExecInitExtraTupleSlot(estate, outerDesc,
                                                        ops);

    /*
     * detect whether we need only consider the first matching inner tuple
     */
    hjstate->js.single_match = (node->join.inner_unique ||
                                node->join.jointype == JOIN_SEMI);

    /* set up null tuples for outer joins, if needed */
    switch (node->join.jointype)
    {
        case JOIN_INNER:
        case JOIN_SEMI:
            break;
        case JOIN_LEFT:
        case JOIN_ANTI:
            hjstate->hj_NullInnerTupleSlot =
                ExecInitNullTupleSlot(estate, innerDesc, &TTSOpsVirtual);
            break;
        case JOIN_RIGHT:
        case JOIN_RIGHT_ANTI:
            hjstate->hj_NullOuterTupleSlot =
                ExecInitNullTupleSlot(estate, outerDesc, &TTSOpsVirtual);
            break;
        case JOIN_FULL:
            hjstate->hj_NullOuterTupleSlot =
                ExecInitNullTupleSlot(estate, outerDesc, &TTSOpsVirtual);
            hjstate->hj_NullInnerTupleSlot =
                ExecInitNullTupleSlot(estate, innerDesc, &TTSOpsVirtual);
            break;
        default:
            elog(ERROR, "unrecognized join type: %d",
                 (int) node->join.jointype);
    }

    /*
     * now for some voodoo.  our temporary tuple slot is actually the result
     * tuple slot of the Hash node (which is our inner plan).  we can do this
     * because Hash nodes don't return tuples via ExecProcNode() -- instead
     * the hash join node uses ExecScanHashBucket() to get at the contents of
     * the hash table.  -cim 6/9/91
     */
    {
        HashState  *hashstate = (HashState *) innerPlanState(hjstate);
        TupleTableSlot *slot = hashstate->ps.ps_ResultTupleSlot;

        hjstate->hj_HashTupleSlot = slot;
    }

    /*
     * initialize child expressions
     */
    hjstate->js.ps.qual =
        ExecInitQual(node->join.plan.qual, (PlanState *) hjstate);
    hjstate->js.joinqual =
        ExecInitQual(node->join.joinqual, (PlanState *) hjstate);
    hjstate->hashclauses =
        ExecInitQual(node->hashclauses, (PlanState *) hjstate);

    /*
     * initialize hash-specific info
     */
    hjstate->hj_HashTable = NULL;
    hjstate->hj_FirstOuterTupleSlot = NULL;

    hjstate->hj_CurHashValue = 0;
    hjstate->hj_CurBucketNo = 0;
    hjstate->hj_CurSkewBucketNo = INVALID_SKEW_BUCKET_NO;
    hjstate->hj_CurTuple = NULL;

    hjstate->hj_OuterHashKeys = ExecInitExprList(node->hashkeys,
                                                 (PlanState *) hjstate);
    hjstate->hj_HashOperators = node->hashoperators;
    hjstate->hj_Collations = node->hashcollations;

    hjstate->hj_JoinState = HJ_BUILD_HASHTABLE;
    hjstate->hj_MatchedOuter = false;
    hjstate->hj_OuterNotEmpty = false;

    return hjstate;
}
/* ----------------------------------------------------------------
 *		ExecEndHashJoin
 *
 *		clean up routine for HashJoin node
 * ----------------------------------------------------------------
 */
void
ExecEndHashJoin(HashJoinState *node)
{
    /*
     * Free hash table
     */
    if (node->hj_HashTable)
    {
        ExecHashTableDestroy(node->hj_HashTable);
        node->hj_HashTable = NULL;
    }

    /*
     * close down subplans
     */
    ExecEndNode(outerPlanState(node));
    ExecEndNode(innerPlanState(node));
}
/*
 * ExecHashJoinOuterGetTuple
 *
 *		get the next outer tuple for a parallel-oblivious hashjoin: either by
 *		executing the outer plan node in the first pass, or from the temp
 *		files for the hashjoin batches.
 *
 * Returns a null slot if no more outer tuples (within the current batch).
 *
 * On success, the tuple's hash value is stored at *hashvalue --- this is
 * either originally computed, or re-read from the temp file.
 */
static TupleTableSlot *
ExecHashJoinOuterGetTuple(PlanState *outerNode,
                          HashJoinState *hjstate,
                          uint32 *hashvalue)
{
    HashJoinTable hashtable = hjstate->hj_HashTable;
    int         curbatch = hashtable->curbatch;
    TupleTableSlot *slot;

    if (curbatch == 0)          /* if it is the first pass */
    {
        /*
         * Check to see if first outer tuple was already fetched by
         * ExecHashJoin() and not used yet.
         */
        slot = hjstate->hj_FirstOuterTupleSlot;
        if (!TupIsNull(slot))
            hjstate->hj_FirstOuterTupleSlot = NULL;
        else
            slot = ExecProcNode(outerNode);

        while (!TupIsNull(slot))
        {
            /*
             * We have to compute the tuple's hash value.
             */
            ExprContext *econtext = hjstate->js.ps.ps_ExprContext;

            econtext->ecxt_outertuple = slot;
            if (ExecHashGetHashValue(hashtable, econtext,
                                     hjstate->hj_OuterHashKeys,
                                     true,  /* outer tuple */
                                     HJ_FILL_OUTER(hjstate),
                                     hashvalue))
            {
                /* remember outer relation is not empty for possible rescan */
                hjstate->hj_OuterNotEmpty = true;

                return slot;
            }

            /*
             * That tuple couldn't match because of a NULL, so discard it and
             * continue with the next one.
             */
            slot = ExecProcNode(outerNode);
        }
    }
    else if (curbatch < hashtable->nbatch)
    {
        BufFile    *file = hashtable->outerBatchFile[curbatch];

        /*
         * In outer-join cases, we could get here even though the batch file
         * is empty.
         */
        if (file == NULL)
            return NULL;

        slot = ExecHashJoinGetSavedTuple(hjstate,
                                         file,
                                         hashvalue,
                                         hjstate->hj_OuterTupleSlot);
        if (!TupIsNull(slot))
            return slot;
    }

    /* End of this batch */
    return NULL;
}
/*
 * ExecHashJoinOuterGetTuple variant for the parallel case.
 */
static TupleTableSlot *
ExecParallelHashJoinOuterGetTuple(PlanState *outerNode,
                                  HashJoinState *hjstate,
                                  uint32 *hashvalue)
{
    HashJoinTable hashtable = hjstate->hj_HashTable;
    int         curbatch = hashtable->curbatch;
    TupleTableSlot *slot;

    /*
     * In the Parallel Hash case we only run the outer plan directly for
     * single-batch hash joins.  Otherwise we have to go to batch files, even
     * for batch 0.
     */
    if (curbatch == 0 && hashtable->nbatch == 1)
    {
        slot = ExecProcNode(outerNode);

        while (!TupIsNull(slot))
        {
            ExprContext *econtext = hjstate->js.ps.ps_ExprContext;

            econtext->ecxt_outertuple = slot;
            if (ExecHashGetHashValue(hashtable, econtext,
                                     hjstate->hj_OuterHashKeys,
                                     true,  /* outer tuple */
                                     HJ_FILL_OUTER(hjstate),
                                     hashvalue))
                return slot;

            /*
             * That tuple couldn't match because of a NULL, so discard it and
             * continue with the next one.
             */
            slot = ExecProcNode(outerNode);
        }
    }
    else if (curbatch < hashtable->nbatch)
    {
        MinimalTuple tuple;

        tuple = sts_parallel_scan_next(hashtable->batches[curbatch].outer_tuples,
                                       hashvalue);
        if (tuple != NULL)
        {
            ExecForceStoreMinimalTuple(tuple,
                                       hjstate->hj_OuterTupleSlot,
                                       false);
            slot = hjstate->hj_OuterTupleSlot;
            return slot;
        }
        else
            ExecClearTuple(hjstate->hj_OuterTupleSlot);

        /* End of this batch */
        hashtable->batches[curbatch].outer_eof = true;
    }

    return NULL;
}
/*
 * ExecHashJoinNewBatch
 *		switch to a new hashjoin batch
 *
 * Returns true if successful, false if there are no more batches.
 */
static bool
ExecHashJoinNewBatch(HashJoinState *hjstate)
{
    HashJoinTable hashtable = hjstate->hj_HashTable;
    int         nbatch;
    int         curbatch;
    BufFile    *innerFile;
    TupleTableSlot *slot;
    uint32      hashvalue;

    nbatch = hashtable->nbatch;
    curbatch = hashtable->curbatch;

    if (curbatch > 0)
    {
        /*
         * We no longer need the previous outer batch file; close it right
         * away to free disk space.
         */
        if (hashtable->outerBatchFile[curbatch])
            BufFileClose(hashtable->outerBatchFile[curbatch]);
        hashtable->outerBatchFile[curbatch] = NULL;
    }
    else                        /* we just finished the first batch */
    {
        /*
         * Reset some of the skew optimization state variables, since we no
         * longer need to consider skew tuples after the first batch.  The
         * memory context reset we are about to do will release the skew
         * hashtable itself.
         */
        hashtable->skewEnabled = false;
        hashtable->skewBucket = NULL;
        hashtable->skewBucketNums = NULL;
        hashtable->nSkewBuckets = 0;
        hashtable->spaceUsedSkew = 0;
    }

    /*
     * We can always skip over any batches that are completely empty on both
     * sides.  We can sometimes skip over batches that are empty on only one
     * side, but there are exceptions:
     *
     * 1. In a left/full outer join, we have to process outer batches even if
     * the inner batch is empty.  Similarly, in a right/right-anti/full outer
     * join, we have to process inner batches even if the outer batch is
     * empty.
     *
     * 2. If we have increased nbatch since the initial estimate, we have to
     * scan inner batches since they might contain tuples that need to be
     * reassigned to later inner batches.
     *
     * 3. Similarly, if we have increased nbatch since starting the outer
     * scan, we have to rescan outer batches in case they contain tuples that
     * need to be reassigned.
     */
    curbatch++;
    while (curbatch < nbatch &&
           (hashtable->outerBatchFile[curbatch] == NULL ||
            hashtable->innerBatchFile[curbatch] == NULL))
    {
        if (hashtable->outerBatchFile[curbatch] &&
            HJ_FILL_OUTER(hjstate))
            break;              /* must process due to rule 1 */
        if (hashtable->innerBatchFile[curbatch] &&
            HJ_FILL_INNER(hjstate))
            break;              /* must process due to rule 1 */
        if (hashtable->innerBatchFile[curbatch] &&
            nbatch != hashtable->nbatch_original)
            break;              /* must process due to rule 2 */
        if (hashtable->outerBatchFile[curbatch] &&
            nbatch != hashtable->nbatch_outstart)
            break;              /* must process due to rule 3 */
        /* We can ignore this batch. */
        /* Release associated temp files right away. */
        if (hashtable->innerBatchFile[curbatch])
            BufFileClose(hashtable->innerBatchFile[curbatch]);
        hashtable->innerBatchFile[curbatch] = NULL;
        if (hashtable->outerBatchFile[curbatch])
            BufFileClose(hashtable->outerBatchFile[curbatch]);
        hashtable->outerBatchFile[curbatch] = NULL;
        curbatch++;
    }

    if (curbatch >= nbatch)
        return false;           /* no more batches */

    hashtable->curbatch = curbatch;

    /*
     * Reload the hash table with the new inner batch (which could be empty)
     */
    ExecHashTableReset(hashtable);

    innerFile = hashtable->innerBatchFile[curbatch];

    if (innerFile != NULL)
    {
        if (BufFileSeek(innerFile, 0, 0, SEEK_SET))
            ereport(ERROR,
                    (errcode_for_file_access(),
                     errmsg("could not rewind hash-join temporary file")));

        while ((slot = ExecHashJoinGetSavedTuple(hjstate,
                                                 innerFile,
                                                 &hashvalue,
                                                 hjstate->hj_HashTupleSlot)))
        {
            /*
             * NOTE: some tuples may be sent to future batches.  Also, it is
             * possible for hashtable->nbatch to be increased here!
             */
            ExecHashTableInsert(hashtable, slot, hashvalue);
        }

        /*
         * after we build the hash table, the inner batch file is no longer
         * needed
         */
        BufFileClose(innerFile);
        hashtable->innerBatchFile[curbatch] = NULL;
    }

    /*
     * Rewind outer batch file (if present), so that we can start reading it.
     */
    if (hashtable->outerBatchFile[curbatch] != NULL)
    {
        if (BufFileSeek(hashtable->outerBatchFile[curbatch], 0, 0, SEEK_SET))
            ereport(ERROR,
                    (errcode_for_file_access(),
                     errmsg("could not rewind hash-join temporary file")));
    }

    return true;
}
/*
 * Choose a batch to work on, and attach to it.  Returns true if successful,
 * false if there are no more batches.
 */
static bool
ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
{
    HashJoinTable hashtable = hjstate->hj_HashTable;
    int         start_batchno;
    int         batchno;

    /*
     * If we were already attached to a batch, remember not to bother checking
     * it again, and detach from it (possibly freeing the hash table if we are
     * last to detach).
     */
    if (hashtable->curbatch >= 0)
    {
        hashtable->batches[hashtable->curbatch].done = true;
        ExecHashTableDetachBatch(hashtable);
    }

    /*
     * Search for a batch that isn't done.  We use an atomic counter to start
     * our search at a different batch in every participant when there are
     * more batches than participants.
     */
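    /*
     * For example, if three participants arrive while there are eight
     * batches, the fetch-and-add below hands out starting points 0, 1 and 2,
     * and each participant then walks forward modulo nbatch until it finds a
     * batch that isn't done or gets back to where it started.
     */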
    batchno = start_batchno =
        pg_atomic_fetch_add_u32(&hashtable->parallel_state->distributor, 1) %
        hashtable->nbatch;
    do
    {
        uint32      hashvalue;
        MinimalTuple tuple;
        TupleTableSlot *slot;

        if (!hashtable->batches[batchno].done)
        {
            SharedTuplestoreAccessor *inner_tuples;
            Barrier    *batch_barrier =
                &hashtable->batches[batchno].shared->batch_barrier;

            switch (BarrierAttach(batch_barrier))
            {
                case PHJ_BATCH_ELECT:

                    /* One backend allocates the hash table. */
                    if (BarrierArriveAndWait(batch_barrier,
                                             WAIT_EVENT_HASH_BATCH_ELECT))
                        ExecParallelHashTableAlloc(hashtable, batchno);
                    /* Fall through. */

                case PHJ_BATCH_ALLOCATE:
                    /* Wait for allocation to complete. */
                    BarrierArriveAndWait(batch_barrier,
                                         WAIT_EVENT_HASH_BATCH_ALLOCATE);
                    /* Fall through. */

                case PHJ_BATCH_LOAD:
                    /* Start (or join in) loading tuples. */
                    ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
                    inner_tuples = hashtable->batches[batchno].inner_tuples;
                    sts_begin_parallel_scan(inner_tuples);
                    while ((tuple = sts_parallel_scan_next(inner_tuples,
                                                           &hashvalue)))
                    {
                        ExecForceStoreMinimalTuple(tuple,
                                                   hjstate->hj_HashTupleSlot,
                                                   false);
                        slot = hjstate->hj_HashTupleSlot;
                        ExecParallelHashTableInsertCurrentBatch(hashtable, slot,
                                                                hashvalue);
                    }
                    sts_end_parallel_scan(inner_tuples);
                    BarrierArriveAndWait(batch_barrier,
                                         WAIT_EVENT_HASH_BATCH_LOAD);
                    /* Fall through. */

                case PHJ_BATCH_PROBE:

                    /*
                     * This batch is ready to probe.  Return control to
                     * caller.  We stay attached to batch_barrier so that the
                     * hash table stays alive until everyone's finished
                     * probing it, but no participant is allowed to wait at
                     * this barrier again (or else a deadlock could occur).
                     * All attached participants must eventually detach from
                     * the barrier and one worker must advance the phase so
                     * that the final phase is reached.
                     */
                    ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
                    sts_begin_parallel_scan(hashtable->batches[batchno].outer_tuples);
                    return true;

                case PHJ_BATCH_SCAN:

                    /*
                     * In principle, we could help scan for unmatched tuples,
                     * since that phase is already underway (the thing we
                     * can't do under current deadlock-avoidance rules is wait
                     * for others to arrive at PHJ_BATCH_SCAN, because
                     * PHJ_BATCH_PROBE emits tuples, but in this case we just
                     * got here without waiting).  That is not yet done.  For
                     * now, we just detach and go around again.  We have to
                     * use ExecHashTableDetachBatch() because there's a small
                     * chance we'll be the last to detach, and then we're
                     * responsible for freeing memory.
                     */
                    ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
                    hashtable->batches[batchno].done = true;
                    ExecHashTableDetachBatch(hashtable);
                    break;

                case PHJ_BATCH_FREE:

                    /*
                     * Already done.  Detach and go around again (if any
                     * remain).
                     */
                    BarrierDetach(batch_barrier);
                    hashtable->batches[batchno].done = true;
                    hashtable->curbatch = -1;
                    break;

                default:
                    elog(ERROR, "unexpected batch phase %d",
                         BarrierPhase(batch_barrier));
            }
        }
        batchno = (batchno + 1) % hashtable->nbatch;
    } while (batchno != start_batchno);

    return false;
}
/*
 * ExecHashJoinSaveTuple
 *		save a tuple to a batch file.
 *
 * The data recorded in the file for each tuple is its hash value,
 * then the tuple in MinimalTuple format.
 *
 * fileptr points to a batch file in one of the hashtable arrays.
 *
 * The batch files (and their buffers) are allocated in the spill context
 * created for the hashtable.
 */
void
ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue,
                      BufFile **fileptr, HashJoinTable hashtable)
{
    BufFile    *file = *fileptr;

    /*
     * The batch file is lazily created.  If this is the first tuple written
     * to this batch, the batch file is created and its buffer is allocated
     * in the spillCxt context, NOT in the batchCxt.
     *
     * During the build phase, buffered files are created for inner batches.
     * Each batch's buffered file is closed (and its buffer freed) after the
     * batch is loaded into memory during the outer side scan.  Therefore, it
     * is necessary to allocate the batch file buffer in a memory context
     * which outlives the batch itself.
     *
     * Also, we use spillCxt instead of hashCxt for a better accounting of
     * the spilling memory consumption.
     */
    if (file == NULL)
    {
        MemoryContext oldctx = MemoryContextSwitchTo(hashtable->spillCxt);

        file = BufFileCreateTemp(false);
        *fileptr = file;

        MemoryContextSwitchTo(oldctx);
    }

    BufFileWrite(file, &hashvalue, sizeof(uint32));
    BufFileWrite(file, tuple, tuple->t_len);
}
/*
 * ExecHashJoinGetSavedTuple
 *		read the next tuple from a batch file.  Return NULL if no more.
 *
 * On success, *hashvalue is set to the tuple's hash value, and the tuple
 * itself is stored in the given slot.
 */
static TupleTableSlot *
ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
                          BufFile *file,
                          uint32 *hashvalue,
                          TupleTableSlot *tupleSlot)
{
    uint32      header[2];
    size_t      nread;
    MinimalTuple tuple;

    /*
     * We check for interrupts here because this is typically taken as an
     * alternative code path to an ExecProcNode() call, which would include
     * such a check.
     */
    CHECK_FOR_INTERRUPTS();

    /*
     * Since both the hash value and the MinimalTuple length word are uint32,
     * we can read them both in one BufFileRead() call without any type
     * cheating.
     */
    nread = BufFileReadMaybeEOF(file, header, sizeof(header), true);
    if (nread == 0)             /* end of file */
    {
        ExecClearTuple(tupleSlot);
        return NULL;
    }
    *hashvalue = header[0];
    tuple = (MinimalTuple) palloc(header[1]);
    tuple->t_len = header[1];
    BufFileReadExact(file,
                     (char *) tuple + sizeof(uint32),
                     header[1] - sizeof(uint32));
    ExecForceStoreMinimalTuple(tuple, tupleSlot, true);
    return tupleSlot;
}
void
ExecReScanHashJoin(HashJoinState *node)
{
    PlanState  *outerPlan = outerPlanState(node);
    PlanState  *innerPlan = innerPlanState(node);

    /*
     * In a multi-batch join, we currently have to do rescans the hard way,
     * primarily because batch temp files may have already been released.  But
     * if it's a single-batch join, and there is no parameter change for the
     * inner subnode, then we can just re-use the existing hash table without
     * rebuilding it.
     */
    if (node->hj_HashTable != NULL)
    {
        if (node->hj_HashTable->nbatch == 1 &&
            innerPlan->chgParam == NULL)
        {
            /*
             * Okay to reuse the hash table; needn't rescan inner, either.
             *
             * However, if it's a right/right-anti/full join, we'd better
             * reset the inner-tuple match flags contained in the table.
             */
            if (HJ_FILL_INNER(node))
                ExecHashTableResetMatchFlags(node->hj_HashTable);

            /*
             * Also, we need to reset our state about the emptiness of the
             * outer relation, so that the new scan of the outer will update
             * it correctly if it turns out to be empty this time.  (There's
             * no harm in clearing it now because ExecHashJoin won't need the
             * info.  In the other cases, where the hash table doesn't exist
             * or we are destroying it, we leave this state alone because
             * ExecHashJoin will need it the first time through.)
             */
            node->hj_OuterNotEmpty = false;

            /* ExecHashJoin can skip the BUILD_HASHTABLE step */
            node->hj_JoinState = HJ_NEED_NEW_OUTER;
        }
        else
        {
            /* must destroy and rebuild hash table */
            HashState  *hashNode = castNode(HashState, innerPlan);

            Assert(hashNode->hashtable == node->hj_HashTable);
            /* accumulate stats from old hash table, if wanted */
            /* (this should match ExecShutdownHash) */
            if (hashNode->ps.instrument && !hashNode->hinstrument)
                hashNode->hinstrument = (HashInstrumentation *)
                    palloc0(sizeof(HashInstrumentation));
            if (hashNode->hinstrument)
                ExecHashAccumInstrumentation(hashNode->hinstrument,
                                             hashNode->hashtable);
            /* for safety, be sure to clear child plan node's pointer too */
            hashNode->hashtable = NULL;

            ExecHashTableDestroy(node->hj_HashTable);
            node->hj_HashTable = NULL;
            node->hj_JoinState = HJ_BUILD_HASHTABLE;

            /*
             * if chgParam of subnode is not null then plan will be re-scanned
             * by first ExecProcNode.
             */
            if (innerPlan->chgParam == NULL)
                ExecReScan(innerPlan);
        }
    }

    /* Always reset intra-tuple state */
    node->hj_CurHashValue = 0;
    node->hj_CurBucketNo = 0;
    node->hj_CurSkewBucketNo = INVALID_SKEW_BUCKET_NO;
    node->hj_CurTuple = NULL;

    node->hj_MatchedOuter = false;
    node->hj_FirstOuterTupleSlot = NULL;

    /*
     * if chgParam of subnode is not null then plan will be re-scanned by
     * first ExecProcNode.
     */
    if (outerPlan->chgParam == NULL)
        ExecReScan(outerPlan);
}
void
ExecShutdownHashJoin(HashJoinState *node)
{
    if (node->hj_HashTable)
    {
        /*
         * Detach from shared state before DSM memory goes away.  This makes
         * sure that we don't have any pointers into DSM memory by the time
         * ExecEndHashJoin runs.
         */
        ExecHashTableDetachBatch(node->hj_HashTable);
        ExecHashTableDetach(node->hj_HashTable);
    }
}
static void
ExecParallelHashJoinPartitionOuter(HashJoinState *hjstate)
{
    PlanState  *outerState = outerPlanState(hjstate);
    ExprContext *econtext = hjstate->js.ps.ps_ExprContext;
    HashJoinTable hashtable = hjstate->hj_HashTable;
    TupleTableSlot *slot;
    uint32      hashvalue;
    int         i;

    Assert(hjstate->hj_FirstOuterTupleSlot == NULL);

    /* Execute outer plan, writing all tuples to shared tuplestores. */
    for (;;)
    {
        slot = ExecProcNode(outerState);
        if (TupIsNull(slot))
            break;
        econtext->ecxt_outertuple = slot;
        if (ExecHashGetHashValue(hashtable, econtext,
                                 hjstate->hj_OuterHashKeys,
                                 true,  /* outer tuple */
                                 HJ_FILL_OUTER(hjstate),
                                 &hashvalue))
        {
            int         batchno;
            int         bucketno;
            bool        shouldFree;
            MinimalTuple mintup = ExecFetchSlotMinimalTuple(slot, &shouldFree);

            ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno,
                                      &batchno);
            sts_puttuple(hashtable->batches[batchno].outer_tuples,
                         &hashvalue, mintup);

            if (shouldFree)
                heap_free_minimal_tuple(mintup);
        }
        CHECK_FOR_INTERRUPTS();
    }

    /* Make sure all outer partitions are readable by any backend. */
    for (i = 0; i < hashtable->nbatch; ++i)
        sts_end_write(hashtable->batches[i].outer_tuples);
}
void
ExecHashJoinEstimate(HashJoinState *state, ParallelContext *pcxt)
{
    shm_toc_estimate_chunk(&pcxt->estimator, sizeof(ParallelHashJoinState));
    shm_toc_estimate_keys(&pcxt->estimator, 1);
}
void
ExecHashJoinInitializeDSM(HashJoinState *state, ParallelContext *pcxt)
{
    int         plan_node_id = state->js.ps.plan->plan_node_id;
    HashState  *hashNode;
    ParallelHashJoinState *pstate;

    /*
     * Disable shared hash table mode if we failed to create a real DSM
     * segment, because that means that we don't have a DSA area to work with.
     */
    if (pcxt->seg == NULL)
        return;

    ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin);

    /*
     * Set up the state needed to coordinate access to the shared hash
     * table(s), using the plan node ID as the toc key.
     */
    pstate = shm_toc_allocate(pcxt->toc, sizeof(ParallelHashJoinState));
    shm_toc_insert(pcxt->toc, plan_node_id, pstate);

    /*
     * Set up the shared hash join state with no batches initially.
     * ExecHashTableCreate() will prepare at least one later and set nbatch
     * and space_allowed.
     */
    pstate->nbatch = 0;
    pstate->space_allowed = 0;
    pstate->batches = InvalidDsaPointer;
    pstate->old_batches = InvalidDsaPointer;
    pstate->nbuckets = 0;
    pstate->growth = PHJ_GROWTH_OK;
    pstate->chunk_work_queue = InvalidDsaPointer;
    pg_atomic_init_u32(&pstate->distributor, 0);
    pstate->nparticipants = pcxt->nworkers + 1;
    pstate->total_tuples = 0;
    LWLockInitialize(&pstate->lock,
                     LWTRANCHE_PARALLEL_HASH_JOIN);
    BarrierInit(&pstate->build_barrier, 0);
    BarrierInit(&pstate->grow_batches_barrier, 0);
    BarrierInit(&pstate->grow_buckets_barrier, 0);

    /* Set up the space we'll use for shared temporary files. */
    SharedFileSetInit(&pstate->fileset, pcxt->seg);

    /* Initialize the shared state in the hash node. */
    hashNode = (HashState *) innerPlanState(state);
    hashNode->parallel_state = pstate;
}
/* ----------------------------------------------------------------
 *		ExecHashJoinReInitializeDSM
 *
 *		Reset shared state before beginning a fresh scan.
 * ----------------------------------------------------------------
 */
void
ExecHashJoinReInitializeDSM(HashJoinState *state, ParallelContext *pcxt)
{
    int         plan_node_id = state->js.ps.plan->plan_node_id;
    ParallelHashJoinState *pstate;

    /* Nothing to do if we failed to create a DSM segment. */
    if (pcxt->seg == NULL)
        return;

    pstate = shm_toc_lookup(pcxt->toc, plan_node_id, false);

    /*
     * It would be possible to reuse the shared hash table in single-batch
     * cases by resetting and then fast-forwarding build_barrier to
     * PHJ_BUILD_FREE and batch 0's batch_barrier to PHJ_BATCH_PROBE, but
     * currently shared hash tables are already freed by now (by the last
     * participant to detach from the batch).  We could consider keeping it
     * around for single-batch joins.  We'd also need to adjust
     * finalize_plan() so that it doesn't record a dummy dependency for
     * Parallel Hash nodes, preventing the rescan optimization.  For now we
     * don't try.
     */

    /* Detach, freeing any remaining shared memory. */
    if (state->hj_HashTable != NULL)
    {
        ExecHashTableDetachBatch(state->hj_HashTable);
        ExecHashTableDetach(state->hj_HashTable);
    }

    /* Clear any shared batch files. */
    SharedFileSetDeleteAll(&pstate->fileset);

    /* Reset build_barrier to PHJ_BUILD_ELECT so we can go around again. */
    BarrierInit(&pstate->build_barrier, 0);
}
void
ExecHashJoinInitializeWorker(HashJoinState *state,
                             ParallelWorkerContext *pwcxt)
{
    HashState  *hashNode;
    int         plan_node_id = state->js.ps.plan->plan_node_id;
    ParallelHashJoinState *pstate =
        shm_toc_lookup(pwcxt->toc, plan_node_id, false);

    /* Attach to the space for shared temporary files. */
    SharedFileSetAttach(&pstate->fileset, pwcxt->seg);

    /* Attach to the shared state in the hash node. */
    hashNode = (HashState *) innerPlanState(state);
    hashNode->parallel_state = pstate;

    ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin);
}