2 * @file Loopback_Supplier.cpp
4 * @author Carlos O'Ryan <coryan@uci.edu>
7 #include "Loopback_Supplier.h"
8 #include "Implicit_Deactivator.h"
10 Loopback_Supplier::Loopback_Supplier (CORBA::Long experiment_id
,
11 CORBA::Long response_type
,
12 PortableServer::POA_ptr poa
)
13 : experiment_id_ (experiment_id
)
14 , response_type_ (response_type
)
15 , default_POA_ (PortableServer::POA::_duplicate (poa
))
21 Loopback_Supplier::connect (RtecEventChannelAdmin::EventChannel_ptr ec
)
23 RtecEventChannelAdmin::SupplierAdmin_var supplier_admin
=
27 ACE_GUARD (TAO_SYNCH_MUTEX
, ace_mon
, this->mutex_
);
28 if (!CORBA::is_nil (this->proxy_consumer_
.in ()))
31 this->proxy_consumer_
=
32 supplier_admin
->obtain_push_consumer ();
35 RtecEventComm::PushSupplier_var supplier
=
38 RtecEventChannelAdmin::SupplierQOS supplier_qos
;
39 supplier_qos
.is_gateway
= 0;
40 supplier_qos
.publications
.length (1);
41 RtecEventComm::EventHeader
& sh0
=
42 supplier_qos
.publications
[0].event
.header
;
43 sh0
.type
= this->response_type_
;
44 sh0
.source
= this->experiment_id_
;
46 this->proxy_consumer_
->connect_push_supplier (supplier
.in (),
51 Loopback_Supplier::disconnect ()
53 RtecEventChannelAdmin::ProxyPushConsumer_var proxy
;
55 ACE_GUARD (TAO_SYNCH_MUTEX
, ace_mon
, this->mutex_
);
56 if (CORBA::is_nil (this->proxy_consumer_
.in ()))
58 proxy
= this->proxy_consumer_
._retn ();
61 Implicit_Deactivator
deactivator (this);
64 proxy
->disconnect_push_consumer ();
65 } catch (const CORBA::Exception
&) {
70 Loopback_Supplier::push (const RtecEventComm::EventSet
&source
)
72 // ACE_DEBUG ((LM_DEBUG, "Loopback_Supplier pushing\n"));
73 RtecEventChannelAdmin::ProxyPushConsumer_var proxy
;
75 ACE_GUARD (TAO_SYNCH_MUTEX
, ace_mon
, this->mutex_
);
76 if (CORBA::is_nil (this->proxy_consumer_
.in ()))
78 proxy
= this->proxy_consumer_
;
81 this->counter_
+= source
.length ();
82 if ((this->counter_
+ 1) % 1000 == 0)
85 "(%P|%t) - Loopback (%d) sending %d messages\n",
86 this->response_type_
, this->counter_
+ 1));
91 // ACE_DEBUG ((LM_DEBUG, "Loopback_Supplier::push (%P|%t)\n"));
92 RtecEventComm::EventSet
events (source
);
93 for (CORBA::ULong i
= 0; i
!= events
.length (); ++i
)
95 events
[i
].header
.ttl
= 1;
96 events
[i
].header
.type
= this->response_type_
;
97 events
[i
].header
.source
= this->experiment_id_
;
100 proxy
->push (events
);
104 Loopback_Supplier::disconnect_push_supplier ()
106 ACE_GUARD (TAO_SYNCH_MUTEX
, ace_mon
, this->mutex_
);
107 this->proxy_consumer_
=
108 RtecEventChannelAdmin::ProxyPushConsumer::_nil ();
111 PortableServer::POA_ptr
112 Loopback_Supplier::_default_POA ()
114 return PortableServer::POA::_duplicate (this->default_POA_
.in ());