Merge pull request #2216 from jwillemsen/jwi-cxxversionchecks
[ACE_TAO.git] / ACE / Kokyu / Dispatcher_Task.cpp
blob31e1ba85044432785e1248ccbe83b6190c3fe562
1 #include "Dispatcher_Task.h"
3 #include "ace/Malloc_T.h"
4 #include "ace/OS_NS_errno.h"
6 #if ! defined (__ACE_INLINE__)
7 #include "Dispatcher_Task.inl"
8 #endif /* __ACE_INLINE__ */
// File-local constants live in an unnamed namespace so they do not
// leak into the global namespace.
namespace
{
  // Chunk count handed to the Dispatch_Queue_Item allocator that
  // Dispatcher_Task::initialize () creates.
  const int ALLOC_POOL_CHUNKS = 200;
}
16 namespace Kokyu
18 typedef ACE_Cached_Allocator<Dispatch_Queue_Item, ACE_SYNCH_MUTEX>
19 Dispatch_Queue_Item_Allocator;
21 int
22 Dispatcher_Task::initialize ()
24 switch(curr_config_info_.dispatching_type_)
26 case FIFO_DISPATCHING:
27 ACE_NEW_RETURN (
28 this->the_queue_,
29 ACE_Message_Queue<ACE_SYNCH>,
30 -1);
31 break;
33 case DEADLINE_DISPATCHING:
34 ACE_NEW_RETURN (
35 this->the_queue_,
36 ACE_Dynamic_Message_Queue<ACE_SYNCH> (deadline_msg_strategy_),
37 -1);
38 break;
40 case LAXITY_DISPATCHING:
41 ACE_NEW_RETURN (
42 this->the_queue_,
43 ACE_Dynamic_Message_Queue<ACE_SYNCH> (laxity_msg_strategy_),
44 -1);
45 break;
47 default:
48 return -1;
51 if (this->the_queue_ != 0)
53 this->msg_queue(this->the_queue_);
56 if (this->allocator_ == 0)
58 ACE_NEW_RETURN (this->allocator_,
59 Dispatch_Queue_Item_Allocator(ALLOC_POOL_CHUNKS),
60 -1);
61 own_allocator_ = 1;
64 return 0;
67 int
68 Dispatcher_Task::svc ()
70 int done = 0;
72 ACE_hthread_t thr_handle;
73 ACE_Thread::self (thr_handle);
74 int prio;
76 if (ACE_Thread::getprio (thr_handle, prio) == -1)
78 if (errno == ENOTSUP)
80 ACE_DEBUG((LM_DEBUG,
81 ACE_TEXT ("getprio not supported on this platform\n")
82 ));
83 return 0;
85 ACE_ERROR_RETURN ((LM_ERROR,
86 ACE_TEXT ("%p\n"),
87 ACE_TEXT ("getprio failed")),
88 -1);
91 //ACE_DEBUG ((LM_DEBUG, "(%t) Dispatcher Thread started prio=%d\n", prio));
93 while (!done)
95 ACE_Message_Block *mb = 0;
97 if (this->getq (mb) == -1)
99 if (ACE_OS::last_error () == ESHUTDOWN)
101 return 0;
103 else
105 ACE_ERROR ((LM_ERROR,
106 "EC (%P|%t) getq error in Dispatching Queue\n"));
110 //ACE_DEBUG ((LM_DEBUG, "(%t) : next command got from queue\n"));
112 Dispatch_Queue_Item *qitem =
113 dynamic_cast<Dispatch_Queue_Item*> (mb);
115 if (qitem == 0)
117 ACE_Message_Block::release (mb);
118 continue;
121 Dispatch_Command* command = qitem->command ();
123 ACE_ASSERT(command != 0);
124 int result = command->execute ();
126 if (command->can_be_deleted ())
127 command->destroy ();
129 ACE_Message_Block::release (mb);
131 if (result == -1)
132 done = 1;
134 return 0;
138 Dispatcher_Task::enqueue (const Dispatch_Command* cmd,
139 const QoSDescriptor& qos_info)
141 void* buf = this->allocator_->malloc (sizeof (Dispatch_Queue_Item));
143 if (buf == 0)
144 return -1;
146 ACE_Message_Block *mb =
147 new (buf) Dispatch_Queue_Item (cmd,
148 qos_info,
149 &(this->data_block_),
150 ACE_Message_Block::DONT_DELETE,
151 this->allocator_);
153 this->putq (mb);
155 return 0;
158 int Dispatcher_Task::get_native_prio ()
160 ACE_hthread_t thr_handle;
161 ACE_Thread::self (thr_handle);
162 int prio;
164 if (ACE_Thread::getprio (thr_handle, prio) == -1)
166 if (errno == ENOTSUP)
168 ACE_DEBUG((LM_DEBUG,
169 ACE_TEXT ("getprior not supported on this platform\n")
171 return 0;
173 ACE_ERROR_RETURN ((LM_ERROR,
174 ACE_TEXT ("%p\n"),
175 ACE_TEXT ("getprio failed")),
176 -1);
179 return prio;
182 void Dispatch_Queue_Item::init_i (const QoSDescriptor& qos_info)
184 this->msg_priority (qos_info.preemption_priority_);
185 this->msg_execution_time (qos_info.execution_time_);
186 this->msg_deadline_time (qos_info.deadline_);