3 //=============================================================================
5 * @file HTBP_Channel.cpp
7 * @author Phil Mesnier, Priyanka Gontla
9 //=============================================================================
10 #include "HTBP_Channel.h"
12 #if !defined (__ACE_INLINE__)
13 #include "HTBP_Channel.inl"
16 #include "HTBP_Session.h"
17 #include "HTBP_Filter_Factory.h"
20 #include "ace/Message_Block.h"
21 #include "ace/Reactor.h"
22 #include "ace/os_include/netinet/os_tcp.h"
23 #include "ace/OS_NS_time.h"
25 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
27 // Initialization and termination methods.
29 ACE::HTBP::Channel::Channel (ACE::HTBP::Session
*s
)
40 ACE_NEW (this->notifier_
,ACE::HTBP::Notifier(this));
41 this->filter_
= ACE::HTBP::Filter_Factory::get_filter (this->session_
!= 0);
42 this->request_count_
= static_cast<unsigned long> (ACE_OS::time());
45 /// Constructor, takes ownership of the supplied stream
46 ACE::HTBP::Channel::Channel (ACE_SOCK_Stream
&s
)
49 ace_stream_ (s
.get_handle()),
58 #if !defined (ACE_LACKS_TCP_NODELAY)
60 int result
= this->ace_stream_
.set_option (ACE_IPPROTO_TCP
,
65 ACE_DEBUG ((LM_DEBUG
, "HTBP::Channel ctor(stream), %p\n", "set_option" ));
66 #endif /* ! ACE_LACKS_TCP_NODELAY */
68 this->filter_
= ACE::HTBP::Filter_Factory::get_filter (this->session_
!= 0);
69 this->request_count_
= static_cast<unsigned long> (ACE_OS::time());
72 ACE::HTBP::Channel::Channel (ACE_HANDLE h
)
83 #if !defined (ACE_LACKS_TCP_NODELAY)
85 int result
= this->ace_stream_
.set_option (ACE_IPPROTO_TCP
,
90 ACE_DEBUG ((LM_DEBUG
, "HTBP::Channel(handle) ctor, %p\n", "set_option" ));
91 #endif /* ! ACE_LACKS_TCP_NODELAY */
93 this->filter_
= ACE::HTBP::Filter_Factory::get_filter (this->session_
!= 0);
94 this->request_count_
= static_cast<unsigned long> (ACE_OS::time());
98 ACE::HTBP::Channel::~Channel ()
100 delete this->notifier_
;
101 delete this->filter_
;
104 /// Dump the state of an object.
106 ACE::HTBP::Channel::dump () const
111 ACE::HTBP::Channel::request_count ()
113 return this->request_count_
++;
117 ACE::HTBP::Channel::register_notifier (ACE_Reactor
*r
)
121 if (this->notifier_
== 0)
123 ACE_NEW (this->notifier_
,ACE::HTBP::Notifier(this));
127 if (notifier_
->get_handle() == ACE_INVALID_HANDLE
)
129 delete this->notifier_
;
130 ACE_NEW (this->notifier_
,ACE::HTBP::Notifier(this));
134 r
->register_handler(notifier_
,ACE_Event_Handler::READ_MASK
);
137 ACE::HTBP::Notifier
*
138 ACE::HTBP::Channel::notifier ()
140 return this->notifier_
;
144 ACE::HTBP::Channel::get_handle () const
146 return this->ace_stream_
.get_handle ();
150 ACE::HTBP::Channel::data_consumed (size_t n
)
152 this->data_consumed_
+= n
;
153 if (this->data_consumed_
== this->data_len_
)
155 this->filter_
->recv_data_trailer(this);
156 this->filter_
->send_ack(this);
161 ACE::HTBP::Channel::load_buffer ()
163 this->leftovers_
.crunch();
164 if (this->state() == Detached
||
165 this->state() == Ack_Sent
)
168 this->data_consumed_
= 0;
174 if (this->session_
&&
175 (this->session_
->sock_flags() & ACE_NONBLOCK
== ACE_NONBLOCK
))
179 ACE::handle_read_ready (this->ace_stream().get_handle(),
180 &ACE_Time_Value::zero
);
181 if (nread
== -1 && errno
== ETIME
)
185 nread
= this->ace_stream().recv (this->leftovers_
.wr_ptr(),
186 this->leftovers_
.space()-1);
189 if (nread
== 0 || (errno
!= EWOULDBLOCK
&& errno
!= EAGAIN
))
191 this->state_
= Closed
;
193 ACE_ERROR ((LM_ERROR
,
194 "load_buffer[%d] %p\n",
195 this->ace_stream_
.get_handle(),"recv"));
200 this->leftovers_
.wr_ptr(nread
);
201 *this->leftovers_
.wr_ptr() = '\0';
206 ACE::HTBP::Channel::flush_buffer ()
209 return this->session_
->flush_outbound_queue();
214 ACE::HTBP::Channel::send_ack ()
216 return this->filter_
->send_ack(this);
220 ACE::HTBP::Channel::recv_ack ()
222 if (load_buffer() == -1)
224 return this->filter_
->recv_ack(this);
228 ACE::HTBP::Channel::state (ACE::HTBP::Channel::State s
)
232 this->session_
->detach(this);
239 ACE::HTBP::Channel::consume_error ()
241 if (error_buffer_
== 0)
243 ACE_NEW_RETURN (error_buffer_
,
244 ACE_Message_Block (this->data_len_
+ 1),
249 size_t n
= error_buffer_
->size();
250 char *buf
= error_buffer_
->wr_ptr();
252 if (this->leftovers_
.length() > 0)
254 result
= ACE_MIN (n
,this->leftovers_
.length());
255 ACE_OS::memcpy (buf
,this->leftovers_
.rd_ptr(), result
);
256 this->leftovers_
.rd_ptr(result
);
260 if (result
< (ssize_t
)n
&&
261 result
< (ssize_t
)data_len_
)
264 result
+= this->ace_stream_
.recv(buf
, n
);
268 this->error_buffer_
->wr_ptr(result
);
269 this->data_consumed_
+= result
;
270 if (this->data_consumed_
== this->data_len_
)
272 *this->error_buffer_
->wr_ptr() = '\0';
274 ACE_DEBUG ((LM_DEBUG
,
275 ACE_TEXT ("ACE::HTBP::Channel::consume_error ")
276 ACE_TEXT("Received entire error buffer: \n%s\n"),
277 this->error_buffer_
->rd_ptr()));
278 delete error_buffer_
;
287 //---------------------------------------------------------------------------
290 /// The ACE::HTBP::Channel is a sibling of the ACE_SOCK_IO class, rather than a
291 /// decendant. This is due to the requirement to wrap all messages with
292 /// an HTTP request or reply wrapper, and to send application data in only
293 /// one direction on one stream.
296 ACE::HTBP::Channel::pre_recv()
299 ACE_DEBUG ((LM_DEBUG
,
300 ACE_TEXT ("(%P|%t) ACE::HTBP::Channel::pre_recv ")
301 ACE_TEXT ("in initial state = %d\n"),state_
));
302 if (this->state_
== Init
||
303 this->state_
== Detached
||
304 this->state_
== Header_Pending
||
305 this->state_
== Ack_Sent
)
307 if (this->load_buffer() == -1 && this->leftovers_
.length() == 0)
309 if (errno
!= EWOULDBLOCK
)
310 this->state_
= Closed
;
312 ACE_DEBUG ((LM_DEBUG
,
313 ACE_TEXT ("(%P|%t) ACE::HTBP::Channel::pre_recv ")
314 ACE_TEXT ("pre_recv returning -1, state = %d, %p\n"),
315 state_
, ACE_TEXT("load_buffer()")));
318 if (this->filter_
->recv_data_header(this) == -1)
319 ACE_ERROR ((LM_ERROR
,
320 ACE_TEXT ("(%P|%t) ACE::HTBP::Channel::pre_recv ")
321 ACE_TEXT ("recv_data_header failed, %p\n"),
322 ACE_TEXT("pre_recv")));
324 switch (this->state_
)
335 ACE_DEBUG ((LM_DEBUG
,
336 ACE_TEXT ("(%P|%t) ACE::HTBP::Channel::pre_recv ")
337 ACE_TEXT("channel[%d] state = %d, %p\n"),
340 ACE_TEXT("pre_recv")));
345 /// Recv an <n> byte buffer from the connected socket.
347 ACE::HTBP::Channel::recv (void *buf
,
350 const ACE_Time_Value
*timeout
)
353 if (this->pre_recv() == -1 && this->leftovers_
.length() == 0)
355 if (this->leftovers_
.length() > 0)
357 result
= ACE_MIN (n
,this->leftovers_
.length());
358 ACE_OS::memcpy (buf
,this->leftovers_
.rd_ptr(), result
);
359 this->leftovers_
.rd_ptr(result
);
360 buf
= (char *)buf
+ result
;
363 if (result
< (ssize_t
)n
&&
364 result
< (ssize_t
)data_len_
)
367 result
+= this->ace_stream_
.recv(buf
, n
, flags
, timeout
);
370 data_consumed((size_t)result
);
374 /// Recv an <n> byte buffer from the connected socket.
376 ACE::HTBP::Channel::recv (void *buf
,
378 const ACE_Time_Value
*timeout
)
381 if (this->pre_recv() == -1)
385 if (this->leftovers_
.length() > 0)
387 result
= ACE_MIN (n
,this->leftovers_
.length());
388 ACE_OS::memcpy (buf
,this->leftovers_
.rd_ptr(), result
);
389 this->leftovers_
.rd_ptr(result
);
390 buf
= (char *)buf
+ result
;
393 if ((size_t)result
< n
&& (size_t)result
< this->data_len())
396 result
+= this->ace_stream_
.recv(buf
, n
, timeout
);
400 this->data_consumed((size_t)result
);
404 /// Recv an <iovec> of size <n> from the connected socket.
406 ACE::HTBP::Channel::recvv (iovec iov
[],
408 const ACE_Time_Value
*timeout
)
411 if (this->pre_recv() == -1)
414 if (this->leftovers_
.length())
417 iovec
*iov2
= new iovec
[iovcnt
];
418 std::unique_ptr
<iovec
[]> guard (iov2
);
419 for (int i
= 0; i
< iovcnt
; i
++)
421 size_t n
= ACE_MIN ((size_t) iov
[i
].iov_len
,
422 (size_t) this->leftovers_
.length());
425 ACE_OS::memcpy (iov
[i
].iov_base
,this->leftovers_
.rd_ptr(), n
);
426 this->leftovers_
.rd_ptr(n
);
429 if (n
< (size_t) iov
[i
].iov_len
)
431 iov2
[ndx
].iov_len
= iov
[i
].iov_len
- n
;
432 iov2
[ndx
].iov_base
= (char *)iov
[i
].iov_base
+ n
;
437 result
+= this->ace_stream_
.recvv(iov2
,ndx
,timeout
);
440 result
= this->ace_stream_
.recvv(iov
,iovcnt
,timeout
);
443 this->data_consumed((size_t)result
);
448 ACE::HTBP::Channel::recvv (iovec
*io_vec
,
449 const ACE_Time_Value
*timeout
)
452 if (this->pre_recv() == -1)
455 ACE_DEBUG ((LM_DEBUG
,
456 ACE_TEXT ("ACE::HTBP::Channel::recvv ")
457 ACE_TEXT("recvv, leftover len = %d\n"),
458 this->leftovers_
.length()));
459 if (this->leftovers_
.length())
461 io_vec
->iov_base
= 0;
463 ACE_NEW_RETURN (io_vec
->iov_base
,
464 char[this->leftovers_
.length()],-1);
465 io_vec
->iov_len
= this->leftovers_
.length();
466 ACE_OS::memcpy (io_vec
->iov_base
,
467 this->leftovers_
.rd_ptr(),
469 result
= io_vec
->iov_len
;
470 this->leftovers_
.length(0);
473 result
= this->ace_stream_
.recvv(io_vec
,timeout
);
476 this->data_consumed((size_t)result
);
481 ACE::HTBP::Channel::send (const void *buf
,
484 const ACE_Time_Value
*timeout
)
487 if (this->filter_
->send_data_header(n
,this) == -1)
489 result
= this->ace_stream_
.send(buf
,n
,flags
,timeout
);
492 if (this->filter_
->send_data_trailer(this) == -1)
498 ACE::HTBP::Channel::send (const void *buf
,
500 const ACE_Time_Value
*timeout
)
503 if (this->filter_
== 0)
504 ACE_ERROR_RETURN ((LM_ERROR
,
505 ACE_TEXT ("(%P|%t) ACE::HTBP::Channel::send: filter ")
506 ACE_TEXT ("is null\n")),-1);
507 if (this->filter_
->send_data_header(n
,this) == -1)
509 result
= this->ace_stream_
.send (buf
,n
,timeout
);
512 if (this->filter_
->send_data_trailer(this) == -1)
518 ACE::HTBP::Channel::sendv (const iovec iov
[],
520 const ACE_Time_Value
*timeout
)
522 if (this->ace_stream_
.get_handle() == ACE_INVALID_HANDLE
)
523 this->session_
->inbound();
528 for (int i
= 0; i
< iovcnt
; n
+= iov
[i
++].iov_len
)
533 if (this->filter_
->send_data_header(n
,this) == -1)
534 ACE_ERROR_RETURN ((LM_ERROR
,
535 ACE_TEXT ("(%P|%t) ACE::HTBP::Channel::sendv ")
537 ACE_TEXT("send_data_header")),-1);
539 result
= this->ace_stream_
.sendv (iov
,iovcnt
,timeout
);
542 ACE_ERROR_RETURN ((LM_ERROR
,
543 ACE_TEXT ("(%P|%t) ACE::HTBP::Channel::sendv ")
545 ACE_TEXT("ace_stream_.sendv")),-1);
547 if (this->filter_
->send_data_trailer(this) == -1)
548 ACE_ERROR_RETURN ((LM_ERROR
,
549 ACE_TEXT ("(%P|%t) ACE::HTBP::Channel::sendv ")
551 ACE_TEXT("send_data_trailer\n")),-1);
556 ACE::HTBP::Channel::enable (int value
) const
558 this->ace_stream_
.enable(value
);
560 return 0; //this->ace_stream_.enable(value);
564 ACE::HTBP::Channel::disable (int value
) const
566 this->ace_stream_
.disable(value
);
568 return 0;//this->ace_stream_.disable(value);
571 ACE_END_VERSIONED_NAMESPACE_DECL