3 //=============================================================================
5 * @file Message_Queue_T.h
7 * @author Douglas C. Schmidt <d.schmidt@vanderbilt.edu>
9 //=============================================================================
11 #ifndef ACE_MESSAGE_QUEUE_T_H
12 #define ACE_MESSAGE_QUEUE_T_H
14 #include /**/ "ace/pre.h"
16 #include "ace/Message_Queue.h"
17 #include "ace/Dynamic_Message_Strategy.h"
18 #include "ace/Synch_Traits.h"
19 #include "ace/Guard_T.h"
20 #include "ace/Time_Policy.h"
21 #include "ace/Time_Value_T.h"
22 #if defined (ACE_HAS_THREADS)
23 # include "ace/Condition_Attributes.h"
26 #if !defined (ACE_LACKS_PRAGMA_ONCE)
28 #endif /* ACE_LACKS_PRAGMA_ONCE */
30 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
32 #if defined (ACE_VXWORKS)
33 class ACE_Message_Queue_Vx
;
34 #endif /* defined (ACE_VXWORKS) */
36 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO)
37 class ACE_Message_Queue_NT
;
38 #endif /* ACE_HAS_WIN32_OVERLAPPED_IO*/
40 #if defined (ACE_HAS_MONITOR_POINTS) && ACE_HAS_MONITOR_POINTS == 1
43 namespace Monitor_Control
48 #endif /* ACE_HAS_MONITOR_POINTS==1 */
51 * @class ACE_Message_Queue
53 * @brief A message queuing facility with parameterized synchronization
54 * capability. ACE_Message_Queue is modeled after the queueing facilities
55 * in System V STREAMs.
57 * ACE_Message_Queue is the primary queuing facility for
58 * messages in the ACE framework. It's one template argument parameterizes
59 * the queue's synchronization. The argument specifies a synchronization
60 * strategy. The two main strategies available for ACE_SYNCH_DECL are:
61 * -# ACE_MT_SYNCH: all operations are thread-safe
62 * -# ACE_NULL_SYNCH: no synchronization and no locking overhead
64 * All data passing through ACE_Message_Queue is in the form of
65 * ACE_Message_Block objects. @sa ACE_Message_Block.
67 template <ACE_SYNCH_DECL
, class TIME_POLICY
= ACE_System_Time_Policy
>
68 class ACE_Message_Queue
: public ACE_Message_Queue_Base
71 friend class ACE_Message_Queue_Iterator
<ACE_SYNCH_USE
, TIME_POLICY
>;
72 friend class ACE_Message_Queue_Reverse_Iterator
<ACE_SYNCH_USE
, TIME_POLICY
>;
75 typedef ACE_Message_Queue_Iterator
<ACE_SYNCH_USE
, TIME_POLICY
>
77 typedef ACE_Message_Queue_Reverse_Iterator
<ACE_SYNCH_USE
, TIME_POLICY
>
81 * @name Initialization methods
85 * Initialize an ACE_Message_Queue.
87 * @param hwm High water mark. Determines how many bytes can be stored in a
88 * queue before it's considered full. Supplier threads must block
89 * until the queue is no longer full.
90 * @param lwm Low water mark. Determines how many bytes must be in the queue
91 * before supplier threads are allowed to enqueue additional
92 * data. By default, the @a hwm equals @a lwm, which means
93 * that suppliers will be able to enqueue new messages as soon as
94 * a consumer removes any message from the queue. Making the low
95 * water mark smaller than the high water mark forces consumers to
96 * drain more messages from the queue before suppliers can enqueue
97 * new messages, which can minimize the "silly window syndrome."
98 * @param ns Notification strategy. Pointer to an object conforming to the
99 * ACE_Notification_Strategy interface. If set, the object's
100 * notify() method will be called each time data is added to
101 * this ACE_Message_Queue. @see ACE_Reactor_Notification_Strategy.
103 ACE_Message_Queue (size_t hwm
= ACE_Message_Queue_Base::DEFAULT_HWM
,
104 size_t lwm
= ACE_Message_Queue_Base::DEFAULT_LWM
,
105 ACE_Notification_Strategy
*ns
= 0);
106 virtual int open (size_t hwm
= ACE_Message_Queue_Base::DEFAULT_HWM
,
107 size_t lwm
= ACE_Message_Queue_Base::DEFAULT_LWM
,
108 ACE_Notification_Strategy
*ns
= 0);
111 /// Releases all resources from the message queue and marks it deactivated.
114 /// @retval The number of messages released from the queue; -1 on error.
115 virtual int close ();
117 /// Releases all resources from the message queue and marks it deactivated.
118 virtual ~ACE_Message_Queue ();
121 * Releases all resources from the message queue but does not mark it
122 * deactivated. This method holds the queue lock during this operation.
125 * @return The number of messages flushed; -1 on error.
127 virtual int flush ();
130 * Release all resources from the message queue but do not mark it
133 * @pre The caller must be holding the queue lock before calling this
136 * @return The number of messages flushed.
138 virtual int flush_i ();
140 /** @name Enqueue and dequeue methods
142 * The enqueue and dequeue methods accept a timeout value passed as
143 * an ACE_Time_Value *. In all cases, if the timeout pointer is 0,
144 * the caller will block until action is possible. If the timeout pointer
145 * is non-zero, the call will wait (if needed, subject to water mark
146 * settings) until the absolute time specified in the referenced
147 * ACE_Time_Value object is reached. If the time is reached before the
148 * desired action is possible, the method will return -1 with errno set
149 * to @c EWOULDBLOCK. Regardless of the timeout setting, however,
150 * these methods will also fail and return -1 when the queue is closed,
151 * deactivated, pulsed, or when a signal occurs.
153 * See C++NPv2 Section 6.2 and APG Section 12.3 for a fuller treatment of
154 * ACE_Message_Queue, enqueueing, dequeueing, and how these operations are
155 * affected by queue state transitions.
159 * Retrieve a pointer to the first ACE_Message_Block in the queue
160 * without removing it.
162 * @note Because the block whose pointer is returned is still on the queue,
163 * another thread may dequeue the referenced block at any time,
164 * including before the calling thread examines the peeked-at block.
165 * Be very careful with this method in multithreaded queueing
168 * @param first_item Reference to an ACE_Message_Block * that will
169 * point to the first block on the queue. The block
170 * remains on the queue until this or another thread
172 * @param timeout The absolute time the caller will wait until
173 * for a block to be queued.
175 * @retval >0 The number of ACE_Message_Blocks on the queue.
176 * @retval -1 On failure. errno holds the reason. Common errno values are:
177 * - EWOULDBLOCK: the timeout elapsed
178 * - ESHUTDOWN: the queue was deactivated or pulsed
180 virtual int peek_dequeue_head (ACE_Message_Block
*&first_item
,
181 ACE_Time_Value
*timeout
= 0);
184 * Enqueue an ACE_Message_Block into the queue in accordance with
185 * the ACE_Message_Block's priority (0 is lowest priority). FIFO
186 * order is maintained when messages of the same priority are
187 * inserted consecutively.
189 * @param new_item Pointer to an ACE_Message_Block that will be
190 * added to the queue. The block's @c msg_priority()
191 * method will be called to obtain the queueing priority.
192 * @param timeout The absolute time the caller will wait until
193 * for the block to be queued.
195 * @retval >0 The number of ACE_Message_Blocks on the queue after adding
196 * the specified block.
197 * @retval -1 On failure. errno holds the reason. Common errno values are:
198 * - EWOULDBLOCK: the timeout elapsed
199 * - ESHUTDOWN: the queue was deactivated or pulsed
201 virtual int enqueue_prio (ACE_Message_Block
*new_item
,
202 ACE_Time_Value
*timeout
= 0);
205 * Enqueue an ACE_Message_Block into the queue in accordance with the
206 * block's deadline time. FIFO order is maintained when messages of
207 * the same deadline time are inserted consecutively.
209 * @param new_item Pointer to an ACE_Message_Block that will be
210 * added to the queue. The block's @c msg_deadline_time()
211 * method will be called to obtain the relative queueing
213 * @param timeout The absolute time the caller will wait until
214 * for the block to be queued.
216 * @retval >0 The number of ACE_Message_Blocks on the queue after adding
217 * the specified block.
218 * @retval -1 On failure. errno holds the reason. Common errno values are:
219 * - EWOULDBLOCK: the timeout elapsed
220 * - ESHUTDOWN: the queue was deactivated or pulsed
222 virtual int enqueue_deadline (ACE_Message_Block
*new_item
,
223 ACE_Time_Value
*timeout
= 0);
226 * @deprecated This is an alias for enqueue_prio(). It's only here for
227 * backwards compatibility and will go away in a subsequent release.
228 * Please use enqueue_prio() instead.
230 virtual int enqueue (ACE_Message_Block
*new_item
,
231 ACE_Time_Value
*timeout
= 0);
234 * Enqueue one or more ACE_Message_Block objects at the tail of the queue.
235 * If the @a new_item @c next() pointer is non-zero, it is assumed to be the
236 * start of a series of ACE_Message_Block objects connected via their
237 * @c next() pointers. The series of blocks will be added to the queue in
238 * the same order they are passed in as.
240 * @param new_item Pointer to an ACE_Message_Block that will be
241 * added to the queue. If the block's @c next() pointer
242 * is non-zero, all blocks chained from the @c next()
243 * pointer are enqueued as well.
244 * @param timeout The absolute time the caller will wait until
245 * for the block to be queued.
247 * @retval >0 The number of ACE_Message_Blocks on the queue after adding
248 * the specified block(s).
249 * @retval -1 On failure. errno holds the reason. Common errno values are:
250 * - EWOULDBLOCK: the timeout elapsed
251 * - ESHUTDOWN: the queue was deactivated or pulsed
253 virtual int enqueue_tail (ACE_Message_Block
*new_item
,
254 ACE_Time_Value
*timeout
= 0);
257 * Enqueue one or more ACE_Message_Block objects at the head of the queue.
258 * If the @a new_item @c next() pointer is non-zero, it is assumed to be the
259 * start of a series of ACE_Message_Block objects connected via their
260 * @c next() pointers. The series of blocks will be added to the queue in
261 * the same order they are passed in as.
263 * @param new_item Pointer to an ACE_Message_Block that will be
264 * added to the queue. If the block's @c next() pointer
265 * is non-zero, all blocks chained from the @c next()
266 * pointer are enqueued as well.
267 * @param timeout The absolute time the caller will wait until
268 * for the block to be queued.
270 * @retval >0 The number of ACE_Message_Blocks on the queue after adding
271 * the specified block(s).
272 * @retval -1 On failure. errno holds the reason. Common errno values are:
273 * - EWOULDBLOCK: the timeout elapsed
274 * - ESHUTDOWN: the queue was deactivated or pulsed
276 virtual int enqueue_head (ACE_Message_Block
*new_item
,
277 ACE_Time_Value
*timeout
= 0);
279 /// This method is an alias for the dequeue_head() method.
280 virtual int dequeue (ACE_Message_Block
*&first_item
,
281 ACE_Time_Value
*timeout
= 0);
284 * Dequeue the ACE_Message_Block at the head of the queue and return
285 * a pointer to the dequeued block.
287 * @param first_item Reference to an ACE_Message_Block * that will
288 * be set to the address of the dequeued block.
289 * @param timeout The absolute time the caller will wait until
290 * for a block to be dequeued.
292 * @retval >=0 The number of ACE_Message_Blocks remaining in the queue.
293 * @retval -1 On failure. errno holds the reason. Common errno values are:
294 * - EWOULDBLOCK: the timeout elapsed
295 * - ESHUTDOWN: the queue was deactivated or pulsed
297 virtual int dequeue_head (ACE_Message_Block
*&first_item
,
298 ACE_Time_Value
*timeout
= 0);
301 * Dequeue the ACE_Message_Block that has the lowest priority (preserves
302 * FIFO order for messages with the same priority) and return a pointer
303 * to the dequeued block.
305 * @param first_item Reference to an ACE_Message_Block * that will
306 * be set to the address of the dequeued block.
307 * @param timeout The absolute time the caller will wait until
308 * for a block to be dequeued.
310 * @retval >=0 The number of ACE_Message_Blocks remaining in the queue.
311 * @retval -1 On failure. errno holds the reason. Common errno values are:
312 * - EWOULDBLOCK: the timeout elapsed
313 * - ESHUTDOWN: the queue was deactivated or pulsed
315 virtual int dequeue_prio (ACE_Message_Block
*&first_item
,
316 ACE_Time_Value
*timeout
= 0);
319 * Dequeue the ACE_Message_Block at the tail of the queue and return
320 * a pointer to the dequeued block.
322 * @param dequeued Reference to an ACE_Message_Block * that will
323 * be set to the address of the dequeued block.
324 * @param timeout The absolute time the caller will wait until
325 * for a block to be dequeued.
327 * @retval >=0 The number of ACE_Message_Blocks remaining in the queue.
328 * @retval -1 On failure. errno holds the reason. Common errno values are:
329 * - EWOULDBLOCK: the timeout elapsed
330 * - ESHUTDOWN: the queue was deactivated or pulsed
332 virtual int dequeue_tail (ACE_Message_Block
*&dequeued
,
333 ACE_Time_Value
*timeout
= 0);
336 * Dequeue the ACE_Message_Block with the earliest deadline time and return
337 * a pointer to the dequeued block.
339 * @param dequeued Reference to an ACE_Message_Block * that will
340 * be set to the address of the dequeued block.
341 * @param timeout The absolute time the caller will wait until
342 * for a block to be dequeued.
344 * @retval >=0 The number of ACE_Message_Blocks remaining in the queue.
345 * @retval -1 On failure. errno holds the reason. Common errno values are:
346 * - EWOULDBLOCK: the timeout elapsed
347 * - ESHUTDOWN: the queue was deactivated or pulsed
349 virtual int dequeue_deadline (ACE_Message_Block
*&dequeued
,
350 ACE_Time_Value
*timeout
= 0);
353 /** @name Queue statistics methods
356 /// True if queue is full, else false.
357 virtual bool is_full ();
358 /// True if queue is empty, else false.
359 virtual bool is_empty ();
362 * Number of total bytes on the queue, i.e., sum of the message
365 virtual size_t message_bytes ();
368 * Number of total length on the queue, i.e., sum of the message
371 virtual size_t message_length ();
374 * Number of total messages on the queue.
376 virtual size_t message_count ();
378 // = Manual changes to these stats (used when queued message blocks
379 // change size or lengths).
381 * New value of the number of total bytes on the queue, i.e., sum of
382 * the message block sizes.
384 virtual void message_bytes (size_t new_size
);
386 * New value of the number of total length on the queue, i.e., sum
387 * of the message block lengths.
389 virtual void message_length (size_t new_length
);
394 /** @name Water mark (flow control) methods
398 * Get high watermark.
400 virtual size_t high_water_mark ();
402 * Set the high watermark, which determines how many bytes can be
403 * stored in a queue before it's considered "full."
405 virtual void high_water_mark (size_t hwm
);
410 virtual size_t low_water_mark ();
412 * Set the low watermark, which determines how many bytes must be in
413 * the queue before supplier threads are allowed to enqueue
414 * additional ACE_Message_Blocks.
416 virtual void low_water_mark (size_t lwm
);
419 /** @name Activation and queue state methods
420 * See C++NPv2 Section 6.2 and APG Section 12.3 for a fuller treatment of
421 * queue states and transitions and how the transitions affect message
422 * enqueueing and dequeueing operations.
426 * Deactivate the queue and wakeup all threads waiting on the queue
427 * so they can continue. No messages are removed from the queue,
428 * however. Any other operations called until the queue is
429 * activated again will immediately return -1 with @c errno ==
430 * ESHUTDOWN. Returns WAS_INACTIVE if queue was inactive before the
431 * call and WAS_ACTIVE if queue was active before the call.
433 virtual int deactivate ();
436 * Reactivate the queue so that threads can enqueue and dequeue
437 * messages again. Returns the state of the queue before the call.
439 virtual int activate ();
442 * Pulse the queue to wake up any waiting threads. Changes the
443 * queue state to PULSED; future enqueue/dequeue operations proceed
444 * as in ACTIVATED state.
446 * @return The queue's state before this call.
448 virtual int pulse ();
450 /// Returns the current state of the queue, which can be one of
451 /// ACTIVATED, DEACTIVATED, or PULSED.
452 virtual int state ();
454 /// Returns true if the state of the queue is <DEACTIVATED>,
455 /// but false if the queue's is <ACTIVATED> or <PULSED>.
456 virtual int deactivated ();
459 /** @name Notification strategy methods
463 * This hook is automatically invoked by <enqueue_head>,
464 * <enqueue_tail>, and <enqueue_prio> when a new item is inserted
465 * into the queue. Subclasses can override this method to perform
466 * specific notification strategies (e.g., signaling events for a
467 * <WFMO_Reactor>, notifying a <Reactor>, etc.). In a
468 * multi-threaded application with concurrent consumers, there is no
469 * guarantee that the queue will be still be non-empty by the time
470 * the notification occurs.
472 virtual int notify ();
474 /// Get the notification strategy for the <Message_Queue>
475 virtual ACE_Notification_Strategy
*notification_strategy ();
477 /// Set the notification strategy for the <Message_Queue>
478 virtual void notification_strategy (ACE_Notification_Strategy
*s
);
481 /// Returns a reference to the lock used by the ACE_Message_Queue.
482 virtual ACE_SYNCH_MUTEX_T
&lock ();
484 /// Get the current time of day according to the queue's TIME_POLICY.
485 /// Allows users to initialize timeout values using correct time policy.
486 ACE_Time_Value_T
<TIME_POLICY
> gettimeofday () const;
488 /// Allows applications to control how the timer queue gets the time
490 void set_time_policy (TIME_POLICY
const & time_policy
);
492 /// Dump the state of an object.
493 virtual void dump () const;
495 /// Declare the dynamic allocation hooks.
496 ACE_ALLOC_HOOK_DECLARE
;
499 // = Routines that actually do the enqueueing and dequeueing.
501 // These routines assume that locks are held by the corresponding
502 // public methods. Since they are virtual, you can change the
503 // queueing mechanism by subclassing from ACE_Message_Queue.
505 /// Enqueue an <ACE_Message_Block *> in accordance with its priority.
506 virtual int enqueue_i (ACE_Message_Block
*new_item
);
508 /// Enqueue an <ACE_Message_Block *> in accordance with its deadline time.
509 virtual int enqueue_deadline_i (ACE_Message_Block
*new_item
);
511 /// Enqueue an <ACE_Message_Block *> at the end of the queue.
512 virtual int enqueue_tail_i (ACE_Message_Block
*new_item
);
514 /// Enqueue an <ACE_Message_Block *> at the head of the queue.
515 virtual int enqueue_head_i (ACE_Message_Block
*new_item
);
517 /// Dequeue and return the <ACE_Message_Block *> at the head of the
519 virtual int dequeue_head_i (ACE_Message_Block
*&first_item
);
521 /// Dequeue and return the <ACE_Message_Block *> with the lowest
523 virtual int dequeue_prio_i (ACE_Message_Block
*&dequeued
);
525 /// Dequeue and return the <ACE_Message_Block *> at the tail of the
527 virtual int dequeue_tail_i (ACE_Message_Block
*&first_item
);
529 /// Dequeue and return the <ACE_Message_Block *> with the lowest
531 virtual int dequeue_deadline_i (ACE_Message_Block
*&first_item
);
533 // = Check the boundary conditions (assumes locks are held).
535 /// True if queue is full, else false.
536 virtual bool is_full_i ();
538 /// True if queue is empty, else false.
539 virtual bool is_empty_i ();
541 // = Implementation of the public <activate> and <deactivate> methods.
543 // These methods assume locks are held.
546 * Notifies all waiting threads that the queue has been deactivated
547 * so they can wakeup and continue other processing.
548 * No messages are removed from the queue.
550 * @param pulse If 0, the queue's state is changed to DEACTIVATED
551 * and any other operations called until the queue is
552 * reactivated will immediately return -1 with
553 * errno == ESHUTDOWN.
554 * If not zero, only the waiting threads are notified and
555 * the queue's state changes to PULSED.
557 * @return The state of the queue before the call.
559 virtual int deactivate_i (int pulse
= 0);
561 /// Activate the queue.
562 virtual int activate_i ();
564 // = Helper methods to factor out common #ifdef code.
566 /// Wait for the queue to become non-full.
567 virtual int wait_not_full_cond (ACE_Time_Value
*timeout
);
569 /// Wait for the queue to become non-empty.
570 virtual int wait_not_empty_cond (ACE_Time_Value
*timeout
);
572 /// Inform any threads waiting to enqueue that they can procede.
573 virtual int signal_enqueue_waiters ();
575 /// Inform any threads waiting to dequeue that they can procede.
576 virtual int signal_dequeue_waiters ();
578 /// Pointer to head of ACE_Message_Block list.
579 ACE_Message_Block
*head_
;
581 /// Pointer to tail of ACE_Message_Block list.
582 ACE_Message_Block
*tail_
;
584 /// Lowest number before unblocking occurs.
585 size_t low_water_mark_
;
587 /// Greatest number of bytes before blocking.
588 size_t high_water_mark_
;
590 /// Current number of bytes in the queue.
593 /// Current length of messages in the queue.
596 /// Current number of messages in the queue.
599 /// The notification strategy used when a new message is enqueued.
600 ACE_Notification_Strategy
*notification_strategy_
;
602 // = Synchronization primitives for controlling concurrent access.
603 /// Protect queue from concurrent access.
604 ACE_SYNCH_MUTEX_T lock_
;
606 #if defined (ACE_HAS_THREADS)
607 /// Attributes to initialize conditions with.
608 /* We only need this because some crappy compilers can't
609 properly handle initializing the conditions with
610 temporary objects. */
611 ACE_Condition_Attributes_T
<TIME_POLICY
> cond_attr_
;
614 /// Used to make threads sleep until the queue is no longer empty.
615 ACE_SYNCH_CONDITION_T not_empty_cond_
;
617 /// Used to make threads sleep until the queue is no longer full.
618 ACE_SYNCH_CONDITION_T not_full_cond_
;
620 /// The policy to return the current time of day
621 TIME_POLICY time_policy_
;
623 /// Sends the size of the queue whenever it changes.
624 #if defined (ACE_HAS_MONITOR_POINTS) && (ACE_HAS_MONITOR_POINTS == 1)
625 ACE::Monitor_Control::Size_Monitor
*monitor_
;
629 // = Disallow these operations.
630 void operator= (const ACE_Message_Queue
<ACE_SYNCH_USE
> &) = delete;
631 ACE_Message_Queue (const ACE_Message_Queue
<ACE_SYNCH_USE
> &) = delete;
634 // This typedef is used to get around a compiler bug in g++/vxworks.
635 typedef ACE_Message_Queue
<ACE_SYNCH
> ACE_DEFAULT_MESSAGE_QUEUE_TYPE
;
639 * @class ACE_Message_Queue_Iterator
641 * @brief Iterator for the ACE_Message_Queue.
643 template <ACE_SYNCH_DECL
, class TIME_POLICY
= ACE_System_Time_Policy
>
644 class ACE_Message_Queue_Iterator
647 ACE_Message_Queue_Iterator (ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
> &queue
);
649 // = Iteration methods.
650 /// Pass back the @a entry that hasn't been seen in the queue.
651 /// Returns 0 when all items have been seen, else 1.
652 int next (ACE_Message_Block
*&entry
);
654 /// Returns 1 when all items have been seen, else 0.
657 /// Move forward by one element in the queue. Returns 0 when all the
658 /// items in the set have been seen, else 1.
661 /// Dump the state of an object.
664 /// Declare the dynamic allocation hooks.
665 ACE_ALLOC_HOOK_DECLARE
;
668 /// Message_Queue we are iterating over.
669 ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
> &queue_
;
672 /// Keeps track of how far we've advanced...
673 ACE_Message_Block
*curr_
;
677 * @class ACE_Message_Queue_Reverse_Iterator
679 * @brief Reverse Iterator for the ACE_Message_Queue.
681 template <ACE_SYNCH_DECL
, class TIME_POLICY
= ACE_System_Time_Policy
>
682 class ACE_Message_Queue_Reverse_Iterator
685 ACE_Message_Queue_Reverse_Iterator (ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
> &queue
);
687 // = Iteration methods.
688 /// Pass back the @a entry that hasn't been seen in the queue.
689 /// Returns 0 when all items have been seen, else 1.
690 int next (ACE_Message_Block
*&entry
);
692 /// Returns 1 when all items have been seen, else 0.
695 /// Move forward by one element in the queue. Returns 0 when all the
696 /// items in the set have been seen, else 1.
699 /// Dump the state of an object.
702 /// Declare the dynamic allocation hooks.
703 ACE_ALLOC_HOOK_DECLARE
;
706 /// Message_Queue we are iterating over.
707 ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
> &queue_
;
710 /// Keeps track of how far we've advanced...
711 ACE_Message_Block
*curr_
;
715 * @class ACE_Dynamic_Message_Queue
717 * @brief A derived class which adapts the ACE_Message_Queue
718 * class in order to maintain dynamic priorities for enqueued
719 * <ACE_Message_Blocks> and manage the queue order according
720 * to these dynamic priorities.
722 * The messages in the queue are managed so as to preserve
723 * a logical ordering with minimal overhead per enqueue and
724 * dequeue operation. For this reason, the actual order of
725 * messages in the linked list of the queue may differ from
726 * their priority order. As time passes, a message may change
727 * from pending status to late status, and eventually to beyond
728 * late status. To minimize reordering overhead under this
729 * design force, three separate boundaries are maintained
730 * within the linked list of messages. Messages are dequeued
731 * preferentially from the head of the pending portion, then
732 * the head of the late portion, and finally from the head
733 * of the beyond late portion. In this way, only the boundaries
734 * need to be maintained (which can be done efficiently, as
735 * aging messages maintain the same linked list order as they
736 * progress from one status to the next), with no reordering
737 * of the messages themselves, while providing correct priority
738 * ordered dequeueing semantics.
739 * Head and tail enqueue methods inherited from ACE_Message_Queue
740 * are made private to prevent out-of-order messages from confusing
741 * management of the various portions of the queue. Messages in
742 * the pending portion of the queue whose priority becomes late
743 * (according to the specific dynamic strategy) advance into
744 * the late portion of the queue. Messages in the late portion
745 * of the queue whose priority becomes later than can be represented
746 * advance to the beyond_late portion of the queue. These behaviors
747 * support a limited schedule overrun, with pending messages prioritized
748 * ahead of late messages, and late messages ahead of beyond late
749 * messages. These behaviors can be modified in derived classes by
750 * providing alternative definitions for the appropriate virtual methods.
751 * When filled with messages, the queue's linked list should look like:
754 * B - B - B - B - L - L - L - P - P - P - P - P
757 * Where the symbols are as follows:
758 * H = Head of the entire list
759 * T = Tail of the entire list
760 * B = Beyond late message
761 * BH = Beyond late messages Head
762 * BT = Beyond late messages Tail
764 * LH = Late messages Head
765 * LT = Late messages Tail
766 * P = Pending message
767 * PH = Pending messages Head
768 * PT = Pending messages Tail
769 * Caveat: the virtual methods enqueue_tail, enqueue_head,
770 * and peek_dequeue_head have semantics for the static
771 * message queues that cannot be guaranteed for dynamic
772 * message queues. The peek_dequeue_head method just
773 * calls the base class method, while the two enqueue
774 * methods call the priority enqueue method. The
775 * order of messages in the dynamic queue is a function
776 * of message deadlines and how long they are in the
777 * queues. You can manipulate these in some cases to
778 * ensure the correct semantics, but that is not a
779 * very stable or portable approach (discouraged).
781 template <ACE_SYNCH_DECL
, class TIME_POLICY
= ACE_System_Time_Policy
>
782 class ACE_Dynamic_Message_Queue
: public ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
>
785 ACE_Dynamic_Message_Queue (ACE_Dynamic_Message_Strategy
& message_strategy
,
786 size_t hwm
= ACE_Message_Queue_Base::DEFAULT_HWM
,
787 size_t lwm
= ACE_Message_Queue_Base::DEFAULT_LWM
,
788 ACE_Notification_Strategy
* = 0);
790 /// Close down the message queue and release all resources.
791 virtual ~ACE_Dynamic_Message_Queue ();
794 * Detach all messages with status given in the passed flags from
795 * the queue and return them by setting passed head and tail pointers
796 * to the linked list they comprise. This method is intended primarily
797 * as a means of periodically harvesting messages that have missed
798 * their deadlines, but is available in its most general form. All
799 * messages are returned in priority order, from head to tail, as of
800 * the time this method was called.
802 virtual int remove_messages (ACE_Message_Block
*&list_head
,
803 ACE_Message_Block
*&list_tail
,
807 * Dequeue and return the <ACE_Message_Block *> at the head of the
808 * queue. Returns -1 on failure, else the number of items still on
811 virtual int dequeue_head (ACE_Message_Block
*&first_item
,
812 ACE_Time_Value
*timeout
= 0);
814 /// Dump the state of the queue.
815 virtual void dump () const;
818 * Just call priority enqueue method: tail enqueue semantics for dynamic
819 * message queues are unstable: the message may or may not be where
820 * it was placed after the queue is refreshed prior to the next
821 * enqueue or dequeue operation.
823 virtual int enqueue_tail (ACE_Message_Block
*new_item
,
824 ACE_Time_Value
*timeout
= 0);
827 * Just call priority enqueue method: head enqueue semantics for dynamic
828 * message queues are unstable: the message may or may not be where
829 * it was placed after the queue is refreshed prior to the next
830 * enqueue or dequeue operation.
832 virtual int enqueue_head (ACE_Message_Block
*new_item
,
833 ACE_Time_Value
*timeout
= 0);
836 /// Declare the dynamic allocation hooks.
837 ACE_ALLOC_HOOK_DECLARE
;
841 * Enqueue an <ACE_Message_Block *> in accordance with its priority.
842 * priority may be *dynamic* or *static* or a combination or *both*
843 * It calls the priority evaluation function passed into the Dynamic
844 * Message Queue constructor to update the priorities of all
847 virtual int enqueue_i (ACE_Message_Block
*new_item
);
849 /// Enqueue a message in priority order within a given priority status sublist
850 virtual int sublist_enqueue_i (ACE_Message_Block
*new_item
,
851 const ACE_Time_Value
¤t_time
,
852 ACE_Message_Block
*&sublist_head
,
853 ACE_Message_Block
*&sublist_tail
,
854 ACE_Dynamic_Message_Strategy::Priority_Status status
);
857 * Dequeue and return the <ACE_Message_Block *> at the head of the
858 * logical queue. Attempts first to dequeue from the pending
859 * portion of the queue, or if that is empty from the late portion,
860 * or if that is empty from the beyond late portion, or if that is
861 * empty just sets the passed pointer to zero and returns -1.
863 virtual int dequeue_head_i (ACE_Message_Block
*&first_item
);
865 /// Refresh the queue using the strategy
866 /// specific priority status function.
867 virtual int refresh_queue (const ACE_Time_Value
& current_time
);
869 /// Refresh the pending queue using the strategy
870 /// specific priority status function.
871 virtual int refresh_pending_queue (const ACE_Time_Value
& current_time
);
873 /// Refresh the late queue using the strategy
874 /// specific priority status function.
875 virtual int refresh_late_queue (const ACE_Time_Value
& current_time
);
877 /// Pointer to head of the pending messages
878 ACE_Message_Block
*pending_head_
;
880 /// Pointer to tail of the pending messages
881 ACE_Message_Block
*pending_tail_
;
883 /// Pointer to head of the late messages
884 ACE_Message_Block
*late_head_
;
886 /// Pointer to tail of the late messages
887 ACE_Message_Block
*late_tail_
;
889 /// Pointer to head of the beyond late messages
890 ACE_Message_Block
*beyond_late_head_
;
892 /// Pointer to tail of the beyond late messages
893 ACE_Message_Block
*beyond_late_tail_
;
895 /// Pointer to a dynamic priority evaluation function.
896 ACE_Dynamic_Message_Strategy
&message_strategy_
;
899 // = Disallow public access to these operations.
901 void operator= (const ACE_Dynamic_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
> &) = delete;
902 ACE_Dynamic_Message_Queue (const ACE_Dynamic_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
> &) = delete;
904 // provide definitions for these (just call base class method),
905 // but make them private so they're not accessible outside the class
907 /// Private method to hide public base class method: just calls base class method
908 virtual int peek_dequeue_head (ACE_Message_Block
*&first_item
,
909 ACE_Time_Value
*timeout
= 0);
913 * @class ACE_Message_Queue_Factory
915 * @brief ACE_Message_Queue_Factory is a static factory class template which
916 * provides a separate factory method for each of the major kinds of
917 * priority based message dispatching: static, earliest deadline first
918 * (EDF), and minimum laxity first (MLF).
920 * The ACE_Dynamic_Message_Queue class assumes responsibility for
921 * releasing the resources of the strategy with which it was
922 * constructed: the user of a message queue constructed by
923 * any of these factory methods is only responsible for
924 * ensuring destruction of the message queue itself.
926 template <ACE_SYNCH_DECL
, class TIME_POLICY
= ACE_System_Time_Policy
>
927 class ACE_Message_Queue_Factory
930 /// Factory method for a statically prioritized ACE_Message_Queue
931 static ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
> *
932 create_static_message_queue (size_t hwm
= ACE_Message_Queue_Base::DEFAULT_HWM
,
933 size_t lwm
= ACE_Message_Queue_Base::DEFAULT_LWM
,
934 ACE_Notification_Strategy
* = 0);
936 /// Factory method for a dynamically prioritized (by time to deadline) ACE_Dynamic_Message_Queue
937 static ACE_Dynamic_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
> *
938 create_deadline_message_queue (size_t hwm
= ACE_Message_Queue_Base::DEFAULT_HWM
,
939 size_t lwm
= ACE_Message_Queue_Base::DEFAULT_LWM
,
940 ACE_Notification_Strategy
* = 0,
941 u_long static_bit_field_mask
= 0x3FFUL
, // 2^(10) - 1
942 u_long static_bit_field_shift
= 10, // 10 low order bits
943 u_long dynamic_priority_max
= 0x3FFFFFUL
, // 2^(22)-1
944 u_long dynamic_priority_offset
= 0x200000UL
); // 2^(22-1)
946 /// Factory method for a dynamically prioritized (by laxity) ACE_Dynamic_Message_Queue
947 static ACE_Dynamic_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
> *
948 create_laxity_message_queue (size_t hwm
= ACE_Message_Queue_Base::DEFAULT_HWM
,
949 size_t lwm
= ACE_Message_Queue_Base::DEFAULT_LWM
,
950 ACE_Notification_Strategy
* = 0,
951 u_long static_bit_field_mask
= 0x3FFUL
, // 2^(10) - 1
952 u_long static_bit_field_shift
= 10, // 10 low order bits
953 u_long dynamic_priority_max
= 0x3FFFFFUL
, // 2^(22)-1
954 u_long dynamic_priority_offset
= 0x200000UL
); // 2^(22-1)
957 #if defined (ACE_VXWORKS)
959 /// Factory method for a wrapped VxWorks message queue
960 static ACE_Message_Queue_Vx
*
961 create_Vx_message_queue (size_t max_messages
, size_t max_message_length
,
962 ACE_Notification_Strategy
*ns
= 0);
964 #endif /* defined (ACE_VXWORKS) */
966 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO)
968 /// Factory method for a NT message queue.
969 static ACE_Message_Queue_NT
*
970 create_NT_message_queue (size_t max_threads
);
972 #endif /* ACE_HAS_WIN32_OVERLAPPED_IO */
976 template <class ACE_MESSAGE_TYPE
, ACE_SYNCH_DECL
, class TIME_POLICY
> class ACE_Message_Queue_Ex_Iterator
;
977 template <class ACE_MESSAGE_TYPE
, ACE_SYNCH_DECL
, class TIME_POLICY
> class ACE_Message_Queue_Ex_Reverse_Iterator
;
980 * @class ACE_Message_Queue_Ex
982 * @brief A threaded message queueing facility, modeled after the
983 * queueing facilities in System V STREAMs.
985 * ACE_Message_Queue_Ex is a strongly-typed version of the
986 * ACE_Message_Queue class. Rather than queueing in terms of ACE_Message_Block
987 * objects, ACE_Message_Queue_Ex has a template argument to specify the
988 * type of objects that are queued.
990 * The second template argument parameterizes the queue's synchronization.
991 * The argument specifies a synchronization strategy. The two main
992 * strategies available for ACE_SYNCH_DECL are:
993 * -# ACE_MT_SYNCH: all operations are thread-safe
994 * -# ACE_NULL_SYNCH: no synchronization and no locking overhead
996 template <class ACE_MESSAGE_TYPE
, ACE_SYNCH_DECL
, class TIME_POLICY
= ACE_System_Time_Policy
>
997 class ACE_Message_Queue_Ex
1002 /// Default priority value. This is the lowest priority.
1003 DEFAULT_PRIORITY
= 0
1006 friend class ACE_Message_Queue_Ex_Iterator
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
>;
1007 friend class ACE_Message_Queue_Ex_Reverse_Iterator
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
>;
1010 typedef ACE_Message_Queue_Ex_Iterator
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
>
1012 typedef ACE_Message_Queue_Ex_Reverse_Iterator
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
>
1016 * @name Initialization methods
1020 * Initialize an ACE_Message_Queue_Ex.
1022 * @param high_water_mark High water mark. Determines how many bytes can be
1023 * stored in a queue before it's considered full. Supplier threads
1024 * must block until the queue is no longer full.
1025 * @param low_water_mark Low water mark. Determines how many bytes must be in
1026 * the queue before supplier threads are allowed to enqueue additional
1027 * data. By default, the @a hwm equals @a lwm, which means
1028 * that suppliers will be able to enqueue new messages as soon as
1029 * a consumer removes any message from the queue. Making the low
1030 * water mark smaller than the high water mark forces consumers to
1031 * drain more messages from the queue before suppliers can enqueue
1032 * new messages, which can minimize the "silly window syndrome."
1033 * @param ns Notification strategy. Pointer to an object conforming to the
1034 * ACE_Notification_Strategy interface. If set, the object's
1035 * notify() method will be called each time data is added to
1036 * this ACE_Message_Queue. @see ACE_Reactor_Notification_Strategy.
1038 ACE_Message_Queue_Ex (size_t high_water_mark
= ACE_Message_Queue_Base::DEFAULT_HWM
,
1039 size_t low_water_mark
= ACE_Message_Queue_Base::DEFAULT_LWM
,
1040 ACE_Notification_Strategy
* ns
= 0);
1041 virtual int open (size_t hwm
= ACE_Message_Queue_Base::DEFAULT_HWM
,
1042 size_t lwm
= ACE_Message_Queue_Base::DEFAULT_LWM
,
1043 ACE_Notification_Strategy
* = 0);
1046 /// Releases all resources from the message queue and marks it deactivated.
1049 /// @retval The number of messages released from the queue; -1 on error.
1050 virtual int close ();
1052 /// Releases all resources from the message queue and marks it deactivated.
1053 virtual ~ACE_Message_Queue_Ex ();
1056 * Releases all resources from the message queue but does not mark it
1057 * deactivated. This method holds the queue lock during this operation.
1060 * @return The number of messages flushed; -1 on error.
1062 virtual int flush ();
1065 * Release all resources from the message queue but do not mark it
1068 * @pre The caller must be holding the queue lock before calling this
1071 * @return The number of messages flushed.
1073 virtual int flush_i ();
1075 /** @name Enqueue and dequeue methods
1077 * The enqueue and dequeue methods accept a timeout value passed as
1078 * an ACE_Time_Value *. In all cases, if the timeout pointer is 0,
1079 * the caller will block until action is possible. If the timeout pointer
1080 * is non-zero, the call will wait (if needed, subject to water mark
1081 * settings) until the absolute time specified in the referenced
1082 * ACE_Time_Value object is reached. If the time is reached before the
1083 * desired action is possible, the method will return -1 with errno set
1084 * to @c EWOULDBLOCK. Regardless of the timeout setting, however,
1085 * these methods will also fail and return -1 when the queue is closed,
1086 * deactivated, pulsed, or when a signal occurs.
1088 * The time parameters are handled the same as in ACE_Message_Queue, so
1089 * you can see C++NPv2 Section 6.2 and APG Section 12.3 for a fuller
1090 * treatment of ACE_Message_Queue, enqueueing, dequeueing, and how these
1091 * operations are affected by queue state transitions.
1095 * Retrieve a pointer to the first item in the queue without removing it.
1097 * @note Because the item whose pointer is returned is still on the queue,
1098 * another thread may dequeue that item at any time,
1099 * including before the calling thread examines the peeked-at item.
1100 * Be very careful with this method in multithreaded queueing
1103 * @param first_item Reference to an ACE_MESSAGE_TYPE * that will
1104 * point to the first item on the queue. The item
1105 * remains on the queue until this or another thread
1107 * @param timeout The absolute time the caller will wait until
1108 * for an item to be queued.
1110 * @retval >0 The number of items on the queue.
1111 * @retval -1 On failure. errno holds the reason. Common errno values are:
1112 * - EWOULDBLOCK: the timeout elapsed
1113 * - ESHUTDOWN: the queue was deactivated or pulsed
1115 virtual int peek_dequeue_head (ACE_MESSAGE_TYPE
*&first_item
,
1116 ACE_Time_Value
*timeout
= 0);
1119 * Enqueue an ACE_MESSAGE TYPE into the queue in accordance with
1120 * the specified priority (0 is lowest priority). FIFO
1121 * order is maintained when items of the same priority are
1122 * inserted consecutively.
1124 * @param new_item Pointer to an item that will be added to the queue.
1125 * @param timeout The absolute time the caller will wait until
1126 * for the block to be queued.
1127 * @param priority The priority to use when enqueueing the item.
1129 * @retval >0 The number of items on the queue after adding
1130 * the specified item.
1131 * @retval -1 On failure. errno holds the reason. Common errno values are:
1132 * - EWOULDBLOCK: the timeout elapsed
1133 * - ESHUTDOWN: the queue was deactivated or pulsed
1135 virtual int enqueue_prio (ACE_MESSAGE_TYPE
*new_item
,
1136 ACE_Time_Value
*timeout
= 0,
1137 unsigned long priority
= DEFAULT_PRIORITY
);
1140 * This method acts just like enqueue_tail(). There's no deadline
1141 * time associated with items.
1143 virtual int enqueue_deadline (ACE_MESSAGE_TYPE
*new_item
,
1144 ACE_Time_Value
*timeout
= 0);
1147 * @deprecated This is an alias for enqueue_prio(). It's only here for
1148 * backwards compatibility and will go away in a subsequent release.
1149 * Please use enqueue_prio() instead.
1151 virtual int enqueue (ACE_MESSAGE_TYPE
*new_item
,
1152 ACE_Time_Value
*timeout
= 0);
1155 * Enqueue an item at the tail of the queue.
1157 * @param new_item Pointer to an item that will be added to the queue.
1158 * @param timeout The absolute time the caller will wait until
1159 * for the item to be queued.
1161 * @retval >0 The number of items on the queue after adding
1162 * the specified item.
1163 * @retval -1 On failure. errno holds the reason. Common errno values are:
1164 * - EWOULDBLOCK: the timeout elapsed
1165 * - ESHUTDOWN: the queue was deactivated or pulsed
1167 virtual int enqueue_tail (ACE_MESSAGE_TYPE
*new_item
,
1168 ACE_Time_Value
*timeout
= 0);
1171 * Enqueue an item at the head of the queue.
1173 * @param new_item Pointer to an item that will be added to the queue.
1174 * @param timeout The absolute time the caller will wait until
1175 * for the item to be queued.
1177 * @retval >0 The number of items on the queue after adding
1178 * the specified item.
1179 * @retval -1 On failure. errno holds the reason. Common errno values are:
1180 * - EWOULDBLOCK: the timeout elapsed
1181 * - ESHUTDOWN: the queue was deactivated or pulsed
1183 virtual int enqueue_head (ACE_MESSAGE_TYPE
*new_item
,
1184 ACE_Time_Value
*timeout
= 0);
1186 /// This method is an alias for the following <dequeue_head> method.
1187 virtual int dequeue (ACE_MESSAGE_TYPE
*&first_item
,
1188 ACE_Time_Value
*timeout
= 0);
1191 * Dequeue the item at the head of the queue and return a pointer to it.
1193 * @param first_item Reference to an ACE_MESSAGE_TYPE * that will
1194 * be set to the address of the dequeued item.
1195 * @param timeout The absolute time the caller will wait until
1196 * for an item to be dequeued.
1198 * @retval >=0 The number of items remaining in the queue.
1199 * @retval -1 On failure. errno holds the reason. Common errno values are:
1200 * - EWOULDBLOCK: the timeout elapsed
1201 * - ESHUTDOWN: the queue was deactivated or pulsed
1203 virtual int dequeue_head (ACE_MESSAGE_TYPE
*&first_item
,
1204 ACE_Time_Value
*timeout
= 0);
1207 * Dequeue the item that has the lowest priority (preserves
1208 * FIFO order for items with the same priority) and return a pointer
1211 * @param dequeued Reference to an ACE_MESSAGE_TYPE * that will
1212 * be set to the address of the dequeued item.
1213 * @param timeout The absolute time the caller will wait until
1214 * for an item to be dequeued.
1216 * @retval >=0 The number of items remaining in the queue.
1217 * @retval -1 On failure. errno holds the reason. Common errno values are:
1218 * - EWOULDBLOCK: the timeout elapsed
1219 * - ESHUTDOWN: the queue was deactivated or pulsed
1221 virtual int dequeue_prio (ACE_MESSAGE_TYPE
*&dequeued
,
1222 ACE_Time_Value
*timeout
= 0);
1225 * Dequeue the item at the tail of the queue and return a pointer to it.
1227 * @param dequeued Reference to an ACE_MESSAGE_TYPE * that will
1228 * be set to the address of the dequeued item.
1229 * @param timeout The absolute time the caller will wait until
1230 * for an item to be dequeued.
1232 * @retval >=0 The number of items remaining in the queue.
1233 * @retval -1 On failure. errno holds the reason. Common errno values are:
1234 * - EWOULDBLOCK: the timeout elapsed
1235 * - ESHUTDOWN: the queue was deactivated or pulsed
1237 virtual int dequeue_tail (ACE_MESSAGE_TYPE
*&dequeued
,
1238 ACE_Time_Value
*timeout
= 0);
1241 * Because there's deadline associated with enqueue_deadline(), this
1242 * method will behave just as dequeue_head().
1244 virtual int dequeue_deadline (ACE_MESSAGE_TYPE
*&dequeued
,
1245 ACE_Time_Value
*timeout
= 0);
1248 /** @name Queue statistics methods
1251 /// True if queue is full, else false.
1252 virtual bool is_full ();
1254 /// True if queue is empty, else false.
1255 virtual bool is_empty ();
1258 * Number of total bytes on the queue, i.e., sum of the message
1261 virtual size_t message_bytes ();
1263 * Number of total length on the queue, i.e., sum of the message
1266 virtual size_t message_length ();
1268 * Number of total messages on the queue.
1270 virtual size_t message_count ();
1272 // = Manual changes to these stats (used when queued message blocks
1273 // change size or lengths).
1275 * New value of the number of total bytes on the queue, i.e., sum of
1276 * the message block sizes.
1278 virtual void message_bytes (size_t new_size
);
1280 * New value of the number of total length on the queue, i.e., sum
1281 * of the message block lengths.
1283 virtual void message_length (size_t new_length
);
1287 /** @name Water mark (flow control) methods
1291 * Get high watermark.
1293 virtual size_t high_water_mark ();
1295 * Set the high watermark, which determines how many bytes can be
1296 * stored in a queue before it's considered "full."
1298 virtual void high_water_mark (size_t hwm
);
1301 * Get low watermark.
1303 virtual size_t low_water_mark ();
1305 * Set the low watermark, which determines how many bytes must be in
1306 * the queue before supplier threads are allowed to enqueue
1307 * additional <ACE_MESSAGE_TYPE>s.
1309 virtual void low_water_mark (size_t lwm
);
1312 /** @name Activation and queue state methods
1313 * See C++NPv2 Section 6.2 and APG Section 12.3 for a fuller treatment of
1314 * queue states and transitions and how the transitions affect message
1315 * enqueueing and dequeueing operations.
1319 * Deactivate the queue and wakeup all threads waiting on the queue
1320 * so they can continue. No messages are removed from the queue,
1321 * however. Any other operations called until the queue is
1322 * activated again will immediately return -1 with @c errno ==
1323 * ESHUTDOWN. Returns WAS_INACTIVE if queue was inactive before the
1324 * call and WAS_ACTIVE if queue was active before the call.
1326 virtual int deactivate ();
1329 * Reactivate the queue so that threads can enqueue and dequeue
1330 * messages again. Returns the state of the queue before the call.
1332 virtual int activate ();
1335 * Pulse the queue to wake up any waiting threads. Changes the
1336 * queue state to PULSED; future enqueue/dequeue operations proceed
1337 * as in ACTIVATED state.
1339 * @retval The queue's state before this call.
1341 virtual int pulse ();
1343 /// Returns the current state of the queue, which can be one of
1344 /// ACTIVATED, DEACTIVATED, or PULSED.
1345 virtual int state ();
1347 /// Returns true if the state of the queue is DEACTIVATED,
1348 /// but false if the queue's state is ACTIVATED or PULSED.
1349 virtual int deactivated ();
1352 /** @name Notification strategy methods
1356 * This hook is automatically invoked by <enqueue_head>,
1357 * <enqueue_tail>, and <enqueue_prio> when a new item is inserted
1358 * into the queue. Subclasses can override this method to perform
1359 * specific notification strategies (e.g., signaling events for a
1360 * <WFMO_Reactor>, notifying a <Reactor>, etc.). In a
1361 * multi-threaded application with concurrent consumers, there is no
1362 * guarantee that the queue will be still be non-empty by the time
1363 * the notification occurs.
1365 virtual int notify ();
1367 /// Get the notification strategy for the <Message_Queue>
1368 virtual ACE_Notification_Strategy
*notification_strategy ();
1370 /// Set the notification strategy for the <Message_Queue>
1371 virtual void notification_strategy (ACE_Notification_Strategy
*s
);
1374 /// Returns a reference to the lock used by the ACE_Message_Queue_Ex.
1375 virtual ACE_SYNCH_MUTEX_T
&lock ();
1377 /// Get the current time of day according to the queue's TIME_POLICY.
1378 /// Allows users to initialize timeout
1379 ACE_Time_Value_T
<TIME_POLICY
> gettimeofday ();
1381 /// Allows applications to control how the timer queue gets the time
1383 void set_time_policy (TIME_POLICY
const & time_policy
);
1385 /// Dump the state of an object.
1386 virtual void dump () const;
1388 /// Support access to the underlying <Message_Queue>. Note that
1389 /// manipulating the lower level queue directly may be hazardous (, but
1390 /// necessary in some scenarios); be sure to lock the queue first.
1391 ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
> &queue ();
1393 /// Declare the dynamic allocation hooks.
1394 ACE_ALLOC_HOOK_DECLARE
;
1397 /// Implement this via an ACE_Message_Queue.
1398 ACE_Message_Queue
<ACE_SYNCH_USE
, TIME_POLICY
> queue_
;
1402 * @class ACE_Message_Queue_Ex_Iterator
1404 * @brief Iterator for the ACE_Message_Queue_Ex.
1406 template <class ACE_MESSAGE_TYPE
, ACE_SYNCH_DECL
, class TIME_POLICY
= ACE_System_Time_Policy
>
1407 class ACE_Message_Queue_Ex_Iterator
1410 ACE_Message_Queue_Ex_Iterator (ACE_Message_Queue_Ex
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
> & queue
);
1412 // = Iteration methods.
1413 /// Pass back the @a entry that hasn't been seen in the queue.
1414 /// Returns 0 when all items have been seen, else 1.
1415 int next (ACE_MESSAGE_TYPE
*&entry
);
1417 /// Returns 1 when all items have been seen, else 0.
1420 /// Move forward by one element in the queue. Returns 0 when all the
1421 /// items in the set have been seen, else 1.
1424 /// Dump the state of an object.
1427 /// Declare the dynamic allocation hooks.
1428 ACE_ALLOC_HOOK_DECLARE
;
1431 /// Implement this via the ACE_Message_Queue_Iterator
1432 ACE_Message_Queue_Iterator
<ACE_SYNCH_USE
, TIME_POLICY
> iter_
;
1436 * @class ACE_Message_Queue_Ex_Iterator
1438 * @brief Reverse iterator for the ACE_Message_Queue_Ex.
1440 template <class ACE_MESSAGE_TYPE
, ACE_SYNCH_DECL
, class TIME_POLICY
= ACE_System_Time_Policy
>
1441 class ACE_Message_Queue_Ex_Reverse_Iterator
1444 ACE_Message_Queue_Ex_Reverse_Iterator (ACE_Message_Queue_Ex
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
> & queue
);
1446 // = Iteration methods.
1447 /// Pass back the @a entry that hasn't been seen in the queue.
1448 /// Returns 0 when all items have been seen, else 1.
1449 int next (ACE_MESSAGE_TYPE
*&entry
);
1451 /// Returns 1 when all items have been seen, else 0.
1454 /// Move forward by one element in the queue. Returns 0 when all the
1455 /// items in the set have been seen, else 1.
1458 /// Dump the state of an object.
1461 /// Declare the dynamic allocation hooks.
1462 ACE_ALLOC_HOOK_DECLARE
;
1465 /// Implement this via the ACE_Message_Queue_Reverse_Iterator
1466 ACE_Message_Queue_Reverse_Iterator
<ACE_SYNCH_USE
, TIME_POLICY
> iter_
;
1470 * @class ACE_Message_Queue_Ex_N
1472 * @brief A threaded message queueing facility, modeled after the
1473 * queueing facilities in System V STREAMs which can enqueue
1474 * multiple messages in one call.
1476 * As ACE_Message_Queue_Ex, ACE_Message_Queue_Ex_N is a strongly-typed
1477 * version of the ACE_Message_Queue. If @c ACE_SYNCH_DECL is @c ACE_MT_SYNCH
1478 * then all operations are thread-safe. Otherwise, if it's @c ACE_NULL_SYNCH
1479 * then there's no locking overhead.
1481 * The @c ACE_MESSAGE_TYPE messages that are sent to this
1482 * queue can be chained. Messages are expected to have a
1483 * @c next method that returns the next message in the chain;
1484 * ACE_Message_Queue_Ex_N uses this method to run through
1485 * all the incoming messages and enqueue them in one call.
1487 template <class ACE_MESSAGE_TYPE
, ACE_SYNCH_DECL
, class TIME_POLICY
= ACE_System_Time_Policy
>
1488 class ACE_Message_Queue_Ex_N
: public ACE_Message_Queue_Ex
<ACE_MESSAGE_TYPE
, ACE_SYNCH_USE
, TIME_POLICY
>
1492 * Initialize an ACE_Message_Queue_Ex_N. The @a high_water_mark
1493 * determines how many bytes can be stored in a queue before it's
1494 * considered "full." Supplier threads must block until the queue
1495 * is no longer full. The @a low_water_mark determines how many
1496 * bytes must be in the queue before supplier threads are allowed to
1497 * enqueue additional messages. By default, the @a high_water_mark
1498 * equals the @a low_water_mark, which means that suppliers will be
1499 * able to enqueue new messages as soon as a consumer removes any message
1500 * from the queue. Making the @a low_water_mark smaller than the
1501 * @a high_water_mark forces consumers to drain more messages from the
1502 * queue before suppliers can enqueue new messages, which can minimize
1503 * the "silly window syndrome."
1505 ACE_Message_Queue_Ex_N (size_t high_water_mark
= ACE_Message_Queue_Base::DEFAULT_HWM
,
1506 size_t low_water_mark
= ACE_Message_Queue_Base::DEFAULT_LWM
,
1507 ACE_Notification_Strategy
* ns
= 0);
1509 /// Close down the message queue and release all resources.
1510 virtual ~ACE_Message_Queue_Ex_N ();
1513 * Enqueue one or more @c ACE_MESSAGE_TYPE objects at the head of the queue.
1514 * If the @a new_item @c next() pointer is non-zero, it is assumed to be the
1515 * start of a series of @c ACE_MESSAGE_TYPE objects connected via their
1516 * @c next() pointers. The series of blocks will be added to the queue in
1517 * the same order they are passed in as.
1519 * @param new_item Pointer to an @c ACE_MESSAGE_TYPE that will be
1520 * added to the queue. If the block's @c next() pointer
1521 * is non-zero, all blocks chained from the @c next()
1522 * pointer are enqueued as well.
1523 * @param tv The absolute time the caller will wait until
1524 * for the block to be queued.
1526 * @retval >0 The number of @c ACE_MESSAGE_TYPE objects on the queue after
1527 * adding the specified block(s).
1528 * @retval -1 On failure. errno holds the reason. Common errno values are:
1529 * - EWOULDBLOCK: the timeout elapsed
1530 * - ESHUTDOWN: the queue was deactivated or pulsed
1532 virtual int enqueue_head (ACE_MESSAGE_TYPE
*new_item
, ACE_Time_Value
*tv
= 0);
1535 * Enqueue one or more @c ACE_MESSAGE_TYPE objects at the tail of the queue.
1536 * If the @a new_item @c next() pointer is non-zero, it is assumed to be the
1537 * start of a series of @c ACE_MESSAGE_TYPE objects connected via their
1538 * @c next() pointers. The series of blocks will be added to the queue in
1539 * the same order they are passed in as.
1541 * @param new_item Pointer to an @c ACE_MESSAGE_TYPE that will be
1542 * added to the queue. If the block's @c next() pointer
1543 * is non-zero, all blocks chained from the @c next()
1544 * pointer are enqueued as well.
1545 * @param tv The absolute time the caller will wait until
1546 * for the block to be queued.
1548 * @retval >0 The number of @c ACE_MESSAGE_TYPE objects on the queue after
1549 * adding the specified block(s).
1550 * @retval -1 On failure. errno holds the reason. Common errno values are:
1551 * - EWOULDBLOCK: the timeout elapsed
1552 * - ESHUTDOWN: the queue was deactivated or pulsed
1554 virtual int enqueue_tail (ACE_MESSAGE_TYPE
*new_item
, ACE_Time_Value
*tv
= 0);
1556 /// Declare the dynamic allocation hooks.
1557 ACE_ALLOC_HOOK_DECLARE
;
1561 * An helper method that wraps the incoming chain messages
1562 * with ACE_Message_Blocks.
1564 ACE_Message_Block
*wrap_with_mbs_i (ACE_MESSAGE_TYPE
*new_item
);
1567 ACE_END_VERSIONED_NAMESPACE_DECL
1569 #include "ace/Message_Queue_T.cpp"
1571 #include /**/ "ace/post.h"
1573 #endif /* ACE_MESSAGE_QUEUE_T_H */