Merge pull request #1844 from jrw972/monterey
[ACE_TAO.git] / ACE / apps / JAWS2 / JAWS / Concurrency.cpp
blob6f41bfde363615734afaed01f11fbd5e430bdee3
1 #include "JAWS/Concurrency.h"
2 #include "JAWS/IO_Handler.h"
3 #include "JAWS/Pipeline.h"
4 #include "JAWS/Pipeline_Tasks.h"
5 #include "JAWS/Policy.h"
6 #include "JAWS/Data_Block.h"
7 #include "JAWS/Waiter.h"
8 #include "JAWS/Reaper.h"
12 JAWS_Concurrency_Base::JAWS_Concurrency_Base (void)
13 : ACE_Task<ACE_SYNCH> (new ACE_Thread_Manager),
14 mb_acquired_ (0),
15 mb_ (0),
16 reaper_ (new JAWS_Reaper (this))
20 JAWS_Concurrency_Base::~JAWS_Concurrency_Base (void)
22 delete this->thr_mgr_;
23 delete this->reaper_;
26 ACE_Message_Block *
27 JAWS_Concurrency_Base::singleton_mb (void)
29 if (this->mb_acquired_ == 0)
31 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, g, this->lock_, 0);
33 if (this->mb_acquired_ == 0)
35 int result;
36 ACE_Message_Block *mb = 0;
38 result = this->getq (mb);
39 this->mb_acquired_ = 1;
41 if (result == -1 || mb == 0)
42 return 0;
44 this->mb_ = mb;
48 return this->mb_;
51 int
52 JAWS_Concurrency_Base::put (ACE_Message_Block *mb, ACE_Time_Value *tv)
54 return this->putq (mb, tv);
57 int
58 JAWS_Concurrency_Base::svc (void)
60 JAWS_TRACE ("JAWS_Concurrency_Base::svc");
62 ACE_Message_Block *mb = 0; // The message queue element
63 JAWS_Data_Block *db = 0; // Contains the task list
65 mb = this->singleton_mb ();
67 // A NULL data block indicates that the thread should shut
68 // itself down
69 if (mb == 0)
71 JAWS_TRACE ("JAWS_Concurrency_Base::svc, empty message block");
72 return -1;
75 db = dynamic_cast<JAWS_Data_Block *> (mb);
77 this->svc_loop (db);
79 return 0;
82 int
83 JAWS_Concurrency_Base::svc_loop (JAWS_Data_Block *db)
85 JAWS_TRACE ("JAWS_Concurrency_Base::svc_loop");
87 // Thread specific message block and data block
88 ACE_DEBUG ((LM_DEBUG, "(%t) Creating DataBlock\n"));
89 JAWS_Data_Block *ts_db = new JAWS_Data_Block (*db);
90 if (ts_db == 0)
92 ACE_ERROR ((LM_ERROR, "%p\n", "JAWS_Concurrency_Base::svc_hook"));
93 return -1;
96 for (;;)
98 if (this->svc_hook (ts_db) != 0)
99 break;
100 ts_db->task (db->task ());
101 ts_db->policy (db->policy ());
102 ts_db->payload (0);
103 ts_db->io_handler (0);
104 ts_db->rd_ptr (ts_db->wr_ptr ());
105 ts_db->crunch ();
108 ACE_DEBUG ((LM_DEBUG, "(%t) Deleting DataBlock\n"));
109 delete ts_db; // ts_db->release ();
111 return 0;
115 JAWS_Concurrency_Base::svc_hook (JAWS_Data_Block *ts_db)
117 JAWS_TRACE ("JAWS_Concurrency_Base::svc_hook");
119 int result = 0;
121 JAWS_IO_Handler *handler = 0; // Keeps the state of the task
122 JAWS_Pipeline_Handler *task = 0; // The task itself
123 JAWS_Data_Block *mb = 0; // The task message block
125 task = ts_db->task ();
126 handler = 0;
128 // Get the waiter index
129 JAWS_Waiter *waiter = JAWS_Waiter_Singleton::instance ();
130 int waiter_index = waiter->index ();
132 mb = ts_db;
135 JAWS_TRACE ("JAWS_Concurrency_Base::svc_hook, looping");
137 // Use a NULL task to make the thread recycle now
138 if (task == 0)
140 JAWS_TRACE ("JAWS_Concurrency_Base::svc_hook, recycling");
141 if (handler)
142 handler->done ();
143 handler = 0;
144 JAWS_IO_Handler **ioh = waiter->find_by_index (waiter_index);
145 *ioh = 0;
146 break;
149 // the task should set the handler to the appropriate next step
150 result = task->put (mb);
152 if (result == 0 || result == -3)
153 handler = mb->io_handler ();
154 else handler = 0;
156 if (result == 1 || result == 2)
158 JAWS_TRACE ("JAWS_Concurrency_Base::svc_hook, waiting");
159 // need to wait for an asynchronous event
161 // We need a way to destroy all the handlers created by the
162 // Asynch_Acceptor. Figure this out later.
163 handler = waiter->wait_for_completion (waiter_index);
164 if (handler == 0)
166 JAWS_TRACE ("JAWS_Concurrency_Base::svc_hook, bad proactor");
167 // Proactor failed
168 result = -1;
169 break;
173 if (result < 0)
175 JAWS_TRACE ("JAWS_Concurrency_Base::svc_hook, negative result");
176 if (result == -1)
177 ACE_ERROR ((LM_ERROR, "%p\n", "JAWS_Concurrency_Base::svc_hook"));
179 if (handler)
180 handler->done ();
182 handler = 0;
183 if (result == -2)
185 JAWS_IO_Handler **ioh = waiter->find_by_index (waiter_index);
186 *ioh = 0;
187 result = 0;
189 break;
192 if (handler == 0)
193 break;
195 mb = handler->message_block ();
196 task = handler->task ();
197 result = 0;
199 while (result == 0);
201 return result;
205 JAWS_Concurrency_Base::activate_hook (void)
207 return 0;
210 JAWS_Dispatcher::JAWS_Dispatcher (void)
211 : policy_(0)
216 JAWS_Dispatcher::dispatch (ACE_Message_Block *mb)
218 return this->policy ()->concurrency ()->put (mb);
221 JAWS_Dispatch_Policy *
222 JAWS_Dispatcher::policy (void)
224 return this->policy_;
227 JAWS_Dispatch_Policy *
228 JAWS_Dispatcher::policy (JAWS_Dispatch_Policy *p)
230 this->policy_ = p;
231 return this->policy_;
235 JAWS_Thread_Pool_Task::make (long flags, int nthreads, int maxthreads)
237 this->flags_ = flags;
238 this->nthreads_ = nthreads;
239 this->maxthreads_ = maxthreads;
241 ACE_thread_t *thr_names = new ACE_thread_t[nthreads];
243 if (this->activate (flags | THR_SUSPENDED,
244 nthreads,
245 0, // force active
246 ACE_DEFAULT_THREAD_PRIORITY,
247 -1, // group id
248 0, // ACE_Task_Base
249 0, // thread handles
250 0, // stack
251 0, // stack size
252 thr_names) == -1)
253 ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "JAWS_Thread_Pool_Task::activate"),
254 -1);
256 for (int i = 0; i < nthreads; i++)
258 JAWS_Thread_ID thr_id(thr_names[i]);
259 JAWS_IO_Handler *dummy = 0;
261 JAWS_Waiter_Singleton::instance ()->insert (thr_id, dummy);
264 delete[] thr_names;
266 this->thr_mgr_->resume_all ();
268 if (this->reaper_->open () == -1)
269 ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "JAWS_Reaper::open"),
270 -1);
272 return 0;
276 JAWS_Thread_Per_Task::make (long flags, int maxthreads)
278 this->flags_ = flags;
279 this->maxthreads_ = maxthreads;
280 return 0;
284 JAWS_Thread_Per_Task::put (ACE_Message_Block *mb, ACE_Time_Value *tv)
286 JAWS_TRACE ("JAWS_Thread_Per_Task::put");
288 this->putq (mb, tv);
289 return this->activate_hook ();
293 JAWS_Thread_Per_Task::svc_loop (JAWS_Data_Block *db)
295 return this->svc_hook (db);
299 JAWS_Thread_Per_Task::activate_hook (void)
301 const int force_active = 1;
302 const int nthreads = 1;
304 ACE_thread_t thr_name;
306 if (this->activate (this->flags_ | THR_SUSPENDED,
307 nthreads,
308 force_active,
309 ACE_DEFAULT_THREAD_PRIORITY,
310 -1, // group id
311 0, // ACE_Task_Base
312 0, // thread handle
313 0, // stack
314 0, // stack size
315 &thr_name) == -1)
316 ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "JAWS_Thread_Pool_Task::activate"),
317 -1);
319 JAWS_Thread_ID thr_id (thr_name);
320 JAWS_IO_Handler *dummy = 0;
322 // In the thread-per-request strategy, need to take care of the
323 // case when the waiter array is full. Think about that problem
324 // later.
325 JAWS_Waiter_Singleton::instance ()->insert (thr_id, dummy);
327 this->thr_mgr_->resume (thr_name);
329 if (this->reaper_->open () == -1)
330 ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "JAWS_Reaper::open"),
331 -1);
333 return 0;