/*-------------------------------------------------------------------------
 *
 * receivelog.c - receive WAL files using the streaming
 *				  replication protocol.
 *
 * Author: Magnus Hagander <magnus@hagander.net>
 *
 * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *		  src/bin/pg_basebackup/receivelog.c
 *-------------------------------------------------------------------------
 */
#include "postgres_fe.h"

#include <sys/stat.h>
#include <unistd.h>
#ifdef HAVE_SYS_SELECT_H
#include <sys/select.h>
#endif

#include "access/xlog_internal.h"
#include "common/file_utils.h"
#include "common/logging.h"
#include "libpq-fe.h"
#include "receivelog.h"
#include "streamutil.h"
30 /* fd and filename for currently open WAL file */
31 static Walfile
*walfile
= NULL
;
32 static char current_walfile_name
[MAXPGPATH
] = "";
33 static bool reportFlushPosition
= false;
34 static XLogRecPtr lastFlushPosition
= InvalidXLogRecPtr
;
36 static bool still_sending
= true; /* feedback still needs to be sent? */
38 static PGresult
*HandleCopyStream(PGconn
*conn
, StreamCtl
*stream
,
40 static int CopyStreamPoll(PGconn
*conn
, long timeout_ms
, pgsocket stop_socket
);
41 static int CopyStreamReceive(PGconn
*conn
, long timeout
, pgsocket stop_socket
,
43 static bool ProcessKeepaliveMsg(PGconn
*conn
, StreamCtl
*stream
, char *copybuf
,
44 int len
, XLogRecPtr blockpos
, TimestampTz
*last_status
);
45 static bool ProcessXLogDataMsg(PGconn
*conn
, StreamCtl
*stream
, char *copybuf
, int len
,
46 XLogRecPtr
*blockpos
);
47 static PGresult
*HandleEndOfCopyStream(PGconn
*conn
, StreamCtl
*stream
, char *copybuf
,
48 XLogRecPtr blockpos
, XLogRecPtr
*stoppos
);
49 static bool CheckCopyStreamStop(PGconn
*conn
, StreamCtl
*stream
, XLogRecPtr blockpos
);
50 static long CalculateCopyStreamSleeptime(TimestampTz now
, int standby_message_timeout
,
51 TimestampTz last_status
);
53 static bool ReadEndOfStreamingResult(PGresult
*res
, XLogRecPtr
*startpos
,
57 mark_file_as_archived(StreamCtl
*stream
, const char *fname
)
60 static char tmppath
[MAXPGPATH
];
62 snprintf(tmppath
, sizeof(tmppath
), "archive_status/%s.done",
65 f
= stream
->walmethod
->open_for_write(tmppath
, NULL
, 0);
68 pg_log_error("could not create archive status file \"%s\": %s",
69 tmppath
, stream
->walmethod
->getlasterror());
73 stream
->walmethod
->close(f
, CLOSE_NORMAL
);
79 * Open a new WAL file in the specified directory.
81 * Returns true if OK; on failure, returns false after printing an error msg.
82 * On success, 'walfile' is set to the FD for the file, and the base filename
83 * (without partial_suffix) is stored in 'current_walfile_name'.
85 * The file will be padded to 16Mb with zeroes.
88 open_walfile(StreamCtl
*stream
, XLogRecPtr startpoint
)
95 XLByteToSeg(startpoint
, segno
, WalSegSz
);
96 XLogFileName(current_walfile_name
, stream
->timeline
, segno
, WalSegSz
);
98 /* Note that this considers the compression used if necessary */
99 fn
= stream
->walmethod
->get_file_name(current_walfile_name
,
100 stream
->partial_suffix
);
103 * When streaming to files, if an existing file exists we verify that it's
104 * either empty (just created), or a complete WalSegSz segment (in which
105 * case it has been created and padded). Anything else indicates a corrupt
106 * file. Compressed files have no need for padding, so just ignore this
109 * When streaming to tar, no file with this name will exist before, so we
110 * never have to verify a size.
112 if (stream
->walmethod
->compression() == 0 &&
113 stream
->walmethod
->existsfile(fn
))
115 size
= stream
->walmethod
->get_file_size(fn
);
118 pg_log_error("could not get size of write-ahead log file \"%s\": %s",
119 fn
, stream
->walmethod
->getlasterror());
123 if (size
== WalSegSz
)
125 /* Already padded file. Open it for use */
126 f
= stream
->walmethod
->open_for_write(current_walfile_name
, stream
->partial_suffix
, 0);
129 pg_log_error("could not open existing write-ahead log file \"%s\": %s",
130 fn
, stream
->walmethod
->getlasterror());
135 /* fsync file in case of a previous crash */
136 if (stream
->walmethod
->sync(f
) != 0)
138 pg_log_fatal("could not fsync existing write-ahead log file \"%s\": %s",
139 fn
, stream
->walmethod
->getlasterror());
140 stream
->walmethod
->close(f
, CLOSE_UNLINK
);
150 /* if write didn't set errno, assume problem is no disk space */
153 pg_log_error(ngettext("write-ahead log file \"%s\" has %zd byte, should be 0 or %d",
154 "write-ahead log file \"%s\" has %zd bytes, should be 0 or %d",
160 /* File existed and was empty, so fall through and open */
163 /* No file existed, so create one */
165 f
= stream
->walmethod
->open_for_write(current_walfile_name
,
166 stream
->partial_suffix
, WalSegSz
);
169 pg_log_error("could not open write-ahead log file \"%s\": %s",
170 fn
, stream
->walmethod
->getlasterror());
181 * Close the current WAL file (if open), and rename it to the correct
182 * filename if it's complete. On failure, prints an error message to stderr
183 * and returns false, otherwise returns true.
186 close_walfile(StreamCtl
*stream
, XLogRecPtr pos
)
194 currpos
= stream
->walmethod
->get_current_pos(walfile
);
197 pg_log_error("could not determine seek position in file \"%s\": %s",
198 current_walfile_name
, stream
->walmethod
->getlasterror());
199 stream
->walmethod
->close(walfile
, CLOSE_UNLINK
);
205 if (stream
->partial_suffix
)
207 if (currpos
== WalSegSz
)
208 r
= stream
->walmethod
->close(walfile
, CLOSE_NORMAL
);
211 pg_log_info("not renaming \"%s%s\", segment is not complete",
212 current_walfile_name
, stream
->partial_suffix
);
213 r
= stream
->walmethod
->close(walfile
, CLOSE_NO_RENAME
);
217 r
= stream
->walmethod
->close(walfile
, CLOSE_NORMAL
);
223 pg_log_error("could not close file \"%s\": %s",
224 current_walfile_name
, stream
->walmethod
->getlasterror());
229 * Mark file as archived if requested by the caller - pg_basebackup needs
230 * to do so as files can otherwise get archived again after promotion of a
231 * new node. This is in line with walreceiver.c always doing a
232 * XLogArchiveForceDone() after a complete segment.
234 if (currpos
== WalSegSz
&& stream
->mark_done
)
236 /* writes error message if failed */
237 if (!mark_file_as_archived(stream
, current_walfile_name
))
241 lastFlushPosition
= pos
;
247 * Check if a timeline history file exists.
250 existsTimeLineHistoryFile(StreamCtl
*stream
)
252 char histfname
[MAXFNAMELEN
];
255 * Timeline 1 never has a history file. We treat that as if it existed,
256 * since we never need to stream it.
258 if (stream
->timeline
== 1)
261 TLHistoryFileName(histfname
, stream
->timeline
);
263 return stream
->walmethod
->existsfile(histfname
);
267 writeTimeLineHistoryFile(StreamCtl
*stream
, char *filename
, char *content
)
269 int size
= strlen(content
);
270 char histfname
[MAXFNAMELEN
];
274 * Check that the server's idea of how timeline history files should be
275 * named matches ours.
277 TLHistoryFileName(histfname
, stream
->timeline
);
278 if (strcmp(histfname
, filename
) != 0)
280 pg_log_error("server reported unexpected history file name for timeline %u: %s",
281 stream
->timeline
, filename
);
285 f
= stream
->walmethod
->open_for_write(histfname
, ".tmp", 0);
288 pg_log_error("could not create timeline history file \"%s\": %s",
289 histfname
, stream
->walmethod
->getlasterror());
293 if ((int) stream
->walmethod
->write(f
, content
, size
) != size
)
295 pg_log_error("could not write timeline history file \"%s\": %s",
296 histfname
, stream
->walmethod
->getlasterror());
299 * If we fail to make the file, delete it to release disk space
301 stream
->walmethod
->close(f
, CLOSE_UNLINK
);
306 if (stream
->walmethod
->close(f
, CLOSE_NORMAL
) != 0)
308 pg_log_error("could not close file \"%s\": %s",
309 histfname
, stream
->walmethod
->getlasterror());
313 /* Maintain archive_status, check close_walfile() for details. */
314 if (stream
->mark_done
)
316 /* writes error message if failed */
317 if (!mark_file_as_archived(stream
, histfname
))
325 * Send a Standby Status Update message to server.
328 sendFeedback(PGconn
*conn
, XLogRecPtr blockpos
, TimestampTz now
, bool replyRequested
)
330 char replybuf
[1 + 8 + 8 + 8 + 8 + 1];
335 fe_sendint64(blockpos
, &replybuf
[len
]); /* write */
337 if (reportFlushPosition
)
338 fe_sendint64(lastFlushPosition
, &replybuf
[len
]); /* flush */
340 fe_sendint64(InvalidXLogRecPtr
, &replybuf
[len
]); /* flush */
342 fe_sendint64(InvalidXLogRecPtr
, &replybuf
[len
]); /* apply */
344 fe_sendint64(now
, &replybuf
[len
]); /* sendTime */
346 replybuf
[len
] = replyRequested
? 1 : 0; /* replyRequested */
349 if (PQputCopyData(conn
, replybuf
, len
) <= 0 || PQflush(conn
))
351 pg_log_error("could not send feedback packet: %s",
352 PQerrorMessage(conn
));
360 * Check that the server version we're connected to is supported by
361 * ReceiveXlogStream().
363 * If it's not, an error message is printed to stderr, and false is returned.
366 CheckServerVersionForStreaming(PGconn
*conn
)
373 * The message format used in streaming replication changed in 9.3, so we
374 * cannot stream from older servers. And we don't support servers newer
375 * than the client; it might work, but we don't know, so err on the safe
378 minServerMajor
= 903;
379 maxServerMajor
= PG_VERSION_NUM
/ 100;
380 serverMajor
= PQserverVersion(conn
) / 100;
381 if (serverMajor
< minServerMajor
)
383 const char *serverver
= PQparameterStatus(conn
, "server_version");
385 pg_log_error("incompatible server version %s; client does not support streaming from server versions older than %s",
386 serverver
? serverver
: "'unknown'",
390 else if (serverMajor
> maxServerMajor
)
392 const char *serverver
= PQparameterStatus(conn
, "server_version");
394 pg_log_error("incompatible server version %s; client does not support streaming from server versions newer than %s",
395 serverver
? serverver
: "'unknown'",
403 * Receive a log stream starting at the specified position.
405 * Individual parameters are passed through the StreamCtl structure.
407 * If sysidentifier is specified, validate that both the system
408 * identifier and the timeline matches the specified ones
409 * (by sending an extra IDENTIFY_SYSTEM command)
411 * All received segments will be written to the directory
412 * specified by basedir. This will also fetch any missing timeline history
415 * The stream_stop callback will be called every time data
416 * is received, and whenever a segment is completed. If it returns
417 * true, the streaming will stop and the function
418 * return. As long as it returns false, streaming will continue
421 * If stream_stop() checks for external input, stop_socket should be set to
422 * the FD it checks. This will allow such input to be detected promptly
423 * rather than after standby_message_timeout (which might be indefinite).
424 * Note that signals will interrupt waits for input as well, but that is
425 * race-y since a signal received while busy won't interrupt the wait.
427 * standby_message_timeout controls how often we send a message
428 * back to the primary letting it know our progress, in milliseconds.
429 * Zero means no messages are sent.
430 * This message will only contain the write location, and never
433 * If 'partial_suffix' is not NULL, files are initially created with the
434 * given suffix, and the suffix is removed once the file is finished. That
435 * allows you to tell the difference between partial and completed files,
436 * so that you can continue later where you left.
438 * If 'synchronous' is true, the received WAL is flushed as soon as written,
439 * otherwise only when the WAL file is closed.
441 * Note: The WAL location *must* be at a log segment start!
444 ReceiveXlogStream(PGconn
*conn
, StreamCtl
*stream
)
452 * The caller should've checked the server version already, but doesn't do
453 * any harm to check it here too.
455 if (!CheckServerVersionForStreaming(conn
))
459 * Decide whether we want to report the flush position. If we report the
460 * flush position, the primary will know what WAL we'll possibly
461 * re-request, and it can then remove older WAL safely. We must always do
462 * that when we are using slots.
464 * Reporting the flush position makes one eligible as a synchronous
465 * replica. People shouldn't include generic names in
466 * synchronous_standby_names, but we've protected them against it so far,
467 * so let's continue to do so unless specifically requested.
469 if (stream
->replication_slot
!= NULL
)
471 reportFlushPosition
= true;
472 sprintf(slotcmd
, "SLOT \"%s\" ", stream
->replication_slot
);
476 if (stream
->synchronous
)
477 reportFlushPosition
= true;
479 reportFlushPosition
= false;
483 if (stream
->sysidentifier
!= NULL
)
485 char *sysidentifier
= NULL
;
486 TimeLineID servertli
;
489 * Get the server system identifier and timeline, and validate them.
491 if (!RunIdentifySystem(conn
, &sysidentifier
, &servertli
, NULL
, NULL
))
493 pg_free(sysidentifier
);
497 if (strcmp(stream
->sysidentifier
, sysidentifier
) != 0)
499 pg_log_error("system identifier does not match between base backup and streaming connection");
500 pg_free(sysidentifier
);
503 pg_free(sysidentifier
);
505 if (stream
->timeline
> servertli
)
507 pg_log_error("starting timeline %u is not present in the server",
514 * initialize flush position to starting point, it's the caller's
515 * responsibility that that's sane.
517 lastFlushPosition
= stream
->startpos
;
522 * Fetch the timeline history file for this timeline, if we don't have
523 * it already. When streaming log to tar, this will always return
524 * false, as we are never streaming into an existing file and
525 * therefore there can be no pre-existing timeline history file.
527 if (!existsTimeLineHistoryFile(stream
))
529 snprintf(query
, sizeof(query
), "TIMELINE_HISTORY %u", stream
->timeline
);
530 res
= PQexec(conn
, query
);
531 if (PQresultStatus(res
) != PGRES_TUPLES_OK
)
533 /* FIXME: we might send it ok, but get an error */
534 pg_log_error("could not send replication command \"%s\": %s",
535 "TIMELINE_HISTORY", PQresultErrorMessage(res
));
541 * The response to TIMELINE_HISTORY is a single row result set
542 * with two fields: filename and content
544 if (PQnfields(res
) != 2 || PQntuples(res
) != 1)
546 pg_log_warning("unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields",
547 PQntuples(res
), PQnfields(res
), 1, 2);
550 /* Write the history file to disk */
551 writeTimeLineHistoryFile(stream
,
552 PQgetvalue(res
, 0, 0),
553 PQgetvalue(res
, 0, 1));
559 * Before we start streaming from the requested location, check if the
560 * callback tells us to stop here.
562 if (stream
->stream_stop(stream
->startpos
, stream
->timeline
, false))
565 /* Initiate the replication stream at specified location */
566 snprintf(query
, sizeof(query
), "START_REPLICATION %s%X/%X TIMELINE %u",
568 LSN_FORMAT_ARGS(stream
->startpos
),
570 res
= PQexec(conn
, query
);
571 if (PQresultStatus(res
) != PGRES_COPY_BOTH
)
573 pg_log_error("could not send replication command \"%s\": %s",
574 "START_REPLICATION", PQresultErrorMessage(res
));
581 res
= HandleCopyStream(conn
, stream
, &stoppos
);
586 * Streaming finished.
588 * There are two possible reasons for that: a controlled shutdown, or
589 * we reached the end of the current timeline. In case of
590 * end-of-timeline, the server sends a result set after Copy has
591 * finished, containing information about the next timeline. Read
592 * that, and restart streaming from the next timeline. In case of
593 * controlled shutdown, stop here.
595 if (PQresultStatus(res
) == PGRES_TUPLES_OK
)
598 * End-of-timeline. Read the next timeline's ID and starting
599 * position. Usually, the starting position will match the end of
600 * the previous timeline, but there are corner cases like if the
601 * server had sent us half of a WAL record, when it was promoted.
602 * The new timeline will begin at the end of the last complete
603 * record in that case, overlapping the partial WAL record on the
609 parsed
= ReadEndOfStreamingResult(res
, &stream
->startpos
, &newtimeline
);
614 /* Sanity check the values the server gave us */
615 if (newtimeline
<= stream
->timeline
)
617 pg_log_error("server reported unexpected next timeline %u, following timeline %u",
618 newtimeline
, stream
->timeline
);
621 if (stream
->startpos
> stoppos
)
623 pg_log_error("server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X",
624 stream
->timeline
, LSN_FORMAT_ARGS(stoppos
),
625 newtimeline
, LSN_FORMAT_ARGS(stream
->startpos
));
629 /* Read the final result, which should be CommandComplete. */
630 res
= PQgetResult(conn
);
631 if (PQresultStatus(res
) != PGRES_COMMAND_OK
)
633 pg_log_error("unexpected termination of replication stream: %s",
634 PQresultErrorMessage(res
));
641 * Loop back to start streaming from the new timeline. Always
642 * start streaming at the beginning of a segment.
644 stream
->timeline
= newtimeline
;
645 stream
->startpos
= stream
->startpos
-
646 XLogSegmentOffset(stream
->startpos
, WalSegSz
);
649 else if (PQresultStatus(res
) == PGRES_COMMAND_OK
)
654 * End of replication (ie. controlled shut down of the server).
656 * Check if the callback thinks it's OK to stop here. If not,
659 if (stream
->stream_stop(stoppos
, stream
->timeline
, false))
663 pg_log_error("replication stream was terminated before stop point");
669 /* Server returned an error. */
670 pg_log_error("unexpected termination of replication stream: %s",
671 PQresultErrorMessage(res
));
678 if (walfile
!= NULL
&& stream
->walmethod
->close(walfile
, CLOSE_NO_RENAME
) != 0)
679 pg_log_error("could not close file \"%s\": %s",
680 current_walfile_name
, stream
->walmethod
->getlasterror());
686 * Helper function to parse the result set returned by server after streaming
687 * has finished. On failure, prints an error to stderr and returns false.
690 ReadEndOfStreamingResult(PGresult
*res
, XLogRecPtr
*startpos
, uint32
*timeline
)
692 uint32 startpos_xlogid
,
696 * The result set consists of one row and two columns, e.g:
698 * next_tli | next_tli_startpos
699 * ----------+-------------------
702 * next_tli is the timeline ID of the next timeline after the one that
703 * just finished streaming. next_tli_startpos is the WAL location where
704 * the server switched to it.
707 if (PQnfields(res
) < 2 || PQntuples(res
) != 1)
709 pg_log_error("unexpected result set after end-of-timeline: got %d rows and %d fields, expected %d rows and %d fields",
710 PQntuples(res
), PQnfields(res
), 1, 2);
714 *timeline
= atoi(PQgetvalue(res
, 0, 0));
715 if (sscanf(PQgetvalue(res
, 0, 1), "%X/%X", &startpos_xlogid
,
716 &startpos_xrecoff
) != 2)
718 pg_log_error("could not parse next timeline's starting point \"%s\"",
719 PQgetvalue(res
, 0, 1));
722 *startpos
= ((uint64
) startpos_xlogid
<< 32) | startpos_xrecoff
;
728 * The main loop of ReceiveXlogStream. Handles the COPY stream after
729 * initiating streaming with the START_REPLICATION command.
731 * If the COPY ends (not necessarily successfully) due a message from the
732 * server, returns a PGresult and sets *stoppos to the last byte written.
733 * On any other sort of error, returns NULL.
736 HandleCopyStream(PGconn
*conn
, StreamCtl
*stream
,
739 char *copybuf
= NULL
;
740 TimestampTz last_status
= -1;
741 XLogRecPtr blockpos
= stream
->startpos
;
743 still_sending
= true;
752 * Check if we should continue streaming, or abort at this point.
754 if (!CheckCopyStreamStop(conn
, stream
, blockpos
))
757 now
= feGetCurrentTimestamp();
760 * If synchronous option is true, issue sync command as soon as there
761 * are WAL data which has not been flushed yet.
763 if (stream
->synchronous
&& lastFlushPosition
< blockpos
&& walfile
!= NULL
)
765 if (stream
->walmethod
->sync(walfile
) != 0)
767 pg_log_fatal("could not fsync file \"%s\": %s",
768 current_walfile_name
, stream
->walmethod
->getlasterror());
771 lastFlushPosition
= blockpos
;
774 * Send feedback so that the server sees the latest WAL locations
777 if (!sendFeedback(conn
, blockpos
, now
, false))
783 * Potentially send a status message to the primary
785 if (still_sending
&& stream
->standby_message_timeout
> 0 &&
786 feTimestampDifferenceExceeds(last_status
, now
,
787 stream
->standby_message_timeout
))
789 /* Time to send feedback! */
790 if (!sendFeedback(conn
, blockpos
, now
, false))
796 * Calculate how long send/receive loops should sleep
798 sleeptime
= CalculateCopyStreamSleeptime(now
, stream
->standby_message_timeout
,
801 r
= CopyStreamReceive(conn
, sleeptime
, stream
->stop_socket
, ©buf
);
808 PGresult
*res
= HandleEndOfCopyStream(conn
, stream
, copybuf
, blockpos
, stoppos
);
816 /* Check the message type. */
817 if (copybuf
[0] == 'k')
819 if (!ProcessKeepaliveMsg(conn
, stream
, copybuf
, r
, blockpos
,
823 else if (copybuf
[0] == 'w')
825 if (!ProcessXLogDataMsg(conn
, stream
, copybuf
, r
, &blockpos
))
829 * Check if we should continue streaming, or abort at this
832 if (!CheckCopyStreamStop(conn
, stream
, blockpos
))
837 pg_log_error("unrecognized streaming header: \"%c\"",
843 * Process the received data, and any subsequent data we can read
846 r
= CopyStreamReceive(conn
, 0, stream
->stop_socket
, ©buf
);
857 * Wait until we can read a CopyData message,
858 * or timeout, or occurrence of a signal or input on the stop_socket.
859 * (timeout_ms < 0 means wait indefinitely; 0 means don't wait.)
861 * Returns 1 if data has become available for reading, 0 if timed out
862 * or interrupted by signal or stop_socket input, and -1 on an error.
865 CopyStreamPoll(PGconn
*conn
, long timeout_ms
, pgsocket stop_socket
)
871 struct timeval timeout
;
872 struct timeval
*timeoutptr
;
874 connsocket
= PQsocket(conn
);
877 pg_log_error("invalid socket: %s", PQerrorMessage(conn
));
881 FD_ZERO(&input_mask
);
882 FD_SET(connsocket
, &input_mask
);
884 if (stop_socket
!= PGINVALID_SOCKET
)
886 FD_SET(stop_socket
, &input_mask
);
887 maxfd
= Max(maxfd
, stop_socket
);
894 timeout
.tv_sec
= timeout_ms
/ 1000L;
895 timeout
.tv_usec
= (timeout_ms
% 1000L) * 1000L;
896 timeoutptr
= &timeout
;
899 ret
= select(maxfd
+ 1, &input_mask
, NULL
, NULL
, timeoutptr
);
904 return 0; /* Got a signal, so not an error */
905 pg_log_error("%s() failed: %m", "select");
908 if (ret
> 0 && FD_ISSET(connsocket
, &input_mask
))
909 return 1; /* Got input on connection socket */
911 return 0; /* Got timeout or input on stop_socket */
915 * Receive CopyData message available from XLOG stream, blocking for
916 * maximum of 'timeout' ms.
918 * If data was received, returns the length of the data. *buffer is set to
919 * point to a buffer holding the received message. The buffer is only valid
920 * until the next CopyStreamReceive call.
922 * Returns 0 if no data was available within timeout, or if wait was
923 * interrupted by signal or stop_socket input.
924 * -1 on error. -2 if the server ended the COPY.
927 CopyStreamReceive(PGconn
*conn
, long timeout
, pgsocket stop_socket
,
930 char *copybuf
= NULL
;
937 /* Try to receive a CopyData message */
938 rawlen
= PQgetCopyData(conn
, ©buf
, 1);
944 * No data available. Wait for some to appear, but not longer than
945 * the specified timeout, so that we can ping the server. Also stop
946 * waiting if input appears on stop_socket.
948 ret
= CopyStreamPoll(conn
, timeout
, stop_socket
);
952 /* Now there is actually data on the socket */
953 if (PQconsumeInput(conn
) == 0)
955 pg_log_error("could not receive data from WAL stream: %s",
956 PQerrorMessage(conn
));
960 /* Now that we've consumed some input, try again */
961 rawlen
= PQgetCopyData(conn
, ©buf
, 1);
965 if (rawlen
== -1) /* end-of-streaming or error */
969 pg_log_error("could not read COPY data: %s", PQerrorMessage(conn
));
973 /* Return received messages to caller */
979 * Process the keepalive message.
982 ProcessKeepaliveMsg(PGconn
*conn
, StreamCtl
*stream
, char *copybuf
, int len
,
983 XLogRecPtr blockpos
, TimestampTz
*last_status
)
990 * Parse the keepalive message, enclosed in the CopyData message. We just
991 * check if the server requested a reply, and ignore the rest.
993 pos
= 1; /* skip msgtype 'k' */
994 pos
+= 8; /* skip walEnd */
995 pos
+= 8; /* skip sendTime */
999 pg_log_error("streaming header too small: %d", len
);
1002 replyRequested
= copybuf
[pos
];
1004 /* If the server requested an immediate reply, send one. */
1005 if (replyRequested
&& still_sending
)
1007 if (reportFlushPosition
&& lastFlushPosition
< blockpos
&&
1011 * If a valid flush location needs to be reported, flush the
1012 * current WAL file so that the latest flush location is sent back
1013 * to the server. This is necessary to see whether the last WAL
1014 * data has been successfully replicated or not, at the normal
1015 * shutdown of the server.
1017 if (stream
->walmethod
->sync(walfile
) != 0)
1019 pg_log_fatal("could not fsync file \"%s\": %s",
1020 current_walfile_name
, stream
->walmethod
->getlasterror());
1023 lastFlushPosition
= blockpos
;
1026 now
= feGetCurrentTimestamp();
1027 if (!sendFeedback(conn
, blockpos
, now
, false))
1036 * Process XLogData message.
1039 ProcessXLogDataMsg(PGconn
*conn
, StreamCtl
*stream
, char *copybuf
, int len
,
1040 XLogRecPtr
*blockpos
)
1048 * Once we've decided we don't want to receive any more, just ignore any
1049 * subsequent XLogData messages.
1051 if (!(still_sending
))
1055 * Read the header of the XLogData message, enclosed in the CopyData
1056 * message. We only need the WAL location field (dataStart), the rest of
1057 * the header is ignored.
1059 hdr_len
= 1; /* msgtype 'w' */
1060 hdr_len
+= 8; /* dataStart */
1061 hdr_len
+= 8; /* walEnd */
1062 hdr_len
+= 8; /* sendTime */
1065 pg_log_error("streaming header too small: %d", len
);
1068 *blockpos
= fe_recvint64(©buf
[1]);
1070 /* Extract WAL location for this block */
1071 xlogoff
= XLogSegmentOffset(*blockpos
, WalSegSz
);
1074 * Verify that the initial location in the stream matches where we think
1077 if (walfile
== NULL
)
1079 /* No file open yet */
1082 pg_log_error("received write-ahead log record for offset %u with no file open",
1089 /* More data in existing segment */
1090 if (stream
->walmethod
->get_current_pos(walfile
) != xlogoff
)
1092 pg_log_error("got WAL data offset %08x, expected %08x",
1093 xlogoff
, (int) stream
->walmethod
->get_current_pos(walfile
));
1098 bytes_left
= len
- hdr_len
;
1106 * If crossing a WAL boundary, only write up until we reach wal
1109 if (xlogoff
+ bytes_left
> WalSegSz
)
1110 bytes_to_write
= WalSegSz
- xlogoff
;
1112 bytes_to_write
= bytes_left
;
1114 if (walfile
== NULL
)
1116 if (!open_walfile(stream
, *blockpos
))
1118 /* Error logged by open_walfile */
1123 if (stream
->walmethod
->write(walfile
, copybuf
+ hdr_len
+ bytes_written
,
1124 bytes_to_write
) != bytes_to_write
)
1126 pg_log_error("could not write %u bytes to WAL file \"%s\": %s",
1127 bytes_to_write
, current_walfile_name
,
1128 stream
->walmethod
->getlasterror());
1132 /* Write was successful, advance our position */
1133 bytes_written
+= bytes_to_write
;
1134 bytes_left
-= bytes_to_write
;
1135 *blockpos
+= bytes_to_write
;
1136 xlogoff
+= bytes_to_write
;
1138 /* Did we reach the end of a WAL segment? */
1139 if (XLogSegmentOffset(*blockpos
, WalSegSz
) == 0)
1141 if (!close_walfile(stream
, *blockpos
))
1142 /* Error message written in close_walfile() */
1147 if (still_sending
&& stream
->stream_stop(*blockpos
, stream
->timeline
, true))
1149 if (PQputCopyEnd(conn
, NULL
) <= 0 || PQflush(conn
))
1151 pg_log_error("could not send copy-end packet: %s",
1152 PQerrorMessage(conn
));
1155 still_sending
= false;
1156 return true; /* ignore the rest of this XLogData packet */
1160 /* No more data left to write, receive next copy packet */
1166 * Handle end of the copy stream.
1169 HandleEndOfCopyStream(PGconn
*conn
, StreamCtl
*stream
, char *copybuf
,
1170 XLogRecPtr blockpos
, XLogRecPtr
*stoppos
)
1172 PGresult
*res
= PQgetResult(conn
);
1175 * The server closed its end of the copy stream. If we haven't closed
1176 * ours already, we need to do so now, unless the server threw an error,
1177 * in which case we don't.
1181 if (!close_walfile(stream
, blockpos
))
1183 /* Error message written in close_walfile() */
1187 if (PQresultStatus(res
) == PGRES_COPY_IN
)
1189 if (PQputCopyEnd(conn
, NULL
) <= 0 || PQflush(conn
))
1191 pg_log_error("could not send copy-end packet: %s",
1192 PQerrorMessage(conn
));
1196 res
= PQgetResult(conn
);
1198 still_sending
= false;
1200 if (copybuf
!= NULL
)
1202 *stoppos
= blockpos
;
1207 * Check if we should continue streaming, or abort at this point.
1210 CheckCopyStreamStop(PGconn
*conn
, StreamCtl
*stream
, XLogRecPtr blockpos
)
1212 if (still_sending
&& stream
->stream_stop(blockpos
, stream
->timeline
, false))
1214 if (!close_walfile(stream
, blockpos
))
1216 /* Potential error message is written by close_walfile */
1219 if (PQputCopyEnd(conn
, NULL
) <= 0 || PQflush(conn
))
1221 pg_log_error("could not send copy-end packet: %s",
1222 PQerrorMessage(conn
));
1225 still_sending
= false;
1232 * Calculate how long send/receive loops should sleep
1235 CalculateCopyStreamSleeptime(TimestampTz now
, int standby_message_timeout
,
1236 TimestampTz last_status
)
1238 TimestampTz status_targettime
= 0;
1241 if (standby_message_timeout
&& still_sending
)
1242 status_targettime
= last_status
+
1243 (standby_message_timeout
- 1) * ((int64
) 1000);
1245 if (status_targettime
> 0)
1250 feTimestampDifference(now
,
1254 /* Always sleep at least 1 sec */
1261 sleeptime
= secs
* 1000 + usecs
/ 1000;