2 * Copyright (C) 2005-2018 Team Kodi
3 * This file is part of Kodi - https://kodi.tv
5 * SPDX-License-Identifier: GPL-2.0-or-later
6 * See LICENSES/README.md for more information.
9 #include "JobManager.h"
11 #include "ServiceBroker.h"
12 #include "utils/XTimeUtils.h"
13 #include "utils/log.h"
20 using namespace std::chrono_literals
;
22 bool CJob::ShouldCancel(unsigned int progress
, unsigned int total
) const
25 return m_callback
->OnJobProgress(progress
, total
, this);
29 CJobWorker::CJobWorker(CJobManager
*manager
) : CThread("JobWorker")
31 m_jobManager
= manager
;
32 Create(true); // start work immediately, and kill ourselves when we're done
35 CJobWorker::~CJobWorker()
37 m_jobManager
->RemoveWorker(this);
42 void CJobWorker::Process()
44 SetPriority(ThreadPriority::LOWEST
);
47 // request an item from our manager (this call is blocking)
48 CJob
* job
= m_jobManager
->GetNextJob();
55 success
= job
->DoWork();
59 CLog::Log(LOGERROR
, "{} error processing job {}", __FUNCTION__
, job
->GetType());
61 m_jobManager
->OnJobComplete(success
, job
);
65 void CJobQueue::CJobPointer::CancelJob()
67 CServiceBroker::GetJobManager()->CancelJob(m_id
);
71 CJobQueue::CJobQueue(bool lifo
, unsigned int jobsAtOnce
, CJob::PRIORITY priority
)
72 : m_jobsAtOnce(jobsAtOnce
), m_priority(priority
), m_lifo(lifo
)
76 CJobQueue::~CJobQueue()
81 void CJobQueue::OnJobComplete(unsigned int jobID
, bool success
, CJob
*job
)
86 void CJobQueue::OnJobAbort(unsigned int jobID
, CJob
* job
)
91 void CJobQueue::CancelJob(const CJob
*job
)
93 std::unique_lock
<CCriticalSection
> lock(m_section
);
94 Processing::iterator i
= find(m_processing
.begin(), m_processing
.end(), job
);
95 if (i
!= m_processing
.end())
98 m_processing
.erase(i
);
101 Queue::iterator j
= find(m_jobQueue
.begin(), m_jobQueue
.end(), job
);
102 if (j
!= m_jobQueue
.end())
109 bool CJobQueue::AddJob(CJob
*job
)
111 std::unique_lock
<CCriticalSection
> lock(m_section
);
112 // check if we have this job already. If so, we're done.
113 if (find(m_jobQueue
.begin(), m_jobQueue
.end(), job
) != m_jobQueue
.end() ||
114 find(m_processing
.begin(), m_processing
.end(), job
) != m_processing
.end())
121 m_jobQueue
.emplace_back(job
);
123 m_jobQueue
.emplace_front(job
);
129 void CJobQueue::OnJobNotify(CJob
* job
)
131 std::unique_lock
<CCriticalSection
> lock(m_section
);
133 // check if this job is in our processing list
134 const auto it
= std::find(m_processing
.begin(), m_processing
.end(), job
);
135 if (it
!= m_processing
.end())
136 m_processing
.erase(it
);
137 // request a new job be queued
141 void CJobQueue::QueueNextJob()
143 std::unique_lock
<CCriticalSection
> lock(m_section
);
144 while (m_jobQueue
.size() && m_processing
.size() < m_jobsAtOnce
)
146 CJobPointer
&job
= m_jobQueue
.back();
147 job
.m_id
= CServiceBroker::GetJobManager()->AddJob(job
.m_job
, this, m_priority
);
150 m_processing
.emplace_back(job
);
151 m_jobQueue
.pop_back();
154 m_jobQueue
.pop_back();
158 void CJobQueue::CancelJobs()
160 std::unique_lock
<CCriticalSection
> lock(m_section
);
161 for_each(m_processing
.begin(), m_processing
.end(), [](CJobPointer
& jp
) { jp
.CancelJob(); });
162 for_each(m_jobQueue
.begin(), m_jobQueue
.end(), [](CJobPointer
& jp
) { jp
.FreeJob(); });
164 m_processing
.clear();
167 bool CJobQueue::IsProcessing() const
169 return CServiceBroker::GetJobManager()->m_running
&&
170 (!m_processing
.empty() || !m_jobQueue
.empty());
173 bool CJobQueue::QueueEmpty() const
175 std::unique_lock
<CCriticalSection
> lock(m_section
);
176 return m_jobQueue
.empty();
179 CJobManager::CJobManager()
186 void CJobManager::Restart()
188 std::unique_lock
<CCriticalSection
> lock(m_section
);
191 throw std::logic_error("CJobManager already running");
195 void CJobManager::CancelJobs()
197 std::unique_lock
<CCriticalSection
> lock(m_section
);
200 // clear any pending jobs
201 for (unsigned int priority
= CJob::PRIORITY_LOW_PAUSABLE
; priority
<= CJob::PRIORITY_DEDICATED
; ++priority
)
203 std::for_each(m_jobQueue
[priority
].begin(), m_jobQueue
[priority
].end(), [](CWorkItem
& wi
) {
205 wi
.m_callback
->OnJobAbort(wi
.m_id
, wi
.m_job
);
208 m_jobQueue
[priority
].clear();
211 // cancel any callbacks on jobs still processing
212 std::for_each(m_processing
.begin(), m_processing
.end(), [](CWorkItem
& wi
) {
214 wi
.m_callback
->OnJobAbort(wi
.m_id
, wi
.m_job
);
218 // tell our workers to finish
219 while (m_workers
.size())
223 std::this_thread::yield(); // yield after setting the event to give the workers some time to die
228 unsigned int CJobManager::AddJob(CJob
*job
, IJobCallback
*callback
, CJob::PRIORITY priority
)
230 std::unique_lock
<CCriticalSection
> lock(m_section
);
238 // increment the job counter, ensuring 0 (invalid job) is never hit
240 if (m_jobCounter
== 0)
243 // create a work item for this job
244 CWorkItem
work(job
, m_jobCounter
, priority
, callback
);
245 m_jobQueue
[priority
].push_back(work
);
247 StartWorkers(priority
);
251 void CJobManager::CancelJob(unsigned int jobID
)
253 std::unique_lock
<CCriticalSection
> lock(m_section
);
255 // check whether we have this job in the queue
256 for (unsigned int priority
= CJob::PRIORITY_LOW_PAUSABLE
; priority
<= CJob::PRIORITY_DEDICATED
; ++priority
)
258 JobQueue::iterator i
= find(m_jobQueue
[priority
].begin(), m_jobQueue
[priority
].end(), jobID
);
259 if (i
!= m_jobQueue
[priority
].end())
262 m_jobQueue
[priority
].erase(i
);
266 // or if we're processing it
267 Processing::iterator it
= find(m_processing
.begin(), m_processing
.end(), jobID
);
268 if (it
!= m_processing
.end())
269 it
->m_callback
= NULL
; // job is in progress, so only thing to do is to remove callback
272 void CJobManager::StartWorkers(CJob::PRIORITY priority
)
274 std::unique_lock
<CCriticalSection
> lock(m_section
);
276 // check how many free threads we have
277 if (m_processing
.size() >= GetMaxWorkers(priority
))
280 // do we have any sleeping threads?
281 if (m_processing
.size() < m_workers
.size())
287 // everyone is busy - we need more workers
288 m_workers
.push_back(new CJobWorker(this));
291 CJob
*CJobManager::PopJob()
293 std::unique_lock
<CCriticalSection
> lock(m_section
);
294 for (int priority
= CJob::PRIORITY_DEDICATED
; priority
>= CJob::PRIORITY_LOW_PAUSABLE
; --priority
)
296 // Check whether we're pausing pausable jobs
297 if (priority
== CJob::PRIORITY_LOW_PAUSABLE
&& m_pauseJobs
)
300 if (m_jobQueue
[priority
].size() && m_processing
.size() < GetMaxWorkers(CJob::PRIORITY(priority
)))
302 // pop the job off the queue
303 CWorkItem job
= m_jobQueue
[priority
].front();
304 m_jobQueue
[priority
].pop_front();
306 // add to the processing vector
307 m_processing
.push_back(job
);
308 job
.m_job
->m_callback
= this;
315 void CJobManager::PauseJobs()
317 std::unique_lock
<CCriticalSection
> lock(m_section
);
321 void CJobManager::UnPauseJobs()
323 std::unique_lock
<CCriticalSection
> lock(m_section
);
327 bool CJobManager::IsProcessing(const CJob::PRIORITY
&priority
) const
329 std::unique_lock
<CCriticalSection
> lock(m_section
);
334 for(Processing::const_iterator it
= m_processing
.begin(); it
< m_processing
.end(); ++it
)
336 if (priority
== it
->m_priority
)
342 int CJobManager::IsProcessing(const std::string
&type
) const
345 std::unique_lock
<CCriticalSection
> lock(m_section
);
350 for(Processing::const_iterator it
= m_processing
.begin(); it
< m_processing
.end(); ++it
)
352 if (type
== std::string(it
->m_job
->GetType()))
358 CJob
* CJobManager::GetNextJob()
360 std::unique_lock
<CCriticalSection
> lock(m_section
);
363 // grab a job off the queue if we have one
364 CJob
*job
= PopJob();
367 // no jobs are left - sleep for 30 seconds to allow new jobs to come in
369 bool newJob
= m_jobEvent
.Wait(30000ms
);
374 // ensure no jobs have come in during the period after
375 // timeout and before we held the lock
379 bool CJobManager::OnJobProgress(unsigned int progress
, unsigned int total
, const CJob
*job
) const
381 std::unique_lock
<CCriticalSection
> lock(m_section
);
382 // find the job in the processing queue, and check whether it's cancelled (no callback)
383 Processing::const_iterator i
= find(m_processing
.begin(), m_processing
.end(), job
);
384 if (i
!= m_processing
.end())
387 lock
.unlock(); // leave section prior to call
390 item
.m_callback
->OnJobProgress(item
.m_id
, progress
, total
, job
);
394 return true; // couldn't find the job, or it's been cancelled
397 void CJobManager::OnJobComplete(bool success
, CJob
*job
)
399 std::unique_lock
<CCriticalSection
> lock(m_section
);
400 // remove the job from the processing queue
401 Processing::iterator i
= find(m_processing
.begin(), m_processing
.end(), job
);
402 if (i
!= m_processing
.end())
404 // tell any listeners we're done with the job, then delete it
410 item
.m_callback
->OnJobComplete(item
.m_id
, success
, item
.m_job
);
414 CLog::Log(LOGERROR
, "{} error processing job {}", __FUNCTION__
, item
.m_job
->GetType());
417 Processing::iterator j
= find(m_processing
.begin(), m_processing
.end(), job
);
418 if (j
!= m_processing
.end())
419 m_processing
.erase(j
);
425 void CJobManager::RemoveWorker(const CJobWorker
*worker
)
427 std::unique_lock
<CCriticalSection
> lock(m_section
);
429 Workers::iterator i
= find(m_workers
.begin(), m_workers
.end(), worker
);
430 if (i
!= m_workers
.end())
431 m_workers
.erase(i
); // workers auto-delete
434 unsigned int CJobManager::GetMaxWorkers(CJob::PRIORITY priority
)
436 static const unsigned int max_workers
= 5;
437 if (priority
== CJob::PRIORITY_DEDICATED
)
438 return 10000; // A large number..
439 return max_workers
- (CJob::PRIORITY_HIGH
- priority
);