minor fixes for safecopy & safemap tests
[minix.git] / servers / vfs / worker.c
#include "fs.h"
#include "glo.h"
#include "fproc.h"
#include "threads.h"
#include "job.h"
#include <assert.h>

static void append_job(struct job *job, void *(*func)(void *arg));
static void get_work(struct worker_thread *worker);
static void *worker_main(void *arg);
static void worker_sleep(struct worker_thread *worker);
static void worker_wake(struct worker_thread *worker);
static int worker_waiting_for(struct worker_thread *worker, endpoint_t
        proc_e);

static int init = 0;
static mthread_attr_t tattr;

#ifdef MKCOVERAGE
# define TH_STACKSIZE (10 * 1024)
#else
# define TH_STACKSIZE (7 * 1024)
#endif

#define ASSERTW(w) assert((w) == &sys_worker || (w) == &dl_worker || \
                          ((w) >= &workers[0] && (w) < &workers[NR_WTHREADS]));
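
/* ASSERTW() accepts exactly the worker slots this file knows about: the
 * dedicated sys_worker and dl_worker, or one of the NR_WTHREADS entries in
 * workers[]. Any other pointer indicates a corrupted worker reference. */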

/*===========================================================================*
 *                              worker_init                                  *
 *===========================================================================*/
void worker_init(struct worker_thread *wp)
{
/* Initialize worker thread */
  if (!init) {
        threads_init();
        if (mthread_attr_init(&tattr) != 0)
                panic("failed to initialize attribute");
        if (mthread_attr_setstacksize(&tattr, TH_STACKSIZE) != 0)
                panic("couldn't set default thread stack size");
        if (mthread_attr_setdetachstate(&tattr, MTHREAD_CREATE_DETACHED) != 0)
                panic("couldn't set default thread detach state");
        invalid_thread_id = mthread_self(); /* Assuming we're the main thread */
        pending = 0;
        init = 1;
  }

  ASSERTW(wp);

  wp->w_job.j_func = NULL;              /* Mark not in use */
  wp->w_next = NULL;
  if (mutex_init(&wp->w_event_mutex, NULL) != 0)
        panic("failed to initialize mutex");
  if (cond_init(&wp->w_event, NULL) != 0)
        panic("failed to initialize condition variable");
  if (mthread_create(&wp->w_tid, &tattr, worker_main, (void *) wp) != 0)
        panic("unable to start thread");
  yield();
}
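
/* Illustrative only: at startup the main thread is expected to call
 * worker_init() once per slot, along the lines of
 *
 *      for (i = 0; i < NR_WTHREADS; i++) worker_init(&workers[i]);
 *      worker_init(&sys_worker);
 *      worker_init(&dl_worker);
 *
 * (a sketch of the caller's side, which lives elsewhere in VFS, not a quote
 * of it). The first call performs the one-time setup guarded by 'init'. */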

/*===========================================================================*
 *                              get_work                                     *
 *===========================================================================*/
static void get_work(struct worker_thread *worker)
{
/* Find new work to do. Work can be 'queued', 'pending', or absent. In the
 * latter case wait for new work to come in. */

  struct job *new_job;
  struct fproc *rfp;

  ASSERTW(worker);
  self = worker;

  /* Do we have queued work to do? */
  if ((new_job = worker->w_job.j_next) != NULL) {
        worker->w_job = *new_job;
        free(new_job);
        return;
  } else if (worker != &sys_worker && worker != &dl_worker && pending > 0) {
        /* Find pending work */
        for (rfp = &fproc[0]; rfp < &fproc[NR_PROCS]; rfp++) {
                if (rfp->fp_flags & FP_PENDING) {
                        worker->w_job = rfp->fp_job;
                        rfp->fp_job.j_func = NULL;
                        rfp->fp_flags &= ~FP_PENDING; /* No longer pending */
                        pending--;
                        assert(pending >= 0);
                        return;
                }
        }
        panic("Pending work inconsistency");
  }

  /* Wait for work to come to us */
  worker_sleep(worker);
}
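
/* Note the priority order implemented above: jobs explicitly queued on this
 * worker come first, then any process marked FP_PENDING (regular workers
 * only; sys_worker and dl_worker never pick up pending work), and only when
 * both sources are empty does the thread block in worker_sleep(). */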

/*===========================================================================*
 *                              worker_available                             *
 *===========================================================================*/
int worker_available(void)
{
  int busy, i;

  busy = 0;
  for (i = 0; i < NR_WTHREADS; i++) {
        if (workers[i].w_job.j_func != NULL)
                busy++;
  }

  return(NR_WTHREADS - busy);
}
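
/* The count above is taken without locking; presumably this is safe because
 * mthread scheduling is cooperative, so no thread switch can interleave job
 * assignment with this loop. */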

/*===========================================================================*
 *                              worker_main                                  *
 *===========================================================================*/
static void *worker_main(void *arg)
{
/* Worker thread main loop */
  struct worker_thread *me;

  me = (struct worker_thread *) arg;
  ASSERTW(me);

  while(TRUE) {
        get_work(me);

        /* Register ourselves in fproc table if possible */
        if (me->w_job.j_fp != NULL) {
                me->w_job.j_fp->fp_wtid = me->w_tid;
        }

        /* Carry out work */
        me->w_job.j_func(&me->w_job);

        /* Deregister if possible */
        if (me->w_job.j_fp != NULL) {
                me->w_job.j_fp->fp_wtid = invalid_thread_id;
        }

        /* Mark ourselves as done */
        me->w_job.j_func = NULL;
        me->w_job.j_fp = NULL;
  }

  return(NULL); /* Unreachable */
}
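
/* Job lifecycle, as implemented by the loop above: a NULL j_func marks a
 * free slot; worker_start(), sys_worker_start() and dl_worker_start() fill
 * the slot and wake the thread; worker_main() runs the handler and resets
 * j_func to NULL so the slot can be reused. */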

/*===========================================================================*
 *                              dl_worker_start                              *
 *===========================================================================*/
void dl_worker_start(void *(*func)(void *arg))
{
/* Start the deadlock resolving worker. This worker is reserved to run in case
 * all other workers are busy and we need an additional worker to come to the
 * rescue. */
  assert(dl_worker.w_job.j_func == NULL);

  if (dl_worker.w_job.j_func == NULL) {
        dl_worker.w_job.j_fp = fp;
        dl_worker.w_job.j_m_in = m_in;
        dl_worker.w_job.j_func = func;
        worker_wake(&dl_worker);
  }
}
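
/* The assert() and the if-test above check the same condition on purpose: a
 * debug build panics if the deadlock worker is already busy, while an NDEBUG
 * build silently drops the request instead of corrupting the running job. */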

/*===========================================================================*
 *                              sys_worker_start                             *
 *===========================================================================*/
void sys_worker_start(void *(*func)(void *arg))
{
/* Carry out work for the system (i.e., kernel or PM). If this thread is idle,
 * do the work right away; otherwise create a new job and append it to the
 * queue. */
  if (sys_worker.w_job.j_func == NULL) {
        sys_worker.w_job.j_fp = fp;
        sys_worker.w_job.j_m_in = m_in;
        sys_worker.w_job.j_func = func;
        worker_wake(&sys_worker);
  } else {
        append_job(&sys_worker.w_job, func);
  }
}

/*===========================================================================*
 *                              append_job                                   *
 *===========================================================================*/
static void append_job(struct job *job, void *(*func)(void *arg))
{
/* Append a job */

  struct job *new_job, *tail;

  /* Create new job */
  new_job = calloc(1, sizeof(struct job));
  assert(new_job != NULL);
  new_job->j_fp = fp;
  new_job->j_m_in = m_in;
  new_job->j_func = func;
  new_job->j_next = NULL;
  new_job->j_err_code = OK;

  /* Append to queue */
  tail = job;
  while (tail->j_next != NULL) tail = tail->j_next;
  tail->j_next = new_job;
}
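
/* The queue is a singly-linked list ordered oldest-first; appending walks to
 * the tail, so the cost is linear in the number of jobs already queued. The
 * matching dequeue is the j_next check at the top of get_work(). */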

/*===========================================================================*
 *                              worker_start                                 *
 *===========================================================================*/
void worker_start(void *(*func)(void *arg))
{
/* Find an available worker or wait for one */
  int i;
  struct worker_thread *worker;

  if (fp->fp_flags & FP_DROP_WORK) {
        return; /* This process is not allowed to accept new work */
  }

  worker = NULL;
  for (i = 0; i < NR_WTHREADS; i++) {
        if (workers[i].w_job.j_func == NULL) {
                worker = &workers[i];
                break;
        }
  }

  if (worker != NULL) {
        worker->w_job.j_fp = fp;
        worker->w_job.j_m_in = m_in;
        worker->w_job.j_func = func;
        worker->w_job.j_next = NULL;
        worker->w_job.j_err_code = OK;
        worker_wake(worker);
        return;
  }

  /* No worker threads available, let's wait for one to finish. */
  /* If this process already has a job scheduled, forget about this new job:
   *  - the new job is do_dummy and an actual job is already scheduled, or
   *  - the new job is an actual job and do_dummy is already scheduled in
   *    order to exit this proc, so doing the new job is pointless. */
  if (fp->fp_job.j_func == NULL) {
        assert(!(fp->fp_flags & FP_PENDING));
        fp->fp_job.j_fp = fp;
        fp->fp_job.j_m_in = m_in;
        fp->fp_job.j_func = func;
        fp->fp_job.j_next = NULL;
        fp->fp_job.j_err_code = OK;
        fp->fp_flags |= FP_PENDING;
        pending++;
  }
}
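
/* Illustrative only: worker_start() is driven by VFS's main message loop.
 * With the globals fp and m_in already pointing at the requester's fproc
 * slot and request message, the hand-off is simply
 *
 *      worker_start(handler);
 *
 * where 'handler' stands for whatever job function serves the request (the
 * name is hypothetical). The job copies fp and m_in, so the main thread may
 * overwrite them for the next message right away. */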

/*===========================================================================*
 *                              worker_sleep                                 *
 *===========================================================================*/
static void worker_sleep(struct worker_thread *worker)
{
  ASSERTW(worker);
  assert(self == worker);
  if (mutex_lock(&worker->w_event_mutex) != 0)
        panic("unable to lock event mutex");
  if (cond_wait(&worker->w_event, &worker->w_event_mutex) != 0)
        panic("could not wait on condition variable");
  if (mutex_unlock(&worker->w_event_mutex) != 0)
        panic("unable to unlock event mutex");
  self = worker;
}

/*===========================================================================*
 *                              worker_wake                                  *
 *===========================================================================*/
static void worker_wake(struct worker_thread *worker)
{
/* Signal a worker to wake up */
  ASSERTW(worker);
  if (mutex_lock(&worker->w_event_mutex) != 0)
        panic("unable to lock event mutex");
  if (cond_signal(&worker->w_event) != 0)
        panic("unable to signal condition variable");
  if (mutex_unlock(&worker->w_event_mutex) != 0)
        panic("unable to unlock event mutex");
}

/*===========================================================================*
 *                              worker_wait                                  *
 *===========================================================================*/
void worker_wait(void)
{
  self->w_job.j_err_code = err_code;
  worker_sleep(self);
  /* We continue here after waking up */
  fp = self->w_job.j_fp;        /* Restore global data */
  err_code = self->w_job.j_err_code;
  assert(self->w_next == NULL);
}
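
/* Illustrative only: a job handler that needs a reply from another process
 * blocks itself with worker_wait() and is resumed by the main thread, in the
 * style of
 *
 *      worker thread:                  main thread, on reply arrival:
 *              send request;
 *              worker_wait();          worker_signal(worker_get(tid));
 *
 * where 'tid' stands for the blocked worker's thread id. worker_wait() parks
 * err_code in the job before sleeping; after the wakeup it restores fp and
 * err_code from the job, since another job may have changed the globals in
 * the meantime. The real call sites live elsewhere in VFS; this is a sketch,
 * not a quote of them. */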

/*===========================================================================*
 *                              worker_signal                                *
 *===========================================================================*/
void worker_signal(struct worker_thread *worker)
{
  ASSERTW(worker);              /* Make sure we have a valid thread */
  worker_wake(worker);
}

/*===========================================================================*
 *                              worker_stop                                  *
 *===========================================================================*/
void worker_stop(struct worker_thread *worker)
{
  ASSERTW(worker);              /* Make sure we have a valid thread */
  if (worker->w_task != NONE) {
        /* This thread is communicating with a driver or file server */
        if (worker->w_drv_sendrec != NULL) {            /* Driver */
                worker->w_drv_sendrec->m_type = EIO;
        } else if (worker->w_fs_sendrec != NULL) {      /* FS */
                worker->w_fs_sendrec->m_type = EIO;
        } else {
                panic("reply storage consistency error");       /* Oh dear */
        }
  } else {
        worker->w_job.j_m_in.m_type = EIO;
  }
  worker_wake(worker);
}
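
/* Rather than killing the thread, worker_stop() forges an EIO reply in
 * whichever message buffer the worker is blocked on and then wakes it, so
 * the worker unwinds through its normal error path. */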

/*===========================================================================*
 *                              worker_stop_by_endpt                         *
 *===========================================================================*/
void worker_stop_by_endpt(endpoint_t proc_e)
{
  struct worker_thread *worker;
  int i;

  if (proc_e == NONE) return;

  if (worker_waiting_for(&sys_worker, proc_e)) worker_stop(&sys_worker);
  if (worker_waiting_for(&dl_worker, proc_e)) worker_stop(&dl_worker);

  for (i = 0; i < NR_WTHREADS; i++) {
        worker = &workers[i];
        if (worker_waiting_for(worker, proc_e))
                worker_stop(worker);
  }
}
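
/* Illustrative only: this is the recovery hook for a dying driver or file
 * server. A caller that learns endpoint 'ep' is gone would issue
 *
 *      worker_stop_by_endpt(ep);
 *
 * which sweeps all workers, including sys_worker and dl_worker, and fails
 * every request outstanding against that endpoint with EIO. */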

/*===========================================================================*
 *                              worker_get                                   *
 *===========================================================================*/
struct worker_thread *worker_get(thread_t worker_tid)
{
  int i;
  struct worker_thread *worker;

  worker = NULL;
  if (worker_tid == sys_worker.w_tid)
        worker = &sys_worker;
  else if (worker_tid == dl_worker.w_tid)
        worker = &dl_worker;
  else {
        for (i = 0; i < NR_WTHREADS; i++) {
                if (workers[i].w_tid == worker_tid) {
                        worker = &workers[i];
                        break;
                }
        }
  }

  return(worker);
}

/*===========================================================================*
 *                              worker_getjob                                *
 *===========================================================================*/
struct job *worker_getjob(thread_t worker_tid)
{
  struct worker_thread *worker;

  if ((worker = worker_get(worker_tid)) != NULL)
        return(&worker->w_job);

  return(NULL);
}

/*===========================================================================*
 *                              worker_waiting_for                           *
 *===========================================================================*/
static int worker_waiting_for(struct worker_thread *worker, endpoint_t proc_e)
{
  ASSERTW(worker);              /* Make sure we have a valid thread */

  if (worker->w_job.j_func != NULL) {
        if (worker->w_task != NONE)
                return(worker->w_task == proc_e);
        else if (worker->w_job.j_fp != NULL) {
                return(worker->w_job.j_fp->fp_task == proc_e);
        }
  }

  return(0);
}