1 // file : RolyPoly/ReplicaController.cpp
2 // author : Boris Kolpackov <boris@dre.vanderbilt.edu>
3 #include "tao/AnyTypeCode/Any_Impl.h"
4 #include "tao/AnyTypeCode/TypeCode.h"
5 #include "tao/AnyTypeCode/DynamicC.h"
6 #include "tao/PortableServer/Servant_Base.h"
9 #include "orbsvcs/FT_CORBA_ORBC.h"
12 #include "ace/Thread_Manager.h"
13 #include "ace/TMCast/Group.hpp"
15 #include "CrashPoint.h"
16 #include "StateUpdate.h"
17 #include "ReplicaController.h"
// Slot id used for the state update: it is returned and assigned by the
// accessors that follow, and passed to set_slot ()/get_slot () through
// state_slot_id () elsewhere in this file.
25 PortableInterceptor::SlotId state_slot_id_
;
// Getter: returns the current value of state_slot_id_.
// NOTE(review): the function-name line and the braces are missing from this
// listing; presumably this is the zero-argument state_slot_id () that is
// called elsewhere in the file -- confirm against the full source.
28 PortableInterceptor::SlotId
31 return state_slot_id_
;
// Setter: stores slot_id in state_slot_id_.
//
// slot_id - value assigned to the file-level slot id.
// NOTE(review): the return-type line and the braces are not visible in this
// listing -- confirm against the full source.
35 state_slot_id (PortableInterceptor::SlotId slot_id
)
37 state_slot_id_
= slot_id
;
// Member of Checkpointable that returns a pointer to a CORBA::Any.
// NOTE(review): the member name and body are missing from this listing;
// presumably this is get_state (), which send_reply calls on a
// Checkpointable* -- confirm against the full source.
45 CORBA::Any
* Checkpointable::
// Stores `state` in the PortableInterceptor::Current slot identified by
// state_slot_id ().
//
// orb   - ORB on which the "PICurrent" initial reference is resolved.
// state - value handed to set_slot ().
//
// A CORBA::Exception is caught and printed with the prefix
// "Caught exception:".
// NOTE(review): this listing is missing lines (return type, braces, `try`,
// the argument of _narrow) -- confirm against the full source.
52 associate_state (CORBA::ORB_ptr orb
, CORBA::Any
const& state
)
// Look up the PICurrent object on the supplied ORB.
56 CORBA::Object_var pic_obj
=
57 orb
->resolve_initial_references ("PICurrent");
// Narrow it to the PortableInterceptor::Current interface.
60 PortableInterceptor::Current_var pic
=
61 PortableInterceptor::Current::_narrow (
// Record the state in the slot.
65 pic
->set_slot (state_slot_id (), state
);
// Report any CORBA exception.
68 catch (const CORBA::Exception
& ex
)
70 ex
._tao_print_exception ("Caught exception:");
// Constructor.
//
// orb - ORB reference; a duplicate is kept in orb_.
//
// Visible steps: resolve "RootPOA" and narrow it into root_poa_, generate a
// UUID to use as the group member id, create the ACE_TMCast::Group held by
// group_, and spawn a thread that enters listener_thunk with `this`.
// NOTE(review): braces, `try`, the argument of _narrow, whatever follows the
// exception print, and the handling of `r` are not visible in this listing
// -- confirm against the full source.
83 ReplicaController (CORBA::ORB_ptr orb
)
84 : orb_ (CORBA::ORB::_duplicate (orb
))
// Obtain the root POA from the ORB.
88 CORBA::Object_var poa_object
=
89 orb_
->resolve_initial_references ("RootPOA");
91 root_poa_
= PortableServer::POA::_narrow (
// A CORBA exception is printed with the prefix "Caught exception:".
94 catch (const CORBA::Exception
& ex
)
96 ex
._tao_print_exception ("Caught exception:");
100 // Generate member id
101 ACE_Utils::UUID uuid
;
102 ACE_Utils::UUID_GENERATOR::instance ()->init ();
103 ACE_Utils::UUID_GENERATOR::instance ()->generate_UUID (uuid
);
// Hard-coded group address: 10000 and "239.255.0.1".
105 ACE_INET_Addr
address (10000, "239.255.0.1");
107 ACE_DEBUG ((LM_DEBUG
, "Becoming a member with id %s\n",
108 uuid
.to_string ()->c_str ()));
// The UUID string is passed as the second argument of the new group.
110 ACE_auto_ptr_reset (group_
, new ACE_TMCast::Group (address
, uuid
.to_string ()->c_str ()));
// Start the listener thread; `r` receives the result of spawn ().
112 int r
= ACE_Thread_Manager::instance ()->spawn (
113 &ReplicaController::listener_thunk
, this);
// Receive loop for log records sent through group_.
//
// Each iteration reads up to sizeof (buffer) bytes from the group, hex-dumps
// them, decodes them through a TAO_InputCDR, inserts a copy of the reply into
// log_ under RecordId (client_id, retention_id), and, when the received
// state's type kind is not tk_null, hands the state to the servant if it is a
// Checkpointable.
// ACE_TMCast::Group::Failed and ACE_TMCast::Group::InsufficienSpace are
// caught and logged.
// NOTE(review): the member name (presumably listener ()), braces, `try`, the
// declaration of `state`, and most of the `cdr >>` extractions are missing
// from this listing -- confirm against the full source.
121 void ReplicaController::
// The for-init only declares the buffer; the loop has no exit condition.
126 for (char buffer
[1024];;)
128 size_t n
= group_
->recv (buffer
, sizeof (buffer
));
130 ACE_HEX_DUMP ((LM_DEBUG
, buffer
, n
));
// Decode the n received bytes as CDR.
132 TAO_InputCDR
cdr (buffer
, n
);
134 CORBA::OctetSeq object_id
;
135 PortableInterceptor::AdapterName adapter_name
;
136 CORBA::String_var client_id
;
// NOTE(review): retention_id has no initializer; if extraction fails it may
// be read while still indeterminate -- confirm.
137 CORBA::Long retention_id
;
138 CORBA::OctetSeq reply
;
143 cdr
>> client_id
.out ();
// NOTE(review): only a debug message is visible for a failed decode; the
// listing does not show whether the iteration is abandoned afterwards.
148 if (!cdr
.good_bit ())
150 ACE_DEBUG ((LM_DEBUG
, "CDR failed\n"));
154 ACE_DEBUG ((LM_DEBUG
,
155 "Received log for %s with rid %i\n",
// Key the record by client id and retention id.
160 RecordId
rid (client_id
.in (), retention_id
);
// Store a heap copy of the reply in the log.
162 CORBA::OctetSeq_var
tmp (new CORBA::OctetSeq (reply
));
163 log_
.insert (rid
, tmp
);
166 CORBA::TypeCode_var tc
= state
.type ();
// Apply the state only when one was actually sent (type kind != tk_null).
168 if (tc
->kind () != CORBA::tk_null
)
170 PortableServer::POA_var poa
= resolve_poa (adapter_name
);
172 PortableServer::ServantBase_var servant
=
173 poa
->id_to_servant (object_id
);
// dynamic_cast yields null when the servant is not a Checkpointable.
175 Checkpointable
* target
=
176 dynamic_cast<Checkpointable
*> (servant
.in ());
178 if (target
) target
->set_state (state
);
182 catch (ACE_TMCast::Group::Failed
const&)
184 ACE_DEBUG ((LM_DEBUG
,
185 "Group failure. Perhaps, I am alone in the group.\n"));
187 catch (ACE_TMCast::Group::InsufficienSpace
const&)
189 ACE_DEBUG ((LM_DEBUG
, "Group::InsufficienSpace\n"));
// Maps an adapter name to a POA.
//
// The adapter-name parameter is unnamed and unused: the function returns a
// duplicated reference to root_poa_ regardless of the name passed in.
195 PortableServer::POA_ptr
ReplicaController::
196 resolve_poa (PortableInterceptor::AdapterName
const&)
198 //@@ Assume for now it's a root poa.
199 return PortableServer::POA::_duplicate (root_poa_
.in ());
// Thread entry point handed to ACE_Thread_Manager::spawn () by the
// constructor, which passes `this` as the argument.
//
// p - opaque thread argument, converted back to ReplicaController*.
// NOTE(review): the rest of the body (what is done with `obj` and the return
// value) is not visible in this listing -- confirm against the full source.
// NOTE(review): converting a void* back to its original object type is
// normally written with static_cast rather than reinterpret_cast.
203 ACE_THR_FUNC_RETURN
ReplicaController::
204 listener_thunk (void* p
)
206 ReplicaController
* obj
= reinterpret_cast<ReplicaController
*> (p
);
// Forward declaration of a function that takes a ServerRequestInfo_ptr and
// returns an FT::FTRequestServiceContext*.
// NOTE(review): the name line is missing from this listing; presumably this
// declares extract_context, which is defined later in the file -- confirm.
213 FT::FTRequestServiceContext
*
215 PortableInterceptor::ServerRequestInfo_ptr ri
);
// Compiled only when TAO_HAS_EXTENDED_FT_INTERCEPTORS == 1.
//
// Interception point that looks for a logged reply matching the incoming
// request.
//
// ri  - request info from which the FT request service context is extracted.
// ocs - out parameter for an octet sequence.
// NOTE(review): the return type, braces, and the code that uses `copy` (and
// presumably assigns `ocs`) are missing from this listing -- confirm against
// the full source.
218 #if TAO_HAS_EXTENDED_FT_INTERCEPTORS == 1
220 ReplicaController::tao_ft_interception_point (
221 PortableInterceptor::ServerRequestInfo_ptr ri
,
222 CORBA::OctetSeq_out ocs
)
// The _var takes ownership of the context returned by extract_context ().
224 FT::FTRequestServiceContext_var
ftr (
225 extract_context (ri
));
227 ACE_DEBUG ((LM_DEBUG
,
228 "(%P|%t) Received request from %C with rid %i\n",
229 ftr
->client_id
.in (),
232 // Check if this request is eligible for replay.
// The log is keyed by (client_id, retention_id).
234 RecordId
rid (ftr
->client_id
.in (), ftr
->retention_id
);
236 if (log_
.contains (rid
))
238 ACE_DEBUG ((LM_DEBUG
,
239 "(%P|%t) Replaying reply for %C with rid %i\n",
240 ftr
->client_id
.in (),
243 CORBA::OctetSeq_var
copy (log_
.lookup (rid
)); // make a copy
251 #endif /*TAO_HAS_EXTENDED_FT_INTERCEPTORS*/
// send_reply interception point.
//
// ri - request info for the reply being sent.
//
// Visible steps, in order:
//   1. Extract the FT request service context from `ri`.
//   2. Marshal the result and every PARAM_INOUT argument into `cdr`, then
//      flatten the CDR message blocks into the octet sequence `reply`.
//   3. Read the state from the slot state_slot_id (); when its type kind is
//      tk_null, query the servant (if it is a Checkpointable) via
//      get_state ().
//   4. Build a CDR message containing at least client_id and retention_id,
//      flatten it into `msg`, and send it through group_.
//   5. Insert `reply` into log_ under RecordId (client_id, retention_id).
// The process exits via ACE_OS::exit (1) when crash_point is 1 (before the
// group send) or 2 (after local logging) and retention_id > 2.
// NOTE(review): this listing is missing many lines (return type, braces,
// `try`/loop structure around the send, the declarations of the CDR streams,
// the right-hand sides of `result` and `pl`, several `cdr <<` insertions)
// -- confirm every step against the full source.
254 ReplicaController::send_reply (
255 PortableInterceptor::ServerRequestInfo_ptr ri
)
// The _var takes ownership of the context returned by extract_context ().
257 FT::FTRequestServiceContext_var
ftr (
258 extract_context (ri
));
// NOTE(review): tao_ft_interception_point formats the same client_id with
// %C, while %s is used here; ACE's %s presumably expects an ACE_TCHAR string,
// so the two should probably agree on %C -- confirm.
261 ACE_DEBUG ((LM_DEBUG
,
262 "(%P|%t) Sending reply for %s with rid %i\n",
263 ftr
->client_id
.in (),
267 // Prepare reply for logging.
269 CORBA::Any_var result
=
// Marshal the operation result first.
273 result
->impl ()->marshal_value (cdr
);
275 Dynamic::ParameterList_var pl
=
278 CORBA::ULong len
= pl
->length ();
// Then append every argument whose mode is PARAM_INOUT.
280 for (CORBA::ULong index
= 0; index
!= len
; ++index
)
282 //@@ No chance for PARAM_OUT
283 if ((*pl
)[index
].mode
== CORBA::PARAM_INOUT
)
285 (*pl
)[index
].argument
.impl ()->marshal_value (cdr
);
289 CORBA::OctetSeq_var reply
;
// Allocate an octet sequence sized by the total marshaled length.
291 ACE_NEW (reply
.out (), CORBA::OctetSeq (cdr
.total_length ()));
293 reply
->length (cdr
.total_length ());
295 CORBA::Octet
* buf
= reply
->get_buffer ();
297 // @@ What if this throws an exception?? We don't have any way to
298 // check whether this succeeded
// Copy each message block of the CDR stream into the flat buffer.
299 for (ACE_Message_Block
const* mb
= cdr
.begin ();
303 ACE_OS::memcpy (buf
, mb
->rd_ptr (), mb
->length ());
304 buf
+= mb
->length ();
307 // Logging the reply and state update.
310 // First send message to members.
313 // Extract state update.
315 CORBA::OctetSeq_var oid
= ri
->object_id ();
316 PortableInterceptor::AdapterName_var an
= ri
->adapter_name ();
// State from the slot that associate_state () writes via set_slot ().
318 CORBA::Any_var state
= ri
->get_slot (state_slot_id ());
320 CORBA::TypeCode_var tc
= state
->type ();
// A tk_null type kind means the slot holds no state update.
322 if (tc
->kind () == CORBA::tk_null
)
324 ACE_DEBUG ((LM_DEBUG
, "Slot update is void\n"));
326 PortableServer::POA_var poa
= resolve_poa (an
.in ());
328 PortableServer::ServantBase_var servant
=
329 poa
->id_to_servant (oid
.in ());
// dynamic_cast yields null when the servant is not a Checkpointable.
// NOTE(review): the null check on `target` is not visible in this listing
// -- confirm it exists before get_state () is called.
331 Checkpointable
* target
=
332 dynamic_cast<Checkpointable
*> (servant
.in ());
336 CORBA::Any_var tmp
= target
->get_state ();
// Replace the slot state only when get_state () returned a non-null Any.
338 if (tmp
.ptr () != 0) state
= tmp
._retn ();
// Build the message for the other group members.
346 cdr
<< ftr
->client_id
.in ();
347 cdr
<< ftr
->retention_id
;
351 size_t size
= cdr
.total_length ();
353 CORBA::OctetSeq_var msg
;
355 ACE_NEW (msg
.out (), CORBA::OctetSeq (size
));
// Flatten the CDR message blocks into msg's buffer.
360 CORBA::Octet
* buf
= msg
->get_buffer ();
362 for (ACE_Message_Block
const* mb
= cdr
.begin ();
366 ACE_OS::memcpy (buf
, mb
->rd_ptr (), mb
->length ());
367 buf
+= mb
->length ();
// Fresh pointer to the start of the buffer for the send below.
371 CORBA::Octet
* buf
= msg
->get_buffer ();
// Crash point 1: exit before the record is sent to the group.
375 if (crash_point
== 1 && ftr
->retention_id
> 2) ACE_OS::exit (1);
383 group_
->send (buf
, size
);
// NOTE(review): `size` is a size_t but is formatted with %i; ACE logging
// presumably offers %B for size_t values -- confirm and align.
384 ACE_DEBUG ((LM_DEBUG
, "Sent log record of length %i\n", size
));
// Aborted is logged as a retry; the retry loop itself is not visible here.
387 catch (ACE_TMCast::Group::Aborted
const&)
389 ACE_DEBUG ((LM_DEBUG
, "Retrying to send log record.\n"));
393 catch (ACE_TMCast::Group::Failed
const&)
395 ACE_DEBUG ((LM_DEBUG
,
396 "Group failure. Perhaps, I am alone in the group.\n"));
401 // Now perform local logging.
403 RecordId
rid (ftr
->client_id
.in (), ftr
->retention_id
);
405 // This is slow but eh-safe ;-).
407 log_
.insert (rid
, reply
);
// Crash point 2: exit after the reply has been logged locally.
412 if (crash_point
== 2 && ftr
->retention_id
> 2) ACE_OS::exit (1);
// Decodes the FT request service context carried by a request.
//
// ri - request info queried for the IOP::FT_REQUEST service context.
//
// Returns a pointer to an FT::FTRequestServiceContext.
// Throws CORBA::BAD_CONTEXT and CORBA::UNKNOWN; the visible conditions next
// to them are a failed byte-order read and a CDR stream that is not good.
// NOTE(review): braces, the extraction into `req`, some ACE_NEW_THROW_EX
// arguments, and the return statement are missing from this listing --
// confirm against the full source.
418 FT::FTRequestServiceContext
*
419 extract_context (PortableInterceptor::ServerRequestInfo_ptr ri
)
421 IOP::ServiceContext_var svc
=
422 ri
->get_request_service_context (IOP::FT_REQUEST
);
// Wrap the raw context bytes in an input CDR stream.
424 TAO_InputCDR
cdr (reinterpret_cast<const char*> (svc
->context_data
.get_buffer ()),
425 svc
->context_data
.length ());
// The first value read from the stream is the byte-order flag.
427 CORBA::Boolean byte_order
;
429 if ((cdr
>> ACE_InputCDR::to_boolean (byte_order
)) == 0)
432 throw CORBA::BAD_CONTEXT ();
// Switch the stream to the byte order announced by the sender.
435 cdr
.reset_byte_order (static_cast<int> (byte_order
));
437 // Funny, the following two lines should normally translate
438 // just to one ctor call. But because we have to use this
439 // ACE_NEW macro hackery we have a default ctor call plus
440 // assignment operator call. Yet another example how the
441 // majority is being penalized by some broken platforms.
443 FT::FTRequestServiceContext_var req
;
445 //@@ completed status maybe wrong
447 ACE_NEW_THROW_EX (req
,
448 FT::FTRequestServiceContext
,
450 CORBA::SystemException::_tao_minor_code (
453 CORBA::COMPLETED_NO
));
// Final validity check on the stream.
457 if (!cdr
.good_bit ())
460 throw CORBA::UNKNOWN ();
// Returns a newly duplicated copy of the string "ReplicaController".
// NOTE(review): the return-type line and braces are not visible in this
// listing.
469 ReplicaController::name (void)
471 return CORBA::string_dup ("ReplicaController");
// send_exception interception point; the request-info parameter is unnamed.
// NOTE(review): return type and body are not visible in this listing.
475 ReplicaController::send_exception (
476 PortableInterceptor::ServerRequestInfo_ptr
)
// send_other interception point; the request-info parameter is unnamed.
// NOTE(review): return type and body are not visible in this listing.
481 ReplicaController::send_other (
482 PortableInterceptor::ServerRequestInfo_ptr
)
// destroy member; takes no arguments.
// NOTE(review): return type and body are not visible in this listing.
487 ReplicaController::destroy (void)
// receive_request_service_contexts interception point; the request-info
// parameter is unnamed.
// NOTE(review): return type and body are not visible in this listing.
492 ReplicaController::receive_request_service_contexts (
493 PortableInterceptor::ServerRequestInfo_ptr
)
// receive_request interception point; the request-info parameter is unnamed.
// NOTE(review): return type and body are not visible in this listing.
499 ReplicaController::receive_request (
500 PortableInterceptor::ServerRequestInfo_ptr
)