Merge pull request #2301 from sonndinh/remove-dup-reactor-functions
[ACE_TAO.git] / TAO / tests / Bug_2417_Regression / publisher_impl.cpp
blob19e887398db1729283ad4413cbdf435163b9d3a1
#include "publisher_impl.h"
#include "ace/OS_NS_sys_time.h"
#include "ace/Task.h"
#include "ace/Process_Mutex.h"
#include "ace/OS_NS_unistd.h"
#include <atomic>
#include <vector>
8 using namespace std;
10 struct _Subscriber
12 Subscriber_var subscriber;
13 bool unsubscribed;
14 unsigned int count;
17 class Publisher_impl::Worker : public ACE_Task_Base
19 public:
20 Worker(Publisher_impl * _owner);
21 ~Worker();
22 void addSubscriber(Subscriber_ptr subscriber);
23 virtual int svc ();
24 void terminate();
25 private:
26 bool terminated;
27 vector<_Subscriber> subscribers;
28 TAO_SYNCH_MUTEX mutex;
29 Publisher_impl * owner;
32 Publisher_impl::Worker::Worker(Publisher_impl * _owner)
33 : owner(_owner)
35 terminated = false;
38 Publisher_impl::Worker::~Worker()
40 terminated = true;
43 void Publisher_impl::Worker::addSubscriber(Subscriber_ptr subscriber)
45 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->mutex);
46 subscribers.push_back(_Subscriber());
47 _Subscriber& s = subscribers.back();
48 s.unsubscribed = false;
49 s.subscriber = Subscriber::_duplicate(subscriber);
50 s.count = 0;
53 int Publisher_impl::Worker::svc ()
55 double data = 0.0;
56 bool doShutdown = false;
57 unsigned long iteration = 0;
58 ACE_Time_Value tv;
59 tv.set(0.01);
60 while (!terminated)
62 data += 0.01;
63 ++iteration;
65 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->mutex, 0);
66 doShutdown = subscribers.size() > 0;
67 for (vector<_Subscriber>::iterator iter = subscribers.begin();
68 iter != subscribers.end(); ++iter)
70 if (!iter->unsubscribed)
72 doShutdown = false;
73 try
75 if (!CORBA::is_nil(iter->subscriber.in ()))
76 iter->subscriber->onData(data);
77 else
78 iter->unsubscribed = true;
79 ++iter->count;
81 catch (...)
83 iter->unsubscribed = true;
88 if (iteration % 200 == 0)
90 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->mutex, 0);
91 for (vector<_Subscriber>::iterator iter = subscribers.begin();
92 iter != subscribers.end(); ++iter)
94 if (!iter->unsubscribed)
96 try
98 iter->subscriber->isAlive();
100 catch (...)
102 iter->unsubscribed = true;
107 if (doShutdown)
108 owner->shutdown();
109 else
110 ACE_OS::sleep(tv);
112 return 0;
115 void Publisher_impl::Worker::terminate()
117 terminated = true;
120 Publisher_impl::Publisher_impl(CORBA::ORB_ptr orb)
121 : orb_ (CORBA::ORB::_duplicate (orb))
123 worker = new Worker(this);
124 worker->activate();
127 Publisher_impl::~Publisher_impl()
129 worker->terminate();
130 worker->thr_mgr()->wait();
131 delete worker;
134 void subscribe (::Subscriber_ptr subscriber);
136 void
137 Publisher_impl::subscribe(
138 ::Subscriber_ptr subscriber
141 worker->addSubscriber(subscriber);
144 void
145 Publisher_impl::shutdown (
148 this->orb_->shutdown (false);
149 worker->terminate();