Consistently use "superuser" instead of "super user"
[pgsql.git] / src / bin / pg_basebackup / receivelog.c
blob9601fd8d9cf9e5145df1cf54d9555734b1104b6a
1 /*-------------------------------------------------------------------------
3 * receivelog.c - receive WAL files using the streaming
4 * replication protocol.
6 * Author: Magnus Hagander <magnus@hagander.net>
8 * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
10 * IDENTIFICATION
11 * src/bin/pg_basebackup/receivelog.c
12 *-------------------------------------------------------------------------
15 #include "postgres_fe.h"
17 #include <sys/stat.h>
18 #include <unistd.h>
19 #ifdef HAVE_SYS_SELECT_H
20 #include <sys/select.h>
21 #endif
23 #include "access/xlog_internal.h"
24 #include "common/file_utils.h"
25 #include "common/logging.h"
26 #include "libpq-fe.h"
27 #include "receivelog.h"
28 #include "streamutil.h"
/* fd and filename for currently open WAL file */
static Walfile *walfile = NULL;
static char current_walfile_name[MAXPGPATH] = "";
/* whether sendFeedback() reports lastFlushPosition as the flush location */
static bool reportFlushPosition = false;
/*
 * WAL position reported as flushed; advanced after an explicit sync of the
 * open WAL file and whenever close_walfile() closes a file.
 */
static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;

static bool still_sending = true;	/* feedback still needs to be sent? */

static PGresult *HandleCopyStream(PGconn *conn, StreamCtl *stream,
								  XLogRecPtr *stoppos);
static int	CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket);
static int	CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
							  char **buffer);
static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf,
								int len, XLogRecPtr blockpos, TimestampTz *last_status);
static bool ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
							   XLogRecPtr *blockpos);
static PGresult *HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf,
									   XLogRecPtr blockpos, XLogRecPtr *stoppos);
static bool CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos);
static long CalculateCopyStreamSleeptime(TimestampTz now, int standby_message_timeout,
										 TimestampTz last_status);

static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
									 uint32 *timeline);
56 static bool
57 mark_file_as_archived(StreamCtl *stream, const char *fname)
59 Walfile *f;
60 static char tmppath[MAXPGPATH];
62 snprintf(tmppath, sizeof(tmppath), "archive_status/%s.done",
63 fname);
65 f = stream->walmethod->open_for_write(tmppath, NULL, 0);
66 if (f == NULL)
68 pg_log_error("could not create archive status file \"%s\": %s",
69 tmppath, stream->walmethod->getlasterror());
70 return false;
73 stream->walmethod->close(f, CLOSE_NORMAL);
75 return true;
79 * Open a new WAL file in the specified directory.
81 * Returns true if OK; on failure, returns false after printing an error msg.
82 * On success, 'walfile' is set to the FD for the file, and the base filename
83 * (without partial_suffix) is stored in 'current_walfile_name'.
85 * The file will be padded to 16Mb with zeroes.
87 static bool
88 open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
90 Walfile *f;
91 char *fn;
92 ssize_t size;
93 XLogSegNo segno;
95 XLByteToSeg(startpoint, segno, WalSegSz);
96 XLogFileName(current_walfile_name, stream->timeline, segno, WalSegSz);
98 /* Note that this considers the compression used if necessary */
99 fn = stream->walmethod->get_file_name(current_walfile_name,
100 stream->partial_suffix);
103 * When streaming to files, if an existing file exists we verify that it's
104 * either empty (just created), or a complete WalSegSz segment (in which
105 * case it has been created and padded). Anything else indicates a corrupt
106 * file. Compressed files have no need for padding, so just ignore this
107 * case.
109 * When streaming to tar, no file with this name will exist before, so we
110 * never have to verify a size.
112 if (stream->walmethod->compression() == 0 &&
113 stream->walmethod->existsfile(fn))
115 size = stream->walmethod->get_file_size(fn);
116 if (size < 0)
118 pg_log_error("could not get size of write-ahead log file \"%s\": %s",
119 fn, stream->walmethod->getlasterror());
120 pg_free(fn);
121 return false;
123 if (size == WalSegSz)
125 /* Already padded file. Open it for use */
126 f = stream->walmethod->open_for_write(current_walfile_name, stream->partial_suffix, 0);
127 if (f == NULL)
129 pg_log_error("could not open existing write-ahead log file \"%s\": %s",
130 fn, stream->walmethod->getlasterror());
131 pg_free(fn);
132 return false;
135 /* fsync file in case of a previous crash */
136 if (stream->walmethod->sync(f) != 0)
138 pg_log_fatal("could not fsync existing write-ahead log file \"%s\": %s",
139 fn, stream->walmethod->getlasterror());
140 stream->walmethod->close(f, CLOSE_UNLINK);
141 exit(1);
144 walfile = f;
145 pg_free(fn);
146 return true;
148 if (size != 0)
150 /* if write didn't set errno, assume problem is no disk space */
151 if (errno == 0)
152 errno = ENOSPC;
153 pg_log_error(ngettext("write-ahead log file \"%s\" has %zd byte, should be 0 or %d",
154 "write-ahead log file \"%s\" has %zd bytes, should be 0 or %d",
155 size),
156 fn, size, WalSegSz);
157 pg_free(fn);
158 return false;
160 /* File existed and was empty, so fall through and open */
163 /* No file existed, so create one */
165 f = stream->walmethod->open_for_write(current_walfile_name,
166 stream->partial_suffix, WalSegSz);
167 if (f == NULL)
169 pg_log_error("could not open write-ahead log file \"%s\": %s",
170 fn, stream->walmethod->getlasterror());
171 pg_free(fn);
172 return false;
175 pg_free(fn);
176 walfile = f;
177 return true;
181 * Close the current WAL file (if open), and rename it to the correct
182 * filename if it's complete. On failure, prints an error message to stderr
183 * and returns false, otherwise returns true.
185 static bool
186 close_walfile(StreamCtl *stream, XLogRecPtr pos)
188 off_t currpos;
189 int r;
191 if (walfile == NULL)
192 return true;
194 currpos = stream->walmethod->get_current_pos(walfile);
195 if (currpos == -1)
197 pg_log_error("could not determine seek position in file \"%s\": %s",
198 current_walfile_name, stream->walmethod->getlasterror());
199 stream->walmethod->close(walfile, CLOSE_UNLINK);
200 walfile = NULL;
202 return false;
205 if (stream->partial_suffix)
207 if (currpos == WalSegSz)
208 r = stream->walmethod->close(walfile, CLOSE_NORMAL);
209 else
211 pg_log_info("not renaming \"%s%s\", segment is not complete",
212 current_walfile_name, stream->partial_suffix);
213 r = stream->walmethod->close(walfile, CLOSE_NO_RENAME);
216 else
217 r = stream->walmethod->close(walfile, CLOSE_NORMAL);
219 walfile = NULL;
221 if (r != 0)
223 pg_log_error("could not close file \"%s\": %s",
224 current_walfile_name, stream->walmethod->getlasterror());
225 return false;
229 * Mark file as archived if requested by the caller - pg_basebackup needs
230 * to do so as files can otherwise get archived again after promotion of a
231 * new node. This is in line with walreceiver.c always doing a
232 * XLogArchiveForceDone() after a complete segment.
234 if (currpos == WalSegSz && stream->mark_done)
236 /* writes error message if failed */
237 if (!mark_file_as_archived(stream, current_walfile_name))
238 return false;
241 lastFlushPosition = pos;
242 return true;
247 * Check if a timeline history file exists.
249 static bool
250 existsTimeLineHistoryFile(StreamCtl *stream)
252 char histfname[MAXFNAMELEN];
255 * Timeline 1 never has a history file. We treat that as if it existed,
256 * since we never need to stream it.
258 if (stream->timeline == 1)
259 return true;
261 TLHistoryFileName(histfname, stream->timeline);
263 return stream->walmethod->existsfile(histfname);
266 static bool
267 writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
269 int size = strlen(content);
270 char histfname[MAXFNAMELEN];
271 Walfile *f;
274 * Check that the server's idea of how timeline history files should be
275 * named matches ours.
277 TLHistoryFileName(histfname, stream->timeline);
278 if (strcmp(histfname, filename) != 0)
280 pg_log_error("server reported unexpected history file name for timeline %u: %s",
281 stream->timeline, filename);
282 return false;
285 f = stream->walmethod->open_for_write(histfname, ".tmp", 0);
286 if (f == NULL)
288 pg_log_error("could not create timeline history file \"%s\": %s",
289 histfname, stream->walmethod->getlasterror());
290 return false;
293 if ((int) stream->walmethod->write(f, content, size) != size)
295 pg_log_error("could not write timeline history file \"%s\": %s",
296 histfname, stream->walmethod->getlasterror());
299 * If we fail to make the file, delete it to release disk space
301 stream->walmethod->close(f, CLOSE_UNLINK);
303 return false;
306 if (stream->walmethod->close(f, CLOSE_NORMAL) != 0)
308 pg_log_error("could not close file \"%s\": %s",
309 histfname, stream->walmethod->getlasterror());
310 return false;
313 /* Maintain archive_status, check close_walfile() for details. */
314 if (stream->mark_done)
316 /* writes error message if failed */
317 if (!mark_file_as_archived(stream, histfname))
318 return false;
321 return true;
325 * Send a Standby Status Update message to server.
327 static bool
328 sendFeedback(PGconn *conn, XLogRecPtr blockpos, TimestampTz now, bool replyRequested)
330 char replybuf[1 + 8 + 8 + 8 + 8 + 1];
331 int len = 0;
333 replybuf[len] = 'r';
334 len += 1;
335 fe_sendint64(blockpos, &replybuf[len]); /* write */
336 len += 8;
337 if (reportFlushPosition)
338 fe_sendint64(lastFlushPosition, &replybuf[len]); /* flush */
339 else
340 fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* flush */
341 len += 8;
342 fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */
343 len += 8;
344 fe_sendint64(now, &replybuf[len]); /* sendTime */
345 len += 8;
346 replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
347 len += 1;
349 if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
351 pg_log_error("could not send feedback packet: %s",
352 PQerrorMessage(conn));
353 return false;
356 return true;
360 * Check that the server version we're connected to is supported by
361 * ReceiveXlogStream().
363 * If it's not, an error message is printed to stderr, and false is returned.
365 bool
366 CheckServerVersionForStreaming(PGconn *conn)
368 int minServerMajor,
369 maxServerMajor;
370 int serverMajor;
373 * The message format used in streaming replication changed in 9.3, so we
374 * cannot stream from older servers. And we don't support servers newer
375 * than the client; it might work, but we don't know, so err on the safe
376 * side.
378 minServerMajor = 903;
379 maxServerMajor = PG_VERSION_NUM / 100;
380 serverMajor = PQserverVersion(conn) / 100;
381 if (serverMajor < minServerMajor)
383 const char *serverver = PQparameterStatus(conn, "server_version");
385 pg_log_error("incompatible server version %s; client does not support streaming from server versions older than %s",
386 serverver ? serverver : "'unknown'",
387 "9.3");
388 return false;
390 else if (serverMajor > maxServerMajor)
392 const char *serverver = PQparameterStatus(conn, "server_version");
394 pg_log_error("incompatible server version %s; client does not support streaming from server versions newer than %s",
395 serverver ? serverver : "'unknown'",
396 PG_VERSION);
397 return false;
399 return true;
403 * Receive a log stream starting at the specified position.
405 * Individual parameters are passed through the StreamCtl structure.
407 * If sysidentifier is specified, validate that both the system
408 * identifier and the timeline matches the specified ones
409 * (by sending an extra IDENTIFY_SYSTEM command)
411 * All received segments will be written to the directory
412 * specified by basedir. This will also fetch any missing timeline history
413 * files.
415 * The stream_stop callback will be called every time data
416 * is received, and whenever a segment is completed. If it returns
417 * true, the streaming will stop and the function
418 * return. As long as it returns false, streaming will continue
419 * indefinitely.
421 * If stream_stop() checks for external input, stop_socket should be set to
422 * the FD it checks. This will allow such input to be detected promptly
423 * rather than after standby_message_timeout (which might be indefinite).
424 * Note that signals will interrupt waits for input as well, but that is
425 * race-y since a signal received while busy won't interrupt the wait.
427 * standby_message_timeout controls how often we send a message
428 * back to the primary letting it know our progress, in milliseconds.
429 * Zero means no messages are sent.
430 * This message will only contain the write location, and never
431 * flush or replay.
433 * If 'partial_suffix' is not NULL, files are initially created with the
434 * given suffix, and the suffix is removed once the file is finished. That
435 * allows you to tell the difference between partial and completed files,
436 * so that you can continue later where you left.
438 * If 'synchronous' is true, the received WAL is flushed as soon as written,
439 * otherwise only when the WAL file is closed.
441 * Note: The WAL location *must* be at a log segment start!
443 bool
444 ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
446 char query[128];
447 char slotcmd[128];
448 PGresult *res;
449 XLogRecPtr stoppos;
452 * The caller should've checked the server version already, but doesn't do
453 * any harm to check it here too.
455 if (!CheckServerVersionForStreaming(conn))
456 return false;
459 * Decide whether we want to report the flush position. If we report the
460 * flush position, the primary will know what WAL we'll possibly
461 * re-request, and it can then remove older WAL safely. We must always do
462 * that when we are using slots.
464 * Reporting the flush position makes one eligible as a synchronous
465 * replica. People shouldn't include generic names in
466 * synchronous_standby_names, but we've protected them against it so far,
467 * so let's continue to do so unless specifically requested.
469 if (stream->replication_slot != NULL)
471 reportFlushPosition = true;
472 sprintf(slotcmd, "SLOT \"%s\" ", stream->replication_slot);
474 else
476 if (stream->synchronous)
477 reportFlushPosition = true;
478 else
479 reportFlushPosition = false;
480 slotcmd[0] = 0;
483 if (stream->sysidentifier != NULL)
485 char *sysidentifier = NULL;
486 TimeLineID servertli;
489 * Get the server system identifier and timeline, and validate them.
491 if (!RunIdentifySystem(conn, &sysidentifier, &servertli, NULL, NULL))
493 pg_free(sysidentifier);
494 return false;
497 if (strcmp(stream->sysidentifier, sysidentifier) != 0)
499 pg_log_error("system identifier does not match between base backup and streaming connection");
500 pg_free(sysidentifier);
501 return false;
503 pg_free(sysidentifier);
505 if (stream->timeline > servertli)
507 pg_log_error("starting timeline %u is not present in the server",
508 stream->timeline);
509 return false;
514 * initialize flush position to starting point, it's the caller's
515 * responsibility that that's sane.
517 lastFlushPosition = stream->startpos;
519 while (1)
522 * Fetch the timeline history file for this timeline, if we don't have
523 * it already. When streaming log to tar, this will always return
524 * false, as we are never streaming into an existing file and
525 * therefore there can be no pre-existing timeline history file.
527 if (!existsTimeLineHistoryFile(stream))
529 snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", stream->timeline);
530 res = PQexec(conn, query);
531 if (PQresultStatus(res) != PGRES_TUPLES_OK)
533 /* FIXME: we might send it ok, but get an error */
534 pg_log_error("could not send replication command \"%s\": %s",
535 "TIMELINE_HISTORY", PQresultErrorMessage(res));
536 PQclear(res);
537 return false;
541 * The response to TIMELINE_HISTORY is a single row result set
542 * with two fields: filename and content
544 if (PQnfields(res) != 2 || PQntuples(res) != 1)
546 pg_log_warning("unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields",
547 PQntuples(res), PQnfields(res), 1, 2);
550 /* Write the history file to disk */
551 writeTimeLineHistoryFile(stream,
552 PQgetvalue(res, 0, 0),
553 PQgetvalue(res, 0, 1));
555 PQclear(res);
559 * Before we start streaming from the requested location, check if the
560 * callback tells us to stop here.
562 if (stream->stream_stop(stream->startpos, stream->timeline, false))
563 return true;
565 /* Initiate the replication stream at specified location */
566 snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
567 slotcmd,
568 LSN_FORMAT_ARGS(stream->startpos),
569 stream->timeline);
570 res = PQexec(conn, query);
571 if (PQresultStatus(res) != PGRES_COPY_BOTH)
573 pg_log_error("could not send replication command \"%s\": %s",
574 "START_REPLICATION", PQresultErrorMessage(res));
575 PQclear(res);
576 return false;
578 PQclear(res);
580 /* Stream the WAL */
581 res = HandleCopyStream(conn, stream, &stoppos);
582 if (res == NULL)
583 goto error;
586 * Streaming finished.
588 * There are two possible reasons for that: a controlled shutdown, or
589 * we reached the end of the current timeline. In case of
590 * end-of-timeline, the server sends a result set after Copy has
591 * finished, containing information about the next timeline. Read
592 * that, and restart streaming from the next timeline. In case of
593 * controlled shutdown, stop here.
595 if (PQresultStatus(res) == PGRES_TUPLES_OK)
598 * End-of-timeline. Read the next timeline's ID and starting
599 * position. Usually, the starting position will match the end of
600 * the previous timeline, but there are corner cases like if the
601 * server had sent us half of a WAL record, when it was promoted.
602 * The new timeline will begin at the end of the last complete
603 * record in that case, overlapping the partial WAL record on the
604 * old timeline.
606 uint32 newtimeline;
607 bool parsed;
609 parsed = ReadEndOfStreamingResult(res, &stream->startpos, &newtimeline);
610 PQclear(res);
611 if (!parsed)
612 goto error;
614 /* Sanity check the values the server gave us */
615 if (newtimeline <= stream->timeline)
617 pg_log_error("server reported unexpected next timeline %u, following timeline %u",
618 newtimeline, stream->timeline);
619 goto error;
621 if (stream->startpos > stoppos)
623 pg_log_error("server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X",
624 stream->timeline, LSN_FORMAT_ARGS(stoppos),
625 newtimeline, LSN_FORMAT_ARGS(stream->startpos));
626 goto error;
629 /* Read the final result, which should be CommandComplete. */
630 res = PQgetResult(conn);
631 if (PQresultStatus(res) != PGRES_COMMAND_OK)
633 pg_log_error("unexpected termination of replication stream: %s",
634 PQresultErrorMessage(res));
635 PQclear(res);
636 goto error;
638 PQclear(res);
641 * Loop back to start streaming from the new timeline. Always
642 * start streaming at the beginning of a segment.
644 stream->timeline = newtimeline;
645 stream->startpos = stream->startpos -
646 XLogSegmentOffset(stream->startpos, WalSegSz);
647 continue;
649 else if (PQresultStatus(res) == PGRES_COMMAND_OK)
651 PQclear(res);
654 * End of replication (ie. controlled shut down of the server).
656 * Check if the callback thinks it's OK to stop here. If not,
657 * complain.
659 if (stream->stream_stop(stoppos, stream->timeline, false))
660 return true;
661 else
663 pg_log_error("replication stream was terminated before stop point");
664 goto error;
667 else
669 /* Server returned an error. */
670 pg_log_error("unexpected termination of replication stream: %s",
671 PQresultErrorMessage(res));
672 PQclear(res);
673 goto error;
677 error:
678 if (walfile != NULL && stream->walmethod->close(walfile, CLOSE_NO_RENAME) != 0)
679 pg_log_error("could not close file \"%s\": %s",
680 current_walfile_name, stream->walmethod->getlasterror());
681 walfile = NULL;
682 return false;
686 * Helper function to parse the result set returned by server after streaming
687 * has finished. On failure, prints an error to stderr and returns false.
689 static bool
690 ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
692 uint32 startpos_xlogid,
693 startpos_xrecoff;
695 /*----------
696 * The result set consists of one row and two columns, e.g:
698 * next_tli | next_tli_startpos
699 * ----------+-------------------
700 * 4 | 0/9949AE0
702 * next_tli is the timeline ID of the next timeline after the one that
703 * just finished streaming. next_tli_startpos is the WAL location where
704 * the server switched to it.
705 *----------
707 if (PQnfields(res) < 2 || PQntuples(res) != 1)
709 pg_log_error("unexpected result set after end-of-timeline: got %d rows and %d fields, expected %d rows and %d fields",
710 PQntuples(res), PQnfields(res), 1, 2);
711 return false;
714 *timeline = atoi(PQgetvalue(res, 0, 0));
715 if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &startpos_xlogid,
716 &startpos_xrecoff) != 2)
718 pg_log_error("could not parse next timeline's starting point \"%s\"",
719 PQgetvalue(res, 0, 1));
720 return false;
722 *startpos = ((uint64) startpos_xlogid << 32) | startpos_xrecoff;
724 return true;
/*
 * The main loop of ReceiveXlogStream. Handles the COPY stream after
 * initiating streaming with the START_REPLICATION command.
 *
 * Each iteration: checks the stop callback, syncs and reports progress when
 * 'synchronous' is set, sends periodic status messages, then waits (up to
 * the computed sleep time) for CopyData and processes every message that can
 * be read without further blocking.
 *
 * If the COPY ends (not necessarily successfully) due a message from the
 * server, returns a PGresult and sets *stoppos to the last byte written.
 * On any other sort of error, returns NULL.
 */
static PGresult *
HandleCopyStream(PGconn *conn, StreamCtl *stream,
				 XLogRecPtr *stoppos)
{
	char	   *copybuf = NULL;	/* last message; owned by CopyStreamReceive */
	TimestampTz last_status = -1;	/* time of the last feedback we sent */
	XLogRecPtr	blockpos = stream->startpos;	/* next byte to be written */

	still_sending = true;

	while (1)
	{
		int			r;
		TimestampTz now;
		long		sleeptime;

		/*
		 * Check if we should continue streaming, or abort at this point.
		 */
		if (!CheckCopyStreamStop(conn, stream, blockpos))
			goto error;

		now = feGetCurrentTimestamp();

		/*
		 * If synchronous option is true, issue sync command as soon as there
		 * are WAL data which has not been flushed yet.
		 */
		if (stream->synchronous && lastFlushPosition < blockpos && walfile != NULL)
		{
			if (stream->walmethod->sync(walfile) != 0)
			{
				pg_log_fatal("could not fsync file \"%s\": %s",
							 current_walfile_name, stream->walmethod->getlasterror());
				exit(1);
			}
			lastFlushPosition = blockpos;

			/*
			 * Send feedback so that the server sees the latest WAL locations
			 * immediately.
			 */
			if (!sendFeedback(conn, blockpos, now, false))
				goto error;
			last_status = now;
		}

		/*
		 * Potentially send a status message to the primary
		 */
		if (still_sending && stream->standby_message_timeout > 0 &&
			feTimestampDifferenceExceeds(last_status, now,
										 stream->standby_message_timeout))
		{
			/* Time to send feedback! */
			if (!sendFeedback(conn, blockpos, now, false))
				goto error;
			last_status = now;
		}

		/*
		 * Calculate how long send/receive loops should sleep
		 */
		sleeptime = CalculateCopyStreamSleeptime(now, stream->standby_message_timeout,
												 last_status);

		/*
		 * r > 0: message length in copybuf; 0: nothing available; -1: error;
		 * -2: server ended the COPY (see CopyStreamReceive).
		 */
		r = CopyStreamReceive(conn, sleeptime, stream->stop_socket, &copybuf);
		while (r != 0)
		{
			if (r == -1)
				goto error;
			if (r == -2)
			{
				PGresult   *res = HandleEndOfCopyStream(conn, stream, copybuf, blockpos, stoppos);

				if (res == NULL)
					goto error;
				else
					return res;
			}

			/* Check the message type. */
			if (copybuf[0] == 'k')
			{
				if (!ProcessKeepaliveMsg(conn, stream, copybuf, r, blockpos,
										 &last_status))
					goto error;
			}
			else if (copybuf[0] == 'w')
			{
				if (!ProcessXLogDataMsg(conn, stream, copybuf, r, &blockpos))
					goto error;

				/*
				 * Check if we should continue streaming, or abort at this
				 * point.
				 */
				if (!CheckCopyStreamStop(conn, stream, blockpos))
					goto error;
			}
			else
			{
				pg_log_error("unrecognized streaming header: \"%c\"",
							 copybuf[0]);
				goto error;
			}

			/*
			 * Process the received data, and any subsequent data we can read
			 * without blocking.
			 */
			r = CopyStreamReceive(conn, 0, stream->stop_socket, &copybuf);
		}
	}

error:
	if (copybuf != NULL)
		PQfreemem(copybuf);
	return NULL;
}
857 * Wait until we can read a CopyData message,
858 * or timeout, or occurrence of a signal or input on the stop_socket.
859 * (timeout_ms < 0 means wait indefinitely; 0 means don't wait.)
861 * Returns 1 if data has become available for reading, 0 if timed out
862 * or interrupted by signal or stop_socket input, and -1 on an error.
864 static int
865 CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
867 int ret;
868 fd_set input_mask;
869 int connsocket;
870 int maxfd;
871 struct timeval timeout;
872 struct timeval *timeoutptr;
874 connsocket = PQsocket(conn);
875 if (connsocket < 0)
877 pg_log_error("invalid socket: %s", PQerrorMessage(conn));
878 return -1;
881 FD_ZERO(&input_mask);
882 FD_SET(connsocket, &input_mask);
883 maxfd = connsocket;
884 if (stop_socket != PGINVALID_SOCKET)
886 FD_SET(stop_socket, &input_mask);
887 maxfd = Max(maxfd, stop_socket);
890 if (timeout_ms < 0)
891 timeoutptr = NULL;
892 else
894 timeout.tv_sec = timeout_ms / 1000L;
895 timeout.tv_usec = (timeout_ms % 1000L) * 1000L;
896 timeoutptr = &timeout;
899 ret = select(maxfd + 1, &input_mask, NULL, NULL, timeoutptr);
901 if (ret < 0)
903 if (errno == EINTR)
904 return 0; /* Got a signal, so not an error */
905 pg_log_error("%s() failed: %m", "select");
906 return -1;
908 if (ret > 0 && FD_ISSET(connsocket, &input_mask))
909 return 1; /* Got input on connection socket */
911 return 0; /* Got timeout or input on stop_socket */
915 * Receive CopyData message available from XLOG stream, blocking for
916 * maximum of 'timeout' ms.
918 * If data was received, returns the length of the data. *buffer is set to
919 * point to a buffer holding the received message. The buffer is only valid
920 * until the next CopyStreamReceive call.
922 * Returns 0 if no data was available within timeout, or if wait was
923 * interrupted by signal or stop_socket input.
924 * -1 on error. -2 if the server ended the COPY.
926 static int
927 CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
928 char **buffer)
930 char *copybuf = NULL;
931 int rawlen;
933 if (*buffer != NULL)
934 PQfreemem(*buffer);
935 *buffer = NULL;
937 /* Try to receive a CopyData message */
938 rawlen = PQgetCopyData(conn, &copybuf, 1);
939 if (rawlen == 0)
941 int ret;
944 * No data available. Wait for some to appear, but not longer than
945 * the specified timeout, so that we can ping the server. Also stop
946 * waiting if input appears on stop_socket.
948 ret = CopyStreamPoll(conn, timeout, stop_socket);
949 if (ret <= 0)
950 return ret;
952 /* Now there is actually data on the socket */
953 if (PQconsumeInput(conn) == 0)
955 pg_log_error("could not receive data from WAL stream: %s",
956 PQerrorMessage(conn));
957 return -1;
960 /* Now that we've consumed some input, try again */
961 rawlen = PQgetCopyData(conn, &copybuf, 1);
962 if (rawlen == 0)
963 return 0;
965 if (rawlen == -1) /* end-of-streaming or error */
966 return -2;
967 if (rawlen == -2)
969 pg_log_error("could not read COPY data: %s", PQerrorMessage(conn));
970 return -1;
973 /* Return received messages to caller */
974 *buffer = copybuf;
975 return rawlen;
979 * Process the keepalive message.
981 static bool
982 ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
983 XLogRecPtr blockpos, TimestampTz *last_status)
985 int pos;
986 bool replyRequested;
987 TimestampTz now;
990 * Parse the keepalive message, enclosed in the CopyData message. We just
991 * check if the server requested a reply, and ignore the rest.
993 pos = 1; /* skip msgtype 'k' */
994 pos += 8; /* skip walEnd */
995 pos += 8; /* skip sendTime */
997 if (len < pos + 1)
999 pg_log_error("streaming header too small: %d", len);
1000 return false;
1002 replyRequested = copybuf[pos];
1004 /* If the server requested an immediate reply, send one. */
1005 if (replyRequested && still_sending)
1007 if (reportFlushPosition && lastFlushPosition < blockpos &&
1008 walfile != NULL)
1011 * If a valid flush location needs to be reported, flush the
1012 * current WAL file so that the latest flush location is sent back
1013 * to the server. This is necessary to see whether the last WAL
1014 * data has been successfully replicated or not, at the normal
1015 * shutdown of the server.
1017 if (stream->walmethod->sync(walfile) != 0)
1019 pg_log_fatal("could not fsync file \"%s\": %s",
1020 current_walfile_name, stream->walmethod->getlasterror());
1021 exit(1);
1023 lastFlushPosition = blockpos;
1026 now = feGetCurrentTimestamp();
1027 if (!sendFeedback(conn, blockpos, now, false))
1028 return false;
1029 *last_status = now;
1032 return true;
/*
 * Process XLogData message.
 *
 * copybuf holds a 'w' message of 'len' bytes: a 25-byte header (msgtype,
 * dataStart, walEnd, sendTime) followed by WAL payload. *blockpos is set to
 * the message's dataStart and advanced past every byte written. The payload
 * is written to the current WAL file, opening one when none is open and
 * closing it whenever a segment boundary is reached.
 *
 * Returns false (after logging) on error. Returns true otherwise, including
 * when the message is ignored because still_sending is false, or when the
 * stop callback ends streaming at a segment boundary (the rest of the payload
 * is then discarded).
 */
static bool
ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
				   XLogRecPtr *blockpos)
{
	int			xlogoff;		/* offset of *blockpos within its segment */
	int			bytes_left;		/* payload bytes not yet written */
	int			bytes_written;	/* payload bytes already written */
	int			hdr_len;

	/*
	 * Once we've decided we don't want to receive any more, just ignore any
	 * subsequent XLogData messages.
	 */
	if (!(still_sending))
		return true;

	/*
	 * Read the header of the XLogData message, enclosed in the CopyData
	 * message. We only need the WAL location field (dataStart), the rest of
	 * the header is ignored.
	 */
	hdr_len = 1;				/* msgtype 'w' */
	hdr_len += 8;				/* dataStart */
	hdr_len += 8;				/* walEnd */
	hdr_len += 8;				/* sendTime */
	if (len < hdr_len)
	{
		pg_log_error("streaming header too small: %d", len);
		return false;
	}
	*blockpos = fe_recvint64(&copybuf[1]);

	/* Extract WAL location for this block */
	xlogoff = XLogSegmentOffset(*blockpos, WalSegSz);

	/*
	 * Verify that the initial location in the stream matches where we think
	 * we are.
	 */
	if (walfile == NULL)
	{
		/* No file open yet */
		if (xlogoff != 0)
		{
			pg_log_error("received write-ahead log record for offset %u with no file open",
						 xlogoff);
			return false;
		}
	}
	else
	{
		/* More data in existing segment */
		if (stream->walmethod->get_current_pos(walfile) != xlogoff)
		{
			pg_log_error("got WAL data offset %08x, expected %08x",
						 xlogoff, (int) stream->walmethod->get_current_pos(walfile));
			return false;
		}
	}

	bytes_left = len - hdr_len;
	bytes_written = 0;

	while (bytes_left)
	{
		int			bytes_to_write;

		/*
		 * If crossing a WAL boundary, only write up until we reach wal
		 * segment size.
		 */
		if (xlogoff + bytes_left > WalSegSz)
			bytes_to_write = WalSegSz - xlogoff;
		else
			bytes_to_write = bytes_left;

		if (walfile == NULL)
		{
			if (!open_walfile(stream, *blockpos))
			{
				/* Error logged by open_walfile */
				return false;
			}
		}

		if (stream->walmethod->write(walfile, copybuf + hdr_len + bytes_written,
									 bytes_to_write) != bytes_to_write)
		{
			pg_log_error("could not write %u bytes to WAL file \"%s\": %s",
						 bytes_to_write, current_walfile_name,
						 stream->walmethod->getlasterror());
			return false;
		}

		/* Write was successful, advance our position */
		bytes_written += bytes_to_write;
		bytes_left -= bytes_to_write;
		*blockpos += bytes_to_write;
		xlogoff += bytes_to_write;

		/* Did we reach the end of a WAL segment? */
		if (XLogSegmentOffset(*blockpos, WalSegSz) == 0)
		{
			if (!close_walfile(stream, *blockpos))
				/* Error message written in close_walfile() */
				return false;

			xlogoff = 0;

			/*
			 * At each completed segment, ask the stop callback (with its
			 * segment-finished argument set) whether to end streaming.
			 */
			if (still_sending && stream->stream_stop(*blockpos, stream->timeline, true))
			{
				if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
				{
					pg_log_error("could not send copy-end packet: %s",
								 PQerrorMessage(conn));
					return false;
				}
				still_sending = false;
				return true;	/* ignore the rest of this XLogData packet */
			}
		}
	}
	/* No more data left to write, receive next copy packet */

	return true;
}
1166 * Handle end of the copy stream.
1168 static PGresult *
1169 HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf,
1170 XLogRecPtr blockpos, XLogRecPtr *stoppos)
1172 PGresult *res = PQgetResult(conn);
1175 * The server closed its end of the copy stream. If we haven't closed
1176 * ours already, we need to do so now, unless the server threw an error,
1177 * in which case we don't.
1179 if (still_sending)
1181 if (!close_walfile(stream, blockpos))
1183 /* Error message written in close_walfile() */
1184 PQclear(res);
1185 return NULL;
1187 if (PQresultStatus(res) == PGRES_COPY_IN)
1189 if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1191 pg_log_error("could not send copy-end packet: %s",
1192 PQerrorMessage(conn));
1193 PQclear(res);
1194 return NULL;
1196 res = PQgetResult(conn);
1198 still_sending = false;
1200 if (copybuf != NULL)
1201 PQfreemem(copybuf);
1202 *stoppos = blockpos;
1203 return res;
1207 * Check if we should continue streaming, or abort at this point.
1209 static bool
1210 CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos)
1212 if (still_sending && stream->stream_stop(blockpos, stream->timeline, false))
1214 if (!close_walfile(stream, blockpos))
1216 /* Potential error message is written by close_walfile */
1217 return false;
1219 if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1221 pg_log_error("could not send copy-end packet: %s",
1222 PQerrorMessage(conn));
1223 return false;
1225 still_sending = false;
1228 return true;
1232 * Calculate how long send/receive loops should sleep
1234 static long
1235 CalculateCopyStreamSleeptime(TimestampTz now, int standby_message_timeout,
1236 TimestampTz last_status)
1238 TimestampTz status_targettime = 0;
1239 long sleeptime;
1241 if (standby_message_timeout && still_sending)
1242 status_targettime = last_status +
1243 (standby_message_timeout - 1) * ((int64) 1000);
1245 if (status_targettime > 0)
1247 long secs;
1248 int usecs;
1250 feTimestampDifference(now,
1251 status_targettime,
1252 &secs,
1253 &usecs);
1254 /* Always sleep at least 1 sec */
1255 if (secs <= 0)
1257 secs = 1;
1258 usecs = 0;
1261 sleeptime = secs * 1000 + usecs / 1000;
1263 else
1264 sleeptime = -1;
1266 return sleeptime;