1 /*-------------------------------------------------------------------------
4 * internal structures for hash joins
7 * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
8 * Portions Copyright (c) 1994, Regents of the University of California
10 * src/include/executor/hashjoin.h
12 *-------------------------------------------------------------------------
17 #include "nodes/execnodes.h"
18 #include "port/atomics.h"
19 #include "storage/barrier.h"
20 #include "storage/buffile.h"
21 #include "storage/lwlock.h"
/* ----------------------------------------------------------------
 *				hash-join hash table structures
 *
 * Each active hashjoin has a HashJoinTable structure, which is
 * palloc'd in the executor's per-query context.  Other storage needed for
 * each hashjoin is kept in child contexts, three for each hashjoin:
 *   - HashTableContext (hashCxt): the parent hash table storage context
 *   - HashSpillContext (spillCxt): storage for temp file buffers
 *   - HashBatchContext (batchCxt): storage for a batch in serial hash join
 *
 * The hashtable contexts are made children of the per-query context, ensuring
 * that they will be discarded at end of statement even if the join is
 * aborted early by an error.  (Likewise, any temporary files we make will
 * be cleaned up by the virtual file manager in event of an error.)
 *
 * Storage that should live through the entire join is allocated from the
 * "hashCxt" (mainly the hashtable's metadata).  Also, the "hashCxt" context is
 * the parent of "spillCxt" and "batchCxt".  It makes it easy and fast to
 * release the storage when we don't need it anymore.
 *
 * Data associated with temp files is allocated in the "spillCxt" context
 * which lives for the duration of the entire join as batch files'
 * creation and usage may span batch execution.  These files are
 * explicitly destroyed by calling BufFileClose() when the code is done
 * with them.  The aim of this context is to help accounting for the
 * memory allocated for temp files and their buffers.
 *
 * Finally, data used only during a single batch's execution is allocated
 * in the "batchCxt".  By resetting the batchCxt at the end of each batch,
 * we free all the per-batch storage reliably and without tedium.
 *
 * During first scan of inner relation, we get its tuples from executor.
 * If nbatch > 1 then tuples that don't belong in first batch get saved
 * into inner-batch temp files.  The same statements apply for the
 * first scan of the outer relation, except we write tuples to outer-batch
 * temp files.  After finishing the first scan, we do the following for
 * each remaining batch:
 *	1. Read tuples from inner batch file, load into hash buckets.
 *	2. Read tuples from outer batch file, match to hash buckets and output.
 *
 * It is possible to increase nbatch on the fly if the in-memory hash table
 * gets too big.  The hash-value-to-batch computation is arranged so that this
 * can only cause a tuple to go into a later batch than previously thought,
 * never into an earlier batch.  When we increase nbatch, we rescan the hash
 * table and dump out any tuples that are now of a later batch to the correct
 * inner batch file.  Subsequently, while reading either inner or outer batch
 * files, we might find tuples that no longer belong to the current batch;
 * if so, we just dump them out to the correct batch file.
 * ----------------------------------------------------------------
 */
74 /* these are in nodes/execnodes.h: */
75 /* typedef struct HashJoinTupleData *HashJoinTuple; */
76 /* typedef struct HashJoinTableData *HashJoinTable; */
78 typedef struct HashJoinTupleData
80 /* link to next tuple in same bucket */
83 struct HashJoinTupleData
*unshared
;
86 uint32 hashvalue
; /* tuple's hash code */
87 /* Tuple data, in MinimalTuple format, follows on a MAXALIGN boundary */
/*
 * Layout helpers for a hash-join tuple.
 *
 * HJTUPLE_OVERHEAD is the size of the HashJoinTupleData header rounded up
 * with MAXALIGN; HJTUPLE_MINTUPLE(hjtup) yields the MinimalTuple stored
 * that many bytes past the start of the header.
 */
#define HJTUPLE_OVERHEAD  MAXALIGN(sizeof(HashJoinTupleData))
#define HJTUPLE_MINTUPLE(hjtup)  \
	((MinimalTuple) ((char *) (hjtup) + HJTUPLE_OVERHEAD))
95 * If the outer relation's distribution is sufficiently nonuniform, we attempt
96 * to optimize the join by treating the hash values corresponding to the outer
97 * relation's MCVs specially. Inner relation tuples matching these hash
98 * values go into the "skew" hashtable instead of the main hashtable, and
99 * outer relation tuples with these hash values are matched against that
100 * table instead of the main one. Thus, tuples with these hash values are
101 * effectively handled as part of the first batch and will never go to disk.
102 * The skew hashtable is limited to SKEW_HASH_MEM_PERCENT of the total memory
103 * allowed for the join; while building the hashtables, we decrease the number
104 * of MCVs being specially treated if needed to stay under this limit.
106 * Note: you might wonder why we look at the outer relation stats for this,
107 * rather than the inner. One reason is that the outer relation is typically
108 * bigger, so we get more I/O savings by optimizing for its most common values.
109 * Also, for similarly-sized relations, the planner prefers to put the more
110 * uniformly distributed relation on the inside, so we're more likely to find
111 * interesting skew in the outer relation.
113 typedef struct HashSkewBucket
115 uint32 hashvalue
; /* common hash value */
116 HashJoinTuple tuples
; /* linked list of inner-relation tuples */
/*
 * Constants for the skew hashtable.
 *
 * SKEW_BUCKET_OVERHEAD is the MAXALIGN'd size of one HashSkewBucket header.
 * INVALID_SKEW_BUCKET_NO is a negative sentinel bucket number.
 * SKEW_HASH_MEM_PERCENT is the percentage of the join's memory budget the
 * skew hashtable may use.
 * SKEW_MIN_OUTER_FRACTION is a fractional threshold (NOTE(review): presumably
 * the minimum share of outer tuples the MCVs must cover for the skew
 * optimization to be used -- confirm against the caller).
 */
#define SKEW_BUCKET_OVERHEAD  MAXALIGN(sizeof(HashSkewBucket))
#define INVALID_SKEW_BUCKET_NO  (-1)
#define SKEW_HASH_MEM_PERCENT  2
#define SKEW_MIN_OUTER_FRACTION  0.01
125 * To reduce palloc overhead, the HashJoinTuples for the current batch are
126 * packed in 32kB buffers instead of pallocing each tuple individually.
128 typedef struct HashMemoryChunkData
130 int ntuples
; /* number of tuples stored in this chunk */
131 size_t maxlen
; /* size of the chunk's tuple buffer */
132 size_t used
; /* number of buffer bytes already used */
134 /* pointer to the next chunk (linked list) */
137 struct HashMemoryChunkData
*unshared
;
142 * The chunk's tuple buffer starts after the HashMemoryChunkData struct,
143 * at offset HASH_CHUNK_HEADER_SIZE (which must be maxaligned). Note that
144 * that offset is not included in "maxlen" or "used".
146 } HashMemoryChunkData
;
/* Handle type for a chunk: a pointer to its HashMemoryChunkData header. */
typedef struct HashMemoryChunkData *HashMemoryChunk;

/*
 * Chunk geometry.  HASH_CHUNK_DATA(hc) is the start of the tuple buffer,
 * which begins HASH_CHUNK_HEADER_SIZE bytes past the chunk header address.
 */
#define HASH_CHUNK_SIZE			(32 * 1024L)
#define HASH_CHUNK_HEADER_SIZE	MAXALIGN(sizeof(HashMemoryChunkData))
#define HASH_CHUNK_DATA(hc)		(((char *) (hc)) + HASH_CHUNK_HEADER_SIZE)
/* tuples exceeding HASH_CHUNK_THRESHOLD bytes are put in their own chunk */
#define HASH_CHUNK_THRESHOLD	(HASH_CHUNK_SIZE / 4)
157 * For each batch of a Parallel Hash Join, we have a ParallelHashJoinBatch
158 * object in shared memory to coordinate access to it. Since they are
159 * followed by variable-sized objects, they are arranged in contiguous memory
160 * but not accessed directly as an array.
162 typedef struct ParallelHashJoinBatch
164 dsa_pointer buckets
; /* array of hash table buckets */
165 Barrier batch_barrier
; /* synchronization for joining this batch */
167 dsa_pointer chunks
; /* chunks of tuples loaded */
168 size_t size
; /* size of buckets + chunks in memory */
169 size_t estimated_size
; /* size of buckets + chunks while writing */
170 size_t ntuples
; /* number of tuples loaded */
171 size_t old_ntuples
; /* number of tuples before repartitioning */
172 bool space_exhausted
;
173 bool skip_unmatched
; /* whether to abandon unmatched scan */
176 * Variable-sized SharedTuplestore objects follow this struct in memory.
177 * See the accessor macros below.
179 } ParallelHashJoinBatch
;
/*
 * Accessor for inner batch tuplestore following a ParallelHashJoinBatch.
 * The tuplestore starts at the MAXALIGN'd end of the batch header.
 */
#define ParallelHashJoinBatchInner(batch)							 \
	((SharedTuplestore *)											 \
	 ((char *) (batch) + MAXALIGN(sizeof(ParallelHashJoinBatch))))

/*
 * Accessor for outer batch tuplestore following a ParallelHashJoinBatch.
 * It sits after the inner tuplestore, whose MAXALIGN'd size is computed
 * from sts_estimate(nparticipants).
 */
#define ParallelHashJoinBatchOuter(batch, nparticipants) \
	((SharedTuplestore *)												\
	 ((char *) ParallelHashJoinBatchInner(batch) +						\
	  MAXALIGN(sts_estimate(nparticipants))))

/*
 * Total size of a ParallelHashJoinBatch and tuplestores: the aligned header
 * plus two aligned tuplestores (inner and outer).
 */
#define EstimateParallelHashJoinBatch(hashtable)						\
	(MAXALIGN(sizeof(ParallelHashJoinBatch)) +							\
	 MAXALIGN(sts_estimate((hashtable)->parallel_state->nparticipants)) * 2)
/*
 * Accessor for the nth ParallelHashJoinBatch given the base.
 *
 * Batches are laid out back to back, each occupying
 * EstimateParallelHashJoinBatch() bytes, so the nth one lives at
 * base + n * that stride.
 *
 * NOTE: the expansion refers to a variable named "hashtable" that is not a
 * macro parameter; a suitable variable must be in scope wherever this macro
 * is used.
 */
#define NthParallelHashJoinBatch(base, n) \
	((ParallelHashJoinBatch *)											\
	 ((char *) (base) +													\
	  EstimateParallelHashJoinBatch(hashtable) * (n)))
204 * Each backend requires a small amount of per-batch state to interact with
205 * each ParallelHashJoinBatch.
207 typedef struct ParallelHashJoinBatchAccessor
209 ParallelHashJoinBatch
*shared
; /* pointer to shared state */
211 /* Per-backend partial counters to reduce contention. */
212 size_t preallocated
; /* pre-allocated space for this backend */
213 size_t ntuples
; /* number of tuples */
214 size_t size
; /* size of partition in memory */
215 size_t estimated_size
; /* size of partition on disk */
216 size_t old_ntuples
; /* how many tuples before repartitioning? */
217 bool at_least_one_chunk
; /* has this backend allocated a chunk? */
218 bool outer_eof
; /* has this process hit end of batch? */
219 bool done
; /* flag to remember that a batch is done */
220 SharedTuplestoreAccessor
*inner_tuples
;
221 SharedTuplestoreAccessor
*outer_tuples
;
222 } ParallelHashJoinBatchAccessor
;
/*
 * While hashing the inner relation, any participant might determine that it's
 * time to increase the number of buckets to reduce the load factor or batches
 * to reduce the memory size.  This is indicated by setting the growth flag to
 * one of the ParallelHashGrowth values below.
 */
230 typedef enum ParallelHashGrowth
232 /* The current dimensions are sufficient. */
234 /* The load factor is too high, so we need to add buckets. */
235 PHJ_GROWTH_NEED_MORE_BUCKETS
,
236 /* The memory budget would be exhausted, so we need to repartition. */
237 PHJ_GROWTH_NEED_MORE_BATCHES
,
238 /* Repartitioning didn't help last time, so don't try to do that again. */
240 } ParallelHashGrowth
;
243 * The shared state used to coordinate a Parallel Hash Join. This is stored
244 * in the DSM segment.
246 typedef struct ParallelHashJoinState
248 dsa_pointer batches
; /* array of ParallelHashJoinBatch */
249 dsa_pointer old_batches
; /* previous generation during repartition */
250 int nbatch
; /* number of batches now */
251 int old_nbatch
; /* previous number of batches */
252 int nbuckets
; /* number of buckets */
253 ParallelHashGrowth growth
; /* control batch/bucket growth */
254 dsa_pointer chunk_work_queue
; /* chunk work queue */
256 size_t space_allowed
;
257 size_t total_tuples
; /* total number of inner tuples */
258 LWLock lock
; /* lock protecting the above */
260 Barrier build_barrier
; /* synchronization for the build phases */
261 Barrier grow_batches_barrier
;
262 Barrier grow_buckets_barrier
;
263 pg_atomic_uint32 distributor
; /* counter for load balancing */
265 SharedFileSet fileset
; /* space for shared temporary files */
266 } ParallelHashJoinState
;
/* The phases for building batches, used by build_barrier. */
#define PHJ_BUILD_ELECT					0
#define PHJ_BUILD_ALLOCATE				1
#define PHJ_BUILD_HASH_INNER			2
#define PHJ_BUILD_HASH_OUTER			3
#define PHJ_BUILD_RUN					4
#define PHJ_BUILD_FREE					5

/* The phases for probing each batch, used for batch_barrier. */
#define PHJ_BATCH_ELECT					0
#define PHJ_BATCH_ALLOCATE				1
#define PHJ_BATCH_LOAD					2
#define PHJ_BATCH_PROBE					3
#define PHJ_BATCH_SCAN					4
#define PHJ_BATCH_FREE					5

/*
 * The phases of batch growth while hashing, for grow_batches_barrier.
 * The phase number cycles, so PHJ_GROW_BATCHES_PHASE() reduces a running
 * phase counter modulo the five phases.
 */
#define PHJ_GROW_BATCHES_ELECT			0
#define PHJ_GROW_BATCHES_REALLOCATE		1
#define PHJ_GROW_BATCHES_REPARTITION	2
#define PHJ_GROW_BATCHES_DECIDE			3
#define PHJ_GROW_BATCHES_FINISH			4
#define PHJ_GROW_BATCHES_PHASE(n)		((n) % 5)	/* circular phases */

/*
 * The phases of bucket growth while hashing, for grow_buckets_barrier.
 * As above, PHJ_GROW_BUCKETS_PHASE() reduces a running phase counter
 * modulo the three phases.
 */
#define PHJ_GROW_BUCKETS_ELECT			0
#define PHJ_GROW_BUCKETS_REALLOCATE		1
#define PHJ_GROW_BUCKETS_REINSERT		2
#define PHJ_GROW_BUCKETS_PHASE(n)		((n) % 3)	/* circular phases */
298 typedef struct HashJoinTableData
300 int nbuckets
; /* # buckets in the in-memory hash table */
301 int log2_nbuckets
; /* its log2 (nbuckets must be a power of 2) */
303 int nbuckets_original
; /* # buckets when starting the first hash */
304 int nbuckets_optimal
; /* optimal # buckets (per batch) */
305 int log2_nbuckets_optimal
; /* log2(nbuckets_optimal) */
307 /* buckets[i] is head of list of tuples in i'th in-memory bucket */
310 /* unshared array is per-batch storage, as are all the tuples */
311 struct HashJoinTupleData
**unshared
;
312 /* shared array is per-query DSA area, as are all the tuples */
313 dsa_pointer_atomic
*shared
;
316 bool skewEnabled
; /* are we using skew optimization? */
317 HashSkewBucket
**skewBucket
; /* hashtable of skew buckets */
318 int skewBucketLen
; /* size of skewBucket array (a power of 2!) */
319 int nSkewBuckets
; /* number of active skew buckets */
320 int *skewBucketNums
; /* array indexes of active skew buckets */
322 int nbatch
; /* number of batches */
323 int curbatch
; /* current batch #; 0 during 1st pass */
325 int nbatch_original
; /* nbatch when we started inner scan */
326 int nbatch_outstart
; /* nbatch when we started outer scan */
328 bool growEnabled
; /* flag to shut off nbatch increases */
330 double totalTuples
; /* # tuples obtained from inner plan */
331 double partialTuples
; /* # tuples obtained from inner plan by me */
332 double skewTuples
; /* # tuples inserted into skew tuples */
335 * These arrays are allocated for the life of the hash join, but only if
336 * nbatch > 1. A file is opened only when we first write a tuple into it
337 * (otherwise its pointer remains NULL). Note that the zero'th array
338 * elements never get used, since we will process rather than dump out any
339 * tuples of batch zero.
341 BufFile
**innerBatchFile
; /* buffered virtual temp file per batch */
342 BufFile
**outerBatchFile
; /* buffered virtual temp file per batch */
344 Size spaceUsed
; /* memory space currently used by tuples */
345 Size spaceAllowed
; /* upper limit for space used */
346 Size spacePeak
; /* peak space used */
347 Size spaceUsedSkew
; /* skew hash table's current space usage */
348 Size spaceAllowedSkew
; /* upper limit for skew hashtable */
350 MemoryContext hashCxt
; /* context for whole-hash-join storage */
351 MemoryContext batchCxt
; /* context for this-batch-only storage */
352 MemoryContext spillCxt
; /* context for spilling to temp files */
354 /* used for dense allocation of tuples (into linked chunks) */
355 HashMemoryChunk chunks
; /* one list for the whole batch */
357 /* Shared and private state for Parallel Hash. */
358 HashMemoryChunk current_chunk
; /* this backend's current chunk */
359 dsa_area
*area
; /* DSA area to allocate memory from */
360 ParallelHashJoinState
*parallel_state
;
361 ParallelHashJoinBatchAccessor
*batches
;
362 dsa_pointer current_chunk_shared
;
365 #endif /* HASHJOIN_H */