MediaWiki  REL1_24
MemcLockManager.php
Go to the documentation of this file.
00001 <?php
/**
 * Lock manager that stores per-path lock records in memcached.
 *
 * Every path maps to one memcached key (see recordKeyForPath()). The value stored
 * under that key is an array of the form:
 *   array(
 *     self::LOCK_SH => array( <session> => <expiry UNIX timestamp>, ... ),
 *     self::LOCK_EX => array( <session> => <expiry UNIX timestamp>, ... )
 *   )
 * Read-modify-write cycles on these records are serialized with short-lived
 * "<record key>:mutex" entries created via add() (see acquireMutexes()).
 *
 * Entries whose expiry lies in the past are treated as stale and ignored when
 * checking for conflicts.
 */
class MemcLockManager extends QuorumLockManager {
    /**
     * @var array Map of requested lock type to the lock type that is actually recorded;
     *   LOCK_UW requests are recorded as LOCK_SH.
     */
    protected $lockTypeMap = array(
        self::LOCK_SH => self::LOCK_SH,
        self::LOCK_UW => self::LOCK_SH,
        self::LOCK_EX => self::LOCK_EX
    );

    /** @var array Map of (lock server name => MemcachedBagOStuff) */
    protected $bagOStuffs = array();

    /** @var array Map of (lock server name => bool); cached result of the ping in getCache() */
    protected $serversUp = array();

    /** @var string Random session identifier (from wfRandomString()) that owns this instance's locks */
    protected $session = '';

    /**
     * Build the lock manager and one memcached client per configured lock server.
     *
     * Configuration keys read here (any others are left to the parent constructor):
     *   - srvsByBucket : array of arrays; non-array entries are discarded and the
     *                    remaining ones are re-indexed consecutively.
     *   - lockServers  : map of (server name => address); each address becomes the
     *                    single entry of the 'servers' list for its cache object.
     *   - memcConfig   : (optional) parameters for ObjectCache::newFromParams();
     *                    defaults to array( 'class' => 'MemcachedPhpBagOStuff' ).
     *
     * @param array $config
     * @throws MWException If a configured cache is not a MemcachedBagOStuff
     */
    public function __construct( array $config ) {
        parent::__construct( $config );

        // Sanitize srvsByBucket config to prevent PHP errors
        $this->srvsByBucket = array_filter( $config['srvsByBucket'], 'is_array' );
        $this->srvsByBucket = array_values( $this->srvsByBucket ); // consecutive

        $memcConfig = isset( $config['memcConfig'] )
            ? $config['memcConfig']
            : array( 'class' => 'MemcachedPhpBagOStuff' );

        foreach ( $config['lockServers'] as $name => $address ) {
            // The per-server 'servers' entry takes precedence over anything in memcConfig
            $params = array( 'servers' => array( $address ) ) + $memcConfig;
            $cache = ObjectCache::newFromParams( $params );
            if ( $cache instanceof MemcachedBagOStuff ) {
                $this->bagOStuffs[$name] = $cache;
            } else {
                throw new MWException(
                    'Only MemcachedBagOStuff classes are supported by MemcLockManager.' );
            }
        }

        $this->session = wfRandomString( 32 );
    }

    /**
     * Acquire locks for several lock types on one server.
     *
     * Types are processed one after another. If any type fails, the locks already
     * obtained for the earlier types are released again and processing stops.
     *
     * @todo Change this code to work in one batch
     * @param string $lockSrv Lock server name
     * @param array $pathsByType Map of (lock type => list of paths)
     * @return Status
     */
    protected function getLocksOnServer( $lockSrv, array $pathsByType ) {
        $status = Status::newGood();

        $lockedPaths = array(); // map of (lock type => paths) acquired so far
        foreach ( $pathsByType as $type => $paths ) {
            $status->merge( $this->doGetLocksOnServer( $lockSrv, $paths, $type ) );
            if ( !$status->isOK() ) {
                // Undo the types that were already locked in this call
                foreach ( $lockedPaths as $lType => $lPaths ) {
                    $status->merge( $this->doFreeLocksOnServer( $lockSrv, $lPaths, $lType ) );
                }
                break;
            }
            // Array keys are unique, so each type is seen exactly once
            $lockedPaths[$type] = $paths;
        }

        return $status;
    }

    /**
     * Release locks for several lock types on one server.
     *
     * @todo Change this code to work in one batch
     * @param string $lockSrv Lock server name
     * @param array $pathsByType Map of (lock type => list of paths)
     * @return Status
     */
    protected function freeLocksOnServer( $lockSrv, array $pathsByType ) {
        $status = Status::newGood();

        foreach ( $pathsByType as $type => $paths ) {
            $status->merge( $this->doFreeLocksOnServer( $lockSrv, $paths, $type ) );
        }

        return $status;
    }

    /**
     * Acquire locks of a single type for a list of paths on one server.
     *
     * The lock records are only written if none of the paths conflicts with a
     * non-stale lock of another session.
     *
     * @param string $lockSrv Lock server name
     * @param array $paths List of paths
     * @param string|int $type One of the lock type constants used as record keys
     * @return Status
     */
    protected function doGetLocksOnServer( $lockSrv, array $paths, $type ) {
        $status = Status::newGood();

        $memc = $this->getCache( $lockSrv );
        $keys = array_map( array( $this, 'recordKeyForPath' ), $paths ); // lock records

        // Lock all of the active lock record keys...
        // (getCache() yields null for unknown or unreachable servers; report that as a
        // failure instead of handing null to the type-hinted acquireMutexes())
        if ( !$memc || !$this->acquireMutexes( $memc, $keys ) ) {
            foreach ( $paths as $path ) {
                $status->fatal( 'lockmanager-fail-acquirelock', $path );
            }

            return $status;
        }

        // Fetch all the existing lock records...
        $lockRecords = $memc->getMulti( $keys );

        $now = time();
        // Check if the requested locks conflict with existing ones...
        foreach ( $paths as $path ) {
            $locksKey = $this->recordKeyForPath( $path );
            $locksHeld = isset( $lockRecords[$locksKey] )
                ? self::sanitizeLockArray( $lockRecords[$locksKey] )
                : self::newLockArray(); // init
            // An exclusive lock of another session conflicts with any request
            foreach ( $locksHeld[self::LOCK_EX] as $session => $expiry ) {
                if ( $expiry < $now ) { // stale?
                    unset( $locksHeld[self::LOCK_EX][$session] );
                } elseif ( $session !== $this->session ) {
                    $status->fatal( 'lockmanager-fail-acquirelock', $path );
                }
            }
            // A shared lock of another session only conflicts with exclusive requests
            if ( $type === self::LOCK_EX ) {
                foreach ( $locksHeld[self::LOCK_SH] as $session => $expiry ) {
                    if ( $expiry < $now ) { // stale?
                        unset( $locksHeld[self::LOCK_SH][$session] );
                    } elseif ( $session !== $this->session ) {
                        $status->fatal( 'lockmanager-fail-acquirelock', $path );
                    }
                }
            }
            if ( $status->isOK() ) {
                // Register the session in the lock record array
                $locksHeld[$type][$this->session] = $now + $this->lockTTL;
                // We will update this record if none of the other locks conflict
                $lockRecords[$locksKey] = $locksHeld;
            }
        }

        // If there were no lock conflicts, update all the lock records...
        // Records written before a failed set() are not rolled back here; their
        // entries become stale once the expiry registered above has passed.
        if ( $status->isOK() ) {
            foreach ( $paths as $path ) {
                $locksKey = $this->recordKeyForPath( $path );
                $locksHeld = $lockRecords[$locksKey];
                $ok = $memc->set( $locksKey, $locksHeld, 7 * 86400 );
                if ( !$ok ) {
                    $status->fatal( 'lockmanager-fail-acquirelock', $path );
                } else {
                    wfDebug( __METHOD__ . ": acquired lock on key $locksKey.\n" );
                }
            }
        }

        // Unlock all of the active lock record keys...
        $this->releaseMutexes( $memc, $keys );

        return $status;
    }

    /**
     * Release locks of a single type for a list of paths on one server.
     *
     * Paths without a lock record, or whose record does not list this session for
     * the given type, produce a warning rather than a fatal error.
     *
     * @param string $lockSrv Lock server name
     * @param array $paths List of paths
     * @param string|int $type One of the lock type constants used as record keys
     * @return Status
     */
    protected function doFreeLocksOnServer( $lockSrv, array $paths, $type ) {
        $status = Status::newGood();

        $memc = $this->getCache( $lockSrv );
        $keys = array_map( array( $this, 'recordKeyForPath' ), $paths ); // lock records

        // Lock all of the active lock record keys...
        // (getCache() yields null for unknown or unreachable servers; report that as a
        // failure instead of handing null to the type-hinted acquireMutexes())
        if ( !$memc || !$this->acquireMutexes( $memc, $keys ) ) {
            foreach ( $paths as $path ) {
                $status->fatal( 'lockmanager-fail-releaselock', $path );
            }

            return $status;
        }

        // Fetch all the existing lock records...
        $lockRecords = $memc->getMulti( $keys );

        // Remove the requested locks from all records...
        foreach ( $paths as $path ) {
            $locksKey = $this->recordKeyForPath( $path ); // lock record
            if ( !isset( $lockRecords[$locksKey] ) ) {
                $status->warning( 'lockmanager-fail-releaselock', $path );
                continue; // nothing to do
            }
            $locksHeld = self::sanitizeLockArray( $lockRecords[$locksKey] );
            if ( !isset( $locksHeld[$type][$this->session] ) ) {
                $status->warning( 'lockmanager-fail-releaselock', $path );
                continue; // this session holds no such lock
            }
            unset( $locksHeld[$type][$this->session] ); // unregister this session
            if ( !$locksHeld[self::LOCK_SH] && !$locksHeld[self::LOCK_EX] ) {
                // No session left in the record; drop it entirely
                $ok = $memc->delete( $locksKey );
            } else {
                // Keep the same expiry as records written when acquiring locks, so a
                // partially released record does not lose its TTL
                $ok = $memc->set( $locksKey, $locksHeld, 7 * 86400 );
            }
            if ( $ok ) {
                wfDebug( __METHOD__ . ": released lock on key $locksKey.\n" );
            } else {
                $status->fatal( 'lockmanager-fail-releaselock', $path );
            }
        }

        // Unlock all of the active lock record keys...
        $this->releaseMutexes( $memc, $keys );

        return $status;
    }

    /**
     * Releasing all locks in one step is not supported; this is a no-op.
     *
     * @return Status Always a good status
     */
    protected function releaseAllLocks() {
        return Status::newGood(); // not supported
    }

    /**
     * Check whether a lock server is known and answered the ping in getCache().
     *
     * @param string $lockSrv Lock server name
     * @return bool
     */
    protected function isServerUp( $lockSrv ) {
        return (bool)$this->getCache( $lockSrv );
    }

    /**
     * Get the memcached client for a lock server.
     *
     * The first request for a server writes a one-second ping key; the outcome is
     * cached in $serversUp for the lifetime of this object.
     *
     * @param string $lockSrv Lock server name
     * @return MemcachedBagOStuff|null Null if the server is unknown or the ping failed
     */
    protected function getCache( $lockSrv ) {
        $memc = null;
        if ( isset( $this->bagOStuffs[$lockSrv] ) ) {
            $memc = $this->bagOStuffs[$lockSrv];
            if ( !isset( $this->serversUp[$lockSrv] ) ) {
                $this->serversUp[$lockSrv] = $memc->set( __CLASS__ . ':ping', 1, 1 );
                if ( !$this->serversUp[$lockSrv] ) {
                    trigger_error( __METHOD__ . ": Could not contact $lockSrv.", E_USER_WARNING );
                }
            }
            if ( !$this->serversUp[$lockSrv] ) {
                return null; // server appears to be down
            }
        }

        return $memc;
    }

    /**
     * Get the memcached key of the lock record for a path.
     *
     * @param string $path
     * @return string "<class name>:locks:<result of sha1Base36Absolute( $path )>"
     */
    protected function recordKeyForPath( $path ) {
        return implode( ':', array( __CLASS__, 'locks', $this->sha1Base36Absolute( $path ) ) );
    }

    /**
     * Get an empty lock record.
     *
     * @return array Record with empty LOCK_SH and LOCK_EX session lists
     */
    protected static function newLockArray() {
        return array( self::LOCK_SH => array(), self::LOCK_EX => array() );
    }

    /**
     * Validate a lock record fetched from memcached.
     *
     * A record is accepted only if it is an array whose LOCK_SH and LOCK_EX entries
     * are both arrays, since callers iterate over both lists. Anything else triggers
     * a warning and is replaced with an empty record.
     *
     * @param mixed $a Value read from the cache
     * @return array Valid lock record
     */
    protected static function sanitizeLockArray( $a ) {
        if ( is_array( $a )
            && isset( $a[self::LOCK_EX] ) && is_array( $a[self::LOCK_EX] )
            && isset( $a[self::LOCK_SH] ) && is_array( $a[self::LOCK_SH] )
        ) {
            return $a;
        }

        trigger_error( __METHOD__ . ": reset invalid lock array.", E_USER_WARNING );

        return self::newLockArray();
    }

    /**
     * Acquire the "<key>:mutex" entries for a set of lock record keys.
     *
     * Either all mutexes are obtained (within roughly three seconds) or none is
     * left held by this call.
     *
     * @param MemcachedBagOStuff $memc
     * @param array $keys List of lock record keys
     * @return bool True if every mutex was acquired
     */
    protected function acquireMutexes( MemcachedBagOStuff $memc, array $keys ) {
        $lockedKeys = array();

        // Acquire the keys in lexicographical order, to avoid deadlock problems.
        // If P1 is waiting to acquire a key P2 has, P2 can't also be waiting for a key P1 has.
        sort( $keys );
        // Duplicates could never be add()ed twice and would make the count check below
        // fail after the full timeout, so collapse them first.
        $keys = array_values( array_unique( $keys ) );

        // Try to quickly loop to acquire the keys, but back off after a few rounds.
        // This reduces memcached spam, especially in the rare case where a server acquires
        // some lock keys and dies without releasing them. Lock keys expire after a few minutes.
        $rounds = 0;
        $start = microtime( true );
        do {
            if ( ( ++$rounds % 4 ) == 0 ) {
                usleep( 1000 * 50 ); // 50 ms
            }
            foreach ( array_diff( $keys, $lockedKeys ) as $key ) {
                if ( $memc->add( "$key:mutex", 1, 180 ) ) { // lock record
                    $lockedKeys[] = $key;
                } else {
                    // Stop at the first busy key: taking later keys while an earlier one
                    // is still missing would break the ordering guarantee described above.
                    break; // acquire in order
                }
            }
        } while ( count( $lockedKeys ) < count( $keys ) && ( microtime( true ) - $start ) <= 3 );

        if ( count( $lockedKeys ) !== count( $keys ) ) {
            $this->releaseMutexes( $memc, $lockedKeys ); // failed; release what was locked
            return false;
        }

        return true;
    }

    /**
     * Delete the "<key>:mutex" entries for a set of lock record keys.
     *
     * @param MemcachedBagOStuff $memc
     * @param array $keys List of lock record keys
     */
    protected function releaseMutexes( MemcachedBagOStuff $memc, array $keys ) {
        foreach ( $keys as $key ) {
            $memc->delete( "$key:mutex" );
        }
    }

    /**
     * Make sure remaining locks get cleared for sanity.
     *
     * NOTE(review): the outer loop only ends once $this->locksHeld is empty, so this
     * relies on doUnlock() removing entries from that array; confirm against the
     * parent class.
     */
    public function __destruct() {
        while ( count( $this->locksHeld ) ) {
            // Iterate over a snapshot of the paths, since doUnlock() is expected to
            // modify $this->locksHeld
            foreach ( array_keys( $this->locksHeld ) as $path ) {
                $this->doUnlock( array( $path ), self::LOCK_EX );
                $this->doUnlock( array( $path ), self::LOCK_SH );
            }
        }
    }
}