msvcrt: Use fpclass constants from public header.
[wine/zf.git] / dlls / ntdll / threadpool.c
blob1d64bd82bf4954775db7d7773bd37e0970dac75c
1 /*
2 * Thread pooling
4 * Copyright (c) 2006 Robert Shearman
5 * Copyright (c) 2014-2016 Sebastian Lackner
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
22 #include <assert.h>
23 #include <stdarg.h>
24 #include <limits.h>
26 #define NONAMELESSUNION
27 #include "ntstatus.h"
28 #define WIN32_NO_STATUS
29 #include "winternl.h"
31 #include "wine/debug.h"
32 #include "wine/list.h"
34 #include "ntdll_misc.h"
36 WINE_DEFAULT_DEBUG_CHANNEL(threadpool);
39 * Old thread pooling API
42 struct rtl_work_item
44 PRTL_WORK_ITEM_ROUTINE function;
45 PVOID context;
/* Expiration time used for timers that must not fire (again). */
#define EXPIRE_NEVER        (~(ULONGLONG)0)
/* Marker stored in timer_queue.magic while the queue is valid. */
#define TIMER_QUEUE_MAGIC   0x516d6954  /* TimQ */
51 static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug;
53 static struct
55 HANDLE compl_port;
56 RTL_CRITICAL_SECTION threadpool_compl_cs;
58 old_threadpool =
60 NULL, /* compl_port */
61 { &critsect_compl_debug, -1, 0, 0, 0, 0 }, /* threadpool_compl_cs */
64 static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug =
66 0, 0, &old_threadpool.threadpool_compl_cs,
67 { &critsect_compl_debug.ProcessLocksList, &critsect_compl_debug.ProcessLocksList },
68 0, 0, { (DWORD_PTR)(__FILE__ ": threadpool_compl_cs") }
71 struct wait_work_item
73 HANDLE Object;
74 HANDLE CancelEvent;
75 WAITORTIMERCALLBACK Callback;
76 PVOID Context;
77 ULONG Milliseconds;
78 ULONG Flags;
79 HANDLE CompletionEvent;
80 LONG DeleteCount;
81 int CallbackInProgress;
84 struct timer_queue;
85 struct queue_timer
87 struct timer_queue *q;
88 struct list entry;
89 ULONG runcount; /* number of callbacks pending execution */
90 RTL_WAITORTIMERCALLBACKFUNC callback;
91 PVOID param;
92 DWORD period;
93 ULONG flags;
94 ULONGLONG expire;
95 BOOL destroy; /* timer should be deleted; once set, never unset */
96 HANDLE event; /* removal event */
99 struct timer_queue
101 DWORD magic;
102 RTL_CRITICAL_SECTION cs;
103 struct list timers; /* sorted by expiration time */
104 BOOL quit; /* queue should be deleted; once set, never unset */
105 HANDLE event;
106 HANDLE thread;
110 * Object-oriented thread pooling API
/* NOTE(review): presumably milliseconds; the use is outside this chunk. */
#define THREADPOOL_WORKER_TIMEOUT   5000
/* One slot less than the system wait limit. */
#define MAXIMUM_WAITQUEUE_OBJECTS   (MAXIMUM_WAIT_OBJECTS - 1)
116 /* internal threadpool representation */
117 struct threadpool
119 LONG refcount;
120 LONG objcount;
121 BOOL shutdown;
122 CRITICAL_SECTION cs;
123 /* Pools of work items, locked via .cs, order matches TP_CALLBACK_PRIORITY - high, normal, low. */
124 struct list pools[3];
125 RTL_CONDITION_VARIABLE update_event;
126 /* information about worker threads, locked via .cs */
127 int max_workers;
128 int min_workers;
129 int num_workers;
130 int num_busy_workers;
131 HANDLE compl_port;
132 TP_POOL_STACK_INFORMATION stack_info;
/* Kind of a threadpool object; selects the active member of threadpool_object.u. */
enum threadpool_objtype
{
    TP_OBJECT_TYPE_SIMPLE,
    TP_OBJECT_TYPE_WORK,
    TP_OBJECT_TYPE_TIMER,
    TP_OBJECT_TYPE_WAIT,
    TP_OBJECT_TYPE_IO,
};
144 struct io_completion
146 IO_STATUS_BLOCK iosb;
147 ULONG_PTR cvalue;
150 /* internal threadpool object representation */
151 struct threadpool_object
153 void *win32_callback; /* leave space for kernelbase to store win32 callback */
154 LONG refcount;
155 BOOL shutdown;
156 /* read-only information */
157 enum threadpool_objtype type;
158 struct threadpool *pool;
159 struct threadpool_group *group;
160 PVOID userdata;
161 PTP_CLEANUP_GROUP_CANCEL_CALLBACK group_cancel_callback;
162 PTP_SIMPLE_CALLBACK finalization_callback;
163 BOOL may_run_long;
164 HMODULE race_dll;
165 TP_CALLBACK_PRIORITY priority;
166 /* information about the group, locked via .group->cs */
167 struct list group_entry;
168 BOOL is_group_member;
169 /* information about the pool, locked via .pool->cs */
170 struct list pool_entry;
171 RTL_CONDITION_VARIABLE finished_event;
172 RTL_CONDITION_VARIABLE group_finished_event;
173 LONG num_pending_callbacks;
174 LONG num_running_callbacks;
175 LONG num_associated_callbacks;
176 /* arguments for callback */
177 union
179 struct
181 PTP_SIMPLE_CALLBACK callback;
182 } simple;
183 struct
185 PTP_WORK_CALLBACK callback;
186 } work;
187 struct
189 PTP_TIMER_CALLBACK callback;
190 /* information about the timer, locked via timerqueue.cs */
191 BOOL timer_initialized;
192 BOOL timer_pending;
193 struct list timer_entry;
194 BOOL timer_set;
195 ULONGLONG timeout;
196 LONG period;
197 LONG window_length;
198 } timer;
199 struct
201 PTP_WAIT_CALLBACK callback;
202 LONG signaled;
203 /* information about the wait object, locked via waitqueue.cs */
204 struct waitqueue_bucket *bucket;
205 BOOL wait_pending;
206 struct list wait_entry;
207 ULONGLONG timeout;
208 HANDLE handle;
209 } wait;
210 struct
212 PTP_IO_CALLBACK callback;
213 /* locked via .pool->cs */
214 unsigned int pending_count, completion_count, completion_max;
215 struct io_completion *completions;
216 } io;
217 } u;
220 /* internal threadpool instance representation */
221 struct threadpool_instance
223 struct threadpool_object *object;
224 DWORD threadid;
225 BOOL associated;
226 BOOL may_run_long;
227 struct
229 CRITICAL_SECTION *critical_section;
230 HANDLE mutex;
231 HANDLE semaphore;
232 LONG semaphore_count;
233 HANDLE event;
234 HMODULE library;
235 } cleanup;
238 /* internal threadpool group representation */
239 struct threadpool_group
241 LONG refcount;
242 BOOL shutdown;
243 CRITICAL_SECTION cs;
244 /* list of group members, locked via .cs */
245 struct list members;
248 /* global timerqueue object */
249 static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug;
251 static struct
253 CRITICAL_SECTION cs;
254 LONG objcount;
255 BOOL thread_running;
256 struct list pending_timers;
257 RTL_CONDITION_VARIABLE update_event;
259 timerqueue =
261 { &timerqueue_debug, -1, 0, 0, 0, 0 }, /* cs */
262 0, /* objcount */
263 FALSE, /* thread_running */
264 LIST_INIT( timerqueue.pending_timers ), /* pending_timers */
265 RTL_CONDITION_VARIABLE_INIT /* update_event */
268 static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug =
270 0, 0, &timerqueue.cs,
271 { &timerqueue_debug.ProcessLocksList, &timerqueue_debug.ProcessLocksList },
272 0, 0, { (DWORD_PTR)(__FILE__ ": timerqueue.cs") }
275 /* global waitqueue object */
276 static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug;
278 static struct
280 CRITICAL_SECTION cs;
281 LONG num_buckets;
282 struct list buckets;
284 waitqueue =
286 { &waitqueue_debug, -1, 0, 0, 0, 0 }, /* cs */
287 0, /* num_buckets */
288 LIST_INIT( waitqueue.buckets ) /* buckets */
291 static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug =
293 0, 0, &waitqueue.cs,
294 { &waitqueue_debug.ProcessLocksList, &waitqueue_debug.ProcessLocksList },
295 0, 0, { (DWORD_PTR)(__FILE__ ": waitqueue.cs") }
298 struct waitqueue_bucket
300 struct list bucket_entry;
301 LONG objcount;
302 struct list reserved;
303 struct list waiting;
304 HANDLE update_event;
307 /* global I/O completion queue object */
308 static RTL_CRITICAL_SECTION_DEBUG ioqueue_debug;
310 static struct
312 CRITICAL_SECTION cs;
313 LONG objcount;
314 BOOL thread_running;
315 HANDLE port;
316 RTL_CONDITION_VARIABLE update_event;
318 ioqueue =
320 .cs = { &ioqueue_debug, -1, 0, 0, 0, 0 },
323 static RTL_CRITICAL_SECTION_DEBUG ioqueue_debug =
325 0, 0, &ioqueue.cs,
326 { &ioqueue_debug.ProcessLocksList, &ioqueue_debug.ProcessLocksList },
327 0, 0, { (DWORD_PTR)(__FILE__ ": ioqueue.cs") }
330 static inline struct threadpool *impl_from_TP_POOL( TP_POOL *pool )
332 return (struct threadpool *)pool;
335 static inline struct threadpool_object *impl_from_TP_WORK( TP_WORK *work )
337 struct threadpool_object *object = (struct threadpool_object *)work;
338 assert( object->type == TP_OBJECT_TYPE_WORK );
339 return object;
342 static inline struct threadpool_object *impl_from_TP_TIMER( TP_TIMER *timer )
344 struct threadpool_object *object = (struct threadpool_object *)timer;
345 assert( object->type == TP_OBJECT_TYPE_TIMER );
346 return object;
349 static inline struct threadpool_object *impl_from_TP_WAIT( TP_WAIT *wait )
351 struct threadpool_object *object = (struct threadpool_object *)wait;
352 assert( object->type == TP_OBJECT_TYPE_WAIT );
353 return object;
356 static inline struct threadpool_object *impl_from_TP_IO( TP_IO *io )
358 struct threadpool_object *object = (struct threadpool_object *)io;
359 assert( object->type == TP_OBJECT_TYPE_IO );
360 return object;
363 static inline struct threadpool_group *impl_from_TP_CLEANUP_GROUP( TP_CLEANUP_GROUP *group )
365 return (struct threadpool_group *)group;
368 static inline struct threadpool_instance *impl_from_TP_CALLBACK_INSTANCE( TP_CALLBACK_INSTANCE *instance )
370 return (struct threadpool_instance *)instance;
373 static void CALLBACK threadpool_worker_proc( void *param );
374 static void tp_object_submit( struct threadpool_object *object, BOOL signaled );
375 static void tp_object_prepare_shutdown( struct threadpool_object *object );
376 static BOOL tp_object_release( struct threadpool_object *object );
377 static struct threadpool *default_threadpool = NULL;
379 static BOOL array_reserve(void **elements, unsigned int *capacity, unsigned int count, unsigned int size)
381 unsigned int new_capacity, max_capacity;
382 void *new_elements;
384 if (count <= *capacity)
385 return TRUE;
387 max_capacity = ~(SIZE_T)0 / size;
388 if (count > max_capacity)
389 return FALSE;
391 new_capacity = max(4, *capacity);
392 while (new_capacity < count && new_capacity <= max_capacity / 2)
393 new_capacity *= 2;
394 if (new_capacity < count)
395 new_capacity = max_capacity;
397 if (!(new_elements = RtlReAllocateHeap( GetProcessHeap(), 0, *elements, new_capacity * size )))
398 return FALSE;
400 *elements = new_elements;
401 *capacity = new_capacity;
403 return TRUE;
406 static void CALLBACK process_rtl_work_item( TP_CALLBACK_INSTANCE *instance, void *userdata )
408 struct rtl_work_item *item = userdata;
410 TRACE("executing %p(%p)\n", item->function, item->context);
411 item->function( item->context );
413 RtlFreeHeap( GetProcessHeap(), 0, item );
416 /***********************************************************************
417 * RtlQueueWorkItem (NTDLL.@)
419 * Queues a work item into a thread in the thread pool.
421 * PARAMS
422 * function [I] Work function to execute.
423 * context [I] Context to pass to the work function when it is executed.
424 * flags [I] Flags. See notes.
426 * RETURNS
427 * Success: STATUS_SUCCESS.
428 * Failure: Any NTSTATUS code.
430 * NOTES
431 * Flags can be one or more of the following:
432 *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
433 *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
434 *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
435 *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
436 *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
438 NTSTATUS WINAPI RtlQueueWorkItem( PRTL_WORK_ITEM_ROUTINE function, PVOID context, ULONG flags )
440 TP_CALLBACK_ENVIRON environment;
441 struct rtl_work_item *item;
442 NTSTATUS status;
444 TRACE( "%p %p %u\n", function, context, flags );
446 item = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*item) );
447 if (!item)
448 return STATUS_NO_MEMORY;
450 memset( &environment, 0, sizeof(environment) );
451 environment.Version = 1;
452 environment.u.s.LongFunction = (flags & WT_EXECUTELONGFUNCTION) != 0;
453 environment.u.s.Persistent = (flags & WT_EXECUTEINPERSISTENTTHREAD) != 0;
455 item->function = function;
456 item->context = context;
458 status = TpSimpleTryPost( process_rtl_work_item, item, &environment );
459 if (status) RtlFreeHeap( GetProcessHeap(), 0, item );
460 return status;
463 /***********************************************************************
464 * iocp_poller - get completion events and run callbacks
466 static DWORD CALLBACK iocp_poller(LPVOID Arg)
468 HANDLE cport = Arg;
470 while( TRUE )
472 PRTL_OVERLAPPED_COMPLETION_ROUTINE callback;
473 LPVOID overlapped;
474 IO_STATUS_BLOCK iosb;
475 NTSTATUS res = NtRemoveIoCompletion( cport, (PULONG_PTR)&callback, (PULONG_PTR)&overlapped, &iosb, NULL );
476 if (res)
478 ERR("NtRemoveIoCompletion failed: 0x%x\n", res);
480 else
482 DWORD transferred = 0;
483 DWORD err = 0;
485 if (iosb.u.Status == STATUS_SUCCESS)
486 transferred = iosb.Information;
487 else
488 err = RtlNtStatusToDosError(iosb.u.Status);
490 callback( err, transferred, overlapped );
493 return 0;
496 /***********************************************************************
497 * RtlSetIoCompletionCallback (NTDLL.@)
499 * Binds a handle to a thread pool's completion port, and possibly
500 * starts a non-I/O thread to monitor this port and call functions back.
502 * PARAMS
503 * FileHandle [I] Handle to bind to a completion port.
504 * Function [I] Callback function to call on I/O completions.
505 * Flags [I] Not used.
507 * RETURNS
508 * Success: STATUS_SUCCESS.
509 * Failure: Any NTSTATUS code.
512 NTSTATUS WINAPI RtlSetIoCompletionCallback(HANDLE FileHandle, PRTL_OVERLAPPED_COMPLETION_ROUTINE Function, ULONG Flags)
514 IO_STATUS_BLOCK iosb;
515 FILE_COMPLETION_INFORMATION info;
517 if (Flags) FIXME("Unknown value Flags=0x%x\n", Flags);
519 if (!old_threadpool.compl_port)
521 NTSTATUS res = STATUS_SUCCESS;
523 RtlEnterCriticalSection(&old_threadpool.threadpool_compl_cs);
524 if (!old_threadpool.compl_port)
526 HANDLE cport;
528 res = NtCreateIoCompletion( &cport, IO_COMPLETION_ALL_ACCESS, NULL, 0 );
529 if (!res)
531 /* FIXME native can start additional threads in case of e.g. hung callback function. */
532 res = RtlQueueWorkItem( iocp_poller, cport, WT_EXECUTEDEFAULT );
533 if (!res)
534 old_threadpool.compl_port = cport;
535 else
536 NtClose( cport );
539 RtlLeaveCriticalSection(&old_threadpool.threadpool_compl_cs);
540 if (res) return res;
543 info.CompletionPort = old_threadpool.compl_port;
544 info.CompletionKey = (ULONG_PTR)Function;
546 return NtSetInformationFile( FileHandle, &iosb, &info, sizeof(info), FileCompletionInformation );
549 static inline PLARGE_INTEGER get_nt_timeout( PLARGE_INTEGER pTime, ULONG timeout )
551 if (timeout == INFINITE) return NULL;
552 pTime->QuadPart = (ULONGLONG)timeout * -10000;
553 return pTime;
556 static void delete_wait_work_item(struct wait_work_item *wait_work_item)
558 NtClose( wait_work_item->CancelEvent );
559 RtlFreeHeap( GetProcessHeap(), 0, wait_work_item );
562 static DWORD CALLBACK wait_thread_proc(LPVOID Arg)
564 struct wait_work_item *wait_work_item = Arg;
565 NTSTATUS status;
566 BOOLEAN alertable = (wait_work_item->Flags & WT_EXECUTEINIOTHREAD) != 0;
567 HANDLE handles[2] = { wait_work_item->Object, wait_work_item->CancelEvent };
568 LARGE_INTEGER timeout;
569 HANDLE completion_event;
571 TRACE("\n");
573 while (TRUE)
575 status = NtWaitForMultipleObjects( 2, handles, TRUE, alertable,
576 get_nt_timeout( &timeout, wait_work_item->Milliseconds ) );
577 if (status == STATUS_WAIT_0 || status == STATUS_TIMEOUT)
579 BOOLEAN TimerOrWaitFired;
581 if (status == STATUS_WAIT_0)
583 TRACE( "object %p signaled, calling callback %p with context %p\n",
584 wait_work_item->Object, wait_work_item->Callback,
585 wait_work_item->Context );
586 TimerOrWaitFired = FALSE;
588 else
590 TRACE( "wait for object %p timed out, calling callback %p with context %p\n",
591 wait_work_item->Object, wait_work_item->Callback,
592 wait_work_item->Context );
593 TimerOrWaitFired = TRUE;
595 InterlockedExchange( &wait_work_item->CallbackInProgress, TRUE );
596 if (wait_work_item->CompletionEvent)
598 TRACE( "Work has been canceled.\n" );
599 break;
601 wait_work_item->Callback( wait_work_item->Context, TimerOrWaitFired );
602 InterlockedExchange( &wait_work_item->CallbackInProgress, FALSE );
604 if (wait_work_item->Flags & WT_EXECUTEONLYONCE)
605 break;
607 else if (status != STATUS_USER_APC)
608 break;
612 if (InterlockedIncrement( &wait_work_item->DeleteCount ) == 2 )
614 completion_event = wait_work_item->CompletionEvent;
615 delete_wait_work_item( wait_work_item );
616 if (completion_event && completion_event != INVALID_HANDLE_VALUE)
617 NtSetEvent( completion_event, NULL );
620 return 0;
623 /***********************************************************************
624 * RtlRegisterWait (NTDLL.@)
626 * Registers a wait for a handle to become signaled.
628 * PARAMS
629 * NewWaitObject [I] Handle to the new wait object. Use RtlDeregisterWait() to free it.
630 * Object [I] Object to wait to become signaled.
631 * Callback [I] Callback function to execute when the wait times out or the handle is signaled.
632 * Context [I] Context to pass to the callback function when it is executed.
633 * Milliseconds [I] Number of milliseconds to wait before timing out.
634 * Flags [I] Flags. See notes.
636 * RETURNS
637 * Success: STATUS_SUCCESS.
638 * Failure: Any NTSTATUS code.
640 * NOTES
641 * Flags can be one or more of the following:
642 *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
643 *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
644 *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
645 *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
646 *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
648 NTSTATUS WINAPI RtlRegisterWait(PHANDLE NewWaitObject, HANDLE Object,
649 RTL_WAITORTIMERCALLBACKFUNC Callback,
650 PVOID Context, ULONG Milliseconds, ULONG Flags)
652 struct wait_work_item *wait_work_item;
653 NTSTATUS status;
655 TRACE( "(%p, %p, %p, %p, %d, 0x%x)\n", NewWaitObject, Object, Callback, Context, Milliseconds, Flags );
657 wait_work_item = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*wait_work_item) );
658 if (!wait_work_item)
659 return STATUS_NO_MEMORY;
661 wait_work_item->Object = Object;
662 wait_work_item->Callback = Callback;
663 wait_work_item->Context = Context;
664 wait_work_item->Milliseconds = Milliseconds;
665 wait_work_item->Flags = Flags;
666 wait_work_item->CallbackInProgress = FALSE;
667 wait_work_item->DeleteCount = 0;
668 wait_work_item->CompletionEvent = NULL;
670 status = NtCreateEvent( &wait_work_item->CancelEvent, EVENT_ALL_ACCESS, NULL, NotificationEvent, FALSE );
671 if (status != STATUS_SUCCESS)
673 RtlFreeHeap( GetProcessHeap(), 0, wait_work_item );
674 return status;
677 Flags = Flags & (WT_EXECUTEINIOTHREAD | WT_EXECUTEINPERSISTENTTHREAD |
678 WT_EXECUTELONGFUNCTION | WT_TRANSFER_IMPERSONATION);
679 status = RtlQueueWorkItem( wait_thread_proc, wait_work_item, Flags );
680 if (status != STATUS_SUCCESS)
682 delete_wait_work_item( wait_work_item );
683 return status;
686 *NewWaitObject = wait_work_item;
687 return status;
690 /***********************************************************************
691 * RtlDeregisterWaitEx (NTDLL.@)
693 * Cancels a wait operation and frees the resources associated with calling
694 * RtlRegisterWait().
696 * PARAMS
697 * WaitObject [I] Handle to the wait object to free.
699 * RETURNS
700 * Success: STATUS_SUCCESS.
701 * Failure: Any NTSTATUS code.
703 NTSTATUS WINAPI RtlDeregisterWaitEx(HANDLE WaitHandle, HANDLE CompletionEvent)
705 struct wait_work_item *wait_work_item = WaitHandle;
706 NTSTATUS status;
707 HANDLE LocalEvent = NULL;
708 int CallbackInProgress;
710 TRACE( "(%p %p)\n", WaitHandle, CompletionEvent );
712 if (WaitHandle == NULL)
713 return STATUS_INVALID_HANDLE;
715 InterlockedExchangePointer( &wait_work_item->CompletionEvent, INVALID_HANDLE_VALUE );
716 CallbackInProgress = wait_work_item->CallbackInProgress;
717 TRACE( "callback in progress %u\n", CallbackInProgress );
718 if (CompletionEvent == INVALID_HANDLE_VALUE || !CallbackInProgress)
720 status = NtCreateEvent( &LocalEvent, EVENT_ALL_ACCESS, NULL, NotificationEvent, FALSE );
721 if (status != STATUS_SUCCESS)
722 return status;
723 InterlockedExchangePointer( &wait_work_item->CompletionEvent, LocalEvent );
725 else if (CompletionEvent != NULL)
727 InterlockedExchangePointer( &wait_work_item->CompletionEvent, CompletionEvent );
730 NtSetEvent( wait_work_item->CancelEvent, NULL );
732 if (InterlockedIncrement( &wait_work_item->DeleteCount ) == 2 )
734 status = STATUS_SUCCESS;
735 delete_wait_work_item( wait_work_item );
737 else if (LocalEvent)
739 TRACE( "Waiting for completion event\n" );
740 NtWaitForSingleObject( LocalEvent, FALSE, NULL );
741 status = STATUS_SUCCESS;
743 else
745 status = STATUS_PENDING;
748 if (LocalEvent)
749 NtClose( LocalEvent );
751 return status;
754 /***********************************************************************
755 * RtlDeregisterWait (NTDLL.@)
757 * Cancels a wait operation and frees the resources associated with calling
758 * RtlRegisterWait().
760 * PARAMS
761 * WaitObject [I] Handle to the wait object to free.
763 * RETURNS
764 * Success: STATUS_SUCCESS.
765 * Failure: Any NTSTATUS code.
767 NTSTATUS WINAPI RtlDeregisterWait(HANDLE WaitHandle)
769 return RtlDeregisterWaitEx(WaitHandle, NULL);
773 /************************** Timer Queue Impl **************************/
775 static void queue_remove_timer(struct queue_timer *t)
777 /* We MUST hold the queue cs while calling this function. This ensures
778 that we cannot queue another callback for this timer. The runcount
779 being zero makes sure we don't have any already queued. */
780 struct timer_queue *q = t->q;
782 assert(t->runcount == 0);
783 assert(t->destroy);
785 list_remove(&t->entry);
786 if (t->event)
787 NtSetEvent(t->event, NULL);
788 RtlFreeHeap(GetProcessHeap(), 0, t);
790 if (q->quit && list_empty(&q->timers))
791 NtSetEvent(q->event, NULL);
794 static void timer_cleanup_callback(struct queue_timer *t)
796 struct timer_queue *q = t->q;
797 RtlEnterCriticalSection(&q->cs);
799 assert(0 < t->runcount);
800 --t->runcount;
802 if (t->destroy && t->runcount == 0)
803 queue_remove_timer(t);
805 RtlLeaveCriticalSection(&q->cs);
808 static DWORD WINAPI timer_callback_wrapper(LPVOID p)
810 struct queue_timer *t = p;
811 t->callback(t->param, TRUE);
812 timer_cleanup_callback(t);
813 return 0;
816 static inline ULONGLONG queue_current_time(void)
818 LARGE_INTEGER now, freq;
819 NtQueryPerformanceCounter(&now, &freq);
820 return now.QuadPart * 1000 / freq.QuadPart;
823 static void queue_add_timer(struct queue_timer *t, ULONGLONG time,
824 BOOL set_event)
826 /* We MUST hold the queue cs while calling this function. */
827 struct timer_queue *q = t->q;
828 struct list *ptr = &q->timers;
830 assert(!q->quit || (t->destroy && time == EXPIRE_NEVER));
832 if (time != EXPIRE_NEVER)
833 LIST_FOR_EACH(ptr, &q->timers)
835 struct queue_timer *cur = LIST_ENTRY(ptr, struct queue_timer, entry);
836 if (time < cur->expire)
837 break;
839 list_add_before(ptr, &t->entry);
841 t->expire = time;
843 /* If we insert at the head of the list, we need to expire sooner
844 than expected. */
845 if (set_event && &t->entry == list_head(&q->timers))
846 NtSetEvent(q->event, NULL);
849 static inline void queue_move_timer(struct queue_timer *t, ULONGLONG time,
850 BOOL set_event)
852 /* We MUST hold the queue cs while calling this function. */
853 list_remove(&t->entry);
854 queue_add_timer(t, time, set_event);
857 static void queue_timer_expire(struct timer_queue *q)
859 struct queue_timer *t = NULL;
861 RtlEnterCriticalSection(&q->cs);
862 if (list_head(&q->timers))
864 ULONGLONG now, next;
865 t = LIST_ENTRY(list_head(&q->timers), struct queue_timer, entry);
866 if (!t->destroy && t->expire <= ((now = queue_current_time())))
868 ++t->runcount;
869 if (t->period)
871 next = t->expire + t->period;
872 /* avoid trigger cascade if overloaded / hibernated */
873 if (next < now)
874 next = now + t->period;
876 else
877 next = EXPIRE_NEVER;
878 queue_move_timer(t, next, FALSE);
880 else
881 t = NULL;
883 RtlLeaveCriticalSection(&q->cs);
885 if (t)
887 if (t->flags & WT_EXECUTEINTIMERTHREAD)
888 timer_callback_wrapper(t);
889 else
891 ULONG flags
892 = (t->flags
893 & (WT_EXECUTEINIOTHREAD | WT_EXECUTEINPERSISTENTTHREAD
894 | WT_EXECUTELONGFUNCTION | WT_TRANSFER_IMPERSONATION));
895 NTSTATUS status = RtlQueueWorkItem(timer_callback_wrapper, t, flags);
896 if (status != STATUS_SUCCESS)
897 timer_cleanup_callback(t);
902 static ULONG queue_get_timeout(struct timer_queue *q)
904 struct queue_timer *t;
905 ULONG timeout = INFINITE;
907 RtlEnterCriticalSection(&q->cs);
908 if (list_head(&q->timers))
910 t = LIST_ENTRY(list_head(&q->timers), struct queue_timer, entry);
911 assert(!t->destroy || t->expire == EXPIRE_NEVER);
913 if (t->expire != EXPIRE_NEVER)
915 ULONGLONG time = queue_current_time();
916 timeout = t->expire < time ? 0 : t->expire - time;
919 RtlLeaveCriticalSection(&q->cs);
921 return timeout;
924 static void WINAPI timer_queue_thread_proc(LPVOID p)
926 struct timer_queue *q = p;
927 ULONG timeout_ms;
929 timeout_ms = INFINITE;
930 for (;;)
932 LARGE_INTEGER timeout;
933 NTSTATUS status;
934 BOOL done = FALSE;
936 status = NtWaitForSingleObject(
937 q->event, FALSE, get_nt_timeout(&timeout, timeout_ms));
939 if (status == STATUS_WAIT_0)
941 /* There are two possible ways to trigger the event. Either
942 we are quitting and the last timer got removed, or a new
943 timer got put at the head of the list so we need to adjust
944 our timeout. */
945 RtlEnterCriticalSection(&q->cs);
946 if (q->quit && list_empty(&q->timers))
947 done = TRUE;
948 RtlLeaveCriticalSection(&q->cs);
950 else if (status == STATUS_TIMEOUT)
951 queue_timer_expire(q);
953 if (done)
954 break;
956 timeout_ms = queue_get_timeout(q);
959 NtClose(q->event);
960 RtlDeleteCriticalSection(&q->cs);
961 q->magic = 0;
962 RtlFreeHeap(GetProcessHeap(), 0, q);
963 RtlExitUserThread( 0 );
966 static void queue_destroy_timer(struct queue_timer *t)
968 /* We MUST hold the queue cs while calling this function. */
969 t->destroy = TRUE;
970 if (t->runcount == 0)
971 /* Ensure a timer is promptly removed. If callbacks are pending,
972 it will be removed after the last one finishes by the callback
973 cleanup wrapper. */
974 queue_remove_timer(t);
975 else
976 /* Make sure no destroyed timer masks an active timer at the head
977 of the sorted list. */
978 queue_move_timer(t, EXPIRE_NEVER, FALSE);
981 /***********************************************************************
982 * RtlCreateTimerQueue (NTDLL.@)
984 * Creates a timer queue object and returns a handle to it.
986 * PARAMS
987 * NewTimerQueue [O] The newly created queue.
989 * RETURNS
990 * Success: STATUS_SUCCESS.
991 * Failure: Any NTSTATUS code.
993 NTSTATUS WINAPI RtlCreateTimerQueue(PHANDLE NewTimerQueue)
995 NTSTATUS status;
996 struct timer_queue *q = RtlAllocateHeap(GetProcessHeap(), 0, sizeof *q);
997 if (!q)
998 return STATUS_NO_MEMORY;
1000 RtlInitializeCriticalSection(&q->cs);
1001 list_init(&q->timers);
1002 q->quit = FALSE;
1003 q->magic = TIMER_QUEUE_MAGIC;
1004 status = NtCreateEvent(&q->event, EVENT_ALL_ACCESS, NULL, SynchronizationEvent, FALSE);
1005 if (status != STATUS_SUCCESS)
1007 RtlFreeHeap(GetProcessHeap(), 0, q);
1008 return status;
1010 status = RtlCreateUserThread(GetCurrentProcess(), NULL, FALSE, NULL, 0, 0,
1011 timer_queue_thread_proc, q, &q->thread, NULL);
1012 if (status != STATUS_SUCCESS)
1014 NtClose(q->event);
1015 RtlFreeHeap(GetProcessHeap(), 0, q);
1016 return status;
1019 *NewTimerQueue = q;
1020 return STATUS_SUCCESS;
1023 /***********************************************************************
1024 * RtlDeleteTimerQueueEx (NTDLL.@)
1026 * Deletes a timer queue object.
1028 * PARAMS
1029 * TimerQueue [I] The timer queue to destroy.
1030 * CompletionEvent [I] If NULL, return immediately. If INVALID_HANDLE_VALUE,
1031 * wait until all timers are finished firing before
1032 * returning. Otherwise, return immediately and set the
1033 * event when all timers are done.
1035 * RETURNS
1036 * Success: STATUS_SUCCESS if synchronous, STATUS_PENDING if not.
1037 * Failure: Any NTSTATUS code.
1039 NTSTATUS WINAPI RtlDeleteTimerQueueEx(HANDLE TimerQueue, HANDLE CompletionEvent)
1041 struct timer_queue *q = TimerQueue;
1042 struct queue_timer *t, *temp;
1043 HANDLE thread;
1044 NTSTATUS status;
1046 if (!q || q->magic != TIMER_QUEUE_MAGIC)
1047 return STATUS_INVALID_HANDLE;
1049 thread = q->thread;
1051 RtlEnterCriticalSection(&q->cs);
1052 q->quit = TRUE;
1053 if (list_head(&q->timers))
1054 /* When the last timer is removed, it will signal the timer thread to
1055 exit... */
1056 LIST_FOR_EACH_ENTRY_SAFE(t, temp, &q->timers, struct queue_timer, entry)
1057 queue_destroy_timer(t);
1058 else
1059 /* However if we have none, we must do it ourselves. */
1060 NtSetEvent(q->event, NULL);
1061 RtlLeaveCriticalSection(&q->cs);
1063 if (CompletionEvent == INVALID_HANDLE_VALUE)
1065 NtWaitForSingleObject(thread, FALSE, NULL);
1066 status = STATUS_SUCCESS;
1068 else
1070 if (CompletionEvent)
1072 FIXME("asynchronous return on completion event unimplemented\n");
1073 NtWaitForSingleObject(thread, FALSE, NULL);
1074 NtSetEvent(CompletionEvent, NULL);
1076 status = STATUS_PENDING;
1079 NtClose(thread);
1080 return status;
1083 static struct timer_queue *get_timer_queue(HANDLE TimerQueue)
1085 static struct timer_queue *default_timer_queue;
1087 if (TimerQueue)
1088 return TimerQueue;
1089 else
1091 if (!default_timer_queue)
1093 HANDLE q;
1094 NTSTATUS status = RtlCreateTimerQueue(&q);
1095 if (status == STATUS_SUCCESS)
1097 PVOID p = InterlockedCompareExchangePointer( (void **) &default_timer_queue, q, NULL );
1098 if (p)
1099 /* Got beat to the punch. */
1100 RtlDeleteTimerQueueEx(q, NULL);
1103 return default_timer_queue;
1107 /***********************************************************************
1108 * RtlCreateTimer (NTDLL.@)
1110 * Creates a new timer associated with the given queue.
1112 * PARAMS
1113 * NewTimer [O] The newly created timer.
1114 * TimerQueue [I] The queue to hold the timer.
1115 * Callback [I] The callback to fire.
1116 * Parameter [I] The argument for the callback.
1117 * DueTime [I] The delay, in milliseconds, before first firing the
1118 * timer.
1119 * Period [I] The period, in milliseconds, at which to fire the timer
1120 * after the first callback. If zero, the timer will only
1121 * fire once. It still needs to be deleted with
1122 * RtlDeleteTimer.
1123 * Flags [I] Flags controlling the execution of the callback. In
1124 * addition to the WT_* thread pool flags (see
1125 * RtlQueueWorkItem), WT_EXECUTEINTIMERTHREAD and
1126 * WT_EXECUTEONLYONCE are supported.
1128 * RETURNS
1129 * Success: STATUS_SUCCESS.
1130 * Failure: Any NTSTATUS code.
1132 NTSTATUS WINAPI RtlCreateTimer(PHANDLE NewTimer, HANDLE TimerQueue,
1133 RTL_WAITORTIMERCALLBACKFUNC Callback,
1134 PVOID Parameter, DWORD DueTime, DWORD Period,
1135 ULONG Flags)
1137 NTSTATUS status;
1138 struct queue_timer *t;
1139 struct timer_queue *q = get_timer_queue(TimerQueue);
1141 if (!q) return STATUS_NO_MEMORY;
1142 if (q->magic != TIMER_QUEUE_MAGIC) return STATUS_INVALID_HANDLE;
1144 t = RtlAllocateHeap(GetProcessHeap(), 0, sizeof *t);
1145 if (!t)
1146 return STATUS_NO_MEMORY;
1148 t->q = q;
1149 t->runcount = 0;
1150 t->callback = Callback;
1151 t->param = Parameter;
1152 t->period = Period;
1153 t->flags = Flags;
1154 t->destroy = FALSE;
1155 t->event = NULL;
1157 status = STATUS_SUCCESS;
1158 RtlEnterCriticalSection(&q->cs);
1159 if (q->quit)
1160 status = STATUS_INVALID_HANDLE;
1161 else
1162 queue_add_timer(t, queue_current_time() + DueTime, TRUE);
1163 RtlLeaveCriticalSection(&q->cs);
1165 if (status == STATUS_SUCCESS)
1166 *NewTimer = t;
1167 else
1168 RtlFreeHeap(GetProcessHeap(), 0, t);
1170 return status;
1173 /***********************************************************************
1174 * RtlUpdateTimer (NTDLL.@)
1176 * Changes the time at which a timer expires.
1178 * PARAMS
1179 * TimerQueue [I] The queue that holds the timer.
1180 * Timer [I] The timer to update.
1181 * DueTime [I] The delay, in milliseconds, before next firing the timer.
1182 * Period [I] The period, in milliseconds, at which to fire the timer
1183 * after the first callback. If zero, the timer will not
1184 * refire once. It still needs to be deleted with
1185 * RtlDeleteTimer.
1187 * RETURNS
1188 * Success: STATUS_SUCCESS.
1189 * Failure: Any NTSTATUS code.
1191 NTSTATUS WINAPI RtlUpdateTimer(HANDLE TimerQueue, HANDLE Timer,
1192 DWORD DueTime, DWORD Period)
1194 struct queue_timer *t = Timer;
1195 struct timer_queue *q = t->q;
1197 RtlEnterCriticalSection(&q->cs);
1198 /* Can't change a timer if it was once-only or destroyed. */
1199 if (t->expire != EXPIRE_NEVER)
1201 t->period = Period;
1202 queue_move_timer(t, queue_current_time() + DueTime, TRUE);
1204 RtlLeaveCriticalSection(&q->cs);
1206 return STATUS_SUCCESS;
1209 /***********************************************************************
1210 * RtlDeleteTimer (NTDLL.@)
1212 * Cancels a timer-queue timer.
1214 * PARAMS
1215 * TimerQueue [I] The queue that holds the timer.
1216 * Timer [I] The timer to update.
1217 * CompletionEvent [I] If NULL, return immediately. If INVALID_HANDLE_VALUE,
1218 * wait until the timer is finished firing all pending
1219 * callbacks before returning. Otherwise, return
1220 * immediately and set the timer is done.
1222 * RETURNS
1223 * Success: STATUS_SUCCESS if the timer is done, STATUS_PENDING if not,
1224 or if the completion event is NULL.
1225 * Failure: Any NTSTATUS code.
1227 NTSTATUS WINAPI RtlDeleteTimer(HANDLE TimerQueue, HANDLE Timer,
1228 HANDLE CompletionEvent)
1230 struct queue_timer *t = Timer;
1231 struct timer_queue *q;
1232 NTSTATUS status = STATUS_PENDING;
1233 HANDLE event = NULL;
1235 if (!Timer)
1236 return STATUS_INVALID_PARAMETER_1;
1237 q = t->q;
1238 if (CompletionEvent == INVALID_HANDLE_VALUE)
1240 status = NtCreateEvent(&event, EVENT_ALL_ACCESS, NULL, SynchronizationEvent, FALSE);
1241 if (status == STATUS_SUCCESS)
1242 status = STATUS_PENDING;
1244 else if (CompletionEvent)
1245 event = CompletionEvent;
1247 RtlEnterCriticalSection(&q->cs);
1248 t->event = event;
1249 if (t->runcount == 0 && event)
1250 status = STATUS_SUCCESS;
1251 queue_destroy_timer(t);
1252 RtlLeaveCriticalSection(&q->cs);
1254 if (CompletionEvent == INVALID_HANDLE_VALUE && event)
1256 if (status == STATUS_PENDING)
1258 NtWaitForSingleObject(event, FALSE, NULL);
1259 status = STATUS_SUCCESS;
1261 NtClose(event);
1264 return status;
1267 /***********************************************************************
1268 * timerqueue_thread_proc (internal)
1270 static void CALLBACK timerqueue_thread_proc( void *param )
1272 ULONGLONG timeout_lower, timeout_upper, new_timeout;
1273 struct threadpool_object *other_timer;
1274 LARGE_INTEGER now, timeout;
1275 struct list *ptr;
1277 TRACE( "starting timer queue thread\n" );
1279 RtlEnterCriticalSection( &timerqueue.cs );
1280 for (;;)
1282 NtQuerySystemTime( &now );
1284 /* Check for expired timers. */
1285 while ((ptr = list_head( &timerqueue.pending_timers )))
1287 struct threadpool_object *timer = LIST_ENTRY( ptr, struct threadpool_object, u.timer.timer_entry );
1288 assert( timer->type == TP_OBJECT_TYPE_TIMER );
1289 assert( timer->u.timer.timer_pending );
1290 if (timer->u.timer.timeout > now.QuadPart)
1291 break;
1293 /* Queue a new callback in one of the worker threads. */
1294 list_remove( &timer->u.timer.timer_entry );
1295 timer->u.timer.timer_pending = FALSE;
1296 tp_object_submit( timer, FALSE );
1298 /* Insert the timer back into the queue, except it's marked for shutdown. */
1299 if (timer->u.timer.period && !timer->shutdown)
1301 timer->u.timer.timeout += (ULONGLONG)timer->u.timer.period * 10000;
1302 if (timer->u.timer.timeout <= now.QuadPart)
1303 timer->u.timer.timeout = now.QuadPart + 1;
1305 LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
1306 struct threadpool_object, u.timer.timer_entry )
1308 assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
1309 if (timer->u.timer.timeout < other_timer->u.timer.timeout)
1310 break;
1312 list_add_before( &other_timer->u.timer.timer_entry, &timer->u.timer.timer_entry );
1313 timer->u.timer.timer_pending = TRUE;
1317 timeout_lower = TIMEOUT_INFINITE;
1318 timeout_upper = TIMEOUT_INFINITE;
1320 /* Determine next timeout and use the window length to optimize wakeup times. */
1321 LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
1322 struct threadpool_object, u.timer.timer_entry )
1324 assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
1325 if (other_timer->u.timer.timeout >= timeout_upper)
1326 break;
1328 timeout_lower = other_timer->u.timer.timeout;
1329 new_timeout = timeout_lower + (ULONGLONG)other_timer->u.timer.window_length * 10000;
1330 if (new_timeout < timeout_upper)
1331 timeout_upper = new_timeout;
1334 /* Wait for timer update events or until the next timer expires. */
1335 if (timerqueue.objcount)
1337 timeout.QuadPart = timeout_lower;
1338 RtlSleepConditionVariableCS( &timerqueue.update_event, &timerqueue.cs, &timeout );
1339 continue;
1342 /* All timers have been destroyed, if no new timers are created
1343 * within some amount of time, then we can shutdown this thread. */
1344 timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
1345 if (RtlSleepConditionVariableCS( &timerqueue.update_event, &timerqueue.cs,
1346 &timeout ) == STATUS_TIMEOUT && !timerqueue.objcount)
1348 break;
1352 timerqueue.thread_running = FALSE;
1353 RtlLeaveCriticalSection( &timerqueue.cs );
1355 TRACE( "terminating timer queue thread\n" );
1356 RtlExitUserThread( 0 );
1359 /***********************************************************************
1360 * tp_new_worker_thread (internal)
1362 * Create and account a new worker thread for the desired pool.
1364 static NTSTATUS tp_new_worker_thread( struct threadpool *pool )
1366 HANDLE thread;
1367 NTSTATUS status;
1369 status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, NULL, 0, 0,
1370 threadpool_worker_proc, pool, &thread, NULL );
1371 if (status == STATUS_SUCCESS)
1373 InterlockedIncrement( &pool->refcount );
1374 pool->num_workers++;
1375 NtClose( thread );
1377 return status;
1380 /***********************************************************************
1381 * tp_timerqueue_lock (internal)
1383 * Acquires a lock on the global timerqueue. When the lock is acquired
1384 * successfully, it is guaranteed that the timer thread is running.
1386 static NTSTATUS tp_timerqueue_lock( struct threadpool_object *timer )
1388 NTSTATUS status = STATUS_SUCCESS;
1389 assert( timer->type == TP_OBJECT_TYPE_TIMER );
1391 timer->u.timer.timer_initialized = FALSE;
1392 timer->u.timer.timer_pending = FALSE;
1393 timer->u.timer.timer_set = FALSE;
1394 timer->u.timer.timeout = 0;
1395 timer->u.timer.period = 0;
1396 timer->u.timer.window_length = 0;
1398 RtlEnterCriticalSection( &timerqueue.cs );
1400 /* Make sure that the timerqueue thread is running. */
1401 if (!timerqueue.thread_running)
1403 HANDLE thread;
1404 status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, NULL, 0, 0,
1405 timerqueue_thread_proc, NULL, &thread, NULL );
1406 if (status == STATUS_SUCCESS)
1408 timerqueue.thread_running = TRUE;
1409 NtClose( thread );
1413 if (status == STATUS_SUCCESS)
1415 timer->u.timer.timer_initialized = TRUE;
1416 timerqueue.objcount++;
1419 RtlLeaveCriticalSection( &timerqueue.cs );
1420 return status;
1423 /***********************************************************************
1424 * tp_timerqueue_unlock (internal)
1426 * Releases a lock on the global timerqueue.
1428 static void tp_timerqueue_unlock( struct threadpool_object *timer )
1430 assert( timer->type == TP_OBJECT_TYPE_TIMER );
1432 RtlEnterCriticalSection( &timerqueue.cs );
1433 if (timer->u.timer.timer_initialized)
1435 /* If timer was pending, remove it. */
1436 if (timer->u.timer.timer_pending)
1438 list_remove( &timer->u.timer.timer_entry );
1439 timer->u.timer.timer_pending = FALSE;
1442 /* If the last timer object was destroyed, then wake up the thread. */
1443 if (!--timerqueue.objcount)
1445 assert( list_empty( &timerqueue.pending_timers ) );
1446 RtlWakeAllConditionVariable( &timerqueue.update_event );
1449 timer->u.timer.timer_initialized = FALSE;
1451 RtlLeaveCriticalSection( &timerqueue.cs );
1454 /***********************************************************************
1455 * waitqueue_thread_proc (internal)
1457 static void CALLBACK waitqueue_thread_proc( void *param )
1459 struct threadpool_object *objects[MAXIMUM_WAITQUEUE_OBJECTS];
1460 HANDLE handles[MAXIMUM_WAITQUEUE_OBJECTS + 1];
1461 struct waitqueue_bucket *bucket = param;
1462 struct threadpool_object *wait, *next;
1463 LARGE_INTEGER now, timeout;
1464 DWORD num_handles;
1465 NTSTATUS status;
1467 TRACE( "starting wait queue thread\n" );
1469 RtlEnterCriticalSection( &waitqueue.cs );
1471 for (;;)
1473 NtQuerySystemTime( &now );
1474 timeout.QuadPart = TIMEOUT_INFINITE;
1475 num_handles = 0;
1477 LIST_FOR_EACH_ENTRY_SAFE( wait, next, &bucket->waiting, struct threadpool_object,
1478 u.wait.wait_entry )
1480 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1481 if (wait->u.wait.timeout <= now.QuadPart)
1483 /* Wait object timed out. */
1484 list_remove( &wait->u.wait.wait_entry );
1485 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1486 tp_object_submit( wait, FALSE );
1488 else
1490 if (wait->u.wait.timeout < timeout.QuadPart)
1491 timeout.QuadPart = wait->u.wait.timeout;
1493 assert( num_handles < MAXIMUM_WAITQUEUE_OBJECTS );
1494 InterlockedIncrement( &wait->refcount );
1495 objects[num_handles] = wait;
1496 handles[num_handles] = wait->u.wait.handle;
1497 num_handles++;
1501 if (!bucket->objcount)
1503 /* All wait objects have been destroyed, if no new wait objects are created
1504 * within some amount of time, then we can shutdown this thread. */
1505 assert( num_handles == 0 );
1506 RtlLeaveCriticalSection( &waitqueue.cs );
1507 timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
1508 status = NtWaitForMultipleObjects( 1, &bucket->update_event, TRUE, FALSE, &timeout );
1509 RtlEnterCriticalSection( &waitqueue.cs );
1511 if (status == STATUS_TIMEOUT && !bucket->objcount)
1512 break;
1514 else
1516 handles[num_handles] = bucket->update_event;
1517 RtlLeaveCriticalSection( &waitqueue.cs );
1518 status = NtWaitForMultipleObjects( num_handles + 1, handles, TRUE, FALSE, &timeout );
1519 RtlEnterCriticalSection( &waitqueue.cs );
1521 if (status >= STATUS_WAIT_0 && status < STATUS_WAIT_0 + num_handles)
1523 wait = objects[status - STATUS_WAIT_0];
1524 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1525 if (wait->u.wait.bucket)
1527 /* Wait object signaled. */
1528 assert( wait->u.wait.bucket == bucket );
1529 list_remove( &wait->u.wait.wait_entry );
1530 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1531 tp_object_submit( wait, TRUE );
1533 else
1534 WARN("wait object %p triggered while object was destroyed\n", wait);
1537 /* Release temporary references to wait objects. */
1538 while (num_handles)
1540 wait = objects[--num_handles];
1541 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1542 tp_object_release( wait );
1546 /* Try to merge bucket with other threads. */
1547 if (waitqueue.num_buckets > 1 && bucket->objcount &&
1548 bucket->objcount <= MAXIMUM_WAITQUEUE_OBJECTS * 1 / 3)
1550 struct waitqueue_bucket *other_bucket;
1551 LIST_FOR_EACH_ENTRY( other_bucket, &waitqueue.buckets, struct waitqueue_bucket, bucket_entry )
1553 if (other_bucket != bucket && other_bucket->objcount &&
1554 other_bucket->objcount + bucket->objcount <= MAXIMUM_WAITQUEUE_OBJECTS * 2 / 3)
1556 other_bucket->objcount += bucket->objcount;
1557 bucket->objcount = 0;
1559 /* Update reserved list. */
1560 LIST_FOR_EACH_ENTRY( wait, &bucket->reserved, struct threadpool_object, u.wait.wait_entry )
1562 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1563 wait->u.wait.bucket = other_bucket;
1565 list_move_tail( &other_bucket->reserved, &bucket->reserved );
1567 /* Update waiting list. */
1568 LIST_FOR_EACH_ENTRY( wait, &bucket->waiting, struct threadpool_object, u.wait.wait_entry )
1570 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1571 wait->u.wait.bucket = other_bucket;
1573 list_move_tail( &other_bucket->waiting, &bucket->waiting );
1575 /* Move bucket to the end, to keep the probability of
1576 * newly added wait objects as small as possible. */
1577 list_remove( &bucket->bucket_entry );
1578 list_add_tail( &waitqueue.buckets, &bucket->bucket_entry );
1580 NtSetEvent( other_bucket->update_event, NULL );
1581 break;
1587 /* Remove this bucket from the list. */
1588 list_remove( &bucket->bucket_entry );
1589 if (!--waitqueue.num_buckets)
1590 assert( list_empty( &waitqueue.buckets ) );
1592 RtlLeaveCriticalSection( &waitqueue.cs );
1594 TRACE( "terminating wait queue thread\n" );
1596 assert( bucket->objcount == 0 );
1597 assert( list_empty( &bucket->reserved ) );
1598 assert( list_empty( &bucket->waiting ) );
1599 NtClose( bucket->update_event );
1601 RtlFreeHeap( GetProcessHeap(), 0, bucket );
1602 RtlExitUserThread( 0 );
1605 /***********************************************************************
1606 * tp_waitqueue_lock (internal)
1608 static NTSTATUS tp_waitqueue_lock( struct threadpool_object *wait )
1610 struct waitqueue_bucket *bucket;
1611 NTSTATUS status;
1612 HANDLE thread;
1613 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1615 wait->u.wait.signaled = 0;
1616 wait->u.wait.bucket = NULL;
1617 wait->u.wait.wait_pending = FALSE;
1618 wait->u.wait.timeout = 0;
1619 wait->u.wait.handle = INVALID_HANDLE_VALUE;
1621 RtlEnterCriticalSection( &waitqueue.cs );
1623 /* Try to assign to existing bucket if possible. */
1624 LIST_FOR_EACH_ENTRY( bucket, &waitqueue.buckets, struct waitqueue_bucket, bucket_entry )
1626 if (bucket->objcount < MAXIMUM_WAITQUEUE_OBJECTS)
1628 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1629 wait->u.wait.bucket = bucket;
1630 bucket->objcount++;
1632 status = STATUS_SUCCESS;
1633 goto out;
1637 /* Create a new bucket and corresponding worker thread. */
1638 bucket = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*bucket) );
1639 if (!bucket)
1641 status = STATUS_NO_MEMORY;
1642 goto out;
1645 bucket->objcount = 0;
1646 list_init( &bucket->reserved );
1647 list_init( &bucket->waiting );
1649 status = NtCreateEvent( &bucket->update_event, EVENT_ALL_ACCESS,
1650 NULL, SynchronizationEvent, FALSE );
1651 if (status)
1653 RtlFreeHeap( GetProcessHeap(), 0, bucket );
1654 goto out;
1657 status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, NULL, 0, 0,
1658 waitqueue_thread_proc, bucket, &thread, NULL );
1659 if (status == STATUS_SUCCESS)
1661 list_add_tail( &waitqueue.buckets, &bucket->bucket_entry );
1662 waitqueue.num_buckets++;
1664 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1665 wait->u.wait.bucket = bucket;
1666 bucket->objcount++;
1668 NtClose( thread );
1670 else
1672 NtClose( bucket->update_event );
1673 RtlFreeHeap( GetProcessHeap(), 0, bucket );
1676 out:
1677 RtlLeaveCriticalSection( &waitqueue.cs );
1678 return status;
1681 /***********************************************************************
1682 * tp_waitqueue_unlock (internal)
1684 static void tp_waitqueue_unlock( struct threadpool_object *wait )
1686 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1688 RtlEnterCriticalSection( &waitqueue.cs );
1689 if (wait->u.wait.bucket)
1691 struct waitqueue_bucket *bucket = wait->u.wait.bucket;
1692 assert( bucket->objcount > 0 );
1694 list_remove( &wait->u.wait.wait_entry );
1695 wait->u.wait.bucket = NULL;
1696 bucket->objcount--;
1698 NtSetEvent( bucket->update_event, NULL );
1700 RtlLeaveCriticalSection( &waitqueue.cs );
1703 static void CALLBACK ioqueue_thread_proc( void *param )
1705 struct io_completion *completion;
1706 struct threadpool_object *io;
1707 IO_STATUS_BLOCK iosb;
1708 ULONG_PTR key, value;
1709 NTSTATUS status;
1711 TRACE( "starting I/O completion thread\n" );
1713 RtlEnterCriticalSection( &ioqueue.cs );
1715 for (;;)
1717 RtlLeaveCriticalSection( &ioqueue.cs );
1718 if ((status = NtRemoveIoCompletion( ioqueue.port, &key, &value, &iosb, NULL )))
1719 ERR("NtRemoveIoCompletion failed, status %#x.\n", status);
1720 RtlEnterCriticalSection( &ioqueue.cs );
1722 if (key)
1724 io = (struct threadpool_object *)key;
1726 RtlEnterCriticalSection( &io->pool->cs );
1728 if (!array_reserve((void **)&io->u.io.completions, &io->u.io.completion_max,
1729 io->u.io.completion_count + 1, sizeof(*io->u.io.completions)))
1731 ERR("Failed to allocate memory.\n");
1732 RtlLeaveCriticalSection( &io->pool->cs );
1733 continue;
1736 completion = &io->u.io.completions[io->u.io.completion_count++];
1737 completion->iosb = iosb;
1738 completion->cvalue = value;
1740 tp_object_submit( io, FALSE );
1742 RtlLeaveCriticalSection( &io->pool->cs );
1745 if (!ioqueue.objcount)
1747 /* All I/O objects have been destroyed; if no new objects are
1748 * created within some amount of time, then we can shutdown this
1749 * thread. */
1750 LARGE_INTEGER timeout = {.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000};
1751 if (RtlSleepConditionVariableCS( &ioqueue.update_event, &ioqueue.cs,
1752 &timeout) == STATUS_TIMEOUT && !ioqueue.objcount)
1753 break;
1757 RtlLeaveCriticalSection( &ioqueue.cs );
1759 TRACE( "terminating I/O completion thread\n" );
1761 RtlExitUserThread( 0 );
1764 static NTSTATUS tp_ioqueue_lock( struct threadpool_object *io, HANDLE file )
1766 NTSTATUS status = STATUS_SUCCESS;
1768 assert( io->type == TP_OBJECT_TYPE_IO );
1770 RtlEnterCriticalSection( &ioqueue.cs );
1772 if (!ioqueue.port && (status = NtCreateIoCompletion( &ioqueue.port,
1773 IO_COMPLETION_ALL_ACCESS, NULL, 0 )))
1775 RtlLeaveCriticalSection( &ioqueue.cs );
1776 return status;
1779 if (!ioqueue.thread_running)
1781 HANDLE thread;
1783 if (!(status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE,
1784 NULL, 0, 0, ioqueue_thread_proc, NULL, &thread, NULL )))
1786 ioqueue.thread_running = TRUE;
1787 NtClose( thread );
1791 if (status == STATUS_SUCCESS)
1793 FILE_COMPLETION_INFORMATION info;
1794 IO_STATUS_BLOCK iosb;
1796 info.CompletionPort = ioqueue.port;
1797 info.CompletionKey = (ULONG_PTR)io;
1799 status = NtSetInformationFile( file, &iosb, &info, sizeof(info), FileCompletionInformation );
1802 if (status == STATUS_SUCCESS)
1804 if (!ioqueue.objcount++)
1805 RtlWakeConditionVariable( &ioqueue.update_event );
1808 RtlLeaveCriticalSection( &ioqueue.cs );
1809 return status;
1812 static void tp_ioqueue_unlock( struct threadpool_object *io )
1814 assert( io->type == TP_OBJECT_TYPE_IO );
1816 RtlEnterCriticalSection( &ioqueue.cs );
1818 if (!--ioqueue.objcount)
1819 NtSetIoCompletion( ioqueue.port, 0, 0, STATUS_SUCCESS, 0 );
1821 RtlLeaveCriticalSection( &ioqueue.cs );
1824 /***********************************************************************
1825 * tp_threadpool_alloc (internal)
1827 * Allocates a new threadpool object.
1829 static NTSTATUS tp_threadpool_alloc( struct threadpool **out )
1831 IMAGE_NT_HEADERS *nt = RtlImageNtHeader( NtCurrentTeb()->Peb->ImageBaseAddress );
1832 struct threadpool *pool;
1833 unsigned int i;
1835 pool = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*pool) );
1836 if (!pool)
1837 return STATUS_NO_MEMORY;
1839 pool->refcount = 1;
1840 pool->objcount = 0;
1841 pool->shutdown = FALSE;
1843 RtlInitializeCriticalSection( &pool->cs );
1844 pool->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": threadpool.cs");
1846 for (i = 0; i < ARRAY_SIZE(pool->pools); ++i)
1847 list_init( &pool->pools[i] );
1848 RtlInitializeConditionVariable( &pool->update_event );
1850 pool->max_workers = 500;
1851 pool->min_workers = 0;
1852 pool->num_workers = 0;
1853 pool->num_busy_workers = 0;
1854 pool->stack_info.StackReserve = nt->OptionalHeader.SizeOfStackReserve;
1855 pool->stack_info.StackCommit = nt->OptionalHeader.SizeOfStackCommit;
1857 TRACE( "allocated threadpool %p\n", pool );
1859 *out = pool;
1860 return STATUS_SUCCESS;
1863 /***********************************************************************
1864 * tp_threadpool_shutdown (internal)
1866 * Prepares the shutdown of a threadpool object and notifies all worker
1867 * threads to terminate (after all remaining work items have been
1868 * processed).
1870 static void tp_threadpool_shutdown( struct threadpool *pool )
1872 assert( pool != default_threadpool );
1874 pool->shutdown = TRUE;
1875 RtlWakeAllConditionVariable( &pool->update_event );
1878 /***********************************************************************
1879 * tp_threadpool_release (internal)
1881 * Releases a reference to a threadpool object.
1883 static BOOL tp_threadpool_release( struct threadpool *pool )
1885 unsigned int i;
1887 if (InterlockedDecrement( &pool->refcount ))
1888 return FALSE;
1890 TRACE( "destroying threadpool %p\n", pool );
1892 assert( pool->shutdown );
1893 assert( !pool->objcount );
1894 for (i = 0; i < ARRAY_SIZE(pool->pools); ++i)
1895 assert( list_empty( &pool->pools[i] ) );
1897 pool->cs.DebugInfo->Spare[0] = 0;
1898 RtlDeleteCriticalSection( &pool->cs );
1900 RtlFreeHeap( GetProcessHeap(), 0, pool );
1901 return TRUE;
1904 /***********************************************************************
1905 * tp_threadpool_lock (internal)
1907 * Acquires a lock on a threadpool, specified with an TP_CALLBACK_ENVIRON
1908 * block. When the lock is acquired successfully, it is guaranteed that
1909 * there is at least one worker thread to process tasks.
1911 static NTSTATUS tp_threadpool_lock( struct threadpool **out, TP_CALLBACK_ENVIRON *environment )
1913 struct threadpool *pool = NULL;
1914 NTSTATUS status = STATUS_SUCCESS;
1916 if (environment)
1918 /* Validate environment parameters. */
1919 if (environment->Version == 3)
1921 TP_CALLBACK_ENVIRON_V3 *environment3 = (TP_CALLBACK_ENVIRON_V3 *)environment;
1923 switch (environment3->CallbackPriority)
1925 case TP_CALLBACK_PRIORITY_HIGH:
1926 case TP_CALLBACK_PRIORITY_NORMAL:
1927 case TP_CALLBACK_PRIORITY_LOW:
1928 break;
1929 default:
1930 return STATUS_INVALID_PARAMETER;
1934 pool = (struct threadpool *)environment->Pool;
1937 if (!pool)
1939 if (!default_threadpool)
1941 status = tp_threadpool_alloc( &pool );
1942 if (status != STATUS_SUCCESS)
1943 return status;
1945 if (InterlockedCompareExchangePointer( (void *)&default_threadpool, pool, NULL ) != NULL)
1947 tp_threadpool_shutdown( pool );
1948 tp_threadpool_release( pool );
1952 pool = default_threadpool;
1955 RtlEnterCriticalSection( &pool->cs );
1957 /* Make sure that the threadpool has at least one thread. */
1958 if (!pool->num_workers)
1959 status = tp_new_worker_thread( pool );
1961 /* Keep a reference, and increment objcount to ensure that the
1962 * last thread doesn't terminate. */
1963 if (status == STATUS_SUCCESS)
1965 InterlockedIncrement( &pool->refcount );
1966 pool->objcount++;
1969 RtlLeaveCriticalSection( &pool->cs );
1971 if (status != STATUS_SUCCESS)
1972 return status;
1974 *out = pool;
1975 return STATUS_SUCCESS;
1978 /***********************************************************************
1979 * tp_threadpool_unlock (internal)
1981 * Releases a lock on a threadpool.
1983 static void tp_threadpool_unlock( struct threadpool *pool )
1985 RtlEnterCriticalSection( &pool->cs );
1986 pool->objcount--;
1987 RtlLeaveCriticalSection( &pool->cs );
1988 tp_threadpool_release( pool );
1991 /***********************************************************************
1992 * tp_group_alloc (internal)
1994 * Allocates a new threadpool group object.
1996 static NTSTATUS tp_group_alloc( struct threadpool_group **out )
1998 struct threadpool_group *group;
2000 group = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*group) );
2001 if (!group)
2002 return STATUS_NO_MEMORY;
2004 group->refcount = 1;
2005 group->shutdown = FALSE;
2007 RtlInitializeCriticalSection( &group->cs );
2008 group->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": threadpool_group.cs");
2010 list_init( &group->members );
2012 TRACE( "allocated group %p\n", group );
2014 *out = group;
2015 return STATUS_SUCCESS;
2018 /***********************************************************************
2019 * tp_group_shutdown (internal)
2021 * Marks the group object for shutdown.
2023 static void tp_group_shutdown( struct threadpool_group *group )
2025 group->shutdown = TRUE;
2028 /***********************************************************************
2029 * tp_group_release (internal)
2031 * Releases a reference to a group object.
2033 static BOOL tp_group_release( struct threadpool_group *group )
2035 if (InterlockedDecrement( &group->refcount ))
2036 return FALSE;
2038 TRACE( "destroying group %p\n", group );
2040 assert( group->shutdown );
2041 assert( list_empty( &group->members ) );
2043 group->cs.DebugInfo->Spare[0] = 0;
2044 RtlDeleteCriticalSection( &group->cs );
2046 RtlFreeHeap( GetProcessHeap(), 0, group );
2047 return TRUE;
2050 /***********************************************************************
2051 * tp_object_initialize (internal)
2053 * Initializes members of a threadpool object.
2055 static void tp_object_initialize( struct threadpool_object *object, struct threadpool *pool,
2056 PVOID userdata, TP_CALLBACK_ENVIRON *environment )
2058 BOOL is_simple_callback = (object->type == TP_OBJECT_TYPE_SIMPLE);
2060 object->refcount = 1;
2061 object->shutdown = FALSE;
2063 object->pool = pool;
2064 object->group = NULL;
2065 object->userdata = userdata;
2066 object->group_cancel_callback = NULL;
2067 object->finalization_callback = NULL;
2068 object->may_run_long = 0;
2069 object->race_dll = NULL;
2070 object->priority = TP_CALLBACK_PRIORITY_NORMAL;
2072 memset( &object->group_entry, 0, sizeof(object->group_entry) );
2073 object->is_group_member = FALSE;
2075 memset( &object->pool_entry, 0, sizeof(object->pool_entry) );
2076 RtlInitializeConditionVariable( &object->finished_event );
2077 RtlInitializeConditionVariable( &object->group_finished_event );
2078 object->num_pending_callbacks = 0;
2079 object->num_running_callbacks = 0;
2080 object->num_associated_callbacks = 0;
2082 if (environment)
2084 if (environment->Version != 1 && environment->Version != 3)
2085 FIXME( "unsupported environment version %u\n", environment->Version );
2087 object->group = impl_from_TP_CLEANUP_GROUP( environment->CleanupGroup );
2088 object->group_cancel_callback = environment->CleanupGroupCancelCallback;
2089 object->finalization_callback = environment->FinalizationCallback;
2090 object->may_run_long = environment->u.s.LongFunction != 0;
2091 object->race_dll = environment->RaceDll;
2092 if (environment->Version == 3)
2094 TP_CALLBACK_ENVIRON_V3 *environment_v3 = (TP_CALLBACK_ENVIRON_V3 *)environment;
2096 object->priority = environment_v3->CallbackPriority;
2097 assert( object->priority < ARRAY_SIZE(pool->pools) );
2100 if (environment->ActivationContext)
2101 FIXME( "activation context not supported yet\n" );
2103 if (environment->u.s.Persistent)
2104 FIXME( "persistent threads not supported yet\n" );
2107 if (object->race_dll)
2108 LdrAddRefDll( 0, object->race_dll );
2110 TRACE( "allocated object %p of type %u\n", object, object->type );
2112 /* For simple callbacks we have to run tp_object_submit before adding this object
2113 * to the cleanup group. As soon as the cleanup group members are released ->shutdown
2114 * will be set, and tp_object_submit would fail with an assertion. */
2116 if (is_simple_callback)
2117 tp_object_submit( object, FALSE );
2119 if (object->group)
2121 struct threadpool_group *group = object->group;
2122 InterlockedIncrement( &group->refcount );
2124 RtlEnterCriticalSection( &group->cs );
2125 list_add_tail( &group->members, &object->group_entry );
2126 object->is_group_member = TRUE;
2127 RtlLeaveCriticalSection( &group->cs );
2130 if (is_simple_callback)
2131 tp_object_release( object );
2134 static void tp_object_prio_queue( struct threadpool_object *object )
2136 ++object->pool->num_busy_workers;
2137 list_add_tail( &object->pool->pools[object->priority], &object->pool_entry );
2140 /***********************************************************************
2141 * tp_object_submit (internal)
2143 * Submits a threadpool object to the associated threadpool. This
2144 * function has to be VOID because TpPostWork can never fail on Windows.
2146 static void tp_object_submit( struct threadpool_object *object, BOOL signaled )
2148 struct threadpool *pool = object->pool;
2149 NTSTATUS status = STATUS_UNSUCCESSFUL;
2151 assert( !object->shutdown );
2152 assert( !pool->shutdown );
2154 RtlEnterCriticalSection( &pool->cs );
2156 /* Start new worker threads if required. */
2157 if (pool->num_busy_workers >= pool->num_workers &&
2158 pool->num_workers < pool->max_workers)
2159 status = tp_new_worker_thread( pool );
2161 /* Queue work item and increment refcount. */
2162 InterlockedIncrement( &object->refcount );
2163 if (!object->num_pending_callbacks++)
2164 tp_object_prio_queue( object );
2166 /* Count how often the object was signaled. */
2167 if (object->type == TP_OBJECT_TYPE_WAIT && signaled)
2168 object->u.wait.signaled++;
2170 /* No new thread started - wake up one existing thread. */
2171 if (status != STATUS_SUCCESS)
2173 assert( pool->num_workers > 0 );
2174 RtlWakeConditionVariable( &pool->update_event );
2177 RtlLeaveCriticalSection( &pool->cs );
2180 /***********************************************************************
2181 * tp_object_cancel (internal)
2183 * Cancels all currently pending callbacks for a specific object.
2185 static void tp_object_cancel( struct threadpool_object *object )
2187 struct threadpool *pool = object->pool;
2188 LONG pending_callbacks = 0;
2190 RtlEnterCriticalSection( &pool->cs );
2191 if (object->num_pending_callbacks)
2193 pending_callbacks = object->num_pending_callbacks;
2194 object->num_pending_callbacks = 0;
2195 list_remove( &object->pool_entry );
2197 if (object->type == TP_OBJECT_TYPE_WAIT)
2198 object->u.wait.signaled = 0;
2200 if (object->type == TP_OBJECT_TYPE_IO)
2201 object->u.io.pending_count = 0;
2202 RtlLeaveCriticalSection( &pool->cs );
2204 while (pending_callbacks--)
2205 tp_object_release( object );
2208 static BOOL object_is_finished( struct threadpool_object *object, BOOL group )
2210 if (object->num_pending_callbacks)
2211 return FALSE;
2212 if (object->type == TP_OBJECT_TYPE_IO && object->u.io.pending_count)
2213 return FALSE;
2215 if (group)
2216 return !object->num_running_callbacks;
2217 else
2218 return !object->num_associated_callbacks;
2221 /***********************************************************************
2222 * tp_object_wait (internal)
2224 * Waits until all pending and running callbacks of a specific object
2225 * have been processed.
2227 static void tp_object_wait( struct threadpool_object *object, BOOL group_wait )
2229 struct threadpool *pool = object->pool;
2231 RtlEnterCriticalSection( &pool->cs );
2232 while (!object_is_finished( object, group_wait ))
2234 if (group_wait)
2235 RtlSleepConditionVariableCS( &object->group_finished_event, &pool->cs, NULL );
2236 else
2237 RtlSleepConditionVariableCS( &object->finished_event, &pool->cs, NULL );
2239 RtlLeaveCriticalSection( &pool->cs );
2242 /***********************************************************************
2243 * tp_object_prepare_shutdown (internal)
2245 * Prepares a threadpool object for shutdown.
2247 static void tp_object_prepare_shutdown( struct threadpool_object *object )
2249 if (object->type == TP_OBJECT_TYPE_TIMER)
2250 tp_timerqueue_unlock( object );
2251 else if (object->type == TP_OBJECT_TYPE_WAIT)
2252 tp_waitqueue_unlock( object );
2253 else if (object->type == TP_OBJECT_TYPE_IO)
2254 tp_ioqueue_unlock( object );
2257 /***********************************************************************
2258 * tp_object_release (internal)
2260 * Releases a reference to a threadpool object.
2262 static BOOL tp_object_release( struct threadpool_object *object )
2264 if (InterlockedDecrement( &object->refcount ))
2265 return FALSE;
2267 TRACE( "destroying object %p of type %u\n", object, object->type );
2269 assert( object->shutdown );
2270 assert( !object->num_pending_callbacks );
2271 assert( !object->num_running_callbacks );
2272 assert( !object->num_associated_callbacks );
2274 /* release reference to the group */
2275 if (object->group)
2277 struct threadpool_group *group = object->group;
2279 RtlEnterCriticalSection( &group->cs );
2280 if (object->is_group_member)
2282 list_remove( &object->group_entry );
2283 object->is_group_member = FALSE;
2285 RtlLeaveCriticalSection( &group->cs );
2287 tp_group_release( group );
2290 tp_threadpool_unlock( object->pool );
2292 if (object->race_dll)
2293 LdrUnloadDll( object->race_dll );
2295 RtlFreeHeap( GetProcessHeap(), 0, object );
2296 return TRUE;
2299 static struct list *threadpool_get_next_item( const struct threadpool *pool )
2301 struct list *ptr;
2302 unsigned int i;
2304 for (i = 0; i < ARRAY_SIZE(pool->pools); ++i)
2306 if ((ptr = list_head( &pool->pools[i] )))
2307 break;
2310 return ptr;
2313 /***********************************************************************
2314 * threadpool_worker_proc (internal)
2316 static void CALLBACK threadpool_worker_proc( void *param )
2318 TP_CALLBACK_INSTANCE *callback_instance;
2319 struct threadpool_instance instance;
2320 struct io_completion completion;
2321 struct threadpool *pool = param;
2322 TP_WAIT_RESULT wait_result = 0;
2323 LARGE_INTEGER timeout;
2324 struct list *ptr;
2325 NTSTATUS status;
2327 TRACE( "starting worker thread for pool %p\n", pool );
2329 RtlEnterCriticalSection( &pool->cs );
2330 for (;;)
2332 while ((ptr = threadpool_get_next_item( pool )))
2334 struct threadpool_object *object = LIST_ENTRY( ptr, struct threadpool_object, pool_entry );
2335 assert( object->num_pending_callbacks > 0 );
2337 /* If further pending callbacks are queued, move the work item to
2338 * the end of the pool list. Otherwise remove it from the pool. */
2339 list_remove( &object->pool_entry );
2340 if (--object->num_pending_callbacks)
2341 tp_object_prio_queue( object );
2343 /* For wait objects check if they were signaled or have timed out. */
2344 if (object->type == TP_OBJECT_TYPE_WAIT)
2346 wait_result = object->u.wait.signaled ? WAIT_OBJECT_0 : WAIT_TIMEOUT;
2347 if (wait_result == WAIT_OBJECT_0) object->u.wait.signaled--;
2349 else if (object->type == TP_OBJECT_TYPE_IO)
2351 assert( object->u.io.completion_count );
2352 completion = object->u.io.completions[--object->u.io.completion_count];
2353 object->u.io.pending_count--;
2356 /* Leave critical section and do the actual callback. */
2357 object->num_associated_callbacks++;
2358 object->num_running_callbacks++;
2359 RtlLeaveCriticalSection( &pool->cs );
2361 /* Initialize threadpool instance struct. */
2362 callback_instance = (TP_CALLBACK_INSTANCE *)&instance;
2363 instance.object = object;
2364 instance.threadid = GetCurrentThreadId();
2365 instance.associated = TRUE;
2366 instance.may_run_long = object->may_run_long;
2367 instance.cleanup.critical_section = NULL;
2368 instance.cleanup.mutex = NULL;
2369 instance.cleanup.semaphore = NULL;
2370 instance.cleanup.semaphore_count = 0;
2371 instance.cleanup.event = NULL;
2372 instance.cleanup.library = NULL;
2374 switch (object->type)
2376 case TP_OBJECT_TYPE_SIMPLE:
2378 TRACE( "executing simple callback %p(%p, %p)\n",
2379 object->u.simple.callback, callback_instance, object->userdata );
2380 object->u.simple.callback( callback_instance, object->userdata );
2381 TRACE( "callback %p returned\n", object->u.simple.callback );
2382 break;
2385 case TP_OBJECT_TYPE_WORK:
2387 TRACE( "executing work callback %p(%p, %p, %p)\n",
2388 object->u.work.callback, callback_instance, object->userdata, object );
2389 object->u.work.callback( callback_instance, object->userdata, (TP_WORK *)object );
2390 TRACE( "callback %p returned\n", object->u.work.callback );
2391 break;
2394 case TP_OBJECT_TYPE_TIMER:
2396 TRACE( "executing timer callback %p(%p, %p, %p)\n",
2397 object->u.timer.callback, callback_instance, object->userdata, object );
2398 object->u.timer.callback( callback_instance, object->userdata, (TP_TIMER *)object );
2399 TRACE( "callback %p returned\n", object->u.timer.callback );
2400 break;
2403 case TP_OBJECT_TYPE_WAIT:
2405 TRACE( "executing wait callback %p(%p, %p, %p, %u)\n",
2406 object->u.wait.callback, callback_instance, object->userdata, object, wait_result );
2407 object->u.wait.callback( callback_instance, object->userdata, (TP_WAIT *)object, wait_result );
2408 TRACE( "callback %p returned\n", object->u.wait.callback );
2409 break;
2412 case TP_OBJECT_TYPE_IO:
2414 TRACE( "executing I/O callback %p(%p, %p, %#lx, %p, %p)\n",
2415 object->u.io.callback, callback_instance, object->userdata,
2416 completion.cvalue, &completion.iosb, (TP_IO *)object );
2417 object->u.io.callback( callback_instance, object->userdata,
2418 (void *)completion.cvalue, &completion.iosb, (TP_IO *)object );
2419 TRACE( "callback %p returned\n", object->u.io.callback );
2420 break;
2423 default:
2424 assert(0);
2425 break;
2428 /* Execute finalization callback. */
2429 if (object->finalization_callback)
2431 TRACE( "executing finalization callback %p(%p, %p)\n",
2432 object->finalization_callback, callback_instance, object->userdata );
2433 object->finalization_callback( callback_instance, object->userdata );
2434 TRACE( "callback %p returned\n", object->finalization_callback );
2437 /* Execute cleanup tasks. */
2438 if (instance.cleanup.critical_section)
2440 RtlLeaveCriticalSection( instance.cleanup.critical_section );
2442 if (instance.cleanup.mutex)
2444 status = NtReleaseMutant( instance.cleanup.mutex, NULL );
2445 if (status != STATUS_SUCCESS) goto skip_cleanup;
2447 if (instance.cleanup.semaphore)
2449 status = NtReleaseSemaphore( instance.cleanup.semaphore, instance.cleanup.semaphore_count, NULL );
2450 if (status != STATUS_SUCCESS) goto skip_cleanup;
2452 if (instance.cleanup.event)
2454 status = NtSetEvent( instance.cleanup.event, NULL );
2455 if (status != STATUS_SUCCESS) goto skip_cleanup;
2457 if (instance.cleanup.library)
2459 LdrUnloadDll( instance.cleanup.library );
2462 skip_cleanup:
2463 RtlEnterCriticalSection( &pool->cs );
2464 assert(pool->num_busy_workers);
2465 pool->num_busy_workers--;
2467 /* Simple callbacks are automatically shutdown after execution. */
2468 if (object->type == TP_OBJECT_TYPE_SIMPLE)
2470 tp_object_prepare_shutdown( object );
2471 object->shutdown = TRUE;
2474 object->num_running_callbacks--;
2475 if (object_is_finished( object, TRUE ))
2476 RtlWakeAllConditionVariable( &object->group_finished_event );
2478 if (instance.associated)
2480 object->num_associated_callbacks--;
2481 if (object_is_finished( object, FALSE ))
2482 RtlWakeAllConditionVariable( &object->finished_event );
2485 tp_object_release( object );
2488 /* Shutdown worker thread if requested. */
2489 if (pool->shutdown)
2490 break;
2492 /* Wait for new tasks or until the timeout expires. A thread only terminates
2493 * when no new tasks are available, and the number of threads can be
2494 * decreased without violating the min_workers limit. An exception is when
2495 * min_workers == 0, then objcount is used to detect if the last thread
2496 * can be terminated. */
2497 timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
2498 if (RtlSleepConditionVariableCS( &pool->update_event, &pool->cs, &timeout ) == STATUS_TIMEOUT &&
2499 !threadpool_get_next_item( pool ) && (pool->num_workers > max( pool->min_workers, 1 ) ||
2500 (!pool->min_workers && !pool->objcount)))
2502 break;
2505 pool->num_workers--;
2506 RtlLeaveCriticalSection( &pool->cs );
2508 TRACE( "terminating worker thread for pool %p\n", pool );
2509 tp_threadpool_release( pool );
2510 RtlExitUserThread( 0 );
2513 /***********************************************************************
2514 * TpAllocCleanupGroup (NTDLL.@)
2516 NTSTATUS WINAPI TpAllocCleanupGroup( TP_CLEANUP_GROUP **out )
2518 TRACE( "%p\n", out );
2520 return tp_group_alloc( (struct threadpool_group **)out );
2523 /***********************************************************************
2524 * TpAllocIoCompletion (NTDLL.@)
2526 NTSTATUS WINAPI TpAllocIoCompletion( TP_IO **out, HANDLE file, PTP_IO_CALLBACK callback,
2527 void *userdata, TP_CALLBACK_ENVIRON *environment )
2529 struct threadpool_object *object;
2530 struct threadpool *pool;
2531 NTSTATUS status;
2533 TRACE( "%p %p %p %p %p\n", out, file, callback, userdata, environment );
2535 if (!(object = RtlAllocateHeap( GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(*object) )))
2536 return STATUS_NO_MEMORY;
2538 if ((status = tp_threadpool_lock( &pool, environment )))
2540 RtlFreeHeap( GetProcessHeap(), 0, object );
2541 return status;
2544 object->type = TP_OBJECT_TYPE_IO;
2545 object->u.io.callback = callback;
2546 if (!(object->u.io.completions = RtlAllocateHeap( GetProcessHeap(), 0, 8 * sizeof(*object->u.io.completions) )))
2548 tp_threadpool_unlock( pool );
2549 RtlFreeHeap( GetProcessHeap(), 0, object );
2550 return status;
2553 if ((status = tp_ioqueue_lock( object, file )))
2555 tp_threadpool_unlock( pool );
2556 RtlFreeHeap( GetProcessHeap(), 0, object->u.io.completions );
2557 RtlFreeHeap( GetProcessHeap(), 0, object );
2558 return status;
2561 tp_object_initialize( object, pool, userdata, environment );
2563 *out = (TP_IO *)object;
2564 return STATUS_SUCCESS;
2567 /***********************************************************************
2568 * TpAllocPool (NTDLL.@)
2570 NTSTATUS WINAPI TpAllocPool( TP_POOL **out, PVOID reserved )
2572 TRACE( "%p %p\n", out, reserved );
2574 if (reserved)
2575 FIXME( "reserved argument is nonzero (%p)\n", reserved );
2577 return tp_threadpool_alloc( (struct threadpool **)out );
2580 /***********************************************************************
2581 * TpAllocTimer (NTDLL.@)
2583 NTSTATUS WINAPI TpAllocTimer( TP_TIMER **out, PTP_TIMER_CALLBACK callback, PVOID userdata,
2584 TP_CALLBACK_ENVIRON *environment )
2586 struct threadpool_object *object;
2587 struct threadpool *pool;
2588 NTSTATUS status;
2590 TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
2592 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2593 if (!object)
2594 return STATUS_NO_MEMORY;
2596 status = tp_threadpool_lock( &pool, environment );
2597 if (status)
2599 RtlFreeHeap( GetProcessHeap(), 0, object );
2600 return status;
2603 object->type = TP_OBJECT_TYPE_TIMER;
2604 object->u.timer.callback = callback;
2606 status = tp_timerqueue_lock( object );
2607 if (status)
2609 tp_threadpool_unlock( pool );
2610 RtlFreeHeap( GetProcessHeap(), 0, object );
2611 return status;
2614 tp_object_initialize( object, pool, userdata, environment );
2616 *out = (TP_TIMER *)object;
2617 return STATUS_SUCCESS;
2620 /***********************************************************************
2621 * TpAllocWait (NTDLL.@)
2623 NTSTATUS WINAPI TpAllocWait( TP_WAIT **out, PTP_WAIT_CALLBACK callback, PVOID userdata,
2624 TP_CALLBACK_ENVIRON *environment )
2626 struct threadpool_object *object;
2627 struct threadpool *pool;
2628 NTSTATUS status;
2630 TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
2632 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2633 if (!object)
2634 return STATUS_NO_MEMORY;
2636 status = tp_threadpool_lock( &pool, environment );
2637 if (status)
2639 RtlFreeHeap( GetProcessHeap(), 0, object );
2640 return status;
2643 object->type = TP_OBJECT_TYPE_WAIT;
2644 object->u.wait.callback = callback;
2646 status = tp_waitqueue_lock( object );
2647 if (status)
2649 tp_threadpool_unlock( pool );
2650 RtlFreeHeap( GetProcessHeap(), 0, object );
2651 return status;
2654 tp_object_initialize( object, pool, userdata, environment );
2656 *out = (TP_WAIT *)object;
2657 return STATUS_SUCCESS;
2660 /***********************************************************************
2661 * TpAllocWork (NTDLL.@)
2663 NTSTATUS WINAPI TpAllocWork( TP_WORK **out, PTP_WORK_CALLBACK callback, PVOID userdata,
2664 TP_CALLBACK_ENVIRON *environment )
2666 struct threadpool_object *object;
2667 struct threadpool *pool;
2668 NTSTATUS status;
2670 TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
2672 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2673 if (!object)
2674 return STATUS_NO_MEMORY;
2676 status = tp_threadpool_lock( &pool, environment );
2677 if (status)
2679 RtlFreeHeap( GetProcessHeap(), 0, object );
2680 return status;
2683 object->type = TP_OBJECT_TYPE_WORK;
2684 object->u.work.callback = callback;
2685 tp_object_initialize( object, pool, userdata, environment );
2687 *out = (TP_WORK *)object;
2688 return STATUS_SUCCESS;
2691 /***********************************************************************
2692 * TpCancelAsyncIoOperation (NTDLL.@)
2694 void WINAPI TpCancelAsyncIoOperation( TP_IO *io )
2696 struct threadpool_object *this = impl_from_TP_IO( io );
2698 TRACE( "%p\n", io );
2700 RtlEnterCriticalSection( &this->pool->cs );
2702 this->u.io.pending_count--;
2703 if (object_is_finished( this, TRUE ))
2704 RtlWakeAllConditionVariable( &this->group_finished_event );
2705 if (object_is_finished( this, FALSE ))
2706 RtlWakeAllConditionVariable( &this->finished_event );
2708 RtlLeaveCriticalSection( &this->pool->cs );
2711 /***********************************************************************
2712 * TpCallbackLeaveCriticalSectionOnCompletion (NTDLL.@)
2714 VOID WINAPI TpCallbackLeaveCriticalSectionOnCompletion( TP_CALLBACK_INSTANCE *instance, CRITICAL_SECTION *crit )
2716 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2718 TRACE( "%p %p\n", instance, crit );
2720 if (!this->cleanup.critical_section)
2721 this->cleanup.critical_section = crit;
2724 /***********************************************************************
2725 * TpCallbackMayRunLong (NTDLL.@)
2727 NTSTATUS WINAPI TpCallbackMayRunLong( TP_CALLBACK_INSTANCE *instance )
2729 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2730 struct threadpool_object *object = this->object;
2731 struct threadpool *pool;
2732 NTSTATUS status = STATUS_SUCCESS;
2734 TRACE( "%p\n", instance );
2736 if (this->threadid != GetCurrentThreadId())
2738 ERR("called from wrong thread, ignoring\n");
2739 return STATUS_UNSUCCESSFUL; /* FIXME */
2742 if (this->may_run_long)
2743 return STATUS_SUCCESS;
2745 pool = object->pool;
2746 RtlEnterCriticalSection( &pool->cs );
2748 /* Start new worker threads if required. */
2749 if (pool->num_busy_workers >= pool->num_workers)
2751 if (pool->num_workers < pool->max_workers)
2753 status = tp_new_worker_thread( pool );
2755 else
2757 status = STATUS_TOO_MANY_THREADS;
2761 RtlLeaveCriticalSection( &pool->cs );
2762 this->may_run_long = TRUE;
2763 return status;
2766 /***********************************************************************
2767 * TpCallbackReleaseMutexOnCompletion (NTDLL.@)
2769 VOID WINAPI TpCallbackReleaseMutexOnCompletion( TP_CALLBACK_INSTANCE *instance, HANDLE mutex )
2771 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2773 TRACE( "%p %p\n", instance, mutex );
2775 if (!this->cleanup.mutex)
2776 this->cleanup.mutex = mutex;
2779 /***********************************************************************
2780 * TpCallbackReleaseSemaphoreOnCompletion (NTDLL.@)
2782 VOID WINAPI TpCallbackReleaseSemaphoreOnCompletion( TP_CALLBACK_INSTANCE *instance, HANDLE semaphore, DWORD count )
2784 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2786 TRACE( "%p %p %u\n", instance, semaphore, count );
2788 if (!this->cleanup.semaphore)
2790 this->cleanup.semaphore = semaphore;
2791 this->cleanup.semaphore_count = count;
2795 /***********************************************************************
2796 * TpCallbackSetEventOnCompletion (NTDLL.@)
2798 VOID WINAPI TpCallbackSetEventOnCompletion( TP_CALLBACK_INSTANCE *instance, HANDLE event )
2800 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2802 TRACE( "%p %p\n", instance, event );
2804 if (!this->cleanup.event)
2805 this->cleanup.event = event;
2808 /***********************************************************************
2809 * TpCallbackUnloadDllOnCompletion (NTDLL.@)
2811 VOID WINAPI TpCallbackUnloadDllOnCompletion( TP_CALLBACK_INSTANCE *instance, HMODULE module )
2813 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2815 TRACE( "%p %p\n", instance, module );
2817 if (!this->cleanup.library)
2818 this->cleanup.library = module;
2821 /***********************************************************************
2822 * TpDisassociateCallback (NTDLL.@)
2824 VOID WINAPI TpDisassociateCallback( TP_CALLBACK_INSTANCE *instance )
2826 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2827 struct threadpool_object *object = this->object;
2828 struct threadpool *pool;
2830 TRACE( "%p\n", instance );
2832 if (this->threadid != GetCurrentThreadId())
2834 ERR("called from wrong thread, ignoring\n");
2835 return;
2838 if (!this->associated)
2839 return;
2841 pool = object->pool;
2842 RtlEnterCriticalSection( &pool->cs );
2844 object->num_associated_callbacks--;
2845 if (object_is_finished( object, FALSE ))
2846 RtlWakeAllConditionVariable( &object->finished_event );
2848 RtlLeaveCriticalSection( &pool->cs );
2849 this->associated = FALSE;
2852 /***********************************************************************
2853 * TpIsTimerSet (NTDLL.@)
2855 BOOL WINAPI TpIsTimerSet( TP_TIMER *timer )
2857 struct threadpool_object *this = impl_from_TP_TIMER( timer );
2859 TRACE( "%p\n", timer );
2861 return this->u.timer.timer_set;
2864 /***********************************************************************
2865 * TpPostWork (NTDLL.@)
2867 VOID WINAPI TpPostWork( TP_WORK *work )
2869 struct threadpool_object *this = impl_from_TP_WORK( work );
2871 TRACE( "%p\n", work );
2873 tp_object_submit( this, FALSE );
2876 /***********************************************************************
2877 * TpReleaseCleanupGroup (NTDLL.@)
2879 VOID WINAPI TpReleaseCleanupGroup( TP_CLEANUP_GROUP *group )
2881 struct threadpool_group *this = impl_from_TP_CLEANUP_GROUP( group );
2883 TRACE( "%p\n", group );
2885 tp_group_shutdown( this );
2886 tp_group_release( this );
2889 /***********************************************************************
2890 * TpReleaseCleanupGroupMembers (NTDLL.@)
2892 VOID WINAPI TpReleaseCleanupGroupMembers( TP_CLEANUP_GROUP *group, BOOL cancel_pending, PVOID userdata )
2894 struct threadpool_group *this = impl_from_TP_CLEANUP_GROUP( group );
2895 struct threadpool_object *object, *next;
2896 struct list members;
2898 TRACE( "%p %u %p\n", group, cancel_pending, userdata );
2900 RtlEnterCriticalSection( &this->cs );
2902 /* Unset group, increase references, and mark objects for shutdown */
2903 LIST_FOR_EACH_ENTRY_SAFE( object, next, &this->members, struct threadpool_object, group_entry )
2905 assert( object->group == this );
2906 assert( object->is_group_member );
2908 if (InterlockedIncrement( &object->refcount ) == 1)
2910 /* Object is basically already destroyed, but group reference
2911 * was not deleted yet. We can safely ignore this object. */
2912 InterlockedDecrement( &object->refcount );
2913 list_remove( &object->group_entry );
2914 object->is_group_member = FALSE;
2915 continue;
2918 object->is_group_member = FALSE;
2919 tp_object_prepare_shutdown( object );
2922 /* Move members to a new temporary list */
2923 list_init( &members );
2924 list_move_tail( &members, &this->members );
2926 RtlLeaveCriticalSection( &this->cs );
2928 /* Cancel pending callbacks if requested */
2929 if (cancel_pending)
2931 LIST_FOR_EACH_ENTRY( object, &members, struct threadpool_object, group_entry )
2933 tp_object_cancel( object );
2937 /* Wait for remaining callbacks to finish */
2938 LIST_FOR_EACH_ENTRY_SAFE( object, next, &members, struct threadpool_object, group_entry )
2940 tp_object_wait( object, TRUE );
2942 if (!object->shutdown)
2944 /* Execute group cancellation callback if defined, and if this was actually a group cancel. */
2945 if (cancel_pending && object->group_cancel_callback)
2947 TRACE( "executing group cancel callback %p(%p, %p)\n",
2948 object->group_cancel_callback, object->userdata, userdata );
2949 object->group_cancel_callback( object->userdata, userdata );
2950 TRACE( "callback %p returned\n", object->group_cancel_callback );
2953 if (object->type != TP_OBJECT_TYPE_SIMPLE)
2954 tp_object_release( object );
2957 object->shutdown = TRUE;
2958 tp_object_release( object );
2962 /***********************************************************************
2963 * TpReleaseIoCompletion (NTDLL.@)
2965 void WINAPI TpReleaseIoCompletion( TP_IO *io )
2967 struct threadpool_object *this = impl_from_TP_IO( io );
2969 TRACE( "%p\n", io );
2971 tp_object_prepare_shutdown( this );
2972 this->shutdown = TRUE;
2973 tp_object_release( this );
2976 /***********************************************************************
2977 * TpReleasePool (NTDLL.@)
2979 VOID WINAPI TpReleasePool( TP_POOL *pool )
2981 struct threadpool *this = impl_from_TP_POOL( pool );
2983 TRACE( "%p\n", pool );
2985 tp_threadpool_shutdown( this );
2986 tp_threadpool_release( this );
2989 /***********************************************************************
2990 * TpReleaseTimer (NTDLL.@)
2992 VOID WINAPI TpReleaseTimer( TP_TIMER *timer )
2994 struct threadpool_object *this = impl_from_TP_TIMER( timer );
2996 TRACE( "%p\n", timer );
2998 tp_object_prepare_shutdown( this );
2999 this->shutdown = TRUE;
3000 tp_object_release( this );
3003 /***********************************************************************
3004 * TpReleaseWait (NTDLL.@)
3006 VOID WINAPI TpReleaseWait( TP_WAIT *wait )
3008 struct threadpool_object *this = impl_from_TP_WAIT( wait );
3010 TRACE( "%p\n", wait );
3012 tp_object_prepare_shutdown( this );
3013 this->shutdown = TRUE;
3014 tp_object_release( this );
3017 /***********************************************************************
3018 * TpReleaseWork (NTDLL.@)
3020 VOID WINAPI TpReleaseWork( TP_WORK *work )
3022 struct threadpool_object *this = impl_from_TP_WORK( work );
3024 TRACE( "%p\n", work );
3026 tp_object_prepare_shutdown( this );
3027 this->shutdown = TRUE;
3028 tp_object_release( this );
3031 /***********************************************************************
3032 * TpSetPoolMaxThreads (NTDLL.@)
3034 VOID WINAPI TpSetPoolMaxThreads( TP_POOL *pool, DWORD maximum )
3036 struct threadpool *this = impl_from_TP_POOL( pool );
3038 TRACE( "%p %u\n", pool, maximum );
3040 RtlEnterCriticalSection( &this->cs );
3041 this->max_workers = max( maximum, 1 );
3042 this->min_workers = min( this->min_workers, this->max_workers );
3043 RtlLeaveCriticalSection( &this->cs );
3046 /***********************************************************************
3047 * TpSetPoolMinThreads (NTDLL.@)
3049 BOOL WINAPI TpSetPoolMinThreads( TP_POOL *pool, DWORD minimum )
3051 struct threadpool *this = impl_from_TP_POOL( pool );
3052 NTSTATUS status = STATUS_SUCCESS;
3054 TRACE( "%p %u\n", pool, minimum );
3056 RtlEnterCriticalSection( &this->cs );
3058 while (this->num_workers < minimum)
3060 status = tp_new_worker_thread( this );
3061 if (status != STATUS_SUCCESS)
3062 break;
3065 if (status == STATUS_SUCCESS)
3067 this->min_workers = minimum;
3068 this->max_workers = max( this->min_workers, this->max_workers );
3071 RtlLeaveCriticalSection( &this->cs );
3072 return !status;
3075 /***********************************************************************
3076 * TpSetTimer (NTDLL.@)
3078 VOID WINAPI TpSetTimer( TP_TIMER *timer, LARGE_INTEGER *timeout, LONG period, LONG window_length )
3080 struct threadpool_object *this = impl_from_TP_TIMER( timer );
3081 struct threadpool_object *other_timer;
3082 BOOL submit_timer = FALSE;
3083 ULONGLONG timestamp;
3085 TRACE( "%p %p %u %u\n", timer, timeout, period, window_length );
3087 RtlEnterCriticalSection( &timerqueue.cs );
3089 assert( this->u.timer.timer_initialized );
3090 this->u.timer.timer_set = timeout != NULL;
3092 /* Convert relative timeout to absolute timestamp and handle a timeout
3093 * of zero, which means that the timer is submitted immediately. */
3094 if (timeout)
3096 timestamp = timeout->QuadPart;
3097 if ((LONGLONG)timestamp < 0)
3099 LARGE_INTEGER now;
3100 NtQuerySystemTime( &now );
3101 timestamp = now.QuadPart - timestamp;
3103 else if (!timestamp)
3105 if (!period)
3106 timeout = NULL;
3107 else
3109 LARGE_INTEGER now;
3110 NtQuerySystemTime( &now );
3111 timestamp = now.QuadPart + (ULONGLONG)period * 10000;
3113 submit_timer = TRUE;
3117 /* First remove existing timeout. */
3118 if (this->u.timer.timer_pending)
3120 list_remove( &this->u.timer.timer_entry );
3121 this->u.timer.timer_pending = FALSE;
3124 /* If the timer was enabled, then add it back to the queue. */
3125 if (timeout)
3127 this->u.timer.timeout = timestamp;
3128 this->u.timer.period = period;
3129 this->u.timer.window_length = window_length;
3131 LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
3132 struct threadpool_object, u.timer.timer_entry )
3134 assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
3135 if (this->u.timer.timeout < other_timer->u.timer.timeout)
3136 break;
3138 list_add_before( &other_timer->u.timer.timer_entry, &this->u.timer.timer_entry );
3140 /* Wake up the timer thread when the timeout has to be updated. */
3141 if (list_head( &timerqueue.pending_timers ) == &this->u.timer.timer_entry )
3142 RtlWakeAllConditionVariable( &timerqueue.update_event );
3144 this->u.timer.timer_pending = TRUE;
3147 RtlLeaveCriticalSection( &timerqueue.cs );
3149 if (submit_timer)
3150 tp_object_submit( this, FALSE );
3153 /***********************************************************************
3154 * TpSetWait (NTDLL.@)
3156 VOID WINAPI TpSetWait( TP_WAIT *wait, HANDLE handle, LARGE_INTEGER *timeout )
3158 struct threadpool_object *this = impl_from_TP_WAIT( wait );
3159 ULONGLONG timestamp = TIMEOUT_INFINITE;
3160 BOOL submit_wait = FALSE;
3162 TRACE( "%p %p %p\n", wait, handle, timeout );
3164 RtlEnterCriticalSection( &waitqueue.cs );
3166 assert( this->u.wait.bucket );
3167 this->u.wait.handle = handle;
3169 if (handle || this->u.wait.wait_pending)
3171 struct waitqueue_bucket *bucket = this->u.wait.bucket;
3172 list_remove( &this->u.wait.wait_entry );
3174 /* Convert relative timeout to absolute timestamp. */
3175 if (handle && timeout)
3177 timestamp = timeout->QuadPart;
3178 if ((LONGLONG)timestamp < 0)
3180 LARGE_INTEGER now;
3181 NtQuerySystemTime( &now );
3182 timestamp = now.QuadPart - timestamp;
3184 else if (!timestamp)
3186 submit_wait = TRUE;
3187 handle = NULL;
3191 /* Add wait object back into one of the queues. */
3192 if (handle)
3194 list_add_tail( &bucket->waiting, &this->u.wait.wait_entry );
3195 this->u.wait.wait_pending = TRUE;
3196 this->u.wait.timeout = timestamp;
3198 else
3200 list_add_tail( &bucket->reserved, &this->u.wait.wait_entry );
3201 this->u.wait.wait_pending = FALSE;
3204 /* Wake up the wait queue thread. */
3205 NtSetEvent( bucket->update_event, NULL );
3208 RtlLeaveCriticalSection( &waitqueue.cs );
3210 if (submit_wait)
3211 tp_object_submit( this, FALSE );
3214 /***********************************************************************
3215 * TpSimpleTryPost (NTDLL.@)
3217 NTSTATUS WINAPI TpSimpleTryPost( PTP_SIMPLE_CALLBACK callback, PVOID userdata,
3218 TP_CALLBACK_ENVIRON *environment )
3220 struct threadpool_object *object;
3221 struct threadpool *pool;
3222 NTSTATUS status;
3224 TRACE( "%p %p %p\n", callback, userdata, environment );
3226 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
3227 if (!object)
3228 return STATUS_NO_MEMORY;
3230 status = tp_threadpool_lock( &pool, environment );
3231 if (status)
3233 RtlFreeHeap( GetProcessHeap(), 0, object );
3234 return status;
3237 object->type = TP_OBJECT_TYPE_SIMPLE;
3238 object->u.simple.callback = callback;
3239 tp_object_initialize( object, pool, userdata, environment );
3241 return STATUS_SUCCESS;
3244 /***********************************************************************
3245 * TpStartAsyncIoOperation (NTDLL.@)
3247 void WINAPI TpStartAsyncIoOperation( TP_IO *io )
3249 struct threadpool_object *this = impl_from_TP_IO( io );
3251 TRACE( "%p\n", io );
3253 RtlEnterCriticalSection( &this->pool->cs );
3255 this->u.io.pending_count++;
3257 RtlLeaveCriticalSection( &this->pool->cs );
3260 /***********************************************************************
3261 * TpWaitForIoCompletion (NTDLL.@)
3263 void WINAPI TpWaitForIoCompletion( TP_IO *io, BOOL cancel_pending )
3265 struct threadpool_object *this = impl_from_TP_IO( io );
3267 TRACE( "%p %d\n", io, cancel_pending );
3269 if (cancel_pending)
3270 tp_object_cancel( this );
3271 tp_object_wait( this, FALSE );
3274 /***********************************************************************
3275 * TpWaitForTimer (NTDLL.@)
3277 VOID WINAPI TpWaitForTimer( TP_TIMER *timer, BOOL cancel_pending )
3279 struct threadpool_object *this = impl_from_TP_TIMER( timer );
3281 TRACE( "%p %d\n", timer, cancel_pending );
3283 if (cancel_pending)
3284 tp_object_cancel( this );
3285 tp_object_wait( this, FALSE );
3288 /***********************************************************************
3289 * TpWaitForWait (NTDLL.@)
3291 VOID WINAPI TpWaitForWait( TP_WAIT *wait, BOOL cancel_pending )
3293 struct threadpool_object *this = impl_from_TP_WAIT( wait );
3295 TRACE( "%p %d\n", wait, cancel_pending );
3297 if (cancel_pending)
3298 tp_object_cancel( this );
3299 tp_object_wait( this, FALSE );
3302 /***********************************************************************
3303 * TpWaitForWork (NTDLL.@)
3305 VOID WINAPI TpWaitForWork( TP_WORK *work, BOOL cancel_pending )
3307 struct threadpool_object *this = impl_from_TP_WORK( work );
3309 TRACE( "%p %u\n", work, cancel_pending );
3311 if (cancel_pending)
3312 tp_object_cancel( this );
3313 tp_object_wait( this, FALSE );
3316 /***********************************************************************
3317 * TpSetPoolStackInformation (NTDLL.@)
3319 NTSTATUS WINAPI TpSetPoolStackInformation( TP_POOL *pool, TP_POOL_STACK_INFORMATION *stack_info )
3321 struct threadpool *this = impl_from_TP_POOL( pool );
3323 TRACE( "%p %p\n", pool, stack_info );
3325 if (!stack_info)
3326 return STATUS_INVALID_PARAMETER;
3328 RtlEnterCriticalSection( &this->cs );
3329 this->stack_info = *stack_info;
3330 RtlLeaveCriticalSection( &this->cs );
3332 return STATUS_SUCCESS;
3335 /***********************************************************************
3336 * TpQueryPoolStackInformation (NTDLL.@)
3338 NTSTATUS WINAPI TpQueryPoolStackInformation( TP_POOL *pool, TP_POOL_STACK_INFORMATION *stack_info )
3340 struct threadpool *this = impl_from_TP_POOL( pool );
3342 TRACE( "%p %p\n", pool, stack_info );
3344 if (!stack_info)
3345 return STATUS_INVALID_PARAMETER;
3347 RtlEnterCriticalSection( &this->cs );
3348 *stack_info = this->stack_info;
3349 RtlLeaveCriticalSection( &this->cs );
3351 return STATUS_SUCCESS;