MediaWiki  REL1_22
JobQueueFederated.php
Go to the documentation of this file.
00001 <?php
/**
 * Job queue that spreads the jobs of one (wiki, type) queue over several
 * underlying "partition" queues.
 *
 * Each partition is itself a JobQueue built with JobQueue::factory(). Jobs that
 * request de-duplication are routed by a consistent hash so identical jobs land
 * on the same partition; all other jobs are sent to weighted-random partitions.
 * A few aggregate values (emptiness and the various counts) are cached when
 * more than one partition is configured.
 */
class JobQueueFederated extends JobQueue {
    /** @var array (partition name => weight), sorted by descending weight */
    protected $partitionMap = array();

    /** @var array (partition name => JobQueue object) */
    protected $partitionQueues = array();

    /** @var HashRing Ring of the partitions that jobs may be pushed into */
    protected $partitionPushRing;

    /** @var object Cache returned by wfGetMainCache(), or an EmptyBagOStuff */
    protected $cache;

    const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating
    const CACHE_TTL_LONG = 300; // integer; seconds to cache info that is kept up to date

    /**
     * Build the federated queue and all of its partition queues.
     *
     * Parameters read here, in addition to those consumed by the parent:
     *  - sectionsByWiki      : (wiki ID => section name) map; wikis that are
     *                          not listed use the section named "default".
     *  - partitionsBySection : (section name => (partition name => weight)) map.
     *                          Must have an entry for this wiki's section.
     *  - configByPartition   : (partition name => config array) map. Must have
     *                          an entry for every partition of the section.
     *  - partitionsNoPush    : optional list of partition names that are still
     *                          read from but never receive new jobs.
     *
     * @param array $params
     * @throws MWException If the section or any partition lacks configuration
     */
    protected function __construct( array $params ) {
        parent::__construct( $params );
        $section = isset( $params['sectionsByWiki'][$this->wiki] )
            ? $params['sectionsByWiki'][$this->wiki]
            : 'default';
        if ( !isset( $params['partitionsBySection'][$section] ) ) {
            throw new MWException( "No configuration for section '$section'." );
        }
        // Get the full partition map
        $this->partitionMap = $params['partitionsBySection'][$section];
        arsort( $this->partitionMap, SORT_NUMERIC );
        // Get the partitions jobs can actually be pushed to
        $partitionPushMap = $this->partitionMap;
        if ( isset( $params['partitionsNoPush'] ) ) {
            foreach ( $params['partitionsNoPush'] as $partition ) {
                unset( $partitionPushMap[$partition] );
            }
        }
        // Get the config to merge into each partition queue config.
        // The keys below only concern this class, so they are stripped.
        $baseConfig = $params;
        foreach ( array( 'class', 'sectionsByWiki',
            'partitionsBySection', 'configByPartition', 'partitionsNoPush' ) as $o )
        {
            unset( $baseConfig[$o] );
        }
        // Get the partition queue objects
        foreach ( $this->partitionMap as $partition => $w ) {
            if ( !isset( $params['configByPartition'][$partition] ) ) {
                throw new MWException( "No configuration for partition '$partition'." );
            }
            // Array union: keys already present in $baseConfig take precedence
            $this->partitionQueues[$partition] = JobQueue::factory(
                $baseConfig + $params['configByPartition'][$partition] );
        }
        // Get the ring of partitions to push jobs into
        $this->partitionPushRing = new HashRing( $partitionPushMap );
        // Aggregate cache some per-queue values if there are multiple partition queues
        $this->cache = count( $this->partitionMap ) > 1 ? wfGetMainCache() : new EmptyBagOStuff();
    }

    /**
     * @return array List of the order names this queue accepts
     */
    protected function supportedOrders() {
        // No FIFO due to partitioning, though "rough timestamp order" is supported
        return array( 'undefined', 'random', 'timestamp' );
    }

    /**
     * @return string
     */
    protected function optimalOrder() {
        return 'undefined'; // defer to the partitions
    }

    /**
     * @return bool
     */
    protected function supportsDelayedJobs() {
        return true; // defer checks to the partitions
    }

    /**
     * Check whether every partition queue is empty, using the cached answer
     * when one is available.
     *
     * A partition that throws JobQueueError is logged and skipped.
     *
     * @return bool
     */
    protected function doIsEmpty() {
        $key = $this->getCacheKey( 'empty' );

        // The flag is stored as the strings 'true'/'false'; any other value
        // (such as a cache miss) means the partitions must be checked.
        $isEmpty = $this->cache->get( $key );
        if ( $isEmpty === 'true' ) {
            return true;
        } elseif ( $isEmpty === 'false' ) {
            return false;
        }

        foreach ( $this->partitionQueues as $queue ) {
            try {
                if ( !$queue->doIsEmpty() ) {
                    $this->cache->add( $key, 'false', self::CACHE_TTL_LONG );
                    return false;
                }
            } catch ( JobQueueError $e ) {
                MWExceptionHandler::logException( $e );
            }
        }

        $this->cache->add( $key, 'true', self::CACHE_TTL_LONG );
        return true;
    }

    /**
     * @return int Sum of doGetSize() over all partitions
     */
    protected function doGetSize() {
        return $this->getCrossPartitionSum( 'size', 'doGetSize' );
    }

    /**
     * @return int Sum of doGetAcquiredCount() over all partitions
     */
    protected function doGetAcquiredCount() {
        return $this->getCrossPartitionSum( 'acquiredcount', 'doGetAcquiredCount' );
    }

    /**
     * @return int Sum of doGetDelayedCount() over all partitions
     */
    protected function doGetDelayedCount() {
        return $this->getCrossPartitionSum( 'delayedcount', 'doGetDelayedCount' );
    }

    /**
     * @return int Sum of doGetAbandonedCount() over all partitions
     */
    protected function doGetAbandonedCount() {
        return $this->getCrossPartitionSum( 'abandonedcount', 'doGetAbandonedCount' );
    }

    /**
     * Sum the result of one counting method over all partition queues.
     *
     * The total is cached for CACHE_TTL_SHORT seconds. A partition that throws
     * JobQueueError is logged and contributes nothing to the total.
     *
     * @param string $type Cache key component naming the value
     * @param string $method Name of the method to call on each partition queue
     * @return int
     */
    protected function getCrossPartitionSum( $type, $method ) {
        $key = $this->getCacheKey( $type );

        $count = $this->cache->get( $key );
        if ( is_int( $count ) ) {
            return $count;
        }

        $count = 0;
        foreach ( $this->partitionQueues as $queue ) {
            try {
                $count += $queue->$method();
            } catch ( JobQueueError $e ) {
                MWExceptionHandler::logException( $e );
            }
        }

        $this->cache->set( $key, $count, self::CACHE_TTL_SHORT );
        return $count;
    }

    /**
     * Push a batch of jobs, retrying once on the partitions that remain after
     * the first attempt's failures were removed from the ring.
     *
     * @param array $jobs List of Job objects
     * @param int $flags Bitfield of QOS_* flags
     * @return bool True if every job was inserted
     * @throws JobQueueError If every push partition failed
     */
    protected function doBatchPush( array $jobs, $flags ) {
        if ( !count( $jobs ) ) {
            return true; // nothing to do
        }
        // Local ring variable that may be changed to point to a new ring on failure
        $partitionRing = $this->partitionPushRing;
        // Try to insert the jobs and update $partitionRing on any failures
        $jobsLeft = $this->tryJobInsertions( $jobs, $partitionRing, $flags );
        if ( count( $jobsLeft ) ) { // some jobs failed to insert?
            // Try to insert the remaining jobs once more, ignoring the bad partitions
            return !count( $this->tryJobInsertions( $jobsLeft, $partitionRing, $flags ) );
        }
        return true;
    }

    /**
     * Make one attempt at inserting the given jobs.
     *
     * @param array $jobs List of Job objects
     * @param HashRing $partitionRing Ring to pick partitions from. Passed by
     *   reference: it is replaced by a ring lacking each partition that failed.
     * @param int $flags Bitfield of QOS_* flags
     * @return array List of Job objects that could not be inserted
     * @throws JobQueueError If no partition is left in the ring after a failure
     */
    protected function tryJobInsertions( array $jobs, HashRing &$partitionRing, $flags ) {
        $jobsLeft = array();

        // Because jobs are spread across partitions, per-job de-duplication needs
        // to use a consistent hash to avoid allowing duplicate jobs per partition.
        // When inserting a batch of de-duplicated jobs, QOS_ATOMIC is disregarded.
        $uJobsByPartition = array(); // (partition name => job list)
        foreach ( $jobs as $key => $job ) {
            if ( $job->ignoreDuplicates() ) {
                $sha1 = sha1( serialize( $job->getDeduplicationInfo() ) );
                $uJobsByPartition[$partitionRing->getLocation( $sha1 )][] = $job;
                unset( $jobs[$key] );
            }
        }
        // Get the batches of jobs that are not de-duplicated
        if ( $flags & self::QOS_ATOMIC ) {
            // All or nothing. If every job was de-duplicated there is nothing
            // left here; do not create an empty batch, since pushing it would
            // contact a partition for no reason and could even blacklist it.
            $nuJobBatches = count( $jobs ) ? array( $jobs ) : array();
        } else {
            // Split the jobs into batches and spread them out over servers if there
            // are many jobs. This helps keep the partitions even. Otherwise, send all
            // the jobs to a single partition queue to avoid the extra connections.
            $nuJobBatches = array_chunk( $jobs, 300 );
        }

        // Insert the de-duplicated jobs into the queues...
        foreach ( $uJobsByPartition as $partition => $jobBatch ) {
            if ( !$this->tryPartitionPush( $partition, $jobBatch, $flags ) ) {
                $partitionRing = $this->ringWithoutPartition( $partitionRing, $partition );
                $jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted
            }
        }

        // Insert the jobs that are not de-duplicated into the queues...
        foreach ( $nuJobBatches as $jobBatch ) {
            $partition = ArrayUtils::pickRandom( $partitionRing->getLocationWeights() );
            if ( !$this->tryPartitionPush( $partition, $jobBatch, $flags ) ) {
                $partitionRing = $this->ringWithoutPartition( $partitionRing, $partition );
                $jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted
            }
        }

        return $jobsLeft;
    }

    /**
     * Push one batch into one partition as an atomic unit.
     *
     * A JobQueueError from the partition is logged and reported as failure.
     * On success the cached "empty" flag is set to 'false'.
     *
     * @param string $partition Partition name
     * @param array $jobBatch List of Job objects
     * @param int $flags Bitfield of QOS_* flags; QOS_ATOMIC is always added
     * @return bool Whether the partition accepted the batch
     */
    private function tryPartitionPush( $partition, array $jobBatch, $flags ) {
        $queue = $this->partitionQueues[$partition];
        try {
            $ok = $queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC );
        } catch ( JobQueueError $e ) {
            $ok = false;
            MWExceptionHandler::logException( $e );
        }
        if ( $ok ) {
            $this->cache->set( $this->getCacheKey( 'empty' ), 'false', self::CACHE_TTL_LONG );
        }
        return (bool)$ok;
    }

    /**
     * Get a copy of a ring with one partition blacklisted.
     *
     * @param HashRing $ring
     * @param string $partition Name of the partition to remove
     * @return HashRing
     * @throws JobQueueError If removing the partition leaves no usable ring
     */
    private function ringWithoutPartition( HashRing $ring, $partition ) {
        $newRing = $ring->newWithoutLocation( $partition );
        if ( !$newRing ) {
            throw new JobQueueError( "Could not insert job(s), all partitions are down." );
        }
        return $newRing;
    }

    /**
     * Pop a job from a weighted-random partition, moving on to the other
     * partitions while the chosen one yields nothing or fails.
     *
     * The name of the partition the job came from is stored in the job's
     * metadata under 'QueuePartition' so that doAck() can find it again.
     *
     * @return Job|bool The job, or false if none could be obtained
     */
    protected function doPop() {
        $key = $this->getCacheKey( 'empty' );

        $isEmpty = $this->cache->get( $key );
        if ( $isEmpty === 'true' ) {
            return false;
        }

        $partitionsTry = $this->partitionMap; // (partition => weight)

        while ( count( $partitionsTry ) ) {
            $partition = ArrayUtils::pickRandom( $partitionsTry );
            if ( $partition === false ) {
                break; // all partitions at 0 weight
            }
            $queue = $this->partitionQueues[$partition];
            try {
                $job = $queue->pop();
            } catch ( JobQueueError $e ) {
                $job = false;
                MWExceptionHandler::logException( $e );
            }
            if ( $job ) {
                $job->metadata['QueuePartition'] = $partition;
                return $job;
            } else {
                unset( $partitionsTry[$partition] ); // blacklist partition
            }
        }

        $this->cache->set( $key, 'true', self::CACHE_TTL_LONG );
        return false;
    }

    /**
     * Acknowledge a job on the partition it was popped from.
     *
     * @param Job $job Job carrying the 'QueuePartition' metadata set by doPop()
     * @return mixed Whatever the partition queue's ack() returns
     * @throws MWException If the job names no partition or an unknown one
     */
    protected function doAck( Job $job ) {
        if ( !isset( $job->metadata['QueuePartition'] ) ) {
            throw new MWException( "The given job has no defined partition name." );
        }
        $partition = $job->metadata['QueuePartition'];
        // Fail with an exception rather than a fatal call on a non-object
        if ( !isset( $this->partitionQueues[$partition] ) ) {
            throw new MWException( "The given job has an unknown partition name." );
        }
        return $this->partitionQueues[$partition]->ack( $job );
    }

    /**
     * Ask the partition responsible for the job's root job signature whether
     * the job is an old duplicate; if that partition fails, ask the next one
     * on the ring instead.
     *
     * @param Job $job
     * @return bool False if the first partition failed and there is no fallback
     */
    protected function doIsRootJobOldDuplicate( Job $job ) {
        $params = $job->getRootJobParams();
        $partitions = $this->partitionPushRing->getLocations( $params['rootJobSignature'], 2 );
        try {
            return $this->partitionQueues[$partitions[0]]->doIsRootJobOldDuplicate( $job );
        } catch ( JobQueueError $e ) {
            MWExceptionHandler::logException( $e );
            if ( isset( $partitions[1] ) ) { // check fallback partition
                return $this->partitionQueues[$partitions[1]]->doIsRootJobOldDuplicate( $job );
            }
        }
        return false;
    }

    /**
     * Register the root job on the partition responsible for its signature;
     * if that partition fails, use the next one on the ring instead.
     *
     * @param Job $job
     * @return bool False if the first partition failed and there is no fallback
     */
    protected function doDeduplicateRootJob( Job $job ) {
        $params = $job->getRootJobParams();
        $partitions = $this->partitionPushRing->getLocations( $params['rootJobSignature'], 2 );
        try {
            return $this->partitionQueues[$partitions[0]]->doDeduplicateRootJob( $job );
        } catch ( JobQueueError $e ) {
            MWExceptionHandler::logException( $e );
            if ( isset( $partitions[1] ) ) { // check fallback partition
                return $this->partitionQueues[$partitions[1]]->doDeduplicateRootJob( $job );
            }
        }
        return false;
    }

    /**
     * Call doDelete() on every partition queue; failures are logged and skipped.
     */
    protected function doDelete() {
        foreach ( $this->partitionQueues as $queue ) {
            try {
                $queue->doDelete();
            } catch ( JobQueueError $e ) {
                MWExceptionHandler::logException( $e );
            }
        }
    }

    /**
     * Call waitForBackups() on every partition queue; failures are logged and
     * skipped.
     */
    protected function doWaitForBackups() {
        foreach ( $this->partitionQueues as $queue ) {
            try {
                $queue->waitForBackups();
            } catch ( JobQueueError $e ) {
                MWExceptionHandler::logException( $e );
            }
        }
    }

    /**
     * Collect the periodic tasks of all partitions.
     *
     * @return array Task definitions keyed by "<partition name>:<task name>"
     */
    protected function doGetPeriodicTasks() {
        $tasks = array();
        foreach ( $this->partitionQueues as $partition => $queue ) {
            foreach ( $queue->getPeriodicTasks() as $task => $def ) {
                $tasks["{$partition}:{$task}"] = $def;
            }
        }
        return $tasks;
    }

    /**
     * Delete the aggregate values cached by this class and flush the caches of
     * every partition queue.
     */
    protected function doFlushCaches() {
        static $types = array(
            'empty',
            'size',
            'acquiredcount',
            'delayedcount',
            'abandonedcount'
        );
        foreach ( $types as $type ) {
            $this->cache->delete( $this->getCacheKey( $type ) );
        }
        foreach ( $this->partitionQueues as $queue ) {
            $queue->doFlushCaches();
        }
    }

    /**
     * @return AppendIterator Chains getAllQueuedJobs() of every partition
     */
    public function getAllQueuedJobs() {
        $iterator = new AppendIterator();
        foreach ( $this->partitionQueues as $queue ) {
            $iterator->append( $queue->getAllQueuedJobs() );
        }
        return $iterator;
    }

    /**
     * @return AppendIterator Chains getAllDelayedJobs() of every partition
     */
    public function getAllDelayedJobs() {
        $iterator = new AppendIterator();
        foreach ( $this->partitionQueues as $queue ) {
            $iterator->append( $queue->getAllDelayedJobs() );
        }
        return $iterator;
    }

    /**
     * @return string Identifier built from the wiki ID and a hash of the
     *   partition names
     */
    public function getCoalesceLocationInternal() {
        return "JobQueueFederated:wiki:{$this->wiki}" .
            sha1( serialize( array_keys( $this->partitionMap ) ) );
    }

    /**
     * Merge the non-empty sibling queue types reported by each partition.
     *
     * @param array $types List of job types to check
     * @return array|null List of types that have jobs, or null as soon as one
     *   partition returns a non-array (i.e. does not support the check)
     */
    protected function doGetSiblingQueuesWithJobs( array $types ) {
        $result = array();
        foreach ( $this->partitionQueues as $queue ) {
            try {
                $nonEmpty = $queue->doGetSiblingQueuesWithJobs( $types );
                if ( is_array( $nonEmpty ) ) {
                    $result = array_unique( array_merge( $result, $nonEmpty ) );
                } else {
                    return null; // not supported on all partitions; bail
                }
                if ( count( $result ) == count( $types ) ) {
                    break; // short-circuit
                }
            } catch ( JobQueueError $e ) {
                MWExceptionHandler::logException( $e );
            }
        }
        return array_values( $result );
    }

    /**
     * Add up the sibling queue sizes reported by each partition.
     *
     * @param array $types List of job types to check
     * @return array|null (job type => total size) map, or null as soon as one
     *   partition returns a non-array (i.e. does not support the check)
     */
    protected function doGetSiblingQueueSizes( array $types ) {
        $result = array();
        foreach ( $this->partitionQueues as $queue ) {
            try {
                $sizes = $queue->doGetSiblingQueueSizes( $types );
                if ( is_array( $sizes ) ) {
                    foreach ( $sizes as $type => $size ) {
                        $result[$type] = isset( $result[$type] ) ? $result[$type] + $size : $size;
                    }
                } else {
                    return null; // not supported on all partitions; bail
                }
            } catch ( JobQueueError $e ) {
                MWExceptionHandler::logException( $e );
            }
        }
        return $result;
    }

    /**
     * Forward the testing prefix to every partition queue.
     *
     * @param string $key
     */
    public function setTestingPrefix( $key ) {
        foreach ( $this->partitionQueues as $queue ) {
            $queue->setTestingPrefix( $key );
        }
    }

    /**
     * Build the cache key for one aggregate property of this queue.
     *
     * @param string $property
     * @return string
     */
    private function getCacheKey( $property ) {
        list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
        return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $property );
    }
}