1 #include "Dispatcher_Task.h"
3 #include "ace/Malloc_T.h"
4 #include "ace/OS_NS_errno.h"
6 #if ! defined (__ACE_INLINE__)
7 #include "Dispatcher_Task.inl"
8 #endif /* __ACE_INLINE__ */
11 //anonymous namespace - use this to avoid polluting the global namespace
// Value passed to the Dispatch_Queue_Item_Allocator constructor when
// Dispatcher_Task::initialize() has to create the allocator itself.
13 const int ALLOC_POOL_CHUNKS
= 200;
// Allocator type used for Dispatch_Queue_Item storage: an
// ACE_Cached_Allocator instantiated with the item type and ACE_SYNCH_MUTEX.
18 typedef ACE_Cached_Allocator
<Dispatch_Queue_Item
, ACE_SYNCH_MUTEX
>
19 Dispatch_Queue_Item_Allocator
;
// Prepares the task's message queue and its queue-item allocator.
//
// The queue type is selected by curr_config_info_.dispatching_type_:
//   FIFO_DISPATCHING     -> ACE_Message_Queue<ACE_SYNCH>
//   DEADLINE_DISPATCHING -> ACE_Dynamic_Message_Queue<ACE_SYNCH>
//                           constructed with deadline_msg_strategy_
//   LAXITY_DISPATCHING   -> ACE_Dynamic_Message_Queue<ACE_SYNCH>
//                           constructed with laxity_msg_strategy_
// NOTE(review): presumably each case stores the new queue in the_queue_ and
// bails out on allocation failure -- confirm against the full statement.
22 Dispatcher_Task::initialize ()
24 switch(curr_config_info_
.dispatching_type_
)
26 case FIFO_DISPATCHING
:
29 ACE_Message_Queue
<ACE_SYNCH
>,
33 case DEADLINE_DISPATCHING
:
36 ACE_Dynamic_Message_Queue
<ACE_SYNCH
> (deadline_msg_strategy_
),
40 case LAXITY_DISPATCHING
:
43 ACE_Dynamic_Message_Queue
<ACE_SYNCH
> (laxity_msg_strategy_
),
// A non-null queue is installed as this task's message queue.
51 if (this->the_queue_
!= 0)
53 this->msg_queue(this->the_queue_
);
// Only create an allocator when allocator_ is still null; an allocator that
// is already set is left untouched.
56 if (this->allocator_
== 0)
58 ACE_NEW_RETURN (this->allocator_
,
59 Dispatch_Queue_Item_Allocator(ALLOC_POOL_CHUNKS
),
// Service routine: queries the calling thread's priority, then takes a
// message block from the queue with getq(), treats it as a
// Dispatch_Queue_Item, and executes the Dispatch_Command it carries.
// NOTE(review): presumably the dequeue/execute steps repeat in a loop until
// shutdown or a failed command -- confirm the loop and exit conditions.
68 Dispatcher_Task::svc ()
72 ACE_hthread_t thr_handle
;
73 ACE_Thread::self (thr_handle
);
// A getprio failure is reported either with the "not supported" message or
// through ACE_ERROR_RETURN with "getprio failed".
76 if (ACE_Thread::getprio (thr_handle
, prio
) == -1)
81 ACE_TEXT ("getprio not supported on this platform\n")
85 ACE_ERROR_RETURN ((LM_ERROR
,
87 ACE_TEXT ("getprio failed")),
91 //ACE_DEBUG ((LM_DEBUG, "(%t) Dispatcher Thread started prio=%d\n", prio));
95 ACE_Message_Block
*mb
= 0;
// getq() returning -1 is split into two cases: last_error() == ESHUTDOWN,
// and any other error, which is logged.
97 if (this->getq (mb
) == -1)
99 if (ACE_OS::last_error () == ESHUTDOWN
)
105 ACE_ERROR ((LM_ERROR
,
106 "EC (%P|%t) getq error in Dispatching Queue\n"));
110 //ACE_DEBUG ((LM_DEBUG, "(%t) : next command got from queue\n"));
// The dequeued block is downcast to Dispatch_Queue_Item to reach its command.
112 Dispatch_Queue_Item
*qitem
=
113 dynamic_cast<Dispatch_Queue_Item
*> (mb
);
// NOTE(review): presumably this release is the path taken when the cast
// yields null -- confirm the guarding condition.
117 ACE_Message_Block::release (mb
);
121 Dispatch_Command
* command
= qitem
->command ();
123 ACE_ASSERT(command
!= 0);
124 int result
= command
->execute ();
// NOTE(review): the action taken when can_be_deleted() is true is not shown
// in this block -- confirm the command is disposed of before mb is released.
126 if (command
->can_be_deleted ())
129 ACE_Message_Block::release (mb
);
// Wraps cmd in a Dispatch_Queue_Item built in storage obtained from
// allocator_.
//
// cmd      - command to be carried by the new queue item.
// qos_info - QoS descriptor for the command.
// NOTE(review): presumably qos_info is forwarded to the item constructor and
// the item is then put on the message queue -- confirm.
138 Dispatcher_Task::enqueue (const Dispatch_Command
* cmd
,
139 const QoSDescriptor
& qos_info
)
// Raw storage for one item comes from the task's allocator, not operator new.
// NOTE(review): confirm buf is checked for null before the placement new.
141 void* buf
= this->allocator_
->malloc (sizeof (Dispatch_Queue_Item
));
// Placement-new the item into buf; it is given a pointer to this task's
// data_block_ member and the ACE_Message_Block::DONT_DELETE flag.
146 ACE_Message_Block
*mb
=
147 new (buf
) Dispatch_Queue_Item (cmd
,
149 &(this->data_block_
),
150 ACE_Message_Block::DONT_DELETE
,
// Queries the priority of the calling thread through ACE_Thread::getprio()
// on the handle obtained from ACE_Thread::self().
// NOTE(review): presumably the queried prio is the return value on success
// -- confirm.
158 int Dispatcher_Task::get_native_prio ()
160 ACE_hthread_t thr_handle
;
161 ACE_Thread::self (thr_handle
);
// On failure, errno == ENOTSUP selects the "not supported" message; other
// failures go through ACE_ERROR_RETURN with "getprio failed".
164 if (ACE_Thread::getprio (thr_handle
, prio
) == -1)
166 if (errno
== ENOTSUP
)
// NOTE(review): this message spells "getprior", while the matching message
// in svc() spells "getprio" -- likely a typo in the log text.
169 ACE_TEXT ("getprior not supported on this platform\n")
173 ACE_ERROR_RETURN ((LM_ERROR
,
175 ACE_TEXT ("getprio failed")),
// Copies the scheduling attributes of qos_info onto this message block:
//   preemption_priority_ -> msg_priority
//   execution_time_      -> msg_execution_time
//   deadline_            -> msg_deadline_time
182 void Dispatch_Queue_Item::init_i (const QoSDescriptor
& qos_info
)
184 this->msg_priority (qos_info
.preemption_priority_
);
185 this->msg_execution_time (qos_info
.execution_time_
);
186 this->msg_deadline_time (qos_info
.deadline_
);