1 /*************************************************************************
3 * Open Dynamics Engine, Copyright (C) 2001-2003 Russell L. Smith. *
4 * All rights reserved. Email: russ@q12.org Web: www.q12.org *
6 * Threading POSIX thread pool implementation file. *
7 * Copyright (C) 2011-2024 Oleh Derevenko. All rights reserved. *
8 * e-mail: odar@eleks.com (change all "a" to "e") *
10 * This library is free software; you can redistribute it and/or *
11 * modify it under the terms of EITHER: *
12 * (1) The GNU Lesser General Public License as published by the Free *
13 * Software Foundation; either version 2.1 of the License, or (at *
14 * your option) any later version. The text of the GNU Lesser *
15 * General Public License is included with this library in the *
17 * (2) The BSD-style license that is included with this library in *
18 * the file LICENSE-BSD.TXT. *
20 * This library is distributed in the hope that it will be useful, *
21 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
22 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the files *
23 * LICENSE.TXT and LICENSE-BSD.TXT for more details. *
25 *************************************************************************/
28 * POSIX thread pool implementation for built-in threading support provider.
34 #include <ode/odeconfig.h>
35 #include <ode/error.h>
36 #include <ode/threading_impl.h>
37 #include <ode/odeinit.h>
40 #include "threading_impl_templates.h"
43 #if dBUILTIN_THREADING_IMPL_ENABLED
55 #endif // #if dBUILTIN_THREADING_IMPL_ENABLED
58 #if dBUILTIN_THREADING_IMPL_ENABLED
63 dxEventObject(): m_event_allocated(false), m_event_manual(false), m_event_value(false) {}
64 ~dxEventObject() { FinalizeObject(); }
66 bool InitializeObject(bool manual_reset
, bool initial_state
);
67 void FinalizeObject();
69 // WARNING! To make implementation simpler, event only releases a single thread even for manual mode.
70 bool WaitInfinitely();
75 bool m_event_allocated
;
78 pthread_mutex_t m_event_mutex
;
79 pthread_cond_t m_event_cond
;
82 bool dxEventObject::InitializeObject(bool manual_reset
, bool initial_state
)
84 dIASSERT(!m_event_allocated
);
88 bool cond_allocated
= false;
92 int cond_result
= pthread_cond_init(&m_event_cond
, NULL
);
93 if (cond_result
!= EOK
)
99 cond_allocated
= true;
101 int mutex_result
= pthread_mutex_init(&m_event_mutex
, NULL
);
102 if (mutex_result
!= EOK
)
104 errno
= mutex_result
;
108 m_event_manual
= manual_reset
;
109 m_event_value
= initial_state
;
110 m_event_allocated
= true;
119 int cond_destroy_result
= pthread_cond_destroy(&m_event_cond
);
120 dIVERIFY(cond_destroy_result
== EOK
);
127 void dxEventObject::FinalizeObject()
129 if (m_event_allocated
)
131 int mutex_destroy_result
= pthread_mutex_destroy(&m_event_mutex
);
132 dICHECK(mutex_destroy_result
== EOK
); // Why would mutex be unable to be destroyed?
134 int cond_destroy_result
= pthread_cond_destroy(&m_event_cond
);
135 dICHECK(cond_destroy_result
== EOK
); // Why would condvar be unable to be destroyed?
137 m_event_allocated
= false;
141 bool dxEventObject::WaitInfinitely()
145 int lock_result
= pthread_mutex_lock(&m_event_mutex
);
146 dICHECK(lock_result
== EOK
);
148 int wait_result
= EOK
;
151 wait_result
= pthread_cond_wait(&m_event_cond
, &m_event_mutex
);
152 dICHECK(wait_result
!= EINTR
); // Would caller be so kind to disable signal handling for thread for duration of the call to ODE at least?
155 if (wait_result
== EOK
)
157 dIASSERT(m_event_value
);
161 m_event_value
= false;
167 int unlock_result
= pthread_mutex_unlock(&m_event_mutex
);
168 dICHECK(unlock_result
== EOK
);
173 void dxEventObject::SetEvent()
175 int lock_result
= pthread_mutex_lock(&m_event_mutex
);
176 dICHECK(lock_result
== EOK
);
180 m_event_value
= true;
182 // NOTE! Event only releases a single thread even for manual mode to simplify implementation.
183 int signal_result
= pthread_cond_signal(&m_event_cond
);
184 dICHECK(signal_result
== EOK
);
187 int unlock_result
= pthread_mutex_unlock(&m_event_mutex
);
188 dICHECK(unlock_result
== EOK
);
191 void dxEventObject::ResetEvent()
193 int lock_result
= pthread_mutex_lock(&m_event_mutex
);
194 dICHECK(lock_result
== EOK
);
196 m_event_value
= false;
198 int unlock_result
= pthread_mutex_unlock(&m_event_mutex
);
199 dICHECK(unlock_result
== EOK
);
203 struct dxThreadPoolThreadInfo
206 dxThreadPoolThreadInfo();
207 ~dxThreadPoolThreadInfo();
209 bool Initialize(sizeint stack_size
, unsigned int ode_data_allocate_flags
);
212 bool InitializeThreadAttributes(pthread_attr_t
*thread_attr
, sizeint stack_size
);
213 void FinalizeThreadAttributes(pthread_attr_t
*thread_attr
);
214 bool WaitInitStatus();
218 void WaitAndCloseThreadHandle(pthread_t thread_handle
);
223 dxTHREAD_COMMAND_EXIT
,
224 dxTHREAD_COMMAND_NOOP
,
225 dxTHREAD_COMMAND_SERVE_IMPLEMENTATION
,
228 struct dxServeImplementationParams
230 dxServeImplementationParams(dThreadingImplementationID impl
, dxEventObject
*ready_wait_event
):
231 m_impl(impl
), m_ready_wait_event(ready_wait_event
)
235 dThreadingImplementationID m_impl
;
236 dxEventObject
*m_ready_wait_event
;
239 void ExecuteThreadCommand(dxTHREADCOMMAND command
, void *param
, bool wait_response
);
242 static void *ThreadProcedure_Callback(void *thread_param
);
243 void ThreadProcedure();
244 bool DisableSignalHandlers();
245 void ReportInitStatus(bool init_result
);
246 void RunCommandHandlingLoop();
248 void ThreadedServeImplementation(dThreadingImplementationID impl
, dxEventObject
*ready_wait_event
);
249 static void ProcessThreadServeReadiness_Callback(void *context
);
252 pthread_t m_thread_handle
;
253 bool m_thread_allocated
;
255 unsigned int m_ode_data_allocate_flags
;
256 dxTHREADCOMMAND m_command_code
;
257 dxEventObject m_command_event
;
258 dxEventObject m_acknowledgement_event
;
259 void *m_command_param
;
263 dxThreadPoolThreadInfo::dxThreadPoolThreadInfo():
265 m_thread_allocated(false),
266 m_ode_data_allocate_flags(0),
267 m_command_code(dxTHREAD_COMMAND_EXIT
),
269 m_acknowledgement_event(),
270 m_command_param(NULL
)
274 dxThreadPoolThreadInfo::~dxThreadPoolThreadInfo()
280 bool dxThreadPoolThreadInfo::Initialize(sizeint stack_size
, unsigned int ode_data_allocate_flags
)
284 bool command_event_allocated
= false, acknowledgement_event_allocated
= false;
288 // -- There is no implicit limit on stack size in POSIX implementation
289 // if (stack_size > ...)
295 if (!m_command_event
.InitializeObject(false, false))
300 command_event_allocated
= true;
302 if (!m_acknowledgement_event
.InitializeObject(true, false))
307 acknowledgement_event_allocated
= true;
309 m_ode_data_allocate_flags
= ode_data_allocate_flags
;
311 pthread_attr_t thread_attr
;
312 if (!InitializeThreadAttributes(&thread_attr
, stack_size
))
317 int thread_create_result
= pthread_create(&m_thread_handle
, &thread_attr
, &ThreadProcedure_Callback
, (void *)this);
319 FinalizeThreadAttributes(&thread_attr
);
321 if (thread_create_result
!= EOK
)
323 errno
= thread_create_result
;
327 bool thread_init_result
= WaitInitStatus();
328 if (!thread_init_result
)
330 WaitAndCloseThreadHandle(m_thread_handle
);
334 m_thread_allocated
= true;
341 if (command_event_allocated
)
343 if (acknowledgement_event_allocated
)
345 m_acknowledgement_event
.FinalizeObject();
348 m_command_event
.FinalizeObject();
355 bool dxThreadPoolThreadInfo::InitializeThreadAttributes(pthread_attr_t
*thread_attr
, sizeint stack_size
)
359 bool attr_inited
= false;
363 int init_result
= pthread_attr_init(thread_attr
);
364 if (init_result
!= EOK
)
373 if ((set_result
= pthread_attr_setdetachstate(thread_attr
, PTHREAD_CREATE_JOINABLE
)) != EOK
374 #if (HAVE_PTHREAD_ATTR_SETINHERITSCHED)
375 || (set_result
= pthread_attr_setinheritsched(thread_attr
, PTHREAD_INHERIT_SCHED
)) != EOK
377 #if (HAVE_PTHREAD_ATTR_SETSTACKLAZY)
378 || (set_result
= pthread_attr_setstacklazy(thread_attr
, PTHREAD_STACK_NOTLAZY
)) != EOK
380 || (stack_size
!= 0 && (set_result
= pthread_attr_setstacksize(thread_attr
, stack_size
)) != EOK
))
394 int destroy_result
= pthread_attr_destroy(thread_attr
);
395 dIVERIFY(destroy_result
== EOK
);
402 void dxThreadPoolThreadInfo::FinalizeThreadAttributes(pthread_attr_t
*thread_attr
)
404 int destroy_result
= pthread_attr_destroy(thread_attr
);
405 dIVERIFY(destroy_result
== EOK
);
408 bool dxThreadPoolThreadInfo::WaitInitStatus()
410 bool acknowledgement_wait_result
= m_acknowledgement_event
.WaitInfinitely();
411 dICHECK(acknowledgement_wait_result
);
413 int error_code
= (int)(sizeint
)m_command_param
;
415 bool init_status
= error_code
== EOK
? true : ((errno
= error_code
), false);
419 void dxThreadPoolThreadInfo::Finalize()
421 if (m_thread_allocated
)
423 ExecuteThreadCommand(dxTHREAD_COMMAND_EXIT
, NULL
, false);
425 WaitAndCloseThreadHandle(m_thread_handle
);
426 m_thread_allocated
= false;
428 m_command_event
.FinalizeObject();
429 m_acknowledgement_event
.FinalizeObject();
433 void dxThreadPoolThreadInfo::WaitAndCloseThreadHandle(pthread_t thread_handle
)
435 int join_result
= pthread_join(thread_handle
, NULL
);
436 dICHECK(join_result
== EOK
);
439 void dxThreadPoolThreadInfo::ExecuteThreadCommand(dxTHREADCOMMAND command
, void *param
, bool wait_response
)
441 bool acknowledgement_wait_result
= m_acknowledgement_event
.WaitInfinitely();
442 dICHECK(acknowledgement_wait_result
);
444 m_acknowledgement_event
.ResetEvent();
446 m_command_code
= command
;
447 m_command_param
= param
;
449 m_command_event
.SetEvent();
453 bool new_acknowledgement_wait_result
= m_acknowledgement_event
.WaitInfinitely();
454 dICHECK(new_acknowledgement_wait_result
);
458 void *dxThreadPoolThreadInfo::ThreadProcedure_Callback(void *thread_param
)
460 dxThreadPoolThreadInfo
*thread_info
= (dxThreadPoolThreadInfo
*)thread_param
;
461 thread_info
->ThreadProcedure();
466 void dxThreadPoolThreadInfo::ThreadProcedure()
468 bool init_result
= dAllocateODEDataForThread(m_ode_data_allocate_flags
) != 0
469 && DisableSignalHandlers();
471 ReportInitStatus(init_result
);
475 RunCommandHandlingLoop();
477 // dCleanupODEAllDataForThread(); -- this function can only be called if ODE was initialized for manual cleanup. And that is unknown here...
481 bool dxThreadPoolThreadInfo::DisableSignalHandlers()
488 if (sigprocmask( SIG_BLOCK
, &set
, NULL
) != -1)
496 void dxThreadPoolThreadInfo::ReportInitStatus(bool init_result
)
498 m_command_param
= (void *)(sizeint
)(init_result
? EOK
: ((errno
!= EOK
) ? errno
: EFAULT
));
500 m_acknowledgement_event
.SetEvent();
503 void dxThreadPoolThreadInfo::RunCommandHandlingLoop()
505 bool exit_requested
= false;
507 while (!exit_requested
)
509 bool command_wait_result
= m_command_event
.WaitInfinitely();
510 dICHECK(command_wait_result
);
512 const dxTHREADCOMMAND command_code
= m_command_code
;
513 switch (command_code
)
515 case dxTHREAD_COMMAND_EXIT
:
517 m_acknowledgement_event
.SetEvent();
519 exit_requested
= true;
526 // break; -- proceed to case dxTHREAD_COMMAND_NOOP
529 case dxTHREAD_COMMAND_NOOP
:
531 m_acknowledgement_event
.SetEvent();
537 case dxTHREAD_COMMAND_SERVE_IMPLEMENTATION
:
539 const dxServeImplementationParams
*serve_params
= (const dxServeImplementationParams
*)m_command_param
;
540 dThreadingImplementationID impl
= serve_params
->m_impl
;
541 dxEventObject
*ready_wait_event
= serve_params
->m_ready_wait_event
;
543 m_acknowledgement_event
.SetEvent();
545 ThreadedServeImplementation(impl
, ready_wait_event
);
552 void dxThreadPoolThreadInfo::ThreadedServeImplementation(dThreadingImplementationID impl
, dxEventObject
*ready_wait_event
)
554 ((dxIThreadingImplementation
*)impl
)->StickToJobsProcessing(&ProcessThreadServeReadiness_Callback
, (void *)ready_wait_event
);
557 void dxThreadPoolThreadInfo::ProcessThreadServeReadiness_Callback(void *context
)
559 dxEventObject
*ready_wait_event
= (dxEventObject
*)context
;
561 ready_wait_event
->SetEvent();
566 struct dxThreadingThreadPool
:
570 dxThreadingThreadPool();
571 ~dxThreadingThreadPool();
573 bool InitializeThreads(sizeint thread_count
, sizeint stack_size
, unsigned int ode_data_allocate_flags
);
576 void FinalizeThreads();
578 bool InitializeIndividualThreadInfos(dxThreadPoolThreadInfo
*thread_infos
, sizeint thread_count
, sizeint stack_size
, unsigned int ode_data_allocate_flags
);
579 void FinalizeIndividualThreadInfos(dxThreadPoolThreadInfo
*thread_infos
, sizeint thread_count
);
581 bool InitializeSingleThreadInfo(dxThreadPoolThreadInfo
*thread_info
, sizeint stack_size
, unsigned int ode_data_allocate_flags
);
582 void FinalizeSingleThreadInfo(dxThreadPoolThreadInfo
*thread_info
);
585 void ServeThreadingImplementation(dThreadingImplementationID impl
);
586 void WaitIdleState();
589 dxThreadPoolThreadInfo
*m_thread_infos
;
590 sizeint m_thread_count
;
591 dxEventObject m_ready_wait_event
;
595 dxThreadingThreadPool::dxThreadingThreadPool():
596 m_thread_infos(NULL
),
602 dxThreadingThreadPool::~dxThreadingThreadPool()
608 bool dxThreadingThreadPool::InitializeThreads(sizeint thread_count
, sizeint stack_size
, unsigned int ode_data_allocate_flags
)
610 dIASSERT(m_thread_infos
== NULL
);
614 bool wait_event_allocated
= false;
616 dxThreadPoolThreadInfo
*thread_infos
= NULL
;
617 bool thread_infos_allocated
= false;
621 if (!m_ready_wait_event
.InitializeObject(false, false))
626 wait_event_allocated
= true;
628 thread_infos
= (dxThreadPoolThreadInfo
*)dAlloc(thread_count
* sizeof(dxThreadPoolThreadInfo
));
629 if (thread_infos
== NULL
)
634 thread_infos_allocated
= true;
636 if (!InitializeIndividualThreadInfos(thread_infos
, thread_count
, stack_size
, ode_data_allocate_flags
))
641 m_thread_infos
= thread_infos
;
642 m_thread_count
= thread_count
;
649 if (wait_event_allocated
)
651 if (thread_infos_allocated
)
653 dFree(thread_infos
, thread_count
* sizeof(dxThreadPoolThreadInfo
));
656 m_ready_wait_event
.FinalizeObject();
663 void dxThreadingThreadPool::FinalizeThreads()
665 dxThreadPoolThreadInfo
*thread_infos
= m_thread_infos
;
666 if (thread_infos
!= NULL
)
668 sizeint thread_count
= m_thread_count
;
670 FinalizeIndividualThreadInfos(thread_infos
, thread_count
);
671 dFree(thread_infos
, thread_count
* sizeof(dxThreadPoolThreadInfo
));
673 m_ready_wait_event
.FinalizeObject();
678 bool dxThreadingThreadPool::InitializeIndividualThreadInfos(dxThreadPoolThreadInfo
*thread_infos
, sizeint thread_count
, sizeint stack_size
, unsigned int ode_data_allocate_flags
)
680 bool any_fault
= false;
682 dxThreadPoolThreadInfo
*const infos_end
= thread_infos
+ thread_count
;
683 for (dxThreadPoolThreadInfo
*current_info
= thread_infos
; current_info
!= infos_end
; ++current_info
)
685 if (!InitializeSingleThreadInfo(current_info
, stack_size
, ode_data_allocate_flags
))
687 FinalizeIndividualThreadInfos(thread_infos
, current_info
- thread_infos
);
694 bool result
= !any_fault
;
698 void dxThreadingThreadPool::FinalizeIndividualThreadInfos(dxThreadPoolThreadInfo
*thread_infos
, sizeint thread_count
)
700 dxThreadPoolThreadInfo
*const infos_end
= thread_infos
+ thread_count
;
701 for (dxThreadPoolThreadInfo
*current_info
= thread_infos
; current_info
!= infos_end
; ++current_info
)
703 FinalizeSingleThreadInfo(current_info
);
708 bool dxThreadingThreadPool::InitializeSingleThreadInfo(dxThreadPoolThreadInfo
*thread_info
, sizeint stack_size
, unsigned int ode_data_allocate_flags
)
712 new(thread_info
) dxThreadPoolThreadInfo();
714 if (thread_info
->Initialize(stack_size
, ode_data_allocate_flags
))
720 thread_info
->dxThreadPoolThreadInfo::~dxThreadPoolThreadInfo();
726 void dxThreadingThreadPool::FinalizeSingleThreadInfo(dxThreadPoolThreadInfo
*thread_info
)
728 if (thread_info
!= NULL
)
730 thread_info
->dxThreadPoolThreadInfo::~dxThreadPoolThreadInfo();
735 void dxThreadingThreadPool::ServeThreadingImplementation(dThreadingImplementationID impl
)
737 dxThreadPoolThreadInfo::dxServeImplementationParams
params(impl
, &m_ready_wait_event
);
739 dxThreadPoolThreadInfo
*const infos_end
= m_thread_infos
+ m_thread_count
;
740 for (dxThreadPoolThreadInfo
*current_info
= m_thread_infos
; current_info
!= infos_end
; ++current_info
)
742 current_info
->ExecuteThreadCommand(dxThreadPoolThreadInfo::dxTHREAD_COMMAND_SERVE_IMPLEMENTATION
, ¶ms
, true);
744 bool ready_wait_result
= m_ready_wait_event
.WaitInfinitely();
745 dICHECK(ready_wait_result
);
749 void dxThreadingThreadPool::WaitIdleState()
751 dxThreadPoolThreadInfo
*const infos_end
= m_thread_infos
+ m_thread_count
;
752 for (dxThreadPoolThreadInfo
*current_info
= m_thread_infos
; current_info
!= infos_end
; ++current_info
)
754 current_info
->ExecuteThreadCommand(dxThreadPoolThreadInfo::dxTHREAD_COMMAND_NOOP
, NULL
, true);
759 #endif // #if dBUILTIN_THREADING_IMPL_ENABLED
762 /*extern */dThreadingThreadPoolID
dThreadingAllocateThreadPool(unsigned thread_count
,
763 sizeint stack_size
, unsigned int ode_data_allocate_flags
, void *reserved
/*=NULL*/)
765 dAASSERT(thread_count
!= 0);
767 #if dBUILTIN_THREADING_IMPL_ENABLED
768 dxThreadingThreadPool
*thread_pool
= new dxThreadingThreadPool();
769 if (thread_pool
!= NULL
)
771 if (thread_pool
->InitializeThreads(thread_count
, stack_size
, ode_data_allocate_flags
))
782 dThreadingThreadPoolID thread_pool
= NULL
;
783 (void)stack_size
; // unused
784 (void)ode_data_allocate_flags
; // unused
785 (void)reserved
; // unused
786 #endif // #if dBUILTIN_THREADING_IMPL_ENABLED
788 return (dThreadingThreadPoolID
)thread_pool
;
791 /*extern */void dThreadingThreadPoolServeMultiThreadedImplementation(dThreadingThreadPoolID pool
, dThreadingImplementationID impl
)
793 #if dBUILTIN_THREADING_IMPL_ENABLED
794 dxThreadingThreadPool
*thread_pool
= (dxThreadingThreadPool
*)pool
;
795 thread_pool
->ServeThreadingImplementation(impl
);
797 (void)pool
; // unused
798 (void)impl
; // unused
799 #endif // #if dBUILTIN_THREADING_IMPL_ENABLED
802 /*extern */void dThreadingThreadPoolWaitIdleState(dThreadingThreadPoolID pool
)
804 #if dBUILTIN_THREADING_IMPL_ENABLED
805 dxThreadingThreadPool
*thread_pool
= (dxThreadingThreadPool
*)pool
;
806 thread_pool
->WaitIdleState();
808 (void)pool
; // unused
809 #endif // #if dBUILTIN_THREADING_IMPL_ENABLED
812 /*extern */void dThreadingFreeThreadPool(dThreadingThreadPoolID pool
)
814 #if dBUILTIN_THREADING_IMPL_ENABLED
815 dxThreadingThreadPool
*thread_pool
= (dxThreadingThreadPool
*)pool
;
818 (void)pool
; // unused
819 #endif // #if dBUILTIN_THREADING_IMPL_ENABLED
823 #endif // #if !defined(_WIN32)