MediaWiki  REL1_24
JobQueueFederated.php
Go to the documentation of this file.
00001 <?php
/**
 * Job queue that delegates storage to several partition queues and spreads
 * jobs over them using weighted hash rings.
 */
class JobQueueFederated extends JobQueue {
    /** Seconds to cache info without re-validating */
    const CACHE_TTL_SHORT = 30;
    /** Seconds to cache info that is kept up to date */
    const CACHE_TTL_LONG = 300;

    /** @var HashRing Ring built from every partition of this wiki's section */
    protected $partitionRing;
    /** @var HashRing Ring of the partitions that jobs may be pushed to */
    protected $partitionPushRing;
    /** @var array Map of (partition name => queue object from JobQueue::factory()) */
    protected $partitionQueues = array();

    /** @var object Cache for aggregate values; wfGetMainCache() or an EmptyBagOStuff */
    protected $cache;

    /** @var int Upper bound on insertion rounds in doBatchPush(); defaults to 2 */
    protected $maxPartitionsTry;
00064     const CACHE_TTL_LONG = 300; // integer; seconds to cache info that is kept up to date
00065 
00085     protected function __construct( array $params ) {
00086         parent::__construct( $params );
00087         $section = isset( $params['sectionsByWiki'][$this->wiki] )
00088             ? $params['sectionsByWiki'][$this->wiki]
00089             : 'default';
00090         if ( !isset( $params['partitionsBySection'][$section] ) ) {
00091             throw new MWException( "No configuration for section '$section'." );
00092         }
00093         $this->maxPartitionsTry = isset( $params['maxPartitionsTry'] )
00094             ? $params['maxPartitionsTry']
00095             : 2;
00096         // Get the full partition map
00097         $partitionMap = $params['partitionsBySection'][$section];
00098         arsort( $partitionMap, SORT_NUMERIC );
00099         // Get the partitions jobs can actually be pushed to
00100         $partitionPushMap = $partitionMap;
00101         if ( isset( $params['partitionsNoPush'] ) ) {
00102             foreach ( $params['partitionsNoPush'] as $partition ) {
00103                 unset( $partitionPushMap[$partition] );
00104             }
00105         }
00106         // Get the config to pass to merge into each partition queue config
00107         $baseConfig = $params;
00108         foreach ( array( 'class', 'sectionsByWiki', 'maxPartitionsTry',
00109             'partitionsBySection', 'configByPartition', 'partitionsNoPush' ) as $o
00110         ) {
00111             unset( $baseConfig[$o] ); // partition queue doesn't care about this
00112         }
00113         // Get the partition queue objects
00114         foreach ( $partitionMap as $partition => $w ) {
00115             if ( !isset( $params['configByPartition'][$partition] ) ) {
00116                 throw new MWException( "No configuration for partition '$partition'." );
00117             }
00118             $this->partitionQueues[$partition] = JobQueue::factory(
00119                 $baseConfig + $params['configByPartition'][$partition] );
00120         }
00121         // Ring of all partitions
00122         $this->partitionRing = new HashRing( $partitionMap );
00123         // Get the ring of partitions to push jobs into
00124         if ( count( $partitionPushMap ) === count( $partitionMap ) ) {
00125             $this->partitionPushRing = clone $this->partitionRing; // faster
00126         } else {
00127             $this->partitionPushRing = new HashRing( $partitionPushMap );
00128         }
00129         // Aggregate cache some per-queue values if there are multiple partition queues
00130         $this->cache = count( $partitionMap ) > 1 ? wfGetMainCache() : new EmptyBagOStuff();
00131     }
00132 
00133     protected function supportedOrders() {
00134         // No FIFO due to partitioning, though "rough timestamp order" is supported
00135         return array( 'undefined', 'random', 'timestamp' );
00136     }
00137 
00138     protected function optimalOrder() {
00139         return 'undefined'; // defer to the partitions
00140     }
00141 
00142     protected function supportsDelayedJobs() {
00143         return true; // defer checks to the partitions
00144     }
00145 
00146     protected function doIsEmpty() {
00147         $key = $this->getCacheKey( 'empty' );
00148 
00149         $isEmpty = $this->cache->get( $key );
00150         if ( $isEmpty === 'true' ) {
00151             return true;
00152         } elseif ( $isEmpty === 'false' ) {
00153             return false;
00154         }
00155 
00156         $empty = true;
00157         $failed = 0;
00158         foreach ( $this->partitionQueues as $queue ) {
00159             try {
00160                 $empty = $empty && $queue->doIsEmpty();
00161             } catch ( JobQueueError $e ) {
00162                 ++$failed;
00163                 MWExceptionHandler::logException( $e );
00164             }
00165         }
00166         $this->throwErrorIfAllPartitionsDown( $failed );
00167 
00168         $this->cache->add( $key, $empty ? 'true' : 'false', self::CACHE_TTL_LONG );
00169         return $empty;
00170     }
00171 
00172     protected function doGetSize() {
00173         return $this->getCrossPartitionSum( 'size', 'doGetSize' );
00174     }
00175 
00176     protected function doGetAcquiredCount() {
00177         return $this->getCrossPartitionSum( 'acquiredcount', 'doGetAcquiredCount' );
00178     }
00179 
00180     protected function doGetDelayedCount() {
00181         return $this->getCrossPartitionSum( 'delayedcount', 'doGetDelayedCount' );
00182     }
00183 
00184     protected function doGetAbandonedCount() {
00185         return $this->getCrossPartitionSum( 'abandonedcount', 'doGetAbandonedCount' );
00186     }
00187 
00193     protected function getCrossPartitionSum( $type, $method ) {
00194         $key = $this->getCacheKey( $type );
00195 
00196         $count = $this->cache->get( $key );
00197         if ( $count !== false ) {
00198             return $count;
00199         }
00200 
00201         $failed = 0;
00202         foreach ( $this->partitionQueues as $queue ) {
00203             try {
00204                 $count += $queue->$method();
00205             } catch ( JobQueueError $e ) {
00206                 ++$failed;
00207                 MWExceptionHandler::logException( $e );
00208             }
00209         }
00210         $this->throwErrorIfAllPartitionsDown( $failed );
00211 
00212         $this->cache->set( $key, $count, self::CACHE_TTL_SHORT );
00213 
00214         return $count;
00215     }
00216 
00217     protected function doBatchPush( array $jobs, $flags ) {
00218         // Local ring variable that may be changed to point to a new ring on failure
00219         $partitionRing = $this->partitionPushRing;
00220         // Try to insert the jobs and update $partitionsTry on any failures.
00221         // Retry to insert any remaning jobs again, ignoring the bad partitions.
00222         $jobsLeft = $jobs;
00223         // @codingStandardsIgnoreStart Generic.CodeAnalysis.ForLoopWithTestFunctionCall.NotAllowed
00224         for ( $i = $this->maxPartitionsTry; $i > 0 && count( $jobsLeft ); --$i ) {
00225             // @codingStandardsIgnoreEnd
00226             try {
00227                 $partitionRing->getLiveRing();
00228             } catch ( UnexpectedValueException $e ) {
00229                 break; // all servers down; nothing to insert to
00230             }
00231             $jobsLeft = $this->tryJobInsertions( $jobsLeft, $partitionRing, $flags );
00232         }
00233         if ( count( $jobsLeft ) ) {
00234             throw new JobQueueError(
00235                 "Could not insert job(s), {$this->maxPartitionsTry} partitions tried." );
00236         }
00237     }
00238 
00246     protected function tryJobInsertions( array $jobs, HashRing &$partitionRing, $flags ) {
00247         $jobsLeft = array();
00248 
00249         // Because jobs are spread across partitions, per-job de-duplication needs
00250         // to use a consistent hash to avoid allowing duplicate jobs per partition.
00251         // When inserting a batch of de-duplicated jobs, QOS_ATOMIC is disregarded.
00252         $uJobsByPartition = array(); // (partition name => job list)
00254         foreach ( $jobs as $key => $job ) {
00255             if ( $job->ignoreDuplicates() ) {
00256                 $sha1 = sha1( serialize( $job->getDeduplicationInfo() ) );
00257                 $uJobsByPartition[$partitionRing->getLiveLocation( $sha1 )][] = $job;
00258                 unset( $jobs[$key] );
00259             }
00260         }
00261         // Get the batches of jobs that are not de-duplicated
00262         if ( $flags & self::QOS_ATOMIC ) {
00263             $nuJobBatches = array( $jobs ); // all or nothing
00264         } else {
00265             // Split the jobs into batches and spread them out over servers if there
00266             // are many jobs. This helps keep the partitions even. Otherwise, send all
00267             // the jobs to a single partition queue to avoids the extra connections.
00268             $nuJobBatches = array_chunk( $jobs, 300 );
00269         }
00270 
00271         // Insert the de-duplicated jobs into the queues...
00272         foreach ( $uJobsByPartition as $partition => $jobBatch ) {
00274             $queue = $this->partitionQueues[$partition];
00275             try {
00276                 $ok = true;
00277                 $queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC );
00278             } catch ( JobQueueError $e ) {
00279                 $ok = false;
00280                 MWExceptionHandler::logException( $e );
00281             }
00282             if ( $ok ) {
00283                 $key = $this->getCacheKey( 'empty' );
00284                 $this->cache->set( $key, 'false', self::CACHE_TTL_LONG );
00285             } else {
00286                 if ( !$partitionRing->ejectFromLiveRing( $partition, 5 ) ) { // blacklist
00287                     throw new JobQueueError( "Could not insert job(s), no partitions available." );
00288                 }
00289                 $jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted
00290             }
00291         }
00292 
00293         // Insert the jobs that are not de-duplicated into the queues...
00294         foreach ( $nuJobBatches as $jobBatch ) {
00295             $partition = ArrayUtils::pickRandom( $partitionRing->getLiveLocationWeights() );
00296             $queue = $this->partitionQueues[$partition];
00297             try {
00298                 $ok = true;
00299                 $queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC );
00300             } catch ( JobQueueError $e ) {
00301                 $ok = false;
00302                 MWExceptionHandler::logException( $e );
00303             }
00304             if ( $ok ) {
00305                 $key = $this->getCacheKey( 'empty' );
00306                 $this->cache->set( $key, 'false', self::CACHE_TTL_LONG );
00307             } else {
00308                 if ( !$partitionRing->ejectFromLiveRing( $partition, 5 ) ) { // blacklist
00309                     throw new JobQueueError( "Could not insert job(s), no partitions available." );
00310                 }
00311                 $jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted
00312             }
00313         }
00314 
00315         return $jobsLeft;
00316     }
00317 
00318     protected function doPop() {
00319         $partitionsTry = $this->partitionRing->getLiveLocationWeights(); // (partition => weight)
00320 
00321         $failed = 0;
00322         while ( count( $partitionsTry ) ) {
00323             $partition = ArrayUtils::pickRandom( $partitionsTry );
00324             if ( $partition === false ) {
00325                 break; // all partitions at 0 weight
00326             }
00327 
00329             $queue = $this->partitionQueues[$partition];
00330             try {
00331                 $job = $queue->pop();
00332             } catch ( JobQueueError $e ) {
00333                 ++$failed;
00334                 MWExceptionHandler::logException( $e );
00335                 $job = false;
00336             }
00337             if ( $job ) {
00338                 $job->metadata['QueuePartition'] = $partition;
00339 
00340                 return $job;
00341             } else {
00342                 unset( $partitionsTry[$partition] ); // blacklist partition
00343             }
00344         }
00345         $this->throwErrorIfAllPartitionsDown( $failed );
00346 
00347         $key = $this->getCacheKey( 'empty' );
00348         $this->cache->set( $key, 'true', self::CACHE_TTL_LONG );
00349 
00350         return false;
00351     }
00352 
00353     protected function doAck( Job $job ) {
00354         if ( !isset( $job->metadata['QueuePartition'] ) ) {
00355             throw new MWException( "The given job has no defined partition name." );
00356         }
00357 
00358         return $this->partitionQueues[$job->metadata['QueuePartition']]->ack( $job );
00359     }
00360 
00361     protected function doIsRootJobOldDuplicate( Job $job ) {
00362         $params = $job->getRootJobParams();
00363         $sigature = $params['rootJobSignature'];
00364         $partition = $this->partitionPushRing->getLiveLocation( $sigature );
00365         try {
00366             return $this->partitionQueues[$partition]->doIsRootJobOldDuplicate( $job );
00367         } catch ( JobQueueError $e ) {
00368             if ( $this->partitionPushRing->ejectFromLiveRing( $partition, 5 ) ) {
00369                 $partition = $this->partitionPushRing->getLiveLocation( $sigature );
00370                 return $this->partitionQueues[$partition]->doIsRootJobOldDuplicate( $job );
00371             }
00372         }
00373 
00374         return false;
00375     }
00376 
00377     protected function doDeduplicateRootJob( Job $job ) {
00378         $params = $job->getRootJobParams();
00379         $sigature = $params['rootJobSignature'];
00380         $partition = $this->partitionPushRing->getLiveLocation( $sigature );
00381         try {
00382             return $this->partitionQueues[$partition]->doDeduplicateRootJob( $job );
00383         } catch ( JobQueueError $e ) {
00384             if ( $this->partitionPushRing->ejectFromLiveRing( $partition, 5 ) ) {
00385                 $partition = $this->partitionPushRing->getLiveLocation( $sigature );
00386                 return $this->partitionQueues[$partition]->doDeduplicateRootJob( $job );
00387             }
00388         }
00389 
00390         return false;
00391     }
00392 
00393     protected function doDelete() {
00394         $failed = 0;
00396         foreach ( $this->partitionQueues as $queue ) {
00397             try {
00398                 $queue->doDelete();
00399             } catch ( JobQueueError $e ) {
00400                 ++$failed;
00401                 MWExceptionHandler::logException( $e );
00402             }
00403         }
00404         $this->throwErrorIfAllPartitionsDown( $failed );
00405         return true;
00406     }
00407 
00408     protected function doWaitForBackups() {
00409         $failed = 0;
00411         foreach ( $this->partitionQueues as $queue ) {
00412             try {
00413                 $queue->waitForBackups();
00414             } catch ( JobQueueError $e ) {
00415                 ++$failed;
00416                 MWExceptionHandler::logException( $e );
00417             }
00418         }
00419         $this->throwErrorIfAllPartitionsDown( $failed );
00420     }
00421 
00422     protected function doGetPeriodicTasks() {
00423         $tasks = array();
00425         foreach ( $this->partitionQueues as $partition => $queue ) {
00426             foreach ( $queue->getPeriodicTasks() as $task => $def ) {
00427                 $tasks["{$partition}:{$task}"] = $def;
00428             }
00429         }
00430 
00431         return $tasks;
00432     }
00433 
00434     protected function doFlushCaches() {
00435         static $types = array(
00436             'empty',
00437             'size',
00438             'acquiredcount',
00439             'delayedcount',
00440             'abandonedcount'
00441         );
00442 
00443         foreach ( $types as $type ) {
00444             $this->cache->delete( $this->getCacheKey( $type ) );
00445         }
00446 
00448         foreach ( $this->partitionQueues as $queue ) {
00449             $queue->doFlushCaches();
00450         }
00451     }
00452 
00453     public function getAllQueuedJobs() {
00454         $iterator = new AppendIterator();
00455 
00457         foreach ( $this->partitionQueues as $queue ) {
00458             $iterator->append( $queue->getAllQueuedJobs() );
00459         }
00460 
00461         return $iterator;
00462     }
00463 
00464     public function getAllDelayedJobs() {
00465         $iterator = new AppendIterator();
00466 
00468         foreach ( $this->partitionQueues as $queue ) {
00469             $iterator->append( $queue->getAllDelayedJobs() );
00470         }
00471 
00472         return $iterator;
00473     }
00474 
00475     public function getCoalesceLocationInternal() {
00476         return "JobQueueFederated:wiki:{$this->wiki}" .
00477             sha1( serialize( array_keys( $this->partitionQueues ) ) );
00478     }
00479 
00480     protected function doGetSiblingQueuesWithJobs( array $types ) {
00481         $result = array();
00482 
00483         $failed = 0;
00485         foreach ( $this->partitionQueues as $queue ) {
00486             try {
00487                 $nonEmpty = $queue->doGetSiblingQueuesWithJobs( $types );
00488                 if ( is_array( $nonEmpty ) ) {
00489                     $result = array_unique( array_merge( $result, $nonEmpty ) );
00490                 } else {
00491                     return null; // not supported on all partitions; bail
00492                 }
00493                 if ( count( $result ) == count( $types ) ) {
00494                     break; // short-circuit
00495                 }
00496             } catch ( JobQueueError $e ) {
00497                 ++$failed;
00498                 MWExceptionHandler::logException( $e );
00499             }
00500         }
00501         $this->throwErrorIfAllPartitionsDown( $failed );
00502 
00503         return array_values( $result );
00504     }
00505 
00506     protected function doGetSiblingQueueSizes( array $types ) {
00507         $result = array();
00508         $failed = 0;
00510         foreach ( $this->partitionQueues as $queue ) {
00511             try {
00512                 $sizes = $queue->doGetSiblingQueueSizes( $types );
00513                 if ( is_array( $sizes ) ) {
00514                     foreach ( $sizes as $type => $size ) {
00515                         $result[$type] = isset( $result[$type] ) ? $result[$type] + $size : $size;
00516                     }
00517                 } else {
00518                     return null; // not supported on all partitions; bail
00519                 }
00520             } catch ( JobQueueError $e ) {
00521                 ++$failed;
00522                 MWExceptionHandler::logException( $e );
00523             }
00524         }
00525         $this->throwErrorIfAllPartitionsDown( $failed );
00526 
00527         return $result;
00528     }
00529 
00537     protected function throwErrorIfAllPartitionsDown( $down ) {
00538         if ( $down >= count( $this->partitionQueues ) ) {
00539             throw new JobQueueError( 'No queue partitions available.' );
00540         }
00541     }
00542 
00543     public function setTestingPrefix( $key ) {
00545         foreach ( $this->partitionQueues as $queue ) {
00546             $queue->setTestingPrefix( $key );
00547         }
00548     }
00549 
00554     private function getCacheKey( $property ) {
00555         list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
00556 
00557         return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $property );
00558     }
00559 }