1 #include "JAWS/Concurrency.h"
2 #include "JAWS/IO_Handler.h"
3 #include "JAWS/Pipeline.h"
4 #include "JAWS/Pipeline_Tasks.h"
5 #include "JAWS/Policy.h"
6 #include "JAWS/Data_Block.h"
7 #include "JAWS/Waiter.h"
8 #include "JAWS/Reaper.h"
12 JAWS_Concurrency_Base::JAWS_Concurrency_Base (void)
13 : ACE_Task
<ACE_SYNCH
> (new ACE_Thread_Manager
),
16 reaper_ (new JAWS_Reaper (this))
20 JAWS_Concurrency_Base::~JAWS_Concurrency_Base (void)
22 delete this->thr_mgr_
;
27 JAWS_Concurrency_Base::singleton_mb (void)
29 if (this->mb_acquired_
== 0)
31 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX
, g
, this->lock_
, 0);
33 if (this->mb_acquired_
== 0)
36 ACE_Message_Block
*mb
= 0;
38 result
= this->getq (mb
);
39 this->mb_acquired_
= 1;
41 if (result
== -1 || mb
== 0)
52 JAWS_Concurrency_Base::put (ACE_Message_Block
*mb
, ACE_Time_Value
*tv
)
54 return this->putq (mb
, tv
);
58 JAWS_Concurrency_Base::svc (void)
60 JAWS_TRACE ("JAWS_Concurrency_Base::svc");
62 ACE_Message_Block
*mb
= 0; // The message queue element
63 JAWS_Data_Block
*db
= 0; // Contains the task list
65 mb
= this->singleton_mb ();
67 // A NULL data block indicates that the thread should shut
71 JAWS_TRACE ("JAWS_Concurrency_Base::svc, empty message block");
75 db
= dynamic_cast<JAWS_Data_Block
*> (mb
);
83 JAWS_Concurrency_Base::svc_loop (JAWS_Data_Block
*db
)
85 JAWS_TRACE ("JAWS_Concurrency_Base::svc_loop");
87 // Thread specific message block and data block
88 ACE_DEBUG ((LM_DEBUG
, "(%t) Creating DataBlock\n"));
89 JAWS_Data_Block
*ts_db
= new JAWS_Data_Block (*db
);
92 ACE_ERROR ((LM_ERROR
, "%p\n", "JAWS_Concurrency_Base::svc_hook"));
98 if (this->svc_hook (ts_db
) != 0)
100 ts_db
->task (db
->task ());
101 ts_db
->policy (db
->policy ());
103 ts_db
->io_handler (0);
104 ts_db
->rd_ptr (ts_db
->wr_ptr ());
108 ACE_DEBUG ((LM_DEBUG
, "(%t) Deleting DataBlock\n"));
109 delete ts_db
; // ts_db->release ();
115 JAWS_Concurrency_Base::svc_hook (JAWS_Data_Block
*ts_db
)
117 JAWS_TRACE ("JAWS_Concurrency_Base::svc_hook");
121 JAWS_IO_Handler
*handler
= 0; // Keeps the state of the task
122 JAWS_Pipeline_Handler
*task
= 0; // The task itself
123 JAWS_Data_Block
*mb
= 0; // The task message block
125 task
= ts_db
->task ();
128 // Get the waiter index
129 JAWS_Waiter
*waiter
= JAWS_Waiter_Singleton::instance ();
130 int waiter_index
= waiter
->index ();
135 JAWS_TRACE ("JAWS_Concurrency_Base::svc_hook, looping");
137 // Use a NULL task to make the thread recycle now
140 JAWS_TRACE ("JAWS_Concurrency_Base::svc_hook, recycling");
144 JAWS_IO_Handler
**ioh
= waiter
->find_by_index (waiter_index
);
149 // the task should set the handler to the appropriate next step
150 result
= task
->put (mb
);
152 if (result
== 0 || result
== -3)
153 handler
= mb
->io_handler ();
156 if (result
== 1 || result
== 2)
158 JAWS_TRACE ("JAWS_Concurrency_Base::svc_hook, waiting");
159 // need to wait for an asynchronous event
161 // We need a way to destroy all the handlers created by the
162 // Asynch_Acceptor. Figure this out later.
163 handler
= waiter
->wait_for_completion (waiter_index
);
166 JAWS_TRACE ("JAWS_Concurrency_Base::svc_hook, bad proactor");
175 JAWS_TRACE ("JAWS_Concurrency_Base::svc_hook, negative result");
177 ACE_ERROR ((LM_ERROR
, "%p\n", "JAWS_Concurrency_Base::svc_hook"));
185 JAWS_IO_Handler
**ioh
= waiter
->find_by_index (waiter_index
);
195 mb
= handler
->message_block ();
196 task
= handler
->task ();
205 JAWS_Concurrency_Base::activate_hook (void)
210 JAWS_Dispatcher::JAWS_Dispatcher (void)
216 JAWS_Dispatcher::dispatch (ACE_Message_Block
*mb
)
218 return this->policy ()->concurrency ()->put (mb
);
221 JAWS_Dispatch_Policy
*
222 JAWS_Dispatcher::policy (void)
224 return this->policy_
;
227 JAWS_Dispatch_Policy
*
228 JAWS_Dispatcher::policy (JAWS_Dispatch_Policy
*p
)
231 return this->policy_
;
235 JAWS_Thread_Pool_Task::make (long flags
, int nthreads
, int maxthreads
)
237 this->flags_
= flags
;
238 this->nthreads_
= nthreads
;
239 this->maxthreads_
= maxthreads
;
241 ACE_thread_t
*thr_names
= new ACE_thread_t
[nthreads
];
243 if (this->activate (flags
| THR_SUSPENDED
,
246 ACE_DEFAULT_THREAD_PRIORITY
,
253 ACE_ERROR_RETURN ((LM_ERROR
, "%p\n", "JAWS_Thread_Pool_Task::activate"),
256 for (int i
= 0; i
< nthreads
; i
++)
258 JAWS_Thread_ID
thr_id(thr_names
[i
]);
259 JAWS_IO_Handler
*dummy
= 0;
261 JAWS_Waiter_Singleton::instance ()->insert (thr_id
, dummy
);
266 this->thr_mgr_
->resume_all ();
268 if (this->reaper_
->open () == -1)
269 ACE_ERROR_RETURN ((LM_ERROR
, "%p\n", "JAWS_Reaper::open"),
276 JAWS_Thread_Per_Task::make (long flags
, int maxthreads
)
278 this->flags_
= flags
;
279 this->maxthreads_
= maxthreads
;
284 JAWS_Thread_Per_Task::put (ACE_Message_Block
*mb
, ACE_Time_Value
*tv
)
286 JAWS_TRACE ("JAWS_Thread_Per_Task::put");
289 return this->activate_hook ();
293 JAWS_Thread_Per_Task::svc_loop (JAWS_Data_Block
*db
)
295 return this->svc_hook (db
);
299 JAWS_Thread_Per_Task::activate_hook (void)
301 const int force_active
= 1;
302 const int nthreads
= 1;
304 ACE_thread_t thr_name
;
306 if (this->activate (this->flags_
| THR_SUSPENDED
,
309 ACE_DEFAULT_THREAD_PRIORITY
,
316 ACE_ERROR_RETURN ((LM_ERROR
, "%p\n", "JAWS_Thread_Pool_Task::activate"),
319 JAWS_Thread_ID
thr_id (thr_name
);
320 JAWS_IO_Handler
*dummy
= 0;
322 // In the thread-per-request strategy, need to take care of the
323 // case when the waiter array is full. Think about that problem
325 JAWS_Waiter_Singleton::instance ()->insert (thr_id
, dummy
);
327 this->thr_mgr_
->resume (thr_name
);
329 if (this->reaper_
->open () == -1)
330 ACE_ERROR_RETURN ((LM_ERROR
, "%p\n", "JAWS_Reaper::open"),