1 // **********************************************************************
3 // Copyright (c) 2003-2011 ZeroC, Inc. All rights reserved.
5 // This copy of Ice is licensed to you under the terms described in the
6 // ICE_LICENSE file included in this distribution.
8 // **********************************************************************
10 #include <IceUtil/DisableWarnings.h>
11 #include <IceStorm/TransientTopicI.h>
12 #include <IceStorm/Instance.h>
13 #include <IceStorm/Subscriber.h>
14 #include <IceStorm/TraceLevels.h>
15 #include <IceStorm/Util.h>
22 using namespace IceStorm
;
// TransientPublisherI: Ice::BlobjectArray servant that turns each raw
// invocation it receives into an EventData and hands it to the owning
// topic implementation (_impl) through publish().
29 // The servant has a 1-1 association with a topic. It is used to
30 // receive events from Publishers.
32 class TransientPublisherI
: public Ice::BlobjectArray
// Constructor: receives the topic implementation this servant publishes to.
// NOTE(review): the member initializer is not visible in this view;
// presumably it stores impl in _impl -- confirm.
36 TransientPublisherI(const TransientTopicImplPtr
& impl
) :
// ice_invoke: wraps the request bytes in a new EventData and publishes it.
//   inParams - [first, second) byte range holding the request parameters.
//   current  - Ice request information for this invocation.
// NOTE(review): the return type, the remaining parameter and the return
// statement are not visible in this view.
42 ice_invoke(const pair
<const Ice::Byte
*, const Ice::Byte
*>& inParams
,
44 const Ice::Current
& current
)
// NOTE(review): the EventData constructor arguments are not visible in this
// view; presumably they come from `current` -- confirm.
47 EventDataPtr event
= new EventData(
// The bytes are copied into a named ByteSeq and then swapped into the event;
// the one-line form kept below as a comment is the one gcc 4.0.1 rejects.
54 // COMPILERBUG: gcc 4.0.1 doesn't like this.
56 //event->data.swap(Ice::ByteSeq(inParams.first, inParams.second));
57 Ice::ByteSeq
data(inParams
.first
, inParams
.second
);
58 event
->data
.swap(data
);
// First argument is publish()'s `forwarded` flag: false for this servant.
// NOTE(review): the declaration of `v` is not visible in this view;
// presumably it is an EventDataSeq holding `event` -- confirm.
62 _impl
->publish(false, v
);
// Topic implementation that receives the events.
69 const TransientTopicImplPtr _impl
;
// TransientTopicLinkI: TopicLink servant that passes event batches on to the
// owning topic implementation (_impl), marked as forwarded.
73 // The servant has a 1-1 association with a topic. It is used to
74 // receive events from linked Topics.
76 class TransientTopicLinkI
: public TopicLink
// Constructor: receives the topic implementation this servant forwards to.
// NOTE(review): the member initializer is not visible in this view;
// presumably it stores impl in _impl -- confirm.
80 TransientTopicLinkI(const TransientTopicImplPtr
& impl
) :
// forward: hands the received events to the topic implementation.
//   v       - the events to publish, passed through unchanged.
//   current - not used by the visible body.
86 forward(const EventDataSeq
& v
, const Ice::Current
& current
)
// First argument is publish()'s `forwarded` flag: true for this servant.
88 _impl
->publish(true, v
);
// Topic implementation that receives the forwarded events.
93 const TransientTopicImplPtr _impl
;
// Declaration only; the definition lives outside this file. Used below by
// link(), unlink() and getLinkInfoSeq() to obtain a topic name string from a
// topic's Ice identity.
100 extern string
identityToTopicName(const Ice::Identity
& id
);
// Constructor: builds the identities of the topic's publisher and link
// servants, registers both servants with the instance's publish adapter and
// keeps the resulting proxies in _publisherPrx and _linkPrx.
//   instance - IceStorm instance that provides the publish adapter.
//   id       - identity of the topic; its category selects the naming scheme.
// NOTE(review): part of the parameter list and the member initializers are
// not visible in this view.
103 TransientTopicImpl::TransientTopicImpl(
104 const InstancePtr
& instance
,
106 const Ice::Identity
& id
) :
113 // Create a servant per topic to receive event data. If the
114 // category is empty then we are in backwards compatibility
115 // mode. In this case the servant's identity is
116 // category=<topicname>, name=publish, otherwise the name is
117 // <instancename>/<topicname>.publish. The same applies to the
// link servant, whose identity is built in linkid below.
120 // Activate the object and save a reference to give to publishers.
123 Ice::Identity linkid
;
124 if(id
.category
.empty())
// Empty category: the topic name becomes the category and the names are the
// fixed strings "publish" and "link".
126 pubid
.category
= _name
;
127 pubid
.name
= "publish";
128 linkid
.category
= _name
;
129 linkid
.name
= "link";
// Non-empty category: keep the caller's category and derive both names from
// the topic name plus a ".publish" / ".link" suffix.
133 pubid
.category
= id
.category
;
134 pubid
.name
= _name
+ ".publish";
135 linkid
.category
= id
.category
;
136 linkid
.name
= _name
+ ".link";
// Register one servant of each kind with the publish adapter. The link proxy
// is narrowed with uncheckedCast; the publisher proxy is stored as returned.
139 _publisherPrx
= _instance
->publishAdapter()->add(new TransientPublisherI(this), pubid
);
140 _linkPrx
= TopicLinkPrx::uncheckedCast(_instance
->publishAdapter()->add(new TransientTopicLinkI(this), linkid
));
// Destructor.
// NOTE(review): the body is not visible in this view.
143 TransientTopicImpl::~TransientTopicImpl()
// getName: const accessor; the Ice::Current argument is unnamed and unused.
// NOTE(review): return type and body are not visible in this view;
// presumably returns _name -- confirm.
148 TransientTopicImpl::getName(const Ice::Current
&) const
// getPublisher: returns the publisher proxy created in the constructor.
// The Ice::Current argument is unnamed and unused.
155 TransientTopicImpl::getPublisher(const Ice::Current
&) const
158 return _publisherPrx
;
// getNonReplicatedPublisher: returns the same _publisherPrx that
// getPublisher() returns. The Ice::Current argument is unnamed and unused.
162 TransientTopicImpl::getNonReplicatedPublisher(const Ice::Current
&) const
165 return _publisherPrx
;
// File-local replacement for find() over the subscriber vector, compiled
// only when _MSC_VER is defined and below 1300.
//   start, end - iterator range to search.
//   ident      - identity to look for.
// NOTE(review): the function body and the matching #endif are not visible in
// this view.
169 // COMPILERFIX: For some reason with VC6 find reports an error.
171 #if defined(_MSC_VER) && (_MSC_VER < 1300)
174 static vector
<SubscriberPtr
>::iterator
175 find(vector
<SubscriberPtr
>::iterator start
, vector
<SubscriberPtr
>::iterator end
, const Ice::Identity
& ident
)
// subscribe: registers obj as a subscriber of this topic. It picks the proxy
// invocation mode from the "reliability" QoS entry, drops any existing
// subscriber with the same identity, then creates and stores the new one.
//   origQoS - quality-of-service settings requested by the subscriber.
//   obj     - proxy of the subscriber object.
// NOTE(review): the declaration of the local `qos` is not visible in this
// view; presumably it is a modifiable copy of origQoS -- confirm.
191 TransientTopicImpl::subscribe(const QoS
& origQoS
, const Ice::ObjectPrx
& obj
, const Ice::Current
&)
193 Ice::Identity id
= obj
->ice_getIdentity();
194 TraceLevelsPtr traceLevels
= _instance
->traceLevels();
// Topic trace level > 0: log the topic name and the subscriber identity.
196 if(traceLevels
->topic
> 0)
198 Ice::Trace
out(traceLevels
->logger
, traceLevels
->topicCat
);
199 out
<< _name
<< ": subscribe: " << _instance
->communicator()->identityToString(id
);
// Topic trace level > 1: also log the endpoints and every QoS pair.
201 if(traceLevels
->topic
> 1)
203 out
<< " endpoints: " << IceStormInternal::describeEndpoints(obj
)
205 for(QoS::const_iterator p
= qos
.begin(); p
!= qos
.end() ; ++p
)
211 out
<< '[' << p
->first
<< "," << p
->second
<< ']';
// "oneway" is the default; a "reliability" entry in qos replaces it.
// NOTE(review): the check that p is valid before p->second is read is not
// visible in this view.
216 string reliability
= "oneway";
218 QoS::iterator p
= qos
.find("reliability");
221 reliability
= p
->second
;
// Derive the proxy used for delivery from the requested reliability.
226 Ice::ObjectPrx newObj
= obj
;
// "batch": batch datagram for datagram proxies, batch oneway otherwise.
227 if(reliability
== "batch")
229 if(newObj
->ice_isDatagram())
231 newObj
= newObj
->ice_batchDatagram();
235 newObj
= newObj
->ice_batchOneway();
238 else if(reliability
== "twoway")
240 newObj
= newObj
->ice_twoway();
// "twoway ordered": twoway proxy, and qos gets "reliability" = "ordered".
242 else if(reliability
== "twoway ordered")
244 qos
["reliability"] = "ordered";
245 newObj
= newObj
->ice_twoway();
// Anything else is handled as oneway. An unrecognised value is only traced
// (when the subscriber trace level is > 0); it is not rejected.
247 else // reliability == "oneway"
249 if(reliability
!= "oneway" && traceLevels
->subscriber
> 0)
251 Ice::Trace
out(traceLevels
->logger
, traceLevels
->subscriberCat
);
252 out
<< reliability
<<" mode not understood.";
// Datagram proxies are left unchanged; all others become oneway.
254 if(!newObj
->ice_isDatagram())
256 newObj
= newObj
->ice_oneway();
// Build the subscriber record.
// NOTE(review): most field assignments, including record.id, are not visible
// in this view.
261 SubscriberRecord record
;
265 record
.topicName
= _name
;
// Look for an existing subscriber with the same identity.
269 vector
<SubscriberPtr
>::iterator p
= find(_subscribers
.begin(), _subscribers
.end(), record
.id
);
270 if(p
!= _subscribers
.end())
272 // If we already have this subscriber remove it from our
273 // subscriber list and remove it from the database.
275 _subscribers
.erase(p
);
// Create the new subscriber from the record and append it to the list.
278 SubscriberPtr subscriber
= Subscriber::create(_instance
, record
);
279 _subscribers
.push_back(subscriber
);
// subscribeAndGetPublisher: registers obj as a subscriber and returns the
// new subscriber's proxy. Unlike subscribe(), an already-registered identity
// is rejected with AlreadySubscribed rather than replaced.
//   qos - quality-of-service settings requested by the subscriber.
//   obj - proxy of the subscriber object.
// Throws AlreadySubscribed when a subscriber with the same identity exists.
283 TransientTopicImpl::subscribeAndGetPublisher(const QoS
& qos
, const Ice::ObjectPrx
& obj
, const Ice::Current
&)
285 Ice::Identity id
= obj
->ice_getIdentity();
287 TraceLevelsPtr traceLevels
= _instance
->traceLevels();
// Topic trace level > 0: log the topic name and the subscriber identity.
288 if(traceLevels
->topic
> 0)
290 Ice::Trace
out(traceLevels
->logger
, traceLevels
->topicCat
);
291 out
<< _name
<< ": subscribeAndGetPublisher: " << _instance
->communicator()->identityToString(id
);
// Topic trace level > 1: also log the endpoints and walk the QoS entries.
// NOTE(review): the loop body is not visible in this view.
293 if(traceLevels
->topic
> 1)
295 out
<< " endpoints: " << IceStormInternal::describeEndpoints(obj
)
297 for(QoS::const_iterator p
= qos
.begin(); p
!= qos
.end() ; ++p
)
// Build the subscriber record.
// NOTE(review): most field assignments, including record.id, are not visible
// in this view.
310 SubscriberRecord record
;
314 record
.topicName
= _name
;
// Reject the request if this identity is already subscribed.
318 vector
<SubscriberPtr
>::iterator p
= find(_subscribers
.begin(), _subscribers
.end(), record
.id
);
319 if(p
!= _subscribers
.end())
321 throw AlreadySubscribed();
// Create the subscriber, store it, and return its proxy to the caller.
324 SubscriberPtr subscriber
= Subscriber::create(_instance
, record
);
325 _subscribers
.push_back(subscriber
);
327 return subscriber
->proxy();
// unsubscribe: removes the subscriber with the identity of the given proxy
// from the subscriber list, if it is present.
//   subscriber - proxy of the subscriber to remove.
331 TransientTopicImpl::unsubscribe(const Ice::ObjectPrx
& subscriber
, const Ice::Current
&)
333 TraceLevelsPtr traceLevels
= _instance
->traceLevels();
// Trace for a null subscriber proxy.
// NOTE(review): the null check that guards this trace, and what happens
// after it, are not visible in this view.
336 if(traceLevels
->topic
> 0)
338 Ice::Trace
out(traceLevels
->logger
, traceLevels
->topicCat
);
339 out
<< "unsubscribe with null subscriber.";
344 Ice::Identity id
= subscriber
->ice_getIdentity();
// Topic trace level > 0: log the identity; level > 1 adds the endpoints.
346 if(traceLevels
->topic
> 0)
348 Ice::Trace
out(traceLevels
->logger
, traceLevels
->topicCat
);
349 out
<< _name
<< ": unsubscribe: " << _instance
->communicator()->identityToString(id
);
350 if(traceLevels
->topic
> 1)
352 out
<< " endpoints: " << IceStormInternal::describeEndpoints(subscriber
);
357 // First remove the subscriber from the subscribers list. Note
358 // that it's possible that the subscriber isn't in the list, but is
359 // in the database if the subscriber was locally reaped.
360 vector
<SubscriberPtr
>::iterator p
= find(_subscribers
.begin(), _subscribers
.end(), id
);
// An identity that is not in the list is ignored.
361 if(p
!= _subscribers
.end())
364 _subscribers
.erase(p
);
// getLinkProxy: non-const operation; the Ice::Current argument is unnamed
// and unused.
// NOTE(review): return type and body are not visible in this view;
// presumably returns _linkPrx -- confirm.
369 TransientTopicImpl::getLinkProxy(const Ice::Current
&)
// link: links this topic to another topic by creating a subscriber for the
// other topic and appending it to the subscriber list.
//   topic - proxy of the topic to link to.
//   cost  - cost associated with the link.
376 TransientTopicImpl::link(const TopicPrx
& topic
, Ice::Int cost
, const Ice::Current
&)
// The cast is unchecked; getLinkProxy() is then invoked on the target topic
// to obtain its link proxy.
378 TopicInternalPrx internal
= TopicInternalPrx::uncheckedCast(topic
);
379 TopicLinkPrx link
= internal
->getLinkProxy();
381 TraceLevelsPtr traceLevels
= _instance
->traceLevels();
// Topic trace level > 0: log this topic's name and the target's identity.
382 if(traceLevels
->topic
> 0)
384 Ice::Trace
out(traceLevels
->logger
, traceLevels
->topicCat
);
385 out
<< _name
<< ": link " << _instance
->communicator()->identityToString(topic
->ice_getIdentity())
391 Ice::Identity id
= topic
->ice_getIdentity();
// Build the subscriber record for the link.
// NOTE(review): several field assignments, including record.id, are not
// visible in this view.
393 SubscriberRecord record
;
396 record
.theTopic
= topic
;
397 record
.topicName
= _name
;
// An existing entry with the same identity is handled separately.
// NOTE(review): only the computation of `name` is visible for that branch;
// presumably an exception carrying the name is thrown -- confirm.
401 vector
<SubscriberPtr
>::iterator p
= find(_subscribers
.begin(), _subscribers
.end(), record
.id
);
402 if(p
!= _subscribers
.end())
404 string name
= identityToTopicName(id
);
// Create the subscriber for the link and append it to the list.
410 SubscriberPtr subscriber
= Subscriber::create(_instance
, record
);
411 _subscribers
.push_back(subscriber
);
// unlink: removes the link to the given topic by erasing the subscriber with
// that topic's identity from the subscriber list.
//   topic - proxy of the linked topic to remove.
// Throws Ice::ObjectNotExistException.
// NOTE(review): the condition that guards the throw is not visible in this
// view.
415 TransientTopicImpl::unlink(const TopicPrx
& topic
, const Ice::Current
&)
420 throw Ice::ObjectNotExistException(__FILE__
, __LINE__
);
423 Ice::Identity id
= topic
->ice_getIdentity();
// No subscriber with this identity: trace the failure.
// NOTE(review): what follows the trace is not visible in this view;
// presumably an exception carrying `name` is thrown -- confirm.
425 vector
<SubscriberPtr
>::iterator p
= find(_subscribers
.begin(), _subscribers
.end(), id
);
426 if(p
== _subscribers
.end())
428 string name
= identityToTopicName(id
);
429 TraceLevelsPtr traceLevels
= _instance
->traceLevels();
430 if(traceLevels
->topic
> 0)
432 Ice::Trace
out(traceLevels
->logger
, traceLevels
->topicCat
);
433 out
<< _name
<< ": unlink " << name
<< " failed - not linked";
// Topic trace level > 0: log the unlink with the target's identity.
441 TraceLevelsPtr traceLevels
= _instance
->traceLevels();
442 if(traceLevels
->topic
> 0)
444 Ice::Trace
out(traceLevels
->logger
, traceLevels
->topicCat
);
445 out
<< _name
<< " unlink " << _instance
->communicator()->identityToString(id
);
448 // Remove the subscriber from the subscribers list. Note
449 // that it's possible that the subscriber isn't in the list, but is
450 // in the database if the subscriber was locally reaped.
// The list is searched a second time here; p is overwritten with the result.
451 p
= find(_subscribers
.begin(), _subscribers
.end(), id
);
452 if(p
!= _subscribers
.end())
455 _subscribers
.erase(p
);
// getLinkInfoSeq: walks the subscriber list and fills a link-info entry
// (name, cost, topic proxy) for every subscriber whose record is a link and
// which is not in the errored state.
// NOTE(review): the return type, the declaration of `info`, and the
// collection and return of the entries are not visible in this view.
460 TransientTopicImpl::getLinkInfoSeq(const Ice::Current
&) const
464 for(vector
<SubscriberPtr
>::const_iterator p
= _subscribers
.begin(); p
!= _subscribers
.end(); ++p
)
// record is a by-value copy of the subscriber's record.
466 SubscriberRecord record
= (*p
)->record();
// Skip non-link subscribers and links that have errored.
467 if(record
.link
&& !(*p
)->errored())
// The name comes from the linked topic's identity.
470 info
.name
= identityToTopicName(record
.theTopic
->ice_getIdentity());
471 info
.cost
= record
.cost
;
472 info
.theTopic
= record
.theTopic
;
// destroy: removes the topic's link and publisher servants from the publish
// adapter, then processes and clears the subscriber list.
// Throws Ice::ObjectNotExistException.
// NOTE(review): the condition that guards the throw is not visible in this
// view.
480 TransientTopicImpl::destroy(const Ice::Current
&)
486 throw Ice::ObjectNotExistException(__FILE__
, __LINE__
);
// Topic trace level > 0: log the destroy.
490 TraceLevelsPtr traceLevels
= _instance
->traceLevels();
491 if(traceLevels
->topic
> 0)
493 Ice::Trace
out(traceLevels
->logger
, traceLevels
->topicCat
);
494 out
<< _name
<< ": destroy";
// Remove both servants, looked up by the identities of the stored proxies.
499 _instance
->publishAdapter()->remove(_linkPrx
->ice_getIdentity());
500 _instance
->publishAdapter()->remove(_publisherPrx
->ice_getIdentity());
// A deactivated adapter is tolerated rather than reported.
502 catch(const Ice::ObjectAdapterDeactivatedException
&)
504 // Ignore -- this could occur on shutdown.
// NOTE(review): the loop body is not visible in this view.
507 // Destroy all of the subscribers.
508 for(vector
<SubscriberPtr
>::const_iterator p
= _subscribers
.begin(); p
!= _subscribers
.end(); ++p
)
// Empty the list once the loop is done.
512 _subscribers
.clear();
// reap: both parameters (an identity sequence and the Ice::Current) are
// unnamed, so the function cannot refer to them.
// NOTE(review): return type and body are not visible in this view.
516 TransientTopicImpl::reap(const Ice::IdentitySeq
&, const Ice::Current
&)
// destroyed: const query taking no arguments.
// NOTE(review): return type and body are not visible in this view;
// presumably reports whether destroy() has run -- confirm.
521 TransientTopicImpl::destroyed() const
// id: const accessor taking no arguments.
// NOTE(review): return type and body are not visible in this view;
// presumably returns the topic's identity -- confirm.
528 TransientTopicImpl::id() const
// publish: queues the events on every subscriber in `copy`, records the
// subscribers that fail and ask to be reaped, then destroys those and
// removes them from _subscribers.
//   forwarded - passed through to each subscriber's queue(); the link
//               servant passes true, the publisher servant false.
//   events    - the events to queue on each subscriber.
535 TransientTopicImpl::publish(bool forwarded
, const EventDataSeq
& events
)
// NOTE(review): the statements that fill `copy` are not visible in this
// view; presumably it is assigned from _subscribers -- confirm.
538 // Copy of the subscriber list so that event publishing can occur
541 vector
<SubscriberPtr
> copy
;
// `e` collects the identities of subscribers to remove afterwards.
548 // Queue each event, gathering a list of those subscribers that
551 vector
<Ice::Identity
> e
;
552 for(vector
<SubscriberPtr
>::const_iterator p
= copy
.begin(); p
!= copy
.end(); ++p
)
// reap() is called only when queue() returned false (short-circuit &&).
554 if(!(*p
)->queue(forwarded
, events
) && (*p
)->reap())
556 e
.push_back((*p
)->id());
561 // Run through the error list removing those subscribers that are
562 // in error from the subscriber list.
567 for(vector
<Ice::Identity
>::const_iterator ep
= e
.begin(); ep
!= e
.end(); ++ep
)
570 // It's possible for the subscriber to already have been
571 // removed since the copy is iterated over outside of
574 // Note that although this could be quicker if we used a
575 // map, the most optimal case should be pushing around
576 // events not searching for a particular subscriber.
578 // The subscriber is immediately destroyed & removed from
579 // the _subscribers list. Add the subscriber to a list of
580 // error'd subscribers and remove it from the database on
// Search the live list by identity; an entry that is already gone is skipped.
583 vector
<SubscriberPtr
>::iterator q
= find(_subscribers
.begin(), _subscribers
.end(), *ep
);
584 if(q
!= _subscribers
.end())
// The smart pointer is copied before the erase, so `subscriber` stays usable
// after its list entry is removed.
586 SubscriberPtr subscriber
= *q
;
588 // Destroy the subscriber.
590 subscriber
->destroy();
591 _subscribers
.erase(q
);
598 TransientTopicImpl::shutdown()
602 // Shutdown each subscriber. This waits for the event queues to drain.
603 for(vector
<SubscriberPtr
>::const_iterator p
= _subscribers
.begin(); p
!= _subscribers
.end(); ++p
)