Merge pull request #1551 from DOCGroup/plm_jira_333
[ACE_TAO.git] / TAO / orbsvcs / tests / Notify / Reconnecting / Supplier.cpp
blobc550f4178fefd17858714605f8c28504c5d82350
1 #include "Supplier.h"
2 #include "orbsvcs/CosNotifyChannelAdminC.h"
3 #include "tao/debug.h"
4 #include "tao/PortableServer/PortableServerC.h"
5 #include "tao/TimeBaseC.h"
6 #include "ace/OS_NS_stdio.h"
7 #include "ace/OS_NS_strings.h"
11 static const char NOTIFY_FACTORY_NAME[] = "NotifyEventChannelFactory";
12 static const char NAMING_SERVICE_NAME[] = "NameService";
14 ///////////////////////////
15 // StructuredPushSupplier_i
17 void
18 StructuredPushSupplier_i::subscription_change (
19 const CosNotification::EventTypeSeq & added,
20 const CosNotification::EventTypeSeq & removed)
22 ACE_UNUSED_ARG (added);
23 ACE_UNUSED_ARG (removed);
24 ACE_DEBUG ((LM_DEBUG,
25 ACE_TEXT ("(%P,%t) Supplier StructuredPushSupplier received subscription change\n")
26 ));
30 void
31 StructuredPushSupplier_i::disconnect_structured_push_supplier ()
33 ACE_DEBUG ((LM_DEBUG,
34 ACE_TEXT ("(%P,%t) Supplier StructuredPushSupplier received disconnect\n")
35 ));
38 ///////////////////////////
39 // SequencePushSupplier_i
41 void
42 SequencePushSupplier_i::subscription_change (
43 const CosNotification::EventTypeSeq & added,
44 const CosNotification::EventTypeSeq & removed)
46 ACE_UNUSED_ARG (added);
47 ACE_UNUSED_ARG (removed);
48 ACE_DEBUG ((LM_DEBUG,
49 ACE_TEXT ("(%P,%t) Supplier SequencePushSupplier received subscription change\n")
50 ));
53 void
54 SequencePushSupplier_i::disconnect_sequence_push_supplier ()
56 ACE_DEBUG ((LM_DEBUG,
57 ACE_TEXT ("(%P,%t) Supplier SequencePushSupplier received disconnect\n")
58 ));
61 ///////////////////////////
62 // AnyPushSupplier_i
64 void
65 AnyPushSupplier_i::subscription_change (
66 const CosNotification::EventTypeSeq & added,
67 const CosNotification::EventTypeSeq & removed)
69 ACE_UNUSED_ARG (added);
70 ACE_UNUSED_ARG (removed);
71 ACE_DEBUG ((LM_DEBUG,
72 ACE_TEXT ("(%P,%t) Supplier AnyPushSupplier received subscription change\n")
73 ));
77 void
78 AnyPushSupplier_i::disconnect_push_supplier ()
80 ACE_DEBUG ((LM_DEBUG,
81 ACE_TEXT ("(%P,%t) Suppleir AnyPushSupplier received disconnect\n")
82 ));
85 /////////////////////////
86 // ReconnectionCallback_i
88 ReconnectionCallback_i::ReconnectionCallback_i (Supplier_Main & supplier_main)
89 : supplier_main_ (supplier_main)
90 , id_is_valid_ (false)
91 , reconnect_count_ (0)
95 size_t
96 ReconnectionCallback_i::reconnect_count () const
98 return this->reconnect_count_;
101 void
102 ReconnectionCallback_i::reconnect (CORBA::Object_ptr reconnection)
104 ACE_DEBUG ((LM_DEBUG,
105 ACE_TEXT ("(%P|%t) Supplier received reconnection request\n")
107 this->ecf_ = CosNotifyChannelAdmin::EventChannelFactory::_narrow (reconnection);
108 if (!CORBA::is_nil (this->ecf_.in ()))
110 this->supplier_main_.reconnect (this->ecf_.in ());
111 this->reconnect_count_ += 1;
113 else
115 ACE_DEBUG ((LM_DEBUG,
116 ACE_TEXT ("(%P|%t) Supplier reconnection request failed: wrong object type\n")
121 CORBA::Boolean
122 ReconnectionCallback_i::is_alive (void)
124 return true;
127 ReconnectionCallback_i::~ReconnectionCallback_i ()
129 // normally you would disconnect from registry here, but
130 // to simulate a failure, we exit without cleaning up
131 // if the fini method is not called.
135 void
136 ReconnectionCallback_i::fini (void)
138 if (this->id_is_valid_)
140 NotifyExt::ReconnectionRegistry_var registry =
141 NotifyExt::ReconnectionRegistry::_narrow (this->ecf_.in ());
143 registry->unregister_callback (this->callback_id_);
144 this->id_is_valid_ = false;
148 void
149 ReconnectionCallback_i::init (
150 PortableServer::POA_ptr poa,
151 CosNotifyChannelAdmin::EventChannelFactory_ptr ecf)
153 this->ecf_ = CosNotifyChannelAdmin::EventChannelFactory::_duplicate (ecf);
154 PortableServer::ObjectId_var reconnection_callback_id =
155 poa->activate_object (this);
157 CORBA::Object_var obj =
158 poa->id_to_reference (reconnection_callback_id.in ());
160 NotifyExt::ReconnectionCallback_var callback =
161 NotifyExt::ReconnectionCallback::_narrow (obj.in ());
163 NotifyExt::ReconnectionRegistry_var registry =
164 NotifyExt::ReconnectionRegistry::_narrow (ecf);
166 this->callback_id_ = registry->register_callback (callback.in ());
167 this->id_is_valid_ = true;
172 /////////////////
173 // Supplier_Main
175 Supplier_Main::Supplier_Main ()
176 : verbose_ (false)
177 , mode_ (MODE_ANY)
178 , send_ (10)
179 , use_naming_service_ (true)
180 , serial_number_ (0)
181 , disconnect_on_exit_ (false)
182 , id_file_ (ACE_TEXT ("supplier.ids"))
183 , pause_ (0)
184 , ec_id_ (0)
185 , sa_id_(0)
186 , structured_proxy_id_(0)
187 , sequence_proxy_id_(0)
188 , any_proxy_id_(0)
189 , reconnection_callback_ (*this)
190 , reconnecting_ (false)
194 Supplier_Main::~Supplier_Main ()
199 Supplier_Main::parse_args (int argc, ACE_TCHAR *argv[])
201 int result = 0;
202 int narg = 1;
203 bool corba_arg = false;
204 while (narg < argc && result == 0)
206 int consumed = parse_single_arg (argc - narg, &argv[narg]);
207 if ( consumed > 0)
209 narg += consumed;
210 corba_arg = false;
212 else if (ACE_OS::strncmp (argv[narg], ACE_TEXT("-ORB"), 4) == 0)
214 corba_arg = true;
216 else if (corba_arg)
218 // previous argument was a ORB arg.
219 // current argument is unrecognized
220 // assume the ORB eats this arg
221 narg += 1;
222 corba_arg = false;
224 else
226 ACE_OS::fprintf(stderr, "Unrecognized argument: %s\n",
227 ACE_TEXT_ALWAYS_CHAR (argv[narg]));
228 usage (stderr);
229 result = -1;
232 return result;
236 Supplier_Main::parse_single_arg (int argc, ACE_TCHAR *argv[])
238 int consumed = 0;
239 if (ACE_OS::strcasecmp (argv[0], ACE_TEXT ("-v")) == 0)
241 this->verbose_ = true;
242 consumed = 1;
244 else if (ACE_OS::strcasecmp (argv[0], ACE_TEXT ("-any")) == 0)
246 this->mode_ = MODE_ANY;
247 consumed = 1;
249 else if (ACE_OS::strcasecmp (argv[0], ACE_TEXT ("-structured")) == 0)
251 this->mode_ = MODE_STRUCTURED;
252 consumed = 1;
254 else if (ACE_OS::strcasecmp (argv[0], ACE_TEXT ("-sequence")) == 0)
256 this->mode_ = MODE_SEQUENCE;
257 consumed = 1;
259 else if (ACE_OS::strcasecmp (argv[0], ACE_TEXT ("-channel")) == 0)
261 this->channel_file_= argv[1];
262 consumed = 2;
264 else if (ACE_OS::strcasecmp (argv[0], ACE_TEXT ("-send")) == 0 && argc > 1)
266 this->send_ = ACE_OS::atoi (argv[1]);
267 consumed = 2;
269 else if (ACE_OS::strcasecmp (argv[0], ACE_TEXT ("-pause")) == 0 && argc > 1)
271 this->pause_ = ACE_OS::atoi (argv[1]);
272 consumed = 2;
274 else if (ACE_OS::strcasecmp (argv[0], ACE_TEXT ("-serial_number")) == 0)
276 this->serial_number_= ACE_OS::atoi (argv[1]);
277 consumed = 2;
279 else if (ACE_OS::strcasecmp (argv[0], ACE_TEXT ("-nonamesvc")) == 0)
281 this->use_naming_service_ = false;
282 consumed = 1;
284 else if (ACE_OS::strcasecmp (argv[0], ACE_TEXT("-disconnect")) == 0)
286 this->disconnect_on_exit_ = true;
287 consumed = 1;
290 return consumed;
293 void Supplier_Main::usage(FILE * log)const
295 //FUZZ: disable check_for_lack_ACE_OS
296 ACE_OS::fputs (
297 ACE_TEXT ("usage\n")
298 ACE_TEXT (" -channel filename Where to find a channel number.\n")
299 ACE_TEXT (" -any or -structured or -sequence\n")
300 ACE_TEXT (" What type of event to send (pick one, default is -any)\n")
301 ACE_TEXT (" -send n How many events of each type to send.\n")
302 ACE_TEXT (" -pause n Pause after sending n events. Write to file \"Supplier.paused\"\n")
303 ACE_TEXT (" -serial_number n What serial number to start with.\n")
304 ACE_TEXT (" -v Verbose output.\n")
305 ACE_TEXT (" -disconnect Disconnect from channel on exit (prevents reconnect.)\n")
306 ACE_TEXT (" -nonamesvc Don't use the name service to find EventChannelFactory\n")
307 , log);
308 //FUZZ: enable check_for_lack_ACE_OS
311 int Supplier_Main::init (int argc, ACE_TCHAR *argv[])
313 this->orb_ = CORBA::ORB_init (argc, argv);
315 if (0 != this->parse_args (argc, argv))
317 return -1;
320 CORBA::Object_ptr poa_object =
321 this->orb_->resolve_initial_references("RootPOA");
323 if (CORBA::is_nil (poa_object))
325 ACE_ERROR ((LM_ERROR,
326 ACE_TEXT (" (%P|%t) Unable to initialize the POA.\n")));
327 return -1;
330 this->root_poa_ =
331 PortableServer::POA::_narrow (poa_object);
333 PortableServer::POAManager_var poa_manager =
334 root_poa_->the_POAManager ();
336 poa_manager->activate ();
338 if (this->use_naming_service_ )
340 this->find_notify_factory ();
342 else
344 int ok = resolve_notify_factory ();
345 if (!ok)
347 return -1;
351 this->reconnecting_ = load_ids ();
353 init_event_channel ();
355 init_supplier_admin ();
357 switch (this->mode_)
359 case MODE_STRUCTURED:
361 init_structured_proxy_consumer ();
362 break;
364 case MODE_SEQUENCE:
366 init_sequence_proxy_consumer ();
367 break;
369 case MODE_ANY:
372 init_any_proxy_consumer ();
373 break;
375 default:
377 ACE_ERROR ((LM_ERROR,
378 ACE_TEXT ("(%P|%t) Supplier: Unknown event push mode.\n")
380 break;
383 this->reconnection_callback_.init (
384 this->root_poa_.in (),
385 this->ecf_.in ());
387 save_ids ();
388 return 0;
390 void
391 Supplier_Main::save_ids()
393 FILE *idf =
394 ACE_OS::fopen (this->id_file_.c_str (), "w");
396 if (idf != 0)
398 int endflag = 12345;
399 int imode = static_cast<int>(this->mode_);
400 ACE_OS::fprintf (idf,
401 "%d,%d,%d,%d,%d,%d,%d,\n",
402 static_cast<int> (imode),
403 static_cast<int> (ec_id_),
404 static_cast<int> (sa_id_),
405 static_cast<int> (structured_proxy_id_),
406 static_cast<int> (sequence_proxy_id_),
407 static_cast<int> (any_proxy_id_),
408 static_cast<int> (endflag) );
409 ACE_OS::fclose (idf);
413 bool
414 Supplier_Main::load_ids()
416 bool ok = false;
417 FILE *idf =
418 ACE_OS::fopen (this->id_file_.c_str (), "r");
420 if (idf != 0)
422 int field = 0;
424 char buffer[100] = ""; // because ACE fgets doesn't put a null if the file is empty
425 ACE_OS::fgets (buffer, sizeof(buffer), idf);
426 ACE_OS::fclose (idf);
427 char * pb = buffer;
428 while (!ok && *pb != 0)
430 char * eb = ACE_OS::strchr (pb, ',');
431 char * nb = eb + 1;
432 if (eb == 0)
434 eb = pb + ACE_OS::strlen (pb);
435 nb = eb;
437 *eb = 0;
438 if (pb < eb)
440 int value = ACE_OS::atoi(pb);
441 switch (++field)
443 case 1:
444 this->mode_ = static_cast<Mode_T> (value);
445 break;
446 case 2:
447 this->ec_id_ = value;
448 break;
449 case 3:
450 this->sa_id_ = value;
451 break;
452 case 4:
453 this->structured_proxy_id_ = value;
454 break;
455 case 5:
456 this->sequence_proxy_id_ = value;
457 break;
458 case 6:
459 this->any_proxy_id_ = value;
460 break;
461 case 7:
462 ok = value == 12345;
463 break;
464 default:
465 ACE_OS::fprintf (stderr, ACE_TEXT ("Supplier: Warning: too many fields in saved id file.\n"));
466 ok = false;
467 break;
470 pb = nb;
473 return ok;
476 void
477 Supplier_Main::reconnect (
478 CosNotifyChannelAdmin::EventChannelFactory_ptr dest_factory)
480 this->ecf_ = CosNotifyChannelAdmin::EventChannelFactory::_duplicate (dest_factory);
481 this->reconnecting_ = true;
482 init_event_channel ();
484 init_supplier_admin ();
486 switch (this->mode_)
488 case MODE_STRUCTURED:
490 init_structured_proxy_consumer ();
491 break;
493 case MODE_SEQUENCE:
495 init_sequence_proxy_consumer ();
496 break;
498 case MODE_ANY:
500 init_any_proxy_consumer ();
501 break;
508 Supplier_Main::resolve_naming_service (void)
510 // ignore redundant calls
511 if (CORBA::is_nil (this->naming_context_.in ()))
513 CORBA::Object_var naming_obj =
514 this->orb_->resolve_initial_references (NAMING_SERVICE_NAME);
516 this->naming_context_ =
517 CosNaming::NamingContext::_narrow (naming_obj.in ());
520 return !CORBA::is_nil (this->naming_context_.in ());
524 Supplier_Main::find_notify_factory (void)
526 int status = this->resolve_naming_service ();
527 if (status)
529 CosNaming::Name name (1);
530 name.length (1);
531 name[0].id = CORBA::string_dup (NOTIFY_FACTORY_NAME);
533 CORBA::Object_var obj =
534 this->naming_context_->resolve (name);
536 this->ecf_ =
537 CosNotifyChannelAdmin::EventChannelFactory::_narrow (obj.in ());
539 return ! CORBA::is_nil (this->ecf_.in ());
543 Supplier_Main::resolve_notify_factory (void)
545 CORBA::Object_var factory_obj =
546 this->orb_->resolve_initial_references (NOTIFY_FACTORY_NAME);
548 this->ecf_ =
549 CosNotifyChannelAdmin::EventChannelFactory::_narrow (
550 factory_obj.in ());
551 return ! CORBA::is_nil (this->ecf_.in ());
554 void
555 Supplier_Main::init_event_channel (void)
557 bool ok = false;
558 if (this->reconnecting_)
562 this->ec_ = this->ecf_->get_event_channel (
563 this->ec_id_);
564 ok = ! CORBA::is_nil (this->ec_.in ());
565 if (ok && this->verbose_)
567 ACE_DEBUG ((LM_DEBUG,
568 ACE_TEXT ("(%P|%t) Supplier: Reconnect to event channel %d\n"),
569 static_cast<int>(this->ec_id_)
573 catch (...)
578 // if we don't have a channel yet, and a channel id file was specified
579 // try to read from it
580 if (!ok && this->channel_file_.length () > 0)
582 FILE * chf = ACE_OS::fopen (this->channel_file_.c_str (), "r");
583 if (chf != 0)
585 char buffer[100];
586 ACE_OS::fgets (buffer, sizeof(buffer), chf);
587 ACE_OS::fclose (chf);
588 this->ec_id_ = ACE_OS::atoi (buffer);
592 this->ec_ = this->ecf_->get_event_channel (
593 this->ec_id_);
594 ok = ! CORBA::is_nil (this->ec_.in ());
595 if (ok)
597 if (this->verbose_)
599 ACE_DEBUG ((LM_DEBUG,
600 ACE_TEXT ("(%P|%t) Supplier: Connect to Existing event channel %d\n"),
601 static_cast<int>(this->ec_id_)
604 // kill the channel filename so we don't overwrite the file
605 this->channel_file_ = ACE_TEXT("");
608 catch (...)
614 if (!ok)
616 CosNotification::QoSProperties qosprops (7);
617 qosprops.length (7);
618 CORBA::ULong i = 0;
619 qosprops[i].name = CORBA::string_dup(CosNotification::EventReliability);
620 qosprops[i++].value <<= CosNotification::Persistent;
621 qosprops[i].name = CORBA::string_dup(CosNotification::ConnectionReliability);
622 qosprops[i++].value <<= CosNotification::Persistent; // Required, or we won't persist much
623 qosprops[i].name = CORBA::string_dup(CosNotification::Priority);
624 qosprops[i++].value <<= CosNotification::HighestPriority;
625 qosprops[i].name = CORBA::string_dup(CosNotification::Timeout);
626 qosprops[i++].value <<= (TimeBase::TimeT) 42 * 1000000; // 4.2s
627 qosprops[i].name = CORBA::string_dup(CosNotification::StopTimeSupported);
628 qosprops[i++].value <<= CORBA::Any::from_boolean(1);
629 qosprops[i].name = CORBA::string_dup(CosNotification::MaximumBatchSize);
630 qosprops[i++].value <<= (CORBA::Long) 2;
631 qosprops[i].name = CORBA::string_dup(CosNotification::PacingInterval);
632 qosprops[i++].value <<= (TimeBase::TimeT) 50 * 10000; // 50ms
634 CosNotification::AdminProperties adminprops(4);
635 adminprops.length (4);
636 i = 0;
637 adminprops[i].name = CORBA::string_dup(CosNotification::MaxQueueLength);
638 adminprops[i++].value <<= (CORBA::Long) 1234;
639 adminprops[i].name = CORBA::string_dup(CosNotification::MaxConsumers);
640 adminprops[i++].value <<= (CORBA::Long) 1000;
641 adminprops[i].name = CORBA::string_dup(CosNotification::MaxSuppliers);
642 adminprops[i++].value <<= (CORBA::Long) 1000;
643 adminprops[i].name = CORBA::string_dup(CosNotification::RejectNewEvents);
644 adminprops[i++].value <<= CORBA::Any::from_boolean(1);
646 ec_ = this->ecf_->create_channel (
647 qosprops,
648 adminprops,
649 this->ec_id_);
650 ok = ! CORBA::is_nil (ec_.in ());
651 if (ok && this->verbose_)
653 ACE_DEBUG ((LM_DEBUG,
654 ACE_TEXT ("(%P|%t) Supplier: Create event channel %d\n"),
655 static_cast<int> (this->ec_id_)
660 // save channel id
661 if (ok && this->channel_file_.length() > 0)
663 FILE * chf = ACE_OS::fopen (this->channel_file_.c_str (), "w");
664 if (chf != 0)
666 ACE_OS::fprintf (chf, "%d\n", static_cast<int> (this->ec_id_));
667 ACE_OS::fclose (chf);
672 CosNotifyChannelAdmin::AdminID default_admin_id = static_cast<CosNotifyChannelAdmin::AdminID>(-1);
674 void
675 Supplier_Main::init_supplier_admin (void)
677 bool ok = false;
678 if (this->reconnecting_ && this->sa_id_ != default_admin_id)
682 this->sa_ = this->ec_->get_supplieradmin(
683 this->sa_id_);
684 ok = ! CORBA::is_nil (this->sa_.in ());
685 if (ok && this->verbose_)
687 ACE_DEBUG ((LM_DEBUG,
688 ACE_TEXT ("(%P|%t) Supplier: Reconnect to supplier admin %d\n"),
689 static_cast<int>(this->sa_id_)
693 catch (...)
698 if (!ok)
702 this->sa_ = this->ec_->default_supplier_admin ();
703 ok = ! CORBA::is_nil (this->sa_.in ());
704 this->sa_id_ = default_admin_id;
705 if (ok && this->verbose_)
707 ACE_DEBUG ((LM_DEBUG,
708 ACE_TEXT ("(%P|%t) Supplier: Using default supplier admin\n")
711 else if (this->verbose_)
713 ACE_DEBUG ((LM_DEBUG,
714 ACE_TEXT ("(%P|%t) Supplier: No default supplier admin\n")
718 catch (...)
723 if (!ok)
725 this->sa_ = this->ec_->new_for_suppliers(
726 CosNotifyChannelAdmin::OR_OP,
727 this->sa_id_);
728 ok = ! CORBA::is_nil (this->sa_.in ());
729 if (ok && this->verbose_)
731 ACE_DEBUG ((LM_DEBUG,
732 ACE_TEXT ("(%P|%t) Supplier: Create new supplier admin %d\n"),
733 static_cast<int>(this->sa_id_)
739 void
740 Supplier_Main::init_structured_proxy_consumer (void)
742 bool ok = false;
743 CosNotifyChannelAdmin::ProxyConsumer_var proxy;
744 if (this->reconnecting_)
748 proxy = this->sa_->get_proxy_consumer (
749 this->structured_proxy_id_
751 ok = ! CORBA::is_nil (proxy.in ());
752 if (ok && this->verbose_)
754 ACE_DEBUG ((LM_DEBUG,
755 ACE_TEXT ("(%P|%t) Supplier: Reconnect to proxy supplier %d\n"),
756 static_cast<int>(this->structured_proxy_id_)
760 catch (...)
765 if (!ok)
767 proxy = this->sa_->obtain_notification_push_consumer(
768 CosNotifyChannelAdmin::STRUCTURED_EVENT,
769 this->structured_proxy_id_);
770 ok = ! CORBA::is_nil (proxy.in ());
771 if (ok && this->verbose_)
773 ACE_DEBUG ((LM_DEBUG,
774 ACE_TEXT ("(%P|%t) Supplier: Create new proxy %d\n"),
775 static_cast<int>(this->structured_proxy_id_)
779 this->structured_proxy_push_consumer_ =
780 CosNotifyChannelAdmin::StructuredProxyPushConsumer::_narrow(proxy.in ());
782 if (CORBA::is_nil (this->structured_proxy_push_consumer_.in ()))
784 ACE_ERROR ((LM_ERROR,
785 ACE_TEXT ("(%P|%t) init_structured_proxy_consumer received nil ProxyConsumer\n")
787 throw CORBA::OBJECT_NOT_EXIST ();
789 if (CORBA::is_nil (this->structured_push_supplier_ref_.in ()))
791 PortableServer::ObjectId_var push_supplier_id =
792 this->root_poa_->activate_object (
793 &(this->structured_push_supplier_));
795 CORBA::Object_var obj =
796 this->root_poa_->id_to_reference (push_supplier_id.in ());
798 this->structured_push_supplier_ref_ =
799 CosNotifyComm::StructuredPushSupplier::_narrow (obj.in ());
801 if (CORBA::is_nil (structured_push_supplier_ref_.in ()))
803 ACE_ERROR ((LM_ERROR,
804 ACE_TEXT ("(%P|%t) Supplier: Received wrong type of push consumer proxy %d\n"),
805 static_cast<int>(this->structured_proxy_id_)
808 throw CORBA::UNKNOWN();
811 this->structured_proxy_push_consumer_->connect_structured_push_supplier (
812 structured_push_supplier_ref_.in ());
815 void
816 Supplier_Main::init_sequence_proxy_consumer (void)
818 bool ok = false;
819 CosNotifyChannelAdmin::ProxyConsumer_var proxy;
820 if (this->reconnecting_)
824 proxy = this->sa_->get_proxy_consumer(
825 this->sequence_proxy_id_);
826 ok = ! CORBA::is_nil (proxy.in ());
827 if (ok && this->verbose_)
829 ACE_DEBUG ((LM_DEBUG,
830 ACE_TEXT ("(%P|%t) Supplier: Reconnect to proxy %d\n"),
831 static_cast<int>(this->sequence_proxy_id_)
835 catch (...)
840 if (!ok)
842 proxy = this->sa_->obtain_notification_push_consumer(
843 CosNotifyChannelAdmin::SEQUENCE_EVENT,
844 this->sequence_proxy_id_);
845 ok = ! CORBA::is_nil (proxy.in ());
846 if (ok && this->verbose_)
848 ACE_DEBUG ((LM_DEBUG,
849 ACE_TEXT ("(%P|%t) Supplier: Create new proxy %d\n"),
850 static_cast<int>(this->sequence_proxy_id_)
854 this->sequence_proxy_push_consumer_ =
855 CosNotifyChannelAdmin::SequenceProxyPushConsumer::_narrow(proxy.in ());
857 if (CORBA::is_nil (this->sequence_proxy_push_consumer_.in ()))
859 ACE_ERROR ((LM_ERROR,
860 ACE_TEXT ("(%P|%t) Supplier: Received wrong type of push consumer proxy %d\n"),
861 static_cast<int>(this->sequence_proxy_id_)
863 throw CORBA::UNKNOWN();
866 if (CORBA::is_nil (this->sequence_push_supplier_ref_.in ()))
868 PortableServer::ObjectId_var push_supplier_id =
869 this->root_poa_->activate_object (
870 &(this->sequence_push_supplier_));
872 CORBA::Object_var obj =
873 this->root_poa_->id_to_reference (push_supplier_id.in ());
875 this->sequence_push_supplier_ref_ =
876 CosNotifyComm::SequencePushSupplier::_narrow (obj.in ());
878 if (CORBA::is_nil (sequence_push_supplier_ref_.in ()))
880 ACE_ERROR ((LM_ERROR,
881 ACE_TEXT ("(%P|%t) Supplier: Received wrong type of push consumer proxy %d\n"),
882 static_cast<int>(this->sequence_proxy_id_)
884 throw CORBA::UNKNOWN();
887 this->sequence_proxy_push_consumer_->connect_sequence_push_supplier (
888 sequence_push_supplier_ref_.in ());
891 void
892 Supplier_Main::init_any_proxy_consumer (void)
894 bool ok = false;
895 CosNotifyChannelAdmin::ProxyConsumer_var proxy;
896 if (this->reconnecting_)
900 proxy = this->sa_->get_proxy_consumer(
901 this->any_proxy_id_);
902 ok = ! CORBA::is_nil (proxy.in ());
903 if (ok && this->verbose_)
905 ACE_DEBUG ((LM_DEBUG,
906 ACE_TEXT ("(%P|%t) Supplier: Reconnect to proxy %d\n"),
907 static_cast<int>(this->any_proxy_id_)
911 catch (...)
916 if (!ok)
918 proxy = this->sa_->obtain_notification_push_consumer(
919 CosNotifyChannelAdmin::ANY_EVENT,
920 this->any_proxy_id_);
921 ok = ! CORBA::is_nil (proxy.in ());
922 if (ok && this->verbose_)
924 ACE_DEBUG ((LM_DEBUG,
925 ACE_TEXT ("(%P|%t) Supplier: Create new proxy %d\n"),
926 static_cast<int>(this->any_proxy_id_)
930 this->any_proxy_push_consumer_ =
931 CosNotifyChannelAdmin::ProxyPushConsumer::_narrow(proxy.in ());
933 if (CORBA::is_nil (this->any_proxy_push_consumer_.in ()))
935 ACE_ERROR ((LM_ERROR,
936 ACE_TEXT ("(%P|%t) Supplier: Received wrong type of push consumer proxy %d\n"),
937 static_cast<int>(this->any_proxy_id_)
939 throw CORBA::UNKNOWN();
942 if (CORBA::is_nil (this->any_push_supplier_ref_.in ()))
944 PortableServer::ObjectId_var push_supplier_id =
945 this->root_poa_->activate_object (
946 &(this->any_push_supplier_));
948 CORBA::Object_var obj =
949 this->root_poa_->id_to_reference (push_supplier_id.in ());
951 this->any_push_supplier_ref_ =
952 CosNotifyComm::PushSupplier::_narrow (obj.in ());
954 if (CORBA::is_nil (any_push_supplier_ref_.in ()))
956 ACE_ERROR ((LM_ERROR,
957 ACE_TEXT ("(%P|%t) Supplier: Received wrong type of push consumer proxy %d\n"),
958 static_cast<int>(this->sequence_proxy_id_)
960 throw CORBA::UNKNOWN();
963 this->any_proxy_push_consumer_->connect_any_push_supplier (
964 any_push_supplier_ref_.in ());
967 int Supplier_Main::fini (void)
969 if (this->disconnect_on_exit_)
971 this->reconnection_callback_.fini ();
972 if (!CORBA::is_nil (this->structured_proxy_push_consumer_.in ()))
974 if (this->verbose_)
976 ACE_DEBUG ((LM_DEBUG,
977 ACE_TEXT ("(%P|%t) Disconnecting structured\n")
980 this->structured_proxy_push_consumer_->disconnect_structured_push_consumer ();
982 if (!CORBA::is_nil (this->sequence_proxy_push_consumer_.in ()))
984 if (this->verbose_)
986 ACE_DEBUG ((LM_DEBUG,
987 ACE_TEXT ("(%P|%t) Disconnecting sequence\n")
990 this->sequence_proxy_push_consumer_->disconnect_sequence_push_consumer ();
992 if (!CORBA::is_nil (this->any_proxy_push_consumer_.in ()))
994 if (this->verbose_)
996 ACE_DEBUG ((LM_DEBUG,
997 ACE_TEXT ("(%P|%t) Disconnecting any\n")
1000 this->any_proxy_push_consumer_->disconnect_push_consumer ();
1002 if (!CORBA::is_nil (this->sa_.in ()) && this->sa_id_ != default_admin_id)
1004 if (this->verbose_)
1006 ACE_DEBUG ((LM_DEBUG,
1007 ACE_TEXT ("(%P|%t) destroy admin %d\n"),
1008 static_cast<int>(this->sa_id_)
1011 this->sa_->destroy();
1014 this->orb_->shutdown ();
1015 return 0;
1018 void Supplier_Main::send_structured_event (void)
1020 CosNotification::StructuredEvent event;
1022 // EventHeader.
1024 // FixedEventHeader.
1025 // EventType.
1026 // string.
1027 event.header.fixed_header.event_type.domain_name = CORBA::string_dup("*");
1028 // string
1029 event.header.fixed_header.event_type.type_name = CORBA::string_dup("*");
1030 // string
1031 event.header.fixed_header.event_name = CORBA::string_dup("reconnect_test");
1033 // OptionalHeaderFields.
1034 // PropertySeq.
1035 // sequence<Property>: string name, any value
1036 CosNotification::PropertySeq& qos = event.header.variable_header;
1037 qos.length (2);
1038 qos[0].name = CORBA::string_dup (CosNotification::Priority);
1039 qos[0].value <<= CosNotification::LowestPriority;
1040 qos[1].name = CORBA::string_dup (CosNotification::EventReliability);
1041 qos[1].value <<= CosNotification::Persistent;
1043 // FilterableEventBody
1044 // PropertySeq
1045 // sequence<Property>: string name, any value
1046 event.filterable_data.length (1);
1047 event.filterable_data[0].name = CORBA::string_dup("serial_number");
1048 event.filterable_data[0].value <<= CORBA::ULong ( this->serial_number_);
1050 // any
1051 event.remainder_of_body <<= CORBA::ULong ( this->serial_number_);
1053 if (this->verbose_)
1055 ACE_DEBUG ((LM_DEBUG,
1056 ACE_TEXT ("(%P,%t) Supplier push structured event %d\n"),
1057 static_cast<int>(serial_number_)
1061 this->structured_proxy_push_consumer_->push_structured_event (event);
1064 void Supplier_Main::send_sequence_event (void)
1066 CosNotification::EventBatch event_batch(1);
1067 event_batch.length (1);
1068 CosNotification::StructuredEvent & event = event_batch[0];
1070 // EventHeader.
1072 // FixedEventHeader.
1073 // EventType.
1074 // string.
1075 event.header.fixed_header.event_type.domain_name = CORBA::string_dup("*");
1076 // string
1077 event.header.fixed_header.event_type.type_name = CORBA::string_dup("*");
1078 // string
1079 event.header.fixed_header.event_name = CORBA::string_dup("reconnect_test");
1081 // OptionalHeaderFields.
1082 // PropertySeq.
1083 // sequence<Property>: string name, any value
1084 CosNotification::PropertySeq& qos = event.header.variable_header;
1085 qos.length (2);
1086 qos[0].name = CORBA::string_dup (CosNotification::Priority);
1087 qos[0].value <<= CosNotification::LowestPriority;
1088 qos[1].name = CORBA::string_dup (CosNotification::EventReliability);
1089 qos[1].value <<= CosNotification::Persistent;
1091 // FilterableEventBody
1092 // PropertySeq
1093 // sequence<Property>: string name, any value
1094 event.filterable_data.length (1);
1095 event.filterable_data[0].name = CORBA::string_dup("serial_number");
1096 event.filterable_data[0].value <<= CORBA::ULong ( this->serial_number_);
1098 // any
1099 event.remainder_of_body <<= CORBA::ULong ( this->serial_number_);
1101 if (this->verbose_)
1103 ACE_DEBUG ((LM_DEBUG,
1104 ACE_TEXT ("(%P,%t) Supplier push sequence events %d\n"),
1105 static_cast<int>(this->serial_number_)
1109 this->sequence_proxy_push_consumer_->push_structured_events (event_batch);
1112 void Supplier_Main::send_any_event (void)
1114 CORBA::Any event;
1115 event <<= CORBA::ULong (this->serial_number_);
1117 if (this->verbose_)
1119 ACE_DEBUG ((LM_DEBUG,
1120 ACE_TEXT ("(%P,%t) Supplier push any event %d\n"),
1121 static_cast<int>(this->serial_number_)
1125 this->any_proxy_push_consumer_->push (event);
1128 int Supplier_Main::run (void)
1130 int result = 0;
1131 bool paused = false;
1132 size_t reconnections = 0;
1134 size_t send = 0;
1135 while ( send < this->send_)
1137 ACE_Time_Value tv(0, 100 * 1000);
1138 orb_->run(tv);
1140 if (this->pause_ != 0 && send == this->pause_)
1142 if (this->verbose_)
1144 ACE_DEBUG ((LM_DEBUG,
1145 ACE_TEXT ("(%P|%t) Supplier paused after %d events\n"),
1146 static_cast<int>(this->pause_)
1149 reconnections = this->reconnection_callback_.reconnect_count ();
1150 FILE * pause_file = ACE_OS::fopen ("Supplier.paused", "w");
1151 if (pause_file != 0)
1153 ACE_OS::fputs (ACE_TEXT ("paused\n"), pause_file);
1154 ACE_OS::fclose (pause_file);
1156 paused = true;
1157 this->pause_ = 0;
1159 if (paused)
1161 if (this->reconnection_callback_.reconnect_count () != reconnections)
1163 if (this->verbose_)
1165 ACE_DEBUG ((LM_DEBUG,
1166 ACE_TEXT ("(%P|%t) Supplier no longer paused. Next s# %d\n"),
1167 static_cast<int>(this->serial_number_)
1170 paused = false;
1174 if (!paused)
1176 switch (this->mode_)
1178 case MODE_STRUCTURED:
1180 send_structured_event ();
1181 break;
1183 case MODE_SEQUENCE:
1185 send_sequence_event ();
1186 break;
1188 case MODE_ANY:
1190 send_any_event ();
1191 break;
1194 this->serial_number_ += 1;
1195 send += 1;
1198 return result;
1203 ACE_TMAIN(int argc, ACE_TCHAR *argv[])
1205 int result = -1;
1206 Supplier_Main app;
1209 result = app.init(argc, argv);
1211 if (result == 0)
1213 result = app.run ();
1215 if (result == 0)
1217 app.fini ();
1220 catch (const CORBA::Exception& ex)
1222 ex._tao_print_exception ("Error: Supplier::main\t\n");
1223 result = -1;
1225 ACE_DEBUG ((LM_DEBUG,
1226 ACE_TEXT ("(%P,%t) Supplier exits: code %d\n"),
1227 result
1229 return result;