1 // author : Boris Kolpackov <boris@dre.vanderbilt.edu>
3 #include "ace/Truncate.h"
6 using namespace CosNotifyComm
;
7 using namespace CosNotification
;
8 using namespace CosNotifyChannelAdmin
;
12 // Stop tracker thread.
14 ACE_GUARD (TAO_SYNCH_MUTEX
, ace_mon
, this->mutex_
);
23 Gate (ACE_INET_Addr
const& group
, EventChannel_ptr ch
)
24 : socket_ (group
, false),
27 init (ch
->default_consumer_admin (),
28 ch
->default_supplier_admin ());
32 Gate (ACE_INET_Addr
const& group
,
33 ConsumerAdmin_ptr consumer_admin
,
34 SupplierAdmin_ptr supplier_admin
)
35 : socket_ (group
, false),
38 init (consumer_admin
, supplier_admin
);
42 init (ConsumerAdmin_ptr consumer_admin
,
43 SupplierAdmin_ptr supplier_admin
)
45 // Generate unique id. It is used to prevent event looping.
48 ACE_Utils::UUID_GENERATOR::instance ()->init ();
49 ACE_Utils::UUID_GENERATOR::instance ()->generate_UUID (uuid
);
51 id_
= string_alloc (ACE_Utils::truncate_cast
<CORBA::ULong
> (
52 uuid
.to_string ()->length () + 2));
53 ACE_OS::strcpy (id_
.inout (), "_");
54 ACE_OS::strcpy (id_
.inout () + 1, uuid
.to_string ()->rep ());
56 // ACE_DEBUG ((LM_DEBUG, "ID: %s\n", id_.in ()));
59 // Obtain proxy consumer.
61 ProxyConsumer_var
pc (
62 supplier_admin
->obtain_notification_push_consumer (
63 STRUCTURED_EVENT
, consumer_id_
));
65 consumer_
= StructuredProxyPushConsumer::_narrow (pc
.in ());
67 consumer_
->connect_structured_push_supplier (
68 StructuredPushSupplier::_nil ());
71 // Register as consumer.
73 StructuredPushConsumer_var
ref (_this ()); // Activate on default POA.
75 ProxySupplier_var
ps (
76 consumer_admin
->obtain_notification_push_supplier (
77 STRUCTURED_EVENT
, supplier_id_
));
79 supplier_
= StructuredProxyPushSupplier::_narrow (ps
.in ());
81 supplier_
->connect_structured_push_consumer (ref
.in ());
84 // Create tracker thread.
86 thread_mgr_
.spawn (tracker_thunk
, this);
89 ACE_THR_FUNC_RETURN
Gate::
90 tracker_thunk (void* arg
)
92 Gate
* a
= reinterpret_cast<Gate
*> (arg
);
100 // Time period after which a manual cancellation request is
103 ACE_Time_Value
const timeout (0, 500);
111 n
= socket_
.size (timeout
);
113 // Check for cancellation request.
116 ACE_GUARD (TAO_SYNCH_MUTEX
, ace_mon
, this->mutex_
);
131 CORBA::ULong seqn
= ACE_Utils::truncate_cast
<CORBA::ULong
> (n
);
135 char* buffer
= reinterpret_cast<char*> (seq
.get_buffer ());
137 if (socket_
.recv (buffer
, n
) != n
)
139 ACE_ERROR ((LM_ERROR
,
140 "recv() reported different size than size()\n"));
144 TAO_InputCDR
cdr (buffer
, n
);
150 // Add TTL header to prevent infinite message looping.
154 for (; i
< e
.header
.variable_header
.length (); ++i
)
156 if (ACE_OS::strcmp (e
.header
.variable_header
[i
].name
.in (), id_
.in ()) == 0)
160 if (i
== e
.header
.variable_header
.length ())
162 e
.header
.variable_header
.length (i
+ 1);
164 e
.header
.variable_header
[i
].name
= string_dup (id_
.in ());
167 //ACE_DEBUG ((LM_DEBUG,
168 // "adding %s as header #%d\n",
169 // e.header.variable_header[i].name.in (), i));
171 e
.header
.variable_header
[i
].value
<<= ULong (1);
175 << e.header.fixed_header.event_type.domain_name << "::"
176 << e.header.fixed_header.event_type.type_name << " "
177 << e.header.fixed_header.event_name << endl;
180 consumer_
->push_structured_event (e
);
185 push_structured_event (StructuredEvent
const& e
)
187 for (ULong
i (0); i
< e
.header
.variable_header
.length (); ++i
)
189 if (ACE_OS::strcmp (e
.header
.variable_header
[i
].name
.in (), id_
.in ()) == 0)
193 e
.header
.variable_header
[i
].value
>>= ttl
;
197 //ACE_DEBUG ((LM_DEBUG,
208 << e.header.fixed_header.event_type.domain_name << "::"
209 << e.header.fixed_header.event_type.type_name << " "
210 << e.header.fixed_header.event_name << endl;
217 CORBA::ULong
size (ACE_Utils::truncate_cast
<CORBA::ULong
> (cdr
.total_length ()));
222 char* buffer
= reinterpret_cast<char*> (seq
.get_buffer ());
227 for (ACE_Message_Block
const* mb
= cdr
.begin ();
231 ACE_OS::memcpy (buf
, mb
->rd_ptr (), mb
->length ());
232 buf
+= mb
->length ();
236 socket_
.send (buffer
, size
);
241 Gate::disconnect_structured_push_consumer (void)
247 Gate::offer_change (EventTypeSeq
const&, EventTypeSeq
const&)