Merge pull request #2218 from jwillemsen/jwi-pthreadsigmask
[ACE_TAO.git] / TAO / tao / Strategies / SHMIOP_Transport.cpp
blobbacbceb99cb5d4cfd1c54a94fab35c92ee1eee8b
1 #include "tao/Strategies/SHMIOP_Transport.h"
3 #if defined (TAO_HAS_SHMIOP) && (TAO_HAS_SHMIOP != 0)
5 #include "tao/Strategies/SHMIOP_Connection_Handler.h"
6 #include "tao/Strategies/SHMIOP_Profile.h"
7 #include "tao/Timeprobe.h"
8 #include "tao/CDR.h"
9 #include "tao/Transport_Mux_Strategy.h"
10 #include "tao/Wait_Strategy.h"
11 #include "tao/Stub.h"
12 #include "tao/ORB_Core.h"
13 #include "tao/debug.h"
14 #include "tao/Resume_Handle.h"
15 #include "tao/GIOP_Message_Base.h"
17 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
19 TAO_SHMIOP_Transport::TAO_SHMIOP_Transport (TAO_SHMIOP_Connection_Handler *handler,
20 TAO_ORB_Core *orb_core)
21 : TAO_Transport (TAO_TAG_SHMEM_PROFILE,
22 orb_core),
23 connection_handler_ (handler)
27 TAO_SHMIOP_Transport::~TAO_SHMIOP_Transport ()
31 ACE_Event_Handler *
32 TAO_SHMIOP_Transport::event_handler_i ()
34 return this->connection_handler_;
37 TAO_Connection_Handler *
38 TAO_SHMIOP_Transport::connection_handler_i ()
40 return this->connection_handler_;
43 ssize_t
44 TAO_SHMIOP_Transport::send (iovec *iov, int iovcnt,
45 size_t &bytes_transferred,
46 const ACE_Time_Value *max_wait_time)
48 bytes_transferred = 0;
49 for (int i = 0; i < iovcnt; ++i)
51 ssize_t retval =
52 this->connection_handler_->peer ().send (iov[i].iov_base,
53 iov[i].iov_len,
54 max_wait_time);
55 if (retval > 0)
56 bytes_transferred += retval;
57 if (retval <= 0)
58 return retval;
60 return bytes_transferred;
63 ssize_t
64 TAO_SHMIOP_Transport::recv (char *buf,
65 size_t len,
66 const ACE_Time_Value *max_wait_time)
68 ssize_t n = 0;
70 int read_break = 0;
72 while (!read_break)
74 n = this->connection_handler_->peer ().recv (buf,
75 len,
76 max_wait_time);
78 // If we get a EWOULBLOCK we try to read again.
79 if (n == -1 && (errno == EAGAIN || errno == EWOULDBLOCK))
81 n = 0;
82 continue;
85 // If there is anything else we just drop out of the loop.
86 read_break = 1;
89 if (n == -1)
91 if (TAO_debug_level > 3 && errno != ETIME)
93 TAOLIB_DEBUG ((LM_DEBUG,
94 ACE_TEXT ("TAO (%P|%t) - SHMIOP_Transport::recv, %p\n"),
95 ACE_TEXT ("TAO - read message failure ")
96 ACE_TEXT ("recv_i ()\n")));
99 else if (n == 0)
101 n = -1;
103 return n;
107 TAO_SHMIOP_Transport::handle_input (TAO_Resume_Handle &rh,
108 ACE_Time_Value *max_wait_time)
110 if (TAO_debug_level > 3)
112 TAOLIB_DEBUG ((LM_DEBUG,
113 "TAO (%P|%t) - SHMIOP_Transport[%d]::handle_input\n",
114 this->id ()));
117 // The buffer on the stack which will be used to hold the input
118 // messages, compensate shrink due to alignment
119 char buf [TAO_MAXBUFSIZE + ACE_CDR::MAX_ALIGNMENT];
122 #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE)
123 (void) ACE_OS::memset (buf,
124 '\0',
125 sizeof buf);
126 #endif /* ACE_INITIALIZE_MEMORY_BEFORE_USE */
128 // Create a data block
129 ACE_Data_Block db (sizeof (buf),
130 ACE_Message_Block::MB_DATA,
131 buf,
132 this->orb_core_->input_cdr_buffer_allocator (),
133 this->orb_core_->locking_strategy (),
134 ACE_Message_Block::DONT_DELETE,
135 this->orb_core_->input_cdr_dblock_allocator ());
137 // Create a message block
138 ACE_Message_Block message_block (&db,
139 ACE_Message_Block::DONT_DELETE,
140 this->orb_core_->input_cdr_msgblock_allocator ());
143 // Align the message block
144 ACE_CDR::mb_align (&message_block);
146 const size_t missing_header_data = this->messaging_object ()->header_length ();
148 if (missing_header_data == 0)
150 return -1;
153 // .. do a read on the socket again.
154 ssize_t bytes = 0;
156 // As this used for transports where things are available in one
157 // shot this looping should not create any problems.
158 for (size_t m = missing_header_data;
159 m != 0;
160 m -= bytes)
162 bytes = 0; // reset
164 // We would have liked to use something like a recv_n ()
165 // here. But at the time when the code was written, the MEM_Stream
166 // classes had poor support for recv_n (). Till a day when we
167 // get proper recv_n (), let us stick with this. The other
168 // argument that can be said against this is that, this is the
169 // bad layer in which this is being done ie. recv_n is
170 // simulated. But...
171 bytes = this->recv (message_block.wr_ptr (),
173 max_wait_time);
175 if (bytes == 0 ||
176 bytes == -1)
178 return -1;
181 message_block.wr_ptr (bytes);
184 TAO_Queued_Data qd (&message_block);
185 size_t mesg_length = 0;
187 // Parse the incoming message for validity. The check needs to be
188 // performed by the messaging objects.
189 if (this->messaging_object ()->parse_next_message (qd,
190 mesg_length) == -1)
191 return -1;
193 if (qd.missing_data () == TAO_MISSING_DATA_UNDEFINED)
195 // parse/marshal error happened
196 return -1;
199 if (message_block.length () > mesg_length)
201 // we read too much data
202 return -1;
205 if (message_block.space () < qd.missing_data ())
207 size_t const message_size = message_block.length ()
208 + qd.missing_data ();
210 // reallocate buffer with correct size on heap
211 if (ACE_CDR::grow (&message_block, message_size) == -1)
213 if (TAO_debug_level > 0)
215 TAOLIB_ERROR ((LM_ERROR,
216 "TAO (%P|%t) - SHMIOP_Transport[%d]::handle_input, "
217 "error growing message buffer\n",
218 this->id () ));
220 return -1;
225 // As this used for transports where things are available in one
226 // shot this looping should not create any problems.
227 for (size_t n = qd.missing_data ();
228 n != 0;
229 n -= bytes)
231 bytes = 0; // reset
233 // We would have liked to use something like a recv_n ()
234 // here. But at the time when the code was written, the MEM_Stream
235 // classes had poor support for recv_n (). Till a day when we
236 // get proper recv_n (), let us stick with this. The other
237 // argument that can be said against this is that, this is the
238 // bad layer in which this is being done ie. recv_n is
239 // simulated. But...
240 bytes = this->recv (message_block.wr_ptr (),
242 max_wait_time);
244 if (bytes == 0 ||
245 bytes == -1)
247 return -1;
250 message_block.wr_ptr (bytes);
253 qd.missing_data (0);
255 // Now we have a full message in our buffer. Just go ahead and
256 // process that
257 if (this->process_parsed_messages (&qd, rh) == -1)
259 return -1;
262 return 0;
267 TAO_SHMIOP_Transport::send_request (TAO_Stub *stub,
268 TAO_ORB_Core *orb_core,
269 TAO_OutputCDR &stream,
270 TAO_Message_Semantics message_semantics,
271 ACE_Time_Value *max_wait_time)
273 if (this->ws_->sending_request (orb_core,
274 message_semantics) == -1)
276 return -1;
279 if (this->send_message (stream,
280 stub,
282 message_semantics,
283 max_wait_time) == -1)
285 return -1;
288 this->first_request_sent();
290 return 0;
294 TAO_SHMIOP_Transport::send_message (TAO_OutputCDR &stream,
295 TAO_Stub *stub,
296 TAO_ServerRequest *request,
297 TAO_Message_Semantics message_semantics,
298 ACE_Time_Value *max_wait_time)
300 // Format the message in the stream first
301 if (this->messaging_object ()->format_message (stream, stub, request) != 0)
303 return -1;
306 // Strictly speaking, should not need to loop here because the
307 // socket never gets set to a nonblocking mode ... some Linux
308 // versions seem to need it though. Leaving it costs little.
310 // This guarantees to send all data (bytes) or return an error.
311 ssize_t n = this->send_message_shared (stub,
312 message_semantics,
313 stream.begin (),
314 max_wait_time);
316 if (n == -1)
318 if (TAO_debug_level)
319 TAOLIB_DEBUG ((LM_DEBUG,
320 ACE_TEXT ("TAO (%P|%t) closing transport %d after fault %p\n"),
321 this->id (),
322 ACE_TEXT ("send_message ()\n")));
324 return -1;
327 return 1;
330 TAO_END_VERSIONED_NAMESPACE_DECL
332 #endif /* TAO_HAS_SHMIOP && TAO_HAS_SHMIOP != 0 */