4 #include "ace/Stream.h"
6 #if !defined (ACE_LACKS_PRAGMA_ONCE)
8 #endif /* ACE_LACKS_PRAGMA_ONCE */
10 #include "ace/Stream_Modules.h"
11 #include "ace/OS_NS_string.h"
13 #if !defined (__ACE_INLINE__)
14 #include "ace/Stream.inl"
15 #endif /* __ACE_INLINE__ */
17 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
// Invokes the allocation-hook definition macro for ACE_Stream; the macro
// itself is defined outside this file.
19 ACE_ALLOC_HOOK_DEFINE_Tyc(ACE_Stream
)
21 // Give some idea of what the heck is going on in a stream!
// dump(): when ACE_HAS_DUMP is defined, logs three listings: the module
// chain starting at stream_head_, the writer-side task chain starting at the
// head's writer, and the reader-side task chain starting at the tail's reader.
// NOTE(review): several original lines (braces, loop steps, loop exits) are
// not present in this listing; comments cover only the visible statements.
23 template <ACE_SYNCH_DECL
, class TIME_POLICY
> void
24 ACE_Stream
<ACE_SYNCH_USE
, TIME_POLICY
>::dump () const
26 #if defined (ACE_HAS_DUMP)
27 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::dump");
28 ACELIB_DEBUG ((LM_DEBUG
, ACE_TEXT ("-------- module links --------\n")));
// Walk the modules beginning at the stream head, logging each name.
30 for (ACE_Module
<ACE_SYNCH_USE
, TIME_POLICY
> *mp
= this->stream_head_
;
34 ACELIB_DEBUG ((LM_DEBUG
, ACE_TEXT ("module name = %s\n"), mp
->name ()));
// Reaching stream_tail_ is presumably where this walk stops -- the
// statement guarded by this test is not shown; confirm.
35 if (mp
== this->stream_tail_
)
39 ACELIB_DEBUG ((LM_DEBUG
, ACE_TEXT ("-------- writer links --------\n")));
41 ACE_Task
<ACE_SYNCH_USE
, TIME_POLICY
> *tp
;
// Writer side: start from the head module's writer task.
43 for (tp
= this->stream_head_
->writer ();
47 ACELIB_DEBUG ((LM_DEBUG
, ACE_TEXT ("writer queue name = %s\n"), tp
->name ()));
49 ACELIB_DEBUG ((LM_DEBUG
, ACE_TEXT ("-------\n")));
// End test compares tp with the tail's writer and with the head reader of
// the linked stream (linked_us_); part of the condition is on a line that
// is not shown.
50 if (tp
== this->stream_tail_
->writer ()
52 && tp
== this->linked_us_
->stream_head_
->reader ()))
56 ACELIB_DEBUG ((LM_DEBUG
, ACE_TEXT ("-------- reader links --------\n")));
// Reader side: start from the tail module's reader task and follow next().
57 for (tp
= this->stream_tail_
->reader (); ; tp
= tp
->next ())
59 ACELIB_DEBUG ((LM_DEBUG
, ACE_TEXT ("reader queue name = %s\n"), tp
->name ()));
61 ACELIB_DEBUG ((LM_DEBUG
, ACE_TEXT ("-------\n")));
// End test mirrors the writer walk: the head's reader, or the head writer
// of the linked stream; part of the condition is on a line not shown.
62 if (tp
== this->stream_head_
->reader ()
64 && tp
== this->linked_us_
->stream_head_
->writer ()))
67 #endif /* ACE_HAS_DUMP */
// push(): adds new_top to the stream by calling push_module() with
// stream_head_->next() as the current top and stream_head_ as the head.
// The push_module() result is compared against -1; the statements that
// follow that test are not visible in this listing.
70 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
71 ACE_Stream
<ACE_SYNCH_USE
, TIME_POLICY
>::push (ACE_Module
<ACE_SYNCH_USE
, TIME_POLICY
> *new_top
)
73 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::push");
74 if (this->push_module (new_top
,
75 this->stream_head_
->next (),
76 this->stream_head_
) == -1)
// put(): hands message block mb (with timeout pointer tv) to put() on the
// stream head's writer task and returns that call's result unchanged.
82 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
83 ACE_Stream
<ACE_SYNCH_USE
, TIME_POLICY
>::put (ACE_Message_Block
*mb
, ACE_Time_Value
*tv
)
85 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::put");
86 return this->stream_head_
->writer ()->put (mb
, tv
);
// get(): calls getq() on the stream head's reader task, passing the mb
// reference and timeout pointer tv, and returns that call's result unchanged.
89 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
90 ACE_Stream
<ACE_SYNCH_USE
, TIME_POLICY
>::get (ACE_Message_Block
*&mb
, ACE_Time_Value
*tv
)
92 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::get");
93 return this->stream_head_
->reader ()->getq (mb
, tv
);
96 // Return the "top" ACE_Module in a ACE_Stream, skipping over the stream head.
// The module is delivered through the out-parameter m.
99 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
100 ACE_Stream
<ACE_SYNCH_USE
, TIME_POLICY
>::top (ACE_Module
<ACE_SYNCH_USE
, TIME_POLICY
> *&m
)
102 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::top");
// Head directly followed by tail means there is no module in between; the
// statement taken in that case is not shown in this listing.
103 if (this->stream_head_
->next () == this->stream_tail_
)
// The top module is the one immediately after the stream head.
107 m
= this->stream_head_
->next ();
// insert(): places mod directly after the module whose name equals
// prev_name. The module chain is scanned from stream_head_; on a name match
// mod is linked in between the matched module and its successor, and mod's
// reader and writer tasks are opened with mod->arg().
// NOTE(review): the loop condition, the stream_tail_ guard and the return
// statements are on lines not present in this listing.
112 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
113 ACE_Stream
<ACE_SYNCH_USE
, TIME_POLICY
>::insert (const ACE_TCHAR
*prev_name
,
114 ACE_Module
<ACE_SYNCH_USE
, TIME_POLICY
> *mod
)
116 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::insert");
118 for (ACE_Module
<ACE_SYNCH_USE
, TIME_POLICY
> *prev_mod
= this->stream_head_
;
120 prev_mod
= prev_mod
->next ())
121 if (ACE_OS::strcmp (prev_mod
->name (), prev_name
) == 0)
// Remember the successor before relinking.
123 ACE_Module
<ACE_SYNCH_USE
, TIME_POLICY
> *next_mod
= prev_mod
->next ();
125 // We can't insert a module below <stream_tail_>.
// Splice: mod -> next_mod first, then prev_mod -> mod.
129 mod
->link (next_mod
);
130 prev_mod
->link (mod
);
// Open both tasks of the new module; -1 from either is tested (the
// resulting action is not shown).
132 if (mod
->reader ()->open (mod
->arg ()) == -1)
135 if (mod
->writer ()->open (mod
->arg ()) == -1)
// replace(): swaps the module named replace_name for mod. The chain is
// scanned from stream_head_ with prev_mod tracking the predecessor; on a
// match mod is linked in place of the matched module (becoming the new
// stream_tail_ or stream_head_ where the else-branches apply), mod's tasks
// are opened, and the replaced module is closed when flags is not
// M_DELETE_NONE.
// NOTE(review): the declaration of `flags`, the loop condition, the `if`
// lines paired with the two `else` branches, and the return statements are
// on lines not present in this listing.
144 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
145 ACE_Stream
<ACE_SYNCH_USE
, TIME_POLICY
>::replace (const ACE_TCHAR
*replace_name
,
146 ACE_Module
<ACE_SYNCH_USE
, TIME_POLICY
> *mod
,
149 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::replace");
150 ACE_Module
<ACE_SYNCH_USE
, TIME_POLICY
> *prev_mod
= 0;
152 for (ACE_Module
<ACE_SYNCH_USE
, TIME_POLICY
> *rep_mod
= this->stream_head_
;
154 rep_mod
= rep_mod
->next ())
155 if (ACE_OS::strcmp (rep_mod
->name (), replace_name
) == 0)
157 ACE_Module
<ACE_SYNCH_USE
, TIME_POLICY
> *next_mod
= rep_mod
->next ();
// Attach the replacement to whatever followed the replaced module.
160 mod
->link (next_mod
);
161 else // In case the <next_mod> is <stream_tail_>.
163 mod
->writer ()->next (0);
165 this->stream_tail_
= mod
;
// Attach the predecessor to the replacement.
169 prev_mod
->link (mod
);
170 else // In case the <rep_mod> is <stream_head_>.
172 mod
->reader ()->next (0);
173 this->stream_head_
= mod
;
// Open both tasks of the replacement; -1 from either is tested.
176 if (mod
->reader ()->open (mod
->arg ()) == -1)
179 if (mod
->writer ()->open (mod
->arg ()) == -1)
// Only close the replaced module when the caller asked for cleanup.
182 if (flags
!= ACE_Module
<ACE_SYNCH_USE
, TIME_POLICY
>::M_DELETE_NONE
)
184 rep_mod
->close (flags
);
196 // Remove the "top" ACE_Module in a ACE_Stream, skipping over the stream head.
// The removed module is closed with `flags`; afterwards the head's writer
// and the new top's reader are re-pointed at each other.
// NOTE(review): the statements guarded by the two `if` tests below and the
// return statements are on lines not present in this listing.
199 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
200 ACE_Stream
<ACE_SYNCH_USE
, TIME_POLICY
>::pop (int flags
)
202 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::pop");
// Head directly followed by tail: nothing between them to pop.
203 if (this->stream_head_
->next () == this->stream_tail_
)
207 // Skip over the ACE_Stream head.
208 ACE_Module
<ACE_SYNCH_USE
, TIME_POLICY
> *top_mod
= this->stream_head_
->next ();
209 ACE_Module
<ACE_SYNCH_USE
, TIME_POLICY
> *new_top
= top_mod
->next ();
// Detach top_mod from the module chain.
211 this->stream_head_
->next (new_top
);
213 // Close the top ACE_Module.
215 top_mod
->close (flags
);
217 // Don't delete the Module unless the flags request this.
218 if (flags
!= ACE_Module
<ACE_SYNCH_USE
, TIME_POLICY
>::M_DELETE_NONE
)
// Rewire the task links: head writer -> new top writer, and
// new top reader -> head reader.
221 this->stream_head_
->writer ()->next (new_top
->writer ());
222 new_top
->reader ()->next (this->stream_head_
->reader ());
227 // Remove a named ACE_Module from an arbitrary place in the ACE_Stream.
// The chain is scanned from stream_head_ with `prev` tracking the
// predecessor; on a name match the predecessor (or stream_head_ when prev
// is 0) is linked to the matched module's successor. A warning is logged
// when no module with the given name is found.
// NOTE(review): the declaration of `flags`, the loop condition/step, the
// close/delete statements and the returns are on lines not present in this
// listing.
230 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
231 ACE_Stream
<ACE_SYNCH_USE
, TIME_POLICY
>::remove (const ACE_TCHAR
*name
,
234 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::remove");
235 ACE_Module
<ACE_SYNCH_USE
, TIME_POLICY
> *prev
= 0;
237 for (ACE_Module
<ACE_SYNCH_USE
, TIME_POLICY
> *mod
= this->stream_head_
;
244 ACELIB_DEBUG ((LM_DEBUG
,
245 ACE_TEXT ("ACE_Stream::remove - comparing existing module :%s: with :%s:\n"),
251 if (ACE_OS::strcmp (mod
->name (), name
) == 0)
253 if (prev
== 0) // Deleting ACE_Stream Head
254 this->stream_head_
->link (mod
->next ());
// Otherwise bypass `mod` by linking its predecessor to its successor.
256 prev
->link (mod
->next ());
258 // Close down the module.
261 // Don't delete the Module unless the flags request this.
262 if (flags
!= ACE_Module
<ACE_SYNCH_USE
, TIME_POLICY
>::M_DELETE_NONE
)
264 // Release the memory.
274 ACELIB_DEBUG ((LM_WARNING
, ACE_TEXT ("ACE_Stream::remove failed to find module with name %s to remove\n"),name
));
// find(): scans the module chain starting at stream_head_ for a module whose
// name() compares equal to `name` via ACE_OS::strcmp. The function returns a
// module pointer; the loop condition/step and the return statements are on
// lines not present in this listing.
278 template <ACE_SYNCH_DECL
, class TIME_POLICY
> ACE_Module
<ACE_SYNCH_USE
, TIME_POLICY
> *
279 ACE_Stream
<ACE_SYNCH_USE
, TIME_POLICY
>::find (const ACE_TCHAR
*name
)
281 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::find");
282 for (ACE_Module
<ACE_SYNCH_USE
, TIME_POLICY
> *mod
= this->stream_head_
;
285 if (ACE_OS::strcmp (mod
->name (), name
) == 0)
291 // Actually push a module onto the stack...
// push_module(): wires new_top in above current_top. The current top's
// reader is pointed at the new reader, the new writer at the current top's
// writer, `head` is linked to new_top, new_top's next module is set to
// current_top, and finally new_top's reader and writer tasks are opened with
// new_top->arg().
// NOTE(review): the conditionals that guard the current_top and head steps,
// and the return statements, are on lines not present in this listing.
293 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
294 ACE_Stream
<ACE_SYNCH_USE
, TIME_POLICY
>::push_module (ACE_Module
<ACE_SYNCH_USE
, TIME_POLICY
> *new_top
,
295 ACE_Module
<ACE_SYNCH_USE
, TIME_POLICY
> *current_top
,
296 ACE_Module
<ACE_SYNCH_USE
, TIME_POLICY
> *head
)
298 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::push_module");
// nt_* are the new top's tasks; ct_* will hold the current top's tasks.
299 ACE_Task
<ACE_SYNCH_USE
, TIME_POLICY
> *nt_reader
= new_top
->reader ();
300 ACE_Task
<ACE_SYNCH_USE
, TIME_POLICY
> *nt_writer
= new_top
->writer ();
301 ACE_Task
<ACE_SYNCH_USE
, TIME_POLICY
> *ct_reader
= 0;
302 ACE_Task
<ACE_SYNCH_USE
, TIME_POLICY
> *ct_writer
= 0;
306 ct_reader
= current_top
->reader ();
307 ct_writer
= current_top
->writer ();
// Reader side flows upward: current top's reader -> new top's reader.
308 ct_reader
->next (nt_reader
);
// Writer side flows downward: new top's writer -> current top's writer.
311 nt_writer
->next (ct_writer
);
316 head
->link (new_top
);
321 new_top
->next (current_top
);
// Open both tasks of the new module; -1 from either is tested.
323 if (nt_reader
->open (new_top
->arg ()) == -1)
326 if (nt_writer
->open (new_top
->arg ()) == -1)
// open(): initializes the stream with a head and a tail module while holding
// lock_ (ACE_GUARD_RETURN is given -1 as its failure value). Tasks h1/h2 and
// t1/t2 plus modules named "ACE_Stream_Head" / "ACE_Stream_Tail" are
// allocated with ACE_NEW_NORETURN, the results are checked, head and tail
// are stored in stream_head_ / stream_tail_, and both are pushed with
// push_module() (tail first).
// NOTE(review): the conditions selecting default head/tail creation, the
// constructor arguments of the allocations, the failure cleanup and the
// return statements are on lines not present in this listing.
331 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
332 ACE_Stream
<ACE_SYNCH_USE
, TIME_POLICY
>::open (void *a
,
333 ACE_Module
<ACE_SYNCH_USE
, TIME_POLICY
> *head
,
334 ACE_Module
<ACE_SYNCH_USE
, TIME_POLICY
> *tail
)
336 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::open");
337 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T
, ace_mon
, this->lock_
, -1);
// h1/h2: tasks for a default head module; t1/t2: tasks for a default tail.
339 ACE_Task
<ACE_SYNCH_USE
, TIME_POLICY
> *h1
= 0, *h2
= 0;
340 ACE_Task
<ACE_SYNCH_USE
, TIME_POLICY
> *t1
= 0, *t2
= 0;
344 typedef ACE_Stream_Head
<ACE_SYNCH_USE
, TIME_POLICY
> STREAM_HEAD_TYPE
;
345 ACE_NEW_NORETURN (h1
,
347 ACE_NEW_NORETURN (h2
,
349 typedef ACE_Module
<ACE_SYNCH_USE
, TIME_POLICY
> MODULE_TYPE
;
350 ACE_NEW_NORETURN (head
,
351 MODULE_TYPE (ACE_TEXT ("ACE_Stream_Head"),
359 typedef ACE_Stream_Tail
<ACE_SYNCH_USE
, TIME_POLICY
> STREAM_TAIL_TYPE
;
360 ACE_NEW_NORETURN (t1
,
362 ACE_NEW_NORETURN (t2
,
364 typedef ACE_Module
<ACE_SYNCH_USE
, TIME_POLICY
> MODULE_TYPE
;
365 ACE_NEW_NORETURN (tail
,
366 MODULE_TYPE (ACE_TEXT ("ACE_Stream_Tail"),
372 // Make sure *all* the allocation succeeded!
// True when head is null and h1 or h2 is null, or when tail is null and
// t1 or t2 is null.
373 if ((head
== 0 && (h1
== 0 || h2
== 0))
374 || (tail
== 0 && (t1
== 0 || t2
== 0)))
386 this->stream_head_
= head
;
387 this->stream_tail_
= tail
;
// Push the tail first, then the head; -1 from either push is tested.
389 if (this->push_module (this->stream_tail_
) == -1)
391 else if (this->push_module (this->stream_head_
,
393 this->stream_head_
) == -1)
// close(): tears the stream down while holding lock_ (ACE_GUARD_RETURN is
// given -1 as its failure value). When both stream_head_ and stream_tail_
// are non-null it pops every module between them with `flags`, closes the
// head and tail modules, deletes both, resets the pointers to 0 and
// broadcasts on final_close_.
// NOTE(review): the statements taken when pop()/close() yield -1, and the
// return statements, are on lines not present in this listing.
399 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
400 ACE_Stream
<ACE_SYNCH_USE
, TIME_POLICY
>::close (int flags
)
402 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::close");
403 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T
, ace_mon
, this->lock_
, -1);
405 if (this->stream_head_
!= 0
406 && this->stream_tail_
!= 0)
408 // Don't bother checking return value here.
413 // Remove and cleanup all the intermediate modules.
415 while (this->stream_head_
->next () != this->stream_tail_
)
416 if (this->pop (flags
) == -1)
419 // Clean up the head and tail of the stream.
420 if (this->stream_head_
->close (flags
) == -1)
422 if (this->stream_tail_
->close (flags
) == -1)
425 // Cleanup the memory.
426 delete this->stream_head_
;
427 delete this->stream_tail_
;
// Null the pointers so later checks see the stream as closed.
429 this->stream_head_
= 0;
430 this->stream_tail_
= 0;
432 // Tell all threads waiting on the close that we are done.
433 this->final_close_
.broadcast ();
// control(): sends a control command through the stream. An ACE_IO_Cntl_Msg
// is built for `cmd`, message blocks of type MB_IOCTL are created for the
// data (db) and the control message (cb), cb is put() to the head's writer
// task, the reply is fetched with getq() from the head's reader task, and
// `result` is taken from the reply's rval().
// NOTE(review): the second parameter, the declaration of `result`, the
// remaining allocation arguments, the release calls and the return statement
// are on lines not present in this listing.
439 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
440 ACE_Stream
<ACE_SYNCH_USE
, TIME_POLICY
>::control (ACE_IO_Cntl_Msg::ACE_IO_Cntl_Cmds cmd
,
443 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::control");
444 ACE_IO_Cntl_Msg
ioc (cmd
);
446 ACE_Message_Block
*db
= 0;
448 // Try to create a data block that contains the user-supplied data.
450 ACE_Message_Block (sizeof (int),
451 ACE_Message_Block::MB_IOCTL
,
455 // Try to create a control block <cb> that contains the control
456 // field and a pointer to the data block <db> in <cb>'s continuation
458 ACE_Message_Block
*cb
= 0;
460 ACE_NEW_NORETURN (cb
,
461 ACE_Message_Block (sizeof ioc
,
462 ACE_Message_Block::MB_IOCTL
,
465 // @@ Michael: The old semantic assumed that cb returns == 0
466 // if no memory was available. We will now return immediately
467 // without release (errno is set to ENOMEM by the macro).
469 // If we can't allocate <cb> then we need to delete db and return
// Send the control block down the stream, then wait for the reply on the
// head's reader queue; -1 from either call is tested.
480 if (this->stream_head_
->writer ()->put (cb
) == -1)
482 else if (this->stream_head_
->reader ()->getq (cb
) == -1)
// The reply's read pointer is interpreted as an ACE_IO_Cntl_Msg.
485 result
= ((ACE_IO_Cntl_Msg
*) cb
->rd_ptr ())->rval ();
487 // This will also release db if its reference count == 0.
493 // Link two streams together at their bottom-most Modules (i.e., the
494 // one just above the Stream tail). Note that all of this is premised
495 // on the fact that the Stream head and Stream tail are non-NULL...
496 // This must be called with locks held.
// Records the peer in linked_us_ on both streams, locates the module just
// above each stream's tail, and cross-connects them: each side's writer is
// pointed at the other side's reader.
// NOTE(review): the return statement is on a line not present in this
// listing.
498 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
499 ACE_Stream
<ACE_SYNCH_USE
, TIME_POLICY
>::link_i (ACE_Stream
<ACE_SYNCH_USE
, TIME_POLICY
> &us
)
501 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::link_i");
502 this->linked_us_
= &us
;
503 // Make sure the other side is also linked to us!
504 us
.linked_us_
= this;
506 ACE_Module
<ACE_SYNCH_USE
, TIME_POLICY
> *my_tail
= this->stream_head_
;
511 // Locate the module just above our Stream tail.
512 while (my_tail
->next () != this->stream_tail_
)
513 my_tail
= my_tail
->next ();
515 ACE_Module
<ACE_SYNCH_USE
, TIME_POLICY
> *other_tail
= us
.stream_head_
;
520 // Locate the module just above the other Stream's tail.
521 while (other_tail
->next () != us
.stream_tail_
)
522 other_tail
= other_tail
->next ();
524 // Reattach the pointers so that the two streams are linked!
525 my_tail
->writer ()->next (other_tail
->reader ());
526 other_tail
->writer ()->next (my_tail
->reader ());
// link(): locked wrapper around link_i(). Takes lock_ via ACE_GUARD_RETURN
// (which is given -1 as its failure value) and returns the result of
// link_i (us).
530 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
531 ACE_Stream
<ACE_SYNCH_USE
, TIME_POLICY
>::link (ACE_Stream
<ACE_SYNCH_USE
, TIME_POLICY
> &us
)
533 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::link");
535 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T
, ace_mon
, this->lock_
, -1);
537 return this->link_i (us
);
540 // Must be called with locks held...
// unlink_i(): undoes link_i(). When linked_us_ is set, the module just above
// each stream's tail gets its writer pointed back at its own stream tail's
// writer, and linked_us_ is cleared on both streams.
// NOTE(review): the null checks announced by the "Only relink..." and "Only
// fiddle..." comments, and the return statements, are on lines not present
// in this listing.
542 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
543 ACE_Stream
<ACE_SYNCH_USE
, TIME_POLICY
>::unlink_i ()
545 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::unlink_i");
547 // Only try to unlink if we are in fact still linked!
549 if (this->linked_us_
!= 0)
551 ACE_Module
<ACE_SYNCH_USE
, TIME_POLICY
> *my_tail
= this->stream_head_
;
553 // Only relink if we still exist!
556 // Find the module that's just before our stream tail.
557 while (my_tail
->next () != this->stream_tail_
)
558 my_tail
= my_tail
->next ();
560 // Restore the writer's next() link to our tail.
561 my_tail
->writer ()->next (this->stream_tail_
->writer ());
564 ACE_Module
<ACE_SYNCH_USE
, TIME_POLICY
> *other_tail
=
565 this->linked_us_
->stream_head_
;
567 // Only fiddle with the other side if it in fact still remains.
// Same walk and restore as above, applied to the linked stream.
570 while (other_tail
->next () != this->linked_us_
->stream_tail_
)
571 other_tail
= other_tail
->next ();
573 other_tail
->writer ()->next (this->linked_us_
->stream_tail_
->writer ());
576 // Make sure the other side is also aware that it's been unlinked!
577 this->linked_us_
->linked_us_
= 0;
579 this->linked_us_
= 0;
// unlink(): locked wrapper around unlink_i(). Takes lock_ via
// ACE_GUARD_RETURN (which is given -1 as its failure value) and returns the
// result of unlink_i ().
586 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
587 ACE_Stream
<ACE_SYNCH_USE
, TIME_POLICY
>::unlink ()
589 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::unlink");
590 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T
, ace_mon
, this->lock_
, -1);
591 return this->unlink_i ();
// Constructor: forwards a, head and tail to open() and logs an error when
// open() yields -1. When ACE_HAS_THREADS is defined, final_close_ is
// initialized from lock_ and cond_attr_.
// NOTE(review): the rest of the member-initializer list is on lines not
// present in this listing.
594 template <ACE_SYNCH_DECL
, class TIME_POLICY
>
595 ACE_Stream
<ACE_SYNCH_USE
, TIME_POLICY
>::ACE_Stream (void * a
,
596 ACE_Module
<ACE_SYNCH_USE
, TIME_POLICY
> *head
,
597 ACE_Module
<ACE_SYNCH_USE
, TIME_POLICY
> *tail
)
601 #if defined (ACE_HAS_THREADS)
602 final_close_ (lock_
, cond_attr_
)
607 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::ACE_Stream");
608 if (this->open (a
, head
, tail
) == -1)
// NOTE(review): head and tail are dereferenced here; open() visibly
// tests both against 0, so a caller may pass null for either, in which
// case these name() calls would dereference a null pointer -- confirm.
609 ACELIB_ERROR ((LM_ERROR
,
610 ACE_TEXT ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::open (%s, %s)\n"),
611 head
->name (), tail
->name ()));
// Destructor: acts only when stream_head_ is still non-null; the statement
// guarded by that test is on a line not present in this listing (presumably
// it closes the stream -- confirm).
614 template <ACE_SYNCH_DECL
, class TIME_POLICY
>
615 ACE_Stream
<ACE_SYNCH_USE
, TIME_POLICY
>::~ACE_Stream ()
617 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::~ACE_Stream");
619 if (this->stream_head_
!= 0)
// Iterator constructor: positions next_ at the stream_head_ module of the
// given stream sr.
623 template <ACE_SYNCH_DECL
, class TIME_POLICY
>
624 ACE_Stream_Iterator
<ACE_SYNCH_USE
, TIME_POLICY
>::ACE_Stream_Iterator (const ACE_Stream
<ACE_SYNCH_USE
, TIME_POLICY
> &sr
)
625 : next_ (sr
.stream_head_
)
627 ACE_TRACE ("ACE_Stream_Iterator<ACE_SYNCH_USE, TIME_POLICY>::ACE_Stream_Iterator");
630 ACE_END_VERSIONED_NAMESPACE_DECL
632 #endif /* ACE_STREAM_CPP */