Refactored duplicated code into JobRunner.php
[mediawiki.git] / includes / jobqueue / JobRunner.php
blob0f585c7dc650b1df811b796f88368913bb822273
1 <?php
2 /**
3 * Job queue runner utility methods
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 * http://www.gnu.org/copyleft/gpl.html
20 * @file
21 * @ingroup JobQueue
24 /**
25 * Job queue runner utility methods
27 * @ingroup JobQueue
28 * @since 1.24
30 class JobRunner {
31 /** @var callable|null Debug output handler */
32 protected $debug;
/**
 * Register the callback that receives this runner's progress messages.
 *
 * When a handler is set, every message logged by the runner is also passed
 * to it as a single string argument (timestamp-prefixed, newline-terminated).
 *
 * @param callable $debug Optional debug output handler
 */
public function setDebugHandler( $debug ) {
	$this->debug = $debug;
}
/**
 * Run jobs of the specified number/type for the specified time
 *
 * Recognized keys in $options (all optional):
 *   - type     : only pop jobs of this type (default: any default-type queue)
 *   - maxJobs  : stop after this many jobs have been attempted
 *   - maxTime  : stop once this many seconds of wall time have elapsed
 *   - throttle : set to a false value to ignore per-type backoffs when popping
 *
 * The response map has a 'jobs' field that lists the status of each job, including:
 *   - type   : the job type
 *   - status : ok/failed
 *   - error  : any error message string
 *   - time   : the job run time in ms
 * The response map also has:
 *   - backoffs : the (job type => UNIX expiry timestamp) map of active backoffs
 *   - elapsed  : the total time spent running jobs in ms
 *   - reached  : the reason the run finished, one of (none-ready, job-limit, time-limit)
 *
 * @param array $options
 * @return array Summary response that can easily be JSON serialized
 */
public function run( array $options ) {
	$summary = array( 'jobs' => array(), 'reached' => 'none-ready' );

	// Unpack the options; false means "no restriction"
	$type = false;
	if ( isset( $options['type'] ) ) {
		$type = $options['type'];
	}
	$maxJobs = false;
	if ( isset( $options['maxJobs'] ) ) {
		$maxJobs = $options['maxJobs'];
	}
	$maxTime = false;
	if ( isset( $options['maxTime'] ) ) {
		$maxTime = $options['maxTime'];
	}
	$ignoreBackoffs = isset( $options['throttle'] ) && !$options['throttle'];

	$queues = JobQueueGroup::singleton();

	// Periodic queue maintenance comes first
	$count = $queues->executeReadyPeriodicTasks();
	if ( $count > 0 ) {
		$this->runJobsLog( "Executed $count periodic queue task(s)." );
	}

	// Start from a clean slate: flush any DB writes that are still pending
	wfGetLBFactory()->commitMasterChanges();

	// Backoffs map job type => UNIX expiry; remember the initial state so that
	// the persistent copy is only rewritten when something actually changed
	$backoffs = $this->loadBackoffs();
	$startingBackoffs = $backoffs;
	$isUnexpired = function ( $expiry ) {
		return $expiry > time();
	};

	$jobsRun = 0; // number of jobs attempted
	$timeMsTotal = 0; // total job run time in ms
	$popFlags = JobQueueGroup::USE_CACHE;
	$startTime = microtime( true ); // when job running began
	$lastTime = microtime( true ); // when the DB slaves were last waited on

	while ( true ) {
		// Forget expired backoffs, then work out which queues are off-limits
		$backoffs = array_filter( $backoffs, $isUnexpired );
		$blacklist = $ignoreBackoffs ? array() : array_keys( $backoffs );

		if ( $type === false ) {
			$job = $queues->pop( JobQueueGroup::TYPE_DEFAULT, $popFlags, $blacklist );
		} elseif ( in_array( $type, $blacklist ) ) {
			$job = false; // the requested queue is in a backoff state
		} else {
			$job = $queues->pop( $type ); // job from a single queue
		}

		if ( !$job ) {
			break; // nothing ready to run
		}

		$jobType = $job->getType();
		$this->runJobsLog( $job->toString() . " STARTING" );

		// Run the job, timing and profiling it
		$profileName = __METHOD__ . '-' . get_class( $job );
		wfProfileIn( $profileName );
		$jobStart = microtime( true );
		try {
			++$jobsRun;
			$status = $job->run();
			$error = $job->getLastError();
			wfGetLBFactory()->commitMasterChanges();
		} catch ( MWException $e ) {
			MWExceptionHandler::rollbackMasterChangesAndLog( $e );
			$status = false;
			$error = get_class( $e ) . ': ' . $e->getMessage();
			$e->report(); // write error to STDERR and the log
		}
		$timeMs = (int)( ( microtime( true ) - $jobStart ) * 1000 );
		wfProfileOut( $profileName );
		$timeMsTotal += $timeMs;

		$failed = ( $status === false );

		// Acknowledge the job on success, or on failure when it may not be retried
		if ( !$failed || !$job->allowRetries() ) {
			$queues->ack( $job );
		}

		$outcome = $failed ? " t=$timeMs error={$error}" : " t=$timeMs good";
		$this->runJobsLog( $job->toString() . $outcome );

		$summary['jobs'][] = array(
			'type'   => $jobType,
			'status' => $failed ? 'failed' : 'ok',
			'error'  => $error,
			'time'   => $timeMs
		);

		// Back off of this job type for a while: always by the throttling
		// amount, and for failures occasionally (1 in 50) by at least 30 seconds
		$waitSecs = $this->getBackoffTimeToWait( $job );
		if ( $failed && mt_rand( 0, 49 ) == 0 ) {
			$waitSecs = max( $waitSecs, 30 );
		}
		if ( $waitSecs > 0 ) {
			$priorExpiry = isset( $backoffs[$jobType] ) ? $backoffs[$jobType] : 0;
			$backoffs[$jobType] = max( $priorExpiry, time() + $waitSecs );
		}

		// Stop once the job count or wall time limit is hit
		if ( $maxJobs && $jobsRun >= $maxJobs ) {
			$summary['reached'] = 'job-limit';
			break;
		}
		if ( $maxTime && ( microtime( true ) - $startTime ) > $maxTime ) {
			$summary['reached'] = 'time-limit';
			break;
		}

		// Don't let any of the main DB slaves get backed up; a negative
		// interval (clock moved backwards) also triggers the wait
		$sinceLastWait = microtime( true ) - $lastTime;
		if ( $sinceLastWait >= 5 || $sinceLastWait < 0 ) {
			wfWaitForSlaves( $lastTime );
			$lastTime = microtime( true );
		}

		// Don't let any queue slaves/backups fall behind
		if ( $jobsRun > 0 && ( $jobsRun % 100 ) == 0 ) {
			$queues->waitForBackups();
		}

		// Better to bail out here when near-OOM than in the middle of a job
		$this->assertMemoryOK();
	}

	// Persist the backoffs for the next pass, but only if they changed
	$backoffs = array_filter( $backoffs, $isUnexpired );
	if ( $backoffs !== $startingBackoffs ) {
		$this->syncBackoffs( $backoffs );
	}

	$summary['backoffs'] = $backoffs;
	$summary['elapsed'] = $timeMsTotal;

	return $summary;
}
/**
 * Work out how long this runner should stay away from a job's queue.
 *
 * The wait is the job's work item count divided by the items-per-second rate
 * configured for its type. The fractional part is rounded up with a
 * probability equal to that fraction, and down otherwise. Types without a
 * positive configured rate, duplicate jobs, and jobs reporting no work items
 * are never throttled.
 *
 * @param Job $job
 * @return int Seconds for this runner to avoid doing more jobs of this type
 * @see $wgJobBackoffThrottling
 */
private function getBackoffTimeToWait( Job $job ) {
	global $wgJobBackoffThrottling;

	if ( !isset( $wgJobBackoffThrottling[$job->getType()] ) ) {
		return 0; // no throttling configured for this type
	}
	if ( $job instanceof DuplicateJob ) {
		return 0; // no work was done
	}

	$rate = $wgJobBackoffThrottling[$job->getType()];
	if ( $rate <= 0 ) {
		return 0; // not throttled
	}

	if ( $job->workItemCount() <= 0 ) {
		return 0; // nothing to account for
	}

	// Randomized rounding: keep the whole seconds, then add one more second
	// with probability equal to the leftover fraction
	$exactSeconds = $job->workItemCount() / $rate;
	$wholeSeconds = floor( $exactSeconds );
	$fraction = $exactSeconds - $wholeSeconds;
	if ( mt_rand() / mt_getrandmax() < $fraction ) {
		$wholeSeconds += 1;
	}

	return (int)$wholeSeconds;
}
/**
 * Get the previous backoff expiries from persistent storage
 *
 * The map is read, under a shared lock, from a JSON file in the temporary
 * directory. A missing, unreadable, or malformed file yields an empty map.
 *
 * @return array Map of (job type => backoff expiry timestamp)
 */
private function loadBackoffs() {
	// Not referenced again; presumably a scoped profiler that stops when
	// $section goes out of scope, so the assignment must stay
	$section = new ProfileSection( __METHOD__ );

	$backoffs = array();
	$file = wfTempDir() . '/mw-runJobs-backoffs.json';
	if ( !is_file( $file ) ) {
		return $backoffs;
	}

	$handle = fopen( $file, 'rb' );
	if ( $handle === false ) {
		// The file vanished or is unreadable; treat it as "no backoffs"
		// instead of passing a non-resource to flock()/fclose()
		return $backoffs;
	}

	flock( $handle, LOCK_SH );
	$content = stream_get_contents( $handle );
	flock( $handle, LOCK_UN );
	fclose( $handle );

	// Only accept a decoded array; a scalar payload would otherwise be
	// returned as-is and break the callers that expect a map
	$decoded = json_decode( $content, true );
	if ( is_array( $decoded ) ) {
		$backoffs = $decoded;
	}

	return $backoffs;
}
/**
 * Merge the current backoff expiries into persistent storage
 *
 * Under an exclusive lock, the stored map is read, combined with $backoffs
 * (the later expiry wins for each job type), stripped of entries that have
 * already expired, and written back. If the file cannot be opened or locked,
 * nothing is written.
 *
 * @param array $backoffs Map of (job type => backoff expiry timestamp)
 */
private function syncBackoffs( array $backoffs ) {
	// Not referenced again; presumably a scoped profiler that stops when
	// $section goes out of scope, so the assignment must stay
	$section = new ProfileSection( __METHOD__ );

	$file = wfTempDir() . '/mw-runJobs-backoffs.json';
	// 'c' creates the file if needed but, unlike 'w', does not truncate it on
	// open, so the stored map is still there to read once the lock is held
	$handle = fopen( $file, 'cb+' );
	if ( $handle === false ) {
		return; // best effort: the backoffs just won't persist this time
	}
	if ( !flock( $handle, LOCK_EX ) ) {
		fclose( $handle );
		return;
	}

	$decoded = json_decode( stream_get_contents( $handle ), true );
	$cBackoffs = is_array( $decoded ) ? $decoded : array();

	// Keep whichever expiry is later for each job type
	foreach ( $backoffs as $type => $timestamp ) {
		$stored = isset( $cBackoffs[$type] ) ? $cBackoffs[$type] : 0;
		$cBackoffs[$type] = max( $stored, $timestamp );
	}

	// Expired entries are ignored by readers anyway; drop them so the file
	// does not accumulate stale types
	$now = time();
	foreach ( $cBackoffs as $type => $timestamp ) {
		if ( $timestamp <= $now ) {
			unset( $cBackoffs[$type] );
		}
	}

	// ftruncate() leaves the file pointer where the read ended, so rewind
	// before writing or the new content would be preceded by NUL padding
	ftruncate( $handle, 0 );
	rewind( $handle );
	fwrite( $handle, json_encode( $cBackoffs ) );
	fflush( $handle ); // get the data out before the lock is released
	flock( $handle, LOCK_UN );
	fclose( $handle );
}
/**
 * Make sure that this script is not too close to the memory usage limit.
 * It is better to die in between jobs than OOM right in the middle of one.
 *
 * The limit is parsed from the 'memory_limit' ini setting on first use and
 * then cached. A value that is not a plain byte count with an optional
 * k/m/g suffix disables the check.
 *
 * @throws MWException When usage reaches 95% of the memory limit
 */
private function assertMemoryOK() {
	static $maxBytes = null; // parsed limit in bytes; 0 means "do not check"

	if ( $maxBytes === null ) {
		$parts = array();
		$maxBytes = 0;
		if ( preg_match( '!^(\d+)(k|m|g|)$!i', ini_get( 'memory_limit' ), $parts ) ) {
			$multipliers = array( '' => 1, 'k' => 1024, 'm' => 1048576, 'g' => 1073741824 );
			$maxBytes = $parts[1] * $multipliers[strtolower( $parts[2] )];
		}
	}

	$usedBytes = memory_get_usage();
	if ( !$maxBytes || $usedBytes < 0.95 * $maxBytes ) {
		return; // no usable limit, or still comfortably below it
	}

	throw new MWException( "Detected excessive memory usage ($usedBytes/$maxBytes)." );
}
/**
 * Log the job message
 *
 * The message always goes to the 'runJobs' debug log group. If a debug
 * handler has been set, it is called first with the message prefixed by a
 * TS_DB timestamp and terminated by a newline.
 *
 * @param string $msg The message to log
 */
private function runJobsLog( $msg ) {
	$handler = $this->debug;
	if ( $handler ) {
		$line = wfTimestamp( TS_DB ) . " $msg\n";
		call_user_func( $handler, $line );
	}

	wfDebugLog( 'runJobs', $msg );
}