Merge pull request #2309 from mitza-oci/warnings
[ACE_TAO.git] / ACE / ace / Message_Queue_NT.cpp
blobe3d82e10ea2e7961d56e36ebd1b3862049e8e18d
1 #include "ace/Message_Queue.h"
2 #include "ace/Message_Queue_NT.h"
3 #include "ace/Log_Category.h"
5 #if !defined (__ACE_INLINE__)
6 #include "ace/Message_Queue_NT.inl"
7 #endif /* __ACE_INLINE__ */
9 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
11 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO)
13 ACE_Message_Queue_NT::ACE_Message_Queue_NT (DWORD max_threads)
14 : max_cthrs_ (max_threads),
15 cur_thrs_ (0),
16 cur_bytes_ (0),
17 cur_length_ (0),
18 cur_count_ (0),
19 completion_port_ (ACE_INVALID_HANDLE)
21 ACE_TRACE ("ACE_Message_Queue_NT::ACE_Message_Queue_NT");
22 this->open (max_threads);
25 int
26 ACE_Message_Queue_NT::open (DWORD max_threads)
28 ACE_TRACE ("ACE_Message_Queue_NT::open");
29 this->max_cthrs_ = max_threads;
30 this->state_ = ACE_Message_Queue_Base::ACTIVATED;
31 this->completion_port_ = ::CreateIoCompletionPort (ACE_INVALID_HANDLE,
33 ACE_Message_Queue_Base::ACTIVATED,
34 max_threads);
35 if (this->completion_port_ == 0)
36 this->state_ = ACE_Message_Queue_Base::DEACTIVATED;
37 return (this->completion_port_ == 0 ? -1 : 0);
40 int
41 ACE_Message_Queue_NT::close ()
43 ACE_TRACE ("ACE_Message_Queue_NT::close");
44 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
45 this->deactivate ();
46 return (::CloseHandle (this->completion_port_) ? 0 : -1 );
49 ACE_Message_Queue_NT::~ACE_Message_Queue_NT ()
51 ACE_TRACE ("ACE_Message_Queue_NT::~ACE_Message_Queue_NT");
52 this->close ();
55 int
56 ACE_Message_Queue_NT::enqueue (ACE_Message_Block *new_item,
57 ACE_Time_Value *)
59 ACE_TRACE ("ACE_Message_Queue_NT::enqueue");
60 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
61 if (this->state_ != ACE_Message_Queue_Base::DEACTIVATED)
63 size_t const msize = new_item->total_size ();
64 size_t const mlength = new_item->total_length ();
65 // Note - we send ACTIVATED in the 3rd arg to tell the completion
66 // routine it's _NOT_ being woken up because of deactivate().
67 ULONG_PTR state_to_post;
68 state_to_post = ACE_Message_Queue_Base::ACTIVATED;
69 if (::PostQueuedCompletionStatus (this->completion_port_,
70 static_cast<DWORD> (msize),
71 state_to_post,
72 reinterpret_cast<LPOVERLAPPED> (new_item)))
74 // Update the states once I succeed.
75 this->cur_bytes_ += msize;
76 this->cur_length_ += mlength;
77 return ACE_Utils::truncate_cast<int> (++this->cur_count_);
80 else
81 errno = ESHUTDOWN;
83 // Fail to enqueue the message.
84 return -1;
87 int
88 ACE_Message_Queue_NT::dequeue (ACE_Message_Block *&first_item,
89 ACE_Time_Value *timeout)
91 ACE_TRACE ("ACE_Message_Queue_NT::dequeue_head");
94 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
96 // Make sure the MQ is not deactivated before proceeding.
97 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
99 errno = ESHUTDOWN; // Operation on deactivated MQ not allowed.
100 return -1;
102 else
103 ++this->cur_thrs_; // Increase the waiting thread count.
106 ULONG_PTR queue_state;
107 DWORD msize;
108 // Get a message from the completion port.
109 int retv = ::GetQueuedCompletionStatus (this->completion_port_,
110 &msize,
111 &queue_state,
112 reinterpret_cast<LPOVERLAPPED *> (&first_item),
113 (timeout == 0 ? INFINITE : timeout->msec ()));
115 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
116 --this->cur_thrs_; // Decrease waiting thread count.
117 if (retv)
119 if (queue_state == ACE_Message_Queue_Base::ACTIVATED)
120 { // Really get a valid MB from the queue.
121 --this->cur_count_;
122 this->cur_bytes_ -= msize;
123 this->cur_length_ -= first_item->total_length ();
124 return ACE_Utils::truncate_cast<int> (this->cur_count_);
126 else // Woken up by deactivate () or pulse ().
127 errno = ESHUTDOWN;
130 return -1;
134 ACE_Message_Queue_NT::deactivate ()
136 ACE_TRACE ("ACE_Message_Queue_NT::deactivate");
137 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
139 int const previous_state = this->state_;
140 if (previous_state != ACE_Message_Queue_Base::DEACTIVATED)
142 this->state_ = ACE_Message_Queue_Base::DEACTIVATED;
144 // Get the number of shutdown messages necessary to wake up all
145 // waiting threads.
146 DWORD cntr =
147 this->cur_thrs_ - static_cast<DWORD> (this->cur_count_);
148 while (cntr-- > 0)
149 ::PostQueuedCompletionStatus (this->completion_port_,
151 this->state_,
154 return previous_state;
158 ACE_Message_Queue_NT::activate ()
160 ACE_TRACE ("ACE_Message_Queue_NT::activate");
161 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
162 int const previous_status = this->state_;
163 this->state_ = ACE_Message_Queue_Base::ACTIVATED;
164 return previous_status;
168 ACE_Message_Queue_NT::pulse ()
170 ACE_TRACE ("ACE_Message_Queue_NT::pulse");
171 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
173 int const previous_state = this->state_;
174 if (previous_state != ACE_Message_Queue_Base::DEACTIVATED)
176 this->state_ = ACE_Message_Queue_Base::PULSED;
178 // Get the number of shutdown messages necessary to wake up all
179 // waiting threads.
181 DWORD cntr =
182 this->cur_thrs_ - static_cast<DWORD> (this->cur_count_);
183 while (cntr-- > 0)
184 ::PostQueuedCompletionStatus (this->completion_port_,
186 this->state_,
189 return previous_state;
192 void
193 ACE_Message_Queue_NT::dump () const
195 #if defined (ACE_HAS_DUMP)
196 ACE_TRACE ("ACE_Message_Queue_NT::dump");
198 ACELIB_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
199 switch (this->state_)
201 case ACE_Message_Queue_Base::ACTIVATED:
202 ACELIB_DEBUG ((LM_DEBUG,
203 ACE_TEXT ("state = ACTIVATED\n")));
204 break;
205 case ACE_Message_Queue_Base::DEACTIVATED:
206 ACELIB_DEBUG ((LM_DEBUG,
207 ACE_TEXT ("state = DEACTIVATED\n")));
208 break;
209 case ACE_Message_Queue_Base::PULSED:
210 ACELIB_DEBUG ((LM_DEBUG,
211 ACE_TEXT ("state = PULSED\n")));
212 break;
215 ACELIB_DEBUG ((LM_DEBUG,
216 ACE_TEXT ("max_cthrs_ = %d\n")
217 ACE_TEXT ("cur_thrs_ = %d\n")
218 ACE_TEXT ("cur_bytes = %d\n")
219 ACE_TEXT ("cur_length = %d\n")
220 ACE_TEXT ("cur_count = %d\n")
221 ACE_TEXT ("completion_port_ = %x\n"),
222 this->max_cthrs_,
223 this->cur_thrs_,
224 this->cur_bytes_,
225 this->cur_length_,
226 this->cur_count_,
227 this->completion_port_));
228 ACELIB_DEBUG ((LM_DEBUG, ACE_END_DUMP));
229 #endif /* ACE_HAS_DUMP */
232 #endif /* ACE_HAS_WIN32_OVERLAPPED_IO */
234 ACE_END_VERSIONED_NAMESPACE_DECL