MediaWiki
REL1_24
|
00001 <?php 00030 class JobQueueAggregatorMemc extends JobQueueAggregator { 00032 protected $cache; 00033 00034 protected $cacheTTL; // integer; seconds 00035 00042 protected function __construct( array $params ) { 00043 parent::__construct( $params ); 00044 $this->cache = isset( $params['objectCache'] ) 00045 ? wfGetCache( $params['objectCache'] ) 00046 : wfGetMainCache(); 00047 $this->cacheTTL = isset( $params['cacheTTL'] ) ? $params['cacheTTL'] : 180; // 3 min 00048 } 00049 00053 protected function doNotifyQueueEmpty( $wiki, $type ) { 00054 $key = $this->getReadyQueueCacheKey(); 00055 // Delist the queue from the "ready queue" list 00056 if ( $this->cache->add( "$key:lock", 1, 60 ) ) { // lock 00057 $curInfo = $this->cache->get( $key ); 00058 if ( is_array( $curInfo ) && isset( $curInfo['pendingDBs'][$type] ) ) { 00059 if ( in_array( $wiki, $curInfo['pendingDBs'][$type] ) ) { 00060 $curInfo['pendingDBs'][$type] = array_diff( 00061 $curInfo['pendingDBs'][$type], array( $wiki ) ); 00062 $this->cache->set( $key, $curInfo ); 00063 } 00064 } 00065 $this->cache->delete( "$key:lock" ); // unlock 00066 } 00067 00068 return true; 00069 } 00070 00074 protected function doNotifyQueueNonEmpty( $wiki, $type ) { 00075 return true; // updated periodically 00076 } 00077 00081 protected function doGetAllReadyWikiQueues() { 00082 $key = $this->getReadyQueueCacheKey(); 00083 // If the cache entry wasn't present, is stale, or in .1% of cases otherwise, 00084 // regenerate the cache. Use any available stale cache if another process is 00085 // currently regenerating the pending DB information. 00086 $pendingDbInfo = $this->cache->get( $key ); 00087 if ( !is_array( $pendingDbInfo ) 00088 || ( time() - $pendingDbInfo['timestamp'] ) > $this->cacheTTL 00089 || mt_rand( 0, 999 ) == 0 00090 ) { 00091 if ( $this->cache->add( "$key:rebuild", 1, 1800 ) ) { // lock 00092 $pendingDbInfo = array( 00093 'pendingDBs' => $this->findPendingWikiQueues(), 00094 'timestamp' => time() 00095 ); 00096 for ( $attempts = 1; $attempts <= 25; ++$attempts ) { 00097 if ( $this->cache->add( "$key:lock", 1, 60 ) ) { // lock 00098 $this->cache->set( $key, $pendingDbInfo ); 00099 $this->cache->delete( "$key:lock" ); // unlock 00100 break; 00101 } 00102 } 00103 $this->cache->delete( "$key:rebuild" ); // unlock 00104 } 00105 } 00106 00107 return is_array( $pendingDbInfo ) 00108 ? $pendingDbInfo['pendingDBs'] 00109 : array(); // cache is both empty and locked 00110 } 00111 00115 protected function doPurge() { 00116 return $this->cache->delete( $this->getReadyQueueCacheKey() ); 00117 } 00118 00122 private function getReadyQueueCacheKey() { 00123 return "jobqueue:aggregator:ready-queues:v1"; // global 00124 } 00125 }