ICE 3.4.2
[php5-ice-freebsdport.git] / cpp / src / IceStorm / TransientTopicI.cpp
blob09bb6d80addf6a18b569ef09498bc1883a1bfaed
1 // **********************************************************************
2 //
3 // Copyright (c) 2003-2011 ZeroC, Inc. All rights reserved.
4 //
5 // This copy of Ice is licensed to you under the terms described in the
6 // ICE_LICENSE file included in this distribution.
7 //
8 // **********************************************************************
10 #include <IceUtil/DisableWarnings.h>
11 #include <IceStorm/TransientTopicI.h>
12 #include <IceStorm/Instance.h>
13 #include <IceStorm/Subscriber.h>
14 #include <IceStorm/TraceLevels.h>
15 #include <IceStorm/Util.h>
17 #include <Ice/Ice.h>
19 #include <list>
20 #include <algorithm>
22 using namespace IceStorm;
23 using namespace std;
25 namespace
29 // The servant has a 1-1 association with a topic. It is used to
30 // receive events from Publishers.
32 class TransientPublisherI : public Ice::BlobjectArray
34 public:
36 TransientPublisherI(const TransientTopicImplPtr& impl) :
37 _impl(impl)
41 virtual bool
42 ice_invoke(const pair<const Ice::Byte*, const Ice::Byte*>& inParams,
43 Ice::ByteSeq&,
44 const Ice::Current& current)
46 // Use cached reads.
47 EventDataPtr event = new EventData(
48 current.operation,
49 current.mode,
50 Ice::ByteSeq(),
51 current.ctx);
54 // COMPILERBUG: gcc 4.0.1 doesn't like this.
56 //event->data.swap(Ice::ByteSeq(inParams.first, inParams.second));
57 Ice::ByteSeq data(inParams.first, inParams.second);
58 event->data.swap(data);
60 EventDataSeq v;
61 v.push_back(event);
62 _impl->publish(false, v);
64 return true;
67 private:
69 const TransientTopicImplPtr _impl;
73 // The servant has a 1-1 association with a topic. It is used to
74 // receive events from linked Topics.
76 class TransientTopicLinkI : public TopicLink
78 public:
80 TransientTopicLinkI(const TransientTopicImplPtr& impl) :
81 _impl(impl)
85 virtual void
86 forward(const EventDataSeq& v, const Ice::Current& current)
88 _impl->publish(true, v);
91 private:
93 const TransientTopicImplPtr _impl;
98 namespace IceStorm
100 extern string identityToTopicName(const Ice::Identity& id);
103 TransientTopicImpl::TransientTopicImpl(
104 const InstancePtr& instance,
105 const string& name,
106 const Ice::Identity& id) :
107 _instance(instance),
108 _name(name),
109 _id(id),
110 _destroyed(false)
113 // Create a servant per topic to receive event data. If the
114 // category is empty then we are in backwards compatibility
115 // mode. In this case the servant's identity is
116 // category=<topicname>, name=publish, otherwise the name is
117 // <instancename>/<topicname>.publish. The same applies to the
118 // link proxy.
120 // Activate the object and save a reference to give to publishers.
122 Ice::Identity pubid;
123 Ice::Identity linkid;
124 if(id.category.empty())
126 pubid.category = _name;
127 pubid.name = "publish";
128 linkid.category = _name;
129 linkid.name = "link";
131 else
133 pubid.category = id.category;
134 pubid.name = _name + ".publish";
135 linkid.category = id.category;
136 linkid.name = _name + ".link";
139 _publisherPrx = _instance->publishAdapter()->add(new TransientPublisherI(this), pubid);
140 _linkPrx = TopicLinkPrx::uncheckedCast(_instance->publishAdapter()->add(new TransientTopicLinkI(this), linkid));
143 TransientTopicImpl::~TransientTopicImpl()
147 string
148 TransientTopicImpl::getName(const Ice::Current&) const
150 // Immutable
151 return _name;
154 Ice::ObjectPrx
155 TransientTopicImpl::getPublisher(const Ice::Current&) const
157 // Immutable
158 return _publisherPrx;
161 Ice::ObjectPrx
162 TransientTopicImpl::getNonReplicatedPublisher(const Ice::Current&) const
164 // Immutable
165 return _publisherPrx;
169 // COMPILERFIX: For some reason with VC6 find reports an error.
171 #if defined(_MSC_VER) && (_MSC_VER < 1300)
172 namespace
174 static vector<SubscriberPtr>::iterator
175 find(vector<SubscriberPtr>::iterator start, vector<SubscriberPtr>::iterator end, const Ice::Identity& ident)
177 while(start != end)
179 if(*start == ident)
181 return start;
183 ++start;
185 return end;
188 #endif
190 void
191 TransientTopicImpl::subscribe(const QoS& origQoS, const Ice::ObjectPrx& obj, const Ice::Current&)
193 Ice::Identity id = obj->ice_getIdentity();
194 TraceLevelsPtr traceLevels = _instance->traceLevels();
195 QoS qos = origQoS;
196 if(traceLevels->topic > 0)
198 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
199 out << _name << ": subscribe: " << _instance->communicator()->identityToString(id);
201 if(traceLevels->topic > 1)
203 out << " endpoints: " << IceStormInternal::describeEndpoints(obj)
204 << " QoS: ";
205 for(QoS::const_iterator p = qos.begin(); p != qos.end() ; ++p)
207 if(p != qos.begin())
209 out << ',';
211 out << '[' << p->first << "," << p->second << ']';
216 string reliability = "oneway";
218 QoS::iterator p = qos.find("reliability");
219 if(p != qos.end())
221 reliability = p->second;
222 qos.erase(p);
226 Ice::ObjectPrx newObj = obj;
227 if(reliability == "batch")
229 if(newObj->ice_isDatagram())
231 newObj = newObj->ice_batchDatagram();
233 else
235 newObj = newObj->ice_batchOneway();
238 else if(reliability == "twoway")
240 newObj = newObj->ice_twoway();
242 else if(reliability == "twoway ordered")
244 qos["reliability"] = "ordered";
245 newObj = newObj->ice_twoway();
247 else // reliability == "oneway"
249 if(reliability != "oneway" && traceLevels->subscriber > 0)
251 Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
252 out << reliability <<" mode not understood.";
254 if(!newObj->ice_isDatagram())
256 newObj = newObj->ice_oneway();
260 Lock sync(*this);
261 SubscriberRecord record;
262 record.id = id;
263 record.obj = newObj;
264 record.theQoS = qos;
265 record.topicName = _name;
266 record.link = false;
267 record.cost = 0;
269 vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.id);
270 if(p != _subscribers.end())
272 // If we already have this subscriber remove it from our
273 // subscriber list and remove it from the database.
274 (*p)->destroy();
275 _subscribers.erase(p);
278 SubscriberPtr subscriber = Subscriber::create(_instance, record);
279 _subscribers.push_back(subscriber);
282 Ice::ObjectPrx
283 TransientTopicImpl::subscribeAndGetPublisher(const QoS& qos, const Ice::ObjectPrx& obj, const Ice::Current&)
285 Ice::Identity id = obj->ice_getIdentity();
287 TraceLevelsPtr traceLevels = _instance->traceLevels();
288 if(traceLevels->topic > 0)
290 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
291 out << _name << ": subscribeAndGetPublisher: " << _instance->communicator()->identityToString(id);
293 if(traceLevels->topic > 1)
295 out << " endpoints: " << IceStormInternal::describeEndpoints(obj)
296 << " QoS: ";
297 for(QoS::const_iterator p = qos.begin(); p != qos.end() ; ++p)
299 if(p != qos.begin())
301 out << ',';
308 Lock sync(*this);
310 SubscriberRecord record;
311 record.id = id;
312 record.obj = obj;
313 record.theQoS = qos;
314 record.topicName = _name;
315 record.link = false;
316 record.cost = 0;
318 vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.id);
319 if(p != _subscribers.end())
321 throw AlreadySubscribed();
324 SubscriberPtr subscriber = Subscriber::create(_instance, record);
325 _subscribers.push_back(subscriber);
327 return subscriber->proxy();
330 void
331 TransientTopicImpl::unsubscribe(const Ice::ObjectPrx& subscriber, const Ice::Current&)
333 TraceLevelsPtr traceLevels = _instance->traceLevels();
334 if(!subscriber)
336 if(traceLevels->topic > 0)
338 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
339 out << "unsubscribe with null subscriber.";
341 return;
344 Ice::Identity id = subscriber->ice_getIdentity();
346 if(traceLevels->topic > 0)
348 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
349 out << _name << ": unsubscribe: " << _instance->communicator()->identityToString(id);
350 if(traceLevels->topic > 1)
352 out << " endpoints: " << IceStormInternal::describeEndpoints(subscriber);
356 Lock sync(*this);
357 // First remove the subscriber from the subscribers list. Note
358 // that its possible that the subscriber isn't in the list, but is
359 // in the database if the subscriber was locally reaped.
360 vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), id);
361 if(p != _subscribers.end())
363 (*p)->destroy();
364 _subscribers.erase(p);
368 TopicLinkPrx
369 TransientTopicImpl::getLinkProxy(const Ice::Current&)
371 // immutable
372 return _linkPrx;
375 void
376 TransientTopicImpl::link(const TopicPrx& topic, Ice::Int cost, const Ice::Current&)
378 TopicInternalPrx internal = TopicInternalPrx::uncheckedCast(topic);
379 TopicLinkPrx link = internal->getLinkProxy();
381 TraceLevelsPtr traceLevels = _instance->traceLevels();
382 if(traceLevels->topic > 0)
384 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
385 out << _name << ": link " << _instance->communicator()->identityToString(topic->ice_getIdentity())
386 << " cost " << cost;
389 Lock sync(*this);
391 Ice::Identity id = topic->ice_getIdentity();
393 SubscriberRecord record;
394 record.id = id;
395 record.obj = link;
396 record.theTopic = topic;
397 record.topicName = _name;
398 record.link = true;
399 record.cost = cost;
401 vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.id);
402 if(p != _subscribers.end())
404 string name = identityToTopicName(id);
405 LinkExists ex;
406 ex.name = name;
407 throw ex;
410 SubscriberPtr subscriber = Subscriber::create(_instance, record);
411 _subscribers.push_back(subscriber);
414 void
415 TransientTopicImpl::unlink(const TopicPrx& topic, const Ice::Current&)
417 Lock sync(*this);
418 if(_destroyed)
420 throw Ice::ObjectNotExistException(__FILE__, __LINE__);
423 Ice::Identity id = topic->ice_getIdentity();
425 vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), id);
426 if(p == _subscribers.end())
428 string name = identityToTopicName(id);
429 TraceLevelsPtr traceLevels = _instance->traceLevels();
430 if(traceLevels->topic > 0)
432 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
433 out << _name << ": unlink " << name << " failed - not linked";
436 NoSuchLink ex;
437 ex.name = name;
438 throw ex;
441 TraceLevelsPtr traceLevels = _instance->traceLevels();
442 if(traceLevels->topic > 0)
444 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
445 out << _name << " unlink " << _instance->communicator()->identityToString(id);
448 // Remove the subscriber from the subscribers list. Note
449 // that its possible that the subscriber isn't in the list, but is
450 // in the database if the subscriber was locally reaped.
451 p = find(_subscribers.begin(), _subscribers.end(), id);
452 if(p != _subscribers.end())
454 (*p)->destroy();
455 _subscribers.erase(p);
459 LinkInfoSeq
460 TransientTopicImpl::getLinkInfoSeq(const Ice::Current&) const
462 Lock sync(*this);
463 LinkInfoSeq seq;
464 for(vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p)
466 SubscriberRecord record = (*p)->record();
467 if(record.link && !(*p)->errored())
469 LinkInfo info;
470 info.name = identityToTopicName(record.theTopic->ice_getIdentity());
471 info.cost = record.cost;
472 info.theTopic = record.theTopic;
473 seq.push_back(info);
476 return seq;
479 void
480 TransientTopicImpl::destroy(const Ice::Current&)
482 Lock sync(*this);
484 if(_destroyed)
486 throw Ice::ObjectNotExistException(__FILE__, __LINE__);
488 _destroyed = true;
490 TraceLevelsPtr traceLevels = _instance->traceLevels();
491 if(traceLevels->topic > 0)
493 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
494 out << _name << ": destroy";
499 _instance->publishAdapter()->remove(_linkPrx->ice_getIdentity());
500 _instance->publishAdapter()->remove(_publisherPrx->ice_getIdentity());
502 catch(const Ice::ObjectAdapterDeactivatedException&)
504 // Ignore -- this could occur on shutdown.
507 // Destroy all of the subscribers.
508 for(vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p)
510 (*p)->destroy();
512 _subscribers.clear();
515 void
516 TransientTopicImpl::reap(const Ice::IdentitySeq&, const Ice::Current&)
520 bool
521 TransientTopicImpl::destroyed() const
523 Lock sync(*this);
524 return _destroyed;
527 Ice::Identity
528 TransientTopicImpl::id() const
530 // immutable
531 return _id;
534 void
535 TransientTopicImpl::publish(bool forwarded, const EventDataSeq& events)
538 // Copy of the subscriber list so that event publishing can occur
539 // in parallel.
541 vector<SubscriberPtr> copy;
543 Lock sync(*this);
544 copy = _subscribers;
548 // Queue each event, gathering a list of those subscribers that
549 // must be reaped.
551 vector<Ice::Identity> e;
552 for(vector<SubscriberPtr>::const_iterator p = copy.begin(); p != copy.end(); ++p)
554 if(!(*p)->queue(forwarded, events) && (*p)->reap())
556 e.push_back((*p)->id());
561 // Run through the error list removing those subscribers that are
562 // in error from the subscriber list.
564 if(!e.empty())
566 Lock sync(*this);
567 for(vector<Ice::Identity>::const_iterator ep = e.begin(); ep != e.end(); ++ep)
570 // Its possible for the subscriber to already have been
571 // removed since the copy is iterated over outside of
572 // mutex protection.
574 // Note that although this could be quicker if we used a
575 // map, the most optimal case should be pushing around
576 // events not searching for a particular subscriber.
578 // The subscriber is immediately destroyed & removed from
579 // the _subscribers list. Add the subscriber to a list of
580 // error'd subscribers and remove it from the database on
581 // the next reap.
583 vector<SubscriberPtr>::iterator q = find(_subscribers.begin(), _subscribers.end(), *ep);
584 if(q != _subscribers.end())
586 SubscriberPtr subscriber = *q;
588 // Destroy the subscriber.
590 subscriber->destroy();
591 _subscribers.erase(q);
597 void
598 TransientTopicImpl::shutdown()
600 Lock sync(*this);
602 // Shutdown each subscriber. This waits for the event queues to drain.
603 for(vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p)
605 (*p)->shutdown();