/* File-local forward declarations for the VFS worker-thread machinery. */
8 static void append_job(struct job
*job
, void *(*func
)(void *arg
));
9 static void get_work(struct worker_thread
*worker
);
10 static void *worker_main(void *arg
);
11 static void worker_sleep(struct worker_thread
*worker
);
12 static void worker_wake(struct worker_thread
*worker
);
/* NOTE(review): this prototype is truncated in this extract -- the name of
 * the endpoint_t parameter and the closing ");" are not visible. */
13 static int worker_waiting_for(struct worker_thread
*worker
, endpoint_t
/* Shared thread-attribute object used for every worker thread created in
 * worker_init() below (stack size, detached state). */
16 static mthread_attr_t tattr
;
/* Two alternative per-thread stack sizes; presumably selected by an
 * #if/#else pair on lines not visible in this extract -- TODO confirm. */
19 # define TH_STACKSIZE (10 * 1024)
21 # define TH_STACKSIZE (7 * 1024)
/* Sanity check: (w) must be one of the two dedicated workers (sys_worker,
 * dl_worker) or point inside the workers[] array. */
24 #define ASSERTW(w) assert((w) == &sys_worker || (w) == &dl_worker || \
25 ((w) >= &workers[0] && (w) < &workers[NR_WTHREADS]));
/* worker_init: initialize one worker thread slot.
 * Sets up the shared thread attribute (stack size TH_STACKSIZE, detached),
 * records the calling (main) thread's id as invalid_thread_id, marks the
 * worker's job slot unused, initializes its event mutex/condvar, and spawns
 * the thread running worker_main with the worker itself as argument.
 * NOTE(review): original lines are missing here (e.g. the attribute setup is
 * presumably guarded by a one-time/first-call check) -- confirm in the full
 * file before relying on the exact control flow. */
27 /*===========================================================================*
29 *===========================================================================*/
30 void worker_init(struct worker_thread
*wp
)
32 /* Initialize worker thread */
35 if (mthread_attr_init(&tattr
) != 0)
36 panic("failed to initialize attribute");
37 if (mthread_attr_setstacksize(&tattr
, TH_STACKSIZE
) != 0)
38 panic("couldn't set default thread stack size");
39 if (mthread_attr_setdetachstate(&tattr
, MTHREAD_CREATE_DETACHED
) != 0)
40 panic("couldn't set default thread detach state");
41 invalid_thread_id
= mthread_self(); /* Assuming we're the main thread*/
/* Per-worker initialization: empty job slot, event mutex + condvar for the
 * sleep/wake handshake, then start the detached thread. */
48 wp
->w_job
.j_func
= NULL
; /* Mark not in use */
50 if (mutex_init(&wp
->w_event_mutex
, NULL
) != 0)
51 panic("failed to initialize mutex");
52 if (cond_init(&wp
->w_event
, NULL
) != 0)
53 panic("failed to initialize conditional variable");
54 if (mthread_create(&wp
->w_tid
, &tattr
, worker_main
, (void *) wp
) != 0)
55 panic("unable to start thread");
/* get_work: obtain the next job for this worker.
 * Priority: (1) a job queued on this worker's own list (w_job.j_next);
 * (2) for regular workers only (not sys_worker/dl_worker), a pending job
 * parked on some process's fproc[] slot (FP_PENDING); (3) otherwise block
 * until new work is handed to us.
 * NOTE(review): local declarations (new_job, rfp), the handling after a
 * pending job is adopted, and the actual sleep call are on lines missing
 * from this extract. */
59 /*===========================================================================*
61 *===========================================================================*/
62 static void get_work(struct worker_thread
*worker
)
64 /* Find new work to do. Work can be 'queued', 'pending', or absent. In the
65 * latter case wait for new work to come in. */
73 /* Do we have queued work to do? */
74 if ((new_job
= worker
->w_job
.j_next
) != NULL
) {
/* Adopt the queued job by copying it into our own job slot. */
75 worker
->w_job
= *new_job
;
/* Dedicated workers never steal pending per-process work; 'pending' is
 * presumably a global count of FP_PENDING processes -- TODO confirm. */
78 } else if (worker
!= &sys_worker
&& worker
!= &dl_worker
&& pending
> 0) {
79 /* Find pending work */
80 for (rfp
= &fproc
[0]; rfp
< &fproc
[NR_PROCS
]; rfp
++) {
81 if (rfp
->fp_flags
& FP_PENDING
) {
/* Move the process's parked job into this worker and clear the slot. */
82 worker
->w_job
= rfp
->fp_job
;
83 rfp
->fp_job
.j_func
= NULL
;
84 rfp
->fp_flags
&= ~FP_PENDING
; /* No longer pending */
/* Reached when pending > 0 but no FP_PENDING process was found. */
90 panic("Pending work inconsistency");
93 /* Wait for work to come to us */
/* worker_available: return how many of the NR_WTHREADS generic workers are
 * currently idle (w_job.j_func == NULL means the slot is unused).
 * NOTE(review): the declaration and increment of 'busy' are on lines
 * missing from this extract. */
97 /*===========================================================================*
99 *===========================================================================*/
100 int worker_available(void)
105 for (i
= 0; i
< NR_WTHREADS
; i
++) {
106 if (workers
[i
].w_job
.j_func
!= NULL
)
110 return(NR_WTHREADS
- busy
);
/* worker_main: entry point of every worker thread.
 * Repeatedly (the enclosing loop is on lines missing from this extract):
 * registers itself in the owning process's fproc slot (fp_wtid), runs the
 * job's function, deregisters, and marks its job slot free again.
 * The final return(NULL) is unreachable, kept to satisfy the signature. */
113 /*===========================================================================*
115 *===========================================================================*/
116 static void *worker_main(void *arg
)
118 /* Worker thread main loop */
119 struct worker_thread
*me
;
121 me
= (struct worker_thread
*) arg
;
127 /* Register ourselves in fproc table if possible */
128 if (me
->w_job
.j_fp
!= NULL
) {
129 me
->w_job
.j_fp
->fp_wtid
= me
->w_tid
;
/* Execute the job; the job structure itself is the argument. */
133 me
->w_job
.j_func(&me
->w_job
);
135 /* Deregister if possible */
136 if (me
->w_job
.j_fp
!= NULL
) {
137 me
->w_job
.j_fp
->fp_wtid
= invalid_thread_id
;
140 /* Mark ourselves as done */
141 me
->w_job
.j_func
= NULL
;
142 me
->w_job
.j_fp
= NULL
;
145 return(NULL
); /* Unreachable */
/* dl_worker_start: hand a job to the dedicated deadlock-resolver worker.
 * The worker is asserted idle first (j_func == NULL); the job gets the
 * current process (fp) and request message (m_in), then the worker is woken.
 * Note the runtime re-check mirrors the assert, so under NDEBUG a busy
 * dl_worker silently drops the request rather than crashing. */
148 /*===========================================================================*
150 *===========================================================================*/
151 void dl_worker_start(void *(*func
)(void *arg
))
153 /* Start the deadlock resolving worker. This worker is reserved to run in case
154 * all other workers are busy and we have to have an additional worker to come
156 assert(dl_worker
.w_job
.j_func
== NULL
);
158 if (dl_worker
.w_job
.j_func
== NULL
) {
159 dl_worker
.w_job
.j_fp
= fp
;
160 dl_worker
.w_job
.j_m_in
= m_in
;
161 dl_worker
.w_job
.j_func
= func
;
162 worker_wake(&dl_worker
);
/* sys_worker_start: hand a job to the dedicated system worker.
 * If the system worker is idle, fill its job slot directly and wake it;
 * otherwise append the job to the worker's queue via append_job(). */
166 /*===========================================================================*
168 *===========================================================================*/
169 void sys_worker_start(void *(*func
)(void *arg
))
171 /* Carry out work for the system (i.e., kernel or PM). If this thread is idle
172 * do it right away, else create new job and append it to the queue. */
174 if (sys_worker
.w_job
.j_func
== NULL
) {
175 sys_worker
.w_job
.j_fp
= fp
;
176 sys_worker
.w_job
.j_m_in
= m_in
;
177 sys_worker
.w_job
.j_func
= func
;
178 worker_wake(&sys_worker
);
/* Busy: queue behind the job(s) already scheduled on this worker. */
180 append_job(&sys_worker
.w_job
, func
);
/* append_job: allocate a new job node, fill it from the current request
 * (m_in, func), and link it at the tail of the given job's j_next chain.
 * Ownership: the node is heap-allocated here; presumably freed by whoever
 * consumes the queue (get_work copies it by value) -- TODO confirm the
 * free() site in the full file.
 * NOTE(review): the calloc result is checked only by assert(), which is
 * compiled out under NDEBUG, leaving an unchecked NULL dereference on OOM.
 * Also, the initialization 'tail = job' before the walk loop is on a line
 * missing from this extract. */
184 /*===========================================================================*
186 *===========================================================================*/
187 static void append_job(struct job
*job
, void *(*func
)(void *arg
))
191 struct job
*new_job
, *tail
;
194 new_job
= calloc(1, sizeof(struct job
));
195 assert(new_job
!= NULL
);
197 new_job
->j_m_in
= m_in
;
198 new_job
->j_func
= func
;
199 new_job
->j_next
= NULL
;
200 new_job
->j_err_code
= OK
;
202 /* Append to queue */
204 while (tail
->j_next
!= NULL
) tail
= tail
->j_next
;
205 tail
->j_next
= new_job
;
/* worker_start: schedule a job on any free generic worker.
 * Refuses work for processes flagged FP_DROP_WORK. Scans workers[] for an
 * idle slot (j_func == NULL); if one is found its job slot is filled from
 * the current process/request (and presumably the worker is woken on a line
 * missing from this extract). If all workers are busy, the job is parked on
 * the process's own fp_job slot and the process is flagged FP_PENDING, to be
 * picked up later by get_work() -- unless the process already has a job
 * parked, in which case the new one is deliberately dropped (see comment). */
208 /*===========================================================================*
210 *===========================================================================*/
211 void worker_start(void *(*func
)(void *arg
))
213 /* Find an available worker or wait for one */
215 struct worker_thread
*worker
;
217 if (fp
->fp_flags
& FP_DROP_WORK
) {
218 return; /* This process is not allowed to accept new work */
222 for (i
= 0; i
< NR_WTHREADS
; i
++) {
223 if (workers
[i
].w_job
.j_func
== NULL
) {
224 worker
= &workers
[i
];
229 if (worker
!= NULL
) {
230 worker
->w_job
.j_fp
= fp
;
231 worker
->w_job
.j_m_in
= m_in
;
232 worker
->w_job
.j_func
= func
;
233 worker
->w_job
.j_next
= NULL
;
234 worker
->w_job
.j_err_code
= OK
;
239 /* No worker threads available, let's wait for one to finish. */
240 /* If this process already has a job scheduled, forget about this new
242 * - the new job is do_dummy and we have already scheduled an actual job
243 * - the new job is an actual job and we have already scheduled do_dummy in
244 * order to exit this proc, so doing the new job is pointless. */
245 if (fp
->fp_job
.j_func
== NULL
) {
246 assert(!(fp
->fp_flags
& FP_PENDING
));
247 fp
->fp_job
.j_fp
= fp
;
248 fp
->fp_job
.j_m_in
= m_in
;
249 fp
->fp_job
.j_func
= func
;
250 fp
->fp_job
.j_next
= NULL
;
251 fp
->fp_job
.j_err_code
= OK
;
252 fp
->fp_flags
|= FP_PENDING
;
/* worker_sleep: block the calling worker on its own condition variable
 * until worker_wake() signals it. Must be called by the worker itself
 * (asserted via the global 'self'). Standard mutex-protected cond_wait
 * pattern; any mthread failure is fatal. */
257 /*===========================================================================*
259 *===========================================================================*/
260 static void worker_sleep(struct worker_thread
*worker
)
263 assert(self
== worker
);
264 if (mutex_lock(&worker
->w_event_mutex
) != 0)
265 panic("unable to lock event mutex");
266 if (cond_wait(&worker
->w_event
, &worker
->w_event_mutex
) != 0)
267 panic("could not wait on conditional variable");
268 if (mutex_unlock(&worker
->w_event_mutex
) != 0)
269 panic("unable to unlock event mutex");
/* worker_wake: signal a sleeping worker's condition variable (counterpart
 * of worker_sleep). Lock, signal, unlock; any mthread failure is fatal. */
273 /*===========================================================================*
275 *===========================================================================*/
276 static void worker_wake(struct worker_thread
*worker
)
278 /* Signal a worker to wake up */
280 if (mutex_lock(&worker
->w_event_mutex
) != 0)
281 panic("unable to lock event mutex");
282 if (cond_signal(&worker
->w_event
) != 0)
283 panic("unable to signal conditional variable");
284 if (mutex_unlock(&worker
->w_event_mutex
) != 0)
285 panic("unable to unlock event mutex");
/* worker_wait: park the current worker mid-job until it is signalled.
 * Saves the global err_code into the job before sleeping and restores the
 * globals (fp, err_code) from the job afterwards, since another worker may
 * have clobbered them while we slept. The actual sleep call between the
 * save and restore halves is on a line missing from this extract --
 * presumably worker_sleep(self); TODO confirm. */
288 /*===========================================================================*
290 *===========================================================================*/
291 void worker_wait(void)
293 self
->w_job
.j_err_code
= err_code
;
295 /* We continue here after waking up */
296 fp
= self
->w_job
.j_fp
; /* Restore global data */
297 err_code
= self
->w_job
.j_err_code
;
298 assert(self
->w_next
== NULL
);
/* worker_signal: validate the worker pointer and (on lines missing from
 * this extract) presumably wake it up -- only the ASSERTW guard is visible
 * here; confirm the body against the full file. */
301 /*===========================================================================*
303 *===========================================================================*/
304 void worker_signal(struct worker_thread
*worker
)
306 ASSERTW(worker
); /* Make sure we have a valid thread */
/* worker_stop: abort a worker's in-flight communication.
 * If the worker is talking to a task (w_task != NONE), fail the pending
 * reply with EIO -- through w_drv_sendrec for drivers or w_fs_sendrec for
 * file servers; having neither set while w_task != NONE is a consistency
 * panic. Otherwise the job's own request message is failed with EIO.
 * Presumably the worker is then woken on a line missing from this extract. */
310 /*===========================================================================*
312 *===========================================================================*/
313 void worker_stop(struct worker_thread
*worker
)
315 ASSERTW(worker
); /* Make sure we have a valid thread */
316 if (worker
->w_task
!= NONE
) {
317 /* This thread is communicating with a driver or file server */
318 if (worker
->w_drv_sendrec
!= NULL
) { /* Driver */
319 worker
->w_drv_sendrec
->m_type
= EIO
;
320 } else if (worker
->w_fs_sendrec
!= NULL
) { /* FS */
321 worker
->w_fs_sendrec
->m_type
= EIO
;
323 panic("reply storage consistency error"); /* Oh dear */
/* Not blocked on a task: fail the job's incoming request instead. */
326 worker
->w_job
.j_m_in
.m_type
= EIO
;
/* worker_stop_by_endpt: abort every worker that is waiting on the given
 * endpoint (e.g. because that driver/FS died). Checks the two dedicated
 * workers explicitly, then scans workers[]; the worker_stop() call inside
 * the loop body is on a line missing from this extract. No-op for NONE. */
331 /*===========================================================================*
332 * worker_stop_by_endpt *
333 *===========================================================================*/
334 void worker_stop_by_endpt(endpoint_t proc_e
)
336 struct worker_thread
*worker
;
339 if (proc_e
== NONE
) return;
341 if (worker_waiting_for(&sys_worker
, proc_e
)) worker_stop(&sys_worker
);
342 if (worker_waiting_for(&dl_worker
, proc_e
)) worker_stop(&dl_worker
);
344 for (i
= 0; i
< NR_WTHREADS
; i
++) {
345 worker
= &workers
[i
];
346 if (worker_waiting_for(worker
, proc_e
))
/* worker_get: map an mthread thread id back to its worker_thread structure.
 * Checks the two dedicated workers first (the dl_worker assignment branch
 * is on a line missing from this extract), then scans workers[]; presumably
 * returns NULL when no worker matches -- TODO confirm the return paths. */
351 /*===========================================================================*
353 *===========================================================================*/
354 struct worker_thread
*worker_get(thread_t worker_tid
)
357 struct worker_thread
*worker
;
360 if (worker_tid
== sys_worker
.w_tid
)
361 worker
= &sys_worker
;
362 else if (worker_tid
== dl_worker
.w_tid
)
365 for (i
= 0; i
< NR_WTHREADS
; i
++) {
366 if (workers
[i
].w_tid
== worker_tid
) {
367 worker
= &workers
[i
];
/* worker_getjob: return a pointer to the job of the worker with the given
 * thread id, via worker_get(); the not-found return (presumably NULL) is on
 * a line missing from this extract. */
376 /*===========================================================================*
378 *===========================================================================*/
379 struct job
*worker_getjob(thread_t worker_tid
)
381 struct worker_thread
*worker
;
383 if ((worker
= worker_get(worker_tid
)) != NULL
)
384 return(&worker
->w_job
);
389 /*===========================================================================*
390 * worker_waiting_for *
391 *===========================================================================*/
392 static int worker_waiting_for(struct worker_thread
*worker
, endpoint_t proc_e
)
394 ASSERTW(worker
); /* Make sure we have a valid thread */
396 if (worker
->w_job
.j_func
!= NULL
) {
397 if (worker
->w_task
!= NONE
)
398 return(worker
->w_task
== proc_e
);
399 else if (worker
->w_job
.j_fp
!= NULL
) {
400 return(worker
->w_job
.j_fp
->fp_task
== proc_e
);