/*
 * Copyright (C) 2012-2020 all contributors <cmogstored-public@yhbt.net>
 * License: GPL-3.0+ <https://www.gnu.org/licenses/gpl-3.0.txt>
 */
#include "cmogstored.h"
#ifdef HAVE_KQUEUE
/*
 * a poll/select/libev/libevent-based implementation would have a hard time
 * migrating clients between threads
 */
/*
 * Creates a new mog_queue backed by a fresh kqueue descriptor.
 * Terminates the process via die_errno() if the kernel cannot
 * create the kqueue (e.g. descriptor table exhaustion).
 */
struct mog_queue * mog_queue_new(void)
{
	int kqueue_fd = kqueue();

	if (kqueue_fd < 0)
		die_errno("kqueue() failed");

	return mog_queue_init(kqueue_fd);
}
/*
 * grabs one active event off the event queue
 *
 * timeout is in milliseconds:
 *   < 0  -- block indefinitely (NULL timespec)
 *   == 0 -- poll without blocking (and without becoming cancellable)
 *   > 0  -- block for up to `timeout' ms
 *
 * Returns the mog_fd attached (via udata in qpush) to the ready event,
 * or NULL on timeout or signal interruption.  Any other kevent()
 * failure is fatal.
 */
struct mog_fd * mog_idleq_wait(struct mog_queue *q, int timeout)
{
	int rc;
	struct mog_fd *mfd;
	struct kevent event;
	struct timespec ts;
	struct timespec *tsp;
	/* a zero timeout never sleeps, so it never needs waking up */
	bool cancellable = timeout != 0;

	if (timeout < 0) {
		tsp = NULL; /* wait forever */
	} else {
		ts.tv_sec = timeout / 1000;
		ts.tv_nsec = (timeout % 1000) * 1000000;
		tsp = &ts;
	}

	/*
	 * we enable SIGURG from pthread_kill() in thrpool.c when sleeping
	 * in kevent(). This allows us to wake up and respond to a
	 * cancellation request (since kevent() is not a cancellation point).
	 */
	if (cancellable)
		mog_cancel_enable(); /* NOTE(review): reconstructed call, verify name */

	rc = kevent(q->queue_fd, NULL, 0, &event, 1, tsp);

	if (rc > 0) {
		mfd = event.udata; /* set by EV_SET() in qpush() */
		mog_fd_check_out(mfd);

		/* ignore pending cancel until the next round */
		if (cancellable)
			mog_cancel_disable();
		return mfd;
	}
	if (cancellable)
		mog_cancel_disable();
	if (rc == 0) /* timed out */
		return NULL;
	if (errno == EINTR) /* woken up by pthread_kill() */
		return NULL;

	die_errno("kevent(wait) failed with (%d)", rc);

	return NULL; /* not reached */
}
/*
 * Like mog_idleq_wait(), but allows signal delivery (interrupts enabled)
 * for the duration of the wait so a pending signal can break us out.
 */
struct mog_fd * mog_idleq_wait_intr(struct mog_queue *q, int timeout)
{
	struct mog_fd *mfd;

	/* this is racy, using a self-pipe covers the race */
	mog_intr_enable(); /* NOTE(review): reconstructed call, verify name */
	mfd = mog_idleq_wait(q, timeout);
	mog_intr_disable();

	return mfd;
}
81 MOG_NOINLINE
static void
82 kevent_add_error(struct mog_queue
*q
, struct mog_fd
*mfd
)
87 "kevent(EV_ADD) out-of-space, dropping file descriptor");
91 syslog(LOG_ERR
, "unhandled kevent(EV_ADD) error: %m");
92 assert(0 && "BUG in our usage of kevent");
/*
 * Registers one change with the kqueue, retrying on EINTR.
 * Returns 0 on success, -1 (with errno set) on any other failure.
 */
static int add_event(int kqueue_fd, struct kevent *event)
{
	int rc;

	do {
		rc = kevent(kqueue_fd, event, 1, NULL, 0, NULL);
	} while (rc < 0 && errno == EINTR);

	return rc;
}
/*
 * Arms a one-shot watch for mfd on the kqueue.  EV_ONESHOT means the
 * kernel deletes the filter after the first event fires, matching the
 * one-thread-per-ready-fd dispatch model; mfd rides along in udata so
 * mog_idleq_wait() can recover it.
 */
static void qpush(struct mog_queue *q, struct mog_fd *mfd, enum mog_qev ev)
{
	struct kevent event;
	u_short flags = EV_ADD | EV_ONESHOT;

	EV_SET(&event, mfd->fd, (short)ev, flags, 0, 0, mfd);

	mog_fd_check_in(mfd);
	if (add_event(q->queue_fd, &event) != 0) {
		/* undo the check-in before reporting/handling the failure */
		mog_fd_check_out(mfd);
		kevent_add_error(q, mfd);
	}
}
122 * Pushes in one mog_fd for kqueue to watch.
124 * Only call this with MOG_QEV_RW *or* if EAGAIN/EWOULDBLOCK is
125 * encountered in mog_queue_loop.
127 void mog_idleq_push(struct mog_queue
*q
, struct mog_fd
*mfd
, enum mog_qev ev
)
129 if (ev
== MOG_QEV_RW
) {
130 switch (mfd
->fd_type
) {
131 case MOG_FD_TYPE_IOSTAT
:
132 case MOG_FD_TYPE_SELFWAKE
:
135 case MOG_FD_TYPE_UNUSED
:
136 case MOG_FD_TYPE_ACCEPT
:
137 case MOG_FD_TYPE_FILE
:
138 case MOG_FD_TYPE_QUEUE
:
139 case MOG_FD_TYPE_SVC
:
140 assert(0 && "invalid fd_type for mog_idleq_push");
149 void mog_idleq_add(struct mog_queue
*q
, struct mog_fd
*mfd
, enum mog_qev ev
)
151 mog_idleq_push(q
, mfd
, ev
);
155 mog_queue_xchg(struct mog_queue
*q
, struct mog_fd
*mfd
, enum mog_qev ev
)
158 * kqueue() _should_ be able to implement this function with
159 * one syscall, however, we currently rely on mog_idleq_wait()
160 * being a cancellation point. So we must ensure the mfd is
161 * back in the queue (for other threads to access) before
162 * cancelling this thread...
164 mog_idleq_push(q
, mfd
, ev
);
166 return mog_idleq_wait(q
, -1);
168 #else /* ! HAVE_KQUEUE */
169 typedef int avoid_empty_file
;
170 #endif /* ! HAVE_KQUEUE */