ParsoidParser: Record ParserOptions watcher on ParserOutput object
[mediawiki.git] / includes / jobqueue / JobQueue.php
blob7acc10858e8d629f1a7764cb59613b83af243312
1 <?php
2 /**
3 * Job queue base code.
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 * http://www.gnu.org/copyleft/gpl.html
20 * @file
21 * @defgroup JobQueue JobQueue
23 use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
24 use MediaWiki\JobQueue\JobFactory;
25 use MediaWiki\MediaWikiServices;
26 use MediaWiki\WikiMap\WikiMap;
27 use Wikimedia\RequestTimeout\TimeoutException;
28 use Wikimedia\UUID\GlobalIdGenerator;
30 /**
31 * Class to handle enqueueing and running of background jobs
33 * See [the architecture doc](@ref jobqueuearch) for more information.
35 * @ingroup JobQueue
36 * @since 1.21
37 * @stable to extend
39 abstract class JobQueue {
/** @var string DB domain ID of the wiki whose jobs this queue holds */
protected $domain;
/** @var string Job type handled by this queue ('default' for type-agnostic queues) */
protected $type;
/** @var string Order in which pop() selects jobs (see supportedOrders()/optimalOrder()) */
protected $order;
/** @var int Seconds a job can be claimed for exclusive execution; 0 (the default) means forever */
protected $claimTTL;
/** @var int Maximum number of times to try a job */
protected $maxTries;
/** @var string|false Read-only rationale (or false if the queue is read/write) */
protected $readOnlyReason;
/** @var StatsdDataFactoryInterface Stats sink used by incrStats() */
protected $stats;
/** @var GlobalIdGenerator */
protected $idGenerator;

/** @var WANObjectCache Cache holding root job de-duplication timestamps */
protected $wanCache;

/** @var bool Whether this queue accepts jobs of any type */
protected $typeAgnostic;

/** @var JobFactory Used by factoryJob() to build job objects */
private JobFactory $jobFactory;

/** Flag for push()/batchPush(): "all-or-nothing" job insertions */
protected const QOS_ATOMIC = 1;

/** Seconds to remember root jobs for de-duplication (28 days) */
protected const ROOTJOB_TTL = 2419200;
/**
 * @stable to call
 *
 * @param array $params
 *  - type : A job type, 'default' if typeAgnostic is set
 *  - domain : A DB domain ID ('wiki' is still accepted as a legacy alias)
 *  - idGenerator : A GlobalIdGenerator instance.
 *  - wanCache : An instance of WANObjectCache to use for caching [default: an empty cache]
 *  - stats : An instance of StatsdDataFactoryInterface [default: a null stats sink]
 *  - claimTTL : Seconds a job can be claimed for exclusive execution [default: forever]
 *  - maxTries : Total times a job can be tried, assuming claims expire [default: 3]
 *  - order : Queue order, one of ("fifo", "timestamp", "random"), or "any" to let the
 *     subclass choose via optimalOrder() [default: variable]
 *  - readOnlyReason : Mark the queue as read-only with this reason [default: false]
 *  - typeAgnostic : If the jobqueue should operate agnostic to the job types [default: false]
 * @throws JobQueueError If the resolved order is not listed by supportedOrders(), or if a
 *  type-agnostic queue is requested from a class that does not support it
 */
protected function __construct( array $params ) {
	$this->domain = $params['domain'] ?? $params['wiki']; // b/c
	$this->type = $params['type'];
	$this->claimTTL = $params['claimTTL'] ?? 0;
	$this->maxTries = $params['maxTries'] ?? 3;

	// No explicit order (or "any") means: use whatever order the subclass finds fastest
	if ( isset( $params['order'] ) && $params['order'] !== 'any' ) {
		$this->order = $params['order'];
	} else {
		$this->order = $this->optimalOrder();
	}
	// Orders are strings; compare strictly to avoid PHP loose-comparison surprises
	if ( !in_array( $this->order, $this->supportedOrders(), true ) ) {
		// static::class names the concrete subclass; __CLASS__ would always say "JobQueue"
		throw new JobQueueError( static::class . " does not support '{$this->order}' order." );
	}

	$this->readOnlyReason = $params['readOnlyReason'] ?? false;
	$this->stats = $params['stats'] ?? new NullStatsdDataFactory();
	$this->wanCache = $params['wanCache'] ?? WANObjectCache::newEmpty();
	$this->idGenerator = $params['idGenerator'];

	$typeAgnostic = $params['typeAgnostic'] ?? false;
	if ( $typeAgnostic && !$this->supportsTypeAgnostic() ) {
		throw new JobQueueError( static::class . " does not support type agnostic queues." );
	}
	$this->typeAgnostic = $typeAgnostic;
	if ( $this->typeAgnostic ) {
		// Type-agnostic queues accept every job type under a single placeholder name
		$this->type = 'default';
	}

	$this->jobFactory = MediaWikiServices::getInstance()->getJobFactory();
}
/**
 * Get a job queue object of the specified type.
 * $params includes:
 *   - class : The JobQueue subclass to instantiate (must extend JobQueue)
 *   - domain : Database domain ID of the wiki the jobs are for
 *   - type : The name of the job types this queue handles
 *   - order : Order that pop() selects jobs, one of "fifo", "timestamp" or "random".
 *             If "fifo" is used, the queue will effectively be FIFO. Note that job
 *             completion will not appear to be exactly FIFO if there are multiple
 *             job runners since jobs can take different times to finish once popped.
 *             If "timestamp" is used, the queue will at least be loosely ordered
 *             by timestamp, allowing for some jobs to be popped off out of order.
 *             If "random" is used, pop() will pick jobs in random order.
 *             Note that it may only be weakly random (e.g. a lottery of the oldest X).
 *             If "any" is chosen, the queue will use whatever order is the fastest.
 *             This might be useful for improving concurrency for job acquisition.
 *   - claimTTL : If supported, the queue will recycle jobs that have been popped
 *                but not acknowledged as completed after this many seconds. Recycling
 *                of jobs simply means re-inserting them into the queue. Jobs can be
 *                attempted up to "maxTries" times before being discarded.
 *   - maxTries : Total times a job can be tried, assuming claims expire. [optional, default 3]
 *   - readOnlyReason : Set this to a string to make the queue read-only. [optional]
 *   - idGenerator : A GlobalIdGenerator instance.
 *   - stats : A StatsdDataFactoryInterface. [optional]
 *   - wanCache : A WANObjectCache used for root job de-duplication. [optional]
 *   - typeAgnostic : Whether the queue should accept jobs of any type. [optional]
 *
 * The whole $params array is passed on to the subclass constructor.
 * Queue classes should throw an exception if they do not support the options given.
 *
 * @param array $params
 * @return JobQueue
 * @throws JobQueueError If "class" does not exist or is not a JobQueue subclass
 */
final public static function factory( array $params ) {
	$class = $params['class'];
	if ( !class_exists( $class ) ) {
		throw new JobQueueError( "Invalid job queue class '$class'." );
	}
	// Check the hierarchy before instantiating, so an unrelated class's
	// constructor is never run with queue parameters
	if ( !is_a( $class, self::class, true ) ) {
		throw new JobQueueError( "Class '$class' is not a " . __CLASS__ . " class." );
	}

	return new $class( $params );
}
/**
 * Get the DB domain ID this queue was configured with.
 *
 * @return string Database domain ID
 */
final public function getDomain() {
	return $this->domain;
}
/**
 * Get the wiki ID derived from this queue's DB domain ID.
 * Each call reports its deprecation through wfDeprecated().
 *
 * @deprecated since 1.33 (hard deprecated since 1.37)
 * @return string Wiki ID
 */
final public function getWiki() {
	wfDeprecated( __METHOD__, '1.33' );
	$domainId = $this->domain;

	return WikiMap::getWikiIdFromDbDomain( $domainId );
}
/**
 * Get the job type served by this queue ('default' for type-agnostic queues).
 *
 * @return string Job type that this queue handles
 */
final public function getType() {
	return $this->type;
}
/**
 * Get the order pop() uses, as resolved by the constructor.
 *
 * @return string One of (random, timestamp, fifo, undefined)
 */
final public function getOrder() {
	return $this->order;
}
/**
 * List the queue orders this implementation can honour.
 * The constructor rejects any configured order that is not in this list.
 *
 * @return array Subset of (random, timestamp, fifo, undefined)
 */
abstract protected function supportedOrders();
/**
 * Pick the order to use when configuration gives none (or asks for "any").
 *
 * @return string One of (random, timestamp, fifo, undefined)
 */
abstract protected function optimalOrder();
/**
 * Report whether this queue class can hold delayed jobs.
 * batchPush() rejects jobs with a release timestamp when this returns false.
 * The base implementation does not support delays.
 *
 * @stable to override
 * @return bool Whether delayed jobs are supported
 */
protected function supportsDelayedJobs() {
	// Delayed jobs are not implemented at this level
	return false;
}
/**
 * Public accessor for supportsDelayedJobs().
 *
 * @return bool Whether delayed jobs are enabled
 * @since 1.22
 */
final public function delayedJobsEnabled() {
	return $this->supportsDelayedJobs();
}
/**
 * Get the reason this queue was made read-only, if any.
 *
 * @return string|false Read-only rationale, or false if the queue is read/write
 * @since 1.27
 */
public function getReadOnlyReason() {
	return $this->readOnlyReason;
}
/**
 * Quickly check whether the queue has no available (unacquired, non-delayed) jobs.
 * Queue classes that are slow without memcached should cache this answer.
 *
 * With such caching, this may report a non-empty queue after the last job is gone.
 * A pop() that comes back empty should correct the cache, and calling flushCaches()
 * beforehand avoids the problem. In practice this is indistinguishable from the
 * ordinary race between isEmpty() and pop().
 *
 * @return bool
 * @throws JobQueueError
 */
final public function isEmpty() {
	return $this->doIsEmpty();
}
/**
 * Backend hook for isEmpty().
 *
 * @see JobQueue::isEmpty()
 * @return bool
 */
abstract protected function doIsEmpty();
/**
 * Count the available (unacquired, non-delayed) jobs in the queue.
 * Queue classes that are slow without memcached should cache this count,
 * in which case it may lag behind reality by up to a minute.
 *
 * @return int
 * @throws JobQueueError
 */
final public function getSize() {
	return $this->doGetSize();
}
/**
 * Backend hook for getSize().
 *
 * @see JobQueue::getSize()
 * @return int
 */
abstract protected function doGetSize();
/**
 * Count the acquired jobs, which are temporarily out of the queue.
 * Queue classes that are slow without memcached should cache this count,
 * in which case it may lag behind reality by up to a minute.
 *
 * @return int
 * @throws JobQueueError
 */
final public function getAcquiredCount() {
	return $this->doGetAcquiredCount();
}
/**
 * Backend hook for getAcquiredCount().
 *
 * @see JobQueue::getAcquiredCount()
 * @return int
 */
abstract protected function doGetAcquiredCount();
/**
 * Count the delayed jobs, which are temporarily out of the queue.
 * Queue classes that are slow without memcached should cache this count,
 * in which case it may lag behind reality by up to a minute.
 *
 * @return int
 * @throws JobQueueError
 * @since 1.22
 */
final public function getDelayedCount() {
	return $this->doGetDelayedCount();
}
/**
 * Backend hook for getDelayedCount(); the base class reports no delayed jobs.
 *
 * @stable to override
 * @see JobQueue::getDelayedCount()
 * @return int
 */
protected function doGetDelayedCount() {
	// Not implemented at this level
	return 0;
}
/**
 * Count the acquired jobs that have used up their attempts.
 * Queue classes that are slow without memcached should cache this count,
 * in which case it may lag behind reality by up to a minute.
 *
 * @return int
 * @throws JobQueueError
 */
final public function getAbandonedCount() {
	return $this->doGetAbandonedCount();
}
/**
 * Backend hook for getAbandonedCount(); the base class reports no abandoned jobs.
 *
 * @stable to override
 * @see JobQueue::getAbandonedCount()
 * @return int
 */
protected function doGetAbandonedCount() {
	// Not implemented at this level
	return 0;
}
/**
 * Insert one job, or a list of jobs, into the queue.
 * This does not require $wgJobClasses to be set for the given job type.
 * Outside callers should use JobQueueGroup::push() instead of this function.
 *
 * @param IJobSpecification|IJobSpecification[] $jobs
 * @param int $flags Bitfield (supports JobQueue::QOS_ATOMIC)
 * @return void
 * @throws JobQueueError
 */
final public function push( $jobs, $flags = 0 ) {
	// A lone job specification is shorthand for a one-element batch
	if ( !is_array( $jobs ) ) {
		$jobs = [ $jobs ];
	}

	$this->batchPush( $jobs, $flags );
}
/**
 * Insert a batch of jobs into the queue.
 * Every job is validated (matching type; no release timestamp unless delays are
 * supported) before anything is inserted. Root jobs in the batch are registered
 * for de-duplication once the insert has completed.
 * This does not require $wgJobClasses to be set for the given job type.
 * Outside callers should use JobQueueGroup::push() instead of this function.
 *
 * @param IJobSpecification[] $jobs
 * @param int $flags Bitfield (supports JobQueue::QOS_ATOMIC)
 * @return void
 * @throws JobQueueError
 */
final public function batchPush( array $jobs, $flags = 0 ) {
	$this->assertNotReadOnly();

	if ( !$jobs ) {
		return; // empty batch; nothing to insert
	}

	// Validate the whole batch up front
	foreach ( $jobs as $candidate ) {
		$this->assertMatchingJobType( $candidate );
		if ( $candidate->getReleaseTimestamp() && !$this->supportsDelayedJobs() ) {
			throw new JobQueueError(
				"Got delayed '{$candidate->getType()}' job; delays are not supported." );
		}
	}

	$this->doBatchPush( $jobs, $flags );

	// Register root jobs only after the insert, so a failed insert cannot leave a
	// de-duplication entry that turns older jobs into no-ops
	foreach ( $jobs as $inserted ) {
		if ( $inserted->isRootJob() ) {
			$this->deduplicateRootJob( $inserted );
		}
	}
}
/**
 * Backend hook for batchPush(); receives an already-validated, non-empty batch.
 *
 * @see JobQueue::batchPush()
 * @param IJobSpecification[] $jobs
 * @param int $flags
 */
abstract protected function doBatchPush( array $jobs, $flags );
/**
 * Take a job off the queue.
 * If the job's root job has been superseded, it is returned as a no-op DuplicateJob.
 * This requires $wgJobClasses to be set for the given job type.
 * Outside callers should use JobQueueGroup::pop() instead of this function.
 *
 * @throws JobQueueError
 * @return RunnableJob|false Returns false if there are no jobs
 */
final public function pop() {
	$this->assertNotReadOnly();

	$job = $this->doPop();
	if ( !$job ) {
		return $job;
	}

	try {
		// Jobs belonging to an older instance of their root job become no-ops
		if ( $this->isRootJobOldDuplicate( $job ) ) {
			$this->incrStats( 'dupe_pops', $job->getType() );
			$job = DuplicateJob::newFromJob( $job );
		}
	} catch ( TimeoutException $e ) {
		throw $e;
	} catch ( Exception $e ) {
		// Best effort: a de-duplication failure must not lose the job
	}

	return $job;
}
/**
 * Backend hook for pop().
 *
 * @see JobQueue::pop()
 * @return RunnableJob|false
 */
abstract protected function doPop();
/**
 * Mark a popped job as completed.
 *
 * Some queue classes ignore this, as do queues without a "claimTTL".
 * Outside callers should use JobQueueGroup::ack() instead of this function.
 *
 * @param RunnableJob $job
 * @return void
 * @throws JobQueueError
 */
final public function ack( RunnableJob $job ) {
	$this->assertNotReadOnly();
	$this->assertMatchingJobType( $job );

	$this->doAck( $job );
}
/**
 * Backend hook for ack().
 *
 * @see JobQueue::ack()
 * @param RunnableJob $job
 */
abstract protected function doAck( RunnableJob $job );
/**
 * Register the "root job" of a given job with the queue, for de-duplication.
 * Call this only *after* all the new jobs have been inserted. Once registered,
 * older queued jobs for the same task turn into no-ops. The registration lasts
 * until it drops out of the cache.
 *
 * The job's "params" array must carry two special fields:
 *   - rootJobSignature : hash (e.g. SHA1) that identifies the task
 *   - rootJobTimestamp : TS_MW timestamp of this instance of the task
 *
 * A "root job" is a conceptual job that consists of potentially many smaller jobs
 * that are actually inserted into the queue. For example, "refreshLinks" jobs are
 * spawned when a template is edited. The task is "update links of pages that use
 * template X", and one instance of that task is a "root job".
 * What actually goes into the queue are range and leaf job subtypes. These carry
 * things like page ID ranges and DB primary positions, and can morph recursively
 * into smaller jobs. Detecting identical individual jobs (like job_sha1 does) is
 * therefore not useful here.
 *
 * Suppose the old refreshLinks jobs for a template are still queued when it is
 * edited again. We want all of them to become no-ops. This can greatly reduce
 * server load, since refreshLinks jobs involve parsing. The new batch of jobs
 * belongs to a new "root job" and the older ones to a previous "root job" for
 * the same task.
 *
 * This does nothing for certain queue classes.
 *
 * @internal For use within JobQueue only
 * @param IJobSpecification $job
 * @throws JobQueueError
 * @return bool
 */
final public function deduplicateRootJob( IJobSpecification $job ) {
	$this->assertNotReadOnly();
	$this->assertMatchingJobType( $job );

	return $this->doDeduplicateRootJob( $job );
}
/**
 * Default root job registration: store the job's root timestamp in the WAN cache
 * unless an equal or newer timestamp is already recorded for the same signature.
 *
 * @stable to override
 * @see JobQueue::deduplicateRootJob()
 * @param IJobSpecification $job
 * @throws JobQueueError If the job lacks root job parameters
 * @return bool True if a newer root job was already recorded, else the result of the cache write
 */
protected function doDeduplicateRootJob( IJobSpecification $job ) {
	$rootParams = $job->hasRootJobParams() ? $job->getRootJobParams() : null;
	if ( !$rootParams ) {
		throw new JobQueueError( "Cannot register root job; missing parameters." );
	}

	$cacheKey = $this->getRootJobCacheKey( $rootParams['rootJobSignature'], $job->getType() );
	$newTimestamp = $rootParams['rootJobTimestamp'];

	// Callers should call JobQueueGroup::push() before this method. If the insert
	// fails, registration is then never attempted. A registration without the jobs
	// would make older jobs no-ops with nothing actually replacing them.
	$knownTimestamp = $this->wanCache->get( $cacheKey ); // last known timestamp of this root job
	$newerAlreadyKnown = ( $knownTimestamp !== false && $knownTimestamp >= $newTimestamp );
	if ( $newerAlreadyKnown ) {
		return true;
	}

	// Remember this instance as the latest one for the task
	return $this->wanCache->set( $cacheKey, $newTimestamp, self::ROOTJOB_TTL );
}
/**
 * Determine whether a newer instance of this job's root job has been registered.
 *
 * @param IJobSpecification $job
 * @throws JobQueueError
 * @return bool
 */
final protected function isRootJobOldDuplicate( IJobSpecification $job ) {
	$this->assertMatchingJobType( $job );

	return $this->doIsRootJobOldDuplicate( $job );
}
/**
 * Default superseded-root-job check, backed by the WAN cache.
 * If this job's root timestamp is newer than the cached one (or nothing is
 * cached), the cache is refreshed with it as a side effect.
 *
 * @stable to override
 * @see JobQueue::isRootJobOldDuplicate()
 * @param IJobSpecification $job
 * @return bool True if a later root job was recorded for the same signature
 */
protected function doIsRootJobOldDuplicate( IJobSpecification $job ) {
	$rootParams = $job->hasRootJobParams() ? $job->getRootJobParams() : null;
	if ( !$rootParams ) {
		return false; // nothing to de-duplicate against
	}

	$cacheKey = $this->getRootJobCacheKey( $rootParams['rootJobSignature'], $job->getType() );
	$jobTimestamp = $rootParams['rootJobTimestamp'];

	// Most recent enqueue time recorded for this root job
	$latestTimestamp = $this->wanCache->get( $cacheKey );
	if ( $latestTimestamp === false || $jobTimestamp > $latestTimestamp ) {
		$this->wanCache->set( $cacheKey, $jobTimestamp, self::ROOTJOB_TTL );
	}

	return ( $latestTimestamp && $latestTimestamp > $jobTimestamp );
}
/**
 * Build the global WAN cache key under which a root job's latest timestamp is stored.
 * The key is scoped by this queue's DB domain and by job type.
 *
 * @param string $signature Hash identifier of the root job
 * @param string $type job type
 * @return string
 */
protected function getRootJobCacheKey( $signature, $type ) {
	$keyParts = [ 'jobqueue', $this->domain, $type, 'rootjob', $signature ];

	return $this->wanCache->makeGlobalKey( ...$keyParts );
}
/**
 * Remove every unclaimed and delayed job from the queue.
 *
 * @throws JobQueueError
 * @since 1.22
 * @return void
 */
final public function delete() {
	$this->assertNotReadOnly();

	$this->doDelete();
}
/**
 * Backend hook for delete(); the base class always throws.
 *
 * @stable to override
 * @see JobQueue::delete()
 * @throws JobQueueError
 */
protected function doDelete() {
	throw new JobQueueError( "This method is not implemented." );
}
/**
 * Block until any replica DBs or backup servers have caught up.
 * Certain queue classes treat this as a no-op.
 *
 * @return void
 * @throws JobQueueError
 */
final public function waitForBackups() {
	$this->doWaitForBackups();
}
/**
 * Backend hook for waitForBackups(); does nothing by default.
 *
 * @stable to override
 * @see JobQueue::waitForBackups()
 * @return void
 */
protected function doWaitForBackups() {
	// No-op by default
}
/**
 * Drop any process-level and persistent caches kept by the queue.
 *
 * @return void
 */
final public function flushCaches() {
	$this->doFlushCaches();
}
/**
 * Backend hook for flushCaches(); does nothing by default.
 *
 * @stable to override
 * @see JobQueue::flushCaches()
 * @return void
 */
protected function doFlushCaches() {
	// No-op by default
}
/**
 * Iterate over every available job in this queue.
 * Jobs that are currently acquired or delayed are excluded.
 * Results may be stale if the queue is modified concurrently.
 *
 * @return Iterator<RunnableJob>
 * @throws JobQueueError
 */
abstract public function getAllQueuedJobs();
/**
 * Iterate over every delayed job in this queue; the base class yields nothing.
 * Results may be stale if the queue is modified concurrently.
 *
 * @stable to override
 * @return Iterator<RunnableJob>
 * @throws JobQueueError
 * @since 1.22
 */
public function getAllDelayedJobs() {
	// Not implemented at this level
	return new ArrayIterator( [] );
}
/**
 * Iterate over every claimed job in this queue; the base class yields nothing.
 *
 * Callers should iterate over it quickly. Jobs are acknowledged and deleted
 * meanwhile, so a slow iteration returns few results.
 *
 * @stable to override
 * @return Iterator<RunnableJob>
 * @throws JobQueueError
 * @since 1.26
 */
public function getAllAcquiredJobs() {
	// Not implemented at this level
	return new ArrayIterator( [] );
}
/**
 * Iterate over every abandoned job in this queue; the base class yields nothing.
 *
 * @stable to override
 * @return Iterator<RunnableJob>
 * @throws JobQueueError
 * @since 1.25
 */
public function getAllAbandonedJobs() {
	// Not implemented at this level
	return new ArrayIterator( [] );
}
/**
 * Internal hook, returning null in the base class.
 * NOTE(review): presumably identifies the storage location shared by sibling
 * queues, so checks can be coalesced; confirm against JobQueueGroup.
 *
 * Do not use this function outside of JobQueue/JobQueueGroup.
 *
 * @stable to override
 * @return string|null
 * @since 1.22
 */
public function getCoalesceLocationInternal() {
	return null;
}
/**
 * Find which of the given queues are non-empty, in a single batched check.
 * Used for queues that share the same storage location.
 *
 * @param array $types List of queues types
 * @return array|null (list of non-empty queue types) or null if unsupported
 * @throws JobQueueError
 * @since 1.22
 */
final public function getSiblingQueuesWithJobs( array $types ) {
	return $this->doGetSiblingQueuesWithJobs( $types );
}
/**
 * Backend hook for getSiblingQueuesWithJobs(); unsupported by default.
 *
 * @stable to override
 * @see JobQueue::getSiblingQueuesWithJobs()
 * @param array $types List of queues types
 * @return array|null (list of queue types) or null if unsupported
 */
protected function doGetSiblingQueuesWithJobs( array $types ) {
	// Batched checks are not supported at this level
	return null;
}
/**
 * Check the size of each of the given queues.
 * For queues not served by the same store as this one, 0 is returned.
 * This is used for batching checks for queues stored at the same place.
 *
 * @param array $types List of queues types
 * @return array|null Map of (job type => queue size) or null if unsupported
 * @throws JobQueueError
 * @since 1.22
 */
final public function getSiblingQueueSizes( array $types ) {
	return $this->doGetSiblingQueueSizes( $types );
}
/**
 * Backend hook for getSiblingQueueSizes(); unsupported by default.
 *
 * @stable to override
 * @see JobQueue::getSiblingQueueSizes()
 * @param array $types List of queues types
 * @return array|null Map of (job type => queue size) or null if unsupported
 */
protected function doGetSiblingQueueSizes( array $types ) {
	return null; // not supported
}
/**
 * Build a job object through the JobFactory obtained in the constructor.
 *
 * @param string $command
 * @param array $params
 * @return Job
 */
protected function factoryJob( $command, $params ) {
	$factory = $this->jobFactory;

	return $factory->newJob( $command, $params );
}
/**
 * Guard for mutating operations: fail if the queue was configured read-only.
 *
 * @throws JobQueueReadOnlyError
 */
protected function assertNotReadOnly() {
	if ( $this->readOnlyReason === false ) {
		return;
	}

	throw new JobQueueReadOnlyError( "Job queue is read-only: {$this->readOnlyReason}" );
}
/**
 * Ensure a job belongs to this queue's type; type-agnostic queues accept anything.
 *
 * @param IJobSpecification $job
 * @throws JobQueueError
 */
private function assertMatchingJobType( IJobSpecification $job ) {
	if ( !$this->typeAgnostic && $job->getType() !== $this->type ) {
		throw new JobQueueError( "Got '{$job->getType()}' job; expected '{$this->type}'." );
	}
}
/**
 * Bump a stats counter twice: once in the cross-type "all" bucket and once
 * under the given job type.
 *
 * @param string $key Event type
 * @param string $type Job type
 * @param int $delta
 * @since 1.22
 */
protected function incrStats( $key, $type, $delta = 1 ) {
	$this->stats->updateCount( "jobqueue.{$key}.all", $delta );
	$this->stats->updateCount( "jobqueue.{$key}.{$type}", $delta );
}
/**
 * Whether this queue class can run as a type-agnostic queue.
 * The constructor rejects "typeAgnostic" when this returns false.
 * Subclasses that support such queues should override this to return true.
 *
 * @return bool
 * @since 1.38
 */
protected function supportsTypeAgnostic(): bool {
	return false;
}