2 #include "orbsvcs/CosNotifyChannelAdminC.h"
4 #include "tao/PortableServer/PortableServerC.h"
5 #include "tao/TimeBaseC.h"
6 #include "ace/OS_NS_stdio.h"
7 #include "ace/OS_NS_strings.h"
10 static const char NOTIFY_FACTORY_NAME
[] = "NotifyEventChannelFactory";
11 static const char NAMING_SERVICE_NAME
[] = "NameService";
13 ///////////////////////////
14 // StructuredPushSupplier_i
17 StructuredPushSupplier_i::subscription_change (
18 const CosNotification::EventTypeSeq
& added
,
19 const CosNotification::EventTypeSeq
& removed
)
21 ACE_UNUSED_ARG (added
);
22 ACE_UNUSED_ARG (removed
);
24 ACE_TEXT ("(%P,%t) Supplier StructuredPushSupplier received subscription change\n")
30 StructuredPushSupplier_i::disconnect_structured_push_supplier ()
33 ACE_TEXT ("(%P,%t) Supplier StructuredPushSupplier received disconnect\n")
37 ///////////////////////////
38 // SequencePushSupplier_i
41 SequencePushSupplier_i::subscription_change (
42 const CosNotification::EventTypeSeq
& added
,
43 const CosNotification::EventTypeSeq
& removed
)
45 ACE_UNUSED_ARG (added
);
46 ACE_UNUSED_ARG (removed
);
48 ACE_TEXT ("(%P,%t) Supplier SequencePushSupplier received subscription change\n")
53 SequencePushSupplier_i::disconnect_sequence_push_supplier ()
56 ACE_TEXT ("(%P,%t) Supplier SequencePushSupplier received disconnect\n")
60 ///////////////////////////
64 AnyPushSupplier_i::subscription_change (
65 const CosNotification::EventTypeSeq
& added
,
66 const CosNotification::EventTypeSeq
& removed
)
68 ACE_UNUSED_ARG (added
);
69 ACE_UNUSED_ARG (removed
);
71 ACE_TEXT ("(%P,%t) Supplier AnyPushSupplier received subscription change\n")
77 AnyPushSupplier_i::disconnect_push_supplier ()
80 ACE_TEXT ("(%P,%t) Suppleir AnyPushSupplier received disconnect\n")
84 /////////////////////////
85 // ReconnectionCallback_i
87 ReconnectionCallback_i::ReconnectionCallback_i (Supplier_Main
& supplier_main
)
88 : supplier_main_ (supplier_main
)
89 , id_is_valid_ (false)
90 , reconnect_count_ (0)
95 ReconnectionCallback_i::reconnect_count () const
97 return this->reconnect_count_
;
101 ReconnectionCallback_i::reconnect (CORBA::Object_ptr reconnection
)
103 ACE_DEBUG ((LM_DEBUG
,
104 ACE_TEXT ("(%P|%t) Supplier received reconnection request\n")
106 this->ecf_
= CosNotifyChannelAdmin::EventChannelFactory::_narrow (reconnection
);
107 if (!CORBA::is_nil (this->ecf_
.in ()))
109 this->supplier_main_
.reconnect (this->ecf_
.in ());
110 this->reconnect_count_
+= 1;
114 ACE_DEBUG ((LM_DEBUG
,
115 ACE_TEXT ("(%P|%t) Supplier reconnection request failed: wrong object type\n")
121 ReconnectionCallback_i::is_alive ()
126 ReconnectionCallback_i::~ReconnectionCallback_i ()
128 // normally you would disconnect from registry here, but
129 // to simulate a failure, we exit without cleaning up
130 // if the fini method is not called.
135 ReconnectionCallback_i::fini ()
137 if (this->id_is_valid_
)
139 NotifyExt::ReconnectionRegistry_var registry
=
140 NotifyExt::ReconnectionRegistry::_narrow (this->ecf_
.in ());
142 registry
->unregister_callback (this->callback_id_
);
143 this->id_is_valid_
= false;
148 ReconnectionCallback_i::init (
149 PortableServer::POA_ptr poa
,
150 CosNotifyChannelAdmin::EventChannelFactory_ptr ecf
)
152 this->ecf_
= CosNotifyChannelAdmin::EventChannelFactory::_duplicate (ecf
);
153 PortableServer::ObjectId_var reconnection_callback_id
=
154 poa
->activate_object (this);
156 CORBA::Object_var obj
=
157 poa
->id_to_reference (reconnection_callback_id
.in ());
159 NotifyExt::ReconnectionCallback_var callback
=
160 NotifyExt::ReconnectionCallback::_narrow (obj
.in ());
162 NotifyExt::ReconnectionRegistry_var registry
=
163 NotifyExt::ReconnectionRegistry::_narrow (ecf
);
165 this->callback_id_
= registry
->register_callback (callback
.in ());
166 this->id_is_valid_
= true;
173 Supplier_Main::Supplier_Main ()
177 , use_naming_service_ (true)
179 , disconnect_on_exit_ (false)
180 , id_file_ (ACE_TEXT ("supplier.ids"))
184 , structured_proxy_id_(0)
185 , sequence_proxy_id_(0)
187 , reconnection_callback_ (*this)
188 , reconnecting_ (false)
192 Supplier_Main::~Supplier_Main ()
197 Supplier_Main::parse_args (int argc
, ACE_TCHAR
*argv
[])
201 bool corba_arg
= false;
202 while (narg
< argc
&& result
== 0)
204 int consumed
= parse_single_arg (argc
- narg
, &argv
[narg
]);
210 else if (ACE_OS::strncmp (argv
[narg
], ACE_TEXT("-ORB"), 4) == 0)
216 // previous argument was a ORB arg.
217 // current argument is unrecognized
218 // assume the ORB eats this arg
224 ACE_OS::fprintf(stderr
, "Unrecognized argument: %s\n",
225 ACE_TEXT_ALWAYS_CHAR (argv
[narg
]));
234 Supplier_Main::parse_single_arg (int argc
, ACE_TCHAR
*argv
[])
237 if (ACE_OS::strcasecmp (argv
[0], ACE_TEXT ("-v")) == 0)
239 this->verbose_
= true;
242 else if (ACE_OS::strcasecmp (argv
[0], ACE_TEXT ("-any")) == 0)
244 this->mode_
= MODE_ANY
;
247 else if (ACE_OS::strcasecmp (argv
[0], ACE_TEXT ("-structured")) == 0)
249 this->mode_
= MODE_STRUCTURED
;
252 else if (ACE_OS::strcasecmp (argv
[0], ACE_TEXT ("-sequence")) == 0)
254 this->mode_
= MODE_SEQUENCE
;
257 else if (ACE_OS::strcasecmp (argv
[0], ACE_TEXT ("-channel")) == 0)
259 this->channel_file_
= argv
[1];
262 else if (ACE_OS::strcasecmp (argv
[0], ACE_TEXT ("-send")) == 0 && argc
> 1)
264 this->send_
= ACE_OS::atoi (argv
[1]);
267 else if (ACE_OS::strcasecmp (argv
[0], ACE_TEXT ("-pause")) == 0 && argc
> 1)
269 this->pause_
= ACE_OS::atoi (argv
[1]);
272 else if (ACE_OS::strcasecmp (argv
[0], ACE_TEXT ("-serial_number")) == 0)
274 this->serial_number_
= ACE_OS::atoi (argv
[1]);
277 else if (ACE_OS::strcasecmp (argv
[0], ACE_TEXT ("-nonamesvc")) == 0)
279 this->use_naming_service_
= false;
282 else if (ACE_OS::strcasecmp (argv
[0], ACE_TEXT("-disconnect")) == 0)
284 this->disconnect_on_exit_
= true;
291 void Supplier_Main::usage(FILE * log
)const
293 //FUZZ: disable check_for_lack_ACE_OS
296 ACE_TEXT (" -channel filename Where to find a channel number.\n")
297 ACE_TEXT (" -any or -structured or -sequence\n")
298 ACE_TEXT (" What type of event to send (pick one, default is -any)\n")
299 ACE_TEXT (" -send n How many events of each type to send.\n")
300 ACE_TEXT (" -pause n Pause after sending n events. Write to file \"Supplier.paused\"\n")
301 ACE_TEXT (" -serial_number n What serial number to start with.\n")
302 ACE_TEXT (" -v Verbose output.\n")
303 ACE_TEXT (" -disconnect Disconnect from channel on exit (prevents reconnect.)\n")
304 ACE_TEXT (" -nonamesvc Don't use the name service to find EventChannelFactory\n")
306 //FUZZ: enable check_for_lack_ACE_OS
309 int Supplier_Main::init (int argc
, ACE_TCHAR
*argv
[])
311 this->orb_
= CORBA::ORB_init (argc
, argv
);
313 if (0 != this->parse_args (argc
, argv
))
318 CORBA::Object_ptr poa_object
=
319 this->orb_
->resolve_initial_references("RootPOA");
321 if (CORBA::is_nil (poa_object
))
323 ACE_ERROR ((LM_ERROR
,
324 ACE_TEXT (" (%P|%t) Unable to initialize the POA.\n")));
329 PortableServer::POA::_narrow (poa_object
);
331 PortableServer::POAManager_var poa_manager
=
332 root_poa_
->the_POAManager ();
334 poa_manager
->activate ();
336 if (this->use_naming_service_
)
338 this->find_notify_factory ();
342 int ok
= resolve_notify_factory ();
349 this->reconnecting_
= load_ids ();
351 init_event_channel ();
353 init_supplier_admin ();
357 case MODE_STRUCTURED
:
359 init_structured_proxy_consumer ();
364 init_sequence_proxy_consumer ();
369 init_any_proxy_consumer ();
374 ACE_ERROR ((LM_ERROR
,
375 ACE_TEXT ("(%P|%t) Supplier: Unknown event push mode.\n")
380 this->reconnection_callback_
.init (
381 this->root_poa_
.in (),
388 Supplier_Main::save_ids()
391 ACE_OS::fopen (this->id_file_
.c_str (), "w");
396 int imode
= static_cast<int>(this->mode_
);
397 ACE_OS::fprintf (idf
,
398 "%d,%d,%d,%d,%d,%d,%d,\n",
399 static_cast<int> (imode
),
400 static_cast<int> (ec_id_
),
401 static_cast<int> (sa_id_
),
402 static_cast<int> (structured_proxy_id_
),
403 static_cast<int> (sequence_proxy_id_
),
404 static_cast<int> (any_proxy_id_
),
405 static_cast<int> (endflag
));
406 ACE_OS::fclose (idf
);
411 Supplier_Main::load_ids()
415 ACE_OS::fopen (this->id_file_
.c_str (), "r");
421 char buffer
[100] = ""; // because ACE fgets doesn't put a null if the file is empty
422 ACE_OS::fgets (buffer
, sizeof(buffer
), idf
);
423 ACE_OS::fclose (idf
);
425 while (!ok
&& *pb
!= 0)
427 char * eb
= ACE_OS::strchr (pb
, ',');
431 eb
= pb
+ ACE_OS::strlen (pb
);
437 int value
= ACE_OS::atoi(pb
);
441 this->mode_
= static_cast<Mode_T
> (value
);
444 this->ec_id_
= value
;
447 this->sa_id_
= value
;
450 this->structured_proxy_id_
= value
;
453 this->sequence_proxy_id_
= value
;
456 this->any_proxy_id_
= value
;
462 ACE_OS::fprintf (stderr
, ACE_TEXT ("Supplier: Warning: too many fields in saved id file.\n"));
474 Supplier_Main::reconnect (
475 CosNotifyChannelAdmin::EventChannelFactory_ptr dest_factory
)
477 this->ecf_
= CosNotifyChannelAdmin::EventChannelFactory::_duplicate (dest_factory
);
478 this->reconnecting_
= true;
479 init_event_channel ();
481 init_supplier_admin ();
485 case MODE_STRUCTURED
:
487 init_structured_proxy_consumer ();
492 init_sequence_proxy_consumer ();
497 init_any_proxy_consumer ();
505 Supplier_Main::resolve_naming_service ()
507 // ignore redundant calls
508 if (CORBA::is_nil (this->naming_context_
.in ()))
510 CORBA::Object_var naming_obj
=
511 this->orb_
->resolve_initial_references (NAMING_SERVICE_NAME
);
513 this->naming_context_
=
514 CosNaming::NamingContext::_narrow (naming_obj
.in ());
517 return !CORBA::is_nil (this->naming_context_
.in ());
521 Supplier_Main::find_notify_factory ()
523 int status
= this->resolve_naming_service ();
526 CosNaming::Name
name (1);
528 name
[0].id
= CORBA::string_dup (NOTIFY_FACTORY_NAME
);
530 CORBA::Object_var obj
=
531 this->naming_context_
->resolve (name
);
534 CosNotifyChannelAdmin::EventChannelFactory::_narrow (obj
.in ());
536 return ! CORBA::is_nil (this->ecf_
.in ());
540 Supplier_Main::resolve_notify_factory ()
542 CORBA::Object_var factory_obj
=
543 this->orb_
->resolve_initial_references (NOTIFY_FACTORY_NAME
);
546 CosNotifyChannelAdmin::EventChannelFactory::_narrow (
548 return ! CORBA::is_nil (this->ecf_
.in ());
552 Supplier_Main::init_event_channel ()
555 if (this->reconnecting_
)
559 this->ec_
= this->ecf_
->get_event_channel (
561 ok
= ! CORBA::is_nil (this->ec_
.in ());
562 if (ok
&& this->verbose_
)
564 ACE_DEBUG ((LM_DEBUG
,
565 ACE_TEXT ("(%P|%t) Supplier: Reconnect to event channel %d\n"),
566 static_cast<int>(this->ec_id_
)
575 // if we don't have a channel yet, and a channel id file was specified
576 // try to read from it
577 if (!ok
&& this->channel_file_
.length () > 0)
579 FILE * chf
= ACE_OS::fopen (this->channel_file_
.c_str (), "r");
583 ACE_OS::fgets (buffer
, sizeof(buffer
), chf
);
584 ACE_OS::fclose (chf
);
585 this->ec_id_
= ACE_OS::atoi (buffer
);
589 this->ec_
= this->ecf_
->get_event_channel (
591 ok
= ! CORBA::is_nil (this->ec_
.in ());
596 ACE_DEBUG ((LM_DEBUG
,
597 ACE_TEXT ("(%P|%t) Supplier: Connect to Existing event channel %d\n"),
598 static_cast<int>(this->ec_id_
)
601 // kill the channel filename so we don't overwrite the file
602 this->channel_file_
= ACE_TEXT("");
613 CosNotification::QoSProperties
qosprops (7);
616 qosprops
[i
].name
= CORBA::string_dup(CosNotification::EventReliability
);
617 qosprops
[i
++].value
<<= CosNotification::Persistent
;
618 qosprops
[i
].name
= CORBA::string_dup(CosNotification::ConnectionReliability
);
619 qosprops
[i
++].value
<<= CosNotification::Persistent
; // Required, or we won't persist much
620 qosprops
[i
].name
= CORBA::string_dup(CosNotification::Priority
);
621 qosprops
[i
++].value
<<= CosNotification::HighestPriority
;
622 qosprops
[i
].name
= CORBA::string_dup(CosNotification::Timeout
);
623 qosprops
[i
++].value
<<= (TimeBase::TimeT
) 42 * 1000000; // 4.2s
624 qosprops
[i
].name
= CORBA::string_dup(CosNotification::StopTimeSupported
);
625 qosprops
[i
++].value
<<= CORBA::Any::from_boolean(1);
626 qosprops
[i
].name
= CORBA::string_dup(CosNotification::MaximumBatchSize
);
627 qosprops
[i
++].value
<<= (CORBA::Long
) 2;
628 qosprops
[i
].name
= CORBA::string_dup(CosNotification::PacingInterval
);
629 qosprops
[i
++].value
<<= (TimeBase::TimeT
) 50 * 10000; // 50ms
631 CosNotification::AdminProperties
adminprops(4);
632 adminprops
.length (4);
634 adminprops
[i
].name
= CORBA::string_dup(CosNotification::MaxQueueLength
);
635 adminprops
[i
++].value
<<= (CORBA::Long
) 1234;
636 adminprops
[i
].name
= CORBA::string_dup(CosNotification::MaxConsumers
);
637 adminprops
[i
++].value
<<= (CORBA::Long
) 1000;
638 adminprops
[i
].name
= CORBA::string_dup(CosNotification::MaxSuppliers
);
639 adminprops
[i
++].value
<<= (CORBA::Long
) 1000;
640 adminprops
[i
].name
= CORBA::string_dup(CosNotification::RejectNewEvents
);
641 adminprops
[i
++].value
<<= CORBA::Any::from_boolean(1);
643 ec_
= this->ecf_
->create_channel (
647 ok
= ! CORBA::is_nil (ec_
.in ());
648 if (ok
&& this->verbose_
)
650 ACE_DEBUG ((LM_DEBUG
,
651 ACE_TEXT ("(%P|%t) Supplier: Create event channel %d\n"),
652 static_cast<int> (this->ec_id_
)
658 if (ok
&& this->channel_file_
.length() > 0)
660 FILE * chf
= ACE_OS::fopen (this->channel_file_
.c_str (), "w");
663 ACE_OS::fprintf (chf
, "%d\n", static_cast<int> (this->ec_id_
));
664 ACE_OS::fclose (chf
);
669 CosNotifyChannelAdmin::AdminID default_admin_id
= static_cast<CosNotifyChannelAdmin::AdminID
>(-1);
672 Supplier_Main::init_supplier_admin ()
675 if (this->reconnecting_
&& this->sa_id_
!= default_admin_id
)
679 this->sa_
= this->ec_
->get_supplieradmin(
681 ok
= ! CORBA::is_nil (this->sa_
.in ());
682 if (ok
&& this->verbose_
)
684 ACE_DEBUG ((LM_DEBUG
,
685 ACE_TEXT ("(%P|%t) Supplier: Reconnect to supplier admin %d\n"),
686 static_cast<int>(this->sa_id_
)
699 this->sa_
= this->ec_
->default_supplier_admin ();
700 ok
= ! CORBA::is_nil (this->sa_
.in ());
701 this->sa_id_
= default_admin_id
;
702 if (ok
&& this->verbose_
)
704 ACE_DEBUG ((LM_DEBUG
,
705 ACE_TEXT ("(%P|%t) Supplier: Using default supplier admin\n")
708 else if (this->verbose_
)
710 ACE_DEBUG ((LM_DEBUG
,
711 ACE_TEXT ("(%P|%t) Supplier: No default supplier admin\n")
722 this->sa_
= this->ec_
->new_for_suppliers(
723 CosNotifyChannelAdmin::OR_OP
,
725 ok
= ! CORBA::is_nil (this->sa_
.in ());
726 if (ok
&& this->verbose_
)
728 ACE_DEBUG ((LM_DEBUG
,
729 ACE_TEXT ("(%P|%t) Supplier: Create new supplier admin %d\n"),
730 static_cast<int>(this->sa_id_
)
737 Supplier_Main::init_structured_proxy_consumer ()
740 CosNotifyChannelAdmin::ProxyConsumer_var proxy
;
741 if (this->reconnecting_
)
745 proxy
= this->sa_
->get_proxy_consumer (this->structured_proxy_id_
);
746 ok
= ! CORBA::is_nil (proxy
.in ());
747 if (ok
&& this->verbose_
)
749 ACE_DEBUG ((LM_DEBUG
,
750 ACE_TEXT ("(%P|%t) Supplier: Reconnect to proxy supplier %d\n"),
751 static_cast<int>(this->structured_proxy_id_
)
762 proxy
= this->sa_
->obtain_notification_push_consumer(
763 CosNotifyChannelAdmin::STRUCTURED_EVENT
,
764 this->structured_proxy_id_
);
765 ok
= ! CORBA::is_nil (proxy
.in ());
766 if (ok
&& this->verbose_
)
768 ACE_DEBUG ((LM_DEBUG
,
769 ACE_TEXT ("(%P|%t) Supplier: Create new proxy %d\n"),
770 static_cast<int>(this->structured_proxy_id_
)
774 this->structured_proxy_push_consumer_
=
775 CosNotifyChannelAdmin::StructuredProxyPushConsumer::_narrow(proxy
.in ());
777 if (CORBA::is_nil (this->structured_proxy_push_consumer_
.in ()))
779 ACE_ERROR ((LM_ERROR
,
780 ACE_TEXT ("(%P|%t) init_structured_proxy_consumer received nil ProxyConsumer\n")
782 throw CORBA::OBJECT_NOT_EXIST ();
784 if (CORBA::is_nil (this->structured_push_supplier_ref_
.in ()))
786 PortableServer::ObjectId_var push_supplier_id
=
787 this->root_poa_
->activate_object (
788 &(this->structured_push_supplier_
));
790 CORBA::Object_var obj
=
791 this->root_poa_
->id_to_reference (push_supplier_id
.in ());
793 this->structured_push_supplier_ref_
=
794 CosNotifyComm::StructuredPushSupplier::_narrow (obj
.in ());
796 if (CORBA::is_nil (structured_push_supplier_ref_
.in ()))
798 ACE_ERROR ((LM_ERROR
,
799 ACE_TEXT ("(%P|%t) Supplier: Received wrong type of push consumer proxy %d\n"),
800 static_cast<int>(this->structured_proxy_id_
)
803 throw CORBA::UNKNOWN();
806 this->structured_proxy_push_consumer_
->connect_structured_push_supplier (
807 structured_push_supplier_ref_
.in ());
811 Supplier_Main::init_sequence_proxy_consumer ()
814 CosNotifyChannelAdmin::ProxyConsumer_var proxy
;
815 if (this->reconnecting_
)
819 proxy
= this->sa_
->get_proxy_consumer(
820 this->sequence_proxy_id_
);
821 ok
= ! CORBA::is_nil (proxy
.in ());
822 if (ok
&& this->verbose_
)
824 ACE_DEBUG ((LM_DEBUG
,
825 ACE_TEXT ("(%P|%t) Supplier: Reconnect to proxy %d\n"),
826 static_cast<int>(this->sequence_proxy_id_
)
837 proxy
= this->sa_
->obtain_notification_push_consumer(
838 CosNotifyChannelAdmin::SEQUENCE_EVENT
,
839 this->sequence_proxy_id_
);
840 ok
= ! CORBA::is_nil (proxy
.in ());
841 if (ok
&& this->verbose_
)
843 ACE_DEBUG ((LM_DEBUG
,
844 ACE_TEXT ("(%P|%t) Supplier: Create new proxy %d\n"),
845 static_cast<int>(this->sequence_proxy_id_
)
849 this->sequence_proxy_push_consumer_
=
850 CosNotifyChannelAdmin::SequenceProxyPushConsumer::_narrow(proxy
.in ());
852 if (CORBA::is_nil (this->sequence_proxy_push_consumer_
.in ()))
854 ACE_ERROR ((LM_ERROR
,
855 ACE_TEXT ("(%P|%t) Supplier: Received wrong type of push consumer proxy %d\n"),
856 static_cast<int>(this->sequence_proxy_id_
)
858 throw CORBA::UNKNOWN();
861 if (CORBA::is_nil (this->sequence_push_supplier_ref_
.in ()))
863 PortableServer::ObjectId_var push_supplier_id
=
864 this->root_poa_
->activate_object (
865 &(this->sequence_push_supplier_
));
867 CORBA::Object_var obj
=
868 this->root_poa_
->id_to_reference (push_supplier_id
.in ());
870 this->sequence_push_supplier_ref_
=
871 CosNotifyComm::SequencePushSupplier::_narrow (obj
.in ());
873 if (CORBA::is_nil (sequence_push_supplier_ref_
.in ()))
875 ACE_ERROR ((LM_ERROR
,
876 ACE_TEXT ("(%P|%t) Supplier: Received wrong type of push consumer proxy %d\n"),
877 static_cast<int>(this->sequence_proxy_id_
)
879 throw CORBA::UNKNOWN();
882 this->sequence_proxy_push_consumer_
->connect_sequence_push_supplier (
883 sequence_push_supplier_ref_
.in ());
887 Supplier_Main::init_any_proxy_consumer ()
890 CosNotifyChannelAdmin::ProxyConsumer_var proxy
;
891 if (this->reconnecting_
)
895 proxy
= this->sa_
->get_proxy_consumer(
896 this->any_proxy_id_
);
897 ok
= ! CORBA::is_nil (proxy
.in ());
898 if (ok
&& this->verbose_
)
900 ACE_DEBUG ((LM_DEBUG
,
901 ACE_TEXT ("(%P|%t) Supplier: Reconnect to proxy %d\n"),
902 static_cast<int>(this->any_proxy_id_
)
913 proxy
= this->sa_
->obtain_notification_push_consumer(
914 CosNotifyChannelAdmin::ANY_EVENT
,
915 this->any_proxy_id_
);
916 ok
= ! CORBA::is_nil (proxy
.in ());
917 if (ok
&& this->verbose_
)
919 ACE_DEBUG ((LM_DEBUG
,
920 ACE_TEXT ("(%P|%t) Supplier: Create new proxy %d\n"),
921 static_cast<int>(this->any_proxy_id_
)
925 this->any_proxy_push_consumer_
=
926 CosNotifyChannelAdmin::ProxyPushConsumer::_narrow(proxy
.in ());
928 if (CORBA::is_nil (this->any_proxy_push_consumer_
.in ()))
930 ACE_ERROR ((LM_ERROR
,
931 ACE_TEXT ("(%P|%t) Supplier: Received wrong type of push consumer proxy %d\n"),
932 static_cast<int>(this->any_proxy_id_
)
934 throw CORBA::UNKNOWN();
937 if (CORBA::is_nil (this->any_push_supplier_ref_
.in ()))
939 PortableServer::ObjectId_var push_supplier_id
=
940 this->root_poa_
->activate_object (
941 &(this->any_push_supplier_
));
943 CORBA::Object_var obj
=
944 this->root_poa_
->id_to_reference (push_supplier_id
.in ());
946 this->any_push_supplier_ref_
=
947 CosNotifyComm::PushSupplier::_narrow (obj
.in ());
949 if (CORBA::is_nil (any_push_supplier_ref_
.in ()))
951 ACE_ERROR ((LM_ERROR
,
952 ACE_TEXT ("(%P|%t) Supplier: Received wrong type of push consumer proxy %d\n"),
953 static_cast<int>(this->sequence_proxy_id_
)
955 throw CORBA::UNKNOWN();
958 this->any_proxy_push_consumer_
->connect_any_push_supplier (
959 any_push_supplier_ref_
.in ());
962 int Supplier_Main::fini ()
964 if (this->disconnect_on_exit_
)
966 this->reconnection_callback_
.fini ();
967 if (!CORBA::is_nil (this->structured_proxy_push_consumer_
.in ()))
971 ACE_DEBUG ((LM_DEBUG
,
972 ACE_TEXT ("(%P|%t) Disconnecting structured\n")
975 this->structured_proxy_push_consumer_
->disconnect_structured_push_consumer ();
977 if (!CORBA::is_nil (this->sequence_proxy_push_consumer_
.in ()))
981 ACE_DEBUG ((LM_DEBUG
,
982 ACE_TEXT ("(%P|%t) Disconnecting sequence\n")
985 this->sequence_proxy_push_consumer_
->disconnect_sequence_push_consumer ();
987 if (!CORBA::is_nil (this->any_proxy_push_consumer_
.in ()))
991 ACE_DEBUG ((LM_DEBUG
,
992 ACE_TEXT ("(%P|%t) Disconnecting any\n")
995 this->any_proxy_push_consumer_
->disconnect_push_consumer ();
997 if (!CORBA::is_nil (this->sa_
.in ()) && this->sa_id_
!= default_admin_id
)
1001 ACE_DEBUG ((LM_DEBUG
,
1002 ACE_TEXT ("(%P|%t) destroy admin %d\n"),
1003 static_cast<int>(this->sa_id_
)
1006 this->sa_
->destroy();
1009 this->orb_
->shutdown ();
1013 void Supplier_Main::send_structured_event ()
1015 CosNotification::StructuredEvent event
;
1019 // FixedEventHeader.
1022 event
.header
.fixed_header
.event_type
.domain_name
= CORBA::string_dup("*");
1024 event
.header
.fixed_header
.event_type
.type_name
= CORBA::string_dup("*");
1026 event
.header
.fixed_header
.event_name
= CORBA::string_dup("reconnect_test");
1028 // OptionalHeaderFields.
1030 // sequence<Property>: string name, any value
1031 CosNotification::PropertySeq
& qos
= event
.header
.variable_header
;
1033 qos
[0].name
= CORBA::string_dup (CosNotification::Priority
);
1034 qos
[0].value
<<= CosNotification::LowestPriority
;
1035 qos
[1].name
= CORBA::string_dup (CosNotification::EventReliability
);
1036 qos
[1].value
<<= CosNotification::Persistent
;
1038 // FilterableEventBody
1040 // sequence<Property>: string name, any value
1041 event
.filterable_data
.length (1);
1042 event
.filterable_data
[0].name
= CORBA::string_dup("serial_number");
1043 event
.filterable_data
[0].value
<<= CORBA::ULong ( this->serial_number_
);
1046 event
.remainder_of_body
<<= CORBA::ULong ( this->serial_number_
);
1050 ACE_DEBUG ((LM_DEBUG
,
1051 ACE_TEXT ("(%P,%t) Supplier push structured event %d\n"),
1052 static_cast<int>(serial_number_
)
1056 this->structured_proxy_push_consumer_
->push_structured_event (event
);
1059 void Supplier_Main::send_sequence_event ()
1061 CosNotification::EventBatch
event_batch(1);
1062 event_batch
.length (1);
1063 CosNotification::StructuredEvent
& event
= event_batch
[0];
1067 // FixedEventHeader.
1070 event
.header
.fixed_header
.event_type
.domain_name
= CORBA::string_dup("*");
1072 event
.header
.fixed_header
.event_type
.type_name
= CORBA::string_dup("*");
1074 event
.header
.fixed_header
.event_name
= CORBA::string_dup("reconnect_test");
1076 // OptionalHeaderFields.
1078 // sequence<Property>: string name, any value
1079 CosNotification::PropertySeq
& qos
= event
.header
.variable_header
;
1081 qos
[0].name
= CORBA::string_dup (CosNotification::Priority
);
1082 qos
[0].value
<<= CosNotification::LowestPriority
;
1083 qos
[1].name
= CORBA::string_dup (CosNotification::EventReliability
);
1084 qos
[1].value
<<= CosNotification::Persistent
;
1086 // FilterableEventBody
1088 // sequence<Property>: string name, any value
1089 event
.filterable_data
.length (1);
1090 event
.filterable_data
[0].name
= CORBA::string_dup("serial_number");
1091 event
.filterable_data
[0].value
<<= CORBA::ULong ( this->serial_number_
);
1094 event
.remainder_of_body
<<= CORBA::ULong ( this->serial_number_
);
1098 ACE_DEBUG ((LM_DEBUG
,
1099 ACE_TEXT ("(%P,%t) Supplier push sequence events %d\n"),
1100 static_cast<int>(this->serial_number_
)
1104 this->sequence_proxy_push_consumer_
->push_structured_events (event_batch
);
1107 void Supplier_Main::send_any_event ()
1110 event
<<= CORBA::ULong (this->serial_number_
);
1114 ACE_DEBUG ((LM_DEBUG
,
1115 ACE_TEXT ("(%P,%t) Supplier push any event %d\n"),
1116 static_cast<int>(this->serial_number_
)
1120 this->any_proxy_push_consumer_
->push (event
);
1123 int Supplier_Main::run ()
1126 bool paused
= false;
1127 size_t reconnections
= 0;
1130 while ( send
< this->send_
)
1132 ACE_Time_Value
tv(0, 100 * 1000);
1135 if (this->pause_
!= 0 && send
== this->pause_
)
1139 ACE_DEBUG ((LM_DEBUG
,
1140 ACE_TEXT ("(%P|%t) Supplier paused after %d events\n"),
1141 static_cast<int>(this->pause_
)
1144 reconnections
= this->reconnection_callback_
.reconnect_count ();
1145 FILE * pause_file
= ACE_OS::fopen ("Supplier.paused", "w");
1146 if (pause_file
!= 0)
1148 ACE_OS::fputs (ACE_TEXT ("paused\n"), pause_file
);
1149 ACE_OS::fclose (pause_file
);
1156 if (this->reconnection_callback_
.reconnect_count () != reconnections
)
1160 ACE_DEBUG ((LM_DEBUG
,
1161 ACE_TEXT ("(%P|%t) Supplier no longer paused. Next s# %d\n"),
1162 static_cast<int>(this->serial_number_
)
1171 switch (this->mode_
)
1173 case MODE_STRUCTURED
:
1175 send_structured_event ();
1180 send_sequence_event ();
1189 this->serial_number_
+= 1;
1198 ACE_TMAIN(int argc
, ACE_TCHAR
*argv
[])
1204 result
= app
.init(argc
, argv
);
1208 result
= app
.run ();
1215 catch (const CORBA::Exception
& ex
)
1217 ex
._tao_print_exception ("Error: Supplier::main\t\n");
1220 ACE_DEBUG ((LM_DEBUG
,
1221 ACE_TEXT ("(%P,%t) Supplier exits: code %d\n"),