/*
 * This file and its contents are supplied under the terms of the
 * Common Development and Distribution License ("CDDL"), version 1.0.
 * You may only use this file in accordance with the terms of version
 * 1.0 of the CDDL.
 *
 * A full copy of the text of the CDDL should have accompanied this
 * source.  A copy of the CDDL is also available via the Internet at
 * http://www.illumos.org/license/CDDL.
 */
/*
 * Copyright (c) 2020 by Delphix. All rights reserved.
 */
33 #include <sys/debug.h>
35 #include <sys/zfs_ioctl.h>
36 #include <sys/zio_checksum.h>
37 #include "zfs_fletcher.h"
41 #define MAX_RDT_PHYSMEM_PERCENT 20
42 #define SMALLEST_POSSIBLE_MAX_RDT_MB 128
/*
 * One entry of the redup hash table.  Entries that land in the same
 * bucket are chained through rde_next.  An entry is identified by the
 * triple (rde_guid, rde_object, rde_offset); rde_stream_offset is the
 * offset in the input stream that was recorded for that triple when
 * the entry was inserted.
 */
typedef struct redup_entry {
	struct redup_entry	*rde_next;
	uint64_t rde_guid;
	uint64_t rde_object;
	uint64_t rde_offset;
	uint64_t rde_stream_offset;
} redup_entry_t;
52 typedef struct redup_table
{
53 redup_entry_t
**redup_hash_array
;
54 umem_cache_t
*ddecache
;
/*
 * Return the 1-based position of the most significant set bit of i,
 * so highbit64(1) == 1 and highbit64(1ULL << 63) == 64.  Returns 0
 * when no bit is set.
 */
int
highbit64(uint64_t i)
{
	/* __builtin_clzll() has an undefined result for a zero argument. */
	if (i == 0)
		return (0);

	return (NBBY * sizeof (uint64_t) - __builtin_clzll(i));
}
/*
 * Allocate n zero-filled bytes.  Never returns NULL: if the allocation
 * fails, a message is printed to stderr and the process exits.
 */
void *
safe_calloc(size_t n)
{
	void *rv = calloc(1, n);

	if (rv == NULL) {
		/* %zu is the conversion that matches a size_t byte count. */
		(void) fprintf(stderr,
		    "Error: could not allocate %zu bytes of memory\n", n);
		exit(1);
	}
	return (rv);
}
/*
 * Safe version of fread(), exits on error.
 *
 * Reads one item of `size` bytes from fp into buf.  Returns the value
 * fread() returned: 1 when the item was read, 0 when it was not.  A
 * zero return with the stream's error indicator set is fatal; a zero
 * return without it (for example at end-of-file) is passed back to the
 * caller.
 */
int
sfread(void *buf, size_t size, FILE *fp)
{
	size_t nread = fread(buf, size, 1, fp);

	if (nread == 0 && ferror(fp)) {
		(void) fprintf(stderr, "Error while reading file: %s\n",
		    strerror(errno));
		exit(1);
	}
	/* At most one item was requested, so the count fits in an int. */
	return ((int)nread);
}
/*
 * Safe version of pread(), exits on error.
 *
 * Reads exactly `count` bytes from fd at `offset` into buf.  Both a
 * failed pread() and a short read are fatal, so on return buf always
 * holds `count` bytes.
 */
static void
spread(int fd, void *buf, size_t count, off_t offset)
{
	ssize_t err = pread(fd, buf, count, offset);

	if (err == -1) {
		(void) fprintf(stderr,
		    "Error while reading file: %s\n",
		    strerror(errno));
		exit(1);
	} else if ((size_t)err != count) {
		/* err is non-negative here, so the cast cannot wrap. */
		(void) fprintf(stderr,
		    "Error while reading file: short read\n");
		exit(1);
	}
}
116 dump_record(dmu_replay_record_t
*drr
, void *payload
, int payload_len
,
117 zio_cksum_t
*zc
, int outfd
)
119 assert(offsetof(dmu_replay_record_t
, drr_u
.drr_checksum
.drr_checksum
)
120 == sizeof (dmu_replay_record_t
) - sizeof (zio_cksum_t
));
121 fletcher_4_incremental_native(drr
,
122 offsetof(dmu_replay_record_t
, drr_u
.drr_checksum
.drr_checksum
), zc
);
123 if (drr
->drr_type
!= DRR_BEGIN
) {
124 assert(ZIO_CHECKSUM_IS_ZERO(&drr
->drr_u
.
125 drr_checksum
.drr_checksum
));
126 drr
->drr_u
.drr_checksum
.drr_checksum
= *zc
;
128 fletcher_4_incremental_native(&drr
->drr_u
.drr_checksum
.drr_checksum
,
129 sizeof (zio_cksum_t
), zc
);
130 if (write(outfd
, drr
, sizeof (*drr
)) == -1)
132 if (payload_len
!= 0) {
133 fletcher_4_incremental_native(payload
, payload_len
, zc
);
134 if (write(outfd
, payload
, payload_len
) == -1)
141 rdt_insert(redup_table_t
*rdt
,
142 uint64_t guid
, uint64_t object
, uint64_t offset
, uint64_t stream_offset
)
144 uint64_t ch
= cityhash4(guid
, object
, offset
, 0);
145 uint64_t hashcode
= BF64_GET(ch
, 0, rdt
->numhashbits
);
146 redup_entry_t
**rdepp
;
148 rdepp
= &(rdt
->redup_hash_array
[hashcode
]);
149 redup_entry_t
*rde
= umem_cache_alloc(rdt
->ddecache
, UMEM_NOFAIL
);
150 rde
->rde_next
= *rdepp
;
151 rde
->rde_guid
= guid
;
152 rde
->rde_object
= object
;
153 rde
->rde_offset
= offset
;
154 rde
->rde_stream_offset
= stream_offset
;
160 rdt_lookup(redup_table_t
*rdt
,
161 uint64_t guid
, uint64_t object
, uint64_t offset
,
162 uint64_t *stream_offsetp
)
164 uint64_t ch
= cityhash4(guid
, object
, offset
, 0);
165 uint64_t hashcode
= BF64_GET(ch
, 0, rdt
->numhashbits
);
167 for (redup_entry_t
*rde
= rdt
->redup_hash_array
[hashcode
];
168 rde
!= NULL
; rde
= rde
->rde_next
) {
169 if (rde
->rde_guid
== guid
&&
170 rde
->rde_object
== object
&&
171 rde
->rde_offset
== offset
) {
172 *stream_offsetp
= rde
->rde_stream_offset
;
176 assert(!"could not find expected redup table entry");
/*
 * Convert a dedup stream (generated by "zfs send -D") to a
 * non-deduplicated stream.  The entire infd will be converted, including
 * any substreams in a stream package (generated by "zfs send -RD"). The
 * infd must be seekable.
 */
186 zfs_redup_stream(int infd
, int outfd
, boolean_t verbose
)
188 int bufsz
= SPA_MAXBLOCKSIZE
;
189 dmu_replay_record_t thedrr
= { 0 };
190 dmu_replay_record_t
*drr
= &thedrr
;
192 zio_cksum_t stream_cksum
;
194 uint64_t num_records
= 0;
195 uint64_t num_write_byref_records
= 0;
198 uint64_t max_rde_size
= SMALLEST_POSSIBLE_MAX_RDT_MB
<< 20;
200 uint64_t physmem
= sysconf(_SC_PHYS_PAGES
) * sysconf(_SC_PAGESIZE
);
201 uint64_t max_rde_size
=
202 MAX((physmem
* MAX_RDT_PHYSMEM_PERCENT
) / 100,
203 SMALLEST_POSSIBLE_MAX_RDT_MB
<< 20);
206 numbuckets
= max_rde_size
/ (sizeof (redup_entry_t
));
209 * numbuckets must be a power of 2. Increase number to
210 * a power of 2 if necessary.
212 if (!ISP2(numbuckets
))
213 numbuckets
= 1ULL << highbit64(numbuckets
);
215 rdt
.redup_hash_array
=
216 safe_calloc(numbuckets
* sizeof (redup_entry_t
*));
217 rdt
.ddecache
= umem_cache_create("rde", sizeof (redup_entry_t
), 0,
218 NULL
, NULL
, NULL
, NULL
, NULL
, 0);
219 rdt
.numhashbits
= highbit64(numbuckets
) - 1;
222 char *buf
= safe_calloc(bufsz
);
223 FILE *ofp
= fdopen(infd
, "r");
224 long offset
= ftell(ofp
);
226 boolean_t seen
= B_FALSE
;
227 while (sfread(drr
, sizeof (*drr
), ofp
) != 0) {
231 * We need to regenerate the checksum.
233 if (drr
->drr_type
!= DRR_BEGIN
) {
234 memset(&drr
->drr_u
.drr_checksum
.drr_checksum
, 0,
235 sizeof (drr
->drr_u
.drr_checksum
.drr_checksum
));
238 uint64_t payload_size
= 0;
239 switch (drr
->drr_type
) {
242 struct drr_begin
*drrb
= &drr
->drr_u
.drr_begin
;
244 ZIO_SET_CHECKSUM(&stream_cksum
, 0, 0, 0, 0);
248 assert(drrb
->drr_magic
== DMU_BACKUP_MAGIC
);
250 /* clear the DEDUP feature flag for this stream */
251 fflags
= DMU_GET_FEATUREFLAGS(drrb
->drr_versioninfo
);
252 fflags
&= ~(DMU_BACKUP_FEATURE_DEDUP
|
253 DMU_BACKUP_FEATURE_DEDUPPROPS
);
254 /* cppcheck-suppress syntaxError */
255 DMU_SET_FEATUREFLAGS(drrb
->drr_versioninfo
, fflags
);
257 uint32_t sz
= drr
->drr_payloadlen
;
259 VERIFY3U(sz
, <=, 1U << 28);
264 buf
= safe_calloc(sz
);
267 (void) sfread(buf
, sz
, ofp
);
275 struct drr_end
*drre
= &drr
->drr_u
.drr_end
;
277 * We would prefer to just check --begin == 0, but
278 * replication streams have an end of stream END
279 * record, so we must avoid tripping it.
281 VERIFY3B(seen
, ==, B_TRUE
);
284 * Use the recalculated checksum, unless this is
285 * the END record of a stream package, which has
288 if (!ZIO_CHECKSUM_IS_ZERO(&drre
->drr_checksum
))
289 drre
->drr_checksum
= stream_cksum
;
295 struct drr_object
*drro
= &drr
->drr_u
.drr_object
;
296 VERIFY3S(begin
, ==, 1);
298 if (drro
->drr_bonuslen
> 0) {
299 payload_size
= DRR_OBJECT_PAYLOAD_SIZE(drro
);
300 (void) sfread(buf
, payload_size
, ofp
);
307 struct drr_spill
*drrs
= &drr
->drr_u
.drr_spill
;
308 VERIFY3S(begin
, ==, 1);
309 payload_size
= DRR_SPILL_PAYLOAD_SIZE(drrs
);
310 (void) sfread(buf
, payload_size
, ofp
);
314 case DRR_WRITE_BYREF
:
316 struct drr_write_byref drrwb
=
317 drr
->drr_u
.drr_write_byref
;
318 VERIFY3S(begin
, ==, 1);
320 num_write_byref_records
++;
323 * Look up in hash table by drrwb->drr_refguid,
324 * drr_refobject, drr_refoffset. Replace this
325 * record with the found WRITE record, but with
326 * drr_object,drr_offset,drr_toguid replaced with ours.
328 uint64_t stream_offset
= 0;
329 rdt_lookup(&rdt
, drrwb
.drr_refguid
,
330 drrwb
.drr_refobject
, drrwb
.drr_refoffset
,
333 spread(infd
, drr
, sizeof (*drr
), stream_offset
);
335 assert(drr
->drr_type
== DRR_WRITE
);
336 struct drr_write
*drrw
= &drr
->drr_u
.drr_write
;
337 assert(drrw
->drr_toguid
== drrwb
.drr_refguid
);
338 assert(drrw
->drr_object
== drrwb
.drr_refobject
);
339 assert(drrw
->drr_offset
== drrwb
.drr_refoffset
);
341 payload_size
= DRR_WRITE_PAYLOAD_SIZE(drrw
);
342 spread(infd
, buf
, payload_size
,
343 stream_offset
+ sizeof (*drr
));
345 drrw
->drr_toguid
= drrwb
.drr_toguid
;
346 drrw
->drr_object
= drrwb
.drr_object
;
347 drrw
->drr_offset
= drrwb
.drr_offset
;
353 struct drr_write
*drrw
= &drr
->drr_u
.drr_write
;
354 VERIFY3S(begin
, ==, 1);
355 payload_size
= DRR_WRITE_PAYLOAD_SIZE(drrw
);
356 (void) sfread(buf
, payload_size
, ofp
);
358 rdt_insert(&rdt
, drrw
->drr_toguid
,
359 drrw
->drr_object
, drrw
->drr_offset
, offset
);
363 case DRR_WRITE_EMBEDDED
:
365 struct drr_write_embedded
*drrwe
=
366 &drr
->drr_u
.drr_write_embedded
;
367 VERIFY3S(begin
, ==, 1);
369 P2ROUNDUP((uint64_t)drrwe
->drr_psize
, 8);
370 (void) sfread(buf
, payload_size
, ofp
);
374 case DRR_FREEOBJECTS
:
376 case DRR_OBJECT_RANGE
:
377 VERIFY3S(begin
, ==, 1);
381 (void) fprintf(stderr
, "INVALID record type 0x%x\n",
383 /* should never happen, so assert */
388 fprintf(stderr
, "Error: unexpected end-of-file\n");
392 fprintf(stderr
, "Error while reading file: %s\n",
398 * We need to recalculate the checksum, and it needs to be
399 * initially zero to do that. BEGIN records don't have
402 if (drr
->drr_type
!= DRR_BEGIN
) {
403 memset(&drr
->drr_u
.drr_checksum
.drr_checksum
, 0,
404 sizeof (drr
->drr_u
.drr_checksum
.drr_checksum
));
406 if (dump_record(drr
, buf
, payload_size
,
407 &stream_cksum
, outfd
) != 0)
409 if (drr
->drr_type
== DRR_END
) {
411 * Typically the END record is either the last
412 * thing in the stream, or it is followed
413 * by a BEGIN record (which also zeros the checksum).
414 * However, a stream package ends with two END
415 * records. The last END record's checksum starts
418 ZIO_SET_CHECKSUM(&stream_cksum
, 0, 0, 0, 0);
425 zfs_nicenum(rdt
.ddt_count
* sizeof (redup_entry_t
),
426 mem_str
, sizeof (mem_str
));
427 fprintf(stderr
, "converted stream with %llu total records, "
428 "including %llu dedup records, using %sB memory.\n",
429 (long long)num_records
,
430 (long long)num_write_byref_records
,
434 umem_cache_destroy(rdt
.ddecache
);
435 free(rdt
.redup_hash_array
);
441 zstream_do_redup(int argc
, char *argv
[])
443 boolean_t verbose
= B_FALSE
;
446 while ((c
= getopt(argc
, argv
, "v")) != -1) {
452 (void) fprintf(stderr
, "invalid option '%c'\n",
465 const char *filename
= argv
[0];
467 if (isatty(STDOUT_FILENO
)) {
468 (void) fprintf(stderr
,
469 "Error: Stream can not be written to a terminal.\n"
470 "You must redirect standard output.\n");
474 int fd
= open(filename
, O_RDONLY
);
476 (void) fprintf(stderr
,
477 "Error while opening file '%s': %s\n",
478 filename
, strerror(errno
));
483 zfs_redup_stream(fd
, STDOUT_FILENO
, verbose
);