Changes to attempt to silence bcc64x
[ACE_TAO.git] / TAO / orbsvcs / tests / Notify / Reconnecting / Supplier.cpp
blob346246d79cc3b86b4081631621c95a66d7a51ba0
1 #include "Supplier.h"
2 #include "orbsvcs/CosNotifyChannelAdminC.h"
3 #include "tao/debug.h"
4 #include "tao/PortableServer/PortableServerC.h"
5 #include "tao/TimeBaseC.h"
6 #include "ace/OS_NS_stdio.h"
7 #include "ace/OS_NS_strings.h"
10 static const char NOTIFY_FACTORY_NAME[] = "NotifyEventChannelFactory";
11 static const char NAMING_SERVICE_NAME[] = "NameService";
13 ///////////////////////////
14 // StructuredPushSupplier_i
16 void
17 StructuredPushSupplier_i::subscription_change (
18 const CosNotification::EventTypeSeq & added,
19 const CosNotification::EventTypeSeq & removed)
21 ACE_UNUSED_ARG (added);
22 ACE_UNUSED_ARG (removed);
23 ACE_DEBUG ((LM_DEBUG,
24 ACE_TEXT ("(%P,%t) Supplier StructuredPushSupplier received subscription change\n")
25 ));
29 void
30 StructuredPushSupplier_i::disconnect_structured_push_supplier ()
32 ACE_DEBUG ((LM_DEBUG,
33 ACE_TEXT ("(%P,%t) Supplier StructuredPushSupplier received disconnect\n")
34 ));
37 ///////////////////////////
38 // SequencePushSupplier_i
40 void
41 SequencePushSupplier_i::subscription_change (
42 const CosNotification::EventTypeSeq & added,
43 const CosNotification::EventTypeSeq & removed)
45 ACE_UNUSED_ARG (added);
46 ACE_UNUSED_ARG (removed);
47 ACE_DEBUG ((LM_DEBUG,
48 ACE_TEXT ("(%P,%t) Supplier SequencePushSupplier received subscription change\n")
49 ));
52 void
53 SequencePushSupplier_i::disconnect_sequence_push_supplier ()
55 ACE_DEBUG ((LM_DEBUG,
56 ACE_TEXT ("(%P,%t) Supplier SequencePushSupplier received disconnect\n")
57 ));
60 ///////////////////////////
61 // AnyPushSupplier_i
63 void
64 AnyPushSupplier_i::subscription_change (
65 const CosNotification::EventTypeSeq & added,
66 const CosNotification::EventTypeSeq & removed)
68 ACE_UNUSED_ARG (added);
69 ACE_UNUSED_ARG (removed);
70 ACE_DEBUG ((LM_DEBUG,
71 ACE_TEXT ("(%P,%t) Supplier AnyPushSupplier received subscription change\n")
72 ));
76 void
77 AnyPushSupplier_i::disconnect_push_supplier ()
79 ACE_DEBUG ((LM_DEBUG,
80 ACE_TEXT ("(%P,%t) Suppleir AnyPushSupplier received disconnect\n")
81 ));
84 /////////////////////////
85 // ReconnectionCallback_i
87 ReconnectionCallback_i::ReconnectionCallback_i (Supplier_Main & supplier_main)
88 : supplier_main_ (supplier_main)
89 , id_is_valid_ (false)
90 , reconnect_count_ (0)
94 size_t
95 ReconnectionCallback_i::reconnect_count () const
97 return this->reconnect_count_;
100 void
101 ReconnectionCallback_i::reconnect (CORBA::Object_ptr reconnection)
103 ACE_DEBUG ((LM_DEBUG,
104 ACE_TEXT ("(%P|%t) Supplier received reconnection request\n")
106 this->ecf_ = CosNotifyChannelAdmin::EventChannelFactory::_narrow (reconnection);
107 if (!CORBA::is_nil (this->ecf_.in ()))
109 this->supplier_main_.reconnect (this->ecf_.in ());
110 this->reconnect_count_ += 1;
112 else
114 ACE_DEBUG ((LM_DEBUG,
115 ACE_TEXT ("(%P|%t) Supplier reconnection request failed: wrong object type\n")
120 CORBA::Boolean
121 ReconnectionCallback_i::is_alive ()
123 return true;
126 ReconnectionCallback_i::~ReconnectionCallback_i ()
128 // normally you would disconnect from registry here, but
129 // to simulate a failure, we exit without cleaning up
130 // if the fini method is not called.
134 void
135 ReconnectionCallback_i::fini ()
137 if (this->id_is_valid_)
139 NotifyExt::ReconnectionRegistry_var registry =
140 NotifyExt::ReconnectionRegistry::_narrow (this->ecf_.in ());
142 registry->unregister_callback (this->callback_id_);
143 this->id_is_valid_ = false;
147 void
148 ReconnectionCallback_i::init (
149 PortableServer::POA_ptr poa,
150 CosNotifyChannelAdmin::EventChannelFactory_ptr ecf)
152 this->ecf_ = CosNotifyChannelAdmin::EventChannelFactory::_duplicate (ecf);
153 PortableServer::ObjectId_var reconnection_callback_id =
154 poa->activate_object (this);
156 CORBA::Object_var obj =
157 poa->id_to_reference (reconnection_callback_id.in ());
159 NotifyExt::ReconnectionCallback_var callback =
160 NotifyExt::ReconnectionCallback::_narrow (obj.in ());
162 NotifyExt::ReconnectionRegistry_var registry =
163 NotifyExt::ReconnectionRegistry::_narrow (ecf);
165 this->callback_id_ = registry->register_callback (callback.in ());
166 this->id_is_valid_ = true;
170 /////////////////
171 // Supplier_Main
173 Supplier_Main::Supplier_Main ()
174 : verbose_ (false)
175 , mode_ (MODE_ANY)
176 , send_ (10)
177 , use_naming_service_ (true)
178 , serial_number_ (0)
179 , disconnect_on_exit_ (false)
180 , id_file_ (ACE_TEXT ("supplier.ids"))
181 , pause_ (0)
182 , ec_id_ (0)
183 , sa_id_(0)
184 , structured_proxy_id_(0)
185 , sequence_proxy_id_(0)
186 , any_proxy_id_(0)
187 , reconnection_callback_ (*this)
188 , reconnecting_ (false)
192 Supplier_Main::~Supplier_Main ()
197 Supplier_Main::parse_args (int argc, ACE_TCHAR *argv[])
199 int result = 0;
200 int narg = 1;
201 bool corba_arg = false;
202 while (narg < argc && result == 0)
204 int consumed = parse_single_arg (argc - narg, &argv[narg]);
205 if ( consumed > 0)
207 narg += consumed;
208 corba_arg = false;
210 else if (ACE_OS::strncmp (argv[narg], ACE_TEXT("-ORB"), 4) == 0)
212 corba_arg = true;
214 else if (corba_arg)
216 // previous argument was a ORB arg.
217 // current argument is unrecognized
218 // assume the ORB eats this arg
219 narg += 1;
220 corba_arg = false;
222 else
224 ACE_OS::fprintf(stderr, "Unrecognized argument: %s\n",
225 ACE_TEXT_ALWAYS_CHAR (argv[narg]));
226 usage (stderr);
227 result = -1;
230 return result;
234 Supplier_Main::parse_single_arg (int argc, ACE_TCHAR *argv[])
236 int consumed = 0;
237 if (ACE_OS::strcasecmp (argv[0], ACE_TEXT ("-v")) == 0)
239 this->verbose_ = true;
240 consumed = 1;
242 else if (ACE_OS::strcasecmp (argv[0], ACE_TEXT ("-any")) == 0)
244 this->mode_ = MODE_ANY;
245 consumed = 1;
247 else if (ACE_OS::strcasecmp (argv[0], ACE_TEXT ("-structured")) == 0)
249 this->mode_ = MODE_STRUCTURED;
250 consumed = 1;
252 else if (ACE_OS::strcasecmp (argv[0], ACE_TEXT ("-sequence")) == 0)
254 this->mode_ = MODE_SEQUENCE;
255 consumed = 1;
257 else if (ACE_OS::strcasecmp (argv[0], ACE_TEXT ("-channel")) == 0)
259 this->channel_file_= argv[1];
260 consumed = 2;
262 else if (ACE_OS::strcasecmp (argv[0], ACE_TEXT ("-send")) == 0 && argc > 1)
264 this->send_ = ACE_OS::atoi (argv[1]);
265 consumed = 2;
267 else if (ACE_OS::strcasecmp (argv[0], ACE_TEXT ("-pause")) == 0 && argc > 1)
269 this->pause_ = ACE_OS::atoi (argv[1]);
270 consumed = 2;
272 else if (ACE_OS::strcasecmp (argv[0], ACE_TEXT ("-serial_number")) == 0)
274 this->serial_number_= ACE_OS::atoi (argv[1]);
275 consumed = 2;
277 else if (ACE_OS::strcasecmp (argv[0], ACE_TEXT ("-nonamesvc")) == 0)
279 this->use_naming_service_ = false;
280 consumed = 1;
282 else if (ACE_OS::strcasecmp (argv[0], ACE_TEXT("-disconnect")) == 0)
284 this->disconnect_on_exit_ = true;
285 consumed = 1;
288 return consumed;
291 void Supplier_Main::usage(FILE * log)const
293 //FUZZ: disable check_for_lack_ACE_OS
294 ACE_OS::fputs (
295 ACE_TEXT ("usage\n")
296 ACE_TEXT (" -channel filename Where to find a channel number.\n")
297 ACE_TEXT (" -any or -structured or -sequence\n")
298 ACE_TEXT (" What type of event to send (pick one, default is -any)\n")
299 ACE_TEXT (" -send n How many events of each type to send.\n")
300 ACE_TEXT (" -pause n Pause after sending n events. Write to file \"Supplier.paused\"\n")
301 ACE_TEXT (" -serial_number n What serial number to start with.\n")
302 ACE_TEXT (" -v Verbose output.\n")
303 ACE_TEXT (" -disconnect Disconnect from channel on exit (prevents reconnect.)\n")
304 ACE_TEXT (" -nonamesvc Don't use the name service to find EventChannelFactory\n")
305 , log);
306 //FUZZ: enable check_for_lack_ACE_OS
309 int Supplier_Main::init (int argc, ACE_TCHAR *argv[])
311 this->orb_ = CORBA::ORB_init (argc, argv);
313 if (0 != this->parse_args (argc, argv))
315 return -1;
318 CORBA::Object_ptr poa_object =
319 this->orb_->resolve_initial_references("RootPOA");
321 if (CORBA::is_nil (poa_object))
323 ACE_ERROR ((LM_ERROR,
324 ACE_TEXT (" (%P|%t) Unable to initialize the POA.\n")));
325 return -1;
328 this->root_poa_ =
329 PortableServer::POA::_narrow (poa_object);
331 PortableServer::POAManager_var poa_manager =
332 root_poa_->the_POAManager ();
334 poa_manager->activate ();
336 if (this->use_naming_service_ )
338 this->find_notify_factory ();
340 else
342 int ok = resolve_notify_factory ();
343 if (!ok)
345 return -1;
349 this->reconnecting_ = load_ids ();
351 init_event_channel ();
353 init_supplier_admin ();
355 switch (this->mode_)
357 case MODE_STRUCTURED:
359 init_structured_proxy_consumer ();
360 break;
362 case MODE_SEQUENCE:
364 init_sequence_proxy_consumer ();
365 break;
367 case MODE_ANY:
369 init_any_proxy_consumer ();
370 break;
372 default:
374 ACE_ERROR ((LM_ERROR,
375 ACE_TEXT ("(%P|%t) Supplier: Unknown event push mode.\n")
377 break;
380 this->reconnection_callback_.init (
381 this->root_poa_.in (),
382 this->ecf_.in ());
384 save_ids ();
385 return 0;
387 void
388 Supplier_Main::save_ids()
390 FILE *idf =
391 ACE_OS::fopen (this->id_file_.c_str (), "w");
393 if (idf != 0)
395 int endflag = 12345;
396 int imode = static_cast<int>(this->mode_);
397 ACE_OS::fprintf (idf,
398 "%d,%d,%d,%d,%d,%d,%d,\n",
399 static_cast<int> (imode),
400 static_cast<int> (ec_id_),
401 static_cast<int> (sa_id_),
402 static_cast<int> (structured_proxy_id_),
403 static_cast<int> (sequence_proxy_id_),
404 static_cast<int> (any_proxy_id_),
405 static_cast<int> (endflag));
406 ACE_OS::fclose (idf);
410 bool
411 Supplier_Main::load_ids()
413 bool ok = false;
414 FILE *idf =
415 ACE_OS::fopen (this->id_file_.c_str (), "r");
417 if (idf != 0)
419 int field = 0;
421 char buffer[100] = ""; // because ACE fgets doesn't put a null if the file is empty
422 ACE_OS::fgets (buffer, sizeof(buffer), idf);
423 ACE_OS::fclose (idf);
424 char * pb = buffer;
425 while (!ok && *pb != 0)
427 char * eb = ACE_OS::strchr (pb, ',');
428 char * nb = eb + 1;
429 if (eb == 0)
431 eb = pb + ACE_OS::strlen (pb);
432 nb = eb;
434 *eb = 0;
435 if (pb < eb)
437 int value = ACE_OS::atoi(pb);
438 switch (++field)
440 case 1:
441 this->mode_ = static_cast<Mode_T> (value);
442 break;
443 case 2:
444 this->ec_id_ = value;
445 break;
446 case 3:
447 this->sa_id_ = value;
448 break;
449 case 4:
450 this->structured_proxy_id_ = value;
451 break;
452 case 5:
453 this->sequence_proxy_id_ = value;
454 break;
455 case 6:
456 this->any_proxy_id_ = value;
457 break;
458 case 7:
459 ok = value == 12345;
460 break;
461 default:
462 ACE_OS::fprintf (stderr, ACE_TEXT ("Supplier: Warning: too many fields in saved id file.\n"));
463 ok = false;
464 break;
467 pb = nb;
470 return ok;
473 void
474 Supplier_Main::reconnect (
475 CosNotifyChannelAdmin::EventChannelFactory_ptr dest_factory)
477 this->ecf_ = CosNotifyChannelAdmin::EventChannelFactory::_duplicate (dest_factory);
478 this->reconnecting_ = true;
479 init_event_channel ();
481 init_supplier_admin ();
483 switch (this->mode_)
485 case MODE_STRUCTURED:
487 init_structured_proxy_consumer ();
488 break;
490 case MODE_SEQUENCE:
492 init_sequence_proxy_consumer ();
493 break;
495 case MODE_ANY:
497 init_any_proxy_consumer ();
498 break;
505 Supplier_Main::resolve_naming_service ()
507 // ignore redundant calls
508 if (CORBA::is_nil (this->naming_context_.in ()))
510 CORBA::Object_var naming_obj =
511 this->orb_->resolve_initial_references (NAMING_SERVICE_NAME);
513 this->naming_context_ =
514 CosNaming::NamingContext::_narrow (naming_obj.in ());
517 return !CORBA::is_nil (this->naming_context_.in ());
521 Supplier_Main::find_notify_factory ()
523 int status = this->resolve_naming_service ();
524 if (status)
526 CosNaming::Name name (1);
527 name.length (1);
528 name[0].id = CORBA::string_dup (NOTIFY_FACTORY_NAME);
530 CORBA::Object_var obj =
531 this->naming_context_->resolve (name);
533 this->ecf_ =
534 CosNotifyChannelAdmin::EventChannelFactory::_narrow (obj.in ());
536 return ! CORBA::is_nil (this->ecf_.in ());
540 Supplier_Main::resolve_notify_factory ()
542 CORBA::Object_var factory_obj =
543 this->orb_->resolve_initial_references (NOTIFY_FACTORY_NAME);
545 this->ecf_ =
546 CosNotifyChannelAdmin::EventChannelFactory::_narrow (
547 factory_obj.in ());
548 return ! CORBA::is_nil (this->ecf_.in ());
551 void
552 Supplier_Main::init_event_channel ()
554 bool ok = false;
555 if (this->reconnecting_)
559 this->ec_ = this->ecf_->get_event_channel (
560 this->ec_id_);
561 ok = ! CORBA::is_nil (this->ec_.in ());
562 if (ok && this->verbose_)
564 ACE_DEBUG ((LM_DEBUG,
565 ACE_TEXT ("(%P|%t) Supplier: Reconnect to event channel %d\n"),
566 static_cast<int>(this->ec_id_)
570 catch (...)
575 // if we don't have a channel yet, and a channel id file was specified
576 // try to read from it
577 if (!ok && this->channel_file_.length () > 0)
579 FILE * chf = ACE_OS::fopen (this->channel_file_.c_str (), "r");
580 if (chf != 0)
582 char buffer[100];
583 ACE_OS::fgets (buffer, sizeof(buffer), chf);
584 ACE_OS::fclose (chf);
585 this->ec_id_ = ACE_OS::atoi (buffer);
589 this->ec_ = this->ecf_->get_event_channel (
590 this->ec_id_);
591 ok = ! CORBA::is_nil (this->ec_.in ());
592 if (ok)
594 if (this->verbose_)
596 ACE_DEBUG ((LM_DEBUG,
597 ACE_TEXT ("(%P|%t) Supplier: Connect to Existing event channel %d\n"),
598 static_cast<int>(this->ec_id_)
601 // kill the channel filename so we don't overwrite the file
602 this->channel_file_ = ACE_TEXT("");
605 catch (...)
611 if (!ok)
613 CosNotification::QoSProperties qosprops (7);
614 qosprops.length (7);
615 CORBA::ULong i = 0;
616 qosprops[i].name = CORBA::string_dup(CosNotification::EventReliability);
617 qosprops[i++].value <<= CosNotification::Persistent;
618 qosprops[i].name = CORBA::string_dup(CosNotification::ConnectionReliability);
619 qosprops[i++].value <<= CosNotification::Persistent; // Required, or we won't persist much
620 qosprops[i].name = CORBA::string_dup(CosNotification::Priority);
621 qosprops[i++].value <<= CosNotification::HighestPriority;
622 qosprops[i].name = CORBA::string_dup(CosNotification::Timeout);
623 qosprops[i++].value <<= (TimeBase::TimeT) 42 * 1000000; // 4.2s
624 qosprops[i].name = CORBA::string_dup(CosNotification::StopTimeSupported);
625 qosprops[i++].value <<= CORBA::Any::from_boolean(1);
626 qosprops[i].name = CORBA::string_dup(CosNotification::MaximumBatchSize);
627 qosprops[i++].value <<= (CORBA::Long) 2;
628 qosprops[i].name = CORBA::string_dup(CosNotification::PacingInterval);
629 qosprops[i++].value <<= (TimeBase::TimeT) 50 * 10000; // 50ms
631 CosNotification::AdminProperties adminprops(4);
632 adminprops.length (4);
633 i = 0;
634 adminprops[i].name = CORBA::string_dup(CosNotification::MaxQueueLength);
635 adminprops[i++].value <<= (CORBA::Long) 1234;
636 adminprops[i].name = CORBA::string_dup(CosNotification::MaxConsumers);
637 adminprops[i++].value <<= (CORBA::Long) 1000;
638 adminprops[i].name = CORBA::string_dup(CosNotification::MaxSuppliers);
639 adminprops[i++].value <<= (CORBA::Long) 1000;
640 adminprops[i].name = CORBA::string_dup(CosNotification::RejectNewEvents);
641 adminprops[i++].value <<= CORBA::Any::from_boolean(1);
643 ec_ = this->ecf_->create_channel (
644 qosprops,
645 adminprops,
646 this->ec_id_);
647 ok = ! CORBA::is_nil (ec_.in ());
648 if (ok && this->verbose_)
650 ACE_DEBUG ((LM_DEBUG,
651 ACE_TEXT ("(%P|%t) Supplier: Create event channel %d\n"),
652 static_cast<int> (this->ec_id_)
657 // save channel id
658 if (ok && this->channel_file_.length() > 0)
660 FILE * chf = ACE_OS::fopen (this->channel_file_.c_str (), "w");
661 if (chf != 0)
663 ACE_OS::fprintf (chf, "%d\n", static_cast<int> (this->ec_id_));
664 ACE_OS::fclose (chf);
669 CosNotifyChannelAdmin::AdminID default_admin_id = static_cast<CosNotifyChannelAdmin::AdminID>(-1);
671 void
672 Supplier_Main::init_supplier_admin ()
674 bool ok = false;
675 if (this->reconnecting_ && this->sa_id_ != default_admin_id)
679 this->sa_ = this->ec_->get_supplieradmin(
680 this->sa_id_);
681 ok = ! CORBA::is_nil (this->sa_.in ());
682 if (ok && this->verbose_)
684 ACE_DEBUG ((LM_DEBUG,
685 ACE_TEXT ("(%P|%t) Supplier: Reconnect to supplier admin %d\n"),
686 static_cast<int>(this->sa_id_)
690 catch (...)
695 if (!ok)
699 this->sa_ = this->ec_->default_supplier_admin ();
700 ok = ! CORBA::is_nil (this->sa_.in ());
701 this->sa_id_ = default_admin_id;
702 if (ok && this->verbose_)
704 ACE_DEBUG ((LM_DEBUG,
705 ACE_TEXT ("(%P|%t) Supplier: Using default supplier admin\n")
708 else if (this->verbose_)
710 ACE_DEBUG ((LM_DEBUG,
711 ACE_TEXT ("(%P|%t) Supplier: No default supplier admin\n")
715 catch (...)
720 if (!ok)
722 this->sa_ = this->ec_->new_for_suppliers(
723 CosNotifyChannelAdmin::OR_OP,
724 this->sa_id_);
725 ok = ! CORBA::is_nil (this->sa_.in ());
726 if (ok && this->verbose_)
728 ACE_DEBUG ((LM_DEBUG,
729 ACE_TEXT ("(%P|%t) Supplier: Create new supplier admin %d\n"),
730 static_cast<int>(this->sa_id_)
736 void
737 Supplier_Main::init_structured_proxy_consumer ()
739 bool ok = false;
740 CosNotifyChannelAdmin::ProxyConsumer_var proxy;
741 if (this->reconnecting_)
745 proxy = this->sa_->get_proxy_consumer (this->structured_proxy_id_);
746 ok = ! CORBA::is_nil (proxy.in ());
747 if (ok && this->verbose_)
749 ACE_DEBUG ((LM_DEBUG,
750 ACE_TEXT ("(%P|%t) Supplier: Reconnect to proxy supplier %d\n"),
751 static_cast<int>(this->structured_proxy_id_)
755 catch (...)
760 if (!ok)
762 proxy = this->sa_->obtain_notification_push_consumer(
763 CosNotifyChannelAdmin::STRUCTURED_EVENT,
764 this->structured_proxy_id_);
765 ok = ! CORBA::is_nil (proxy.in ());
766 if (ok && this->verbose_)
768 ACE_DEBUG ((LM_DEBUG,
769 ACE_TEXT ("(%P|%t) Supplier: Create new proxy %d\n"),
770 static_cast<int>(this->structured_proxy_id_)
774 this->structured_proxy_push_consumer_ =
775 CosNotifyChannelAdmin::StructuredProxyPushConsumer::_narrow(proxy.in ());
777 if (CORBA::is_nil (this->structured_proxy_push_consumer_.in ()))
779 ACE_ERROR ((LM_ERROR,
780 ACE_TEXT ("(%P|%t) init_structured_proxy_consumer received nil ProxyConsumer\n")
782 throw CORBA::OBJECT_NOT_EXIST ();
784 if (CORBA::is_nil (this->structured_push_supplier_ref_.in ()))
786 PortableServer::ObjectId_var push_supplier_id =
787 this->root_poa_->activate_object (
788 &(this->structured_push_supplier_));
790 CORBA::Object_var obj =
791 this->root_poa_->id_to_reference (push_supplier_id.in ());
793 this->structured_push_supplier_ref_ =
794 CosNotifyComm::StructuredPushSupplier::_narrow (obj.in ());
796 if (CORBA::is_nil (structured_push_supplier_ref_.in ()))
798 ACE_ERROR ((LM_ERROR,
799 ACE_TEXT ("(%P|%t) Supplier: Received wrong type of push consumer proxy %d\n"),
800 static_cast<int>(this->structured_proxy_id_)
803 throw CORBA::UNKNOWN();
806 this->structured_proxy_push_consumer_->connect_structured_push_supplier (
807 structured_push_supplier_ref_.in ());
810 void
811 Supplier_Main::init_sequence_proxy_consumer ()
813 bool ok = false;
814 CosNotifyChannelAdmin::ProxyConsumer_var proxy;
815 if (this->reconnecting_)
819 proxy = this->sa_->get_proxy_consumer(
820 this->sequence_proxy_id_);
821 ok = ! CORBA::is_nil (proxy.in ());
822 if (ok && this->verbose_)
824 ACE_DEBUG ((LM_DEBUG,
825 ACE_TEXT ("(%P|%t) Supplier: Reconnect to proxy %d\n"),
826 static_cast<int>(this->sequence_proxy_id_)
830 catch (...)
835 if (!ok)
837 proxy = this->sa_->obtain_notification_push_consumer(
838 CosNotifyChannelAdmin::SEQUENCE_EVENT,
839 this->sequence_proxy_id_);
840 ok = ! CORBA::is_nil (proxy.in ());
841 if (ok && this->verbose_)
843 ACE_DEBUG ((LM_DEBUG,
844 ACE_TEXT ("(%P|%t) Supplier: Create new proxy %d\n"),
845 static_cast<int>(this->sequence_proxy_id_)
849 this->sequence_proxy_push_consumer_ =
850 CosNotifyChannelAdmin::SequenceProxyPushConsumer::_narrow(proxy.in ());
852 if (CORBA::is_nil (this->sequence_proxy_push_consumer_.in ()))
854 ACE_ERROR ((LM_ERROR,
855 ACE_TEXT ("(%P|%t) Supplier: Received wrong type of push consumer proxy %d\n"),
856 static_cast<int>(this->sequence_proxy_id_)
858 throw CORBA::UNKNOWN();
861 if (CORBA::is_nil (this->sequence_push_supplier_ref_.in ()))
863 PortableServer::ObjectId_var push_supplier_id =
864 this->root_poa_->activate_object (
865 &(this->sequence_push_supplier_));
867 CORBA::Object_var obj =
868 this->root_poa_->id_to_reference (push_supplier_id.in ());
870 this->sequence_push_supplier_ref_ =
871 CosNotifyComm::SequencePushSupplier::_narrow (obj.in ());
873 if (CORBA::is_nil (sequence_push_supplier_ref_.in ()))
875 ACE_ERROR ((LM_ERROR,
876 ACE_TEXT ("(%P|%t) Supplier: Received wrong type of push consumer proxy %d\n"),
877 static_cast<int>(this->sequence_proxy_id_)
879 throw CORBA::UNKNOWN();
882 this->sequence_proxy_push_consumer_->connect_sequence_push_supplier (
883 sequence_push_supplier_ref_.in ());
886 void
887 Supplier_Main::init_any_proxy_consumer ()
889 bool ok = false;
890 CosNotifyChannelAdmin::ProxyConsumer_var proxy;
891 if (this->reconnecting_)
895 proxy = this->sa_->get_proxy_consumer(
896 this->any_proxy_id_);
897 ok = ! CORBA::is_nil (proxy.in ());
898 if (ok && this->verbose_)
900 ACE_DEBUG ((LM_DEBUG,
901 ACE_TEXT ("(%P|%t) Supplier: Reconnect to proxy %d\n"),
902 static_cast<int>(this->any_proxy_id_)
906 catch (...)
911 if (!ok)
913 proxy = this->sa_->obtain_notification_push_consumer(
914 CosNotifyChannelAdmin::ANY_EVENT,
915 this->any_proxy_id_);
916 ok = ! CORBA::is_nil (proxy.in ());
917 if (ok && this->verbose_)
919 ACE_DEBUG ((LM_DEBUG,
920 ACE_TEXT ("(%P|%t) Supplier: Create new proxy %d\n"),
921 static_cast<int>(this->any_proxy_id_)
925 this->any_proxy_push_consumer_ =
926 CosNotifyChannelAdmin::ProxyPushConsumer::_narrow(proxy.in ());
928 if (CORBA::is_nil (this->any_proxy_push_consumer_.in ()))
930 ACE_ERROR ((LM_ERROR,
931 ACE_TEXT ("(%P|%t) Supplier: Received wrong type of push consumer proxy %d\n"),
932 static_cast<int>(this->any_proxy_id_)
934 throw CORBA::UNKNOWN();
937 if (CORBA::is_nil (this->any_push_supplier_ref_.in ()))
939 PortableServer::ObjectId_var push_supplier_id =
940 this->root_poa_->activate_object (
941 &(this->any_push_supplier_));
943 CORBA::Object_var obj =
944 this->root_poa_->id_to_reference (push_supplier_id.in ());
946 this->any_push_supplier_ref_ =
947 CosNotifyComm::PushSupplier::_narrow (obj.in ());
949 if (CORBA::is_nil (any_push_supplier_ref_.in ()))
951 ACE_ERROR ((LM_ERROR,
952 ACE_TEXT ("(%P|%t) Supplier: Received wrong type of push consumer proxy %d\n"),
953 static_cast<int>(this->sequence_proxy_id_)
955 throw CORBA::UNKNOWN();
958 this->any_proxy_push_consumer_->connect_any_push_supplier (
959 any_push_supplier_ref_.in ());
962 int Supplier_Main::fini ()
964 if (this->disconnect_on_exit_)
966 this->reconnection_callback_.fini ();
967 if (!CORBA::is_nil (this->structured_proxy_push_consumer_.in ()))
969 if (this->verbose_)
971 ACE_DEBUG ((LM_DEBUG,
972 ACE_TEXT ("(%P|%t) Disconnecting structured\n")
975 this->structured_proxy_push_consumer_->disconnect_structured_push_consumer ();
977 if (!CORBA::is_nil (this->sequence_proxy_push_consumer_.in ()))
979 if (this->verbose_)
981 ACE_DEBUG ((LM_DEBUG,
982 ACE_TEXT ("(%P|%t) Disconnecting sequence\n")
985 this->sequence_proxy_push_consumer_->disconnect_sequence_push_consumer ();
987 if (!CORBA::is_nil (this->any_proxy_push_consumer_.in ()))
989 if (this->verbose_)
991 ACE_DEBUG ((LM_DEBUG,
992 ACE_TEXT ("(%P|%t) Disconnecting any\n")
995 this->any_proxy_push_consumer_->disconnect_push_consumer ();
997 if (!CORBA::is_nil (this->sa_.in ()) && this->sa_id_ != default_admin_id)
999 if (this->verbose_)
1001 ACE_DEBUG ((LM_DEBUG,
1002 ACE_TEXT ("(%P|%t) destroy admin %d\n"),
1003 static_cast<int>(this->sa_id_)
1006 this->sa_->destroy();
1009 this->orb_->shutdown ();
1010 return 0;
1013 void Supplier_Main::send_structured_event ()
1015 CosNotification::StructuredEvent event;
1017 // EventHeader.
1019 // FixedEventHeader.
1020 // EventType.
1021 // string.
1022 event.header.fixed_header.event_type.domain_name = CORBA::string_dup("*");
1023 // string
1024 event.header.fixed_header.event_type.type_name = CORBA::string_dup("*");
1025 // string
1026 event.header.fixed_header.event_name = CORBA::string_dup("reconnect_test");
1028 // OptionalHeaderFields.
1029 // PropertySeq.
1030 // sequence<Property>: string name, any value
1031 CosNotification::PropertySeq& qos = event.header.variable_header;
1032 qos.length (2);
1033 qos[0].name = CORBA::string_dup (CosNotification::Priority);
1034 qos[0].value <<= CosNotification::LowestPriority;
1035 qos[1].name = CORBA::string_dup (CosNotification::EventReliability);
1036 qos[1].value <<= CosNotification::Persistent;
1038 // FilterableEventBody
1039 // PropertySeq
1040 // sequence<Property>: string name, any value
1041 event.filterable_data.length (1);
1042 event.filterable_data[0].name = CORBA::string_dup("serial_number");
1043 event.filterable_data[0].value <<= CORBA::ULong ( this->serial_number_);
1045 // any
1046 event.remainder_of_body <<= CORBA::ULong ( this->serial_number_);
1048 if (this->verbose_)
1050 ACE_DEBUG ((LM_DEBUG,
1051 ACE_TEXT ("(%P,%t) Supplier push structured event %d\n"),
1052 static_cast<int>(serial_number_)
1056 this->structured_proxy_push_consumer_->push_structured_event (event);
1059 void Supplier_Main::send_sequence_event ()
1061 CosNotification::EventBatch event_batch(1);
1062 event_batch.length (1);
1063 CosNotification::StructuredEvent & event = event_batch[0];
1065 // EventHeader.
1067 // FixedEventHeader.
1068 // EventType.
1069 // string.
1070 event.header.fixed_header.event_type.domain_name = CORBA::string_dup("*");
1071 // string
1072 event.header.fixed_header.event_type.type_name = CORBA::string_dup("*");
1073 // string
1074 event.header.fixed_header.event_name = CORBA::string_dup("reconnect_test");
1076 // OptionalHeaderFields.
1077 // PropertySeq.
1078 // sequence<Property>: string name, any value
1079 CosNotification::PropertySeq& qos = event.header.variable_header;
1080 qos.length (2);
1081 qos[0].name = CORBA::string_dup (CosNotification::Priority);
1082 qos[0].value <<= CosNotification::LowestPriority;
1083 qos[1].name = CORBA::string_dup (CosNotification::EventReliability);
1084 qos[1].value <<= CosNotification::Persistent;
1086 // FilterableEventBody
1087 // PropertySeq
1088 // sequence<Property>: string name, any value
1089 event.filterable_data.length (1);
1090 event.filterable_data[0].name = CORBA::string_dup("serial_number");
1091 event.filterable_data[0].value <<= CORBA::ULong ( this->serial_number_);
1093 // any
1094 event.remainder_of_body <<= CORBA::ULong ( this->serial_number_);
1096 if (this->verbose_)
1098 ACE_DEBUG ((LM_DEBUG,
1099 ACE_TEXT ("(%P,%t) Supplier push sequence events %d\n"),
1100 static_cast<int>(this->serial_number_)
1104 this->sequence_proxy_push_consumer_->push_structured_events (event_batch);
1107 void Supplier_Main::send_any_event ()
1109 CORBA::Any event;
1110 event <<= CORBA::ULong (this->serial_number_);
1112 if (this->verbose_)
1114 ACE_DEBUG ((LM_DEBUG,
1115 ACE_TEXT ("(%P,%t) Supplier push any event %d\n"),
1116 static_cast<int>(this->serial_number_)
1120 this->any_proxy_push_consumer_->push (event);
1123 int Supplier_Main::run ()
1125 int result = 0;
1126 bool paused = false;
1127 size_t reconnections = 0;
1129 size_t send = 0;
1130 while ( send < this->send_)
1132 ACE_Time_Value tv(0, 100 * 1000);
1133 orb_->run(tv);
1135 if (this->pause_ != 0 && send == this->pause_)
1137 if (this->verbose_)
1139 ACE_DEBUG ((LM_DEBUG,
1140 ACE_TEXT ("(%P|%t) Supplier paused after %d events\n"),
1141 static_cast<int>(this->pause_)
1144 reconnections = this->reconnection_callback_.reconnect_count ();
1145 FILE * pause_file = ACE_OS::fopen ("Supplier.paused", "w");
1146 if (pause_file != 0)
1148 ACE_OS::fputs (ACE_TEXT ("paused\n"), pause_file);
1149 ACE_OS::fclose (pause_file);
1151 paused = true;
1152 this->pause_ = 0;
1154 if (paused)
1156 if (this->reconnection_callback_.reconnect_count () != reconnections)
1158 if (this->verbose_)
1160 ACE_DEBUG ((LM_DEBUG,
1161 ACE_TEXT ("(%P|%t) Supplier no longer paused. Next s# %d\n"),
1162 static_cast<int>(this->serial_number_)
1165 paused = false;
1169 if (!paused)
1171 switch (this->mode_)
1173 case MODE_STRUCTURED:
1175 send_structured_event ();
1176 break;
1178 case MODE_SEQUENCE:
1180 send_sequence_event ();
1181 break;
1183 case MODE_ANY:
1185 send_any_event ();
1186 break;
1189 this->serial_number_ += 1;
1190 send += 1;
1193 return result;
1198 ACE_TMAIN(int argc, ACE_TCHAR *argv[])
1200 int result = -1;
1201 Supplier_Main app;
1204 result = app.init(argc, argv);
1206 if (result == 0)
1208 result = app.run ();
1210 if (result == 0)
1212 app.fini ();
1215 catch (const CORBA::Exception& ex)
1217 ex._tao_print_exception ("Error: Supplier::main\t\n");
1218 result = -1;
1220 ACE_DEBUG ((LM_DEBUG,
1221 ACE_TEXT ("(%P,%t) Supplier exits: code %d\n"),
1222 result
1224 return result;