3 * This program is free software; you can redistribute it and/or modify
4 * it under the terms of the GNU General Public License as published by
5 * the Free Software Foundation; either version 2 of the License, or
6 * (at your option) any later version.
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
13 * You should have received a copy of the GNU General Public License along
14 * with this program; if not, write to the Free Software Foundation, Inc.,
15 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
16 * http://www.gnu.org/copyleft/gpl.html
21 use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface
;
22 use MediaWiki\JobQueue\JobFactory
;
23 use MediaWiki\MediaWikiServices
;
24 use Wikimedia\ObjectCache\WANObjectCache
;
25 use Wikimedia\RequestTimeout\TimeoutException
;
26 use Wikimedia\Stats\NullStatsdDataFactory
;
27 use Wikimedia\UUID\GlobalIdGenerator
;
30 * @defgroup JobQueue JobQueue
33 * See [the architecture doc](@ref jobqueuearch) for more information.
37 * Base class for queueing and running background jobs from a storage backend.
39 * See [the architecture doc](@ref jobqueuearch) for more information.
45 abstract class JobQueue
{
46 /** @var string DB domain ID */
48 /** @var string Job type */
50 /** @var string Job priority for pop() */
52 /** @var int Time to live in seconds */
54 /** @var int Maximum number of times to try a job */
56 /** @var string|false Read only rationale (or false if r/w) */
57 protected $readOnlyReason;
58 /** @var StatsdDataFactoryInterface */
60 /** @var GlobalIdGenerator */
61 protected $idGenerator;
63 /** @var WANObjectCache */
67 protected $typeAgnostic;
69 private JobFactory
$jobFactory;
71 /* Bit flag for "all-or-nothing" job insertions */
72 protected const QOS_ATOMIC
= 1;
74 /* Seconds to remember root jobs (28 days) */
75 protected const ROOTJOB_TTL
= 28 * 24 * 3600;
80 * @param array $params
81 * - type : A job type, 'default' if typeAgnostic is set
82 * - domain : A DB domain ID
83 * - idGenerator : A GlobalIdGenerator instance.
84 * - wanCache : An instance of WANObjectCache to use for caching [default: none]
85 * - stats : An instance of StatsdDataFactoryInterface [default: none]
86 * - claimTTL : Seconds a job can be claimed for exclusive execution [default: forever]
87 * - maxTries : Total times a job can be tried, assuming claims expire [default: 3]
88 * - order : Queue order, one of ("fifo", "timestamp", "random") [default: variable]
89 * - readOnlyReason : Mark the queue as read-only with this reason [default: false]
90 * - typeAgnostic : If the jobqueue should operate agnostic to the job types
91 * @throws JobQueueError
// Initialise the settings shared by all queue backends from $params.
// The constructor is protected; factory() in this class builds instances via `new $class( $params )`.
// Throws JobQueueError for an unsupported order or unsupported type-agnostic mode.
94 protected function __construct( array $params ) {
// 'wiki' is the backwards-compatible spelling of 'domain'.
95 $this->domain
= $params['domain'] ??
$params['wiki']; // b/c
// 'type' is read unconditionally; it is overwritten with 'default' further down for type-agnostic queues.
96 $this->type
= $params['type'];
// Defaults when not configured: claimTTL 0, maxTries 3.
97 $this->claimTTL
= $params['claimTTL'] ??
0;
98 $this->maxTries
= $params['maxTries'] ??
3;
// An explicit order other than 'any' is used as given; optimalOrder() is the alternative assignment.
99 if ( isset( $params['order'] ) && $params['order'] !== 'any' ) {
100 $this->order
= $params['order'];
102 $this->order
= $this->optimalOrder();
// Reject any order that the subclass does not list in supportedOrders().
// NOTE(review): in_array() is called without the strict flag, so the comparison is loose.
104 if ( !in_array( $this->order
, $this->supportedOrders() ) ) {
105 throw new JobQueueError( __CLASS__
. " does not support '{$this->order}' order." );
// Optional settings: readOnlyReason defaults to false, stats to a new NullStatsdDataFactory,
// wanCache to WANObjectCache::newEmpty().
107 $this->readOnlyReason
= $params['readOnlyReason'] ??
false;
108 $this->stats
= $params['stats'] ??
new NullStatsdDataFactory();
109 $this->wanCache
= $params['wanCache'] ?? WANObjectCache
::newEmpty();
// idGenerator has no fallback; it is read directly from $params.
110 $this->idGenerator
= $params['idGenerator'];
// Type-agnostic mode is only accepted when the subclass reports support via supportsTypeAgnostic().
111 if ( ( $params['typeAgnostic'] ??
false ) && !$this->supportsTypeAgnostic() ) {
112 throw new JobQueueError( __CLASS__
. " does not support type agnostic queues." );
114 $this->typeAgnostic
= ( $params['typeAgnostic'] ??
false );
// A type-agnostic queue always reports the type 'default', whatever $params['type'] held.
115 if ( $this->typeAgnostic
) {
116 $this->type
= 'default';
// The job factory is taken from the global service container, not from $params.
119 $this->jobFactory
= MediaWikiServices
::getInstance()->getJobFactory();
123 * Get a job queue object of the specified type.
125 * - class : What job class to use (determines job type)
126 * - domain : Database domain ID of the wiki the jobs are for (defaults to current wiki)
127 * - type : The name of the job types this queue handles
128 * - order : Order that pop() selects jobs, one of "fifo", "timestamp" or "random".
129 * If "fifo" is used, the queue will effectively be FIFO. Note that job
130 * completion will not appear to be exactly FIFO if there are multiple
131 * job runners since jobs can take different times to finish once popped.
132 * If "timestamp" is used, the queue will at least be loosely ordered
133 * by timestamp, allowing for some jobs to be popped off out of order.
134 * If "random" is used, pop() will pick jobs in random order.
135 * Note that it may only be weakly random (e.g. a lottery of the oldest X).
136 * If "any" is chosen, the queue will use whatever order is the fastest.
137 * This might be useful for improving concurrency for job acquisition.
138 * - claimTTL : If supported, the queue will recycle jobs that have been popped
139 * but not acknowledged as completed after this many seconds. Recycling
140 * of jobs simply means re-inserting them into the queue. Jobs can be
141 * attempted up to three times before being discarded.
142 * - readOnlyReason : Set this to a string to make the queue read-only. [optional]
143 * - idGenerator : A GlobalIdGenerator instance.
144 * - stats : A StatsdDataFactoryInterface. [optional]
146 * Queue classes should throw an exception if they do not support the options given.
148 * @param array $params
150 * @throws JobQueueError
// Instantiate the queue class named in $params['class'], passing $params to its constructor.
// Throws JobQueueError if the class does not exist or the new object is not a JobQueue.
152 final public static function factory( array $params ) {
153 $class = $params['class'];
154 if ( !class_exists( $class ) ) {
155 throw new JobQueueError( "Invalid job queue class '$class'." );
// The type check runs on the constructed object, i.e. after the constructor has already executed.
158 $obj = new $class( $params );
159 if ( !( $obj instanceof self
) ) {
160 throw new JobQueueError( "Class '$class' is not a " . __CLASS__
. " class." );
167 * @return string Database domain ID
// Accessor for the domain stored by the constructor ($params['domain'], or the legacy $params['wiki']).
169 final public function getDomain() {
170 return $this->domain
;
174 * @return string Job type that this queue handles
176 final public function getType() {
181 * @return string One of (random, timestamp, fifo, undefined)
183 final public function getOrder() {
188 * Get the allowed queue orders for configuration validation
190 * @return array Subset of (random, timestamp, fifo, undefined)
// Subclass hook: the constructor checks the chosen order against the array returned here.
192 abstract protected function supportedOrders();
195 * Get the default queue order to use if configuration does not specify one
197 * @return string One of (random, timestamp, fifo, undefined)
// Subclass hook: the constructor assigns this value as the order when it does not take one from $params.
199 abstract protected function optimalOrder();
202 * Find out if delayed jobs are supported for configuration validation
204 * @stable to override
205 * @return bool Whether delayed jobs are supported
// Base implementation reports no delayed-job support. While this returns false,
// batchPush() rejects any job that has a release timestamp.
207 protected function supportsDelayedJobs() {
208 return false; // not implemented
212 * @return bool Whether delayed jobs are enabled
// Public, non-overridable view of supportsDelayedJobs(); the hook's result is returned unchanged.
215 final public function delayedJobsEnabled() {
216 return $this->supportsDelayedJobs();
220 * @return string|false Read-only rationale or false if r/w
// Accessor for the read-only reason set by the constructor ($params['readOnlyReason'], false when not given).
223 public function getReadOnlyReason() {
224 return $this->readOnlyReason
;
228 * Quickly check if the queue has no available (unacquired, non-delayed) jobs.
229 * Queue classes should use caching if they are any slower without memcached.
231 * If caching is used, this might return false when there are actually no jobs.
232 * If pop() is called and returns false then it should correct the cache. Also,
233 * calling flushCaches() first prevents this. However, this effect is typically
234 * not distinguishable from the race condition between isEmpty() and pop().
237 * @throws JobQueueError
// Public entry point for the emptiness check; the answer is obtained from the subclass hook doIsEmpty().
239 final public function isEmpty() {
240 $res = $this->doIsEmpty();
246 * @see JobQueue::isEmpty()
// Subclass hook invoked by isEmpty().
249 abstract protected function doIsEmpty();
252 * Get the number of available (unacquired, non-delayed) jobs in the queue.
253 * Queue classes should use caching if they are any slower without memcached.
255 * If caching is used, this number might be out of date for a minute.
258 * @throws JobQueueError
// Public entry point for the available-job count; the value is obtained from the subclass hook doGetSize().
260 final public function getSize() {
261 $res = $this->doGetSize();
267 * @see JobQueue::getSize()
// Subclass hook invoked by getSize().
270 abstract protected function doGetSize();
273 * Get the number of acquired jobs (these are temporarily out of the queue).
274 * Queue classes should use caching if they are any slower without memcached.
276 * If caching is used, this number might be out of date for a minute.
279 * @throws JobQueueError
// Public entry point for the acquired-job count; the value is obtained from the subclass hook doGetAcquiredCount().
281 final public function getAcquiredCount() {
282 $res = $this->doGetAcquiredCount();
288 * @see JobQueue::getAcquiredCount()
// Subclass hook invoked by getAcquiredCount().
291 abstract protected function doGetAcquiredCount();
294 * Get the number of delayed jobs (these are temporarily out of the queue).
295 * Queue classes should use caching if they are any slower without memcached.
297 * If caching is used, this number might be out of date for a minute.
300 * @throws JobQueueError
// Public entry point for the delayed-job count; the value is obtained from the overridable hook doGetDelayedCount().
303 final public function getDelayedCount() {
304 $res = $this->doGetDelayedCount();
310 * @stable to override
311 * @see JobQueue::getDelayedCount()
// Default hook for getDelayedCount(): always reports zero delayed jobs unless a subclass overrides it.
314 protected function doGetDelayedCount() {
315 return 0; // not implemented
319 * Get the number of acquired jobs that can no longer be attempted.
320 * Queue classes should use caching if they are any slower without memcached.
322 * If caching is used, this number might be out of date for a minute.
325 * @throws JobQueueError
// Public entry point for the abandoned-job count; the value is obtained from the overridable hook doGetAbandonedCount().
327 final public function getAbandonedCount() {
328 $res = $this->doGetAbandonedCount();
334 * @stable to override
335 * @see JobQueue::getAbandonedCount()
// Default hook for getAbandonedCount(): always reports zero abandoned jobs unless a subclass overrides it.
338 protected function doGetAbandonedCount() {
339 return 0; // not implemented
343 * Push one or more jobs into the queue.
344 * This does not require $wgJobClasses to be set for the given job type.
345 * Outside callers should use JobQueueGroup::push() instead of this function.
347 * @param IJobSpecification|IJobSpecification[] $jobs
348 * @param int $flags Bitfield (supports JobQueue::QOS_ATOMIC)
350 * @throws JobQueueError
// Convenience wrapper around batchPush(): accepts either one job or an array of jobs.
352 final public function push( $jobs, $flags = 0 ) {
// A single job is wrapped into a one-element array so batchPush() always receives an array.
353 $jobs = is_array( $jobs ) ?
$jobs : [ $jobs ];
354 $this->batchPush( $jobs, $flags );
358 * Push a batch of jobs into the queue.
359 * This does not require $wgJobClasses to be set for the given job type.
360 * Outside callers should use JobQueueGroup::push() instead of this function.
362 * @param IJobSpecification[] $jobs
363 * @param int $flags Bitfield (supports JobQueue::QOS_ATOMIC)
365 * @throws JobQueueError
// Validate a batch of jobs, hand it to the subclass hook doBatchPush(), then register root jobs.
// Throws JobQueueReadOnlyError (via assertNotReadOnly) or JobQueueError on validation failure.
367 final public function batchPush( array $jobs, $flags = 0 ) {
// The read-only check precedes the empty-batch shortcut, so even an empty batch throws on a read-only queue.
368 $this->assertNotReadOnly();
370 if ( $jobs === [] ) {
371 return; // nothing to do
// Validate every job before anything is inserted: type must match and delays must be supported.
374 foreach ( $jobs as $job ) {
375 $this->assertMatchingJobType( $job );
376 if ( $job->getReleaseTimestamp() && !$this->supportsDelayedJobs() ) {
377 throw new JobQueueError(
378 "Got delayed '{$job->getType()}' job; delays are not supported." );
382 $this->doBatchPush( $jobs, $flags );
// Root-job registration happens only after the insert call above.
384 foreach ( $jobs as $job ) {
385 if ( $job->isRootJob() ) {
386 $this->deduplicateRootJob( $job );
392 * @see JobQueue::batchPush()
393 * @param IJobSpecification[] $jobs
// Subclass hook invoked by batchPush() with the already-validated, non-empty job list and the caller's flags.
396 abstract protected function doBatchPush( array $jobs, $flags );
399 * Pop a job off of the queue.
400 * This requires $wgJobClasses to be set for the given job type.
401 * Outside callers should use JobQueueGroup::pop() instead of this function.
403 * @throws JobQueueError
404 * @return RunnableJob|false Returns false if there are no jobs
// Take one job from the queue via the subclass hook doPop(), replacing it with a DuplicateJob
// when its root job has been superseded.
406 final public function pop() {
407 $this->assertNotReadOnly();
409 $job = $this->doPop();
411 // Flag this job as an old duplicate based on its "root" job...
// A falsy $job (nothing popped) skips the duplicate check entirely.
413 if ( $job && $this->isRootJobOldDuplicate( $job ) ) {
// Superseded jobs are counted under the 'dupe_pops' stat for the job's own type.
414 $this->incrStats( 'dupe_pops', $job->getType() );
415 $job = DuplicateJob
::newFromJob( $job ); // convert to a no-op
// TimeoutException has its own catch clause ahead of the generic Exception handler.
417 } catch ( TimeoutException
$e ) {
419 } catch ( Exception
$e ) {
420 // don't lose jobs over this
427 * @see JobQueue::pop()
428 * @return RunnableJob|false
// Subclass hook invoked by pop(); pop() treats a falsy return as "no job".
430 abstract protected function doPop();
433 * Acknowledge that a job was completed.
435 * This does nothing for certain queue classes or if "claimTTL" is not set.
436 * Outside callers should use JobQueueGroup::ack() instead of this function.
438 * @param RunnableJob $job
440 * @throws JobQueueError
// Acknowledge a completed job: checks read-only state and job type, then delegates to the subclass hook doAck().
442 final public function ack( RunnableJob
$job ) {
443 $this->assertNotReadOnly();
444 $this->assertMatchingJobType( $job );
446 $this->doAck( $job );
450 * @see JobQueue::ack()
451 * @param RunnableJob $job
// Subclass hook invoked by ack() after the read-only and job-type checks have passed.
453 abstract protected function doAck( RunnableJob
$job );
456 * Register the "root job" of a given job into the queue for de-duplication.
457 * This should only be called right *after* all the new jobs have been inserted.
458 * This is used to turn older, duplicate, job entries into no-ops. The root job
459 * information will remain in the registry until it simply falls out of cache.
461 * This requires that $job has two special fields in the "params" array:
462 * - rootJobSignature : hash (e.g. SHA1) that identifies the task
463 * - rootJobTimestamp : TS_MW timestamp of this instance of the task
465 * A "root job" is a conceptual job that consists of potentially many smaller jobs
466 * that are actually inserted into the queue. For example, "refreshLinks" jobs are
467 * spawned when a template is edited. One can think of the task as "update links
468 * of pages that use template X" and an instance of that task as a "root job".
469 * However, what actually goes into the queue are range and leaf job subtypes.
470 * Since these jobs include things like page ID ranges and DB primary positions,
471 * and can morph into smaller jobs recursively, simple duplicate detection
472 * for individual jobs being identical (like that of job_sha1) is not useful.
474 * In the case of "refreshLinks", if these jobs are still in the queue when the template
475 * is edited again, we want all of these old refreshLinks jobs for that template to become
476 * no-ops. This can greatly reduce server load, since refreshLinks jobs involve parsing.
477 * Essentially, the new batch of jobs belong to a new "root job" and the older ones to a
478 * previous "root job" for the same task of "update links of pages that use template X".
480 * This does nothing for certain queue classes.
482 * @internal For use within JobQueue only
483 * @param IJobSpecification $job
484 * @throws JobQueueError
// Register a job's root job for de-duplication: checks read-only state and job type,
// then returns the result of the overridable hook doDeduplicateRootJob().
487 final public function deduplicateRootJob( IJobSpecification
$job ) {
488 $this->assertNotReadOnly();
489 $this->assertMatchingJobType( $job );
491 return $this->doDeduplicateRootJob( $job );
495 * @stable to override
496 * @see JobQueue::deduplicateRootJob()
497 * @param IJobSpecification $job
498 * @throws JobQueueError
// Default root-job registration: stores the job's rootJobTimestamp in the WAN cache under a key
// derived from its rootJobSignature and type, unless an equal or newer timestamp is already stored.
501 protected function doDeduplicateRootJob( IJobSpecification
$job ) {
// $params is null when the job carries no root-job parameters.
502 $params = $job->hasRootJobParams() ?
$job->getRootJobParams() : null;
504 throw new JobQueueError( "Cannot register root job; missing parameters." );
507 $key = $this->getRootJobCacheKey( $params['rootJobSignature'], $job->getType() );
508 // Callers should call JobQueueGroup::push() before this method so that if the
509 // insert fails, the de-duplication registration will be aborted. Having only the
510 // de-duplication registration succeed would cause jobs to become no-ops without
511 // any actual jobs that made them redundant.
512 $timestamp = $this->wanCache
->get( $key ); // last known timestamp of such a root job
// The comparison is >=, so an equal stored timestamp also short-circuits without rewriting the cache.
513 if ( $timestamp !== false && $timestamp >= $params['rootJobTimestamp'] ) {
514 return true; // a newer version of this root job was enqueued
517 // Update the timestamp of the last root job started at the location...
// The return value is whatever the cache set() call returns; the entry is stored with a TTL of ROOTJOB_TTL.
518 return $this->wanCache
->set( $key, $params['rootJobTimestamp'], self
::ROOTJOB_TTL
);
522 * Check if the "root" job of a given job has been superseded by a newer one
524 * @param IJobSpecification $job
525 * @throws JobQueueError
// Checks the job type, then returns the result of the overridable hook doIsRootJobOldDuplicate().
// Unlike deduplicateRootJob(), no read-only assertion is made here; pop() makes one before calling this.
528 final protected function isRootJobOldDuplicate( IJobSpecification
$job ) {
529 $this->assertMatchingJobType( $job );
531 return $this->doIsRootJobOldDuplicate( $job );
535 * @stable to override
536 * @see JobQueue::isRootJobOldDuplicate()
537 * @param IJobSpecification $job
// Default superseded-root-job check: compares the job's rootJobTimestamp with the timestamp stored
// in the WAN cache for the same signature and type.
// Side effect: this check may also write to the cache (see the set() call below).
540 protected function doIsRootJobOldDuplicate( IJobSpecification
$job ) {
// $params is null when the job carries no root-job parameters.
541 $params = $job->hasRootJobParams() ?
$job->getRootJobParams() : null;
543 return false; // job has no de-duplication info
546 $key = $this->getRootJobCacheKey( $params['rootJobSignature'], $job->getType() );
547 // Get the last time this root job was enqueued
548 $timestamp = $this->wanCache
->get( $key );
// Nothing stored, or this job is newer than what is stored: record this job's timestamp.
549 if ( $timestamp === false ||
$params['rootJobTimestamp'] > $timestamp ) {
550 // Update the timestamp of the last known root job started at the location...
551 $this->wanCache
->set( $key, $params['rootJobTimestamp'], self
::ROOTJOB_TTL
);
554 // Check if a new root job was started at the location after this one's...
// $timestamp here is the value read before any update above; only a strictly newer stored value counts.
555 return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
559 * @param string $signature Hash identifier of the root job
560 * @param string $type job type
563 protected function getRootJobCacheKey( $signature, $type ) {
564 return $this->wanCache
->makeGlobalKey(
574 * Delete all unclaimed and delayed jobs from the queue
576 * @throws JobQueueError
// Public entry point for emptying the queue; refuses to run on a read-only queue (JobQueueReadOnlyError).
580 final public function delete() {
581 $this->assertNotReadOnly();
587 * @stable to override
588 * @see JobQueue::delete()
589 * @throws JobQueueError
// Default hook: deletion is unsupported unless a subclass overrides this, so it always throws JobQueueError.
591 protected function doDelete() {
592 throw new JobQueueError( "This method is not implemented." );
596 * Wait for any replica DBs or backup servers to catch up.
598 * This does nothing for certain queue classes.
601 * @throws JobQueueError
// Public, non-overridable entry point; the work is done by the overridable hook doWaitForBackups().
603 final public function waitForBackups() {
604 $this->doWaitForBackups();
608 * @stable to override
609 * @see JobQueue::waitForBackups()
// Overridable hook invoked by waitForBackups().
612 protected function doWaitForBackups() {
616 * Clear any process and persistent caches
// Public, non-overridable entry point; the work is done by the overridable hook doFlushCaches().
620 final public function flushCaches() {
621 $this->doFlushCaches();
625 * @stable to override
626 * @see JobQueue::flushCaches()
// Overridable hook invoked by flushCaches().
629 protected function doFlushCaches() {
633 * Get an iterator to traverse over all available jobs in this queue.
634 * This does not include jobs that are currently acquired or delayed.
635 * Note: results may be stale if the queue is concurrently modified.
637 * @return Iterator<RunnableJob>
638 * @throws JobQueueError
// Must be implemented by every subclass; unlike the delayed/acquired/abandoned iterators there is no default.
640 abstract public function getAllQueuedJobs();
643 * Get an iterator to traverse over all delayed jobs in this queue.
644 * Note: results may be stale if the queue is concurrently modified.
646 * @stable to override
647 * @return Iterator<RunnableJob>
648 * @throws JobQueueError
// Default implementation yields nothing: an ArrayIterator over an empty array.
651 public function getAllDelayedJobs() {
652 return new ArrayIterator( [] ); // not implemented
656 * Get an iterator to traverse over all claimed jobs in this queue
658 * Callers should be quick to iterate over it or few results
659 * will be returned due to jobs being acknowledged and deleted
661 * @stable to override
662 * @return Iterator<RunnableJob>
663 * @throws JobQueueError
// Default implementation yields nothing: an ArrayIterator over an empty array.
666 public function getAllAcquiredJobs() {
667 return new ArrayIterator( [] ); // not implemented
671 * Get an iterator to traverse over all abandoned jobs in this queue
673 * @stable to override
674 * @return Iterator<RunnableJob>
675 * @throws JobQueueError
// Default implementation yields nothing: an ArrayIterator over an empty array.
678 public function getAllAbandonedJobs() {
679 return new ArrayIterator( [] ); // not implemented
683 * Do not use this function outside of JobQueue/JobQueueGroup
685 * @stable to override
686 * @return string|null
689 public function getCoalesceLocationInternal() {
694 * Check whether each of the given queues are empty.
695 * This is used for batching checks for queues stored at the same place.
697 * @param array $types List of queue types
698 * @return array|null (list of non-empty queue types) or null if unsupported
699 * @throws JobQueueError
// Public, non-overridable wrapper; returns the result of the hook doGetSiblingQueuesWithJobs() unchanged.
702 final public function getSiblingQueuesWithJobs( array $types ) {
703 return $this->doGetSiblingQueuesWithJobs( $types );
707 * @stable to override
708 * @see JobQueue::getSiblingQueuesWithJobs()
709 * @param array $types List of queue types
710 * @return array|null (list of queue types) or null if unsupported
// Default hook: returns null (batched emptiness checks unsupported) regardless of $types.
712 protected function doGetSiblingQueuesWithJobs( array $types ) {
713 return null; // not supported
717 * Check the size of each of the given queues.
718 * For queues not served by the same store as this one, 0 is returned.
719 * This is used for batching checks for queues stored at the same place.
721 * @param array $types List of queue types
722 * @return array|null (job type => queue size) or null if unsupported
723 * @throws JobQueueError
// Public, non-overridable wrapper; returns the result of the hook doGetSiblingQueueSizes() unchanged.
726 final public function getSiblingQueueSizes( array $types ) {
727 return $this->doGetSiblingQueueSizes( $types );
731 * @stable to override
732 * @see JobQueue::getSiblingQueueSizes()
733 * @param array $types List of queue types
734 * @return array|null (job type => queue size) or null if unsupported
// Default hook: returns null (batched size checks unsupported) regardless of $types.
736 protected function doGetSiblingQueueSizes( array $types ) {
737 return null; // not supported
741 * @param string $command
742 * @param array $params
// Build a job object through the JobFactory obtained in the constructor; both arguments are forwarded as-is.
745 protected function factoryJob( $command, $params ) {
746 return $this->jobFactory
->newJob( $command, $params );
750 * @throws JobQueueReadOnlyError
// Guard used by the mutating entry points (batchPush, pop, ack, deduplicateRootJob, delete).
// Any readOnlyReason other than exactly false makes the queue read-only; the reason is included in the message.
752 protected function assertNotReadOnly() {
753 if ( $this->readOnlyReason
!== false ) {
754 throw new JobQueueReadOnlyError( "Job queue is read-only: {$this->readOnlyReason}" );
759 * @param IJobSpecification $job
760 * @throws JobQueueError
// Guard that a job belongs to this queue: throws JobQueueError when the job's type differs from the queue's type.
762 private function assertMatchingJobType( IJobSpecification
$job ) {
// Type-agnostic queues get their own branch ahead of the type comparison.
// NOTE(review): presumably that branch skips the comparison — confirm against the full file.
763 if ( $this->typeAgnostic
) {
766 if ( $job->getType() !== $this->type
) {
767 throw new JobQueueError( "Got '{$job->getType()}' job; expected '{$this->type}'." );
772 * Call StatsdDataFactoryInterface::updateCount() for the queue overall and for the queue type
774 * @param string $key Event type
775 * @param string $type Job type
// Record $delta (default 1) against two counters: the aggregate "jobqueue.{$key}.all"
// and the per-type "jobqueue.{$key}.{$type}".
779 protected function incrStats( $key, $type, $delta = 1 ) {
780 $this->stats
->updateCount( "jobqueue.{$key}.all", $delta );
781 $this->stats
->updateCount( "jobqueue.{$key}.{$type}", $delta );
785 * Subclasses should set this to true if they support type agnostic queues
790 protected function supportsTypeAgnostic(): bool {