changed: update version strings for beta4
[xbmc.git] / xbmc / utils / JobManager.cpp
blobc12df7649988ca2ed3c3ff7666f637bd1097686b
1 /*
2 * Copyright (C) 2005-2008 Team XBMC
3 * http://www.xbmc.org
5 * This Program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2, or (at your option)
8 * any later version.
10 * This Program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with XBMC; see the file COPYING. If not, write to
17 * the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
18 * http://www.gnu.org/copyleft/gpl.html
22 #include "JobManager.h"
23 #include <algorithm>
24 #include "SingleLock.h"
26 using namespace std;
28 bool CJob::ShouldCancel(unsigned int progress, unsigned int total) const
30 if (m_callback)
31 return m_callback->OnJobProgress(progress, total, this);
32 return false;
35 CJobWorker::CJobWorker(CJobManager *manager)
37 m_jobManager = manager;
38 Create(true); // start work immediately, and kill ourselves when we're done
41 CJobWorker::~CJobWorker()
43 // while we should already be removed from the job manager, if an exception
44 // occurs during processing, we may skip over that step. Thus, before we
45 // go out of scope, ensure the job manager knows we're gone.
46 m_jobManager->RemoveWorker(this);
47 if(!IsAutoDelete())
48 StopThread();
51 void CJobWorker::Process()
53 SetPriority( GetMinPriority() );
54 SetName("Jobworker");
55 while (true)
57 // request an item from our manager (this call is blocking)
58 CJob *job = m_jobManager->GetNextJob(this);
59 if (!job)
60 break;
62 // we have a job to do
63 bool success = job->DoWork();
64 m_jobManager->OnJobComplete(success, job);
68 void CJobQueue::CJobPointer::CancelJob()
70 CJobManager::GetInstance().CancelJob(m_id);
71 m_id = 0;
74 CJobQueue::CJobQueue(bool lifo, unsigned int jobsAtOnce, CJob::PRIORITY priority)
75 : m_jobsAtOnce(jobsAtOnce), m_priority(priority), m_lifo(lifo)
79 CJobQueue::~CJobQueue()
81 CancelJobs();
84 void CJobQueue::OnJobComplete(unsigned int jobID, bool success, CJob *job)
86 CSingleLock lock(m_section);
87 // check if this job is in our processing list
88 Processing::iterator i = find(m_processing.begin(), m_processing.end(), job);
89 if (i != m_processing.end())
90 m_processing.erase(i);
91 // request a new job be queued
92 QueueNextJob();
95 void CJobQueue::AddJob(CJob *job)
97 CSingleLock lock(m_section);
98 // check if we have this job already. If so, we're done.
99 if (find(m_jobQueue.begin(), m_jobQueue.end(), job) != m_jobQueue.end() ||
100 find(m_processing.begin(), m_processing.end(), job) != m_processing.end())
102 delete job;
103 return;
106 if (m_lifo)
107 m_jobQueue.push_back(CJobPointer(job));
108 else
109 m_jobQueue.push_front(CJobPointer(job));
110 QueueNextJob();
113 void CJobQueue::QueueNextJob()
115 CSingleLock lock(m_section);
116 if (m_jobQueue.size() && m_processing.size() < m_jobsAtOnce)
118 CJobPointer &job = m_jobQueue.back();
119 job.m_id = CJobManager::GetInstance().AddJob(job.m_job, this, m_priority);
120 m_processing.push_back(job);
121 m_jobQueue.pop_back();
125 void CJobQueue::CancelJobs()
127 CSingleLock lock(m_section);
128 for_each(m_processing.begin(), m_processing.end(), mem_fun_ref(&CJobPointer::CancelJob));
129 for_each(m_jobQueue.begin(), m_jobQueue.end(), mem_fun_ref(&CJobPointer::FreeJob));
130 m_jobQueue.clear();
131 m_processing.clear();
134 CJobManager &CJobManager::GetInstance()
136 static CJobManager sJobManager;
137 return sJobManager;
140 CJobManager::CJobManager()
142 m_jobCounter = 0;
143 m_running = true;
146 void CJobManager::CancelJobs()
148 CSingleLock lock(m_section);
149 m_running = false;
151 // clear any pending jobs
152 for (unsigned int priority = CJob::PRIORITY_LOW; priority <= CJob::PRIORITY_HIGH; ++priority)
154 for_each(m_jobQueue[priority].begin(), m_jobQueue[priority].end(), mem_fun_ref(&CWorkItem::FreeJob));
155 m_jobQueue[priority].clear();
158 // cancel any callbacks on jobs still processing
159 for_each(m_processing.begin(), m_processing.end(), mem_fun_ref(&CWorkItem::Cancel));
161 // tell our workers to finish
162 while (m_workers.size())
164 lock.Leave();
165 m_jobEvent.Set();
166 Sleep(0); // yield after setting the event to give the workers some time to die
167 lock.Enter();
171 CJobManager::~CJobManager()
175 unsigned int CJobManager::AddJob(CJob *job, IJobCallback *callback, CJob::PRIORITY priority)
177 CSingleLock lock(m_section);
179 // create a work item for this job
180 CWorkItem work(job, m_jobCounter++, callback);
181 m_jobQueue[priority].push_back(work);
183 StartWorkers(priority);
184 return work.m_id;
187 void CJobManager::CancelJob(unsigned int jobID)
189 CSingleLock lock(m_section);
191 // check whether we have this job in the queue
192 for (unsigned int priority = CJob::PRIORITY_LOW; priority <= CJob::PRIORITY_HIGH; ++priority)
194 JobQueue::iterator i = find(m_jobQueue[priority].begin(), m_jobQueue[priority].end(), jobID);
195 if (i != m_jobQueue[priority].end())
197 delete i->m_job;
198 m_jobQueue[priority].erase(i);
199 return;
202 // or if we're processing it
203 Processing::iterator it = find(m_processing.begin(), m_processing.end(), jobID);
204 if (it != m_processing.end())
205 it->m_callback = NULL; // job is in progress, so only thing to do is to remove callback
208 void CJobManager::StartWorkers(CJob::PRIORITY priority)
210 CSingleLock lock(m_section);
212 // check how many free threads we have
213 if (m_processing.size() >= GetMaxWorkers(priority))
214 return;
216 // do we have any sleeping threads?
217 if (m_processing.size() < m_workers.size())
219 m_jobEvent.Set();
220 return;
223 // everyone is busy - we need more workers
224 m_workers.push_back(new CJobWorker(this));
227 CJob *CJobManager::PopJob()
229 CSingleLock lock(m_section);
230 for (int priority = CJob::PRIORITY_HIGH; priority >= CJob::PRIORITY_LOW; --priority)
232 if (m_jobQueue[priority].size() && m_processing.size() < GetMaxWorkers(CJob::PRIORITY(priority)))
234 CWorkItem job = m_jobQueue[priority].front();
235 m_jobQueue[priority].pop_front();
236 // add to the processing vector
237 m_processing.push_back(job);
238 job.m_job->m_callback = this;
239 return job.m_job;
242 return NULL;
245 CJob *CJobManager::GetNextJob(const CJobWorker *worker)
247 CSingleLock lock(m_section);
248 while (m_running)
250 // grab a job off the queue if we have one
251 CJob *job = PopJob();
252 if (job)
253 return job;
254 // no jobs are left - sleep for 30 seconds to allow new jobs to come in
255 lock.Leave();
256 bool newJob = m_jobEvent.WaitMSec(30000);
257 lock.Enter();
258 if (!newJob)
259 break;
261 // ensure no jobs have come in during the period after
262 // timeout and before we held the lock
263 CJob *job = PopJob();
264 if (job)
265 return job;
266 // have no jobs
267 RemoveWorker(worker);
268 return NULL;
271 bool CJobManager::OnJobProgress(unsigned int progress, unsigned int total, const CJob *job) const
273 CSingleLock lock(m_section);
274 // find the job in the processing queue, and check whether it's cancelled (no callback)
275 Processing::const_iterator i = find(m_processing.begin(), m_processing.end(), job);
276 if (i != m_processing.end())
278 CWorkItem item(*i);
279 lock.Leave(); // leave section prior to call
280 if (item.m_callback)
282 item.m_callback->OnJobProgress(item.m_id, progress, total, job);
283 return false;
286 return true; // couldn't find the job, or it's been cancelled
289 void CJobManager::OnJobComplete(bool success, CJob *job)
291 CSingleLock lock(m_section);
292 // remove the job from the processing queue
293 Processing::iterator i = find(m_processing.begin(), m_processing.end(), job);
294 if (i != m_processing.end())
296 // tell any listeners we're done with the job, then delete it
297 CWorkItem item(*i);
298 m_processing.erase(i);
299 lock.Leave();
300 if (item.m_callback)
301 item.m_callback->OnJobComplete(item.m_id, success, item.m_job);
302 item.FreeJob();
306 void CJobManager::RemoveWorker(const CJobWorker *worker)
308 CSingleLock lock(m_section);
309 // remove our worker
310 Workers::iterator i = find(m_workers.begin(), m_workers.end(), worker);
311 if (i != m_workers.end())
312 m_workers.erase(i); // workers auto-delete
315 unsigned int CJobManager::GetMaxWorkers(CJob::PRIORITY priority) const
317 static const unsigned int max_workers = 5;
318 return max_workers - (CJob::PRIORITY_HIGH - priority);