1 #include "orbsvcs/Log_Macros.h"
2 #include "Shared_Backing_Store.h"
3 #include "Server_Info.h"
4 #include "Activator_Info.h"
5 #include "AsyncAccessManager.h"
6 #include "ImR_Locator_i.h"
9 #include "Locator_XMLHandler.h"
10 #include "ImR_LocatorC.h"
11 #include "ace/File_Lock.h"
12 #include "ace/OS_NS_stdio.h"
13 #include "ace/OS_NS_strings.h"
14 #include "ace/OS_NS_ctype.h"
15 #include "ace/OS_NS_unistd.h"
16 #include "ACEXML/parser/parser/Parser.h"
17 #include "ACEXML/common/FileCharStream.h"
18 #include "ACEXML/common/XML_Util.h"
19 #include "tao/IORManipulation/IORManip_Loader.h"
20 #include "tao/ORB_Core.h"
// Lockable_File (the class header and several interior lines are not visible
// in this excerpt). The visible members pair a stdio FILE* (file_) with an
// ACE_File_Lock owned through std::unique_ptr (file_lock_).
31 unlink_in_destructor_ (false),
// Constructor taking a file name: delegates the open to init_fl().
36 Lockable_File (const ACE_TString
& file
,
38 bool unlink_in_destructor
= false)
43 unlink_in_destructor_(false),
46 init_fl(file
, flags
, unlink_in_destructor
);
// Discard the lock object and record that no lock is held.
59 this->file_lock_
.reset ();
61 this->locked_
= false;
// get_file overload taking a file name: (re)initializes through init_fl().
71 FILE* get_file (const ACE_TString
& file
,
73 bool unlink_in_destructor
= false)
75 init_fl (file
, flags
, unlink_in_destructor
);
// init_fl: stores the flags (O_CREAT is always OR-ed in), the unlink request
// and the file name, then opens the file.
80 void init_fl (const ACE_TString
& file
,
82 bool unlink_in_destructor
= false)
86 flags_
= flags
| O_CREAT
;
87 unlink_in_destructor_
= unlink_in_destructor
;
// Map the open flags to a stdio mode: O_RDWR -> "r+", O_WRONLY -> "w",
// anything else -> "r".
89 const ACE_TCHAR
* const flags_str
=
90 ((flags_
& O_RDWR
) != 0) ? ACE_TEXT ("r+") :
91 (((flags_
& O_WRONLY
) != 0) ? ACE_TEXT ("w") : ACE_TEXT ("r"));
92 this->filename_
= file
;
// Under ACE_WIN32 the file is opened with ACE_OS::fopen. The ACE_File_Lock /
// fdopen statements that follow are presumably the non-Windows branch (the
// #else line is not visible here).
93 #if defined (ACE_WIN32)
94 this->file_
= ACE_OS::fopen (file
.c_str(), flags_str
);
96 this->file_lock_
.reset
97 (new ACE_File_Lock (file
.c_str (),
98 flags_
, 0666, unlink_in_destructor
));
100 // Truncating output so this will not allow reading then writing
// NOTE(review): no check of flags_ is visible before this ftruncate and its
// result is not used; confirm the behavior intended for O_RDONLY opens.
101 ACE_OS::ftruncate (this->file_lock_
->get_handle (), 0);
// The stdio stream is layered on the lock's own file handle.
102 this->file_
= ACE_OS::fdopen (this->file_lock_
->get_handle (), flags_str
);
108 if (this->file_
== 0)
// Flush and close the stdio stream.
111 ACE_OS::fflush (this->file_
);
112 ACE_OS::fclose (this->file_
);
// Under ACE_WIN32 an unlink request is honored here: filename_ is unlinked
// once and the flag is cleared.
114 #if defined (ACE_WIN32)
115 if (this->unlink_in_destructor_
)
117 ACE_OS::unlink (this->filename_
.c_str ());
118 this->unlink_in_destructor_
= false;
// The locking statements are compiled only when ACE_WIN32 is not defined.
125 #if !defined (ACE_WIN32)
129 if (file_lock_
.get () == 0)
131 ORBSVCS_ERROR ((LM_ERROR
,
132 ACE_TEXT("(%P|%t) ERROR: Shared_Backing_Store ")
133 ACE_TEXT("attempting to lock ")
134 ACE_TEXT ("an uninitialized Lockable_File.\n")));
135 this->locked_
= false;
// Pick the lock call from the open flags: O_RDWR -> acquire(),
// O_WRONLY -> acquire_write(); the last assignment uses acquire_read().
140 if ((this->flags_
& O_RDWR
) != 0)
142 res
= file_lock_
->acquire();
144 else if ((this->flags_
& O_WRONLY
) != 0)
146 res
= file_lock_
->acquire_write();
150 res
= file_lock_
->acquire_read();
// The lock counts as held only when the acquire call returned 0.
153 this->locked_
= res
== 0;
157 ORBSVCS_DEBUG ((LM_DEBUG
,
158 ACE_TEXT("(%P|%t) Shared_Backing_Store ")
159 ACE_TEXT("failed to acquire lock\n")));
// Owning pointer to the ACE file lock; reset() releases it.
164 std::unique_ptr
<ACE_File_Lock
> file_lock_
;
168 bool unlink_in_destructor_
;
169 ACE_TString filename_
;
170 }; // class Lockable_File
// Fills `id` from a repository type and a numeric repo id: stores both
// values, sets repo_type_str to "0" (BACKUP_IMR), "1" (PRIMARY_IMR) or "2"
// (STANDALONE_IMR), renders repo_id in base 10 into repo_id_str, and builds
// unique_filename as "<repo_type_str>_<repo_id_str>.xml".
174 create_uid (const Options::ImrType repo_type
,
175 const unsigned int repo_id
,
176 Shared_Backing_Store::UniqueId
&id
)
178 id
.repo_id
= repo_id
;
179 id
.repo_type
= repo_type
;
182 case Options::BACKUP_IMR
:
183 id
.repo_type_str
= ACE_TEXT ("0");
185 case Options::PRIMARY_IMR
:
186 id
.repo_type_str
= ACE_TEXT ("1");
188 case Options::STANDALONE_IMR
:
189 id
.repo_type_str
= ACE_TEXT ("2");
// Decimal text form of the repo id.
193 ACE_OS::itoa(repo_id
, tmp
, 10);
195 id
.repo_id_str
= tmp
;
196 id
.unique_filename
= id
.repo_type_str
+ ACE_TEXT ("_") +
197 id
.repo_id_str
+ ACE_TEXT (".xml");
// Writes one self-closing listing element to `list`, in the form
//   \t<tag fname="..." name="..." />
// `tag` and `fname` go through ACE_TEXT_ALWAYS_CHAR before being printed.
// NOTE(review): nothing is escaped inside this function; callers presumably
// pass XML-safe text - confirm for every call site.
200 void write_listing_item (FILE* list
,
201 const ACE_TString
& fname
,
202 const ACE_CString
& name
,
203 const ACE_TCHAR
* tag
)
205 ACE_OS::fprintf (list
, "\t<%s", ACE_TEXT_ALWAYS_CHAR (tag
));
206 ACE_OS::fprintf (list
, " fname=\"%s\"", ACE_TEXT_ALWAYS_CHAR (fname
.c_str ()));
207 ACE_OS::fprintf (list
, " name=\"%s\" />\n", name
.c_str ());
210 } // End anonymous namespace
212 //---------------------------------------------------------------------------
// Constructor. Hands opts and orb to XML_Backing_Store, derives the listing
// file name as persist_file_name() + "imr_listing.xml", records the ImR type
// from opts and starts with sync_needed_ == NO_SYNC. The body fills
// IMR_REPLICA with one name per ImR type and seeds the "repo_type" and
// "repo_id" keys of repo_values_.
214 Shared_Backing_Store::Shared_Backing_Store(const Options
& opts
,
216 ImR_Locator_i
*loc_impl
)
217 : XML_Backing_Store (opts
, orb
, true),
218 listing_file_ (opts
.persist_file_name() + ACE_TEXT("imr_listing.xml")),
219 imr_type_ (opts
.imr_type ()),
220 sync_needed_ (NO_SYNC
),
227 loc_impl_ (loc_impl
),
229 replicator_ (*this, opts
),
232 update_handler_ (this)
// Replica names, indexed by Options::ImrType.
234 IMR_REPLICA
[Options::PRIMARY_IMR
] = "ImR_ReplicaPrimary";
235 IMR_REPLICA
[Options::BACKUP_IMR
] = "ImR_ReplicaBackup";
236 IMR_REPLICA
[Options::STANDALONE_IMR
] = "ImR_NoReplica";
// Key names of the extra name/value pairs stored with each entry.
238 this->repo_values_
[REPO_TYPE
] =
239 std::make_pair(ACE_CString("repo_type"),
241 this->repo_values_
[REPO_ID
] =
242 std::make_pair(ACE_CString("repo_id"),
// Destructor; no statements of its body are visible in this excerpt.
246 Shared_Backing_Store::~Shared_Backing_Store()
// Stops the replicator and then waits on it (stop() followed by wait()).
251 Shared_Backing_Store::shutdown ()
253 this->replicator_
.stop ();
254 this->replicator_
.wait ();
// Binds `uid` under `key` in the given unique-id map. The result of bind()
// is not used on the visible line.
258 Shared_Backing_Store::bind_unique_id (const ACE_CString
& key
,
259 UniqueIdMap
& unique_ids
,
262 unique_ids
.bind (key
, uid
);
// Looks `key` up in `unique_ids`. When find() reports failure (non-zero) a
// new uid is created from imr_type_ and the current repo_id_ (which is then
// incremented) and bound under `key`, so a lookup can add an entry to the map.
266 Shared_Backing_Store::find_unique_id (const ACE_CString
& key
,
267 UniqueIdMap
&unique_ids
,
270 if (unique_ids
.find (key
, uid
) != 0)
272 const unsigned int repo_id
= this->repo_id_
++;
273 ::create_uid (this->imr_type_
, repo_id
, uid
);
274 this->bind_unique_id (key
, unique_ids
, uid
);
// Rebuilds the unique id for `key` from the supplied repo type / repo id and
// binds it, after first noting whether the key was already present.
// entry_repo_type and entry_repo_id are in/out: when entry_repo_id is 0 they
// are replaced with this repo's type and the next repo_id_ value.
279 Shared_Backing_Store::update_unique_id (const ACE_CString
&key
,
280 UniqueIdMap
& unique_ids
,
281 Options::ImrType
& entry_repo_type
,
282 unsigned int& entry_repo_id
,
// The previously stored id (if any) is read into temp_id.
286 const bool found
= (unique_ids
.find (key
, temp_id
) == 0);
288 ::create_uid (entry_repo_type
, entry_repo_id
, uid
);
289 this->bind_unique_id (key
, unique_ids
, uid
);
291 if (entry_repo_id
== 0)
293 // if no repo id provided, treat it like it came from this repo
294 entry_repo_id
= this->repo_id_
++;
295 entry_repo_type
= this->imr_type_
;
// NOTE(review): uid was just built from entry_repo_type / entry_repo_id
// above, so unless those were reassigned in between, this comparison cannot
// observe the previously stored id (temp_id). Verify whether temp_id was
// intended here, and whether "&&" rather than "||" is the wanted condition.
299 if (entry_repo_id
!= uid
.repo_id
&&
300 entry_repo_type
!= uid
.repo_type
)
302 // if already existed, replace the contents
303 ORBSVCS_ERROR ((LM_ERROR
,
304 ACE_TEXT ("(%P|%t) ERROR: replacing name = %C with ")
305 ACE_TEXT ("existing repo_id = %d and imr_type = %d, ")
306 ACE_TEXT ("with repo_id = %d and imr_type = %d\n"),
307 key
.c_str(), uid
.repo_id
, uid
.repo_type
,
308 entry_repo_id
, entry_repo_type
));
// Keep this repo's id counter ahead of any id already handed out for it.
312 if (entry_repo_type
== this->imr_type_
&& entry_repo_id
>= this->repo_id_
)
314 // persisting existing entries for this repo, so move the repo_id past
316 this->repo_id_
= entry_repo_id
+ 1;
// Reads the optional repo type and repo id out of `extra_params` (matched by
// position and by key name against repo_values_) and forwards them to
// update_unique_id(). repo_type defaults to this repo's type, repo_id to 0.
// NOTE(review): both values come from atoi() and the type is cast straight
// to Options::ImrType; no range check is visible in this excerpt.
321 Shared_Backing_Store::verify_unique_id (const ACE_CString
& key
,
322 const XML_Backing_Store::NameValues
& extra_params
,
323 UniqueIdMap
& unique_ids
)
325 size_t const size
= extra_params
.size();
// An incomplete parameter set is only reported at debug level above 4.
326 if ((size
!= 2) && (this->opts_
.debug() > 4))
330 ACE_TEXT("(%P|%t) Persisted server id=%C name=%C doesn't have all ")
331 ACE_TEXT("unique id params. (%d of 2)\n"),
335 unsigned int repo_id
= 0;
336 // default to this repo
337 Options::ImrType repo_type
= this->imr_type_
;
// Use the stored repo type only if the slot exists and carries the expected key.
339 if ((size
> Shared_Backing_Store::REPO_TYPE
) &&
340 (extra_params
[Shared_Backing_Store::REPO_TYPE
].first
==
341 this->repo_values_
[Shared_Backing_Store::REPO_TYPE
].first
))
344 (Options::ImrType
)ACE_OS::atoi(extra_params
[Shared_Backing_Store::REPO_TYPE
].second
.c_str());
// Same test for the repo id slot.
346 if ((size
> Shared_Backing_Store::REPO_ID
) &&
347 (extra_params
[Shared_Backing_Store::REPO_ID
].first
==
348 this->repo_values_
[Shared_Backing_Store::REPO_ID
].first
))
351 ACE_OS::atoi(extra_params
[Shared_Backing_Store::REPO_ID
].second
.c_str());
355 ORBSVCS_ERROR((LM_ERROR
,
356 ACE_TEXT("(%P|%t) Persisted name = %C did not supply a repo_id\n"),
361 update_unique_id (key
, unique_ids
, repo_type
, repo_id
, uid
);
// Removes the persisted entry for `name`. The listings are rewritten through
// persist_listings(), the uid is looked up in activator_uids_ or
// server_uids_, the entry's file is opened through a Lockable_File created
// with unlink_in_destructor set to true, the listing lock is released, and an
// UpdateInfo tagged repo_activator or repo_server is sent to the replicator.
365 Shared_Backing_Store::persistent_remove (const ACE_CString
& name
,
368 Lockable_File listing_lf
;
369 int err
= this->persist_listings (listing_lf
);
374 Shared_Backing_Store::UniqueId uid
;
376 this->activator_uids_
.find (name
, uid
) :
377 this->server_uids_
.find (name
, uid
);
380 ORBSVCS_ERROR((LM_ERROR
,
381 ACE_TEXT("(%P|%t) Couldn't find unique repo id for name = %C\n"),
386 const ACE_TString fname
= this->filename_
+ uid
.unique_filename
;
388 // take the lock, then remove the file
389 Lockable_File
file(fname
, O_WRONLY
, true);
391 listing_lf
.release();
// Tell the peer which entity was removed.
393 ImplementationRepository::UpdateInfo info
;
394 info
.name
= CORBA::string_dup (name
.c_str ());
395 info
.action
.kind (activator
?
396 ImplementationRepository::repo_activator
:
397 ImplementationRepository::repo_server
);
398 this->replicator_
.send_entity (info
);
// Persists one server entry. persist_listings() is called (any condition on
// `add` is not visible here), the server's uid is found or created in
// server_uids_, the record is written to <filename_><unique_filename> and
// again to a ".bak" copy, and a repo_server UpdateInfo carrying the uid is
// sent to the replicator.
403 Shared_Backing_Store::persistent_update (const Server_Info_Ptr
& info
, bool add
)
405 Lockable_File listing_lf
;
408 const int err
= this->persist_listings (listing_lf
);
// XML-escaped form of the key name; used below as the entity name sent to
// the replicator.
415 ACE_TString name
= ACEXML_escape_string (ACE_TEXT_CHAR_TO_TCHAR (info
->key_name_
.c_str ()));
418 this->find_unique_id (info
->key_name_
, this->server_uids_
, uid
);
420 const ACE_TString fname
= this->filename_
+ uid
.unique_filename
;
421 if (this->opts_
.debug() > 9)
423 ORBSVCS_DEBUG((LM_INFO
, ACE_TEXT ("(%P|%t) Persisting server to %s(%C)\n"),
424 fname
.c_str(), info
->key_name_
.c_str()));
426 Lockable_File
server_file (fname
, O_WRONLY
);
427 const ACE_TString bfname
= fname
+ ACE_TEXT (".bak");
428 FILE* fp
= server_file
.get_file();
431 ORBSVCS_ERROR ((LM_ERROR
, ACE_TEXT ("(%P|%t) Couldn't write to file %s\n"),
435 // successful added file (if adding), so release the listing file lock
436 listing_lf
.release();
439 ACE_OS::fprintf (fp
,"<?xml version=\"1.0\"?>\n");
// The uid strings are stored with the record as extra name/value pairs.
441 this->repo_values_
[REPO_TYPE
].second
= ACE_TEXT_ALWAYS_CHAR (uid
.repo_type_str
.c_str ());
442 this->repo_values_
[REPO_ID
].second
= ACE_TEXT_ALWAYS_CHAR (uid
.repo_id_str
.c_str ());
444 this->persist (fp
, *info
, "", this->repo_values_
);
446 // Copy the current file to a backup.
// NOTE(review): the result of this fopen is used without a visible null
// check; confirm whether a failure to open the backup must be handled.
447 FILE* bfp
= ACE_OS::fopen (bfname
.c_str(),ACE_TEXT("w"));
448 ACE_OS::fprintf (bfp
,"<?xml version=\"1.0\"?>\n");
449 this->persist (bfp
, *info
, "", this->repo_values_
);
450 ACE_OS::fflush (bfp
);
451 ACE_OS::fclose (bfp
);
453 server_file
.release ();
// Notify the peer of the updated server and the uid it is stored under.
455 ImplementationRepository::UpdateInfo entity
;
456 entity
.name
= static_cast<const char *> (ACE_TEXT_ALWAYS_CHAR (name
.c_str ()));
457 ImplementationRepository::RepoInfo rinfo
;
458 rinfo
.kind
= ImplementationRepository::repo_server
;
459 rinfo
.repo
.repo_id
= uid
.repo_id
;
460 rinfo
.repo
.repo_type
= uid
.repo_type
;
461 entity
.action
.info (rinfo
);
462 this->replicator_
.send_entity (entity
);
// Persists one activator entry. persist_listings() is called (any condition
// on `add` is not visible here), the uid for the lower-cased activator name
// is found or created in activator_uids_, the record is written to
// <filename_><unique_filename> and again to a ".bak" copy, and a
// repo_activator UpdateInfo carrying the uid is sent to the replicator.
469 Shared_Backing_Store::persistent_update(const Activator_Info_Ptr
& info
, bool add
)
471 Lockable_File listing_lf
;
474 const int err
= this->persist_listings (listing_lf
);
// Activator ids are keyed by the lower-cased name.
481 ACE_CString name
= lcase (info
->name
);
484 this->find_unique_id (name
, this->activator_uids_
,uid
);
486 const ACE_TString fname
= this->filename_
+ uid
.unique_filename
;
487 if (this->opts_
.debug() > 9)
489 ORBSVCS_DEBUG ((LM_INFO
,
490 ACE_TEXT ("(%P|%t) Persisting activator to %s(%C)\n"),
491 fname
.c_str(), info
->name
.c_str()));
493 Lockable_File
activator_file (fname
, O_WRONLY
);
494 const ACE_TString bfname
= fname
+ ACE_TEXT (".bak");
495 FILE* fp
= activator_file
.get_file();
498 ORBSVCS_ERROR ((LM_ERROR
,
499 ACE_TEXT ("(%P|%t) Couldn't write to file %s\n"),
503 // successfully added file (if adding), so release the listing file lock
504 listing_lf
.release ();
505 ACE_OS::fprintf (fp
, "<?xml version=\"1.0\"?>\n");
// The uid strings are stored with the record as extra name/value pairs.
507 this->repo_values_
[REPO_TYPE
].second
= ACE_TEXT_ALWAYS_CHAR (uid
.repo_type_str
.c_str ());
508 this->repo_values_
[REPO_ID
].second
= ACE_TEXT_ALWAYS_CHAR (uid
.repo_id_str
.c_str ());
510 persist (fp
, *info
, "", this->repo_values_
);
512 // Copy the current file to a backup.
// NOTE(review): the result of this fopen is used without a visible null
// check. The mode here is "w+" while the server overload uses "w"; confirm
// whether the difference is intentional.
513 FILE* bfp
= ACE_OS::fopen (bfname
.c_str(),ACE_TEXT("w+"));
514 ACE_OS::fprintf (bfp
,"<?xml version=\"1.0\"?>\n");
515 persist (bfp
, *info
, "", this->repo_values_
);
516 ACE_OS::fflush (bfp
);
517 ACE_OS::fclose (bfp
);
518 activator_file
.release ();
// Notify the peer of the updated activator and the uid it is stored under.
520 ImplementationRepository::UpdateInfo entity
;
521 entity
.name
= CORBA::string_dup (name
.c_str ());
522 ImplementationRepository::RepoInfo rinfo
;
523 rinfo
.kind
= ImplementationRepository::repo_activator
;
524 rinfo
.repo
.repo_id
= uid
.repo_id
;
525 rinfo
.repo
.repo_type
= uid
.repo_type
;
526 entity
.action
.info (rinfo
);
527 this->replicator_
.send_entity (entity
);
// Returns the listing file name (listing_file_) as a C string.
533 Shared_Backing_Store::repo_mode() const
535 return this->listing_file_
.c_str();
// Initializes the replicator's peer from the peer's replica IOR file. When a
// peer is available the result of send_registration() is returned. Otherwise
// a backup ImR either recovers the IOR saved by an earlier run or reports a
// startup error; see the comments in the body for the two cases.
539 Shared_Backing_Store::connect_replicas ()
541 ACE_CString replica_ior_file
= this->replica_ior_filename (true);
542 bool was_running
= this->replicator_
.init_peer (replica_ior_file
);
544 // Check if a peer IOR is defined
545 if (replicator_
.peer_available ())
547 return replicator_
.send_registration (this->imr_ior_
.inout());
549 if (this->imr_type_
== Options::BACKUP_IMR
)
550 { // We are a backup IMR Locator
552 // If the primary has started at some point in the past, but is
553 // not available right now, then we will assume that we are in
554 // a restart situation where the backup is being started while
555 // the primary is still down. This implies that a successful
556 // start of the replication pair has been made in the past and
557 // we can use the combined ior from the previous run.
560 // Verify that we recovered the IOR successfully. If we did not
561 // then fail startup of the backup IMR Locator.
562 if (this->recover_ior () == -1)
563 ORBSVCS_ERROR_RETURN ((LM_ERROR
,
564 ACE_TEXT("Error: Unable to retrieve IOR from combined IOR ")
565 ACE_TEXT ("file: %C\n"),
566 replica_ior_file
.c_str()),
570 { // There has been a startup error. The backup can only be started
571 // after the primary has been successfully started.
572 ORBSVCS_ERROR_RETURN ((LM_ERROR
,
573 ACE_TEXT("Error: Primary has not been started previously.\n ")
574 ACE_TEXT ("file: %C\n"),
575 replica_ior_file
.c_str()),
580 // For either primary or backup - no connection currently, just wait for peer to start
// Initializes the repository. Saves the current imr_ior_ as non_ft_imr_ior_;
// for a non-standalone ImR it initializes and activates the replicator and
// calls connect_replicas(). When repository_erase() is set and no peer is
// available, every file named by the listings and the listing file itself
// are unlinked. persistent_load(false) is then called.
585 Shared_Backing_Store::init_repo(PortableServer::POA_ptr
)
587 this->non_ft_imr_ior_
= this->imr_ior_
;
589 if (this->imr_type_
!= Options::STANDALONE_IMR
)
591 this->replicator_
.init_orb ();
592 this->replicator_
.activate ();
// NOTE(review): the value returned by connect_replicas() is not used on this
// line; confirm that a failed backup startup is meant to be ignored here.
593 this->connect_replicas ();
596 // only start the repo clean if no replica is running
597 if (this->opts_
.repository_erase() &&
598 !this->replicator_
.peer_available ())
600 Lockable_File listing_lf
;
601 const XMLHandler_Ptr listings
= get_listings(listing_lf
, false);
604 if (this->opts_
.debug() > 9)
606 ORBSVCS_DEBUG((LM_INFO
,
607 ACE_TEXT ("(%P|%t) Persisted Repository already empty\n")));
// Remove each persisted entry file named in the listings.
612 const ACE_Vector
<ACE_TString
>& filenames
= listings
->filenames();
613 size_t const sz
= filenames
.size ();
614 for (CORBA::ULong i
= 0; i
< sz
; ++i
)
616 if (this->opts_
.debug() > 9)
618 ORBSVCS_DEBUG((LM_INFO
, ACE_TEXT ("(%P|%t) Removing %s\n"),
619 filenames
[i
].c_str()));
621 ACE_OS::unlink ( filenames
[i
].c_str () );
624 if (this->opts_
.debug() > 9)
626 ORBSVCS_DEBUG((LM_INFO
, ACE_TEXT ("(%P|%t) Removing %s\n"),
627 this->listing_file_
.c_str()));
629 ACE_OS::unlink ( this->listing_file_
.c_str () );
633 // Ignore persistent_load return since files don't have to exist
634 this->persistent_load (false);
636 if (this->opts_
.debug() > 9)
638 ORBSVCS_DEBUG((LM_INFO
,
639 ACE_TEXT ("(%P|%t) ImR Repository initialized\n")));
// Loads persisted entries. Obtains the listings handler through
// get_listings() (passing `only_changes` on), calls remove_unmatched() on it
// (any guarding condition is not visible here), then loads every listed
// file through a Lockable_File opened O_RDONLY, falling back to
// "<file>.bak" when load_file() returns non-zero.
646 Shared_Backing_Store::persistent_load (bool only_changes
)
648 Lockable_File listing_lf
;
649 const XMLHandler_Ptr listings
= this->get_listings (listing_lf
, only_changes
);
652 // failed to retrieve listings
658 listings
->remove_unmatched (*this);
661 const ACE_Vector
<ACE_TString
>& filenames
= listings
->filenames ();
662 size_t const sz
= filenames
.size ();
663 if (this->opts_
.debug() > 9)
// NOTE(review): `sz` is a size_t but is logged with %d, and is compared with
// a CORBA::ULong index below; check the format specifier on 64-bit builds.
665 ORBSVCS_DEBUG((LM_INFO
, ACE_TEXT ("(%P|%t) persistent_load %d files\n"), sz
));
667 for (CORBA::ULong i
= 0; i
< sz
; ++i
)
669 const ACE_TString
& fname
= filenames
[i
];
670 Lockable_File
file(fname
, O_RDONLY
);
672 if (this->load_file (fname
, file
.get_file()) != 0)
// Primary copy failed to load: try the backup written next to it.
674 this->load_file (fname
+ ACE_TEXT (".bak"));
// Builds a LocatorListings_XMLHandler (one of two constructor forms; the
// condition that selects between them is not visible here) and loads the
// listing file into it through `listing_lf`, opened O_RDONLY. If that load
// fails the ".bak" copy is tried; if the backup fails as well the handler
// pointer is reset, so the caller receives an empty XMLHandler_Ptr.
681 Shared_Backing_Store::XMLHandler_Ptr
682 Shared_Backing_Store::get_listings (Lockable_File
& listing_lf
,
683 bool only_changes
) const
685 LocatorListings_XMLHandler
*raw_xml
= 0;
688 ACE_NEW_RETURN (raw_xml
,
689 LocatorListings_XMLHandler (this->filename_
,
696 ACE_NEW_RETURN (raw_xml
,
697 LocatorListings_XMLHandler (this->filename_
),
// Hand the raw pointer to the smart pointer that is returned.
701 XMLHandler_Ptr
listings_handler (raw_xml
);
703 if (this->load_file (this->listing_file_
,
706 listing_lf
.get_file (this->listing_file_
, O_RDONLY
)) != 0)
708 if (this->load_file (this->listing_file_
+ ACE_TEXT (".bak"),
710 this->opts_
.debug()) != 0)
712 listings_handler
.reset();
716 return listings_handler
;
// Applies the synchronization recorded in sync_needed_. FULL_SYNC reloads
// everything through persistent_load(false). INC_SYNC loads each file named
// in sync_files_ through a Lockable_File opened O_RDONLY and then clears the
// set. sync_needed_ is set back to NO_SYNC afterwards.
// NOTE(review): the guard on sync_lock_ is commented out below; the
// threading assumptions of this method are not visible here.
720 Shared_Backing_Store::sync_load ()
723 if (this->sync_needed_
== FULL_SYNC
)
725 if (this->opts_
.debug() > 5)
727 ORBSVCS_DEBUG((LM_INFO
,
728 ACE_TEXT("(%P|%t) sync_load FULL_SYNC\n")));
730 err
= this->persistent_load (false);
732 else if (this->sync_needed_
== INC_SYNC
)
734 if (this->sync_files_
.empty ())
738 if (this->opts_
.debug() > 5)
// NOTE(review): size() is logged with %d; check the specifier on 64-bit
// builds.
740 ORBSVCS_DEBUG((LM_INFO
,
741 ACE_TEXT("(%P|%t) sync_load INC_SYNC, %d files\n"),
742 this->sync_files_
.size ()));
745 // ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, mon, this->sync_lock_, -1);
747 std::set
<ACE_TString
>::const_iterator fname
= this->sync_files_
.begin();
748 for ( ; fname
!= this->sync_files_
.end(); ++fname
)
750 if (this->opts_
.debug() > 6)
752 ORBSVCS_DEBUG((LM_INFO
,
753 ACE_TEXT("(%P|%t) sync_load %s\n"),
756 Lockable_File
file (*fname
, O_RDONLY
);
757 int ind_err
= this->load_file (*fname
, file
.get_file());
763 this->sync_files_
.clear();
766 this->sync_needed_
= NO_SYNC
;
// Writes the complete listing document to `list`: the XML prolog, an
// <ImRListing> element, one item per server (name passed through
// ACEXML_escape_string) and one per activator, then the closing tag. Each
// item's file name comes from the uid returned by find_unique_id().
771 Shared_Backing_Store::write_listing (FILE* list
)
773 ACE_OS::fprintf (list
,"<?xml version=\"1.0\"?>\n");
774 ACE_OS::fprintf (list
,"<ImRListing>\n");
// Servers.
778 Locator_Repository::SIMap::ENTRY
* sientry
= 0;
779 Locator_Repository::SIMap::CONST_ITERATOR
siit (this->servers ());
780 for (; siit
.next (sientry
); siit
.advance() )
782 const Server_Info_Ptr
& info
= sientry
->int_id_
;
784 this->find_unique_id (sientry
->ext_id_
, this->server_uids_
, uid
);
785 ACE_TString listing_name
= ACEXML_escape_string (ACE_TEXT_CHAR_TO_TCHAR (info
->key_name_
.c_str ()));
786 ::write_listing_item (list
, uid
.unique_filename
, ACE_TEXT_ALWAYS_CHAR (listing_name
.c_str ()),
787 Locator_XMLHandler::SERVER_INFO_TAG
);
// Activators; the map key is written as the name without escaping.
791 Locator_Repository::AIMap::ENTRY
* aientry
= 0;
792 Locator_Repository::AIMap::CONST_ITERATOR
aiit (this->activators ());
793 for (; aiit
.next (aientry
); aiit
.advance ())
795 const ACE_CString
& aname
= aientry
->ext_id_
;
796 this->find_unique_id (aname
, this->activator_uids_
, uid
);
797 ::write_listing_item (list
, uid
.unique_filename
, aname
,
798 Locator_XMLHandler::ACTIVATOR_INFO_TAG
);
801 ACE_OS::fprintf (list
,"</ImRListing>\n");
// Opens the listing file for writing through `listing_lf` and writes the
// listing to it, then writes the same listing to "<listing_file_>.bak" using
// a plain ACE_OS::fopen. `listing_lf` is not released here, so the caller
// decides when the listing lock is dropped.
805 Shared_Backing_Store::persist_listings (Lockable_File
& listing_lf
)
807 // ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, mon, this->sync_lock_, -1);
808 FILE* list
= listing_lf
.get_file (this->listing_file_
, O_WRONLY
);
811 ORBSVCS_ERROR ((LM_ERROR
,
812 ACE_TEXT ("Couldn't write to file %s\n"),
813 this->listing_file_
.c_str()));
817 this->write_listing (list
);
819 const ACE_TString bfname
= this->listing_file_
+ ACE_TEXT (".bak");
// Backup copy of the listing.
822 FILE* baklist
= ACE_OS::fopen (bfname
.c_str(), ACE_TEXT("w"));
825 ORBSVCS_ERROR ((LM_ERROR
,
826 ACE_TEXT ("Couldn't write to file %s\n"),
831 this->write_listing (baklist
);
832 ACE_OS::fflush (baklist
);
833 ACE_OS::fclose (baklist
);
// For a standalone ImR this defers to Locator_Repository::report_ior().
// Otherwise it binds the replicator's IOR in the "AsyncIORTable" under this
// ImR type's replica name, writes that IOR to the replica IOR file, and
// reports the ImR IOR itself only when a peer is available.
839 Shared_Backing_Store::report_ior(PortableServer::POA_ptr imr_poa
)
841 if (this->imr_type_
== Options::STANDALONE_IMR
)
843 return Locator_Repository::report_ior(imr_poa
);
846 CORBA::Object_var obj
= this->orb_
->resolve_initial_references ("AsyncIORTable");
847 IORTable::Table_var ior_table
= IORTable::Table::_narrow (obj
.in ());
// NOTE(review): a failed narrow is only caught by this assertion; confirm
// what happens when ACE_ASSERT is compiled out.
848 ACE_ASSERT (! CORBA::is_nil (ior_table
.in ()));
850 const char* const replica_name(IMR_REPLICA
[this->imr_type_
]);
851 ACE_CString replica_filename
= replica_ior_filename(false);
852 FILE* fp
= ACE_OS::fopen (replica_filename
.c_str (), "w");
855 ORBSVCS_ERROR_RETURN ((LM_ERROR
,
856 ACE_TEXT ("(%P|%t) ImR: Could not open file: %C\n"),
857 replica_filename
.c_str ()), -1);
860 CORBA::String_var replica_ior
= this->replicator_
.ior ();
861 ior_table
->bind(replica_name
, replica_ior
.in());
862 ACE_OS::fprintf (fp
, "%s", replica_ior
.in ());
866 // only report the imr ior if the fault tolerant ImR is complete
867 if (this->replicator_
.peer_available ())
869 err
= Locator_Repository::report_ior(imr_poa
);
// Combines this ImR's non-fault-tolerant IOR with `peer_ior` into a single
// object reference through TAO's IOR manipulation add_profiles(). The
// PRIMARY_IMR object is always passed first and the other one second. The
// combined reference is converted to a string with object_to_string().
// NOTE(review): this_obj and peer_obj are raw CORBA::Object_ptr values from
// string_to_object(); no release is visible in this excerpt - confirm
// ownership.
876 Shared_Backing_Store::locator_service_ior (const char* peer_ior
) const
878 const CORBA::Object_ptr this_obj
=
879 this->orb_
->string_to_object(this->non_ft_imr_ior_
.in ());
880 const CORBA::Object_ptr peer_obj
=
881 this->orb_
->string_to_object(peer_ior
);
// obj1 is the primary's reference, obj2 the backup's.
882 const CORBA::Object_ptr
& obj1
=
883 (this->imr_type_
== Options::PRIMARY_IMR
) ? this_obj
: peer_obj
;
884 const CORBA::Object_ptr
& obj2
=
885 (this->imr_type_
!= Options::PRIMARY_IMR
) ? this_obj
: peer_obj
;
887 CORBA::Object_var IORM
=
888 this->orb_
->resolve_initial_references (TAO_OBJID_IORMANIPULATION
, 0);
890 TAO_IOP::TAO_IOR_Manipulation_var iorm
=
891 TAO_IOP::TAO_IOR_Manipulation::_narrow (IORM
.in ());
893 CORBA::Object_var locator_service
= iorm
->add_profiles (obj1
, obj2
);
895 char* const combined_ior
=
896 this->orb_
->object_to_string(locator_service
.in ());
// Loads one server record. A unique id is ensured for the key first. When
// the key is not found in servers(), XML_Backing_Store::load_server() is
// called. For an already known server the running state before and after is
// compared: a server that stopped is removed from the pinger, and one that
// started gets a timeout policy of ACE_Time_Value (1,0) applied to its
// object reference and is added to the pinger.
901 Shared_Backing_Store::load_server (Server_Info
*info
,
903 const NameValues
& extra_params
)
905 if (this->opts_
.debug() > 4)
907 ORBSVCS_DEBUG ((LM_DEBUG
,
908 ACE_TEXT ("(%P|%t) loading server <%C>\n"),
909 info
->key_name_
.c_str ()));
912 // Ensure there is an entry for this server
913 this->verify_unique_id (info
->key_name_
,
917 if (this->servers ().find (info
->key_name_
, si
) != 0)
919 // Create new or replace the existing entry
920 XML_Backing_Store::load_server (info
, server_started
, extra_params
);
// `info` is the freshly read record, `si` the entry already in the map.
924 bool is_started
= info
->is_running ();
925 bool was_started
= si
->is_running ();
932 si
->server
= ImplementationRepository::ServerObject::_nil();
936 // will create a new server below if no previous server
937 // or the ior has changed
938 server_started
= CORBA::is_nil(si
->server
.in ());
940 this->create_server (server_started
, si
);
941 if (was_started
&& !is_started
)
943 this->opts_
.pinger ()->remove_server (si
->key_name_
.c_str (), 0);
945 if (!was_started
&& is_started
)
947 CORBA::Object_var obj
=
948 this->loc_impl_
->set_timeout_policy (si
->server
.in (),
949 ACE_Time_Value (1,0));
950 si
->server
= ImplementationRepository::ServerObject::_narrow (obj
.in ());
951 this->opts_
.pinger ()->add_server (si
->key_name_
.c_str (),
952 this->opts_
.ping_external (),
// Ensures a unique-id entry exists for the activator in activator_uids_ and
// then defers to XML_Backing_Store::load_activator().
960 Shared_Backing_Store::load_activator (const ACE_CString
& activator_name
,
962 const ACE_CString
& ior
,
963 const NameValues
& extra_params
)
965 // use this to make sure an unique id entry is created
966 this->verify_unique_id (activator_name
,
968 this->activator_uids_
);
969 XML_Backing_Store::load_activator (activator_name
, token
, ior
, extra_params
);
// Forwards the access state `s` for `id` to the replicator via
// send_access_state().
973 Shared_Backing_Store::notify_remote_access (const char * id
,
974 ImplementationRepository::AAM_Status s
)
976 this->replicator_
.send_access_state (id
, s
);
// Appends incoming replication updates to updates_ and asks the ORB's
// reactor to notify update_handler_. Entries whose discriminator is `access`
// are always copied; other entries are copied only when `seq_gap` is false.
// A RepoInfo with repo_id and repo_type of -1 serves as a marker entry;
// process_updates() treats repo_id == -1 as a request for FULL_SYNC.
// NOTE(review): the guard on sync_lock_ is commented out below; the
// threading assumptions of this method are not visible here.
980 Shared_Backing_Store::updates_available
981 (const ImplementationRepository::UpdateInfoSeq
& info
, bool seq_gap
)
983 // ACE_GUARD (TAO_SYNCH_MUTEX, mon, this->sync_lock_);
// Grow the queue to its maximum possible size; it is trimmed back below.
984 CORBA::Long len
= this->updates_
.length ();
985 this->updates_
.length (len
+ info
.length () + (seq_gap
? 1 : 0));
// Search the entries already queued for a repo_update to turn into the marker.
989 for (CORBA::Long i
= 0; !found
&& i
< len
; i
++)
991 if (this->updates_
[i
].action
._d () == ImplementationRepository::repo_update
)
993 this->updates_
[i
].action
.info ().repo
.repo_id
= -1;
994 this->updates_
[i
].action
.info ().repo
.repo_type
= -1;
// Append a fresh marker entry.
1000 ImplementationRepository::RepoInfo rinfo
;
1001 rinfo
.kind
= ImplementationRepository::repo_server
;
1002 rinfo
.repo
.repo_id
= -1;
1003 rinfo
.repo
.repo_type
= -1;
1004 this->updates_
[len
++].action
.info (rinfo
);
1007 for (CORBA::ULong i
= 0; i
< info
.length (); i
++)
1009 if (info
[i
].action
._d () == ImplementationRepository::access
|| !seq_gap
)
1011 this->updates_
[len
++] = info
[i
];
// Trim to the number of entries actually stored.
1014 this->updates_
.length (len
);
// notified_ records that a reactor notification has been requested;
// process_updates() clears it again.
1016 if (this->notified_
)
1018 this->notified_
= true;
1019 this->orb_
->orb_core ()->reactor ()->notify (&this->update_handler_
);
// Runs process_updates() on the owning Shared_Backing_Store; the handle
// argument is unnamed and unused.
1023 Shared_Backing_Store::Update_Handler::handle_exception (ACE_HANDLE
)
1025 this->owner_
->process_updates ();
// Drains updates_. Clears notified_, resets sync_needed_ to NO_SYNC and
// handles each queued UpdateInfo by its discriminator:
//   access      - passes the new state to loc_impl_->remote_access_update();
//   repo_update - a repo_id of -1 selects FULL_SYNC and clears sync_files_;
//                 the remaining statements select INC_SYNC, refresh the uid
//                 through update_unique_id() and add its file to sync_files_;
//   repo_remove - unbinds the activator, or removes the server from the
//                 pinger and unbinds it.
// updates_ is emptied at the end.
1030 Shared_Backing_Store::process_updates ()
1032 // ACE_GUARD (TAO_SYNCH_MUTEX, mon, this->sync_lock_);
1033 this->notified_
= false;
1034 this->sync_needed_
= NO_SYNC
;
1035 for (CORBA::ULong i
= 0; i
< this->updates_
.length (); i
++)
1037 ImplementationRepository::UpdateInfo
&entity
= this->updates_
[i
];
1038 switch (entity
.action
._d ())
1040 case ImplementationRepository::access
:
1042 if (this->opts_
.debug() > 4)
1044 ORBSVCS_DEBUG ((LM_INFO
,
1045 ACE_TEXT("(%P|%t) notify_access_state_update, <%C> now <%C>\n"),
1047 AsyncAccessManager::status_name (entity
.action
.state ())));
1049 this->loc_impl_
->remote_access_update (entity
.name
.in (),
1050 entity
.action
.state ());
1053 case ImplementationRepository::repo_update
:
1055 if (this->sync_needed_
== FULL_SYNC
)
// repo_id == -1 is the marker stored by updates_available().
1059 if (entity
.action
.info().repo
.repo_id
== -1)
1061 this->sync_needed_
= FULL_SYNC
;
1062 this->sync_files_
.clear();
1065 this->sync_needed_
= INC_SYNC
;
1066 const ACE_CString name
= entity
.name
.in ();
1067 Options::ImrType repo_type
= (Options::ImrType
)entity
.action
.info().repo
.repo_type
;
1068 unsigned int repo_id
= entity
.action
.info().repo
.repo_id
;
1070 update_unique_id (name
,
1071 entity
.action
.info().kind
== ImplementationRepository::repo_activator
?
1072 this->activator_uids_
:
1074 repo_type
, repo_id
, uid
);
// Remember which file has to be reloaded by sync_load().
1075 const ACE_TString fname
= this->filename_
+ uid
.unique_filename
;
1076 this->sync_files_
.insert (fname
);
1079 case ImplementationRepository::repo_remove
:
1081 const ACE_CString name
= entity
.name
.in ();
1082 if (entity
.action
.kind() == ImplementationRepository::repo_activator
)
1084 this->activators().unbind (name
);
1088 this->opts_
.pinger ()->remove_server (name
.c_str(), 0);
1089 this->servers().unbind (name
);
1095 this->updates_
.length (0);
// Produces the fault-tolerant ImR IOR in `ft_imr_ior`. The incoming pointer
// is first adopted by a CORBA::String_var. If the fault-tolerant IOR already
// exists, imr_ior_ is copied out. Otherwise locator_service_ior() combines
// the IORs; TAO_IOP Invalid_IOR, EmptyProfileList and Duplicate exceptions
// are turned into a reason string, and a null result leads to
// ImplementationRepository::InvalidPeer after the original pointer has been
// handed back. On success imr_ior_ receives a copy of the combined IOR and
// Locator_Repository::report_ior() is called with a default-constructed
// POA_var.
1101 Shared_Backing_Store::gen_ior (char*& ft_imr_ior
)
1103 // store off original char* to ensure memory cleanup
1104 CORBA::String_var ior
= ft_imr_ior
;
1106 // if we already have the fault tolerant ImR ior
1107 // then just copy it
1110 if (this->opts_
.debug() > 2)
1112 ORBSVCS_DEBUG ((LM_INFO
,
1113 ACE_TEXT("(%P|%t) Already registered <%C>\n"),
1114 this->imr_ior_
.in()));
1117 ior
= this->imr_ior_
.in();
1119 ft_imr_ior
= ior
._retn();
1123 // otherwise we need to combine the primary and backup ior to make
1124 // the fault tolerant ImR ior
1125 char* combined_ior
= 0;
1126 CORBA::String_var reason
;
1129 combined_ior
= locator_service_ior(ft_imr_ior
);
1131 catch (const TAO_IOP::Invalid_IOR
& )
1133 reason
= "invalid ior";
1135 catch (const TAO_IOP::EmptyProfileList
& )
1137 reason
= "no profiles";
1139 catch (const TAO_IOP::Duplicate
& )
1141 reason
= "duplicate profile";
1144 if (combined_ior
== 0)
1146 // give back the original pointer and don't clean it up
1147 ft_imr_ior
= ior
._retn();
1148 ORBSVCS_ERROR((LM_ERROR
,
1149 "ERROR: Failed to create Fault Tolerant ImR, reason=%C\n",
1151 throw ImplementationRepository::InvalidPeer(reason
.in());
1154 ft_imr_ior
= combined_ior
;
1155 // pass as const char* to make sure string is copied
1156 this->imr_ior_
= (const char*)ft_imr_ior
;
1158 PortableServer::POA_var null_poa
;
1159 Locator_Repository::report_ior(null_poa
.in ());
// Builds a replica IOR file name: filename_ followed by the IMR_REPLICA name
// of the desired type. The desired type starts as this ImR's own type and
// can be swapped: PRIMARY_IMR becomes BACKUP_IMR, any other type becomes
// PRIMARY_IMR. The swap is presumably taken when `peer_ior_file` is true;
// the condition line is not visible here.
1163 Shared_Backing_Store::replica_ior_filename(bool peer_ior_file
) const
1165 Options::ImrType desired_type
= this->imr_type_
;
1168 desired_type
= (desired_type
== Options::PRIMARY_IMR
) ?
1169 Options::BACKUP_IMR
:
1170 Options::PRIMARY_IMR
;
1172 ACE_CString ior
= ACE_TEXT_ALWAYS_CHAR (this->filename_
.c_str());
1173 ior
+= IMR_REPLICA
[desired_type
];
// Constructor taking only the directory: only_changes_ is set to false.
1179 Shared_Backing_Store::LocatorListings_XMLHandler::LocatorListings_XMLHandler(
1180 const ACE_TString
& dir
)
1182 only_changes_(false)
// Constructor taking the current servers and activators: every entry of both
// maps is bound into unmatched_servers_ / unmatched_activators_.
// startElement() unbinds entries from those maps as listing items name them.
1186 Shared_Backing_Store::LocatorListings_XMLHandler::LocatorListings_XMLHandler(
1187 const ACE_TString
& dir
,
1188 const Locator_Repository::SIMap
& servers
,
1189 const Locator_Repository::AIMap
& activators
)
1193 Locator_Repository::SIMap::ENTRY
* sientry
= 0;
1194 Locator_Repository::SIMap::CONST_ITERATOR
siit (servers
);
1195 for (; siit
.next (sientry
); siit
.advance() )
1197 unmatched_servers_
.bind (sientry
->ext_id_
, sientry
->int_id_
);
1200 Locator_Repository::AIMap::ENTRY
* aientry
= 0;
1201 Locator_Repository::AIMap::CONST_ITERATOR
aiit (activators
);
1202 for (; aiit
.next (aientry
); aiit
.advance() )
1204 unmatched_activators_
.bind (aientry
->ext_id_
, aientry
->int_id_
);
// Start-element callback for the listing document. qName is compared
// case-insensitively with the server and activator tags. For an element with
// exactly two attributes, attribute 0 is taken as the file name and
// attribute 1 as the entry name. store_fname starts as !only_changes_ and
// can be replaced by "the name was not in the unmatched map" (unbind() != 0);
// the condition guarding that replacement is not visible here. dir_ + fname
// is appended to filenames_.
// NOTE(review): attributes are read by position, not by name, so the order
// written by write_listing_item (fname, then name) is relied upon.
1209 Shared_Backing_Store::LocatorListings_XMLHandler::startElement (
1210 const ACEXML_Char
* ,
1211 const ACEXML_Char
* ,
1212 const ACEXML_Char
* qName
,
1213 ACEXML_Attributes
* attrs
)
1216 (ACE_OS::strcasecmp (qName
, Locator_XMLHandler::SERVER_INFO_TAG
) == 0);
1218 (ACE_OS::strcasecmp (qName
, Locator_XMLHandler::ACTIVATOR_INFO_TAG
) != 0))
1223 if (attrs
!= 0 && attrs
->getLength () == 2)
1225 ACE_TString fname
= attrs
->getValue ((size_t)0);
1226 bool store_fname
= !only_changes_
;
1229 ACE_CString name
= ACE_TEXT_ALWAYS_CHAR(attrs
->getValue ((size_t)1));
1230 // if the name is not present, then this is an add, so store it
1231 store_fname
= server
?
1232 (unmatched_servers_
.unbind (name
) != 0) :
1233 (unmatched_activators_
.unbind (name
) != 0);
1238 filenames_
.push_back(dir_
+ fname
);
1243 ORBSVCS_DEBUG(( LM_DEBUG
,
1244 ACE_TEXT ("LocatorListings_XMLHandler::startElement ")
1245 ACE_TEXT ("incorrect number of attrs, %d not 2\n"),
1246 attrs
== 0 ? 0 : attrs
->getLength ()));
// End-element callback; all three parameters are unnamed and no statements
// of the body are visible in this excerpt.
1251 Shared_Backing_Store::LocatorListings_XMLHandler::endElement (
1252 const ACEXML_Char
* ,
1253 const ACEXML_Char
* ,
1254 const ACEXML_Char
* )
// Unbinds from `repo` every server and activator still present in
// unmatched_servers_ / unmatched_activators_. An error naming the entry is
// logged for each kind; the test on `ret` that guards the log is not visible
// here.
1259 Shared_Backing_Store::LocatorListings_XMLHandler::remove_unmatched(
1260 Locator_Repository
& repo
)
1262 Locator_Repository::SIMap::ENTRY
* sientry
= 0;
1263 Locator_Repository::SIMap::CONST_ITERATOR
siit (this->unmatched_servers_
);
1264 for (; siit
.next (sientry
); siit
.advance() )
1266 int const ret
= repo
.servers().unbind (sientry
->ext_id_
);
1269 ORBSVCS_ERROR((LM_ERROR
,
1270 ACE_TEXT ("ERROR: could not remove server: %C\n"),
1271 sientry
->int_id_
->key_name_
.c_str()));
1275 Locator_Repository::AIMap::ENTRY
* aientry
= 0;
1276 Locator_Repository::AIMap::CONST_ITERATOR
aiit (this->unmatched_activators_
);
1277 for (; aiit
.next (aientry
); aiit
.advance ())
1279 int ret
= repo
.activators().unbind (aientry
->ext_id_
);
1282 ORBSVCS_ERROR((LM_ERROR
,
1283 ACE_TEXT ("ERROR: could not remove activator: %C\n"),
1284 aientry
->int_id_
->name
.c_str()));
// Returns a const reference to filenames_, the file names collected by
// startElement().
1289 const ACE_Vector
<ACE_TString
>&
1290 Shared_Backing_Store::LocatorListings_XMLHandler::filenames() const
1292 return this->filenames_
;