Revert "ci: skip "lib/test-fork-safe-execvpe.sh" on Alpine Linux"
[libnbd.git] / copy / multi-thread-copying.c
blob00eec511d015420f75856a9fee2605cd079acdfa
/* NBD client library in userspace.
 * Copyright Red Hat
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 */

#include <config.h>

#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <stdbool.h>
#include <string.h>
#include <fcntl.h>
#include <unistd.h>
#include <poll.h>
#include <errno.h>
#include <assert.h>
#include <sys/stat.h>
#include <inttypes.h>

#include <pthread.h>

#include <libnbd.h>

#include "iszero.h"
#include "minmax.h"
#include "rounding.h"

#include "nbdcopy.h"

/* Threads pick up work in units of THREAD_WORK_SIZE starting at the
 * next_offset. The lock protects next_offset.
 */
static uint64_t next_offset = 0;
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;

static bool
get_next_offset (uint64_t *offset, uint64_t *count)
{
  bool r = false;               /* returning false means no more work */

  pthread_mutex_lock (&lock);
  if (next_offset < src->size) {
    *offset = next_offset;

    /* Work out how large this range is. The last range may be
     * smaller than THREAD_WORK_SIZE.
     */
    *count = src->size - *offset;
    if (*count > THREAD_WORK_SIZE)
      *count = THREAD_WORK_SIZE;

    next_offset += THREAD_WORK_SIZE;
    r = true;                   /* there is more work */

    /* XXX This means the progress bar "runs fast" since it shows the
     * progress issuing commands, not necessarily progress performing
     * the commands. We might move this into a callback, but those
     * are called from threads and not necessarily in monotonic order
     * so the progress bar would move erratically.
     */
    progress_bar (*offset, src->size);
  }
  pthread_mutex_unlock (&lock);
  return r;
}

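/* For example, with a THREAD_WORK_SIZE of 128M and a 300M source,
 * successive calls hand out [0, 128M), [128M, 256M) and finally
 * [256M, 300M), after which get_next_offset returns false. (Sizes
 * are illustrative; the real constant is defined elsewhere in
 * nbdcopy.)
 */
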
static void *worker_thread (void *wp);

void
multi_thread_copying (void)
{
  struct worker *workers;
  size_t i;
  int err;

  /* Some invariants that should be true if the main program called us
   * correctly.
   */
  assert (threads > 0);
  assert (threads == connections);
  if (src->ops == &nbd_ops)
    assert (src->u.nbd.handles.size == connections);
  if (dst->ops == &nbd_ops)
    assert (dst->u.nbd.handles.size == connections);
  assert (src->size != -1);

  workers = calloc (threads, sizeof *workers);
  if (workers == NULL) {
    perror ("calloc");
    exit (EXIT_FAILURE);
  }

  /* Start the worker threads. */
  for (i = 0; i < threads; ++i) {
    workers[i].index = i;
    err = pthread_create (&workers[i].thread, NULL, worker_thread,
                          &workers[i]);
    if (err != 0) {
      errno = err;
      perror ("pthread_create");
      exit (EXIT_FAILURE);
    }
  }

  /* Wait until all worker threads exit. */
  for (i = 0; i < threads; ++i) {
    err = pthread_join (workers[i].thread, NULL);
    if (err != 0) {
      errno = err;
      perror ("pthread_join");
      exit (EXIT_FAILURE);
    }
  }

  free (workers);
}

static void wait_for_request_slots (struct worker *worker);
static unsigned in_flight (size_t index);
static void poll_both_ends (size_t index);
static int finished_read (void *vp, int *error);
static int finished_command (void *vp, int *error);
static void free_command (struct command *command);
static void fill_dst_range_with_zeroes (struct command *command);
static struct command *create_command (uint64_t offset, size_t len, bool zero,
                                       struct worker *worker);

/* Tracking worker queue size.
 *
 * The queue size is increased when starting a read command.
 *
 * The queue size is decreased when a read command is converted to a
 * zero subcommand in finished_read(), or when a write command
 * completes in finished_command().
 *
 * Zero commands are not considered in the queue size since they have
 * no payload.
 */
static inline void
increase_queue_size (struct worker *worker, size_t len)
{
  assert (worker->queue_size < queue_size);
  worker->queue_size += len;
}

static inline void
decrease_queue_size (struct worker *worker, size_t len)
{
  assert (worker->queue_size >= len);
  worker->queue_size -= len;
}

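/* Note: worker->queue_size counts bytes of read buffers in flight for
 * this worker. Callers call wait_for_request_slots() before
 * increase_queue_size(), which is what keeps the assertion above true.
 */
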
/* Using the extents map 'exts', check if the region
 * [offset..offset+len-1] intersects only with zero extents.
 *
 * The invariant for '*i' is that it always refers to an extent which
 * starts at or before the current offset.
 */
static bool
only_zeroes (const extent_list exts, size_t *i,
             uint64_t offset, unsigned len)
{
  size_t j;

  /* Invariant. */
  assert (*i < exts.len);
  assert (exts.ptr[*i].offset <= offset);

  /* Update the invariant. Search for the last possible extent in the
   * list which is <= offset.
   */
  for (j = *i + 1; j < exts.len; ++j) {
    if (exts.ptr[j].offset <= offset)
      *i = j;
    else
      break;
  }

  /* Check invariant again. */
  assert (*i < exts.len);
  assert (exts.ptr[*i].offset <= offset);

  /* If *i is not the last extent, then the next extent starts
   * strictly beyond our current offset.
   */
  assert (*i == exts.len - 1 || exts.ptr[*i + 1].offset > offset);

  /* Search forward, look for any non-zero extents overlapping the region. */
  for (j = *i; j < exts.len; ++j) {
    uint64_t start, end;

    /* [start..end-1] is the current extent. */
    start = exts.ptr[j].offset;
    end = exts.ptr[j].offset + exts.ptr[j].length;

    assert (end > offset);

    if (start >= offset + len)
      break;

    /* Non-zero extent covering this region => test failed. */
    if (!exts.ptr[j].zero)
      return false;
  }

  return true;
}

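/* For example, given exts = { {0, 64K, zero}, {64K, 64K, data} }
 * (illustrative values), checking [16K, 48K) returns true while
 * checking [48K, 80K) returns false, because the second region
 * overlaps the data extent. Since '*i' only moves forward, callers
 * must query offsets in increasing order.
 */
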
/* There are 'threads' worker threads, each copying work ranges from
 * src to dst until there are no more work ranges.
 */
static void *
worker_thread (void *wp)
{
  struct worker *w = wp;
  uint64_t offset, count;
  extent_list exts = empty_vector;

  while (get_next_offset (&offset, &count)) {
    struct command *command;
    size_t extent_index;
    bool is_zeroing = false;
    uint64_t zeroing_start = 0; /* initialized to avoid bogus GCC warning */

    assert (0 < count && count <= THREAD_WORK_SIZE);
    if (extents)
      src->ops->get_extents (src, w->index, offset, count, &exts);
    else
      default_get_extents (src, w->index, offset, count, &exts);

    extent_index = 0; // index into extents array used to optimize only_zeroes
    while (count) {
      const size_t len = MIN (count, request_size);

      if (only_zeroes (exts, &extent_index, offset, len)) {
        /* The source is zero so we can proceed directly to skipping,
         * fast zeroing, or writing zeroes at the destination. Defer
         * zeroing so we can send it as a single large command.
         */
        if (!is_zeroing) {
          is_zeroing = true;
          zeroing_start = offset;
        }
      }
      else /* data */ {
        /* If we were in the middle of deferred zeroing, do it now. */
        if (is_zeroing) {
          /* Note that offset-zeroing_start can never exceed
           * THREAD_WORK_SIZE, so there is no danger of overflowing
           * size_t.
           */
          command = create_command (zeroing_start, offset-zeroing_start,
                                    true, w);
          fill_dst_range_with_zeroes (command);
          is_zeroing = false;
        }

        /* Issue the asynchronous read command. */
        command = create_command (offset, len, false, w);

        wait_for_request_slots (w);

        /* NOTE: Must increase the queue size after waiting. */
        increase_queue_size (w, len);

        /* Begin the asynch read operation. */
        src->ops->asynch_read (src, command,
                               (nbd_completion_callback) {
                                 .callback = finished_read,
                                 .user_data = command,
                               });
      }

      offset += len;
      count -= len;
    } /* while (count) */

    /* If we were in the middle of deferred zeroing, do it now. */
    if (is_zeroing) {
      /* Note that offset-zeroing_start can never exceed
       * THREAD_WORK_SIZE, so there is no danger of overflowing
       * size_t.
       */
      command = create_command (zeroing_start, offset - zeroing_start,
                                true, w);
      fill_dst_range_with_zeroes (command);
      //is_zeroing = false;
    }
  }

  /* Wait for in flight NBD requests to finish. */
  while (in_flight (w->index) > 0)
    poll_both_ends (w->index);

  free (exts.ptr);
  return NULL;
}

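/* Illustration of the deferred zeroing above: with a request_size of
 * 256K, a work range whose first requests test zero, zero, data
 * issues one 512K zero command followed by a single 256K read,
 * rather than two separate 256K zero commands.
 */
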
/* If the number of requests or queued bytes in flight exceed limits,
 * then poll until enough requests finish. This enforces the user
 * --requests and --queue-size options.
 *
 * NB: Unfortunately it's not possible to call this from a callback,
 * since it will deadlock trying to grab the libnbd handle lock. This
 * means that although the worker thread calls this and enforces the
 * limit, when we split up requests into subrequests (eg. doing
 * sparseness detection) we will probably exceed the user request
 * limit. XXX
 */
static void
wait_for_request_slots (struct worker *worker)
{
  while (in_flight (worker->index) >= max_requests ||
         worker->queue_size >= queue_size)
    poll_both_ends (worker->index);
}

/* Count the number of asynchronous commands in flight. */
static unsigned
in_flight (size_t index)
{
  return src->ops->in_flight (src, index) + dst->ops->in_flight (dst, index);
}

/* Poll (optional) NBD src and NBD dst, moving the state machine(s)
 * along. This is a lightly modified nbd_poll.
 */
static void
poll_both_ends (size_t index)
{
  struct pollfd fds[2];
  int r, direction;

  memset (fds, 0, sizeof fds);

  /* Note: if polling is not supported, this function will
   * set fd == -1 which poll ignores.
   */
  src->ops->get_polling_fd (src, index, &fds[0].fd, &direction);
  if (fds[0].fd >= 0) {
    switch (direction) {
    case LIBNBD_AIO_DIRECTION_READ:
      fds[0].events = POLLIN;
      break;
    case LIBNBD_AIO_DIRECTION_WRITE:
      fds[0].events = POLLOUT;
      break;
    case LIBNBD_AIO_DIRECTION_BOTH:
      fds[0].events = POLLIN|POLLOUT;
      break;
    }
  }

  dst->ops->get_polling_fd (dst, index, &fds[1].fd, &direction);
  if (fds[1].fd >= 0) {
    switch (direction) {
    case LIBNBD_AIO_DIRECTION_READ:
      fds[1].events = POLLIN;
      break;
    case LIBNBD_AIO_DIRECTION_WRITE:
      fds[1].events = POLLOUT;
      break;
    case LIBNBD_AIO_DIRECTION_BOTH:
      fds[1].events = POLLIN|POLLOUT;
      break;
    }
  }

  r = poll (fds, 2, -1);
  if (r == -1) {
    perror ("poll");
    exit (EXIT_FAILURE);
  }
  if (r == 0)
    return;

  if (fds[0].fd >= 0) {
    if ((fds[0].revents & (POLLIN | POLLHUP)) != 0)
      src->ops->asynch_notify_read (src, index);
    else if ((fds[0].revents & POLLOUT) != 0)
      src->ops->asynch_notify_write (src, index);
    else if ((fds[0].revents & (POLLERR | POLLNVAL)) != 0) {
      errno = ENOTCONN;
      perror (src->name);
      exit (EXIT_FAILURE);
    }
  }

  if (fds[1].fd >= 0) {
    if ((fds[1].revents & (POLLIN | POLLHUP)) != 0)
      dst->ops->asynch_notify_read (dst, index);
    else if ((fds[1].revents & POLLOUT) != 0)
      dst->ops->asynch_notify_write (dst, index);
    else if ((fds[1].revents & (POLLERR | POLLNVAL)) != 0) {
      errno = ENOTCONN;
      perror (dst->name);
      exit (EXIT_FAILURE);
    }
  }
}

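/* Note: POLLHUP is treated like POLLIN above, so a hung-up socket is
 * handled as readable and the state machine can observe the EOF and
 * report an error, rather than this loop spinning on POLLHUP.
 */
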
/* Create a new buffer. */
static struct buffer *
create_buffer (size_t len)
{
  struct buffer *buffer;

  buffer = calloc (1, sizeof *buffer);
  if (buffer == NULL) {
    perror ("calloc");
    exit (EXIT_FAILURE);
  }

  buffer->data = malloc (len);
  if (buffer->data == NULL) {
    perror ("malloc");
    exit (EXIT_FAILURE);
  }

  buffer->refs = 1;

  return buffer;
}

/* Create a new command for read or zero. */
static struct command *
create_command (uint64_t offset, size_t len, bool zero, struct worker *worker)
{
  struct command *command;

  command = calloc (1, sizeof *command);
  if (command == NULL) {
    perror ("calloc");
    exit (EXIT_FAILURE);
  }

  command->offset = offset;
  command->slice.len = len;

  if (!zero)
    command->slice.buffer = create_buffer (len);

  command->worker = worker;

  return command;
}

/* Create a sub-command of an existing command. This creates a slice
 * referencing the buffer of the existing command without copying.
 */
static struct command *
create_subcommand (struct command *command, uint64_t offset, size_t len,
                   bool zero)
{
  const uint64_t end = command->offset + command->slice.len;
  struct command *newcommand;

  assert (command->offset <= offset && offset < end);
  assert (offset + len <= end);

  newcommand = calloc (1, sizeof *newcommand);
  if (newcommand == NULL) {
    perror ("calloc");
    exit (EXIT_FAILURE);
  }
  newcommand->offset = offset;
  newcommand->slice.len = len;
  if (!zero) {
    newcommand->slice.buffer = command->slice.buffer;
    newcommand->slice.buffer->refs++;
    newcommand->slice.base = offset - command->offset;
  }
  newcommand->worker = command->worker;

  return newcommand;
}

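/* The shared buffer is reference counted: each data subcommand takes
 * a reference and free_command() drops it, so the underlying data
 * stays live until the last subcommand referencing it completes.
 */
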
/* Callback called when src has finished one read command. This
 * initiates a write.
 */
static int
finished_read (void *vp, int *error)
{
  struct command *command = vp;

  if (*error) {
    fprintf (stderr, "%s: read at offset %" PRId64 " failed: %s\n",
             prog, command->offset, strerror (*error));
    exit (EXIT_FAILURE);
  }

  if (allocated || sparse_size == 0) {
    /* If sparseness detection (see below) is turned off then we write
     * the whole command.
     */
    dst->ops->asynch_write (dst, command,
                            (nbd_completion_callback) {
                              .callback = finished_command,
                              .user_data = command,
                            });
  }
  else { /* Sparseness detection. */
    const uint64_t start = command->offset;
    const uint64_t end = start + command->slice.len;
    uint64_t last_offset = start;
    bool last_is_zero = false;
    uint64_t i;
    struct command *newcommand;

    /* Iterate over whole blocks in the command, starting on a block
     * boundary.
     */
    for (i = MIN (ROUND_UP (start, sparse_size), end);
         i + sparse_size <= end;
         i += sparse_size) {
      if (is_zero (slice_ptr (command->slice) + i-start, sparse_size)) {
        /* It's a zero range. If the last was a zero too then we do
         * nothing here which coalesces. Otherwise write the last data
         * and start a new zero range.
         */
        if (!last_is_zero) {
          /* Write the last data (if any). */
          if (i - last_offset > 0) {
            newcommand = create_subcommand (command,
                                            last_offset, i - last_offset,
                                            false);
            dst->ops->asynch_write (dst, newcommand,
                                    (nbd_completion_callback) {
                                      .callback = finished_command,
                                      .user_data = newcommand,
                                    });
          }
          /* Start the new zero range. */
          last_offset = i;
          last_is_zero = true;
        }
      }
      else {
        /* It's data. If the last was data too, do nothing =>
         * coalesce. Otherwise write the last zero range and start a
         * new data.
         */
        if (last_is_zero) {
          /* Write the last zero range (if any). */
          if (i - last_offset > 0) {
            newcommand = create_subcommand (command,
                                            last_offset, i - last_offset,
                                            true);
            decrease_queue_size (command->worker, newcommand->slice.len);
            fill_dst_range_with_zeroes (newcommand);
          }
          /* Start the new data. */
          last_offset = i;
          last_is_zero = false;
        }
      }
    } /* for i */

    /* Write the last_offset up to i. */
    if (i - last_offset > 0) {
      if (!last_is_zero) {
        newcommand = create_subcommand (command,
                                        last_offset, i - last_offset,
                                        false);
        dst->ops->asynch_write (dst, newcommand,
                                (nbd_completion_callback) {
                                  .callback = finished_command,
                                  .user_data = newcommand,
                                });
      }
      else {
        newcommand = create_subcommand (command,
                                        last_offset, i - last_offset,
                                        true);
        decrease_queue_size (command->worker, newcommand->slice.len);
        fill_dst_range_with_zeroes (newcommand);
      }
    }

    /* There may be an unaligned tail, so write that. */
    if (end - i > 0) {
      newcommand = create_subcommand (command, i, end - i, false);
      dst->ops->asynch_write (dst, newcommand,
                              (nbd_completion_callback) {
                                .callback = finished_command,
                                .user_data = newcommand,
                              });
    }

    /* Free the original command since it has been split into
     * subcommands and the original is no longer needed.
     */
    free_command (command);
  }

  return 1; /* auto-retires the command */
}

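/* Worked example of the coalescing above (illustrative): with a
 * sparse_size of 4K, a 16K read whose 4K blocks are data, zero, zero,
 * data becomes three subcommands: write [0,4K), zero [4K,12K) and
 * write [12K,16K), instead of four block-sized operations.
 */
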
/* Fill a range in dst with zeroes. This is called from the copying
 * loop when we see a zero range in the source. Depending on the
 * command line flags this could mean:
 *
 * --destination-is-zero:
 *                 do nothing
 *
 * --allocated:    write zeroes allocating space using an efficient
 *                 zeroing command or writing a command of zeroes
 *
 * (neither flag)  write zeroes punching a hole using an efficient
 *                 zeroing command or fallback to writing a command
 *                 of zeroes.
 *
 * This takes over ownership of the command and frees it eventually.
 */
static void
fill_dst_range_with_zeroes (struct command *command)
{
  char *data;
  size_t data_size;

  if (destination_is_zero)
    goto free_and_return;

  /* Try efficient zeroing. */
  if (dst->ops->asynch_zero (dst, command,
                             (nbd_completion_callback) {
                               .callback = finished_command,
                               .user_data = command,
                             },
                             allocated))
    return;

  /* Fall back to loop writing zeroes. This is going to be slow
   * anyway, so do it synchronously. XXX
   */
  data_size = MIN (request_size, command->slice.len);
  data = calloc (1, data_size);
  if (!data) {
    perror ("calloc");
    exit (EXIT_FAILURE);
  }
  while (command->slice.len > 0) {
    size_t len = command->slice.len;

    if (len > data_size)
      len = data_size;

    dst->ops->synch_write (dst, data, len, command->offset);
    command->slice.len -= len;
    command->offset += len;
  }
  free (data);

 free_and_return:
  free_command (command);
}

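/* Note: the synchronous fallback above reuses a single zeroed buffer
 * of at most request_size bytes for every chunk, so even a very large
 * zero range costs only one allocation.
 */
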
static int
finished_command (void *vp, int *error)
{
  struct command *command = vp;

  if (*error) {
    fprintf (stderr, "%s: write at offset %" PRId64 " failed: %s\n",
             prog, command->offset, strerror (*error));
    exit (EXIT_FAILURE);
  }

  if (command->slice.buffer)
    decrease_queue_size (command->worker, command->slice.len);

  free_command (command);

  return 1; /* auto-retires the command */
}

static void
free_command (struct command *command)
{
  if (command == NULL)
    return;

  struct buffer *buffer = command->slice.buffer;

  if (buffer != NULL) {
    if (--buffer->refs == 0) {
      free (buffer->data);
      free (buffer);
    }
  }

  free (command);
}