Merge pull request #2317 from jwillemsen/jwi-deleteop
[ACE_TAO.git] / ACE / protocols / ace / TMCast / TransactionController.hpp
blob3165f551931b0e906793602a75688c3dbe5c2e56
1 // author : Boris Kolpackov <boris@dre.vanderbilt.edu>
3 #include "ace/OS_NS_string.h"
4 #include "ace/OS_NS_stdlib.h"
5 #include "ace/Synch.h"
6 #include "ace/Bound_Ptr.h"
8 #include "Protocol.hpp"
9 #include "Messaging.hpp"
11 #include <typeinfo>
13 namespace ACE_TMCast
16 // Messages
19 class Send : public virtual Message
21 public:
22 Send (void const* msg, size_t size)
23 : size_ (size)
25 ACE_OS::memcpy (payload_, msg, size_);
28 void const*
29 payload () const
31 return payload_;
34 size_t
35 size () const
37 return size_;
40 private:
41 size_t size_;
42 char payload_[Protocol::MAX_PAYLOAD_SIZE];
45 typedef
46 ACE_Strong_Bound_Ptr<Send, ACE_SYNCH_MUTEX>
47 SendPtr;
50 class Recv : public virtual Message
52 public:
53 Recv (void const* msg, size_t size)
54 : size_ (size)
56 ACE_OS::memcpy (payload_, msg, size_);
59 void const*
60 payload () const
62 return payload_;
65 size_t
66 size () const
68 return size_;
71 private:
72 size_t size_;
73 char payload_[Protocol::MAX_PAYLOAD_SIZE];
76 typedef
77 ACE_Strong_Bound_Ptr<Recv, ACE_SYNCH_MUTEX>
78 RecvPtr;
80 class Aborted : public virtual Message {};
82 class Commited : public virtual Message {};
88 class TransactionController
90 public:
91 TransactionController (MessageQueue& in,
92 MessageQueue& send_out,
93 MessageQueue& recv_out)
94 : //trace_ (false),
95 voting_duration_ (0),
96 separation_duration_ (0),
97 in_ (in),
98 send_out_ (send_out),
99 recv_out_ (recv_out)
101 current_.id = 0;
102 current_.status = Protocol::TS_COMMITED;
105 public:
106 class Failure {};
109 void
110 outsync (Protocol::Transaction& c, void* payload, size_t& size)
112 if (current_.status == Protocol::TS_COMMIT ||
113 current_.status == Protocol::TS_ABORT)
115 if (++voting_duration_ >= Protocol::VOTING_FRAME)
117 // end of voting frame
119 if (current_.status == Protocol::TS_COMMIT)
122 if (initiated_)
124 MessageQueueAutoLock lock (send_out_);
125 send_out_.push (MessagePtr (new Commited));
127 else // joined transaction
129 MessageQueueAutoLock lock (recv_out_);
130 recv_out_.push (MessagePtr (recv_));
131 recv_ = RecvPtr ();
135 current_.status = Protocol::TS_COMMITED;
137 // if (trace_) cerr << "commited transaction with id "
138 // << current_.id << endl;
140 else // TS_ABORT
142 if (initiated_)
144 MessageQueueAutoLock lock (send_out_);
145 send_out_.push (MessagePtr (new Aborted));
147 else
149 // free revc_ buffer if necessary
151 if (recv_.get ()) recv_ = RecvPtr ();
155 current_.status = Protocol::TS_ABORTED;
157 // if (trace_) cerr << "aborted transaction with id "
158 // << current_.id << endl;
161 // start transaction separation frame (counts down)
162 // +1 because it will be decremented on this iteration
163 separation_duration_ = Protocol::SEPARATION_FRAME + 1;
167 // Set current outsync info
169 c.id = current_.id;
170 c.status = current_.status;
173 // Do some post-processing
175 switch (current_.status)
177 case Protocol::TS_COMMITED:
178 case Protocol::TS_ABORTED:
180 if (separation_duration_ > 0) --separation_duration_;
181 break;
183 case Protocol::TS_BEGIN:
185 // transfer payload
187 size = send_->size ();
188 ACE_OS::memcpy (payload, send_->payload (), size);
190 send_ = SendPtr ();
192 // get redy to vote for 'commit'
194 current_.status = Protocol::TS_COMMIT;
195 voting_duration_ = 0;
200 void
201 current_transaction (Protocol::Transaction const& t,
202 void const* payload,
203 size_t size)
205 Protocol::TransactionId& id = current_.id;
206 Protocol::TransactionStatus& s = current_.status;
208 if (id == 0 && t.id != 0) // catch up
210 switch (t.status)
212 case Protocol::TS_BEGIN:
213 case Protocol::TS_COMMIT:
214 case Protocol::TS_ABORT:
216 id = t.id - 1;
217 s = Protocol::TS_COMMITED;
218 break;
220 case Protocol::TS_ABORTED:
221 case Protocol::TS_COMMITED:
223 id = t.id;
224 s = t.status;
225 break;
229 // if (trace_) cerr << "caught up with id " << id << endl;
232 bool stable (s == Protocol::TS_COMMITED || s == Protocol::TS_ABORTED);
234 switch (t.status)
236 case Protocol::TS_BEGIN:
238 if (!stable || t.id != id + 1)
240 // Transaction is in progress or hole in transaction id's
242 // cerr << "unexpected request to join " << t
243 // << " while on " << current_ << endl;
245 // if (!stable) cerr << "voting progress is " << voting_duration_
246 // << "/" << Protocol::VOTING_FRAME << endl;
248 if (t.id == id) // collision
250 if (!stable && s != Protocol::TS_ABORT)
252 // abort both
253 // cerr << "aborting both transactions" << endl;
255 s = Protocol::TS_ABORT;
256 voting_duration_ = 0; //@@ reset voting frame
259 else
261 // @@ delicate case. need to think more
263 // cerr << "Declaring node failed." << endl;
264 throw Failure ();
267 else
269 // join the transaction
271 initiated_ = false;
273 recv_ = RecvPtr (new Recv (payload, size));
275 id = t.id;
276 s = Protocol::TS_COMMIT;
277 voting_duration_ = 0;
279 // if (trace_) cerr << "joining-for-commit transaction with id "
280 // << id << endl;
282 break;
284 case Protocol::TS_COMMIT:
286 if (stable && id == t.id - 1)
288 // not begin and and we haven't joined
290 // join for abort
292 initiated_ = false;
294 current_.id = t.id;
295 current_.status = Protocol::TS_ABORT;
296 voting_duration_ = 0;
298 // if (trace_) cerr << "joining-for-abort transaction with id "
299 // << current_.id << endl;
301 break;
303 case Protocol::TS_ABORT:
305 if ((!stable && id == t.id && s == Protocol::TS_COMMIT) ||
306 (stable && id == t.id - 1)) // abort current || new transaction
308 // if (trace_) cerr << "voting-for-abort on transaction with id "
309 // << current_.id << endl;
311 id = t.id;
312 s = Protocol::TS_ABORT;
314 voting_duration_ = 0; //@@ reseting voting_duration_
316 else
320 break;
322 case Protocol::TS_ABORTED:
323 case Protocol::TS_COMMITED:
325 // nothing for now
326 break;
331 void
332 api ()
334 if ((current_.status == Protocol::TS_COMMITED ||
335 current_.status == Protocol::TS_ABORTED) &&
336 separation_duration_ == 0) // no transaction in progress
338 // start new transaction
340 // Note that in_ is already locked by Scheduler
342 MessagePtr m (in_.front ());
343 in_.pop ();
345 Message const &msg = *m; // avoid warning for side-effects in typeid
346 if (typeid (msg) == typeid (Send))
348 send_ = SendPtr (m);
350 else
352 // cerr << "Expecting Send but received " << typeid (*m).name ()
353 // << endl;
355 ACE_OS::abort ();
358 current_.id++;
359 current_.status = Protocol::TS_BEGIN;
361 initiated_ = true;
363 // if (trace_) cerr << "starting transaction with id " << current_.id
364 // << endl;
368 private:
369 // FUZZ: disable check_for_ACE_Guard
370 typedef ACE_Guard<ACE_Thread_Mutex> AutoLock;
371 // FUZZ: enable check_for_ACE_Guard
373 // bool trace_;
375 Protocol::Transaction current_;
377 bool initiated_;
379 unsigned short voting_duration_;
380 unsigned short separation_duration_;
382 MessageQueue& in_;
383 MessageQueue& send_out_;
384 MessageQueue& recv_out_;
386 SendPtr send_;
387 RecvPtr recv_;