Consistently use "superuser" instead of "super user"
[pgsql.git] / src / bin / pg_basebackup / pg_recvlogical.c
blobebeb12d497c5eb348c75f655cbca4b4bf1472f28
1 /*-------------------------------------------------------------------------
3 * pg_recvlogical.c - receive data from a logical decoding slot in a streaming
4 * fashion and write it to a local file.
6 * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
8 * IDENTIFICATION
9 * src/bin/pg_basebackup/pg_recvlogical.c
10 *-------------------------------------------------------------------------
13 #include "postgres_fe.h"
15 #include <dirent.h>
16 #include <limits.h>
17 #include <sys/stat.h>
18 #include <unistd.h>
19 #ifdef HAVE_SYS_SELECT_H
20 #include <sys/select.h>
21 #endif
23 #include "access/xlog_internal.h"
24 #include "common/fe_memutils.h"
25 #include "common/file_perm.h"
26 #include "common/logging.h"
27 #include "fe_utils/option_utils.h"
28 #include "getopt_long.h"
29 #include "libpq-fe.h"
30 #include "libpq/pqsignal.h"
31 #include "pqexpbuffer.h"
32 #include "streamutil.h"
34 /* Time to sleep between reconnection attempts */
35 #define RECONNECT_SLEEP_TIME 5
37 /* Global Options */
38 static char *outfile = NULL;
39 static int verbose = 0;
40 static bool two_phase = false;
41 static int noloop = 0;
42 static int standby_message_timeout = 10 * 1000; /* 10 sec = default */
43 static int fsync_interval = 10 * 1000; /* 10 sec = default */
44 static XLogRecPtr startpos = InvalidXLogRecPtr;
45 static XLogRecPtr endpos = InvalidXLogRecPtr;
46 static bool do_create_slot = false;
47 static bool slot_exists_ok = false;
48 static bool do_start_slot = false;
49 static bool do_drop_slot = false;
50 static char *replication_slot = NULL;
52 /* filled pairwise with option, value. value may be NULL */
53 static char **options;
54 static size_t noptions = 0;
55 static const char *plugin = "test_decoding";
57 /* Global State */
58 static int outfd = -1;
59 static volatile sig_atomic_t time_to_abort = false;
60 static volatile sig_atomic_t output_reopen = false;
61 static bool output_isfile;
62 static TimestampTz output_last_fsync = -1;
63 static bool output_needs_fsync = false;
64 static XLogRecPtr output_written_lsn = InvalidXLogRecPtr;
65 static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr;
67 static void usage(void);
68 static void StreamLogicalLog(void);
69 static bool flushAndSendFeedback(PGconn *conn, TimestampTz *now);
70 static void prepareToTerminate(PGconn *conn, XLogRecPtr endpos,
71 bool keepalive, XLogRecPtr lsn);
73 static void
74 usage(void)
76 printf(_("%s controls PostgreSQL logical decoding streams.\n\n"),
77 progname);
78 printf(_("Usage:\n"));
79 printf(_(" %s [OPTION]...\n"), progname);
80 printf(_("\nAction to be performed:\n"));
81 printf(_(" --create-slot create a new replication slot (for the slot's name see --slot)\n"));
82 printf(_(" --drop-slot drop the replication slot (for the slot's name see --slot)\n"));
83 printf(_(" --start start streaming in a replication slot (for the slot's name see --slot)\n"));
84 printf(_("\nOptions:\n"));
85 printf(_(" -E, --endpos=LSN exit after receiving the specified LSN\n"));
86 printf(_(" -f, --file=FILE receive log into this file, - for stdout\n"));
87 printf(_(" -F --fsync-interval=SECS\n"
88 " time between fsyncs to the output file (default: %d)\n"), (fsync_interval / 1000));
89 printf(_(" --if-not-exists do not error if slot already exists when creating a slot\n"));
90 printf(_(" -I, --startpos=LSN where in an existing slot should the streaming start\n"));
91 printf(_(" -n, --no-loop do not loop on connection lost\n"));
92 printf(_(" -o, --option=NAME[=VALUE]\n"
93 " pass option NAME with optional value VALUE to the\n"
94 " output plugin\n"));
95 printf(_(" -P, --plugin=PLUGIN use output plugin PLUGIN (default: %s)\n"), plugin);
96 printf(_(" -s, --status-interval=SECS\n"
97 " time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000));
98 printf(_(" -S, --slot=SLOTNAME name of the logical replication slot\n"));
99 printf(_(" -t, --two-phase enable two-phase decoding when creating a slot\n"));
100 printf(_(" -v, --verbose output verbose messages\n"));
101 printf(_(" -V, --version output version information, then exit\n"));
102 printf(_(" -?, --help show this help, then exit\n"));
103 printf(_("\nConnection options:\n"));
104 printf(_(" -d, --dbname=DBNAME database to connect to\n"));
105 printf(_(" -h, --host=HOSTNAME database server host or socket directory\n"));
106 printf(_(" -p, --port=PORT database server port number\n"));
107 printf(_(" -U, --username=NAME connect as specified database user\n"));
108 printf(_(" -w, --no-password never prompt for password\n"));
109 printf(_(" -W, --password force password prompt (should happen automatically)\n"));
110 printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
111 printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
115 * Send a Standby Status Update message to server.
117 static bool
118 sendFeedback(PGconn *conn, TimestampTz now, bool force, bool replyRequested)
120 static XLogRecPtr last_written_lsn = InvalidXLogRecPtr;
121 static XLogRecPtr last_fsync_lsn = InvalidXLogRecPtr;
123 char replybuf[1 + 8 + 8 + 8 + 8 + 1];
124 int len = 0;
127 * we normally don't want to send superfluous feedback, but if it's
128 * because of a timeout we need to, otherwise wal_sender_timeout will kill
129 * us.
131 if (!force &&
132 last_written_lsn == output_written_lsn &&
133 last_fsync_lsn == output_fsync_lsn)
134 return true;
136 if (verbose)
137 pg_log_info("confirming write up to %X/%X, flush to %X/%X (slot %s)",
138 LSN_FORMAT_ARGS(output_written_lsn),
139 LSN_FORMAT_ARGS(output_fsync_lsn),
140 replication_slot);
142 replybuf[len] = 'r';
143 len += 1;
144 fe_sendint64(output_written_lsn, &replybuf[len]); /* write */
145 len += 8;
146 fe_sendint64(output_fsync_lsn, &replybuf[len]); /* flush */
147 len += 8;
148 fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */
149 len += 8;
150 fe_sendint64(now, &replybuf[len]); /* sendTime */
151 len += 8;
152 replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
153 len += 1;
155 startpos = output_written_lsn;
156 last_written_lsn = output_written_lsn;
157 last_fsync_lsn = output_fsync_lsn;
159 if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
161 pg_log_error("could not send feedback packet: %s",
162 PQerrorMessage(conn));
163 return false;
166 return true;
169 static void
170 disconnect_atexit(void)
172 if (conn != NULL)
173 PQfinish(conn);
176 static bool
177 OutputFsync(TimestampTz now)
179 output_last_fsync = now;
181 output_fsync_lsn = output_written_lsn;
183 if (fsync_interval <= 0)
184 return true;
186 if (!output_needs_fsync)
187 return true;
189 output_needs_fsync = false;
191 /* can only fsync if it's a regular file */
192 if (!output_isfile)
193 return true;
195 if (fsync(outfd) != 0)
197 pg_log_fatal("could not fsync file \"%s\": %m", outfile);
198 exit(1);
201 return true;
205 * Start the log streaming
207 static void
208 StreamLogicalLog(void)
210 PGresult *res;
211 char *copybuf = NULL;
212 TimestampTz last_status = -1;
213 int i;
214 PQExpBuffer query;
216 output_written_lsn = InvalidXLogRecPtr;
217 output_fsync_lsn = InvalidXLogRecPtr;
219 query = createPQExpBuffer();
222 * Connect in replication mode to the server
224 if (!conn)
225 conn = GetConnection();
226 if (!conn)
227 /* Error message already written in GetConnection() */
228 return;
231 * Start the replication
233 if (verbose)
234 pg_log_info("starting log streaming at %X/%X (slot %s)",
235 LSN_FORMAT_ARGS(startpos),
236 replication_slot);
238 /* Initiate the replication stream at specified location */
239 appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %X/%X",
240 replication_slot, LSN_FORMAT_ARGS(startpos));
242 /* print options if there are any */
243 if (noptions)
244 appendPQExpBufferStr(query, " (");
246 for (i = 0; i < noptions; i++)
248 /* separator */
249 if (i > 0)
250 appendPQExpBufferStr(query, ", ");
252 /* write option name */
253 appendPQExpBuffer(query, "\"%s\"", options[(i * 2)]);
255 /* write option value if specified */
256 if (options[(i * 2) + 1] != NULL)
257 appendPQExpBuffer(query, " '%s'", options[(i * 2) + 1]);
260 if (noptions)
261 appendPQExpBufferChar(query, ')');
263 res = PQexec(conn, query->data);
264 if (PQresultStatus(res) != PGRES_COPY_BOTH)
266 pg_log_error("could not send replication command \"%s\": %s",
267 query->data, PQresultErrorMessage(res));
268 PQclear(res);
269 goto error;
271 PQclear(res);
272 resetPQExpBuffer(query);
274 if (verbose)
275 pg_log_info("streaming initiated");
277 while (!time_to_abort)
279 int r;
280 int bytes_left;
281 int bytes_written;
282 TimestampTz now;
283 int hdr_len;
284 XLogRecPtr cur_record_lsn = InvalidXLogRecPtr;
286 if (copybuf != NULL)
288 PQfreemem(copybuf);
289 copybuf = NULL;
293 * Potentially send a status message to the primary.
295 now = feGetCurrentTimestamp();
297 if (outfd != -1 &&
298 feTimestampDifferenceExceeds(output_last_fsync, now,
299 fsync_interval))
301 if (!OutputFsync(now))
302 goto error;
305 if (standby_message_timeout > 0 &&
306 feTimestampDifferenceExceeds(last_status, now,
307 standby_message_timeout))
309 /* Time to send feedback! */
310 if (!sendFeedback(conn, now, true, false))
311 goto error;
313 last_status = now;
316 /* got SIGHUP, close output file */
317 if (outfd != -1 && output_reopen && strcmp(outfile, "-") != 0)
319 now = feGetCurrentTimestamp();
320 if (!OutputFsync(now))
321 goto error;
322 close(outfd);
323 outfd = -1;
325 output_reopen = false;
327 /* open the output file, if not open yet */
328 if (outfd == -1)
330 struct stat statbuf;
332 if (strcmp(outfile, "-") == 0)
333 outfd = fileno(stdout);
334 else
335 outfd = open(outfile, O_CREAT | O_APPEND | O_WRONLY | PG_BINARY,
336 S_IRUSR | S_IWUSR);
337 if (outfd == -1)
339 pg_log_error("could not open log file \"%s\": %m", outfile);
340 goto error;
343 if (fstat(outfd, &statbuf) != 0)
345 pg_log_error("could not stat file \"%s\": %m", outfile);
346 goto error;
349 output_isfile = S_ISREG(statbuf.st_mode) && !isatty(outfd);
352 r = PQgetCopyData(conn, &copybuf, 1);
353 if (r == 0)
356 * In async mode, and no data available. We block on reading but
357 * not more than the specified timeout, so that we can send a
358 * response back to the client.
360 fd_set input_mask;
361 TimestampTz message_target = 0;
362 TimestampTz fsync_target = 0;
363 struct timeval timeout;
364 struct timeval *timeoutptr = NULL;
366 if (PQsocket(conn) < 0)
368 pg_log_error("invalid socket: %s", PQerrorMessage(conn));
369 goto error;
372 FD_ZERO(&input_mask);
373 FD_SET(PQsocket(conn), &input_mask);
375 /* Compute when we need to wakeup to send a keepalive message. */
376 if (standby_message_timeout)
377 message_target = last_status + (standby_message_timeout - 1) *
378 ((int64) 1000);
380 /* Compute when we need to wakeup to fsync the output file. */
381 if (fsync_interval > 0 && output_needs_fsync)
382 fsync_target = output_last_fsync + (fsync_interval - 1) *
383 ((int64) 1000);
385 /* Now compute when to wakeup. */
386 if (message_target > 0 || fsync_target > 0)
388 TimestampTz targettime;
389 long secs;
390 int usecs;
392 targettime = message_target;
394 if (fsync_target > 0 && fsync_target < targettime)
395 targettime = fsync_target;
397 feTimestampDifference(now,
398 targettime,
399 &secs,
400 &usecs);
401 if (secs <= 0)
402 timeout.tv_sec = 1; /* Always sleep at least 1 sec */
403 else
404 timeout.tv_sec = secs;
405 timeout.tv_usec = usecs;
406 timeoutptr = &timeout;
409 r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
410 if (r == 0 || (r < 0 && errno == EINTR))
413 * Got a timeout or signal. Continue the loop and either
414 * deliver a status packet to the server or just go back into
415 * blocking.
417 continue;
419 else if (r < 0)
421 pg_log_error("%s() failed: %m", "select");
422 goto error;
425 /* Else there is actually data on the socket */
426 if (PQconsumeInput(conn) == 0)
428 pg_log_error("could not receive data from WAL stream: %s",
429 PQerrorMessage(conn));
430 goto error;
432 continue;
435 /* End of copy stream */
436 if (r == -1)
437 break;
439 /* Failure while reading the copy stream */
440 if (r == -2)
442 pg_log_error("could not read COPY data: %s",
443 PQerrorMessage(conn));
444 goto error;
447 /* Check the message type. */
448 if (copybuf[0] == 'k')
450 int pos;
451 bool replyRequested;
452 XLogRecPtr walEnd;
453 bool endposReached = false;
456 * Parse the keepalive message, enclosed in the CopyData message.
457 * We just check if the server requested a reply, and ignore the
458 * rest.
460 pos = 1; /* skip msgtype 'k' */
461 walEnd = fe_recvint64(&copybuf[pos]);
462 output_written_lsn = Max(walEnd, output_written_lsn);
464 pos += 8; /* read walEnd */
466 pos += 8; /* skip sendTime */
468 if (r < pos + 1)
470 pg_log_error("streaming header too small: %d", r);
471 goto error;
473 replyRequested = copybuf[pos];
475 if (endpos != InvalidXLogRecPtr && walEnd >= endpos)
478 * If there's nothing to read on the socket until a keepalive
479 * we know that the server has nothing to send us; and if
480 * walEnd has passed endpos, we know nothing else can have
481 * committed before endpos. So we can bail out now.
483 endposReached = true;
486 /* Send a reply, if necessary */
487 if (replyRequested || endposReached)
489 if (!flushAndSendFeedback(conn, &now))
490 goto error;
491 last_status = now;
494 if (endposReached)
496 prepareToTerminate(conn, endpos, true, InvalidXLogRecPtr);
497 time_to_abort = true;
498 break;
501 continue;
503 else if (copybuf[0] != 'w')
505 pg_log_error("unrecognized streaming header: \"%c\"",
506 copybuf[0]);
507 goto error;
511 * Read the header of the XLogData message, enclosed in the CopyData
512 * message. We only need the WAL location field (dataStart), the rest
513 * of the header is ignored.
515 hdr_len = 1; /* msgtype 'w' */
516 hdr_len += 8; /* dataStart */
517 hdr_len += 8; /* walEnd */
518 hdr_len += 8; /* sendTime */
519 if (r < hdr_len + 1)
521 pg_log_error("streaming header too small: %d", r);
522 goto error;
525 /* Extract WAL location for this block */
526 cur_record_lsn = fe_recvint64(&copybuf[1]);
528 if (endpos != InvalidXLogRecPtr && cur_record_lsn > endpos)
531 * We've read past our endpoint, so prepare to go away being
532 * cautious about what happens to our output data.
534 if (!flushAndSendFeedback(conn, &now))
535 goto error;
536 prepareToTerminate(conn, endpos, false, cur_record_lsn);
537 time_to_abort = true;
538 break;
541 output_written_lsn = Max(cur_record_lsn, output_written_lsn);
543 bytes_left = r - hdr_len;
544 bytes_written = 0;
546 /* signal that a fsync is needed */
547 output_needs_fsync = true;
549 while (bytes_left)
551 int ret;
553 ret = write(outfd,
554 copybuf + hdr_len + bytes_written,
555 bytes_left);
557 if (ret < 0)
559 pg_log_error("could not write %u bytes to log file \"%s\": %m",
560 bytes_left, outfile);
561 goto error;
564 /* Write was successful, advance our position */
565 bytes_written += ret;
566 bytes_left -= ret;
569 if (write(outfd, "\n", 1) != 1)
571 pg_log_error("could not write %u bytes to log file \"%s\": %m",
572 1, outfile);
573 goto error;
576 if (endpos != InvalidXLogRecPtr && cur_record_lsn == endpos)
578 /* endpos was exactly the record we just processed, we're done */
579 if (!flushAndSendFeedback(conn, &now))
580 goto error;
581 prepareToTerminate(conn, endpos, false, cur_record_lsn);
582 time_to_abort = true;
583 break;
587 res = PQgetResult(conn);
588 if (PQresultStatus(res) == PGRES_COPY_OUT)
590 PQclear(res);
593 * We're doing a client-initiated clean exit and have sent CopyDone to
594 * the server. Drain any messages, so we don't miss a last-minute
595 * ErrorResponse. The walsender stops generating XLogData records once
596 * it sees CopyDone, so expect this to finish quickly. After CopyDone,
597 * it's too late for sendFeedback(), even if this were to take a long
598 * time. Hence, use synchronous-mode PQgetCopyData().
600 while (1)
602 int r;
604 if (copybuf != NULL)
606 PQfreemem(copybuf);
607 copybuf = NULL;
609 r = PQgetCopyData(conn, &copybuf, 0);
610 if (r == -1)
611 break;
612 if (r == -2)
614 pg_log_error("could not read COPY data: %s",
615 PQerrorMessage(conn));
616 time_to_abort = false; /* unclean exit */
617 goto error;
621 res = PQgetResult(conn);
623 if (PQresultStatus(res) != PGRES_COMMAND_OK)
625 pg_log_error("unexpected termination of replication stream: %s",
626 PQresultErrorMessage(res));
627 goto error;
629 PQclear(res);
631 if (outfd != -1 && strcmp(outfile, "-") != 0)
633 TimestampTz t = feGetCurrentTimestamp();
635 /* no need to jump to error on failure here, we're finishing anyway */
636 OutputFsync(t);
638 if (close(outfd) != 0)
639 pg_log_error("could not close file \"%s\": %m", outfile);
641 outfd = -1;
642 error:
643 if (copybuf != NULL)
645 PQfreemem(copybuf);
646 copybuf = NULL;
648 destroyPQExpBuffer(query);
649 PQfinish(conn);
650 conn = NULL;
654 * Unfortunately we can't do sensible signal handling on windows...
656 #ifndef WIN32
659 * When sigint is called, just tell the system to exit at the next possible
660 * moment.
662 static void
663 sigint_handler(int signum)
665 time_to_abort = true;
669 * Trigger the output file to be reopened.
671 static void
672 sighup_handler(int signum)
674 output_reopen = true;
676 #endif
680 main(int argc, char **argv)
682 static struct option long_options[] = {
683 /* general options */
684 {"file", required_argument, NULL, 'f'},
685 {"fsync-interval", required_argument, NULL, 'F'},
686 {"no-loop", no_argument, NULL, 'n'},
687 {"verbose", no_argument, NULL, 'v'},
688 {"two-phase", no_argument, NULL, 't'},
689 {"version", no_argument, NULL, 'V'},
690 {"help", no_argument, NULL, '?'},
691 /* connection options */
692 {"dbname", required_argument, NULL, 'd'},
693 {"host", required_argument, NULL, 'h'},
694 {"port", required_argument, NULL, 'p'},
695 {"username", required_argument, NULL, 'U'},
696 {"no-password", no_argument, NULL, 'w'},
697 {"password", no_argument, NULL, 'W'},
698 /* replication options */
699 {"startpos", required_argument, NULL, 'I'},
700 {"endpos", required_argument, NULL, 'E'},
701 {"option", required_argument, NULL, 'o'},
702 {"plugin", required_argument, NULL, 'P'},
703 {"status-interval", required_argument, NULL, 's'},
704 {"slot", required_argument, NULL, 'S'},
705 /* action */
706 {"create-slot", no_argument, NULL, 1},
707 {"start", no_argument, NULL, 2},
708 {"drop-slot", no_argument, NULL, 3},
709 {"if-not-exists", no_argument, NULL, 4},
710 {NULL, 0, NULL, 0}
712 int c;
713 int option_index;
714 uint32 hi,
716 char *db_name;
718 pg_logging_init(argv[0]);
719 progname = get_progname(argv[0]);
720 set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
722 if (argc > 1)
724 if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
726 usage();
727 exit(0);
729 else if (strcmp(argv[1], "-V") == 0 ||
730 strcmp(argv[1], "--version") == 0)
732 puts("pg_recvlogical (PostgreSQL) " PG_VERSION);
733 exit(0);
737 while ((c = getopt_long(argc, argv, "E:f:F:nvtd:h:p:U:wWI:o:P:s:S:",
738 long_options, &option_index)) != -1)
740 switch (c)
742 /* general options */
743 case 'f':
744 outfile = pg_strdup(optarg);
745 break;
746 case 'F':
747 if (!option_parse_int(optarg, "-F/--fsync-interval", 0,
748 INT_MAX / 1000,
749 &fsync_interval))
750 exit(1);
751 fsync_interval *= 1000;
752 break;
753 case 'n':
754 noloop = 1;
755 break;
756 case 'v':
757 verbose++;
758 break;
759 case 't':
760 two_phase = true;
761 break;
762 /* connection options */
763 case 'd':
764 dbname = pg_strdup(optarg);
765 break;
766 case 'h':
767 dbhost = pg_strdup(optarg);
768 break;
769 case 'p':
770 dbport = pg_strdup(optarg);
771 break;
772 case 'U':
773 dbuser = pg_strdup(optarg);
774 break;
775 case 'w':
776 dbgetpassword = -1;
777 break;
778 case 'W':
779 dbgetpassword = 1;
780 break;
781 /* replication options */
782 case 'I':
783 if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
785 pg_log_error("could not parse start position \"%s\"", optarg);
786 exit(1);
788 startpos = ((uint64) hi) << 32 | lo;
789 break;
790 case 'E':
791 if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
793 pg_log_error("could not parse end position \"%s\"", optarg);
794 exit(1);
796 endpos = ((uint64) hi) << 32 | lo;
797 break;
798 case 'o':
800 char *data = pg_strdup(optarg);
801 char *val = strchr(data, '=');
803 if (val != NULL)
805 /* remove =; separate data from val */
806 *val = '\0';
807 val++;
810 noptions += 1;
811 options = pg_realloc(options, sizeof(char *) * noptions * 2);
813 options[(noptions - 1) * 2] = data;
814 options[(noptions - 1) * 2 + 1] = val;
817 break;
818 case 'P':
819 plugin = pg_strdup(optarg);
820 break;
821 case 's':
822 if (!option_parse_int(optarg, "-s/--status-interval", 0,
823 INT_MAX / 1000,
824 &standby_message_timeout))
825 exit(1);
826 standby_message_timeout *= 1000;
827 break;
828 case 'S':
829 replication_slot = pg_strdup(optarg);
830 break;
831 /* action */
832 case 1:
833 do_create_slot = true;
834 break;
835 case 2:
836 do_start_slot = true;
837 break;
838 case 3:
839 do_drop_slot = true;
840 break;
841 case 4:
842 slot_exists_ok = true;
843 break;
845 default:
848 * getopt_long already emitted a complaint
850 fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
851 progname);
852 exit(1);
857 * Any non-option arguments?
859 if (optind < argc)
861 pg_log_error("too many command-line arguments (first is \"%s\")",
862 argv[optind]);
863 fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
864 progname);
865 exit(1);
869 * Required arguments
871 if (replication_slot == NULL)
873 pg_log_error("no slot specified");
874 fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
875 progname);
876 exit(1);
879 if (do_start_slot && outfile == NULL)
881 pg_log_error("no target file specified");
882 fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
883 progname);
884 exit(1);
887 if (!do_drop_slot && dbname == NULL)
889 pg_log_error("no database specified");
890 fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
891 progname);
892 exit(1);
895 if (!do_drop_slot && !do_create_slot && !do_start_slot)
897 pg_log_error("at least one action needs to be specified");
898 fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
899 progname);
900 exit(1);
903 if (do_drop_slot && (do_create_slot || do_start_slot))
905 pg_log_error("cannot use --create-slot or --start together with --drop-slot");
906 fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
907 progname);
908 exit(1);
911 if (startpos != InvalidXLogRecPtr && (do_create_slot || do_drop_slot))
913 pg_log_error("cannot use --create-slot or --drop-slot together with --startpos");
914 fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
915 progname);
916 exit(1);
919 if (endpos != InvalidXLogRecPtr && !do_start_slot)
921 pg_log_error("--endpos may only be specified with --start");
922 fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
923 progname);
924 exit(1);
927 if (two_phase && !do_create_slot)
929 pg_log_error("--two-phase may only be specified with --create-slot");
930 fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
931 progname);
932 exit(1);
936 #ifndef WIN32
937 pqsignal(SIGINT, sigint_handler);
938 pqsignal(SIGHUP, sighup_handler);
939 #endif
942 * Obtain a connection to server. This is not really necessary but it
943 * helps to get more precise error messages about authentication, required
944 * GUC parameters and such.
946 conn = GetConnection();
947 if (!conn)
948 /* Error message already written in GetConnection() */
949 exit(1);
950 atexit(disconnect_atexit);
953 * Run IDENTIFY_SYSTEM to make sure we connected using a database specific
954 * replication connection.
956 if (!RunIdentifySystem(conn, NULL, NULL, NULL, &db_name))
957 exit(1);
959 if (db_name == NULL)
961 pg_log_error("could not establish database-specific replication connection");
962 exit(1);
966 * Set umask so that directories/files are created with the same
967 * permissions as directories/files in the source data directory.
969 * pg_mode_mask is set to owner-only by default and then updated in
970 * GetConnection() where we get the mode from the server-side with
971 * RetrieveDataDirCreatePerm() and then call SetDataDirectoryCreatePerm().
973 umask(pg_mode_mask);
975 /* Drop a replication slot. */
976 if (do_drop_slot)
978 if (verbose)
979 pg_log_info("dropping replication slot \"%s\"", replication_slot);
981 if (!DropReplicationSlot(conn, replication_slot))
982 exit(1);
985 /* Create a replication slot. */
986 if (do_create_slot)
988 if (verbose)
989 pg_log_info("creating replication slot \"%s\"", replication_slot);
991 if (!CreateReplicationSlot(conn, replication_slot, plugin, false,
992 false, false, slot_exists_ok, two_phase))
993 exit(1);
994 startpos = InvalidXLogRecPtr;
997 if (!do_start_slot)
998 exit(0);
1000 /* Stream loop */
1001 while (true)
1003 StreamLogicalLog();
1004 if (time_to_abort)
1007 * We've been Ctrl-C'ed or reached an exit limit condition. That's
1008 * not an error, so exit without an errorcode.
1010 exit(0);
1012 else if (noloop)
1014 pg_log_error("disconnected");
1015 exit(1);
1017 else
1019 /* translator: check source for value for %d */
1020 pg_log_info("disconnected; waiting %d seconds to try again",
1021 RECONNECT_SLEEP_TIME);
1022 pg_usleep(RECONNECT_SLEEP_TIME * 1000000);
1028 * Fsync our output data, and send a feedback message to the server. Returns
1029 * true if successful, false otherwise.
1031 * If successful, *now is updated to the current timestamp just before sending
1032 * feedback.
1034 static bool
1035 flushAndSendFeedback(PGconn *conn, TimestampTz *now)
1037 /* flush data to disk, so that we send a recent flush pointer */
1038 if (!OutputFsync(*now))
1039 return false;
1040 *now = feGetCurrentTimestamp();
1041 if (!sendFeedback(conn, *now, true, false))
1042 return false;
1044 return true;
1048 * Try to inform the server about our upcoming demise, but don't wait around or
1049 * retry on failure.
1051 static void
1052 prepareToTerminate(PGconn *conn, XLogRecPtr endpos, bool keepalive, XLogRecPtr lsn)
1054 (void) PQputCopyEnd(conn, NULL);
1055 (void) PQflush(conn);
1057 if (verbose)
1059 if (keepalive)
1060 pg_log_info("end position %X/%X reached by keepalive",
1061 LSN_FORMAT_ARGS(endpos));
1062 else
1063 pg_log_info("end position %X/%X reached by WAL record at %X/%X",
1064 LSN_FORMAT_ARGS(endpos), LSN_FORMAT_ARGS(lsn));