Merge pull request #2309 from mitza-oci/warnings
[ACE_TAO.git] / ACE / ace / Token.cpp
blobb8312287a85d299c0905d433c24979088e97e645
1 #include "ace/Token.h"
3 #if !defined (__ACE_INLINE__)
4 # include "ace/Token.inl"
5 #endif /* __ACE_INLINE__ */
7 #if defined (ACE_HAS_THREADS)
9 #include "ace/Thread.h"
10 #include "ace/Log_Category.h"
12 #if defined (ACE_TOKEN_DEBUGGING)
13 // FUZZ: disable check_for_streams_include
14 #include "ace/streams.h"
15 #endif /* ACE_TOKEN_DEBUGGING */
17 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
19 ACE_ALLOC_HOOK_DEFINE(ACE_Token)
21 void
22 ACE_Token::dump () const
24 #if defined (ACE_HAS_DUMP)
25 ACE_TRACE ("ACE_Token::dump");
27 ACELIB_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
29 ACELIB_DEBUG ((LM_DEBUG, ACE_TEXT ("\nthread = %d"), ACE_Thread::self ()));
30 // @@ Is there a portable way to do this?
31 // ACELIB_DEBUG ((LM_DEBUG, "\nowner_ = %d", (long) this->owner_));
32 ACELIB_DEBUG ((LM_DEBUG, ACE_TEXT ("\nowner_ addr = %x"), &this->owner_));
33 ACELIB_DEBUG ((LM_DEBUG, ACE_TEXT ("\nwaiters_ = %d"), this->waiters_));
34 ACELIB_DEBUG ((LM_DEBUG, ACE_TEXT ("\nin_use_ = %d"), this->in_use_));
35 ACELIB_DEBUG ((LM_DEBUG, ACE_TEXT ("\nnesting level = %d"), this->nesting_level_));
36 ACELIB_DEBUG ((LM_DEBUG, ACE_END_DUMP));
37 #endif /* ACE_HAS_DUMP */
40 ACE_Token::ACE_Token_Queue_Entry::ACE_Token_Queue_Entry (ACE_Thread_Mutex &m,
41 ACE_thread_t t_id)
42 : next_ (0),
43 thread_id_ (t_id),
44 #if defined (ACE_TOKEN_USES_SEMAPHORE)
45 cv_ (0),
46 #else
47 cv_ (m),
48 #endif /* ACE_TOKEN_USES_SEMAPHORE */
49 runable_ (0)
51 #if defined (ACE_TOKEN_USES_SEMAPHORE)
52 ACE_UNUSED_ARG (m);
53 #endif /* ACE_TOKEN_USES_SEMAPHORE */
55 ACE_TRACE ("ACE_Token::ACE_Token_Queue_Entry::ACE_Token_Queue_Entry");
58 ACE_Token::ACE_Token_Queue_Entry::ACE_Token_Queue_Entry (ACE_Thread_Mutex &m,
59 ACE_thread_t t_id,
60 ACE_Condition_Attributes &attributes)
61 : next_ (0),
62 thread_id_ (t_id),
63 #if defined (ACE_TOKEN_USES_SEMAPHORE)
64 cv_ (0),
65 #else
66 cv_ (m, attributes),
67 #endif /* ACE_TOKEN_USES_SEMAPHORE */
68 runable_ (0)
70 #if defined (ACE_TOKEN_USES_SEMAPHORE)
71 ACE_UNUSED_ARG (m);
72 ACE_UNUSED_ARG (attributes);
73 #endif /* ACE_TOKEN_USES_SEMAPHORE */
75 ACE_TRACE ("ACE_Token::ACE_Token_Queue_Entry::ACE_Token_Queue_Entry");
78 ACE_Token::ACE_Token_Queue::ACE_Token_Queue ()
79 : head_ (0),
80 tail_ (0)
82 ACE_TRACE ("ACE_Token::ACE_Token_Queue::ACE_Token_Queue");
86 // Remove an entry from the list. Must be called with locks held.
88 void
89 ACE_Token::ACE_Token_Queue::remove_entry (ACE_Token::ACE_Token_Queue_Entry *entry)
91 ACE_TRACE ("ACE_Token::ACE_Token_Queue::remove_entry");
92 ACE_Token_Queue_Entry *curr = 0;
93 ACE_Token_Queue_Entry *prev = 0;
95 if (this->head_ == 0)
96 return;
98 for (curr = this->head_;
99 curr != 0 && curr != entry;
100 curr = curr->next_)
101 prev = curr;
103 if (curr == 0)
104 // Didn't find the entry...
105 return;
106 else if (prev == 0)
107 // Delete at the head.
108 this->head_ = this->head_->next_;
109 else
110 // Delete in the middle.
111 prev->next_ = curr->next_;
113 // We need to update the tail of the list if we've deleted the last
114 // entry.
115 if (curr->next_ == 0)
116 this->tail_ = prev;
120 // Add an entry into the list. Must be called with locks held.
122 void
123 ACE_Token::ACE_Token_Queue::insert_entry (ACE_Token::ACE_Token_Queue_Entry &entry,
124 int requeue_position)
126 if (this->head_ == 0)
128 // No other threads - just add me
129 this->head_ = &entry;
130 this->tail_ = &entry;
132 else if (requeue_position == -1)
134 // Insert at the end of the queue.
135 this->tail_->next_ = &entry;
136 this->tail_ = &entry;
138 else if (requeue_position == 0)
140 // Insert at head of queue.
141 entry.next_ = this->head_;
142 this->head_ = &entry;
144 else
145 // Insert in the middle of the queue somewhere.
147 // Determine where our thread should go in the queue of waiters.
149 ACE_Token::ACE_Token_Queue_Entry *insert_after = this->head_;
150 while (requeue_position-- && insert_after->next_ != 0)
151 insert_after = insert_after->next_;
153 entry.next_ = insert_after->next_;
155 if (entry.next_ == 0)
156 this->tail_ = &entry;
158 insert_after->next_ = &entry;
162 ACE_Token::ACE_Token (const ACE_TCHAR *name, void *any)
163 : lock_ (name, (ACE_mutexattr_t *) any),
164 owner_ (ACE_OS::NULL_thread),
165 in_use_ (0),
166 waiters_ (0),
167 nesting_level_ (0),
168 attributes_ (USYNC_THREAD),
169 queueing_strategy_ (FIFO)
171 // ACE_TRACE ("ACE_Token::ACE_Token");
174 ACE_Token::~ACE_Token ()
176 ACE_TRACE ("ACE_Token::~ACE_Token");
180 ACE_Token::shared_acquire (void (*sleep_hook_func)(void *),
181 void *arg,
182 ACE_Time_Value *timeout,
183 ACE_Token_Op_Type op_type)
185 ACE_TRACE ("ACE_Token::shared_acquire");
186 ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
188 #if defined (ACE_TOKEN_DEBUGGING)
189 this->dump ();
190 #endif /* ACE_TOKEN_DEBUGGING */
192 ACE_thread_t const thr_id = ACE_Thread::self ();
194 // Nobody holds the token.
195 if (!this->in_use_)
197 // Its mine!
198 this->in_use_ = op_type;
199 this->owner_ = thr_id;
200 return 0;
203 // Someone already holds the token.
205 // Check if it is us.
206 if (ACE_OS::thr_equal (thr_id, this->owner_))
208 ++this->nesting_level_;
209 return 0;
212 // Do a quick check for "polling" behavior.
213 if (timeout != 0 && *timeout == ACE_Time_Value::zero)
215 errno = ETIME;
216 return -1;
220 // We've got to sleep until we get the token.
223 // Which queue we should end up in...
224 ACE_Token_Queue *queue = (op_type == ACE_Token::READ_TOKEN
225 ? &this->readers_
226 : &this->writers_);
228 // Allocate queue entry on stack. This works since we don't exit
229 // this method's activation record until we've got the token.
230 ACE_Token::ACE_Token_Queue_Entry my_entry (this->lock_,
231 thr_id,
232 this->attributes_);
233 queue->insert_entry (my_entry, this->queueing_strategy_);
234 ++this->waiters_;
236 // Execute appropriate <sleep_hook> callback. (@@ should these
237 // methods return a success/failure status, and if so, what should
238 // we do with it?)
239 int ret = 0;
240 if (sleep_hook_func)
242 (*sleep_hook_func) (arg);
243 ++ret;
245 else
247 // Execute virtual method.
248 this->sleep_hook ();
249 ++ret;
252 bool timed_out = false;
253 bool error = false;
255 // Sleep until we've got the token (ignore signals).
258 int const result = my_entry.wait (timeout, this->lock_);
260 if (result == -1)
262 // Note, this should obey whatever thread-specific interrupt
263 // policy is currently in place...
264 if (errno == EINTR)
265 continue;
267 #if defined (ACE_TOKEN_DEBUGGING)
268 cerr << '(' << ACE_Thread::self () << ')'
269 << " acquire: "
270 << (errno == ETIME ? "timed out" : "error occurred")
271 << endl;
272 #endif /* ACE_TOKEN_DEBUGGING */
274 // We come here if a timeout occurs or some serious
275 // ACE_Condition object error.
276 if (errno == ETIME)
277 timed_out = true;
278 else
279 error = true;
281 // Stop the loop.
282 break;
285 while (!ACE_OS::thr_equal (thr_id, this->owner_));
287 // Do this always and irrespective of the result of wait().
288 --this->waiters_;
289 queue->remove_entry (&my_entry);
291 #if defined (ACE_TOKEN_DEBUGGING)
292 ACELIB_DEBUG ((LM_DEBUG, "(%t) ACE_Token::shared_acquire (UNBLOCKED)\n"));
293 #endif /* ACE_TOKEN_DEBUGGING */
295 // If timeout occurred
296 if (timed_out)
298 // This thread was still selected to own the token.
299 if (my_entry.runable_)
301 // Wakeup next waiter since this thread timed out.
302 this->wakeup_next_waiter ();
305 // Return error.
306 return -1;
308 else if (error)
310 // Return error.
311 return -1;
314 // If this is a normal wakeup, this thread should be runnable.
315 ACE_ASSERT (my_entry.runable_);
317 return ret;
320 // By default this is a no-op.
322 /* virtual */
323 void
324 ACE_Token::sleep_hook ()
326 ACE_TRACE ("ACE_Token::sleep_hook");
330 ACE_Token::acquire (ACE_Time_Value *timeout)
332 ACE_TRACE ("ACE_Token::acquire");
333 return this->shared_acquire (0, 0, timeout, ACE_Token::WRITE_TOKEN);
336 // Acquire the token, sleeping until it is obtained or until <timeout>
337 // expires.
340 ACE_Token::acquire (void (*sleep_hook_func)(void *),
341 void *arg,
342 ACE_Time_Value *timeout)
344 ACE_TRACE ("ACE_Token::acquire");
345 return this->shared_acquire (sleep_hook_func, arg, timeout, ACE_Token::WRITE_TOKEN);
348 // Try to renew the token.
351 ACE_Token::renew (int requeue_position,
352 ACE_Time_Value *timeout)
354 ACE_TRACE ("ACE_Token::renew");
355 ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
357 #if defined (ACE_TOKEN_DEBUGGING)
358 this->dump ();
359 #endif /* ACE_TOKEN_DEBUGGING */
360 // ACE_ASSERT (ACE_OS::thr_equal (ACE_Thread::self (), this->owner_));
362 // Check to see if there are any waiters worth giving up the lock
363 // for.
365 // If no writers and either we are a writer or there are no readers.
366 if (this->writers_.head_ == 0 &&
367 (this->in_use_ == ACE_Token::WRITE_TOKEN ||
368 this->readers_.head_ == 0))
369 // Immediate return.
370 return 0;
372 // We've got to sleep until we get the token again.
374 // Determine which queue should this thread go to.
375 ACE_Token::ACE_Token_Queue *this_threads_queue =
376 this->in_use_ == ACE_Token::READ_TOKEN ?
377 &this->readers_ : &this->writers_;
379 ACE_Token::ACE_Token_Queue_Entry my_entry (this->lock_,
380 this->owner_);
382 this_threads_queue->insert_entry (my_entry,
383 // if requeue_position == 0 then we want to go next,
384 // otherwise use the queueing strategy, which might also
385 // happen to be 0.
386 requeue_position == 0 ? 0 : this->queueing_strategy_);
387 ++this->waiters_;
389 // Remember nesting level...
390 int const save_nesting_level_ = this->nesting_level_;
392 // Reset state for new owner.
393 this->nesting_level_ = 0;
395 // Wakeup waiter.
396 this->wakeup_next_waiter ();
398 bool timed_out = false;
399 bool error = false;
401 // Sleep until we've got the token (ignore signals).
404 int const result = my_entry.wait (timeout, this->lock_);
406 if (result == -1)
408 // Note, this should obey whatever thread-specific interrupt
409 // policy is currently in place...
410 if (errno == EINTR)
411 continue;
413 #if defined (ACE_TOKEN_DEBUGGING)
414 cerr << '(' << ACE_Thread::self () << ')'
415 << " renew: "
416 << (errno == ETIME ? "timed out" : "error occurred")
417 << endl;
418 #endif /* ACE_TOKEN_DEBUGGING */
420 // We come here if a timeout occurs or some serious
421 // ACE_Condition object error.
422 if (errno == ETIME)
423 timed_out = true;
424 else
425 error = true;
427 // Stop the loop.
428 break;
431 while (!ACE_OS::thr_equal (my_entry.thread_id_, this->owner_));
433 // Do this always and irrespective of the result of wait().
434 --this->waiters_;
435 this_threads_queue->remove_entry (&my_entry);
437 #if defined (ACE_TOKEN_DEBUGGING)
438 ACELIB_DEBUG ((LM_DEBUG, "(%t) ACE_Token::renew (UNBLOCKED)\n"));
439 #endif /* ACE_TOKEN_DEBUGGING */
441 // If timeout occurred
442 if (timed_out)
444 // This thread was still selected to own the token.
445 if (my_entry.runable_)
447 // Wakeup next waiter since this thread timed out.
448 this->wakeup_next_waiter ();
451 // Return error.
452 return -1;
454 else if (error)
456 // Return error.
457 return -1;
460 // If this is a normal wakeup, this thread should be runnable.
461 ACE_ASSERT (my_entry.runable_);
463 // Reinstate nesting level.
464 this->nesting_level_ = save_nesting_level_;
466 return 0;
469 // Release the current holder of the token (which had
470 // better be the caller's thread!).
473 ACE_Token::release ()
475 ACE_TRACE ("ACE_Token::release");
476 ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
478 #if defined (ACE_TOKEN_DEBUGGING)
479 this->dump ();
480 #endif /* ACE_TOKEN_DEBUGGING */
482 // Nested release...
483 if (this->nesting_level_ > 0)
484 --this->nesting_level_;
485 else
488 // Regular release...
491 // Wakeup waiter.
492 this->wakeup_next_waiter ();
495 return 0;
498 void
499 ACE_Token::wakeup_next_waiter ()
501 ACE_TRACE ("ACE_Token::wakeup_next_waiter");
503 // Reset state for new owner.
504 this->owner_ = ACE_OS::NULL_thread;
505 this->in_use_ = 0;
507 // Any waiters...
508 if (this->writers_.head_ == 0 &&
509 this->readers_.head_ == 0)
511 // No more waiters...
512 return;
515 // Wakeup next waiter.
516 ACE_Token_Queue *queue = 0;
518 // Writer threads get priority to run first.
519 if (this->writers_.head_ != 0)
521 this->in_use_ = ACE_Token::WRITE_TOKEN;
522 queue = &this->writers_;
524 else
526 this->in_use_ = ACE_Token::READ_TOKEN;
527 queue = &this->readers_;
530 // Wake up waiter and make it runable.
531 queue->head_->runable_ = 1;
532 queue->head_->signal ();
533 this->owner_ = queue->head_->thread_id_;
536 ACE_END_VERSIONED_NAMESPACE_DECL
538 #endif /* ACE_HAS_THREADS */