/*
  This file is part of Valgrind, a dynamic binary instrumentation
  framework.

  Copyright (C) 2008-2008 Google Inc

  This program is free software; you can redistribute it and/or
  modify it under the terms of the GNU General Public License as
  published by the Free Software Foundation; either version 2 of the
  License, or (at your option) any later version.

  This program is distributed in the hope that it will be useful, but
  WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  General Public License for more details.

  You should have received a copy of the GNU General Public License
  along with this program; if not, see <http://www.gnu.org/licenses/>.

  The GNU General Public License is contained in the file COPYING.
*/
// Author: Konstantin Serebryany <opensource@google.com>

// Here we define a few simple classes that wrap pthread primitives.
//
// We need this to create unit tests for helgrind (or a similar tool)
// that will work with different threading frameworks.
//
// To test helgrind's support for another threading library, create a
// copy of this file and replace the pthread_ calls with the appropriate
// calls to that library.
//
// Note that some of the methods defined here are annotated with
// ANNOTATE_* macros defined in dynamic_annotations.h.
//
// DISCLAIMER: the classes defined in this header file
// are NOT intended for general use -- only for unit tests.
#ifndef THREAD_WRAPPERS_PTHREAD_H
#define THREAD_WRAPPERS_PTHREAD_H

#include <semaphore.h>
#include <limits.h>   // INT_MAX
#include <libkern/OSAtomic.h>  // OSSpinLock* / OSAtomicAdd32 on Mac OS X
#if __cplusplus >= 201103L
67 #include "../../drd/drd.h"
68 #define ANNOTATE_NO_OP(arg) do { } while(0)
69 #define ANNOTATE_EXPECT_RACE(addr, descr) \
70 ANNOTATE_BENIGN_RACE_SIZED(addr, 4, "expected race")
71 static inline bool RunningOnValgrind() { return RUNNING_ON_VALGRIND
; }
# error "Pleeease, do not define NDEBUG"
/// Set this to true if malloc() uses a mutex on your platform, as this may
/// introduce a happens-before arc for a pure happens-before race detector.
const bool kMallocUsesMutex = false;
/// Current time in milliseconds.
static inline int64_t GetCurrentTimeMillis() {
  struct timeval now;
  gettimeofday(&now, NULL);
  return now.tv_sec * 1000 + now.tv_usec / 1000;
}
/// Copy tv to ts, adding an offset in milliseconds.
static inline void timeval2timespec(timeval *const tv,
                                    timespec *ts,
                                    int64_t offset_milli) {
  const int64_t ten_9 = 1000000000LL;
  const int64_t ten_6 = 1000000LL;
  const int64_t ten_3 = 1000LL;
  int64_t now_nsec  = (int64_t)tv->tv_sec * ten_9;
  now_nsec         += (int64_t)tv->tv_usec * ten_3;
  int64_t then_nsec = now_nsec + offset_milli * ten_6;
  ts->tv_sec  = then_nsec / ten_9;
  ts->tv_nsec = then_nsec % ten_9;
}
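
// A minimal usage sketch (added for illustration; not part of the original
// header, and the function name below is hypothetical). It builds an absolute
// deadline 250 ms from now, the same way the timed-wait helpers below do.
// Assumes <sys/time.h> is already included for gettimeofday().
static inline void ExampleDeadlineIn250Millis(struct timespec *deadline) {
  struct timeval now;
  gettimeofday(&now, NULL);                // current wall-clock time
  timeval2timespec(&now, deadline, 250);   // *deadline = now + 250 ms
}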
/// helgrind does not (yet) support spin locks, so we annotate them.

    CHECK(0 == pthread_spin_init(&mu_, 0));
    ANNOTATE_RWLOCK_CREATE((void*)&mu_);

    ANNOTATE_RWLOCK_DESTROY((void*)&mu_);
    CHECK(0 == pthread_spin_destroy(&mu_));

    CHECK(0 == pthread_spin_lock(&mu_));
    ANNOTATE_RWLOCK_ACQUIRED((void*)&mu_, 1);

    ANNOTATE_RWLOCK_RELEASED((void*)&mu_, 1);
    CHECK(0 == pthread_spin_unlock(&mu_));

  pthread_spinlock_t mu_;
  SpinLock() : mu_(OS_SPINLOCK_INIT) {
    ANNOTATE_RWLOCK_CREATE((void*)&mu_);

    ANNOTATE_RWLOCK_DESTROY((void*)&mu_);

    OSSpinLockLock(&mu_);
    ANNOTATE_RWLOCK_ACQUIRED((void*)&mu_, 1);

    ANNOTATE_RWLOCK_RELEASED((void*)&mu_, 1);
    OSSpinLockUnlock(&mu_);
#endif  // NO_SPINLOCK
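
// Usage sketch (illustrative only, not part of the original header; it assumes
// the spin lock's acquire/release methods are named Lock() and Unlock(), as
// elsewhere in this file, and SpinLock is only available when NO_SPINLOCK is
// not defined):
//
//   SpinLock sl;
//   sl.Lock();
//   counter++;     // 'counter' is protected by sl
//   sl.Unlock();
//
// The ANNOTATE_RWLOCK_* calls above tell helgrind about the acquire/release
// pairs that it cannot derive from the spin-lock implementation itself.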
/// Just a boolean condition. Used by Mutex::LockWhen and similar.
template <typename T>
  typedef bool (*func_t)(void*);

  Condition(bool (*func)(T*), T* arg)
      : func1_(func), arg_(arg) {}

  Condition(bool (*func)())
      : func0_(func), arg_(NULL) {}

  bool Eval() const { return func1_ ? func1_(arg_) : func0_(); }
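
// Usage sketch (illustrative only, not part of the original header; the
// predicate and variable names are hypothetical):
//
//   static bool IsPositive(int *p) { return *p > 0; }
//   ...
//   int value = 0;
//   Condition<int> cond(&IsPositive, &value);
//   if (cond.Eval()) { ... }
//
// Mutex::LockWhen()/Await() below evaluate such a condition in a wait loop.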
/// Wrapper for pthread_mutex_t.
///
/// pthread_mutex_t is *not* a reader-writer lock,
/// so methods like ReaderLock() aren't really reader locks.
/// We cannot use pthread_rwlock_t because it
/// does not work with pthread_cond_t.
///
/// TODO: We still need to test reader locks with this class.
/// Implement a mode where pthread_rwlock_t will be used
/// instead of pthread_mutex_t (only when not used with CondVar or LockWhen).
  friend class CondVar;

    CHECK(0 == pthread_mutex_init(&mu_, NULL));
    CHECK(0 == pthread_cond_init(&cv_, NULL));
    signal_at_unlock_ = true;  // Always signal at Unlock to make
                               // Mutex more friendly to hybrid detectors.

    CHECK(0 == pthread_cond_destroy(&cv_));
    CHECK(0 == pthread_mutex_destroy(&mu_));

  void Lock() { CHECK(0 == pthread_mutex_lock(&mu_)); }
  bool TryLock() { return (0 == pthread_mutex_trylock(&mu_)); }

    if (signal_at_unlock_) {
      CHECK(0 == pthread_cond_signal(&cv_));
    }
    CHECK(0 == pthread_mutex_unlock(&mu_));

  void ReaderLock() { Lock(); }
  bool ReaderTryLock() { return TryLock(); }
  void ReaderUnlock() { Unlock(); }
  template <typename T>
  void LockWhen(const Condition<T>& cond)       { Lock(); WaitLoop(cond); }
  template <typename T>
  void ReaderLockWhen(const Condition<T>& cond) { Lock(); WaitLoop(cond); }
  template <typename T>
  void Await(const Condition<T>& cond)          { WaitLoop(cond); }

  template <typename T>
  bool ReaderLockWhenWithTimeout(const Condition<T>& cond, int millis)
    { Lock(); return WaitLoopWithTimeout(cond, millis); }
  template <typename T>
  bool LockWhenWithTimeout(const Condition<T>& cond, int millis)
    { Lock(); return WaitLoopWithTimeout(cond, millis); }
  template <typename T>
  bool AwaitWithTimeout(const Condition<T>& cond, int millis)
    { return WaitLoopWithTimeout(cond, millis); }
  template <typename T>
  void WaitLoop(const Condition<T>& cond) {
    signal_at_unlock_ = true;
    while (cond.Eval() == false) {
      pthread_cond_wait(&cv_, &mu_);
    }
    ANNOTATE_CONDVAR_LOCK_WAIT(&cv_, &mu_);
  }
  template <typename T>
  bool WaitLoopWithTimeout(const Condition<T>& cond, int millis) {
    struct timeval now;
    struct timespec timeout;
    int retcode = 0;
    gettimeofday(&now, NULL);
    timeval2timespec(&now, &timeout, millis);

    signal_at_unlock_ = true;
    while (cond.Eval() == false && retcode == 0) {
      retcode = pthread_cond_timedwait(&cv_, &mu_, &timeout);
    }
    ANNOTATE_CONDVAR_LOCK_WAIT(&cv_, &mu_);
  // A hack. cv_ should be the first data member so that
  // ANNOTATE_CONDVAR_WAIT(&MU, &MU) and ANNOTATE_CONDVAR_SIGNAL(&MU) work.
  // (See also racecheck_unittest.cc.)

  bool signal_at_unlock_;  // Set to true if Wait was called.
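
// Usage sketch (illustrative only, not part of the original header; the
// predicate and variable names are hypothetical):
//
//   static bool IsNonZero(int *p) { return *p != 0; }
//   Mutex mu;
//   int flag = 0;
//
//   // Waiting thread:
//   mu.LockWhen(Condition<int>(&IsNonZero, &flag));  // blocks until flag != 0
//   mu.Unlock();
//
//   // Signalling thread:
//   mu.Lock();
//   flag = 1;
//   mu.Unlock();   // Unlock() signals cv_, waking the waiting thread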
class MutexLock {  // Scoped Mutex Locker/Unlocker
/// Wrapper for pthread_cond_t.
  CondVar()  { CHECK(0 == pthread_cond_init(&cv_, NULL)); }
  ~CondVar() { CHECK(0 == pthread_cond_destroy(&cv_)); }
  void Wait(Mutex *mu) { CHECK(0 == pthread_cond_wait(&cv_, &mu->mu_)); }
  bool WaitWithTimeout(Mutex *mu, int millis) {
    struct timeval now;
    struct timespec timeout;
    gettimeofday(&now, NULL);
    timeval2timespec(&now, &timeout, millis);
    return 0 != pthread_cond_timedwait(&cv_, &mu->mu_, &timeout);
  }
  void Signal() { CHECK(0 == pthread_cond_signal(&cv_)); }
  void SignalAll() { CHECK(0 == pthread_cond_broadcast(&cv_)); }
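
// Usage sketch (illustrative only, not part of the original header; the
// variable names are hypothetical):
//
//   Mutex mu;
//   CondVar cv;
//   bool ready = false;
//
//   // Waiting thread:
//   mu.Lock();
//   while (!ready)
//     cv.Wait(&mu);       // atomically releases mu and waits on cv
//   mu.Unlock();
//
//   // Signalling thread:
//   mu.Lock();
//   ready = true;
//   cv.Signal();
//   mu.Unlock();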
// pthreads do not allow a condvar to be used with an rwlock, so we can't
// make the ReaderLock method of Mutex a real reader lock.
// So we need a special lock class to test reader locks.
#define NEEDS_SEPERATE_RW_LOCK
  RWLock()  { CHECK(0 == pthread_rwlock_init(&mu_, NULL)); }
  ~RWLock() { CHECK(0 == pthread_rwlock_destroy(&mu_)); }
  void Lock()         { CHECK(0 == pthread_rwlock_wrlock(&mu_)); }
  void ReaderLock()   { CHECK(0 == pthread_rwlock_rdlock(&mu_)); }
  void Unlock()       { CHECK(0 == pthread_rwlock_unlock(&mu_)); }
  void ReaderUnlock() { CHECK(0 == pthread_rwlock_unlock(&mu_)); }

  pthread_cond_t dummy;  // Damn, this requires some redesign...
  pthread_rwlock_t mu_;
class ReaderLockScoped {  // Scoped RWLock Locker/Unlocker
  ReaderLockScoped(RWLock *mu)

  ~ReaderLockScoped() {

class WriterLockScoped {  // Scoped RWLock Locker/Unlocker
  WriterLockScoped(RWLock *mu)

  ~WriterLockScoped() {
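
// Usage sketch (illustrative only, not part of the original header; the
// variable and function names are hypothetical):
//
//   RWLock lock;
//
//   {  // Exclusive (writer) section:
//     WriterLockScoped w(&lock);
//     shared_value++;
//   }
//   {  // Shared (reader) section:
//     ReaderLockScoped r(&lock);
//     Read(shared_value);
//   }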
/// Wrapper for pthread_create()/pthread_join().
  MyThread(void* (*worker)(void *), void *arg = NULL, const char *name = NULL)
      : wpvpv_(worker), wvv_(), wvpv_(), arg_(arg), name_(name) {}
  MyThread(void (*worker)(void), void *arg = NULL, const char *name = NULL)
      : wpvpv_(), wvv_(worker), wvpv_(), arg_(arg), name_(name) {}
  MyThread(void (*worker)(void *), void *arg = NULL, const char *name = NULL)
      : wpvpv_(), wvv_(), wvpv_(worker), arg_(arg), name_(name) {}

  void Start() { CHECK(0 == pthread_create(&t_, NULL, ThreadBody, this)); }
  void Join()  { CHECK(0 == pthread_join(t_, NULL)); }
  pthread_t tid() const { return t_; }
  static void *ThreadBody(void *arg) {
    MyThread *my_thread = reinterpret_cast<MyThread*>(arg);
    if (my_thread->name_) {
      ANNOTATE_THREAD_NAME(my_thread->name_);
    }
    if (my_thread->wpvpv_)
      return my_thread->wpvpv_(my_thread->arg_);
    if (my_thread->wvpv_)
      my_thread->wvpv_(my_thread->arg_);

  void *(*wpvpv_)(void*);
  void (*wvpv_)(void*);
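
// Usage sketch (illustrative only, not part of the original header; the worker
// function and argument names are hypothetical):
//
//   static void *MyWorker(void *arg) { ...; return NULL; }
//   ...
//   MyThread t(MyWorker, some_arg, "worker-1");
//   t.Start();
//   ...
//   t.Join();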
/// Just a message queue.
class ProducerConsumerQueue {
  ProducerConsumerQueue(int unused) {
    //ANNOTATE_PCQ_CREATE(this);
  ~ProducerConsumerQueue() {
    //ANNOTATE_PCQ_DESTROY(this);

  void Put(void *item) {
    ANNOTATE_CONDVAR_SIGNAL(&mu_);  // LockWhen in Get()
    //ANNOTATE_PCQ_PUT(this);

  // Blocks if the queue is empty.
    mu_.LockWhen(Condition<typeof(q_)>(IsQueueNotEmpty, &q_));
    bool ok = TryGetInternal(&item);
  // If the queue is not empty, remove an element from the queue,
  // put it into *res, and return true. Otherwise return false.
  bool TryGet(void **res) {
    bool ok = TryGetInternal(res);

  std::queue<void*> q_;  // protected by mu_

  bool TryGetInternal(void **item_ptr) {
      *item_ptr = q_.front();
    //ANNOTATE_PCQ_GET(this);

  static bool IsQueueNotEmpty(std::queue<void*> *queue) {
    return !queue->empty();
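
// Usage sketch (illustrative only, not part of the original header):
//
//   ProducerConsumerQueue q(0 /*unused*/);
//
//   // Producer thread:
//   q.Put(new int(42));
//
//   // Consumer thread (Get() blocks until an item is available):
//   int *item = reinterpret_cast<int*>(q.Get());
//   delete item;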
/// Function pointer with zero, one or two parameters.
  typedef void (*F0)();
  typedef void (*F1)(void *arg1);
  typedef void (*F2)(void *arg1, void *arg2);

    } else if (n_params == 1) {
      (F1(f))(param1);
    } else {
      CHECK(n_params == 2);
      (F2(f))(param1, param2);
    }
Closure *NewCallback(void (*f)()) {
  Closure *res = new Closure;

Closure *NewCallback(void (*f)(P1), P1 p1) {
  CHECK(sizeof(P1) <= sizeof(void*));
  Closure *res = new Closure;
  res->param1 = (void*)p1;

template <class T, class P1, class P2>
Closure *NewCallback(void (*f)(P1, P2), P1 p1, P2 p2) {
  CHECK(sizeof(P1) <= sizeof(void*));
  Closure *res = new Closure;
  res->param1 = (void*)p1;
  res->param2 = (void*)p2;
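
// Note (added for clarity): the CHECK(sizeof(P1) <= sizeof(void*)) calls above
// exist because the bound parameters are smuggled through the Closure's untyped
// void* slots (param1, param2) and cast back when the closure is run, so only
// pointer-sized arguments are supported. For example (taken from the ThreadPool
// usage comment below):
//
//   pool.Add(NewCallback(func_with_one_arg, arg));  // 'arg' must fit in a void*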
/*! A thread pool that uses ProducerConsumerQueue.
  Usage:
  {
    ThreadPool pool(n_workers);
    pool.StartWorkers();
    pool.Add(NewCallback(func_with_no_args));
    pool.Add(NewCallback(func_with_one_arg, arg));
    pool.Add(NewCallback(func_with_two_args, arg1, arg2));
    ...  // more calls to pool.Add()

    // the ~ThreadPool() is called: we wait for workers to finish
    // and then join all threads in the pool.
  }
*/
  //! Create n_threads threads, but do not start them.
  explicit ThreadPool(int n_threads)
    for (int i = 0; i < n_threads; i++) {
      MyThread *thread = new MyThread(&ThreadPool::Worker, this);
      workers_.push_back(thread);
  //! Start all threads.
  void StartWorkers() {
    for (size_t i = 0; i < workers_.size(); i++) {
      workers_[i]->Start();

  void Add(Closure *closure) {

  int num_threads() { return workers_.size(); }
  //! Wait for workers to finish, then join all threads.
    for (size_t i = 0; i < workers_.size(); i++) {

    for (size_t i = 0; i < workers_.size(); i++) {

  std::vector<MyThread*> workers_;
  ProducerConsumerQueue queue_;
  static void *Worker(void *p) {
    ThreadPool *pool = reinterpret_cast<ThreadPool*>(p);
      Closure *closure = reinterpret_cast<Closure*>(pool->queue_.Get());
      if (closure == NULL) {
/// Wrapper for pthread_barrier_t.
  explicit Barrier(int n_threads) { CHECK(0 == pthread_barrier_init(&b_, 0, n_threads)); }
  ~Barrier() { CHECK(0 == pthread_barrier_destroy(&b_)); }
    // helgrind 3.3.0 does not have an interceptor for barrier,
    // but our current local version does.
    // ANNOTATE_CONDVAR_SIGNAL(this);
    pthread_barrier_wait(&b_);
    // ANNOTATE_CONDVAR_WAIT(this, this);

  pthread_barrier_t b_;
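
// Usage sketch (illustrative only, not part of the original header; the name of
// the waiting method is not shown above, so Block() here is an assumption):
//
//   Barrier barrier(n_threads);
//   ...                  // in each of the n_threads threads:
//   barrier.Block();     // all threads rendezvous here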
class BlockingCounter {
  explicit BlockingCounter(int initial_count) :
    count_(initial_count) {}
  bool DecrementCount() {
    MutexLock lock(&mu_);

    mu_.LockWhen(Condition<int>(&IsZero, &count_));

  static bool IsZero(int *arg) { return *arg == 0; }
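
// Usage sketch (illustrative only, not part of the original header; the name of
// the waiting method is not shown above, so Wait() here is an assumption):
//
//   BlockingCounter done(n_workers);
//   ...                   // each worker calls done.DecrementCount() when finished
//   done.Wait();          // blocks until the count reaches zero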
int AtomicIncrement(volatile int *value, int increment);

inline int AtomicIncrement(volatile int *value, int increment) {
  return __sync_add_and_fetch(value, increment);

inline int AtomicIncrement(volatile int *value, int increment) {
  return OSAtomicAdd32(increment, value);
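
// Usage sketch (illustrative only, not part of the original header): both
// implementations atomically add 'increment' and return the new value.
//
//   static volatile int counter = 0;
//   int new_count = AtomicIncrement(&counter, 1);  // returns counter's new value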
// TODO(timurrrr) this is a hack
#define memalign(A,B) malloc(B)

// TODO(timurrrr) this is a hack
int posix_memalign(void **out, size_t al, size_t size) {
  *out = memalign(al, size);
#endif  // THREAD_WRAPPERS_PTHREAD_H
// vim:shiftwidth=2:softtabstop=2:expandtab:foldmethod=marker