MediaWiki
REL1_22
|
<?php

/**
 * Job queue aggregator that keeps the list of wiki queues with pending jobs
 * in a single object-cache entry.
 *
 * The entry is an array of the form:
 *   array( 'pendingDBs' => <result of findPendingWikiQueues()>, 'timestamp' => <UNIX time> )
 * It is read as a map of job type to a list of wiki IDs (see doNotifyQueueEmpty()).
 *
 * Two auxiliary cache keys are used as advisory locks. Both rely on
 * $this->cache->add() failing when the key already exists:
 *   - "<key>:lock"    guards read-modify-write cycles on the entry (60s expiry)
 *   - "<key>:rebuild" ensures only one process regenerates the entry (1800s expiry)
 */
class JobQueueAggregatorMemc extends JobQueueAggregator {
	/** @var object Cache handle obtained from wfGetCache() or wfGetMainCache() */
	protected $cache;

	/** @var integer Age in seconds after which the cached list is regenerated */
	protected $cacheTTL;

	/**
	 * @param array $params Recognised keys:
	 *   - objectCache : cache name handed to wfGetCache(); when absent,
	 *                   wfGetMainCache() is used instead.
	 *   - cacheTTL    : seconds before the cached list counts as stale
	 *                   (defaults to 180, i.e. 3 minutes).
	 */
	protected function __construct( array $params ) {
		parent::__construct( $params );
		$this->cache = isset( $params['objectCache'] )
			? wfGetCache( $params['objectCache'] )
			: wfGetMainCache();
		$this->cacheTTL = isset( $params['cacheTTL'] ) ? $params['cacheTTL'] : 180; // 3 min
	}

	/**
	 * Remove a wiki from the cached "ready queue" list of a job type.
	 *
	 * This is best-effort: when the entry lock cannot be taken, nothing is
	 * changed and true is still returned.
	 *
	 * @param string $wiki Wiki ID to delist
	 * @param string $type Job type whose list is updated
	 * @return bool Always true
	 */
	protected function doNotifyQueueEmpty( $wiki, $type ) {
		$key = $this->getReadyQueueCacheKey();
		// Delist the queue from the "ready queue" list
		if ( $this->cache->add( "$key:lock", 1, 60 ) ) { // lock
			$curInfo = $this->cache->get( $key );
			if ( is_array( $curInfo ) && isset( $curInfo['pendingDBs'][$type] ) ) {
				if ( in_array( $wiki, $curInfo['pendingDBs'][$type] ) ) {
					$curInfo['pendingDBs'][$type] = array_diff(
						$curInfo['pendingDBs'][$type], array( $wiki ) );
					$this->cache->set( $key, $curInfo );
				}
			}
			$this->cache->delete( "$key:lock" ); // unlock
		}
		return true;
	}

	/**
	 * No-op: the list is rebuilt periodically by doGetAllReadyWikiQueues()
	 * rather than updated on each notification.
	 *
	 * @param string $wiki Unused
	 * @param string $type Unused
	 * @return bool Always true
	 */
	protected function doNotifyQueueNonEmpty( $wiki, $type ) {
		return true; // updated periodically
	}

	/**
	 * Get the cached list of ready queues, regenerating it when needed.
	 *
	 * The list is regenerated if the cache entry is missing or malformed, is
	 * older than $this->cacheTTL seconds, or otherwise in 0.1% of calls. If
	 * another process already holds the rebuild lock, whatever (possibly
	 * stale) entry was read is used instead.
	 *
	 * @return array The 'pendingDBs' value of the entry; an empty array when
	 *   no usable entry exists and the rebuild lock could not be taken
	 * @throws Exception Anything thrown while regenerating the list is
	 *   rethrown after the rebuild lock has been released
	 */
	protected function doGetAllReadyWikiQueues() {
		$key = $this->getReadyQueueCacheKey();

		$pendingDbInfo = $this->cache->get( $key );
		if ( !$this->isValidReadyQueueInfo( $pendingDbInfo ) ) {
			// Treat a malformed entry like a cache miss instead of reading
			// undefined indexes from it below.
			$pendingDbInfo = false;
		}

		if ( $pendingDbInfo === false
			|| ( time() - $pendingDbInfo['timestamp'] ) > $this->cacheTTL
			|| mt_rand( 0, 999 ) === 0
		) {
			if ( $this->cache->add( "$key:rebuild", 1, 1800 ) ) { // lock
				try {
					$pendingDbInfo = array(
						'pendingDBs' => $this->findPendingWikiQueues(),
						'timestamp' => time()
					);
					// Retry a bounded number of times; if the entry lock is never
					// obtained the fresh list is still returned, just not cached.
					for ( $attempts = 1; $attempts <= 25; ++$attempts ) {
						if ( $this->cache->add( "$key:lock", 1, 60 ) ) { // lock
							$this->cache->set( $key, $pendingDbInfo );
							$this->cache->delete( "$key:lock" ); // unlock
							break;
						}
					}
				} catch ( Exception $e ) {
					// Without this the rebuild lock would linger until its 1800s
					// expiry and block every other process from regenerating.
					$this->cache->delete( "$key:rebuild" ); // unlock
					throw $e;
				}
				$this->cache->delete( "$key:rebuild" ); // unlock
			}
		}

		return is_array( $pendingDbInfo )
			? $pendingDbInfo['pendingDBs']
			: array(); // cache is both empty and locked
	}

	/**
	 * Delete the cached ready-queue list.
	 *
	 * @return mixed Whatever the cache's delete() call returns
	 */
	protected function doPurge() {
		return $this->cache->delete( $this->getReadyQueueCacheKey() );
	}

	/**
	 * Check that a value read from the cache has the shape this class writes.
	 *
	 * @param mixed $info Value returned by $this->cache->get()
	 * @return bool True if $info is an array with 'timestamp' and 'pendingDBs' set
	 */
	private function isValidReadyQueueInfo( $info ) {
		return is_array( $info )
			&& isset( $info['timestamp'] )
			&& isset( $info['pendingDBs'] );
	}

	/**
	 * @return string Cache key of the ready-queue list; a fixed string with
	 *   no per-wiki component
	 */
	private function getReadyQueueCacheKey() {
		return "jobqueue:aggregator:ready-queues:v1"; // global
	}
}