tools/llvm: Do not build with symbols
[minix3.git] / minix / lib / libblockdriver / driver_mt.c
blob66d69485426fbaf4f3c55a5586f8c4d016ec526b
/* This file contains the multithreaded driver interface.
 *
 * Changes:
 *   Aug 27, 2011   created (A. Welzel)
 *
 * The entry points into this file are:
 *   blockdriver_mt_task:	 the main message loop of the driver
 *   blockdriver_mt_terminate:	 break out of the main message loop
 *   blockdriver_mt_sleep:	 put the current thread to sleep
 *   blockdriver_mt_wakeup:	 wake up a sleeping thread
 *   blockdriver_mt_set_workers: set the number of worker threads
 *   blockdriver_mt_get_tid:	 return the thread ID of the calling worker
 */
14 #include <minix/blockdriver_mt.h>
15 #include <minix/mthread.h>
16 #include <assert.h>
18 #include "const.h"
19 #include "driver.h"
20 #include "mq.h"
/* A thread ID is composed of a device ID and a per-device worker thread ID.
 * All thread IDs must be in the range 0..(MAX_THREADS-1) inclusive.
 * MAKE_TID packs the (device, worker) pair into a single number; TID_DEVICE
 * and TID_WORKER recover the two components from such a number.
 */
#define MAKE_TID(did, wid)	(((did) * MAX_WORKERS) + (wid))
#define TID_DEVICE(tid)		((tid) / MAX_WORKERS)
#define TID_WORKER(tid)		((tid) % MAX_WORKERS)

/* Index of a worker thread within the worker table of its device. */
typedef int worker_id_t;
/* Life cycle states of a worker thread slot. */
typedef enum {
  STATE_DEAD = 0,	/* no thread exists for this slot */
  STATE_RUNNING = 1,	/* thread exists and is not handling a request */
  STATE_BUSY = 2,	/* thread is handling a request */
  STATE_EXITED = 3	/* thread has finished; the master must still join it */
} worker_state;
38 /* Structure with information about a worker thread. */
39 typedef struct {
40 device_id_t device_id;
41 worker_id_t worker_id;
42 worker_state state;
43 mthread_thread_t mthread;
44 mthread_event_t sleep_event;
45 } worker_t;
47 /* Structure with information about a device. */
48 typedef struct {
49 device_id_t id;
50 unsigned int workers;
51 worker_t worker[MAX_WORKERS];
52 mthread_event_t queue_event;
53 mthread_rwlock_t barrier;
54 } device_t;
56 static struct blockdriver *bdtab;
57 static int running = FALSE;
59 static mthread_key_t worker_key;
61 static device_t device[MAX_DEVICES];
63 static worker_t *exited[MAX_THREADS];
64 static int num_exited = 0;
66 /*===========================================================================*
67 * enqueue *
68 *===========================================================================*/
69 static void enqueue(device_t *dp, const message *m_src, int ipc_status)
71 /* Enqueue a message into the device's queue, and signal the event.
72 * Must be called from the master thread.
75 if (!mq_enqueue(dp->id, m_src, ipc_status))
76 panic("blockdriver_mt: enqueue failed (message queue full)");
78 mthread_event_fire(&dp->queue_event);
81 /*===========================================================================*
82 * try_dequeue *
83 *===========================================================================*/
84 static int try_dequeue(device_t *dp, message *m_dst, int *ipc_status)
86 /* See if a message can be dequeued from the current worker thread's device
87 * queue. If so, dequeue the message and return TRUE. If not, return FALSE.
88 * Must be called from a worker thread. Does not block.
91 return mq_dequeue(dp->id, m_dst, ipc_status);
94 /*===========================================================================*
95 * dequeue *
96 *===========================================================================*/
97 static int dequeue(device_t *dp, worker_t *wp, message *m_dst,
98 int *ipc_status)
100 /* Dequeue a message from the current worker thread's device queue. Block the
101 * current thread if necessary. Must be called from a worker thread. Either
102 * succeeds with a message (TRUE) or indicates that the thread should be
103 * terminated (FALSE).
106 do {
107 mthread_event_wait(&dp->queue_event);
109 /* If we were woken up as a result of terminate or set_workers, break
110 * out of the loop and terminate the thread.
112 if (!running || wp->worker_id >= dp->workers)
113 return FALSE;
114 } while (!try_dequeue(dp, m_dst, ipc_status));
116 return TRUE;
119 /*===========================================================================*
120 * is_transfer_req *
121 *===========================================================================*/
122 static int is_transfer_req(int type)
124 /* Return whether the given block device request is a transfer request.
127 switch (type) {
128 case BDEV_READ:
129 case BDEV_WRITE:
130 case BDEV_GATHER:
131 case BDEV_SCATTER:
132 return TRUE;
134 default:
135 return FALSE;
139 /*===========================================================================*
140 * worker_thread *
141 *===========================================================================*/
142 static void *worker_thread(void *param)
144 /* The worker thread loop. Set up the thread-specific reference to itself and
145 * start looping. The loop consists of blocking dequeing and handling messages.
146 * After handling a message, the thread might have been stopped, so we check
147 * for this condition and exit if so.
149 worker_t *wp;
150 device_t *dp;
151 thread_id_t tid;
152 message m;
153 int ipc_status, r;
155 wp = (worker_t *) param;
156 assert(wp != NULL);
157 dp = &device[wp->device_id];
158 tid = MAKE_TID(wp->device_id, wp->worker_id);
160 if (mthread_setspecific(worker_key, wp))
161 panic("blockdriver_mt: could not save local thread pointer");
163 while (running && wp->worker_id < dp->workers) {
165 /* See if a new message is available right away. */
166 if (!try_dequeue(dp, &m, &ipc_status)) {
168 /* If not, block waiting for a new message or a thread
169 * termination event.
171 if (!dequeue(dp, wp, &m, &ipc_status))
172 break;
175 /* Even if the thread was stopped before, a new message resumes it. */
176 wp->state = STATE_BUSY;
178 /* If the request is a transfer request, we acquire the read barrier
179 * lock. Otherwise, we acquire the write lock.
181 if (is_transfer_req(m.m_type))
182 mthread_rwlock_rdlock(&dp->barrier);
183 else
184 mthread_rwlock_wrlock(&dp->barrier);
186 /* Handle the request and send a reply. */
187 blockdriver_process_on_thread(bdtab, &m, ipc_status, tid);
189 /* Switch the thread back to running state, and unlock the barrier. */
190 wp->state = STATE_RUNNING;
191 mthread_rwlock_unlock(&dp->barrier);
194 /* Clean up and terminate this thread. */
195 if (mthread_setspecific(worker_key, NULL))
196 panic("blockdriver_mt: could not delete local thread pointer");
198 wp->state = STATE_EXITED;
200 exited[num_exited++] = wp;
202 return NULL;
205 /*===========================================================================*
206 * master_create_worker *
207 *===========================================================================*/
208 static void master_create_worker(worker_t *wp, worker_id_t worker_id,
209 device_id_t device_id)
211 /* Start a new worker thread.
213 mthread_attr_t attr;
214 int r;
216 wp->device_id = device_id;
217 wp->worker_id = worker_id;
218 wp->state = STATE_RUNNING;
220 /* Initialize synchronization primitives. */
221 mthread_event_init(&wp->sleep_event);
223 r = mthread_attr_init(&attr);
224 if (r != 0)
225 panic("blockdriver_mt: could not initialize attributes (%d)", r);
227 r = mthread_attr_setstacksize(&attr, STACK_SIZE);
228 if (r != 0)
229 panic("blockdriver_mt: could not set stack size (%d)", r);
231 r = mthread_create(&wp->mthread, &attr, worker_thread, (void *) wp);
232 if (r != 0)
233 panic("blockdriver_mt: could not start thread %d (%d)", worker_id, r);
235 mthread_attr_destroy(&attr);
238 /*===========================================================================*
239 * master_destroy_worker *
240 *===========================================================================*/
241 static void master_destroy_worker(worker_t *wp)
243 /* Clean up resources used by an exited worker thread.
246 assert(wp != NULL);
247 assert(wp->state == STATE_EXITED);
249 /* Join the thread. */
250 if (mthread_join(wp->mthread, NULL))
251 panic("blockdriver_mt: could not join thread %d", wp->worker_id);
253 /* Destroy resources. */
254 mthread_event_destroy(&wp->sleep_event);
256 wp->state = STATE_DEAD;
259 /*===========================================================================*
260 * master_handle_exits *
261 *===========================================================================*/
262 static void master_handle_exits(void)
264 /* Destroy the remains of all exited threads.
266 int i;
268 for (i = 0; i < num_exited; i++)
269 master_destroy_worker(exited[i]);
271 num_exited = 0;
274 /*===========================================================================*
275 * master_handle_message *
276 *===========================================================================*/
277 static void master_handle_message(message *m_ptr, int ipc_status)
279 /* For real request messages, query the device ID, start a thread if none is
280 * free and the maximum number of threads for that device has not yet been
281 * reached, and enqueue the message in the devices's message queue. All other
282 * messages are handled immediately from the main thread.
284 device_id_t id;
285 worker_t *wp;
286 device_t *dp;
287 int r, wid;
289 /* If this is not a block driver request, we cannot get the minor device
290 * associated with it, and thus we can not tell which thread should process
291 * it either. In that case, the master thread has to handle it instead.
293 if (is_ipc_notify(ipc_status) || !IS_BDEV_RQ(m_ptr->m_type)) {
294 /* Process as 'other' message. */
295 blockdriver_process_on_thread(bdtab, m_ptr, ipc_status, MAIN_THREAD);
297 return;
300 /* Query the device ID. Upon failure, send the error code to the caller. */
301 r = (*bdtab->bdr_device)(m_ptr->m_lbdev_lblockdriver_msg.minor, &id);
302 if (r != OK) {
303 blockdriver_reply(m_ptr, ipc_status, r);
305 return;
308 /* Look up the device control block. */
309 assert(id >= 0 && id < MAX_DEVICES);
310 dp = &device[id];
312 /* Find the first non-busy worker thread. */
313 for (wid = 0; wid < dp->workers; wid++)
314 if (dp->worker[wid].state != STATE_BUSY)
315 break;
317 /* If the worker thread is dead, start a thread now, unless we have already
318 * reached the maximum number of threads.
320 if (wid < dp->workers) {
321 wp = &dp->worker[wid];
323 assert(wp->state != STATE_EXITED);
325 /* If the non-busy thread has not yet been created, create one now. */
326 if (wp->state == STATE_DEAD)
327 master_create_worker(wp, wid, dp->id);
330 /* Enqueue the message at the device queue. */
331 enqueue(dp, m_ptr, ipc_status);
334 /*===========================================================================*
335 * master_init *
336 *===========================================================================*/
337 static void master_init(struct blockdriver *bdp)
339 /* Initialize the state of the master thread.
341 int i, j;
343 assert(bdp != NULL);
344 assert(bdp->bdr_device != NULL);
346 bdtab = bdp;
348 /* Initialize device-specific data structures. */
349 for (i = 0; i < MAX_DEVICES; i++) {
350 device[i].id = i;
351 device[i].workers = 1;
352 mthread_event_init(&device[i].queue_event);
353 mthread_rwlock_init(&device[i].barrier);
355 for (j = 0; j < MAX_WORKERS; j++)
356 device[i].worker[j].state = STATE_DEAD;
359 /* Initialize a per-thread key, where each worker thread stores its own
360 * reference to the worker structure.
362 if (mthread_key_create(&worker_key, NULL))
363 panic("blockdriver_mt: error initializing worker key");
366 /*===========================================================================*
367 * blockdriver_mt_get_tid *
368 *===========================================================================*/
369 thread_id_t blockdriver_mt_get_tid(void)
371 /* Return back the ID of this thread.
373 worker_t *wp;
375 wp = (worker_t *) mthread_getspecific(worker_key);
377 if (wp == NULL)
378 panic("blockdriver_mt: master thread cannot query thread ID\n");
380 return MAKE_TID(wp->device_id, wp->worker_id);
383 /*===========================================================================*
384 * blockdriver_mt_receive *
385 *===========================================================================*/
386 static void blockdriver_mt_receive(message *m_ptr, int *ipc_status)
388 /* Receive a message.
390 int r;
392 r = sef_receive_status(ANY, m_ptr, ipc_status);
394 if (r != OK)
395 panic("blockdriver_mt: sef_receive_status() returned %d", r);
398 /*===========================================================================*
399 * blockdriver_mt_task *
400 *===========================================================================*/
401 void blockdriver_mt_task(struct blockdriver *driver_tab)
403 /* The multithreaded driver task.
405 int ipc_status, i;
406 message mess;
408 /* Initialize first if necessary. */
409 if (!running) {
410 master_init(driver_tab);
412 running = TRUE;
415 /* The main message loop. */
416 while (running) {
417 /* Receive a message. */
418 blockdriver_mt_receive(&mess, &ipc_status);
420 /* Dispatch the message. */
421 master_handle_message(&mess, ipc_status);
423 /* Let other threads run. */
424 mthread_yield_all();
426 /* Clean up any exited threads. */
427 if (num_exited > 0)
428 master_handle_exits();
431 /* Free up resources. */
432 for (i = 0; i < MAX_DEVICES; i++)
433 mthread_event_destroy(&device[i].queue_event);
436 /*===========================================================================*
437 * blockdriver_mt_terminate *
438 *===========================================================================*/
439 void blockdriver_mt_terminate(void)
441 /* Instruct libblockdriver to shut down.
444 running = FALSE;
447 /*===========================================================================*
448 * blockdriver_mt_sleep *
449 *===========================================================================*/
450 void blockdriver_mt_sleep(void)
452 /* Let the current thread sleep until it gets woken up by the master thread.
454 worker_t *wp;
456 wp = (worker_t *) mthread_getspecific(worker_key);
458 if (wp == NULL)
459 panic("blockdriver_mt: master thread cannot sleep");
461 mthread_event_wait(&wp->sleep_event);
464 /*===========================================================================*
465 * blockdriver_mt_wakeup *
466 *===========================================================================*/
467 void blockdriver_mt_wakeup(thread_id_t id)
469 /* Wake up a sleeping worker thread from the master thread.
471 worker_t *wp;
472 device_id_t device_id;
473 worker_id_t worker_id;
475 device_id = TID_DEVICE(id);
476 worker_id = TID_WORKER(id);
478 assert(device_id >= 0 && device_id < MAX_DEVICES);
479 assert(worker_id >= 0 && worker_id < MAX_WORKERS);
481 wp = &device[device_id].worker[worker_id];
483 assert(wp->state == STATE_RUNNING || wp->state == STATE_BUSY);
485 mthread_event_fire(&wp->sleep_event);
488 /*===========================================================================*
489 * blockdriver_mt_set_workers *
490 *===========================================================================*/
491 void blockdriver_mt_set_workers(device_id_t id, int workers)
493 /* Set the number of worker threads for the given device.
495 device_t *dp;
497 assert(id >= 0 && id < MAX_DEVICES);
499 if (workers > MAX_WORKERS)
500 workers = MAX_WORKERS;
502 dp = &device[id];
504 /* If we are cleaning up, wake up all threads waiting on a queue event. */
505 if (workers == 1 && dp->workers > workers)
506 mthread_event_fire_all(&dp->queue_event);
508 dp->workers = workers;