3 Written by Robert Collins <rbtcollins@hotmail.com>
5 This file is part of Cygwin.
7 This software is a copyrighted work licensed under the terms of the
8 Cygwin license. Please consult the file "CYGWIN_LICENSE" for
11 #ifdef __OUTSIDE_CYGWIN__
18 #include <sys/types.h>
20 #include "threaded_queue.h"
22 /*****************************************************************************/
// Out-of-line definition of the queue_request destructor.
// NOTE(review): only the signature is visible in this listing; the body
// is not shown here.
26 queue_request::~queue_request ()
29 /*****************************************************************************/
// Constructs the request queue.
//
// Visible steps: both list heads are initialised to NULL, the queue lock
// is initialised, the request semaphore is created, and finally
// create_workers () is called with the caller-supplied count.
//
// initial_workers: forwarded unchanged to create_workers ().
33 threaded_queue::threaded_queue (const size_t initial_workers
)
// Submitter list and request list both start out empty.
37 _submitters_head (NULL
),
39 _requests_head (NULL
),
// _queue_lock is the critical section entered by start (), stop (),
// add () and worker_loop () around their accesses to the lists.
42 InitializeCriticalSection (&_queue_lock
);
44 // This semaphore's count is the number of requests on the queue.
45 // The maximum count (129792) is calculated as MAXIMUM_WAIT_OBJECTS
46 // multiplied by max. threads per process (2028?), which is (a few)
47 // more requests than could ever be pending with the current design.
49 _requests_sem
= CreateSemaphore (NULL
, // SECURITY_ATTRIBUTES
51 129792, // Maximum count
// Failure report for the semaphore creation above.
// NOTE(review): the guarding condition and whatever follows the message
// are not visible in this listing.
56 system_printf (("failed to create the request queue semaphore, "
// Spawn the initial pool of worker threads.
62 create_workers (initial_workers
);
// Tears the queue down: walks the list of still-pending requests, then
// deletes the queue lock and closes the request semaphore handle.
65 threaded_queue::~threaded_queue ()
70 debug_printf ("deleting all pending queue requests");
// Start the walk at the head of the pending-request list.
71 queue_request
*reqptr
= _requests_head
;
// Remember the current node in ptr, then advance reqptr to its successor.
// NOTE(review): the loop header and the statement that disposes of ptr
// (presumably a delete, per the debug message above) are not visible.
74 queue_request
*const ptr
= reqptr
;
75 reqptr
= reqptr
->_next
;
// Release the synchronisation objects set up by the constructor.
79 DeleteCriticalSection (&_queue_lock
);
81 (void) CloseHandle (_requests_sem
);
84 /* FIXME: return success or failure rather than quitting */
// Registers a submission loop with this queue.
//
// submitter: the loop to register.  Asserted preconditions: it was
//            created for this queue (submitter->_queue == this) and it
//            is not already linked into a list (its _next is null).
86 threaded_queue::add_submission_loop (queue_submission_loop
*const submitter
)
89 assert (submitter
->_queue
== this);
90 assert (!submitter
->_next
);
// Swap submitter in as the new head of the submitter list.
// NOTE(review): TInterlockedExchangePointer is defined elsewhere;
// presumably a typed wrapper around an interlocked pointer exchange whose
// result (the old head) is stored in submitter->_next -- the assignment
// target is not visible in this listing; confirm.
93 TInterlockedExchangePointer (&_submitters_head
, submitter
);
// Starts the queue's submission loops.
//
// The previous value of _running and the head of the submitter list are
// both read while holding _queue_lock; the list walk itself happens
// after the lock has been released.
100 threaded_queue::start ()
102 EnterCriticalSection (&_queue_lock
);
// Snapshot of the running flag taken under the lock.
103 const bool was_running
= _running
;
105 queue_submission_loop
*loopptr
= _submitters_head
;
106 LeaveCriticalSection (&_queue_lock
);
110 debug_printf ("starting all queue submission loops");
// ptr holds the current submitter while loopptr advances to the next one.
// NOTE(review): the loop header and the call made on ptr (presumably
// ptr->start ()) are not visible in this listing.
114 queue_submission_loop
*const ptr
= loopptr
;
115 loopptr
= loopptr
->_next
;
// Stops the queue: walks the submission loops, then wakes the worker
// threads and waits until none of them is left.
//
// As in start (), the running flag and the submitter list head are read
// under _queue_lock and the list is walked after the lock is released.
124 threaded_queue::stop ()
126 EnterCriticalSection (&_queue_lock
);
// Snapshot of the running flag taken under the lock.
127 const bool was_running
= _running
;
129 queue_submission_loop
*loopptr
= _submitters_head
;
130 LeaveCriticalSection (&_queue_lock
);
134 debug_printf ("stopping all queue submission loops");
// ptr holds the current submitter while loopptr advances to the next one.
// NOTE(review): the loop header and the call made on ptr (presumably
// ptr->stop ()) are not visible in this listing.
137 queue_submission_loop
*const ptr
= loopptr
;
138 loopptr
= loopptr
->_next
;
// Raise the semaphore count by one per existing worker so that every
// worker blocked in worker_loop ()'s wait on _requests_sem can wake up.
142 ReleaseSemaphore (_requests_sem
, _workers_count
, NULL
);
// Wait for _workers_count to drop to zero; each worker decrements it in
// threaded_queue::start_routine () once its worker_loop () has returned.
// NOTE(review): the loop body (how it waits between checks) is not
// fully visible in this listing.
143 while (_workers_count
)
145 debug_printf (("waiting for worker threads to terminate: "
150 debug_printf ("all worker threads have terminated");
156 /* FIXME: return success or failure */
// Appends a request to the pending-request list and signals the workers.
//
// therequest: the request to enqueue; asserted to be unlinked (its _next
//             is null) on entry.
//
// The list is modified and _requests_count is updated while holding
// _queue_lock.  After the lock is released the request semaphore is
// released by one.
158 threaded_queue::add (queue_request
*const therequest
)
161 assert (!therequest
->_next
);
163 EnterCriticalSection (&_queue_lock
);
// Make therequest the head of the list.
// NOTE(review): presumably this is the empty-list branch of an if/else
// whose condition is not visible in this listing.
165 _requests_head
= therequest
;
168 /* Add to the queue end. */
// Walk from the head to the last node (the one whose _next is null);
// this is a linear scan of the pending requests.
169 queue_request
*reqptr
= _requests_head
;
170 for (; reqptr
->_next
; reqptr
= reqptr
->_next
)
173 assert (!reqptr
->_next
);
// Link the new request after the current tail.
174 reqptr
->_next
= therequest
;
177 _requests_count
+= 1;
178 assert (_requests_count
> 0);
179 LeaveCriticalSection (&_queue_lock
);
// Bump the semaphore count by one for the newly queued request.
181 (void) ReleaseSemaphore (_requests_sem
, 1, NULL
);
// When every existing worker is busy, the pool is grown; the message
// below reports the new worker count.
// NOTE(review): the call that actually adds the worker (presumably
// create_workers (1)) is not visible in this listing.
183 if (_workers_busy
>= _workers_count
)
186 system_printf ("All threads busy, added one (now %u)", _workers_count
);
// Thread entry point for worker threads; create_workers () passes this
// function to CreateThread () with the queue's `this' pointer as argument.
//
// lpParam: the owning threaded_queue, passed as an untyped pointer.
190 /*static*/ DWORD WINAPI
191 threaded_queue::start_routine (const LPVOID lpParam
)
// Recover the queue object from the untyped thread parameter.
193 class threaded_queue
*const queue
= (class threaded_queue
*) lpParam
;
// Run the worker loop on this thread.
196 queue
->worker_loop ();
// The worker is finished: take it out of the queue's worker count.
// threaded_queue::stop () loops until this count reaches zero.
198 const long count
= InterlockedDecrement (&queue
->_workers_count
);
202 debug_printf ("worker loop has exited; thread about to terminate");
// Creates worker threads for this queue.
//
// initial_workers: number of threads to create; asserted to be > 0.
//
// Each iteration increments _workers_count and then creates a thread
// that runs start_routine () with `this' as its parameter.
208 threaded_queue::create_workers (const size_t initial_workers
)
210 assert (initial_workers
> 0);
212 for (unsigned int i
= 0; i
< initial_workers
; i
++)
// Count the worker before its thread is created.
214 const long count
= InterlockedIncrement (&_workers_count
);
218 const HANDLE hThread
=
219 CreateThread (NULL
, 0, start_routine
, this, 0, &tid
);
// Failure report for CreateThread ().
// NOTE(review): the guarding condition and the rest of the failure path
// are not visible in this listing.
223 system_printf ("failed to create thread, error = %u",
// The handle is closed straight away, so the worker is not tracked
// through it; closing a thread handle does not stop the thread.
228 (void) CloseHandle (hThread
);
// Body of a worker thread: waits on the request semaphore, removes the
// request at the head of the pending list while holding _queue_lock, and
// brackets the subsequent work with an increment/decrement of
// _workers_busy.
// NOTE(review): the enclosing loop, the exit condition and the call that
// processes the request are not visible in this listing.
233 threaded_queue::worker_loop ()
// Block, without a timeout, until the request semaphore is signalled.
237 const DWORD rc
= WaitForSingleObject (_requests_sem
, INFINITE
);
238 if (rc
== WAIT_FAILED
)
240 system_printf ("wait for request semaphore failed, error = %u",
244 assert (rc
== WAIT_OBJECT_0
);
246 EnterCriticalSection (&_queue_lock
);
// NOTE(review): this first LeaveCriticalSection presumably belongs to an
// early-exit branch (e.g. the queue no longer running) whose condition
// is not visible here; confirm.
249 LeaveCriticalSection (&_queue_lock
);
// Pop the head of the pending-request list.
253 assert (_requests_head
);
254 queue_request
*const reqptr
= _requests_head
;
255 _requests_head
= reqptr
->_next
;
257 _requests_count
-= 1;
258 assert (_requests_count
>= 0);
259 LeaveCriticalSection (&_queue_lock
);
// _workers_busy is raised and then lowered again; add () compares it
// with _workers_count to decide whether the pool needs another worker.
262 InterlockedIncrement (&_workers_busy
);
264 InterlockedDecrement (&_workers_busy
);
269 /*****************************************************************************/
271 /* queue_submission_loop */
// Constructs a submission loop.
//
// queue:          the threaded_queue this loop belongs to.
//                 NOTE(review): its use is not visible in this listing;
//                 presumably stored in _queue, which
//                 threaded_queue::add_submission_loop () asserts on.
// ninterruptible: stored in _interruptible.
273 queue_submission_loop::queue_submission_loop (threaded_queue
*const queue
,
274 const bool ninterruptible
)
276 _interrupt_event (NULL
),
278 _interruptible (ninterruptible
),
285 // verbose: debug_printf ("creating an interruptible processing thread");
// Create the event that stop () signals with SetEvent ().
// NOTE(review): presumably only done for interruptible loops; the
// guarding condition is not visible in this listing.
287 _interrupt_event
= CreateEvent (NULL
, // SECURITY_ATTRIBUTES
289 FALSE
, // Initially non-signalled
// CreateEvent () failed.
292 if (!_interrupt_event
)
294 system_printf ("failed to create interrupt event, error = %u",
// Destroys the submission loop, closing the handles it holds.
301 queue_submission_loop::~queue_submission_loop ()
// The interrupt event is closed only when one was created.
305 if (_interrupt_event
)
306 (void) CloseHandle (_interrupt_event
);
// NOTE(review): any condition guarding this close of the thread handle
// is not visible in this listing.
308 (void) CloseHandle (_hThread
);
// Starts the submission loop by creating a thread that runs
// start_routine () with `this' as its parameter.  The thread handle is
// stored in _hThread and the thread id in _tid.
312 queue_submission_loop::start ()
// Previous value of the running flag.
// NOTE(review): how it is used afterwards is not visible in this listing.
316 const bool was_running
= _running
;
322 _hThread
= CreateThread (NULL
, 0, start_routine
, this, 0, &_tid
);
// Failure report for CreateThread ().
// NOTE(review): the guarding condition and the rest of the failure path
// are not visible in this listing.
325 system_printf ("failed to create thread, error = %u",
// Stops the submission loop's thread.
//
// Two shutdown paths are visible:
//  - signal _interrupt_event, give the thread up to 1000 ms to exit, and
//    fall back to TerminateThread () on timeout;
//  - call TerminateThread () directly.
// NOTE(review): the condition selecting between them (presumably
// _interruptible) is not visible in this listing.
335 queue_submission_loop::stop ()
// A thread must have been created by start () before stop () is called.
337 assert (_hThread
&& _hThread
!= INVALID_HANDLE_VALUE
);
// Previous value of the running flag.
339 const bool was_running
= _running
;
347 assert (_interrupt_event
348 && _interrupt_event
!= INVALID_HANDLE_VALUE
);
// Ask the thread to stop by signalling the interrupt event.
350 SetEvent (_interrupt_event
);
// Allow the thread one second to exit on its own.
352 if (WaitForSingleObject (_hThread
, 1000) == WAIT_TIMEOUT
)
354 system_printf (("request loop thread %u failed to shutdown "
355 "when asked politely: about to get heavy"),
// Forced termination with exit code 0.
358 if (!TerminateThread (_hThread
, 0))
360 system_printf (("failed to kill request loop thread %u"
362 _tid
, GetLastError ());
369 // FIXME: could wait to see if the request loop notices that
370 // the submission loop is no longer running and shuts down
373 debug_printf ("killing request loop thread %u", _tid
);
// Direct termination path: the thread is killed without being signalled.
375 if (!TerminateThread (_hThread
, 0))
376 system_printf (("failed to kill request loop thread %u"
378 _tid
, GetLastError ());
// Thread entry point for a submission loop;
// queue_submission_loop::start () passes this function to CreateThread ()
// with the loop's `this' pointer as argument.
//
// lpParam: the queue_submission_loop to run, passed as an untyped
//          pointer; asserted to be non-null after the cast.
385 /*static*/ DWORD WINAPI
386 queue_submission_loop::start_routine (const LPVOID lpParam
)
388 class queue_submission_loop
*const submission_loop
=
389 (class queue_submission_loop
*) lpParam
;
390 assert (submission_loop
);
// Run the loop's request_loop () on this thread.
392 submission_loop
->request_loop ();
394 debug_printf ("submission loop has exited; thread about to terminate");
// NOTE(review): stop () is invoked here from the loop's own thread, and
// stop () contains a wait on / TerminateThread () of _hThread, i.e. this
// very thread.  Presumably stop () returns early in this situation (the
// relevant guard is not visible in this listing) -- worth confirming.
396 submission_loop
->stop ();
401 /*****************************************************************************/
402 #endif /* __OUTSIDE_CYGWIN__ */