2 * Copyright (C) 2005-2008 Team XBMC
5 * This Program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2, or (at your option)
10 * This Program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with XBMC; see the file COPYING. If not, write to
17 * the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
18 * http://www.gnu.org/copyleft/gpl.html
22 #include "JobManager.h"
24 #include "SingleLock.h"
28 bool CJob::ShouldCancel(unsigned int progress
, unsigned int total
) const
31 return m_callback
->OnJobProgress(progress
, total
, this);
35 CJobWorker::CJobWorker(CJobManager
*manager
)
37 m_jobManager
= manager
;
38 Create(true); // start work immediately, and kill ourselves when we're done
41 CJobWorker::~CJobWorker()
43 // while we should already be removed from the job manager, if an exception
44 // occurs during processing, we may skip over that step. Thus, before we
45 // go out of scope, ensure the job manager knows we're gone.
46 m_jobManager
->RemoveWorker(this);
51 void CJobWorker::Process()
53 SetPriority( GetMinPriority() );
57 // request an item from our manager (this call is blocking)
58 CJob
*job
= m_jobManager
->GetNextJob(this);
62 // we have a job to do
63 bool success
= job
->DoWork();
64 m_jobManager
->OnJobComplete(success
, job
);
68 void CJobQueue::CJobPointer::CancelJob()
70 CJobManager::GetInstance().CancelJob(m_id
);
74 CJobQueue::CJobQueue(bool lifo
, unsigned int jobsAtOnce
, CJob::PRIORITY priority
)
75 : m_jobsAtOnce(jobsAtOnce
), m_priority(priority
), m_lifo(lifo
)
79 CJobQueue::~CJobQueue()
84 void CJobQueue::OnJobComplete(unsigned int jobID
, bool success
, CJob
*job
)
86 CSingleLock
lock(m_section
);
87 // check if this job is in our processing list
88 Processing::iterator i
= find(m_processing
.begin(), m_processing
.end(), job
);
89 if (i
!= m_processing
.end())
90 m_processing
.erase(i
);
91 // request a new job be queued
95 void CJobQueue::AddJob(CJob
*job
)
97 CSingleLock
lock(m_section
);
98 // check if we have this job already. If so, we're done.
99 if (find(m_jobQueue
.begin(), m_jobQueue
.end(), job
) != m_jobQueue
.end() ||
100 find(m_processing
.begin(), m_processing
.end(), job
) != m_processing
.end())
107 m_jobQueue
.push_back(CJobPointer(job
));
109 m_jobQueue
.push_front(CJobPointer(job
));
113 void CJobQueue::QueueNextJob()
115 CSingleLock
lock(m_section
);
116 if (m_jobQueue
.size() && m_processing
.size() < m_jobsAtOnce
)
118 CJobPointer
&job
= m_jobQueue
.back();
119 job
.m_id
= CJobManager::GetInstance().AddJob(job
.m_job
, this, m_priority
);
120 m_processing
.push_back(job
);
121 m_jobQueue
.pop_back();
125 void CJobQueue::CancelJobs()
127 CSingleLock
lock(m_section
);
128 for_each(m_processing
.begin(), m_processing
.end(), mem_fun_ref(&CJobPointer::CancelJob
));
129 for_each(m_jobQueue
.begin(), m_jobQueue
.end(), mem_fun_ref(&CJobPointer::FreeJob
));
131 m_processing
.clear();
134 CJobManager
&CJobManager::GetInstance()
136 static CJobManager sJobManager
;
140 CJobManager::CJobManager()
146 void CJobManager::CancelJobs()
148 CSingleLock
lock(m_section
);
151 // clear any pending jobs
152 for (unsigned int priority
= CJob::PRIORITY_LOW
; priority
<= CJob::PRIORITY_HIGH
; ++priority
)
154 for_each(m_jobQueue
[priority
].begin(), m_jobQueue
[priority
].end(), mem_fun_ref(&CWorkItem::FreeJob
));
155 m_jobQueue
[priority
].clear();
158 // cancel any callbacks on jobs still processing
159 for_each(m_processing
.begin(), m_processing
.end(), mem_fun_ref(&CWorkItem::Cancel
));
161 // tell our workers to finish
162 while (m_workers
.size())
166 Sleep(0); // yield after setting the event to give the workers some time to die
171 CJobManager::~CJobManager()
175 unsigned int CJobManager::AddJob(CJob
*job
, IJobCallback
*callback
, CJob::PRIORITY priority
)
177 CSingleLock
lock(m_section
);
179 // create a work item for this job
180 CWorkItem
work(job
, m_jobCounter
++, callback
);
181 m_jobQueue
[priority
].push_back(work
);
183 StartWorkers(priority
);
187 void CJobManager::CancelJob(unsigned int jobID
)
189 CSingleLock
lock(m_section
);
191 // check whether we have this job in the queue
192 for (unsigned int priority
= CJob::PRIORITY_LOW
; priority
<= CJob::PRIORITY_HIGH
; ++priority
)
194 JobQueue::iterator i
= find(m_jobQueue
[priority
].begin(), m_jobQueue
[priority
].end(), jobID
);
195 if (i
!= m_jobQueue
[priority
].end())
198 m_jobQueue
[priority
].erase(i
);
202 // or if we're processing it
203 Processing::iterator it
= find(m_processing
.begin(), m_processing
.end(), jobID
);
204 if (it
!= m_processing
.end())
205 it
->m_callback
= NULL
; // job is in progress, so only thing to do is to remove callback
208 void CJobManager::StartWorkers(CJob::PRIORITY priority
)
210 CSingleLock
lock(m_section
);
212 // check how many free threads we have
213 if (m_processing
.size() >= GetMaxWorkers(priority
))
216 // do we have any sleeping threads?
217 if (m_processing
.size() < m_workers
.size())
223 // everyone is busy - we need more workers
224 m_workers
.push_back(new CJobWorker(this));
227 CJob
*CJobManager::PopJob()
229 CSingleLock
lock(m_section
);
230 for (int priority
= CJob::PRIORITY_HIGH
; priority
>= CJob::PRIORITY_LOW
; --priority
)
232 if (m_jobQueue
[priority
].size() && m_processing
.size() < GetMaxWorkers(CJob::PRIORITY(priority
)))
234 CWorkItem job
= m_jobQueue
[priority
].front();
235 m_jobQueue
[priority
].pop_front();
236 // add to the processing vector
237 m_processing
.push_back(job
);
238 job
.m_job
->m_callback
= this;
245 CJob
*CJobManager::GetNextJob(const CJobWorker
*worker
)
247 CSingleLock
lock(m_section
);
250 // grab a job off the queue if we have one
251 CJob
*job
= PopJob();
254 // no jobs are left - sleep for 30 seconds to allow new jobs to come in
256 bool newJob
= m_jobEvent
.WaitMSec(30000);
261 // ensure no jobs have come in during the period after
262 // timeout and before we held the lock
263 CJob
*job
= PopJob();
267 RemoveWorker(worker
);
271 bool CJobManager::OnJobProgress(unsigned int progress
, unsigned int total
, const CJob
*job
) const
273 CSingleLock
lock(m_section
);
274 // find the job in the processing queue, and check whether it's cancelled (no callback)
275 Processing::const_iterator i
= find(m_processing
.begin(), m_processing
.end(), job
);
276 if (i
!= m_processing
.end())
279 lock
.Leave(); // leave section prior to call
282 item
.m_callback
->OnJobProgress(item
.m_id
, progress
, total
, job
);
286 return true; // couldn't find the job, or it's been cancelled
289 void CJobManager::OnJobComplete(bool success
, CJob
*job
)
291 CSingleLock
lock(m_section
);
292 // remove the job from the processing queue
293 Processing::iterator i
= find(m_processing
.begin(), m_processing
.end(), job
);
294 if (i
!= m_processing
.end())
296 // tell any listeners we're done with the job, then delete it
298 m_processing
.erase(i
);
301 item
.m_callback
->OnJobComplete(item
.m_id
, success
, item
.m_job
);
306 void CJobManager::RemoveWorker(const CJobWorker
*worker
)
308 CSingleLock
lock(m_section
);
310 Workers::iterator i
= find(m_workers
.begin(), m_workers
.end(), worker
);
311 if (i
!= m_workers
.end())
312 m_workers
.erase(i
); // workers auto-delete
315 unsigned int CJobManager::GetMaxWorkers(CJob::PRIORITY priority
) const
317 static const unsigned int max_workers
= 5;
318 return max_workers
- (CJob::PRIORITY_HIGH
- priority
);