Use =default for skeleton copy constructor
[ACE_TAO.git] / ACE / apps / JAWS2 / JAWS / Concurrency.cpp
blobbfebb34802a84944e1d4159443f33791c05f974e
#include "JAWS/Concurrency.h"

#include <new>
#include <vector>

#include "JAWS/IO_Handler.h"
#include "JAWS/Pipeline.h"
#include "JAWS/Pipeline_Tasks.h"
#include "JAWS/Policy.h"
#include "JAWS/Data_Block.h"
#include "JAWS/Waiter.h"
#include "JAWS/Reaper.h"
11 JAWS_Concurrency_Base::JAWS_Concurrency_Base ()
12 : ACE_Task<ACE_SYNCH> (new ACE_Thread_Manager),
13 mb_acquired_ (0),
14 mb_ (0),
15 reaper_ (new JAWS_Reaper (this))
19 JAWS_Concurrency_Base::~JAWS_Concurrency_Base ()
21 delete this->thr_mgr_;
22 delete this->reaper_;
25 ACE_Message_Block *
26 JAWS_Concurrency_Base::singleton_mb ()
28 if (this->mb_acquired_ == 0)
30 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, g, this->lock_, 0);
32 if (this->mb_acquired_ == 0)
34 int result;
35 ACE_Message_Block *mb = 0;
37 result = this->getq (mb);
38 this->mb_acquired_ = 1;
40 if (result == -1 || mb == 0)
41 return 0;
43 this->mb_ = mb;
47 return this->mb_;
50 int
51 JAWS_Concurrency_Base::put (ACE_Message_Block *mb, ACE_Time_Value *tv)
53 return this->putq (mb, tv);
56 int
57 JAWS_Concurrency_Base::svc ()
59 JAWS_TRACE ("JAWS_Concurrency_Base::svc");
61 ACE_Message_Block *mb = 0; // The message queue element
62 JAWS_Data_Block *db = 0; // Contains the task list
64 mb = this->singleton_mb ();
66 // A NULL data block indicates that the thread should shut
67 // itself down
68 if (mb == 0)
70 JAWS_TRACE ("JAWS_Concurrency_Base::svc, empty message block");
71 return -1;
74 db = dynamic_cast<JAWS_Data_Block *> (mb);
76 this->svc_loop (db);
78 return 0;
81 int
82 JAWS_Concurrency_Base::svc_loop (JAWS_Data_Block *db)
84 JAWS_TRACE ("JAWS_Concurrency_Base::svc_loop");
86 // Thread specific message block and data block
87 ACE_DEBUG ((LM_DEBUG, "(%t) Creating DataBlock\n"));
88 JAWS_Data_Block *ts_db = new JAWS_Data_Block (*db);
89 if (ts_db == 0)
91 ACE_ERROR ((LM_ERROR, "%p\n", "JAWS_Concurrency_Base::svc_hook"));
92 return -1;
95 for (;;)
97 if (this->svc_hook (ts_db) != 0)
98 break;
99 ts_db->task (db->task ());
100 ts_db->policy (db->policy ());
101 ts_db->payload (0);
102 ts_db->io_handler (0);
103 ts_db->rd_ptr (ts_db->wr_ptr ());
104 ts_db->crunch ();
107 ACE_DEBUG ((LM_DEBUG, "(%t) Deleting DataBlock\n"));
108 delete ts_db; // ts_db->release ();
110 return 0;
114 JAWS_Concurrency_Base::svc_hook (JAWS_Data_Block *ts_db)
116 JAWS_TRACE ("JAWS_Concurrency_Base::svc_hook");
118 int result = 0;
120 JAWS_IO_Handler *handler = 0; // Keeps the state of the task
121 JAWS_Pipeline_Handler *task = 0; // The task itself
122 JAWS_Data_Block *mb = 0; // The task message block
124 task = ts_db->task ();
125 handler = 0;
127 // Get the waiter index
128 JAWS_Waiter *waiter = JAWS_Waiter_Singleton::instance ();
129 int waiter_index = waiter->index ();
131 mb = ts_db;
134 JAWS_TRACE ("JAWS_Concurrency_Base::svc_hook, looping");
136 // Use a NULL task to make the thread recycle now
137 if (task == 0)
139 JAWS_TRACE ("JAWS_Concurrency_Base::svc_hook, recycling");
140 if (handler)
141 handler->done ();
142 handler = 0;
143 JAWS_IO_Handler **ioh = waiter->find_by_index (waiter_index);
144 *ioh = 0;
145 break;
148 // the task should set the handler to the appropriate next step
149 result = task->put (mb);
151 if (result == 0 || result == -3)
152 handler = mb->io_handler ();
153 else handler = 0;
155 if (result == 1 || result == 2)
157 JAWS_TRACE ("JAWS_Concurrency_Base::svc_hook, waiting");
158 // need to wait for an asynchronous event
160 // We need a way to destroy all the handlers created by the
161 // Asynch_Acceptor. Figure this out later.
162 handler = waiter->wait_for_completion (waiter_index);
163 if (handler == 0)
165 JAWS_TRACE ("JAWS_Concurrency_Base::svc_hook, bad proactor");
166 // Proactor failed
167 result = -1;
168 break;
172 if (result < 0)
174 JAWS_TRACE ("JAWS_Concurrency_Base::svc_hook, negative result");
175 if (result == -1)
176 ACE_ERROR ((LM_ERROR, "%p\n", "JAWS_Concurrency_Base::svc_hook"));
178 if (handler)
179 handler->done ();
181 handler = 0;
182 if (result == -2)
184 JAWS_IO_Handler **ioh = waiter->find_by_index (waiter_index);
185 *ioh = 0;
186 result = 0;
188 break;
191 if (handler == 0)
192 break;
194 mb = handler->message_block ();
195 task = handler->task ();
196 result = 0;
198 while (result == 0);
200 return result;
204 JAWS_Concurrency_Base::activate_hook ()
206 return 0;
209 JAWS_Dispatcher::JAWS_Dispatcher ()
210 : policy_(0)
215 JAWS_Dispatcher::dispatch (ACE_Message_Block *mb)
217 return this->policy ()->concurrency ()->put (mb);
220 JAWS_Dispatch_Policy *
221 JAWS_Dispatcher::policy ()
223 return this->policy_;
226 JAWS_Dispatch_Policy *
227 JAWS_Dispatcher::policy (JAWS_Dispatch_Policy *p)
229 this->policy_ = p;
230 return this->policy_;
234 JAWS_Thread_Pool_Task::make (long flags, int nthreads, int maxthreads)
236 this->flags_ = flags;
237 this->nthreads_ = nthreads;
238 this->maxthreads_ = maxthreads;
240 ACE_thread_t *thr_names = new ACE_thread_t[nthreads];
242 if (this->activate (flags | THR_SUSPENDED,
243 nthreads,
244 0, // force active
245 ACE_DEFAULT_THREAD_PRIORITY,
246 -1, // group id
247 0, // ACE_Task_Base
248 0, // thread handles
249 0, // stack
250 0, // stack size
251 thr_names) == -1)
252 ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "JAWS_Thread_Pool_Task::activate"),
253 -1);
255 for (int i = 0; i < nthreads; i++)
257 JAWS_Thread_ID thr_id(thr_names[i]);
258 JAWS_IO_Handler *dummy = 0;
260 JAWS_Waiter_Singleton::instance ()->insert (thr_id, dummy);
263 delete[] thr_names;
265 this->thr_mgr_->resume_all ();
267 if (this->reaper_->open () == -1)
268 ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "JAWS_Reaper::open"),
269 -1);
271 return 0;
275 JAWS_Thread_Per_Task::make (long flags, int maxthreads)
277 this->flags_ = flags;
278 this->maxthreads_ = maxthreads;
279 return 0;
283 JAWS_Thread_Per_Task::put (ACE_Message_Block *mb, ACE_Time_Value *tv)
285 JAWS_TRACE ("JAWS_Thread_Per_Task::put");
287 this->putq (mb, tv);
288 return this->activate_hook ();
292 JAWS_Thread_Per_Task::svc_loop (JAWS_Data_Block *db)
294 return this->svc_hook (db);
298 JAWS_Thread_Per_Task::activate_hook ()
300 const int force_active = 1;
301 const int nthreads = 1;
303 ACE_thread_t thr_name;
305 if (this->activate (this->flags_ | THR_SUSPENDED,
306 nthreads,
307 force_active,
308 ACE_DEFAULT_THREAD_PRIORITY,
309 -1, // group id
310 0, // ACE_Task_Base
311 0, // thread handle
312 0, // stack
313 0, // stack size
314 &thr_name) == -1)
315 ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "JAWS_Thread_Pool_Task::activate"),
316 -1);
318 JAWS_Thread_ID thr_id (thr_name);
319 JAWS_IO_Handler *dummy = 0;
321 // In the thread-per-request strategy, need to take care of the
322 // case when the waiter array is full. Think about that problem
323 // later.
324 JAWS_Waiter_Singleton::instance ()->insert (thr_id, dummy);
326 this->thr_mgr_->resume (thr_name);
328 if (this->reaper_->open () == -1)
329 ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "JAWS_Reaper::open"),
330 -1);
332 return 0;