/*-------------------------------------------------------------------------
 *
 * basebackup_lz4.c
 *	  Basebackup sink implementing lz4 compression.
 *
 * Portions Copyright (c) 2010-2024, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *	  src/backend/backup/basebackup_lz4.c
 *
 *-------------------------------------------------------------------------
 */
19 #include "backup/basebackup_sink.h"
23 typedef struct bbsink_lz4
25 /* Common information for all types of sink. */
28 /* Compression level. */
31 LZ4F_compressionContext_t ctx
;
32 LZ4F_preferences_t prefs
;
34 /* Number of bytes staged in output buffer. */
38 static void bbsink_lz4_begin_backup(bbsink
*sink
);
39 static void bbsink_lz4_begin_archive(bbsink
*sink
, const char *archive_name
);
40 static void bbsink_lz4_archive_contents(bbsink
*sink
, size_t avail_in
);
41 static void bbsink_lz4_manifest_contents(bbsink
*sink
, size_t len
);
42 static void bbsink_lz4_end_archive(bbsink
*sink
);
43 static void bbsink_lz4_cleanup(bbsink
*sink
);
45 static const bbsink_ops bbsink_lz4_ops
= {
46 .begin_backup
= bbsink_lz4_begin_backup
,
47 .begin_archive
= bbsink_lz4_begin_archive
,
48 .archive_contents
= bbsink_lz4_archive_contents
,
49 .end_archive
= bbsink_lz4_end_archive
,
50 .begin_manifest
= bbsink_forward_begin_manifest
,
51 .manifest_contents
= bbsink_lz4_manifest_contents
,
52 .end_manifest
= bbsink_forward_end_manifest
,
53 .end_backup
= bbsink_forward_end_backup
,
54 .cleanup
= bbsink_lz4_cleanup
59 * Create a new basebackup sink that performs lz4 compression.
62 bbsink_lz4_new(bbsink
*next
, pg_compress_specification
*compress
)
66 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED
),
67 errmsg("lz4 compression is not supported by this build")));
68 return NULL
; /* keep compiler quiet */
75 compresslevel
= compress
->level
;
76 Assert(compresslevel
>= 0 && compresslevel
<= 12);
78 sink
= palloc0(sizeof(bbsink_lz4
));
79 *((const bbsink_ops
**) &sink
->base
.bbs_ops
) = &bbsink_lz4_ops
;
80 sink
->base
.bbs_next
= next
;
81 sink
->compresslevel
= compresslevel
;
93 bbsink_lz4_begin_backup(bbsink
*sink
)
95 bbsink_lz4
*mysink
= (bbsink_lz4
*) sink
;
96 size_t output_buffer_bound
;
97 LZ4F_preferences_t
*prefs
= &mysink
->prefs
;
99 /* Initialize compressor object. */
100 memset(prefs
, 0, sizeof(LZ4F_preferences_t
));
101 prefs
->frameInfo
.blockSizeID
= LZ4F_max256KB
;
102 prefs
->compressionLevel
= mysink
->compresslevel
;
105 * We need our own buffer, because we're going to pass different data to
106 * the next sink than what gets passed to us.
108 mysink
->base
.bbs_buffer
= palloc(mysink
->base
.bbs_buffer_length
);
111 * Since LZ4F_compressUpdate() requires the output buffer of size equal or
112 * greater than that of LZ4F_compressBound(), make sure we have the next
113 * sink's bbs_buffer of length that can accommodate the compressed input
116 output_buffer_bound
= LZ4F_compressBound(mysink
->base
.bbs_buffer_length
,
120 * The buffer length is expected to be a multiple of BLCKSZ, so round up.
122 output_buffer_bound
= output_buffer_bound
+ BLCKSZ
-
123 (output_buffer_bound
% BLCKSZ
);
125 bbsink_begin_backup(sink
->bbs_next
, sink
->bbs_state
, output_buffer_bound
);
129 * Prepare to compress the next archive.
132 bbsink_lz4_begin_archive(bbsink
*sink
, const char *archive_name
)
134 bbsink_lz4
*mysink
= (bbsink_lz4
*) sink
;
135 char *lz4_archive_name
;
136 LZ4F_errorCode_t ctxError
;
139 ctxError
= LZ4F_createCompressionContext(&mysink
->ctx
, LZ4F_VERSION
);
140 if (LZ4F_isError(ctxError
))
141 elog(ERROR
, "could not create lz4 compression context: %s",
142 LZ4F_getErrorName(ctxError
));
144 /* First of all write the frame header to destination buffer. */
145 headerSize
= LZ4F_compressBegin(mysink
->ctx
,
146 mysink
->base
.bbs_next
->bbs_buffer
,
147 mysink
->base
.bbs_next
->bbs_buffer_length
,
150 if (LZ4F_isError(headerSize
))
151 elog(ERROR
, "could not write lz4 header: %s",
152 LZ4F_getErrorName(headerSize
));
155 * We need to write the compressed data after the header in the output
156 * buffer. So, make sure to update the notion of bytes written to output
159 mysink
->bytes_written
+= headerSize
;
161 /* Add ".lz4" to the archive name. */
162 lz4_archive_name
= psprintf("%s.lz4", archive_name
);
163 Assert(sink
->bbs_next
!= NULL
);
164 bbsink_begin_archive(sink
->bbs_next
, lz4_archive_name
);
165 pfree(lz4_archive_name
);
169 * Compress the input data to the output buffer until we run out of input
170 * data. Each time the output buffer falls below the compression bound for
171 * the input buffer, invoke the archive_contents() method for then next sink.
173 * Note that since we're compressing the input, it may very commonly happen
174 * that we consume all the input data without filling the output buffer. In
175 * that case, the compressed representation of the current input data won't
176 * actually be sent to the next bbsink until a later call to this function,
177 * or perhaps even not until bbsink_lz4_end_archive() is invoked.
180 bbsink_lz4_archive_contents(bbsink
*sink
, size_t avail_in
)
182 bbsink_lz4
*mysink
= (bbsink_lz4
*) sink
;
183 size_t compressedSize
;
184 size_t avail_in_bound
;
186 avail_in_bound
= LZ4F_compressBound(avail_in
, &mysink
->prefs
);
189 * If the number of available bytes has fallen below the value computed by
190 * LZ4F_compressBound(), ask the next sink to process the data so that we
191 * can empty the buffer.
193 if ((mysink
->base
.bbs_next
->bbs_buffer_length
- mysink
->bytes_written
) <
196 bbsink_archive_contents(sink
->bbs_next
, mysink
->bytes_written
);
197 mysink
->bytes_written
= 0;
201 * Compress the input buffer and write it into the output buffer.
203 compressedSize
= LZ4F_compressUpdate(mysink
->ctx
,
204 mysink
->base
.bbs_next
->bbs_buffer
+ mysink
->bytes_written
,
205 mysink
->base
.bbs_next
->bbs_buffer_length
- mysink
->bytes_written
,
206 (uint8
*) mysink
->base
.bbs_buffer
,
210 if (LZ4F_isError(compressedSize
))
211 elog(ERROR
, "could not compress data: %s",
212 LZ4F_getErrorName(compressedSize
));
215 * Update our notion of how many bytes we've written into output buffer.
217 mysink
->bytes_written
+= compressedSize
;
221 * There might be some data inside lz4's internal buffers; we need to get
222 * that flushed out and also finalize the lz4 frame and then get that forwarded
223 * to the successor sink as archive content.
225 * Then we can end processing for this archive.
228 bbsink_lz4_end_archive(bbsink
*sink
)
230 bbsink_lz4
*mysink
= (bbsink_lz4
*) sink
;
231 size_t compressedSize
;
232 size_t lz4_footer_bound
;
234 lz4_footer_bound
= LZ4F_compressBound(0, &mysink
->prefs
);
236 Assert(mysink
->base
.bbs_next
->bbs_buffer_length
>= lz4_footer_bound
);
238 if ((mysink
->base
.bbs_next
->bbs_buffer_length
- mysink
->bytes_written
) <
241 bbsink_archive_contents(sink
->bbs_next
, mysink
->bytes_written
);
242 mysink
->bytes_written
= 0;
245 compressedSize
= LZ4F_compressEnd(mysink
->ctx
,
246 mysink
->base
.bbs_next
->bbs_buffer
+ mysink
->bytes_written
,
247 mysink
->base
.bbs_next
->bbs_buffer_length
- mysink
->bytes_written
,
250 if (LZ4F_isError(compressedSize
))
251 elog(ERROR
, "could not end lz4 compression: %s",
252 LZ4F_getErrorName(compressedSize
));
254 /* Update our notion of how many bytes we've written. */
255 mysink
->bytes_written
+= compressedSize
;
257 /* Send whatever accumulated output bytes we have. */
258 bbsink_archive_contents(sink
->bbs_next
, mysink
->bytes_written
);
259 mysink
->bytes_written
= 0;
261 /* Release the resources. */
262 LZ4F_freeCompressionContext(mysink
->ctx
);
265 /* Pass on the information that this archive has ended. */
266 bbsink_forward_end_archive(sink
);
270 * Manifest contents are not compressed, but we do need to copy them into
271 * the successor sink's buffer, because we have our own.
274 bbsink_lz4_manifest_contents(bbsink
*sink
, size_t len
)
276 memcpy(sink
->bbs_next
->bbs_buffer
, sink
->bbs_buffer
, len
);
277 bbsink_manifest_contents(sink
->bbs_next
, len
);
281 * In case the backup fails, make sure we free the compression context by
282 * calling LZ4F_freeCompressionContext() if needed to avoid memory leak.
285 bbsink_lz4_cleanup(bbsink
*sink
)
287 bbsink_lz4
*mysink
= (bbsink_lz4
*) sink
;
291 LZ4F_freeCompressionContext(mysink
->ctx
);