6 //#include "ace/Module.h"
7 #include "ace/Stream.h"
9 #if !defined (ACE_LACKS_PRAGMA_ONCE)
11 #endif /* ACE_LACKS_PRAGMA_ONCE */
13 #include "ace/Stream_Modules.h"
14 #include "ace/OS_NS_string.h"
16 #if !defined (__ACE_INLINE__)
17 #include "ace/Stream.inl"
18 #endif /* __ACE_INLINE__ */
20 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
22 ACE_ALLOC_HOOK_DEFINE_Tyc(ACE_Stream
)
24 // Give some idea of what the heck is going on in a stream!
26 template <ACE_SYNCH_DECL
, class TIME_POLICY
> void
27 ACE_Stream
<ACE_SYNCH_USE
, TIME_POLICY
>::dump (void) const
29 #if defined (ACE_HAS_DUMP)
30 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::dump");
31 ACELIB_DEBUG ((LM_DEBUG
, ACE_TEXT ("-------- module links --------\n")));
33 for (ACE_Module
<ACE_SYNCH_USE
, TIME_POLICY
> *mp
= this->stream_head_
;
37 ACELIB_DEBUG ((LM_DEBUG
, ACE_TEXT ("module name = %s\n"), mp
->name ()));
38 if (mp
== this->stream_tail_
)
42 ACELIB_DEBUG ((LM_DEBUG
, ACE_TEXT ("-------- writer links --------\n")));
44 ACE_Task
<ACE_SYNCH_USE
, TIME_POLICY
> *tp
;
46 for (tp
= this->stream_head_
->writer ();
50 ACELIB_DEBUG ((LM_DEBUG
, ACE_TEXT ("writer queue name = %s\n"), tp
->name ()));
52 ACELIB_DEBUG ((LM_DEBUG
, ACE_TEXT ("-------\n")));
53 if (tp
== this->stream_tail_
->writer ()
55 && tp
== this->linked_us_
->stream_head_
->reader ()))
59 ACELIB_DEBUG ((LM_DEBUG
, ACE_TEXT ("-------- reader links --------\n")));
60 for (tp
= this->stream_tail_
->reader (); ; tp
= tp
->next ())
62 ACELIB_DEBUG ((LM_DEBUG
, ACE_TEXT ("reader queue name = %s\n"), tp
->name ()));
64 ACELIB_DEBUG ((LM_DEBUG
, ACE_TEXT ("-------\n")));
65 if (tp
== this->stream_head_
->reader ()
67 && tp
== this->linked_us_
->stream_head_
->writer ()))
70 #endif /* ACE_HAS_DUMP */
73 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
74 ACE_Stream
<ACE_SYNCH_USE
, TIME_POLICY
>::push (ACE_Module
<ACE_SYNCH_USE
, TIME_POLICY
> *new_top
)
76 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::push");
77 if (this->push_module (new_top
,
78 this->stream_head_
->next (),
79 this->stream_head_
) == -1)
85 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
86 ACE_Stream
<ACE_SYNCH_USE
, TIME_POLICY
>::put (ACE_Message_Block
*mb
, ACE_Time_Value
*tv
)
88 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::put");
89 return this->stream_head_
->writer ()->put (mb
, tv
);
92 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
93 ACE_Stream
<ACE_SYNCH_USE
, TIME_POLICY
>::get (ACE_Message_Block
*&mb
, ACE_Time_Value
*tv
)
95 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::get");
96 return this->stream_head_
->reader ()->getq (mb
, tv
);
99 // Return the "top" ACE_Module in a ACE_Stream, skipping over the
102 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
103 ACE_Stream
<ACE_SYNCH_USE
, TIME_POLICY
>::top (ACE_Module
<ACE_SYNCH_USE
, TIME_POLICY
> *&m
)
105 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::top");
106 if (this->stream_head_
->next () == this->stream_tail_
)
110 m
= this->stream_head_
->next ();
115 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
116 ACE_Stream
<ACE_SYNCH_USE
, TIME_POLICY
>::insert (const ACE_TCHAR
*prev_name
,
117 ACE_Module
<ACE_SYNCH_USE
, TIME_POLICY
> *mod
)
119 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::insert");
121 for (ACE_Module
<ACE_SYNCH_USE
, TIME_POLICY
> *prev_mod
= this->stream_head_
;
123 prev_mod
= prev_mod
->next ())
124 if (ACE_OS::strcmp (prev_mod
->name (), prev_name
) == 0)
126 ACE_Module
<ACE_SYNCH_USE
, TIME_POLICY
> *next_mod
= prev_mod
->next ();
128 // We can't insert a module below <stream_tail_>.
132 mod
->link (next_mod
);
133 prev_mod
->link (mod
);
135 if (mod
->reader ()->open (mod
->arg ()) == -1)
138 if (mod
->writer ()->open (mod
->arg ()) == -1)
147 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
148 ACE_Stream
<ACE_SYNCH_USE
, TIME_POLICY
>::replace (const ACE_TCHAR
*replace_name
,
149 ACE_Module
<ACE_SYNCH_USE
, TIME_POLICY
> *mod
,
152 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::replace");
153 ACE_Module
<ACE_SYNCH_USE
, TIME_POLICY
> *prev_mod
= 0;
155 for (ACE_Module
<ACE_SYNCH_USE
, TIME_POLICY
> *rep_mod
= this->stream_head_
;
157 rep_mod
= rep_mod
->next ())
158 if (ACE_OS::strcmp (rep_mod
->name (), replace_name
) == 0)
160 ACE_Module
<ACE_SYNCH_USE
, TIME_POLICY
> *next_mod
= rep_mod
->next ();
163 mod
->link (next_mod
);
164 else // In case the <next_mod> is <stream_tail_>.
166 mod
->writer ()->next (0);
168 this->stream_tail_
= mod
;
172 prev_mod
->link (mod
);
173 else // In case the <rep_mod> is <stream_head_>.
175 mod
->reader ()->next (0);
176 this->stream_head_
= mod
;
179 if (mod
->reader ()->open (mod
->arg ()) == -1)
182 if (mod
->writer ()->open (mod
->arg ()) == -1)
185 if (flags
!= ACE_Module
<ACE_SYNCH_USE
, TIME_POLICY
>::M_DELETE_NONE
)
187 rep_mod
->close (flags
);
199 // Remove the "top" ACE_Module in a ACE_Stream, skipping over the
202 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
203 ACE_Stream
<ACE_SYNCH_USE
, TIME_POLICY
>::pop (int flags
)
205 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::pop");
206 if (this->stream_head_
->next () == this->stream_tail_
)
210 // Skip over the ACE_Stream head.
211 ACE_Module
<ACE_SYNCH_USE
, TIME_POLICY
> *top_mod
= this->stream_head_
->next ();
212 ACE_Module
<ACE_SYNCH_USE
, TIME_POLICY
> *new_top
= top_mod
->next ();
214 this->stream_head_
->next (new_top
);
216 // Close the top ACE_Module.
218 top_mod
->close (flags
);
220 // Don't delete the Module unless the flags request this.
221 if (flags
!= ACE_Module
<ACE_SYNCH_USE
, TIME_POLICY
>::M_DELETE_NONE
)
224 this->stream_head_
->writer ()->next (new_top
->writer ());
225 new_top
->reader ()->next (this->stream_head_
->reader ());
230 // Remove a named ACE_Module from an arbitrary place in the
233 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
234 ACE_Stream
<ACE_SYNCH_USE
, TIME_POLICY
>::remove (const ACE_TCHAR
*name
,
237 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::remove");
238 ACE_Module
<ACE_SYNCH_USE
, TIME_POLICY
> *prev
= 0;
240 for (ACE_Module
<ACE_SYNCH_USE
, TIME_POLICY
> *mod
= this->stream_head_
;
247 ACELIB_DEBUG ((LM_DEBUG
,
248 ACE_TEXT ("ACE_Stream::remove - comparing existing module :%s: with :%s:\n"),
254 if (ACE_OS::strcmp (mod
->name (), name
) == 0)
256 if (prev
== 0) // Deleting ACE_Stream Head
257 this->stream_head_
->link (mod
->next ());
259 prev
->link (mod
->next ());
261 // Close down the module.
264 // Don't delete the Module unless the flags request this.
265 if (flags
!= ACE_Module
<ACE_SYNCH_USE
, TIME_POLICY
>::M_DELETE_NONE
)
267 // Release the memory.
277 ACELIB_DEBUG ((LM_WARNING
, ACE_TEXT ("ACE_Stream::remove failed to find module with name %s to remove\n"),name
));
281 template <ACE_SYNCH_DECL
, class TIME_POLICY
> ACE_Module
<ACE_SYNCH_USE
, TIME_POLICY
> *
282 ACE_Stream
<ACE_SYNCH_USE
, TIME_POLICY
>::find (const ACE_TCHAR
*name
)
284 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::find");
285 for (ACE_Module
<ACE_SYNCH_USE
, TIME_POLICY
> *mod
= this->stream_head_
;
288 if (ACE_OS::strcmp (mod
->name (), name
) == 0)
294 // Actually push a module onto the stack...
296 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
297 ACE_Stream
<ACE_SYNCH_USE
, TIME_POLICY
>::push_module (ACE_Module
<ACE_SYNCH_USE
, TIME_POLICY
> *new_top
,
298 ACE_Module
<ACE_SYNCH_USE
, TIME_POLICY
> *current_top
,
299 ACE_Module
<ACE_SYNCH_USE
, TIME_POLICY
> *head
)
301 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::push_module");
302 ACE_Task
<ACE_SYNCH_USE
, TIME_POLICY
> *nt_reader
= new_top
->reader ();
303 ACE_Task
<ACE_SYNCH_USE
, TIME_POLICY
> *nt_writer
= new_top
->writer ();
304 ACE_Task
<ACE_SYNCH_USE
, TIME_POLICY
> *ct_reader
= 0;
305 ACE_Task
<ACE_SYNCH_USE
, TIME_POLICY
> *ct_writer
= 0;
309 ct_reader
= current_top
->reader ();
310 ct_writer
= current_top
->writer ();
311 ct_reader
->next (nt_reader
);
314 nt_writer
->next (ct_writer
);
319 head
->link (new_top
);
324 new_top
->next (current_top
);
326 if (nt_reader
->open (new_top
->arg ()) == -1)
329 if (nt_writer
->open (new_top
->arg ()) == -1)
334 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
335 ACE_Stream
<ACE_SYNCH_USE
, TIME_POLICY
>::open (void *a
,
336 ACE_Module
<ACE_SYNCH_USE
, TIME_POLICY
> *head
,
337 ACE_Module
<ACE_SYNCH_USE
, TIME_POLICY
> *tail
)
339 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::open");
340 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T
, ace_mon
, this->lock_
, -1);
342 ACE_Task
<ACE_SYNCH_USE
, TIME_POLICY
> *h1
= 0, *h2
= 0;
343 ACE_Task
<ACE_SYNCH_USE
, TIME_POLICY
> *t1
= 0, *t2
= 0;
347 typedef ACE_Stream_Head
<ACE_SYNCH_USE
, TIME_POLICY
> STREAM_HEAD_TYPE
;
348 ACE_NEW_NORETURN (h1
,
350 ACE_NEW_NORETURN (h2
,
352 typedef ACE_Module
<ACE_SYNCH_USE
, TIME_POLICY
> MODULE_TYPE
;
353 ACE_NEW_NORETURN (head
,
354 MODULE_TYPE (ACE_TEXT ("ACE_Stream_Head"),
362 typedef ACE_Stream_Tail
<ACE_SYNCH_USE
, TIME_POLICY
> STREAM_TAIL_TYPE
;
363 ACE_NEW_NORETURN (t1
,
365 ACE_NEW_NORETURN (t2
,
367 typedef ACE_Module
<ACE_SYNCH_USE
, TIME_POLICY
> MODULE_TYPE
;
368 ACE_NEW_NORETURN (tail
,
369 MODULE_TYPE (ACE_TEXT ("ACE_Stream_Tail"),
375 // Make sure *all* the allocation succeeded!
376 if ((head
== 0 && (h1
== 0 || h2
== 0))
377 || (tail
== 0 && (t1
== 0 || t2
== 0)))
389 this->stream_head_
= head
;
390 this->stream_tail_
= tail
;
392 if (this->push_module (this->stream_tail_
) == -1)
394 else if (this->push_module (this->stream_head_
,
396 this->stream_head_
) == -1)
402 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
403 ACE_Stream
<ACE_SYNCH_USE
, TIME_POLICY
>::close (int flags
)
405 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::close");
406 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T
, ace_mon
, this->lock_
, -1);
408 if (this->stream_head_
!= 0
409 && this->stream_tail_
!= 0)
411 // Don't bother checking return value here.
416 // Remove and cleanup all the intermediate modules.
418 while (this->stream_head_
->next () != this->stream_tail_
)
419 if (this->pop (flags
) == -1)
422 // Clean up the head and tail of the stream.
423 if (this->stream_head_
->close (flags
) == -1)
425 if (this->stream_tail_
->close (flags
) == -1)
428 // Cleanup the memory.
429 delete this->stream_head_
;
430 delete this->stream_tail_
;
432 this->stream_head_
= 0;
433 this->stream_tail_
= 0;
435 // Tell all threads waiting on the close that we are done.
436 this->final_close_
.broadcast ();
442 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
443 ACE_Stream
<ACE_SYNCH_USE
, TIME_POLICY
>::control (ACE_IO_Cntl_Msg::ACE_IO_Cntl_Cmds cmd
,
446 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::control");
447 ACE_IO_Cntl_Msg
ioc (cmd
);
449 ACE_Message_Block
*db
= 0;
451 // Try to create a data block that contains the user-supplied data.
453 ACE_Message_Block (sizeof (int),
454 ACE_Message_Block::MB_IOCTL
,
458 // Try to create a control block <cb> that contains the control
459 // field and a pointer to the data block <db> in <cb>'s continuation
461 ACE_Message_Block
*cb
= 0;
463 ACE_NEW_NORETURN (cb
,
464 ACE_Message_Block (sizeof ioc
,
465 ACE_Message_Block::MB_IOCTL
,
468 // @@ Michael: The old semantic assumed that cb returns == 0
469 // if no memory was available. We will now return immediately
470 // without release (errno is set to ENOMEM by the macro).
472 // If we can't allocate <cb> then we need to delete db and return
483 if (this->stream_head_
->writer ()->put (cb
) == -1)
485 else if (this->stream_head_
->reader ()->getq (cb
) == -1)
488 result
= ((ACE_IO_Cntl_Msg
*) cb
->rd_ptr ())->rval ();
490 // This will also release db if it's reference count == 0.
496 // Link two streams together at their bottom-most Modules (i.e., the
497 // one just above the Stream tail). Note that all of this is premised
498 // on the fact that the Stream head and Stream tail are non-NULL...
499 // This must be called with locks held.
501 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
502 ACE_Stream
<ACE_SYNCH_USE
, TIME_POLICY
>::link_i (ACE_Stream
<ACE_SYNCH_USE
, TIME_POLICY
> &us
)
504 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::link_i");
505 this->linked_us_
= &us
;
506 // Make sure the other side is also linked to us!
507 us
.linked_us_
= this;
509 ACE_Module
<ACE_SYNCH_USE
, TIME_POLICY
> *my_tail
= this->stream_head_
;
514 // Locate the module just above our Stream tail.
515 while (my_tail
->next () != this->stream_tail_
)
516 my_tail
= my_tail
->next ();
518 ACE_Module
<ACE_SYNCH_USE
, TIME_POLICY
> *other_tail
= us
.stream_head_
;
523 // Locate the module just above the other Stream's tail.
524 while (other_tail
->next () != us
.stream_tail_
)
525 other_tail
= other_tail
->next ();
527 // Reattach the pointers so that the two streams are linked!
528 my_tail
->writer ()->next (other_tail
->reader ());
529 other_tail
->writer ()->next (my_tail
->reader ());
533 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
534 ACE_Stream
<ACE_SYNCH_USE
, TIME_POLICY
>::link (ACE_Stream
<ACE_SYNCH_USE
, TIME_POLICY
> &us
)
536 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::link");
538 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T
, ace_mon
, this->lock_
, -1);
540 return this->link_i (us
);
543 // Must be called with locks held...
545 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
546 ACE_Stream
<ACE_SYNCH_USE
, TIME_POLICY
>::unlink_i (void)
548 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::unlink_i");
550 // Only try to unlink if we are in fact still linked!
552 if (this->linked_us_
!= 0)
554 ACE_Module
<ACE_SYNCH_USE
, TIME_POLICY
> *my_tail
= this->stream_head_
;
556 // Only relink if we still exist!
559 // Find the module that's just before our stream tail.
560 while (my_tail
->next () != this->stream_tail_
)
561 my_tail
= my_tail
->next ();
563 // Restore the writer's next() link to our tail.
564 my_tail
->writer ()->next (this->stream_tail_
->writer ());
567 ACE_Module
<ACE_SYNCH_USE
, TIME_POLICY
> *other_tail
=
568 this->linked_us_
->stream_head_
;
570 // Only fiddle with the other side if it in fact still remains.
573 while (other_tail
->next () != this->linked_us_
->stream_tail_
)
574 other_tail
= other_tail
->next ();
576 other_tail
->writer ()->next (this->linked_us_
->stream_tail_
->writer ());
580 // Make sure the other side is also aware that it's been unlinked!
581 this->linked_us_
->linked_us_
= 0;
583 this->linked_us_
= 0;
590 template <ACE_SYNCH_DECL
, class TIME_POLICY
> int
591 ACE_Stream
<ACE_SYNCH_USE
, TIME_POLICY
>::unlink (void)
593 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::unlink");
594 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T
, ace_mon
, this->lock_
, -1);
595 return this->unlink_i ();
598 template <ACE_SYNCH_DECL
, class TIME_POLICY
>
599 ACE_Stream
<ACE_SYNCH_USE
, TIME_POLICY
>::ACE_Stream (void * a
,
600 ACE_Module
<ACE_SYNCH_USE
, TIME_POLICY
> *head
,
601 ACE_Module
<ACE_SYNCH_USE
, TIME_POLICY
> *tail
)
605 #if defined (ACE_HAS_THREADS)
606 final_close_ (lock_
, cond_attr_
)
611 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::ACE_Stream");
612 if (this->open (a
, head
, tail
) == -1)
613 ACELIB_ERROR ((LM_ERROR
,
614 ACE_TEXT ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::open (%s, %s)\n"),
615 head
->name (), tail
->name ()));
618 template <ACE_SYNCH_DECL
, class TIME_POLICY
>
619 ACE_Stream
<ACE_SYNCH_USE
, TIME_POLICY
>::~ACE_Stream (void)
621 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::~ACE_Stream");
623 if (this->stream_head_
!= 0)
627 template <ACE_SYNCH_DECL
, class TIME_POLICY
>
628 ACE_Stream_Iterator
<ACE_SYNCH_USE
, TIME_POLICY
>::ACE_Stream_Iterator (const ACE_Stream
<ACE_SYNCH_USE
, TIME_POLICY
> &sr
)
629 : next_ (sr
.stream_head_
)
631 ACE_TRACE ("ACE_Stream_Iterator<ACE_SYNCH_USE, TIME_POLICY>::ACE_Stream_Iterator");
634 ACE_END_VERSIONED_NAMESPACE_DECL
636 #endif /* ACE_STREAM_CPP */