Merge pull request #2216 from jwillemsen/jwi-cxxversionchecks
[ACE_TAO.git] / TAO / orbsvcs / ImplRepo_Service / Replicator.cpp
blobe14f60fd7f47defa0a639c7803a09a1c9ed76479
1 #include "orbsvcs/Log_Macros.h"
2 #include "Replicator.h"
3 #include "Shared_Backing_Store.h"
4 #include "tao/ORB_Core.h"
5 #include "ace/OS_NS_stdio.h"
6 #include "ace/OS_NS_strings.h"
7 #include "ace/OS_NS_ctype.h"
8 #include "ace/OS_NS_unistd.h"
10 UPN_i::UPN_i (Replicator &owner)
11 :owner_ (owner)
15 void
16 UPN_i::notify_update (CORBA::ULongLong seq_num,
17 const ImplementationRepository::UpdateInfoSeq& info)
19 bool missed = false;
20 CORBA::ULongLong expected = ++this->owner_.replica_seq_num_;
21 if (expected < seq_num)
23 missed = true;
24 if (this->owner_.debug_ > 0)
26 ORBSVCS_DEBUG ((LM_DEBUG,
27 ACE_TEXT ("(%P|%t) UPN_i::notify_updated_entity ")
28 ACE_TEXT ("expected %Lu got %Lu\n"),
29 expected, seq_num ));
31 this->owner_.replica_seq_num_ = seq_num;
33 else if (expected > seq_num)
35 if (this->owner_.debug_ > 0)
37 ORBSVCS_DEBUG ((LM_DEBUG,
38 ACE_TEXT ("(%P|%t) UPN_i::notify_updated_entity ")
39 ACE_TEXT ("expected %Lu got %Lu\n"),
40 expected, seq_num ));
42 --this->owner_.replica_seq_num_;
44 this->owner_.repo_.updates_available (info, missed);
47 void
48 UPN_i::register_replica
49 (ImplementationRepository::UpdatePushNotification_ptr replica,
50 char*& ft_imr_ior,
51 CORBA::ULongLong_out seq_num)
53 this->owner_.peer_ =
54 ImplementationRepository::UpdatePushNotification::_duplicate (replica);
55 this->owner_.replica_seq_num_ = 0;
57 this->owner_.repo_.gen_ior (ft_imr_ior);
58 seq_num = this->owner_.seq_num_;
61 //---------------------------------------------------------------------------
63 Replicator::Replicator (Shared_Backing_Store &repo, const Options& opts)
64 : me_ (),
65 peer_ (),
66 seq_num_ (0),
67 replica_seq_num_ (0),
68 repo_ (repo),
69 prev_update_ (),
70 orb_ (),
71 reactor_ (0),
72 lock_ (),
73 notified_ (false),
74 to_send_ (10),
75 debug_ (opts.debug ()),
76 endpoint_ (opts.ft_endpoint ()),
77 update_delay_ (opts.ft_update_delay ())
81 Replicator::~Replicator()
85 void
86 Replicator::init_orb ()
88 int argc = 6;
89 ACE_TCHAR *argv[8];
90 argv[0] = ACE_OS::strdup (ACE_TEXT (""));
91 argv[1] = ACE_OS::strdup (ACE_TEXT ("-ORBIgnoreDefaultSvcConfFile"));
92 argv[2] = ACE_OS::strdup (ACE_TEXT ("-ORBGestalt"));
93 argv[3] = ACE_OS::strdup (ACE_TEXT ("Local"));
94 argv[4] = ACE_OS::strdup (ACE_TEXT ("-ORBSvcConfDirective"));
95 argv[5] = ACE_OS::strdup (ACE_TEXT ("static Client_Strategy_Factory \"-ORBConnectStrategy Blocked -ORBDefaultSyncScope server\""));
96 if (endpoint_.length ())
98 argv[6] = ACE_OS::strdup (ACE_TEXT ("-ORBListenEnpoints"));
99 argv[7] = ACE_OS::strdup (ACE_TEXT_CHAR_TO_TCHAR (endpoint_.c_str ()));
101 else
103 argv[6] = 0;
104 argv[7] = 0;
106 this->orb_ = CORBA::ORB_init (argc, argv, "replicator_orb");
107 this->reactor_ = this->orb_->orb_core ()->reactor ();
109 CORBA::Object_var obj = this->orb_->resolve_initial_references ("RootPOA");
110 PortableServer::POA_var poa = PortableServer::POA::_narrow (obj.in ());
111 PortableServer::ServantBase_var servant;
112 ACE_NEW (servant, UPN_i (*this));
113 PortableServer::ObjectId_var oid = poa->activate_object (servant.in ());
114 obj = poa->id_to_reference (oid.in ());
115 this->me_ =
116 ImplementationRepository::UpdatePushNotification::_narrow (obj.in ());
117 PortableServer::POAManager_var mgr = poa->the_POAManager ();
118 mgr->activate ();
121 char *
122 Replicator::ior ()
124 return this->orb_->object_to_string (this->me_.in ());
127 bool
128 Replicator::peer_available ()
130 return !CORBA::is_nil (this->peer_.in ());
133 void
134 Replicator::stop ()
136 if (this->reactor_ == 0)
138 return;
140 this->orb_->shutdown (true);
144 Replicator::svc ()
146 if (this->reactor_ == 0)
148 return 0;
152 this->orb_->run ();
154 catch (const CORBA::SystemException &ex)
156 if (this->debug_ > 0)
158 ORBSVCS_DEBUG ((LM_DEBUG,
159 ACE_TEXT ("(%P|%t) Replicator::svc caught %C\n"),
160 ex._info ().c_str ()));
163 return 0;
168 Replicator::handle_exception (ACE_HANDLE )
170 ACE_OS::sleep (this->update_delay_);
171 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, mon, this->lock_, -1);
172 this->notified_ = false;
173 if (this->to_send_.length () == 0)
175 return 0;
177 if (CORBA::is_nil (this->peer_))
179 this->to_send_.length (0);
180 return 0;
184 CORBA::Long len = this->to_send_.length ();
185 ImplementationRepository::UpdateInfoSeq payload (len);
186 payload.length (len);
187 CORBA::Long p = 0;
188 for (CORBA::Long l = 0; l < len; l++)
190 if (this->to_send_[l].action._d () != ImplementationRepository::access ||
191 this->to_send_[l].action.state () != ImplementationRepository::AAM_UPDATE_FAILED)
193 payload[p++] = this->to_send_[l];
194 if (this->to_send_[l].action._d () != ImplementationRepository::access)
196 ACE_CString sid = this->to_send_[l].name.in();
197 this->prev_update_.bind (sid, this->to_send_[l].action.state ());
201 payload.length (p);
202 this->to_send_.length (0);
203 CORBA::ULongLong seq = ++this->seq_num_;
204 mon.release ();
205 this->peer_->notify_update (seq, payload);
207 catch (const CORBA::Exception &)
209 this->peer_ =
210 ImplementationRepository::UpdatePushNotification::_nil ();
212 return 0;
215 void
216 Replicator::send_access_state (const char *name, ImplementationRepository::AAM_Status state)
218 if (this->reactor_ == 0)
220 return;
222 ACE_GUARD (TAO_SYNCH_MUTEX, mon, this->lock_);
223 ImplementationRepository::AAM_Status p =
224 ImplementationRepository::AAM_UPDATE_FAILED;
225 ACE_CString sid = name;
226 bool prev = this->prev_update_.find (sid, p) == 0 && p == state;
228 CORBA::Long len = this->to_send_.length ();
229 bool found = false;
230 for (CORBA::Long i = 0; !found && i < len; i++)
232 if (this->to_send_[i].action._d () == ImplementationRepository::access &&
233 ACE_OS::strcmp (this->to_send_[i].name, name) == 0)
235 found = true;
236 this->to_send_[i].action.state
237 (prev ? ImplementationRepository::AAM_UPDATE_FAILED : state);
240 if (!found && !prev)
242 this->to_send_.length (len+1);
243 this->to_send_[len].name = CORBA::string_dup (name);
244 this->to_send_[len].action.state (state);
246 if (this->notified_)
247 return;
248 this->notified_ = true;
249 this->reactor_->notify (this);
252 void
253 Replicator::send_entity (ImplementationRepository::UpdateInfo &info)
255 if (this->reactor_ == 0)
257 return;
259 ACE_GUARD (TAO_SYNCH_MUTEX, mon, this->lock_);
260 CORBA::Long len = this->to_send_.length ();
261 bool found = false;
262 for (CORBA::Long i = 0; !found && i < len; i++)
264 if (ACE_OS::strcmp (this->to_send_[i].name, info.name) == 0)
266 if (this->to_send_[i].action._d () == info.action._d ())
268 switch (info.action._d ())
270 case ImplementationRepository::repo_update:
272 found = (this->to_send_[i].action.info ().kind == info.action.info ().kind );
273 break;
275 case ImplementationRepository::repo_remove:
277 found = (this->to_send_[i].action.kind () == info.action.kind ());
278 break;
280 default:
285 else
287 if (this->to_send_[i].action._d () == ImplementationRepository::access)
289 continue;
291 switch (info.action._d ())
293 case ImplementationRepository::repo_update:
295 found = (this->to_send_[i].action.kind () == info.action.info ().kind);
296 break;
298 case ImplementationRepository::repo_remove:
300 found = (this->to_send_[i].action.info ().kind == info.action.kind ());
301 break;
303 default:
307 if (found)
309 this->to_send_[i].action = info.action;
314 if (!found)
316 this->to_send_.length (len+1);
317 this->to_send_[len] = info;
319 if (this->notified_)
320 return;
321 this->notified_ = true;
322 this->reactor_->notify (this);
326 Replicator::send_registration (char *&imr_ior)
328 if (this->debug_ > 1)
330 ORBSVCS_DEBUG((LM_INFO,
331 ACE_TEXT("Registering with previously running ImR replica\n")));
336 this->peer_->register_replica(this->me_,
337 imr_ior,
338 this->replica_seq_num_);
340 catch (const ImplementationRepository::InvalidPeer& ip)
342 ORBSVCS_ERROR_RETURN ((LM_ERROR,
343 ACE_TEXT("Error: Replicator::send_registration invalid ImR replica because %C\n"),
344 ip.reason.in()), -1);
347 if (this->debug_ > 9)
349 ORBSVCS_DEBUG ((LM_INFO,
350 ACE_TEXT("Initializing repository with ft ior=<%C> ")
351 ACE_TEXT("and replica seq number %Lu\n"),
352 imr_ior, replica_seq_num_));
355 return 0;
358 bool
359 Replicator::init_peer (const ACE_CString &replica_ior_file)
361 if (this->debug_ > 1)
363 ORBSVCS_DEBUG ((LM_INFO,
364 ACE_TEXT("Resolving ImR replica %C\n"),
365 replica_ior_file.c_str()));
368 if (ACE_OS::access (replica_ior_file.c_str (), F_OK) != 0)
370 this->peer_ =
371 ImplementationRepository::UpdatePushNotification::_nil();
372 return false;
375 ACE_CString replica_ior = "file://" + replica_ior_file;
376 CORBA::Object_var obj =
377 this->orb_->string_to_object (replica_ior.c_str());
379 if (!CORBA::is_nil (obj.in ()))
381 bool non_exist = true;
384 this->peer_ = ImplementationRepository::
385 UpdatePushNotification::_narrow (obj.in());
386 non_exist = (this->peer_->_non_existent() == 1);
388 catch (const CORBA::Exception& )
392 if (non_exist)
394 this->peer_ =
395 ImplementationRepository::UpdatePushNotification::_nil();
398 return true;