MediaWiki  REL1_22
MemcLockManager.php
Go to the documentation of this file.
00001 <?php
/**
 * Lock manager that keeps its lock records on memcached servers.
 *
 * Every locked path has one lock record, stored under the key produced by
 * recordKeyForPath(). A record has the shape returned by newLockArray():
 * one (session => expiry timestamp) map for shared locks and one for
 * exclusive locks. Read-modify-write cycles on a record are guarded by a
 * short-lived "<record key>:mutex" entry that is created with add().
 *
 * This class only implements the per-server operations; how servers are
 * combined is left to the QuorumLockManager parent (not visible in this file).
 */
class MemcLockManager extends QuorumLockManager {
    /** @var array Map of requested lock type => lock type actually taken (LOCK_UW is taken as LOCK_SH) */
    protected $lockTypeMap = array(
        self::LOCK_SH => self::LOCK_SH,
        self::LOCK_UW => self::LOCK_SH,
        self::LOCK_EX => self::LOCK_EX
    );

    /** @var array Map of (server name => MemcachedBagOStuff) */
    protected $bagOStuffs = array();
    /** @var array Map of (server name => bool); filled lazily by getCache() */
    protected $serversUp = array();

    /** @var string Random session ID generated in the constructor */
    protected $session = '';

    /**
     * Build the lock manager from its configuration.
     *
     * Keys of $config read here (the parent constructor may read more):
     *   - srvsByBucket : array of arrays; non-array entries are dropped and the
     *                    remaining ones are re-indexed consecutively.
     *   - lockServers  : map of (server name => server address).
     *   - memcConfig   : optional ObjectCache parameters for the cache objects;
     *                    defaults to the MemcachedPhpBagOStuff class. The 'servers'
     *                    entry is always replaced by the single lock server address.
     *
     * @param array $config
     * @throws MWException If a created cache object is not a MemcachedBagOStuff
     */
    public function __construct( array $config ) {
        parent::__construct( $config );

        // Sanitize srvsByBucket config to prevent PHP errors
        $this->srvsByBucket = array_filter( $config['srvsByBucket'], 'is_array' );
        $this->srvsByBucket = array_values( $this->srvsByBucket ); // consecutive

        $memcConfig = isset( $config['memcConfig'] )
            ? $config['memcConfig']
            : array( 'class' => 'MemcachedPhpBagOStuff' );

        foreach ( $config['lockServers'] as $name => $address ) {
            // The left operand of "+" wins, so 'servers' always holds just this address
            $params = array( 'servers' => array( $address ) ) + $memcConfig;
            $cache = ObjectCache::newFromParams( $params );
            if ( !( $cache instanceof MemcachedBagOStuff ) ) {
                throw new MWException(
                    'Only MemcachedBagOStuff classes are supported by MemcLockManager.' );
            }
            $this->bagOStuffs[$name] = $cache;
        }

        $this->session = wfRandomString( 32 );
    }

    /**
     * Acquire locks of several types on one server.
     *
     * The types are processed one after another. If one of them fails, the types
     * that were already locked by this call are released again and processing stops.
     *
     * @TODO: change this code to work in one batch
     *
     * @param string $lockSrv Server name (key of the 'lockServers' config)
     * @param array $pathsByType Map of (lock type => list of paths)
     * @return Status
     */
    protected function getLocksOnServer( $lockSrv, array $pathsByType ) {
        $status = Status::newGood();

        $lockedPaths = array(); // (lock type => paths) acquired so far by this call
        foreach ( $pathsByType as $type => $paths ) {
            $status->merge( $this->doGetLocksOnServer( $lockSrv, $paths, $type ) );
            if ( !$status->isOK() ) {
                // Roll back what this call already locked. Distinct variable names
                // are used so the outer loop variables are not clobbered.
                foreach ( $lockedPaths as $lockedType => $pathsOfType ) {
                    $status->merge(
                        $this->doFreeLocksOnServer( $lockSrv, $pathsOfType, $lockedType ) );
                }
                break;
            }
            $lockedPaths[$type] = isset( $lockedPaths[$type] )
                ? array_merge( $lockedPaths[$type], $paths )
                : $paths;
        }

        return $status;
    }

    /**
     * Release locks of several types on one server.
     *
     * @TODO: change this code to work in one batch
     *
     * @param string $lockSrv Server name (key of the 'lockServers' config)
     * @param array $pathsByType Map of (lock type => list of paths)
     * @return Status
     */
    protected function freeLocksOnServer( $lockSrv, array $pathsByType ) {
        $status = Status::newGood();

        foreach ( $pathsByType as $type => $paths ) {
            $status->merge( $this->doFreeLocksOnServer( $lockSrv, $paths, $type ) );
        }

        return $status;
    }

    /**
     * Acquire locks of a single type for a list of paths on one server.
     *
     * Lock records are only written if none of the paths conflicts with a
     * non-expired lock held by another session.
     *
     * @param string $lockSrv Server name
     * @param array $paths List of paths
     * @param int|string $type One of the LOCK_SH / LOCK_EX values that index a lock record
     * @return Status Fatal for every path that could not be locked
     */
    protected function doGetLocksOnServer( $lockSrv, array $paths, $type ) {
        $status = Status::newGood();

        $memc = $this->getCache( $lockSrv );
        if ( !$memc ) {
            // getCache() returns null for unknown or unreachable servers; report a
            // failure instead of tripping the type hint of acquireMutexes().
            foreach ( $paths as $path ) {
                $status->fatal( 'lockmanager-fail-acquirelock', $path );
            }
            return $status;
        }
        $keys = array_map( array( $this, 'recordKeyForPath' ), $paths ); // lock records

        // Lock all of the active lock record keys...
        if ( !$this->acquireMutexes( $memc, $keys ) ) {
            foreach ( $paths as $path ) {
                $status->fatal( 'lockmanager-fail-acquirelock', $path );
            }
            return $status;
        }

        // Fetch all the existing lock records...
        $lockRecords = $memc->getMulti( $keys );

        $now = time();
        // Check if the requested locks conflict with existing ones...
        foreach ( $paths as $path ) {
            $locksKey = $this->recordKeyForPath( $path );
            $locksHeld = isset( $lockRecords[$locksKey] )
                ? self::sanitizeLockArray( $lockRecords[$locksKey] )
                : self::newLockArray(); // init
            // An exclusive lock of another session blocks every lock type
            foreach ( $locksHeld[self::LOCK_EX] as $session => $expiry ) {
                if ( $expiry < $now ) { // stale?
                    unset( $locksHeld[self::LOCK_EX][$session] );
                } elseif ( $session !== $this->session ) {
                    $status->fatal( 'lockmanager-fail-acquirelock', $path );
                }
            }
            // Shared locks of other sessions only block exclusive requests
            if ( $type === self::LOCK_EX ) {
                foreach ( $locksHeld[self::LOCK_SH] as $session => $expiry ) {
                    if ( $expiry < $now ) { // stale?
                        unset( $locksHeld[self::LOCK_SH][$session] );
                    } elseif ( $session !== $this->session ) {
                        $status->fatal( 'lockmanager-fail-acquirelock', $path );
                    }
                }
            }
            if ( $status->isOK() ) {
                // Register the session in the lock record array
                $locksHeld[$type][$this->session] = $now + $this->lockTTL;
                // We will update this record if none of the other locks conflict
                $lockRecords[$locksKey] = $locksHeld;
            }
        }

        // If there were no lock conflicts, update all the lock records...
        if ( $status->isOK() ) {
            foreach ( $paths as $path ) {
                $locksKey = $this->recordKeyForPath( $path );
                $locksHeld = $lockRecords[$locksKey];
                $ok = $memc->set( $locksKey, $locksHeld, 7 * 86400 );
                if ( !$ok ) {
                    $status->fatal( 'lockmanager-fail-acquirelock', $path );
                } else {
                    wfDebug( __METHOD__ . ": acquired lock on key $locksKey.\n" );
                }
            }
        }

        // Unlock all of the active lock record keys...
        $this->releaseMutexes( $memc, $keys );

        return $status;
    }

    /**
     * Release locks of a single type for a list of paths on one server.
     *
     * @param string $lockSrv Server name
     * @param array $paths List of paths
     * @param int|string $type One of the LOCK_SH / LOCK_EX values that index a lock record
     * @return Status Warning for paths this session held no such lock on,
     *   fatal for paths whose lock record could not be updated
     */
    protected function doFreeLocksOnServer( $lockSrv, array $paths, $type ) {
        $status = Status::newGood();

        $memc = $this->getCache( $lockSrv );
        if ( !$memc ) {
            // getCache() returns null for unknown or unreachable servers; report a
            // failure instead of tripping the type hint of acquireMutexes().
            foreach ( $paths as $path ) {
                $status->fatal( 'lockmanager-fail-releaselock', $path );
            }
            return $status;
        }
        $keys = array_map( array( $this, 'recordKeyForPath' ), $paths ); // lock records

        // Lock all of the active lock record keys...
        if ( !$this->acquireMutexes( $memc, $keys ) ) {
            foreach ( $paths as $path ) {
                $status->fatal( 'lockmanager-fail-releaselock', $path );
            }
            // Callers merge the result into their own Status, so the Status object
            // (and not null) has to be returned on this path as well.
            return $status;
        }

        // Fetch all the existing lock records...
        $lockRecords = $memc->getMulti( $keys );

        // Remove the requested locks from all records...
        foreach ( $paths as $path ) {
            $locksKey = $this->recordKeyForPath( $path ); // lock record
            if ( !isset( $lockRecords[$locksKey] ) ) {
                $status->warning( 'lockmanager-fail-releaselock', $path );
                continue; // nothing to do
            }
            $locksHeld = self::sanitizeLockArray( $lockRecords[$locksKey] );
            if ( !isset( $locksHeld[$type][$this->session] ) ) {
                $status->warning( 'lockmanager-fail-releaselock', $path );
                continue; // this session holds no lock of that type here
            }
            unset( $locksHeld[$type][$this->session] ); // unregister this session
            if ( $locksHeld === self::newLockArray() ) {
                // No session is left in the record; drop it entirely
                $ok = $memc->delete( $locksKey );
            } else {
                // Same record expiry as used when the lock was acquired
                $ok = $memc->set( $locksKey, $locksHeld, 7 * 86400 );
            }
            if ( $ok ) {
                wfDebug( __METHOD__ . ": released lock on key $locksKey.\n" );
            } else {
                $status->fatal( 'lockmanager-fail-releaselock', $path );
            }
        }

        // Unlock all of the active lock record keys...
        $this->releaseMutexes( $memc, $keys );

        return $status;
    }

    /**
     * Not supported by this lock manager; always reports success without doing anything.
     *
     * @return Status
     */
    protected function releaseAllLocks() {
        return Status::newGood(); // not supported
    }

    /**
     * Check whether a lock server is usable.
     *
     * @param string $lockSrv Server name
     * @return bool True if getCache() yields a cache object for the server
     */
    protected function isServerUp( $lockSrv ) {
        return (bool)$this->getCache( $lockSrv );
    }

    /**
     * Get the cache object for a lock server.
     *
     * The first call for a server writes a one-second ping key to it; the outcome
     * is remembered in $serversUp for the lifetime of this object.
     *
     * @param string $lockSrv Server name
     * @return MemcachedBagOStuff|null Null if the server is unknown or the ping failed
     */
    protected function getCache( $lockSrv ) {
        if ( !isset( $this->bagOStuffs[$lockSrv] ) ) {
            return null; // no such server configured
        }

        $memc = $this->bagOStuffs[$lockSrv];
        if ( !isset( $this->serversUp[$lockSrv] ) ) {
            $this->serversUp[$lockSrv] = $memc->set( __CLASS__ . ':ping', 1, 1 );
            if ( !$this->serversUp[$lockSrv] ) {
                trigger_error( __METHOD__ . ": Could not contact $lockSrv.", E_USER_WARNING );
            }
        }

        // A failed ping means the server appears to be down
        return $this->serversUp[$lockSrv] ? $memc : null;
    }

    /**
     * Get the key of the lock record for a path.
     *
     * @param string $path
     * @return string "<class name>:locks:<hash of the path>"; the hash comes from
     *   sha1Base36Absolute(), which is defined outside this file
     */
    protected function recordKeyForPath( $path ) {
        return implode( ':', array( __CLASS__, 'locks', $this->sha1Base36Absolute( $path ) ) );
    }

    /**
     * Get an empty lock record.
     *
     * @return array Map with an empty array under LOCK_SH and under LOCK_EX
     */
    protected static function newLockArray() {
        return array( self::LOCK_SH => array(), self::LOCK_EX => array() );
    }

    /**
     * Make sure a value fetched from the cache is a usable lock record.
     *
     * @param mixed $a Value to check
     * @return array $a itself if it is an array with both the LOCK_EX and LOCK_SH
     *   entries set; otherwise a new empty lock record (an E_USER_WARNING is raised)
     */
    protected static function sanitizeLockArray( $a ) {
        if ( is_array( $a ) && isset( $a[self::LOCK_EX] ) && isset( $a[self::LOCK_SH] ) ) {
            return $a;
        }

        trigger_error( __METHOD__ . ": reset invalid lock array.", E_USER_WARNING );
        return self::newLockArray();
    }

    /**
     * Acquire the mutexes of a list of lock record keys.
     *
     * Either all mutexes are acquired, or none is held when this method returns.
     * Attempts are repeated for up to 3 seconds.
     *
     * @param MemcachedBagOStuff $memc
     * @param array $keys List of lock record keys
     * @return bool True if every mutex was acquired
     */
    protected function acquireMutexes( MemcachedBagOStuff $memc, array $keys ) {
        $lockedKeys = array();

        // Acquire the keys in lexicographical order, to avoid deadlock problems.
        // If P1 is waiting to acquire a key P2 has, P2 can't also be waiting for a key P1 has.
        sort( $keys );

        // Try to quickly loop to acquire the keys, but back off after a few rounds.
        // This reduces memcached spam, especially in the rare case where a server acquires
        // some lock keys and dies without releasing them. Lock keys expire after a few minutes.
        $rounds = 0;
        $start = microtime( true );
        do {
            if ( ( ++$rounds % 4 ) == 0 ) {
                usleep( 1000 * 50 ); // 50 ms
            }
            foreach ( array_diff( $keys, $lockedKeys ) as $key ) {
                if ( $memc->add( "$key:mutex", 1, 180 ) ) { // lock record
                    $lockedKeys[] = $key;
                } else {
                    // Acquire in order: do not grab later keys while an earlier one
                    // is still taken, or the ordering guarantee above is lost.
                    break;
                }
            }
        } while ( count( $lockedKeys ) < count( $keys ) && ( microtime( true ) - $start ) <= 3 );

        if ( count( $lockedKeys ) != count( $keys ) ) {
            $this->releaseMutexes( $memc, $lockedKeys ); // failed; release what was locked
            return false;
        }

        return true;
    }

    /**
     * Release the mutexes of a list of lock record keys.
     *
     * @param MemcachedBagOStuff $memc
     * @param array $keys List of lock record keys
     */
    protected function releaseMutexes( MemcachedBagOStuff $memc, array $keys ) {
        foreach ( $keys as $key ) {
            $memc->delete( "$key:mutex" );
        }
    }

    /**
     * Release every lock that is still registered in $locksHeld.
     *
     * NOTE(review): the loop only ends once $locksHeld is empty, so it relies on
     * doUnlock() (defined outside this file) removing the entries - confirm.
     */
    public function __destruct() {
        while ( count( $this->locksHeld ) ) {
            foreach ( $this->locksHeld as $path => $locks ) {
                $this->doUnlock( array( $path ), self::LOCK_EX );
                $this->doUnlock( array( $path ), self::LOCK_SH );
            }
        }
    }
}