Fix a compiler warning in initStringInfo().
[pgsql.git] / src / fe_utils / astreamer_lz4.c
blob781aaf99f38fe8ade5abc33167025890e622c67c
/*-------------------------------------------------------------------------
 *
 * astreamer_lz4.c
 *
 * Archive streamers that deal with data compressed using lz4.
 * astreamer_lz4_compressor applies lz4 compression to the input stream,
 * and astreamer_lz4_decompressor does the reverse.
 *
 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *		  src/fe_utils/astreamer_lz4.c
 *-------------------------------------------------------------------------
 */
16 #include "postgres_fe.h"
18 #include <unistd.h>
20 #ifdef USE_LZ4
21 #include <lz4frame.h>
22 #endif
24 #include "common/logging.h"
25 #include "fe_utils/astreamer.h"
27 #ifdef USE_LZ4
28 typedef struct astreamer_lz4_frame
30 astreamer base;
32 LZ4F_compressionContext_t cctx;
33 LZ4F_decompressionContext_t dctx;
34 LZ4F_preferences_t prefs;
36 size_t bytes_written;
37 bool header_written;
38 } astreamer_lz4_frame;
40 static void astreamer_lz4_compressor_content(astreamer *streamer,
41 astreamer_member *member,
42 const char *data, int len,
43 astreamer_archive_context context);
44 static void astreamer_lz4_compressor_finalize(astreamer *streamer);
45 static void astreamer_lz4_compressor_free(astreamer *streamer);
47 static const astreamer_ops astreamer_lz4_compressor_ops = {
48 .content = astreamer_lz4_compressor_content,
49 .finalize = astreamer_lz4_compressor_finalize,
50 .free = astreamer_lz4_compressor_free
53 static void astreamer_lz4_decompressor_content(astreamer *streamer,
54 astreamer_member *member,
55 const char *data, int len,
56 astreamer_archive_context context);
57 static void astreamer_lz4_decompressor_finalize(astreamer *streamer);
58 static void astreamer_lz4_decompressor_free(astreamer *streamer);
60 static const astreamer_ops astreamer_lz4_decompressor_ops = {
61 .content = astreamer_lz4_decompressor_content,
62 .finalize = astreamer_lz4_decompressor_finalize,
63 .free = astreamer_lz4_decompressor_free
65 #endif
68 * Create a new base backup streamer that performs lz4 compression of tar
69 * blocks.
71 astreamer *
72 astreamer_lz4_compressor_new(astreamer *next, pg_compress_specification *compress)
74 #ifdef USE_LZ4
75 astreamer_lz4_frame *streamer;
76 LZ4F_errorCode_t ctxError;
77 LZ4F_preferences_t *prefs;
79 Assert(next != NULL);
81 streamer = palloc0(sizeof(astreamer_lz4_frame));
82 *((const astreamer_ops **) &streamer->base.bbs_ops) =
83 &astreamer_lz4_compressor_ops;
85 streamer->base.bbs_next = next;
86 initStringInfo(&streamer->base.bbs_buffer);
87 streamer->header_written = false;
89 /* Initialize stream compression preferences */
90 prefs = &streamer->prefs;
91 memset(prefs, 0, sizeof(LZ4F_preferences_t));
92 prefs->frameInfo.blockSizeID = LZ4F_max256KB;
93 prefs->compressionLevel = compress->level;
95 ctxError = LZ4F_createCompressionContext(&streamer->cctx, LZ4F_VERSION);
96 if (LZ4F_isError(ctxError))
97 pg_log_error("could not create lz4 compression context: %s",
98 LZ4F_getErrorName(ctxError));
100 return &streamer->base;
101 #else
102 pg_fatal("this build does not support compression with %s", "LZ4");
103 return NULL; /* keep compiler quiet */
104 #endif
107 #ifdef USE_LZ4
109 * Compress the input data to output buffer.
111 * Find out the compression bound based on input data length for each
112 * invocation to make sure that output buffer has enough capacity to
113 * accommodate the compressed data. In case if the output buffer
114 * capacity falls short of compression bound then forward the content
115 * of output buffer to next streamer and empty the buffer.
117 static void
118 astreamer_lz4_compressor_content(astreamer *streamer,
119 astreamer_member *member,
120 const char *data, int len,
121 astreamer_archive_context context)
123 astreamer_lz4_frame *mystreamer;
124 uint8 *next_in,
125 *next_out;
126 size_t out_bound,
127 compressed_size,
128 avail_out;
130 mystreamer = (astreamer_lz4_frame *) streamer;
131 next_in = (uint8 *) data;
133 /* Write header before processing the first input chunk. */
134 if (!mystreamer->header_written)
136 compressed_size = LZ4F_compressBegin(mystreamer->cctx,
137 (uint8 *) mystreamer->base.bbs_buffer.data,
138 mystreamer->base.bbs_buffer.maxlen,
139 &mystreamer->prefs);
141 if (LZ4F_isError(compressed_size))
142 pg_log_error("could not write lz4 header: %s",
143 LZ4F_getErrorName(compressed_size));
145 mystreamer->bytes_written += compressed_size;
146 mystreamer->header_written = true;
150 * Update the offset and capacity of output buffer based on number of
151 * bytes written to output buffer.
153 next_out = (uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
154 avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
157 * Find out the compression bound and make sure that output buffer has the
158 * required capacity for the success of LZ4F_compressUpdate. If needed
159 * forward the content to next streamer and empty the buffer.
161 out_bound = LZ4F_compressBound(len, &mystreamer->prefs);
162 if (avail_out < out_bound)
164 astreamer_content(mystreamer->base.bbs_next, member,
165 mystreamer->base.bbs_buffer.data,
166 mystreamer->bytes_written,
167 context);
169 /* Enlarge buffer if it falls short of out bound. */
170 if (mystreamer->base.bbs_buffer.maxlen < out_bound)
171 enlargeStringInfo(&mystreamer->base.bbs_buffer, out_bound);
173 avail_out = mystreamer->base.bbs_buffer.maxlen;
174 mystreamer->bytes_written = 0;
175 next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
179 * This call compresses the data starting at next_in and generates the
180 * output starting at next_out. It expects the caller to provide the size
181 * of input buffer and capacity of output buffer by providing parameters
182 * len and avail_out.
184 * It returns the number of bytes compressed to output buffer.
186 compressed_size = LZ4F_compressUpdate(mystreamer->cctx,
187 next_out, avail_out,
188 next_in, len, NULL);
190 if (LZ4F_isError(compressed_size))
191 pg_log_error("could not compress data: %s",
192 LZ4F_getErrorName(compressed_size));
194 mystreamer->bytes_written += compressed_size;
198 * End-of-stream processing.
200 static void
201 astreamer_lz4_compressor_finalize(astreamer *streamer)
203 astreamer_lz4_frame *mystreamer;
204 uint8 *next_out;
205 size_t footer_bound,
206 compressed_size,
207 avail_out;
209 mystreamer = (astreamer_lz4_frame *) streamer;
211 /* Find out the footer bound and update the output buffer. */
212 footer_bound = LZ4F_compressBound(0, &mystreamer->prefs);
213 if ((mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written) <
214 footer_bound)
216 astreamer_content(mystreamer->base.bbs_next, NULL,
217 mystreamer->base.bbs_buffer.data,
218 mystreamer->bytes_written,
219 ASTREAMER_UNKNOWN);
221 /* Enlarge buffer if it falls short of footer bound. */
222 if (mystreamer->base.bbs_buffer.maxlen < footer_bound)
223 enlargeStringInfo(&mystreamer->base.bbs_buffer, footer_bound);
225 avail_out = mystreamer->base.bbs_buffer.maxlen;
226 mystreamer->bytes_written = 0;
227 next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
229 else
231 next_out = (uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
232 avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
236 * Finalize the frame and flush whatever data remaining in compression
237 * context.
239 compressed_size = LZ4F_compressEnd(mystreamer->cctx,
240 next_out, avail_out, NULL);
242 if (LZ4F_isError(compressed_size))
243 pg_log_error("could not end lz4 compression: %s",
244 LZ4F_getErrorName(compressed_size));
246 mystreamer->bytes_written += compressed_size;
248 astreamer_content(mystreamer->base.bbs_next, NULL,
249 mystreamer->base.bbs_buffer.data,
250 mystreamer->bytes_written,
251 ASTREAMER_UNKNOWN);
253 astreamer_finalize(mystreamer->base.bbs_next);
257 * Free memory.
259 static void
260 astreamer_lz4_compressor_free(astreamer *streamer)
262 astreamer_lz4_frame *mystreamer;
264 mystreamer = (astreamer_lz4_frame *) streamer;
265 astreamer_free(streamer->bbs_next);
266 LZ4F_freeCompressionContext(mystreamer->cctx);
267 pfree(streamer->bbs_buffer.data);
268 pfree(streamer);
270 #endif
273 * Create a new base backup streamer that performs decompression of lz4
274 * compressed blocks.
276 astreamer *
277 astreamer_lz4_decompressor_new(astreamer *next)
279 #ifdef USE_LZ4
280 astreamer_lz4_frame *streamer;
281 LZ4F_errorCode_t ctxError;
283 Assert(next != NULL);
285 streamer = palloc0(sizeof(astreamer_lz4_frame));
286 *((const astreamer_ops **) &streamer->base.bbs_ops) =
287 &astreamer_lz4_decompressor_ops;
289 streamer->base.bbs_next = next;
290 initStringInfo(&streamer->base.bbs_buffer);
292 /* Initialize internal stream state for decompression */
293 ctxError = LZ4F_createDecompressionContext(&streamer->dctx, LZ4F_VERSION);
294 if (LZ4F_isError(ctxError))
295 pg_fatal("could not initialize compression library: %s",
296 LZ4F_getErrorName(ctxError));
298 return &streamer->base;
299 #else
300 pg_fatal("this build does not support compression with %s", "LZ4");
301 return NULL; /* keep compiler quiet */
302 #endif
305 #ifdef USE_LZ4
307 * Decompress the input data to output buffer until we run out of input
308 * data. Each time the output buffer is full, pass on the decompressed data
309 * to the next streamer.
311 static void
312 astreamer_lz4_decompressor_content(astreamer *streamer,
313 astreamer_member *member,
314 const char *data, int len,
315 astreamer_archive_context context)
317 astreamer_lz4_frame *mystreamer;
318 uint8 *next_in,
319 *next_out;
320 size_t avail_in,
321 avail_out;
323 mystreamer = (astreamer_lz4_frame *) streamer;
324 next_in = (uint8 *) data;
325 next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
326 avail_in = len;
327 avail_out = mystreamer->base.bbs_buffer.maxlen;
329 while (avail_in > 0)
331 size_t ret,
332 read_size,
333 out_size;
335 read_size = avail_in;
336 out_size = avail_out;
339 * This call decompresses the data starting at next_in and generates
340 * the output data starting at next_out. It expects the caller to
341 * provide size of the input buffer and total capacity of the output
342 * buffer by providing the read_size and out_size parameters
343 * respectively.
345 * Per the documentation of LZ4, parameters read_size and out_size
346 * behaves as dual parameters. On return, the number of bytes consumed
347 * from the input buffer will be written back to read_size and the
348 * number of bytes decompressed to output buffer will be written back
349 * to out_size respectively.
351 ret = LZ4F_decompress(mystreamer->dctx,
352 next_out, &out_size,
353 next_in, &read_size, NULL);
355 if (LZ4F_isError(ret))
356 pg_log_error("could not decompress data: %s",
357 LZ4F_getErrorName(ret));
359 /* Update input buffer based on number of bytes consumed */
360 avail_in -= read_size;
361 next_in += read_size;
363 mystreamer->bytes_written += out_size;
366 * If output buffer is full then forward the content to next streamer
367 * and update the output buffer.
369 if (mystreamer->bytes_written >= mystreamer->base.bbs_buffer.maxlen)
371 astreamer_content(mystreamer->base.bbs_next, member,
372 mystreamer->base.bbs_buffer.data,
373 mystreamer->base.bbs_buffer.maxlen,
374 context);
376 avail_out = mystreamer->base.bbs_buffer.maxlen;
377 mystreamer->bytes_written = 0;
378 next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
380 else
382 avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
383 next_out += mystreamer->bytes_written;
389 * End-of-stream processing.
391 static void
392 astreamer_lz4_decompressor_finalize(astreamer *streamer)
394 astreamer_lz4_frame *mystreamer;
396 mystreamer = (astreamer_lz4_frame *) streamer;
399 * End of the stream, if there is some pending data in output buffers then
400 * we must forward it to next streamer.
402 astreamer_content(mystreamer->base.bbs_next, NULL,
403 mystreamer->base.bbs_buffer.data,
404 mystreamer->base.bbs_buffer.maxlen,
405 ASTREAMER_UNKNOWN);
407 astreamer_finalize(mystreamer->base.bbs_next);
411 * Free memory.
413 static void
414 astreamer_lz4_decompressor_free(astreamer *streamer)
416 astreamer_lz4_frame *mystreamer;
418 mystreamer = (astreamer_lz4_frame *) streamer;
419 astreamer_free(streamer->bbs_next);
420 LZ4F_freeDecompressionContext(mystreamer->dctx);
421 pfree(streamer->bbs_buffer.data);
422 pfree(streamer);
424 #endif