ICE 3.4.2
[php5-ice-freebsdport.git] / cpp / src / IceGrid / NodeI.cpp
blob964f4de9a3239d1add6cb231bd7d39b03971233f
1 // **********************************************************************
2 //
3 // Copyright (c) 2003-2011 ZeroC, Inc. All rights reserved.
4 //
5 // This copy of Ice is licensed to you under the terms described in the
6 // ICE_LICENSE file included in this distribution.
7 //
8 // **********************************************************************
10 #include <IceUtil/Timer.h>
11 #include <IceUtil/FileUtil.h>
12 #include <Ice/Ice.h>
13 #include <IcePatch2/Util.h>
14 #include <IcePatch2/ClientUtil.h>
15 #include <IceGrid/NodeI.h>
16 #include <IceGrid/Activator.h>
17 #include <IceGrid/ServerI.h>
18 #include <IceGrid/ServerAdapterI.h>
19 #include <IceGrid/Util.h>
20 #include <IceGrid/TraceLevels.h>
21 #include <IceGrid/NodeSessionManager.h>
23 using namespace std;
24 using namespace IcePatch2;
25 using namespace IceGrid;
27 namespace
30 class LogPatcherFeedback : public IcePatch2::PatcherFeedback
32 public:
34 LogPatcherFeedback(const TraceLevelsPtr& traceLevels, const string& dest) :
35 _traceLevels(traceLevels),
36 _startedPatch(false),
37 _lastProgress(0),
38 _dest(dest)
42 void
43 setPatchingPath(const string& path)
45 _path = path;
46 _startedPatch = false;
47 _lastProgress = 0;
50 virtual bool
51 noFileSummary(const string& reason)
53 if(_traceLevels->patch > 0)
55 Ice::Trace out(_traceLevels->logger, _traceLevels->patchCat);
56 out << _dest << ": can't load summary file (will perform a thorough patch):\n" << reason;
58 return true;
61 virtual bool
62 checksumStart()
64 if(_traceLevels->patch > 0)
66 Ice::Trace out(_traceLevels->logger, _traceLevels->patchCat);
67 out << _dest << ": started checksum calculation";
69 return true;
72 virtual bool
73 checksumProgress(const string& path)
75 if(_traceLevels->patch > 2)
77 Ice::Trace out(_traceLevels->logger, _traceLevels->patchCat);
78 out << _dest << ": calculating checksum for " << getBasename(path);
80 return true;
83 virtual bool
84 checksumEnd()
86 if(_traceLevels->patch > 0)
88 Ice::Trace out(_traceLevels->logger, _traceLevels->patchCat);
89 out << _dest << ": finished checksum calculation";
91 return true;
94 virtual bool
95 fileListStart()
97 if(_traceLevels->patch > 0)
99 Ice::Trace out(_traceLevels->logger, _traceLevels->patchCat);
100 out << _dest << ": getting list of file to patch";
102 return true;
105 virtual bool
106 fileListProgress(Ice::Int percent)
108 return true;
111 virtual bool
112 fileListEnd()
114 if(_traceLevels->patch > 0)
116 Ice::Trace out(_traceLevels->logger, _traceLevels->patchCat);
117 out << _dest << ": getting list of file to patch completed";
119 return true;
122 virtual bool
123 patchStart(const string& path, Ice::Long size, Ice::Long totalProgress, Ice::Long totalSize)
125 if(_traceLevels->patch > 1 && totalSize > (1024 * 1024))
127 int progress = static_cast<int>(static_cast<double>(totalProgress) / totalSize * 100.0);
128 progress /= 5;
129 progress *= 5;
130 if(progress != _lastProgress)
132 _lastProgress = progress;
133 Ice::Trace out(_traceLevels->logger, _traceLevels->patchCat);
134 out << _dest << ": downloaded " << progress << "% (" << totalProgress << '/' << totalSize << ')';
135 if(!_path.empty())
137 out << " of " << _path;
141 else if(_traceLevels->patch > 0)
143 if(!_startedPatch)
145 Ice::Trace out(_traceLevels->logger, _traceLevels->patchCat);
146 int roundedSize = static_cast<int>(static_cast<double>(totalSize) / 1024);
147 if(roundedSize == 0 && totalSize > 0)
149 roundedSize = 1;
151 out << _dest << ": downloading " << (_path.empty() ? string("") : (_path + " ")) << roundedSize
152 << "KB ";
153 _startedPatch = true;
157 return true;
160 virtual bool
161 patchProgress(Ice::Long progress, Ice::Long size, Ice::Long totalProgress, Ice::Long totalSize)
163 return true;
166 virtual bool
167 patchEnd()
169 return true;
172 void
173 finishPatch()
175 if(_traceLevels->patch > 0)
177 Ice::Trace out(_traceLevels->logger, _traceLevels->patchCat);
178 out << _dest << ": downloading completed";
182 private:
184 const TraceLevelsPtr _traceLevels;
185 bool _startedPatch;
186 int _lastProgress;
187 string _path;
188 string _dest;
191 class NodeUp : public NodeI::Update, public AMI_NodeObserver_nodeUp
193 public:
195 NodeUp(const NodeIPtr& node, const NodeObserverPrx& observer, NodeDynamicInfo info) :
196 NodeI::Update(node, observer), _info(info)
200 virtual bool
201 send()
205 _observer->nodeUp_async(this, _info);
207 catch(const Ice::LocalException&)
209 return false;
211 return true;
214 virtual void
215 ice_response()
217 finished(true);
220 virtual void
221 ice_exception(const Ice::Exception&)
223 finished(false);
226 private:
228 NodeDynamicInfo _info;
231 class UpdateServer : public NodeI::Update, public AMI_NodeObserver_updateServer
233 public:
235 UpdateServer(const NodeIPtr& node, const NodeObserverPrx& observer, ServerDynamicInfo info) :
236 NodeI::Update(node, observer), _info(info)
240 virtual bool
241 send()
245 _observer->updateServer_async(this, _node->getName(), _info);
247 catch(const Ice::LocalException&)
249 return false;
251 return true;
254 virtual void
255 ice_response()
257 finished(true);
260 virtual void
261 ice_exception(const Ice::Exception&)
263 finished(false);
266 private:
268 ServerDynamicInfo _info;
271 class UpdateAdapter : public NodeI::Update, public AMI_NodeObserver_updateAdapter
273 public:
275 UpdateAdapter(const NodeIPtr& node, const NodeObserverPrx& observer, AdapterDynamicInfo info) :
276 NodeI::Update(node, observer), _info(info)
280 virtual bool
281 send()
285 _observer->updateAdapter_async(this, _node->getName(), _info);
287 catch(const Ice::LocalException&)
289 return false;
291 return true;
294 virtual void
295 ice_response()
297 finished(true);
300 virtual void
301 ice_exception(const Ice::Exception&)
303 finished(false);
306 private:
308 AdapterDynamicInfo _info;
313 NodeI::Update::Update(const NodeIPtr& node, const NodeObserverPrx& observer) : _node(node), _observer(observer)
317 NodeI::Update::~Update()
321 void
322 NodeI::Update::finished(bool success)
324 _node->dequeueUpdate(_observer, this, !success);
327 NodeI::NodeI(const Ice::ObjectAdapterPtr& adapter,
328 NodeSessionManager& sessions,
329 const ActivatorPtr& activator,
330 const IceUtil::TimerPtr& timer,
331 const TraceLevelsPtr& traceLevels,
332 const NodePrx& proxy,
333 const string& name,
334 const UserAccountMapperPrx& mapper) :
335 _communicator(adapter->getCommunicator()),
336 _adapter(adapter),
337 _sessions(sessions),
338 _activator(activator),
339 _timer(timer),
340 _traceLevels(traceLevels),
341 _name(name),
342 _proxy(proxy),
343 _redirectErrToOut(false),
344 _allowEndpointsOverride(false),
345 _waitTime(0),
346 _userAccountMapper(mapper),
347 _platform("IceGrid.Node", _communicator, _traceLevels),
348 _fileCache(new FileCache(_communicator)),
349 _serial(1),
350 _consistencyCheckDone(false)
352 Ice::PropertiesPtr props = _communicator->getProperties();
354 const_cast<string&>(_dataDir) = _platform.getDataDir();
355 const_cast<string&>(_serversDir) = _dataDir + "/servers";
356 const_cast<string&>(_tmpDir) = _dataDir + "/tmp";
357 const_cast<string&>(_instanceName) = _communicator->getDefaultLocator()->ice_getIdentity().category;
358 const_cast<Ice::Int&>(_waitTime) = props->getPropertyAsIntWithDefault("IceGrid.Node.WaitTime", 60);
359 const_cast<string&>(_outputDir) = props->getProperty("IceGrid.Node.Output");
360 const_cast<bool&>(_redirectErrToOut) = props->getPropertyAsInt("IceGrid.Node.RedirectErrToOut") > 0;
361 const_cast<bool&>(_allowEndpointsOverride) = props->getPropertyAsInt("IceGrid.Node.AllowEndpointsOverride") > 0;
364 // Parse the properties override property.
366 vector<string> overrides = props->getPropertyAsList("IceGrid.Node.PropertiesOverride");
367 if(!overrides.empty())
369 for(vector<string>::iterator p = overrides.begin(); p != overrides.end(); ++p)
371 if(p->find("--") != 0)
373 *p = "--" + *p;
377 Ice::PropertiesPtr p = Ice::createProperties();
378 p->parseCommandLineOptions("", overrides);
379 Ice::PropertyDict propDict = p->getPropertiesForPrefix("");
380 for(Ice::PropertyDict::const_iterator q = propDict.begin(); q != propDict.end(); ++q)
382 _propertiesOverride.push_back(createProperty(q->first, q->second));
387 NodeI::~NodeI()
391 void
392 NodeI::loadServer_async(const AMD_Node_loadServerPtr& amdCB,
393 const InternalServerDescriptorPtr& descriptor,
394 const string& replicaName,
395 const Ice::Current& current)
397 ServerCommandPtr command;
399 Lock sync(*this);
400 ++_serial;
402 Ice::Identity id = createServerIdentity(descriptor->id);
405 // Check if we already have a servant for this server. If that's
406 // the case, the server is already loaded and we just need to
407 // update it.
409 while(true)
411 bool added = false;
412 ServerIPtr server;
415 server = ServerIPtr::dynamicCast(_adapter->find(id));
416 if(!server)
418 ServerPrx proxy = ServerPrx::uncheckedCast(_adapter->createProxy(id));
419 server = new ServerI(this, proxy, _serversDir, descriptor->id, _waitTime);
420 _adapter->add(server, id);
421 added = true;
424 catch(const Ice::ObjectAdapterDeactivatedException&)
427 // We throw an object not exist exception to avoid
428 // dispatch warnings. The registry will consider the
429 // node has being unreachable upon receival of this
430 // exception (like any other Ice::LocalException). We
431 // could also have disabled dispatch warnings but they
432 // can still useful to catch other issues.
434 throw Ice::ObjectNotExistException(__FILE__, __LINE__, current.id, current.facet, current.operation);
439 command = server->load(amdCB, descriptor, replicaName);
441 catch(const Ice::ObjectNotExistException&)
443 assert(!added);
444 continue;
446 catch(const Ice::Exception&)
448 if(added)
452 _adapter->remove(id);
454 catch(const Ice::ObjectAdapterDeactivatedException&)
456 // IGNORE
459 throw;
461 break;
464 if(command)
466 command->execute();
470 void
471 NodeI::destroyServer_async(const AMD_Node_destroyServerPtr& amdCB,
472 const string& serverId,
473 const string& uuid,
474 int revision,
475 const string& replicaName,
476 const Ice::Current& current)
478 ServerCommandPtr command;
480 Lock sync(*this);
481 ++_serial;
483 ServerIPtr server;
486 server = ServerIPtr::dynamicCast(_adapter->find(createServerIdentity(serverId)));
488 catch(const Ice::ObjectAdapterDeactivatedException&)
491 // We throw an object not exist exception to avoid
492 // dispatch warnings. The registry will consider the node
493 // has being unreachable upon receival of this exception
494 // (like any other Ice::LocalException). We could also
495 // have disabled dispatch warnings but they can still
496 // useful to catch other issues.
498 throw Ice::ObjectNotExistException(__FILE__, __LINE__, current.id, current.facet, current.operation);
501 if(!server)
503 server = new ServerI(this, 0, _serversDir, serverId, _waitTime);
507 // Destroy the server object if it's loaded.
511 command = server->destroy(amdCB, uuid, revision, replicaName);
513 catch(const Ice::ObjectNotExistException&)
515 amdCB->ice_response();
516 return;
519 if(command)
521 command->execute();
525 void
526 NodeI::patch_async(const AMD_Node_patchPtr& amdCB,
527 const PatcherFeedbackPrx& feedback,
528 const string& application,
529 const string& server,
530 const InternalDistributionDescriptorPtr& appDistrib,
531 bool shutdown,
532 const Ice::Current&)
534 amdCB->ice_response();
537 Lock sync(*this);
538 while(_patchInProgress.find(application) != _patchInProgress.end())
540 wait();
542 _patchInProgress.insert(application);
546 set<ServerIPtr> servers;
547 bool patchApplication = !appDistrib->icepatch.empty();
548 if(server.empty())
551 // Patch all the servers from the application.
553 servers = getApplicationServers(application);
555 else
557 ServerIPtr svr;
560 svr = ServerIPtr::dynamicCast(_adapter->find(createServerIdentity(server)));
562 catch(const Ice::ObjectAdapterDeactivatedException&)
566 if(svr)
568 if(appDistrib->icepatch.empty() || !svr->dependsOnApplicationDistrib())
571 // Don't patch the application if the server doesn't
572 // depend on it.
574 patchApplication = false;
575 servers.insert(svr);
577 else
580 // If the server to patch depends on the application,
581 // we need to shutdown all the application servers
582 // that depend on the application.
584 servers = getApplicationServers(application);
589 set<ServerIPtr>::iterator s = servers.begin();
590 while(s != servers.end())
592 if(!appDistrib->icepatch.empty() && (*s)->dependsOnApplicationDistrib())
594 ++s;
596 else if((*s)->getDistribution() && (server.empty() || server == (*s)->getId()))
598 ++s;
600 else
603 // Exclude servers which don't depend on the application distribution
604 // or don't have a distribution.
606 servers.erase(s++);
610 string failure;
611 if(!servers.empty())
615 set<ServerIPtr>::iterator s = servers.begin();
616 vector<string> running;
617 while(s != servers.end())
621 if(!(*s)->startPatch(shutdown))
623 running.push_back((*s)->getId());
624 servers.erase(s++);
626 else
628 ++s;
631 catch(const Ice::ObjectNotExistException&)
633 servers.erase(s++);
637 if(!running.empty())
639 if(running.size() == 1)
641 throw "server `" + toString(running) + "' is active";
643 else
645 throw "servers `" + toString(running, ", ") + "' are active";
649 for(s = servers.begin(); s != servers.end(); ++s)
651 (*s)->waitForPatch();
655 // Patch the application.
657 FileServerPrx icepatch;
658 if(patchApplication)
660 assert(!appDistrib->icepatch.empty());
661 icepatch = FileServerPrx::checkedCast(_communicator->stringToProxy(appDistrib->icepatch));
662 if(!icepatch)
664 throw "proxy `" + appDistrib->icepatch + "' is not a file server.";
666 patch(icepatch, "distrib/" + application, appDistrib->directories);
670 // Patch the server(s).
672 for(s = servers.begin(); s != servers.end(); ++s)
674 InternalDistributionDescriptorPtr dist = (*s)->getDistribution();
675 if(dist && (server.empty() || (*s)->getId() == server))
677 icepatch = FileServerPrx::checkedCast(_communicator->stringToProxy(dist->icepatch));
678 if(!icepatch)
680 throw "proxy `" + dist->icepatch + "' is not a file server.";
682 patch(icepatch, "servers/" + (*s)->getId() + "/distrib", dist->directories);
684 if(!server.empty())
686 break; // No need to continue.
691 catch(const Ice::LocalException& e)
693 ostringstream os;
694 os << e;
695 failure = os.str();
697 catch(const string& e)
699 failure = e;
701 catch(const char* e)
703 failure = e;
706 for(set<ServerIPtr>::const_iterator s = servers.begin(); s != servers.end(); ++s)
708 (*s)->finishPatch();
713 Lock sync(*this);
714 _patchInProgress.erase(application);
715 notifyAll();
720 if(failure.empty())
722 feedback->finished();
724 else
726 feedback->failed(failure);
729 catch(const Ice::LocalException&)
734 void
735 NodeI::registerWithReplica(const InternalRegistryPrx& replica, const Ice::Current&)
737 _sessions.create(replica);
740 void
741 NodeI::replicaInit(const InternalRegistryPrxSeq& replicas, const Ice::Current&)
743 _sessions.replicaInit(replicas);
746 void
747 NodeI::replicaAdded(const InternalRegistryPrx& replica, const Ice::Current&)
749 _sessions.replicaAdded(replica);
752 void
753 NodeI::replicaRemoved(const InternalRegistryPrx& replica, const Ice::Current&)
755 _sessions.replicaRemoved(replica);
758 std::string
759 NodeI::getName(const Ice::Current&) const
761 return _name;
764 std::string
765 NodeI::getHostname(const Ice::Current&) const
767 return _platform.getHostname();
770 LoadInfo
771 NodeI::getLoad(const Ice::Current&) const
773 return _platform.getLoadInfo();
777 NodeI::getProcessorSocketCount(const Ice::Current&) const
779 return _platform.getProcessorSocketCount();
782 void
783 NodeI::shutdown(const Ice::Current&) const
785 _activator->shutdown();
788 Ice::Long
789 NodeI::getOffsetFromEnd(const string& filename, int count, const Ice::Current&) const
791 return _fileCache->getOffsetFromEnd(getFilePath(filename), count);
794 bool
795 NodeI::read(const string& filename, Ice::Long pos, int size, Ice::Long& newPos, Ice::StringSeq& lines,
796 const Ice::Current&) const
798 return _fileCache->read(getFilePath(filename), pos, size, newPos, lines);
801 void
802 NodeI::shutdown()
804 IceUtil::Mutex::Lock sync(_serversLock);
805 for(map<string, set<ServerIPtr> >::const_iterator p = _serversByApplication.begin();
806 p != _serversByApplication.end(); ++p)
808 for(set<ServerIPtr>::const_iterator q = p->second.begin(); q != p->second.end(); ++q)
810 (*q)->shutdown();
813 _serversByApplication.clear();
816 Ice::CommunicatorPtr
817 NodeI::getCommunicator() const
819 return _communicator;
822 Ice::ObjectAdapterPtr
823 NodeI::getAdapter() const
825 return _adapter;
828 ActivatorPtr
829 NodeI::getActivator() const
831 return _activator;
834 IceUtil::TimerPtr
835 NodeI::getTimer() const
837 return _timer;
840 TraceLevelsPtr
841 NodeI::getTraceLevels() const
843 return _traceLevels;
846 UserAccountMapperPrx
847 NodeI::getUserAccountMapper() const
849 return _userAccountMapper;
852 PlatformInfo&
853 NodeI::getPlatformInfo() const
855 return _platform;
858 FileCachePtr
859 NodeI::getFileCache() const
861 return _fileCache;
864 NodePrx
865 NodeI::getProxy() const
867 return _proxy;
870 const PropertyDescriptorSeq&
871 NodeI::getPropertiesOverride() const
873 return _propertiesOverride;
876 string
877 NodeI::getOutputDir() const
879 return _outputDir;
882 bool
883 NodeI::getRedirectErrToOut() const
885 return _redirectErrToOut;
888 bool
889 NodeI::allowEndpointsOverride() const
891 return _allowEndpointsOverride;
894 NodeSessionPrx
895 NodeI::registerWithRegistry(const InternalRegistryPrx& registry)
897 return registry->registerNode(_platform.getInternalNodeInfo(), _proxy, _platform.getLoadInfo());
900 void
901 NodeI::checkConsistency(const NodeSessionPrx& session)
904 // Only do the consistency check on the startup. This ensures that servers can't
905 // be removed by a bogus master when the master session is re-established.
907 if(_consistencyCheckDone)
909 return;
911 _consistencyCheckDone = true;
914 // We use a serial number to keep track of the concurrent changes
915 // on the node. When a server is loaded/destroyed the serial is
916 // incremented. This allows to ensure that the list of servers
917 // returned by the registry is consistent with the servers
918 // currently deployed on the node: if the serial didn't change
919 // after getting the list of servers from the registry, we have
920 // the accurate list of servers that should be deployed on the
921 // node.
923 unsigned long serial = 0;
924 Ice::StringSeq servers;
925 vector<ServerCommandPtr> commands;
926 while(true)
929 Lock sync(*this);
930 if(serial == _serial)
932 _serial = 1; // We can reset the serial number.
933 commands = checkConsistencyNoSync(servers);
934 break;
936 serial = _serial;
938 assert(session);
941 servers = session->getServers();
943 catch(const Ice::LocalException&)
945 return; // The connection with the session was lost.
947 sort(servers.begin(), servers.end());
950 for_each(commands.begin(), commands.end(), IceUtil::voidMemFun(&ServerCommand::execute));
953 void
954 NodeI::addObserver(const NodeSessionPrx& session, const NodeObserverPrx& observer)
956 IceUtil::Mutex::Lock sync(_observerMutex);
957 assert(_observers.find(session) == _observers.end());
958 _observers.insert(make_pair(session, observer));
960 _observerUpdates.erase(observer); // Remove any updates from the previous session.
962 ServerDynamicInfoSeq serverInfos;
963 AdapterDynamicInfoSeq adapterInfos;
964 for(map<string, ServerDynamicInfo>::const_iterator p = _serversDynamicInfo.begin();
965 p != _serversDynamicInfo.end(); ++p)
967 assert(p->second.state != Destroyed && (p->second.state != Inactive || !p->second.enabled));
968 serverInfos.push_back(p->second);
971 for(map<string, AdapterDynamicInfo>::const_iterator q = _adaptersDynamicInfo.begin();
972 q != _adaptersDynamicInfo.end(); ++q)
974 assert(q->second.proxy);
975 adapterInfos.push_back(q->second);
978 NodeDynamicInfo info;
979 info.info = _platform.getNodeInfo();
980 info.servers = serverInfos;
981 info.adapters = adapterInfos;
982 queueUpdate(observer, new NodeUp(this, observer, info));
985 void
986 NodeI::removeObserver(const NodeSessionPrx& session)
988 IceUtil::Mutex::Lock sync(_observerMutex);
989 _observers.erase(session);
992 void
993 NodeI::observerUpdateServer(const ServerDynamicInfo& info)
995 IceUtil::Mutex::Lock sync(_observerMutex);
997 if(info.state == Destroyed || (info.state == Inactive && info.enabled))
999 _serversDynamicInfo.erase(info.id);
1001 else
1003 _serversDynamicInfo[info.id] = info;
1007 // Send the update and make sure we don't send the update twice to
1008 // the same observer (it's possible for the observer to be
1009 // registered twice if a replica is removed and added right away
1010 // after).
1012 set<NodeObserverPrx> sent;
1013 for(map<NodeSessionPrx, NodeObserverPrx>::const_iterator p = _observers.begin(); p != _observers.end(); ++p)
1015 if(sent.find(p->second) == sent.end())
1017 queueUpdate(p->second, new UpdateServer(this, p->second, info));
1022 void
1023 NodeI::observerUpdateAdapter(const AdapterDynamicInfo& info)
1025 IceUtil::Mutex::Lock sync(_observerMutex);
1027 if(info.proxy)
1029 _adaptersDynamicInfo[info.id] = info;
1031 else
1033 _adaptersDynamicInfo.erase(info.id);
1037 // Send the update and make sure we don't send the update twice to
1038 // the same observer (it's possible for the observer to be
1039 // registered twice if a replica is removed and added right away
1040 // after).
1042 set<NodeObserverPrx> sent;
1043 for(map<NodeSessionPrx, NodeObserverPrx>::const_iterator p = _observers.begin(); p != _observers.end(); ++p)
1045 if(sent.find(p->second) == sent.end())
1047 queueUpdate(p->second, new UpdateAdapter(this, p->second, info));
1052 void
1053 NodeI::queueUpdate(const NodeObserverPrx& proxy, const UpdatePtr& update)
1055 //Lock sync(*this); Called within the synchronization
1056 map<NodeObserverPrx, deque<UpdatePtr> >::iterator p = _observerUpdates.find(proxy);
1057 if(p == _observerUpdates.end())
1059 if(update->send())
1061 _observerUpdates[proxy].push_back(update);
1064 else
1066 p->second.push_back(update);
1070 void
1071 NodeI::dequeueUpdate(const NodeObserverPrx& proxy, const UpdatePtr& update, bool all)
1073 IceUtil::Mutex::Lock sync(_observerMutex);
1074 map<NodeObserverPrx, deque<UpdatePtr> >::iterator p = _observerUpdates.find(proxy);
1075 if(p == _observerUpdates.end() || p->second.front().get() != update.get())
1077 return;
1080 p->second.pop_front();
1082 if(all || (!p->second.empty() && !p->second.front()->send()))
1084 p->second.clear();
1087 if(p->second.empty())
1089 _observerUpdates.erase(p);
1093 void
1094 NodeI::addServer(const ServerIPtr& server, const string& application)
1096 IceUtil::Mutex::Lock sync(_serversLock);
1097 map<string, set<ServerIPtr> >::iterator p = _serversByApplication.find(application);
1098 if(p == _serversByApplication.end())
1100 map<string, set<ServerIPtr> >::value_type v(application, set<ServerIPtr>());
1101 p = _serversByApplication.insert(p, v);
1103 p->second.insert(server);
1106 void
1107 NodeI::removeServer(const ServerIPtr& server, const std::string& application)
1109 IceUtil::Mutex::Lock sync(_serversLock);
1110 map<string, set<ServerIPtr> >::iterator p = _serversByApplication.find(application);
1111 if(p != _serversByApplication.end())
1113 p->second.erase(server);
1114 if(p->second.empty())
1116 _serversByApplication.erase(p);
1118 string appDir = _dataDir + "/distrib/" + application;
1119 if(IceUtilInternal::directoryExists(appDir))
1123 IcePatch2::removeRecursive(appDir);
1125 catch(const string& msg)
1127 Ice::Warning out(_traceLevels->logger);
1128 out << "removing application directory `" << appDir << "' failed:\n" << msg;
1135 Ice::Identity
1136 NodeI::createServerIdentity(const string& name) const
1138 Ice::Identity id;
1139 id.category = _instanceName + "-Server";
1140 id.name = name;
1141 return id;
1144 string
1145 NodeI::getServerAdminCategory() const
1147 return _instanceName + "-NodeRouter";
1150 vector<ServerCommandPtr>
1151 NodeI::checkConsistencyNoSync(const Ice::StringSeq& servers)
1153 vector<ServerCommandPtr> commands;
1156 // Check if the servers directory doesn't contain more servers
1157 // than the registry really knows.
1159 Ice::StringSeq contents;
1162 contents = readDirectory(_serversDir);
1164 catch(const string& msg)
1166 Ice::Error out(_traceLevels->logger);
1167 out << "couldn't read directory `" << _serversDir << "':\n" << msg;
1168 return commands;
1171 vector<string> remove;
1172 set_difference(contents.begin(), contents.end(), servers.begin(), servers.end(), back_inserter(remove));
1175 // Remove the extra servers if possible.
1179 vector<string>::iterator p = remove.begin();
1180 while(p != remove.end())
1182 ServerIPtr server = ServerIPtr::dynamicCast(_adapter->find(createServerIdentity(*p)));
1183 if(server)
1186 // If the server is loaded, we invoke on it to destroy it.
1190 ServerCommandPtr command = server->destroy(0, "", 0, "Master");
1191 if(command)
1193 commands.push_back(command);
1195 p = remove.erase(p);
1196 continue;
1198 catch(const Ice::LocalException& ex)
1200 Ice::Error out(_traceLevels->logger);
1201 out << "server `" << *p << "' destroy failed:\n" << ex;
1203 catch(const string&)
1205 assert(false);
1211 if(canRemoveServerDirectory(*p))
1214 // If the server directory can be removed and we
1215 // either remove it or back it up before to remove it.
1217 removeRecursive(_serversDir + "/" + *p);
1218 p = remove.erase(p);
1219 continue;
1222 catch(const string& msg)
1224 Ice::Warning out(_traceLevels->logger);
1225 out << "removing server directory `" << _serversDir << "/" << *p << "' failed:\n" << msg;
1228 *p = _serversDir + "/" + *p;
1229 ++p;
1232 catch(const Ice::ObjectAdapterDeactivatedException&)
1235 // Just return the server commands, we'll finish the
1236 // consistency check next time the node is started.
1238 return commands;
1241 if(!remove.empty())
1243 Ice::Warning out(_traceLevels->logger);
1244 out << "server directories containing data not created or written by IceGrid were not removed:\n";
1245 out << toString(remove);
1247 return commands;
1250 NodeSessionPrx
1251 NodeI::getMasterNodeSession() const
1253 return _sessions.getMasterNodeSession();
1256 bool
1257 NodeI::canRemoveServerDirectory(const string& name)
1260 // Check if there's files which we didn't create.
1262 Ice::StringSeq c = readDirectory(_serversDir + "/" + name);
1263 set<string> contents(c.begin(), c.end());
1264 contents.erase("dbs");
1265 contents.erase("config");
1266 contents.erase("distrib");
1267 contents.erase("revision");
1268 if(!contents.empty())
1270 return false;
1273 c = readDirectory(_serversDir + "/" + name + "/config");
1274 Ice::StringSeq::const_iterator p;
1275 for(p = c.begin() ; p != c.end(); ++p)
1277 if(p->find("config") != 0)
1279 return false;
1283 c = readDirectory(_serversDir + "/" + name + "/dbs");
1284 for(p = c.begin() ; p != c.end(); ++p)
1288 Ice::StringSeq files = readDirectory(_serversDir + "/" + name + "/dbs/" + *p);
1289 files.erase(remove(files.begin(), files.end(), "DB_CONFIG"), files.end());
1290 files.erase(remove(files.begin(), files.end(), "__Freeze"), files.end());
1291 if(!files.empty())
1293 return false;
1296 catch(const string&)
1298 return false;
1302 return true;
1305 void
1306 NodeI::patch(const FileServerPrx& icepatch, const string& dest, const vector<string>& directories)
1308 IcePatch2::PatcherFeedbackPtr feedback = new LogPatcherFeedback(_traceLevels, dest);
1309 IcePatch2::createDirectory(_dataDir + "/" + dest);
1310 PatcherPtr patcher = new Patcher(icepatch, feedback, _dataDir + "/" + dest, false, 100, 1);
1311 bool aborted = !patcher->prepare();
1312 if(!aborted)
1314 if(directories.empty())
1316 aborted = !patcher->patch("");
1317 dynamic_cast<LogPatcherFeedback*>(feedback.get())->finishPatch();
1319 else
1321 for(vector<string>::const_iterator p = directories.begin(); p != directories.end(); ++p)
1323 dynamic_cast<LogPatcherFeedback*>(feedback.get())->setPatchingPath(*p);
1324 if(!patcher->patch(*p))
1326 aborted = true;
1327 break;
1329 dynamic_cast<LogPatcherFeedback*>(feedback.get())->finishPatch();
1333 if(!aborted)
1335 patcher->finish();
1339 // Update the files owner/group
1343 set<ServerIPtr>
1344 NodeI::getApplicationServers(const string& application) const
1346 IceUtil::Mutex::Lock sync(_serversLock);
1347 set<ServerIPtr> servers;
1348 map<string, set<ServerIPtr> >::const_iterator p = _serversByApplication.find(application);
1349 if(p != _serversByApplication.end())
1351 servers = p->second;
1353 return servers;
1358 string
1359 NodeI::getFilePath(const string& filename) const
1361 string file;
1362 if(filename == "stderr")
1364 file = _communicator->getProperties()->getProperty("Ice.StdErr");
1365 if(file.empty())
1367 throw FileNotAvailableException("Ice.StdErr configuration property is not set");
1370 else if(filename == "stdout")
1372 file = _communicator->getProperties()->getProperty("Ice.StdOut");
1373 if(file.empty())
1375 throw FileNotAvailableException("Ice.StdOut configuration property is not set");
1378 else
1380 throw FileNotAvailableException("unknown file");
1382 return file;