1 #ifndef ACE_MESSAGE_QUEUE_T_CPP
2 #define ACE_MESSAGE_QUEUE_T_CPP
4 // #include Message_Queue.h instead of Message_Queue_T.h to avoid
5 // circular include problems.
6 #include "ace/Message_Queue.h"
7 #include "ace/Message_Queue_Vx.h"
8 #include "ace/Log_Category.h"
9 #include "ace/OS_NS_sys_time.h"
11 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO)
12 #include "ace/Message_Queue_NT.h"
13 #endif /* ACE_HAS_WIN32_OVERLAPPED_IO */
15 #if !defined (ACE_LACKS_PRAGMA_ONCE)
17 #endif /* ACE_LACKS_PRAGMA_ONCE */
19 #include "ace/Notification_Strategy.h"
20 #include "ace/Truncate.h"
21 #include "ace/Condition_Attributes.h"
23 #if defined (ACE_HAS_MONITOR_POINTS) && (ACE_HAS_MONITOR_POINTS == 1)
24 #include "ace/OS_NS_stdio.h"
25 #include "ace/OS_NS_unistd.h"
26 #include "ace/Monitor_Size.h"
27 #endif /* ACE_HAS_MONITOR_POINTS==1 */
29 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
// Allocation-hook definition macros, invoked once per queue class template.
// NOTE(review): the macros are defined elsewhere; their expansion is not
// visible from this file.
31 ACE_ALLOC_HOOK_DEFINE_Tyc(ACE_Message_Queue
)
32 ACE_ALLOC_HOOK_DEFINE_Tyc(ACE_Dynamic_Message_Queue
)
33 ACE_ALLOC_HOOK_DEFINE_Tcyc(ACE_Message_Queue_Ex
)
34 ACE_ALLOC_HOOK_DEFINE_Tcyc(ACE_Message_Queue_Ex_N
)
// Debug dump for ACE_Message_Queue_Ex.  Everything between the guards is
// compiled only when ACE_HAS_DUMP is defined.
// NOTE(review): only the trace statement is visible inside the guard here;
// confirm whether the complete file also dumps the wrapped queue_.
36 template <class ACE_MESSAGE_TYPE
, ACE_SYNCH_DECL
, class TIME_POLICY
> void
37 ACE_Message_Queue_Ex
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
>::dump () const
39 #if defined (ACE_HAS_DUMP)
40 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::dump");
43 #endif /* ACE_HAS_DUMP */
46 template <class ACE_MESSAGE_TYPE
, ACE_SYNCH_DECL
, class TIME_POLICY
> void
47 ACE_Message_Queue_Ex
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
>::message_bytes (size_t new_value
)
49 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::message_bytes");
51 this->queue_
.message_bytes (new_value
);
54 template <class ACE_MESSAGE_TYPE
, ACE_SYNCH_DECL
, class TIME_POLICY
> void
55 ACE_Message_Queue_Ex
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
>::message_length (size_t new_value
)
57 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::message_length");
59 this->queue_
.message_length (new_value
);
62 template <class ACE_MESSAGE_TYPE
, ACE_SYNCH_DECL
, class TIME_POLICY
>
63 ACE_Message_Queue_Ex
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
>::ACE_Message_Queue_Ex (size_t high_water_mark
,
64 size_t low_water_mark
,
65 ACE_Notification_Strategy
*ns
)
67 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::ACE_Message_Queue_Ex");
69 if (this->queue_
.open (high_water_mark
, low_water_mark
, ns
) == -1)
70 ACELIB_ERROR ((LM_ERROR
,
71 ACE_TEXT ("ACE_Message_Queue_Ex")));
74 template <class ACE_MESSAGE_TYPE
, ACE_SYNCH_DECL
, class TIME_POLICY
>
75 ACE_Message_Queue_Ex
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
>::~ACE_Message_Queue_Ex ()
77 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::~ACE_Message_Queue_Ex");
// Opens the wrapped queue_ with the given high/low water marks and
// notification strategy, returning whatever queue_.open () returns.
// NOTE(review): `lwm` is passed to queue_.open () below, but no `lwm`
// parameter is visible in this signature -- confirm against the header.
80 template <class ACE_MESSAGE_TYPE
, ACE_SYNCH_DECL
, class TIME_POLICY
> int
81 ACE_Message_Queue_Ex
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
>::open (size_t hwm
,
83 ACE_Notification_Strategy
*ns
)
85 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::open");
87 return this->queue_
.open (hwm
, lwm
, ns
);
90 // Clean up the queue if we have not already done so!
92 template <class ACE_MESSAGE_TYPE
, ACE_SYNCH_DECL
, class TIME_POLICY
> int
93 ACE_Message_Queue_Ex
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
>::close ()
95 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::close");
97 return this->queue_
.close ();
100 template <class ACE_MESSAGE_TYPE
, ACE_SYNCH_DECL
, class TIME_POLICY
> int
101 ACE_Message_Queue_Ex
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
>::flush ()
103 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::flush");
105 return this->queue_
.flush ();
108 template <class ACE_MESSAGE_TYPE
, ACE_SYNCH_DECL
, class TIME_POLICY
> int
109 ACE_Message_Queue_Ex
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
>::flush_i ()
111 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::flush_i");
113 return this->queue_
.flush_i ();
// Peeks at the head ACE_Message_Block of the wrapped queue_ (passing
// @a timeout through) and stores its base () pointer, reinterpreted as
// ACE_MESSAGE_TYPE *, in @a first_item.
// NOTE(review): neither a check of cur_count before mb->base () nor a
// return statement is visible here -- confirm against the complete file.
116 // Take a look at the first item without removing it.
118 template <class ACE_MESSAGE_TYPE
, ACE_SYNCH_DECL
, class TIME_POLICY
> int
119 ACE_Message_Queue_Ex
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
>::peek_dequeue_head (ACE_MESSAGE_TYPE
*&first_item
,
120 ACE_Time_Value
*timeout
)
122 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::peek_dequeue_head");
124 ACE_Message_Block
*mb
= 0;
126 int const cur_count
= this->queue_
.peek_dequeue_head (mb
, timeout
);
129 first_item
= reinterpret_cast<ACE_MESSAGE_TYPE
*> (mb
->base ());
// Wraps @a new_item in an ACE_Message_Block built with DEFAULT_PRIORITY and
// enqueues that block at the head of the wrapped queue_.
// NOTE(review): the statement that assigns the new block to `mb`, the
// remaining constructor arguments, and the handling/return of `result`
// are not visible here -- confirm against the complete file.
134 template <class ACE_MESSAGE_TYPE
, ACE_SYNCH_DECL
, class TIME_POLICY
> int
135 ACE_Message_Queue_Ex
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
>::enqueue_head (ACE_MESSAGE_TYPE
*new_item
,
136 ACE_Time_Value
*timeout
)
138 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::enqueue_head");
140 ACE_Message_Block
*mb
= 0;
143 ACE_Message_Block ((char *) new_item
,
145 ACE_Message_Queue_Ex
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
>::DEFAULT_PRIORITY
),
148 int const result
= this->queue_
.enqueue_head (mb
, timeout
);
155 // Enqueue an <ACE_MESSAGE_TYPE *> into the <Message_Queue> in
156 // accordance with its <msg_priority> (0 is lowest priority). Returns
157 // -1 on failure, else the number of items still on the queue.
159 template <class ACE_MESSAGE_TYPE
, ACE_SYNCH_DECL
, class TIME_POLICY
> int
160 ACE_Message_Queue_Ex
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
>::enqueue (ACE_MESSAGE_TYPE
*new_item
,
161 ACE_Time_Value
*timeout
)
163 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::enqueue");
165 return this->enqueue_prio (new_item
, timeout
);
// Wraps @a new_item in an ACE_Message_Block and hands it to
// queue_.enqueue_prio () together with @a timeout.
// NOTE(review): the rest of the ACE_Message_Block construction (including
// where @a priority is applied) and the handling/return of `result` are
// not visible here -- confirm against the complete file.
168 template <class ACE_MESSAGE_TYPE
, ACE_SYNCH_DECL
, class TIME_POLICY
> int
169 ACE_Message_Queue_Ex
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
>::enqueue_prio (ACE_MESSAGE_TYPE
*new_item
,
170 ACE_Time_Value
*timeout
,
171 unsigned long priority
)
173 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::enqueue_prio");
175 ACE_Message_Block
*mb
= 0;
178 ACE_Message_Block ((char *) new_item
,
183 int const result
= this->queue_
.enqueue_prio (mb
, timeout
);
// Wraps @a new_item in an ACE_Message_Block built with DEFAULT_PRIORITY and
// hands it to queue_.enqueue_deadline () together with @a timeout.
// NOTE(review): the statement that assigns the new block to `mb` and the
// handling/return of `result` are not visible here -- confirm against the
// complete file.
191 template <class ACE_MESSAGE_TYPE
, ACE_SYNCH_DECL
, class TIME_POLICY
> int
192 ACE_Message_Queue_Ex
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
>::enqueue_deadline (ACE_MESSAGE_TYPE
*new_item
,
193 ACE_Time_Value
*timeout
)
195 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::enqueue_deadline");
197 ACE_Message_Block
*mb
= 0;
200 ACE_Message_Block ((char *) new_item
,
202 ACE_Message_Queue_Ex
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
>::DEFAULT_PRIORITY
),
205 int const result
= this->queue_
.enqueue_deadline (mb
, timeout
);
// NOTE(review): the two-line comment below was inherited as-is; it talks
// about waiting for an item to arrive, while the code beneath it enqueues
// at the tail -- confirm the intended wording.
213 // Block indefinitely waiting for an item to arrive;
214 // does not ignore alerts (e.g., signals).
// Wraps @a new_item in an ACE_Message_Block built with DEFAULT_PRIORITY and
// hands it to queue_.enqueue_tail () together with @a timeout.
// NOTE(review): the statement that assigns the new block to `mb` and the
// handling/return of `result` are not visible here.
216 template <class ACE_MESSAGE_TYPE
, ACE_SYNCH_DECL
, class TIME_POLICY
> int
217 ACE_Message_Queue_Ex
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
>::enqueue_tail (ACE_MESSAGE_TYPE
*new_item
,
218 ACE_Time_Value
*timeout
)
220 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::enqueue_tail");
222 ACE_Message_Block
*mb
= 0;
225 ACE_Message_Block ((char *) new_item
,
227 ACE_Message_Queue_Ex
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
>::DEFAULT_PRIORITY
),
230 int const result
= this->queue_
.enqueue_tail (mb
, timeout
);
237 // Remove an item from the front of the queue.  If timeout == 0 block
238 // indefinitely (or until an alert occurs).  Otherwise, block for up to
239 // the amount of time specified by timeout.
// Implementation: dequeues an ACE_Message_Block from queue_ and stores its
// base () pointer, reinterpreted as ACE_MESSAGE_TYPE *, in @a first_item.
// NOTE(review): the cur_count check, the release of `mb`, and the return
// statement are not visible here -- confirm against the complete file.
241 template <class ACE_MESSAGE_TYPE
, ACE_SYNCH_DECL
, class TIME_POLICY
> int
242 ACE_Message_Queue_Ex
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
>::dequeue_head (ACE_MESSAGE_TYPE
*&first_item
,
243 ACE_Time_Value
*timeout
)
245 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::dequeue_head");
247 ACE_Message_Block
*mb
= 0;
249 int const cur_count
= this->queue_
.dequeue_head (mb
, timeout
);
251 // Dequeue the message.
254 first_item
= reinterpret_cast<ACE_MESSAGE_TYPE
*> (mb
->base ());
255 // Delete the message block.
262 // Remove the item with the lowest priority from the queue.  If timeout == 0
263 // block indefinitely (or until an alert occurs).  Otherwise, block for up to
264 // the amount of time specified by timeout.
// Implementation: calls queue_.dequeue_prio () and stores the block's
// base () pointer, reinterpreted as ACE_MESSAGE_TYPE *, in @a dequeued.
// NOTE(review): the cur_count check, the release of `mb`, and the return
// statement are not visible here -- confirm against the complete file.
266 template <class ACE_MESSAGE_TYPE
, ACE_SYNCH_DECL
, class TIME_POLICY
> int
267 ACE_Message_Queue_Ex
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
>::dequeue_prio (ACE_MESSAGE_TYPE
*&dequeued
,
268 ACE_Time_Value
*timeout
)
270 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::dequeue_prio");
272 ACE_Message_Block
*mb
= 0;
274 int const cur_count
= this->queue_
.dequeue_prio (mb
, timeout
);
276 // Dequeue the message.
279 dequeued
= reinterpret_cast<ACE_MESSAGE_TYPE
*> (mb
->base ());
280 // Delete the message block.
287 // Remove an item from the end of the queue.  If timeout == 0 block
288 // indefinitely (or until an alert occurs).  Otherwise, block for up to
289 // the amount of time specified by timeout.
// Implementation: calls queue_.dequeue_tail () and stores the block's
// base () pointer, reinterpreted as ACE_MESSAGE_TYPE *, in @a dequeued.
// NOTE(review): the cur_count check, the release of `mb`, and the return
// statement are not visible here -- confirm against the complete file.
291 template <class ACE_MESSAGE_TYPE
, ACE_SYNCH_DECL
, class TIME_POLICY
> int
292 ACE_Message_Queue_Ex
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
>::dequeue_tail (ACE_MESSAGE_TYPE
*&dequeued
,
293 ACE_Time_Value
*timeout
)
295 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::dequeue_tail");
297 ACE_Message_Block
*mb
= 0;
299 int const cur_count
= this->queue_
.dequeue_tail (mb
, timeout
);
301 // Dequeue the message.
304 dequeued
= reinterpret_cast<ACE_MESSAGE_TYPE
*> (mb
->base ());
305 // Delete the message block.
312 // Remove an item with the lowest deadline time.  If timeout == 0 block
313 // indefinitely (or until an alert occurs).  Otherwise, block for up to
314 // the amount of time specified by timeout.
// Implementation: calls queue_.dequeue_deadline () and stores the block's
// base () pointer, reinterpreted as ACE_MESSAGE_TYPE *, in @a dequeued.
// NOTE(review): the cur_count check, the release of `mb`, and the return
// statement are not visible here -- confirm against the complete file.
316 template <class ACE_MESSAGE_TYPE
, ACE_SYNCH_DECL
, class TIME_POLICY
> int
317 ACE_Message_Queue_Ex
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
>::dequeue_deadline (ACE_MESSAGE_TYPE
*&dequeued
,
318 ACE_Time_Value
*timeout
)
320 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::dequeue_deadline");
322 ACE_Message_Block
*mb
= 0;
324 int const cur_count
= this->queue_
.dequeue_deadline (mb
, timeout
);
326 // Dequeue the message.
329 dequeued
= reinterpret_cast<ACE_MESSAGE_TYPE
*> (mb
->base ());
330 // Delete the message block.
337 template <class ACE_MESSAGE_TYPE
, ACE_SYNCH_DECL
, class TIME_POLICY
> int
338 ACE_Message_Queue_Ex
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
>::notify ()
340 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::notify");
342 return this->queue_
.notify ();
345 template <class ACE_MESSAGE_TYPE
, ACE_SYNCH_DECL
, class TIME_POLICY
>
346 ACE_Message_Queue_Ex_Iterator
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
>::
347 ACE_Message_Queue_Ex_Iterator (ACE_Message_Queue_Ex
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
> & queue
)
348 : iter_ (queue
.queue_
)
// Asks the inner iterator iter_ for the current ACE_Message_Block and
// stores its base () pointer, reinterpreted as ACE_MESSAGE_TYPE *, in
// @a entry.
// NOTE(review): no test of `retval` before mb->base () and no return
// statement are visible here -- confirm against the complete file.
352 template <class ACE_MESSAGE_TYPE
, ACE_SYNCH_DECL
, class TIME_POLICY
> int
353 ACE_Message_Queue_Ex_Iterator
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
>::
354 next (ACE_MESSAGE_TYPE
*&entry
)
356 ACE_Message_Block
* mb
= 0;
357 int retval
= this->iter_
.next (mb
);
360 entry
= reinterpret_cast<ACE_MESSAGE_TYPE
*> (mb
->base ());
365 template <class ACE_MESSAGE_TYPE
, ACE_SYNCH_DECL
, class TIME_POLICY
> int
366 ACE_Message_Queue_Ex_Iterator
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
>::done () const
368 return this->iter_
.done ();
371 template <class ACE_MESSAGE_TYPE
, ACE_SYNCH_DECL
, class TIME_POLICY
> int
372 ACE_Message_Queue_Ex_Iterator
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
>::advance ()
374 return this->iter_
.advance ();
// Debug dump for ACE_Message_Queue_Ex_Iterator.
// NOTE(review): no body statements are visible here -- confirm what, if
// anything, the complete file does inside this method.
377 template <class ACE_MESSAGE_TYPE
, ACE_SYNCH_DECL
, class TIME_POLICY
> void
378 ACE_Message_Queue_Ex_Iterator
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
>::dump () const
// Allocation-hook definition macro for the iterator template (macro body is
// defined elsewhere).
383 ACE_ALLOC_HOOK_DEFINE_Tcyc(ACE_Message_Queue_Ex_Iterator
)
385 template <class ACE_MESSAGE_TYPE
, ACE_SYNCH_DECL
, class TIME_POLICY
>
386 ACE_Message_Queue_Ex_Reverse_Iterator
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
>::
387 ACE_Message_Queue_Ex_Reverse_Iterator (ACE_Message_Queue_Ex
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
> & queue
)
388 : iter_ (queue
.queue_
)
// Asks the inner iterator iter_ for the current ACE_Message_Block and
// stores its base () pointer, reinterpreted as ACE_MESSAGE_TYPE *, in
// @a entry.
// NOTE(review): no test of `retval` before mb->base () and no return
// statement are visible here -- confirm against the complete file.
392 template <class ACE_MESSAGE_TYPE
, ACE_SYNCH_DECL
, class TIME_POLICY
> int
393 ACE_Message_Queue_Ex_Reverse_Iterator
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
>::
394 next (ACE_MESSAGE_TYPE
*&entry
)
396 ACE_Message_Block
* mb
= 0;
397 int retval
= this->iter_
.next (mb
);
400 entry
= reinterpret_cast<ACE_MESSAGE_TYPE
*> (mb
->base ());
405 template <class ACE_MESSAGE_TYPE
, ACE_SYNCH_DECL
, class TIME_POLICY
> int
406 ACE_Message_Queue_Ex_Reverse_Iterator
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
>::done () const
408 return this->iter_
.done ();
411 template <class ACE_MESSAGE_TYPE
, ACE_SYNCH_DECL
, class TIME_POLICY
> int
412 ACE_Message_Queue_Ex_Reverse_Iterator
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
>::advance ()
414 return this->iter_
.advance ();
// Debug dump for ACE_Message_Queue_Ex_Reverse_Iterator.
// NOTE(review): no body statements are visible here -- confirm what, if
// anything, the complete file does inside this method.
417 template <class ACE_MESSAGE_TYPE
, ACE_SYNCH_DECL
, class TIME_POLICY
> void
418 ACE_Message_Queue_Ex_Reverse_Iterator
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
>::dump () const
// Allocation-hook definition macro for the reverse-iterator template (macro
// body is defined elsewhere).
423 ACE_ALLOC_HOOK_DEFINE_Tcyc(ACE_Message_Queue_Ex_Reverse_Iterator
)
// Constructor: delegates to the ACE_Message_Queue_Ex base-class constructor
// and emits a trace record.
// NOTE(review): only the first base-constructor argument (high_water_mark)
// is visible here; confirm that low_water_mark and ns are also forwarded.
425 template <class ACE_MESSAGE_TYPE
, ACE_SYNCH_DECL
, class TIME_POLICY
>
426 ACE_Message_Queue_Ex_N
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
>::ACE_Message_Queue_Ex_N
427 (size_t high_water_mark
,
428 size_t low_water_mark
,
429 ACE_Notification_Strategy
*ns
):
430 ACE_Message_Queue_Ex
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
> (high_water_mark
,
434 ACE_TRACE ("ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::ACE_Message_Queue_Ex_N");
437 template <class ACE_MESSAGE_TYPE
, ACE_SYNCH_DECL
, class TIME_POLICY
>
438 ACE_Message_Queue_Ex_N
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
>::~ACE_Message_Queue_Ex_N ()
440 ACE_TRACE ("ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::~ACE_Message_Queue_Ex_N");
// Wraps @a new_item (and whatever wrap_with_mbs_i () chains to it) in
// ACE_Message_Blocks, then enqueues the resulting chain at the head of
// queue_.
// NOTE(review): no check of `mb` for a failed wrap and no handling/return
// of `result` are visible here -- confirm against the complete file.
443 template <class ACE_MESSAGE_TYPE
, ACE_SYNCH_DECL
, class TIME_POLICY
> int
444 ACE_Message_Queue_Ex_N
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
>::enqueue_head
445 (ACE_MESSAGE_TYPE
*new_item
,
446 ACE_Time_Value
*timeout
)
448 ACE_TRACE ("ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::enqueue_head");
450 // Create a chain of ACE_Message_Block wrappers around the 'chained'
451 // ACE_MESSAGE_TYPE objects.
452 ACE_Message_Block
*mb
= this->wrap_with_mbs_i (new_item
);
458 int result
= this->queue_
.enqueue_head (mb
, timeout
);
// Wraps @a new_item (and whatever wrap_with_mbs_i () chains to it) in
// ACE_Message_Blocks, then enqueues the resulting chain at the tail of
// queue_.
// NOTE(review): no check of `mb` for a failed wrap and no handling/return
// of `result` are visible here -- confirm against the complete file.
467 template <class ACE_MESSAGE_TYPE
, ACE_SYNCH_DECL
, class TIME_POLICY
> int
468 ACE_Message_Queue_Ex_N
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
>::enqueue_tail
469 (ACE_MESSAGE_TYPE
*new_item
,
470 ACE_Time_Value
*timeout
)
472 ACE_TRACE ("ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::enqueue_tail");
474 // Create a chain of ACE_Message_Block wrappers around the 'chained'
475 // ACE_MESSAGE_TYPE objects.
476 ACE_Message_Block
*mb
= this->wrap_with_mbs_i (new_item
);
482 int result
= this->queue_
.enqueue_tail (mb
, timeout
);
// Builds a chain of ACE_Message_Blocks mirroring the chain of
// ACE_MESSAGE_TYPE objects that starts at @a new_item: one block for
// new_item itself (mb_head), then one per object reached by following
// ->next (), each linked to its predecessor with mb_tail->next ().
// Every block is constructed with DEFAULT_PRIORITY.
// NOTE(review): the remaining ACE_NEW_RETURN arguments, the handling of a
// failed mb_temp allocation, the update of mb_tail inside the loop, and
// the return statement are not visible here -- confirm against the
// complete file.
491 template <class ACE_MESSAGE_TYPE
, ACE_SYNCH_DECL
, class TIME_POLICY
> ACE_Message_Block
*
492 ACE_Message_Queue_Ex_N
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
>::wrap_with_mbs_i
493 (ACE_MESSAGE_TYPE
*new_item
)
495 ACE_TRACE ("ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::wrap_with_mbs_i");
497 // We need to keep a reference to the head of the chain
498 ACE_Message_Block
*mb_head
= 0;
500 ACE_NEW_RETURN (mb_head
,
501 ACE_Message_Block ((char *) new_item
,
503 ACE_Message_Queue_Ex
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
>::DEFAULT_PRIORITY
),
506 // mb_tail will point to the last ACE_Message_Block
507 ACE_Message_Block
*mb_tail
= mb_head
;
509 // Run through rest of the messages and wrap them
510 for (ACE_MESSAGE_TYPE
*pobj
= new_item
->next (); pobj
; pobj
= pobj
->next ())
512 ACE_Message_Block
*mb_temp
= 0;
513 ACE_NEW_NORETURN (mb_temp
,
514 ACE_Message_Block ((char *) pobj
,
516 ACE_Message_Queue_Ex
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
>::DEFAULT_PRIORITY
));
524 mb_tail
->next (mb_temp
);
// Allocation-hook definition macro for ACE_Message_Queue_Reverse_Iterator
// (macro body is defined elsewhere).
531 ACE_ALLOC_HOOK_DEFINE_Tyc(ACE_Message_Queue_Reverse_Iterator
)
533 template <class ACE_MESSAGE_TYPE
, ACE_SYNCH_DECL
, class TIME_POLICY
> int
534 ACE_Message_Queue_Ex
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
>::dequeue (ACE_MESSAGE_TYPE
*&first_item
,
535 ACE_Time_Value
*timeout
)
537 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::dequeue");
539 return this->dequeue_head (first_item
, timeout
);
542 template <class ACE_MESSAGE_TYPE
, ACE_SYNCH_DECL
, class TIME_POLICY
> ACE_Notification_Strategy
*
543 ACE_Message_Queue_Ex
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
>::notification_strategy ()
545 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::notification_strategy");
547 return this->queue_
.notification_strategy ();
550 template <class ACE_MESSAGE_TYPE
, ACE_SYNCH_DECL
, class TIME_POLICY
> void
551 ACE_Message_Queue_Ex
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
>::notification_strategy (ACE_Notification_Strategy
*s
)
553 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::notification_strategy");
555 this->queue_
.notification_strategy (s
);
558 // Check if queue is empty (holds locks).
560 template <class ACE_MESSAGE_TYPE
, ACE_SYNCH_DECL
, class TIME_POLICY
> bool
561 ACE_Message_Queue_Ex
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
>::is_empty ()
563 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::is_empty");
565 return this->queue_
.is_empty ();
568 // Check if queue is full (holds locks).
570 template <class ACE_MESSAGE_TYPE
, ACE_SYNCH_DECL
, class TIME_POLICY
> bool
571 ACE_Message_Queue_Ex
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
>::is_full ()
573 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::is_full");
575 return this->queue_
.is_full ();
578 template <class ACE_MESSAGE_TYPE
, ACE_SYNCH_DECL
, class TIME_POLICY
> size_t
579 ACE_Message_Queue_Ex
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
>::high_water_mark ()
581 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::high_water_mark");
583 return this->queue_
.high_water_mark ();
586 template <class ACE_MESSAGE_TYPE
, ACE_SYNCH_DECL
, class TIME_POLICY
> void
587 ACE_Message_Queue_Ex
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
>::high_water_mark (size_t hwm
)
589 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::high_water_mark");
591 this->queue_
.high_water_mark (hwm
);
594 template <class ACE_MESSAGE_TYPE
, ACE_SYNCH_DECL
, class TIME_POLICY
> size_t
595 ACE_Message_Queue_Ex
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
>::low_water_mark ()
597 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::low_water_mark");
599 return this->queue_
.low_water_mark ();
602 template <class ACE_MESSAGE_TYPE
, ACE_SYNCH_DECL
, class TIME_POLICY
> void
603 ACE_Message_Queue_Ex
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
>::low_water_mark (size_t lwm
)
605 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::low_water_mark");
607 this->queue_
.low_water_mark (lwm
);
610 template <class ACE_MESSAGE_TYPE
, ACE_SYNCH_DECL
, class TIME_POLICY
> size_t
611 ACE_Message_Queue_Ex
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
>::message_bytes ()
613 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::message_bytes");
615 return this->queue_
.message_bytes ();
618 template <class ACE_MESSAGE_TYPE
, ACE_SYNCH_DECL
, class TIME_POLICY
> size_t
619 ACE_Message_Queue_Ex
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
>::message_length ()
621 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::message_length");
623 return this->queue_
.message_length ();
626 template <class ACE_MESSAGE_TYPE
, ACE_SYNCH_DECL
, class TIME_POLICY
> size_t
627 ACE_Message_Queue_Ex
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
>::message_count ()
629 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::message_count");
631 return this->queue_
.message_count ();
634 template <class ACE_MESSAGE_TYPE
, ACE_SYNCH_DECL
, class TIME_POLICY
> int
635 ACE_Message_Queue_Ex
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
>::deactivate ()
637 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::deactivate");
639 return this->queue_
.deactivate ();
642 template <class ACE_MESSAGE_TYPE
, ACE_SYNCH_DECL
, class TIME_POLICY
> int
643 ACE_Message_Queue_Ex
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
>::activate ()
645 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::activate");
647 return this->queue_
.activate ();
650 template <class ACE_MESSAGE_TYPE
, ACE_SYNCH_DECL
, class TIME_POLICY
> int
651 ACE_Message_Queue_Ex
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
>::pulse ()
653 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::pulse");
655 return this->queue_
.pulse ();
658 template <class ACE_MESSAGE_TYPE
, ACE_SYNCH_DECL
, class TIME_POLICY
> int
659 ACE_Message_Queue_Ex
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
>::deactivated ()
661 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::deactivated");
663 return this->queue_
.deactivated ();
666 template <class ACE_MESSAGE_TYPE
, ACE_SYNCH_DECL
, class TIME_POLICY
> int
667 ACE_Message_Queue_Ex
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
>::state ()
669 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::state");
671 return this->queue_
.state ();
674 template <class ACE_MESSAGE_TYPE
, ACE_SYNCH_DECL
, class TIME_POLICY
> ACE_SYNCH_MUTEX_T
&
675 ACE_Message_Queue_Ex
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
>::lock ()
677 return this->queue_
.lock ();
680 template <class ACE_MESSAGE_TYPE
, ACE_SYNCH_DECL
, class TIME_POLICY
>
681 ACE_Time_Value_T
<TIME_POLICY
>
682 ACE_Message_Queue_Ex
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
>::gettimeofday ()
684 return this->queue_
.gettimeofday ();
// Forwards the time policy @a rhs to queue_.set_time_policy ().
// NOTE(review): no return type is visible between the template header and
// the qualified name -- confirm the declaration against the header.
687 template <class ACE_MESSAGE_TYPE
, ACE_SYNCH_DECL
, class TIME_POLICY
>
689 ACE_Message_Queue_Ex
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
>::set_time_policy (TIME_POLICY
const & rhs
)
691 this->queue_
.set_time_policy (rhs
);
// Accessor declared to return a reference to an
// ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>.
// NOTE(review): the body is not visible here; presumably it returns the
// queue_ member -- confirm against the complete file.
694 template <class ACE_MESSAGE_TYPE
, ACE_SYNCH_DECL
, class TIME_POLICY
>
695 ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
> &
696 ACE_Message_Queue_Ex
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
>::queue ()
// Constructor taking the ACE_Message_Queue to iterate over.
// NOTE(review): the member-initializer list and body are not visible
// here -- confirm how queue_ and curr_ are initialised.
701 template <ACE_SYNCH_DECL
, class TIME_POLICY
>
702 ACE_Message_Queue_Iterator
<ACE_SYNCH_USE
, TIME_POLICY
>::ACE_Message_Queue_Iterator (ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
> &q
)
// Takes a read guard on queue_.lock_ (ACE_READ_GUARD_RETURN is given -1 as
// its fallback return value) and then tests whether curr_ is non-null.
// NOTE(review): what is stored in @a entry and the values returned are not
// visible here -- confirm against the complete file.
708 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
709 ACE_Message_Queue_Iterator
<ACE_SYNCH_USE
, TIME_POLICY
>::next (ACE_Message_Block
*&entry
)
711 ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T
, m
, this->queue_
.lock_
, -1)
713 if (this->curr_
!= 0)
722 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
723 ACE_Message_Queue_Iterator
<ACE_SYNCH_USE
, TIME_POLICY
>::done () const
725 ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T
, m
, this->queue_
.lock_
, -1)
727 return this->curr_
== 0;
// Moves the cursor curr_ to curr_->next () after taking a read guard on
// queue_.lock_ (-1 is the guard's fallback return value), then returns
// non-zero while the new cursor is non-null.
// NOTE(review): no null check on curr_ is visible before curr_->next ()
// is called -- confirm that the complete file guards this dereference.
730 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
731 ACE_Message_Queue_Iterator
<ACE_SYNCH_USE
, TIME_POLICY
>::advance ()
733 ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T
, m
, this->queue_
.lock_
, -1)
736 this->curr_
= this->curr_
->next ();
737 return this->curr_
!= 0;
740 template <ACE_SYNCH_DECL
, class TIME_POLICY
> void
741 ACE_Message_Queue_Iterator
<ACE_SYNCH_USE
, TIME_POLICY
>::dump () const
743 #if defined (ACE_HAS_DUMP)
744 #endif /* ACE_HAS_DUMP */
747 ACE_ALLOC_HOOK_DEFINE_Tyc(ACE_Message_Queue_Iterator
)
// Constructor taking the ACE_Message_Queue to iterate over; the cursor
// curr_ starts at queue_.tail_.
// NOTE(review): the initializer that precedes `, curr_ (...)` and the
// body are not visible here -- confirm how queue_ is initialised.
749 template <ACE_SYNCH_DECL
, class TIME_POLICY
>
750 ACE_Message_Queue_Reverse_Iterator
<ACE_SYNCH_USE
, TIME_POLICY
>::ACE_Message_Queue_Reverse_Iterator (ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
> &q
)
752 , curr_ (queue_
.tail_
)
// Takes a read guard on queue_.lock_ (ACE_READ_GUARD_RETURN is given -1 as
// its fallback return value) and then tests whether curr_ is non-null.
// NOTE(review): what is stored in @a entry and the values returned are not
// visible here -- confirm against the complete file.
756 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
757 ACE_Message_Queue_Reverse_Iterator
<ACE_SYNCH_USE
, TIME_POLICY
>::next (ACE_Message_Block
*&entry
)
759 ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T
, m
, this->queue_
.lock_
, -1)
761 if (this->curr_
!= 0)
770 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
771 ACE_Message_Queue_Reverse_Iterator
<ACE_SYNCH_USE
, TIME_POLICY
>::done () const
773 ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T
, m
, this->queue_
.lock_
, -1)
775 return this->curr_
== 0;
// Moves the cursor curr_ to curr_->prev () after taking a read guard on
// queue_.lock_ (-1 is the guard's fallback return value), then returns
// non-zero while the new cursor is non-null.
// NOTE(review): no null check on curr_ is visible before curr_->prev ()
// is called -- confirm that the complete file guards this dereference.
778 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
779 ACE_Message_Queue_Reverse_Iterator
<ACE_SYNCH_USE
, TIME_POLICY
>::advance ()
781 ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T
, m
, this->queue_
.lock_
, -1)
784 this->curr_
= this->curr_
->prev ();
785 return this->curr_
!= 0;
788 template <ACE_SYNCH_DECL
, class TIME_POLICY
> void
789 ACE_Message_Queue_Reverse_Iterator
<ACE_SYNCH_USE
, TIME_POLICY
>::dump () const
791 #if defined (ACE_HAS_DUMP)
792 #endif /* ACE_HAS_DUMP */
795 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
796 ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::dequeue (ACE_Message_Block
*&first_item
,
797 ACE_Time_Value
*timeout
)
799 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::dequeue");
800 return this->dequeue_head (first_item
, timeout
);
803 template <ACE_SYNCH_DECL
, class TIME_POLICY
> ACE_Notification_Strategy
*
804 ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::notification_strategy ()
806 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::notification_strategy");
808 return this->notification_strategy_
;
811 template <ACE_SYNCH_DECL
, class TIME_POLICY
> void
812 ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::notification_strategy (ACE_Notification_Strategy
*s
)
814 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::notification_strategy");
816 this->notification_strategy_
= s
;
819 // Check if queue is empty (does not hold locks).
821 template <ACE_SYNCH_DECL
, class TIME_POLICY
> bool
822 ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::is_empty_i ()
824 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::is_empty_i");
825 return this->tail_
== 0;
828 // Check if queue is full (does not hold locks).
830 template <ACE_SYNCH_DECL
, class TIME_POLICY
> bool
831 ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::is_full_i ()
833 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::is_full_i");
834 return this->cur_bytes_
>= this->high_water_mark_
;
837 // Check if queue is empty (holds locks).
839 template <ACE_SYNCH_DECL
, class TIME_POLICY
> bool
840 ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::is_empty ()
842 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::is_empty");
843 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T
, ace_mon
, this->lock_
, false);
845 return this->is_empty_i ();
848 // Check if queue is full (holds locks).
850 template <ACE_SYNCH_DECL
, class TIME_POLICY
> bool
851 ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::is_full ()
853 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::is_full");
854 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T
, ace_mon
, this->lock_
, false);
856 return this->is_full_i ();
859 template <ACE_SYNCH_DECL
, class TIME_POLICY
> size_t
860 ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::high_water_mark ()
862 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::high_water_mark");
863 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T
, ace_mon
, this->lock_
, 0);
865 return this->high_water_mark_
;
868 template <ACE_SYNCH_DECL
, class TIME_POLICY
> void
869 ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::high_water_mark (size_t hwm
)
871 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::high_water_mark");
872 ACE_GUARD (ACE_SYNCH_MUTEX_T
, ace_mon
, this->lock_
);
874 this->high_water_mark_
= hwm
;
877 template <ACE_SYNCH_DECL
, class TIME_POLICY
> size_t
878 ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::low_water_mark ()
880 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::low_water_mark");
881 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T
, ace_mon
, this->lock_
, 0);
883 return this->low_water_mark_
;
886 template <ACE_SYNCH_DECL
, class TIME_POLICY
> void
887 ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::low_water_mark (size_t lwm
)
889 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::low_water_mark");
890 ACE_GUARD (ACE_SYNCH_MUTEX_T
, ace_mon
, this->lock_
);
892 this->low_water_mark_
= lwm
;
895 template <ACE_SYNCH_DECL
, class TIME_POLICY
> size_t
896 ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::message_bytes ()
898 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::message_bytes");
899 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T
, ace_mon
, this->lock_
, 0);
901 return this->cur_bytes_
;
904 template <ACE_SYNCH_DECL
, class TIME_POLICY
> size_t
905 ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::message_length ()
907 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::message_length");
908 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T
, ace_mon
, this->lock_
, 0);
910 return this->cur_length_
;
913 template <ACE_SYNCH_DECL
, class TIME_POLICY
> size_t
914 ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::message_count ()
916 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::message_count");
917 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T
, ace_mon
, this->lock_
, 0);
919 return this->cur_count_
;
922 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
923 ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::deactivate ()
925 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::deactivate");
926 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T
, ace_mon
, this->lock_
, -1);
928 return this->deactivate_i (0); // Not a pulse
931 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
932 ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::activate ()
934 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::activate");
935 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T
, ace_mon
, this->lock_
, -1);
937 return this->activate_i ();
940 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
941 ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::pulse ()
943 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::pulse");
944 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T
, ace_mon
, this->lock_
, -1);
946 return this->deactivate_i (1); // Just a pulse
949 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
950 ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::deactivated ()
952 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::deactivated");
954 return this->state_
== ACE_Message_Queue_Base::DEACTIVATED
;
// State accessor declared to return int.
// NOTE(review): only the trace statement is visible here; presumably the
// method returns state_ -- confirm against the complete file.
957 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
958 ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::state ()
960 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::state");
// Accessor declared to return a reference to an ACE_SYNCH_MUTEX_T.
// NOTE(review): the body is not visible here; presumably it returns the
// lock_ member -- confirm against the complete file.
965 template <ACE_SYNCH_DECL
, class TIME_POLICY
> ACE_SYNCH_MUTEX_T
&
966 ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::lock ()
971 template <ACE_SYNCH_DECL
, class TIME_POLICY
>
972 ACE_Time_Value_T
<TIME_POLICY
>
973 ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::gettimeofday () const
975 return this->time_policy_ ();
// Replaces the stored time_policy_ with a copy of @a rhs.
// NOTE(review): no return type is visible between the template header and
// the qualified name -- confirm the declaration against the header.
978 template <ACE_SYNCH_DECL
, class TIME_POLICY
>
980 ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::set_time_policy (TIME_POLICY
const & rhs
)
982 this->time_policy_
= rhs
;
985 template <ACE_SYNCH_DECL
, class TIME_POLICY
> void
986 ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::dump () const
988 #if defined (ACE_HAS_DUMP)
989 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::dump");
990 ACELIB_DEBUG ((LM_DEBUG
, ACE_BEGIN_DUMP
, this));
991 switch (this->state_
)
993 case ACE_Message_Queue_Base::ACTIVATED
:
994 ACELIB_DEBUG ((LM_DEBUG
,
995 ACE_TEXT ("state = ACTIVATED\n")));
997 case ACE_Message_Queue_Base::DEACTIVATED
:
998 ACELIB_DEBUG ((LM_DEBUG
,
999 ACE_TEXT ("state = DEACTIVATED\n")));
1001 case ACE_Message_Queue_Base::PULSED
:
1002 ACELIB_DEBUG ((LM_DEBUG
,
1003 ACE_TEXT ("state = PULSED\n")));
1006 ACELIB_DEBUG ((LM_DEBUG
,
1007 ACE_TEXT ("low_water_mark = %d\n")
1008 ACE_TEXT ("high_water_mark = %d\n")
1009 ACE_TEXT ("cur_bytes = %d\n")
1010 ACE_TEXT ("cur_length = %d\n")
1011 ACE_TEXT ("cur_count = %d\n")
1012 ACE_TEXT ("head_ = %u\n")
1013 ACE_TEXT ("tail_ = %u\n"),
1014 this->low_water_mark_
,
1015 this->high_water_mark_
,
1021 ACELIB_DEBUG ((LM_DEBUG
, ACE_TEXT ("not_full_cond:\n")));
1022 not_full_cond_
.dump ();
1023 ACELIB_DEBUG ((LM_DEBUG
, ACE_TEXT ("not_empty_cond:\n")));
1024 not_empty_cond_
.dump ();
1025 ACELIB_DEBUG ((LM_DEBUG
, ACE_END_DUMP
));
1026 #endif /* ACE_HAS_DUMP */
1029 template <ACE_SYNCH_DECL
, class TIME_POLICY
> void
1030 ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::message_bytes (size_t new_value
)
1032 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::message_bytes");
1033 ACE_GUARD (ACE_SYNCH_MUTEX_T
, ace_mon
, this->lock_
);
1035 this->cur_bytes_
= new_value
;
1038 template <ACE_SYNCH_DECL
, class TIME_POLICY
> void
1039 ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::message_length (size_t new_value
)
1041 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::message_length");
1042 ACE_GUARD (ACE_SYNCH_MUTEX_T
, ace_mon
, this->lock_
);
1044 this->cur_length_
= new_value
;
1047 template <ACE_SYNCH_DECL
, class TIME_POLICY
>
1048 ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::ACE_Message_Queue (size_t hwm
,
1050 ACE_Notification_Strategy
*ns
)
1051 #if defined (ACE_HAS_THREADS)
1052 : not_empty_cond_ (lock_
, cond_attr_
)
1053 , not_full_cond_ (lock_
, cond_attr_
)
1055 : not_empty_cond_ (lock_
)
1056 , not_full_cond_ (lock_
)
1059 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::ACE_Message_Queue");
1061 if (this->open (hwm
, lwm
, ns
) == -1)
1062 ACELIB_ERROR ((LM_ERROR
,
1063 ACE_TEXT ("open")));
1065 #if defined (ACE_HAS_MONITOR_POINTS) && (ACE_HAS_MONITOR_POINTS == 1)
1066 ACE_NEW (this->monitor_
,
1067 ACE::Monitor_Control::Size_Monitor
);
1069 /// Make a unique name using our process id and hex address.
1070 char pid_buf
[sizeof (int) + 1];
1071 ACE_OS::sprintf (pid_buf
, "%d", ACE_OS::getpid ());
1072 pid_buf
[sizeof (int)] = '\0';
1074 const int addr_nibbles
= 2 * sizeof (ptrdiff_t);
1075 char addr_buf
[addr_nibbles
+ 1];
1076 ACE_OS::sprintf (addr_buf
, "%p", this);
1077 addr_buf
[addr_nibbles
] = '\0';
1079 ACE_CString
name_str ("Message_Queue_");
1080 name_str
+= pid_buf
;
1082 name_str
+= addr_buf
;
1083 this->monitor_
->name (name_str
.c_str ());
1084 this->monitor_
->add_to_registry ();
1085 #endif /* ACE_HAS_MONITOR_POINTS==1 */
1088 template <ACE_SYNCH_DECL
, class TIME_POLICY
>
1089 ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::~ACE_Message_Queue ()
1091 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::~ACE_Message_Queue");
1092 if (this->head_
!= 0 && this->close () == -1)
1093 ACELIB_ERROR ((LM_ERROR
,
1094 ACE_TEXT ("close")));
1096 #if defined (ACE_HAS_MONITOR_POINTS) && (ACE_HAS_MONITOR_POINTS == 1)
1097 this->monitor_
->remove_from_registry ();
1098 this->monitor_
->remove_ref ();
1099 #endif /* ACE_HAS_MONITOR_POINTS==1 */
1102 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
1103 ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::flush_i ()
1105 int number_flushed
= 0;
1107 // Remove all the <ACE_Message_Block>s in the <ACE_Message_Queue>
1108 // and <release> their memory.
1109 for (this->tail_
= 0; this->head_
!= 0; )
1113 size_t mb_bytes
= 0;
1114 size_t mb_length
= 0;
1115 this->head_
->total_size_and_length (mb_bytes
,
1117 // Subtract off all of the bytes associated with this message.
1118 this->cur_bytes_
-= mb_bytes
;
1119 this->cur_length_
-= mb_length
;
1122 ACE_Message_Block
*temp
= this->head_
;
1123 this->head_
= this->head_
->next ();
1125 // Make sure to use <release> rather than <delete> since this is
1126 // reference counted.
1130 #if defined (ACE_HAS_MONITOR_POINTS) && (ACE_HAS_MONITOR_POINTS == 1)
1131 // The monitor should output only if the size has actually changed.
1132 if (number_flushed
> 0)
1134 this->monitor_
->receive (this->cur_length_
);
1138 return number_flushed
;
1141 // Don't bother locking since if someone calls this function more than
1142 // once for the same queue, we're in bigger trouble than just
1143 // concurrency control!
1145 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
1146 ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::open (size_t hwm
,
1148 ACE_Notification_Strategy
*ns
)
1150 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::open");
1151 this->high_water_mark_
= hwm
;
1152 this->low_water_mark_
= lwm
;
1153 this->state_
= ACE_Message_Queue_Base::ACTIVATED
;
1154 this->cur_bytes_
= 0;
1155 this->cur_length_
= 0;
1156 this->cur_count_
= 0;
1159 this->notification_strategy_
= ns
;
1163 // Implementation of the public deactivate() method
1164 // (assumes locks are held).
1166 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
1167 ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::deactivate_i (int pulse
)
1169 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::deactivate_i");
1170 int const previous_state
= this->state_
;
1172 if (previous_state
!= ACE_Message_Queue_Base::DEACTIVATED
)
1174 // Wakeup all waiters.
1175 this->not_empty_cond_
.broadcast ();
1176 this->not_full_cond_
.broadcast ();
1179 this->state_
= ACE_Message_Queue_Base::PULSED
;
1181 this->state_
= ACE_Message_Queue_Base::DEACTIVATED
;
1184 return previous_state
;
1187 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
1188 ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::activate_i ()
1190 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::activate_i");
1191 int const previous_state
= this->state_
;
1192 this->state_
= ACE_Message_Queue_Base::ACTIVATED
;
1193 return previous_state
;
1196 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
1197 ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::flush ()
1199 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::flush");
1200 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T
, ace_mon
, this->lock_
, -1);
1202 // Free up the remaining messages on the queue.
1203 return this->flush_i ();
1206 // Clean up the queue if we have not already done so!
1208 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
1209 ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::close ()
1211 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::close");
1212 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T
, ace_mon
, this->lock_
, -1);
1214 // There's no need to check the return value of deactivate_i() since
1216 this->deactivate_i ();
1218 // Free up the remaining messages on the queue.
1219 return this->flush_i ();
1222 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
1223 ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::signal_enqueue_waiters ()
1225 if (this->not_full_cond_
.signal () != 0)
1230 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
1231 ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::signal_dequeue_waiters ()
1233 // Tell any blocked threads that the queue has a new item!
1234 if (this->not_empty_cond_
.signal () != 0)
1239 // Actually put the node at the end (no locking so must be called with
1242 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
1243 ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::enqueue_tail_i (ACE_Message_Block
*new_item
)
1245 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::enqueue_tail_i");
1250 // Update the queued size and length, taking into account any chained
1251 // blocks (total_size_and_length() counts all continuation blocks).
1252 // Keep count of how many blocks we're adding and, if there is a chain of
1253 // blocks, find the end in seq_tail and be sure they're properly
1254 // back-connected along the way.
1255 ACE_Message_Block
*seq_tail
= new_item
;
1257 new_item
->total_size_and_length (this->cur_bytes_
,
1259 while (seq_tail
->next () != 0)
1261 seq_tail
->next ()->prev (seq_tail
);
1262 seq_tail
= seq_tail
->next ();
1264 seq_tail
->total_size_and_length (this->cur_bytes_
,
1268 // List was empty, so build a new one.
1269 if (this->tail_
== 0)
1271 this->head_
= new_item
;
1272 this->tail_
= seq_tail
;
1273 // seq_tail->next (0); This is a condition of the while() loop above.
1279 // seq_tail->next (0); This is a condition of the while() loop above.
1280 this->tail_
->next (new_item
);
1281 new_item
->prev (this->tail_
);
1282 this->tail_
= seq_tail
;
1285 if (this->signal_dequeue_waiters () == -1)
1288 return ACE_Utils::truncate_cast
<int> (this->cur_count_
);
1291 // Actually put the node(s) at the head (no locking)
1293 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
1294 ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::enqueue_head_i (ACE_Message_Block
*new_item
)
1296 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::enqueue_head_i");
1301 // Update the queued size and length, taking into account any chained
1302 // blocks (total_size_and_length() counts all continuation blocks).
1303 // Keep count of how many blocks we're adding and, if there is a chain of
1304 // blocks, find the end in seq_tail and be sure they're properly
1305 // back-connected along the way.
1306 ACE_Message_Block
*seq_tail
= new_item
;
1308 new_item
->total_size_and_length (this->cur_bytes_
,
1310 while (seq_tail
->next () != 0)
1312 seq_tail
->next ()->prev (seq_tail
);
1313 seq_tail
= seq_tail
->next ();
1315 seq_tail
->total_size_and_length (this->cur_bytes_
,
1320 seq_tail
->next (this->head_
);
1322 if (this->head_
!= 0)
1323 this->head_
->prev (seq_tail
);
1325 this->tail_
= seq_tail
;
1327 this->head_
= new_item
;
1329 if (this->signal_dequeue_waiters () == -1)
1332 return ACE_Utils::truncate_cast
<int> (this->cur_count_
);
1335 // Actually put the node at its proper position relative to its
1338 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
1339 ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::enqueue_i (ACE_Message_Block
*new_item
)
1341 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::enqueue_i");
1346 // Since this method uses enqueue_head_i() and enqueue_tail_i() for
1347 // special situations, and this method doesn't support enqueueing
1348 // chains of blocks off the 'next' pointer, make sure the new_item's
1349 // next pointer is 0.
1352 if (this->head_
== 0)
1353 // Check for simple case of an empty queue, where all we need to
1354 // do is insert <new_item> into the head.
1355 return this->enqueue_head_i (new_item
);
1358 ACE_Message_Block
*temp
= 0;
1360 // Figure out where the new item goes relative to its priority.
1361 // We start looking from the lowest priority (at the tail) to
1362 // the highest priority (at the head).
1364 for (temp
= this->tail_
;
1366 temp
= temp
->prev ())
1367 if (temp
->msg_priority () >= new_item
->msg_priority ())
1368 // Break out when we've located an item that has
1369 // greater or equal priority.
1373 // Check for simple case of inserting at the head of the queue,
1374 // where all we need to do is insert <new_item> before the
1376 return this->enqueue_head_i (new_item
);
1377 else if (temp
->next () == 0)
1378 // Check for simple case of inserting at the tail of the
1379 // queue, where all we need to do is insert <new_item> after
1380 // the current tail.
1381 return this->enqueue_tail_i (new_item
);
1384 // Insert the new message behind the message of greater or
1385 // equal priority. This ensures that FIFO order is
1386 // maintained when messages of the same priority are
1387 // inserted consecutively.
1388 new_item
->prev (temp
);
1389 new_item
->next (temp
->next ());
1390 temp
->next ()->prev (new_item
);
1391 temp
->next (new_item
);
1395 // Make sure to count all the bytes in a composite message!!!
1396 new_item
->total_size_and_length (this->cur_bytes_
,
1400 if (this->signal_dequeue_waiters () == -1)
1403 return ACE_Utils::truncate_cast
<int> (this->cur_count_
);
1406 // Actually put the node at its proper position relative to its
1409 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
1410 ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::enqueue_deadline_i (ACE_Message_Block
*new_item
)
1412 #if defined (ACE_HAS_TIMED_MESSAGE_BLOCKS)
1413 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::enqueue_deadline_i");
1418 // Since this method uses enqueue_head_i() and enqueue_tail_i() for
1419 // special situations, and this method doesn't support enqueueing
1420 // chains of blocks off the 'next' pointer, make sure the new_item's
1421 // next pointer is 0.
1424 if (this->head_
== 0)
1425 // Check for simple case of an empty queue, where all we need to
1426 // do is insert <new_item> into the head.
1427 return this->enqueue_head_i (new_item
);
1430 ACE_Message_Block
*temp
= 0;
1432 // Figure out where the new item goes relative to its priority.
1433 // We start looking from the smallest deadline to the highest
1436 for (temp
= this->head_
;
1438 temp
= temp
->next ())
1439 if (new_item
->msg_deadline_time () < temp
->msg_deadline_time ())
1440 // Break out when we've located an item that has
1441 // greater or equal priority.
1444 if (temp
== 0 || temp
->next () == 0)
1445 // Check for simple case of inserting at the tail of the queue,
1446 // where all we need to do is insert <new_item> after the
1448 return this->enqueue_tail_i (new_item
);
1451 // Insert the new message behind the message of
1452 // lesser or equal deadline time. This ensures that FIFO order is
1453 // maintained when messages of the same priority are
1454 // inserted consecutively.
1455 new_item
->prev (temp
);
1456 new_item
->next (temp
->next ());
1457 temp
->next ()->prev (new_item
);
1458 temp
->next (new_item
);
1462 // Make sure to count all the bytes in a composite message!!!
1463 new_item
->total_size_and_length (this->cur_bytes_
,
1467 if (this->signal_dequeue_waiters () == -1)
1470 return this->cur_count_
;
1472 return this->enqueue_tail_i (new_item
);
1473 #endif /* ACE_HAS_TIMED_MESSAGE_BLOCKS */
1476 // Actually get the first ACE_Message_Block (no locking, so must be
1477 // called with locks held). This method assumes that the queue has at
1478 // least one item in it when it is called.
1480 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
1481 ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::dequeue_head_i (ACE_Message_Block
*&first_item
)
1483 if (this->head_
==0)
1484 ACELIB_ERROR_RETURN ((LM_ERROR
,
1485 ACE_TEXT ("Attempting to dequeue from empty queue")),
1487 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::dequeue_head_i");
1488 first_item
= this->head_
;
1489 this->head_
= this->head_
->next ();
1491 if (this->head_
== 0)
1494 // The prev pointer of first message block must point to 0...
1495 this->head_
->prev (0);
1497 size_t mb_bytes
= 0;
1498 size_t mb_length
= 0;
1499 first_item
->total_size_and_length (mb_bytes
,
1501 // Subtract off all of the bytes associated with this message.
1502 this->cur_bytes_
-= mb_bytes
;
1503 this->cur_length_
-= mb_length
;
1506 if (this->cur_count_
== 0 && this->head_
== this->tail_
)
1507 this->head_
= this->tail_
= 0;
1509 // Make sure that the prev and next fields are 0!
1510 first_item
->prev (0);
1511 first_item
->next (0);
1513 #if defined (ACE_HAS_MONITOR_POINTS) && (ACE_HAS_MONITOR_POINTS == 1)
1514 this->monitor_
->receive (this->cur_length_
);
1517 // Only signal enqueueing threads if we've fallen below the low
1519 if (this->cur_bytes_
<= this->low_water_mark_
1520 && this->signal_enqueue_waiters () == -1)
1523 return ACE_Utils::truncate_cast
<int> (this->cur_count_
);
1526 // Get the earliest (i.e., FIFO) ACE_Message_Block with the lowest
1527 // priority (no locking, so must be called with locks held). This
1528 // method assumes that the queue has at least one item in it when it
1531 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
1532 ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::dequeue_prio_i (ACE_Message_Block
*&dequeued
)
1534 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::dequeue_prio_i");
1536 if (this->head_
== 0)
1539 // Find the earliest (i.e., FIFO) message enqueued with the lowest
1541 ACE_Message_Block
*chosen
= 0;
1542 u_long priority
= ULONG_MAX
;
1544 for (ACE_Message_Block
*temp
= this->tail_
;
1546 temp
= temp
->prev ())
1548 // Find the first version of the earliest message (i.e.,
1549 // preserve FIFO order for messages at the same priority).
1550 if (temp
->msg_priority () <= priority
)
1552 priority
= temp
->msg_priority ();
1557 // If every message block is the same priority, pass back the first
1560 chosen
= this->head_
;
1562 // Patch up the queue. If we don't have a previous then we are at
1563 // the head of the queue.
1564 if (chosen
->prev () == 0)
1565 this->head_
= chosen
->next ();
1567 chosen
->prev ()->next (chosen
->next ());
1569 if (chosen
->next () == 0)
1570 this->tail_
= chosen
->prev ();
1572 chosen
->next ()->prev (chosen
->prev ());
1574 // Pass back the chosen block
1577 size_t mb_bytes
= 0;
1578 size_t mb_length
= 0;
1579 dequeued
->total_size_and_length (mb_bytes
,
1581 // Subtract off all of the bytes associated with this message.
1582 this->cur_bytes_
-= mb_bytes
;
1583 this->cur_length_
-= mb_length
;
1586 if (this->cur_count_
== 0 && this->head_
== this->tail_
)
1587 this->head_
= this->tail_
= 0;
1589 // Make sure that the prev and next fields are 0!
1593 // Only signal enqueueing threads if we've fallen below the low
1595 if (this->cur_bytes_
<= this->low_water_mark_
1596 && this->signal_enqueue_waiters () == -1)
1599 return ACE_Utils::truncate_cast
<int> (this->cur_count_
);
1602 // Actually get the last ACE_Message_Block (no locking, so must be
1603 // called with locks held). This method assumes that the queue has at
1604 // least one item in it when it is called.
1606 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
1607 ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::dequeue_tail_i (ACE_Message_Block
*&dequeued
)
1609 if (this->head_
== 0)
1610 ACELIB_ERROR_RETURN ((LM_ERROR
,
1611 ACE_TEXT ("Attempting to dequeue from empty queue")),
1613 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::dequeue_tail_i");
1614 dequeued
= this->tail_
;
1615 if (this->tail_
->prev () == 0)
1622 this->tail_
->prev ()->next (0);
1623 this->tail_
= this->tail_
->prev ();
1626 size_t mb_bytes
= 0;
1627 size_t mb_length
= 0;
1628 dequeued
->total_size_and_length (mb_bytes
,
1630 // Subtract off all of the bytes associated with this message.
1631 this->cur_bytes_
-= mb_bytes
;
1632 this->cur_length_
-= mb_length
;
1635 if (this->cur_count_
== 0 && this->head_
== this->tail_
)
1636 this->head_
= this->tail_
= 0;
1638 // Make sure that the prev and next fields are 0!
1642 // Only signal enqueueing threads if we've fallen below the low
1644 if (this->cur_bytes_
<= this->low_water_mark_
1645 && this->signal_enqueue_waiters () == -1)
1648 return ACE_Utils::truncate_cast
<int> (this->cur_count_
);
1651 // Actually get the ACE_Message_Block with the lowest deadline time
1652 // (no locking, so must be called with locks held). This method assumes
1653 // that the queue has at least one item in it when it is called.
1655 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
1656 ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::dequeue_deadline_i (ACE_Message_Block
*&dequeued
)
1658 #if defined (ACE_HAS_TIMED_MESSAGE_BLOCKS)
1659 if (this->head_
== 0)
1660 ACELIB_ERROR_RETURN ((LM_ERROR
,
1661 ACE_TEXT ("Attempting to dequeue from empty queue")),
1663 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::dequeue_deadline_i");
1665 // Find the last message enqueued with the lowest deadline time
1666 ACE_Message_Block
* chosen
= 0;
1667 ACE_Time_Value deadline
= ACE_Time_Value::max_time
;
1668 for (ACE_Message_Block
*temp
= this->head_
; temp
!= 0; temp
= temp
->next ())
1669 if (temp
->msg_deadline_time () < deadline
)
1671 deadline
= temp
->msg_deadline_time ();
1675 // If every message block is the same deadline time,
1676 // pass back the first one
1678 chosen
= this->head_
;
1680 // Patch up the queue. If we don't have a previous
1681 // then we are at the head of the queue.
1682 if (chosen
->prev () == 0)
1683 this->head_
= chosen
->next ();
1685 chosen
->prev ()->next (chosen
->next ());
1687 if (chosen
->next () == 0)
1688 this->tail_
= chosen
->prev ();
1690 chosen
->next ()->prev (chosen
->prev ());
1692 // Pass back the chosen block
1695 size_t mb_bytes
= 0;
1696 size_t mb_length
= 0;
1697 dequeued
->total_size_and_length (mb_bytes
,
1699 // Subtract off all of the bytes associated with this message.
1700 this->cur_bytes_
-= mb_bytes
;
1701 this->cur_length_
-= mb_length
;
1704 if (this->cur_count_
== 0 && this->head_
== this->tail_
)
1705 this->head_
= this->tail_
= 0;
1707 // Make sure that the prev and next fields are 0!
1711 // Only signal enqueueing threads if we've fallen below the low
1713 if (this->cur_bytes_
<= this->low_water_mark_
1714 && this->signal_enqueue_waiters () == -1)
1717 return this->cur_count_
;
1719 return this->dequeue_head_i (dequeued
);
1720 #endif /* ACE_HAS_TIMED_MESSAGE_BLOCKS */
1723 // Take a look at the first item without removing it.
1725 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
1726 ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::peek_dequeue_head (ACE_Message_Block
*&first_item
,
1727 ACE_Time_Value
*timeout
)
1729 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::peek_dequeue_head");
1730 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T
, ace_mon
, this->lock_
, -1);
1732 if (this->state_
== ACE_Message_Queue_Base::DEACTIVATED
)
1738 // Wait for at least one item to become available.
1739 if (this->wait_not_empty_cond (timeout
) == -1)
1742 first_item
= this->head_
;
1743 return ACE_Utils::truncate_cast
<int> (this->cur_count_
);
1746 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
1747 ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::wait_not_full_cond (ACE_Time_Value
*timeout
)
1751 // Wait while the queue is full.
1753 while (this->is_full_i ())
1755 if (this->not_full_cond_
.wait (timeout
) == -1)
1758 errno
= EWOULDBLOCK
;
1762 if (this->state_
!= ACE_Message_Queue_Base::ACTIVATED
)
1772 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
1773 ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::wait_not_empty_cond (ACE_Time_Value
*timeout
)
1777 // Wait while the queue is empty.
1779 while (this->is_empty_i ())
1781 if (this->not_empty_cond_
.wait (timeout
) == -1)
1784 errno
= EWOULDBLOCK
;
1788 if (this->state_
!= ACE_Message_Queue_Base::ACTIVATED
)
1798 // Block indefinitely waiting for an item to arrive, does not ignore
1799 // alerts (e.g., signals).
1801 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
1802 ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::enqueue_head (ACE_Message_Block
*new_item
,
1803 ACE_Time_Value
*timeout
)
1805 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::enqueue_head");
1806 int queue_count
= 0;
1807 ACE_Notification_Strategy
*notifier
= 0;
1809 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T
, ace_mon
, this->lock_
, -1);
1811 if (this->state_
== ACE_Message_Queue_Base::DEACTIVATED
)
1817 if (this->wait_not_full_cond (timeout
) == -1)
1820 queue_count
= this->enqueue_head_i (new_item
);
1821 if (queue_count
== -1)
1824 #if defined (ACE_HAS_MONITOR_POINTS) && (ACE_HAS_MONITOR_POINTS == 1)
1825 this->monitor_
->receive (this->cur_length_
);
1827 notifier
= this->notification_strategy_
;
1835 // Enqueue an <ACE_Message_Block *> into the <Message_Queue> in
1836 // accordance with its <msg_priority> (0 is lowest priority). Returns
1837 // -1 on failure, else the number of items still on the queue.
1839 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
1840 ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::enqueue_prio (ACE_Message_Block
*new_item
,
1841 ACE_Time_Value
*timeout
)
1843 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::enqueue_prio");
1844 int queue_count
= 0;
1845 ACE_Notification_Strategy
*notifier
= 0;
1847 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T
, ace_mon
, this->lock_
, -1);
1849 if (this->state_
== ACE_Message_Queue_Base::DEACTIVATED
)
1855 if (this->wait_not_full_cond (timeout
) == -1)
1858 queue_count
= this->enqueue_i (new_item
);
1860 if (queue_count
== -1)
1863 #if defined (ACE_HAS_MONITOR_POINTS) && (ACE_HAS_MONITOR_POINTS == 1)
1864 this->monitor_
->receive (this->cur_length_
);
1866 notifier
= this->notification_strategy_
;
1869 notifier
->notify ();
1873 // Enqueue an <ACE_Message_Block *> into the <Message_Queue> in
1874 // accordance with its <msg_deadline_time>. Returns
1875 // -1 on failure, else the number of items still on the queue.
1877 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
1878 ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::enqueue_deadline (ACE_Message_Block
*new_item
,
1879 ACE_Time_Value
*timeout
)
1881 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::enqueue_deadline");
1882 int queue_count
= 0;
1883 ACE_Notification_Strategy
*notifier
= 0;
1885 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T
, ace_mon
, this->lock_
, -1);
1887 if (this->state_
== ACE_Message_Queue_Base::DEACTIVATED
)
1893 if (this->wait_not_full_cond (timeout
) == -1)
1896 queue_count
= this->enqueue_deadline_i (new_item
);
1898 if (queue_count
== -1)
1901 #if defined (ACE_HAS_MONITOR_POINTS) && (ACE_HAS_MONITOR_POINTS == 1)
1902 this->monitor_
->receive (this->cur_length_
);
1904 notifier
= this->notification_strategy_
;
1907 notifier
->notify ();
1911 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
1912 ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::enqueue (ACE_Message_Block
*new_item
,
1913 ACE_Time_Value
*timeout
)
1915 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::enqueue");
1916 return this->enqueue_prio (new_item
, timeout
);
1919 // Block indefinitely waiting for an item to arrive,
1920 // does not ignore alerts (e.g., signals).
1922 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
1923 ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::enqueue_tail (ACE_Message_Block
*new_item
,
1924 ACE_Time_Value
*timeout
)
1926 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::enqueue_tail");
1927 int queue_count
= 0;
1928 ACE_Notification_Strategy
*notifier
= 0;
1930 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T
, ace_mon
, this->lock_
, -1);
1932 if (this->state_
== ACE_Message_Queue_Base::DEACTIVATED
)
1938 if (this->wait_not_full_cond (timeout
) == -1)
1941 queue_count
= this->enqueue_tail_i (new_item
);
1943 if (queue_count
== -1)
1946 #if defined (ACE_HAS_MONITOR_POINTS) && (ACE_HAS_MONITOR_POINTS == 1)
1947 this->monitor_
->receive (this->cur_length_
);
1949 notifier
= this->notification_strategy_
;
1952 notifier
->notify ();
1956 // Remove an item from the front of the queue. If timeout == 0 block
1957 // indefinitely (or until an alert occurs). Otherwise, block for upto
1958 // the amount of time specified by timeout.
1960 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
1961 ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::dequeue_head (ACE_Message_Block
*&first_item
,
1962 ACE_Time_Value
*timeout
)
1964 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::dequeue_head");
1965 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T
, ace_mon
, this->lock_
, -1);
1967 if (this->state_
== ACE_Message_Queue_Base::DEACTIVATED
)
1973 if (this->wait_not_empty_cond (timeout
) == -1)
1976 return this->dequeue_head_i (first_item
);
1979 // Remove item with the lowest priority from the queue. If timeout == 0 block
1980 // indefinitely (or until an alert occurs). Otherwise, block for upto
1981 // the amount of time specified by timeout.
1983 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
1984 ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::dequeue_prio (ACE_Message_Block
*&dequeued
,
1985 ACE_Time_Value
*timeout
)
1987 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::dequeue_prio");
1988 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T
, ace_mon
, this->lock_
, -1);
1990 if (this->state_
== ACE_Message_Queue_Base::DEACTIVATED
)
1996 if (this->wait_not_empty_cond (timeout
) == -1)
1999 return this->dequeue_prio_i (dequeued
);
2002 // Remove an item from the end of the queue. If timeout == 0 block
2003 // indefinitely (or until an alert occurs). Otherwise, block for upto
2004 // the amount of time specified by timeout.
2006 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
2007 ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::dequeue_tail (ACE_Message_Block
*&dequeued
,
2008 ACE_Time_Value
*timeout
)
2010 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::dequeue_tail");
2011 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T
, ace_mon
, this->lock_
, -1);
2013 if (this->state_
== ACE_Message_Queue_Base::DEACTIVATED
)
2019 if (this->wait_not_empty_cond (timeout
) == -1)
2022 return this->dequeue_tail_i (dequeued
);
2025 // Remove an item with the lowest deadline time. If timeout == 0 block
2026 // indefinitely (or until an alert occurs). Otherwise, block for upto
2027 // the amount of time specified by timeout.
2029 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
2030 ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::dequeue_deadline (ACE_Message_Block
*&dequeued
,
2031 ACE_Time_Value
*timeout
)
2033 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::dequeue_deadline");
2034 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T
, ace_mon
, this->lock_
, -1);
2036 if (this->state_
== ACE_Message_Queue_Base::DEACTIVATED
)
2042 if (this->wait_not_empty_cond (timeout
) == -1)
2045 return this->dequeue_deadline_i (dequeued
);
2048 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
2049 ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::notify ()
2051 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::notify");
2053 // By default, don't do anything.
2054 if (this->notification_strategy_
== 0)
2057 return this->notification_strategy_
->notify ();
2060 template <ACE_SYNCH_DECL
, class TIME_POLICY
>
2061 ACE_Dynamic_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::ACE_Dynamic_Message_Queue (ACE_Dynamic_Message_Strategy
& message_strategy
,
2064 ACE_Notification_Strategy
*ns
)
2065 : ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
> (hwm
, lwm
, ns
),
2070 beyond_late_head_ (0),
2071 beyond_late_tail_ (0),
2072 message_strategy_ (message_strategy
)
2074 // Note, the ACE_Dynamic_Message_Queue assumes full responsibility
2075 // for the passed ACE_Dynamic_Message_Strategy object, and deletes
2076 // it in its own dtor
2079 // dtor: free message strategy and let base class dtor do the rest.
2081 template <ACE_SYNCH_DECL
, class TIME_POLICY
>
2082 ACE_Dynamic_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::~ACE_Dynamic_Message_Queue ()
2084 delete &this->message_strategy_
;
2087 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
2088 ACE_Dynamic_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::remove_messages (ACE_Message_Block
*&list_head
,
2089 ACE_Message_Block
*&list_tail
,
2092 // start with an empty list
2096 // Get the current time
2097 ACE_Time_Value current_time
= ACE_OS::gettimeofday ();
2099 // Refresh priority status boundaries in the queue.
2100 int result
= this->refresh_queue (current_time
);
2104 if (ACE_BIT_ENABLED (status_flags
,
2105 (u_int
) ACE_Dynamic_Message_Strategy::PENDING
)
2106 && this->pending_head_
2107 && this->pending_tail_
)
2109 // patch up pointers for the new tail of the queue
2110 if (this->pending_head_
->prev ())
2112 this->tail_
= this->pending_head_
->prev ();
2113 this->pending_head_
->prev ()->next (0);
2117 // the list has become empty
2122 // point to the head and tail of the list
2123 list_head
= this->pending_head_
;
2124 list_tail
= this->pending_tail_
;
2126 // cut the pending messages out of the queue entirely
2127 this->pending_head_
->prev (0);
2128 this->pending_head_
= 0;
2129 this->pending_tail_
= 0;
2132 if (ACE_BIT_ENABLED (status_flags
,
2133 (u_int
) ACE_Dynamic_Message_Strategy::LATE
)
2135 && this->late_tail_
)
2137 // Patch up pointers for the (possibly) new head and tail of the
2139 if (this->late_tail_
->next ())
2140 this->late_tail_
->next ()->prev (this->late_head_
->prev ());
2142 this->tail_
= this->late_head_
->prev ();
2144 if (this->late_head_
->prev ())
2145 this->late_head_
->prev ()->next (this->late_tail_
->next ());
2147 this->head_
= this->late_tail_
->next ();
2149 // put late messages behind pending messages (if any) being returned
2150 this->late_head_
->prev (list_tail
);
2152 list_tail
->next (this->late_head_
);
2154 list_head
= this->late_head_
;
2156 list_tail
= this->late_tail_
;
2158 this->late_tail_
->next (0);
2159 this->late_head_
= 0;
2160 this->late_tail_
= 0;
2163 if (ACE_BIT_ENABLED (status_flags
,
2164 (u_int
) ACE_Dynamic_Message_Strategy::BEYOND_LATE
)
2165 && this->beyond_late_head_
2166 && this->beyond_late_tail_
)
2168 // Patch up pointers for the new tail of the queue
2169 if (this->beyond_late_tail_
->next ())
2171 this->head_
= this->beyond_late_tail_
->next ();
2172 this->beyond_late_tail_
->next ()->prev (0);
2176 // the list has become empty
2181 // Put beyond late messages at the end of the list being
2185 this->beyond_late_head_
->prev (list_tail
);
2186 list_tail
->next (this->beyond_late_head_
);
2189 list_head
= this->beyond_late_head_
;
2191 list_tail
= this->beyond_late_tail_
;
2193 this->beyond_late_tail_
->next (0);
2194 this->beyond_late_head_
= 0;
2195 this->beyond_late_tail_
= 0;
2198 // Decrement message and size counts for removed messages.
2199 ACE_Message_Block
*temp1
;
2201 for (temp1
= list_head
;
2203 temp1
= temp1
->next ())
2207 size_t mb_bytes
= 0;
2208 size_t mb_length
= 0;
2209 temp1
->total_size_and_length (mb_bytes
,
2211 // Subtract off all of the bytes associated with this message.
2212 this->cur_bytes_
-= mb_bytes
;
2213 this->cur_length_
-= mb_length
;
2219 // Detach all messages with status given in the passed flags from the
2220 // queue and return them by setting passed head and tail pointers to
2221 // the linked list they comprise. This method is intended primarily
2222 // as a means of periodically harvesting messages that have missed
2223 // their deadlines, but is available in its most general form. All
2224 // messages are returned in priority order, from head to tail, as of
2225 // the time this method was called.
2227 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
2228 ACE_Dynamic_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::dequeue_head (ACE_Message_Block
*&first_item
,
2229 ACE_Time_Value
*timeout
)
2231 ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::dequeue_head");
2233 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T
, ace_mon
, this->lock_
, -1);
2235 if (this->state_
== ACE_Message_Queue_Base::DEACTIVATED
)
2243 // get the current time
2244 ACE_Time_Value current_time
= ACE_OS::gettimeofday ();
2246 // refresh priority status boundaries in the queue
2247 result
= this->refresh_queue (current_time
);
2251 // *now* it's appropriate to wait for an enqueued item
2252 result
= this->wait_not_empty_cond (timeout
);
2256 // call the internal dequeue method, which selects an item from the
2257 // highest priority status portion of the queue that has messages
2259 result
= this->dequeue_head_i (first_item
);
// Dequeue and return the <ACE_Message_Block *> at the (logical) head
// of the queue.
2267 template <ACE_SYNCH_DECL
, class TIME_POLICY
> void
2268 ACE_Dynamic_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::dump () const
2270 #if defined (ACE_HAS_DUMP)
2271 ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::dump");
2272 ACELIB_DEBUG ((LM_DEBUG
, ACE_BEGIN_DUMP
, this));
2274 ACELIB_DEBUG ((LM_DEBUG
, ACE_TEXT ("ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY> (base class):\n")));
2275 this->ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::dump ();
2277 ACELIB_DEBUG ((LM_DEBUG
,
2278 ACE_TEXT ("pending_head_ = %u\n")
2279 ACE_TEXT ("pending_tail_ = %u\n")
2280 ACE_TEXT ("late_head_ = %u\n")
2281 ACE_TEXT ("late_tail_ = %u\n")
2282 ACE_TEXT ("beyond_late_head_ = %u\n")
2283 ACE_TEXT ("beyond_late_tail_ = %u\n"),
2284 this->pending_head_
,
2285 this->pending_tail_
,
2288 this->beyond_late_head_
,
2289 this->beyond_late_tail_
));
2291 ACELIB_DEBUG ((LM_DEBUG
, ACE_TEXT ("message_strategy_ :\n")));
2292 message_strategy_
.dump ();
2294 ACELIB_DEBUG ((LM_DEBUG
, ACE_END_DUMP
));
2295 #endif /* ACE_HAS_DUMP */
2297 // dump the state of the queue
2299 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
2300 ACE_Dynamic_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::enqueue_i (ACE_Message_Block
*new_item
)
2302 ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::enqueue_i");
2311 // Get the current time.
2312 ACE_Time_Value current_time
= ACE_OS::gettimeofday ();
2314 // Refresh priority status boundaries in the queue.
2316 result
= this->refresh_queue (current_time
);
2323 // Where we enqueue depends on the message's priority status.
2324 switch (message_strategy_
.priority_status (*new_item
,
2327 case ACE_Dynamic_Message_Strategy::PENDING
:
2328 if (this->pending_tail_
== 0)
2330 // Check for simple case of an empty pending queue, where
2331 // all we need to do is insert <new_item> into the tail of
2333 pending_head_
= new_item
;
2334 pending_tail_
= pending_head_
;
2335 return this->enqueue_tail_i (new_item
);
2339 // Enqueue the new message in priority order in the pending
2341 result
= sublist_enqueue_i (new_item
,
2343 this->pending_head_
,
2344 this->pending_tail_
,
2345 ACE_Dynamic_Message_Strategy::PENDING
);
2349 case ACE_Dynamic_Message_Strategy::LATE
:
2350 if (this->late_tail_
== 0)
2352 late_head_
= new_item
;
2353 late_tail_
= late_head_
;
2355 if (this->pending_head_
== 0)
2356 // Check for simple case of an empty pending queue,
2357 // where all we need to do is insert <new_item> into the
2358 // tail of the queue.
2359 return this->enqueue_tail_i (new_item
);
2360 else if (this->beyond_late_tail_
== 0)
2361 // Check for simple case of an empty beyond late queue, where all
2362 // we need to do is insert <new_item> into the head of the queue.
2363 return this->enqueue_head_i (new_item
);
2366 // Otherwise, we can just splice the new message in
2367 // between the pending and beyond late portions of the
2369 this->beyond_late_tail_
->next (new_item
);
2370 new_item
->prev (this->beyond_late_tail_
);
2371 this->pending_head_
->prev (new_item
);
2372 new_item
->next (this->pending_head_
);
2377 // Enqueue the new message in priority order in the late
2379 result
= sublist_enqueue_i (new_item
,
2383 ACE_Dynamic_Message_Strategy::LATE
);
2387 case ACE_Dynamic_Message_Strategy::BEYOND_LATE
:
2388 if (this->beyond_late_tail_
== 0)
2390 // Check for simple case of an empty beyond late queue,
2391 // where all we need to do is insert <new_item> into the
2392 // head of the queue.
2393 beyond_late_head_
= new_item
;
2394 beyond_late_tail_
= beyond_late_head_
;
2395 return this->enqueue_head_i (new_item
);
2399 // all beyond late messages have the same (zero) priority,
2400 // so just put the new one at the end of the beyond late
2402 if (this->beyond_late_tail_
->next ())
2404 this->beyond_late_tail_
->next ()->prev (new_item
);
2408 this->tail_
= new_item
;
2411 new_item
->next (this->beyond_late_tail_
->next ());
2412 this->beyond_late_tail_
->next (new_item
);
2413 new_item
->prev (this->beyond_late_tail_
);
2414 this->beyond_late_tail_
= new_item
;
2419 // should never get here, but just in case...
2430 size_t mb_bytes
= 0;
2431 size_t mb_length
= 0;
2432 new_item
->total_size_and_length (mb_bytes
,
2434 this->cur_bytes_
+= mb_bytes
;
2435 this->cur_length_
+= mb_length
;
2438 if (this->signal_dequeue_waiters () == -1)
2444 return ACE_Utils::truncate_cast
<int> (this->cur_count_
);
// Enqueue an <ACE_Message_Block *> in accordance with its priority.
// The priority may be *dynamic*, *static*, or a combination of both.
// It calls the priority evaluation function passed into the Dynamic
// Message Queue constructor to update the priorities of all enqueued
// messages.
2454 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
2455 ACE_Dynamic_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::sublist_enqueue_i (ACE_Message_Block
*new_item
,
2456 const ACE_Time_Value
¤t_time
,
2457 ACE_Message_Block
*&sublist_head
,
2458 ACE_Message_Block
*&sublist_tail
,
2459 ACE_Dynamic_Message_Strategy::Priority_Status status
)
2462 ACE_Message_Block
*current_item
= 0;
2464 // Find message after which to enqueue new item, based on message
2465 // priority and priority status.
2466 for (current_item
= sublist_tail
;
2468 current_item
= current_item
->prev ())
2470 if (message_strategy_
.priority_status (*current_item
, current_time
) == status
)
2472 if (current_item
->msg_priority () >= new_item
->msg_priority ())
2477 sublist_head
= new_item
;
2482 if (current_item
== 0)
2484 // If the new message has highest priority of any, put it at the
2485 // head of the list (and sublist).
2487 new_item
->next (this->head_
);
2488 if (this->head_
!= 0)
2489 this->head_
->prev (new_item
);
2492 this->tail_
= new_item
;
2493 sublist_tail
= new_item
;
2495 this->head_
= new_item
;
2496 sublist_head
= new_item
;
2500 // insert the new item into the list
2501 new_item
->next (current_item
->next ());
2502 new_item
->prev (current_item
);
2504 if (current_item
->next ())
2505 current_item
->next ()->prev (new_item
);
2507 this->tail_
= new_item
;
2509 current_item
->next (new_item
);
2511 // If the new item has lowest priority of any in the sublist,
2512 // move the tail pointer of the sublist back to the new item
2513 if (current_item
== sublist_tail
)
2514 sublist_tail
= new_item
;
// Enqueue a message in priority order within a given priority status
// sublist.
2523 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
2524 ACE_Dynamic_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::dequeue_head_i (ACE_Message_Block
*&first_item
)
2526 ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::dequeue_head_i");
2529 int last_in_subqueue
= 0;
2531 // first, try to dequeue from the head of the pending list
2532 if (this->pending_head_
)
2534 first_item
= this->pending_head_
;
2536 if (0 == this->pending_head_
->prev ())
2537 this->head_
= this->pending_head_
->next ();
2539 this->pending_head_
->prev ()->next (this->pending_head_
->next ());
2541 if (0 == this->pending_head_
->next ())
2543 this->tail_
= this->pending_head_
->prev ();
2544 this->pending_head_
= 0;
2545 this->pending_tail_
= 0;
2549 this->pending_head_
->next ()->prev (this->pending_head_
->prev ());
2550 this->pending_head_
= this->pending_head_
->next ();
2553 first_item
->prev (0);
2554 first_item
->next (0);
2557 // Second, try to dequeue from the head of the late list
2558 else if (this->late_head_
)
2560 last_in_subqueue
= this->late_head_
== this->late_tail_
? 1 : 0;
2562 first_item
= this->late_head_
;
2564 if (0 == this->late_head_
->prev ())
2565 this->head_
= this->late_head_
->next ();
2567 this->late_head_
->prev ()->next (this->late_head_
->next ());
2569 if (0 == this->late_head_
->next ())
2570 this->tail_
= this->late_head_
->prev ();
2573 this->late_head_
->next ()->prev (this->late_head_
->prev ());
2574 this->late_head_
= this->late_head_
->next ();
2577 if (last_in_subqueue
)
2579 this->late_head_
= 0;
2580 this->late_tail_
= 0;
2583 first_item
->prev (0);
2584 first_item
->next (0);
2586 // finally, try to dequeue from the head of the beyond late list
2587 else if (this->beyond_late_head_
)
2590 (this->beyond_late_head_
== this->beyond_late_tail_
) ? 1 : 0;
2592 first_item
= this->beyond_late_head_
;
2593 this->head_
= this->beyond_late_head_
->next ();
2595 if (0 == this->beyond_late_head_
->next ())
2597 this->tail_
= this->beyond_late_head_
->prev ();
2601 this->beyond_late_head_
->next ()->prev (this->beyond_late_head_
->prev ());
2602 this->beyond_late_head_
= this->beyond_late_head_
->next ();
2605 if (last_in_subqueue
)
2607 this->beyond_late_head_
= 0;
2608 this->beyond_late_tail_
= 0;
2611 first_item
->prev (0);
2612 first_item
->next (0);
2616 // nothing to dequeue: set the pointer to zero and return an error code
2626 size_t mb_bytes
= 0;
2627 size_t mb_length
= 0;
2628 first_item
->total_size_and_length (mb_bytes
,
2630 // Subtract off all of the bytes associated with this message.
2631 this->cur_bytes_
-= mb_bytes
;
2632 this->cur_length_
-= mb_length
;
2635 // Only signal enqueueing threads if we've fallen below the low
2637 if (this->cur_bytes_
<= this->low_water_mark_
2638 && this->signal_enqueue_waiters () == -1)
2644 return ACE_Utils::truncate_cast
<int> (this->cur_count_
);
2648 // Dequeue and return the <ACE_Message_Block *> at the head of the
2649 // logical queue. Attempts first to dequeue from the pending portion
2650 // of the queue, or if that is empty from the late portion, or if that
2651 // is empty from the beyond late portion, or if that is empty just
2652 // sets the passed pointer to zero and returns -1.
2654 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
2655 ACE_Dynamic_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::refresh_queue (const ACE_Time_Value
¤t_time
)
2659 result
= refresh_pending_queue (current_time
);
2662 result
= refresh_late_queue (current_time
);
// Refresh the queue using the strategy specific priority status
// function.
2670 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
2671 ACE_Dynamic_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::refresh_pending_queue (const ACE_Time_Value
¤t_time
)
2673 ACE_Dynamic_Message_Strategy::Priority_Status current_status
;
2675 // refresh priority status boundaries in the queue
2676 if (this->pending_head_
)
2678 current_status
= message_strategy_
.priority_status (*this->pending_head_
,
2680 switch (current_status
)
2682 case ACE_Dynamic_Message_Strategy::BEYOND_LATE
:
2683 // Make sure the head of the beyond late queue is set (there
2684 // may not have been any beyond late messages previously)
2685 this->beyond_late_head_
= this->head_
;
2687 // Zero out the late queue pointers, and set them only if
2688 // there turn out to be late messages in the pending sublist
2689 this->late_head_
= 0;
2690 this->late_tail_
= 0;
2692 // Advance through the beyond late messages in the pending queue
2695 this->pending_head_
= this->pending_head_
->next ();
2697 if (this->pending_head_
)
2698 current_status
= message_strategy_
.priority_status (*this->pending_head_
,
2704 while (current_status
== ACE_Dynamic_Message_Strategy::BEYOND_LATE
);
2706 if (this->pending_head_
)
2708 // point tail of beyond late sublist to previous item
2709 this->beyond_late_tail_
= this->pending_head_
->prev ();
2711 if (current_status
== ACE_Dynamic_Message_Strategy::PENDING
)
2712 // there are no late messages left in the queue
2714 else if (current_status
!= ACE_Dynamic_Message_Strategy::LATE
)
2716 // if we got here, something is *seriously* wrong with the queue
2717 ACELIB_ERROR_RETURN ((LM_ERROR
,
2718 ACE_TEXT ("Unexpected message priority status [%d] (expected LATE)"),
2719 (int) current_status
),
2725 // There are no pending or late messages left in the
2727 this->beyond_late_tail_
= this->tail_
;
2728 this->pending_head_
= 0;
2729 this->pending_tail_
= 0;
2734 case ACE_Dynamic_Message_Strategy::LATE
:
2735 // Make sure the head of the late queue is set (there may
2736 // not have been any late messages previously, or they may
2737 // have all become beyond late).
2738 if (this->late_head_
== 0)
2739 this->late_head_
= this->pending_head_
;
2741 // advance through the beyond late messages in the pending queue
2744 this->pending_head_
= this->pending_head_
->next ();
2746 if (this->pending_head_
)
2747 current_status
= message_strategy_
.priority_status (*this->pending_head_
,
2753 while (current_status
== ACE_Dynamic_Message_Strategy::LATE
);
2755 if (this->pending_head_
)
2757 if (current_status
!= ACE_Dynamic_Message_Strategy::PENDING
)
2758 // if we got here, something is *seriously* wrong with the queue
2759 ACELIB_ERROR_RETURN((LM_ERROR
,
2760 ACE_TEXT ("Unexpected message priority status [%d] (expected PENDING)"),
2761 (int) current_status
),
2764 // Point tail of late sublist to previous item
2765 this->late_tail_
= this->pending_head_
->prev ();
2769 // there are no pending messages left in the queue
2770 this->late_tail_
= this->tail_
;
2771 this->pending_head_
= 0;
2772 this->pending_tail_
= 0;
2776 case ACE_Dynamic_Message_Strategy::PENDING
:
2777 // do nothing - the pending queue is unchanged
2780 // if we got here, something is *seriously* wrong with the queue
2781 ACELIB_ERROR_RETURN((LM_ERROR
,
2782 ACE_TEXT ("Unknown message priority status [%d]"),
2783 (int) current_status
),
// Refresh the pending queue using the strategy specific priority
// status function.
2793 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
2794 ACE_Dynamic_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::refresh_late_queue (const ACE_Time_Value
¤t_time
)
2796 ACE_Dynamic_Message_Strategy::Priority_Status current_status
;
2798 if (this->late_head_
)
2800 current_status
= message_strategy_
.priority_status (*this->late_head_
,
2802 switch (current_status
)
2804 case ACE_Dynamic_Message_Strategy::BEYOND_LATE
:
2806 // make sure the head of the beyond late queue is set
2807 // (there may not have been any beyond late messages previously)
2808 this->beyond_late_head_
= this->head_
;
2810 // advance through the beyond late messages in the late queue
2813 this->late_head_
= this->late_head_
->next ();
2815 if (this->late_head_
)
2816 current_status
= message_strategy_
.priority_status (*this->late_head_
,
2822 while (current_status
== ACE_Dynamic_Message_Strategy::BEYOND_LATE
);
2824 if (this->late_head_
)
2826 // point tail of beyond late sublist to previous item
2827 this->beyond_late_tail_
= this->late_head_
->prev ();
2829 if (current_status
== ACE_Dynamic_Message_Strategy::PENDING
)
2831 // there are no late messages left in the queue
2832 this->late_head_
= 0;
2833 this->late_tail_
= 0;
2835 else if (current_status
!= ACE_Dynamic_Message_Strategy::LATE
)
2836 // if we got here, something is *seriously* wrong with the queue
2837 ACELIB_ERROR_RETURN ((LM_ERROR
,
2838 ACE_TEXT ("Unexpected message priority status [%d] (expected LATE)"),
2839 (int) current_status
),
2844 // there are no late messages left in the queue
2845 this->beyond_late_tail_
= this->tail_
;
2846 this->late_head_
= 0;
2847 this->late_tail_
= 0;
2852 case ACE_Dynamic_Message_Strategy::LATE
:
2853 // do nothing - the late queue is unchanged
2856 case ACE_Dynamic_Message_Strategy::PENDING
:
2857 // if we got here, something is *seriously* wrong with the queue
2858 ACELIB_ERROR_RETURN ((LM_ERROR
,
2859 ACE_TEXT ("Unexpected message priority status ")
2860 ACE_TEXT ("[%d] (expected LATE or BEYOND_LATE)"),
2861 (int) current_status
),
2864 // if we got here, something is *seriously* wrong with the queue
2865 ACELIB_ERROR_RETURN ((LM_ERROR
,
2866 ACE_TEXT ("Unknown message priority status [%d]"),
2867 (int) current_status
),
// Refresh the late queue using the strategy specific priority status
// function.
2878 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
2879 ACE_Dynamic_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::peek_dequeue_head (ACE_Message_Block
*&first_item
,
2880 ACE_Time_Value
*timeout
)
2882 return ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::peek_dequeue_head (first_item
,
// Private method to hide public base class method: just calls base
// class method.
2889 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
2890 ACE_Dynamic_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::enqueue_tail (ACE_Message_Block
*new_item
,
2891 ACE_Time_Value
*timeout
)
2893 ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::enqueue_tail");
2894 return this->enqueue_prio (new_item
, timeout
);
2897 // Just call priority enqueue method: tail enqueue semantics for
2898 // dynamic message queues are unstable: the message may or may not be
2899 // where it was placed after the queue is refreshed prior to the next
2900 // enqueue or dequeue operation.
2902 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
2903 ACE_Dynamic_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>::enqueue_head (ACE_Message_Block
*new_item
,
2904 ACE_Time_Value
*timeout
)
2906 ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>::enqueue_head");
2907 return this->enqueue_prio (new_item
, timeout
);
2910 // Just call priority enqueue method: head enqueue semantics for
2911 // dynamic message queues are unstable: the message may or may not be
2912 // where it was placed after the queue is refreshed prior to the next
2913 // enqueue or dequeue operation.
2915 template <ACE_SYNCH_DECL
, class TIME_POLICY
>
2916 ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
> *
2917 ACE_Message_Queue_Factory
<ACE_SYNCH_USE
, TIME_POLICY
>::create_static_message_queue (size_t hwm
,
2919 ACE_Notification_Strategy
*ns
)
2921 typedef ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
> QUEUE_TYPE
;
2922 QUEUE_TYPE
*tmp
= 0;
2924 ACE_NEW_RETURN (tmp
,
2925 QUEUE_TYPE (hwm
, lwm
, ns
),
2930 // Factory method for a statically prioritized ACE_Message_Queue.
2932 template <ACE_SYNCH_DECL
, class TIME_POLICY
>
2933 ACE_Dynamic_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
> *
2934 ACE_Message_Queue_Factory
<ACE_SYNCH_USE
, TIME_POLICY
>::create_deadline_message_queue (size_t hwm
,
2936 ACE_Notification_Strategy
*ns
,
2937 u_long static_bit_field_mask
,
2938 u_long static_bit_field_shift
,
2939 u_long dynamic_priority_max
,
2940 u_long dynamic_priority_offset
)
2942 ACE_Deadline_Message_Strategy
*adms
= 0;
2944 ACE_NEW_RETURN (adms
,
2945 ACE_Deadline_Message_Strategy (static_bit_field_mask
,
2946 static_bit_field_shift
,
2947 dynamic_priority_max
,
2948 dynamic_priority_offset
),
2951 typedef ACE_Dynamic_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
> QUEUE_TYPE
;
2952 QUEUE_TYPE
*tmp
= 0;
2953 ACE_NEW_RETURN (tmp
,
2954 QUEUE_TYPE (*adms
, hwm
, lwm
, ns
),
2959 // Factory method for a dynamically prioritized (by time to deadline)
2960 // ACE_Dynamic_Message_Queue.
2962 template <ACE_SYNCH_DECL
, class TIME_POLICY
>
2963 ACE_Dynamic_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
> *
2964 ACE_Message_Queue_Factory
<ACE_SYNCH_USE
, TIME_POLICY
>::create_laxity_message_queue (size_t hwm
,
2966 ACE_Notification_Strategy
*ns
,
2967 u_long static_bit_field_mask
,
2968 u_long static_bit_field_shift
,
2969 u_long dynamic_priority_max
,
2970 u_long dynamic_priority_offset
)
2972 ACE_Laxity_Message_Strategy
*alms
= 0;
2974 ACE_NEW_RETURN (alms
,
2975 ACE_Laxity_Message_Strategy (static_bit_field_mask
,
2976 static_bit_field_shift
,
2977 dynamic_priority_max
,
2978 dynamic_priority_offset
),
2981 typedef ACE_Dynamic_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
> QUEUE_TYPE
;
2982 QUEUE_TYPE
*tmp
= 0;
2983 ACE_NEW_RETURN (tmp
,
2984 QUEUE_TYPE (*alms
, hwm
, lwm
, ns
),
2989 // Factory method for a dynamically prioritized (by laxity)
2990 // <ACE_Dynamic_Message_Queue>.
2992 #if defined (ACE_VXWORKS)
2993 // factory method for a wrapped VxWorks message queue
2995 template <ACE_SYNCH_DECL
, class TIME_POLICY
>
2996 ACE_Message_Queue_Vx
*
2997 ACE_Message_Queue_Factory
<ACE_SYNCH_USE
, TIME_POLICY
>::create_Vx_message_queue (size_t max_messages
,
2998 size_t max_message_length
,
2999 ACE_Notification_Strategy
*ns
)
3001 ACE_Message_Queue_Vx
*tmp
= 0;
3003 ACE_NEW_RETURN (tmp
,
3004 ACE_Message_Queue_Vx (max_messages
, max_message_length
, ns
),
3008 #endif /* defined (ACE_VXWORKS) */
3010 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO)
3012 template <ACE_SYNCH_DECL
, class TIME_POLICY
>
3013 ACE_Message_Queue_NT
*
3014 ACE_Message_Queue_Factory
<ACE_SYNCH_USE
, TIME_POLICY
>::create_NT_message_queue (size_t max_threads
)
3016 ACE_Message_Queue_NT
*tmp
= 0;
3018 ACE_NEW_RETURN (tmp
,
3019 ACE_Message_Queue_NT (max_threads
),
3024 #endif /* ACE_HAS_WIN32_OVERLAPPED_IO */
3026 ACE_END_VERSIONED_NAMESPACE_DECL
3028 #endif /* !ACE_MESSAGE_QUEUE_T_CPP */