5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 * http://www.gnu.org/copyleft/gpl.html
21 * @defgroup JobQueue JobQueue
23 use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface
;
24 use MediaWiki\JobQueue\JobFactory
;
25 use MediaWiki\MediaWikiServices
;
26 use MediaWiki\WikiMap\WikiMap
;
27 use Wikimedia\RequestTimeout\TimeoutException
;
28 use Wikimedia\UUID\GlobalIdGenerator
;
31 * Class to handle enqueueing and running of background jobs
33 * See [the architecture doc](@ref jobqueuearch) for more information.
39 abstract class JobQueue
{
/** @var string DB domain ID */
protected $domain;
/** @var string Job type */
protected $type;
/** @var string Job priority for pop() */
protected $order;
/** @var int Time to live in seconds */
protected $claimTTL;
/** @var int Maximum number of times to try a job */
protected $maxTries;
/** @var string|false Read only rationale (or false if r/w) */
protected $readOnlyReason;
/** @var StatsdDataFactoryInterface */
protected $stats;
/** @var GlobalIdGenerator */
protected $idGenerator;

/** @var WANObjectCache */
protected $wanCache;

/** @var bool Whether this queue accepts jobs of any type (its type is then 'default') */
protected $typeAgnostic;

private JobFactory $jobFactory;

/** @var int Flag for push()/batchPush(): "all-or-nothing" job insertions */
protected const QOS_ATOMIC = 1;

/** @var int Seconds to remember root jobs (28 days) */
protected const ROOTJOB_TTL = 2419200;
/**
 * @param array $params
 *   - type           : A job type; not required (forced to 'default') if typeAgnostic is set
 *   - domain         : A DB domain ID ('wiki' is accepted as a legacy alias)
 *   - idGenerator    : A GlobalIdGenerator instance.
 *   - wanCache       : An instance of WANObjectCache to use for caching
 *                      [default: WANObjectCache::newEmpty()]
 *   - stats          : An instance of StatsdDataFactoryInterface [default: NullStatsdDataFactory]
 *   - claimTTL       : Seconds a job can be claimed for exclusive execution [default: 0, i.e. forever]
 *   - maxTries       : Total times a job can be tried, assuming claims expire [default: 3]
 *   - order          : Queue order, one of ("fifo", "timestamp", "random"); "any" or omitted
 *                      selects optimalOrder() [default: variable]
 *   - readOnlyReason : Mark the queue as read-only with this reason [default: false]
 *   - typeAgnostic   : If the jobqueue should operate agnostic to the job types
 * @throws JobQueueError If the order or type-agnostic mode is unsupported by this class
 */
protected function __construct( array $params ) {
	// 'wiki' is the backwards-compatible name for 'domain'
	$this->domain = $params['domain'] ?? $params['wiki'];
	$typeAgnostic = $params['typeAgnostic'] ?? false;
	// Type-agnostic queues always use the placeholder type 'default', so 'type' is only
	// read when it is actually required (avoids an undefined-key warning when omitted).
	$this->type = $typeAgnostic ? 'default' : $params['type'];
	$this->claimTTL = $params['claimTTL'] ?? 0;
	$this->maxTries = $params['maxTries'] ?? 3;

	// "any" (or no setting) defers to whatever order this backend prefers
	if ( isset( $params['order'] ) && $params['order'] !== 'any' ) {
		$this->order = $params['order'];
	} else {
		$this->order = $this->optimalOrder();
	}
	if ( !in_array( $this->order, $this->supportedOrders(), true ) ) {
		throw new JobQueueError( __CLASS__ . " does not support '{$this->order}' order." );
	}

	$this->readOnlyReason = $params['readOnlyReason'] ?? false;
	$this->stats = $params['stats'] ?? new NullStatsdDataFactory();
	$this->wanCache = $params['wanCache'] ?? WANObjectCache::newEmpty();
	$this->idGenerator = $params['idGenerator'];

	if ( $typeAgnostic && !$this->supportsTypeAgnostic() ) {
		throw new JobQueueError( __CLASS__ . " does not support type agnostic queues." );
	}
	$this->typeAgnostic = $typeAgnostic;

	$this->jobFactory = MediaWikiServices::getInstance()->getJobFactory();
}
/**
 * Get a job queue object of the specified type.
 * $params includes:
 *   - class          : What job class to use (determines job type)
 *   - domain         : Database domain ID of the wiki the jobs are for (defaults to current wiki)
 *   - type           : The name of the job types this queue handles
 *   - order          : Order that pop() selects jobs, one of "fifo", "timestamp" or "random".
 *                      If "fifo" is used, the queue will effectively be FIFO. Note that job
 *                      completion will not appear to be exactly FIFO if there are multiple
 *                      job runners since jobs can take different times to finish once popped.
 *                      If "timestamp" is used, the queue will at least be loosely ordered
 *                      by timestamp, allowing for some jobs to be popped off out of order.
 *                      If "random" is used, pop() will pick jobs in random order.
 *                      Note that it may only be weakly random (e.g. a lottery of the oldest X).
 *                      If "any" is chosen, the queue will use whatever order is the fastest.
 *                      This might be useful for improving concurrency for job acquisition.
 *   - claimTTL       : If supported, the queue will recycle jobs that have been popped
 *                      but not acknowledged as completed after this many seconds. Recycling
 *                      of jobs simply means re-inserting them into the queue. Jobs can be
 *                      attempted up to "maxTries" times (default: 3) before being discarded.
 *   - readOnlyReason : Set this to a string to make the queue read-only. [optional]
 *   - idGenerator    : A GlobalIdGenerator instance.
 *   - stats          : A StatsdDataFactoryInterface. [optional]
 *
 * Queue classes should throw an exception if they do not support the options given.
 *
 * @param array $params
 * @return JobQueue
 * @throws JobQueueError If 'class' is missing, unknown, or not a JobQueue class
 */
final public static function factory( array $params ) {
	$class = $params['class'] ?? '';
	if ( !class_exists( $class ) ) {
		throw new JobQueueError( "Invalid job queue class '$class'." );
	}
	// Check the hierarchy *before* instantiating, so a misconfigured class that is
	// not a JobQueue never has its constructor run with these parameters.
	if ( !is_a( $class, self::class, true ) ) {
		throw new JobQueueError( "Class '$class' is not a " . __CLASS__ . " class." );
	}

	return new $class( $params );
}
/**
 * Get the DB domain ID of the wiki this queue's jobs are for.
 *
 * @return string Database domain ID
 */
final public function getDomain() {
	$domainId = $this->domain;

	return $domainId;
}
/**
 * Get the wiki ID that corresponds to this queue's DB domain ID.
 *
 * @deprecated since 1.33 (hard deprecated since 1.37); the domain is available via getDomain()
 * @return string Wiki ID
 */
final public function getWiki() {
	wfDeprecated( __METHOD__, '1.33' );
	$domainId = $this->domain;

	return WikiMap::getWikiIdFromDbDomain( $domainId );
}
/**
 * Get the job type served by this queue.
 *
 * @return string Job type that this queue handles ('default' for type-agnostic queues)
 */
final public function getType() {
	return $this->type;
}
/**
 * Get the order in which pop() selects jobs, as resolved by the constructor.
 *
 * @return string One of (random, timestamp, fifo, undefined)
 */
final public function getOrder() {
	return $this->order;
}
/**
 * Get the allowed queue orders for configuration validation.
 * The constructor rejects any resolved order that is not in this list.
 *
 * @return array Subset of (random, timestamp, fifo, undefined)
 */
abstract protected function supportedOrders();

/**
 * Get the default queue order to use if configuration does not specify one
 * (or specifies "any").
 *
 * @return string One of (random, timestamp, fifo, undefined)
 */
abstract protected function optimalOrder();
/**
 * Find out if delayed jobs are supported for configuration validation.
 * batchPush() rejects jobs that have a release timestamp unless this returns true.
 *
 * @stable to override
 * @return bool Whether delayed jobs are supported (false in this base class)
 */
protected function supportsDelayedJobs() {
	$supported = false; // not implemented

	return $supported;
}

/**
 * @return bool Whether delayed jobs are enabled
 */
final public function delayedJobsEnabled() {
	$enabled = $this->supportsDelayedJobs();

	return $enabled;
}
/**
 * Get the reason this queue is read-only, if it is.
 *
 * @return string|false Read-only rationale, or false if the queue is read/write
 */
public function getReadOnlyReason() {
	$reason = $this->readOnlyReason;

	return $reason;
}
/**
 * Quickly check if the queue has no available (unacquired, non-delayed) jobs.
 * Queue classes should use caching if they are any slower without memcached.
 *
 * If caching is used, this might return false when there are actually no jobs.
 * If pop() is called and returns false then it should correct the cache. Also,
 * calling flushCaches() first prevents this. However, this effect is typically
 * not distinguishable from the race condition between isEmpty() and pop().
 *
 * @return bool
 * @throws JobQueueError
 */
final public function isEmpty() {
	return $this->doIsEmpty();
}

/**
 * @see JobQueue::isEmpty()
 * @return bool
 */
abstract protected function doIsEmpty();
/**
 * Get the number of available (unacquired, non-delayed) jobs in the queue.
 * Queue classes should use caching if they are any slower without memcached.
 *
 * If caching is used, this number might be out of date for a minute.
 *
 * @return int
 * @throws JobQueueError
 */
final public function getSize() {
	return $this->doGetSize();
}

/**
 * @see JobQueue::getSize()
 * @return int
 */
abstract protected function doGetSize();
/**
 * Get the number of acquired jobs (these are temporarily out of the queue).
 * Queue classes should use caching if they are any slower without memcached.
 *
 * If caching is used, this number might be out of date for a minute.
 *
 * @return int
 * @throws JobQueueError
 */
final public function getAcquiredCount() {
	return $this->doGetAcquiredCount();
}

/**
 * @see JobQueue::getAcquiredCount()
 * @return int
 */
abstract protected function doGetAcquiredCount();
/**
 * Get the number of delayed jobs (these are temporarily out of the queue).
 * Queue classes should use caching if they are any slower without memcached.
 *
 * If caching is used, this number might be out of date for a minute.
 *
 * @return int
 * @throws JobQueueError
 */
final public function getDelayedCount() {
	return $this->doGetDelayedCount();
}

/**
 * @stable to override
 * @see JobQueue::getDelayedCount()
 * @return int Always 0 in this base class
 */
protected function doGetDelayedCount() {
	return 0; // not implemented
}
/**
 * Get the number of acquired jobs that can no longer be attempted.
 * Queue classes should use caching if they are any slower without memcached.
 *
 * If caching is used, this number might be out of date for a minute.
 *
 * @return int
 * @throws JobQueueError
 */
final public function getAbandonedCount() {
	return $this->doGetAbandonedCount();
}

/**
 * @stable to override
 * @see JobQueue::getAbandonedCount()
 * @return int Always 0 in this base class
 */
protected function doGetAbandonedCount() {
	return 0; // not implemented
}
/**
 * Push one or more jobs into the queue.
 * This does not require $wgJobClasses to be set for the given job type.
 * Outside callers should use JobQueueGroup::push() instead of this function.
 *
 * @param IJobSpecification|IJobSpecification[] $jobs A single job or a list of jobs
 * @param int $flags Bitfield (supports JobQueue::QOS_ATOMIC)
 * @throws JobQueueError
 */
final public function push( $jobs, $flags = 0 ) {
	// Normalise a lone job into a one-element batch
	if ( !is_array( $jobs ) ) {
		$jobs = [ $jobs ];
	}
	$this->batchPush( $jobs, $flags );
}
/**
 * Push a batch of jobs into the queue.
 * This does not require $wgJobClasses to be set for the given job type.
 * Outside callers should use JobQueueGroup::push() instead of this function.
 *
 * Every job is validated before anything is inserted; root jobs are registered
 * for de-duplication only after the backend insert has returned.
 *
 * @param IJobSpecification[] $jobs
 * @param int $flags Bitfield (supports JobQueue::QOS_ATOMIC)
 * @throws JobQueueError
 */
final public function batchPush( array $jobs, $flags = 0 ) {
	$this->assertNotReadOnly();

	if ( count( $jobs ) === 0 ) {
		return; // nothing to do
	}

	foreach ( $jobs as $candidate ) {
		$this->assertMatchingJobType( $candidate );
		$isDelayed = $candidate->getReleaseTimestamp();
		if ( $isDelayed && !$this->supportsDelayedJobs() ) {
			throw new JobQueueError(
				"Got delayed '{$candidate->getType()}' job; delays are not supported." );
		}
	}

	$this->doBatchPush( $jobs, $flags );

	// Registering after the insert means a failed insert never leaves behind a
	// de-duplication record that would turn other jobs into no-ops.
	foreach ( $jobs as $pushed ) {
		if ( !$pushed->isRootJob() ) {
			continue;
		}
		$this->deduplicateRootJob( $pushed );
	}
}

/**
 * Backend-specific insertion; batchPush() calls this after validating the batch.
 *
 * @see JobQueue::batchPush()
 * @param IJobSpecification[] $jobs
 * @param int $flags Bitfield (supports JobQueue::QOS_ATOMIC)
 */
abstract protected function doBatchPush( array $jobs, $flags );
400 * Pop a job off of the queue.
401 * This requires $wgJobClasses to be set for the given job type.
402 * Outside callers should use JobQueueGroup::pop() instead of this function.
404 * @throws JobQueueError
405 * @return RunnableJob|false Returns false if there are no jobs
407 final public function pop() {
// Popping mutates the queue, so read-only queues throw JobQueueReadOnlyError here.
408 $this->assertNotReadOnly();
410 $job = $this->doPop();
412 // Flag this job as an old duplicate based on its "root" job...
// A job whose root job was superseded is counted under the 'dupe_pops' stat and
// replaced with a DuplicateJob wrapper, so running it does nothing.
414 if ( $job && $this->isRootJobOldDuplicate( $job ) ) {
415 $this->incrStats( 'dupe_pops', $job->getType() );
416 $job = DuplicateJob
::newFromJob( $job ); // convert to a no-op
418 } catch ( TimeoutException
$e ) {
// NOTE(review): this handler's body is not visible. It is listed ahead of the generic
// Exception handler, presumably so request timeouts are rethrown rather than swallowed
// by the "don't lose jobs" handler below. Confirm, and confirm that $job is returned.
420 } catch ( Exception
$e ) {
421 // don't lose jobs over this
// Backend-specific pop used by pop(); yields the next job, or false if none is available.
428 * @see JobQueue::pop()
429 * @return RunnableJob|false
431 abstract protected function doPop();
/**
 * Acknowledge that a job was completed.
 *
 * This does nothing for certain queue classes or if "claimTTL" is not set.
 * Outside callers should use JobQueueGroup::ack() instead of this function.
 *
 * @param RunnableJob $job
 * @throws JobQueueError
 */
final public function ack( RunnableJob $job ) {
	// Writes are refused on read-only queues, and only jobs of this queue's type are accepted
	$this->assertNotReadOnly();
	$this->assertMatchingJobType( $job );

	$this->doAck( $job );
}

/**
 * Backend-specific acknowledgement, called by ack() after validation.
 *
 * @see JobQueue::ack()
 * @param RunnableJob $job
 */
abstract protected function doAck( RunnableJob $job );
/**
 * Register the "root job" of a given job into the queue for de-duplication.
 * This should only be called right *after* all the new jobs have been inserted.
 * This is used to turn older, duplicate, job entries into no-ops. The root job
 * information will remain in the registry until it simply falls out of cache.
 *
 * This requires that $job has two special fields in the "params" array:
 *   - rootJobSignature : hash (e.g. SHA1) that identifies the task
 *   - rootJobTimestamp : TS_MW timestamp of this instance of the task
 *
 * A "root job" is a conceptual job that consists of potentially many smaller jobs
 * that are actually inserted into the queue. For example, "refreshLinks" jobs are
 * spawned when a template is edited. One can think of the task as "update links
 * of pages that use template X" and an instance of that task as a "root job".
 * However, what actually goes into the queue are range and leaf job subtypes.
 * Since these jobs include things like page ID ranges and DB primary positions,
 * and can morph into smaller jobs recursively, simple duplicate detection
 * for individual jobs being identical (like that of job_sha1) is not useful.
 *
 * In the case of "refreshLinks", if these jobs are still in the queue when the template
 * is edited again, we want all of these old refreshLinks jobs for that template to become
 * no-ops. This can greatly reduce server load, since refreshLinks jobs involve parsing.
 * Essentially, the new batch of jobs belongs to a new "root job" and the older ones to a
 * previous "root job" for the same task of "update links of pages that use template X".
 *
 * This does nothing for certain queue classes.
 *
 * @internal For use within JobQueue only
 * @param IJobSpecification $job
 * @throws JobQueueError
 * @return bool Result of doDeduplicateRootJob()
 */
final public function deduplicateRootJob( IJobSpecification $job ) {
	$this->assertNotReadOnly();
	$this->assertMatchingJobType( $job );

	$registered = $this->doDeduplicateRootJob( $job );

	return $registered;
}
/**
 * Record the root job timestamp of $job in the WAN cache, unless an equal or
 * newer timestamp is already recorded for the same root job signature and type.
 *
 * @stable to override
 * @see JobQueue::deduplicateRootJob()
 * @param IJobSpecification $job
 * @throws JobQueueError If $job has no root job parameters
 * @return bool True if an equal or newer root job was already recorded,
 *  otherwise the result of the cache write
 */
protected function doDeduplicateRootJob( IJobSpecification $job ) {
	$params = $job->hasRootJobParams() ? $job->getRootJobParams() : null;
	if ( !$params ) {
		throw new JobQueueError( "Cannot register root job; missing parameters." );
	}

	$key = $this->getRootJobCacheKey( $params['rootJobSignature'], $job->getType() );
	// Callers should call JobQueueGroup::push() before this method so that if the
	// insert fails, the de-duplication registration will be aborted. Having only the
	// de-duplication registration succeed would cause jobs to become no-ops without
	// any actual jobs that made them redundant.
	$timestamp = $this->wanCache->get( $key ); // last known timestamp of such a root job
	if ( $timestamp !== false && $timestamp >= $params['rootJobTimestamp'] ) {
		return true; // a newer version of this root job was enqueued
	}

	// Update the timestamp of the last root job started at the location...
	return $this->wanCache->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL );
}
/**
 * Check if the "root" job of a given job has been superseded by a newer one.
 *
 * @param IJobSpecification $job
 * @throws JobQueueError If the job type does not match this queue
 * @return bool
 */
final protected function isRootJobOldDuplicate( IJobSpecification $job ) {
	$this->assertMatchingJobType( $job );

	$isOldDuplicate = $this->doIsRootJobOldDuplicate( $job );

	return $isOldDuplicate;
}
/**
 * Compare the root job timestamp of $job with the one recorded in the WAN cache.
 *
 * Side effect: if nothing is recorded yet, or $job's root job is newer than the
 * recorded one, the recorded timestamp is advanced to $job's.
 *
 * @stable to override
 * @see JobQueue::isRootJobOldDuplicate()
 * @param IJobSpecification $job
 * @return bool True if a newer root job than $job's was already recorded
 */
protected function doIsRootJobOldDuplicate( IJobSpecification $job ) {
	$params = $job->hasRootJobParams() ? $job->getRootJobParams() : null;
	if ( !$params ) {
		return false; // job has no de-duplication info
	}

	$key = $this->getRootJobCacheKey( $params['rootJobSignature'], $job->getType() );
	// Get the last time this root job was enqueued
	$timestamp = $this->wanCache->get( $key );
	if ( $timestamp === false || $params['rootJobTimestamp'] > $timestamp ) {
		// Update the timestamp of the last known root job started at the location...
		$this->wanCache->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL );
	}

	// Check if a new root job was started at the location after this one's
	// (uses the value read *before* any update above).
	return ( $timestamp !== false && $timestamp > $params['rootJobTimestamp'] );
}
560 * @param string $signature Hash identifier of the root job
561 * @param string $type job type
// Builds the WAN cache key under which doDeduplicateRootJob() and
// doIsRootJobOldDuplicate() store the latest root job timestamp.
564 protected function getRootJobCacheKey( $signature, $type ) {
// NOTE(review): the makeGlobalKey() arguments are not visible here. Confirm they include
// $type and $signature, plus anything needed to keep different wikis' keys apart.
565 return $this->wanCache
->makeGlobalKey(
/**
 * Delete all unclaimed and delayed jobs from the queue
 *
 * @throws JobQueueError
 */
final public function delete() {
	$this->assertNotReadOnly();

	$this->doDelete();
}

/**
 * Backend-specific deletion used by delete().
 *
 * @stable to override
 * @see JobQueue::delete()
 * @throws JobQueueError Always, in this base class
 */
protected function doDelete() {
	throw new JobQueueError( "This method is not implemented." );
}
/**
 * Wait for any replica DBs or backup servers to catch up.
 *
 * This does nothing for certain queue classes.
 *
 * @throws JobQueueError
 */
final public function waitForBackups() {
	// All backend-specific work lives in the overridable hook
	$this->doWaitForBackups();
}
609 * @stable to override
610 * @see JobQueue::waitForBackups()
// Hook invoked by waitForBackups(); subclasses may override it.
613 protected function doWaitForBackups() {
// NOTE(review): the default body is not visible here; presumably a no-op, consistent
// with "This does nothing for certain queue classes" on waitForBackups() — confirm.
/**
 * Clear any process and persistent caches
 */
final public function flushCaches() {
	// All backend-specific work lives in the overridable hook
	$this->doFlushCaches();
}
626 * @stable to override
627 * @see JobQueue::flushCaches()
// Hook invoked by flushCaches(); subclasses that cache queue state may override it.
630 protected function doFlushCaches() {
// NOTE(review): the default body is not visible here; presumably a no-op — confirm.
/**
 * Get an iterator over every available job in this queue.
 *
 * Jobs that are currently acquired or delayed are not included.
 * Note: results may be stale if the queue is concurrently modified.
 *
 * @return Iterator<RunnableJob>
 * @throws JobQueueError
 */
abstract public function getAllQueuedJobs();
/**
 * Get an iterator to traverse over all delayed jobs in this queue.
 * Note: results may be stale if the queue is concurrently modified.
 *
 * @stable to override
 * @return Iterator<RunnableJob> Always empty in this base class
 * @throws JobQueueError
 */
public function getAllDelayedJobs() {
	return new ArrayIterator(); // not implemented
}
/**
 * Get an iterator to traverse over all claimed jobs in this queue
 *
 * Callers should iterate over it quickly, or few results will be returned
 * because jobs are acknowledged and deleted in the meantime.
 *
 * @stable to override
 * @return Iterator<RunnableJob> Always empty in this base class
 * @throws JobQueueError
 */
public function getAllAcquiredJobs() {
	return new ArrayIterator(); // not implemented
}
/**
 * Get an iterator to traverse over all abandoned jobs in this queue
 *
 * @stable to override
 * @return Iterator<RunnableJob> Always empty in this base class
 * @throws JobQueueError
 */
public function getAllAbandonedJobs() {
	return new ArrayIterator(); // not implemented
}
684 * Do not use this function outside of JobQueue/JobQueueGroup
686 * @stable to override
687 * @return string|null
// NOTE(review): the default body is not visible here — confirm what it returns. The name
// suggests it identifies where this queue is stored so that queues stored at the same place
// can be checked together (cf. getSiblingQueuesWithJobs()); verify against JobQueueGroup.
690 public function getCoalesceLocationInternal() {
/**
 * Check whether each of the given queues is empty.
 * This is used for batching checks for queues stored at the same place.
 *
 * @param array $types List of queues types
 * @return array|null (list of non-empty queue types) or null if unsupported
 * @throws JobQueueError
 */
final public function getSiblingQueuesWithJobs( array $types ) {
	$nonEmptyTypes = $this->doGetSiblingQueuesWithJobs( $types );

	return $nonEmptyTypes;
}

/**
 * Batched emptiness check; this base class does not support it.
 *
 * @stable to override
 * @see JobQueue::getSiblingQueuesWithJobs()
 * @param array $types List of queues types
 * @return array|null (list of queue types) or null if unsupported
 */
protected function doGetSiblingQueuesWithJobs( array $types ) {
	$unsupported = null; // not supported

	return $unsupported;
}
/**
 * Check the size of each of the given queues.
 * For queues not served by the same store as this one, 0 is returned.
 * This is used for batching checks for queues stored at the same place.
 *
 * @param array $types List of queues types
 * @return array|null Map of (job type => number of jobs) or null if unsupported
 * @throws JobQueueError
 */
final public function getSiblingQueueSizes( array $types ) {
	return $this->doGetSiblingQueueSizes( $types );
}

/**
 * Batched size check; this base class does not support it.
 *
 * @stable to override
 * @see JobQueue::getSiblingQueueSizes()
 * @param array $types List of queues types
 * @return array|null Map of (job type => number of jobs) or null if unsupported
 */
protected function doGetSiblingQueueSizes( array $types ) {
	return null; // not supported
}
/**
 * Instantiate a job by delegating to the JobFactory service.
 *
 * @param string $command Job type name
 * @param array $params Job parameters
 */
protected function factoryJob( $command, $params ) {
	$factory = $this->jobFactory;

	return $factory->newJob( $command, $params );
}
/**
 * Ensure this queue accepts writes.
 *
 * @throws JobQueueReadOnlyError If the queue was configured with a read-only reason
 */
protected function assertNotReadOnly() {
	if ( $this->readOnlyReason === false ) {
		return;
	}

	throw new JobQueueReadOnlyError( "Job queue is read-only: {$this->readOnlyReason}" );
}
/**
 * Ensure $job belongs to this queue's job type.
 *
 * Type-agnostic queues accept jobs of any type.
 *
 * @param IJobSpecification $job
 * @throws JobQueueError If the job type differs from this queue's type
 */
private function assertMatchingJobType( IJobSpecification $job ) {
	if ( $this->typeAgnostic ) {
		return; // any job type is acceptable
	}
	if ( $job->getType() !== $this->type ) {
		throw new JobQueueError( "Got '{$job->getType()}' job; expected '{$this->type}'." );
	}
}
/**
 * Call StatsdDataFactoryInterface::updateCount() for the queue overall and for the queue type
 *
 * Updates the counters "jobqueue.<key>.all" and "jobqueue.<key>.<type>", in that order.
 *
 * @param string $key Event type
 * @param string $type Job type
 * @param int $delta Amount to add to each counter
 */
protected function incrStats( $key, $type, $delta = 1 ) {
	foreach ( [ 'all', $type ] as $bucket ) {
		$this->stats->updateCount( "jobqueue.{$key}.{$bucket}", $delta );
	}
}
786 * Subclasses should set this to true if they support type agnostic queues
791 protected function supportsTypeAgnostic(): bool {