Also use Objects as part of an operation but as a result don't generate Any operation...
[ACE_TAO.git] / ACE / ace / Token.cpp
blob86acaf07dd5db00252be4bd5016d6e0c22435764
1 #include "ace/Token.h"
3 #if !defined (__ACE_INLINE__)
4 # include "ace/Token.inl"
5 #endif /* __ACE_INLINE__ */
9 #if defined (ACE_HAS_THREADS)
11 #include "ace/Thread.h"
12 #include "ace/Log_Category.h"
14 #if defined (ACE_TOKEN_DEBUGGING)
15 // FUZZ: disable check_for_streams_include
16 #include "ace/streams.h"
17 #endif /* ACE_TOKEN_DEBUGGING */
19 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
21 ACE_ALLOC_HOOK_DEFINE(ACE_Token)
23 void
24 ACE_Token::dump (void) const
26 #if defined (ACE_HAS_DUMP)
27 ACE_TRACE ("ACE_Token::dump");
29 ACELIB_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
31 ACELIB_DEBUG ((LM_DEBUG, ACE_TEXT ("\nthread = %d"), ACE_Thread::self ()));
32 // @@ Is there a portable way to do this?
33 // ACELIB_DEBUG ((LM_DEBUG, "\nowner_ = %d", (long) this->owner_));
34 ACELIB_DEBUG ((LM_DEBUG, ACE_TEXT ("\nowner_ addr = %x"), &this->owner_));
35 ACELIB_DEBUG ((LM_DEBUG, ACE_TEXT ("\nwaiters_ = %d"), this->waiters_));
36 ACELIB_DEBUG ((LM_DEBUG, ACE_TEXT ("\nin_use_ = %d"), this->in_use_));
37 ACELIB_DEBUG ((LM_DEBUG, ACE_TEXT ("\nnesting level = %d"), this->nesting_level_));
38 ACELIB_DEBUG ((LM_DEBUG, ACE_END_DUMP));
39 #endif /* ACE_HAS_DUMP */
42 ACE_Token::ACE_Token_Queue_Entry::ACE_Token_Queue_Entry (ACE_Thread_Mutex &m,
43 ACE_thread_t t_id)
44 : next_ (0),
45 thread_id_ (t_id),
46 #if defined (ACE_TOKEN_USES_SEMAPHORE)
47 cv_ (0),
48 #else
49 cv_ (m),
50 #endif /* ACE_TOKEN_USES_SEMAPHORE */
51 runable_ (0)
53 #if defined (ACE_TOKEN_USES_SEMAPHORE)
54 ACE_UNUSED_ARG (m);
55 #endif /* ACE_TOKEN_USES_SEMAPHORE */
57 ACE_TRACE ("ACE_Token::ACE_Token_Queue_Entry::ACE_Token_Queue_Entry");
60 ACE_Token::ACE_Token_Queue_Entry::ACE_Token_Queue_Entry (ACE_Thread_Mutex &m,
61 ACE_thread_t t_id,
62 ACE_Condition_Attributes &attributes)
63 : next_ (0),
64 thread_id_ (t_id),
65 #if defined (ACE_TOKEN_USES_SEMAPHORE)
66 cv_ (0),
67 #else
68 cv_ (m, attributes),
69 #endif /* ACE_TOKEN_USES_SEMAPHORE */
70 runable_ (0)
72 #if defined (ACE_TOKEN_USES_SEMAPHORE)
73 ACE_UNUSED_ARG (m);
74 ACE_UNUSED_ARG (attributes);
75 #endif /* ACE_TOKEN_USES_SEMAPHORE */
77 ACE_TRACE ("ACE_Token::ACE_Token_Queue_Entry::ACE_Token_Queue_Entry");
80 ACE_Token::ACE_Token_Queue::ACE_Token_Queue (void)
81 : head_ (0),
82 tail_ (0)
84 ACE_TRACE ("ACE_Token::ACE_Token_Queue::ACE_Token_Queue");
88 // Remove an entry from the list. Must be called with locks held.
90 void
91 ACE_Token::ACE_Token_Queue::remove_entry (ACE_Token::ACE_Token_Queue_Entry *entry)
93 ACE_TRACE ("ACE_Token::ACE_Token_Queue::remove_entry");
94 ACE_Token_Queue_Entry *curr = 0;
95 ACE_Token_Queue_Entry *prev = 0;
97 if (this->head_ == 0)
98 return;
100 for (curr = this->head_;
101 curr != 0 && curr != entry;
102 curr = curr->next_)
103 prev = curr;
105 if (curr == 0)
106 // Didn't find the entry...
107 return;
108 else if (prev == 0)
109 // Delete at the head.
110 this->head_ = this->head_->next_;
111 else
112 // Delete in the middle.
113 prev->next_ = curr->next_;
115 // We need to update the tail of the list if we've deleted the last
116 // entry.
117 if (curr->next_ == 0)
118 this->tail_ = prev;
122 // Add an entry into the list. Must be called with locks held.
124 void
125 ACE_Token::ACE_Token_Queue::insert_entry (ACE_Token::ACE_Token_Queue_Entry &entry,
126 int requeue_position)
128 if (this->head_ == 0)
130 // No other threads - just add me
131 this->head_ = &entry;
132 this->tail_ = &entry;
134 else if (requeue_position == -1)
136 // Insert at the end of the queue.
137 this->tail_->next_ = &entry;
138 this->tail_ = &entry;
140 else if (requeue_position == 0)
142 // Insert at head of queue.
143 entry.next_ = this->head_;
144 this->head_ = &entry;
146 else
147 // Insert in the middle of the queue somewhere.
149 // Determine where our thread should go in the queue of waiters.
151 ACE_Token::ACE_Token_Queue_Entry *insert_after = this->head_;
152 while (requeue_position-- && insert_after->next_ != 0)
153 insert_after = insert_after->next_;
155 entry.next_ = insert_after->next_;
157 if (entry.next_ == 0)
158 this->tail_ = &entry;
160 insert_after->next_ = &entry;
164 ACE_Token::ACE_Token (const ACE_TCHAR *name, void *any)
165 : lock_ (name, (ACE_mutexattr_t *) any),
166 owner_ (ACE_OS::NULL_thread),
167 in_use_ (0),
168 waiters_ (0),
169 nesting_level_ (0),
170 attributes_ (USYNC_THREAD),
171 queueing_strategy_ (FIFO)
173 // ACE_TRACE ("ACE_Token::ACE_Token");
176 ACE_Token::~ACE_Token (void)
178 ACE_TRACE ("ACE_Token::~ACE_Token");
182 ACE_Token::shared_acquire (void (*sleep_hook_func)(void *),
183 void *arg,
184 ACE_Time_Value *timeout,
185 ACE_Token_Op_Type op_type)
187 ACE_TRACE ("ACE_Token::shared_acquire");
188 ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
190 #if defined (ACE_TOKEN_DEBUGGING)
191 this->dump ();
192 #endif /* ACE_TOKEN_DEBUGGING */
194 ACE_thread_t const thr_id = ACE_Thread::self ();
196 // Nobody holds the token.
197 if (!this->in_use_)
199 // Its mine!
200 this->in_use_ = op_type;
201 this->owner_ = thr_id;
202 return 0;
205 // Someone already holds the token.
207 // Check if it is us.
208 if (ACE_OS::thr_equal (thr_id, this->owner_))
210 ++this->nesting_level_;
211 return 0;
214 // Do a quick check for "polling" behavior.
215 if (timeout != 0 && *timeout == ACE_Time_Value::zero)
217 errno = ETIME;
218 return -1;
222 // We've got to sleep until we get the token.
225 // Which queue we should end up in...
226 ACE_Token_Queue *queue = (op_type == ACE_Token::READ_TOKEN
227 ? &this->readers_
228 : &this->writers_);
230 // Allocate queue entry on stack. This works since we don't exit
231 // this method's activation record until we've got the token.
232 ACE_Token::ACE_Token_Queue_Entry my_entry (this->lock_,
233 thr_id,
234 this->attributes_);
235 queue->insert_entry (my_entry, this->queueing_strategy_);
236 ++this->waiters_;
238 // Execute appropriate <sleep_hook> callback. (@@ should these
239 // methods return a success/failure status, and if so, what should
240 // we do with it?)
241 int ret = 0;
242 if (sleep_hook_func)
244 (*sleep_hook_func) (arg);
245 ++ret;
247 else
249 // Execute virtual method.
250 this->sleep_hook ();
251 ++ret;
254 bool timed_out = false;
255 bool error = false;
257 // Sleep until we've got the token (ignore signals).
260 int const result = my_entry.wait (timeout, this->lock_);
262 if (result == -1)
264 // Note, this should obey whatever thread-specific interrupt
265 // policy is currently in place...
266 if (errno == EINTR)
267 continue;
269 #if defined (ACE_TOKEN_DEBUGGING)
270 cerr << '(' << ACE_Thread::self () << ')'
271 << " acquire: "
272 << (errno == ETIME ? "timed out" : "error occurred")
273 << endl;
274 #endif /* ACE_TOKEN_DEBUGGING */
276 // We come here if a timeout occurs or some serious
277 // ACE_Condition object error.
278 if (errno == ETIME)
279 timed_out = true;
280 else
281 error = true;
283 // Stop the loop.
284 break;
287 while (!ACE_OS::thr_equal (thr_id, this->owner_));
289 // Do this always and irrespective of the result of wait().
290 --this->waiters_;
291 queue->remove_entry (&my_entry);
293 #if defined (ACE_TOKEN_DEBUGGING)
294 ACELIB_DEBUG ((LM_DEBUG, "(%t) ACE_Token::shared_acquire (UNBLOCKED)\n"));
295 #endif /* ACE_TOKEN_DEBUGGING */
297 // If timeout occurred
298 if (timed_out)
300 // This thread was still selected to own the token.
301 if (my_entry.runable_)
303 // Wakeup next waiter since this thread timed out.
304 this->wakeup_next_waiter ();
307 // Return error.
308 return -1;
310 else if (error)
312 // Return error.
313 return -1;
316 // If this is a normal wakeup, this thread should be runnable.
317 ACE_ASSERT (my_entry.runable_);
319 return ret;
322 // By default this is a no-op.
324 /* virtual */
325 void
326 ACE_Token::sleep_hook (void)
328 ACE_TRACE ("ACE_Token::sleep_hook");
332 ACE_Token::acquire (ACE_Time_Value *timeout)
334 ACE_TRACE ("ACE_Token::acquire");
335 return this->shared_acquire (0, 0, timeout, ACE_Token::WRITE_TOKEN);
338 // Acquire the token, sleeping until it is obtained or until <timeout>
339 // expires.
342 ACE_Token::acquire (void (*sleep_hook_func)(void *),
343 void *arg,
344 ACE_Time_Value *timeout)
346 ACE_TRACE ("ACE_Token::acquire");
347 return this->shared_acquire (sleep_hook_func, arg, timeout, ACE_Token::WRITE_TOKEN);
350 // Try to renew the token.
353 ACE_Token::renew (int requeue_position,
354 ACE_Time_Value *timeout)
356 ACE_TRACE ("ACE_Token::renew");
357 ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
359 #if defined (ACE_TOKEN_DEBUGGING)
360 this->dump ();
361 #endif /* ACE_TOKEN_DEBUGGING */
362 // ACE_ASSERT (ACE_OS::thr_equal (ACE_Thread::self (), this->owner_));
364 // Check to see if there are any waiters worth giving up the lock
365 // for.
367 // If no writers and either we are a writer or there are no readers.
368 if (this->writers_.head_ == 0 &&
369 (this->in_use_ == ACE_Token::WRITE_TOKEN ||
370 this->readers_.head_ == 0))
371 // Immediate return.
372 return 0;
374 // We've got to sleep until we get the token again.
376 // Determine which queue should this thread go to.
377 ACE_Token::ACE_Token_Queue *this_threads_queue =
378 this->in_use_ == ACE_Token::READ_TOKEN ?
379 &this->readers_ : &this->writers_;
381 ACE_Token::ACE_Token_Queue_Entry my_entry (this->lock_,
382 this->owner_);
384 this_threads_queue->insert_entry (my_entry,
385 // if requeue_position == 0 then we want to go next,
386 // otherwise use the queueing strategy, which might also
387 // happen to be 0.
388 requeue_position == 0 ? 0 : this->queueing_strategy_);
389 ++this->waiters_;
391 // Remember nesting level...
392 int const save_nesting_level_ = this->nesting_level_;
394 // Reset state for new owner.
395 this->nesting_level_ = 0;
397 // Wakeup waiter.
398 this->wakeup_next_waiter ();
400 bool timed_out = false;
401 bool error = false;
403 // Sleep until we've got the token (ignore signals).
406 int const result = my_entry.wait (timeout, this->lock_);
408 if (result == -1)
410 // Note, this should obey whatever thread-specific interrupt
411 // policy is currently in place...
412 if (errno == EINTR)
413 continue;
415 #if defined (ACE_TOKEN_DEBUGGING)
416 cerr << '(' << ACE_Thread::self () << ')'
417 << " renew: "
418 << (errno == ETIME ? "timed out" : "error occurred")
419 << endl;
420 #endif /* ACE_TOKEN_DEBUGGING */
422 // We come here if a timeout occurs or some serious
423 // ACE_Condition object error.
424 if (errno == ETIME)
425 timed_out = true;
426 else
427 error = true;
429 // Stop the loop.
430 break;
433 while (!ACE_OS::thr_equal (my_entry.thread_id_, this->owner_));
435 // Do this always and irrespective of the result of wait().
436 --this->waiters_;
437 this_threads_queue->remove_entry (&my_entry);
439 #if defined (ACE_TOKEN_DEBUGGING)
440 ACELIB_DEBUG ((LM_DEBUG, "(%t) ACE_Token::renew (UNBLOCKED)\n"));
441 #endif /* ACE_TOKEN_DEBUGGING */
443 // If timeout occurred
444 if (timed_out)
446 // This thread was still selected to own the token.
447 if (my_entry.runable_)
449 // Wakeup next waiter since this thread timed out.
450 this->wakeup_next_waiter ();
453 // Return error.
454 return -1;
456 else if (error)
458 // Return error.
459 return -1;
462 // If this is a normal wakeup, this thread should be runnable.
463 ACE_ASSERT (my_entry.runable_);
465 // Reinstate nesting level.
466 this->nesting_level_ = save_nesting_level_;
468 return 0;
471 // Release the current holder of the token (which had
472 // better be the caller's thread!).
475 ACE_Token::release (void)
477 ACE_TRACE ("ACE_Token::release");
478 ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
480 #if defined (ACE_TOKEN_DEBUGGING)
481 this->dump ();
482 #endif /* ACE_TOKEN_DEBUGGING */
484 // Nested release...
485 if (this->nesting_level_ > 0)
486 --this->nesting_level_;
487 else
490 // Regular release...
493 // Wakeup waiter.
494 this->wakeup_next_waiter ();
497 return 0;
500 void
501 ACE_Token::wakeup_next_waiter (void)
503 ACE_TRACE ("ACE_Token::wakeup_next_waiter");
505 // Reset state for new owner.
506 this->owner_ = ACE_OS::NULL_thread;
507 this->in_use_ = 0;
509 // Any waiters...
510 if (this->writers_.head_ == 0 &&
511 this->readers_.head_ == 0)
513 // No more waiters...
514 return;
517 // Wakeup next waiter.
518 ACE_Token_Queue *queue = 0;
520 // Writer threads get priority to run first.
521 if (this->writers_.head_ != 0)
523 this->in_use_ = ACE_Token::WRITE_TOKEN;
524 queue = &this->writers_;
526 else
528 this->in_use_ = ACE_Token::READ_TOKEN;
529 queue = &this->readers_;
532 // Wake up waiter and make it runable.
533 queue->head_->runable_ = 1;
534 queue->head_->signal ();
535 this->owner_ = queue->head_->thread_id_;
538 ACE_END_VERSIONED_NAMESPACE_DECL
540 #endif /* ACE_HAS_THREADS */