1 #ifndef ACE_IOS_STREAM_HANDLER_CPP
2 #define ACE_IOS_STREAM_HANDLER_CPP
4 #include "ace/INet/INet_Log.h"
5 #include "ace/INet/StreamHandler.h"
6 #include "ace/OS_NS_Thread.h"
7 #include "ace/OS_NS_errno.h"
8 #include "ace/Countdown_Time.h"
9 #include "ace/Truncate.h"
11 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
17 template <ACE_PEER_STREAM_1
, ACE_SYNCH_DECL
>
18 StreamHandler
<ACE_PEER_STREAM
, ACE_SYNCH_USE
>::StreamHandler (
19 const ACE_Synch_Options
&synch_options
,
20 ACE_Thread_Manager
*thr_mgr
,
23 : ACE_Svc_Handler
<ACE_PEER_STREAM
, ACE_SYNCH_USE
> (thr_mgr
, mq
, reactor
),
25 send_timeout_ (false),
26 receive_timeout_ (false),
27 notification_strategy_ (reactor
,
29 ACE_Event_Handler::WRITE_MASK
)
31 INET_TRACE ("ACE_IOS_StreamHandler - ctor");
33 unsigned long opt
= synch_options
[ACE_Synch_Options::USE_REACTOR
] ?
34 ACE_Synch_Options::USE_REACTOR
: 0;
35 if (synch_options
[ACE_Synch_Options::USE_TIMEOUT
])
36 opt
|= ACE_Synch_Options::USE_TIMEOUT
;
37 this->sync_opt_
.set (opt
,
38 synch_options
.timeout (),
39 synch_options
.arg ());
42 template <ACE_PEER_STREAM_1
, ACE_SYNCH_DECL
>
43 StreamHandler
<ACE_PEER_STREAM
, ACE_SYNCH_USE
>::~StreamHandler ()
45 INET_TRACE ("ACE_IOS_StreamHandler - dtor");
47 this->connected_
= false;
50 template <ACE_PEER_STREAM_1
, ACE_SYNCH_DECL
>
51 int StreamHandler
<ACE_PEER_STREAM
, ACE_SYNCH_USE
>::open (void * /*p*/)
53 this->connected_
= true;
57 template <ACE_PEER_STREAM_1
, ACE_SYNCH_DECL
>
58 int StreamHandler
<ACE_PEER_STREAM
, ACE_SYNCH_USE
>::close (u_long flags
)
60 this->connected_
= false;
61 return base_type::close (flags
);
64 template <ACE_PEER_STREAM_1
, ACE_SYNCH_DECL
>
65 int StreamHandler
<ACE_PEER_STREAM
, ACE_SYNCH_USE
>::handle_input (ACE_HANDLE
)
67 // always read non-blocking however much there is
68 ACE_Time_Value to
= ACE_Time_Value::zero
;
69 return this->handle_input_i (MAX_INPUT_SIZE
, &to
);
72 template <ACE_PEER_STREAM_1
, ACE_SYNCH_DECL
>
73 int StreamHandler
<ACE_PEER_STREAM
, ACE_SYNCH_USE
>::handle_input_i (size_t rdlen
, ACE_Time_Value
* timeout
)
75 INET_TRACE ("ACE_IOS_StreamHandler::handle_input_i");
77 char buffer
[MAX_INPUT_SIZE
];
81 // blocking (with or without timeout) or non-blocking?
82 bool no_wait
= timeout
&& (*timeout
== ACE_Time_Value::zero
);
84 recv_cnt
= this->peer ().recv_n (buffer
,
85 rdlen
<= sizeof(buffer
) ? rdlen
: sizeof(buffer
),
91 INET_HEX_DUMP (11, (LM_DEBUG
, buffer
, bytes_in
, DLINFO
92 ACE_TEXT ("ACE_IOS_StreamHandler::handle_input_i <--")));
94 ACE_Message_Block
*mb
= 0;
95 ACE_NEW_RETURN (mb
, ACE_Message_Block (bytes_in
), -1);
96 mb
->copy (buffer
, bytes_in
);
97 ACE_Time_Value
nowait (ACE_OS::gettimeofday ());
98 if (this->putq (mb
, &nowait
) == -1)
100 INET_ERROR (1, (LM_ERROR
, DLINFO
101 ACE_TEXT ("ACE_IOS_StreamHandler - discarding input data, "),
102 ACE_TEXT ("enqueue failed (%d)\n"),
103 ACE_OS::last_error ()));
105 this->connected_
= false;
110 if (recv_cnt
== 0 || (recv_cnt
< 0 && !no_wait
))
114 INET_ERROR (1, (LM_ERROR
, DLINFO
115 ACE_TEXT ("ACE_IOS_StreamHandler - receive failed (%d)\n"),
116 ACE_OS::last_error ()));
118 this->connected_
= false;
119 return this->using_reactor () ? -1 : 0;
124 template <ACE_PEER_STREAM_1
, ACE_SYNCH_DECL
>
125 int StreamHandler
<ACE_PEER_STREAM
, ACE_SYNCH_USE
>::handle_output (ACE_HANDLE
)
127 if (this->use_timeout ())
129 ACE_Time_Value to
= this->sync_opt_
.timeout ();
130 return this->handle_output_i (&to
);
133 return this->handle_output_i (0);
136 template <ACE_PEER_STREAM_1
, ACE_SYNCH_DECL
>
137 int StreamHandler
<ACE_PEER_STREAM
, ACE_SYNCH_USE
>::handle_output_i (ACE_Time_Value
* timeout
)
139 INET_TRACE ("ACE_IOS_StreamHandler::handle_output_i");
141 ACE_Message_Block
*mb
= 0;
142 ACE_Time_Value
nowait (ACE_OS::gettimeofday ());
143 size_t bytes_out
= 0;
144 if (-1 != this->getq (mb
, &nowait
))
147 this->peer ().send_n (mb
->rd_ptr (), mb
->length (), timeout
, &bytes_out
);
150 INET_HEX_DUMP (11, (LM_DEBUG
, mb
->rd_ptr (), bytes_out
, DLINFO
151 ACE_TEXT ("ACE_IOS_StreamHandler::handle_output_i -->")));
153 mb
->rd_ptr (static_cast<size_t> (bytes_out
));
154 if (mb
->length () > 0)
161 INET_ERROR (1, (LM_ERROR
, DLINFO
162 ACE_TEXT ("%p; ACE_IOS_StreamHandler - "),
163 ACE_TEXT ("send failed\n")));
164 this->connected_
= false;
165 return this->using_reactor () ? -1 : 0;
168 return (this->msg_queue ()->is_empty ()) ? -1 : 0;
171 template <ACE_PEER_STREAM_1
, ACE_SYNCH_DECL
>
172 int StreamHandler
<ACE_PEER_STREAM
, ACE_SYNCH_USE
>::read_from_stream (
177 INET_TRACE ("ACE_IOS_StreamHandler::read_from_stream");
179 size_t recv_char_count
= 0;
180 char* wptr
= (char*)buf
;
181 size_t char_length
= length
* char_size
;
182 ACE_Time_Value max_wait_time
= this->sync_opt_
.timeout ();
184 if (this->using_reactor ())
187 this->reactor ()->owner (&tid
);
188 bool reactor_thread
=
189 ACE_OS::thr_equal (ACE_Thread::self (), tid
) ? true : false;
191 if (this->connected_
)
193 if (this->reactor ()->register_handler(this,
194 ACE_Event_Handler::READ_MASK
) != 0)
200 // run the event loop for the maximum allowed time to get the
202 while ((this->connected_
|| this->char_in_queue (char_size
)) && char_length
> 0)
205 if (reactor_thread
&& !this->char_in_queue (char_size
))
207 // Run the event loop.
208 result
= this->reactor ()->handle_events (this->use_timeout () ?
214 result
= this->process_input (&wptr
[recv_char_count
],
217 this->use_timeout () ?
223 this->reactor ()->remove_handler (this,
224 ACE_Event_Handler::READ_MASK
);
228 recv_char_count
+= result
;
230 if (recv_char_count
> 0)
235 if (this->use_timeout () &&
236 max_wait_time
== ACE_Time_Value::zero
)
238 this->reactor ()->remove_handler (this,
239 ACE_Event_Handler::READ_MASK
);
240 this->receive_timeout_
= true;
245 this->reactor ()->remove_handler (this,
246 ACE_Event_Handler::READ_MASK
);
251 // the first read we will try to read as much as possible
253 // if that does not result in any data the next read will be
254 // blocking for 1 char_size data
255 size_t rdlen
= MAX_INPUT_SIZE
;
256 ACE_Time_Value timeout
= ACE_Time_Value::zero
;
257 ACE_Time_Value
* to
= &timeout
;
258 while ((this->connected_
|| this->char_in_queue (char_size
)) && char_length
> 0)
260 if (!this->char_in_queue (char_size
))
262 // nothing in queue, so see if there is anything newly arrived
263 result
= this->handle_input_i (rdlen
, to
);
269 result
= this->process_input (&wptr
[recv_char_count
],
272 this->use_timeout () ?
278 recv_char_count
+= result
;
280 if (recv_char_count
> 0)
282 // if we got any char_size data (either newly read
283 // or remainder from queue) we quit
287 if (this->use_timeout () &&
288 max_wait_time
== ACE_Time_Value::zero
)
290 this->receive_timeout_
= true;
294 if (this->connected_
&& char_length
>0)
296 // nothing has been read the first time round
297 // now start blocking read 1 char_size data at a time
299 to
= this->use_timeout () ? &max_wait_time
: 0;
304 return ACE_Utils::truncate_cast
<int> (recv_char_count
/ char_size
);
307 // This method makes sure to only ever copy full char_size elements
308 template <ACE_PEER_STREAM_1
, ACE_SYNCH_DECL
>
309 int StreamHandler
<ACE_PEER_STREAM
, ACE_SYNCH_USE
>::process_input (
313 ACE_Time_Value
* timeout
)
315 INET_TRACE ("ACE_IOS_StreamHandler::process_input");
317 ACE_Time_Value
wait (ACE_OS::gettimeofday ());
318 // keep track of how much time we use here
319 ACE_Countdown_Time
timeout_countdown (timeout
);
320 // if timeout specified add it to the abs waittime
321 // otherwise it's a 'nowait'
325 timeout_countdown
.start ();
327 ACE_Message_Block
*mb_remain
= 0;
328 size_t recv_char_count
= 0;
329 while (!this->msg_queue ()->is_empty () && char_length
> 0)
331 ACE_Message_Block
*mb
= 0;
332 if (this->getq (mb
, &wait
) == -1)
334 if (ACE_OS::last_error () == EWOULDBLOCK
)
335 break; // timeout; queue still empty
337 return -1; // message queue shut down
344 if ((mb_remain
->length () + mb
->length ()) < char_size
)
346 ACE_Message_Block
*mb_new
= 0;
347 ACE_NEW_NORETURN (mb
,
348 ACE_Message_Block (mb_remain
->length () + mb
->length ()));
352 mb_remain
->release ();
353 return -1; // out of memory error
355 mb_new
->copy (mb_remain
->rd_ptr (), mb_remain
->length ());
356 mb_remain
->release ();
357 mb_new
->copy (mb
->rd_ptr (), mb
->length ());
360 continue; // check for next msg block
363 copy_len
= (mb_remain
->length () > char_length
) ?
365 mb_remain
->length ();
366 ACE_OS::memmove (&buf
[recv_char_count
],
367 mb_remain
->rd_ptr (),
369 char_length
-= copy_len
;
370 recv_char_count
+= copy_len
;
371 mb_remain
->rd_ptr (copy_len
);
372 if (mb_remain
->length () > 0)
374 continue; // buffer is full
377 // cleanup empty block
378 mb_remain
->release ();
382 // normalize to total nr of char_size elements available in mb [+ mb_remain]
383 size_t total_char_len
= ((mb
->length () + copy_len
)/ char_size
) * char_size
;
384 // what was the max we could copy?
385 size_t max_copy_len
= (total_char_len
> char_length
) ?
388 // subtract what we possibly already copied from mb_remain
389 copy_len
= max_copy_len
- copy_len
;
391 ACE_OS::memmove (&buf
[recv_char_count
],
394 recv_char_count
+= copy_len
;
395 char_length
-= copy_len
;
396 mb
->rd_ptr (copy_len
);
397 if (mb
->length () > 0)
407 this->ungetq (mb_remain
);
412 // stop countdown; update timeout value
413 timeout_countdown
.stop ();
416 return ACE_Utils::truncate_cast
<int> (recv_char_count
);
419 template <ACE_PEER_STREAM_1
, ACE_SYNCH_DECL
>
420 bool StreamHandler
<ACE_PEER_STREAM
, ACE_SYNCH_USE
>::use_timeout () const
422 return this->sync_opt_
[ACE_Synch_Options::USE_TIMEOUT
];
425 template <ACE_PEER_STREAM_1
, ACE_SYNCH_DECL
>
426 bool StreamHandler
<ACE_PEER_STREAM
, ACE_SYNCH_USE
>::char_in_queue (size_t char_size
)
428 return this->msg_queue ()->message_bytes () >= char_size
;
431 template <ACE_PEER_STREAM_1
, ACE_SYNCH_DECL
>
432 int StreamHandler
<ACE_PEER_STREAM
, ACE_SYNCH_USE
>::write_to_stream (const void * buf
, size_t length
, size_t char_size
)
434 INET_TRACE ("ACE_IOS_StreamHandler::write_to_stream");
436 // check if we're allowed to control the reactor if reactive
437 bool use_reactor
= this->using_reactor ();
441 this->reactor ()->owner (&tid
);
443 ACE_OS::thr_equal (ACE_Thread::self (), tid
) ? true : false;
446 // set notification strategy if reactive
447 NotificationStrategyGuard
ns_guard__(*this,
449 &this->notification_strategy_
: 0);
451 size_t datasz
= length
* char_size
;
452 ACE_Message_Block
*mb
= 0;
453 ACE_NEW_RETURN (mb
, ACE_Message_Block (datasz
), -1);
454 mb
->copy ((const char*)buf
, datasz
);
455 ACE_Time_Value
nowait (ACE_OS::gettimeofday ());
456 if (this->putq (mb
, &nowait
) == -1)
458 INET_ERROR (1, (LM_ERROR
, DLINFO
459 ACE_TEXT ("(%d) ACE_IOS_StreamHandler - discarding output data, "),
460 ACE_TEXT ("enqueue failed\n"),
461 ACE_OS::last_error ()));
466 ACE_Time_Value max_wait_time
= this->sync_opt_
.timeout ();
471 if (this->reactor ()->register_handler(this,
472 ACE_Event_Handler::WRITE_MASK
) != 0)
477 // run the event loop for the maximum allowed time to get the
479 while (this->connected_
)
481 // Run the event loop.
482 result
= this->reactor ()->handle_events (this->use_timeout () ?
487 INET_ERROR (1, (LM_ERROR
, DLINFO
488 ACE_TEXT ("(%d) ACE_IOS_StreamHandler::write_to_stream - ")
489 ACE_TEXT ("handle_events failed\n"),
490 ACE_OS::last_error ()));
493 // If we got our message out, no need to run the event loop any
495 if (this->msg_queue ()->is_empty ())
500 // Did we timeout? If so, stop running the loop.
502 && this->use_timeout ()
503 && max_wait_time
== ACE_Time_Value::zero
)
505 this->reactor ()->remove_handler (this, ACE_Event_Handler::WRITE_MASK
);
506 this->send_timeout_
= true;
507 return ACE_Utils::truncate_cast
<int>
508 (length
- (this->msg_queue ()->message_bytes () / char_size
));
511 // Other errors? If so, stop running the loop.
514 this->reactor ()->remove_handler (this, ACE_Event_Handler::WRITE_MASK
);
518 // Otherwise, keep going...
523 while (this->connected_
)
525 result
= this->handle_output_i (this->use_timeout () ?
528 // If we got our message out, no need to run the event loop any
530 if (this->msg_queue ()->is_empty ())
535 // Did we timeout? If so, stop running the loop.
537 && this->use_timeout ()
538 && max_wait_time
== ACE_Time_Value::zero
)
540 this->send_timeout_
= true;
541 return ACE_Utils::truncate_cast
<int>
542 (length
- (this->msg_queue ()->message_bytes () / char_size
));
545 // Otherwise, keep going...
549 if (this->connected_
)
550 return ACE_Utils::truncate_cast
<int> (length
); // all sent
552 return ACE_Utils::truncate_cast
<int>
553 (length
- (this->msg_queue ()->message_bytes () / char_size
));
556 template <ACE_PEER_STREAM_1
, ACE_SYNCH_DECL
>
557 bool StreamHandler
<ACE_PEER_STREAM
, ACE_SYNCH_USE
>::is_connected () const
559 return this->connected_
;
562 template <ACE_PEER_STREAM_1
, ACE_SYNCH_DECL
>
563 bool StreamHandler
<ACE_PEER_STREAM
, ACE_SYNCH_USE
>::using_reactor () const
565 return this->sync_opt_
[ACE_Synch_Options::USE_REACTOR
];
571 ACE_END_VERSIONED_NAMESPACE_DECL
573 #endif /* ACE_IOS_STREAM_HANDLER_CPP */