1 // author : Boris Kolpackov <boris@dre.vanderbilt.edu>
3 #include "ace/OS_NS_string.h"
4 #include "ace/OS_NS_stdlib.h"
6 #include "ace/Bound_Ptr.h"
8 #include "Protocol.hpp"
9 #include "Messaging.hpp"
// Message subclass that carries a payload in a fixed-size buffer.
// The constructor copies the caller's bytes into payload_.
// NOTE(review): only fragments of this class are visible in this excerpt
// (braces, the initializer list and any accessors are elided).
19 class Send
: public virtual Message
// msg  - source buffer to copy from.
// size - number of bytes supplied by the caller.
22 Send (void const* msg
, size_t size
)
// Copies size_ bytes from msg into payload_.
// NOTE(review): no check of size_ against Protocol::MAX_PAYLOAD_SIZE is
// visible here - confirm the caller or an elided line guarantees the bound.
25 ACE_OS::memcpy (payload_
, msg
, size_
);
// Payload storage; capacity is Protocol::MAX_PAYLOAD_SIZE chars.
42 char payload_
[Protocol::MAX_PAYLOAD_SIZE
];
// ACE_Strong_Bound_Ptr instantiated over Send with ACE_SYNCH_MUTEX as the
// lock type. NOTE(review): the typedef keyword and alias name are not
// visible in this excerpt.
46 ACE_Strong_Bound_Ptr
<Send
, ACE_SYNCH_MUTEX
>
// Message subclass that carries a received payload in a fixed-size buffer.
// TransactionController creates one from (payload, size) when it joins a
// transaction and later pushes it to recv_out_.
// NOTE(review): only fragments of this class are visible in this excerpt
// (braces, the initializer list and any accessors are elided).
50 class Recv
: public virtual Message
// msg  - source buffer to copy from.
// size - number of bytes supplied by the caller.
53 Recv (void const* msg
, size_t size
)
// Copies size_ bytes from msg into payload_.
// NOTE(review): no check of size_ against Protocol::MAX_PAYLOAD_SIZE is
// visible here - confirm the caller or an elided line guarantees the bound.
56 ACE_OS::memcpy (payload_
, msg
, size_
);
// Payload storage; capacity is Protocol::MAX_PAYLOAD_SIZE chars.
73 char payload_
[Protocol::MAX_PAYLOAD_SIZE
];
// ACE_Strong_Bound_Ptr instantiated over Recv with ACE_SYNCH_MUTEX as the
// lock type; presumably the RecvPtr alias used by TransactionController -
// the typedef keyword and alias name are not visible in this excerpt.
77 ACE_Strong_Bound_Ptr
<Recv
, ACE_SYNCH_MUTEX
>
// Empty marker message. TransactionController::outsync pushes an instance
// to send_out_ when a voting frame ends with the transaction aborted.
80 class Aborted
: public virtual Message
{};
// Empty marker message. TransactionController::outsync pushes an instance
// to send_out_ when a voting frame ends with the transaction committed.
82 class Commited
: public virtual Message
{};
// Tracks the state of the current transaction (current_) and moves it
// between Protocol::TS_* statuses; outcomes are reported by pushing
// messages onto the send_out_ / recv_out_ queues.
88 class TransactionController
// in       - queue that new outgoing requests are taken from (see in_.front ()).
// send_out - queue that receives Commited / Aborted notifications.
// recv_out - queue that receives the Recv message of a committed, joined
//            transaction.
// NOTE(review): only part of the initializer list and body is visible here.
91 TransactionController (MessageQueue
& in
,
92 MessageQueue
& send_out
,
93 MessageQueue
& recv_out
)
// No separation frame is pending initially.
96 separation_duration_ (0),
// Start in a "stable" status (TS_COMMITED), so that a new transaction
// may be started or joined.
102 current_
.status
= Protocol::TS_COMMITED
;
// Advances the voting frame when a vote is in progress, reports the
// current status through c, then post-processes according to that status.
//
// c       - receives current_.status (other fields may be filled in by
//           lines that are not visible in this excerpt).
// payload - destination buffer; written only while status is TS_BEGIN.
// size    - set to send_->size () while status is TS_BEGIN.
//
// NOTE(review): braces and some branch conditions are elided from this
// excerpt, so the exact nesting of the statements below must be confirmed
// against the full file.
110 outsync (Protocol::Transaction
& c
, void* payload
, size_t& size
)
// Only TS_COMMIT / TS_ABORT (a vote in progress) advance the voting frame.
112 if (current_
.status
== Protocol::TS_COMMIT
||
113 current_
.status
== Protocol::TS_ABORT
)
// Count this iteration; the vote is decided once VOTING_FRAME is reached.
115 if (++voting_duration_
>= Protocol::VOTING_FRAME
)
117 // end of voting frame
119 if (current_
.status
== Protocol::TS_COMMIT
)
// Commit outcome: notify via send_out_. The condition that selects between
// this branch and the "joined transaction" branch is not visible here.
124 MessageQueueAutoLock
lock (send_out_
);
125 send_out_
.push (MessagePtr (new Commited
));
127 else // joined transaction
// Deliver the message received when the transaction was joined.
129 MessageQueueAutoLock
lock (recv_out_
);
130 recv_out_
.push (MessagePtr (recv_
));
135 current_
.status
= Protocol::TS_COMMITED
;
137 // if (trace_) cerr << "commited transaction with id "
138 // << current_.id << endl;
// Abort outcome: notify via send_out_.
144 MessageQueueAutoLock
lock (send_out_
);
145 send_out_
.push (MessagePtr (new Aborted
));
149 // free recv_ buffer if necessary
151 if (recv_
.get ()) recv_
= RecvPtr ();
155 current_
.status
= Protocol::TS_ABORTED
;
157 // if (trace_) cerr << "aborted transaction with id "
158 // << current_.id << endl;
161 // start transaction separation frame (counts down)
162 // +1 because it will be decremented on this iteration
163 separation_duration_
= Protocol::SEPARATION_FRAME
+ 1;
167 // Set current outsync info
170 c
.status
= current_
.status
;
173 // Do some post-processing
175 switch (current_
.status
)
// Stable statuses: count down the separation frame, never below zero.
177 case Protocol::TS_COMMITED
:
178 case Protocol::TS_ABORTED
:
180 if (separation_duration_
> 0) --separation_duration_
;
// TS_BEGIN: hand the pending Send payload to the caller.
// NOTE(review): the capacity of the caller's payload buffer is not checked
// in the visible code - confirm it can hold send_->size () bytes.
183 case Protocol::TS_BEGIN
:
187 size
= send_
->size ();
188 ACE_OS::memcpy (payload
, send_
->payload (), size
);
192 // get ready to vote for 'commit'
194 current_
.status
= Protocol::TS_COMMIT
;
195 voting_duration_
= 0;
// Reacts to a transaction record t and updates current_ (id and status)
// and voting_duration_ accordingly.
//
// t - the transaction (id and status) being reported.
// The body also uses payload and size, which are presumably the remaining
// parameters - their declarations are not visible in this excerpt.
//
// NOTE(review): braces, the switch subjects and several statements are
// elided from this excerpt; confirm nesting against the full file.
201 current_transaction (Protocol::Transaction
const& t
,
// Short aliases for the fields of current_; writes through id / s modify
// current_ directly.
205 Protocol::TransactionId
& id
= current_
.id
;
206 Protocol::TransactionStatus
& s
= current_
.status
;
// Local id of 0 while t carries a non-zero id: adopt state from t.
208 if (id
== 0 && t
.id
!= 0) // catch up
// NOTE(review): the expression being switched on is not visible here.
212 case Protocol::TS_BEGIN
:
213 case Protocol::TS_COMMIT
:
214 case Protocol::TS_ABORT
:
217 s
= Protocol::TS_COMMITED
;
220 case Protocol::TS_ABORTED
:
221 case Protocol::TS_COMMITED
:
229 // if (trace_) cerr << "caught up with id " << id << endl;
// "stable" means no vote is in progress locally.
232 bool stable (s
== Protocol::TS_COMMITED
|| s
== Protocol::TS_ABORTED
);
// NOTE(review): the expression being switched on is not visible here.
236 case Protocol::TS_BEGIN
:
// A begin can be joined only from a stable state and only for the next id.
238 if (!stable
|| t
.id
!= id
+ 1)
240 // Transaction is in progress or hole in transaction id's
242 // cerr << "unexpected request to join " << t
243 // << " while on " << current_ << endl;
245 // if (!stable) cerr << "voting progress is " << voting_duration_
246 // << "/" << Protocol::VOTING_FRAME << endl;
248 if (t
.id
== id
) // collision
// Same id while a non-abort vote is in progress: switch to voting abort.
250 if (!stable
&& s
!= Protocol::TS_ABORT
)
253 // cerr << "aborting both transactions" << endl;
255 s
= Protocol::TS_ABORT
;
256 voting_duration_
= 0; //@@ reset voting frame
261 // @@ delicate case. need to think more
263 // cerr << "Declaring node failed." << endl;
269 // join the transaction
// Keep a copy of the payload so it can be delivered if the vote commits.
273 recv_
= RecvPtr (new Recv (payload
, size
));
276 s
= Protocol::TS_COMMIT
;
277 voting_duration_
= 0;
279 // if (trace_) cerr << "joining-for-commit transaction with id "
284 case Protocol::TS_COMMIT
:
// t is the next transaction, but this controller is still stable.
286 if (stable
&& id
== t
.id
- 1)
288 // not begin and we haven't joined
295 current_
.status
= Protocol::TS_ABORT
;
296 voting_duration_
= 0;
298 // if (trace_) cerr << "joining-for-abort transaction with id "
299 // << current_.id << endl;
303 case Protocol::TS_ABORT
:
305 if ((!stable
&& id
== t
.id
&& s
== Protocol::TS_COMMIT
) ||
306 (stable
&& id
== t
.id
- 1)) // abort current || new transaction
308 // if (trace_) cerr << "voting-for-abort on transaction with id "
309 // << current_.id << endl;
312 s
= Protocol::TS_ABORT
;
314 voting_duration_
= 0; //@@ resetting voting_duration_
322 case Protocol::TS_ABORTED
:
323 case Protocol::TS_COMMITED
:
// Starts a new transaction when none is in progress: the front message of
// in_ is examined and, if it is a Send, the status moves to TS_BEGIN.
// NOTE(review): the signature of the enclosing function, the braces and
// the handling of a non-Send message are not visible in this excerpt.
//
// A new transaction may start only from a stable status and after the
// separation frame has fully counted down.
334 if ((current_
.status
== Protocol::TS_COMMITED
||
335 current_
.status
== Protocol::TS_ABORTED
) &&
336 separation_duration_
== 0) // no transaction in progress
338 // start new transaction
340 // Note that in_ is already locked by Scheduler
342 MessagePtr
m (in_
.front ());
// NOTE(review): typeid requires <typeinfo>; no such include is visible in
// this excerpt - confirm it is pulled in elsewhere.
345 Message
const &msg
= *m
; // avoid warning for side-effects in typeid
346 if (typeid (msg
) == typeid (Send
))
352 // cerr << "Expecting Send but received " << typeid (*m).name ()
359 current_
.status
= Protocol::TS_BEGIN
;
363 // if (trace_) cerr << "starting transaction with id " << current_.id
// Lock-guard alias and data members of TransactionController.
// NOTE(review): other members used by the methods (e.g. in_, send_, recv_)
// are declared on lines that are not visible in this excerpt.
369 // FUZZ: disable check_for_ACE_Guard
370 typedef ACE_Guard
<ACE_Thread_Mutex
> AutoLock
;
371 // FUZZ: enable check_for_ACE_Guard
// Id and status of the transaction currently being tracked.
375 Protocol::Transaction current_
;
// Incremented by outsync while a vote (TS_COMMIT / TS_ABORT) is in
// progress; the vote ends once it reaches Protocol::VOTING_FRAME.
379 unsigned short voting_duration_
;
// Set to Protocol::SEPARATION_FRAME + 1 when a vote ends and decremented
// by outsync in stable statuses; a new transaction starts only at 0.
380 unsigned short separation_duration_
;
// Queue that receives Commited / Aborted notifications.
383 MessageQueue
& send_out_
;
// Queue that receives the Recv message of a committed, joined transaction.
384 MessageQueue
& recv_out_
;