Cosmetic: Copyright years were updated
[ode.git] / ode / src / threading_pool_posix.cpp
blob8447ff0d0583e327dc538f628d5235e93c0b5a7a
1 /*************************************************************************
2 * *
3 * Open Dynamics Engine, Copyright (C) 2001-2003 Russell L. Smith. *
4 * All rights reserved. Email: russ@q12.org Web: www.q12.org *
5 * *
6 * Threading POSIX thread pool implementation file. *
7 * Copyright (C) 2011-2024 Oleh Derevenko. All rights reserved. *
8 * e-mail: odar@eleks.com (change all "a" to "e") *
9 * *
10 * This library is free software; you can redistribute it and/or *
11 * modify it under the terms of EITHER: *
12 * (1) The GNU Lesser General Public License as published by the Free *
13 * Software Foundation; either version 2.1 of the License, or (at *
14 * your option) any later version. The text of the GNU Lesser *
15 * General Public License is included with this library in the *
16 * file LICENSE.TXT. *
17 * (2) The BSD-style license that is included with this library in *
18 * the file LICENSE-BSD.TXT. *
19 * *
20 * This library is distributed in the hope that it will be useful, *
21 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
22 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the files *
23 * LICENSE.TXT and LICENSE-BSD.TXT for more details. *
24 * *
25 *************************************************************************/
28 * POSIX thread pool implementation for built-in threading support provider.
32 #if !defined(_WIN32)
34 #include <ode/odeconfig.h>
35 #include <ode/error.h>
36 #include <ode/threading_impl.h>
37 #include <ode/odeinit.h>
38 #include "config.h"
39 #include "objects.h"
40 #include "threading_impl_templates.h"
43 #if dBUILTIN_THREADING_IMPL_ENABLED
45 #include <new>
46 #include <pthread.h>
47 #include <signal.h>
48 #include <errno.h>
50 #if !defined(EOK)
51 #define EOK 0
52 #endif
55 #endif // #if dBUILTIN_THREADING_IMPL_ENABLED
58 #if dBUILTIN_THREADING_IMPL_ENABLED
// Simple event object built from a pthread mutex and a condition variable.
//
// The event carries a boolean "signaled" value. In auto-reset mode
// (manual_reset == false) a successful wait clears the value; in manual mode
// the value stays set until ResetEvent() is called.
struct dxEventObject
{
public:
    dxEventObject(): m_event_allocated(false), m_event_manual(false), m_event_value(false) {}
    ~dxEventObject() { FinalizeObject(); }

    // Creates the underlying condvar and mutex; returns false (with errno set) on failure.
    bool InitializeObject(bool manual_reset, bool initial_state);
    // Destroys the underlying primitives if they were created; safe to call repeatedly.
    void FinalizeObject();

    // WARNING! To make implementation simpler, event only releases a single thread even for manual mode.
    bool WaitInfinitely();
    void SetEvent();
    void ResetEvent();

private:
    // The object owns a mutex and a condition variable that the destructor destroys,
    // so a member-wise copy would lead to the same primitives being destroyed twice.
    // Copying is therefore disabled (declared, intentionally not implemented).
    dxEventObject(const dxEventObject &);
    dxEventObject &operator =(const dxEventObject &);

private:
    bool m_event_allocated;          // true while m_event_mutex/m_event_cond are initialized
    bool m_event_manual;             // true for manual-reset mode
    bool m_event_value;              // current signaled state; accessed under m_event_mutex
    pthread_mutex_t m_event_mutex;
    pthread_cond_t m_event_cond;
};
82 bool dxEventObject::InitializeObject(bool manual_reset, bool initial_state)
84 dIASSERT(!m_event_allocated);
86 bool result = false;
88 bool cond_allocated = false;
90 do
92 int cond_result = pthread_cond_init(&m_event_cond, NULL);
93 if (cond_result != EOK)
95 errno = cond_result;
96 break;
99 cond_allocated = true;
101 int mutex_result = pthread_mutex_init(&m_event_mutex, NULL);
102 if (mutex_result != EOK)
104 errno = mutex_result;
105 break;
108 m_event_manual = manual_reset;
109 m_event_value = initial_state;
110 m_event_allocated = true;
111 result = true;
113 while (false);
115 if (!result)
117 if (cond_allocated)
119 int cond_destroy_result = pthread_cond_destroy(&m_event_cond);
120 dIVERIFY(cond_destroy_result == EOK);
124 return result;
127 void dxEventObject::FinalizeObject()
129 if (m_event_allocated)
131 int mutex_destroy_result = pthread_mutex_destroy(&m_event_mutex);
132 dICHECK(mutex_destroy_result == EOK); // Why would mutex be unable to be destroyed?
134 int cond_destroy_result = pthread_cond_destroy(&m_event_cond);
135 dICHECK(cond_destroy_result == EOK); // Why would condvar be unable to be destroyed?
137 m_event_allocated = false;
141 bool dxEventObject::WaitInfinitely()
143 bool result = false;
145 int lock_result = pthread_mutex_lock(&m_event_mutex);
146 dICHECK(lock_result == EOK);
148 int wait_result = EOK;
149 if (!m_event_value)
151 wait_result = pthread_cond_wait(&m_event_cond, &m_event_mutex);
152 dICHECK(wait_result != EINTR); // Would caller be so kind to disable signal handling for thread for duration of the call to ODE at least?
155 if (wait_result == EOK)
157 dIASSERT(m_event_value);
159 if (!m_event_manual)
161 m_event_value = false;
164 result = true;
167 int unlock_result = pthread_mutex_unlock(&m_event_mutex);
168 dICHECK(unlock_result == EOK);
170 return result;
173 void dxEventObject::SetEvent()
175 int lock_result = pthread_mutex_lock(&m_event_mutex);
176 dICHECK(lock_result == EOK);
178 if (!m_event_value)
180 m_event_value = true;
182 // NOTE! Event only releases a single thread even for manual mode to simplify implementation.
183 int signal_result = pthread_cond_signal(&m_event_cond);
184 dICHECK(signal_result == EOK);
187 int unlock_result = pthread_mutex_unlock(&m_event_mutex);
188 dICHECK(unlock_result == EOK);
191 void dxEventObject::ResetEvent()
193 int lock_result = pthread_mutex_lock(&m_event_mutex);
194 dICHECK(lock_result == EOK);
196 m_event_value = false;
198 int unlock_result = pthread_mutex_unlock(&m_event_mutex);
199 dICHECK(unlock_result == EOK);
203 struct dxThreadPoolThreadInfo
205 public:
206 dxThreadPoolThreadInfo();
207 ~dxThreadPoolThreadInfo();
209 bool Initialize(sizeint stack_size, unsigned int ode_data_allocate_flags);
211 private:
212 bool InitializeThreadAttributes(pthread_attr_t *thread_attr, sizeint stack_size);
213 void FinalizeThreadAttributes(pthread_attr_t *thread_attr);
214 bool WaitInitStatus();
216 private:
217 void Finalize();
218 void WaitAndCloseThreadHandle(pthread_t thread_handle);
220 public:
221 enum dxTHREADCOMMAND
223 dxTHREAD_COMMAND_EXIT,
224 dxTHREAD_COMMAND_NOOP,
225 dxTHREAD_COMMAND_SERVE_IMPLEMENTATION,
228 struct dxServeImplementationParams
230 dxServeImplementationParams(dThreadingImplementationID impl, dxEventObject *ready_wait_event):
231 m_impl(impl), m_ready_wait_event(ready_wait_event)
235 dThreadingImplementationID m_impl;
236 dxEventObject *m_ready_wait_event;
239 void ExecuteThreadCommand(dxTHREADCOMMAND command, void *param, bool wait_response);
241 private:
242 static void *ThreadProcedure_Callback(void *thread_param);
243 void ThreadProcedure();
244 bool DisableSignalHandlers();
245 void ReportInitStatus(bool init_result);
246 void RunCommandHandlingLoop();
248 void ThreadedServeImplementation(dThreadingImplementationID impl, dxEventObject *ready_wait_event);
249 static void ProcessThreadServeReadiness_Callback(void *context);
251 private:
252 pthread_t m_thread_handle;
253 bool m_thread_allocated;
255 unsigned int m_ode_data_allocate_flags;
256 dxTHREADCOMMAND m_command_code;
257 dxEventObject m_command_event;
258 dxEventObject m_acknowledgement_event;
259 void *m_command_param;
263 dxThreadPoolThreadInfo::dxThreadPoolThreadInfo():
264 m_thread_handle(),
265 m_thread_allocated(false),
266 m_ode_data_allocate_flags(0),
267 m_command_code(dxTHREAD_COMMAND_EXIT),
268 m_command_event(),
269 m_acknowledgement_event(),
270 m_command_param(NULL)
274 dxThreadPoolThreadInfo::~dxThreadPoolThreadInfo()
276 Finalize();
280 bool dxThreadPoolThreadInfo::Initialize(sizeint stack_size, unsigned int ode_data_allocate_flags)
282 bool result = false;
284 bool command_event_allocated = false, acknowledgement_event_allocated = false;
288 // -- There is no implicit limit on stack size in POSIX implementation
289 // if (stack_size > ...)
290 // {
291 // errno = EINVAL;
292 // break;
293 // }
295 if (!m_command_event.InitializeObject(false, false))
297 break;
300 command_event_allocated = true;
302 if (!m_acknowledgement_event.InitializeObject(true, false))
304 break;
307 acknowledgement_event_allocated = true;
309 m_ode_data_allocate_flags = ode_data_allocate_flags;
311 pthread_attr_t thread_attr;
312 if (!InitializeThreadAttributes(&thread_attr, stack_size))
314 break;
317 int thread_create_result = pthread_create(&m_thread_handle, &thread_attr, &ThreadProcedure_Callback, (void *)this);
319 FinalizeThreadAttributes(&thread_attr);
321 if (thread_create_result != EOK)
323 errno = thread_create_result;
324 break;
327 bool thread_init_result = WaitInitStatus();
328 if (!thread_init_result)
330 WaitAndCloseThreadHandle(m_thread_handle);
331 break;
334 m_thread_allocated = true;
335 result = true;
337 while (false);
339 if (!result)
341 if (command_event_allocated)
343 if (acknowledgement_event_allocated)
345 m_acknowledgement_event.FinalizeObject();
348 m_command_event.FinalizeObject();
352 return result;
355 bool dxThreadPoolThreadInfo::InitializeThreadAttributes(pthread_attr_t *thread_attr, sizeint stack_size)
357 bool result = false;
359 bool attr_inited = false;
363 int init_result = pthread_attr_init(thread_attr);
364 if (init_result != EOK)
366 errno = init_result;
367 break;
370 attr_inited = true;
372 int set_result;
373 if ((set_result = pthread_attr_setdetachstate(thread_attr, PTHREAD_CREATE_JOINABLE)) != EOK
374 #if (HAVE_PTHREAD_ATTR_SETINHERITSCHED)
375 || (set_result = pthread_attr_setinheritsched(thread_attr, PTHREAD_INHERIT_SCHED)) != EOK
376 #endif
377 #if (HAVE_PTHREAD_ATTR_SETSTACKLAZY)
378 || (set_result = pthread_attr_setstacklazy(thread_attr, PTHREAD_STACK_NOTLAZY)) != EOK
379 #endif
380 || (stack_size != 0 && (set_result = pthread_attr_setstacksize(thread_attr, stack_size)) != EOK))
382 errno = set_result;
383 break;
386 result = true;
388 while (false);
390 if (!result)
392 if (attr_inited)
394 int destroy_result = pthread_attr_destroy(thread_attr);
395 dIVERIFY(destroy_result == EOK);
399 return result;
402 void dxThreadPoolThreadInfo::FinalizeThreadAttributes(pthread_attr_t *thread_attr)
404 int destroy_result = pthread_attr_destroy(thread_attr);
405 dIVERIFY(destroy_result == EOK);
408 bool dxThreadPoolThreadInfo::WaitInitStatus()
410 bool acknowledgement_wait_result = m_acknowledgement_event.WaitInfinitely();
411 dICHECK(acknowledgement_wait_result);
413 int error_code = (int)(sizeint)m_command_param;
415 bool init_status = error_code == EOK ? true : ((errno = error_code), false);
416 return init_status;
419 void dxThreadPoolThreadInfo::Finalize()
421 if (m_thread_allocated)
423 ExecuteThreadCommand(dxTHREAD_COMMAND_EXIT, NULL, false);
425 WaitAndCloseThreadHandle(m_thread_handle);
426 m_thread_allocated = false;
428 m_command_event.FinalizeObject();
429 m_acknowledgement_event.FinalizeObject();
433 void dxThreadPoolThreadInfo::WaitAndCloseThreadHandle(pthread_t thread_handle)
435 int join_result = pthread_join(thread_handle, NULL);
436 dICHECK(join_result == EOK);
439 void dxThreadPoolThreadInfo::ExecuteThreadCommand(dxTHREADCOMMAND command, void *param, bool wait_response)
441 bool acknowledgement_wait_result = m_acknowledgement_event.WaitInfinitely();
442 dICHECK(acknowledgement_wait_result);
444 m_acknowledgement_event.ResetEvent();
446 m_command_code = command;
447 m_command_param = param;
449 m_command_event.SetEvent();
451 if (wait_response)
453 bool new_acknowledgement_wait_result = m_acknowledgement_event.WaitInfinitely();
454 dICHECK(new_acknowledgement_wait_result);
458 void *dxThreadPoolThreadInfo::ThreadProcedure_Callback(void *thread_param)
460 dxThreadPoolThreadInfo *thread_info = (dxThreadPoolThreadInfo *)thread_param;
461 thread_info->ThreadProcedure();
463 return 0;
466 void dxThreadPoolThreadInfo::ThreadProcedure()
468 bool init_result = dAllocateODEDataForThread(m_ode_data_allocate_flags) != 0
469 && DisableSignalHandlers();
471 ReportInitStatus(init_result);
473 if (init_result)
475 RunCommandHandlingLoop();
477 // dCleanupODEAllDataForThread(); -- this function can only be called if ODE was initialized for manual cleanup. And that is unknown here...
481 bool dxThreadPoolThreadInfo::DisableSignalHandlers()
483 bool result = false;
485 sigset_t set;
486 sigfillset( &set );
488 if (sigprocmask( SIG_BLOCK, &set, NULL ) != -1)
490 result = true;
493 return result;
496 void dxThreadPoolThreadInfo::ReportInitStatus(bool init_result)
498 m_command_param = (void *)(sizeint)(init_result ? EOK : ((errno != EOK) ? errno : EFAULT));
500 m_acknowledgement_event.SetEvent();
503 void dxThreadPoolThreadInfo::RunCommandHandlingLoop()
505 bool exit_requested = false;
507 while (!exit_requested)
509 bool command_wait_result = m_command_event.WaitInfinitely();
510 dICHECK(command_wait_result);
512 const dxTHREADCOMMAND command_code = m_command_code;
513 switch (command_code)
515 case dxTHREAD_COMMAND_EXIT:
517 m_acknowledgement_event.SetEvent();
519 exit_requested = true;
520 break;
523 default:
525 dIASSERT(false);
526 // break; -- proceed to case dxTHREAD_COMMAND_NOOP
529 case dxTHREAD_COMMAND_NOOP:
531 m_acknowledgement_event.SetEvent();
533 // Do nothing
534 break;
537 case dxTHREAD_COMMAND_SERVE_IMPLEMENTATION:
539 const dxServeImplementationParams *serve_params = (const dxServeImplementationParams *)m_command_param;
540 dThreadingImplementationID impl = serve_params->m_impl;
541 dxEventObject *ready_wait_event = serve_params->m_ready_wait_event;
543 m_acknowledgement_event.SetEvent();
545 ThreadedServeImplementation(impl, ready_wait_event);
546 break;
552 void dxThreadPoolThreadInfo::ThreadedServeImplementation(dThreadingImplementationID impl, dxEventObject *ready_wait_event)
554 ((dxIThreadingImplementation *)impl)->StickToJobsProcessing(&ProcessThreadServeReadiness_Callback, (void *)ready_wait_event);
557 void dxThreadPoolThreadInfo::ProcessThreadServeReadiness_Callback(void *context)
559 dxEventObject *ready_wait_event = (dxEventObject *)context;
561 ready_wait_event->SetEvent();
566 struct dxThreadingThreadPool:
567 public dBase
569 public:
570 dxThreadingThreadPool();
571 ~dxThreadingThreadPool();
573 bool InitializeThreads(sizeint thread_count, sizeint stack_size, unsigned int ode_data_allocate_flags);
575 private:
576 void FinalizeThreads();
578 bool InitializeIndividualThreadInfos(dxThreadPoolThreadInfo *thread_infos, sizeint thread_count, sizeint stack_size, unsigned int ode_data_allocate_flags);
579 void FinalizeIndividualThreadInfos(dxThreadPoolThreadInfo *thread_infos, sizeint thread_count);
581 bool InitializeSingleThreadInfo(dxThreadPoolThreadInfo *thread_info, sizeint stack_size, unsigned int ode_data_allocate_flags);
582 void FinalizeSingleThreadInfo(dxThreadPoolThreadInfo *thread_info);
584 public:
585 void ServeThreadingImplementation(dThreadingImplementationID impl);
586 void WaitIdleState();
588 private:
589 dxThreadPoolThreadInfo *m_thread_infos;
590 sizeint m_thread_count;
591 dxEventObject m_ready_wait_event;
595 dxThreadingThreadPool::dxThreadingThreadPool():
596 m_thread_infos(NULL),
597 m_thread_count(0),
598 m_ready_wait_event()
602 dxThreadingThreadPool::~dxThreadingThreadPool()
604 FinalizeThreads();
608 bool dxThreadingThreadPool::InitializeThreads(sizeint thread_count, sizeint stack_size, unsigned int ode_data_allocate_flags)
610 dIASSERT(m_thread_infos == NULL);
612 bool result = false;
614 bool wait_event_allocated = false;
616 dxThreadPoolThreadInfo *thread_infos = NULL;
617 bool thread_infos_allocated = false;
621 if (!m_ready_wait_event.InitializeObject(false, false))
623 break;
626 wait_event_allocated = true;
628 thread_infos = (dxThreadPoolThreadInfo *)dAlloc(thread_count * sizeof(dxThreadPoolThreadInfo));
629 if (thread_infos == NULL)
631 break;
634 thread_infos_allocated = true;
636 if (!InitializeIndividualThreadInfos(thread_infos, thread_count, stack_size, ode_data_allocate_flags))
638 break;
641 m_thread_infos = thread_infos;
642 m_thread_count = thread_count;
643 result = true;
645 while (false);
647 if (!result)
649 if (wait_event_allocated)
651 if (thread_infos_allocated)
653 dFree(thread_infos, thread_count * sizeof(dxThreadPoolThreadInfo));
656 m_ready_wait_event.FinalizeObject();
660 return result;
663 void dxThreadingThreadPool::FinalizeThreads()
665 dxThreadPoolThreadInfo *thread_infos = m_thread_infos;
666 if (thread_infos != NULL)
668 sizeint thread_count = m_thread_count;
670 FinalizeIndividualThreadInfos(thread_infos, thread_count);
671 dFree(thread_infos, thread_count * sizeof(dxThreadPoolThreadInfo));
673 m_ready_wait_event.FinalizeObject();
678 bool dxThreadingThreadPool::InitializeIndividualThreadInfos(dxThreadPoolThreadInfo *thread_infos, sizeint thread_count, sizeint stack_size, unsigned int ode_data_allocate_flags)
680 bool any_fault = false;
682 dxThreadPoolThreadInfo *const infos_end = thread_infos + thread_count;
683 for (dxThreadPoolThreadInfo *current_info = thread_infos; current_info != infos_end; ++current_info)
685 if (!InitializeSingleThreadInfo(current_info, stack_size, ode_data_allocate_flags))
687 FinalizeIndividualThreadInfos(thread_infos, current_info - thread_infos);
689 any_fault = true;
690 break;
694 bool result = !any_fault;
695 return result;
698 void dxThreadingThreadPool::FinalizeIndividualThreadInfos(dxThreadPoolThreadInfo *thread_infos, sizeint thread_count)
700 dxThreadPoolThreadInfo *const infos_end = thread_infos + thread_count;
701 for (dxThreadPoolThreadInfo *current_info = thread_infos; current_info != infos_end; ++current_info)
703 FinalizeSingleThreadInfo(current_info);
708 bool dxThreadingThreadPool::InitializeSingleThreadInfo(dxThreadPoolThreadInfo *thread_info, sizeint stack_size, unsigned int ode_data_allocate_flags)
710 bool result = false;
712 new(thread_info) dxThreadPoolThreadInfo();
714 if (thread_info->Initialize(stack_size, ode_data_allocate_flags))
716 result = true;
718 else
720 thread_info->dxThreadPoolThreadInfo::~dxThreadPoolThreadInfo();
723 return result;
726 void dxThreadingThreadPool::FinalizeSingleThreadInfo(dxThreadPoolThreadInfo *thread_info)
728 if (thread_info != NULL)
730 thread_info->dxThreadPoolThreadInfo::~dxThreadPoolThreadInfo();
735 void dxThreadingThreadPool::ServeThreadingImplementation(dThreadingImplementationID impl)
737 dxThreadPoolThreadInfo::dxServeImplementationParams params(impl, &m_ready_wait_event);
739 dxThreadPoolThreadInfo *const infos_end = m_thread_infos + m_thread_count;
740 for (dxThreadPoolThreadInfo *current_info = m_thread_infos; current_info != infos_end; ++current_info)
742 current_info->ExecuteThreadCommand(dxThreadPoolThreadInfo::dxTHREAD_COMMAND_SERVE_IMPLEMENTATION, &params, true);
744 bool ready_wait_result = m_ready_wait_event.WaitInfinitely();
745 dICHECK(ready_wait_result);
749 void dxThreadingThreadPool::WaitIdleState()
751 dxThreadPoolThreadInfo *const infos_end = m_thread_infos + m_thread_count;
752 for (dxThreadPoolThreadInfo *current_info = m_thread_infos; current_info != infos_end; ++current_info)
754 current_info->ExecuteThreadCommand(dxThreadPoolThreadInfo::dxTHREAD_COMMAND_NOOP, NULL, true);
759 #endif // #if dBUILTIN_THREADING_IMPL_ENABLED
762 /*extern */dThreadingThreadPoolID dThreadingAllocateThreadPool(unsigned thread_count,
763 sizeint stack_size, unsigned int ode_data_allocate_flags, void *reserved/*=NULL*/)
765 dAASSERT(thread_count != 0);
767 #if dBUILTIN_THREADING_IMPL_ENABLED
768 dxThreadingThreadPool *thread_pool = new dxThreadingThreadPool();
769 if (thread_pool != NULL)
771 if (thread_pool->InitializeThreads(thread_count, stack_size, ode_data_allocate_flags))
773 // do nothing
775 else
777 delete thread_pool;
778 thread_pool = NULL;
781 #else
782 dThreadingThreadPoolID thread_pool = NULL;
783 (void)stack_size; // unused
784 (void)ode_data_allocate_flags; // unused
785 (void)reserved; // unused
786 #endif // #if dBUILTIN_THREADING_IMPL_ENABLED
788 return (dThreadingThreadPoolID)thread_pool;
791 /*extern */void dThreadingThreadPoolServeMultiThreadedImplementation(dThreadingThreadPoolID pool, dThreadingImplementationID impl)
793 #if dBUILTIN_THREADING_IMPL_ENABLED
794 dxThreadingThreadPool *thread_pool = (dxThreadingThreadPool *)pool;
795 thread_pool->ServeThreadingImplementation(impl);
796 #else
797 (void)pool; // unused
798 (void)impl; // unused
799 #endif // #if dBUILTIN_THREADING_IMPL_ENABLED
802 /*extern */void dThreadingThreadPoolWaitIdleState(dThreadingThreadPoolID pool)
804 #if dBUILTIN_THREADING_IMPL_ENABLED
805 dxThreadingThreadPool *thread_pool = (dxThreadingThreadPool *)pool;
806 thread_pool->WaitIdleState();
807 #else
808 (void)pool; // unused
809 #endif // #if dBUILTIN_THREADING_IMPL_ENABLED
812 /*extern */void dThreadingFreeThreadPool(dThreadingThreadPoolID pool)
814 #if dBUILTIN_THREADING_IMPL_ENABLED
815 dxThreadingThreadPool *thread_pool = (dxThreadingThreadPool *)pool;
816 delete thread_pool;
817 #else
818 (void)pool; // unused
819 #endif // #if dBUILTIN_THREADING_IMPL_ENABLED
823 #endif // #if !defined(_WIN32)