2 // This file is part of the aMule Project.
4 // Copyright (c) 2006-2008 Mikkel Schubert ( xaignar@amule.org / http://www.amule.org )
6 // Any parts of this program derived from the xMule, lMule or eMule project,
7 // or contributed by third-party developers are copyrighted by their
10 // This program is free software; you can redistribute it and/or modify
11 // it under the terms of the GNU General Public License as published by
12 // the Free Software Foundation; either version 2 of the License, or
13 // (at your option) any later version.
15 // This program is distributed in the hope that it will be useful,
16 // but WITHOUT ANY WARRANTY; without even the implied warranty of
17 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 // GNU General Public License for more details.
20 // You should have received a copy of the GNU General Public License
21 // along with this program; if not, write to the Free Software
22 // Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
25 #include "ThreadScheduler.h" // Interface declarations
26 #include "Logger.h" // Needed for Add(Debug)LogLineM
27 #include <common/Format.h> // Needed for CFormat
28 #include "ScopedPtr.h" // Needed for CScopedPtr
30 #include <algorithm> // Needed for std::sort // Do_not_auto_remove (mingw-gcc-3.4.5)
32 //! Global lock the scheduler and its thread.
33 static wxMutex s_lock
;
34 //! Pointer to the global scheduler instance (automatically instantiated).
35 static CThreadScheduler
* s_scheduler
= NULL
;
36 //! Specifies if the scheduler is running.
37 static bool s_running
= false;
38 //! Specifies if the gobal scheduler has been terminated.
39 static bool s_terminated
= false;
42 * This class is used in a custom implementation of wxThreadHelper.
44 * The reason for not using wxThreadHelper are as follows:
45 * - wxThreadHelper makes use of wxThread:Kill, which is warned against
46 * serveral times in the docs, and even calls it in its destructor.
47 * - Managing the thread-object is difficult, since the only way to
48 * destroy it is to create a new thread.
50 class CTaskThread
: public CMuleThread
53 CTaskThread(CThreadScheduler
* owner
)
54 : CMuleThread(wxTHREAD_JOINABLE
),
59 //! For simplicity's sake, all code is placed in CThreadScheduler::Entry
61 return m_owner
->Entry();
65 //! The scheduler owning this thread.
66 CThreadScheduler
* m_owner
;
70 void CThreadScheduler::Start()
72 wxMutexLocker
lock(s_lock
);
77 // Ensures that a thread is started if tasks are already waiting.
79 AddDebugLogLineM(false, logThreads
, wxT("Starting scheduler"));
80 s_scheduler
->CreateSchedulerThread();
85 void CThreadScheduler::Terminate()
87 AddDebugLogLineM(false, logThreads
, wxT("Terminating scheduler"));
88 CThreadScheduler
* ptr
= NULL
;
91 wxMutexLocker
lock(s_lock
);
93 // Safely unlink the scheduler, as to avoid race-conditions.
101 AddDebugLogLineM(false, logThreads
, wxT("Scheduler terminated"));
105 bool CThreadScheduler::AddTask(CThreadTask
* task
, bool overwrite
)
107 wxMutexLocker
lock(s_lock
);
109 // When terminated (on shutdown), all tasks are ignored.
111 AddDebugLogLineM(false, logThreads
, wxT("Task discarded: ") + task
->GetDesc());
114 } else if (s_scheduler
== NULL
) {
115 s_scheduler
= new CThreadScheduler();
116 AddDebugLogLineM(false, logThreads
, wxT("Scheduler created."));
119 return s_scheduler
->DoAddTask(task
, overwrite
);
123 /** Returns string representation of error code. */
124 wxString
GetErrMsg(wxThreadError err
)
127 case wxTHREAD_NO_ERROR
: return wxT("wxTHREAD_NO_ERROR");
128 case wxTHREAD_NO_RESOURCE
: return wxT("wxTHREAD_NO_RESOURCE");
129 case wxTHREAD_RUNNING
: return wxT("wxTHREAD_RUNNING");
130 case wxTHREAD_NOT_RUNNING
: return wxT("wxTHREAD_NOT_RUNNING");
131 case wxTHREAD_KILLED
: return wxT("wxTHREAD_KILLED");
132 case wxTHREAD_MISC_ERROR
: return wxT("wxTHREAD_MISC_ERROR");
134 return wxT("Unknown error");
139 void CThreadScheduler::CreateSchedulerThread()
141 if ((m_thread
&& m_thread
->IsAlive()) || m_tasks
.empty()) {
145 // A thread can only be run once, so the old one must be safely disposed of
147 AddDebugLogLineM(false, logThreads
, wxT("CreateSchedulerThread: Disposing of old thread."));
152 m_thread
= new CTaskThread(this);
154 wxThreadError err
= m_thread
->Create();
155 if (err
== wxTHREAD_NO_ERROR
) {
156 // Try to avoid reducing the latency of the main thread
157 m_thread
->SetPriority(WXTHREAD_MIN_PRIORITY
);
159 err
= m_thread
->Run();
160 if (err
== wxTHREAD_NO_ERROR
) {
161 AddDebugLogLineM(false, logThreads
, wxT("Scheduler thread started"));
164 AddDebugLogLineM(true, logThreads
, wxT("Error while starting scheduler thread: ") + GetErrMsg(err
));
167 AddDebugLogLineM(true, logThreads
, wxT("Error while creating scheduler thread: ") + GetErrMsg(err
));
170 // Creation or running failed.
177 /** This is the sorter functor for the task-queue. */
180 bool operator()(const CThreadScheduler::CEntryPair
& a
, const CThreadScheduler::CEntryPair
& b
) {
181 if (a
.first
->GetPriority() != b
.first
->GetPriority()) {
182 return a
.first
->GetPriority() > b
.first
->GetPriority();
185 // Compare tasks numbers.
186 return a
.second
< b
.second
;
192 CThreadScheduler::CThreadScheduler()
193 : m_tasksDirty(false),
201 CThreadScheduler::~CThreadScheduler()
210 size_t CThreadScheduler::GetTaskCount() const
212 wxMutexLocker
lock(s_lock
);
214 return m_tasks
.size();
218 bool CThreadScheduler::DoAddTask(CThreadTask
* task
, bool overwrite
)
220 // GetTick is too lowres, so we just use a counter to ensure that
221 // the sorted order will match the order in which the tasks were added.
222 static unsigned taskAge
= 0;
224 // Get the map for this task type, implicitly creating it as needed.
225 CDescMap
& map
= m_taskDescs
[task
->GetType()];
227 CDescMap::value_type
entry(task
->GetDesc(), task
);
228 if (map
.insert(entry
).second
) {
229 AddDebugLogLineM(false, logThreads
, wxT("Task scheduled: ") + task
->GetType() + wxT(" - ") + task
->GetDesc());
230 m_tasks
.push_back(CEntryPair(task
, taskAge
++));
232 } else if (overwrite
) {
233 AddDebugLogLineM(false, logThreads
, wxT("Task overwritten: ") + task
->GetType() + wxT(" - ") + task
->GetDesc());
235 CThreadTask
* existingTask
= map
[task
->GetDesc()];
236 if (m_currentTask
== existingTask
) {
237 // The duplicate is already being executed, abort it.
238 m_currentTask
->m_abort
= true;
240 // Task not yet started, simply remove and delete.
241 wxCHECK2(map
.erase(existingTask
->GetDesc()), /* Do nothing. */);
245 m_tasks
.push_back(CEntryPair(task
, taskAge
++));
246 map
[task
->GetDesc()] = task
;
249 AddDebugLogLineM(false, logThreads
, wxT("Duplicate task, discarding: ") + task
->GetType() + wxT(" - ") + task
->GetDesc());
255 CreateSchedulerThread();
262 void* CThreadScheduler::Entry()
264 AddDebugLogLineM(false, logThreads
, wxT("Entering scheduling loop"));
266 while (!m_thread
->TestDestroy()) {
267 CScopedPtr
<CThreadTask
> task(NULL
);
270 wxMutexLocker
lock(s_lock
);
272 // Resort tasks by priority/age if list has been modified.
274 AddDebugLogLineM(false, logThreads
, wxT("Resorting tasks"));
275 std::sort(m_tasks
.begin(), m_tasks
.end(), CTaskSorter());
276 m_tasksDirty
= false;
277 } else if (m_tasks
.empty()) {
278 AddDebugLogLineM(false, logThreads
, wxT("No more tasks, stopping"));
282 // Select the next task
283 task
.reset(m_tasks
.front().first
);
285 m_currentTask
= task
.get();
288 AddDebugLogLineM(false, logThreads
, wxT("Current task: ") + task
->GetType() + wxT(" - ") + task
->GetDesc());
290 task
->m_owner
= m_thread
;
294 // Check if this was the last task of this type
295 bool isLastTask
= false;
298 wxMutexLocker
lock(s_lock
);
300 // If the task has been aborted, the entry now refers to
301 // a different task, so dont remove it. That also means
302 // that it cant be the last task of this type.
303 if (!task
->m_abort
) {
304 AddDebugLogLineM(false, logThreads
,
305 CFormat(wxT("Completed task '%s%s', %u tasks remaining."))
307 % (task
->GetDesc().IsEmpty() ? wxString() : (wxT(" - ") + task
->GetDesc()))
310 CDescMap
& map
= m_taskDescs
[task
->GetType()];
311 if (!map
.erase(task
->GetDesc())) {
313 } else if (map
.empty()) {
314 m_taskDescs
.erase(task
->GetType());
319 m_currentTask
= NULL
;
323 // Allow the task to signal that all sub-tasks have been completed
324 AddDebugLogLineM(false, logThreads
, wxT("Last task, calling OnLastTask"));
329 AddDebugLogLineM(false, logThreads
, wxT("Leaving scheduling loop"));
336 CThreadTask::CThreadTask(const wxString
& type
, const wxString
& desc
, ETaskPriority priority
)
339 m_priority(priority
),
346 CThreadTask::~CThreadTask()
351 void CThreadTask::OnLastTask()
353 // Does nothing by default.
357 void CThreadTask::OnExit()
359 // Does nothing by default.
363 bool CThreadTask::TestDestroy() const
365 wxCHECK(m_owner
, m_abort
);
367 return m_abort
|| m_owner
->TestDestroy();
371 const wxString
& CThreadTask::GetType() const
377 const wxString
& CThreadTask::GetDesc() const
383 ETaskPriority
CThreadTask::GetPriority() const
389 // File_checked_for_headers