MediaWiki
REL1_23
|
<?php
/**
 * Job queue aggregator that keeps the set of "ready" queues in a shared
 * object cache entry.
 *
 * The entry stored under getReadyQueueCacheKey() is an array with two keys:
 *   - pendingDBs : map of (job type => list of wiki IDs), as returned by
 *                  findPendingWikiQueues()
 *   - timestamp  : UNIX time at which that map was built
 *
 * Writes to the entry are serialized through a short-lived "<key>:lock"
 * entry; full rebuilds are additionally guarded by "<key>:rebuild" so that
 * only one process regenerates the map at a time.
 */
class JobQueueAggregatorMemc extends JobQueueAggregator {
	/** @var object Cache handle returned by wfGetCache() or wfGetMainCache() */
	protected $cache;

	/** @var int Maximum age of the cached map before it is rebuilt; seconds */
	protected $cacheTTL;

	/**
	 * @param array $params Possible keys:
	 *   - objectCache : name passed to wfGetCache() to pick the cache to use;
	 *                   wfGetMainCache() is used when this is not set
	 *   - cacheTTL    : seconds before the cached map is considered stale
	 *                   (defaults to 180)
	 */
	protected function __construct( array $params ) {
		parent::__construct( $params );
		$this->cache = isset( $params['objectCache'] )
			? wfGetCache( $params['objectCache'] )
			: wfGetMainCache();
		$this->cacheTTL = isset( $params['cacheTTL'] ) ? $params['cacheTTL'] : 180; // 3 min
	}

	/**
	 * Remove a wiki from the cached list of ready queues for a job type.
	 *
	 * This is best-effort: if the write lock is held by another process the
	 * cached map is left alone and the method still reports success.
	 *
	 * @param string $wiki Wiki ID to delist
	 * @param string $type Job type whose list should be updated
	 * @return bool Always true
	 */
	protected function doNotifyQueueEmpty( $wiki, $type ) {
		$key = $this->getReadyQueueCacheKey();
		if ( !$this->cache->add( "$key:lock", 1, 60 ) ) { // lock
			return true; // someone else is writing; skip the update
		}

		// Delist the queue from the "ready queue" list
		$curInfo = $this->cache->get( $key );
		if ( is_array( $curInfo )
			&& isset( $curInfo['pendingDBs'][$type] )
			&& is_array( $curInfo['pendingDBs'][$type] )
		) {
			$before = $curInfo['pendingDBs'][$type];
			// array_diff() preserves keys, so re-index to keep the stored value
			// a plain 0..n-1 list rather than a sparse array.
			$after = array_values( array_diff( $before, array( $wiki ) ) );
			// Only write back when an entry was actually removed
			if ( count( $after ) !== count( $before ) ) {
				$curInfo['pendingDBs'][$type] = $after;
				$this->cache->set( $key, $curInfo );
			}
		}
		$this->cache->delete( "$key:lock" ); // unlock

		return true;
	}

	/**
	 * No-op: the cached map is refreshed periodically by
	 * doGetAllReadyWikiQueues() rather than on each notification.
	 *
	 * @param string $wiki
	 * @param string $type
	 * @return bool Always true
	 */
	protected function doNotifyQueueNonEmpty( $wiki, $type ) {
		return true; // updated periodically
	}

	/**
	 * Get the map of ready queues, rebuilding the cached copy when needed.
	 *
	 * The cache is regenerated if the entry is missing or malformed, is older
	 * than $cacheTTL seconds, or otherwise in 0.1% of calls. If another process
	 * already holds the rebuild lock, whatever (possibly stale) value was read
	 * from the cache is used instead.
	 *
	 * @return array Map of (job type => list of wiki IDs); empty if the cache
	 *   has no usable entry and a rebuild is already in progress elsewhere
	 * @throws Exception Anything thrown while rebuilding is rethrown after the
	 *   rebuild lock has been released
	 */
	protected function doGetAllReadyWikiQueues() {
		$key = $this->getReadyQueueCacheKey();
		$pendingDbInfo = $this->cache->get( $key );

		if ( !$this->isUsableCacheInfo( $pendingDbInfo )
			|| ( time() - $pendingDbInfo['timestamp'] ) > $this->cacheTTL
			|| mt_rand( 0, 999 ) === 0
		) {
			if ( $this->cache->add( "$key:rebuild", 1, 1800 ) ) { // lock
				// try/finally would be tidier but needs PHP 5.5+; catch-and-rethrow
				// keeps the rebuild lock from lingering for its full 30 minute TTL
				// when the rebuild fails.
				try {
					$pendingDbInfo = array(
						'pendingDBs' => $this->findPendingWikiQueues(),
						'timestamp' => time()
					);
					// Retry a bounded number of times to get the write lock; if it
					// is never obtained, the fresh map is still returned to the
					// caller but not stored.
					for ( $attempts = 1; $attempts <= 25; ++$attempts ) {
						if ( $this->cache->add( "$key:lock", 1, 60 ) ) { // lock
							$this->cache->set( $key, $pendingDbInfo );
							$this->cache->delete( "$key:lock" ); // unlock
							break;
						}
					}
				} catch ( Exception $e ) {
					$this->cache->delete( "$key:rebuild" ); // unlock
					throw $e;
				}
				$this->cache->delete( "$key:rebuild" ); // unlock
			}
		}

		return $this->isUsableCacheInfo( $pendingDbInfo )
			? $pendingDbInfo['pendingDBs']
			: array(); // cache is both empty and locked
	}

	/**
	 * Delete the cached map of ready queues.
	 *
	 * @return mixed Result of the cache delete() call
	 */
	protected function doPurge() {
		return $this->cache->delete( $this->getReadyQueueCacheKey() );
	}

	/**
	 * Check that a value read from the cache has the expected structure.
	 *
	 * @param mixed $info Value returned by the cache for the ready-queue key
	 * @return bool True if $info is an array with a 'timestamp' and an array
	 *   under 'pendingDBs'
	 */
	private function isUsableCacheInfo( $info ) {
		return is_array( $info )
			&& isset( $info['timestamp'] )
			&& isset( $info['pendingDBs'] )
			&& is_array( $info['pendingDBs'] );
	}

	/**
	 * @return string Cache key of the ready-queue map (not wiki-specific)
	 */
	private function getReadyQueueCacheKey() {
		return "jobqueue:aggregator:ready-queues:v1"; // global
	}
}