MediaWiki
REL1_24
|
00001 <?php 00038 class MemcLockManager extends QuorumLockManager { 00040 protected $lockTypeMap = array( 00041 self::LOCK_SH => self::LOCK_SH, 00042 self::LOCK_UW => self::LOCK_SH, 00043 self::LOCK_EX => self::LOCK_EX 00044 ); 00045 00047 protected $bagOStuffs = array(); 00048 00050 protected $serversUp = array(); 00051 00053 protected $session = ''; 00054 00066 public function __construct( array $config ) { 00067 parent::__construct( $config ); 00068 00069 // Sanitize srvsByBucket config to prevent PHP errors 00070 $this->srvsByBucket = array_filter( $config['srvsByBucket'], 'is_array' ); 00071 $this->srvsByBucket = array_values( $this->srvsByBucket ); // consecutive 00072 00073 $memcConfig = isset( $config['memcConfig'] ) 00074 ? $config['memcConfig'] 00075 : array( 'class' => 'MemcachedPhpBagOStuff' ); 00076 00077 foreach ( $config['lockServers'] as $name => $address ) { 00078 $params = array( 'servers' => array( $address ) ) + $memcConfig; 00079 $cache = ObjectCache::newFromParams( $params ); 00080 if ( $cache instanceof MemcachedBagOStuff ) { 00081 $this->bagOStuffs[$name] = $cache; 00082 } else { 00083 throw new MWException( 00084 'Only MemcachedBagOStuff classes are supported by MemcLockManager.' ); 00085 } 00086 } 00087 00088 $this->session = wfRandomString( 32 ); 00089 } 00090 00091 // @todo Change this code to work in one batch 00092 protected function getLocksOnServer( $lockSrv, array $pathsByType ) { 00093 $status = Status::newGood(); 00094 00095 $lockedPaths = array(); 00096 foreach ( $pathsByType as $type => $paths ) { 00097 $status->merge( $this->doGetLocksOnServer( $lockSrv, $paths, $type ) ); 00098 if ( $status->isOK() ) { 00099 $lockedPaths[$type] = isset( $lockedPaths[$type] ) 00100 ? array_merge( $lockedPaths[$type], $paths ) 00101 : $paths; 00102 } else { 00103 foreach ( $lockedPaths as $lType => $lPaths ) { 00104 $status->merge( $this->doFreeLocksOnServer( $lockSrv, $lPaths, $lType ) ); 00105 } 00106 break; 00107 } 00108 } 00109 00110 return $status; 00111 } 00112 00113 // @todo Change this code to work in one batch 00114 protected function freeLocksOnServer( $lockSrv, array $pathsByType ) { 00115 $status = Status::newGood(); 00116 00117 foreach ( $pathsByType as $type => $paths ) { 00118 $status->merge( $this->doFreeLocksOnServer( $lockSrv, $paths, $type ) ); 00119 } 00120 00121 return $status; 00122 } 00123 00131 protected function doGetLocksOnServer( $lockSrv, array $paths, $type ) { 00132 $status = Status::newGood(); 00133 00134 $memc = $this->getCache( $lockSrv ); 00135 $keys = array_map( array( $this, 'recordKeyForPath' ), $paths ); // lock records 00136 00137 // Lock all of the active lock record keys... 00138 if ( !$this->acquireMutexes( $memc, $keys ) ) { 00139 foreach ( $paths as $path ) { 00140 $status->fatal( 'lockmanager-fail-acquirelock', $path ); 00141 } 00142 00143 return $status; 00144 } 00145 00146 // Fetch all the existing lock records... 00147 $lockRecords = $memc->getMulti( $keys ); 00148 00149 $now = time(); 00150 // Check if the requested locks conflict with existing ones... 00151 foreach ( $paths as $path ) { 00152 $locksKey = $this->recordKeyForPath( $path ); 00153 $locksHeld = isset( $lockRecords[$locksKey] ) 00154 ? self::sanitizeLockArray( $lockRecords[$locksKey] ) 00155 : self::newLockArray(); // init 00156 foreach ( $locksHeld[self::LOCK_EX] as $session => $expiry ) { 00157 if ( $expiry < $now ) { // stale? 00158 unset( $locksHeld[self::LOCK_EX][$session] ); 00159 } elseif ( $session !== $this->session ) { 00160 $status->fatal( 'lockmanager-fail-acquirelock', $path ); 00161 } 00162 } 00163 if ( $type === self::LOCK_EX ) { 00164 foreach ( $locksHeld[self::LOCK_SH] as $session => $expiry ) { 00165 if ( $expiry < $now ) { // stale? 00166 unset( $locksHeld[self::LOCK_SH][$session] ); 00167 } elseif ( $session !== $this->session ) { 00168 $status->fatal( 'lockmanager-fail-acquirelock', $path ); 00169 } 00170 } 00171 } 00172 if ( $status->isOK() ) { 00173 // Register the session in the lock record array 00174 $locksHeld[$type][$this->session] = $now + $this->lockTTL; 00175 // We will update this record if none of the other locks conflict 00176 $lockRecords[$locksKey] = $locksHeld; 00177 } 00178 } 00179 00180 // If there were no lock conflicts, update all the lock records... 00181 if ( $status->isOK() ) { 00182 foreach ( $paths as $path ) { 00183 $locksKey = $this->recordKeyForPath( $path ); 00184 $locksHeld = $lockRecords[$locksKey]; 00185 $ok = $memc->set( $locksKey, $locksHeld, 7 * 86400 ); 00186 if ( !$ok ) { 00187 $status->fatal( 'lockmanager-fail-acquirelock', $path ); 00188 } else { 00189 wfDebug( __METHOD__ . ": acquired lock on key $locksKey.\n" ); 00190 } 00191 } 00192 } 00193 00194 // Unlock all of the active lock record keys... 00195 $this->releaseMutexes( $memc, $keys ); 00196 00197 return $status; 00198 } 00199 00207 protected function doFreeLocksOnServer( $lockSrv, array $paths, $type ) { 00208 $status = Status::newGood(); 00209 00210 $memc = $this->getCache( $lockSrv ); 00211 $keys = array_map( array( $this, 'recordKeyForPath' ), $paths ); // lock records 00212 00213 // Lock all of the active lock record keys... 00214 if ( !$this->acquireMutexes( $memc, $keys ) ) { 00215 foreach ( $paths as $path ) { 00216 $status->fatal( 'lockmanager-fail-releaselock', $path ); 00217 } 00218 00219 return $status; 00220 } 00221 00222 // Fetch all the existing lock records... 00223 $lockRecords = $memc->getMulti( $keys ); 00224 00225 // Remove the requested locks from all records... 00226 foreach ( $paths as $path ) { 00227 $locksKey = $this->recordKeyForPath( $path ); // lock record 00228 if ( !isset( $lockRecords[$locksKey] ) ) { 00229 $status->warning( 'lockmanager-fail-releaselock', $path ); 00230 continue; // nothing to do 00231 } 00232 $locksHeld = self::sanitizeLockArray( $lockRecords[$locksKey] ); 00233 if ( isset( $locksHeld[$type][$this->session] ) ) { 00234 unset( $locksHeld[$type][$this->session] ); // unregister this session 00235 if ( $locksHeld === self::newLockArray() ) { 00236 $ok = $memc->delete( $locksKey ); 00237 } else { 00238 $ok = $memc->set( $locksKey, $locksHeld ); 00239 } 00240 if ( !$ok ) { 00241 $status->fatal( 'lockmanager-fail-releaselock', $path ); 00242 } 00243 } else { 00244 $status->warning( 'lockmanager-fail-releaselock', $path ); 00245 } 00246 wfDebug( __METHOD__ . ": released lock on key $locksKey.\n" ); 00247 } 00248 00249 // Unlock all of the active lock record keys... 00250 $this->releaseMutexes( $memc, $keys ); 00251 00252 return $status; 00253 } 00254 00259 protected function releaseAllLocks() { 00260 return Status::newGood(); // not supported 00261 } 00262 00268 protected function isServerUp( $lockSrv ) { 00269 return (bool)$this->getCache( $lockSrv ); 00270 } 00271 00278 protected function getCache( $lockSrv ) { 00279 $memc = null; 00280 if ( isset( $this->bagOStuffs[$lockSrv] ) ) { 00281 $memc = $this->bagOStuffs[$lockSrv]; 00282 if ( !isset( $this->serversUp[$lockSrv] ) ) { 00283 $this->serversUp[$lockSrv] = $memc->set( __CLASS__ . ':ping', 1, 1 ); 00284 if ( !$this->serversUp[$lockSrv] ) { 00285 trigger_error( __METHOD__ . ": Could not contact $lockSrv.", E_USER_WARNING ); 00286 } 00287 } 00288 if ( !$this->serversUp[$lockSrv] ) { 00289 return null; // server appears to be down 00290 } 00291 } 00292 00293 return $memc; 00294 } 00295 00300 protected function recordKeyForPath( $path ) { 00301 return implode( ':', array( __CLASS__, 'locks', $this->sha1Base36Absolute( $path ) ) ); 00302 } 00303 00307 protected static function newLockArray() { 00308 return array( self::LOCK_SH => array(), self::LOCK_EX => array() ); 00309 } 00310 00315 protected static function sanitizeLockArray( $a ) { 00316 if ( is_array( $a ) && isset( $a[self::LOCK_EX] ) && isset( $a[self::LOCK_SH] ) ) { 00317 return $a; 00318 } else { 00319 trigger_error( __METHOD__ . ": reset invalid lock array.", E_USER_WARNING ); 00320 00321 return self::newLockArray(); 00322 } 00323 } 00324 00330 protected function acquireMutexes( MemcachedBagOStuff $memc, array $keys ) { 00331 $lockedKeys = array(); 00332 00333 // Acquire the keys in lexicographical order, to avoid deadlock problems. 00334 // If P1 is waiting to acquire a key P2 has, P2 can't also be waiting for a key P1 has. 00335 sort( $keys ); 00336 00337 // Try to quickly loop to acquire the keys, but back off after a few rounds. 00338 // This reduces memcached spam, especially in the rare case where a server acquires 00339 // some lock keys and dies without releasing them. Lock keys expire after a few minutes. 00340 $rounds = 0; 00341 $start = microtime( true ); 00342 do { 00343 if ( ( ++$rounds % 4 ) == 0 ) { 00344 usleep( 1000 * 50 ); // 50 ms 00345 } 00346 foreach ( array_diff( $keys, $lockedKeys ) as $key ) { 00347 if ( $memc->add( "$key:mutex", 1, 180 ) ) { // lock record 00348 $lockedKeys[] = $key; 00349 } else { 00350 continue; // acquire in order 00351 } 00352 } 00353 } while ( count( $lockedKeys ) < count( $keys ) && ( microtime( true ) - $start ) <= 3 ); 00354 00355 if ( count( $lockedKeys ) != count( $keys ) ) { 00356 $this->releaseMutexes( $memc, $lockedKeys ); // failed; release what was locked 00357 return false; 00358 } 00359 00360 return true; 00361 } 00362 00367 protected function releaseMutexes( MemcachedBagOStuff $memc, array $keys ) { 00368 foreach ( $keys as $key ) { 00369 $memc->delete( "$key:mutex" ); 00370 } 00371 } 00372 00376 function __destruct() { 00377 while ( count( $this->locksHeld ) ) { 00378 foreach ( $this->locksHeld as $path => $locks ) { 00379 $this->doUnlock( array( $path ), self::LOCK_EX ); 00380 $this->doUnlock( array( $path ), self::LOCK_SH ); 00381 } 00382 } 00383 } 00384 }