1 The queue code does not gain us much speed, so I removed it entirely
2 to make things simpler.
4 Also, the previous code produced segmentation faults in qemu_co_mutex_lock().
6 Index: new/vma-writer.c
7 ===================================================================
8 --- new.orig/vma-writer.c 2014-11-20 09:08:33.000000000 +0100
9 +++ new/vma-writer.c 2014-11-20 09:10:14.000000000 +0100
11 do { if (DEBUG_VMA) { printf("vma: " fmt, ## __VA_ARGS__); } } while (0)
13 #define WRITE_BUFFERS 5
15 -typedef struct VmaAIOCB VmaAIOCB;
17 - unsigned char buffer[VMA_MAX_EXTENT_SIZE];
22 +#define HEADER_CLUSTERS 8
23 +#define HEADERBUF_SIZE (VMA_CLUSTER_SIZE*HEADER_CLUSTERS)
30 /* we always write extents */
31 - unsigned char outbuf[VMA_MAX_EXTENT_SIZE];
32 + unsigned char *outbuf;
33 int outbuf_pos; /* in bytes */
34 int outbuf_count; /* in VMA_BLOCKS */
35 uint64_t outbuf_block_info[VMA_BLOCKS_PER_EXTENT];
37 - VmaAIOCB *aiocbs[WRITE_BUFFERS];
39 + unsigned char *headerbuf;
42 - CoMutex writer_lock;
49 static ssize_t coroutine_fn
50 -vma_co_write(VmaWriter *vmaw, const void *buf, size_t bytes)
51 +vma_queue_write(VmaWriter *vmaw, const void *buf, size_t bytes)
55 + DPRINTF("vma_queue_write enter %zd\n", bytes);
57 - /* atomic writes (we cannot interleave writes) */
58 - qemu_co_mutex_lock(&vmaw->writer_lock);
61 + assert(bytes <= VMA_MAX_EXTENT_SIZE);
63 - DPRINTF("vma_co_write enter %zd\n", bytes);
67 assert(vmaw->co_writer == NULL);
69 vmaw->co_writer = qemu_coroutine_self();
71 - aio_set_fd_handler(qemu_get_aio_context(), vmaw->fd, NULL, vma_co_continue_write, vmaw);
73 - DPRINTF("vma_co_write wait until writable\n");
74 - qemu_coroutine_yield();
75 - DPRINTF("vma_co_write starting %zd\n", bytes);
77 while (done < bytes) {
78 + aio_set_fd_handler(qemu_get_aio_context(), vmaw->fd, NULL, vma_co_continue_write, vmaw);
79 + qemu_coroutine_yield();
80 + aio_set_fd_handler(qemu_get_aio_context(), vmaw->fd, NULL, NULL, NULL);
81 + if (vmaw->status < 0) {
82 + DPRINTF("vma_queue_write detected canceled backup\n");
86 ret = write(vmaw->fd, buf + done, bytes - done);
89 - DPRINTF("vma_co_write written %zd %zd\n", done, ret);
90 + DPRINTF("vma_queue_write written %zd %zd\n", done, ret);
92 if (errno == EAGAIN || errno == EWOULDBLOCK) {
93 - DPRINTF("vma_co_write yield %zd\n", done);
94 - qemu_coroutine_yield();
95 - DPRINTF("vma_co_write restart %zd\n", done);
97 - vma_writer_set_error(vmaw, "vma_co_write write error - %s",
100 + vma_writer_set_error(vmaw, "vma_queue_write: write error - %s",
102 done = -1; /* always return failure for partial writes */
104 @@ -264,102 +257,9 @@
108 - aio_set_fd_handler(qemu_get_aio_context(), vmaw->fd, NULL, NULL, NULL);
110 vmaw->co_writer = NULL;
112 - qemu_co_mutex_unlock(&vmaw->writer_lock);
114 - DPRINTF("vma_co_write leave %zd\n", done);
118 -static void coroutine_fn vma_co_writer_task(void *opaque)
120 - VmaAIOCB *cb = opaque;
122 - DPRINTF("vma_co_writer_task start\n");
124 - int64_t done = vma_co_write(cb->vmaw, cb->buffer, cb->bytes);
125 - DPRINTF("vma_co_writer_task write done %zd\n", done);
127 - if (done != cb->bytes) {
128 - DPRINTF("vma_co_writer_task failed write %zd %zd", cb->bytes, done);
129 - vma_writer_set_error(cb->vmaw, "vma_co_writer_task failed write %zd",
135 - qemu_co_queue_next(&cb->vmaw->wqueue);
137 - DPRINTF("vma_co_writer_task end\n");
140 -static void coroutine_fn vma_queue_flush(VmaWriter *vmaw)
142 - DPRINTF("vma_queue_flush enter\n");
148 - VmaAIOCB *cb = NULL;
149 - for (i = 0; i < WRITE_BUFFERS; i++) {
150 - if (vmaw->aiocbs[i]->bytes) {
151 - cb = vmaw->aiocbs[i];
152 - DPRINTF("FOUND USED AIO BUFFER %d %zd\n", i,
153 - vmaw->aiocbs[i]->bytes);
160 - qemu_co_queue_wait(&vmaw->wqueue);
163 - DPRINTF("vma_queue_flush leave\n");
167 - * NOTE: pipe buffer size in only 4096 bytes on linux (see 'ulimit -a')
168 - * So we need to create a coroutione to allow 'parallel' execution.
170 -static ssize_t coroutine_fn
171 -vma_queue_write(VmaWriter *vmaw, const void *buf, size_t bytes)
173 - DPRINTF("vma_queue_write enter %zd\n", bytes);
177 - assert(bytes <= VMA_MAX_EXTENT_SIZE);
179 - VmaAIOCB *cb = NULL;
182 - for (i = 0; i < WRITE_BUFFERS; i++) {
183 - if (!vmaw->aiocbs[i]->bytes) {
184 - cb = vmaw->aiocbs[i];
189 - qemu_co_queue_wait(&vmaw->wqueue);
193 - memcpy(cb->buffer, buf, bytes);
197 - DPRINTF("vma_queue_write start %zd\n", bytes);
198 - cb->co = qemu_coroutine_create(vma_co_writer_task);
199 - qemu_coroutine_enter(cb->co, cb);
201 - DPRINTF("vma_queue_write leave\n");
205 + return (done == bytes) ? bytes : -1;
208 VmaWriter *vma_writer_create(const char *filename, uuid_t uuid, Error **errp)
209 @@ -426,20 +326,16 @@
212 /* we use O_DIRECT, so we need to align IO buffers */
214 - for (i = 0; i < WRITE_BUFFERS; i++) {
215 - vmaw->aiocbs[i] = qemu_memalign(512, sizeof(VmaAIOCB));
216 - memset(vmaw->aiocbs[i], 0, sizeof(VmaAIOCB));
219 + vmaw->outbuf = qemu_memalign(512, VMA_MAX_EXTENT_SIZE);
220 + vmaw->headerbuf = qemu_memalign(512, HEADERBUF_SIZE);
222 vmaw->outbuf_count = 0;
223 vmaw->outbuf_pos = VMA_EXTENT_HEADER_SIZE;
225 vmaw->header_blob_table_pos = 1; /* start at pos 1 */
227 - qemu_co_mutex_init(&vmaw->writer_lock);
228 qemu_co_mutex_init(&vmaw->flush_lock);
229 - qemu_co_queue_init(&vmaw->wqueue);
231 uuid_copy(vmaw->uuid, uuid);
234 static int coroutine_fn vma_write_header(VmaWriter *vmaw)
237 - int header_clusters = 8;
238 - char buf[65536*header_clusters];
239 + unsigned char *buf = vmaw->headerbuf;
240 VmaHeader *head = (VmaHeader *)buf;
247 - memset(buf, 0, sizeof(buf));
248 + memset(buf, 0, HEADERBUF_SIZE);
250 head->magic = VMA_MAGIC;
251 head->version = GUINT32_TO_BE(1); /* v1 */
253 uint32_t header_size = sizeof(VmaHeader) + vmaw->header_blob_table_size;
254 head->header_size = GUINT32_TO_BE(header_size);
256 - if (header_size > sizeof(buf)) {
257 + if (header_size > HEADERBUF_SIZE) {
258 return -1; /* just to be sure */
265 - vma_queue_flush(vmaw);
267 - /* this should not happen - just to be sure */
268 - while (!qemu_co_queue_empty(&vmaw->wqueue)) {
269 - DPRINTF("vma_writer_close wait\n");
270 - co_aio_sleep_ns(qemu_get_aio_context(), QEMU_CLOCK_REALTIME, 1000000);
272 + assert(vmaw->co_writer == NULL);
275 if (pclose(vmaw->cmd) < 0) {
277 g_checksum_free(vmaw->md5csum);
280 - for (i = 0; i < WRITE_BUFFERS; i++) {
281 - free(vmaw->aiocbs[i]);
286 Index: new/blockdev.c
287 ===================================================================
288 --- new.orig/blockdev.c 2014-11-20 09:08:33.000000000 +0100
289 +++ new/blockdev.c 2014-11-20 09:08:49.000000000 +0100
290 @@ -2094,6 +2094,11 @@
291 error_setg(&backup_state.error, "backup cancelled");
294 + if (backup_state.vmaw) {
295 + /* make sure vma writer does not block anymore */
296 + vma_writer_set_error(backup_state.vmaw, "backup cancelled");
299 /* drain all i/o (awake jobs waiting for aio) */
302 @@ -2106,6 +2111,7 @@
304 if (!di->completed) {
305 block_job_cancel_sync(job);
306 + bdrv_drain_all(); /* drain all i/o (awake jobs waiting for aio) */