Use =default for skeleton copy constructor
[ACE_TAO.git] / ACE / tests / Bug_3878_Regression_Test.cpp
blob4fe3c47e10ce63dc7b65aa171773f56c03711e1f
1 #include "ace/config-lite.h"
3 #include "ace/OS_NS_string.h"
4 #include "ace/OS_NS_time.h"
5 #include "ace/Task.h"
6 #include "ace/Containers.h"
7 #include "ace/Synch.h"
8 #include "ace/SString.h"
9 #include "ace/Method_Request.h"
10 #include "ace/Future.h"
11 #include "ace/Activation_Queue.h"
12 #include "ace/Condition_T.h"
13 #include "test_config.h"
15 #if defined (ACE_HAS_THREADS)
17 class Worker;
19 class IManager
21 public:
22 virtual ~IManager () { }
24 virtual int return_to_work (Worker *worker) = 0;
27 // Listing 2 code/ch16
28 class Worker : public ACE_Task<ACE_MT_SYNCH>
30 public:
31 Worker (IManager *manager) : manager_(manager) { }
33 int svc () override
35 ACE_Thread_ID id;
36 thread_id_ = id;
37 while (1)
39 ACE_Message_Block *mb = 0;
40 if (this->getq (mb) == -1)
41 ACE_ERROR_BREAK
42 ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("getq")));
43 if (mb->msg_type () == ACE_Message_Block::MB_HANGUP)
45 ACE_DEBUG ((LM_INFO,
46 ACE_TEXT ("(%t) Shutting down\n")));
47 mb->release ();
48 break;
50 // Process the message.
51 process_message (mb);
52 // Return to work.
53 this->manager_->return_to_work (this);
56 return 0;
59 const ACE_Thread_ID& thread_id ()
61 return thread_id_;
64 private:
65 void process_message (ACE_Message_Block *mb)
67 ACE_TRACE ("Worker::process_message");
68 int msgId;
69 ACE_OS::memcpy (&msgId, mb->rd_ptr (), sizeof(int));
70 mb->release ();
72 ACE_DEBUG ((LM_DEBUG,
73 ACE_TEXT ("(%t) Started processing message %d\n"),
74 msgId));
75 ACE_OS::sleep (3);
76 ACE_DEBUG ((LM_DEBUG,
77 ACE_TEXT ("(%t) Finished processing message %d\n"),
78 msgId));
81 IManager *manager_;
82 ACE_Thread_ID thread_id_;
85 // Listing 1 code/ch16
86 class Manager: public ACE_Task<ACE_MT_SYNCH>, private IManager
88 public:
89 enum {POOL_SIZE = 5, MAX_TIMEOUT = 5};
91 Manager ()
92 : shutdown_(0), workers_lock_(), workers_cond_(workers_lock_)
94 ACE_TRACE ("Manager::Manager");
97 int svc () override
99 ACE_TRACE ("Manager::svc");
101 ACE_DEBUG ((LM_INFO, ACE_TEXT ("(%t) Manager started\n")));
103 // Create pool.
104 create_worker_pool ();
106 while (!done ())
108 ACE_Message_Block *mb = 0;
109 ACE_Time_Value tv ((long)MAX_TIMEOUT);
110 tv += ACE_OS::time (0);
112 // Get a message request.
113 if (this->getq (mb, &tv) < 0)
115 shut_down ();
116 break;
119 // Choose a worker.
120 Worker *worker = 0;
122 ACE_GUARD_RETURN (ACE_Mutex,
123 worker_mon, this->workers_lock_, -1);
125 while (this->workers_.is_empty ())
126 workers_cond_.wait ();
128 this->workers_.dequeue_head (worker);
131 // Ask the worker to do the job.
132 worker->putq (mb);
135 return 0;
138 int shut_down ();
140 const ACE_Thread_ID& thread_id (Worker *worker);
142 int return_to_work (Worker *worker) override
144 ACE_GUARD_RETURN (ACE_Mutex,
145 worker_mon, this->workers_lock_, -1);
146 ACE_DEBUG ((LM_DEBUG,
147 ACE_TEXT ("(%t) Worker %t returning to work.\n")));
148 this->workers_.enqueue_tail (worker);
149 this->workers_cond_.signal ();
151 return 0;
154 private:
155 int create_worker_pool ()
157 ACE_GUARD_RETURN (ACE_Mutex,
158 worker_mon,
159 this->workers_lock_,
160 -1);
161 for (int i = 0; i < POOL_SIZE; i++)
163 Worker *worker;
164 ACE_NEW_RETURN (worker, Worker (this), -1);
165 this->workers_.enqueue_tail (worker);
166 worker->activate ();
169 return 0;
172 int done ();
174 private:
175 int shutdown_;
176 ACE_Mutex workers_lock_;
177 ACE_Condition<ACE_Mutex> workers_cond_;
178 ACE_Unbounded_Queue<Worker* > workers_;
180 // Listing 1
182 int Manager::done ()
184 return (shutdown_ == 1);
188 Manager::shut_down ()
190 ACE_TRACE ("Manager::shut_down");
191 ACE_Unbounded_Queue<Worker* >::ITERATOR iter =
192 this->workers_.begin ();
193 Worker **worker_ptr = 0;
196 iter.next (worker_ptr);
197 Worker *worker = (*worker_ptr);
198 ACE_Thread_ID id = thread_id (worker);
199 char buf [65];
200 id.to_string (buf, 65);
201 ACE_DEBUG ((LM_DEBUG,
202 ACE_TEXT ("(%t) Attempting shutdown of %C\n"),
203 buf));
205 // Send the hangup message.
206 ACE_Message_Block *mb = 0;
207 ACE_NEW_RETURN
208 (mb,
209 ACE_Message_Block(0,
210 ACE_Message_Block::MB_HANGUP),
211 -1);
212 worker->putq (mb);
214 // Wait for the exit.
215 worker->wait ();
217 ACE_TEST_ASSERT (worker->msg_queue ()->is_empty ());
218 ACE_DEBUG ((LM_DEBUG,
219 ACE_TEXT ("(%t) Worker %C shut down.\n"),
220 buf));
221 delete worker;
223 while (iter.advance ());
225 shutdown_ = 1;
227 return 0;
230 const ACE_Thread_ID&
231 Manager::thread_id (Worker *worker)
233 return worker->thread_id ();
235 #endif
238 run_main (int, ACE_TCHAR *[])
240 ACE_START_TEST (ACE_TEXT ("Bug_3878_Regression_Test"));
242 #if defined (ACE_HAS_THREADS)
243 Manager tp;
244 tp.activate ();
246 // Wait for a moment every time you send a message.
247 ACE_Time_Value tv;
248 tv.msec (100);
250 ACE_Message_Block *mb = 0;
251 for (int i = 0; i < 3; i++)
253 ACE_NEW_RETURN
254 (mb, ACE_Message_Block(sizeof(int)), -1);
256 ACE_OS::memcpy (mb->wr_ptr (), &i, sizeof(int));
258 ACE_OS::sleep (tv);
260 // Add a new work item.
261 tp.putq (mb);
264 ACE_Thread_Manager::instance ()->wait ();
265 #else /* ACE_HAS_THREADS */
266 ACE_ERROR ((LM_INFO,
267 ACE_TEXT ("threads are not supported on this platform\n")));
268 #endif
270 ACE_END_TEST;
272 return 0;