Correct feature names
[ACE_TAO.git] / ACE / ace / Stream.cpp
blob47315c40131bd74cc4885207318b7af1ee3ae9d0
1 // Stream.cpp
2 #ifndef ACE_STREAM_CPP
3 #define ACE_STREAM_CPP
6 //#include "ace/Module.h"
7 #include "ace/Stream.h"
9 #if !defined (ACE_LACKS_PRAGMA_ONCE)
10 # pragma once
11 #endif /* ACE_LACKS_PRAGMA_ONCE */
13 #include "ace/Stream_Modules.h"
14 #include "ace/OS_NS_string.h"
16 #if !defined (__ACE_INLINE__)
17 #include "ace/Stream.inl"
18 #endif /* __ACE_INLINE__ */
20 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
22 ACE_ALLOC_HOOK_DEFINE_Tyc(ACE_Stream)
24 // Give some idea of what the heck is going on in a stream!
26 template <ACE_SYNCH_DECL, class TIME_POLICY> void
27 ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::dump (void) const
29 #if defined (ACE_HAS_DUMP)
30 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::dump");
31 ACELIB_DEBUG ((LM_DEBUG, ACE_TEXT ("-------- module links --------\n")));
33 for (ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *mp = this->stream_head_;
35 mp = mp->next ())
37 ACELIB_DEBUG ((LM_DEBUG, ACE_TEXT ("module name = %s\n"), mp->name ()));
38 if (mp == this->stream_tail_)
39 break;
42 ACELIB_DEBUG ((LM_DEBUG, ACE_TEXT ("-------- writer links --------\n")));
44 ACE_Task<ACE_SYNCH_USE, TIME_POLICY> *tp;
46 for (tp = this->stream_head_->writer ();
48 tp = tp->next ())
50 ACELIB_DEBUG ((LM_DEBUG, ACE_TEXT ("writer queue name = %s\n"), tp->name ()));
51 tp->dump ();
52 ACELIB_DEBUG ((LM_DEBUG, ACE_TEXT ("-------\n")));
53 if (tp == this->stream_tail_->writer ()
54 || (this->linked_us_
55 && tp == this->linked_us_->stream_head_->reader ()))
56 break;
59 ACELIB_DEBUG ((LM_DEBUG, ACE_TEXT ("-------- reader links --------\n")));
60 for (tp = this->stream_tail_->reader (); ; tp = tp->next ())
62 ACELIB_DEBUG ((LM_DEBUG, ACE_TEXT ("reader queue name = %s\n"), tp->name ()));
63 tp->dump ();
64 ACELIB_DEBUG ((LM_DEBUG, ACE_TEXT ("-------\n")));
65 if (tp == this->stream_head_->reader ()
66 || (this->linked_us_
67 && tp == this->linked_us_->stream_head_->writer ()))
68 break;
70 #endif /* ACE_HAS_DUMP */
73 template <ACE_SYNCH_DECL, class TIME_POLICY> int
74 ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::push (ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *new_top)
76 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::push");
77 if (this->push_module (new_top,
78 this->stream_head_->next (),
79 this->stream_head_) == -1)
80 return -1;
81 else
82 return 0;
85 template <ACE_SYNCH_DECL, class TIME_POLICY> int
86 ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::put (ACE_Message_Block *mb, ACE_Time_Value *tv)
88 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::put");
89 return this->stream_head_->writer ()->put (mb, tv);
92 template <ACE_SYNCH_DECL, class TIME_POLICY> int
93 ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::get (ACE_Message_Block *&mb, ACE_Time_Value *tv)
95 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::get");
96 return this->stream_head_->reader ()->getq (mb, tv);
99 // Return the "top" ACE_Module in a ACE_Stream, skipping over the
100 // stream_head.
102 template <ACE_SYNCH_DECL, class TIME_POLICY> int
103 ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::top (ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *&m)
105 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::top");
106 if (this->stream_head_->next () == this->stream_tail_)
107 return -1;
108 else
110 m = this->stream_head_->next ();
111 return 0;
115 template <ACE_SYNCH_DECL, class TIME_POLICY> int
116 ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::insert (const ACE_TCHAR *prev_name,
117 ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *mod)
119 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::insert");
121 for (ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *prev_mod = this->stream_head_;
122 prev_mod != 0;
123 prev_mod = prev_mod->next ())
124 if (ACE_OS::strcmp (prev_mod->name (), prev_name) == 0)
126 ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *next_mod = prev_mod->next ();
128 // We can't insert a module below <stream_tail_>.
129 if (next_mod == 0)
130 return -1;
132 mod->link (next_mod);
133 prev_mod->link (mod);
135 if (mod->reader ()->open (mod->arg ()) == -1)
136 return -1;
138 if (mod->writer ()->open (mod->arg ()) == -1)
139 return -1;
141 return 0;
144 return -1;
147 template <ACE_SYNCH_DECL, class TIME_POLICY> int
148 ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::replace (const ACE_TCHAR *replace_name,
149 ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *mod,
150 int flags)
152 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::replace");
153 ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *prev_mod = 0;
155 for (ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *rep_mod = this->stream_head_;
156 rep_mod != 0;
157 rep_mod = rep_mod->next ())
158 if (ACE_OS::strcmp (rep_mod->name (), replace_name) == 0)
160 ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *next_mod = rep_mod->next ();
162 if (next_mod)
163 mod->link (next_mod);
164 else // In case the <next_mod> is <stream_tail_>.
166 mod->writer ()->next (0);
167 mod->next (0);
168 this->stream_tail_ = mod;
171 if (prev_mod)
172 prev_mod->link (mod);
173 else // In case the <rep_mod> is <stream_head_>.
175 mod->reader ()->next (0);
176 this->stream_head_ = mod;
179 if (mod->reader ()->open (mod->arg ()) == -1)
180 return -1;
182 if (mod->writer ()->open (mod->arg ()) == -1)
183 return -1;
185 if (flags != ACE_Module<ACE_SYNCH_USE, TIME_POLICY>::M_DELETE_NONE)
187 rep_mod->close (flags);
188 delete rep_mod;
191 return 0;
193 else
194 prev_mod = rep_mod;
196 return -1;
199 // Remove the "top" ACE_Module in a ACE_Stream, skipping over the
200 // stream_head.
202 template <ACE_SYNCH_DECL, class TIME_POLICY> int
203 ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::pop (int flags)
205 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::pop");
206 if (this->stream_head_->next () == this->stream_tail_)
207 return -1;
208 else
210 // Skip over the ACE_Stream head.
211 ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *top_mod = this->stream_head_->next ();
212 ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *new_top = top_mod->next ();
214 this->stream_head_->next (new_top);
216 // Close the top ACE_Module.
218 top_mod->close (flags);
220 // Don't delete the Module unless the flags request this.
221 if (flags != ACE_Module<ACE_SYNCH_USE, TIME_POLICY>::M_DELETE_NONE)
222 delete top_mod;
224 this->stream_head_->writer ()->next (new_top->writer ());
225 new_top->reader ()->next (this->stream_head_->reader ());
226 return 0;
230 // Remove a named ACE_Module from an arbitrary place in the
231 // ACE_Stream.
233 template <ACE_SYNCH_DECL, class TIME_POLICY> int
234 ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::remove (const ACE_TCHAR *name,
235 int flags)
237 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::remove");
238 ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *prev = 0;
240 for (ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *mod = this->stream_head_;
241 mod != 0;
242 mod = mod->next ())
244 #ifndef ACE_NLOGGING
245 if (ACE::debug ())
247 ACELIB_DEBUG ((LM_DEBUG,
248 ACE_TEXT ("ACE_Stream::remove - comparing existing module :%s: with :%s:\n"),
249 mod->name (),
250 name));
252 #endif
254 if (ACE_OS::strcmp (mod->name (), name) == 0)
256 if (prev == 0) // Deleting ACE_Stream Head
257 this->stream_head_->link (mod->next ());
258 else
259 prev->link (mod->next ());
261 // Close down the module.
262 mod->close (flags);
264 // Don't delete the Module unless the flags request this.
265 if (flags != ACE_Module<ACE_SYNCH_USE, TIME_POLICY>::M_DELETE_NONE)
267 // Release the memory.
268 delete mod;
271 return 0;
273 else
274 prev = mod;
277 ACELIB_DEBUG ((LM_WARNING, ACE_TEXT ("ACE_Stream::remove failed to find module with name %s to remove\n"),name));
278 return -1;
281 template <ACE_SYNCH_DECL, class TIME_POLICY> ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *
282 ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::find (const ACE_TCHAR *name)
284 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::find");
285 for (ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *mod = this->stream_head_;
286 mod != 0;
287 mod = mod->next ())
288 if (ACE_OS::strcmp (mod->name (), name) == 0)
289 return mod;
291 return 0;
294 // Actually push a module onto the stack...
296 template <ACE_SYNCH_DECL, class TIME_POLICY> int
297 ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::push_module (ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *new_top,
298 ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *current_top,
299 ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *head)
301 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::push_module");
302 ACE_Task<ACE_SYNCH_USE, TIME_POLICY> *nt_reader = new_top->reader ();
303 ACE_Task<ACE_SYNCH_USE, TIME_POLICY> *nt_writer = new_top->writer ();
304 ACE_Task<ACE_SYNCH_USE, TIME_POLICY> *ct_reader = 0;
305 ACE_Task<ACE_SYNCH_USE, TIME_POLICY> *ct_writer = 0;
307 if (current_top)
309 ct_reader = current_top->reader ();
310 ct_writer = current_top->writer ();
311 ct_reader->next (nt_reader);
314 nt_writer->next (ct_writer);
316 if (head)
318 if (head != new_top)
319 head->link (new_top);
321 else
322 nt_reader->next (0);
324 new_top->next (current_top);
326 if (nt_reader->open (new_top->arg ()) == -1)
327 return -1;
329 if (nt_writer->open (new_top->arg ()) == -1)
330 return -1;
331 return 0;
334 template <ACE_SYNCH_DECL, class TIME_POLICY> int
335 ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::open (void *a,
336 ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *head,
337 ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *tail)
339 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::open");
340 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
342 ACE_Task<ACE_SYNCH_USE, TIME_POLICY> *h1 = 0, *h2 = 0;
343 ACE_Task<ACE_SYNCH_USE, TIME_POLICY> *t1 = 0, *t2 = 0;
345 if (head == 0)
347 typedef ACE_Stream_Head<ACE_SYNCH_USE, TIME_POLICY> STREAM_HEAD_TYPE;
348 ACE_NEW_NORETURN (h1,
349 STREAM_HEAD_TYPE);
350 ACE_NEW_NORETURN (h2,
351 STREAM_HEAD_TYPE);
352 typedef ACE_Module<ACE_SYNCH_USE, TIME_POLICY> MODULE_TYPE;
353 ACE_NEW_NORETURN (head,
354 MODULE_TYPE (ACE_TEXT ("ACE_Stream_Head"),
355 h1, h2,
357 M_DELETE));
360 if (tail == 0)
362 typedef ACE_Stream_Tail<ACE_SYNCH_USE, TIME_POLICY> STREAM_TAIL_TYPE;
363 ACE_NEW_NORETURN (t1,
364 STREAM_TAIL_TYPE);
365 ACE_NEW_NORETURN (t2,
366 STREAM_TAIL_TYPE);
367 typedef ACE_Module<ACE_SYNCH_USE, TIME_POLICY> MODULE_TYPE;
368 ACE_NEW_NORETURN (tail,
369 MODULE_TYPE (ACE_TEXT ("ACE_Stream_Tail"),
370 t1, t2,
372 M_DELETE));
375 // Make sure *all* the allocation succeeded!
376 if ((head == 0 && (h1 == 0 || h2 == 0))
377 || (tail == 0 && (t1 == 0 || t2 == 0)))
379 delete h1;
380 delete h2;
381 delete t1;
382 delete t2;
383 delete head;
384 delete tail;
385 errno = ENOMEM;
386 return -1;
389 this->stream_head_ = head;
390 this->stream_tail_ = tail;
392 if (this->push_module (this->stream_tail_) == -1)
393 return -1;
394 else if (this->push_module (this->stream_head_,
395 this->stream_tail_,
396 this->stream_head_) == -1)
397 return -1;
399 return 0;
402 template <ACE_SYNCH_DECL, class TIME_POLICY> int
403 ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::close (int flags)
405 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::close");
406 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
408 if (this->stream_head_ != 0
409 && this->stream_tail_ != 0)
411 // Don't bother checking return value here.
412 this->unlink_i ();
414 int result = 0;
416 // Remove and cleanup all the intermediate modules.
418 while (this->stream_head_->next () != this->stream_tail_)
419 if (this->pop (flags) == -1)
420 result = -1;
422 // Clean up the head and tail of the stream.
423 if (this->stream_head_->close (flags) == -1)
424 result = -1;
425 if (this->stream_tail_->close (flags) == -1)
426 result = -1;
428 // Cleanup the memory.
429 delete this->stream_head_;
430 delete this->stream_tail_;
432 this->stream_head_ = 0;
433 this->stream_tail_ = 0;
435 // Tell all threads waiting on the close that we are done.
436 this->final_close_.broadcast ();
437 return result;
439 return 0;
442 template <ACE_SYNCH_DECL, class TIME_POLICY> int
443 ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::control (ACE_IO_Cntl_Msg::ACE_IO_Cntl_Cmds cmd,
444 void *a)
446 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::control");
447 ACE_IO_Cntl_Msg ioc (cmd);
449 ACE_Message_Block *db = 0;
451 // Try to create a data block that contains the user-supplied data.
452 ACE_NEW_RETURN (db,
453 ACE_Message_Block (sizeof (int),
454 ACE_Message_Block::MB_IOCTL,
456 (char *) a),
457 -1);
458 // Try to create a control block <cb> that contains the control
459 // field and a pointer to the data block <db> in <cb>'s continuation
460 // field.
461 ACE_Message_Block *cb = 0;
463 ACE_NEW_NORETURN (cb,
464 ACE_Message_Block (sizeof ioc,
465 ACE_Message_Block::MB_IOCTL,
467 (char *) &ioc));
468 // @@ Michael: The old semantic assumed that cb returns == 0
469 // if no memory was available. We will now return immediately
470 // without release (errno is set to ENOMEM by the macro).
472 // If we can't allocate <cb> then we need to delete db and return
473 // -1.
474 if (cb == 0)
476 db->release ();
477 errno = ENOMEM;
478 return -1;
481 int result;
483 if (this->stream_head_->writer ()->put (cb) == -1)
484 result = -1;
485 else if (this->stream_head_->reader ()->getq (cb) == -1)
486 result = -1;
487 else
488 result = ((ACE_IO_Cntl_Msg *) cb->rd_ptr ())->rval ();
490 // This will also release db if it's reference count == 0.
491 cb->release ();
493 return result;
496 // Link two streams together at their bottom-most Modules (i.e., the
497 // one just above the Stream tail). Note that all of this is premised
498 // on the fact that the Stream head and Stream tail are non-NULL...
499 // This must be called with locks held.
501 template <ACE_SYNCH_DECL, class TIME_POLICY> int
502 ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::link_i (ACE_Stream<ACE_SYNCH_USE, TIME_POLICY> &us)
504 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::link_i");
505 this->linked_us_ = &us;
506 // Make sure the other side is also linked to us!
507 us.linked_us_ = this;
509 ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *my_tail = this->stream_head_;
511 if (my_tail == 0)
512 return -1;
514 // Locate the module just above our Stream tail.
515 while (my_tail->next () != this->stream_tail_)
516 my_tail = my_tail->next ();
518 ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *other_tail = us.stream_head_;
520 if (other_tail == 0)
521 return -1;
523 // Locate the module just above the other Stream's tail.
524 while (other_tail->next () != us.stream_tail_)
525 other_tail = other_tail->next ();
527 // Reattach the pointers so that the two streams are linked!
528 my_tail->writer ()->next (other_tail->reader ());
529 other_tail->writer ()->next (my_tail->reader ());
530 return 0;
533 template <ACE_SYNCH_DECL, class TIME_POLICY> int
534 ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::link (ACE_Stream<ACE_SYNCH_USE, TIME_POLICY> &us)
536 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::link");
538 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
540 return this->link_i (us);
543 // Must be called with locks held...
545 template <ACE_SYNCH_DECL, class TIME_POLICY> int
546 ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::unlink_i (void)
548 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::unlink_i");
550 // Only try to unlink if we are in fact still linked!
552 if (this->linked_us_ != 0)
554 ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *my_tail = this->stream_head_;
556 // Only relink if we still exist!
557 if (my_tail)
559 // Find the module that's just before our stream tail.
560 while (my_tail->next () != this->stream_tail_)
561 my_tail = my_tail->next ();
563 // Restore the writer's next() link to our tail.
564 my_tail->writer ()->next (this->stream_tail_->writer ());
567 ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *other_tail =
568 this->linked_us_->stream_head_;
570 // Only fiddle with the other side if it in fact still remains.
571 if (other_tail != 0)
573 while (other_tail->next () != this->linked_us_->stream_tail_)
574 other_tail = other_tail->next ();
576 other_tail->writer ()->next (this->linked_us_->stream_tail_->writer ());
580 // Make sure the other side is also aware that it's been unlinked!
581 this->linked_us_->linked_us_ = 0;
583 this->linked_us_ = 0;
584 return 0;
586 else
587 return -1;
590 template <ACE_SYNCH_DECL, class TIME_POLICY> int
591 ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::unlink (void)
593 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::unlink");
594 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
595 return this->unlink_i ();
598 template <ACE_SYNCH_DECL, class TIME_POLICY>
599 ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::ACE_Stream (void * a,
600 ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *head,
601 ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *tail)
602 : stream_head_ (0),
603 stream_tail_ (0),
604 linked_us_ (0),
605 #if defined (ACE_HAS_THREADS)
606 final_close_ (lock_, cond_attr_)
607 #else
608 final_close_ (lock_)
609 #endif
611 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::ACE_Stream");
612 if (this->open (a, head, tail) == -1)
613 ACELIB_ERROR ((LM_ERROR,
614 ACE_TEXT ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::open (%s, %s)\n"),
615 head->name (), tail->name ()));
618 template <ACE_SYNCH_DECL, class TIME_POLICY>
619 ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::~ACE_Stream (void)
621 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::~ACE_Stream");
623 if (this->stream_head_ != 0)
624 this->close ();
627 template <ACE_SYNCH_DECL, class TIME_POLICY>
628 ACE_Stream_Iterator<ACE_SYNCH_USE, TIME_POLICY>::ACE_Stream_Iterator (const ACE_Stream<ACE_SYNCH_USE, TIME_POLICY> &sr)
629 : next_ (sr.stream_head_)
631 ACE_TRACE ("ACE_Stream_Iterator<ACE_SYNCH_USE, TIME_POLICY>::ACE_Stream_Iterator");
634 ACE_END_VERSIONED_NAMESPACE_DECL
636 #endif /* ACE_STREAM_CPP */