2 #include "ace/MEM_IO.h"
3 #include "ace/Handle_Set.h"
5 #if (ACE_HAS_POSITION_INDEPENDENT_POINTERS == 1)
7 #if !defined (__ACE_INLINE__)
8 #include "ace/MEM_IO.inl"
9 #endif /* __ACE_INLINE__ */
13 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
15 ACE_ALLOC_HOOK_DEFINE(ACE_MEM_IO
)
17 ACE_Reactive_MEM_IO::~ACE_Reactive_MEM_IO (void)
22 ACE_Reactive_MEM_IO::init (ACE_HANDLE handle
,
23 const ACE_TCHAR
*name
,
24 MALLOC_OPTIONS
*options
)
26 ACE_TRACE ("ACE_Reactive_MEM_IO::init");
27 this->handle_
= handle
;
28 return this->create_shm_malloc (name
,
33 ACE_Reactive_MEM_IO::recv_buf (ACE_MEM_SAP_Node
*&buf
,
35 const ACE_Time_Value
*timeout
)
37 ACE_TRACE ("ACE_Reactive_MEM_IO::recv_buf");
39 if (this->shm_malloc_
== 0 || this->handle_
== ACE_INVALID_HANDLE
)
42 ACE_OFF_T new_offset
= 0;
43 ssize_t retv
= ACE::recv (this->handle_
,
51 // ACELIB_DEBUG ((LM_INFO, "MEM_Stream closed\n"));
55 else if (retv
!= static_cast <ssize_t
> (sizeof (ACE_OFF_T
)))
57 // Nothing available or we are really screwed.
62 return this->get_buf_len (new_offset
, buf
);
66 ACE_Reactive_MEM_IO::send_buf (ACE_MEM_SAP_Node
*buf
,
68 const ACE_Time_Value
*timeout
)
70 ACE_TRACE ("ACE_Reactive_MEM_IO::send_buf");
72 if (this->shm_malloc_
== 0 || this->handle_
== ACE_INVALID_HANDLE
)
79 ACE_Utils::truncate_cast
<ACE_OFF_T
> (
80 reinterpret_cast<char *> (buf
)
81 - static_cast<char *> (this->shm_malloc_
->base_addr ()));
83 // Send the offset value over the socket.
84 if (ACE::send (this->handle_
,
85 (const char *) &offset
,
88 timeout
) != static_cast <ssize_t
> (sizeof (offset
)))
90 // unsuccessful send, release the memory in the shared-memory.
91 this->release_buffer (buf
);
96 return ACE_Utils::truncate_cast
<ssize_t
> (buf
->size ());
99 #if defined (ACE_WIN32) || !defined (_ACE_USE_SV_SEM)
101 ACE_MT_MEM_IO::Simple_Queue::write (ACE_MEM_SAP_Node
*new_node
)
106 // Here, we assume we already have acquired the lock necessary.
107 // And we are allowed to write.
108 if (this->mq_
->tail_
.addr () == 0) // nothing in the queue.
110 this->mq_
->head_
= new_node
;
111 this->mq_
->tail_
= new_node
;
116 this->mq_
->tail_
->next_
= new_node
;
118 this->mq_
->tail_
= new_node
;
124 ACE_MT_MEM_IO::Simple_Queue::read ()
129 ACE_MEM_SAP_Node
*retv
= 0;
133 retv
= this->mq_
->head_
;
134 // Here, we assume we already have acquired the lock necessary
135 // and there are soemthing in the queue.
136 if (this->mq_
->head_
== this->mq_
->tail_
)
138 // Last message in the queue.
139 this->mq_
->head_
= 0;
140 this->mq_
->tail_
= 0;
143 this->mq_
->head_
= retv
->next_
;
145 ACE_SEH_EXCEPT (this->malloc_
->memory_pool ().seh_selector (GetExceptionInformation ()))
152 ACE_MT_MEM_IO::~ACE_MT_MEM_IO ()
154 delete this->recv_channel_
.sema_
;
155 delete this->recv_channel_
.lock_
;
156 delete this->send_channel_
.sema_
;
157 delete this->send_channel_
.lock_
;
161 ACE_MT_MEM_IO::init (ACE_HANDLE handle
,
162 const ACE_TCHAR
*name
,
163 MALLOC_OPTIONS
*options
)
165 ACE_TRACE ("ACE_MT_MEM_IO::init");
166 ACE_UNUSED_ARG (handle
);
168 // @@ Give me a rule on naming and how the queue should
169 // be kept in the shared memory and we are done
171 if (this->create_shm_malloc (name
, options
) == -1)
174 ACE_TCHAR server_sema
[MAXPATHLEN
];
175 ACE_TCHAR client_sema
[MAXPATHLEN
];
176 ACE_TCHAR server_lock
[MAXPATHLEN
];
177 ACE_TCHAR client_lock
[MAXPATHLEN
];
178 const ACE_TCHAR
*basename
= ACE::basename (name
);
179 // size_t baselen = ACE_OS::strlen (basename);
181 // Building names. @@ Check buffer overflow?
182 ACE_OS::strcpy (server_sema
, basename
);
183 ACE_OS::strcat (server_sema
, ACE_TEXT ("_sema_to_server"));
184 ACE_OS::strcpy (client_sema
, basename
);
185 ACE_OS::strcat (client_sema
, ACE_TEXT ("_sema_to_client"));
186 ACE_OS::strcpy (server_lock
, basename
);
187 ACE_OS::strcat (server_lock
, ACE_TEXT ("_lock_to_server"));
188 ACE_OS::strcpy (client_lock
, basename
);
189 ACE_OS::strcat (client_lock
, ACE_TEXT ("_lock_to_client"));
191 void *to_server_ptr
= 0;
192 // @@ Here, we assume the shared memory fill will never be resued.
193 // So we can determine whether we are server or client by examining
194 // if the simple message queues have already been set up in
195 // the Malloc object or not.
196 if (this->shm_malloc_
->find ("to_server", to_server_ptr
) == -1)
200 ACE_ALLOCATOR_RETURN (ptr
,
201 this->shm_malloc_
->malloc (2 * sizeof (MQ_Struct
)),
204 MQ_Struct
*mymq
= reinterpret_cast<MQ_Struct
*> (ptr
);
207 (mymq
+ 1)->tail_
= 0;
208 (mymq
+ 1)->head_
= 0;
209 if (this->shm_malloc_
->bind ("to_server", mymq
) == -1)
212 if (this->shm_malloc_
->bind ("to_client", mymq
+ 1) == -1)
215 this->recv_channel_
.queue_
.init (mymq
, this->shm_malloc_
);
216 ACE_NEW_RETURN (this->recv_channel_
.sema_
,
217 ACE_SYNCH_PROCESS_SEMAPHORE (0, server_sema
),
219 ACE_NEW_RETURN (this->recv_channel_
.lock_
,
220 ACE_SYNCH_PROCESS_MUTEX (server_lock
),
223 this->send_channel_
.queue_
.init (mymq
+ 1, this->shm_malloc_
);
224 ACE_NEW_RETURN (this->send_channel_
.sema_
,
225 ACE_SYNCH_PROCESS_SEMAPHORE (0, client_sema
),
227 ACE_NEW_RETURN (this->send_channel_
.lock_
,
228 ACE_SYNCH_PROCESS_MUTEX (client_lock
),
234 MQ_Struct
*mymq
= reinterpret_cast<MQ_Struct
*> (to_server_ptr
);
235 this->recv_channel_
.queue_
.init (mymq
+1, this->shm_malloc_
);
236 ACE_NEW_RETURN (this->recv_channel_
.sema_
,
237 ACE_SYNCH_PROCESS_SEMAPHORE (0, client_sema
),
239 ACE_NEW_RETURN (this->recv_channel_
.lock_
,
240 ACE_SYNCH_PROCESS_MUTEX (client_lock
),
243 this->send_channel_
.queue_
.init (mymq
, this->shm_malloc_
);
244 ACE_NEW_RETURN (this->send_channel_
.sema_
,
245 ACE_SYNCH_PROCESS_SEMAPHORE (0, server_sema
),
247 ACE_NEW_RETURN (this->send_channel_
.lock_
,
248 ACE_SYNCH_PROCESS_MUTEX (server_lock
),
255 ACE_MT_MEM_IO::fini ()
257 const int ret
= ACE_MEM_SAP::fini ();
258 ACE_Process_Mutex::unlink (this->recv_channel_
.lock_
->name ());
259 ACE_Process_Mutex::unlink (this->send_channel_
.lock_
->name ());
264 ACE_MT_MEM_IO::recv_buf (ACE_MEM_SAP_Node
*&buf
,
266 const ACE_Time_Value
*timeout
)
268 ACE_TRACE ("ACE_MT_MEM_IO::recv_buf");
270 // @@ Don't know how to handle timeout yet.
271 ACE_UNUSED_ARG (timeout
);
272 ACE_UNUSED_ARG (flags
);
274 if (this->shm_malloc_
== 0)
279 // Need to handle timeout here.
280 if (this->recv_channel_
.sema_
->acquire () == -1)
286 // @@ We can probably skip the lock in certain circumstance.
287 ACE_GUARD_RETURN (ACE_SYNCH_PROCESS_MUTEX
, ace_mon
, *this->recv_channel_
.lock_
, -1);
289 buf
= this->recv_channel_
.queue_
.read ();
293 return ACE_Utils::truncate_cast
<ssize_t
> (buf
->size ());
301 ACE_MT_MEM_IO::send_buf (ACE_MEM_SAP_Node
*buf
,
303 const ACE_Time_Value
*timeout
)
305 ACE_TRACE ("ACE_MT_MEM_IO::send_buf");
307 // @@ Don't know how to handle timeout yet.
308 ACE_UNUSED_ARG (timeout
);
309 ACE_UNUSED_ARG (flags
);
311 if (this->shm_malloc_
== 0)
317 // @@ We can probably skip the lock in certain curcumstances.
318 ACE_GUARD_RETURN (ACE_SYNCH_PROCESS_MUTEX
, ace_mon
, *this->send_channel_
.lock_
, -1);
320 if (this->send_channel_
.queue_
.write (buf
) == -1)
322 this->release_buffer (buf
);
327 if (this->send_channel_
.sema_
->release () == -1)
332 return ACE_Utils::truncate_cast
<ssize_t
> (buf
->size ());
334 #endif /* ACE_WIN32 || !_ACE_USE_SV_SEM */
337 ACE_MEM_IO::dump (void) const
339 #if defined (ACE_HAS_DUMP)
340 ACE_TRACE ("ACE_MEM_IO::dump");
341 #endif /* ACE_HAS_DUMP */
345 ACE_MEM_IO::init (const ACE_TCHAR
*name
,
346 ACE_MEM_IO::Signal_Strategy type
,
347 ACE_MEM_SAP::MALLOC_OPTIONS
*options
)
349 ACE_UNUSED_ARG (type
);
351 delete this->deliver_strategy_
;
352 this->deliver_strategy_
= 0;
355 case ACE_MEM_IO::Reactive
:
356 ACE_NEW_RETURN (this->deliver_strategy_
,
357 ACE_Reactive_MEM_IO (),
360 #if defined (ACE_WIN32) || !defined (_ACE_USE_SV_SEM)
362 ACE_NEW_RETURN (this->deliver_strategy_
,
366 #endif /* ACE_WIN32 || !_ACE_USE_SV_SEM */
371 return this->deliver_strategy_
->init (this->get_handle (),
377 ACE_MEM_IO::fini (void)
379 if (this->deliver_strategy_
!= 0)
381 return this->deliver_strategy_
->fini ();
389 // Allows a client to read from a socket without having to provide
390 // a buffer to read. This method determines how much data is in the
391 // socket, allocates a buffer of this size, reads in the data, and
392 // returns the number of bytes read.
395 ACE_MEM_IO::send (const ACE_Message_Block
*message_block
,
396 const ACE_Time_Value
*timeout
)
398 ACE_TRACE ("ACE_MEM_IO::send");
400 if (this->deliver_strategy_
== 0)
402 return -1; // Something went seriously wrong.
405 size_t len
= message_block
->total_length ();
409 ACE_MEM_SAP_Node
*buf
=
410 reinterpret_cast<ACE_MEM_SAP_Node
*> (
411 this->deliver_strategy_
->acquire_buffer (
412 ACE_Utils::truncate_cast
<ssize_t
> (len
)));
416 while (message_block
!= 0)
418 ACE_OS::memcpy (static_cast<char *> (buf
->data ()) + n
,
419 message_block
->rd_ptr (),
420 message_block
->length ());
421 n
+= message_block
->length ();
423 if (message_block
->cont ())
425 message_block
= message_block
->cont ();
429 message_block
= message_block
->next ();
435 return this->deliver_strategy_
->send_buf (buf
,
443 ACE_END_VERSIONED_NAMESPACE_DECL
445 #endif /* ACE_HAS_POSITION_INDEPENDENT_POINTERS == 1 */