From a6f324d47b810809de2a6106849527c6a9590175 Mon Sep 17 00:00:00 2001
From: Dietmar Maurer <dietmar@proxmox.com>
Date: Mon, 14 Jan 2013 08:05:40 +0100
Subject: [PATCH v3 7/7] use extra thread for vma writer

The previous AIO approach has a problem with bdrv_drain_all(), because the
writer coroutines are not considered there. Those coroutines are never
restarted, so bdrv_drain_all() can fail (the tracked_requests list is not
empty).

We now use a dedicated writer thread instead, which would also make it
possible to add compression here.
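
For illustration only, here is a minimal, self-contained sketch of the
producer/consumer scheme this patch introduces, reduced to plain pthreads and
a single buffer slot. The names (SketchWriter, sketch_queue_write) are made up
for this sketch and are not part of vma-writer.c, which keeps WRITE_BUFFERS
slots, uses GLib threads, and wakes waiting coroutines via
qemu_co_queue_restart_all() while holding the iothread lock:

/*
 * Illustrative sketch only - not the vma-writer.c implementation.
 * One producer hands buffers to one consumer thread that performs the
 * blocking write() calls.
 */
#include <errno.h>
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>
#include <unistd.h>

#define SKETCH_BUF_SIZE 4096

typedef struct SketchWriter {
    pthread_mutex_t mutex;
    pthread_cond_t change_cond; /* signalled on new data, free slot or cancel */
    unsigned char buffer[SKETCH_BUF_SIZE];
    size_t bytes;               /* 0 means the slot is free */
    bool cancel;
    int error;                  /* errno of a failed write, 0 otherwise */
    int fd;
} SketchWriter;

/* Consumer thread: wait for a filled slot, drain it with write(), free it. */
static void *sketch_writer_thread(void *data)
{
    SketchWriter *w = data;

    pthread_mutex_lock(&w->mutex);
    while (!w->cancel && !w->error) {
        if (w->bytes == 0) {
            pthread_cond_wait(&w->change_cond, &w->mutex);
            continue;
        }
        size_t todo = w->bytes;
        pthread_mutex_unlock(&w->mutex);

        size_t done = 0;
        int err = 0;
        while (done < todo) {
            ssize_t ret = write(w->fd, w->buffer + done, todo - done);
            if (ret > 0) {
                done += (size_t)ret;
            } else if (ret < 0 && errno != EINTR && errno != EAGAIN) {
                err = errno; /* real error - give up on this buffer */
                break;
            }
        }

        pthread_mutex_lock(&w->mutex);
        w->error = err;
        w->bytes = 0;                         /* slot is free again */
        pthread_cond_signal(&w->change_cond); /* wake a blocked producer */
    }
    /* cancelled or failed: drop pending data and release waiting producers */
    w->bytes = 0;
    pthread_cond_broadcast(&w->change_cond);
    pthread_mutex_unlock(&w->mutex);
    return NULL;
}

/* Producer: block until the slot is free, then hand over the data. */
static int sketch_queue_write(SketchWriter *w, const void *buf, size_t bytes)
{
    if (bytes > SKETCH_BUF_SIZE) {
        return -1;
    }
    pthread_mutex_lock(&w->mutex);
    while (w->bytes != 0 && !w->error && !w->cancel) {
        pthread_cond_wait(&w->change_cond, &w->mutex);
    }
    if (w->error || w->cancel) {
        pthread_mutex_unlock(&w->mutex);
        return -1;
    }
    memcpy(w->buffer, buf, bytes);
    w->bytes = bytes;
    pthread_cond_signal(&w->change_cond); /* wake the writer thread */
    pthread_mutex_unlock(&w->mutex);
    return 0;
}

The property that matters is the same as in the patch: the producer only
copies data into a free buffer and signals the condition variable, while all
potentially blocking write() calls happen in the extra thread, so no coroutine
ever blocks inside write().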

Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
---
 vma-writer.c | 296 +++++++++++++++++++++++++++++++++++-----------------------
 1 files changed, 180 insertions(+), 116 deletions(-)

Index: new/vma-writer.c
===================================================================
--- new.orig/vma-writer.c 2013-01-23 07:35:12.000000000 +0100
+++ new/vma-writer.c 2013-01-23 09:24:19.000000000 +0100
#define WRITE_BUFFERS 5
-typedef struct VmaAIOCB VmaAIOCB;
+typedef struct WriteBuffer {
unsigned char buffer[VMA_MAX_EXTENT_SIZE];
+typedef struct WriterThread {
+ WriteBuffer wbuf[WRITE_BUFFERS];
int outbuf_count; /* in VMA_BLOCKS */
uint64_t outbuf_block_info[VMA_BLOCKS_PER_EXTENT];
- VmaAIOCB aiocbs[WRITE_BUFFERS];
uint32_t config_count;
+static gpointer vma_writer_thread(gpointer data)
+ WriterThread *wt = (WriterThread *)data;
+ WriteBuffer *b = NULL;
+ qemu_mutex_lock_iothread();
+ for (i = 0; i < WRITE_BUFFERS; i++) {
+ if (wt->wbuf[i].bytes) {
+ qemu_mutex_unlock_iothread();
+ while (done < b->bytes) {
+ int ret = write(wt->fd, b->buffer + done, b->bytes - done);
+ } else if (ret < 0) {
+ if (!(errno == EAGAIN || errno == EWOULDBLOCK)) {
+ qemu_mutex_lock_iothread();
+ qemu_mutex_unlock_iothread();
+ } else if (ret == 0) {
+ /* should not happen - simply try again */
+ qemu_mutex_lock_iothread();
+ DPRINTF("AWAKE JOB %d\n", wt->error);
+ for (i = 0; i < WRITE_BUFFERS; i++) {
+ wt->wbuf[i].bytes = 0;
+ qemu_co_queue_restart_all(&wt->wqueue);
+ qemu_co_queue_next(&wt->wqueue);
+ qemu_mutex_unlock_iothread();
+ DPRINTF("AWAKE JOB END\n");
+ DPRINTF("WRITER THREAD ERROR %d - exit thread\n", wt->error);
+ g_thread_exit(NULL);
+ g_mutex_lock(wt->mutex);
+ bool cancel = wt->cancel;
+ if (!b && !cancel) {
+ DPRINTF("WRITER THREAD WAIT FOR DATA\n");
+ g_cond_wait(wt->change_cond, wt->mutex);
+ cancel = wt->cancel;
+ g_mutex_unlock(wt->mutex);
+ qemu_mutex_lock_iothread();
+ for (i = 0; i < WRITE_BUFFERS; i++) {
+ wt->wbuf[i].bytes = 0;
+ qemu_co_queue_restart_all(&wt->wqueue);
+ qemu_mutex_unlock_iothread();
+ DPRINTF("END WRITER THREAD\n");
+ g_thread_exit(NULL);
+static void vma_stop_writer_thread(VmaWriter *vmaw)
+ DPRINTF("vma_stop_writer_thread start\n");
+ if (vmaw->wt.thread) {
+ DPRINTF("vma_stop_writer_thread 1\n");
+ g_mutex_lock(vmaw->wt.mutex);
+ DPRINTF("vma_stop_writer_thread 2\n");
+ vmaw->wt.cancel = true;
+ g_cond_signal(vmaw->wt.change_cond);
+ g_mutex_unlock(vmaw->wt.mutex);
+ DPRINTF("vma_stop_writer_thread 3\n");
+ qemu_mutex_unlock_iothread();
+ g_thread_join(vmaw->wt.thread);
+ qemu_mutex_lock_iothread();
+ DPRINTF("vma_stop_writer_thread 4\n");
+ vmaw->wt.thread = NULL;
+ DPRINTF("vma_stop_writer_thread end\n");
void vma_writer_set_error(VmaWriter *vmaw, const char *fmt, ...)
@@ -213,111 +321,45 @@
-static void vma_co_continue_write(void *opaque)
- VmaWriter *vmaw = opaque;
- qemu_aio_set_fd_handler(vmaw->fd, NULL, NULL, NULL, NULL);
- DPRINTF("vma_co_continue_write\n");
- qemu_coroutine_enter(vmaw->co_writer, NULL);
-static ssize_t coroutine_fn
-vma_co_write(VmaWriter *vmaw, const void *buf, size_t bytes)
- /* atomic writes (we cannot interleave writes) */
- qemu_co_mutex_lock(&vmaw->writer_lock);
- DPRINTF("vma_co_write enter %zd\n", bytes);
- while (done < bytes) {
- ret = write(vmaw->fd, buf + done, bytes - done);
- DPRINTF("vma_co_write written %zd %zd\n", done, ret);
- } else if (ret < 0) {
- if (errno == EAGAIN || errno == EWOULDBLOCK) {
- DPRINTF("vma_co_write yield %zd\n", done);
- vmaw->co_writer = qemu_coroutine_self();
- qemu_aio_set_fd_handler(vmaw->fd, NULL, vma_co_continue_write,
- qemu_coroutine_yield();
- DPRINTF("vma_co_write restart %zd\n", done);
- vma_writer_set_error(vmaw, "vma_co_write write error - %s",
- done = -1; /* always return failure for partial writes */
- } else if (ret == 0) {
- /* should not happen - simply try again */
- qemu_co_mutex_unlock(&vmaw->writer_lock);
- DPRINTF("vma_co_write leave %zd\n", done);
-static void coroutine_fn vma_co_writer_task(void *opaque)
- VmaAIOCB *cb = opaque;
- DPRINTF("vma_co_writer_task start\n");
- int64_t done = vma_co_write(cb->vmaw, cb->buffer, cb->bytes);
- DPRINTF("vma_co_writer_task write done %zd\n", done);
- if (done != cb->bytes) {
- DPRINTF("vma_co_writer_task failed write %zd %zd", cb->bytes, done);
- vma_writer_set_error(cb->vmaw, "vma_co_writer_task failed write %zd",
- qemu_co_queue_next(&cb->vmaw->wqueue);
- DPRINTF("vma_co_writer_task end\n");
static void coroutine_fn vma_queue_flush(VmaWriter *vmaw)
DPRINTF("vma_queue_flush enter\n");
- VmaAIOCB *cb = NULL;
+ WriteBuffer *b = NULL;
+ error = vmaw->wt.error;
for (i = 0; i < WRITE_BUFFERS; i++) {
- if (vmaw->aiocbs[i].bytes) {
- cb = &vmaw->aiocbs[i];
- DPRINTF("FOUND USED AIO BUFFER %d %zd\n", i,
- vmaw->aiocbs[i].bytes);
+ if (vmaw->wt.wbuf[i].bytes) {
+ b = &vmaw->wt.wbuf[i];
+ DPRINTF("FOUND USED WRITE BUFFER %d %zd\n", i,
+ vmaw->wt.wbuf[i].bytes);
- qemu_co_queue_wait(&vmaw->wqueue);
+ DPRINTF("WAIT FOR BUFFER FLUSH\n");
+ qemu_co_queue_wait(&vmaw->wt.wqueue);
+ DPRINTF("WAIT FOR BUFFER FLUSH END\n");
+ vma_writer_set_error(vmaw, "vma_queue_flush write error - %s",
DPRINTF("vma_queue_flush leave\n");
- * NOTE: pipe buffer size in only 4096 bytes on linux (see 'ulimit -a')
- * So we need to create a coroutione to allow 'parallel' execution.
static ssize_t coroutine_fn
vma_queue_write(VmaWriter *vmaw, const void *buf, size_t bytes)
@@ -327,29 +369,42 @@
assert(bytes <= VMA_MAX_EXTENT_SIZE);
- VmaAIOCB *cb = NULL;
+ /* wait for a free output buffer */
+ WriteBuffer *b = NULL;
+ error = vmaw->wt.error;
+ vma_writer_set_error(vmaw, "vma_queue_write error - %s",
for (i = 0; i < WRITE_BUFFERS; i++) {
- if (!vmaw->aiocbs[i].bytes) {
- cb = &vmaw->aiocbs[i];
+ if (!vmaw->wt.wbuf[i].bytes) {
+ b = &vmaw->wt.wbuf[i];
- qemu_co_queue_wait(&vmaw->wqueue);
+ DPRINTF("WAIT FOR BUFFER\n");
+ qemu_co_queue_wait(&vmaw->wt.wqueue);
+ DPRINTF("WAIT FOR BUFFER DONE\n");
- memcpy(cb->buffer, buf, bytes);
+ /* copy data to output buffer */
+ memcpy(b->buffer, buf, bytes);
- DPRINTF("vma_queue_write start %zd\n", bytes);
- cb->co = qemu_coroutine_create(vma_co_writer_task);
- qemu_coroutine_enter(cb->co, cb);
+ g_mutex_lock(vmaw->wt.mutex);
+ /* signal writer thread that we have new data */
+ g_cond_signal(vmaw->wt.change_cond);
+ g_mutex_unlock(vmaw->wt.mutex);
- DPRINTF("vma_queue_write leave\n");
+ DPRINTF("vma_queue_write queued %zd\n", bytes);
@@ -386,10 +441,10 @@
const char *tmp_id_str;
if ((stat(filename, &st) == 0) && S_ISFIFO(st.st_mode)) {
- oflags = O_NONBLOCK|O_WRONLY;
vmaw->fd = qemu_open(filename, oflags, 0644);
} else if (strstart(filename, "/dev/fdset/", &tmp_id_str)) {
- oflags = O_NONBLOCK|O_WRONLY;
vmaw->fd = qemu_open(filename, oflags, 0644);
} else if (strstart(filename, "/dev/fdname/", &tmp_id_str)) {
vmaw->fd = monitor_get_fd(cur_mon, tmp_id_str, errp);
- oflags = O_NONBLOCK|O_WRONLY|O_CREAT|O_EXCL;
+ oflags = O_WRONLY|O_CREAT|O_EXCL;
vmaw->fd = qemu_open(filename, oflags, 0644);
@@ -415,10 +470,19 @@
qemu_co_mutex_init(&vmaw->writer_lock);
qemu_co_mutex_init(&vmaw->flush_lock);
- qemu_co_queue_init(&vmaw->wqueue);
+ qemu_co_queue_init(&vmaw->wt.wqueue);
uuid_copy(vmaw->uuid, uuid);
+ vmaw->wt.mutex = g_mutex_new();
+ vmaw->wt.change_cond = g_cond_new();
+ vmaw->wt.fd = vmaw->fd;
+ vmaw->wt.thread = g_thread_create(vma_writer_thread, &vmaw->wt, true, NULL);
+ if (vmaw->wt.thread == NULL) {
+ error_setg(errp, "can't allocate writer thread\n");
g_checksum_free(vmaw->md5csum);
+ if (vmaw->wt.mutex) {
+ g_mutex_free(vmaw->wt.mutex);
+ if (vmaw->wt.change_cond) {
+ g_cond_free(vmaw->wt.change_cond);
+ int error = vmaw->wt.error;
+ vma_writer_set_error(vmaw, "vma_writer_get_buffer write error - %s",
if (vmaw->status < 0) {
@@ -783,14 +863,17 @@
+ DPRINTF("vma_writer_close start\n");
vma_queue_flush(vmaw);
/* this should not happen - just to be sure */
- while (!qemu_co_queue_empty(&vmaw->wqueue)) {
+ while (!qemu_co_queue_empty(&vmaw->wt.wqueue)) {
DPRINTF("vma_writer_close wait\n");
co_sleep_ns(rt_clock, 1000000);
+ vma_stop_writer_thread(vmaw);
if (pclose(vmaw->cmd) < 0) {
vma_writer_set_error(vmaw, "vma_writer_close: "
+ vma_stop_writer_thread(vmaw);
for (i = 0; i <= 255; i++) {
if (vmaw->stream_info[i].devname) {
g_free(vmaw->stream_info[i].devname);
g_checksum_free(vmaw->md5csum);
+ if (vmaw->wt.mutex) {
+ g_mutex_free(vmaw->wt.mutex);
+ if (vmaw->wt.change_cond) {
+ g_cond_free(vmaw->wt.change_cond);