MediaWiki
REL1_22
|
00001 <?php 00038 class MemcLockManager extends QuorumLockManager { 00040 protected $lockTypeMap = array( 00041 self::LOCK_SH => self::LOCK_SH, 00042 self::LOCK_UW => self::LOCK_SH, 00043 self::LOCK_EX => self::LOCK_EX 00044 ); 00045 00047 protected $bagOStuffs = array(); 00049 protected $serversUp = array(); // (server name => bool) 00050 00051 protected $session = ''; // string; random UUID 00052 00066 public function __construct( array $config ) { 00067 parent::__construct( $config ); 00068 00069 // Sanitize srvsByBucket config to prevent PHP errors 00070 $this->srvsByBucket = array_filter( $config['srvsByBucket'], 'is_array' ); 00071 $this->srvsByBucket = array_values( $this->srvsByBucket ); // consecutive 00072 00073 $memcConfig = isset( $config['memcConfig'] ) 00074 ? $config['memcConfig'] 00075 : array( 'class' => 'MemcachedPhpBagOStuff' ); 00076 00077 foreach ( $config['lockServers'] as $name => $address ) { 00078 $params = array( 'servers' => array( $address ) ) + $memcConfig; 00079 $cache = ObjectCache::newFromParams( $params ); 00080 if ( $cache instanceof MemcachedBagOStuff ) { 00081 $this->bagOStuffs[$name] = $cache; 00082 } else { 00083 throw new MWException( 00084 'Only MemcachedBagOStuff classes are supported by MemcLockManager.' ); 00085 } 00086 } 00087 00088 $this->session = wfRandomString( 32 ); 00089 } 00090 00091 // @TODO: change this code to work in one batch 00092 protected function getLocksOnServer( $lockSrv, array $pathsByType ) { 00093 $status = Status::newGood(); 00094 00095 $lockedPaths = array(); 00096 foreach ( $pathsByType as $type => $paths ) { 00097 $status->merge( $this->doGetLocksOnServer( $lockSrv, $paths, $type ) ); 00098 if ( $status->isOK() ) { 00099 $lockedPaths[$type] = isset( $lockedPaths[$type] ) 00100 ? array_merge( $lockedPaths[$type], $paths ) 00101 : $paths; 00102 } else { 00103 foreach ( $lockedPaths as $type => $paths ) { 00104 $status->merge( $this->doFreeLocksOnServer( $lockSrv, $paths, $type ) ); 00105 } 00106 break; 00107 } 00108 } 00109 00110 return $status; 00111 } 00112 00113 // @TODO: change this code to work in one batch 00114 protected function freeLocksOnServer( $lockSrv, array $pathsByType ) { 00115 $status = Status::newGood(); 00116 00117 foreach ( $pathsByType as $type => $paths ) { 00118 $status->merge( $this->doFreeLocksOnServer( $lockSrv, $paths, $type ) ); 00119 } 00120 00121 return $status; 00122 } 00123 00128 protected function doGetLocksOnServer( $lockSrv, array $paths, $type ) { 00129 $status = Status::newGood(); 00130 00131 $memc = $this->getCache( $lockSrv ); 00132 $keys = array_map( array( $this, 'recordKeyForPath' ), $paths ); // lock records 00133 00134 // Lock all of the active lock record keys... 00135 if ( !$this->acquireMutexes( $memc, $keys ) ) { 00136 foreach ( $paths as $path ) { 00137 $status->fatal( 'lockmanager-fail-acquirelock', $path ); 00138 } 00139 return $status; 00140 } 00141 00142 // Fetch all the existing lock records... 00143 $lockRecords = $memc->getMulti( $keys ); 00144 00145 $now = time(); 00146 // Check if the requested locks conflict with existing ones... 00147 foreach ( $paths as $path ) { 00148 $locksKey = $this->recordKeyForPath( $path ); 00149 $locksHeld = isset( $lockRecords[$locksKey] ) 00150 ? self::sanitizeLockArray( $lockRecords[$locksKey] ) 00151 : self::newLockArray(); // init 00152 foreach ( $locksHeld[self::LOCK_EX] as $session => $expiry ) { 00153 if ( $expiry < $now ) { // stale? 00154 unset( $locksHeld[self::LOCK_EX][$session] ); 00155 } elseif ( $session !== $this->session ) { 00156 $status->fatal( 'lockmanager-fail-acquirelock', $path ); 00157 } 00158 } 00159 if ( $type === self::LOCK_EX ) { 00160 foreach ( $locksHeld[self::LOCK_SH] as $session => $expiry ) { 00161 if ( $expiry < $now ) { // stale? 00162 unset( $locksHeld[self::LOCK_SH][$session] ); 00163 } elseif ( $session !== $this->session ) { 00164 $status->fatal( 'lockmanager-fail-acquirelock', $path ); 00165 } 00166 } 00167 } 00168 if ( $status->isOK() ) { 00169 // Register the session in the lock record array 00170 $locksHeld[$type][$this->session] = $now + $this->lockTTL; 00171 // We will update this record if none of the other locks conflict 00172 $lockRecords[$locksKey] = $locksHeld; 00173 } 00174 } 00175 00176 // If there were no lock conflicts, update all the lock records... 00177 if ( $status->isOK() ) { 00178 foreach ( $paths as $path ) { 00179 $locksKey = $this->recordKeyForPath( $path ); 00180 $locksHeld = $lockRecords[$locksKey]; 00181 $ok = $memc->set( $locksKey, $locksHeld, 7 * 86400 ); 00182 if ( !$ok ) { 00183 $status->fatal( 'lockmanager-fail-acquirelock', $path ); 00184 } else { 00185 wfDebug( __METHOD__ . ": acquired lock on key $locksKey.\n" ); 00186 } 00187 } 00188 } 00189 00190 // Unlock all of the active lock record keys... 00191 $this->releaseMutexes( $memc, $keys ); 00192 00193 return $status; 00194 } 00195 00200 protected function doFreeLocksOnServer( $lockSrv, array $paths, $type ) { 00201 $status = Status::newGood(); 00202 00203 $memc = $this->getCache( $lockSrv ); 00204 $keys = array_map( array( $this, 'recordKeyForPath' ), $paths ); // lock records 00205 00206 // Lock all of the active lock record keys... 00207 if ( !$this->acquireMutexes( $memc, $keys ) ) { 00208 foreach ( $paths as $path ) { 00209 $status->fatal( 'lockmanager-fail-releaselock', $path ); 00210 } 00211 return; 00212 } 00213 00214 // Fetch all the existing lock records... 00215 $lockRecords = $memc->getMulti( $keys ); 00216 00217 // Remove the requested locks from all records... 00218 foreach ( $paths as $path ) { 00219 $locksKey = $this->recordKeyForPath( $path ); // lock record 00220 if ( !isset( $lockRecords[$locksKey] ) ) { 00221 $status->warning( 'lockmanager-fail-releaselock', $path ); 00222 continue; // nothing to do 00223 } 00224 $locksHeld = self::sanitizeLockArray( $lockRecords[$locksKey] ); 00225 if ( isset( $locksHeld[$type][$this->session] ) ) { 00226 unset( $locksHeld[$type][$this->session] ); // unregister this session 00227 if ( $locksHeld === self::newLockArray() ) { 00228 $ok = $memc->delete( $locksKey ); 00229 } else { 00230 $ok = $memc->set( $locksKey, $locksHeld ); 00231 } 00232 if ( !$ok ) { 00233 $status->fatal( 'lockmanager-fail-releaselock', $path ); 00234 } 00235 } else { 00236 $status->warning( 'lockmanager-fail-releaselock', $path ); 00237 } 00238 wfDebug( __METHOD__ . ": released lock on key $locksKey.\n" ); 00239 } 00240 00241 // Unlock all of the active lock record keys... 00242 $this->releaseMutexes( $memc, $keys ); 00243 00244 return $status; 00245 } 00246 00251 protected function releaseAllLocks() { 00252 return Status::newGood(); // not supported 00253 } 00254 00259 protected function isServerUp( $lockSrv ) { 00260 return (bool)$this->getCache( $lockSrv ); 00261 } 00262 00269 protected function getCache( $lockSrv ) { 00270 $memc = null; 00271 if ( isset( $this->bagOStuffs[$lockSrv] ) ) { 00272 $memc = $this->bagOStuffs[$lockSrv]; 00273 if ( !isset( $this->serversUp[$lockSrv] ) ) { 00274 $this->serversUp[$lockSrv] = $memc->set( __CLASS__ . ':ping', 1, 1 ); 00275 if ( !$this->serversUp[$lockSrv] ) { 00276 trigger_error( __METHOD__ . ": Could not contact $lockSrv.", E_USER_WARNING ); 00277 } 00278 } 00279 if ( !$this->serversUp[$lockSrv] ) { 00280 return null; // server appears to be down 00281 } 00282 } 00283 return $memc; 00284 } 00285 00290 protected function recordKeyForPath( $path ) { 00291 return implode( ':', array( __CLASS__, 'locks', $this->sha1Base36Absolute( $path ) ) ); 00292 } 00293 00297 protected static function newLockArray() { 00298 return array( self::LOCK_SH => array(), self::LOCK_EX => array() ); 00299 } 00300 00305 protected static function sanitizeLockArray( $a ) { 00306 if ( is_array( $a ) && isset( $a[self::LOCK_EX] ) && isset( $a[self::LOCK_SH] ) ) { 00307 return $a; 00308 } else { 00309 trigger_error( __METHOD__ . ": reset invalid lock array.", E_USER_WARNING ); 00310 return self::newLockArray(); 00311 } 00312 } 00313 00319 protected function acquireMutexes( MemcachedBagOStuff $memc, array $keys ) { 00320 $lockedKeys = array(); 00321 00322 // Acquire the keys in lexicographical order, to avoid deadlock problems. 00323 // If P1 is waiting to acquire a key P2 has, P2 can't also be waiting for a key P1 has. 00324 sort( $keys ); 00325 00326 // Try to quickly loop to acquire the keys, but back off after a few rounds. 00327 // This reduces memcached spam, especially in the rare case where a server acquires 00328 // some lock keys and dies without releasing them. Lock keys expire after a few minutes. 00329 $rounds = 0; 00330 $start = microtime( true ); 00331 do { 00332 if ( ( ++$rounds % 4 ) == 0 ) { 00333 usleep( 1000 * 50 ); // 50 ms 00334 } 00335 foreach ( array_diff( $keys, $lockedKeys ) as $key ) { 00336 if ( $memc->add( "$key:mutex", 1, 180 ) ) { // lock record 00337 $lockedKeys[] = $key; 00338 } else { 00339 continue; // acquire in order 00340 } 00341 } 00342 } while ( count( $lockedKeys ) < count( $keys ) && ( microtime( true ) - $start ) <= 3 ); 00343 00344 if ( count( $lockedKeys ) != count( $keys ) ) { 00345 $this->releaseMutexes( $memc, $lockedKeys ); // failed; release what was locked 00346 return false; 00347 } 00348 00349 return true; 00350 } 00351 00357 protected function releaseMutexes( MemcachedBagOStuff $memc, array $keys ) { 00358 foreach ( $keys as $key ) { 00359 $memc->delete( "$key:mutex" ); 00360 } 00361 } 00362 00366 function __destruct() { 00367 while ( count( $this->locksHeld ) ) { 00368 foreach ( $this->locksHeld as $path => $locks ) { 00369 $this->doUnlock( array( $path ), self::LOCK_EX ); 00370 $this->doUnlock( array( $path ), self::LOCK_SH ); 00371 } 00372 } 00373 } 00374 }