Backed out 2 changesets (bug 1943998) for causing wd failures @ phases.py CLOSED...
[gecko.git] / nsprpub / pr / src / misc / prtpool.c
blob00f07b756c6c730e38419c21e451135e22fb691d
1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* This Source Code Form is subject to the terms of the Mozilla Public
3 * License, v. 2.0. If a copy of the MPL was not distributed with this
4 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
6 #include "nspr.h"
8 /*
9 * Thread pools
10 * Thread pools create and manage threads to provide support for
11 * scheduling jobs onto one or more threads.
14 #ifdef OPT_WINNT
15 # include <windows.h>
16 #endif
19 * worker thread
21 typedef struct wthread {
22 PRCList links;
23 PRThread* thread;
24 } wthread;
27 * queue of timer jobs
29 typedef struct timer_jobq {
30 PRCList list;
31 PRLock* lock;
32 PRCondVar* cv;
33 PRInt32 cnt;
34 PRCList wthreads;
35 } timer_jobq;
38 * queue of jobs
40 typedef struct tp_jobq {
41 PRCList list;
42 PRInt32 cnt;
43 PRLock* lock;
44 PRCondVar* cv;
45 PRCList wthreads;
46 #ifdef OPT_WINNT
47 HANDLE nt_completion_port;
48 #endif
49 } tp_jobq;
52 * queue of IO jobs
54 typedef struct io_jobq {
55 PRCList list;
56 PRPollDesc* pollfds;
57 PRInt32 npollfds;
58 PRJob** polljobs;
59 PRLock* lock;
60 PRInt32 cnt;
61 PRFileDesc* notify_fd;
62 PRCList wthreads;
63 } io_jobq;
66 * Threadpool
68 struct PRThreadPool {
69 PRInt32 init_threads;
70 PRInt32 max_threads;
71 PRInt32 current_threads;
72 PRInt32 idle_threads;
73 PRUint32 stacksize;
74 tp_jobq jobq;
75 io_jobq ioq;
76 timer_jobq timerq;
77 PRLock* join_lock; /* used with jobp->join_cv */
78 PRCondVar* shutdown_cv;
79 PRBool shutdown;
82 typedef enum io_op_type {
83 JOB_IO_READ,
84 JOB_IO_WRITE,
85 JOB_IO_CONNECT,
86 JOB_IO_ACCEPT
87 } io_op_type;
89 #ifdef OPT_WINNT
90 typedef struct NT_notifier {
91 OVERLAPPED overlapped; /* must be first */
92 PRJob* jobp;
93 } NT_notifier;
94 #endif
96 struct PRJob {
97 PRCList links; /* for linking jobs */
98 PRBool on_ioq; /* job on ioq */
99 PRBool on_timerq; /* job on timerq */
100 PRJobFn job_func;
101 void* job_arg;
102 PRCondVar* join_cv;
103 PRBool join_wait; /* == PR_TRUE, when waiting to join */
104 PRCondVar* cancel_cv; /* for cancelling IO jobs */
105 PRBool cancel_io; /* for cancelling IO jobs */
106 PRThreadPool* tpool; /* back pointer to thread pool */
107 PRJobIoDesc* iod;
108 io_op_type io_op;
109 PRInt16 io_poll_flags;
110 PRNetAddr* netaddr;
111 PRIntervalTime timeout; /* relative value */
112 PRIntervalTime absolute;
113 #ifdef OPT_WINNT
114 NT_notifier nt_notifier;
115 #endif
118 #define JOB_LINKS_PTR(_qp) ((PRJob*)((char*)(_qp) - offsetof(PRJob, links)))
120 #define WTHREAD_LINKS_PTR(_qp) \
121 ((wthread*)((char*)(_qp) - offsetof(wthread, links)))
123 #define JOINABLE_JOB(_jobp) (NULL != (_jobp)->join_cv)
125 #define JOIN_NOTIFY(_jobp) \
126 PR_BEGIN_MACRO \
127 PR_Lock(_jobp->tpool->join_lock); \
128 _jobp->join_wait = PR_FALSE; \
129 PR_NotifyCondVar(_jobp->join_cv); \
130 PR_Unlock(_jobp->tpool->join_lock); \
131 PR_END_MACRO
133 #define CANCEL_IO_JOB(jobp) \
134 PR_BEGIN_MACRO \
135 jobp->cancel_io = PR_FALSE; \
136 jobp->on_ioq = PR_FALSE; \
137 PR_REMOVE_AND_INIT_LINK(&jobp->links); \
138 tp->ioq.cnt--; \
139 PR_NotifyCondVar(jobp->cancel_cv); \
140 PR_END_MACRO
142 static void delete_job(PRJob* jobp);
143 static PRThreadPool* alloc_threadpool(void);
144 static PRJob* alloc_job(PRBool joinable, PRThreadPool* tp);
145 static void notify_ioq(PRThreadPool* tp);
146 static void notify_timerq(PRThreadPool* tp);
149 * locks are acquired in the following order
151 * tp->ioq.lock,tp->timerq.lock
154 * tp->jobq->lock
158 * worker thread function
160 static void wstart(void* arg) {
161 PRThreadPool* tp = (PRThreadPool*)arg;
162 PRCList* head;
165 * execute jobs until shutdown
167 while (!tp->shutdown) {
168 PRJob* jobp;
169 #ifdef OPT_WINNT
170 BOOL rv;
171 DWORD unused, shutdown;
172 LPOVERLAPPED olp;
174 PR_Lock(tp->jobq.lock);
175 tp->idle_threads++;
176 PR_Unlock(tp->jobq.lock);
177 rv = GetQueuedCompletionStatus(tp->jobq.nt_completion_port, &unused,
178 &shutdown, &olp, INFINITE);
180 PR_ASSERT(rv);
181 if (shutdown) {
182 break;
184 jobp = ((NT_notifier*)olp)->jobp;
185 PR_Lock(tp->jobq.lock);
186 tp->idle_threads--;
187 tp->jobq.cnt--;
188 PR_Unlock(tp->jobq.lock);
189 #else
191 PR_Lock(tp->jobq.lock);
192 while (PR_CLIST_IS_EMPTY(&tp->jobq.list) && (!tp->shutdown)) {
193 tp->idle_threads++;
194 PR_WaitCondVar(tp->jobq.cv, PR_INTERVAL_NO_TIMEOUT);
195 tp->idle_threads--;
197 if (tp->shutdown) {
198 PR_Unlock(tp->jobq.lock);
199 break;
201 head = PR_LIST_HEAD(&tp->jobq.list);
203 * remove job from queue
205 PR_REMOVE_AND_INIT_LINK(head);
206 tp->jobq.cnt--;
207 jobp = JOB_LINKS_PTR(head);
208 PR_Unlock(tp->jobq.lock);
209 #endif
211 jobp->job_func(jobp->job_arg);
212 if (!JOINABLE_JOB(jobp)) {
213 delete_job(jobp);
214 } else {
215 JOIN_NOTIFY(jobp);
218 PR_Lock(tp->jobq.lock);
219 tp->current_threads--;
220 PR_Unlock(tp->jobq.lock);
224 * add a job to the work queue
226 static void add_to_jobq(PRThreadPool* tp, PRJob* jobp) {
228 * add to jobq
230 #ifdef OPT_WINNT
231 PR_Lock(tp->jobq.lock);
232 tp->jobq.cnt++;
233 PR_Unlock(tp->jobq.lock);
235 * notify worker thread(s)
237 PostQueuedCompletionStatus(tp->jobq.nt_completion_port, 0, FALSE,
238 &jobp->nt_notifier.overlapped);
239 #else
240 PR_Lock(tp->jobq.lock);
241 PR_APPEND_LINK(&jobp->links, &tp->jobq.list);
242 tp->jobq.cnt++;
243 if ((tp->idle_threads < tp->jobq.cnt) &&
244 (tp->current_threads < tp->max_threads)) {
245 wthread* wthrp;
247 * increment thread count and unlock the jobq lock
249 tp->current_threads++;
250 PR_Unlock(tp->jobq.lock);
251 /* create new worker thread */
252 wthrp = PR_NEWZAP(wthread);
253 if (wthrp) {
254 wthrp->thread =
255 PR_CreateThread(PR_USER_THREAD, wstart, tp, PR_PRIORITY_NORMAL,
256 PR_GLOBAL_THREAD, PR_JOINABLE_THREAD, tp->stacksize);
257 if (NULL == wthrp->thread) {
258 PR_DELETE(wthrp); /* this sets wthrp to NULL */
261 PR_Lock(tp->jobq.lock);
262 if (NULL == wthrp) {
263 tp->current_threads--;
264 } else {
265 PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads);
269 * wakeup a worker thread
271 PR_NotifyCondVar(tp->jobq.cv);
272 PR_Unlock(tp->jobq.lock);
273 #endif
277 * io worker thread function
279 static void io_wstart(void* arg) {
280 PRThreadPool* tp = (PRThreadPool*)arg;
281 int pollfd_cnt, pollfds_used;
282 int rv;
283 PRCList *qp, *nextqp;
284 PRPollDesc* pollfds = NULL;
285 PRJob** polljobs = NULL;
286 int poll_timeout;
287 PRIntervalTime now;
290 * scan io_jobq
291 * construct poll list
292 * call PR_Poll
293 * for all fds, for which poll returns true, move the job to
294 * jobq and wakeup worker thread.
296 while (!tp->shutdown) {
297 PRJob* jobp;
299 pollfd_cnt = tp->ioq.cnt + 10;
300 if (pollfd_cnt > tp->ioq.npollfds) {
302 * re-allocate pollfd array if the current one is not large
303 * enough
305 if (NULL != tp->ioq.pollfds) {
306 PR_Free(tp->ioq.pollfds);
308 tp->ioq.pollfds = (PRPollDesc*)PR_Malloc(
309 pollfd_cnt * (sizeof(PRPollDesc) + sizeof(PRJob*)));
310 PR_ASSERT(NULL != tp->ioq.pollfds);
312 * array of pollfds
314 pollfds = tp->ioq.pollfds;
315 tp->ioq.polljobs = (PRJob**)(&tp->ioq.pollfds[pollfd_cnt]);
317 * parallel array of jobs
319 polljobs = tp->ioq.polljobs;
320 tp->ioq.npollfds = pollfd_cnt;
323 pollfds_used = 0;
325 * add the notify fd; used for unblocking io thread(s)
327 pollfds[pollfds_used].fd = tp->ioq.notify_fd;
328 pollfds[pollfds_used].in_flags = PR_POLL_READ;
329 pollfds[pollfds_used].out_flags = 0;
330 polljobs[pollfds_used] = NULL;
331 pollfds_used++;
333 * fill in the pollfd array
335 PR_Lock(tp->ioq.lock);
336 for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = nextqp) {
337 nextqp = qp->next;
338 jobp = JOB_LINKS_PTR(qp);
339 if (jobp->cancel_io) {
340 CANCEL_IO_JOB(jobp);
341 continue;
343 if (pollfds_used == (pollfd_cnt)) {
344 break;
346 pollfds[pollfds_used].fd = jobp->iod->socket;
347 pollfds[pollfds_used].in_flags = jobp->io_poll_flags;
348 pollfds[pollfds_used].out_flags = 0;
349 polljobs[pollfds_used] = jobp;
351 pollfds_used++;
353 if (!PR_CLIST_IS_EMPTY(&tp->ioq.list)) {
354 qp = tp->ioq.list.next;
355 jobp = JOB_LINKS_PTR(qp);
356 if (PR_INTERVAL_NO_TIMEOUT == jobp->timeout) {
357 poll_timeout = PR_INTERVAL_NO_TIMEOUT;
358 } else if (PR_INTERVAL_NO_WAIT == jobp->timeout) {
359 poll_timeout = PR_INTERVAL_NO_WAIT;
360 } else {
361 poll_timeout = jobp->absolute - PR_IntervalNow();
362 if (poll_timeout <= 0) { /* already timed out */
363 poll_timeout = PR_INTERVAL_NO_WAIT;
366 } else {
367 poll_timeout = PR_INTERVAL_NO_TIMEOUT;
369 PR_Unlock(tp->ioq.lock);
372 * XXXX
373 * should retry if more jobs have been added to the queue?
376 PR_ASSERT(pollfds_used <= pollfd_cnt);
377 rv = PR_Poll(tp->ioq.pollfds, pollfds_used, poll_timeout);
379 if (tp->shutdown) {
380 break;
383 if (rv > 0) {
385 * at least one io event is set
387 PRStatus rval_status;
388 PRInt32 index;
390 PR_ASSERT(pollfds[0].fd == tp->ioq.notify_fd);
392 * reset the pollable event, if notified
394 if (pollfds[0].out_flags & PR_POLL_READ) {
395 rval_status = PR_WaitForPollableEvent(tp->ioq.notify_fd);
396 PR_ASSERT(PR_SUCCESS == rval_status);
399 for (index = 1; index < (pollfds_used); index++) {
400 PRInt16 events = pollfds[index].in_flags;
401 PRInt16 revents = pollfds[index].out_flags;
402 jobp = polljobs[index];
404 if ((revents & PR_POLL_NVAL) || /* busted in all cases */
405 (revents & PR_POLL_ERR) ||
406 ((events & PR_POLL_WRITE) &&
407 (revents & PR_POLL_HUP))) { /* write op & hup */
408 PR_Lock(tp->ioq.lock);
409 if (jobp->cancel_io) {
410 CANCEL_IO_JOB(jobp);
411 PR_Unlock(tp->ioq.lock);
412 continue;
414 PR_REMOVE_AND_INIT_LINK(&jobp->links);
415 tp->ioq.cnt--;
416 jobp->on_ioq = PR_FALSE;
417 PR_Unlock(tp->ioq.lock);
419 /* set error */
420 if (PR_POLL_NVAL & revents) {
421 jobp->iod->error = PR_BAD_DESCRIPTOR_ERROR;
422 } else if (PR_POLL_HUP & revents) {
423 jobp->iod->error = PR_CONNECT_RESET_ERROR;
424 } else {
425 jobp->iod->error = PR_IO_ERROR;
429 * add to jobq
431 add_to_jobq(tp, jobp);
432 } else if (revents) {
434 * add to jobq
436 PR_Lock(tp->ioq.lock);
437 if (jobp->cancel_io) {
438 CANCEL_IO_JOB(jobp);
439 PR_Unlock(tp->ioq.lock);
440 continue;
442 PR_REMOVE_AND_INIT_LINK(&jobp->links);
443 tp->ioq.cnt--;
444 jobp->on_ioq = PR_FALSE;
445 PR_Unlock(tp->ioq.lock);
447 if (jobp->io_op == JOB_IO_CONNECT) {
448 if (PR_GetConnectStatus(&pollfds[index]) == PR_SUCCESS) {
449 jobp->iod->error = 0;
450 } else {
451 jobp->iod->error = PR_GetError();
453 } else {
454 jobp->iod->error = 0;
457 add_to_jobq(tp, jobp);
462 * timeout processing
464 now = PR_IntervalNow();
465 PR_Lock(tp->ioq.lock);
466 for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = nextqp) {
467 nextqp = qp->next;
468 jobp = JOB_LINKS_PTR(qp);
469 if (jobp->cancel_io) {
470 CANCEL_IO_JOB(jobp);
471 continue;
473 if (PR_INTERVAL_NO_TIMEOUT == jobp->timeout) {
474 break;
476 if ((PR_INTERVAL_NO_WAIT != jobp->timeout) &&
477 ((PRInt32)(jobp->absolute - now) > 0)) {
478 break;
480 PR_REMOVE_AND_INIT_LINK(&jobp->links);
481 tp->ioq.cnt--;
482 jobp->on_ioq = PR_FALSE;
483 jobp->iod->error = PR_IO_TIMEOUT_ERROR;
484 add_to_jobq(tp, jobp);
486 PR_Unlock(tp->ioq.lock);
491 * timer worker thread function
493 static void timer_wstart(void* arg) {
494 PRThreadPool* tp = (PRThreadPool*)arg;
495 PRCList* qp;
496 PRIntervalTime timeout;
497 PRIntervalTime now;
500 * call PR_WaitCondVar with minimum value of all timeouts
502 while (!tp->shutdown) {
503 PRJob* jobp;
505 PR_Lock(tp->timerq.lock);
506 if (PR_CLIST_IS_EMPTY(&tp->timerq.list)) {
507 timeout = PR_INTERVAL_NO_TIMEOUT;
508 } else {
509 PRCList* qp;
511 qp = tp->timerq.list.next;
512 jobp = JOB_LINKS_PTR(qp);
514 timeout = jobp->absolute - PR_IntervalNow();
515 if (timeout <= 0) {
516 timeout = PR_INTERVAL_NO_WAIT; /* already timed out */
519 if (PR_INTERVAL_NO_WAIT != timeout) {
520 PR_WaitCondVar(tp->timerq.cv, timeout);
522 if (tp->shutdown) {
523 PR_Unlock(tp->timerq.lock);
524 break;
527 * move expired-timer jobs to jobq
529 now = PR_IntervalNow();
530 while (!PR_CLIST_IS_EMPTY(&tp->timerq.list)) {
531 qp = tp->timerq.list.next;
532 jobp = JOB_LINKS_PTR(qp);
534 if ((PRInt32)(jobp->absolute - now) > 0) {
535 break;
538 * job timed out
540 PR_REMOVE_AND_INIT_LINK(&jobp->links);
541 tp->timerq.cnt--;
542 jobp->on_timerq = PR_FALSE;
543 add_to_jobq(tp, jobp);
545 PR_Unlock(tp->timerq.lock);
549 static void delete_threadpool(PRThreadPool* tp) {
550 if (NULL != tp) {
551 if (NULL != tp->shutdown_cv) {
552 PR_DestroyCondVar(tp->shutdown_cv);
554 if (NULL != tp->jobq.cv) {
555 PR_DestroyCondVar(tp->jobq.cv);
557 if (NULL != tp->jobq.lock) {
558 PR_DestroyLock(tp->jobq.lock);
560 if (NULL != tp->join_lock) {
561 PR_DestroyLock(tp->join_lock);
563 #ifdef OPT_WINNT
564 if (NULL != tp->jobq.nt_completion_port) {
565 CloseHandle(tp->jobq.nt_completion_port);
567 #endif
568 /* Timer queue */
569 if (NULL != tp->timerq.cv) {
570 PR_DestroyCondVar(tp->timerq.cv);
572 if (NULL != tp->timerq.lock) {
573 PR_DestroyLock(tp->timerq.lock);
576 if (NULL != tp->ioq.lock) {
577 PR_DestroyLock(tp->ioq.lock);
579 if (NULL != tp->ioq.pollfds) {
580 PR_Free(tp->ioq.pollfds);
582 if (NULL != tp->ioq.notify_fd) {
583 PR_DestroyPollableEvent(tp->ioq.notify_fd);
585 PR_Free(tp);
587 return;
590 static PRThreadPool* alloc_threadpool(void) {
591 PRThreadPool* tp;
593 tp = (PRThreadPool*)PR_CALLOC(sizeof(*tp));
594 if (NULL == tp) {
595 goto failed;
597 tp->jobq.lock = PR_NewLock();
598 if (NULL == tp->jobq.lock) {
599 goto failed;
601 tp->jobq.cv = PR_NewCondVar(tp->jobq.lock);
602 if (NULL == tp->jobq.cv) {
603 goto failed;
605 tp->join_lock = PR_NewLock();
606 if (NULL == tp->join_lock) {
607 goto failed;
609 #ifdef OPT_WINNT
610 tp->jobq.nt_completion_port =
611 CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
612 if (NULL == tp->jobq.nt_completion_port) {
613 goto failed;
615 #endif
617 tp->ioq.lock = PR_NewLock();
618 if (NULL == tp->ioq.lock) {
619 goto failed;
622 /* Timer queue */
624 tp->timerq.lock = PR_NewLock();
625 if (NULL == tp->timerq.lock) {
626 goto failed;
628 tp->timerq.cv = PR_NewCondVar(tp->timerq.lock);
629 if (NULL == tp->timerq.cv) {
630 goto failed;
633 tp->shutdown_cv = PR_NewCondVar(tp->jobq.lock);
634 if (NULL == tp->shutdown_cv) {
635 goto failed;
637 tp->ioq.notify_fd = PR_NewPollableEvent();
638 if (NULL == tp->ioq.notify_fd) {
639 goto failed;
641 return tp;
642 failed:
643 delete_threadpool(tp);
644 PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
645 return NULL;
648 /* Create thread pool */
649 PR_IMPLEMENT(PRThreadPool*)
650 PR_CreateThreadPool(PRInt32 initial_threads, PRInt32 max_threads,
651 PRUint32 stacksize) {
652 PRThreadPool* tp;
653 PRThread* thr;
654 int i;
655 wthread* wthrp;
657 tp = alloc_threadpool();
658 if (NULL == tp) {
659 return NULL;
662 tp->init_threads = initial_threads;
663 tp->max_threads = max_threads;
664 tp->stacksize = stacksize;
665 PR_INIT_CLIST(&tp->jobq.list);
666 PR_INIT_CLIST(&tp->ioq.list);
667 PR_INIT_CLIST(&tp->timerq.list);
668 PR_INIT_CLIST(&tp->jobq.wthreads);
669 PR_INIT_CLIST(&tp->ioq.wthreads);
670 PR_INIT_CLIST(&tp->timerq.wthreads);
671 tp->shutdown = PR_FALSE;
673 PR_Lock(tp->jobq.lock);
674 for (i = 0; i < initial_threads; ++i) {
675 thr = PR_CreateThread(PR_USER_THREAD, wstart, tp, PR_PRIORITY_NORMAL,
676 PR_GLOBAL_THREAD, PR_JOINABLE_THREAD, stacksize);
677 PR_ASSERT(thr);
678 wthrp = PR_NEWZAP(wthread);
679 PR_ASSERT(wthrp);
680 wthrp->thread = thr;
681 PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads);
683 tp->current_threads = initial_threads;
685 thr = PR_CreateThread(PR_USER_THREAD, io_wstart, tp, PR_PRIORITY_NORMAL,
686 PR_GLOBAL_THREAD, PR_JOINABLE_THREAD, stacksize);
687 PR_ASSERT(thr);
688 wthrp = PR_NEWZAP(wthread);
689 PR_ASSERT(wthrp);
690 wthrp->thread = thr;
691 PR_APPEND_LINK(&wthrp->links, &tp->ioq.wthreads);
693 thr = PR_CreateThread(PR_USER_THREAD, timer_wstart, tp, PR_PRIORITY_NORMAL,
694 PR_GLOBAL_THREAD, PR_JOINABLE_THREAD, stacksize);
695 PR_ASSERT(thr);
696 wthrp = PR_NEWZAP(wthread);
697 PR_ASSERT(wthrp);
698 wthrp->thread = thr;
699 PR_APPEND_LINK(&wthrp->links, &tp->timerq.wthreads);
701 PR_Unlock(tp->jobq.lock);
702 return tp;
705 static void delete_job(PRJob* jobp) {
706 if (NULL != jobp) {
707 if (NULL != jobp->join_cv) {
708 PR_DestroyCondVar(jobp->join_cv);
709 jobp->join_cv = NULL;
711 if (NULL != jobp->cancel_cv) {
712 PR_DestroyCondVar(jobp->cancel_cv);
713 jobp->cancel_cv = NULL;
715 PR_DELETE(jobp);
719 static PRJob* alloc_job(PRBool joinable, PRThreadPool* tp) {
720 PRJob* jobp;
722 jobp = PR_NEWZAP(PRJob);
723 if (NULL == jobp) {
724 goto failed;
726 if (joinable) {
727 jobp->join_cv = PR_NewCondVar(tp->join_lock);
728 jobp->join_wait = PR_TRUE;
729 if (NULL == jobp->join_cv) {
730 goto failed;
732 } else {
733 jobp->join_cv = NULL;
735 #ifdef OPT_WINNT
736 jobp->nt_notifier.jobp = jobp;
737 #endif
738 return jobp;
739 failed:
740 delete_job(jobp);
741 PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
742 return NULL;
745 /* queue a job */
746 PR_IMPLEMENT(PRJob*)
747 PR_QueueJob(PRThreadPool* tpool, PRJobFn fn, void* arg, PRBool joinable) {
748 PRJob* jobp;
750 jobp = alloc_job(joinable, tpool);
751 if (NULL == jobp) {
752 return NULL;
755 jobp->job_func = fn;
756 jobp->job_arg = arg;
757 jobp->tpool = tpool;
759 add_to_jobq(tpool, jobp);
760 return jobp;
763 /* queue a job, when a socket is readable or writeable */
764 static PRJob* queue_io_job(PRThreadPool* tpool, PRJobIoDesc* iod, PRJobFn fn,
765 void* arg, PRBool joinable, io_op_type op) {
766 PRJob* jobp;
767 PRIntervalTime now;
769 jobp = alloc_job(joinable, tpool);
770 if (NULL == jobp) {
771 return NULL;
775 * Add a new job to io_jobq
776 * wakeup io worker thread
779 jobp->job_func = fn;
780 jobp->job_arg = arg;
781 jobp->tpool = tpool;
782 jobp->iod = iod;
783 if (JOB_IO_READ == op) {
784 jobp->io_op = JOB_IO_READ;
785 jobp->io_poll_flags = PR_POLL_READ;
786 } else if (JOB_IO_WRITE == op) {
787 jobp->io_op = JOB_IO_WRITE;
788 jobp->io_poll_flags = PR_POLL_WRITE;
789 } else if (JOB_IO_ACCEPT == op) {
790 jobp->io_op = JOB_IO_ACCEPT;
791 jobp->io_poll_flags = PR_POLL_READ;
792 } else if (JOB_IO_CONNECT == op) {
793 jobp->io_op = JOB_IO_CONNECT;
794 jobp->io_poll_flags = PR_POLL_WRITE | PR_POLL_EXCEPT;
795 } else {
796 delete_job(jobp);
797 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
798 return NULL;
801 jobp->timeout = iod->timeout;
802 if ((PR_INTERVAL_NO_TIMEOUT == iod->timeout) ||
803 (PR_INTERVAL_NO_WAIT == iod->timeout)) {
804 jobp->absolute = iod->timeout;
805 } else {
806 now = PR_IntervalNow();
807 jobp->absolute = now + iod->timeout;
810 PR_Lock(tpool->ioq.lock);
812 if (PR_CLIST_IS_EMPTY(&tpool->ioq.list) ||
813 (PR_INTERVAL_NO_TIMEOUT == iod->timeout)) {
814 PR_APPEND_LINK(&jobp->links, &tpool->ioq.list);
815 } else if (PR_INTERVAL_NO_WAIT == iod->timeout) {
816 PR_INSERT_LINK(&jobp->links, &tpool->ioq.list);
817 } else {
818 PRCList* qp;
819 PRJob* tmp_jobp;
821 * insert into the timeout-sorted ioq
823 for (qp = tpool->ioq.list.prev; qp != &tpool->ioq.list; qp = qp->prev) {
824 tmp_jobp = JOB_LINKS_PTR(qp);
825 if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) {
826 break;
829 PR_INSERT_AFTER(&jobp->links, qp);
832 jobp->on_ioq = PR_TRUE;
833 tpool->ioq.cnt++;
835 * notify io worker thread(s)
837 PR_Unlock(tpool->ioq.lock);
838 notify_ioq(tpool);
839 return jobp;
842 /* queue a job, when a socket is readable */
843 PR_IMPLEMENT(PRJob*)
844 PR_QueueJob_Read(PRThreadPool* tpool, PRJobIoDesc* iod, PRJobFn fn, void* arg,
845 PRBool joinable) {
846 return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_READ));
849 /* queue a job, when a socket is writeable */
850 PR_IMPLEMENT(PRJob*)
851 PR_QueueJob_Write(PRThreadPool* tpool, PRJobIoDesc* iod, PRJobFn fn, void* arg,
852 PRBool joinable) {
853 return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_WRITE));
856 /* queue a job, when a socket has a pending connection */
857 PR_IMPLEMENT(PRJob*)
858 PR_QueueJob_Accept(PRThreadPool* tpool, PRJobIoDesc* iod, PRJobFn fn, void* arg,
859 PRBool joinable) {
860 return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_ACCEPT));
863 /* queue a job, when a socket can be connected */
864 PR_IMPLEMENT(PRJob*)
865 PR_QueueJob_Connect(PRThreadPool* tpool, PRJobIoDesc* iod,
866 const PRNetAddr* addr, PRJobFn fn, void* arg,
867 PRBool joinable) {
868 PRStatus rv;
869 PRErrorCode err;
871 rv = PR_Connect(iod->socket, addr, PR_INTERVAL_NO_WAIT);
872 if ((rv == PR_FAILURE) && ((err = PR_GetError()) == PR_IN_PROGRESS_ERROR)) {
873 /* connection pending */
874 return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_CONNECT));
877 * connection succeeded or failed; add to jobq right away
879 if (rv == PR_FAILURE) {
880 iod->error = err;
881 } else {
882 iod->error = 0;
884 return (PR_QueueJob(tpool, fn, arg, joinable));
887 /* queue a job, when a timer expires */
888 PR_IMPLEMENT(PRJob*)
889 PR_QueueJob_Timer(PRThreadPool* tpool, PRIntervalTime timeout, PRJobFn fn,
890 void* arg, PRBool joinable) {
891 PRIntervalTime now;
892 PRJob* jobp;
894 if (PR_INTERVAL_NO_TIMEOUT == timeout) {
895 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
896 return NULL;
898 if (PR_INTERVAL_NO_WAIT == timeout) {
900 * no waiting; add to jobq right away
902 return (PR_QueueJob(tpool, fn, arg, joinable));
904 jobp = alloc_job(joinable, tpool);
905 if (NULL == jobp) {
906 return NULL;
910 * Add a new job to timer_jobq
911 * wakeup timer worker thread
914 jobp->job_func = fn;
915 jobp->job_arg = arg;
916 jobp->tpool = tpool;
917 jobp->timeout = timeout;
919 now = PR_IntervalNow();
920 jobp->absolute = now + timeout;
922 PR_Lock(tpool->timerq.lock);
923 jobp->on_timerq = PR_TRUE;
924 if (PR_CLIST_IS_EMPTY(&tpool->timerq.list)) {
925 PR_APPEND_LINK(&jobp->links, &tpool->timerq.list);
926 } else {
927 PRCList* qp;
928 PRJob* tmp_jobp;
930 * insert into the sorted timer jobq
932 for (qp = tpool->timerq.list.prev; qp != &tpool->timerq.list;
933 qp = qp->prev) {
934 tmp_jobp = JOB_LINKS_PTR(qp);
935 if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) {
936 break;
939 PR_INSERT_AFTER(&jobp->links, qp);
941 tpool->timerq.cnt++;
943 * notify timer worker thread(s)
945 notify_timerq(tpool);
946 PR_Unlock(tpool->timerq.lock);
947 return jobp;
950 static void notify_timerq(PRThreadPool* tp) {
952 * wakeup the timer thread(s)
954 PR_NotifyCondVar(tp->timerq.cv);
957 static void notify_ioq(PRThreadPool* tp) {
958 PRStatus rval_status;
961 * wakeup the io thread(s)
963 rval_status = PR_SetPollableEvent(tp->ioq.notify_fd);
964 PR_ASSERT(PR_SUCCESS == rval_status);
968 * cancel a job
970 * XXXX: is this needed? likely to be removed
972 PR_IMPLEMENT(PRStatus)
973 PR_CancelJob(PRJob* jobp) {
974 PRStatus rval = PR_FAILURE;
975 PRThreadPool* tp;
977 if (jobp->on_timerq) {
979 * now, check again while holding the timerq lock
981 tp = jobp->tpool;
982 PR_Lock(tp->timerq.lock);
983 if (jobp->on_timerq) {
984 jobp->on_timerq = PR_FALSE;
985 PR_REMOVE_AND_INIT_LINK(&jobp->links);
986 tp->timerq.cnt--;
987 PR_Unlock(tp->timerq.lock);
988 if (!JOINABLE_JOB(jobp)) {
989 delete_job(jobp);
990 } else {
991 JOIN_NOTIFY(jobp);
993 rval = PR_SUCCESS;
994 } else {
995 PR_Unlock(tp->timerq.lock);
997 } else if (jobp->on_ioq) {
999 * now, check again while holding the ioq lock
1001 tp = jobp->tpool;
1002 PR_Lock(tp->ioq.lock);
1003 if (jobp->on_ioq) {
1004 jobp->cancel_cv = PR_NewCondVar(tp->ioq.lock);
1005 if (NULL == jobp->cancel_cv) {
1006 PR_Unlock(tp->ioq.lock);
1007 PR_SetError(PR_INSUFFICIENT_RESOURCES_ERROR, 0);
1008 return PR_FAILURE;
1011 * mark job 'cancelled' and notify io thread(s)
1012 * XXXX:
1013 * this assumes there is only one io thread; when there
1014 * are multiple threads, the io thread processing this job
1015 * must be notified.
1017 jobp->cancel_io = PR_TRUE;
1018 PR_Unlock(tp->ioq.lock); /* release, reacquire ioq lock */
1019 notify_ioq(tp);
1020 PR_Lock(tp->ioq.lock);
1021 while (jobp->cancel_io) {
1022 PR_WaitCondVar(jobp->cancel_cv, PR_INTERVAL_NO_TIMEOUT);
1024 PR_Unlock(tp->ioq.lock);
1025 PR_ASSERT(!jobp->on_ioq);
1026 if (!JOINABLE_JOB(jobp)) {
1027 delete_job(jobp);
1028 } else {
1029 JOIN_NOTIFY(jobp);
1031 rval = PR_SUCCESS;
1032 } else {
1033 PR_Unlock(tp->ioq.lock);
1036 if (PR_FAILURE == rval) {
1037 PR_SetError(PR_INVALID_STATE_ERROR, 0);
1039 return rval;
1042 /* join a job, wait until completion */
1043 PR_IMPLEMENT(PRStatus)
1044 PR_JoinJob(PRJob* jobp) {
1045 if (!JOINABLE_JOB(jobp)) {
1046 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
1047 return PR_FAILURE;
1049 PR_Lock(jobp->tpool->join_lock);
1050 while (jobp->join_wait) {
1051 PR_WaitCondVar(jobp->join_cv, PR_INTERVAL_NO_TIMEOUT);
1053 PR_Unlock(jobp->tpool->join_lock);
1054 delete_job(jobp);
1055 return PR_SUCCESS;
1058 /* shutdown threadpool */
1059 PR_IMPLEMENT(PRStatus)
1060 PR_ShutdownThreadPool(PRThreadPool* tpool) {
1061 PRStatus rval = PR_SUCCESS;
1063 PR_Lock(tpool->jobq.lock);
1064 tpool->shutdown = PR_TRUE;
1065 PR_NotifyAllCondVar(tpool->shutdown_cv);
1066 PR_Unlock(tpool->jobq.lock);
1068 return rval;
1072 * join thread pool
1073 * wait for termination of worker threads
1074 * reclaim threadpool resources
1076 PR_IMPLEMENT(PRStatus)
1077 PR_JoinThreadPool(PRThreadPool* tpool) {
1078 PRStatus rval = PR_SUCCESS;
1079 PRCList* head;
1080 PRStatus rval_status;
1082 PR_Lock(tpool->jobq.lock);
1083 while (!tpool->shutdown) {
1084 PR_WaitCondVar(tpool->shutdown_cv, PR_INTERVAL_NO_TIMEOUT);
1088 * wakeup worker threads
1090 #ifdef OPT_WINNT
1092 * post shutdown notification for all threads
1095 int i;
1096 for (i = 0; i < tpool->current_threads; i++) {
1097 PostQueuedCompletionStatus(tpool->jobq.nt_completion_port, 0, TRUE, NULL);
1100 #else
1101 PR_NotifyAllCondVar(tpool->jobq.cv);
1102 #endif
1105 * wakeup io thread(s)
1107 notify_ioq(tpool);
1110 * wakeup timer thread(s)
1112 PR_Lock(tpool->timerq.lock);
1113 notify_timerq(tpool);
1114 PR_Unlock(tpool->timerq.lock);
1116 while (!PR_CLIST_IS_EMPTY(&tpool->jobq.wthreads)) {
1117 wthread* wthrp;
1119 head = PR_LIST_HEAD(&tpool->jobq.wthreads);
1120 PR_REMOVE_AND_INIT_LINK(head);
1121 PR_Unlock(tpool->jobq.lock);
1122 wthrp = WTHREAD_LINKS_PTR(head);
1123 rval_status = PR_JoinThread(wthrp->thread);
1124 PR_ASSERT(PR_SUCCESS == rval_status);
1125 PR_DELETE(wthrp);
1126 PR_Lock(tpool->jobq.lock);
1128 PR_Unlock(tpool->jobq.lock);
1129 while (!PR_CLIST_IS_EMPTY(&tpool->ioq.wthreads)) {
1130 wthread* wthrp;
1132 head = PR_LIST_HEAD(&tpool->ioq.wthreads);
1133 PR_REMOVE_AND_INIT_LINK(head);
1134 wthrp = WTHREAD_LINKS_PTR(head);
1135 rval_status = PR_JoinThread(wthrp->thread);
1136 PR_ASSERT(PR_SUCCESS == rval_status);
1137 PR_DELETE(wthrp);
1140 while (!PR_CLIST_IS_EMPTY(&tpool->timerq.wthreads)) {
1141 wthread* wthrp;
1143 head = PR_LIST_HEAD(&tpool->timerq.wthreads);
1144 PR_REMOVE_AND_INIT_LINK(head);
1145 wthrp = WTHREAD_LINKS_PTR(head);
1146 rval_status = PR_JoinThread(wthrp->thread);
1147 PR_ASSERT(PR_SUCCESS == rval_status);
1148 PR_DELETE(wthrp);
1152 * Delete queued jobs
1154 while (!PR_CLIST_IS_EMPTY(&tpool->jobq.list)) {
1155 PRJob* jobp;
1157 head = PR_LIST_HEAD(&tpool->jobq.list);
1158 PR_REMOVE_AND_INIT_LINK(head);
1159 jobp = JOB_LINKS_PTR(head);
1160 tpool->jobq.cnt--;
1161 delete_job(jobp);
1164 /* delete io jobs */
1165 while (!PR_CLIST_IS_EMPTY(&tpool->ioq.list)) {
1166 PRJob* jobp;
1168 head = PR_LIST_HEAD(&tpool->ioq.list);
1169 PR_REMOVE_AND_INIT_LINK(head);
1170 tpool->ioq.cnt--;
1171 jobp = JOB_LINKS_PTR(head);
1172 delete_job(jobp);
1175 /* delete timer jobs */
1176 while (!PR_CLIST_IS_EMPTY(&tpool->timerq.list)) {
1177 PRJob* jobp;
1179 head = PR_LIST_HEAD(&tpool->timerq.list);
1180 PR_REMOVE_AND_INIT_LINK(head);
1181 tpool->timerq.cnt--;
1182 jobp = JOB_LINKS_PTR(head);
1183 delete_job(jobp);
1186 PR_ASSERT(0 == tpool->jobq.cnt);
1187 PR_ASSERT(0 == tpool->ioq.cnt);
1188 PR_ASSERT(0 == tpool->timerq.cnt);
1190 delete_threadpool(tpool);
1191 return rval;