wined3d: Pass a wined3d_device_context to wined3d_cs_emit_blt_sub_resource().
[wine/zf.git] / dlls / ntdll / threadpool.c
blob1b51a1919797b1534a0875f2070e1828811b07e0
1 /*
2 * Thread pooling
4 * Copyright (c) 2006 Robert Shearman
5 * Copyright (c) 2014-2016 Sebastian Lackner
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
22 #include <assert.h>
23 #include <stdarg.h>
24 #include <limits.h>
26 #define NONAMELESSUNION
27 #include "ntstatus.h"
28 #define WIN32_NO_STATUS
29 #include "winternl.h"
31 #include "wine/debug.h"
32 #include "wine/list.h"
34 #include "ntdll_misc.h"
36 WINE_DEFAULT_DEBUG_CHANNEL(threadpool);
39 * Old thread pooling API
42 struct rtl_work_item
44 PRTL_WORK_ITEM_ROUTINE function;
45 PVOID context;
/* Expiry value of a queue timer that must not fire (again). */
#define EXPIRE_NEVER        (~(ULONGLONG)0)
/* Signature stored in every live struct timer_queue. */
#define TIMER_QUEUE_MAGIC   0x516d6954 /* TimQ */
51 static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug;
53 static struct
55 HANDLE compl_port;
56 RTL_CRITICAL_SECTION threadpool_compl_cs;
58 old_threadpool =
60 NULL, /* compl_port */
61 { &critsect_compl_debug, -1, 0, 0, 0, 0 }, /* threadpool_compl_cs */
64 static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug =
66 0, 0, &old_threadpool.threadpool_compl_cs,
67 { &critsect_compl_debug.ProcessLocksList, &critsect_compl_debug.ProcessLocksList },
68 0, 0, { (DWORD_PTR)(__FILE__ ": threadpool_compl_cs") }
71 struct timer_queue;
72 struct queue_timer
74 struct timer_queue *q;
75 struct list entry;
76 ULONG runcount; /* number of callbacks pending execution */
77 RTL_WAITORTIMERCALLBACKFUNC callback;
78 PVOID param;
79 DWORD period;
80 ULONG flags;
81 ULONGLONG expire;
82 BOOL destroy; /* timer should be deleted; once set, never unset */
83 HANDLE event; /* removal event */
86 struct timer_queue
88 DWORD magic;
89 RTL_CRITICAL_SECTION cs;
90 struct list timers; /* sorted by expiration time */
91 BOOL quit; /* queue should be deleted; once set, never unset */
92 HANDLE event;
93 HANDLE thread;
97 * Object-oriented thread pooling API
/* Idle time in milliseconds before the timer queue thread shuts down. */
#define THREADPOOL_WORKER_TIMEOUT   5000
/* Number of objects a wait queue thread can hold (one less than the system wait limit). */
#define MAXIMUM_WAITQUEUE_OBJECTS   (MAXIMUM_WAIT_OBJECTS - 1)
103 /* internal threadpool representation */
104 struct threadpool
106 LONG refcount;
107 LONG objcount;
108 BOOL shutdown;
109 CRITICAL_SECTION cs;
110 /* Pools of work items, locked via .cs, order matches TP_CALLBACK_PRIORITY - high, normal, low. */
111 struct list pools[3];
112 RTL_CONDITION_VARIABLE update_event;
113 /* information about worker threads, locked via .cs */
114 int max_workers;
115 int min_workers;
116 int num_workers;
117 int num_busy_workers;
118 HANDLE compl_port;
119 TP_POOL_STACK_INFORMATION stack_info;
/* Discriminator selecting which member of threadpool_object.u is valid. */
enum threadpool_objtype
{
    TP_OBJECT_TYPE_SIMPLE,  /* u.simple */
    TP_OBJECT_TYPE_WORK,    /* u.work */
    TP_OBJECT_TYPE_TIMER,   /* u.timer */
    TP_OBJECT_TYPE_WAIT,    /* u.wait */
    TP_OBJECT_TYPE_IO,      /* u.io */
};
131 struct io_completion
133 IO_STATUS_BLOCK iosb;
134 ULONG_PTR cvalue;
137 /* internal threadpool object representation */
138 struct threadpool_object
140 void *win32_callback; /* leave space for kernelbase to store win32 callback */
141 LONG refcount;
142 BOOL shutdown;
143 /* read-only information */
144 enum threadpool_objtype type;
145 struct threadpool *pool;
146 struct threadpool_group *group;
147 PVOID userdata;
148 PTP_CLEANUP_GROUP_CANCEL_CALLBACK group_cancel_callback;
149 PTP_SIMPLE_CALLBACK finalization_callback;
150 BOOL may_run_long;
151 HMODULE race_dll;
152 TP_CALLBACK_PRIORITY priority;
153 /* information about the group, locked via .group->cs */
154 struct list group_entry;
155 BOOL is_group_member;
156 /* information about the pool, locked via .pool->cs */
157 struct list pool_entry;
158 RTL_CONDITION_VARIABLE finished_event;
159 RTL_CONDITION_VARIABLE group_finished_event;
160 HANDLE completed_event;
161 LONG num_pending_callbacks;
162 LONG num_running_callbacks;
163 LONG num_associated_callbacks;
164 /* arguments for callback */
165 union
167 struct
169 PTP_SIMPLE_CALLBACK callback;
170 } simple;
171 struct
173 PTP_WORK_CALLBACK callback;
174 } work;
175 struct
177 PTP_TIMER_CALLBACK callback;
178 /* information about the timer, locked via timerqueue.cs */
179 BOOL timer_initialized;
180 BOOL timer_pending;
181 struct list timer_entry;
182 BOOL timer_set;
183 ULONGLONG timeout;
184 LONG period;
185 LONG window_length;
186 } timer;
187 struct
189 PTP_WAIT_CALLBACK callback;
190 LONG signaled;
191 /* information about the wait object, locked via waitqueue.cs */
192 struct waitqueue_bucket *bucket;
193 BOOL wait_pending;
194 struct list wait_entry;
195 ULONGLONG timeout;
196 HANDLE handle;
197 DWORD flags;
198 RTL_WAITORTIMERCALLBACKFUNC rtl_callback;
199 } wait;
200 struct
202 PTP_IO_CALLBACK callback;
203 /* locked via .pool->cs */
204 unsigned int pending_count, completion_count, completion_max;
205 struct io_completion *completions;
206 } io;
207 } u;
210 /* internal threadpool instance representation */
211 struct threadpool_instance
213 struct threadpool_object *object;
214 DWORD threadid;
215 BOOL associated;
216 BOOL may_run_long;
217 struct
219 CRITICAL_SECTION *critical_section;
220 HANDLE mutex;
221 HANDLE semaphore;
222 LONG semaphore_count;
223 HANDLE event;
224 HMODULE library;
225 } cleanup;
228 /* internal threadpool group representation */
229 struct threadpool_group
231 LONG refcount;
232 BOOL shutdown;
233 CRITICAL_SECTION cs;
234 /* list of group members, locked via .cs */
235 struct list members;
238 /* global timerqueue object */
239 static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug;
241 static struct
243 CRITICAL_SECTION cs;
244 LONG objcount;
245 BOOL thread_running;
246 struct list pending_timers;
247 RTL_CONDITION_VARIABLE update_event;
249 timerqueue =
251 { &timerqueue_debug, -1, 0, 0, 0, 0 }, /* cs */
252 0, /* objcount */
253 FALSE, /* thread_running */
254 LIST_INIT( timerqueue.pending_timers ), /* pending_timers */
255 RTL_CONDITION_VARIABLE_INIT /* update_event */
258 static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug =
260 0, 0, &timerqueue.cs,
261 { &timerqueue_debug.ProcessLocksList, &timerqueue_debug.ProcessLocksList },
262 0, 0, { (DWORD_PTR)(__FILE__ ": timerqueue.cs") }
265 /* global waitqueue object */
266 static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug;
268 static struct
270 CRITICAL_SECTION cs;
271 LONG num_buckets;
272 struct list buckets;
274 waitqueue =
276 { &waitqueue_debug, -1, 0, 0, 0, 0 }, /* cs */
277 0, /* num_buckets */
278 LIST_INIT( waitqueue.buckets ) /* buckets */
281 static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug =
283 0, 0, &waitqueue.cs,
284 { &waitqueue_debug.ProcessLocksList, &waitqueue_debug.ProcessLocksList },
285 0, 0, { (DWORD_PTR)(__FILE__ ": waitqueue.cs") }
288 struct waitqueue_bucket
290 struct list bucket_entry;
291 LONG objcount;
292 struct list reserved;
293 struct list waiting;
294 HANDLE update_event;
295 BOOL alertable;
298 /* global I/O completion queue object */
299 static RTL_CRITICAL_SECTION_DEBUG ioqueue_debug;
301 static struct
303 CRITICAL_SECTION cs;
304 LONG objcount;
305 BOOL thread_running;
306 HANDLE port;
307 RTL_CONDITION_VARIABLE update_event;
309 ioqueue =
311 .cs = { &ioqueue_debug, -1, 0, 0, 0, 0 },
314 static RTL_CRITICAL_SECTION_DEBUG ioqueue_debug =
316 0, 0, &ioqueue.cs,
317 { &ioqueue_debug.ProcessLocksList, &ioqueue_debug.ProcessLocksList },
318 0, 0, { (DWORD_PTR)(__FILE__ ": ioqueue.cs") }
321 static inline struct threadpool *impl_from_TP_POOL( TP_POOL *pool )
323 return (struct threadpool *)pool;
326 static inline struct threadpool_object *impl_from_TP_WORK( TP_WORK *work )
328 struct threadpool_object *object = (struct threadpool_object *)work;
329 assert( object->type == TP_OBJECT_TYPE_WORK );
330 return object;
333 static inline struct threadpool_object *impl_from_TP_TIMER( TP_TIMER *timer )
335 struct threadpool_object *object = (struct threadpool_object *)timer;
336 assert( object->type == TP_OBJECT_TYPE_TIMER );
337 return object;
340 static inline struct threadpool_object *impl_from_TP_WAIT( TP_WAIT *wait )
342 struct threadpool_object *object = (struct threadpool_object *)wait;
343 assert( object->type == TP_OBJECT_TYPE_WAIT );
344 return object;
347 static inline struct threadpool_object *impl_from_TP_IO( TP_IO *io )
349 struct threadpool_object *object = (struct threadpool_object *)io;
350 assert( object->type == TP_OBJECT_TYPE_IO );
351 return object;
354 static inline struct threadpool_group *impl_from_TP_CLEANUP_GROUP( TP_CLEANUP_GROUP *group )
356 return (struct threadpool_group *)group;
359 static inline struct threadpool_instance *impl_from_TP_CALLBACK_INSTANCE( TP_CALLBACK_INSTANCE *instance )
361 return (struct threadpool_instance *)instance;
364 static void CALLBACK threadpool_worker_proc( void *param );
365 static void tp_object_submit( struct threadpool_object *object, BOOL signaled );
366 static void tp_object_execute( struct threadpool_object *object, BOOL wait_thread );
367 static void tp_object_prepare_shutdown( struct threadpool_object *object );
368 static BOOL tp_object_release( struct threadpool_object *object );
369 static struct threadpool *default_threadpool = NULL;
371 static BOOL array_reserve(void **elements, unsigned int *capacity, unsigned int count, unsigned int size)
373 unsigned int new_capacity, max_capacity;
374 void *new_elements;
376 if (count <= *capacity)
377 return TRUE;
379 max_capacity = ~(SIZE_T)0 / size;
380 if (count > max_capacity)
381 return FALSE;
383 new_capacity = max(4, *capacity);
384 while (new_capacity < count && new_capacity <= max_capacity / 2)
385 new_capacity *= 2;
386 if (new_capacity < count)
387 new_capacity = max_capacity;
389 if (!(new_elements = RtlReAllocateHeap( GetProcessHeap(), 0, *elements, new_capacity * size )))
390 return FALSE;
392 *elements = new_elements;
393 *capacity = new_capacity;
395 return TRUE;
398 static void CALLBACK process_rtl_work_item( TP_CALLBACK_INSTANCE *instance, void *userdata )
400 struct rtl_work_item *item = userdata;
402 TRACE("executing %p(%p)\n", item->function, item->context);
403 item->function( item->context );
405 RtlFreeHeap( GetProcessHeap(), 0, item );
408 /***********************************************************************
409 * RtlQueueWorkItem (NTDLL.@)
411 * Queues a work item into a thread in the thread pool.
413 * PARAMS
414 * function [I] Work function to execute.
415 * context [I] Context to pass to the work function when it is executed.
416 * flags [I] Flags. See notes.
418 * RETURNS
419 * Success: STATUS_SUCCESS.
420 * Failure: Any NTSTATUS code.
422 * NOTES
423 * Flags can be one or more of the following:
424 *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
425 *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
426 *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
427 *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
428 *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
430 NTSTATUS WINAPI RtlQueueWorkItem( PRTL_WORK_ITEM_ROUTINE function, PVOID context, ULONG flags )
432 TP_CALLBACK_ENVIRON environment;
433 struct rtl_work_item *item;
434 NTSTATUS status;
436 TRACE( "%p %p %u\n", function, context, flags );
438 item = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*item) );
439 if (!item)
440 return STATUS_NO_MEMORY;
442 memset( &environment, 0, sizeof(environment) );
443 environment.Version = 1;
444 environment.u.s.LongFunction = (flags & WT_EXECUTELONGFUNCTION) != 0;
445 environment.u.s.Persistent = (flags & WT_EXECUTEINPERSISTENTTHREAD) != 0;
447 item->function = function;
448 item->context = context;
450 status = TpSimpleTryPost( process_rtl_work_item, item, &environment );
451 if (status) RtlFreeHeap( GetProcessHeap(), 0, item );
452 return status;
455 /***********************************************************************
456 * iocp_poller - get completion events and run callbacks
458 static DWORD CALLBACK iocp_poller(LPVOID Arg)
460 HANDLE cport = Arg;
462 while( TRUE )
464 PRTL_OVERLAPPED_COMPLETION_ROUTINE callback;
465 LPVOID overlapped;
466 IO_STATUS_BLOCK iosb;
467 NTSTATUS res = NtRemoveIoCompletion( cport, (PULONG_PTR)&callback, (PULONG_PTR)&overlapped, &iosb, NULL );
468 if (res)
470 ERR("NtRemoveIoCompletion failed: 0x%x\n", res);
472 else
474 DWORD transferred = 0;
475 DWORD err = 0;
477 if (iosb.u.Status == STATUS_SUCCESS)
478 transferred = iosb.Information;
479 else
480 err = RtlNtStatusToDosError(iosb.u.Status);
482 callback( err, transferred, overlapped );
485 return 0;
488 /***********************************************************************
489 * RtlSetIoCompletionCallback (NTDLL.@)
491 * Binds a handle to a thread pool's completion port, and possibly
492 * starts a non-I/O thread to monitor this port and call functions back.
494 * PARAMS
495 * FileHandle [I] Handle to bind to a completion port.
496 * Function [I] Callback function to call on I/O completions.
497 * Flags [I] Not used.
499 * RETURNS
500 * Success: STATUS_SUCCESS.
501 * Failure: Any NTSTATUS code.
504 NTSTATUS WINAPI RtlSetIoCompletionCallback(HANDLE FileHandle, PRTL_OVERLAPPED_COMPLETION_ROUTINE Function, ULONG Flags)
506 IO_STATUS_BLOCK iosb;
507 FILE_COMPLETION_INFORMATION info;
509 if (Flags) FIXME("Unknown value Flags=0x%x\n", Flags);
511 if (!old_threadpool.compl_port)
513 NTSTATUS res = STATUS_SUCCESS;
515 RtlEnterCriticalSection(&old_threadpool.threadpool_compl_cs);
516 if (!old_threadpool.compl_port)
518 HANDLE cport;
520 res = NtCreateIoCompletion( &cport, IO_COMPLETION_ALL_ACCESS, NULL, 0 );
521 if (!res)
523 /* FIXME native can start additional threads in case of e.g. hung callback function. */
524 res = RtlQueueWorkItem( iocp_poller, cport, WT_EXECUTEDEFAULT );
525 if (!res)
526 old_threadpool.compl_port = cport;
527 else
528 NtClose( cport );
531 RtlLeaveCriticalSection(&old_threadpool.threadpool_compl_cs);
532 if (res) return res;
535 info.CompletionPort = old_threadpool.compl_port;
536 info.CompletionKey = (ULONG_PTR)Function;
538 return NtSetInformationFile( FileHandle, &iosb, &info, sizeof(info), FileCompletionInformation );
541 static inline PLARGE_INTEGER get_nt_timeout( PLARGE_INTEGER pTime, ULONG timeout )
543 if (timeout == INFINITE) return NULL;
544 pTime->QuadPart = (ULONGLONG)timeout * -10000;
545 return pTime;
549 /************************** Timer Queue Impl **************************/
551 static void queue_remove_timer(struct queue_timer *t)
553 /* We MUST hold the queue cs while calling this function. This ensures
554 that we cannot queue another callback for this timer. The runcount
555 being zero makes sure we don't have any already queued. */
556 struct timer_queue *q = t->q;
558 assert(t->runcount == 0);
559 assert(t->destroy);
561 list_remove(&t->entry);
562 if (t->event)
563 NtSetEvent(t->event, NULL);
564 RtlFreeHeap(GetProcessHeap(), 0, t);
566 if (q->quit && list_empty(&q->timers))
567 NtSetEvent(q->event, NULL);
570 static void timer_cleanup_callback(struct queue_timer *t)
572 struct timer_queue *q = t->q;
573 RtlEnterCriticalSection(&q->cs);
575 assert(0 < t->runcount);
576 --t->runcount;
578 if (t->destroy && t->runcount == 0)
579 queue_remove_timer(t);
581 RtlLeaveCriticalSection(&q->cs);
584 static DWORD WINAPI timer_callback_wrapper(LPVOID p)
586 struct queue_timer *t = p;
587 t->callback(t->param, TRUE);
588 timer_cleanup_callback(t);
589 return 0;
592 static inline ULONGLONG queue_current_time(void)
594 LARGE_INTEGER now, freq;
595 NtQueryPerformanceCounter(&now, &freq);
596 return now.QuadPart * 1000 / freq.QuadPart;
599 static void queue_add_timer(struct queue_timer *t, ULONGLONG time,
600 BOOL set_event)
602 /* We MUST hold the queue cs while calling this function. */
603 struct timer_queue *q = t->q;
604 struct list *ptr = &q->timers;
606 assert(!q->quit || (t->destroy && time == EXPIRE_NEVER));
608 if (time != EXPIRE_NEVER)
609 LIST_FOR_EACH(ptr, &q->timers)
611 struct queue_timer *cur = LIST_ENTRY(ptr, struct queue_timer, entry);
612 if (time < cur->expire)
613 break;
615 list_add_before(ptr, &t->entry);
617 t->expire = time;
619 /* If we insert at the head of the list, we need to expire sooner
620 than expected. */
621 if (set_event && &t->entry == list_head(&q->timers))
622 NtSetEvent(q->event, NULL);
625 static inline void queue_move_timer(struct queue_timer *t, ULONGLONG time,
626 BOOL set_event)
628 /* We MUST hold the queue cs while calling this function. */
629 list_remove(&t->entry);
630 queue_add_timer(t, time, set_event);
633 static void queue_timer_expire(struct timer_queue *q)
635 struct queue_timer *t = NULL;
637 RtlEnterCriticalSection(&q->cs);
638 if (list_head(&q->timers))
640 ULONGLONG now, next;
641 t = LIST_ENTRY(list_head(&q->timers), struct queue_timer, entry);
642 if (!t->destroy && t->expire <= ((now = queue_current_time())))
644 ++t->runcount;
645 if (t->period)
647 next = t->expire + t->period;
648 /* avoid trigger cascade if overloaded / hibernated */
649 if (next < now)
650 next = now + t->period;
652 else
653 next = EXPIRE_NEVER;
654 queue_move_timer(t, next, FALSE);
656 else
657 t = NULL;
659 RtlLeaveCriticalSection(&q->cs);
661 if (t)
663 if (t->flags & WT_EXECUTEINTIMERTHREAD)
664 timer_callback_wrapper(t);
665 else
667 ULONG flags
668 = (t->flags
669 & (WT_EXECUTEINIOTHREAD | WT_EXECUTEINPERSISTENTTHREAD
670 | WT_EXECUTELONGFUNCTION | WT_TRANSFER_IMPERSONATION));
671 NTSTATUS status = RtlQueueWorkItem(timer_callback_wrapper, t, flags);
672 if (status != STATUS_SUCCESS)
673 timer_cleanup_callback(t);
678 static ULONG queue_get_timeout(struct timer_queue *q)
680 struct queue_timer *t;
681 ULONG timeout = INFINITE;
683 RtlEnterCriticalSection(&q->cs);
684 if (list_head(&q->timers))
686 t = LIST_ENTRY(list_head(&q->timers), struct queue_timer, entry);
687 assert(!t->destroy || t->expire == EXPIRE_NEVER);
689 if (t->expire != EXPIRE_NEVER)
691 ULONGLONG time = queue_current_time();
692 timeout = t->expire < time ? 0 : t->expire - time;
695 RtlLeaveCriticalSection(&q->cs);
697 return timeout;
700 static void WINAPI timer_queue_thread_proc(LPVOID p)
702 struct timer_queue *q = p;
703 ULONG timeout_ms;
705 timeout_ms = INFINITE;
706 for (;;)
708 LARGE_INTEGER timeout;
709 NTSTATUS status;
710 BOOL done = FALSE;
712 status = NtWaitForSingleObject(
713 q->event, FALSE, get_nt_timeout(&timeout, timeout_ms));
715 if (status == STATUS_WAIT_0)
717 /* There are two possible ways to trigger the event. Either
718 we are quitting and the last timer got removed, or a new
719 timer got put at the head of the list so we need to adjust
720 our timeout. */
721 RtlEnterCriticalSection(&q->cs);
722 if (q->quit && list_empty(&q->timers))
723 done = TRUE;
724 RtlLeaveCriticalSection(&q->cs);
726 else if (status == STATUS_TIMEOUT)
727 queue_timer_expire(q);
729 if (done)
730 break;
732 timeout_ms = queue_get_timeout(q);
735 NtClose(q->event);
736 RtlDeleteCriticalSection(&q->cs);
737 q->magic = 0;
738 RtlFreeHeap(GetProcessHeap(), 0, q);
739 RtlExitUserThread( 0 );
742 static void queue_destroy_timer(struct queue_timer *t)
744 /* We MUST hold the queue cs while calling this function. */
745 t->destroy = TRUE;
746 if (t->runcount == 0)
747 /* Ensure a timer is promptly removed. If callbacks are pending,
748 it will be removed after the last one finishes by the callback
749 cleanup wrapper. */
750 queue_remove_timer(t);
751 else
752 /* Make sure no destroyed timer masks an active timer at the head
753 of the sorted list. */
754 queue_move_timer(t, EXPIRE_NEVER, FALSE);
757 /***********************************************************************
758 * RtlCreateTimerQueue (NTDLL.@)
760 * Creates a timer queue object and returns a handle to it.
762 * PARAMS
763 * NewTimerQueue [O] The newly created queue.
765 * RETURNS
766 * Success: STATUS_SUCCESS.
767 * Failure: Any NTSTATUS code.
769 NTSTATUS WINAPI RtlCreateTimerQueue(PHANDLE NewTimerQueue)
771 NTSTATUS status;
772 struct timer_queue *q = RtlAllocateHeap(GetProcessHeap(), 0, sizeof *q);
773 if (!q)
774 return STATUS_NO_MEMORY;
776 RtlInitializeCriticalSection(&q->cs);
777 list_init(&q->timers);
778 q->quit = FALSE;
779 q->magic = TIMER_QUEUE_MAGIC;
780 status = NtCreateEvent(&q->event, EVENT_ALL_ACCESS, NULL, SynchronizationEvent, FALSE);
781 if (status != STATUS_SUCCESS)
783 RtlFreeHeap(GetProcessHeap(), 0, q);
784 return status;
786 status = RtlCreateUserThread(GetCurrentProcess(), NULL, FALSE, NULL, 0, 0,
787 timer_queue_thread_proc, q, &q->thread, NULL);
788 if (status != STATUS_SUCCESS)
790 NtClose(q->event);
791 RtlFreeHeap(GetProcessHeap(), 0, q);
792 return status;
795 *NewTimerQueue = q;
796 return STATUS_SUCCESS;
799 /***********************************************************************
800 * RtlDeleteTimerQueueEx (NTDLL.@)
802 * Deletes a timer queue object.
804 * PARAMS
805 * TimerQueue [I] The timer queue to destroy.
806 * CompletionEvent [I] If NULL, return immediately. If INVALID_HANDLE_VALUE,
807 * wait until all timers are finished firing before
808 * returning. Otherwise, return immediately and set the
809 * event when all timers are done.
811 * RETURNS
812 * Success: STATUS_SUCCESS if synchronous, STATUS_PENDING if not.
813 * Failure: Any NTSTATUS code.
815 NTSTATUS WINAPI RtlDeleteTimerQueueEx(HANDLE TimerQueue, HANDLE CompletionEvent)
817 struct timer_queue *q = TimerQueue;
818 struct queue_timer *t, *temp;
819 HANDLE thread;
820 NTSTATUS status;
822 if (!q || q->magic != TIMER_QUEUE_MAGIC)
823 return STATUS_INVALID_HANDLE;
825 thread = q->thread;
827 RtlEnterCriticalSection(&q->cs);
828 q->quit = TRUE;
829 if (list_head(&q->timers))
830 /* When the last timer is removed, it will signal the timer thread to
831 exit... */
832 LIST_FOR_EACH_ENTRY_SAFE(t, temp, &q->timers, struct queue_timer, entry)
833 queue_destroy_timer(t);
834 else
835 /* However if we have none, we must do it ourselves. */
836 NtSetEvent(q->event, NULL);
837 RtlLeaveCriticalSection(&q->cs);
839 if (CompletionEvent == INVALID_HANDLE_VALUE)
841 NtWaitForSingleObject(thread, FALSE, NULL);
842 status = STATUS_SUCCESS;
844 else
846 if (CompletionEvent)
848 FIXME("asynchronous return on completion event unimplemented\n");
849 NtWaitForSingleObject(thread, FALSE, NULL);
850 NtSetEvent(CompletionEvent, NULL);
852 status = STATUS_PENDING;
855 NtClose(thread);
856 return status;
859 static struct timer_queue *get_timer_queue(HANDLE TimerQueue)
861 static struct timer_queue *default_timer_queue;
863 if (TimerQueue)
864 return TimerQueue;
865 else
867 if (!default_timer_queue)
869 HANDLE q;
870 NTSTATUS status = RtlCreateTimerQueue(&q);
871 if (status == STATUS_SUCCESS)
873 PVOID p = InterlockedCompareExchangePointer( (void **) &default_timer_queue, q, NULL );
874 if (p)
875 /* Got beat to the punch. */
876 RtlDeleteTimerQueueEx(q, NULL);
879 return default_timer_queue;
883 /***********************************************************************
884 * RtlCreateTimer (NTDLL.@)
886 * Creates a new timer associated with the given queue.
888 * PARAMS
889 * NewTimer [O] The newly created timer.
890 * TimerQueue [I] The queue to hold the timer.
891 * Callback [I] The callback to fire.
892 * Parameter [I] The argument for the callback.
893 * DueTime [I] The delay, in milliseconds, before first firing the
894 * timer.
895 * Period [I] The period, in milliseconds, at which to fire the timer
896 * after the first callback. If zero, the timer will only
897 * fire once. It still needs to be deleted with
898 * RtlDeleteTimer.
899 * Flags [I] Flags controlling the execution of the callback. In
900 * addition to the WT_* thread pool flags (see
901 * RtlQueueWorkItem), WT_EXECUTEINTIMERTHREAD and
902 * WT_EXECUTEONLYONCE are supported.
904 * RETURNS
905 * Success: STATUS_SUCCESS.
906 * Failure: Any NTSTATUS code.
908 NTSTATUS WINAPI RtlCreateTimer(PHANDLE NewTimer, HANDLE TimerQueue,
909 RTL_WAITORTIMERCALLBACKFUNC Callback,
910 PVOID Parameter, DWORD DueTime, DWORD Period,
911 ULONG Flags)
913 NTSTATUS status;
914 struct queue_timer *t;
915 struct timer_queue *q = get_timer_queue(TimerQueue);
917 if (!q) return STATUS_NO_MEMORY;
918 if (q->magic != TIMER_QUEUE_MAGIC) return STATUS_INVALID_HANDLE;
920 t = RtlAllocateHeap(GetProcessHeap(), 0, sizeof *t);
921 if (!t)
922 return STATUS_NO_MEMORY;
924 t->q = q;
925 t->runcount = 0;
926 t->callback = Callback;
927 t->param = Parameter;
928 t->period = Period;
929 t->flags = Flags;
930 t->destroy = FALSE;
931 t->event = NULL;
933 status = STATUS_SUCCESS;
934 RtlEnterCriticalSection(&q->cs);
935 if (q->quit)
936 status = STATUS_INVALID_HANDLE;
937 else
938 queue_add_timer(t, queue_current_time() + DueTime, TRUE);
939 RtlLeaveCriticalSection(&q->cs);
941 if (status == STATUS_SUCCESS)
942 *NewTimer = t;
943 else
944 RtlFreeHeap(GetProcessHeap(), 0, t);
946 return status;
949 /***********************************************************************
950 * RtlUpdateTimer (NTDLL.@)
952 * Changes the time at which a timer expires.
954 * PARAMS
955 * TimerQueue [I] The queue that holds the timer.
956 * Timer [I] The timer to update.
957 * DueTime [I] The delay, in milliseconds, before next firing the timer.
958 * Period [I] The period, in milliseconds, at which to fire the timer
959 * after the first callback. If zero, the timer will not
960 * refire once. It still needs to be deleted with
961 * RtlDeleteTimer.
963 * RETURNS
964 * Success: STATUS_SUCCESS.
965 * Failure: Any NTSTATUS code.
967 NTSTATUS WINAPI RtlUpdateTimer(HANDLE TimerQueue, HANDLE Timer,
968 DWORD DueTime, DWORD Period)
970 struct queue_timer *t = Timer;
971 struct timer_queue *q = t->q;
973 RtlEnterCriticalSection(&q->cs);
974 /* Can't change a timer if it was once-only or destroyed. */
975 if (t->expire != EXPIRE_NEVER)
977 t->period = Period;
978 queue_move_timer(t, queue_current_time() + DueTime, TRUE);
980 RtlLeaveCriticalSection(&q->cs);
982 return STATUS_SUCCESS;
985 /***********************************************************************
986 * RtlDeleteTimer (NTDLL.@)
988 * Cancels a timer-queue timer.
990 * PARAMS
991 * TimerQueue [I] The queue that holds the timer.
992 * Timer [I] The timer to update.
993 * CompletionEvent [I] If NULL, return immediately. If INVALID_HANDLE_VALUE,
994 * wait until the timer is finished firing all pending
995 * callbacks before returning. Otherwise, return
996 * immediately and set the timer is done.
998 * RETURNS
999 * Success: STATUS_SUCCESS if the timer is done, STATUS_PENDING if not,
1000 or if the completion event is NULL.
1001 * Failure: Any NTSTATUS code.
1003 NTSTATUS WINAPI RtlDeleteTimer(HANDLE TimerQueue, HANDLE Timer,
1004 HANDLE CompletionEvent)
1006 struct queue_timer *t = Timer;
1007 struct timer_queue *q;
1008 NTSTATUS status = STATUS_PENDING;
1009 HANDLE event = NULL;
1011 if (!Timer)
1012 return STATUS_INVALID_PARAMETER_1;
1013 q = t->q;
1014 if (CompletionEvent == INVALID_HANDLE_VALUE)
1016 status = NtCreateEvent(&event, EVENT_ALL_ACCESS, NULL, SynchronizationEvent, FALSE);
1017 if (status == STATUS_SUCCESS)
1018 status = STATUS_PENDING;
1020 else if (CompletionEvent)
1021 event = CompletionEvent;
1023 RtlEnterCriticalSection(&q->cs);
1024 t->event = event;
1025 if (t->runcount == 0 && event)
1026 status = STATUS_SUCCESS;
1027 queue_destroy_timer(t);
1028 RtlLeaveCriticalSection(&q->cs);
1030 if (CompletionEvent == INVALID_HANDLE_VALUE && event)
1032 if (status == STATUS_PENDING)
1034 NtWaitForSingleObject(event, FALSE, NULL);
1035 status = STATUS_SUCCESS;
1037 NtClose(event);
1040 return status;
1043 /***********************************************************************
1044 * timerqueue_thread_proc (internal)
1046 static void CALLBACK timerqueue_thread_proc( void *param )
1048 ULONGLONG timeout_lower, timeout_upper, new_timeout;
1049 struct threadpool_object *other_timer;
1050 LARGE_INTEGER now, timeout;
1051 struct list *ptr;
1053 TRACE( "starting timer queue thread\n" );
1055 RtlEnterCriticalSection( &timerqueue.cs );
1056 for (;;)
1058 NtQuerySystemTime( &now );
1060 /* Check for expired timers. */
1061 while ((ptr = list_head( &timerqueue.pending_timers )))
1063 struct threadpool_object *timer = LIST_ENTRY( ptr, struct threadpool_object, u.timer.timer_entry );
1064 assert( timer->type == TP_OBJECT_TYPE_TIMER );
1065 assert( timer->u.timer.timer_pending );
1066 if (timer->u.timer.timeout > now.QuadPart)
1067 break;
1069 /* Queue a new callback in one of the worker threads. */
1070 list_remove( &timer->u.timer.timer_entry );
1071 timer->u.timer.timer_pending = FALSE;
1072 tp_object_submit( timer, FALSE );
1074 /* Insert the timer back into the queue, except it's marked for shutdown. */
1075 if (timer->u.timer.period && !timer->shutdown)
1077 timer->u.timer.timeout += (ULONGLONG)timer->u.timer.period * 10000;
1078 if (timer->u.timer.timeout <= now.QuadPart)
1079 timer->u.timer.timeout = now.QuadPart + 1;
1081 LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
1082 struct threadpool_object, u.timer.timer_entry )
1084 assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
1085 if (timer->u.timer.timeout < other_timer->u.timer.timeout)
1086 break;
1088 list_add_before( &other_timer->u.timer.timer_entry, &timer->u.timer.timer_entry );
1089 timer->u.timer.timer_pending = TRUE;
1093 timeout_lower = timeout_upper = MAXLONGLONG;
1095 /* Determine next timeout and use the window length to optimize wakeup times. */
1096 LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
1097 struct threadpool_object, u.timer.timer_entry )
1099 assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
1100 if (other_timer->u.timer.timeout >= timeout_upper)
1101 break;
1103 timeout_lower = other_timer->u.timer.timeout;
1104 new_timeout = timeout_lower + (ULONGLONG)other_timer->u.timer.window_length * 10000;
1105 if (new_timeout < timeout_upper)
1106 timeout_upper = new_timeout;
1109 /* Wait for timer update events or until the next timer expires. */
1110 if (timerqueue.objcount)
1112 timeout.QuadPart = timeout_lower;
1113 RtlSleepConditionVariableCS( &timerqueue.update_event, &timerqueue.cs, &timeout );
1114 continue;
1117 /* All timers have been destroyed, if no new timers are created
1118 * within some amount of time, then we can shutdown this thread. */
1119 timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
1120 if (RtlSleepConditionVariableCS( &timerqueue.update_event, &timerqueue.cs,
1121 &timeout ) == STATUS_TIMEOUT && !timerqueue.objcount)
1123 break;
1127 timerqueue.thread_running = FALSE;
1128 RtlLeaveCriticalSection( &timerqueue.cs );
1130 TRACE( "terminating timer queue thread\n" );
1131 RtlExitUserThread( 0 );
1134 /***********************************************************************
1135 * tp_new_worker_thread (internal)
1137 * Create and account a new worker thread for the desired pool.
1139 static NTSTATUS tp_new_worker_thread( struct threadpool *pool )
1141 HANDLE thread;
1142 NTSTATUS status;
1144 status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, NULL, 0, 0,
1145 threadpool_worker_proc, pool, &thread, NULL );
1146 if (status == STATUS_SUCCESS)
1148 InterlockedIncrement( &pool->refcount );
1149 pool->num_workers++;
1150 NtClose( thread );
1152 return status;
1155 /***********************************************************************
1156 * tp_timerqueue_lock (internal)
1158 * Acquires a lock on the global timerqueue. When the lock is acquired
1159 * successfully, it is guaranteed that the timer thread is running.
1161 static NTSTATUS tp_timerqueue_lock( struct threadpool_object *timer )
1163 NTSTATUS status = STATUS_SUCCESS;
1164 assert( timer->type == TP_OBJECT_TYPE_TIMER );
1166 timer->u.timer.timer_initialized = FALSE;
1167 timer->u.timer.timer_pending = FALSE;
1168 timer->u.timer.timer_set = FALSE;
1169 timer->u.timer.timeout = 0;
1170 timer->u.timer.period = 0;
1171 timer->u.timer.window_length = 0;
1173 RtlEnterCriticalSection( &timerqueue.cs );
1175 /* Make sure that the timerqueue thread is running. */
1176 if (!timerqueue.thread_running)
1178 HANDLE thread;
1179 status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, NULL, 0, 0,
1180 timerqueue_thread_proc, NULL, &thread, NULL );
1181 if (status == STATUS_SUCCESS)
1183 timerqueue.thread_running = TRUE;
1184 NtClose( thread );
1188 if (status == STATUS_SUCCESS)
1190 timer->u.timer.timer_initialized = TRUE;
1191 timerqueue.objcount++;
1194 RtlLeaveCriticalSection( &timerqueue.cs );
1195 return status;
1198 /***********************************************************************
1199 * tp_timerqueue_unlock (internal)
1201 * Releases a lock on the global timerqueue.
1203 static void tp_timerqueue_unlock( struct threadpool_object *timer )
1205 assert( timer->type == TP_OBJECT_TYPE_TIMER );
1207 RtlEnterCriticalSection( &timerqueue.cs );
1208 if (timer->u.timer.timer_initialized)
1210 /* If timer was pending, remove it. */
1211 if (timer->u.timer.timer_pending)
1213 list_remove( &timer->u.timer.timer_entry );
1214 timer->u.timer.timer_pending = FALSE;
1217 /* If the last timer object was destroyed, then wake up the thread. */
1218 if (!--timerqueue.objcount)
1220 assert( list_empty( &timerqueue.pending_timers ) );
1221 RtlWakeAllConditionVariable( &timerqueue.update_event );
1224 timer->u.timer.timer_initialized = FALSE;
1226 RtlLeaveCriticalSection( &timerqueue.cs );
1229 /***********************************************************************
1230 * waitqueue_thread_proc (internal)
1232 static void CALLBACK waitqueue_thread_proc( void *param )
1234 struct threadpool_object *objects[MAXIMUM_WAITQUEUE_OBJECTS];
1235 HANDLE handles[MAXIMUM_WAITQUEUE_OBJECTS + 1];
1236 struct waitqueue_bucket *bucket = param;
1237 struct threadpool_object *wait, *next;
1238 LARGE_INTEGER now, timeout;
1239 DWORD num_handles;
1240 NTSTATUS status;
1242 TRACE( "starting wait queue thread\n" );
1244 RtlEnterCriticalSection( &waitqueue.cs );
1246 for (;;)
1248 NtQuerySystemTime( &now );
1249 timeout.QuadPart = MAXLONGLONG;
1250 num_handles = 0;
1252 LIST_FOR_EACH_ENTRY_SAFE( wait, next, &bucket->waiting, struct threadpool_object,
1253 u.wait.wait_entry )
1255 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1256 if (wait->u.wait.timeout <= now.QuadPart)
1258 /* Wait object timed out. */
1259 if ((wait->u.wait.flags & WT_EXECUTEONLYONCE))
1261 list_remove( &wait->u.wait.wait_entry );
1262 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1264 if ((wait->u.wait.flags & (WT_EXECUTEINWAITTHREAD | WT_EXECUTEINIOTHREAD)))
1266 InterlockedIncrement( &wait->refcount );
1267 wait->num_pending_callbacks++;
1268 RtlEnterCriticalSection( &wait->pool->cs );
1269 tp_object_execute( wait, TRUE );
1270 RtlLeaveCriticalSection( &wait->pool->cs );
1271 tp_object_release( wait );
1273 else tp_object_submit( wait, FALSE );
1275 else
1277 if (wait->u.wait.timeout < timeout.QuadPart)
1278 timeout.QuadPart = wait->u.wait.timeout;
1280 assert( num_handles < MAXIMUM_WAITQUEUE_OBJECTS );
1281 InterlockedIncrement( &wait->refcount );
1282 objects[num_handles] = wait;
1283 handles[num_handles] = wait->u.wait.handle;
1284 num_handles++;
1288 if (!bucket->objcount)
1290 /* All wait objects have been destroyed, if no new wait objects are created
1291 * within some amount of time, then we can shutdown this thread. */
1292 assert( num_handles == 0 );
1293 RtlLeaveCriticalSection( &waitqueue.cs );
1294 timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
1295 status = NtWaitForMultipleObjects( 1, &bucket->update_event, TRUE, bucket->alertable, &timeout );
1296 RtlEnterCriticalSection( &waitqueue.cs );
1298 if (status == STATUS_TIMEOUT && !bucket->objcount)
1299 break;
1301 else
1303 handles[num_handles] = bucket->update_event;
1304 RtlLeaveCriticalSection( &waitqueue.cs );
1305 status = NtWaitForMultipleObjects( num_handles + 1, handles, TRUE, bucket->alertable, &timeout );
1306 RtlEnterCriticalSection( &waitqueue.cs );
1308 if (status >= STATUS_WAIT_0 && status < STATUS_WAIT_0 + num_handles)
1310 wait = objects[status - STATUS_WAIT_0];
1311 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1312 if (wait->u.wait.bucket)
1314 /* Wait object signaled. */
1315 assert( wait->u.wait.bucket == bucket );
1316 if ((wait->u.wait.flags & WT_EXECUTEONLYONCE))
1318 list_remove( &wait->u.wait.wait_entry );
1319 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1321 if ((wait->u.wait.flags & (WT_EXECUTEINWAITTHREAD | WT_EXECUTEINIOTHREAD)))
1323 wait->u.wait.signaled++;
1324 wait->num_pending_callbacks++;
1325 RtlEnterCriticalSection( &wait->pool->cs );
1326 tp_object_execute( wait, TRUE );
1327 RtlLeaveCriticalSection( &wait->pool->cs );
1329 else tp_object_submit( wait, TRUE );
1331 else
1332 WARN("wait object %p triggered while object was destroyed\n", wait);
1335 /* Release temporary references to wait objects. */
1336 while (num_handles)
1338 wait = objects[--num_handles];
1339 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1340 tp_object_release( wait );
1344 /* Try to merge bucket with other threads. */
1345 if (waitqueue.num_buckets > 1 && bucket->objcount &&
1346 bucket->objcount <= MAXIMUM_WAITQUEUE_OBJECTS * 1 / 3)
1348 struct waitqueue_bucket *other_bucket;
1349 LIST_FOR_EACH_ENTRY( other_bucket, &waitqueue.buckets, struct waitqueue_bucket, bucket_entry )
1351 if (other_bucket != bucket && other_bucket->objcount && other_bucket->alertable == bucket->alertable &&
1352 other_bucket->objcount + bucket->objcount <= MAXIMUM_WAITQUEUE_OBJECTS * 2 / 3)
1354 other_bucket->objcount += bucket->objcount;
1355 bucket->objcount = 0;
1357 /* Update reserved list. */
1358 LIST_FOR_EACH_ENTRY( wait, &bucket->reserved, struct threadpool_object, u.wait.wait_entry )
1360 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1361 wait->u.wait.bucket = other_bucket;
1363 list_move_tail( &other_bucket->reserved, &bucket->reserved );
1365 /* Update waiting list. */
1366 LIST_FOR_EACH_ENTRY( wait, &bucket->waiting, struct threadpool_object, u.wait.wait_entry )
1368 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1369 wait->u.wait.bucket = other_bucket;
1371 list_move_tail( &other_bucket->waiting, &bucket->waiting );
1373 /* Move bucket to the end, to keep the probability of
1374 * newly added wait objects as small as possible. */
1375 list_remove( &bucket->bucket_entry );
1376 list_add_tail( &waitqueue.buckets, &bucket->bucket_entry );
1378 NtSetEvent( other_bucket->update_event, NULL );
1379 break;
1385 /* Remove this bucket from the list. */
1386 list_remove( &bucket->bucket_entry );
1387 if (!--waitqueue.num_buckets)
1388 assert( list_empty( &waitqueue.buckets ) );
1390 RtlLeaveCriticalSection( &waitqueue.cs );
1392 TRACE( "terminating wait queue thread\n" );
1394 assert( bucket->objcount == 0 );
1395 assert( list_empty( &bucket->reserved ) );
1396 assert( list_empty( &bucket->waiting ) );
1397 NtClose( bucket->update_event );
1399 RtlFreeHeap( GetProcessHeap(), 0, bucket );
1400 RtlExitUserThread( 0 );
1403 /***********************************************************************
1404 * tp_waitqueue_lock (internal)
1406 static NTSTATUS tp_waitqueue_lock( struct threadpool_object *wait )
1408 struct waitqueue_bucket *bucket;
1409 NTSTATUS status;
1410 HANDLE thread;
1411 BOOL alertable = (wait->u.wait.flags & WT_EXECUTEINIOTHREAD) != 0;
1412 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1414 wait->u.wait.signaled = 0;
1415 wait->u.wait.bucket = NULL;
1416 wait->u.wait.wait_pending = FALSE;
1417 wait->u.wait.timeout = 0;
1418 wait->u.wait.handle = INVALID_HANDLE_VALUE;
1420 RtlEnterCriticalSection( &waitqueue.cs );
1422 /* Try to assign to existing bucket if possible. */
1423 LIST_FOR_EACH_ENTRY( bucket, &waitqueue.buckets, struct waitqueue_bucket, bucket_entry )
1425 if (bucket->objcount < MAXIMUM_WAITQUEUE_OBJECTS && bucket->alertable == alertable)
1427 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1428 wait->u.wait.bucket = bucket;
1429 bucket->objcount++;
1431 status = STATUS_SUCCESS;
1432 goto out;
1436 /* Create a new bucket and corresponding worker thread. */
1437 bucket = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*bucket) );
1438 if (!bucket)
1440 status = STATUS_NO_MEMORY;
1441 goto out;
1444 bucket->objcount = 0;
1445 bucket->alertable = alertable;
1446 list_init( &bucket->reserved );
1447 list_init( &bucket->waiting );
1449 status = NtCreateEvent( &bucket->update_event, EVENT_ALL_ACCESS,
1450 NULL, SynchronizationEvent, FALSE );
1451 if (status)
1453 RtlFreeHeap( GetProcessHeap(), 0, bucket );
1454 goto out;
1457 status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, NULL, 0, 0,
1458 waitqueue_thread_proc, bucket, &thread, NULL );
1459 if (status == STATUS_SUCCESS)
1461 list_add_tail( &waitqueue.buckets, &bucket->bucket_entry );
1462 waitqueue.num_buckets++;
1464 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1465 wait->u.wait.bucket = bucket;
1466 bucket->objcount++;
1468 NtClose( thread );
1470 else
1472 NtClose( bucket->update_event );
1473 RtlFreeHeap( GetProcessHeap(), 0, bucket );
1476 out:
1477 RtlLeaveCriticalSection( &waitqueue.cs );
1478 return status;
1481 /***********************************************************************
1482 * tp_waitqueue_unlock (internal)
1484 static void tp_waitqueue_unlock( struct threadpool_object *wait )
1486 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1488 RtlEnterCriticalSection( &waitqueue.cs );
1489 if (wait->u.wait.bucket)
1491 struct waitqueue_bucket *bucket = wait->u.wait.bucket;
1492 assert( bucket->objcount > 0 );
1494 list_remove( &wait->u.wait.wait_entry );
1495 wait->u.wait.bucket = NULL;
1496 bucket->objcount--;
1498 NtSetEvent( bucket->update_event, NULL );
1500 RtlLeaveCriticalSection( &waitqueue.cs );
1503 static void CALLBACK ioqueue_thread_proc( void *param )
1505 struct io_completion *completion;
1506 struct threadpool_object *io;
1507 IO_STATUS_BLOCK iosb;
1508 ULONG_PTR key, value;
1509 NTSTATUS status;
1511 TRACE( "starting I/O completion thread\n" );
1513 RtlEnterCriticalSection( &ioqueue.cs );
1515 for (;;)
1517 RtlLeaveCriticalSection( &ioqueue.cs );
1518 if ((status = NtRemoveIoCompletion( ioqueue.port, &key, &value, &iosb, NULL )))
1519 ERR("NtRemoveIoCompletion failed, status %#x.\n", status);
1520 RtlEnterCriticalSection( &ioqueue.cs );
1522 if (key)
1524 io = (struct threadpool_object *)key;
1526 RtlEnterCriticalSection( &io->pool->cs );
1528 if (!array_reserve((void **)&io->u.io.completions, &io->u.io.completion_max,
1529 io->u.io.completion_count + 1, sizeof(*io->u.io.completions)))
1531 ERR("Failed to allocate memory.\n");
1532 RtlLeaveCriticalSection( &io->pool->cs );
1533 continue;
1536 completion = &io->u.io.completions[io->u.io.completion_count++];
1537 completion->iosb = iosb;
1538 completion->cvalue = value;
1540 tp_object_submit( io, FALSE );
1542 RtlLeaveCriticalSection( &io->pool->cs );
1545 if (!ioqueue.objcount)
1547 /* All I/O objects have been destroyed; if no new objects are
1548 * created within some amount of time, then we can shutdown this
1549 * thread. */
1550 LARGE_INTEGER timeout = {.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000};
1551 if (RtlSleepConditionVariableCS( &ioqueue.update_event, &ioqueue.cs,
1552 &timeout) == STATUS_TIMEOUT && !ioqueue.objcount)
1553 break;
1557 RtlLeaveCriticalSection( &ioqueue.cs );
1559 TRACE( "terminating I/O completion thread\n" );
1561 RtlExitUserThread( 0 );
1564 static NTSTATUS tp_ioqueue_lock( struct threadpool_object *io, HANDLE file )
1566 NTSTATUS status = STATUS_SUCCESS;
1568 assert( io->type == TP_OBJECT_TYPE_IO );
1570 RtlEnterCriticalSection( &ioqueue.cs );
1572 if (!ioqueue.port && (status = NtCreateIoCompletion( &ioqueue.port,
1573 IO_COMPLETION_ALL_ACCESS, NULL, 0 )))
1575 RtlLeaveCriticalSection( &ioqueue.cs );
1576 return status;
1579 if (!ioqueue.thread_running)
1581 HANDLE thread;
1583 if (!(status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE,
1584 NULL, 0, 0, ioqueue_thread_proc, NULL, &thread, NULL )))
1586 ioqueue.thread_running = TRUE;
1587 NtClose( thread );
1591 if (status == STATUS_SUCCESS)
1593 FILE_COMPLETION_INFORMATION info;
1594 IO_STATUS_BLOCK iosb;
1596 info.CompletionPort = ioqueue.port;
1597 info.CompletionKey = (ULONG_PTR)io;
1599 status = NtSetInformationFile( file, &iosb, &info, sizeof(info), FileCompletionInformation );
1602 if (status == STATUS_SUCCESS)
1604 if (!ioqueue.objcount++)
1605 RtlWakeConditionVariable( &ioqueue.update_event );
1608 RtlLeaveCriticalSection( &ioqueue.cs );
1609 return status;
1612 static void tp_ioqueue_unlock( struct threadpool_object *io )
1614 assert( io->type == TP_OBJECT_TYPE_IO );
1616 RtlEnterCriticalSection( &ioqueue.cs );
1618 if (!--ioqueue.objcount)
1619 NtSetIoCompletion( ioqueue.port, 0, 0, STATUS_SUCCESS, 0 );
1621 RtlLeaveCriticalSection( &ioqueue.cs );
1624 /***********************************************************************
1625 * tp_threadpool_alloc (internal)
1627 * Allocates a new threadpool object.
1629 static NTSTATUS tp_threadpool_alloc( struct threadpool **out )
1631 IMAGE_NT_HEADERS *nt = RtlImageNtHeader( NtCurrentTeb()->Peb->ImageBaseAddress );
1632 struct threadpool *pool;
1633 unsigned int i;
1635 pool = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*pool) );
1636 if (!pool)
1637 return STATUS_NO_MEMORY;
1639 pool->refcount = 1;
1640 pool->objcount = 0;
1641 pool->shutdown = FALSE;
1643 RtlInitializeCriticalSection( &pool->cs );
1644 pool->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": threadpool.cs");
1646 for (i = 0; i < ARRAY_SIZE(pool->pools); ++i)
1647 list_init( &pool->pools[i] );
1648 RtlInitializeConditionVariable( &pool->update_event );
1650 pool->max_workers = 500;
1651 pool->min_workers = 0;
1652 pool->num_workers = 0;
1653 pool->num_busy_workers = 0;
1654 pool->stack_info.StackReserve = nt->OptionalHeader.SizeOfStackReserve;
1655 pool->stack_info.StackCommit = nt->OptionalHeader.SizeOfStackCommit;
1657 TRACE( "allocated threadpool %p\n", pool );
1659 *out = pool;
1660 return STATUS_SUCCESS;
1663 /***********************************************************************
1664 * tp_threadpool_shutdown (internal)
1666 * Prepares the shutdown of a threadpool object and notifies all worker
1667 * threads to terminate (after all remaining work items have been
1668 * processed).
1670 static void tp_threadpool_shutdown( struct threadpool *pool )
1672 assert( pool != default_threadpool );
1674 pool->shutdown = TRUE;
1675 RtlWakeAllConditionVariable( &pool->update_event );
1678 /***********************************************************************
1679 * tp_threadpool_release (internal)
1681 * Releases a reference to a threadpool object.
1683 static BOOL tp_threadpool_release( struct threadpool *pool )
1685 unsigned int i;
1687 if (InterlockedDecrement( &pool->refcount ))
1688 return FALSE;
1690 TRACE( "destroying threadpool %p\n", pool );
1692 assert( pool->shutdown );
1693 assert( !pool->objcount );
1694 for (i = 0; i < ARRAY_SIZE(pool->pools); ++i)
1695 assert( list_empty( &pool->pools[i] ) );
1697 pool->cs.DebugInfo->Spare[0] = 0;
1698 RtlDeleteCriticalSection( &pool->cs );
1700 RtlFreeHeap( GetProcessHeap(), 0, pool );
1701 return TRUE;
1704 /***********************************************************************
1705 * tp_threadpool_lock (internal)
1707 * Acquires a lock on a threadpool, specified with an TP_CALLBACK_ENVIRON
1708 * block. When the lock is acquired successfully, it is guaranteed that
1709 * there is at least one worker thread to process tasks.
1711 static NTSTATUS tp_threadpool_lock( struct threadpool **out, TP_CALLBACK_ENVIRON *environment )
1713 struct threadpool *pool = NULL;
1714 NTSTATUS status = STATUS_SUCCESS;
1716 if (environment)
1718 /* Validate environment parameters. */
1719 if (environment->Version == 3)
1721 TP_CALLBACK_ENVIRON_V3 *environment3 = (TP_CALLBACK_ENVIRON_V3 *)environment;
1723 switch (environment3->CallbackPriority)
1725 case TP_CALLBACK_PRIORITY_HIGH:
1726 case TP_CALLBACK_PRIORITY_NORMAL:
1727 case TP_CALLBACK_PRIORITY_LOW:
1728 break;
1729 default:
1730 return STATUS_INVALID_PARAMETER;
1734 pool = (struct threadpool *)environment->Pool;
1737 if (!pool)
1739 if (!default_threadpool)
1741 status = tp_threadpool_alloc( &pool );
1742 if (status != STATUS_SUCCESS)
1743 return status;
1745 if (InterlockedCompareExchangePointer( (void *)&default_threadpool, pool, NULL ) != NULL)
1747 tp_threadpool_shutdown( pool );
1748 tp_threadpool_release( pool );
1752 pool = default_threadpool;
1755 RtlEnterCriticalSection( &pool->cs );
1757 /* Make sure that the threadpool has at least one thread. */
1758 if (!pool->num_workers)
1759 status = tp_new_worker_thread( pool );
1761 /* Keep a reference, and increment objcount to ensure that the
1762 * last thread doesn't terminate. */
1763 if (status == STATUS_SUCCESS)
1765 InterlockedIncrement( &pool->refcount );
1766 pool->objcount++;
1769 RtlLeaveCriticalSection( &pool->cs );
1771 if (status != STATUS_SUCCESS)
1772 return status;
1774 *out = pool;
1775 return STATUS_SUCCESS;
1778 /***********************************************************************
1779 * tp_threadpool_unlock (internal)
1781 * Releases a lock on a threadpool.
1783 static void tp_threadpool_unlock( struct threadpool *pool )
1785 RtlEnterCriticalSection( &pool->cs );
1786 pool->objcount--;
1787 RtlLeaveCriticalSection( &pool->cs );
1788 tp_threadpool_release( pool );
1791 /***********************************************************************
1792 * tp_group_alloc (internal)
1794 * Allocates a new threadpool group object.
1796 static NTSTATUS tp_group_alloc( struct threadpool_group **out )
1798 struct threadpool_group *group;
1800 group = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*group) );
1801 if (!group)
1802 return STATUS_NO_MEMORY;
1804 group->refcount = 1;
1805 group->shutdown = FALSE;
1807 RtlInitializeCriticalSection( &group->cs );
1808 group->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": threadpool_group.cs");
1810 list_init( &group->members );
1812 TRACE( "allocated group %p\n", group );
1814 *out = group;
1815 return STATUS_SUCCESS;
1818 /***********************************************************************
1819 * tp_group_shutdown (internal)
1821 * Marks the group object for shutdown.
1823 static void tp_group_shutdown( struct threadpool_group *group )
1825 group->shutdown = TRUE;
1828 /***********************************************************************
1829 * tp_group_release (internal)
1831 * Releases a reference to a group object.
1833 static BOOL tp_group_release( struct threadpool_group *group )
1835 if (InterlockedDecrement( &group->refcount ))
1836 return FALSE;
1838 TRACE( "destroying group %p\n", group );
1840 assert( group->shutdown );
1841 assert( list_empty( &group->members ) );
1843 group->cs.DebugInfo->Spare[0] = 0;
1844 RtlDeleteCriticalSection( &group->cs );
1846 RtlFreeHeap( GetProcessHeap(), 0, group );
1847 return TRUE;
1850 /***********************************************************************
1851 * tp_object_initialize (internal)
1853 * Initializes members of a threadpool object.
1855 static void tp_object_initialize( struct threadpool_object *object, struct threadpool *pool,
1856 PVOID userdata, TP_CALLBACK_ENVIRON *environment )
1858 BOOL is_simple_callback = (object->type == TP_OBJECT_TYPE_SIMPLE);
1860 object->refcount = 1;
1861 object->shutdown = FALSE;
1863 object->pool = pool;
1864 object->group = NULL;
1865 object->userdata = userdata;
1866 object->group_cancel_callback = NULL;
1867 object->finalization_callback = NULL;
1868 object->may_run_long = 0;
1869 object->race_dll = NULL;
1870 object->priority = TP_CALLBACK_PRIORITY_NORMAL;
1872 memset( &object->group_entry, 0, sizeof(object->group_entry) );
1873 object->is_group_member = FALSE;
1875 memset( &object->pool_entry, 0, sizeof(object->pool_entry) );
1876 RtlInitializeConditionVariable( &object->finished_event );
1877 RtlInitializeConditionVariable( &object->group_finished_event );
1878 object->completed_event = NULL;
1879 object->num_pending_callbacks = 0;
1880 object->num_running_callbacks = 0;
1881 object->num_associated_callbacks = 0;
1883 if (environment)
1885 if (environment->Version != 1 && environment->Version != 3)
1886 FIXME( "unsupported environment version %u\n", environment->Version );
1888 object->group = impl_from_TP_CLEANUP_GROUP( environment->CleanupGroup );
1889 object->group_cancel_callback = environment->CleanupGroupCancelCallback;
1890 object->finalization_callback = environment->FinalizationCallback;
1891 object->may_run_long = environment->u.s.LongFunction != 0;
1892 object->race_dll = environment->RaceDll;
1893 if (environment->Version == 3)
1895 TP_CALLBACK_ENVIRON_V3 *environment_v3 = (TP_CALLBACK_ENVIRON_V3 *)environment;
1897 object->priority = environment_v3->CallbackPriority;
1898 assert( object->priority < ARRAY_SIZE(pool->pools) );
1901 if (environment->ActivationContext)
1902 FIXME( "activation context not supported yet\n" );
1904 if (environment->u.s.Persistent)
1905 FIXME( "persistent threads not supported yet\n" );
1908 if (object->race_dll)
1909 LdrAddRefDll( 0, object->race_dll );
1911 TRACE( "allocated object %p of type %u\n", object, object->type );
1913 /* For simple callbacks we have to run tp_object_submit before adding this object
1914 * to the cleanup group. As soon as the cleanup group members are released ->shutdown
1915 * will be set, and tp_object_submit would fail with an assertion. */
1917 if (is_simple_callback)
1918 tp_object_submit( object, FALSE );
1920 if (object->group)
1922 struct threadpool_group *group = object->group;
1923 InterlockedIncrement( &group->refcount );
1925 RtlEnterCriticalSection( &group->cs );
1926 list_add_tail( &group->members, &object->group_entry );
1927 object->is_group_member = TRUE;
1928 RtlLeaveCriticalSection( &group->cs );
1931 if (is_simple_callback)
1932 tp_object_release( object );
1935 static void tp_object_prio_queue( struct threadpool_object *object )
1937 ++object->pool->num_busy_workers;
1938 list_add_tail( &object->pool->pools[object->priority], &object->pool_entry );
1941 /***********************************************************************
1942 * tp_object_submit (internal)
1944 * Submits a threadpool object to the associated threadpool. This
1945 * function has to be VOID because TpPostWork can never fail on Windows.
1947 static void tp_object_submit( struct threadpool_object *object, BOOL signaled )
1949 struct threadpool *pool = object->pool;
1950 NTSTATUS status = STATUS_UNSUCCESSFUL;
1952 assert( !object->shutdown );
1953 assert( !pool->shutdown );
1955 RtlEnterCriticalSection( &pool->cs );
1957 /* Start new worker threads if required. */
1958 if (pool->num_busy_workers >= pool->num_workers &&
1959 pool->num_workers < pool->max_workers)
1960 status = tp_new_worker_thread( pool );
1962 /* Queue work item and increment refcount. */
1963 InterlockedIncrement( &object->refcount );
1964 if (!object->num_pending_callbacks++)
1965 tp_object_prio_queue( object );
1967 /* Count how often the object was signaled. */
1968 if (object->type == TP_OBJECT_TYPE_WAIT && signaled)
1969 object->u.wait.signaled++;
1971 /* No new thread started - wake up one existing thread. */
1972 if (status != STATUS_SUCCESS)
1974 assert( pool->num_workers > 0 );
1975 RtlWakeConditionVariable( &pool->update_event );
1978 RtlLeaveCriticalSection( &pool->cs );
1981 /***********************************************************************
1982 * tp_object_cancel (internal)
1984 * Cancels all currently pending callbacks for a specific object.
1986 static void tp_object_cancel( struct threadpool_object *object )
1988 struct threadpool *pool = object->pool;
1989 LONG pending_callbacks = 0;
1991 RtlEnterCriticalSection( &pool->cs );
1992 if (object->num_pending_callbacks)
1994 pending_callbacks = object->num_pending_callbacks;
1995 object->num_pending_callbacks = 0;
1996 list_remove( &object->pool_entry );
1998 if (object->type == TP_OBJECT_TYPE_WAIT)
1999 object->u.wait.signaled = 0;
2001 if (object->type == TP_OBJECT_TYPE_IO)
2002 object->u.io.pending_count = 0;
2003 RtlLeaveCriticalSection( &pool->cs );
2005 while (pending_callbacks--)
2006 tp_object_release( object );
2009 static BOOL object_is_finished( struct threadpool_object *object, BOOL group )
2011 if (object->num_pending_callbacks)
2012 return FALSE;
2013 if (object->type == TP_OBJECT_TYPE_IO && object->u.io.pending_count)
2014 return FALSE;
2016 if (group)
2017 return !object->num_running_callbacks;
2018 else
2019 return !object->num_associated_callbacks;
2022 /***********************************************************************
2023 * tp_object_wait (internal)
2025 * Waits until all pending and running callbacks of a specific object
2026 * have been processed.
2028 static void tp_object_wait( struct threadpool_object *object, BOOL group_wait )
2030 struct threadpool *pool = object->pool;
2032 RtlEnterCriticalSection( &pool->cs );
2033 while (!object_is_finished( object, group_wait ))
2035 if (group_wait)
2036 RtlSleepConditionVariableCS( &object->group_finished_event, &pool->cs, NULL );
2037 else
2038 RtlSleepConditionVariableCS( &object->finished_event, &pool->cs, NULL );
2040 RtlLeaveCriticalSection( &pool->cs );
2043 /***********************************************************************
2044 * tp_object_prepare_shutdown (internal)
2046 * Prepares a threadpool object for shutdown.
2048 static void tp_object_prepare_shutdown( struct threadpool_object *object )
2050 if (object->type == TP_OBJECT_TYPE_TIMER)
2051 tp_timerqueue_unlock( object );
2052 else if (object->type == TP_OBJECT_TYPE_WAIT)
2053 tp_waitqueue_unlock( object );
2054 else if (object->type == TP_OBJECT_TYPE_IO)
2055 tp_ioqueue_unlock( object );
2058 /***********************************************************************
2059 * tp_object_release (internal)
2061 * Releases a reference to a threadpool object.
2063 static BOOL tp_object_release( struct threadpool_object *object )
2065 if (InterlockedDecrement( &object->refcount ))
2066 return FALSE;
2068 TRACE( "destroying object %p of type %u\n", object, object->type );
2070 assert( object->shutdown );
2071 assert( !object->num_pending_callbacks );
2072 assert( !object->num_running_callbacks );
2073 assert( !object->num_associated_callbacks );
2075 /* release reference to the group */
2076 if (object->group)
2078 struct threadpool_group *group = object->group;
2080 RtlEnterCriticalSection( &group->cs );
2081 if (object->is_group_member)
2083 list_remove( &object->group_entry );
2084 object->is_group_member = FALSE;
2086 RtlLeaveCriticalSection( &group->cs );
2088 tp_group_release( group );
2091 tp_threadpool_unlock( object->pool );
2093 if (object->race_dll)
2094 LdrUnloadDll( object->race_dll );
2096 if (object->completed_event && object->completed_event != INVALID_HANDLE_VALUE)
2097 NtSetEvent( object->completed_event, NULL );
2099 RtlFreeHeap( GetProcessHeap(), 0, object );
2100 return TRUE;
2103 static struct list *threadpool_get_next_item( const struct threadpool *pool )
2105 struct list *ptr;
2106 unsigned int i;
2108 for (i = 0; i < ARRAY_SIZE(pool->pools); ++i)
2110 if ((ptr = list_head( &pool->pools[i] )))
2111 break;
2114 return ptr;
2117 /***********************************************************************
2118 * tp_object_execute (internal)
2120 * Executes a threadpool object callback, object->pool->cs has to be
2121 * held.
2123 static void tp_object_execute( struct threadpool_object *object, BOOL wait_thread )
2125 TP_CALLBACK_INSTANCE *callback_instance;
2126 struct threadpool_instance instance;
2127 struct io_completion completion;
2128 struct threadpool *pool = object->pool;
2129 TP_WAIT_RESULT wait_result = 0;
2130 NTSTATUS status;
2132 object->num_pending_callbacks--;
2134 /* For wait objects check if they were signaled or have timed out. */
2135 if (object->type == TP_OBJECT_TYPE_WAIT)
2137 wait_result = object->u.wait.signaled ? WAIT_OBJECT_0 : WAIT_TIMEOUT;
2138 if (wait_result == WAIT_OBJECT_0) object->u.wait.signaled--;
2140 else if (object->type == TP_OBJECT_TYPE_IO)
2142 assert( object->u.io.completion_count );
2143 completion = object->u.io.completions[--object->u.io.completion_count];
2144 object->u.io.pending_count--;
2147 /* Leave critical section and do the actual callback. */
2148 object->num_associated_callbacks++;
2149 object->num_running_callbacks++;
2150 RtlLeaveCriticalSection( &pool->cs );
2151 if (wait_thread) RtlLeaveCriticalSection( &waitqueue.cs );
2153 /* Initialize threadpool instance struct. */
2154 callback_instance = (TP_CALLBACK_INSTANCE *)&instance;
2155 instance.object = object;
2156 instance.threadid = GetCurrentThreadId();
2157 instance.associated = TRUE;
2158 instance.may_run_long = object->may_run_long;
2159 instance.cleanup.critical_section = NULL;
2160 instance.cleanup.mutex = NULL;
2161 instance.cleanup.semaphore = NULL;
2162 instance.cleanup.semaphore_count = 0;
2163 instance.cleanup.event = NULL;
2164 instance.cleanup.library = NULL;
2166 switch (object->type)
2168 case TP_OBJECT_TYPE_SIMPLE:
2170 TRACE( "executing simple callback %p(%p, %p)\n",
2171 object->u.simple.callback, callback_instance, object->userdata );
2172 object->u.simple.callback( callback_instance, object->userdata );
2173 TRACE( "callback %p returned\n", object->u.simple.callback );
2174 break;
2177 case TP_OBJECT_TYPE_WORK:
2179 TRACE( "executing work callback %p(%p, %p, %p)\n",
2180 object->u.work.callback, callback_instance, object->userdata, object );
2181 object->u.work.callback( callback_instance, object->userdata, (TP_WORK *)object );
2182 TRACE( "callback %p returned\n", object->u.work.callback );
2183 break;
2186 case TP_OBJECT_TYPE_TIMER:
2188 TRACE( "executing timer callback %p(%p, %p, %p)\n",
2189 object->u.timer.callback, callback_instance, object->userdata, object );
2190 object->u.timer.callback( callback_instance, object->userdata, (TP_TIMER *)object );
2191 TRACE( "callback %p returned\n", object->u.timer.callback );
2192 break;
2195 case TP_OBJECT_TYPE_WAIT:
2197 TRACE( "executing wait callback %p(%p, %p, %p, %u)\n",
2198 object->u.wait.callback, callback_instance, object->userdata, object, wait_result );
2199 object->u.wait.callback( callback_instance, object->userdata, (TP_WAIT *)object, wait_result );
2200 TRACE( "callback %p returned\n", object->u.wait.callback );
2201 break;
2204 case TP_OBJECT_TYPE_IO:
2206 TRACE( "executing I/O callback %p(%p, %p, %#lx, %p, %p)\n",
2207 object->u.io.callback, callback_instance, object->userdata,
2208 completion.cvalue, &completion.iosb, (TP_IO *)object );
2209 object->u.io.callback( callback_instance, object->userdata,
2210 (void *)completion.cvalue, &completion.iosb, (TP_IO *)object );
2211 TRACE( "callback %p returned\n", object->u.io.callback );
2212 break;
2215 default:
2216 assert(0);
2217 break;
2220 /* Execute finalization callback. */
2221 if (object->finalization_callback)
2223 TRACE( "executing finalization callback %p(%p, %p)\n",
2224 object->finalization_callback, callback_instance, object->userdata );
2225 object->finalization_callback( callback_instance, object->userdata );
2226 TRACE( "callback %p returned\n", object->finalization_callback );
2229 /* Execute cleanup tasks. */
2230 if (instance.cleanup.critical_section)
2232 RtlLeaveCriticalSection( instance.cleanup.critical_section );
2234 if (instance.cleanup.mutex)
2236 status = NtReleaseMutant( instance.cleanup.mutex, NULL );
2237 if (status != STATUS_SUCCESS) goto skip_cleanup;
2239 if (instance.cleanup.semaphore)
2241 status = NtReleaseSemaphore( instance.cleanup.semaphore, instance.cleanup.semaphore_count, NULL );
2242 if (status != STATUS_SUCCESS) goto skip_cleanup;
2244 if (instance.cleanup.event)
2246 status = NtSetEvent( instance.cleanup.event, NULL );
2247 if (status != STATUS_SUCCESS) goto skip_cleanup;
2249 if (instance.cleanup.library)
2251 LdrUnloadDll( instance.cleanup.library );
2254 skip_cleanup:
2255 if (wait_thread) RtlEnterCriticalSection( &waitqueue.cs );
2256 RtlEnterCriticalSection( &pool->cs );
2258 /* Simple callbacks are automatically shutdown after execution. */
2259 if (object->type == TP_OBJECT_TYPE_SIMPLE)
2261 tp_object_prepare_shutdown( object );
2262 object->shutdown = TRUE;
2265 object->num_running_callbacks--;
2266 if (object_is_finished( object, TRUE ))
2267 RtlWakeAllConditionVariable( &object->group_finished_event );
2269 if (instance.associated)
2271 object->num_associated_callbacks--;
2272 if (object_is_finished( object, FALSE ))
2273 RtlWakeAllConditionVariable( &object->finished_event );
2277 /***********************************************************************
2278 * threadpool_worker_proc (internal)
2280 static void CALLBACK threadpool_worker_proc( void *param )
2282 struct threadpool *pool = param;
2283 LARGE_INTEGER timeout;
2284 struct list *ptr;
2286 TRACE( "starting worker thread for pool %p\n", pool );
2288 RtlEnterCriticalSection( &pool->cs );
2289 for (;;)
2291 while ((ptr = threadpool_get_next_item( pool )))
2293 struct threadpool_object *object = LIST_ENTRY( ptr, struct threadpool_object, pool_entry );
2294 assert( object->num_pending_callbacks > 0 );
2296 /* If further pending callbacks are queued, move the work item to
2297 * the end of the pool list. Otherwise remove it from the pool. */
2298 list_remove( &object->pool_entry );
2299 if (object->num_pending_callbacks > 1)
2300 tp_object_prio_queue( object );
2302 tp_object_execute( object, FALSE );
2304 assert(pool->num_busy_workers);
2305 pool->num_busy_workers--;
2307 tp_object_release( object );
2310 /* Shutdown worker thread if requested. */
2311 if (pool->shutdown)
2312 break;
2314 /* Wait for new tasks or until the timeout expires. A thread only terminates
2315 * when no new tasks are available, and the number of threads can be
2316 * decreased without violating the min_workers limit. An exception is when
2317 * min_workers == 0, then objcount is used to detect if the last thread
2318 * can be terminated. */
2319 timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
2320 if (RtlSleepConditionVariableCS( &pool->update_event, &pool->cs, &timeout ) == STATUS_TIMEOUT &&
2321 !threadpool_get_next_item( pool ) && (pool->num_workers > max( pool->min_workers, 1 ) ||
2322 (!pool->min_workers && !pool->objcount)))
2324 break;
2327 pool->num_workers--;
2328 RtlLeaveCriticalSection( &pool->cs );
2330 TRACE( "terminating worker thread for pool %p\n", pool );
2331 tp_threadpool_release( pool );
2332 RtlExitUserThread( 0 );
2335 /***********************************************************************
2336 * TpAllocCleanupGroup (NTDLL.@)
2338 NTSTATUS WINAPI TpAllocCleanupGroup( TP_CLEANUP_GROUP **out )
2340 TRACE( "%p\n", out );
2342 return tp_group_alloc( (struct threadpool_group **)out );
2345 /***********************************************************************
2346 * TpAllocIoCompletion (NTDLL.@)
2348 NTSTATUS WINAPI TpAllocIoCompletion( TP_IO **out, HANDLE file, PTP_IO_CALLBACK callback,
2349 void *userdata, TP_CALLBACK_ENVIRON *environment )
2351 struct threadpool_object *object;
2352 struct threadpool *pool;
2353 NTSTATUS status;
2355 TRACE( "%p %p %p %p %p\n", out, file, callback, userdata, environment );
2357 if (!(object = RtlAllocateHeap( GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(*object) )))
2358 return STATUS_NO_MEMORY;
2360 if ((status = tp_threadpool_lock( &pool, environment )))
2362 RtlFreeHeap( GetProcessHeap(), 0, object );
2363 return status;
2366 object->type = TP_OBJECT_TYPE_IO;
2367 object->u.io.callback = callback;
2368 if (!(object->u.io.completions = RtlAllocateHeap( GetProcessHeap(), 0, 8 * sizeof(*object->u.io.completions) )))
2370 tp_threadpool_unlock( pool );
2371 RtlFreeHeap( GetProcessHeap(), 0, object );
2372 return status;
2375 if ((status = tp_ioqueue_lock( object, file )))
2377 tp_threadpool_unlock( pool );
2378 RtlFreeHeap( GetProcessHeap(), 0, object->u.io.completions );
2379 RtlFreeHeap( GetProcessHeap(), 0, object );
2380 return status;
2383 tp_object_initialize( object, pool, userdata, environment );
2385 *out = (TP_IO *)object;
2386 return STATUS_SUCCESS;
2389 /***********************************************************************
2390 * TpAllocPool (NTDLL.@)
2392 NTSTATUS WINAPI TpAllocPool( TP_POOL **out, PVOID reserved )
2394 TRACE( "%p %p\n", out, reserved );
2396 if (reserved)
2397 FIXME( "reserved argument is nonzero (%p)\n", reserved );
2399 return tp_threadpool_alloc( (struct threadpool **)out );
2402 /***********************************************************************
2403 * TpAllocTimer (NTDLL.@)
2405 NTSTATUS WINAPI TpAllocTimer( TP_TIMER **out, PTP_TIMER_CALLBACK callback, PVOID userdata,
2406 TP_CALLBACK_ENVIRON *environment )
2408 struct threadpool_object *object;
2409 struct threadpool *pool;
2410 NTSTATUS status;
2412 TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
2414 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2415 if (!object)
2416 return STATUS_NO_MEMORY;
2418 status = tp_threadpool_lock( &pool, environment );
2419 if (status)
2421 RtlFreeHeap( GetProcessHeap(), 0, object );
2422 return status;
2425 object->type = TP_OBJECT_TYPE_TIMER;
2426 object->u.timer.callback = callback;
2428 status = tp_timerqueue_lock( object );
2429 if (status)
2431 tp_threadpool_unlock( pool );
2432 RtlFreeHeap( GetProcessHeap(), 0, object );
2433 return status;
2436 tp_object_initialize( object, pool, userdata, environment );
2438 *out = (TP_TIMER *)object;
2439 return STATUS_SUCCESS;
2442 static NTSTATUS tp_alloc_wait( TP_WAIT **out, PTP_WAIT_CALLBACK callback, PVOID userdata,
2443 TP_CALLBACK_ENVIRON *environment, DWORD flags )
2445 struct threadpool_object *object;
2446 struct threadpool *pool;
2447 NTSTATUS status;
2449 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2450 if (!object)
2451 return STATUS_NO_MEMORY;
2453 status = tp_threadpool_lock( &pool, environment );
2454 if (status)
2456 RtlFreeHeap( GetProcessHeap(), 0, object );
2457 return status;
2460 object->type = TP_OBJECT_TYPE_WAIT;
2461 object->u.wait.callback = callback;
2462 object->u.wait.flags = flags;
2464 status = tp_waitqueue_lock( object );
2465 if (status)
2467 tp_threadpool_unlock( pool );
2468 RtlFreeHeap( GetProcessHeap(), 0, object );
2469 return status;
2472 tp_object_initialize( object, pool, userdata, environment );
2474 *out = (TP_WAIT *)object;
2475 return STATUS_SUCCESS;
2478 /***********************************************************************
2479 * TpAllocWait (NTDLL.@)
2481 NTSTATUS WINAPI TpAllocWait( TP_WAIT **out, PTP_WAIT_CALLBACK callback, PVOID userdata,
2482 TP_CALLBACK_ENVIRON *environment )
2484 TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
2485 return tp_alloc_wait( out, callback, userdata, environment, WT_EXECUTEONLYONCE );
2488 /***********************************************************************
2489 * TpAllocWork (NTDLL.@)
2491 NTSTATUS WINAPI TpAllocWork( TP_WORK **out, PTP_WORK_CALLBACK callback, PVOID userdata,
2492 TP_CALLBACK_ENVIRON *environment )
2494 struct threadpool_object *object;
2495 struct threadpool *pool;
2496 NTSTATUS status;
2498 TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
2500 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2501 if (!object)
2502 return STATUS_NO_MEMORY;
2504 status = tp_threadpool_lock( &pool, environment );
2505 if (status)
2507 RtlFreeHeap( GetProcessHeap(), 0, object );
2508 return status;
2511 object->type = TP_OBJECT_TYPE_WORK;
2512 object->u.work.callback = callback;
2513 tp_object_initialize( object, pool, userdata, environment );
2515 *out = (TP_WORK *)object;
2516 return STATUS_SUCCESS;
2519 /***********************************************************************
2520 * TpCancelAsyncIoOperation (NTDLL.@)
2522 void WINAPI TpCancelAsyncIoOperation( TP_IO *io )
2524 struct threadpool_object *this = impl_from_TP_IO( io );
2526 TRACE( "%p\n", io );
2528 RtlEnterCriticalSection( &this->pool->cs );
2530 this->u.io.pending_count--;
2531 if (object_is_finished( this, TRUE ))
2532 RtlWakeAllConditionVariable( &this->group_finished_event );
2533 if (object_is_finished( this, FALSE ))
2534 RtlWakeAllConditionVariable( &this->finished_event );
2536 RtlLeaveCriticalSection( &this->pool->cs );
2539 /***********************************************************************
2540 * TpCallbackLeaveCriticalSectionOnCompletion (NTDLL.@)
2542 VOID WINAPI TpCallbackLeaveCriticalSectionOnCompletion( TP_CALLBACK_INSTANCE *instance, CRITICAL_SECTION *crit )
2544 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2546 TRACE( "%p %p\n", instance, crit );
2548 if (!this->cleanup.critical_section)
2549 this->cleanup.critical_section = crit;
2552 /***********************************************************************
2553 * TpCallbackMayRunLong (NTDLL.@)
2555 NTSTATUS WINAPI TpCallbackMayRunLong( TP_CALLBACK_INSTANCE *instance )
2557 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2558 struct threadpool_object *object = this->object;
2559 struct threadpool *pool;
2560 NTSTATUS status = STATUS_SUCCESS;
2562 TRACE( "%p\n", instance );
2564 if (this->threadid != GetCurrentThreadId())
2566 ERR("called from wrong thread, ignoring\n");
2567 return STATUS_UNSUCCESSFUL; /* FIXME */
2570 if (this->may_run_long)
2571 return STATUS_SUCCESS;
2573 pool = object->pool;
2574 RtlEnterCriticalSection( &pool->cs );
2576 /* Start new worker threads if required. */
2577 if (pool->num_busy_workers >= pool->num_workers)
2579 if (pool->num_workers < pool->max_workers)
2581 status = tp_new_worker_thread( pool );
2583 else
2585 status = STATUS_TOO_MANY_THREADS;
2589 RtlLeaveCriticalSection( &pool->cs );
2590 this->may_run_long = TRUE;
2591 return status;
2594 /***********************************************************************
2595 * TpCallbackReleaseMutexOnCompletion (NTDLL.@)
2597 VOID WINAPI TpCallbackReleaseMutexOnCompletion( TP_CALLBACK_INSTANCE *instance, HANDLE mutex )
2599 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2601 TRACE( "%p %p\n", instance, mutex );
2603 if (!this->cleanup.mutex)
2604 this->cleanup.mutex = mutex;
2607 /***********************************************************************
2608 * TpCallbackReleaseSemaphoreOnCompletion (NTDLL.@)
2610 VOID WINAPI TpCallbackReleaseSemaphoreOnCompletion( TP_CALLBACK_INSTANCE *instance, HANDLE semaphore, DWORD count )
2612 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2614 TRACE( "%p %p %u\n", instance, semaphore, count );
2616 if (!this->cleanup.semaphore)
2618 this->cleanup.semaphore = semaphore;
2619 this->cleanup.semaphore_count = count;
2623 /***********************************************************************
2624 * TpCallbackSetEventOnCompletion (NTDLL.@)
2626 VOID WINAPI TpCallbackSetEventOnCompletion( TP_CALLBACK_INSTANCE *instance, HANDLE event )
2628 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2630 TRACE( "%p %p\n", instance, event );
2632 if (!this->cleanup.event)
2633 this->cleanup.event = event;
2636 /***********************************************************************
2637 * TpCallbackUnloadDllOnCompletion (NTDLL.@)
2639 VOID WINAPI TpCallbackUnloadDllOnCompletion( TP_CALLBACK_INSTANCE *instance, HMODULE module )
2641 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2643 TRACE( "%p %p\n", instance, module );
2645 if (!this->cleanup.library)
2646 this->cleanup.library = module;
2649 /***********************************************************************
2650 * TpDisassociateCallback (NTDLL.@)
2652 VOID WINAPI TpDisassociateCallback( TP_CALLBACK_INSTANCE *instance )
2654 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2655 struct threadpool_object *object = this->object;
2656 struct threadpool *pool;
2658 TRACE( "%p\n", instance );
2660 if (this->threadid != GetCurrentThreadId())
2662 ERR("called from wrong thread, ignoring\n");
2663 return;
2666 if (!this->associated)
2667 return;
2669 pool = object->pool;
2670 RtlEnterCriticalSection( &pool->cs );
2672 object->num_associated_callbacks--;
2673 if (object_is_finished( object, FALSE ))
2674 RtlWakeAllConditionVariable( &object->finished_event );
2676 RtlLeaveCriticalSection( &pool->cs );
2677 this->associated = FALSE;
2680 /***********************************************************************
2681 * TpIsTimerSet (NTDLL.@)
2683 BOOL WINAPI TpIsTimerSet( TP_TIMER *timer )
2685 struct threadpool_object *this = impl_from_TP_TIMER( timer );
2687 TRACE( "%p\n", timer );
2689 return this->u.timer.timer_set;
2692 /***********************************************************************
2693 * TpPostWork (NTDLL.@)
2695 VOID WINAPI TpPostWork( TP_WORK *work )
2697 struct threadpool_object *this = impl_from_TP_WORK( work );
2699 TRACE( "%p\n", work );
2701 tp_object_submit( this, FALSE );
2704 /***********************************************************************
2705 * TpReleaseCleanupGroup (NTDLL.@)
2707 VOID WINAPI TpReleaseCleanupGroup( TP_CLEANUP_GROUP *group )
2709 struct threadpool_group *this = impl_from_TP_CLEANUP_GROUP( group );
2711 TRACE( "%p\n", group );
2713 tp_group_shutdown( this );
2714 tp_group_release( this );
2717 /***********************************************************************
2718 * TpReleaseCleanupGroupMembers (NTDLL.@)
2720 VOID WINAPI TpReleaseCleanupGroupMembers( TP_CLEANUP_GROUP *group, BOOL cancel_pending, PVOID userdata )
2722 struct threadpool_group *this = impl_from_TP_CLEANUP_GROUP( group );
2723 struct threadpool_object *object, *next;
2724 struct list members;
2726 TRACE( "%p %u %p\n", group, cancel_pending, userdata );
2728 RtlEnterCriticalSection( &this->cs );
2730 /* Unset group, increase references, and mark objects for shutdown */
2731 LIST_FOR_EACH_ENTRY_SAFE( object, next, &this->members, struct threadpool_object, group_entry )
2733 assert( object->group == this );
2734 assert( object->is_group_member );
2736 if (InterlockedIncrement( &object->refcount ) == 1)
2738 /* Object is basically already destroyed, but group reference
2739 * was not deleted yet. We can safely ignore this object. */
2740 InterlockedDecrement( &object->refcount );
2741 list_remove( &object->group_entry );
2742 object->is_group_member = FALSE;
2743 continue;
2746 object->is_group_member = FALSE;
2747 tp_object_prepare_shutdown( object );
2750 /* Move members to a new temporary list */
2751 list_init( &members );
2752 list_move_tail( &members, &this->members );
2754 RtlLeaveCriticalSection( &this->cs );
2756 /* Cancel pending callbacks if requested */
2757 if (cancel_pending)
2759 LIST_FOR_EACH_ENTRY( object, &members, struct threadpool_object, group_entry )
2761 tp_object_cancel( object );
2765 /* Wait for remaining callbacks to finish */
2766 LIST_FOR_EACH_ENTRY_SAFE( object, next, &members, struct threadpool_object, group_entry )
2768 tp_object_wait( object, TRUE );
2770 if (!object->shutdown)
2772 /* Execute group cancellation callback if defined, and if this was actually a group cancel. */
2773 if (cancel_pending && object->group_cancel_callback)
2775 TRACE( "executing group cancel callback %p(%p, %p)\n",
2776 object->group_cancel_callback, object->userdata, userdata );
2777 object->group_cancel_callback( object->userdata, userdata );
2778 TRACE( "callback %p returned\n", object->group_cancel_callback );
2781 if (object->type != TP_OBJECT_TYPE_SIMPLE)
2782 tp_object_release( object );
2785 object->shutdown = TRUE;
2786 tp_object_release( object );
2790 /***********************************************************************
2791 * TpReleaseIoCompletion (NTDLL.@)
2793 void WINAPI TpReleaseIoCompletion( TP_IO *io )
2795 struct threadpool_object *this = impl_from_TP_IO( io );
2797 TRACE( "%p\n", io );
2799 tp_object_prepare_shutdown( this );
2800 this->shutdown = TRUE;
2801 tp_object_release( this );
2804 /***********************************************************************
2805 * TpReleasePool (NTDLL.@)
2807 VOID WINAPI TpReleasePool( TP_POOL *pool )
2809 struct threadpool *this = impl_from_TP_POOL( pool );
2811 TRACE( "%p\n", pool );
2813 tp_threadpool_shutdown( this );
2814 tp_threadpool_release( this );
2817 /***********************************************************************
2818 * TpReleaseTimer (NTDLL.@)
2820 VOID WINAPI TpReleaseTimer( TP_TIMER *timer )
2822 struct threadpool_object *this = impl_from_TP_TIMER( timer );
2824 TRACE( "%p\n", timer );
2826 tp_object_prepare_shutdown( this );
2827 this->shutdown = TRUE;
2828 tp_object_release( this );
2831 /***********************************************************************
2832 * TpReleaseWait (NTDLL.@)
2834 VOID WINAPI TpReleaseWait( TP_WAIT *wait )
2836 struct threadpool_object *this = impl_from_TP_WAIT( wait );
2838 TRACE( "%p\n", wait );
2840 tp_object_prepare_shutdown( this );
2841 this->shutdown = TRUE;
2842 tp_object_release( this );
2845 /***********************************************************************
2846 * TpReleaseWork (NTDLL.@)
2848 VOID WINAPI TpReleaseWork( TP_WORK *work )
2850 struct threadpool_object *this = impl_from_TP_WORK( work );
2852 TRACE( "%p\n", work );
2854 tp_object_prepare_shutdown( this );
2855 this->shutdown = TRUE;
2856 tp_object_release( this );
2859 /***********************************************************************
2860 * TpSetPoolMaxThreads (NTDLL.@)
2862 VOID WINAPI TpSetPoolMaxThreads( TP_POOL *pool, DWORD maximum )
2864 struct threadpool *this = impl_from_TP_POOL( pool );
2866 TRACE( "%p %u\n", pool, maximum );
2868 RtlEnterCriticalSection( &this->cs );
2869 this->max_workers = max( maximum, 1 );
2870 this->min_workers = min( this->min_workers, this->max_workers );
2871 RtlLeaveCriticalSection( &this->cs );
2874 /***********************************************************************
2875 * TpSetPoolMinThreads (NTDLL.@)
2877 BOOL WINAPI TpSetPoolMinThreads( TP_POOL *pool, DWORD minimum )
2879 struct threadpool *this = impl_from_TP_POOL( pool );
2880 NTSTATUS status = STATUS_SUCCESS;
2882 TRACE( "%p %u\n", pool, minimum );
2884 RtlEnterCriticalSection( &this->cs );
2886 while (this->num_workers < minimum)
2888 status = tp_new_worker_thread( this );
2889 if (status != STATUS_SUCCESS)
2890 break;
2893 if (status == STATUS_SUCCESS)
2895 this->min_workers = minimum;
2896 this->max_workers = max( this->min_workers, this->max_workers );
2899 RtlLeaveCriticalSection( &this->cs );
2900 return !status;
2903 /***********************************************************************
2904 * TpSetTimer (NTDLL.@)
2906 VOID WINAPI TpSetTimer( TP_TIMER *timer, LARGE_INTEGER *timeout, LONG period, LONG window_length )
2908 struct threadpool_object *this = impl_from_TP_TIMER( timer );
2909 struct threadpool_object *other_timer;
2910 BOOL submit_timer = FALSE;
2911 ULONGLONG timestamp;
2913 TRACE( "%p %p %u %u\n", timer, timeout, period, window_length );
2915 RtlEnterCriticalSection( &timerqueue.cs );
2917 assert( this->u.timer.timer_initialized );
2918 this->u.timer.timer_set = timeout != NULL;
2920 /* Convert relative timeout to absolute timestamp and handle a timeout
2921 * of zero, which means that the timer is submitted immediately. */
2922 if (timeout)
2924 timestamp = timeout->QuadPart;
2925 if ((LONGLONG)timestamp < 0)
2927 LARGE_INTEGER now;
2928 NtQuerySystemTime( &now );
2929 timestamp = now.QuadPart - timestamp;
2931 else if (!timestamp)
2933 if (!period)
2934 timeout = NULL;
2935 else
2937 LARGE_INTEGER now;
2938 NtQuerySystemTime( &now );
2939 timestamp = now.QuadPart + (ULONGLONG)period * 10000;
2941 submit_timer = TRUE;
2945 /* First remove existing timeout. */
2946 if (this->u.timer.timer_pending)
2948 list_remove( &this->u.timer.timer_entry );
2949 this->u.timer.timer_pending = FALSE;
2952 /* If the timer was enabled, then add it back to the queue. */
2953 if (timeout)
2955 this->u.timer.timeout = timestamp;
2956 this->u.timer.period = period;
2957 this->u.timer.window_length = window_length;
2959 LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
2960 struct threadpool_object, u.timer.timer_entry )
2962 assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
2963 if (this->u.timer.timeout < other_timer->u.timer.timeout)
2964 break;
2966 list_add_before( &other_timer->u.timer.timer_entry, &this->u.timer.timer_entry );
2968 /* Wake up the timer thread when the timeout has to be updated. */
2969 if (list_head( &timerqueue.pending_timers ) == &this->u.timer.timer_entry )
2970 RtlWakeAllConditionVariable( &timerqueue.update_event );
2972 this->u.timer.timer_pending = TRUE;
2975 RtlLeaveCriticalSection( &timerqueue.cs );
2977 if (submit_timer)
2978 tp_object_submit( this, FALSE );
2981 /***********************************************************************
2982 * TpSetWait (NTDLL.@)
2984 VOID WINAPI TpSetWait( TP_WAIT *wait, HANDLE handle, LARGE_INTEGER *timeout )
2986 struct threadpool_object *this = impl_from_TP_WAIT( wait );
2987 ULONGLONG timestamp = MAXLONGLONG;
2989 TRACE( "%p %p %p\n", wait, handle, timeout );
2991 RtlEnterCriticalSection( &waitqueue.cs );
2993 assert( this->u.wait.bucket );
2994 this->u.wait.handle = handle;
2996 if (handle || this->u.wait.wait_pending)
2998 struct waitqueue_bucket *bucket = this->u.wait.bucket;
2999 list_remove( &this->u.wait.wait_entry );
3001 /* Convert relative timeout to absolute timestamp. */
3002 if (handle && timeout)
3004 timestamp = timeout->QuadPart;
3005 if ((LONGLONG)timestamp < 0)
3007 LARGE_INTEGER now;
3008 NtQuerySystemTime( &now );
3009 timestamp = now.QuadPart - timestamp;
3013 /* Add wait object back into one of the queues. */
3014 if (handle)
3016 list_add_tail( &bucket->waiting, &this->u.wait.wait_entry );
3017 this->u.wait.wait_pending = TRUE;
3018 this->u.wait.timeout = timestamp;
3020 else
3022 list_add_tail( &bucket->reserved, &this->u.wait.wait_entry );
3023 this->u.wait.wait_pending = FALSE;
3026 /* Wake up the wait queue thread. */
3027 NtSetEvent( bucket->update_event, NULL );
3030 RtlLeaveCriticalSection( &waitqueue.cs );
3033 /***********************************************************************
3034 * TpSimpleTryPost (NTDLL.@)
3036 NTSTATUS WINAPI TpSimpleTryPost( PTP_SIMPLE_CALLBACK callback, PVOID userdata,
3037 TP_CALLBACK_ENVIRON *environment )
3039 struct threadpool_object *object;
3040 struct threadpool *pool;
3041 NTSTATUS status;
3043 TRACE( "%p %p %p\n", callback, userdata, environment );
3045 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
3046 if (!object)
3047 return STATUS_NO_MEMORY;
3049 status = tp_threadpool_lock( &pool, environment );
3050 if (status)
3052 RtlFreeHeap( GetProcessHeap(), 0, object );
3053 return status;
3056 object->type = TP_OBJECT_TYPE_SIMPLE;
3057 object->u.simple.callback = callback;
3058 tp_object_initialize( object, pool, userdata, environment );
3060 return STATUS_SUCCESS;
3063 /***********************************************************************
3064 * TpStartAsyncIoOperation (NTDLL.@)
3066 void WINAPI TpStartAsyncIoOperation( TP_IO *io )
3068 struct threadpool_object *this = impl_from_TP_IO( io );
3070 TRACE( "%p\n", io );
3072 RtlEnterCriticalSection( &this->pool->cs );
3074 this->u.io.pending_count++;
3076 RtlLeaveCriticalSection( &this->pool->cs );
3079 /***********************************************************************
3080 * TpWaitForIoCompletion (NTDLL.@)
3082 void WINAPI TpWaitForIoCompletion( TP_IO *io, BOOL cancel_pending )
3084 struct threadpool_object *this = impl_from_TP_IO( io );
3086 TRACE( "%p %d\n", io, cancel_pending );
3088 if (cancel_pending)
3089 tp_object_cancel( this );
3090 tp_object_wait( this, FALSE );
3093 /***********************************************************************
3094 * TpWaitForTimer (NTDLL.@)
3096 VOID WINAPI TpWaitForTimer( TP_TIMER *timer, BOOL cancel_pending )
3098 struct threadpool_object *this = impl_from_TP_TIMER( timer );
3100 TRACE( "%p %d\n", timer, cancel_pending );
3102 if (cancel_pending)
3103 tp_object_cancel( this );
3104 tp_object_wait( this, FALSE );
3107 /***********************************************************************
3108 * TpWaitForWait (NTDLL.@)
3110 VOID WINAPI TpWaitForWait( TP_WAIT *wait, BOOL cancel_pending )
3112 struct threadpool_object *this = impl_from_TP_WAIT( wait );
3114 TRACE( "%p %d\n", wait, cancel_pending );
3116 if (cancel_pending)
3117 tp_object_cancel( this );
3118 tp_object_wait( this, FALSE );
3121 /***********************************************************************
3122 * TpWaitForWork (NTDLL.@)
3124 VOID WINAPI TpWaitForWork( TP_WORK *work, BOOL cancel_pending )
3126 struct threadpool_object *this = impl_from_TP_WORK( work );
3128 TRACE( "%p %u\n", work, cancel_pending );
3130 if (cancel_pending)
3131 tp_object_cancel( this );
3132 tp_object_wait( this, FALSE );
3135 /***********************************************************************
3136 * TpSetPoolStackInformation (NTDLL.@)
3138 NTSTATUS WINAPI TpSetPoolStackInformation( TP_POOL *pool, TP_POOL_STACK_INFORMATION *stack_info )
3140 struct threadpool *this = impl_from_TP_POOL( pool );
3142 TRACE( "%p %p\n", pool, stack_info );
3144 if (!stack_info)
3145 return STATUS_INVALID_PARAMETER;
3147 RtlEnterCriticalSection( &this->cs );
3148 this->stack_info = *stack_info;
3149 RtlLeaveCriticalSection( &this->cs );
3151 return STATUS_SUCCESS;
3154 /***********************************************************************
3155 * TpQueryPoolStackInformation (NTDLL.@)
3157 NTSTATUS WINAPI TpQueryPoolStackInformation( TP_POOL *pool, TP_POOL_STACK_INFORMATION *stack_info )
3159 struct threadpool *this = impl_from_TP_POOL( pool );
3161 TRACE( "%p %p\n", pool, stack_info );
3163 if (!stack_info)
3164 return STATUS_INVALID_PARAMETER;
3166 RtlEnterCriticalSection( &this->cs );
3167 *stack_info = this->stack_info;
3168 RtlLeaveCriticalSection( &this->cs );
3170 return STATUS_SUCCESS;
3173 static void CALLBACK rtl_wait_callback( TP_CALLBACK_INSTANCE *instance, void *userdata, TP_WAIT *wait, TP_WAIT_RESULT result )
3175 struct threadpool_object *object = impl_from_TP_WAIT(wait);
3176 object->u.wait.rtl_callback( userdata, result != STATUS_WAIT_0 );
3179 /***********************************************************************
3180 * RtlRegisterWait (NTDLL.@)
3182 * Registers a wait for a handle to become signaled.
3184 * PARAMS
3185 * NewWaitObject [I] Handle to the new wait object. Use RtlDeregisterWait() to free it.
3186 * Object [I] Object to wait to become signaled.
3187 * Callback [I] Callback function to execute when the wait times out or the handle is signaled.
3188 * Context [I] Context to pass to the callback function when it is executed.
3189 * Milliseconds [I] Number of milliseconds to wait before timing out.
3190 * Flags [I] Flags. See notes.
3192 * RETURNS
3193 * Success: STATUS_SUCCESS.
3194 * Failure: Any NTSTATUS code.
3196 * NOTES
3197 * Flags can be one or more of the following:
3198 *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
3199 *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
3200 *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
3201 *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
3202 *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
3204 NTSTATUS WINAPI RtlRegisterWait( HANDLE *out, HANDLE handle, RTL_WAITORTIMERCALLBACKFUNC callback,
3205 void *context, ULONG milliseconds, ULONG flags )
3207 struct threadpool_object *object;
3208 TP_CALLBACK_ENVIRON environment;
3209 LARGE_INTEGER timeout;
3210 NTSTATUS status;
3211 TP_WAIT *wait;
3213 TRACE( "out %p, handle %p, callback %p, context %p, milliseconds %u, flags %x\n",
3214 out, handle, callback, context, milliseconds, flags );
3216 memset( &environment, 0, sizeof(environment) );
3217 environment.Version = 1;
3218 environment.u.s.LongFunction = (flags & WT_EXECUTELONGFUNCTION) != 0;
3219 environment.u.s.Persistent = (flags & WT_EXECUTEINPERSISTENTTHREAD) != 0;
3221 flags &= (WT_EXECUTEONLYONCE | WT_EXECUTEINWAITTHREAD | WT_EXECUTEINIOTHREAD);
3222 if ((status = tp_alloc_wait( &wait, rtl_wait_callback, context, &environment, flags )))
3223 return status;
3225 object = impl_from_TP_WAIT(wait);
3226 object->u.wait.rtl_callback = callback;
3228 RtlEnterCriticalSection( &waitqueue.cs );
3229 TpSetWait( (TP_WAIT *)object, handle, get_nt_timeout( &timeout, milliseconds ) );
3231 *out = object;
3232 RtlLeaveCriticalSection( &waitqueue.cs );
3234 return STATUS_SUCCESS;
3237 /***********************************************************************
3238 * RtlDeregisterWaitEx (NTDLL.@)
3240 * Cancels a wait operation and frees the resources associated with calling
3241 * RtlRegisterWait().
3243 * PARAMS
3244 * WaitObject [I] Handle to the wait object to free.
3246 * RETURNS
3247 * Success: STATUS_SUCCESS.
3248 * Failure: Any NTSTATUS code.
3250 NTSTATUS WINAPI RtlDeregisterWaitEx( HANDLE handle, HANDLE event )
3252 struct threadpool_object *object = handle;
3253 NTSTATUS status;
3255 TRACE( "handle %p, event %p\n", handle, event );
3257 if (!object) return STATUS_INVALID_HANDLE;
3259 TpSetWait( (TP_WAIT *)object, NULL, NULL );
3261 if (event == INVALID_HANDLE_VALUE) TpWaitForWait( (TP_WAIT *)object, TRUE );
3262 else
3264 assert( object->completed_event == NULL );
3265 object->completed_event = event;
3268 RtlEnterCriticalSection( &object->pool->cs );
3269 if (object->num_pending_callbacks + object->num_running_callbacks
3270 + object->num_associated_callbacks) status = STATUS_PENDING;
3271 else status = STATUS_SUCCESS;
3272 RtlLeaveCriticalSection( &object->pool->cs );
3274 TpReleaseWait( (TP_WAIT *)object );
3275 return status;
3278 /***********************************************************************
3279 * RtlDeregisterWait (NTDLL.@)
3281 * Cancels a wait operation and frees the resources associated with calling
3282 * RtlRegisterWait().
3284 * PARAMS
3285 * WaitObject [I] Handle to the wait object to free.
3287 * RETURNS
3288 * Success: STATUS_SUCCESS.
3289 * Failure: Any NTSTATUS code.
3291 NTSTATUS WINAPI RtlDeregisterWait(HANDLE WaitHandle)
3293 return RtlDeregisterWaitEx(WaitHandle, NULL);