/*
 * QEMU I/O channels
 *
 * Copyright (c) 2015 Red Hat, Inc.
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, see <http://www.gnu.org/licenses/>.
 *
 */
#include "qemu/osdep.h"
#include "block/aio-wait.h"
#include "io/channel.h"
#include "qapi/error.h"
#include "qemu/main-loop.h"
#include "qemu/module.h"
#include "qemu/iov.h"
bool qio_channel_has_feature(QIOChannel *ioc,
                             QIOChannelFeature feature)
{
    return ioc->features & (1 << feature);
}

void qio_channel_set_feature(QIOChannel *ioc,
                             QIOChannelFeature feature)
{
    ioc->features |= (1 << feature);
}
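
/*
 * Usage sketch (illustrative only): features are a plain bitmask, so a
 * channel implementation advertises a capability once at construction time
 * and callers test for it before relying on it, e.g. a hypothetical caller:
 *
 *   if (qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS)) {
 *       // safe to pass file descriptors over this channel
 *   }
 */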

void qio_channel_set_name(QIOChannel *ioc,
                          const char *name)
{
    g_free(ioc->name);
    ioc->name = g_strdup(name);
}

ssize_t qio_channel_readv_full(QIOChannel *ioc,
                               const struct iovec *iov,
                               size_t niov,
                               int **fds,
                               size_t *nfds,
                               int flags,
                               Error **errp)
{
    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);

    if ((fds || nfds) &&
        !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS)) {
        error_setg_errno(errp, EINVAL,
                         "Channel does not support file descriptor passing");
        return -1;
    }

    if ((flags & QIO_CHANNEL_READ_FLAG_MSG_PEEK) &&
        !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_READ_MSG_PEEK)) {
        error_setg_errno(errp, EINVAL,
                         "Channel does not support peek read");
        return -1;
    }

    return klass->io_readv(ioc, iov, niov, fds, nfds, flags, errp);
}

ssize_t qio_channel_writev_full(QIOChannel *ioc,
                                const struct iovec *iov,
                                size_t niov,
                                int *fds,
                                size_t nfds,
                                int flags,
                                Error **errp)
{
    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);

    if (fds || nfds) {
        if (!qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS)) {
            error_setg_errno(errp, EINVAL,
                             "Channel does not support file descriptor passing");
            return -1;
        }
        if (flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) {
            error_setg_errno(errp, EINVAL,
                             "Zero Copy does not support file descriptor passing");
            return -1;
        }
    }

    if ((flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) &&
        !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_WRITE_ZERO_COPY)) {
        error_setg_errno(errp, EINVAL,
                         "Requested Zero Copy feature is not available");
        return -1;
    }

    return klass->io_writev(ioc, iov, niov, fds, nfds, flags, errp);
}

int coroutine_mixed_fn qio_channel_readv_all_eof(QIOChannel *ioc,
                                                 const struct iovec *iov,
                                                 size_t niov,
                                                 Error **errp)
{
    return qio_channel_readv_full_all_eof(ioc, iov, niov, NULL, NULL, errp);
}

int coroutine_mixed_fn qio_channel_readv_all(QIOChannel *ioc,
                                             const struct iovec *iov,
                                             size_t niov,
                                             Error **errp)
{
    return qio_channel_readv_full_all(ioc, iov, niov, NULL, NULL, errp);
}

int coroutine_mixed_fn qio_channel_readv_full_all_eof(QIOChannel *ioc,
                                                      const struct iovec *iov,
                                                      size_t niov,
                                                      int **fds, size_t *nfds,
                                                      Error **errp)
{
    int ret = -1;
    struct iovec *local_iov = g_new(struct iovec, niov);
    struct iovec *local_iov_head = local_iov;
    unsigned int nlocal_iov = niov;
    int **local_fds = fds;
    size_t *local_nfds = nfds;
    bool partial = false;

    if (nfds) {
        *nfds = 0;
    }
    if (fds) {
        *fds = NULL;
    }

    nlocal_iov = iov_copy(local_iov, nlocal_iov,
                          iov, niov,
                          0, iov_size(iov, niov));

    while ((nlocal_iov > 0) || local_fds) {
        ssize_t len;

        len = qio_channel_readv_full(ioc, local_iov, nlocal_iov, local_fds,
                                     local_nfds, 0, errp);
        if (len == QIO_CHANNEL_ERR_BLOCK) {
            if (qemu_in_coroutine()) {
                qio_channel_yield(ioc, G_IO_IN);
            } else {
                qio_channel_wait(ioc, G_IO_IN);
            }
            continue;
        }

        if (len == 0) {
            if (local_nfds && *local_nfds) {
                /*
                 * Got some FDs, but no data yet. This isn't an EOF
                 * scenario (yet), so carry on to try to read data
                 * on next loop iteration
                 */
                goto next_iter;
            } else if (!partial) {
                /* No fds and no data - EOF before any data read */
                ret = 0;
                goto cleanup;
            } else {
                len = -1;
                error_setg(errp,
                           "Unexpected end-of-file before all data were read");
                /* Fallthrough into len < 0 handling */
            }
        }

        if (len < 0) {
            /* Close any FDs we previously received */
            if (nfds && fds) {
                size_t i;
                for (i = 0; i < (*nfds); i++) {
                    close((*fds)[i]);
                }
                g_free(*fds);
                *fds = NULL;
                *nfds = 0;
            }
            goto cleanup;
        }

        if (nlocal_iov) {
            iov_discard_front(&local_iov, &nlocal_iov, len);
        }

next_iter:
        partial = true;
        local_fds = NULL;
        local_nfds = NULL;
    }

    ret = 1;

 cleanup:
    g_free(local_iov_head);
    return ret;
}

int coroutine_mixed_fn qio_channel_readv_full_all(QIOChannel *ioc,
                                                  const struct iovec *iov,
                                                  size_t niov,
                                                  int **fds, size_t *nfds,
                                                  Error **errp)
{
    int ret = qio_channel_readv_full_all_eof(ioc, iov, niov, fds, nfds, errp);

    if (ret == 0) {
        error_setg(errp, "Unexpected end-of-file before all data were read");
        return -1;
    }
    if (ret == 1) {
        return 0;
    }

    return ret;
}

int coroutine_mixed_fn qio_channel_writev_all(QIOChannel *ioc,
                                              const struct iovec *iov,
                                              size_t niov,
                                              Error **errp)
{
    return qio_channel_writev_full_all(ioc, iov, niov, NULL, 0, 0, errp);
}

int coroutine_mixed_fn qio_channel_writev_full_all(QIOChannel *ioc,
                                                   const struct iovec *iov,
                                                   size_t niov,
                                                   int *fds, size_t nfds,
                                                   int flags, Error **errp)
{
    int ret = -1;
    struct iovec *local_iov = g_new(struct iovec, niov);
    struct iovec *local_iov_head = local_iov;
    unsigned int nlocal_iov = niov;

    nlocal_iov = iov_copy(local_iov, nlocal_iov,
                          iov, niov,
                          0, iov_size(iov, niov));

    while (nlocal_iov > 0) {
        ssize_t len;

        len = qio_channel_writev_full(ioc, local_iov, nlocal_iov, fds,
                                      nfds, flags, errp);
        if (len == QIO_CHANNEL_ERR_BLOCK) {
            if (qemu_in_coroutine()) {
                qio_channel_yield(ioc, G_IO_OUT);
            } else {
                qio_channel_wait(ioc, G_IO_OUT);
            }
            continue;
        }
        if (len < 0) {
            goto cleanup;
        }

        iov_discard_front(&local_iov, &nlocal_iov, len);

        fds = NULL;
        nfds = 0;
    }

    ret = 0;
 cleanup:
    g_free(local_iov_head);
    return ret;
}

ssize_t qio_channel_readv(QIOChannel *ioc,
                          const struct iovec *iov,
                          size_t niov,
                          Error **errp)
{
    return qio_channel_readv_full(ioc, iov, niov, NULL, NULL, 0, errp);
}

ssize_t qio_channel_writev(QIOChannel *ioc,
                           const struct iovec *iov,
                           size_t niov,
                           Error **errp)
{
    return qio_channel_writev_full(ioc, iov, niov, NULL, 0, 0, errp);
}

ssize_t qio_channel_read(QIOChannel *ioc,
                         char *buf,
                         size_t buflen,
                         Error **errp)
{
    struct iovec iov = { .iov_base = buf, .iov_len = buflen };
    return qio_channel_readv_full(ioc, &iov, 1, NULL, NULL, 0, errp);
}

ssize_t qio_channel_write(QIOChannel *ioc,
                          const char *buf,
                          size_t buflen,
                          Error **errp)
{
    struct iovec iov = { .iov_base = (char *)buf, .iov_len = buflen };
    return qio_channel_writev_full(ioc, &iov, 1, NULL, 0, 0, errp);
}

int coroutine_mixed_fn qio_channel_read_all_eof(QIOChannel *ioc,
                                                char *buf,
                                                size_t buflen,
                                                Error **errp)
{
    struct iovec iov = { .iov_base = buf, .iov_len = buflen };
    return qio_channel_readv_all_eof(ioc, &iov, 1, errp);
}

int coroutine_mixed_fn qio_channel_read_all(QIOChannel *ioc,
                                            char *buf,
                                            size_t buflen,
                                            Error **errp)
{
    struct iovec iov = { .iov_base = buf, .iov_len = buflen };
    return qio_channel_readv_all(ioc, &iov, 1, errp);
}

int coroutine_mixed_fn qio_channel_write_all(QIOChannel *ioc,
                                             const char *buf,
                                             size_t buflen,
                                             Error **errp)
{
    struct iovec iov = { .iov_base = (char *)buf, .iov_len = buflen };
    return qio_channel_writev_all(ioc, &iov, 1, errp);
}
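
/*
 * Usage sketch (illustrative only): the *_all helpers loop internally until
 * the whole buffer has been transferred, blocking the caller (or yielding
 * the current coroutine) whenever the channel would block. A hypothetical
 * caller sending a fixed-size header synchronously:
 *
 *   MyHeader hdr = { ... };   // MyHeader is a hypothetical struct
 *   if (qio_channel_write_all(ioc, (const char *)&hdr,
 *                             sizeof(hdr), errp) < 0) {
 *       return -1;            // partial writes are never reported as success
 *   }
 */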

int qio_channel_set_blocking(QIOChannel *ioc,
                             bool enabled,
                             Error **errp)
{
    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
    return klass->io_set_blocking(ioc, enabled, errp);
}

void qio_channel_set_follow_coroutine_ctx(QIOChannel *ioc, bool enabled)
{
    ioc->follow_coroutine_ctx = enabled;
}

int qio_channel_close(QIOChannel *ioc,
                      Error **errp)
{
    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
    return klass->io_close(ioc, errp);
}

GSource *qio_channel_create_watch(QIOChannel *ioc,
                                  GIOCondition condition)
{
    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
    GSource *ret = klass->io_create_watch(ioc, condition);

    if (ioc->name) {
        g_source_set_name(ret, ioc->name);
    }

    return ret;
}

void qio_channel_set_aio_fd_handler(QIOChannel *ioc,
                                    AioContext *read_ctx,
                                    IOHandler *io_read,
                                    AioContext *write_ctx,
                                    IOHandler *io_write,
                                    void *opaque)
{
    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);

    klass->io_set_aio_fd_handler(ioc, read_ctx, io_read, write_ctx, io_write,
                                 opaque);
}

guint qio_channel_add_watch_full(QIOChannel *ioc,
                                 GIOCondition condition,
                                 QIOChannelFunc func,
                                 gpointer user_data,
                                 GDestroyNotify notify,
                                 GMainContext *context)
{
    GSource *source;
    guint id;

    source = qio_channel_create_watch(ioc, condition);

    g_source_set_callback(source, (GSourceFunc)func, user_data, notify);

    id = g_source_attach(source, context);
    g_source_unref(source);

    return id;
}

guint qio_channel_add_watch(QIOChannel *ioc,
                            GIOCondition condition,
                            QIOChannelFunc func,
                            gpointer user_data,
                            GDestroyNotify notify)
{
    return qio_channel_add_watch_full(ioc, condition, func,
                                      user_data, notify, NULL);
}

GSource *qio_channel_add_watch_source(QIOChannel *ioc,
                                      GIOCondition condition,
                                      QIOChannelFunc func,
                                      gpointer user_data,
                                      GDestroyNotify notify,
                                      GMainContext *context)
{
    GSource *source;
    guint id;

    id = qio_channel_add_watch_full(ioc, condition, func,
                                    user_data, notify, context);
    source = g_main_context_find_source_by_id(context, id);
    g_source_ref(source);
    return source;
}
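
/*
 * Usage sketch (illustrative only): watches plug a channel into a GLib main
 * loop. A hypothetical callback invoked when the channel becomes readable:
 *
 *   static gboolean my_can_read(QIOChannel *ioc, GIOCondition cond,
 *                               gpointer opaque)
 *   {
 *       // ... read from ioc ...
 *       return G_SOURCE_CONTINUE;   // keep the watch installed
 *   }
 *
 *   qio_channel_add_watch(ioc, G_IO_IN, my_can_read, opaque, NULL);
 */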

ssize_t qio_channel_pwritev(QIOChannel *ioc, const struct iovec *iov,
                            size_t niov, off_t offset, Error **errp)
{
    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);

    if (!klass->io_pwritev) {
        error_setg(errp, "Channel does not support pwritev");
        return -1;
    }

    if (!qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_SEEKABLE)) {
        error_setg_errno(errp, EINVAL, "Requested channel is not seekable");
        return -1;
    }

    return klass->io_pwritev(ioc, iov, niov, offset, errp);
}

ssize_t qio_channel_pwrite(QIOChannel *ioc, char *buf, size_t buflen,
                           off_t offset, Error **errp)
{
    struct iovec iov = {
        .iov_base = buf,
        .iov_len = buflen
    };

    return qio_channel_pwritev(ioc, &iov, 1, offset, errp);
}

ssize_t qio_channel_preadv(QIOChannel *ioc, const struct iovec *iov,
                           size_t niov, off_t offset, Error **errp)
{
    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);

    if (!klass->io_preadv) {
        error_setg(errp, "Channel does not support preadv");
        return -1;
    }

    if (!qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_SEEKABLE)) {
        error_setg_errno(errp, EINVAL, "Requested channel is not seekable");
        return -1;
    }

    return klass->io_preadv(ioc, iov, niov, offset, errp);
}

ssize_t qio_channel_pread(QIOChannel *ioc, char *buf, size_t buflen,
                          off_t offset, Error **errp)
{
    struct iovec iov = {
        .iov_base = buf,
        .iov_len = buflen
    };

    return qio_channel_preadv(ioc, &iov, 1, offset, errp);
}

int qio_channel_shutdown(QIOChannel *ioc,
                         QIOChannelShutdown how,
                         Error **errp)
{
    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);

    if (!klass->io_shutdown) {
        error_setg(errp, "Data path shutdown not supported");
        return -1;
    }

    return klass->io_shutdown(ioc, how, errp);
}

void qio_channel_set_delay(QIOChannel *ioc,
                           bool enabled)
{
    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);

    if (klass->io_set_delay) {
        klass->io_set_delay(ioc, enabled);
    }
}

void qio_channel_set_cork(QIOChannel *ioc,
                          bool enabled)
{
    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);

    if (klass->io_set_cork) {
        klass->io_set_cork(ioc, enabled);
    }
}

int qio_channel_get_peerpid(QIOChannel *ioc,
                            unsigned int *pid,
                            Error **errp)
{
    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);

    if (!klass->io_peerpid) {
        error_setg(errp, "Channel does not support peer pid");
        return -1;
    }
    klass->io_peerpid(ioc, pid, errp);
    return 0;
}

off_t qio_channel_io_seek(QIOChannel *ioc,
                          off_t offset,
                          int whence,
                          Error **errp)
{
    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);

    if (!klass->io_seek) {
        error_setg(errp, "Channel does not support random access");
        return -1;
    }

    return klass->io_seek(ioc, offset, whence, errp);
}

int qio_channel_flush(QIOChannel *ioc,
                      Error **errp)
{
    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);

    if (!klass->io_flush ||
        !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_WRITE_ZERO_COPY)) {
        return 0;
    }

    return klass->io_flush(ioc, errp);
}

static void qio_channel_restart_read(void *opaque)
{
    QIOChannel *ioc = opaque;
    Coroutine *co = qatomic_xchg(&ioc->read_coroutine, NULL);

    if (!co) {
        return;
    }

    /* Assert that aio_co_wake() reenters the coroutine directly */
    assert(qemu_get_current_aio_context() ==
           qemu_coroutine_get_aio_context(co));
    aio_co_wake(co);
}

static void qio_channel_restart_write(void *opaque)
{
    QIOChannel *ioc = opaque;
    Coroutine *co = qatomic_xchg(&ioc->write_coroutine, NULL);

    if (!co) {
        return;
    }

    /* Assert that aio_co_wake() reenters the coroutine directly */
    assert(qemu_get_current_aio_context() ==
           qemu_coroutine_get_aio_context(co));
    aio_co_wake(co);
}

static void coroutine_fn
qio_channel_set_fd_handlers(QIOChannel *ioc, GIOCondition condition)
{
    AioContext *ctx = ioc->follow_coroutine_ctx ?
        qemu_coroutine_get_aio_context(qemu_coroutine_self()) :
        iohandler_get_aio_context();
    AioContext *read_ctx = NULL;
    IOHandler *io_read = NULL;
    AioContext *write_ctx = NULL;
    IOHandler *io_write = NULL;

    if (condition == G_IO_IN) {
        ioc->read_coroutine = qemu_coroutine_self();
        ioc->read_ctx = ctx;
        read_ctx = ctx;
        io_read = qio_channel_restart_read;

        /*
         * Thread safety: if the other coroutine is set and its AioContext
         * matches ours, then there is mutual exclusion between read and write
         * because they share a single thread and it's safe to set both read
         * and write fd handlers here. If the AioContext does not match ours,
         * then both threads may run in parallel but there is no shared state
         * to worry about.
         */
        if (ioc->write_coroutine && ioc->write_ctx == ctx) {
            write_ctx = ctx;
            io_write = qio_channel_restart_write;
        }
    } else if (condition == G_IO_OUT) {
        ioc->write_coroutine = qemu_coroutine_self();
        ioc->write_ctx = ctx;
        write_ctx = ctx;
        io_write = qio_channel_restart_write;
        if (ioc->read_coroutine && ioc->read_ctx == ctx) {
            read_ctx = ctx;
            io_read = qio_channel_restart_read;
        }
    } else {
        abort();
    }

    qio_channel_set_aio_fd_handler(ioc, read_ctx, io_read,
                                   write_ctx, io_write, ioc);
}

static void coroutine_fn
qio_channel_clear_fd_handlers(QIOChannel *ioc, GIOCondition condition)
{
    AioContext *read_ctx = NULL;
    IOHandler *io_read = NULL;
    AioContext *write_ctx = NULL;
    IOHandler *io_write = NULL;
    AioContext *ctx;

    if (condition == G_IO_IN) {
        ctx = ioc->read_ctx;
        read_ctx = ctx;
        io_read = NULL;
        if (ioc->write_coroutine && ioc->write_ctx == ctx) {
            write_ctx = ctx;
            io_write = qio_channel_restart_write;
        }
    } else if (condition == G_IO_OUT) {
        ctx = ioc->write_ctx;
        write_ctx = ctx;
        io_write = NULL;
        if (ioc->read_coroutine && ioc->read_ctx == ctx) {
            read_ctx = ctx;
            io_read = qio_channel_restart_read;
        }
    } else {
        abort();
    }

    qio_channel_set_aio_fd_handler(ioc, read_ctx, io_read,
                                   write_ctx, io_write, ioc);
}

void coroutine_fn qio_channel_yield(QIOChannel *ioc,
                                    GIOCondition condition)
{
    AioContext *ioc_ctx;

    assert(qemu_in_coroutine());
    ioc_ctx = qemu_coroutine_get_aio_context(qemu_coroutine_self());

    if (condition == G_IO_IN) {
        assert(!ioc->read_coroutine);
    } else if (condition == G_IO_OUT) {
        assert(!ioc->write_coroutine);
    } else {
        abort();
    }
    qio_channel_set_fd_handlers(ioc, condition);
    qemu_coroutine_yield();
    assert(in_aio_context_home_thread(ioc_ctx));

    /* Allow interrupting the operation by reentering the coroutine other than
     * through the aio_fd_handlers. */
    if (condition == G_IO_IN) {
        assert(ioc->read_coroutine == NULL);
    } else if (condition == G_IO_OUT) {
        assert(ioc->write_coroutine == NULL);
    }
    qio_channel_clear_fd_handlers(ioc, condition);
}
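
/*
 * Usage sketch (illustrative only): qio_channel_yield() is the coroutine
 * counterpart of qio_channel_wait(). Callers that may run both inside and
 * outside coroutine context typically retry on QIO_CHANNEL_ERR_BLOCK, as
 * the *_all helpers above do:
 *
 *   while ((len = qio_channel_readv(ioc, iov, niov, errp)) ==
 *          QIO_CHANNEL_ERR_BLOCK) {
 *       if (qemu_in_coroutine()) {
 *           qio_channel_yield(ioc, G_IO_IN);
 *       } else {
 *           qio_channel_wait(ioc, G_IO_IN);
 *       }
 *   }
 */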

void qio_channel_wake_read(QIOChannel *ioc)
{
    Coroutine *co = qatomic_xchg(&ioc->read_coroutine, NULL);

    if (co) {
        aio_co_wake(co);
    }
}

static gboolean qio_channel_wait_complete(QIOChannel *ioc,
                                          GIOCondition condition,
                                          gpointer opaque)
{
    GMainLoop *loop = opaque;

    g_main_loop_quit(loop);
    return FALSE;
}

void qio_channel_wait(QIOChannel *ioc,
                      GIOCondition condition)
{
    GMainContext *ctxt = g_main_context_new();
    GMainLoop *loop = g_main_loop_new(ctxt, TRUE);
    GSource *source;

    source = qio_channel_create_watch(ioc, condition);

    g_source_set_callback(source,
                          (GSourceFunc)qio_channel_wait_complete,
                          loop,
                          NULL);

    g_source_attach(source, ctxt);

    g_main_loop_run(loop);

    g_source_unref(source);
    g_main_loop_unref(loop);
    g_main_context_unref(ctxt);
}

static void qio_channel_finalize(Object *obj)
{
    QIOChannel *ioc = QIO_CHANNEL(obj);

    /* Must not have coroutines in qio_channel_yield() */
    assert(!ioc->read_coroutine);
    assert(!ioc->write_coroutine);

    g_free(ioc->name);

#ifdef _WIN32
    if (ioc->event) {
        CloseHandle(ioc->event);
    }
#endif
}

static const TypeInfo qio_channel_info = {
    .parent = TYPE_OBJECT,
    .name = TYPE_QIO_CHANNEL,
    .instance_size = sizeof(QIOChannel),
    .instance_finalize = qio_channel_finalize,
    .abstract = true,
    .class_size = sizeof(QIOChannelClass),
};

static void qio_channel_register_types(void)
{
    type_register_static(&qio_channel_info);
}

type_init(qio_channel_register_types);