Merge pull request #2216 from jwillemsen/jwi-cxxversionchecks
[ACE_TAO.git] / ACE / protocols / ace / INet / StreamHandler.cpp
blob cc521e656a2cef900c5afdc22b5e768241d216b6
1 #ifndef ACE_IOS_STREAM_HANDLER_CPP
2 #define ACE_IOS_STREAM_HANDLER_CPP
4 #include "ace/INet/INet_Log.h"
5 #include "ace/INet/StreamHandler.h"
6 #include "ace/OS_NS_Thread.h"
7 #include "ace/OS_NS_errno.h"
8 #include "ace/Countdown_Time.h"
9 #include "ace/Truncate.h"
11 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
13 namespace ACE
15 namespace IOS
17 template <ACE_PEER_STREAM_1, ACE_SYNCH_DECL>
18 StreamHandler<ACE_PEER_STREAM, ACE_SYNCH_USE>::StreamHandler (
19 const ACE_Synch_Options &synch_options,
20 ACE_Thread_Manager *thr_mgr,
21 mq_type *mq,
22 ACE_Reactor *reactor)
23 : ACE_Svc_Handler<ACE_PEER_STREAM, ACE_SYNCH_USE> (thr_mgr, mq, reactor),
24 connected_ (false),
25 send_timeout_ (false),
26 receive_timeout_ (false),
27 notification_strategy_ (reactor,
28 this,
29 ACE_Event_Handler::WRITE_MASK)
31 INET_TRACE ("ACE_IOS_StreamHandler - ctor");
33 unsigned long opt = synch_options[ACE_Synch_Options::USE_REACTOR] ?
34 ACE_Synch_Options::USE_REACTOR : 0;
35 if (synch_options[ACE_Synch_Options::USE_TIMEOUT])
36 opt |= ACE_Synch_Options::USE_TIMEOUT;
37 this->sync_opt_.set (opt,
38 synch_options.timeout (),
39 synch_options.arg ());
42 template <ACE_PEER_STREAM_1, ACE_SYNCH_DECL>
43 StreamHandler<ACE_PEER_STREAM, ACE_SYNCH_USE>::~StreamHandler ()
45 INET_TRACE ("ACE_IOS_StreamHandler - dtor");
47 this->connected_ = false;
50 template <ACE_PEER_STREAM_1, ACE_SYNCH_DECL>
51 int StreamHandler<ACE_PEER_STREAM, ACE_SYNCH_USE>::open (void * /*p*/)
53 this->connected_ = true;
54 return 0;
57 template <ACE_PEER_STREAM_1, ACE_SYNCH_DECL>
58 int StreamHandler<ACE_PEER_STREAM, ACE_SYNCH_USE>::close (u_long flags)
60 this->connected_ = false;
61 return base_type::close (flags);
64 template <ACE_PEER_STREAM_1, ACE_SYNCH_DECL>
65 int StreamHandler<ACE_PEER_STREAM, ACE_SYNCH_USE>::handle_input (ACE_HANDLE)
67 // always read non-blocking however much there is
68 ACE_Time_Value to = ACE_Time_Value::zero;
69 return this->handle_input_i (MAX_INPUT_SIZE, &to);
72 template <ACE_PEER_STREAM_1, ACE_SYNCH_DECL>
73 int StreamHandler<ACE_PEER_STREAM, ACE_SYNCH_USE>::handle_input_i (size_t rdlen, ACE_Time_Value* timeout)
75 INET_TRACE ("ACE_IOS_StreamHandler::handle_input_i");
77 char buffer[MAX_INPUT_SIZE];
78 ssize_t recv_cnt;
79 size_t bytes_in = 0;
81 // blocking (with or without timeout) or non-blocking?
82 bool no_wait = timeout && (*timeout == ACE_Time_Value::zero);
84 recv_cnt = this->peer ().recv_n (buffer,
85 rdlen <= sizeof(buffer) ? rdlen : sizeof(buffer),
86 timeout,
87 &bytes_in);
89 if (bytes_in > 0)
91 INET_HEX_DUMP (11, (LM_DEBUG, buffer, bytes_in, DLINFO
92 ACE_TEXT ("ACE_IOS_StreamHandler::handle_input_i <--")));
94 ACE_Message_Block *mb = 0;
95 ACE_NEW_RETURN (mb, ACE_Message_Block (bytes_in), -1);
96 mb->copy (buffer, bytes_in);
97 ACE_Time_Value nowait (ACE_OS::gettimeofday ());
98 if (this->putq (mb, &nowait) == -1)
100 INET_ERROR (1, (LM_ERROR, DLINFO
101 ACE_TEXT ("ACE_IOS_StreamHandler - discarding input data, "),
102 ACE_TEXT ("enqueue failed (%d)\n"),
103 ACE_OS::last_error ()));
104 mb->release ();
105 this->connected_ = false;
106 return -1;
110 if (recv_cnt == 0 || (recv_cnt < 0 && !no_wait))
112 if (recv_cnt < 0)
114 INET_ERROR (1, (LM_ERROR, DLINFO
115 ACE_TEXT ("ACE_IOS_StreamHandler - receive failed (%d)\n"),
116 ACE_OS::last_error ()));
118 this->connected_ = false;
119 return this->using_reactor () ? -1 : 0;
121 return 0;
124 template <ACE_PEER_STREAM_1, ACE_SYNCH_DECL>
125 int StreamHandler<ACE_PEER_STREAM, ACE_SYNCH_USE>::handle_output (ACE_HANDLE)
127 if (this->use_timeout ())
129 ACE_Time_Value to = this->sync_opt_.timeout ();
130 return this->handle_output_i (&to);
132 else
133 return this->handle_output_i (0);
136 template <ACE_PEER_STREAM_1, ACE_SYNCH_DECL>
137 int StreamHandler<ACE_PEER_STREAM, ACE_SYNCH_USE>::handle_output_i (ACE_Time_Value* timeout)
139 INET_TRACE ("ACE_IOS_StreamHandler::handle_output_i");
141 ACE_Message_Block *mb = 0;
142 ACE_Time_Value nowait (ACE_OS::gettimeofday ());
143 size_t bytes_out = 0;
144 if (-1 != this->getq (mb, &nowait))
146 ssize_t send_cnt =
147 this->peer ().send_n (mb->rd_ptr (), mb->length (), timeout, &bytes_out);
148 if (bytes_out > 0)
150 INET_HEX_DUMP (11, (LM_DEBUG, mb->rd_ptr (), bytes_out, DLINFO
151 ACE_TEXT ("ACE_IOS_StreamHandler::handle_output_i -->")));
153 mb->rd_ptr (static_cast<size_t> (bytes_out));
154 if (mb->length () > 0)
155 this->ungetq (mb);
156 else
157 mb->release ();
159 if (send_cnt <= 0)
161 INET_ERROR (1, (LM_ERROR, DLINFO
162 ACE_TEXT ("%p; ACE_IOS_StreamHandler - "),
163 ACE_TEXT ("send failed\n")));
164 this->connected_ = false;
165 return this->using_reactor () ? -1 : 0;
168 return (this->msg_queue ()->is_empty ()) ? -1 : 0;
// Reads data for the stream into buf.
//
// buf        destination buffer
// length     maximum number of elements to read
// char_size  size in bytes of one element
//
// The loops below stop as soon as any data has been copied, so fewer than
// length elements may be delivered. Returns the number of bytes copied
// divided by char_size, or -1 on failure. When the configured timeout has
// run down to zero without any data, receive_timeout_ is set and -1 is
// returned.
171 template <ACE_PEER_STREAM_1, ACE_SYNCH_DECL>
172 int StreamHandler<ACE_PEER_STREAM, ACE_SYNCH_USE>::read_from_stream (
173 void * buf,
174 size_t length,
175 size_t char_size)
177 INET_TRACE ("ACE_IOS_StreamHandler::read_from_stream");
179 size_t recv_char_count = 0;
180 char* wptr = (char*)buf;
// work in bytes from here on; process_input () decrements char_length
181 size_t char_length = length * char_size;
// local copy of the configured timeout; passed by pointer to the calls below
182 ACE_Time_Value max_wait_time = this->sync_opt_.timeout ();
183 int result = 0;
184 if (this->using_reactor ())
// reactive mode
186 ACE_thread_t tid;
187 this->reactor ()->owner (&tid);
// only the thread owning the reactor runs the event loop below
188 bool reactor_thread =
189 ACE_OS::thr_equal (ACE_Thread::self (), tid) ? true : false;
191 if (this->connected_)
// ask the reactor for input events on this handler
193 if (this->reactor ()->register_handler(this,
194 ACE_Event_Handler::READ_MASK) != 0)
196 return -1;
200 // run the event loop for the maximum allowed time to get the
201 // message data in
202 while ((this->connected_ || this->char_in_queue (char_size)) && char_length > 0)
204 result = 0;
205 if (reactor_thread && !this->char_in_queue (char_size))
207 // Run the event loop.
208 result = this->reactor ()->handle_events (this->use_timeout () ?
209 &max_wait_time : 0);
// copy whatever is queued into the caller's buffer
212 if (result != -1)
214 result = this->process_input (&wptr[recv_char_count],
215 char_length,
216 char_size,
217 this->use_timeout () ?
218 &max_wait_time : 0);
// event loop or queue failure: deregister and give up
221 if (result == -1)
223 this->reactor ()->remove_handler (this,
224 ACE_Event_Handler::READ_MASK);
225 return -1;
228 recv_char_count += result;
// any data at all ends the read
230 if (recv_char_count > 0)
232 break;
// no data and no time left: report a receive timeout
235 if (this->use_timeout () &&
236 max_wait_time == ACE_Time_Value::zero)
238 this->reactor ()->remove_handler (this,
239 ACE_Event_Handler::READ_MASK);
240 this->receive_timeout_ = true;
241 return -1;
245 this->reactor ()->remove_handler (this,
246 ACE_Event_Handler::READ_MASK);
248 else
250 // non-reactive
251 // the first read we will try to read as much as possible
252 // non-blocking
253 // if that does not result in any data the next read will be
254 // blocking for 1 char_size data
255 size_t rdlen = MAX_INPUT_SIZE;
256 ACE_Time_Value timeout = ACE_Time_Value::zero;
257 ACE_Time_Value* to = &timeout;
258 while ((this->connected_ || this->char_in_queue (char_size)) && char_length > 0)
260 if (!this->char_in_queue (char_size))
262 // nothing in queue, so see if there is anything newly arrived
263 result = this->handle_input_i (rdlen, to);
266 if (result == -1)
267 return result;
// copy whatever is queued into the caller's buffer
269 result = this->process_input (&wptr[recv_char_count],
270 char_length,
271 char_size,
272 this->use_timeout () ?
273 &max_wait_time : 0);
275 if (result == -1)
276 return result;
278 recv_char_count += result;
280 if (recv_char_count > 0)
282 // if we got any char_size data (either newly read
283 // or remainder from queue) we quit
284 break;
// no data and no time left: report a receive timeout
287 if (this->use_timeout () &&
288 max_wait_time == ACE_Time_Value::zero)
290 this->receive_timeout_ = true;
291 return -1;
294 if (this->connected_ && char_length >0)
296 // nothing has been read the first time round
297 // now start blocking read 1 char_size data at a time
298 rdlen = char_size;
299 to = this->use_timeout () ? &max_wait_time : 0;
// convert the byte count back into a count of char_size elements
304 return ACE_Utils::truncate_cast<int> (recv_char_count / char_size);
307 // This method makes sure to only ever copy full char_size elements
308 template <ACE_PEER_STREAM_1, ACE_SYNCH_DECL>
309 int StreamHandler<ACE_PEER_STREAM, ACE_SYNCH_USE>::process_input (
310 char* buf,
311 size_t& char_length,
312 size_t char_size,
313 ACE_Time_Value* timeout)
315 INET_TRACE ("ACE_IOS_StreamHandler::process_input");
317 ACE_Time_Value wait (ACE_OS::gettimeofday ());
318 // keep track of how much time we use here
319 ACE_Countdown_Time timeout_countdown (timeout);
320 // if timeout specified add it to the abs waittime
321 // otherwise it's a 'nowait'
322 if (timeout)
324 wait += *timeout;
325 timeout_countdown.start ();
327 ACE_Message_Block *mb_remain = 0;
328 size_t recv_char_count = 0;
329 while (!this->msg_queue ()->is_empty () && char_length > 0)
331 ACE_Message_Block *mb = 0;
332 if (this->getq (mb, &wait) == -1)
334 if (ACE_OS::last_error () == EWOULDBLOCK)
335 break; // timeout; queue still empty
336 else
337 return -1; // message queue shut down
340 size_t copy_len = 0;
342 if (mb_remain)
344 if ((mb_remain->length () + mb->length ()) < char_size)
346 ACE_Message_Block *mb_new = 0;
347 ACE_NEW_NORETURN (mb,
348 ACE_Message_Block (mb_remain->length () + mb->length ()));
349 if (mb_new == 0)
351 mb->release ();
352 mb_remain->release ();
353 return -1; // out of memory error
355 mb_new->copy (mb_remain->rd_ptr (), mb_remain->length ());
356 mb_remain->release ();
357 mb_new->copy (mb->rd_ptr (), mb->length ());
358 mb->release ();
359 mb_remain = mb_new;
360 continue; // check for next msg block
363 copy_len = (mb_remain->length () > char_length) ?
364 char_length :
365 mb_remain->length ();
366 ACE_OS::memmove (&buf[recv_char_count],
367 mb_remain->rd_ptr (),
368 copy_len);
369 char_length -= copy_len;
370 recv_char_count += copy_len;
371 mb_remain->rd_ptr (copy_len);
372 if (mb_remain->length () > 0)
374 continue; // buffer is full
377 // cleanup empty block
378 mb_remain->release ();
379 mb_remain = 0;
382 // normalize to total nr of char_size elements available in mb [+ mb_remain]
383 size_t total_char_len = ((mb->length () + copy_len)/ char_size) * char_size;
384 // what was the max we could copy?
385 size_t max_copy_len = (total_char_len > char_length) ?
386 char_length :
387 total_char_len;
388 // subtract what we possibly already copied from mb_remain
389 copy_len = max_copy_len - copy_len;
391 ACE_OS::memmove (&buf[recv_char_count],
392 mb->rd_ptr (),
393 copy_len);
394 recv_char_count += copy_len;
395 char_length -= copy_len;
396 mb->rd_ptr (copy_len);
397 if (mb->length () > 0)
399 mb_remain = mb;
401 else
402 mb->release ();
405 if (mb_remain)
407 this->ungetq (mb_remain);
410 if (timeout)
412 // stop countdown; update timeout value
413 timeout_countdown.stop ();
416 return ACE_Utils::truncate_cast<int> (recv_char_count);
419 template <ACE_PEER_STREAM_1, ACE_SYNCH_DECL>
420 bool StreamHandler<ACE_PEER_STREAM, ACE_SYNCH_USE>::use_timeout () const
422 return this->sync_opt_[ACE_Synch_Options::USE_TIMEOUT];
425 template <ACE_PEER_STREAM_1, ACE_SYNCH_DECL>
426 bool StreamHandler<ACE_PEER_STREAM, ACE_SYNCH_USE>::char_in_queue (size_t char_size)
428 return this->msg_queue ()->message_bytes () >= char_size;
431 template <ACE_PEER_STREAM_1, ACE_SYNCH_DECL>
432 int StreamHandler<ACE_PEER_STREAM, ACE_SYNCH_USE>::write_to_stream (const void * buf, size_t length, size_t char_size)
434 INET_TRACE ("ACE_IOS_StreamHandler::write_to_stream");
436 // check if we're allowed to control the reactor if reactive
437 bool use_reactor = this->using_reactor ();
438 if (use_reactor)
440 ACE_thread_t tid;
441 this->reactor ()->owner (&tid);
442 use_reactor =
443 ACE_OS::thr_equal (ACE_Thread::self (), tid) ? true : false;
446 // set notification strategy if reactive
447 NotificationStrategyGuard ns_guard__(*this,
448 use_reactor ?
449 &this->notification_strategy_ : 0);
451 size_t datasz = length * char_size;
452 ACE_Message_Block *mb = 0;
453 ACE_NEW_RETURN (mb, ACE_Message_Block (datasz), -1);
454 mb->copy ((const char*)buf, datasz);
455 ACE_Time_Value nowait (ACE_OS::gettimeofday ());
456 if (this->putq (mb, &nowait) == -1)
458 INET_ERROR (1, (LM_ERROR, DLINFO
459 ACE_TEXT ("(%d) ACE_IOS_StreamHandler - discarding output data, "),
460 ACE_TEXT ("enqueue failed\n"),
461 ACE_OS::last_error ()));
462 mb->release ();
463 return 0;
466 ACE_Time_Value max_wait_time = this->sync_opt_.timeout ();
467 int result = 0;
469 if (use_reactor)
471 if (this->reactor ()->register_handler(this,
472 ACE_Event_Handler::WRITE_MASK) != 0)
474 return -1;
477 // run the event loop for the maximum allowed time to get the
478 // message data out
479 while (this->connected_)
481 // Run the event loop.
482 result = this->reactor ()->handle_events (this->use_timeout () ?
483 &max_wait_time : 0);
485 if (result == -1)
487 INET_ERROR (1, (LM_ERROR, DLINFO
488 ACE_TEXT ("(%d) ACE_IOS_StreamHandler::write_to_stream - ")
489 ACE_TEXT ("handle_events failed\n"),
490 ACE_OS::last_error ()));
493 // If we got our message out, no need to run the event loop any
494 // further.
495 if (this->msg_queue ()->is_empty ())
497 break;
500 // Did we timeout? If so, stop running the loop.
501 if (result == 0
502 && this->use_timeout ()
503 && max_wait_time == ACE_Time_Value::zero)
505 this->reactor ()->remove_handler (this, ACE_Event_Handler::WRITE_MASK);
506 this->send_timeout_ = true;
507 return ACE_Utils::truncate_cast<int>
508 (length - (this->msg_queue ()->message_bytes () / char_size));
511 // Other errors? If so, stop running the loop.
512 if (result == -1)
514 this->reactor ()->remove_handler (this, ACE_Event_Handler::WRITE_MASK);
515 return -1;
518 // Otherwise, keep going...
521 else
523 while (this->connected_)
525 result = this->handle_output_i (this->use_timeout () ?
526 &max_wait_time : 0);
528 // If we got our message out, no need to run the event loop any
529 // further.
530 if (this->msg_queue ()->is_empty ())
532 break;
535 // Did we timeout? If so, stop running the loop.
536 if (result == 0
537 && this->use_timeout ()
538 && max_wait_time == ACE_Time_Value::zero)
540 this->send_timeout_ = true;
541 return ACE_Utils::truncate_cast<int>
542 (length - (this->msg_queue ()->message_bytes () / char_size));
545 // Otherwise, keep going...
549 if (this->connected_)
550 return ACE_Utils::truncate_cast<int> (length); // all sent
551 else
552 return ACE_Utils::truncate_cast<int>
553 (length - (this->msg_queue ()->message_bytes () / char_size));
556 template <ACE_PEER_STREAM_1, ACE_SYNCH_DECL>
557 bool StreamHandler<ACE_PEER_STREAM, ACE_SYNCH_USE>::is_connected () const
559 return this->connected_;
562 template <ACE_PEER_STREAM_1, ACE_SYNCH_DECL>
563 bool StreamHandler<ACE_PEER_STREAM, ACE_SYNCH_USE>::using_reactor () const
565 return this->sync_opt_[ACE_Synch_Options::USE_REACTOR];
571 ACE_END_VERSIONED_NAMESPACE_DECL
573 #endif /* ACE_IOS_STREAM_HANDLER_CPP */