1 /*-------------------------------------------------------------------------
5 * Parallel support for pg_dump and pg_restore
7 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
8 * Portions Copyright (c) 1994, Regents of the University of California
11 * src/bin/pg_dump/parallel.c
13 *-------------------------------------------------------------------------
17 * Parallel operation works like this:
19 * The original, leader process calls ParallelBackupStart(), which forks off
20 * the desired number of worker processes, which each enter WaitForCommands().
22 * The leader process dispatches an individual work item to one of the worker
23 * processes in DispatchJobForTocEntry(). We send a command string such as
24 * "DUMP 1234" or "RESTORE 1234", where 1234 is the TocEntry ID.
25 * The worker process receives and decodes the command and passes it to the
26 * routine pointed to by AH->WorkerJobDumpPtr or AH->WorkerJobRestorePtr,
27 * which are routines of the current archive format. That routine performs
28 * the required action (dump or restore) and returns an integer status code.
29 * This is passed back to the leader where we pass it to the
30 * ParallelCompletionPtr callback function that was passed to
31 * DispatchJobForTocEntry(). The callback function does state updating
32 * for the leader control logic in pg_backup_archiver.c.
34 * In principle additional archive-format-specific information might be needed
35 * in commands or worker status responses, but so far that hasn't proved
36 * necessary, since workers have full copies of the ArchiveHandle/TocEntry
37 * data structures. Remember that we have forked off the workers only after
38 * we have read in the catalog. That's why our worker processes can also
39 * access the catalog information. (In the Windows case, the workers are
40 * threads in the same process. To avoid problems, they work with cloned
41 * copies of the Archive data structure; see RunWorker().)
43 * In the leader process, the workerStatus field for each worker has one of
44 * the following values:
45 * WRKR_NOT_STARTED: we've not yet forked this worker
46 * WRKR_IDLE: it's waiting for a command
47 * WRKR_WORKING: it's working on a command
48 * WRKR_TERMINATED: process ended
49 * The pstate->te[] entry for each worker is valid when it's in WRKR_WORKING
50 * state, and must be NULL in other states.
53 #include "postgres_fe.h"
56 #include <sys/select.h>
63 #include "fe_utils/string_utils.h"
65 #include "pg_backup_utils.h"
67 #include "port/pg_bswap.h"
70 /* Mnemonic macros for indexing the fd array returned by pipe(2) */
74 #define NO_SLOT (-1) /* Failure result for GetIdleWorker() */
76 /* Worker process statuses */
/*
 * True when a worker's status shows that it is either waiting for a command
 * (WRKR_IDLE) or executing one (WRKR_WORKING).
 */
#define WORKER_IS_RUNNING(workerStatus) \
	(!((workerStatus) != WRKR_IDLE && (workerStatus) != WRKR_WORKING))
89 * Private per-parallel-worker state (typedef for this is in parallel.h).
91 * Much of this is valid only in the leader process (or, on Windows, should
92 * be touched only by the leader thread). But the AH field should be touched
93 * only by workers. The pipe descriptors are valid everywhere.
97 T_WorkerStatus workerStatus
; /* see enum above */
99 /* These fields are valid if workerStatus == WRKR_WORKING: */
100 ParallelCompletionPtr callback
; /* function to call on completion */
101 void *callback_data
; /* passthrough data for it */
103 ArchiveHandle
*AH
; /* Archive data worker is using */
105 int pipeRead
; /* leader's end of the pipes */
107 int pipeRevRead
; /* child's end of the pipes */
110 /* Child process/thread identity info: */
113 unsigned int threadId
;
122 * Structure to hold info passed by _beginthreadex() to the function it calls
123 * via its single allowed argument.
127 ArchiveHandle
*AH
; /* leader database connection */
128 ParallelSlot
*slot
; /* this worker's parallel slot */
131 /* Windows implementation of pipe access */
132 static int pgpipe(int handles
[2]);
133 #define piperead(a,b,c) recv(a,b,c,0)
134 #define pipewrite(a,b,c) send(a,b,c,0)
138 /* Non-Windows implementation of pipe access */
139 #define pgpipe(a) pipe(a)
140 #define piperead(a,b,c) read(a,b,c)
141 #define pipewrite(a,b,c) write(a,b,c)
146 * State info for archive_close_connection() shutdown callback.
148 typedef struct ShutdownInformation
150 ParallelState
*pstate
;
152 } ShutdownInformation
;
154 static ShutdownInformation shutdown_info
;
157 * State info for signal handling.
158 * We assume signal_info initializes to zeroes.
160 * On Unix, myAH is the leader DB connection in the leader process, and the
161 * worker's own connection in worker processes. On Windows, we have only one
162 * instance of signal_info, so myAH is the leader connection and the worker
163 * connections must be dug out of pstate->parallelSlot[].
165 typedef struct DumpSignalInformation
167 ArchiveHandle
*myAH
; /* database connection to issue cancel for */
168 ParallelState
*pstate
; /* parallel state, if any */
169 bool handler_set
; /* signal handler set up in this process? */
171 bool am_worker
; /* am I a worker process? */
173 } DumpSignalInformation
;
175 static volatile DumpSignalInformation signal_info
;
178 static CRITICAL_SECTION signal_info_lock
;
182 * Write a simple string to stderr --- must be safe in a signal handler.
183 * We ignore the write() result since there's not much we could do about it.
184 * Certain compilers make that harder than it ought to be.
186 #define write_stderr(str) \
188 const char *str_ = (str); \
190 rc_ = write(fileno(stderr), str_, strlen(str_)); \
196 /* file-scope variables */
197 static DWORD tls_index
;
199 /* globally visible variables (needed by exit_nicely) */
200 bool parallel_init_done
= false;
204 /* Local function prototypes */
205 static ParallelSlot
*GetMyPSlot(ParallelState
*pstate
);
206 static void archive_close_connection(int code
, void *arg
);
207 static void ShutdownWorkersHard(ParallelState
*pstate
);
208 static void WaitForTerminatingWorkers(ParallelState
*pstate
);
209 static void set_cancel_handler(void);
210 static void set_cancel_pstate(ParallelState
*pstate
);
211 static void set_cancel_slot_archive(ParallelSlot
*slot
, ArchiveHandle
*AH
);
212 static void RunWorker(ArchiveHandle
*AH
, ParallelSlot
*slot
);
213 static int GetIdleWorker(ParallelState
*pstate
);
214 static bool HasEveryWorkerTerminated(ParallelState
*pstate
);
215 static void lockTableForWorker(ArchiveHandle
*AH
, TocEntry
*te
);
216 static void WaitForCommands(ArchiveHandle
*AH
, int pipefd
[2]);
217 static bool ListenToWorkers(ArchiveHandle
*AH
, ParallelState
*pstate
,
219 static char *getMessageFromLeader(int pipefd
[2]);
220 static void sendMessageToLeader(int pipefd
[2], const char *str
);
221 static int select_loop(int maxFd
, fd_set
*workerset
);
222 static char *getMessageFromWorker(ParallelState
*pstate
,
223 bool do_wait
, int *worker
);
224 static void sendMessageToWorker(ParallelState
*pstate
,
225 int worker
, const char *str
);
226 static char *readMessageFromPipe(int fd
);
/*
 * Test whether the string "msg" begins with the string "prefix".
 * Note that "prefix" is evaluated twice.
 */
#define messageStartsWith(msg, prefix) \
	(0 == strncmp((msg), (prefix), strlen(prefix)))
233 * Initialize parallel dump support --- should be called early in process
234 * startup. (Currently, this is called whether or not we intend parallel activity.)
238 init_parallel_dump_utils(void)
241 if (!parallel_init_done
)
246 /* Prepare for threaded operation */
247 tls_index
= TlsAlloc();
248 mainThreadId
= GetCurrentThreadId();
250 /* Initialize socket access */
251 err
= WSAStartup(MAKEWORD(2, 2), &wsaData
);
253 pg_fatal("%s() failed: error code %d", "WSAStartup", err
);
255 parallel_init_done
= true;
261 * Find the ParallelSlot for the current worker process or thread.
263 * Returns NULL if no matching slot is found (this implies we're the leader).
265 static ParallelSlot
*
266 GetMyPSlot(ParallelState
*pstate
)
270 for (i
= 0; i
< pstate
->numWorkers
; i
++)
273 if (pstate
->parallelSlot
[i
].threadId
== GetCurrentThreadId())
275 if (pstate
->parallelSlot
[i
].pid
== getpid())
277 return &(pstate
->parallelSlot
[i
]);
#ifdef WIN32
/*
 * Thread-aware replacement for getLocalPQExpBuffer().
 *
 * Hands back a scratch buffer that is emptied on each call and then reused,
 * so we consume one buffer per thread rather than one per
 * fmtId/fmtQualifiedId call.  Not reentrant: a later call wipes whatever an
 * earlier caller left in the buffer.
 */
static PQExpBuffer
getThreadLocalPQExpBuffer(void)
{
	/*
	 * The Tls code goes awry if a static variable is used alongside it, so
	 * the static is consulted only while parallel_init_done is false; after
	 * that the buffer pointer lives in the TLS slot.  TlsGetValue() is relied
	 * on to return 0 for a slot that has not been set yet.
	 */
	static PQExpBuffer s_id_return = NULL;
	PQExpBuffer buf;

	buf = parallel_init_done ?
		(PQExpBuffer) TlsGetValue(tls_index) : s_id_return;

	if (buf != NULL)
	{
		/* we already have a buffer: reuse it after wiping its contents */
		resetPQExpBuffer(buf);
		return buf;
	}

	/* no buffer yet: create one and remember it for later calls */
	buf = createPQExpBuffer();
	if (parallel_init_done)
		TlsSetValue(tls_index, buf);
	else
		s_id_return = buf;

	return buf;
}
#endif							/* WIN32 */
326 * pg_dump and pg_restore call this to register the cleanup handler
327 * as soon as they've created the ArchiveHandle.
330 on_exit_close_archive(Archive
*AHX
)
332 shutdown_info
.AHX
= AHX
;
333 on_exit_nicely(archive_close_connection
, &shutdown_info
);
337 * on_exit_nicely handler for shutting down database connections and
338 * worker processes cleanly.
341 archive_close_connection(int code
, void *arg
)
343 ShutdownInformation
*si
= (ShutdownInformation
*) arg
;
347 /* In parallel mode, must figure out who we are */
348 ParallelSlot
*slot
= GetMyPSlot(si
->pstate
);
353 * We're the leader. Forcibly shut down workers, then close our
354 * own database connection, if any.
356 ShutdownWorkersHard(si
->pstate
);
359 DisconnectDatabase(si
->AHX
);
364 * We're a worker. Shut down our own DB connection if any. On
365 * Windows, we also have to close our communication sockets, to
366 * emulate what will happen on Unix when the worker process exits.
367 * (Without this, if this is a premature exit, the leader would
368 * fail to detect it because there would be no EOF condition on
369 * the other end of the pipe.)
372 DisconnectDatabase(&(slot
->AH
->public));
375 closesocket(slot
->pipeRevRead
);
376 closesocket(slot
->pipeRevWrite
);
382 /* Non-parallel operation: just kill the leader DB connection */
384 DisconnectDatabase(si
->AHX
);
389 * Forcibly shut down any remaining workers, waiting for them to finish.
391 * Note that we don't expect to come here during normal exit (the workers
392 * should be long gone, and the ParallelState too). We're only here in a
393 * pg_fatal() situation, so intervening to cancel active commands is appropriate.
397 ShutdownWorkersHard(ParallelState
*pstate
)
402 * Close our write end of the sockets so that any workers waiting for
403 * commands know they can exit. (Note: some of the pipeWrite fields might
404 * still be zero, if we failed to initialize all the workers. Hence, just
405 * ignore errors here.)
407 for (i
= 0; i
< pstate
->numWorkers
; i
++)
408 closesocket(pstate
->parallelSlot
[i
].pipeWrite
);
411 * Force early termination of any commands currently in progress.
414 /* On non-Windows, send SIGTERM to each worker process. */
415 for (i
= 0; i
< pstate
->numWorkers
; i
++)
417 pid_t pid
= pstate
->parallelSlot
[i
].pid
;
425 * On Windows, send query cancels directly to the workers' backends. Use
426 * a critical section to ensure worker threads don't change state.
428 EnterCriticalSection(&signal_info_lock
);
429 for (i
= 0; i
< pstate
->numWorkers
; i
++)
431 ArchiveHandle
*AH
= pstate
->parallelSlot
[i
].AH
;
434 if (AH
!= NULL
&& AH
->connCancel
!= NULL
)
435 (void) PQcancel(AH
->connCancel
, errbuf
, sizeof(errbuf
));
437 LeaveCriticalSection(&signal_info_lock
);
440 /* Now wait for them to terminate. */
441 WaitForTerminatingWorkers(pstate
);
445 * Wait for all workers to terminate.
448 WaitForTerminatingWorkers(ParallelState
*pstate
)
450 while (!HasEveryWorkerTerminated(pstate
))
452 ParallelSlot
*slot
= NULL
;
456 /* On non-Windows, use wait() to wait for next worker to end */
458 pid_t pid
= wait(&status
);
460 /* Find dead worker's slot, and clear the PID field */
461 for (j
= 0; j
< pstate
->numWorkers
; j
++)
463 slot
= &(pstate
->parallelSlot
[j
]);
464 if (slot
->pid
== pid
)
471 /* On Windows, we must use WaitForMultipleObjects() */
472 HANDLE
*lpHandles
= pg_malloc(sizeof(HANDLE
) * pstate
->numWorkers
);
477 for (j
= 0; j
< pstate
->numWorkers
; j
++)
479 if (WORKER_IS_RUNNING(pstate
->parallelSlot
[j
].workerStatus
))
481 lpHandles
[nrun
] = (HANDLE
) pstate
->parallelSlot
[j
].hThread
;
485 ret
= WaitForMultipleObjects(nrun
, lpHandles
, false, INFINITE
);
486 Assert(ret
!= WAIT_FAILED
);
487 hThread
= (uintptr_t) lpHandles
[ret
- WAIT_OBJECT_0
];
490 /* Find dead worker's slot, and clear the hThread field */
491 for (j
= 0; j
< pstate
->numWorkers
; j
++)
493 slot
= &(pstate
->parallelSlot
[j
]);
494 if (slot
->hThread
== hThread
)
496 /* For cleanliness, close handles for dead threads */
497 CloseHandle((HANDLE
) slot
->hThread
);
498 slot
->hThread
= (uintptr_t) INVALID_HANDLE_VALUE
;
504 /* On all platforms, update workerStatus and te[] as well */
505 Assert(j
< pstate
->numWorkers
);
506 slot
->workerStatus
= WRKR_TERMINATED
;
507 pstate
->te
[j
] = NULL
;
513 * Code for responding to cancel interrupts (SIGINT, control-C, etc)
515 * This doesn't quite belong in this module, but it needs access to the
516 * ParallelState data, so there's not really a better place either.
518 * When we get a cancel interrupt, we could just die, but in pg_restore that
519 * could leave a SQL command (e.g., CREATE INDEX on a large table) running
520 * for a long time. Instead, we try to send a cancel request and then die.
521 * pg_dump probably doesn't really need this, but we might as well use it
522 * there too. Note that sending the cancel directly from the signal handler
523 * is safe because PQcancel() is written to make it so.
525 * In parallel operation on Unix, each process is responsible for canceling
526 * its own connection (this must be so because nobody else has access to it).
527 * Furthermore, the leader process should attempt to forward its signal to
528 * each child. In simple manual use of pg_dump/pg_restore, forwarding isn't
529 * needed because typing control-C at the console would deliver SIGINT to
530 * every member of the terminal process group --- but in other scenarios it
531 * might be that only the leader gets signaled.
533 * On Windows, the cancel handler runs in a separate thread, because that's
534 * how SetConsoleCtrlHandler works. We make it stop worker threads, send
535 * cancels on all active connections, and then return FALSE, which will allow
536 * the process to die. For safety's sake, we use a critical section to
537 * protect the PGcancel structures against being changed while the signal thread runs.
544 * Signal handler (Unix only)
547 sigTermHandler(SIGNAL_ARGS
)
553 * Some platforms allow delivery of new signals to interrupt an active
554 * signal handler. That could muck up our attempt to send PQcancel, so
555 * disable the signals that set_cancel_handler enabled.
557 pqsignal(SIGINT
, SIG_IGN
);
558 pqsignal(SIGTERM
, SIG_IGN
);
559 pqsignal(SIGQUIT
, SIG_IGN
);
562 * If we're in the leader, forward signal to all workers. (It seems best
563 * to do this before PQcancel; killing the leader transaction will result
564 * in invalid-snapshot errors from active workers, which maybe we can
565 * quiet by killing workers first.) Ignore any errors.
567 if (signal_info
.pstate
!= NULL
)
569 for (i
= 0; i
< signal_info
.pstate
->numWorkers
; i
++)
571 pid_t pid
= signal_info
.pstate
->parallelSlot
[i
].pid
;
579 * Send QueryCancel if we have a connection to send to. Ignore errors,
580 * there's not much we can do about them anyway.
582 if (signal_info
.myAH
!= NULL
&& signal_info
.myAH
->connCancel
!= NULL
)
583 (void) PQcancel(signal_info
.myAH
->connCancel
, errbuf
, sizeof(errbuf
));
586 * Report we're quitting, using nothing more complicated than write(2).
587 * When in parallel operation, only the leader process should do this.
589 if (!signal_info
.am_worker
)
593 write_stderr(progname
);
596 write_stderr("terminated by user\n");
600 * And die, using _exit() not exit() because the latter will invoke atexit
601 * handlers that can fail if we interrupted related code.
607 * Enable cancel interrupt handler, if not already done.
610 set_cancel_handler(void)
613 * When forking, signal_info.handler_set will propagate into the new
614 * process, but that's fine because the signal handler state does too.
616 if (!signal_info
.handler_set
)
618 signal_info
.handler_set
= true;
620 pqsignal(SIGINT
, sigTermHandler
);
621 pqsignal(SIGTERM
, sigTermHandler
);
622 pqsignal(SIGQUIT
, sigTermHandler
);
629 * Console interrupt handler --- runs in a newly-started thread.
631 * After stopping other threads and sending cancel requests on all open
632 * connections, we return FALSE which will allow the default ExitProcess()
633 * action to be taken.
636 consoleHandler(DWORD dwCtrlType
)
641 if (dwCtrlType
== CTRL_C_EVENT
||
642 dwCtrlType
== CTRL_BREAK_EVENT
)
644 /* Critical section prevents changing data we look at here */
645 EnterCriticalSection(&signal_info_lock
);
648 * If in parallel mode, stop worker threads and send QueryCancel to
649 * their connected backends. The main point of stopping the worker
650 * threads is to keep them from reporting the query cancels as errors,
651 * which would clutter the user's screen. We needn't stop the leader
652 * thread since it won't be doing much anyway. Do this before
653 * canceling the main transaction, else we might get invalid-snapshot
654 * errors reported before we can stop the workers. Ignore errors,
655 * there's not much we can do about them anyway.
657 if (signal_info
.pstate
!= NULL
)
659 for (i
= 0; i
< signal_info
.pstate
->numWorkers
; i
++)
661 ParallelSlot
*slot
= &(signal_info
.pstate
->parallelSlot
[i
]);
662 ArchiveHandle
*AH
= slot
->AH
;
663 HANDLE hThread
= (HANDLE
) slot
->hThread
;
666 * Using TerminateThread here may leave some resources leaked,
667 * but it doesn't matter since we're about to end the whole
670 if (hThread
!= INVALID_HANDLE_VALUE
)
671 TerminateThread(hThread
, 0);
673 if (AH
!= NULL
&& AH
->connCancel
!= NULL
)
674 (void) PQcancel(AH
->connCancel
, errbuf
, sizeof(errbuf
));
679 * Send QueryCancel to leader connection, if enabled. Ignore errors,
680 * there's not much we can do about them anyway.
682 if (signal_info
.myAH
!= NULL
&& signal_info
.myAH
->connCancel
!= NULL
)
683 (void) PQcancel(signal_info
.myAH
->connCancel
,
684 errbuf
, sizeof(errbuf
));
686 LeaveCriticalSection(&signal_info_lock
);
689 * Report we're quitting, using nothing more complicated than
690 * write(2). (We might be able to get away with using pg_log_*()
691 * here, but since we terminated other threads uncleanly above, it
692 * seems better to assume as little as possible.)
696 write_stderr(progname
);
699 write_stderr("terminated by user\n");
702 /* Always return FALSE to allow signal handling to continue */
707 * Enable cancel interrupt handler, if not already done.
710 set_cancel_handler(void)
712 if (!signal_info
.handler_set
)
714 signal_info
.handler_set
= true;
716 InitializeCriticalSection(&signal_info_lock
);
718 SetConsoleCtrlHandler(consoleHandler
, TRUE
);
726 * set_archive_cancel_info
728 * Fill AH->connCancel with cancellation info for the specified database
729 * connection; or clear it if conn is NULL.
732 set_archive_cancel_info(ArchiveHandle
*AH
, PGconn
*conn
)
734 PGcancel
*oldConnCancel
;
737 * Activate the interrupt handler if we didn't yet in this process. On
738 * Windows, this also initializes signal_info_lock; therefore it's
739 * important that this happen at least once before we fork off any
742 set_cancel_handler();
745 * On Unix, we assume that storing a pointer value is atomic with respect
746 * to any possible signal interrupt. On Windows, use a critical section.
750 EnterCriticalSection(&signal_info_lock
);
753 /* Free the old one if we have one */
754 oldConnCancel
= AH
->connCancel
;
755 /* be sure interrupt handler doesn't use pointer while freeing */
756 AH
->connCancel
= NULL
;
758 if (oldConnCancel
!= NULL
)
759 PQfreeCancel(oldConnCancel
);
761 /* Set the new one if specified */
763 AH
->connCancel
= PQgetCancel(conn
);
766 * On Unix, there's only ever one active ArchiveHandle per process, so we
767 * can just set signal_info.myAH unconditionally. On Windows, do that
768 * only in the main thread; worker threads have to make sure their
769 * ArchiveHandle appears in the pstate data, which is dealt with in
773 signal_info
.myAH
= AH
;
775 if (mainThreadId
== GetCurrentThreadId())
776 signal_info
.myAH
= AH
;
780 LeaveCriticalSection(&signal_info_lock
);
787 * Set signal_info.pstate to point to the specified ParallelState, if any.
788 * We need this mainly to have an interlock against Windows signal thread.
791 set_cancel_pstate(ParallelState
*pstate
)
794 EnterCriticalSection(&signal_info_lock
);
797 signal_info
.pstate
= pstate
;
800 LeaveCriticalSection(&signal_info_lock
);
805 * set_cancel_slot_archive
807 * Set ParallelSlot's AH field to point to the specified archive, if any.
808 * We need this mainly to have an interlock against Windows signal thread.
811 set_cancel_slot_archive(ParallelSlot
*slot
, ArchiveHandle
*AH
)
814 EnterCriticalSection(&signal_info_lock
);
820 LeaveCriticalSection(&signal_info_lock
);
826 * This function is called by both Unix and Windows variants to set up
827 * and run a worker process. Caller should exit the process (or thread) upon return.
831 RunWorker(ArchiveHandle
*AH
, ParallelSlot
*slot
)
835 /* fetch child ends of pipes */
836 pipefd
[PIPE_READ
] = slot
->pipeRevRead
;
837 pipefd
[PIPE_WRITE
] = slot
->pipeRevWrite
;
840 * Clone the archive so that we have our own state to work with, and in
841 * particular our own database connection.
843 * We clone on Unix as well as Windows, even though technically we don't
844 * need to because fork() gives us a copy in our own address space
845 * already. But CloneArchive resets the state information and also clones
846 * the database connection which both seem kinda helpful.
848 AH
= CloneArchive(AH
);
850 /* Remember cloned archive where signal handler can find it */
851 set_cancel_slot_archive(slot
, AH
);
854 * Call the setup worker function that's defined in the ArchiveHandle.
856 (AH
->SetupWorkerPtr
) ((Archive
*) AH
);
859 * Execute commands until done.
861 WaitForCommands(AH
, pipefd
);
864 * Disconnect from database and clean up.
866 set_cancel_slot_archive(slot
, NULL
);
867 DisconnectDatabase(&(AH
->public));
872 * Thread base function for Windows
875 static unsigned __stdcall
876 init_spawned_worker_win32(WorkerInfo
*wi
)
878 ArchiveHandle
*AH
= wi
->AH
;
879 ParallelSlot
*slot
= wi
->slot
;
881 /* Don't need WorkerInfo anymore */
884 /* Run the worker ... */
887 /* Exit the thread */
894 * This function starts a parallel dump or restore by spawning off the worker
895 * processes. For Windows, it creates a number of threads; on Unix the
896 * workers are created with fork().
899 ParallelBackupStart(ArchiveHandle
*AH
)
901 ParallelState
*pstate
;
904 Assert(AH
->public.numWorkers
> 0);
906 pstate
= (ParallelState
*) pg_malloc(sizeof(ParallelState
));
908 pstate
->numWorkers
= AH
->public.numWorkers
;
910 pstate
->parallelSlot
= NULL
;
912 if (AH
->public.numWorkers
== 1)
915 /* Create status arrays, being sure to initialize all fields to 0 */
916 pstate
->te
= (TocEntry
**)
917 pg_malloc0(pstate
->numWorkers
* sizeof(TocEntry
*));
918 pstate
->parallelSlot
= (ParallelSlot
*)
919 pg_malloc0(pstate
->numWorkers
* sizeof(ParallelSlot
));
922 /* Make fmtId() and fmtQualifiedId() use thread-local storage */
923 getLocalPQExpBuffer
= getThreadLocalPQExpBuffer
;
927 * Set the pstate in shutdown_info, to tell the exit handler that it must
928 * clean up workers as well as the main database connection. But we don't
929 * set this in signal_info yet, because we don't want child processes to
930 * inherit non-NULL signal_info.pstate.
932 shutdown_info
.pstate
= pstate
;
935 * Temporarily disable query cancellation on the leader connection. This
936 * ensures that child processes won't inherit valid AH->connCancel
937 * settings and thus won't try to issue cancels against the leader's
938 * connection. No harm is done if we fail while it's disabled, because
939 * the leader connection is idle at this point anyway.
941 set_archive_cancel_info(AH
, NULL
);
943 /* Ensure stdio state is quiesced before forking */
946 /* Create desired number of workers */
947 for (i
= 0; i
< pstate
->numWorkers
; i
++)
955 ParallelSlot
*slot
= &(pstate
->parallelSlot
[i
]);
959 /* Create communication pipes for this worker */
960 if (pgpipe(pipeMW
) < 0 || pgpipe(pipeWM
) < 0)
961 pg_fatal("could not create communication channels: %m");
963 /* leader's ends of the pipes */
964 slot
->pipeRead
= pipeWM
[PIPE_READ
];
965 slot
->pipeWrite
= pipeMW
[PIPE_WRITE
];
966 /* child's ends of the pipes */
967 slot
->pipeRevRead
= pipeMW
[PIPE_READ
];
968 slot
->pipeRevWrite
= pipeWM
[PIPE_WRITE
];
971 /* Create transient structure to pass args to worker function */
972 wi
= (WorkerInfo
*) pg_malloc(sizeof(WorkerInfo
));
977 handle
= _beginthreadex(NULL
, 0, (void *) &init_spawned_worker_win32
,
978 wi
, 0, &(slot
->threadId
));
979 slot
->hThread
= handle
;
980 slot
->workerStatus
= WRKR_IDLE
;
985 /* we are the worker */
988 /* this is needed for GetMyPSlot() */
989 slot
->pid
= getpid();
991 /* instruct signal handler that we're in a worker now */
992 signal_info
.am_worker
= true;
994 /* close read end of Worker -> Leader */
995 closesocket(pipeWM
[PIPE_READ
]);
996 /* close write end of Leader -> Worker */
997 closesocket(pipeMW
[PIPE_WRITE
]);
1000 * Close all inherited fds for communication of the leader with
1001 * previously-forked workers.
1003 for (j
= 0; j
< i
; j
++)
1005 closesocket(pstate
->parallelSlot
[j
].pipeRead
);
1006 closesocket(pstate
->parallelSlot
[j
].pipeWrite
);
1009 /* Run the worker ... */
1010 RunWorker(AH
, slot
);
1012 /* We can just exit(0) when done */
1018 pg_fatal("could not create worker process: %m");
1021 /* In Leader after successful fork */
1023 slot
->workerStatus
= WRKR_IDLE
;
1025 /* close read end of Leader -> Worker */
1026 closesocket(pipeMW
[PIPE_READ
]);
1027 /* close write end of Worker -> Leader */
1028 closesocket(pipeWM
[PIPE_WRITE
]);
1033 * Having forked off the workers, disable SIGPIPE so that leader isn't
1034 * killed if it tries to send a command to a dead worker. We don't want
1035 * the workers to inherit this setting, though.
1038 pqsignal(SIGPIPE
, SIG_IGN
);
1042 * Re-establish query cancellation on the leader connection.
1044 set_archive_cancel_info(AH
, AH
->connection
);
1047 * Tell the cancel signal handler to forward signals to worker processes,
1048 * too. (As with query cancel, we did not need this earlier because the
1049 * workers have not yet been given anything to do; if we die before this
1050 * point, any already-started workers will see EOF and quit promptly.)
1052 set_cancel_pstate(pstate
);
1058 * Close down a parallel dump or restore.
1061 ParallelBackupEnd(ArchiveHandle
*AH
, ParallelState
*pstate
)
1065 /* No work if non-parallel */
1066 if (pstate
->numWorkers
== 1)
1069 /* There should not be any unfinished jobs */
1070 Assert(IsEveryWorkerIdle(pstate
));
1072 /* Close the sockets so that the workers know they can exit */
1073 for (i
= 0; i
< pstate
->numWorkers
; i
++)
1075 closesocket(pstate
->parallelSlot
[i
].pipeRead
);
1076 closesocket(pstate
->parallelSlot
[i
].pipeWrite
);
1079 /* Wait for them to exit */
1080 WaitForTerminatingWorkers(pstate
);
1083 * Unlink pstate from shutdown_info, so the exit handler will not try to
1084 * use it; and likewise unlink from signal_info.
1086 shutdown_info
.pstate
= NULL
;
1087 set_cancel_pstate(NULL
);
1089 /* Release state (mere neatnik-ism, since we're about to terminate) */
1091 free(pstate
->parallelSlot
);
1096 * These next four functions handle construction and parsing of the command
1097 * strings and response strings for parallel workers.
1099 * Currently, these can be the same regardless of which archive format we are
1100 * processing. In future, we might want to let format modules override these
1101 * functions to add format-specific data to a command or response.
1105 * buildWorkerCommand: format a command string to send to a worker.
1107 * The string is built in the caller-supplied buffer of size buflen.
1110 buildWorkerCommand(ArchiveHandle
*AH
, TocEntry
*te
, T_Action act
,
1111 char *buf
, int buflen
)
1113 if (act
== ACT_DUMP
)
1114 snprintf(buf
, buflen
, "DUMP %d", te
->dumpId
);
1115 else if (act
== ACT_RESTORE
)
1116 snprintf(buf
, buflen
, "RESTORE %d", te
->dumpId
);
1122 * parseWorkerCommand: interpret a command string in a worker.
1125 parseWorkerCommand(ArchiveHandle
*AH
, TocEntry
**te
, T_Action
*act
,
1131 if (messageStartsWith(msg
, "DUMP "))
1134 sscanf(msg
, "DUMP %d%n", &dumpId
, &nBytes
);
1135 Assert(nBytes
== strlen(msg
));
1136 *te
= getTocEntryByDumpId(AH
, dumpId
);
1137 Assert(*te
!= NULL
);
1139 else if (messageStartsWith(msg
, "RESTORE "))
1142 sscanf(msg
, "RESTORE %d%n", &dumpId
, &nBytes
);
1143 Assert(nBytes
== strlen(msg
));
1144 *te
= getTocEntryByDumpId(AH
, dumpId
);
1145 Assert(*te
!= NULL
);
1148 pg_fatal("unrecognized command received from leader: \"%s\"",
1153 * buildWorkerResponse: format a response string to send to the leader.
1155 * The string is built in the caller-supplied buffer of size buflen.
1158 buildWorkerResponse(ArchiveHandle
*AH
, TocEntry
*te
, T_Action act
, int status
,
1159 char *buf
, int buflen
)
1161 snprintf(buf
, buflen
, "OK %d %d %d",
1164 status
== WORKER_IGNORED_ERRORS
? AH
->public.n_errors
: 0);
1168 * parseWorkerResponse: parse the status message returned by a worker.
1170 * Returns the integer status code, and may update fields of AH and/or te.
1173 parseWorkerResponse(ArchiveHandle
*AH
, TocEntry
*te
,
1181 if (messageStartsWith(msg
, "OK "))
1183 sscanf(msg
, "OK %d %d %d%n", &dumpId
, &status
, &n_errors
, &nBytes
);
1185 Assert(dumpId
== te
->dumpId
);
1186 Assert(nBytes
== strlen(msg
));
1188 AH
->public.n_errors
+= n_errors
;
1191 pg_fatal("invalid message received from worker: \"%s\"",
1198 * Dispatch a job to some free worker.
1200 * te is the TocEntry to be processed, act is the action to be taken on it.
1201 * callback is the function to call on completion of the job.
1203 * If no worker is currently available, this will block, and previously
1204 * registered callback functions may be called.
1207 DispatchJobForTocEntry(ArchiveHandle
*AH
,
1208 ParallelState
*pstate
,
1211 ParallelCompletionPtr callback
,
1212 void *callback_data
)
1217 /* Get a worker, waiting if none are idle */
1218 while ((worker
= GetIdleWorker(pstate
)) == NO_SLOT
)
1219 WaitForWorkers(AH
, pstate
, WFW_ONE_IDLE
);
1221 /* Construct and send command string */
1222 buildWorkerCommand(AH
, te
, act
, buf
, sizeof(buf
));
1224 sendMessageToWorker(pstate
, worker
, buf
);
1226 /* Remember worker is busy, and which TocEntry it's working on */
1227 pstate
->parallelSlot
[worker
].workerStatus
= WRKR_WORKING
;
1228 pstate
->parallelSlot
[worker
].callback
= callback
;
1229 pstate
->parallelSlot
[worker
].callback_data
= callback_data
;
1230 pstate
->te
[worker
] = te
;
1234 * Find an idle worker and return its slot number.
1235 * Return NO_SLOT if none are idle.
1238 GetIdleWorker(ParallelState
*pstate
)
1242 for (i
= 0; i
< pstate
->numWorkers
; i
++)
1244 if (pstate
->parallelSlot
[i
].workerStatus
== WRKR_IDLE
)
1251 * Return true iff no worker is running.
1254 HasEveryWorkerTerminated(ParallelState
*pstate
)
1258 for (i
= 0; i
< pstate
->numWorkers
; i
++)
1260 if (WORKER_IS_RUNNING(pstate
->parallelSlot
[i
].workerStatus
))
1267 * Return true iff every worker is in the WRKR_IDLE state.
1270 IsEveryWorkerIdle(ParallelState
*pstate
)
1274 for (i
= 0; i
< pstate
->numWorkers
; i
++)
1276 if (pstate
->parallelSlot
[i
].workerStatus
!= WRKR_IDLE
)
1283 * Acquire lock on a table to be dumped by a worker process.
1285 * The leader process is already holding an ACCESS SHARE lock. Ordinarily
1286 * it's no problem for a worker to get one too, but if anything else besides
1287 * pg_dump is running, there's a possible deadlock:
1289 * 1) Leader dumps the schema and locks all tables in ACCESS SHARE mode.
1290 * 2) Another process requests an ACCESS EXCLUSIVE lock (which is not granted
1291 * because the leader holds a conflicting ACCESS SHARE lock).
1292 * 3) A worker process also requests an ACCESS SHARE lock to read the table.
1293 * The worker is enqueued behind the ACCESS EXCLUSIVE lock request.
1294 * 4) Now we have a deadlock, since the leader is effectively waiting for
1295 * the worker. The server cannot detect that, however.
1297 * To prevent an infinite wait, prior to touching a table in a worker, request
1298 * a lock in ACCESS SHARE mode but with NOWAIT. If we don't get the lock,
1299 * then we know that somebody else has requested an ACCESS EXCLUSIVE lock and
1300 * so we have a deadlock. We must fail the backup in that case.
1303 lockTableForWorker(ArchiveHandle
*AH
, TocEntry
*te
)
1309 /* Nothing to do for BLOBS */
1310 if (strcmp(te
->desc
, "BLOBS") == 0)
1313 query
= createPQExpBuffer();
1315 qualId
= fmtQualifiedId(te
->namespace, te
->tag
);
1317 appendPQExpBuffer(query
, "LOCK TABLE %s IN ACCESS SHARE MODE NOWAIT",
1320 res
= PQexec(AH
->connection
, query
->data
);
1322 if (!res
|| PQresultStatus(res
) != PGRES_COMMAND_OK
)
1323 pg_fatal("could not obtain lock on relation \"%s\"\n"
1324 "This usually means that someone requested an ACCESS EXCLUSIVE lock "
1325 "on the table after the pg_dump parent process had gotten the "
1326 "initial ACCESS SHARE lock on the table.", qualId
);
1329 destroyPQExpBuffer(query
);
1333 * WaitForCommands: main routine for a worker process.
1335 * Read and execute commands from the leader until we see EOF on the pipe.
1338 WaitForCommands(ArchiveHandle
*AH
, int pipefd
[2])
1348 if (!(command
= getMessageFromLeader(pipefd
)))
1354 /* Decode the command */
1355 parseWorkerCommand(AH
, &te
, &act
, command
);
1357 if (act
== ACT_DUMP
)
1359 /* Acquire lock on this table within the worker's session */
1360 lockTableForWorker(AH
, te
);
1362 /* Perform the dump command */
1363 status
= (AH
->WorkerJobDumpPtr
) (AH
, te
);
1365 else if (act
== ACT_RESTORE
)
1367 /* Perform the restore command */
1368 status
= (AH
->WorkerJobRestorePtr
) (AH
, te
);
1373 /* Return status to leader */
1374 buildWorkerResponse(AH
, te
, act
, status
, buf
, sizeof(buf
));
1376 sendMessageToLeader(pipefd
, buf
);
1378 /* command was pg_malloc'd and we are responsible for free()ing it. */
1384 * Check for status messages from workers.
1386 * If do_wait is true, wait to get a status message; otherwise, just return
1387 * immediately if there is none available.
1389 * When we get a status message, we pass the status code to the callback
1390 * function that was specified to DispatchJobForTocEntry, then reset the
1391 * worker status to IDLE.
1393 * Returns true if we collected a status message, else false.
1395 * XXX is it worth checking for more than one status message per call?
1396 * It seems somewhat unlikely that multiple workers would finish at exactly
1400 ListenToWorkers(ArchiveHandle
*AH
, ParallelState
*pstate
, bool do_wait
)
1405 /* Try to collect a status message */
1406 msg
= getMessageFromWorker(pstate
, do_wait
, &worker
);
1410 /* If do_wait is true, we must have detected EOF on some socket */
1412 pg_fatal("a worker process died unexpectedly");
1416 /* Process it and update our idea of the worker's status */
1417 if (messageStartsWith(msg
, "OK "))
1419 ParallelSlot
*slot
= &pstate
->parallelSlot
[worker
];
1420 TocEntry
*te
= pstate
->te
[worker
];
1423 status
= parseWorkerResponse(AH
, te
, msg
);
1424 slot
->callback(AH
, te
, status
, slot
->callback_data
);
1425 slot
->workerStatus
= WRKR_IDLE
;
1426 pstate
->te
[worker
] = NULL
;
1429 pg_fatal("invalid message received from worker: \"%s\"",
1432 /* Free the string returned from getMessageFromWorker */
1439 * Check for status results from workers, waiting if necessary.
1441 * Available wait modes are:
1442 * WFW_NO_WAIT: reap any available status, but don't block
1443 * WFW_GOT_STATUS: wait for at least one more worker to finish
1444 * WFW_ONE_IDLE: wait for at least one worker to be idle
1445 * WFW_ALL_IDLE: wait for all workers to be idle
1447 * Any received results are passed to the callback specified to
1448 * DispatchJobForTocEntry.
1450 * This function is executed in the leader process.
1453 WaitForWorkers(ArchiveHandle
*AH
, ParallelState
*pstate
, WFW_WaitOption mode
)
1455 bool do_wait
= false;
1458 * In GOT_STATUS mode, always block waiting for a message, since we can't
1459 * return till we get something. In other modes, we don't block the first
1460 * time through the loop.
1462 if (mode
== WFW_GOT_STATUS
)
1464 /* Assert that caller knows what it's doing */
1465 Assert(!IsEveryWorkerIdle(pstate
));
1472 * Check for status messages, even if we don't need to block. We do
1473 * not try very hard to reap all available messages, though, since
1474 * there's unlikely to be more than one.
1476 if (ListenToWorkers(AH
, pstate
, do_wait
))
1479 * If we got a message, we are done by definition for GOT_STATUS
1480 * mode, and we can also be certain that there's at least one idle
1481 * worker. So we're done in all but ALL_IDLE mode.
1483 if (mode
!= WFW_ALL_IDLE
)
1487 /* Check whether we must wait for new status messages */
1491 return; /* never wait */
1492 case WFW_GOT_STATUS
:
1493 Assert(false); /* can't get here, because we waited */
1496 if (GetIdleWorker(pstate
) != NO_SLOT
)
1500 if (IsEveryWorkerIdle(pstate
))
1505 /* Loop back, and this time wait for something to happen */
1511 * Read one command message from the leader, blocking if necessary
1512 * until one is available, and return it as a malloc'd string.
1513 * On EOF, return NULL.
1515 * This function is executed in worker processes.
1518 getMessageFromLeader(int pipefd
[2])
1520 return readMessageFromPipe(pipefd
[PIPE_READ
]);
1524 * Send a status message to the leader.
1526 * This function is executed in worker processes.
1529 sendMessageToLeader(int pipefd
[2], const char *str
)
1531 int len
= strlen(str
) + 1;
1533 if (pipewrite(pipefd
[PIPE_WRITE
], str
, len
) != len
)
1534 pg_fatal("could not write to the communication channel: %m");
/*
 * Block until at least one descriptor in "workerset" is readable, retrying
 * whenever select() is interrupted.
 * Returns -1 on error, else the number of readable descriptors.
 */
static int
select_loop(int maxFd, fd_set *workerset)
{
	fd_set		wanted = *workerset;
	int			nready;

	for (;;)
	{
		bool		interrupted;

		/* select() overwrites the set, so restore it before every attempt */
		*workerset = wanted;
		nready = select(maxFd + 1, workerset, NULL, NULL, NULL);

#ifndef WIN32
		interrupted = (nready < 0 && errno == EINTR);
#else
		interrupted = (nready == SOCKET_ERROR && WSAGetLastError() == WSAEINTR);
#endif

		if (!interrupted)
			return nready;
	}
}
1567 * Check for messages from worker processes.
1569 * If a message is available, return it as a malloc'd string, and put the
1570 * index of the sending worker in *worker.
1572 * If nothing is available, wait if "do_wait" is true, else return NULL.
1574 * If we detect EOF on any socket, we'll return NULL. It's not great that
1575 * that's hard to distinguish from the no-data-available case, but for now
1576 * our one caller is okay with that.
1578 * This function is executed in the leader process.
1581 getMessageFromWorker(ParallelState
*pstate
, bool do_wait
, int *worker
)
1586 struct timeval nowait
= {0, 0};
1588 /* construct bitmap of socket descriptors for select() */
1589 FD_ZERO(&workerset
);
1590 for (i
= 0; i
< pstate
->numWorkers
; i
++)
1592 if (!WORKER_IS_RUNNING(pstate
->parallelSlot
[i
].workerStatus
))
1594 FD_SET(pstate
->parallelSlot
[i
].pipeRead
, &workerset
);
1595 if (pstate
->parallelSlot
[i
].pipeRead
> maxFd
)
1596 maxFd
= pstate
->parallelSlot
[i
].pipeRead
;
1601 i
= select_loop(maxFd
, &workerset
);
1606 if ((i
= select(maxFd
+ 1, &workerset
, NULL
, NULL
, &nowait
)) == 0)
1611 pg_fatal("%s() failed: %m", "select");
1613 for (i
= 0; i
< pstate
->numWorkers
; i
++)
1617 if (!WORKER_IS_RUNNING(pstate
->parallelSlot
[i
].workerStatus
))
1619 if (!FD_ISSET(pstate
->parallelSlot
[i
].pipeRead
, &workerset
))
1623 * Read the message if any. If the socket is ready because of EOF,
1624 * we'll return NULL instead (and the socket will stay ready, so the
1625 * condition will persist).
1627 * Note: because this is a blocking read, we'll wait if only part of
1628 * the message is available. Waiting a long time would be bad, but
1629 * since worker status messages are short and are always sent in one
1630 * operation, it shouldn't be a problem in practice.
1632 msg
= readMessageFromPipe(pstate
->parallelSlot
[i
].pipeRead
);
1641 * Send a command message to the specified worker process.
1643 * This function is executed in the leader process.
1646 sendMessageToWorker(ParallelState
*pstate
, int worker
, const char *str
)
1648 int len
= strlen(str
) + 1;
1650 if (pipewrite(pstate
->parallelSlot
[worker
].pipeWrite
, str
, len
) != len
)
1652 pg_fatal("could not write to the communication channel: %m");
/*
 * Fetch one message from the given pipe (fd), blocking until it has fully
 * arrived.  The result is a malloc'd string, or NULL on EOF.
 *
 * A "message" on the channel is just a null-terminated string.
 */
static char *
readMessageFromPipe(int fd)
{
	int			capacity = 64;	/* could be any number */
	int			used = 0;
	char	   *buffer = (char *) pg_malloc(capacity);

	/*
	 * In theory, if we let piperead() read multiple bytes, it might give us
	 * back fragments of multiple messages.  (That can't actually occur,
	 * since neither leader nor workers send more than one message without
	 * waiting for a reply, but we don't wish to assume that here.)  For
	 * simplicity, read a byte at a time until we get the terminating '\0'.
	 * This method is a bit inefficient, but since this is only used for
	 * relatively short command and status strings, it shouldn't matter.
	 */
	for (;;)
	{
		int			nread;

		Assert(used < capacity);
		nread = piperead(fd, buffer + used, 1);

		if (nread <= 0)
		{
			/* Error, or the other end has closed the connection */
			pg_free(buffer);
			return NULL;
		}

		Assert(nread == 1);

		if (buffer[used] == '\0')
			return buffer;		/* collected whole message */

		used++;
		if (used == capacity)	/* enlarge buffer if needed */
		{
			capacity += 16;		/* could be any number */
			buffer = (char *) pg_realloc(buffer, capacity);
		}
	}
}
1711 * This is a replacement version of pipe(2) for Windows which allows the pipe
1712 * handles to be used in select().
1714 * Reads and writes on the pipe must go through piperead()/pipewrite().
1716 * For consistency with Unix we declare the returned handles as "int".
1717 * This is okay even on WIN64 because system handles are not more than
1718 * 32 bits wide, but we do have to do some casting.
1721 pgpipe(int handles
[2])
1725 struct sockaddr_in serv_addr
;
1726 int len
= sizeof(serv_addr
);
1728 /* We have to use the Unix socket invalid file descriptor value here. */
1729 handles
[0] = handles
[1] = -1;
1732 * setup listen socket
1734 if ((s
= socket(AF_INET
, SOCK_STREAM
, 0)) == PGINVALID_SOCKET
)
1736 pg_log_error("pgpipe: could not create socket: error code %d",
1741 memset(&serv_addr
, 0, sizeof(serv_addr
));
1742 serv_addr
.sin_family
= AF_INET
;
1743 serv_addr
.sin_port
= pg_hton16(0);
1744 serv_addr
.sin_addr
.s_addr
= pg_hton32(INADDR_LOOPBACK
);
1745 if (bind(s
, (SOCKADDR
*) &serv_addr
, len
) == SOCKET_ERROR
)
1747 pg_log_error("pgpipe: could not bind: error code %d",
1752 if (listen(s
, 1) == SOCKET_ERROR
)
1754 pg_log_error("pgpipe: could not listen: error code %d",
1759 if (getsockname(s
, (SOCKADDR
*) &serv_addr
, &len
) == SOCKET_ERROR
)
1761 pg_log_error("pgpipe: %s() failed: error code %d", "getsockname",
1768 * setup pipe handles
1770 if ((tmp_sock
= socket(AF_INET
, SOCK_STREAM
, 0)) == PGINVALID_SOCKET
)
1772 pg_log_error("pgpipe: could not create second socket: error code %d",
1777 handles
[1] = (int) tmp_sock
;
1779 if (connect(handles
[1], (SOCKADDR
*) &serv_addr
, len
) == SOCKET_ERROR
)
1781 pg_log_error("pgpipe: could not connect socket: error code %d",
1783 closesocket(handles
[1]);
1788 if ((tmp_sock
= accept(s
, (SOCKADDR
*) &serv_addr
, &len
)) == PGINVALID_SOCKET
)
1790 pg_log_error("pgpipe: could not accept connection: error code %d",
1792 closesocket(handles
[1]);
1797 handles
[0] = (int) tmp_sock
;