Cleanup ACE_HAS_PTHREAD_SIGMASK_PROTOTYPE, all platforms support it so far as I can...
[ACE_TAO.git] / ACE / ace / Priority_Reactor.cpp
blob4036aaed213d6d624f4d05805a959ac5fd6e33e7
1 #include "ace/Priority_Reactor.h"
2 #include "ace/Malloc_T.h"
5 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
7 using QUEUE_ITERATOR = ACE_Unbounded_Queue_Iterator<ACE_Event_Tuple>;
8 // Its iterator.
10 using TUPLE_ALLOCATOR = ACE_Cached_Allocator<ACE_Node<ACE_Event_Tuple>, ACE_MT_SYNCH::NULL_MUTEX>;
11 // Defines the memory allocator used, no need for locking because it
12 // is only used in one thread of control.
14 ACE_ALLOC_HOOK_DEFINE(ACE_Priority_Reactor)
16 // Initialize ACE_Select_Reactor.
18 #define npriorities \
19 ACE_Event_Handler::HI_PRIORITY-ACE_Event_Handler::LO_PRIORITY+1
21 void
22 ACE_Priority_Reactor::init_bucket ()
24 // Allocate enough space for all the handles.
25 // TODO: This can be wrong, maybe we should use other kind of
26 // allocator here?
27 ACE_NEW (this->tuple_allocator_,
28 TUPLE_ALLOCATOR (ACE_Select_Reactor::DEFAULT_SIZE));
30 // The event handlers are assigned to a new As the Event
31 #if defined (ACE_HAS_ALLOC_HOOKS)
32 ACE_ALLOCATOR (this->bucket_,
33 static_cast<QUEUE **>(ACE_Allocator::instance()->malloc(sizeof(QUEUE *) * (npriorities))));
34 #else
35 ACE_NEW (this->bucket_,
36 QUEUE *[npriorities]);
37 #endif /* ACE_HAS_ALLOC_HOOKS */
39 // This loops "ensures" exception safety.
40 for (int i = 0; i < npriorities; ++i)
41 ACE_NEW (this->bucket_[i],
42 QUEUE (this->tuple_allocator_));
45 ACE_Priority_Reactor::ACE_Priority_Reactor (ACE_Sig_Handler *sh,
46 ACE_Timer_Queue *tq)
47 : ACE_Select_Reactor(sh, tq),
48 bucket_ (0),
49 tuple_allocator_ (0)
51 ACE_TRACE ("ACE_Priority_Reactor::ACE_Priority_Reactor");
52 this->init_bucket ();
55 ACE_Priority_Reactor::ACE_Priority_Reactor (size_t size,
56 bool restart,
57 ACE_Sig_Handler *sh,
58 ACE_Timer_Queue *tq)
59 : ACE_Select_Reactor (size, restart, sh, tq),
60 bucket_ (0),
61 tuple_allocator_ (0)
63 ACE_TRACE ("ACE_Priority_Reactor::ACE_Priority_Reactor");
64 this->init_bucket ();
67 ACE_Priority_Reactor::~ACE_Priority_Reactor ()
69 ACE_TRACE ("ACE_Priority_Reactor::~ACE_Priority_Reactor");
71 for (int i = 0; i < npriorities; ++i)
72 delete this->bucket_[i];
74 #if defined (ACE_HAS_ALLOC_HOOKS)
75 ACE_Allocator::instance()->free(this->bucket_);
76 #else
77 delete[] this->bucket_;
78 #endif /* ACE_HAS_ALLOC_HOOKS */
79 delete tuple_allocator_;
82 int
83 ACE_Priority_Reactor::build_bucket (ACE_Handle_Set &dispatch_mask,
84 int &min_priority,
85 int &max_priority)
87 ACE_Handle_Set_Iterator handle_iter (dispatch_mask);
89 for (ACE_HANDLE handle;
90 (handle = handle_iter ()) != ACE_INVALID_HANDLE;
93 ACE_Event_Handler *event_handler =
94 this->handler_rep_.find (handle);
95 if (event_handler == 0)
96 return -1;
98 ACE_Event_Tuple et (event_handler,
99 handle);
100 int prio = et.event_handler_->priority ();
102 // If the priority is out of range assign the minimum priority.
103 if (prio < ACE_Event_Handler::LO_PRIORITY
104 || prio > ACE_Event_Handler::HI_PRIORITY)
105 prio = ACE_Event_Handler::LO_PRIORITY;
107 if (bucket_[prio]->enqueue_tail (et) == -1)
108 return -1;
110 // Update the priority ranges....
111 if (min_priority > prio)
112 min_priority = prio;
113 if (max_priority < prio)
114 max_priority = prio;
117 return 0;
121 ACE_Priority_Reactor::dispatch_io_set (int number_of_active_handles,
122 int& number_dispatched,
123 int mask,
124 ACE_Handle_Set& dispatch_mask,
125 ACE_Handle_Set& ready_mask,
126 ACE_EH_PTMF callback)
128 ACE_TRACE ("ACE_Priority_Reactor::dispatch_io_set");
130 if (number_of_active_handles == 0)
131 return 0;
133 // The range for which there exists any Event_Tuple is computed on
134 // the ordering loop, minimizing iterations on the dispatching loop.
135 int min_priority =
136 ACE_Event_Handler::HI_PRIORITY;
137 int max_priority =
138 ACE_Event_Handler::LO_PRIORITY;
140 if (this->build_bucket (dispatch_mask,
141 min_priority,
142 max_priority) == -1)
143 return -1;
145 for (int i = max_priority; i >= min_priority; --i)
147 while (!bucket_[i]->is_empty ()
148 && number_dispatched < number_of_active_handles)
150 ACE_Event_Tuple et;
152 bucket_[i]->dequeue_head (et);
154 this->notify_handle (et.handle_,
155 mask,
156 ready_mask,
157 et.event_handler_,
158 callback);
159 ++number_dispatched;
161 // clear the bit from that dispatch mask,
162 // so when we need to restart the iteration (rebuilding the iterator...)
163 // we will not dispatch the already dispatched handlers
164 this->clear_dispatch_mask (et.handle_,
165 mask);
167 if (this->state_changed_)
168 this->state_changed_ = false; // so it will not rebuild it ...
171 // Even if we are aborting the loop due to this->state_changed
172 // or another error we still want to cleanup the buckets.
173 bucket_[i]->reset ();
176 return 0;
179 void
180 ACE_Priority_Reactor::dump () const
182 #if defined (ACE_HAS_DUMP)
183 ACE_TRACE ("ACE_Priority_Reactor::dump");
185 ACELIB_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
187 ACE_Select_Reactor::dump ();
189 ACELIB_DEBUG ((LM_DEBUG, ACE_END_DUMP));
190 #endif /* ACE_HAS_DUMP */
193 ACE_END_VERSIONED_NAMESPACE_DECL