4 * This file and its contents are supplied under the terms of the
5 * Common Development and Distribution License ("CDDL"), version 1.0.
6 * You may only use this file in accordance with the terms of version
9 * A full copy of the text of the CDDL should have accompanied this
10 * source. A copy of the CDDL is also available via the Internet at
11 * http://www.illumos.org/license/CDDL.
17 * Copyright (c) 2020 by Delphix. All rights reserved.
33 #include <sys/debug.h>
35 #include <sys/zfs_ioctl.h>
36 #include <sys/zio_checksum.h>
37 #include "zfs_fletcher.h"
41 #define MAX_RDT_PHYSMEM_PERCENT 20
42 #define SMALLEST_POSSIBLE_MAX_RDT_MB 128
44 typedef struct redup_entry
{
45 struct redup_entry
*rde_next
;
49 uint64_t rde_stream_offset
;
52 typedef struct redup_table
{
53 redup_entry_t
**redup_hash_array
;
54 umem_cache_t
*ddecache
;
65 return (NBBY
* sizeof (uint64_t) - __builtin_clzll(i
));
71 void *rv
= calloc(1, n
);
74 "Error: could not allocate %u bytes of memory\n",
/*
 * Safe version of fread(), exits on error.
 *
 * Reads a single item of `size` bytes from `fp` into `buf`.
 *
 * Returns the item count reported by fread(): 1 when the whole item was
 * read, 0 when it was not (e.g. the stream is at end-of-file).  Callers use
 * a zero return to detect the end of the input.
 *
 * If nothing was read and the stream's error indicator is set, a message
 * is printed to stderr and the process exits; this function does not
 * return in that case.
 */
static int
sfread(void *buf, size_t size, FILE *fp)
{
	/* fread() reports a count of items, so keep it in a size_t. */
	size_t nitems = fread(buf, size, 1, fp);

	if (nitems == 0 && ferror(fp)) {
		(void) fprintf(stderr, "Error while reading file: %s\n",
		    strerror(errno));
		exit(1);
	}
	return ((int)nitems);
}
/*
 * Safe version of pread(), exits on error.
 *
 * Fills `buf` with exactly `count` bytes taken from `fd` starting at byte
 * `offset`.  A pread() that returns fewer bytes than requested is continued
 * from where it stopped, and a call interrupted by a signal is retried.
 *
 * The process exits (after printing a message to stderr) when pread()
 * fails, or when end-of-file is reached before `count` bytes were read
 * ("short read").  On return the buffer is therefore always complete.
 */
static void
spread(int fd, void *buf, size_t count, off_t offset)
{
	char *dst = buf;
	size_t done = 0;

	while (done < count) {
		ssize_t n = pread(fd, dst + done, count - done,
		    offset + (off_t)done);

		if (n == -1) {
			if (errno == EINTR)
				continue;
			(void) fprintf(stderr,
			    "Error while reading file: %s\n",
			    strerror(errno));
			exit(1);
		}
		if (n == 0) {
			/* End-of-file: the remaining bytes do not exist. */
			(void) fprintf(stderr,
			    "Error while reading file: short read\n");
			exit(1);
		}
		done += (size_t)n;
	}
}
116 dump_record(dmu_replay_record_t
*drr
, void *payload
, int payload_len
,
117 zio_cksum_t
*zc
, int outfd
)
119 assert(offsetof(dmu_replay_record_t
, drr_u
.drr_checksum
.drr_checksum
)
120 == sizeof (dmu_replay_record_t
) - sizeof (zio_cksum_t
));
121 fletcher_4_incremental_native(drr
,
122 offsetof(dmu_replay_record_t
, drr_u
.drr_checksum
.drr_checksum
), zc
);
123 if (drr
->drr_type
!= DRR_BEGIN
) {
124 assert(ZIO_CHECKSUM_IS_ZERO(&drr
->drr_u
.
125 drr_checksum
.drr_checksum
));
126 drr
->drr_u
.drr_checksum
.drr_checksum
= *zc
;
128 fletcher_4_incremental_native(&drr
->drr_u
.drr_checksum
.drr_checksum
,
129 sizeof (zio_cksum_t
), zc
);
130 if (write(outfd
, drr
, sizeof (*drr
)) == -1)
132 if (payload_len
!= 0) {
133 fletcher_4_incremental_native(payload
, payload_len
, zc
);
134 if (write(outfd
, payload
, payload_len
) == -1)
141 rdt_insert(redup_table_t
*rdt
,
142 uint64_t guid
, uint64_t object
, uint64_t offset
, uint64_t stream_offset
)
144 uint64_t ch
= cityhash4(guid
, object
, offset
, 0);
145 uint64_t hashcode
= BF64_GET(ch
, 0, rdt
->numhashbits
);
146 redup_entry_t
**rdepp
;
148 rdepp
= &(rdt
->redup_hash_array
[hashcode
]);
149 redup_entry_t
*rde
= umem_cache_alloc(rdt
->ddecache
, UMEM_NOFAIL
);
150 rde
->rde_next
= *rdepp
;
151 rde
->rde_guid
= guid
;
152 rde
->rde_object
= object
;
153 rde
->rde_offset
= offset
;
154 rde
->rde_stream_offset
= stream_offset
;
160 rdt_lookup(redup_table_t
*rdt
,
161 uint64_t guid
, uint64_t object
, uint64_t offset
,
162 uint64_t *stream_offsetp
)
164 uint64_t ch
= cityhash4(guid
, object
, offset
, 0);
165 uint64_t hashcode
= BF64_GET(ch
, 0, rdt
->numhashbits
);
167 for (redup_entry_t
*rde
= rdt
->redup_hash_array
[hashcode
];
168 rde
!= NULL
; rde
= rde
->rde_next
) {
169 if (rde
->rde_guid
== guid
&&
170 rde
->rde_object
== object
&&
171 rde
->rde_offset
== offset
) {
172 *stream_offsetp
= rde
->rde_stream_offset
;
176 assert(!"could not find expected redup table entry");
180 * Convert a dedup stream (generated by "zfs send -D") to a
181 * non-deduplicated stream. The entire infd will be converted, including
182 * any substreams in a stream package (generated by "zfs send -RD"). The
183 * infd must be seekable.
186 zfs_redup_stream(int infd
, int outfd
, boolean_t verbose
)
188 int bufsz
= SPA_MAXBLOCKSIZE
;
189 dmu_replay_record_t thedrr
= { 0 };
190 dmu_replay_record_t
*drr
= &thedrr
;
192 zio_cksum_t stream_cksum
;
194 uint64_t num_records
= 0;
195 uint64_t num_write_byref_records
= 0;
198 uint64_t max_rde_size
= SMALLEST_POSSIBLE_MAX_RDT_MB
<< 20;
200 uint64_t physmem
= sysconf(_SC_PHYS_PAGES
) * sysconf(_SC_PAGESIZE
);
201 uint64_t max_rde_size
=
202 MAX((physmem
* MAX_RDT_PHYSMEM_PERCENT
) / 100,
203 SMALLEST_POSSIBLE_MAX_RDT_MB
<< 20);
206 numbuckets
= max_rde_size
/ (sizeof (redup_entry_t
));
209 * numbuckets must be a power of 2. Increase number to
210 * a power of 2 if necessary.
212 if (!ISP2(numbuckets
))
213 numbuckets
= 1ULL << highbit64(numbuckets
);
215 rdt
.redup_hash_array
=
216 safe_calloc(numbuckets
* sizeof (redup_entry_t
*));
217 rdt
.ddecache
= umem_cache_create("rde", sizeof (redup_entry_t
), 0,
218 NULL
, NULL
, NULL
, NULL
, NULL
, 0);
219 rdt
.numhashbits
= highbit64(numbuckets
) - 1;
222 char *buf
= safe_calloc(bufsz
);
223 FILE *ofp
= fdopen(infd
, "r");
224 long offset
= ftell(ofp
);
225 while (sfread(drr
, sizeof (*drr
), ofp
) != 0) {
229 * We need to regenerate the checksum.
231 if (drr
->drr_type
!= DRR_BEGIN
) {
232 memset(&drr
->drr_u
.drr_checksum
.drr_checksum
, 0,
233 sizeof (drr
->drr_u
.drr_checksum
.drr_checksum
));
236 uint64_t payload_size
= 0;
237 switch (drr
->drr_type
) {
240 struct drr_begin
*drrb
= &drr
->drr_u
.drr_begin
;
242 ZIO_SET_CHECKSUM(&stream_cksum
, 0, 0, 0, 0);
244 assert(drrb
->drr_magic
== DMU_BACKUP_MAGIC
);
246 /* clear the DEDUP feature flag for this stream */
247 fflags
= DMU_GET_FEATUREFLAGS(drrb
->drr_versioninfo
);
248 fflags
&= ~(DMU_BACKUP_FEATURE_DEDUP
|
249 DMU_BACKUP_FEATURE_DEDUPPROPS
);
250 /* cppcheck-suppress syntaxError */
251 DMU_SET_FEATUREFLAGS(drrb
->drr_versioninfo
, fflags
);
253 int sz
= drr
->drr_payloadlen
;
257 buf
= safe_calloc(sz
);
260 (void) sfread(buf
, sz
, ofp
);
268 struct drr_end
*drre
= &drr
->drr_u
.drr_end
;
270 * Use the recalculated checksum, unless this is
271 * the END record of a stream package, which has
274 if (!ZIO_CHECKSUM_IS_ZERO(&drre
->drr_checksum
))
275 drre
->drr_checksum
= stream_cksum
;
281 struct drr_object
*drro
= &drr
->drr_u
.drr_object
;
283 if (drro
->drr_bonuslen
> 0) {
284 payload_size
= DRR_OBJECT_PAYLOAD_SIZE(drro
);
285 (void) sfread(buf
, payload_size
, ofp
);
292 struct drr_spill
*drrs
= &drr
->drr_u
.drr_spill
;
293 payload_size
= DRR_SPILL_PAYLOAD_SIZE(drrs
);
294 (void) sfread(buf
, payload_size
, ofp
);
298 case DRR_WRITE_BYREF
:
300 struct drr_write_byref drrwb
=
301 drr
->drr_u
.drr_write_byref
;
303 num_write_byref_records
++;
306 * Look up in hash table by drrwb->drr_refguid,
307 * drr_refobject, drr_refoffset. Replace this
308 * record with the found WRITE record, but with
309 * drr_object,drr_offset,drr_toguid replaced with ours.
311 uint64_t stream_offset
= 0;
312 rdt_lookup(&rdt
, drrwb
.drr_refguid
,
313 drrwb
.drr_refobject
, drrwb
.drr_refoffset
,
316 spread(infd
, drr
, sizeof (*drr
), stream_offset
);
318 assert(drr
->drr_type
== DRR_WRITE
);
319 struct drr_write
*drrw
= &drr
->drr_u
.drr_write
;
320 assert(drrw
->drr_toguid
== drrwb
.drr_refguid
);
321 assert(drrw
->drr_object
== drrwb
.drr_refobject
);
322 assert(drrw
->drr_offset
== drrwb
.drr_refoffset
);
324 payload_size
= DRR_WRITE_PAYLOAD_SIZE(drrw
);
325 spread(infd
, buf
, payload_size
,
326 stream_offset
+ sizeof (*drr
));
328 drrw
->drr_toguid
= drrwb
.drr_toguid
;
329 drrw
->drr_object
= drrwb
.drr_object
;
330 drrw
->drr_offset
= drrwb
.drr_offset
;
336 struct drr_write
*drrw
= &drr
->drr_u
.drr_write
;
337 payload_size
= DRR_WRITE_PAYLOAD_SIZE(drrw
);
338 (void) sfread(buf
, payload_size
, ofp
);
340 rdt_insert(&rdt
, drrw
->drr_toguid
,
341 drrw
->drr_object
, drrw
->drr_offset
, offset
);
345 case DRR_WRITE_EMBEDDED
:
347 struct drr_write_embedded
*drrwe
=
348 &drr
->drr_u
.drr_write_embedded
;
350 P2ROUNDUP((uint64_t)drrwe
->drr_psize
, 8);
351 (void) sfread(buf
, payload_size
, ofp
);
355 case DRR_FREEOBJECTS
:
357 case DRR_OBJECT_RANGE
:
361 (void) fprintf(stderr
, "INVALID record type 0x%x\n",
363 /* should never happen, so assert */
368 fprintf(stderr
, "Error: unexpected end-of-file\n");
372 fprintf(stderr
, "Error while reading file: %s\n",
378 * We need to recalculate the checksum, and it needs to be
379 * initially zero to do that. BEGIN records don't have
382 if (drr
->drr_type
!= DRR_BEGIN
) {
383 memset(&drr
->drr_u
.drr_checksum
.drr_checksum
, 0,
384 sizeof (drr
->drr_u
.drr_checksum
.drr_checksum
));
386 if (dump_record(drr
, buf
, payload_size
,
387 &stream_cksum
, outfd
) != 0)
389 if (drr
->drr_type
== DRR_END
) {
391 * Typically the END record is either the last
392 * thing in the stream, or it is followed
393 * by a BEGIN record (which also zeros the checksum).
394 * However, a stream package ends with two END
395 * records. The last END record's checksum starts
398 ZIO_SET_CHECKSUM(&stream_cksum
, 0, 0, 0, 0);
405 zfs_nicenum(rdt
.ddt_count
* sizeof (redup_entry_t
),
406 mem_str
, sizeof (mem_str
));
407 fprintf(stderr
, "converted stream with %llu total records, "
408 "including %llu dedup records, using %sB memory.\n",
409 (long long)num_records
,
410 (long long)num_write_byref_records
,
414 umem_cache_destroy(rdt
.ddecache
);
415 free(rdt
.redup_hash_array
);
421 zstream_do_redup(int argc
, char *argv
[])
423 boolean_t verbose
= B_FALSE
;
426 while ((c
= getopt(argc
, argv
, "v")) != -1) {
432 (void) fprintf(stderr
, "invalid option '%c'\n",
445 const char *filename
= argv
[0];
447 if (isatty(STDOUT_FILENO
)) {
448 (void) fprintf(stderr
,
449 "Error: Stream can not be written to a terminal.\n"
450 "You must redirect standard output.\n");
454 int fd
= open(filename
, O_RDONLY
);
456 (void) fprintf(stderr
,
457 "Error while opening file '%s': %s\n",
458 filename
, strerror(errno
));
463 zfs_redup_stream(fd
, STDOUT_FILENO
, verbose
);