1 #include "Notify_Sequence_Push_Consumer.h"
2 #include "Notify_Test_Client.h"
4 #include "orbsvcs/TimeBaseC.h"
6 #include "ace/OS_NS_unistd.h"
8 const int BATCH_SIZE
= 5;
10 Notify_Sequence_Push_Consumer::Notify_Sequence_Push_Consumer (
15 Notify_Test_Client
& client
)
17 order_policy_ (policy
),
18 use_ordering_ (use_ordering
),
24 this->client_
.consumer_start (this);
29 Notify_Sequence_Push_Consumer::_connect (
30 CosNotifyChannelAdmin::ConsumerAdmin_ptr consumer_admin
)
32 CosNotifyComm::SequencePushConsumer_var objref
=
35 CosNotifyChannelAdmin::ProxySupplier_var proxysupplier
=
36 consumer_admin
->obtain_notification_push_supplier (
37 CosNotifyChannelAdmin::SEQUENCE_EVENT
,
41 CosNotifyChannelAdmin::SequenceProxyPushSupplier::_narrow (
44 CosNotification::QoSProperties
properties (3);
45 properties
.length (3);
46 properties
[0].name
= CORBA::string_dup (CosNotification::MaximumBatchSize
);
47 properties
[0].value
<<= (CORBA::Long
) BATCH_SIZE
;
48 properties
[1].name
= CORBA::string_dup (CosNotification::PacingInterval
);
49 properties
[1].value
<<= (TimeBase::TimeT
) (1000 * 10000); // 1 secs
52 properties
[2].name
= CORBA::string_dup (CosNotification::OrderPolicy
);
53 properties
[2].value
<<= this->order_policy_
;
60 this->proxy_
->set_qos (properties
);
61 this->proxy_
->connect_sequence_push_consumer (objref
.in ());
63 // give ownership to POA
69 Notify_Sequence_Push_Consumer::push_structured_events (
70 const CosNotification::EventBatch
& events
)
74 // Sleep long enough to force the channel to back up, otherwise
75 // there will be no ordering.
79 ACE_ASSERT(events
.length() == static_cast<CORBA::ULong
>(BATCH_SIZE
));
81 count_
+= events
.length();
83 if (this->count_
> this->expected_
)
85 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("ERROR: too many events received.\n")));
88 if (this->count_
>= this->expected_
)
90 this->client_
.consumer_done (this);
93 ACE_ASSERT(events
[0].header
.variable_header
.length() == 3);
94 ACE_ASSERT(ACE_OS::strcmp(events
[0].header
.variable_header
[0].name
.in(), "id") == 0);
95 CORBA::Long first_id
= 0;
96 events
[0].header
.variable_header
[0].value
>>= first_id
;
97 CORBA::Long last_id
= 0;
98 events
[events
.length() - 1].header
.variable_header
[0].value
>>= last_id
;
100 ACE_DEBUG((LM_DEBUG
, "\n Received id %d-%d\n", first_id
, last_id
));
102 int events_length
= static_cast<int>(events
.length());
104 CORBA::Long previous_id
= first_id
;
106 if (count_
> events_length
) // Ignore the very first batch
108 // First check that the sequences are ordered correctly
109 for (CORBA::ULong idx
= 1; idx
< events
.length(); ++idx
)
113 events
[idx
].header
.variable_header
[0].value
>>= id
;
114 CORBA::Long expected_id
= previous_id
+ 1;
115 if (order_policy_
== CosNotification::PriorityOrder
116 || order_policy_
== CosNotification::DeadlineOrder
)
118 expected_id
= previous_id
- 1;
120 if (id
!= expected_id
)
122 ACE_ERROR((LM_ERROR
, "Error: Expected:%d Received:%d\n", expected_id
, id
));
128 // Next check that the first id in the sequence is ordered
129 // relative to the previously retrieved sequence.
130 if (previous_first_
!= 0)
132 CORBA::Long expected_id
= previous_first_
+ BATCH_SIZE
;
133 if (order_policy_
== CosNotification::PriorityOrder
134 || order_policy_
== CosNotification::DeadlineOrder
)
136 expected_id
= previous_first_
- BATCH_SIZE
;
138 if (first_id
!= expected_id
)
140 ACE_ERROR((LM_ERROR
, "Error: Expected:%d Received:%d\n", expected_id
, first_id
));
144 previous_first_
= first_id
;