1 #include "publisher_impl.h"
2 #include "ace/OS_NS_sys_time.h"
4 #include "ace/Process_Mutex.h"
5 #include "ace/OS_NS_unistd.h"
12 Subscriber_var subscriber
;
// Worker: nested helper class of Publisher_impl, derived from ACE_Task_Base.
// The members visible here are the list of registered subscribers, a mutex,
// and a pointer back to the owning Publisher_impl.
// NOTE(review): only part of the class declaration is visible in this listing.
17 class Publisher_impl::Worker
: public ACE_Task_Base
// Constructed with a pointer to the owning Publisher_impl.
20 Worker(Publisher_impl
* _owner
);
// Registers a subscriber object reference with this worker.
22 void addSubscriber(Subscriber_ptr subscriber
);
// Registered subscribers, one _Subscriber entry each.
27 vector
<_Subscriber
> subscribers
;
// Mutex taken (via the ACE guard macros) by the member functions that access `subscribers`.
28 TAO_SYNCH_MUTEX mutex
;
// Pointer to the owning Publisher_impl.
29 Publisher_impl
* owner
;
32 Publisher_impl::Worker::Worker(Publisher_impl
* _owner
)
38 Publisher_impl::Worker::~Worker()
// Adds a subscriber to the worker's subscriber list.
// The new entry starts with unsubscribed == false and holds its own
// duplicated reference to the subscriber object.
43 void Publisher_impl::Worker::addSubscriber(Subscriber_ptr subscriber
)
// Hold this->mutex (ACE_GUARD) while the subscribers vector is modified.
45 ACE_GUARD (TAO_SYNCH_MUTEX
, ace_mon
, this->mutex
);
// Append a default-constructed entry, then fill it in through a reference
// to the element that was just added.
46 subscribers
.push_back(_Subscriber());
47 _Subscriber
& s
= subscribers
.back();
48 s
.unsubscribed
= false;
// Store a duplicate of the reference (Subscriber::_duplicate) rather than
// the caller's pointer itself.
49 s
.subscriber
= Subscriber::_duplicate(subscriber
);
// Worker::svc(): pushes data to every active subscriber and, every 200th
// iteration, calls isAlive() on each of them.
// NOTE(review): this listing omits some original lines (the embedded line
// numbers skip), so the enclosing loop and the branch/exception structure
// around the statements below are not fully visible — confirm against the
// complete file before relying on these comments.
53 int Publisher_impl::Worker::svc ()
56 bool doShutdown
= false;
57 unsigned long iteration
= 0;
// Take this->mutex for the publish pass; per the ACE_GUARD_RETURN macro,
// svc() returns 0 if the guard cannot acquire the lock.
65 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX
, ace_mon
, this->mutex
, 0);
// Initial value: true when at least one subscriber has been registered.
// NOTE(review): any later reset of doShutdown is not visible in this excerpt.
66 doShutdown
= subscribers
.size() > 0;
// Visit every registered subscriber entry.
67 for (vector
<_Subscriber
>::iterator iter
= subscribers
.begin();
68 iter
!= subscribers
.end(); ++iter
)
// Only entries not yet marked unsubscribed are processed.
70 if (!iter
->unsubscribed
)
// Deliver the data only through a non-nil object reference.
75 if (!CORBA::is_nil(iter
->subscriber
.in ()))
76 iter
->subscriber
->onData(data
);
// Mark the entry unsubscribed — presumably the nil-reference branch; TODO confirm.
78 iter
->unsubscribed
= true;
// Mark the entry unsubscribed — presumably the failure path of onData(); TODO confirm.
83 iter
->unsubscribed
= true;
// Every 200th iteration, make a second pass over the subscribers.
88 if (iteration
% 200 == 0)
// This pass takes this->mutex again through its own guard.
90 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX
, ace_mon
, this->mutex
, 0);
91 for (vector
<_Subscriber
>::iterator iter
= subscribers
.begin();
92 iter
!= subscribers
.end(); ++iter
)
// Skip entries already marked unsubscribed.
94 if (!iter
->unsubscribed
)
// Invoke isAlive() on the subscriber; its result is not used here.
98 iter
->subscriber
->isAlive();
// Mark the entry unsubscribed — presumably when isAlive() fails; TODO confirm.
102 iter
->unsubscribed
= true;
115 void Publisher_impl::Worker::terminate()
// Constructs the publisher servant.
// Keeps its own duplicated reference to the ORB in orb_ and creates the
// Worker, handing it a pointer back to this object.
// NOTE(review): the Worker is allocated with a raw `new`; where it is
// released is not visible in this excerpt.
120 Publisher_impl::Publisher_impl(CORBA::ORB_ptr orb
)
121 : orb_ (CORBA::ORB::_duplicate (orb
))
123 worker
= new Worker(this);
// Destructor: waits on the worker's thread manager (thr_mgr()->wait()).
// NOTE(review): any further cleanup (e.g. releasing `worker`) is not visible
// in this excerpt.
127 Publisher_impl::~Publisher_impl()
130 worker
->thr_mgr()->wait();
134 void subscribe (::Subscriber_ptr subscriber
);
// Registers a subscriber by forwarding the reference to the worker's
// addSubscriber().
137 Publisher_impl::subscribe(
138 ::Subscriber_ptr subscriber
141 worker
->addSubscriber(subscriber
);
// Requests shutdown of the ORB held in orb_.
// The `false` argument is ORB::shutdown's wait_for_completion flag, so the
// call presumably does not block until shutdown completes — confirm against
// the CORBA::ORB::shutdown documentation.
145 Publisher_impl::shutdown (
148 this->orb_
->shutdown (false);