MediaWiki  REL1_23
JobQueueFederated.php
Go to the documentation of this file.
00001 <?php
/**
 * Job queue that spreads the jobs of one queue type over several partition queues.
 *
 * Each partition is an ordinary JobQueue built via JobQueue::factory() from the
 * 'configByPartition' settings. Jobs are pushed to partitions chosen from a weighted
 * HashRing (de-duplicated jobs by a hash of their de-duplication info, others at random).
 * Jobs are popped from partitions picked at random according to the partition weights.
 *
 * Read-style operations (size, emptiness, deletion, ...) log partition failures and
 * only throw a JobQueueError when every partition failed.
 */
class JobQueueFederated extends JobQueue {
    /** @var array (partition name => weight) reverse sorted by weight */
    protected $partitionMap = array();

    /** @var array (partition name => JobQueue) in the same order as $partitionMap */
    protected $partitionQueues = array();

    /** @var HashRing Weighted ring of the partitions that jobs may be pushed to */
    protected $partitionPushRing;

    /** @var BagOStuff Cache for aggregated cross-partition values (empty flag, counts) */
    protected $cache;

    /** @var int Maximum number of insertion rounds made by doBatchPush() */
    protected $maxPartitionsTry;

    const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating
    const CACHE_TTL_LONG = 300; // integer; seconds to cache info that is kept up to date

    /**
     * @param array $params Possible keys (in addition to those used by JobQueue):
     *   - sectionsByWiki      : Map of wiki IDs to section names. Wikis not listed
     *                           use the section "default".
     *   - partitionsBySection : Map of section names to maps of (partition name => weight).
     *                           The resolved section must be defined here.
     *   - configByPartition   : Map of partition names to configuration arrays that are
     *                           passed to JobQueue::factory(). Options given to this
     *                           federated queue itself take precedence over them.
     *   - partitionsNoPush    : List of partition names that are popped from but never
     *                           pushed to.
     *   - maxPartitionsTry    : Maximum number of insertion rounds per push; each failed
     *                           partition is dropped from the ring before the next round
     *                           (default 2).
     * @throws MWException If the section or a partition has no configuration
     */
    protected function __construct( array $params ) {
        parent::__construct( $params );
        $section = isset( $params['sectionsByWiki'][$this->wiki] )
            ? $params['sectionsByWiki'][$this->wiki]
            : 'default';
        if ( !isset( $params['partitionsBySection'][$section] ) ) {
            throw new MWException( "No configuration for section '$section'." );
        }
        $this->maxPartitionsTry = isset( $params['maxPartitionsTry'] )
            ? $params['maxPartitionsTry']
            : 2;
        // Get the full partition map, heaviest partitions first
        $this->partitionMap = $params['partitionsBySection'][$section];
        arsort( $this->partitionMap, SORT_NUMERIC );
        // Get the partitions jobs can actually be pushed to
        $partitionPushMap = $this->partitionMap;
        if ( isset( $params['partitionsNoPush'] ) ) {
            foreach ( $params['partitionsNoPush'] as $partition ) {
                unset( $partitionPushMap[$partition] );
            }
        }
        // Get the config to pass to merge into each partition queue config
        $baseConfig = $params;
        foreach ( array( 'class', 'sectionsByWiki', 'maxPartitionsTry',
            'partitionsBySection', 'configByPartition', 'partitionsNoPush' ) as $o
        ) {
            unset( $baseConfig[$o] ); // partition queue doesn't care about this
        }
        // Get the partition queue objects
        foreach ( $this->partitionMap as $partition => $w ) {
            if ( !isset( $params['configByPartition'][$partition] ) ) {
                throw new MWException( "No configuration for partition '$partition'." );
            }
            // Array union: keys from $baseConfig win over the per-partition config
            $this->partitionQueues[$partition] = JobQueue::factory(
                $baseConfig + $params['configByPartition'][$partition] );
        }
        // Get the ring of partitions to push jobs into
        $this->partitionPushRing = new HashRing( $partitionPushMap );
        // Aggregate cache some per-queue values if there are multiple partition queues
        $this->cache = count( $this->partitionMap ) > 1 ? wfGetMainCache() : new EmptyBagOStuff();
    }

    /**
     * @return array Job orders this queue accepts
     */
    protected function supportedOrders() {
        // No FIFO due to partitioning, though "rough timestamp order" is supported
        return array( 'undefined', 'random', 'timestamp' );
    }

    /**
     * @return string Preferred job order
     */
    protected function optimalOrder() {
        return 'undefined'; // defer to the partitions
    }

    /**
     * @return bool Always true; the partitions enforce their own support
     */
    protected function supportsDelayedJobs() {
        return true; // defer checks to the partitions
    }

    /**
     * Check whether no partition holds any jobs.
     *
     * The answer is cached. "Empty" is only cached if every partition answered,
     * since a partition that errored may still hold jobs.
     *
     * @return bool
     * @throws JobQueueError If every partition failed
     */
    protected function doIsEmpty() {
        $key = $this->getCacheKey( 'empty' );

        $isEmpty = $this->cache->get( $key );
        if ( $isEmpty === 'true' ) {
            return true;
        } elseif ( $isEmpty === 'false' ) {
            return false;
        }

        $failed = 0;
        foreach ( $this->partitionQueues as $queue ) {
            try {
                if ( !$queue->doIsEmpty() ) {
                    // A single non-empty partition settles the answer
                    $this->cache->add( $key, 'false', self::CACHE_TTL_LONG );

                    return false;
                }
            } catch ( JobQueueError $e ) {
                ++$failed;
                MWExceptionHandler::logException( $e );
            }
        }
        $this->throwErrorIfAllPartitionsDown( $failed );

        if ( $failed == 0 ) {
            $this->cache->add( $key, 'true', self::CACHE_TTL_LONG );
        }

        return true;
    }

    /**
     * @return int Number of queued jobs summed over all partitions
     */
    protected function doGetSize() {
        return $this->getCrossPartitionSum( 'size', 'doGetSize' );
    }

    /**
     * @return int Number of acquired jobs summed over all partitions
     */
    protected function doGetAcquiredCount() {
        return $this->getCrossPartitionSum( 'acquiredcount', 'doGetAcquiredCount' );
    }

    /**
     * @return int Number of delayed jobs summed over all partitions
     */
    protected function doGetDelayedCount() {
        return $this->getCrossPartitionSum( 'delayedcount', 'doGetDelayedCount' );
    }

    /**
     * @return int Number of abandoned jobs summed over all partitions
     */
    protected function doGetAbandonedCount() {
        return $this->getCrossPartitionSum( 'abandonedcount', 'doGetAbandonedCount' );
    }

    /**
     * Sum the result of a counting method over all partitions, caching the total briefly.
     *
     * Partitions that throw JobQueueError are logged and left out of the sum.
     *
     * @param string $type Cache key suffix, e.g. 'size'
     * @param string $method Name of the per-partition counting method to call
     * @return int
     * @throws JobQueueError If every partition failed
     */
    protected function getCrossPartitionSum( $type, $method ) {
        $key = $this->getCacheKey( $type );

        $count = $this->cache->get( $key );
        if ( is_int( $count ) ) {
            return $count;
        }

        // Start from zero rather than from whatever the cache miss returned
        $count = 0;
        $failed = 0;
        foreach ( $this->partitionQueues as $queue ) {
            try {
                $count += $queue->$method();
            } catch ( JobQueueError $e ) {
                ++$failed;
                MWExceptionHandler::logException( $e );
            }
        }
        $this->throwErrorIfAllPartitionsDown( $failed );

        $this->cache->set( $key, $count, self::CACHE_TTL_SHORT );

        return $count;
    }

    /**
     * Push a batch of jobs, retrying on the remaining partitions after failures.
     *
     * Up to $this->maxPartitionsTry insertion rounds are made. Partitions that fail
     * during a round are removed from the ring used by later rounds.
     *
     * @param array $jobs List of Job objects
     * @param int $flags Bitfield of queue flags (e.g. QOS_ATOMIC)
     * @return bool Always true
     * @throws JobQueueError If some jobs could still not be inserted
     */
    protected function doBatchPush( array $jobs, $flags ) {
        // Local ring variable that may be changed to point to a new ring on failure
        $partitionRing = $this->partitionPushRing;
        // Try to insert the jobs and update $partitionRing on any failures.
        // Retry to insert any remaining jobs again, ignoring the bad partitions.
        $jobsLeft = $jobs;
        for ( $i = $this->maxPartitionsTry; $i > 0 && count( $jobsLeft ); --$i ) {
            $jobsLeft = $this->tryJobInsertions( $jobsLeft, $partitionRing, $flags );
        }
        if ( count( $jobsLeft ) ) {
            throw new JobQueueError(
                "Could not insert job(s), {$this->maxPartitionsTry} partitions tried." );
        }

        return true;
    }

    /**
     * Make one attempt at inserting jobs using the given partition ring.
     *
     * @param array $jobs Jobs to insert
     * @param HashRing $partitionRing Ring of usable partitions. It is replaced (by
     *   reference) with a smaller ring whenever a partition fails.
     * @param int $flags Bitfield of queue flags
     * @return array Jobs that could not be inserted during this attempt
     * @throws JobQueueError If no usable partition remains in the ring
     */
    protected function tryJobInsertions( array $jobs, HashRing &$partitionRing, $flags ) {
        $jobsLeft = array();

        // Because jobs are spread across partitions, per-job de-duplication needs
        // to use a consistent hash to avoid allowing duplicate jobs per partition.
        // De-duplicated jobs may thus land on several partitions, so QOS_ATOMIC is
        // only honored per partition, not across the whole batch.
        $uJobsByPartition = array(); // (partition name => job list)
        foreach ( $jobs as $key => $job ) {
            if ( $job->ignoreDuplicates() ) {
                $sha1 = sha1( serialize( $job->getDeduplicationInfo() ) );
                $uJobsByPartition[$partitionRing->getLocation( $sha1 )][] = $job;
                unset( $jobs[$key] );
            }
        }
        // Get the batches of jobs that are not de-duplicated
        if ( !count( $jobs ) ) {
            $nuJobBatches = array(); // nothing left; avoid pushing an empty batch
        } elseif ( $flags & self::QOS_ATOMIC ) {
            $nuJobBatches = array( $jobs ); // all or nothing
        } else {
            // Split the jobs into batches and spread them out over servers if there
            // are many jobs. This helps keep the partitions even. Otherwise, send all
            // the jobs to a single partition queue to avoid the extra connections.
            $nuJobBatches = array_chunk( $jobs, 300 );
        }

        // Insert the de-duplicated jobs into the queues...
        foreach ( $uJobsByPartition as $partition => $jobBatch ) {
            if ( !$this->pushBatchToPartition( $partition, $jobBatch, $flags ) ) {
                $partitionRing = $partitionRing->newWithoutLocation( $partition ); // blacklist
                if ( !$partitionRing ) {
                    throw new JobQueueError( "Could not insert job(s), no partitions available." );
                }
                $jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted
            }
        }

        // Insert the jobs that are not de-duplicated into the queues...
        foreach ( $nuJobBatches as $jobBatch ) {
            $partition = ArrayUtils::pickRandom( $partitionRing->getLocationWeights() );
            if ( $partition === false ) {
                // All remaining partitions have 0 weight; none can take the batch
                throw new JobQueueError( "Could not insert job(s), no partitions available." );
            }
            if ( !$this->pushBatchToPartition( $partition, $jobBatch, $flags ) ) {
                $partitionRing = $partitionRing->newWithoutLocation( $partition ); // blacklist
                if ( !$partitionRing ) {
                    throw new JobQueueError( "Could not insert job(s), no partitions available." );
                }
                $jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted
            }
        }

        return $jobsLeft;
    }

    /**
     * Atomically push one batch of jobs to a single partition queue.
     *
     * JobQueueError from the partition is logged and reported as failure. On success
     * the cached "empty" flag of this queue is set to 'false'.
     *
     * @param string $partition Partition name
     * @param array $jobBatch Jobs to push
     * @param int $flags Bitfield of queue flags (QOS_ATOMIC is always added)
     * @return bool Whether the partition accepted the batch
     */
    private function pushBatchToPartition( $partition, array $jobBatch, $flags ) {
        $queue = $this->partitionQueues[$partition];
        try {
            $ok = $queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC );
        } catch ( JobQueueError $e ) {
            $ok = false;
            MWExceptionHandler::logException( $e );
        }
        if ( $ok ) {
            $this->cache->set( $this->getCacheKey( 'empty' ), 'false', self::CACHE_TTL_LONG );
        }

        return (bool)$ok;
    }

    /**
     * Pop a job from a randomly chosen partition, trying the other partitions if empty.
     *
     * Partitions are picked at random weighted by $partitionMap. A returned job gets
     * its partition name stored in $job->metadata['QueuePartition'] for doAck().
     *
     * @return Job|bool Job or false if no job was found
     * @throws JobQueueError If every partition failed
     */
    protected function doPop() {
        $key = $this->getCacheKey( 'empty' );

        $isEmpty = $this->cache->get( $key );
        if ( $isEmpty === 'true' ) {
            return false;
        }

        $partitionsTry = $this->partitionMap; // (partition => weight)

        $failed = 0;
        while ( count( $partitionsTry ) ) {
            $partition = ArrayUtils::pickRandom( $partitionsTry );
            if ( $partition === false ) {
                break; // all partitions at 0 weight
            }

            $queue = $this->partitionQueues[$partition];
            try {
                $job = $queue->pop();
            } catch ( JobQueueError $e ) {
                ++$failed;
                MWExceptionHandler::logException( $e );
                $job = false;
            }
            if ( $job ) {
                $job->metadata['QueuePartition'] = $partition;

                return $job;
            } else {
                unset( $partitionsTry[$partition] ); // blacklist partition
            }
        }
        $this->throwErrorIfAllPartitionsDown( $failed );

        // Only remember the queue as empty if every partition was checked without
        // errors; otherwise jobs may still be waiting on an unchecked partition
        if ( !count( $partitionsTry ) && $failed == 0 ) {
            $this->cache->set( $key, 'true', self::CACHE_TTL_LONG );
        }

        return false;
    }

    /**
     * Acknowledge a job on the partition it was popped from.
     *
     * @param Job $job Job previously returned by doPop()
     * @return mixed Result of the partition queue's ack()
     * @throws MWException If the job has no partition metadata
     */
    protected function doAck( Job $job ) {
        if ( !isset( $job->metadata['QueuePartition'] ) ) {
            throw new MWException( "The given job has no defined partition name." );
        }

        return $this->partitionQueues[$job->metadata['QueuePartition']]->ack( $job );
    }

    /**
     * Check the root job de-duplication state on the partition hashed from the
     * root job signature, falling back to the next partition on errors.
     *
     * @param Job $job
     * @return bool False if no partition could answer
     */
    protected function doIsRootJobOldDuplicate( Job $job ) {
        $params = $job->getRootJobParams();
        $partitions = $this->partitionPushRing->getLocations( $params['rootJobSignature'], 2 );
        try {
            return $this->partitionQueues[$partitions[0]]->doIsRootJobOldDuplicate( $job );
        } catch ( JobQueueError $e ) {
            MWExceptionHandler::logException( $e );
            if ( isset( $partitions[1] ) ) { // check fallback partition
                return $this->partitionQueues[$partitions[1]]->doIsRootJobOldDuplicate( $job );
            }
        }

        return false;
    }

    /**
     * Record the root job de-duplication state on the partition hashed from the
     * root job signature, falling back to the next partition on errors.
     *
     * @param Job $job
     * @return bool False if no partition could record it
     */
    protected function doDeduplicateRootJob( Job $job ) {
        $params = $job->getRootJobParams();
        $partitions = $this->partitionPushRing->getLocations( $params['rootJobSignature'], 2 );
        try {
            return $this->partitionQueues[$partitions[0]]->doDeduplicateRootJob( $job );
        } catch ( JobQueueError $e ) {
            MWExceptionHandler::logException( $e );
            if ( isset( $partitions[1] ) ) { // check fallback partition
                return $this->partitionQueues[$partitions[1]]->doDeduplicateRootJob( $job );
            }
        }

        return false;
    }

    /**
     * Delete all jobs from every reachable partition.
     *
     * @return bool Always true
     * @throws JobQueueError If every partition failed
     */
    protected function doDelete() {
        $failed = 0;
        foreach ( $this->partitionQueues as $queue ) {
            try {
                $queue->doDelete();
            } catch ( JobQueueError $e ) {
                ++$failed;
                MWExceptionHandler::logException( $e );
            }
        }
        $this->throwErrorIfAllPartitionsDown( $failed );
        return true;
    }

    /**
     * Call waitForBackups() on every partition.
     *
     * @throws JobQueueError If every partition failed
     */
    protected function doWaitForBackups() {
        $failed = 0;
        foreach ( $this->partitionQueues as $queue ) {
            try {
                $queue->waitForBackups();
            } catch ( JobQueueError $e ) {
                ++$failed;
                MWExceptionHandler::logException( $e );
            }
        }
        $this->throwErrorIfAllPartitionsDown( $failed );
    }

    /**
     * Collect the periodic tasks of all partitions.
     *
     * @return array Map of "<partition>:<task>" => task definition
     */
    protected function doGetPeriodicTasks() {
        $tasks = array();
        foreach ( $this->partitionQueues as $partition => $queue ) {
            foreach ( $queue->getPeriodicTasks() as $task => $def ) {
                $tasks["{$partition}:{$task}"] = $def;
            }
        }

        return $tasks;
    }

    /**
     * Clear the aggregated values cached here and the caches of every partition.
     */
    protected function doFlushCaches() {
        static $types = array(
            'empty',
            'size',
            'acquiredcount',
            'delayedcount',
            'abandonedcount'
        );

        foreach ( $types as $type ) {
            $this->cache->delete( $this->getCacheKey( $type ) );
        }

        foreach ( $this->partitionQueues as $queue ) {
            $queue->doFlushCaches();
        }
    }

    /**
     * @return AppendIterator Concatenation of each partition's queued-job iterator
     */
    public function getAllQueuedJobs() {
        $iterator = new AppendIterator();

        foreach ( $this->partitionQueues as $queue ) {
            $iterator->append( $queue->getAllQueuedJobs() );
        }

        return $iterator;
    }

    /**
     * @return AppendIterator Concatenation of each partition's delayed-job iterator
     */
    public function getAllDelayedJobs() {
        $iterator = new AppendIterator();

        foreach ( $this->partitionQueues as $queue ) {
            $iterator->append( $queue->getAllDelayedJobs() );
        }

        return $iterator;
    }

    /**
     * @return string Identifier derived from the wiki ID and the set of partition names
     */
    public function getCoalesceLocationInternal() {
        return "JobQueueFederated:wiki:{$this->wiki}" .
            sha1( serialize( array_keys( $this->partitionMap ) ) );
    }

    /**
     * Find which of the given sibling queue types have jobs on any partition.
     *
     * @param array $types List of queue types
     * @return array|null List of non-empty types, or null if a partition lacks support
     * @throws JobQueueError If every partition failed
     */
    protected function doGetSiblingQueuesWithJobs( array $types ) {
        $result = array();

        $failed = 0;
        foreach ( $this->partitionQueues as $queue ) {
            try {
                $nonEmpty = $queue->doGetSiblingQueuesWithJobs( $types );
                if ( is_array( $nonEmpty ) ) {
                    $result = array_unique( array_merge( $result, $nonEmpty ) );
                } else {
                    return null; // not supported on all partitions; bail
                }
                if ( count( $result ) === count( $types ) ) {
                    break; // short-circuit: every type is already known to have jobs
                }
            } catch ( JobQueueError $e ) {
                ++$failed;
                MWExceptionHandler::logException( $e );
            }
        }
        $this->throwErrorIfAllPartitionsDown( $failed );

        return array_values( $result );
    }

    /**
     * Sum the sizes of the given sibling queue types over all partitions.
     *
     * @param array $types List of queue types
     * @return array|null Map of (type => size), or null if a partition lacks support
     * @throws JobQueueError If every partition failed
     */
    protected function doGetSiblingQueueSizes( array $types ) {
        $result = array();
        $failed = 0;
        foreach ( $this->partitionQueues as $queue ) {
            try {
                $sizes = $queue->doGetSiblingQueueSizes( $types );
                if ( is_array( $sizes ) ) {
                    foreach ( $sizes as $type => $size ) {
                        $result[$type] = isset( $result[$type] ) ? $result[$type] + $size : $size;
                    }
                } else {
                    return null; // not supported on all partitions; bail
                }
            } catch ( JobQueueError $e ) {
                ++$failed;
                MWExceptionHandler::logException( $e );
            }
        }
        $this->throwErrorIfAllPartitionsDown( $failed );

        return $result;
    }

    /**
     * Throw an error if the number of failed partitions covers all of them.
     *
     * @param int $down Number of partitions that failed
     * @throws JobQueueError
     */
    protected function throwErrorIfAllPartitionsDown( $down ) {
        if ( $down >= count( $this->partitionQueues ) ) {
            throw new JobQueueError( 'No queue partitions available.' );
        }
    }

    /**
     * Forward the testing prefix to every partition queue.
     *
     * @param string $key
     */
    public function setTestingPrefix( $key ) {
        foreach ( $this->partitionQueues as $queue ) {
            $queue->setTestingPrefix( $key );
        }
    }

    /**
     * Build the cache key of an aggregated property of this queue.
     *
     * @param string $property E.g. 'empty' or 'size'
     * @return string
     */
    private function getCacheKey( $property ) {
        list( $db, $prefix ) = wfSplitWikiID( $this->wiki );

        return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $property );
    }
}