1 #include "ace/Message_Queue.h"
2 #include "ace/Message_Queue_NT.h"
3 #include "ace/Log_Category.h"
5 #if !defined (__ACE_INLINE__)
6 #include "ace/Message_Queue_NT.inl"
7 #endif /* __ACE_INLINE__ */
9 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
11 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO)
13 ACE_Message_Queue_NT::ACE_Message_Queue_NT (DWORD max_threads
)
14 : max_cthrs_ (max_threads
),
19 completion_port_ (ACE_INVALID_HANDLE
)
21 ACE_TRACE ("ACE_Message_Queue_NT::ACE_Message_Queue_NT");
22 this->open (max_threads
);
26 ACE_Message_Queue_NT::open (DWORD max_threads
)
28 ACE_TRACE ("ACE_Message_Queue_NT::open");
29 this->max_cthrs_
= max_threads
;
30 this->state_
= ACE_Message_Queue_Base::ACTIVATED
;
31 this->completion_port_
= ::CreateIoCompletionPort (ACE_INVALID_HANDLE
,
33 ACE_Message_Queue_Base::ACTIVATED
,
35 if (this->completion_port_
== 0)
36 this->state_
= ACE_Message_Queue_Base::DEACTIVATED
;
37 return (this->completion_port_
== 0 ? -1 : 0);
41 ACE_Message_Queue_NT::close ()
43 ACE_TRACE ("ACE_Message_Queue_NT::close");
44 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX
, ace_mon
, this->lock_
, -1);
46 return (::CloseHandle (this->completion_port_
) ? 0 : -1 );
49 ACE_Message_Queue_NT::~ACE_Message_Queue_NT ()
51 ACE_TRACE ("ACE_Message_Queue_NT::~ACE_Message_Queue_NT");
56 ACE_Message_Queue_NT::enqueue (ACE_Message_Block
*new_item
,
59 ACE_TRACE ("ACE_Message_Queue_NT::enqueue");
60 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX
, ace_mon
, this->lock_
, -1);
61 if (this->state_
!= ACE_Message_Queue_Base::DEACTIVATED
)
63 size_t const msize
= new_item
->total_size ();
64 size_t const mlength
= new_item
->total_length ();
65 // Note - we send ACTIVATED in the 3rd arg to tell the completion
66 // routine it's _NOT_ being woken up because of deactivate().
67 ULONG_PTR state_to_post
;
68 state_to_post
= ACE_Message_Queue_Base::ACTIVATED
;
69 if (::PostQueuedCompletionStatus (this->completion_port_
,
70 static_cast<DWORD
> (msize
),
72 reinterpret_cast<LPOVERLAPPED
> (new_item
)))
74 // Update the states once I succeed.
75 this->cur_bytes_
+= msize
;
76 this->cur_length_
+= mlength
;
77 return ACE_Utils::truncate_cast
<int> (++this->cur_count_
);
83 // Fail to enqueue the message.
88 ACE_Message_Queue_NT::dequeue (ACE_Message_Block
*&first_item
,
89 ACE_Time_Value
*timeout
)
91 ACE_TRACE ("ACE_Message_Queue_NT::dequeue_head");
94 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX
, ace_mon
, this->lock_
, -1);
96 // Make sure the MQ is not deactivated before proceeding.
97 if (this->state_
== ACE_Message_Queue_Base::DEACTIVATED
)
99 errno
= ESHUTDOWN
; // Operation on deactivated MQ not allowed.
103 ++this->cur_thrs_
; // Increase the waiting thread count.
106 ULONG_PTR queue_state
;
108 // Get a message from the completion port.
109 int retv
= ::GetQueuedCompletionStatus (this->completion_port_
,
112 reinterpret_cast<LPOVERLAPPED
*> (&first_item
),
113 (timeout
== 0 ? INFINITE
: timeout
->msec ()));
115 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX
, ace_mon
, this->lock_
, -1);
116 --this->cur_thrs_
; // Decrease waiting thread count.
119 if (queue_state
== ACE_Message_Queue_Base::ACTIVATED
)
120 { // Really get a valid MB from the queue.
122 this->cur_bytes_
-= msize
;
123 this->cur_length_
-= first_item
->total_length ();
124 return ACE_Utils::truncate_cast
<int> (this->cur_count_
);
126 else // Woken up by deactivate () or pulse ().
134 ACE_Message_Queue_NT::deactivate ()
136 ACE_TRACE ("ACE_Message_Queue_NT::deactivate");
137 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX
, ace_mon
, this->lock_
, -1);
139 int const previous_state
= this->state_
;
140 if (previous_state
!= ACE_Message_Queue_Base::DEACTIVATED
)
142 this->state_
= ACE_Message_Queue_Base::DEACTIVATED
;
144 // Get the number of shutdown messages necessary to wake up all
147 this->cur_thrs_
- static_cast<DWORD
> (this->cur_count_
);
149 ::PostQueuedCompletionStatus (this->completion_port_
,
154 return previous_state
;
158 ACE_Message_Queue_NT::activate ()
160 ACE_TRACE ("ACE_Message_Queue_NT::activate");
161 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX
, ace_mon
, this->lock_
, -1);
162 int const previous_status
= this->state_
;
163 this->state_
= ACE_Message_Queue_Base::ACTIVATED
;
164 return previous_status
;
168 ACE_Message_Queue_NT::pulse ()
170 ACE_TRACE ("ACE_Message_Queue_NT::pulse");
171 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX
, ace_mon
, this->lock_
, -1);
173 int const previous_state
= this->state_
;
174 if (previous_state
!= ACE_Message_Queue_Base::DEACTIVATED
)
176 this->state_
= ACE_Message_Queue_Base::PULSED
;
178 // Get the number of shutdown messages necessary to wake up all
182 this->cur_thrs_
- static_cast<DWORD
> (this->cur_count_
);
184 ::PostQueuedCompletionStatus (this->completion_port_
,
189 return previous_state
;
193 ACE_Message_Queue_NT::dump () const
195 #if defined (ACE_HAS_DUMP)
196 ACE_TRACE ("ACE_Message_Queue_NT::dump");
198 ACELIB_DEBUG ((LM_DEBUG
, ACE_BEGIN_DUMP
, this));
199 switch (this->state_
)
201 case ACE_Message_Queue_Base::ACTIVATED
:
202 ACELIB_DEBUG ((LM_DEBUG
,
203 ACE_TEXT ("state = ACTIVATED\n")));
205 case ACE_Message_Queue_Base::DEACTIVATED
:
206 ACELIB_DEBUG ((LM_DEBUG
,
207 ACE_TEXT ("state = DEACTIVATED\n")));
209 case ACE_Message_Queue_Base::PULSED
:
210 ACELIB_DEBUG ((LM_DEBUG
,
211 ACE_TEXT ("state = PULSED\n")));
215 ACELIB_DEBUG ((LM_DEBUG
,
216 ACE_TEXT ("max_cthrs_ = %d\n")
217 ACE_TEXT ("cur_thrs_ = %d\n")
218 ACE_TEXT ("cur_bytes = %d\n")
219 ACE_TEXT ("cur_length = %d\n")
220 ACE_TEXT ("cur_count = %d\n")
221 ACE_TEXT ("completion_port_ = %x\n"),
227 this->completion_port_
));
228 ACELIB_DEBUG ((LM_DEBUG
, ACE_END_DUMP
));
229 #endif /* ACE_HAS_DUMP */
232 #endif /* ACE_HAS_WIN32_OVERLAPPED_IO */
234 ACE_END_VERSIONED_NAMESPACE_DECL