<?php
/**
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 * http://www.gnu.org/copyleft/gpl.html
 */

use MediaWiki\Logger\LoggerFactory;
use Wikimedia\Rdbms\IDatabase;
use Wikimedia\ScopedCallback;

/**
 * Defer callable updates to run later in the PHP process
 *
 * This is a performance feature that enables MediaWiki to produce faster web responses.
 * It allows you to postpone non-blocking work (e.g. work that does not change the web
 * response) to after the HTTP response has been sent to the client (i.e. web browser).
 *
 * Once the response is finalized and sent to the browser, the webserver process stays
 * for a little while longer (detached from the web request) to run your POSTSEND tasks.
 *
 * There is also a PRESEND option, which runs your task right before the finalized response
 * is sent to the browser. This is for critical tasks that do need to block the response,
 * but where you'd like to benefit from other DeferredUpdates features, such as:
 *
 * - MergeableUpdate: batch updates from different components without coupling
 *   or awareness of each other.
 * - Automatic cancellation: pass an IDatabase object (for any wiki or database) to
 *   DeferredUpdates::addCallableUpdate or AtomicSectionUpdate, and the update is
 *   cancelled if that database transaction is rolled back.
 * - Reducing lock contention: if the response is likely to take several seconds
 *   (e.g. uploading a large file to FileBackend, or saving an edit to a large article)
 *   much of that work may overlap with a database transaction that is staying open for
 *   the entire duration. By moving contentious writes out to a PRESEND update, these
 *   get their own transaction (after the main one is committed), which gives up some
 *   atomicity for improved throughput.
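 *
 * A minimal sketch of the PRESEND and automatic-cancellation features above, assuming the
 * caller already holds a primary database handle. `$dbw` and the callback body are
 * hypothetical; only addCallableUpdate() and the stage constant come from this class:
 *
 * ```php
 * DeferredUpdates::addCallableUpdate(
 *     static function () use ( $dbw ) {
 *         // ... contentious write using $dbw goes here; it runs in its own
 *         // transaction, right before the response is sent ...
 *     },
 *     DeferredUpdates::PRESEND,
 *     // If a transaction on this handle is rolled back, the update is cancelled.
 *     $dbw
 * );
 * ```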
 *
 * ## Expectation and comparison to job queue
 *
 * When scheduling a POSTSEND update via the DeferredUpdates system you can generally expect
 * it to complete well before the client makes their next request. Updates run directly after
 * the web response is sent, from the same process on the same server. This is unlike the
 * JobQueue, where jobs may need to wait in line for some minutes or hours.
 *
 * If your update fails, this failure is not known to the client and gets no retry. For updates
 * that need retries for system consistency or data integrity, it is recommended to implement
 * them as jobs instead and use JobQueueGroup::lazyPush. This has the caveat of being delayed
 * by default, the same as any other job.
 *
 * A hybrid solution is available via the EnqueueableDataUpdate interface. By implementing
 * this interface, you can queue your update via DeferredUpdates first, and if it fails,
 * the system will automatically catch this and queue it as a job instead.
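 *
 * As a rough illustration of the choice described above (a sketch only; the job type
 * `exampleRefresh`, its parameters, and the callback body are hypothetical):
 *
 * ```php
 * // Fire-and-forget: runs after the response is sent; a failure is logged, not retried.
 * DeferredUpdates::addCallableUpdate( static function () {
 *     // ... cheap, non-critical bookkeeping ...
 * } );
 *
 * // Needs retries: queue a job instead, accepting the usual job queue delay.
 * MediaWikiServices::getInstance()->getJobQueueGroup()->lazyPush(
 *     new JobSpecification( 'exampleRefresh', [ 'reason' => 'example' ] )
 * );
 * ```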
 *
 * ## How it works during web requests
 *
 * 1. Your request route is executed (e.g. Action or SpecialPage class, or API).
 * 2. Output is finalized and main database transaction is committed.
 * 3. PRESEND updates run via DeferredUpdates::doUpdates.
 * 4. The web response is sent to the browser.
 * 5. POSTSEND updates run via DeferredUpdates::doUpdates.
 *
 * @see MediaWiki::preOutputCommit
 * @see MediaWiki::restInPeace
 *
 * ## How it works for Maintenance scripts
 *
 * In CLI mode, no distinction is made between PRESEND and POSTSEND deferred updates,
 * and the queue is periodically executed throughout the process.
 *
 * @see DeferredUpdates::tryOpportunisticExecute
 *
 * ## How it works internally
 *
 * Each update is added via DeferredUpdates::addUpdate and stored in either the PRESEND or
 * POSTSEND queue. If an update gets queued while another update is already running, then
 * it is stored in a "sub"-queue associated with the current update. This allows nested
 * updates to be completed before other updates, which improves ordering for process caching.
 */
class DeferredUpdates {
	/** @var int Process all updates; in web requests, use only after flushing output buffer */
	public const ALL = 0;
	/** @var int Specify/process updates that should run before flushing output buffer */
	public const PRESEND = 1;
	/** @var int Specify/process updates that should run after flushing output buffer */
	public const POSTSEND = 2;

	/** @var int[] List of "defer until" queue stages that can be reached */
	public const STAGES = [ self::PRESEND, self::POSTSEND ];

	/** @var int Queue size threshold for converting updates into jobs */
	private const BIG_QUEUE_SIZE = 100;

	/** @var DeferredUpdatesScopeStack|null Queue states based on recursion level */
	private static $scopeStack;

	/**
	 * @var int Nesting level for preventOpportunisticUpdates()
	 */
	private static $preventOpportunisticUpdates = 0;

	/**
	 * @return DeferredUpdatesScopeStack
	 */
	private static function getScopeStack(): DeferredUpdatesScopeStack {
		self::$scopeStack ??= new DeferredUpdatesScopeMediaWikiStack();
		return self::$scopeStack;
	}

	/**
	 * @param DeferredUpdatesScopeStack $scopeStack
	 * @internal Only for use in tests.
	 */
	public static function setScopeStack( DeferredUpdatesScopeStack $scopeStack ): void {
		if ( !defined( 'MW_PHPUNIT_TEST' ) ) {
			throw new LogicException( 'Cannot reconfigure DeferredUpdates outside tests' );
		}
		self::$scopeStack = $scopeStack;
	}

	/**
	 * Add an update to the pending update queue for execution at the appropriate time
	 *
	 * In CLI mode, this also attempts to run pending updates right away when it is safe
	 * to do so (see tryOpportunisticExecute()).
	 *
	 * If an update is already in progress, then what happens to this update is as follows:
	 *  - If it has a "defer until" stage at/before the actual run stage of the innermost
	 *    in-progress update, then it will go into the sub-queue of that in-progress update.
	 *    As soon as that update completes, MergeableUpdate instances in its sub-queue will be
	 *    merged into the top-queues and the non-MergeableUpdate instances will be executed.
	 *    This is done to better isolate updates from the failures of other updates and reduce
	 *    the chance of race conditions caused by updates not fully seeing the intended changes
	 *    of previously enqueued and executed updates.
	 *  - If it has a "defer until" stage later than the actual run stage of the innermost
	 *    in-progress update, then it will go into the normal top-queue for that stage.
	 *
	 * @param DeferrableUpdate $update Some object that implements doUpdate()
	 * @param int $stage One of (DeferredUpdates::PRESEND, DeferredUpdates::POSTSEND)
	 * @since 1.28 Added the $stage parameter
	 */
	public static function addUpdate( DeferrableUpdate $update, $stage = self::POSTSEND ) {
		self::getScopeStack()->current()->addUpdate( $update, $stage );
		self::tryOpportunisticExecute();
	}

	/**
	 * Add an update to the pending update queue that invokes the specified callback when run
	 *
	 * @param callable $callable
	 * @param int $stage One of (DeferredUpdates::PRESEND, DeferredUpdates::POSTSEND)
	 * @param IDatabase|IDatabase[]|null $dbw Cancel the update if a DB transaction
	 *  is rolled back [optional]
	 * @since 1.27 Added $stage parameter
	 * @since 1.28 Added the $dbw parameter
	 */
	public static function addCallableUpdate( $callable, $stage = self::POSTSEND, $dbw = null ) {
		self::addUpdate( new MWCallableUpdate( $callable, wfGetCaller(), $dbw ), $stage );
	}

	/**
	 * Run an update, and, if an error was thrown, catch/log it and enqueue the update as
	 * a job in the job queue system if possible (e.g. implements EnqueueableDataUpdate)
	 *
	 * @param DeferrableUpdate $update
	 * @return Throwable|null
	 */
	private static function run( DeferrableUpdate $update ): ?Throwable {
		$logger = LoggerFactory::getInstance( 'DeferredUpdates' );

		$type = get_class( $update )
			. ( $update instanceof DeferrableCallback ? '_' . $update->getOrigin() : '' );
		$updateId = spl_object_id( $update );
		$logger->debug( __METHOD__ . ": started $type #$updateId" );

		$updateException = null;

		$startTime = microtime( true );
		try {
			self::attemptUpdate( $update );
		} catch ( Throwable $updateException ) {
			MWExceptionHandler::logException( $updateException );
			$logger->error(
				"Deferred update '{deferred_type}' failed to run.",
				[
					'deferred_type' => $type,
					'exception' => $updateException,
				]
			);
			self::getScopeStack()->onRunUpdateFailed( $update );
		} finally {
			$walltime = microtime( true ) - $startTime;
			$logger->debug( __METHOD__ . ": ended $type #$updateId, processing time: $walltime" );
		}

		// Try to push the update as a job so it can run later if possible
		if ( $updateException && $update instanceof EnqueueableDataUpdate ) {
			try {
				self::getScopeStack()->queueDataUpdate( $update );
			} catch ( Throwable $jobException ) {
				MWExceptionHandler::logException( $jobException );
				$logger->error(
					"Deferred update '{deferred_type}' failed to enqueue as a job.",
					[
						'deferred_type' => $type,
						'exception' => $jobException,
					]
				);
				self::getScopeStack()->onRunUpdateFailed( $update );
			}
		}

		return $updateException;
	}

	/**
	 * Consume and execute all pending updates
	 *
	 * Note that it is rarely the case that this method should be called outside of a few
	 * select entry points. For simplicity, that kind of recursion is discouraged. Recursion
	 * cannot happen if an explicit transaction round is active, which limits usage to updates
	 * with TRX_ROUND_ABSENT that do not leave open any transaction rounds of their own during
	 * the call to this method.
	 *
	 * In the less-common case of this being called within an in-progress DeferrableUpdate,
	 * this will not see any top-queue updates (since they were consumed and are being run
	 * inside an outer execution loop). In that case, it will instead operate on the sub-queue
	 * of the innermost in-progress update on the stack.
	 *
	 * @internal For use by MediaWiki, Maintenance, JobRunner, JobExecutor
	 * @param int $stage Which updates to process. One of
	 *  (DeferredUpdates::PRESEND, DeferredUpdates::POSTSEND, DeferredUpdates::ALL)
	 */
	public static function doUpdates( $stage = self::ALL ) {
		/** @var ErrorPageError $guiError First presentable client-level error thrown */
		$guiError = null;
		/** @var Throwable $exception First of any error thrown */
		$exception = null;

		$scope = self::getScopeStack()->current();

		// T249069: recursion is not possible once explicit transaction rounds are involved
		$activeUpdate = $scope->getActiveUpdate();
		if ( $activeUpdate ) {
			$class = get_class( $activeUpdate );
			if ( !( $activeUpdate instanceof TransactionRoundAwareUpdate ) ) {
				throw new LogicException(
					__METHOD__ . ": reached from $class, which is not TransactionRoundAwareUpdate"
				);
			}
			if ( $activeUpdate->getTransactionRoundRequirement() !== $activeUpdate::TRX_ROUND_ABSENT ) {
				throw new LogicException(
					__METHOD__ . ": reached from $class, which does not specify TRX_ROUND_ABSENT"
				);
			}
		}

		$scope->processUpdates(
			$stage,
			static function ( DeferrableUpdate $update, $activeStage ) use ( &$guiError, &$exception ) {
				$scopeStack = self::getScopeStack();
				$childScope = $scopeStack->descend( $activeStage, $update );
				try {
					$e = self::run( $update );
					$guiError = $guiError ?: ( $e instanceof ErrorPageError ? $e : null );
					$exception = $exception ?: $e;
					// Any addUpdate() calls between descend() and ascend() used the sub-queue.
					// In rare cases, DeferrableUpdate::doUpdate() will process them by calling
					// doUpdates() itself. In any case, process the remaining updates in the
					// sub-queue by running them, enqueueing them, or transferring them to the
					// parent scope queues as appropriate.
					$childScope->processUpdates(
						$activeStage,
						static function ( DeferrableUpdate $sub ) use ( &$guiError, &$exception ) {
							$e = self::run( $sub );
							$guiError = $guiError ?: ( $e instanceof ErrorPageError ? $e : null );
							$exception = $exception ?: $e;
						}
					);
				} finally {
					$scopeStack->ascend();
				}
			}
		);

		// VW-style hack to work around T190178, so we can make sure
		// PageMetaDataUpdater doesn't throw exceptions.
		if ( $exception && defined( 'MW_PHPUNIT_TEST' ) ) {
			throw $exception;
		}

		// Throw the first of any GUI errors as long as the context is HTTP pre-send. However,
		// callers should check permissions *before* enqueueing updates. If the main transaction
		// round actions succeed but some deferred updates fail due to permissions errors then
		// there is a risk that some secondary data was not properly updated.
		if ( $guiError && $stage === self::PRESEND && !headers_sent() ) {
			throw $guiError;
		}
	}

	/**
	 * Consume and execute pending updates now if possible, instead of waiting.
	 *
	 * In web requests, updates are always deferred until the end of the request.
	 *
	 * In CLI mode, updates run earlier and more often. This is important for long-running
	 * Maintenance scripts that would otherwise grow an excessively large queue, which increases
	 * memory use, and risks losing all updates if the script ends early or crashes.
	 *
	 * The following conditions are required for updates to run early in CLI mode:
	 *
	 * - No update is already in progress (ensure linear flow, recursion guard).
	 * - LBFactory indicates that we don't have any "busy" database connections, i.e.
	 *   there are no pending writes or otherwise active and uncommitted transactions,
	 *   except if the transaction is empty and merely used for primary DB read queries,
	 *   in which case the transaction (and its repeatable-read snapshot) can be safely flushed.
	 *
	 * How this works:
	 *
	 * - When a maintenance script commits a change or waits for replication, such as
	 *   via IConnectionProvider::commitAndWaitForReplication, then ILBFactory calls
	 *   tryOpportunisticExecute(). This is injected via MWLBFactory::applyGlobalState.
	 *
	 * - For maintenance scripts that don't do much with the database, we also call
	 *   tryOpportunisticExecute() after every addUpdate() call.
	 *
	 * - Upon the completion of Maintenance::execute() via Maintenance::shutdown(),
	 *   any remaining updates are run.
	 *
	 * Note that this method runs both PRESEND and POSTSEND updates and thus should not be called
	 * during web requests. It is only intended for long-running Maintenance scripts.
	 *
	 * @internal For use by Maintenance
	 *
	 * @return bool Whether updates were allowed to run
	 */
	public static function tryOpportunisticExecute(): bool {
		// Leave execution up to the current loop if an update is already in progress
		// or if updates are explicitly disabled
		if ( self::getRecursiveExecutionStackDepth()
			|| self::$preventOpportunisticUpdates
		) {
			return false;
		}

		if ( self::getScopeStack()->allowOpportunisticUpdates() ) {
			self::doUpdates( self::ALL );
			return true;
		}

		if ( self::pendingUpdatesCount() >= self::BIG_QUEUE_SIZE ) {
			// There are a large number of pending updates and none of them can run yet.
			// The odds of losing updates due to an error increases when executing long queues
			// and when large amounts of time pass while tasks are queued. Mitigate this by
			// trying to eagerly move updates to the JobQueue when possible.
			//
			// TODO: Do we still need this now maintenance scripts automatically call
			// tryOpportunisticExecute from addUpdate, from every commit, and every
			// waitForReplication call?
			self::getScopeStack()->current()->consumeMatchingUpdates(
				self::ALL,
				EnqueueableDataUpdate::class,
				static function ( EnqueueableDataUpdate $update ) {
					self::getScopeStack()->queueDataUpdate( $update );
				}
			);
		}

		return false;
	}

	/**
	 * Prevent opportunistic updates until the returned ScopedCallback is
	 * consumed or goes out of scope.
	 *
	 * @return ScopedCallback
	 */
	public static function preventOpportunisticUpdates() {
		self::$preventOpportunisticUpdates++;
		return new ScopedCallback( static function () {
			self::$preventOpportunisticUpdates--;
		} );
	}

	/**
	 * Get the number of pending updates for the current execution context
	 *
	 * If an update is in progress, then this operates on the sub-queues of the
	 * innermost in-progress update. Otherwise, it acts on the top-queues.
	 *
	 * @return int
	 */
	public static function pendingUpdatesCount() {
		return self::getScopeStack()->current()->pendingUpdatesCount();
	}

	/**
	 * Get a list of the pending updates for the current execution context
	 *
	 * If an update is in progress, then this operates on the sub-queues of the
	 * innermost in-progress update. Otherwise, it acts on the top-queues.
	 *
	 * @param int $stage Look for updates with this "defer until" stage. One of
	 *  (DeferredUpdates::PRESEND, DeferredUpdates::POSTSEND, DeferredUpdates::ALL)
	 * @return DeferrableUpdate[]
	 * @internal This method should only be used for unit tests
	 */
	public static function getPendingUpdates( $stage = self::ALL ) {
		return self::getScopeStack()->current()->getPendingUpdates( $stage );
	}

	/**
	 * Cancel all pending updates for the current execution context
	 *
	 * If an update is in progress, then this operates on the sub-queues of the
	 * innermost in-progress update. Otherwise, it acts on the top-queues.
	 *
	 * @internal This method should only be used for unit tests
	 */
	public static function clearPendingUpdates() {
		self::getScopeStack()->current()->clearPendingUpdates();
	}

	/**
	 * Get the number of in-progress calls to DeferredUpdates::doUpdates()
	 *
	 * @return int
	 * @internal This method should only be used for unit tests
	 */
	public static function getRecursiveExecutionStackDepth() {
		return self::getScopeStack()->getRecursiveDepth();
	}

	/**
	 * Attempt to run an update with the appropriate transaction round state if needed
	 *
	 * It is allowed for a DeferrableUpdate to directly execute one or more other
	 * DeferrableUpdate instances without queueing them by calling this method. In that case,
	 * the outer update must use TransactionRoundAwareUpdate::TRX_ROUND_ABSENT, e.g. by
	 * extending TransactionRoundDefiningUpdate, so that this method can give each update its
	 * own transaction round.
	 *
	 * @param DeferrableUpdate $update
	 */
	public static function attemptUpdate( DeferrableUpdate $update ) {
		self::getScopeStack()->onRunUpdateStart( $update );

		$update->doUpdate();

		self::getScopeStack()->onRunUpdateEnd( $update );
	}
}