MediaWiki
REL1_23
|
<?php

/**
 * Job queue that federates the jobs of one queue type over several partition queues.
 *
 * Each partition is its own JobQueue, built via JobQueue::factory() from the
 * 'configByPartition' entry for that partition. New jobs are spread over the
 * pushable partitions with a weighted HashRing; pops try partitions at random,
 * weighted by the configured partition weights.
 *
 * When there is more than one partition, some aggregate values (emptiness and
 * the various job counts) are cached in the main object cache; with a single
 * partition an EmptyBagOStuff is used so nothing is cached at this level.
 */
class JobQueueFederated extends JobQueue {
	/** @var array (partition name => weight), sorted by descending weight */
	protected $partitionMap = array();

	/** @var JobQueue[] (partition name => partition queue) */
	protected $partitionQueues = array();

	/** @var HashRing Ring of the partitions that new jobs may be pushed to */
	protected $partitionPushRing;

	/** @var BagOStuff Cache for aggregate values across partitions */
	protected $cache;

	/** @var int Number of insertion rounds doBatchPush() attempts */
	protected $maxPartitionsTry;

	const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating
	const CACHE_TTL_LONG = 300; // integer; seconds to cache info that is kept up to date

	/**
	 * @param array $params Queue parameters; in addition to those of JobQueue:
	 *   - sectionsByWiki      : map of wiki ID => section name; wikis that are not
	 *                           listed use the section 'default'
	 *   - partitionsBySection : map of section name => (partition name => weight)
	 *   - configByPartition   : map of partition name => JobQueue config. Each one is
	 *                           combined with the remaining $params (which win on key
	 *                           conflicts, per the array `+` operator) and passed to
	 *                           JobQueue::factory()
	 *   - partitionsNoPush    : (optional) list of partition names that new jobs
	 *                           must not be pushed to
	 *   - maxPartitionsTry    : (optional) number of insertion rounds to attempt when
	 *                           pushing jobs [default: 2]
	 * @throws MWException If the wiki's section or one of its partitions has no configuration
	 */
	protected function __construct( array $params ) {
		parent::__construct( $params );
		$section = isset( $params['sectionsByWiki'][$this->wiki] )
			? $params['sectionsByWiki'][$this->wiki]
			: 'default';
		if ( !isset( $params['partitionsBySection'][$section] ) ) {
			throw new MWException( "No configuration for section '$section'." );
		}
		$this->maxPartitionsTry = isset( $params['maxPartitionsTry'] )
			? $params['maxPartitionsTry']
			: 2;
		// Get the full partition map, heaviest partitions first
		$this->partitionMap = $params['partitionsBySection'][$section];
		arsort( $this->partitionMap, SORT_NUMERIC );
		// Get the partitions jobs can actually be pushed to
		$partitionPushMap = $this->partitionMap;
		if ( isset( $params['partitionsNoPush'] ) ) {
			foreach ( $params['partitionsNoPush'] as $partition ) {
				unset( $partitionPushMap[$partition] );
			}
		}
		// Get the config to merge into each partition queue config
		$baseConfig = $params;
		foreach ( array( 'class', 'sectionsByWiki', 'maxPartitionsTry',
			'partitionsBySection', 'configByPartition', 'partitionsNoPush' ) as $o
		) {
			unset( $baseConfig[$o] ); // partition queue doesn't care about this
		}
		// Get the partition queue objects
		foreach ( $this->partitionMap as $partition => $w ) {
			if ( !isset( $params['configByPartition'][$partition] ) ) {
				throw new MWException( "No configuration for partition '$partition'." );
			}
			$this->partitionQueues[$partition] = JobQueue::factory(
				$baseConfig + $params['configByPartition'][$partition] );
		}
		// Get the ring of partitions to push jobs into
		$this->partitionPushRing = new HashRing( $partitionPushMap );
		// Aggregate cache some per-queue values if there are multiple partition queues
		$this->cache = count( $this->partitionMap ) > 1 ? wfGetMainCache() : new EmptyBagOStuff();
	}

	protected function supportedOrders() {
		// No FIFO due to partitioning, though "rough timestamp order" is supported
		return array( 'undefined', 'random', 'timestamp' );
	}

	protected function optimalOrder() {
		return 'undefined'; // defer to the partitions
	}

	protected function supportsDelayedJobs() {
		return true; // defer checks to the partitions
	}

	/**
	 * Check whether every partition queue is empty.
	 *
	 * The answer is cached for CACHE_TTL_LONG seconds. An "empty" answer is only
	 * cached when every partition responded, since an unreachable partition may
	 * still hold jobs.
	 *
	 * @return bool
	 * @throws JobQueueError If every partition failed
	 */
	protected function doIsEmpty() {
		$key = $this->getCacheKey( 'empty' );

		$isEmpty = $this->cache->get( $key );
		if ( $isEmpty === 'true' ) {
			return true;
		} elseif ( $isEmpty === 'false' ) {
			return false;
		}

		$empty = true;
		$failed = 0;
		foreach ( $this->partitionQueues as $queue ) {
			try {
				if ( !$queue->doIsEmpty() ) {
					$empty = false;
					break; // one non-empty partition settles the answer
				}
			} catch ( JobQueueError $e ) {
				++$failed;
				MWExceptionHandler::logException( $e );
			}
		}
		$this->throwErrorIfAllPartitionsDown( $failed );

		// "Not empty" is always accurate; "empty" is only trustworthy if no partition failed
		if ( !$empty || !$failed ) {
			$this->cache->add( $key, $empty ? 'true' : 'false', self::CACHE_TTL_LONG );
		}

		return $empty;
	}

	protected function doGetSize() {
		return $this->getCrossPartitionSum( 'size', 'doGetSize' );
	}

	protected function doGetAcquiredCount() {
		return $this->getCrossPartitionSum( 'acquiredcount', 'doGetAcquiredCount' );
	}

	protected function doGetDelayedCount() {
		return $this->getCrossPartitionSum( 'delayedcount', 'doGetDelayedCount' );
	}

	protected function doGetAbandonedCount() {
		return $this->getCrossPartitionSum( 'abandonedcount', 'doGetAbandonedCount' );
	}

	/**
	 * Sum a count over all partition queues, caching the total for CACHE_TTL_SHORT seconds.
	 *
	 * Partitions that throw JobQueueError are logged and left out of the sum.
	 *
	 * @param string $type Cache key property name for the count
	 * @param string $method Name of the partition queue method returning the count
	 * @return int
	 * @throws JobQueueError If every partition failed
	 */
	protected function getCrossPartitionSum( $type, $method ) {
		$key = $this->getCacheKey( $type );

		$count = $this->cache->get( $key );
		if ( is_int( $count ) ) {
			return $count;
		}

		$count = 0; // cache miss (or unusable cached value): start the sum from zero
		$failed = 0;
		foreach ( $this->partitionQueues as $queue ) {
			try {
				$count += $queue->$method();
			} catch ( JobQueueError $e ) {
				++$failed;
				MWExceptionHandler::logException( $e );
			}
		}
		$this->throwErrorIfAllPartitionsDown( $failed );

		$this->cache->set( $key, $count, self::CACHE_TTL_SHORT );

		return $count;
	}

	/**
	 * Push jobs to the partitions, retrying failed jobs on other partitions.
	 *
	 * @param Job[] $jobs
	 * @param int $flags Bitfield of JobQueue QOS_* flags
	 * @return bool Always true
	 * @throws JobQueueError If some jobs could not be inserted within maxPartitionsTry rounds
	 */
	protected function doBatchPush( array $jobs, $flags ) {
		// Local ring variable that may be changed to point to a new ring on failure
		$partitionRing = $this->partitionPushRing;
		// Try to insert the jobs and update $partitionRing on any failures.
		// Retry to insert any remaining jobs again, ignoring the bad partitions.
		$jobsLeft = $jobs;
		for ( $i = $this->maxPartitionsTry; $i > 0 && count( $jobsLeft ); --$i ) {
			$jobsLeft = $this->tryJobInsertions( $jobsLeft, $partitionRing, $flags );
		}
		if ( count( $jobsLeft ) ) {
			throw new JobQueueError(
				"Could not insert job(s), {$this->maxPartitionsTry} partitions tried." );
		}

		return true;
	}

	/**
	 * Do one round of job insertions over the partitions in a ring.
	 *
	 * @param Job[] $jobs
	 * @param HashRing $partitionRing Ring of usable partitions; partitions that fail
	 *   are removed from it (the caller's variable is updated by reference)
	 * @param int $flags Bitfield of JobQueue QOS_* flags
	 * @return Job[] Jobs that could not be inserted in this round
	 * @throws JobQueueError If every partition in the ring has failed
	 */
	protected function tryJobInsertions( array $jobs, HashRing &$partitionRing, $flags ) {
		$jobsLeft = array();

		// Because jobs are spread across partitions, per-job de-duplication needs
		// to use a consistent hash to avoid allowing duplicate jobs per partition.
		// When inserting a batch of de-duplicated jobs, QOS_ATOMIC is disregarded.
		$uJobsByPartition = array(); // (partition name => job list)
		/** @var Job $job */
		foreach ( $jobs as $key => $job ) {
			if ( $job->ignoreDuplicates() ) {
				$sha1 = sha1( serialize( $job->getDeduplicationInfo() ) );
				$uJobsByPartition[$partitionRing->getLocation( $sha1 )][] = $job;
				unset( $jobs[$key] );
			}
		}
		// Get the batches of jobs that are not de-duplicated
		if ( !count( $jobs ) ) {
			// Every job was de-duplicated; never push an empty batch, as a
			// "successful" empty push would wrongly mark the queue as non-empty.
			$nuJobBatches = array();
		} elseif ( $flags & self::QOS_ATOMIC ) {
			$nuJobBatches = array( $jobs ); // all or nothing
		} else {
			// Split the jobs into batches and spread them out over servers if there
			// are many jobs. This helps keep the partitions even. Otherwise, send all
			// the jobs to a single partition queue to avoid the extra connections.
			$nuJobBatches = array_chunk( $jobs, 300 );
		}

		// Insert the de-duplicated jobs into the queues...
		foreach ( $uJobsByPartition as $partition => $jobBatch ) {
			if ( !$this->pushBatchToPartition( $partition, $jobBatch, $flags ) ) {
				$partitionRing = $this->getRingWithoutPartition( $partitionRing, $partition );
				$jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted
			}
		}

		// Insert the jobs that are not de-duplicated into the queues...
		foreach ( $nuJobBatches as $jobBatch ) {
			$partition = ArrayUtils::pickRandom( $partitionRing->getLocationWeights() );
			if ( !$this->pushBatchToPartition( $partition, $jobBatch, $flags ) ) {
				$partitionRing = $this->getRingWithoutPartition( $partitionRing, $partition );
				$jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted
			}
		}

		return $jobsLeft;
	}

	/**
	 * Atomically push a batch of jobs into one partition queue.
	 *
	 * On success the cached "empty" flag is set to 'false'. A JobQueueError
	 * from the partition is logged and reported as failure.
	 *
	 * @param string $partition Partition name
	 * @param Job[] $jobBatch
	 * @param int $flags Bitfield of JobQueue QOS_* flags (QOS_ATOMIC is always added)
	 * @return bool Whether the partition accepted the batch
	 */
	private function pushBatchToPartition( $partition, array $jobBatch, $flags ) {
		$queue = $this->partitionQueues[$partition];
		try {
			$ok = $queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC );
		} catch ( JobQueueError $e ) {
			$ok = false;
			MWExceptionHandler::logException( $e );
		}
		if ( $ok ) {
			$this->cache->set( $this->getCacheKey( 'empty' ), 'false', self::CACHE_TTL_LONG );
		}

		return (bool)$ok;
	}

	/**
	 * Blacklist a failed partition by getting a ring without it.
	 *
	 * @param HashRing $ring
	 * @param string $partition Partition name to remove
	 * @return HashRing
	 * @throws JobQueueError If no partitions would be left
	 */
	private function getRingWithoutPartition( HashRing $ring, $partition ) {
		$newRing = $ring->newWithoutLocation( $partition );
		if ( !$newRing ) {
			throw new JobQueueError( "Could not insert job(s), no partitions available." );
		}

		return $newRing;
	}

	/**
	 * Pop a job from a random partition, weighted by partition weight.
	 *
	 * Partitions that return no job (or fail) are skipped for the rest of the
	 * call. The returned job is tagged with metadata 'QueuePartition' so that
	 * doAck() can route the acknowledgement back to the same partition.
	 *
	 * @return Job|bool Job, or false if no job was found
	 * @throws JobQueueError If every partition failed
	 */
	protected function doPop() {
		$key = $this->getCacheKey( 'empty' );

		$isEmpty = $this->cache->get( $key );
		if ( $isEmpty === 'true' ) {
			return false;
		}

		$partitionsTry = $this->partitionMap; // (partition => weight)

		$failed = 0;
		while ( count( $partitionsTry ) ) {
			$partition = ArrayUtils::pickRandom( $partitionsTry );
			if ( $partition === false ) {
				break; // all partitions at 0 weight
			}

			/** @var JobQueue $queue */
			$queue = $this->partitionQueues[$partition];
			try {
				$job = $queue->pop();
			} catch ( JobQueueError $e ) {
				++$failed;
				MWExceptionHandler::logException( $e );
				$job = false;
			}
			if ( $job ) {
				$job->metadata['QueuePartition'] = $partition;

				return $job;
			} else {
				unset( $partitionsTry[$partition] ); // blacklist partition
			}
		}
		$this->throwErrorIfAllPartitionsDown( $failed );

		// Jobs may still sit in a partition that could not be reached, so only
		// remember the queue as empty when every partition actually answered.
		if ( !$failed ) {
			$this->cache->set( $key, 'true', self::CACHE_TTL_LONG );
		}

		return false;
	}

	/**
	 * Acknowledge a job on the partition it was popped from.
	 *
	 * @param Job $job Job previously returned by doPop()
	 * @return bool Result of the partition queue's ack()
	 * @throws MWException If the job has no known partition
	 */
	protected function doAck( Job $job ) {
		if ( !isset( $job->metadata['QueuePartition'] ) ) {
			throw new MWException( "The given job has no defined partition name." );
		}
		$partition = $job->metadata['QueuePartition'];
		if ( !isset( $this->partitionQueues[$partition] ) ) {
			throw new MWException( "The given job has an unknown partition name '$partition'." );
		}

		return $this->partitionQueues[$partition]->ack( $job );
	}

	/**
	 * Check root job de-duplication on the partition owning the root job signature.
	 *
	 * The primary partition is chosen from the push ring by 'rootJobSignature';
	 * if it fails, the next ring location (if any) is used as a fallback.
	 *
	 * @param Job $job
	 * @return bool
	 */
	protected function doIsRootJobOldDuplicate( Job $job ) {
		$params = $job->getRootJobParams();
		$partitions = $this->partitionPushRing->getLocations( $params['rootJobSignature'], 2 );
		try {
			return $this->partitionQueues[$partitions[0]]->doIsRootJobOldDuplicate( $job );
		} catch ( JobQueueError $e ) {
			MWExceptionHandler::logException( $e );
			if ( isset( $partitions[1] ) ) { // check fallback partition
				return $this->partitionQueues[$partitions[1]]->doIsRootJobOldDuplicate( $job );
			}
		}

		return false;
	}

	/**
	 * Register root job de-duplication on the partition owning the root job signature.
	 *
	 * Uses the same primary/fallback partition choice as doIsRootJobOldDuplicate().
	 *
	 * @param Job $job
	 * @return bool
	 */
	protected function doDeduplicateRootJob( Job $job ) {
		$params = $job->getRootJobParams();
		$partitions = $this->partitionPushRing->getLocations( $params['rootJobSignature'], 2 );
		try {
			return $this->partitionQueues[$partitions[0]]->doDeduplicateRootJob( $job );
		} catch ( JobQueueError $e ) {
			MWExceptionHandler::logException( $e );
			if ( isset( $partitions[1] ) ) { // check fallback partition
				return $this->partitionQueues[$partitions[1]]->doDeduplicateRootJob( $job );
			}
		}

		return false;
	}

	/**
	 * Delete all jobs in every partition queue.
	 *
	 * @return bool Always true
	 * @throws JobQueueError If every partition failed
	 */
	protected function doDelete() {
		$failed = 0;
		/** @var JobQueue $queue */
		foreach ( $this->partitionQueues as $queue ) {
			try {
				$queue->doDelete();
			} catch ( JobQueueError $e ) {
				++$failed;
				MWExceptionHandler::logException( $e );
			}
		}
		$this->throwErrorIfAllPartitionsDown( $failed );

		return true;
	}

	/**
	 * Wait for backups on every partition queue.
	 *
	 * @throws JobQueueError If every partition failed
	 */
	protected function doWaitForBackups() {
		$failed = 0;
		/** @var JobQueue $queue */
		foreach ( $this->partitionQueues as $queue ) {
			try {
				$queue->waitForBackups();
			} catch ( JobQueueError $e ) {
				++$failed;
				MWExceptionHandler::logException( $e );
			}
		}
		$this->throwErrorIfAllPartitionsDown( $failed );
	}

	/**
	 * Collect the periodic tasks of every partition queue.
	 *
	 * @return array Map of "<partition>:<task>" => task definition
	 */
	protected function doGetPeriodicTasks() {
		$tasks = array();
		/** @var JobQueue $queue */
		foreach ( $this->partitionQueues as $partition => $queue ) {
			foreach ( $queue->getPeriodicTasks() as $task => $def ) {
				$tasks["{$partition}:{$task}"] = $def;
			}
		}

		return $tasks;
	}

	/**
	 * Clear the aggregate cache entries of this queue and flush every partition's caches.
	 */
	protected function doFlushCaches() {
		static $types = array(
			'empty',
			'size',
			'acquiredcount',
			'delayedcount',
			'abandonedcount'
		);

		foreach ( $types as $type ) {
			$this->cache->delete( $this->getCacheKey( $type ) );
		}

		/** @var JobQueue $queue */
		foreach ( $this->partitionQueues as $queue ) {
			$queue->doFlushCaches();
		}
	}

	/**
	 * @return AppendIterator Iterator over the queued jobs of all partitions, in partition order
	 */
	public function getAllQueuedJobs() {
		$iterator = new AppendIterator();

		/** @var JobQueue $queue */
		foreach ( $this->partitionQueues as $queue ) {
			$iterator->append( $queue->getAllQueuedJobs() );
		}

		return $iterator;
	}

	/**
	 * @return AppendIterator Iterator over the delayed jobs of all partitions, in partition order
	 */
	public function getAllDelayedJobs() {
		$iterator = new AppendIterator();

		/** @var JobQueue $queue */
		foreach ( $this->partitionQueues as $queue ) {
			$iterator->append( $queue->getAllDelayedJobs() );
		}

		return $iterator;
	}

	/**
	 * @return string Identifier built from the wiki ID and a hash of the partition names
	 */
	public function getCoalesceLocationInternal() {
		return "JobQueueFederated:wiki:{$this->wiki}" .
			sha1( serialize( array_keys( $this->partitionMap ) ) );
	}

	/**
	 * Find which of the given sibling queue types have jobs in any partition.
	 *
	 * @param array $types List of queue types
	 * @return array|null List of types with jobs, or null if some partition
	 *   does not support the check
	 * @throws JobQueueError If every partition failed
	 */
	protected function doGetSiblingQueuesWithJobs( array $types ) {
		$result = array();

		$failed = 0;
		/** @var JobQueue $queue */
		foreach ( $this->partitionQueues as $queue ) {
			try {
				$nonEmpty = $queue->doGetSiblingQueuesWithJobs( $types );
				if ( is_array( $nonEmpty ) ) {
					$result = array_unique( array_merge( $result, $nonEmpty ) );
				} else {
					return null; // not supported on all partitions; bail
				}
				if ( count( $result ) == count( $types ) ) {
					break; // short-circuit
				}
			} catch ( JobQueueError $e ) {
				++$failed;
				MWExceptionHandler::logException( $e );
			}
		}
		$this->throwErrorIfAllPartitionsDown( $failed );

		return array_values( $result );
	}

	/**
	 * Sum the sizes of the given sibling queue types over all partitions.
	 *
	 * @param array $types List of queue types
	 * @return array|null Map of type => size, or null if some partition
	 *   does not support the check
	 * @throws JobQueueError If every partition failed
	 */
	protected function doGetSiblingQueueSizes( array $types ) {
		$result = array();
		$failed = 0;
		/** @var JobQueue $queue */
		foreach ( $this->partitionQueues as $queue ) {
			try {
				$sizes = $queue->doGetSiblingQueueSizes( $types );
				if ( is_array( $sizes ) ) {
					foreach ( $sizes as $type => $size ) {
						$result[$type] = isset( $result[$type] ) ? $result[$type] + $size : $size;
					}
				} else {
					return null; // not supported on all partitions; bail
				}
			} catch ( JobQueueError $e ) {
				++$failed;
				MWExceptionHandler::logException( $e );
			}
		}
		$this->throwErrorIfAllPartitionsDown( $failed );

		return $result;
	}

	/**
	 * Throw an error if no partitions are available.
	 *
	 * @param int $down Number of partitions that failed
	 * @throws JobQueueError If $down is at least the number of partitions
	 */
	protected function throwErrorIfAllPartitionsDown( $down ) {
		if ( $down >= count( $this->partitionQueues ) ) {
			throw new JobQueueError( 'No queue partitions available.' );
		}
	}

	/**
	 * Pass a testing key prefix on to every partition queue.
	 *
	 * @param string $key
	 */
	public function setTestingPrefix( $key ) {
		/** @var JobQueue $queue */
		foreach ( $this->partitionQueues as $queue ) {
			$queue->setTestingPrefix( $key );
		}
	}

	/**
	 * Build the cache key for an aggregate value of this queue.
	 *
	 * @param string $property
	 * @return string
	 */
	private function getCacheKey( $property ) {
		list( $db, $prefix ) = wfSplitWikiID( $this->wiki );

		return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $property );
	}
}