/*-------------------------------------------------------------------------
 *
 * astreamer_lz4.c
 *
 * Archive streamers that deal with data compressed using lz4.
 * astreamer_lz4_compressor applies lz4 compression to the input stream,
 * and astreamer_lz4_decompressor does the reverse.
 *
 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *		  src/fe_utils/astreamer_lz4.c
 *-------------------------------------------------------------------------
 */
16 #include "postgres_fe.h"
24 #include "common/logging.h"
25 #include "fe_utils/astreamer.h"
28 typedef struct astreamer_lz4_frame
32 LZ4F_compressionContext_t cctx
;
33 LZ4F_decompressionContext_t dctx
;
34 LZ4F_preferences_t prefs
;
38 } astreamer_lz4_frame
;
40 static void astreamer_lz4_compressor_content(astreamer
*streamer
,
41 astreamer_member
*member
,
42 const char *data
, int len
,
43 astreamer_archive_context context
);
44 static void astreamer_lz4_compressor_finalize(astreamer
*streamer
);
45 static void astreamer_lz4_compressor_free(astreamer
*streamer
);
47 static const astreamer_ops astreamer_lz4_compressor_ops
= {
48 .content
= astreamer_lz4_compressor_content
,
49 .finalize
= astreamer_lz4_compressor_finalize
,
50 .free
= astreamer_lz4_compressor_free
53 static void astreamer_lz4_decompressor_content(astreamer
*streamer
,
54 astreamer_member
*member
,
55 const char *data
, int len
,
56 astreamer_archive_context context
);
57 static void astreamer_lz4_decompressor_finalize(astreamer
*streamer
);
58 static void astreamer_lz4_decompressor_free(astreamer
*streamer
);
60 static const astreamer_ops astreamer_lz4_decompressor_ops
= {
61 .content
= astreamer_lz4_decompressor_content
,
62 .finalize
= astreamer_lz4_decompressor_finalize
,
63 .free
= astreamer_lz4_decompressor_free
68 * Create a new base backup streamer that performs lz4 compression of tar
72 astreamer_lz4_compressor_new(astreamer
*next
, pg_compress_specification
*compress
)
75 astreamer_lz4_frame
*streamer
;
76 LZ4F_errorCode_t ctxError
;
77 LZ4F_preferences_t
*prefs
;
81 streamer
= palloc0(sizeof(astreamer_lz4_frame
));
82 *((const astreamer_ops
**) &streamer
->base
.bbs_ops
) =
83 &astreamer_lz4_compressor_ops
;
85 streamer
->base
.bbs_next
= next
;
86 initStringInfo(&streamer
->base
.bbs_buffer
);
87 streamer
->header_written
= false;
89 /* Initialize stream compression preferences */
90 prefs
= &streamer
->prefs
;
91 memset(prefs
, 0, sizeof(LZ4F_preferences_t
));
92 prefs
->frameInfo
.blockSizeID
= LZ4F_max256KB
;
93 prefs
->compressionLevel
= compress
->level
;
95 ctxError
= LZ4F_createCompressionContext(&streamer
->cctx
, LZ4F_VERSION
);
96 if (LZ4F_isError(ctxError
))
97 pg_log_error("could not create lz4 compression context: %s",
98 LZ4F_getErrorName(ctxError
));
100 return &streamer
->base
;
102 pg_fatal("this build does not support compression with %s", "LZ4");
103 return NULL
; /* keep compiler quiet */
109 * Compress the input data to output buffer.
111 * Find out the compression bound based on input data length for each
112 * invocation to make sure that output buffer has enough capacity to
113 * accommodate the compressed data. In case if the output buffer
114 * capacity falls short of compression bound then forward the content
115 * of output buffer to next streamer and empty the buffer.
118 astreamer_lz4_compressor_content(astreamer
*streamer
,
119 astreamer_member
*member
,
120 const char *data
, int len
,
121 astreamer_archive_context context
)
123 astreamer_lz4_frame
*mystreamer
;
130 mystreamer
= (astreamer_lz4_frame
*) streamer
;
131 next_in
= (uint8
*) data
;
133 /* Write header before processing the first input chunk. */
134 if (!mystreamer
->header_written
)
136 compressed_size
= LZ4F_compressBegin(mystreamer
->cctx
,
137 (uint8
*) mystreamer
->base
.bbs_buffer
.data
,
138 mystreamer
->base
.bbs_buffer
.maxlen
,
141 if (LZ4F_isError(compressed_size
))
142 pg_log_error("could not write lz4 header: %s",
143 LZ4F_getErrorName(compressed_size
));
145 mystreamer
->bytes_written
+= compressed_size
;
146 mystreamer
->header_written
= true;
150 * Update the offset and capacity of output buffer based on number of
151 * bytes written to output buffer.
153 next_out
= (uint8
*) mystreamer
->base
.bbs_buffer
.data
+ mystreamer
->bytes_written
;
154 avail_out
= mystreamer
->base
.bbs_buffer
.maxlen
- mystreamer
->bytes_written
;
157 * Find out the compression bound and make sure that output buffer has the
158 * required capacity for the success of LZ4F_compressUpdate. If needed
159 * forward the content to next streamer and empty the buffer.
161 out_bound
= LZ4F_compressBound(len
, &mystreamer
->prefs
);
162 if (avail_out
< out_bound
)
164 astreamer_content(mystreamer
->base
.bbs_next
, member
,
165 mystreamer
->base
.bbs_buffer
.data
,
166 mystreamer
->bytes_written
,
169 /* Enlarge buffer if it falls short of out bound. */
170 if (mystreamer
->base
.bbs_buffer
.maxlen
< out_bound
)
171 enlargeStringInfo(&mystreamer
->base
.bbs_buffer
, out_bound
);
173 avail_out
= mystreamer
->base
.bbs_buffer
.maxlen
;
174 mystreamer
->bytes_written
= 0;
175 next_out
= (uint8
*) mystreamer
->base
.bbs_buffer
.data
;
179 * This call compresses the data starting at next_in and generates the
180 * output starting at next_out. It expects the caller to provide the size
181 * of input buffer and capacity of output buffer by providing parameters
184 * It returns the number of bytes compressed to output buffer.
186 compressed_size
= LZ4F_compressUpdate(mystreamer
->cctx
,
190 if (LZ4F_isError(compressed_size
))
191 pg_log_error("could not compress data: %s",
192 LZ4F_getErrorName(compressed_size
));
194 mystreamer
->bytes_written
+= compressed_size
;
198 * End-of-stream processing.
201 astreamer_lz4_compressor_finalize(astreamer
*streamer
)
203 astreamer_lz4_frame
*mystreamer
;
209 mystreamer
= (astreamer_lz4_frame
*) streamer
;
211 /* Find out the footer bound and update the output buffer. */
212 footer_bound
= LZ4F_compressBound(0, &mystreamer
->prefs
);
213 if ((mystreamer
->base
.bbs_buffer
.maxlen
- mystreamer
->bytes_written
) <
216 astreamer_content(mystreamer
->base
.bbs_next
, NULL
,
217 mystreamer
->base
.bbs_buffer
.data
,
218 mystreamer
->bytes_written
,
221 /* Enlarge buffer if it falls short of footer bound. */
222 if (mystreamer
->base
.bbs_buffer
.maxlen
< footer_bound
)
223 enlargeStringInfo(&mystreamer
->base
.bbs_buffer
, footer_bound
);
225 avail_out
= mystreamer
->base
.bbs_buffer
.maxlen
;
226 mystreamer
->bytes_written
= 0;
227 next_out
= (uint8
*) mystreamer
->base
.bbs_buffer
.data
;
231 next_out
= (uint8
*) mystreamer
->base
.bbs_buffer
.data
+ mystreamer
->bytes_written
;
232 avail_out
= mystreamer
->base
.bbs_buffer
.maxlen
- mystreamer
->bytes_written
;
236 * Finalize the frame and flush whatever data remaining in compression
239 compressed_size
= LZ4F_compressEnd(mystreamer
->cctx
,
240 next_out
, avail_out
, NULL
);
242 if (LZ4F_isError(compressed_size
))
243 pg_log_error("could not end lz4 compression: %s",
244 LZ4F_getErrorName(compressed_size
));
246 mystreamer
->bytes_written
+= compressed_size
;
248 astreamer_content(mystreamer
->base
.bbs_next
, NULL
,
249 mystreamer
->base
.bbs_buffer
.data
,
250 mystreamer
->bytes_written
,
253 astreamer_finalize(mystreamer
->base
.bbs_next
);
260 astreamer_lz4_compressor_free(astreamer
*streamer
)
262 astreamer_lz4_frame
*mystreamer
;
264 mystreamer
= (astreamer_lz4_frame
*) streamer
;
265 astreamer_free(streamer
->bbs_next
);
266 LZ4F_freeCompressionContext(mystreamer
->cctx
);
267 pfree(streamer
->bbs_buffer
.data
);
273 * Create a new base backup streamer that performs decompression of lz4
277 astreamer_lz4_decompressor_new(astreamer
*next
)
280 astreamer_lz4_frame
*streamer
;
281 LZ4F_errorCode_t ctxError
;
283 Assert(next
!= NULL
);
285 streamer
= palloc0(sizeof(astreamer_lz4_frame
));
286 *((const astreamer_ops
**) &streamer
->base
.bbs_ops
) =
287 &astreamer_lz4_decompressor_ops
;
289 streamer
->base
.bbs_next
= next
;
290 initStringInfo(&streamer
->base
.bbs_buffer
);
292 /* Initialize internal stream state for decompression */
293 ctxError
= LZ4F_createDecompressionContext(&streamer
->dctx
, LZ4F_VERSION
);
294 if (LZ4F_isError(ctxError
))
295 pg_fatal("could not initialize compression library: %s",
296 LZ4F_getErrorName(ctxError
));
298 return &streamer
->base
;
300 pg_fatal("this build does not support compression with %s", "LZ4");
301 return NULL
; /* keep compiler quiet */
307 * Decompress the input data to output buffer until we run out of input
308 * data. Each time the output buffer is full, pass on the decompressed data
309 * to the next streamer.
312 astreamer_lz4_decompressor_content(astreamer
*streamer
,
313 astreamer_member
*member
,
314 const char *data
, int len
,
315 astreamer_archive_context context
)
317 astreamer_lz4_frame
*mystreamer
;
323 mystreamer
= (astreamer_lz4_frame
*) streamer
;
324 next_in
= (uint8
*) data
;
325 next_out
= (uint8
*) mystreamer
->base
.bbs_buffer
.data
;
327 avail_out
= mystreamer
->base
.bbs_buffer
.maxlen
;
335 read_size
= avail_in
;
336 out_size
= avail_out
;
339 * This call decompresses the data starting at next_in and generates
340 * the output data starting at next_out. It expects the caller to
341 * provide size of the input buffer and total capacity of the output
342 * buffer by providing the read_size and out_size parameters
345 * Per the documentation of LZ4, parameters read_size and out_size
346 * behaves as dual parameters. On return, the number of bytes consumed
347 * from the input buffer will be written back to read_size and the
348 * number of bytes decompressed to output buffer will be written back
349 * to out_size respectively.
351 ret
= LZ4F_decompress(mystreamer
->dctx
,
353 next_in
, &read_size
, NULL
);
355 if (LZ4F_isError(ret
))
356 pg_log_error("could not decompress data: %s",
357 LZ4F_getErrorName(ret
));
359 /* Update input buffer based on number of bytes consumed */
360 avail_in
-= read_size
;
361 next_in
+= read_size
;
363 mystreamer
->bytes_written
+= out_size
;
366 * If output buffer is full then forward the content to next streamer
367 * and update the output buffer.
369 if (mystreamer
->bytes_written
>= mystreamer
->base
.bbs_buffer
.maxlen
)
371 astreamer_content(mystreamer
->base
.bbs_next
, member
,
372 mystreamer
->base
.bbs_buffer
.data
,
373 mystreamer
->base
.bbs_buffer
.maxlen
,
376 avail_out
= mystreamer
->base
.bbs_buffer
.maxlen
;
377 mystreamer
->bytes_written
= 0;
378 next_out
= (uint8
*) mystreamer
->base
.bbs_buffer
.data
;
382 avail_out
= mystreamer
->base
.bbs_buffer
.maxlen
- mystreamer
->bytes_written
;
383 next_out
+= mystreamer
->bytes_written
;
389 * End-of-stream processing.
392 astreamer_lz4_decompressor_finalize(astreamer
*streamer
)
394 astreamer_lz4_frame
*mystreamer
;
396 mystreamer
= (astreamer_lz4_frame
*) streamer
;
399 * End of the stream, if there is some pending data in output buffers then
400 * we must forward it to next streamer.
402 astreamer_content(mystreamer
->base
.bbs_next
, NULL
,
403 mystreamer
->base
.bbs_buffer
.data
,
404 mystreamer
->base
.bbs_buffer
.maxlen
,
407 astreamer_finalize(mystreamer
->base
.bbs_next
);
414 astreamer_lz4_decompressor_free(astreamer
*streamer
)
416 astreamer_lz4_frame
*mystreamer
;
418 mystreamer
= (astreamer_lz4_frame
*) streamer
;
419 astreamer_free(streamer
->bbs_next
);
420 LZ4F_freeDecompressionContext(mystreamer
->dctx
);
421 pfree(streamer
->bbs_buffer
.data
);