Cleanup ACE_HAS_PTHREAD_SIGMASK_PROTOTYPE, all platforms support it so far as I can...
[ACE_TAO.git] / ACE / protocols / ace / RMCast / Acknowledge.h
blob1c3f968a789e0cc82c32b66cb0b4c8f1d2be4ab1
1 // author : Boris Kolpackov <boris@kolpackov.net>
3 #ifndef ACE_RMCAST_ACKNOWLEDGE_H
4 #define ACE_RMCAST_ACKNOWLEDGE_H
6 #include "ace/Hash_Map_Manager.h"
7 #include "ace/Thread_Manager.h"
9 #include "Stack.h"
10 #include "Protocol.h"
11 #include "Bits.h"
12 #include "Parameters.h"
// Size handed to the ACE_Hash_Map_Manager base of Acknowledge::Queue when a
// queue is constructed. A build may predefine this macro to override the
// default of 10.
#ifndef ACE_RMCAST_DEFAULT_MAP_SIZE
#  define ACE_RMCAST_DEFAULT_MAP_SIZE 10
#endif // !ACE_RMCAST_DEFAULT_MAP_SIZE
// Default queue size (10) unless the build predefines the macro. It is not
// referenced anywhere else in this header.
#ifndef ACE_RMCAST_DEFAULT_QUEUE_SIZE
#  define ACE_RMCAST_DEFAULT_QUEUE_SIZE 10
#endif // !ACE_RMCAST_DEFAULT_QUEUE_SIZE
22 namespace ACE_RMCast
24 class Acknowledge : public Element
26 public:
27 Acknowledge (Parameters const& params);
29 virtual void
30 in_start (In_Element* in);
32 virtual void
33 out_start (Out_Element* out);
35 virtual void
36 out_stop ();
38 public:
39 virtual void
40 recv (Message_ptr m);
42 virtual void
43 send (Message_ptr m);
45 private:
46 struct Descr
48 //@@ There should be no default c-tor.
50 Descr ()
51 : nak_count_ (0), timer_ (1)
55 Descr (unsigned long timer)
56 : nak_count_ (0), timer_ (timer)
60 Descr (Message_ptr m)
61 : m_ (m)
65 public:
66 bool
67 lost () const
69 return m_.get () == 0;
72 public:
73 Message_ptr
74 msg ()
76 return m_;
79 void
80 msg (Message_ptr m)
82 m_ = m;
85 public:
86 unsigned long
87 nak_count () const
89 return nak_count_;
92 void
93 nak_count (unsigned long v)
95 nak_count_ = v;
98 unsigned long
99 timer () const
101 return timer_;
104 void
105 timer (unsigned long v)
107 timer_ = v;
110 private:
111 Message_ptr m_;
113 unsigned long nak_count_;
114 unsigned long timer_;
117 private:
118 struct Queue : ACE_Hash_Map_Manager<u64, Descr, ACE_Null_Mutex>
120 typedef ACE_Hash_Map_Manager<u64, Descr, ACE_Null_Mutex> Base;
122 // Should never be here but required by ACE_Hash_Blah_Blah.
124 Queue ()
125 : Base (ACE_RMCAST_DEFAULT_MAP_SIZE), sn_ (0), max_sn_ (0)
129 Queue (u64 sn)
130 : Base (ACE_RMCAST_DEFAULT_MAP_SIZE), sn_ (sn), max_sn_ (sn)
134 Queue (Queue const& q)
135 : Base (ACE_RMCAST_DEFAULT_MAP_SIZE), sn_ (q.sn_), max_sn_ (sn_)
137 for (Queue::const_iterator i (q), e (q, 1); i != e; ++i)
139 bind ((*i).ext_id_, (*i).int_id_);
143 public:
145 bind (u64 sn, Descr const& d)
147 int r (Base::bind (sn, d));
149 if (r == 0 && sn > max_sn_) max_sn_ = sn;
151 return r;
155 rebind (u64 sn, Descr const& d)
157 int r (Base::rebind (sn, d));
159 if (r == 0 && sn > max_sn_) max_sn_ = sn;
161 return r;
165 unbind (u64 sn)
167 int r (Base::unbind (sn));
169 if (r == 0 && sn == max_sn_)
171 for (--max_sn_; max_sn_ >= sn_; --max_sn_)
173 if (find (max_sn_) == 0) break;
177 return r;
180 public:
182 sn () const
184 return sn_;
187 void
188 sn (u64 sn)
190 sn_ = sn;
194 max_sn () const
196 if (current_size () == 0) return sn_;
198 return max_sn_;
201 private:
202 u64 sn_, max_sn_;
205 typedef
206 ACE_Hash_Map_Manager_Ex<Address,
207 Queue,
208 AddressHasher,
209 ACE_Equal_To<Address>,
210 ACE_Null_Mutex>
211 Map;
213 private:
214 void
215 collapse (Queue& q);
217 void
218 track ();
220 void
221 track_queue (Address const& addr, Queue& q, Messages& msgs);
223 Profile_ptr
224 create_nrtm (u32 max_elem);
226 static ACE_THR_FUNC_RETURN
227 track_thunk (void* obj);
229 private:
230 Parameters const& params_;
232 Map hold_;
233 Mutex mutex_;
234 Condition cond_;
236 unsigned long nrtm_timer_;
238 bool stop_;
239 ACE_Thread_Manager tracker_mgr_;
243 #endif // ACE_RMCAST_ACKNOWLEDGE_H