Revert "Use a variable on the stack to not have a temporary in the call"
[ACE_TAO.git] / ACE / protocols / ace / TMCast / Group.cpp
blob 844a78219d6bad151be6be4167c223bbbf45d248
1 // author : Boris Kolpackov <boris@dre.vanderbilt.edu>
3 #include "Group.hpp"
5 #include <typeinfo>
7 // OS primitives
8 #include <ace/OS_NS_stdlib.h>
9 #include <ace/Synch.h>
10 #include <ace/Time_Value.h>
11 #include <ace/SOCK_Dgram_Mcast.h>
12 #include <ace/Condition_T.h>
13 #include <ace/OS_NS_sys_time.h>
15 #include "Messaging.hpp"
17 #include "Protocol.hpp"
19 // Components
21 #include "LinkListener.hpp"
22 #include "FaultDetector.hpp"
23 #include "TransactionController.hpp"
25 namespace ACE_TMCast
// Compares the type_info object pointed to by `pa` with `b`.
//
// Lets a stored `std::type_info const*` appear on the left-hand side of
// `==` against a `typeid (...)` expression.
//
// Returns true when `pa` is non-null and `*pa == b`; a null `pa` never
// compares equal (previously it was dereferenced unconditionally).
bool
operator== (std::type_info const* pa, std::type_info const& b)
{
  return pa != nullptr && *pa == b;
}
36 class Terminate : public virtual Message {};
42 class Failure : public virtual Message {};
48 class Scheduler
50 public:
51 Scheduler (ACE_INET_Addr const& addr,
52 char const* id,
53 MessageQueue& out_send_data,
54 MessageQueue& out_recv_data,
55 MessageQueue& out_control)
57 : cond_ (mutex_),
59 addr_ (addr),
60 sock_ (),
62 out_control_ (out_control),
64 in_data_ (mutex_),
65 in_link_data_(mutex_),
66 in_control_ (mutex_),
68 sync_schedule (ACE_OS::gettimeofday ()),
70 transaction_controller_ (in_data_, out_send_data, out_recv_data)
72 ACE_OS::strncpy (id_, id, Protocol::MEMBER_ID_LENGTH);
73 id_[Protocol::MEMBER_ID_LENGTH - 1] = '\0';
75 sock_.set_option (IP_MULTICAST_TTL, 32); // @@ ttl is hardcoded
77 in_data_.subscribe (cond_);
78 in_link_data_.subscribe (cond_);
79 in_control_.subscribe (cond_);
81 ACE_thread_t unused;
82 if (ACE_OS::thr_create (&thread_thunk,
83 this,
84 THR_JOINABLE,
85 &unused,
86 &thread_) != 0) ACE_OS::abort ();
89 virtual ~Scheduler ()
92 MessageQueueAutoLock lock (in_control_);
94 in_control_.push (MessagePtr (new Terminate));
97 if (ACE_OS::thr_join (thread_, 0) != 0) ACE_OS::abort ();
99 // cerr << "Scheduler is down." << endl;
102 public:
103 MessageQueue&
104 in_data ()
106 return in_data_;
109 private:
110 static ACE_THR_FUNC_RETURN
111 thread_thunk (void* arg)
113 Scheduler* obj = reinterpret_cast<Scheduler*> (arg);
114 obj->execute ();
115 return 0;
118 void
119 execute ()
123 sock_.join (addr_);
124 std::unique_ptr<LinkListener> ll (new LinkListener (sock_, in_link_data_));
127 AutoLock lock (mutex_);
129 // Loop
133 while (true)
135 cond_.wait (&sync_schedule);
137 // "Loop of Fairness"
139 bool done = false;
143 // control message
146 if (!in_control_.empty ())
148 done = true;
149 break;
152 // outsync
155 if (sync_schedule < ACE_OS::gettimeofday ())
157 // OUTSYNC
159 outsync ();
161 // schedule next outsync
162 sync_schedule =
163 ACE_OS::gettimeofday () +
164 ACE_Time_Value (0, Protocol::SYNC_PERIOD);
167 // link message
170 if (!in_link_data_.empty ())
172 MessagePtr m (in_link_data_.front ());
173 in_link_data_.pop ();
175 LinkData* data;
177 if (dynamic_cast<LinkFailure*>(m.get()))
179 // cerr << "link failure" << endl;
180 throw false;
182 else if ((data = dynamic_cast<LinkData*> (m.get ())))
184 // INSYNC, TL, CT
186 // Filter out loopback.
188 if (ACE_OS::strcmp (data->header().member_id.id, id_) != 0)
190 insync ();
191 transaction_list ();
192 current_transaction (data->header().current,
193 data->payload (),
194 data->size ());
197 else
199 // cerr << "unknown message type from link listener: "
200 // << typeid (*m).name () << endl;
201 ACE_OS::abort ();
205 // api message
208 if (!in_data_.empty ())
210 // API
212 api ();
215 } while (!in_link_data_.empty() ||
216 sync_schedule < ACE_OS::gettimeofday ());
218 if (done) break;
222 catch (...)
224 // cerr << "Exception in scheduler loop." << endl;
225 MessageQueueAutoLock lock (out_control_);
226 out_control_.push (MessagePtr (new Failure));
230 // Events
232 // Order:
234 // INSYNC, TSL, VOTE, BEGIN
235 // API
236 // OUTSYNC
239 void
240 insync ()
242 fault_detector_.insync ();
245 void
246 outsync ()
248 char buf[Protocol::MAX_MESSAGE_SIZE];
250 Protocol::MessageHeader* hdr =
251 reinterpret_cast<Protocol::MessageHeader*> (buf);
253 void* data = buf + sizeof (Protocol::MessageHeader);
255 hdr->length = sizeof (Protocol::MessageHeader);
256 hdr->check_sum = 0;
258 ACE_OS::strcpy (hdr->member_id.id, id_);
260 size_t size (0);
262 transaction_controller_.outsync (hdr->current, data, size);
264 hdr->length += size;
266 fault_detector_.outsync ();
268 // sock_.send (buf, hdr->length, addr_);
269 sock_.send (buf, hdr->length);
272 void
273 transaction_list ()
277 void
278 current_transaction (Protocol::Transaction const& t,
279 void const* payload,
280 size_t size)
282 transaction_controller_.current_transaction (t, payload, size);
285 void
286 api ()
288 transaction_controller_.api ();
291 private:
292 ACE_hthread_t thread_;
294 ACE_Thread_Mutex mutex_;
295 ACE_Condition<ACE_Thread_Mutex> cond_;
297 // FUZZ: disable check_for_ACE_Guard
298 typedef ACE_Guard<ACE_Thread_Mutex> AutoLock;
299 // FUZZ: enable check_for_ACE_Guard
301 char id_[Protocol::MEMBER_ID_LENGTH];
303 ACE_INET_Addr addr_;
304 ACE_SOCK_Dgram_Mcast sock_;
306 MessageQueue& out_control_;
308 MessageQueue in_data_;
309 MessageQueue in_link_data_;
310 MessageQueue in_control_;
312 // Protocol state
316 ACE_Time_Value sync_schedule;
318 FaultDetector fault_detector_;
319 TransactionController transaction_controller_;
326 class Group::GroupImpl
328 public:
329 virtual ~GroupImpl ()
333 GroupImpl (ACE_INET_Addr const& addr, char const* id)
334 : send_cond_ (mutex_),
335 recv_cond_ (mutex_),
336 failed_ (false),
337 in_send_data_ (mutex_),
338 in_recv_data_ (mutex_),
339 in_control_ (mutex_),
340 scheduler_ (new Scheduler (addr,
342 in_send_data_,
343 in_recv_data_,
344 in_control_)),
345 out_data_ (scheduler_->in_data ())
347 in_send_data_.subscribe (send_cond_);
348 in_recv_data_.subscribe (recv_cond_);
350 in_control_.subscribe (send_cond_);
351 in_control_.subscribe (recv_cond_);
354 //FUZZ: disable check_for_lack_ACE_OS
355 void send (void const* msg, size_t size)
357 //FUZZ: enable check_for_lack_ACE_OS
359 if (size > Protocol::MAX_PAYLOAD_SIZE) throw InvalidArg ();
361 // Note the potential deadlock if I lock mutex_ and out_data_ in
362 // reverse order.
364 MessageQueueAutoLock l1 (out_data_);
365 AutoLock l2 (mutex_);
367 throw_if_failed ();
369 out_data_.push (MessagePtr (new Send (msg, size)));
371 l1.unlock (); // no need to keep it locked
373 while (true)
375 throw_if_failed ();
377 if (!in_send_data_.empty ())
379 MessagePtr m (in_send_data_.front ());
380 in_send_data_.pop ();
382 if (dynamic_cast<ACE_TMCast::Aborted*>(m.get()))
384 throw Group::Aborted ();
386 else if (dynamic_cast<Commited*>(m.get()))
388 return;
390 else
392 // cerr << "send: group-scheduler messaging protocol violation; "
393 // << "unexpected message " << typeid (*m).name ()
394 // << " " << typeid (Aborted).name () << endl;
396 ACE_OS::abort ();
400 // cerr << "send: waiting on condition" << endl;
401 send_cond_.wait ();
402 // cerr << "send: wokeup on condition" << endl;
406 //FUZZ: disable check_for_lack_ACE_OS
407 size_t recv (void* msg, size_t size)
409 //FUZZ: enable check_for_lack_ACE_OS
411 AutoLock lock (mutex_);
413 while (true)
415 throw_if_failed ();
417 if (!in_recv_data_.empty ())
419 MessagePtr m (in_recv_data_.front ());
420 in_recv_data_.pop ();
422 Recv* data = dynamic_cast<Recv*> (m.get ());
423 if (data)
425 Recv* data = dynamic_cast<Recv*> (m.get ());
427 if (size < data->size ())
428 throw Group::InsufficienSpace ();
430 ACE_OS::memcpy (msg, data->payload (), data->size ());
432 return data->size ();
434 else
436 // cerr << "recv: group-scheduler messaging protocol violation. "
437 // << "unexpected message " << typeid (*m).name () << endl;
439 ACE_OS::abort ();
443 recv_cond_.wait ();
447 private:
448 void
449 throw_if_failed ()
451 if (!failed_ && !in_control_.empty ()) failed_ = true;
453 if (failed_) throw Group::Failed ();
456 private:
457 ACE_Thread_Mutex mutex_;
458 ACE_Condition<ACE_Thread_Mutex> send_cond_;
459 ACE_Condition<ACE_Thread_Mutex> recv_cond_;
461 // FUZZ: disable check_for_ACE_Guard
462 typedef ACE_Guard<ACE_Thread_Mutex> AutoLock;
463 // FUZZ: enable check_for_ACE_Guard
465 bool failed_;
467 MessageQueue in_send_data_;
468 MessageQueue in_recv_data_;
469 MessageQueue in_control_;
471 std::unique_ptr<Scheduler> scheduler_;
473 MessageQueue& out_data_;
477 // Group
480 Group::
481 Group (ACE_INET_Addr const& addr, char const* id)
482 : pimpl_ (new GroupImpl (addr, id))
486 Group::
487 ~Group ()
491 void
492 Group::send (void const* msg, size_t size)
494 pimpl_->send (msg, size);
497 size_t
498 Group::recv (void* msg, size_t size)
500 return pimpl_->recv (msg, size);