MediaWiki
REL1_20
|
<?php

/**
 * Lock manager that keeps its lock records in memcached servers.
 *
 * For every locked path a "lock record" is stored under a memcached key
 * derived from the wiki ID and a hash of the path. A record is an array of
 * the form:
 *   array( self::LOCK_SH => array( session => expiry timestamp ),
 *          self::LOCK_EX => array( session => expiry timestamp ) )
 *
 * Reads/modifications of a record are serialized with a short-lived
 * "<record key>:mutex" key that is created with the atomic memcached add().
 *
 * Distribution of paths over servers/buckets is handled by the parent
 * class (not visible in this file).
 */
class MemcLockManager extends QuorumLockManager {
	/** @var Array Mapping of requested lock types to the lock type actually stored */
	protected $lockTypeMap = array(
		self::LOCK_SH => self::LOCK_SH,
		self::LOCK_UW => self::LOCK_SH,
		self::LOCK_EX => self::LOCK_EX
	);

	/** @var Array Map of (server name => MemcachedBagOStuff) */
	protected $bagOStuffs = array();
	/** @var Array Cached ping results; map of (server name => bool) */
	protected $serversUp = array();

	/** @var integer Number of seconds after which a registered lock is considered stale */
	protected $lockExpiry;
	/** @var string Random per-instance string identifying the owner of locks */
	protected $session = '';
	/** @var string Wiki ID used to namespace the lock record keys */
	protected $wikiId = '';

	/**
	 * Construct a new instance from configuration.
	 *
	 * Keys of $config read by this constructor:
	 *   - srvsByBucket : Array of arrays; non-array entries are discarded and
	 *                    the remaining buckets are re-indexed consecutively.
	 *   - lockServers  : Map of (server name => server address). One cache
	 *                    object is created per entry.
	 *   - memcConfig   : (optional) Parameters passed to ObjectCache::newFromParams()
	 *                    in addition to 'servers'. Defaults to
	 *                    array( 'class' => 'MemcachedPhpBagOStuff' ).
	 *   - wikiId       : (optional) Wiki ID for key namespacing; defaults to wfWikiID().
	 *
	 * @param Array $config
	 * @throws MWException If a configured cache is not a MemcachedBagOStuff
	 */
	public function __construct( array $config ) {
		parent::__construct( $config );

		// Sanitize srvsByBucket config to prevent PHP errors
		$this->srvsByBucket = array_filter( $config['srvsByBucket'], 'is_array' );
		$this->srvsByBucket = array_values( $this->srvsByBucket ); // consecutive

		$memcConfig = isset( $config['memcConfig'] )
			? $config['memcConfig']
			: array( 'class' => 'MemcachedPhpBagOStuff' );

		foreach ( $config['lockServers'] as $name => $address ) {
			// The explicit 'servers' entry wins over any 'servers' key in $memcConfig
			$params = array( 'servers' => array( $address ) ) + $memcConfig;
			$cache = ObjectCache::newFromParams( $params );
			if ( $cache instanceof MemcachedBagOStuff ) {
				$this->bagOStuffs[$name] = $cache;
			} else {
				throw new MWException(
					'Only MemcachedBagOStuff classes are supported by MemcLockManager.' );
			}
		}

		$this->wikiId = isset( $config['wikiId'] ) ? $config['wikiId'] : wfWikiID();

		$met = ini_get( 'max_execution_time' ); // this is 0 in CLI mode
		// Locks expire after twice the execution time limit, or 2 hours if unlimited
		$this->lockExpiry = $met ? 2*(int)$met : 2*3600;

		$this->session = wfRandomString( 32 );
	}

	/**
	 * Try to register this session as a holder of $type locks on all of the
	 * given paths on one lock server. Either all records are updated or none.
	 *
	 * @param $lockSrv string Name of the lock server
	 * @param $paths Array List of paths to lock
	 * @param $type integer self::LOCK_SH or self::LOCK_EX
	 * @return Status Fatal (one error per affected path) on conflict or
	 *   if the record mutexes could not be acquired
	 */
	protected function getLocksOnServer( $lockSrv, array $paths, $type ) {
		$status = Status::newGood();

		$memc = $this->getCache( $lockSrv );
		$keys = array_map( array( $this, 'recordKeyForPath' ), $paths ); // lock records

		// Lock all of the active lock record keys...
		if ( !$this->acquireMutexes( $memc, $keys ) ) {
			foreach ( $paths as $path ) {
				$status->fatal( 'lockmanager-fail-acquirelock', $path );
			}
			// Report the failure; a bare "return" here would hand null to the caller
			return $status;
		}

		// Fetch all the existing lock records...
		$lockRecords = $memc->getMulti( $keys );

		$now = time();
		// Check if the requested locks conflict with existing ones...
		foreach ( $paths as $path ) {
			$locksKey = $this->recordKeyForPath( $path );
			$locksHeld = isset( $lockRecords[$locksKey] )
				? $lockRecords[$locksKey]
				: array( self::LOCK_SH => array(), self::LOCK_EX => array() ); // init
			// An exclusive lock held by another session conflicts with any request
			foreach ( $locksHeld[self::LOCK_EX] as $session => $expiry ) {
				if ( $expiry < $now ) { // stale?
					unset( $locksHeld[self::LOCK_EX][$session] );
				} elseif ( $session !== $this->session ) {
					$status->fatal( 'lockmanager-fail-acquirelock', $path );
				}
			}
			// A shared lock held by another session conflicts only with an exclusive request
			if ( $type === self::LOCK_EX ) {
				foreach ( $locksHeld[self::LOCK_SH] as $session => $expiry ) {
					if ( $expiry < $now ) { // stale?
						unset( $locksHeld[self::LOCK_SH][$session] );
					} elseif ( $session !== $this->session ) {
						$status->fatal( 'lockmanager-fail-acquirelock', $path );
					}
				}
			}
			if ( $status->isOK() ) {
				// Register the session in the lock record array
				$locksHeld[$type][$this->session] = $now + $this->lockExpiry;
				// We will update this record if none of the other locks conflict
				$lockRecords[$locksKey] = $locksHeld;
			}
		}

		// If there were no lock conflicts, update all the lock records...
		if ( $status->isOK() ) {
			foreach ( $lockRecords as $locksKey => $locksHeld ) {
				$memc->set( $locksKey, $locksHeld );
				wfDebug( __METHOD__ . ": acquired lock on key $locksKey.\n" );
			}
		}

		// Unlock all of the active lock record keys...
		$this->releaseMutexes( $memc, $keys );

		return $status;
	}

	/**
	 * Remove this session from the $type holders of the lock records of all
	 * of the given paths on one lock server.
	 *
	 * @param $lockSrv string Name of the lock server
	 * @param $paths Array List of paths to unlock
	 * @param $type integer self::LOCK_SH or self::LOCK_EX
	 * @return Status Fatal (one error per affected path) if a record could
	 *   not be written back or the record mutexes could not be acquired
	 */
	protected function freeLocksOnServer( $lockSrv, array $paths, $type ) {
		$status = Status::newGood();

		$memc = $this->getCache( $lockSrv );
		$keys = array_map( array( $this, 'recordKeyForPath' ), $paths ); // lock records

		// Lock all of the active lock record keys...
		if ( !$this->acquireMutexes( $memc, $keys ) ) {
			foreach ( $paths as $path ) {
				$status->fatal( 'lockmanager-fail-releaselock', $path );
			}
			// Report the failure; a bare "return" here would hand null to the caller
			return $status;
		}

		// Fetch all the existing lock records...
		$lockRecords = $memc->getMulti( $keys );

		// Remove the requested locks from all records...
		foreach ( $paths as $path ) {
			$locksKey = $this->recordKeyForPath( $path ); // lock record
			if ( !isset( $lockRecords[$locksKey] ) ) {
				continue; // nothing to do
			}
			$locksHeld = $lockRecords[$locksKey];
			if ( is_array( $locksHeld ) && isset( $locksHeld[$type] ) ) {
				unset( $locksHeld[$type][$this->session] );
				$ok = $memc->set( $locksKey, $locksHeld );
			} else {
				$ok = true; // malformed or typeless record; nothing of ours to remove
			}
			if ( !$ok ) {
				$status->fatal( 'lockmanager-fail-releaselock', $path );
			}
			wfDebug( __METHOD__ . ": released lock on key $locksKey.\n" );
		}

		// Unlock all of the active lock record keys...
		$this->releaseMutexes( $memc, $keys );

		return $status;
	}

	/**
	 * No-op for this backend; always reports success.
	 *
	 * @return Status
	 */
	protected function releaseAllLocks() {
		return Status::newGood(); // not supported
	}

	/**
	 * Check whether a usable cache object exists for the given lock server.
	 *
	 * @param $lockSrv string Name of the lock server
	 * @return bool
	 */
	protected function isServerUp( $lockSrv ) {
		return (bool)$this->getCache( $lockSrv );
	}

	/**
	 * Get the cache object for a lock server, if it is configured and reachable.
	 *
	 * The first call for a server writes a one second "ping" key to it; the
	 * result is remembered in $this->serversUp for the lifetime of this object.
	 * A PHP warning is raised when the ping fails.
	 *
	 * @param $lockSrv string Name of the lock server
	 * @return MemcachedBagOStuff|null Null if the server is unknown or appears down
	 */
	protected function getCache( $lockSrv ) {
		$memc = null;
		if ( isset( $this->bagOStuffs[$lockSrv] ) ) {
			$memc = $this->bagOStuffs[$lockSrv];
			if ( !isset( $this->serversUp[$lockSrv] ) ) {
				$this->serversUp[$lockSrv] = $memc->set( 'MemcLockManager:ping', 1, 1 );
				if ( !$this->serversUp[$lockSrv] ) {
					trigger_error( __METHOD__ . ": Could not contact $lockSrv.", E_USER_WARNING );
				}
			}
			if ( !$this->serversUp[$lockSrv] ) {
				return null; // server appears to be down
			}
		}
		return $memc;
	}

	/**
	 * Get the memcached key of the lock record for a path.
	 *
	 * @param $path string
	 * @return string Key built from the wiki ID, this class name and a hash of $path
	 */
	protected function recordKeyForPath( $path ) {
		$hash = LockManager::sha1Base36( $path );
		list( $db, $prefix ) = wfSplitWikiID( $this->wikiId );
		return wfForeignMemcKey( $db, $prefix, __CLASS__, 'locks', $hash );
	}

	/**
	 * Acquire the "<key>:mutex" key of every given lock record key.
	 *
	 * Gives up after about 6 seconds; in that case every mutex that was
	 * acquired so far is released again.
	 *
	 * @param $memc MemcachedBagOStuff
	 * @param $keys Array List of lock record keys
	 * @return bool True if all mutexes were acquired
	 */
	protected function acquireMutexes( MemcachedBagOStuff $memc, array $keys ) {
		$lockedKeys = array();

		// Acquire the keys in lexicographical order, to avoid deadlock problems.
		// If P1 is waiting to acquire a key P2 has, P2 can't also be waiting for a key P1 has.
		sort( $keys );

		// Try to quickly loop to acquire the keys, but back off after a few rounds.
		// This reduces memcached spam, especially in the rare case where a server acquires
		// some lock keys and dies without releasing them. Lock keys expire after a few minutes.
		$rounds = 0;
		$start = microtime( true );
		do {
			if ( ( ++$rounds % 4 ) == 0 ) {
				usleep( 1000*50 ); // 50 ms
			}
			foreach ( array_diff( $keys, $lockedKeys ) as $key ) {
				if ( $memc->add( "$key:mutex", 1, 180 ) ) { // lock record
					$lockedKeys[] = $key;
				} else {
					// Acquire in order: do not try later keys while an earlier one is
					// still held by someone else, or the ordering guarantee is lost.
					break;
				}
			}
		} while ( count( $lockedKeys ) < count( $keys ) && ( microtime( true ) - $start ) <= 6 );

		if ( count( $lockedKeys ) != count( $keys ) ) {
			$this->releaseMutexes( $memc, $lockedKeys ); // failed; release what was locked
			return false;
		}

		return true;
	}

	/**
	 * Delete the "<key>:mutex" key of every given lock record key.
	 *
	 * @param $memc MemcachedBagOStuff
	 * @param $keys Array List of lock record keys
	 * @return void
	 */
	protected function releaseMutexes( MemcachedBagOStuff $memc, array $keys ) {
		foreach ( $keys as $key ) {
			$memc->delete( "$key:mutex" );
		}
	}

	/**
	 * Release every lock still registered in $this->locksHeld.
	 *
	 * NOTE(review): $this->locksHeld and doUnlock() are defined outside this
	 * file; this loop only terminates if doUnlock() eventually empties
	 * $this->locksHeld - confirm against the parent class.
	 */
	public function __destruct() {
		while ( count( $this->locksHeld ) ) {
			foreach ( $this->locksHeld as $path => $locks ) {
				$this->doUnlock( array( $path ), self::LOCK_EX );
				$this->doUnlock( array( $path ), self::LOCK_SH );
			}
		}
	}
}