Merge pull request #1551 from DOCGroup/plm_jira_333
[ACE_TAO.git] / TAO / orbsvcs / examples / FaultTolerance / RolyPoly / ReplicaController.cpp
blob853644a347777fa3fa2137443f77f86a6fdaa835
1 // file : RolyPoly/ReplicaController.cpp
2 // author : Boris Kolpackov <boris@dre.vanderbilt.edu>
3 #include "tao/AnyTypeCode/Any_Impl.h"
4 #include "tao/AnyTypeCode/TypeCode.h"
5 #include "tao/AnyTypeCode/DynamicC.h"
6 #include "tao/PortableServer/Servant_Base.h"
7 #include "tao/PI/PI.h"
9 #include "orbsvcs/FT_CORBA_ORBC.h"
11 #include "ace/UUID.h"
12 #include "ace/Thread_Manager.h"
13 #include "ace/TMCast/Group.hpp"
15 #include "CrashPoint.h"
16 #include "StateUpdate.h"
17 #include "ReplicaController.h"
20 // State slot.
23 namespace
25 PortableInterceptor::SlotId state_slot_id_;
28 PortableInterceptor::SlotId
29 state_slot_id ()
31 return state_slot_id_;
34 void
35 state_slot_id (PortableInterceptor::SlotId slot_id)
37 state_slot_id_ = slot_id;
40 Checkpointable::
41 ~Checkpointable ()
45 CORBA::Any* Checkpointable::
46 get_state ()
48 return 0;
51 void Checkpointable::
52 associate_state (CORBA::ORB_ptr orb, CORBA::Any const& state)
54 try
56 CORBA::Object_var pic_obj =
57 orb->resolve_initial_references ("PICurrent");
60 PortableInterceptor::Current_var pic =
61 PortableInterceptor::Current::_narrow (
62 pic_obj.in ());
65 pic->set_slot (state_slot_id (), state);
68 catch (const CORBA::Exception& ex)
70 ex._tao_print_exception ("Caught exception:");
74 // ReplyLogger
77 ReplicaController::
78 ~ReplicaController ()
82 ReplicaController::
83 ReplicaController (CORBA::ORB_ptr orb)
84 : orb_ (CORBA::ORB::_duplicate (orb))
86 try
88 CORBA::Object_var poa_object =
89 orb_->resolve_initial_references ("RootPOA");
91 root_poa_ = PortableServer::POA::_narrow (
92 poa_object.in ());
94 catch (const CORBA::Exception& ex)
96 ex._tao_print_exception ("Caught exception:");
97 ACE_OS::abort ();
100 // Generate member id
101 ACE_Utils::UUID uuid;
102 ACE_Utils::UUID_GENERATOR::instance ()->init ();
103 ACE_Utils::UUID_GENERATOR::instance ()->generate_UUID (uuid);
105 ACE_INET_Addr address (10000, "239.255.0.1");
107 ACE_DEBUG ((LM_DEBUG, "Becoming a member with id %s\n",
108 uuid.to_string ()->c_str ()));
110 ACE_auto_ptr_reset (group_, new ACE_TMCast::Group (address, uuid.to_string ()->c_str ()));
112 int r = ACE_Thread_Manager::instance ()->spawn (
113 &ReplicaController::listener_thunk, this);
115 if (r < 0)
117 orb_->shutdown (0);
121 void ReplicaController::
122 listener ()
126 for (char buffer[1024];;)
128 size_t n = group_->recv (buffer, sizeof (buffer));
130 ACE_HEX_DUMP ((LM_DEBUG, buffer, n));
132 TAO_InputCDR cdr (buffer, n);
134 CORBA::OctetSeq object_id;
135 PortableInterceptor::AdapterName adapter_name;
136 CORBA::String_var client_id;
137 CORBA::Long retention_id;
138 CORBA::OctetSeq reply;
139 CORBA::Any state;
141 cdr >> object_id;
142 cdr >> adapter_name;
143 cdr >> client_id.out ();
144 cdr >> retention_id;
145 cdr >> reply;
146 cdr >> state;
148 if (!cdr.good_bit ())
150 ACE_DEBUG ((LM_DEBUG, "CDR failed\n"));
151 //@@ what to do?
154 ACE_DEBUG ((LM_DEBUG,
155 "Received log for %s with rid %i\n",
156 client_id.in (),
157 retention_id));
160 RecordId rid (client_id.in (), retention_id);
162 CORBA::OctetSeq_var tmp (new CORBA::OctetSeq (reply));
163 log_.insert (rid, tmp);
165 // Update state.
166 CORBA::TypeCode_var tc = state.type ();
168 if (tc->kind () != CORBA::tk_null)
170 PortableServer::POA_var poa = resolve_poa (adapter_name);
172 PortableServer::ServantBase_var servant =
173 poa->id_to_servant (object_id);
175 Checkpointable* target =
176 dynamic_cast<Checkpointable*> (servant.in ());
178 if (target) target->set_state (state);
182 catch (ACE_TMCast::Group::Failed const&)
184 ACE_DEBUG ((LM_DEBUG,
185 "Group failure. Perhaps, I am alone in the group.\n"));
187 catch (ACE_TMCast::Group::InsufficienSpace const&)
189 ACE_DEBUG ((LM_DEBUG, "Group::InsufficienSpace\n"));
192 orb_->shutdown (0);
195 PortableServer::POA_ptr ReplicaController::
196 resolve_poa (PortableInterceptor::AdapterName const&)
198 //@@ Assume for now it's a root poa.
199 return PortableServer::POA::_duplicate (root_poa_.in ());
203 ACE_THR_FUNC_RETURN ReplicaController::
204 listener_thunk (void* p)
206 ReplicaController* obj = reinterpret_cast<ReplicaController*> (p);
207 obj->listener();
208 return 0;
211 namespace
213 FT::FTRequestServiceContext*
214 extract_context (
215 PortableInterceptor::ServerRequestInfo_ptr ri);
218 #if TAO_HAS_EXTENDED_FT_INTERCEPTORS == 1
219 void
220 ReplicaController::tao_ft_interception_point (
221 PortableInterceptor::ServerRequestInfo_ptr ri,
222 CORBA::OctetSeq_out ocs)
224 FT::FTRequestServiceContext_var ftr (
225 extract_context (ri));
227 ACE_DEBUG ((LM_DEBUG,
228 "(%P|%t) Received request from %C with rid %i\n",
229 ftr->client_id.in (),
230 ftr->retention_id));
232 // Check if this request is eligible for replay.
234 RecordId rid (ftr->client_id.in (), ftr->retention_id);
236 if (log_.contains (rid))
238 ACE_DEBUG ((LM_DEBUG,
239 "(%P|%t) Replaying reply for %C with rid %i\n",
240 ftr->client_id.in (),
241 ftr->retention_id));
243 CORBA::OctetSeq_var copy (log_.lookup (rid)); // make a copy
245 ocs = copy._retn ();
248 return;
251 #endif /*TAO_HAS_EXTENDED_FT_INTERCEPTORS*/
253 void
254 ReplicaController::send_reply (
255 PortableInterceptor::ServerRequestInfo_ptr ri)
257 FT::FTRequestServiceContext_var ftr (
258 extract_context (ri));
261 ACE_DEBUG ((LM_DEBUG,
262 "(%P|%t) Sending reply for %s with rid %i\n",
263 ftr->client_id.in (),
264 ftr->retention_id));
267 // Prepare reply for logging.
269 CORBA::Any_var result =
270 ri->result ();
272 TAO_OutputCDR cdr;
273 result->impl ()->marshal_value (cdr);
275 Dynamic::ParameterList_var pl =
276 ri->arguments ();
278 CORBA::ULong len = pl->length ();
280 for (CORBA::ULong index = 0; index != len ; ++index)
282 //@@ No chance for PARAM_OUT
283 if ((*pl)[index].mode == CORBA::PARAM_INOUT)
285 (*pl)[index].argument.impl ()->marshal_value (cdr);
289 CORBA::OctetSeq_var reply;
291 ACE_NEW (reply.out (), CORBA::OctetSeq (cdr.total_length ()));
293 reply->length (cdr.total_length ());
295 CORBA::Octet* buf = reply->get_buffer ();
297 // @@ What if this throws an exception?? We don't have any way to
298 // check whether this succeeded
299 for (ACE_Message_Block const* mb = cdr.begin ();
300 mb != 0;
301 mb = mb->cont ())
303 ACE_OS::memcpy (buf, mb->rd_ptr (), mb->length ());
304 buf += mb->length ();
307 // Logging the reply and state update.
310 // First send message to members.
313 // Extract state update.
315 CORBA::OctetSeq_var oid = ri->object_id ();
316 PortableInterceptor::AdapterName_var an = ri->adapter_name ();
318 CORBA::Any_var state = ri->get_slot (state_slot_id ());
320 CORBA::TypeCode_var tc = state->type ();
322 if (tc->kind () == CORBA::tk_null)
324 ACE_DEBUG ((LM_DEBUG, "Slot update is void\n"));
326 PortableServer::POA_var poa = resolve_poa (an.in ());
328 PortableServer::ServantBase_var servant =
329 poa->id_to_servant (oid.in ());
331 Checkpointable* target =
332 dynamic_cast<Checkpointable*> (servant.in ());
334 if (target)
336 CORBA::Any_var tmp = target->get_state ();
338 if (tmp.ptr () != 0) state = tmp._retn ();
342 TAO_OutputCDR cdr;
344 cdr << oid.in ();
345 cdr << an.in ();
346 cdr << ftr->client_id.in ();
347 cdr << ftr->retention_id;
348 cdr << reply.in ();
349 cdr << state.in ();
351 size_t size = cdr.total_length ();
353 CORBA::OctetSeq_var msg;
355 ACE_NEW (msg.out (), CORBA::OctetSeq (size));
357 msg->length (size);
360 CORBA::Octet* buf = msg->get_buffer ();
362 for (ACE_Message_Block const* mb = cdr.begin ();
363 mb != 0;
364 mb = mb->cont ())
366 ACE_OS::memcpy (buf, mb->rd_ptr (), mb->length ());
367 buf += mb->length ();
371 CORBA::Octet* buf = msg->get_buffer ();
373 // Crash point 1.
375 if (crash_point == 1 && ftr->retention_id > 2) ACE_OS::exit (1);
379 while (true)
383 group_->send (buf, size);
384 ACE_DEBUG ((LM_DEBUG, "Sent log record of length %i\n", size));
385 break;
387 catch (ACE_TMCast::Group::Aborted const&)
389 ACE_DEBUG ((LM_DEBUG, "Retrying to send log record.\n"));
393 catch (ACE_TMCast::Group::Failed const&)
395 ACE_DEBUG ((LM_DEBUG,
396 "Group failure. Perhaps, I am alone in the group.\n"));
401 // Now perform local logging.
403 RecordId rid (ftr->client_id.in (), ftr->retention_id);
405 // This is slow but eh-safe ;-).
407 log_.insert (rid, reply);
410 // Crash point 2.
412 if (crash_point == 2 && ftr->retention_id > 2) ACE_OS::exit (1);
416 namespace
418 FT::FTRequestServiceContext*
419 extract_context (PortableInterceptor::ServerRequestInfo_ptr ri)
421 IOP::ServiceContext_var svc =
422 ri->get_request_service_context (IOP::FT_REQUEST);
424 TAO_InputCDR cdr (reinterpret_cast<const char*> (svc->context_data.get_buffer ()),
425 svc->context_data.length ());
427 CORBA::Boolean byte_order;
429 if ((cdr >> ACE_InputCDR::to_boolean (byte_order)) == 0)
431 //@@ what to throw?
432 throw CORBA::BAD_CONTEXT ();
435 cdr.reset_byte_order (static_cast<int> (byte_order));
437 // Funny, the following two lines should normally translate
438 // just to one ctor call. But because we have to use this
439 // ACE_NEW macro hackery we have a default ctor call plus
440 // assignment operator call. Yet another example how the
441 // majority is being penalized by some broken platforms.
443 FT::FTRequestServiceContext_var req;
445 //@@ completed status maybe wrong
447 ACE_NEW_THROW_EX (req,
448 FT::FTRequestServiceContext,
449 CORBA::NO_MEMORY (
450 CORBA::SystemException::_tao_minor_code (
451 TAO::VMCID,
452 ENOMEM),
453 CORBA::COMPLETED_NO));
455 cdr >> *req;
457 if (!cdr.good_bit ())
459 //@@ what to throw?
460 throw CORBA::UNKNOWN ();
463 return req._retn ();
468 char*
469 ReplicaController::name (void)
471 return CORBA::string_dup ("ReplicaController");
474 void
475 ReplicaController::send_exception (
476 PortableInterceptor::ServerRequestInfo_ptr)
480 void
481 ReplicaController::send_other (
482 PortableInterceptor::ServerRequestInfo_ptr)
486 void
487 ReplicaController::destroy (void)
491 void
492 ReplicaController::receive_request_service_contexts (
493 PortableInterceptor::ServerRequestInfo_ptr)
498 void
499 ReplicaController::receive_request (
500 PortableInterceptor::ServerRequestInfo_ptr)