4 * This file and its contents are supplied under the terms of the
5 * Common Development and Distribution License ("CDDL"), version 1.0.
6 * You may only use this file in accordance with the terms of version
9 * A full copy of the text of the CDDL should have accompanied this
10 * source. A copy of the CDDL is also available via the Internet at
11 * http://www.illumos.org/license/CDDL.
17 * Copyright (c) 2020 by Delphix. All rights reserved.
25 #include <libzfs_impl.h>
34 #include <sys/debug.h>
36 #include <sys/zfs_ioctl.h>
37 #include <sys/zio_checksum.h>
38 #include "zfs_fletcher.h"
42 #define MAX_RDT_PHYSMEM_PERCENT 20
43 #define SMALLEST_POSSIBLE_MAX_RDT_MB 128
45 typedef struct redup_entry
{
46 struct redup_entry
*rde_next
;
50 uint64_t rde_stream_offset
;
53 typedef struct redup_table
{
54 redup_entry_t
**redup_hash_array
;
55 umem_cache_t
*ddecache
;
66 return (NBBY
* sizeof (uint64_t) - __builtin_clzll(i
));
72 void *rv
= calloc(1, n
);
75 "Error: could not allocate %u bytes of memory\n",
83 * Safe version of fread(), exits on error.
86 sfread(void *buf
, size_t size
, FILE *fp
)
88 int rv
= fread(buf
, size
, 1, fp
);
89 if (rv
== 0 && ferror(fp
)) {
90 (void) fprintf(stderr
, "Error while reading file: %s\n",
98 * Safe version of pread(), exits on error.
101 spread(int fd
, void *buf
, size_t count
, off_t offset
)
103 ssize_t err
= pread(fd
, buf
, count
, offset
);
105 (void) fprintf(stderr
,
106 "Error while reading file: %s\n",
109 } else if (err
!= count
) {
110 (void) fprintf(stderr
,
111 "Error while reading file: short read\n");
117 dump_record(dmu_replay_record_t
*drr
, void *payload
, int payload_len
,
118 zio_cksum_t
*zc
, int outfd
)
120 assert(offsetof(dmu_replay_record_t
, drr_u
.drr_checksum
.drr_checksum
)
121 == sizeof (dmu_replay_record_t
) - sizeof (zio_cksum_t
));
122 fletcher_4_incremental_native(drr
,
123 offsetof(dmu_replay_record_t
, drr_u
.drr_checksum
.drr_checksum
), zc
);
124 if (drr
->drr_type
!= DRR_BEGIN
) {
125 assert(ZIO_CHECKSUM_IS_ZERO(&drr
->drr_u
.
126 drr_checksum
.drr_checksum
));
127 drr
->drr_u
.drr_checksum
.drr_checksum
= *zc
;
129 fletcher_4_incremental_native(&drr
->drr_u
.drr_checksum
.drr_checksum
,
130 sizeof (zio_cksum_t
), zc
);
131 if (write(outfd
, drr
, sizeof (*drr
)) == -1)
133 if (payload_len
!= 0) {
134 fletcher_4_incremental_native(payload
, payload_len
, zc
);
135 if (write(outfd
, payload
, payload_len
) == -1)
142 rdt_insert(redup_table_t
*rdt
,
143 uint64_t guid
, uint64_t object
, uint64_t offset
, uint64_t stream_offset
)
145 uint64_t ch
= cityhash4(guid
, object
, offset
, 0);
146 uint64_t hashcode
= BF64_GET(ch
, 0, rdt
->numhashbits
);
147 redup_entry_t
**rdepp
;
149 rdepp
= &(rdt
->redup_hash_array
[hashcode
]);
150 redup_entry_t
*rde
= umem_cache_alloc(rdt
->ddecache
, UMEM_NOFAIL
);
151 rde
->rde_next
= *rdepp
;
152 rde
->rde_guid
= guid
;
153 rde
->rde_object
= object
;
154 rde
->rde_offset
= offset
;
155 rde
->rde_stream_offset
= stream_offset
;
161 rdt_lookup(redup_table_t
*rdt
,
162 uint64_t guid
, uint64_t object
, uint64_t offset
,
163 uint64_t *stream_offsetp
)
165 uint64_t ch
= cityhash4(guid
, object
, offset
, 0);
166 uint64_t hashcode
= BF64_GET(ch
, 0, rdt
->numhashbits
);
168 for (redup_entry_t
*rde
= rdt
->redup_hash_array
[hashcode
];
169 rde
!= NULL
; rde
= rde
->rde_next
) {
170 if (rde
->rde_guid
== guid
&&
171 rde
->rde_object
== object
&&
172 rde
->rde_offset
== offset
) {
173 *stream_offsetp
= rde
->rde_stream_offset
;
177 assert(!"could not find expected redup table entry");
181 * Convert a dedup stream (generated by "zfs send -D") to a
182 * non-deduplicated stream. The entire infd will be converted, including
183 * any substreams in a stream package (generated by "zfs send -RD"). The
184 * infd must be seekable.
187 zfs_redup_stream(int infd
, int outfd
, boolean_t verbose
)
189 int bufsz
= SPA_MAXBLOCKSIZE
;
190 dmu_replay_record_t thedrr
= { 0 };
191 dmu_replay_record_t
*drr
= &thedrr
;
193 zio_cksum_t stream_cksum
;
195 uint64_t num_records
= 0;
196 uint64_t num_write_byref_records
= 0;
199 uint64_t max_rde_size
= SMALLEST_POSSIBLE_MAX_RDT_MB
<< 20;
201 uint64_t physmem
= sysconf(_SC_PHYS_PAGES
) * sysconf(_SC_PAGESIZE
);
202 uint64_t max_rde_size
=
203 MAX((physmem
* MAX_RDT_PHYSMEM_PERCENT
) / 100,
204 SMALLEST_POSSIBLE_MAX_RDT_MB
<< 20);
207 numbuckets
= max_rde_size
/ (sizeof (redup_entry_t
));
210 * numbuckets must be a power of 2. Increase number to
211 * a power of 2 if necessary.
213 if (!ISP2(numbuckets
))
214 numbuckets
= 1ULL << highbit64(numbuckets
);
216 rdt
.redup_hash_array
=
217 safe_calloc(numbuckets
* sizeof (redup_entry_t
*));
218 rdt
.ddecache
= umem_cache_create("rde", sizeof (redup_entry_t
), 0,
219 NULL
, NULL
, NULL
, NULL
, NULL
, 0);
220 rdt
.numhashbits
= highbit64(numbuckets
) - 1;
223 char *buf
= safe_calloc(bufsz
);
224 FILE *ofp
= fdopen(infd
, "r");
225 long offset
= ftell(ofp
);
226 while (sfread(drr
, sizeof (*drr
), ofp
) != 0) {
230 * We need to regenerate the checksum.
232 if (drr
->drr_type
!= DRR_BEGIN
) {
233 bzero(&drr
->drr_u
.drr_checksum
.drr_checksum
,
234 sizeof (drr
->drr_u
.drr_checksum
.drr_checksum
));
237 uint64_t payload_size
= 0;
238 switch (drr
->drr_type
) {
241 struct drr_begin
*drrb
= &drr
->drr_u
.drr_begin
;
243 ZIO_SET_CHECKSUM(&stream_cksum
, 0, 0, 0, 0);
245 assert(drrb
->drr_magic
== DMU_BACKUP_MAGIC
);
247 /* clear the DEDUP feature flag for this stream */
248 fflags
= DMU_GET_FEATUREFLAGS(drrb
->drr_versioninfo
);
249 fflags
&= ~(DMU_BACKUP_FEATURE_DEDUP
|
250 DMU_BACKUP_FEATURE_DEDUPPROPS
);
251 DMU_SET_FEATUREFLAGS(drrb
->drr_versioninfo
, fflags
);
253 int sz
= drr
->drr_payloadlen
;
257 buf
= safe_calloc(sz
);
260 (void) sfread(buf
, sz
, ofp
);
268 struct drr_end
*drre
= &drr
->drr_u
.drr_end
;
270 * Use the recalculated checksum, unless this is
271 * the END record of a stream package, which has
274 if (!ZIO_CHECKSUM_IS_ZERO(&drre
->drr_checksum
))
275 drre
->drr_checksum
= stream_cksum
;
281 struct drr_object
*drro
= &drr
->drr_u
.drr_object
;
283 if (drro
->drr_bonuslen
> 0) {
284 payload_size
= DRR_OBJECT_PAYLOAD_SIZE(drro
);
285 (void) sfread(buf
, payload_size
, ofp
);
292 struct drr_spill
*drrs
= &drr
->drr_u
.drr_spill
;
293 payload_size
= DRR_SPILL_PAYLOAD_SIZE(drrs
);
294 (void) sfread(buf
, payload_size
, ofp
);
298 case DRR_WRITE_BYREF
:
300 struct drr_write_byref drrwb
=
301 drr
->drr_u
.drr_write_byref
;
303 num_write_byref_records
++;
306 * Look up in hash table by drrwb->drr_refguid,
307 * drr_refobject, drr_refoffset. Replace this
308 * record with the found WRITE record, but with
309 * drr_object,drr_offset,drr_toguid replaced with ours.
311 uint64_t stream_offset
= 0;
312 rdt_lookup(&rdt
, drrwb
.drr_refguid
,
313 drrwb
.drr_refobject
, drrwb
.drr_refoffset
,
316 spread(infd
, drr
, sizeof (*drr
), stream_offset
);
318 assert(drr
->drr_type
== DRR_WRITE
);
319 struct drr_write
*drrw
= &drr
->drr_u
.drr_write
;
320 assert(drrw
->drr_toguid
== drrwb
.drr_refguid
);
321 assert(drrw
->drr_object
== drrwb
.drr_refobject
);
322 assert(drrw
->drr_offset
== drrwb
.drr_refoffset
);
324 payload_size
= DRR_WRITE_PAYLOAD_SIZE(drrw
);
325 spread(infd
, buf
, payload_size
,
326 stream_offset
+ sizeof (*drr
));
328 drrw
->drr_toguid
= drrwb
.drr_toguid
;
329 drrw
->drr_object
= drrwb
.drr_object
;
330 drrw
->drr_offset
= drrwb
.drr_offset
;
336 struct drr_write
*drrw
= &drr
->drr_u
.drr_write
;
337 payload_size
= DRR_WRITE_PAYLOAD_SIZE(drrw
);
338 (void) sfread(buf
, payload_size
, ofp
);
340 rdt_insert(&rdt
, drrw
->drr_toguid
,
341 drrw
->drr_object
, drrw
->drr_offset
, offset
);
345 case DRR_WRITE_EMBEDDED
:
347 struct drr_write_embedded
*drrwe
=
348 &drr
->drr_u
.drr_write_embedded
;
350 P2ROUNDUP((uint64_t)drrwe
->drr_psize
, 8);
351 (void) sfread(buf
, payload_size
, ofp
);
355 case DRR_FREEOBJECTS
:
357 case DRR_OBJECT_RANGE
:
361 (void) fprintf(stderr
, "INVALID record type 0x%x\n",
363 /* should never happen, so assert */
368 fprintf(stderr
, "Error: unexpected end-of-file\n");
372 fprintf(stderr
, "Error while reading file: %s\n",
378 * We need to recalculate the checksum, and it needs to be
379 * initially zero to do that. BEGIN records don't have
382 if (drr
->drr_type
!= DRR_BEGIN
) {
383 bzero(&drr
->drr_u
.drr_checksum
.drr_checksum
,
384 sizeof (drr
->drr_u
.drr_checksum
.drr_checksum
));
386 if (dump_record(drr
, buf
, payload_size
,
387 &stream_cksum
, outfd
) != 0)
389 if (drr
->drr_type
== DRR_END
) {
391 * Typically the END record is either the last
392 * thing in the stream, or it is followed
393 * by a BEGIN record (which also zeros the checksum).
394 * However, a stream package ends with two END
395 * records. The last END record's checksum starts
398 ZIO_SET_CHECKSUM(&stream_cksum
, 0, 0, 0, 0);
405 zfs_nicenum(rdt
.ddt_count
* sizeof (redup_entry_t
),
406 mem_str
, sizeof (mem_str
));
407 fprintf(stderr
, "converted stream with %llu total records, "
408 "including %llu dedup records, using %sB memory.\n",
409 (long long)num_records
,
410 (long long)num_write_byref_records
,
414 umem_cache_destroy(rdt
.ddecache
);
415 free(rdt
.redup_hash_array
);
421 zstream_do_redup(int argc
, char *argv
[])
423 boolean_t verbose
= B_FALSE
;
426 while ((c
= getopt(argc
, argv
, "v")) != -1) {
432 (void) fprintf(stderr
, "invalid option '%c'\n",
445 const char *filename
= argv
[0];
447 if (isatty(STDOUT_FILENO
)) {
448 (void) fprintf(stderr
,
449 "Error: Stream can not be written to a terminal.\n"
450 "You must redirect standard output.\n");
454 int fd
= open(filename
, O_RDONLY
);
456 (void) fprintf(stderr
,
457 "Error while opening file '%s': %s\n",
458 filename
, strerror(errno
));
463 zfs_redup_stream(fd
, STDOUT_FILENO
, verbose
);