1 /*-------------------------------------------------------------------------
5 * Archive streamers that deal with data compressed using gzip.
6 * astreamer_gzip_writer applies gzip compression to the input data
7 * and writes the result to a file. astreamer_gzip_decompressor assumes
8 * that the input stream is compressed using gzip and decompresses it.
10 * Note that the code in this file is asymmetric with what we do for
11 * other compression types: for lz4 and zstd, there is a compressor and
12 * a decompressor, rather than a writer and a decompressor. The approach
13 * taken here is less flexible, because a writer can only write to a file,
14 * while a compressor can write to a subsequent astreamer which is free
15 * to do whatever it likes. The reason it's like this is that this
16 * code was adapted from old, less-modular pg_basebackup code that used
17 * the same APIs that astreamer_gzip_writer now uses, and it didn't seem
18 * necessary to change anything at the time.
20 * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
23 * src/bin/pg_basebackup/astreamer_gzip.c
24 *-------------------------------------------------------------------------
27 #include "postgres_fe.h"
35 #include "common/logging.h"
36 #include "fe_utils/astreamer.h"
39 typedef struct astreamer_gzip_writer
44 } astreamer_gzip_writer
;
46 typedef struct astreamer_gzip_decompressor
51 } astreamer_gzip_decompressor
;
53 static void astreamer_gzip_writer_content(astreamer
*streamer
,
54 astreamer_member
*member
,
55 const char *data
, int len
,
56 astreamer_archive_context context
);
57 static void astreamer_gzip_writer_finalize(astreamer
*streamer
);
58 static void astreamer_gzip_writer_free(astreamer
*streamer
);
59 static const char *get_gz_error(gzFile gzf
);
61 static const astreamer_ops astreamer_gzip_writer_ops
= {
62 .content
= astreamer_gzip_writer_content
,
63 .finalize
= astreamer_gzip_writer_finalize
,
64 .free
= astreamer_gzip_writer_free
67 static void astreamer_gzip_decompressor_content(astreamer
*streamer
,
68 astreamer_member
*member
,
69 const char *data
, int len
,
70 astreamer_archive_context context
);
71 static void astreamer_gzip_decompressor_finalize(astreamer
*streamer
);
72 static void astreamer_gzip_decompressor_free(astreamer
*streamer
);
73 static void *gzip_palloc(void *opaque
, unsigned items
, unsigned size
);
74 static void gzip_pfree(void *opaque
, void *address
);
76 static const astreamer_ops astreamer_gzip_decompressor_ops
= {
77 .content
= astreamer_gzip_decompressor_content
,
78 .finalize
= astreamer_gzip_decompressor_finalize
,
79 .free
= astreamer_gzip_decompressor_free
84 * Create an astreamer that just compresses data using gzip, and then writes
87 * The caller must specify a pathname and may specify a file. The pathname is
88 * used for error-reporting purposes either way. If file is NULL, the pathname
89 * also identifies the file to which the data should be written: it is opened
90 * for writing and closed when done. If file is not NULL, the data is written
93 * Note that zlib does not use the FILE interface, but operates directly on
94 * a duplicate of the underlying fd. Hence, callers must take care if they
95 * plan to write any other data to the same FILE, either before or after using
99 astreamer_gzip_writer_new(char *pathname
, FILE *file
,
100 pg_compress_specification
*compress
)
103 astreamer_gzip_writer
*streamer
;
105 streamer
= palloc0(sizeof(astreamer_gzip_writer
));
106 *((const astreamer_ops
**) &streamer
->base
.bbs_ops
) =
107 &astreamer_gzip_writer_ops
;
109 streamer
->pathname
= pstrdup(pathname
);
113 streamer
->gzfile
= gzopen(pathname
, "wb");
114 if (streamer
->gzfile
== NULL
)
115 pg_fatal("could not create compressed file \"%s\": %m",
121 * We must dup the file handle so that gzclose doesn't break the
122 * caller's FILE. See comment for astreamer_gzip_writer_finalize.
124 int fd
= dup(fileno(file
));
127 pg_fatal("could not duplicate stdout: %m");
129 streamer
->gzfile
= gzdopen(fd
, "wb");
130 if (streamer
->gzfile
== NULL
)
131 pg_fatal("could not open output file: %m");
134 if (gzsetparams(streamer
->gzfile
, compress
->level
, Z_DEFAULT_STRATEGY
) != Z_OK
)
135 pg_fatal("could not set compression level %d: %s",
136 compress
->level
, get_gz_error(streamer
->gzfile
));
138 return &streamer
->base
;
140 pg_fatal("this build does not support compression with %s", "gzip");
141 return NULL
; /* keep compiler quiet */
147 * Write archive content to gzip file.
150 astreamer_gzip_writer_content(astreamer
*streamer
,
151 astreamer_member
*member
, const char *data
,
152 int len
, astreamer_archive_context context
)
154 astreamer_gzip_writer
*mystreamer
;
156 mystreamer
= (astreamer_gzip_writer
*) streamer
;
162 if (gzwrite(mystreamer
->gzfile
, data
, len
) != len
)
164 /* if write didn't set errno, assume problem is no disk space */
167 pg_fatal("could not write to compressed file \"%s\": %s",
168 mystreamer
->pathname
, get_gz_error(mystreamer
->gzfile
));
173 * End-of-archive processing when writing to a gzip file consists of just
176 * It makes no difference whether we opened the file or the caller did it,
177 * because libz provides no way of avoiding a close on the underlying file
178 * handle. Notice, however, that astreamer_gzip_writer_new() uses dup() to
179 * work around this issue, so that the behavior from the caller's viewpoint
180 * is the same as for astreamer_plain_writer.
183 astreamer_gzip_writer_finalize(astreamer
*streamer
)
185 astreamer_gzip_writer
*mystreamer
;
187 mystreamer
= (astreamer_gzip_writer
*) streamer
;
189 errno
= 0; /* in case gzclose() doesn't set it */
190 if (gzclose(mystreamer
->gzfile
) != 0)
191 pg_fatal("could not close compressed file \"%s\": %m",
192 mystreamer
->pathname
);
194 mystreamer
->gzfile
= NULL
;
198 * Free memory associated with this astreamer.
201 astreamer_gzip_writer_free(astreamer
*streamer
)
203 astreamer_gzip_writer
*mystreamer
;
205 mystreamer
= (astreamer_gzip_writer
*) streamer
;
207 Assert(mystreamer
->base
.bbs_next
== NULL
);
208 Assert(mystreamer
->gzfile
== NULL
);
210 pfree(mystreamer
->pathname
);
215 * Helper function for libz error reporting.
218 get_gz_error(gzFile gzf
)
223 errmsg
= gzerror(gzf
, &errnum
);
224 if (errnum
== Z_ERRNO
)
225 return strerror(errno
);
232 * Create a new base backup streamer that performs decompression of gzip
236 astreamer_gzip_decompressor_new(astreamer
*next
)
239 astreamer_gzip_decompressor
*streamer
;
242 Assert(next
!= NULL
);
244 streamer
= palloc0(sizeof(astreamer_gzip_decompressor
));
245 *((const astreamer_ops
**) &streamer
->base
.bbs_ops
) =
246 &astreamer_gzip_decompressor_ops
;
248 streamer
->base
.bbs_next
= next
;
249 initStringInfo(&streamer
->base
.bbs_buffer
);
251 /* Initialize internal stream state for decompression */
252 zs
= &streamer
->zstream
;
253 zs
->zalloc
= gzip_palloc
;
254 zs
->zfree
= gzip_pfree
;
255 zs
->next_out
= (uint8
*) streamer
->base
.bbs_buffer
.data
;
256 zs
->avail_out
= streamer
->base
.bbs_buffer
.maxlen
;
259 * Data compression was initialized using deflateInit2 to request a gzip
260 * header. Similarly, we are using inflateInit2 to initialize data
263 * Per the documentation for inflateInit2, the second argument is
264 * "windowBits" and its value must be greater than or equal to the value
265 * provided while compressing the data, so we are using the maximum
266 * possible value for safety.
268 if (inflateInit2(zs
, 15 + 16) != Z_OK
)
269 pg_fatal("could not initialize compression library");
271 return &streamer
->base
;
273 pg_fatal("this build does not support compression with %s", "gzip");
274 return NULL
; /* keep compiler quiet */
280 * Decompress the input data to output buffer until we run out of input
281 * data. Each time the output buffer is full, pass on the decompressed data
282 * to the next streamer.
285 astreamer_gzip_decompressor_content(astreamer
*streamer
,
286 astreamer_member
*member
,
287 const char *data
, int len
,
288 astreamer_archive_context context
)
290 astreamer_gzip_decompressor
*mystreamer
;
293 mystreamer
= (astreamer_gzip_decompressor
*) streamer
;
295 zs
= &mystreamer
->zstream
;
296 zs
->next_in
= (const uint8
*) data
;
299 /* Process the current chunk */
300 while (zs
->avail_in
> 0)
304 Assert(mystreamer
->bytes_written
< mystreamer
->base
.bbs_buffer
.maxlen
);
306 zs
->next_out
= (uint8
*)
307 mystreamer
->base
.bbs_buffer
.data
+ mystreamer
->bytes_written
;
309 mystreamer
->base
.bbs_buffer
.maxlen
- mystreamer
->bytes_written
;
312 * This call decompresses data starting at zs->next_in and updates
313 * zs->next_in * and zs->avail_in. It generates output data starting
314 * at zs->next_out and updates zs->next_out and zs->avail_out
317 res
= inflate(zs
, Z_NO_FLUSH
);
319 if (res
== Z_STREAM_ERROR
)
320 pg_log_error("could not decompress data: %s", zs
->msg
);
322 mystreamer
->bytes_written
=
323 mystreamer
->base
.bbs_buffer
.maxlen
- zs
->avail_out
;
325 /* If output buffer is full then pass data to next streamer */
326 if (mystreamer
->bytes_written
>= mystreamer
->base
.bbs_buffer
.maxlen
)
328 astreamer_content(mystreamer
->base
.bbs_next
, member
,
329 mystreamer
->base
.bbs_buffer
.data
,
330 mystreamer
->base
.bbs_buffer
.maxlen
, context
);
331 mystreamer
->bytes_written
= 0;
337 * End-of-stream processing.
340 astreamer_gzip_decompressor_finalize(astreamer
*streamer
)
342 astreamer_gzip_decompressor
*mystreamer
;
344 mystreamer
= (astreamer_gzip_decompressor
*) streamer
;
347 * End of the stream, if there is some pending data in output buffers then
348 * we must forward it to next streamer.
350 astreamer_content(mystreamer
->base
.bbs_next
, NULL
,
351 mystreamer
->base
.bbs_buffer
.data
,
352 mystreamer
->base
.bbs_buffer
.maxlen
,
355 astreamer_finalize(mystreamer
->base
.bbs_next
);
362 astreamer_gzip_decompressor_free(astreamer
*streamer
)
364 astreamer_free(streamer
->bbs_next
);
365 pfree(streamer
->bbs_buffer
.data
);
370 * Wrapper function to adjust the signature of palloc to match what libz
374 gzip_palloc(void *opaque
, unsigned items
, unsigned size
)
376 return palloc(items
* size
);
380 * Wrapper function to adjust the signature of pfree to match what libz
384 gzip_pfree(void *opaque
, void *address
)