/*-------------------------------------------------------------------------
 *
 * basebackup_zstd.c
 *	  Basebackup sink implementing zstd compression.
 *
 * Portions Copyright (c) 2010-2024, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *	  src/backend/backup/basebackup_zstd.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#ifdef USE_ZSTD
#include <zstd.h>
#endif

#include "backup/basebackup_sink.h"
#ifdef USE_ZSTD

/*
 * Private state of a zstd-compressing basebackup sink.
 */
typedef struct bbsink_zstd
{
	/* Common information for all types of sink. */
	bbsink		base;

	/* Compression options */
	pg_compress_specification *compress;

	/* Compression context; created in bbsink_zstd_begin_backup(). */
	ZSTD_CCtx  *cctx;

	/* Output window pointing into the next sink's buffer. */
	ZSTD_outBuffer zstd_outBuf;
} bbsink_zstd;

static void bbsink_zstd_begin_backup(bbsink *sink);
static void bbsink_zstd_begin_archive(bbsink *sink, const char *archive_name);
static void bbsink_zstd_archive_contents(bbsink *sink, size_t len);
static void bbsink_zstd_manifest_contents(bbsink *sink, size_t len);
static void bbsink_zstd_end_archive(bbsink *sink);
static void bbsink_zstd_cleanup(bbsink *sink);
static void bbsink_zstd_end_backup(bbsink *sink, XLogRecPtr endptr,
								   TimeLineID endtli);

/*
 * Callbacks for this sink.  Manifest begin/end are forwarded unchanged to the
 * next sink; manifest contents are passed through uncompressed.
 */
static const bbsink_ops bbsink_zstd_ops = {
	.begin_backup = bbsink_zstd_begin_backup,
	.begin_archive = bbsink_zstd_begin_archive,
	.archive_contents = bbsink_zstd_archive_contents,
	.end_archive = bbsink_zstd_end_archive,
	.begin_manifest = bbsink_forward_begin_manifest,
	.manifest_contents = bbsink_zstd_manifest_contents,
	.end_manifest = bbsink_forward_end_manifest,
	.end_backup = bbsink_zstd_end_backup,
	.cleanup = bbsink_zstd_cleanup
};

#endif							/* USE_ZSTD */
58 * Create a new basebackup sink that performs zstd compression.
61 bbsink_zstd_new(bbsink
*next
, pg_compress_specification
*compress
)
65 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED
),
66 errmsg("zstd compression is not supported by this build")));
67 return NULL
; /* keep compiler quiet */
73 sink
= palloc0(sizeof(bbsink_zstd
));
74 *((const bbsink_ops
**) &sink
->base
.bbs_ops
) = &bbsink_zstd_ops
;
75 sink
->base
.bbs_next
= next
;
76 sink
->compress
= compress
;
#ifdef USE_ZSTD

/*
 * Begin backup.
 *
 * Creates and configures the zstd compression context from the sink's
 * compression options, allocates this sink's own input buffer, and starts
 * the backup on the next sink.  The next sink is asked for a buffer at
 * least as large as the zstd compression bound for one full input buffer.
 */
static void
bbsink_zstd_begin_backup(bbsink *sink)
{
	bbsink_zstd *mysink = (bbsink_zstd *) sink;
	size_t		output_buffer_bound;
	size_t		ret;
	pg_compress_specification *compress = mysink->compress;

	mysink->cctx = ZSTD_createCCtx();
	if (!mysink->cctx)
		elog(ERROR, "could not create zstd compression context");

	ret = ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_compressionLevel,
								 compress->level);
	if (ZSTD_isError(ret))
		elog(ERROR, "could not set zstd compression level to %d: %s",
			 compress->level, ZSTD_getErrorName(ret));

	if ((compress->options & PG_COMPRESSION_OPTION_WORKERS) != 0)
	{
		/*
		 * On older versions of libzstd, this option does not exist, and
		 * trying to set it will fail. Similarly for newer versions if they
		 * are compiled without threading support.
		 */
		ret = ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_nbWorkers,
									 compress->workers);
		if (ZSTD_isError(ret))
			ereport(ERROR,
					errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					errmsg("could not set compression worker count to %d: %s",
						   compress->workers, ZSTD_getErrorName(ret)));
	}

	if ((compress->options & PG_COMPRESSION_OPTION_LONG_DISTANCE) != 0)
	{
		ret = ZSTD_CCtx_setParameter(mysink->cctx,
									 ZSTD_c_enableLongDistanceMatching,
									 compress->long_distance);
		if (ZSTD_isError(ret))
			ereport(ERROR,
					errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					errmsg("could not enable long-distance mode: %s",
						   ZSTD_getErrorName(ret)));
	}

	/*
	 * We need our own buffer, because we're going to pass different data to
	 * the next sink than what gets passed to us.
	 */
	mysink->base.bbs_buffer = palloc(mysink->base.bbs_buffer_length);

	/*
	 * Make sure that the next sink's bbs_buffer is big enough to accommodate
	 * the compressed input buffer.
	 */
	output_buffer_bound = ZSTD_compressBound(mysink->base.bbs_buffer_length);

	/*
	 * The buffer length is expected to be a multiple of BLCKSZ, so round up.
	 * If the bound is already a multiple, this adds one extra BLCKSZ, which
	 * only over-allocates.
	 */
	output_buffer_bound = output_buffer_bound + BLCKSZ -
		(output_buffer_bound % BLCKSZ);

	bbsink_begin_backup(sink->bbs_next, sink->bbs_state, output_buffer_bound);
}

#endif							/* USE_ZSTD */
#ifdef USE_ZSTD

/*
 * Prepare to compress the next archive.
 *
 * Resets the zstd session (keeping configured parameters), points the output
 * window at the start of the next sink's buffer, and begins an archive named
 * "<archive_name>.zst" on the next sink.
 */
static void
bbsink_zstd_begin_archive(bbsink *sink, const char *archive_name)
{
	bbsink_zstd *mysink = (bbsink_zstd *) sink;
	char	   *zstd_archive_name;

	/*
	 * At the start of each archive we reset the state to start a new
	 * compression operation. The parameters are sticky and they will stick
	 * around as we are resetting with option ZSTD_reset_session_only.
	 */
	ZSTD_CCtx_reset(mysink->cctx, ZSTD_reset_session_only);

	mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer;
	mysink->zstd_outBuf.size = mysink->base.bbs_next->bbs_buffer_length;
	mysink->zstd_outBuf.pos = 0;

	/* Add ".zst" to the archive name. */
	zstd_archive_name = psprintf("%s.zst", archive_name);
	Assert(sink->bbs_next != NULL);
	bbsink_begin_archive(sink->bbs_next, zstd_archive_name);
	pfree(zstd_archive_name);
}

#endif							/* USE_ZSTD */
#ifdef USE_ZSTD

/*
 * Compress the input data to the output buffer until we run out of input
 * data. Each time the output buffer falls below the compression bound for
 * the input buffer, invoke the archive_contents() method for the next sink.
 *
 * Note that since we're compressing the input, it may very commonly happen
 * that we consume all the input data without filling the output buffer. In
 * that case, the compressed representation of the current input data won't
 * actually be sent to the next bbsink until a later call to this function,
 * or perhaps even not until bbsink_zstd_end_archive() is invoked.
 *
 * "len" is the number of valid bytes at the start of this sink's bbs_buffer.
 */
static void
bbsink_zstd_archive_contents(bbsink *sink, size_t len)
{
	bbsink_zstd *mysink = (bbsink_zstd *) sink;
	ZSTD_inBuffer inBuf = {mysink->base.bbs_buffer, len, 0};

	while (inBuf.pos < inBuf.size)
	{
		size_t		yet_to_flush;
		size_t		max_needed = ZSTD_compressBound(inBuf.size - inBuf.pos);

		/*
		 * If the out buffer is not left with enough space, send the output
		 * buffer to the next sink, and reset it.
		 */
		if (mysink->zstd_outBuf.size - mysink->zstd_outBuf.pos < max_needed)
		{
			bbsink_archive_contents(mysink->base.bbs_next,
									mysink->zstd_outBuf.pos);
			mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer;
			mysink->zstd_outBuf.size =
				mysink->base.bbs_next->bbs_buffer_length;
			mysink->zstd_outBuf.pos = 0;
		}

		/*
		 * ZSTD_e_continue does not force a flush; the loop repeats until all
		 * of the input has been consumed (inBuf.pos reaches inBuf.size).
		 */
		yet_to_flush = ZSTD_compressStream2(mysink->cctx, &mysink->zstd_outBuf,
											&inBuf, ZSTD_e_continue);

		if (ZSTD_isError(yet_to_flush))
			elog(ERROR,
				 "could not compress data: %s",
				 ZSTD_getErrorName(yet_to_flush));
	}
}

#endif							/* USE_ZSTD */
#ifdef USE_ZSTD

/*
 * There might be some data inside zstd's internal buffers; we need to get that
 * flushed out, also end the zstd frame and then get that forwarded to the
 * successor sink as archive content.
 *
 * Then we can end processing for this archive.
 */
static void
bbsink_zstd_end_archive(bbsink *sink)
{
	bbsink_zstd *mysink = (bbsink_zstd *) sink;
	size_t		yet_to_flush;

	/*
	 * With ZSTD_e_end and no further input, ZSTD_compressStream2() returns
	 * nonzero while data remains to be flushed, so keep calling it, draining
	 * the output buffer to the next sink when it runs short, until it
	 * reports that the frame is complete.
	 */
	do
	{
		ZSTD_inBuffer in = {NULL, 0, 0};
		size_t		max_needed = ZSTD_compressBound(0);

		/*
		 * If the out buffer is not left with enough space, send the output
		 * buffer to the next sink, and reset it.
		 */
		if (mysink->zstd_outBuf.size - mysink->zstd_outBuf.pos < max_needed)
		{
			bbsink_archive_contents(mysink->base.bbs_next,
									mysink->zstd_outBuf.pos);
			mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer;
			mysink->zstd_outBuf.size =
				mysink->base.bbs_next->bbs_buffer_length;
			mysink->zstd_outBuf.pos = 0;
		}

		yet_to_flush = ZSTD_compressStream2(mysink->cctx,
											&mysink->zstd_outBuf,
											&in, ZSTD_e_end);

		if (ZSTD_isError(yet_to_flush))
			elog(ERROR, "could not compress data: %s",
				 ZSTD_getErrorName(yet_to_flush));

	} while (yet_to_flush > 0);

	/* Make sure to pass any remaining bytes to the next sink. */
	if (mysink->zstd_outBuf.pos > 0)
		bbsink_archive_contents(mysink->base.bbs_next,
								mysink->zstd_outBuf.pos);

	/* Pass on the information that this archive has ended. */
	bbsink_forward_end_archive(sink);
}

#endif							/* USE_ZSTD */
#ifdef USE_ZSTD

/*
 * Free the resources and context.
 *
 * The context pointer is reset to NULL after it is freed, so that a later
 * bbsink_zstd_cleanup() call (which also frees the context) does not free it
 * a second time.
 */
static void
bbsink_zstd_end_backup(bbsink *sink, XLogRecPtr endptr,
					   TimeLineID endtli)
{
	bbsink_zstd *mysink = (bbsink_zstd *) sink;

	/* Release the context. */
	if (mysink->cctx)
	{
		ZSTD_freeCCtx(mysink->cctx);
		mysink->cctx = NULL;
	}

	bbsink_forward_end_backup(sink, endptr, endtli);
}

#endif							/* USE_ZSTD */
#ifdef USE_ZSTD

/*
 * Manifest contents are not compressed, but we do need to copy them into
 * the successor sink's buffer, because we have our own.
 *
 * The copy fits because begin_backup sized the next sink's buffer to at
 * least ZSTD_compressBound() of our buffer length, which is no smaller than
 * our buffer length itself.
 */
static void
bbsink_zstd_manifest_contents(bbsink *sink, size_t len)
{
	memcpy(sink->bbs_next->bbs_buffer, sink->bbs_buffer, len);
	bbsink_manifest_contents(sink->bbs_next, len);
}

#endif							/* USE_ZSTD */
309 * In case the backup fails, make sure we free any compression context that
310 * got allocated, so that we don't leak memory.
313 bbsink_zstd_cleanup(bbsink
*sink
)
315 bbsink_zstd
*mysink
= (bbsink_zstd
*) sink
;
317 /* Release the context if not already released. */
320 ZSTD_freeCCtx(mysink
->cctx
);