Changes to attempt to silence bcc64x
[ACE_TAO.git] / TAO / orbsvcs / tests / Notify / Discarding / Notify_Sequence_Push_Consumer.cpp
blob71bdc4d9686a7f51341a49ae7e2170de7e9a3cfd
1 #include "ace/OS_NS_unistd.h"
2 #include "Notify_Sequence_Push_Consumer.h"
3 #include "Notify_Test_Client.h"
4 #include "orbsvcs/TimeBaseC.h"
5 #include "common.h"
6 #include "tao/debug.h"
8 // The NS will try to send in M size chunks, but may send smaller if
9 // the pacing interval expires. We set the pacing interval large
10 // enough so that *should* not happen.
11 // Note : This batch size must be coordinated with the events sent
12 // by the supplier. For example, when discard policy is set to
13 // PriorityOrder, then we want to have exactly BATCH_SIZE events
14 // with the highest priority so that we know we received the right ones.
15 // We also need to be sure to send a multiple of the batch size.
16 static const CORBA::Long BATCH_SIZE = 4;
18 static const TimeBase::TimeT PACING = 20 * 1000 * 10000;
20 // This needs to be big enough to ensure that the supplier has sent
21 // all the events.
22 static const int FIRST_SLEEP_SECS = 1;
24 Notify_Sequence_Push_Consumer::Notify_Sequence_Push_Consumer (
25 const char* name,
26 CORBA::Short policy,
27 Notify_Test_Client& client,
28 int sent)
29 : name_ (name)
30 , discard_policy_ (policy)
31 , count_ (0)
32 , client_ (client)
33 , sent_(sent)
34 , first_(0)
36 this->client_.consumer_start (this);
40 void
41 Notify_Sequence_Push_Consumer::_connect (
42 CosNotifyChannelAdmin::ConsumerAdmin_ptr consumer_admin)
44 CosNotifyComm::SequencePushConsumer_var consumer =
45 this->_this ();
47 CosNotifyChannelAdmin::ProxySupplier_var proxysupplier =
48 consumer_admin->obtain_notification_push_supplier (
49 CosNotifyChannelAdmin::SEQUENCE_EVENT,
50 proxy_id_);
52 this->proxy_ =
53 CosNotifyChannelAdmin::SequenceProxyPushSupplier::_narrow (
54 proxysupplier.in ());
56 CosNotification::QoSProperties properties (4);
57 properties.length (4);
58 CORBA::ULong idx = 0;
59 properties[idx].name = CORBA::string_dup (CosNotification::MaximumBatchSize);
60 properties[idx].value <<= BATCH_SIZE;
61 properties[++idx].name = CORBA::string_dup (CosNotification::PacingInterval);
62 properties[idx].value <<= PACING;
63 properties[++idx].name = CORBA::string_dup (CosNotification::DiscardPolicy);
64 properties[idx].value <<= this->discard_policy_;
65 properties[++idx].name = CORBA::string_dup (CosNotification::MaxEventsPerConsumer);
66 // We set this equal to the batch size so that we conveniently always receive
67 // one more batch after the first.
68 properties[idx].value <<= BATCH_SIZE;
70 this->proxy_->set_qos (properties);
71 this->proxy_->connect_sequence_push_consumer (consumer.in ());
73 // give ownership to POA
74 this->_remove_ref ();
78 void
79 Notify_Sequence_Push_Consumer::push_structured_events (
80 const CosNotification::EventBatch& events)
82 ++count_;
84 // The pacing interval is set large enough that it should never come into
85 // play unless something goes wrong.
86 if (events.length() != static_cast<size_t>(BATCH_SIZE))
88 ACE_ERROR((LM_ERROR,
89 "Error : Unexpected batch size %u/%u\n", events.length(),
90 BATCH_SIZE));
91 this->client_.consumer_done (this);
93 ACE_ASSERT(ACE_OS::strcmp(events[0].header.variable_header[0].name.in(), "Id") == 0);
95 CORBA::Long id1 = 0;
96 CORBA::Long id2 = 0;
97 events[0].header.variable_header[0].value >>= id1;
98 events[events.length() - 1].header.variable_header[0].value >>= id2;
100 ACE_DEBUG((LM_DEBUG, "{%d-%d}\n", id1, id2));
102 if (count_ == 1)
104 // We sleep long enough after the first batch to ensure that
105 // the supplier has time to send all the events. This will allow
106 // the notify service to discard all but one batch.
107 ACE_OS::sleep(FIRST_SLEEP_SECS);
109 first_ = id1;
112 // Validate the batch is ordered
113 for (CORBA::Long i = 1; i < BATCH_SIZE; ++i)
115 CORBA::Long id = 0;
116 events[i].header.variable_header[0].value >>= id;
117 if (id != id1 + i)
119 ACE_ERROR((LM_ERROR, "Error: Invalid batch. Expected %d, Was %d\n", id1 + i, id));
120 this->client_.consumer_done (this);
121 return;
125 CORBA::Long expected = 0;
127 if (discard_policy_ == CosNotification::PriorityOrder)
129 expected = sent_ - BATCH_SIZE + 1; // e.g. 37, 38, 39, 40
131 else if (discard_policy_ == CosNotification::FifoOrder)
133 expected = sent_ - BATCH_SIZE + 1; // e.g. 37, 38, 39, 40
135 else
137 ACE_ASSERT(discard_policy_ == CosNotification::LifoOrder);
138 expected = BATCH_SIZE + 1; // e.g. 5, 6, 7, 8
141 // On some slower platforms, the discard policy may be applied before the first
142 // batch is sent. In that case we may only get a single batch.
143 // On other platforms we may get two batches, but the first batch
144 // may or may not conform to the discard policy.
146 if (count_ == 1 && id1 != expected && discard_policy_ != CosNotification::LifoOrder)
148 // We expect to get another batch with the correct one.
150 else
152 this->client_.consumer_done (this);
155 if (count_ > 2)
157 ACE_ERROR((LM_ERROR, "Error : Too many batches received.\n"));