1 /* This file contains the multithreaded driver interface.
4 * Aug 27, 2011 created (A. Welzel)
6 * The entry points into this file are:
7 * blockdriver_mt_task: the main message loop of the driver
8 * blockdriver_mt_terminate: break out of the main message loop
9 * blockdriver_mt_sleep: put the current thread to sleep
10 * blockdriver_mt_wakeup: wake up a sleeping thread
11 * blockdriver_mt_set_workers:set the number of worker threads
14 #include <minix/blockdriver_mt.h>
15 #include <minix/mthread.h>
22 /* A thread ID is composed of a device ID and a per-device worker thread ID.
23 * All thread IDs must be in the range 0..(MAX_THREADS-1) inclusive.
25 #define MAKE_TID(did, wid) ((did) * MAX_WORKERS + (wid))
26 #define TID_DEVICE(tid) ((tid) / MAX_WORKERS)
27 #define TID_WORKER(tid) ((tid) % MAX_WORKERS)
29 typedef int worker_id_t
;
38 /* Structure with information about a worker thread. */
40 device_id_t device_id
;
41 worker_id_t worker_id
;
43 mthread_thread_t mthread
;
44 mthread_event_t sleep_event
;
47 /* Structure with information about a device. */
51 worker_t worker
[MAX_WORKERS
];
52 mthread_event_t queue_event
;
53 mthread_rwlock_t barrier
;
56 static struct blockdriver
*bdtab
;
57 static int running
= FALSE
;
59 static mthread_key_t worker_key
;
61 static device_t device
[MAX_DEVICES
];
63 static worker_t
*exited
[MAX_THREADS
];
64 static int num_exited
= 0;
66 /*===========================================================================*
68 *===========================================================================*/
69 static void enqueue(device_t
*dp
, const message
*m_src
, int ipc_status
)
71 /* Enqueue a message into the device's queue, and signal the event.
72 * Must be called from the master thread.
75 if (!mq_enqueue(dp
->id
, m_src
, ipc_status
))
76 panic("blockdriver_mt: enqueue failed (message queue full)");
78 mthread_event_fire(&dp
->queue_event
);
81 /*===========================================================================*
83 *===========================================================================*/
84 static int try_dequeue(device_t
*dp
, message
*m_dst
, int *ipc_status
)
86 /* See if a message can be dequeued from the current worker thread's device
87 * queue. If so, dequeue the message and return TRUE. If not, return FALSE.
88 * Must be called from a worker thread. Does not block.
91 return mq_dequeue(dp
->id
, m_dst
, ipc_status
);
94 /*===========================================================================*
96 *===========================================================================*/
97 static int dequeue(device_t
*dp
, worker_t
*wp
, message
*m_dst
,
100 /* Dequeue a message from the current worker thread's device queue. Block the
101 * current thread if necessary. Must be called from a worker thread. Either
102 * succeeds with a message (TRUE) or indicates that the thread should be
103 * terminated (FALSE).
107 mthread_event_wait(&dp
->queue_event
);
109 /* If we were woken up as a result of terminate or set_workers, break
110 * out of the loop and terminate the thread.
112 if (!running
|| wp
->worker_id
>= dp
->workers
)
114 } while (!try_dequeue(dp
, m_dst
, ipc_status
));
119 /*===========================================================================*
121 *===========================================================================*/
122 static int is_transfer_req(int type
)
124 /* Return whether the given block device request is a transfer request.
139 /*===========================================================================*
141 *===========================================================================*/
142 static void *worker_thread(void *param
)
144 /* The worker thread loop. Set up the thread-specific reference to itself and
145 * start looping. The loop consists of blocking dequeing and handling messages.
146 * After handling a message, the thread might have been stopped, so we check
147 * for this condition and exit if so.
155 wp
= (worker_t
*) param
;
157 dp
= &device
[wp
->device_id
];
158 tid
= MAKE_TID(wp
->device_id
, wp
->worker_id
);
160 if (mthread_setspecific(worker_key
, wp
))
161 panic("blockdriver_mt: could not save local thread pointer");
163 while (running
&& wp
->worker_id
< dp
->workers
) {
165 /* See if a new message is available right away. */
166 if (!try_dequeue(dp
, &m
, &ipc_status
)) {
168 /* If not, block waiting for a new message or a thread
171 if (!dequeue(dp
, wp
, &m
, &ipc_status
))
175 /* Even if the thread was stopped before, a new message resumes it. */
176 wp
->state
= STATE_BUSY
;
178 /* If the request is a transfer request, we acquire the read barrier
179 * lock. Otherwise, we acquire the write lock.
181 if (is_transfer_req(m
.m_type
))
182 mthread_rwlock_rdlock(&dp
->barrier
);
184 mthread_rwlock_wrlock(&dp
->barrier
);
186 /* Handle the request and send a reply. */
187 blockdriver_process_on_thread(bdtab
, &m
, ipc_status
, tid
);
189 /* Switch the thread back to running state, and unlock the barrier. */
190 wp
->state
= STATE_RUNNING
;
191 mthread_rwlock_unlock(&dp
->barrier
);
194 /* Clean up and terminate this thread. */
195 if (mthread_setspecific(worker_key
, NULL
))
196 panic("blockdriver_mt: could not delete local thread pointer");
198 wp
->state
= STATE_EXITED
;
200 exited
[num_exited
++] = wp
;
205 /*===========================================================================*
206 * master_create_worker *
207 *===========================================================================*/
208 static void master_create_worker(worker_t
*wp
, worker_id_t worker_id
,
209 device_id_t device_id
)
211 /* Start a new worker thread.
216 wp
->device_id
= device_id
;
217 wp
->worker_id
= worker_id
;
218 wp
->state
= STATE_RUNNING
;
220 /* Initialize synchronization primitives. */
221 mthread_event_init(&wp
->sleep_event
);
223 r
= mthread_attr_init(&attr
);
225 panic("blockdriver_mt: could not initialize attributes (%d)", r
);
227 r
= mthread_attr_setstacksize(&attr
, STACK_SIZE
);
229 panic("blockdriver_mt: could not set stack size (%d)", r
);
231 r
= mthread_create(&wp
->mthread
, &attr
, worker_thread
, (void *) wp
);
233 panic("blockdriver_mt: could not start thread %d (%d)", worker_id
, r
);
235 mthread_attr_destroy(&attr
);
238 /*===========================================================================*
239 * master_destroy_worker *
240 *===========================================================================*/
241 static void master_destroy_worker(worker_t
*wp
)
243 /* Clean up resources used by an exited worker thread.
247 assert(wp
->state
== STATE_EXITED
);
249 /* Join the thread. */
250 if (mthread_join(wp
->mthread
, NULL
))
251 panic("blockdriver_mt: could not join thread %d", wp
->worker_id
);
253 /* Destroy resources. */
254 mthread_event_destroy(&wp
->sleep_event
);
256 wp
->state
= STATE_DEAD
;
259 /*===========================================================================*
260 * master_handle_exits *
261 *===========================================================================*/
262 static void master_handle_exits(void)
264 /* Destroy the remains of all exited threads.
268 for (i
= 0; i
< num_exited
; i
++)
269 master_destroy_worker(exited
[i
]);
274 /*===========================================================================*
275 * master_handle_message *
276 *===========================================================================*/
277 static void master_handle_message(message
*m_ptr
, int ipc_status
)
279 /* For real request messages, query the device ID, start a thread if none is
280 * free and the maximum number of threads for that device has not yet been
281 * reached, and enqueue the message in the devices's message queue. All other
282 * messages are handled immediately from the main thread.
289 /* If this is not a block driver request, we cannot get the minor device
290 * associated with it, and thus we can not tell which thread should process
291 * it either. In that case, the master thread has to handle it instead.
293 if (is_ipc_notify(ipc_status
) || !IS_BDEV_RQ(m_ptr
->m_type
)) {
294 /* Process as 'other' message. */
295 blockdriver_process_on_thread(bdtab
, m_ptr
, ipc_status
, MAIN_THREAD
);
300 /* Query the device ID. Upon failure, send the error code to the caller. */
301 r
= (*bdtab
->bdr_device
)(m_ptr
->m_lbdev_lblockdriver_msg
.minor
, &id
);
303 blockdriver_reply(m_ptr
, ipc_status
, r
);
308 /* Look up the device control block. */
309 assert(id
>= 0 && id
< MAX_DEVICES
);
312 /* Find the first non-busy worker thread. */
313 for (wid
= 0; wid
< dp
->workers
; wid
++)
314 if (dp
->worker
[wid
].state
!= STATE_BUSY
)
317 /* If the worker thread is dead, start a thread now, unless we have already
318 * reached the maximum number of threads.
320 if (wid
< dp
->workers
) {
321 wp
= &dp
->worker
[wid
];
323 assert(wp
->state
!= STATE_EXITED
);
325 /* If the non-busy thread has not yet been created, create one now. */
326 if (wp
->state
== STATE_DEAD
)
327 master_create_worker(wp
, wid
, dp
->id
);
330 /* Enqueue the message at the device queue. */
331 enqueue(dp
, m_ptr
, ipc_status
);
334 /*===========================================================================*
336 *===========================================================================*/
337 static void master_init(struct blockdriver
*bdp
)
339 /* Initialize the state of the master thread.
344 assert(bdp
->bdr_device
!= NULL
);
348 /* Initialize device-specific data structures. */
349 for (i
= 0; i
< MAX_DEVICES
; i
++) {
351 device
[i
].workers
= 1;
352 mthread_event_init(&device
[i
].queue_event
);
353 mthread_rwlock_init(&device
[i
].barrier
);
355 for (j
= 0; j
< MAX_WORKERS
; j
++)
356 device
[i
].worker
[j
].state
= STATE_DEAD
;
359 /* Initialize a per-thread key, where each worker thread stores its own
360 * reference to the worker structure.
362 if (mthread_key_create(&worker_key
, NULL
))
363 panic("blockdriver_mt: error initializing worker key");
366 /*===========================================================================*
367 * blockdriver_mt_get_tid *
368 *===========================================================================*/
369 thread_id_t
blockdriver_mt_get_tid(void)
371 /* Return back the ID of this thread.
375 wp
= (worker_t
*) mthread_getspecific(worker_key
);
378 panic("blockdriver_mt: master thread cannot query thread ID\n");
380 return MAKE_TID(wp
->device_id
, wp
->worker_id
);
383 /*===========================================================================*
384 * blockdriver_mt_receive *
385 *===========================================================================*/
386 static void blockdriver_mt_receive(message
*m_ptr
, int *ipc_status
)
388 /* Receive a message.
392 r
= sef_receive_status(ANY
, m_ptr
, ipc_status
);
395 panic("blockdriver_mt: sef_receive_status() returned %d", r
);
398 /*===========================================================================*
399 * blockdriver_mt_task *
400 *===========================================================================*/
401 void blockdriver_mt_task(struct blockdriver
*driver_tab
)
403 /* The multithreaded driver task.
408 /* Initialize first if necessary. */
410 master_init(driver_tab
);
415 /* The main message loop. */
417 /* Receive a message. */
418 blockdriver_mt_receive(&mess
, &ipc_status
);
420 /* Dispatch the message. */
421 master_handle_message(&mess
, ipc_status
);
423 /* Let other threads run. */
426 /* Clean up any exited threads. */
428 master_handle_exits();
431 /* Free up resources. */
432 for (i
= 0; i
< MAX_DEVICES
; i
++)
433 mthread_event_destroy(&device
[i
].queue_event
);
436 /*===========================================================================*
437 * blockdriver_mt_terminate *
438 *===========================================================================*/
439 void blockdriver_mt_terminate(void)
441 /* Instruct libblockdriver to shut down.
447 /*===========================================================================*
448 * blockdriver_mt_sleep *
449 *===========================================================================*/
450 void blockdriver_mt_sleep(void)
452 /* Let the current thread sleep until it gets woken up by the master thread.
456 wp
= (worker_t
*) mthread_getspecific(worker_key
);
459 panic("blockdriver_mt: master thread cannot sleep");
461 mthread_event_wait(&wp
->sleep_event
);
464 /*===========================================================================*
465 * blockdriver_mt_wakeup *
466 *===========================================================================*/
467 void blockdriver_mt_wakeup(thread_id_t id
)
469 /* Wake up a sleeping worker thread from the master thread.
472 device_id_t device_id
;
473 worker_id_t worker_id
;
475 device_id
= TID_DEVICE(id
);
476 worker_id
= TID_WORKER(id
);
478 assert(device_id
>= 0 && device_id
< MAX_DEVICES
);
479 assert(worker_id
>= 0 && worker_id
< MAX_WORKERS
);
481 wp
= &device
[device_id
].worker
[worker_id
];
483 assert(wp
->state
== STATE_RUNNING
|| wp
->state
== STATE_BUSY
);
485 mthread_event_fire(&wp
->sleep_event
);
488 /*===========================================================================*
489 * blockdriver_mt_set_workers *
490 *===========================================================================*/
491 void blockdriver_mt_set_workers(device_id_t id
, int workers
)
493 /* Set the number of worker threads for the given device.
497 assert(id
>= 0 && id
< MAX_DEVICES
);
499 if (workers
> MAX_WORKERS
)
500 workers
= MAX_WORKERS
;
504 /* If we are cleaning up, wake up all threads waiting on a queue event. */
505 if (workers
== 1 && dp
->workers
> workers
)
506 mthread_event_fire_all(&dp
->queue_event
);
508 dp
->workers
= workers
;