Cleanup ACE_HAS_PTHREAD_SIGMASK_PROTOTYPE, all platforms support it so far as I can...
[ACE_TAO.git] / ACE / protocols / ace / HTBP / HTBP_Channel.cpp
blobd3df409fc998d331b8e31ecc66d5f17ea6e0aecc
1 /* -*- C++ -*- */
3 //=============================================================================
4 /**
5 * @file HTBP_Channel.cpp
7 * @author Phil Mesnier, Priyanka Gontla
8 */
9 //=============================================================================
10 #include "HTBP_Channel.h"
12 #if !defined (__ACE_INLINE__)
13 #include "HTBP_Channel.inl"
14 #endif
16 #include "HTBP_Session.h"
17 #include "HTBP_Filter_Factory.h"
19 #include <memory>
20 #include "ace/Message_Block.h"
21 #include "ace/Reactor.h"
22 #include "ace/os_include/netinet/os_tcp.h"
23 #include "ace/OS_NS_time.h"
25 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
27 // Initialization and termination methods.
28 /// Constructor.
29 ACE::HTBP::Channel::Channel (ACE::HTBP::Session *s)
30 : filter_ (0),
31 session_ (s),
32 ace_stream_ (),
33 notifier_ (0),
34 leftovers_ (1001),
35 data_len_ (0),
36 data_consumed_ (0),
37 state_ (Init),
38 error_buffer_ (0)
40 ACE_NEW (this->notifier_,ACE::HTBP::Notifier(this));
41 this->filter_ = ACE::HTBP::Filter_Factory::get_filter (this->session_ != 0);
42 this->request_count_ = static_cast<unsigned long> (ACE_OS::time());
45 /// Constructor, takes ownership of the supplied stream
46 ACE::HTBP::Channel::Channel (ACE_SOCK_Stream &s)
47 : filter_ (0),
48 session_ (0),
49 ace_stream_ (s.get_handle()),
50 notifier_ (0),
51 leftovers_ (1001),
52 data_len_ (0),
53 data_consumed_ (0),
54 state_ (Init),
55 error_buffer_ (0)
58 #if !defined (ACE_LACKS_TCP_NODELAY)
59 int no_delay = 1;
60 int result = this->ace_stream_.set_option (ACE_IPPROTO_TCP,
61 TCP_NODELAY,
62 (void *) &no_delay,
63 sizeof (no_delay));
64 if (result == -1)
65 ACE_DEBUG ((LM_DEBUG, "HTBP::Channel ctor(stream), %p\n", "set_option" ));
66 #endif /* ! ACE_LACKS_TCP_NODELAY */
68 this->filter_ = ACE::HTBP::Filter_Factory::get_filter (this->session_ != 0);
69 this->request_count_ = static_cast<unsigned long> (ACE_OS::time());
72 ACE::HTBP::Channel::Channel (ACE_HANDLE h)
73 : filter_ (0),
74 session_ (0),
75 ace_stream_ (h),
76 notifier_ (0),
77 leftovers_ (1001),
78 data_len_ (0),
79 data_consumed_ (0),
80 state_ (Init),
81 error_buffer_ (0)
83 #if !defined (ACE_LACKS_TCP_NODELAY)
84 int no_delay = 1;
85 int result = this->ace_stream_.set_option (ACE_IPPROTO_TCP,
86 TCP_NODELAY,
87 (void *) &no_delay,
88 sizeof (no_delay));
89 if (result == -1)
90 ACE_DEBUG ((LM_DEBUG, "HTBP::Channel(handle) ctor, %p\n", "set_option" ));
91 #endif /* ! ACE_LACKS_TCP_NODELAY */
93 this->filter_ = ACE::HTBP::Filter_Factory::get_filter (this->session_ != 0);
94 this->request_count_ = static_cast<unsigned long> (ACE_OS::time());
97 /// Destructor.
98 ACE::HTBP::Channel::~Channel ()
100 delete this->notifier_;
101 delete this->filter_;
104 /// Dump the state of an object.
105 void
106 ACE::HTBP::Channel::dump () const
110 unsigned long
111 ACE::HTBP::Channel::request_count ()
113 return this->request_count_++;
116 void
117 ACE::HTBP::Channel::register_notifier (ACE_Reactor *r)
119 if (r == 0)
120 return;
121 if (this->notifier_ == 0)
123 ACE_NEW (this->notifier_,ACE::HTBP::Notifier(this));
125 else
127 if (notifier_->get_handle() == ACE_INVALID_HANDLE)
129 delete this->notifier_;
130 ACE_NEW (this->notifier_,ACE::HTBP::Notifier(this));
134 r->register_handler(notifier_,ACE_Event_Handler::READ_MASK);
137 ACE::HTBP::Notifier *
138 ACE::HTBP::Channel::notifier ()
140 return this->notifier_;
143 ACE_HANDLE
144 ACE::HTBP::Channel::get_handle () const
146 return this->ace_stream_.get_handle ();
149 void
150 ACE::HTBP::Channel::data_consumed (size_t n)
152 this->data_consumed_ += n;
153 if (this->data_consumed_ == this->data_len_)
155 this->filter_->recv_data_trailer(this);
156 this->filter_->send_ack(this);
161 ACE::HTBP::Channel::load_buffer ()
163 this->leftovers_.crunch();
164 if (this->state() == Detached ||
165 this->state() == Ack_Sent)
167 this->data_len_ = 0;
168 this->data_consumed_ = 0;
171 ssize_t nread = 0;
172 errno = 0;
173 #if 0
174 if (this->session_ &&
175 (this->session_->sock_flags() & ACE_NONBLOCK == ACE_NONBLOCK))
176 #endif
178 nread =
179 ACE::handle_read_ready (this->ace_stream().get_handle(),
180 &ACE_Time_Value::zero);
181 if (nread == -1 && errno == ETIME)
182 errno = EWOULDBLOCK;
184 if (nread != -1)
185 nread = this->ace_stream().recv (this->leftovers_.wr_ptr(),
186 this->leftovers_.space()-1);
187 if (nread < 1)
189 if (nread == 0 || (errno != EWOULDBLOCK && errno != EAGAIN))
191 this->state_ = Closed;
192 #if 0
193 ACE_ERROR ((LM_ERROR,
194 "load_buffer[%d] %p\n",
195 this->ace_stream_.get_handle(),"recv"));
196 #endif
198 return nread;
200 this->leftovers_.wr_ptr(nread);
201 *this->leftovers_.wr_ptr() = '\0';
202 return nread;
206 ACE::HTBP::Channel::flush_buffer ()
208 if (this->session_)
209 return this->session_->flush_outbound_queue();
210 return 0;
214 ACE::HTBP::Channel::send_ack ()
216 return this->filter_->send_ack(this);
220 ACE::HTBP::Channel::recv_ack ()
222 if (load_buffer() == -1)
223 return -1;
224 return this->filter_->recv_ack(this);
227 void
228 ACE::HTBP::Channel::state (ACE::HTBP::Channel::State s)
230 if (s == Detached)
232 this->session_->detach(this);
233 this->session_ = 0;
235 this->state_ = s;
239 ACE::HTBP::Channel::consume_error ()
241 if (error_buffer_ == 0)
243 ACE_NEW_RETURN (error_buffer_,
244 ACE_Message_Block (this->data_len_ + 1),
248 ssize_t result = 0;
249 size_t n = error_buffer_->size();
250 char *buf = error_buffer_->wr_ptr();
252 if (this->leftovers_.length() > 0)
254 result = ACE_MIN (n,this->leftovers_.length());
255 ACE_OS::memcpy (buf,this->leftovers_.rd_ptr(), result);
256 this->leftovers_.rd_ptr(result);
257 buf += result;
260 if (result < (ssize_t)n &&
261 result < (ssize_t)data_len_)
263 n -= result;
264 result += this->ace_stream_.recv(buf, n);
266 if (result > 0)
268 this->error_buffer_->wr_ptr(result);
269 this->data_consumed_ += result;
270 if (this->data_consumed_ == this->data_len_)
272 *this->error_buffer_->wr_ptr() = '\0';
273 if (ACE::debug())
274 ACE_DEBUG ((LM_DEBUG,
275 ACE_TEXT ("ACE::HTBP::Channel::consume_error ")
276 ACE_TEXT("Received entire error buffer: \n%s\n"),
277 this->error_buffer_->rd_ptr()));
278 delete error_buffer_;
279 error_buffer_ = 0;
281 return 1;
284 return 0;
287 //---------------------------------------------------------------------------
288 // = I/O functions.
290 /// The ACE::HTBP::Channel is a sibling of the ACE_SOCK_IO class, rather than a
291 /// descendant. This is due to the requirement to wrap all messages with
292 /// an HTTP request or reply wrapper, and to send application data in only
293 /// one direction on one stream.
296 ACE::HTBP::Channel::pre_recv()
298 if (ACE::debug())
299 ACE_DEBUG ((LM_DEBUG,
300 ACE_TEXT ("(%P|%t) ACE::HTBP::Channel::pre_recv ")
301 ACE_TEXT ("in initial state = %d\n"),state_));
302 if (this->state_ == Init ||
303 this->state_ == Detached ||
304 this->state_ == Header_Pending ||
305 this->state_ == Ack_Sent)
307 if (this->load_buffer() == -1 && this->leftovers_.length() == 0)
309 if (errno != EWOULDBLOCK)
310 this->state_ = Closed;
311 if (ACE::debug())
312 ACE_DEBUG ((LM_DEBUG,
313 ACE_TEXT ("(%P|%t) ACE::HTBP::Channel::pre_recv ")
314 ACE_TEXT ("pre_recv returning -1, state = %d, %p\n"),
315 state_, ACE_TEXT("load_buffer()")));
316 return -1;
318 if (this->filter_->recv_data_header(this) == -1)
319 ACE_ERROR ((LM_ERROR,
320 ACE_TEXT ("(%P|%t) ACE::HTBP::Channel::pre_recv ")
321 ACE_TEXT ("recv_data_header failed, %p\n"),
322 ACE_TEXT("pre_recv")));
324 switch (this->state_)
326 case Data_Queued:
327 case Ack_Sent:
328 case Ready:
329 return 0;
330 case Header_Pending:
331 errno = EWOULDBLOCK;
332 return -1;
333 default:
334 if (ACE::debug())
335 ACE_DEBUG ((LM_DEBUG,
336 ACE_TEXT ("(%P|%t) ACE::HTBP::Channel::pre_recv ")
337 ACE_TEXT("channel[%d] state = %d, %p\n"),
338 this->get_handle(),
339 this->state_,
340 ACE_TEXT("pre_recv")));
342 return -1;
345 /// Recv an <n> byte buffer from the connected socket.
346 ssize_t
347 ACE::HTBP::Channel::recv (void *buf,
348 size_t n,
349 int flags,
350 const ACE_Time_Value *timeout)
352 ssize_t result = 0;
353 if (this->pre_recv() == -1 && this->leftovers_.length() == 0)
354 return -1;
355 if (this->leftovers_.length() > 0)
357 result = ACE_MIN (n,this->leftovers_.length());
358 ACE_OS::memcpy (buf,this->leftovers_.rd_ptr(), result);
359 this->leftovers_.rd_ptr(result);
360 buf = (char *)buf + result;
363 if (result < (ssize_t)n &&
364 result < (ssize_t)data_len_)
366 n -= result;
367 result += this->ace_stream_.recv(buf, n, flags, timeout);
369 if (result > 0)
370 data_consumed((size_t)result);
371 return result;
374 /// Recv an <n> byte buffer from the connected socket.
375 ssize_t
376 ACE::HTBP::Channel::recv (void *buf,
377 size_t n,
378 const ACE_Time_Value *timeout)
380 ssize_t result = 0;
381 if (this->pre_recv() == -1)
382 return -1;
384 result = 0;
385 if (this->leftovers_.length() > 0)
387 result = ACE_MIN (n,this->leftovers_.length());
388 ACE_OS::memcpy (buf,this->leftovers_.rd_ptr(), result);
389 this->leftovers_.rd_ptr(result);
390 buf = (char *)buf + result;
393 if ((size_t)result < n && (size_t)result < this->data_len())
395 n -= result;
396 result += this->ace_stream_.recv(buf, n, timeout);
399 if (result > 0)
400 this->data_consumed((size_t)result);
401 return result;
404 /// Recv an <iovec> of size <n> from the connected socket.
405 ssize_t
406 ACE::HTBP::Channel::recvv (iovec iov[],
407 int iovcnt,
408 const ACE_Time_Value *timeout)
410 ssize_t result = 0;
411 if (this->pre_recv() == -1)
412 return -1;
414 if (this->leftovers_.length())
416 int ndx = 0;
417 iovec *iov2 = new iovec[iovcnt];
418 std::unique_ptr<iovec[]> guard (iov2);
419 for (int i = 0; i < iovcnt; i++)
421 size_t n = ACE_MIN ((size_t) iov[i].iov_len ,
422 (size_t) this->leftovers_.length());
423 if (n > 0)
425 ACE_OS::memcpy (iov[i].iov_base,this->leftovers_.rd_ptr(), n);
426 this->leftovers_.rd_ptr(n);
427 result += n;
429 if (n < (size_t) iov[i].iov_len)
431 iov2[ndx].iov_len = iov[i].iov_len - n;
432 iov2[ndx].iov_base = (char *)iov[i].iov_base + n;
433 ndx++;
436 if (ndx > 0)
437 result += this->ace_stream_.recvv(iov2,ndx,timeout);
439 else
440 result = this->ace_stream_.recvv(iov,iovcnt,timeout);
442 if (result > 0)
443 this->data_consumed((size_t)result);
444 return result;
447 ssize_t
448 ACE::HTBP::Channel::recvv (iovec *io_vec,
449 const ACE_Time_Value *timeout)
451 ssize_t result = 0;
452 if (this->pre_recv() == -1)
453 return -1;
454 if (ACE::debug())
455 ACE_DEBUG ((LM_DEBUG,
456 ACE_TEXT ("ACE::HTBP::Channel::recvv ")
457 ACE_TEXT("recvv, leftover len = %d\n"),
458 this->leftovers_.length()));
459 if (this->leftovers_.length())
461 io_vec->iov_base = 0;
462 io_vec->iov_len = 0;
463 ACE_NEW_RETURN (io_vec->iov_base,
464 char[this->leftovers_.length()],-1);
465 io_vec->iov_len = this->leftovers_.length();
466 ACE_OS::memcpy (io_vec->iov_base,
467 this->leftovers_.rd_ptr(),
468 io_vec->iov_len);
469 result = io_vec->iov_len;
470 this->leftovers_.length(0);
472 else
473 result = this->ace_stream_.recvv(io_vec,timeout);
475 if (result > 0)
476 this->data_consumed((size_t)result);
477 return result;
480 ssize_t
481 ACE::HTBP::Channel::send (const void *buf,
482 size_t n,
483 int flags,
484 const ACE_Time_Value *timeout)
486 ssize_t result = 0;
487 if (this->filter_->send_data_header(n,this) == -1)
488 return -1;
489 result = this->ace_stream_.send(buf,n,flags,timeout);
490 if (result == -1)
491 return -1;
492 if (this->filter_->send_data_trailer(this) == -1)
493 return -1;
494 return result;
497 ssize_t
498 ACE::HTBP::Channel::send (const void *buf,
499 size_t n,
500 const ACE_Time_Value *timeout)
502 ssize_t result = 0;
503 if (this->filter_ == 0)
504 ACE_ERROR_RETURN ((LM_ERROR,
505 ACE_TEXT ("(%P|%t) ACE::HTBP::Channel::send: filter ")
506 ACE_TEXT ("is null\n")),-1);
507 if (this->filter_->send_data_header(n,this) == -1)
508 return -1;
509 result = this->ace_stream_.send (buf,n,timeout);
510 if (result == -1)
511 return -1;
512 if (this->filter_->send_data_trailer(this) == -1)
513 return -1;
514 return result;
517 ssize_t
518 ACE::HTBP::Channel::sendv (const iovec iov[],
519 int iovcnt,
520 const ACE_Time_Value *timeout)
522 if (this->ace_stream_.get_handle() == ACE_INVALID_HANDLE)
523 this->session_->inbound();
525 ssize_t result = 0;
526 size_t n = 0;
528 for (int i = 0; i < iovcnt; n += iov[i++].iov_len)
530 // No action.
533 if (this->filter_->send_data_header(n,this) == -1)
534 ACE_ERROR_RETURN ((LM_ERROR,
535 ACE_TEXT ("(%P|%t) ACE::HTBP::Channel::sendv ")
536 ACE_TEXT("%p\n"),
537 ACE_TEXT("send_data_header")),-1);
539 result = this->ace_stream_.sendv (iov,iovcnt,timeout);
541 if (result == -1)
542 ACE_ERROR_RETURN ((LM_ERROR,
543 ACE_TEXT ("(%P|%t) ACE::HTBP::Channel::sendv ")
544 ACE_TEXT("%p\n"),
545 ACE_TEXT("ace_stream_.sendv")),-1);
547 if (this->filter_->send_data_trailer(this) == -1)
548 ACE_ERROR_RETURN ((LM_ERROR,
549 ACE_TEXT ("(%P|%t) ACE::HTBP::Channel::sendv ")
550 ACE_TEXT("%p\n"),
551 ACE_TEXT("send_data_trailer\n")),-1);
552 return result;
556 ACE::HTBP::Channel::enable (int value) const
558 this->ace_stream_.enable(value);
560 return 0; //this->ace_stream_.enable(value);
564 ACE::HTBP::Channel::disable (int value) const
566 this->ace_stream_.disable(value);
568 return 0;//this->ace_stream_.disable(value);
571 ACE_END_VERSIONED_NAMESPACE_DECL