1 #include "ace/config-lite.h"
2 #if defined (ACE_HAS_THREADS)
4 #include "ace/OS_NS_string.h"
5 #include "ace/OS_NS_sys_time.h"
7 #include "ace/Containers.h"
9 #include "ace/Condition_T.h"
11 // Listing 4 code/ch16
15 Follower (ACE_Thread_Mutex
&leader_lock
)
18 owner_
= ACE_Thread::self ();
21 //FUZZ: disable check_for_lack_ACE_OS
24 return this->cond_
.wait ();
29 return this->cond_
.signal ();
31 //FUZZ: enable check_for_lack_ACE_OS
39 ACE_Condition
<ACE_Thread_Mutex
> cond_
;
43 // Listing 1 code/ch16
44 class LF_ThreadPool
: public ACE_Task
<ACE_MT_SYNCH
>
47 LF_ThreadPool () : shutdown_(0), current_leader_(0)
49 ACE_TRACE ("LF_ThreadPool::TP");
62 Follower
*make_follower ();
64 int elect_new_leader ();
68 ACE_TRACE ("LF_ThreadPool::leader_active");
69 return this->current_leader_
!= 0;
72 void leader_active (ACE_thread_t leader
)
74 ACE_TRACE ("LF_ThreadPool::leader_active");
75 this->current_leader_
= leader
;
78 void process_message (ACE_Message_Block
*mb
);
82 return (shutdown_
== 1);
87 ACE_thread_t current_leader_
;
88 ACE_Thread_Mutex leader_lock_
;
89 ACE_Unbounded_Queue
<Follower
*> followers_
;
90 ACE_Thread_Mutex followers_lock_
;
91 static long LONG_TIME
;
94 // Listing 2 code/ch16
98 ACE_TRACE ("LF_ThreadPool::svc");
101 become_leader (); // Block until this thread is the leader.
103 ACE_Message_Block
*mb
= 0;
104 ACE_Time_Value
tv (LONG_TIME
);
105 tv
+= ACE_OS::gettimeofday ();
107 // Get a message, elect new leader, then process message.
108 if (this->getq (mb
, &tv
) < 0)
110 if (elect_new_leader () == 0)
116 process_message (mb
);
122 // Listing 3 code/ch16
124 LF_ThreadPool::become_leader ()
126 ACE_TRACE ("LF_ThreadPool::become_leader");
129 (ACE_Thread_Mutex
, leader_mon
, this->leader_lock_
, -1);
130 if (leader_active ())
132 Follower
*fw
= make_follower ();
134 // Wait until told to do so.
135 while (leader_active ())
142 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%t) Becoming the leader\n")));
144 // Mark yourself as the active leader.
145 leader_active (ACE_Thread::self ());
150 LF_ThreadPool::make_follower ()
152 ACE_TRACE ("LF_ThreadPool::make_follower");
155 (ACE_Thread_Mutex
, follower_mon
, this->followers_lock_
, 0);
157 ACE_NEW_RETURN (fw
, Follower (this->leader_lock_
), 0);
158 this->followers_
.enqueue_tail (fw
);
162 // Listing 5 code/ch16
164 LF_ThreadPool::elect_new_leader ()
166 ACE_TRACE ("LF_ThreadPool::elect_new_leader");
169 (ACE_Thread_Mutex
, leader_mon
, this->leader_lock_
, -1);
172 // Wake up a follower
173 if (!followers_
.is_empty ())
175 ACE_GUARD_RETURN (ACE_Thread_Mutex
,
177 this->followers_lock_
,
179 // Get the old follower.
181 if (this->followers_
.dequeue_head (fw
) != 0)
183 ACE_DEBUG ((LM_DEBUG
,
184 ACE_TEXT ("(%t) Resigning and Electing %d\n"),
186 return (fw
->signal () == 0) ? 0 : -1;
191 ((LM_ERROR
, ACE_TEXT ("(%t) Oops no followers left\n")));
198 LF_ThreadPool::process_message (ACE_Message_Block
*mb
)
200 ACE_TRACE ("LF_ThreadPool::process_message");
202 ACE_OS::memcpy (&msgId
, mb
->rd_ptr (), sizeof(int));
205 ACE_DEBUG ((LM_DEBUG
,
206 ACE_TEXT ("(%t) Started processing message:%d\n"),
209 ACE_DEBUG ((LM_DEBUG
,
210 ACE_TEXT ("(%t) Finished processing message:%d\n"),
214 long LF_ThreadPool::LONG_TIME
= 5L;
216 int ACE_TMAIN (int, ACE_TCHAR
*[])
219 tp
.activate (THR_NEW_LWP
| THR_JOINABLE
, 5);
221 // Wait for a few seconds...
223 ACE_Time_Value
tv (1L);
225 ACE_Message_Block
*mb
= 0;
226 for (int i
= 0; i
< 30; i
++)
228 ACE_NEW_RETURN (mb
, ACE_Message_Block (sizeof(int)), -1);
229 ACE_OS::memcpy (mb
->wr_ptr (), &i
, sizeof(int));
232 // Add a new work item.
236 ACE_Thread_Manager::instance ()->wait ();
244 #include "ace/OS_main.h"
245 #include "ace/OS_NS_stdio.h"
247 int ACE_TMAIN (int, ACE_TCHAR
*[])
249 ACE_OS::puts (ACE_TEXT ("This example requires threads."));
253 #endif /* ACE_HAS_THREADS */