1 #include "orbsvcs/Log_Macros.h"
2 #include "Replicator.h"
3 #include "Shared_Backing_Store.h"
4 #include "tao/ORB_Core.h"
5 #include "ace/OS_NS_stdio.h"
6 #include "ace/OS_NS_strings.h"
7 #include "ace/OS_NS_ctype.h"
8 #include "ace/OS_NS_unistd.h"
// UPN_i constructor: receives a reference to the Replicator that owns this
// servant.
// NOTE(review): the member-initializer list and body are not visible in this
// excerpt; later code reads the owner through this->owner_.
10 UPN_i::UPN_i (Replicator
&owner
)
// UPN_i::notify_update: receives a batch of updates (info) tagged with the
// sender's sequence number (seq_num).
//
// The owner's replica_seq_num_ is pre-incremented to obtain the expected
// number, which is then compared with seq_num:
//   * expected < seq_num : optionally logs, then overwrites replica_seq_num_
//                          with seq_num.
//   * expected > seq_num : optionally logs, then undoes the increment.
// Finally the updates are handed to owner_.repo_.updates_available (info, missed).
//
// The debug messages are labelled with this function's actual name,
// UPN_i::notify_update, so log output can be traced back to the code.
//
// NOTE(review): the declaration/assignment of `missed` and the trailing
// arguments of both ORBSVCS_DEBUG calls are not visible in this excerpt.
16 UPN_i::notify_update (CORBA::ULongLong seq_num
,
17 const ImplementationRepository::UpdateInfoSeq
& info
)
20 CORBA::ULongLong expected
= ++this->owner_
.replica_seq_num_
;
// Received number is ahead of the expected one.
21 if (expected
< seq_num
)
24 if (this->owner_
.debug_
> 0)
26 ORBSVCS_DEBUG ((LM_DEBUG
,
27 ACE_TEXT ("(%P|%t) UPN_i::notify_update ")
28 ACE_TEXT ("expected %Lu got %Lu\n"),
// Adopt the sender's sequence number.
31 this->owner_
.replica_seq_num_
= seq_num
;
// Received number is behind the expected one.
33 else if (expected
> seq_num
)
35 if (this->owner_
.debug_
> 0)
37 ORBSVCS_DEBUG ((LM_DEBUG
,
38 ACE_TEXT ("(%P|%t) UPN_i::notify_update ")
39 ACE_TEXT ("expected %Lu got %Lu\n"),
// Roll back the pre-increment performed above.
42 --this->owner_
.replica_seq_num_
;
44 this->owner_
.repo_
.updates_available (info
, missed
);
// UPN_i::register_replica: called with the object reference of a replica and
// an out parameter for a sequence number.
//
// Visible effects: the replica reference is duplicated, the owner's
// replica_seq_num_ is reset to 0, owner_.repo_.gen_ior (ft_imr_ior) is called,
// and the owner's own seq_num_ is written to the seq_num out parameter.
//
// NOTE(review): the embedded numbering skips a line between the two visible
// parameters, so there may be a further parameter (ft_imr_ior is used below
// but not declared in the visible text) — confirm against the full file.
// NOTE(review): the variable that receives the _duplicate () result is not
// visible in this excerpt.
48 UPN_i::register_replica
49 (ImplementationRepository::UpdatePushNotification_ptr replica
,
51 CORBA::ULongLong_out seq_num
)
54 ImplementationRepository::UpdatePushNotification::_duplicate (replica
);
// Restart counting of the replica's sequence numbers.
55 this->owner_
.replica_seq_num_
= 0;
57 this->owner_
.repo_
.gen_ior (ft_imr_ior
);
// Report this side's current sequence number back to the caller.
58 seq_num
= this->owner_
.seq_num_
;
61 //---------------------------------------------------------------------------
// Replicator constructor: takes the Shared_Backing_Store it works with and the
// Options it is configured from.
//
// Visible initializers copy three settings out of opts: debug () into debug_,
// ft_endpoint () into endpoint_ and ft_update_delay () into update_delay_.
// NOTE(review): the remaining member initializers and the body are not
// visible in this excerpt.
63 Replicator::Replicator (Shared_Backing_Store
&repo
, const Options
& opts
)
75 debug_ (opts
.debug ()),
76 endpoint_ (opts
.ft_endpoint ()),
77 update_delay_ (opts
.ft_update_delay ())
// Replicator destructor.
// NOTE(review): the body is not visible in this excerpt.
81 Replicator::~Replicator()
// Replicator::init_orb: builds a private argument vector, initializes the
// ORB named "replicator_orb" from it, and activates a UPN_i servant in the
// RootPOA.
//
// Steps visible here:
//   1. argv[0..5] are filled with fixed ORB options (strdup'ed copies).
//   2. If endpoint_ is non-empty, argv[6]/argv[7] carry the listen endpoint.
//   3. CORBA::ORB_init is called and reactor_ is taken from the ORB core.
//   4. A UPN_i servant owned by this Replicator is created, activated in the
//      RootPOA and turned into an object reference that is narrowed to
//      UpdatePushNotification.
//   5. The POA manager is fetched from the POA.
//
// NOTE(review): the declarations of argc/argv, any adjustment of argc, the
// variable receiving the _narrow () result and the use of `mgr` are not
// visible in this excerpt.
86 Replicator::init_orb ()
90 argv
[0] = ACE_OS::strdup (ACE_TEXT (""));
91 argv
[1] = ACE_OS::strdup (ACE_TEXT ("-ORBIgnoreDefaultSvcConfFile"));
92 argv
[2] = ACE_OS::strdup (ACE_TEXT ("-ORBGestalt"));
93 argv
[3] = ACE_OS::strdup (ACE_TEXT ("Local"));
94 argv
[4] = ACE_OS::strdup (ACE_TEXT ("-ORBSvcConfDirective"));
95 argv
[5] = ACE_OS::strdup (ACE_TEXT ("static Client_Strategy_Factory \"-ORBConnectStrategy Blocked -ORBDefaultSyncScope server\""));
// Only pass a listen endpoint when one was configured.
96 if (endpoint_
.length ())
// The option must be spelled -ORBListenEndpoints; a misspelled option name
// would not hand the endpoint to the ORB as a listen endpoint.
98 argv
[6] = ACE_OS::strdup (ACE_TEXT ("-ORBListenEndpoints"));
99 argv
[7] = ACE_OS::strdup (ACE_TEXT_CHAR_TO_TCHAR (endpoint_
.c_str ()));
106 this->orb_
= CORBA::ORB_init (argc
, argv
, "replicator_orb");
107 this->reactor_
= this->orb_
->orb_core ()->reactor ();
109 CORBA::Object_var obj
= this->orb_
->resolve_initial_references ("RootPOA");
110 PortableServer::POA_var poa
= PortableServer::POA::_narrow (obj
.in ());
// The servant refers back to this Replicator as its owner.
111 PortableServer::ServantBase_var servant
;
112 ACE_NEW (servant
, UPN_i (*this));
113 PortableServer::ObjectId_var oid
= poa
->activate_object (servant
.in ());
114 obj
= poa
->id_to_reference (oid
.in ());
116 ImplementationRepository::UpdatePushNotification::_narrow (obj
.in ());
117 PortableServer::POAManager_var mgr
= poa
->the_POAManager ();
// NOTE(review): the embedded numbering jumps here, so the return below may
// belong to a separate definition whose header is not visible. It returns the
// stringified reference of me_.
124 return this->orb_
->object_to_string (this->me_
.in ());
// Replicator::peer_available: returns true when peer_ holds a non-nil object
// reference.
128 Replicator::peer_available ()
130 return !CORBA::is_nil (this->peer_
.in ());
// NOTE(review): the header lines of the definitions these fragments belong to
// are not visible in this excerpt; the comments below describe only what the
// visible fragments do.
//
// First fragment: tests whether reactor_ is null (the action taken in that
// case is not visible) and calls orb_->shutdown (true).
136 if (this->reactor_
== 0)
140 this->orb_
->shutdown (true);
// Second fragment: again tests reactor_ for null, and contains a handler for
// CORBA::SystemException that, when debug_ > 0, logs the exception info under
// the label "Replicator::svc". The statement guarded by the try is not visible.
146 if (this->reactor_
== 0)
154 catch (const CORBA::SystemException
&ex
)
156 if (this->debug_
> 0)
158 ORBSVCS_DEBUG ((LM_DEBUG
,
159 ACE_TEXT ("(%P|%t) Replicator::svc caught %C\n"),
160 ex
._info ().c_str ()));
// Replicator::handle_exception: flushes the queued updates in to_send_ to the
// peer. send_access_state () and send_entity () call reactor_->notify (this)
// after queueing.
//
// Visible sequence:
//   1. Sleep for update_delay_ before taking lock_.
//   2. Acquire lock_ (ACE_GUARD_RETURN is given -1 as its return argument).
//   3. Clear notified_.
//   4. Test for an empty queue and for a nil peer_; in the nil-peer case the
//      queue is emptied. The return statements are not visible.
//   5. Copy queued entries into a payload sequence, skipping access entries
//      whose state is AAM_UPDATE_FAILED.
//   6. Empty the queue, pre-increment seq_num_ and call
//      peer_->notify_update (seq, payload).
//   7. On any CORBA::Exception, a nil UpdatePushNotification reference is
//      produced (the assignment target is not visible).
//
// NOTE(review): the declaration of `p`, any final payload.length (p) call, and
// all braces/nesting are not visible in this excerpt.
168 Replicator::handle_exception (ACE_HANDLE
)
// The delay happens before the lock is taken.
170 ACE_OS::sleep (this->update_delay_
);
171 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX
, mon
, this->lock_
, -1);
172 this->notified_
= false;
173 if (this->to_send_
.length () == 0)
177 if (CORBA::is_nil (this->peer_
))
// No peer to send to: drop the queued updates.
179 this->to_send_
.length (0);
184 CORBA::Long len
= this->to_send_
.length ();
185 ImplementationRepository::UpdateInfoSeq
payload (len
);
186 payload
.length (len
);
188 for (CORBA::Long l
= 0; l
< len
; l
++)
// Keep every entry except access entries marked AAM_UPDATE_FAILED.
190 if (this->to_send_
[l
].action
._d () != ImplementationRepository::access
||
191 this->to_send_
[l
].action
.state () != ImplementationRepository::AAM_UPDATE_FAILED
)
193 payload
[p
++] = this->to_send_
[l
];
// NOTE(review): the condition below selects entries whose discriminator is
// NOT `access`, yet the bind () that follows reads action.state (). Confirm
// whether `==` was intended, since send_access_state () compares prev_update_
// against access states.
194 if (this->to_send_
[l
].action
._d () != ImplementationRepository::access
)
196 ACE_CString sid
= this->to_send_
[l
].name
.in();
197 this->prev_update_
.bind (sid
, this->to_send_
[l
].action
.state ());
202 this->to_send_
.length (0);
203 CORBA::ULongLong seq
= ++this->seq_num_
;
205 this->peer_
->notify_update (seq
, payload
);
207 catch (const CORBA::Exception
&)
210 ImplementationRepository::UpdatePushNotification::_nil ();
// Replicator::send_access_state: queues an access-state update for the
// server called `name` and notifies the reactor so the queue gets flushed.
//
// Visible sequence:
//   1. Test reactor_ for null (the action taken is not visible).
//   2. Acquire lock_.
//   3. Look `name` up in prev_update_; `prev` is true when an entry exists
//      and its stored state equals `state`.
//   4. Scan to_send_ for an access entry with the same name. When one is hit,
//      its state is set to AAM_UPDATE_FAILED if `prev` is true, otherwise to
//      `state`.
//   5. Grow to_send_ by one and fill the new slot with name/state
//      (presumably only when the scan found nothing — the guarding condition
//      is not visible).
//   6. Set notified_ and call reactor_->notify (this).
//
// NOTE(review): the declaration and assignments of `found`, and any test of
// notified_ before notifying, are not visible in this excerpt.
216 Replicator::send_access_state (const char *name
, ImplementationRepository::AAM_Status state
)
218 if (this->reactor_
== 0)
222 ACE_GUARD (TAO_SYNCH_MUTEX
, mon
, this->lock_
);
// Default used when prev_update_ has no entry for this name.
223 ImplementationRepository::AAM_Status p
=
224 ImplementationRepository::AAM_UPDATE_FAILED
;
225 ACE_CString sid
= name
;
226 bool prev
= this->prev_update_
.find (sid
, p
) == 0 && p
== state
;
228 CORBA::Long len
= this->to_send_
.length ();
230 for (CORBA::Long i
= 0; !found
&& i
< len
; i
++)
// Match an already queued access entry for the same name.
232 if (this->to_send_
[i
].action
._d () == ImplementationRepository::access
&&
233 ACE_OS::strcmp (this->to_send_
[i
].name
, name
) == 0)
236 this->to_send_
[i
].action
.state
237 (prev
? ImplementationRepository::AAM_UPDATE_FAILED
: state
);
// Append a new queue entry.
242 this->to_send_
.length (len
+1);
243 this->to_send_
[len
].name
= CORBA::string_dup (name
);
244 this->to_send_
[len
].action
.state (state
);
248 this->notified_
= true;
249 this->reactor_
->notify (this);
// Replicator::send_entity: queues a repository update/remove record and
// notifies the reactor so the queue gets flushed.
//
// Visible sequence:
//   1. Test reactor_ for null (the action taken is not visible).
//   2. Acquire lock_.
//   3. Scan to_send_ for an entry with the same name as `info`. Two groups of
//      comparisons are visible:
//        - same discriminator on both sides: compare the entity kind
//          (info ().kind for repo_update, kind () for repo_remove);
//        - a test for a queued `access` entry, followed by cross-comparisons
//          between a queued remove and an incoming update and vice versa.
//      The queued entry's action is then replaced by info.action.
//   4. Grow to_send_ by one and store `info` in the new slot (presumably only
//      when no match was found — the guarding condition is not visible).
//   5. Set notified_ and call reactor_->notify (this).
//
// NOTE(review): the declaration of `found`, the break/continue statements,
// the else branches and all nesting are not visible in this excerpt, so the
// exact relationship between the groups in step 3 cannot be confirmed here.
253 Replicator::send_entity (ImplementationRepository::UpdateInfo
&info
)
255 if (this->reactor_
== 0)
259 ACE_GUARD (TAO_SYNCH_MUTEX
, mon
, this->lock_
);
260 CORBA::Long len
= this->to_send_
.length ();
262 for (CORBA::Long i
= 0; !found
&& i
< len
; i
++)
264 if (ACE_OS::strcmp (this->to_send_
[i
].name
, info
.name
) == 0)
// Queued entry and incoming record carry the same kind of action.
266 if (this->to_send_
[i
].action
._d () == info
.action
._d ())
268 switch (info
.action
._d ())
270 case ImplementationRepository::repo_update
:
272 found
= (this->to_send_
[i
].action
.info ().kind
== info
.action
.info ().kind
);
275 case ImplementationRepository::repo_remove
:
277 found
= (this->to_send_
[i
].action
.kind () == info
.action
.kind ());
// Discriminators differ; queued access entries get their own test.
287 if (this->to_send_
[i
].action
._d () == ImplementationRepository::access
)
291 switch (info
.action
._d ())
// Incoming update compared with a queued entry read through kind ().
293 case ImplementationRepository::repo_update
:
295 found
= (this->to_send_
[i
].action
.kind () == info
.action
.info ().kind
);
// Incoming remove compared with a queued entry read through info ().kind.
298 case ImplementationRepository::repo_remove
:
300 found
= (this->to_send_
[i
].action
.info ().kind
== info
.action
.kind ());
// Replace the queued action with the incoming one.
309 this->to_send_
[i
].action
= info
.action
;
// Append a new queue entry.
316 this->to_send_
.length (len
+1);
317 this->to_send_
[len
] = info
;
321 this->notified_
= true;
322 this->reactor_
->notify (this);
// Replicator::send_registration: registers this replica (me_) with the peer
// and receives the peer's sequence number into replica_seq_num_.
//
// Visible sequence:
//   1. When debug_ > 1, log that registration is starting.
//   2. Call peer_->register_replica (me_, ..., replica_seq_num_).
//   3. If the peer raises InvalidPeer, log the reason and return -1 through
//      ORBSVCS_ERROR_RETURN.
//   4. When debug_ > 9, log imr_ior and replica_seq_num_.
//
// NOTE(review): the embedded numbering skips a line inside the
// register_replica call, so an argument between me_ and replica_seq_num_ is
// probably not shown (imr_ior is the likely candidate — confirm). The
// success-path return value is not visible either.
326 Replicator::send_registration (char *&imr_ior
)
328 if (this->debug_
> 1)
330 ORBSVCS_DEBUG((LM_INFO
,
331 ACE_TEXT("Registering with previously running ImR replica\n")));
336 this->peer_
->register_replica(this->me_
,
338 this->replica_seq_num_
);
// The peer rejected the registration.
340 catch (const ImplementationRepository::InvalidPeer
& ip
)
342 ORBSVCS_ERROR_RETURN ((LM_ERROR
,
343 ACE_TEXT("Error: Replicator::send_registration invalid ImR replica because %C\n"),
344 ip
.reason
.in()), -1);
347 if (this->debug_
> 9)
349 ORBSVCS_DEBUG ((LM_INFO
,
350 ACE_TEXT("Initializing repository with ft ior=<%C> ")
351 ACE_TEXT("and replica seq number %Lu\n"),
352 imr_ior
, replica_seq_num_
));
// Replicator::init_peer: tries to resolve the peer replica from the IOR file
// named by replica_ior_file.
//
// Visible sequence:
//   1. When debug_ > 1, log the file name being resolved.
//   2. Check with ACE_OS::access (F_OK) that the file exists; the failure
//      path produces a nil UpdatePushNotification reference.
//   3. Build a "file://" URL from the file name and convert it to an object
//      reference with orb_->string_to_object.
//   4. If the reference is non-nil, narrow it into peer_ and record in
//      non_exist whether peer_->_non_existent () returned 1. Any
//      CORBA::Exception is caught (the handler body is not visible).
//   5. A nil UpdatePushNotification reference is produced again
//      (presumably when the peer turned out not to exist — the guarding
//      condition is not visible).
//
// NOTE(review): the assignment targets of both _nil () calls, the return
// statements and the end of the function are not visible; this definition
// may continue past the end of the excerpt.
359 Replicator::init_peer (const ACE_CString
&replica_ior_file
)
361 if (this->debug_
> 1)
363 ORBSVCS_DEBUG ((LM_INFO
,
364 ACE_TEXT("Resolving ImR replica %C\n"),
365 replica_ior_file
.c_str()));
// The IOR file must exist before it can be resolved.
368 if (ACE_OS::access (replica_ior_file
.c_str (), F_OK
) != 0)
371 ImplementationRepository::UpdatePushNotification::_nil();
375 ACE_CString replica_ior
= "file://" + replica_ior_file
;
376 CORBA::Object_var obj
=
377 this->orb_
->string_to_object (replica_ior
.c_str());
379 if (!CORBA::is_nil (obj
.in ()))
// Assume the peer is gone until _non_existent () says otherwise.
381 bool non_exist
= true;
384 this->peer_
= ImplementationRepository::
385 UpdatePushNotification::_narrow (obj
.in());
386 non_exist
= (this->peer_
->_non_existent() == 1);
388 catch (const CORBA::Exception
& )
395 ImplementationRepository::UpdatePushNotification::_nil();