ICE 3.4.2
[php5-ice-freebsdport.git] / cpp / src / IceStorm / TopicI.cpp
blob068f659bd662ac540c439b27afb7d708eff6ba36
1 // **********************************************************************
2 //
3 // Copyright (c) 2003-2011 ZeroC, Inc. All rights reserved.
4 //
5 // This copy of Ice is licensed to you under the terms described in the
6 // ICE_LICENSE file included in this distribution.
7 //
8 // **********************************************************************
10 #include <IceUtil/DisableWarnings.h>
11 #include <IceStorm/TopicI.h>
12 #include <IceStorm/Instance.h>
13 #include <IceStorm/Subscriber.h>
14 #include <IceStorm/TraceLevels.h>
15 #include <IceStorm/NodeI.h>
16 #include <IceStorm/Observers.h>
17 #include <IceStorm/DB.h>
18 #include <IceStorm/Util.h>
19 #include <Ice/LoggerUtil.h>
20 #include <algorithm>
22 using namespace std;
23 using namespace IceStorm;
24 using namespace IceStormElection;
26 using namespace IceDB;
28 namespace
31 void
32 halt(const Ice::CommunicatorPtr& com, const DatabaseException& ex)
35 Ice::Error error(com->getLogger());
36 error << "fatal exception: " << ex << "\n*** Aborting application ***";
39 abort();
43 // The servant has a 1-1 association with a topic. It is used to
44 // receive events from Publishers.
46 class PublisherI : public Ice::BlobjectArray
48 public:
50 PublisherI(const TopicImplPtr& topic, const InstancePtr& instance) :
51 _topic(topic), _instance(instance)
55 ~PublisherI()
57 //cout << "~PublisherI" << endl;
60 virtual bool
61 ice_invoke(const pair<const Ice::Byte*, const Ice::Byte*>& inParams,
62 Ice::ByteSeq&,
63 const Ice::Current& current)
65 // The publish call does a cached read.
66 EventDataPtr event = new EventData(
67 current.operation,
68 current.mode,
69 Ice::ByteSeq(),
70 current.ctx);
73 // COMPILERBUG: gcc 4.0.1 doesn't like this.
75 //event->data.swap(Ice::ByteSeq(inParams.first, inParams.second));
76 Ice::ByteSeq data(inParams.first, inParams.second);
77 event->data.swap(data);
79 EventDataSeq v;
80 v.push_back(event);
81 _topic->publish(false, v);
83 return true;
86 private:
88 const TopicImplPtr _topic;
89 const InstancePtr _instance;
93 // The servant has a 1-1 association with a topic. It is used to
94 // receive events from linked Topics.
96 class TopicLinkI : public TopicLink
98 public:
100 TopicLinkI(const TopicImplPtr& impl, const InstancePtr& instance) :
101 _impl(impl), _instance(instance)
105 ~TopicLinkI()
107 //cout << "~TopicLinkI" << endl;
110 virtual void
111 forward(const EventDataSeq& v, const Ice::Current& current)
113 // The publish call does a cached read.
114 _impl->publish(true, v);
117 private:
119 const TopicImplPtr _impl;
120 const InstancePtr _instance;
123 class TopicI : public TopicInternal
125 public:
127 TopicI(const TopicImplPtr& impl, const InstancePtr& instance) :
128 _impl(impl), _instance(instance)
132 ~TopicI()
134 //cout << "~TopicI" << endl;
137 virtual string getName(const Ice::Current&) const
139 // Use cached reads.
140 CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__);
141 return _impl->getName();
144 virtual Ice::ObjectPrx getPublisher(const Ice::Current&) const
146 // Use cached reads.
147 CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__);
148 return _impl->getPublisher();
151 virtual Ice::ObjectPrx getNonReplicatedPublisher(const Ice::Current&) const
153 // Use cached reads.
154 CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__);
155 return _impl->getNonReplicatedPublisher();
158 virtual void subscribe(const QoS& qos, const Ice::ObjectPrx& obj, const Ice::Current& current)
160 while(true)
162 Ice::Long generation = -1;
163 TopicPrx master = getMasterFor(current, generation, __FILE__, __LINE__);
164 if(master)
168 master->subscribe(qos, obj);
170 catch(const Ice::ConnectFailedException&)
172 _instance->node()->recovery(generation);
173 continue;
175 catch(const Ice::TimeoutException&)
177 _instance->node()->recovery(generation);
178 continue;
181 else
183 FinishUpdateHelper unlock(_instance->node());
184 _impl->subscribe(qos, obj);
186 break;
190 virtual Ice::ObjectPrx subscribeAndGetPublisher(const QoS& qos, const Ice::ObjectPrx& obj,
191 const Ice::Current& current)
193 while(true)
195 Ice::Long generation = -1;
196 TopicPrx master = getMasterFor(current, generation, __FILE__, __LINE__);
197 if(master)
201 return master->subscribeAndGetPublisher(qos, obj);
203 catch(const Ice::ConnectFailedException&)
205 _instance->node()->recovery(generation);
206 continue;
208 catch(const Ice::TimeoutException&)
210 _instance->node()->recovery(generation);
211 continue;
214 else
216 FinishUpdateHelper unlock(_instance->node());
217 return _impl->subscribeAndGetPublisher(qos, obj);
222 virtual void unsubscribe(const Ice::ObjectPrx& subscriber, const Ice::Current& current)
224 while(true)
226 Ice::Long generation = -1;
227 TopicPrx master = getMasterFor(current, generation, __FILE__, __LINE__);
228 if(master)
232 master->unsubscribe(subscriber);
234 catch(const Ice::ConnectFailedException&)
236 _instance->node()->recovery(generation);
237 continue;
239 catch(const Ice::TimeoutException&)
241 _instance->node()->recovery(generation);
242 continue;
245 else
247 FinishUpdateHelper unlock(_instance->node());
248 _impl->unsubscribe(subscriber);
250 break;
254 virtual TopicLinkPrx getLinkProxy(const Ice::Current&)
256 // Use cached reads.
257 CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__);
258 return _impl->getLinkProxy();
261 virtual void reap(const Ice::IdentitySeq& ids, const Ice::Current& current)
263 NodeIPtr node = _instance->node();
264 if(!node->updateMaster(__FILE__, __LINE__))
266 throw ReapWouldBlock();
268 FinishUpdateHelper unlock(node);
269 _impl->reap(ids);
272 virtual void link(const TopicPrx& topic, Ice::Int cost, const Ice::Current& current)
274 while(true)
276 Ice::Long generation = -1;
277 TopicPrx master = getMasterFor(current, generation, __FILE__, __LINE__);
278 if(master)
282 master->link(topic, cost);
284 catch(const Ice::ConnectFailedException&)
286 _instance->node()->recovery(generation);
287 continue;
289 catch(const Ice::TimeoutException&)
291 _instance->node()->recovery(generation);
292 continue;
295 else
297 FinishUpdateHelper unlock(_instance->node());
298 _impl->link(topic, cost);
300 break;
304 virtual void unlink(const TopicPrx& topic, const Ice::Current& current)
306 while(true)
308 Ice::Long generation = -1;
309 TopicPrx master = getMasterFor(current, generation, __FILE__, __LINE__);
310 if(master)
314 master->unlink(topic);
316 catch(const Ice::ConnectFailedException&)
318 _instance->node()->recovery(generation);
319 continue;
321 catch(const Ice::TimeoutException&)
323 _instance->node()->recovery(generation);
324 continue;
327 else
329 FinishUpdateHelper unlock(_instance->node());
330 _impl->unlink(topic);
332 break;
336 virtual LinkInfoSeq getLinkInfoSeq(const Ice::Current&) const
338 // Use cached reads.
339 CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__);
340 return _impl->getLinkInfoSeq();
343 virtual void destroy(const Ice::Current& current)
345 while(true)
347 Ice::Long generation = -1;
348 TopicPrx master = getMasterFor(current, generation, __FILE__, __LINE__);
349 if(master)
353 master->destroy();
355 catch(const Ice::ConnectFailedException&)
357 _instance->node()->recovery(generation);
358 continue;
360 catch(const Ice::TimeoutException&)
362 _instance->node()->recovery(generation);
363 continue;
366 else
368 FinishUpdateHelper unlock(_instance->node());
369 _impl->destroy();
371 break;
375 private:
377 TopicPrx getMasterFor(const Ice::Current& cur, Ice::Long& generation, const char* file, int line) const
379 NodeIPtr node = _instance->node();
380 Ice::ObjectPrx master;
381 if(node)
383 master = _instance->node()->startUpdate(generation, file, line);
385 return (master) ? TopicPrx::uncheckedCast(master->ice_identity(cur.id)) : TopicPrx();
388 const TopicImplPtr _impl;
389 const InstancePtr _instance;
394 namespace IceStorm
396 extern string identityToTopicName(const Ice::Identity& id);
399 TopicImpl::TopicImpl(
400 const InstancePtr& instance,
401 const string& name,
402 const Ice::Identity& id,
403 const SubscriberRecordSeq& subscribers) :
404 _instance(instance),
405 _name(name),
406 _id(id),
407 _databaseCache(instance->databaseCache()),
408 _destroyed(false)
412 __setNoDelete(true);
414 // TODO: If we want to improve the performance of the
415 // non-replicated case we could allocate a null-topic impl here.
416 _servant = new TopicI(this, instance);
419 // Create a servant per topic to receive event data. If the
420 // category is empty then we are in backwards compatibility
421 // mode. In this case the servant's identity is
422 // category=<topicname>, name=publish, otherwise the name is
423 // <instancename>/<topicname>.publish. The same applies to the
424 // link proxy.
426 // Activate the object and save a reference to give to publishers.
428 Ice::Identity pubid;
429 Ice::Identity linkid;
430 if(id.category.empty())
432 pubid.category = _name;
433 pubid.name = "publish";
434 linkid.category = _name;
435 linkid.name = "link";
437 else
439 pubid.category = id.category;
440 pubid.name = _name + ".publish";
441 linkid.category = id.category;
442 linkid.name = _name + ".link";
445 _publisherPrx = _instance->publishAdapter()->add(new PublisherI(this, instance), pubid);
446 _linkPrx = TopicLinkPrx::uncheckedCast(
447 _instance->publishAdapter()->add(new TopicLinkI(this, instance), linkid));
450 // Re-establish subscribers.
452 for(SubscriberRecordSeq::const_iterator p = subscribers.begin(); p != subscribers.end(); ++p)
454 Ice::Identity id = p->obj->ice_getIdentity();
455 TraceLevelsPtr traceLevels = _instance->traceLevels();
456 if(traceLevels->topic > 0)
458 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
459 out << _name << " recreate " << _instance->communicator()->identityToString(id);
460 if(traceLevels->topic > 1)
462 out << " endpoints: " << IceStormInternal::describeEndpoints(p->obj);
469 // Create the subscriber object add it to the set of
470 // subscribers.
472 SubscriberPtr subscriber = Subscriber::create(_instance, *p);
473 _subscribers.push_back(subscriber);
475 catch(const Ice::Exception& ex)
477 Ice::Warning out(traceLevels->logger);
478 out << _name << " recreate " << _instance->communicator()->identityToString(id);
479 if(traceLevels->topic > 1)
481 out << " endpoints: " << IceStormInternal::describeEndpoints(p->obj);
483 out << " failed: " << ex;
487 catch(...)
489 shutdown();
490 __setNoDelete(false);
491 throw;
493 __setNoDelete(false);
496 TopicImpl::~TopicImpl()
498 //cout << "~TopicImpl" << endl;
501 string
502 TopicImpl::getName() const
504 // Immutable
505 return _name;
508 Ice::ObjectPrx
509 TopicImpl::getPublisher() const
511 // Immutable
512 if(_instance->publisherReplicaProxy())
514 return _instance->publisherReplicaProxy()->ice_identity(_publisherPrx->ice_getIdentity());
516 return _publisherPrx;
519 Ice::ObjectPrx
520 TopicImpl::getNonReplicatedPublisher() const
522 // If there is an adapter id configured then we're using icegrid
523 // so create an indirect proxy, otherwise create a direct proxy.
524 if(!_publisherPrx->ice_getAdapterId().empty())
526 return _instance->publishAdapter()->createIndirectProxy(_publisherPrx->ice_getIdentity());
528 else
530 return _instance->publishAdapter()->createDirectProxy(_publisherPrx->ice_getIdentity());
535 // COMPILERFIX: For some reason with VC6 find reports an error.
537 #if defined(_MSC_VER) && (_MSC_VER < 1300)
538 namespace
540 vector<SubscriberPtr>::iterator
541 find(vector<SubscriberPtr>::iterator start, vector<SubscriberPtr>::iterator end, const Ice::Identity& ident)
543 while(start != end)
545 if(*start == ident)
547 return start;
549 ++start;
551 return end;
554 #endif
556 namespace
558 void
559 trace(Ice::Trace& out, const InstancePtr& instance, const vector<SubscriberPtr>& s)
561 out << '[';
562 for(vector<SubscriberPtr>::const_iterator p = s.begin(); p != s.end(); ++p)
564 if(p != s.begin())
566 out << ",";
568 out << instance->communicator()->identityToString((*p)->id());
570 out << "]";
574 void
575 TopicImpl::subscribe(const QoS& origQoS, const Ice::ObjectPrx& obj)
577 Ice::Identity id = obj->ice_getIdentity();
578 TraceLevelsPtr traceLevels = _instance->traceLevels();
579 QoS qos = origQoS;
580 if(traceLevels->topic > 0)
582 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
583 out << _name << ": subscribe: " << _instance->communicator()->identityToString(id);
585 if(traceLevels->topic > 1)
587 out << " endpoints: " << IceStormInternal::describeEndpoints(obj)
588 << " QoS: ";
589 for(QoS::const_iterator p = qos.begin(); p != qos.end() ; ++p)
591 if(p != qos.begin())
593 out << ',';
595 out << '[' << p->first << "," << p->second << ']';
597 out << " subscriptions: ";
598 trace(out, _instance, _subscribers);
602 string reliability = "oneway";
604 QoS::iterator p = qos.find("reliability");
605 if(p != qos.end())
607 reliability = p->second;
608 qos.erase(p);
612 Ice::ObjectPrx newObj = obj;
613 if(reliability == "batch")
615 if(newObj->ice_isDatagram())
617 newObj = newObj->ice_batchDatagram();
619 else
621 newObj = newObj->ice_batchOneway();
624 else if(reliability == "twoway")
626 newObj = newObj->ice_twoway();
628 else if(reliability == "twoway ordered")
630 qos["reliability"] = "ordered";
631 newObj = newObj->ice_twoway();
633 else // reliability == "oneway"
635 if(reliability != "oneway" && traceLevels->subscriber > 0)
637 Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
638 out << reliability <<" mode not understood.";
640 if(!newObj->ice_isDatagram())
642 newObj = newObj->ice_oneway();
646 IceUtil::Mutex::Lock sync(_subscribersMutex);
647 SubscriberRecord record;
648 record.id = id;
649 record.obj = newObj;
650 record.theQoS = qos;
651 record.topicName = _name;
652 record.link = false;
653 record.cost = 0;
655 LogUpdate llu;
657 vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.id);
658 if(p != _subscribers.end())
660 // If we already have this subscriber remove it from our
661 // subscriber list and remove it from the database.
662 (*p)->destroy();
663 _subscribers.erase(p);
665 for(;;)
669 DatabaseConnectionPtr connection = _databaseCache->getConnection();
670 TransactionHolder txn(connection);
672 SubscriberRecordKey key;
673 key.topic = _id;
674 key.id = record.id;
676 SubscribersWrapperPtr subscribersWrapper = _databaseCache->getSubscribers(connection);
677 subscribersWrapper->erase(key);
679 LLUWrapperPtr lluWrapper = _databaseCache->getLLU(connection);
680 llu = lluWrapper->get();
681 llu.iteration++;
682 lluWrapper->put(llu);
684 txn.commit();
685 break;
687 catch(const DeadlockException&)
689 continue;
691 catch(const DatabaseException& ex)
693 halt(_instance->communicator(), ex);
696 Ice::IdentitySeq ids;
697 ids.push_back(id);
698 _instance->observers()->removeSubscriber(llu, _name, ids);
701 SubscriberPtr subscriber = Subscriber::create(_instance, record);
702 for(;;)
706 DatabaseConnectionPtr connection = _databaseCache->getConnection();
707 TransactionHolder txn(connection);
709 SubscriberRecordKey key;
710 key.topic = _id;
711 key.id = subscriber->id();
713 SubscribersWrapperPtr subscribersWrapper = _databaseCache->getSubscribers(connection);
714 subscribersWrapper->put(key, record);
716 // Update the LLU.
717 LLUWrapperPtr lluWrapper = _databaseCache->getLLU(connection);
718 llu = lluWrapper->get();
719 llu.iteration++;
720 lluWrapper->put(llu);
722 txn.commit();
723 break;
725 catch(const DeadlockException&)
727 continue;
729 catch(const DatabaseException& ex)
731 halt(_instance->communicator(), ex);
735 _subscribers.push_back(subscriber);
737 _instance->observers()->addSubscriber(llu, _name, record);
740 Ice::ObjectPrx
741 TopicImpl::subscribeAndGetPublisher(const QoS& qos, const Ice::ObjectPrx& obj)
743 Ice::Identity id = obj->ice_getIdentity();
745 TraceLevelsPtr traceLevels = _instance->traceLevels();
746 if(traceLevels->topic > 0)
748 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
749 out << _name << ": subscribeAndGetPublisher: " << _instance->communicator()->identityToString(id);
751 if(traceLevels->topic > 1)
753 out << " endpoints: " << IceStormInternal::describeEndpoints(obj)
754 << " QoS: ";
755 for(QoS::const_iterator p = qos.begin(); p != qos.end() ; ++p)
757 if(p != qos.begin())
759 out << ',';
763 out << " subscriptions: ";
764 trace(out, _instance, _subscribers);
768 IceUtil::Mutex::Lock sync(_subscribersMutex);
770 SubscriberRecord record;
771 record.id = id;
772 record.obj = obj;
773 record.theQoS = qos;
774 record.topicName = _name;
775 record.link = false;
776 record.cost = 0;
778 vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.id);
779 if(p != _subscribers.end())
781 throw AlreadySubscribed();
784 LogUpdate llu;
786 SubscriberPtr subscriber = Subscriber::create(_instance, record);
787 for(;;)
791 DatabaseConnectionPtr connection = _databaseCache->getConnection();
792 TransactionHolder txn(connection);
794 SubscriberRecordKey key;
795 key.topic = _id;
796 key.id = subscriber->id();
798 SubscribersWrapperPtr subscribersWrapper = _databaseCache->getSubscribers(connection);
799 subscribersWrapper->put(key, record);
801 LLUWrapperPtr lluWrapper = _databaseCache->getLLU(connection);
802 llu = lluWrapper->get();
803 llu.iteration++;
804 lluWrapper->put(llu);
806 txn.commit();
807 break;
809 catch(const DeadlockException&)
811 continue;
813 catch(const DatabaseException& ex)
815 halt(_instance->communicator(), ex);
819 _subscribers.push_back(subscriber);
821 _instance->observers()->addSubscriber(llu, _name, record);
823 return subscriber->proxy();
826 void
827 TopicImpl::unsubscribe(const Ice::ObjectPrx& subscriber)
829 TraceLevelsPtr traceLevels = _instance->traceLevels();
830 if(!subscriber)
832 if(traceLevels->topic > 0)
834 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
835 out << "unsubscribe with null subscriber.";
837 return;
840 Ice::Identity id = subscriber->ice_getIdentity();
842 if(traceLevels->topic > 0)
844 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
845 out << _name << ": unsubscribe: " << _instance->communicator()->identityToString(id);
847 if(traceLevels->topic > 1)
849 out << " endpoints: " << IceStormInternal::describeEndpoints(subscriber);
850 trace(out, _instance, _subscribers);
854 IceUtil::Mutex::Lock sync(_subscribersMutex);
855 Ice::IdentitySeq ids;
856 ids.push_back(id);
857 removeSubscribers(ids);
860 TopicLinkPrx
861 TopicImpl::getLinkProxy()
863 // immutable
864 if(_instance->publisherReplicaProxy())
866 return TopicLinkPrx::uncheckedCast(_instance->publisherReplicaProxy()->ice_identity(
867 _linkPrx->ice_getIdentity()));
869 return _linkPrx;
872 void
873 TopicImpl::link(const TopicPrx& topic, Ice::Int cost)
875 TopicInternalPrx internal = TopicInternalPrx::uncheckedCast(topic);
876 TopicLinkPrx link = internal->getLinkProxy();
878 TraceLevelsPtr traceLevels = _instance->traceLevels();
879 if(traceLevels->topic > 0)
881 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
882 out << _name << ": link " << _instance->communicator()->identityToString(topic->ice_getIdentity())
883 << " cost " << cost;
886 IceUtil::Mutex::Lock sync(_subscribersMutex);
888 Ice::Identity id = topic->ice_getIdentity();
890 SubscriberRecord record;
891 record.id = id;
892 record.obj = link;
893 record.theTopic = topic;
894 record.topicName = _name;
895 record.link = true;
896 record.cost = cost;
898 vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.id);
899 if(p != _subscribers.end())
901 string name = identityToTopicName(id);
902 LinkExists ex;
903 ex.name = name;
904 throw ex;
907 LogUpdate llu;
909 SubscriberPtr subscriber = Subscriber::create(_instance, record);
911 for(;;)
915 DatabaseConnectionPtr connection = _databaseCache->getConnection();
916 TransactionHolder txn(connection);
918 SubscriberRecordKey key;
919 key.topic = _id;
920 key.id = id;
922 SubscribersWrapperPtr subscribersWrapper = _databaseCache->getSubscribers(connection);
923 subscribersWrapper->put(key, record);
925 LLUWrapperPtr lluWrapper = _databaseCache->getLLU(connection);
926 llu = lluWrapper->get();
927 llu.iteration++;
928 lluWrapper->put(llu);
930 txn.commit();
931 break;
933 catch(const DeadlockException&)
935 continue;
937 catch(const DatabaseException& ex)
939 halt(_instance->communicator(), ex);
943 _subscribers.push_back(subscriber);
945 _instance->observers()->addSubscriber(llu, _name, record);
948 void
949 TopicImpl::unlink(const TopicPrx& topic)
951 IceUtil::Mutex::Lock sync(_subscribersMutex);
952 if(_destroyed)
954 throw Ice::ObjectNotExistException(__FILE__, __LINE__);
957 Ice::Identity id = topic->ice_getIdentity();
959 vector<SubscriberPtr>::const_iterator p = find(_subscribers.begin(), _subscribers.end(), id);
960 if(p == _subscribers.end())
962 string name = identityToTopicName(id);
963 TraceLevelsPtr traceLevels = _instance->traceLevels();
964 if(traceLevels->topic > 0)
966 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
967 out << _name << ": unlink " << name << " failed - not linked";
970 NoSuchLink ex;
971 ex.name = name;
972 throw ex;
975 TraceLevelsPtr traceLevels = _instance->traceLevels();
976 if(traceLevels->topic > 0)
978 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
979 out << _name << " unlink " << _instance->communicator()->identityToString(id);
982 Ice::IdentitySeq ids;
983 ids.push_back(id);
984 removeSubscribers(ids);
987 void
988 TopicImpl::reap(const Ice::IdentitySeq& ids)
990 IceUtil::Mutex::Lock sync(_subscribersMutex);
992 TraceLevelsPtr traceLevels = _instance->traceLevels();
993 if(traceLevels->topic > 0)
995 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
996 out << _name << ": reap ";
997 for(Ice::IdentitySeq::const_iterator p = ids.begin(); p != ids.end() ; ++p)
999 if(p != ids.begin())
1001 out << ",";
1003 out << _instance->communicator()->identityToString(*p);
1007 removeSubscribers(ids);
1010 void
1011 TopicImpl::shutdown()
1013 IceUtil::Mutex::Lock sync(_subscribersMutex);
1014 _servant = 0;
1016 // Shutdown each subscriber. This waits for the event queues to drain.
1017 for(vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p)
1019 (*p)->shutdown();
1023 LinkInfoSeq
1024 TopicImpl::getLinkInfoSeq() const
1026 IceUtil::Mutex::Lock sync(_subscribersMutex);
1028 LinkInfoSeq seq;
1029 for(vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p)
1031 SubscriberRecord record = (*p)->record();
1032 if(record.link && !(*p)->errored())
1034 LinkInfo info;
1035 info.name = identityToTopicName(record.theTopic->ice_getIdentity());
1036 info.cost = record.cost;
1037 info.theTopic = record.theTopic;
1038 seq.push_back(info);
1041 return seq;
1044 void
1045 TopicImpl::destroy()
1047 IceUtil::Mutex::Lock sync(_subscribersMutex);
1049 if(_destroyed)
1051 throw Ice::ObjectNotExistException(__FILE__, __LINE__);
1053 _destroyed = true;
1055 TraceLevelsPtr traceLevels = _instance->traceLevels();
1056 if(traceLevels->topic > 0)
1058 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
1059 out << _name << ": destroy";
1062 // destroyInternal clears out the topic content.
1063 LogUpdate llu = {0,0};
1064 _instance->observers()->destroyTopic(destroyInternal(llu, true), _name);
1067 TopicContent
1068 TopicImpl::getContent() const
1070 IceUtil::Mutex::Lock sync(_subscribersMutex);
1072 TopicContent content;
1073 content.id = _id;
1074 for(vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p)
1076 // Don't return errored subscribers (subscribers that have
1077 // errored out, but not reaped due to a failure with the
1078 // master). This means we can avoid the reaping step later.
1079 if(!(*p)->errored())
1081 content.records.push_back((*p)->record());
1084 return content;
1087 void
1088 TopicImpl::update(const SubscriberRecordSeq& records)
1090 IceUtil::Mutex::Lock sync(_subscribersMutex);
1092 // We do this with two scans. The first runs through the subscribers
1093 // that we have and removes those not in the init list. The second
1094 // runs through the init list and add the ones that don't
1095 // exist.
1098 vector<SubscriberPtr>::iterator p = _subscribers.begin();
1099 while(p != _subscribers.end())
1101 SubscriberRecordSeq::const_iterator q;
1102 for(q = records.begin(); q != records.end(); ++q)
1104 if((*p)->id() == q->id)
1106 break;
1109 // The subscriber doesn't exist in the incoming subscriber
1110 // set so destroy it.
1111 if(q == records.end())
1113 (*p)->destroy();
1114 p = _subscribers.erase(p);
1116 else
1118 // Otherwise reset the reaped status if necessary.
1119 (*p)->resetIfReaped();
1120 ++p;
1125 for(SubscriberRecordSeq::const_iterator p = records.begin(); p != records.end(); ++p)
1127 vector<SubscriberPtr>::iterator q;
1128 for(q = _subscribers.begin(); q != _subscribers.end(); ++q)
1130 if((*q)->id() == p->id)
1132 break;
1135 if(q == _subscribers.end())
1137 SubscriberPtr subscriber = Subscriber::create(_instance, *p);
1138 _subscribers.push_back(subscriber);
1143 bool
1144 TopicImpl::destroyed() const
1146 IceUtil::Mutex::Lock sync(_subscribersMutex);
1147 return _destroyed;
1150 Ice::Identity
1151 TopicImpl::id() const
1153 // immutable
1154 return _id;
1157 TopicPrx
1158 TopicImpl::proxy() const
1160 // immutable
1161 Ice::ObjectPrx prx;
1162 if(_instance->topicReplicaProxy())
1164 prx = _instance->topicReplicaProxy()->ice_identity(_id);
1166 else
1168 prx = _instance->topicAdapter()->createProxy(_id);
1170 return TopicPrx::uncheckedCast(prx);
1173 namespace
1176 class TopicInternal_reapI : public AMI_TopicInternal_reap
1178 public:
1180 TopicInternal_reapI(const InstancePtr& instance, Ice::Long generation) :
1181 _instance(instance), _generation(generation)
1185 virtual void ice_response()
1189 virtual void ice_exception(const Ice::Exception& ex)
1191 TraceLevelsPtr traceLevels = _instance->traceLevels();
1192 if(traceLevels->topic > 0)
1194 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
1195 out << "exception when calling `reap' on the master replica: " << ex;
1197 _instance->node()->recovery(_generation);
1200 private:
1202 const InstancePtr _instance;
1203 const Ice::Long _generation;
1208 void
1209 TopicImpl::publish(bool forwarded, const EventDataSeq& events)
1211 TopicInternalPrx masterInternal;
1212 Ice::Long generation = -1;
1213 Ice::IdentitySeq reap;
1215 // Use cached reads.
1216 CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__);
1219 // Copy of the subscriber list so that event publishing can occur
1220 // in parallel.
1222 vector<SubscriberPtr> copy;
1224 IceUtil::Mutex::Lock sync(_subscribersMutex);
1225 copy = _subscribers;
1229 // Queue each event, gathering a list of those subscribers that
1230 // must be reaped.
1232 for(vector<SubscriberPtr>::const_iterator p = copy.begin(); p != copy.end(); ++p)
1234 if(!(*p)->queue(forwarded, events) && (*p)->reap())
1236 reap.push_back((*p)->id());
1240 // If there are no subscribers in error then we're done.
1241 if(reap.empty())
1243 return;
1245 if(!unlock.getMaster())
1247 IceUtil::Mutex::Lock sync(_subscribersMutex);
1248 removeSubscribers(reap);
1249 return;
1251 masterInternal = TopicInternalPrx::uncheckedCast(unlock.getMaster()->ice_identity(_id));
1252 generation = unlock.generation();
1256 // Tell the master to reap this set of subscribers. This is an
1257 // AMI invocation so it shouldn't block the caller (in the
1258 // typical case) we do it outside of the mutex lock for
1259 // performance reasons.
1261 // We must release the cached lock before calling this as the AMI
1262 // call may raise an exception in the caller (that is directly
1263 // call ice_exception) which calls recover() on the node which
1264 // would result in a deadlock since the node is locked.
1265 masterInternal->reap_async(new TopicInternal_reapI(_instance, generation), reap);
1268 void
1269 TopicImpl::observerAddSubscriber(const LogUpdate& llu, const SubscriberRecord& record)
1271 IceUtil::Mutex::Lock sync(_subscribersMutex);
1273 TraceLevelsPtr traceLevels = _instance->traceLevels();
1274 if(traceLevels->topic > 0)
1276 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
1277 out << _name << ": add replica observer: " << _instance->communicator()->identityToString(record.id);
1279 if(traceLevels->topic > 1)
1281 out << " endpoints: " << IceStormInternal::describeEndpoints(record.obj)
1282 << " QoS: ";
1283 for(QoS::const_iterator p = record.theQoS.begin(); p != record.theQoS.end() ; ++p)
1285 if(p != record.theQoS.begin())
1287 out << ',';
1289 out << '[' << p->first << "," << p->second << ']';
1292 out << " llu: " << llu.generation << "/" << llu.iteration;
1295 vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.id);
1296 if(p != _subscribers.end())
1298 // If the subscriber is already in the database display a
1299 // diagnostic.
1300 TraceLevelsPtr traceLevels = _instance->traceLevels();
1301 if(traceLevels->topic > 0)
1303 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
1304 out << _instance->communicator()->identityToString(record.id) << ": already subscribed";
1306 return;
1309 SubscriberPtr subscriber = Subscriber::create(_instance, record);
1310 for(;;)
1314 DatabaseConnectionPtr connection = _databaseCache->getConnection();
1315 TransactionHolder txn(connection);
1317 SubscriberRecordKey key;
1318 key.topic = _id;
1319 key.id = subscriber->id();
1321 SubscribersWrapperPtr subscribersWrapper = _databaseCache->getSubscribers(connection);
1322 subscribersWrapper->put(key, record);
1324 // Update the LLU.
1325 LLUWrapperPtr lluWrapper = _databaseCache->getLLU(connection);
1326 lluWrapper->put(llu);
1328 txn.commit();
1329 break;
1331 catch(const DeadlockException&)
1333 continue;
1335 catch(const DatabaseException& ex)
1337 halt(_instance->communicator(), ex);
1341 _subscribers.push_back(subscriber);
1344 void
1345 TopicImpl::observerRemoveSubscriber(const LogUpdate& llu, const Ice::IdentitySeq& ids)
1347 TraceLevelsPtr traceLevels = _instance->traceLevels();
1348 if(traceLevels->topic > 0)
1350 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
1351 out << _name << ": remove replica observer: ";
1352 for(Ice::IdentitySeq::const_iterator id = ids.begin(); id != ids.end(); ++id)
1354 if(id != ids.begin())
1356 out << ",";
1358 out << _instance->communicator()->identityToString(*id);
1360 out << " llu: " << llu.generation << "/" << llu.iteration;
1363 IceUtil::Mutex::Lock sync(_subscribersMutex);
1365 // Remove the subscriber from the subscribers list. If the
1366 // subscriber had a local failure and was removed from the
1367 // subscriber list it could already be gone. That's not a problem.
1368 for(Ice::IdentitySeq::const_iterator id = ids.begin(); id != ids.end(); ++id)
1370 vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), *id);
1371 if(p != _subscribers.end())
1373 (*p)->destroy();
1374 _subscribers.erase(p);
1378 // Next remove from the database.
1379 for(;;)
1383 DatabaseConnectionPtr connection = _databaseCache->getConnection();
1384 TransactionHolder txn(connection);
1386 for(Ice::IdentitySeq::const_iterator id = ids.begin(); id != ids.end(); ++id)
1388 SubscriberRecordKey key;
1389 key.topic = _id;
1390 key.id = *id;
1392 SubscribersWrapperPtr subscribersWrapper = _databaseCache->getSubscribers(connection);
1393 subscribersWrapper->erase(key);
1396 LLUWrapperPtr lluWrapper = _databaseCache->getLLU(connection);
1397 lluWrapper->put(llu);
1399 txn.commit();
1400 break;
1402 catch(const DeadlockException&)
1404 continue;
1406 catch(const DatabaseException& ex)
1408 halt(_instance->communicator(), ex);
1413 void
1414 TopicImpl::observerDestroyTopic(const LogUpdate& llu)
1416 IceUtil::Mutex::Lock sync(_subscribersMutex);
1418 if(_destroyed)
1420 return;
1422 _destroyed = true;
1424 TraceLevelsPtr traceLevels = _instance->traceLevels();
1425 if(traceLevels->topic > 0)
1427 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
1428 out << _name << ": destroyed";
1429 out << " llu: " << llu.generation << "/" << llu.iteration;
1431 destroyInternal(llu, false);
1434 Ice::ObjectPtr
1435 TopicImpl::getServant() const
1437 return _servant;
1440 LogUpdate
1441 TopicImpl::destroyInternal(const LogUpdate& origLLU, bool master)
1443 _instance->publishAdapter()->remove(_linkPrx->ice_getIdentity());
1444 _instance->publishAdapter()->remove(_publisherPrx->ice_getIdentity());
1446 // Destroy each of the subscribers.
1447 for(vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p)
1449 (*p)->destroy();
1451 _subscribers.clear();
1453 // Clear out the database records related to this topic.
1454 LogUpdate llu;
1455 for(;;)
1459 DatabaseConnectionPtr connection = _databaseCache->getConnection();
1460 TransactionHolder txn(connection);
1462 // Erase all subscriber records and the topic record.
1463 SubscribersWrapperPtr subscribersWrapper = _databaseCache->getSubscribers(connection);
1464 subscribersWrapper->eraseTopic(_id);
1466 // Update the LLU.
1467 LLUWrapperPtr lluWrapper = _databaseCache->getLLU(connection);
1468 if(master)
1470 llu = lluWrapper->get();
1471 llu.iteration++;
1473 else
1475 llu = origLLU;
1477 lluWrapper->put(llu);
1479 txn.commit();
1480 break;
1482 catch(const DeadlockException&)
1484 continue;
1486 catch(const DatabaseException& ex)
1488 halt(_instance->communicator(), ex);
1492 _instance->topicAdapter()->remove(_id);
1494 _servant = 0;
1496 return llu;
1499 void
1500 TopicImpl::removeSubscribers(const Ice::IdentitySeq& ids)
1502 Ice::IdentitySeq removed;
1504 // First remove the subscriber from the subscribers list. Its
1505 // possible that some of these subscribers have already been
1506 // removed (consider, for example, a concurrent reap call from two
1507 // replicas on the same subscriber). To avoid sending unnecessary
1508 // observer updates keep track of the observers that are actually
1509 // removed.
1510 for(Ice::IdentitySeq::const_iterator id = ids.begin(); id != ids.end(); ++id)
1512 vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), *id);
1513 if(p != _subscribers.end())
1515 (*p)->destroy();
1516 _subscribers.erase(p);
1517 removed.push_back(*id);
1521 // If there is no further work to do we are done.
1522 if(removed.empty())
1524 return;
1527 // Next update the database and send the notification to any
1528 // slaves.
1529 LogUpdate llu;
1530 for(;;)
1534 DatabaseConnectionPtr connection = _databaseCache->getConnection();
1535 TransactionHolder txn(connection);
1537 for(Ice::IdentitySeq::const_iterator id = ids.begin(); id != ids.end(); ++id)
1539 SubscriberRecordKey key;
1540 key.topic = _id;
1541 key.id = *id;
1543 SubscribersWrapperPtr subscribersWrapper = _databaseCache->getSubscribers(connection);
1544 subscribersWrapper->erase(key);
1547 LLUWrapperPtr lluWrapper = _databaseCache->getLLU(connection);
1548 llu = lluWrapper->get();
1549 llu.iteration++;
1550 lluWrapper->put(llu);
1552 txn.commit();
1553 break;
1555 catch(const DeadlockException&)
1557 continue;
1559 catch(const DatabaseException& ex)
1561 halt(_instance->communicator(), ex);
1565 _instance->observers()->removeSubscriber(llu, _name, ids);