zpool/zfs: allow --json wherever -j is allowed
[zfs.git] / cmd / zstream / zstream_recompress.c
blob586ac5623aa57b9e5a08ddcf9e650bc762210b99
1 /*
2 * CDDL HEADER START
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or https://opensource.org/licenses/CDDL-1.0.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
19 * CDDL HEADER END
23 * Copyright 2022 Axcient. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2022 by Delphix. All rights reserved.
27 * Copyright (c) 2024, Klara, Inc.
30 #include <err.h>
31 #include <stdio.h>
32 #include <stdlib.h>
33 #include <unistd.h>
34 #include <sys/zfs_ioctl.h>
35 #include <sys/zio_checksum.h>
36 #include <sys/zstd/zstd.h>
37 #include "zfs_fletcher.h"
38 #include "zstream.h"
40 static int
41 dump_record(dmu_replay_record_t *drr, void *payload, int payload_len,
42 zio_cksum_t *zc, int outfd)
44 assert(offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum)
45 == sizeof (dmu_replay_record_t) - sizeof (zio_cksum_t));
46 fletcher_4_incremental_native(drr,
47 offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum), zc);
48 if (drr->drr_type != DRR_BEGIN) {
49 assert(ZIO_CHECKSUM_IS_ZERO(&drr->drr_u.
50 drr_checksum.drr_checksum));
51 drr->drr_u.drr_checksum.drr_checksum = *zc;
53 fletcher_4_incremental_native(&drr->drr_u.drr_checksum.drr_checksum,
54 sizeof (zio_cksum_t), zc);
55 if (write(outfd, drr, sizeof (*drr)) == -1)
56 return (errno);
57 if (payload_len != 0) {
58 fletcher_4_incremental_native(payload, payload_len, zc);
59 if (write(outfd, payload, payload_len) == -1)
60 return (errno);
62 return (0);
65 int
66 zstream_do_recompress(int argc, char *argv[])
68 int bufsz = SPA_MAXBLOCKSIZE;
69 char *buf = safe_malloc(bufsz);
70 dmu_replay_record_t thedrr;
71 dmu_replay_record_t *drr = &thedrr;
72 zio_cksum_t stream_cksum;
73 int c;
74 int level = 0;
76 while ((c = getopt(argc, argv, "l:")) != -1) {
77 switch (c) {
78 case 'l':
79 if (sscanf(optarg, "%d", &level) != 1) {
80 fprintf(stderr,
81 "failed to parse level '%s'\n",
82 optarg);
83 zstream_usage();
85 break;
86 case '?':
87 (void) fprintf(stderr, "invalid option '%c'\n",
88 optopt);
89 zstream_usage();
90 break;
94 argc -= optind;
95 argv += optind;
97 if (argc != 1)
98 zstream_usage();
100 enum zio_compress ctype;
101 if (strcmp(argv[0], "off") == 0) {
102 ctype = ZIO_COMPRESS_OFF;
103 } else {
104 for (ctype = 0; ctype < ZIO_COMPRESS_FUNCTIONS; ctype++) {
105 if (strcmp(argv[0],
106 zio_compress_table[ctype].ci_name) == 0)
107 break;
109 if (ctype == ZIO_COMPRESS_FUNCTIONS ||
110 zio_compress_table[ctype].ci_compress == NULL) {
111 fprintf(stderr, "Invalid compression type %s.\n",
112 argv[0]);
113 exit(2);
117 if (isatty(STDIN_FILENO)) {
118 (void) fprintf(stderr,
119 "Error: The send stream is a binary format "
120 "and can not be read from a\n"
121 "terminal. Standard input must be redirected.\n");
122 exit(1);
125 abd_init();
126 fletcher_4_init();
127 zio_init();
128 zstd_init();
129 int begin = 0;
130 boolean_t seen = B_FALSE;
131 while (sfread(drr, sizeof (*drr), stdin) != 0) {
132 struct drr_write *drrw;
133 uint64_t payload_size = 0;
136 * We need to regenerate the checksum.
138 if (drr->drr_type != DRR_BEGIN) {
139 memset(&drr->drr_u.drr_checksum.drr_checksum, 0,
140 sizeof (drr->drr_u.drr_checksum.drr_checksum));
144 switch (drr->drr_type) {
145 case DRR_BEGIN:
147 ZIO_SET_CHECKSUM(&stream_cksum, 0, 0, 0, 0);
148 VERIFY0(begin++);
149 seen = B_TRUE;
151 uint32_t sz = drr->drr_payloadlen;
153 VERIFY3U(sz, <=, 1U << 28);
155 if (sz != 0) {
156 if (sz > bufsz) {
157 buf = realloc(buf, sz);
158 if (buf == NULL)
159 err(1, "realloc");
160 bufsz = sz;
162 (void) sfread(buf, sz, stdin);
164 payload_size = sz;
165 break;
167 case DRR_END:
169 struct drr_end *drre = &drr->drr_u.drr_end;
171 * We would prefer to just check --begin == 0, but
172 * replication streams have an end of stream END
173 * record, so we must avoid tripping it.
175 VERIFY3B(seen, ==, B_TRUE);
176 begin--;
178 * Use the recalculated checksum, unless this is
179 * the END record of a stream package, which has
180 * no checksum.
182 if (!ZIO_CHECKSUM_IS_ZERO(&drre->drr_checksum))
183 drre->drr_checksum = stream_cksum;
184 break;
187 case DRR_OBJECT:
189 struct drr_object *drro = &drr->drr_u.drr_object;
190 VERIFY3S(begin, ==, 1);
192 if (drro->drr_bonuslen > 0) {
193 payload_size = DRR_OBJECT_PAYLOAD_SIZE(drro);
194 (void) sfread(buf, payload_size, stdin);
196 break;
199 case DRR_SPILL:
201 struct drr_spill *drrs = &drr->drr_u.drr_spill;
202 VERIFY3S(begin, ==, 1);
203 payload_size = DRR_SPILL_PAYLOAD_SIZE(drrs);
204 (void) sfread(buf, payload_size, stdin);
205 break;
208 case DRR_WRITE_BYREF:
209 VERIFY3S(begin, ==, 1);
210 fprintf(stderr,
211 "Deduplicated streams are not supported\n");
212 exit(1);
213 break;
215 case DRR_WRITE:
217 VERIFY3S(begin, ==, 1);
218 drrw = &thedrr.drr_u.drr_write;
219 payload_size = DRR_WRITE_PAYLOAD_SIZE(drrw);
221 * In order to recompress an encrypted block, you have
222 * to decrypt, decompress, recompress, and
223 * re-encrypt. That can be a future enhancement (along
224 * with decryption or re-encryption), but for now we
225 * skip encrypted blocks.
227 boolean_t encrypted = B_FALSE;
228 for (int i = 0; i < ZIO_DATA_SALT_LEN; i++) {
229 if (drrw->drr_salt[i] != 0) {
230 encrypted = B_TRUE;
231 break;
234 if (encrypted) {
235 (void) sfread(buf, payload_size, stdin);
236 break;
238 enum zio_compress dtype = drrw->drr_compressiontype;
239 if (dtype >= ZIO_COMPRESS_FUNCTIONS) {
240 fprintf(stderr, "Invalid compression type in "
241 "stream: %d\n", dtype);
242 exit(3);
244 if (zio_compress_table[dtype].ci_decompress == NULL)
245 dtype = ZIO_COMPRESS_OFF;
247 /* Set up buffers to minimize memcpys */
248 char *cbuf, *dbuf;
249 if (ctype == ZIO_COMPRESS_OFF)
250 dbuf = buf;
251 else
252 dbuf = safe_calloc(bufsz);
254 if (dtype == ZIO_COMPRESS_OFF)
255 cbuf = dbuf;
256 else
257 cbuf = safe_calloc(payload_size);
259 /* Read and decompress the payload */
260 (void) sfread(cbuf, payload_size, stdin);
261 if (dtype != ZIO_COMPRESS_OFF) {
262 abd_t cabd, dabd;
263 abd_get_from_buf_struct(&cabd,
264 cbuf, payload_size);
265 abd_get_from_buf_struct(&dabd, dbuf,
266 MIN(bufsz, drrw->drr_logical_size));
267 if (zio_decompress_data(dtype, &cabd, &dabd,
268 payload_size, abd_get_size(&dabd),
269 NULL) != 0) {
270 warnx("decompression type %d failed "
271 "for ino %llu offset %llu",
272 dtype,
273 (u_longlong_t)drrw->drr_object,
274 (u_longlong_t)drrw->drr_offset);
275 exit(4);
277 payload_size = drrw->drr_logical_size;
278 abd_free(&dabd);
279 abd_free(&cabd);
280 free(cbuf);
283 /* Recompress the payload */
284 if (ctype != ZIO_COMPRESS_OFF) {
285 abd_t dabd, abd;
286 abd_get_from_buf_struct(&dabd,
287 dbuf, drrw->drr_logical_size);
288 abd_t *pabd =
289 abd_get_from_buf_struct(&abd, buf, bufsz);
290 size_t csize = zio_compress_data(ctype, &dabd,
291 &pabd, drrw->drr_logical_size,
292 drrw->drr_logical_size, level);
293 size_t rounded =
294 P2ROUNDUP(csize, SPA_MINBLOCKSIZE);
295 if (rounded >= drrw->drr_logical_size) {
296 memcpy(buf, dbuf, payload_size);
297 drrw->drr_compressiontype = 0;
298 drrw->drr_compressed_size = 0;
299 } else {
300 abd_zero_off(pabd, csize,
301 rounded - csize);
302 drrw->drr_compressiontype = ctype;
303 drrw->drr_compressed_size =
304 payload_size = rounded;
306 abd_free(&abd);
307 abd_free(&dabd);
308 free(dbuf);
309 } else {
310 drrw->drr_compressiontype = 0;
311 drrw->drr_compressed_size = 0;
313 break;
316 case DRR_WRITE_EMBEDDED:
318 struct drr_write_embedded *drrwe =
319 &drr->drr_u.drr_write_embedded;
320 VERIFY3S(begin, ==, 1);
321 payload_size =
322 P2ROUNDUP((uint64_t)drrwe->drr_psize, 8);
323 (void) sfread(buf, payload_size, stdin);
324 break;
327 case DRR_FREEOBJECTS:
328 case DRR_FREE:
329 case DRR_OBJECT_RANGE:
330 VERIFY3S(begin, ==, 1);
331 break;
333 default:
334 (void) fprintf(stderr, "INVALID record type 0x%x\n",
335 drr->drr_type);
336 /* should never happen, so assert */
337 assert(B_FALSE);
340 if (feof(stdout)) {
341 fprintf(stderr, "Error: unexpected end-of-file\n");
342 exit(1);
344 if (ferror(stdout)) {
345 fprintf(stderr, "Error while reading file: %s\n",
346 strerror(errno));
347 exit(1);
351 * We need to recalculate the checksum, and it needs to be
352 * initially zero to do that. BEGIN records don't have
353 * a checksum.
355 if (drr->drr_type != DRR_BEGIN) {
356 memset(&drr->drr_u.drr_checksum.drr_checksum, 0,
357 sizeof (drr->drr_u.drr_checksum.drr_checksum));
359 if (dump_record(drr, buf, payload_size,
360 &stream_cksum, STDOUT_FILENO) != 0)
361 break;
362 if (drr->drr_type == DRR_END) {
364 * Typically the END record is either the last
365 * thing in the stream, or it is followed
366 * by a BEGIN record (which also zeros the checksum).
367 * However, a stream package ends with two END
368 * records. The last END record's checksum starts
369 * from zero.
371 ZIO_SET_CHECKSUM(&stream_cksum, 0, 0, 0, 0);
374 free(buf);
375 fletcher_4_fini();
376 zio_fini();
377 zstd_fini();
378 abd_fini();
380 return (0);