worldstone: add -s for statistical profiling
[minix.git] / lib / libblockdriver / driver_mt.c
blob31be86fec9d5d6e7ae0a476e10141439bd0ff07e
1 /* This file contains the multithreaded driver interface.
3 * Changes:
4 * Aug 27, 2011 created (A. Welzel)
6 * The entry points into this file are:
7 * blockdriver_mt_task: the main message loop of the driver
8 * blockdriver_mt_terminate: break out of the main message loop
9 * blockdriver_mt_sleep: put the current thread to sleep
10 * blockdriver_mt_wakeup: wake up a sleeping thread
11 * blockdriver_mt_set_workers:set the number of worker threads
14 #include <minix/blockdriver_mt.h>
15 #include <minix/mthread.h>
16 #include <assert.h>
18 #include "const.h"
19 #include "driver.h"
20 #include "mq.h"
22 /* A thread ID is composed of a device ID and a per-device worker thread ID.
23 * All thread IDs must be in the range 0..(MAX_THREADS-1) inclusive.
25 #define MAKE_TID(did, wid) ((did) * MAX_WORKERS + (wid))
26 #define TID_DEVICE(tid) ((tid) / MAX_WORKERS)
27 #define TID_WORKER(tid) ((tid) % MAX_WORKERS)
29 typedef int worker_id_t;
31 typedef enum {
32 STATE_DEAD,
33 STATE_RUNNING,
34 STATE_BUSY,
35 STATE_EXITED
36 } worker_state;
38 /* Structure with information about a worker thread. */
39 typedef struct {
40 device_id_t device_id;
41 worker_id_t worker_id;
42 worker_state state;
43 mthread_thread_t mthread;
44 mthread_event_t sleep_event;
45 } worker_t;
47 /* Structure with information about a device. */
48 typedef struct {
49 device_id_t id;
50 unsigned int workers;
51 worker_t worker[MAX_WORKERS];
52 mthread_event_t queue_event;
53 mthread_rwlock_t barrier;
54 } device_t;
56 static struct blockdriver *bdtab;
57 static int running = FALSE;
59 static mthread_key_t worker_key;
61 static device_t device[MAX_DEVICES];
63 static worker_t *exited[MAX_THREADS];
64 static int num_exited = 0;
66 /*===========================================================================*
67 * enqueue *
68 *===========================================================================*/
69 static void enqueue(device_t *dp, const message *m_src, int ipc_status)
71 /* Enqueue a message into the device's queue, and signal the event.
72 * Must be called from the master thread.
75 if (!mq_enqueue(dp->id, m_src, ipc_status))
76 panic("blockdriver_mt: enqueue failed (message queue full)");
78 mthread_event_fire(&dp->queue_event);
81 /*===========================================================================*
82 * try_dequeue *
83 *===========================================================================*/
84 static int try_dequeue(device_t *dp, message *m_dst, int *ipc_status)
86 /* See if a message can be dequeued from the current worker thread's device
87 * queue. If so, dequeue the message and return TRUE. If not, return FALSE.
88 * Must be called from a worker thread. Does not block.
91 return mq_dequeue(dp->id, m_dst, ipc_status);
94 /*===========================================================================*
95 * dequeue *
96 *===========================================================================*/
97 static int dequeue(device_t *dp, worker_t *wp, message *m_dst,
98 int *ipc_status)
100 /* Dequeue a message from the current worker thread's device queue. Block the
101 * current thread if necessary. Must be called from a worker thread. Either
102 * succeeds with a message (TRUE) or indicates that the thread should be
103 * terminated (FALSE).
106 do {
107 mthread_event_wait(&dp->queue_event);
109 /* If we were woken up as a result of terminate or set_workers, break
110 * out of the loop and terminate the thread.
112 if (!running || wp->worker_id >= dp->workers)
113 return FALSE;
114 } while (!try_dequeue(dp, m_dst, ipc_status));
116 return TRUE;
119 /*===========================================================================*
120 * is_transfer_req *
121 *===========================================================================*/
122 static int is_transfer_req(int type)
124 /* Return whether the given block device request is a transfer request.
127 switch (type) {
128 case BDEV_READ:
129 case BDEV_WRITE:
130 case BDEV_GATHER:
131 case BDEV_SCATTER:
132 return TRUE;
134 default:
135 return FALSE;
139 /*===========================================================================*
140 * worker_thread *
141 *===========================================================================*/
142 static void *worker_thread(void *param)
144 /* The worker thread loop. Set up the thread-specific reference to itself and
145 * start looping. The loop consists of blocking dequeing and handling messages.
146 * After handling a message, the thread might have been stopped, so we check
147 * for this condition and exit if so.
149 worker_t *wp;
150 device_t *dp;
151 thread_id_t tid;
152 message m;
153 int ipc_status, r;
155 wp = (worker_t *) param;
156 assert(wp != NULL);
157 dp = &device[wp->device_id];
158 tid = MAKE_TID(wp->device_id, wp->worker_id);
160 if (mthread_setspecific(worker_key, wp))
161 panic("blockdriver_mt: could not save local thread pointer");
163 while (running && wp->worker_id < dp->workers) {
165 /* See if a new message is available right away. */
166 if (!try_dequeue(dp, &m, &ipc_status)) {
168 /* If not, block waiting for a new message or a thread
169 * termination event.
171 if (!dequeue(dp, wp, &m, &ipc_status))
172 break;
175 /* Even if the thread was stopped before, a new message resumes it. */
176 wp->state = STATE_BUSY;
178 /* If the request is a transfer request, we acquire the read barrier
179 * lock. Otherwise, we acquire the write lock.
181 if (is_transfer_req(m.m_type))
182 mthread_rwlock_rdlock(&dp->barrier);
183 else
184 mthread_rwlock_wrlock(&dp->barrier);
186 /* Handle the request and send a reply. */
187 r = blockdriver_handle_request(bdtab, &m, tid);
189 blockdriver_reply(&m, ipc_status, r);
191 /* Switch the thread back to running state, and unlock the barrier. */
192 wp->state = STATE_RUNNING;
193 mthread_rwlock_unlock(&dp->barrier);
196 /* Clean up and terminate this thread. */
197 if (mthread_setspecific(worker_key, NULL))
198 panic("blockdriver_mt: could not delete local thread pointer");
200 wp->state = STATE_EXITED;
202 exited[num_exited++] = wp;
204 return NULL;
207 /*===========================================================================*
208 * master_create_worker *
209 *===========================================================================*/
210 static void master_create_worker(worker_t *wp, worker_id_t worker_id,
211 device_id_t device_id)
213 /* Start a new worker thread.
215 mthread_attr_t attr;
216 int r;
218 wp->device_id = device_id;
219 wp->worker_id = worker_id;
220 wp->state = STATE_RUNNING;
222 /* Initialize synchronization primitives. */
223 mthread_event_init(&wp->sleep_event);
225 r = mthread_attr_init(&attr);
226 if (r != 0)
227 panic("blockdriver_mt: could not initialize attributes (%d)", r);
229 r = mthread_attr_setstacksize(&attr, STACK_SIZE);
230 if (r != 0)
231 panic("blockdriver_mt: could not set stack size (%d)", r);
233 r = mthread_create(&wp->mthread, &attr, worker_thread, (void *) wp);
234 if (r != 0)
235 panic("blockdriver_mt: could not start thread %d (%d)", worker_id, r);
237 mthread_attr_destroy(&attr);
240 /*===========================================================================*
241 * master_destroy_worker *
242 *===========================================================================*/
243 static void master_destroy_worker(worker_t *wp)
245 /* Clean up resources used by an exited worker thread.
248 assert(wp != NULL);
249 assert(wp->state == STATE_EXITED);
251 /* Join the thread. */
252 if (mthread_join(wp->mthread, NULL))
253 panic("blockdriver_mt: could not join thread %d", wp->worker_id);
255 /* Destroy resources. */
256 mthread_event_destroy(&wp->sleep_event);
258 wp->state = STATE_DEAD;
261 /*===========================================================================*
262 * master_handle_exits *
263 *===========================================================================*/
264 static void master_handle_exits(void)
266 /* Destroy the remains of all exited threads.
268 int i;
270 for (i = 0; i < num_exited; i++)
271 master_destroy_worker(exited[i]);
273 num_exited = 0;
276 /*===========================================================================*
277 * master_handle_request *
278 *===========================================================================*/
279 static void master_handle_request(message *m_ptr, int ipc_status)
281 /* For real request messages, query the device ID, start a thread if none is
282 * free and the maximum number of threads for that device has not yet been
283 * reached, and enqueue the message in the devices's message queue.
285 device_id_t id;
286 worker_t *wp;
287 device_t *dp;
288 int r, wid;
290 /* If this is not a block driver request, we cannot get the minor device
291 * associated with it, and thus we can not tell which thread should process
292 * it either. In that case, the master thread has to handle it instead.
294 if (!IS_BDEV_RQ(m_ptr->m_type)) {
295 /* Process as 'other' message. */
296 r = blockdriver_handle_request(bdtab, m_ptr, MAIN_THREAD);
298 blockdriver_reply(m_ptr, ipc_status, r);
300 return;
303 /* Query the device ID. Upon failure, send the error code to the caller. */
304 r = (*bdtab->bdr_device)(m_ptr->BDEV_MINOR, &id);
305 if (r != OK) {
306 blockdriver_reply(m_ptr, ipc_status, r);
308 return;
311 /* Look up the device control block. */
312 assert(id >= 0 && id < MAX_DEVICES);
313 dp = &device[id];
315 /* Find the first non-busy worker thread. */
316 for (wid = 0; wid < dp->workers; wid++)
317 if (dp->worker[wid].state != STATE_BUSY)
318 break;
320 /* If the worker thread is dead, start a thread now, unless we have already
321 * reached the maximum number of threads.
323 if (wid < dp->workers) {
324 wp = &dp->worker[wid];
326 assert(wp->state != STATE_EXITED);
328 /* If the non-busy thread has not yet been created, create one now. */
329 if (wp->state == STATE_DEAD)
330 master_create_worker(wp, wid, dp->id);
333 /* Enqueue the message at the device queue. */
334 enqueue(dp, m_ptr, ipc_status);
337 /*===========================================================================*
338 * master_init *
339 *===========================================================================*/
340 static void master_init(struct blockdriver *bdp)
342 /* Initialize the state of the master thread.
344 int i, j;
346 assert(bdp != NULL);
347 assert(bdp->bdr_device != NULL);
349 mthread_init();
351 bdtab = bdp;
353 /* Initialize device-specific data structures. */
354 for (i = 0; i < MAX_DEVICES; i++) {
355 device[i].id = i;
356 device[i].workers = 1;
357 mthread_event_init(&device[i].queue_event);
358 mthread_rwlock_init(&device[i].barrier);
360 for (j = 0; j < MAX_WORKERS; j++)
361 device[i].worker[j].state = STATE_DEAD;
364 /* Initialize a per-thread key, where each worker thread stores its own
365 * reference to the worker structure.
367 if (mthread_key_create(&worker_key, NULL))
368 panic("blockdriver_mt: error initializing worker key");
371 /*===========================================================================*
372 * blockdriver_mt_get_tid *
373 *===========================================================================*/
374 thread_id_t blockdriver_mt_get_tid(void)
376 /* Return back the ID of this thread.
378 worker_t *wp;
380 wp = (worker_t *) mthread_getspecific(worker_key);
382 if (wp == NULL)
383 panic("blockdriver_mt: master thread cannot query thread ID\n");
385 return MAKE_TID(wp->device_id, wp->worker_id);
388 /*===========================================================================*
389 * blockdriver_mt_receive *
390 *===========================================================================*/
391 static void blockdriver_mt_receive(message *m_ptr, int *ipc_status)
393 /* Receive a message.
395 int r;
397 r = sef_receive_status(ANY, m_ptr, ipc_status);
399 if (r != OK)
400 panic("blockdriver_mt: sef_receive_status() returned %d", r);
403 /*===========================================================================*
404 * blockdriver_mt_task *
405 *===========================================================================*/
406 void blockdriver_mt_task(struct blockdriver *driver_tab)
408 /* The multithreaded driver task.
410 int ipc_status, i;
411 message mess;
413 /* Initialize first if necessary. */
414 if (!running) {
415 master_init(driver_tab);
417 running = TRUE;
420 /* The main message loop. */
421 while (running) {
422 /* Receive a message. */
423 blockdriver_mt_receive(&mess, &ipc_status);
425 /* Dispatch the message. */
426 if (is_ipc_notify(ipc_status))
427 blockdriver_handle_notify(bdtab, &mess);
428 else
429 master_handle_request(&mess, ipc_status);
431 /* Let other threads run. */
432 mthread_yield_all();
434 /* Clean up any exited threads. */
435 if (num_exited > 0)
436 master_handle_exits();
439 /* Free up resources. */
440 for (i = 0; i < MAX_DEVICES; i++)
441 mthread_event_destroy(&device[i].queue_event);
444 /*===========================================================================*
445 * blockdriver_mt_terminate *
446 *===========================================================================*/
447 void blockdriver_mt_terminate(void)
449 /* Instruct libblockdriver to shut down.
452 running = FALSE;
455 /*===========================================================================*
456 * blockdriver_mt_sleep *
457 *===========================================================================*/
458 void blockdriver_mt_sleep(void)
460 /* Let the current thread sleep until it gets woken up by the master thread.
462 worker_t *wp;
464 wp = (worker_t *) mthread_getspecific(worker_key);
466 if (wp == NULL)
467 panic("blockdriver_mt: master thread cannot sleep");
469 mthread_event_wait(&wp->sleep_event);
472 /*===========================================================================*
473 * blockdriver_mt_wakeup *
474 *===========================================================================*/
475 void blockdriver_mt_wakeup(thread_id_t id)
477 /* Wake up a sleeping worker thread from the master thread.
479 worker_t *wp;
480 device_id_t device_id;
481 worker_id_t worker_id;
483 device_id = TID_DEVICE(id);
484 worker_id = TID_WORKER(id);
486 assert(device_id >= 0 && device_id < MAX_DEVICES);
487 assert(worker_id >= 0 && worker_id < MAX_WORKERS);
489 wp = &device[device_id].worker[worker_id];
491 assert(wp->state == STATE_RUNNING || wp->state == STATE_BUSY);
493 mthread_event_fire(&wp->sleep_event);
496 /*===========================================================================*
497 * blockdriver_mt_set_workers *
498 *===========================================================================*/
499 void blockdriver_mt_set_workers(device_id_t id, int workers)
501 /* Set the number of worker threads for the given device.
503 device_t *dp;
505 assert(id >= 0 && id < MAX_DEVICES);
507 if (workers > MAX_WORKERS)
508 workers = MAX_WORKERS;
510 dp = &device[id];
512 /* If we are cleaning up, wake up all threads waiting on a queue event. */
513 if (workers == 1 && dp->workers > workers)
514 mthread_event_fire_all(&dp->queue_event);
516 dp->workers = workers;