1 #include "ace/OS_NS_unistd.h"
2 #include "Notify_Structured_Push_Consumer.h"
3 #include "Notify_Test_Client.h"
4 #include "orbsvcs/Notify/Notify_Extensions.h"
8 static const int CONSUMER_DELAY
= 1; // seconds.
10 Notify_Structured_Push_Consumer::Notify_Structured_Push_Consumer (
12 TimeBase::TimeT block
,
13 unsigned int expected
,
14 Notify_Test_Client
& client
)
16 blocking_timeout_ (block
),
21 this->client_
.consumer_start (this);
26 Notify_Structured_Push_Consumer::_connect (
27 CosNotifyChannelAdmin::ConsumerAdmin_ptr consumer_admin
)
29 CosNotifyComm::StructuredPushConsumer_var objref
=
32 CosNotifyChannelAdmin::ProxySupplier_var proxysupplier
=
33 consumer_admin
->obtain_notification_push_supplier (
34 CosNotifyChannelAdmin::STRUCTURED_EVENT
,
38 CosNotifyChannelAdmin::StructuredProxyPushSupplier::_narrow (
41 CosNotification::QoSProperties
properties (2);
42 properties
.length (2);
43 // The discard policy and admin properties tests already test using
44 // the MaxQueueLength policy, so we'll use MEPC instead. It should
45 // make no difference.
46 // If the blocking timeout is less than CONSUMER_DELAY seconds, then
47 // we want to ensure that exactly one event is discarded, so we set
48 // the MEPC to expected_ - 1. If the supplier sends 20, then we expect 19,
49 // and we set MEPC to 18. The first event will be dispatched at once, but
50 // will block in the consumer. This will allow the queue in the notify svc
51 // to fill up to 18. However, the blocking code will timeout before the
52 // consumer finishes which will cause an event to be discarded. This will
53 // allow the last event to be queued. Eventually the consumer will unblock
54 // and receive the remaining events
55 properties
[0].name
= CORBA::string_dup (CosNotification::MaxEventsPerConsumer
);
56 if (blocking_timeout_
< CONSUMER_DELAY
* 1000 * 1000 * 10)
57 properties
[0].value
<<= (CORBA::Long
) expected_
- 1;
59 properties
[0].value
<<= (CORBA::Long
) 10;
62 properties
[1].name
= CORBA::string_dup (TAO_Notify_Extensions::BlockingPolicy
);
63 properties
[1].value
<<= this->blocking_timeout_
;
65 this->proxy_
->set_qos (properties
);
66 this->proxy_
->connect_structured_push_consumer (objref
.in ());
68 // give ownership to POA
74 Notify_Structured_Push_Consumer::push_structured_event (
75 const CosNotification::StructuredEvent
& event
)
77 ACE_DEBUG((LM_DEBUG
, "-"));
78 ACE_UNUSED_ARG(event
);
81 if (this->count_
> this->expected_
)
84 ACE_TEXT ("Structured Consumer (%P|%t): ERROR: too ")
85 ACE_TEXT ("many events received.\n")));
88 if (this->count_
>= this->expected_
)
90 ACE_DEBUG((LM_DEBUG
, "\nConsumer received %u events.\n", count_
));
91 this->client_
.consumer_done (this);
94 // By pausing here, we force the channel to back up, which will
95 // either result in discarding of events, or blocking, depending
96 // on whether our BlockingPolicy is greater than the following
98 // A BlockingPolicy > 1 second should allow the first event.
99 // A BlockingPolicy < 1 second should discard the first event.
101 ACE_OS::sleep (CONSUMER_DELAY
);