Changes to attempt to silence bcc64x
[ACE_TAO.git] / TAO / orbsvcs / tests / Notify / Discarding / Notify_Structured_Push_Consumer.cpp
blobec3c8a108986ddbe777b6b6753d4a82489174378
1 #include "ace/OS_NS_unistd.h"
2 #include "Notify_Structured_Push_Consumer.h"
3 #include "Notify_Test_Client.h"
4 #include "common.h"
5 #include "tao/debug.h"
7 Notify_Structured_Push_Consumer::Notify_Structured_Push_Consumer (
8 const char* name,
9 CORBA::Short policy,
10 CORBA::Long max_events_per_consumer,
11 Notify_Test_Client& client)
12 : name_ (name),
13 discard_policy_ (policy),
14 max_events_per_consumer_ (max_events_per_consumer),
15 count_ (0),
16 first_ (0),
17 client_ (client),
18 sent_(40)
20 this->client_.consumer_start (this);
24 void
25 Notify_Structured_Push_Consumer::_connect (
26 CosNotifyChannelAdmin::ConsumerAdmin_ptr consumer_admin)
28 CosNotifyComm::StructuredPushConsumer_var objref =
29 this->_this ();
31 CosNotifyChannelAdmin::ProxySupplier_var proxysupplier =
32 consumer_admin->obtain_notification_push_supplier (
33 CosNotifyChannelAdmin::STRUCTURED_EVENT,
34 proxy_id_);
36 this->proxy_ =
37 CosNotifyChannelAdmin::StructuredProxyPushSupplier::_narrow (
38 proxysupplier.in ());
40 CosNotification::QoSProperties properties (2);
41 properties.length (2);
42 properties[0].name = CORBA::string_dup (CosNotification::DiscardPolicy);
43 properties[0].value <<= this->discard_policy_;
44 properties[1].name = CORBA::string_dup (CosNotification::MaxEventsPerConsumer);
45 properties[1].value <<= this->max_events_per_consumer_;
47 this->proxy_->set_qos (properties);
48 this->proxy_->connect_structured_push_consumer (objref.in ());
50 // give ownership to POA
51 this->_remove_ref ();
55 void
56 Notify_Structured_Push_Consumer::push_structured_event (
57 const CosNotification::StructuredEvent& event)
59 ACE_DEBUG((LM_DEBUG, "-"));
61 this->count_++;
62 if (this->count_ > max_events_per_consumer_ + 1)
64 this->client_.consumer_done (this);
65 ACE_ERROR ((LM_ERROR,
66 ACE_TEXT ("Structured Consumer (%P|%t): ERROR: too ")
67 ACE_TEXT ("many events received (%d).\n"), this->count_));
70 ACE_ASSERT(ACE_OS::strcmp(event.header.variable_header[0].name.in(), "Id") == 0);
71 CORBA::Any v = event.header.variable_header[0].value;
72 CORBA::Long id = 0;
73 v >>= id;
75 // Force the notify service to queue events
76 if (this->count_ == 1)
78 ACE_OS::sleep(2);
79 first_ = id;
82 // @@ The priority header isn't making it through the notify service.
83 //n = event.header.variable_header[1].name;
84 //if (ACE_OS::strcmp(n.in(), CosNotification::Priority) != 0)
85 //{
86 // ACE_ERROR((LM_ERROR, "Error: Couldn't find Priority header in event.\n"));
87 // break;
88 //}
89 //v = event.header.variable_header[1].value;
90 //CORBA::Long priority = 0;
91 //v >>= priority;
92 CORBA::Long expected = 0;
93 if (discard_policy_ == CosNotification::PriorityOrder)
95 expected = sent_ - max_events_per_consumer_ + count_;
96 if (first_ != sent_ - max_events_per_consumer_ + 1)
97 --expected;
99 else if (discard_policy_ == CosNotification::FifoOrder)
101 expected = sent_ - max_events_per_consumer_ + count_;
102 if (first_ != sent_ - max_events_per_consumer_ + 1)
103 --expected;
105 else if (discard_policy_ == CosNotification::LifoOrder)
107 expected = count_;
109 else
111 ACE_ERROR((LM_ERROR, "Error: Unexpected discard policy.\n"));
112 return;
115 if (id != expected && count_ != 1)
117 ACE_DEBUG((LM_DEBUG, "Error: Expected %d, ", expected));
118 this->client_.consumer_done (this);
121 ACE_DEBUG((LM_DEBUG, "received %d\n", id));
123 // We should receive mepc + 1, because the first event will be in-transit
124 // before our sleep causes the notify to queue events.
125 // However, on some platforms, we'll only receive mepc, because filtering
126 // happened before the first event.
127 if (this->count_ >= this->max_events_per_consumer_)
129 this->client_.consumer_done (this);