/* NBD client library in userspace.
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 */
/* Threads pick up work in units of THREAD_WORK_SIZE starting at the
 * next_offset.  The lock protects next_offset.
 */
static uint64_t next_offset = 0;
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static bool
get_next_offset (uint64_t *offset, uint64_t *count)
{
  bool r = false;               /* returning false means no more work */

  pthread_mutex_lock (&lock);
  if (next_offset < src->size) {
    *offset = next_offset;

    /* Work out how large this range is.  The last range may be
     * smaller than THREAD_WORK_SIZE.
     */
    *count = src->size - *offset;
    if (*count > THREAD_WORK_SIZE)
      *count = THREAD_WORK_SIZE;

    next_offset += THREAD_WORK_SIZE;
    r = true;                   /* there is more work */

    /* XXX This means the progress bar "runs fast" since it shows the
     * progress issuing commands, not necessarily progress performing
     * the commands.  We might move this into a callback, but those
     * are called from threads and not necessarily in monotonic order
     * so the progress bar would move erratically.
     */
    progress_bar (*offset, src->size);
  }
  pthread_mutex_unlock (&lock);

  return r;
}
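/* Worked example (illustrative only): with a hypothetical
 * THREAD_WORK_SIZE of 128M and a 300M source, successive calls yield
 * (offset=0, count=128M), (offset=128M, count=128M),
 * (offset=256M, count=44M) and then false.  Because next_offset is
 * only read and advanced under 'lock', each range is handed to
 * exactly one worker thread.
 */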
static void *worker_thread (void *wp);
void
multi_thread_copying (void)
{
  struct worker *workers;
  size_t i;
  int err;

  /* Some invariants that should be true if the main program called us
   * correctly.
   */
  assert (threads == connections);
  if (src->ops == &nbd_ops)
    assert (src->u.nbd.handles.size == connections);
  if (dst->ops == &nbd_ops)
    assert (dst->u.nbd.handles.size == connections);
  assert (src->size != -1);

  workers = calloc (threads, sizeof *workers);
  if (workers == NULL) {
    perror ("calloc");
    exit (EXIT_FAILURE);
  }

  /* Start the worker threads. */
  for (i = 0; i < threads; ++i) {
    workers[i].index = i;
    err = pthread_create (&workers[i].thread, NULL, worker_thread,
                          &workers[i]);
    if (err != 0) {
      errno = err;
      perror ("pthread_create");
      exit (EXIT_FAILURE);
    }
  }

  /* Wait until all worker threads exit. */
  for (i = 0; i < threads; ++i) {
    err = pthread_join (workers[i].thread, NULL);
    if (err != 0) {
      errno = err;
      perror ("pthread_join");
      exit (EXIT_FAILURE);
    }
  }

  free (workers);
}
static void wait_for_request_slots (struct worker *worker);
static unsigned in_flight (size_t index);
static void poll_both_ends (size_t index);
static int finished_read (void *vp, int *error);
static int finished_command (void *vp, int *error);
static void free_command (struct command *command);
static void fill_dst_range_with_zeroes (struct command *command);
static struct command *create_command (uint64_t offset, size_t len, bool zero,
                                        struct worker *worker);
/* Tracking worker queue size.
 *
 * The queue size is increased when starting a read command.
 *
 * The queue size is decreased when a read command is converted to a zero
 * subcommand in finished_read(), or when a write command completes in
 * finished_command().
 *
 * Zero commands are not considered in the queue size since they have no
 * payload.
 */
static inline void
increase_queue_size (struct worker *worker, size_t len)
{
  assert (worker->queue_size < queue_size);
  worker->queue_size += len;
}
static inline void
decrease_queue_size (struct worker *worker, size_t len)
{
  assert (worker->queue_size >= len);
  worker->queue_size -= len;
}
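/* Worked example of the accounting above (illustrative sizes): a worker
 * issuing a 256k read calls increase_queue_size (w, 256k).  If
 * finished_read() later splits that read into a 192k zero subcommand
 * and a 64k data write, the 192k is returned immediately via
 * decrease_queue_size() and the 64k is returned when the write
 * completes in finished_command(), so the bytes accounted against the
 * queue size always drain back to zero.
 */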
/* Using the extents map 'exts', check if the region
 * [offset..offset+len-1] intersects only with zero extents.
 *
 * The invariant for '*i' is always an extent which starts before or
 * equal to the current offset.
 */
static bool
only_zeroes (const extent_list exts, size_t *i,
             uint64_t offset, unsigned len)
{
  size_t j;

  /* Check the invariant on entry. */
  assert (*i < exts.len);
  assert (exts.ptr[*i].offset <= offset);

  /* Update the invariant.  Search for the last possible extent in the
   * list which is <= offset.
   */
  for (j = *i + 1; j < exts.len; ++j) {
    if (exts.ptr[j].offset <= offset)
      *i = j;
    else
      break;
  }

  /* Check invariant again. */
  assert (*i < exts.len);
  assert (exts.ptr[*i].offset <= offset);

  /* If *i is not the last extent, then the next extent starts
   * strictly beyond our current offset.
   */
  assert (*i == exts.len - 1 || exts.ptr[*i + 1].offset > offset);

  /* Search forward, look for any non-zero extents overlapping the region. */
  for (j = *i; j < exts.len; ++j) {
    uint64_t start, end;

    /* [start..end-1] is the current extent. */
    start = exts.ptr[j].offset;
    end = exts.ptr[j].offset + exts.ptr[j].length;

    assert (end > offset);

    /* This extent starts after the region, so nothing further overlaps. */
    if (start >= offset + len)
      break;

    /* Non-zero extent covering this region => test failed. */
    if (!exts.ptr[j].zero)
      return false;
  }

  return true;
}
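/* Example (illustrative): with exts = { {0, 64k, zero}, {64k, 64k, data},
 * {128k, 64k, zero} }, a query for offset=16k, len=32k returns true
 * because the range lies entirely inside the first zero extent, while
 * offset=48k, len=32k returns false because it overlaps the data
 * extent starting at 64k.  The cursor '*i' is left pointing at the
 * extent covering 'offset', so the next (monotonically increasing)
 * query can resume the scan without restarting from the beginning.
 */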
/* There are 'threads' worker threads, each copying work ranges from
 * src to dst until there are no more work ranges.
 */
static void *
worker_thread (void *wp)
{
  struct worker *w = wp;
  uint64_t offset, count;
  extent_list exts = empty_vector;

  while (get_next_offset (&offset, &count)) {
    struct command *command;
    size_t extent_index;
    bool is_zeroing = false;
    uint64_t zeroing_start = 0; /* initialized to avoid bogus GCC warning */

    assert (0 < count && count <= THREAD_WORK_SIZE);
    if (extents)
      src->ops->get_extents (src, w->index, offset, count, &exts);
    else
      default_get_extents (src, w->index, offset, count, &exts);

    extent_index = 0; // index into extents array used to optimize only_zeroes
    while (count) {
      const size_t len = MIN (count, request_size);

      if (only_zeroes (exts, &extent_index, offset, len)) {
        /* The source is zero so we can proceed directly to skipping,
         * fast zeroing, or writing zeroes at the destination.  Defer
         * zeroing so we can send it as a single large command.
         */
        if (!is_zeroing) {
          is_zeroing = true;
          zeroing_start = offset;
        }
      }
      else {
        /* If we were in the middle of deferred zeroing, do it now. */
        if (is_zeroing) {
          /* Note that offset-zeroing_start can never exceed
           * THREAD_WORK_SIZE, so there is no danger of overflowing
           * size_t.
           */
          command = create_command (zeroing_start, offset-zeroing_start,
                                    true, w);
          fill_dst_range_with_zeroes (command);
          is_zeroing = false;
        }

        /* Issue the asynchronous read command. */
        command = create_command (offset, len, false, w);

        wait_for_request_slots (w);

        /* NOTE: Must increase the queue size after waiting. */
        increase_queue_size (w, len);

        /* Begin the asynch read operation. */
        src->ops->asynch_read (src, command,
                               (nbd_completion_callback) {
                                 .callback = finished_read,
                                 .user_data = command,
                               });
      }

      offset += len;
      count -= len;
    } /* while (count) */

    /* If we were in the middle of deferred zeroing, do it now. */
    if (is_zeroing) {
      /* Note that offset-zeroing_start can never exceed
       * THREAD_WORK_SIZE, so there is no danger of overflowing
       * size_t.
       */
      command = create_command (zeroing_start, offset - zeroing_start,
                                true, w);
      fill_dst_range_with_zeroes (command);
      //is_zeroing = false;
    }
  }

  /* Wait for in flight NBD requests to finish. */
  while (in_flight (w->index) > 0)
    poll_both_ends (w->index);

  free (exts.ptr);
  return NULL;
}
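/* Illustration of the deferred zeroing above (hypothetical sizes,
 * request_size = 256k): if consecutive 256k chunks at offsets 0, 256k
 * and 512k are all reported as zeroes by only_zeroes(), no command is
 * issued until the first data chunk (or the end of the work range) is
 * reached; then a single create_command (0, 768k, true, w) followed by
 * fill_dst_range_with_zeroes() covers the whole run.  Coalescing like
 * this keeps the number of zero requests sent to the destination small
 * compared to issuing one per request_size chunk.
 */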
/* If the number of requests or queued bytes in flight exceed limits,
 * then poll until enough requests finish.  This enforces the user
 * --requests and --queue-size options.
 *
 * NB: Unfortunately it's not possible to call this from a callback,
 * since it will deadlock trying to grab the libnbd handle lock.  This
 * means that although the worker thread calls this and enforces the
 * limit, when we split up requests into subrequests (eg. doing
 * sparseness detection) we will probably exceed the user request
 * limit.
 */
static void
wait_for_request_slots (struct worker *worker)
{
  while (in_flight (worker->index) >= max_requests ||
         worker->queue_size >= queue_size)
    poll_both_ends (worker->index);
}
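/* Example (illustrative values): with --requests 64 and --queue-size
 * 16M, a worker that already has 64 commands in flight, or has 16M of
 * read data buffered but not yet written, spins in poll_both_ends()
 * here until completions bring it back under both limits before
 * issuing its next read.
 */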
/* Count the number of asynchronous commands in flight. */
static unsigned
in_flight (size_t index)
{
  return src->ops->in_flight (src, index) + dst->ops->in_flight (dst, index);
}
/* Poll (optional) NBD src and NBD dst, moving the state machine(s)
 * along.  This is a lightly modified nbd_poll.
 */
static void
poll_both_ends (size_t index)
{
  struct pollfd fds[2];
  int r, direction;

  memset (fds, 0, sizeof fds);

  /* Note: if polling is not supported, this function will
   * set fd == -1 which poll ignores.
   */
  src->ops->get_polling_fd (src, index, &fds[0].fd, &direction);
  if (fds[0].fd >= 0) {
    switch (direction) {
    case LIBNBD_AIO_DIRECTION_READ:
      fds[0].events = POLLIN;
      break;
    case LIBNBD_AIO_DIRECTION_WRITE:
      fds[0].events = POLLOUT;
      break;
    case LIBNBD_AIO_DIRECTION_BOTH:
      fds[0].events = POLLIN|POLLOUT;
      break;
    }
  }

  dst->ops->get_polling_fd (dst, index, &fds[1].fd, &direction);
  if (fds[1].fd >= 0) {
    switch (direction) {
    case LIBNBD_AIO_DIRECTION_READ:
      fds[1].events = POLLIN;
      break;
    case LIBNBD_AIO_DIRECTION_WRITE:
      fds[1].events = POLLOUT;
      break;
    case LIBNBD_AIO_DIRECTION_BOTH:
      fds[1].events = POLLIN|POLLOUT;
      break;
    }
  }

  r = poll (fds, 2, -1);
  if (r == -1) {
    perror ("poll");
    exit (EXIT_FAILURE);
  }

  /* Notify the src and dst state machines of readiness. */
  if (fds[0].fd >= 0) {
    if ((fds[0].revents & (POLLIN | POLLHUP)) != 0)
      src->ops->asynch_notify_read (src, index);
    else if ((fds[0].revents & POLLOUT) != 0)
      src->ops->asynch_notify_write (src, index);
    else if ((fds[0].revents & (POLLERR | POLLNVAL)) != 0) {
      errno = ENOTCONN;
      perror (src->name);
      exit (EXIT_FAILURE);
    }
  }

  if (fds[1].fd >= 0) {
    if ((fds[1].revents & (POLLIN | POLLHUP)) != 0)
      dst->ops->asynch_notify_read (dst, index);
    else if ((fds[1].revents & POLLOUT) != 0)
      dst->ops->asynch_notify_write (dst, index);
    else if ((fds[1].revents & (POLLERR | POLLNVAL)) != 0) {
      errno = ENOTCONN;
      perror (dst->name);
      exit (EXIT_FAILURE);
    }
  }
}
/* Create a new buffer. */
static struct buffer *
create_buffer (size_t len)
{
  struct buffer *buffer;

  buffer = calloc (1, sizeof *buffer);
  if (buffer == NULL) {
    perror ("calloc");
    exit (EXIT_FAILURE);
  }

  buffer->data = malloc (len);
  if (buffer->data == NULL) {
    perror ("malloc");
    exit (EXIT_FAILURE);
  }

  buffer->refs = 1;

  return buffer;
}
/* Create a new command for read or zero. */
static struct command *
create_command (uint64_t offset, size_t len, bool zero, struct worker *worker)
{
  struct command *command;

  command = calloc (1, sizeof *command);
  if (command == NULL) {
    perror ("calloc");
    exit (EXIT_FAILURE);
  }

  command->offset = offset;
  command->slice.len = len;

  /* Zero commands carry no payload, so only read commands get a buffer. */
  if (!zero)
    command->slice.buffer = create_buffer (len);

  command->worker = worker;

  return command;
}
/* Create a sub-command of an existing command.  This creates a slice
 * referencing the buffer of the existing command without copying.
 */
static struct command *
create_subcommand (struct command *command, uint64_t offset, size_t len,
                   bool zero)
{
  const uint64_t end = command->offset + command->slice.len;
  struct command *newcommand;

  assert (command->offset <= offset && offset < end);
  assert (offset + len <= end);

  newcommand = calloc (1, sizeof *newcommand);
  if (newcommand == NULL) {
    perror ("calloc");
    exit (EXIT_FAILURE);
  }
  newcommand->offset = offset;
  newcommand->slice.len = len;
  if (!zero) {
    newcommand->slice.buffer = command->slice.buffer;
    newcommand->slice.buffer->refs++;
    newcommand->slice.base = offset - command->offset;
  }
  newcommand->worker = command->worker;

  return newcommand;
}
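/* Example (illustrative): given a 256k read command at offset 1M,
 * create_subcommand (command, 1M + 64k, 128k, false) yields a command
 * whose slice points into the middle of the parent's buffer
 * (slice.base = 64k) and bumps buffer->refs, so the data is written
 * without being copied and the buffer is only freed in free_command()
 * once every subcommand referencing it has completed.
 */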
/* Callback called when src has finished one read command.  This
 * initiates a write.
 */
static int
finished_read (void *vp, int *error)
{
  struct command *command = vp;

  if (*error) {
    fprintf (stderr, "%s: read at offset %" PRId64 " failed: %s\n",
             prog, command->offset, strerror (*error));
    exit (EXIT_FAILURE);
  }

  if (allocated || sparse_size == 0) {
    /* If sparseness detection (see below) is turned off then we write
     * the whole command.
     */
    dst->ops->asynch_write (dst, command,
                            (nbd_completion_callback) {
                              .callback = finished_command,
                              .user_data = command,
                            });
  }
  else {                        /* Sparseness detection. */
    const uint64_t start = command->offset;
    const uint64_t end = start + command->slice.len;
    uint64_t last_offset = start;
    bool last_is_zero = false;
    uint64_t i;
    struct command *newcommand;

    /* Iterate over whole blocks in the command, starting on a block
     * boundary.
     */
    for (i = MIN (ROUND_UP (start, sparse_size), end);
         i + sparse_size <= end;
         i += sparse_size) {
      if (is_zero (slice_ptr (command->slice) + i-start, sparse_size)) {
        /* It's a zero range.  If the last was a zero too then we do
         * nothing here which coalesces.  Otherwise write the last data
         * and start a new zero range.
         */
        if (!last_is_zero) {
          /* Write the last data (if any). */
          if (i - last_offset > 0) {
            newcommand = create_subcommand (command,
                                            last_offset, i - last_offset,
                                            false);
            dst->ops->asynch_write (dst, newcommand,
                                    (nbd_completion_callback) {
                                      .callback = finished_command,
                                      .user_data = newcommand,
                                    });
          }
          /* Start the new zero range. */
          last_offset = i;
          last_is_zero = true;
        }
      }
      else {
        /* It's data.  If the last was data too, do nothing =>
         * coalesce.  Otherwise write the last zero range and start a
         * new data range.
         */
        if (last_is_zero) {
          /* Write the last zero range (if any). */
          if (i - last_offset > 0) {
            newcommand = create_subcommand (command,
                                            last_offset, i - last_offset,
                                            true);
            decrease_queue_size (command->worker, newcommand->slice.len);
            fill_dst_range_with_zeroes (newcommand);
          }
          /* Start the new data. */
          last_offset = i;
          last_is_zero = false;
        }
      }
    } /* for i */

    /* Write the last_offset up to i. */
    if (i - last_offset > 0) {
      if (!last_is_zero) {
        newcommand = create_subcommand (command,
                                        last_offset, i - last_offset,
                                        false);
        dst->ops->asynch_write (dst, newcommand,
                                (nbd_completion_callback) {
                                  .callback = finished_command,
                                  .user_data = newcommand,
                                });
      }
      else {
        newcommand = create_subcommand (command,
                                        last_offset, i - last_offset,
                                        true);
        decrease_queue_size (command->worker, newcommand->slice.len);
        fill_dst_range_with_zeroes (newcommand);
      }
    }

    /* There may be an unaligned tail, so write that. */
    if (end - i > 0) {
      newcommand = create_subcommand (command, i, end - i, false);
      dst->ops->asynch_write (dst, newcommand,
                              (nbd_completion_callback) {
                                .callback = finished_command,
                                .user_data = newcommand,
                              });
    }

    /* Free the original command since it has been split into
     * subcommands and the original is no longer needed.
     */
    free_command (command);
  }

  return 1; /* auto-retires the command */
}
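/* Example of the sparseness split above (hypothetical, sparse_size =
 * 32k): a 128k read at offset 0 whose bytes are data in [0..32k),
 * zeroes in [32k..96k) and data in [96k..128k) is retired as three
 * subcommands: an asynchronous write of [0..32k), a zero of
 * [32k..96k) via fill_dst_range_with_zeroes(), and a final write of
 * [96k..128k).  Runs of zero blocks are coalesced, so the destination
 * sees one zero request per contiguous zero region rather than one
 * per sparse_size block.
 */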
/* Fill a range in dst with zeroes.  This is called from the copying
 * loop when we see a zero range in the source.  Depending on the
 * command line flags this could mean:
 *
 * --destination-is-zero:
 *                 do nothing
 *
 * --allocated:    write zeroes allocating space using an efficient
 *                 zeroing command or writing a command of zeroes
 *
 * (neither flag)  write zeroes punching a hole using an efficient
 *                 zeroing command or fallback to writing a command
 *                 of zeroes
 *
 * This takes over ownership of the command and frees it eventually.
 */
static void
fill_dst_range_with_zeroes (struct command *command)
{
  char *data;
  size_t data_size;

  if (destination_is_zero)
    goto free_and_return;

  /* Try efficient zeroing. */
  if (dst->ops->asynch_zero (dst, command,
                             (nbd_completion_callback) {
                               .callback = finished_command,
                               .user_data = command,
                             },
                             allocated))
    return;

  /* Fall back to loop writing zeroes.  This is going to be slow
   * anyway, so do it synchronously. XXX
   */
  data_size = MIN (request_size, command->slice.len);
  data = calloc (1, data_size);
  if (data == NULL) {
    perror ("calloc");
    exit (EXIT_FAILURE);
  }
  while (command->slice.len > 0) {
    size_t len = command->slice.len;

    if (len > data_size)
      len = data_size;

    dst->ops->synch_write (dst, data, len, command->offset);
    command->slice.len -= len;
    command->offset += len;
  }
  free (data);

 free_and_return:
  free_command (command);
}
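/* Illustration of the fallback path above (hypothetical sizes): if the
 * destination cannot zero efficiently, a 1G zero command with
 * request_size = 256k is satisfied by writing one 256k zero-filled
 * buffer synchronously 4096 times, advancing command->offset each
 * iteration.  The efficient asynch_zero path avoids transferring any
 * of that data over the wire.
 */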
/* Callback called when dst has finished one write or zero command. */
static int
finished_command (void *vp, int *error)
{
  struct command *command = vp;

  if (*error) {
    fprintf (stderr, "%s: write at offset %" PRId64 " failed: %s\n",
             prog, command->offset, strerror (*error));
    exit (EXIT_FAILURE);
  }

  if (command->slice.buffer)
    decrease_queue_size (command->worker, command->slice.len);

  free_command (command);

  return 1; /* auto-retires the command */
}
/* Drop the command's reference to the shared buffer (if any) and free
 * the command itself.
 */
static void
free_command (struct command *command)
{
  struct buffer *buffer = command->slice.buffer;

  if (buffer != NULL) {
    if (--buffer->refs == 0) {
      free (buffer->data);
      free (buffer);
    }
  }

  free (command);
}