etc/services - sync with NetBSD-8
[minix.git] / minix / lib / libblockdriver / driver_mt.c
blobf65a6544e6fd37e8b7a8a30992a7bb32075084e7
1 /* This file contains the multithreaded driver interface.
3 * Changes:
4 * Aug 27, 2011 created (A. Welzel)
6 * The entry points into this file are:
7 * blockdriver_mt_task: the main message loop of the driver
8 * blockdriver_mt_terminate: break out of the main message loop
9 * blockdriver_mt_sleep: put the current thread to sleep
10 * blockdriver_mt_wakeup: wake up a sleeping thread
11 * blockdriver_mt_set_workers: set the number of worker threads
14 #include <minix/blockdriver_mt.h>
15 #include <minix/mthread.h>
16 #include <assert.h>
18 #include "const.h"
19 #include "driver.h"
20 #include "driver_mt.h"
21 #include "mq.h"
/* Thread IDs pack a device ID together with the index of a worker thread
 * within that device. Every thread ID must lie in 0..(MAX_THREADS-1).
 *
 *   MAKE_TID(dev, wrk)   builds a thread ID from its two components
 *   TID_DEVICE(thread)   recovers the device ID from a thread ID
 *   TID_WORKER(thread)   recovers the per-device worker ID from a thread ID
 */
#define MAKE_TID(dev, wrk)	((dev) * MAX_WORKERS + (wrk))
#define TID_DEVICE(thread)	((thread) / MAX_WORKERS)
#define TID_WORKER(thread)	((thread) % MAX_WORKERS)
/* Index of a worker thread within its device. */
typedef unsigned int worker_id_t;

/* Life cycle of a worker thread slot. */
typedef enum {
  STATE_DEAD,		/* no thread exists for this slot (initial state, and
			 * the state after the master has joined the thread)
			 */
  STATE_RUNNING,	/* thread exists and is not handling a request */
  STATE_BUSY,		/* thread is handling a request */
  STATE_EXITED		/* thread has left its loop; master must still join */
} worker_state;
39 /* Structure with information about a worker thread. */
40 typedef struct {
41 device_id_t device_id;
42 worker_id_t worker_id;
43 worker_state state;
44 mthread_thread_t mthread;
45 mthread_event_t sleep_event;
46 } worker_t;
48 /* Structure with information about a device. */
49 typedef struct {
50 device_id_t id;
51 unsigned int workers;
52 worker_t worker[MAX_WORKERS];
53 mthread_event_t queue_event;
54 mthread_rwlock_t barrier;
55 } device_t;
57 static struct blockdriver *bdtab;
58 static int running = FALSE;
60 static mthread_key_t worker_key;
62 static device_t device[MAX_DEVICES];
64 static worker_t *exited[MAX_THREADS];
65 static int num_exited = 0;
67 /*===========================================================================*
68 * enqueue *
69 *===========================================================================*/
70 static void enqueue(device_t *dp, const message *m_src, int ipc_status)
72 /* Enqueue a message into the device's queue, and signal the event.
73 * Must be called from the master thread.
76 if (!mq_enqueue(dp->id, m_src, ipc_status))
77 panic("blockdriver_mt: enqueue failed (message queue full)");
79 mthread_event_fire(&dp->queue_event);
82 /*===========================================================================*
83 * try_dequeue *
84 *===========================================================================*/
85 static int try_dequeue(device_t *dp, message *m_dst, int *ipc_status)
87 /* See if a message can be dequeued from the current worker thread's device
88 * queue. If so, dequeue the message and return TRUE. If not, return FALSE.
89 * Must be called from a worker thread. Does not block.
92 return mq_dequeue(dp->id, m_dst, ipc_status);
95 /*===========================================================================*
96 * dequeue *
97 *===========================================================================*/
98 static int dequeue(device_t *dp, worker_t *wp, message *m_dst,
99 int *ipc_status)
101 /* Dequeue a message from the current worker thread's device queue. Block the
102 * current thread if necessary. Must be called from a worker thread. Either
103 * succeeds with a message (TRUE) or indicates that the thread should be
104 * terminated (FALSE).
107 do {
108 mthread_event_wait(&dp->queue_event);
110 /* If we were woken up as a result of terminate or set_workers, break
111 * out of the loop and terminate the thread.
113 if (!running || wp->worker_id >= dp->workers)
114 return FALSE;
115 } while (!try_dequeue(dp, m_dst, ipc_status));
117 return TRUE;
120 /*===========================================================================*
121 * is_transfer_req *
122 *===========================================================================*/
123 static int is_transfer_req(int type)
125 /* Return whether the given block device request is a transfer request.
128 switch (type) {
129 case BDEV_READ:
130 case BDEV_WRITE:
131 case BDEV_GATHER:
132 case BDEV_SCATTER:
133 return TRUE;
135 default:
136 return FALSE;
140 /*===========================================================================*
141 * worker_thread *
142 *===========================================================================*/
143 static void *worker_thread(void *param)
145 /* The worker thread loop. Set up the thread-specific reference to itself and
146 * start looping. The loop consists of blocking dequeing and handling messages.
147 * After handling a message, the thread might have been stopped, so we check
148 * for this condition and exit if so.
150 worker_t *wp;
151 device_t *dp;
152 thread_id_t tid;
153 message m;
154 int ipc_status;
156 wp = (worker_t *) param;
157 assert(wp != NULL);
158 dp = &device[wp->device_id];
159 tid = MAKE_TID(wp->device_id, wp->worker_id);
161 if (mthread_setspecific(worker_key, wp))
162 panic("blockdriver_mt: could not save local thread pointer");
164 while (running && wp->worker_id < dp->workers) {
166 /* See if a new message is available right away. */
167 if (!try_dequeue(dp, &m, &ipc_status)) {
169 /* If not, block waiting for a new message or a thread
170 * termination event.
172 if (!dequeue(dp, wp, &m, &ipc_status))
173 break;
176 /* Even if the thread was stopped before, a new message resumes it. */
177 wp->state = STATE_BUSY;
179 /* If the request is a transfer request, we acquire the read barrier
180 * lock. Otherwise, we acquire the write lock.
182 if (is_transfer_req(m.m_type))
183 mthread_rwlock_rdlock(&dp->barrier);
184 else
185 mthread_rwlock_wrlock(&dp->barrier);
187 /* Handle the request and send a reply. */
188 blockdriver_process_on_thread(bdtab, &m, ipc_status, tid);
190 /* Switch the thread back to running state, and unlock the barrier. */
191 wp->state = STATE_RUNNING;
192 mthread_rwlock_unlock(&dp->barrier);
195 /* Clean up and terminate this thread. */
196 if (mthread_setspecific(worker_key, NULL))
197 panic("blockdriver_mt: could not delete local thread pointer");
199 wp->state = STATE_EXITED;
201 exited[num_exited++] = wp;
203 return NULL;
206 /*===========================================================================*
207 * master_create_worker *
208 *===========================================================================*/
209 static void master_create_worker(worker_t *wp, worker_id_t worker_id,
210 device_id_t device_id)
212 /* Start a new worker thread.
214 mthread_attr_t attr;
215 int r;
217 wp->device_id = device_id;
218 wp->worker_id = worker_id;
219 wp->state = STATE_RUNNING;
221 /* Initialize synchronization primitives. */
222 mthread_event_init(&wp->sleep_event);
224 r = mthread_attr_init(&attr);
225 if (r != 0)
226 panic("blockdriver_mt: could not initialize attributes (%d)", r);
228 r = mthread_attr_setstacksize(&attr, STACK_SIZE);
229 if (r != 0)
230 panic("blockdriver_mt: could not set stack size (%d)", r);
232 r = mthread_create(&wp->mthread, &attr, worker_thread, (void *) wp);
233 if (r != 0)
234 panic("blockdriver_mt: could not start thread %d (%d)", worker_id, r);
236 mthread_attr_destroy(&attr);
239 /*===========================================================================*
240 * master_destroy_worker *
241 *===========================================================================*/
242 static void master_destroy_worker(worker_t *wp)
244 /* Clean up resources used by an exited worker thread.
247 assert(wp != NULL);
248 assert(wp->state == STATE_EXITED);
250 /* Join the thread. */
251 if (mthread_join(wp->mthread, NULL))
252 panic("blockdriver_mt: could not join thread %d", wp->worker_id);
254 /* Destroy resources. */
255 mthread_event_destroy(&wp->sleep_event);
257 wp->state = STATE_DEAD;
260 /*===========================================================================*
261 * master_handle_exits *
262 *===========================================================================*/
263 static void master_handle_exits(void)
265 /* Destroy the remains of all exited threads.
267 int i;
269 for (i = 0; i < num_exited; i++)
270 master_destroy_worker(exited[i]);
272 num_exited = 0;
275 /*===========================================================================*
276 * master_yield *
277 *===========================================================================*/
278 static void master_yield(void)
280 /* Let worker threads run, and clean up any exited threads.
283 mthread_yield_all();
285 if (num_exited > 0)
286 master_handle_exits();
289 /*===========================================================================*
290 * master_handle_message *
291 *===========================================================================*/
292 static void master_handle_message(message *m_ptr, int ipc_status)
294 /* For real request messages, query the device ID, start a thread if none is
295 * free and the maximum number of threads for that device has not yet been
296 * reached, and enqueue the message in the devices's message queue. All other
297 * messages are handled immediately from the main thread.
299 device_id_t id;
300 worker_t *wp;
301 device_t *dp;
302 unsigned int wid;
303 int r;
305 /* If this is not a block driver request, we cannot get the minor device
306 * associated with it, and thus we can not tell which thread should process
307 * it either. In that case, the master thread has to handle it instead.
309 if (is_ipc_notify(ipc_status) || !IS_BDEV_RQ(m_ptr->m_type)) {
310 /* Process as 'other' message. */
311 blockdriver_process_on_thread(bdtab, m_ptr, ipc_status, MAIN_THREAD);
313 return;
316 /* Query the device ID. Upon failure, send the error code to the caller. */
317 r = (*bdtab->bdr_device)(m_ptr->m_lbdev_lblockdriver_msg.minor, &id);
318 if (r != OK) {
319 blockdriver_reply(m_ptr, ipc_status, r);
321 return;
324 /* Look up the device control block. */
325 assert(id >= 0 && id < MAX_DEVICES);
326 dp = &device[id];
328 /* Find the first non-busy worker thread. */
329 for (wid = 0; wid < dp->workers; wid++)
330 if (dp->worker[wid].state != STATE_BUSY)
331 break;
333 /* If the worker thread is dead, start a thread now, unless we have already
334 * reached the maximum number of threads.
336 if (wid < dp->workers) {
337 wp = &dp->worker[wid];
339 assert(wp->state != STATE_EXITED);
341 /* If the non-busy thread has not yet been created, create one now. */
342 if (wp->state == STATE_DEAD)
343 master_create_worker(wp, wid, dp->id);
346 /* Enqueue the message at the device queue. */
347 enqueue(dp, m_ptr, ipc_status);
350 /*===========================================================================*
351 * master_init *
352 *===========================================================================*/
353 static void master_init(struct blockdriver *bdp)
355 /* Initialize the state of the master thread.
357 int i, j;
359 assert(bdp != NULL);
360 assert(bdp->bdr_device != NULL);
362 bdtab = bdp;
364 /* Initialize device-specific data structures. */
365 for (i = 0; i < MAX_DEVICES; i++) {
366 device[i].id = i;
367 device[i].workers = 1;
368 mthread_event_init(&device[i].queue_event);
369 mthread_rwlock_init(&device[i].barrier);
371 for (j = 0; j < MAX_WORKERS; j++)
372 device[i].worker[j].state = STATE_DEAD;
375 /* Initialize a per-thread key, where each worker thread stores its own
376 * reference to the worker structure.
378 if (mthread_key_create(&worker_key, NULL))
379 panic("blockdriver_mt: error initializing worker key");
382 /*===========================================================================*
383 * blockdriver_mt_get_tid *
384 *===========================================================================*/
385 thread_id_t blockdriver_mt_get_tid(void)
387 /* Return back the ID of this thread.
389 worker_t *wp;
391 wp = (worker_t *) mthread_getspecific(worker_key);
393 if (wp == NULL)
394 panic("blockdriver_mt: master thread cannot query thread ID\n");
396 return MAKE_TID(wp->device_id, wp->worker_id);
399 /*===========================================================================*
400 * blockdriver_mt_receive *
401 *===========================================================================*/
402 static void blockdriver_mt_receive(message *m_ptr, int *ipc_status)
404 /* Receive a message.
406 int r;
408 r = sef_receive_status(ANY, m_ptr, ipc_status);
410 if (r != OK)
411 panic("blockdriver_mt: sef_receive_status() returned %d", r);
414 /*===========================================================================*
415 * blockdriver_mt_task *
416 *===========================================================================*/
417 void blockdriver_mt_task(struct blockdriver *driver_tab)
419 /* The multithreaded driver task.
421 int ipc_status, i;
422 message mess;
424 /* Initialize first if necessary. */
425 if (!running) {
426 master_init(driver_tab);
428 running = TRUE;
431 /* The main message loop. */
432 while (running) {
433 /* Receive a message. */
434 blockdriver_mt_receive(&mess, &ipc_status);
436 /* Dispatch the message. */
437 master_handle_message(&mess, ipc_status);
439 /* Let worker threads run. */
440 master_yield();
443 /* Free up resources. */
444 for (i = 0; i < MAX_DEVICES; i++)
445 mthread_event_destroy(&device[i].queue_event);
448 /*===========================================================================*
449 * blockdriver_mt_terminate *
450 *===========================================================================*/
451 void blockdriver_mt_terminate(void)
453 /* Instruct libblockdriver to shut down.
456 running = FALSE;
459 /*===========================================================================*
460 * blockdriver_mt_sleep *
461 *===========================================================================*/
462 void blockdriver_mt_sleep(void)
464 /* Let the current thread sleep until it gets woken up by the master thread.
466 worker_t *wp;
468 wp = (worker_t *) mthread_getspecific(worker_key);
470 if (wp == NULL)
471 panic("blockdriver_mt: master thread cannot sleep");
473 mthread_event_wait(&wp->sleep_event);
476 /*===========================================================================*
477 * blockdriver_mt_wakeup *
478 *===========================================================================*/
479 void blockdriver_mt_wakeup(thread_id_t id)
481 /* Wake up a sleeping worker thread from the master thread.
483 worker_t *wp;
484 device_id_t device_id;
485 worker_id_t worker_id;
487 device_id = TID_DEVICE(id);
488 worker_id = TID_WORKER(id);
490 assert(device_id >= 0 && device_id < MAX_DEVICES);
491 assert(worker_id < MAX_WORKERS);
493 wp = &device[device_id].worker[worker_id];
495 assert(wp->state == STATE_RUNNING || wp->state == STATE_BUSY);
497 mthread_event_fire(&wp->sleep_event);
500 /*===========================================================================*
501 * blockdriver_mt_set_workers *
502 *===========================================================================*/
503 void blockdriver_mt_set_workers(device_id_t id, unsigned int workers)
505 /* Set the number of worker threads for the given device.
507 device_t *dp;
509 assert(id >= 0 && id < MAX_DEVICES);
511 if (workers > MAX_WORKERS)
512 workers = MAX_WORKERS;
514 dp = &device[id];
516 /* If we are cleaning up, wake up all threads waiting on a queue event. */
517 if (workers == 1 && dp->workers > workers)
518 mthread_event_fire_all(&dp->queue_event);
520 dp->workers = workers;
523 /*===========================================================================*
524 * blockdriver_mt_is_idle *
525 *===========================================================================*/
526 int blockdriver_mt_is_idle(void)
528 /* Return whether the block driver is idle. This means that it has no enqueued
529 * requests and no busy worker threads. Used for live update functionality.
531 unsigned int did, wid;
533 for (did = 0; did < MAX_DEVICES; did++) {
534 if (!mq_isempty(did))
535 return FALSE;
537 for (wid = 0; wid < device[did].workers; wid++)
538 if (device[did].worker[wid].state == STATE_BUSY)
539 return FALSE;
542 return TRUE;
545 /*===========================================================================*
546 * blockdriver_mt_suspend *
547 *===========================================================================*/
548 void blockdriver_mt_suspend(void)
550 /* Suspend the driver operation in order to facilitate a live update.
551 * Suspension involves shutting down all worker threads, because transferring
552 * thread stacks is currently not supported by the state transfer framework.
554 unsigned int did;
556 assert(running);
557 assert(blockdriver_mt_is_idle());
559 /* We terminate the worker threads by simulating a driver shutdown. */
560 running = FALSE;
562 for (did = 0; did < MAX_DEVICES; did++)
563 mthread_event_fire_all(&device[did].queue_event);
565 master_yield();
568 /*===========================================================================*
569 * blockdriver_mt_resume *
570 *===========================================================================*/
571 void blockdriver_mt_resume(void)
573 /* Resume regular operation after a (successful or failed) live update. We do
574 * not recreate worker threads; instead, they are recreated lazily as new
575 * requests come in.
578 assert(!running);
580 running = TRUE;