1 #include "ace/OS_NS_unistd.h"
2 #include "Notify_Sequence_Push_Consumer.h"
3 #include "Notify_Test_Client.h"
4 #include "orbsvcs/TimeBaseC.h"
8 // The NS will try to send in M-size chunks, but may send smaller ones if
9 // the pacing interval expires. We set the pacing interval large
10 // enough that this *should* not happen.
11 // Note : This batch size must be coordinated with the events sent
12 // by the supplier. For example, when discard policy is set to
13 // PriorityOrder, then we want to have exactly BATCH_SIZE events
14 // with the highest priority so that we know we received the right ones.
15 // We also need to be sure to send a multiple of the batch size.
16 static const CORBA::Long BATCH_SIZE
= 4;
18 static const TimeBase::TimeT PACING
= 20 * 1000 * 10000;
20 // This needs to be big enough to ensure that the supplier has sent
22 static const int FIRST_SLEEP_SECS
= 1;
// Constructor. In the fragments visible here it receives a
// Notify_Test_Client reference, initializes discard_policy_ from `policy`,
// and then calls client_.consumer_start (this).
// NOTE(review): the remaining parameters and member initializers are not
// visible in this listing -- confirm against the class declaration.
24 Notify_Sequence_Push_Consumer::Notify_Sequence_Push_Consumer (
27 Notify_Test_Client
& client
,
30 , discard_policy_ (policy
)
36 this->client_
.consumer_start (this);
// Connects this consumer to the notification channel through
// `consumer_admin`.
//
// Visible steps:
//   1. Ask `consumer_admin` for a SEQUENCE_EVENT notification push supplier
//      and narrow it to a SequenceProxyPushSupplier.
//   2. Build four QoS properties (MaximumBatchSize, PacingInterval,
//      DiscardPolicy, MaxEventsPerConsumer) and apply them with set_qos().
//   3. Call connect_sequence_push_consumer() on the proxy with `consumer`.
//
// NOTE(review): the initialization of `consumer`, the argument passed to
// _narrow(), the declaration of `idx`, and whatever follows the final
// "give ownership to POA" comment are not visible in this listing.
41 Notify_Sequence_Push_Consumer::_connect (
42 CosNotifyChannelAdmin::ConsumerAdmin_ptr consumer_admin
)
44 CosNotifyComm::SequencePushConsumer_var consumer
=
47 CosNotifyChannelAdmin::ProxySupplier_var proxysupplier
=
48 consumer_admin
->obtain_notification_push_supplier (
49 CosNotifyChannelAdmin::SEQUENCE_EVENT
,
53 CosNotifyChannelAdmin::SequenceProxyPushSupplier::_narrow (
// The sequence is created with room for four entries and its length is set
// to four, matching the four name/value pairs filled in below.
56 CosNotification::QoSProperties
properties (4);
57 properties
.length (4);
// Property 1: MaximumBatchSize = BATCH_SIZE.
// (presumably `idx` starts at 0 -- its declaration is not visible here)
59 properties
[idx
].name
= CORBA::string_dup (CosNotification::MaximumBatchSize
);
60 properties
[idx
].value
<<= BATCH_SIZE
;
// Property 2: PacingInterval = PACING.
61 properties
[++idx
].name
= CORBA::string_dup (CosNotification::PacingInterval
);
62 properties
[idx
].value
<<= PACING
;
// Property 3: DiscardPolicy = the policy stored by the constructor.
63 properties
[++idx
].name
= CORBA::string_dup (CosNotification::DiscardPolicy
);
64 properties
[idx
].value
<<= this->discard_policy_
;
// Property 4: MaxEventsPerConsumer = BATCH_SIZE.
65 properties
[++idx
].name
= CORBA::string_dup (CosNotification::MaxEventsPerConsumer
);
66 // We set this equal to the batch size so that we conveniently always receive
67 // one more batch after the first.
68 properties
[idx
].value
<<= BATCH_SIZE
;
// Apply the QoS first, then connect.
70 this->proxy_
->set_qos (properties
);
71 this->proxy_
->connect_sequence_push_consumer (consumer
.in ());
73 // give ownership to POA
// Callback that receives one batch of structured events and validates it.
//
// Visible checks, in order:
//   - the batch length is compared with BATCH_SIZE; the visible error path
//     reports "Unexpected batch size" and calls client_.consumer_done (this);
//   - the first variable-header field of the first event must be named "Id";
//   - the ids of the first and last event are extracted and logged;
//   - every following event's id is extracted and reported as an invalid
//     batch against id1 + i;
//   - the id expected at the start of the batch is derived from the discard
//     policy and compared with id1.
//
// NOTE(review): braces, return statements, the declarations of id1/id2/id
// and the handling of count_ are not visible in this listing, so which of
// the statements below are conditional cannot be confirmed from here.
79 Notify_Sequence_Push_Consumer::push_structured_events (
80 const CosNotification::EventBatch
& events
)
84 // The pacing interval is set large enough that it should never come into
85 // play unless something goes wrong.
86 if (events
.length() != static_cast<size_t>(BATCH_SIZE
))
89 "Error : Unexpected batch size %u/%u\n", events
.length(),
91 this->client_
.consumer_done (this);
// Sanity check: the supplier is expected to put the event id in the first
// variable-header field, under the name "Id".
93 ACE_ASSERT(ACE_OS::strcmp(events
[0].header
.variable_header
[0].name
.in(), "Id") == 0);
// id1 = id of the first event in the batch, id2 = id of the last one.
97 events
[0].header
.variable_header
[0].value
>>= id1
;
98 events
[events
.length() - 1].header
.variable_header
[0].value
>>= id2
;
100 ACE_DEBUG((LM_DEBUG
, "{%d-%d}\n", id1
, id2
));
104 // We sleep long enough after the first batch to ensure that
105 // the supplier has time to send all the events. This will allow
106 // the notify service to discard all but one batch.
107 ACE_OS::sleep(FIRST_SLEEP_SECS
);
112 // Validate the batch is ordered
113 for (CORBA::Long i
= 1; i
< BATCH_SIZE
; ++i
)
116 events
[i
].header
.variable_header
[0].value
>>= id
;
119 ACE_ERROR((LM_ERROR
, "Error: Invalid batch. Expected %d, Was %d\n", id1
+ i
, id
));
120 this->client_
.consumer_done (this);
// `expected` is the id the batch should start with under the configured
// discard policy.
// NOTE(review): the PriorityOrder and FifoOrder branches compute the same
// value (sent_ - BATCH_SIZE + 1) -- confirm that is intentional.
125 CORBA::Long expected
= 0;
127 if (discard_policy_
== CosNotification::PriorityOrder
)
129 expected
= sent_
- BATCH_SIZE
+ 1; // e.g. 37, 38, 39, 40
131 else if (discard_policy_
== CosNotification::FifoOrder
)
133 expected
= sent_
- BATCH_SIZE
+ 1; // e.g. 37, 38, 39, 40
// Any policy other than the two above must be LifoOrder.
137 ACE_ASSERT(discard_policy_
== CosNotification::LifoOrder
);
138 expected
= BATCH_SIZE
+ 1; // e.g. 5, 6, 7, 8
141 // On some slower platforms, the discard policy may be applied before the first
142 // batch is sent. In that case we may only get a single batch.
143 // On other platforms we may get two batches, but the first batch
144 // may or may not conform to the discard policy.
146 if (count_
== 1 && id1
!= expected
&& discard_policy_
!= CosNotification::LifoOrder
)
148 // We expect to get another batch with the correct one.
152 this->client_
.consumer_done (this);
157 ACE_ERROR((LM_ERROR
, "Error : Too many batches received.\n"));