1 #include "JAWS/Concurrency.h"
2 #include "JAWS/IO_Handler.h"
3 #include "JAWS/Pipeline.h"
4 #include "JAWS/Pipeline_Tasks.h"
5 #include "JAWS/Policy.h"
6 #include "JAWS/Data_Block.h"
7 #include "JAWS/Waiter.h"
8 #include "JAWS/Reaper.h"
11 JAWS_Concurrency_Base::JAWS_Concurrency_Base ()
12 : ACE_Task
<ACE_SYNCH
> (new ACE_Thread_Manager
),
15 reaper_ (new JAWS_Reaper (this))
19 JAWS_Concurrency_Base::~JAWS_Concurrency_Base ()
21 delete this->thr_mgr_
;
26 JAWS_Concurrency_Base::singleton_mb ()
28 if (this->mb_acquired_
== 0)
30 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX
, g
, this->lock_
, 0);
32 if (this->mb_acquired_
== 0)
35 ACE_Message_Block
*mb
= 0;
37 result
= this->getq (mb
);
38 this->mb_acquired_
= 1;
40 if (result
== -1 || mb
== 0)
51 JAWS_Concurrency_Base::put (ACE_Message_Block
*mb
, ACE_Time_Value
*tv
)
53 return this->putq (mb
, tv
);
57 JAWS_Concurrency_Base::svc ()
59 JAWS_TRACE ("JAWS_Concurrency_Base::svc");
61 ACE_Message_Block
*mb
= 0; // The message queue element
62 JAWS_Data_Block
*db
= 0; // Contains the task list
64 mb
= this->singleton_mb ();
66 // A NULL data block indicates that the thread should shut
70 JAWS_TRACE ("JAWS_Concurrency_Base::svc, empty message block");
74 db
= dynamic_cast<JAWS_Data_Block
*> (mb
);
82 JAWS_Concurrency_Base::svc_loop (JAWS_Data_Block
*db
)
84 JAWS_TRACE ("JAWS_Concurrency_Base::svc_loop");
86 // Thread specific message block and data block
87 ACE_DEBUG ((LM_DEBUG
, "(%t) Creating DataBlock\n"));
88 JAWS_Data_Block
*ts_db
= new JAWS_Data_Block (*db
);
91 ACE_ERROR ((LM_ERROR
, "%p\n", "JAWS_Concurrency_Base::svc_hook"));
97 if (this->svc_hook (ts_db
) != 0)
99 ts_db
->task (db
->task ());
100 ts_db
->policy (db
->policy ());
102 ts_db
->io_handler (0);
103 ts_db
->rd_ptr (ts_db
->wr_ptr ());
107 ACE_DEBUG ((LM_DEBUG
, "(%t) Deleting DataBlock\n"));
108 delete ts_db
; // ts_db->release ();
114 JAWS_Concurrency_Base::svc_hook (JAWS_Data_Block
*ts_db
)
116 JAWS_TRACE ("JAWS_Concurrency_Base::svc_hook");
120 JAWS_IO_Handler
*handler
= 0; // Keeps the state of the task
121 JAWS_Pipeline_Handler
*task
= 0; // The task itself
122 JAWS_Data_Block
*mb
= 0; // The task message block
124 task
= ts_db
->task ();
127 // Get the waiter index
128 JAWS_Waiter
*waiter
= JAWS_Waiter_Singleton::instance ();
129 int waiter_index
= waiter
->index ();
134 JAWS_TRACE ("JAWS_Concurrency_Base::svc_hook, looping");
136 // Use a NULL task to make the thread recycle now
139 JAWS_TRACE ("JAWS_Concurrency_Base::svc_hook, recycling");
143 JAWS_IO_Handler
**ioh
= waiter
->find_by_index (waiter_index
);
148 // the task should set the handler to the appropriate next step
149 result
= task
->put (mb
);
151 if (result
== 0 || result
== -3)
152 handler
= mb
->io_handler ();
155 if (result
== 1 || result
== 2)
157 JAWS_TRACE ("JAWS_Concurrency_Base::svc_hook, waiting");
158 // need to wait for an asynchronous event
160 // We need a way to destroy all the handlers created by the
161 // Asynch_Acceptor. Figure this out later.
162 handler
= waiter
->wait_for_completion (waiter_index
);
165 JAWS_TRACE ("JAWS_Concurrency_Base::svc_hook, bad proactor");
174 JAWS_TRACE ("JAWS_Concurrency_Base::svc_hook, negative result");
176 ACE_ERROR ((LM_ERROR
, "%p\n", "JAWS_Concurrency_Base::svc_hook"));
184 JAWS_IO_Handler
**ioh
= waiter
->find_by_index (waiter_index
);
194 mb
= handler
->message_block ();
195 task
= handler
->task ();
204 JAWS_Concurrency_Base::activate_hook ()
209 JAWS_Dispatcher::JAWS_Dispatcher ()
215 JAWS_Dispatcher::dispatch (ACE_Message_Block
*mb
)
217 return this->policy ()->concurrency ()->put (mb
);
220 JAWS_Dispatch_Policy
*
221 JAWS_Dispatcher::policy ()
223 return this->policy_
;
226 JAWS_Dispatch_Policy
*
227 JAWS_Dispatcher::policy (JAWS_Dispatch_Policy
*p
)
230 return this->policy_
;
234 JAWS_Thread_Pool_Task::make (long flags
, int nthreads
, int maxthreads
)
236 this->flags_
= flags
;
237 this->nthreads_
= nthreads
;
238 this->maxthreads_
= maxthreads
;
240 ACE_thread_t
*thr_names
= new ACE_thread_t
[nthreads
];
242 if (this->activate (flags
| THR_SUSPENDED
,
245 ACE_DEFAULT_THREAD_PRIORITY
,
252 ACE_ERROR_RETURN ((LM_ERROR
, "%p\n", "JAWS_Thread_Pool_Task::activate"),
255 for (int i
= 0; i
< nthreads
; i
++)
257 JAWS_Thread_ID
thr_id(thr_names
[i
]);
258 JAWS_IO_Handler
*dummy
= 0;
260 JAWS_Waiter_Singleton::instance ()->insert (thr_id
, dummy
);
265 this->thr_mgr_
->resume_all ();
267 if (this->reaper_
->open () == -1)
268 ACE_ERROR_RETURN ((LM_ERROR
, "%p\n", "JAWS_Reaper::open"),
275 JAWS_Thread_Per_Task::make (long flags
, int maxthreads
)
277 this->flags_
= flags
;
278 this->maxthreads_
= maxthreads
;
283 JAWS_Thread_Per_Task::put (ACE_Message_Block
*mb
, ACE_Time_Value
*tv
)
285 JAWS_TRACE ("JAWS_Thread_Per_Task::put");
288 return this->activate_hook ();
292 JAWS_Thread_Per_Task::svc_loop (JAWS_Data_Block
*db
)
294 return this->svc_hook (db
);
298 JAWS_Thread_Per_Task::activate_hook ()
300 const int force_active
= 1;
301 const int nthreads
= 1;
303 ACE_thread_t thr_name
;
305 if (this->activate (this->flags_
| THR_SUSPENDED
,
308 ACE_DEFAULT_THREAD_PRIORITY
,
315 ACE_ERROR_RETURN ((LM_ERROR
, "%p\n", "JAWS_Thread_Pool_Task::activate"),
318 JAWS_Thread_ID
thr_id (thr_name
);
319 JAWS_IO_Handler
*dummy
= 0;
321 // In the thread-per-request strategy, need to take care of the
322 // case when the waiter array is full. Think about that problem
324 JAWS_Waiter_Singleton::instance ()->insert (thr_id
, dummy
);
326 this->thr_mgr_
->resume (thr_name
);
328 if (this->reaper_
->open () == -1)
329 ACE_ERROR_RETURN ((LM_ERROR
, "%p\n", "JAWS_Reaper::open"),