ssa: don't swap the arguments of comparison operators
[ajla.git] / iomux_kq.c
blob36c1235ab96022a821c8f30db5f41a957a4ed567
1 /*
2 * Copyright (C) 2024 Mikulas Patocka
4 * This file is part of Ajla.
6 * Ajla is free software: you can redistribute it and/or modify it under the
7 * terms of the GNU General Public License as published by the Free Software
8 * Foundation, either version 3 of the License, or (at your option) any later
9 * version.
11 * Ajla is distributed in the hope that it will be useful, but WITHOUT ANY
12 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
13 * A PARTICULAR PURPOSE. See the GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License along with
16 * Ajla. If not, see <https://www.gnu.org/licenses/>.
19 #include "ajla.h"
21 #include "list.h"
22 #include "mem_al.h"
23 #include "thread.h"
24 #include "addrlock.h"
25 #include "rwlock.h"
26 #include "str.h"
27 #include "os.h"
28 #include "timer.h"
29 #include "obj_reg.h"
31 #include "iomux.h"
33 #ifdef IOMUX_KQUEUE
35 #include <unistd.h>
36 #include <sys/poll.h>
37 #include <sys/event.h>
39 #define KQUEUE_MAX_EVENTS 64
41 struct iomux_wait {
42 struct list wait_list[3];
43 uint64_t seq;
44 handle_t self;
47 static handle_t kq_fd;
49 static void iomux_wait_init(struct iomux_wait *iow, handle_t handle)
51 list_init(&iow->wait_list[0]);
52 list_init(&iow->wait_list[1]);
53 list_init(&iow->wait_list[2]);
54 iow->seq = 0;
55 iow->self = handle;
58 #include "iomux.inc"
60 void iomux_register_wait(handle_t handle, bool wr, mutex_t **mutex_to_lock, struct list *list_entry)
62 int r;
63 struct iomux_wait *iow = iomux_get_iowait(handle);
64 struct kevent event;
66 EV_SET(&event, handle, !wr ? EVFILT_READ : EVFILT_WRITE, EV_ADD | EV_ONESHOT, 0, 0, 0);
68 address_lock(iow, DEPTH_THUNK);
70 EINTR_LOOP(r, kevent(kq_fd, &event, 1, NULL, 0, NULL));
71 if (unlikely(r == -1)) {
72 int er = errno;
73 fatal("kevent failed adding a new handle: %d, %s", er, error_decode(error_from_errno(EC_SYSCALL, er)));
76 *mutex_to_lock = address_get_mutex(iow, DEPTH_THUNK);
77 list_add(&iow->wait_list[(int)wr], list_entry);
79 address_unlock(iow, DEPTH_THUNK);
81 #if !defined(THREAD_NONE) && defined(__APPLE__)
82 os_notify();
83 #endif
87 bool iomux_test_handle(handle_t handle, bool wr)
89 struct pollfd p;
90 int r;
91 p.fd = handle;
92 p.events = !wr ? POLLIN : POLLOUT;
93 again:
94 EINTR_LOOP(r, poll(&p, 1, 0));
95 if (unlikely(r == -1)) {
96 int er = errno;
97 if (er == EAGAIN)
98 goto again;
99 fatal("poll failed: %d, %s", er, error_decode(error_from_errno(EC_SYSCALL, er)));
101 return !!r;
105 bool iomux_directory_handle_alloc(dir_handle_t attr_unused handle, notify_handle_t attr_unused *h, uint64_t attr_unused *seq, ajla_error_t *err)
107 #ifndef NO_DIR_HANDLES
108 int r;
109 int newfd;
110 struct iomux_wait *iow;
111 struct kevent event;
113 EINTR_LOOP(newfd, dup(handle));
114 if (unlikely(newfd == -1)) {
115 ajla_error_t e = error_from_errno(EC_SYSCALL, errno);
116 fatal_mayfail(e, err, "dup failed: %s", error_decode(e));
117 return false;
119 iow = iomux_get_iowait(newfd);
121 EV_SET(&event, newfd, EVFILT_VNODE, EV_ADD | EV_ONESHOT, NOTE_WRITE, 0, 0);
123 address_lock(iow, DEPTH_THUNK);
125 EINTR_LOOP(r, kevent(kq_fd, &event, 1, NULL, 0, NULL));
126 if (unlikely(r == -1)) {
127 ajla_error_t e = error_from_errno(EC_SYSCALL, errno);
128 address_unlock(iow, DEPTH_THUNK);
129 os_close_handle(newfd);
130 fatal_mayfail(e, err, "adding a directory watch failed: %s", error_decode(e));
131 return false;
134 *h = iow;
135 *seq = iow->seq;
137 address_unlock(iow, DEPTH_THUNK);
138 #if !defined(THREAD_NONE) && defined(__APPLE__)
139 os_notify();
140 #endif
141 return true;
142 #else
143 fatal_mayfail(error_ajla(EC_SYNC, AJLA_ERROR_NOT_SUPPORTED), err, "directory monitoring not supported");
144 return false;
145 #endif
148 bool iomux_directory_handle_wait(notify_handle_t h, uint64_t seq, mutex_t **mutex_to_lock, struct list *list_entry)
150 struct iomux_wait *iow = h;
152 address_lock(iow, DEPTH_THUNK);
153 if (iow->seq != seq) {
154 address_unlock(iow, DEPTH_THUNK);
155 return true;
158 *mutex_to_lock = address_get_mutex(iow, DEPTH_THUNK);
159 list_add(&iow->wait_list[2], list_entry);
160 address_unlock(iow, DEPTH_THUNK);
162 return false;
165 void iomux_directory_handle_free(notify_handle_t h)
167 struct iomux_wait *iow = h;
168 os_close_handle(iow->self);
172 void iomux_check_all(uint32_t us)
174 struct kevent events[KQUEUE_MAX_EVENTS];
175 int n_ev, i;
176 struct timespec ts;
178 us = iomux_get_time(us);
180 if (us != IOMUX_INDEFINITE_WAIT) {
181 ts.tv_sec = us / 1000000;
182 ts.tv_nsec = us % 1000000 * 1000;
185 n_ev = kevent(kq_fd, NULL, 0, events, KQUEUE_MAX_EVENTS, us != IOMUX_INDEFINITE_WAIT ? &ts : NULL);
186 if (unlikely(n_ev == -1)) {
187 int er;
188 if (likely(errno == EINTR))
189 goto no_events;
190 er = errno;
191 fatal("kevent failed getting events: %d, %s", er, error_decode(error_from_errno(EC_SYSCALL, er)));
194 rwlock_lock_read(&iomux_rwlock);
195 for (i = 0; i < n_ev; i++) {
196 struct kevent *ev = &events[i];
197 handle_t handle = ev->ident;
198 struct iomux_wait *iow;
199 if (handle == os_notify_pipe[0]) {
200 #ifdef THREAD_NONE
201 os_drain_notify_pipe();
202 #endif
203 continue;
205 iow = iowait_directory[handle];
206 /*debug("handle: %d, iow: %p", handle, iow);*/
208 address_lock(iow, DEPTH_THUNK);
209 if (ev->filter == EVFILT_READ) {
210 call(wake_up_wait_list)(&iow->wait_list[0], address_get_mutex(iow, DEPTH_THUNK), true);
211 } else if (ev->filter == EVFILT_WRITE) {
212 call(wake_up_wait_list)(&iow->wait_list[1], address_get_mutex(iow, DEPTH_THUNK), true);
213 } else if (ev->filter == EVFILT_VNODE) {
214 iow->seq++;
215 call(wake_up_wait_list)(&iow->wait_list[2], address_get_mutex(iow, DEPTH_THUNK), true);
216 } else {
217 fatal("kevent returned unknown event %d", ev->filter);
220 rwlock_unlock_read(&iomux_rwlock);
222 no_events:;
226 void iomux_init(void)
228 struct kevent pipe_ev;
229 int r;
230 rwlock_init(&iomux_rwlock);
231 array_init(struct iomux_wait *, &iowait_directory, &iowait_directory_size);
233 EINTR_LOOP(kq_fd, kqueue());
234 if (unlikely(kq_fd == -1)) {
235 int er = errno;
236 fatal("kqueue failed: %d, %s", er, error_decode(error_from_errno(EC_SYSCALL, er)));
238 os_set_cloexec(kq_fd);
239 obj_registry_insert(OBJ_TYPE_HANDLE, kq_fd, file_line);
241 EV_SET(&pipe_ev, os_notify_pipe[0], EVFILT_READ, EV_ADD, 0, 0, 0);
242 EINTR_LOOP(r, kevent(kq_fd, &pipe_ev, 1, NULL, 0, NULL));
243 if (unlikely(r == -1)) {
244 int er = errno;
245 fatal("kevent failed adding notify pipe: %d, %s", er, error_decode(error_from_errno(EC_SYSCALL, er)));
247 #ifndef THREAD_NONE
248 thread_spawn(&iomux_thread, iomux_poll_thread, NULL, PRIORITY_IO, NULL);
249 #endif
252 void iomux_done(void)
254 struct kevent pipe_ev;
255 int r;
256 size_t h;
257 os_shutdown_notify_pipe();
258 #ifndef THREAD_NONE
259 thread_join(&iomux_thread);
260 #endif
261 EV_SET(&pipe_ev, os_notify_pipe[0], EVFILT_READ, EV_DELETE, 0, 0, 0);
262 EINTR_LOOP(r, kevent(kq_fd, &pipe_ev, 1, NULL, 0, NULL));
263 if (unlikely(r == -1)) {
264 int er = errno;
265 fatal("kevent failed removing notify pipe: %d, %s", er, error_decode(error_from_errno(EC_SYSCALL, er)));
267 os_close(kq_fd);
269 for (h = 0; h < iowait_directory_size; h++)
270 if (iowait_directory[h])
271 mem_free(iowait_directory[h]);
272 mem_free(iowait_directory);
273 rwlock_done(&iomux_rwlock);
276 #endif