MediaWiki  REL1_23
JobQueueDB.php
Go to the documentation of this file.
00001 <?php
/**
 * Job queue backend that stores jobs as rows of the "job" database table.
 *
 * A row with an empty job_token is unclaimed. Popping a job claims its row by
 * writing a random token into job_token; acknowledging a job deletes the row.
 * Cheap state hints (emptiness, sizes) are kept in $this->cache.
 */
class JobQueueDB extends JobQueue {
    const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating
    const CACHE_TTL_LONG = 300; // integer; seconds to cache info that is kept up to date
    const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed
    const MAX_JOB_RANDOM = 2147483647; // integer; 2^31 - 1, used for job_random
    const MAX_OFFSET = 255; // integer; maximum number of rows to skip

    /** @var BagOStuff Cache for queue state hints; never the SQL-backed cache */
    protected $cache;

    /** @var bool|string Name of an external DB cluster; false to use the wiki's own load balancer */
    protected $cluster = false;

    /**
     * Additional parameters include:
     *   - cluster : name of an external DB cluster to store the jobs on.
     *               If not given, the load balancer of the wiki itself is used.
     *
     * @param array $params
     */
    protected function __construct( array $params ) {
        global $wgMemc;

        parent::__construct( $params );

        $this->cluster = isset( $params['cluster'] ) ? $params['cluster'] : false;
        // Make sure that we don't use the SQL cache, which would be harmful
        $this->cache = ( $wgMemc instanceof SqlBagOStuff ) ? new EmptyBagOStuff() : $wgMemc;
    }

    /**
     * @return array List of queue order names this backend supports
     */
    protected function supportedOrders() {
        return array( 'random', 'timestamp', 'fifo' );
    }

    /**
     * @return string Queue order used by default
     */
    protected function optimalOrder() {
        return 'random';
    }

    /**
     * Check whether there are no unclaimed jobs of this type.
     * The answer is cached as the strings 'true'/'false' for CACHE_TTL_LONG seconds.
     *
     * @return bool
     * @throws JobQueueError
     */
    protected function doIsEmpty() {
        $key = $this->getCacheKey( 'empty' );

        $isEmpty = $this->cache->get( $key );
        if ( $isEmpty === 'true' ) {
            return true;
        } elseif ( $isEmpty === 'false' ) {
            return false;
        }

        $dbr = $this->getSlaveDB();
        try {
            $found = $dbr->selectField( // unclaimed job
                'job', '1', array( 'job_cmd' => $this->type, 'job_token' => '' ), __METHOD__
            );
        } catch ( DBError $e ) {
            $this->throwDBException( $e );
        }
        // add() rather than set(), so a value stored meanwhile by a push/pop is not clobbered
        $this->cache->add( $key, $found ? 'false' : 'true', self::CACHE_TTL_LONG );

        return !$found;
    }

    /**
     * Count the unclaimed jobs of this type (cached for CACHE_TTL_SHORT seconds).
     *
     * @return int
     * @throws JobQueueError
     */
    protected function doGetSize() {
        $key = $this->getCacheKey( 'size' );

        $size = $this->cache->get( $key );
        if ( is_int( $size ) ) {
            return $size;
        }

        try {
            $dbr = $this->getSlaveDB();
            $size = (int)$dbr->selectField( 'job', 'COUNT(*)',
                array( 'job_cmd' => $this->type, 'job_token' => '' ),
                __METHOD__
            );
        } catch ( DBError $e ) {
            $this->throwDBException( $e );
        }
        $this->cache->set( $key, $size, self::CACHE_TTL_SHORT );

        return $size;
    }

    /**
     * Count the claimed (non-empty job_token) jobs of this type.
     * Always 0 when claimTTL is not positive. Cached for CACHE_TTL_SHORT seconds.
     *
     * @return int
     * @throws JobQueueError
     */
    protected function doGetAcquiredCount() {
        if ( $this->claimTTL <= 0 ) {
            return 0; // no acknowledgements
        }

        $key = $this->getCacheKey( 'acquiredcount' );

        $count = $this->cache->get( $key );
        if ( is_int( $count ) ) {
            return $count;
        }

        $dbr = $this->getSlaveDB();
        try {
            $count = (int)$dbr->selectField( 'job', 'COUNT(*)',
                array( 'job_cmd' => $this->type, "job_token != {$dbr->addQuotes( '' )}" ),
                __METHOD__
            );
        } catch ( DBError $e ) {
            $this->throwDBException( $e );
        }
        $this->cache->set( $key, $count, self::CACHE_TTL_SHORT );

        return $count;
    }

    /**
     * Count the claimed jobs of this type whose job_attempts reached maxTries.
     * Always 0 when claimTTL is not positive. Cached for CACHE_TTL_SHORT seconds.
     *
     * @return int
     * @throws JobQueueError
     */
    protected function doGetAbandonedCount() {
        if ( $this->claimTTL <= 0 ) {
            return 0; // no acknowledgements
        }

        $key = $this->getCacheKey( 'abandonedcount' );

        // Go through $this->cache like the sibling counters do, so that the
        // SqlBagOStuff guard set up in the constructor applies here as well.
        $count = $this->cache->get( $key );
        if ( is_int( $count ) ) {
            return $count;
        }

        $dbr = $this->getSlaveDB();
        try {
            $count = (int)$dbr->selectField( 'job', 'COUNT(*)',
                array(
                    'job_cmd' => $this->type,
                    "job_token != {$dbr->addQuotes( '' )}",
                    "job_attempts >= " . $dbr->addQuotes( $this->maxTries )
                ),
                __METHOD__
            );
        } catch ( DBError $e ) {
            $this->throwDBException( $e );
        }
        $this->cache->set( $key, $count, self::CACHE_TTL_SHORT );

        return $count;
    }

    /**
     * Schedule the insertion of a batch of jobs for when the master
     * connection has no transaction in progress.
     *
     * @param array $jobs List of job specifications
     * @param int $flags Bitfield of QOS_* constants
     * @return bool Always true; the insertion itself is deferred
     */
    protected function doBatchPush( array $jobs, $flags ) {
        $dbw = $this->getMasterDB();

        $that = $this; // closures cannot use $this in PHP 5.3
        $method = __METHOD__;
        $dbw->onTransactionIdle(
            function () use ( $dbw, $that, $jobs, $flags, $method ) {
                $that->doBatchPushInternal( $dbw, $jobs, $flags, $method );
            }
        );

        return true;
    }

    /**
     * Insert a batch of jobs, skipping de-duplicable jobs that are already
     * enqueued and unclaimed. This is public only so that the closure created
     * in doBatchPush() can call it.
     *
     * @param IDatabase $dbw Master DB connection
     * @param array $jobs List of job specifications
     * @param int $flags Bitfield of QOS_* constants; QOS_ATOMIC wraps all inserts in one transaction
     * @param string $method Caller name used to label the queries
     * @return bool
     * @throws DBError
     */
    public function doBatchPushInternal( IDatabase $dbw, array $jobs, $flags, $method ) {
        if ( !count( $jobs ) ) {
            return true;
        }

        $rowSet = array(); // (sha1 => job) map for jobs that are de-duplicated
        $rowList = array(); // list of jobs that are not de-duplicated
        foreach ( $jobs as $job ) {
            $row = $this->insertFields( $job );
            if ( $job->ignoreDuplicates() ) {
                $rowSet[$row['job_sha1']] = $row;
            } else {
                $rowList[] = $row;
            }
        }

        if ( $flags & self::QOS_ATOMIC ) {
            $dbw->begin( $method ); // wrap all the job additions in one transaction
        }
        try {
            // Strip out any duplicate jobs that are already in the queue...
            if ( count( $rowSet ) ) {
                $res = $dbw->select( 'job', 'job_sha1',
                    array(
                        // No job_type condition since it's part of the job_sha1 hash
                        'job_sha1' => array_keys( $rowSet ),
                        'job_token' => '' // unclaimed
                    ),
                    $method
                );
                foreach ( $res as $row ) {
                    wfDebug( "Job with hash '{$row->job_sha1}' is a duplicate.\n" );
                    unset( $rowSet[$row->job_sha1] ); // already enqueued
                }
            }
            // Build the full list of job rows to insert
            $rows = array_merge( $rowList, array_values( $rowSet ) );
            // Insert the job rows in chunks to avoid slave lag...
            foreach ( array_chunk( $rows, 50 ) as $rowBatch ) {
                $dbw->insert( 'job', $rowBatch, $method );
            }
            JobQueue::incrStats( 'job-insert', $this->type, count( $rows ), $this->wiki );
            // Every job that was passed in but is not being inserted was dropped as a
            // duplicate, either of another job in this batch or of an enqueued row.
            JobQueue::incrStats(
                'job-insert-duplicate',
                $this->type,
                count( $jobs ) - count( $rows ),
                $this->wiki
            );
        } catch ( DBError $e ) {
            if ( $flags & self::QOS_ATOMIC ) {
                $dbw->rollback( $method );
            }
            throw $e;
        }
        if ( $flags & self::QOS_ATOMIC ) {
            $dbw->commit( $method );
        }

        $this->cache->set( $this->getCacheKey( 'empty' ), 'false', JobQueueDB::CACHE_TTL_LONG );

        return true;
    }

    /**
     * Claim one unclaimed job of this type and return it.
     *
     * @return Job|bool False if the queue is (cached as) empty or no row could be claimed
     * @throws JobQueueError
     */
    protected function doPop() {
        if ( $this->cache->get( $this->getCacheKey( 'empty' ) ) === 'true' ) {
            return false; // queue is empty
        }

        $dbw = $this->getMasterDB();
        try {
            $dbw->commit( __METHOD__, 'flush' ); // flush existing transaction
            $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting
            $dbw->clearFlag( DBO_TRX ); // make each query its own transaction
            // Held in a variable so that the flag is restored when this method's scope ends
            $scopedReset = new ScopedCallback( function () use ( $dbw, $autoTrx ) {
                $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting
            } );

            $uuid = wfRandomString( 32 ); // pop attempt
            $job = false; // job popped off
            do { // retry when our row is invalid or deleted as a duplicate
                // Try to reserve a row in the DB...
                if ( in_array( $this->order, array( 'fifo', 'timestamp' ) ) ) {
                    $row = $this->claimOldest( $uuid );
                } else { // random first
                    $rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs
                    $gte = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand
                    $row = $this->claimRandom( $uuid, $rand, $gte );
                }
                // Check if we found a row to reserve...
                if ( !$row ) {
                    $this->cache->set( $this->getCacheKey( 'empty' ), 'true', self::CACHE_TTL_LONG );
                    break; // nothing to do
                }
                JobQueue::incrStats( 'job-pop', $this->type, 1, $this->wiki );
                // Get the job object from the row...
                $title = Title::makeTitleSafe( $row->job_namespace, $row->job_title );
                if ( !$title ) {
                    $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ );
                    wfDebug( "Row has invalid title '{$row->job_title}'.\n" );
                    continue; // try again
                }
                $job = Job::factory( $row->job_cmd, $title,
                    self::extractBlob( $row->job_params ), $row->job_id );
                $job->metadata['id'] = $row->job_id;
                break; // done
            } while ( true );
        } catch ( DBError $e ) {
            $this->throwDBException( $e );
        }

        return $job;
    }

    /**
     * Reserve a row with a single UPDATE without holding row locks over RTTs.
     *
     * @param string $uuid 32 char hex string written to job_token
     * @param int $rand Random unsigned integer (31 bits) compared against job_random
     * @param bool $gte Search for job_random >= $rand (otherwise job_random <= $rand)
     * @return stdClass|bool Row of the claimed job, or false if none was found
     */
    protected function claimRandom( $uuid, $rand, $gte ) {
        $dbw = $this->getMasterDB();
        // Check cache to see if the queue has <= OFFSET items
        $tinyQueue = $this->cache->get( $this->getCacheKey( 'small' ) );

        $row = false; // the row acquired
        $invertedDirection = false; // whether one job_random direction was already scanned
        // This uses a replication safe method for acquiring jobs. One could use UPDATE+LIMIT
        // instead, but that either uses ORDER BY (in which case it deadlocks in MySQL) or is
        // not replication safe. Due to http://bugs.mysql.com/bug.php?id=6980, subqueries cannot
        // be used here with MySQL.
        do {
            if ( $tinyQueue ) { // queue has <= MAX_OFFSET rows
                // For small queues, using OFFSET will overshoot and return no rows more often.
                // Instead, this uses job_random to pick a row (possibly checking both directions).
                $ineq = $gte ? '>=' : '<=';
                $dir = $gte ? 'ASC' : 'DESC';
                $row = $dbw->selectRow( 'job', self::selectFields(), // find a random job
                    array(
                        'job_cmd' => $this->type,
                        'job_token' => '', // unclaimed
                        "job_random {$ineq} {$dbw->addQuotes( $rand )}" ),
                    __METHOD__,
                    array( 'ORDER BY' => "job_random {$dir}" )
                );
                if ( !$row && !$invertedDirection ) {
                    $gte = !$gte;
                    $invertedDirection = true;
                    continue; // try the other direction
                }
            } else { // table *may* have >= MAX_OFFSET rows
                // Bug 42614: "ORDER BY job_random" with a job_random inequality causes high CPU
                // in MySQL if there are many rows for some reason. This uses a small OFFSET
                // instead of job_random for reducing excess claim retries.
                $row = $dbw->selectRow( 'job', self::selectFields(), // find a random job
                    array(
                        'job_cmd' => $this->type,
                        'job_token' => '', // unclaimed
                    ),
                    __METHOD__,
                    array( 'OFFSET' => mt_rand( 0, self::MAX_OFFSET ) )
                );
                if ( !$row ) {
                    $tinyQueue = true; // we know the queue must have <= MAX_OFFSET rows
                    $this->cache->set( $this->getCacheKey( 'small' ), 1, self::CACHE_TTL_SHORT );
                    continue; // use job_random
                }
            }

            if ( $row ) { // claim the job
                $dbw->update( 'job', // update by PK
                    array(
                        'job_token' => $uuid,
                        'job_token_timestamp' => $dbw->timestamp(),
                        'job_attempts = job_attempts+1' ),
                    array( 'job_cmd' => $this->type, 'job_id' => $row->job_id, 'job_token' => '' ),
                    __METHOD__
                );
                // This might get raced out by another runner when claiming the previously
                // selected row. The use of job_random should minimize this problem, however.
                if ( !$dbw->affectedRows() ) {
                    $row = false; // raced out
                }
            } else {
                break; // nothing to do
            }
        } while ( !$row );

        return $row;
    }

    /**
     * Reserve the unclaimed row with the lowest job_id using a single UPDATE.
     *
     * @param string $uuid 32 char hex string written to job_token
     * @return stdClass|bool Row of the claimed job, or false if none was found
     */
    protected function claimOldest( $uuid ) {
        $dbw = $this->getMasterDB();

        $row = false; // the row acquired
        do {
            if ( $dbw->getType() === 'mysql' ) {
                // Per http://bugs.mysql.com/bug.php?id=6980, we can't use subqueries on the
                // same table being changed in an UPDATE query in MySQL (gives Error: 1093).
                // Oracle and Postgre have no such limitation. However, MySQL offers an
                // alternative here by supporting ORDER BY + LIMIT for UPDATE queries.
                $dbw->query( "UPDATE {$dbw->tableName( 'job' )} " .
                    "SET " .
                        "job_token = {$dbw->addQuotes( $uuid )}, " .
                        "job_token_timestamp = {$dbw->addQuotes( $dbw->timestamp() )}, " .
                        "job_attempts = job_attempts+1 " .
                    "WHERE ( " .
                        "job_cmd = {$dbw->addQuotes( $this->type )} " .
                        "AND job_token = {$dbw->addQuotes( '' )} " .
                    ") ORDER BY job_id ASC LIMIT 1",
                    __METHOD__
                );
            } else {
                // Use a subquery to find the job, within an UPDATE to claim it.
                // This uses as much of the DB wrapper functions as possible.
                $dbw->update( 'job',
                    array(
                        'job_token' => $uuid,
                        'job_token_timestamp' => $dbw->timestamp(),
                        'job_attempts = job_attempts+1' ),
                    array( 'job_id = (' .
                        $dbw->selectSQLText( 'job', 'job_id',
                            array( 'job_cmd' => $this->type, 'job_token' => '' ),
                            __METHOD__,
                            array( 'ORDER BY' => 'job_id ASC', 'LIMIT' => 1 ) ) .
                        ')'
                    ),
                    __METHOD__
                );
            }
            // Fetch any row that we just reserved...
            if ( $dbw->affectedRows() ) {
                $row = $dbw->selectRow( 'job', self::selectFields(),
                    array( 'job_cmd' => $this->type, 'job_token' => $uuid ), __METHOD__
                );
                if ( !$row ) { // raced out by duplicate job removal
                    wfDebug( "Row deleted as duplicate by another process.\n" );
                }
            } else {
                break; // nothing to do
            }
        } while ( !$row );

        return $row;
    }

    /**
     * Acknowledge a finished job by deleting its row.
     *
     * @param Job $job Job carrying the row ID in $job->metadata['id']
     * @return bool
     * @throws MWException If the job has no ID in its metadata
     * @throws JobQueueError
     */
    protected function doAck( Job $job ) {
        if ( !isset( $job->metadata['id'] ) ) {
            throw new MWException( "Job of type '{$job->getType()}' has no ID." );
        }

        $dbw = $this->getMasterDB();
        try {
            $dbw->commit( __METHOD__, 'flush' ); // flush existing transaction
            $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting
            $dbw->clearFlag( DBO_TRX ); // make each query its own transaction
            // Held in a variable so that the flag is restored when this method's scope ends
            $scopedReset = new ScopedCallback( function () use ( $dbw, $autoTrx ) {
                $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting
            } );

            // Delete a row with a single DELETE without holding row locks over RTTs...
            $dbw->delete( 'job',
                array( 'job_cmd' => $this->type, 'job_id' => $job->metadata['id'] ), __METHOD__ );
        } catch ( DBError $e ) {
            $this->throwDBException( $e );
        }

        return true;
    }

    /**
     * Register the root job timestamp of a job in the de-duplication cache,
     * unless an equal or newer timestamp is already registered for its signature.
     *
     * @param Job $job Job with 'rootJobSignature' and 'rootJobTimestamp' parameters
     * @return bool Always true; the registration itself is deferred
     * @throws MWException If either of the two root job parameters is missing
     */
    protected function doDeduplicateRootJob( Job $job ) {
        $params = $job->getParams();
        if ( !isset( $params['rootJobSignature'] ) ) {
            throw new MWException( "Cannot register root job; missing 'rootJobSignature'." );
        } elseif ( !isset( $params['rootJobTimestamp'] ) ) {
            throw new MWException( "Cannot register root job; missing 'rootJobTimestamp'." );
        }
        $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
        // Callers should call batchInsert() and then this function so that if the insert
        // fails, the de-duplication registration will be aborted. Since the insert is
        // deferred till "transaction idle", do the same here, so that the ordering is
        // maintained. Having only the de-duplication registration succeed would cause
        // jobs to become no-ops without any actual jobs that made them redundant.
        $dbw = $this->getMasterDB();
        $cache = $this->dupCache;
        $dbw->onTransactionIdle( function () use ( $cache, $params, $key, $dbw ) {
            $timestamp = $cache->get( $key ); // current last timestamp of this job
            if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
                return true; // a newer version of this root job was enqueued
            }

            // Update the timestamp of the last root job started at the location...
            return $cache->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL );
        } );

        return true;
    }

    /**
     * Delete every job row of this type, claimed or not.
     *
     * @return bool
     * @throws JobQueueError
     */
    protected function doDelete() {
        $dbw = $this->getMasterDB();
        try {
            $dbw->delete( 'job', array( 'job_cmd' => $this->type ), __METHOD__ );
        } catch ( DBError $e ) {
            $this->throwDBException( $e );
        }

        return true;
    }

    /**
     * Wait for the slave DBs to catch up.
     *
     * @return void
     */
    protected function doWaitForBackups() {
        wfWaitForSlaves();
    }

    /**
     * @return array Map of task name to a ('callback', 'period') definition
     */
    protected function doGetPeriodicTasks() {
        return array(
            'recycleAndDeleteStaleJobs' => array(
                'callback' => array( $this, 'recycleAndDeleteStaleJobs' ),
                'period' => ceil( $this->claimTTL / 2 )
            )
        );
    }

    /**
     * Drop the cached emptiness flag and the cached counters of this queue.
     *
     * @return void
     */
    protected function doFlushCaches() {
        foreach ( array( 'empty', 'size', 'acquiredcount', 'abandonedcount' ) as $type ) {
            $this->cache->delete( $this->getCacheKey( $type ) );
        }
    }

    /**
     * Get an iterator over all unclaimed jobs of this type.
     *
     * @return MappedIterator Iterator yielding Job objects
     * @throws JobQueueError
     */
    public function getAllQueuedJobs() {
        $dbr = $this->getSlaveDB();
        try {
            return new MappedIterator(
                $dbr->select( 'job', self::selectFields(),
                    array( 'job_cmd' => $this->getType(), 'job_token' => '' ), __METHOD__ ),
                // NOTE(review): $dbr is not used in the callback; it is presumably captured
                // to keep the connection reference alive while iterating -- confirm before
                // removing it.
                function ( $row ) use ( $dbr ) {
                    $job = Job::factory(
                        $row->job_cmd,
                        Title::makeTitle( $row->job_namespace, $row->job_title ),
                        strlen( $row->job_params ) ? unserialize( $row->job_params ) : false
                    );
                    $job->metadata['id'] = $row->job_id;
                    return $job;
                }
            );
        } catch ( DBError $e ) {
            $this->throwDBException( $e );
        }
    }

    /**
     * @return string Identifier of the DB location (cluster or load balancer) holding this queue
     */
    public function getCoalesceLocationInternal() {
        return $this->cluster
            ? "DBCluster:{$this->cluster}:{$this->wiki}"
            : "LBFactory:{$this->wiki}";
    }

    /**
     * Find which of the given job types have rows in the job table.
     *
     * @param array $types List of job types
     * @return array List of the job types that have at least one row
     */
    protected function doGetSiblingQueuesWithJobs( array $types ) {
        $dbr = $this->getSlaveDB();
        $res = $dbr->select( 'job', 'DISTINCT job_cmd',
            array( 'job_cmd' => $types ), __METHOD__ );

        $types = array();
        foreach ( $res as $row ) {
            $types[] = $row->job_cmd;
        }

        return $types;
    }

    /**
     * Count the job table rows of each of the given job types.
     *
     * @param array $types List of job types
     * @return array Map of (job type => row count) for types that have rows
     */
    protected function doGetSiblingQueueSizes( array $types ) {
        $dbr = $this->getSlaveDB();
        $res = $dbr->select( 'job', array( 'job_cmd', 'COUNT(*) AS count' ),
            array( 'job_cmd' => $types ), __METHOD__, array( 'GROUP BY' => 'job_cmd' ) );

        $sizes = array();
        foreach ( $res as $row ) {
            $sizes[$row->job_cmd] = (int)$row->count;
        }

        return $sizes;
    }

    /**
     * Recycle or destroy any jobs that have been claimed for too long.
     *
     * Claims older than claimTTL are released while retries are left; claimed
     * rows older than MAX_AGE_PRUNE are deleted. The work is serialized with a
     * named DB lock, which is also released when a query fails.
     *
     * @return int Number of jobs recycled/deleted
     * @throws JobQueueError
     */
    public function recycleAndDeleteStaleJobs() {
        $now = time();
        $count = 0; // affected rows
        $dbw = $this->getMasterDB();
        $lockKey = "jobqueue-recycle-{$this->type}";
        $locked = false; // whether the named lock is held and still has to be released

        try {
            if ( !$dbw->lock( $lockKey, __METHOD__, 1 ) ) {
                return $count; // already in progress
            }
            $locked = true;

            // Remove claims on jobs acquired for too long if enabled...
            if ( $this->claimTTL > 0 ) {
                $claimCutoff = $dbw->timestamp( $now - $this->claimTTL );
                // Get the IDs of jobs that have been claimed but not finished after too long.
                // These jobs can be recycled into the queue by expiring the claim. Selecting
                // the IDs first means that the UPDATE can be done by primary key (less deadlocks).
                $res = $dbw->select( 'job', 'job_id',
                    array(
                        'job_cmd' => $this->type,
                        "job_token != {$dbw->addQuotes( '' )}", // was acquired
                        "job_token_timestamp < {$dbw->addQuotes( $claimCutoff )}", // stale
                        "job_attempts < {$dbw->addQuotes( $this->maxTries )}" ), // retries left
                    __METHOD__
                );
                $ids = array_map(
                    function ( $o ) {
                        return $o->job_id;
                    }, iterator_to_array( $res )
                );
                if ( count( $ids ) ) {
                    // Reset job_token for these jobs so that other runners will pick them up.
                    // Set the timestamp to the current time, as it is useful to know that the job
                    // was already tried before (the timestamp becomes the "released" time).
                    $dbw->update( 'job',
                        array(
                            'job_token' => '',
                            'job_token_timestamp' => $dbw->timestamp( $now ) ), // time of release
                        array(
                            'job_id' => $ids ),
                        __METHOD__
                    );
                    $affected = $dbw->affectedRows();
                    $count += $affected;
                    JobQueue::incrStats( 'job-recycle', $this->type, $affected, $this->wiki );
                    $this->cache->set( $this->getCacheKey( 'empty' ), 'false', self::CACHE_TTL_LONG );
                }
            }

            // Just destroy any stale jobs...
            $pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE );
            $conds = array(
                'job_cmd' => $this->type,
                "job_token != {$dbw->addQuotes( '' )}", // was acquired
                "job_token_timestamp < {$dbw->addQuotes( $pruneCutoff )}" // stale
            );
            if ( $this->claimTTL > 0 ) { // only prune jobs attempted too many times...
                $conds[] = "job_attempts >= {$dbw->addQuotes( $this->maxTries )}";
            }
            // Get the IDs of jobs that are considered stale and should be removed. Selecting
            // the IDs first means that the DELETE can be done by primary key (less deadlocks).
            $res = $dbw->select( 'job', 'job_id', $conds, __METHOD__ );
            $ids = array_map(
                function ( $o ) {
                    return $o->job_id;
                }, iterator_to_array( $res )
            );
            if ( count( $ids ) ) {
                $dbw->delete( 'job', array( 'job_id' => $ids ), __METHOD__ );
                $affected = $dbw->affectedRows();
                $count += $affected;
                JobQueue::incrStats( 'job-abandon', $this->type, $affected, $this->wiki );
            }

            $locked = false; // do not retry the unlock below if this one fails
            $dbw->unlock( $lockKey, __METHOD__ );
        } catch ( DBError $e ) {
            if ( $locked ) {
                // Best effort: do not keep the lock held on this connection after a failure
                try {
                    $dbw->unlock( $lockKey, __METHOD__ );
                } catch ( DBError $unlockError ) {
                    // Nothing more can be done here; report the original error below
                }
            }
            $this->throwDBException( $e );
        }

        return $count;
    }

    /**
     * Build the job table row for a job.
     *
     * @param IJobSpecification $job
     * @return array Map of (column name => value)
     */
    protected function insertFields( IJobSpecification $job ) {
        $dbw = $this->getMasterDB();

        return array(
            // Fields that describe the nature of the job
            'job_cmd' => $job->getType(),
            'job_namespace' => $job->getTitle()->getNamespace(),
            'job_title' => $job->getTitle()->getDBkey(),
            'job_params' => self::makeBlob( $job->getParams() ),
            // Additional job metadata
            'job_id' => $dbw->nextSequenceValue( 'job_job_id_seq' ),
            'job_timestamp' => $dbw->timestamp(),
            'job_sha1' => wfBaseConvert(
                sha1( serialize( $job->getDeduplicationInfo() ) ),
                16, 36, 31
            ),
            'job_random' => mt_rand( 0, self::MAX_JOB_RANDOM )
        );
    }

    /**
     * Get a slave (DB_SLAVE) connection for this queue.
     *
     * @return DBConnRef
     * @throws JobQueueConnectionError
     */
    protected function getSlaveDB() {
        try {
            return $this->getDB( DB_SLAVE );
        } catch ( DBConnectionError $e ) {
            // Keep the original exception reachable through getPrevious()
            throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage(), 0, $e );
        }
    }

    /**
     * Get a master (DB_MASTER) connection for this queue.
     *
     * @return DBConnRef
     * @throws JobQueueConnectionError
     */
    protected function getMasterDB() {
        try {
            return $this->getDB( DB_MASTER );
        } catch ( DBConnectionError $e ) {
            // Keep the original exception reachable through getPrevious()
            throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage(), 0, $e );
        }
    }

    /**
     * Get a connection from the external cluster's load balancer if a cluster
     * is configured, otherwise from the load balancer of the wiki.
     *
     * @param int $index DB_SLAVE or DB_MASTER
     * @return DBConnRef
     */
    protected function getDB( $index ) {
        $lb = ( $this->cluster !== false )
            ? wfGetLBFactory()->getExternalLB( $this->cluster, $this->wiki )
            : wfGetLB( $this->wiki );

        return $lb->getConnectionRef( $index, array(), $this->wiki );
    }

    /**
     * Build the cache key of a queue property, scoped by wiki, cluster and job type.
     *
     * @param string $property Property name, e.g. 'empty' or 'size'
     * @return string
     */
    private function getCacheKey( $property ) {
        list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
        $cluster = is_string( $this->cluster ) ? $this->cluster : 'main';

        return wfForeignMemcKey( $db, $prefix, 'jobqueue', $cluster, $this->type, $property );
    }

    /**
     * Serialize job parameters for the job_params column.
     *
     * @param array|bool $params
     * @return string Serialized parameters, or '' if $params is false
     */
    protected static function makeBlob( $params ) {
        if ( $params !== false ) {
            return serialize( $params );
        } else {
            return '';
        }
    }

    /**
     * Unserialize the content of a job_params column.
     *
     * @param string $blob
     * @return bool|mixed The unserialized value, or false if $blob is empty
     */
    protected static function extractBlob( $blob ) {
        if ( (string)$blob !== '' ) {
            return unserialize( $blob );
        } else {
            return false;
        }
    }

    /**
     * Convert a DB error into a JobQueueError and throw it.
     *
     * @param DBError $e
     * @throws JobQueueError
     */
    protected function throwDBException( DBError $e ) {
        // Keep the original exception reachable through getPrevious()
        throw new JobQueueError( get_class( $e ) . ": " . $e->getMessage(), 0, $e );
    }

    /**
     * Return the list of job fields that should be selected.
     *
     * @return array
     */
    public static function selectFields() {
        return array(
            'job_id',
            'job_cmd',
            'job_namespace',
            'job_title',
            'job_timestamp',
            'job_params',
            'job_random',
            'job_attempts',
            'job_token',
            'job_token_timestamp',
            'job_sha1',
        );
    }
}