1 // **********************************************************************
3 // Copyright (c) 2003-2011 ZeroC, Inc. All rights reserved.
5 // This copy of Ice is licensed to you under the terms described in the
6 // ICE_LICENSE file included in this distribution.
8 // **********************************************************************
10 #include <IceUtil/DisableWarnings.h>
11 #include <IceStorm/TopicI.h>
12 #include <IceStorm/Instance.h>
13 #include <IceStorm/Subscriber.h>
14 #include <IceStorm/TraceLevels.h>
15 #include <IceStorm/NodeI.h>
16 #include <IceStorm/Observers.h>
17 #include <IceStorm/DB.h>
18 #include <IceStorm/Util.h>
19 #include <Ice/LoggerUtil.h>
23 using namespace IceStorm
;
24 using namespace IceStormElection
;
26 using namespace IceDB
;
32 halt(const Ice::CommunicatorPtr
& com
, const DatabaseException
& ex
)
35 Ice::Error
error(com
->getLogger());
36 error
<< "fatal exception: " << ex
<< "\n*** Aborting application ***";
43 // The servant has a 1-1 association with a topic. It is used to
44 // receive events from Publishers.
46 class PublisherI
: public Ice::BlobjectArray
50 PublisherI(const TopicImplPtr
& topic
, const InstancePtr
& instance
) :
51 _topic(topic
), _instance(instance
)
57 //cout << "~PublisherI" << endl;
61 ice_invoke(const pair
<const Ice::Byte
*, const Ice::Byte
*>& inParams
,
63 const Ice::Current
& current
)
65 // The publish call does a cached read.
66 EventDataPtr event
= new EventData(
73 // COMPILERBUG: gcc 4.0.1 doesn't like this.
75 //event->data.swap(Ice::ByteSeq(inParams.first, inParams.second));
76 Ice::ByteSeq
data(inParams
.first
, inParams
.second
);
77 event
->data
.swap(data
);
81 _topic
->publish(false, v
);
88 const TopicImplPtr _topic
;
89 const InstancePtr _instance
;
93 // The servant has a 1-1 association with a topic. It is used to
94 // receive events from linked Topics.
96 class TopicLinkI
: public TopicLink
100 TopicLinkI(const TopicImplPtr
& impl
, const InstancePtr
& instance
) :
101 _impl(impl
), _instance(instance
)
107 //cout << "~TopicLinkI" << endl;
111 forward(const EventDataSeq
& v
, const Ice::Current
& current
)
113 // The publish call does a cached read.
114 _impl
->publish(true, v
);
119 const TopicImplPtr _impl
;
120 const InstancePtr _instance
;
123 class TopicI
: public TopicInternal
127 TopicI(const TopicImplPtr
& impl
, const InstancePtr
& instance
) :
128 _impl(impl
), _instance(instance
)
134 //cout << "~TopicI" << endl;
137 virtual string
getName(const Ice::Current
&) const
140 CachedReadHelper
unlock(_instance
->node(), __FILE__
, __LINE__
);
141 return _impl
->getName();
144 virtual Ice::ObjectPrx
getPublisher(const Ice::Current
&) const
147 CachedReadHelper
unlock(_instance
->node(), __FILE__
, __LINE__
);
148 return _impl
->getPublisher();
151 virtual Ice::ObjectPrx
getNonReplicatedPublisher(const Ice::Current
&) const
154 CachedReadHelper
unlock(_instance
->node(), __FILE__
, __LINE__
);
155 return _impl
->getNonReplicatedPublisher();
158 virtual void subscribe(const QoS
& qos
, const Ice::ObjectPrx
& obj
, const Ice::Current
& current
)
162 Ice::Long generation
= -1;
163 TopicPrx master
= getMasterFor(current
, generation
, __FILE__
, __LINE__
);
168 master
->subscribe(qos
, obj
);
170 catch(const Ice::ConnectFailedException
&)
172 _instance
->node()->recovery(generation
);
175 catch(const Ice::TimeoutException
&)
177 _instance
->node()->recovery(generation
);
183 FinishUpdateHelper
unlock(_instance
->node());
184 _impl
->subscribe(qos
, obj
);
190 virtual Ice::ObjectPrx
subscribeAndGetPublisher(const QoS
& qos
, const Ice::ObjectPrx
& obj
,
191 const Ice::Current
& current
)
195 Ice::Long generation
= -1;
196 TopicPrx master
= getMasterFor(current
, generation
, __FILE__
, __LINE__
);
201 return master
->subscribeAndGetPublisher(qos
, obj
);
203 catch(const Ice::ConnectFailedException
&)
205 _instance
->node()->recovery(generation
);
208 catch(const Ice::TimeoutException
&)
210 _instance
->node()->recovery(generation
);
216 FinishUpdateHelper
unlock(_instance
->node());
217 return _impl
->subscribeAndGetPublisher(qos
, obj
);
222 virtual void unsubscribe(const Ice::ObjectPrx
& subscriber
, const Ice::Current
& current
)
226 Ice::Long generation
= -1;
227 TopicPrx master
= getMasterFor(current
, generation
, __FILE__
, __LINE__
);
232 master
->unsubscribe(subscriber
);
234 catch(const Ice::ConnectFailedException
&)
236 _instance
->node()->recovery(generation
);
239 catch(const Ice::TimeoutException
&)
241 _instance
->node()->recovery(generation
);
247 FinishUpdateHelper
unlock(_instance
->node());
248 _impl
->unsubscribe(subscriber
);
254 virtual TopicLinkPrx
getLinkProxy(const Ice::Current
&)
257 CachedReadHelper
unlock(_instance
->node(), __FILE__
, __LINE__
);
258 return _impl
->getLinkProxy();
261 virtual void reap(const Ice::IdentitySeq
& ids
, const Ice::Current
& current
)
263 NodeIPtr node
= _instance
->node();
264 if(!node
->updateMaster(__FILE__
, __LINE__
))
266 throw ReapWouldBlock();
268 FinishUpdateHelper
unlock(node
);
272 virtual void link(const TopicPrx
& topic
, Ice::Int cost
, const Ice::Current
& current
)
276 Ice::Long generation
= -1;
277 TopicPrx master
= getMasterFor(current
, generation
, __FILE__
, __LINE__
);
282 master
->link(topic
, cost
);
284 catch(const Ice::ConnectFailedException
&)
286 _instance
->node()->recovery(generation
);
289 catch(const Ice::TimeoutException
&)
291 _instance
->node()->recovery(generation
);
297 FinishUpdateHelper
unlock(_instance
->node());
298 _impl
->link(topic
, cost
);
304 virtual void unlink(const TopicPrx
& topic
, const Ice::Current
& current
)
308 Ice::Long generation
= -1;
309 TopicPrx master
= getMasterFor(current
, generation
, __FILE__
, __LINE__
);
314 master
->unlink(topic
);
316 catch(const Ice::ConnectFailedException
&)
318 _instance
->node()->recovery(generation
);
321 catch(const Ice::TimeoutException
&)
323 _instance
->node()->recovery(generation
);
329 FinishUpdateHelper
unlock(_instance
->node());
330 _impl
->unlink(topic
);
336 virtual LinkInfoSeq
getLinkInfoSeq(const Ice::Current
&) const
339 CachedReadHelper
unlock(_instance
->node(), __FILE__
, __LINE__
);
340 return _impl
->getLinkInfoSeq();
343 virtual void destroy(const Ice::Current
& current
)
347 Ice::Long generation
= -1;
348 TopicPrx master
= getMasterFor(current
, generation
, __FILE__
, __LINE__
);
355 catch(const Ice::ConnectFailedException
&)
357 _instance
->node()->recovery(generation
);
360 catch(const Ice::TimeoutException
&)
362 _instance
->node()->recovery(generation
);
368 FinishUpdateHelper
unlock(_instance
->node());
377 TopicPrx
getMasterFor(const Ice::Current
& cur
, Ice::Long
& generation
, const char* file
, int line
) const
379 NodeIPtr node
= _instance
->node();
380 Ice::ObjectPrx master
;
383 master
= _instance
->node()->startUpdate(generation
, file
, line
);
385 return (master
) ? TopicPrx::uncheckedCast(master
->ice_identity(cur
.id
)) : TopicPrx();
388 const TopicImplPtr _impl
;
389 const InstancePtr _instance
;
396 extern string
identityToTopicName(const Ice::Identity
& id
);
399 TopicImpl::TopicImpl(
400 const InstancePtr
& instance
,
402 const Ice::Identity
& id
,
403 const SubscriberRecordSeq
& subscribers
) :
407 _databaseCache(instance
->databaseCache()),
414 // TODO: If we want to improve the performance of the
415 // non-replicated case we could allocate a null-topic impl here.
416 _servant
= new TopicI(this, instance
);
419 // Create a servant per topic to receive event data. If the
420 // category is empty then we are in backwards compatibility
421 // mode. In this case the servant's identity is
422 // category=<topicname>, name=publish, otherwise the name is
423 // <instancename>/<topicname>.publish. The same applies to the
426 // Activate the object and save a reference to give to publishers.
429 Ice::Identity linkid
;
430 if(id
.category
.empty())
432 pubid
.category
= _name
;
433 pubid
.name
= "publish";
434 linkid
.category
= _name
;
435 linkid
.name
= "link";
439 pubid
.category
= id
.category
;
440 pubid
.name
= _name
+ ".publish";
441 linkid
.category
= id
.category
;
442 linkid
.name
= _name
+ ".link";
445 _publisherPrx
= _instance
->publishAdapter()->add(new PublisherI(this, instance
), pubid
);
446 _linkPrx
= TopicLinkPrx::uncheckedCast(
447 _instance
->publishAdapter()->add(new TopicLinkI(this, instance
), linkid
));
450 // Re-establish subscribers.
452 for(SubscriberRecordSeq::const_iterator p
= subscribers
.begin(); p
!= subscribers
.end(); ++p
)
454 Ice::Identity id
= p
->obj
->ice_getIdentity();
455 TraceLevelsPtr traceLevels
= _instance
->traceLevels();
456 if(traceLevels
->topic
> 0)
458 Ice::Trace
out(traceLevels
->logger
, traceLevels
->topicCat
);
459 out
<< _name
<< " recreate " << _instance
->communicator()->identityToString(id
);
460 if(traceLevels
->topic
> 1)
462 out
<< " endpoints: " << IceStormInternal::describeEndpoints(p
->obj
);
469 // Create the subscriber object add it to the set of
472 SubscriberPtr subscriber
= Subscriber::create(_instance
, *p
);
473 _subscribers
.push_back(subscriber
);
475 catch(const Ice::Exception
& ex
)
477 Ice::Warning
out(traceLevels
->logger
);
478 out
<< _name
<< " recreate " << _instance
->communicator()->identityToString(id
);
479 if(traceLevels
->topic
> 1)
481 out
<< " endpoints: " << IceStormInternal::describeEndpoints(p
->obj
);
483 out
<< " failed: " << ex
;
490 __setNoDelete(false);
493 __setNoDelete(false);
496 TopicImpl::~TopicImpl()
498 //cout << "~TopicImpl" << endl;
502 TopicImpl::getName() const
509 TopicImpl::getPublisher() const
512 if(_instance
->publisherReplicaProxy())
514 return _instance
->publisherReplicaProxy()->ice_identity(_publisherPrx
->ice_getIdentity());
516 return _publisherPrx
;
520 TopicImpl::getNonReplicatedPublisher() const
522 // If there is an adapter id configured then we're using icegrid
523 // so create an indirect proxy, otherwise create a direct proxy.
524 if(!_publisherPrx
->ice_getAdapterId().empty())
526 return _instance
->publishAdapter()->createIndirectProxy(_publisherPrx
->ice_getIdentity());
530 return _instance
->publishAdapter()->createDirectProxy(_publisherPrx
->ice_getIdentity());
535 // COMPILERFIX: For some reason with VC6 find reports an error.
537 #if defined(_MSC_VER) && (_MSC_VER < 1300)
540 vector
<SubscriberPtr
>::iterator
541 find(vector
<SubscriberPtr
>::iterator start
, vector
<SubscriberPtr
>::iterator end
, const Ice::Identity
& ident
)
559 trace(Ice::Trace
& out
, const InstancePtr
& instance
, const vector
<SubscriberPtr
>& s
)
562 for(vector
<SubscriberPtr
>::const_iterator p
= s
.begin(); p
!= s
.end(); ++p
)
568 out
<< instance
->communicator()->identityToString((*p
)->id());
575 TopicImpl::subscribe(const QoS
& origQoS
, const Ice::ObjectPrx
& obj
)
577 Ice::Identity id
= obj
->ice_getIdentity();
578 TraceLevelsPtr traceLevels
= _instance
->traceLevels();
580 if(traceLevels
->topic
> 0)
582 Ice::Trace
out(traceLevels
->logger
, traceLevels
->topicCat
);
583 out
<< _name
<< ": subscribe: " << _instance
->communicator()->identityToString(id
);
585 if(traceLevels
->topic
> 1)
587 out
<< " endpoints: " << IceStormInternal::describeEndpoints(obj
)
589 for(QoS::const_iterator p
= qos
.begin(); p
!= qos
.end() ; ++p
)
595 out
<< '[' << p
->first
<< "," << p
->second
<< ']';
597 out
<< " subscriptions: ";
598 trace(out
, _instance
, _subscribers
);
602 string reliability
= "oneway";
604 QoS::iterator p
= qos
.find("reliability");
607 reliability
= p
->second
;
612 Ice::ObjectPrx newObj
= obj
;
613 if(reliability
== "batch")
615 if(newObj
->ice_isDatagram())
617 newObj
= newObj
->ice_batchDatagram();
621 newObj
= newObj
->ice_batchOneway();
624 else if(reliability
== "twoway")
626 newObj
= newObj
->ice_twoway();
628 else if(reliability
== "twoway ordered")
630 qos
["reliability"] = "ordered";
631 newObj
= newObj
->ice_twoway();
633 else // reliability == "oneway"
635 if(reliability
!= "oneway" && traceLevels
->subscriber
> 0)
637 Ice::Trace
out(traceLevels
->logger
, traceLevels
->subscriberCat
);
638 out
<< reliability
<<" mode not understood.";
640 if(!newObj
->ice_isDatagram())
642 newObj
= newObj
->ice_oneway();
646 IceUtil::Mutex::Lock
sync(_subscribersMutex
);
647 SubscriberRecord record
;
651 record
.topicName
= _name
;
657 vector
<SubscriberPtr
>::iterator p
= find(_subscribers
.begin(), _subscribers
.end(), record
.id
);
658 if(p
!= _subscribers
.end())
660 // If we already have this subscriber remove it from our
661 // subscriber list and remove it from the database.
663 _subscribers
.erase(p
);
669 DatabaseConnectionPtr connection
= _databaseCache
->getConnection();
670 TransactionHolder
txn(connection
);
672 SubscriberRecordKey key
;
676 SubscribersWrapperPtr subscribersWrapper
= _databaseCache
->getSubscribers(connection
);
677 subscribersWrapper
->erase(key
);
679 LLUWrapperPtr lluWrapper
= _databaseCache
->getLLU(connection
);
680 llu
= lluWrapper
->get();
682 lluWrapper
->put(llu
);
687 catch(const DeadlockException
&)
691 catch(const DatabaseException
& ex
)
693 halt(_instance
->communicator(), ex
);
696 Ice::IdentitySeq ids
;
698 _instance
->observers()->removeSubscriber(llu
, _name
, ids
);
701 SubscriberPtr subscriber
= Subscriber::create(_instance
, record
);
706 DatabaseConnectionPtr connection
= _databaseCache
->getConnection();
707 TransactionHolder
txn(connection
);
709 SubscriberRecordKey key
;
711 key
.id
= subscriber
->id();
713 SubscribersWrapperPtr subscribersWrapper
= _databaseCache
->getSubscribers(connection
);
714 subscribersWrapper
->put(key
, record
);
717 LLUWrapperPtr lluWrapper
= _databaseCache
->getLLU(connection
);
718 llu
= lluWrapper
->get();
720 lluWrapper
->put(llu
);
725 catch(const DeadlockException
&)
729 catch(const DatabaseException
& ex
)
731 halt(_instance
->communicator(), ex
);
735 _subscribers
.push_back(subscriber
);
737 _instance
->observers()->addSubscriber(llu
, _name
, record
);
741 TopicImpl::subscribeAndGetPublisher(const QoS
& qos
, const Ice::ObjectPrx
& obj
)
743 Ice::Identity id
= obj
->ice_getIdentity();
745 TraceLevelsPtr traceLevels
= _instance
->traceLevels();
746 if(traceLevels
->topic
> 0)
748 Ice::Trace
out(traceLevels
->logger
, traceLevels
->topicCat
);
749 out
<< _name
<< ": subscribeAndGetPublisher: " << _instance
->communicator()->identityToString(id
);
751 if(traceLevels
->topic
> 1)
753 out
<< " endpoints: " << IceStormInternal::describeEndpoints(obj
)
755 for(QoS::const_iterator p
= qos
.begin(); p
!= qos
.end() ; ++p
)
763 out
<< " subscriptions: ";
764 trace(out
, _instance
, _subscribers
);
768 IceUtil::Mutex::Lock
sync(_subscribersMutex
);
770 SubscriberRecord record
;
774 record
.topicName
= _name
;
778 vector
<SubscriberPtr
>::iterator p
= find(_subscribers
.begin(), _subscribers
.end(), record
.id
);
779 if(p
!= _subscribers
.end())
781 throw AlreadySubscribed();
786 SubscriberPtr subscriber
= Subscriber::create(_instance
, record
);
791 DatabaseConnectionPtr connection
= _databaseCache
->getConnection();
792 TransactionHolder
txn(connection
);
794 SubscriberRecordKey key
;
796 key
.id
= subscriber
->id();
798 SubscribersWrapperPtr subscribersWrapper
= _databaseCache
->getSubscribers(connection
);
799 subscribersWrapper
->put(key
, record
);
801 LLUWrapperPtr lluWrapper
= _databaseCache
->getLLU(connection
);
802 llu
= lluWrapper
->get();
804 lluWrapper
->put(llu
);
809 catch(const DeadlockException
&)
813 catch(const DatabaseException
& ex
)
815 halt(_instance
->communicator(), ex
);
819 _subscribers
.push_back(subscriber
);
821 _instance
->observers()->addSubscriber(llu
, _name
, record
);
823 return subscriber
->proxy();
827 TopicImpl::unsubscribe(const Ice::ObjectPrx
& subscriber
)
829 TraceLevelsPtr traceLevels
= _instance
->traceLevels();
832 if(traceLevels
->topic
> 0)
834 Ice::Trace
out(traceLevels
->logger
, traceLevels
->topicCat
);
835 out
<< "unsubscribe with null subscriber.";
840 Ice::Identity id
= subscriber
->ice_getIdentity();
842 if(traceLevels
->topic
> 0)
844 Ice::Trace
out(traceLevels
->logger
, traceLevels
->topicCat
);
845 out
<< _name
<< ": unsubscribe: " << _instance
->communicator()->identityToString(id
);
847 if(traceLevels
->topic
> 1)
849 out
<< " endpoints: " << IceStormInternal::describeEndpoints(subscriber
);
850 trace(out
, _instance
, _subscribers
);
854 IceUtil::Mutex::Lock
sync(_subscribersMutex
);
855 Ice::IdentitySeq ids
;
857 removeSubscribers(ids
);
861 TopicImpl::getLinkProxy()
864 if(_instance
->publisherReplicaProxy())
866 return TopicLinkPrx::uncheckedCast(_instance
->publisherReplicaProxy()->ice_identity(
867 _linkPrx
->ice_getIdentity()));
873 TopicImpl::link(const TopicPrx
& topic
, Ice::Int cost
)
875 TopicInternalPrx internal
= TopicInternalPrx::uncheckedCast(topic
);
876 TopicLinkPrx link
= internal
->getLinkProxy();
878 TraceLevelsPtr traceLevels
= _instance
->traceLevels();
879 if(traceLevels
->topic
> 0)
881 Ice::Trace
out(traceLevels
->logger
, traceLevels
->topicCat
);
882 out
<< _name
<< ": link " << _instance
->communicator()->identityToString(topic
->ice_getIdentity())
886 IceUtil::Mutex::Lock
sync(_subscribersMutex
);
888 Ice::Identity id
= topic
->ice_getIdentity();
890 SubscriberRecord record
;
893 record
.theTopic
= topic
;
894 record
.topicName
= _name
;
898 vector
<SubscriberPtr
>::iterator p
= find(_subscribers
.begin(), _subscribers
.end(), record
.id
);
899 if(p
!= _subscribers
.end())
901 string name
= identityToTopicName(id
);
909 SubscriberPtr subscriber
= Subscriber::create(_instance
, record
);
915 DatabaseConnectionPtr connection
= _databaseCache
->getConnection();
916 TransactionHolder
txn(connection
);
918 SubscriberRecordKey key
;
922 SubscribersWrapperPtr subscribersWrapper
= _databaseCache
->getSubscribers(connection
);
923 subscribersWrapper
->put(key
, record
);
925 LLUWrapperPtr lluWrapper
= _databaseCache
->getLLU(connection
);
926 llu
= lluWrapper
->get();
928 lluWrapper
->put(llu
);
933 catch(const DeadlockException
&)
937 catch(const DatabaseException
& ex
)
939 halt(_instance
->communicator(), ex
);
943 _subscribers
.push_back(subscriber
);
945 _instance
->observers()->addSubscriber(llu
, _name
, record
);
949 TopicImpl::unlink(const TopicPrx
& topic
)
951 IceUtil::Mutex::Lock
sync(_subscribersMutex
);
954 throw Ice::ObjectNotExistException(__FILE__
, __LINE__
);
957 Ice::Identity id
= topic
->ice_getIdentity();
959 vector
<SubscriberPtr
>::const_iterator p
= find(_subscribers
.begin(), _subscribers
.end(), id
);
960 if(p
== _subscribers
.end())
962 string name
= identityToTopicName(id
);
963 TraceLevelsPtr traceLevels
= _instance
->traceLevels();
964 if(traceLevels
->topic
> 0)
966 Ice::Trace
out(traceLevels
->logger
, traceLevels
->topicCat
);
967 out
<< _name
<< ": unlink " << name
<< " failed - not linked";
975 TraceLevelsPtr traceLevels
= _instance
->traceLevels();
976 if(traceLevels
->topic
> 0)
978 Ice::Trace
out(traceLevels
->logger
, traceLevels
->topicCat
);
979 out
<< _name
<< " unlink " << _instance
->communicator()->identityToString(id
);
982 Ice::IdentitySeq ids
;
984 removeSubscribers(ids
);
988 TopicImpl::reap(const Ice::IdentitySeq
& ids
)
990 IceUtil::Mutex::Lock
sync(_subscribersMutex
);
992 TraceLevelsPtr traceLevels
= _instance
->traceLevels();
993 if(traceLevels
->topic
> 0)
995 Ice::Trace
out(traceLevels
->logger
, traceLevels
->topicCat
);
996 out
<< _name
<< ": reap ";
997 for(Ice::IdentitySeq::const_iterator p
= ids
.begin(); p
!= ids
.end() ; ++p
)
1003 out
<< _instance
->communicator()->identityToString(*p
);
1007 removeSubscribers(ids
);
1011 TopicImpl::shutdown()
1013 IceUtil::Mutex::Lock
sync(_subscribersMutex
);
1016 // Shutdown each subscriber. This waits for the event queues to drain.
1017 for(vector
<SubscriberPtr
>::const_iterator p
= _subscribers
.begin(); p
!= _subscribers
.end(); ++p
)
1024 TopicImpl::getLinkInfoSeq() const
1026 IceUtil::Mutex::Lock
sync(_subscribersMutex
);
1029 for(vector
<SubscriberPtr
>::const_iterator p
= _subscribers
.begin(); p
!= _subscribers
.end(); ++p
)
1031 SubscriberRecord record
= (*p
)->record();
1032 if(record
.link
&& !(*p
)->errored())
1035 info
.name
= identityToTopicName(record
.theTopic
->ice_getIdentity());
1036 info
.cost
= record
.cost
;
1037 info
.theTopic
= record
.theTopic
;
1038 seq
.push_back(info
);
1045 TopicImpl::destroy()
1047 IceUtil::Mutex::Lock
sync(_subscribersMutex
);
1051 throw Ice::ObjectNotExistException(__FILE__
, __LINE__
);
1055 TraceLevelsPtr traceLevels
= _instance
->traceLevels();
1056 if(traceLevels
->topic
> 0)
1058 Ice::Trace
out(traceLevels
->logger
, traceLevels
->topicCat
);
1059 out
<< _name
<< ": destroy";
1062 // destroyInternal clears out the topic content.
1063 LogUpdate llu
= {0,0};
1064 _instance
->observers()->destroyTopic(destroyInternal(llu
, true), _name
);
1068 TopicImpl::getContent() const
1070 IceUtil::Mutex::Lock
sync(_subscribersMutex
);
1072 TopicContent content
;
1074 for(vector
<SubscriberPtr
>::const_iterator p
= _subscribers
.begin(); p
!= _subscribers
.end(); ++p
)
1076 // Don't return errored subscribers (subscribers that have
1077 // errored out, but not reaped due to a failure with the
1078 // master). This means we can avoid the reaping step later.
1079 if(!(*p
)->errored())
1081 content
.records
.push_back((*p
)->record());
1088 TopicImpl::update(const SubscriberRecordSeq
& records
)
1090 IceUtil::Mutex::Lock
sync(_subscribersMutex
);
1092 // We do this with two scans. The first runs through the subscribers
1093 // that we have and removes those not in the init list. The second
1094 // runs through the init list and add the ones that don't
1098 vector
<SubscriberPtr
>::iterator p
= _subscribers
.begin();
1099 while(p
!= _subscribers
.end())
1101 SubscriberRecordSeq::const_iterator q
;
1102 for(q
= records
.begin(); q
!= records
.end(); ++q
)
1104 if((*p
)->id() == q
->id
)
1109 // The subscriber doesn't exist in the incoming subscriber
1110 // set so destroy it.
1111 if(q
== records
.end())
1114 p
= _subscribers
.erase(p
);
1118 // Otherwise reset the reaped status if necessary.
1119 (*p
)->resetIfReaped();
1125 for(SubscriberRecordSeq::const_iterator p
= records
.begin(); p
!= records
.end(); ++p
)
1127 vector
<SubscriberPtr
>::iterator q
;
1128 for(q
= _subscribers
.begin(); q
!= _subscribers
.end(); ++q
)
1130 if((*q
)->id() == p
->id
)
1135 if(q
== _subscribers
.end())
1137 SubscriberPtr subscriber
= Subscriber::create(_instance
, *p
);
1138 _subscribers
.push_back(subscriber
);
1144 TopicImpl::destroyed() const
1146 IceUtil::Mutex::Lock
sync(_subscribersMutex
);
1151 TopicImpl::id() const
1158 TopicImpl::proxy() const
1162 if(_instance
->topicReplicaProxy())
1164 prx
= _instance
->topicReplicaProxy()->ice_identity(_id
);
1168 prx
= _instance
->topicAdapter()->createProxy(_id
);
1170 return TopicPrx::uncheckedCast(prx
);
1176 class TopicInternal_reapI
: public AMI_TopicInternal_reap
1180 TopicInternal_reapI(const InstancePtr
& instance
, Ice::Long generation
) :
1181 _instance(instance
), _generation(generation
)
1185 virtual void ice_response()
1189 virtual void ice_exception(const Ice::Exception
& ex
)
1191 TraceLevelsPtr traceLevels
= _instance
->traceLevels();
1192 if(traceLevels
->topic
> 0)
1194 Ice::Trace
out(traceLevels
->logger
, traceLevels
->topicCat
);
1195 out
<< "exception when calling `reap' on the master replica: " << ex
;
1197 _instance
->node()->recovery(_generation
);
1202 const InstancePtr _instance
;
1203 const Ice::Long _generation
;
1209 TopicImpl::publish(bool forwarded
, const EventDataSeq
& events
)
1211 TopicInternalPrx masterInternal
;
1212 Ice::Long generation
= -1;
1213 Ice::IdentitySeq reap
;
1215 // Use cached reads.
1216 CachedReadHelper
unlock(_instance
->node(), __FILE__
, __LINE__
);
1219 // Copy of the subscriber list so that event publishing can occur
1222 vector
<SubscriberPtr
> copy
;
1224 IceUtil::Mutex::Lock
sync(_subscribersMutex
);
1225 copy
= _subscribers
;
1229 // Queue each event, gathering a list of those subscribers that
1232 for(vector
<SubscriberPtr
>::const_iterator p
= copy
.begin(); p
!= copy
.end(); ++p
)
1234 if(!(*p
)->queue(forwarded
, events
) && (*p
)->reap())
1236 reap
.push_back((*p
)->id());
1240 // If there are no subscribers in error then we're done.
1245 if(!unlock
.getMaster())
1247 IceUtil::Mutex::Lock
sync(_subscribersMutex
);
1248 removeSubscribers(reap
);
1251 masterInternal
= TopicInternalPrx::uncheckedCast(unlock
.getMaster()->ice_identity(_id
));
1252 generation
= unlock
.generation();
1256 // Tell the master to reap this set of subscribers. This is an
1257 // AMI invocation so it shouldn't block the caller (in the
1258 // typical case) we do it outside of the mutex lock for
1259 // performance reasons.
1261 // We must release the cached lock before calling this as the AMI
1262 // call may raise an exception in the caller (that is directly
1263 // call ice_exception) which calls recover() on the node which
1264 // would result in a deadlock since the node is locked.
1265 masterInternal
->reap_async(new TopicInternal_reapI(_instance
, generation
), reap
);
1269 TopicImpl::observerAddSubscriber(const LogUpdate
& llu
, const SubscriberRecord
& record
)
1271 IceUtil::Mutex::Lock
sync(_subscribersMutex
);
1273 TraceLevelsPtr traceLevels
= _instance
->traceLevels();
1274 if(traceLevels
->topic
> 0)
1276 Ice::Trace
out(traceLevels
->logger
, traceLevels
->topicCat
);
1277 out
<< _name
<< ": add replica observer: " << _instance
->communicator()->identityToString(record
.id
);
1279 if(traceLevels
->topic
> 1)
1281 out
<< " endpoints: " << IceStormInternal::describeEndpoints(record
.obj
)
1283 for(QoS::const_iterator p
= record
.theQoS
.begin(); p
!= record
.theQoS
.end() ; ++p
)
1285 if(p
!= record
.theQoS
.begin())
1289 out
<< '[' << p
->first
<< "," << p
->second
<< ']';
1292 out
<< " llu: " << llu
.generation
<< "/" << llu
.iteration
;
1295 vector
<SubscriberPtr
>::iterator p
= find(_subscribers
.begin(), _subscribers
.end(), record
.id
);
1296 if(p
!= _subscribers
.end())
1298 // If the subscriber is already in the database display a
1300 TraceLevelsPtr traceLevels
= _instance
->traceLevels();
1301 if(traceLevels
->topic
> 0)
1303 Ice::Trace
out(traceLevels
->logger
, traceLevels
->topicCat
);
1304 out
<< _instance
->communicator()->identityToString(record
.id
) << ": already subscribed";
1309 SubscriberPtr subscriber
= Subscriber::create(_instance
, record
);
1314 DatabaseConnectionPtr connection
= _databaseCache
->getConnection();
1315 TransactionHolder
txn(connection
);
1317 SubscriberRecordKey key
;
1319 key
.id
= subscriber
->id();
1321 SubscribersWrapperPtr subscribersWrapper
= _databaseCache
->getSubscribers(connection
);
1322 subscribersWrapper
->put(key
, record
);
1325 LLUWrapperPtr lluWrapper
= _databaseCache
->getLLU(connection
);
1326 lluWrapper
->put(llu
);
1331 catch(const DeadlockException
&)
1335 catch(const DatabaseException
& ex
)
1337 halt(_instance
->communicator(), ex
);
1341 _subscribers
.push_back(subscriber
);
1345 TopicImpl::observerRemoveSubscriber(const LogUpdate
& llu
, const Ice::IdentitySeq
& ids
)
1347 TraceLevelsPtr traceLevels
= _instance
->traceLevels();
1348 if(traceLevels
->topic
> 0)
1350 Ice::Trace
out(traceLevels
->logger
, traceLevels
->topicCat
);
1351 out
<< _name
<< ": remove replica observer: ";
1352 for(Ice::IdentitySeq::const_iterator id
= ids
.begin(); id
!= ids
.end(); ++id
)
1354 if(id
!= ids
.begin())
1358 out
<< _instance
->communicator()->identityToString(*id
);
1360 out
<< " llu: " << llu
.generation
<< "/" << llu
.iteration
;
1363 IceUtil::Mutex::Lock
sync(_subscribersMutex
);
1365 // Remove the subscriber from the subscribers list. If the
1366 // subscriber had a local failure and was removed from the
1367 // subscriber list it could already be gone. That's not a problem.
1368 for(Ice::IdentitySeq::const_iterator id
= ids
.begin(); id
!= ids
.end(); ++id
)
1370 vector
<SubscriberPtr
>::iterator p
= find(_subscribers
.begin(), _subscribers
.end(), *id
);
1371 if(p
!= _subscribers
.end())
1374 _subscribers
.erase(p
);
1378 // Next remove from the database.
1383 DatabaseConnectionPtr connection
= _databaseCache
->getConnection();
1384 TransactionHolder
txn(connection
);
1386 for(Ice::IdentitySeq::const_iterator id
= ids
.begin(); id
!= ids
.end(); ++id
)
1388 SubscriberRecordKey key
;
1392 SubscribersWrapperPtr subscribersWrapper
= _databaseCache
->getSubscribers(connection
);
1393 subscribersWrapper
->erase(key
);
1396 LLUWrapperPtr lluWrapper
= _databaseCache
->getLLU(connection
);
1397 lluWrapper
->put(llu
);
1402 catch(const DeadlockException
&)
1406 catch(const DatabaseException
& ex
)
1408 halt(_instance
->communicator(), ex
);
1414 TopicImpl::observerDestroyTopic(const LogUpdate
& llu
)
1416 IceUtil::Mutex::Lock
sync(_subscribersMutex
);
1424 TraceLevelsPtr traceLevels
= _instance
->traceLevels();
1425 if(traceLevels
->topic
> 0)
1427 Ice::Trace
out(traceLevels
->logger
, traceLevels
->topicCat
);
1428 out
<< _name
<< ": destroyed";
1429 out
<< " llu: " << llu
.generation
<< "/" << llu
.iteration
;
1431 destroyInternal(llu
, false);
1435 TopicImpl::getServant() const
1441 TopicImpl::destroyInternal(const LogUpdate
& origLLU
, bool master
)
1443 _instance
->publishAdapter()->remove(_linkPrx
->ice_getIdentity());
1444 _instance
->publishAdapter()->remove(_publisherPrx
->ice_getIdentity());
1446 // Destroy each of the subscribers.
1447 for(vector
<SubscriberPtr
>::const_iterator p
= _subscribers
.begin(); p
!= _subscribers
.end(); ++p
)
1451 _subscribers
.clear();
1453 // Clear out the database records related to this topic.
1459 DatabaseConnectionPtr connection
= _databaseCache
->getConnection();
1460 TransactionHolder
txn(connection
);
1462 // Erase all subscriber records and the topic record.
1463 SubscribersWrapperPtr subscribersWrapper
= _databaseCache
->getSubscribers(connection
);
1464 subscribersWrapper
->eraseTopic(_id
);
1467 LLUWrapperPtr lluWrapper
= _databaseCache
->getLLU(connection
);
1470 llu
= lluWrapper
->get();
1477 lluWrapper
->put(llu
);
1482 catch(const DeadlockException
&)
1486 catch(const DatabaseException
& ex
)
1488 halt(_instance
->communicator(), ex
);
1492 _instance
->topicAdapter()->remove(_id
);
1500 TopicImpl::removeSubscribers(const Ice::IdentitySeq
& ids
)
1502 Ice::IdentitySeq removed
;
1504 // First remove the subscriber from the subscribers list. Its
1505 // possible that some of these subscribers have already been
1506 // removed (consider, for example, a concurrent reap call from two
1507 // replicas on the same subscriber). To avoid sending unnecessary
1508 // observer updates keep track of the observers that are actually
1510 for(Ice::IdentitySeq::const_iterator id
= ids
.begin(); id
!= ids
.end(); ++id
)
1512 vector
<SubscriberPtr
>::iterator p
= find(_subscribers
.begin(), _subscribers
.end(), *id
);
1513 if(p
!= _subscribers
.end())
1516 _subscribers
.erase(p
);
1517 removed
.push_back(*id
);
1521 // If there is no further work to do we are done.
1527 // Next update the database and send the notification to any
1534 DatabaseConnectionPtr connection
= _databaseCache
->getConnection();
1535 TransactionHolder
txn(connection
);
1537 for(Ice::IdentitySeq::const_iterator id
= ids
.begin(); id
!= ids
.end(); ++id
)
1539 SubscriberRecordKey key
;
1543 SubscribersWrapperPtr subscribersWrapper
= _databaseCache
->getSubscribers(connection
);
1544 subscribersWrapper
->erase(key
);
1547 LLUWrapperPtr lluWrapper
= _databaseCache
->getLLU(connection
);
1548 llu
= lluWrapper
->get();
1550 lluWrapper
->put(llu
);
1555 catch(const DeadlockException
&)
1559 catch(const DatabaseException
& ex
)
1561 halt(_instance
->communicator(), ex
);
1565 _instance
->observers()->removeSubscriber(llu
, _name
, ids
);