2 #include "ace/OS_NS_unistd.h"
3 #include "ace/Reactor.h"
9 #include "jaws3/Jaws_IO.h"
10 #include "jaws3/Reactive_IO.h"
11 #include "jaws3/Reactive_IO_Helpers.h"
12 #include "jaws3/Event_Completer.h"
15 JAWS_Reactive_IO::send ( ACE_HANDLE handle
16 , ACE_Message_Block
*mb
17 , JAWS_Event_Completer
*completer
18 , const ACE_Time_Value
&tv
22 if (mb
->length () == 0)
24 JAWS_Event_Result
io_result ( 0
25 , JAWS_Event_Result::JE_OK
26 , JAWS_Event_Result::JE_SEND_OK
29 completer
->output_complete (io_result
, act
);
34 JAWS_IO_Reactive_Send
*rs
;
35 rs
= JAWS_IO_Reactive_Send::make (handle
, mb
, completer
, tv
, act
);
39 JAWS_Event_Result
io_result ( 0
40 , JAWS_Event_Result::JE_ERROR
41 , JAWS_Event_Result::JE_SEND_FAIL
44 completer
->output_complete (io_result
, act
);
53 JAWS_Reactive_IO::send ( ACE_HANDLE handle
54 , ACE_Message_Block
*mb
55 , JAWS_Event_Completer
*completer
59 this->send (handle
, mb
, completer
, ACE_Time_Value::zero
, act
);
64 JAWS_Reactive_IO::recv ( ACE_HANDLE handle
65 , ACE_Message_Block
*mb
66 , JAWS_Event_Completer
*completer
67 , const ACE_Time_Value
&tv
71 JAWS_IO_Reactive_Recv
*rr
;
72 rr
= JAWS_IO_Reactive_Recv::make (handle
, mb
, completer
, tv
, act
);
76 JAWS_Event_Result
io_result ( 0
77 , JAWS_Event_Result::JE_ERROR
78 , JAWS_Event_Result::JE_RECV_FAIL
81 completer
->output_complete (io_result
, act
);
91 JAWS_Reactive_IO::recv ( ACE_HANDLE handle
92 , ACE_Message_Block
*mb
93 , JAWS_Event_Completer
*completer
97 this->recv (handle
, mb
, completer
, ACE_Time_Value::zero
, act
);
102 JAWS_Reactive_IO::transmit ( ACE_HANDLE handle
104 , JAWS_Event_Completer
*completer
105 , const ACE_Time_Value
&tv
107 , ACE_Message_Block
*header
108 , ACE_Message_Block
*trailer
111 JAWS_IO_Reactive_Transmit
*rt
;
112 rt
= JAWS_IO_Reactive_Transmit::make ( handle
123 JAWS_Event_Result
io_result ( 0
124 , JAWS_Event_Result::JE_ERROR
125 , JAWS_Event_Result::JE_TRANSMIT_FAIL
128 completer
->output_complete (io_result
, act
);
138 JAWS_Reactive_IO::transmit ( ACE_HANDLE handle
140 , JAWS_Event_Completer
*completer
142 , ACE_Message_Block
*header
143 , ACE_Message_Block
*trailer
146 this->transmit ( handle
149 , ACE_Time_Value::zero
158 JAWS_IO_Reactive_Handler::open ()
160 int result
= ACE_Reactor::instance ()->notify (this);
163 this->close (result
);
167 JAWS_IO_Reactive_Handler::close (int result
)
171 if (ACE_BIT_ENABLED (this->mask_
, ACE_Event_Handler::WRITE_MASK
))
173 JAWS_Event_Result
io_result ( 0
174 , JAWS_Event_Result::JE_ERROR
175 , JAWS_Event_Result::JE_SEND_FAIL
177 this->io_result_
= io_result
;
179 else if (ACE_BIT_ENABLED (this->mask_
, ACE_Event_Handler::READ_MASK
))
181 JAWS_Event_Result
io_result ( 0
182 , JAWS_Event_Result::JE_ERROR
183 , JAWS_Event_Result::JE_RECV_FAIL
185 this->io_result_
= io_result
;
188 this->handle_close (this->handle_
, this->mask_
);
193 JAWS_IO_Reactive_Handler::handle_timeout (const ACE_Time_Value
&, const void *)
195 if (this->was_active_
)
197 this->was_active_
= 0;
200 ACE_Reactor::instance ()->schedule_timer (this, 0, this->tv_
);
205 ACE_Reactor::instance ()
206 ->remove_handler ( this
207 , ACE_Event_Handler::RWE_MASK
|ACE_Event_Handler::DONT_CALL
210 this->timer_id_
= -1;
212 if (ACE_BIT_ENABLED (this->mask_
, ACE_Event_Handler::WRITE_MASK
))
214 JAWS_Event_Result
io_result ( 0
215 , JAWS_Event_Result::JE_ERROR
216 , JAWS_Event_Result::JE_SEND_TIMEOUT
219 this->io_result_
= io_result
;
221 else if (ACE_BIT_ENABLED (this->mask_
, ACE_Event_Handler::READ_MASK
))
223 JAWS_Event_Result
io_result ( 0
224 , JAWS_Event_Result::JE_ERROR
225 , JAWS_Event_Result::JE_RECV_TIMEOUT
228 this->io_result_
= io_result
;
235 JAWS_IO_Reactive_Handler::handle_close (ACE_HANDLE
, ACE_Reactor_Mask
)
237 if (this->completer_
)
239 if (ACE_BIT_ENABLED (this->mask_
, ACE_Event_Handler::WRITE_MASK
))
240 this->completer_
->output_complete (this->io_result_
, this->act_
);
241 else if (ACE_BIT_ENABLED (this->mask_
, ACE_Event_Handler::READ_MASK
))
242 this->completer_
->input_complete (this->io_result_
, this->act_
);
245 ACE_Reactor::instance ()
246 ->remove_handler ( this
247 , ACE_Event_Handler::RWE_MASK
|ACE_Event_Handler::DONT_CALL
255 JAWS_IO_Reactive_Handler::handle_exception (ACE_HANDLE handle
)
257 if (handle
== ACE_INVALID_HANDLE
)
259 // We are being called back from a notify call.
260 // This is our cue to register ourselves with the Reactor.
264 ACE_Reactor::instance ()
265 ->register_handler (this, this->mask_
|ACE_Event_Handler::EXCEPT_MASK
);
268 this->close (result
);
273 // back to our regularly scheduled except mask handling.
275 if (ACE_BIT_ENABLED (this->mask_
, ACE_Event_Handler::WRITE_MASK
))
277 JAWS_Event_Result
io_result ( this->bytes_
278 , JAWS_Event_Result::JE_ERROR
279 , JAWS_Event_Result::JE_SEND_SHORT
281 this->io_result_
= io_result
;
283 else if (ACE_BIT_ENABLED (this->mask_
, ACE_Event_Handler::READ_MASK
))
285 JAWS_Event_Result
io_result ( this->bytes_
286 , JAWS_Event_Result::JE_ERROR
287 , JAWS_Event_Result::JE_RECV_SHORT
289 this->io_result_
= io_result
;
297 JAWS_IO_Reactive_Send::handle_output (ACE_HANDLE handle
)
299 this->was_active_
= 1;
301 ssize_t count
= ACE::send ( handle
302 , this->mb_
->rd_ptr ()
303 , this->mb_
->length ()
306 if (count
<= 0 && this->bytes_
== 0)
308 JAWS_Event_Result
io_result ( 0
309 , JAWS_Event_Result::JE_ERROR
310 , JAWS_Event_Result::JE_SEND_FAIL
312 this->io_result_
= io_result
;
314 else if (count
<= 0 && this->bytes_
> 0)
316 JAWS_Event_Result
io_result ( this->bytes_
317 , JAWS_Event_Result::JE_ERROR
318 , JAWS_Event_Result::JE_SEND_SHORT
320 this->io_result_
= io_result
;
325 this->mb_
->rd_ptr (count
);
327 this->bytes_
+= count
;
329 JAWS_Event_Result
io_result ( this->bytes_
330 , JAWS_Event_Result::JE_OK
331 , JAWS_Event_Result::JE_SEND_OK
333 this->io_result_
= io_result
;
336 if (count
<= 0 || this->mb_
->length () == 0)
339 // Not done yet, so stay registered.
345 JAWS_IO_Reactive_Recv::handle_input (ACE_HANDLE handle
)
347 ssize_t count
= ACE::recv ( handle
348 , this->mb_
->wr_ptr ()
349 , this->mb_
->space ()
354 JAWS_Event_Result
io_result ( 0
355 , JAWS_Event_Result::JE_ERROR
356 , JAWS_Event_Result::JE_RECV_FAIL
358 this->io_result_
= io_result
;
363 this->mb_
->wr_ptr (count
);
365 JAWS_Event_Result
io_result ( count
366 , JAWS_Event_Result::JE_OK
367 , JAWS_Event_Result::JE_RECV_OK
369 this->io_result_
= io_result
;
377 JAWS_IO_Reactive_Transmit::handle_output (ACE_HANDLE handle
)
379 this->was_active_
= 1;
381 if (this->header_
&& this->header_
->length () > 0)
382 return this->handle_output_header (handle
);
386 if (this->source_
!= ACE_INVALID_HANDLE
)
387 return this->handle_output_source (handle
);
389 if (this->trailer_
&& this->trailer_
->length () > 0)
390 return this->handle_output_trailer (handle
);
394 JAWS_Event_Result
io_result ( this->bytes_
395 , JAWS_Event_Result::JE_OK
396 , JAWS_Event_Result::JE_TRANSMIT_OK
398 this->io_result_
= io_result
;
404 JAWS_IO_Reactive_Transmit::handle_output_header (ACE_HANDLE handle
)
406 return this->handle_output_mb (handle
, this->header_
);
410 JAWS_IO_Reactive_Transmit::handle_output_source (ACE_HANDLE handle
)
412 ACE_Message_Block
*mb
= this->source_buf_
;
417 // Try to read data into the mb if data is still available.
418 if (mb
->space () && this->source_
!= ACE_INVALID_HANDLE
)
421 count
= ACE_OS::read (this->source_
, mb
->wr_ptr (), mb
->space ());
425 this->source_
= ACE_INVALID_HANDLE
;
426 this->source_buf_
= 0;
428 if (this->bytes_
== 0)
430 JAWS_Event_Result
io_result ( 0
431 , JAWS_Event_Result::JE_ERROR
432 , JAWS_Event_Result::JE_TRANSMIT_FAIL
434 this->io_result_
= io_result
;
436 else if (this->bytes_
> 0)
438 JAWS_Event_Result
io_result ( this->bytes_
439 , JAWS_Event_Result::JE_ERROR
440 , JAWS_Event_Result::JE_TRANSMIT_SHORT
442 this->io_result_
= io_result
;
448 this->source_
= ACE_INVALID_HANDLE
;
455 if (mb
->length () > 0)
456 result
= this->handle_output_mb (handle
, mb
);
460 this->source_
= ACE_INVALID_HANDLE
;
461 this->source_buf_
= 0;
463 else if (mb
== 0 && this->source_
== ACE_INVALID_HANDLE
)
464 this->source_buf_
= 0;
466 this->source_buf_
->crunch ();
472 JAWS_IO_Reactive_Transmit::handle_output_trailer (ACE_HANDLE handle
)
474 int result
= this->handle_output_mb (handle
, this->trailer_
);
476 if (result
== 0 && this->trailer_
== 0)
478 JAWS_Event_Result
io_result ( this->bytes_
479 , JAWS_Event_Result::JE_ERROR
480 , JAWS_Event_Result::JE_TRANSMIT_SHORT
482 this->io_result_
= io_result
;
490 JAWS_IO_Reactive_Transmit::handle_output_mb ( ACE_HANDLE handle
491 , ACE_Message_Block
*&mb
494 ssize_t count
= ACE::send (handle
, mb
->rd_ptr (), mb
->length ());
496 if (count
<= 0 && this->bytes_
== 0)
498 JAWS_Event_Result
io_result ( 0
499 , JAWS_Event_Result::JE_ERROR
500 , JAWS_Event_Result::JE_TRANSMIT_FAIL
502 this->io_result_
= io_result
;
504 else if (count
<= 0 && this->bytes_
> 0)
506 JAWS_Event_Result
io_result ( this->bytes_
507 , JAWS_Event_Result::JE_ERROR
508 , JAWS_Event_Result::JE_TRANSMIT_SHORT
510 this->io_result_
= io_result
;
515 this->bytes_
+= count
;
521 if (mb
->length () == 0)
528 JAWS_IO_Reactive_Transmit::close (int result
)
532 JAWS_Event_Result
io_result ( 0
533 , JAWS_Event_Result::JE_ERROR
534 , JAWS_Event_Result::JE_TRANSMIT_FAIL
536 this->io_result_
= io_result
;
538 this->handle_close (this->handle_
, this->mask_
);
543 JAWS_IO_Reactive_Transmit::handle_timeout (const ACE_Time_Value
&, const void *)
545 if (this->was_active_
)
547 this->was_active_
= 0;
550 ACE_Reactor::instance ()->schedule_timer (this, 0, this->tv_
);
555 ACE_Reactor::instance ()
556 ->remove_handler ( this
557 , ACE_Event_Handler::RWE_MASK
|ACE_Event_Handler::DONT_CALL
560 this->timer_id_
= -1;
562 JAWS_Event_Result
io_result ( 0
563 , JAWS_Event_Result::JE_ERROR
564 , JAWS_Event_Result::JE_TRANSMIT_TIMEOUT
568 this->io_result_
= io_result
;
574 JAWS_IO_Reactive_Transmit::handle_exception (ACE_HANDLE handle
)
576 if (handle
== ACE_INVALID_HANDLE
)
578 // We are being called back from a notify call.
579 // This is our cue to register ourselves with the Reactor.
583 ACE_Reactor::instance ()
584 ->register_handler (this, this->mask_
|ACE_Event_Handler::EXCEPT_MASK
);
587 this->close (result
);
592 // back to our regularly scheduled except mask handling.
594 JAWS_Event_Result
io_result ( this->bytes_
595 , JAWS_Event_Result::JE_ERROR
596 , JAWS_Event_Result::JE_TRANSMIT_SHORT
598 this->io_result_
= io_result
;