1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* This Source Code Form is subject to the terms of the Mozilla Public
3 * License, v. 2.0. If a copy of the MPL was not distributed with this
4 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
10 * Thread pools create and manage threads to provide support for
11 * scheduling jobs onto one or more threads.
21 typedef struct wthread
{
29 typedef struct timer_jobq
{
40 typedef struct tp_jobq
{
47 HANDLE nt_completion_port
;
54 typedef struct io_jobq
{
61 PRFileDesc
* notify_fd
;
71 PRInt32 current_threads
;
77 PRLock
* join_lock
; /* used with jobp->join_cv */
78 PRCondVar
* shutdown_cv
;
82 typedef enum io_op_type
{
90 typedef struct NT_notifier
{
91 OVERLAPPED overlapped
; /* must be first */
97 PRCList links
; /* for linking jobs */
98 PRBool on_ioq
; /* job on ioq */
99 PRBool on_timerq
; /* job on timerq */
103 PRBool join_wait
; /* == PR_TRUE, when waiting to join */
104 PRCondVar
* cancel_cv
; /* for cancelling IO jobs */
105 PRBool cancel_io
; /* for cancelling IO jobs */
106 PRThreadPool
* tpool
; /* back pointer to thread pool */
109 PRInt16 io_poll_flags
;
111 PRIntervalTime timeout
; /* relative value */
112 PRIntervalTime absolute
;
114 NT_notifier nt_notifier
;
118 #define JOB_LINKS_PTR(_qp) ((PRJob*)((char*)(_qp) - offsetof(PRJob, links)))
120 #define WTHREAD_LINKS_PTR(_qp) \
121 ((wthread*)((char*)(_qp) - offsetof(wthread, links)))
123 #define JOINABLE_JOB(_jobp) (NULL != (_jobp)->join_cv)
125 #define JOIN_NOTIFY(_jobp) \
127 PR_Lock(_jobp->tpool->join_lock); \
128 _jobp->join_wait = PR_FALSE; \
129 PR_NotifyCondVar(_jobp->join_cv); \
130 PR_Unlock(_jobp->tpool->join_lock); \
133 #define CANCEL_IO_JOB(jobp) \
135 jobp->cancel_io = PR_FALSE; \
136 jobp->on_ioq = PR_FALSE; \
137 PR_REMOVE_AND_INIT_LINK(&jobp->links); \
139 PR_NotifyCondVar(jobp->cancel_cv); \
142 static void delete_job(PRJob
* jobp
);
143 static PRThreadPool
* alloc_threadpool(void);
144 static PRJob
* alloc_job(PRBool joinable
, PRThreadPool
* tp
);
145 static void notify_ioq(PRThreadPool
* tp
);
146 static void notify_timerq(PRThreadPool
* tp
);
149 * locks are acquired in the following order
151 * tp->ioq.lock,tp->timerq.lock
158 * worker thread function
160 static void wstart(void* arg
) {
161 PRThreadPool
* tp
= (PRThreadPool
*)arg
;
165 * execute jobs until shutdown
167 while (!tp
->shutdown
) {
171 DWORD unused
, shutdown
;
174 PR_Lock(tp
->jobq
.lock
);
176 PR_Unlock(tp
->jobq
.lock
);
177 rv
= GetQueuedCompletionStatus(tp
->jobq
.nt_completion_port
, &unused
,
178 &shutdown
, &olp
, INFINITE
);
184 jobp
= ((NT_notifier
*)olp
)->jobp
;
185 PR_Lock(tp
->jobq
.lock
);
188 PR_Unlock(tp
->jobq
.lock
);
191 PR_Lock(tp
->jobq
.lock
);
192 while (PR_CLIST_IS_EMPTY(&tp
->jobq
.list
) && (!tp
->shutdown
)) {
194 PR_WaitCondVar(tp
->jobq
.cv
, PR_INTERVAL_NO_TIMEOUT
);
198 PR_Unlock(tp
->jobq
.lock
);
201 head
= PR_LIST_HEAD(&tp
->jobq
.list
);
203 * remove job from queue
205 PR_REMOVE_AND_INIT_LINK(head
);
207 jobp
= JOB_LINKS_PTR(head
);
208 PR_Unlock(tp
->jobq
.lock
);
211 jobp
->job_func(jobp
->job_arg
);
212 if (!JOINABLE_JOB(jobp
)) {
218 PR_Lock(tp
->jobq
.lock
);
219 tp
->current_threads
--;
220 PR_Unlock(tp
->jobq
.lock
);
224 * add a job to the work queue
226 static void add_to_jobq(PRThreadPool
* tp
, PRJob
* jobp
) {
231 PR_Lock(tp
->jobq
.lock
);
233 PR_Unlock(tp
->jobq
.lock
);
235 * notify worker thread(s)
237 PostQueuedCompletionStatus(tp
->jobq
.nt_completion_port
, 0, FALSE
,
238 &jobp
->nt_notifier
.overlapped
);
240 PR_Lock(tp
->jobq
.lock
);
241 PR_APPEND_LINK(&jobp
->links
, &tp
->jobq
.list
);
243 if ((tp
->idle_threads
< tp
->jobq
.cnt
) &&
244 (tp
->current_threads
< tp
->max_threads
)) {
247 * increment thread count and unlock the jobq lock
249 tp
->current_threads
++;
250 PR_Unlock(tp
->jobq
.lock
);
251 /* create new worker thread */
252 wthrp
= PR_NEWZAP(wthread
);
255 PR_CreateThread(PR_USER_THREAD
, wstart
, tp
, PR_PRIORITY_NORMAL
,
256 PR_GLOBAL_THREAD
, PR_JOINABLE_THREAD
, tp
->stacksize
);
257 if (NULL
== wthrp
->thread
) {
258 PR_DELETE(wthrp
); /* this sets wthrp to NULL */
261 PR_Lock(tp
->jobq
.lock
);
263 tp
->current_threads
--;
265 PR_APPEND_LINK(&wthrp
->links
, &tp
->jobq
.wthreads
);
269 * wakeup a worker thread
271 PR_NotifyCondVar(tp
->jobq
.cv
);
272 PR_Unlock(tp
->jobq
.lock
);
277 * io worker thread function
279 static void io_wstart(void* arg
) {
280 PRThreadPool
* tp
= (PRThreadPool
*)arg
;
281 int pollfd_cnt
, pollfds_used
;
283 PRCList
*qp
, *nextqp
;
284 PRPollDesc
* pollfds
= NULL
;
285 PRJob
** polljobs
= NULL
;
291 * construct poll list
293 * for all fds, for which poll returns true, move the job to
294 * jobq and wakeup worker thread.
296 while (!tp
->shutdown
) {
299 pollfd_cnt
= tp
->ioq
.cnt
+ 10;
300 if (pollfd_cnt
> tp
->ioq
.npollfds
) {
302 * re-allocate pollfd array if the current one is not large
305 if (NULL
!= tp
->ioq
.pollfds
) {
306 PR_Free(tp
->ioq
.pollfds
);
308 tp
->ioq
.pollfds
= (PRPollDesc
*)PR_Malloc(
309 pollfd_cnt
* (sizeof(PRPollDesc
) + sizeof(PRJob
*)));
310 PR_ASSERT(NULL
!= tp
->ioq
.pollfds
);
314 pollfds
= tp
->ioq
.pollfds
;
315 tp
->ioq
.polljobs
= (PRJob
**)(&tp
->ioq
.pollfds
[pollfd_cnt
]);
317 * parallel array of jobs
319 polljobs
= tp
->ioq
.polljobs
;
320 tp
->ioq
.npollfds
= pollfd_cnt
;
325 * add the notify fd; used for unblocking io thread(s)
327 pollfds
[pollfds_used
].fd
= tp
->ioq
.notify_fd
;
328 pollfds
[pollfds_used
].in_flags
= PR_POLL_READ
;
329 pollfds
[pollfds_used
].out_flags
= 0;
330 polljobs
[pollfds_used
] = NULL
;
333 * fill in the pollfd array
335 PR_Lock(tp
->ioq
.lock
);
336 for (qp
= tp
->ioq
.list
.next
; qp
!= &tp
->ioq
.list
; qp
= nextqp
) {
338 jobp
= JOB_LINKS_PTR(qp
);
339 if (jobp
->cancel_io
) {
343 if (pollfds_used
== (pollfd_cnt
)) {
346 pollfds
[pollfds_used
].fd
= jobp
->iod
->socket
;
347 pollfds
[pollfds_used
].in_flags
= jobp
->io_poll_flags
;
348 pollfds
[pollfds_used
].out_flags
= 0;
349 polljobs
[pollfds_used
] = jobp
;
353 if (!PR_CLIST_IS_EMPTY(&tp
->ioq
.list
)) {
354 qp
= tp
->ioq
.list
.next
;
355 jobp
= JOB_LINKS_PTR(qp
);
356 if (PR_INTERVAL_NO_TIMEOUT
== jobp
->timeout
) {
357 poll_timeout
= PR_INTERVAL_NO_TIMEOUT
;
358 } else if (PR_INTERVAL_NO_WAIT
== jobp
->timeout
) {
359 poll_timeout
= PR_INTERVAL_NO_WAIT
;
361 poll_timeout
= jobp
->absolute
- PR_IntervalNow();
362 if (poll_timeout
<= 0) { /* already timed out */
363 poll_timeout
= PR_INTERVAL_NO_WAIT
;
367 poll_timeout
= PR_INTERVAL_NO_TIMEOUT
;
369 PR_Unlock(tp
->ioq
.lock
);
373 * should retry if more jobs have been added to the queue?
376 PR_ASSERT(pollfds_used
<= pollfd_cnt
);
377 rv
= PR_Poll(tp
->ioq
.pollfds
, pollfds_used
, poll_timeout
);
385 * at least one io event is set
387 PRStatus rval_status
;
390 PR_ASSERT(pollfds
[0].fd
== tp
->ioq
.notify_fd
);
392 * reset the pollable event, if notified
394 if (pollfds
[0].out_flags
& PR_POLL_READ
) {
395 rval_status
= PR_WaitForPollableEvent(tp
->ioq
.notify_fd
);
396 PR_ASSERT(PR_SUCCESS
== rval_status
);
399 for (index
= 1; index
< (pollfds_used
); index
++) {
400 PRInt16 events
= pollfds
[index
].in_flags
;
401 PRInt16 revents
= pollfds
[index
].out_flags
;
402 jobp
= polljobs
[index
];
404 if ((revents
& PR_POLL_NVAL
) || /* busted in all cases */
405 (revents
& PR_POLL_ERR
) ||
406 ((events
& PR_POLL_WRITE
) &&
407 (revents
& PR_POLL_HUP
))) { /* write op & hup */
408 PR_Lock(tp
->ioq
.lock
);
409 if (jobp
->cancel_io
) {
411 PR_Unlock(tp
->ioq
.lock
);
414 PR_REMOVE_AND_INIT_LINK(&jobp
->links
);
416 jobp
->on_ioq
= PR_FALSE
;
417 PR_Unlock(tp
->ioq
.lock
);
420 if (PR_POLL_NVAL
& revents
) {
421 jobp
->iod
->error
= PR_BAD_DESCRIPTOR_ERROR
;
422 } else if (PR_POLL_HUP
& revents
) {
423 jobp
->iod
->error
= PR_CONNECT_RESET_ERROR
;
425 jobp
->iod
->error
= PR_IO_ERROR
;
431 add_to_jobq(tp
, jobp
);
432 } else if (revents
) {
436 PR_Lock(tp
->ioq
.lock
);
437 if (jobp
->cancel_io
) {
439 PR_Unlock(tp
->ioq
.lock
);
442 PR_REMOVE_AND_INIT_LINK(&jobp
->links
);
444 jobp
->on_ioq
= PR_FALSE
;
445 PR_Unlock(tp
->ioq
.lock
);
447 if (jobp
->io_op
== JOB_IO_CONNECT
) {
448 if (PR_GetConnectStatus(&pollfds
[index
]) == PR_SUCCESS
) {
449 jobp
->iod
->error
= 0;
451 jobp
->iod
->error
= PR_GetError();
454 jobp
->iod
->error
= 0;
457 add_to_jobq(tp
, jobp
);
464 now
= PR_IntervalNow();
465 PR_Lock(tp
->ioq
.lock
);
466 for (qp
= tp
->ioq
.list
.next
; qp
!= &tp
->ioq
.list
; qp
= nextqp
) {
468 jobp
= JOB_LINKS_PTR(qp
);
469 if (jobp
->cancel_io
) {
473 if (PR_INTERVAL_NO_TIMEOUT
== jobp
->timeout
) {
476 if ((PR_INTERVAL_NO_WAIT
!= jobp
->timeout
) &&
477 ((PRInt32
)(jobp
->absolute
- now
) > 0)) {
480 PR_REMOVE_AND_INIT_LINK(&jobp
->links
);
482 jobp
->on_ioq
= PR_FALSE
;
483 jobp
->iod
->error
= PR_IO_TIMEOUT_ERROR
;
484 add_to_jobq(tp
, jobp
);
486 PR_Unlock(tp
->ioq
.lock
);
491 * timer worker thread function
493 static void timer_wstart(void* arg
) {
494 PRThreadPool
* tp
= (PRThreadPool
*)arg
;
496 PRIntervalTime timeout
;
500 * call PR_WaitCondVar with minimum value of all timeouts
502 while (!tp
->shutdown
) {
505 PR_Lock(tp
->timerq
.lock
);
506 if (PR_CLIST_IS_EMPTY(&tp
->timerq
.list
)) {
507 timeout
= PR_INTERVAL_NO_TIMEOUT
;
511 qp
= tp
->timerq
.list
.next
;
512 jobp
= JOB_LINKS_PTR(qp
);
514 timeout
= jobp
->absolute
- PR_IntervalNow();
516 timeout
= PR_INTERVAL_NO_WAIT
; /* already timed out */
519 if (PR_INTERVAL_NO_WAIT
!= timeout
) {
520 PR_WaitCondVar(tp
->timerq
.cv
, timeout
);
523 PR_Unlock(tp
->timerq
.lock
);
527 * move expired-timer jobs to jobq
529 now
= PR_IntervalNow();
530 while (!PR_CLIST_IS_EMPTY(&tp
->timerq
.list
)) {
531 qp
= tp
->timerq
.list
.next
;
532 jobp
= JOB_LINKS_PTR(qp
);
534 if ((PRInt32
)(jobp
->absolute
- now
) > 0) {
540 PR_REMOVE_AND_INIT_LINK(&jobp
->links
);
542 jobp
->on_timerq
= PR_FALSE
;
543 add_to_jobq(tp
, jobp
);
545 PR_Unlock(tp
->timerq
.lock
);
549 static void delete_threadpool(PRThreadPool
* tp
) {
551 if (NULL
!= tp
->shutdown_cv
) {
552 PR_DestroyCondVar(tp
->shutdown_cv
);
554 if (NULL
!= tp
->jobq
.cv
) {
555 PR_DestroyCondVar(tp
->jobq
.cv
);
557 if (NULL
!= tp
->jobq
.lock
) {
558 PR_DestroyLock(tp
->jobq
.lock
);
560 if (NULL
!= tp
->join_lock
) {
561 PR_DestroyLock(tp
->join_lock
);
564 if (NULL
!= tp
->jobq
.nt_completion_port
) {
565 CloseHandle(tp
->jobq
.nt_completion_port
);
569 if (NULL
!= tp
->timerq
.cv
) {
570 PR_DestroyCondVar(tp
->timerq
.cv
);
572 if (NULL
!= tp
->timerq
.lock
) {
573 PR_DestroyLock(tp
->timerq
.lock
);
576 if (NULL
!= tp
->ioq
.lock
) {
577 PR_DestroyLock(tp
->ioq
.lock
);
579 if (NULL
!= tp
->ioq
.pollfds
) {
580 PR_Free(tp
->ioq
.pollfds
);
582 if (NULL
!= tp
->ioq
.notify_fd
) {
583 PR_DestroyPollableEvent(tp
->ioq
.notify_fd
);
590 static PRThreadPool
* alloc_threadpool(void) {
593 tp
= (PRThreadPool
*)PR_CALLOC(sizeof(*tp
));
597 tp
->jobq
.lock
= PR_NewLock();
598 if (NULL
== tp
->jobq
.lock
) {
601 tp
->jobq
.cv
= PR_NewCondVar(tp
->jobq
.lock
);
602 if (NULL
== tp
->jobq
.cv
) {
605 tp
->join_lock
= PR_NewLock();
606 if (NULL
== tp
->join_lock
) {
610 tp
->jobq
.nt_completion_port
=
611 CreateIoCompletionPort(INVALID_HANDLE_VALUE
, NULL
, 0, 0);
612 if (NULL
== tp
->jobq
.nt_completion_port
) {
617 tp
->ioq
.lock
= PR_NewLock();
618 if (NULL
== tp
->ioq
.lock
) {
624 tp
->timerq
.lock
= PR_NewLock();
625 if (NULL
== tp
->timerq
.lock
) {
628 tp
->timerq
.cv
= PR_NewCondVar(tp
->timerq
.lock
);
629 if (NULL
== tp
->timerq
.cv
) {
633 tp
->shutdown_cv
= PR_NewCondVar(tp
->jobq
.lock
);
634 if (NULL
== tp
->shutdown_cv
) {
637 tp
->ioq
.notify_fd
= PR_NewPollableEvent();
638 if (NULL
== tp
->ioq
.notify_fd
) {
643 delete_threadpool(tp
);
644 PR_SetError(PR_OUT_OF_MEMORY_ERROR
, 0);
648 /* Create thread pool */
649 PR_IMPLEMENT(PRThreadPool
*)
650 PR_CreateThreadPool(PRInt32 initial_threads
, PRInt32 max_threads
,
651 PRUint32 stacksize
) {
657 tp
= alloc_threadpool();
662 tp
->init_threads
= initial_threads
;
663 tp
->max_threads
= max_threads
;
664 tp
->stacksize
= stacksize
;
665 PR_INIT_CLIST(&tp
->jobq
.list
);
666 PR_INIT_CLIST(&tp
->ioq
.list
);
667 PR_INIT_CLIST(&tp
->timerq
.list
);
668 PR_INIT_CLIST(&tp
->jobq
.wthreads
);
669 PR_INIT_CLIST(&tp
->ioq
.wthreads
);
670 PR_INIT_CLIST(&tp
->timerq
.wthreads
);
671 tp
->shutdown
= PR_FALSE
;
673 PR_Lock(tp
->jobq
.lock
);
674 for (i
= 0; i
< initial_threads
; ++i
) {
675 thr
= PR_CreateThread(PR_USER_THREAD
, wstart
, tp
, PR_PRIORITY_NORMAL
,
676 PR_GLOBAL_THREAD
, PR_JOINABLE_THREAD
, stacksize
);
678 wthrp
= PR_NEWZAP(wthread
);
681 PR_APPEND_LINK(&wthrp
->links
, &tp
->jobq
.wthreads
);
683 tp
->current_threads
= initial_threads
;
685 thr
= PR_CreateThread(PR_USER_THREAD
, io_wstart
, tp
, PR_PRIORITY_NORMAL
,
686 PR_GLOBAL_THREAD
, PR_JOINABLE_THREAD
, stacksize
);
688 wthrp
= PR_NEWZAP(wthread
);
691 PR_APPEND_LINK(&wthrp
->links
, &tp
->ioq
.wthreads
);
693 thr
= PR_CreateThread(PR_USER_THREAD
, timer_wstart
, tp
, PR_PRIORITY_NORMAL
,
694 PR_GLOBAL_THREAD
, PR_JOINABLE_THREAD
, stacksize
);
696 wthrp
= PR_NEWZAP(wthread
);
699 PR_APPEND_LINK(&wthrp
->links
, &tp
->timerq
.wthreads
);
701 PR_Unlock(tp
->jobq
.lock
);
705 static void delete_job(PRJob
* jobp
) {
707 if (NULL
!= jobp
->join_cv
) {
708 PR_DestroyCondVar(jobp
->join_cv
);
709 jobp
->join_cv
= NULL
;
711 if (NULL
!= jobp
->cancel_cv
) {
712 PR_DestroyCondVar(jobp
->cancel_cv
);
713 jobp
->cancel_cv
= NULL
;
719 static PRJob
* alloc_job(PRBool joinable
, PRThreadPool
* tp
) {
722 jobp
= PR_NEWZAP(PRJob
);
727 jobp
->join_cv
= PR_NewCondVar(tp
->join_lock
);
728 jobp
->join_wait
= PR_TRUE
;
729 if (NULL
== jobp
->join_cv
) {
733 jobp
->join_cv
= NULL
;
736 jobp
->nt_notifier
.jobp
= jobp
;
741 PR_SetError(PR_OUT_OF_MEMORY_ERROR
, 0);
747 PR_QueueJob(PRThreadPool
* tpool
, PRJobFn fn
, void* arg
, PRBool joinable
) {
750 jobp
= alloc_job(joinable
, tpool
);
759 add_to_jobq(tpool
, jobp
);
763 /* queue a job, when a socket is readable or writeable */
764 static PRJob
* queue_io_job(PRThreadPool
* tpool
, PRJobIoDesc
* iod
, PRJobFn fn
,
765 void* arg
, PRBool joinable
, io_op_type op
) {
769 jobp
= alloc_job(joinable
, tpool
);
775 * Add a new job to io_jobq
776 * wakeup io worker thread
783 if (JOB_IO_READ
== op
) {
784 jobp
->io_op
= JOB_IO_READ
;
785 jobp
->io_poll_flags
= PR_POLL_READ
;
786 } else if (JOB_IO_WRITE
== op
) {
787 jobp
->io_op
= JOB_IO_WRITE
;
788 jobp
->io_poll_flags
= PR_POLL_WRITE
;
789 } else if (JOB_IO_ACCEPT
== op
) {
790 jobp
->io_op
= JOB_IO_ACCEPT
;
791 jobp
->io_poll_flags
= PR_POLL_READ
;
792 } else if (JOB_IO_CONNECT
== op
) {
793 jobp
->io_op
= JOB_IO_CONNECT
;
794 jobp
->io_poll_flags
= PR_POLL_WRITE
| PR_POLL_EXCEPT
;
797 PR_SetError(PR_INVALID_ARGUMENT_ERROR
, 0);
801 jobp
->timeout
= iod
->timeout
;
802 if ((PR_INTERVAL_NO_TIMEOUT
== iod
->timeout
) ||
803 (PR_INTERVAL_NO_WAIT
== iod
->timeout
)) {
804 jobp
->absolute
= iod
->timeout
;
806 now
= PR_IntervalNow();
807 jobp
->absolute
= now
+ iod
->timeout
;
810 PR_Lock(tpool
->ioq
.lock
);
812 if (PR_CLIST_IS_EMPTY(&tpool
->ioq
.list
) ||
813 (PR_INTERVAL_NO_TIMEOUT
== iod
->timeout
)) {
814 PR_APPEND_LINK(&jobp
->links
, &tpool
->ioq
.list
);
815 } else if (PR_INTERVAL_NO_WAIT
== iod
->timeout
) {
816 PR_INSERT_LINK(&jobp
->links
, &tpool
->ioq
.list
);
821 * insert into the timeout-sorted ioq
823 for (qp
= tpool
->ioq
.list
.prev
; qp
!= &tpool
->ioq
.list
; qp
= qp
->prev
) {
824 tmp_jobp
= JOB_LINKS_PTR(qp
);
825 if ((PRInt32
)(jobp
->absolute
- tmp_jobp
->absolute
) >= 0) {
829 PR_INSERT_AFTER(&jobp
->links
, qp
);
832 jobp
->on_ioq
= PR_TRUE
;
835 * notify io worker thread(s)
837 PR_Unlock(tpool
->ioq
.lock
);
842 /* queue a job, when a socket is readable */
844 PR_QueueJob_Read(PRThreadPool
* tpool
, PRJobIoDesc
* iod
, PRJobFn fn
, void* arg
,
846 return (queue_io_job(tpool
, iod
, fn
, arg
, joinable
, JOB_IO_READ
));
849 /* queue a job, when a socket is writeable */
851 PR_QueueJob_Write(PRThreadPool
* tpool
, PRJobIoDesc
* iod
, PRJobFn fn
, void* arg
,
853 return (queue_io_job(tpool
, iod
, fn
, arg
, joinable
, JOB_IO_WRITE
));
856 /* queue a job, when a socket has a pending connection */
858 PR_QueueJob_Accept(PRThreadPool
* tpool
, PRJobIoDesc
* iod
, PRJobFn fn
, void* arg
,
860 return (queue_io_job(tpool
, iod
, fn
, arg
, joinable
, JOB_IO_ACCEPT
));
863 /* queue a job, when a socket can be connected */
865 PR_QueueJob_Connect(PRThreadPool
* tpool
, PRJobIoDesc
* iod
,
866 const PRNetAddr
* addr
, PRJobFn fn
, void* arg
,
871 rv
= PR_Connect(iod
->socket
, addr
, PR_INTERVAL_NO_WAIT
);
872 if ((rv
== PR_FAILURE
) && ((err
= PR_GetError()) == PR_IN_PROGRESS_ERROR
)) {
873 /* connection pending */
874 return (queue_io_job(tpool
, iod
, fn
, arg
, joinable
, JOB_IO_CONNECT
));
877 * connection succeeded or failed; add to jobq right away
879 if (rv
== PR_FAILURE
) {
884 return (PR_QueueJob(tpool
, fn
, arg
, joinable
));
887 /* queue a job, when a timer expires */
889 PR_QueueJob_Timer(PRThreadPool
* tpool
, PRIntervalTime timeout
, PRJobFn fn
,
890 void* arg
, PRBool joinable
) {
894 if (PR_INTERVAL_NO_TIMEOUT
== timeout
) {
895 PR_SetError(PR_INVALID_ARGUMENT_ERROR
, 0);
898 if (PR_INTERVAL_NO_WAIT
== timeout
) {
900 * no waiting; add to jobq right away
902 return (PR_QueueJob(tpool
, fn
, arg
, joinable
));
904 jobp
= alloc_job(joinable
, tpool
);
910 * Add a new job to timer_jobq
911 * wakeup timer worker thread
917 jobp
->timeout
= timeout
;
919 now
= PR_IntervalNow();
920 jobp
->absolute
= now
+ timeout
;
922 PR_Lock(tpool
->timerq
.lock
);
923 jobp
->on_timerq
= PR_TRUE
;
924 if (PR_CLIST_IS_EMPTY(&tpool
->timerq
.list
)) {
925 PR_APPEND_LINK(&jobp
->links
, &tpool
->timerq
.list
);
930 * insert into the sorted timer jobq
932 for (qp
= tpool
->timerq
.list
.prev
; qp
!= &tpool
->timerq
.list
;
934 tmp_jobp
= JOB_LINKS_PTR(qp
);
935 if ((PRInt32
)(jobp
->absolute
- tmp_jobp
->absolute
) >= 0) {
939 PR_INSERT_AFTER(&jobp
->links
, qp
);
943 * notify timer worker thread(s)
945 notify_timerq(tpool
);
946 PR_Unlock(tpool
->timerq
.lock
);
950 static void notify_timerq(PRThreadPool
* tp
) {
952 * wakeup the timer thread(s)
954 PR_NotifyCondVar(tp
->timerq
.cv
);
957 static void notify_ioq(PRThreadPool
* tp
) {
958 PRStatus rval_status
;
961 * wakeup the io thread(s)
963 rval_status
= PR_SetPollableEvent(tp
->ioq
.notify_fd
);
964 PR_ASSERT(PR_SUCCESS
== rval_status
);
970 * XXXX: is this needed? likely to be removed
972 PR_IMPLEMENT(PRStatus
)
973 PR_CancelJob(PRJob
* jobp
) {
974 PRStatus rval
= PR_FAILURE
;
977 if (jobp
->on_timerq
) {
979 * now, check again while holding the timerq lock
982 PR_Lock(tp
->timerq
.lock
);
983 if (jobp
->on_timerq
) {
984 jobp
->on_timerq
= PR_FALSE
;
985 PR_REMOVE_AND_INIT_LINK(&jobp
->links
);
987 PR_Unlock(tp
->timerq
.lock
);
988 if (!JOINABLE_JOB(jobp
)) {
995 PR_Unlock(tp
->timerq
.lock
);
997 } else if (jobp
->on_ioq
) {
999 * now, check again while holding the ioq lock
1002 PR_Lock(tp
->ioq
.lock
);
1004 jobp
->cancel_cv
= PR_NewCondVar(tp
->ioq
.lock
);
1005 if (NULL
== jobp
->cancel_cv
) {
1006 PR_Unlock(tp
->ioq
.lock
);
1007 PR_SetError(PR_INSUFFICIENT_RESOURCES_ERROR
, 0);
1011 * mark job 'cancelled' and notify io thread(s)
1013 * this assumes there is only one io thread; when there
1014 * are multiple threads, the io thread processing this job
1017 jobp
->cancel_io
= PR_TRUE
;
1018 PR_Unlock(tp
->ioq
.lock
); /* release, reacquire ioq lock */
1020 PR_Lock(tp
->ioq
.lock
);
1021 while (jobp
->cancel_io
) {
1022 PR_WaitCondVar(jobp
->cancel_cv
, PR_INTERVAL_NO_TIMEOUT
);
1024 PR_Unlock(tp
->ioq
.lock
);
1025 PR_ASSERT(!jobp
->on_ioq
);
1026 if (!JOINABLE_JOB(jobp
)) {
1033 PR_Unlock(tp
->ioq
.lock
);
1036 if (PR_FAILURE
== rval
) {
1037 PR_SetError(PR_INVALID_STATE_ERROR
, 0);
1042 /* join a job, wait until completion */
1043 PR_IMPLEMENT(PRStatus
)
1044 PR_JoinJob(PRJob
* jobp
) {
1045 if (!JOINABLE_JOB(jobp
)) {
1046 PR_SetError(PR_INVALID_ARGUMENT_ERROR
, 0);
1049 PR_Lock(jobp
->tpool
->join_lock
);
1050 while (jobp
->join_wait
) {
1051 PR_WaitCondVar(jobp
->join_cv
, PR_INTERVAL_NO_TIMEOUT
);
1053 PR_Unlock(jobp
->tpool
->join_lock
);
1058 /* shutdown threadpool */
1059 PR_IMPLEMENT(PRStatus
)
1060 PR_ShutdownThreadPool(PRThreadPool
* tpool
) {
1061 PRStatus rval
= PR_SUCCESS
;
1063 PR_Lock(tpool
->jobq
.lock
);
1064 tpool
->shutdown
= PR_TRUE
;
1065 PR_NotifyAllCondVar(tpool
->shutdown_cv
);
1066 PR_Unlock(tpool
->jobq
.lock
);
1073 * wait for termination of worker threads
1074 * reclaim threadpool resources
1076 PR_IMPLEMENT(PRStatus
)
1077 PR_JoinThreadPool(PRThreadPool
* tpool
) {
1078 PRStatus rval
= PR_SUCCESS
;
1080 PRStatus rval_status
;
1082 PR_Lock(tpool
->jobq
.lock
);
1083 while (!tpool
->shutdown
) {
1084 PR_WaitCondVar(tpool
->shutdown_cv
, PR_INTERVAL_NO_TIMEOUT
);
1088 * wakeup worker threads
1092 * post shutdown notification for all threads
1096 for (i
= 0; i
< tpool
->current_threads
; i
++) {
1097 PostQueuedCompletionStatus(tpool
->jobq
.nt_completion_port
, 0, TRUE
, NULL
);
1101 PR_NotifyAllCondVar(tpool
->jobq
.cv
);
1105 * wakeup io thread(s)
1110 * wakeup timer thread(s)
1112 PR_Lock(tpool
->timerq
.lock
);
1113 notify_timerq(tpool
);
1114 PR_Unlock(tpool
->timerq
.lock
);
1116 while (!PR_CLIST_IS_EMPTY(&tpool
->jobq
.wthreads
)) {
1119 head
= PR_LIST_HEAD(&tpool
->jobq
.wthreads
);
1120 PR_REMOVE_AND_INIT_LINK(head
);
1121 PR_Unlock(tpool
->jobq
.lock
);
1122 wthrp
= WTHREAD_LINKS_PTR(head
);
1123 rval_status
= PR_JoinThread(wthrp
->thread
);
1124 PR_ASSERT(PR_SUCCESS
== rval_status
);
1126 PR_Lock(tpool
->jobq
.lock
);
1128 PR_Unlock(tpool
->jobq
.lock
);
1129 while (!PR_CLIST_IS_EMPTY(&tpool
->ioq
.wthreads
)) {
1132 head
= PR_LIST_HEAD(&tpool
->ioq
.wthreads
);
1133 PR_REMOVE_AND_INIT_LINK(head
);
1134 wthrp
= WTHREAD_LINKS_PTR(head
);
1135 rval_status
= PR_JoinThread(wthrp
->thread
);
1136 PR_ASSERT(PR_SUCCESS
== rval_status
);
1140 while (!PR_CLIST_IS_EMPTY(&tpool
->timerq
.wthreads
)) {
1143 head
= PR_LIST_HEAD(&tpool
->timerq
.wthreads
);
1144 PR_REMOVE_AND_INIT_LINK(head
);
1145 wthrp
= WTHREAD_LINKS_PTR(head
);
1146 rval_status
= PR_JoinThread(wthrp
->thread
);
1147 PR_ASSERT(PR_SUCCESS
== rval_status
);
1152 * Delete queued jobs
1154 while (!PR_CLIST_IS_EMPTY(&tpool
->jobq
.list
)) {
1157 head
= PR_LIST_HEAD(&tpool
->jobq
.list
);
1158 PR_REMOVE_AND_INIT_LINK(head
);
1159 jobp
= JOB_LINKS_PTR(head
);
1164 /* delete io jobs */
1165 while (!PR_CLIST_IS_EMPTY(&tpool
->ioq
.list
)) {
1168 head
= PR_LIST_HEAD(&tpool
->ioq
.list
);
1169 PR_REMOVE_AND_INIT_LINK(head
);
1171 jobp
= JOB_LINKS_PTR(head
);
1175 /* delete timer jobs */
1176 while (!PR_CLIST_IS_EMPTY(&tpool
->timerq
.list
)) {
1179 head
= PR_LIST_HEAD(&tpool
->timerq
.list
);
1180 PR_REMOVE_AND_INIT_LINK(head
);
1181 tpool
->timerq
.cnt
--;
1182 jobp
= JOB_LINKS_PTR(head
);
1186 PR_ASSERT(0 == tpool
->jobq
.cnt
);
1187 PR_ASSERT(0 == tpool
->ioq
.cnt
);
1188 PR_ASSERT(0 == tpool
->timerq
.cnt
);
1190 delete_threadpool(tpool
);