2 //=============================================================================
4 * @file FT_Notifier_i.cpp
6 * This file is part of Fault Tolerant CORBA.
8 * @author Dale Wilson <wilson_d@ociweb.com>
10 //=============================================================================
12 #include "orbsvcs/Log_Macros.h"
13 #include "FT_Notifier_i.h"
15 #include "ace/Get_Opt.h"
16 #include "ace/OS_NS_stdio.h"
17 #include "ace/OS_NS_unistd.h"
18 #include "tao/debug.h"
20 // Use this macro at the beginning of CORBA methods
21 // to aid in debugging.
// Trace helper for method entry. The visible part of the expansion is guarded
// by TAO_debug_level > 6 and issues an ORBSVCS_DEBUG call at LM_DEBUG priority.
// NOTE(review): the rest of the macro body (format string and arguments) is
// not visible in this view.
22 #define METHOD_ENTRY(name) \
23 if (TAO_debug_level > 6) \
25 ORBSVCS_DEBUG (( LM_DEBUG, \
30 // Use this macro to return from CORBA methods
31 // to aid in debugging. Note that you can specify
32 // the return value after the macro, for example:
33 // METHOD_RETURN(Plugh::plover) xyzzy; is equivalent to "return xyzzy;"
35 // METHOD_RETURN(Plugh::troll); is equivalent to "return;"
37 // WARNING: THIS GENERATES TWO STATEMENTS!!! THE FOLLOWING
38 // will not do what you want it to:
39 // if (cave_is_closing) METHOD_RETURN(Plugh::pirate) aarrggh;
40 // Moral: Always use braces.
// Trace helper for method exit. Guarded by TAO_debug_level > 6, it issues an
// ORBSVCS_DEBUG call at LM_DEBUG priority and then ends with a bare `return`,
// so whatever the caller writes after the macro invocation becomes the
// returned expression. Because the expansion is more than one statement it
// must not be used as the unbraced body of an `if`.
41 #define METHOD_RETURN(name) \
42 if (TAO_debug_level > 6) \
44 ORBSVCS_DEBUG (( LM_DEBUG, \
48 return /* value goes here */
51 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
53 // Implementation skeleton constructor
// Default constructor. The initializers visible here set the
// ReplicationManager reference, the consumer connect/disconnect counters and
// the structured/sequence proxy push consumer references to 0.
// NOTE(review): further member initializers and the constructor body are not
// visible in this view.
54 TAO::FT_FaultNotifier_i::FT_FaultNotifier_i ()
63 , replication_manager_ (0)
// Counters compared in disconnect_consumer() for the quit-on-idle check.
67 , consumer_connects_(0)
68 , consumer_disconnects_(0)
73 , structured_proxy_push_consumer_ (0)
74 , sequence_proxy_push_consumer_ (0)
81 // Implementation skeleton destructor
82 TAO::FT_FaultNotifier_i::~FT_FaultNotifier_i ()
// Idle-time hook. The `result` argument is explicitly marked unused.
// The visible logic logs a "Begin linger" message, pre-increments a
// function-local static counter and, once that counter exceeds 5, logs the
// "idle returnning gone" message.
// NOTE(review): the conditions guarding these statements and the returned
// value are not visible in this view; confirm them against the full source.
94 int TAO::FT_FaultNotifier_i::idle(int &result
)
// Function-local static: the count persists across calls to idle().
96 static unsigned long linger
= 0;
97 ACE_UNUSED_ARG(result
);
102 ORBSVCS_ERROR ((LM_ERROR
,
103 "FaultNotifier (%P|%t) Begin linger.\n"
// Current threshold is 5; the commented-out "10)" looks like a previous limit.
106 if(++linger
> 5)//10)
108 ORBSVCS_ERROR ((LM_ERROR
,
109 "FaultNotifier (%P|%t) idle returnning gone\n"
120 ////////////////////////////////////////////
121 // FT_FaultNotifier_i private methods
124 // TODO: find this a common home
// Writes the stringified object reference held in ior_ to the file named by
// ior_output_file_ (opened with mode "w") and closes the file again.
// An LM_ERROR "Open failed" message naming the file is logged on the
// failure path.
// NOTE(review): the branch that separates the success and failure paths and
// the returned status code are not visible in this view.
125 int TAO::FT_FaultNotifier_i::write_ior()
128 FILE* out
= ACE_OS::fopen (this->ior_output_file_
, "w");
// The IOR is written as a plain string with no trailing newline.
131 ACE_OS::fprintf (out
, "%s", this->ior_
.in ());
132 ACE_OS::fclose (out
);
137 ORBSVCS_ERROR ((LM_ERROR
,
138 "%T %n (%P|%t) Open failed for %s\n", ior_output_file_
144 //////////////////////////////////////////////////////
145 // FT_FaultNotifier_i public, non-CORBA methods
// Parses the command line with ACE_Get_Opt using the option string "o:rq"
// (-o takes an argument; -r and -q are flags). The visible actions are:
//   * store the option argument in ior_output_file_,
//   * toggle rm_register_,
//   * set quit_on_idle_ to 1,
//   * emit a usage message through ORBSVCS_ERROR_RETURN.
// NOTE(review): the case labels are not visible in this view, so the mapping
// of each action to its option letter is presumed from the option string and
// the usage text (-r disables ReplicationManager registration).
147 int TAO::FT_FaultNotifier_i::parse_args (int argc
, ACE_TCHAR
* argv
[])
149 ACE_Get_Opt
get_opts (argc
, argv
, ACE_TEXT("o:rq"));
// get_opts() yields -1 once all options have been consumed.
152 while ((c
= get_opts ()) != -1)
158 this->ior_output_file_
= get_opts
.opt_arg ();
// Toggles rather than clears, so repeating the option flips the flag back.
163 this->rm_register_
= ! this->rm_register_
;
168 this->quit_on_idle_
= 1;
175 ORBSVCS_ERROR_RETURN ((LM_ERROR
,
178 " -r disable registration with ReplicationManager"
187 // Indicates successful parsing of the command line
// Returns the notifier's identity string as a C string, obtained from
// identity_.c_str(). init() fills identity_ with a "file:" or "name:" prefix
// followed by the IOR file name or the naming-service name.
191 const char * TAO::FT_FaultNotifier_i::identity () const
193 return this->identity_
.c_str();
// Returns the POA this servant was activated in (poa_, set up in init()).
// NOTE(review): the pointer is returned via poa_.in() without
// PortableServer::POA::_duplicate(); if callers release the returned
// reference, as the CORBA C++ mapping normally expects for _default_POA,
// this would drop a reference the servant still owns. Confirm.
196 PortableServer::POA_ptr
TAO::FT_FaultNotifier_i::_default_POA (void)
198 return this->poa_
.in();
// Reference-release hook. The visible statements destroy the notification
// channel (notify_channel_->destroy()) and log that the "gone" state is
// being set.
// NOTE(review): the statement that actually records the "gone" state is not
// visible in this view.
202 void TAO::FT_FaultNotifier_i::_remove_ref (void)
204 notify_channel_
->destroy();
206 ORBSVCS_ERROR ((LM_ERROR
,
207 "FaultNotifier (%P|%t) _remove_ref setting gone\n"
// Shutdown counterpart of init(). It undoes the publication steps:
//   * removes the IOR file (if one was configured) and clears its name,
//   * unbinds this_name_ from the naming context (if a name was configured
//     and a naming context was obtained),
//   * if registered_, unregisters from the ReplicationManager by registering
//     a nil FaultNotifier; a CORBA exception during that call is logged and
//     otherwise ignored.
// NOTE(review): the return value is not visible in this view.
212 int TAO::FT_FaultNotifier_i::fini (void)
214 if (this->ior_output_file_
!= 0)
216 ACE_OS::unlink (this->ior_output_file_
);
// Clearing the name ensures the unlink is not attempted a second time.
217 this->ior_output_file_
= 0;
219 if (this->ns_name_
!= 0 && this->naming_context_
.in() != 0)
221 this->naming_context_
->unbind (this_name_
);
225 if (this->registered_
)
// Registering a nil reference is how the notifier is unregistered.
229 this->replication_manager_
->register_fault_notifier(::FT::FaultNotifier::_nil ());
230 ORBSVCS_DEBUG ((LM_DEBUG
,
231 "FaultNotifier unregistered from ReplicationManager.\n"
234 catch (const CORBA::Exception
&)
236 ORBSVCS_DEBUG ((LM_DEBUG
,
237 "FaultNotifier Can't unregister from ReplicationManager.\n"
239 // complain, but otherwise ignore this error
243 this->registered_
= 0;
// Initialises the fault notifier against the given ORB. Visible steps:
//   1. keep a duplicate of `orb`, resolve and narrow the Root POA, and
//      activate its POA manager;
//   2. activate this servant, obtain its object reference and stringify it
//      into ior_;
//   3. create a co-resident notification channel and fetch its default
//      filter factory;
//   4. obtain a structured and a sequence proxy push consumer from the
//      channel's default supplier admin and connect a nil supplier to each;
//   5. fetch the channel's default consumer admin;
//   6. if rm_register_ is set, resolve the ReplicationManager and register
//      this notifier with it (failures are logged, not fatal);
//   7. publish the reference through an IOR file and/or the Naming Service,
//      recording how it was published in identity_.
// Errors in steps 1, 4 and 7 leave through ORBSVCS_ERROR_RETURN.
// NOTE(review): the returned codes, several braces/else branches and some
// assignment targets are not visible in this view.
248 int TAO::FT_FaultNotifier_i::init (CORBA::ORB_ptr orb
)
251 this->orb_
= CORBA::ORB::_duplicate (orb
);
253 // Use the ROOT POA for now
254 CORBA::Object_var poa_object
=
255 this->orb_
->resolve_initial_references (TAO_OBJID_ROOTPOA
);
257 if (CORBA::is_nil (poa_object
.in ()))
258 ORBSVCS_ERROR_RETURN ((LM_ERROR
,
259 ACE_TEXT (" (%P|%t) Unable to initialize the POA.\n")),
262 // Get the POA object.
// NOTE(review): the assignment target is not visible; the nil check that
// follows reads this->poa_, so presumably the narrowed POA is stored there.
264 PortableServer::POA::_narrow (poa_object
.in ());
267 if (CORBA::is_nil(this->poa_
.in ()))
269 ORBSVCS_ERROR_RETURN ((LM_ERROR
,
270 ACE_TEXT (" (%P|%t) Unable to narrow the POA.\n")),
274 PortableServer::POAManager_var poa_manager
=
275 this->poa_
->the_POAManager ();
277 poa_manager
->activate ();
279 // Register with the POA.
281 this->object_id_
= this->poa_
->activate_object (this);
// this_obj is reused below for ReplicationManager registration and for the
// Naming Service binding.
285 CORBA::Object_var this_obj
=
286 this->poa_
->id_to_reference (object_id_
.in ());
288 this->ior_
= this->orb_
->object_to_string (this_obj
.in ());
291 ////////////////////////////////////////////////
292 // Register with coresident Notification Channel
293 CosNotifyChannelAdmin::EventChannelFactory_var notify_factory
=
294 TAO_Notify_EventChannelFactory_i::create (poa_
.in ());
// Both property lists are passed default-constructed.
295 CosNotification::QoSProperties initial_qos
;
296 CosNotification::AdminProperties initial_admin
;
297 this->notify_channel_
=
298 notify_factory
->create_channel (initial_qos
,
302 this->filter_factory_
= this->notify_channel_
->default_filter_factory ();
304 ///////////////////////////
305 // Producer registration
307 this->supplier_admin_
= this->notify_channel_
->default_supplier_admin ();
309 ::CosNotifyChannelAdmin::ProxyID proxyId
= 0;
311 //////////////////////
312 // structured producer
313 ::CosNotifyChannelAdmin::ProxyConsumer_var consumer
314 = this->supplier_admin_
->obtain_notification_push_consumer (
315 ::CosNotifyChannelAdmin::STRUCTURED_EVENT
,
318 structured_proxy_push_consumer_
319 = ::CosNotifyChannelAdmin::StructuredProxyPushConsumer::_narrow(consumer
.in ());
320 if (CORBA::is_nil (this->structured_proxy_push_consumer_
.in ()))
322 ORBSVCS_ERROR_RETURN ((LM_ERROR
,
323 "%T %n (%P|%t) Should not occur: Unable to narrow Structured Proxy Push Consumer\n"),
327 // todo: implement a push supplier if we want to receive disconnect notice
// A nil supplier is connected, so no supplier-side callback object exists.
328 CosNotifyComm::StructuredPushSupplier_var stubPushSupplier
=
329 CosNotifyComm::StructuredPushSupplier::_nil();
331 this->structured_proxy_push_consumer_
->connect_structured_push_supplier (stubPushSupplier
.in());
// Sequence producer: same steps as above with SEQUENCE_EVENT.
// NOTE(review): the assignment target is not visible; the narrow below reads
// `consumer`, so presumably that variable is reused.
336 = this->supplier_admin_
->obtain_notification_push_consumer (
337 ::CosNotifyChannelAdmin::SEQUENCE_EVENT
,
340 this->sequence_proxy_push_consumer_
341 = ::CosNotifyChannelAdmin::SequenceProxyPushConsumer::_narrow(consumer
.in ());
342 if (CORBA::is_nil (this->sequence_proxy_push_consumer_
.in ()))
344 ORBSVCS_ERROR_RETURN ((LM_ERROR
,
345 "%T %n (%P|%t) Should not occur: Unable to narrow Sequence Proxy Push Consumer\n"),
349 // todo: implement this if we want to receive disconnect notice
350 CosNotifyComm::SequencePushSupplier_var stubSeqPushSupplier
=
351 CosNotifyComm::SequencePushSupplier::_nil();
353 this->sequence_proxy_push_consumer_
->connect_sequence_push_supplier (stubSeqPushSupplier
.in());
354 ///////////////////////////
355 // Consumer registration
357 // find the channel administrator for consumers
358 this->consumer_admin_
= this->notify_channel_
->default_consumer_admin ();
359 if (CORBA::is_nil (this->consumer_admin_
.in ()))
361 ORBSVCS_ERROR ((LM_ERROR
,
362 "%T %n (%P|%t) NIL consumer admin\n"
366 // everything else happens when subscriber shows up
368 ///////////////////////////////
369 // Register with ReplicationManager
370 if (this->rm_register_
)
374 CORBA::Object_var rm_obj
= orb
->resolve_initial_references("ReplicationManager");
375 this->replication_manager_
= ::FT::ReplicationManager::_narrow(rm_obj
.in());
376 if (!CORBA::is_nil (replication_manager_
.in ()))
378 // @@: should we check to see if there's already one registered?
379 FT::FaultNotifier_var notifier
= FT::FaultNotifier::_narrow (this_obj
.in ());
380 if (! CORBA::is_nil (notifier
.in ()))
382 this->replication_manager_
->register_fault_notifier(notifier
.in ());
383 ORBSVCS_DEBUG ((LM_DEBUG
,
384 "FaultNotifier registered with ReplicationManager.\n"
// Remembered so that fini() knows it has to unregister.
386 this->registered_
= 1;
390 ORBSVCS_ERROR ((LM_ERROR
,
391 "Error: Registration failed. This is not a FaultNotifier (should not occur.)\n"
397 ORBSVCS_ERROR ((LM_ERROR
,"FaultNotifier: Can't resolve ReplicationManager, It will not be registered.\n" ));
// A CORBA exception while resolving/registering is reported and swallowed.
400 catch (const CORBA::Exception
& ex
)
402 ex
._tao_print_exception (
403 "FaultNotifier: Exception resolving ReplicationManager. Notifier will not be registered.\n");
408 ORBSVCS_DEBUG ((LM_DEBUG
,
409 "FaultNotifier: ReplicationManager registration disabled.\n"
412 ///////////////////////////////
413 // Set up and ready for action
// Publication via IOR file: identity_ becomes "file:<ior_output_file_>".
418 if (this->ior_output_file_
!= 0)
420 this->identity_
= "file:";
421 this->identity_
+= ACE_TEXT_ALWAYS_CHAR(this->ior_output_file_
);
422 result
= write_ior();
// Publication via Naming Service: identity_ becomes "name:<ns_name_>".
// This assignment replaces any "file:" identity set just above.
428 if (this->ns_name_
!= 0)
430 this->identity_
= "name:";
431 this->identity_
+= this->ns_name_
;
433 CORBA::Object_var naming_obj
=
434 this->orb_
->resolve_initial_references ("NameService");
436 if (CORBA::is_nil(naming_obj
.in ())){
437 ORBSVCS_ERROR_RETURN ((LM_ERROR
,
438 "%T %n (%P|%t) Unable to find the Naming Service\n"),
442 this->naming_context_
=
443 CosNaming::NamingContext::_narrow (naming_obj
.in ());
444 if (CORBA::is_nil(this->naming_context_
.in ()))
446 ORBSVCS_ERROR_RETURN ((LM_ERROR
,
447 "%T %n (%P|%t) Should not occur: Can't narrow initial reference to naming context.\n"),
// Single-component name; fini() unbinds the same this_name_.
450 this->this_name_
.length (1);
451 this->this_name_
[0].id
= CORBA::string_dup (this->ns_name_
);
// rebind() is used rather than bind().
453 this->naming_context_
->rebind (this->this_name_
, this_obj
.in());
// Forwards a single structured fault event into the notification channel by
// pushing it to structured_proxy_push_consumer_ (obtained in init()).
// Entry and exit are traced with METHOD_ENTRY / METHOD_RETURN.
463 void TAO::FT_FaultNotifier_i::push_structured_fault (
464 const CosNotification::StructuredEvent
& event
467 METHOD_ENTRY(TAO::FT_FaultNotifier_i::push_structured_fault
);
469 this->structured_proxy_push_consumer_
->push_structured_event (event
);
471 METHOD_RETURN(TAO::FT_FaultNotifier_i::push_structured_fault
);
// Forwards a batch of fault events into the notification channel by pushing
// it to sequence_proxy_push_consumer_ (obtained in init()).
// Entry and exit are traced with METHOD_ENTRY / METHOD_RETURN.
474 void TAO::FT_FaultNotifier_i::push_sequence_fault (
475 const CosNotification::EventBatch
& events
478 METHOD_ENTRY(TAO::FT_FaultNotifier_i::push_sequence_fault
);
480 this->sequence_proxy_push_consumer_
->push_structured_events (events
);
482 METHOD_RETURN(TAO::FT_FaultNotifier_i::push_sequence_fault
);
// Creates a new filter through the channel's filter_factory_.
// The constraint_grammar argument is currently ignored (marked unused and
// flagged as a todo); the filter is always created with the "ETCL" grammar.
// NOTE(review): the expression returned after METHOD_RETURN is not visible
// in this view.
485 ::CosNotifyFilter::Filter_ptr
TAO::FT_FaultNotifier_i::create_subscription_filter (
486 const char * constraint_grammar
489 METHOD_ENTRY(TAO::FT_FaultNotifier_i::create_subscription_filter
);
490 ACE_UNUSED_ARG (constraint_grammar
); //@@todo
492 CosNotifyFilter::Filter_var filter
= this->filter_factory_
->create_filter ("ETCL");
493 METHOD_RETURN(TAO::FT_FaultNotifier_i::create_subscription_filter
)
// Connects a structured push consumer to the fault notification channel.
// A ProxyInfo slot is chosen from proxy_infos_ (the first entry whose
// proxyVar_ is nil, otherwise a newly appended entry) and its index becomes
// the returned ConsumerId. A STRUCTURED_EVENT proxy push supplier is then
// obtained from consumer_admin_, narrowed, connected to push_consumer and,
// when `filter` is non-nil, given that filter.
// NOTE(review): the declarations of `looking`/`infoPos`, the branch bodies
// and the handling after the "wrong type" error are not visible in this view.
497 FT::FaultNotifier::ConsumerId
TAO::FT_FaultNotifier_i::connect_structured_fault_consumer (
498 CosNotifyComm::StructuredPushConsumer_ptr push_consumer
,
499 CosNotifyFilter::Filter_ptr filter
502 METHOD_ENTRY(TAO::FT_FaultNotifier_i::connect_structured_fault_consumer
);
504 /////////////////////////
505 // find a ProxyInfo entry
506 // use the first nil entry or a new entry if no nils found
// The scan stops as soon as `looking` becomes false or the vector ends.
510 for ( size_t pos
= 0; looking
&& pos
< this->proxy_infos_
.size (); ++pos
)
512 ProxyInfo
& pi
= this->proxy_infos_
[pos
];
513 if (CORBA::is_nil(pi
.proxyVar_
.in ()))
// Appending: the new slot's index is the size before push_back.
// Presumably reached only when the scan found no nil entry.
521 infoPos
= this->proxy_infos_
.size();
522 this->proxy_infos_
.push_back(ProxyInfo());
525 ///////////////////////////////////////
526 // Assign an ID, populate the ProxyInfo
527 FT::FaultNotifier::ConsumerId result
= infoPos
;
528 ProxyInfo
& info
= this->proxy_infos_
[infoPos
];
// NOTE(review): the assignment target is not visible; the narrow below reads
// info.proxyVar_, so presumably the obtained supplier is stored there.
530 = this->consumer_admin_
->obtain_notification_push_supplier (
531 ::CosNotifyChannelAdmin::STRUCTURED_EVENT
,
// Counted before the narrow check; compared with consumer_disconnects_ in
// disconnect_consumer() for the quit-on-idle decision.
534 this->consumer_connects_
+= 1;
536 ::CosNotifyChannelAdmin::StructuredProxyPushSupplier_var proxySupplier
537 = ::CosNotifyChannelAdmin::StructuredProxyPushSupplier::_narrow(info
.proxyVar_
.in ());
539 if ( CORBA::is_nil (proxySupplier
.in ()))
541 // this is a should-not-occur situation. The consumer admin returned
542 // the wrong kind of object.
543 ORBSVCS_ERROR(( LM_ERROR
,
544 "%T %n (%P|%t) Unexpected result: Wrong type of object returned from obtain_notification_push_supplier\n"
549 proxySupplier
->connect_structured_push_consumer ( push_consumer
);
// The filter is optional; a nil filter means no filtering is installed here.
551 if (! CORBA::is_nil (filter
))
553 proxySupplier
->add_filter(filter
);
557 METHOD_RETURN(TAO::FT_FaultNotifier_i::connect_structured_fault_consumer
) result
;
// Connects a sequence (batch) push consumer to the fault notification
// channel. A ProxyInfo slot is chosen from proxy_infos_ (the first entry
// whose proxyVar_ is nil, otherwise a newly appended entry) and its index
// becomes the returned ConsumerId. A SEQUENCE_EVENT proxy push supplier is
// then obtained from consumer_admin_, narrowed, connected to push_consumer
// and, when `filter` is non-nil, given that filter.
// NOTE(review): the declarations of `looking`/`infoPos`, the branch bodies
// and the handling after the "wrong type" error are not visible in this view.
560 FT::FaultNotifier::ConsumerId
TAO::FT_FaultNotifier_i::connect_sequence_fault_consumer (
561 CosNotifyComm::SequencePushConsumer_ptr push_consumer
,
562 CosNotifyFilter::Filter_ptr filter
565 METHOD_ENTRY(TAO::FT_FaultNotifier_i::connect_sequence_fault_consumer
);
566 /////////////////////////
567 // find a ProxyInfo entry
568 // use the first nil entry or a new entry if no nils found
// The scan stops as soon as `looking` becomes false or the vector ends.
572 for ( size_t pos
= 0; looking
&& pos
< this->proxy_infos_
.size (); ++pos
)
574 ProxyInfo
& pi
= this->proxy_infos_
[pos
];
575 if (CORBA::is_nil(pi
.proxyVar_
.in ()))
// Appending: the new slot's index is the size before push_back.
// Presumably reached only when the scan found no nil entry.
583 infoPos
= this->proxy_infos_
.size();
584 this->proxy_infos_
.push_back(ProxyInfo());
587 ///////////////////////////////////////
588 // Assign an ID, populate the ProxyInfo
589 FT::FaultNotifier::ConsumerId result
= infoPos
;
590 ProxyInfo
& info
= this->proxy_infos_
[infoPos
];
// NOTE(review): the assignment target is not visible; the narrow below reads
// info.proxyVar_, so presumably the obtained supplier is stored there.
592 = this->consumer_admin_
->obtain_notification_push_supplier (
593 ::CosNotifyChannelAdmin::SEQUENCE_EVENT
,
// Counted before the narrow check; compared with consumer_disconnects_ in
// disconnect_consumer() for the quit-on-idle decision.
596 this->consumer_connects_
+= 1;
598 ::CosNotifyChannelAdmin::SequenceProxyPushSupplier_var proxySupplier
599 = ::CosNotifyChannelAdmin::SequenceProxyPushSupplier::_narrow(info
.proxyVar_
.in ());
600 if ( CORBA::is_nil (proxySupplier
.in ()))
602 // this is a should-not-occur situation. The consumer admin returned
603 // the wrong kind of object.
604 ORBSVCS_ERROR(( LM_ERROR
,
605 "%T %n (%P|%t) Unexpected result: Wrong type of object returned from obtain_notification_push_supplier\n"
610 proxySupplier
->connect_sequence_push_consumer ( push_consumer
);
// The filter is optional; a nil filter means no filtering is installed here.
612 if (! CORBA::is_nil (filter
))
614 proxySupplier
->add_filter(filter
);
617 METHOD_RETURN(TAO::FT_FaultNotifier_i::connect_sequence_fault_consumer
) result
;
// Disconnects the consumer identified by `connection`, which is used as an
// index into proxy_infos_.
// Visible behaviour:
//   * a slot whose proxyVar_ is already nil raises
//     CosEventComm::Disconnected;
//   * otherwise the proxy is narrowed first to a structured and then to a
//     sequence proxy push supplier; whichever narrow succeeds is
//     disconnected and the slot is reset to nil so it can be reused;
//   * a proxy of neither type is logged and raises
//     CosEventComm::Disconnected;
//   * consumer_disconnects_ is incremented, and with quit_on_idle_ set the
//     servant is deactivated in its POA once connects equal disconnects and
//     quitting_ is not already set.
// NOTE(review): the branch structure (including the out-of-range index case)
// is not fully visible in this view.
620 void TAO::FT_FaultNotifier_i::disconnect_consumer (
621 FT::FaultNotifier::ConsumerId connection
624 METHOD_ENTRY(TAO::FT_FaultNotifier_i::disconnect_consumer
);
626 size_t index
= static_cast<size_t> (connection
);
// Bounds check before indexing into proxy_infos_.
627 if (index
< this->proxy_infos_
.size())
629 ProxyInfo
& info
= this->proxy_infos_
[index
];
630 if (CORBA::is_nil(info
.proxyVar_
.in ()) )
632 throw CosEventComm::Disconnected();
// Try the structured flavour first.
636 ::CosNotifyChannelAdmin::StructuredProxyPushSupplier_var proxySupplier
637 = ::CosNotifyChannelAdmin::StructuredProxyPushSupplier::_narrow(info
.proxyVar_
.in ());
638 if (! CORBA::is_nil (proxySupplier
.in ()))
640 proxySupplier
->disconnect_structured_push_supplier ();
// Resetting to nil marks the slot as free for later connect_* calls.
641 info
.proxyVar_
= ::CosNotifyChannelAdmin::ProxySupplier::_nil();
// Then the sequence flavour.
645 ::CosNotifyChannelAdmin::SequenceProxyPushSupplier_var proxySupplier
646 = ::CosNotifyChannelAdmin::SequenceProxyPushSupplier::_narrow(info
.proxyVar_
.in ());
647 if (! CORBA::is_nil (proxySupplier
.in ()))
649 proxySupplier
->disconnect_sequence_push_supplier ();
650 info
.proxyVar_
= ::CosNotifyChannelAdmin::ProxySupplier::_nil();
654 ORBSVCS_ERROR((LM_ERROR
,
655 "%T %n (%P|%t) Unexpected proxy supplier type\n"
657 throw CosEventComm::Disconnected();
664 throw CosEventComm::Disconnected();
667 this->consumer_disconnects_
+= 1;
668 if (this->quit_on_idle_
)
670 if (! this->quitting_
671 && this->consumer_connects_
== this->consumer_disconnects_
)
// NOTE(review): the arguments are cast to unsigned int but formatted with %d.
673 ORBSVCS_ERROR((LM_ERROR
,
674 "FaultNotifier (%P|%t) quit on idle: connects %d, disconnects %d\n",
675 static_cast<unsigned int> (this->consumer_connects_
),
676 static_cast<unsigned int> (this->consumer_disconnects_
)
678 this->poa_
->deactivate_object (this->object_id_
.in ());
683 METHOD_RETURN(TAO::FT_FaultNotifier_i::disconnect_consumer
);
// Liveness probe: unconditionally returns 1 (true). The return goes through
// METHOD_RETURN so that the exit is traced at high debug levels.
686 CORBA::Boolean
TAO::FT_FaultNotifier_i::is_alive (void)
688 METHOD_RETURN(TAO::FT_FaultNotifier_i::is_alive
) 1;
// Default constructor: proxyVar_ starts as a nil ProxySupplier reference.
// A nil proxyVar_ is what the connect_* methods look for when searching
// proxy_infos_ for a reusable slot.
// NOTE(review): the first member initializer is not visible in this view.
694 TAO::FT_FaultNotifier_i::ProxyInfo::ProxyInfo ()
696 , proxyVar_ (::CosNotifyChannelAdmin::ProxySupplier::_nil())
// Copy constructor: member-wise copy of proxyId_ and proxyVar_ from rhs.
// It is invoked when connect_* methods push_back a ProxyInfo into
// proxy_infos_.
700 TAO::FT_FaultNotifier_i::ProxyInfo::ProxyInfo (const ProxyInfo
& rhs
)
701 : proxyId_ (rhs
.proxyId_
)
702 , proxyVar_ (rhs
.proxyVar_
)
706 TAO_END_VERSIONED_NAMESPACE_DECL