Merge pull request #1551 from DOCGroup/plm_jira_333
[ACE_TAO.git] / TAO / orbsvcs / examples / Notify / Federation / Gate / Gate.cpp
blob7055c096e6fef525722d2f51ff53bf390d969d66
1 // author : Boris Kolpackov <boris@dre.vanderbilt.edu>
2 #include "Gate.h"
3 #include "ace/Truncate.h"
5 using namespace CORBA;
6 using namespace CosNotifyComm;
7 using namespace CosNotification;
8 using namespace CosNotifyChannelAdmin;
10 Gate::~Gate ()
12 // Stop tracker thread.
14 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->mutex_);
15 stop_ = true;
18 thread_mgr_.wait ();
22 Gate::
23 Gate (ACE_INET_Addr const& group, EventChannel_ptr ch)
24 : socket_ (group, false),
25 stop_ (false)
27 init (ch->default_consumer_admin (),
28 ch->default_supplier_admin ());
31 Gate::
32 Gate (ACE_INET_Addr const& group,
33 ConsumerAdmin_ptr consumer_admin,
34 SupplierAdmin_ptr supplier_admin)
35 : socket_ (group, false),
36 stop_ (false)
38 init (consumer_admin, supplier_admin);
41 void Gate::
42 init (ConsumerAdmin_ptr consumer_admin,
43 SupplierAdmin_ptr supplier_admin)
45 // Generate unique id. It is used to prevent event looping.
47 ACE_Utils::UUID uuid;
48 ACE_Utils::UUID_GENERATOR::instance ()->init ();
49 ACE_Utils::UUID_GENERATOR::instance ()->generate_UUID (uuid);
51 id_ = string_alloc (ACE_Utils::truncate_cast<CORBA::ULong> (
52 uuid.to_string ()->length () + 2));
53 ACE_OS::strcpy (id_.inout (), "_");
54 ACE_OS::strcpy (id_.inout () + 1, uuid.to_string ()->rep ());
56 // ACE_DEBUG ((LM_DEBUG, "ID: %s\n", id_.in ()));
59 // Obtain proxy consumer.
61 ProxyConsumer_var pc (
62 supplier_admin->obtain_notification_push_consumer (
63 STRUCTURED_EVENT, consumer_id_));
65 consumer_ = StructuredProxyPushConsumer::_narrow (pc.in ());
67 consumer_->connect_structured_push_supplier (
68 StructuredPushSupplier::_nil ());
71 // Register as consumer.
73 StructuredPushConsumer_var ref (_this ()); // Activate on default POA.
75 ProxySupplier_var ps (
76 consumer_admin->obtain_notification_push_supplier (
77 STRUCTURED_EVENT, supplier_id_));
79 supplier_ = StructuredProxyPushSupplier::_narrow (ps.in ());
81 supplier_->connect_structured_push_consumer (ref.in ());
84 // Create tracker thread.
86 thread_mgr_.spawn (tracker_thunk, this);
89 ACE_THR_FUNC_RETURN Gate::
90 tracker_thunk (void* arg)
92 Gate* a = reinterpret_cast<Gate*> (arg);
93 a->tracker ();
94 return 0;
97 void Gate::
98 tracker ()
100 // Time period after which a manual cancellation request is
101 // checked for.
103 ACE_Time_Value const timeout (0, 500);
105 while (true)
107 ssize_t n;
109 while (true)
111 n = socket_.size (timeout);
113 // Check for cancellation request.
116 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->mutex_);
118 if (stop_)
119 return;
122 if (n == -1)
124 if (errno != ETIME)
125 ACE_OS::abort ();
127 else
128 break;
131 CORBA::ULong seqn = ACE_Utils::truncate_cast<CORBA::ULong> (n);
132 OctetSeq seq (seqn);
133 seq.length (seqn);
135 char* buffer = reinterpret_cast<char*> (seq.get_buffer ());
137 if (socket_.recv (buffer, n) != n)
139 ACE_ERROR ((LM_ERROR,
140 "recv() reported different size than size()\n"));
141 continue;
144 TAO_InputCDR cdr (buffer, n);
146 StructuredEvent e;
148 cdr >> e;
150 // Add TTL header to prevent infinite message looping.
152 ULong i (0);
154 for (; i < e.header.variable_header.length (); ++i)
156 if (ACE_OS::strcmp (e.header.variable_header[i].name.in (), id_.in ()) == 0)
157 break;
160 if (i == e.header.variable_header.length ())
162 e.header.variable_header.length (i + 1);
164 e.header.variable_header[i].name = string_dup (id_.in ());
167 //ACE_DEBUG ((LM_DEBUG,
168 // "adding %s as header #%d\n",
169 // e.header.variable_header[i].name.in (), i));
171 e.header.variable_header[i].value <<= ULong (1);
174 cerr << "IN: "
175 << e.header.fixed_header.event_type.domain_name << "::"
176 << e.header.fixed_header.event_type.type_name << " "
177 << e.header.fixed_header.event_name << endl;
180 consumer_->push_structured_event (e);
184 void Gate::
185 push_structured_event (StructuredEvent const& e)
187 for (ULong i (0); i < e.header.variable_header.length (); ++i)
189 if (ACE_OS::strcmp (e.header.variable_header[i].name.in (), id_.in ()) == 0)
191 ULong ttl;
193 e.header.variable_header[i].value >>= ttl;
195 if (ttl <= 1)
197 //ACE_DEBUG ((LM_DEBUG,
198 // "DROPPED\n"));
199 return;
202 break;
207 cerr << "OUT: "
208 << e.header.fixed_header.event_type.domain_name << "::"
209 << e.header.fixed_header.event_type.type_name << " "
210 << e.header.fixed_header.event_name << endl;
213 TAO_OutputCDR cdr;
215 cdr << e;
217 CORBA::ULong size (ACE_Utils::truncate_cast<CORBA::ULong> (cdr.total_length ()));
219 OctetSeq seq (size);
220 seq.length (size);
222 char* buffer = reinterpret_cast<char*> (seq.get_buffer ());
225 char* buf = buffer;
227 for (ACE_Message_Block const* mb = cdr.begin ();
228 mb != 0;
229 mb = mb->cont ())
231 ACE_OS::memcpy (buf, mb->rd_ptr (), mb->length ());
232 buf += mb->length ();
236 socket_.send (buffer, size);
240 void
241 Gate::disconnect_structured_push_consumer (void)
243 // We don't care.
246 void
247 Gate::offer_change (EventTypeSeq const&, EventTypeSeq const&)
249 // We don't care.