doc: Fix section of functions age(xid) and mxid_age(xid)
[pgsql.git] / src / fe_utils / astreamer_gzip.c
blobe0b755317cbad27c6eda76963ae0f20f9fa39fe3
1 /*-------------------------------------------------------------------------
3 * astreamer_gzip.c
5 * Archive streamers that deal with data compressed using gzip.
6 * astreamer_gzip_writer applies gzip compression to the input data
7 * and writes the result to a file. astreamer_gzip_decompressor assumes
8 * that the input stream is compressed using gzip and decompresses it.
10 * Note that the code in this file is asymmetric with what we do for
11 * other compression types: for lz4 and zstd, there is a compressor and
12 * a decompressor, rather than a writer and a decompressor. The approach
13 * taken here is less flexible, because a writer can only write to a file,
14 * while a compressor can write to a subsequent astreamer which is free
15 * to do whatever it likes. The reason it's like this is because this
16 * code was adapted from old, less-modular pg_basebackup code that used
17 * the same APIs that astreamer_gzip_writer now uses, and it didn't seem
18 * necessary to change anything at the time.
20 * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
22 * IDENTIFICATION
23 * src/fe_utils/astreamer_gzip.c
24 *-------------------------------------------------------------------------
27 #include "postgres_fe.h"
29 #include <unistd.h>
31 #ifdef HAVE_LIBZ
32 #include <zlib.h>
33 #endif
35 #include "common/logging.h"
36 #include "fe_utils/astreamer.h"
#ifdef HAVE_LIBZ
/*
 * Streamer that gzip-compresses whatever it receives and writes the result
 * to a file through zlib's gzFile interface.
 */
typedef struct astreamer_gzip_writer
{
	astreamer	base;
	char	   *pathname;		/* reported in error messages; also the file
								 * that is opened when no FILE is supplied */
	gzFile		gzfile;			/* compressed output; NULL once closed */
} astreamer_gzip_writer;

/*
 * Streamer that inflates gzip input and forwards the decompressed bytes to
 * the next streamer in the chain.
 */
typedef struct astreamer_gzip_decompressor
{
	astreamer	base;
	z_stream	zstream;		/* zlib inflate state */
	size_t		bytes_written;	/* bytes at the start of bbs_buffer that
								 * hold output not yet forwarded */
} astreamer_gzip_decompressor;

/* Callbacks implementing astreamer_gzip_writer. */
static void astreamer_gzip_writer_content(astreamer *streamer,
										  astreamer_member *member,
										  const char *data, int len,
										  astreamer_archive_context context);
static void astreamer_gzip_writer_finalize(astreamer *streamer);
static void astreamer_gzip_writer_free(astreamer *streamer);

/* Callbacks implementing astreamer_gzip_decompressor. */
static void astreamer_gzip_decompressor_content(astreamer *streamer,
												astreamer_member *member,
												const char *data, int len,
												astreamer_archive_context context);
static void astreamer_gzip_decompressor_finalize(astreamer *streamer);
static void astreamer_gzip_decompressor_free(astreamer *streamer);

/* zlib helpers: error-text lookup and allocator hooks. */
static const char *get_gz_error(gzFile gzf);
static void *gzip_palloc(void *opaque, unsigned items, unsigned size);
static void gzip_pfree(void *opaque, void *address);

static const astreamer_ops astreamer_gzip_writer_ops = {
	.free = astreamer_gzip_writer_free,
	.finalize = astreamer_gzip_writer_finalize,
	.content = astreamer_gzip_writer_content
};

static const astreamer_ops astreamer_gzip_decompressor_ops = {
	.free = astreamer_gzip_decompressor_free,
	.finalize = astreamer_gzip_decompressor_finalize,
	.content = astreamer_gzip_decompressor_content
};
#endif
84 * Create a astreamer that just compresses data using gzip, and then writes
85 * it to a file.
87 * The caller must specify a pathname and may specify a file. The pathname is
88 * used for error-reporting purposes either way. If file is NULL, the pathname
89 * also identifies the file to which the data should be written: it is opened
90 * for writing and closed when done. If file is not NULL, the data is written
91 * there.
93 * Note that zlib does not use the FILE interface, but operates directly on
94 * a duplicate of the underlying fd. Hence, callers must take care if they
95 * plan to write any other data to the same FILE, either before or after using
96 * this.
98 astreamer *
99 astreamer_gzip_writer_new(char *pathname, FILE *file,
100 pg_compress_specification *compress)
102 #ifdef HAVE_LIBZ
103 astreamer_gzip_writer *streamer;
105 streamer = palloc0(sizeof(astreamer_gzip_writer));
106 *((const astreamer_ops **) &streamer->base.bbs_ops) =
107 &astreamer_gzip_writer_ops;
109 streamer->pathname = pstrdup(pathname);
111 if (file == NULL)
113 streamer->gzfile = gzopen(pathname, "wb");
114 if (streamer->gzfile == NULL)
115 pg_fatal("could not create compressed file \"%s\": %m",
116 pathname);
118 else
121 * We must dup the file handle so that gzclose doesn't break the
122 * caller's FILE. See comment for astreamer_gzip_writer_finalize.
124 int fd = dup(fileno(file));
126 if (fd < 0)
127 pg_fatal("could not duplicate stdout: %m");
129 streamer->gzfile = gzdopen(fd, "wb");
130 if (streamer->gzfile == NULL)
131 pg_fatal("could not open output file: %m");
134 if (gzsetparams(streamer->gzfile, compress->level, Z_DEFAULT_STRATEGY) != Z_OK)
135 pg_fatal("could not set compression level %d: %s",
136 compress->level, get_gz_error(streamer->gzfile));
138 return &streamer->base;
139 #else
140 pg_fatal("this build does not support compression with %s", "gzip");
141 return NULL; /* keep compiler quiet */
142 #endif
145 #ifdef HAVE_LIBZ
147 * Write archive content to gzip file.
149 static void
150 astreamer_gzip_writer_content(astreamer *streamer,
151 astreamer_member *member, const char *data,
152 int len, astreamer_archive_context context)
154 astreamer_gzip_writer *mystreamer;
156 mystreamer = (astreamer_gzip_writer *) streamer;
158 if (len == 0)
159 return;
161 errno = 0;
162 if (gzwrite(mystreamer->gzfile, data, len) != len)
164 /* if write didn't set errno, assume problem is no disk space */
165 if (errno == 0)
166 errno = ENOSPC;
167 pg_fatal("could not write to compressed file \"%s\": %s",
168 mystreamer->pathname, get_gz_error(mystreamer->gzfile));
173 * End-of-archive processing when writing to a gzip file consists of just
174 * calling gzclose.
176 * It makes no difference whether we opened the file or the caller did it,
177 * because libz provides no way of avoiding a close on the underlying file
178 * handle. Notice, however, that astreamer_gzip_writer_new() uses dup() to
179 * work around this issue, so that the behavior from the caller's viewpoint
180 * is the same as for astreamer_plain_writer.
182 static void
183 astreamer_gzip_writer_finalize(astreamer *streamer)
185 astreamer_gzip_writer *mystreamer;
187 mystreamer = (astreamer_gzip_writer *) streamer;
189 errno = 0; /* in case gzclose() doesn't set it */
190 if (gzclose(mystreamer->gzfile) != 0)
191 pg_fatal("could not close compressed file \"%s\": %m",
192 mystreamer->pathname);
194 mystreamer->gzfile = NULL;
198 * Free memory associated with this astreamer.
200 static void
201 astreamer_gzip_writer_free(astreamer *streamer)
203 astreamer_gzip_writer *mystreamer;
205 mystreamer = (astreamer_gzip_writer *) streamer;
207 Assert(mystreamer->base.bbs_next == NULL);
208 Assert(mystreamer->gzfile == NULL);
210 pfree(mystreamer->pathname);
211 pfree(mystreamer);
215 * Helper function for libz error reporting.
217 static const char *
218 get_gz_error(gzFile gzf)
220 int errnum;
221 const char *errmsg;
223 errmsg = gzerror(gzf, &errnum);
224 if (errnum == Z_ERRNO)
225 return strerror(errno);
226 else
227 return errmsg;
229 #endif
232 * Create a new base backup streamer that performs decompression of gzip
233 * compressed blocks.
235 astreamer *
236 astreamer_gzip_decompressor_new(astreamer *next)
238 #ifdef HAVE_LIBZ
239 astreamer_gzip_decompressor *streamer;
240 z_stream *zs;
242 Assert(next != NULL);
244 streamer = palloc0(sizeof(astreamer_gzip_decompressor));
245 *((const astreamer_ops **) &streamer->base.bbs_ops) =
246 &astreamer_gzip_decompressor_ops;
248 streamer->base.bbs_next = next;
249 initStringInfo(&streamer->base.bbs_buffer);
251 /* Initialize internal stream state for decompression */
252 zs = &streamer->zstream;
253 zs->zalloc = gzip_palloc;
254 zs->zfree = gzip_pfree;
255 zs->next_out = (uint8 *) streamer->base.bbs_buffer.data;
256 zs->avail_out = streamer->base.bbs_buffer.maxlen;
259 * Data compression was initialized using deflateInit2 to request a gzip
260 * header. Similarly, we are using inflateInit2 to initialize data
261 * decompression.
263 * Per the documentation for inflateInit2, the second argument is
264 * "windowBits" and its value must be greater than or equal to the value
265 * provided while compressing the data, so we are using the maximum
266 * possible value for safety.
268 if (inflateInit2(zs, 15 + 16) != Z_OK)
269 pg_fatal("could not initialize compression library");
271 return &streamer->base;
272 #else
273 pg_fatal("this build does not support compression with %s", "gzip");
274 return NULL; /* keep compiler quiet */
275 #endif
278 #ifdef HAVE_LIBZ
280 * Decompress the input data to output buffer until we run out of input
281 * data. Each time the output buffer is full, pass on the decompressed data
282 * to the next streamer.
284 static void
285 astreamer_gzip_decompressor_content(astreamer *streamer,
286 astreamer_member *member,
287 const char *data, int len,
288 astreamer_archive_context context)
290 astreamer_gzip_decompressor *mystreamer;
291 z_stream *zs;
293 mystreamer = (astreamer_gzip_decompressor *) streamer;
295 zs = &mystreamer->zstream;
296 zs->next_in = (const uint8 *) data;
297 zs->avail_in = len;
299 /* Process the current chunk */
300 while (zs->avail_in > 0)
302 int res;
304 Assert(mystreamer->bytes_written < mystreamer->base.bbs_buffer.maxlen);
306 zs->next_out = (uint8 *)
307 mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
308 zs->avail_out =
309 mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
312 * This call decompresses data starting at zs->next_in and updates
313 * zs->next_in * and zs->avail_in. It generates output data starting
314 * at zs->next_out and updates zs->next_out and zs->avail_out
315 * accordingly.
317 res = inflate(zs, Z_NO_FLUSH);
319 if (res == Z_STREAM_ERROR)
320 pg_log_error("could not decompress data: %s", zs->msg);
322 mystreamer->bytes_written =
323 mystreamer->base.bbs_buffer.maxlen - zs->avail_out;
325 /* If output buffer is full then pass data to next streamer */
326 if (mystreamer->bytes_written >= mystreamer->base.bbs_buffer.maxlen)
328 astreamer_content(mystreamer->base.bbs_next, member,
329 mystreamer->base.bbs_buffer.data,
330 mystreamer->base.bbs_buffer.maxlen, context);
331 mystreamer->bytes_written = 0;
337 * End-of-stream processing.
339 static void
340 astreamer_gzip_decompressor_finalize(astreamer *streamer)
342 astreamer_gzip_decompressor *mystreamer;
344 mystreamer = (astreamer_gzip_decompressor *) streamer;
347 * End of the stream, if there is some pending data in output buffers then
348 * we must forward it to next streamer.
350 astreamer_content(mystreamer->base.bbs_next, NULL,
351 mystreamer->base.bbs_buffer.data,
352 mystreamer->base.bbs_buffer.maxlen,
353 ASTREAMER_UNKNOWN);
355 astreamer_finalize(mystreamer->base.bbs_next);
359 * Free memory.
361 static void
362 astreamer_gzip_decompressor_free(astreamer *streamer)
364 astreamer_free(streamer->bbs_next);
365 pfree(streamer->bbs_buffer.data);
366 pfree(streamer);
/*
 * Wrapper function to adjust the signature of palloc to match what libz
 * expects.
 *
 * zlib passes an element count and an element size as two unsigned ints.
 * Their product is computed in size_t rather than unsigned int.  Where
 * size_t is wider, a large request therefore cannot wrap around into a
 * smaller allocation than zlib will go on to write into.
 */
static void *
gzip_palloc(void *opaque, unsigned items, unsigned size)
{
	return palloc((size_t) items * size);
}
380 * Wrapper function to adjust the signature of pfree to match what libz
381 * expects.
383 static void
384 gzip_pfree(void *opaque, void *address)
386 pfree(address);
388 #endif