/*-------------------------------------------------------------------------
 *
 * walmethods.c - implementations of different ways to write received wal
 *
 * NOTE! The caller must ensure that only one method is instantiated in
 *		 any given program, and that it's only instantiated once!
 *
 * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
 *
 * src/bin/pg_basebackup/walmethods.c
 *-------------------------------------------------------------------------
 */
15 #include "postgres_fe.h"
24 #include "common/file_perm.h"
25 #include "common/file_utils.h"
27 #include "receivelog.h"
28 #include "streamutil.h"
/*
 * Size of zlib buffer for .tar.gz
 *
 * This is the capacity, in bytes, of the output buffer that deflate() fills
 * before the compressed bytes are written to the tar file.
 */
#define ZLIB_OUT_SIZE 4096
/*-------------------------------------------------------------------------
 * WalDirectoryMethod - write wal to a directory looking like pg_wal
 *-------------------------------------------------------------------------
 */

/*
 * Global static data for this method
 */
41 typedef struct DirectoryMethodData
46 } DirectoryMethodData
;
47 static DirectoryMethodData
*dir_data
= NULL
;
52 typedef struct DirectoryMethodFile
62 } DirectoryMethodFile
;
/*
 * dir_getlasterror
 *
 * Return a human-readable description of the most recent failure of the
 * directory method.
 *
 * The returned string comes straight from strerror(); it is owned by the C
 * library and must not be freed by the caller.
 */
static const char *
dir_getlasterror(void)
{
	/* Directory method always sets errno, so just use strerror */
	return strerror(errno);
}
72 dir_get_file_name(const char *pathname
, const char *temp_suffix
)
74 char *filename
= pg_malloc0(MAXPGPATH
* sizeof(char));
76 snprintf(filename
, MAXPGPATH
, "%s%s%s",
77 pathname
, dir_data
->compression
> 0 ? ".gz" : "",
78 temp_suffix
? temp_suffix
: "");
84 dir_open_for_write(const char *pathname
, const char *temp_suffix
, size_t pad_to_size
)
86 static char tmppath
[MAXPGPATH
];
89 DirectoryMethodFile
*f
;
94 filename
= dir_get_file_name(pathname
, temp_suffix
);
95 snprintf(tmppath
, sizeof(tmppath
), "%s/%s",
96 dir_data
->basedir
, filename
);
100 * Open a file for non-compressed as well as compressed files. Tracking
101 * the file descriptor is important for dir_sync() method as gzflush()
102 * does not do any system calls to fsync() to make changes permanent on
105 fd
= open(tmppath
, O_WRONLY
| O_CREAT
| PG_BINARY
, pg_file_create_mode
);
110 if (dir_data
->compression
> 0)
112 gzfp
= gzdopen(fd
, "wb");
119 if (gzsetparams(gzfp
, dir_data
->compression
,
120 Z_DEFAULT_STRATEGY
) != Z_OK
)
128 /* Do pre-padding on non-compressed files */
129 if (pad_to_size
&& dir_data
->compression
== 0)
131 PGAlignedXLogBlock zerobuf
;
134 memset(zerobuf
.data
, 0, XLOG_BLCKSZ
);
135 for (bytes
= 0; bytes
< pad_to_size
; bytes
+= XLOG_BLCKSZ
)
138 if (write(fd
, zerobuf
.data
, XLOG_BLCKSZ
) != XLOG_BLCKSZ
)
140 int save_errno
= errno
;
145 * If write didn't set errno, assume problem is no disk space.
147 errno
= save_errno
? save_errno
: ENOSPC
;
152 if (lseek(fd
, 0, SEEK_SET
) != 0)
154 int save_errno
= errno
;
163 * fsync WAL file and containing directory, to ensure the file is
164 * persistently created and zeroed (if padded). That's particularly
165 * important when using synchronous mode, where the file is modified and
166 * fsynced in-place, without a directory fsync.
170 if (fsync_fname(tmppath
, false) != 0 ||
171 fsync_parent_path(tmppath
) != 0)
174 if (dir_data
->compression
> 0)
183 f
= pg_malloc0(sizeof(DirectoryMethodFile
));
185 if (dir_data
->compression
> 0)
190 f
->pathname
= pg_strdup(pathname
);
191 f
->fullpath
= pg_strdup(tmppath
);
193 f
->temp_suffix
= pg_strdup(temp_suffix
);
199 dir_write(Walfile f
, const void *buf
, size_t count
)
202 DirectoryMethodFile
*df
= (DirectoryMethodFile
*) f
;
207 if (dir_data
->compression
> 0)
208 r
= (ssize_t
) gzwrite(df
->gzfp
, buf
, count
);
211 r
= write(df
->fd
, buf
, count
);
218 dir_get_current_pos(Walfile f
)
222 /* Use a cached value to prevent lots of reseeks */
223 return ((DirectoryMethodFile
*) f
)->currpos
;
227 dir_close(Walfile f
, WalCloseMethod method
)
230 DirectoryMethodFile
*df
= (DirectoryMethodFile
*) f
;
231 static char tmppath
[MAXPGPATH
];
232 static char tmppath2
[MAXPGPATH
];
237 if (dir_data
->compression
> 0)
238 r
= gzclose(df
->gzfp
);
245 /* Build path to the current version of the file */
246 if (method
== CLOSE_NORMAL
&& df
->temp_suffix
)
252 * If we have a temp prefix, normal operation is to rename the
255 filename
= dir_get_file_name(df
->pathname
, df
->temp_suffix
);
256 snprintf(tmppath
, sizeof(tmppath
), "%s/%s",
257 dir_data
->basedir
, filename
);
260 /* permanent name, so no need for the prefix */
261 filename2
= dir_get_file_name(df
->pathname
, NULL
);
262 snprintf(tmppath2
, sizeof(tmppath2
), "%s/%s",
263 dir_data
->basedir
, filename2
);
265 r
= durable_rename(tmppath
, tmppath2
);
267 else if (method
== CLOSE_UNLINK
)
271 /* Unlink the file once it's closed */
272 filename
= dir_get_file_name(df
->pathname
, df
->temp_suffix
);
273 snprintf(tmppath
, sizeof(tmppath
), "%s/%s",
274 dir_data
->basedir
, filename
);
281 * Else either CLOSE_NORMAL and no temp suffix, or
282 * CLOSE_NO_RENAME. In this case, fsync the file and containing
283 * directory if sync mode is requested.
287 r
= fsync_fname(df
->fullpath
, false);
289 r
= fsync_parent_path(df
->fullpath
);
294 pg_free(df
->pathname
);
295 pg_free(df
->fullpath
);
297 pg_free(df
->temp_suffix
);
312 if (dir_data
->compression
> 0)
314 if (gzflush(((DirectoryMethodFile
*) f
)->gzfp
, Z_SYNC_FLUSH
) != Z_OK
)
319 return fsync(((DirectoryMethodFile
*) f
)->fd
);
323 dir_get_file_size(const char *pathname
)
326 static char tmppath
[MAXPGPATH
];
328 snprintf(tmppath
, sizeof(tmppath
), "%s/%s",
329 dir_data
->basedir
, pathname
);
331 if (stat(tmppath
, &statbuf
) != 0)
334 return statbuf
.st_size
;
338 dir_compression(void)
340 return dir_data
->compression
;
344 dir_existsfile(const char *pathname
)
346 static char tmppath
[MAXPGPATH
];
349 snprintf(tmppath
, sizeof(tmppath
), "%s/%s",
350 dir_data
->basedir
, pathname
);
352 fd
= open(tmppath
, O_RDONLY
| PG_BINARY
, 0);
365 * Files are fsynced when they are closed, but we need to fsync the
366 * directory entry here as well.
368 if (fsync_fname(dir_data
->basedir
, true) != 0)
376 CreateWalDirectoryMethod(const char *basedir
, int compression
, bool sync
)
378 WalWriteMethod
*method
;
380 method
= pg_malloc0(sizeof(WalWriteMethod
));
381 method
->open_for_write
= dir_open_for_write
;
382 method
->write
= dir_write
;
383 method
->get_current_pos
= dir_get_current_pos
;
384 method
->get_file_size
= dir_get_file_size
;
385 method
->get_file_name
= dir_get_file_name
;
386 method
->compression
= dir_compression
;
387 method
->close
= dir_close
;
388 method
->sync
= dir_sync
;
389 method
->existsfile
= dir_existsfile
;
390 method
->finish
= dir_finish
;
391 method
->getlasterror
= dir_getlasterror
;
393 dir_data
= pg_malloc0(sizeof(DirectoryMethodData
));
394 dir_data
->compression
= compression
;
395 dir_data
->basedir
= pg_strdup(basedir
);
396 dir_data
->sync
= sync
;
402 FreeWalDirectoryMethod(void)
404 pg_free(dir_data
->basedir
);
/*-------------------------------------------------------------------------
 * WalTarMethod - write wal to a tar file containing pg_wal contents
 *-------------------------------------------------------------------------
 */
414 typedef struct TarMethodFile
416 off_t ofs_start
; /* Where does the *header* for this file start */
418 char header
[TAR_BLOCK_SIZE
];
423 typedef struct TarMethodData
429 TarMethodFile
*currentfile
;
430 char lasterror
[1024];
436 static TarMethodData
*tar_data
= NULL
;
/*
 * Helpers for the tar method's custom error message, kept in
 * tar_data->lasterror.
 *
 * tar_clear_error() empties the message.  The expansion is parenthesized so
 * the macro behaves as a single expression wherever it is used.
 *
 * tar_set_error(msg) stores msg, passed through the _() macro, truncating it
 * to the size of the lasterror buffer.
 */
#define tar_clear_error() (tar_data->lasterror[0] = '\0')
#define tar_set_error(msg) strlcpy(tar_data->lasterror, _(msg), sizeof(tar_data->lasterror))
442 tar_getlasterror(void)
445 * If a custom error is set, return that one. Otherwise, assume errno is
446 * set and return that one.
448 if (tar_data
->lasterror
[0])
449 return tar_data
->lasterror
;
450 return strerror(errno
);
455 tar_write_compressed_data(void *buf
, size_t count
, bool flush
)
457 tar_data
->zp
->next_in
= buf
;
458 tar_data
->zp
->avail_in
= count
;
460 while (tar_data
->zp
->avail_in
|| flush
)
464 r
= deflate(tar_data
->zp
, flush
? Z_FINISH
: Z_NO_FLUSH
);
465 if (r
== Z_STREAM_ERROR
)
467 tar_set_error("could not compress data");
471 if (tar_data
->zp
->avail_out
< ZLIB_OUT_SIZE
)
473 size_t len
= ZLIB_OUT_SIZE
- tar_data
->zp
->avail_out
;
476 if (write(tar_data
->fd
, tar_data
->zlibOut
, len
) != len
)
479 * If write didn't set errno, assume problem is no disk space.
486 tar_data
->zp
->next_out
= tar_data
->zlibOut
;
487 tar_data
->zp
->avail_out
= ZLIB_OUT_SIZE
;
490 if (r
== Z_STREAM_END
)
496 /* Reset the stream for writing */
497 if (deflateReset(tar_data
->zp
) != Z_OK
)
499 tar_set_error("could not reset compression stream");
509 tar_write(Walfile f
, const void *buf
, size_t count
)
516 /* Tarfile will always be positioned at the end */
517 if (!tar_data
->compression
)
519 r
= write(tar_data
->fd
, buf
, count
);
521 ((TarMethodFile
*) f
)->currpos
+= r
;
527 if (!tar_write_compressed_data(unconstify(void *, buf
), count
, false))
529 ((TarMethodFile
*) f
)->currpos
+= count
;
534 /* Can't happen - compression enabled with no libz */
540 tar_write_padding_data(TarMethodFile
*f
, size_t bytes
)
542 PGAlignedXLogBlock zerobuf
;
543 size_t bytesleft
= bytes
;
545 memset(zerobuf
.data
, 0, XLOG_BLCKSZ
);
548 size_t bytestowrite
= Min(bytesleft
, XLOG_BLCKSZ
);
549 ssize_t r
= tar_write(f
, zerobuf
.data
, bytestowrite
);
560 tar_get_file_name(const char *pathname
, const char *temp_suffix
)
562 char *filename
= pg_malloc0(MAXPGPATH
* sizeof(char));
564 snprintf(filename
, MAXPGPATH
, "%s%s",
565 pathname
, temp_suffix
? temp_suffix
: "");
571 tar_open_for_write(const char *pathname
, const char *temp_suffix
, size_t pad_to_size
)
578 if (tar_data
->fd
< 0)
581 * We open the tar file only when we first try to write to it.
583 tar_data
->fd
= open(tar_data
->tarfilename
,
584 O_WRONLY
| O_CREAT
| PG_BINARY
,
585 pg_file_create_mode
);
586 if (tar_data
->fd
< 0)
590 if (tar_data
->compression
)
592 tar_data
->zp
= (z_streamp
) pg_malloc(sizeof(z_stream
));
593 tar_data
->zp
->zalloc
= Z_NULL
;
594 tar_data
->zp
->zfree
= Z_NULL
;
595 tar_data
->zp
->opaque
= Z_NULL
;
596 tar_data
->zp
->next_out
= tar_data
->zlibOut
;
597 tar_data
->zp
->avail_out
= ZLIB_OUT_SIZE
;
600 * Initialize deflation library. Adding the magic value 16 to the
601 * default 15 for the windowBits parameter makes the output be
602 * gzip instead of zlib.
604 if (deflateInit2(tar_data
->zp
, tar_data
->compression
, Z_DEFLATED
, 15 + 16, 8, Z_DEFAULT_STRATEGY
) != Z_OK
)
606 pg_free(tar_data
->zp
);
608 tar_set_error("could not initialize compression library");
614 /* There's no tar header itself, the file starts with regular files */
617 Assert(tar_data
->currentfile
== NULL
);
618 if (tar_data
->currentfile
!= NULL
)
620 tar_set_error("implementation error: tar files can't have more than one open file");
624 tar_data
->currentfile
= pg_malloc0(sizeof(TarMethodFile
));
626 tmppath
= tar_get_file_name(pathname
, temp_suffix
);
628 /* Create a header with size set to 0 - we will fill out the size on close */
629 if (tarCreateHeader(tar_data
->currentfile
->header
, tmppath
, NULL
, 0, S_IRUSR
| S_IWUSR
, 0, 0, time(NULL
)) != TAR_OK
)
631 pg_free(tar_data
->currentfile
);
633 tar_data
->currentfile
= NULL
;
634 tar_set_error("could not create tar header");
641 if (tar_data
->compression
)
643 /* Flush existing data */
644 if (!tar_write_compressed_data(NULL
, 0, true))
647 /* Turn off compression for header */
648 if (deflateParams(tar_data
->zp
, 0, 0) != Z_OK
)
650 tar_set_error("could not change compression parameters");
656 tar_data
->currentfile
->ofs_start
= lseek(tar_data
->fd
, 0, SEEK_CUR
);
657 if (tar_data
->currentfile
->ofs_start
== -1)
660 pg_free(tar_data
->currentfile
);
661 tar_data
->currentfile
= NULL
;
665 tar_data
->currentfile
->currpos
= 0;
667 if (!tar_data
->compression
)
670 if (write(tar_data
->fd
, tar_data
->currentfile
->header
,
671 TAR_BLOCK_SIZE
) != TAR_BLOCK_SIZE
)
674 pg_free(tar_data
->currentfile
);
675 tar_data
->currentfile
= NULL
;
676 /* if write didn't set errno, assume problem is no disk space */
677 errno
= save_errno
? save_errno
: ENOSPC
;
684 /* Write header through the zlib APIs but with no compression */
685 if (!tar_write_compressed_data(tar_data
->currentfile
->header
,
686 TAR_BLOCK_SIZE
, true))
689 /* Re-enable compression for the rest of the file */
690 if (deflateParams(tar_data
->zp
, tar_data
->compression
, 0) != Z_OK
)
692 tar_set_error("could not change compression parameters");
698 tar_data
->currentfile
->pathname
= pg_strdup(pathname
);
701 * Uncompressed files are padded on creation, but for compression we can't
706 tar_data
->currentfile
->pad_to_size
= pad_to_size
;
707 if (!tar_data
->compression
)
709 /* Uncompressed, so pad now */
710 tar_write_padding_data(tar_data
->currentfile
, pad_to_size
);
711 /* Seek back to start */
712 if (lseek(tar_data
->fd
,
713 tar_data
->currentfile
->ofs_start
+ TAR_BLOCK_SIZE
,
714 SEEK_SET
) != tar_data
->currentfile
->ofs_start
+ TAR_BLOCK_SIZE
)
717 tar_data
->currentfile
->currpos
= 0;
721 return tar_data
->currentfile
;
725 tar_get_file_size(const char *pathname
)
729 /* Currently not used, so not supported */
735 tar_compression(void)
737 return tar_data
->compression
;
741 tar_get_current_pos(Walfile f
)
746 return ((TarMethodFile
*) f
)->currpos
;
759 * Always sync the whole tarfile, because that's all we can do. This makes
760 * no sense on compressed files, so just ignore those.
762 if (tar_data
->compression
)
765 return fsync(tar_data
->fd
);
769 tar_close(Walfile f
, WalCloseMethod method
)
773 TarMethodFile
*tf
= (TarMethodFile
*) f
;
778 if (method
== CLOSE_UNLINK
)
780 if (tar_data
->compression
)
782 tar_set_error("unlink not supported with compression");
787 * Unlink the file that we just wrote to the tar. We do this by
788 * truncating it to the start of the header. This is safe as we only
789 * allow writing of the very last file.
791 if (ftruncate(tar_data
->fd
, tf
->ofs_start
) != 0)
794 pg_free(tf
->pathname
);
796 tar_data
->currentfile
= NULL
;
802 * Pad the file itself with zeroes if necessary. Note that this is
803 * different from the tar format padding -- this is the padding we asked
804 * for when the file was opened.
808 if (tar_data
->compression
)
811 * A compressed tarfile is padded on close since we cannot know
812 * the size of the compressed output until the end.
814 size_t sizeleft
= tf
->pad_to_size
- tf
->currpos
;
818 if (!tar_write_padding_data(tf
, sizeleft
))
825 * An uncompressed tarfile was padded on creation, so just adjust
826 * the current position as if we seeked to the end.
828 tf
->currpos
= tf
->pad_to_size
;
833 * Get the size of the file, and pad out to a multiple of the tar block
836 filesize
= tar_get_current_pos(f
);
837 padding
= tarPaddingBytesRequired(filesize
);
840 char zerobuf
[TAR_BLOCK_SIZE
];
842 MemSet(zerobuf
, 0, padding
);
843 if (tar_write(f
, zerobuf
, padding
) != padding
)
849 if (tar_data
->compression
)
851 /* Flush the current buffer */
852 if (!tar_write_compressed_data(NULL
, 0, true))
861 * Now go back and update the header with the correct filesize and
862 * possibly also renaming the file. We overwrite the entire current header
863 * when done, including the checksum.
865 print_tar_number(&(tf
->header
[124]), 12, filesize
);
867 if (method
== CLOSE_NORMAL
)
870 * We overwrite it with what it was before if we have no tempname,
871 * since we're going to write the buffer anyway.
873 strlcpy(&(tf
->header
[0]), tf
->pathname
, 100);
875 print_tar_number(&(tf
->header
[148]), 8, tarChecksum(((TarMethodFile
*) f
)->header
));
876 if (lseek(tar_data
->fd
, tf
->ofs_start
, SEEK_SET
) != ((TarMethodFile
*) f
)->ofs_start
)
878 if (!tar_data
->compression
)
881 if (write(tar_data
->fd
, tf
->header
, TAR_BLOCK_SIZE
) != TAR_BLOCK_SIZE
)
883 /* if write didn't set errno, assume problem is no disk space */
892 /* Turn off compression */
893 if (deflateParams(tar_data
->zp
, 0, 0) != Z_OK
)
895 tar_set_error("could not change compression parameters");
899 /* Overwrite the header, assuming the size will be the same */
900 if (!tar_write_compressed_data(tar_data
->currentfile
->header
,
901 TAR_BLOCK_SIZE
, true))
904 /* Turn compression back on */
905 if (deflateParams(tar_data
->zp
, tar_data
->compression
, 0) != Z_OK
)
907 tar_set_error("could not change compression parameters");
913 /* Move file pointer back down to end, so we can write the next file */
914 if (lseek(tar_data
->fd
, 0, SEEK_END
) < 0)
917 /* Always fsync on close, so the padding gets fsynced */
921 /* Clean up and done */
922 pg_free(tf
->pathname
);
924 tar_data
->currentfile
= NULL
;
/*
 * tar_existsfile
 *
 * Report whether pathname already exists in the tar archive.  The answer is
 * always false and pathname is not examined; the method's custom error
 * message is cleared as a side effect.
 */
static bool
tar_existsfile(const char *pathname)
{
	tar_clear_error();
	/* We only deal with new tarfiles, so nothing externally created exists */
	return false;
}
944 if (tar_data
->currentfile
)
946 if (tar_close(tar_data
->currentfile
, CLOSE_NORMAL
) != 0)
950 /* A tarfile always ends with two empty blocks */
951 MemSet(zerobuf
, 0, sizeof(zerobuf
));
952 if (!tar_data
->compression
)
955 if (write(tar_data
->fd
, zerobuf
, sizeof(zerobuf
)) != sizeof(zerobuf
))
957 /* if write didn't set errno, assume problem is no disk space */
966 if (!tar_write_compressed_data(zerobuf
, sizeof(zerobuf
), false))
969 /* Also flush all data to make sure the gzip stream is finished */
970 tar_data
->zp
->next_in
= NULL
;
971 tar_data
->zp
->avail_in
= 0;
976 r
= deflate(tar_data
->zp
, Z_FINISH
);
978 if (r
== Z_STREAM_ERROR
)
980 tar_set_error("could not compress data");
983 if (tar_data
->zp
->avail_out
< ZLIB_OUT_SIZE
)
985 size_t len
= ZLIB_OUT_SIZE
- tar_data
->zp
->avail_out
;
988 if (write(tar_data
->fd
, tar_data
->zlibOut
, len
) != len
)
991 * If write didn't set errno, assume problem is no disk
999 if (r
== Z_STREAM_END
)
1003 if (deflateEnd(tar_data
->zp
) != Z_OK
)
1005 tar_set_error("could not close compression stream");
1011 /* sync the empty blocks as well, since they're after the last file */
1014 if (fsync(tar_data
->fd
) != 0)
1018 if (close(tar_data
->fd
) != 0)
1025 if (fsync_fname(tar_data
->tarfilename
, false) != 0)
1027 if (fsync_parent_path(tar_data
->tarfilename
) != 0)
1035 CreateWalTarMethod(const char *tarbase
, int compression
, bool sync
)
1037 WalWriteMethod
*method
;
1038 const char *suffix
= (compression
!= 0) ? ".tar.gz" : ".tar";
1040 method
= pg_malloc0(sizeof(WalWriteMethod
));
1041 method
->open_for_write
= tar_open_for_write
;
1042 method
->write
= tar_write
;
1043 method
->get_current_pos
= tar_get_current_pos
;
1044 method
->get_file_size
= tar_get_file_size
;
1045 method
->get_file_name
= tar_get_file_name
;
1046 method
->compression
= tar_compression
;
1047 method
->close
= tar_close
;
1048 method
->sync
= tar_sync
;
1049 method
->existsfile
= tar_existsfile
;
1050 method
->finish
= tar_finish
;
1051 method
->getlasterror
= tar_getlasterror
;
1053 tar_data
= pg_malloc0(sizeof(TarMethodData
));
1054 tar_data
->tarfilename
= pg_malloc0(strlen(tarbase
) + strlen(suffix
) + 1);
1055 sprintf(tar_data
->tarfilename
, "%s%s", tarbase
, suffix
);
1057 tar_data
->compression
= compression
;
1058 tar_data
->sync
= sync
;
1061 tar_data
->zlibOut
= (char *) pg_malloc(ZLIB_OUT_SIZE
+ 1);
1068 FreeWalTarMethod(void)
1070 pg_free(tar_data
->tarfilename
);
1072 if (tar_data
->compression
)
1073 pg_free(tar_data
->zlibOut
);