Merge pull request #2216 from jwillemsen/jwi-cxxversionchecks
[ACE_TAO.git] / ACE / examples / APG / ThreadPools / LF_ThreadPool.cpp
blob6b740f60bc553098d6ffced5bdfd27e0a2ab15f5
1 #include "ace/config-lite.h"
2 #if defined (ACE_HAS_THREADS)
4 #include "ace/OS_NS_string.h"
5 #include "ace/OS_NS_sys_time.h"
6 #include "ace/Task.h"
7 #include "ace/Containers.h"
8 #include "ace/Synch.h"
9 #include "ace/Condition_T.h"
11 // Listing 4 code/ch16
12 class Follower
14 public:
15 Follower (ACE_Thread_Mutex &leader_lock)
16 : cond_(leader_lock)
18 owner_ = ACE_Thread::self ();
21 //FUZZ: disable check_for_lack_ACE_OS
22 int wait ()
24 return this->cond_.wait ();
27 int signal ()
29 return this->cond_.signal ();
31 //FUZZ: enable check_for_lack_ACE_OS
33 ACE_thread_t owner ()
35 return this->owner_;
38 private:
39 ACE_Condition<ACE_Thread_Mutex> cond_;
40 ACE_thread_t owner_;
42 // Listing 4
43 // Listing 1 code/ch16
44 class LF_ThreadPool : public ACE_Task<ACE_MT_SYNCH>
46 public:
47 LF_ThreadPool () : shutdown_(0), current_leader_(0)
49 ACE_TRACE ("LF_ThreadPool::TP");
52 virtual int svc ();
54 void shut_down ()
56 shutdown_ = 1;
59 private:
60 int become_leader ();
62 Follower *make_follower ();
64 int elect_new_leader ();
66 int leader_active ()
68 ACE_TRACE ("LF_ThreadPool::leader_active");
69 return this->current_leader_ != 0;
72 void leader_active (ACE_thread_t leader)
74 ACE_TRACE ("LF_ThreadPool::leader_active");
75 this->current_leader_ = leader;
78 void process_message (ACE_Message_Block *mb);
80 int done ()
82 return (shutdown_ == 1);
85 private:
86 int shutdown_;
87 ACE_thread_t current_leader_;
88 ACE_Thread_Mutex leader_lock_;
89 ACE_Unbounded_Queue<Follower*> followers_;
90 ACE_Thread_Mutex followers_lock_;
91 static long LONG_TIME;
93 // Listing 1
94 // Listing 2 code/ch16
95 int
96 LF_ThreadPool::svc ()
98 ACE_TRACE ("LF_ThreadPool::svc");
99 while (!done ())
101 become_leader (); // Block until this thread is the leader.
103 ACE_Message_Block *mb = 0;
104 ACE_Time_Value tv (LONG_TIME);
105 tv += ACE_OS::gettimeofday ();
107 // Get a message, elect new leader, then process message.
108 if (this->getq (mb, &tv) < 0)
110 if (elect_new_leader () == 0)
111 break;
112 continue;
115 elect_new_leader ();
116 process_message (mb);
119 return 0;
121 // Listing 2
122 // Listing 3 code/ch16
124 LF_ThreadPool::become_leader ()
126 ACE_TRACE ("LF_ThreadPool::become_leader");
128 ACE_GUARD_RETURN
129 (ACE_Thread_Mutex, leader_mon, this->leader_lock_, -1);
130 if (leader_active ())
132 Follower *fw = make_follower ();
134 // Wait until told to do so.
135 while (leader_active ())
136 fw->wait ();
139 delete fw;
142 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Becoming the leader\n")));
144 // Mark yourself as the active leader.
145 leader_active (ACE_Thread::self ());
146 return 0;
149 Follower*
150 LF_ThreadPool::make_follower ()
152 ACE_TRACE ("LF_ThreadPool::make_follower");
154 ACE_GUARD_RETURN
155 (ACE_Thread_Mutex, follower_mon, this->followers_lock_, 0);
156 Follower *fw;
157 ACE_NEW_RETURN (fw, Follower (this->leader_lock_), 0);
158 this->followers_.enqueue_tail (fw);
159 return fw;
161 // Listing 3
162 // Listing 5 code/ch16
164 LF_ThreadPool::elect_new_leader ()
166 ACE_TRACE ("LF_ThreadPool::elect_new_leader");
168 ACE_GUARD_RETURN
169 (ACE_Thread_Mutex, leader_mon, this->leader_lock_, -1);
170 leader_active (0);
172 // Wake up a follower
173 if (!followers_.is_empty ())
175 ACE_GUARD_RETURN (ACE_Thread_Mutex,
176 follower_mon,
177 this->followers_lock_,
178 -1);
179 // Get the old follower.
180 Follower *fw;
181 if (this->followers_.dequeue_head (fw) != 0)
182 return -1;
183 ACE_DEBUG ((LM_DEBUG,
184 ACE_TEXT ("(%t) Resigning and Electing %d\n"),
185 fw->owner ()));
186 return (fw->signal () == 0) ? 0 : -1;
188 else
190 ACE_DEBUG
191 ((LM_ERROR, ACE_TEXT ("(%t) Oops no followers left\n")));
192 return -1;
195 // Listing 5
197 void
198 LF_ThreadPool::process_message (ACE_Message_Block *mb)
200 ACE_TRACE ("LF_ThreadPool::process_message");
201 int msgId;
202 ACE_OS::memcpy (&msgId, mb->rd_ptr (), sizeof(int));
203 mb->release ();
205 ACE_DEBUG ((LM_DEBUG,
206 ACE_TEXT ("(%t) Started processing message:%d\n"),
207 msgId));
208 ACE_OS::sleep (1);
209 ACE_DEBUG ((LM_DEBUG,
210 ACE_TEXT ("(%t) Finished processing message:%d\n"),
211 msgId));
214 long LF_ThreadPool::LONG_TIME = 5L;
216 int ACE_TMAIN (int, ACE_TCHAR *[])
218 LF_ThreadPool tp;
219 tp.activate (THR_NEW_LWP| THR_JOINABLE, 5);
221 // Wait for a few seconds...
222 ACE_OS::sleep (2);
223 ACE_Time_Value tv (1L);
225 ACE_Message_Block *mb = 0;
226 for (int i = 0; i < 30; i++)
228 ACE_NEW_RETURN (mb, ACE_Message_Block (sizeof(int)), -1);
229 ACE_OS::memcpy (mb->wr_ptr (), &i, sizeof(int));
230 ACE_OS::sleep (tv);
232 // Add a new work item.
233 tp.putq (mb);
236 ACE_Thread_Manager::instance ()->wait ();
238 ACE_OS::sleep (10);
240 return 0;
243 #else
244 #include "ace/OS_main.h"
245 #include "ace/OS_NS_stdio.h"
247 int ACE_TMAIN (int, ACE_TCHAR *[])
249 ACE_OS::puts (ACE_TEXT ("This example requires threads."));
250 return 0;
253 #endif /* ACE_HAS_THREADS */