4 * This file and its contents are supplied under the terms of the
5 * Common Development and Distribution License ("CDDL"), version 1.0.
6 * You may only use this file in accordance with the terms of version
9 * A full copy of the text of the CDDL should have accompanied this
10 * source. A copy of the CDDL is also available via the Internet at
11 * http://www.illumos.org/license/CDDL.
17 * Copyright (c) 2020 by Delphix. All rights reserved.
33 #include <sys/debug.h>
35 #include <sys/zfs_ioctl.h>
36 #include <sys/zio_checksum.h>
37 #include "zfs_fletcher.h"
/*
 * Memory budget used to size the redup hash table in zfs_redup_stream():
 * the budget is the larger of MAX_RDT_PHYSMEM_PERCENT percent of physical
 * memory and SMALLEST_POSSIBLE_MAX_RDT_MB megabytes, and the bucket count
 * is derived from it by dividing by sizeof (redup_entry_t).
 */
#define MAX_RDT_PHYSMEM_PERCENT 20
#define SMALLEST_POSSIBLE_MAX_RDT_MB 128
/*
 * One entry of the redup table.  Entries that hash to the same bucket are
 * chained through rde_next.  The (rde_guid, rde_object, rde_offset) triple
 * is the lookup key; rde_stream_offset is the value stored for that key by
 * rdt_insert() and handed back by rdt_lookup().
 */
typedef struct redup_entry {
	struct redup_entry *rde_next;	/* next entry in the same bucket */
	uint64_t rde_guid;		/* key: guid */
	uint64_t rde_object;		/* key: object number */
	uint64_t rde_offset;		/* key: offset within the object */
	uint64_t rde_stream_offset;	/* value: offset in the input stream */
} redup_entry_t;
52 typedef struct redup_table
{
53 redup_entry_t
**redup_hash_array
;
54 umem_cache_t
*ddecache
;
/*
 * Allocate n zero-filled bytes.  Never returns NULL: if the allocation
 * fails, a diagnostic is printed to stderr and the process exits with
 * status 1.  The caller releases the memory with free().
 */
static void *
safe_calloc(size_t n)
{
	void *rv = calloc(1, n);

	if (rv == NULL) {
		/* n is a size_t, so it has to be formatted with %zu. */
		(void) fprintf(stderr,
		    "Error: could not allocate %zu bytes of memory\n", n);
		exit(1);
	}
	return (rv);
}
/*
 * Safe version of fread(), exits on error.
 *
 * Reads one item of `size` bytes from fp into buf and returns fread()'s
 * item count: 1 if the item was read, 0 if it was not.  A zero return
 * with the stream's error indicator set is fatal: a message is printed
 * to stderr and the process exits with status 1.  A zero return without
 * an error (for instance at end-of-file) is passed back to the caller.
 */
static int
sfread(void *buf, size_t size, FILE *fp)
{
	int nitems = fread(buf, size, 1, fp);

	/* Anything other than "nothing read because of an error" is fine. */
	if (nitems != 0 || !ferror(fp))
		return (nitems);

	(void) fprintf(stderr, "Error while reading file: %s\n",
	    strerror(errno));
	exit(1);
}
/*
 * Safe version of pread(), exits on error.
 *
 * Reads exactly `count` bytes from fd, starting at byte `offset`, into
 * buf.  pread() is allowed to return fewer bytes than requested, so the
 * read is resumed until the whole range has been transferred, and a call
 * interrupted by a signal (EINTR) is retried.  If pread() fails, or
 * end-of-file is reached before `count` bytes were read, a message is
 * printed to stderr and the process exits with status 1.
 */
static void
spread(int fd, void *buf, size_t count, off_t offset)
{
	char *dst = buf;
	size_t done = 0;

	while (done < count) {
		ssize_t got = pread(fd, dst + done, count - done,
		    offset + (off_t)done);

		if (got == -1) {
			if (errno == EINTR)
				continue;
			(void) fprintf(stderr,
			    "Error while reading file: %s\n",
			    strerror(errno));
			exit(1);
		}
		if (got == 0) {
			/* EOF before the requested range was complete. */
			(void) fprintf(stderr,
			    "Error while reading file: short read\n");
			exit(1);
		}
		done += (size_t)got;
	}
}
107 dump_record(dmu_replay_record_t
*drr
, void *payload
, int payload_len
,
108 zio_cksum_t
*zc
, int outfd
)
110 assert(offsetof(dmu_replay_record_t
, drr_u
.drr_checksum
.drr_checksum
)
111 == sizeof (dmu_replay_record_t
) - sizeof (zio_cksum_t
));
112 fletcher_4_incremental_native(drr
,
113 offsetof(dmu_replay_record_t
, drr_u
.drr_checksum
.drr_checksum
), zc
);
114 if (drr
->drr_type
!= DRR_BEGIN
) {
115 assert(ZIO_CHECKSUM_IS_ZERO(&drr
->drr_u
.
116 drr_checksum
.drr_checksum
));
117 drr
->drr_u
.drr_checksum
.drr_checksum
= *zc
;
119 fletcher_4_incremental_native(&drr
->drr_u
.drr_checksum
.drr_checksum
,
120 sizeof (zio_cksum_t
), zc
);
121 if (write(outfd
, drr
, sizeof (*drr
)) == -1)
123 if (payload_len
!= 0) {
124 fletcher_4_incremental_native(payload
, payload_len
, zc
);
125 if (write(outfd
, payload
, payload_len
) == -1)
132 rdt_insert(redup_table_t
*rdt
,
133 uint64_t guid
, uint64_t object
, uint64_t offset
, uint64_t stream_offset
)
135 uint64_t ch
= cityhash3(guid
, object
, offset
);
136 uint64_t hashcode
= BF64_GET(ch
, 0, rdt
->numhashbits
);
137 redup_entry_t
**rdepp
;
139 rdepp
= &(rdt
->redup_hash_array
[hashcode
]);
140 redup_entry_t
*rde
= umem_cache_alloc(rdt
->ddecache
, UMEM_NOFAIL
);
141 rde
->rde_next
= *rdepp
;
142 rde
->rde_guid
= guid
;
143 rde
->rde_object
= object
;
144 rde
->rde_offset
= offset
;
145 rde
->rde_stream_offset
= stream_offset
;
151 rdt_lookup(redup_table_t
*rdt
,
152 uint64_t guid
, uint64_t object
, uint64_t offset
,
153 uint64_t *stream_offsetp
)
155 uint64_t ch
= cityhash3(guid
, object
, offset
);
156 uint64_t hashcode
= BF64_GET(ch
, 0, rdt
->numhashbits
);
158 for (redup_entry_t
*rde
= rdt
->redup_hash_array
[hashcode
];
159 rde
!= NULL
; rde
= rde
->rde_next
) {
160 if (rde
->rde_guid
== guid
&&
161 rde
->rde_object
== object
&&
162 rde
->rde_offset
== offset
) {
163 *stream_offsetp
= rde
->rde_stream_offset
;
167 assert(!"could not find expected redup table entry");
171 * Convert a dedup stream (generated by "zfs send -D") to a
172 * non-deduplicated stream. The entire infd will be converted, including
173 * any substreams in a stream package (generated by "zfs send -RD"). The
174 * infd must be seekable.
177 zfs_redup_stream(int infd
, int outfd
, boolean_t verbose
)
179 int bufsz
= SPA_MAXBLOCKSIZE
;
180 dmu_replay_record_t thedrr
;
181 dmu_replay_record_t
*drr
= &thedrr
;
183 zio_cksum_t stream_cksum
;
185 uint64_t num_records
= 0;
186 uint64_t num_write_byref_records
= 0;
188 memset(&thedrr
, 0, sizeof (dmu_replay_record_t
));
191 uint64_t max_rde_size
= SMALLEST_POSSIBLE_MAX_RDT_MB
<< 20;
193 uint64_t physmem
= sysconf(_SC_PHYS_PAGES
) * sysconf(_SC_PAGESIZE
);
194 uint64_t max_rde_size
=
195 MAX((physmem
* MAX_RDT_PHYSMEM_PERCENT
) / 100,
196 SMALLEST_POSSIBLE_MAX_RDT_MB
<< 20);
199 numbuckets
= max_rde_size
/ (sizeof (redup_entry_t
));
202 * numbuckets must be a power of 2. Increase number to
203 * a power of 2 if necessary.
205 if (!ISP2(numbuckets
))
206 numbuckets
= 1ULL << highbit64(numbuckets
);
208 rdt
.redup_hash_array
=
209 safe_calloc(numbuckets
* sizeof (redup_entry_t
*));
210 rdt
.ddecache
= umem_cache_create("rde", sizeof (redup_entry_t
), 0,
211 NULL
, NULL
, NULL
, NULL
, NULL
, 0);
212 rdt
.numhashbits
= highbit64(numbuckets
) - 1;
215 char *buf
= safe_calloc(bufsz
);
216 FILE *ofp
= fdopen(infd
, "r");
217 long offset
= ftell(ofp
);
219 boolean_t seen
= B_FALSE
;
220 while (sfread(drr
, sizeof (*drr
), ofp
) != 0) {
224 * We need to regenerate the checksum.
226 if (drr
->drr_type
!= DRR_BEGIN
) {
227 memset(&drr
->drr_u
.drr_checksum
.drr_checksum
, 0,
228 sizeof (drr
->drr_u
.drr_checksum
.drr_checksum
));
231 uint64_t payload_size
= 0;
232 switch (drr
->drr_type
) {
235 struct drr_begin
*drrb
= &drr
->drr_u
.drr_begin
;
237 ZIO_SET_CHECKSUM(&stream_cksum
, 0, 0, 0, 0);
241 assert(drrb
->drr_magic
== DMU_BACKUP_MAGIC
);
243 /* clear the DEDUP feature flag for this stream */
244 fflags
= DMU_GET_FEATUREFLAGS(drrb
->drr_versioninfo
);
245 fflags
&= ~(DMU_BACKUP_FEATURE_DEDUP
|
246 DMU_BACKUP_FEATURE_DEDUPPROPS
);
247 /* cppcheck-suppress syntaxError */
248 DMU_SET_FEATUREFLAGS(drrb
->drr_versioninfo
, fflags
);
250 uint32_t sz
= drr
->drr_payloadlen
;
252 VERIFY3U(sz
, <=, 1U << 28);
257 buf
= safe_calloc(sz
);
260 (void) sfread(buf
, sz
, ofp
);
268 struct drr_end
*drre
= &drr
->drr_u
.drr_end
;
270 * We would prefer to just check --begin == 0, but
271 * replication streams have an end of stream END
272 * record, so we must avoid tripping it.
274 VERIFY3B(seen
, ==, B_TRUE
);
277 * Use the recalculated checksum, unless this is
278 * the END record of a stream package, which has
281 if (!ZIO_CHECKSUM_IS_ZERO(&drre
->drr_checksum
))
282 drre
->drr_checksum
= stream_cksum
;
288 struct drr_object
*drro
= &drr
->drr_u
.drr_object
;
289 VERIFY3S(begin
, ==, 1);
291 if (drro
->drr_bonuslen
> 0) {
292 payload_size
= DRR_OBJECT_PAYLOAD_SIZE(drro
);
293 (void) sfread(buf
, payload_size
, ofp
);
300 struct drr_spill
*drrs
= &drr
->drr_u
.drr_spill
;
301 VERIFY3S(begin
, ==, 1);
302 payload_size
= DRR_SPILL_PAYLOAD_SIZE(drrs
);
303 (void) sfread(buf
, payload_size
, ofp
);
307 case DRR_WRITE_BYREF
:
309 struct drr_write_byref drrwb
=
310 drr
->drr_u
.drr_write_byref
;
311 VERIFY3S(begin
, ==, 1);
313 num_write_byref_records
++;
316 * Look up in hash table by drrwb->drr_refguid,
317 * drr_refobject, drr_refoffset. Replace this
318 * record with the found WRITE record, but with
319 * drr_object,drr_offset,drr_toguid replaced with ours.
321 uint64_t stream_offset
= 0;
322 rdt_lookup(&rdt
, drrwb
.drr_refguid
,
323 drrwb
.drr_refobject
, drrwb
.drr_refoffset
,
326 spread(infd
, drr
, sizeof (*drr
), stream_offset
);
328 assert(drr
->drr_type
== DRR_WRITE
);
329 struct drr_write
*drrw
= &drr
->drr_u
.drr_write
;
330 assert(drrw
->drr_toguid
== drrwb
.drr_refguid
);
331 assert(drrw
->drr_object
== drrwb
.drr_refobject
);
332 assert(drrw
->drr_offset
== drrwb
.drr_refoffset
);
334 payload_size
= DRR_WRITE_PAYLOAD_SIZE(drrw
);
335 spread(infd
, buf
, payload_size
,
336 stream_offset
+ sizeof (*drr
));
338 drrw
->drr_toguid
= drrwb
.drr_toguid
;
339 drrw
->drr_object
= drrwb
.drr_object
;
340 drrw
->drr_offset
= drrwb
.drr_offset
;
346 struct drr_write
*drrw
= &drr
->drr_u
.drr_write
;
347 VERIFY3S(begin
, ==, 1);
348 payload_size
= DRR_WRITE_PAYLOAD_SIZE(drrw
);
349 (void) sfread(buf
, payload_size
, ofp
);
351 rdt_insert(&rdt
, drrw
->drr_toguid
,
352 drrw
->drr_object
, drrw
->drr_offset
, offset
);
356 case DRR_WRITE_EMBEDDED
:
358 struct drr_write_embedded
*drrwe
=
359 &drr
->drr_u
.drr_write_embedded
;
360 VERIFY3S(begin
, ==, 1);
362 P2ROUNDUP((uint64_t)drrwe
->drr_psize
, 8);
363 (void) sfread(buf
, payload_size
, ofp
);
367 case DRR_FREEOBJECTS
:
369 case DRR_OBJECT_RANGE
:
370 VERIFY3S(begin
, ==, 1);
374 (void) fprintf(stderr
, "INVALID record type 0x%x\n",
376 /* should never happen, so assert */
381 fprintf(stderr
, "Error: unexpected end-of-file\n");
385 fprintf(stderr
, "Error while reading file: %s\n",
391 * We need to recalculate the checksum, and it needs to be
392 * initially zero to do that. BEGIN records don't have
395 if (drr
->drr_type
!= DRR_BEGIN
) {
396 memset(&drr
->drr_u
.drr_checksum
.drr_checksum
, 0,
397 sizeof (drr
->drr_u
.drr_checksum
.drr_checksum
));
399 if (dump_record(drr
, buf
, payload_size
,
400 &stream_cksum
, outfd
) != 0)
402 if (drr
->drr_type
== DRR_END
) {
404 * Typically the END record is either the last
405 * thing in the stream, or it is followed
406 * by a BEGIN record (which also zeros the checksum).
407 * However, a stream package ends with two END
408 * records. The last END record's checksum starts
411 ZIO_SET_CHECKSUM(&stream_cksum
, 0, 0, 0, 0);
418 zfs_nicenum(rdt
.ddt_count
* sizeof (redup_entry_t
),
419 mem_str
, sizeof (mem_str
));
420 fprintf(stderr
, "converted stream with %llu total records, "
421 "including %llu dedup records, using %sB memory.\n",
422 (long long)num_records
,
423 (long long)num_write_byref_records
,
427 umem_cache_destroy(rdt
.ddecache
);
428 free(rdt
.redup_hash_array
);
434 zstream_do_redup(int argc
, char *argv
[])
436 boolean_t verbose
= B_FALSE
;
439 while ((c
= getopt(argc
, argv
, "v")) != -1) {
445 (void) fprintf(stderr
, "invalid option '%c'\n",
458 const char *filename
= argv
[0];
460 if (isatty(STDOUT_FILENO
)) {
461 (void) fprintf(stderr
,
462 "Error: Stream can not be written to a terminal.\n"
463 "You must redirect standard output.\n");
467 int fd
= open(filename
, O_RDONLY
);
469 (void) fprintf(stderr
,
470 "Error while opening file '%s': %s\n",
471 filename
, strerror(errno
));
476 zfs_redup_stream(fd
, STDOUT_FILENO
, verbose
);