MediaWiki  REL1_24
JobRunner.php
Go to the documentation of this file.
00001 <?php
/**
 * Pops jobs from the JobQueueGroup and runs them, honoring per-type backoffs.
 *
 * Backoff state (job type => UNIX expiry timestamp) is shared with concurrent
 * runners through a JSON file in wfTempDir(), guarded by flock().
 */
class JobRunner {
    /** @var callable|null Optional handler that receives each formatted log line */
    protected $debug;

    /**
     * @param callable $debug Handler called with a single string argument: a
     *   TS_DB timestamp, a space, the log message, and a trailing newline
     */
    public function setDebugHandler( $debug ) {
        $this->debug = $debug;
    }

    /**
     * Run jobs until none are ready or a job-count/wall-time limit is hit.
     *
     * @param array $options Map of parameters:
     *   - type     : job type to run; false/unset pops from
     *                JobQueueGroup::TYPE_DEFAULT (skipping backed-off types)
     *   - maxJobs  : stop after this many jobs were run (false/unset: no limit)
     *   - maxTime  : stop once this many seconds of wall time have passed
     *                (false/unset: no limit)
     *   - throttle : if set and falsy, backed-off types are not skipped when
     *                popping (backoff deltas are still recorded and persisted)
     * @return array Map of:
     *   - jobs     : list of maps (type, status 'ok'|'failed', error, time in ms)
     *   - reached  : why the loop stopped: 'none-ready', 'job-limit' or 'time-limit'
     *   - backoffs : map of (job type => UNIX timestamp when its backoff expires)
     *   - elapsed  : sum of the per-job times in milliseconds
     */
    public function run( array $options ) {
        $response = array( 'jobs' => array(), 'reached' => 'none-ready' );

        $type = isset( $options['type'] ) ? $options['type'] : false;
        $maxJobs = isset( $options['maxJobs'] ) ? $options['maxJobs'] : false;
        $maxTime = isset( $options['maxTime'] ) ? $options['maxTime'] : false;
        $noThrottle = isset( $options['throttle'] ) && !$options['throttle'];

        $group = JobQueueGroup::singleton();
        // Handle any required periodic queue maintenance
        $count = $group->executeReadyPeriodicTasks();
        if ( $count > 0 ) {
            $this->runJobsLog( "Executed $count periodic queue task(s)." );
        }

        // Flush any pending DB writes for sanity
        wfGetLBFactory()->commitMasterChanges();

        // Some job types should not run until a certain timestamp
        $backoffs = array(); // map of (type => UNIX expiry)
        $backoffDeltas = array(); // map of (type => seconds)
        $wait = 'wait'; // block to read backoffs the first time

        $jobsRun = 0;
        $timeMsTotal = 0;
        $flags = JobQueueGroup::USE_CACHE;
        $startTime = microtime( true ); // time since jobs started running
        $lastTime = microtime( true ); // time since last slave check
        do {
            // Sync the persistent backoffs with concurrent runners
            $backoffs = $this->syncBackoffDeltas( $backoffs, $backoffDeltas, $wait );
            $blacklist = $noThrottle ? array() : array_keys( $backoffs );
            $wait = 'nowait'; // less important now

            if ( $type === false ) {
                $job = $group->pop( JobQueueGroup::TYPE_DEFAULT, $flags, $blacklist );
            } elseif ( in_array( $type, $blacklist ) ) {
                $job = false; // requested queue in backoff state
            } else {
                $job = $group->pop( $type ); // job from a single queue
            }

            if ( $job ) { // found a job
                $jType = $job->getType();

                // Back off of certain jobs for a while (for throttling and for errors)
                $ttw = $this->getBackoffTimeToWait( $job );
                if ( $ttw > 0 ) {
                    // Always add the delta for other runners in case the time running the
                    // job negated the backoff for each individually but not collectively.
                    $backoffDeltas[$jType] = isset( $backoffDeltas[$jType] )
                        ? $backoffDeltas[$jType] + $ttw
                        : $ttw;
                    $backoffs = $this->syncBackoffDeltas( $backoffs, $backoffDeltas, $wait );
                }

                $this->runJobsLog( $job->toString() . " STARTING" );

                // Run the job...
                wfProfileIn( __METHOD__ . '-' . get_class( $job ) );
                $jobStartTime = microtime( true );
                try {
                    ++$jobsRun;
                    $status = $job->run();
                    $error = $job->getLastError();
                    wfGetLBFactory()->commitMasterChanges();
                } catch ( MWException $e ) {
                    MWExceptionHandler::rollbackMasterChangesAndLog( $e );
                    $status = false;
                    $error = get_class( $e ) . ': ' . $e->getMessage();
                    MWExceptionHandler::logException( $e );
                }
                $timeMs = intval( ( microtime( true ) - $jobStartTime ) * 1000 );
                wfProfileOut( __METHOD__ . '-' . get_class( $job ) );
                $timeMsTotal += $timeMs;

                // Mark the job as done on success or when the job cannot be retried
                if ( $status !== false || !$job->allowRetries() ) {
                    $group->ack( $job ); // done
                }

                // On failure, occasionally (1 in 50) back off this type for at least 30s
                if ( $status === false && mt_rand( 0, 49 ) == 0 ) {
                    $ttw = max( $ttw, 30 ); // too many errors
                    $backoffDeltas[$jType] = isset( $backoffDeltas[$jType] )
                        ? $backoffDeltas[$jType] + $ttw
                        : $ttw;
                }

                if ( $status === false ) {
                    $this->runJobsLog( $job->toString() . " t=$timeMs error={$error}" );
                } else {
                    $this->runJobsLog( $job->toString() . " t=$timeMs good" );
                }

                $response['jobs'][] = array(
                    'type'   => $jType,
                    'status' => ( $status === false ) ? 'failed' : 'ok',
                    'error'  => $error,
                    'time'   => $timeMs
                );

                // Break out if we hit the job count or wall time limits...
                if ( $maxJobs && $jobsRun >= $maxJobs ) {
                    $response['reached'] = 'job-limit';
                    break;
                } elseif ( $maxTime && ( microtime( true ) - $startTime ) > $maxTime ) {
                    $response['reached'] = 'time-limit';
                    break;
                }

                // Don't let any of the main DB slaves get backed up
                $timePassed = microtime( true ) - $lastTime;
                if ( $timePassed >= 5 || $timePassed < 0 ) {
                    wfWaitForSlaves( $lastTime );
                    $lastTime = microtime( true );
                }
                // Don't let any queue slaves/backups fall behind
                if ( $jobsRun > 0 && ( $jobsRun % 100 ) == 0 ) {
                    $group->waitForBackups();
                }

                // Bail if near-OOM instead of in a job
                $this->assertMemoryOK();
            }
        } while ( $job ); // stop when there are no jobs

        // Sync the persistent backoffs for the next runJobs.php pass. Keep the
        // merged result so the response reports the backoffs actually persisted.
        if ( $backoffDeltas ) {
            $backoffs = $this->syncBackoffDeltas( $backoffs, $backoffDeltas, 'wait' );
        }

        $response['backoffs'] = $backoffs;
        $response['elapsed'] = $timeMsTotal;

        return $response;
    }

    /**
     * Get how many seconds to back off from this job's type after running it.
     *
     * $wgJobBackoffThrottling maps job types to a work-item rate; the exact
     * delay (workItemCount / rate) is randomly rounded up or down so that the
     * average delay matches the fractional value.
     *
     * @param Job $job
     * @return int Seconds to wait; 0 if the type is not throttled (no entry or
     *   a non-positive rate) or the job is a DuplicateJob
     */
    private function getBackoffTimeToWait( Job $job ) {
        global $wgJobBackoffThrottling;

        if ( !isset( $wgJobBackoffThrottling[$job->getType()] ) ||
            $job instanceof DuplicateJob // no work was done
        ) {
            return 0; // not throttled
        }

        $itemsPerSecond = $wgJobBackoffThrottling[$job->getType()];
        if ( $itemsPerSecond <= 0 ) {
            return 0; // not throttled
        }

        $seconds = 0;
        if ( $job->workItemCount() > 0 ) {
            $exactSeconds = $job->workItemCount() / $itemsPerSecond;
            // use randomized rounding
            $seconds = floor( $exactSeconds );
            $remainder = $exactSeconds - $seconds;
            $seconds += ( mt_rand() / mt_getrandmax() < $remainder ) ? 1 : 0;
        }

        return (int)$seconds;
    }

    /**
     * @return string Path of the JSON file that runners share to persist backoffs
     */
    private function getBackoffFilePath() {
        return wfTempDir() . '/mw-runJobs-backoffs.json';
    }

    /**
     * @param string|bool $content Raw contents read from the backoff file
     * @return array Map of (job type => UNIX expiry); empty if missing or not a JSON object
     */
    private function decodeBackoffs( $content ) {
        $decoded = is_string( $content ) ? json_decode( $content, true ) : null;

        return is_array( $decoded ) ? $decoded : array();
    }

    /**
     * @param array $backoffs Map of (job type => UNIX expiry timestamp)
     * @param float $now Current UNIX timestamp
     * @return array $backoffs without the entries that expired before $now
     */
    private function pruneExpiredBackoffs( array $backoffs, $now ) {
        foreach ( $backoffs as $type => $timestamp ) {
            if ( $timestamp < $now ) {
                unset( $backoffs[$type] );
            }
        }

        return $backoffs;
    }

    /**
     * Read the unexpired backoffs from the shared backoff file.
     *
     * @param array $backoffs Map of (job type => UNIX expiry) returned unchanged
     *   if the file cannot be opened or, in 'nowait' mode, the lock is busy
     * @param string $mode Lock mode: 'wait' blocks on the shared lock, 'nowait' does not
     * @return array Map of (job type => UNIX expiry); empty if the file does not exist
     */
    private function loadBackoffs( array $backoffs, $mode = 'wait' ) {
        $section = new ProfileSection( __METHOD__ ); // profiles until out of scope

        $file = $this->getBackoffFilePath();
        if ( !is_file( $file ) ) {
            return array();
        }

        $handle = fopen( $file, 'rb' );
        if ( $handle === false ) {
            return $backoffs; // unreadable or removed meanwhile
        }
        $noblock = ( $mode === 'nowait' ) ? LOCK_NB : 0;
        if ( !flock( $handle, LOCK_SH | $noblock ) ) {
            fclose( $handle );
            return $backoffs; // don't wait on lock
        }
        $content = stream_get_contents( $handle );
        flock( $handle, LOCK_UN );
        fclose( $handle );

        return $this->pruneExpiredBackoffs( $this->decodeBackoffs( $content ), microtime( true ) );
    }

    /**
     * Merge local backoff deltas into the shared backoff file and return the result.
     *
     * Each delta extends an unexpired persisted backoff for that type, or starts
     * a new one from now. On success $deltas is emptied; if the file cannot be
     * opened or the lock is busy (in 'nowait' mode), $deltas is kept so a later
     * call can apply it.
     *
     * @param array $backoffs Map of (job type => UNIX expiry) returned on failure
     * @param array &$deltas Map of (job type => seconds) to add
     * @param string $mode Lock mode: 'wait' or 'nowait'
     * @return array Map of (job type => UNIX expiry) of unexpired backoffs
     */
    private function syncBackoffDeltas( array $backoffs, array &$deltas, $mode = 'wait' ) {
        $section = new ProfileSection( __METHOD__ ); // profiles until out of scope

        if ( !$deltas ) {
            return $this->loadBackoffs( $backoffs, $mode );
        }

        $noblock = ( $mode === 'nowait' ) ? LOCK_NB : 0;
        $file = $this->getBackoffFilePath();
        // Use 'c+' rather than 'w+': 'w+' truncates on open, before the lock is
        // held. That wipes concurrent runners' backoffs even when the lock turns
        // out to be busy, and makes the read below always see an empty file.
        $handle = fopen( $file, 'c+b' );
        if ( $handle === false ) {
            return $backoffs; // keep $deltas for a later sync
        }
        if ( !flock( $handle, LOCK_EX | $noblock ) ) {
            fclose( $handle );
            return $backoffs; // don't wait on lock
        }
        $ctime = microtime( true );
        $cBackoffs = $this->decodeBackoffs( stream_get_contents( $handle ) );
        foreach ( $deltas as $type => $seconds ) {
            $cBackoffs[$type] = isset( $cBackoffs[$type] ) && $cBackoffs[$type] >= $ctime
                ? $cBackoffs[$type] + $seconds
                : $ctime + $seconds;
        }
        $cBackoffs = $this->pruneExpiredBackoffs( $cBackoffs, $ctime );
        // Replace the contents; ftruncate() does not move the file pointer, which
        // the read above left at EOF, so rewind to avoid NUL-byte padding.
        ftruncate( $handle, 0 );
        rewind( $handle );
        fwrite( $handle, json_encode( $cBackoffs ) );
        fflush( $handle );
        flock( $handle, LOCK_UN );
        fclose( $handle );

        $deltas = array();

        return $cBackoffs;
    }

    /**
     * Make sure this process is not too close to the memory_limit.
     * It is better to bail out between jobs than to run out of memory inside one.
     * memory_limit values that are not a plain number with an optional k/m/g
     * suffix (e.g. -1) disable the check.
     *
     * @throws MWException If memory usage is at least 95% of memory_limit
     */
    private function assertMemoryOK() {
        static $maxBytes = null;
        if ( $maxBytes === null ) {
            $m = array();
            if ( preg_match( '!^(\d+)(k|m|g|)$!i', ini_get( 'memory_limit' ), $m ) ) {
                list( , $num, $unit ) = $m;
                $conv = array( 'g' => 1073741824, 'm' => 1048576, 'k' => 1024, '' => 1 );
                $maxBytes = $num * $conv[strtolower( $unit )];
            } else {
                $maxBytes = 0;
            }
        }
        $usedBytes = memory_get_usage();
        if ( $maxBytes && $usedBytes >= 0.95 * $maxBytes ) {
            throw new MWException( "Detected excessive memory usage ($usedBytes/$maxBytes)." );
        }
    }

    /**
     * Log a message to the 'runJobs' debug log group and, if a debug handler
     * is set, pass it a TS_DB-timestamped, newline-terminated copy.
     *
     * @param string $msg
     */
    private function runJobsLog( $msg ) {
        if ( $this->debug ) {
            call_user_func_array( $this->debug, array( wfTimestamp( TS_DB ) . " $msg\n" ) );
        }
        wfDebugLog( 'runJobs', $msg );
    }
}