<?php
/**
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 * http://www.gnu.org/copyleft/gpl.html
 *
 * @file
 */

use MediaWiki\Logger\LoggerFactory;
use Wikimedia\Rdbms\IDatabase;
use Wikimedia\ScopedCallback;

/**
 * Defer callable updates to run later in the PHP process
 *
 * This is a performance feature that enables MediaWiki to produce faster web responses.
 * It allows you to postpone non-blocking work (e.g. work that does not change the web
 * response) to after the HTTP response has been sent to the client (i.e. web browser).
 *
 * Once the response is finalized and sent to the browser, the webserver process stays
 * for a little while longer (detached from the web request) to run your POSTSEND tasks.
 *
 * There is also a PRESEND option, which runs your task right before the finalized response
 * is sent to the browser. This is for critical tasks that do need to block the response,
 * but where you'd like to benefit from other DeferredUpdates features, such as:
 *
 * - MergeableUpdate: batch updates from different components without coupling
 *   or awareness of each other.
 * - Automatic cancellation: pass an IDatabase object (for any wiki or database) to
 *   DeferredUpdates::addCallableUpdate or AtomicSectionUpdate.
 * - Reducing lock contention: if the response is likely to take several seconds
 *   (e.g. uploading a large file to FileBackend, or saving an edit to a large article)
 *   much of that work may overlap with a database transaction that is staying open for
 *   the entire duration. By moving contentious writes out to a PRESEND update, these
 *   get their own transaction (after the main one is committed), which gives up some
 *   atomicity for improved throughput.
 *
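 * A minimal usage sketch of the two stages. The callback bodies are placeholders
 * for illustration and are not part of this class:
 *
 * @code
 *     // POSTSEND (the default): runs after the response has been sent.
 *     DeferredUpdates::addCallableUpdate( static function () {
 *         // ... work that does not affect the response ...
 *     } );
 *
 *     // PRESEND: runs right before the finalized response is sent.
 *     DeferredUpdates::addCallableUpdate(
 *         static function () {
 *             // ... contentious write that gets its own transaction ...
 *         },
 *         DeferredUpdates::PRESEND
 *     );
 * @endcode
 *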
 * ## Expectation and comparison to job queue
 *
 * When scheduling a POSTSEND update via the DeferredUpdates system you can generally expect
 * it to complete well before the client makes their next request. Updates run directly after
 * the web response is sent, from the same process on the same server. This is unlike the
 * JobQueue, where jobs may need to wait in line for some minutes or hours.
 *
 * If your update fails, this failure is not known to the client and gets no retry. For updates
 * that need retries for system consistency or data integrity, it is recommended to implement
 * them as a job instead and use JobQueueGroup::lazyPush. This has the caveat of being delayed
 * by default, the same as any other job.
 *
 * A hybrid solution is available via the EnqueueableDataUpdate interface. By implementing
 * this interface, you can queue your update via the DeferredUpdates first, and if it fails,
 * the system will automatically catch this and queue it as a job instead.
 *
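 * A hedged sketch of such a hybrid update. The class name, the job type
 * 'exampleRefresh' and its parameters are hypothetical, and the 'domain'/'job'
 * array shape returned by getAsJobSpecification() is an assumption based on the
 * EnqueueableDataUpdate interface documentation:
 *
 * @code
 *     class ExampleRefreshUpdate extends DataUpdate implements EnqueueableDataUpdate {
 *         private int $pageId;
 *
 *         public function __construct( int $pageId ) {
 *             $this->pageId = $pageId;
 *         }
 *
 *         public function doUpdate() {
 *             // ... work that may throw ...
 *         }
 *
 *         public function getAsJobSpecification() {
 *             // Used when the update is converted into a job, e.g. after
 *             // doUpdate() threw. A job class for the (hypothetical)
 *             // 'exampleRefresh' type must be registered separately.
 *             return [
 *                 'domain' => WikiMap::getCurrentWikiDbDomain()->getId(),
 *                 'job' => new JobSpecification( 'exampleRefresh', [ 'pageId' => $this->pageId ] ),
 *             ];
 *         }
 *     }
 *
 *     DeferredUpdates::addUpdate( new ExampleRefreshUpdate( $pageId ) );
 * @endcode
 *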
 * ## How it works during web requests
 *
 * 1. Your request route is executed (e.g. Action or SpecialPage class, or API).
 * 2. Output is finalized and main database transaction is committed.
 * 3. PRESEND updates run via DeferredUpdates::doUpdates.
 * 4. The web response is sent to the browser.
 * 5. POSTSEND updates run via DeferredUpdates::doUpdates.
 *
 * @see MediaWiki::preOutputCommit
 * @see MediaWiki::restInPeace
 *
 * ## How it works for Maintenance scripts
 *
 * In CLI mode, no distinction is made between PRESEND and POSTSEND deferred updates,
 * and the queue is periodically executed throughout the process.
 *
 * @see DeferredUpdates::tryOpportunisticExecute
 *
 * ## How it works internally
 *
 * Each update is added via DeferredUpdates::addUpdate and stored in either the PRESEND or
 * POSTSEND queue. If an update gets queued while another update is already running, then
 * we store it in a "sub"-queue associated with the current update. This allows nested updates
 * to be completed before other updates, which improves ordering for process caching.
 *
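 * As an illustration of that ordering, here is a sketch for a web request using
 * placeholder callbacks. MergeableUpdate instances are handled differently, as
 * described at DeferredUpdates::addUpdate:
 *
 * @code
 *     DeferredUpdates::addCallableUpdate( static function () {
 *         // "A". The update queued below is added while A is running,
 *         // so it goes into A's sub-queue.
 *         DeferredUpdates::addCallableUpdate( static function () {
 *             // "C"
 *         } );
 *     } );
 *     DeferredUpdates::addCallableUpdate( static function () {
 *         // "B"
 *     } );
 *     // Expected POSTSEND execution order: A, C, B.
 * @endcode
 *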
 * @since 1.19
 */
class DeferredUpdates {
	/** @var int Process all updates; in web requests, use only after flushing output buffer */
	public const ALL = 0;
	/** @var int Specify/process updates that should run before flushing output buffer */
	public const PRESEND = 1;
	/** @var int Specify/process updates that should run after flushing output buffer */
	public const POSTSEND = 2;

	/** @var int[] List of "defer until" queue stages that can be reached */
	public const STAGES = [ self::PRESEND, self::POSTSEND ];

	/** @var int Queue size threshold for converting updates into jobs */
	private const BIG_QUEUE_SIZE = 100;

	/** @var DeferredUpdatesScopeStack|null Queue states based on recursion level */
	private static $scopeStack;

	/**
	 * @var int Nesting level for preventOpportunisticUpdates()
	 */
	private static $preventOpportunisticUpdates = 0;

	/**
	 * @return DeferredUpdatesScopeStack
	 */
	private static function getScopeStack(): DeferredUpdatesScopeStack {
		self::$scopeStack ??= new DeferredUpdatesScopeMediaWikiStack();
		return self::$scopeStack;
	}

	/**
	 * @param DeferredUpdatesScopeStack $scopeStack
	 * @internal Only for use in tests.
	 */
	public static function setScopeStack( DeferredUpdatesScopeStack $scopeStack ): void {
		if ( !defined( 'MW_PHPUNIT_TEST' ) ) {
			throw new LogicException( 'Cannot reconfigure DeferredUpdates outside tests' );
		}
		self::$scopeStack = $scopeStack;
	}

	/**
	 * Add an update to the pending update queue for execution at the appropriate time
	 *
	 * In CLI mode, callback magic will also be used to run updates when safe
	 *
	 * If an update is already in progress, then what happens to this update is as follows:
	 * - If it has a "defer until" stage at/before the actual run stage of the innermost
	 *   in-progress update, then it will go into the sub-queue of that in-progress update.
	 *   As soon as that update completes, MergeableUpdate instances in its sub-queue will be
	 *   merged into the top-queues and the non-MergeableUpdate instances will be executed.
	 *   This is done to better isolate updates from the failures of other updates and reduce
	 *   the chance of race conditions caused by updates not fully seeing the intended changes
	 *   of previously enqueued and executed updates.
	 * - If it has a "defer until" stage later than the actual run stage of the innermost
	 *   in-progress update, then it will go into the normal top-queue for that stage.
	 *
	 * @param DeferrableUpdate $update Some object that implements doUpdate()
	 * @param int $stage One of (DeferredUpdates::PRESEND, DeferredUpdates::POSTSEND)
	 * @since 1.28 Added the $stage parameter
	 */
	public static function addUpdate( DeferrableUpdate $update, $stage = self::POSTSEND ) {
		self::getScopeStack()->current()->addUpdate( $update, $stage );
		self::tryOpportunisticExecute();
	}

	/**
	 * Add an update to the pending update queue that invokes the specified callback when run
	 *
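	 * A minimal sketch of the cancellation behaviour, assuming $dbw is a primary
	 * database handle the caller already holds. The table and column names are
	 * placeholders:
	 *
	 * @code
	 *     $dbw->insert( 'example_table', [ 'ex_value' => 1 ], __METHOD__ );
	 *     DeferredUpdates::addCallableUpdate(
	 *         static function () {
	 *             // Only meaningful if the insert above was committed; the update
	 *             // is cancelled if the transaction on $dbw is rolled back.
	 *         },
	 *         DeferredUpdates::POSTSEND,
	 *         $dbw
	 *     );
	 * @endcode
	 *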
	 * @param callable $callable
	 * @param int $stage One of (DeferredUpdates::PRESEND, DeferredUpdates::POSTSEND)
	 * @param IDatabase|IDatabase[]|null $dbw Cancel the update if a DB transaction
	 *  is rolled back [optional]
	 * @since 1.27 Added $stage parameter
	 * @since 1.28 Added the $dbw parameter
	 */
	public static function addCallableUpdate( $callable, $stage = self::POSTSEND, $dbw = null ) {
		self::addUpdate( new MWCallableUpdate( $callable, wfGetCaller(), $dbw ), $stage );
	}

	/**
	 * Run an update, and, if an error was thrown, catch/log it and enqueue the update as
	 * a job in the job queue system if possible (e.g. implements EnqueueableDataUpdate)
	 *
	 * @param DeferrableUpdate $update
	 * @return Throwable|null
	 */
	private static function run( DeferrableUpdate $update ): ?Throwable {
		$logger = LoggerFactory::getInstance( 'DeferredUpdates' );

		$type = get_class( $update )
			. ( $update instanceof DeferrableCallback ? '_' . $update->getOrigin() : '' );
		$updateId = spl_object_id( $update );
		$logger->debug( __METHOD__ . ": started $type #$updateId" );

		$updateException = null;

		$startTime = microtime( true );
		try {
			self::attemptUpdate( $update );
		} catch ( Throwable $updateException ) {
			MWExceptionHandler::logException( $updateException );
			$logger->error(
				"Deferred update '{deferred_type}' failed to run.",
				[
					'deferred_type' => $type,
					'exception' => $updateException,
				]
			);
			self::getScopeStack()->onRunUpdateFailed( $update );
		} finally {
			$walltime = microtime( true ) - $startTime;
			$logger->debug( __METHOD__ . ": ended $type #$updateId, processing time: $walltime" );
		}

		// Try to push the update as a job so it can run later if possible
		if ( $updateException && $update instanceof EnqueueableDataUpdate ) {
			try {
				self::getScopeStack()->queueDataUpdate( $update );
			} catch ( Throwable $jobException ) {
				MWExceptionHandler::logException( $jobException );
				$logger->error(
					"Deferred update '{deferred_type}' failed to enqueue as a job.",
					[
						'deferred_type' => $type,
						'exception' => $jobException,
					]
				);
				self::getScopeStack()->onRunUpdateFailed( $update );
			}
		}

		return $updateException;
	}

	/**
	 * Consume and execute all pending updates
	 *
	 * Note that it is rarely the case that this method should be called outside of a few
	 * select entry points. For simplicity, that kind of recursion is discouraged. Recursion
	 * cannot happen if an explicit transaction round is active, which limits usage to updates
	 * with TRX_ROUND_ABSENT that do not leave open any transaction rounds of their own during
	 * the call to this method.
	 *
	 * In the less-common case of this being called within an in-progress DeferrableUpdate,
	 * this will not see any top-queue updates (since they were consumed and are being run
	 * inside an outer execution loop). In that case, it will instead operate on the sub-queue
	 * of the innermost in-progress update on the stack.
	 *
	 * @internal For use by MediaWiki, Maintenance, JobRunner, JobExecutor
	 * @param int $stage Which updates to process. One of
	 *  (DeferredUpdates::PRESEND, DeferredUpdates::POSTSEND, DeferredUpdates::ALL)
	 */
	public static function doUpdates( $stage = self::ALL ) {
		/** @var ErrorPageError $guiError First presentable client-level error thrown */
		$guiError = null;
		/** @var Throwable $exception First of any error thrown */
		$exception = null;

		$scope = self::getScopeStack()->current();

		// T249069: recursion is not possible once explicit transaction rounds are involved
		$activeUpdate = $scope->getActiveUpdate();
		if ( $activeUpdate ) {
			$class = get_class( $activeUpdate );
			if ( !( $activeUpdate instanceof TransactionRoundAwareUpdate ) ) {
				throw new LogicException(
					__METHOD__ . ": reached from $class, which is not TransactionRoundAwareUpdate"
				);
			}
			if ( $activeUpdate->getTransactionRoundRequirement() !== $activeUpdate::TRX_ROUND_ABSENT ) {
				throw new LogicException(
					__METHOD__ . ": reached from $class, which does not specify TRX_ROUND_ABSENT"
				);
			}
		}

		$scope->processUpdates(
			$stage,
			static function ( DeferrableUpdate $update, $activeStage ) use ( &$guiError, &$exception ) {
				$scopeStack = self::getScopeStack();
				$childScope = $scopeStack->descend( $activeStage, $update );
				try {
					$e = self::run( $update );
					$guiError = $guiError ?: ( $e instanceof ErrorPageError ? $e : null );
					$exception = $exception ?: $e;
					// Any addUpdate() calls between descend() and ascend() used the sub-queue.
					// In rare cases, DeferrableUpdate::doUpdate() will process them by calling
					// doUpdates() itself. In any case, process the remaining updates in the
					// sub-queue by running them, enqueueing them, or transferring them to the
					// parent scope queues as appropriate.
					$childScope->processUpdates(
						$activeStage,
						static function ( DeferrableUpdate $sub ) use ( &$guiError, &$exception ) {
							$e = self::run( $sub );
							$guiError = $guiError ?: ( $e instanceof ErrorPageError ? $e : null );
							$exception = $exception ?: $e;
						}
					);
				} finally {
					$scopeStack->ascend();
				}
			}
		);

		// VW-style hack to work around T190178, so we can make sure
		// PageMetaDataUpdater doesn't throw exceptions.
		if ( $exception && defined( 'MW_PHPUNIT_TEST' ) ) {
			throw $exception;
		}

		// Throw the first of any GUI errors as long as the context is HTTP pre-send. However,
		// callers should check permissions *before* enqueueing updates. If the main transaction
		// round actions succeed but some deferred updates fail due to permissions errors then
		// there is a risk that some secondary data was not properly updated.
		if ( $guiError && $stage === self::PRESEND && !headers_sent() ) {
			throw $guiError;
		}
	}

	/**
	 * Consume and execute pending updates now if possible, instead of waiting.
	 *
	 * In web requests, updates are always deferred until the end of the request.
	 *
	 * In CLI mode, updates run earlier and more often. This is important for long-running
	 * Maintenance scripts that would otherwise grow an excessively large queue, which increases
	 * memory use, and risks losing all updates if the script ends early or crashes.
	 *
	 * The following conditions are required for updates to run early in CLI mode:
	 *
	 * - No update is already in progress (ensure linear flow, recursion guard).
	 * - LBFactory indicates that we don't have any "busy" database connections, i.e.
	 *   there are no pending writes or otherwise active and uncommitted transactions,
	 *   except if the transaction is empty and merely used for primary DB read queries,
	 *   in which case the transaction (and its repeatable-read snapshot) can be safely flushed.
	 *
	 * How this works:
	 *
	 * - When a maintenance script commits a change or waits for replication, such as
	 *   via IConnectionProvider::commitAndWaitForReplication, then ILBFactory calls
	 *   tryOpportunisticExecute(). This is injected via MWLBFactory::applyGlobalState.
	 *
	 * - For maintenance scripts that don't do much with the database, we also call
	 *   tryOpportunisticExecute() after every addUpdate() call.
	 *
	 * - Upon the completion of Maintenance::execute() via Maintenance::shutdown(),
	 *   any remaining updates are run.
	 *
	 * Note that this method runs both PRESEND and POSTSEND updates and thus should not be called
	 * during web requests. It is only intended for long-running Maintenance scripts.
	 *
	 * @internal For use by Maintenance
	 * @since 1.28
	 * @return bool Whether updates were allowed to run
	 */
	public static function tryOpportunisticExecute(): bool {
		// Leave execution up to the current loop if an update is already in progress
		// or if updates are explicitly disabled
		if ( self::getRecursiveExecutionStackDepth()
			|| self::$preventOpportunisticUpdates
		) {
			return false;
		}

		if ( self::getScopeStack()->allowOpportunisticUpdates() ) {
			self::doUpdates( self::ALL );
			return true;
		}

		if ( self::pendingUpdatesCount() >= self::BIG_QUEUE_SIZE ) {
			// There are a large number of pending updates and none of them can run yet.
			// The odds of losing updates due to an error increase when executing long queues
			// and when large amounts of time pass while tasks are queued. Mitigate this by
			// trying to eagerly move updates to the JobQueue when possible.
			//
			// TODO: Do we still need this now maintenance scripts automatically call
			// tryOpportunisticExecute from addUpdate, from every commit, and every
			// waitForReplication call?
			self::getScopeStack()->current()->consumeMatchingUpdates(
				self::ALL,
				EnqueueableDataUpdate::class,
				static function ( EnqueueableDataUpdate $update ) {
					self::getScopeStack()->queueDataUpdate( $update );
				}
			);
		}

		return false;
	}

	/**
	 * Prevent opportunistic updates until the returned ScopedCallback is
	 * consumed.
	 *
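	 * A usage sketch, assuming the caller wants to queue several updates without
	 * any of them running early in CLI mode ($updates is a hypothetical list of
	 * DeferrableUpdate instances):
	 *
	 * @code
	 *     $scope = DeferredUpdates::preventOpportunisticUpdates();
	 *     foreach ( $updates as $update ) {
	 *         DeferredUpdates::addUpdate( $update );
	 *     }
	 *     // Re-enable opportunistic execution. Letting $scope go out of
	 *     // scope has the same effect.
	 *     ScopedCallback::consume( $scope );
	 * @endcode
	 *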
	 * @return ScopedCallback
	 */
	public static function preventOpportunisticUpdates() {
		self::$preventOpportunisticUpdates++;
		return new ScopedCallback( static function () {
			self::$preventOpportunisticUpdates--;
		} );
	}

	/**
	 * Get the number of pending updates for the current execution context
	 *
	 * If an update is in progress, then this operates on the sub-queues of the
	 * innermost in-progress update. Otherwise, it acts on the top-queues.
	 *
	 * @return int
	 * @since 1.28
	 */
	public static function pendingUpdatesCount() {
		return self::getScopeStack()->current()->pendingUpdatesCount();
	}

	/**
	 * Get a list of the pending updates for the current execution context
	 *
	 * If an update is in progress, then this operates on the sub-queues of the
	 * innermost in-progress update. Otherwise, it acts on the top-queues.
	 *
	 * @param int $stage Look for updates with this "defer until" stage. One of
	 *  (DeferredUpdates::PRESEND, DeferredUpdates::POSTSEND, DeferredUpdates::ALL)
	 * @return DeferrableUpdate[]
	 * @internal This method should only be used for unit tests
	 * @since 1.29
	 */
	public static function getPendingUpdates( $stage = self::ALL ) {
		return self::getScopeStack()->current()->getPendingUpdates( $stage );
	}

	/**
	 * Cancel all pending updates for the current execution context
	 *
	 * If an update is in progress, then this operates on the sub-queues of the
	 * innermost in-progress update. Otherwise, it acts on the top-queues.
	 *
	 * @internal This method should only be used for unit tests
	 */
	public static function clearPendingUpdates() {
		self::getScopeStack()->current()->clearPendingUpdates();
	}

	/**
	 * Get the number of in-progress calls to DeferredUpdates::doUpdates()
	 *
	 * @return int
	 * @internal This method should only be used for unit tests
	 */
	public static function getRecursiveExecutionStackDepth() {
		return self::getScopeStack()->getRecursiveDepth();
	}

	/**
	 * Attempt to run an update with the appropriate transaction round state if needed
	 *
	 * It is allowed for a DeferrableUpdate to directly execute one or more other DeferrableUpdate
	 * instances without queueing them by calling this method. In that case, the outer update
	 * must use TransactionRoundAwareUpdate::TRX_ROUND_ABSENT, e.g. by extending
	 * TransactionRoundDefiningUpdate, so that this method can give each update its own
	 * transaction round.
	 *
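	 * A hedged sketch of that pattern, where $updates is a hypothetical list of
	 * DeferrableUpdate instances and the constructor arguments of
	 * TransactionRoundDefiningUpdate (callback, caller name) are assumed:
	 *
	 * @code
	 *     DeferredUpdates::addUpdate( new TransactionRoundDefiningUpdate(
	 *         static function () use ( $updates ) {
	 *             foreach ( $updates as $update ) {
	 *                 // Each inner update gets its own transaction round.
	 *                 DeferredUpdates::attemptUpdate( $update );
	 *             }
	 *         },
	 *         __METHOD__
	 *     ) );
	 * @endcode
	 *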
	 * @param DeferrableUpdate $update
	 * @since 1.34
	 */
	public static function attemptUpdate( DeferrableUpdate $update ) {
		self::getScopeStack()->onRunUpdateStart( $update );

		$update->doUpdate();

		self::getScopeStack()->onRunUpdateEnd( $update );
	}
}