1 /*-------------------------------------------------------------------------
3 * pg_recvlogical.c - receive data from a logical decoding slot in a streaming
4 * fashion and write it to a local file.
6 * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
9 * src/bin/pg_basebackup/pg_recvlogical.c
10 *-------------------------------------------------------------------------
13 #include "postgres_fe.h"
19 #ifdef HAVE_SYS_SELECT_H
20 #include <sys/select.h>
23 #include "access/xlog_internal.h"
24 #include "common/fe_memutils.h"
25 #include "common/file_perm.h"
26 #include "common/logging.h"
27 #include "fe_utils/option_utils.h"
28 #include "getopt_long.h"
30 #include "libpq/pqsignal.h"
31 #include "pqexpbuffer.h"
32 #include "streamutil.h"
34 /* Time to sleep between reconnection attempts */
35 #define RECONNECT_SLEEP_TIME 5
38 static char *outfile
= NULL
;
39 static int verbose
= 0;
40 static bool two_phase
= false;
41 static int noloop
= 0;
42 static int standby_message_timeout
= 10 * 1000; /* 10 sec = default */
43 static int fsync_interval
= 10 * 1000; /* 10 sec = default */
44 static XLogRecPtr startpos
= InvalidXLogRecPtr
;
45 static XLogRecPtr endpos
= InvalidXLogRecPtr
;
46 static bool do_create_slot
= false;
47 static bool slot_exists_ok
= false;
48 static bool do_start_slot
= false;
49 static bool do_drop_slot
= false;
50 static char *replication_slot
= NULL
;
52 /* filled pairwise with option, value. value may be NULL */
53 static char **options
;
54 static size_t noptions
= 0;
55 static const char *plugin
= "test_decoding";
58 static int outfd
= -1;
59 static volatile sig_atomic_t time_to_abort
= false;
60 static volatile sig_atomic_t output_reopen
= false;
61 static bool output_isfile
;
62 static TimestampTz output_last_fsync
= -1;
63 static bool output_needs_fsync
= false;
64 static XLogRecPtr output_written_lsn
= InvalidXLogRecPtr
;
65 static XLogRecPtr output_fsync_lsn
= InvalidXLogRecPtr
;
67 static void usage(void);
68 static void StreamLogicalLog(void);
69 static bool flushAndSendFeedback(PGconn
*conn
, TimestampTz
*now
);
70 static void prepareToTerminate(PGconn
*conn
, XLogRecPtr endpos
,
71 bool keepalive
, XLogRecPtr lsn
);
76 printf(_("%s controls PostgreSQL logical decoding streams.\n\n"),
78 printf(_("Usage:\n"));
79 printf(_(" %s [OPTION]...\n"), progname
);
80 printf(_("\nAction to be performed:\n"));
81 printf(_(" --create-slot create a new replication slot (for the slot's name see --slot)\n"));
82 printf(_(" --drop-slot drop the replication slot (for the slot's name see --slot)\n"));
83 printf(_(" --start start streaming in a replication slot (for the slot's name see --slot)\n"));
84 printf(_("\nOptions:\n"));
85 printf(_(" -E, --endpos=LSN exit after receiving the specified LSN\n"));
86 printf(_(" -f, --file=FILE receive log into this file, - for stdout\n"));
87 printf(_(" -F --fsync-interval=SECS\n"
88 " time between fsyncs to the output file (default: %d)\n"), (fsync_interval
/ 1000));
89 printf(_(" --if-not-exists do not error if slot already exists when creating a slot\n"));
90 printf(_(" -I, --startpos=LSN where in an existing slot should the streaming start\n"));
91 printf(_(" -n, --no-loop do not loop on connection lost\n"));
92 printf(_(" -o, --option=NAME[=VALUE]\n"
93 " pass option NAME with optional value VALUE to the\n"
95 printf(_(" -P, --plugin=PLUGIN use output plugin PLUGIN (default: %s)\n"), plugin
);
96 printf(_(" -s, --status-interval=SECS\n"
97 " time between status packets sent to server (default: %d)\n"), (standby_message_timeout
/ 1000));
98 printf(_(" -S, --slot=SLOTNAME name of the logical replication slot\n"));
99 printf(_(" -t, --two-phase enable two-phase decoding when creating a slot\n"));
100 printf(_(" -v, --verbose output verbose messages\n"));
101 printf(_(" -V, --version output version information, then exit\n"));
102 printf(_(" -?, --help show this help, then exit\n"));
103 printf(_("\nConnection options:\n"));
104 printf(_(" -d, --dbname=DBNAME database to connect to\n"));
105 printf(_(" -h, --host=HOSTNAME database server host or socket directory\n"));
106 printf(_(" -p, --port=PORT database server port number\n"));
107 printf(_(" -U, --username=NAME connect as specified database user\n"));
108 printf(_(" -w, --no-password never prompt for password\n"));
109 printf(_(" -W, --password force password prompt (should happen automatically)\n"));
110 printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT
);
111 printf(_("%s home page: <%s>\n"), PACKAGE_NAME
, PACKAGE_URL
);
115 * Send a Standby Status Update message to server.
118 sendFeedback(PGconn
*conn
, TimestampTz now
, bool force
, bool replyRequested
)
120 static XLogRecPtr last_written_lsn
= InvalidXLogRecPtr
;
121 static XLogRecPtr last_fsync_lsn
= InvalidXLogRecPtr
;
123 char replybuf
[1 + 8 + 8 + 8 + 8 + 1];
127 * we normally don't want to send superfluous feedback, but if it's
128 * because of a timeout we need to, otherwise wal_sender_timeout will kill
132 last_written_lsn
== output_written_lsn
&&
133 last_fsync_lsn
== output_fsync_lsn
)
137 pg_log_info("confirming write up to %X/%X, flush to %X/%X (slot %s)",
138 LSN_FORMAT_ARGS(output_written_lsn
),
139 LSN_FORMAT_ARGS(output_fsync_lsn
),
144 fe_sendint64(output_written_lsn
, &replybuf
[len
]); /* write */
146 fe_sendint64(output_fsync_lsn
, &replybuf
[len
]); /* flush */
148 fe_sendint64(InvalidXLogRecPtr
, &replybuf
[len
]); /* apply */
150 fe_sendint64(now
, &replybuf
[len
]); /* sendTime */
152 replybuf
[len
] = replyRequested
? 1 : 0; /* replyRequested */
155 startpos
= output_written_lsn
;
156 last_written_lsn
= output_written_lsn
;
157 last_fsync_lsn
= output_fsync_lsn
;
159 if (PQputCopyData(conn
, replybuf
, len
) <= 0 || PQflush(conn
))
161 pg_log_error("could not send feedback packet: %s",
162 PQerrorMessage(conn
));
170 disconnect_atexit(void)
177 OutputFsync(TimestampTz now
)
179 output_last_fsync
= now
;
181 output_fsync_lsn
= output_written_lsn
;
183 if (fsync_interval
<= 0)
186 if (!output_needs_fsync
)
189 output_needs_fsync
= false;
191 /* can only fsync if it's a regular file */
195 if (fsync(outfd
) != 0)
197 pg_log_fatal("could not fsync file \"%s\": %m", outfile
);
205 * Start the log streaming
208 StreamLogicalLog(void)
211 char *copybuf
= NULL
;
212 TimestampTz last_status
= -1;
216 output_written_lsn
= InvalidXLogRecPtr
;
217 output_fsync_lsn
= InvalidXLogRecPtr
;
219 query
= createPQExpBuffer();
222 * Connect in replication mode to the server
225 conn
= GetConnection();
227 /* Error message already written in GetConnection() */
231 * Start the replication
234 pg_log_info("starting log streaming at %X/%X (slot %s)",
235 LSN_FORMAT_ARGS(startpos
),
238 /* Initiate the replication stream at specified location */
239 appendPQExpBuffer(query
, "START_REPLICATION SLOT \"%s\" LOGICAL %X/%X",
240 replication_slot
, LSN_FORMAT_ARGS(startpos
));
242 /* print options if there are any */
244 appendPQExpBufferStr(query
, " (");
246 for (i
= 0; i
< noptions
; i
++)
250 appendPQExpBufferStr(query
, ", ");
252 /* write option name */
253 appendPQExpBuffer(query
, "\"%s\"", options
[(i
* 2)]);
255 /* write option value if specified */
256 if (options
[(i
* 2) + 1] != NULL
)
257 appendPQExpBuffer(query
, " '%s'", options
[(i
* 2) + 1]);
261 appendPQExpBufferChar(query
, ')');
263 res
= PQexec(conn
, query
->data
);
264 if (PQresultStatus(res
) != PGRES_COPY_BOTH
)
266 pg_log_error("could not send replication command \"%s\": %s",
267 query
->data
, PQresultErrorMessage(res
));
272 resetPQExpBuffer(query
);
275 pg_log_info("streaming initiated");
277 while (!time_to_abort
)
284 XLogRecPtr cur_record_lsn
= InvalidXLogRecPtr
;
293 * Potentially send a status message to the primary.
295 now
= feGetCurrentTimestamp();
298 feTimestampDifferenceExceeds(output_last_fsync
, now
,
301 if (!OutputFsync(now
))
305 if (standby_message_timeout
> 0 &&
306 feTimestampDifferenceExceeds(last_status
, now
,
307 standby_message_timeout
))
309 /* Time to send feedback! */
310 if (!sendFeedback(conn
, now
, true, false))
316 /* got SIGHUP, close output file */
317 if (outfd
!= -1 && output_reopen
&& strcmp(outfile
, "-") != 0)
319 now
= feGetCurrentTimestamp();
320 if (!OutputFsync(now
))
325 output_reopen
= false;
327 /* open the output file, if not open yet */
332 if (strcmp(outfile
, "-") == 0)
333 outfd
= fileno(stdout
);
335 outfd
= open(outfile
, O_CREAT
| O_APPEND
| O_WRONLY
| PG_BINARY
,
339 pg_log_error("could not open log file \"%s\": %m", outfile
);
343 if (fstat(outfd
, &statbuf
) != 0)
345 pg_log_error("could not stat file \"%s\": %m", outfile
);
349 output_isfile
= S_ISREG(statbuf
.st_mode
) && !isatty(outfd
);
352 r
= PQgetCopyData(conn
, ©buf
, 1);
356 * In async mode, and no data available. We block on reading but
357 * not more than the specified timeout, so that we can send a
358 * response back to the client.
361 TimestampTz message_target
= 0;
362 TimestampTz fsync_target
= 0;
363 struct timeval timeout
;
364 struct timeval
*timeoutptr
= NULL
;
366 if (PQsocket(conn
) < 0)
368 pg_log_error("invalid socket: %s", PQerrorMessage(conn
));
372 FD_ZERO(&input_mask
);
373 FD_SET(PQsocket(conn
), &input_mask
);
375 /* Compute when we need to wakeup to send a keepalive message. */
376 if (standby_message_timeout
)
377 message_target
= last_status
+ (standby_message_timeout
- 1) *
380 /* Compute when we need to wakeup to fsync the output file. */
381 if (fsync_interval
> 0 && output_needs_fsync
)
382 fsync_target
= output_last_fsync
+ (fsync_interval
- 1) *
385 /* Now compute when to wakeup. */
386 if (message_target
> 0 || fsync_target
> 0)
388 TimestampTz targettime
;
392 targettime
= message_target
;
394 if (fsync_target
> 0 && fsync_target
< targettime
)
395 targettime
= fsync_target
;
397 feTimestampDifference(now
,
402 timeout
.tv_sec
= 1; /* Always sleep at least 1 sec */
404 timeout
.tv_sec
= secs
;
405 timeout
.tv_usec
= usecs
;
406 timeoutptr
= &timeout
;
409 r
= select(PQsocket(conn
) + 1, &input_mask
, NULL
, NULL
, timeoutptr
);
410 if (r
== 0 || (r
< 0 && errno
== EINTR
))
413 * Got a timeout or signal. Continue the loop and either
414 * deliver a status packet to the server or just go back into
421 pg_log_error("%s() failed: %m", "select");
425 /* Else there is actually data on the socket */
426 if (PQconsumeInput(conn
) == 0)
428 pg_log_error("could not receive data from WAL stream: %s",
429 PQerrorMessage(conn
));
435 /* End of copy stream */
439 /* Failure while reading the copy stream */
442 pg_log_error("could not read COPY data: %s",
443 PQerrorMessage(conn
));
447 /* Check the message type. */
448 if (copybuf
[0] == 'k')
453 bool endposReached
= false;
456 * Parse the keepalive message, enclosed in the CopyData message.
457 * We just check if the server requested a reply, and ignore the
460 pos
= 1; /* skip msgtype 'k' */
461 walEnd
= fe_recvint64(©buf
[pos
]);
462 output_written_lsn
= Max(walEnd
, output_written_lsn
);
464 pos
+= 8; /* read walEnd */
466 pos
+= 8; /* skip sendTime */
470 pg_log_error("streaming header too small: %d", r
);
473 replyRequested
= copybuf
[pos
];
475 if (endpos
!= InvalidXLogRecPtr
&& walEnd
>= endpos
)
478 * If there's nothing to read on the socket until a keepalive
479 * we know that the server has nothing to send us; and if
480 * walEnd has passed endpos, we know nothing else can have
481 * committed before endpos. So we can bail out now.
483 endposReached
= true;
486 /* Send a reply, if necessary */
487 if (replyRequested
|| endposReached
)
489 if (!flushAndSendFeedback(conn
, &now
))
496 prepareToTerminate(conn
, endpos
, true, InvalidXLogRecPtr
);
497 time_to_abort
= true;
503 else if (copybuf
[0] != 'w')
505 pg_log_error("unrecognized streaming header: \"%c\"",
511 * Read the header of the XLogData message, enclosed in the CopyData
512 * message. We only need the WAL location field (dataStart), the rest
513 * of the header is ignored.
515 hdr_len
= 1; /* msgtype 'w' */
516 hdr_len
+= 8; /* dataStart */
517 hdr_len
+= 8; /* walEnd */
518 hdr_len
+= 8; /* sendTime */
521 pg_log_error("streaming header too small: %d", r
);
525 /* Extract WAL location for this block */
526 cur_record_lsn
= fe_recvint64(©buf
[1]);
528 if (endpos
!= InvalidXLogRecPtr
&& cur_record_lsn
> endpos
)
531 * We've read past our endpoint, so prepare to go away being
532 * cautious about what happens to our output data.
534 if (!flushAndSendFeedback(conn
, &now
))
536 prepareToTerminate(conn
, endpos
, false, cur_record_lsn
);
537 time_to_abort
= true;
541 output_written_lsn
= Max(cur_record_lsn
, output_written_lsn
);
543 bytes_left
= r
- hdr_len
;
546 /* signal that a fsync is needed */
547 output_needs_fsync
= true;
554 copybuf
+ hdr_len
+ bytes_written
,
559 pg_log_error("could not write %u bytes to log file \"%s\": %m",
560 bytes_left
, outfile
);
564 /* Write was successful, advance our position */
565 bytes_written
+= ret
;
569 if (write(outfd
, "\n", 1) != 1)
571 pg_log_error("could not write %u bytes to log file \"%s\": %m",
576 if (endpos
!= InvalidXLogRecPtr
&& cur_record_lsn
== endpos
)
578 /* endpos was exactly the record we just processed, we're done */
579 if (!flushAndSendFeedback(conn
, &now
))
581 prepareToTerminate(conn
, endpos
, false, cur_record_lsn
);
582 time_to_abort
= true;
587 res
= PQgetResult(conn
);
588 if (PQresultStatus(res
) == PGRES_COPY_OUT
)
593 * We're doing a client-initiated clean exit and have sent CopyDone to
594 * the server. Drain any messages, so we don't miss a last-minute
595 * ErrorResponse. The walsender stops generating XLogData records once
596 * it sees CopyDone, so expect this to finish quickly. After CopyDone,
597 * it's too late for sendFeedback(), even if this were to take a long
598 * time. Hence, use synchronous-mode PQgetCopyData().
609 r
= PQgetCopyData(conn
, ©buf
, 0);
614 pg_log_error("could not read COPY data: %s",
615 PQerrorMessage(conn
));
616 time_to_abort
= false; /* unclean exit */
621 res
= PQgetResult(conn
);
623 if (PQresultStatus(res
) != PGRES_COMMAND_OK
)
625 pg_log_error("unexpected termination of replication stream: %s",
626 PQresultErrorMessage(res
));
631 if (outfd
!= -1 && strcmp(outfile
, "-") != 0)
633 TimestampTz t
= feGetCurrentTimestamp();
635 /* no need to jump to error on failure here, we're finishing anyway */
638 if (close(outfd
) != 0)
639 pg_log_error("could not close file \"%s\": %m", outfile
);
648 destroyPQExpBuffer(query
);
654 * Unfortunately we can't do sensible signal handling on windows...
659 * When sigint is called, just tell the system to exit at the next possible
663 sigint_handler(int signum
)
665 time_to_abort
= true;
669 * Trigger the output file to be reopened.
672 sighup_handler(int signum
)
674 output_reopen
= true;
680 main(int argc
, char **argv
)
682 static struct option long_options
[] = {
683 /* general options */
684 {"file", required_argument
, NULL
, 'f'},
685 {"fsync-interval", required_argument
, NULL
, 'F'},
686 {"no-loop", no_argument
, NULL
, 'n'},
687 {"verbose", no_argument
, NULL
, 'v'},
688 {"two-phase", no_argument
, NULL
, 't'},
689 {"version", no_argument
, NULL
, 'V'},
690 {"help", no_argument
, NULL
, '?'},
691 /* connection options */
692 {"dbname", required_argument
, NULL
, 'd'},
693 {"host", required_argument
, NULL
, 'h'},
694 {"port", required_argument
, NULL
, 'p'},
695 {"username", required_argument
, NULL
, 'U'},
696 {"no-password", no_argument
, NULL
, 'w'},
697 {"password", no_argument
, NULL
, 'W'},
698 /* replication options */
699 {"startpos", required_argument
, NULL
, 'I'},
700 {"endpos", required_argument
, NULL
, 'E'},
701 {"option", required_argument
, NULL
, 'o'},
702 {"plugin", required_argument
, NULL
, 'P'},
703 {"status-interval", required_argument
, NULL
, 's'},
704 {"slot", required_argument
, NULL
, 'S'},
706 {"create-slot", no_argument
, NULL
, 1},
707 {"start", no_argument
, NULL
, 2},
708 {"drop-slot", no_argument
, NULL
, 3},
709 {"if-not-exists", no_argument
, NULL
, 4},
718 pg_logging_init(argv
[0]);
719 progname
= get_progname(argv
[0]);
720 set_pglocale_pgservice(argv
[0], PG_TEXTDOMAIN("pg_basebackup"));
724 if (strcmp(argv
[1], "--help") == 0 || strcmp(argv
[1], "-?") == 0)
729 else if (strcmp(argv
[1], "-V") == 0 ||
730 strcmp(argv
[1], "--version") == 0)
732 puts("pg_recvlogical (PostgreSQL) " PG_VERSION
);
737 while ((c
= getopt_long(argc
, argv
, "E:f:F:nvtd:h:p:U:wWI:o:P:s:S:",
738 long_options
, &option_index
)) != -1)
742 /* general options */
744 outfile
= pg_strdup(optarg
);
747 if (!option_parse_int(optarg
, "-F/--fsync-interval", 0,
751 fsync_interval
*= 1000;
762 /* connection options */
764 dbname
= pg_strdup(optarg
);
767 dbhost
= pg_strdup(optarg
);
770 dbport
= pg_strdup(optarg
);
773 dbuser
= pg_strdup(optarg
);
781 /* replication options */
783 if (sscanf(optarg
, "%X/%X", &hi
, &lo
) != 2)
785 pg_log_error("could not parse start position \"%s\"", optarg
);
788 startpos
= ((uint64
) hi
) << 32 | lo
;
791 if (sscanf(optarg
, "%X/%X", &hi
, &lo
) != 2)
793 pg_log_error("could not parse end position \"%s\"", optarg
);
796 endpos
= ((uint64
) hi
) << 32 | lo
;
800 char *data
= pg_strdup(optarg
);
801 char *val
= strchr(data
, '=');
805 /* remove =; separate data from val */
811 options
= pg_realloc(options
, sizeof(char *) * noptions
* 2);
813 options
[(noptions
- 1) * 2] = data
;
814 options
[(noptions
- 1) * 2 + 1] = val
;
819 plugin
= pg_strdup(optarg
);
822 if (!option_parse_int(optarg
, "-s/--status-interval", 0,
824 &standby_message_timeout
))
826 standby_message_timeout
*= 1000;
829 replication_slot
= pg_strdup(optarg
);
833 do_create_slot
= true;
836 do_start_slot
= true;
842 slot_exists_ok
= true;
848 * getopt_long already emitted a complaint
850 fprintf(stderr
, _("Try \"%s --help\" for more information.\n"),
857 * Any non-option arguments?
861 pg_log_error("too many command-line arguments (first is \"%s\")",
863 fprintf(stderr
, _("Try \"%s --help\" for more information.\n"),
871 if (replication_slot
== NULL
)
873 pg_log_error("no slot specified");
874 fprintf(stderr
, _("Try \"%s --help\" for more information.\n"),
879 if (do_start_slot
&& outfile
== NULL
)
881 pg_log_error("no target file specified");
882 fprintf(stderr
, _("Try \"%s --help\" for more information.\n"),
887 if (!do_drop_slot
&& dbname
== NULL
)
889 pg_log_error("no database specified");
890 fprintf(stderr
, _("Try \"%s --help\" for more information.\n"),
895 if (!do_drop_slot
&& !do_create_slot
&& !do_start_slot
)
897 pg_log_error("at least one action needs to be specified");
898 fprintf(stderr
, _("Try \"%s --help\" for more information.\n"),
903 if (do_drop_slot
&& (do_create_slot
|| do_start_slot
))
905 pg_log_error("cannot use --create-slot or --start together with --drop-slot");
906 fprintf(stderr
, _("Try \"%s --help\" for more information.\n"),
911 if (startpos
!= InvalidXLogRecPtr
&& (do_create_slot
|| do_drop_slot
))
913 pg_log_error("cannot use --create-slot or --drop-slot together with --startpos");
914 fprintf(stderr
, _("Try \"%s --help\" for more information.\n"),
919 if (endpos
!= InvalidXLogRecPtr
&& !do_start_slot
)
921 pg_log_error("--endpos may only be specified with --start");
922 fprintf(stderr
, _("Try \"%s --help\" for more information.\n"),
927 if (two_phase
&& !do_create_slot
)
929 pg_log_error("--two-phase may only be specified with --create-slot");
930 fprintf(stderr
, _("Try \"%s --help\" for more information.\n"),
937 pqsignal(SIGINT
, sigint_handler
);
938 pqsignal(SIGHUP
, sighup_handler
);
942 * Obtain a connection to server. This is not really necessary but it
943 * helps to get more precise error messages about authentication, required
944 * GUC parameters and such.
946 conn
= GetConnection();
948 /* Error message already written in GetConnection() */
950 atexit(disconnect_atexit
);
953 * Run IDENTIFY_SYSTEM to make sure we connected using a database specific
954 * replication connection.
956 if (!RunIdentifySystem(conn
, NULL
, NULL
, NULL
, &db_name
))
961 pg_log_error("could not establish database-specific replication connection");
966 * Set umask so that directories/files are created with the same
967 * permissions as directories/files in the source data directory.
969 * pg_mode_mask is set to owner-only by default and then updated in
970 * GetConnection() where we get the mode from the server-side with
971 * RetrieveDataDirCreatePerm() and then call SetDataDirectoryCreatePerm().
975 /* Drop a replication slot. */
979 pg_log_info("dropping replication slot \"%s\"", replication_slot
);
981 if (!DropReplicationSlot(conn
, replication_slot
))
985 /* Create a replication slot. */
989 pg_log_info("creating replication slot \"%s\"", replication_slot
);
991 if (!CreateReplicationSlot(conn
, replication_slot
, plugin
, false,
992 false, false, slot_exists_ok
, two_phase
))
994 startpos
= InvalidXLogRecPtr
;
1007 * We've been Ctrl-C'ed or reached an exit limit condition. That's
1008 * not an error, so exit without an errorcode.
1014 pg_log_error("disconnected");
1019 /* translator: check source for value for %d */
1020 pg_log_info("disconnected; waiting %d seconds to try again",
1021 RECONNECT_SLEEP_TIME
);
1022 pg_usleep(RECONNECT_SLEEP_TIME
* 1000000);
1028 * Fsync our output data, and send a feedback message to the server. Returns
1029 * true if successful, false otherwise.
1031 * If successful, *now is updated to the current timestamp just before sending
1035 flushAndSendFeedback(PGconn
*conn
, TimestampTz
*now
)
1037 /* flush data to disk, so that we send a recent flush pointer */
1038 if (!OutputFsync(*now
))
1040 *now
= feGetCurrentTimestamp();
1041 if (!sendFeedback(conn
, *now
, true, false))
1048 * Try to inform the server about our upcoming demise, but don't wait around or
1052 prepareToTerminate(PGconn
*conn
, XLogRecPtr endpos
, bool keepalive
, XLogRecPtr lsn
)
1054 (void) PQputCopyEnd(conn
, NULL
);
1055 (void) PQflush(conn
);
1060 pg_log_info("end position %X/%X reached by keepalive",
1061 LSN_FORMAT_ARGS(endpos
));
1063 pg_log_info("end position %X/%X reached by WAL record at %X/%X",
1064 LSN_FORMAT_ARGS(endpos
), LSN_FORMAT_ARGS(lsn
));