2 #include "orbsvcs/CosNotifyChannelAdminC.h"
4 #include "tao/PortableServer/PortableServerC.h"
5 #include "tao/TimeBaseC.h"
6 #include "ace/OS_NS_stdio.h"
7 #include "ace/OS_NS_strings.h"
// Well-known service names used by this file: NOTIFY_FACTORY_NAME is looked
// up both through the naming context and through resolve_initial_references;
// NAMING_SERVICE_NAME is passed to resolve_initial_references.
11 static const char NOTIFY_FACTORY_NAME
[] = "NotifyEventChannelFactory";
12 static const char NAMING_SERVICE_NAME
[] = "NameService";
14 ///////////////////////////
15 // StructuredPushSupplier_i
// Subscription-change callback for the structured push supplier.
// Both event-type sequences are marked as intentionally unused; the visible
// body only carries a message noting that the callback was received.
18 StructuredPushSupplier_i::subscription_change (
19 const CosNotification::EventTypeSeq
& added
,
20 const CosNotification::EventTypeSeq
& removed
)
22 ACE_UNUSED_ARG (added
);
23 ACE_UNUSED_ARG (removed
);
25 ACE_TEXT ("(%P,%t) Supplier StructuredPushSupplier received subscription change\n")
// Disconnect callback for the structured push supplier; the visible body
// only carries a message noting that a disconnect was received.
31 StructuredPushSupplier_i::disconnect_structured_push_supplier ()
34 ACE_TEXT ("(%P,%t) Supplier StructuredPushSupplier received disconnect\n")
38 ///////////////////////////
39 // SequencePushSupplier_i
// Subscription-change callback for the sequence push supplier.
// Both event-type sequences are marked as intentionally unused; the visible
// body only carries a message noting that the callback was received.
42 SequencePushSupplier_i::subscription_change (
43 const CosNotification::EventTypeSeq
& added
,
44 const CosNotification::EventTypeSeq
& removed
)
46 ACE_UNUSED_ARG (added
);
47 ACE_UNUSED_ARG (removed
);
49 ACE_TEXT ("(%P,%t) Supplier SequencePushSupplier received subscription change\n")
// Disconnect callback for the sequence push supplier; the visible body
// only carries a message noting that a disconnect was received.
54 SequencePushSupplier_i::disconnect_sequence_push_supplier ()
57 ACE_TEXT ("(%P,%t) Supplier SequencePushSupplier received disconnect\n")
61 ///////////////////////////
// Subscription-change callback for the Any push supplier.
// Both event-type sequences are marked as intentionally unused; the visible
// body only carries a message noting that the callback was received.
65 AnyPushSupplier_i::subscription_change (
66 const CosNotification::EventTypeSeq
& added
,
67 const CosNotification::EventTypeSeq
& removed
)
69 ACE_UNUSED_ARG (added
);
70 ACE_UNUSED_ARG (removed
);
72 ACE_TEXT ("(%P,%t) Supplier AnyPushSupplier received subscription change\n")
78 AnyPushSupplier_i::disconnect_push_supplier ()
81 ACE_TEXT ("(%P,%t) Suppleir AnyPushSupplier received disconnect\n")
85 /////////////////////////
86 // ReconnectionCallback_i
// Construct the callback bound to the owning Supplier_Main.
// Starts with no valid registry id and a reconnect count of zero.
88 ReconnectionCallback_i::ReconnectionCallback_i (Supplier_Main
& supplier_main
)
89 : supplier_main_ (supplier_main
)
90 , id_is_valid_ (false)
91 , reconnect_count_ (0)
// Return the number of successful reconnect() calls seen so far
// (the counter is incremented in reconnect()).
96 ReconnectionCallback_i::reconnect_count () const
98 return this->reconnect_count_
;
// Handle a reconnection request.
// The supplied object is narrowed to an EventChannelFactory; when the narrow
// succeeds the owning Supplier_Main is asked to reconnect through it and the
// reconnect counter is bumped, otherwise a "wrong object type" message is logged.
102 ReconnectionCallback_i::reconnect (CORBA::Object_ptr reconnection
)
104 ACE_DEBUG ((LM_DEBUG
,
105 ACE_TEXT ("(%P|%t) Supplier received reconnection request\n")
// A nil result from _narrow means the object is not an EventChannelFactory.
107 this->ecf_
= CosNotifyChannelAdmin::EventChannelFactory::_narrow (reconnection
);
108 if (!CORBA::is_nil (this->ecf_
.in ()))
110 this->supplier_main_
.reconnect (this->ecf_
.in ());
111 this->reconnect_count_
+= 1;
115 ACE_DEBUG ((LM_DEBUG
,
116 ACE_TEXT ("(%P|%t) Supplier reconnection request failed: wrong object type\n")
122 ReconnectionCallback_i::is_alive (void)
// Destructor. Deliberately performs no registry cleanup; unregistering is
// done only by fini() (see the note below).
127 ReconnectionCallback_i::~ReconnectionCallback_i ()
129 // normally you would disconnect from registry here, but
130 // to simulate a failure, we exit without cleaning up
131 // if the fini method is not called.
// Unregister this callback from the reconnection registry.
// Does nothing unless init() previously stored a valid callback id; after
// unregistering, the id is marked invalid so a second call is a no-op.
136 ReconnectionCallback_i::fini (void)
138 if (this->id_is_valid_
)
// The event channel factory reference is narrowed to the registry interface.
140 NotifyExt::ReconnectionRegistry_var registry
=
141 NotifyExt::ReconnectionRegistry::_narrow (this->ecf_
.in ());
143 registry
->unregister_callback (this->callback_id_
);
144 this->id_is_valid_
= false;
// Activate this servant and register it with the reconnection registry.
//   poa - POA in which this object is activated
//   ecf - event channel factory; a duplicate is kept and it is also narrowed
//         to NotifyExt::ReconnectionRegistry for registration
// On completion the registry-assigned id is stored and marked valid.
// NOTE(review): no nil check on the narrowed registry is visible before
// register_callback is invoked - confirm against the full source.
149 ReconnectionCallback_i::init (
150 PortableServer::POA_ptr poa
,
151 CosNotifyChannelAdmin::EventChannelFactory_ptr ecf
)
153 this->ecf_
= CosNotifyChannelAdmin::EventChannelFactory::_duplicate (ecf
);
// Activate the servant, then turn the object id into a callback reference.
154 PortableServer::ObjectId_var reconnection_callback_id
=
155 poa
->activate_object (this);
157 CORBA::Object_var obj
=
158 poa
->id_to_reference (reconnection_callback_id
.in ());
160 NotifyExt::ReconnectionCallback_var callback
=
161 NotifyExt::ReconnectionCallback::_narrow (obj
.in ());
163 NotifyExt::ReconnectionRegistry_var registry
=
164 NotifyExt::ReconnectionRegistry::_narrow (ecf
);
// Remember the registry id so fini() can unregister later.
166 this->callback_id_
= registry
->register_callback (callback
.in ());
167 this->id_is_valid_
= true;
// Default-construct the supplier application.
// Visible defaults: naming service enabled, no disconnect on exit, ids saved
// in "supplier.ids", structured/sequence proxy ids zero, not reconnecting.
// The reconnection callback is bound to this object.
175 Supplier_Main::Supplier_Main ()
179 , use_naming_service_ (true)
181 , disconnect_on_exit_ (false)
182 , id_file_ (ACE_TEXT ("supplier.ids"))
186 , structured_proxy_id_(0)
187 , sequence_proxy_id_(0)
189 , reconnection_callback_ (*this)
190 , reconnecting_ (false)
194 Supplier_Main::~Supplier_Main ()
// Walk the command line, handing each position to parse_single_arg().
// Arguments beginning with "-ORB" are recognised separately, and an
// unrecognised argument is reported on stderr.
// The loop runs while arguments remain and result is still 0.
199 Supplier_Main::parse_args (int argc
, ACE_TCHAR
*argv
[])
203 bool corba_arg
= false;
204 while (narg
< argc
&& result
== 0)
// parse_single_arg sees only the remaining tail of argv.
206 int consumed
= parse_single_arg (argc
- narg
, &argv
[narg
]);
212 else if (ACE_OS::strncmp (argv
[narg
], ACE_TEXT("-ORB"), 4) == 0)
218 // previous argument was an ORB arg.
219 // current argument is unrecognized
220 // assume the ORB eats this arg
226 ACE_OS::fprintf(stderr
, "Unrecognized argument: %s\n",
227 ACE_TEXT_ALWAYS_CHAR (argv
[narg
]));
236 Supplier_Main::parse_single_arg (int argc
, ACE_TCHAR
*argv
[])
239 if (ACE_OS::strcasecmp (argv
[0], ACE_TEXT ("-v")) == 0)
241 this->verbose_
= true;
244 else if (ACE_OS::strcasecmp (argv
[0], ACE_TEXT ("-any")) == 0)
246 this->mode_
= MODE_ANY
;
249 else if (ACE_OS::strcasecmp (argv
[0], ACE_TEXT ("-structured")) == 0)
251 this->mode_
= MODE_STRUCTURED
;
254 else if (ACE_OS::strcasecmp (argv
[0], ACE_TEXT ("-sequence")) == 0)
256 this->mode_
= MODE_SEQUENCE
;
259 else if (ACE_OS::strcasecmp (argv
[0], ACE_TEXT ("-channel")) == 0)
261 this->channel_file_
= argv
[1];
264 else if (ACE_OS::strcasecmp (argv
[0], ACE_TEXT ("-send")) == 0 && argc
> 1)
266 this->send_
= ACE_OS::atoi (argv
[1]);
269 else if (ACE_OS::strcasecmp (argv
[0], ACE_TEXT ("-pause")) == 0 && argc
> 1)
271 this->pause_
= ACE_OS::atoi (argv
[1]);
274 else if (ACE_OS::strcasecmp (argv
[0], ACE_TEXT ("-serial_number")) == 0)
276 this->serial_number_
= ACE_OS::atoi (argv
[1]);
279 else if (ACE_OS::strcasecmp (argv
[0], ACE_TEXT ("-nonamesvc")) == 0)
281 this->use_naming_service_
= false;
284 else if (ACE_OS::strcasecmp (argv
[0], ACE_TEXT("-disconnect")) == 0)
286 this->disconnect_on_exit_
= true;
// Print the command-line help text for the supplier.
//   log - stream parameter for the help text (its use is not visible here)
// The option list below mirrors the options handled by parse_single_arg().
293 void Supplier_Main::usage(FILE * log
)const
295 //FUZZ: disable check_for_lack_ACE_OS
298 ACE_TEXT (" -channel filename Where to find a channel number.\n")
299 ACE_TEXT (" -any or -structured or -sequence\n")
300 ACE_TEXT (" What type of event to send (pick one, default is -any)\n")
301 ACE_TEXT (" -send n How many events of each type to send.\n")
302 ACE_TEXT (" -pause n Pause after sending n events. Write to file \"Supplier.paused\"\n")
303 ACE_TEXT (" -serial_number n What serial number to start with.\n")
304 ACE_TEXT (" -v Verbose output.\n")
305 ACE_TEXT (" -disconnect Disconnect from channel on exit (prevents reconnect.)\n")
306 ACE_TEXT (" -nonamesvc Don't use the name service to find EventChannelFactory\n")
308 //FUZZ: enable check_for_lack_ACE_OS
// Bring the supplier up: initialise the ORB, parse arguments, activate the
// RootPOA's manager, locate the event channel factory (via the naming service
// or resolve_initial_references, depending on use_naming_service_), load any
// saved ids, then create or reconnect the channel, supplier admin and the
// mode-specific proxy consumer, and finally register the reconnection callback.
//   argc/argv - command line; passed to ORB_init first, then to parse_args
311 int Supplier_Main::init (int argc
, ACE_TCHAR
*argv
[])
313 this->orb_
= CORBA::ORB_init (argc
, argv
);
315 if (0 != this->parse_args (argc
, argv
))
// NOTE(review): the RootPOA reference is held in a raw Object_ptr and no
// CORBA::release is visible in this listing; an Object_var would avoid a
// possible reference leak - confirm against the full source.
320 CORBA::Object_ptr poa_object
=
321 this->orb_
->resolve_initial_references("RootPOA");
323 if (CORBA::is_nil (poa_object
))
325 ACE_ERROR ((LM_ERROR
,
326 ACE_TEXT (" (%P|%t) Unable to initialize the POA.\n")));
331 PortableServer::POA::_narrow (poa_object
);
333 PortableServer::POAManager_var poa_manager
=
334 root_poa_
->the_POAManager ();
336 poa_manager
->activate ();
// Two ways of locating the EventChannelFactory.
338 if (this->use_naming_service_
)
340 this->find_notify_factory ();
344 int ok
= resolve_notify_factory ();
// load_ids() result decides whether we reconnect to existing objects.
351 this->reconnecting_
= load_ids ();
353 init_event_channel ();
355 init_supplier_admin ();
359 case MODE_STRUCTURED
:
361 init_structured_proxy_consumer ();
366 init_sequence_proxy_consumer ();
372 init_any_proxy_consumer ();
377 ACE_ERROR ((LM_ERROR
,
378 ACE_TEXT ("(%P|%t) Supplier: Unknown event push mode.\n")
383 this->reconnection_callback_
.init (
384 this->root_poa_
.in (),
// Persist the current ids to the id file (id_file_) as a single line of seven
// comma-separated integers, in this order: mode, event channel id, supplier
// admin id, structured proxy id, sequence proxy id, any proxy id, end flag.
// load_ids() reads the fields back in the same order.
391 Supplier_Main::save_ids()
394 ACE_OS::fopen (this->id_file_
.c_str (), "w");
399 int imode
= static_cast<int>(this->mode_
);
400 ACE_OS::fprintf (idf
,
401 "%d,%d,%d,%d,%d,%d,%d,\n",
402 static_cast<int> (imode
),
403 static_cast<int> (ec_id_
),
404 static_cast<int> (sa_id_
),
405 static_cast<int> (structured_proxy_id_
),
406 static_cast<int> (sequence_proxy_id_
),
407 static_cast<int> (any_proxy_id_
),
408 static_cast<int> (endflag
) );
409 ACE_OS::fclose (idf
);
// Read back the ids written by save_ids().
// One line (at most 99 characters) is read from the id file, split on commas,
// and each field is converted with atoi and assigned in order: mode, event
// channel id, supplier admin id, structured, sequence and any proxy ids.
// Extra fields produce a warning on stderr.
414 Supplier_Main::load_ids()
418 ACE_OS::fopen (this->id_file_
.c_str (), "r");
424 char buffer
[100] = ""; // because ACE fgets doesn't put a null if the file is empty
425 ACE_OS::fgets (buffer
, sizeof(buffer
), idf
);
426 ACE_OS::fclose (idf
);
// Field-by-field scan; stops at end of string or once 'ok' is set.
428 while (!ok
&& *pb
!= 0)
430 char * eb
= ACE_OS::strchr (pb
, ',');
// With no further comma, the field end is set to the end of the string.
434 eb
= pb
+ ACE_OS::strlen (pb
);
440 int value
= ACE_OS::atoi(pb
);
444 this->mode_
= static_cast<Mode_T
> (value
);
447 this->ec_id_
= value
;
450 this->sa_id_
= value
;
453 this->structured_proxy_id_
= value
;
456 this->sequence_proxy_id_
= value
;
459 this->any_proxy_id_
= value
;
465 ACE_OS::fprintf (stderr
, ACE_TEXT ("Supplier: Warning: too many fields in saved id file.\n"));
// Re-establish the supplier's connections through a (new) factory.
//   dest_factory - event channel factory to reconnect through; duplicated
// Sets the reconnecting flag, then re-runs the channel, supplier-admin and
// mode-specific proxy-consumer initialisation.
477 Supplier_Main::reconnect (
478 CosNotifyChannelAdmin::EventChannelFactory_ptr dest_factory
)
480 this->ecf_
= CosNotifyChannelAdmin::EventChannelFactory::_duplicate (dest_factory
);
481 this->reconnecting_
= true;
482 init_event_channel ();
484 init_supplier_admin ();
488 case MODE_STRUCTURED
:
490 init_structured_proxy_consumer ();
495 init_sequence_proxy_consumer ();
500 init_any_proxy_consumer ();
// Resolve the naming service root context once and cache it.
// Returns non-zero/true when naming_context_ is non-nil afterwards.
508 Supplier_Main::resolve_naming_service (void)
510 // ignore redundant calls
511 if (CORBA::is_nil (this->naming_context_
.in ()))
513 CORBA::Object_var naming_obj
=
514 this->orb_
->resolve_initial_references (NAMING_SERVICE_NAME
);
516 this->naming_context_
=
517 CosNaming::NamingContext::_narrow (naming_obj
.in ());
520 return !CORBA::is_nil (this->naming_context_
.in ());
// Locate the EventChannelFactory through the naming service: resolve the
// naming context, look up NOTIFY_FACTORY_NAME and narrow the result.
// Returns non-zero/true when ecf_ is non-nil afterwards.
// NOTE(review): the CosNaming::Name is built with maximum 1; the call that
// sets its length before name[0] is written is not visible here - confirm.
524 Supplier_Main::find_notify_factory (void)
526 int status
= this->resolve_naming_service ();
529 CosNaming::Name
name (1);
531 name
[0].id
= CORBA::string_dup (NOTIFY_FACTORY_NAME
);
533 CORBA::Object_var obj
=
534 this->naming_context_
->resolve (name
);
537 CosNotifyChannelAdmin::EventChannelFactory::_narrow (obj
.in ());
539 return ! CORBA::is_nil (this->ecf_
.in ());
// Locate the EventChannelFactory without the naming service, using
// resolve_initial_references (NOTIFY_FACTORY_NAME) and narrowing the result.
// Returns non-zero/true when ecf_ is non-nil afterwards.
543 Supplier_Main::resolve_notify_factory (void)
545 CORBA::Object_var factory_obj
=
546 this->orb_
->resolve_initial_references (NOTIFY_FACTORY_NAME
);
549 CosNotifyChannelAdmin::EventChannelFactory::_narrow (
551 return ! CORBA::is_nil (this->ecf_
.in ());
// Obtain the event channel, trying three sources in order:
//   1. when reconnecting, fetch the existing channel from the factory;
//   2. otherwise, if a channel file was given, read a channel id from it and
//      fetch that channel;
//   3. otherwise create a new channel with the QoS and admin properties
//      assembled below.
// When a channel file name is still set at the end, the channel id is
// written to it.
555 Supplier_Main::init_event_channel (void)
558 if (this->reconnecting_
)
562 this->ec_
= this->ecf_
->get_event_channel (
564 ok
= ! CORBA::is_nil (this->ec_
.in ());
565 if (ok
&& this->verbose_
)
567 ACE_DEBUG ((LM_DEBUG
,
568 ACE_TEXT ("(%P|%t) Supplier: Reconnect to event channel %d\n"),
569 static_cast<int>(this->ec_id_
)
578 // if we don't have a channel yet, and a channel id file was specified
579 // try to read from it
580 if (!ok
&& this->channel_file_
.length () > 0)
582 FILE * chf
= ACE_OS::fopen (this->channel_file_
.c_str (), "r");
586 ACE_OS::fgets (buffer
, sizeof(buffer
), chf
);
587 ACE_OS::fclose (chf
);
588 this->ec_id_
= ACE_OS::atoi (buffer
);
592 this->ec_
= this->ecf_
->get_event_channel (
594 ok
= ! CORBA::is_nil (this->ec_
.in ());
599 ACE_DEBUG ((LM_DEBUG
,
600 ACE_TEXT ("(%P|%t) Supplier: Connect to Existing event channel %d\n"),
601 static_cast<int>(this->ec_id_
)
604 // kill the channel filename so we don't overwrite the file
605 this->channel_file_
= ACE_TEXT("");
// Channel QoS for a newly created channel: seven name/value properties.
// NOTE(review): the call that sets qosprops' length is not visible in this
// listing (adminprops has an explicit length(4) below) - confirm.
616 CosNotification::QoSProperties
qosprops (7);
619 qosprops
[i
].name
= CORBA::string_dup(CosNotification::EventReliability
);
620 qosprops
[i
++].value
<<= CosNotification::Persistent
;
621 qosprops
[i
].name
= CORBA::string_dup(CosNotification::ConnectionReliability
);
622 qosprops
[i
++].value
<<= CosNotification::Persistent
; // Required, or we won't persist much
623 qosprops
[i
].name
= CORBA::string_dup(CosNotification::Priority
);
624 qosprops
[i
++].value
<<= CosNotification::HighestPriority
;
625 qosprops
[i
].name
= CORBA::string_dup(CosNotification::Timeout
);
626 qosprops
[i
++].value
<<= (TimeBase::TimeT
) 42 * 1000000; // 4.2s
627 qosprops
[i
].name
= CORBA::string_dup(CosNotification::StopTimeSupported
);
628 qosprops
[i
++].value
<<= CORBA::Any::from_boolean(1);
629 qosprops
[i
].name
= CORBA::string_dup(CosNotification::MaximumBatchSize
);
630 qosprops
[i
++].value
<<= (CORBA::Long
) 2;
631 qosprops
[i
].name
= CORBA::string_dup(CosNotification::PacingInterval
);
632 qosprops
[i
++].value
<<= (TimeBase::TimeT
) 50 * 10000; // 50ms
// Channel admin properties: four name/value entries.
634 CosNotification::AdminProperties
adminprops(4);
635 adminprops
.length (4);
637 adminprops
[i
].name
= CORBA::string_dup(CosNotification::MaxQueueLength
);
638 adminprops
[i
++].value
<<= (CORBA::Long
) 1234;
639 adminprops
[i
].name
= CORBA::string_dup(CosNotification::MaxConsumers
);
640 adminprops
[i
++].value
<<= (CORBA::Long
) 1000;
641 adminprops
[i
].name
= CORBA::string_dup(CosNotification::MaxSuppliers
);
642 adminprops
[i
++].value
<<= (CORBA::Long
) 1000;
643 adminprops
[i
].name
= CORBA::string_dup(CosNotification::RejectNewEvents
);
644 adminprops
[i
++].value
<<= CORBA::Any::from_boolean(1);
646 ec_
= this->ecf_
->create_channel (
650 ok
= ! CORBA::is_nil (ec_
.in ());
651 if (ok
&& this->verbose_
)
653 ACE_DEBUG ((LM_DEBUG
,
654 ACE_TEXT ("(%P|%t) Supplier: Create event channel %d\n"),
655 static_cast<int> (this->ec_id_
)
// Publish the channel id to the channel file (skipped when the name was
// cleared above after reading an existing id).
661 if (ok
&& this->channel_file_
.length() > 0)
663 FILE * chf
= ACE_OS::fopen (this->channel_file_
.c_str (), "w");
666 ACE_OS::fprintf (chf
, "%d\n", static_cast<int> (this->ec_id_
));
667 ACE_OS::fclose (chf
);
// Sentinel admin id (-1 converted to AdminID). sa_id_ is set to this value
// when the channel's default supplier admin is used, and compared against it
// when deciding whether to look up or destroy a specific admin.
672 CosNotifyChannelAdmin::AdminID default_admin_id
= static_cast<CosNotifyChannelAdmin::AdminID
>(-1);
// Obtain the supplier admin for the event channel. Visible paths:
//   - when reconnecting with a saved non-default admin id, look that admin up;
//   - use the channel's default supplier admin (sa_id_ becomes default_admin_id);
//   - create a new supplier admin with OR_OP filter semantics.
// The conditions selecting between the last two paths are not visible in
// this listing.
675 Supplier_Main::init_supplier_admin (void)
678 if (this->reconnecting_
&& this->sa_id_
!= default_admin_id
)
682 this->sa_
= this->ec_
->get_supplieradmin(
684 ok
= ! CORBA::is_nil (this->sa_
.in ());
685 if (ok
&& this->verbose_
)
687 ACE_DEBUG ((LM_DEBUG
,
688 ACE_TEXT ("(%P|%t) Supplier: Reconnect to supplier admin %d\n"),
689 static_cast<int>(this->sa_id_
)
702 this->sa_
= this->ec_
->default_supplier_admin ();
703 ok
= ! CORBA::is_nil (this->sa_
.in ());
704 this->sa_id_
= default_admin_id
;
705 if (ok
&& this->verbose_
)
707 ACE_DEBUG ((LM_DEBUG
,
708 ACE_TEXT ("(%P|%t) Supplier: Using default supplier admin\n")
711 else if (this->verbose_
)
713 ACE_DEBUG ((LM_DEBUG
,
714 ACE_TEXT ("(%P|%t) Supplier: No default supplier admin\n")
725 this->sa_
= this->ec_
->new_for_suppliers(
726 CosNotifyChannelAdmin::OR_OP
,
728 ok
= ! CORBA::is_nil (this->sa_
.in ());
729 if (ok
&& this->verbose_
)
731 ACE_DEBUG ((LM_DEBUG
,
732 ACE_TEXT ("(%P|%t) Supplier: Create new supplier admin %d\n"),
733 static_cast<int>(this->sa_id_
)
// Obtain the structured proxy push consumer and connect our supplier to it.
// When reconnecting, the saved proxy id is looked up first; otherwise a new
// STRUCTURED_EVENT push-consumer proxy is obtained from the supplier admin.
// The local StructuredPushSupplier servant is activated in the root POA the
// first time through, then connected to the proxy.
// Throws CORBA::OBJECT_NOT_EXIST if the proxy cannot be narrowed, and
// CORBA::UNKNOWN if the local supplier reference is nil.
740 Supplier_Main::init_structured_proxy_consumer (void)
743 CosNotifyChannelAdmin::ProxyConsumer_var proxy
;
744 if (this->reconnecting_
)
748 proxy
= this->sa_
->get_proxy_consumer (
749 this->structured_proxy_id_
751 ok
= ! CORBA::is_nil (proxy
.in ());
752 if (ok
&& this->verbose_
)
754 ACE_DEBUG ((LM_DEBUG
,
755 ACE_TEXT ("(%P|%t) Supplier: Reconnect to proxy supplier %d\n"),
756 static_cast<int>(this->structured_proxy_id_
)
767 proxy
= this->sa_
->obtain_notification_push_consumer(
768 CosNotifyChannelAdmin::STRUCTURED_EVENT
,
769 this->structured_proxy_id_
);
770 ok
= ! CORBA::is_nil (proxy
.in ());
771 if (ok
&& this->verbose_
)
773 ACE_DEBUG ((LM_DEBUG
,
774 ACE_TEXT ("(%P|%t) Supplier: Create new proxy %d\n"),
775 static_cast<int>(this->structured_proxy_id_
)
779 this->structured_proxy_push_consumer_
=
780 CosNotifyChannelAdmin::StructuredProxyPushConsumer::_narrow(proxy
.in ());
782 if (CORBA::is_nil (this->structured_proxy_push_consumer_
.in ()))
784 ACE_ERROR ((LM_ERROR
,
785 ACE_TEXT ("(%P|%t) init_structured_proxy_consumer received nil ProxyConsumer\n")
787 throw CORBA::OBJECT_NOT_EXIST ();
// Activate the local servant only once; later calls reuse the reference.
789 if (CORBA::is_nil (this->structured_push_supplier_ref_
.in ()))
791 PortableServer::ObjectId_var push_supplier_id
=
792 this->root_poa_
->activate_object (
793 &(this->structured_push_supplier_
));
795 CORBA::Object_var obj
=
796 this->root_poa_
->id_to_reference (push_supplier_id
.in ());
798 this->structured_push_supplier_ref_
=
799 CosNotifyComm::StructuredPushSupplier::_narrow (obj
.in ());
801 if (CORBA::is_nil (structured_push_supplier_ref_
.in ()))
803 ACE_ERROR ((LM_ERROR
,
804 ACE_TEXT ("(%P|%t) Supplier: Received wrong type of push consumer proxy %d\n"),
805 static_cast<int>(this->structured_proxy_id_
)
808 throw CORBA::UNKNOWN();
811 this->structured_proxy_push_consumer_
->connect_structured_push_supplier (
812 structured_push_supplier_ref_
.in ());
// Obtain the sequence proxy push consumer and connect our supplier to it.
// When reconnecting, the saved proxy id is looked up first; otherwise a new
// SEQUENCE_EVENT push-consumer proxy is obtained from the supplier admin.
// The local SequencePushSupplier servant is activated in the root POA the
// first time through, then connected to the proxy.
// Throws CORBA::UNKNOWN if the proxy cannot be narrowed or the local
// supplier reference is nil.
816 Supplier_Main::init_sequence_proxy_consumer (void)
819 CosNotifyChannelAdmin::ProxyConsumer_var proxy
;
820 if (this->reconnecting_
)
824 proxy
= this->sa_
->get_proxy_consumer(
825 this->sequence_proxy_id_
);
826 ok
= ! CORBA::is_nil (proxy
.in ());
827 if (ok
&& this->verbose_
)
829 ACE_DEBUG ((LM_DEBUG
,
830 ACE_TEXT ("(%P|%t) Supplier: Reconnect to proxy %d\n"),
831 static_cast<int>(this->sequence_proxy_id_
)
842 proxy
= this->sa_
->obtain_notification_push_consumer(
843 CosNotifyChannelAdmin::SEQUENCE_EVENT
,
844 this->sequence_proxy_id_
);
845 ok
= ! CORBA::is_nil (proxy
.in ());
846 if (ok
&& this->verbose_
)
848 ACE_DEBUG ((LM_DEBUG
,
849 ACE_TEXT ("(%P|%t) Supplier: Create new proxy %d\n"),
850 static_cast<int>(this->sequence_proxy_id_
)
854 this->sequence_proxy_push_consumer_
=
855 CosNotifyChannelAdmin::SequenceProxyPushConsumer::_narrow(proxy
.in ());
857 if (CORBA::is_nil (this->sequence_proxy_push_consumer_
.in ()))
859 ACE_ERROR ((LM_ERROR
,
860 ACE_TEXT ("(%P|%t) Supplier: Received wrong type of push consumer proxy %d\n"),
861 static_cast<int>(this->sequence_proxy_id_
)
863 throw CORBA::UNKNOWN();
// Activate the local servant only once; later calls reuse the reference.
866 if (CORBA::is_nil (this->sequence_push_supplier_ref_
.in ()))
868 PortableServer::ObjectId_var push_supplier_id
=
869 this->root_poa_
->activate_object (
870 &(this->sequence_push_supplier_
));
872 CORBA::Object_var obj
=
873 this->root_poa_
->id_to_reference (push_supplier_id
.in ());
875 this->sequence_push_supplier_ref_
=
876 CosNotifyComm::SequencePushSupplier::_narrow (obj
.in ());
878 if (CORBA::is_nil (sequence_push_supplier_ref_
.in ()))
880 ACE_ERROR ((LM_ERROR
,
881 ACE_TEXT ("(%P|%t) Supplier: Received wrong type of push consumer proxy %d\n"),
882 static_cast<int>(this->sequence_proxy_id_
)
884 throw CORBA::UNKNOWN();
887 this->sequence_proxy_push_consumer_
->connect_sequence_push_supplier (
888 sequence_push_supplier_ref_
.in ());
892 Supplier_Main::init_any_proxy_consumer (void)
895 CosNotifyChannelAdmin::ProxyConsumer_var proxy
;
896 if (this->reconnecting_
)
900 proxy
= this->sa_
->get_proxy_consumer(
901 this->any_proxy_id_
);
902 ok
= ! CORBA::is_nil (proxy
.in ());
903 if (ok
&& this->verbose_
)
905 ACE_DEBUG ((LM_DEBUG
,
906 ACE_TEXT ("(%P|%t) Supplier: Reconnect to proxy %d\n"),
907 static_cast<int>(this->any_proxy_id_
)
918 proxy
= this->sa_
->obtain_notification_push_consumer(
919 CosNotifyChannelAdmin::ANY_EVENT
,
920 this->any_proxy_id_
);
921 ok
= ! CORBA::is_nil (proxy
.in ());
922 if (ok
&& this->verbose_
)
924 ACE_DEBUG ((LM_DEBUG
,
925 ACE_TEXT ("(%P|%t) Supplier: Create new proxy %d\n"),
926 static_cast<int>(this->any_proxy_id_
)
930 this->any_proxy_push_consumer_
=
931 CosNotifyChannelAdmin::ProxyPushConsumer::_narrow(proxy
.in ());
933 if (CORBA::is_nil (this->any_proxy_push_consumer_
.in ()))
935 ACE_ERROR ((LM_ERROR
,
936 ACE_TEXT ("(%P|%t) Supplier: Received wrong type of push consumer proxy %d\n"),
937 static_cast<int>(this->any_proxy_id_
)
939 throw CORBA::UNKNOWN();
942 if (CORBA::is_nil (this->any_push_supplier_ref_
.in ()))
944 PortableServer::ObjectId_var push_supplier_id
=
945 this->root_poa_
->activate_object (
946 &(this->any_push_supplier_
));
948 CORBA::Object_var obj
=
949 this->root_poa_
->id_to_reference (push_supplier_id
.in ());
951 this->any_push_supplier_ref_
=
952 CosNotifyComm::PushSupplier::_narrow (obj
.in ());
954 if (CORBA::is_nil (any_push_supplier_ref_
.in ()))
956 ACE_ERROR ((LM_ERROR
,
957 ACE_TEXT ("(%P|%t) Supplier: Received wrong type of push consumer proxy %d\n"),
958 static_cast<int>(this->sequence_proxy_id_
)
960 throw CORBA::UNKNOWN();
963 this->any_proxy_push_consumer_
->connect_any_push_supplier (
964 any_push_supplier_ref_
.in ());
// Shut the supplier down.
// When disconnect_on_exit_ is set, the reconnection callback is unregistered
// and each non-nil proxy consumer (structured, sequence, any) is disconnected.
// A non-nil, non-default supplier admin is destroyed, and the ORB is shut down.
// Exact nesting of the admin destruction relative to the disconnect_on_exit_
// check is not visible in this listing.
967 int Supplier_Main::fini (void)
969 if (this->disconnect_on_exit_
)
971 this->reconnection_callback_
.fini ();
972 if (!CORBA::is_nil (this->structured_proxy_push_consumer_
.in ()))
976 ACE_DEBUG ((LM_DEBUG
,
977 ACE_TEXT ("(%P|%t) Disconnecting structured\n")
980 this->structured_proxy_push_consumer_
->disconnect_structured_push_consumer ();
982 if (!CORBA::is_nil (this->sequence_proxy_push_consumer_
.in ()))
986 ACE_DEBUG ((LM_DEBUG
,
987 ACE_TEXT ("(%P|%t) Disconnecting sequence\n")
990 this->sequence_proxy_push_consumer_
->disconnect_sequence_push_consumer ();
992 if (!CORBA::is_nil (this->any_proxy_push_consumer_
.in ()))
996 ACE_DEBUG ((LM_DEBUG
,
997 ACE_TEXT ("(%P|%t) Disconnecting any\n")
1000 this->any_proxy_push_consumer_
->disconnect_push_consumer ();
// The default admin is never destroyed, only an explicitly obtained one.
1002 if (!CORBA::is_nil (this->sa_
.in ()) && this->sa_id_
!= default_admin_id
)
1006 ACE_DEBUG ((LM_DEBUG
,
1007 ACE_TEXT ("(%P|%t) destroy admin %d\n"),
1008 static_cast<int>(this->sa_id_
)
1011 this->sa_
->destroy();
1014 this->orb_
->shutdown ();
// Build one structured event carrying the current serial number and push it
// through the structured proxy consumer.
// The event uses "*" for domain and type name, "reconnect_test" as event
// name, two variable-header entries (Priority, EventReliability), one
// filterable field "serial_number", and the serial number as remainder body.
// NOTE(review): qos[0] and qos[1] are written below; the call that sets the
// variable header's length is not visible in this listing - confirm.
1018 void Supplier_Main::send_structured_event (void)
1020 CosNotification::StructuredEvent event
;
1024 // FixedEventHeader.
1027 event
.header
.fixed_header
.event_type
.domain_name
= CORBA::string_dup("*");
1029 event
.header
.fixed_header
.event_type
.type_name
= CORBA::string_dup("*");
1031 event
.header
.fixed_header
.event_name
= CORBA::string_dup("reconnect_test");
1033 // OptionalHeaderFields.
1035 // sequence<Property>: string name, any value
1036 CosNotification::PropertySeq
& qos
= event
.header
.variable_header
;
1038 qos
[0].name
= CORBA::string_dup (CosNotification::Priority
);
1039 qos
[0].value
<<= CosNotification::LowestPriority
;
1040 qos
[1].name
= CORBA::string_dup (CosNotification::EventReliability
);
1041 qos
[1].value
<<= CosNotification::Persistent
;
1043 // FilterableEventBody
1045 // sequence<Property>: string name, any value
1046 event
.filterable_data
.length (1);
1047 event
.filterable_data
[0].name
= CORBA::string_dup("serial_number");
1048 event
.filterable_data
[0].value
<<= CORBA::ULong ( this->serial_number_
);
1051 event
.remainder_of_body
<<= CORBA::ULong ( this->serial_number_
);
1055 ACE_DEBUG ((LM_DEBUG
,
1056 ACE_TEXT ("(%P,%t) Supplier push structured event %d\n"),
1057 static_cast<int>(serial_number_
)
1061 this->structured_proxy_push_consumer_
->push_structured_event (event
);
// Build a one-element event batch carrying the current serial number and
// push it through the sequence proxy consumer.
// The single event is filled in exactly like send_structured_event():
// "*" domain/type, "reconnect_test" name, Priority and EventReliability in
// the variable header, a "serial_number" filterable field, serial as body.
// NOTE(review): qos[0] and qos[1] are written below; the call that sets the
// variable header's length is not visible in this listing - confirm.
1064 void Supplier_Main::send_sequence_event (void)
1066 CosNotification::EventBatch
event_batch(1);
1067 event_batch
.length (1);
1068 CosNotification::StructuredEvent
& event
= event_batch
[0];
1072 // FixedEventHeader.
1075 event
.header
.fixed_header
.event_type
.domain_name
= CORBA::string_dup("*");
1077 event
.header
.fixed_header
.event_type
.type_name
= CORBA::string_dup("*");
1079 event
.header
.fixed_header
.event_name
= CORBA::string_dup("reconnect_test");
1081 // OptionalHeaderFields.
1083 // sequence<Property>: string name, any value
1084 CosNotification::PropertySeq
& qos
= event
.header
.variable_header
;
1086 qos
[0].name
= CORBA::string_dup (CosNotification::Priority
);
1087 qos
[0].value
<<= CosNotification::LowestPriority
;
1088 qos
[1].name
= CORBA::string_dup (CosNotification::EventReliability
);
1089 qos
[1].value
<<= CosNotification::Persistent
;
1091 // FilterableEventBody
1093 // sequence<Property>: string name, any value
1094 event
.filterable_data
.length (1);
1095 event
.filterable_data
[0].name
= CORBA::string_dup("serial_number");
1096 event
.filterable_data
[0].value
<<= CORBA::ULong ( this->serial_number_
);
1099 event
.remainder_of_body
<<= CORBA::ULong ( this->serial_number_
);
1103 ACE_DEBUG ((LM_DEBUG
,
1104 ACE_TEXT ("(%P,%t) Supplier push sequence events %d\n"),
1105 static_cast<int>(this->serial_number_
)
1109 this->sequence_proxy_push_consumer_
->push_structured_events (event_batch
);
// Push one Any event whose payload is the current serial number (as a
// CORBA::ULong) through the Any proxy consumer.
1112 void Supplier_Main::send_any_event (void)
1115 event
<<= CORBA::ULong (this->serial_number_
);
1119 ACE_DEBUG ((LM_DEBUG
,
1120 ACE_TEXT ("(%P,%t) Supplier push any event %d\n"),
1121 static_cast<int>(this->serial_number_
)
1125 this->any_proxy_push_consumer_
->push (event
);
// Main send loop: push events until send_ events have been sent.
// When pause_ is non-zero and that many events have been sent, the supplier
// records the current reconnect count and writes "paused" to the file
// "Supplier.paused"; the "no longer paused" branch is taken once the
// reconnection callback's count differs from the recorded value.
// Each iteration dispatches on mode_ to the matching send_*_event() and then
// advances the serial number.
1128 int Supplier_Main::run (void)
1131 bool paused
= false;
1132 size_t reconnections
= 0;
1135 while ( send
< this->send_
)
1137 ACE_Time_Value
tv(0, 100 * 1000);
1140 if (this->pause_
!= 0 && send
== this->pause_
)
1144 ACE_DEBUG ((LM_DEBUG
,
1145 ACE_TEXT ("(%P|%t) Supplier paused after %d events\n"),
1146 static_cast<int>(this->pause_
)
// Snapshot the reconnect count so a later change can end the pause.
1149 reconnections
= this->reconnection_callback_
.reconnect_count ();
1150 FILE * pause_file
= ACE_OS::fopen ("Supplier.paused", "w");
1151 if (pause_file
!= 0)
1153 ACE_OS::fputs (ACE_TEXT ("paused\n"), pause_file
);
1154 ACE_OS::fclose (pause_file
);
1161 if (this->reconnection_callback_
.reconnect_count () != reconnections
)
1165 ACE_DEBUG ((LM_DEBUG
,
1166 ACE_TEXT ("(%P|%t) Supplier no longer paused. Next s# %d\n"),
1167 static_cast<int>(this->serial_number_
)
1176 switch (this->mode_
)
1178 case MODE_STRUCTURED
:
1180 send_structured_event ();
1185 send_sequence_event ();
1194 this->serial_number_
+= 1;
1203 ACE_TMAIN(int argc
, ACE_TCHAR
*argv
[])
1209 result
= app
.init(argc
, argv
);
1213 result
= app
.run ();
1220 catch (const CORBA::Exception
& ex
)
1222 ex
._tao_print_exception ("Error: Supplier::main\t\n");
1225 ACE_DEBUG ((LM_DEBUG
,
1226 ACE_TEXT ("(%P,%t) Supplier exits: code %d\n"),