[ Index ] |
PHP Cross Reference of MediaWiki-1.24.0 |
[Summary view] [Print] [Text view]
<?php
/**
 * Database-backed job queue code.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 * http://www.gnu.org/copyleft/gpl.html
 *
 * @file
 * @author Aaron Schulz
 */

/**
 * Class to handle job queues stored in the DB
 *
 * Unclaimed jobs are rows of the 'job' table with an empty job_token. Popping a job
 * claims its row by writing a random token into job_token; acknowledging it deletes
 * the row. Queue state (emptiness, sizes) is cached in $this->cache.
 *
 * @ingroup JobQueue
 * @since 1.21
 */
class JobQueueDB extends JobQueue {
	const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating
	const CACHE_TTL_LONG = 300; // integer; seconds to cache info that is kept up to date
	const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed
	const MAX_JOB_RANDOM = 2147483647; // integer; 2^31 - 1, used for job_random
	const MAX_OFFSET = 255; // integer; maximum number of rows to skip

	/** @var BagOStuff */
	protected $cache;

	/** @var bool|string Name of an external DB cluster. False if not set */
	protected $cluster = false;

	/**
	 * Additional parameters include:
	 *   - cluster : The name of an external cluster registered via LBFactory.
	 *               If not specified, the primary DB cluster for the wiki will be used.
	 *               This can be overridden with a custom cluster so that DB handles will
	 *               be retrieved via LBFactory::getExternalLB() and getConnection().
	 * @param array $params
	 */
	protected function __construct( array $params ) {
		global $wgMemc;

		parent::__construct( $params );

		$this->cluster = isset( $params['cluster'] ) ? $params['cluster'] : false;
		// Make sure that we don't use the SQL cache, which would be harmful
		$this->cache = ( $wgMemc instanceof SqlBagOStuff ) ? new EmptyBagOStuff() : $wgMemc;
	}

	/**
	 * @return array List of queue orders this class can serve
	 */
	protected function supportedOrders() {
		return array( 'random', 'timestamp', 'fifo' );
	}

	/**
	 * @return string The order used when none is configured
	 */
	protected function optimalOrder() {
		return 'random';
	}

	/**
	 * Check (using the cache when possible) whether there are no unclaimed jobs
	 *
	 * @see JobQueue::doIsEmpty()
	 * @return bool
	 */
	protected function doIsEmpty() {
		$key = $this->getCacheKey( 'empty' );

		// The flag is stored as the strings 'true'/'false' so that a cached
		// "not empty" value can be told apart from a cache miss
		$isEmpty = $this->cache->get( $key );
		if ( $isEmpty === 'true' ) {
			return true;
		} elseif ( $isEmpty === 'false' ) {
			return false;
		}

		$dbr = $this->getSlaveDB();
		try {
			$found = $dbr->selectField( // unclaimed job
				'job', '1', array( 'job_cmd' => $this->type, 'job_token' => '' ), __METHOD__
			);
		} catch ( DBError $e ) {
			$this->throwDBException( $e );
		}
		$this->cache->add( $key, $found ? 'false' : 'true', self::CACHE_TTL_LONG );

		return !$found;
	}

	/**
	 * Count the unclaimed jobs of this type (briefly cached)
	 *
	 * @see JobQueue::doGetSize()
	 * @return int
	 */
	protected function doGetSize() {
		$key = $this->getCacheKey( 'size' );

		$size = $this->cache->get( $key );
		if ( is_int( $size ) ) {
			return $size;
		}

		try {
			$dbr = $this->getSlaveDB();
			$size = (int)$dbr->selectField( 'job', 'COUNT(*)',
				array( 'job_cmd' => $this->type, 'job_token' => '' ),
				__METHOD__
			);
		} catch ( DBError $e ) {
			$this->throwDBException( $e );
		}
		$this->cache->set( $key, $size, self::CACHE_TTL_SHORT );

		return $size;
	}

	/**
	 * Count the claimed (non-empty job_token) jobs of this type (briefly cached)
	 *
	 * @see JobQueue::doGetAcquiredCount()
	 * @return int
	 */
	protected function doGetAcquiredCount() {
		if ( $this->claimTTL <= 0 ) {
			return 0; // no acknowledgements
		}

		$key = $this->getCacheKey( 'acquiredcount' );

		$count = $this->cache->get( $key );
		if ( is_int( $count ) ) {
			return $count;
		}

		$dbr = $this->getSlaveDB();
		try {
			$count = (int)$dbr->selectField( 'job', 'COUNT(*)',
				array( 'job_cmd' => $this->type, "job_token != {$dbr->addQuotes( '' )}" ),
				__METHOD__
			);
		} catch ( DBError $e ) {
			$this->throwDBException( $e );
		}
		$this->cache->set( $key, $count, self::CACHE_TTL_SHORT );

		return $count;
	}

	/**
	 * Count the claimed jobs of this type that have used up all of their attempts
	 * (briefly cached)
	 *
	 * @see JobQueue::doGetAbandonedCount()
	 * @return int
	 * @throws MWException
	 */
	protected function doGetAbandonedCount() {
		if ( $this->claimTTL <= 0 ) {
			return 0; // no acknowledgements
		}

		$key = $this->getCacheKey( 'abandonedcount' );

		// Use $this->cache rather than $wgMemc directly, so that the SqlBagOStuff
		// exclusion set up in the constructor applies here as well
		$count = $this->cache->get( $key );
		if ( is_int( $count ) ) {
			return $count;
		}

		$dbr = $this->getSlaveDB();
		try {
			$count = (int)$dbr->selectField( 'job', 'COUNT(*)',
				array(
					'job_cmd' => $this->type,
					"job_token != {$dbr->addQuotes( '' )}",
					"job_attempts >= " . $dbr->addQuotes( $this->maxTries )
				),
				__METHOD__
			);
		} catch ( DBError $e ) {
			$this->throwDBException( $e );
		}
		$this->cache->set( $key, $count, self::CACHE_TTL_SHORT );

		return $count;
	}

	/**
	 * Schedule the insertion of the given jobs for when the master DB handle
	 * has no transaction in progress
	 *
	 * @see JobQueue::doBatchPush()
	 * @param array $jobs
	 * @param int $flags
	 * @throws DBError|Exception
	 * @return void
	 */
	protected function doBatchPush( array $jobs, $flags ) {
		$dbw = $this->getMasterDB();

		$that = $this; // passed into the closure explicitly instead of relying on $this
		$method = __METHOD__;
		$dbw->onTransactionIdle(
			function () use ( $dbw, $that, $jobs, $flags, $method ) {
				$that->doBatchPushInternal( $dbw, $jobs, $flags, $method );
			}
		);
	}

	/**
	 * This function should *not* be called outside of JobQueueDB
	 *
	 * Inserts the rows for the given jobs, skipping de-duplicatable jobs that already
	 * have an unclaimed row with the same job_sha1 (or that repeat within $jobs).
	 *
	 * @param IDatabase $dbw
	 * @param array $jobs
	 * @param int $flags Bitfield; QOS_ATOMIC wraps all insertions in one transaction
	 * @param string $method Caller name used for the DB queries
	 * @throws DBError
	 * @return void
	 */
	public function doBatchPushInternal( IDatabase $dbw, array $jobs, $flags, $method ) {
		if ( !count( $jobs ) ) {
			return;
		}

		$rowSet = array(); // (sha1 => job) map for jobs that are de-duplicated
		$rowList = array(); // list of jobs for jobs that are not de-duplicated
		foreach ( $jobs as $job ) {
			$row = $this->insertFields( $job );
			if ( $job->ignoreDuplicates() ) {
				$rowSet[$row['job_sha1']] = $row;
			} else {
				$rowList[] = $row;
			}
		}

		if ( $flags & self::QOS_ATOMIC ) {
			$dbw->begin( $method ); // wrap all the job additions in one transaction
		}
		try {
			// Strip out any duplicate jobs that are already in the queue...
			if ( count( $rowSet ) ) {
				$res = $dbw->select( 'job', 'job_sha1',
					array(
						// No job_type condition since it's part of the job_sha1 hash
						'job_sha1' => array_keys( $rowSet ),
						'job_token' => '' // unclaimed
					),
					$method
				);
				foreach ( $res as $row ) {
					wfDebug( "Job with hash '{$row->job_sha1}' is a duplicate.\n" );
					unset( $rowSet[$row->job_sha1] ); // already enqueued
				}
			}
			// Build the full list of job rows to insert
			$rows = array_merge( $rowList, array_values( $rowSet ) );
			// Insert the job rows in chunks to avoid slave lag...
			foreach ( array_chunk( $rows, 50 ) as $rowBatch ) {
				$dbw->insert( 'job', $rowBatch, $method );
			}
			JobQueue::incrStats( 'job-insert', $this->type, count( $rows ), $this->wiki );
			// Every job passed in that did not become a row was dropped as a duplicate,
			// either of another job in this batch or of an already enqueued job.
			// (Comparing against $rowSet/$rowList here would always give 0, since
			// $rows is built from exactly those two arrays.)
			JobQueue::incrStats(
				'job-insert-duplicate',
				$this->type,
				count( $jobs ) - count( $rows ),
				$this->wiki
			);
		} catch ( DBError $e ) {
			if ( $flags & self::QOS_ATOMIC ) {
				$dbw->rollback( $method );
			}
			throw $e;
		}
		if ( $flags & self::QOS_ATOMIC ) {
			$dbw->commit( $method );
		}

		$this->cache->set( $this->getCacheKey( 'empty' ), 'false', self::CACHE_TTL_LONG );
	}

	/**
	 * Claim one unclaimed job row and build the Job object for it
	 *
	 * @see JobQueue::doPop()
	 * @return Job|bool False if no job could be claimed
	 */
	protected function doPop() {
		if ( $this->cache->get( $this->getCacheKey( 'empty' ) ) === 'true' ) {
			return false; // queue is empty
		}

		$dbw = $this->getMasterDB();
		try {
			$dbw->commit( __METHOD__, 'flush' ); // flush existing transaction
			$autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting
			$dbw->clearFlag( DBO_TRX ); // make each query its own transaction
			// Not read anywhere; held so the callback runs when it goes out of scope
			$scopedReset = new ScopedCallback( function () use ( $dbw, $autoTrx ) {
				$dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting
			} );

			$uuid = wfRandomString( 32 ); // pop attempt
			$job = false; // job popped off
			do { // retry when our row is invalid or deleted as a duplicate
				// Try to reserve a row in the DB...
				if ( in_array( $this->order, array( 'fifo', 'timestamp' ) ) ) {
					$row = $this->claimOldest( $uuid );
				} else { // random first
					$rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs
					$gte = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand
					$row = $this->claimRandom( $uuid, $rand, $gte );
				}
				// Check if we found a row to reserve...
				if ( !$row ) {
					$this->cache->set( $this->getCacheKey( 'empty' ), 'true', self::CACHE_TTL_LONG );
					break; // nothing to do
				}
				JobQueue::incrStats( 'job-pop', $this->type, 1, $this->wiki );
				// Get the job object from the row...
				$title = Title::makeTitleSafe( $row->job_namespace, $row->job_title );
				if ( !$title ) {
					$dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ );
					wfDebug( "Row has invalid title '{$row->job_title}'.\n" );
					continue; // try again
				}
				$job = Job::factory( $row->job_cmd, $title,
					self::extractBlob( $row->job_params ), $row->job_id );
				$job->metadata['id'] = $row->job_id;
				break; // done
			} while ( true );
		} catch ( DBError $e ) {
			$this->throwDBException( $e );
		}

		return $job;
	}

	/**
	 * Reserve a row with a single UPDATE without holding row locks over RTTs...
	 *
	 * @param string $uuid 32 char hex string
	 * @param int $rand Random unsigned integer (31 bits)
	 * @param bool $gte Search for job_random >= $random (otherwise job_random <= $random)
	 * @return stdClass|bool Row|false
	 */
	protected function claimRandom( $uuid, $rand, $gte ) {
		$dbw = $this->getMasterDB();
		// Check cache to see if the queue has <= OFFSET items
		$tinyQueue = $this->cache->get( $this->getCacheKey( 'small' ) );

		$row = false; // the row acquired
		$invertedDirection = false; // whether one job_random direction was already scanned
		// This uses a replication safe method for acquiring jobs. One could use UPDATE+LIMIT
		// instead, but that either uses ORDER BY (in which case it deadlocks in MySQL) or is
		// not replication safe. Due to http://bugs.mysql.com/bug.php?id=6980, subqueries cannot
		// be used here with MySQL.
		do {
			if ( $tinyQueue ) { // queue has <= MAX_OFFSET rows
				// For small queues, using OFFSET will overshoot and return no rows more often.
				// Instead, this uses job_random to pick a row (possibly checking both directions).
				$ineq = $gte ? '>=' : '<=';
				$dir = $gte ? 'ASC' : 'DESC';
				$row = $dbw->selectRow( 'job', self::selectFields(), // find a random job
					array(
						'job_cmd' => $this->type,
						'job_token' => '', // unclaimed
						"job_random {$ineq} {$dbw->addQuotes( $rand )}" ),
					__METHOD__,
					array( 'ORDER BY' => "job_random {$dir}" )
				);
				if ( !$row && !$invertedDirection ) {
					$gte = !$gte;
					$invertedDirection = true;
					continue; // try the other direction
				}
			} else { // table *may* have >= MAX_OFFSET rows
				// Bug 42614: "ORDER BY job_random" with a job_random inequality causes high CPU
				// in MySQL if there are many rows for some reason. This uses a small OFFSET
				// instead of job_random for reducing excess claim retries.
				$row = $dbw->selectRow( 'job', self::selectFields(), // find a random job
					array(
						'job_cmd' => $this->type,
						'job_token' => '', // unclaimed
					),
					__METHOD__,
					array( 'OFFSET' => mt_rand( 0, self::MAX_OFFSET ) )
				);
				if ( !$row ) {
					$tinyQueue = true; // we know the queue must have <= MAX_OFFSET rows
					$this->cache->set( $this->getCacheKey( 'small' ), 1, self::CACHE_TTL_SHORT );
					continue; // use job_random
				}
			}

			if ( $row ) { // claim the job
				$dbw->update( 'job', // update by PK
					array(
						'job_token' => $uuid,
						'job_token_timestamp' => $dbw->timestamp(),
						'job_attempts = job_attempts+1' ),
					array( 'job_cmd' => $this->type, 'job_id' => $row->job_id, 'job_token' => '' ),
					__METHOD__
				);
				// This might get raced out by another runner when claiming the previously
				// selected row. The use of job_random should minimize this problem, however.
				if ( !$dbw->affectedRows() ) {
					$row = false; // raced out
				}
			} else {
				break; // nothing to do
			}
		} while ( !$row );

		return $row;
	}

	/**
	 * Reserve a row with a single UPDATE without holding row locks over RTTs...
	 *
	 * @param string $uuid 32 char hex string
	 * @return stdClass|bool Row|false
	 */
	protected function claimOldest( $uuid ) {
		$dbw = $this->getMasterDB();

		$row = false; // the row acquired
		do {
			if ( $dbw->getType() === 'mysql' ) {
				// Per http://bugs.mysql.com/bug.php?id=6980, we can't use subqueries on the
				// same table being changed in an UPDATE query in MySQL (gives Error: 1093).
				// Oracle and Postgre have no such limitation. However, MySQL offers an
				// alternative here by supporting ORDER BY + LIMIT for UPDATE queries.
				$dbw->query( "UPDATE {$dbw->tableName( 'job' )} " .
					"SET " .
						"job_token = {$dbw->addQuotes( $uuid ) }, " .
						"job_token_timestamp = {$dbw->addQuotes( $dbw->timestamp() )}, " .
						"job_attempts = job_attempts+1 " .
					"WHERE ( " .
						"job_cmd = {$dbw->addQuotes( $this->type )} " .
						"AND job_token = {$dbw->addQuotes( '' )} " .
					") ORDER BY job_id ASC LIMIT 1",
					__METHOD__
				);
			} else {
				// Use a subquery to find the job, within an UPDATE to claim it.
				// This uses as much of the DB wrapper functions as possible.
				$dbw->update( 'job',
					array(
						'job_token' => $uuid,
						'job_token_timestamp' => $dbw->timestamp(),
						'job_attempts = job_attempts+1' ),
					array( 'job_id = (' .
						$dbw->selectSQLText( 'job', 'job_id',
							array( 'job_cmd' => $this->type, 'job_token' => '' ),
							__METHOD__,
							array( 'ORDER BY' => 'job_id ASC', 'LIMIT' => 1 ) ) .
						')'
					),
					__METHOD__
				);
			}
			// Fetch any row that we just reserved...
			if ( $dbw->affectedRows() ) {
				$row = $dbw->selectRow( 'job', self::selectFields(),
					array( 'job_cmd' => $this->type, 'job_token' => $uuid ), __METHOD__
				);
				if ( !$row ) { // raced out by duplicate job removal
					wfDebug( "Row deleted as duplicate by another process.\n" );
				}
			} else {
				break; // nothing to do
			}
		} while ( !$row );

		return $row;
	}

	/**
	 * Acknowledge a job by deleting its row
	 *
	 * @see JobQueue::doAck()
	 * @param Job $job
	 * @throws MWException If the job carries no row ID in its metadata
	 * @return bool
	 */
	protected function doAck( Job $job ) {
		if ( !isset( $job->metadata['id'] ) ) {
			throw new MWException( "Job of type '{$job->getType()}' has no ID." );
		}

		$dbw = $this->getMasterDB();
		try {
			$dbw->commit( __METHOD__, 'flush' ); // flush existing transaction
			$autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting
			$dbw->clearFlag( DBO_TRX ); // make each query its own transaction
			// Not read anywhere; held so the callback runs when it goes out of scope
			$scopedReset = new ScopedCallback( function () use ( $dbw, $autoTrx ) {
				$dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting
			} );

			// Delete a row with a single DELETE without holding row locks over RTTs...
			$dbw->delete( 'job',
				array( 'job_cmd' => $this->type, 'job_id' => $job->metadata['id'] ), __METHOD__ );
		} catch ( DBError $e ) {
			$this->throwDBException( $e );
		}

		return true;
	}

	/**
	 * @see JobQueue::doDeduplicateRootJob()
	 * @param Job $job
	 * @throws MWException If a root job parameter is missing
	 * @return bool
	 */
	protected function doDeduplicateRootJob( Job $job ) {
		$params = $job->getParams();
		if ( !isset( $params['rootJobSignature'] ) ) {
			throw new MWException( "Cannot register root job; missing 'rootJobSignature'." );
		} elseif ( !isset( $params['rootJobTimestamp'] ) ) {
			throw new MWException( "Cannot register root job; missing 'rootJobTimestamp'." );
		}
		$key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
		// Callers should call batchInsert() and then this function so that if the insert
		// fails, the de-duplication registration will be aborted. Since the insert is
		// deferred till "transaction idle", do the same here, so that the ordering is
		// maintained. Having only the de-duplication registration succeed would cause
		// jobs to become no-ops without any actual jobs that made them redundant.
		$dbw = $this->getMasterDB();
		$cache = $this->dupCache;
		$dbw->onTransactionIdle( function () use ( $cache, $params, $key, $dbw ) {
			$timestamp = $cache->get( $key ); // current last timestamp of this job
			if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
				return true; // a newer version of this root job was enqueued
			}

			// Update the timestamp of the last root job started at the location...
			return $cache->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL );
		} );

		return true;
	}

	/**
	 * Delete every job row of this type, claimed or not
	 *
	 * @see JobQueue::doDelete()
	 * @return bool
	 */
	protected function doDelete() {
		$dbw = $this->getMasterDB();
		try {
			$dbw->delete( 'job', array( 'job_cmd' => $this->type ), __METHOD__ );
		} catch ( DBError $e ) {
			$this->throwDBException( $e );
		}

		return true;
	}

	/**
	 * @see JobQueue::doWaitForBackups()
	 * @return void
	 */
	protected function doWaitForBackups() {
		wfWaitForSlaves();
	}

	/**
	 * @return array Map of task name to a 'callback' and a 'period' value
	 */
	protected function doGetPeriodicTasks() {
		return array(
			'recycleAndDeleteStaleJobs' => array(
				'callback' => array( $this, 'recycleAndDeleteStaleJobs' ),
				'period' => ceil( $this->claimTTL / 2 )
			)
		);
	}

	/**
	 * Purge the cached emptiness flag and the cached job counts
	 *
	 * @return void
	 */
	protected function doFlushCaches() {
		foreach ( array( 'empty', 'size', 'acquiredcount', 'abandonedcount' ) as $type ) {
			$this->cache->delete( $this->getCacheKey( $type ) );
		}
	}

	/**
	 * @see JobQueue::getAllQueuedJobs()
	 * @return Iterator
	 */
	public function getAllQueuedJobs() {
		$dbr = $this->getSlaveDB();
		try {
			return new MappedIterator(
				$dbr->select( 'job', self::selectFields(),
					array( 'job_cmd' => $this->getType(), 'job_token' => '' ), __METHOD__ ),
				function ( $row ) use ( $dbr ) {
					$job = Job::factory(
						$row->job_cmd,
						Title::makeTitle( $row->job_namespace, $row->job_title ),
						strlen( $row->job_params ) ? unserialize( $row->job_params ) : false
					);
					$job->metadata['id'] = $row->job_id;
					return $job;
				}
			);
		} catch ( DBError $e ) {
			$this->throwDBException( $e );
		}
	}

	/**
	 * @return string Identifier built from the cluster name (if any) and the wiki ID
	 */
	public function getCoalesceLocationInternal() {
		return $this->cluster
			? "DBCluster:{$this->cluster}:{$this->wiki}"
			: "LBFactory:{$this->wiki}";
	}

	/**
	 * Find which of the given job types have any row in the job table
	 *
	 * @param array $types List of job types to check
	 * @return array List of the job types that have rows
	 */
	protected function doGetSiblingQueuesWithJobs( array $types ) {
		$dbr = $this->getSlaveDB();
		$res = $dbr->select( 'job', 'DISTINCT job_cmd',
			array( 'job_cmd' => $types ), __METHOD__ );

		$types = array();
		foreach ( $res as $row ) {
			$types[] = $row->job_cmd;
		}

		return $types;
	}

	/**
	 * Count the job table rows for each of the given job types
	 *
	 * @param array $types List of job types to check
	 * @return array Map of (job type => row count) for the types that have rows
	 */
	protected function doGetSiblingQueueSizes( array $types ) {
		$dbr = $this->getSlaveDB();
		$res = $dbr->select( 'job', array( 'job_cmd', 'COUNT(*) AS count' ),
			array( 'job_cmd' => $types ), __METHOD__, array( 'GROUP BY' => 'job_cmd' ) );

		$sizes = array();
		foreach ( $res as $row ) {
			$sizes[$row->job_cmd] = (int)$row->count;
		}

		return $sizes;
	}

	/**
	 * Recycle or destroy any jobs that have been claimed for too long
	 *
	 * @return int Number of jobs recycled/deleted
	 */
	public function recycleAndDeleteStaleJobs() {
		$now = time();
		$count = 0; // affected rows
		$dbw = $this->getMasterDB();

		try {
			if ( !$dbw->lock( "jobqueue-recycle-{$this->type}", __METHOD__, 1 ) ) {
				return $count; // already in progress
			}

			// Remove claims on jobs acquired for too long if enabled...
			if ( $this->claimTTL > 0 ) {
				$claimCutoff = $dbw->timestamp( $now - $this->claimTTL );
				// Get the IDs of jobs that have been claimed but not finished after too long.
				// These jobs can be recycled into the queue by expiring the claim. Selecting
				// the IDs first means that the UPDATE can be done by primary key (less deadlocks).
				$res = $dbw->select( 'job', 'job_id',
					array(
						'job_cmd' => $this->type,
						"job_token != {$dbw->addQuotes( '' )}", // was acquired
						"job_token_timestamp < {$dbw->addQuotes( $claimCutoff )}", // stale
						"job_attempts < {$dbw->addQuotes( $this->maxTries )}" ), // retries left
					__METHOD__
				);
				$ids = array_map(
					function ( $o ) {
						return $o->job_id;
					}, iterator_to_array( $res )
				);
				if ( count( $ids ) ) {
					// Reset job_token for these jobs so that other runners will pick them up.
					// Set the timestamp to the current time, as it is useful to know that the job
					// was already tried before (the timestamp becomes the "released" time).
					$dbw->update( 'job',
						array(
							'job_token' => '',
							'job_token_timestamp' => $dbw->timestamp( $now ) ), // time of release
						array(
							'job_id' => $ids ),
						__METHOD__
					);
					$affected = $dbw->affectedRows();
					$count += $affected;
					JobQueue::incrStats( 'job-recycle', $this->type, $affected, $this->wiki );
					$this->cache->set( $this->getCacheKey( 'empty' ), 'false', self::CACHE_TTL_LONG );
				}
			}

			// Just destroy any stale jobs...
			$pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE );
			$conds = array(
				'job_cmd' => $this->type,
				"job_token != {$dbw->addQuotes( '' )}", // was acquired
				"job_token_timestamp < {$dbw->addQuotes( $pruneCutoff )}" // stale
			);
			if ( $this->claimTTL > 0 ) { // only prune jobs attempted too many times...
				$conds[] = "job_attempts >= {$dbw->addQuotes( $this->maxTries )}";
			}
			// Get the IDs of jobs that are considered stale and should be removed. Selecting
			// the IDs first means that the DELETE can be done by primary key (less deadlocks).
			$res = $dbw->select( 'job', 'job_id', $conds, __METHOD__ );
			$ids = array_map(
				function ( $o ) {
					return $o->job_id;
				}, iterator_to_array( $res )
			);
			if ( count( $ids ) ) {
				$dbw->delete( 'job', array( 'job_id' => $ids ), __METHOD__ );
				$affected = $dbw->affectedRows();
				$count += $affected;
				JobQueue::incrStats( 'job-abandon', $this->type, $affected, $this->wiki );
			}

			$dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD__ );
		} catch ( DBError $e ) {
			$this->throwDBException( $e );
		}

		return $count;
	}

	/**
	 * Build the job table row for a job
	 *
	 * @param IJobSpecification $job
	 * @return array Map of (field name => value)
	 */
	protected function insertFields( IJobSpecification $job ) {
		$dbw = $this->getMasterDB();

		return array(
			// Fields that describe the nature of the job
			'job_cmd' => $job->getType(),
			'job_namespace' => $job->getTitle()->getNamespace(),
			'job_title' => $job->getTitle()->getDBkey(),
			'job_params' => self::makeBlob( $job->getParams() ),
			// Additional job metadata
			'job_id' => $dbw->nextSequenceValue( 'job_job_id_seq' ),
			'job_timestamp' => $dbw->timestamp(),
			'job_sha1' => wfBaseConvert(
				sha1( serialize( $job->getDeduplicationInfo() ) ),
				16, 36, 31
			),
			'job_random' => mt_rand( 0, self::MAX_JOB_RANDOM )
		);
	}

	/**
	 * @throws JobQueueConnectionError
	 * @return DBConnRef
	 */
	protected function getSlaveDB() {
		try {
			return $this->getDB( DB_SLAVE );
		} catch ( DBConnectionError $e ) {
			throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() );
		}
	}

	/**
	 * @throws JobQueueConnectionError
	 * @return DBConnRef
	 */
	protected function getMasterDB() {
		try {
			return $this->getDB( DB_MASTER );
		} catch ( DBConnectionError $e ) {
			throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() );
		}
	}

	/**
	 * Get a connection handle from the external cluster if one is configured,
	 * or from the wiki's own load balancer otherwise
	 *
	 * @param int $index (DB_SLAVE/DB_MASTER)
	 * @return DBConnRef
	 */
	protected function getDB( $index ) {
		$lb = ( $this->cluster !== false )
			? wfGetLBFactory()->getExternalLB( $this->cluster, $this->wiki )
			: wfGetLB( $this->wiki );

		return $lb->getConnectionRef( $index, array(), $this->wiki );
	}

	/**
	 * Build the cache key for a property of this queue; the key includes the
	 * wiki, the cluster name ('main' if none is set) and the job type
	 *
	 * @param string $property
	 * @return string
	 */
	private function getCacheKey( $property ) {
		list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
		$cluster = is_string( $this->cluster ) ? $this->cluster : 'main';

		return wfForeignMemcKey( $db, $prefix, 'jobqueue', $cluster, $this->type, $property );
	}

	/**
	 * Serialize job parameters for storage; false becomes the empty string
	 *
	 * @param array|bool $params
	 * @return string
	 */
	protected static function makeBlob( $params ) {
		if ( $params !== false ) {
			return serialize( $params );
		} else {
			return '';
		}
	}

	/**
	 * Unserialize stored job parameters; an empty blob becomes false
	 *
	 * @param string $blob
	 * @return bool|mixed
	 */
	protected static function extractBlob( $blob ) {
		if ( (string)$blob !== '' ) {
			return unserialize( $blob );
		} else {
			return false;
		}
	}

	/**
	 * Rethrow a DB error as a JobQueueError carrying the original class name and message
	 *
	 * @param DBError $e
	 * @throws JobQueueError
	 */
	protected function throwDBException( DBError $e ) {
		throw new JobQueueError( get_class( $e ) . ": " . $e->getMessage() );
	}

	/**
	 * Return the list of job fields that should be selected.
	 * @since 1.23
	 * @return array
	 */
	public static function selectFields() {
		return array(
			'job_id',
			'job_cmd',
			'job_namespace',
			'job_title',
			'job_timestamp',
			'job_params',
			'job_random',
			'job_attempts',
			'job_token',
			'job_token_timestamp',
			'job_sha1',
		);
	}
}
title
Description
Body
title
Description
Body
title
Description
Body
title
Body
Generated: Fri Nov 28 14:03:12 2014 | Cross-referenced by PHPXref 0.7.1 |