Changes to attempt to silence bcc64x
[ACE_TAO.git] / ACE / ace / Message_Queue_T.cpp
blob6ff89561fa5d813e33845d2f01daf51ea03c629a
1 #ifndef ACE_MESSAGE_QUEUE_T_CPP
2 #define ACE_MESSAGE_QUEUE_T_CPP
4 // #include Message_Queue.h instead of Message_Queue_T.h to avoid
5 // circular include problems.
6 #include "ace/Message_Queue.h"
7 #include "ace/Message_Queue_Vx.h"
8 #include "ace/Log_Category.h"
9 #include "ace/OS_NS_sys_time.h"
11 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO)
12 #include "ace/Message_Queue_NT.h"
13 #endif /* ACE_HAS_WIN32_OVERLAPPED_IO */
15 #if !defined (ACE_LACKS_PRAGMA_ONCE)
16 # pragma once
17 #endif /* ACE_LACKS_PRAGMA_ONCE */
19 #include "ace/Notification_Strategy.h"
20 #include "ace/Truncate.h"
21 #include "ace/Condition_Attributes.h"
23 #if defined (ACE_HAS_MONITOR_POINTS) && (ACE_HAS_MONITOR_POINTS == 1)
24 #include "ace/OS_NS_stdio.h"
25 #include "ace/OS_NS_unistd.h"
26 #include "ace/Monitor_Size.h"
27 #endif /* ACE_HAS_MONITOR_POINTS==1 */
29 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
31 ACE_ALLOC_HOOK_DEFINE_Tyc(ACE_Message_Queue)
32 ACE_ALLOC_HOOK_DEFINE_Tyc(ACE_Dynamic_Message_Queue)
33 ACE_ALLOC_HOOK_DEFINE_Tcyc(ACE_Message_Queue_Ex)
34 ACE_ALLOC_HOOK_DEFINE_Tcyc(ACE_Message_Queue_Ex_N)
36 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL, class TIME_POLICY> void
37 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::dump () const
39 #if defined (ACE_HAS_DUMP)
40 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::dump");
42 this->queue_.dump ();
43 #endif /* ACE_HAS_DUMP */
46 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL, class TIME_POLICY> void
47 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::message_bytes (size_t new_value)
49 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::message_bytes");
51 this->queue_.message_bytes (new_value);
54 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL, class TIME_POLICY> void
55 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::message_length (size_t new_value)
57 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::message_length");
59 this->queue_.message_length (new_value);
62 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL, class TIME_POLICY>
63 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::ACE_Message_Queue_Ex (size_t high_water_mark,
64 size_t low_water_mark,
65 ACE_Notification_Strategy *ns)
67 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::ACE_Message_Queue_Ex");
69 if (this->queue_.open (high_water_mark, low_water_mark, ns) == -1)
70 ACELIB_ERROR ((LM_ERROR,
71 ACE_TEXT ("ACE_Message_Queue_Ex")));
74 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL, class TIME_POLICY>
75 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::~ACE_Message_Queue_Ex ()
77 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::~ACE_Message_Queue_Ex");
80 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL, class TIME_POLICY> int
81 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::open (size_t hwm,
82 size_t lwm,
83 ACE_Notification_Strategy *ns)
85 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::open");
87 return this->queue_.open (hwm, lwm, ns);
90 // Clean up the queue if we have not already done so!
92 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL, class TIME_POLICY> int
93 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::close ()
95 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::close");
97 return this->queue_.close ();
100 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL, class TIME_POLICY> int
101 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::flush ()
103 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::flush");
105 return this->queue_.flush ();
108 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL, class TIME_POLICY> int
109 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::flush_i ()
111 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::flush_i");
113 return this->queue_.flush_i ();
116 // Take a look at the first item without removing it.
118 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL, class TIME_POLICY> int
119 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::peek_dequeue_head (ACE_MESSAGE_TYPE *&first_item,
120 ACE_Time_Value *timeout)
122 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::peek_dequeue_head");
124 ACE_Message_Block *mb = 0;
126 int const cur_count = this->queue_.peek_dequeue_head (mb, timeout);
128 if (cur_count != -1)
129 first_item = reinterpret_cast<ACE_MESSAGE_TYPE *> (mb->base ());
131 return cur_count;
134 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL, class TIME_POLICY> int
135 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::enqueue_head (ACE_MESSAGE_TYPE *new_item,
136 ACE_Time_Value *timeout)
138 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::enqueue_head");
140 ACE_Message_Block *mb = 0;
142 ACE_NEW_RETURN (mb,
143 ACE_Message_Block ((char *) new_item,
144 sizeof (*new_item),
145 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::DEFAULT_PRIORITY),
146 -1);
148 int const result = this->queue_.enqueue_head (mb, timeout);
149 if (result == -1)
150 // Zap the message.
151 mb->release ();
152 return result;
155 // Enqueue an <ACE_MESSAGE_TYPE *> into the <Message_Queue> in
156 // accordance with its <msg_priority> (0 is lowest priority). Returns
157 // -1 on failure, else the number of items still on the queue.
159 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL, class TIME_POLICY> int
160 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::enqueue (ACE_MESSAGE_TYPE *new_item,
161 ACE_Time_Value *timeout)
163 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::enqueue");
165 return this->enqueue_prio (new_item, timeout);
168 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL, class TIME_POLICY> int
169 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::enqueue_prio (ACE_MESSAGE_TYPE *new_item,
170 ACE_Time_Value *timeout,
171 unsigned long priority)
173 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::enqueue_prio");
175 ACE_Message_Block *mb = 0;
177 ACE_NEW_RETURN (mb,
178 ACE_Message_Block ((char *) new_item,
179 sizeof (*new_item),
180 priority),
181 -1);
183 int const result = this->queue_.enqueue_prio (mb, timeout);
184 if (result == -1)
185 // Zap the message.
186 mb->release ();
188 return result;
191 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL, class TIME_POLICY> int
192 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::enqueue_deadline (ACE_MESSAGE_TYPE *new_item,
193 ACE_Time_Value *timeout)
195 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::enqueue_deadline");
197 ACE_Message_Block *mb = 0;
199 ACE_NEW_RETURN (mb,
200 ACE_Message_Block ((char *) new_item,
201 sizeof (*new_item),
202 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::DEFAULT_PRIORITY ),
203 -1);
205 int const result = this->queue_.enqueue_deadline (mb, timeout);
206 if (result == -1)
207 // Zap the message.
208 mb->release ();
210 return result;
213 // Block indefinitely waiting for an item to arrive,
214 // does not ignore alerts (e.g., signals).
216 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL, class TIME_POLICY> int
217 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::enqueue_tail (ACE_MESSAGE_TYPE *new_item,
218 ACE_Time_Value *timeout)
220 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::enqueue_tail");
222 ACE_Message_Block *mb = 0;
224 ACE_NEW_RETURN (mb,
225 ACE_Message_Block ((char *) new_item,
226 sizeof (*new_item),
227 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::DEFAULT_PRIORITY),
228 -1);
230 int const result = this->queue_.enqueue_tail (mb, timeout);
231 if (result == -1)
232 // Zap the message.
233 mb->release ();
234 return result;
237 // Remove an item from the front of the queue. If timeout == 0 block
238 // indefinitely (or until an alert occurs). Otherwise, block for upto
239 // the amount of time specified by timeout.
241 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL, class TIME_POLICY> int
242 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::dequeue_head (ACE_MESSAGE_TYPE *&first_item,
243 ACE_Time_Value *timeout)
245 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::dequeue_head");
247 ACE_Message_Block *mb = 0;
249 int const cur_count = this->queue_.dequeue_head (mb, timeout);
251 // Dequeue the message.
252 if (cur_count != -1)
254 first_item = reinterpret_cast<ACE_MESSAGE_TYPE *> (mb->base ());
255 // Delete the message block.
256 mb->release ();
259 return cur_count;
262 // Remove the item with the lowest priority from the queue. If timeout == 0
263 // block indefinitely (or until an alert occurs). Otherwise, block for upto
264 // the amount of time specified by timeout.
266 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL, class TIME_POLICY> int
267 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::dequeue_prio (ACE_MESSAGE_TYPE *&dequeued,
268 ACE_Time_Value *timeout)
270 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::dequeue_prio");
272 ACE_Message_Block *mb = 0;
274 int const cur_count = this->queue_.dequeue_prio (mb, timeout);
276 // Dequeue the message.
277 if (cur_count != -1)
279 dequeued = reinterpret_cast<ACE_MESSAGE_TYPE *> (mb->base ());
280 // Delete the message block.
281 mb->release ();
284 return cur_count;
287 // Remove an item from the end of the queue. If timeout == 0 block
288 // indefinitely (or until an alert occurs). Otherwise, block for upto
289 // the amount of time specified by timeout.
291 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL, class TIME_POLICY> int
292 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::dequeue_tail (ACE_MESSAGE_TYPE *&dequeued,
293 ACE_Time_Value *timeout)
295 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::dequeue_tail");
297 ACE_Message_Block *mb = 0;
299 int const cur_count = this->queue_.dequeue_tail (mb, timeout);
301 // Dequeue the message.
302 if (cur_count != -1)
304 dequeued = reinterpret_cast<ACE_MESSAGE_TYPE *> (mb->base ());
305 // Delete the message block.
306 mb->release ();
309 return cur_count;
312 // Remove an item with the lowest deadline time. If timeout == 0 block
313 // indefinitely (or until an alert occurs). Otherwise, block for upto
314 // the amount of time specified by timeout.
316 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL, class TIME_POLICY> int
317 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::dequeue_deadline (ACE_MESSAGE_TYPE *&dequeued,
318 ACE_Time_Value *timeout)
320 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::dequeue_deadline");
322 ACE_Message_Block *mb = 0;
324 int const cur_count = this->queue_.dequeue_deadline (mb, timeout);
326 // Dequeue the message.
327 if (cur_count != -1)
329 dequeued = reinterpret_cast<ACE_MESSAGE_TYPE *> (mb->base ());
330 // Delete the message block.
331 mb->release ();
334 return cur_count;
337 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL, class TIME_POLICY> int
338 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::notify ()
340 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::notify");
342 return this->queue_.notify ();
345 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL, class TIME_POLICY>
346 ACE_Message_Queue_Ex_Iterator<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::
347 ACE_Message_Queue_Ex_Iterator (ACE_Message_Queue_Ex <ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY> & queue)
348 : iter_ (queue.queue_)
352 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL, class TIME_POLICY> int
353 ACE_Message_Queue_Ex_Iterator<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::
354 next (ACE_MESSAGE_TYPE *&entry)
356 ACE_Message_Block * mb = 0;
357 int retval = this->iter_.next (mb);
359 if (retval == 1)
360 entry = reinterpret_cast<ACE_MESSAGE_TYPE *> (mb->base ());
362 return retval;
365 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL, class TIME_POLICY> int
366 ACE_Message_Queue_Ex_Iterator<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::done () const
368 return this->iter_.done ();
371 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL, class TIME_POLICY> int
372 ACE_Message_Queue_Ex_Iterator<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::advance ()
374 return this->iter_.advance ();
377 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL, class TIME_POLICY> void
378 ACE_Message_Queue_Ex_Iterator<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::dump () const
380 this->iter_.dump ();
383 ACE_ALLOC_HOOK_DEFINE_Tcyc(ACE_Message_Queue_Ex_Iterator)
385 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL, class TIME_POLICY>
386 ACE_Message_Queue_Ex_Reverse_Iterator<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::
387 ACE_Message_Queue_Ex_Reverse_Iterator (ACE_Message_Queue_Ex <ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY> & queue)
388 : iter_ (queue.queue_)
392 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL, class TIME_POLICY> int
393 ACE_Message_Queue_Ex_Reverse_Iterator<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::
394 next (ACE_MESSAGE_TYPE *&entry)
396 ACE_Message_Block * mb = 0;
397 int retval = this->iter_.next (mb);
399 if (retval == 1)
400 entry = reinterpret_cast<ACE_MESSAGE_TYPE *> (mb->base ());
402 return retval;
405 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL, class TIME_POLICY> int
406 ACE_Message_Queue_Ex_Reverse_Iterator<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::done () const
408 return this->iter_.done ();
411 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL, class TIME_POLICY> int
412 ACE_Message_Queue_Ex_Reverse_Iterator<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::advance ()
414 return this->iter_.advance ();
417 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL, class TIME_POLICY> void
418 ACE_Message_Queue_Ex_Reverse_Iterator<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::dump () const
420 this->iter_.dump ();
423 ACE_ALLOC_HOOK_DEFINE_Tcyc(ACE_Message_Queue_Ex_Reverse_Iterator)
425 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL, class TIME_POLICY>
426 ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::ACE_Message_Queue_Ex_N
427 (size_t high_water_mark,
428 size_t low_water_mark,
429 ACE_Notification_Strategy *ns):
430 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY> (high_water_mark,
431 low_water_mark,
434 ACE_TRACE ("ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::ACE_Message_Queue_Ex_N");
437 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL, class TIME_POLICY>
438 ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::~ACE_Message_Queue_Ex_N ()
440 ACE_TRACE ("ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::~ACE_Message_Queue_Ex_N");
443 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL, class TIME_POLICY> int
444 ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::enqueue_head
445 (ACE_MESSAGE_TYPE *new_item,
446 ACE_Time_Value *timeout)
448 ACE_TRACE ("ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::enqueue_head");
450 // Create a chained ACE_Message_Blocks wrappers around the 'chained'
451 // ACE_MESSAGE_TYPES.
452 ACE_Message_Block *mb = this->wrap_with_mbs_i (new_item);
453 if (0 == mb)
455 return -1;
458 int result = this->queue_.enqueue_head (mb, timeout);
459 if (-1 == result)
461 // Zap the messages.
462 mb->release ();
464 return result;
467 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL, class TIME_POLICY> int
468 ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::enqueue_tail
469 (ACE_MESSAGE_TYPE *new_item,
470 ACE_Time_Value *timeout)
472 ACE_TRACE ("ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::enqueue_tail");
474 // Create a chained ACE_Message_Blocks wrappers around the 'chained'
475 // ACE_MESSAGE_TYPES.
476 ACE_Message_Block *mb = this->wrap_with_mbs_i (new_item);
477 if (0 == mb)
479 return -1;
482 int result = this->queue_.enqueue_tail (mb, timeout);
483 if (-1 == result)
485 // Zap the message.
486 mb->release ();
488 return result;
491 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL, class TIME_POLICY> ACE_Message_Block *
492 ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::wrap_with_mbs_i
493 (ACE_MESSAGE_TYPE *new_item)
495 ACE_TRACE ("ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::wrap_with_mbs_i");
497 // We need to keep a reference to the head of the chain
498 ACE_Message_Block *mb_head = 0;
500 ACE_NEW_RETURN (mb_head,
501 ACE_Message_Block ((char *) new_item,
502 sizeof (*new_item),
503 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::DEFAULT_PRIORITY),
506 // mb_tail will point to the last ACE_Message_Block
507 ACE_Message_Block *mb_tail = mb_head;
509 // Run through rest of the messages and wrap them
510 for (ACE_MESSAGE_TYPE *pobj = new_item->next (); pobj; pobj = pobj->next ())
512 ACE_Message_Block *mb_temp = 0;
513 ACE_NEW_NORETURN (mb_temp,
514 ACE_Message_Block ((char *) pobj,
515 sizeof (*pobj),
516 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::DEFAULT_PRIORITY));
517 if (mb_temp == 0)
519 mb_head->release ();
520 mb_head = 0;
521 break;
524 mb_tail->next (mb_temp);
525 mb_tail = mb_temp;
528 return mb_head;
531 ACE_ALLOC_HOOK_DEFINE_Tyc(ACE_Message_Queue_Reverse_Iterator)
533 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL, class TIME_POLICY> int
534 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::dequeue (ACE_MESSAGE_TYPE *&first_item,
535 ACE_Time_Value *timeout)
537 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::dequeue");
539 return this->dequeue_head (first_item, timeout);
542 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL, class TIME_POLICY> ACE_Notification_Strategy *
543 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::notification_strategy ()
545 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::notification_strategy");
547 return this->queue_.notification_strategy ();
550 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL, class TIME_POLICY> void
551 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::notification_strategy (ACE_Notification_Strategy *s)
553 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::notification_strategy");
555 this->queue_.notification_strategy (s);
558 // Check if queue is empty (holds locks).
560 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL, class TIME_POLICY> bool
561 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::is_empty ()
563 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::is_empty");
565 return this->queue_.is_empty ();
568 // Check if queue is full (holds locks).
570 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL, class TIME_POLICY> bool
571 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::is_full ()
573 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::is_full");
575 return this->queue_.is_full ();
578 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL, class TIME_POLICY> size_t
579 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::high_water_mark ()
581 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::high_water_mark");
583 return this->queue_.high_water_mark ();
586 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL, class TIME_POLICY> void
587 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::high_water_mark (size_t hwm)
589 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::high_water_mark");
591 this->queue_.high_water_mark (hwm);
594 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL, class TIME_POLICY> size_t
595 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::low_water_mark ()
597 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::low_water_mark");
599 return this->queue_.low_water_mark ();
602 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL, class TIME_POLICY> void
603 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::low_water_mark (size_t lwm)
605 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::low_water_mark");
607 this->queue_.low_water_mark (lwm);
610 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL, class TIME_POLICY> size_t
611 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::message_bytes ()
613 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::message_bytes");
615 return this->queue_.message_bytes ();
618 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL, class TIME_POLICY> size_t
619 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::message_length ()
621 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::message_length");
623 return this->queue_.message_length ();
626 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL, class TIME_POLICY> size_t
627 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::message_count ()
629 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::message_count");
631 return this->queue_.message_count ();
634 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL, class TIME_POLICY> int
635 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::deactivate ()
637 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::deactivate");
639 return this->queue_.deactivate ();
642 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL, class TIME_POLICY> int
643 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::activate ()
645 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::activate");
647 return this->queue_.activate ();
650 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL, class TIME_POLICY> int
651 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::pulse ()
653 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::pulse");
655 return this->queue_.pulse ();
658 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL, class TIME_POLICY> int
659 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::deactivated ()
661 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::deactivated");
663 return this->queue_.deactivated ();
666 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL, class TIME_POLICY> int
667 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::state ()
669 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::state");
671 return this->queue_.state ();
674 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL, class TIME_POLICY> ACE_SYNCH_MUTEX_T &
675 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::lock ()
677 return this->queue_.lock ();
680 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL, class TIME_POLICY>
681 ACE_Time_Value_T<TIME_POLICY>
682 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::gettimeofday ()
684 return this->queue_.gettimeofday ();
687 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL, class TIME_POLICY>
688 void
689 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::set_time_policy (TIME_POLICY const & rhs)
691 this->queue_.set_time_policy (rhs);
694 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL, class TIME_POLICY>
695 ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY> &
696 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::queue ()
698 return this->queue_;
701 template <ACE_SYNCH_DECL, class TIME_POLICY>
702 ACE_Message_Queue_Iterator<ACE_SYNCH_USE, TIME_POLICY>::ACE_Message_Queue_Iterator (ACE_Message_Queue <ACE_SYNCH_USE, TIME_POLICY> &q)
703 : queue_ (q)
704 , curr_ (q.head_)
708 template <ACE_SYNCH_DECL, class TIME_POLICY> int
709 ACE_Message_Queue_Iterator<ACE_SYNCH_USE, TIME_POLICY>::next (ACE_Message_Block *&entry)
711 ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1)
713 if (this->curr_ != 0)
715 entry = this->curr_;
716 return 1;
719 return 0;
722 template <ACE_SYNCH_DECL, class TIME_POLICY> int
723 ACE_Message_Queue_Iterator<ACE_SYNCH_USE, TIME_POLICY>::done () const
725 ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1)
727 return this->curr_ == 0;
730 template <ACE_SYNCH_DECL, class TIME_POLICY> int
731 ACE_Message_Queue_Iterator<ACE_SYNCH_USE, TIME_POLICY>::advance ()
733 ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1)
735 if (this->curr_)
736 this->curr_ = this->curr_->next ();
737 return this->curr_ != 0;
740 template <ACE_SYNCH_DECL, class TIME_POLICY> void
741 ACE_Message_Queue_Iterator<ACE_SYNCH_USE, TIME_POLICY>::dump () const
743 #if defined (ACE_HAS_DUMP)
744 #endif /* ACE_HAS_DUMP */
747 ACE_ALLOC_HOOK_DEFINE_Tyc(ACE_Message_Queue_Iterator)
749 template <ACE_SYNCH_DECL, class TIME_POLICY>
750 ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE, TIME_POLICY>::ACE_Message_Queue_Reverse_Iterator (ACE_Message_Queue <ACE_SYNCH_USE, TIME_POLICY> &q)
751 : queue_ (q)
752 , curr_ (queue_.tail_)
756 template <ACE_SYNCH_DECL, class TIME_POLICY> int
757 ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE, TIME_POLICY>::next (ACE_Message_Block *&entry)
759 ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1)
761 if (this->curr_ != 0)
763 entry = this->curr_;
764 return 1;
767 return 0;
770 template <ACE_SYNCH_DECL, class TIME_POLICY> int
771 ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE, TIME_POLICY>::done () const
773 ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1)
775 return this->curr_ == 0;
778 template <ACE_SYNCH_DECL, class TIME_POLICY> int
779 ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE, TIME_POLICY>::advance ()
781 ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1)
783 if (this->curr_)
784 this->curr_ = this->curr_->prev ();
785 return this->curr_ != 0;
788 template <ACE_SYNCH_DECL, class TIME_POLICY> void
789 ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE, TIME_POLICY>::dump () const
791 #if defined (ACE_HAS_DUMP)
792 #endif /* ACE_HAS_DUMP */
795 template <ACE_SYNCH_DECL, class TIME_POLICY> int
796 ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::dequeue (ACE_Message_Block *&first_item,
797 ACE_Time_Value *timeout)
799 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::dequeue");
800 return this->dequeue_head (first_item, timeout);
803 template <ACE_SYNCH_DECL, class TIME_POLICY> ACE_Notification_Strategy *
804 ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::notification_strategy ()
806 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::notification_strategy");
808 return this->notification_strategy_;
811 template <ACE_SYNCH_DECL, class TIME_POLICY> void
812 ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::notification_strategy (ACE_Notification_Strategy *s)
814 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::notification_strategy");
816 this->notification_strategy_ = s;
819 // Check if queue is empty (does not hold locks).
821 template <ACE_SYNCH_DECL, class TIME_POLICY> bool
822 ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::is_empty_i ()
824 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::is_empty_i");
825 return this->tail_ == 0;
828 // Check if queue is full (does not hold locks).
830 template <ACE_SYNCH_DECL, class TIME_POLICY> bool
831 ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::is_full_i ()
833 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::is_full_i");
834 return this->cur_bytes_ >= this->high_water_mark_;
837 // Check if queue is empty (holds locks).
839 template <ACE_SYNCH_DECL, class TIME_POLICY> bool
840 ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::is_empty ()
842 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::is_empty");
843 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, false);
845 return this->is_empty_i ();
848 // Check if queue is full (holds locks).
850 template <ACE_SYNCH_DECL, class TIME_POLICY> bool
851 ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::is_full ()
853 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::is_full");
854 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, false);
856 return this->is_full_i ();
859 template <ACE_SYNCH_DECL, class TIME_POLICY> size_t
860 ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::high_water_mark ()
862 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::high_water_mark");
863 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, 0);
865 return this->high_water_mark_;
868 template <ACE_SYNCH_DECL, class TIME_POLICY> void
869 ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::high_water_mark (size_t hwm)
871 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::high_water_mark");
872 ACE_GUARD (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_);
874 this->high_water_mark_ = hwm;
877 template <ACE_SYNCH_DECL, class TIME_POLICY> size_t
878 ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::low_water_mark ()
880 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::low_water_mark");
881 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, 0);
883 return this->low_water_mark_;
886 template <ACE_SYNCH_DECL, class TIME_POLICY> void
887 ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::low_water_mark (size_t lwm)
889 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::low_water_mark");
890 ACE_GUARD (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_);
892 this->low_water_mark_ = lwm;
895 template <ACE_SYNCH_DECL, class TIME_POLICY> size_t
896 ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::message_bytes ()
898 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::message_bytes");
899 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, 0);
901 return this->cur_bytes_;
904 template <ACE_SYNCH_DECL, class TIME_POLICY> size_t
905 ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::message_length ()
907 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::message_length");
908 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, 0);
910 return this->cur_length_;
913 template <ACE_SYNCH_DECL, class TIME_POLICY> size_t
914 ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::message_count ()
916 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::message_count");
917 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, 0);
919 return this->cur_count_;
922 template <ACE_SYNCH_DECL, class TIME_POLICY> int
923 ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::deactivate ()
925 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::deactivate");
926 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
928 return this->deactivate_i (0); // Not a pulse
931 template <ACE_SYNCH_DECL, class TIME_POLICY> int
932 ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::activate ()
934 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::activate");
935 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
937 return this->activate_i ();
940 template <ACE_SYNCH_DECL, class TIME_POLICY> int
941 ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::pulse ()
943 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::pulse");
944 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
946 return this->deactivate_i (1); // Just a pulse
949 template <ACE_SYNCH_DECL, class TIME_POLICY> int
950 ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::deactivated ()
952 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::deactivated");
954 return this->state_ == ACE_Message_Queue_Base::DEACTIVATED;
957 template <ACE_SYNCH_DECL, class TIME_POLICY> int
958 ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::state ()
960 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::state");
962 return this->state_;
965 template <ACE_SYNCH_DECL, class TIME_POLICY> ACE_SYNCH_MUTEX_T &
966 ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::lock ()
968 return this->lock_;
971 template <ACE_SYNCH_DECL, class TIME_POLICY>
972 ACE_Time_Value_T<TIME_POLICY>
973 ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::gettimeofday () const
975 return this->time_policy_ ();
978 template <ACE_SYNCH_DECL, class TIME_POLICY>
979 void
980 ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::set_time_policy (TIME_POLICY const & rhs)
982 this->time_policy_ = rhs;
985 template <ACE_SYNCH_DECL, class TIME_POLICY> void
986 ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::dump () const
988 #if defined (ACE_HAS_DUMP)
989 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::dump");
990 ACELIB_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
991 switch (this->state_)
993 case ACE_Message_Queue_Base::ACTIVATED:
994 ACELIB_DEBUG ((LM_DEBUG,
995 ACE_TEXT ("state = ACTIVATED\n")));
996 break;
997 case ACE_Message_Queue_Base::DEACTIVATED:
998 ACELIB_DEBUG ((LM_DEBUG,
999 ACE_TEXT ("state = DEACTIVATED\n")));
1000 break;
1001 case ACE_Message_Queue_Base::PULSED:
1002 ACELIB_DEBUG ((LM_DEBUG,
1003 ACE_TEXT ("state = PULSED\n")));
1004 break;
1006 ACELIB_DEBUG ((LM_DEBUG,
1007 ACE_TEXT ("low_water_mark = %d\n")
1008 ACE_TEXT ("high_water_mark = %d\n")
1009 ACE_TEXT ("cur_bytes = %d\n")
1010 ACE_TEXT ("cur_length = %d\n")
1011 ACE_TEXT ("cur_count = %d\n")
1012 ACE_TEXT ("head_ = %u\n")
1013 ACE_TEXT ("tail_ = %u\n"),
1014 this->low_water_mark_,
1015 this->high_water_mark_,
1016 this->cur_bytes_,
1017 this->cur_length_,
1018 this->cur_count_,
1019 this->head_,
1020 this->tail_));
1021 ACELIB_DEBUG ((LM_DEBUG, ACE_TEXT ("not_full_cond:\n")));
1022 not_full_cond_.dump ();
1023 ACELIB_DEBUG ((LM_DEBUG, ACE_TEXT ("not_empty_cond:\n")));
1024 not_empty_cond_.dump ();
1025 ACELIB_DEBUG ((LM_DEBUG, ACE_END_DUMP));
1026 #endif /* ACE_HAS_DUMP */
1029 template <ACE_SYNCH_DECL, class TIME_POLICY> void
1030 ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::message_bytes (size_t new_value)
1032 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::message_bytes");
1033 ACE_GUARD (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_);
1035 this->cur_bytes_ = new_value;
1038 template <ACE_SYNCH_DECL, class TIME_POLICY> void
1039 ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::message_length (size_t new_value)
1041 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::message_length");
1042 ACE_GUARD (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_);
1044 this->cur_length_ = new_value;
1047 template <ACE_SYNCH_DECL, class TIME_POLICY>
1048 ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::ACE_Message_Queue (size_t hwm,
1049 size_t lwm,
1050 ACE_Notification_Strategy *ns)
1051 #if defined (ACE_HAS_THREADS)
1052 : not_empty_cond_ (lock_, cond_attr_)
1053 , not_full_cond_ (lock_, cond_attr_)
1054 #else
1055 : not_empty_cond_ (lock_)
1056 , not_full_cond_ (lock_)
1057 #endif
1059 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::ACE_Message_Queue");
1061 if (this->open (hwm, lwm, ns) == -1)
1062 ACELIB_ERROR ((LM_ERROR,
1063 ACE_TEXT ("open")));
1065 #if defined (ACE_HAS_MONITOR_POINTS) && (ACE_HAS_MONITOR_POINTS == 1)
1066 ACE_NEW (this->monitor_,
1067 ACE::Monitor_Control::Size_Monitor);
1069 /// Make a unique name using our process id and hex address.
1070 char pid_buf[sizeof (int) + 1];
1071 ACE_OS::sprintf (pid_buf, "%d", ACE_OS::getpid ());
1072 pid_buf[sizeof (int)] = '\0';
1074 const int addr_nibbles = 2 * sizeof (ptrdiff_t);
1075 char addr_buf[addr_nibbles + 1];
1076 ACE_OS::sprintf (addr_buf, "%p", this);
1077 addr_buf[addr_nibbles] = '\0';
1079 ACE_CString name_str ("Message_Queue_");
1080 name_str += pid_buf;
1081 name_str += '_';
1082 name_str += addr_buf;
1083 this->monitor_->name (name_str.c_str ());
1084 this->monitor_->add_to_registry ();
1085 #endif /* ACE_HAS_MONITOR_POINTS==1 */
1088 template <ACE_SYNCH_DECL, class TIME_POLICY>
1089 ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::~ACE_Message_Queue ()
1091 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::~ACE_Message_Queue");
1092 if (this->head_ != 0 && this->close () == -1)
1093 ACELIB_ERROR ((LM_ERROR,
1094 ACE_TEXT ("close")));
1096 #if defined (ACE_HAS_MONITOR_POINTS) && (ACE_HAS_MONITOR_POINTS == 1)
1097 this->monitor_->remove_from_registry ();
1098 this->monitor_->remove_ref ();
1099 #endif /* ACE_HAS_MONITOR_POINTS==1 */
1102 template <ACE_SYNCH_DECL, class TIME_POLICY> int
1103 ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::flush_i ()
1105 int number_flushed = 0;
1107 // Remove all the <ACE_Message_Block>s in the <ACE_Message_Queue>
1108 // and <release> their memory.
1109 for (this->tail_ = 0; this->head_ != 0; )
1111 ++number_flushed;
1113 size_t mb_bytes = 0;
1114 size_t mb_length = 0;
1115 this->head_->total_size_and_length (mb_bytes,
1116 mb_length);
1117 // Subtract off all of the bytes associated with this message.
1118 this->cur_bytes_ -= mb_bytes;
1119 this->cur_length_ -= mb_length;
1120 --this->cur_count_;
1122 ACE_Message_Block *temp = this->head_;
1123 this->head_ = this->head_->next ();
1125 // Make sure to use <release> rather than <delete> since this is
1126 // reference counted.
1127 temp->release ();
1130 #if defined (ACE_HAS_MONITOR_POINTS) && (ACE_HAS_MONITOR_POINTS == 1)
1131 // The monitor should output only if the size has actually changed.
1132 if (number_flushed > 0)
1134 this->monitor_->receive (this->cur_length_);
1136 #endif
1138 return number_flushed;
1141 // Don't bother locking since if someone calls this function more than
1142 // once for the same queue, we're in bigger trouble than just
1143 // concurrency control!
1145 template <ACE_SYNCH_DECL, class TIME_POLICY> int
1146 ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::open (size_t hwm,
1147 size_t lwm,
1148 ACE_Notification_Strategy *ns)
1150 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::open");
1151 this->high_water_mark_ = hwm;
1152 this->low_water_mark_ = lwm;
1153 this->state_ = ACE_Message_Queue_Base::ACTIVATED;
1154 this->cur_bytes_ = 0;
1155 this->cur_length_ = 0;
1156 this->cur_count_ = 0;
1157 this->tail_ = 0;
1158 this->head_ = 0;
1159 this->notification_strategy_ = ns;
1160 return 0;
1163 // Implementation of the public deactivate() method
1164 // (assumes locks are held).
1166 template <ACE_SYNCH_DECL, class TIME_POLICY> int
1167 ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::deactivate_i (int pulse)
1169 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::deactivate_i");
1170 int const previous_state = this->state_;
1172 if (previous_state != ACE_Message_Queue_Base::DEACTIVATED)
1174 // Wakeup all waiters.
1175 this->not_empty_cond_.broadcast ();
1176 this->not_full_cond_.broadcast ();
1178 if (pulse)
1179 this->state_ = ACE_Message_Queue_Base::PULSED;
1180 else
1181 this->state_ = ACE_Message_Queue_Base::DEACTIVATED;
1184 return previous_state;
1187 template <ACE_SYNCH_DECL, class TIME_POLICY> int
1188 ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::activate_i ()
1190 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::activate_i");
1191 int const previous_state = this->state_;
1192 this->state_ = ACE_Message_Queue_Base::ACTIVATED;
1193 return previous_state;
1196 template <ACE_SYNCH_DECL, class TIME_POLICY> int
1197 ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::flush ()
1199 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::flush");
1200 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
1202 // Free up the remaining messages on the queue.
1203 return this->flush_i ();
1206 // Clean up the queue if we have not already done so!
1208 template <ACE_SYNCH_DECL, class TIME_POLICY> int
1209 ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::close ()
1211 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::close");
1212 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
1214 // There's no need to check the return value of deactivate_i() since
1215 // it never fails!
1216 this->deactivate_i ();
1218 // Free up the remaining messages on the queue.
1219 return this->flush_i ();
1222 template <ACE_SYNCH_DECL, class TIME_POLICY> int
1223 ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::signal_enqueue_waiters ()
1225 if (this->not_full_cond_.signal () != 0)
1226 return -1;
1227 return 0;
1230 template <ACE_SYNCH_DECL, class TIME_POLICY> int
1231 ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::signal_dequeue_waiters ()
1233 // Tell any blocked threads that the queue has a new item!
1234 if (this->not_empty_cond_.signal () != 0)
1235 return -1;
1236 return 0;
1239 // Actually put the node at the end (no locking so must be called with
1240 // locks held).
1242 template <ACE_SYNCH_DECL, class TIME_POLICY> int
1243 ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::enqueue_tail_i (ACE_Message_Block *new_item)
1245 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::enqueue_tail_i");
1247 if (new_item == 0)
1248 return -1;
1250 // Update the queued size and length, taking into account any chained
1251 // blocks (total_size_and_length() counts all continuation blocks).
1252 // Keep count of how many blocks we're adding and, if there is a chain of
1253 // blocks, find the end in seq_tail and be sure they're properly
1254 // back-connected along the way.
1255 ACE_Message_Block *seq_tail = new_item;
1256 ++this->cur_count_;
1257 new_item->total_size_and_length (this->cur_bytes_,
1258 this->cur_length_);
1259 while (seq_tail->next () != 0)
1261 seq_tail->next ()->prev (seq_tail);
1262 seq_tail = seq_tail->next ();
1263 ++this->cur_count_;
1264 seq_tail->total_size_and_length (this->cur_bytes_,
1265 this->cur_length_);
1268 // List was empty, so build a new one.
1269 if (this->tail_ == 0)
1271 this->head_ = new_item;
1272 this->tail_ = seq_tail;
1273 // seq_tail->next (0); This is a condition of the while() loop above.
1274 new_item->prev (0);
1276 // Link at the end.
1277 else
1279 // seq_tail->next (0); This is a condition of the while() loop above.
1280 this->tail_->next (new_item);
1281 new_item->prev (this->tail_);
1282 this->tail_ = seq_tail;
1285 if (this->signal_dequeue_waiters () == -1)
1286 return -1;
1287 else
1288 return ACE_Utils::truncate_cast<int> (this->cur_count_);
1291 // Actually put the node(s) at the head (no locking)
1293 template <ACE_SYNCH_DECL, class TIME_POLICY> int
1294 ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::enqueue_head_i (ACE_Message_Block *new_item)
1296 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::enqueue_head_i");
1298 if (new_item == 0)
1299 return -1;
1301 // Update the queued size and length, taking into account any chained
1302 // blocks (total_size_and_length() counts all continuation blocks).
1303 // Keep count of how many blocks we're adding and, if there is a chain of
1304 // blocks, find the end in seq_tail and be sure they're properly
1305 // back-connected along the way.
1306 ACE_Message_Block *seq_tail = new_item;
1307 ++this->cur_count_;
1308 new_item->total_size_and_length (this->cur_bytes_,
1309 this->cur_length_);
1310 while (seq_tail->next () != 0)
1312 seq_tail->next ()->prev (seq_tail);
1313 seq_tail = seq_tail->next ();
1314 ++this->cur_count_;
1315 seq_tail->total_size_and_length (this->cur_bytes_,
1316 this->cur_length_);
1319 new_item->prev (0);
1320 seq_tail->next (this->head_);
1322 if (this->head_ != 0)
1323 this->head_->prev (seq_tail);
1324 else
1325 this->tail_ = seq_tail;
1327 this->head_ = new_item;
1329 if (this->signal_dequeue_waiters () == -1)
1330 return -1;
1331 else
1332 return ACE_Utils::truncate_cast<int> (this->cur_count_);
1335 // Actually put the node at its proper position relative to its
1336 // priority.
1338 template <ACE_SYNCH_DECL, class TIME_POLICY> int
1339 ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::enqueue_i (ACE_Message_Block *new_item)
1341 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::enqueue_i");
1343 if (new_item == 0)
1344 return -1;
1346 // Since this method uses enqueue_head_i() and enqueue_tail_i() for
1347 // special situations, and this method doesn't support enqueueing
1348 // chains of blocks off the 'next' pointer, make sure the new_item's
1349 // next pointer is 0.
1350 new_item->next (0);
1352 if (this->head_ == 0)
1353 // Check for simple case of an empty queue, where all we need to
1354 // do is insert <new_item> into the head.
1355 return this->enqueue_head_i (new_item);
1356 else
1358 ACE_Message_Block *temp = 0;
1360 // Figure out where the new item goes relative to its priority.
1361 // We start looking from the lowest priority (at the tail) to
1362 // the highest priority (at the head).
1364 for (temp = this->tail_;
1365 temp != 0;
1366 temp = temp->prev ())
1367 if (temp->msg_priority () >= new_item->msg_priority ())
1368 // Break out when we've located an item that has
1369 // greater or equal priority.
1370 break;
1372 if (temp == 0)
1373 // Check for simple case of inserting at the head of the queue,
1374 // where all we need to do is insert <new_item> before the
1375 // current head.
1376 return this->enqueue_head_i (new_item);
1377 else if (temp->next () == 0)
1378 // Check for simple case of inserting at the tail of the
1379 // queue, where all we need to do is insert <new_item> after
1380 // the current tail.
1381 return this->enqueue_tail_i (new_item);
1382 else
1384 // Insert the new message behind the message of greater or
1385 // equal priority. This ensures that FIFO order is
1386 // maintained when messages of the same priority are
1387 // inserted consecutively.
1388 new_item->prev (temp);
1389 new_item->next (temp->next ());
1390 temp->next ()->prev (new_item);
1391 temp->next (new_item);
1395 // Make sure to count all the bytes in a composite message!!!
1396 new_item->total_size_and_length (this->cur_bytes_,
1397 this->cur_length_);
1398 ++this->cur_count_;
1400 if (this->signal_dequeue_waiters () == -1)
1401 return -1;
1402 else
1403 return ACE_Utils::truncate_cast<int> (this->cur_count_);
1406 // Actually put the node at its proper position relative to its
1407 // deadline time.
1409 template <ACE_SYNCH_DECL, class TIME_POLICY> int
1410 ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::enqueue_deadline_i (ACE_Message_Block *new_item)
1412 #if defined (ACE_HAS_TIMED_MESSAGE_BLOCKS)
1413 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::enqueue_deadline_i");
1415 if (new_item == 0)
1416 return -1;
1418 // Since this method uses enqueue_head_i() and enqueue_tail_i() for
1419 // special situations, and this method doesn't support enqueueing
1420 // chains of blocks off the 'next' pointer, make sure the new_item's
1421 // next pointer is 0.
1422 new_item->next (0);
1424 if (this->head_ == 0)
1425 // Check for simple case of an empty queue, where all we need to
1426 // do is insert <new_item> into the head.
1427 return this->enqueue_head_i (new_item);
1428 else
1430 ACE_Message_Block *temp = 0;
1432 // Figure out where the new item goes relative to its priority.
1433 // We start looking from the smallest deadline to the highest
1434 // deadline.
1436 for (temp = this->head_;
1437 temp != 0;
1438 temp = temp->next ())
1439 if (new_item->msg_deadline_time () < temp->msg_deadline_time ())
1440 // Break out when we've located an item that has
1441 // greater or equal priority.
1442 break;
1444 if (temp == 0 || temp->next () == 0)
1445 // Check for simple case of inserting at the tail of the queue,
1446 // where all we need to do is insert <new_item> after the
1447 // current tail.
1448 return this->enqueue_tail_i (new_item);
1449 else
1451 // Insert the new message behind the message of
1452 // lesser or equal deadline time. This ensures that FIFO order is
1453 // maintained when messages of the same priority are
1454 // inserted consecutively.
1455 new_item->prev (temp);
1456 new_item->next (temp->next ());
1457 temp->next ()->prev (new_item);
1458 temp->next (new_item);
1462 // Make sure to count all the bytes in a composite message!!!
1463 new_item->total_size_and_length (this->cur_bytes_,
1464 this->cur_length_);
1465 ++this->cur_count_;
1467 if (this->signal_dequeue_waiters () == -1)
1468 return -1;
1469 else
1470 return this->cur_count_;
1471 #else
1472 return this->enqueue_tail_i (new_item);
1473 #endif /* ACE_HAS_TIMED_MESSAGE_BLOCKS */
1476 // Actually get the first ACE_Message_Block (no locking, so must be
1477 // called with locks held). This method assumes that the queue has at
1478 // least one item in it when it is called.
1480 template <ACE_SYNCH_DECL, class TIME_POLICY> int
1481 ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::dequeue_head_i (ACE_Message_Block *&first_item)
1483 if (this->head_ ==0)
1484 ACELIB_ERROR_RETURN ((LM_ERROR,
1485 ACE_TEXT ("Attempting to dequeue from empty queue")),
1486 -1);
1487 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::dequeue_head_i");
1488 first_item = this->head_;
1489 this->head_ = this->head_->next ();
1491 if (this->head_ == 0)
1492 this->tail_ = 0;
1493 else
1494 // The prev pointer of first message block must point to 0...
1495 this->head_->prev (0);
1497 size_t mb_bytes = 0;
1498 size_t mb_length = 0;
1499 first_item->total_size_and_length (mb_bytes,
1500 mb_length);
1501 // Subtract off all of the bytes associated with this message.
1502 this->cur_bytes_ -= mb_bytes;
1503 this->cur_length_ -= mb_length;
1504 --this->cur_count_;
1506 if (this->cur_count_ == 0 && this->head_ == this->tail_)
1507 this->head_ = this->tail_ = 0;
1509 // Make sure that the prev and next fields are 0!
1510 first_item->prev (0);
1511 first_item->next (0);
1513 #if defined (ACE_HAS_MONITOR_POINTS) && (ACE_HAS_MONITOR_POINTS == 1)
1514 this->monitor_->receive (this->cur_length_);
1515 #endif
1517 // Only signal enqueueing threads if we've fallen below the low
1518 // water mark.
1519 if (this->cur_bytes_ <= this->low_water_mark_
1520 && this->signal_enqueue_waiters () == -1)
1521 return -1;
1522 else
1523 return ACE_Utils::truncate_cast<int> (this->cur_count_);
1526 // Get the earliest (i.e., FIFO) ACE_Message_Block with the lowest
1527 // priority (no locking, so must be called with locks held). This
1528 // method assumes that the queue has at least one item in it when it
1529 // is called.
1531 template <ACE_SYNCH_DECL, class TIME_POLICY> int
1532 ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::dequeue_prio_i (ACE_Message_Block *&dequeued)
1534 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::dequeue_prio_i");
1536 if (this->head_ == 0)
1537 return -1;
1539 // Find the earliest (i.e., FIFO) message enqueued with the lowest
1540 // priority.
1541 ACE_Message_Block *chosen = 0;
1542 u_long priority = ULONG_MAX;
1544 for (ACE_Message_Block *temp = this->tail_;
1545 temp != 0;
1546 temp = temp->prev ())
1548 // Find the first version of the earliest message (i.e.,
1549 // preserve FIFO order for messages at the same priority).
1550 if (temp->msg_priority () <= priority)
1552 priority = temp->msg_priority ();
1553 chosen = temp;
1557 // If every message block is the same priority, pass back the first
1558 // one.
1559 if (chosen == 0)
1560 chosen = this->head_;
1562 // Patch up the queue. If we don't have a previous then we are at
1563 // the head of the queue.
1564 if (chosen->prev () == 0)
1565 this->head_ = chosen->next ();
1566 else
1567 chosen->prev ()->next (chosen->next ());
1569 if (chosen->next () == 0)
1570 this->tail_ = chosen->prev ();
1571 else
1572 chosen->next ()->prev (chosen->prev ());
1574 // Pass back the chosen block
1575 dequeued = chosen;
1577 size_t mb_bytes = 0;
1578 size_t mb_length = 0;
1579 dequeued->total_size_and_length (mb_bytes,
1580 mb_length);
1581 // Subtract off all of the bytes associated with this message.
1582 this->cur_bytes_ -= mb_bytes;
1583 this->cur_length_ -= mb_length;
1584 --this->cur_count_;
1586 if (this->cur_count_ == 0 && this->head_ == this->tail_)
1587 this->head_ = this->tail_ = 0;
1589 // Make sure that the prev and next fields are 0!
1590 dequeued->prev (0);
1591 dequeued->next (0);
1593 // Only signal enqueueing threads if we've fallen below the low
1594 // water mark.
1595 if (this->cur_bytes_ <= this->low_water_mark_
1596 && this->signal_enqueue_waiters () == -1)
1597 return -1;
1598 else
1599 return ACE_Utils::truncate_cast<int> (this->cur_count_);
1602 // Actually get the last ACE_Message_Block (no locking, so must be
1603 // called with locks held). This method assumes that the queue has at
1604 // least one item in it when it is called.
1606 template <ACE_SYNCH_DECL, class TIME_POLICY> int
1607 ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::dequeue_tail_i (ACE_Message_Block *&dequeued)
1609 if (this->head_ == 0)
1610 ACELIB_ERROR_RETURN ((LM_ERROR,
1611 ACE_TEXT ("Attempting to dequeue from empty queue")),
1612 -1);
1613 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::dequeue_tail_i");
1614 dequeued = this->tail_;
1615 if (this->tail_->prev () == 0)
1617 this->head_ = 0;
1618 this->tail_ = 0;
1620 else
1622 this->tail_->prev ()->next (0);
1623 this->tail_ = this->tail_->prev ();
1626 size_t mb_bytes = 0;
1627 size_t mb_length = 0;
1628 dequeued->total_size_and_length (mb_bytes,
1629 mb_length);
1630 // Subtract off all of the bytes associated with this message.
1631 this->cur_bytes_ -= mb_bytes;
1632 this->cur_length_ -= mb_length;
1633 --this->cur_count_;
1635 if (this->cur_count_ == 0 && this->head_ == this->tail_)
1636 this->head_ = this->tail_ = 0;
1638 // Make sure that the prev and next fields are 0!
1639 dequeued->prev (0);
1640 dequeued->next (0);
1642 // Only signal enqueueing threads if we've fallen below the low
1643 // water mark.
1644 if (this->cur_bytes_ <= this->low_water_mark_
1645 && this->signal_enqueue_waiters () == -1)
1646 return -1;
1647 else
1648 return ACE_Utils::truncate_cast<int> (this->cur_count_);
1651 // Actually get the ACE_Message_Block with the lowest deadline time
1652 // (no locking, so must be called with locks held). This method assumes
1653 // that the queue has at least one item in it when it is called.
1655 template <ACE_SYNCH_DECL, class TIME_POLICY> int
1656 ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::dequeue_deadline_i (ACE_Message_Block *&dequeued)
1658 #if defined (ACE_HAS_TIMED_MESSAGE_BLOCKS)
1659 if (this->head_ == 0)
1660 ACELIB_ERROR_RETURN ((LM_ERROR,
1661 ACE_TEXT ("Attempting to dequeue from empty queue")),
1662 -1);
1663 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::dequeue_deadline_i");
1665 // Find the last message enqueued with the lowest deadline time
1666 ACE_Message_Block* chosen = 0;
1667 ACE_Time_Value deadline = ACE_Time_Value::max_time;
1668 for (ACE_Message_Block *temp = this->head_; temp != 0; temp = temp->next ())
1669 if (temp->msg_deadline_time () < deadline)
1671 deadline = temp->msg_deadline_time ();
1672 chosen = temp;
1675 // If every message block is the same deadline time,
1676 // pass back the first one
1677 if (chosen == 0)
1678 chosen = this->head_;
1680 // Patch up the queue. If we don't have a previous
1681 // then we are at the head of the queue.
1682 if (chosen->prev () == 0)
1683 this->head_ = chosen->next ();
1684 else
1685 chosen->prev ()->next (chosen->next ());
1687 if (chosen->next () == 0)
1688 this->tail_ = chosen->prev ();
1689 else
1690 chosen->next ()->prev (chosen->prev ());
1692 // Pass back the chosen block
1693 dequeued = chosen;
1695 size_t mb_bytes = 0;
1696 size_t mb_length = 0;
1697 dequeued->total_size_and_length (mb_bytes,
1698 mb_length);
1699 // Subtract off all of the bytes associated with this message.
1700 this->cur_bytes_ -= mb_bytes;
1701 this->cur_length_ -= mb_length;
1702 --this->cur_count_;
1704 if (this->cur_count_ == 0 && this->head_ == this->tail_)
1705 this->head_ = this->tail_ = 0;
1707 // Make sure that the prev and next fields are 0!
1708 dequeued->prev (0);
1709 dequeued->next (0);
1711 // Only signal enqueueing threads if we've fallen below the low
1712 // water mark.
1713 if (this->cur_bytes_ <= this->low_water_mark_
1714 && this->signal_enqueue_waiters () == -1)
1715 return -1;
1716 else
1717 return this->cur_count_;
1718 #else
1719 return this->dequeue_head_i (dequeued);
1720 #endif /* ACE_HAS_TIMED_MESSAGE_BLOCKS */
1723 // Take a look at the first item without removing it.
1725 template <ACE_SYNCH_DECL, class TIME_POLICY> int
1726 ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::peek_dequeue_head (ACE_Message_Block *&first_item,
1727 ACE_Time_Value *timeout)
1729 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::peek_dequeue_head");
1730 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
1732 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
1734 errno = ESHUTDOWN;
1735 return -1;
1738 // Wait for at least one item to become available.
1739 if (this->wait_not_empty_cond (timeout) == -1)
1740 return -1;
1742 first_item = this->head_;
1743 return ACE_Utils::truncate_cast<int> (this->cur_count_);
1746 template <ACE_SYNCH_DECL, class TIME_POLICY> int
1747 ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::wait_not_full_cond (ACE_Time_Value *timeout)
1749 int result = 0;
1751 // Wait while the queue is full.
1753 while (this->is_full_i ())
1755 if (this->not_full_cond_.wait (timeout) == -1)
1757 if (errno == ETIME)
1758 errno = EWOULDBLOCK;
1759 result = -1;
1760 break;
1762 if (this->state_ != ACE_Message_Queue_Base::ACTIVATED)
1764 errno = ESHUTDOWN;
1765 result = -1;
1766 break;
1769 return result;
1772 template <ACE_SYNCH_DECL, class TIME_POLICY> int
1773 ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::wait_not_empty_cond (ACE_Time_Value *timeout)
1775 int result = 0;
1777 // Wait while the queue is empty.
1779 while (this->is_empty_i ())
1781 if (this->not_empty_cond_.wait (timeout) == -1)
1783 if (errno == ETIME)
1784 errno = EWOULDBLOCK;
1785 result = -1;
1786 break;
1788 if (this->state_ != ACE_Message_Queue_Base::ACTIVATED)
1790 errno = ESHUTDOWN;
1791 result = -1;
1792 break;
1795 return result;
1798 // Block indefinitely waiting for an item to arrive, does not ignore
1799 // alerts (e.g., signals).
1801 template <ACE_SYNCH_DECL, class TIME_POLICY> int
1802 ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::enqueue_head (ACE_Message_Block *new_item,
1803 ACE_Time_Value *timeout)
1805 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::enqueue_head");
1806 int queue_count = 0;
1807 ACE_Notification_Strategy *notifier = 0;
1809 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
1811 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
1813 errno = ESHUTDOWN;
1814 return -1;
1817 if (this->wait_not_full_cond (timeout) == -1)
1818 return -1;
1820 queue_count = this->enqueue_head_i (new_item);
1821 if (queue_count == -1)
1822 return -1;
1824 #if defined (ACE_HAS_MONITOR_POINTS) && (ACE_HAS_MONITOR_POINTS == 1)
1825 this->monitor_->receive (this->cur_length_);
1826 #endif
1827 notifier = this->notification_strategy_;
1830 if (0 != notifier)
1831 notifier->notify();
1832 return queue_count;
1835 // Enqueue an <ACE_Message_Block *> into the <Message_Queue> in
1836 // accordance with its <msg_priority> (0 is lowest priority). Returns
1837 // -1 on failure, else the number of items still on the queue.
1839 template <ACE_SYNCH_DECL, class TIME_POLICY> int
1840 ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::enqueue_prio (ACE_Message_Block *new_item,
1841 ACE_Time_Value *timeout)
1843 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::enqueue_prio");
1844 int queue_count = 0;
1845 ACE_Notification_Strategy *notifier = 0;
1847 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
1849 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
1851 errno = ESHUTDOWN;
1852 return -1;
1855 if (this->wait_not_full_cond (timeout) == -1)
1856 return -1;
1858 queue_count = this->enqueue_i (new_item);
1860 if (queue_count == -1)
1861 return -1;
1863 #if defined (ACE_HAS_MONITOR_POINTS) && (ACE_HAS_MONITOR_POINTS == 1)
1864 this->monitor_->receive (this->cur_length_);
1865 #endif
1866 notifier = this->notification_strategy_;
1868 if (0 != notifier)
1869 notifier->notify ();
1870 return queue_count;
1873 // Enqueue an <ACE_Message_Block *> into the <Message_Queue> in
1874 // accordance with its <msg_deadline_time>. Returns
1875 // -1 on failure, else the number of items still on the queue.
1877 template <ACE_SYNCH_DECL, class TIME_POLICY> int
1878 ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::enqueue_deadline (ACE_Message_Block *new_item,
1879 ACE_Time_Value *timeout)
1881 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::enqueue_deadline");
1882 int queue_count = 0;
1883 ACE_Notification_Strategy *notifier = 0;
1885 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
1887 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
1889 errno = ESHUTDOWN;
1890 return -1;
1893 if (this->wait_not_full_cond (timeout) == -1)
1894 return -1;
1896 queue_count = this->enqueue_deadline_i (new_item);
1898 if (queue_count == -1)
1899 return -1;
1901 #if defined (ACE_HAS_MONITOR_POINTS) && (ACE_HAS_MONITOR_POINTS == 1)
1902 this->monitor_->receive (this->cur_length_);
1903 #endif
1904 notifier = this->notification_strategy_;
1906 if (0 != notifier)
1907 notifier->notify ();
1908 return queue_count;
1911 template <ACE_SYNCH_DECL, class TIME_POLICY> int
1912 ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::enqueue (ACE_Message_Block *new_item,
1913 ACE_Time_Value *timeout)
1915 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::enqueue");
1916 return this->enqueue_prio (new_item, timeout);
1919 // Block indefinitely waiting for an item to arrive,
1920 // does not ignore alerts (e.g., signals).
1922 template <ACE_SYNCH_DECL, class TIME_POLICY> int
1923 ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::enqueue_tail (ACE_Message_Block *new_item,
1924 ACE_Time_Value *timeout)
1926 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::enqueue_tail");
1927 int queue_count = 0;
1928 ACE_Notification_Strategy *notifier = 0;
1930 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
1932 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
1934 errno = ESHUTDOWN;
1935 return -1;
1938 if (this->wait_not_full_cond (timeout) == -1)
1939 return -1;
1941 queue_count = this->enqueue_tail_i (new_item);
1943 if (queue_count == -1)
1944 return -1;
1946 #if defined (ACE_HAS_MONITOR_POINTS) && (ACE_HAS_MONITOR_POINTS == 1)
1947 this->monitor_->receive (this->cur_length_);
1948 #endif
1949 notifier = this->notification_strategy_;
1951 if (0 != notifier)
1952 notifier->notify ();
1953 return queue_count;
1956 // Remove an item from the front of the queue. If timeout == 0 block
1957 // indefinitely (or until an alert occurs). Otherwise, block for upto
1958 // the amount of time specified by timeout.
1960 template <ACE_SYNCH_DECL, class TIME_POLICY> int
1961 ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::dequeue_head (ACE_Message_Block *&first_item,
1962 ACE_Time_Value *timeout)
1964 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::dequeue_head");
1965 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
1967 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
1969 errno = ESHUTDOWN;
1970 return -1;
1973 if (this->wait_not_empty_cond (timeout) == -1)
1974 return -1;
1976 return this->dequeue_head_i (first_item);
1979 // Remove item with the lowest priority from the queue. If timeout == 0 block
1980 // indefinitely (or until an alert occurs). Otherwise, block for upto
1981 // the amount of time specified by timeout.
1983 template <ACE_SYNCH_DECL, class TIME_POLICY> int
1984 ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::dequeue_prio (ACE_Message_Block *&dequeued,
1985 ACE_Time_Value *timeout)
1987 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::dequeue_prio");
1988 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
1990 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
1992 errno = ESHUTDOWN;
1993 return -1;
1996 if (this->wait_not_empty_cond (timeout) == -1)
1997 return -1;
1999 return this->dequeue_prio_i (dequeued);
2002 // Remove an item from the end of the queue. If timeout == 0 block
2003 // indefinitely (or until an alert occurs). Otherwise, block for upto
2004 // the amount of time specified by timeout.
2006 template <ACE_SYNCH_DECL, class TIME_POLICY> int
2007 ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::dequeue_tail (ACE_Message_Block *&dequeued,
2008 ACE_Time_Value *timeout)
2010 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::dequeue_tail");
2011 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
2013 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
2015 errno = ESHUTDOWN;
2016 return -1;
2019 if (this->wait_not_empty_cond (timeout) == -1)
2020 return -1;
2022 return this->dequeue_tail_i (dequeued);
2025 // Remove an item with the lowest deadline time. If timeout == 0 block
2026 // indefinitely (or until an alert occurs). Otherwise, block for upto
2027 // the amount of time specified by timeout.
2029 template <ACE_SYNCH_DECL, class TIME_POLICY> int
2030 ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::dequeue_deadline (ACE_Message_Block *&dequeued,
2031 ACE_Time_Value *timeout)
2033 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::dequeue_deadline");
2034 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
2036 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
2038 errno = ESHUTDOWN;
2039 return -1;
2042 if (this->wait_not_empty_cond (timeout) == -1)
2043 return -1;
2045 return this->dequeue_deadline_i (dequeued);
2048 template <ACE_SYNCH_DECL, class TIME_POLICY> int
2049 ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::notify ()
2051 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::notify");
2053 // By default, don't do anything.
2054 if (this->notification_strategy_ == 0)
2055 return 0;
2056 else
2057 return this->notification_strategy_->notify ();
2060 template <ACE_SYNCH_DECL, class TIME_POLICY>
2061 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::ACE_Dynamic_Message_Queue (ACE_Dynamic_Message_Strategy & message_strategy,
2062 size_t hwm,
2063 size_t lwm,
2064 ACE_Notification_Strategy *ns)
2065 : ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY> (hwm, lwm, ns),
2066 pending_head_ (0),
2067 pending_tail_ (0),
2068 late_head_ (0),
2069 late_tail_ (0),
2070 beyond_late_head_ (0),
2071 beyond_late_tail_ (0),
2072 message_strategy_ (message_strategy)
2074 // Note, the ACE_Dynamic_Message_Queue assumes full responsibility
2075 // for the passed ACE_Dynamic_Message_Strategy object, and deletes
2076 // it in its own dtor
2079 // dtor: free message strategy and let base class dtor do the rest.
2081 template <ACE_SYNCH_DECL, class TIME_POLICY>
2082 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::~ACE_Dynamic_Message_Queue ()
2084 delete &this->message_strategy_;
2087 template <ACE_SYNCH_DECL, class TIME_POLICY> int
2088 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::remove_messages (ACE_Message_Block *&list_head,
2089 ACE_Message_Block *&list_tail,
2090 u_int status_flags)
2092 // start with an empty list
2093 list_head = 0;
2094 list_tail = 0;
2096 // Get the current time
2097 ACE_Time_Value current_time = ACE_OS::gettimeofday ();
2099 // Refresh priority status boundaries in the queue.
2100 int result = this->refresh_queue (current_time);
2101 if (result < 0)
2102 return result;
2104 if (ACE_BIT_ENABLED (status_flags,
2105 (u_int) ACE_Dynamic_Message_Strategy::PENDING)
2106 && this->pending_head_
2107 && this->pending_tail_)
2109 // patch up pointers for the new tail of the queue
2110 if (this->pending_head_->prev ())
2112 this->tail_ = this->pending_head_->prev ();
2113 this->pending_head_->prev ()->next (0);
2115 else
2117 // the list has become empty
2118 this->head_ = 0;
2119 this->tail_ = 0;
2122 // point to the head and tail of the list
2123 list_head = this->pending_head_;
2124 list_tail = this->pending_tail_;
2126 // cut the pending messages out of the queue entirely
2127 this->pending_head_->prev (0);
2128 this->pending_head_ = 0;
2129 this->pending_tail_ = 0;
2132 if (ACE_BIT_ENABLED (status_flags,
2133 (u_int) ACE_Dynamic_Message_Strategy::LATE)
2134 && this->late_head_
2135 && this->late_tail_)
2137 // Patch up pointers for the (possibly) new head and tail of the
2138 // queue.
2139 if (this->late_tail_->next ())
2140 this->late_tail_->next ()->prev (this->late_head_->prev ());
2141 else
2142 this->tail_ = this->late_head_->prev ();
2144 if (this->late_head_->prev ())
2145 this->late_head_->prev ()->next (this->late_tail_->next ());
2146 else
2147 this->head_ = this->late_tail_->next ();
2149 // put late messages behind pending messages (if any) being returned
2150 this->late_head_->prev (list_tail);
2151 if (list_tail)
2152 list_tail->next (this->late_head_);
2153 else
2154 list_head = this->late_head_;
2156 list_tail = this->late_tail_;
2158 this->late_tail_->next (0);
2159 this->late_head_ = 0;
2160 this->late_tail_ = 0;
2163 if (ACE_BIT_ENABLED (status_flags,
2164 (u_int) ACE_Dynamic_Message_Strategy::BEYOND_LATE)
2165 && this->beyond_late_head_
2166 && this->beyond_late_tail_)
2168 // Patch up pointers for the new tail of the queue
2169 if (this->beyond_late_tail_->next ())
2171 this->head_ = this->beyond_late_tail_->next ();
2172 this->beyond_late_tail_->next ()->prev (0);
2174 else
2176 // the list has become empty
2177 this->head_ = 0;
2178 this->tail_ = 0;
2181 // Put beyond late messages at the end of the list being
2182 // returned.
2183 if (list_tail)
2185 this->beyond_late_head_->prev (list_tail);
2186 list_tail->next (this->beyond_late_head_);
2188 else
2189 list_head = this->beyond_late_head_;
2191 list_tail = this->beyond_late_tail_;
2193 this->beyond_late_tail_->next (0);
2194 this->beyond_late_head_ = 0;
2195 this->beyond_late_tail_ = 0;
2198 // Decrement message and size counts for removed messages.
2199 ACE_Message_Block *temp1;
2201 for (temp1 = list_head;
2202 temp1 != 0;
2203 temp1 = temp1->next ())
2205 --this->cur_count_;
2207 size_t mb_bytes = 0;
2208 size_t mb_length = 0;
2209 temp1->total_size_and_length (mb_bytes,
2210 mb_length);
2211 // Subtract off all of the bytes associated with this message.
2212 this->cur_bytes_ -= mb_bytes;
2213 this->cur_length_ -= mb_length;
2216 return result;
2219 // Detach all messages with status given in the passed flags from the
2220 // queue and return them by setting passed head and tail pointers to
2221 // the linked list they comprise. This method is intended primarily
2222 // as a means of periodically harvesting messages that have missed
2223 // their deadlines, but is available in its most general form. All
2224 // messages are returned in priority order, from head to tail, as of
2225 // the time this method was called.
2227 template <ACE_SYNCH_DECL, class TIME_POLICY> int
2228 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::dequeue_head (ACE_Message_Block *&first_item,
2229 ACE_Time_Value *timeout)
2231 ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::dequeue_head");
2233 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
2235 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
2237 errno = ESHUTDOWN;
2238 return -1;
2241 int result;
2243 // get the current time
2244 ACE_Time_Value current_time = ACE_OS::gettimeofday ();
2246 // refresh priority status boundaries in the queue
2247 result = this->refresh_queue (current_time);
2248 if (result < 0)
2249 return result;
2251 // *now* it's appropriate to wait for an enqueued item
2252 result = this->wait_not_empty_cond (timeout);
2253 if (result == -1)
2254 return result;
2256 // call the internal dequeue method, which selects an item from the
2257 // highest priority status portion of the queue that has messages
2258 // enqueued.
2259 result = this->dequeue_head_i (first_item);
2261 return result;
2264 // Dequeue and return the <ACE_Message_Block *> at the (logical) head
2265 // of the queue.
2267 template <ACE_SYNCH_DECL, class TIME_POLICY> void
2268 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::dump () const
2270 #if defined (ACE_HAS_DUMP)
2271 ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::dump");
2272 ACELIB_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
2274 ACELIB_DEBUG ((LM_DEBUG, ACE_TEXT ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY> (base class):\n")));
2275 this->ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::dump ();
2277 ACELIB_DEBUG ((LM_DEBUG,
2278 ACE_TEXT ("pending_head_ = %u\n")
2279 ACE_TEXT ("pending_tail_ = %u\n")
2280 ACE_TEXT ("late_head_ = %u\n")
2281 ACE_TEXT ("late_tail_ = %u\n")
2282 ACE_TEXT ("beyond_late_head_ = %u\n")
2283 ACE_TEXT ("beyond_late_tail_ = %u\n"),
2284 this->pending_head_,
2285 this->pending_tail_,
2286 this->late_head_,
2287 this->late_tail_,
2288 this->beyond_late_head_,
2289 this->beyond_late_tail_));
2291 ACELIB_DEBUG ((LM_DEBUG, ACE_TEXT ("message_strategy_ :\n")));
2292 message_strategy_.dump ();
2294 ACELIB_DEBUG ((LM_DEBUG, ACE_END_DUMP));
2295 #endif /* ACE_HAS_DUMP */
2297 // dump the state of the queue
2299 template <ACE_SYNCH_DECL, class TIME_POLICY> int
2300 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::enqueue_i (ACE_Message_Block *new_item)
2302 ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::enqueue_i");
2304 if (new_item == 0)
2306 return -1;
2309 int result = 0;
2311 // Get the current time.
2312 ACE_Time_Value current_time = ACE_OS::gettimeofday ();
2314 // Refresh priority status boundaries in the queue.
2316 result = this->refresh_queue (current_time);
2318 if (result < 0)
2320 return result;
2323 // Where we enqueue depends on the message's priority status.
2324 switch (message_strategy_.priority_status (*new_item,
2325 current_time))
2327 case ACE_Dynamic_Message_Strategy::PENDING:
2328 if (this->pending_tail_ == 0)
2330 // Check for simple case of an empty pending queue, where
2331 // all we need to do is insert <new_item> into the tail of
2332 // the queue.
2333 pending_head_ = new_item;
2334 pending_tail_ = pending_head_;
2335 return this->enqueue_tail_i (new_item);
2337 else
2339 // Enqueue the new message in priority order in the pending
2340 // sublist
2341 result = sublist_enqueue_i (new_item,
2342 current_time,
2343 this->pending_head_,
2344 this->pending_tail_,
2345 ACE_Dynamic_Message_Strategy::PENDING);
2347 break;
2349 case ACE_Dynamic_Message_Strategy::LATE:
2350 if (this->late_tail_ == 0)
2352 late_head_ = new_item;
2353 late_tail_ = late_head_;
2355 if (this->pending_head_ == 0)
2356 // Check for simple case of an empty pending queue,
2357 // where all we need to do is insert <new_item> into the
2358 // tail of the queue.
2359 return this->enqueue_tail_i (new_item);
2360 else if (this->beyond_late_tail_ == 0)
2361 // Check for simple case of an empty beyond late queue, where all
2362 // we need to do is insert <new_item> into the head of the queue.
2363 return this->enqueue_head_i (new_item);
2364 else
2366 // Otherwise, we can just splice the new message in
2367 // between the pending and beyond late portions of the
2368 // queue.
2369 this->beyond_late_tail_->next (new_item);
2370 new_item->prev (this->beyond_late_tail_);
2371 this->pending_head_->prev (new_item);
2372 new_item->next (this->pending_head_);
2375 else
2377 // Enqueue the new message in priority order in the late
2378 // sublist
2379 result = sublist_enqueue_i (new_item,
2380 current_time,
2381 this->late_head_,
2382 this->late_tail_,
2383 ACE_Dynamic_Message_Strategy::LATE);
2385 break;
2387 case ACE_Dynamic_Message_Strategy::BEYOND_LATE:
2388 if (this->beyond_late_tail_ == 0)
2390 // Check for simple case of an empty beyond late queue,
2391 // where all we need to do is insert <new_item> into the
2392 // head of the queue.
2393 beyond_late_head_ = new_item;
2394 beyond_late_tail_ = beyond_late_head_;
2395 return this->enqueue_head_i (new_item);
2397 else
2399 // all beyond late messages have the same (zero) priority,
2400 // so just put the new one at the end of the beyond late
2401 // messages
2402 if (this->beyond_late_tail_->next ())
2404 this->beyond_late_tail_->next ()->prev (new_item);
2406 else
2408 this->tail_ = new_item;
2411 new_item->next (this->beyond_late_tail_->next ());
2412 this->beyond_late_tail_->next (new_item);
2413 new_item->prev (this->beyond_late_tail_);
2414 this->beyond_late_tail_ = new_item;
2417 break;
2419 // should never get here, but just in case...
2420 default:
2421 result = -1;
2422 break;
2425 if (result < 0)
2427 return result;
2430 size_t mb_bytes = 0;
2431 size_t mb_length = 0;
2432 new_item->total_size_and_length (mb_bytes,
2433 mb_length);
2434 this->cur_bytes_ += mb_bytes;
2435 this->cur_length_ += mb_length;
2436 ++this->cur_count_;
2438 if (this->signal_dequeue_waiters () == -1)
2440 return -1;
2442 else
2444 return ACE_Utils::truncate_cast<int> (this->cur_count_);
2448 // Enqueue an <ACE_Message_Block *> in accordance with its priority.
2449 // priority may be *dynamic* or *static* or a combination or *both* It
2450 // calls the priority evaluation function passed into the Dynamic
2451 // Message Queue constructor to update the priorities of all enqueued
2452 // messages.
2454 template <ACE_SYNCH_DECL, class TIME_POLICY> int
2455 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::sublist_enqueue_i (ACE_Message_Block *new_item,
2456 const ACE_Time_Value &current_time,
2457 ACE_Message_Block *&sublist_head,
2458 ACE_Message_Block *&sublist_tail,
2459 ACE_Dynamic_Message_Strategy::Priority_Status status)
2461 int result = 0;
2462 ACE_Message_Block *current_item = 0;
2464 // Find message after which to enqueue new item, based on message
2465 // priority and priority status.
2466 for (current_item = sublist_tail;
2467 current_item;
2468 current_item = current_item->prev ())
2470 if (message_strategy_.priority_status (*current_item, current_time) == status)
2472 if (current_item->msg_priority () >= new_item->msg_priority ())
2473 break;
2475 else
2477 sublist_head = new_item;
2478 break;
2482 if (current_item == 0)
2484 // If the new message has highest priority of any, put it at the
2485 // head of the list (and sublist).
2486 new_item->prev (0);
2487 new_item->next (this->head_);
2488 if (this->head_ != 0)
2489 this->head_->prev (new_item);
2490 else
2492 this->tail_ = new_item;
2493 sublist_tail = new_item;
2495 this->head_ = new_item;
2496 sublist_head = new_item;
2498 else
2500 // insert the new item into the list
2501 new_item->next (current_item->next ());
2502 new_item->prev (current_item);
2504 if (current_item->next ())
2505 current_item->next ()->prev (new_item);
2506 else
2507 this->tail_ = new_item;
2509 current_item->next (new_item);
2511 // If the new item has lowest priority of any in the sublist,
2512 // move the tail pointer of the sublist back to the new item
2513 if (current_item == sublist_tail)
2514 sublist_tail = new_item;
2517 return result;
2520 // Enqueue a message in priority order within a given priority status
2521 // sublist.
2523 template <ACE_SYNCH_DECL, class TIME_POLICY> int
2524 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::dequeue_head_i (ACE_Message_Block *&first_item)
2526 ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::dequeue_head_i");
2528 int result = 0;
2529 int last_in_subqueue = 0;
2531 // first, try to dequeue from the head of the pending list
2532 if (this->pending_head_)
2534 first_item = this->pending_head_;
2536 if (0 == this->pending_head_->prev ())
2537 this->head_ = this->pending_head_->next ();
2538 else
2539 this->pending_head_->prev ()->next (this->pending_head_->next ());
2541 if (0 == this->pending_head_->next ())
2543 this->tail_ = this->pending_head_->prev ();
2544 this->pending_head_ = 0;
2545 this->pending_tail_ = 0;
2547 else
2549 this->pending_head_->next ()->prev (this->pending_head_->prev ());
2550 this->pending_head_ = this->pending_head_->next ();
2553 first_item->prev (0);
2554 first_item->next (0);
2557 // Second, try to dequeue from the head of the late list
2558 else if (this->late_head_)
2560 last_in_subqueue = this->late_head_ == this->late_tail_ ? 1 : 0;
2562 first_item = this->late_head_;
2564 if (0 == this->late_head_->prev ())
2565 this->head_ = this->late_head_->next ();
2566 else
2567 this->late_head_->prev ()->next (this->late_head_->next ());
2569 if (0 == this->late_head_->next ())
2570 this->tail_ = this->late_head_->prev ();
2571 else
2573 this->late_head_->next ()->prev (this->late_head_->prev ());
2574 this->late_head_ = this->late_head_->next ();
2577 if (last_in_subqueue)
2579 this->late_head_ = 0;
2580 this->late_tail_ = 0;
2583 first_item->prev (0);
2584 first_item->next (0);
2586 // finally, try to dequeue from the head of the beyond late list
2587 else if (this->beyond_late_head_)
2589 last_in_subqueue =
2590 (this->beyond_late_head_ == this->beyond_late_tail_) ? 1 : 0;
2592 first_item = this->beyond_late_head_;
2593 this->head_ = this->beyond_late_head_->next ();
2595 if (0 == this->beyond_late_head_->next ())
2597 this->tail_ = this->beyond_late_head_->prev ();
2599 else
2601 this->beyond_late_head_->next ()->prev (this->beyond_late_head_->prev ());
2602 this->beyond_late_head_ = this->beyond_late_head_->next ();
2605 if (last_in_subqueue)
2607 this->beyond_late_head_ = 0;
2608 this->beyond_late_tail_ = 0;
2611 first_item->prev (0);
2612 first_item->next (0);
2614 else
2616 // nothing to dequeue: set the pointer to zero and return an error code
2617 first_item = 0;
2618 result = -1;
2621 if (result < 0)
2623 return result;
2626 size_t mb_bytes = 0;
2627 size_t mb_length = 0;
2628 first_item->total_size_and_length (mb_bytes,
2629 mb_length);
2630 // Subtract off all of the bytes associated with this message.
2631 this->cur_bytes_ -= mb_bytes;
2632 this->cur_length_ -= mb_length;
2633 --this->cur_count_;
2635 // Only signal enqueueing threads if we've fallen below the low
2636 // water mark.
2637 if (this->cur_bytes_ <= this->low_water_mark_
2638 && this->signal_enqueue_waiters () == -1)
2640 return -1;
2642 else
2644 return ACE_Utils::truncate_cast<int> (this->cur_count_);
2648 // Dequeue and return the <ACE_Message_Block *> at the head of the
2649 // logical queue. Attempts first to dequeue from the pending portion
2650 // of the queue, or if that is empty from the late portion, or if that
2651 // is empty from the beyond late portion, or if that is empty just
2652 // sets the passed pointer to zero and returns -1.
2654 template <ACE_SYNCH_DECL, class TIME_POLICY> int
2655 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::refresh_queue (const ACE_Time_Value &current_time)
2657 int result;
2659 result = refresh_pending_queue (current_time);
2661 if (result != -1)
2662 result = refresh_late_queue (current_time);
2664 return result;
2667 // Refresh the queue using the strategy specific priority status
2668 // function.
2670 template <ACE_SYNCH_DECL, class TIME_POLICY> int
2671 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::refresh_pending_queue (const ACE_Time_Value &current_time)
2673 ACE_Dynamic_Message_Strategy::Priority_Status current_status;
2675 // refresh priority status boundaries in the queue
2676 if (this->pending_head_)
2678 current_status = message_strategy_.priority_status (*this->pending_head_,
2679 current_time);
2680 switch (current_status)
2682 case ACE_Dynamic_Message_Strategy::BEYOND_LATE:
2683 // Make sure the head of the beyond late queue is set (there
2684 // may not have been any beyond late messages previously)
2685 this->beyond_late_head_ = this->head_;
2687 // Zero out the late queue pointers, and set them only if
2688 // there turn out to be late messages in the pending sublist
2689 this->late_head_ = 0;
2690 this->late_tail_ = 0;
2692 // Advance through the beyond late messages in the pending queue
2695 this->pending_head_ = this->pending_head_->next ();
2697 if (this->pending_head_)
2698 current_status = message_strategy_.priority_status (*this->pending_head_,
2699 current_time);
2700 else
2701 break; // do while
2704 while (current_status == ACE_Dynamic_Message_Strategy::BEYOND_LATE);
2706 if (this->pending_head_)
2708 // point tail of beyond late sublist to previous item
2709 this->beyond_late_tail_ = this->pending_head_->prev ();
2711 if (current_status == ACE_Dynamic_Message_Strategy::PENDING)
2712 // there are no late messages left in the queue
2713 break; // switch
2714 else if (current_status != ACE_Dynamic_Message_Strategy::LATE)
2716 // if we got here, something is *seriously* wrong with the queue
2717 ACELIB_ERROR_RETURN ((LM_ERROR,
2718 ACE_TEXT ("Unexpected message priority status [%d] (expected LATE)"),
2719 (int) current_status),
2720 -1);
2723 else
2725 // There are no pending or late messages left in the
2726 // queue.
2727 this->beyond_late_tail_ = this->tail_;
2728 this->pending_head_ = 0;
2729 this->pending_tail_ = 0;
2730 break; // switch
2732 ACE_FALLTHROUGH;
2734 case ACE_Dynamic_Message_Strategy::LATE:
2735 // Make sure the head of the late queue is set (there may
2736 // not have been any late messages previously, or they may
2737 // have all become beyond late).
2738 if (this->late_head_ == 0)
2739 this->late_head_ = this->pending_head_;
2741 // advance through the beyond late messages in the pending queue
2744 this->pending_head_ = this->pending_head_->next ();
2746 if (this->pending_head_)
2747 current_status = message_strategy_.priority_status (*this->pending_head_,
2748 current_time);
2749 else
2750 break; // do while
2753 while (current_status == ACE_Dynamic_Message_Strategy::LATE);
2755 if (this->pending_head_)
2757 if (current_status != ACE_Dynamic_Message_Strategy::PENDING)
2758 // if we got here, something is *seriously* wrong with the queue
2759 ACELIB_ERROR_RETURN((LM_ERROR,
2760 ACE_TEXT ("Unexpected message priority status [%d] (expected PENDING)"),
2761 (int) current_status),
2762 -1);
2764 // Point tail of late sublist to previous item
2765 this->late_tail_ = this->pending_head_->prev ();
2767 else
2769 // there are no pending messages left in the queue
2770 this->late_tail_ = this->tail_;
2771 this->pending_head_ = 0;
2772 this->pending_tail_ = 0;
2775 break; // switch
2776 case ACE_Dynamic_Message_Strategy::PENDING:
2777 // do nothing - the pending queue is unchanged
2778 break; // switch
2779 default:
2780 // if we got here, something is *seriously* wrong with the queue
2781 ACELIB_ERROR_RETURN((LM_ERROR,
2782 ACE_TEXT ("Unknown message priority status [%d]"),
2783 (int) current_status),
2784 -1);
2787 return 0;
2790 // Refresh the pending queue using the strategy specific priority
2791 // status function.
2793 template <ACE_SYNCH_DECL, class TIME_POLICY> int
2794 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::refresh_late_queue (const ACE_Time_Value &current_time)
2796 ACE_Dynamic_Message_Strategy::Priority_Status current_status;
2798 if (this->late_head_)
2800 current_status = message_strategy_.priority_status (*this->late_head_,
2801 current_time);
2802 switch (current_status)
2804 case ACE_Dynamic_Message_Strategy::BEYOND_LATE:
2806 // make sure the head of the beyond late queue is set
2807 // (there may not have been any beyond late messages previously)
2808 this->beyond_late_head_ = this->head_;
2810 // advance through the beyond late messages in the late queue
2813 this->late_head_ = this->late_head_->next ();
2815 if (this->late_head_)
2816 current_status = message_strategy_.priority_status (*this->late_head_,
2817 current_time);
2818 else
2819 break; // do while
2822 while (current_status == ACE_Dynamic_Message_Strategy::BEYOND_LATE);
2824 if (this->late_head_)
2826 // point tail of beyond late sublist to previous item
2827 this->beyond_late_tail_ = this->late_head_->prev ();
2829 if (current_status == ACE_Dynamic_Message_Strategy::PENDING)
2831 // there are no late messages left in the queue
2832 this->late_head_ = 0;
2833 this->late_tail_ = 0;
2835 else if (current_status != ACE_Dynamic_Message_Strategy::LATE)
2836 // if we got here, something is *seriously* wrong with the queue
2837 ACELIB_ERROR_RETURN ((LM_ERROR,
2838 ACE_TEXT ("Unexpected message priority status [%d] (expected LATE)"),
2839 (int) current_status),
2840 -1);
2842 else
2844 // there are no late messages left in the queue
2845 this->beyond_late_tail_ = this->tail_;
2846 this->late_head_ = 0;
2847 this->late_tail_ = 0;
2850 break; // switch
2852 case ACE_Dynamic_Message_Strategy::LATE:
2853 // do nothing - the late queue is unchanged
2854 break; // switch
2856 case ACE_Dynamic_Message_Strategy::PENDING:
2857 // if we got here, something is *seriously* wrong with the queue
2858 ACELIB_ERROR_RETURN ((LM_ERROR,
2859 ACE_TEXT ("Unexpected message priority status ")
2860 ACE_TEXT ("[%d] (expected LATE or BEYOND_LATE)"),
2861 (int) current_status),
2862 -1);
2863 default:
2864 // if we got here, something is *seriously* wrong with the queue
2865 ACELIB_ERROR_RETURN ((LM_ERROR,
2866 ACE_TEXT ("Unknown message priority status [%d]"),
2867 (int) current_status),
2868 -1);
2872 return 0;
2875 // Refresh the late queue using the strategy specific priority status
2876 // function.
2878 template <ACE_SYNCH_DECL, class TIME_POLICY> int
2879 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::peek_dequeue_head (ACE_Message_Block *&first_item,
2880 ACE_Time_Value *timeout)
2882 return ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::peek_dequeue_head (first_item,
2883 timeout);
2886 // Private method to hide public base class method: just calls base
2887 // class method.
2889 template <ACE_SYNCH_DECL, class TIME_POLICY> int
2890 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::enqueue_tail (ACE_Message_Block *new_item,
2891 ACE_Time_Value *timeout)
2893 ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::enqueue_tail");
2894 return this->enqueue_prio (new_item, timeout);
2897 // Just call priority enqueue method: tail enqueue semantics for
2898 // dynamic message queues are unstable: the message may or may not be
2899 // where it was placed after the queue is refreshed prior to the next
2900 // enqueue or dequeue operation.
2902 template <ACE_SYNCH_DECL, class TIME_POLICY> int
2903 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::enqueue_head (ACE_Message_Block *new_item,
2904 ACE_Time_Value *timeout)
2906 ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::enqueue_head");
2907 return this->enqueue_prio (new_item, timeout);
2910 // Just call priority enqueue method: head enqueue semantics for
2911 // dynamic message queues are unstable: the message may or may not be
2912 // where it was placed after the queue is refreshed prior to the next
2913 // enqueue or dequeue operation.
2915 template <ACE_SYNCH_DECL, class TIME_POLICY>
2916 ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY> *
2917 ACE_Message_Queue_Factory<ACE_SYNCH_USE, TIME_POLICY>::create_static_message_queue (size_t hwm,
2918 size_t lwm,
2919 ACE_Notification_Strategy *ns)
2921 typedef ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY> QUEUE_TYPE;
2922 QUEUE_TYPE *tmp = 0;
2924 ACE_NEW_RETURN (tmp,
2925 QUEUE_TYPE (hwm, lwm, ns),
2927 return tmp;
2930 // Factory method for a statically prioritized ACE_Message_Queue.
2932 template <ACE_SYNCH_DECL, class TIME_POLICY>
2933 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE, TIME_POLICY> *
2934 ACE_Message_Queue_Factory<ACE_SYNCH_USE, TIME_POLICY>::create_deadline_message_queue (size_t hwm,
2935 size_t lwm,
2936 ACE_Notification_Strategy *ns,
2937 u_long static_bit_field_mask,
2938 u_long static_bit_field_shift,
2939 u_long dynamic_priority_max,
2940 u_long dynamic_priority_offset)
2942 ACE_Deadline_Message_Strategy *adms = 0;
2944 ACE_NEW_RETURN (adms,
2945 ACE_Deadline_Message_Strategy (static_bit_field_mask,
2946 static_bit_field_shift,
2947 dynamic_priority_max,
2948 dynamic_priority_offset),
2951 typedef ACE_Dynamic_Message_Queue<ACE_SYNCH_USE, TIME_POLICY> QUEUE_TYPE;
2952 QUEUE_TYPE *tmp = 0;
2953 ACE_NEW_RETURN (tmp,
2954 QUEUE_TYPE (*adms, hwm, lwm, ns),
2956 return tmp;
2959 // Factory method for a dynamically prioritized (by time to deadline)
2960 // ACE_Dynamic_Message_Queue.
2962 template <ACE_SYNCH_DECL, class TIME_POLICY>
2963 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE, TIME_POLICY> *
2964 ACE_Message_Queue_Factory<ACE_SYNCH_USE, TIME_POLICY>::create_laxity_message_queue (size_t hwm,
2965 size_t lwm,
2966 ACE_Notification_Strategy *ns,
2967 u_long static_bit_field_mask,
2968 u_long static_bit_field_shift,
2969 u_long dynamic_priority_max,
2970 u_long dynamic_priority_offset)
2972 ACE_Laxity_Message_Strategy *alms = 0;
2974 ACE_NEW_RETURN (alms,
2975 ACE_Laxity_Message_Strategy (static_bit_field_mask,
2976 static_bit_field_shift,
2977 dynamic_priority_max,
2978 dynamic_priority_offset),
2981 typedef ACE_Dynamic_Message_Queue<ACE_SYNCH_USE, TIME_POLICY> QUEUE_TYPE;
2982 QUEUE_TYPE *tmp = 0;
2983 ACE_NEW_RETURN (tmp,
2984 QUEUE_TYPE (*alms, hwm, lwm, ns),
2986 return tmp;
2989 // Factory method for a dynamically prioritized (by laxity)
2990 // <ACE_Dynamic_Message_Queue>.
2992 #if defined (ACE_VXWORKS)
2993 // factory method for a wrapped VxWorks message queue
2995 template <ACE_SYNCH_DECL, class TIME_POLICY>
2996 ACE_Message_Queue_Vx *
2997 ACE_Message_Queue_Factory<ACE_SYNCH_USE, TIME_POLICY>::create_Vx_message_queue (size_t max_messages,
2998 size_t max_message_length,
2999 ACE_Notification_Strategy *ns)
3001 ACE_Message_Queue_Vx *tmp = 0;
3003 ACE_NEW_RETURN (tmp,
3004 ACE_Message_Queue_Vx (max_messages, max_message_length, ns),
3006 return tmp;
3008 #endif /* defined (ACE_VXWORKS) */
3010 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO)
3012 template <ACE_SYNCH_DECL, class TIME_POLICY>
3013 ACE_Message_Queue_NT *
3014 ACE_Message_Queue_Factory<ACE_SYNCH_USE, TIME_POLICY>::create_NT_message_queue (size_t max_threads)
3016 ACE_Message_Queue_NT *tmp = 0;
3018 ACE_NEW_RETURN (tmp,
3019 ACE_Message_Queue_NT (max_threads),
3021 return tmp;
3024 #endif /* ACE_HAS_WIN32_OVERLAPPED_IO */
3026 ACE_END_VERSIONED_NAMESPACE_DECL
3028 #endif /* !ACE_MESSAGE_QUEUE_T_CPP */