MediaWiki
REL1_21
|
<?php

/**
 * Lock manager that keeps its lock records in memcached.
 *
 * Each locked path maps to one "lock record" key (see recordKeyForPath()).
 * A lock record is an array of the form:
 *   array(
 *     self::LOCK_SH => array( <session> => <expiry UNIX timestamp>, ... ),
 *     self::LOCK_EX => array( <session> => <expiry UNIX timestamp>, ... )
 *   )
 * Reads and writes of a lock record are serialized by first adding a
 * short-lived "<record key>:mutex" key with the atomic add() operation.
 */
class MemcLockManager extends QuorumLockManager {
	/** @var Array Mapping of requested lock types to the lock type actually used */
	protected $lockTypeMap = array(
		self::LOCK_SH => self::LOCK_SH,
		self::LOCK_UW => self::LOCK_SH,
		self::LOCK_EX => self::LOCK_EX
	);

	/** @var Array Map of (server name => MemcachedBagOStuff) */
	protected $bagOStuffs = array();
	/** @var Array Cached result of the reachability probe done in getCache() */
	protected $serversUp = array(); // (server name => bool)

	/** @var string Identifier of this manager instance inside the lock records */
	protected $session = ''; // string; random string from wfRandomString( 32 )

	/**
	 * Construct a new instance from configuration.
	 *
	 * Keys of $config read by this constructor (the parent constructor may
	 * read more):
	 *   - lockServers  : Map of (server name => server address).
	 *   - srvsByBucket : Array of arrays; entries that are not arrays are
	 *                    dropped and the remainder is re-indexed from 0.
	 *   - memcConfig   : Optional parameters passed to ObjectCache::newFromParams().
	 *                    Defaults to array( 'class' => 'MemcachedPhpBagOStuff' ).
	 *
	 * @param Array $config
	 * @throws MWException If a configured cache is not a MemcachedBagOStuff
	 */
	public function __construct( array $config ) {
		parent::__construct( $config );

		// Sanitize srvsByBucket config to prevent PHP errors
		$this->srvsByBucket = array_filter( $config['srvsByBucket'], 'is_array' );
		$this->srvsByBucket = array_values( $this->srvsByBucket ); // consecutive

		$memcConfig = isset( $config['memcConfig'] )
			? $config['memcConfig']
			: array( 'class' => 'MemcachedPhpBagOStuff' );

		foreach ( $config['lockServers'] as $name => $address ) {
			// With array union the left operand wins, so 'servers' always holds
			// just this one address even if memcConfig also defines 'servers'.
			$params = array( 'servers' => array( $address ) ) + $memcConfig;
			$cache = ObjectCache::newFromParams( $params );
			if ( $cache instanceof MemcachedBagOStuff ) {
				$this->bagOStuffs[$name] = $cache;
			} else {
				throw new MWException(
					'Only MemcachedBagOStuff classes are supported by MemcLockManager.' );
			}
		}

		$this->session = wfRandomString( 32 );
	}

	/**
	 * Try to register this session as a lock holder for all given paths on
	 * one lock server.
	 *
	 * A path conflicts when another session holds an unexpired LOCK_EX entry
	 * for it, or, when $type is LOCK_EX, an unexpired LOCK_SH entry. Expired
	 * entries are pruned from the record. Records are only written back when
	 * no path had a conflict.
	 *
	 * @param string $lockSrv Name of the lock server
	 * @param Array $paths List of paths to lock
	 * @param integer $type Lock type, used as index into the lock record
	 * @return Status
	 */
	protected function getLocksOnServer( $lockSrv, array $paths, $type ) {
		$status = Status::newGood();

		$memc = $this->getCache( $lockSrv );
		$keys = array_map( array( $this, 'recordKeyForPath' ), $paths ); // lock records

		// Lock all of the active lock record keys...
		if ( !$this->acquireMutexes( $memc, $keys ) ) {
			foreach ( $paths as $path ) {
				$status->fatal( 'lockmanager-fail-acquirelock', $path );
			}
			return $status;
		}

		// Fetch all the existing lock records...
		$lockRecords = $memc->getMulti( $keys );

		$now = time();
		// Check if the requested locks conflict with existing ones...
		foreach ( $paths as $path ) {
			$locksKey = $this->recordKeyForPath( $path );
			$locksHeld = isset( $lockRecords[$locksKey] )
				? self::sanitizeLockArray( $lockRecords[$locksKey] )
				: self::newLockArray(); // init
			foreach ( $locksHeld[self::LOCK_EX] as $session => $expiry ) {
				if ( $expiry < $now ) { // stale?
					unset( $locksHeld[self::LOCK_EX][$session] );
				} elseif ( $session !== $this->session ) {
					$status->fatal( 'lockmanager-fail-acquirelock', $path );
				}
			}
			if ( $type === self::LOCK_EX ) {
				foreach ( $locksHeld[self::LOCK_SH] as $session => $expiry ) {
					if ( $expiry < $now ) { // stale?
						unset( $locksHeld[self::LOCK_SH][$session] );
					} elseif ( $session !== $this->session ) {
						$status->fatal( 'lockmanager-fail-acquirelock', $path );
					}
				}
			}
			if ( $status->isOK() ) {
				// Register the session in the lock record array.
				// $this->lockTTL is not defined in this class (presumably inherited).
				$locksHeld[$type][$this->session] = $now + $this->lockTTL;
				// We will update this record if none of the other locks conflict
				$lockRecords[$locksKey] = $locksHeld;
			}
		}

		// If there were no lock conflicts, update all the lock records...
		if ( $status->isOK() ) {
			foreach ( $paths as $path ) {
				$locksKey = $this->recordKeyForPath( $path );
				$locksHeld = $lockRecords[$locksKey];
				$ok = $memc->set( $locksKey, $locksHeld, 7*86400 );
				if ( !$ok ) {
					$status->fatal( 'lockmanager-fail-acquirelock', $path );
				} else {
					wfDebug( __METHOD__ . ": acquired lock on key $locksKey.\n" );
				}
			}
		}

		// Unlock all of the active lock record keys...
		$this->releaseMutexes( $memc, $keys );

		return $status;
	}

	/**
	 * Remove this session's entry of the given lock type from the lock
	 * records of all given paths on one lock server.
	 *
	 * A missing record, or a record without an entry for this session, only
	 * adds a warning to the result. A failed write or delete adds a fatal.
	 *
	 * @param string $lockSrv Name of the lock server
	 * @param Array $paths List of paths to unlock
	 * @param integer $type Lock type, used as index into the lock record
	 * @return Status
	 */
	protected function freeLocksOnServer( $lockSrv, array $paths, $type ) {
		$status = Status::newGood();

		$memc = $this->getCache( $lockSrv );
		$keys = array_map( array( $this, 'recordKeyForPath' ), $paths ); // lock records

		// Lock all of the active lock record keys...
		if ( !$this->acquireMutexes( $memc, $keys ) ) {
			foreach ( $paths as $path ) {
				$status->fatal( 'lockmanager-fail-releaselock', $path );
			}
			// Return the failed Status: a bare "return;" would give the caller
			// NULL and silently discard the fatals recorded above.
			return $status;
		}

		// Fetch all the existing lock records...
		$lockRecords = $memc->getMulti( $keys );

		// Remove the requested locks from all records...
		foreach ( $paths as $path ) {
			$locksKey = $this->recordKeyForPath( $path ); // lock record
			if ( !isset( $lockRecords[$locksKey] ) ) {
				$status->warning( 'lockmanager-fail-releaselock', $path );
				continue; // nothing to do
			}
			$locksHeld = self::sanitizeLockArray( $lockRecords[$locksKey] );
			if ( isset( $locksHeld[$type][$this->session] ) ) {
				unset( $locksHeld[$type][$this->session] ); // unregister this session
				if ( $locksHeld === self::newLockArray() ) {
					// No holders of any type remain; drop the record entirely
					$ok = $memc->delete( $locksKey );
				} else {
					// Use the same TTL as getLocksOnServer() so that a rewritten
					// record does not outlive the ones written on acquisition.
					$ok = $memc->set( $locksKey, $locksHeld, 7*86400 );
				}
				if ( !$ok ) {
					$status->fatal( 'lockmanager-fail-releaselock', $path );
				}
			} else {
				$status->warning( 'lockmanager-fail-releaselock', $path );
			}
			wfDebug( __METHOD__ . ": released lock on key $locksKey.\n" );
		}

		// Unlock all of the active lock record keys...
		$this->releaseMutexes( $memc, $keys );

		return $status;
	}

	/**
	 * Not supported by this lock manager; always reports success.
	 *
	 * @return Status
	 */
	protected function releaseAllLocks() {
		return Status::newGood(); // not supported
	}

	/**
	 * Check whether a lock server is known and answered the probe in getCache().
	 *
	 * @param string $lockSrv Name of the lock server
	 * @return bool
	 */
	protected function isServerUp( $lockSrv ) {
		return (bool)$this->getCache( $lockSrv );
	}

	/**
	 * Get the cache object for a lock server.
	 *
	 * The first call for a server writes a one-second "ping" key to it; the
	 * outcome is remembered in $this->serversUp for later calls on this
	 * instance. A failed probe raises an E_USER_WARNING once.
	 *
	 * @param string $lockSrv Name of the lock server
	 * @return MemcachedBagOStuff|null Null if the server is unknown or the probe failed
	 */
	protected function getCache( $lockSrv ) {
		$memc = null;
		if ( isset( $this->bagOStuffs[$lockSrv] ) ) {
			$memc = $this->bagOStuffs[$lockSrv];
			if ( !isset( $this->serversUp[$lockSrv] ) ) {
				$this->serversUp[$lockSrv] = $memc->set( __CLASS__ . ':ping', 1, 1 );
				if ( !$this->serversUp[$lockSrv] ) {
					trigger_error( __METHOD__ . ": Could not contact $lockSrv.", E_USER_WARNING );
				}
			}
			if ( !$this->serversUp[$lockSrv] ) {
				return null; // server appears to be down
			}
		}
		return $memc;
	}

	/**
	 * Get the memcached key of the lock record for a path.
	 *
	 * The key is "<class name>:locks:<hash>", where the hash comes from
	 * sha1Base36Absolute(), which is not defined in this class.
	 *
	 * @param string $path
	 * @return string
	 */
	protected function recordKeyForPath( $path ) {
		return implode( ':', array( __CLASS__, 'locks', $this->sha1Base36Absolute( $path ) ) );
	}

	/**
	 * Get an empty lock record: no shared and no exclusive holders.
	 *
	 * @return Array
	 */
	protected static function newLockArray() {
		return array( self::LOCK_SH => array(), self::LOCK_EX => array() );
	}

	/**
	 * Validate a lock record fetched from the cache.
	 *
	 * @param mixed $a Value read from the cache
	 * @return Array $a itself if it is an array with both the LOCK_EX and
	 *   LOCK_SH entries set; otherwise an empty lock record, after raising an
	 *   E_USER_WARNING
	 */
	protected static function sanitizeLockArray( $a ) {
		if ( is_array( $a ) && isset( $a[self::LOCK_EX] ) && isset( $a[self::LOCK_SH] ) ) {
			return $a;
		} else {
			trigger_error( __METHOD__ . ": reset invalid lock array.", E_USER_WARNING );
			return self::newLockArray();
		}
	}

	/**
	 * Acquire the "<key>:mutex" key for every given lock record key.
	 *
	 * Retries for up to about 3 seconds. On failure, any mutexes obtained so
	 * far are released again.
	 *
	 * @param MemcachedBagOStuff $memc
	 * @param Array $keys List of lock record keys
	 * @return bool Whether all mutexes were acquired
	 */
	protected function acquireMutexes( MemcachedBagOStuff $memc, array $keys ) {
		$lockedKeys = array();

		// Acquire the keys in lexicographical order, to avoid deadlock problems.
		// If P1 is waiting to acquire a key P2 has, P2 can't also be waiting for a key P1 has.
		sort( $keys );

		// Try to quickly loop to acquire the keys, but back off after a few rounds.
		// This reduces memcached spam, especially in the rare case where a server acquires
		// some lock keys and dies without releasing them. Lock keys expire after a few minutes.
		$rounds = 0;
		$start = microtime( true );
		do {
			if ( ( ++$rounds % 4 ) == 0 ) {
				usleep( 1000*50 ); // 50 ms
			}
			foreach ( array_diff( $keys, $lockedKeys ) as $key ) {
				if ( $memc->add( "$key:mutex", 1, 180 ) ) { // lock record
					$lockedKeys[] = $key;
				} else {
					// Acquire in order: stop at the first key that is taken and
					// retry it on the next round. Grabbing later keys while an
					// earlier one is held elsewhere would defeat the ordering
					// that the deadlock argument above relies on.
					break;
				}
			}
		} while ( count( $lockedKeys ) < count( $keys ) && ( microtime( true ) - $start ) <= 3 );

		if ( count( $lockedKeys ) != count( $keys ) ) {
			$this->releaseMutexes( $memc, $lockedKeys ); // failed; release what was locked
			return false;
		}

		return true;
	}

	/**
	 * Delete the "<key>:mutex" key for every given lock record key.
	 *
	 * @param MemcachedBagOStuff $memc
	 * @param Array $keys List of lock record keys
	 * @return void
	 */
	protected function releaseMutexes( MemcachedBagOStuff $memc, array $keys ) {
		foreach ( $keys as $key ) {
			$memc->delete( "$key:mutex" );
		}
	}

	/**
	 * Make sure remaining locks get cleared when the object goes away.
	 *
	 * NOTE(review): $this->locksHeld and doUnlock() are not defined in this
	 * class. The outer loop only terminates if doUnlock() removes entries
	 * from $this->locksHeld; confirm against the parent class.
	 */
	public function __destruct() {
		while ( count( $this->locksHeld ) ) {
			foreach ( $this->locksHeld as $path => $locks ) {
				$this->doUnlock( array( $path ), self::LOCK_EX );
				$this->doUnlock( array( $path ), self::LOCK_SH );
			}
		}
	}
}