Localisation updates from https://translatewiki.net.
[mediawiki.git] / includes / jobqueue / JobQueue.php
blob3f128851250ad5f0ca47558d594b2be07588d371
1 <?php
2 /**
3 * This program is free software; you can redistribute it and/or modify
4 * it under the terms of the GNU General Public License as published by
5 * the Free Software Foundation; either version 2 of the License, or
6 * (at your option) any later version.
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
13 * You should have received a copy of the GNU General Public License along
14 * with this program; if not, write to the Free Software Foundation, Inc.,
15 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
16 * http://www.gnu.org/copyleft/gpl.html
18 * @file
21 use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
22 use MediaWiki\JobQueue\JobFactory;
23 use MediaWiki\MediaWikiServices;
24 use Wikimedia\ObjectCache\WANObjectCache;
25 use Wikimedia\RequestTimeout\TimeoutException;
26 use Wikimedia\Stats\NullStatsdDataFactory;
27 use Wikimedia\UUID\GlobalIdGenerator;
29 /**
30 * @defgroup JobQueue JobQueue
33 * See [the architecture doc](@ref jobqueuearch) for more information.
36 /**
37 * Base class for queueing and running background jobs from a storage backend.
39 * See [the architecture doc](@ref jobqueuearch) for more information.
41 * @ingroup JobQueue
42 * @since 1.21
43 * @stable to extend
45 abstract class JobQueue {
/** @var string DB domain ID */
protected $domain;

/** @var string Job type */
protected $type;

/** @var string Job priority for pop() */
protected $order;

/** @var int Time to live in seconds */
protected $claimTTL;

/** @var int Maximum number of times to try a job */
protected $maxTries;

/** @var string|false Read only rationale (or false if r/w) */
protected $readOnlyReason;

/** @var StatsdDataFactoryInterface */
protected $stats;

/** @var GlobalIdGenerator */
protected $idGenerator;

/** @var WANObjectCache */
protected $wanCache;

/** @var bool */
protected $typeAgnostic;

/** Factory used by factoryJob() to build job instances */
private JobFactory $jobFactory;

/** Bit flag for "all-or-nothing" job insertions */
protected const QOS_ATOMIC = 1;

/** Seconds to remember root jobs (28 days) */
protected const ROOTJOB_TTL = 28 * 24 * 3600;
/**
 * Initialise the state shared by every queue backend from a configuration array.
 *
 * @stable to call
 *
 * @param array $params
 *   - type           : A job type, 'default' if typeAgnostic is set
 *   - domain         : A DB domain ID
 *   - idGenerator    : A GlobalIdGenerator instance.
 *   - wanCache       : An instance of WANObjectCache to use for caching [default: none]
 *   - stats          : An instance of StatsdDataFactoryInterface [default: none]
 *   - claimTTL       : Seconds a job can be claimed for exclusive execution [default: forever]
 *   - maxTries       : Total times a job can be tried, assuming claims expire [default: 3]
 *   - order          : Queue order, one of ("fifo", "timestamp", "random") [default: variable]
 *   - readOnlyReason : Mark the queue as read-only with this reason [default: false]
 *   - typeAgnostic   : If the jobqueue should operate agnostic to the job types
 * @throws JobQueueError If the requested order or type-agnostic mode is unsupported
 */
protected function __construct( array $params ) {
	$this->domain = $params['domain'] ?? $params['wiki']; // b/c
	$this->type = $params['type'];
	$this->claimTTL = $params['claimTTL'] ?? 0;
	$this->maxTries = $params['maxTries'] ?? 3;

	// An absent order, or the special value 'any', lets the backend choose
	if ( isset( $params['order'] ) && $params['order'] !== 'any' ) {
		$this->order = $params['order'];
	} else {
		$this->order = $this->optimalOrder();
	}
	// Strict comparison so that loosely-equal junk (e.g. 0 or true) never passes validation
	if ( !in_array( $this->order, $this->supportedOrders(), true ) ) {
		// static::class names the concrete backend; __CLASS__ would always say "JobQueue"
		throw new JobQueueError( static::class . " does not support '{$this->order}' order." );
	}

	$this->readOnlyReason = $params['readOnlyReason'] ?? false;
	$this->stats = $params['stats'] ?? new NullStatsdDataFactory();
	$this->wanCache = $params['wanCache'] ?? WANObjectCache::newEmpty();
	$this->idGenerator = $params['idGenerator'];

	if ( ( $params['typeAgnostic'] ?? false ) && !$this->supportsTypeAgnostic() ) {
		throw new JobQueueError( static::class . " does not support type agnostic queues." );
	}
	$this->typeAgnostic = ( $params['typeAgnostic'] ?? false );
	if ( $this->typeAgnostic ) {
		// A type-agnostic queue always reports the placeholder type
		$this->type = 'default';
	}

	$this->jobFactory = MediaWikiServices::getInstance()->getJobFactory();
}
/**
 * Get a job queue object of the specified type.
 *
 * $params includes:
 *   - class          : What job class to use (determines job type)
 *   - domain         : Database domain ID of the wiki the jobs are for (defaults to current wiki)
 *   - type           : The name of the job types this queue handles
 *   - order          : Order that pop() selects jobs, one of "fifo", "timestamp" or "random".
 *                      If "fifo" is used, the queue will effectively be FIFO. Note that job
 *                      completion will not appear to be exactly FIFO if there are multiple
 *                      job runners since jobs can take different times to finish once popped.
 *                      If "timestamp" is used, the queue will at least be loosely ordered
 *                      by timestamp, allowing for some jobs to be popped off out of order.
 *                      If "random" is used, pop() will pick jobs in random order.
 *                      Note that it may only be weakly random (e.g. a lottery of the oldest X).
 *                      If "any" is chosen, the queue will use whatever order is the fastest.
 *                      This might be useful for improving concurrency for job acquisition.
 *   - claimTTL       : If supported, the queue will recycle jobs that have been popped
 *                      but not acknowledged as completed after this many seconds. Recycling
 *                      of jobs simply means re-inserting them into the queue. Jobs can be
 *                      attempted up to three times before being discarded.
 *   - readOnlyReason : Set this to a string to make the queue read-only. [optional]
 *   - idGenerator    : A GlobalIdGenerator instance.
 *   - stats          : A StatsdDataFactoryInterface. [optional]
 *
 * Queue classes should throw an exception if they do not support the options given.
 *
 * @param array $params
 * @return JobQueue
 * @throws JobQueueError If the class does not exist or is not a JobQueue subclass
 */
final public static function factory( array $params ) {
	$class = $params['class'];
	if ( !class_exists( $class ) ) {
		throw new JobQueueError( "Invalid job queue class '$class'." );
	}

	// Check the hierarchy *before* instantiating, so that the constructor of an
	// unrelated (or abstract) class is never run just to be rejected afterwards.
	if ( !is_subclass_of( $class, self::class ) ) {
		throw new JobQueueError( "Class '$class' is not a " . __CLASS__ . " class." );
	}

	return new $class( $params );
}
/**
 * Get the DB domain this queue was configured with.
 *
 * @return string Database domain ID
 */
final public function getDomain() {
	return $this->domain;
}
/**
 * Get the job type of this queue.
 *
 * @return string Job type that this queue handles
 */
final public function getType() {
	return $this->type;
}
/**
 * Get the order in which this queue selects jobs.
 *
 * @return string One of (random, timestamp, fifo, undefined)
 */
final public function getOrder() {
	return $this->order;
}
/**
 * List the queue orders this backend accepts; the constructor validates
 * the configured order against this list.
 *
 * @return array Subset of (random, timestamp, fifo, undefined)
 */
abstract protected function supportedOrders();
/**
 * Pick the queue order to fall back on when the configuration
 * names none (or asks for "any").
 *
 * @return string One of (random, timestamp, fifo, undefined)
 */
abstract protected function optimalOrder();
/**
 * Tell whether this backend can hold delayed jobs (used for configuration validation).
 *
 * The base implementation reports no support.
 *
 * @stable to override
 * @return bool Whether delayed jobs are supported
 */
protected function supportsDelayedJobs() {
	// not implemented
	return false;
}
/**
 * Public accessor for supportsDelayedJobs().
 *
 * @return bool Whether delayed jobs are enabled
 * @since 1.22
 */
final public function delayedJobsEnabled() {
	return $this->supportsDelayedJobs();
}
/**
 * Get the reason this queue is read-only, if it is.
 *
 * @return string|false Read-only rationale or false if r/w
 * @since 1.27
 */
public function getReadOnlyReason() {
	return $this->readOnlyReason;
}
/**
 * Quickly check if the queue has no available (unacquired, non-delayed) jobs.
 * Queue classes should use caching if they are any slower without memcached.
 *
 * If caching is used, this might return false when there are actually no jobs.
 * If pop() is called and returns false then it should correct the cache. Also,
 * calling flushCaches() first prevents this. However, this effect is typically
 * not distinguishable from the race condition between isEmpty() and pop().
 *
 * @return bool
 * @throws JobQueueError
 */
final public function isEmpty() {
	return $this->doIsEmpty();
}
/**
 * Backend-specific implementation of isEmpty().
 *
 * @see JobQueue::isEmpty()
 * @return bool
 */
abstract protected function doIsEmpty();
/**
 * Get the number of available (unacquired, non-delayed) jobs in the queue.
 * Queue classes should use caching if they are any slower without memcached.
 *
 * If caching is used, this number might be out of date for a minute.
 *
 * @return int
 * @throws JobQueueError
 */
final public function getSize() {
	return $this->doGetSize();
}
/**
 * Backend-specific implementation of getSize().
 *
 * @see JobQueue::getSize()
 * @return int
 */
abstract protected function doGetSize();
/**
 * Get the number of acquired jobs (these are temporarily out of the queue).
 * Queue classes should use caching if they are any slower without memcached.
 *
 * If caching is used, this number might be out of date for a minute.
 *
 * @return int
 * @throws JobQueueError
 */
final public function getAcquiredCount() {
	return $this->doGetAcquiredCount();
}
/**
 * Backend-specific implementation of getAcquiredCount().
 *
 * @see JobQueue::getAcquiredCount()
 * @return int
 */
abstract protected function doGetAcquiredCount();
/**
 * Get the number of delayed jobs (these are temporarily out of the queue).
 * Queue classes should use caching if they are any slower without memcached.
 *
 * If caching is used, this number might be out of date for a minute.
 *
 * @return int
 * @throws JobQueueError
 * @since 1.22
 */
final public function getDelayedCount() {
	return $this->doGetDelayedCount();
}
/**
 * Default implementation of getDelayedCount(): reports zero delayed jobs.
 *
 * @stable to override
 * @see JobQueue::getDelayedCount()
 * @return int
 */
protected function doGetDelayedCount() {
	// not implemented
	return 0;
}
/**
 * Get the number of acquired jobs that can no longer be attempted.
 * Queue classes should use caching if they are any slower without memcached.
 *
 * If caching is used, this number might be out of date for a minute.
 *
 * @return int
 * @throws JobQueueError
 */
final public function getAbandonedCount() {
	return $this->doGetAbandonedCount();
}
/**
 * Default implementation of getAbandonedCount(): reports zero abandoned jobs.
 *
 * @stable to override
 * @see JobQueue::getAbandonedCount()
 * @return int
 */
protected function doGetAbandonedCount() {
	// not implemented
	return 0;
}
/**
 * Push one or more jobs into the queue.
 * This does not require $wgJobClasses to be set for the given job type.
 * Outside callers should use JobQueueGroup::push() instead of this function.
 *
 * A single job is wrapped in an array and handed to batchPush().
 *
 * @param IJobSpecification|IJobSpecification[] $jobs
 * @param int $flags Bitfield (supports JobQueue::QOS_ATOMIC)
 * @return void
 * @throws JobQueueError
 */
final public function push( $jobs, $flags = 0 ) {
	if ( !is_array( $jobs ) ) {
		$jobs = [ $jobs ];
	}

	$this->batchPush( $jobs, $flags );
}
/**
 * Push a batch of jobs into the queue.
 * This does not require $wgJobClasses to be set for the given job type.
 * Outside callers should use JobQueueGroup::push() instead of this function.
 *
 * Every job is validated before anything is inserted; root jobs are
 * registered for de-duplication only after the insertion call returns.
 *
 * @param IJobSpecification[] $jobs
 * @param int $flags Bitfield (supports JobQueue::QOS_ATOMIC)
 * @return void
 * @throws JobQueueError
 */
final public function batchPush( array $jobs, $flags = 0 ) {
	$this->assertNotReadOnly();

	if ( count( $jobs ) === 0 ) {
		// nothing to do
		return;
	}

	// Validation pass: reject the whole batch before touching the backend
	foreach ( $jobs as $job ) {
		$this->assertMatchingJobType( $job );
		if ( !$job->getReleaseTimestamp() || $this->supportsDelayedJobs() ) {
			continue;
		}
		throw new JobQueueError(
			"Got delayed '{$job->getType()}' job; delays are not supported." );
	}

	$this->doBatchPush( $jobs, $flags );

	// Registration pass: must come after the insert (see deduplicateRootJob())
	foreach ( $jobs as $job ) {
		if ( $job->isRootJob() ) {
			$this->deduplicateRootJob( $job );
		}
	}
}
/**
 * Backend-specific insertion of an already validated batch of jobs.
 *
 * @see JobQueue::batchPush()
 * @param IJobSpecification[] $jobs
 * @param int $flags
 */
abstract protected function doBatchPush( array $jobs, $flags );
/**
 * Pop a job off of the queue.
 * This requires $wgJobClasses to be set for the given job type.
 * Outside callers should use JobQueueGroup::pop() instead of this function.
 *
 * A popped job whose root job has been superseded is replaced by a
 * no-op DuplicateJob.
 *
 * @throws JobQueueError
 * @return RunnableJob|false Returns false if there are no jobs
 */
final public function pop() {
	$this->assertNotReadOnly();

	$job = $this->doPop();
	if ( !$job ) {
		return $job;
	}

	// Flag this job as an old duplicate based on its "root" job...
	try {
		if ( $this->isRootJobOldDuplicate( $job ) ) {
			$this->incrStats( 'dupe_pops', $job->getType() );
			// convert to a no-op
			$job = DuplicateJob::newFromJob( $job );
		}
	} catch ( TimeoutException $e ) {
		// Request timeouts must always propagate
		throw $e;
	} catch ( Exception $e ) {
		// don't lose jobs over this
	}

	return $job;
}
/**
 * Backend-specific implementation of pop().
 *
 * @see JobQueue::pop()
 * @return RunnableJob|false
 */
abstract protected function doPop();
/**
 * Acknowledge that a job was completed.
 *
 * This does nothing for certain queue classes or if "claimTTL" is not set.
 * Outside callers should use JobQueueGroup::ack() instead of this function.
 *
 * @param RunnableJob $job
 * @return void
 * @throws JobQueueError
 */
final public function ack( RunnableJob $job ) {
	// Guards first: the queue must be writable and the job must belong to it
	$this->assertNotReadOnly();
	$this->assertMatchingJobType( $job );

	$this->doAck( $job );
}
/**
 * Backend-specific implementation of ack().
 *
 * @see JobQueue::ack()
 * @param RunnableJob $job
 */
abstract protected function doAck( RunnableJob $job );
/**
 * Register the "root job" of a given job into the queue for de-duplication.
 * This should only be called right *after* all the new jobs have been inserted.
 * This is used to turn older, duplicate, job entries into no-ops. The root job
 * information will remain in the registry until it simply falls out of cache.
 *
 * This requires that $job has two special fields in the "params" array:
 *   - rootJobSignature : hash (e.g. SHA1) that identifies the task
 *   - rootJobTimestamp : TS_MW timestamp of this instance of the task
 *
 * A "root job" is a conceptual job that consist of potentially many smaller jobs
 * that are actually inserted into the queue. For example, "refreshLinks" jobs are
 * spawned when a template is edited. One can think of the task as "update links
 * of pages that use template X" and an instance of that task as a "root job".
 * However, what actually goes into the queue are range and leaf job subtypes.
 * Since these jobs include things like page ID ranges and DB primary positions,
 * and can morph into smaller jobs recursively, simple duplicate detection
 * for individual jobs being identical (like that of job_sha1) is not useful.
 *
 * In the case of "refreshLinks", if these jobs are still in the queue when the template
 * is edited again, we want all of these old refreshLinks jobs for that template to become
 * no-ops. This can greatly reduce server load, since refreshLinks jobs involves parsing.
 * Essentially, the new batch of jobs belong to a new "root job" and the older ones to a
 * previous "root job" for the same task of "update links of pages that use template X".
 *
 * This does nothing for certain queue classes.
 *
 * @internal For use within JobQueue only
 * @param IJobSpecification $job
 * @throws JobQueueError
 * @return bool
 */
final public function deduplicateRootJob( IJobSpecification $job ) {
	// Guards first: the queue must be writable and the job must belong to it
	$this->assertNotReadOnly();
	$this->assertMatchingJobType( $job );

	return $this->doDeduplicateRootJob( $job );
}
/**
 * Default root job registration, backed by the WAN cache.
 *
 * Stores the job's root timestamp under the root job cache key unless
 * the cache already holds an equal or newer timestamp.
 *
 * @stable to override
 * @see JobQueue::deduplicateRootJob()
 * @param IJobSpecification $job
 * @throws JobQueueError If the job carries no root job parameters
 * @return bool
 */
protected function doDeduplicateRootJob( IJobSpecification $job ) {
	$rootParams = $job->hasRootJobParams() ? $job->getRootJobParams() : null;
	if ( !$rootParams ) {
		throw new JobQueueError( "Cannot register root job; missing parameters." );
	}

	$cacheKey = $this->getRootJobCacheKey( $rootParams['rootJobSignature'], $job->getType() );
	// Callers should call JobQueueGroup::push() before this method so that if the
	// insert fails, the de-duplication registration will be aborted. Having only the
	// de-duplication registration succeed would cause jobs to become no-ops without
	// any actual jobs that made them redundant.
	$lastKnown = $this->wanCache->get( $cacheKey ); // last known timestamp of such a root job
	if ( $lastKnown !== false && $lastKnown >= $rootParams['rootJobTimestamp'] ) {
		// a newer version of this root job was enqueued
		return true;
	}

	// Update the timestamp of the last root job started at the location...
	return $this->wanCache->set(
		$cacheKey,
		$rootParams['rootJobTimestamp'],
		self::ROOTJOB_TTL
	);
}
/**
 * Check if the "root" job of a given job has been superseded by a newer one
 *
 * @param IJobSpecification $job
 * @throws JobQueueError
 * @return bool
 */
final protected function isRootJobOldDuplicate( IJobSpecification $job ) {
	// The job must belong to this queue before its root job is looked up
	$this->assertMatchingJobType( $job );

	return $this->doIsRootJobOldDuplicate( $job );
}
/**
 * Default superseded-root-job check, backed by the WAN cache.
 *
 * Besides answering the question, this refreshes the cached timestamp
 * when the cache is empty or older than the job's own root timestamp.
 *
 * @stable to override
 * @see JobQueue::isRootJobOldDuplicate()
 * @param IJobSpecification $job
 * @return bool
 */
protected function doIsRootJobOldDuplicate( IJobSpecification $job ) {
	$rootParams = $job->hasRootJobParams() ? $job->getRootJobParams() : null;
	if ( !$rootParams ) {
		// job has no de-duplication info
		return false;
	}

	$cacheKey = $this->getRootJobCacheKey( $rootParams['rootJobSignature'], $job->getType() );
	// Get the last time this root job was enqueued
	$lastKnown = $this->wanCache->get( $cacheKey );
	if ( $lastKnown === false || $rootParams['rootJobTimestamp'] > $lastKnown ) {
		// Update the timestamp of the last known root job started at the location...
		$this->wanCache->set( $cacheKey, $rootParams['rootJobTimestamp'], self::ROOTJOB_TTL );
	}

	// Check if a new root job was started at the location after this one's...
	$superseded = ( $lastKnown && $lastKnown > $rootParams['rootJobTimestamp'] );

	return $superseded;
}
/**
 * Build the global WAN cache key under which a root job timestamp is stored.
 *
 * @param string $signature Hash identifier of the root job
 * @param string $type job type
 * @return string
 */
protected function getRootJobCacheKey( $signature, $type ) {
	$cache = $this->wanCache;

	return $cache->makeGlobalKey( 'jobqueue', $this->domain, $type, 'rootjob', $signature );
}
/**
 * Delete all unclaimed and delayed jobs from the queue
 *
 * @throws JobQueueError
 * @since 1.22
 * @return void
 */
final public function delete() {
	// Refuse to modify a read-only queue
	$this->assertNotReadOnly();

	$this->doDelete();
}
/**
 * Default implementation of delete(): always fails, since the base
 * class has no storage to clear.
 *
 * @stable to override
 * @see JobQueue::delete()
 * @throws JobQueueError
 */
protected function doDelete() {
	throw new JobQueueError( "This method is not implemented." );
}
/**
 * Wait for any replica DBs or backup servers to catch up.
 *
 * This does nothing for certain queue classes.
 *
 * @return void
 * @throws JobQueueError
 */
final public function waitForBackups() {
	$this->doWaitForBackups();
}
/**
 * Default implementation of waitForBackups(): a no-op.
 *
 * @stable to override
 * @see JobQueue::waitForBackups()
 * @return void
 */
protected function doWaitForBackups() {
	// Intentionally empty
}
/**
 * Clear any process and persistent caches
 *
 * @return void
 */
final public function flushCaches() {
	$this->doFlushCaches();
}
/**
 * Default implementation of flushCaches(): a no-op.
 *
 * @stable to override
 * @see JobQueue::flushCaches()
 * @return void
 */
protected function doFlushCaches() {
	// Intentionally empty
}
/**
 * Get an iterator to traverse over all available jobs in this queue.
 * This does not include jobs that are currently acquired or delayed.
 * Note: results may be stale if the queue is concurrently modified.
 *
 * @return Iterator<RunnableJob>
 * @throws JobQueueError
 */
abstract public function getAllQueuedJobs();
/**
 * Get an iterator to traverse over all delayed jobs in this queue.
 * Note: results may be stale if the queue is concurrently modified.
 *
 * The base implementation yields nothing.
 *
 * @stable to override
 * @return Iterator<RunnableJob>
 * @throws JobQueueError
 * @since 1.22
 */
public function getAllDelayedJobs() {
	// not implemented
	return new ArrayIterator( [] );
}
/**
 * Get an iterator to traverse over all claimed jobs in this queue
 *
 * Callers should be quick to iterate over it or few results
 * will be returned due to jobs being acknowledged and deleted
 *
 * The base implementation yields nothing.
 *
 * @stable to override
 * @return Iterator<RunnableJob>
 * @throws JobQueueError
 * @since 1.26
 */
public function getAllAcquiredJobs() {
	// not implemented
	return new ArrayIterator( [] );
}
/**
 * Get an iterator to traverse over all abandoned jobs in this queue
 *
 * The base implementation yields nothing.
 *
 * @stable to override
 * @return Iterator<RunnableJob>
 * @throws JobQueueError
 * @since 1.25
 */
public function getAllAbandonedJobs() {
	// not implemented
	return new ArrayIterator( [] );
}
/**
 * Do not use this function outside of JobQueue/JobQueueGroup
 *
 * The base implementation returns null.
 *
 * @stable to override
 * @return string|null
 * @since 1.22
 */
public function getCoalesceLocationInternal() {
	return null;
}
/**
 * Check whether each of the given queues are empty.
 * This is used for batching checks for queues stored at the same place.
 *
 * @param array $types List of queues types
 * @return array|null (list of non-empty queue types) or null if unsupported
 * @throws JobQueueError
 * @since 1.22
 */
final public function getSiblingQueuesWithJobs( array $types ) {
	$nonEmptyTypes = $this->doGetSiblingQueuesWithJobs( $types );

	return $nonEmptyTypes;
}
/**
 * Default implementation of getSiblingQueuesWithJobs(): reports "unsupported".
 *
 * @stable to override
 * @see JobQueue::getSiblingQueuesWithJobs()
 * @param array $types List of queues types
 * @return array|null (list of queue types) or null if unsupported
 */
protected function doGetSiblingQueuesWithJobs( array $types ) {
	// not supported
	return null;
}
/**
 * Check the size of each of the given queues.
 * For queues not served by the same store as this one, 0 is returned.
 * This is used for batching checks for queues stored at the same place.
 *
 * @param array $types List of queues types
 * @return array|null (job type => queue size, per the description above)
 *   or null if unsupported
 * @throws JobQueueError
 * @since 1.22
 */
final public function getSiblingQueueSizes( array $types ) {
	$sizesByType = $this->doGetSiblingQueueSizes( $types );

	return $sizesByType;
}
/**
 * Default implementation of getSiblingQueueSizes(): reports "unsupported".
 *
 * @stable to override
 * @see JobQueue::getSiblingQueueSizes()
 * @param array $types List of queues types
 * @return array|null Same shape as getSiblingQueueSizes() returns, or null if unsupported
 */
protected function doGetSiblingQueueSizes( array $types ) {
	// not supported
	return null;
}
/**
 * Build a job instance through the JobFactory obtained in the constructor.
 *
 * @param string $command
 * @param array $params
 * @return Job
 */
protected function factoryJob( $command, $params ) {
	$job = $this->jobFactory->newJob( $command, $params );

	return $job;
}
/**
 * Fail if the queue was configured with a read-only reason.
 *
 * @throws JobQueueReadOnlyError
 */
protected function assertNotReadOnly() {
	if ( $this->readOnlyReason === false ) {
		// Queue is read/write
		return;
	}

	throw new JobQueueReadOnlyError( "Job queue is read-only: {$this->readOnlyReason}" );
}
/**
 * Fail if the job's type differs from this queue's type.
 * Type-agnostic queues accept every job type.
 *
 * @param IJobSpecification $job
 * @throws JobQueueError
 */
private function assertMatchingJobType( IJobSpecification $job ) {
	if ( !$this->typeAgnostic && $job->getType() !== $this->type ) {
		throw new JobQueueError( "Got '{$job->getType()}' job; expected '{$this->type}'." );
	}
}
/**
 * Call StatsdDataFactoryInterface::updateCount() for the queue overall and for the queue type
 *
 * @param string $key Event type
 * @param string $type Job type
 * @param int $delta
 * @since 1.22
 */
protected function incrStats( $key, $type, $delta = 1 ) {
	// First the aggregate counter, then the per-type one
	foreach ( [ 'all', $type ] as $suffix ) {
		$this->stats->updateCount( "jobqueue.{$key}.{$suffix}", $delta );
	}
}
/**
 * Subclasses should override this to return true if they support type agnostic queues.
 * The constructor rejects the typeAgnostic option when this returns false.
 *
 * @return bool
 * @since 1.38
 */
protected function supportsTypeAgnostic(): bool {
	return false;
}