Merge pull request #2216 from jwillemsen/jwi-cxxversionchecks
[ACE_TAO.git] / TAO / orbsvcs / ImplRepo_Service / Shared_Backing_Store.cpp
blob29b71f9529ac564279e194696ea079f6339ea574
1 #include "orbsvcs/Log_Macros.h"
2 #include "Shared_Backing_Store.h"
3 #include "Server_Info.h"
4 #include "Activator_Info.h"
5 #include "AsyncAccessManager.h"
6 #include "ImR_Locator_i.h"
7 #include "utils.h"
8 #include "LiveCheck.h"
9 #include "Locator_XMLHandler.h"
10 #include "ImR_LocatorC.h"
11 #include "ace/File_Lock.h"
12 #include "ace/OS_NS_stdio.h"
13 #include "ace/OS_NS_strings.h"
14 #include "ace/OS_NS_ctype.h"
15 #include "ace/OS_NS_unistd.h"
16 #include "ACEXML/parser/parser/Parser.h"
17 #include "ACEXML/common/FileCharStream.h"
18 #include "ACEXML/common/XML_Util.h"
19 #include "tao/IORManipulation/IORManip_Loader.h"
20 #include "tao/ORB_Core.h"
22 namespace {
23 class Lockable_File
25 public:
26 Lockable_File ()
27 : file_lock_ (),
28 file_ (0),
29 flags_ (0),
30 locked_ (false),
31 unlink_in_destructor_ (false),
32 filename_ ()
36 Lockable_File (const ACE_TString& file,
37 const int flags,
38 bool unlink_in_destructor = false)
39 : file_lock_ (),
40 file_(0),
41 flags_(0),
42 locked_(false),
43 unlink_in_destructor_(false),
44 filename_ ()
46 init_fl(file, flags, unlink_in_destructor);
49 ~Lockable_File ()
51 release ();
54 void release ()
56 if (this->file_ != 0)
58 close_file ();
59 this->file_lock_.reset ();
61 this->locked_ = false;
64 FILE* get_file ()
66 lock ();
68 return this->file_;
71 FILE* get_file (const ACE_TString& file,
72 const int flags,
73 bool unlink_in_destructor = false)
75 init_fl (file, flags, unlink_in_destructor);
76 return get_file ();
79 private:
80 void init_fl (const ACE_TString& file,
81 const int flags,
82 bool unlink_in_destructor = false)
84 release ();
85 errno = 0;
86 flags_ = flags | O_CREAT;
87 unlink_in_destructor_ = unlink_in_destructor;
89 const ACE_TCHAR* const flags_str =
90 ((flags_ & O_RDWR) != 0) ? ACE_TEXT ("r+") :
91 (((flags_ & O_WRONLY) != 0) ? ACE_TEXT ("w") : ACE_TEXT ("r"));
92 this->filename_ = file;
93 #if defined (ACE_WIN32)
94 this->file_ = ACE_OS::fopen (file.c_str(), flags_str);
95 #else
96 this->file_lock_.reset
97 (new ACE_File_Lock (file.c_str (),
98 flags_, 0666, unlink_in_destructor));
100 // Truncating output so this will not allow reading then writing
101 ACE_OS::ftruncate (this->file_lock_->get_handle (), 0);
102 this->file_ = ACE_OS::fdopen (this->file_lock_->get_handle (), flags_str);
103 #endif
106 void close_file ()
108 if (this->file_ == 0)
109 return;
111 ACE_OS::fflush (this->file_);
112 ACE_OS::fclose (this->file_);
113 this->file_ = 0;
114 #if defined (ACE_WIN32)
115 if (this->unlink_in_destructor_)
117 ACE_OS::unlink (this->filename_.c_str ());
118 this->unlink_in_destructor_ = false;
120 #endif
123 void lock ()
125 #if !defined (ACE_WIN32)
126 if (this->locked_)
127 return;
129 if (file_lock_.get () == 0)
131 ORBSVCS_ERROR ((LM_ERROR,
132 ACE_TEXT("(%P|%t) ERROR: Shared_Backing_Store ")
133 ACE_TEXT("attempting to lock ")
134 ACE_TEXT ("an uninitialized Lockable_File.\n")));
135 this->locked_ = false;
136 return;
139 int res = -1;
140 if ((this->flags_ & O_RDWR) != 0)
142 res = file_lock_->acquire();
144 else if ((this->flags_ & O_WRONLY) != 0)
146 res = file_lock_->acquire_write();
148 else
150 res = file_lock_->acquire_read();
153 this->locked_ = res == 0;
155 if (!this->locked_)
157 ORBSVCS_DEBUG ((LM_DEBUG,
158 ACE_TEXT("(%P|%t) Shared_Backing_Store ")
159 ACE_TEXT("failed to acquire lock\n")));
161 #endif
164 std::unique_ptr <ACE_File_Lock> file_lock_;
165 FILE* file_;
166 int flags_;
167 bool locked_;
168 bool unlink_in_destructor_;
169 ACE_TString filename_;
170 }; // class Lockable_File
173 void
174 create_uid (const Options::ImrType repo_type,
175 const unsigned int repo_id,
176 Shared_Backing_Store::UniqueId &id)
178 id.repo_id = repo_id;
179 id.repo_type = repo_type;
180 switch (repo_type)
182 case Options::BACKUP_IMR:
183 id.repo_type_str = ACE_TEXT ("0");
184 break;
185 case Options::PRIMARY_IMR:
186 id.repo_type_str = ACE_TEXT ("1");
187 break;
188 case Options::STANDALONE_IMR:
189 id.repo_type_str = ACE_TEXT ("2");
192 ACE_TCHAR tmp[20];
193 ACE_OS::itoa(repo_id, tmp, 10);
195 id.repo_id_str = tmp;
196 id.unique_filename = id.repo_type_str + ACE_TEXT ("_") +
197 id.repo_id_str + ACE_TEXT (".xml");
200 void write_listing_item (FILE* list,
201 const ACE_TString& fname,
202 const ACE_CString& name,
203 const ACE_TCHAR* tag)
205 ACE_OS::fprintf (list, "\t<%s", ACE_TEXT_ALWAYS_CHAR (tag));
206 ACE_OS::fprintf (list, " fname=\"%s\"", ACE_TEXT_ALWAYS_CHAR (fname.c_str ()));
207 ACE_OS::fprintf (list, " name=\"%s\" />\n", name.c_str ());
210 } // End anonymous namespace
212 //---------------------------------------------------------------------------
214 Shared_Backing_Store::Shared_Backing_Store(const Options& opts,
215 CORBA::ORB_ptr orb,
216 ImR_Locator_i *loc_impl)
217 : XML_Backing_Store (opts, orb, true),
218 listing_file_ (opts.persist_file_name() + ACE_TEXT("imr_listing.xml")),
219 imr_type_ (opts.imr_type ()),
220 sync_needed_ (NO_SYNC),
221 sync_files_ (),
222 non_ft_imr_ior_ (),
223 server_uids_ (),
224 activator_uids_ (),
225 repo_id_ (1),
226 repo_values_ (2),
227 loc_impl_ (loc_impl),
228 /* sync_lock_ (), */
229 replicator_ (*this, opts),
230 updates_ (10),
231 notified_ (false),
232 update_handler_ (this)
234 IMR_REPLICA[Options::PRIMARY_IMR] = "ImR_ReplicaPrimary";
235 IMR_REPLICA[Options::BACKUP_IMR] = "ImR_ReplicaBackup";
236 IMR_REPLICA[Options::STANDALONE_IMR] = "ImR_NoReplica";
238 this->repo_values_[REPO_TYPE] =
239 std::make_pair(ACE_CString("repo_type"),
240 ACE_CString());
241 this->repo_values_[REPO_ID] =
242 std::make_pair(ACE_CString("repo_id"),
243 ACE_CString());
246 Shared_Backing_Store::~Shared_Backing_Store()
250 void
251 Shared_Backing_Store::shutdown ()
253 this->replicator_.stop ();
254 this->replicator_.wait ();
257 void
258 Shared_Backing_Store::bind_unique_id (const ACE_CString& key,
259 UniqueIdMap& unique_ids,
260 const UniqueId& uid)
262 unique_ids.bind (key, uid);
265 void
266 Shared_Backing_Store::find_unique_id (const ACE_CString& key,
267 UniqueIdMap &unique_ids,
268 UniqueId &uid)
270 if (unique_ids.find (key, uid) != 0)
272 const unsigned int repo_id = this->repo_id_++;
273 ::create_uid (this->imr_type_, repo_id, uid);
274 this->bind_unique_id (key, unique_ids, uid);
278 void
279 Shared_Backing_Store::update_unique_id (const ACE_CString &key,
280 UniqueIdMap& unique_ids,
281 Options::ImrType& entry_repo_type,
282 unsigned int& entry_repo_id,
283 UniqueId& uid)
285 UniqueId temp_id;
286 const bool found = (unique_ids.find (key, temp_id) == 0);
288 ::create_uid (entry_repo_type, entry_repo_id, uid);
289 this->bind_unique_id (key, unique_ids, uid);
291 if (entry_repo_id == 0)
293 // if no repo id provided, treat it like it came from this repo
294 entry_repo_id = this->repo_id_++;
295 entry_repo_type = this->imr_type_;
297 else if (found)
299 if (entry_repo_id != uid.repo_id &&
300 entry_repo_type != uid.repo_type)
302 // if already existed, replace the contents
303 ORBSVCS_ERROR ((LM_ERROR,
304 ACE_TEXT ("(%P|%t) ERROR: replacing name = %C with ")
305 ACE_TEXT ("existing repo_id = %d and imr_type = %d, ")
306 ACE_TEXT ("with repo_id = %d and imr_type = %d\n"),
307 key.c_str(), uid.repo_id, uid.repo_type,
308 entry_repo_id, entry_repo_type));
312 if (entry_repo_type == this->imr_type_ && entry_repo_id >= this->repo_id_)
314 // persisting existing entries for this repo, so move the repo_id past
315 // the entries id
316 this->repo_id_ = entry_repo_id + 1;
320 void
321 Shared_Backing_Store::verify_unique_id (const ACE_CString& key,
322 const XML_Backing_Store::NameValues& extra_params,
323 UniqueIdMap& unique_ids)
325 size_t const size = extra_params.size();
326 if ((size != 2) && (this->opts_.debug() > 4))
328 ORBSVCS_ERROR((
329 LM_ERROR,
330 ACE_TEXT("(%P|%t) Persisted server id=%C name=%C doesn't have all ")
331 ACE_TEXT("unique id params. (%d of 2)\n"),
332 size));
335 unsigned int repo_id = 0;
336 // default to this repo
337 Options::ImrType repo_type = this->imr_type_;
339 if ((size > Shared_Backing_Store::REPO_TYPE) &&
340 (extra_params[Shared_Backing_Store::REPO_TYPE].first ==
341 this->repo_values_[Shared_Backing_Store::REPO_TYPE].first))
343 repo_type =
344 (Options::ImrType)ACE_OS::atoi(extra_params[Shared_Backing_Store::REPO_TYPE].second.c_str());
346 if ((size > Shared_Backing_Store::REPO_ID) &&
347 (extra_params[Shared_Backing_Store::REPO_ID].first ==
348 this->repo_values_[Shared_Backing_Store::REPO_ID].first))
350 repo_id =
351 ACE_OS::atoi(extra_params[Shared_Backing_Store::REPO_ID].second.c_str());
353 else
355 ORBSVCS_ERROR((LM_ERROR,
356 ACE_TEXT("(%P|%t) Persisted name = %C did not supply a repo_id\n"),
357 key.c_str()));
360 UniqueId uid;
361 update_unique_id (key, unique_ids, repo_type, repo_id, uid);
365 Shared_Backing_Store::persistent_remove (const ACE_CString& name,
366 bool activator)
368 Lockable_File listing_lf;
369 int err = this->persist_listings (listing_lf);
370 if (err != 0)
372 return err;
374 Shared_Backing_Store::UniqueId uid;
375 err = activator ?
376 this->activator_uids_.find (name, uid) :
377 this->server_uids_.find (name, uid);
378 if (err != 0)
380 ORBSVCS_ERROR((LM_ERROR,
381 ACE_TEXT("(%P|%t) Couldn't find unique repo id for name = %C\n"),
382 name.c_str()));
383 return err;
386 const ACE_TString fname = this->filename_ + uid.unique_filename;
388 // take the lock, then remove the file
389 Lockable_File file(fname, O_WRONLY, true);
391 listing_lf.release();
393 ImplementationRepository::UpdateInfo info;
394 info.name = CORBA::string_dup (name.c_str ());
395 info.action.kind (activator ?
396 ImplementationRepository::repo_activator :
397 ImplementationRepository::repo_server);
398 this->replicator_.send_entity (info);
399 return 0;
403 Shared_Backing_Store::persistent_update (const Server_Info_Ptr& info, bool add)
405 Lockable_File listing_lf;
406 if (add)
408 const int err = this->persist_listings (listing_lf);
409 if (err != 0)
411 return err;
415 ACE_TString name = ACEXML_escape_string (ACE_TEXT_CHAR_TO_TCHAR (info->key_name_.c_str ()));
417 UniqueId uid;
418 this->find_unique_id (info->key_name_, this->server_uids_, uid);
420 const ACE_TString fname = this->filename_ + uid.unique_filename;
421 if (this->opts_.debug() > 9)
423 ORBSVCS_DEBUG((LM_INFO, ACE_TEXT ("(%P|%t) Persisting server to %s(%C)\n"),
424 fname.c_str(), info->key_name_.c_str()));
426 Lockable_File server_file (fname, O_WRONLY);
427 const ACE_TString bfname = fname + ACE_TEXT (".bak");
428 FILE* fp = server_file.get_file();
429 if (fp == 0)
431 ORBSVCS_ERROR ((LM_ERROR, ACE_TEXT ("(%P|%t) Couldn't write to file %s\n"),
432 fname.c_str()));
433 //return -1;
435 // successful added file (if adding), so release the listing file lock
436 listing_lf.release();
437 if (fp != 0)
439 ACE_OS::fprintf (fp,"<?xml version=\"1.0\"?>\n");
441 this->repo_values_[REPO_TYPE].second = ACE_TEXT_ALWAYS_CHAR (uid.repo_type_str.c_str ());
442 this->repo_values_[REPO_ID].second = ACE_TEXT_ALWAYS_CHAR (uid.repo_id_str.c_str ());
444 this->persist (fp, *info, "", this->repo_values_);
446 // Copy the current file to a backup.
447 FILE* bfp = ACE_OS::fopen (bfname.c_str(),ACE_TEXT("w"));
448 ACE_OS::fprintf (bfp,"<?xml version=\"1.0\"?>\n");
449 this->persist (bfp, *info, "", this->repo_values_);
450 ACE_OS::fflush (bfp);
451 ACE_OS::fclose (bfp);
453 server_file.release ();
455 ImplementationRepository::UpdateInfo entity;
456 entity.name = static_cast<const char *> (ACE_TEXT_ALWAYS_CHAR (name.c_str ()));
457 ImplementationRepository::RepoInfo rinfo;
458 rinfo.kind = ImplementationRepository::repo_server;
459 rinfo.repo.repo_id = uid.repo_id;
460 rinfo.repo.repo_type = uid.repo_type;
461 entity.action.info (rinfo);
462 this->replicator_.send_entity (entity);
464 return 0;
469 Shared_Backing_Store::persistent_update(const Activator_Info_Ptr& info, bool add)
471 Lockable_File listing_lf;
472 if (add)
474 const int err = this->persist_listings (listing_lf);
475 if (err != 0)
477 return err;
481 ACE_CString name = lcase (info->name);
483 UniqueId uid;
484 this->find_unique_id (name, this->activator_uids_,uid);
486 const ACE_TString fname = this->filename_ + uid.unique_filename;
487 if (this->opts_.debug() > 9)
489 ORBSVCS_DEBUG ((LM_INFO,
490 ACE_TEXT ("(%P|%t) Persisting activator to %s(%C)\n"),
491 fname.c_str(), info->name.c_str()));
493 Lockable_File activator_file (fname, O_WRONLY);
494 const ACE_TString bfname = fname + ACE_TEXT (".bak");
495 FILE* fp = activator_file.get_file();
496 if (fp == 0)
498 ORBSVCS_ERROR ((LM_ERROR,
499 ACE_TEXT ("(%P|%t) Couldn't write to file %s\n"),
500 fname.c_str()));
501 return -1;
503 // successfully added file (if adding), so release the listing file lock
504 listing_lf.release ();
505 ACE_OS::fprintf (fp, "<?xml version=\"1.0\"?>\n");
507 this->repo_values_[REPO_TYPE].second = ACE_TEXT_ALWAYS_CHAR (uid.repo_type_str.c_str ());
508 this->repo_values_[REPO_ID].second = ACE_TEXT_ALWAYS_CHAR (uid.repo_id_str.c_str ());
510 persist (fp, *info, "", this->repo_values_);
512 // Copy the current file to a backup.
513 FILE* bfp = ACE_OS::fopen (bfname.c_str(),ACE_TEXT("w+"));
514 ACE_OS::fprintf (bfp,"<?xml version=\"1.0\"?>\n");
515 persist (bfp, *info, "", this->repo_values_);
516 ACE_OS::fflush (bfp);
517 ACE_OS::fclose (bfp);
518 activator_file.release ();
520 ImplementationRepository::UpdateInfo entity;
521 entity.name = CORBA::string_dup (name.c_str ());
522 ImplementationRepository::RepoInfo rinfo;
523 rinfo.kind = ImplementationRepository::repo_activator;
524 rinfo.repo.repo_id = uid.repo_id;
525 rinfo.repo.repo_type = uid.repo_type;
526 entity.action.info (rinfo);
527 this->replicator_.send_entity (entity);
529 return 0;
532 const ACE_TCHAR*
533 Shared_Backing_Store::repo_mode() const
535 return this->listing_file_.c_str();
539 Shared_Backing_Store::connect_replicas ()
541 ACE_CString replica_ior_file = this->replica_ior_filename (true);
542 bool was_running = this->replicator_.init_peer (replica_ior_file);
544 // Check if a peer IOR is defined
545 if (replicator_.peer_available ())
547 return replicator_.send_registration (this->imr_ior_.inout());
549 if (this->imr_type_ == Options::BACKUP_IMR)
550 { // We are a backup IMR Locator
552 // If the primary has started at some point in the past, but is
553 // not available right now, then we will assume that we are in
554 // a restart situation where the backup is being started while
555 // the primary is still down. This implies that a successful
556 // start of the replication pair has been made in the past and
557 // we can use the combined ior from the previous run.
558 if (was_running)
560 // Verify that we recovered the IOR successfully. If we did not
561 // then fail startup of the backup IMR Locator.
562 if (this->recover_ior () == -1)
563 ORBSVCS_ERROR_RETURN ((LM_ERROR,
564 ACE_TEXT("Error: Unable to retrieve IOR from combined IOR ")
565 ACE_TEXT ("file: %C\n"),
566 replica_ior_file.c_str()),
567 -1);
569 else
570 { // There has been a startup error. The backup can only be started
571 // after the primary has been successfully started.
572 ORBSVCS_ERROR_RETURN ((LM_ERROR,
573 ACE_TEXT("Error: Primary has not been started previously.\n ")
574 ACE_TEXT ("file: %C\n"),
575 replica_ior_file.c_str()),
576 -1);
580 // For either primary or backup - no connection currently, just wait for peer to start
581 return 0;
585 Shared_Backing_Store::init_repo(PortableServer::POA_ptr)
587 this->non_ft_imr_ior_ = this->imr_ior_;
589 if (this->imr_type_ != Options::STANDALONE_IMR)
591 this->replicator_.init_orb ();
592 this->replicator_.activate ();
593 this->connect_replicas ();
596 // only start the repo clean if no replica is running
597 if (this->opts_.repository_erase() &&
598 !this->replicator_.peer_available ())
600 Lockable_File listing_lf;
601 const XMLHandler_Ptr listings = get_listings(listing_lf, false);
602 if (listings.null())
604 if (this->opts_.debug() > 9)
606 ORBSVCS_DEBUG((LM_INFO,
607 ACE_TEXT ("(%P|%t) Persisted Repository already empty\n")));
610 else
612 const ACE_Vector<ACE_TString>& filenames = listings->filenames();
613 size_t const sz = filenames.size ();
614 for (CORBA::ULong i = 0; i < sz; ++i)
616 if (this->opts_.debug() > 9)
618 ORBSVCS_DEBUG((LM_INFO, ACE_TEXT ("(%P|%t) Removing %s\n"),
619 filenames[i].c_str()));
621 ACE_OS::unlink ( filenames[i].c_str () );
624 if (this->opts_.debug() > 9)
626 ORBSVCS_DEBUG((LM_INFO, ACE_TEXT ("(%P|%t) Removing %s\n"),
627 this->listing_file_.c_str()));
629 ACE_OS::unlink ( this->listing_file_.c_str () );
633 // Ignore persistent_load return since files don't have to exist
634 this->persistent_load (false);
636 if (this->opts_.debug() > 9)
638 ORBSVCS_DEBUG((LM_INFO,
639 ACE_TEXT ("(%P|%t) ImR Repository initialized\n")));
642 return 0;
646 Shared_Backing_Store::persistent_load (bool only_changes)
648 Lockable_File listing_lf;
649 const XMLHandler_Ptr listings = this->get_listings (listing_lf, only_changes);
650 if (listings.null())
652 // failed to retrieve listings
653 return -1;
656 if (only_changes)
658 listings->remove_unmatched (*this);
661 const ACE_Vector<ACE_TString>& filenames = listings->filenames ();
662 size_t const sz = filenames.size ();
663 if (this->opts_.debug() > 9)
665 ORBSVCS_DEBUG((LM_INFO, ACE_TEXT ("(%P|%t) persistent_load %d files\n"), sz));
667 for (CORBA::ULong i = 0; i < sz; ++i)
669 const ACE_TString& fname = filenames[i];
670 Lockable_File file(fname, O_RDONLY);
672 if (this->load_file (fname, file.get_file()) != 0)
674 this->load_file (fname + ACE_TEXT (".bak"));
678 return 0;
681 Shared_Backing_Store::XMLHandler_Ptr
682 Shared_Backing_Store::get_listings (Lockable_File& listing_lf,
683 bool only_changes) const
685 LocatorListings_XMLHandler *raw_xml = 0;
686 if (only_changes)
688 ACE_NEW_RETURN (raw_xml,
689 LocatorListings_XMLHandler (this->filename_,
690 servers(),
691 activators()),
692 XMLHandler_Ptr());
694 else
696 ACE_NEW_RETURN (raw_xml,
697 LocatorListings_XMLHandler (this->filename_),
698 XMLHandler_Ptr());
701 XMLHandler_Ptr listings_handler (raw_xml);
703 if (this->load_file (this->listing_file_,
704 *listings_handler,
705 this->opts_.debug(),
706 listing_lf.get_file (this->listing_file_, O_RDONLY)) != 0)
708 if (this->load_file (this->listing_file_ + ACE_TEXT (".bak"),
709 *listings_handler,
710 this->opts_.debug()) != 0)
712 listings_handler.reset();
716 return listings_handler;
720 Shared_Backing_Store::sync_load ()
722 int err = 0;
723 if (this->sync_needed_ == FULL_SYNC)
725 if (this->opts_.debug() > 5)
727 ORBSVCS_DEBUG((LM_INFO,
728 ACE_TEXT("(%P|%t) sync_load FULL_SYNC\n")));
730 err = this->persistent_load (false);
732 else if (this->sync_needed_ == INC_SYNC)
734 if (this->sync_files_.empty ())
736 return 0;
738 if (this->opts_.debug() > 5)
740 ORBSVCS_DEBUG((LM_INFO,
741 ACE_TEXT("(%P|%t) sync_load INC_SYNC, %d files\n"),
742 this->sync_files_.size ()));
745 // ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, mon, this->sync_lock_, -1);
747 std::set<ACE_TString>::const_iterator fname = this->sync_files_.begin();
748 for ( ; fname != this->sync_files_.end(); ++fname)
750 if (this->opts_.debug() > 6)
752 ORBSVCS_DEBUG((LM_INFO,
753 ACE_TEXT("(%P|%t) sync_load %s\n"),
754 fname->c_str()));
756 Lockable_File file (*fname, O_RDONLY);
757 int ind_err = this->load_file (*fname, file.get_file());
758 if (ind_err != 0)
760 err = ind_err;
763 this->sync_files_.clear();
766 this->sync_needed_ = NO_SYNC;
767 return err;
770 void
771 Shared_Backing_Store::write_listing (FILE* list)
773 ACE_OS::fprintf (list,"<?xml version=\"1.0\"?>\n");
774 ACE_OS::fprintf (list,"<ImRListing>\n");
775 UniqueId uid;
777 // Save servers
778 Locator_Repository::SIMap::ENTRY* sientry = 0;
779 Locator_Repository::SIMap::CONST_ITERATOR siit (this->servers ());
780 for (; siit.next (sientry); siit.advance() )
782 const Server_Info_Ptr& info = sientry->int_id_;
784 this->find_unique_id (sientry->ext_id_, this->server_uids_, uid);
785 ACE_TString listing_name = ACEXML_escape_string (ACE_TEXT_CHAR_TO_TCHAR (info->key_name_.c_str ()));
786 ::write_listing_item (list, uid.unique_filename, ACE_TEXT_ALWAYS_CHAR (listing_name.c_str ()),
787 Locator_XMLHandler::SERVER_INFO_TAG);
790 // Save Activators
791 Locator_Repository::AIMap::ENTRY* aientry = 0;
792 Locator_Repository::AIMap::CONST_ITERATOR aiit (this->activators ());
793 for (; aiit.next (aientry); aiit.advance ())
795 const ACE_CString& aname = aientry->ext_id_;
796 this->find_unique_id (aname, this->activator_uids_, uid);
797 ::write_listing_item (list, uid.unique_filename, aname,
798 Locator_XMLHandler::ACTIVATOR_INFO_TAG);
801 ACE_OS::fprintf (list,"</ImRListing>\n");
805 Shared_Backing_Store::persist_listings (Lockable_File& listing_lf)
807 // ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, mon, this->sync_lock_, -1);
808 FILE* list = listing_lf.get_file (this->listing_file_, O_WRONLY);
809 if (list == 0)
811 ORBSVCS_ERROR ((LM_ERROR,
812 ACE_TEXT ("Couldn't write to file %s\n"),
813 this->listing_file_.c_str()));
814 return -1;
817 this->write_listing (list);
819 const ACE_TString bfname = this->listing_file_ + ACE_TEXT (".bak");
821 // Write backup file
822 FILE* baklist = ACE_OS::fopen (bfname.c_str(), ACE_TEXT("w"));
823 if (baklist == 0)
825 ORBSVCS_ERROR ((LM_ERROR,
826 ACE_TEXT ("Couldn't write to file %s\n"),
827 bfname.c_str()));
828 return -1;
831 this->write_listing (baklist);
832 ACE_OS::fflush (baklist);
833 ACE_OS::fclose (baklist);
835 return 0;
839 Shared_Backing_Store::report_ior(PortableServer::POA_ptr imr_poa)
841 if (this->imr_type_ == Options::STANDALONE_IMR)
843 return Locator_Repository::report_ior(imr_poa);
846 CORBA::Object_var obj = this->orb_->resolve_initial_references ("AsyncIORTable");
847 IORTable::Table_var ior_table = IORTable::Table::_narrow (obj.in ());
848 ACE_ASSERT (! CORBA::is_nil (ior_table.in ()));
850 const char* const replica_name(IMR_REPLICA[this->imr_type_]);
851 ACE_CString replica_filename = replica_ior_filename(false);
852 FILE* fp = ACE_OS::fopen (replica_filename.c_str (), "w");
853 if (fp == 0)
855 ORBSVCS_ERROR_RETURN ((LM_ERROR,
856 ACE_TEXT ("(%P|%t) ImR: Could not open file: %C\n"),
857 replica_filename.c_str ()), -1);
860 CORBA::String_var replica_ior = this->replicator_.ior ();
861 ior_table->bind(replica_name, replica_ior.in());
862 ACE_OS::fprintf (fp, "%s", replica_ior.in ());
863 ACE_OS::fclose (fp);
865 int err = 0;
866 // only report the imr ior if the fault tolerant ImR is complete
867 if (this->replicator_.peer_available ())
869 err = Locator_Repository::report_ior(imr_poa);
872 return err;
875 char*
876 Shared_Backing_Store::locator_service_ior (const char* peer_ior) const
878 const CORBA::Object_ptr this_obj =
879 this->orb_->string_to_object(this->non_ft_imr_ior_.in ());
880 const CORBA::Object_ptr peer_obj =
881 this->orb_->string_to_object(peer_ior);
882 const CORBA::Object_ptr& obj1 =
883 (this->imr_type_ == Options::PRIMARY_IMR) ? this_obj : peer_obj;
884 const CORBA::Object_ptr& obj2 =
885 (this->imr_type_ != Options::PRIMARY_IMR) ? this_obj : peer_obj;
887 CORBA::Object_var IORM =
888 this->orb_->resolve_initial_references (TAO_OBJID_IORMANIPULATION, 0);
890 TAO_IOP::TAO_IOR_Manipulation_var iorm =
891 TAO_IOP::TAO_IOR_Manipulation::_narrow (IORM.in ());
893 CORBA::Object_var locator_service = iorm->add_profiles (obj1, obj2);
895 char* const combined_ior =
896 this->orb_->object_to_string(locator_service.in ());
897 return combined_ior;
900 void
901 Shared_Backing_Store::load_server (Server_Info *info,
902 bool server_started,
903 const NameValues& extra_params)
905 if (this->opts_.debug() > 4)
907 ORBSVCS_DEBUG ((LM_DEBUG,
908 ACE_TEXT ("(%P|%t) loading server <%C>\n"),
909 info->key_name_.c_str ()));
912 // Ensure there is an entry for this server
913 this->verify_unique_id (info->key_name_,
914 extra_params,
915 this->server_uids_);
916 Server_Info_Ptr si;
917 if (this->servers ().find (info->key_name_, si) != 0)
919 // Create new or replace the existing entry
920 XML_Backing_Store::load_server (info, server_started, extra_params);
921 return;
924 bool is_started = info->is_running ();
925 bool was_started = si->is_running ();
927 *si.get () = *info;
928 delete info;
930 if (!server_started)
932 si->server = ImplementationRepository::ServerObject::_nil();
934 else
936 // will create a new server below if no previous server
937 // or the ior has changed
938 server_started = CORBA::is_nil(si->server.in ());
940 this->create_server (server_started, si);
941 if (was_started && !is_started)
943 this->opts_.pinger ()->remove_server (si->key_name_.c_str (), 0);
945 if (!was_started && is_started)
947 CORBA::Object_var obj =
948 this->loc_impl_->set_timeout_policy (si->server.in (),
949 ACE_Time_Value (1,0));
950 si->server = ImplementationRepository::ServerObject::_narrow (obj.in ());
951 this->opts_.pinger ()->add_server (si->key_name_.c_str (),
952 this->opts_.ping_external (),
953 si->server.in (),
954 si->pid);
959 void
960 Shared_Backing_Store::load_activator (const ACE_CString& activator_name,
961 long token,
962 const ACE_CString& ior,
963 const NameValues& extra_params)
965 // use this to make sure an unique id entry is created
966 this->verify_unique_id (activator_name,
967 extra_params,
968 this->activator_uids_);
969 XML_Backing_Store::load_activator (activator_name, token, ior, extra_params);
972 void
973 Shared_Backing_Store::notify_remote_access (const char * id,
974 ImplementationRepository::AAM_Status s)
976 this->replicator_.send_access_state (id, s);
979 void
980 Shared_Backing_Store::updates_available
981 (const ImplementationRepository::UpdateInfoSeq& info, bool seq_gap)
983 // ACE_GUARD (TAO_SYNCH_MUTEX, mon, this->sync_lock_);
984 CORBA::Long len = this->updates_.length ();
985 this->updates_.length (len + info.length () + (seq_gap ? 1 : 0));
986 if (seq_gap)
988 bool found = false;
989 for (CORBA::Long i = 0; !found && i < len; i++)
991 if (this->updates_[i].action._d () == ImplementationRepository::repo_update)
993 this->updates_[i].action.info ().repo.repo_id = -1;
994 this->updates_[i].action.info ().repo.repo_type = -1;
995 found = true;
998 if (!found)
1000 ImplementationRepository::RepoInfo rinfo;
1001 rinfo.kind = ImplementationRepository::repo_server;
1002 rinfo.repo.repo_id = -1;
1003 rinfo.repo.repo_type = -1;
1004 this->updates_[len++].action.info (rinfo);
1007 for (CORBA::ULong i = 0; i < info.length (); i++)
1009 if (info[i].action._d () == ImplementationRepository::access || !seq_gap)
1011 this->updates_[len++] = info[i];
1014 this->updates_.length (len);
1016 if (this->notified_)
1017 return;
1018 this->notified_ = true;
1019 this->orb_->orb_core ()->reactor ()->notify (&this->update_handler_);
1023 Shared_Backing_Store::Update_Handler::handle_exception (ACE_HANDLE)
1025 this->owner_->process_updates ();
1026 return 0;
1029 void
1030 Shared_Backing_Store::process_updates ()
1032 // ACE_GUARD (TAO_SYNCH_MUTEX, mon, this->sync_lock_);
1033 this->notified_ = false;
1034 this->sync_needed_ = NO_SYNC;
1035 for (CORBA::ULong i = 0; i < this->updates_.length (); i++)
1037 ImplementationRepository::UpdateInfo &entity = this->updates_[i];
1038 switch (entity.action._d ())
1040 case ImplementationRepository::access:
1042 if (this->opts_.debug() > 4)
1044 ORBSVCS_DEBUG ((LM_INFO,
1045 ACE_TEXT("(%P|%t) notify_access_state_update, <%C> now <%C>\n"),
1046 entity.name.in (),
1047 AsyncAccessManager::status_name (entity.action.state ())));
1049 this->loc_impl_->remote_access_update (entity.name.in (),
1050 entity.action.state ());
1051 break;
1053 case ImplementationRepository::repo_update:
1055 if (this->sync_needed_ == FULL_SYNC)
1057 continue;
1059 if (entity.action.info().repo.repo_id == -1)
1061 this->sync_needed_ = FULL_SYNC;
1062 this->sync_files_.clear();
1063 continue;
1065 this->sync_needed_ = INC_SYNC;
1066 const ACE_CString name = entity.name.in ();
1067 Options::ImrType repo_type = (Options::ImrType)entity.action.info().repo.repo_type;
1068 unsigned int repo_id = entity.action.info().repo.repo_id;
1069 UniqueId uid;
1070 update_unique_id (name,
1071 entity.action.info().kind == ImplementationRepository::repo_activator ?
1072 this->activator_uids_ :
1073 this->server_uids_,
1074 repo_type, repo_id, uid);
1075 const ACE_TString fname = this->filename_ + uid.unique_filename;
1076 this->sync_files_.insert (fname);
1077 break;
1079 case ImplementationRepository::repo_remove:
1081 const ACE_CString name = entity.name.in ();
1082 if (entity.action.kind() == ImplementationRepository::repo_activator)
1084 this->activators().unbind (name);
1086 else
1088 this->opts_.pinger ()->remove_server (name.c_str(), 0);
1089 this->servers().unbind (name);
1091 break;
1095 this->updates_.length (0);
1096 // mon.release ();
1097 this->sync_load ();
1100 void
1101 Shared_Backing_Store::gen_ior (char*& ft_imr_ior)
1103 // store off original char* to ensure memory cleanup
1104 CORBA::String_var ior = ft_imr_ior;
1106 // if we already have the fault tolerant ImR ior
1107 // then just copy it
1108 if (registered())
1110 if (this->opts_.debug() > 2)
1112 ORBSVCS_DEBUG ((LM_INFO,
1113 ACE_TEXT("(%P|%t) Already registered <%C>\n"),
1114 this->imr_ior_.in()));
1116 // make a copy
1117 ior = this->imr_ior_.in();
1118 // handoff memory
1119 ft_imr_ior = ior._retn();
1120 return;
1123 // otherwise we need to combine the primary and backup ior to make
1124 // the fault tolerant ImR ior
1125 char* combined_ior = 0;
1126 CORBA::String_var reason;
1129 combined_ior = locator_service_ior(ft_imr_ior);
1131 catch (const TAO_IOP::Invalid_IOR& )
1133 reason = "invalid ior";
1135 catch (const TAO_IOP::EmptyProfileList& )
1137 reason = "no profiles";
1139 catch (const TAO_IOP::Duplicate& )
1141 reason = "duplicate profile";
1144 if (combined_ior == 0)
1146 // give back the original pointer and don't clean it up
1147 ft_imr_ior = ior._retn();
1148 ORBSVCS_ERROR((LM_ERROR,
1149 "ERROR: Failed to create Fault Tolerant ImR, reason=%C\n",
1150 reason.in()));
1151 throw ImplementationRepository::InvalidPeer(reason.in());
1154 ft_imr_ior = combined_ior;
1155 // pass as const char* to make sure string is copied
1156 this->imr_ior_ = (const char*)ft_imr_ior;
1158 PortableServer::POA_var null_poa;
1159 Locator_Repository::report_ior(null_poa.in ());
1162 ACE_CString
1163 Shared_Backing_Store::replica_ior_filename(bool peer_ior_file) const
1165 Options::ImrType desired_type = this->imr_type_;
1166 if (peer_ior_file)
1168 desired_type = (desired_type == Options::PRIMARY_IMR) ?
1169 Options::BACKUP_IMR :
1170 Options::PRIMARY_IMR;
1172 ACE_CString ior = ACE_TEXT_ALWAYS_CHAR (this->filename_.c_str());
1173 ior += IMR_REPLICA[desired_type];
1174 ior += ".ior";
1176 return ior;
1179 Shared_Backing_Store::LocatorListings_XMLHandler::LocatorListings_XMLHandler(
1180 const ACE_TString& dir)
1181 : dir_(dir),
1182 only_changes_(false)
1186 Shared_Backing_Store::LocatorListings_XMLHandler::LocatorListings_XMLHandler(
1187 const ACE_TString& dir,
1188 const Locator_Repository::SIMap& servers,
1189 const Locator_Repository::AIMap& activators)
1190 : dir_(dir),
1191 only_changes_(true)
1193 Locator_Repository::SIMap::ENTRY* sientry = 0;
1194 Locator_Repository::SIMap::CONST_ITERATOR siit (servers);
1195 for (; siit.next (sientry); siit.advance() )
1197 unmatched_servers_.bind (sientry->ext_id_, sientry->int_id_);
1200 Locator_Repository::AIMap::ENTRY* aientry = 0;
1201 Locator_Repository::AIMap::CONST_ITERATOR aiit (activators);
1202 for (; aiit.next (aientry); aiit.advance() )
1204 unmatched_activators_.bind (aientry->ext_id_, aientry->int_id_);
1208 void
1209 Shared_Backing_Store::LocatorListings_XMLHandler::startElement (
1210 const ACEXML_Char* ,
1211 const ACEXML_Char* ,
1212 const ACEXML_Char* qName,
1213 ACEXML_Attributes* attrs)
1215 const bool server =
1216 (ACE_OS::strcasecmp (qName, Locator_XMLHandler::SERVER_INFO_TAG) == 0);
1217 if (!server &&
1218 (ACE_OS::strcasecmp (qName, Locator_XMLHandler::ACTIVATOR_INFO_TAG) != 0))
1220 return;
1223 if (attrs != 0 && attrs->getLength () == 2)
1225 ACE_TString fname = attrs->getValue ((size_t)0);
1226 bool store_fname = !only_changes_;
1227 if (only_changes_)
1229 ACE_CString name = ACE_TEXT_ALWAYS_CHAR(attrs->getValue ((size_t)1));
1230 // if the name is not present, then this is an add, so store it
1231 store_fname = server ?
1232 (unmatched_servers_.unbind (name) != 0) :
1233 (unmatched_activators_.unbind (name) != 0);
1236 if (store_fname)
1238 filenames_.push_back(dir_ + fname);
1241 else
1243 ORBSVCS_DEBUG(( LM_DEBUG,
1244 ACE_TEXT ("LocatorListings_XMLHandler::startElement ")
1245 ACE_TEXT ("incorrect number of attrs, %d not 2\n"),
1246 attrs == 0 ? 0 : attrs->getLength ()));
1250 void
1251 Shared_Backing_Store::LocatorListings_XMLHandler::endElement (
1252 const ACEXML_Char* ,
1253 const ACEXML_Char* ,
1254 const ACEXML_Char* )
1258 void
1259 Shared_Backing_Store::LocatorListings_XMLHandler::remove_unmatched(
1260 Locator_Repository& repo)
1262 Locator_Repository::SIMap::ENTRY* sientry = 0;
1263 Locator_Repository::SIMap::CONST_ITERATOR siit (this->unmatched_servers_);
1264 for (; siit.next (sientry); siit.advance() )
1266 int const ret = repo.servers().unbind (sientry->ext_id_);
1267 if (ret != 0)
1269 ORBSVCS_ERROR((LM_ERROR,
1270 ACE_TEXT ("ERROR: could not remove server: %C\n"),
1271 sientry->int_id_->key_name_.c_str()));
1275 Locator_Repository::AIMap::ENTRY* aientry = 0;
1276 Locator_Repository::AIMap::CONST_ITERATOR aiit (this->unmatched_activators_);
1277 for (; aiit.next (aientry); aiit.advance ())
1279 int ret = repo.activators().unbind (aientry->ext_id_);
1280 if (ret != 0)
1282 ORBSVCS_ERROR((LM_ERROR,
1283 ACE_TEXT ("ERROR: could not remove activator: %C\n"),
1284 aientry->int_id_->name.c_str()));
1289 const ACE_Vector<ACE_TString>&
1290 Shared_Backing_Store::LocatorListings_XMLHandler::filenames() const
1292 return this->filenames_;