Cleanup ACE_HAS_PTHREAD_SIGMASK_PROTOTYPE, all platforms support it so far as I can...
[ACE_TAO.git] / ACE / protocols / ace / HTBP / HTBP_Session.cpp
blobe0802247f04abf2dd6edfc0c405a95b5b496f62d
1 // HTBP_Session.cpp
2 #include "ace/Log_Msg.h"
4 #include "HTBP_Session.h"
5 #include "ace/SOCK_Connector.h"
6 #include "ace/Event_Handler.h"
7 #include "ace/os_include/netinet/os_tcp.h"
8 #include <memory>
10 #include "HTBP_Filter.h"
11 #include "HTBP_ID_Requestor.h"
13 #if !defined (__ACE_INLINE__)
14 #include "HTBP_Session.inl"
15 #endif
17 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
// Registry of sessions keyed by session id: filled by add_session(),
// emptied by remove_session() and queried by find_session().
19 ACE::HTBP::Session::Session_Map ACE::HTBP::Session::session_map_;
// Most recently issued session id; next_session_id() pre-increments it,
// so the first id handed out is 1.
20 ACE_UINT32 ACE::HTBP::Session::last_session_id_ = 0;
// Lock taken by next_session_id() around the increment of last_session_id_.
21 ACE_SYNCH_MUTEX ACE::HTBP::Session::session_id_lock_;
23 /// Static method definitions
24 ACE_UINT32
25 ACE::HTBP::Session::next_session_id ()
27 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, g, ACE::HTBP::Session::session_id_lock_, 0);
28 return ++last_session_id_;
31 int
32 ACE::HTBP::Session::add_session (ACE::HTBP::Session *s)
34 return session_map_.bind (s->session_id(),s);
37 int
38 ACE::HTBP::Session::remove_session (ACE::HTBP::Session *s)
40 if (session_map_.current_size() > 0)
41 return session_map_.unbind(s->session_id());
42 return 0;
45 int
46 ACE::HTBP::Session::find_session (const ACE::HTBP::Session_Id_t &sid, ACE::HTBP::Session *&out)
48 ACE::HTBP::Session::Map_Entry *e = 0;
49 if (session_map_.find (sid,e) == -1)
51 out = 0;
52 return -1;
54 out = e->int_id_;
55 return 0;
58 //----------------------------------------------------------------------------
59 ACE::HTBP::Session::Session ()
60 : proxy_addr_ (0),
61 destroy_proxy_addr_ (0),
62 inbound_ (0),
63 outbound_ (0),
64 closed_ (false),
65 handler_ (0),
66 reactor_(0),
67 stream_ (0),
68 sock_flags_(0)
70 ACE::HTBP::ID_Requestor req;
71 ACE_TCHAR * htid = req.get_HTID();
72 std::unique_ptr<ACE_TCHAR[]> guard (htid);
73 session_id_.local_ = ACE_TEXT_ALWAYS_CHAR(htid);
74 session_id_.id_ = ACE::HTBP::Session::next_session_id();
75 ACE_NEW (inbound_, ACE::HTBP::Channel (this));
76 ACE_NEW (outbound_, ACE::HTBP::Channel (this));
79 ACE::HTBP::Session::Session (const ACE::HTBP::Addr &peer,
80 const ACE::HTBP::Addr &local,
81 ACE_UINT32 sid,
82 ACE_INET_Addr *proxy,
83 bool take_proxy)
84 : proxy_addr_ (proxy),
85 destroy_proxy_addr_ (take_proxy),
86 inbound_ (0),
87 outbound_ (0),
88 closed_ (false),
89 handler_ (0),
90 reactor_(0),
91 stream_ (0),
92 sock_flags_(0)
94 session_id_.peer_ = peer;
95 session_id_.local_ = local;
96 session_id_.id_ = (sid == 0) ?
97 ACE::HTBP::Session::next_session_id() : sid;
99 ACE_NEW (inbound_,ACE::HTBP::Channel (this));
100 ACE_NEW (outbound_,ACE::HTBP::Channel (this));
103 ACE::HTBP::Session::Session (const ACE::HTBP::Session_Id_t &id,
104 ACE_INET_Addr *proxy,
105 bool take_proxy)
106 : proxy_addr_ (proxy),
107 destroy_proxy_addr_ (take_proxy),
108 session_id_(id),
109 inbound_ (0),
110 outbound_ (0),
111 closed_ (false),
112 handler_ (0),
113 reactor_ (0),
114 stream_ (0),
115 sock_flags_(0)
117 ACE_NEW (inbound_, ACE::HTBP::Channel (this));
118 ACE_NEW (outbound_, ACE::HTBP::Channel (this));
121 ACE::HTBP::Session::Session (const ACE::HTBP::Session &other)
123 this->operator=(other);
126 ACE::HTBP::Session&
127 ACE::HTBP::Session::operator= (const ACE::HTBP::Session &)
129 // @TODO: figure out why the assignment operator is here if it is
130 // unimplemented? Previously there was an ACE_ASSERT(this == 0)
131 // so apparently something bad had been happening long ago, but I
132 // have no idea what.
133 return *this;
136 ACE::HTBP::Session::~Session ()
138 if (destroy_proxy_addr_)
139 delete proxy_addr_;
141 delete this->inbound_;
142 delete this->outbound_;
146 ACE::HTBP::Session::close ()
148 if (this->inbound_)
149 this->inbound_->close();
150 if (this->outbound_)
151 this->outbound_->close();
152 this->closed_ = true;
153 return ACE::HTBP::Session::remove_session (this);
157 ACE::HTBP::Channel *
158 ACE::HTBP::Session::outbound () const
160 if (!this->closed_ && this->proxy_addr_)
161 this->reconnect();
162 if ( this->outbound_ == 0)
163 return 0;
164 ACE::HTBP::Channel::State s =this->outbound_->state();
165 return s == ACE::HTBP::Channel::Init || s == ACE::HTBP::Channel::Ready ? this->outbound_ : 0;
168 void
169 ACE::HTBP::Session::reconnect_i (ACE::HTBP::Channel *s) const
171 ACE_SOCK_Connector conn;
172 if (conn.connect (s->ace_stream(),*this->proxy_addr_) == -1)
174 ACE_TCHAR buffer[128];
175 this->proxy_addr_->addr_to_string(buffer, 128, 0);
176 ACE_ERROR ((LM_ERROR,
177 ACE_TEXT("(%P|%t) ACE::HTBP::Session::reconnect")
178 ACE_TEXT(" failed to %s, %p\n"),
179 buffer, s == this->inbound_ ?
180 ACE_TEXT("inbound") : ACE_TEXT ("outbound")));
182 else
184 #if !defined (ACE_LACKS_TCP_NODELAY)
185 int no_delay = 1;
186 int result = s->ace_stream().set_option (ACE_IPPROTO_TCP,
187 TCP_NODELAY,
188 (void *) &no_delay,
189 sizeof (no_delay));
190 if (result == -1)
191 ACE_DEBUG ((LM_DEBUG, "HTBP::Session::reconnect_i, %p\n", "set_option" ));
192 #endif /* ! ACE_LACKS_TCP_NODELAY */
195 s->register_notifier(this->reactor_);
196 if (s == this->inbound_)
197 s->send_ack();
200 ACE_Event_Handler *
201 ACE::HTBP::Session::handler ()
203 return this->handler_;
206 void
207 ACE::HTBP::Session::handler (ACE_Event_Handler * h)
209 this->handler_ = h;
212 void
213 ACE::HTBP::Session::detach (ACE::HTBP::Channel *ch)
215 if (this->inbound_ == ch)
216 this->inbound_ = 0;
217 else if (this->outbound_ == ch)
218 this->outbound_ = 0;
219 else
220 ACE_ERROR ((LM_ERROR,
221 ACE_TEXT("ACE::HTBP::Session::detach ")
222 ACE_TEXT("called with unknown channel\n")));
225 void
226 ACE::HTBP::Session::reactor (ACE_Reactor *r)
228 this->reactor_ = r;
229 this->inbound_->register_notifier(r);
230 this->outbound_->register_notifier(r);
234 ACE::HTBP::Session::enqueue (ACE_Message_Block *msg)
236 this->outbound_queue_.enqueue_tail(msg);
237 return msg->length();
241 ACE::HTBP::Session::flush_outbound_queue ()
243 int result = 0;
244 if (this->outbound_queue_.message_count() > 0)
246 ACE_Message_Block *msg = 0;
247 iovec *iov = 0;
248 ACE_NEW_RETURN (iov,
249 iovec[this->outbound_queue_.message_count()],
250 -1);
251 std::unique_ptr<iovec[]> guard (iov);
252 this->outbound_queue_.peek_dequeue_head (msg);
253 for (size_t i = 0; i < this->outbound_queue_.message_count(); i++)
255 iov[i].iov_base = msg->rd_ptr();
256 iov[i].iov_len = msg->length();
257 msg = msg->next();
259 if (this->outbound_->state() == ACE::HTBP::Channel::Wait_For_Ack)
260 this->outbound_->recv_ack();
261 result = this->outbound_->sendv (iov,this->outbound_queue_.message_count(),0);
262 while (this->outbound_queue_.message_count ())
264 this->outbound_queue_.dequeue_head (msg);
265 msg->release ();
268 return result;
272 ACE::HTBP::Session::close_inbound () const
274 return this->inbound_ ? this->inbound_->close() : 0;
278 ACE::HTBP::Session::close_outbound () const
280 return this->outbound_ ? this->outbound_->close() : 0;
284 ACE::HTBP::Session::enable (int flags)
286 this->sock_flags_ |= flags;
287 int result = this->inbound_ ? this->inbound_->enable(flags) : 0;
288 result |= this->outbound_ ? this->outbound_->enable (flags) : 0;
289 return result;
293 ACE::HTBP::Session::disable (int flags)
295 this->sock_flags_ &= ~flags;
296 int result = this->inbound_ ? this->inbound_->disable(flags) : 0;
297 result |= this->outbound_ ? this->outbound_->disable (flags) : 0;
298 return result;
301 ACE::HTBP::Stream *
302 ACE::HTBP::Session::stream () const
304 return this->stream_;
307 void
308 ACE::HTBP::Session::stream (ACE::HTBP::Stream *s)
310 this->stream_ = s;
313 ACE_END_VERSIONED_NAMESPACE_DECL