Upstream tarball 9882
[amule.git] / src / ThreadScheduler.cpp
blobdade37101271cb7d9e427e86a59e3ad912e4c871
1 //
2 // This file is part of the aMule Project.
3 //
4 // Copyright (c) 2006-2008 Mikkel Schubert ( xaignar@amule.org / http://www.amule.org )
5 //
6 // Any parts of this program derived from the xMule, lMule or eMule project,
7 // or contributed by third-party developers are copyrighted by their
8 // respective authors.
9 //
10 // This program is free software; you can redistribute it and/or modify
11 // it under the terms of the GNU General Public License as published by
12 // the Free Software Foundation; either version 2 of the License, or
13 // (at your option) any later version.
15 // This program is distributed in the hope that it will be useful,
16 // but WITHOUT ANY WARRANTY; without even the implied warranty of
17 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 // GNU General Public License for more details.
19 //
20 // You should have received a copy of the GNU General Public License
21 // along with this program; if not, write to the Free Software
22 // Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
25 #include "ThreadScheduler.h" // Interface declarations
26 #include "Logger.h" // Needed for Add(Debug)LogLineM
27 #include <common/Format.h> // Needed for CFormat
28 #include "ScopedPtr.h" // Needed for CScopedPtr
30 #include <algorithm> // Needed for std::sort // Do_not_auto_remove (mingw-gcc-3.4.5)
32 //! Global lock the scheduler and its thread.
33 static wxMutex s_lock;
34 //! Pointer to the global scheduler instance (automatically instantiated).
35 static CThreadScheduler* s_scheduler = NULL;
36 //! Specifies if the scheduler is running.
37 static bool s_running = false;
38 //! Specifies if the gobal scheduler has been terminated.
39 static bool s_terminated = false;
41 /**
42 * This class is used in a custom implementation of wxThreadHelper.
44 * The reason for not using wxThreadHelper are as follows:
45 * - wxThreadHelper makes use of wxThread:Kill, which is warned against
46 * serveral times in the docs, and even calls it in its destructor.
47 * - Managing the thread-object is difficult, since the only way to
48 * destroy it is to create a new thread.
50 class CTaskThread : public CMuleThread
52 public:
53 CTaskThread(CThreadScheduler* owner)
54 : CMuleThread(wxTHREAD_JOINABLE),
55 m_owner(owner)
59 //! For simplicity's sake, all code is placed in CThreadScheduler::Entry
60 void* Entry() {
61 return m_owner->Entry();
64 private:
65 //! The scheduler owning this thread.
66 CThreadScheduler* m_owner;
70 void CThreadScheduler::Start()
72 wxMutexLocker lock(s_lock);
74 s_running = true;
75 s_terminated = false;
77 // Ensures that a thread is started if tasks are already waiting.
78 if (s_scheduler) {
79 AddDebugLogLineM(false, logThreads, wxT("Starting scheduler"));
80 s_scheduler->CreateSchedulerThread();
85 void CThreadScheduler::Terminate()
87 AddDebugLogLineM(false, logThreads, wxT("Terminating scheduler"));
88 CThreadScheduler* ptr = NULL;
91 wxMutexLocker lock(s_lock);
93 // Safely unlink the scheduler, as to avoid race-conditions.
94 ptr = s_scheduler;
95 s_running = false;
96 s_terminated = true;
97 s_scheduler = NULL;
100 delete ptr;
101 AddDebugLogLineM(false, logThreads, wxT("Scheduler terminated"));
105 bool CThreadScheduler::AddTask(CThreadTask* task, bool overwrite)
107 wxMutexLocker lock(s_lock);
109 // When terminated (on shutdown), all tasks are ignored.
110 if (s_terminated) {
111 AddDebugLogLineM(false, logThreads, wxT("Task discarded: ") + task->GetDesc());
112 delete task;
113 return false;
114 } else if (s_scheduler == NULL) {
115 s_scheduler = new CThreadScheduler();
116 AddDebugLogLineM(false, logThreads, wxT("Scheduler created."));
119 return s_scheduler->DoAddTask(task, overwrite);
123 /** Returns string representation of error code. */
124 wxString GetErrMsg(wxThreadError err)
126 switch (err) {
127 case wxTHREAD_NO_ERROR: return wxT("wxTHREAD_NO_ERROR");
128 case wxTHREAD_NO_RESOURCE: return wxT("wxTHREAD_NO_RESOURCE");
129 case wxTHREAD_RUNNING: return wxT("wxTHREAD_RUNNING");
130 case wxTHREAD_NOT_RUNNING: return wxT("wxTHREAD_NOT_RUNNING");
131 case wxTHREAD_KILLED: return wxT("wxTHREAD_KILLED");
132 case wxTHREAD_MISC_ERROR: return wxT("wxTHREAD_MISC_ERROR");
133 default:
134 return wxT("Unknown error");
139 void CThreadScheduler::CreateSchedulerThread()
141 if ((m_thread && m_thread->IsAlive()) || m_tasks.empty()) {
142 return;
145 // A thread can only be run once, so the old one must be safely disposed of
146 if (m_thread) {
147 AddDebugLogLineM(false, logThreads, wxT("CreateSchedulerThread: Disposing of old thread."));
148 m_thread->Stop();
149 delete m_thread;
152 m_thread = new CTaskThread(this);
154 wxThreadError err = m_thread->Create();
155 if (err == wxTHREAD_NO_ERROR) {
156 // Try to avoid reducing the latency of the main thread
157 m_thread->SetPriority(WXTHREAD_MIN_PRIORITY);
159 err = m_thread->Run();
160 if (err == wxTHREAD_NO_ERROR) {
161 AddDebugLogLineM(false, logThreads, wxT("Scheduler thread started"));
162 return;
163 } else {
164 AddDebugLogLineM(true, logThreads, wxT("Error while starting scheduler thread: ") + GetErrMsg(err));
166 } else {
167 AddDebugLogLineM(true, logThreads, wxT("Error while creating scheduler thread: ") + GetErrMsg(err));
170 // Creation or running failed.
171 m_thread->Stop();
172 delete m_thread;
173 m_thread = NULL;
177 /** This is the sorter functor for the task-queue. */
178 struct CTaskSorter
180 bool operator()(const CThreadScheduler::CEntryPair& a, const CThreadScheduler::CEntryPair& b) {
181 if (a.first->GetPriority() != b.first->GetPriority()) {
182 return a.first->GetPriority() > b.first->GetPriority();
185 // Compare tasks numbers.
186 return a.second < b.second;
192 CThreadScheduler::CThreadScheduler()
193 : m_tasksDirty(false),
194 m_thread(NULL),
195 m_currentTask(NULL)
201 CThreadScheduler::~CThreadScheduler()
203 if (m_thread) {
204 m_thread->Stop();
205 delete m_thread;
210 size_t CThreadScheduler::GetTaskCount() const
212 wxMutexLocker lock(s_lock);
214 return m_tasks.size();
218 bool CThreadScheduler::DoAddTask(CThreadTask* task, bool overwrite)
220 // GetTick is too lowres, so we just use a counter to ensure that
221 // the sorted order will match the order in which the tasks were added.
222 static unsigned taskAge = 0;
224 // Get the map for this task type, implicitly creating it as needed.
225 CDescMap& map = m_taskDescs[task->GetType()];
227 CDescMap::value_type entry(task->GetDesc(), task);
228 if (map.insert(entry).second) {
229 AddDebugLogLineM(false, logThreads, wxT("Task scheduled: ") + task->GetType() + wxT(" - ") + task->GetDesc());
230 m_tasks.push_back(CEntryPair(task, taskAge++));
231 m_tasksDirty = true;
232 } else if (overwrite) {
233 AddDebugLogLineM(false, logThreads, wxT("Task overwritten: ") + task->GetType() + wxT(" - ") + task->GetDesc());
235 CThreadTask* existingTask = map[task->GetDesc()];
236 if (m_currentTask == existingTask) {
237 // The duplicate is already being executed, abort it.
238 m_currentTask->m_abort = true;
239 } else {
240 // Task not yet started, simply remove and delete.
241 wxCHECK2(map.erase(existingTask->GetDesc()), /* Do nothing. */);
242 delete existingTask;
245 m_tasks.push_back(CEntryPair(task, taskAge++));
246 map[task->GetDesc()] = task;
247 m_tasksDirty = true;
248 } else {
249 AddDebugLogLineM(false, logThreads, wxT("Duplicate task, discarding: ") + task->GetType() + wxT(" - ") + task->GetDesc());
250 delete task;
251 return false;
254 if (s_running) {
255 CreateSchedulerThread();
258 return true;
262 void* CThreadScheduler::Entry()
264 AddDebugLogLineM(false, logThreads, wxT("Entering scheduling loop"));
266 while (!m_thread->TestDestroy()) {
267 CScopedPtr<CThreadTask> task(NULL);
270 wxMutexLocker lock(s_lock);
272 // Resort tasks by priority/age if list has been modified.
273 if (m_tasksDirty) {
274 AddDebugLogLineM(false, logThreads, wxT("Resorting tasks"));
275 std::sort(m_tasks.begin(), m_tasks.end(), CTaskSorter());
276 m_tasksDirty = false;
277 } else if (m_tasks.empty()) {
278 AddDebugLogLineM(false, logThreads, wxT("No more tasks, stopping"));
279 break;
282 // Select the next task
283 task.reset(m_tasks.front().first);
284 m_tasks.pop_front();
285 m_currentTask = task.get();
288 AddDebugLogLineM(false, logThreads, wxT("Current task: ") + task->GetType() + wxT(" - ") + task->GetDesc());
289 // Execute the task
290 task->m_owner = m_thread;
291 task->Entry();
292 task->OnExit();
294 // Check if this was the last task of this type
295 bool isLastTask = false;
298 wxMutexLocker lock(s_lock);
300 // If the task has been aborted, the entry now refers to
301 // a different task, so dont remove it. That also means
302 // that it cant be the last task of this type.
303 if (!task->m_abort) {
304 AddDebugLogLineM(false, logThreads,
305 CFormat(wxT("Completed task '%s%s', %u tasks remaining."))
306 % task->GetType()
307 % (task->GetDesc().IsEmpty() ? wxString() : (wxT(" - ") + task->GetDesc()))
308 % m_tasks.size() );
310 CDescMap& map = m_taskDescs[task->GetType()];
311 if (!map.erase(task->GetDesc())) {
312 wxFAIL;
313 } else if (map.empty()) {
314 m_taskDescs.erase(task->GetType());
315 isLastTask = true;
319 m_currentTask = NULL;
322 if (isLastTask) {
323 // Allow the task to signal that all sub-tasks have been completed
324 AddDebugLogLineM(false, logThreads, wxT("Last task, calling OnLastTask"));
325 task->OnLastTask();
329 AddDebugLogLineM(false, logThreads, wxT("Leaving scheduling loop"));
331 return 0;
336 CThreadTask::CThreadTask(const wxString& type, const wxString& desc, ETaskPriority priority)
337 : m_type(type),
338 m_desc(desc),
339 m_priority(priority),
340 m_owner(NULL),
341 m_abort(false)
346 CThreadTask::~CThreadTask()
351 void CThreadTask::OnLastTask()
353 // Does nothing by default.
357 void CThreadTask::OnExit()
359 // Does nothing by default.
363 bool CThreadTask::TestDestroy() const
365 wxCHECK(m_owner, m_abort);
367 return m_abort || m_owner->TestDestroy();
371 const wxString& CThreadTask::GetType() const
373 return m_type;
377 const wxString& CThreadTask::GetDesc() const
379 return m_desc;
383 ETaskPriority CThreadTask::GetPriority() const
385 return m_priority;
389 // File_checked_for_headers