1 From dccc1b41302d94857bf959ef2f25978f0ef674f1 Mon Sep 17 00:00:00 2001
2 From: Wolfgang Bumiller <w.bumiller@proxmox.com>
3 Date: Wed, 9 Dec 2015 15:40:00 +0100
4 Subject: [PATCH 20/41] backup: vma: remove async queue
8 vma-writer.c | 179 +++++++++++------------------------------------------------
9 2 files changed, 38 insertions(+), 147 deletions(-)
11 diff --git a/blockdev.c b/blockdev.c
12 index 91aaf10..c36888d 100644
15 @@ -3015,6 +3015,11 @@ static void pvebackup_cancel(void *opaque)
16 error_setg(&backup_state.error, "backup cancelled");
19 + if (backup_state.vmaw) {
20 + /* make sure vma writer does not block anymore */
21 + vma_writer_set_error(backup_state.vmaw, "backup cancelled");
24 /* drain all i/o (awake jobs waiting for aio) */
27 @@ -3027,6 +3032,7 @@ static void pvebackup_cancel(void *opaque)
30 block_job_cancel_sync(job);
31 + bdrv_drain_all(); /* drain all i/o (awake jobs waiting for aio) */
35 diff --git a/vma-writer.c b/vma-writer.c
36 index 425c4b9..2558fe1 100644
40 do { if (DEBUG_VMA) { printf("vma: " fmt, ## __VA_ARGS__); } } while (0)
42 #define WRITE_BUFFERS 5
44 -typedef struct VmaAIOCB VmaAIOCB;
46 - unsigned char buffer[VMA_MAX_EXTENT_SIZE];
51 +#define HEADER_CLUSTERS 8
52 +#define HEADERBUF_SIZE (VMA_CLUSTER_SIZE*HEADER_CLUSTERS)
56 @@ -53,16 +47,14 @@ struct VmaWriter {
59 /* we always write extents */
60 - unsigned char outbuf[VMA_MAX_EXTENT_SIZE];
61 + unsigned char *outbuf;
62 int outbuf_pos; /* in bytes */
63 int outbuf_count; /* in VMA_BLOCKS */
64 uint64_t outbuf_block_info[VMA_BLOCKS_PER_EXTENT];
66 - VmaAIOCB *aiocbs[WRITE_BUFFERS];
68 + unsigned char *headerbuf;
71 - CoMutex writer_lock;
75 @@ -223,38 +215,39 @@ static void vma_co_continue_write(void *opaque)
78 static ssize_t coroutine_fn
79 -vma_co_write(VmaWriter *vmaw, const void *buf, size_t bytes)
80 +vma_queue_write(VmaWriter *vmaw, const void *buf, size_t bytes)
84 + DPRINTF("vma_queue_write enter %zd\n", bytes);
86 - /* atomic writes (we cannot interleave writes) */
87 - qemu_co_mutex_lock(&vmaw->writer_lock);
90 + assert(bytes <= VMA_MAX_EXTENT_SIZE);
92 - DPRINTF("vma_co_write enter %zd\n", bytes);
96 assert(vmaw->co_writer == NULL);
98 vmaw->co_writer = qemu_coroutine_self();
100 - aio_set_fd_handler(qemu_get_aio_context(), vmaw->fd, NULL, vma_co_continue_write, vmaw);
102 - DPRINTF("vma_co_write wait until writable\n");
103 - qemu_coroutine_yield();
104 - DPRINTF("vma_co_write starting %zd\n", bytes);
106 while (done < bytes) {
107 + aio_set_fd_handler(qemu_get_aio_context(), vmaw->fd, NULL, vma_co_continue_write, vmaw);
108 + qemu_coroutine_yield();
109 + aio_set_fd_handler(qemu_get_aio_context(), vmaw->fd, NULL, NULL, NULL);
110 + if (vmaw->status < 0) {
111 + DPRINTF("vma_queue_write detected canceled backup\n");
115 ret = write(vmaw->fd, buf + done, bytes - done);
118 - DPRINTF("vma_co_write written %zd %zd\n", done, ret);
119 + DPRINTF("vma_queue_write written %zd %zd\n", done, ret);
120 } else if (ret < 0) {
121 if (errno == EAGAIN || errno == EWOULDBLOCK) {
122 - DPRINTF("vma_co_write yield %zd\n", done);
123 - qemu_coroutine_yield();
124 - DPRINTF("vma_co_write restart %zd\n", done);
126 - vma_writer_set_error(vmaw, "vma_co_write write error - %s",
129 + vma_writer_set_error(vmaw, "vma_queue_write: write error - %s",
131 done = -1; /* always return failure for partial writes */
133 @@ -264,102 +257,9 @@ vma_co_write(VmaWriter *vmaw, const void *buf, size_t bytes)
137 - aio_set_fd_handler(qemu_get_aio_context(), vmaw->fd, NULL, NULL, NULL);
139 vmaw->co_writer = NULL;
141 - qemu_co_mutex_unlock(&vmaw->writer_lock);
143 - DPRINTF("vma_co_write leave %zd\n", done);
147 -static void coroutine_fn vma_co_writer_task(void *opaque)
149 - VmaAIOCB *cb = opaque;
151 - DPRINTF("vma_co_writer_task start\n");
153 - int64_t done = vma_co_write(cb->vmaw, cb->buffer, cb->bytes);
154 - DPRINTF("vma_co_writer_task write done %zd\n", done);
156 - if (done != cb->bytes) {
157 - DPRINTF("vma_co_writer_task failed write %zd %zd", cb->bytes, done);
158 - vma_writer_set_error(cb->vmaw, "vma_co_writer_task failed write %zd",
164 - qemu_co_queue_next(&cb->vmaw->wqueue);
166 - DPRINTF("vma_co_writer_task end\n");
169 -static void coroutine_fn vma_queue_flush(VmaWriter *vmaw)
171 - DPRINTF("vma_queue_flush enter\n");
177 - VmaAIOCB *cb = NULL;
178 - for (i = 0; i < WRITE_BUFFERS; i++) {
179 - if (vmaw->aiocbs[i]->bytes) {
180 - cb = vmaw->aiocbs[i];
181 - DPRINTF("FOUND USED AIO BUFFER %d %zd\n", i,
182 - vmaw->aiocbs[i]->bytes);
189 - qemu_co_queue_wait(&vmaw->wqueue);
192 - DPRINTF("vma_queue_flush leave\n");
196 - * NOTE: pipe buffer size in only 4096 bytes on linux (see 'ulimit -a')
197 - * So we need to create a coroutione to allow 'parallel' execution.
199 -static ssize_t coroutine_fn
200 -vma_queue_write(VmaWriter *vmaw, const void *buf, size_t bytes)
202 - DPRINTF("vma_queue_write enter %zd\n", bytes);
206 - assert(bytes <= VMA_MAX_EXTENT_SIZE);
208 - VmaAIOCB *cb = NULL;
211 - for (i = 0; i < WRITE_BUFFERS; i++) {
212 - if (!vmaw->aiocbs[i]->bytes) {
213 - cb = vmaw->aiocbs[i];
218 - qemu_co_queue_wait(&vmaw->wqueue);
222 - memcpy(cb->buffer, buf, bytes);
226 - DPRINTF("vma_queue_write start %zd\n", bytes);
227 - cb->co = qemu_coroutine_create(vma_co_writer_task);
228 - qemu_coroutine_enter(cb->co, cb);
230 - DPRINTF("vma_queue_write leave\n");
234 + return (done == bytes) ? bytes : -1;
237 VmaWriter *vma_writer_create(const char *filename, uuid_t uuid, Error **errp)
238 @@ -426,20 +326,16 @@ VmaWriter *vma_writer_create(const char *filename, uuid_t uuid, Error **errp)
241 /* we use O_DIRECT, so we need to align IO buffers */
243 - for (i = 0; i < WRITE_BUFFERS; i++) {
244 - vmaw->aiocbs[i] = qemu_memalign(512, sizeof(VmaAIOCB));
245 - memset(vmaw->aiocbs[i], 0, sizeof(VmaAIOCB));
248 + vmaw->outbuf = qemu_memalign(512, VMA_MAX_EXTENT_SIZE);
249 + vmaw->headerbuf = qemu_memalign(512, HEADERBUF_SIZE);
251 vmaw->outbuf_count = 0;
252 vmaw->outbuf_pos = VMA_EXTENT_HEADER_SIZE;
254 vmaw->header_blob_table_pos = 1; /* start at pos 1 */
256 - qemu_co_mutex_init(&vmaw->writer_lock);
257 qemu_co_mutex_init(&vmaw->flush_lock);
258 - qemu_co_queue_init(&vmaw->wqueue);
260 uuid_copy(vmaw->uuid, uuid);
262 @@ -466,8 +362,7 @@ err:
263 static int coroutine_fn vma_write_header(VmaWriter *vmaw)
266 - int header_clusters = 8;
267 - char buf[65536*header_clusters];
268 + unsigned char *buf = vmaw->headerbuf;
269 VmaHeader *head = (VmaHeader *)buf;
272 @@ -478,7 +373,7 @@ static int coroutine_fn vma_write_header(VmaWriter *vmaw)
276 - memset(buf, 0, sizeof(buf));
277 + memset(buf, 0, HEADERBUF_SIZE);
279 head->magic = VMA_MAGIC;
280 head->version = GUINT32_TO_BE(1); /* v1 */
281 @@ -513,7 +408,7 @@ static int coroutine_fn vma_write_header(VmaWriter *vmaw)
282 uint32_t header_size = sizeof(VmaHeader) + vmaw->header_blob_table_size;
283 head->header_size = GUINT32_TO_BE(header_size);
285 - if (header_size > sizeof(buf)) {
286 + if (header_size > HEADERBUF_SIZE) {
287 return -1; /* just to be sure */
290 @@ -811,13 +706,7 @@ int vma_writer_close(VmaWriter *vmaw, Error **errp)
294 - vma_queue_flush(vmaw);
296 - /* this should not happen - just to be sure */
297 - while (!qemu_co_queue_empty(&vmaw->wqueue)) {
298 - DPRINTF("vma_writer_close wait\n");
299 - co_aio_sleep_ns(qemu_get_aio_context(), QEMU_CLOCK_REALTIME, 1000000);
301 + assert(vmaw->co_writer == NULL);
304 if (pclose(vmaw->cmd) < 0) {
305 @@ -875,9 +764,5 @@ void vma_writer_destroy(VmaWriter *vmaw)
306 g_checksum_free(vmaw->md5csum);
309 - for (i = 0; i < WRITE_BUFFERS; i++) {
310 - free(vmaw->aiocbs[i]);