Use override/default for RTPortableServer
[ACE_TAO.git] / ACE / ace / Stream.cpp
blobd799bc3bc4da3775b00fd83689579fe9b3a05640
1 #ifndef ACE_STREAM_CPP
2 #define ACE_STREAM_CPP
4 #include "ace/Stream.h"
6 #if !defined (ACE_LACKS_PRAGMA_ONCE)
7 # pragma once
8 #endif /* ACE_LACKS_PRAGMA_ONCE */
10 #include "ace/Stream_Modules.h"
11 #include "ace/OS_NS_string.h"
13 #if !defined (__ACE_INLINE__)
14 #include "ace/Stream.inl"
15 #endif /* __ACE_INLINE__ */
17 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
19 ACE_ALLOC_HOOK_DEFINE_Tyc(ACE_Stream)
21 // Give some idea of what the heck is going on in a stream!
23 template <ACE_SYNCH_DECL, class TIME_POLICY> void
24 ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::dump () const
26 #if defined (ACE_HAS_DUMP)
27 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::dump");
28 ACELIB_DEBUG ((LM_DEBUG, ACE_TEXT ("-------- module links --------\n")));
30 for (ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *mp = this->stream_head_;
32 mp = mp->next ())
34 ACELIB_DEBUG ((LM_DEBUG, ACE_TEXT ("module name = %s\n"), mp->name ()));
35 if (mp == this->stream_tail_)
36 break;
39 ACELIB_DEBUG ((LM_DEBUG, ACE_TEXT ("-------- writer links --------\n")));
41 ACE_Task<ACE_SYNCH_USE, TIME_POLICY> *tp;
43 for (tp = this->stream_head_->writer ();
45 tp = tp->next ())
47 ACELIB_DEBUG ((LM_DEBUG, ACE_TEXT ("writer queue name = %s\n"), tp->name ()));
48 tp->dump ();
49 ACELIB_DEBUG ((LM_DEBUG, ACE_TEXT ("-------\n")));
50 if (tp == this->stream_tail_->writer ()
51 || (this->linked_us_
52 && tp == this->linked_us_->stream_head_->reader ()))
53 break;
56 ACELIB_DEBUG ((LM_DEBUG, ACE_TEXT ("-------- reader links --------\n")));
57 for (tp = this->stream_tail_->reader (); ; tp = tp->next ())
59 ACELIB_DEBUG ((LM_DEBUG, ACE_TEXT ("reader queue name = %s\n"), tp->name ()));
60 tp->dump ();
61 ACELIB_DEBUG ((LM_DEBUG, ACE_TEXT ("-------\n")));
62 if (tp == this->stream_head_->reader ()
63 || (this->linked_us_
64 && tp == this->linked_us_->stream_head_->writer ()))
65 break;
67 #endif /* ACE_HAS_DUMP */
70 template <ACE_SYNCH_DECL, class TIME_POLICY> int
71 ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::push (ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *new_top)
73 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::push");
74 if (this->push_module (new_top,
75 this->stream_head_->next (),
76 this->stream_head_) == -1)
77 return -1;
78 else
79 return 0;
82 template <ACE_SYNCH_DECL, class TIME_POLICY> int
83 ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::put (ACE_Message_Block *mb, ACE_Time_Value *tv)
85 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::put");
86 return this->stream_head_->writer ()->put (mb, tv);
89 template <ACE_SYNCH_DECL, class TIME_POLICY> int
90 ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::get (ACE_Message_Block *&mb, ACE_Time_Value *tv)
92 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::get");
93 return this->stream_head_->reader ()->getq (mb, tv);
96 // Return the "top" ACE_Module in a ACE_Stream, skipping over the
97 // stream_head.
99 template <ACE_SYNCH_DECL, class TIME_POLICY> int
100 ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::top (ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *&m)
102 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::top");
103 if (this->stream_head_->next () == this->stream_tail_)
104 return -1;
105 else
107 m = this->stream_head_->next ();
108 return 0;
112 template <ACE_SYNCH_DECL, class TIME_POLICY> int
113 ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::insert (const ACE_TCHAR *prev_name,
114 ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *mod)
116 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::insert");
118 for (ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *prev_mod = this->stream_head_;
119 prev_mod != 0;
120 prev_mod = prev_mod->next ())
121 if (ACE_OS::strcmp (prev_mod->name (), prev_name) == 0)
123 ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *next_mod = prev_mod->next ();
125 // We can't insert a module below <stream_tail_>.
126 if (next_mod == 0)
127 return -1;
129 mod->link (next_mod);
130 prev_mod->link (mod);
132 if (mod->reader ()->open (mod->arg ()) == -1)
133 return -1;
135 if (mod->writer ()->open (mod->arg ()) == -1)
136 return -1;
138 return 0;
141 return -1;
144 template <ACE_SYNCH_DECL, class TIME_POLICY> int
145 ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::replace (const ACE_TCHAR *replace_name,
146 ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *mod,
147 int flags)
149 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::replace");
150 ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *prev_mod = 0;
152 for (ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *rep_mod = this->stream_head_;
153 rep_mod != 0;
154 rep_mod = rep_mod->next ())
155 if (ACE_OS::strcmp (rep_mod->name (), replace_name) == 0)
157 ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *next_mod = rep_mod->next ();
159 if (next_mod)
160 mod->link (next_mod);
161 else // In case the <next_mod> is <stream_tail_>.
163 mod->writer ()->next (0);
164 mod->next (0);
165 this->stream_tail_ = mod;
168 if (prev_mod)
169 prev_mod->link (mod);
170 else // In case the <rep_mod> is <stream_head_>.
172 mod->reader ()->next (0);
173 this->stream_head_ = mod;
176 if (mod->reader ()->open (mod->arg ()) == -1)
177 return -1;
179 if (mod->writer ()->open (mod->arg ()) == -1)
180 return -1;
182 if (flags != ACE_Module<ACE_SYNCH_USE, TIME_POLICY>::M_DELETE_NONE)
184 rep_mod->close (flags);
185 delete rep_mod;
188 return 0;
190 else
191 prev_mod = rep_mod;
193 return -1;
196 // Remove the "top" ACE_Module in a ACE_Stream, skipping over the
197 // stream_head.
199 template <ACE_SYNCH_DECL, class TIME_POLICY> int
200 ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::pop (int flags)
202 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::pop");
203 if (this->stream_head_->next () == this->stream_tail_)
204 return -1;
205 else
207 // Skip over the ACE_Stream head.
208 ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *top_mod = this->stream_head_->next ();
209 ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *new_top = top_mod->next ();
211 this->stream_head_->next (new_top);
213 // Close the top ACE_Module.
215 top_mod->close (flags);
217 // Don't delete the Module unless the flags request this.
218 if (flags != ACE_Module<ACE_SYNCH_USE, TIME_POLICY>::M_DELETE_NONE)
219 delete top_mod;
221 this->stream_head_->writer ()->next (new_top->writer ());
222 new_top->reader ()->next (this->stream_head_->reader ());
223 return 0;
227 // Remove a named ACE_Module from an arbitrary place in the
228 // ACE_Stream.
230 template <ACE_SYNCH_DECL, class TIME_POLICY> int
231 ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::remove (const ACE_TCHAR *name,
232 int flags)
234 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::remove");
235 ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *prev = 0;
237 for (ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *mod = this->stream_head_;
238 mod != 0;
239 mod = mod->next ())
241 #ifndef ACE_NLOGGING
242 if (ACE::debug ())
244 ACELIB_DEBUG ((LM_DEBUG,
245 ACE_TEXT ("ACE_Stream::remove - comparing existing module :%s: with :%s:\n"),
246 mod->name (),
247 name));
249 #endif
251 if (ACE_OS::strcmp (mod->name (), name) == 0)
253 if (prev == 0) // Deleting ACE_Stream Head
254 this->stream_head_->link (mod->next ());
255 else
256 prev->link (mod->next ());
258 // Close down the module.
259 mod->close (flags);
261 // Don't delete the Module unless the flags request this.
262 if (flags != ACE_Module<ACE_SYNCH_USE, TIME_POLICY>::M_DELETE_NONE)
264 // Release the memory.
265 delete mod;
268 return 0;
270 else
271 prev = mod;
274 ACELIB_DEBUG ((LM_WARNING, ACE_TEXT ("ACE_Stream::remove failed to find module with name %s to remove\n"),name));
275 return -1;
278 template <ACE_SYNCH_DECL, class TIME_POLICY> ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *
279 ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::find (const ACE_TCHAR *name)
281 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::find");
282 for (ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *mod = this->stream_head_;
283 mod != 0;
284 mod = mod->next ())
285 if (ACE_OS::strcmp (mod->name (), name) == 0)
286 return mod;
288 return 0;
291 // Actually push a module onto the stack...
293 template <ACE_SYNCH_DECL, class TIME_POLICY> int
294 ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::push_module (ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *new_top,
295 ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *current_top,
296 ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *head)
298 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::push_module");
299 ACE_Task<ACE_SYNCH_USE, TIME_POLICY> *nt_reader = new_top->reader ();
300 ACE_Task<ACE_SYNCH_USE, TIME_POLICY> *nt_writer = new_top->writer ();
301 ACE_Task<ACE_SYNCH_USE, TIME_POLICY> *ct_reader = 0;
302 ACE_Task<ACE_SYNCH_USE, TIME_POLICY> *ct_writer = 0;
304 if (current_top)
306 ct_reader = current_top->reader ();
307 ct_writer = current_top->writer ();
308 ct_reader->next (nt_reader);
311 nt_writer->next (ct_writer);
313 if (head)
315 if (head != new_top)
316 head->link (new_top);
318 else
319 nt_reader->next (0);
321 new_top->next (current_top);
323 if (nt_reader->open (new_top->arg ()) == -1)
324 return -1;
326 if (nt_writer->open (new_top->arg ()) == -1)
327 return -1;
328 return 0;
331 template <ACE_SYNCH_DECL, class TIME_POLICY> int
332 ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::open (void *a,
333 ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *head,
334 ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *tail)
336 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::open");
337 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
339 ACE_Task<ACE_SYNCH_USE, TIME_POLICY> *h1 = 0, *h2 = 0;
340 ACE_Task<ACE_SYNCH_USE, TIME_POLICY> *t1 = 0, *t2 = 0;
342 if (head == 0)
344 typedef ACE_Stream_Head<ACE_SYNCH_USE, TIME_POLICY> STREAM_HEAD_TYPE;
345 ACE_NEW_NORETURN (h1,
346 STREAM_HEAD_TYPE);
347 ACE_NEW_NORETURN (h2,
348 STREAM_HEAD_TYPE);
349 typedef ACE_Module<ACE_SYNCH_USE, TIME_POLICY> MODULE_TYPE;
350 ACE_NEW_NORETURN (head,
351 MODULE_TYPE (ACE_TEXT ("ACE_Stream_Head"),
352 h1, h2,
354 M_DELETE));
357 if (tail == 0)
359 typedef ACE_Stream_Tail<ACE_SYNCH_USE, TIME_POLICY> STREAM_TAIL_TYPE;
360 ACE_NEW_NORETURN (t1,
361 STREAM_TAIL_TYPE);
362 ACE_NEW_NORETURN (t2,
363 STREAM_TAIL_TYPE);
364 typedef ACE_Module<ACE_SYNCH_USE, TIME_POLICY> MODULE_TYPE;
365 ACE_NEW_NORETURN (tail,
366 MODULE_TYPE (ACE_TEXT ("ACE_Stream_Tail"),
367 t1, t2,
369 M_DELETE));
372 // Make sure *all* the allocation succeeded!
373 if ((head == 0 && (h1 == 0 || h2 == 0))
374 || (tail == 0 && (t1 == 0 || t2 == 0)))
376 delete h1;
377 delete h2;
378 delete t1;
379 delete t2;
380 delete head;
381 delete tail;
382 errno = ENOMEM;
383 return -1;
386 this->stream_head_ = head;
387 this->stream_tail_ = tail;
389 if (this->push_module (this->stream_tail_) == -1)
390 return -1;
391 else if (this->push_module (this->stream_head_,
392 this->stream_tail_,
393 this->stream_head_) == -1)
394 return -1;
396 return 0;
399 template <ACE_SYNCH_DECL, class TIME_POLICY> int
400 ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::close (int flags)
402 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::close");
403 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
405 if (this->stream_head_ != 0
406 && this->stream_tail_ != 0)
408 // Don't bother checking return value here.
409 this->unlink_i ();
411 int result = 0;
413 // Remove and cleanup all the intermediate modules.
415 while (this->stream_head_->next () != this->stream_tail_)
416 if (this->pop (flags) == -1)
417 result = -1;
419 // Clean up the head and tail of the stream.
420 if (this->stream_head_->close (flags) == -1)
421 result = -1;
422 if (this->stream_tail_->close (flags) == -1)
423 result = -1;
425 // Cleanup the memory.
426 delete this->stream_head_;
427 delete this->stream_tail_;
429 this->stream_head_ = 0;
430 this->stream_tail_ = 0;
432 // Tell all threads waiting on the close that we are done.
433 this->final_close_.broadcast ();
434 return result;
436 return 0;
439 template <ACE_SYNCH_DECL, class TIME_POLICY> int
440 ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::control (ACE_IO_Cntl_Msg::ACE_IO_Cntl_Cmds cmd,
441 void *a)
443 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::control");
444 ACE_IO_Cntl_Msg ioc (cmd);
446 ACE_Message_Block *db = 0;
448 // Try to create a data block that contains the user-supplied data.
449 ACE_NEW_RETURN (db,
450 ACE_Message_Block (sizeof (int),
451 ACE_Message_Block::MB_IOCTL,
453 (char *) a),
454 -1);
455 // Try to create a control block <cb> that contains the control
456 // field and a pointer to the data block <db> in <cb>'s continuation
457 // field.
458 ACE_Message_Block *cb = 0;
460 ACE_NEW_NORETURN (cb,
461 ACE_Message_Block (sizeof ioc,
462 ACE_Message_Block::MB_IOCTL,
464 (char *) &ioc));
465 // @@ Michael: The old semantic assumed that cb returns == 0
466 // if no memory was available. We will now return immediately
467 // without release (errno is set to ENOMEM by the macro).
469 // If we can't allocate <cb> then we need to delete db and return
470 // -1.
471 if (cb == 0)
473 db->release ();
474 errno = ENOMEM;
475 return -1;
478 int result;
480 if (this->stream_head_->writer ()->put (cb) == -1)
481 result = -1;
482 else if (this->stream_head_->reader ()->getq (cb) == -1)
483 result = -1;
484 else
485 result = ((ACE_IO_Cntl_Msg *) cb->rd_ptr ())->rval ();
487 // This will also release db if it's reference count == 0.
488 cb->release ();
490 return result;
493 // Link two streams together at their bottom-most Modules (i.e., the
494 // one just above the Stream tail). Note that all of this is premised
495 // on the fact that the Stream head and Stream tail are non-NULL...
496 // This must be called with locks held.
498 template <ACE_SYNCH_DECL, class TIME_POLICY> int
499 ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::link_i (ACE_Stream<ACE_SYNCH_USE, TIME_POLICY> &us)
501 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::link_i");
502 this->linked_us_ = &us;
503 // Make sure the other side is also linked to us!
504 us.linked_us_ = this;
506 ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *my_tail = this->stream_head_;
508 if (my_tail == 0)
509 return -1;
511 // Locate the module just above our Stream tail.
512 while (my_tail->next () != this->stream_tail_)
513 my_tail = my_tail->next ();
515 ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *other_tail = us.stream_head_;
517 if (other_tail == 0)
518 return -1;
520 // Locate the module just above the other Stream's tail.
521 while (other_tail->next () != us.stream_tail_)
522 other_tail = other_tail->next ();
524 // Reattach the pointers so that the two streams are linked!
525 my_tail->writer ()->next (other_tail->reader ());
526 other_tail->writer ()->next (my_tail->reader ());
527 return 0;
530 template <ACE_SYNCH_DECL, class TIME_POLICY> int
531 ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::link (ACE_Stream<ACE_SYNCH_USE, TIME_POLICY> &us)
533 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::link");
535 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
537 return this->link_i (us);
540 // Must be called with locks held...
542 template <ACE_SYNCH_DECL, class TIME_POLICY> int
543 ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::unlink_i ()
545 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::unlink_i");
547 // Only try to unlink if we are in fact still linked!
549 if (this->linked_us_ != 0)
551 ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *my_tail = this->stream_head_;
553 // Only relink if we still exist!
554 if (my_tail)
556 // Find the module that's just before our stream tail.
557 while (my_tail->next () != this->stream_tail_)
558 my_tail = my_tail->next ();
560 // Restore the writer's next() link to our tail.
561 my_tail->writer ()->next (this->stream_tail_->writer ());
564 ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *other_tail =
565 this->linked_us_->stream_head_;
567 // Only fiddle with the other side if it in fact still remains.
568 if (other_tail != 0)
570 while (other_tail->next () != this->linked_us_->stream_tail_)
571 other_tail = other_tail->next ();
573 other_tail->writer ()->next (this->linked_us_->stream_tail_->writer ());
576 // Make sure the other side is also aware that it's been unlinked!
577 this->linked_us_->linked_us_ = 0;
579 this->linked_us_ = 0;
580 return 0;
582 else
583 return -1;
586 template <ACE_SYNCH_DECL, class TIME_POLICY> int
587 ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::unlink ()
589 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::unlink");
590 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
591 return this->unlink_i ();
594 template <ACE_SYNCH_DECL, class TIME_POLICY>
595 ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::ACE_Stream (void * a,
596 ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *head,
597 ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *tail)
598 : stream_head_ (0),
599 stream_tail_ (0),
600 linked_us_ (0),
601 #if defined (ACE_HAS_THREADS)
602 final_close_ (lock_, cond_attr_)
603 #else
604 final_close_ (lock_)
605 #endif
607 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::ACE_Stream");
608 if (this->open (a, head, tail) == -1)
609 ACELIB_ERROR ((LM_ERROR,
610 ACE_TEXT ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::open (%s, %s)\n"),
611 head->name (), tail->name ()));
614 template <ACE_SYNCH_DECL, class TIME_POLICY>
615 ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::~ACE_Stream ()
617 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE, TIME_POLICY>::~ACE_Stream");
619 if (this->stream_head_ != 0)
620 this->close ();
623 template <ACE_SYNCH_DECL, class TIME_POLICY>
624 ACE_Stream_Iterator<ACE_SYNCH_USE, TIME_POLICY>::ACE_Stream_Iterator (const ACE_Stream<ACE_SYNCH_USE, TIME_POLICY> &sr)
625 : next_ (sr.stream_head_)
627 ACE_TRACE ("ACE_Stream_Iterator<ACE_SYNCH_USE, TIME_POLICY>::ACE_Stream_Iterator");
630 ACE_END_VERSIONED_NAMESPACE_DECL
632 #endif /* ACE_STREAM_CPP */