MediaWiki
REL1_22
|
<?php
/**
 * Job queue that spreads one logical queue over several partition queues.
 *
 * Each partition is its own JobQueue instance built with JobQueue::factory().
 * The set of partitions for a wiki is chosen by the wiki's "section". Jobs that
 * ask for de-duplication are routed to a partition with a consistent hash of
 * their de-duplication info; other jobs go to a partition picked at random by
 * weight. Some aggregate values (size, counts, "is empty") are cached when
 * there is more than one partition.
 */
class JobQueueFederated extends JobQueue {
	/** @var array (partition name => weight), sorted by descending weight */
	protected $partitionMap = array();
	/** @var array (partition name => JobQueue) */
	protected $partitionQueues = array();
	/** @var HashRing Ring of the partitions that jobs may be pushed to */
	protected $partitionPushRing;
	/** Cache for aggregated values: wfGetMainCache() with multiple partitions, else EmptyBagOStuff */
	protected $cache;

	const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating
	const CACHE_TTL_LONG = 300; // integer; seconds to cache info that is kept up to date

	/**
	 * @param array $params Possible keys (in addition to those of JobQueue):
	 *   - sectionsByWiki      : Map of wiki ID => section name. Wikis that are
	 *                           not listed use the section "default".
	 *   - partitionsBySection : Map of section name => (partition name => weight).
	 *   - configByPartition   : Map of partition name => JobQueue config. Each is
	 *                           combined with the other top-level $params using
	 *                           array union, so top-level keys win on conflict.
	 *   - partitionsNoPush    : List of partition names that jobs are never pushed to.
	 * @throws MWException If the section or one of its partitions has no configuration
	 */
	protected function __construct( array $params ) {
		parent::__construct( $params );
		$section = isset( $params['sectionsByWiki'][$this->wiki] )
			? $params['sectionsByWiki'][$this->wiki]
			: 'default';
		if ( !isset( $params['partitionsBySection'][$section] ) ) {
			throw new MWException( "No configuration for section '$section'." );
		}
		// Get the full partition map, heaviest partitions first
		$this->partitionMap = $params['partitionsBySection'][$section];
		arsort( $this->partitionMap, SORT_NUMERIC );
		// Get the partitions jobs can actually be pushed to
		$partitionPushMap = $this->partitionMap;
		if ( isset( $params['partitionsNoPush'] ) ) {
			foreach ( $params['partitionsNoPush'] as $partition ) {
				unset( $partitionPushMap[$partition] );
			}
		}
		// Get the config to merge into each partition queue config
		$baseConfig = $params;
		foreach ( array( 'class', 'sectionsByWiki',
			'partitionsBySection', 'configByPartition', 'partitionsNoPush' ) as $o )
		{
			unset( $baseConfig[$o] );
		}
		// Get the partition queue objects
		foreach ( $this->partitionMap as $partition => $w ) {
			if ( !isset( $params['configByPartition'][$partition] ) ) {
				throw new MWException( "No configuration for partition '$partition'." );
			}
			// Array union: keys already present in $baseConfig take precedence
			$this->partitionQueues[$partition] = JobQueue::factory(
				$baseConfig + $params['configByPartition'][$partition] );
		}
		// Get the ring of partitions to push jobs into
		$this->partitionPushRing = new HashRing( $partitionPushMap );
		// Aggregate cache some per-queue values if there are multiple partition queues
		$this->cache = count( $this->partitionMap ) > 1 ? wfGetMainCache() : new EmptyBagOStuff();
	}

	protected function supportedOrders() {
		// No FIFO due to partitioning, though "rough timestamp order" is supported
		return array( 'undefined', 'random', 'timestamp' );
	}

	protected function optimalOrder() {
		return 'undefined'; // defer to the partitions
	}

	protected function supportsDelayedJobs() {
		return true; // defer checks to the partitions
	}

	/**
	 * Check whether every partition queue is empty.
	 *
	 * The result is cached as the string 'true' or 'false'. Partitions that
	 * throw JobQueueError are logged and skipped.
	 *
	 * @return bool
	 */
	protected function doIsEmpty() {
		$key = $this->getCacheKey( 'empty' );

		$isEmpty = $this->cache->get( $key );
		if ( $isEmpty === 'true' ) {
			return true;
		} elseif ( $isEmpty === 'false' ) {
			return false;
		}

		foreach ( $this->partitionQueues as $queue ) {
			try {
				if ( !$queue->doIsEmpty() ) {
					$this->cache->add( $key, 'false', self::CACHE_TTL_LONG );
					return false;
				}
			} catch ( JobQueueError $e ) {
				MWExceptionHandler::logException( $e );
			}
		}

		$this->cache->add( $key, 'true', self::CACHE_TTL_LONG );
		return true;
	}

	protected function doGetSize() {
		return $this->getCrossPartitionSum( 'size', 'doGetSize' );
	}

	protected function doGetAcquiredCount() {
		return $this->getCrossPartitionSum( 'acquiredcount', 'doGetAcquiredCount' );
	}

	protected function doGetDelayedCount() {
		return $this->getCrossPartitionSum( 'delayedcount', 'doGetDelayedCount' );
	}

	protected function doGetAbandonedCount() {
		return $this->getCrossPartitionSum( 'abandonedcount', 'doGetAbandonedCount' );
	}

	/**
	 * Sum a count over all partition queues, caching the total for CACHE_TTL_SHORT.
	 *
	 * Partitions that throw JobQueueError are logged and left out of the sum.
	 *
	 * @param string $type Cache key property name for the count (e.g. "size")
	 * @param string $method Name of the method to call on each partition queue
	 * @return int
	 */
	protected function getCrossPartitionSum( $type, $method ) {
		$key = $this->getCacheKey( $type );

		$count = $this->cache->get( $key );
		if ( is_int( $count ) ) {
			return $count;
		}

		$count = 0;
		foreach ( $this->partitionQueues as $queue ) {
			try {
				$count += $queue->$method();
			} catch ( JobQueueError $e ) {
				MWExceptionHandler::logException( $e );
			}
		}

		$this->cache->set( $key, $count, self::CACHE_TTL_SHORT );
		return $count;
	}

	/**
	 * Push jobs into the partitions, retrying failed batches once on the
	 * partitions that have not failed yet.
	 *
	 * @param array $jobs List of Job objects
	 * @param int $flags Bitfield (e.g. self::QOS_ATOMIC)
	 * @return bool Whether all jobs were inserted
	 * @throws JobQueueError If no partition is left to push to
	 */
	protected function doBatchPush( array $jobs, $flags ) {
		if ( !count( $jobs ) ) {
			return true; // nothing to do
		}
		// Local ring variable; tryJobInsertions() replaces it with a ring that
		// excludes any partitions that failed
		$partitionRing = $this->partitionPushRing;
		$jobsLeft = $this->tryJobInsertions( $jobs, $partitionRing, $flags );
		if ( count( $jobsLeft ) ) { // some jobs failed to insert?
			// Try to insert the remaining jobs once more, ignoring the bad partitions
			return !count( $this->tryJobInsertions( $jobsLeft, $partitionRing, $flags ) );
		}
		return true;
	}

	/**
	 * Make one pass at inserting jobs into the partitions of $partitionRing.
	 *
	 * @param array $jobs List of Job objects
	 * @param HashRing &$partitionRing Ring of usable partitions; replaced by a
	 *   ring without each partition whose push fails
	 * @param int $flags Bitfield (e.g. self::QOS_ATOMIC)
	 * @return array List of Job objects that were not inserted
	 * @throws JobQueueError If no partition is left to push to
	 */
	protected function tryJobInsertions( array $jobs, HashRing &$partitionRing, $flags ) {
		$jobsLeft = array();

		// Because jobs are spread across partitions, per-job de-duplication needs
		// to use a consistent hash to avoid allowing duplicate jobs per partition.
		// When inserting a batch of de-duplicated jobs, QOS_ATOMIC is disregarded.
		$uJobsByPartition = array(); // (partition name => job list)
		foreach ( $jobs as $key => $job ) {
			if ( $job->ignoreDuplicates() ) {
				$sha1 = sha1( serialize( $job->getDeduplicationInfo() ) );
				$uJobsByPartition[$partitionRing->getLocation( $sha1 )][] = $job;
				unset( $jobs[$key] );
			}
		}
		// Get the batches of jobs that are not de-duplicated
		if ( !count( $jobs ) ) {
			$nuJobBatches = array(); // every job was a de-duplicated one
		} elseif ( $flags & self::QOS_ATOMIC ) {
			$nuJobBatches = array( $jobs ); // all or nothing
		} else {
			// Split the jobs into batches and spread them out over servers if there
			// are many jobs. This helps keep the partitions even. Otherwise, send all
			// the jobs to a single partition queue to avoid the extra connections.
			$nuJobBatches = array_chunk( $jobs, 300 );
		}

		// Insert the de-duplicated jobs into the queues...
		foreach ( $uJobsByPartition as $partition => $jobBatch ) {
			if ( !$this->pushBatchToPartition( $partition, $jobBatch, $flags, $partitionRing ) ) {
				$jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted
			}
		}

		// Insert the jobs that are not de-duplicated into the queues...
		foreach ( $nuJobBatches as $jobBatch ) {
			$partition = ArrayUtils::pickRandom( $partitionRing->getLocationWeights() );
			if ( $partition === false ) { // all remaining partitions at 0 weight
				throw new JobQueueError( "Could not insert job(s), no partitions available." );
			}
			if ( !$this->pushBatchToPartition( $partition, $jobBatch, $flags, $partitionRing ) ) {
				$jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted
			}
		}

		return $jobsLeft;
	}

	/**
	 * Push one batch of jobs atomically into a single partition queue.
	 *
	 * On success the cached "empty" flag is set to 'false'. On failure the
	 * error is logged and the partition is removed from $partitionRing.
	 *
	 * @param string $partition Partition name
	 * @param array $jobBatch List of Job objects
	 * @param int $flags Bitfield (e.g. self::QOS_ATOMIC); QOS_ATOMIC is always added
	 * @param HashRing &$partitionRing Ring of usable partitions (replaced on failure)
	 * @return bool Whether the batch was inserted
	 * @throws JobQueueError If the failed partition was the last one in the ring
	 */
	private function pushBatchToPartition( $partition, array $jobBatch, $flags, HashRing &$partitionRing ) {
		try {
			$ok = $this->partitionQueues[$partition]->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC );
		} catch ( JobQueueError $e ) {
			$ok = false;
			MWExceptionHandler::logException( $e );
		}
		if ( $ok ) {
			$this->cache->set( $this->getCacheKey( 'empty' ), 'false', self::CACHE_TTL_LONG );
			return true;
		}
		$partitionRing = $partitionRing->newWithoutLocation( $partition ); // blacklist
		if ( !$partitionRing ) {
			throw new JobQueueError( "Could not insert job(s), all partitions are down." );
		}
		return false;
	}

	/**
	 * Pop a job from a partition picked at random by weight.
	 *
	 * Partitions that yield no job (or throw JobQueueError) are skipped until
	 * none are left. The popped job records its partition in
	 * $job->metadata['QueuePartition'] so doAck() can find it.
	 *
	 * @return Job|bool Job, or false if no partition yielded one
	 */
	protected function doPop() {
		$key = $this->getCacheKey( 'empty' );

		$isEmpty = $this->cache->get( $key );
		if ( $isEmpty === 'true' ) {
			return false;
		}

		$partitionsTry = $this->partitionMap; // (partition => weight)

		while ( count( $partitionsTry ) ) {
			$partition = ArrayUtils::pickRandom( $partitionsTry );
			if ( $partition === false ) {
				break; // all partitions at 0 weight
			}
			$queue = $this->partitionQueues[$partition];
			try {
				$job = $queue->pop();
			} catch ( JobQueueError $e ) {
				$job = false;
				MWExceptionHandler::logException( $e );
			}
			if ( $job ) {
				$job->metadata['QueuePartition'] = $partition;
				return $job;
			} else {
				unset( $partitionsTry[$partition] ); // blacklist partition
			}
		}

		$this->cache->set( $key, 'true', self::CACHE_TTL_LONG );
		return false;
	}

	/**
	 * Acknowledge a job on the partition it was popped from.
	 *
	 * @param Job $job
	 * @return mixed Result of the partition queue's ack()
	 * @throws MWException If the job has no recorded partition
	 */
	protected function doAck( Job $job ) {
		if ( !isset( $job->metadata['QueuePartition'] ) ) {
			throw new MWException( "The given job has no defined partition name." );
		}
		return $this->partitionQueues[$job->metadata['QueuePartition']]->ack( $job );
	}

	/**
	 * Check the root job signature on its hashed partition, falling back to the
	 * next partition on the ring if the first one throws JobQueueError.
	 *
	 * @param Job $job
	 * @return bool
	 */
	protected function doIsRootJobOldDuplicate( Job $job ) {
		$params = $job->getRootJobParams();
		$partitions = $this->partitionPushRing->getLocations( $params['rootJobSignature'], 2 );
		try {
			return $this->partitionQueues[$partitions[0]]->doIsRootJobOldDuplicate( $job );
		} catch ( JobQueueError $e ) {
			MWExceptionHandler::logException( $e );
			if ( isset( $partitions[1] ) ) { // check fallback partition
				return $this->partitionQueues[$partitions[1]]->doIsRootJobOldDuplicate( $job );
			}
		}
		return false;
	}

	/**
	 * Record the root job on its hashed partition, falling back to the next
	 * partition on the ring if the first one throws JobQueueError.
	 *
	 * @param Job $job
	 * @return bool
	 */
	protected function doDeduplicateRootJob( Job $job ) {
		$params = $job->getRootJobParams();
		$partitions = $this->partitionPushRing->getLocations( $params['rootJobSignature'], 2 );
		try {
			return $this->partitionQueues[$partitions[0]]->doDeduplicateRootJob( $job );
		} catch ( JobQueueError $e ) {
			MWExceptionHandler::logException( $e );
			if ( isset( $partitions[1] ) ) { // check fallback partition
				return $this->partitionQueues[$partitions[1]]->doDeduplicateRootJob( $job );
			}
		}
		return false;
	}

	protected function doDelete() {
		foreach ( $this->partitionQueues as $queue ) {
			try {
				$queue->doDelete();
			} catch ( JobQueueError $e ) {
				MWExceptionHandler::logException( $e );
			}
		}
	}

	protected function doWaitForBackups() {
		foreach ( $this->partitionQueues as $queue ) {
			try {
				$queue->waitForBackups();
			} catch ( JobQueueError $e ) {
				MWExceptionHandler::logException( $e );
			}
		}
	}

	/**
	 * Collect the periodic tasks of all partitions, keyed as "<partition>:<task>".
	 *
	 * @return array
	 */
	protected function doGetPeriodicTasks() {
		$tasks = array();
		foreach ( $this->partitionQueues as $partition => $queue ) {
			foreach ( $queue->getPeriodicTasks() as $task => $def ) {
				$tasks["{$partition}:{$task}"] = $def;
			}
		}
		return $tasks;
	}

	/**
	 * Clear the aggregated cache entries of this queue and of every partition.
	 */
	protected function doFlushCaches() {
		static $types = array(
			'empty',
			'size',
			'acquiredcount',
			'delayedcount',
			'abandonedcount'
		);
		foreach ( $types as $type ) {
			$this->cache->delete( $this->getCacheKey( $type ) );
		}
		foreach ( $this->partitionQueues as $queue ) {
			$queue->doFlushCaches();
		}
	}

	/**
	 * @return AppendIterator Queued jobs of all partitions, one partition after another
	 */
	public function getAllQueuedJobs() {
		$iterator = new AppendIterator();
		foreach ( $this->partitionQueues as $queue ) {
			$iterator->append( $queue->getAllQueuedJobs() );
		}
		return $iterator;
	}

	/**
	 * @return AppendIterator Delayed jobs of all partitions, one partition after another
	 */
	public function getAllDelayedJobs() {
		$iterator = new AppendIterator();
		foreach ( $this->partitionQueues as $queue ) {
			$iterator->append( $queue->getAllDelayedJobs() );
		}
		return $iterator;
	}

	/**
	 * @return string Identifier derived from the wiki ID and the partition names
	 */
	public function getCoalesceLocationInternal() {
		return "JobQueueFederated:wiki:{$this->wiki}" .
			sha1( serialize( array_keys( $this->partitionMap ) ) );
	}

	/**
	 * @param array $types List of job types
	 * @return array|null Types (from $types) that have jobs in any partition, or
	 *   null if some partition does not support this check
	 */
	protected function doGetSiblingQueuesWithJobs( array $types ) {
		$result = array();
		foreach ( $this->partitionQueues as $queue ) {
			try {
				$nonEmpty = $queue->doGetSiblingQueuesWithJobs( $types );
				if ( is_array( $nonEmpty ) ) {
					$result = array_unique( array_merge( $result, $nonEmpty ) );
				} else {
					return null; // not supported on all partitions; bail
				}
				if ( count( $result ) == count( $types ) ) {
					break; // short-circuit
				}
			} catch ( JobQueueError $e ) {
				MWExceptionHandler::logException( $e );
			}
		}
		return array_values( $result );
	}

	/**
	 * @param array $types List of job types
	 * @return array|null Map of type => size summed over partitions, or null if
	 *   some partition does not support this check
	 */
	protected function doGetSiblingQueueSizes( array $types ) {
		$result = array();
		foreach ( $this->partitionQueues as $queue ) {
			try {
				$sizes = $queue->doGetSiblingQueueSizes( $types );
				if ( is_array( $sizes ) ) {
					foreach ( $sizes as $type => $size ) {
						$result[$type] = isset( $result[$type] ) ? $result[$type] + $size : $size;
					}
				} else {
					return null; // not supported on all partitions; bail
				}
			} catch ( JobQueueError $e ) {
				MWExceptionHandler::logException( $e );
			}
		}
		return $result;
	}

	public function setTestingPrefix( $key ) {
		foreach ( $this->partitionQueues as $queue ) {
			$queue->setTestingPrefix( $key );
		}
	}

	/**
	 * Build the cache key for an aggregated property of this wiki's queue type.
	 *
	 * @param string $property
	 * @return string
	 */
	private function getCacheKey( $property ) {
		list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
		return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $property );
	}
}