/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or https://opensource.org/licenses/CDDL-1.0.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */

/*
 * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
 * Copyright 2011 Nexenta Systems, Inc. All rights reserved.
 * Copyright (c) 2011, 2018 by Delphix. All rights reserved.
 * Copyright (c) 2014, Joyent, Inc. All rights reserved.
 * Copyright 2014 HybridCluster. All rights reserved.
 * Copyright 2016 RackTop Systems.
 * Copyright (c) 2016 Actifio, Inc. All rights reserved.
 * Copyright (c) 2019, 2024, Klara, Inc.
 * Copyright (c) 2019, Allan Jude
 */
#include <sys/dmu_impl.h>
#include <sys/dmu_tx.h>
#include <sys/dnode.h>
#include <sys/zfs_context.h>
#include <sys/dmu_objset.h>
#include <sys/dmu_traverse.h>
#include <sys/dsl_dataset.h>
#include <sys/dsl_dir.h>
#include <sys/dsl_prop.h>
#include <sys/dsl_pool.h>
#include <sys/dsl_synctask.h>
#include <sys/spa_impl.h>
#include <sys/zfs_ioctl.h>
#include <sys/zio_checksum.h>
#include <sys/zfs_znode.h>
#include <zfs_fletcher.h>
#include <sys/zfs_onexit.h>
#include <sys/dmu_send.h>
#include <sys/dmu_recv.h>
#include <sys/dsl_destroy.h>
#include <sys/blkptr.h>
#include <sys/dsl_bookmark.h>
#include <sys/zfeature.h>
#include <sys/bqueue.h>
#include <sys/policy.h>
#include <sys/objlist.h>
#include <sys/zfs_vfsops.h>
/* Set this tunable to TRUE to replace corrupt data with 0x2f5baddb10c */
static int zfs_send_corrupt_data = B_FALSE;

/*
 * This tunable controls the amount of data (measured in bytes) that will be
 * prefetched by zfs send. If the main thread is blocking on reads that haven't
 * completed, this variable might need to be increased. If instead the main
 * thread is issuing new reads because the prefetches have fallen out of the
 * cache, this may need to be decreased.
 */
static uint_t zfs_send_queue_length = SPA_MAXBLOCKSIZE;

/*
 * This tunable controls the length of the queues that zfs send worker threads
 * use to communicate. If the send_main_thread is blocking on these queues,
 * this variable may need to be increased. If there is a significant slowdown
 * at the start of a send as these threads consume all the available IO
 * resources, this variable may need to be decreased.
 */
static uint_t zfs_send_no_prefetch_queue_length = 1024 * 1024;

/*
 * These tunables control the fill fraction of the queues by zfs send. The fill
 * fraction controls the frequency with which threads have to be cv_signaled.
 * If a lot of cpu time is being spent on cv_signal, then these should be tuned
 * down. If the queues empty before the signalled thread can catch up, then
 * these should be tuned up.
 */
static uint_t zfs_send_queue_ff = 20;
static uint_t zfs_send_no_prefetch_queue_ff = 20;

/*
 * Use this to override the recordsize calculation for fast zfs send estimates.
 */
static uint_t zfs_override_estimate_recordsize = 0;

/* Set this tunable to FALSE to disable setting of DRR_FLAG_FREERECORDS */
static const boolean_t zfs_send_set_freerecords_bit = B_TRUE;
/* Set this tunable to FALSE to disable sending unmodified spill blocks. */
static int zfs_send_unmodified_spill_blocks = B_TRUE;
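
/*
 * Note (illustrative, not part of the upstream logic): tunables like the ones
 * above are normally exposed to userspace as module parameters near the
 * bottom of this file. A minimal sketch of that registration, assuming the
 * standard ZFS_MODULE_PARAM macro:
 *
 *	ZFS_MODULE_PARAM(zfs_send, zfs_send_, queue_length, UINT, ZMOD_RW,
 *		"Maximum send queue length");
 *
 * The value can then be tuned at runtime, e.g. through
 * /sys/module/zfs/parameters/zfs_send_queue_length on Linux.
 */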
static inline boolean_t
overflow_multiply(uint64_t a, uint64_t b, uint64_t *c)
{
	uint64_t temp = a * b;
	if (b != 0 && temp / b != a)
		return (B_FALSE);

	*c = temp;
	return (B_TRUE);
}
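
/*
 * Illustrative usage sketch (not part of the upstream file): callers check
 * the return value and fall back to a sentinel when the 64-bit product would
 * overflow, roughly as the hole handling in do_dump() below does:
 *
 *	uint64_t len = 0;
 *	if (!overflow_multiply(range->end_blkid, datablksz, &len))
 *		len = UINT64_MAX;
 */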
struct send_thread_arg {
	bqueue_t	q;
	objset_t	*os;		/* Objset to traverse */
	uint64_t	fromtxg;	/* Traverse from this txg */
	int		flags;		/* flags to pass to traverse_dataset */
	int		error_code;
	boolean_t	cancel;
	zbookmark_phys_t resume;
	uint64_t	*num_blocks_visited;
};

struct redact_list_thread_arg {
	boolean_t		cancel;
	bqueue_t		q;
	zbookmark_phys_t	resume;
	redaction_list_t	*rl;
	boolean_t		mark_redact;
	int			error_code;
	uint64_t		*num_blocks_visited;
};

struct send_merge_thread_arg {
	bqueue_t			q;
	objset_t			*os;
	struct redact_list_thread_arg	*from_arg;
	struct send_thread_arg		*to_arg;
	struct redact_list_thread_arg	*redact_arg;
	int				error;
	boolean_t			cancel;
};

struct send_range {
	boolean_t		eos_marker; /* Marks the end of the stream */
	uint64_t		object;
	uint64_t		start_blkid;
	uint64_t		end_blkid;
	bqueue_node_t		ln;
	enum type {DATA, HOLE, OBJECT, OBJECT_RANGE, REDACT,
	    PREVIOUSLY_REDACTED} type;
	union {
		struct srd {
			dmu_object_type_t	obj_type;
			uint32_t		datablksz; // logical size
			uint32_t		datasz; // payload size
			blkptr_t		bp;
			arc_buf_t		*abuf;
			abd_t			*abd;
			kmutex_t		lock;
			kcondvar_t		cv;
			boolean_t		io_outstanding;
			boolean_t		io_compressed;
			int			io_err;
		} data;
		struct srh {
			uint32_t		datablksz;
		} hole;
		struct sro {
			/*
			 * This is a pointer because embedding it in the
			 * struct causes these structures to be massively larger
			 * for all range types; this makes the code much less
			 * memory efficient.
			 */
			dnode_phys_t		*dnp;
			blkptr_t		bp;
			/* Piggyback unmodified spill block */
			struct send_range	*spill_range;
		} object;
		struct srr {
			uint32_t		datablksz;
		} redact;
		struct sror {
			blkptr_t		bp;
		} object_range;
	} sru;
};
/*
 * The list of data whose inclusion in a send stream can be pending from
 * one call to backup_cb to another. Multiple calls to dump_free(),
 * dump_freeobjects(), and dump_redact() can be aggregated into a single
 * DRR_FREE, DRR_FREEOBJECTS, or DRR_REDACT replay record.
 */
typedef enum {
	PENDING_NONE,
	PENDING_FREE,
	PENDING_FREEOBJECTS,
	PENDING_REDACT
} dmu_pendop_t;

typedef struct dmu_send_cookie {
	dmu_replay_record_t *dsc_drr;
	dmu_send_outparams_t *dsc_dso;
	offset_t *dsc_off;
	objset_t *dsc_os;
	zio_cksum_t dsc_zc;
	uint64_t dsc_toguid;
	uint64_t dsc_fromtxg;
	int dsc_err;
	dmu_pendop_t dsc_pending_op;
	uint64_t dsc_featureflags;
	uint64_t dsc_last_data_object;
	uint64_t dsc_last_data_offset;
	uint64_t dsc_resume_object;
	uint64_t dsc_resume_offset;
	boolean_t dsc_sent_begin;
	boolean_t dsc_sent_end;
} dmu_send_cookie_t;
static int do_dump(dmu_send_cookie_t *dscp, struct send_range *range);
static void
range_free(struct send_range *range)
{
	if (range->type == OBJECT) {
		size_t size = sizeof (dnode_phys_t) *
		    (range->sru.object.dnp->dn_extra_slots + 1);
		kmem_free(range->sru.object.dnp, size);
		if (range->sru.object.spill_range)
			range_free(range->sru.object.spill_range);
	} else if (range->type == DATA) {
		mutex_enter(&range->sru.data.lock);
		while (range->sru.data.io_outstanding)
			cv_wait(&range->sru.data.cv, &range->sru.data.lock);
		if (range->sru.data.abd != NULL)
			abd_free(range->sru.data.abd);
		if (range->sru.data.abuf != NULL) {
			arc_buf_destroy(range->sru.data.abuf,
			    &range->sru.data.abuf);
		}
		mutex_exit(&range->sru.data.lock);

		cv_destroy(&range->sru.data.cv);
		mutex_destroy(&range->sru.data.lock);
	}
	kmem_free(range, sizeof (*range));
}
/*
 * For all record types except BEGIN, fill in the checksum (overlaid in
 * drr_u.drr_checksum.drr_checksum). The checksum verifies everything
 * up to the start of the checksum itself.
 */
static int
dump_record(dmu_send_cookie_t *dscp, void *payload, int payload_len)
{
	dmu_send_outparams_t *dso = dscp->dsc_dso;
	ASSERT3U(offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
	    ==, sizeof (dmu_replay_record_t) - sizeof (zio_cksum_t));
	(void) fletcher_4_incremental_native(dscp->dsc_drr,
	    offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
	    &dscp->dsc_zc);
	if (dscp->dsc_drr->drr_type == DRR_BEGIN) {
		dscp->dsc_sent_begin = B_TRUE;
	} else {
		ASSERT(ZIO_CHECKSUM_IS_ZERO(&dscp->dsc_drr->drr_u.
		    drr_checksum.drr_checksum));
		dscp->dsc_drr->drr_u.drr_checksum.drr_checksum = dscp->dsc_zc;
	}
	if (dscp->dsc_drr->drr_type == DRR_END) {
		dscp->dsc_sent_end = B_TRUE;
	}
	(void) fletcher_4_incremental_native(&dscp->dsc_drr->
	    drr_u.drr_checksum.drr_checksum,
	    sizeof (zio_cksum_t), &dscp->dsc_zc);
	*dscp->dsc_off += sizeof (dmu_replay_record_t);
	dscp->dsc_err = dso->dso_outfunc(dscp->dsc_os, dscp->dsc_drr,
	    sizeof (dmu_replay_record_t), dso->dso_arg);
	if (dscp->dsc_err != 0)
		return (SET_ERROR(EINTR));
	if (payload_len != 0) {
		*dscp->dsc_off += payload_len;
		/*
		 * payload is null when dso_dryrun == B_TRUE (i.e. when we're
		 * doing a send size calculation)
		 */
		if (payload != NULL) {
			(void) fletcher_4_incremental_native(
			    payload, payload_len, &dscp->dsc_zc);
		}

		/*
		 * The code does not rely on this (len being a multiple of 8).
		 * We keep this assertion because of the corresponding assertion
		 * in receive_read(). Keeping this assertion ensures that we do
		 * not inadvertently break backwards compatibility (causing the
		 * assertion in receive_read() to trigger on old software).
		 *
		 * Raw sends cannot be received on old software, and so can
		 * bypass this assertion.
		 */
		ASSERT((payload_len % 8 == 0) ||
		    (dscp->dsc_featureflags & DMU_BACKUP_FEATURE_RAW));

		dscp->dsc_err = dso->dso_outfunc(dscp->dsc_os, payload,
		    payload_len, dso->dso_arg);
		if (dscp->dsc_err != 0)
			return (SET_ERROR(EINTR));
	}
	return (0);
}
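
/*
 * Illustrative sketch (not part of the upstream file): dso_outfunc is the
 * caller-supplied sink that dump_record() hands every replay record and
 * payload to. Judging from the calls above, a minimal byte-counting callback
 * could look roughly like this (the names here are hypothetical):
 *
 *	static int
 *	count_bytes_cb(objset_t *os, void *buf, int len, void *arg)
 *	{
 *		(void) os;
 *		(void) buf;
 *		*(uint64_t *)arg += len;
 *		return (0);
 *	}
 *
 * A nonzero return is surfaced through dsc_err and converted to EINTR,
 * which aborts the stream.
 */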
/*
 * Fill in the drr_free struct, or perform aggregation if the previous record is
 * also a free record, and the two are adjacent.
 *
 * Note that we send free records even for a full send, because we want to be
 * able to receive a full send as a clone, which requires a list of all the free
 * and freeobject records that were generated on the source.
 */
static int
dump_free(dmu_send_cookie_t *dscp, uint64_t object, uint64_t offset,
    uint64_t length)
{
	struct drr_free *drrf = &(dscp->dsc_drr->drr_u.drr_free);

	/*
	 * When we receive a free record, dbuf_free_range() assumes
	 * that the receiving system doesn't have any dbufs in the range
	 * being freed. This is always true because there is a one-record
	 * constraint: we only send one WRITE record for any given
	 * object,offset. We know that the one-record constraint is
	 * true because we always send data in increasing order by
	 * object,offset.
	 *
	 * If the increasing-order constraint ever changes, we should find
	 * another way to assert that the one-record constraint is still
	 * satisfied.
	 */
	ASSERT(object > dscp->dsc_last_data_object ||
	    (object == dscp->dsc_last_data_object &&
	    offset > dscp->dsc_last_data_offset));

	/*
	 * If there is a pending op, but it's not PENDING_FREE, push it out,
	 * since free block aggregation can only be done for blocks of the
	 * same type (i.e., DRR_FREE records can only be aggregated with
	 * other DRR_FREE records. DRR_FREEOBJECTS records can only be
	 * aggregated with other DRR_FREEOBJECTS records).
	 */
	if (dscp->dsc_pending_op != PENDING_NONE &&
	    dscp->dsc_pending_op != PENDING_FREE) {
		if (dump_record(dscp, NULL, 0) != 0)
			return (SET_ERROR(EINTR));
		dscp->dsc_pending_op = PENDING_NONE;
	}

	if (dscp->dsc_pending_op == PENDING_FREE) {
		/*
		 * Check to see whether this free block can be aggregated
		 * with pending one.
		 */
		if (drrf->drr_object == object && drrf->drr_offset +
		    drrf->drr_length == offset) {
			if (offset + length < offset || length == UINT64_MAX)
				drrf->drr_length = UINT64_MAX;
			else
				drrf->drr_length += length;
			return (0);
		} else {
			/* not a continuation. Push out pending record */
			if (dump_record(dscp, NULL, 0) != 0)
				return (SET_ERROR(EINTR));
			dscp->dsc_pending_op = PENDING_NONE;
		}
	}
	/* create a FREE record and make it pending */
	memset(dscp->dsc_drr, 0, sizeof (dmu_replay_record_t));
	dscp->dsc_drr->drr_type = DRR_FREE;
	drrf->drr_object = object;
	drrf->drr_offset = offset;
	if (offset + length < offset)
		drrf->drr_length = DMU_OBJECT_END;
	else
		drrf->drr_length = length;
	drrf->drr_toguid = dscp->dsc_toguid;
	if (length == DMU_OBJECT_END) {
		if (dump_record(dscp, NULL, 0) != 0)
			return (SET_ERROR(EINTR));
	} else {
		dscp->dsc_pending_op = PENDING_FREE;
	}

	return (0);
}
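
/*
 * Worked example (illustrative, not part of the upstream file): two adjacent
 * frees of the same object coalesce into a single pending DRR_FREE record.
 *
 *	dump_free(dscp, 7, 0, 8192);	creates pending FREE [0, 8192)
 *	dump_free(dscp, 7, 8192, 8192);	extends it to [0, 16384)
 *
 * The combined record is only emitted by dump_record() once a
 * non-contiguous free, or a record of a different type, forces the pending
 * op to be pushed out.
 */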
/*
 * Fill in the drr_redact struct, or perform aggregation if the previous record
 * is also a redaction record, and the two are adjacent.
 */
static int
dump_redact(dmu_send_cookie_t *dscp, uint64_t object, uint64_t offset,
    uint64_t length)
{
	struct drr_redact *drrr = &dscp->dsc_drr->drr_u.drr_redact;

	/*
	 * If there is a pending op, but it's not PENDING_REDACT, push it out,
	 * since free block aggregation can only be done for blocks of the
	 * same type (i.e., DRR_REDACT records can only be aggregated with
	 * other DRR_REDACT records).
	 */
	if (dscp->dsc_pending_op != PENDING_NONE &&
	    dscp->dsc_pending_op != PENDING_REDACT) {
		if (dump_record(dscp, NULL, 0) != 0)
			return (SET_ERROR(EINTR));
		dscp->dsc_pending_op = PENDING_NONE;
	}

	if (dscp->dsc_pending_op == PENDING_REDACT) {
		/*
		 * Check to see whether this redacted block can be aggregated
		 * with pending one.
		 */
		if (drrr->drr_object == object && drrr->drr_offset +
		    drrr->drr_length == offset) {
			drrr->drr_length += length;
			return (0);
		} else {
			/* not a continuation. Push out pending record */
			if (dump_record(dscp, NULL, 0) != 0)
				return (SET_ERROR(EINTR));
			dscp->dsc_pending_op = PENDING_NONE;
		}
	}
	/* create a REDACT record and make it pending */
	memset(dscp->dsc_drr, 0, sizeof (dmu_replay_record_t));
	dscp->dsc_drr->drr_type = DRR_REDACT;
	drrr->drr_object = object;
	drrr->drr_offset = offset;
	drrr->drr_length = length;
	drrr->drr_toguid = dscp->dsc_toguid;
	dscp->dsc_pending_op = PENDING_REDACT;

	return (0);
}
457 dmu_dump_write(dmu_send_cookie_t
*dscp
, dmu_object_type_t type
, uint64_t object
,
458 uint64_t offset
, int lsize
, int psize
, const blkptr_t
*bp
,
459 boolean_t io_compressed
, void *data
)
461 uint64_t payload_size
;
462 boolean_t raw
= (dscp
->dsc_featureflags
& DMU_BACKUP_FEATURE_RAW
);
463 struct drr_write
*drrw
= &(dscp
->dsc_drr
->drr_u
.drr_write
);
466 * We send data in increasing object, offset order.
467 * See comment in dump_free() for details.
469 ASSERT(object
> dscp
->dsc_last_data_object
||
470 (object
== dscp
->dsc_last_data_object
&&
471 offset
> dscp
->dsc_last_data_offset
));
472 dscp
->dsc_last_data_object
= object
;
473 dscp
->dsc_last_data_offset
= offset
+ lsize
- 1;
476 * If there is any kind of pending aggregation (currently either
477 * a grouping of free objects or free blocks), push it out to
478 * the stream, since aggregation can't be done across operations
479 * of different types.
481 if (dscp
->dsc_pending_op
!= PENDING_NONE
) {
482 if (dump_record(dscp
, NULL
, 0) != 0)
483 return (SET_ERROR(EINTR
));
484 dscp
->dsc_pending_op
= PENDING_NONE
;
486 /* write a WRITE record */
487 memset(dscp
->dsc_drr
, 0, sizeof (dmu_replay_record_t
));
488 dscp
->dsc_drr
->drr_type
= DRR_WRITE
;
489 drrw
->drr_object
= object
;
490 drrw
->drr_type
= type
;
491 drrw
->drr_offset
= offset
;
492 drrw
->drr_toguid
= dscp
->dsc_toguid
;
493 drrw
->drr_logical_size
= lsize
;
495 /* only set the compression fields if the buf is compressed or raw */
496 boolean_t compressed
=
497 (bp
!= NULL
? BP_GET_COMPRESS(bp
) != ZIO_COMPRESS_OFF
&&
498 io_compressed
: lsize
!= psize
);
499 if (raw
|| compressed
) {
501 ASSERT(raw
|| dscp
->dsc_featureflags
&
502 DMU_BACKUP_FEATURE_COMPRESSED
);
503 ASSERT(!BP_IS_EMBEDDED(bp
));
504 ASSERT3S(psize
, >, 0);
507 ASSERT(BP_IS_PROTECTED(bp
));
510 * This is a raw protected block so we need to pass
511 * along everything the receiving side will need to
512 * interpret this block, including the byteswap, salt,
515 if (BP_SHOULD_BYTESWAP(bp
))
516 drrw
->drr_flags
|= DRR_RAW_BYTESWAP
;
517 zio_crypt_decode_params_bp(bp
, drrw
->drr_salt
,
519 zio_crypt_decode_mac_bp(bp
, drrw
->drr_mac
);
521 /* this is a compressed block */
522 ASSERT(dscp
->dsc_featureflags
&
523 DMU_BACKUP_FEATURE_COMPRESSED
);
524 ASSERT(!BP_SHOULD_BYTESWAP(bp
));
525 ASSERT(!DMU_OT_IS_METADATA(BP_GET_TYPE(bp
)));
526 ASSERT3U(BP_GET_COMPRESS(bp
), !=, ZIO_COMPRESS_OFF
);
527 ASSERT3S(lsize
, >=, psize
);
530 /* set fields common to compressed and raw sends */
531 drrw
->drr_compressiontype
= BP_GET_COMPRESS(bp
);
532 drrw
->drr_compressed_size
= psize
;
533 payload_size
= drrw
->drr_compressed_size
;
535 payload_size
= drrw
->drr_logical_size
;
538 if (bp
== NULL
|| BP_IS_EMBEDDED(bp
) || (BP_IS_PROTECTED(bp
) && !raw
)) {
540 * There's no pre-computed checksum for partial-block writes,
541 * embedded BP's, or encrypted BP's that are being sent as
542 * plaintext, so (like fletcher4-checksummed blocks) userland
543 * will have to compute a dedup-capable checksum itself.
545 drrw
->drr_checksumtype
= ZIO_CHECKSUM_OFF
;
547 drrw
->drr_checksumtype
= BP_GET_CHECKSUM(bp
);
548 if (zio_checksum_table
[drrw
->drr_checksumtype
].ci_flags
&
549 ZCHECKSUM_FLAG_DEDUP
)
550 drrw
->drr_flags
|= DRR_CHECKSUM_DEDUP
;
551 DDK_SET_LSIZE(&drrw
->drr_key
, BP_GET_LSIZE(bp
));
552 DDK_SET_PSIZE(&drrw
->drr_key
, BP_GET_PSIZE(bp
));
553 DDK_SET_COMPRESS(&drrw
->drr_key
, BP_GET_COMPRESS(bp
));
554 DDK_SET_CRYPT(&drrw
->drr_key
, BP_IS_PROTECTED(bp
));
555 drrw
->drr_key
.ddk_cksum
= bp
->blk_cksum
;
558 if (dump_record(dscp
, data
, payload_size
) != 0)
559 return (SET_ERROR(EINTR
));
564 dump_write_embedded(dmu_send_cookie_t
*dscp
, uint64_t object
, uint64_t offset
,
565 int blksz
, const blkptr_t
*bp
)
567 char buf
[BPE_PAYLOAD_SIZE
];
568 struct drr_write_embedded
*drrw
=
569 &(dscp
->dsc_drr
->drr_u
.drr_write_embedded
);
571 if (dscp
->dsc_pending_op
!= PENDING_NONE
) {
572 if (dump_record(dscp
, NULL
, 0) != 0)
573 return (SET_ERROR(EINTR
));
574 dscp
->dsc_pending_op
= PENDING_NONE
;
577 ASSERT(BP_IS_EMBEDDED(bp
));
579 memset(dscp
->dsc_drr
, 0, sizeof (dmu_replay_record_t
));
580 dscp
->dsc_drr
->drr_type
= DRR_WRITE_EMBEDDED
;
581 drrw
->drr_object
= object
;
582 drrw
->drr_offset
= offset
;
583 drrw
->drr_length
= blksz
;
584 drrw
->drr_toguid
= dscp
->dsc_toguid
;
585 drrw
->drr_compression
= BP_GET_COMPRESS(bp
);
586 drrw
->drr_etype
= BPE_GET_ETYPE(bp
);
587 drrw
->drr_lsize
= BPE_GET_LSIZE(bp
);
588 drrw
->drr_psize
= BPE_GET_PSIZE(bp
);
590 decode_embedded_bp_compressed(bp
, buf
);
592 uint32_t psize
= drrw
->drr_psize
;
593 uint32_t rsize
= P2ROUNDUP(psize
, 8);
596 memset(buf
+ psize
, 0, rsize
- psize
);
598 if (dump_record(dscp
, buf
, rsize
) != 0)
599 return (SET_ERROR(EINTR
));
604 dump_spill(dmu_send_cookie_t
*dscp
, const blkptr_t
*bp
, uint64_t object
,
607 struct drr_spill
*drrs
= &(dscp
->dsc_drr
->drr_u
.drr_spill
);
608 uint64_t blksz
= BP_GET_LSIZE(bp
);
609 uint64_t payload_size
= blksz
;
611 if (dscp
->dsc_pending_op
!= PENDING_NONE
) {
612 if (dump_record(dscp
, NULL
, 0) != 0)
613 return (SET_ERROR(EINTR
));
614 dscp
->dsc_pending_op
= PENDING_NONE
;
617 /* write a SPILL record */
618 memset(dscp
->dsc_drr
, 0, sizeof (dmu_replay_record_t
));
619 dscp
->dsc_drr
->drr_type
= DRR_SPILL
;
620 drrs
->drr_object
= object
;
621 drrs
->drr_length
= blksz
;
622 drrs
->drr_toguid
= dscp
->dsc_toguid
;
624 /* See comment in piggyback_unmodified_spill() for full details */
625 if (zfs_send_unmodified_spill_blocks
&&
626 (BP_GET_LOGICAL_BIRTH(bp
) <= dscp
->dsc_fromtxg
)) {
627 drrs
->drr_flags
|= DRR_SPILL_UNMODIFIED
;
630 /* handle raw send fields */
631 if (dscp
->dsc_featureflags
& DMU_BACKUP_FEATURE_RAW
) {
632 ASSERT(BP_IS_PROTECTED(bp
));
634 if (BP_SHOULD_BYTESWAP(bp
))
635 drrs
->drr_flags
|= DRR_RAW_BYTESWAP
;
636 drrs
->drr_compressiontype
= BP_GET_COMPRESS(bp
);
637 drrs
->drr_compressed_size
= BP_GET_PSIZE(bp
);
638 zio_crypt_decode_params_bp(bp
, drrs
->drr_salt
, drrs
->drr_iv
);
639 zio_crypt_decode_mac_bp(bp
, drrs
->drr_mac
);
640 payload_size
= drrs
->drr_compressed_size
;
643 if (dump_record(dscp
, data
, payload_size
) != 0)
644 return (SET_ERROR(EINTR
));
649 dump_freeobjects(dmu_send_cookie_t
*dscp
, uint64_t firstobj
, uint64_t numobjs
)
651 struct drr_freeobjects
*drrfo
= &(dscp
->dsc_drr
->drr_u
.drr_freeobjects
);
652 uint64_t maxobj
= DNODES_PER_BLOCK
*
653 (DMU_META_DNODE(dscp
->dsc_os
)->dn_maxblkid
+ 1);
	 * ZoL < 0.7 does not handle large FREEOBJECTS records correctly,
	 * leading to zfs recv never completing. To avoid this issue, don't
	 * send FREEOBJECTS records for object IDs which cannot exist on the
	 * receiving side.
662 if (maxobj
<= firstobj
)
665 if (maxobj
< firstobj
+ numobjs
)
666 numobjs
= maxobj
- firstobj
;
670 * If there is a pending op, but it's not PENDING_FREEOBJECTS,
671 * push it out, since free block aggregation can only be done for
672 * blocks of the same type (i.e., DRR_FREE records can only be
673 * aggregated with other DRR_FREE records. DRR_FREEOBJECTS records
674 * can only be aggregated with other DRR_FREEOBJECTS records).
676 if (dscp
->dsc_pending_op
!= PENDING_NONE
&&
677 dscp
->dsc_pending_op
!= PENDING_FREEOBJECTS
) {
678 if (dump_record(dscp
, NULL
, 0) != 0)
679 return (SET_ERROR(EINTR
));
680 dscp
->dsc_pending_op
= PENDING_NONE
;
683 if (dscp
->dsc_pending_op
== PENDING_FREEOBJECTS
) {
685 * See whether this free object array can be aggregated
688 if (drrfo
->drr_firstobj
+ drrfo
->drr_numobjs
== firstobj
) {
689 drrfo
->drr_numobjs
+= numobjs
;
692 /* can't be aggregated. Push out pending record */
693 if (dump_record(dscp
, NULL
, 0) != 0)
694 return (SET_ERROR(EINTR
));
695 dscp
->dsc_pending_op
= PENDING_NONE
;
699 /* write a FREEOBJECTS record */
700 memset(dscp
->dsc_drr
, 0, sizeof (dmu_replay_record_t
));
701 dscp
->dsc_drr
->drr_type
= DRR_FREEOBJECTS
;
702 drrfo
->drr_firstobj
= firstobj
;
703 drrfo
->drr_numobjs
= numobjs
;
704 drrfo
->drr_toguid
= dscp
->dsc_toguid
;
706 dscp
->dsc_pending_op
= PENDING_FREEOBJECTS
;
712 dump_dnode(dmu_send_cookie_t
*dscp
, const blkptr_t
*bp
, uint64_t object
,
715 struct drr_object
*drro
= &(dscp
->dsc_drr
->drr_u
.drr_object
);
718 if (object
< dscp
->dsc_resume_object
) {
720 * Note: when resuming, we will visit all the dnodes in
721 * the block of dnodes that we are resuming from. In
722 * this case it's unnecessary to send the dnodes prior to
723 * the one we are resuming from. We should be at most one
724 * block's worth of dnodes behind the resume point.
726 ASSERT3U(dscp
->dsc_resume_object
- object
, <,
727 1 << (DNODE_BLOCK_SHIFT
- DNODE_SHIFT
));
731 if (dnp
== NULL
|| dnp
->dn_type
== DMU_OT_NONE
)
732 return (dump_freeobjects(dscp
, object
, 1));
734 if (dscp
->dsc_pending_op
!= PENDING_NONE
) {
735 if (dump_record(dscp
, NULL
, 0) != 0)
736 return (SET_ERROR(EINTR
));
737 dscp
->dsc_pending_op
= PENDING_NONE
;
740 /* write an OBJECT record */
741 memset(dscp
->dsc_drr
, 0, sizeof (dmu_replay_record_t
));
742 dscp
->dsc_drr
->drr_type
= DRR_OBJECT
;
743 drro
->drr_object
= object
;
744 drro
->drr_type
= dnp
->dn_type
;
745 drro
->drr_bonustype
= dnp
->dn_bonustype
;
746 drro
->drr_blksz
= dnp
->dn_datablkszsec
<< SPA_MINBLOCKSHIFT
;
747 drro
->drr_bonuslen
= dnp
->dn_bonuslen
;
748 drro
->drr_dn_slots
= dnp
->dn_extra_slots
+ 1;
749 drro
->drr_checksumtype
= dnp
->dn_checksum
;
750 drro
->drr_compress
= dnp
->dn_compress
;
751 drro
->drr_toguid
= dscp
->dsc_toguid
;
753 if (!(dscp
->dsc_featureflags
& DMU_BACKUP_FEATURE_LARGE_BLOCKS
) &&
754 drro
->drr_blksz
> SPA_OLD_MAXBLOCKSIZE
)
755 drro
->drr_blksz
= SPA_OLD_MAXBLOCKSIZE
;
757 bonuslen
= P2ROUNDUP(dnp
->dn_bonuslen
, 8);
759 if ((dscp
->dsc_featureflags
& DMU_BACKUP_FEATURE_RAW
)) {
760 ASSERT(BP_IS_ENCRYPTED(bp
));
762 if (BP_SHOULD_BYTESWAP(bp
))
763 drro
->drr_flags
|= DRR_RAW_BYTESWAP
;
765 /* needed for reconstructing dnp on recv side */
766 drro
->drr_maxblkid
= dnp
->dn_maxblkid
;
767 drro
->drr_indblkshift
= dnp
->dn_indblkshift
;
768 drro
->drr_nlevels
= dnp
->dn_nlevels
;
769 drro
->drr_nblkptr
= dnp
->dn_nblkptr
;
772 * Since we encrypt the entire bonus area, the (raw) part
773 * beyond the bonuslen is actually nonzero, so we need
777 if (drro
->drr_bonuslen
> DN_MAX_BONUS_LEN(dnp
))
778 return (SET_ERROR(EINVAL
));
779 drro
->drr_raw_bonuslen
= DN_MAX_BONUS_LEN(dnp
);
780 bonuslen
= drro
->drr_raw_bonuslen
;
785 * DRR_OBJECT_SPILL is set for every dnode which references a
786 * spill block. This allows the receiving pool to definitively
787 * determine when a spill block should be kept or freed.
789 if (dnp
->dn_flags
& DNODE_FLAG_SPILL_BLKPTR
)
790 drro
->drr_flags
|= DRR_OBJECT_SPILL
;
792 if (dump_record(dscp
, DN_BONUS(dnp
), bonuslen
) != 0)
793 return (SET_ERROR(EINTR
));
795 /* Free anything past the end of the file. */
796 if (dump_free(dscp
, object
, (dnp
->dn_maxblkid
+ 1) *
797 (dnp
->dn_datablkszsec
<< SPA_MINBLOCKSHIFT
), DMU_OBJECT_END
) != 0)
798 return (SET_ERROR(EINTR
));
800 if (dscp
->dsc_err
!= 0)
801 return (SET_ERROR(EINTR
));
807 dump_object_range(dmu_send_cookie_t
*dscp
, const blkptr_t
*bp
,
808 uint64_t firstobj
, uint64_t numslots
)
810 struct drr_object_range
*drror
=
811 &(dscp
->dsc_drr
->drr_u
.drr_object_range
);
813 /* we only use this record type for raw sends */
814 ASSERT(BP_IS_PROTECTED(bp
));
815 ASSERT(dscp
->dsc_featureflags
& DMU_BACKUP_FEATURE_RAW
);
816 ASSERT3U(BP_GET_COMPRESS(bp
), ==, ZIO_COMPRESS_OFF
);
817 ASSERT3U(BP_GET_TYPE(bp
), ==, DMU_OT_DNODE
);
818 ASSERT0(BP_GET_LEVEL(bp
));
820 if (dscp
->dsc_pending_op
!= PENDING_NONE
) {
821 if (dump_record(dscp
, NULL
, 0) != 0)
822 return (SET_ERROR(EINTR
));
823 dscp
->dsc_pending_op
= PENDING_NONE
;
826 memset(dscp
->dsc_drr
, 0, sizeof (dmu_replay_record_t
));
827 dscp
->dsc_drr
->drr_type
= DRR_OBJECT_RANGE
;
828 drror
->drr_firstobj
= firstobj
;
829 drror
->drr_numslots
= numslots
;
830 drror
->drr_toguid
= dscp
->dsc_toguid
;
831 if (BP_SHOULD_BYTESWAP(bp
))
832 drror
->drr_flags
|= DRR_RAW_BYTESWAP
;
833 zio_crypt_decode_params_bp(bp
, drror
->drr_salt
, drror
->drr_iv
);
834 zio_crypt_decode_mac_bp(bp
, drror
->drr_mac
);
836 if (dump_record(dscp
, NULL
, 0) != 0)
837 return (SET_ERROR(EINTR
));
842 send_do_embed(const blkptr_t
*bp
, uint64_t featureflags
)
844 if (!BP_IS_EMBEDDED(bp
))
848 * Compression function must be legacy, or explicitly enabled.
850 if ((BP_GET_COMPRESS(bp
) >= ZIO_COMPRESS_LEGACY_FUNCTIONS
&&
851 !(featureflags
& DMU_BACKUP_FEATURE_LZ4
)))
855 * If we have not set the ZSTD feature flag, we can't send ZSTD
856 * compressed embedded blocks, as the receiver may not support them.
858 if ((BP_GET_COMPRESS(bp
) == ZIO_COMPRESS_ZSTD
&&
859 !(featureflags
& DMU_BACKUP_FEATURE_ZSTD
)))
863 * Embed type must be explicitly enabled.
865 switch (BPE_GET_ETYPE(bp
)) {
866 case BP_EMBEDDED_TYPE_DATA
:
867 if (featureflags
& DMU_BACKUP_FEATURE_EMBED_DATA
)
877 * This function actually handles figuring out what kind of record needs to be
878 * dumped, and calling the appropriate helper function. In most cases,
879 * the data has already been read by send_reader_thread().
882 do_dump(dmu_send_cookie_t
*dscp
, struct send_range
*range
)
885 switch (range
->type
) {
887 err
= dump_dnode(dscp
, &range
->sru
.object
.bp
, range
->object
,
888 range
->sru
.object
.dnp
);
889 /* Dump piggybacked unmodified spill block */
890 if (!err
&& range
->sru
.object
.spill_range
)
891 err
= do_dump(dscp
, range
->sru
.object
.spill_range
);
894 ASSERT3U(range
->start_blkid
+ 1, ==, range
->end_blkid
);
895 if (!(dscp
->dsc_featureflags
& DMU_BACKUP_FEATURE_RAW
)) {
898 uint64_t epb
= BP_GET_LSIZE(&range
->sru
.object_range
.bp
) >>
900 uint64_t firstobj
= range
->start_blkid
* epb
;
901 err
= dump_object_range(dscp
, &range
->sru
.object_range
.bp
,
906 struct srr
*srrp
= &range
->sru
.redact
;
907 err
= dump_redact(dscp
, range
->object
, range
->start_blkid
*
908 srrp
->datablksz
, (range
->end_blkid
- range
->start_blkid
) *
913 struct srd
*srdp
= &range
->sru
.data
;
914 blkptr_t
*bp
= &srdp
->bp
;
916 dmu_objset_spa(dscp
->dsc_os
);
918 ASSERT3U(srdp
->datablksz
, ==, BP_GET_LSIZE(bp
));
919 ASSERT3U(range
->start_blkid
+ 1, ==, range
->end_blkid
);
921 if (send_do_embed(bp
, dscp
->dsc_featureflags
)) {
922 err
= dump_write_embedded(dscp
, range
->object
,
923 range
->start_blkid
* srdp
->datablksz
,
924 srdp
->datablksz
, bp
);
927 ASSERT(range
->object
> dscp
->dsc_resume_object
||
928 (range
->object
== dscp
->dsc_resume_object
&&
929 (range
->start_blkid
== DMU_SPILL_BLKID
||
930 range
->start_blkid
* srdp
->datablksz
>=
931 dscp
->dsc_resume_offset
)));
932 /* it's a level-0 block of a regular object */
934 mutex_enter(&srdp
->lock
);
935 while (srdp
->io_outstanding
)
936 cv_wait(&srdp
->cv
, &srdp
->lock
);
938 mutex_exit(&srdp
->lock
);
941 if (zfs_send_corrupt_data
&&
942 !dscp
->dsc_dso
->dso_dryrun
) {
944 * Send a block filled with 0x"zfs badd bloc"
946 srdp
->abuf
= arc_alloc_buf(spa
, &srdp
->abuf
,
947 ARC_BUFC_DATA
, srdp
->datablksz
);
949 for (ptr
= srdp
->abuf
->b_data
;
950 (char *)ptr
< (char *)srdp
->abuf
->b_data
+
951 srdp
->datablksz
; ptr
++)
952 *ptr
= 0x2f5baddb10cULL
;
954 return (SET_ERROR(EIO
));
958 ASSERT(dscp
->dsc_dso
->dso_dryrun
||
959 srdp
->abuf
!= NULL
|| srdp
->abd
!= NULL
);
962 if (srdp
->abd
!= NULL
) {
963 data
= abd_to_buf(srdp
->abd
);
964 ASSERT3P(srdp
->abuf
, ==, NULL
);
965 } else if (srdp
->abuf
!= NULL
) {
966 data
= srdp
->abuf
->b_data
;
969 if (BP_GET_TYPE(bp
) == DMU_OT_SA
) {
970 ASSERT3U(range
->start_blkid
, ==, DMU_SPILL_BLKID
);
971 err
= dump_spill(dscp
, bp
, range
->object
, data
);
975 uint64_t offset
= range
->start_blkid
* srdp
->datablksz
;
978 * If we have large blocks stored on disk but the send flags
979 * don't allow us to send large blocks, we split the data from
980 * the arc buf into chunks.
982 if (srdp
->datablksz
> SPA_OLD_MAXBLOCKSIZE
&&
983 !(dscp
->dsc_featureflags
&
984 DMU_BACKUP_FEATURE_LARGE_BLOCKS
)) {
985 while (srdp
->datablksz
> 0 && err
== 0) {
986 int n
= MIN(srdp
->datablksz
,
987 SPA_OLD_MAXBLOCKSIZE
);
988 err
= dmu_dump_write(dscp
, srdp
->obj_type
,
989 range
->object
, offset
, n
, n
, NULL
, B_FALSE
,
993 * When doing dry run, data==NULL is used as a
995 * dmu_dump_write()->dump_record().
999 srdp
->datablksz
-= n
;
1002 err
= dmu_dump_write(dscp
, srdp
->obj_type
,
1003 range
->object
, offset
,
1004 srdp
->datablksz
, srdp
->datasz
, bp
,
1005 srdp
->io_compressed
, data
);
1010 struct srh
*srhp
= &range
->sru
.hole
;
1011 if (range
->object
== DMU_META_DNODE_OBJECT
) {
1012 uint32_t span
= srhp
->datablksz
>> DNODE_SHIFT
;
1013 uint64_t first_obj
= range
->start_blkid
* span
;
1014 uint64_t numobj
= range
->end_blkid
* span
- first_obj
;
1015 return (dump_freeobjects(dscp
, first_obj
, numobj
));
1017 uint64_t offset
= 0;
1020 * If this multiply overflows, we don't need to send this block.
1021 * Even if it has a birth time, it can never not be a hole, so
1022 * we don't need to send records for it.
1024 if (!overflow_multiply(range
->start_blkid
, srhp
->datablksz
,
1030 if (!overflow_multiply(range
->end_blkid
, srhp
->datablksz
, &len
))
1033 return (dump_free(dscp
, range
->object
, offset
, len
));
1036 panic("Invalid range type in do_dump: %d", range
->type
);
1041 static struct send_range
*
1042 range_alloc(enum type type
, uint64_t object
, uint64_t start_blkid
,
1043 uint64_t end_blkid
, boolean_t eos
)
1045 struct send_range
*range
= kmem_alloc(sizeof (*range
), KM_SLEEP
);
1047 range
->object
= object
;
1048 range
->start_blkid
= start_blkid
;
1049 range
->end_blkid
= end_blkid
;
1050 range
->eos_marker
= eos
;
1052 range
->sru
.data
.abd
= NULL
;
1053 range
->sru
.data
.abuf
= NULL
;
1054 mutex_init(&range
->sru
.data
.lock
, NULL
, MUTEX_DEFAULT
, NULL
);
1055 cv_init(&range
->sru
.data
.cv
, NULL
, CV_DEFAULT
, NULL
);
1056 range
->sru
.data
.io_outstanding
= 0;
1057 range
->sru
.data
.io_err
= 0;
1058 range
->sru
.data
.io_compressed
= B_FALSE
;
1059 } else if (type
== OBJECT
) {
1060 range
->sru
.object
.spill_range
= NULL
;
1066 * This is the callback function to traverse_dataset that acts as a worker
1067 * thread for dmu_send_impl.
1070 send_cb(spa_t
*spa
, zilog_t
*zilog
, const blkptr_t
*bp
,
1071 const zbookmark_phys_t
*zb
, const struct dnode_phys
*dnp
, void *arg
)
1074 struct send_thread_arg
*sta
= arg
;
1075 struct send_range
*record
;
1077 ASSERT(zb
->zb_object
== DMU_META_DNODE_OBJECT
||
1078 zb
->zb_object
>= sta
->resume
.zb_object
);
1081 * All bps of an encrypted os should have the encryption bit set.
1082 * If this is not true it indicates tampering and we report an error.
1084 if (sta
->os
->os_encrypted
&&
1085 !BP_IS_HOLE(bp
) && !BP_USES_CRYPT(bp
)) {
1086 spa_log_error(spa
, zb
, BP_GET_LOGICAL_BIRTH(bp
));
1087 return (SET_ERROR(EIO
));
1091 return (SET_ERROR(EINTR
));
1092 if (zb
->zb_object
!= DMU_META_DNODE_OBJECT
&&
1093 DMU_OBJECT_IS_SPECIAL(zb
->zb_object
))
1095 atomic_inc_64(sta
->num_blocks_visited
);
1097 if (zb
->zb_level
== ZB_DNODE_LEVEL
) {
1098 if (zb
->zb_object
== DMU_META_DNODE_OBJECT
)
1100 record
= range_alloc(OBJECT
, zb
->zb_object
, 0, 0, B_FALSE
);
1101 record
->sru
.object
.bp
= *bp
;
1102 size_t size
= sizeof (*dnp
) * (dnp
->dn_extra_slots
+ 1);
1103 record
->sru
.object
.dnp
= kmem_alloc(size
, KM_SLEEP
);
1104 memcpy(record
->sru
.object
.dnp
, dnp
, size
);
1105 bqueue_enqueue(&sta
->q
, record
, sizeof (*record
));
1108 if (zb
->zb_level
== 0 && zb
->zb_object
== DMU_META_DNODE_OBJECT
&&
1110 record
= range_alloc(OBJECT_RANGE
, 0, zb
->zb_blkid
,
1111 zb
->zb_blkid
+ 1, B_FALSE
);
1112 record
->sru
.object_range
.bp
= *bp
;
1113 bqueue_enqueue(&sta
->q
, record
, sizeof (*record
));
1116 if (zb
->zb_level
< 0 || (zb
->zb_level
> 0 && !BP_IS_HOLE(bp
)))
1118 if (zb
->zb_object
== DMU_META_DNODE_OBJECT
&& !BP_IS_HOLE(bp
))
1121 uint64_t span
= bp_span_in_blocks(dnp
->dn_indblkshift
, zb
->zb_level
);
1125 * If this multiply overflows, we don't need to send this block.
1126 * Even if it has a birth time, it can never not be a hole, so
1127 * we don't need to send records for it.
1129 if (!overflow_multiply(span
, zb
->zb_blkid
, &start
) || (!(zb
->zb_blkid
==
1130 DMU_SPILL_BLKID
|| DMU_OT_IS_METADATA(dnp
->dn_type
)) &&
1131 span
* zb
->zb_blkid
> dnp
->dn_maxblkid
)) {
1132 ASSERT(BP_IS_HOLE(bp
));
1136 if (zb
->zb_blkid
== DMU_SPILL_BLKID
)
1137 ASSERT3U(BP_GET_TYPE(bp
), ==, DMU_OT_SA
);
1139 enum type record_type
= DATA
;
1142 else if (BP_IS_REDACTED(bp
))
1143 record_type
= REDACT
;
1147 record
= range_alloc(record_type
, zb
->zb_object
, start
,
1148 (start
+ span
< start
? 0 : start
+ span
), B_FALSE
);
1150 uint64_t datablksz
= (zb
->zb_blkid
== DMU_SPILL_BLKID
?
1151 BP_GET_LSIZE(bp
) : dnp
->dn_datablkszsec
<< SPA_MINBLOCKSHIFT
);
1153 if (BP_IS_HOLE(bp
)) {
1154 record
->sru
.hole
.datablksz
= datablksz
;
1155 } else if (BP_IS_REDACTED(bp
)) {
1156 record
->sru
.redact
.datablksz
= datablksz
;
1158 record
->sru
.data
.datablksz
= datablksz
;
1159 record
->sru
.data
.obj_type
= dnp
->dn_type
;
1160 record
->sru
.data
.bp
= *bp
;
1163 bqueue_enqueue(&sta
->q
, record
, sizeof (*record
));
1167 struct redact_list_cb_arg
{
1168 uint64_t *num_blocks_visited
;
1171 boolean_t mark_redact
;
1175 redact_list_cb(redact_block_phys_t
*rb
, void *arg
)
1177 struct redact_list_cb_arg
*rlcap
= arg
;
1179 atomic_inc_64(rlcap
->num_blocks_visited
);
1183 struct send_range
*data
= range_alloc(REDACT
, rb
->rbp_object
,
1184 rb
->rbp_blkid
, rb
->rbp_blkid
+ redact_block_get_count(rb
), B_FALSE
);
1185 ASSERT3U(data
->end_blkid
, >, rb
->rbp_blkid
);
1186 if (rlcap
->mark_redact
) {
1187 data
->type
= REDACT
;
1188 data
->sru
.redact
.datablksz
= redact_block_get_size(rb
);
1190 data
->type
= PREVIOUSLY_REDACTED
;
1192 bqueue_enqueue(rlcap
->q
, data
, sizeof (*data
));
1198 * This function kicks off the traverse_dataset. It also handles setting the
1199 * error code of the thread in case something goes wrong, and pushes the End of
1200 * Stream record when the traverse_dataset call has finished.
1202 static __attribute__((noreturn
)) void
1203 send_traverse_thread(void *arg
)
1205 struct send_thread_arg
*st_arg
= arg
;
1207 struct send_range
*data
;
1208 fstrans_cookie_t cookie
= spl_fstrans_mark();
1210 err
= traverse_dataset_resume(st_arg
->os
->os_dsl_dataset
,
1211 st_arg
->fromtxg
, &st_arg
->resume
,
1212 st_arg
->flags
, send_cb
, st_arg
);
1215 st_arg
->error_code
= err
;
1216 data
= range_alloc(DATA
, 0, 0, 0, B_TRUE
);
1217 bqueue_enqueue_flush(&st_arg
->q
, data
, sizeof (*data
));
1218 spl_fstrans_unmark(cookie
);
 * Utility function that causes End of Stream records to compare after all
 * others, so that other threads' comparison logic can stay simple.
1226 static int __attribute__((unused
))
1227 send_range_after(const struct send_range
*from
, const struct send_range
*to
)
1229 if (from
->eos_marker
== B_TRUE
)
1231 if (to
->eos_marker
== B_TRUE
)
1234 uint64_t from_obj
= from
->object
;
1235 uint64_t from_end_obj
= from
->object
+ 1;
1236 uint64_t to_obj
= to
->object
;
1237 uint64_t to_end_obj
= to
->object
+ 1;
1238 if (from_obj
== 0) {
1239 ASSERT(from
->type
== HOLE
|| from
->type
== OBJECT_RANGE
);
1240 from_obj
= from
->start_blkid
<< DNODES_PER_BLOCK_SHIFT
;
1241 from_end_obj
= from
->end_blkid
<< DNODES_PER_BLOCK_SHIFT
;
1244 ASSERT(to
->type
== HOLE
|| to
->type
== OBJECT_RANGE
);
1245 to_obj
= to
->start_blkid
<< DNODES_PER_BLOCK_SHIFT
;
1246 to_end_obj
= to
->end_blkid
<< DNODES_PER_BLOCK_SHIFT
;
1249 if (from_end_obj
<= to_obj
)
1251 if (from_obj
>= to_end_obj
)
1253 int64_t cmp
= TREE_CMP(to
->type
== OBJECT_RANGE
, from
->type
==
1257 cmp
= TREE_CMP(to
->type
== OBJECT
, from
->type
== OBJECT
);
1260 if (from
->end_blkid
<= to
->start_blkid
)
1262 if (from
->start_blkid
>= to
->end_blkid
)
1268 * Pop the new data off the queue, check that the records we receive are in
1269 * the right order, but do not free the old data. This is used so that the
1270 * records can be sent on to the main thread without copying the data.
1272 static struct send_range
*
1273 get_next_range_nofree(bqueue_t
*bq
, struct send_range
*prev
)
1275 struct send_range
*next
= bqueue_dequeue(bq
);
1276 ASSERT3S(send_range_after(prev
, next
), ==, -1);
1281 * Pop the new data off the queue, check that the records we receive are in
1282 * the right order, and free the old data.
1284 static struct send_range
*
1285 get_next_range(bqueue_t
*bq
, struct send_range
*prev
)
1287 struct send_range
*next
= get_next_range_nofree(bq
, prev
);
1292 static __attribute__((noreturn
)) void
1293 redact_list_thread(void *arg
)
1295 struct redact_list_thread_arg
*rlt_arg
= arg
;
1296 struct send_range
*record
;
1297 fstrans_cookie_t cookie
= spl_fstrans_mark();
1298 if (rlt_arg
->rl
!= NULL
) {
1299 struct redact_list_cb_arg rlcba
= {0};
1300 rlcba
.cancel
= &rlt_arg
->cancel
;
1301 rlcba
.q
= &rlt_arg
->q
;
1302 rlcba
.num_blocks_visited
= rlt_arg
->num_blocks_visited
;
1303 rlcba
.mark_redact
= rlt_arg
->mark_redact
;
1304 int err
= dsl_redaction_list_traverse(rlt_arg
->rl
,
1305 &rlt_arg
->resume
, redact_list_cb
, &rlcba
);
1307 rlt_arg
->error_code
= err
;
1309 record
= range_alloc(DATA
, 0, 0, 0, B_TRUE
);
1310 bqueue_enqueue_flush(&rlt_arg
->q
, record
, sizeof (*record
));
1311 spl_fstrans_unmark(cookie
);
1317 * Compare the start point of the two provided ranges. End of stream ranges
1318 * compare last, objects compare before any data or hole inside that object and
1319 * multi-object holes that start at the same object.
1322 send_range_start_compare(struct send_range
*r1
, struct send_range
*r2
)
1324 uint64_t r1_objequiv
= r1
->object
;
1325 uint64_t r1_l0equiv
= r1
->start_blkid
;
1326 uint64_t r2_objequiv
= r2
->object
;
1327 uint64_t r2_l0equiv
= r2
->start_blkid
;
1328 int64_t cmp
= TREE_CMP(r1
->eos_marker
, r2
->eos_marker
);
1331 if (r1
->object
== 0) {
1332 r1_objequiv
= r1
->start_blkid
* DNODES_PER_BLOCK
;
1335 if (r2
->object
== 0) {
1336 r2_objequiv
= r2
->start_blkid
* DNODES_PER_BLOCK
;
1340 cmp
= TREE_CMP(r1_objequiv
, r2_objequiv
);
1343 cmp
= TREE_CMP(r2
->type
== OBJECT_RANGE
, r1
->type
== OBJECT_RANGE
);
1346 cmp
= TREE_CMP(r2
->type
== OBJECT
, r1
->type
== OBJECT
);
1350 return (TREE_CMP(r1_l0equiv
, r2_l0equiv
));
1361 * This function returns the next range the send_merge_thread should operate on.
1362 * The inputs are two arrays; the first one stores the range at the front of the
1363 * queues stored in the second one. The ranges are sorted in descending
1364 * priority order; the metadata from earlier ranges overrules metadata from
1365 * later ranges. out_mask is used to return which threads the ranges came from;
1366 * bit i is set if ranges[i] started at the same place as the returned range.
1368 * This code is not hardcoded to compare a specific number of threads; it could
1369 * be used with any number, just by changing the q_idx enum.
1371 * The "next range" is the one with the earliest start; if two starts are equal,
1372 * the highest-priority range is the next to operate on. If a higher-priority
1373 * range starts in the middle of the first range, then the first range will be
1374 * truncated to end where the higher-priority range starts, and we will operate
1375 * on that one next time. In this way, we make sure that each block covered by
1376 * some range gets covered by a returned range, and each block covered is
1377 * returned using the metadata of the highest-priority range it appears in.
1379 * For example, if the three ranges at the front of the queues were [2,4),
1380 * [3,5), and [1,3), then the ranges returned would be [1,2) with the metadata
1381 * from the third range, [2,4) with the metadata from the first range, and then
1382 * [4,5) with the metadata from the second.
1384 static struct send_range
*
1385 find_next_range(struct send_range
**ranges
, bqueue_t
**qs
, uint64_t *out_mask
)
1387 int idx
= 0; // index of the range with the earliest start
1390 for (i
= 1; i
< NUM_THREADS
; i
++) {
1391 if (send_range_start_compare(ranges
[i
], ranges
[idx
]) < 0)
1394 if (ranges
[idx
]->eos_marker
) {
1395 struct send_range
*ret
= range_alloc(DATA
, 0, 0, 0, B_TRUE
);
1400 * Find all the ranges that start at that same point.
1402 for (i
= 0; i
< NUM_THREADS
; i
++) {
1403 if (send_range_start_compare(ranges
[i
], ranges
[idx
]) == 0)
1408 * OBJECT_RANGE records only come from the TO thread, and should always
1409 * be treated as overlapping with nothing and sent on immediately. They
1410 * are only used in raw sends, and are never redacted.
1412 if (ranges
[idx
]->type
== OBJECT_RANGE
) {
1413 ASSERT3U(idx
, ==, TO_IDX
);
1414 ASSERT3U(*out_mask
, ==, 1 << TO_IDX
);
1415 struct send_range
*ret
= ranges
[idx
];
1416 ranges
[idx
] = get_next_range_nofree(qs
[idx
], ranges
[idx
]);
1420 * Find the first start or end point after the start of the first range.
1422 uint64_t first_change
= ranges
[idx
]->end_blkid
;
1423 for (i
= 0; i
< NUM_THREADS
; i
++) {
1424 if (i
== idx
|| ranges
[i
]->eos_marker
||
1425 ranges
[i
]->object
> ranges
[idx
]->object
||
1426 ranges
[i
]->object
== DMU_META_DNODE_OBJECT
)
1428 ASSERT3U(ranges
[i
]->object
, ==, ranges
[idx
]->object
);
1429 if (first_change
> ranges
[i
]->start_blkid
&&
1430 (bmask
& (1 << i
)) == 0)
1431 first_change
= ranges
[i
]->start_blkid
;
1432 else if (first_change
> ranges
[i
]->end_blkid
)
1433 first_change
= ranges
[i
]->end_blkid
;
1436 * Update all ranges to no longer overlap with the range we're
1437 * returning. All such ranges must start at the same place as the range
1438 * being returned, and end at or after first_change. Thus we update
1439 * their start to first_change. If that makes them size 0, then free
1440 * them and pull a new range from that thread.
1442 for (i
= 0; i
< NUM_THREADS
; i
++) {
1443 if (i
== idx
|| (bmask
& (1 << i
)) == 0)
1445 ASSERT3U(first_change
, >, ranges
[i
]->start_blkid
);
1446 ranges
[i
]->start_blkid
= first_change
;
1447 ASSERT3U(ranges
[i
]->start_blkid
, <=, ranges
[i
]->end_blkid
);
1448 if (ranges
[i
]->start_blkid
== ranges
[i
]->end_blkid
)
1449 ranges
[i
] = get_next_range(qs
[i
], ranges
[i
]);
1452 * Short-circuit the simple case; if the range doesn't overlap with
1453 * anything else, or it only overlaps with things that start at the same
1454 * place and are longer, send it on.
1456 if (first_change
== ranges
[idx
]->end_blkid
) {
1457 struct send_range
*ret
= ranges
[idx
];
1458 ranges
[idx
] = get_next_range_nofree(qs
[idx
], ranges
[idx
]);
1463 * Otherwise, return a truncated copy of ranges[idx] and move the start
1464 * of ranges[idx] back to first_change.
1466 struct send_range
*ret
= kmem_alloc(sizeof (*ret
), KM_SLEEP
);
1467 *ret
= *ranges
[idx
];
1468 ret
->end_blkid
= first_change
;
1469 ranges
[idx
]->start_blkid
= first_change
;
1473 #define FROM_AND_REDACT_BITS ((1 << REDACT_IDX) | (1 << FROM_IDX))
1476 * Merge the results from the from thread and the to thread, and then hand the
1477 * records off to send_prefetch_thread to prefetch them. If this is not a
1478 * send from a redaction bookmark, the from thread will push an end of stream
1479 * record and stop, and we'll just send everything that was changed in the
1480 * to_ds since the ancestor's creation txg. If it is, then since
1481 * traverse_dataset has a canonical order, we can compare each change as
1482 * they're pulled off the queues. That will give us a stream that is
1483 * appropriately sorted, and covers all records. In addition, we pull the
1484 * data from the redact_list_thread and use that to determine which blocks
1485 * should be redacted.
1487 static __attribute__((noreturn
)) void
1488 send_merge_thread(void *arg
)
1490 struct send_merge_thread_arg
*smt_arg
= arg
;
1491 struct send_range
*front_ranges
[NUM_THREADS
];
1492 bqueue_t
*queues
[NUM_THREADS
];
1494 fstrans_cookie_t cookie
= spl_fstrans_mark();
1496 if (smt_arg
->redact_arg
== NULL
) {
1497 front_ranges
[REDACT_IDX
] =
1498 kmem_zalloc(sizeof (struct send_range
), KM_SLEEP
);
1499 front_ranges
[REDACT_IDX
]->eos_marker
= B_TRUE
;
1500 front_ranges
[REDACT_IDX
]->type
= REDACT
;
1501 queues
[REDACT_IDX
] = NULL
;
1503 front_ranges
[REDACT_IDX
] =
1504 bqueue_dequeue(&smt_arg
->redact_arg
->q
);
1505 queues
[REDACT_IDX
] = &smt_arg
->redact_arg
->q
;
1507 front_ranges
[TO_IDX
] = bqueue_dequeue(&smt_arg
->to_arg
->q
);
1508 queues
[TO_IDX
] = &smt_arg
->to_arg
->q
;
1509 front_ranges
[FROM_IDX
] = bqueue_dequeue(&smt_arg
->from_arg
->q
);
1510 queues
[FROM_IDX
] = &smt_arg
->from_arg
->q
;
1512 struct send_range
*range
;
1513 for (range
= find_next_range(front_ranges
, queues
, &mask
);
1514 !range
->eos_marker
&& err
== 0 && !smt_arg
->cancel
;
1515 range
= find_next_range(front_ranges
, queues
, &mask
)) {
1517 * If the range in question was in both the from redact bookmark
1518 * and the bookmark we're using to redact, then don't send it.
1519 * It's already redacted on the receiving system, so a redaction
1520 * record would be redundant.
1522 if ((mask
& FROM_AND_REDACT_BITS
) == FROM_AND_REDACT_BITS
) {
1523 ASSERT3U(range
->type
, ==, REDACT
);
1527 bqueue_enqueue(&smt_arg
->q
, range
, sizeof (*range
));
1529 if (smt_arg
->to_arg
->error_code
!= 0) {
1530 err
= smt_arg
->to_arg
->error_code
;
1531 } else if (smt_arg
->from_arg
->error_code
!= 0) {
1532 err
= smt_arg
->from_arg
->error_code
;
1533 } else if (smt_arg
->redact_arg
!= NULL
&&
1534 smt_arg
->redact_arg
->error_code
!= 0) {
1535 err
= smt_arg
->redact_arg
->error_code
;
1538 if (smt_arg
->cancel
&& err
== 0)
1539 err
= SET_ERROR(EINTR
);
1540 smt_arg
->error
= err
;
1541 if (smt_arg
->error
!= 0) {
1542 smt_arg
->to_arg
->cancel
= B_TRUE
;
1543 smt_arg
->from_arg
->cancel
= B_TRUE
;
1544 if (smt_arg
->redact_arg
!= NULL
)
1545 smt_arg
->redact_arg
->cancel
= B_TRUE
;
1547 for (int i
= 0; i
< NUM_THREADS
; i
++) {
1548 while (!front_ranges
[i
]->eos_marker
) {
1549 front_ranges
[i
] = get_next_range(queues
[i
],
1552 range_free(front_ranges
[i
]);
1554 range
->eos_marker
= B_TRUE
;
1555 bqueue_enqueue_flush(&smt_arg
->q
, range
, 1);
1556 spl_fstrans_unmark(cookie
);
1560 struct send_reader_thread_arg
{
1561 struct send_merge_thread_arg
*smta
;
1564 boolean_t issue_reads
;
1565 uint64_t featureflags
;
1570 dmu_send_read_done(zio_t
*zio
)
1572 struct send_range
*range
= zio
->io_private
;
1574 mutex_enter(&range
->sru
.data
.lock
);
1575 if (zio
->io_error
!= 0) {
1576 abd_free(range
->sru
.data
.abd
);
1577 range
->sru
.data
.abd
= NULL
;
1578 range
->sru
.data
.io_err
= zio
->io_error
;
1581 ASSERT(range
->sru
.data
.io_outstanding
);
1582 range
->sru
.data
.io_outstanding
= B_FALSE
;
1583 cv_broadcast(&range
->sru
.data
.cv
);
1584 mutex_exit(&range
->sru
.data
.lock
);
1588 issue_data_read(struct send_reader_thread_arg
*srta
, struct send_range
*range
)
1590 struct srd
*srdp
= &range
->sru
.data
;
1591 blkptr_t
*bp
= &srdp
->bp
;
1592 objset_t
*os
= srta
->smta
->os
;
1594 ASSERT3U(range
->type
, ==, DATA
);
1595 ASSERT3U(range
->start_blkid
+ 1, ==, range
->end_blkid
);
1597 * If we have large blocks stored on disk but
1598 * the send flags don't allow us to send large
1599 * blocks, we split the data from the arc buf
1602 boolean_t split_large_blocks
=
1603 srdp
->datablksz
> SPA_OLD_MAXBLOCKSIZE
&&
1604 !(srta
->featureflags
& DMU_BACKUP_FEATURE_LARGE_BLOCKS
);
1606 * We should only request compressed data from the ARC if all
1607 * the following are true:
1608 * - stream compression was requested
1609 * - we aren't splitting large blocks into smaller chunks
1610 * - the data won't need to be byteswapped before sending
1611 * - this isn't an embedded block
1612 * - this isn't metadata (if receiving on a different endian
1613 * system it can be byteswapped more easily)
1615 boolean_t request_compressed
=
1616 (srta
->featureflags
& DMU_BACKUP_FEATURE_COMPRESSED
) &&
1617 !split_large_blocks
&& !BP_SHOULD_BYTESWAP(bp
) &&
1618 !BP_IS_EMBEDDED(bp
) && !DMU_OT_IS_METADATA(BP_GET_TYPE(bp
));
1620 zio_flag_t zioflags
= ZIO_FLAG_CANFAIL
;
1622 if (srta
->featureflags
& DMU_BACKUP_FEATURE_RAW
) {
1623 zioflags
|= ZIO_FLAG_RAW
;
1624 srdp
->io_compressed
= B_TRUE
;
1625 } else if (request_compressed
) {
1626 zioflags
|= ZIO_FLAG_RAW_COMPRESS
;
1627 srdp
->io_compressed
= B_TRUE
;
1630 srdp
->datasz
= (zioflags
& ZIO_FLAG_RAW_COMPRESS
) ?
1631 BP_GET_PSIZE(bp
) : BP_GET_LSIZE(bp
);
1633 if (!srta
->issue_reads
)
1635 if (BP_IS_REDACTED(bp
))
1637 if (send_do_embed(bp
, srta
->featureflags
))
1640 zbookmark_phys_t zb
= {
1641 .zb_objset
= dmu_objset_id(os
),
1642 .zb_object
= range
->object
,
1644 .zb_blkid
= range
->start_blkid
,
1647 arc_flags_t aflags
= ARC_FLAG_CACHED_ONLY
;
1649 int arc_err
= arc_read(NULL
, os
->os_spa
, bp
,
1650 arc_getbuf_func
, &srdp
->abuf
, ZIO_PRIORITY_ASYNC_READ
,
1651 zioflags
, &aflags
, &zb
);
1653 * If the data is not already cached in the ARC, we read directly
1654 * from zio. This avoids the performance overhead of adding a new
1655 * entry to the ARC, and we also avoid polluting the ARC cache with
1656 * data that is not likely to be used in the future.
1659 srdp
->abd
= abd_alloc_linear(srdp
->datasz
, B_FALSE
);
1660 srdp
->io_outstanding
= B_TRUE
;
1661 zio_nowait(zio_read(NULL
, os
->os_spa
, bp
, srdp
->abd
,
1662 srdp
->datasz
, dmu_send_read_done
, range
,
1663 ZIO_PRIORITY_ASYNC_READ
, zioflags
, &zb
));
1668 * Create a new record with the given values.
1671 enqueue_range(struct send_reader_thread_arg
*srta
, bqueue_t
*q
, dnode_t
*dn
,
1672 uint64_t blkid
, uint64_t count
, const blkptr_t
*bp
, uint32_t datablksz
)
1674 enum type range_type
= (bp
== NULL
|| BP_IS_HOLE(bp
) ? HOLE
:
1675 (BP_IS_REDACTED(bp
) ? REDACT
: DATA
));
1677 struct send_range
*range
= range_alloc(range_type
, dn
->dn_object
,
1678 blkid
, blkid
+ count
, B_FALSE
);
1680 if (blkid
== DMU_SPILL_BLKID
) {
1681 ASSERT3P(bp
, !=, NULL
);
1682 ASSERT3U(BP_GET_TYPE(bp
), ==, DMU_OT_SA
);
1685 switch (range_type
) {
1687 range
->sru
.hole
.datablksz
= datablksz
;
1690 ASSERT3U(count
, ==, 1);
1691 range
->sru
.data
.datablksz
= datablksz
;
1692 range
->sru
.data
.obj_type
= dn
->dn_type
;
1693 range
->sru
.data
.bp
= *bp
;
1694 issue_data_read(srta
, range
);
1697 range
->sru
.redact
.datablksz
= datablksz
;
1702 bqueue_enqueue(q
, range
, datablksz
);
1706 * Send DRR_SPILL records for unmodified spill blocks. This is useful
1707 * because changing certain attributes of the object (e.g. blocksize)
1708 * can cause old versions of ZFS to incorrectly remove a spill block.
1709 * Including these records in the stream forces an up to date version
1710 * to always be written ensuring they're never lost. Current versions
1711 * of the code which understand the DRR_FLAG_SPILL_BLOCK feature can
1712 * ignore these unmodified spill blocks.
1714 * We piggyback the spill_range to dnode range instead of enqueueing it
1715 * so send_range_after won't complain.
1718 piggyback_unmodified_spill(struct send_reader_thread_arg
*srta
,
1719 struct send_range
*range
)
1721 ASSERT3U(range
->type
, ==, OBJECT
);
1723 dnode_phys_t
*dnp
= range
->sru
.object
.dnp
;
1724 uint64_t fromtxg
= srta
->smta
->to_arg
->fromtxg
;
1726 if (!zfs_send_unmodified_spill_blocks
||
1727 !(dnp
->dn_flags
& DNODE_FLAG_SPILL_BLKPTR
) ||
1728 !(BP_GET_LOGICAL_BIRTH(DN_SPILL_BLKPTR(dnp
)) <= fromtxg
))
1731 blkptr_t
*bp
= DN_SPILL_BLKPTR(dnp
);
1732 struct send_range
*spill_range
= range_alloc(DATA
, range
->object
,
1733 DMU_SPILL_BLKID
, DMU_SPILL_BLKID
+1, B_FALSE
);
1734 spill_range
->sru
.data
.bp
= *bp
;
1735 spill_range
->sru
.data
.obj_type
= dnp
->dn_type
;
1736 spill_range
->sru
.data
.datablksz
= BP_GET_LSIZE(bp
);
1738 issue_data_read(srta
, spill_range
);
1739 range
->sru
.object
.spill_range
= spill_range
;
1741 return (BP_GET_LSIZE(bp
));
1745 * This thread is responsible for two things: First, it retrieves the correct
1746 * blkptr in the to ds if we need to send the data because of something from
1747 * the from thread. As a result of this, we're the first ones to discover that
1748 * some indirect blocks can be discarded because they're not holes. Second,
1749 * it issues prefetches for the data we need to send.
static __attribute__((noreturn)) void
send_reader_thread(void *arg)
{
	struct send_reader_thread_arg *srta = arg;
	struct send_merge_thread_arg *smta = srta->smta;
	bqueue_t *inq = &smta->q;
	bqueue_t *outq = &srta->q;
	objset_t *os = smta->os;
	fstrans_cookie_t cookie = spl_fstrans_mark();
	struct send_range *range = bqueue_dequeue(inq);
	int err = 0;

	/*
	 * If the record we're analyzing is from a redaction bookmark from the
	 * fromds, then we need to know whether or not it exists in the tods so
	 * we know whether to create records for it or not. If it does, we need
	 * the datablksz so we can generate an appropriate record for it.
	 * Finally, if it isn't redacted, we need the blkptr so that we can send
	 * a WRITE record containing the actual data.
	 */
	uint64_t last_obj = UINT64_MAX;
	uint64_t last_obj_exists = B_TRUE;
	while (!range->eos_marker && !srta->cancel && smta->error == 0 &&
	    err == 0) {
		uint64_t spill = 0;
		switch (range->type) {
		case DATA:
			issue_data_read(srta, range);
			bqueue_enqueue(outq, range, range->sru.data.datablksz);
			range = get_next_range_nofree(inq, range);
			break;
		case OBJECT:
			spill = piggyback_unmodified_spill(srta, range);
			zfs_fallthrough;
		case HOLE:
		case OBJECT_RANGE:
		case REDACT:	// Redacted blocks must exist
			bqueue_enqueue(outq, range, sizeof (*range) + spill);
			range = get_next_range_nofree(inq, range);
			break;
		case PREVIOUSLY_REDACTED: {
			/*
			 * This entry came from the "from bookmark" when
			 * sending from a bookmark that has a redaction
			 * list.  We need to check if this object/blkid
			 * exists in the target ("to") dataset, and if
			 * not then we drop this entry.  We also need
			 * to fill in the block pointer so that we know
			 * what to prefetch.
			 *
			 * To accomplish the above, we first cache whether or
			 * not the last object we examined exists.  If it
			 * doesn't, we can drop this record. If it does, we hold
			 * the dnode and use it to call dbuf_dnode_findbp. We do
			 * this instead of dbuf_bookmark_findbp because we will
			 * often operate on large ranges, and holding the dnode
			 * once is more efficient.
			 */
			boolean_t object_exists = B_TRUE;
			/*
			 * If the data is redacted, we only care if it exists,
			 * so that we don't send records for objects that have
			 * been deleted.
			 */
			dnode_t *dn;
			if (range->object == last_obj && !last_obj_exists) {
				/*
				 * If we're still examining the same object as
				 * previously, and it doesn't exist, we don't
				 * need to call dbuf_bookmark_findbp.
				 */
				object_exists = B_FALSE;
			} else {
				err = dnode_hold(os, range->object, FTAG, &dn);
				if (err == ENOENT) {
					object_exists = B_FALSE;
					err = 0;
				}
				last_obj = range->object;
				last_obj_exists = object_exists;
			}

			if (err != 0) {
				break;
			} else if (!object_exists) {
				/*
				 * The block was modified, but doesn't
				 * exist in the to dataset; if it was
				 * deleted in the to dataset, then we'll
				 * visit the hole bp for it at some point.
				 */
				range = get_next_range(inq, range);
				continue;
			}
			uint64_t file_max =
			    MIN(dn->dn_maxblkid, range->end_blkid);
			/*
			 * The object exists, so we need to try to find the
			 * blkptr for each block in the range we're processing.
			 */
			rw_enter(&dn->dn_struct_rwlock, RW_READER);
			for (uint64_t blkid = range->start_blkid;
			    blkid < file_max; blkid++) {
				blkptr_t bp;
				uint32_t datablksz =
				    dn->dn_phys->dn_datablkszsec <<
				    SPA_MINBLOCKSHIFT;
				uint64_t offset = blkid * datablksz;
				/*
				 * This call finds the next non-hole block in
				 * the object. This is to prevent a
				 * performance problem where we're unredacting
				 * a large hole. Using dnode_next_offset to
				 * skip over the large hole avoids iterating
				 * over every block in it.
				 */
				err = dnode_next_offset(dn, DNODE_FIND_HAVELOCK,
				    &offset, 1, 1, 0);
				if (err == ESRCH) {
					offset = UINT64_MAX;
					err = 0;
				} else if (err != 0) {
					break;
				}
				if (offset != blkid * datablksz) {
					/*
					 * if there is a hole from here
					 * (blkid) to offset
					 */
					offset = MIN(offset, file_max *
					    datablksz);
					uint64_t nblks = (offset / datablksz) -
					    blkid;
					enqueue_range(srta, outq, dn, blkid,
					    nblks, NULL, datablksz);
					blkid += nblks;
				}
				if (blkid >= file_max)
					break;
				err = dbuf_dnode_findbp(dn, 0, blkid, &bp,
				    NULL, NULL);
				if (err != 0)
					break;
				ASSERT(!BP_IS_HOLE(&bp));
				enqueue_range(srta, outq, dn, blkid, 1, &bp,
				    datablksz);
			}
			rw_exit(&dn->dn_struct_rwlock);
			dnode_rele(dn, FTAG);
			range = get_next_range(inq, range);
		}
		}
	}
	if (srta->cancel || err != 0) {
		smta->cancel = B_TRUE;
		srta->error = err;
	} else if (smta->error != 0) {
		srta->error = smta->error;
	}
	while (!range->eos_marker)
		range = get_next_range(inq, range);

	bqueue_enqueue_flush(outq, range, 1);
	spl_fstrans_unmark(cookie);
	thread_exit();
}
#define	NUM_SNAPS_NOT_REDACTED	UINT64_MAX
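/*
 * All of the arguments to dmu_send_impl(), gathered into a single struct so
 * that the entry points (dmu_send_obj() and dmu_send()) can each fill in the
 * subset that applies to them before handing the send off.
 */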
struct dmu_send_params {
	/* Pool args */
	const void *tag; // Tag dp was held with, will be used to release dp.
	dsl_pool_t *dp;
	/* To snapshot args */
	const char *tosnap;
	dsl_dataset_t *to_ds;
	/* From snapshot args */
	zfs_bookmark_phys_t ancestor_zb;
	uint64_t *fromredactsnaps;
	/* NUM_SNAPS_NOT_REDACTED if not sending from redaction bookmark */
	uint64_t numfromredactsnaps;
	/* Stream params */
	boolean_t is_clone;
	boolean_t embedok;
	boolean_t large_block_ok;
	boolean_t compressok;
	boolean_t rawok;
	boolean_t savedok;
	uint64_t resumeobj;
	uint64_t resumeoff;
	uint64_t saved_guid;
	zfs_bookmark_phys_t *redactbook;
	/* Stream output params */
	dmu_send_outparams_t *dso;

	/* Stream progress params */
	offset_t *off;
	int outfd;
	char saved_toname[MAXNAMELEN];
};
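/*
 * Compute the DMU_BACKUP_FEATURE_* flags that will be advertised in the
 * DRR_BEGIN record, based on the properties and active features of the
 * source objset and on the options requested by the caller.
 */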
static int
setup_featureflags(struct dmu_send_params *dspp, objset_t *os,
    uint64_t *featureflags)
{
	dsl_dataset_t *to_ds = dspp->to_ds;
	dsl_pool_t *dp = dspp->dp;

	if (dmu_objset_type(os) == DMU_OST_ZFS) {
		uint64_t version;
		if (zfs_get_zplprop(os, ZFS_PROP_VERSION, &version) != 0)
			return (SET_ERROR(EINVAL));

		if (version >= ZPL_VERSION_SA)
			*featureflags |= DMU_BACKUP_FEATURE_SA_SPILL;
	}

	/* raw sends imply large_block_ok */
	if ((dspp->rawok || dspp->large_block_ok) &&
	    dsl_dataset_feature_is_active(to_ds, SPA_FEATURE_LARGE_BLOCKS)) {
		*featureflags |= DMU_BACKUP_FEATURE_LARGE_BLOCKS;
	}

	/* encrypted datasets will not have embedded blocks */
	if ((dspp->embedok || dspp->rawok) && !os->os_encrypted &&
	    spa_feature_is_active(dp->dp_spa, SPA_FEATURE_EMBEDDED_DATA)) {
		*featureflags |= DMU_BACKUP_FEATURE_EMBED_DATA;
	}

	/* raw send implies compressok */
	if (dspp->compressok || dspp->rawok)
		*featureflags |= DMU_BACKUP_FEATURE_COMPRESSED;

	if (dspp->rawok && os->os_encrypted)
		*featureflags |= DMU_BACKUP_FEATURE_RAW;

	if ((*featureflags &
	    (DMU_BACKUP_FEATURE_EMBED_DATA | DMU_BACKUP_FEATURE_COMPRESSED |
	    DMU_BACKUP_FEATURE_RAW)) != 0 &&
	    spa_feature_is_active(dp->dp_spa, SPA_FEATURE_LZ4_COMPRESS)) {
		*featureflags |= DMU_BACKUP_FEATURE_LZ4;
	}

	/*
	 * We specifically do not include DMU_BACKUP_FEATURE_EMBED_DATA here to
	 * allow sending ZSTD compressed datasets to a receiver that does not
	 * support the ZSTD feature.
	 */
	if ((*featureflags &
	    (DMU_BACKUP_FEATURE_COMPRESSED | DMU_BACKUP_FEATURE_RAW)) != 0 &&
	    dsl_dataset_feature_is_active(to_ds, SPA_FEATURE_ZSTD_COMPRESS)) {
		*featureflags |= DMU_BACKUP_FEATURE_ZSTD;
	}

	if (dspp->resumeobj != 0 || dspp->resumeoff != 0) {
		*featureflags |= DMU_BACKUP_FEATURE_RESUMING;
	}

	if (dspp->redactbook != NULL) {
		*featureflags |= DMU_BACKUP_FEATURE_REDACTED;
	}

	if (dsl_dataset_feature_is_active(to_ds, SPA_FEATURE_LARGE_DNODE)) {
		*featureflags |= DMU_BACKUP_FEATURE_LARGE_DNODE;
	}

	if (dsl_dataset_feature_is_active(to_ds, SPA_FEATURE_LONGNAME)) {
		*featureflags |= DMU_BACKUP_FEATURE_LONGNAME;
	}

	if (dsl_dataset_feature_is_active(to_ds, SPA_FEATURE_LARGE_MICROZAP)) {
		/*
		 * We must never split a large microzap block, so we can only
		 * send large microzaps if LARGE_BLOCKS is already enabled.
		 */
		if (!(*featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS))
			return (SET_ERROR(ZFS_ERR_STREAM_LARGE_MICROZAP));
		*featureflags |= DMU_BACKUP_FEATURE_LARGE_MICROZAP;
	}

	return (0);
}
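/*
 * Allocate and populate the DRR_BEGIN record that starts the stream.  The
 * caller is responsible for freeing the returned record.
 */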
static dmu_replay_record_t *
create_begin_record(struct dmu_send_params *dspp, objset_t *os,
    uint64_t featureflags)
{
	dmu_replay_record_t *drr = kmem_zalloc(sizeof (dmu_replay_record_t),
	    KM_SLEEP);
	drr->drr_type = DRR_BEGIN;

	struct drr_begin *drrb = &drr->drr_u.drr_begin;
	dsl_dataset_t *to_ds = dspp->to_ds;

	drrb->drr_magic = DMU_BACKUP_MAGIC;
	drrb->drr_creation_time = dsl_dataset_phys(to_ds)->ds_creation_time;
	drrb->drr_type = dmu_objset_type(os);
	drrb->drr_toguid = dsl_dataset_phys(to_ds)->ds_guid;
	drrb->drr_fromguid = dspp->ancestor_zb.zbm_guid;

	DMU_SET_STREAM_HDRTYPE(drrb->drr_versioninfo, DMU_SUBSTREAM);
	DMU_SET_FEATUREFLAGS(drrb->drr_versioninfo, featureflags);

	if (dspp->is_clone)
		drrb->drr_flags |= DRR_FLAG_CLONE;
	if (dsl_dataset_phys(dspp->to_ds)->ds_flags & DS_FLAG_CI_DATASET)
		drrb->drr_flags |= DRR_FLAG_CI_DATA;
	if (zfs_send_set_freerecords_bit)
		drrb->drr_flags |= DRR_FLAG_FREERECORDS;
	drr->drr_u.drr_begin.drr_flags |= DRR_FLAG_SPILL_BLOCK;

	if (dspp->savedok) {
		drrb->drr_toguid = dspp->saved_guid;
		strlcpy(drrb->drr_toname, dspp->saved_toname,
		    sizeof (drrb->drr_toname));
	} else {
		dsl_dataset_name(to_ds, drrb->drr_toname);
		if (!to_ds->ds_is_snapshot) {
			(void) strlcat(drrb->drr_toname, "@--head--",
			    sizeof (drrb->drr_toname));
		}
	}
	return (drr);
}
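/*
 * Kick off the to_ds traversal thread, which walks the to_ds and enqueues a
 * send_range for every block modified since fromtxg.
 */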
static void
setup_to_thread(struct send_thread_arg *to_arg, objset_t *to_os,
    dmu_sendstatus_t *dssp, uint64_t fromtxg, boolean_t rawok)
{
	VERIFY0(bqueue_init(&to_arg->q, zfs_send_no_prefetch_queue_ff,
	    MAX(zfs_send_no_prefetch_queue_length, 2 * zfs_max_recordsize),
	    offsetof(struct send_range, ln)));
	to_arg->error_code = 0;
	to_arg->cancel = B_FALSE;
	to_arg->os = to_os;
	to_arg->fromtxg = fromtxg;
	to_arg->flags = TRAVERSE_PRE | TRAVERSE_PREFETCH_METADATA;
	if (rawok)
		to_arg->flags |= TRAVERSE_NO_DECRYPT;
	if (zfs_send_corrupt_data)
		to_arg->flags |= TRAVERSE_HARD;
	to_arg->num_blocks_visited = &dssp->dss_blocks;
	(void) thread_create(NULL, 0, send_traverse_thread, to_arg, 0,
	    curproc, TS_RUN, minclsyspri);
}
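/*
 * Kick off the thread that walks the redaction list of the "from" bookmark
 * (if any) and enqueues its ranges for the merge thread.
 */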
static void
setup_from_thread(struct redact_list_thread_arg *from_arg,
    redaction_list_t *from_rl, dmu_sendstatus_t *dssp)
{
	VERIFY0(bqueue_init(&from_arg->q, zfs_send_no_prefetch_queue_ff,
	    MAX(zfs_send_no_prefetch_queue_length, 2 * zfs_max_recordsize),
	    offsetof(struct send_range, ln)));
	from_arg->error_code = 0;
	from_arg->cancel = B_FALSE;
	from_arg->rl = from_rl;
	from_arg->mark_redact = B_FALSE;
	from_arg->num_blocks_visited = &dssp->dss_blocks;
	/*
	 * If from_rl is null, redact_list_thread just returns success and
	 * enqueues an eos marker.
	 */
	(void) thread_create(NULL, 0, redact_list_thread, from_arg, 0,
	    curproc, TS_RUN, minclsyspri);
}
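/*
 * Kick off the thread that walks the redaction list of the bookmark named by
 * dspp->redactbook (redacted sends only).  Because mark_redact is set, the
 * merge thread uses its ranges to mark matching records as redacted.
 */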
static void
setup_redact_list_thread(struct redact_list_thread_arg *rlt_arg,
    struct dmu_send_params *dspp, redaction_list_t *rl, dmu_sendstatus_t *dssp)
{
	if (dspp->redactbook == NULL)
		return;

	rlt_arg->cancel = B_FALSE;
	VERIFY0(bqueue_init(&rlt_arg->q, zfs_send_no_prefetch_queue_ff,
	    MAX(zfs_send_no_prefetch_queue_length, 2 * zfs_max_recordsize),
	    offsetof(struct send_range, ln)));
	rlt_arg->error_code = 0;
	rlt_arg->mark_redact = B_TRUE;
	rlt_arg->rl = rl;
	rlt_arg->num_blocks_visited = &dssp->dss_blocks;

	(void) thread_create(NULL, 0, redact_list_thread, rlt_arg, 0,
	    curproc, TS_RUN, minclsyspri);
}
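/*
 * Kick off the merge thread, which combines the streams of ranges produced by
 * the from, to, and redact list threads into a single ordered stream for the
 * reader thread.
 */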
static void
setup_merge_thread(struct send_merge_thread_arg *smt_arg,
    struct dmu_send_params *dspp, struct redact_list_thread_arg *from_arg,
    struct send_thread_arg *to_arg, struct redact_list_thread_arg *rlt_arg,
    objset_t *os)
{
	VERIFY0(bqueue_init(&smt_arg->q, zfs_send_no_prefetch_queue_ff,
	    MAX(zfs_send_no_prefetch_queue_length, 2 * zfs_max_recordsize),
	    offsetof(struct send_range, ln)));
	smt_arg->cancel = B_FALSE;
	smt_arg->error = 0;
	smt_arg->from_arg = from_arg;
	smt_arg->to_arg = to_arg;
	if (dspp->redactbook != NULL)
		smt_arg->redact_arg = rlt_arg;

	smt_arg->os = os;
	(void) thread_create(NULL, 0, send_merge_thread, smt_arg, 0, curproc,
	    TS_RUN, minclsyspri);
}
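/*
 * Kick off the reader (prefetch) thread, which consumes the merged stream and
 * issues prefetch reads for data that will actually be sent (unless this is a
 * dry run).
 */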
static void
setup_reader_thread(struct send_reader_thread_arg *srt_arg,
    struct dmu_send_params *dspp, struct send_merge_thread_arg *smt_arg,
    uint64_t featureflags)
{
	VERIFY0(bqueue_init(&srt_arg->q, zfs_send_queue_ff,
	    MAX(zfs_send_queue_length, 2 * zfs_max_recordsize),
	    offsetof(struct send_range, ln)));
	srt_arg->smta = smt_arg;
	srt_arg->issue_reads = !dspp->dso->dso_dryrun;
	srt_arg->featureflags = featureflags;
	(void) thread_create(NULL, 0, send_reader_thread, srt_arg, 0,
	    curproc, TS_RUN, minclsyspri);
}
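/*
 * Translate the resume object/offset (and, for redacted or bookmark-based
 * sends, the corresponding redaction-list position) into the resume bookmarks
 * used by the traversal threads, and record the resume point in the BEGIN
 * record's payload nvlist.
 */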
static int
setup_resume_points(struct dmu_send_params *dspp,
    struct send_thread_arg *to_arg, struct redact_list_thread_arg *from_arg,
    struct redact_list_thread_arg *rlt_arg,
    struct send_merge_thread_arg *smt_arg, boolean_t resuming, objset_t *os,
    redaction_list_t *redact_rl, nvlist_t *nvl)
{
	(void) smt_arg;
	dsl_dataset_t *to_ds = dspp->to_ds;
	int err = 0;

	uint64_t obj = 0;
	uint64_t blkid = 0;
	if (resuming) {
		obj = dspp->resumeobj;
		dmu_object_info_t to_doi;
		err = dmu_object_info(os, obj, &to_doi);
		if (err != 0)
			return (err);

		blkid = dspp->resumeoff / to_doi.doi_data_block_size;
	}
	/*
	 * If we're resuming a redacted send, we can skip to the appropriate
	 * point in the redaction bookmark by binary searching through it.
	 */
	if (redact_rl != NULL) {
		SET_BOOKMARK(&rlt_arg->resume, to_ds->ds_object, obj, 0, blkid);
	}

	SET_BOOKMARK(&to_arg->resume, to_ds->ds_object, obj, 0, blkid);
	if (nvlist_exists(nvl, BEGINNV_REDACT_FROM_SNAPS)) {
		uint64_t objset = dspp->ancestor_zb.zbm_redaction_obj;
		/*
		 * Note: If the resume point is in an object whose
		 * blocksize is different in the from vs to snapshots,
		 * we will have divided by the "wrong" blocksize.
		 * However, in this case fromsnap's send_cb() will
		 * detect that the blocksize has changed and therefore
		 * ignore this object.
		 *
		 * If we're resuming a send from a redaction bookmark,
		 * we still cannot accidentally suggest blocks behind
		 * the to_ds.  In addition, we know that any blocks in
		 * the object in the to_ds will have to be sent, since
		 * the size changed.  Therefore, we can't cause any harm
		 * this way either.
		 */
		SET_BOOKMARK(&from_arg->resume, objset, obj, 0, blkid);
	}
	if (resuming) {
		fnvlist_add_uint64(nvl, BEGINNV_RESUME_OBJECT, dspp->resumeobj);
		fnvlist_add_uint64(nvl, BEGINNV_RESUME_OFFSET, dspp->resumeoff);
	}
	return (0);
}
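/*
 * Allocate the dmu_sendstatus_t used to report progress and register it on
 * the to_ds's list of active send streams.
 */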
static dmu_sendstatus_t *
setup_send_progress(struct dmu_send_params *dspp)
{
	dmu_sendstatus_t *dssp = kmem_zalloc(sizeof (*dssp), KM_SLEEP);
	dssp->dss_outfd = dspp->outfd;
	dssp->dss_off = dspp->off;
	dssp->dss_proc = curproc;
	mutex_enter(&dspp->to_ds->ds_sendstream_lock);
	list_insert_head(&dspp->to_ds->ds_sendstreams, dssp);
	mutex_exit(&dspp->to_ds->ds_sendstream_lock);
	return (dssp);
}
/*
 * Actually do the bulk of the work in a zfs send.
 *
 * The idea is that we want to do a send from ancestor_zb to to_ds.  We also
 * want to not send any data that has been modified by all the datasets in
 * redactsnaparr, and store the list of blocks that are redacted in this way in
 * a bookmark named redactbook, created on the to_ds.  We do this by creating
 * several worker threads, whose function is described below.
 *
 * There are three cases.
 * The first case is a redacted zfs send.  In this case there are 5 threads.
 * The first thread is the to_ds traversal thread: it calls dataset_traverse on
 * the to_ds and finds all the blocks that have changed since ancestor_zb (if
 * it's a full send, that's all blocks in the dataset).  It then sends those
 * blocks on to the send merge thread.  The redact list thread takes the data
 * from the redaction bookmark and sends those blocks on to the send merge
 * thread.  The send merge thread takes the data from the to_ds traversal
 * thread, and combines it with the redaction records from the redact list
 * thread.  If a block appears in both the to_ds's data and the redaction data,
 * the send merge thread will mark it as redacted and send it on to the prefetch
 * thread.  Otherwise, the send merge thread will send the block on to the
 * prefetch thread unchanged.  The prefetch thread will issue prefetch reads for
 * any data that isn't redacted, and then send the data on to the main thread.
 * The main thread behaves the same as in a normal send case, issuing demand
 * reads for data blocks and sending out records over the network.
 *
 * The graphic below diagrams the flow of data in the case of a redacted zfs
 * send.  Each box represents a thread, and each line represents the flow of
 * data.
 *
 *             Records from the |
 *            redaction bookmark|
 * +--------------------+       |  +---------------------------+
 * |                    |       v  | Send Merge Thread         |
 * | Redact List Thread +----------> Apply redaction marks to  |
 * |                    |          | records as specified by   |
 * +--------------------+          | redaction ranges          |
 *                                 +----^---------------+------+
 *                                      |               | Merged data
 *                                      |               |
 *                                      |  +------------v--------+
 *                                      |  | Prefetch Thread     |
 * +--------------------+               |  | Issues prefetch     |
 * | to_ds Traversal    |               |  | reads of data blocks|
 * | Thread (finds      +---------------+  +------------+--------+
 * | candidate blocks)  |  Blocks modified              | Prefetched data
 * +--------------------+  by to_ds since               |
 *                         ancestor_zb     +------------v----+
 *                                         | Main Thread     |  File Descriptor
 *                                         | Sends data over +->(to zfs receive)
 *                                         | wire            |
 *                                         +-----------------+
 *
 * The second case is an incremental send from a redaction bookmark.  The to_ds
 * traversal thread and the main thread behave the same as in the redacted
 * send case.  The new thread is the from bookmark traversal thread.  It
 * iterates over the redaction list in the redaction bookmark, and enqueues
 * records for each block that was redacted in the original send.  The send
 * merge thread now has to merge the data from the two threads.  For details
 * about that process, see the header comment of send_merge_thread().  Any data
 * it decides to send on will be prefetched by the prefetch thread.  Note that
 * you can perform a redacted send from a redaction bookmark; in that case,
 * the data flow behaves very similarly to the flow in the redacted send case,
 * except with the addition of the bookmark traversal thread iterating over the
 * redaction bookmark.  The send_merge_thread also has to take on the
 * responsibility of merging the redact list thread's records, the bookmark
 * traversal thread's records, and the to_ds records.
 *
 * +---------------------+
 * |                     |
 * | Redact List Thread  +--------------+
 * |                     |              |
 * +---------------------+              |
 *        Blocks in redaction list      | Ranges modified by every secure snap
 *         of from bookmark             |  (or EOS if not redacted)
 *                                      |
 * +---------------------+   |     +----v----------------------+
 * | bookmark Traversal  |   v     | Send Merge Thread         |
 * | Thread (finds       +---------> Merges bookmark, rlt, and |
 * | candidate blocks)   |         | to_ds send records        |
 * +---------------------+         +----^---------------+------+
 *                                      |               | Merged data
 *                                      |  +------------v--------+
 *                                      |  | Prefetch Thread     |
 * +--------------------+               |  | Issues prefetch     |
 * | to_ds Traversal    |               |  | reads of data blocks|
 * | Thread (finds      +---------------+  +------------+--------+
 * | candidate blocks)  |  Blocks modified              | Prefetched data
 * +--------------------+  by to_ds since               |
 *                         ancestor_zb     +------------v----+
 *                                         | Main Thread     |  File Descriptor
 *                                         | Sends data over +->(to zfs receive)
 *                                         | wire            |
 *                                         +-----------------+
 *
 * The final case is a simple zfs full or incremental send.  The to_ds traversal
 * thread behaves the same as always.  The redact list thread is never started.
 * The send merge thread takes all the blocks that the to_ds traversal thread
 * sends it, prefetches the data, and sends the blocks on to the main thread.
 * The main thread sends the data over the wire.
 *
 * To keep performance acceptable, we want to prefetch the data in the worker
 * threads.  While the to_ds thread could simply use the TRAVERSE_PREFETCH
 * feature built into traverse_dataset, the combining and deletion of records
 * due to redaction and sends from redaction bookmarks mean that we could
 * issue many unnecessary prefetches.  As a result, we only prefetch data
 * after we've determined that the record is not going to be redacted.  To
 * prevent the prefetching from getting too far ahead of the main thread, the
 * blocking queues that are used for communication are capped not by the
 * number of entries in the queue, but by the sum of the size of the
 * prefetches associated with them.  The limit on the amount of data that the
 * thread can prefetch beyond what the main thread has reached is controlled
 * by the global variable zfs_send_queue_length.  In addition, to prevent poor
 * performance in the beginning of a send, we also limit the distance ahead
 * that the traversal threads can be.  That distance is controlled by the
 * zfs_send_no_prefetch_queue_length tunable.
 *
 * Note: Releases dp using the specified tag.
 */
static int
dmu_send_impl(struct dmu_send_params *dspp)
{
	objset_t *os;
	dmu_replay_record_t *drr;
	dmu_sendstatus_t *dssp;
	dmu_send_cookie_t dsc = {0};
	int err;
	uint64_t fromtxg = dspp->ancestor_zb.zbm_creation_txg;
	uint64_t featureflags = 0;
	struct redact_list_thread_arg *from_arg;
	struct send_thread_arg *to_arg;
	struct redact_list_thread_arg *rlt_arg;
	struct send_merge_thread_arg *smt_arg;
	struct send_reader_thread_arg *srt_arg;
	struct send_range *range;
	redaction_list_t *from_rl = NULL;
	redaction_list_t *redact_rl = NULL;
	boolean_t resuming = (dspp->resumeobj != 0 || dspp->resumeoff != 0);
	boolean_t book_resuming = resuming;

	dsl_dataset_t *to_ds = dspp->to_ds;
	zfs_bookmark_phys_t *ancestor_zb = &dspp->ancestor_zb;
	dsl_pool_t *dp = dspp->dp;
	const void *tag = dspp->tag;

	err = dmu_objset_from_ds(to_ds, &os);
	if (err != 0) {
		dsl_pool_rele(dp, tag);
		return (err);
	}

	/*
	 * If this is a non-raw send of an encrypted ds, we can ensure that
	 * the objset_phys_t is authenticated. This is safe because this is
	 * either a snapshot or we have owned the dataset, ensuring that
	 * it can't be modified.
	 */
	if (!dspp->rawok && os->os_encrypted &&
	    arc_is_unauthenticated(os->os_phys_buf)) {
		zbookmark_phys_t zb;

		SET_BOOKMARK(&zb, to_ds->ds_object, ZB_ROOT_OBJECT,
		    ZB_ROOT_LEVEL, ZB_ROOT_BLKID);
		err = arc_untransform(os->os_phys_buf, os->os_spa,
		    &zb, B_FALSE);
		if (err != 0) {
			dsl_pool_rele(dp, tag);
			return (err);
		}

		ASSERT0(arc_is_unauthenticated(os->os_phys_buf));
	}

	if ((err = setup_featureflags(dspp, os, &featureflags)) != 0) {
		dsl_pool_rele(dp, tag);
		return (err);
	}

	/*
	 * If we're doing a redacted send, hold the bookmark's redaction list.
	 */
	if (dspp->redactbook != NULL) {
		err = dsl_redaction_list_hold_obj(dp,
		    dspp->redactbook->zbm_redaction_obj, FTAG,
		    &redact_rl);
		if (err != 0) {
			dsl_pool_rele(dp, tag);
			return (SET_ERROR(EINVAL));
		}
		dsl_redaction_list_long_hold(dp, redact_rl, FTAG);
	}

	/*
	 * If we're sending from a redaction bookmark, hold the redaction list
	 * so that we can consider sending the redacted blocks.
	 */
	if (ancestor_zb->zbm_redaction_obj != 0) {
		err = dsl_redaction_list_hold_obj(dp,
		    ancestor_zb->zbm_redaction_obj, FTAG, &from_rl);
		if (err != 0) {
			if (redact_rl != NULL) {
				dsl_redaction_list_long_rele(redact_rl, FTAG);
				dsl_redaction_list_rele(redact_rl, FTAG);
			}
			dsl_pool_rele(dp, tag);
			return (SET_ERROR(EINVAL));
		}
		dsl_redaction_list_long_hold(dp, from_rl, FTAG);
	}

	dsl_dataset_long_hold(to_ds, FTAG);

	from_arg = kmem_zalloc(sizeof (*from_arg), KM_SLEEP);
	to_arg = kmem_zalloc(sizeof (*to_arg), KM_SLEEP);
	rlt_arg = kmem_zalloc(sizeof (*rlt_arg), KM_SLEEP);
	smt_arg = kmem_zalloc(sizeof (*smt_arg), KM_SLEEP);
	srt_arg = kmem_zalloc(sizeof (*srt_arg), KM_SLEEP);

	drr = create_begin_record(dspp, os, featureflags);
	dssp = setup_send_progress(dspp);

	dsc.dsc_drr = drr;
	dsc.dsc_dso = dspp->dso;
	dsc.dsc_os = os;
	dsc.dsc_off = dspp->off;
	dsc.dsc_toguid = dsl_dataset_phys(to_ds)->ds_guid;
	dsc.dsc_fromtxg = fromtxg;
	dsc.dsc_pending_op = PENDING_NONE;
	dsc.dsc_featureflags = featureflags;
	dsc.dsc_resume_object = dspp->resumeobj;
	dsc.dsc_resume_offset = dspp->resumeoff;

	dsl_pool_rele(dp, tag);

	void *payload = NULL;
	size_t payload_len = 0;
	nvlist_t *nvl = fnvlist_alloc();

	/*
	 * If we're doing a redacted send, we include the snapshots we're
	 * redacted with respect to so that the target system knows what send
	 * streams can be correctly received on top of this dataset. If we're
	 * instead sending a redacted dataset, we include the snapshots that the
	 * dataset was created with respect to.
	 */
	if (dspp->redactbook != NULL) {
		fnvlist_add_uint64_array(nvl, BEGINNV_REDACT_SNAPS,
		    redact_rl->rl_phys->rlp_snaps,
		    redact_rl->rl_phys->rlp_num_snaps);
	} else if (dsl_dataset_feature_is_active(to_ds,
	    SPA_FEATURE_REDACTED_DATASETS)) {
		uint64_t *tods_guids;
		uint64_t length;
		VERIFY(dsl_dataset_get_uint64_array_feature(to_ds,
		    SPA_FEATURE_REDACTED_DATASETS, &length, &tods_guids));
		fnvlist_add_uint64_array(nvl, BEGINNV_REDACT_SNAPS, tods_guids,
		    length);
	}

	/*
	 * If we're sending from a redaction bookmark, then we should retrieve
	 * the guids of that bookmark so we can send them over the wire.
	 */
	if (from_rl != NULL) {
		fnvlist_add_uint64_array(nvl, BEGINNV_REDACT_FROM_SNAPS,
		    from_rl->rl_phys->rlp_snaps,
		    from_rl->rl_phys->rlp_num_snaps);
	}

	/*
	 * If the snapshot we're sending from is redacted, include the redaction
	 * list in the stream.
	 */
	if (dspp->numfromredactsnaps != NUM_SNAPS_NOT_REDACTED) {
		ASSERT3P(from_rl, ==, NULL);
		fnvlist_add_uint64_array(nvl, BEGINNV_REDACT_FROM_SNAPS,
		    dspp->fromredactsnaps, (uint_t)dspp->numfromredactsnaps);
		if (dspp->numfromredactsnaps > 0) {
			kmem_free(dspp->fromredactsnaps,
			    dspp->numfromredactsnaps * sizeof (uint64_t));
			dspp->fromredactsnaps = NULL;
		}
	}

	if (resuming || book_resuming) {
		err = setup_resume_points(dspp, to_arg, from_arg,
		    rlt_arg, smt_arg, resuming, os, redact_rl, nvl);
		if (err != 0)
			goto out;
	}

	if (featureflags & DMU_BACKUP_FEATURE_RAW) {
		uint64_t ivset_guid = ancestor_zb->zbm_ivset_guid;
		nvlist_t *keynvl = NULL;
		ASSERT(os->os_encrypted);

		err = dsl_crypto_populate_key_nvlist(os, ivset_guid,
		    &keynvl);
		if (err != 0) {
			fnvlist_free(nvl);
			goto out;
		}

		fnvlist_add_nvlist(nvl, "crypt_keydata", keynvl);
		fnvlist_free(keynvl);
	}

	if (!nvlist_empty(nvl)) {
		payload = fnvlist_pack(nvl, &payload_len);
		drr->drr_payloadlen = payload_len;
	}

	fnvlist_free(nvl);
	err = dump_record(&dsc, payload, payload_len);
	fnvlist_pack_free(payload, payload_len);
	if (err != 0) {
		err = dsc.dsc_err;
		goto out;
	}

	setup_to_thread(to_arg, os, dssp, fromtxg, dspp->rawok);
	setup_from_thread(from_arg, from_rl, dssp);
	setup_redact_list_thread(rlt_arg, dspp, redact_rl, dssp);
	setup_merge_thread(smt_arg, dspp, from_arg, to_arg, rlt_arg, os);
	setup_reader_thread(srt_arg, dspp, smt_arg, featureflags);

	range = bqueue_dequeue(&srt_arg->q);
	while (err == 0 && !range->eos_marker) {
		err = do_dump(&dsc, range);
		range = get_next_range(&srt_arg->q, range);
		if (issig())
			err = SET_ERROR(EINTR);
	}

	/*
	 * If we hit an error or are interrupted, cancel our worker threads and
	 * clear the queue of any pending records.  The threads will pass the
	 * cancel up the tree of worker threads, and each one will clean up any
	 * pending records before exiting.
	 */
	if (err != 0) {
		srt_arg->cancel = B_TRUE;
		while (!range->eos_marker) {
			range = get_next_range(&srt_arg->q, range);
		}
	}
	range_free(range);

	bqueue_destroy(&srt_arg->q);
	bqueue_destroy(&smt_arg->q);
	if (dspp->redactbook != NULL)
		bqueue_destroy(&rlt_arg->q);
	bqueue_destroy(&to_arg->q);
	bqueue_destroy(&from_arg->q);

	if (err == 0 && srt_arg->error != 0)
		err = srt_arg->error;

	if (err != 0)
		goto out;

	if (dsc.dsc_pending_op != PENDING_NONE)
		if (dump_record(&dsc, NULL, 0) != 0)
			err = SET_ERROR(EINTR);

	if (err != 0) {
		if (err == EINTR && dsc.dsc_err != 0)
			err = dsc.dsc_err;
		goto out;
	}

	/*
	 * Send the DRR_END record if this is not a saved stream.
	 * Otherwise, the omitted DRR_END record will signal to
	 * the receive side that the stream is incomplete.
	 */
	if (!dspp->savedok) {
		memset(drr, 0, sizeof (dmu_replay_record_t));
		drr->drr_type = DRR_END;
		drr->drr_u.drr_end.drr_checksum = dsc.dsc_zc;
		drr->drr_u.drr_end.drr_toguid = dsc.dsc_toguid;

		if (dump_record(&dsc, NULL, 0) != 0)
			err = dsc.dsc_err;
	}
out:
	mutex_enter(&to_ds->ds_sendstream_lock);
	list_remove(&to_ds->ds_sendstreams, dssp);
	mutex_exit(&to_ds->ds_sendstream_lock);

	VERIFY(err != 0 || (dsc.dsc_sent_begin &&
	    (dsc.dsc_sent_end || dspp->savedok)));

	kmem_free(drr, sizeof (dmu_replay_record_t));
	kmem_free(dssp, sizeof (dmu_sendstatus_t));
	kmem_free(from_arg, sizeof (*from_arg));
	kmem_free(to_arg, sizeof (*to_arg));
	kmem_free(rlt_arg, sizeof (*rlt_arg));
	kmem_free(smt_arg, sizeof (*smt_arg));
	kmem_free(srt_arg, sizeof (*srt_arg));

	dsl_dataset_long_rele(to_ds, FTAG);
	if (from_rl != NULL) {
		dsl_redaction_list_long_rele(from_rl, FTAG);
		dsl_redaction_list_rele(from_rl, FTAG);
	}
	if (redact_rl != NULL) {
		dsl_redaction_list_long_rele(redact_rl, FTAG);
		dsl_redaction_list_rele(redact_rl, FTAG);
	}

	return (err);
}
int
dmu_send_obj(const char *pool, uint64_t tosnap, uint64_t fromsnap,
    boolean_t embedok, boolean_t large_block_ok, boolean_t compressok,
    boolean_t rawok, boolean_t savedok, int outfd, offset_t *off,
    dmu_send_outparams_t *dsop)
{
	int err;
	dsl_dataset_t *fromds;
	ds_hold_flags_t dsflags;
	struct dmu_send_params dspp = {0};
	dspp.embedok = embedok;
	dspp.large_block_ok = large_block_ok;
	dspp.compressok = compressok;
	dspp.outfd = outfd;
	dspp.off = off;
	dspp.dso = dsop;
	dspp.tag = FTAG;
	dspp.rawok = rawok;
	dspp.savedok = savedok;

	dsflags = (rawok) ? DS_HOLD_FLAG_NONE : DS_HOLD_FLAG_DECRYPT;
	err = dsl_pool_hold(pool, FTAG, &dspp.dp);
	if (err != 0)
		return (err);

	err = dsl_dataset_hold_obj_flags(dspp.dp, tosnap, dsflags, FTAG,
	    &dspp.to_ds);
	if (err != 0) {
		dsl_pool_rele(dspp.dp, FTAG);
		return (err);
	}

	if (fromsnap != 0) {
		err = dsl_dataset_hold_obj_flags(dspp.dp, fromsnap, dsflags,
		    FTAG, &fromds);
		if (err != 0) {
			dsl_dataset_rele_flags(dspp.to_ds, dsflags, FTAG);
			dsl_pool_rele(dspp.dp, FTAG);
			return (err);
		}
		dspp.ancestor_zb.zbm_guid = dsl_dataset_phys(fromds)->ds_guid;
		dspp.ancestor_zb.zbm_creation_txg =
		    dsl_dataset_phys(fromds)->ds_creation_txg;
		dspp.ancestor_zb.zbm_creation_time =
		    dsl_dataset_phys(fromds)->ds_creation_time;

		if (dsl_dataset_is_zapified(fromds)) {
			(void) zap_lookup(dspp.dp->dp_meta_objset,
			    fromds->ds_object, DS_FIELD_IVSET_GUID, 8, 1,
			    &dspp.ancestor_zb.zbm_ivset_guid);
		}

		/* See dmu_send for the reasons behind this. */
		uint64_t *fromredact;

		if (!dsl_dataset_get_uint64_array_feature(fromds,
		    SPA_FEATURE_REDACTED_DATASETS,
		    &dspp.numfromredactsnaps,
		    &fromredact)) {
			dspp.numfromredactsnaps = NUM_SNAPS_NOT_REDACTED;
		} else if (dspp.numfromredactsnaps > 0) {
			uint64_t size = dspp.numfromredactsnaps *
			    sizeof (uint64_t);
			dspp.fromredactsnaps = kmem_zalloc(size, KM_SLEEP);
			memcpy(dspp.fromredactsnaps, fromredact, size);
		}

		boolean_t is_before =
		    dsl_dataset_is_before(dspp.to_ds, fromds, 0);
		dspp.is_clone = (dspp.to_ds->ds_dir !=
		    fromds->ds_dir);
		dsl_dataset_rele(fromds, FTAG);
		if (!is_before) {
			dsl_pool_rele(dspp.dp, FTAG);
			err = SET_ERROR(EXDEV);
		} else {
			err = dmu_send_impl(&dspp);
		}
	} else {
		dspp.numfromredactsnaps = NUM_SNAPS_NOT_REDACTED;
		err = dmu_send_impl(&dspp);
	}
	if (dspp.fromredactsnaps)
		kmem_free(dspp.fromredactsnaps,
		    dspp.numfromredactsnaps * sizeof (uint64_t));

	dsl_dataset_rele(dspp.to_ds, FTAG);
	return (err);
}
int
dmu_send(const char *tosnap, const char *fromsnap, boolean_t embedok,
    boolean_t large_block_ok, boolean_t compressok, boolean_t rawok,
    boolean_t savedok, uint64_t resumeobj, uint64_t resumeoff,
    const char *redactbook, int outfd, offset_t *off,
    dmu_send_outparams_t *dsop)
{
	int err;
	ds_hold_flags_t dsflags;
	boolean_t owned = B_FALSE;
	dsl_dataset_t *fromds = NULL;
	zfs_bookmark_phys_t book = {0};
	struct dmu_send_params dspp = {0};

	dsflags = (rawok) ? DS_HOLD_FLAG_NONE : DS_HOLD_FLAG_DECRYPT;
	dspp.tosnap = tosnap;
	dspp.embedok = embedok;
	dspp.large_block_ok = large_block_ok;
	dspp.compressok = compressok;
	dspp.outfd = outfd;
	dspp.off = off;
	dspp.dso = dsop;
	dspp.tag = FTAG;
	dspp.resumeobj = resumeobj;
	dspp.resumeoff = resumeoff;
	dspp.rawok = rawok;
	dspp.savedok = savedok;

	if (fromsnap != NULL && strpbrk(fromsnap, "@#") == NULL)
		return (SET_ERROR(EINVAL));

	err = dsl_pool_hold(tosnap, FTAG, &dspp.dp);
	if (err != 0)
		return (err);

	if (strchr(tosnap, '@') == NULL && spa_writeable(dspp.dp->dp_spa)) {
		/*
		 * We are sending a filesystem or volume.  Ensure
		 * that it doesn't change by owning the dataset.
		 */

		if (savedok) {
			/*
			 * We are looking for the dataset that represents the
			 * partially received send stream. If this stream was
			 * received as a new snapshot of an existing dataset,
			 * this will be saved in a hidden clone named
			 * "<pool>/<dataset>/%recv". Otherwise, the stream
			 * will be saved in the live dataset itself. In
			 * either case we need to use dsl_dataset_own_force()
			 * because the stream is marked as inconsistent,
			 * which would normally make it unavailable to be
			 * owned.
			 */
			char *name = kmem_asprintf("%s/%s", tosnap,
			    recv_clone_name);
			err = dsl_dataset_own_force(dspp.dp, name, dsflags,
			    FTAG, &dspp.to_ds);
			if (err == ENOENT) {
				err = dsl_dataset_own_force(dspp.dp, tosnap,
				    dsflags, FTAG, &dspp.to_ds);
			}

			if (err == 0) {
				owned = B_TRUE;
				err = zap_lookup(dspp.dp->dp_meta_objset,
				    dspp.to_ds->ds_object,
				    DS_FIELD_RESUME_TOGUID, 8, 1,
				    &dspp.saved_guid);
			}

			if (err == 0) {
				err = zap_lookup(dspp.dp->dp_meta_objset,
				    dspp.to_ds->ds_object,
				    DS_FIELD_RESUME_TONAME, 1,
				    sizeof (dspp.saved_toname),
				    dspp.saved_toname);
			}
			/* Only disown if there was an error in the lookups */
			if (owned && (err != 0))
				dsl_dataset_disown(dspp.to_ds, dsflags, FTAG);

			kmem_strfree(name);
		} else {
			err = dsl_dataset_own(dspp.dp, tosnap, dsflags,
			    FTAG, &dspp.to_ds);
			if (err == 0)
				owned = B_TRUE;
		}
	} else {
		err = dsl_dataset_hold_flags(dspp.dp, tosnap, dsflags, FTAG,
		    &dspp.to_ds);
	}

	if (err != 0) {
		/* Note: dsl dataset is not owned at this point */
		dsl_pool_rele(dspp.dp, FTAG);
		return (err);
	}

	if (redactbook != NULL) {
		char path[ZFS_MAX_DATASET_NAME_LEN];
		(void) strlcpy(path, tosnap, sizeof (path));
		char *at = strchr(path, '@');
		if (at == NULL) {
			err = SET_ERROR(EINVAL);
		} else {
			(void) snprintf(at, sizeof (path) - (at - path), "#%s",
			    redactbook);
			err = dsl_bookmark_lookup(dspp.dp, path,
			    NULL, &book);
			dspp.redactbook = &book;
		}
	}

	if (err != 0) {
		dsl_pool_rele(dspp.dp, FTAG);
		if (owned)
			dsl_dataset_disown(dspp.to_ds, dsflags, FTAG);
		else
			dsl_dataset_rele_flags(dspp.to_ds, dsflags, FTAG);
		return (err);
	}

	if (fromsnap != NULL) {
		zfs_bookmark_phys_t *zb = &dspp.ancestor_zb;
		int fsnamelen;
		if (strpbrk(tosnap, "@#") != NULL)
			fsnamelen = strpbrk(tosnap, "@#") - tosnap;
		else
			fsnamelen = strlen(tosnap);

		/*
		 * If the fromsnap is in a different filesystem, then
		 * mark the send stream as a clone.
		 */
		if (strncmp(tosnap, fromsnap, fsnamelen) != 0 ||
		    (fromsnap[fsnamelen] != '@' &&
		    fromsnap[fsnamelen] != '#')) {
			dspp.is_clone = B_TRUE;
		}

		if (strchr(fromsnap, '@') != NULL) {
			err = dsl_dataset_hold(dspp.dp, fromsnap, FTAG,
			    &fromds);

			if (err != 0) {
				ASSERT3P(fromds, ==, NULL);
			} else {
				/*
				 * We need to make a deep copy of the redact
				 * snapshots of the from snapshot, because the
				 * array will be freed when we evict from_ds.
				 */
				uint64_t *fromredact;
				if (!dsl_dataset_get_uint64_array_feature(
				    fromds, SPA_FEATURE_REDACTED_DATASETS,
				    &dspp.numfromredactsnaps,
				    &fromredact)) {
					dspp.numfromredactsnaps =
					    NUM_SNAPS_NOT_REDACTED;
				} else if (dspp.numfromredactsnaps > 0) {
					uint64_t size =
					    dspp.numfromredactsnaps *
					    sizeof (uint64_t);
					dspp.fromredactsnaps = kmem_zalloc(size,
					    KM_SLEEP);
					memcpy(dspp.fromredactsnaps, fromredact,
					    size);
				}
				if (!dsl_dataset_is_before(dspp.to_ds, fromds,
				    0)) {
					err = SET_ERROR(EXDEV);
				} else {
					zb->zbm_creation_txg =
					    dsl_dataset_phys(fromds)->
					    ds_creation_txg;
					zb->zbm_creation_time =
					    dsl_dataset_phys(fromds)->
					    ds_creation_time;
					zb->zbm_guid =
					    dsl_dataset_phys(fromds)->ds_guid;
					zb->zbm_redaction_obj = 0;

					if (dsl_dataset_is_zapified(fromds)) {
						(void) zap_lookup(
						    dspp.dp->dp_meta_objset,
						    fromds->ds_object,
						    DS_FIELD_IVSET_GUID, 8, 1,
						    &zb->zbm_ivset_guid);
					}
				}
				dsl_dataset_rele(fromds, FTAG);
			}
		} else {
			dspp.numfromredactsnaps = NUM_SNAPS_NOT_REDACTED;
			err = dsl_bookmark_lookup(dspp.dp, fromsnap, dspp.to_ds,
			    zb);
			if (err == EXDEV && zb->zbm_redaction_obj != 0 &&
			    zb->zbm_guid ==
			    dsl_dataset_phys(dspp.to_ds)->ds_guid)
				err = 0;
		}

		if (err == 0) {
			/* dmu_send_impl will call dsl_pool_rele for us. */
			err = dmu_send_impl(&dspp);
		} else {
			if (dspp.fromredactsnaps)
				kmem_free(dspp.fromredactsnaps,
				    dspp.numfromredactsnaps *
				    sizeof (uint64_t));
			dsl_pool_rele(dspp.dp, FTAG);
		}
	} else {
		dspp.numfromredactsnaps = NUM_SNAPS_NOT_REDACTED;
		err = dmu_send_impl(&dspp);
	}
	if (owned)
		dsl_dataset_disown(dspp.to_ds, dsflags, FTAG);
	else
		dsl_dataset_rele_flags(dspp.to_ds, dsflags, FTAG);
	return (err);
}
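/*
 * Convert a dataset's raw space usage into an estimated send stream size.
 * As a rough illustration (the exact constants depend on the structure
 * sizes in this build): sending 8 GiB of 128 KiB records gives
 * record_count = 65536, so the estimate subtracts 65536 * sizeof (blkptr_t)
 * of indirect-block overhead from, and adds 65536 *
 * sizeof (dmu_replay_record_t) of per-record header overhead to, the
 * (compressed or uncompressed) data size.
 */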
static int
dmu_adjust_send_estimate_for_indirects(dsl_dataset_t *ds, uint64_t uncompressed,
    uint64_t compressed, boolean_t stream_compressed, uint64_t *sizep)
{
	int err = 0;
	uint64_t size;
	/*
	 * Assume that space (both on-disk and in-stream) is dominated by
	 * data.  We will adjust for indirect blocks and the copies property,
	 * but ignore per-object space used (eg, dnodes and DRR_OBJECT records).
	 */

	uint64_t recordsize;
	uint64_t record_count;
	objset_t *os;
	VERIFY0(dmu_objset_from_ds(ds, &os));

	/* Assume all (uncompressed) blocks are recordsize. */
	if (zfs_override_estimate_recordsize != 0) {
		recordsize = zfs_override_estimate_recordsize;
	} else if (os->os_phys->os_type == DMU_OST_ZVOL) {
		err = dsl_prop_get_int_ds(ds,
		    zfs_prop_to_name(ZFS_PROP_VOLBLOCKSIZE), &recordsize);
	} else {
		err = dsl_prop_get_int_ds(ds,
		    zfs_prop_to_name(ZFS_PROP_RECORDSIZE), &recordsize);
	}
	if (err != 0)
		return (err);
	record_count = uncompressed / recordsize;

	/*
	 * If we're estimating a send size for a compressed stream, use the
	 * compressed data size to estimate the stream size. Otherwise, use the
	 * uncompressed data size.
	 */
	size = stream_compressed ? compressed : uncompressed;

	/*
	 * Subtract out approximate space used by indirect blocks.
	 * Assume most space is used by data blocks (non-indirect, non-dnode).
	 * Assume no ditto blocks or internal fragmentation.
	 *
	 * Therefore, space used by indirect blocks is sizeof(blkptr_t) per
	 * block.
	 */
	size -= record_count * sizeof (blkptr_t);

	/* Add in the space for the record associated with each block. */
	size += record_count * sizeof (dmu_replay_record_t);

	*sizep = size;

	return (0);
}
int
dmu_send_estimate_fast(dsl_dataset_t *origds, dsl_dataset_t *fromds,
    zfs_bookmark_phys_t *frombook, boolean_t stream_compressed,
    boolean_t saved, uint64_t *sizep)
{
	int err = 0;
	dsl_dataset_t *ds = origds;
	uint64_t uncomp, comp;

	ASSERT(dsl_pool_config_held(origds->ds_dir->dd_pool));
	ASSERT(fromds == NULL || frombook == NULL);

	/*
	 * If this is a saved send we may actually be sending
	 * from the %recv clone used for resuming.
	 */
	if (saved) {
		objset_t *mos = origds->ds_dir->dd_pool->dp_meta_objset;
		uint64_t guid;
		char dsname[ZFS_MAX_DATASET_NAME_LEN + 6];

		dsl_dataset_name(origds, dsname);
		(void) strcat(dsname, "/");
		(void) strlcat(dsname, recv_clone_name, sizeof (dsname));

		err = dsl_dataset_hold(origds->ds_dir->dd_pool,
		    dsname, FTAG, &ds);
		if (err != ENOENT && err != 0) {
			return (err);
		} else if (err == ENOENT) {
			ds = origds;
		}

		/* check that this dataset has partially received data */
		err = zap_lookup(mos, ds->ds_object,
		    DS_FIELD_RESUME_TOGUID, 8, 1, &guid);
		if (err != 0) {
			err = SET_ERROR(err == ENOENT ? EINVAL : err);
			goto out;
		}

		err = zap_lookup(mos, ds->ds_object,
		    DS_FIELD_RESUME_TONAME, 1, sizeof (dsname), dsname);
		if (err != 0) {
			err = SET_ERROR(err == ENOENT ? EINVAL : err);
			goto out;
		}
	}

	/* tosnap must be a snapshot or the target of a saved send */
	if (!ds->ds_is_snapshot && ds == origds)
		return (SET_ERROR(EINVAL));

	if (fromds != NULL) {
		uint64_t used;
		if (!fromds->ds_is_snapshot) {
			err = SET_ERROR(EINVAL);
			goto out;
		}

		if (!dsl_dataset_is_before(ds, fromds, 0)) {
			err = SET_ERROR(EXDEV);
			goto out;
		}

		err = dsl_dataset_space_written(fromds, ds, &used, &comp,
		    &uncomp);
		if (err != 0)
			goto out;
	} else if (frombook != NULL) {
		uint64_t used;
		err = dsl_dataset_space_written_bookmark(frombook, ds, &used,
		    &comp, &uncomp);
		if (err != 0)
			goto out;
	} else {
		uncomp = dsl_dataset_phys(ds)->ds_uncompressed_bytes;
		comp = dsl_dataset_phys(ds)->ds_compressed_bytes;
	}

	err = dmu_adjust_send_estimate_for_indirects(ds, uncomp, comp,
	    stream_compressed, sizep);
	/*
	 * Add the size of the BEGIN and END records to the estimate.
	 */
	*sizep += 2 * sizeof (dmu_replay_record_t);

out:
	if (ds != origds)
		dsl_dataset_rele(ds, FTAG);
	return (err);
}
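/*
 * Tunables for this file, exported to userspace (module parameters on Linux,
 * sysctls on FreeBSD).
 */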
ZFS_MODULE_PARAM(zfs_send, zfs_send_, corrupt_data, INT, ZMOD_RW,
	"Allow sending corrupt data");

ZFS_MODULE_PARAM(zfs_send, zfs_send_, queue_length, UINT, ZMOD_RW,
	"Maximum send queue length");

ZFS_MODULE_PARAM(zfs_send, zfs_send_, unmodified_spill_blocks, INT, ZMOD_RW,
	"Send unmodified spill blocks");

ZFS_MODULE_PARAM(zfs_send, zfs_send_, no_prefetch_queue_length, UINT, ZMOD_RW,
	"Maximum send queue length for non-prefetch queues");

ZFS_MODULE_PARAM(zfs_send, zfs_send_, queue_ff, UINT, ZMOD_RW,
	"Send queue fill fraction");

ZFS_MODULE_PARAM(zfs_send, zfs_send_, no_prefetch_queue_ff, UINT, ZMOD_RW,
	"Send queue fill fraction for non-prefetch queues");

ZFS_MODULE_PARAM(zfs_send, zfs_, override_estimate_recordsize, UINT, ZMOD_RW,
	"Override block size estimate with fixed size");