2 #include "ace/MEM_IO.h"
3 #include "ace/Handle_Set.h"
5 #if (ACE_HAS_POSITION_INDEPENDENT_POINTERS == 1)
7 #if !defined (__ACE_INLINE__)
8 #include "ace/MEM_IO.inl"
9 #endif /* __ACE_INLINE__ */
11 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
// Expands the ACE_ALLOC_HOOK_DEFINE macro for ACE_MEM_IO.  The macro is
// defined outside this file; presumably it supplies the class's
// allocation-hook definitions -- confirm against the macro definition.
13 ACE_ALLOC_HOOK_DEFINE(ACE_MEM_IO
)
// Destructor of the reactive delivery strategy (the strategy whose
// recv_buf()/send_buf() exchange buffer offsets through handle_).
15 ACE_Reactive_MEM_IO::~ACE_Reactive_MEM_IO ()
// Initializes the reactive delivery strategy.
//
// Stores @a handle in handle_ and then returns the result of
// create_shm_malloc(), which is given @a name.
//
// @param handle   Handle remembered in this->handle_.
// @param name     Name passed to create_shm_malloc().
// @param options  Malloc options.  NOTE(review): presumably the remaining
//                 argument of the create_shm_malloc() call -- confirm.
// @return The value returned by create_shm_malloc().
20 ACE_Reactive_MEM_IO::init (ACE_HANDLE handle
,
21 const ACE_TCHAR
*name
,
22 MALLOC_OPTIONS
*options
)
24 ACE_TRACE ("ACE_Reactive_MEM_IO::init");
// Keep the handle; recv_buf()/send_buf() hand it to ACE::recv()/ACE::send().
25 this->handle_
= handle
;
26 return this->create_shm_malloc (name
,
// Receives the next buffer for the reactive strategy.
//
// Performs an ACE::recv() on handle_ and, when exactly
// sizeof (ACE_OFF_T) bytes were received, resolves new_offset through
// get_buf_len(), which also fills in @a buf.
//
// @param buf      Out-parameter handed to get_buf_len().
// @param timeout  NOTE(review): presumably forwarded to ACE::recv() --
//                 confirm against the full argument list of that call.
// @return The value of get_buf_len() on the success path.  The values
//         returned on the early-exit paths should be confirmed against
//         the complete function body.
31 ACE_Reactive_MEM_IO::recv_buf (ACE_MEM_SAP_Node
*&buf
,
33 const ACE_Time_Value
*timeout
)
35 ACE_TRACE ("ACE_Reactive_MEM_IO::recv_buf");
// Precondition check: both the shared-memory allocator and a valid handle
// are required.
37 if (this->shm_malloc_
== 0 || this->handle_
== ACE_INVALID_HANDLE
)
40 ACE_OFF_T new_offset
= 0;
// NOTE(review): presumably the received bytes are written into new_offset,
// since retv is compared with sizeof (ACE_OFF_T) below -- confirm.
41 ssize_t retv
= ACE::recv (this->handle_
,
49 // ACELIB_DEBUG ((LM_INFO, "MEM_Stream closed\n"));
// Any byte count other than a full ACE_OFF_T is treated as a failure.
53 else if (retv
!= static_cast <ssize_t
> (sizeof (ACE_OFF_T
)))
55 // Nothing available or we are really screwed.
// A complete offset arrived: let get_buf_len() resolve it into buf.
60 return this->get_buf_len (new_offset
, buf
);
// Sends @a buf to the peer for the reactive strategy.
//
// The buffer itself is not transmitted: its byte offset from
// shm_malloc_->base_addr() is computed and that offset value is written to
// handle_ with ACE::send().
//
// @param buf      Node to hand over; its address is converted to an offset
//                 relative to the allocator's base address.
// @param timeout  Passed to ACE::send().
// @return buf->size() (truncated to ssize_t) when the whole offset was
//         sent.  If the send did not transfer sizeof (offset) bytes the
//         buffer is released; the value returned on that path should be
//         confirmed against the complete function body.
64 ACE_Reactive_MEM_IO::send_buf (ACE_MEM_SAP_Node
*buf
,
66 const ACE_Time_Value
*timeout
)
68 ACE_TRACE ("ACE_Reactive_MEM_IO::send_buf");
// Precondition check: both the shared-memory allocator and a valid handle
// are required.
70 if (this->shm_malloc_
== 0 || this->handle_
== ACE_INVALID_HANDLE
)
// Offset of buf measured in bytes from the allocator's base address,
// narrowed to ACE_OFF_T.
77 ACE_Utils::truncate_cast
<ACE_OFF_T
> (
78 reinterpret_cast<char *> (buf
)
79 - static_cast<char *> (this->shm_malloc_
->base_addr ()));
81 // Send the offset value over the socket.
82 if (ACE::send (this->handle_
,
83 (const char *) &offset
,
86 timeout
) != static_cast <ssize_t
> (sizeof (offset
)))
88 // unsuccessful send, release the memory in the shared-memory.
89 this->release_buffer (buf
);
94 return ACE_Utils::truncate_cast
<ssize_t
> (buf
->size ());
97 #if defined (ACE_WIN32) || !defined (_ACE_USE_SV_SEM)
// Appends @a new_node to the tail of the queue described by mq_.
//
// When the queue is empty (tail_.addr () == 0) the node becomes both head
// and tail; otherwise it is linked after the current tail and becomes the
// new tail.  No locking is done here; the caller must already hold the
// lock (see the comment below).
//
// @param new_node  Node to enqueue.
99 ACE_MT_MEM_IO::Simple_Queue::write (ACE_MEM_SAP_Node
*new_node
)
104 // Here, we assume we already have acquired the lock necessary.
105 // And we are allowed to write.
106 if (this->mq_
->tail_
.addr () == 0) // nothing in the queue.
// Empty queue: the new node is both the first and the last element.
108 this->mq_
->head_
= new_node
;
109 this->mq_
->tail_
= new_node
;
// Non-empty queue: chain the node behind the old tail, then advance tail_.
114 this->mq_
->tail_
->next_
= new_node
;
116 this->mq_
->tail_
= new_node
;
// Removes the node at the head of the queue described by mq_.
//
// retv is loaded from mq_->head_.  If head and tail are the same node the
// queue is emptied (both set to 0); otherwise head_ advances to
// retv->next_.  No locking is done here; the caller must already hold the
// lock and the queue must not be empty (see the comment below).
//
// NOTE(review): presumably retv is what the function returns -- confirm
// against the complete function body.
122 ACE_MT_MEM_IO::Simple_Queue::read ()
127 ACE_MEM_SAP_Node
*retv
= 0;
131 retv
= this->mq_
->head_
;
132 // Here, we assume we already have acquired the lock necessary
133 // and there is something in the queue.
134 if (this->mq_
->head_
== this->mq_
->tail_
)
136 // Last message in the queue.
137 this->mq_
->head_
= 0;
138 this->mq_
->tail_
= 0;
// More than one node queued: step head_ to the successor of the removed node.
141 this->mq_
->head_
= retv
->next_
;
// Structured-exception filter supplied by the allocator's memory pool.
143 ACE_SEH_EXCEPT (this->malloc_
->memory_pool ().seh_selector (GetExceptionInformation ()))
// Destructor: deletes the semaphore and lock objects of both the receive
// and the send channel (the objects init() creates with ACE_NEW_RETURN).
150 ACE_MT_MEM_IO::~ACE_MT_MEM_IO ()
152 delete this->recv_channel_
.sema_
;
153 delete this->recv_channel_
.lock_
;
154 delete this->send_channel_
.sema_
;
155 delete this->send_channel_
.lock_
;
// Initializes the multi-threaded delivery strategy.
//
// Steps visible in this function:
//  1. create_shm_malloc (name, options) sets up the shared-memory
//     allocator; a result of -1 is treated as failure.
//  2. Four object names are built from ACE::basename (name) plus the
//     suffixes "_sema_to_server", "_sema_to_client", "_lock_to_server" and
//     "_lock_to_client".
//  3. shm_malloc_->find ("to_server", ...) decides the role of this side:
//     - not found (-1): two MQ_Struct objects are allocated in shared
//       memory and bound as "to_server" (first) and "to_client" (second).
//       This side receives on the first queue with the server
//       semaphore/lock and sends on the second with the client
//       semaphore/lock.
//     - found: the existing pair is reused with the roles swapped, i.e.
//       receive on the second queue with the client semaphore/lock and
//       send on the first with the server semaphore/lock.
//
// @param handle   Unused (see ACE_UNUSED_ARG below).
// @param name     Name given to create_shm_malloc() and used, via its
//                 basename, as the prefix of the semaphore/lock names.
// @param options  Malloc options given to create_shm_malloc().
159 ACE_MT_MEM_IO::init (ACE_HANDLE handle
,
160 const ACE_TCHAR
*name
,
161 MALLOC_OPTIONS
*options
)
163 ACE_TRACE ("ACE_MT_MEM_IO::init");
164 ACE_UNUSED_ARG (handle
);
166 // @@ Give me a rule on naming and how the queue should
167 // be kept in the shared memory and we are done
169 if (this->create_shm_malloc (name
, options
) == -1)
172 ACE_TCHAR server_sema
[MAXPATHLEN
];
173 ACE_TCHAR client_sema
[MAXPATHLEN
];
174 ACE_TCHAR server_lock
[MAXPATHLEN
];
175 ACE_TCHAR client_lock
[MAXPATHLEN
];
176 const ACE_TCHAR
*basename
= ACE::basename (name
);
177 // size_t baselen = ACE_OS::strlen (basename);
// NOTE(review): the strcpy/strcat calls below are unbounded.  Each suffix
// is 15 characters long, so a basename longer than MAXPATHLEN - 16
// characters would overflow these buffers.
179 // Building names. @@ Check buffer overflow?
180 ACE_OS::strcpy (server_sema
, basename
);
181 ACE_OS::strcat (server_sema
, ACE_TEXT ("_sema_to_server"));
182 ACE_OS::strcpy (client_sema
, basename
);
183 ACE_OS::strcat (client_sema
, ACE_TEXT ("_sema_to_client"));
184 ACE_OS::strcpy (server_lock
, basename
);
185 ACE_OS::strcat (server_lock
, ACE_TEXT ("_lock_to_server"));
186 ACE_OS::strcpy (client_lock
, basename
);
187 ACE_OS::strcat (client_lock
, ACE_TEXT ("_lock_to_client"));
189 void *to_server_ptr
= 0;
190 // @@ Here, we assume the shared memory file will never be reused.
191 // So we can determine whether we are server or client by examining
192 // if the simple message queues have already been set up in
193 // the Malloc object or not.
194 if (this->shm_malloc_
->find ("to_server", to_server_ptr
) == -1)
// "to_server" is not bound yet: allocate room for both queue headers in
// one block of shared memory.
198 ACE_ALLOCATOR_RETURN (ptr
,
199 this->shm_malloc_
->malloc (2 * sizeof (MQ_Struct
)),
202 MQ_Struct
*mymq
= reinterpret_cast<MQ_Struct
*> (ptr
);
// Reset the second queue header (the one bound as "to_client" below).
205 (mymq
+ 1)->tail_
= 0;
206 (mymq
+ 1)->head_
= 0;
// Publish both headers by name so that the peer can find them.
207 if (this->shm_malloc_
->bind ("to_server", mymq
) == -1)
210 if (this->shm_malloc_
->bind ("to_client", mymq
+ 1) == -1)
// This side created the queues: receive on the first queue with the
// server-named semaphore and lock.
213 this->recv_channel_
.queue_
.init (mymq
, this->shm_malloc_
);
214 ACE_NEW_RETURN (this->recv_channel_
.sema_
,
215 ACE_SYNCH_PROCESS_SEMAPHORE (0, server_sema
),
217 ACE_NEW_RETURN (this->recv_channel_
.lock_
,
218 ACE_SYNCH_PROCESS_MUTEX (server_lock
),
// ... and send on the second queue with the client-named semaphore and lock.
221 this->send_channel_
.queue_
.init (mymq
+ 1, this->shm_malloc_
);
222 ACE_NEW_RETURN (this->send_channel_
.sema_
,
223 ACE_SYNCH_PROCESS_SEMAPHORE (0, client_sema
),
225 ACE_NEW_RETURN (this->send_channel_
.lock_
,
226 ACE_SYNCH_PROCESS_MUTEX (client_lock
),
// "to_server" was already bound: reuse the existing queue pair with the
// channel roles swapped relative to the branch above.
232 MQ_Struct
*mymq
= reinterpret_cast<MQ_Struct
*> (to_server_ptr
);
233 this->recv_channel_
.queue_
.init (mymq
+1, this->shm_malloc_
);
234 ACE_NEW_RETURN (this->recv_channel_
.sema_
,
235 ACE_SYNCH_PROCESS_SEMAPHORE (0, client_sema
),
237 ACE_NEW_RETURN (this->recv_channel_
.lock_
,
238 ACE_SYNCH_PROCESS_MUTEX (client_lock
),
241 this->send_channel_
.queue_
.init (mymq
, this->shm_malloc_
);
242 ACE_NEW_RETURN (this->send_channel_
.sema_
,
243 ACE_SYNCH_PROCESS_SEMAPHORE (0, server_sema
),
245 ACE_NEW_RETURN (this->send_channel_
.lock_
,
246 ACE_SYNCH_PROCESS_MUTEX (server_lock
),
// Shuts the strategy down: calls ACE_MEM_SAP::fini (), keeping its result
// in ret, and then unlinks the process mutexes of both channels by name.
//
// NOTE(review): recv_channel_.lock_ and send_channel_.lock_ are
// dereferenced without a check here; if init() did not create them this
// would dereference a null pointer -- confirm that callers only reach this
// after a successful init().
// NOTE(review): presumably ret is the value returned -- confirm.
253 ACE_MT_MEM_IO::fini ()
255 const int ret
= ACE_MEM_SAP::fini ();
256 ACE_Process_Mutex::unlink (this->recv_channel_
.lock_
->name ());
257 ACE_Process_Mutex::unlink (this->send_channel_
.lock_
->name ());
// Receives the next buffer for the multi-threaded strategy.
//
// Acquires the receive channel's semaphore, then takes the receive
// channel's lock and dequeues one node with queue_.read () into @a buf.
//
// @param buf      Receives the node returned by queue_.read ().
// @param timeout  Currently ignored (see ACE_UNUSED_ARG below); the
//                 semaphore acquire is called without a timeout.
// @return buf->size () truncated to ssize_t on the path shown; -1 if the
//         lock guard cannot be acquired.  The results of the other
//         failure paths should be confirmed against the complete function
//         body.
262 ACE_MT_MEM_IO::recv_buf (ACE_MEM_SAP_Node
*&buf
,
264 const ACE_Time_Value
*timeout
)
266 ACE_TRACE ("ACE_MT_MEM_IO::recv_buf");
268 // @@ Don't know how to handle timeout yet.
269 ACE_UNUSED_ARG (timeout
);
270 ACE_UNUSED_ARG (flags
);
// The shared-memory allocator must have been created by init().
272 if (this->shm_malloc_
== 0)
277 // Need to handle timeout here.
278 if (this->recv_channel_
.sema_
->acquire () == -1)
284 // @@ We can probably skip the lock in certain circumstance.
285 ACE_GUARD_RETURN (ACE_SYNCH_PROCESS_MUTEX
, ace_mon
, *this->recv_channel_
.lock_
, -1);
// Dequeue while the channel lock is held, as Simple_Queue::read () requires.
287 buf
= this->recv_channel_
.queue_
.read ();
291 return ACE_Utils::truncate_cast
<ssize_t
> (buf
->size ());
// Sends @a buf to the peer for the multi-threaded strategy.
//
// Takes the send channel's lock, enqueues @a buf with queue_.write () and
// then releases the send channel's semaphore.
//
// @param buf      Node to enqueue.  It is released with release_buffer ()
//                 when queue_.write () reports -1.
// @param timeout  Currently ignored (see ACE_UNUSED_ARG below).
// @return buf->size () truncated to ssize_t on the path shown; -1 if the
//         lock guard cannot be acquired.  The results of the other
//         failure paths should be confirmed against the complete function
//         body.
299 ACE_MT_MEM_IO::send_buf (ACE_MEM_SAP_Node
*buf
,
301 const ACE_Time_Value
*timeout
)
303 ACE_TRACE ("ACE_MT_MEM_IO::send_buf");
305 // @@ Don't know how to handle timeout yet.
306 ACE_UNUSED_ARG (timeout
);
307 ACE_UNUSED_ARG (flags
);
// The shared-memory allocator must have been created by init().
309 if (this->shm_malloc_
== 0)
315 // @@ We can probably skip the lock in certain circumstances.
316 ACE_GUARD_RETURN (ACE_SYNCH_PROCESS_MUTEX
, ace_mon
, *this->send_channel_
.lock_
, -1);
// Enqueue while the channel lock is held, as Simple_Queue::write () requires.
318 if (this->send_channel_
.queue_
.write (buf
) == -1)
320 this->release_buffer (buf
);
// Release the semaphore once the node is queued.
325 if (this->send_channel_
.sema_
->release () == -1)
330 return ACE_Utils::truncate_cast
<ssize_t
> (buf
->size ());
332 #endif /* ACE_WIN32 || !_ACE_USE_SV_SEM */
// Dump hook.  When ACE_HAS_DUMP is defined the only statement shown is an
// ACE_TRACE call; no object state is printed by the lines visible here.
335 ACE_MEM_IO::dump () const
337 #if defined (ACE_HAS_DUMP)
338 ACE_TRACE ("ACE_MEM_IO::dump");
339 #endif /* ACE_HAS_DUMP */
// Selects and initializes the delivery strategy of this ACE_MEM_IO.
//
// Any previously installed strategy is deleted first.  A new strategy
// object is then created (ACE_Reactive_MEM_IO for ACE_MEM_IO::Reactive; a
// second ACE_NEW_RETURN exists only when ACE_WIN32 is defined or
// _ACE_USE_SV_SEM is not) and its init () is called with this object's
// handle.
//
// @param name     NOTE(review): presumably forwarded to the strategy's
//                 init () -- confirm against the full argument list.
// @param type     Strategy selector; ACE_MEM_IO::Reactive is the case shown.
// @param options  NOTE(review): presumably forwarded to the strategy's
//                 init () -- confirm against the full argument list.
// @return The value returned by the strategy's init ().
343 ACE_MEM_IO::init (const ACE_TCHAR
*name
,
344 ACE_MEM_IO::Signal_Strategy type
,
345 ACE_MEM_SAP::MALLOC_OPTIONS
*options
)
347 ACE_UNUSED_ARG (type
);
// Drop any strategy left over from an earlier init () call.
349 delete this->deliver_strategy_
;
350 this->deliver_strategy_
= 0;
353 case ACE_MEM_IO::Reactive
:
354 ACE_NEW_RETURN (this->deliver_strategy_
,
355 ACE_Reactive_MEM_IO (),
// The alternative strategy is compiled in only on these configurations.
358 #if defined (ACE_WIN32) || !defined (_ACE_USE_SV_SEM)
360 ACE_NEW_RETURN (this->deliver_strategy_
,
364 #endif /* ACE_WIN32 || !_ACE_USE_SV_SEM */
// Delegate the remaining setup to the strategy that was just created.
369 return this->deliver_strategy_
->init (this->get_handle (),
// NOTE(review): presumably the body of ACE_MEM_IO::fini () -- confirm
// against the enclosing function's signature.
// Delegates to the delivery strategy's fini () when a strategy is installed.
377 if (this->deliver_strategy_
!= 0)
379 return this->deliver_strategy_
->fini ();
387 // Copies the payload of every ACE_Message_Block reachable from
388 // message_block into a single buffer acquired from the delivery strategy
389 // and hands that buffer to the strategy's send_buf ().  The copy loop
390 // follows cont () when it is non-null and next () otherwise.
//
// @param message_block  First block of the chain to send; total_length ()
//                       of this block determines the size of the buffer
//                       requested from acquire_buffer ().
// @param timeout        NOTE(review): presumably forwarded to send_buf ()
//                       -- confirm against the full argument list.
// @return -1 when no delivery strategy is installed; otherwise, on the
//         path shown, the value returned by the strategy's send_buf ().
393 ACE_MEM_IO::send (const ACE_Message_Block
*message_block
,
394 const ACE_Time_Value
*timeout
)
396 ACE_TRACE ("ACE_MEM_IO::send");
398 if (this->deliver_strategy_
== 0)
400 return -1; // Something went seriously wrong.
403 size_t len
= message_block
->total_length ();
// Ask the strategy for one buffer large enough for the whole chain.
// NOTE(review): no null check of the acquired buffer appears before the
// memcpy below -- confirm how an acquire_buffer () failure is handled.
407 ACE_MEM_SAP_Node
*buf
=
408 reinterpret_cast<ACE_MEM_SAP_Node
*> (
409 this->deliver_strategy_
->acquire_buffer (
410 ACE_Utils::truncate_cast
<ssize_t
> (len
)));
// Append each block's readable bytes at offset n, then advance n.
414 while (message_block
!= 0)
416 ACE_OS::memcpy (static_cast<char *> (buf
->data ()) + n
,
417 message_block
->rd_ptr (),
418 message_block
->length ());
419 n
+= message_block
->length ();
// Prefer the continuation chain; fall back to next () when it ends.
421 if (message_block
->cont ())
423 message_block
= message_block
->cont ();
427 message_block
= message_block
->next ();
433 return this->deliver_strategy_
->send_buf (buf
,
441 ACE_END_VERSIONED_NAMESPACE_DECL
443 #endif /* ACE_HAS_POSITION_INDEPENDENT_POINTERS == 1 */