1 // author : Boris Kolpackov <boris@dre.vanderbilt.edu>
8 #include <ace/OS_NS_stdlib.h>
10 #include <ace/Time_Value.h>
11 #include <ace/SOCK_Dgram_Mcast.h>
12 #include <ace/Condition_T.h>
13 #include <ace/OS_NS_sys_time.h>
15 #include "Messaging.hpp"
17 #include "Protocol.hpp"
21 #include "LinkListener.hpp"
22 #include "FaultDetector.hpp"
23 #include "TransactionController.hpp"
// Equality operator whose left operand is a pointer to std::type_info and
// whose right operand is a reference to one.
28 operator== (std::type_info
const* pa
, std::type_info
const& b
)
// Empty Message subtype. An instance is pushed onto the scheduler's
// in_control_ queue immediately before the scheduler thread is joined.
36 class Terminate
: public virtual Message
{};
// Empty Message subtype. An instance is pushed onto out_control_ next to
// the commented-out "Exception in scheduler loop." trace, so it presumably
// reports a scheduler failure to the owner of that queue.
42 class Failure
: public virtual Message
{};
// Scheduler constructor. Visible work, in order:
//   - binds out_control_ to the supplied control queue, builds
//     in_link_data_ on mutex_, seeds sync_schedule with the current time
//     and hands in_data_ plus the two supplied data queues to
//     transaction_controller_;
//   - copies `id` into id_ and forces NUL termination;
//   - sets the multicast TTL on sock_;
//   - subscribes cond_ to all three inbound queues;
//   - starts the worker thread, aborting the process if creation fails.
51 Scheduler (ACE_INET_Addr
const& addr
,
53 MessageQueue
& out_send_data
,
54 MessageQueue
& out_recv_data
,
55 MessageQueue
& out_control
)
62 out_control_ (out_control
),
65 in_link_data_(mutex_
),
68 sync_schedule (ACE_OS::gettimeofday ()),
70 transaction_controller_ (in_data_
, out_send_data
, out_recv_data
)
// strncpy leaves the destination unterminated when the source is at least
// MEMBER_ID_LENGTH characters long, hence the explicit '\0' below.
72 ACE_OS::strncpy (id_
, id
, Protocol::MEMBER_ID_LENGTH
);
73 id_
[Protocol::MEMBER_ID_LENGTH
- 1] = '\0';
75 sock_
.set_option (IP_MULTICAST_TTL
, 32); // @@ ttl is hardcoded
// A single condition variable is subscribed to every inbound queue.
77 in_data_
.subscribe (cond_
);
78 in_link_data_
.subscribe (cond_
);
79 in_control_
.subscribe (cond_
);
// Start the worker thread at thread_thunk; failure to create it is fatal.
82 if (ACE_OS::thr_create (&thread_thunk
,
86 &thread_
) != 0) ACE_OS::abort ();
// Shutdown path: take the in_control_ queue lock, queue a Terminate
// message, then join the worker thread. A failed join aborts the process.
92 MessageQueueAutoLock
lock (in_control_
);
94 in_control_
.push (MessagePtr (new Terminate
));
97 if (ACE_OS::thr_join (thread_
, 0) != 0) ACE_OS::abort ();
99 // cerr << "Scheduler is down." << endl;
// Static thread entry point; its address is what the constructor passes to
// ACE_OS::thr_create. The opaque `arg` is cast to a Scheduler pointer.
110 static ACE_THR_FUNC_RETURN
111 thread_thunk (void* arg
)
113 Scheduler
* obj
= reinterpret_cast<Scheduler
*> (arg
);
// Worker-thread loop. A LinkListener is created over sock_ and
// in_link_data_, mutex_ is taken, and the thread waits on cond_ with
// sync_schedule as the timeout value. After a wake-up it checks, in this
// order, in_control_, the sync deadline, in_link_data_ and in_data_, and
// repeats the pass while link data is pending or the sync deadline has
// already passed.
124 std::unique_ptr
<LinkListener
> ll (new LinkListener (sock_
, in_link_data_
));
127 AutoLock
lock (mutex_
);
135 cond_
.wait (&sync_schedule
);
137 // "Loop of Fairness"
146 if (!in_control_
.empty ())
// Sync deadline reached.
155 if (sync_schedule
< ACE_OS::gettimeofday ())
161 // schedule next outsync
// SYNC_PERIOD is supplied as the second ACE_Time_Value argument
// (microseconds), with zero whole seconds.
163 ACE_OS::gettimeofday () +
164 ACE_Time_Value (0, Protocol::SYNC_PERIOD
);
// Take one message from the link listener's queue and dispatch on its
// dynamic type.
170 if (!in_link_data_
.empty ())
172 MessagePtr
m (in_link_data_
.front ());
173 in_link_data_
.pop ();
177 if (dynamic_cast<LinkFailure
*>(m
.get()))
179 // cerr << "link failure" << endl;
182 else if ((data
= dynamic_cast<LinkData
*> (m
.get ())))
186 // Filter out loopback.
// A message whose member id equals our own id_ is skipped.
188 if (ACE_OS::strcmp (data
->header().member_id
.id
, id_
) != 0)
192 current_transaction (data
->header().current
,
199 // cerr << "unknown message type from link listener: "
200 // << typeid (*m).name () << endl;
208 if (!in_data_
.empty ())
215 } while (!in_link_data_
.empty() ||
216 sync_schedule
< ACE_OS::gettimeofday ());
// Error path: a Failure message is queued on out_control_ under that
// queue's lock.
224 // cerr << "Exception in scheduler loop." << endl;
225 MessageQueueAutoLock
lock (out_control_
);
226 out_control_
.push (MessagePtr (new Failure
));
234 // INSYNC, TSL, VOTE, BEGIN
// The fault detector is notified of the inbound sync.
242 fault_detector_
.insync ();
// Outbound sync message: a Protocol::MessageHeader sits at the start of a
// local buffer of Protocol::MAX_MESSAGE_SIZE bytes and the payload area
// begins immediately after the header.
248 char buf
[Protocol::MAX_MESSAGE_SIZE
];
250 Protocol::MessageHeader
* hdr
=
251 reinterpret_cast<Protocol::MessageHeader
*> (buf
);
253 void* data
= buf
+ sizeof (Protocol::MessageHeader
);
// The length field starts out as the size of the header alone.
255 hdr
->length
= sizeof (Protocol::MessageHeader
);
// id_ was NUL-terminated when the constructor filled it in, so the copy
// reads at most Protocol::MEMBER_ID_LENGTH bytes from it.
258 ACE_OS::strcpy (hdr
->member_id
.id
, id_
);
// The transaction controller fills in hdr->current and the payload area.
262 transaction_controller_
.outsync (hdr
->current
, data
, size
);
266 fault_detector_
.outsync ();
// Send hdr->length bytes of the buffer on the multicast socket.
268 // sock_.send (buf, hdr->length, addr_);
269 sock_
.send (buf
, hdr
->length
);
// Forwards the transaction `t` together with its payload and size to
// transaction_controller_. The worker loop calls this for link data that
// passed the loopback filter.
278 current_transaction (Protocol::Transaction
const& t
,
282 transaction_controller_
.current_transaction (t
, payload
, size
);
// Delegates to the transaction controller's api () member.
288 transaction_controller_
.api ();
// Handle of the worker thread: filled in by thr_create, used by thr_join.
292 ACE_hthread_t thread_
;
// Mutex held by the worker loop (via AutoLock) and used to construct
// in_link_data_.
294 ACE_Thread_Mutex mutex_
;
// Condition the worker loop waits on; subscribed to every inbound queue.
295 ACE_Condition
<ACE_Thread_Mutex
> cond_
;
297 // FUZZ: disable check_for_ACE_Guard
298 typedef ACE_Guard
<ACE_Thread_Mutex
> AutoLock
;
299 // FUZZ: enable check_for_ACE_Guard
// NUL-terminated member id; compared against incoming headers to filter
// loopback and copied into outgoing headers.
301 char id_
[Protocol::MEMBER_ID_LENGTH
];
// Multicast socket used for sending and handed to the LinkListener.
304 ACE_SOCK_Dgram_Mcast sock_
;
// Supplied control queue; receives Failure messages.
306 MessageQueue
& out_control_
;
// Inbound queues, all subscribed to cond_.
308 MessageQueue in_data_
;
309 MessageQueue in_link_data_
;
310 MessageQueue in_control_
;
// Time of the next scheduled sync; also the timeout passed to cond_.wait.
316 ACE_Time_Value sync_schedule
;
318 FaultDetector fault_detector_
;
319 TransactionController transaction_controller_
;
// Implementation object behind Group. It owns a Scheduler and the queues
// over which it exchanges messages with that scheduler.
326 class Group::GroupImpl
329 virtual ~GroupImpl ()
// The constructor builds send_cond_ and the three inbound queues on
// mutex_, creates the Scheduler, and binds out_data_ to the queue returned
// by scheduler_->in_data (). It then subscribes send_cond_ to
// in_send_data_, recv_cond_ to in_recv_data_, and both conditions to
// in_control_.
333 GroupImpl (ACE_INET_Addr
const& addr
, char const* id
)
334 : send_cond_ (mutex_
),
337 in_send_data_ (mutex_
),
338 in_recv_data_ (mutex_
),
339 in_control_ (mutex_
),
340 scheduler_ (new Scheduler (addr
,
345 out_data_ (scheduler_
->in_data ())
347 in_send_data_
.subscribe (send_cond_
);
348 in_recv_data_
.subscribe (recv_cond_
);
350 in_control_
.subscribe (send_cond_
);
351 in_control_
.subscribe (recv_cond_
);
354 //FUZZ: disable check_for_lack_ACE_OS
// Queues `size` bytes at `msg` for the scheduler as a Send message and
// then examines replies arriving on in_send_data_.
// Throws InvalidArg when size exceeds Protocol::MAX_PAYLOAD_SIZE, and
// Group::Aborted when the reply is an ACE_TMCast::Aborted message.
355 void send (void const* msg
, size_t size
)
357 //FUZZ: enable check_for_lack_ACE_OS
359 if (size
> Protocol::MAX_PAYLOAD_SIZE
) throw InvalidArg ();
361 // Note the potential deadlock if I lock mutex_ and out_data_ in
// Lock order here: the out_data_ queue lock first, then mutex_.
364 MessageQueueAutoLock
l1 (out_data_
);
365 AutoLock
l2 (mutex_
);
369 out_data_
.push (MessagePtr (new Send (msg
, size
)));
371 l1
.unlock (); // no need to keep it locked
// Take one reply, if present, and dispatch on its dynamic type.
377 if (!in_send_data_
.empty ())
379 MessagePtr
m (in_send_data_
.front ());
380 in_send_data_
.pop ();
382 if (dynamic_cast<ACE_TMCast::Aborted
*>(m
.get()))
384 throw Group::Aborted ();
386 else if (dynamic_cast<Commited
*>(m
.get()))
392 // cerr << "send: group-scheduler messaging protocol violation; "
393 // << "unexpected message " << typeid (*m).name ()
394 // << " " << typeid (Aborted).name () << endl;
400 // cerr << "send: waiting on condition" << endl;
402 // cerr << "send: wokeup on condition" << endl;
406 //FUZZ: disable check_for_lack_ACE_OS
// Copies the payload of the next Recv message on in_recv_data_ into `msg`
// and returns the number of bytes copied. `size` is the capacity of `msg`.
// Throws Group::InsufficienSpace when the payload is larger than `size`.
407 size_t recv (void* msg
, size_t size
)
409 //FUZZ: enable check_for_lack_ACE_OS
411 AutoLock
lock (mutex_
);
417 if (!in_recv_data_
.empty ())
419 MessagePtr
m (in_recv_data_
.front ());
420 in_recv_data_
.pop ();
422 Recv
* data
= dynamic_cast<Recv
*> (m
.get ());
425 Recv
* data
= dynamic_cast<Recv
*> (m
.get ());
// The capacity check precedes the copy, so memcpy never writes more than
// `size` bytes into `msg`.
427 if (size
< data
->size ())
428 throw Group::InsufficienSpace ();
430 ACE_OS::memcpy (msg
, data
->payload (), data
->size ());
432 return data
->size ();
436 // cerr << "recv: group-scheduler messaging protocol violation. "
437 // << "unexpected message " << typeid (*m).name () << endl;
// failed_ is a latch: it is set the first time in_control_ is found
// non-empty and is never cleared by this code. While it is set,
// Group::Failed is thrown.
451 if (!failed_
&& !in_control_
.empty ()) failed_
= true;
453 if (failed_
) throw Group::Failed ();
// Mutex used to construct send_cond_ and the three inbound queues; taken
// by send and recv via AutoLock.
457 ACE_Thread_Mutex mutex_
;
// send_cond_ is subscribed to in_send_data_ and in_control_.
458 ACE_Condition
<ACE_Thread_Mutex
> send_cond_
;
// recv_cond_ is subscribed to in_recv_data_ and in_control_.
459 ACE_Condition
<ACE_Thread_Mutex
> recv_cond_
;
461 // FUZZ: disable check_for_ACE_Guard
462 typedef ACE_Guard
<ACE_Thread_Mutex
> AutoLock
;
463 // FUZZ: enable check_for_ACE_Guard
// Queues read by send, by recv, and by the failure check respectively.
467 MessageQueue in_send_data_
;
468 MessageQueue in_recv_data_
;
469 MessageQueue in_control_
;
// Owned scheduler instance.
471 std::unique_ptr
<Scheduler
> scheduler_
;
// Reference to the queue returned by scheduler_->in_data (); send pushes
// Send messages onto it.
473 MessageQueue
& out_data_
;
// Constructs the group by creating its GroupImpl from the same address and
// member id.
481 Group (ACE_INET_Addr
const& addr
, char const* id
)
482 : pimpl_ (new GroupImpl (addr
, id
))
// Forwards to GroupImpl::send with the same arguments.
492 Group::send (void const* msg
, size_t size
)
494 pimpl_
->send (msg
, size
);
// Forwards to GroupImpl::recv and returns its result.
498 Group::recv (void* msg
, size_t size
)
500 return pimpl_
->recv (msg
, size
);