/*-------------------------------------------------------------------------
 *
 * vacuumparallel.c
 *	  Support routines for parallel vacuum execution.
 *
 * This file contains routines that are intended to support setting up, using,
 * and tearing down a ParallelVacuumState.
 *
 * In a parallel vacuum, we perform both index bulk deletion and index cleanup
 * with parallel worker processes.  Individual indexes are processed by one
 * vacuum process.  ParallelVacuumState contains shared information as well as
 * the memory space for storing dead items allocated in the DSA area.  We
 * launch parallel worker processes at the start of parallel index
 * bulk-deletion and index cleanup, and once all indexes are processed the
 * parallel worker processes exit.  Each time we process indexes in parallel,
 * the parallel context is re-initialized so that the same DSM can be used
 * for multiple passes of index bulk-deletion and index cleanup.
 *
 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * src/backend/commands/vacuumparallel.c
 *
 *-------------------------------------------------------------------------
 */
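/*
 * Illustrative usage sketch (added; not part of the original file, and the
 * surrounding heap-vacuum variables are only assumed): a table AM's vacuum
 * code typically drives this module roughly as follows.
 *
 *		pvs = parallel_vacuum_init(rel, indrels, nindexes, nrequested_workers,
 *								   vac_work_mem, elevel, bstrategy);
 *		if (pvs != NULL)
 *		{
 *			dead_items = parallel_vacuum_get_dead_items(pvs, &dead_items_info);
 *			... collect dead item TIDs into dead_items while scanning the heap ...
 *			parallel_vacuum_bulkdel_all_indexes(pvs, num_table_tuples,
 *												num_index_scans);
 *			parallel_vacuum_reset_dead_items(pvs);
 *			... repeat heap scan and bulk-deletion passes as needed ...
 *			parallel_vacuum_cleanup_all_indexes(pvs, num_table_tuples,
 *												num_index_scans, estimated_count);
 *			parallel_vacuum_end(pvs, istats);
 *		}
 */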
#include "postgres.h"

#include "access/amapi.h"
#include "access/table.h"
#include "access/xact.h"
#include "commands/progress.h"
#include "commands/vacuum.h"
#include "executor/instrument.h"
#include "optimizer/paths.h"
#include "pgstat.h"
#include "storage/bufmgr.h"
#include "tcop/tcopprot.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"
/*
 * DSM keys for parallel vacuum.  Unlike other parallel execution code, since
 * we don't need to worry about DSM keys conflicting with plan_node_id we can
 * use small integers.
 */
#define PARALLEL_VACUUM_KEY_SHARED			1
#define PARALLEL_VACUUM_KEY_QUERY_TEXT		2
#define PARALLEL_VACUUM_KEY_BUFFER_USAGE	3
#define PARALLEL_VACUUM_KEY_WAL_USAGE		4
#define PARALLEL_VACUUM_KEY_INDEX_STATS		5
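/*
 * Illustrative note (added): each key above tags one chunk in the DSM table
 * of contents.  The leader reserves space with shm_toc_estimate_chunk() and
 * shm_toc_estimate_keys(), publishes the chunk with shm_toc_insert(toc, KEY,
 * ptr), and workers retrieve it in parallel_vacuum_main() with
 * shm_toc_lookup(toc, KEY, ...).
 */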
/*
 * Shared information among parallel vacuum workers.  So this is allocated in
 * the DSM segment.
 */
typedef struct PVShared
{
	/*
	 * Target table relid, log level (for messages about parallel workers
	 * launched during VACUUM VERBOSE) and query ID.  These fields are not
	 * modified during the parallel vacuum.
	 */
	Oid			relid;
	int			elevel;
	uint64		queryid;

	/*
	 * Fields for both index vacuum and cleanup.
	 *
	 * reltuples is the total number of input heap tuples.  We set either old
	 * live tuples in the index vacuum case or the new live tuples in the
	 * index cleanup case.
	 *
	 * estimated_count is true if reltuples is an estimated value.  (Note that
	 * reltuples could be -1 in this case, indicating we have no idea.)
	 */
	double		reltuples;
	bool		estimated_count;
	/*
	 * In a single-process vacuum we could consume more memory during index
	 * vacuuming or cleanup apart from the memory for heap scanning.  In a
	 * parallel vacuum, since individual vacuum workers can consume memory
	 * equal to maintenance_work_mem, the new maintenance_work_mem for each
	 * worker is set such that the parallel operation doesn't consume more
	 * memory than a single-process vacuum.
	 */
	int			maintenance_work_mem_worker;
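	/*
	 * Illustrative example (added; the numbers are hypothetical): with
	 * maintenance_work_mem = 256MB, parallel_workers = 4, and two indexes
	 * whose access methods use maintenance_work_mem, each worker is allowed
	 * 256MB / Min(4, 2) = 128MB; see how this field is filled in
	 * parallel_vacuum_init().
	 */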
	/*
	 * The number of buffers each worker's Buffer Access Strategy ring should
	 * contain.
	 */
	int			ring_nbuffers;
	/*
	 * Shared vacuum cost balance.  During parallel vacuum,
	 * VacuumSharedCostBalance points to this value and it accumulates the
	 * balance of each parallel vacuum worker.
	 */
	pg_atomic_uint32 cost_balance;
	/*
	 * Number of active parallel workers.  This is used for computing the
	 * minimum threshold of the vacuum cost balance before a worker sleeps for
	 * cost-based vacuum delay.
	 */
	pg_atomic_uint32 active_nworkers;
	/* Counter for vacuuming and cleanup */
	pg_atomic_uint32 idx;

	/* DSA handle where the TidStore lives */
	dsa_handle	dead_items_dsa_handle;

	/* DSA pointer to the shared TidStore */
	dsa_pointer dead_items_handle;

	/* Statistics of shared dead items */
	VacDeadItemsInfo dead_items_info;
} PVShared;
/* Status used during parallel index vacuum or cleanup */
typedef enum PVIndVacStatus
{
	PARALLEL_INDVAC_STATUS_INITIAL = 0,
	PARALLEL_INDVAC_STATUS_NEED_BULKDELETE,
	PARALLEL_INDVAC_STATUS_NEED_CLEANUP,
	PARALLEL_INDVAC_STATUS_COMPLETED,
} PVIndVacStatus;
/*
 * Struct for index vacuum statistics of an index that is used for parallel
 * vacuum.  This includes the status of parallel index vacuum as well as
 * index statistics.
 */
typedef struct PVIndStats
{
	/*
	 * The following two fields are set by the leader process before executing
	 * parallel index vacuum or parallel index cleanup.  These fields are not
	 * fixed for the entire VACUUM operation.  They are only fixed for an
	 * individual parallel index vacuum and cleanup.
	 *
	 * parallel_workers_can_process is true if both the leader and workers can
	 * process the index, otherwise only the leader can process it.
	 */
	PVIndVacStatus status;
	bool		parallel_workers_can_process;

	/*
	 * Individual worker or leader stores the result of index vacuum or
	 * cleanup.
	 */
	bool		istat_updated;	/* are the stats updated? */
	IndexBulkDeleteResult istat;
} PVIndStats;
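/*
 * Note (added for clarity): PVIndStats entries live in the DSM segment, so
 * the bulk-deletion result stored in istat survives across index-vacuum
 * passes and can be handed to whichever process vacuums the index next; see
 * parallel_vacuum_process_one_index().
 */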
/*
 * Struct for maintaining a parallel vacuum state.  typedef appears in
 * vacuum.h.
 */
struct ParallelVacuumState
{
	/* NULL for worker processes */
	ParallelContext *pcxt;

	/* Parent Heap Relation */
	Relation	heaprel;

	Relation   *indrels;
	int			nindexes;

	/* Shared information among parallel vacuum workers */
	PVShared   *shared;

	/*
	 * Shared index statistics among parallel vacuum workers.  The array
	 * element is allocated for every index, even those indexes where parallel
	 * index vacuuming is unsafe or not worthwhile (e.g.,
	 * will_parallel_vacuum[] is false).  During parallel vacuum,
	 * IndexBulkDeleteResult of each index is kept in DSM and is copied into
	 * local memory at the end of parallel vacuum.
	 */
	PVIndStats *indstats;

	/* Shared dead items space among parallel vacuum workers */
	TidStore   *dead_items;

	/* Points to buffer usage area in DSM */
	BufferUsage *buffer_usage;

	/* Points to WAL usage area in DSM */
	WalUsage   *wal_usage;

	/*
	 * False if the index is a totally unsuitable target for all parallel
	 * processing.  For example, the index could be smaller than the
	 * min_parallel_index_scan_size cutoff.
	 */
	bool	   *will_parallel_vacuum;

	/*
	 * The number of indexes that support parallel index bulk-deletion and
	 * parallel index cleanup respectively.
	 */
	int			nindexes_parallel_bulkdel;
	int			nindexes_parallel_cleanup;
	int			nindexes_parallel_condcleanup;

	/* Buffer access strategy used by leader process */
	BufferAccessStrategy bstrategy;

	/*
	 * Error reporting state.  The error callback is set only for worker
	 * processes during parallel index vacuum.
	 */
	char	   *relnamespace;
	char	   *relname;
	char	   *indname;
	PVIndVacStatus status;
};
static int	parallel_vacuum_compute_workers(Relation *indrels, int nindexes,
											int nrequested,
											bool *will_parallel_vacuum);
static void parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs,
												 int num_index_scans,
												 bool vacuum);
static void parallel_vacuum_process_safe_indexes(ParallelVacuumState *pvs);
static void parallel_vacuum_process_unsafe_indexes(ParallelVacuumState *pvs);
static void parallel_vacuum_process_one_index(ParallelVacuumState *pvs,
											   Relation indrel,
											   PVIndStats *indstats);
static bool parallel_vacuum_index_is_parallel_safe(Relation indrel,
												   int num_index_scans,
												   bool vacuum);
static void parallel_vacuum_error_callback(void *arg);
/*
 * Try to enter parallel mode and create a parallel context.  Then initialize
 * shared memory state.
 *
 * On success, return parallel vacuum state.  Otherwise return NULL.
 */
ParallelVacuumState *
parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
					 int nrequested_workers, int vac_work_mem,
					 int elevel, BufferAccessStrategy bstrategy)
{
	ParallelVacuumState *pvs;
	ParallelContext *pcxt;
	PVShared   *shared;
	TidStore   *dead_items;
	PVIndStats *indstats;
	BufferUsage *buffer_usage;
	WalUsage   *wal_usage;
	bool	   *will_parallel_vacuum;
	Size		est_indstats_len;
	Size		est_shared_len;
	int			nindexes_mwm = 0;
	int			parallel_workers = 0;
	int			querylen;
	/*
	 * A parallel vacuum must be requested and there must be indexes on the
	 * relation.
	 */
	Assert(nrequested_workers >= 0);
	Assert(nindexes > 0);

	/*
	 * Compute the number of parallel vacuum workers to launch.
	 */
	will_parallel_vacuum = (bool *) palloc0(sizeof(bool) * nindexes);
	parallel_workers = parallel_vacuum_compute_workers(indrels, nindexes,
														nrequested_workers,
														will_parallel_vacuum);
	if (parallel_workers <= 0)
	{
		/* Can't perform vacuum in parallel -- return NULL */
		pfree(will_parallel_vacuum);
		return NULL;
	}
	pvs = (ParallelVacuumState *) palloc0(sizeof(ParallelVacuumState));
	pvs->indrels = indrels;
	pvs->nindexes = nindexes;
	pvs->will_parallel_vacuum = will_parallel_vacuum;
	pvs->bstrategy = bstrategy;
	pvs->heaprel = rel;

	EnterParallelMode();
	pcxt = CreateParallelContext("postgres", "parallel_vacuum_main",
								 parallel_workers);
	Assert(pcxt->nworkers > 0);
	pvs->pcxt = pcxt;
	/* Estimate size for index vacuum stats -- PARALLEL_VACUUM_KEY_INDEX_STATS */
	est_indstats_len = mul_size(sizeof(PVIndStats), nindexes);
	shm_toc_estimate_chunk(&pcxt->estimator, est_indstats_len);
	shm_toc_estimate_keys(&pcxt->estimator, 1);

	/* Estimate size for shared information -- PARALLEL_VACUUM_KEY_SHARED */
	est_shared_len = sizeof(PVShared);
	shm_toc_estimate_chunk(&pcxt->estimator, est_shared_len);
	shm_toc_estimate_keys(&pcxt->estimator, 1);

	/*
	 * Estimate space for BufferUsage and WalUsage --
	 * PARALLEL_VACUUM_KEY_BUFFER_USAGE and PARALLEL_VACUUM_KEY_WAL_USAGE.
	 *
	 * If there are no extensions loaded that care, we could skip this.  We
	 * have no way of knowing whether anyone's looking at pgBufferUsage or
	 * pgWalUsage, so do it unconditionally.
	 */
	shm_toc_estimate_chunk(&pcxt->estimator,
						   mul_size(sizeof(BufferUsage), pcxt->nworkers));
	shm_toc_estimate_keys(&pcxt->estimator, 1);
	shm_toc_estimate_chunk(&pcxt->estimator,
						   mul_size(sizeof(WalUsage), pcxt->nworkers));
	shm_toc_estimate_keys(&pcxt->estimator, 1);
	/* Finally, estimate PARALLEL_VACUUM_KEY_QUERY_TEXT space */
	if (debug_query_string)
	{
		querylen = strlen(debug_query_string);
		shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1);
		shm_toc_estimate_keys(&pcxt->estimator, 1);
	}
	else
		querylen = 0;			/* keep compiler quiet */

	InitializeParallelDSM(pcxt);
	/* Prepare index vacuum stats */
	indstats = (PVIndStats *) shm_toc_allocate(pcxt->toc, est_indstats_len);
	MemSet(indstats, 0, est_indstats_len);
	for (int i = 0; i < nindexes; i++)
	{
		Relation	indrel = indrels[i];
		uint8		vacoptions = indrel->rd_indam->amparallelvacuumoptions;
		/*
		 * The cleanup option should be either disabled, always performed in
		 * parallel, or conditionally performed in parallel.
		 */
		Assert(((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) ||
			   ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0));
		Assert(vacoptions <= VACUUM_OPTION_MAX_VALID_VALUE);

		if (!will_parallel_vacuum[i])
			continue;

		if (indrel->rd_indam->amusemaintenanceworkmem)
			nindexes_mwm++;
		/*
		 * Remember the number of indexes that support parallel operation for
		 * each phase.
		 */
		if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
			pvs->nindexes_parallel_bulkdel++;
		if ((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0)
			pvs->nindexes_parallel_cleanup++;
		if ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0)
			pvs->nindexes_parallel_condcleanup++;
	}

	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_INDEX_STATS, indstats);
	pvs->indstats = indstats;
	/* Prepare shared information */
	shared = (PVShared *) shm_toc_allocate(pcxt->toc, est_shared_len);
	MemSet(shared, 0, est_shared_len);
	shared->relid = RelationGetRelid(rel);
	shared->elevel = elevel;
	shared->queryid = pgstat_get_my_query_id();
	shared->maintenance_work_mem_worker =
		(nindexes_mwm > 0) ?
		maintenance_work_mem / Min(parallel_workers, nindexes_mwm) :
		maintenance_work_mem;
	shared->dead_items_info.max_bytes = vac_work_mem * 1024L;
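	/*
	 * Note (added for clarity): vac_work_mem is expressed in kilobytes, so it
	 * is multiplied by 1024 in the assignment above to get the TidStore's
	 * maximum size in bytes.
	 */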
	/* Prepare DSA space for dead items */
	dead_items = TidStoreCreateShared(shared->dead_items_info.max_bytes,
									  LWTRANCHE_PARALLEL_VACUUM_DSA);
	pvs->dead_items = dead_items;
	shared->dead_items_handle = TidStoreGetHandle(dead_items);
	shared->dead_items_dsa_handle = dsa_get_handle(TidStoreGetDSA(dead_items));

	/* Use the same buffer size for all workers */
	shared->ring_nbuffers = GetAccessStrategyBufferCount(bstrategy);

	pg_atomic_init_u32(&(shared->cost_balance), 0);
	pg_atomic_init_u32(&(shared->active_nworkers), 0);
	pg_atomic_init_u32(&(shared->idx), 0);

	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_SHARED, shared);
	pvs->shared = shared;
	/*
	 * Allocate space for each worker's BufferUsage and WalUsage; no need to
	 * initialize them.
	 */
	buffer_usage = shm_toc_allocate(pcxt->toc,
									mul_size(sizeof(BufferUsage), pcxt->nworkers));
	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, buffer_usage);
	pvs->buffer_usage = buffer_usage;
	wal_usage = shm_toc_allocate(pcxt->toc,
								 mul_size(sizeof(WalUsage), pcxt->nworkers));
	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_WAL_USAGE, wal_usage);
	pvs->wal_usage = wal_usage;
	/* Store query string for workers */
	if (debug_query_string)
	{
		char	   *sharedquery;

		sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1);
		memcpy(sharedquery, debug_query_string, querylen + 1);
		sharedquery[querylen] = '\0';
		shm_toc_insert(pcxt->toc,
					   PARALLEL_VACUUM_KEY_QUERY_TEXT, sharedquery);
	}

	/* Success -- return parallel vacuum state */
	return pvs;
}
/*
 * Destroy the parallel context, and end parallel mode.
 *
 * Since writes are not allowed during parallel mode, copy the updated index
 * statistics from DSM into local memory and then later use that to update
 * the index statistics.  One might think that we can exit from parallel
 * mode, update the index statistics and then destroy parallel context, but
 * that won't be safe (see ExitParallelMode).
 */
void
parallel_vacuum_end(ParallelVacuumState *pvs, IndexBulkDeleteResult **istats)
{
	Assert(!IsParallelWorker());
	/* Copy the updated statistics */
	for (int i = 0; i < pvs->nindexes; i++)
	{
		PVIndStats *indstats = &(pvs->indstats[i]);

		if (indstats->istat_updated)
		{
			istats[i] = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
			memcpy(istats[i], &indstats->istat, sizeof(IndexBulkDeleteResult));
		}
		else
			istats[i] = NULL;
	}

	TidStoreDestroy(pvs->dead_items);

	DestroyParallelContext(pvs->pcxt);
	ExitParallelMode();

	pfree(pvs->will_parallel_vacuum);
	pfree(pvs);
}
/*
 * Returns the dead items space and dead items information.
 */
TidStore *
parallel_vacuum_get_dead_items(ParallelVacuumState *pvs,
							   VacDeadItemsInfo **dead_items_info_p)
{
	*dead_items_info_p = &(pvs->shared->dead_items_info);
	return pvs->dead_items;
}
/* Forget all items in dead_items */
void
parallel_vacuum_reset_dead_items(ParallelVacuumState *pvs)
{
	VacDeadItemsInfo *dead_items_info = &(pvs->shared->dead_items_info);

	/*
	 * Free the current tidstore and return allocated DSA segments to the
	 * operating system.  Then we recreate the tidstore with the same
	 * max_bytes limitation we just used.
	 */
	TidStoreDestroy(pvs->dead_items);
	pvs->dead_items = TidStoreCreateShared(dead_items_info->max_bytes,
										   LWTRANCHE_PARALLEL_VACUUM_DSA);

	/* Update the DSA pointer for dead_items to the new one */
	pvs->shared->dead_items_dsa_handle = dsa_get_handle(TidStoreGetDSA(pvs->dead_items));
	pvs->shared->dead_items_handle = TidStoreGetHandle(pvs->dead_items);

	/* Reset the counter */
	dead_items_info->num_items = 0;
}
/*
 * Do parallel index bulk-deletion with parallel workers.
 */
void
parallel_vacuum_bulkdel_all_indexes(ParallelVacuumState *pvs, long num_table_tuples,
									int num_index_scans)
{
	Assert(!IsParallelWorker());

	/*
	 * We can only provide an approximate value of num_heap_tuples, at least
	 * for now.
	 */
	pvs->shared->reltuples = num_table_tuples;
	pvs->shared->estimated_count = true;

	parallel_vacuum_process_all_indexes(pvs, num_index_scans, true);
}
/*
 * Do parallel index cleanup with parallel workers.
 */
void
parallel_vacuum_cleanup_all_indexes(ParallelVacuumState *pvs, long num_table_tuples,
									int num_index_scans, bool estimated_count)
{
	Assert(!IsParallelWorker());

	/*
	 * We can provide a better estimate of the total number of surviving
	 * tuples (we assume indexes are more interested in that than in the
	 * number of nominally live tuples).
	 */
	pvs->shared->reltuples = num_table_tuples;
	pvs->shared->estimated_count = estimated_count;

	parallel_vacuum_process_all_indexes(pvs, num_index_scans, false);
}
/*
 * Compute the number of parallel worker processes to request.  Both index
 * vacuum and index cleanup can be executed with parallel workers.
 * The index is eligible for parallel vacuum iff its size is greater than
 * min_parallel_index_scan_size as invoking workers for very small indexes
 * can hurt performance.
 *
 * nrequested is the number of parallel workers that the user requested.  If
 * nrequested is 0, we compute the parallel degree based on nindexes, that is
 * the number of indexes that support parallel vacuum.  This function also
 * sets will_parallel_vacuum to remember indexes that participate in parallel
 * vacuum.
 */
static int
parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
								bool *will_parallel_vacuum)
{
	int			nindexes_parallel = 0;
	int			nindexes_parallel_bulkdel = 0;
	int			nindexes_parallel_cleanup = 0;
	int			parallel_workers;
	/*
	 * We don't allow performing parallel operation in a standalone backend or
	 * when parallelism is disabled.
	 */
	if (!IsUnderPostmaster || max_parallel_maintenance_workers == 0)
		return 0;
	/*
	 * Compute the number of indexes that can participate in parallel vacuum.
	 */
	for (int i = 0; i < nindexes; i++)
	{
		Relation	indrel = indrels[i];
		uint8		vacoptions = indrel->rd_indam->amparallelvacuumoptions;

		/* Skip indexes that are not suitable targets for parallel index vacuum */
		if (vacoptions == VACUUM_OPTION_NO_PARALLEL ||
			RelationGetNumberOfBlocks(indrel) < min_parallel_index_scan_size)
			continue;

		will_parallel_vacuum[i] = true;

		if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
			nindexes_parallel_bulkdel++;
		if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0) ||
			((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
			nindexes_parallel_cleanup++;
	}

	nindexes_parallel = Max(nindexes_parallel_bulkdel,
							nindexes_parallel_cleanup);
	/* The leader process takes one index */
	nindexes_parallel--;

	/* No index supports parallel vacuum */
	if (nindexes_parallel <= 0)
		return 0;

	/* Compute the parallel degree */
	parallel_workers = (nrequested > 0) ?
		Min(nrequested, nindexes_parallel) : nindexes_parallel;

	/* Cap by max_parallel_maintenance_workers */
	parallel_workers = Min(parallel_workers, max_parallel_maintenance_workers);
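	/*
	 * Worked example (added; the numbers are hypothetical): with nrequested =
	 * 0, three indexes supporting parallel bulk-deletion, two supporting
	 * parallel cleanup, and max_parallel_maintenance_workers = 2, we get
	 * nindexes_parallel = Max(3, 2) - 1 = 2 and parallel_workers =
	 * Min(2, 2) = 2.
	 */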
	return parallel_workers;
}
/*
 * Perform index vacuum or index cleanup with parallel workers.  This function
 * must be used by the parallel vacuum leader process.
 */
static void
parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans,
									bool vacuum)
{
	int			nworkers;
	PVIndVacStatus new_status;

	Assert(!IsParallelWorker());
	if (vacuum)
	{
		new_status = PARALLEL_INDVAC_STATUS_NEED_BULKDELETE;

		/* Determine the number of parallel workers to launch */
		nworkers = pvs->nindexes_parallel_bulkdel;
	}
	else
	{
		new_status = PARALLEL_INDVAC_STATUS_NEED_CLEANUP;

		/* Determine the number of parallel workers to launch */
		nworkers = pvs->nindexes_parallel_cleanup;

		/* Add conditionally parallel-aware indexes on the first call */
		if (num_index_scans == 0)
			nworkers += pvs->nindexes_parallel_condcleanup;
	}
	/* The leader process will participate */
	nworkers--;

	/*
	 * It is possible that the parallel context is initialized with fewer
	 * workers than the number of indexes that need a separate worker in the
	 * current phase, so we need to consider it.  See
	 * parallel_vacuum_compute_workers().
	 */
	nworkers = Min(nworkers, pvs->pcxt->nworkers);
	/*
	 * Set index vacuum status and mark whether parallel vacuum workers can
	 * process it.
	 */
	for (int i = 0; i < pvs->nindexes; i++)
	{
		PVIndStats *indstats = &(pvs->indstats[i]);

		Assert(indstats->status == PARALLEL_INDVAC_STATUS_INITIAL);
		indstats->status = new_status;
		indstats->parallel_workers_can_process =
			(pvs->will_parallel_vacuum[i] &&
			 parallel_vacuum_index_is_parallel_safe(pvs->indrels[i],
													num_index_scans,
													vacuum));
	}

	/* Reset the parallel index processing and progress counters */
	pg_atomic_write_u32(&(pvs->shared->idx), 0);
	/* Setup the shared cost-based vacuum delay and launch workers */
	if (nworkers > 0)
	{
		/* Reinitialize parallel context to relaunch parallel workers */
		if (num_index_scans > 0)
			ReinitializeParallelDSM(pvs->pcxt);

		/*
		 * Set up shared cost balance and the number of active workers for
		 * vacuum delay.  We need to do this before launching workers as
		 * otherwise, they might not see the updated values for these
		 * parameters.
		 */
		pg_atomic_write_u32(&(pvs->shared->cost_balance), VacuumCostBalance);
		pg_atomic_write_u32(&(pvs->shared->active_nworkers), 0);
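		/*
		 * Note (added for clarity): once workers are launched, the leader and
		 * the workers charge vacuum cost to this shared balance through
		 * VacuumSharedCostBalance rather than to their local
		 * VacuumCostBalance; the leader switches over in the
		 * nworkers_launched > 0 branch below.
		 */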
		/*
		 * The number of workers can vary between bulkdelete and cleanup
		 * phases.
		 */
		ReinitializeParallelWorkers(pvs->pcxt, nworkers);

		LaunchParallelWorkers(pvs->pcxt);

		if (pvs->pcxt->nworkers_launched > 0)
		{
			/*
			 * Reset the local cost values for the leader backend as we have
			 * already accumulated the remaining balance of the heap.
			 */
			VacuumCostBalance = 0;
			VacuumCostBalanceLocal = 0;

			/* Enable shared cost balance for leader backend */
			VacuumSharedCostBalance = &(pvs->shared->cost_balance);
			VacuumActiveNWorkers = &(pvs->shared->active_nworkers);
		}
		if (vacuum)
			ereport(pvs->shared->elevel,
					(errmsg(ngettext("launched %d parallel vacuum worker for index vacuuming (planned: %d)",
									 "launched %d parallel vacuum workers for index vacuuming (planned: %d)",
									 pvs->pcxt->nworkers_launched),
							pvs->pcxt->nworkers_launched, nworkers)));
		else
			ereport(pvs->shared->elevel,
					(errmsg(ngettext("launched %d parallel vacuum worker for index cleanup (planned: %d)",
									 "launched %d parallel vacuum workers for index cleanup (planned: %d)",
									 pvs->pcxt->nworkers_launched),
							pvs->pcxt->nworkers_launched, nworkers)));
	}
	/* Vacuum the indexes that can be processed only by the leader process */
	parallel_vacuum_process_unsafe_indexes(pvs);

	/*
	 * Join as a parallel worker.  The leader process alone processes all
	 * parallel-safe indexes in the case where no workers are launched.
	 */
	parallel_vacuum_process_safe_indexes(pvs);

	/*
	 * Next, accumulate buffer and WAL usage.  (This must wait for the workers
	 * to finish, or we might get incomplete data.)
	 */
	if (nworkers > 0)
	{
		/* Wait for all vacuum workers to finish */
		WaitForParallelWorkersToFinish(pvs->pcxt);

		for (int i = 0; i < pvs->pcxt->nworkers_launched; i++)
			InstrAccumParallelQuery(&pvs->buffer_usage[i], &pvs->wal_usage[i]);
	}
	/*
	 * Reset all index status back to initial (while checking that we have
	 * vacuumed all indexes).
	 */
	for (int i = 0; i < pvs->nindexes; i++)
	{
		PVIndStats *indstats = &(pvs->indstats[i]);

		if (indstats->status != PARALLEL_INDVAC_STATUS_COMPLETED)
			elog(ERROR, "parallel index vacuum on index \"%s\" is not completed",
				 RelationGetRelationName(pvs->indrels[i]));

		indstats->status = PARALLEL_INDVAC_STATUS_INITIAL;
	}
	/*
	 * Carry the shared balance value to heap scan and disable shared costing
	 */
	if (VacuumSharedCostBalance)
	{
		VacuumCostBalance = pg_atomic_read_u32(VacuumSharedCostBalance);
		VacuumSharedCostBalance = NULL;
		VacuumActiveNWorkers = NULL;
	}
}
/*
 * Index vacuum/cleanup routine used by the leader process and parallel
 * vacuum worker processes to vacuum the indexes in parallel.
 */
static void
parallel_vacuum_process_safe_indexes(ParallelVacuumState *pvs)
{
	/*
	 * Increment the active worker count if we are able to launch any worker.
	 */
	if (VacuumActiveNWorkers)
		pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);
	/* Loop until all indexes are vacuumed */
	for (;;)
	{
		int			idx;
		PVIndStats *indstats;

		/* Get an index number to process */
		idx = pg_atomic_fetch_add_u32(&(pvs->shared->idx), 1);
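		/*
		 * Note (added for clarity): each participant, leader or worker,
		 * claims the next unprocessed index by atomically incrementing this
		 * shared counter, so the indexes are distributed dynamically and no
		 * locking is needed.
		 */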
		/* Done for all indexes? */
		if (idx >= pvs->nindexes)
			break;

		indstats = &(pvs->indstats[idx]);
		/*
		 * Skip vacuuming indexes that are unsafe for workers or that are
		 * unsuitable targets for parallel index vacuum (these are vacuumed in
		 * parallel_vacuum_process_unsafe_indexes() by the leader).
		 */
		if (!indstats->parallel_workers_can_process)
			continue;

		/* Do vacuum or cleanup of the index */
		parallel_vacuum_process_one_index(pvs, pvs->indrels[idx], indstats);
	}
	/*
	 * We have completed the index vacuum so decrement the active worker
	 * count.
	 */
	if (VacuumActiveNWorkers)
		pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
}
/*
 * Perform vacuuming of indexes in the leader process.
 *
 * Handles index vacuuming (or index cleanup) for indexes that are not
 * parallel safe.  It's possible that this will vary for a given index, based
 * on details like whether we're performing index cleanup right now.
 *
 * Also performs vacuuming of smaller indexes that fell under the size cutoff
 * enforced by parallel_vacuum_compute_workers().
 */
static void
parallel_vacuum_process_unsafe_indexes(ParallelVacuumState *pvs)
{
	Assert(!IsParallelWorker());

	/*
	 * Increment the active worker count if we are able to launch any worker.
	 */
	if (VacuumActiveNWorkers)
		pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);
	for (int i = 0; i < pvs->nindexes; i++)
	{
		PVIndStats *indstats = &(pvs->indstats[i]);

		/* Skip indexes that are safe for workers */
		if (indstats->parallel_workers_can_process)
			continue;

		/* Do vacuum or cleanup of the index */
		parallel_vacuum_process_one_index(pvs, pvs->indrels[i], indstats);
	}
	/*
	 * We have completed the index vacuum so decrement the active worker
	 * count.
	 */
	if (VacuumActiveNWorkers)
		pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
}
/*
 * Vacuum or cleanup an index either by the leader process or by one of the
 * worker processes.  After vacuuming the index this function copies the index
 * statistics returned from ambulkdelete and amvacuumcleanup to the DSM
 * segment.
 */
static void
parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation indrel,
								  PVIndStats *indstats)
{
	IndexBulkDeleteResult *istat = NULL;
	IndexBulkDeleteResult *istat_res;
	IndexVacuumInfo ivinfo;
	/*
	 * Update the pointer to the corresponding bulk-deletion result if someone
	 * has already updated it.
	 */
	if (indstats->istat_updated)
		istat = &(indstats->istat);
	ivinfo.index = indrel;
	ivinfo.heaprel = pvs->heaprel;
	ivinfo.analyze_only = false;
	ivinfo.report_progress = false;
	ivinfo.message_level = DEBUG2;
	ivinfo.estimated_count = pvs->shared->estimated_count;
	ivinfo.num_heap_tuples = pvs->shared->reltuples;
	ivinfo.strategy = pvs->bstrategy;
	/* Update error traceback information */
	pvs->indname = pstrdup(RelationGetRelationName(indrel));
	pvs->status = indstats->status;
	switch (indstats->status)
	{
		case PARALLEL_INDVAC_STATUS_NEED_BULKDELETE:
			istat_res = vac_bulkdel_one_index(&ivinfo, istat, pvs->dead_items,
											  &pvs->shared->dead_items_info);
			break;
		case PARALLEL_INDVAC_STATUS_NEED_CLEANUP:
			istat_res = vac_cleanup_one_index(&ivinfo, istat);
			break;
		default:
			elog(ERROR, "unexpected parallel vacuum index status %d for index \"%s\"",
				 indstats->status,
				 RelationGetRelationName(indrel));
	}
	/*
	 * Copy the index bulk-deletion result returned from ambulkdelete and
	 * amvacuumcleanup to the DSM segment if it's the first cycle because they
	 * allocate locally and it's possible that an index will be vacuumed by a
	 * different vacuum process the next cycle.  Copying the result normally
	 * happens only the first time an index is vacuumed.  For any additional
	 * vacuum pass, we directly point to the result on the DSM segment and
	 * pass it to vacuum index APIs so that workers can update it directly.
	 *
	 * Since all vacuum workers write the bulk-deletion result at different
	 * slots we can write them without locking.
	 */
	if (!indstats->istat_updated && istat_res != NULL)
	{
		memcpy(&(indstats->istat), istat_res, sizeof(IndexBulkDeleteResult));
		indstats->istat_updated = true;

		/* Free the locally-allocated bulk-deletion result */
		pfree(istat_res);
	}
	/*
	 * Update the status to completed.  No need to lock here since each worker
	 * touches different indexes.
	 */
	indstats->status = PARALLEL_INDVAC_STATUS_COMPLETED;

	/* Reset error traceback information */
	pvs->status = PARALLEL_INDVAC_STATUS_COMPLETED;

	/*
	 * Call the parallel variant of pgstat_progress_incr_param so workers can
	 * report progress of index vacuum to the leader.
	 */
	pgstat_progress_parallel_incr_param(PROGRESS_VACUUM_INDEXES_PROCESSED, 1);
}
/*
 * Returns false if the given index can't participate in the next execution of
 * parallel index vacuum or parallel index cleanup.
 */
static bool
parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans,
									   bool vacuum)
{
	uint8		vacoptions;

	vacoptions = indrel->rd_indam->amparallelvacuumoptions;

	/* In parallel vacuum case, check if it supports parallel bulk-deletion */
	if (vacuum)
		return ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0);
	/* Not safe, if the index does not support parallel cleanup */
	if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) &&
		((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0))
		return false;

	/*
	 * Not safe, if the index supports parallel cleanup conditionally, but we
	 * have already processed the index (for bulkdelete).  We do this to avoid
	 * the need to invoke workers when parallel index cleanup doesn't need to
	 * scan the index.  See the comments for option
	 * VACUUM_OPTION_PARALLEL_COND_CLEANUP to know when indexes support
	 * parallel cleanup conditionally.
	 */
	if (num_index_scans > 0 &&
		((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
		return false;

	return true;
}
/*
 * Perform work within a launched parallel process.
 *
 * Since parallel vacuum workers perform only index vacuum or index cleanup,
 * we don't need to report progress information.
 */
void
parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
{
	ParallelVacuumState pvs;
	Relation	rel;
	Relation   *indrels;
	PVIndStats *indstats;
	PVShared   *shared;
	TidStore   *dead_items;
	BufferUsage *buffer_usage;
	WalUsage   *wal_usage;
	int			nindexes;
	char	   *sharedquery;
	ErrorContextCallback errcallback;
	/*
	 * A parallel vacuum worker must have only the PROC_IN_VACUUM flag since
	 * we don't support parallel vacuum for autovacuum as of now.
	 */
	Assert(MyProc->statusFlags == PROC_IN_VACUUM);

	elog(DEBUG1, "starting parallel vacuum worker");

	shared = (PVShared *) shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_SHARED, false);

	/* Set debug_query_string for individual workers */
	sharedquery = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_QUERY_TEXT, true);
	debug_query_string = sharedquery;
	pgstat_report_activity(STATE_RUNNING, debug_query_string);

	/* Track query ID */
	pgstat_report_query_id(shared->queryid, false);
	/*
	 * Open table.  The lock mode is the same as the leader process.  It's
	 * okay because the lock mode does not conflict among the parallel
	 * workers.
	 */
	rel = table_open(shared->relid, ShareUpdateExclusiveLock);

	/*
	 * Open all indexes.  indrels are sorted in order by OID, which should
	 * match the leader's order.
	 */
	vac_open_indexes(rel, RowExclusiveLock, &nindexes, &indrels);
	Assert(nindexes > 0);
	if (shared->maintenance_work_mem_worker > 0)
		maintenance_work_mem = shared->maintenance_work_mem_worker;

	/* Set index statistics */
	indstats = (PVIndStats *) shm_toc_lookup(toc,
											 PARALLEL_VACUUM_KEY_INDEX_STATS,
											 false);

	/* Find dead_items in shared memory */
	dead_items = TidStoreAttach(shared->dead_items_dsa_handle,
								shared->dead_items_handle);
	/* Set cost-based vacuum delay */
	VacuumUpdateCosts();
	VacuumCostBalance = 0;
	VacuumCostBalanceLocal = 0;
	VacuumSharedCostBalance = &(shared->cost_balance);
	VacuumActiveNWorkers = &(shared->active_nworkers);
	/* Set parallel vacuum state */
	pvs.indrels = indrels;
	pvs.nindexes = nindexes;
	pvs.indstats = indstats;
	pvs.shared = shared;
	pvs.dead_items = dead_items;
	pvs.relnamespace = get_namespace_name(RelationGetNamespace(rel));
	pvs.relname = pstrdup(RelationGetRelationName(rel));
	pvs.heaprel = rel;

	/* These fields will be filled during index vacuum or cleanup */
	pvs.indname = NULL;
	pvs.status = PARALLEL_INDVAC_STATUS_INITIAL;
	/* Each parallel VACUUM worker gets its own access strategy. */
	pvs.bstrategy = GetAccessStrategyWithSize(BAS_VACUUM,
											  shared->ring_nbuffers * (BLCKSZ / 1024));
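	/*
	 * Note (added for clarity): ring_nbuffers is a buffer count, while
	 * GetAccessStrategyWithSize() expects a ring size in kilobytes, so the
	 * count is converted using BLCKSZ / 1024.
	 */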
	/* Setup error traceback support for ereport() */
	errcallback.callback = parallel_vacuum_error_callback;
	errcallback.arg = &pvs;
	errcallback.previous = error_context_stack;
	error_context_stack = &errcallback;
	/* Prepare to track buffer usage during parallel execution */
	InstrStartParallelQuery();

	/* Process indexes to perform vacuum/cleanup */
	parallel_vacuum_process_safe_indexes(&pvs);

	/* Report buffer/WAL usage during parallel execution */
	buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false);
	wal_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_WAL_USAGE, false);
	InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
						  &wal_usage[ParallelWorkerNumber]);
	TidStoreDetach(dead_items);

	/* Pop the error context stack */
	error_context_stack = errcallback.previous;

	vac_close_indexes(nindexes, indrels, RowExclusiveLock);
	table_close(rel, ShareUpdateExclusiveLock);
	FreeAccessStrategy(pvs.bstrategy);
}
/*
 * Error context callback for errors occurring during parallel index vacuum.
 * The error context messages should match the messages set in the lazy vacuum
 * error context.  If you change this function, change vacuum_error_callback()
 * as well.
 */
static void
parallel_vacuum_error_callback(void *arg)
{
	ParallelVacuumState *errinfo = arg;

	switch (errinfo->status)
	{
		case PARALLEL_INDVAC_STATUS_NEED_BULKDELETE:
			errcontext("while vacuuming index \"%s\" of relation \"%s.%s\"",
					   errinfo->indname,
					   errinfo->relnamespace,
					   errinfo->relname);
			break;
		case PARALLEL_INDVAC_STATUS_NEED_CLEANUP:
			errcontext("while cleaning up index \"%s\" of relation \"%s.%s\"",
					   errinfo->indname,
					   errinfo->relnamespace,
					   errinfo->relname);
			break;
		case PARALLEL_INDVAC_STATUS_INITIAL:
		case PARALLEL_INDVAC_STATUS_COMPLETED:
			break;
	}
}