4 * Copyright (c) 2006 Robert Shearman
5 * Copyright (c) 2014-2016 Sebastian Lackner
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
26 #define NONAMELESSUNION
28 #define WIN32_NO_STATUS
31 #include "wine/debug.h"
32 #include "wine/list.h"
34 #include "ntdll_misc.h"
36 WINE_DEFAULT_DEBUG_CHANNEL(threadpool
);
39 * Old thread pooling API
44 PRTL_WORK_ITEM_ROUTINE function
;
48 #define EXPIRE_NEVER (~(ULONGLONG)0)
49 #define TIMER_QUEUE_MAGIC 0x516d6954 /* TimQ */
51 static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug
;
56 RTL_CRITICAL_SECTION threadpool_compl_cs
;
60 NULL
, /* compl_port */
61 { &critsect_compl_debug
, -1, 0, 0, 0, 0 }, /* threadpool_compl_cs */
64 static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug
=
66 0, 0, &old_threadpool
.threadpool_compl_cs
,
67 { &critsect_compl_debug
.ProcessLocksList
, &critsect_compl_debug
.ProcessLocksList
},
68 0, 0, { (DWORD_PTR
)(__FILE__
": threadpool_compl_cs") }
74 struct timer_queue
*q
;
76 ULONG runcount
; /* number of callbacks pending execution */
77 RTL_WAITORTIMERCALLBACKFUNC callback
;
82 BOOL destroy
; /* timer should be deleted; once set, never unset */
83 HANDLE event
; /* removal event */
89 RTL_CRITICAL_SECTION cs
;
90 struct list timers
; /* sorted by expiration time */
91 BOOL quit
; /* queue should be deleted; once set, never unset */
97 * Object-oriented thread pooling API
100 #define THREADPOOL_WORKER_TIMEOUT 5000
101 #define MAXIMUM_WAITQUEUE_OBJECTS (MAXIMUM_WAIT_OBJECTS - 1)
103 /* internal threadpool representation */
110 /* Pools of work items, locked via .cs, order matches TP_CALLBACK_PRIORITY - high, normal, low. */
111 struct list pools
[3];
112 RTL_CONDITION_VARIABLE update_event
;
113 /* information about worker threads, locked via .cs */
117 int num_busy_workers
;
119 TP_POOL_STACK_INFORMATION stack_info
;
122 enum threadpool_objtype
124 TP_OBJECT_TYPE_SIMPLE
,
126 TP_OBJECT_TYPE_TIMER
,
133 IO_STATUS_BLOCK iosb
;
137 /* internal threadpool object representation */
138 struct threadpool_object
140 void *win32_callback
; /* leave space for kernelbase to store win32 callback */
143 /* read-only information */
144 enum threadpool_objtype type
;
145 struct threadpool
*pool
;
146 struct threadpool_group
*group
;
148 PTP_CLEANUP_GROUP_CANCEL_CALLBACK group_cancel_callback
;
149 PTP_SIMPLE_CALLBACK finalization_callback
;
152 TP_CALLBACK_PRIORITY priority
;
153 /* information about the group, locked via .group->cs */
154 struct list group_entry
;
155 BOOL is_group_member
;
156 /* information about the pool, locked via .pool->cs */
157 struct list pool_entry
;
158 RTL_CONDITION_VARIABLE finished_event
;
159 RTL_CONDITION_VARIABLE group_finished_event
;
160 HANDLE completed_event
;
161 LONG num_pending_callbacks
;
162 LONG num_running_callbacks
;
163 LONG num_associated_callbacks
;
164 /* arguments for callback */
169 PTP_SIMPLE_CALLBACK callback
;
173 PTP_WORK_CALLBACK callback
;
177 PTP_TIMER_CALLBACK callback
;
178 /* information about the timer, locked via timerqueue.cs */
179 BOOL timer_initialized
;
181 struct list timer_entry
;
189 PTP_WAIT_CALLBACK callback
;
191 /* information about the wait object, locked via waitqueue.cs */
192 struct waitqueue_bucket
*bucket
;
194 struct list wait_entry
;
198 RTL_WAITORTIMERCALLBACKFUNC rtl_callback
;
202 PTP_IO_CALLBACK callback
;
203 /* locked via .pool->cs */
204 unsigned int pending_count
, completion_count
, completion_max
;
205 struct io_completion
*completions
;
210 /* internal threadpool instance representation */
211 struct threadpool_instance
213 struct threadpool_object
*object
;
219 CRITICAL_SECTION
*critical_section
;
222 LONG semaphore_count
;
228 /* internal threadpool group representation */
229 struct threadpool_group
234 /* list of group members, locked via .cs */
238 /* global timerqueue object */
239 static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug
;
246 struct list pending_timers
;
247 RTL_CONDITION_VARIABLE update_event
;
251 { &timerqueue_debug
, -1, 0, 0, 0, 0 }, /* cs */
253 FALSE
, /* thread_running */
254 LIST_INIT( timerqueue
.pending_timers
), /* pending_timers */
255 RTL_CONDITION_VARIABLE_INIT
/* update_event */
258 static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug
=
260 0, 0, &timerqueue
.cs
,
261 { &timerqueue_debug
.ProcessLocksList
, &timerqueue_debug
.ProcessLocksList
},
262 0, 0, { (DWORD_PTR
)(__FILE__
": timerqueue.cs") }
265 /* global waitqueue object */
266 static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug
;
276 { &waitqueue_debug
, -1, 0, 0, 0, 0 }, /* cs */
278 LIST_INIT( waitqueue
.buckets
) /* buckets */
281 static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug
=
284 { &waitqueue_debug
.ProcessLocksList
, &waitqueue_debug
.ProcessLocksList
},
285 0, 0, { (DWORD_PTR
)(__FILE__
": waitqueue.cs") }
288 struct waitqueue_bucket
290 struct list bucket_entry
;
292 struct list reserved
;
298 /* global I/O completion queue object */
299 static RTL_CRITICAL_SECTION_DEBUG ioqueue_debug
;
307 RTL_CONDITION_VARIABLE update_event
;
311 .cs
= { &ioqueue_debug
, -1, 0, 0, 0, 0 },
314 static RTL_CRITICAL_SECTION_DEBUG ioqueue_debug
=
317 { &ioqueue_debug
.ProcessLocksList
, &ioqueue_debug
.ProcessLocksList
},
318 0, 0, { (DWORD_PTR
)(__FILE__
": ioqueue.cs") }
321 static inline struct threadpool
*impl_from_TP_POOL( TP_POOL
*pool
)
323 return (struct threadpool
*)pool
;
326 static inline struct threadpool_object
*impl_from_TP_WORK( TP_WORK
*work
)
328 struct threadpool_object
*object
= (struct threadpool_object
*)work
;
329 assert( object
->type
== TP_OBJECT_TYPE_WORK
);
333 static inline struct threadpool_object
*impl_from_TP_TIMER( TP_TIMER
*timer
)
335 struct threadpool_object
*object
= (struct threadpool_object
*)timer
;
336 assert( object
->type
== TP_OBJECT_TYPE_TIMER
);
340 static inline struct threadpool_object
*impl_from_TP_WAIT( TP_WAIT
*wait
)
342 struct threadpool_object
*object
= (struct threadpool_object
*)wait
;
343 assert( object
->type
== TP_OBJECT_TYPE_WAIT
);
347 static inline struct threadpool_object
*impl_from_TP_IO( TP_IO
*io
)
349 struct threadpool_object
*object
= (struct threadpool_object
*)io
;
350 assert( object
->type
== TP_OBJECT_TYPE_IO
);
354 static inline struct threadpool_group
*impl_from_TP_CLEANUP_GROUP( TP_CLEANUP_GROUP
*group
)
356 return (struct threadpool_group
*)group
;
359 static inline struct threadpool_instance
*impl_from_TP_CALLBACK_INSTANCE( TP_CALLBACK_INSTANCE
*instance
)
361 return (struct threadpool_instance
*)instance
;
364 static void CALLBACK
threadpool_worker_proc( void *param
);
365 static void tp_object_submit( struct threadpool_object
*object
, BOOL signaled
);
366 static void tp_object_execute( struct threadpool_object
*object
, BOOL wait_thread
);
367 static void tp_object_prepare_shutdown( struct threadpool_object
*object
);
368 static BOOL
tp_object_release( struct threadpool_object
*object
);
369 static struct threadpool
*default_threadpool
= NULL
;
371 static BOOL
array_reserve(void **elements
, unsigned int *capacity
, unsigned int count
, unsigned int size
)
373 unsigned int new_capacity
, max_capacity
;
376 if (count
<= *capacity
)
379 max_capacity
= ~(SIZE_T
)0 / size
;
380 if (count
> max_capacity
)
383 new_capacity
= max(4, *capacity
);
384 while (new_capacity
< count
&& new_capacity
<= max_capacity
/ 2)
386 if (new_capacity
< count
)
387 new_capacity
= max_capacity
;
389 if (!(new_elements
= RtlReAllocateHeap( GetProcessHeap(), 0, *elements
, new_capacity
* size
)))
392 *elements
= new_elements
;
393 *capacity
= new_capacity
;
398 static void CALLBACK
process_rtl_work_item( TP_CALLBACK_INSTANCE
*instance
, void *userdata
)
400 struct rtl_work_item
*item
= userdata
;
402 TRACE("executing %p(%p)\n", item
->function
, item
->context
);
403 item
->function( item
->context
);
405 RtlFreeHeap( GetProcessHeap(), 0, item
);
408 /***********************************************************************
409 * RtlQueueWorkItem (NTDLL.@)
411 * Queues a work item into a thread in the thread pool.
414 * function [I] Work function to execute.
415 * context [I] Context to pass to the work function when it is executed.
416 * flags [I] Flags. See notes.
419 * Success: STATUS_SUCCESS.
420 * Failure: Any NTSTATUS code.
423 * Flags can be one or more of the following:
424 *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
425 *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
426 *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
427 *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
428 *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
430 NTSTATUS WINAPI
RtlQueueWorkItem( PRTL_WORK_ITEM_ROUTINE function
, PVOID context
, ULONG flags
)
432 TP_CALLBACK_ENVIRON environment
;
433 struct rtl_work_item
*item
;
436 TRACE( "%p %p %u\n", function
, context
, flags
);
438 item
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*item
) );
440 return STATUS_NO_MEMORY
;
442 memset( &environment
, 0, sizeof(environment
) );
443 environment
.Version
= 1;
444 environment
.u
.s
.LongFunction
= (flags
& WT_EXECUTELONGFUNCTION
) != 0;
445 environment
.u
.s
.Persistent
= (flags
& WT_EXECUTEINPERSISTENTTHREAD
) != 0;
447 item
->function
= function
;
448 item
->context
= context
;
450 status
= TpSimpleTryPost( process_rtl_work_item
, item
, &environment
);
451 if (status
) RtlFreeHeap( GetProcessHeap(), 0, item
);
455 /***********************************************************************
456 * iocp_poller - get completion events and run callbacks
458 static DWORD CALLBACK
iocp_poller(LPVOID Arg
)
464 PRTL_OVERLAPPED_COMPLETION_ROUTINE callback
;
466 IO_STATUS_BLOCK iosb
;
467 NTSTATUS res
= NtRemoveIoCompletion( cport
, (PULONG_PTR
)&callback
, (PULONG_PTR
)&overlapped
, &iosb
, NULL
);
470 ERR("NtRemoveIoCompletion failed: 0x%x\n", res
);
474 DWORD transferred
= 0;
477 if (iosb
.u
.Status
== STATUS_SUCCESS
)
478 transferred
= iosb
.Information
;
480 err
= RtlNtStatusToDosError(iosb
.u
.Status
);
482 callback( err
, transferred
, overlapped
);
488 /***********************************************************************
489 * RtlSetIoCompletionCallback (NTDLL.@)
491 * Binds a handle to a thread pool's completion port, and possibly
492 * starts a non-I/O thread to monitor this port and call functions back.
495 * FileHandle [I] Handle to bind to a completion port.
496 * Function [I] Callback function to call on I/O completions.
497 * Flags [I] Not used.
500 * Success: STATUS_SUCCESS.
501 * Failure: Any NTSTATUS code.
504 NTSTATUS WINAPI
RtlSetIoCompletionCallback(HANDLE FileHandle
, PRTL_OVERLAPPED_COMPLETION_ROUTINE Function
, ULONG Flags
)
506 IO_STATUS_BLOCK iosb
;
507 FILE_COMPLETION_INFORMATION info
;
509 if (Flags
) FIXME("Unknown value Flags=0x%x\n", Flags
);
511 if (!old_threadpool
.compl_port
)
513 NTSTATUS res
= STATUS_SUCCESS
;
515 RtlEnterCriticalSection(&old_threadpool
.threadpool_compl_cs
);
516 if (!old_threadpool
.compl_port
)
520 res
= NtCreateIoCompletion( &cport
, IO_COMPLETION_ALL_ACCESS
, NULL
, 0 );
523 /* FIXME native can start additional threads in case of e.g. hung callback function. */
524 res
= RtlQueueWorkItem( iocp_poller
, cport
, WT_EXECUTEDEFAULT
);
526 old_threadpool
.compl_port
= cport
;
531 RtlLeaveCriticalSection(&old_threadpool
.threadpool_compl_cs
);
535 info
.CompletionPort
= old_threadpool
.compl_port
;
536 info
.CompletionKey
= (ULONG_PTR
)Function
;
538 return NtSetInformationFile( FileHandle
, &iosb
, &info
, sizeof(info
), FileCompletionInformation
);
541 static inline PLARGE_INTEGER
get_nt_timeout( PLARGE_INTEGER pTime
, ULONG timeout
)
543 if (timeout
== INFINITE
) return NULL
;
544 pTime
->QuadPart
= (ULONGLONG
)timeout
* -10000;
549 /************************** Timer Queue Impl **************************/
551 static void queue_remove_timer(struct queue_timer
*t
)
553 /* We MUST hold the queue cs while calling this function. This ensures
554 that we cannot queue another callback for this timer. The runcount
555 being zero makes sure we don't have any already queued. */
556 struct timer_queue
*q
= t
->q
;
558 assert(t
->runcount
== 0);
561 list_remove(&t
->entry
);
563 NtSetEvent(t
->event
, NULL
);
564 RtlFreeHeap(GetProcessHeap(), 0, t
);
566 if (q
->quit
&& list_empty(&q
->timers
))
567 NtSetEvent(q
->event
, NULL
);
570 static void timer_cleanup_callback(struct queue_timer
*t
)
572 struct timer_queue
*q
= t
->q
;
573 RtlEnterCriticalSection(&q
->cs
);
575 assert(0 < t
->runcount
);
578 if (t
->destroy
&& t
->runcount
== 0)
579 queue_remove_timer(t
);
581 RtlLeaveCriticalSection(&q
->cs
);
584 static DWORD WINAPI
timer_callback_wrapper(LPVOID p
)
586 struct queue_timer
*t
= p
;
587 t
->callback(t
->param
, TRUE
);
588 timer_cleanup_callback(t
);
592 static inline ULONGLONG
queue_current_time(void)
594 LARGE_INTEGER now
, freq
;
595 NtQueryPerformanceCounter(&now
, &freq
);
596 return now
.QuadPart
* 1000 / freq
.QuadPart
;
599 static void queue_add_timer(struct queue_timer
*t
, ULONGLONG time
,
602 /* We MUST hold the queue cs while calling this function. */
603 struct timer_queue
*q
= t
->q
;
604 struct list
*ptr
= &q
->timers
;
606 assert(!q
->quit
|| (t
->destroy
&& time
== EXPIRE_NEVER
));
608 if (time
!= EXPIRE_NEVER
)
609 LIST_FOR_EACH(ptr
, &q
->timers
)
611 struct queue_timer
*cur
= LIST_ENTRY(ptr
, struct queue_timer
, entry
);
612 if (time
< cur
->expire
)
615 list_add_before(ptr
, &t
->entry
);
619 /* If we insert at the head of the list, we need to expire sooner
621 if (set_event
&& &t
->entry
== list_head(&q
->timers
))
622 NtSetEvent(q
->event
, NULL
);
625 static inline void queue_move_timer(struct queue_timer
*t
, ULONGLONG time
,
628 /* We MUST hold the queue cs while calling this function. */
629 list_remove(&t
->entry
);
630 queue_add_timer(t
, time
, set_event
);
633 static void queue_timer_expire(struct timer_queue
*q
)
635 struct queue_timer
*t
= NULL
;
637 RtlEnterCriticalSection(&q
->cs
);
638 if (list_head(&q
->timers
))
641 t
= LIST_ENTRY(list_head(&q
->timers
), struct queue_timer
, entry
);
642 if (!t
->destroy
&& t
->expire
<= ((now
= queue_current_time())))
647 next
= t
->expire
+ t
->period
;
648 /* avoid trigger cascade if overloaded / hibernated */
650 next
= now
+ t
->period
;
654 queue_move_timer(t
, next
, FALSE
);
659 RtlLeaveCriticalSection(&q
->cs
);
663 if (t
->flags
& WT_EXECUTEINTIMERTHREAD
)
664 timer_callback_wrapper(t
);
669 & (WT_EXECUTEINIOTHREAD
| WT_EXECUTEINPERSISTENTTHREAD
670 | WT_EXECUTELONGFUNCTION
| WT_TRANSFER_IMPERSONATION
));
671 NTSTATUS status
= RtlQueueWorkItem(timer_callback_wrapper
, t
, flags
);
672 if (status
!= STATUS_SUCCESS
)
673 timer_cleanup_callback(t
);
678 static ULONG
queue_get_timeout(struct timer_queue
*q
)
680 struct queue_timer
*t
;
681 ULONG timeout
= INFINITE
;
683 RtlEnterCriticalSection(&q
->cs
);
684 if (list_head(&q
->timers
))
686 t
= LIST_ENTRY(list_head(&q
->timers
), struct queue_timer
, entry
);
687 assert(!t
->destroy
|| t
->expire
== EXPIRE_NEVER
);
689 if (t
->expire
!= EXPIRE_NEVER
)
691 ULONGLONG time
= queue_current_time();
692 timeout
= t
->expire
< time
? 0 : t
->expire
- time
;
695 RtlLeaveCriticalSection(&q
->cs
);
700 static void WINAPI
timer_queue_thread_proc(LPVOID p
)
702 struct timer_queue
*q
= p
;
705 timeout_ms
= INFINITE
;
708 LARGE_INTEGER timeout
;
712 status
= NtWaitForSingleObject(
713 q
->event
, FALSE
, get_nt_timeout(&timeout
, timeout_ms
));
715 if (status
== STATUS_WAIT_0
)
717 /* There are two possible ways to trigger the event. Either
718 we are quitting and the last timer got removed, or a new
719 timer got put at the head of the list so we need to adjust
721 RtlEnterCriticalSection(&q
->cs
);
722 if (q
->quit
&& list_empty(&q
->timers
))
724 RtlLeaveCriticalSection(&q
->cs
);
726 else if (status
== STATUS_TIMEOUT
)
727 queue_timer_expire(q
);
732 timeout_ms
= queue_get_timeout(q
);
736 RtlDeleteCriticalSection(&q
->cs
);
738 RtlFreeHeap(GetProcessHeap(), 0, q
);
739 RtlExitUserThread( 0 );
742 static void queue_destroy_timer(struct queue_timer
*t
)
744 /* We MUST hold the queue cs while calling this function. */
746 if (t
->runcount
== 0)
747 /* Ensure a timer is promptly removed. If callbacks are pending,
748 it will be removed after the last one finishes by the callback
750 queue_remove_timer(t
);
752 /* Make sure no destroyed timer masks an active timer at the head
753 of the sorted list. */
754 queue_move_timer(t
, EXPIRE_NEVER
, FALSE
);
757 /***********************************************************************
758 * RtlCreateTimerQueue (NTDLL.@)
760 * Creates a timer queue object and returns a handle to it.
763 * NewTimerQueue [O] The newly created queue.
766 * Success: STATUS_SUCCESS.
767 * Failure: Any NTSTATUS code.
769 NTSTATUS WINAPI
RtlCreateTimerQueue(PHANDLE NewTimerQueue
)
772 struct timer_queue
*q
= RtlAllocateHeap(GetProcessHeap(), 0, sizeof *q
);
774 return STATUS_NO_MEMORY
;
776 RtlInitializeCriticalSection(&q
->cs
);
777 list_init(&q
->timers
);
779 q
->magic
= TIMER_QUEUE_MAGIC
;
780 status
= NtCreateEvent(&q
->event
, EVENT_ALL_ACCESS
, NULL
, SynchronizationEvent
, FALSE
);
781 if (status
!= STATUS_SUCCESS
)
783 RtlFreeHeap(GetProcessHeap(), 0, q
);
786 status
= RtlCreateUserThread(GetCurrentProcess(), NULL
, FALSE
, NULL
, 0, 0,
787 timer_queue_thread_proc
, q
, &q
->thread
, NULL
);
788 if (status
!= STATUS_SUCCESS
)
791 RtlFreeHeap(GetProcessHeap(), 0, q
);
796 return STATUS_SUCCESS
;
799 /***********************************************************************
800 * RtlDeleteTimerQueueEx (NTDLL.@)
802 * Deletes a timer queue object.
805 * TimerQueue [I] The timer queue to destroy.
806 * CompletionEvent [I] If NULL, return immediately. If INVALID_HANDLE_VALUE,
807 * wait until all timers are finished firing before
808 * returning. Otherwise, return immediately and set the
809 * event when all timers are done.
812 * Success: STATUS_SUCCESS if synchronous, STATUS_PENDING if not.
813 * Failure: Any NTSTATUS code.
815 NTSTATUS WINAPI
RtlDeleteTimerQueueEx(HANDLE TimerQueue
, HANDLE CompletionEvent
)
817 struct timer_queue
*q
= TimerQueue
;
818 struct queue_timer
*t
, *temp
;
822 if (!q
|| q
->magic
!= TIMER_QUEUE_MAGIC
)
823 return STATUS_INVALID_HANDLE
;
827 RtlEnterCriticalSection(&q
->cs
);
829 if (list_head(&q
->timers
))
830 /* When the last timer is removed, it will signal the timer thread to
832 LIST_FOR_EACH_ENTRY_SAFE(t
, temp
, &q
->timers
, struct queue_timer
, entry
)
833 queue_destroy_timer(t
);
835 /* However if we have none, we must do it ourselves. */
836 NtSetEvent(q
->event
, NULL
);
837 RtlLeaveCriticalSection(&q
->cs
);
839 if (CompletionEvent
== INVALID_HANDLE_VALUE
)
841 NtWaitForSingleObject(thread
, FALSE
, NULL
);
842 status
= STATUS_SUCCESS
;
848 FIXME("asynchronous return on completion event unimplemented\n");
849 NtWaitForSingleObject(thread
, FALSE
, NULL
);
850 NtSetEvent(CompletionEvent
, NULL
);
852 status
= STATUS_PENDING
;
859 static struct timer_queue
*get_timer_queue(HANDLE TimerQueue
)
861 static struct timer_queue
*default_timer_queue
;
867 if (!default_timer_queue
)
870 NTSTATUS status
= RtlCreateTimerQueue(&q
);
871 if (status
== STATUS_SUCCESS
)
873 PVOID p
= InterlockedCompareExchangePointer( (void **) &default_timer_queue
, q
, NULL
);
875 /* Got beat to the punch. */
876 RtlDeleteTimerQueueEx(q
, NULL
);
879 return default_timer_queue
;
883 /***********************************************************************
884 * RtlCreateTimer (NTDLL.@)
886 * Creates a new timer associated with the given queue.
889 * NewTimer [O] The newly created timer.
890 * TimerQueue [I] The queue to hold the timer.
891 * Callback [I] The callback to fire.
892 * Parameter [I] The argument for the callback.
893 * DueTime [I] The delay, in milliseconds, before first firing the
895 * Period [I] The period, in milliseconds, at which to fire the timer
896 * after the first callback. If zero, the timer will only
897 * fire once. It still needs to be deleted with
899 * Flags [I] Flags controlling the execution of the callback. In
900 * addition to the WT_* thread pool flags (see
901 * RtlQueueWorkItem), WT_EXECUTEINTIMERTHREAD and
902 * WT_EXECUTEONLYONCE are supported.
905 * Success: STATUS_SUCCESS.
906 * Failure: Any NTSTATUS code.
908 NTSTATUS WINAPI
RtlCreateTimer(PHANDLE NewTimer
, HANDLE TimerQueue
,
909 RTL_WAITORTIMERCALLBACKFUNC Callback
,
910 PVOID Parameter
, DWORD DueTime
, DWORD Period
,
914 struct queue_timer
*t
;
915 struct timer_queue
*q
= get_timer_queue(TimerQueue
);
917 if (!q
) return STATUS_NO_MEMORY
;
918 if (q
->magic
!= TIMER_QUEUE_MAGIC
) return STATUS_INVALID_HANDLE
;
920 t
= RtlAllocateHeap(GetProcessHeap(), 0, sizeof *t
);
922 return STATUS_NO_MEMORY
;
926 t
->callback
= Callback
;
927 t
->param
= Parameter
;
933 status
= STATUS_SUCCESS
;
934 RtlEnterCriticalSection(&q
->cs
);
936 status
= STATUS_INVALID_HANDLE
;
938 queue_add_timer(t
, queue_current_time() + DueTime
, TRUE
);
939 RtlLeaveCriticalSection(&q
->cs
);
941 if (status
== STATUS_SUCCESS
)
944 RtlFreeHeap(GetProcessHeap(), 0, t
);
949 /***********************************************************************
950 * RtlUpdateTimer (NTDLL.@)
952 * Changes the time at which a timer expires.
955 * TimerQueue [I] The queue that holds the timer.
956 * Timer [I] The timer to update.
957 * DueTime [I] The delay, in milliseconds, before next firing the timer.
958 * Period [I] The period, in milliseconds, at which to fire the timer
959 * after the first callback. If zero, the timer will not
960 * refire once. It still needs to be deleted with
964 * Success: STATUS_SUCCESS.
965 * Failure: Any NTSTATUS code.
967 NTSTATUS WINAPI
RtlUpdateTimer(HANDLE TimerQueue
, HANDLE Timer
,
968 DWORD DueTime
, DWORD Period
)
970 struct queue_timer
*t
= Timer
;
971 struct timer_queue
*q
= t
->q
;
973 RtlEnterCriticalSection(&q
->cs
);
974 /* Can't change a timer if it was once-only or destroyed. */
975 if (t
->expire
!= EXPIRE_NEVER
)
978 queue_move_timer(t
, queue_current_time() + DueTime
, TRUE
);
980 RtlLeaveCriticalSection(&q
->cs
);
982 return STATUS_SUCCESS
;
985 /***********************************************************************
986 * RtlDeleteTimer (NTDLL.@)
988 * Cancels a timer-queue timer.
991 * TimerQueue [I] The queue that holds the timer.
992 * Timer [I] The timer to update.
993 * CompletionEvent [I] If NULL, return immediately. If INVALID_HANDLE_VALUE,
994 * wait until the timer is finished firing all pending
995 * callbacks before returning. Otherwise, return
996 * immediately and set the timer is done.
999 * Success: STATUS_SUCCESS if the timer is done, STATUS_PENDING if not,
1000 or if the completion event is NULL.
1001 * Failure: Any NTSTATUS code.
1003 NTSTATUS WINAPI
RtlDeleteTimer(HANDLE TimerQueue
, HANDLE Timer
,
1004 HANDLE CompletionEvent
)
1006 struct queue_timer
*t
= Timer
;
1007 struct timer_queue
*q
;
1008 NTSTATUS status
= STATUS_PENDING
;
1009 HANDLE event
= NULL
;
1012 return STATUS_INVALID_PARAMETER_1
;
1014 if (CompletionEvent
== INVALID_HANDLE_VALUE
)
1016 status
= NtCreateEvent(&event
, EVENT_ALL_ACCESS
, NULL
, SynchronizationEvent
, FALSE
);
1017 if (status
== STATUS_SUCCESS
)
1018 status
= STATUS_PENDING
;
1020 else if (CompletionEvent
)
1021 event
= CompletionEvent
;
1023 RtlEnterCriticalSection(&q
->cs
);
1025 if (t
->runcount
== 0 && event
)
1026 status
= STATUS_SUCCESS
;
1027 queue_destroy_timer(t
);
1028 RtlLeaveCriticalSection(&q
->cs
);
1030 if (CompletionEvent
== INVALID_HANDLE_VALUE
&& event
)
1032 if (status
== STATUS_PENDING
)
1034 NtWaitForSingleObject(event
, FALSE
, NULL
);
1035 status
= STATUS_SUCCESS
;
1043 /***********************************************************************
1044 * timerqueue_thread_proc (internal)
1046 static void CALLBACK
timerqueue_thread_proc( void *param
)
1048 ULONGLONG timeout_lower
, timeout_upper
, new_timeout
;
1049 struct threadpool_object
*other_timer
;
1050 LARGE_INTEGER now
, timeout
;
1053 TRACE( "starting timer queue thread\n" );
1055 RtlEnterCriticalSection( &timerqueue
.cs
);
1058 NtQuerySystemTime( &now
);
1060 /* Check for expired timers. */
1061 while ((ptr
= list_head( &timerqueue
.pending_timers
)))
1063 struct threadpool_object
*timer
= LIST_ENTRY( ptr
, struct threadpool_object
, u
.timer
.timer_entry
);
1064 assert( timer
->type
== TP_OBJECT_TYPE_TIMER
);
1065 assert( timer
->u
.timer
.timer_pending
);
1066 if (timer
->u
.timer
.timeout
> now
.QuadPart
)
1069 /* Queue a new callback in one of the worker threads. */
1070 list_remove( &timer
->u
.timer
.timer_entry
);
1071 timer
->u
.timer
.timer_pending
= FALSE
;
1072 tp_object_submit( timer
, FALSE
);
1074 /* Insert the timer back into the queue, except it's marked for shutdown. */
1075 if (timer
->u
.timer
.period
&& !timer
->shutdown
)
1077 timer
->u
.timer
.timeout
+= (ULONGLONG
)timer
->u
.timer
.period
* 10000;
1078 if (timer
->u
.timer
.timeout
<= now
.QuadPart
)
1079 timer
->u
.timer
.timeout
= now
.QuadPart
+ 1;
1081 LIST_FOR_EACH_ENTRY( other_timer
, &timerqueue
.pending_timers
,
1082 struct threadpool_object
, u
.timer
.timer_entry
)
1084 assert( other_timer
->type
== TP_OBJECT_TYPE_TIMER
);
1085 if (timer
->u
.timer
.timeout
< other_timer
->u
.timer
.timeout
)
1088 list_add_before( &other_timer
->u
.timer
.timer_entry
, &timer
->u
.timer
.timer_entry
);
1089 timer
->u
.timer
.timer_pending
= TRUE
;
1093 timeout_lower
= timeout_upper
= MAXLONGLONG
;
1095 /* Determine next timeout and use the window length to optimize wakeup times. */
1096 LIST_FOR_EACH_ENTRY( other_timer
, &timerqueue
.pending_timers
,
1097 struct threadpool_object
, u
.timer
.timer_entry
)
1099 assert( other_timer
->type
== TP_OBJECT_TYPE_TIMER
);
1100 if (other_timer
->u
.timer
.timeout
>= timeout_upper
)
1103 timeout_lower
= other_timer
->u
.timer
.timeout
;
1104 new_timeout
= timeout_lower
+ (ULONGLONG
)other_timer
->u
.timer
.window_length
* 10000;
1105 if (new_timeout
< timeout_upper
)
1106 timeout_upper
= new_timeout
;
1109 /* Wait for timer update events or until the next timer expires. */
1110 if (timerqueue
.objcount
)
1112 timeout
.QuadPart
= timeout_lower
;
1113 RtlSleepConditionVariableCS( &timerqueue
.update_event
, &timerqueue
.cs
, &timeout
);
1117 /* All timers have been destroyed, if no new timers are created
1118 * within some amount of time, then we can shutdown this thread. */
1119 timeout
.QuadPart
= (ULONGLONG
)THREADPOOL_WORKER_TIMEOUT
* -10000;
1120 if (RtlSleepConditionVariableCS( &timerqueue
.update_event
, &timerqueue
.cs
,
1121 &timeout
) == STATUS_TIMEOUT
&& !timerqueue
.objcount
)
1127 timerqueue
.thread_running
= FALSE
;
1128 RtlLeaveCriticalSection( &timerqueue
.cs
);
1130 TRACE( "terminating timer queue thread\n" );
1131 RtlExitUserThread( 0 );
1134 /***********************************************************************
1135 * tp_new_worker_thread (internal)
1137 * Create and account a new worker thread for the desired pool.
1139 static NTSTATUS
tp_new_worker_thread( struct threadpool
*pool
)
1144 status
= RtlCreateUserThread( GetCurrentProcess(), NULL
, FALSE
, NULL
, 0, 0,
1145 threadpool_worker_proc
, pool
, &thread
, NULL
);
1146 if (status
== STATUS_SUCCESS
)
1148 InterlockedIncrement( &pool
->refcount
);
1149 pool
->num_workers
++;
1155 /***********************************************************************
1156 * tp_timerqueue_lock (internal)
1158 * Acquires a lock on the global timerqueue. When the lock is acquired
1159 * successfully, it is guaranteed that the timer thread is running.
1161 static NTSTATUS
tp_timerqueue_lock( struct threadpool_object
*timer
)
1163 NTSTATUS status
= STATUS_SUCCESS
;
1164 assert( timer
->type
== TP_OBJECT_TYPE_TIMER
);
1166 timer
->u
.timer
.timer_initialized
= FALSE
;
1167 timer
->u
.timer
.timer_pending
= FALSE
;
1168 timer
->u
.timer
.timer_set
= FALSE
;
1169 timer
->u
.timer
.timeout
= 0;
1170 timer
->u
.timer
.period
= 0;
1171 timer
->u
.timer
.window_length
= 0;
1173 RtlEnterCriticalSection( &timerqueue
.cs
);
1175 /* Make sure that the timerqueue thread is running. */
1176 if (!timerqueue
.thread_running
)
1179 status
= RtlCreateUserThread( GetCurrentProcess(), NULL
, FALSE
, NULL
, 0, 0,
1180 timerqueue_thread_proc
, NULL
, &thread
, NULL
);
1181 if (status
== STATUS_SUCCESS
)
1183 timerqueue
.thread_running
= TRUE
;
1188 if (status
== STATUS_SUCCESS
)
1190 timer
->u
.timer
.timer_initialized
= TRUE
;
1191 timerqueue
.objcount
++;
1194 RtlLeaveCriticalSection( &timerqueue
.cs
);
1198 /***********************************************************************
1199 * tp_timerqueue_unlock (internal)
1201 * Releases a lock on the global timerqueue.
1203 static void tp_timerqueue_unlock( struct threadpool_object
*timer
)
1205 assert( timer
->type
== TP_OBJECT_TYPE_TIMER
);
1207 RtlEnterCriticalSection( &timerqueue
.cs
);
1208 if (timer
->u
.timer
.timer_initialized
)
1210 /* If timer was pending, remove it. */
1211 if (timer
->u
.timer
.timer_pending
)
1213 list_remove( &timer
->u
.timer
.timer_entry
);
1214 timer
->u
.timer
.timer_pending
= FALSE
;
1217 /* If the last timer object was destroyed, then wake up the thread. */
1218 if (!--timerqueue
.objcount
)
1220 assert( list_empty( &timerqueue
.pending_timers
) );
1221 RtlWakeAllConditionVariable( &timerqueue
.update_event
);
1224 timer
->u
.timer
.timer_initialized
= FALSE
;
1226 RtlLeaveCriticalSection( &timerqueue
.cs
);
1229 /***********************************************************************
1230 * waitqueue_thread_proc (internal)
1232 static void CALLBACK
waitqueue_thread_proc( void *param
)
1234 struct threadpool_object
*objects
[MAXIMUM_WAITQUEUE_OBJECTS
];
1235 HANDLE handles
[MAXIMUM_WAITQUEUE_OBJECTS
+ 1];
1236 struct waitqueue_bucket
*bucket
= param
;
1237 struct threadpool_object
*wait
, *next
;
1238 LARGE_INTEGER now
, timeout
;
1242 TRACE( "starting wait queue thread\n" );
1244 RtlEnterCriticalSection( &waitqueue
.cs
);
1248 NtQuerySystemTime( &now
);
1249 timeout
.QuadPart
= MAXLONGLONG
;
1252 LIST_FOR_EACH_ENTRY_SAFE( wait
, next
, &bucket
->waiting
, struct threadpool_object
,
1255 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1256 if (wait
->u
.wait
.timeout
<= now
.QuadPart
)
1258 /* Wait object timed out. */
1259 if ((wait
->u
.wait
.flags
& WT_EXECUTEONLYONCE
))
1261 list_remove( &wait
->u
.wait
.wait_entry
);
1262 list_add_tail( &bucket
->reserved
, &wait
->u
.wait
.wait_entry
);
1264 if ((wait
->u
.wait
.flags
& (WT_EXECUTEINWAITTHREAD
| WT_EXECUTEINIOTHREAD
)))
1266 InterlockedIncrement( &wait
->refcount
);
1267 wait
->num_pending_callbacks
++;
1268 RtlEnterCriticalSection( &wait
->pool
->cs
);
1269 tp_object_execute( wait
, TRUE
);
1270 RtlLeaveCriticalSection( &wait
->pool
->cs
);
1271 tp_object_release( wait
);
1273 else tp_object_submit( wait
, FALSE
);
1277 if (wait
->u
.wait
.timeout
< timeout
.QuadPart
)
1278 timeout
.QuadPart
= wait
->u
.wait
.timeout
;
1280 assert( num_handles
< MAXIMUM_WAITQUEUE_OBJECTS
);
1281 InterlockedIncrement( &wait
->refcount
);
1282 objects
[num_handles
] = wait
;
1283 handles
[num_handles
] = wait
->u
.wait
.handle
;
1288 if (!bucket
->objcount
)
1290 /* All wait objects have been destroyed, if no new wait objects are created
1291 * within some amount of time, then we can shutdown this thread. */
1292 assert( num_handles
== 0 );
1293 RtlLeaveCriticalSection( &waitqueue
.cs
);
1294 timeout
.QuadPart
= (ULONGLONG
)THREADPOOL_WORKER_TIMEOUT
* -10000;
1295 status
= NtWaitForMultipleObjects( 1, &bucket
->update_event
, TRUE
, bucket
->alertable
, &timeout
);
1296 RtlEnterCriticalSection( &waitqueue
.cs
);
1298 if (status
== STATUS_TIMEOUT
&& !bucket
->objcount
)
1303 handles
[num_handles
] = bucket
->update_event
;
1304 RtlLeaveCriticalSection( &waitqueue
.cs
);
1305 status
= NtWaitForMultipleObjects( num_handles
+ 1, handles
, TRUE
, bucket
->alertable
, &timeout
);
1306 RtlEnterCriticalSection( &waitqueue
.cs
);
1308 if (status
>= STATUS_WAIT_0
&& status
< STATUS_WAIT_0
+ num_handles
)
1310 wait
= objects
[status
- STATUS_WAIT_0
];
1311 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1312 if (wait
->u
.wait
.bucket
)
1314 /* Wait object signaled. */
1315 assert( wait
->u
.wait
.bucket
== bucket
);
1316 if ((wait
->u
.wait
.flags
& WT_EXECUTEONLYONCE
))
1318 list_remove( &wait
->u
.wait
.wait_entry
);
1319 list_add_tail( &bucket
->reserved
, &wait
->u
.wait
.wait_entry
);
1321 if ((wait
->u
.wait
.flags
& (WT_EXECUTEINWAITTHREAD
| WT_EXECUTEINIOTHREAD
)))
1323 wait
->u
.wait
.signaled
++;
1324 wait
->num_pending_callbacks
++;
1325 RtlEnterCriticalSection( &wait
->pool
->cs
);
1326 tp_object_execute( wait
, TRUE
);
1327 RtlLeaveCriticalSection( &wait
->pool
->cs
);
1329 else tp_object_submit( wait
, TRUE
);
1332 WARN("wait object %p triggered while object was destroyed\n", wait
);
1335 /* Release temporary references to wait objects. */
1338 wait
= objects
[--num_handles
];
1339 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1340 tp_object_release( wait
);
1344 /* Try to merge bucket with other threads. */
1345 if (waitqueue
.num_buckets
> 1 && bucket
->objcount
&&
1346 bucket
->objcount
<= MAXIMUM_WAITQUEUE_OBJECTS
* 1 / 3)
1348 struct waitqueue_bucket
*other_bucket
;
1349 LIST_FOR_EACH_ENTRY( other_bucket
, &waitqueue
.buckets
, struct waitqueue_bucket
, bucket_entry
)
1351 if (other_bucket
!= bucket
&& other_bucket
->objcount
&& other_bucket
->alertable
== bucket
->alertable
&&
1352 other_bucket
->objcount
+ bucket
->objcount
<= MAXIMUM_WAITQUEUE_OBJECTS
* 2 / 3)
1354 other_bucket
->objcount
+= bucket
->objcount
;
1355 bucket
->objcount
= 0;
1357 /* Update reserved list. */
1358 LIST_FOR_EACH_ENTRY( wait
, &bucket
->reserved
, struct threadpool_object
, u
.wait
.wait_entry
)
1360 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1361 wait
->u
.wait
.bucket
= other_bucket
;
1363 list_move_tail( &other_bucket
->reserved
, &bucket
->reserved
);
1365 /* Update waiting list. */
1366 LIST_FOR_EACH_ENTRY( wait
, &bucket
->waiting
, struct threadpool_object
, u
.wait
.wait_entry
)
1368 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1369 wait
->u
.wait
.bucket
= other_bucket
;
1371 list_move_tail( &other_bucket
->waiting
, &bucket
->waiting
);
1373 /* Move bucket to the end, to keep the probability of
1374 * newly added wait objects as small as possible. */
1375 list_remove( &bucket
->bucket_entry
);
1376 list_add_tail( &waitqueue
.buckets
, &bucket
->bucket_entry
);
1378 NtSetEvent( other_bucket
->update_event
, NULL
);
1385 /* Remove this bucket from the list. */
1386 list_remove( &bucket
->bucket_entry
);
1387 if (!--waitqueue
.num_buckets
)
1388 assert( list_empty( &waitqueue
.buckets
) );
1390 RtlLeaveCriticalSection( &waitqueue
.cs
);
1392 TRACE( "terminating wait queue thread\n" );
1394 assert( bucket
->objcount
== 0 );
1395 assert( list_empty( &bucket
->reserved
) );
1396 assert( list_empty( &bucket
->waiting
) );
1397 NtClose( bucket
->update_event
);
1399 RtlFreeHeap( GetProcessHeap(), 0, bucket
);
1400 RtlExitUserThread( 0 );
1403 /***********************************************************************
1404 * tp_waitqueue_lock (internal)
1406 static NTSTATUS
tp_waitqueue_lock( struct threadpool_object
*wait
)
1408 struct waitqueue_bucket
*bucket
;
1411 BOOL alertable
= (wait
->u
.wait
.flags
& WT_EXECUTEINIOTHREAD
) != 0;
1412 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1414 wait
->u
.wait
.signaled
= 0;
1415 wait
->u
.wait
.bucket
= NULL
;
1416 wait
->u
.wait
.wait_pending
= FALSE
;
1417 wait
->u
.wait
.timeout
= 0;
1418 wait
->u
.wait
.handle
= INVALID_HANDLE_VALUE
;
1420 RtlEnterCriticalSection( &waitqueue
.cs
);
1422 /* Try to assign to existing bucket if possible. */
1423 LIST_FOR_EACH_ENTRY( bucket
, &waitqueue
.buckets
, struct waitqueue_bucket
, bucket_entry
)
1425 if (bucket
->objcount
< MAXIMUM_WAITQUEUE_OBJECTS
&& bucket
->alertable
== alertable
)
1427 list_add_tail( &bucket
->reserved
, &wait
->u
.wait
.wait_entry
);
1428 wait
->u
.wait
.bucket
= bucket
;
1431 status
= STATUS_SUCCESS
;
1436 /* Create a new bucket and corresponding worker thread. */
1437 bucket
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*bucket
) );
1440 status
= STATUS_NO_MEMORY
;
1444 bucket
->objcount
= 0;
1445 bucket
->alertable
= alertable
;
1446 list_init( &bucket
->reserved
);
1447 list_init( &bucket
->waiting
);
1449 status
= NtCreateEvent( &bucket
->update_event
, EVENT_ALL_ACCESS
,
1450 NULL
, SynchronizationEvent
, FALSE
);
1453 RtlFreeHeap( GetProcessHeap(), 0, bucket
);
1457 status
= RtlCreateUserThread( GetCurrentProcess(), NULL
, FALSE
, NULL
, 0, 0,
1458 waitqueue_thread_proc
, bucket
, &thread
, NULL
);
1459 if (status
== STATUS_SUCCESS
)
1461 list_add_tail( &waitqueue
.buckets
, &bucket
->bucket_entry
);
1462 waitqueue
.num_buckets
++;
1464 list_add_tail( &bucket
->reserved
, &wait
->u
.wait
.wait_entry
);
1465 wait
->u
.wait
.bucket
= bucket
;
1472 NtClose( bucket
->update_event
);
1473 RtlFreeHeap( GetProcessHeap(), 0, bucket
);
1477 RtlLeaveCriticalSection( &waitqueue
.cs
);
1481 /***********************************************************************
1482 * tp_waitqueue_unlock (internal)
1484 static void tp_waitqueue_unlock( struct threadpool_object
*wait
)
1486 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1488 RtlEnterCriticalSection( &waitqueue
.cs
);
1489 if (wait
->u
.wait
.bucket
)
1491 struct waitqueue_bucket
*bucket
= wait
->u
.wait
.bucket
;
1492 assert( bucket
->objcount
> 0 );
1494 list_remove( &wait
->u
.wait
.wait_entry
);
1495 wait
->u
.wait
.bucket
= NULL
;
1498 NtSetEvent( bucket
->update_event
, NULL
);
1500 RtlLeaveCriticalSection( &waitqueue
.cs
);
1503 static void CALLBACK
ioqueue_thread_proc( void *param
)
1505 struct io_completion
*completion
;
1506 struct threadpool_object
*io
;
1507 IO_STATUS_BLOCK iosb
;
1508 ULONG_PTR key
, value
;
1511 TRACE( "starting I/O completion thread\n" );
1513 RtlEnterCriticalSection( &ioqueue
.cs
);
1517 RtlLeaveCriticalSection( &ioqueue
.cs
);
1518 if ((status
= NtRemoveIoCompletion( ioqueue
.port
, &key
, &value
, &iosb
, NULL
)))
1519 ERR("NtRemoveIoCompletion failed, status %#x.\n", status
);
1520 RtlEnterCriticalSection( &ioqueue
.cs
);
1524 io
= (struct threadpool_object
*)key
;
1526 RtlEnterCriticalSection( &io
->pool
->cs
);
1528 if (!array_reserve((void **)&io
->u
.io
.completions
, &io
->u
.io
.completion_max
,
1529 io
->u
.io
.completion_count
+ 1, sizeof(*io
->u
.io
.completions
)))
1531 ERR("Failed to allocate memory.\n");
1532 RtlLeaveCriticalSection( &io
->pool
->cs
);
1536 completion
= &io
->u
.io
.completions
[io
->u
.io
.completion_count
++];
1537 completion
->iosb
= iosb
;
1538 completion
->cvalue
= value
;
1540 tp_object_submit( io
, FALSE
);
1542 RtlLeaveCriticalSection( &io
->pool
->cs
);
1545 if (!ioqueue
.objcount
)
1547 /* All I/O objects have been destroyed; if no new objects are
1548 * created within some amount of time, then we can shutdown this
1550 LARGE_INTEGER timeout
= {.QuadPart
= (ULONGLONG
)THREADPOOL_WORKER_TIMEOUT
* -10000};
1551 if (RtlSleepConditionVariableCS( &ioqueue
.update_event
, &ioqueue
.cs
,
1552 &timeout
) == STATUS_TIMEOUT
&& !ioqueue
.objcount
)
1557 RtlLeaveCriticalSection( &ioqueue
.cs
);
1559 TRACE( "terminating I/O completion thread\n" );
1561 RtlExitUserThread( 0 );
1564 static NTSTATUS
tp_ioqueue_lock( struct threadpool_object
*io
, HANDLE file
)
1566 NTSTATUS status
= STATUS_SUCCESS
;
1568 assert( io
->type
== TP_OBJECT_TYPE_IO
);
1570 RtlEnterCriticalSection( &ioqueue
.cs
);
1572 if (!ioqueue
.port
&& (status
= NtCreateIoCompletion( &ioqueue
.port
,
1573 IO_COMPLETION_ALL_ACCESS
, NULL
, 0 )))
1575 RtlLeaveCriticalSection( &ioqueue
.cs
);
1579 if (!ioqueue
.thread_running
)
1583 if (!(status
= RtlCreateUserThread( GetCurrentProcess(), NULL
, FALSE
,
1584 NULL
, 0, 0, ioqueue_thread_proc
, NULL
, &thread
, NULL
)))
1586 ioqueue
.thread_running
= TRUE
;
1591 if (status
== STATUS_SUCCESS
)
1593 FILE_COMPLETION_INFORMATION info
;
1594 IO_STATUS_BLOCK iosb
;
1596 info
.CompletionPort
= ioqueue
.port
;
1597 info
.CompletionKey
= (ULONG_PTR
)io
;
1599 status
= NtSetInformationFile( file
, &iosb
, &info
, sizeof(info
), FileCompletionInformation
);
1602 if (status
== STATUS_SUCCESS
)
1604 if (!ioqueue
.objcount
++)
1605 RtlWakeConditionVariable( &ioqueue
.update_event
);
1608 RtlLeaveCriticalSection( &ioqueue
.cs
);
1612 static void tp_ioqueue_unlock( struct threadpool_object
*io
)
1614 assert( io
->type
== TP_OBJECT_TYPE_IO
);
1616 RtlEnterCriticalSection( &ioqueue
.cs
);
1618 if (!--ioqueue
.objcount
)
1619 NtSetIoCompletion( ioqueue
.port
, 0, 0, STATUS_SUCCESS
, 0 );
1621 RtlLeaveCriticalSection( &ioqueue
.cs
);
1624 /***********************************************************************
1625 * tp_threadpool_alloc (internal)
1627 * Allocates a new threadpool object.
1629 static NTSTATUS
tp_threadpool_alloc( struct threadpool
**out
)
1631 IMAGE_NT_HEADERS
*nt
= RtlImageNtHeader( NtCurrentTeb()->Peb
->ImageBaseAddress
);
1632 struct threadpool
*pool
;
1635 pool
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*pool
) );
1637 return STATUS_NO_MEMORY
;
1641 pool
->shutdown
= FALSE
;
1643 RtlInitializeCriticalSection( &pool
->cs
);
1644 pool
->cs
.DebugInfo
->Spare
[0] = (DWORD_PTR
)(__FILE__
": threadpool.cs");
1646 for (i
= 0; i
< ARRAY_SIZE(pool
->pools
); ++i
)
1647 list_init( &pool
->pools
[i
] );
1648 RtlInitializeConditionVariable( &pool
->update_event
);
1650 pool
->max_workers
= 500;
1651 pool
->min_workers
= 0;
1652 pool
->num_workers
= 0;
1653 pool
->num_busy_workers
= 0;
1654 pool
->stack_info
.StackReserve
= nt
->OptionalHeader
.SizeOfStackReserve
;
1655 pool
->stack_info
.StackCommit
= nt
->OptionalHeader
.SizeOfStackCommit
;
1657 TRACE( "allocated threadpool %p\n", pool
);
1660 return STATUS_SUCCESS
;
1663 /***********************************************************************
1664 * tp_threadpool_shutdown (internal)
1666 * Prepares the shutdown of a threadpool object and notifies all worker
1667 * threads to terminate (after all remaining work items have been
1670 static void tp_threadpool_shutdown( struct threadpool
*pool
)
1672 assert( pool
!= default_threadpool
);
1674 pool
->shutdown
= TRUE
;
1675 RtlWakeAllConditionVariable( &pool
->update_event
);
1678 /***********************************************************************
1679 * tp_threadpool_release (internal)
1681 * Releases a reference to a threadpool object.
1683 static BOOL
tp_threadpool_release( struct threadpool
*pool
)
1687 if (InterlockedDecrement( &pool
->refcount
))
1690 TRACE( "destroying threadpool %p\n", pool
);
1692 assert( pool
->shutdown
);
1693 assert( !pool
->objcount
);
1694 for (i
= 0; i
< ARRAY_SIZE(pool
->pools
); ++i
)
1695 assert( list_empty( &pool
->pools
[i
] ) );
1697 pool
->cs
.DebugInfo
->Spare
[0] = 0;
1698 RtlDeleteCriticalSection( &pool
->cs
);
1700 RtlFreeHeap( GetProcessHeap(), 0, pool
);
1704 /***********************************************************************
1705 * tp_threadpool_lock (internal)
1707 * Acquires a lock on a threadpool, specified with an TP_CALLBACK_ENVIRON
1708 * block. When the lock is acquired successfully, it is guaranteed that
1709 * there is at least one worker thread to process tasks.
1711 static NTSTATUS
tp_threadpool_lock( struct threadpool
**out
, TP_CALLBACK_ENVIRON
*environment
)
1713 struct threadpool
*pool
= NULL
;
1714 NTSTATUS status
= STATUS_SUCCESS
;
1718 /* Validate environment parameters. */
1719 if (environment
->Version
== 3)
1721 TP_CALLBACK_ENVIRON_V3
*environment3
= (TP_CALLBACK_ENVIRON_V3
*)environment
;
1723 switch (environment3
->CallbackPriority
)
1725 case TP_CALLBACK_PRIORITY_HIGH
:
1726 case TP_CALLBACK_PRIORITY_NORMAL
:
1727 case TP_CALLBACK_PRIORITY_LOW
:
1730 return STATUS_INVALID_PARAMETER
;
1734 pool
= (struct threadpool
*)environment
->Pool
;
1739 if (!default_threadpool
)
1741 status
= tp_threadpool_alloc( &pool
);
1742 if (status
!= STATUS_SUCCESS
)
1745 if (InterlockedCompareExchangePointer( (void *)&default_threadpool
, pool
, NULL
) != NULL
)
1747 tp_threadpool_shutdown( pool
);
1748 tp_threadpool_release( pool
);
1752 pool
= default_threadpool
;
1755 RtlEnterCriticalSection( &pool
->cs
);
1757 /* Make sure that the threadpool has at least one thread. */
1758 if (!pool
->num_workers
)
1759 status
= tp_new_worker_thread( pool
);
1761 /* Keep a reference, and increment objcount to ensure that the
1762 * last thread doesn't terminate. */
1763 if (status
== STATUS_SUCCESS
)
1765 InterlockedIncrement( &pool
->refcount
);
1769 RtlLeaveCriticalSection( &pool
->cs
);
1771 if (status
!= STATUS_SUCCESS
)
1775 return STATUS_SUCCESS
;
1778 /***********************************************************************
1779 * tp_threadpool_unlock (internal)
1781 * Releases a lock on a threadpool.
1783 static void tp_threadpool_unlock( struct threadpool
*pool
)
1785 RtlEnterCriticalSection( &pool
->cs
);
1787 RtlLeaveCriticalSection( &pool
->cs
);
1788 tp_threadpool_release( pool
);
1791 /***********************************************************************
1792 * tp_group_alloc (internal)
1794 * Allocates a new threadpool group object.
1796 static NTSTATUS
tp_group_alloc( struct threadpool_group
**out
)
1798 struct threadpool_group
*group
;
1800 group
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*group
) );
1802 return STATUS_NO_MEMORY
;
1804 group
->refcount
= 1;
1805 group
->shutdown
= FALSE
;
1807 RtlInitializeCriticalSection( &group
->cs
);
1808 group
->cs
.DebugInfo
->Spare
[0] = (DWORD_PTR
)(__FILE__
": threadpool_group.cs");
1810 list_init( &group
->members
);
1812 TRACE( "allocated group %p\n", group
);
1815 return STATUS_SUCCESS
;
1818 /***********************************************************************
1819 * tp_group_shutdown (internal)
1821 * Marks the group object for shutdown.
1823 static void tp_group_shutdown( struct threadpool_group
*group
)
1825 group
->shutdown
= TRUE
;
1828 /***********************************************************************
1829 * tp_group_release (internal)
1831 * Releases a reference to a group object.
1833 static BOOL
tp_group_release( struct threadpool_group
*group
)
1835 if (InterlockedDecrement( &group
->refcount
))
1838 TRACE( "destroying group %p\n", group
);
1840 assert( group
->shutdown
);
1841 assert( list_empty( &group
->members
) );
1843 group
->cs
.DebugInfo
->Spare
[0] = 0;
1844 RtlDeleteCriticalSection( &group
->cs
);
1846 RtlFreeHeap( GetProcessHeap(), 0, group
);
1850 /***********************************************************************
1851 * tp_object_initialize (internal)
1853 * Initializes members of a threadpool object.
1855 static void tp_object_initialize( struct threadpool_object
*object
, struct threadpool
*pool
,
1856 PVOID userdata
, TP_CALLBACK_ENVIRON
*environment
)
1858 BOOL is_simple_callback
= (object
->type
== TP_OBJECT_TYPE_SIMPLE
);
1860 object
->refcount
= 1;
1861 object
->shutdown
= FALSE
;
1863 object
->pool
= pool
;
1864 object
->group
= NULL
;
1865 object
->userdata
= userdata
;
1866 object
->group_cancel_callback
= NULL
;
1867 object
->finalization_callback
= NULL
;
1868 object
->may_run_long
= 0;
1869 object
->race_dll
= NULL
;
1870 object
->priority
= TP_CALLBACK_PRIORITY_NORMAL
;
1872 memset( &object
->group_entry
, 0, sizeof(object
->group_entry
) );
1873 object
->is_group_member
= FALSE
;
1875 memset( &object
->pool_entry
, 0, sizeof(object
->pool_entry
) );
1876 RtlInitializeConditionVariable( &object
->finished_event
);
1877 RtlInitializeConditionVariable( &object
->group_finished_event
);
1878 object
->completed_event
= NULL
;
1879 object
->num_pending_callbacks
= 0;
1880 object
->num_running_callbacks
= 0;
1881 object
->num_associated_callbacks
= 0;
1885 if (environment
->Version
!= 1 && environment
->Version
!= 3)
1886 FIXME( "unsupported environment version %u\n", environment
->Version
);
1888 object
->group
= impl_from_TP_CLEANUP_GROUP( environment
->CleanupGroup
);
1889 object
->group_cancel_callback
= environment
->CleanupGroupCancelCallback
;
1890 object
->finalization_callback
= environment
->FinalizationCallback
;
1891 object
->may_run_long
= environment
->u
.s
.LongFunction
!= 0;
1892 object
->race_dll
= environment
->RaceDll
;
1893 if (environment
->Version
== 3)
1895 TP_CALLBACK_ENVIRON_V3
*environment_v3
= (TP_CALLBACK_ENVIRON_V3
*)environment
;
1897 object
->priority
= environment_v3
->CallbackPriority
;
1898 assert( object
->priority
< ARRAY_SIZE(pool
->pools
) );
1901 if (environment
->ActivationContext
)
1902 FIXME( "activation context not supported yet\n" );
1904 if (environment
->u
.s
.Persistent
)
1905 FIXME( "persistent threads not supported yet\n" );
1908 if (object
->race_dll
)
1909 LdrAddRefDll( 0, object
->race_dll
);
1911 TRACE( "allocated object %p of type %u\n", object
, object
->type
);
1913 /* For simple callbacks we have to run tp_object_submit before adding this object
1914 * to the cleanup group. As soon as the cleanup group members are released ->shutdown
1915 * will be set, and tp_object_submit would fail with an assertion. */
1917 if (is_simple_callback
)
1918 tp_object_submit( object
, FALSE
);
1922 struct threadpool_group
*group
= object
->group
;
1923 InterlockedIncrement( &group
->refcount
);
1925 RtlEnterCriticalSection( &group
->cs
);
1926 list_add_tail( &group
->members
, &object
->group_entry
);
1927 object
->is_group_member
= TRUE
;
1928 RtlLeaveCriticalSection( &group
->cs
);
1931 if (is_simple_callback
)
1932 tp_object_release( object
);
1935 static void tp_object_prio_queue( struct threadpool_object
*object
)
1937 ++object
->pool
->num_busy_workers
;
1938 list_add_tail( &object
->pool
->pools
[object
->priority
], &object
->pool_entry
);
1941 /***********************************************************************
1942 * tp_object_submit (internal)
1944 * Submits a threadpool object to the associated threadpool. This
1945 * function has to be VOID because TpPostWork can never fail on Windows.
1947 static void tp_object_submit( struct threadpool_object
*object
, BOOL signaled
)
1949 struct threadpool
*pool
= object
->pool
;
1950 NTSTATUS status
= STATUS_UNSUCCESSFUL
;
1952 assert( !object
->shutdown
);
1953 assert( !pool
->shutdown
);
1955 RtlEnterCriticalSection( &pool
->cs
);
1957 /* Start new worker threads if required. */
1958 if (pool
->num_busy_workers
>= pool
->num_workers
&&
1959 pool
->num_workers
< pool
->max_workers
)
1960 status
= tp_new_worker_thread( pool
);
1962 /* Queue work item and increment refcount. */
1963 InterlockedIncrement( &object
->refcount
);
1964 if (!object
->num_pending_callbacks
++)
1965 tp_object_prio_queue( object
);
1967 /* Count how often the object was signaled. */
1968 if (object
->type
== TP_OBJECT_TYPE_WAIT
&& signaled
)
1969 object
->u
.wait
.signaled
++;
1971 /* No new thread started - wake up one existing thread. */
1972 if (status
!= STATUS_SUCCESS
)
1974 assert( pool
->num_workers
> 0 );
1975 RtlWakeConditionVariable( &pool
->update_event
);
1978 RtlLeaveCriticalSection( &pool
->cs
);
1981 /***********************************************************************
1982 * tp_object_cancel (internal)
1984 * Cancels all currently pending callbacks for a specific object.
1986 static void tp_object_cancel( struct threadpool_object
*object
)
1988 struct threadpool
*pool
= object
->pool
;
1989 LONG pending_callbacks
= 0;
1991 RtlEnterCriticalSection( &pool
->cs
);
1992 if (object
->num_pending_callbacks
)
1994 pending_callbacks
= object
->num_pending_callbacks
;
1995 object
->num_pending_callbacks
= 0;
1996 list_remove( &object
->pool_entry
);
1998 if (object
->type
== TP_OBJECT_TYPE_WAIT
)
1999 object
->u
.wait
.signaled
= 0;
2001 if (object
->type
== TP_OBJECT_TYPE_IO
)
2002 object
->u
.io
.pending_count
= 0;
2003 RtlLeaveCriticalSection( &pool
->cs
);
2005 while (pending_callbacks
--)
2006 tp_object_release( object
);
2009 static BOOL
object_is_finished( struct threadpool_object
*object
, BOOL group
)
2011 if (object
->num_pending_callbacks
)
2013 if (object
->type
== TP_OBJECT_TYPE_IO
&& object
->u
.io
.pending_count
)
2017 return !object
->num_running_callbacks
;
2019 return !object
->num_associated_callbacks
;
2022 /***********************************************************************
2023 * tp_object_wait (internal)
2025 * Waits until all pending and running callbacks of a specific object
2026 * have been processed.
2028 static void tp_object_wait( struct threadpool_object
*object
, BOOL group_wait
)
2030 struct threadpool
*pool
= object
->pool
;
2032 RtlEnterCriticalSection( &pool
->cs
);
2033 while (!object_is_finished( object
, group_wait
))
2036 RtlSleepConditionVariableCS( &object
->group_finished_event
, &pool
->cs
, NULL
);
2038 RtlSleepConditionVariableCS( &object
->finished_event
, &pool
->cs
, NULL
);
2040 RtlLeaveCriticalSection( &pool
->cs
);
2043 /***********************************************************************
2044 * tp_object_prepare_shutdown (internal)
2046 * Prepares a threadpool object for shutdown.
2048 static void tp_object_prepare_shutdown( struct threadpool_object
*object
)
2050 if (object
->type
== TP_OBJECT_TYPE_TIMER
)
2051 tp_timerqueue_unlock( object
);
2052 else if (object
->type
== TP_OBJECT_TYPE_WAIT
)
2053 tp_waitqueue_unlock( object
);
2054 else if (object
->type
== TP_OBJECT_TYPE_IO
)
2055 tp_ioqueue_unlock( object
);
2058 /***********************************************************************
2059 * tp_object_release (internal)
2061 * Releases a reference to a threadpool object.
2063 static BOOL
tp_object_release( struct threadpool_object
*object
)
2065 if (InterlockedDecrement( &object
->refcount
))
2068 TRACE( "destroying object %p of type %u\n", object
, object
->type
);
2070 assert( object
->shutdown
);
2071 assert( !object
->num_pending_callbacks
);
2072 assert( !object
->num_running_callbacks
);
2073 assert( !object
->num_associated_callbacks
);
2075 /* release reference to the group */
2078 struct threadpool_group
*group
= object
->group
;
2080 RtlEnterCriticalSection( &group
->cs
);
2081 if (object
->is_group_member
)
2083 list_remove( &object
->group_entry
);
2084 object
->is_group_member
= FALSE
;
2086 RtlLeaveCriticalSection( &group
->cs
);
2088 tp_group_release( group
);
2091 tp_threadpool_unlock( object
->pool
);
2093 if (object
->race_dll
)
2094 LdrUnloadDll( object
->race_dll
);
2096 if (object
->completed_event
&& object
->completed_event
!= INVALID_HANDLE_VALUE
)
2097 NtSetEvent( object
->completed_event
, NULL
);
2099 RtlFreeHeap( GetProcessHeap(), 0, object
);
2103 static struct list
*threadpool_get_next_item( const struct threadpool
*pool
)
2108 for (i
= 0; i
< ARRAY_SIZE(pool
->pools
); ++i
)
2110 if ((ptr
= list_head( &pool
->pools
[i
] )))
2117 /***********************************************************************
2118 * tp_object_execute (internal)
2120 * Executes a threadpool object callback, object->pool->cs has to be
2123 static void tp_object_execute( struct threadpool_object
*object
, BOOL wait_thread
)
2125 TP_CALLBACK_INSTANCE
*callback_instance
;
2126 struct threadpool_instance instance
;
2127 struct io_completion completion
;
2128 struct threadpool
*pool
= object
->pool
;
2129 TP_WAIT_RESULT wait_result
= 0;
2132 object
->num_pending_callbacks
--;
2134 /* For wait objects check if they were signaled or have timed out. */
2135 if (object
->type
== TP_OBJECT_TYPE_WAIT
)
2137 wait_result
= object
->u
.wait
.signaled
? WAIT_OBJECT_0
: WAIT_TIMEOUT
;
2138 if (wait_result
== WAIT_OBJECT_0
) object
->u
.wait
.signaled
--;
2140 else if (object
->type
== TP_OBJECT_TYPE_IO
)
2142 assert( object
->u
.io
.completion_count
);
2143 completion
= object
->u
.io
.completions
[--object
->u
.io
.completion_count
];
2144 object
->u
.io
.pending_count
--;
2147 /* Leave critical section and do the actual callback. */
2148 object
->num_associated_callbacks
++;
2149 object
->num_running_callbacks
++;
2150 RtlLeaveCriticalSection( &pool
->cs
);
2151 if (wait_thread
) RtlLeaveCriticalSection( &waitqueue
.cs
);
2153 /* Initialize threadpool instance struct. */
2154 callback_instance
= (TP_CALLBACK_INSTANCE
*)&instance
;
2155 instance
.object
= object
;
2156 instance
.threadid
= GetCurrentThreadId();
2157 instance
.associated
= TRUE
;
2158 instance
.may_run_long
= object
->may_run_long
;
2159 instance
.cleanup
.critical_section
= NULL
;
2160 instance
.cleanup
.mutex
= NULL
;
2161 instance
.cleanup
.semaphore
= NULL
;
2162 instance
.cleanup
.semaphore_count
= 0;
2163 instance
.cleanup
.event
= NULL
;
2164 instance
.cleanup
.library
= NULL
;
2166 switch (object
->type
)
2168 case TP_OBJECT_TYPE_SIMPLE
:
2170 TRACE( "executing simple callback %p(%p, %p)\n",
2171 object
->u
.simple
.callback
, callback_instance
, object
->userdata
);
2172 object
->u
.simple
.callback( callback_instance
, object
->userdata
);
2173 TRACE( "callback %p returned\n", object
->u
.simple
.callback
);
2177 case TP_OBJECT_TYPE_WORK
:
2179 TRACE( "executing work callback %p(%p, %p, %p)\n",
2180 object
->u
.work
.callback
, callback_instance
, object
->userdata
, object
);
2181 object
->u
.work
.callback( callback_instance
, object
->userdata
, (TP_WORK
*)object
);
2182 TRACE( "callback %p returned\n", object
->u
.work
.callback
);
2186 case TP_OBJECT_TYPE_TIMER
:
2188 TRACE( "executing timer callback %p(%p, %p, %p)\n",
2189 object
->u
.timer
.callback
, callback_instance
, object
->userdata
, object
);
2190 object
->u
.timer
.callback( callback_instance
, object
->userdata
, (TP_TIMER
*)object
);
2191 TRACE( "callback %p returned\n", object
->u
.timer
.callback
);
2195 case TP_OBJECT_TYPE_WAIT
:
2197 TRACE( "executing wait callback %p(%p, %p, %p, %u)\n",
2198 object
->u
.wait
.callback
, callback_instance
, object
->userdata
, object
, wait_result
);
2199 object
->u
.wait
.callback( callback_instance
, object
->userdata
, (TP_WAIT
*)object
, wait_result
);
2200 TRACE( "callback %p returned\n", object
->u
.wait
.callback
);
2204 case TP_OBJECT_TYPE_IO
:
2206 TRACE( "executing I/O callback %p(%p, %p, %#lx, %p, %p)\n",
2207 object
->u
.io
.callback
, callback_instance
, object
->userdata
,
2208 completion
.cvalue
, &completion
.iosb
, (TP_IO
*)object
);
2209 object
->u
.io
.callback( callback_instance
, object
->userdata
,
2210 (void *)completion
.cvalue
, &completion
.iosb
, (TP_IO
*)object
);
2211 TRACE( "callback %p returned\n", object
->u
.io
.callback
);
2220 /* Execute finalization callback. */
2221 if (object
->finalization_callback
)
2223 TRACE( "executing finalization callback %p(%p, %p)\n",
2224 object
->finalization_callback
, callback_instance
, object
->userdata
);
2225 object
->finalization_callback( callback_instance
, object
->userdata
);
2226 TRACE( "callback %p returned\n", object
->finalization_callback
);
2229 /* Execute cleanup tasks. */
2230 if (instance
.cleanup
.critical_section
)
2232 RtlLeaveCriticalSection( instance
.cleanup
.critical_section
);
2234 if (instance
.cleanup
.mutex
)
2236 status
= NtReleaseMutant( instance
.cleanup
.mutex
, NULL
);
2237 if (status
!= STATUS_SUCCESS
) goto skip_cleanup
;
2239 if (instance
.cleanup
.semaphore
)
2241 status
= NtReleaseSemaphore( instance
.cleanup
.semaphore
, instance
.cleanup
.semaphore_count
, NULL
);
2242 if (status
!= STATUS_SUCCESS
) goto skip_cleanup
;
2244 if (instance
.cleanup
.event
)
2246 status
= NtSetEvent( instance
.cleanup
.event
, NULL
);
2247 if (status
!= STATUS_SUCCESS
) goto skip_cleanup
;
2249 if (instance
.cleanup
.library
)
2251 LdrUnloadDll( instance
.cleanup
.library
);
2255 if (wait_thread
) RtlEnterCriticalSection( &waitqueue
.cs
);
2256 RtlEnterCriticalSection( &pool
->cs
);
2258 /* Simple callbacks are automatically shutdown after execution. */
2259 if (object
->type
== TP_OBJECT_TYPE_SIMPLE
)
2261 tp_object_prepare_shutdown( object
);
2262 object
->shutdown
= TRUE
;
2265 object
->num_running_callbacks
--;
2266 if (object_is_finished( object
, TRUE
))
2267 RtlWakeAllConditionVariable( &object
->group_finished_event
);
2269 if (instance
.associated
)
2271 object
->num_associated_callbacks
--;
2272 if (object_is_finished( object
, FALSE
))
2273 RtlWakeAllConditionVariable( &object
->finished_event
);
2277 /***********************************************************************
2278 * threadpool_worker_proc (internal)
2280 static void CALLBACK
threadpool_worker_proc( void *param
)
2282 struct threadpool
*pool
= param
;
2283 LARGE_INTEGER timeout
;
2286 TRACE( "starting worker thread for pool %p\n", pool
);
2288 RtlEnterCriticalSection( &pool
->cs
);
2291 while ((ptr
= threadpool_get_next_item( pool
)))
2293 struct threadpool_object
*object
= LIST_ENTRY( ptr
, struct threadpool_object
, pool_entry
);
2294 assert( object
->num_pending_callbacks
> 0 );
2296 /* If further pending callbacks are queued, move the work item to
2297 * the end of the pool list. Otherwise remove it from the pool. */
2298 list_remove( &object
->pool_entry
);
2299 if (object
->num_pending_callbacks
> 1)
2300 tp_object_prio_queue( object
);
2302 tp_object_execute( object
, FALSE
);
2304 assert(pool
->num_busy_workers
);
2305 pool
->num_busy_workers
--;
2307 tp_object_release( object
);
2310 /* Shutdown worker thread if requested. */
2314 /* Wait for new tasks or until the timeout expires. A thread only terminates
2315 * when no new tasks are available, and the number of threads can be
2316 * decreased without violating the min_workers limit. An exception is when
2317 * min_workers == 0, then objcount is used to detect if the last thread
2318 * can be terminated. */
2319 timeout
.QuadPart
= (ULONGLONG
)THREADPOOL_WORKER_TIMEOUT
* -10000;
2320 if (RtlSleepConditionVariableCS( &pool
->update_event
, &pool
->cs
, &timeout
) == STATUS_TIMEOUT
&&
2321 !threadpool_get_next_item( pool
) && (pool
->num_workers
> max( pool
->min_workers
, 1 ) ||
2322 (!pool
->min_workers
&& !pool
->objcount
)))
2327 pool
->num_workers
--;
2328 RtlLeaveCriticalSection( &pool
->cs
);
2330 TRACE( "terminating worker thread for pool %p\n", pool
);
2331 tp_threadpool_release( pool
);
2332 RtlExitUserThread( 0 );
2335 /***********************************************************************
2336 * TpAllocCleanupGroup (NTDLL.@)
2338 NTSTATUS WINAPI
TpAllocCleanupGroup( TP_CLEANUP_GROUP
**out
)
2340 TRACE( "%p\n", out
);
2342 return tp_group_alloc( (struct threadpool_group
**)out
);
2345 /***********************************************************************
2346 * TpAllocIoCompletion (NTDLL.@)
2348 NTSTATUS WINAPI
TpAllocIoCompletion( TP_IO
**out
, HANDLE file
, PTP_IO_CALLBACK callback
,
2349 void *userdata
, TP_CALLBACK_ENVIRON
*environment
)
2351 struct threadpool_object
*object
;
2352 struct threadpool
*pool
;
2355 TRACE( "%p %p %p %p %p\n", out
, file
, callback
, userdata
, environment
);
2357 if (!(object
= RtlAllocateHeap( GetProcessHeap(), HEAP_ZERO_MEMORY
, sizeof(*object
) )))
2358 return STATUS_NO_MEMORY
;
2360 if ((status
= tp_threadpool_lock( &pool
, environment
)))
2362 RtlFreeHeap( GetProcessHeap(), 0, object
);
2366 object
->type
= TP_OBJECT_TYPE_IO
;
2367 object
->u
.io
.callback
= callback
;
2368 if (!(object
->u
.io
.completions
= RtlAllocateHeap( GetProcessHeap(), 0, 8 * sizeof(*object
->u
.io
.completions
) )))
2370 tp_threadpool_unlock( pool
);
2371 RtlFreeHeap( GetProcessHeap(), 0, object
);
2375 if ((status
= tp_ioqueue_lock( object
, file
)))
2377 tp_threadpool_unlock( pool
);
2378 RtlFreeHeap( GetProcessHeap(), 0, object
->u
.io
.completions
);
2379 RtlFreeHeap( GetProcessHeap(), 0, object
);
2383 tp_object_initialize( object
, pool
, userdata
, environment
);
2385 *out
= (TP_IO
*)object
;
2386 return STATUS_SUCCESS
;
2389 /***********************************************************************
2390 * TpAllocPool (NTDLL.@)
2392 NTSTATUS WINAPI
TpAllocPool( TP_POOL
**out
, PVOID reserved
)
2394 TRACE( "%p %p\n", out
, reserved
);
2397 FIXME( "reserved argument is nonzero (%p)\n", reserved
);
2399 return tp_threadpool_alloc( (struct threadpool
**)out
);
2402 /***********************************************************************
2403 * TpAllocTimer (NTDLL.@)
2405 NTSTATUS WINAPI
TpAllocTimer( TP_TIMER
**out
, PTP_TIMER_CALLBACK callback
, PVOID userdata
,
2406 TP_CALLBACK_ENVIRON
*environment
)
2408 struct threadpool_object
*object
;
2409 struct threadpool
*pool
;
2412 TRACE( "%p %p %p %p\n", out
, callback
, userdata
, environment
);
2414 object
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object
) );
2416 return STATUS_NO_MEMORY
;
2418 status
= tp_threadpool_lock( &pool
, environment
);
2421 RtlFreeHeap( GetProcessHeap(), 0, object
);
2425 object
->type
= TP_OBJECT_TYPE_TIMER
;
2426 object
->u
.timer
.callback
= callback
;
2428 status
= tp_timerqueue_lock( object
);
2431 tp_threadpool_unlock( pool
);
2432 RtlFreeHeap( GetProcessHeap(), 0, object
);
2436 tp_object_initialize( object
, pool
, userdata
, environment
);
2438 *out
= (TP_TIMER
*)object
;
2439 return STATUS_SUCCESS
;
2442 static NTSTATUS
tp_alloc_wait( TP_WAIT
**out
, PTP_WAIT_CALLBACK callback
, PVOID userdata
,
2443 TP_CALLBACK_ENVIRON
*environment
, DWORD flags
)
2445 struct threadpool_object
*object
;
2446 struct threadpool
*pool
;
2449 object
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object
) );
2451 return STATUS_NO_MEMORY
;
2453 status
= tp_threadpool_lock( &pool
, environment
);
2456 RtlFreeHeap( GetProcessHeap(), 0, object
);
2460 object
->type
= TP_OBJECT_TYPE_WAIT
;
2461 object
->u
.wait
.callback
= callback
;
2462 object
->u
.wait
.flags
= flags
;
2464 status
= tp_waitqueue_lock( object
);
2467 tp_threadpool_unlock( pool
);
2468 RtlFreeHeap( GetProcessHeap(), 0, object
);
2472 tp_object_initialize( object
, pool
, userdata
, environment
);
2474 *out
= (TP_WAIT
*)object
;
2475 return STATUS_SUCCESS
;
2478 /***********************************************************************
2479 * TpAllocWait (NTDLL.@)
2481 NTSTATUS WINAPI
TpAllocWait( TP_WAIT
**out
, PTP_WAIT_CALLBACK callback
, PVOID userdata
,
2482 TP_CALLBACK_ENVIRON
*environment
)
2484 TRACE( "%p %p %p %p\n", out
, callback
, userdata
, environment
);
2485 return tp_alloc_wait( out
, callback
, userdata
, environment
, WT_EXECUTEONLYONCE
);
2488 /***********************************************************************
2489 * TpAllocWork (NTDLL.@)
2491 NTSTATUS WINAPI
TpAllocWork( TP_WORK
**out
, PTP_WORK_CALLBACK callback
, PVOID userdata
,
2492 TP_CALLBACK_ENVIRON
*environment
)
2494 struct threadpool_object
*object
;
2495 struct threadpool
*pool
;
2498 TRACE( "%p %p %p %p\n", out
, callback
, userdata
, environment
);
2500 object
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object
) );
2502 return STATUS_NO_MEMORY
;
2504 status
= tp_threadpool_lock( &pool
, environment
);
2507 RtlFreeHeap( GetProcessHeap(), 0, object
);
2511 object
->type
= TP_OBJECT_TYPE_WORK
;
2512 object
->u
.work
.callback
= callback
;
2513 tp_object_initialize( object
, pool
, userdata
, environment
);
2515 *out
= (TP_WORK
*)object
;
2516 return STATUS_SUCCESS
;
2519 /***********************************************************************
2520 * TpCancelAsyncIoOperation (NTDLL.@)
2522 void WINAPI
TpCancelAsyncIoOperation( TP_IO
*io
)
2524 struct threadpool_object
*this = impl_from_TP_IO( io
);
2526 TRACE( "%p\n", io
);
2528 RtlEnterCriticalSection( &this->pool
->cs
);
2530 this->u
.io
.pending_count
--;
2531 if (object_is_finished( this, TRUE
))
2532 RtlWakeAllConditionVariable( &this->group_finished_event
);
2533 if (object_is_finished( this, FALSE
))
2534 RtlWakeAllConditionVariable( &this->finished_event
);
2536 RtlLeaveCriticalSection( &this->pool
->cs
);
2539 /***********************************************************************
2540 * TpCallbackLeaveCriticalSectionOnCompletion (NTDLL.@)
2542 VOID WINAPI
TpCallbackLeaveCriticalSectionOnCompletion( TP_CALLBACK_INSTANCE
*instance
, CRITICAL_SECTION
*crit
)
2544 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2546 TRACE( "%p %p\n", instance
, crit
);
2548 if (!this->cleanup
.critical_section
)
2549 this->cleanup
.critical_section
= crit
;
2552 /***********************************************************************
2553 * TpCallbackMayRunLong (NTDLL.@)
2555 NTSTATUS WINAPI
TpCallbackMayRunLong( TP_CALLBACK_INSTANCE
*instance
)
2557 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2558 struct threadpool_object
*object
= this->object
;
2559 struct threadpool
*pool
;
2560 NTSTATUS status
= STATUS_SUCCESS
;
2562 TRACE( "%p\n", instance
);
2564 if (this->threadid
!= GetCurrentThreadId())
2566 ERR("called from wrong thread, ignoring\n");
2567 return STATUS_UNSUCCESSFUL
; /* FIXME */
2570 if (this->may_run_long
)
2571 return STATUS_SUCCESS
;
2573 pool
= object
->pool
;
2574 RtlEnterCriticalSection( &pool
->cs
);
2576 /* Start new worker threads if required. */
2577 if (pool
->num_busy_workers
>= pool
->num_workers
)
2579 if (pool
->num_workers
< pool
->max_workers
)
2581 status
= tp_new_worker_thread( pool
);
2585 status
= STATUS_TOO_MANY_THREADS
;
2589 RtlLeaveCriticalSection( &pool
->cs
);
2590 this->may_run_long
= TRUE
;
2594 /***********************************************************************
2595 * TpCallbackReleaseMutexOnCompletion (NTDLL.@)
2597 VOID WINAPI
TpCallbackReleaseMutexOnCompletion( TP_CALLBACK_INSTANCE
*instance
, HANDLE mutex
)
2599 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2601 TRACE( "%p %p\n", instance
, mutex
);
2603 if (!this->cleanup
.mutex
)
2604 this->cleanup
.mutex
= mutex
;
2607 /***********************************************************************
2608 * TpCallbackReleaseSemaphoreOnCompletion (NTDLL.@)
2610 VOID WINAPI
TpCallbackReleaseSemaphoreOnCompletion( TP_CALLBACK_INSTANCE
*instance
, HANDLE semaphore
, DWORD count
)
2612 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2614 TRACE( "%p %p %u\n", instance
, semaphore
, count
);
2616 if (!this->cleanup
.semaphore
)
2618 this->cleanup
.semaphore
= semaphore
;
2619 this->cleanup
.semaphore_count
= count
;
2623 /***********************************************************************
2624 * TpCallbackSetEventOnCompletion (NTDLL.@)
2626 VOID WINAPI
TpCallbackSetEventOnCompletion( TP_CALLBACK_INSTANCE
*instance
, HANDLE event
)
2628 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2630 TRACE( "%p %p\n", instance
, event
);
2632 if (!this->cleanup
.event
)
2633 this->cleanup
.event
= event
;
2636 /***********************************************************************
2637 * TpCallbackUnloadDllOnCompletion (NTDLL.@)
2639 VOID WINAPI
TpCallbackUnloadDllOnCompletion( TP_CALLBACK_INSTANCE
*instance
, HMODULE module
)
2641 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2643 TRACE( "%p %p\n", instance
, module
);
2645 if (!this->cleanup
.library
)
2646 this->cleanup
.library
= module
;
2649 /***********************************************************************
2650 * TpDisassociateCallback (NTDLL.@)
2652 VOID WINAPI
TpDisassociateCallback( TP_CALLBACK_INSTANCE
*instance
)
2654 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2655 struct threadpool_object
*object
= this->object
;
2656 struct threadpool
*pool
;
2658 TRACE( "%p\n", instance
);
2660 if (this->threadid
!= GetCurrentThreadId())
2662 ERR("called from wrong thread, ignoring\n");
2666 if (!this->associated
)
2669 pool
= object
->pool
;
2670 RtlEnterCriticalSection( &pool
->cs
);
2672 object
->num_associated_callbacks
--;
2673 if (object_is_finished( object
, FALSE
))
2674 RtlWakeAllConditionVariable( &object
->finished_event
);
2676 RtlLeaveCriticalSection( &pool
->cs
);
2677 this->associated
= FALSE
;
2680 /***********************************************************************
2681 * TpIsTimerSet (NTDLL.@)
2683 BOOL WINAPI
TpIsTimerSet( TP_TIMER
*timer
)
2685 struct threadpool_object
*this = impl_from_TP_TIMER( timer
);
2687 TRACE( "%p\n", timer
);
2689 return this->u
.timer
.timer_set
;
2692 /***********************************************************************
2693 * TpPostWork (NTDLL.@)
2695 VOID WINAPI
TpPostWork( TP_WORK
*work
)
2697 struct threadpool_object
*this = impl_from_TP_WORK( work
);
2699 TRACE( "%p\n", work
);
2701 tp_object_submit( this, FALSE
);
2704 /***********************************************************************
2705 * TpReleaseCleanupGroup (NTDLL.@)
2707 VOID WINAPI
TpReleaseCleanupGroup( TP_CLEANUP_GROUP
*group
)
2709 struct threadpool_group
*this = impl_from_TP_CLEANUP_GROUP( group
);
2711 TRACE( "%p\n", group
);
2713 tp_group_shutdown( this );
2714 tp_group_release( this );
2717 /***********************************************************************
2718 * TpReleaseCleanupGroupMembers (NTDLL.@)
2720 VOID WINAPI
TpReleaseCleanupGroupMembers( TP_CLEANUP_GROUP
*group
, BOOL cancel_pending
, PVOID userdata
)
2722 struct threadpool_group
*this = impl_from_TP_CLEANUP_GROUP( group
);
2723 struct threadpool_object
*object
, *next
;
2724 struct list members
;
2726 TRACE( "%p %u %p\n", group
, cancel_pending
, userdata
);
2728 RtlEnterCriticalSection( &this->cs
);
2730 /* Unset group, increase references, and mark objects for shutdown */
2731 LIST_FOR_EACH_ENTRY_SAFE( object
, next
, &this->members
, struct threadpool_object
, group_entry
)
2733 assert( object
->group
== this );
2734 assert( object
->is_group_member
);
2736 if (InterlockedIncrement( &object
->refcount
) == 1)
2738 /* Object is basically already destroyed, but group reference
2739 * was not deleted yet. We can safely ignore this object. */
2740 InterlockedDecrement( &object
->refcount
);
2741 list_remove( &object
->group_entry
);
2742 object
->is_group_member
= FALSE
;
2746 object
->is_group_member
= FALSE
;
2747 tp_object_prepare_shutdown( object
);
2750 /* Move members to a new temporary list */
2751 list_init( &members
);
2752 list_move_tail( &members
, &this->members
);
2754 RtlLeaveCriticalSection( &this->cs
);
2756 /* Cancel pending callbacks if requested */
2759 LIST_FOR_EACH_ENTRY( object
, &members
, struct threadpool_object
, group_entry
)
2761 tp_object_cancel( object
);
2765 /* Wait for remaining callbacks to finish */
2766 LIST_FOR_EACH_ENTRY_SAFE( object
, next
, &members
, struct threadpool_object
, group_entry
)
2768 tp_object_wait( object
, TRUE
);
2770 if (!object
->shutdown
)
2772 /* Execute group cancellation callback if defined, and if this was actually a group cancel. */
2773 if (cancel_pending
&& object
->group_cancel_callback
)
2775 TRACE( "executing group cancel callback %p(%p, %p)\n",
2776 object
->group_cancel_callback
, object
->userdata
, userdata
);
2777 object
->group_cancel_callback( object
->userdata
, userdata
);
2778 TRACE( "callback %p returned\n", object
->group_cancel_callback
);
2781 if (object
->type
!= TP_OBJECT_TYPE_SIMPLE
)
2782 tp_object_release( object
);
2785 object
->shutdown
= TRUE
;
2786 tp_object_release( object
);
2790 /***********************************************************************
2791 * TpReleaseIoCompletion (NTDLL.@)
2793 void WINAPI
TpReleaseIoCompletion( TP_IO
*io
)
2795 struct threadpool_object
*this = impl_from_TP_IO( io
);
2797 TRACE( "%p\n", io
);
2799 tp_object_prepare_shutdown( this );
2800 this->shutdown
= TRUE
;
2801 tp_object_release( this );
2804 /***********************************************************************
2805 * TpReleasePool (NTDLL.@)
2807 VOID WINAPI
TpReleasePool( TP_POOL
*pool
)
2809 struct threadpool
*this = impl_from_TP_POOL( pool
);
2811 TRACE( "%p\n", pool
);
2813 tp_threadpool_shutdown( this );
2814 tp_threadpool_release( this );
2817 /***********************************************************************
2818 * TpReleaseTimer (NTDLL.@)
2820 VOID WINAPI
TpReleaseTimer( TP_TIMER
*timer
)
2822 struct threadpool_object
*this = impl_from_TP_TIMER( timer
);
2824 TRACE( "%p\n", timer
);
2826 tp_object_prepare_shutdown( this );
2827 this->shutdown
= TRUE
;
2828 tp_object_release( this );
2831 /***********************************************************************
2832 * TpReleaseWait (NTDLL.@)
2834 VOID WINAPI
TpReleaseWait( TP_WAIT
*wait
)
2836 struct threadpool_object
*this = impl_from_TP_WAIT( wait
);
2838 TRACE( "%p\n", wait
);
2840 tp_object_prepare_shutdown( this );
2841 this->shutdown
= TRUE
;
2842 tp_object_release( this );
2845 /***********************************************************************
2846 * TpReleaseWork (NTDLL.@)
2848 VOID WINAPI
TpReleaseWork( TP_WORK
*work
)
2850 struct threadpool_object
*this = impl_from_TP_WORK( work
);
2852 TRACE( "%p\n", work
);
2854 tp_object_prepare_shutdown( this );
2855 this->shutdown
= TRUE
;
2856 tp_object_release( this );
2859 /***********************************************************************
2860 * TpSetPoolMaxThreads (NTDLL.@)
2862 VOID WINAPI
TpSetPoolMaxThreads( TP_POOL
*pool
, DWORD maximum
)
2864 struct threadpool
*this = impl_from_TP_POOL( pool
);
2866 TRACE( "%p %u\n", pool
, maximum
);
2868 RtlEnterCriticalSection( &this->cs
);
2869 this->max_workers
= max( maximum
, 1 );
2870 this->min_workers
= min( this->min_workers
, this->max_workers
);
2871 RtlLeaveCriticalSection( &this->cs
);
2874 /***********************************************************************
2875 * TpSetPoolMinThreads (NTDLL.@)
2877 BOOL WINAPI
TpSetPoolMinThreads( TP_POOL
*pool
, DWORD minimum
)
2879 struct threadpool
*this = impl_from_TP_POOL( pool
);
2880 NTSTATUS status
= STATUS_SUCCESS
;
2882 TRACE( "%p %u\n", pool
, minimum
);
2884 RtlEnterCriticalSection( &this->cs
);
2886 while (this->num_workers
< minimum
)
2888 status
= tp_new_worker_thread( this );
2889 if (status
!= STATUS_SUCCESS
)
2893 if (status
== STATUS_SUCCESS
)
2895 this->min_workers
= minimum
;
2896 this->max_workers
= max( this->min_workers
, this->max_workers
);
2899 RtlLeaveCriticalSection( &this->cs
);
2903 /***********************************************************************
2904 * TpSetTimer (NTDLL.@)
2906 VOID WINAPI
TpSetTimer( TP_TIMER
*timer
, LARGE_INTEGER
*timeout
, LONG period
, LONG window_length
)
2908 struct threadpool_object
*this = impl_from_TP_TIMER( timer
);
2909 struct threadpool_object
*other_timer
;
2910 BOOL submit_timer
= FALSE
;
2911 ULONGLONG timestamp
;
2913 TRACE( "%p %p %u %u\n", timer
, timeout
, period
, window_length
);
2915 RtlEnterCriticalSection( &timerqueue
.cs
);
2917 assert( this->u
.timer
.timer_initialized
);
2918 this->u
.timer
.timer_set
= timeout
!= NULL
;
2920 /* Convert relative timeout to absolute timestamp and handle a timeout
2921 * of zero, which means that the timer is submitted immediately. */
2924 timestamp
= timeout
->QuadPart
;
2925 if ((LONGLONG
)timestamp
< 0)
2928 NtQuerySystemTime( &now
);
2929 timestamp
= now
.QuadPart
- timestamp
;
2931 else if (!timestamp
)
2938 NtQuerySystemTime( &now
);
2939 timestamp
= now
.QuadPart
+ (ULONGLONG
)period
* 10000;
2941 submit_timer
= TRUE
;
2945 /* First remove existing timeout. */
2946 if (this->u
.timer
.timer_pending
)
2948 list_remove( &this->u
.timer
.timer_entry
);
2949 this->u
.timer
.timer_pending
= FALSE
;
2952 /* If the timer was enabled, then add it back to the queue. */
2955 this->u
.timer
.timeout
= timestamp
;
2956 this->u
.timer
.period
= period
;
2957 this->u
.timer
.window_length
= window_length
;
2959 LIST_FOR_EACH_ENTRY( other_timer
, &timerqueue
.pending_timers
,
2960 struct threadpool_object
, u
.timer
.timer_entry
)
2962 assert( other_timer
->type
== TP_OBJECT_TYPE_TIMER
);
2963 if (this->u
.timer
.timeout
< other_timer
->u
.timer
.timeout
)
2966 list_add_before( &other_timer
->u
.timer
.timer_entry
, &this->u
.timer
.timer_entry
);
2968 /* Wake up the timer thread when the timeout has to be updated. */
2969 if (list_head( &timerqueue
.pending_timers
) == &this->u
.timer
.timer_entry
)
2970 RtlWakeAllConditionVariable( &timerqueue
.update_event
);
2972 this->u
.timer
.timer_pending
= TRUE
;
2975 RtlLeaveCriticalSection( &timerqueue
.cs
);
2978 tp_object_submit( this, FALSE
);
2981 /***********************************************************************
2982 * TpSetWait (NTDLL.@)
2984 VOID WINAPI
TpSetWait( TP_WAIT
*wait
, HANDLE handle
, LARGE_INTEGER
*timeout
)
2986 struct threadpool_object
*this = impl_from_TP_WAIT( wait
);
2987 ULONGLONG timestamp
= MAXLONGLONG
;
2989 TRACE( "%p %p %p\n", wait
, handle
, timeout
);
2991 RtlEnterCriticalSection( &waitqueue
.cs
);
2993 assert( this->u
.wait
.bucket
);
2994 this->u
.wait
.handle
= handle
;
2996 if (handle
|| this->u
.wait
.wait_pending
)
2998 struct waitqueue_bucket
*bucket
= this->u
.wait
.bucket
;
2999 list_remove( &this->u
.wait
.wait_entry
);
3001 /* Convert relative timeout to absolute timestamp. */
3002 if (handle
&& timeout
)
3004 timestamp
= timeout
->QuadPart
;
3005 if ((LONGLONG
)timestamp
< 0)
3008 NtQuerySystemTime( &now
);
3009 timestamp
= now
.QuadPart
- timestamp
;
3013 /* Add wait object back into one of the queues. */
3016 list_add_tail( &bucket
->waiting
, &this->u
.wait
.wait_entry
);
3017 this->u
.wait
.wait_pending
= TRUE
;
3018 this->u
.wait
.timeout
= timestamp
;
3022 list_add_tail( &bucket
->reserved
, &this->u
.wait
.wait_entry
);
3023 this->u
.wait
.wait_pending
= FALSE
;
3026 /* Wake up the wait queue thread. */
3027 NtSetEvent( bucket
->update_event
, NULL
);
3030 RtlLeaveCriticalSection( &waitqueue
.cs
);
3033 /***********************************************************************
3034 * TpSimpleTryPost (NTDLL.@)
3036 NTSTATUS WINAPI
TpSimpleTryPost( PTP_SIMPLE_CALLBACK callback
, PVOID userdata
,
3037 TP_CALLBACK_ENVIRON
*environment
)
3039 struct threadpool_object
*object
;
3040 struct threadpool
*pool
;
3043 TRACE( "%p %p %p\n", callback
, userdata
, environment
);
3045 object
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object
) );
3047 return STATUS_NO_MEMORY
;
3049 status
= tp_threadpool_lock( &pool
, environment
);
3052 RtlFreeHeap( GetProcessHeap(), 0, object
);
3056 object
->type
= TP_OBJECT_TYPE_SIMPLE
;
3057 object
->u
.simple
.callback
= callback
;
3058 tp_object_initialize( object
, pool
, userdata
, environment
);
3060 return STATUS_SUCCESS
;
3063 /***********************************************************************
3064 * TpStartAsyncIoOperation (NTDLL.@)
3066 void WINAPI
TpStartAsyncIoOperation( TP_IO
*io
)
3068 struct threadpool_object
*this = impl_from_TP_IO( io
);
3070 TRACE( "%p\n", io
);
3072 RtlEnterCriticalSection( &this->pool
->cs
);
3074 this->u
.io
.pending_count
++;
3076 RtlLeaveCriticalSection( &this->pool
->cs
);
3079 /***********************************************************************
3080 * TpWaitForIoCompletion (NTDLL.@)
3082 void WINAPI
TpWaitForIoCompletion( TP_IO
*io
, BOOL cancel_pending
)
3084 struct threadpool_object
*this = impl_from_TP_IO( io
);
3086 TRACE( "%p %d\n", io
, cancel_pending
);
3089 tp_object_cancel( this );
3090 tp_object_wait( this, FALSE
);
3093 /***********************************************************************
3094 * TpWaitForTimer (NTDLL.@)
3096 VOID WINAPI
TpWaitForTimer( TP_TIMER
*timer
, BOOL cancel_pending
)
3098 struct threadpool_object
*this = impl_from_TP_TIMER( timer
);
3100 TRACE( "%p %d\n", timer
, cancel_pending
);
3103 tp_object_cancel( this );
3104 tp_object_wait( this, FALSE
);
3107 /***********************************************************************
3108 * TpWaitForWait (NTDLL.@)
3110 VOID WINAPI
TpWaitForWait( TP_WAIT
*wait
, BOOL cancel_pending
)
3112 struct threadpool_object
*this = impl_from_TP_WAIT( wait
);
3114 TRACE( "%p %d\n", wait
, cancel_pending
);
3117 tp_object_cancel( this );
3118 tp_object_wait( this, FALSE
);
3121 /***********************************************************************
3122 * TpWaitForWork (NTDLL.@)
3124 VOID WINAPI
TpWaitForWork( TP_WORK
*work
, BOOL cancel_pending
)
3126 struct threadpool_object
*this = impl_from_TP_WORK( work
);
3128 TRACE( "%p %u\n", work
, cancel_pending
);
3131 tp_object_cancel( this );
3132 tp_object_wait( this, FALSE
);
3135 /***********************************************************************
3136 * TpSetPoolStackInformation (NTDLL.@)
3138 NTSTATUS WINAPI
TpSetPoolStackInformation( TP_POOL
*pool
, TP_POOL_STACK_INFORMATION
*stack_info
)
3140 struct threadpool
*this = impl_from_TP_POOL( pool
);
3142 TRACE( "%p %p\n", pool
, stack_info
);
3145 return STATUS_INVALID_PARAMETER
;
3147 RtlEnterCriticalSection( &this->cs
);
3148 this->stack_info
= *stack_info
;
3149 RtlLeaveCriticalSection( &this->cs
);
3151 return STATUS_SUCCESS
;
3154 /***********************************************************************
3155 * TpQueryPoolStackInformation (NTDLL.@)
3157 NTSTATUS WINAPI
TpQueryPoolStackInformation( TP_POOL
*pool
, TP_POOL_STACK_INFORMATION
*stack_info
)
3159 struct threadpool
*this = impl_from_TP_POOL( pool
);
3161 TRACE( "%p %p\n", pool
, stack_info
);
3164 return STATUS_INVALID_PARAMETER
;
3166 RtlEnterCriticalSection( &this->cs
);
3167 *stack_info
= this->stack_info
;
3168 RtlLeaveCriticalSection( &this->cs
);
3170 return STATUS_SUCCESS
;
3173 static void CALLBACK
rtl_wait_callback( TP_CALLBACK_INSTANCE
*instance
, void *userdata
, TP_WAIT
*wait
, TP_WAIT_RESULT result
)
3175 struct threadpool_object
*object
= impl_from_TP_WAIT(wait
);
3176 object
->u
.wait
.rtl_callback( userdata
, result
!= STATUS_WAIT_0
);
3179 /***********************************************************************
3180 * RtlRegisterWait (NTDLL.@)
3182 * Registers a wait for a handle to become signaled.
3185 * NewWaitObject [I] Handle to the new wait object. Use RtlDeregisterWait() to free it.
3186 * Object [I] Object to wait to become signaled.
3187 * Callback [I] Callback function to execute when the wait times out or the handle is signaled.
3188 * Context [I] Context to pass to the callback function when it is executed.
3189 * Milliseconds [I] Number of milliseconds to wait before timing out.
3190 * Flags [I] Flags. See notes.
3193 * Success: STATUS_SUCCESS.
3194 * Failure: Any NTSTATUS code.
3197 * Flags can be one or more of the following:
3198 *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
3199 *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
3200 *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
3201 *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
3202 *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
3204 NTSTATUS WINAPI
RtlRegisterWait( HANDLE
*out
, HANDLE handle
, RTL_WAITORTIMERCALLBACKFUNC callback
,
3205 void *context
, ULONG milliseconds
, ULONG flags
)
3207 struct threadpool_object
*object
;
3208 TP_CALLBACK_ENVIRON environment
;
3209 LARGE_INTEGER timeout
;
3213 TRACE( "out %p, handle %p, callback %p, context %p, milliseconds %u, flags %x\n",
3214 out
, handle
, callback
, context
, milliseconds
, flags
);
3216 memset( &environment
, 0, sizeof(environment
) );
3217 environment
.Version
= 1;
3218 environment
.u
.s
.LongFunction
= (flags
& WT_EXECUTELONGFUNCTION
) != 0;
3219 environment
.u
.s
.Persistent
= (flags
& WT_EXECUTEINPERSISTENTTHREAD
) != 0;
3221 flags
&= (WT_EXECUTEONLYONCE
| WT_EXECUTEINWAITTHREAD
| WT_EXECUTEINIOTHREAD
);
3222 if ((status
= tp_alloc_wait( &wait
, rtl_wait_callback
, context
, &environment
, flags
)))
3225 object
= impl_from_TP_WAIT(wait
);
3226 object
->u
.wait
.rtl_callback
= callback
;
3228 RtlEnterCriticalSection( &waitqueue
.cs
);
3229 TpSetWait( (TP_WAIT
*)object
, handle
, get_nt_timeout( &timeout
, milliseconds
) );
3232 RtlLeaveCriticalSection( &waitqueue
.cs
);
3234 return STATUS_SUCCESS
;
3237 /***********************************************************************
3238 * RtlDeregisterWaitEx (NTDLL.@)
3240 * Cancels a wait operation and frees the resources associated with calling
3241 * RtlRegisterWait().
3244 * WaitObject [I] Handle to the wait object to free.
3247 * Success: STATUS_SUCCESS.
3248 * Failure: Any NTSTATUS code.
3250 NTSTATUS WINAPI
RtlDeregisterWaitEx( HANDLE handle
, HANDLE event
)
3252 struct threadpool_object
*object
= handle
;
3255 TRACE( "handle %p, event %p\n", handle
, event
);
3257 if (!object
) return STATUS_INVALID_HANDLE
;
3259 TpSetWait( (TP_WAIT
*)object
, NULL
, NULL
);
3261 if (event
== INVALID_HANDLE_VALUE
) TpWaitForWait( (TP_WAIT
*)object
, TRUE
);
3264 assert( object
->completed_event
== NULL
);
3265 object
->completed_event
= event
;
3268 RtlEnterCriticalSection( &object
->pool
->cs
);
3269 if (object
->num_pending_callbacks
+ object
->num_running_callbacks
3270 + object
->num_associated_callbacks
) status
= STATUS_PENDING
;
3271 else status
= STATUS_SUCCESS
;
3272 RtlLeaveCriticalSection( &object
->pool
->cs
);
3274 TpReleaseWait( (TP_WAIT
*)object
);
3278 /***********************************************************************
3279 * RtlDeregisterWait (NTDLL.@)
3281 * Cancels a wait operation and frees the resources associated with calling
3282 * RtlRegisterWait().
3285 * WaitObject [I] Handle to the wait object to free.
3288 * Success: STATUS_SUCCESS.
3289 * Failure: Any NTSTATUS code.
3291 NTSTATUS WINAPI
RtlDeregisterWait(HANDLE WaitHandle
)
3293 return RtlDeregisterWaitEx(WaitHandle
, NULL
);