Also use Objects as part of an operation but as a result don't generate Any operation...
[ACE_TAO.git] / ACE / ace / Message_Queue_T.h
blobb1cad73845c129e909b0e4d68527a96550133043
1 /* -*- C++ -*- */
3 //=============================================================================
4 /**
5 * @file Message_Queue_T.h
7 * @author Douglas C. Schmidt <d.schmidt@vanderbilt.edu>
8 */
9 //=============================================================================
11 #ifndef ACE_MESSAGE_QUEUE_T_H
12 #define ACE_MESSAGE_QUEUE_T_H
14 #include /**/ "ace/pre.h"
16 #include "ace/Message_Queue.h"
17 #include "ace/Dynamic_Message_Strategy.h"
18 #include "ace/Synch_Traits.h"
19 #include "ace/Guard_T.h"
20 #include "ace/Time_Policy.h"
21 #include "ace/Time_Value_T.h"
22 #if defined (ACE_HAS_THREADS)
23 # include "ace/Condition_Attributes.h"
24 #endif
26 #if !defined (ACE_LACKS_PRAGMA_ONCE)
27 # pragma once
28 #endif /* ACE_LACKS_PRAGMA_ONCE */
30 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
32 #if defined (ACE_VXWORKS)
33 class ACE_Message_Queue_Vx;
34 #endif /* defined (ACE_VXWORKS) */
36 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO)
37 class ACE_Message_Queue_NT;
38 #endif /* ACE_HAS_WIN32_OVERLAPPED_IO*/
40 #if defined (ACE_HAS_MONITOR_POINTS) && ACE_HAS_MONITOR_POINTS == 1
41 namespace ACE
43 namespace Monitor_Control
45 class Size_Monitor;
48 #endif /* ACE_HAS_MONITOR_POINTS==1 */
50 /**
51 * @class ACE_Message_Queue
53 * @brief A message queuing facility with parameterized synchronization
54 * capability. ACE_Message_Queue is modeled after the queueing facilities
55 * in System V STREAMs.
57 * ACE_Message_Queue is the primary queuing facility for
58 * messages in the ACE framework. Its ACE_SYNCH_DECL template argument
59 * parameterizes the queue's synchronization. The argument specifies a synchronization
60 * strategy. The two main strategies available for ACE_SYNCH_DECL are:
61 * -# ACE_MT_SYNCH: all operations are thread-safe
62 * -# ACE_NULL_SYNCH: no synchronization and no locking overhead
64 * All data passing through ACE_Message_Queue is in the form of
65 * ACE_Message_Block objects. @sa ACE_Message_Block.
67 template <ACE_SYNCH_DECL, class TIME_POLICY = ACE_System_Time_Policy>
68 class ACE_Message_Queue : public ACE_Message_Queue_Base
70 public:
71 friend class ACE_Message_Queue_Iterator<ACE_SYNCH_USE, TIME_POLICY>;
72 friend class ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE, TIME_POLICY>;
74 // = Traits
75 typedef ACE_Message_Queue_Iterator<ACE_SYNCH_USE, TIME_POLICY>
76 ITERATOR;
77 typedef ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE, TIME_POLICY>
78 REVERSE_ITERATOR;
80 /**
81 * @name Initialization methods
83 //@{
84 /**
85 * Initialize an ACE_Message_Queue.
87 * @param hwm High water mark. Determines how many bytes can be stored in a
88 * queue before it's considered full. Supplier threads must block
89 * until the queue is no longer full.
90 * @param lwm Low water mark. Determines how many bytes must be in the queue
91 * before supplier threads are allowed to enqueue additional
92 * data. By default, the @a hwm equals @a lwm, which means
93 * that suppliers will be able to enqueue new messages as soon as
94 * a consumer removes any message from the queue. Making the low
95 * water mark smaller than the high water mark forces consumers to
96 * drain more messages from the queue before suppliers can enqueue
97 * new messages, which can minimize the "silly window syndrome."
98 * @param ns Notification strategy. Pointer to an object conforming to the
99 * ACE_Notification_Strategy interface. If set, the object's
100 * notify(void) method will be called each time data is added to
101 * this ACE_Message_Queue. @see ACE_Reactor_Notification_Strategy.
103 ACE_Message_Queue (size_t hwm = ACE_Message_Queue_Base::DEFAULT_HWM,
104 size_t lwm = ACE_Message_Queue_Base::DEFAULT_LWM,
105 ACE_Notification_Strategy *ns = 0);
106 virtual int open (size_t hwm = ACE_Message_Queue_Base::DEFAULT_HWM,
107 size_t lwm = ACE_Message_Queue_Base::DEFAULT_LWM,
108 ACE_Notification_Strategy *ns = 0);
109 //@}
111 /// Releases all resources from the message queue and marks it deactivated.
112 /// @sa flush().
114 /// @return The number of messages released from the queue; -1 on error.
115 virtual int close (void);
117 /// Releases all resources from the message queue and marks it deactivated.
118 virtual ~ACE_Message_Queue (void);
121 * Releases all resources from the message queue but does not mark it
122 * deactivated. This method holds the queue lock during this operation.
123 * @sa close().
125 * @return The number of messages flushed; -1 on error.
127 virtual int flush (void);
130 * Release all resources from the message queue but do not mark it
131 * as deactivated.
133 * @pre The caller must be holding the queue lock before calling this
134 * method.
136 * @return The number of messages flushed.
138 virtual int flush_i (void);
140 /** @name Enqueue and dequeue methods
142 * The enqueue and dequeue methods accept a timeout value passed as
143 * an ACE_Time_Value *. In all cases, if the timeout pointer is 0,
144 * the caller will block until action is possible. If the timeout pointer
145 * is non-zero, the call will wait (if needed, subject to water mark
146 * settings) until the absolute time specified in the referenced
147 * ACE_Time_Value object is reached. If the time is reached before the
148 * desired action is possible, the method will return -1 with errno set
149 * to @c EWOULDBLOCK. Regardless of the timeout setting, however,
150 * these methods will also fail and return -1 when the queue is closed,
151 * deactivated, pulsed, or when a signal occurs.
153 * See C++NPv2 Section 6.2 and APG Section 12.3 for a fuller treatment of
154 * ACE_Message_Queue, enqueueing, dequeueing, and how these operations are
155 * affected by queue state transitions.
157 //@{
159 * Retrieve a pointer to the first ACE_Message_Block in the queue
160 * without removing it.
162 * @note Because the block whose pointer is returned is still on the queue,
163 * another thread may dequeue the referenced block at any time,
164 * including before the calling thread examines the peeked-at block.
165 * Be very careful with this method in multithreaded queueing
166 * situations.
168 * @param first_item Reference to an ACE_Message_Block * that will
169 * point to the first block on the queue. The block
170 * remains on the queue until this or another thread
171 * dequeues it.
172 * @param timeout The absolute time the caller will wait until
173 * for a block to be queued.
175 * @retval >0 The number of ACE_Message_Blocks on the queue.
176 * @retval -1 On failure. errno holds the reason. Common errno values are:
177 * - EWOULDBLOCK: the timeout elapsed
178 * - ESHUTDOWN: the queue was deactivated or pulsed
180 virtual int peek_dequeue_head (ACE_Message_Block *&first_item,
181 ACE_Time_Value *timeout = 0);
184 * Enqueue an ACE_Message_Block into the queue in accordance with
185 * the ACE_Message_Block's priority (0 is lowest priority). FIFO
186 * order is maintained when messages of the same priority are
187 * inserted consecutively.
189 * @param new_item Pointer to an ACE_Message_Block that will be
190 * added to the queue. The block's @c msg_priority()
191 * method will be called to obtain the queueing priority.
192 * @param timeout The absolute time the caller will wait until
193 * for the block to be queued.
195 * @retval >0 The number of ACE_Message_Blocks on the queue after adding
196 * the specified block.
197 * @retval -1 On failure. errno holds the reason. Common errno values are:
198 * - EWOULDBLOCK: the timeout elapsed
199 * - ESHUTDOWN: the queue was deactivated or pulsed
201 virtual int enqueue_prio (ACE_Message_Block *new_item,
202 ACE_Time_Value *timeout = 0);
205 * Enqueue an ACE_Message_Block into the queue in accordance with the
206 * block's deadline time. FIFO order is maintained when messages of
207 * the same deadline time are inserted consecutively.
209 * @param new_item Pointer to an ACE_Message_Block that will be
210 * added to the queue. The block's @c msg_deadline_time()
211 * method will be called to obtain the relative queueing
212 * position.
213 * @param timeout The absolute time the caller will wait until
214 * for the block to be queued.
216 * @retval >0 The number of ACE_Message_Blocks on the queue after adding
217 * the specified block.
218 * @retval -1 On failure. errno holds the reason. Common errno values are:
219 * - EWOULDBLOCK: the timeout elapsed
220 * - ESHUTDOWN: the queue was deactivated or pulsed
222 virtual int enqueue_deadline (ACE_Message_Block *new_item,
223 ACE_Time_Value *timeout = 0);
226 * @deprecated This is an alias for enqueue_prio(). It's only here for
227 * backwards compatibility and will go away in a subsequent release.
228 * Please use enqueue_prio() instead.
230 virtual int enqueue (ACE_Message_Block *new_item,
231 ACE_Time_Value *timeout = 0);
234 * Enqueue one or more ACE_Message_Block objects at the tail of the queue.
235 * If the @a new_item @c next() pointer is non-zero, it is assumed to be the
236 * start of a series of ACE_Message_Block objects connected via their
237 * @c next() pointers. The series of blocks will be added to the queue in
238 * the same order they are passed in as.
240 * @param new_item Pointer to an ACE_Message_Block that will be
241 * added to the queue. If the block's @c next() pointer
242 * is non-zero, all blocks chained from the @c next()
243 * pointer are enqueued as well.
244 * @param timeout The absolute time the caller will wait until
245 * for the block to be queued.
247 * @retval >0 The number of ACE_Message_Blocks on the queue after adding
248 * the specified block(s).
249 * @retval -1 On failure. errno holds the reason. Common errno values are:
250 * - EWOULDBLOCK: the timeout elapsed
251 * - ESHUTDOWN: the queue was deactivated or pulsed
253 virtual int enqueue_tail (ACE_Message_Block *new_item,
254 ACE_Time_Value *timeout = 0);
257 * Enqueue one or more ACE_Message_Block objects at the head of the queue.
258 * If the @a new_item @c next() pointer is non-zero, it is assumed to be the
259 * start of a series of ACE_Message_Block objects connected via their
260 * @c next() pointers. The series of blocks will be added to the queue in
261 * the same order they are passed in as.
263 * @param new_item Pointer to an ACE_Message_Block that will be
264 * added to the queue. If the block's @c next() pointer
265 * is non-zero, all blocks chained from the @c next()
266 * pointer are enqueued as well.
267 * @param timeout The absolute time the caller will wait until
268 * for the block to be queued.
270 * @retval >0 The number of ACE_Message_Blocks on the queue after adding
271 * the specified block(s).
272 * @retval -1 On failure. errno holds the reason. Common errno values are:
273 * - EWOULDBLOCK: the timeout elapsed
274 * - ESHUTDOWN: the queue was deactivated or pulsed
276 virtual int enqueue_head (ACE_Message_Block *new_item,
277 ACE_Time_Value *timeout = 0);
279 /// This method is an alias for the dequeue_head() method.
280 virtual int dequeue (ACE_Message_Block *&first_item,
281 ACE_Time_Value *timeout = 0);
284 * Dequeue the ACE_Message_Block at the head of the queue and return
285 * a pointer to the dequeued block.
287 * @param first_item Reference to an ACE_Message_Block * that will
288 * be set to the address of the dequeued block.
289 * @param timeout The absolute time the caller will wait until
290 * for a block to be dequeued.
292 * @retval >=0 The number of ACE_Message_Blocks remaining in the queue.
293 * @retval -1 On failure. errno holds the reason. Common errno values are:
294 * - EWOULDBLOCK: the timeout elapsed
295 * - ESHUTDOWN: the queue was deactivated or pulsed
297 virtual int dequeue_head (ACE_Message_Block *&first_item,
298 ACE_Time_Value *timeout = 0);
301 * Dequeue the ACE_Message_Block that has the lowest priority (preserves
302 * FIFO order for messages with the same priority) and return a pointer
303 * to the dequeued block.
305 * @param first_item Reference to an ACE_Message_Block * that will
306 * be set to the address of the dequeued block.
307 * @param timeout The absolute time the caller will wait until
308 * for a block to be dequeued.
310 * @retval >=0 The number of ACE_Message_Blocks remaining in the queue.
311 * @retval -1 On failure. errno holds the reason. Common errno values are:
312 * - EWOULDBLOCK: the timeout elapsed
313 * - ESHUTDOWN: the queue was deactivated or pulsed
315 virtual int dequeue_prio (ACE_Message_Block *&first_item,
316 ACE_Time_Value *timeout = 0);
319 * Dequeue the ACE_Message_Block at the tail of the queue and return
320 * a pointer to the dequeued block.
322 * @param dequeued Reference to an ACE_Message_Block * that will
323 * be set to the address of the dequeued block.
324 * @param timeout The absolute time the caller will wait until
325 * for a block to be dequeued.
327 * @retval >=0 The number of ACE_Message_Blocks remaining in the queue.
328 * @retval -1 On failure. errno holds the reason. Common errno values are:
329 * - EWOULDBLOCK: the timeout elapsed
330 * - ESHUTDOWN: the queue was deactivated or pulsed
332 virtual int dequeue_tail (ACE_Message_Block *&dequeued,
333 ACE_Time_Value *timeout = 0);
336 * Dequeue the ACE_Message_Block with the earliest deadline time and return
337 * a pointer to the dequeued block.
339 * @param dequeued Reference to an ACE_Message_Block * that will
340 * be set to the address of the dequeued block.
341 * @param timeout The absolute time the caller will wait until
342 * for a block to be dequeued.
344 * @retval >=0 The number of ACE_Message_Blocks remaining in the queue.
345 * @retval -1 On failure. errno holds the reason. Common errno values are:
346 * - EWOULDBLOCK: the timeout elapsed
347 * - ESHUTDOWN: the queue was deactivated or pulsed
349 virtual int dequeue_deadline (ACE_Message_Block *&dequeued,
350 ACE_Time_Value *timeout = 0);
351 //@}
353 /** @name Queue statistics methods
355 //@{
357 /// True if queue is full, else false.
358 virtual bool is_full (void);
359 /// True if queue is empty, else false.
360 virtual bool is_empty (void);
363 * Total number of bytes on the queue, i.e., sum of the message
364 * block sizes.
366 virtual size_t message_bytes (void);
369 * Total length of the messages on the queue, i.e., sum of the message
370 * block lengths.
372 virtual size_t message_length (void);
375 * Total number of messages on the queue.
377 virtual size_t message_count (void);
379 // = Manual changes to these stats (used when queued message blocks
380 // change size or lengths).
382 * New value of the total number of bytes on the queue, i.e., sum of
383 * the message block sizes.
385 virtual void message_bytes (size_t new_size);
387 * New value of the total length of the messages on the queue, i.e., sum
388 * of the message block lengths.
390 virtual void message_length (size_t new_length);
392 //@}
395 /** @name Water mark (flow control) methods
397 //@{
400 * Get high watermark.
402 virtual size_t high_water_mark (void);
404 * Set the high watermark, which determines how many bytes can be
405 * stored in a queue before it's considered "full."
407 virtual void high_water_mark (size_t hwm);
410 * Get low watermark.
412 virtual size_t low_water_mark (void);
414 * Set the low watermark, which determines how many bytes must be in
415 * the queue before supplier threads are allowed to enqueue
416 * additional ACE_Message_Blocks.
418 virtual void low_water_mark (size_t lwm);
419 //@}
421 /** @name Activation and queue state methods
422 * See C++NPv2 Section 6.2 and APG Section 12.3 for a fuller treatment of
423 * queue states and transitions and how the transitions affect message
424 * enqueueing and dequeueing operations.
426 //@{
429 * Deactivate the queue and wakeup all threads waiting on the queue
430 * so they can continue. No messages are removed from the queue,
431 * however. Any other operations called until the queue is
432 * activated again will immediately return -1 with @c errno ==
433 * ESHUTDOWN. Returns WAS_INACTIVE if queue was inactive before the
434 * call and WAS_ACTIVE if queue was active before the call.
436 virtual int deactivate (void);
439 * Reactivate the queue so that threads can enqueue and dequeue
440 * messages again. Returns the state of the queue before the call.
442 virtual int activate (void);
445 * Pulse the queue to wake up any waiting threads. Changes the
446 * queue state to PULSED; future enqueue/dequeue operations proceed
447 * as in ACTIVATED state.
449 * @return The queue's state before this call.
451 virtual int pulse (void);
453 /// Returns the current state of the queue, which can be one of
454 /// ACTIVATED, DEACTIVATED, or PULSED.
455 virtual int state (void);
457 /// Returns true if the state of the queue is <DEACTIVATED>,
458 /// but false if the queue's state is <ACTIVATED> or <PULSED>.
459 virtual int deactivated (void);
460 //@}
462 /** @name Notification strategy methods
464 //@{
467 * This hook is automatically invoked by <enqueue_head>,
468 * <enqueue_tail>, and <enqueue_prio> when a new item is inserted
469 * into the queue. Subclasses can override this method to perform
470 * specific notification strategies (e.g., signaling events for a
471 * <WFMO_Reactor>, notifying a <Reactor>, etc.). In a
472 * multi-threaded application with concurrent consumers, there is no
473 * guarantee that the queue will still be non-empty by the time
474 * the notification occurs.
476 virtual int notify (void);
478 /// Get the notification strategy for the <Message_Queue>
479 virtual ACE_Notification_Strategy *notification_strategy (void);
481 /// Set the notification strategy for the <Message_Queue>
482 virtual void notification_strategy (ACE_Notification_Strategy *s);
483 //@}
485 /// Returns a reference to the lock used by the ACE_Message_Queue.
486 virtual ACE_SYNCH_MUTEX_T &lock (void);
488 /// Get the current time of day according to the queue's TIME_POLICY.
489 /// Allows users to initialize timeout values using correct time policy.
490 ACE_Time_Value_T<TIME_POLICY> gettimeofday (void) const;
492 /// Allows applications to control how the message queue gets the time
493 /// of day.
494 void set_time_policy (TIME_POLICY const & time_policy);
496 /// Dump the state of an object.
497 virtual void dump (void) const;
499 /// Declare the dynamic allocation hooks.
500 ACE_ALLOC_HOOK_DECLARE;
502 protected:
503 // = Routines that actually do the enqueueing and dequeueing.
505 // These routines assume that locks are held by the corresponding
506 // public methods. Since they are virtual, you can change the
507 // queueing mechanism by subclassing from ACE_Message_Queue.
509 /// Enqueue an <ACE_Message_Block *> in accordance with its priority.
510 virtual int enqueue_i (ACE_Message_Block *new_item);
512 /// Enqueue an <ACE_Message_Block *> in accordance with its deadline time.
513 virtual int enqueue_deadline_i (ACE_Message_Block *new_item);
515 /// Enqueue an <ACE_Message_Block *> at the end of the queue.
516 virtual int enqueue_tail_i (ACE_Message_Block *new_item);
518 /// Enqueue an <ACE_Message_Block *> at the head of the queue.
519 virtual int enqueue_head_i (ACE_Message_Block *new_item);
521 /// Dequeue and return the <ACE_Message_Block *> at the head of the
522 /// queue.
523 virtual int dequeue_head_i (ACE_Message_Block *&first_item);
525 /// Dequeue and return the <ACE_Message_Block *> with the lowest
526 /// priority.
527 virtual int dequeue_prio_i (ACE_Message_Block *&dequeued);
529 /// Dequeue and return the <ACE_Message_Block *> at the tail of the
530 /// queue.
531 virtual int dequeue_tail_i (ACE_Message_Block *&first_item);
533 /// Dequeue and return the <ACE_Message_Block *> with the lowest
534 /// deadline time.
535 virtual int dequeue_deadline_i (ACE_Message_Block *&first_item);
537 // = Check the boundary conditions (assumes locks are held).
539 /// True if queue is full, else false.
540 virtual bool is_full_i (void);
542 /// True if queue is empty, else false.
543 virtual bool is_empty_i (void);
545 // = Implementation of the public <activate> and <deactivate> methods.
547 // These methods assume locks are held.
550 * Notifies all waiting threads that the queue has been deactivated
551 * so they can wakeup and continue other processing.
552 * No messages are removed from the queue.
554 * @param pulse If 0, the queue's state is changed to DEACTIVATED
555 * and any other operations called until the queue is
556 * reactivated will immediately return -1 with
557 * errno == ESHUTDOWN.
558 * If not zero, only the waiting threads are notified and
559 * the queue's state changes to PULSED.
561 * @return The state of the queue before the call.
563 virtual int deactivate_i (int pulse = 0);
565 /// Activate the queue.
566 virtual int activate_i (void);
568 // = Helper methods to factor out common #ifdef code.
570 /// Wait for the queue to become non-full.
571 virtual int wait_not_full_cond (ACE_Time_Value *timeout);
573 /// Wait for the queue to become non-empty.
574 virtual int wait_not_empty_cond (ACE_Time_Value *timeout);
576 /// Inform any threads waiting to enqueue that they can proceed.
577 virtual int signal_enqueue_waiters (void);
579 /// Inform any threads waiting to dequeue that they can proceed.
580 virtual int signal_dequeue_waiters (void);
582 /// Pointer to head of ACE_Message_Block list.
583 ACE_Message_Block *head_;
585 /// Pointer to tail of ACE_Message_Block list.
586 ACE_Message_Block *tail_;
588 /// Lowest number of bytes before unblocking occurs.
589 size_t low_water_mark_;
591 /// Greatest number of bytes before blocking.
592 size_t high_water_mark_;
594 /// Current number of bytes in the queue.
595 size_t cur_bytes_;
597 /// Current length of messages in the queue.
598 size_t cur_length_;
600 /// Current number of messages in the queue.
601 size_t cur_count_;
603 /// The notification strategy used when a new message is enqueued.
604 ACE_Notification_Strategy *notification_strategy_;
606 // = Synchronization primitives for controlling concurrent access.
607 /// Protect queue from concurrent access.
608 ACE_SYNCH_MUTEX_T lock_;
610 #if defined (ACE_HAS_THREADS)
611 /// Attributes to initialize conditions with.
612 /* We only need this because some crappy compilers can't
613 properly handle initializing the conditions with
614 temporary objects. */
615 ACE_Condition_Attributes_T<TIME_POLICY> cond_attr_;
616 #endif
618 /// Used to make threads sleep until the queue is no longer empty.
619 ACE_SYNCH_CONDITION_T not_empty_cond_;
621 /// Used to make threads sleep until the queue is no longer full.
622 ACE_SYNCH_CONDITION_T not_full_cond_;
624 /// The policy to return the current time of day
625 TIME_POLICY time_policy_;
627 /// Sends the size of the queue whenever it changes.
628 #if defined (ACE_HAS_MONITOR_POINTS) && (ACE_HAS_MONITOR_POINTS == 1)
629 ACE::Monitor_Control::Size_Monitor *monitor_;
630 #endif
632 private:
634 // = Disallow these operations (declared for this exact specialization, TIME_POLICY included, so they are the real copy operations).
635 ACE_UNIMPLEMENTED_FUNC (void operator= (const ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY> &))
636 ACE_UNIMPLEMENTED_FUNC (ACE_Message_Queue (const ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY> &))
639 // This typedef is used to get around a compiler bug in g++/vxworks.
640 typedef ACE_Message_Queue<ACE_SYNCH> ACE_DEFAULT_MESSAGE_QUEUE_TYPE;
644 * @class ACE_Message_Queue_Iterator
646 * @brief Iterator for the ACE_Message_Queue.
648 template <ACE_SYNCH_DECL, class TIME_POLICY = ACE_System_Time_Policy>
649 class ACE_Message_Queue_Iterator
651 public:
652 ACE_Message_Queue_Iterator (ACE_Message_Queue <ACE_SYNCH_USE, TIME_POLICY> &queue);
654 // = Iteration methods.
655 /// Pass back the next @a entry in the queue that hasn't been seen yet.
656 /// Returns 0 when all items have been seen, else 1.
657 int next (ACE_Message_Block *&entry);
659 /// Returns 1 when all items have been seen, else 0.
660 int done (void) const;
662 /// Move forward by one element in the queue. Returns 0 when all the
663 /// items in the queue have been seen, else 1.
664 int advance (void);
666 /// Dump the state of the iterator.
667 void dump (void) const;
669 /// Declare the dynamic allocation hooks.
670 ACE_ALLOC_HOOK_DECLARE;
672 private:
673 /// Message_Queue we are iterating over.
674 ACE_Message_Queue <ACE_SYNCH_USE, TIME_POLICY> &queue_;
676 /// Keeps track of how far we've advanced...
677 ACE_Message_Block *curr_;
681 * @class ACE_Message_Queue_Reverse_Iterator
683 * @brief Reverse Iterator for the ACE_Message_Queue.
685 template <ACE_SYNCH_DECL, class TIME_POLICY = ACE_System_Time_Policy>
686 class ACE_Message_Queue_Reverse_Iterator
688 public:
689 ACE_Message_Queue_Reverse_Iterator (ACE_Message_Queue <ACE_SYNCH_USE, TIME_POLICY> &queue);
691 // = Iteration methods.
692 /// Pass back the next @a entry in the queue that hasn't been seen yet.
693 /// Returns 0 when all items have been seen, else 1.
694 int next (ACE_Message_Block *&entry);
696 /// Returns 1 when all items have been seen, else 0.
697 int done (void) const;
699 /// Advance the iterator by one element. Returns 0 when all the
700 /// items in the queue have been seen, else 1.
701 int advance (void);
703 /// Dump the state of the iterator.
704 void dump (void) const;
706 /// Declare the dynamic allocation hooks.
707 ACE_ALLOC_HOOK_DECLARE;
709 private:
710 /// Message_Queue we are iterating over.
711 ACE_Message_Queue <ACE_SYNCH_USE, TIME_POLICY> &queue_;
713 /// Keeps track of how far we've advanced...
714 ACE_Message_Block *curr_;
718 * @class ACE_Dynamic_Message_Queue
720 * @brief A derived class which adapts the ACE_Message_Queue
721 * class in order to maintain dynamic priorities for enqueued
722 * <ACE_Message_Blocks> and manage the queue order according
723 * to these dynamic priorities.
725 * The messages in the queue are managed so as to preserve
726 * a logical ordering with minimal overhead per enqueue and
727 * dequeue operation. For this reason, the actual order of
728 * messages in the linked list of the queue may differ from
729 * their priority order. As time passes, a message may change
730 * from pending status to late status, and eventually to beyond
731 * late status. To minimize reordering overhead under this
732 * design force, three separate boundaries are maintained
733 * within the linked list of messages. Messages are dequeued
734 * preferentially from the head of the pending portion, then
735 * the head of the late portion, and finally from the head
736 * of the beyond late portion. In this way, only the boundaries
737 * need to be maintained (which can be done efficiently, as
738 * aging messages maintain the same linked list order as they
739 * progress from one status to the next), with no reordering
740 * of the messages themselves, while providing correct priority
741 * ordered dequeueing semantics.
742 * Head and tail enqueue methods inherited from ACE_Message_Queue
743 * are overridden to call the priority enqueue method, which keeps out-of-order messages from confusing
744 * management of the various portions of the queue. Messages in
745 * the pending portion of the queue whose priority becomes late
746 * (according to the specific dynamic strategy) advance into
747 * the late portion of the queue. Messages in the late portion
748 * of the queue whose priority becomes later than can be represented
749 * advance to the beyond_late portion of the queue. These behaviors
750 * support a limited schedule overrun, with pending messages prioritized
751 * ahead of late messages, and late messages ahead of beyond late
752 * messages. These behaviors can be modified in derived classes by
753 * providing alternative definitions for the appropriate virtual methods.
754 * When filled with messages, the queue's linked list should look like:
755 * H                                           T
756 * |                                           |
757 * B - B - B - B - L - L - L - P - P - P - P - P
758 * |           |   |       |   |               |
759 * BH          BT  LH      LT  PH              PT
760 * Where the symbols are as follows:
761 * H = Head of the entire list
762 * T = Tail of the entire list
763 * B = Beyond late message
764 * BH = Beyond late messages Head
765 * BT = Beyond late messages Tail
766 * L = Late message
767 * LH = Late messages Head
768 * LT = Late messages Tail
769 * P = Pending message
770 * PH = Pending messages Head
771 * PT = Pending messages Tail
772 * Caveat: the virtual methods enqueue_tail, enqueue_head,
773 * and peek_dequeue_head have semantics for the static
774 * message queues that cannot be guaranteed for dynamic
775 * message queues. The peek_dequeue_head method just
776 * calls the base class method, while the two enqueue
777 * methods call the priority enqueue method. The
778 * order of messages in the dynamic queue is a function
779 * of message deadlines and how long they are in the
780 * queues. You can manipulate these in some cases to
781 * ensure the correct semantics, but that is not a
782 * very stable or portable approach (discouraged).
784 template <ACE_SYNCH_DECL, class TIME_POLICY = ACE_System_Time_Policy>
785 class ACE_Dynamic_Message_Queue : public ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY>
787 public:
788 ACE_Dynamic_Message_Queue (ACE_Dynamic_Message_Strategy & message_strategy,
789 size_t hwm = ACE_Message_Queue_Base::DEFAULT_HWM,
790 size_t lwm = ACE_Message_Queue_Base::DEFAULT_LWM,
791 ACE_Notification_Strategy * = 0);
793 /// Close down the message queue and release all resources.
794 virtual ~ACE_Dynamic_Message_Queue (void);
797 * Detach all messages with status given in the passed flags from
798 * the queue and return them by setting passed head and tail pointers
799 * to the linked list they comprise. This method is intended primarily
800 * as a means of periodically harvesting messages that have missed
801 * their deadlines, but is available in its most general form. All
802 * messages are returned in priority order, from head to tail, as of
803 * the time this method was called.
805 virtual int remove_messages (ACE_Message_Block *&list_head,
806 ACE_Message_Block *&list_tail,
807 u_int status_flags);
810 * Dequeue and return the <ACE_Message_Block *> at the head of the
811 * queue. Returns -1 on failure, else the number of items still on
812 * the queue.
814 virtual int dequeue_head (ACE_Message_Block *&first_item,
815 ACE_Time_Value *timeout = 0);
817 /// Dump the state of the queue.
818 virtual void dump (void) const;
821 * Just call priority enqueue method: tail enqueue semantics for dynamic
822 * message queues are unstable: the message may or may not be where
823 * it was placed after the queue is refreshed prior to the next
824 * enqueue or dequeue operation.
826 virtual int enqueue_tail (ACE_Message_Block *new_item,
827 ACE_Time_Value *timeout = 0);
830 * Just call priority enqueue method: head enqueue semantics for dynamic
831 * message queues are unstable: the message may or may not be where
832 * it was placed after the queue is refreshed prior to the next
833 * enqueue or dequeue operation.
835 virtual int enqueue_head (ACE_Message_Block *new_item,
836 ACE_Time_Value *timeout = 0);
839 /// Declare the dynamic allocation hooks.
840 ACE_ALLOC_HOOK_DECLARE;
842 protected:
845 * Enqueue an <ACE_Message_Block *> in accordance with its priority.
846 * The priority may be *dynamic*, *static*, or a combination of *both*.
847 * It calls the priority evaluation function passed into the Dynamic
848 * Message Queue constructor to update the priorities of all
849 * enqueued messages.
851 virtual int enqueue_i (ACE_Message_Block *new_item);
853 /// Enqueue a message in priority order within a given priority status sublist
854 virtual int sublist_enqueue_i (ACE_Message_Block *new_item,
855 const ACE_Time_Value &current_time,
856 ACE_Message_Block *&sublist_head,
857 ACE_Message_Block *&sublist_tail,
858 ACE_Dynamic_Message_Strategy::Priority_Status status);
861 * Dequeue and return the <ACE_Message_Block *> at the head of the
862 * logical queue. Attempts first to dequeue from the pending
863 * portion of the queue, or if that is empty from the late portion,
864 * or if that is empty from the beyond late portion, or if that is
865 * empty just sets the passed pointer to zero and returns -1.
867 virtual int dequeue_head_i (ACE_Message_Block *&first_item);
869 /// Refresh the queue using the strategy
870 /// specific priority status function.
871 virtual int refresh_queue (const ACE_Time_Value & current_time);
873 /// Refresh the pending queue using the strategy
874 /// specific priority status function.
875 virtual int refresh_pending_queue (const ACE_Time_Value & current_time);
877 /// Refresh the late queue using the strategy
878 /// specific priority status function.
879 virtual int refresh_late_queue (const ACE_Time_Value & current_time);
881 /// Pointer to head of the pending messages
882 ACE_Message_Block *pending_head_;
884 /// Pointer to tail of the pending messages
885 ACE_Message_Block *pending_tail_;
887 /// Pointer to head of the late messages
888 ACE_Message_Block *late_head_;
890 /// Pointer to tail of the late messages
891 ACE_Message_Block *late_tail_;
893 /// Pointer to head of the beyond late messages
894 ACE_Message_Block *beyond_late_head_;
896 /// Pointer to tail of the beyond late messages
897 ACE_Message_Block *beyond_late_tail_;
899 /// Reference to the dynamic message strategy used to evaluate message priorities.
900 ACE_Dynamic_Message_Strategy &message_strategy_;
902 private:
903 // = Disallow public access to these operations.
905 ACE_UNIMPLEMENTED_FUNC (void operator= (const ACE_Dynamic_Message_Queue<ACE_SYNCH_USE, TIME_POLICY> &))
906 ACE_UNIMPLEMENTED_FUNC (ACE_Dynamic_Message_Queue (const ACE_Dynamic_Message_Queue<ACE_SYNCH_USE, TIME_POLICY> &))
908 // provide definitions for these (just call base class method),
909 // but make them private so they're not accessible outside the class
911 /// Private method to hide public base class method: just calls base class method
912 virtual int peek_dequeue_head (ACE_Message_Block *&first_item,
913 ACE_Time_Value *timeout = 0);
918 * @class ACE_Message_Queue_Factory
920 * @brief ACE_Message_Queue_Factory is a static factory class template which
921 * provides a separate factory method for each of the major kinds of
922 * priority based message dispatching: static, earliest deadline first
923 * (EDF), and minimum laxity first (MLF).
925 * The ACE_Dynamic_Message_Queue class assumes responsibility for
926 * releasing the resources of the strategy with which it was
927 * constructed: the user of a message queue constructed by
928 * any of these factory methods is only responsible for
929 * ensuring destruction of the message queue itself.
931 template <ACE_SYNCH_DECL, class TIME_POLICY = ACE_System_Time_Policy>
932 class ACE_Message_Queue_Factory
934 public:
935 /// Factory method for a statically prioritized ACE_Message_Queue.
/// @a hwm and @a lwm are the high and low water marks; the unnamed third
/// argument is an optional notification strategy (0 means none).
/// As stated in the class description, the caller is responsible for
/// destroying the returned queue.
936 static ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY> *
937 create_static_message_queue (size_t hwm = ACE_Message_Queue_Base::DEFAULT_HWM,
938 size_t lwm = ACE_Message_Queue_Base::DEFAULT_LWM,
939 ACE_Notification_Strategy * = 0);
941 /// Factory method for a dynamically prioritized (by time to deadline) ACE_Dynamic_Message_Queue.
/// @a hwm, @a lwm and the notification strategy are as for
/// create_static_message_queue().
/// NOTE(review): the four trailing u_long arguments are presumably forwarded
/// to the dynamic message strategy the queue is built with -- confirm against
/// the implementation.
942 static ACE_Dynamic_Message_Queue<ACE_SYNCH_USE, TIME_POLICY> *
943 create_deadline_message_queue (size_t hwm = ACE_Message_Queue_Base::DEFAULT_HWM,
944 size_t lwm = ACE_Message_Queue_Base::DEFAULT_LWM,
945 ACE_Notification_Strategy * = 0,
946 u_long static_bit_field_mask = 0x3FFUL, // 2^(10) - 1
947 u_long static_bit_field_shift = 10, // 10 low order bits
948 u_long dynamic_priority_max = 0x3FFFFFUL, // 2^(22)-1
949 u_long dynamic_priority_offset = 0x200000UL); // 2^(22-1)
951 /// Factory method for a dynamically prioritized (by laxity) ACE_Dynamic_Message_Queue.
/// Takes the same arguments, with the same defaults, as
/// create_deadline_message_queue().
952 static ACE_Dynamic_Message_Queue<ACE_SYNCH_USE, TIME_POLICY> *
953 create_laxity_message_queue (size_t hwm = ACE_Message_Queue_Base::DEFAULT_HWM,
954 size_t lwm = ACE_Message_Queue_Base::DEFAULT_LWM,
955 ACE_Notification_Strategy * = 0,
956 u_long static_bit_field_mask = 0x3FFUL, // 2^(10) - 1
957 u_long static_bit_field_shift = 10, // 10 low order bits
958 u_long dynamic_priority_max = 0x3FFFFFUL, // 2^(22)-1
959 u_long dynamic_priority_offset = 0x200000UL); // 2^(22-1)
962 #if defined (ACE_VXWORKS)
964 /// Factory method for a wrapped VxWorks message queue.
/// Only declared when ACE_VXWORKS is defined.
965 static ACE_Message_Queue_Vx *
966 create_Vx_message_queue (size_t max_messages, size_t max_message_length,
967 ACE_Notification_Strategy *ns = 0);
969 #endif /* defined (ACE_VXWORKS) */
971 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO)
973 /// Factory method for an NT message queue.
/// Only declared when ACE_HAS_WIN32_OVERLAPPED_IO is defined.
974 static ACE_Message_Queue_NT *
975 create_NT_message_queue (size_t max_threads);
977 #endif /* ACE_HAS_WIN32_OVERLAPPED_IO */
980 // Forward decls.
981 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL, class TIME_POLICY> class ACE_Message_Queue_Ex_Iterator;
982 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL, class TIME_POLICY> class ACE_Message_Queue_Ex_Reverse_Iterator;
985 * @class ACE_Message_Queue_Ex
987 * @brief A threaded message queueing facility, modeled after the
988 * queueing facilities in System V STREAMs.
990 * ACE_Message_Queue_Ex is a strongly-typed version of the
991 * ACE_Message_Queue class. Rather than queueing in terms of ACE_Message_Block
992 * objects, ACE_Message_Queue_Ex has a template argument to specify the
993 * type of objects that are queued.
995 * The second template argument parameterizes the queue's synchronization.
996 * The argument specifies a synchronization strategy. The two main
997 * strategies available for ACE_SYNCH_DECL are:
998 * -# ACE_MT_SYNCH: all operations are thread-safe
999 * -# ACE_NULL_SYNCH: no synchronization and no locking overhead
1001 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL, class TIME_POLICY = ACE_System_Time_Policy>
1002 class ACE_Message_Queue_Ex
1004 public:
1006 enum
1008 /// Default priority value. This is the lowest priority.
1009 DEFAULT_PRIORITY = 0
1012 friend class ACE_Message_Queue_Ex_Iterator <ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>;
1013 friend class ACE_Message_Queue_Ex_Reverse_Iterator<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>;
1015 // = Traits
1016 typedef ACE_Message_Queue_Ex_Iterator<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>
1017 ITERATOR;
1018 typedef ACE_Message_Queue_Ex_Reverse_Iterator<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>
1019 REVERSE_ITERATOR;
1022 * @name Initialization methods
1024 //@{
1026 * Initialize an ACE_Message_Queue_Ex.
1028 * @param high_water_mark High water mark. Determines how many bytes can be
1029 * stored in a queue before it's considered full. Supplier threads
1030 * must block until the queue is no longer full.
1031 * @param low_water_mark Low water mark. Determines how many bytes must be in
1032 * the queue before supplier threads are allowed to enqueue additional
1033 * data. By default, @a high_water_mark equals @a low_water_mark, which means
1034 * that suppliers will be able to enqueue new messages as soon as
1035 * a consumer removes any message from the queue. Making the low
1036 * water mark smaller than the high water mark forces consumers to
1037 * drain more messages from the queue before suppliers can enqueue
1038 * new messages, which can minimize the "silly window syndrome."
1039 * @param ns Notification strategy. Pointer to an object conforming to the
1040 * ACE_Notification_Strategy interface. If set, the object's
1041 * notify(void) method will be called each time data is added to
1042 * this ACE_Message_Queue. @see ACE_Reactor_Notification_Strategy.
1044 ACE_Message_Queue_Ex (size_t high_water_mark = ACE_Message_Queue_Base::DEFAULT_HWM,
1045 size_t low_water_mark = ACE_Message_Queue_Base::DEFAULT_LWM,
1046 ACE_Notification_Strategy * ns = 0);
1047 virtual int open (size_t hwm = ACE_Message_Queue_Base::DEFAULT_HWM,
1048 size_t lwm = ACE_Message_Queue_Base::DEFAULT_LWM,
1049 ACE_Notification_Strategy * = 0);
1050 //@}
1052 /// Releases all resources from the message queue and marks it deactivated.
1053 /// @sa flush().
1055 /// @retval The number of messages released from the queue; -1 on error.
1056 virtual int close (void);
1058 /// Releases all resources from the message queue and marks it deactivated.
1059 virtual ~ACE_Message_Queue_Ex (void);
1062 * Releases all resources from the message queue but does not mark it
1063 * deactivated. This method holds the queue lock during this operation.
1064 * @sa close().
1066 * @return The number of messages flushed; -1 on error.
1068 virtual int flush (void);
1071 * Release all resources from the message queue but do not mark it
1072 * as deactivated.
1074 * @pre The caller must be holding the queue lock before calling this
1075 * method.
1077 * @return The number of messages flushed.
1079 virtual int flush_i (void);
1081 /** @name Enqueue and dequeue methods
1083 * The enqueue and dequeue methods accept a timeout value passed as
1084 * an ACE_Time_Value *. In all cases, if the timeout pointer is 0,
1085 * the caller will block until action is possible. If the timeout pointer
1086 * is non-zero, the call will wait (if needed, subject to water mark
1087 * settings) until the absolute time specified in the referenced
1088 * ACE_Time_Value object is reached. If the time is reached before the
1089 * desired action is possible, the method will return -1 with errno set
1090 * to @c EWOULDBLOCK. Regardless of the timeout setting, however,
1091 * these methods will also fail and return -1 when the queue is closed,
1092 * deactivated, pulsed, or when a signal occurs.
1094 * The time parameters are handled the same as in ACE_Message_Queue, so
1095 * you can see C++NPv2 Section 6.2 and APG Section 12.3 for a fuller
1096 * treatment of ACE_Message_Queue, enqueueing, dequeueing, and how these
1097 * operations are affected by queue state transitions.
1099 //@{
1101 * Retrieve a pointer to the first item in the queue without removing it.
1103 * @note Because the item whose pointer is returned is still on the queue,
1104 * another thread may dequeue that item at any time,
1105 * including before the calling thread examines the peeked-at item.
1106 * Be very careful with this method in multithreaded queueing
1107 * situations.
1109 * @param first_item Reference to an ACE_MESSAGE_TYPE * that will
1110 * point to the first item on the queue. The item
1111 * remains on the queue until this or another thread
1112 * dequeues it.
1113 * @param timeout The absolute time the caller will wait until
1114 * for an item to be queued.
1116 * @retval >0 The number of items on the queue.
1117 * @retval -1 On failure. errno holds the reason. Common errno values are:
1118 * - EWOULDBLOCK: the timeout elapsed
1119 * - ESHUTDOWN: the queue was deactivated or pulsed
1121 virtual int peek_dequeue_head (ACE_MESSAGE_TYPE *&first_item,
1122 ACE_Time_Value *timeout = 0);
1125 * Enqueue an ACE_MESSAGE_TYPE into the queue in accordance with
1126 * the specified priority (0 is lowest priority). FIFO
1127 * order is maintained when items of the same priority are
1128 * inserted consecutively.
1130 * @param new_item Pointer to an item that will be added to the queue.
1131 * @param timeout The absolute time the caller will wait until
1132 * for the block to be queued.
1133 * @param priority The priority to use when enqueueing the item.
1135 * @retval >0 The number of items on the queue after adding
1136 * the specified item.
1137 * @retval -1 On failure. errno holds the reason. Common errno values are:
1138 * - EWOULDBLOCK: the timeout elapsed
1139 * - ESHUTDOWN: the queue was deactivated or pulsed
1141 virtual int enqueue_prio (ACE_MESSAGE_TYPE *new_item,
1142 ACE_Time_Value *timeout = 0,
1143 unsigned long priority = DEFAULT_PRIORITY);
1146 * This method acts just like enqueue_tail(). There's no deadline
1147 * time associated with items.
1149 virtual int enqueue_deadline (ACE_MESSAGE_TYPE *new_item,
1150 ACE_Time_Value *timeout = 0);
1153 * @deprecated This is an alias for enqueue_prio(). It's only here for
1154 * backwards compatibility and will go away in a subsequent release.
1155 * Please use enqueue_prio() instead.
1157 virtual int enqueue (ACE_MESSAGE_TYPE *new_item,
1158 ACE_Time_Value *timeout = 0);
1161 * Enqueue an item at the tail of the queue.
1163 * @param new_item Pointer to an item that will be added to the queue.
1164 * @param timeout The absolute time the caller will wait until
1165 * for the item to be queued.
1167 * @retval >0 The number of items on the queue after adding
1168 * the specified item.
1169 * @retval -1 On failure. errno holds the reason. Common errno values are:
1170 * - EWOULDBLOCK: the timeout elapsed
1171 * - ESHUTDOWN: the queue was deactivated or pulsed
1173 virtual int enqueue_tail (ACE_MESSAGE_TYPE *new_item,
1174 ACE_Time_Value *timeout = 0);
1177 * Enqueue an item at the head of the queue.
1179 * @param new_item Pointer to an item that will be added to the queue.
1180 * @param timeout The absolute time the caller will wait until
1181 * for the item to be queued.
1183 * @retval >0 The number of items on the queue after adding
1184 * the specified item.
1185 * @retval -1 On failure. errno holds the reason. Common errno values are:
1186 * - EWOULDBLOCK: the timeout elapsed
1187 * - ESHUTDOWN: the queue was deactivated or pulsed
1189 virtual int enqueue_head (ACE_MESSAGE_TYPE *new_item,
1190 ACE_Time_Value *timeout = 0);
1192 /// This method is an alias for the following <dequeue_head> method.
1193 virtual int dequeue (ACE_MESSAGE_TYPE *&first_item,
1194 ACE_Time_Value *timeout = 0);
1197 * Dequeue the item at the head of the queue and return a pointer to it.
1199 * @param first_item Reference to an ACE_MESSAGE_TYPE * that will
1200 * be set to the address of the dequeued item.
1201 * @param timeout The absolute time the caller will wait until
1202 * for an item to be dequeued.
1204 * @retval >=0 The number of items remaining in the queue.
1205 * @retval -1 On failure. errno holds the reason. Common errno values are:
1206 * - EWOULDBLOCK: the timeout elapsed
1207 * - ESHUTDOWN: the queue was deactivated or pulsed
1209 virtual int dequeue_head (ACE_MESSAGE_TYPE *&first_item,
1210 ACE_Time_Value *timeout = 0);
1213 * Dequeue the item that has the lowest priority (preserves
1214 * FIFO order for items with the same priority) and return a pointer
1215 * to it.
1217 * @param dequeued Reference to an ACE_MESSAGE_TYPE * that will
1218 * be set to the address of the dequeued item.
1219 * @param timeout The absolute time the caller will wait until
1220 * for an item to be dequeued.
1222 * @retval >=0 The number of items remaining in the queue.
1223 * @retval -1 On failure. errno holds the reason. Common errno values are:
1224 * - EWOULDBLOCK: the timeout elapsed
1225 * - ESHUTDOWN: the queue was deactivated or pulsed
1227 virtual int dequeue_prio (ACE_MESSAGE_TYPE *&dequeued,
1228 ACE_Time_Value *timeout = 0);
1231 * Dequeue the item at the tail of the queue and return a pointer to it.
1233 * @param dequeued Reference to an ACE_MESSAGE_TYPE * that will
1234 * be set to the address of the dequeued item.
1235 * @param timeout The absolute time the caller will wait until
1236 * for an item to be dequeued.
1238 * @retval >=0 The number of items remaining in the queue.
1239 * @retval -1 On failure. errno holds the reason. Common errno values are:
1240 * - EWOULDBLOCK: the timeout elapsed
1241 * - ESHUTDOWN: the queue was deactivated or pulsed
1243 virtual int dequeue_tail (ACE_MESSAGE_TYPE *&dequeued,
1244 ACE_Time_Value *timeout = 0);
1247 * Because there's no deadline associated with enqueue_deadline(), this
1248 * method behaves just like dequeue_head().
1250 virtual int dequeue_deadline (ACE_MESSAGE_TYPE *&dequeued,
1251 ACE_Time_Value *timeout = 0);
1252 //@}
1254 /** @name Queue statistics methods
1256 //@{
1258 /// True if queue is full, else false.
1259 virtual bool is_full (void);
1261 /// True if queue is empty, else false.
1262 virtual bool is_empty (void);
1265 * Number of total bytes on the queue, i.e., sum of the message
1266 * block sizes.
1268 virtual size_t message_bytes (void);
1270 * Total length of the messages on the queue, i.e., sum of the message
1271 * block lengths.
1273 virtual size_t message_length (void);
1275 * Number of total messages on the queue.
1277 virtual size_t message_count (void);
1279 // = Manual changes to these stats (used when queued message blocks
1280 // change size or lengths).
1282 * New value of the number of total bytes on the queue, i.e., sum of
1283 * the message block sizes.
1285 virtual void message_bytes (size_t new_size);
1287 * New value of the total length on the queue, i.e., sum
1288 * of the message block lengths.
1290 virtual void message_length (size_t new_length);
1292 //@}
1294 /** @name Water mark (flow control) methods
1296 //@{
1299 * Get high watermark.
1301 virtual size_t high_water_mark (void);
1303 * Set the high watermark, which determines how many bytes can be
1304 * stored in a queue before it's considered "full."
1306 virtual void high_water_mark (size_t hwm);
1309 * Get low watermark.
1311 virtual size_t low_water_mark (void);
1313 * Set the low watermark, which determines how many bytes must be in
1314 * the queue before supplier threads are allowed to enqueue
1315 * additional <ACE_MESSAGE_TYPE>s.
1317 virtual void low_water_mark (size_t lwm);
1318 //@}
1320 /** @name Activation and queue state methods
1321 * See C++NPv2 Section 6.2 and APG Section 12.3 for a fuller treatment of
1322 * queue states and transitions and how the transitions affect message
1323 * enqueueing and dequeueing operations.
1325 //@{
1328 * Deactivate the queue and wakeup all threads waiting on the queue
1329 * so they can continue. No messages are removed from the queue,
1330 * however. Any other operations called until the queue is
1331 * activated again will immediately return -1 with @c errno ==
1332 * ESHUTDOWN. Returns WAS_INACTIVE if queue was inactive before the
1333 * call and WAS_ACTIVE if queue was active before the call.
1335 virtual int deactivate (void);
1338 * Reactivate the queue so that threads can enqueue and dequeue
1339 * messages again. Returns the state of the queue before the call.
1341 virtual int activate (void);
1344 * Pulse the queue to wake up any waiting threads. Changes the
1345 * queue state to PULSED; future enqueue/dequeue operations proceed
1346 * as in ACTIVATED state.
1348 * @retval The queue's state before this call.
1350 virtual int pulse (void);
1352 /// Returns the current state of the queue, which can be one of
1353 /// ACTIVATED, DEACTIVATED, or PULSED.
1354 virtual int state (void);
1356 /// Returns true if the state of the queue is DEACTIVATED,
1357 /// but false if the queue's state is ACTIVATED or PULSED.
1358 virtual int deactivated (void);
1359 //@}
1361 /** @name Notification strategy methods
1363 //@{
1366 * This hook is automatically invoked by <enqueue_head>,
1367 * <enqueue_tail>, and <enqueue_prio> when a new item is inserted
1368 * into the queue. Subclasses can override this method to perform
1369 * specific notification strategies (e.g., signaling events for a
1370 * <WFMO_Reactor>, notifying a <Reactor>, etc.). In a
1371 * multi-threaded application with concurrent consumers, there is no
1372 * guarantee that the queue will still be non-empty by the time
1373 * the notification occurs.
1375 virtual int notify (void);
1377 /// Get the notification strategy for the <Message_Queue>
1378 virtual ACE_Notification_Strategy *notification_strategy (void);
1380 /// Set the notification strategy for the <Message_Queue>
1381 virtual void notification_strategy (ACE_Notification_Strategy *s);
1382 //@}
1384 /// Returns a reference to the lock used by the ACE_Message_Queue_Ex.
1385 virtual ACE_SYNCH_MUTEX_T &lock (void);
1387 /// Get the current time of day according to the queue's TIME_POLICY.
1388 /// Allows users to initialize timeout values using the same time source
/// as the queue.
1389 ACE_Time_Value_T<TIME_POLICY> gettimeofday ();
1391 /// Allows applications to control how this message queue gets the time
1392 /// of day, by supplying the TIME_POLICY instance to use.
1393 void set_time_policy (TIME_POLICY const & time_policy);
1395 /// Dump the state of an object.
1396 virtual void dump (void) const;
1398 /// Declare the dynamic allocation hooks.
1399 ACE_ALLOC_HOOK_DECLARE;
1401 protected:
1402 /// Implement this via an ACE_Message_Queue.
1403 ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY> queue_;
1407 * @class ACE_Message_Queue_Ex_Iterator
1409 * @brief Iterator for the ACE_Message_Queue_Ex.
1411 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL, class TIME_POLICY = ACE_System_Time_Policy>
1412 class ACE_Message_Queue_Ex_Iterator
1414 public:
/// Construct an iterator over @a queue.
1415 ACE_Message_Queue_Ex_Iterator (ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY> & queue);
1417 // = Iteration methods.
1418 /// Pass back the next @a entry that hasn't been seen in the queue.
1419 /// Returns 0 when all items have been seen, else 1.
1420 int next (ACE_MESSAGE_TYPE *&entry);
1422 /// Returns 1 when all items have been seen, else 0.
1423 int done (void) const;
1425 /// Move forward by one element in the queue. Returns 0 when all the
1426 /// items in the queue have been seen, else 1.
1427 int advance (void);
1429 /// Dump the state of an object.
1430 void dump (void) const;
1432 /// Declare the dynamic allocation hooks.
1433 ACE_ALLOC_HOOK_DECLARE;
1435 private:
1436 /// Implement this via the ACE_Message_Queue_Iterator
1437 ACE_Message_Queue_Iterator<ACE_SYNCH_USE, TIME_POLICY> iter_;
1441 * @class ACE_Message_Queue_Ex_Reverse_Iterator
1443 * @brief Reverse iterator for the ACE_Message_Queue_Ex.
1445 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL, class TIME_POLICY = ACE_System_Time_Policy>
1446 class ACE_Message_Queue_Ex_Reverse_Iterator
1448 public:
/// Construct a reverse iterator over @a queue.
1449 ACE_Message_Queue_Ex_Reverse_Iterator (ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY> & queue);
1451 // = Iteration methods.
1452 /// Pass back the next @a entry that hasn't been seen in the queue.
1453 /// Returns 0 when all items have been seen, else 1.
1454 int next (ACE_MESSAGE_TYPE *&entry);
1456 /// Returns 1 when all items have been seen, else 0.
1457 int done (void) const;
1459 /// Move to the next element in this iterator's traversal order. Returns 0
1460 /// when all the items in the queue have been seen, else 1.
/// NOTE(review): the traversal is presumably tail-to-head, since this class
/// wraps ACE_Message_Queue_Reverse_Iterator -- confirm in the implementation.
1461 int advance (void);
1463 /// Dump the state of an object.
1464 void dump (void) const;
1466 /// Declare the dynamic allocation hooks.
1467 ACE_ALLOC_HOOK_DECLARE;
1469 private:
1470 /// Implement this via the ACE_Message_Queue_Reverse_Iterator
1471 ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE, TIME_POLICY> iter_;
1475 * @class ACE_Message_Queue_Ex_N
1477 * @brief A threaded message queueing facility, modeled after the
1478 * queueing facilities in System V STREAMs which can enqueue
1479 * multiple messages in one call.
1481 * As ACE_Message_Queue_Ex, ACE_Message_Queue_Ex_N is a strongly-typed
1482 * version of the ACE_Message_Queue. If @c ACE_SYNCH_DECL is @c ACE_MT_SYNCH
1483 * then all operations are thread-safe. Otherwise, if it's @c ACE_NULL_SYNCH
1484 * then there's no locking overhead.
1486 * The @c ACE_MESSAGE_TYPE messages that are sent to this
1487 * queue can be chained. Messages are expected to have a
1488 * @c next method that returns the next message in the chain;
1489 * ACE_Message_Queue_Ex_N uses this method to run through
1490 * all the incoming messages and enqueue them in one call.
1492 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL, class TIME_POLICY = ACE_System_Time_Policy>
1493 class ACE_Message_Queue_Ex_N : public ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>
1495 public:
1497 * Initialize an ACE_Message_Queue_Ex_N. The @a high_water_mark
1498 * determines how many bytes can be stored in a queue before it's
1499 * considered "full." Supplier threads must block until the queue
1500 * is no longer full. The @a low_water_mark determines how many
1501 * bytes must be in the queue before supplier threads are allowed to
1502 * enqueue additional messages. By default, the @a high_water_mark
1503 * equals the @a low_water_mark, which means that suppliers will be
1504 * able to enqueue new messages as soon as a consumer removes any message
1505 * from the queue. Making the @a low_water_mark smaller than the
1506 * @a high_water_mark forces consumers to drain more messages from the
1507 * queue before suppliers can enqueue new messages, which can minimize
1508 * the "silly window syndrome."
1510 ACE_Message_Queue_Ex_N (size_t high_water_mark = ACE_Message_Queue_Base::DEFAULT_HWM,
1511 size_t low_water_mark = ACE_Message_Queue_Base::DEFAULT_LWM,
1512 ACE_Notification_Strategy * ns = 0);
1514 /// Close down the message queue and release all resources.
1515 virtual ~ACE_Message_Queue_Ex_N (void);
1518 * Enqueue one or more @c ACE_MESSAGE_TYPE objects at the head of the queue.
1519 * If the @a new_item @c next() pointer is non-zero, it is assumed to be the
1520 * start of a series of @c ACE_MESSAGE_TYPE objects connected via their
1521 * @c next() pointers. The series of blocks will be added to the queue in
1522 * the same order they are passed in as.
1524 * @param new_item Pointer to an @c ACE_MESSAGE_TYPE that will be
1525 * added to the queue. If the block's @c next() pointer
1526 * is non-zero, all blocks chained from the @c next()
1527 * pointer are enqueued as well.
1528 * @param tv The absolute time the caller will wait until
1529 * for the block to be queued.
1531 * @retval >0 The number of @c ACE_MESSAGE_TYPE objects on the queue after
1532 * adding the specified block(s).
1533 * @retval -1 On failure. errno holds the reason. Common errno values are:
1534 * - EWOULDBLOCK: the timeout elapsed
1535 * - ESHUTDOWN: the queue was deactivated or pulsed
1537 virtual int enqueue_head (ACE_MESSAGE_TYPE *new_item, ACE_Time_Value *tv = 0);
1540 * Enqueue one or more @c ACE_MESSAGE_TYPE objects at the tail of the queue.
1541 * If the @a new_item @c next() pointer is non-zero, it is assumed to be the
1542 * start of a series of @c ACE_MESSAGE_TYPE objects connected via their
1543 * @c next() pointers. The series of blocks will be added to the queue in
1544 * the same order they are passed in as.
1546 * @param new_item Pointer to an @c ACE_MESSAGE_TYPE that will be
1547 * added to the queue. If the block's @c next() pointer
1548 * is non-zero, all blocks chained from the @c next()
1549 * pointer are enqueued as well.
1550 * @param tv The absolute time the caller will wait until
1551 * for the block to be queued.
1553 * @retval >0 The number of @c ACE_MESSAGE_TYPE objects on the queue after
1554 * adding the specified block(s).
1555 * @retval -1 On failure. errno holds the reason. Common errno values are:
1556 * - EWOULDBLOCK: the timeout elapsed
1557 * - ESHUTDOWN: the queue was deactivated or pulsed
1559 virtual int enqueue_tail (ACE_MESSAGE_TYPE *new_item, ACE_Time_Value *tv = 0);
1561 /// Declare the dynamic allocation hooks.
1562 ACE_ALLOC_HOOK_DECLARE;
1564 protected:
1566 * A helper method that wraps the incoming chain messages
1567 * with ACE_Message_Blocks.
1569 ACE_Message_Block *wrap_with_mbs_i (ACE_MESSAGE_TYPE *new_item);
1572 ACE_END_VERSIONED_NAMESPACE_DECL
1574 #if defined (ACE_TEMPLATES_REQUIRE_SOURCE)
1575 #include "ace/Message_Queue_T.cpp"
1576 #endif /* ACE_TEMPLATES_REQUIRE_SOURCE */
1578 #if defined (ACE_TEMPLATES_REQUIRE_PRAGMA)
1579 #pragma implementation ("Message_Queue_T.cpp")
1580 #endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */
1582 #include /**/ "ace/post.h"
1584 #endif /* ACE_MESSAGE_QUEUE_T_H */