1 #include "ace/OS_NS_unistd.h"
2 #include "Notify_Structured_Push_Consumer.h"
3 #include "Notify_Test_Client.h"
// Constructor: stores the discard policy and the per-consumer event limit in
// the corresponding members, then calls consumer_start() on the test client
// with this consumer.
// NOTE(review): this listing omits some original lines -- `policy` is used
// below but its parameter declaration is not visible here, and any further
// parameters or member initializers cannot be seen. Confirm against the
// full file before changing the signature or initializer order.
7 Notify_Structured_Push_Consumer::Notify_Structured_Push_Consumer (
10 CORBA::Long max_events_per_consumer
,
11 Notify_Test_Client
& client
)
13 discard_policy_ (policy
),
14 max_events_per_consumer_ (max_events_per_consumer
),
// Tell the test client about this consumer via consumer_start().
20 this->client_
.consumer_start (this);
// Connects this consumer to a structured proxy push supplier obtained from
// the given consumer admin, after configuring two QoS properties on the
// proxy (DiscardPolicy and MaxEventsPerConsumer).
// @param consumer_admin Admin object used to obtain the push supplier proxy.
// NOTE(review): several original lines are missing from this listing (the
// initializer of `objref`, the argument and target of the _narrow call, and
// the statement that follows the "give ownership to POA" comment). The
// notes below describe only what is visible.
25 Notify_Structured_Push_Consumer::_connect (
26 CosNotifyChannelAdmin::ConsumerAdmin_ptr consumer_admin
)
// Object reference later handed to connect_structured_push_consumer().
28 CosNotifyComm::StructuredPushConsumer_var objref
=
// Ask the admin for a push supplier proxy of the STRUCTURED_EVENT kind.
31 CosNotifyChannelAdmin::ProxySupplier_var proxysupplier
=
32 consumer_admin
->obtain_notification_push_supplier (
33 CosNotifyChannelAdmin::STRUCTURED_EVENT
,
// Narrow to the structured proxy interface; presumably the result is stored
// in proxy_, which is used below -- TODO confirm against the full file.
37 CosNotifyChannelAdmin::StructuredProxyPushSupplier::_narrow (
// Build a two-entry QoS list:
//   [0] DiscardPolicy        <- discard_policy_
//   [1] MaxEventsPerConsumer <- max_events_per_consumer_
40 CosNotification::QoSProperties
properties (2);
41 properties
.length (2);
42 properties
[0].name
= CORBA::string_dup (CosNotification::DiscardPolicy
);
43 properties
[0].value
<<= this->discard_policy_
;
44 properties
[1].name
= CORBA::string_dup (CosNotification::MaxEventsPerConsumer
);
45 properties
[1].value
<<= this->max_events_per_consumer_
;
// Apply the QoS to the proxy before connecting this consumer to it.
47 this->proxy_
->set_qos (properties
);
48 this->proxy_
->connect_structured_push_consumer (objref
.in ());
50 // give ownership to POA
// Handles one structured event pushed to this consumer. Checks the number of
// events received so far and the event's id against the value expected for
// the configured discard policy, and calls consumer_done() on the test
// client when a check fails or when enough events have arrived.
// @param event The structured event; its first variable-header entry is
//              asserted to be named "Id".
// NOTE(review): lines are missing from this listing -- the place where
// count_ is updated, the extraction of `id` from `v`, and the bodies of
// several `if` branches are not visible. Treat the notes below as a
// description of the visible statements only.
56 Notify_Structured_Push_Consumer::push_structured_event (
57 const CosNotification::StructuredEvent
& event
)
// Progress marker: a single "-" is logged per call.
59 ACE_DEBUG((LM_DEBUG
, "-"));
// Receiving more than max_events_per_consumer_ + 1 events is a failure: the
// test client is told this consumer is done and an ERROR message with the
// count is emitted.
62 if (this->count_
> max_events_per_consumer_
+ 1)
64 this->client_
.consumer_done (this);
66 ACE_TEXT ("Structured Consumer (%P|%t): ERROR: too ")
67 ACE_TEXT ("many events received (%d).\n"), this->count_
));
// The first variable-header field must be named "Id"; its value is copied
// into `v`.
70 ACE_ASSERT(ACE_OS::strcmp(event
.header
.variable_header
[0].name
.in(), "Id") == 0);
71 CORBA::Any v
= event
.header
.variable_header
[0].value
;
75 // Force the notify service to queue events
// NOTE(review): the body of this first-event branch is not visible here.
76 if (this->count_
== 1)
82 // @@ The priority header isn't making it through the notify service.
83 //n = event.header.variable_header[1].name;
84 //if (ACE_OS::strcmp(n.in(), CosNotification::Priority) != 0)
86 // ACE_ERROR((LM_ERROR, "Error: Couldn't find Priority header in event.\n"));
89 //v = event.header.variable_header[1].value;
90 //CORBA::Long priority = 0;
// Work out the id this event is expected to carry for the active discard
// policy. PriorityOrder and FifoOrder use the same visible formula:
//   expected = sent_ - max_events_per_consumer_ + count_
// and both compare first_ against sent_ - max_events_per_consumer_ + 1
// (what happens on a mismatch is not visible in this listing).
92 CORBA::Long expected
= 0;
93 if (discard_policy_
== CosNotification::PriorityOrder
)
95 expected
= sent_
- max_events_per_consumer_
+ count_
;
96 if (first_
!= sent_
- max_events_per_consumer_
+ 1)
99 else if (discard_policy_
== CosNotification::FifoOrder
)
101 expected
= sent_
- max_events_per_consumer_
+ count_
;
102 if (first_
!= sent_
- max_events_per_consumer_
+ 1)
105 else if (discard_policy_
== CosNotification::LifoOrder
)
// NOTE(review): the LifoOrder branch body is not visible in this listing.
// The error below is presumably the fallback for any other policy value.
111 ACE_ERROR((LM_ERROR
, "Error: Unexpected discard policy.\n"));
// An id mismatch is only acted on after the first event (count_ != 1). The
// "Error: Expected" text is written at LM_DEBUG, and the test client is told
// this consumer is done.
115 if (id
!= expected
&& count_
!= 1)
117 ACE_DEBUG((LM_DEBUG
, "Error: Expected %d, ", expected
));
118 this->client_
.consumer_done (this);
121 ACE_DEBUG((LM_DEBUG
, "received %d\n", id
));
123 // We should receive mepc + 1, because the first event will be in-transit
124 // before our sleep causes the notify to queue events.
125 // However, on some platforms, we'll only receive mepc, because filtering
126 // happened before the first event.
// So reaching max_events_per_consumer_ events is accepted as completion.
127 if (this->count_
>= this->max_events_per_consumer_
)
129 this->client_
.consumer_done (this);