MediaWiki
REL1_24
|
<?php
/**
 * Job queue that spreads one logical queue over several partition queues.
 *
 * Each wiki is mapped to a "section" ('sectionsByWiki', falling back to 'default'),
 * and each section is mapped to a weighted set of partitions ('partitionsBySection').
 * Every partition is backed by its own JobQueue built from 'configByPartition'.
 * Jobs that ignore duplicates are routed with a consistent hash of their de-duplication
 * info so that duplicates always land in the same partition; other jobs are sent to a
 * partition picked at random by weight.
 */
class JobQueueFederated extends JobQueue {
	/** @var HashRing Ring of all partitions of this wiki's section */
	protected $partitionRing;
	/** @var HashRing Ring of the partitions that jobs may be pushed to */
	protected $partitionPushRing;
	/** @var array (partition name => JobQueue) */
	protected $partitionQueues = array();

	/** @var BagOStuff Cache for aggregated cross-partition values */
	protected $cache;

	/** @var int Maximum number of insertion rounds made by doBatchPush() */
	protected $maxPartitionsTry;

	const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating
	const CACHE_TTL_LONG = 300; // integer; seconds to cache info that is kept up to date

	/**
	 * @param array $params Besides the base JobQueue keys, this class reads:
	 *  - sectionsByWiki      : Map of wiki IDs to section names. Wikis that are not
	 *                          listed use the 'default' section.
	 *  - partitionsBySection : Map of section names to (partition name => weight) maps.
	 *  - partitionsNoPush    : List of partition names that jobs are never pushed to.
	 *  - configByPartition   : Map of partition name => JobQueue::factory() config.
	 *                          The remaining top-level keys of $params are added to each
	 *                          partition config and win over same-named partition keys.
	 *  - maxPartitionsTry    : Maximum number of insertion rounds per push (default 2).
	 * @throws MWException If the wiki's section or one of its partitions is not configured
	 */
	protected function __construct( array $params ) {
		parent::__construct( $params );
		$section = isset( $params['sectionsByWiki'][$this->wiki] )
			? $params['sectionsByWiki'][$this->wiki]
			: 'default';
		if ( !isset( $params['partitionsBySection'][$section] ) ) {
			throw new MWException( "No configuration for section '$section'." );
		}
		$this->maxPartitionsTry = isset( $params['maxPartitionsTry'] )
			? $params['maxPartitionsTry']
			: 2;
		// Full partition map, heaviest partitions first
		$partitionMap = $params['partitionsBySection'][$section];
		arsort( $partitionMap, SORT_NUMERIC );
		// Subset of the partitions that jobs can actually be pushed to
		$partitionPushMap = $partitionMap;
		if ( isset( $params['partitionsNoPush'] ) ) {
			foreach ( $params['partitionsNoPush'] as $partition ) {
				unset( $partitionPushMap[$partition] );
			}
		}
		// Config merged into each partition queue config; the partition queues do not
		// care about the federation-only keys, so drop them
		$baseConfig = array_diff_key( $params, array_flip( array(
			'class', 'sectionsByWiki', 'maxPartitionsTry',
			'partitionsBySection', 'configByPartition', 'partitionsNoPush'
		) ) );
		// Build the partition queue objects
		foreach ( $partitionMap as $partition => $weight ) {
			if ( !isset( $params['configByPartition'][$partition] ) ) {
				throw new MWException( "No configuration for partition '$partition'." );
			}
			// "+" keeps the left operand's value for duplicate keys, so the shared
			// config overrides the per-partition config
			$this->partitionQueues[$partition] = JobQueue::factory(
				$baseConfig + $params['configByPartition'][$partition] );
		}
		// Ring of all partitions
		$this->partitionRing = new HashRing( $partitionMap );
		// Ring of the partitions to push jobs into
		if ( count( $partitionPushMap ) === count( $partitionMap ) ) {
			$this->partitionPushRing = clone $this->partitionRing; // faster
		} else {
			$this->partitionPushRing = new HashRing( $partitionPushMap );
		}
		// Only bother caching aggregated per-queue values if there are several partitions
		$this->cache = count( $partitionMap ) > 1 ? wfGetMainCache() : new EmptyBagOStuff();
	}

	protected function supportedOrders() {
		// No FIFO due to partitioning, though "rough timestamp order" is supported
		return array( 'undefined', 'random', 'timestamp' );
	}

	protected function optimalOrder() {
		return 'undefined'; // defer to the partitions
	}

	protected function supportsDelayedJobs() {
		return true; // defer checks to the partitions
	}

	/**
	 * Check whether every partition is empty, using the cached answer when available.
	 *
	 * Partitions that throw JobQueueError are logged and treated as empty.
	 *
	 * @return bool
	 * @throws JobQueueError If every partition failed
	 */
	protected function doIsEmpty() {
		$key = $this->getCacheKey( 'empty' );

		$isEmpty = $this->cache->get( $key );
		if ( $isEmpty === 'true' ) {
			return true;
		} elseif ( $isEmpty === 'false' ) {
			return false;
		}

		$empty = true;
		$failed = 0;
		foreach ( $this->partitionQueues as $queue ) {
			try {
				if ( !$queue->doIsEmpty() ) {
					$empty = false;
					break; // one non-empty partition is enough
				}
			} catch ( JobQueueError $e ) {
				++$failed;
				MWExceptionHandler::logException( $e );
			}
		}
		$this->throwErrorIfAllPartitionsDown( $failed );

		$this->cache->add( $key, $empty ? 'true' : 'false', self::CACHE_TTL_LONG );

		return $empty;
	}

	protected function doGetSize() {
		return $this->getCrossPartitionSum( 'size', 'doGetSize' );
	}

	protected function doGetAcquiredCount() {
		return $this->getCrossPartitionSum( 'acquiredcount', 'doGetAcquiredCount' );
	}

	protected function doGetDelayedCount() {
		return $this->getCrossPartitionSum( 'delayedcount', 'doGetDelayedCount' );
	}

	protected function doGetAbandonedCount() {
		return $this->getCrossPartitionSum( 'abandonedcount', 'doGetAbandonedCount' );
	}

	/**
	 * Sum a count over all partitions, caching the total for CACHE_TTL_SHORT seconds.
	 *
	 * Partitions that throw JobQueueError are logged and contribute nothing.
	 *
	 * @param string $type Cache key suffix for this count
	 * @param string $method Name of the partition queue method returning the count
	 * @return int
	 * @throws JobQueueError If every partition failed
	 */
	protected function getCrossPartitionSum( $type, $method ) {
		$key = $this->getCacheKey( $type );

		$count = $this->cache->get( $key );
		if ( $count !== false ) {
			return $count;
		}

		$count = 0; // cache miss; start the sum from zero rather than from false
		$failed = 0;
		foreach ( $this->partitionQueues as $queue ) {
			try {
				$count += $queue->$method();
			} catch ( JobQueueError $e ) {
				++$failed;
				MWExceptionHandler::logException( $e );
			}
		}
		$this->throwErrorIfAllPartitionsDown( $failed );

		$this->cache->set( $key, $count, self::CACHE_TTL_SHORT );

		return $count;
	}

	/**
	 * Push jobs into the push partitions, making up to maxPartitionsTry rounds.
	 *
	 * Each round retries the jobs the previous round could not insert; partitions that
	 * fail are ejected from the live push ring by tryJobInsertions().
	 *
	 * @param array $jobs
	 * @param int $flags Bitfield of JobQueue::QOS_* constants
	 * @throws JobQueueError If some jobs could still not be inserted
	 */
	protected function doBatchPush( array $jobs, $flags ) {
		// Local ring variable that tryJobInsertions() receives by reference
		$partitionRing = $this->partitionPushRing;
		$jobsLeft = $jobs;
		for ( $round = 0; $round < $this->maxPartitionsTry && $jobsLeft; ++$round ) {
			try {
				$partitionRing->getLiveRing();
			} catch ( UnexpectedValueException $e ) {
				break; // all servers down; nothing to insert to
			}
			$jobsLeft = $this->tryJobInsertions( $jobsLeft, $partitionRing, $flags );
		}
		if ( count( $jobsLeft ) ) {
			throw new JobQueueError(
				"Could not insert job(s), {$this->maxPartitionsTry} partitions tried." );
		}
	}

	/**
	 * Make one round of job insertions over the live partitions of $partitionRing.
	 *
	 * Partitions whose push fails are ejected from the ring's live set for 5 seconds.
	 *
	 * @param array $jobs
	 * @param HashRing &$partitionRing
	 * @param int $flags Bitfield of JobQueue::QOS_* constants
	 * @return array Jobs that could not be inserted
	 * @throws JobQueueError If no live partition is left to insert into
	 */
	protected function tryJobInsertions( array $jobs, HashRing &$partitionRing, $flags ) {
		$jobsLeft = array();

		// Because jobs are spread across partitions, per-job de-duplication needs
		// to use a consistent hash to avoid allowing duplicate jobs per partition.
		// When inserting a batch of de-duplicated jobs, QOS_ATOMIC is disregarded.
		$uJobsByPartition = array(); // (partition name => job list)
		foreach ( $jobs as $key => $job ) {
			if ( $job->ignoreDuplicates() ) {
				$sha1 = sha1( serialize( $job->getDeduplicationInfo() ) );
				$uJobsByPartition[$partitionRing->getLiveLocation( $sha1 )][] = $job;
				unset( $jobs[$key] );
			}
		}
		// Get the batches of jobs that are not de-duplicated
		if ( !count( $jobs ) ) {
			// Every job was routed by hash above; never push an empty batch, as that
			// would also wrongly mark the queue as non-empty in the cache
			$nuJobBatches = array();
		} elseif ( $flags & self::QOS_ATOMIC ) {
			$nuJobBatches = array( $jobs ); // all or nothing
		} else {
			// Split the jobs into batches and spread them out over servers if there
			// are many jobs. This helps keep the partitions even. Otherwise, send all
			// the jobs to a single partition queue to avoid the extra connections.
			$nuJobBatches = array_chunk( $jobs, 300 );
		}

		// Insert the de-duplicated jobs into their hashed partitions...
		foreach ( $uJobsByPartition as $partition => $jobBatch ) {
			if ( !$this->pushJobBatchToPartition( $partition, $jobBatch, $partitionRing, $flags ) ) {
				$jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted
			}
		}

		// Insert the jobs that are not de-duplicated into weighted-random partitions...
		foreach ( $nuJobBatches as $jobBatch ) {
			$partition = ArrayUtils::pickRandom( $partitionRing->getLiveLocationWeights() );
			if ( $partition === false ) {
				// All live partitions are at 0 weight; there is nowhere to push to
				throw new JobQueueError( "Could not insert job(s), no partitions available." );
			}
			if ( !$this->pushJobBatchToPartition( $partition, $jobBatch, $partitionRing, $flags ) ) {
				$jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted
			}
		}

		return $jobsLeft;
	}

	/**
	 * Atomically push one batch of jobs into one partition queue.
	 *
	 * On success the aggregated 'empty' cache entry is set to 'false'. On failure the
	 * error is logged and the partition is ejected from $partitionRing for 5 seconds.
	 *
	 * @param string $partition Partition name
	 * @param array $jobBatch
	 * @param HashRing $partitionRing Ring to eject the partition from on failure
	 * @param int $flags Bitfield of JobQueue::QOS_* constants
	 * @return bool Whether the batch was inserted
	 * @throws JobQueueError If the push failed and the partition could not be ejected
	 */
	private function pushJobBatchToPartition( $partition, array $jobBatch, HashRing $partitionRing, $flags ) {
		$queue = $this->partitionQueues[$partition];
		try {
			$queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC );
		} catch ( JobQueueError $e ) {
			MWExceptionHandler::logException( $e );
			if ( !$partitionRing->ejectFromLiveRing( $partition, 5 ) ) { // blacklist
				throw new JobQueueError( "Could not insert job(s), no partitions available." );
			}

			return false;
		}
		$this->cache->set( $this->getCacheKey( 'empty' ), 'false', self::CACHE_TTL_LONG );

		return true;
	}

	/**
	 * Pop a job from a partition picked at random by weight.
	 *
	 * Partitions that are empty or fail are removed from the candidates until one yields
	 * a job. The popped job is tagged with its partition name for doAck().
	 *
	 * @return Job|bool Job, or false if no partition had a job
	 * @throws JobQueueError If every partition failed
	 */
	protected function doPop() {
		$partitionsTry = $this->partitionRing->getLiveLocationWeights(); // (partition => weight)

		$failed = 0;
		while ( count( $partitionsTry ) ) {
			$partition = ArrayUtils::pickRandom( $partitionsTry );
			if ( $partition === false ) {
				break; // all partitions at 0 weight
			}

			$queue = $this->partitionQueues[$partition];
			try {
				$job = $queue->pop();
			} catch ( JobQueueError $e ) {
				++$failed;
				MWExceptionHandler::logException( $e );
				$job = false;
			}
			if ( $job ) {
				$job->metadata['QueuePartition'] = $partition;

				return $job;
			} else {
				unset( $partitionsTry[$partition] ); // blacklist partition
			}
		}
		$this->throwErrorIfAllPartitionsDown( $failed );

		$key = $this->getCacheKey( 'empty' );
		$this->cache->set( $key, 'true', self::CACHE_TTL_LONG );

		return false;
	}

	/**
	 * Acknowledge a job on the partition queue it was popped from.
	 *
	 * @param Job $job Job tagged by doPop() with metadata['QueuePartition']
	 * @return mixed Result of the partition queue's ack()
	 * @throws MWException If the job has no partition name or the partition is unknown
	 */
	protected function doAck( Job $job ) {
		if ( !isset( $job->metadata['QueuePartition'] ) ) {
			throw new MWException( "The given job has no defined partition name." );
		}
		$partition = $job->metadata['QueuePartition'];
		if ( !isset( $this->partitionQueues[$partition] ) ) {
			// e.g. the partition config changed after the job was popped
			throw new MWException( "The given job has an unknown partition name '$partition'." );
		}

		return $this->partitionQueues[$partition]->ack( $job );
	}

	protected function doIsRootJobOldDuplicate( Job $job ) {
		return $this->callRootJobPartitionMethod( $job, 'doIsRootJobOldDuplicate' );
	}

	protected function doDeduplicateRootJob( Job $job ) {
		return $this->callRootJobPartitionMethod( $job, 'doDeduplicateRootJob' );
	}

	/**
	 * Call a root-job method on the push partition that owns the job's root job signature.
	 *
	 * If that partition throws JobQueueError, the error is logged, the partition is
	 * ejected from the push ring for 5 seconds, and the call is retried once on the
	 * partition the signature maps to next.
	 *
	 * @param Job $job
	 * @param string $method Name of the partition queue method to call with $job
	 * @return mixed Result of the partition queue method, or false if the owning
	 *  partition failed and could not be ejected
	 * @throws JobQueueError If the retry on the fallback partition also fails
	 */
	private function callRootJobPartitionMethod( Job $job, $method ) {
		$params = $job->getRootJobParams();
		$signature = $params['rootJobSignature'];
		$partition = $this->partitionPushRing->getLiveLocation( $signature );
		try {
			return $this->partitionQueues[$partition]->$method( $job );
		} catch ( JobQueueError $e ) {
			MWExceptionHandler::logException( $e );
			if ( $this->partitionPushRing->ejectFromLiveRing( $partition, 5 ) ) {
				$partition = $this->partitionPushRing->getLiveLocation( $signature );

				return $this->partitionQueues[$partition]->$method( $job );
			}
		}

		return false;
	}

	/**
	 * Delete all jobs in every partition.
	 *
	 * @return bool
	 * @throws JobQueueError If every partition failed
	 */
	protected function doDelete() {
		$failed = 0;
		foreach ( $this->partitionQueues as $queue ) {
			try {
				$queue->doDelete();
			} catch ( JobQueueError $e ) {
				++$failed;
				MWExceptionHandler::logException( $e );
			}
		}
		$this->throwErrorIfAllPartitionsDown( $failed );

		return true;
	}

	/**
	 * Call waitForBackups() on every partition.
	 *
	 * @throws JobQueueError If every partition failed
	 */
	protected function doWaitForBackups() {
		$failed = 0;
		foreach ( $this->partitionQueues as $queue ) {
			try {
				$queue->waitForBackups();
			} catch ( JobQueueError $e ) {
				++$failed;
				MWExceptionHandler::logException( $e );
			}
		}
		$this->throwErrorIfAllPartitionsDown( $failed );
	}

	/**
	 * @return array Periodic tasks of all partitions, keyed "<partition>:<task>"
	 */
	protected function doGetPeriodicTasks() {
		$tasks = array();
		foreach ( $this->partitionQueues as $partition => $queue ) {
			foreach ( $queue->getPeriodicTasks() as $task => $def ) {
				$tasks["{$partition}:{$task}"] = $def;
			}
		}

		return $tasks;
	}

	/**
	 * Clear the aggregated cache entries and the caches of every partition.
	 */
	protected function doFlushCaches() {
		static $types = array(
			'empty',
			'size',
			'acquiredcount',
			'delayedcount',
			'abandonedcount'
		);

		foreach ( $types as $type ) {
			$this->cache->delete( $this->getCacheKey( $type ) );
		}

		foreach ( $this->partitionQueues as $queue ) {
			$queue->doFlushCaches();
		}
	}

	/**
	 * @return AppendIterator Iterator over the queued jobs of every partition
	 */
	public function getAllQueuedJobs() {
		$iterator = new AppendIterator();

		foreach ( $this->partitionQueues as $queue ) {
			$iterator->append( $queue->getAllQueuedJobs() );
		}

		return $iterator;
	}

	/**
	 * @return AppendIterator Iterator over the delayed jobs of every partition
	 */
	public function getAllDelayedJobs() {
		$iterator = new AppendIterator();

		foreach ( $this->partitionQueues as $queue ) {
			$iterator->append( $queue->getAllDelayedJobs() );
		}

		return $iterator;
	}

	/**
	 * @return string Identifier built from the wiki ID and the set of partition names
	 */
	public function getCoalesceLocationInternal() {
		return "JobQueueFederated:wiki:{$this->wiki}" .
			sha1( serialize( array_keys( $this->partitionQueues ) ) );
	}

	/**
	 * Get the union of the non-empty sibling queue types over all partitions.
	 *
	 * @param array $types List of queue types
	 * @return array|null List of queue types, or null if any partition lacks support
	 * @throws JobQueueError If every partition failed
	 */
	protected function doGetSiblingQueuesWithJobs( array $types ) {
		$result = array();

		$failed = 0;
		foreach ( $this->partitionQueues as $queue ) {
			try {
				$nonEmpty = $queue->doGetSiblingQueuesWithJobs( $types );
				if ( is_array( $nonEmpty ) ) {
					$result = array_unique( array_merge( $result, $nonEmpty ) );
				} else {
					return null; // not supported on all partitions; bail
				}
				if ( count( $result ) === count( $types ) ) {
					break; // short-circuit
				}
			} catch ( JobQueueError $e ) {
				++$failed;
				MWExceptionHandler::logException( $e );
			}
		}
		$this->throwErrorIfAllPartitionsDown( $failed );

		return array_values( $result );
	}

	/**
	 * Get the per-type sibling queue sizes, summed over all partitions.
	 *
	 * @param array $types List of queue types
	 * @return array|null (queue type => size), or null if any partition lacks support
	 * @throws JobQueueError If every partition failed
	 */
	protected function doGetSiblingQueueSizes( array $types ) {
		$result = array();
		$failed = 0;
		foreach ( $this->partitionQueues as $queue ) {
			try {
				$sizes = $queue->doGetSiblingQueueSizes( $types );
				if ( is_array( $sizes ) ) {
					foreach ( $sizes as $type => $size ) {
						$result[$type] = isset( $result[$type] ) ? $result[$type] + $size : $size;
					}
				} else {
					return null; // not supported on all partitions; bail
				}
			} catch ( JobQueueError $e ) {
				++$failed;
				MWExceptionHandler::logException( $e );
			}
		}
		$this->throwErrorIfAllPartitionsDown( $failed );

		return $result;
	}

	/**
	 * Throw an error if every partition failed.
	 *
	 * @param int $down Number of partitions that failed
	 * @throws JobQueueError If $down is at least the number of partitions
	 */
	protected function throwErrorIfAllPartitionsDown( $down ) {
		if ( $down >= count( $this->partitionQueues ) ) {
			throw new JobQueueError( 'No queue partitions available.' );
		}
	}

	/**
	 * @param string $key Testing prefix passed to every partition queue
	 */
	public function setTestingPrefix( $key ) {
		foreach ( $this->partitionQueues as $queue ) {
			$queue->setTestingPrefix( $key );
		}
	}

	/**
	 * @param string $property Aggregated value name (e.g. 'empty', 'size')
	 * @return string Cache key for that value of this wiki's queue type
	 */
	private function getCacheKey( $property ) {
		list( $db, $prefix ) = wfSplitWikiID( $this->wiki );

		return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $property );
	}
}