Merge pull request #26166 from ksooo/improve-plugin-ctx-menus
[xbmc.git] / xbmc / utils / JobManager.cpp
blob55d35e985be1ca4b2c1332b81ca82d1bb8bf0cc7
1 /*
2 * Copyright (C) 2005-2018 Team Kodi
3 * This file is part of Kodi - https://kodi.tv
5 * SPDX-License-Identifier: GPL-2.0-or-later
6 * See LICENSES/README.md for more information.
7 */
9 #include "JobManager.h"
11 #include "ServiceBroker.h"
12 #include "utils/XTimeUtils.h"
13 #include "utils/log.h"
15 #include <algorithm>
16 #include <functional>
17 #include <mutex>
18 #include <stdexcept>
20 using namespace std::chrono_literals;
22 bool CJob::ShouldCancel(unsigned int progress, unsigned int total) const
24 if (m_callback)
25 return m_callback->OnJobProgress(progress, total, this);
26 return false;
29 CJobWorker::CJobWorker(CJobManager *manager) : CThread("JobWorker")
31 m_jobManager = manager;
32 Create(true); // start work immediately, and kill ourselves when we're done
35 CJobWorker::~CJobWorker()
37 m_jobManager->RemoveWorker(this);
38 if(!IsAutoDelete())
39 StopThread();
42 void CJobWorker::Process()
44 SetPriority(ThreadPriority::LOWEST);
45 while (true)
47 // request an item from our manager (this call is blocking)
48 CJob* job = m_jobManager->GetNextJob();
49 if (!job)
50 break;
52 bool success = false;
53 try
55 success = job->DoWork();
57 catch (...)
59 CLog::Log(LOGERROR, "{} error processing job {}", __FUNCTION__, job->GetType());
61 m_jobManager->OnJobComplete(success, job);
65 void CJobQueue::CJobPointer::CancelJob()
67 CServiceBroker::GetJobManager()->CancelJob(m_id);
68 m_id = 0;
71 CJobQueue::CJobQueue(bool lifo, unsigned int jobsAtOnce, CJob::PRIORITY priority)
72 : m_jobsAtOnce(jobsAtOnce), m_priority(priority), m_lifo(lifo)
76 CJobQueue::~CJobQueue()
78 CancelJobs();
81 void CJobQueue::OnJobComplete(unsigned int jobID, bool success, CJob *job)
83 OnJobNotify(job);
86 void CJobQueue::OnJobAbort(unsigned int jobID, CJob* job)
88 OnJobNotify(job);
91 void CJobQueue::CancelJob(const CJob *job)
93 std::unique_lock<CCriticalSection> lock(m_section);
94 Processing::iterator i = find(m_processing.begin(), m_processing.end(), job);
95 if (i != m_processing.end())
97 i->CancelJob();
98 m_processing.erase(i);
99 return;
101 Queue::iterator j = find(m_jobQueue.begin(), m_jobQueue.end(), job);
102 if (j != m_jobQueue.end())
104 j->FreeJob();
105 m_jobQueue.erase(j);
109 bool CJobQueue::AddJob(CJob *job)
111 std::unique_lock<CCriticalSection> lock(m_section);
112 // check if we have this job already. If so, we're done.
113 if (find(m_jobQueue.begin(), m_jobQueue.end(), job) != m_jobQueue.end() ||
114 find(m_processing.begin(), m_processing.end(), job) != m_processing.end())
116 delete job;
117 return false;
120 if (m_lifo)
121 m_jobQueue.emplace_back(job);
122 else
123 m_jobQueue.emplace_front(job);
124 QueueNextJob();
126 return true;
129 void CJobQueue::OnJobNotify(CJob* job)
131 std::unique_lock<CCriticalSection> lock(m_section);
133 // check if this job is in our processing list
134 const auto it = std::find(m_processing.begin(), m_processing.end(), job);
135 if (it != m_processing.end())
136 m_processing.erase(it);
137 // request a new job be queued
138 QueueNextJob();
141 void CJobQueue::QueueNextJob()
143 std::unique_lock<CCriticalSection> lock(m_section);
144 while (m_jobQueue.size() && m_processing.size() < m_jobsAtOnce)
146 CJobPointer &job = m_jobQueue.back();
147 job.m_id = CServiceBroker::GetJobManager()->AddJob(job.m_job, this, m_priority);
148 if (job.m_id > 0)
150 m_processing.emplace_back(job);
151 m_jobQueue.pop_back();
152 return;
154 m_jobQueue.pop_back();
158 void CJobQueue::CancelJobs()
160 std::unique_lock<CCriticalSection> lock(m_section);
161 for_each(m_processing.begin(), m_processing.end(), [](CJobPointer& jp) { jp.CancelJob(); });
162 for_each(m_jobQueue.begin(), m_jobQueue.end(), [](CJobPointer& jp) { jp.FreeJob(); });
163 m_jobQueue.clear();
164 m_processing.clear();
167 bool CJobQueue::IsProcessing() const
169 return CServiceBroker::GetJobManager()->m_running &&
170 (!m_processing.empty() || !m_jobQueue.empty());
173 bool CJobQueue::QueueEmpty() const
175 std::unique_lock<CCriticalSection> lock(m_section);
176 return m_jobQueue.empty();
179 CJobManager::CJobManager()
181 m_jobCounter = 0;
182 m_running = true;
183 m_pauseJobs = false;
186 void CJobManager::Restart()
188 std::unique_lock<CCriticalSection> lock(m_section);
190 if (m_running)
191 throw std::logic_error("CJobManager already running");
192 m_running = true;
195 void CJobManager::CancelJobs()
197 std::unique_lock<CCriticalSection> lock(m_section);
198 m_running = false;
200 // clear any pending jobs
201 for (unsigned int priority = CJob::PRIORITY_LOW_PAUSABLE; priority <= CJob::PRIORITY_DEDICATED; ++priority)
203 std::for_each(m_jobQueue[priority].begin(), m_jobQueue[priority].end(), [](CWorkItem& wi) {
204 if (wi.m_callback)
205 wi.m_callback->OnJobAbort(wi.m_id, wi.m_job);
206 wi.FreeJob();
208 m_jobQueue[priority].clear();
211 // cancel any callbacks on jobs still processing
212 std::for_each(m_processing.begin(), m_processing.end(), [](CWorkItem& wi) {
213 if (wi.m_callback)
214 wi.m_callback->OnJobAbort(wi.m_id, wi.m_job);
215 wi.Cancel();
218 // tell our workers to finish
219 while (m_workers.size())
221 lock.unlock();
222 m_jobEvent.Set();
223 std::this_thread::yield(); // yield after setting the event to give the workers some time to die
224 lock.lock();
228 unsigned int CJobManager::AddJob(CJob *job, IJobCallback *callback, CJob::PRIORITY priority)
230 std::unique_lock<CCriticalSection> lock(m_section);
232 if (!m_running)
234 delete job;
235 return 0;
238 // increment the job counter, ensuring 0 (invalid job) is never hit
239 m_jobCounter++;
240 if (m_jobCounter == 0)
241 m_jobCounter++;
243 // create a work item for this job
244 CWorkItem work(job, m_jobCounter, priority, callback);
245 m_jobQueue[priority].push_back(work);
247 StartWorkers(priority);
248 return work.m_id;
251 void CJobManager::CancelJob(unsigned int jobID)
253 std::unique_lock<CCriticalSection> lock(m_section);
255 // check whether we have this job in the queue
256 for (unsigned int priority = CJob::PRIORITY_LOW_PAUSABLE; priority <= CJob::PRIORITY_DEDICATED; ++priority)
258 JobQueue::iterator i = find(m_jobQueue[priority].begin(), m_jobQueue[priority].end(), jobID);
259 if (i != m_jobQueue[priority].end())
261 delete i->m_job;
262 m_jobQueue[priority].erase(i);
263 return;
266 // or if we're processing it
267 Processing::iterator it = find(m_processing.begin(), m_processing.end(), jobID);
268 if (it != m_processing.end())
269 it->m_callback = NULL; // job is in progress, so only thing to do is to remove callback
272 void CJobManager::StartWorkers(CJob::PRIORITY priority)
274 std::unique_lock<CCriticalSection> lock(m_section);
276 // check how many free threads we have
277 if (m_processing.size() >= GetMaxWorkers(priority))
278 return;
280 // do we have any sleeping threads?
281 if (m_processing.size() < m_workers.size())
283 m_jobEvent.Set();
284 return;
287 // everyone is busy - we need more workers
288 m_workers.push_back(new CJobWorker(this));
291 CJob *CJobManager::PopJob()
293 std::unique_lock<CCriticalSection> lock(m_section);
294 for (int priority = CJob::PRIORITY_DEDICATED; priority >= CJob::PRIORITY_LOW_PAUSABLE; --priority)
296 // Check whether we're pausing pausable jobs
297 if (priority == CJob::PRIORITY_LOW_PAUSABLE && m_pauseJobs)
298 continue;
300 if (m_jobQueue[priority].size() && m_processing.size() < GetMaxWorkers(CJob::PRIORITY(priority)))
302 // pop the job off the queue
303 CWorkItem job = m_jobQueue[priority].front();
304 m_jobQueue[priority].pop_front();
306 // add to the processing vector
307 m_processing.push_back(job);
308 job.m_job->m_callback = this;
309 return job.m_job;
312 return NULL;
315 void CJobManager::PauseJobs()
317 std::unique_lock<CCriticalSection> lock(m_section);
318 m_pauseJobs = true;
321 void CJobManager::UnPauseJobs()
323 std::unique_lock<CCriticalSection> lock(m_section);
324 m_pauseJobs = false;
327 bool CJobManager::IsProcessing(const CJob::PRIORITY &priority) const
329 std::unique_lock<CCriticalSection> lock(m_section);
331 if (m_pauseJobs)
332 return false;
334 for(Processing::const_iterator it = m_processing.begin(); it < m_processing.end(); ++it)
336 if (priority == it->m_priority)
337 return true;
339 return false;
342 int CJobManager::IsProcessing(const std::string &type) const
344 int jobsMatched = 0;
345 std::unique_lock<CCriticalSection> lock(m_section);
347 if (m_pauseJobs)
348 return 0;
350 for(Processing::const_iterator it = m_processing.begin(); it < m_processing.end(); ++it)
352 if (type == std::string(it->m_job->GetType()))
353 jobsMatched++;
355 return jobsMatched;
358 CJob* CJobManager::GetNextJob()
360 std::unique_lock<CCriticalSection> lock(m_section);
361 while (m_running)
363 // grab a job off the queue if we have one
364 CJob *job = PopJob();
365 if (job)
366 return job;
367 // no jobs are left - sleep for 30 seconds to allow new jobs to come in
368 lock.unlock();
369 bool newJob = m_jobEvent.Wait(30000ms);
370 lock.lock();
371 if (!newJob)
372 break;
374 // ensure no jobs have come in during the period after
375 // timeout and before we held the lock
376 return PopJob();
379 bool CJobManager::OnJobProgress(unsigned int progress, unsigned int total, const CJob *job) const
381 std::unique_lock<CCriticalSection> lock(m_section);
382 // find the job in the processing queue, and check whether it's cancelled (no callback)
383 Processing::const_iterator i = find(m_processing.begin(), m_processing.end(), job);
384 if (i != m_processing.end())
386 CWorkItem item(*i);
387 lock.unlock(); // leave section prior to call
388 if (item.m_callback)
390 item.m_callback->OnJobProgress(item.m_id, progress, total, job);
391 return false;
394 return true; // couldn't find the job, or it's been cancelled
397 void CJobManager::OnJobComplete(bool success, CJob *job)
399 std::unique_lock<CCriticalSection> lock(m_section);
400 // remove the job from the processing queue
401 Processing::iterator i = find(m_processing.begin(), m_processing.end(), job);
402 if (i != m_processing.end())
404 // tell any listeners we're done with the job, then delete it
405 CWorkItem item(*i);
406 lock.unlock();
409 if (item.m_callback)
410 item.m_callback->OnJobComplete(item.m_id, success, item.m_job);
412 catch (...)
414 CLog::Log(LOGERROR, "{} error processing job {}", __FUNCTION__, item.m_job->GetType());
416 lock.lock();
417 Processing::iterator j = find(m_processing.begin(), m_processing.end(), job);
418 if (j != m_processing.end())
419 m_processing.erase(j);
420 lock.unlock();
421 item.FreeJob();
425 void CJobManager::RemoveWorker(const CJobWorker *worker)
427 std::unique_lock<CCriticalSection> lock(m_section);
428 // remove our worker
429 Workers::iterator i = find(m_workers.begin(), m_workers.end(), worker);
430 if (i != m_workers.end())
431 m_workers.erase(i); // workers auto-delete
434 unsigned int CJobManager::GetMaxWorkers(CJob::PRIORITY priority)
436 static const unsigned int max_workers = 5;
437 if (priority == CJob::PRIORITY_DEDICATED)
438 return 10000; // A large number..
439 return max_workers - (CJob::PRIORITY_HIGH - priority);