/*-------------------------------------------------------------------------
 *
 * Archive streamers that deal with data compressed using zstd.
 * astreamer_zstd_compressor applies zstd compression to the input stream,
 * and astreamer_zstd_decompressor does the reverse.
 *
 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
 *
 * src/fe_utils/astreamer_zstd.c
 *-------------------------------------------------------------------------
 */
16 #include "postgres_fe.h"
24 #include "common/logging.h"
25 #include "fe_utils/astreamer.h"
#ifdef USE_ZSTD
/*
 * State shared by the zstd compressor and decompressor streamers.
 *
 * "base" has to stay the first member: the callbacks in this file receive
 * an astreamer pointer and cast it directly to astreamer_zstd_frame.
 */
typedef struct astreamer_zstd_frame
{
	astreamer	base;			/* generic streamer header */

	ZSTD_CCtx  *cctx;			/* set by the compressor constructor */
	ZSTD_DCtx  *dctx;			/* set by the decompressor constructor */
	ZSTD_outBuffer zstd_outBuf; /* write cursor over base.bbs_buffer */
} astreamer_zstd_frame;
#endif
#ifdef USE_ZSTD
/* Callbacks that implement the zstd compressor. */
static void astreamer_zstd_compressor_content(astreamer *streamer,
											  astreamer_member *member,
											  const char *data, int len,
											  astreamer_archive_context context);
static void astreamer_zstd_compressor_finalize(astreamer *streamer);
static void astreamer_zstd_compressor_free(astreamer *streamer);

/* Callback table installed by astreamer_zstd_compressor_new(). */
static const astreamer_ops astreamer_zstd_compressor_ops = {
	.content = astreamer_zstd_compressor_content,
	.finalize = astreamer_zstd_compressor_finalize,
	.free = astreamer_zstd_compressor_free,
};
#endif
#ifdef USE_ZSTD
/* Callbacks that implement the zstd decompressor. */
static void astreamer_zstd_decompressor_content(astreamer *streamer,
												astreamer_member *member,
												const char *data, int len,
												astreamer_archive_context context);
static void astreamer_zstd_decompressor_finalize(astreamer *streamer);
static void astreamer_zstd_decompressor_free(astreamer *streamer);

/* Callback table installed by astreamer_zstd_decompressor_new(). */
static const astreamer_ops astreamer_zstd_decompressor_ops = {
	.content = astreamer_zstd_decompressor_content,
	.finalize = astreamer_zstd_decompressor_finalize,
	.free = astreamer_zstd_decompressor_free,
};
#endif
66 * Create a new base backup streamer that performs zstd compression of tar
70 astreamer_zstd_compressor_new(astreamer
*next
, pg_compress_specification
*compress
)
73 astreamer_zstd_frame
*streamer
;
78 streamer
= palloc0(sizeof(astreamer_zstd_frame
));
80 *((const astreamer_ops
**) &streamer
->base
.bbs_ops
) =
81 &astreamer_zstd_compressor_ops
;
83 streamer
->base
.bbs_next
= next
;
84 initStringInfo(&streamer
->base
.bbs_buffer
);
85 enlargeStringInfo(&streamer
->base
.bbs_buffer
, ZSTD_DStreamOutSize());
87 streamer
->cctx
= ZSTD_createCCtx();
89 pg_fatal("could not create zstd compression context");
91 /* Set compression level */
92 ret
= ZSTD_CCtx_setParameter(streamer
->cctx
, ZSTD_c_compressionLevel
,
94 if (ZSTD_isError(ret
))
95 pg_fatal("could not set zstd compression level to %d: %s",
96 compress
->level
, ZSTD_getErrorName(ret
));
98 /* Set # of workers, if specified */
99 if ((compress
->options
& PG_COMPRESSION_OPTION_WORKERS
) != 0)
102 * On older versions of libzstd, this option does not exist, and
103 * trying to set it will fail. Similarly for newer versions if they
104 * are compiled without threading support.
106 ret
= ZSTD_CCtx_setParameter(streamer
->cctx
, ZSTD_c_nbWorkers
,
108 if (ZSTD_isError(ret
))
109 pg_fatal("could not set compression worker count to %d: %s",
110 compress
->workers
, ZSTD_getErrorName(ret
));
113 if ((compress
->options
& PG_COMPRESSION_OPTION_LONG_DISTANCE
) != 0)
115 ret
= ZSTD_CCtx_setParameter(streamer
->cctx
,
116 ZSTD_c_enableLongDistanceMatching
,
117 compress
->long_distance
);
118 if (ZSTD_isError(ret
))
120 pg_log_error("could not enable long-distance mode: %s",
121 ZSTD_getErrorName(ret
));
126 /* Initialize the ZSTD output buffer. */
127 streamer
->zstd_outBuf
.dst
= streamer
->base
.bbs_buffer
.data
;
128 streamer
->zstd_outBuf
.size
= streamer
->base
.bbs_buffer
.maxlen
;
129 streamer
->zstd_outBuf
.pos
= 0;
131 return &streamer
->base
;
133 pg_fatal("this build does not support compression with %s", "ZSTD");
134 return NULL
; /* keep compiler quiet */
#ifdef USE_ZSTD
/*
 * Compress the input data to output buffer.
 *
 * On every iteration, compute the compression bound for the input that is
 * still unconsumed.  If the free space in the output buffer is smaller than
 * that bound, forward what has been produced so far to the next streamer
 * and start again from an empty buffer.  Compressed bytes that are not
 * forwarded here stay in the output buffer for a later call.
 *
 * A compression error is fatal: retrying the same call would neither make
 * progress through the input nor yield a usable stream.
 */
static void
astreamer_zstd_compressor_content(astreamer *streamer,
								  astreamer_member *member,
								  const char *data, int len,
								  astreamer_archive_context context)
{
	astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
	ZSTD_inBuffer inBuf = {data, len, 0};

	while (inBuf.pos < inBuf.size)
	{
		size_t		yet_to_flush;
		size_t		max_needed = ZSTD_compressBound(inBuf.size - inBuf.pos);

		/*
		 * If the output buffer is not left with enough space, send the
		 * compressed bytes to the next streamer, and empty the buffer.
		 */
		if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos <
			max_needed)
		{
			astreamer_content(mystreamer->base.bbs_next, member,
							  mystreamer->zstd_outBuf.dst,
							  mystreamer->zstd_outBuf.pos,
							  context);

			/* Reset the ZSTD output buffer. */
			mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
			mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
			mystreamer->zstd_outBuf.pos = 0;
		}

		yet_to_flush =
			ZSTD_compressStream2(mystreamer->cctx, &mystreamer->zstd_outBuf,
								 &inBuf, ZSTD_e_continue);

		if (ZSTD_isError(yet_to_flush))
			pg_fatal("could not compress data: %s",
					 ZSTD_getErrorName(yet_to_flush));
	}
}
#endif
#ifdef USE_ZSTD
/*
 * End-of-stream processing.
 *
 * Keep asking zstd to end the frame until it reports that nothing is left
 * to flush, forwarding the output buffer whenever it runs short of space.
 * Then hand any remaining compressed bytes to the next streamer and
 * finalize it.
 *
 * A compression error is fatal.  An error code is a nonzero size_t, so
 * merely logging it would leave the loop condition true indefinitely.
 */
static void
astreamer_zstd_compressor_finalize(astreamer *streamer)
{
	astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
	size_t		yet_to_flush;

	do
	{
		ZSTD_inBuffer in = {NULL, 0, 0};
		size_t		max_needed = ZSTD_compressBound(0);

		/*
		 * If the output buffer is not left with enough space, send the
		 * compressed bytes to the next streamer, and empty the buffer.
		 */
		if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos <
			max_needed)
		{
			astreamer_content(mystreamer->base.bbs_next, NULL,
							  mystreamer->zstd_outBuf.dst,
							  mystreamer->zstd_outBuf.pos,
							  ASTREAMER_UNKNOWN);

			/* Reset the ZSTD output buffer. */
			mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
			mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
			mystreamer->zstd_outBuf.pos = 0;
		}

		yet_to_flush = ZSTD_compressStream2(mystreamer->cctx,
											&mystreamer->zstd_outBuf,
											&in, ZSTD_e_end);

		if (ZSTD_isError(yet_to_flush))
			pg_fatal("could not compress data: %s",
					 ZSTD_getErrorName(yet_to_flush));

	} while (yet_to_flush > 0);

	/* Make sure to pass any remaining bytes to the next streamer. */
	if (mystreamer->zstd_outBuf.pos > 0)
		astreamer_content(mystreamer->base.bbs_next, NULL,
						  mystreamer->zstd_outBuf.dst,
						  mystreamer->zstd_outBuf.pos,
						  ASTREAMER_UNKNOWN);

	astreamer_finalize(mystreamer->base.bbs_next);
}
#endif
#ifdef USE_ZSTD
/*
 * Free memory.
 *
 * Frees the next streamer, the zstd compression context, the output buffer
 * and finally the frame itself, which the constructor allocated with
 * palloc0().
 */
static void
astreamer_zstd_compressor_free(astreamer *streamer)
{
	astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;

	astreamer_free(streamer->bbs_next);
	ZSTD_freeCCtx(mystreamer->cctx);
	pfree(streamer->bbs_buffer.data);
	pfree(streamer);
}
#endif
258 * Create a new base backup streamer that performs decompression of zstd
262 astreamer_zstd_decompressor_new(astreamer
*next
)
265 astreamer_zstd_frame
*streamer
;
267 Assert(next
!= NULL
);
269 streamer
= palloc0(sizeof(astreamer_zstd_frame
));
270 *((const astreamer_ops
**) &streamer
->base
.bbs_ops
) =
271 &astreamer_zstd_decompressor_ops
;
273 streamer
->base
.bbs_next
= next
;
274 initStringInfo(&streamer
->base
.bbs_buffer
);
275 enlargeStringInfo(&streamer
->base
.bbs_buffer
, ZSTD_DStreamOutSize());
277 streamer
->dctx
= ZSTD_createDCtx();
279 pg_fatal("could not create zstd decompression context");
281 /* Initialize the ZSTD output buffer. */
282 streamer
->zstd_outBuf
.dst
= streamer
->base
.bbs_buffer
.data
;
283 streamer
->zstd_outBuf
.size
= streamer
->base
.bbs_buffer
.maxlen
;
284 streamer
->zstd_outBuf
.pos
= 0;
286 return &streamer
->base
;
288 pg_fatal("this build does not support compression with %s", "ZSTD");
289 return NULL
; /* keep compiler quiet */
#ifdef USE_ZSTD
/*
 * Decompress the input data to output buffer until we run out of input
 * data. Each time the output buffer is full, pass on the decompressed data
 * to the next streamer.
 *
 * Decompressed bytes that do not fill the buffer stay there until a later
 * call or until finalize forwards them.
 *
 * A decompression error is fatal: continuing would either loop over the
 * same unconsumed input or pass damaged data downstream.
 */
static void
astreamer_zstd_decompressor_content(astreamer *streamer,
									astreamer_member *member,
									const char *data, int len,
									astreamer_archive_context context)
{
	astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
	ZSTD_inBuffer inBuf = {data, len, 0};

	while (inBuf.pos < inBuf.size)
	{
		size_t		ret;

		/*
		 * If output buffer is full then forward the content to next streamer
		 * and update the output buffer.
		 */
		if (mystreamer->zstd_outBuf.pos >= mystreamer->zstd_outBuf.size)
		{
			astreamer_content(mystreamer->base.bbs_next, member,
							  mystreamer->zstd_outBuf.dst,
							  mystreamer->zstd_outBuf.pos,
							  context);

			/* Reset the ZSTD output buffer. */
			mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
			mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
			mystreamer->zstd_outBuf.pos = 0;
		}

		ret = ZSTD_decompressStream(mystreamer->dctx,
									&mystreamer->zstd_outBuf, &inBuf);

		if (ZSTD_isError(ret))
			pg_fatal("could not decompress data: %s",
					 ZSTD_getErrorName(ret));
	}
}
#endif
#ifdef USE_ZSTD
/*
 * End-of-stream processing.
 *
 * Forward whatever decompressed data is still sitting in the output buffer
 * and then finalize the next streamer.
 */
static void
astreamer_zstd_decompressor_finalize(astreamer *streamer)
{
	astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;

	/*
	 * End of the stream, if there is some pending data in output buffers then
	 * we must forward it to next streamer.  Only the first zstd_outBuf.pos
	 * bytes are valid output; the rest of the buffer holds leftovers from
	 * earlier rounds and must not be passed on.
	 */
	if (mystreamer->zstd_outBuf.pos > 0)
		astreamer_content(mystreamer->base.bbs_next, NULL,
						  mystreamer->zstd_outBuf.dst,
						  mystreamer->zstd_outBuf.pos,
						  ASTREAMER_UNKNOWN);

	astreamer_finalize(mystreamer->base.bbs_next);
}
#endif
#ifdef USE_ZSTD
/*
 * Free memory.
 *
 * Frees the next streamer, the zstd decompression context, the output
 * buffer and finally the frame itself, which the constructor allocated
 * with palloc0().
 */
static void
astreamer_zstd_decompressor_free(astreamer *streamer)
{
	astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;

	astreamer_free(streamer->bbs_next);
	ZSTD_freeDCtx(mystreamer->dctx);
	pfree(streamer->bbs_buffer.data);
	pfree(streamer);
}
#endif