MediaWiki  REL1_23
runJobs.php
Go to the documentation of this file.
00001 <?php
00024 require_once __DIR__ . '/Maintenance.php';
00025 
/**
 * Maintenance script that runs pending jobs from the job queue.
 *
 * Jobs are popped from JobQueueGroup until the queue is empty or one of the
 * --maxjobs / --maxtime limits is reached. Job types configured in
 * $wgJobBackoffThrottling are throttled through "backoff" expiry timestamps
 * that are shared between runJobs.php invocations via a JSON file in the
 * temporary directory.
 */
class RunJobs extends Maintenance {
    /**
     * Register the script description and its command line options.
     */
    public function __construct() {
        parent::__construct();
        $this->mDescription = "Run pending jobs";
        $this->addOption( 'maxjobs', 'Maximum number of jobs to run', false, true );
        $this->addOption( 'maxtime', 'Maximum amount of wall-clock time', false, true );
        $this->addOption( 'type', 'Type of job to run', false, true );
        $this->addOption( 'procs', 'Number of processes to use', false, true );
        $this->addOption( 'nothrottle', 'Ignore job throttling configuration', false, false );
    }

    /**
     * Memory limit to apply to this script.
     *
     * @return string|mixed The parent's limit when --memory-limit was given,
     *   otherwise a fixed "150M"
     */
    public function memoryLimit() {
        if ( $this->hasOption( 'memory-limit' ) ) {
            return parent::memoryLimit();
        }
        // Don't eat all memory on the machine if we get a bad job.
        return "150M";
    }

    /**
     * Pop and run jobs until the queue is drained or a limit is hit.
     *
     * @throws MWException When memory usage gets close to the PHP memory limit
     */
    public function execute() {
        if ( wfReadOnly() ) {
            $this->error( "Unable to run jobs; the wiki is in read-only mode.", 1 ); // die
        }

        if ( $this->hasOption( 'procs' ) ) {
            $procs = intval( $this->getOption( 'procs' ) );
            if ( $procs < 1 || $procs > 1000 ) {
                $this->error( "Invalid argument to --procs", true );
            } elseif ( $procs != 1 ) {
                // Only a process for which start() reports 'child' goes on to run jobs
                $fc = new ForkController( $procs );
                if ( $fc->start() != 'child' ) {
                    exit( 0 );
                }
            }
        }

        $type = $this->getOption( 'type', false );
        $maxJobs = $this->getOption( 'maxjobs', false );
        $maxTime = $this->getOption( 'maxtime', false );
        $noThrottle = $this->hasOption( 'nothrottle' );
        $startTime = time();

        $group = JobQueueGroup::singleton();
        // Handle any required periodic queue maintenance
        $count = $group->executeReadyPeriodicTasks();
        if ( $count > 0 ) {
            $this->runJobsLog( "Executed $count periodic queue task(s)." );
        }

        $backoffs = $this->loadBackoffs(); // map of (type => UNIX expiry)
        $startingBackoffs = $backoffs; // avoid unnecessary writes
        // Keeps only the backoff entries that have not expired yet
        $backoffExpireFunc = function( $t ) { return $t > time(); };

        $jobsRun = 0; // counter
        $flags = JobQueueGroup::USE_CACHE;
        $lastTime = time(); // time since last slave check
        do {
            $backoffs = array_filter( $backoffs, $backoffExpireFunc );
            // Job types currently in backoff are skipped unless --nothrottle is set
            $blacklist = $noThrottle ? array() : array_keys( $backoffs );
            if ( $type === false ) {
                $job = $group->pop( JobQueueGroup::TYPE_DEFAULT, $flags, $blacklist );
            } elseif ( in_array( $type, $blacklist ) ) {
                $job = false; // requested queue in backoff state
            } else {
                $job = $group->pop( $type ); // job from a single queue
            }
            if ( $job ) { // found a job
                ++$jobsRun;
                $this->runJobsLog( $job->toString() . " STARTING" );

                // Set timer to stop the job if too much CPU time is used
                set_time_limit( $maxTime ?: 0 );
                // Run the job...
                wfProfileIn( __METHOD__ . '-' . get_class( $job ) );
                $t = microtime( true );
                try {
                    $status = $job->run();
                    $error = $job->getLastError();
                } catch ( MWException $e ) {
                    MWExceptionHandler::rollbackMasterChangesAndLog( $e );
                    $status = false;
                    $error = get_class( $e ) . ': ' . $e->getMessage();
                    $e->report(); // write error to STDERR and the log
                }
                $timeMs = intval( ( microtime( true ) - $t ) * 1000 );
                wfProfileOut( __METHOD__ . '-' . get_class( $job ) );
                // Disable the timer
                set_time_limit( 0 );

                // Mark the job as done on success or when the job cannot be retried
                if ( $status !== false || !$job->allowRetries() ) {
                    $group->ack( $job ); // done
                }

                if ( $status === false ) {
                    $this->runJobsLog( $job->toString() . " t=$timeMs error={$error}" );
                } else {
                    $this->runJobsLog( $job->toString() . " t=$timeMs good" );
                }

                // Back off of certain jobs for a while
                $ttw = $this->getBackoffTimeToWait( $job );
                if ( $ttw > 0 ) {
                    $jType = $job->getType();
                    $backoffs[$jType] = isset( $backoffs[$jType] ) ? $backoffs[$jType] : 0;
                    $backoffs[$jType] = max( $backoffs[$jType], time() + $ttw );
                }

                // Break out if we hit the job count or wall time limits...
                if ( $maxJobs && $jobsRun >= $maxJobs ) {
                    break;
                } elseif ( $maxTime && ( time() - $startTime ) > $maxTime ) {
                    break;
                }

                // Don't let any of the main DB slaves get backed up
                $timePassed = time() - $lastTime;
                // A negative value means the clock went backwards; check then as well
                if ( $timePassed >= 5 || $timePassed < 0 ) {
                    wfWaitForSlaves();
                    $lastTime = time();
                }
                // Don't let any queue slaves/backups fall behind
                if ( $jobsRun > 0 && ( $jobsRun % 100 ) == 0 ) {
                    $group->waitForBackups();
                }

                // Bail if near-OOM instead of in a job
                $this->assertMemoryOK();
            }
        } while ( $job ); // stop when there are no jobs
        // Sync the persistent backoffs for the next runJobs.php pass
        $backoffs = array_filter( $backoffs, $backoffExpireFunc );
        if ( $backoffs !== $startingBackoffs ) {
            $this->syncBackoffs( $backoffs );
        }
    }

    /**
     * Get the number of seconds to back off from a job type after running a job.
     *
     * The wait is workItemCount() divided by the configured items-per-second
     * rate in $wgJobBackoffThrottling. The fractional part is rounded up or down
     * at random, with the probability of rounding up equal to the fraction, so
     * that the average wait matches the exact value. The arithmetic is done in
     * floating point so that rates below one item per second work.
     *
     * @param Job $job
     * @return int Seconds to wait; 0 when the job type is not throttled
     */
    private function getBackoffTimeToWait( Job $job ) {
        global $wgJobBackoffThrottling;

        if ( !isset( $wgJobBackoffThrottling[$job->getType()] ) ) {
            return 0; // not throttled
        }
        $itemsPerSecond = $wgJobBackoffThrottling[$job->getType()];
        if ( $itemsPerSecond <= 0 ) {
            return 0; // not throttled
        }

        $seconds = 0;
        $workItems = $job->workItemCount();
        if ( $workItems > 0 ) {
            $exactSeconds = $workItems / $itemsPerSecond;
            $seconds = floor( $exactSeconds );
            // Randomized rounding; avoids '%' and mt_rand( 1, $rate ), which both
            // cast a fractional rate to an integer (possibly zero)
            $remainder = $exactSeconds - $seconds;
            $seconds += ( mt_rand() / mt_getrandmax() < $remainder ) ? 1 : 0;
        }

        return (int)$seconds;
    }

    /**
     * Load the backoff map persisted by earlier runJobs.php passes.
     *
     * @return array Map of (job type => UNIX expiry timestamp); empty when the
     *   file is missing, unreadable or does not contain a JSON object/array
     */
    private function loadBackoffs() {
        // Kept in a local for the duration of the method (profiling section)
        $section = new ProfileSection( __METHOD__ );

        $backoffs = array();
        $file = wfTempDir() . '/mw-runJobs-backoffs.json';
        if ( is_file( $file ) ) {
            $handle = fopen( $file, 'rb' );
            // The file may vanish or be unreadable between is_file() and fopen()
            if ( $handle ) {
                flock( $handle, LOCK_SH );
                $content = stream_get_contents( $handle );
                flock( $handle, LOCK_UN );
                fclose( $handle );
                $decoded = json_decode( $content, true );
                // Ignore corrupt content and scalar JSON values
                if ( is_array( $decoded ) ) {
                    $backoffs = $decoded;
                }
            }
        }

        return $backoffs;
    }

    /**
     * Merge the given backoffs into the persisted backoff file.
     *
     * For each job type the later of the stored and the given expiry is kept,
     * entries that have already expired are dropped, and the merged map is
     * written back while holding an exclusive lock on the file.
     *
     * @param array $backoffs Map of (job type => UNIX expiry timestamp)
     */
    private function syncBackoffs( array $backoffs ) {
        // Kept in a local for the duration of the method (profiling section)
        $section = new ProfileSection( __METHOD__ );

        $file = wfTempDir() . '/mw-runJobs-backoffs.json';
        // 'c+' creates the file when missing but, unlike 'w+', does not truncate
        // it on open. Truncating here would happen before the lock is held and
        // would throw away what other runners stored, leaving nothing to merge.
        $handle = fopen( $file, 'cb+' );
        if ( !$handle ) {
            return; // fopen() already emitted a warning; persisting is best-effort
        }
        if ( !flock( $handle, LOCK_EX ) ) {
            fclose( $handle );
            return;
        }
        $content = stream_get_contents( $handle );
        $cBackoffs = json_decode( $content, true );
        if ( !is_array( $cBackoffs ) ) {
            $cBackoffs = array(); // empty or corrupt file
        }
        foreach ( $backoffs as $type => $timestamp ) {
            $cBackoffs[$type] = isset( $cBackoffs[$type] )
                ? max( $cBackoffs[$type], $timestamp )
                : $timestamp;
        }
        // Drop expired entries so stale job types do not accumulate in the file
        $now = time();
        foreach ( $cBackoffs as $type => $timestamp ) {
            if ( $timestamp <= $now ) {
                unset( $cBackoffs[$type] );
            }
        }
        ftruncate( $handle, 0 );
        // ftruncate() leaves the file pointer at the end of the data just read
        rewind( $handle );
        fwrite( $handle, json_encode( $cBackoffs ) );
        fflush( $handle ); // make sure the data is out before the lock is released
        flock( $handle, LOCK_UN );
        fclose( $handle );
    }

    /**
     * Make sure that this script is not too close to the memory usage limit.
     *
     * The limit is parsed once from the 'memory_limit' ini setting; a value that
     * is not a plain number with an optional k/m/g suffix (such as "-1") is
     * treated as "no limit".
     *
     * @throws MWException When usage reaches 95% of the limit
     */
    private function assertMemoryOK() {
        static $maxBytes = null;
        if ( $maxBytes === null ) {
            $m = array();
            if ( preg_match( '!^(\d+)(k|m|g|)$!i', ini_get( 'memory_limit' ), $m ) ) {
                list( , $num, $unit ) = $m;
                $conv = array( 'g' => 1073741824, 'm' => 1048576, 'k' => 1024, '' => 1 );
                $maxBytes = $num * $conv[strtolower( $unit )];
            } else {
                $maxBytes = 0;
            }
        }
        $usedBytes = memory_get_usage();
        if ( $maxBytes && $usedBytes >= 0.95 * $maxBytes ) {
            throw new MWException( "Detected excessive memory usage ($usedBytes/$maxBytes)." );
        }
    }

    /**
     * Print a timestamped message and send it to the 'runJobs' debug log group.
     *
     * @param string $msg The message to log
     */
    private function runJobsLog( $msg ) {
        $this->output( wfTimestamp( TS_DB ) . " $msg\n" );
        wfDebugLog( 'runJobs', $msg );
    }
}
00271 
// Name the maintenance class defined in this file, then include the file that
// RUN_MAINTENANCE_IF_MAIN points to (constant defined outside this file).
// NOTE(review): presumably that file runs the class only when this script is the entry point; confirm.
00272 $maintClass = "RunJobs";
00273 require_once RUN_MAINTENANCE_IF_MAIN;