00001 <?php
/**
 * Lock manager that acquires locks by talking to remote lock servers over
 * plain TCP sockets.
 *
 * Paths are hashed into "buckets"; each bucket is served by a list of peer
 * lock servers, and a lock is considered obtained once a simple majority
 * (quorum) of a bucket's peers has granted it. Locks are reference counted
 * locally, and all server-side locks of this session are dropped with a single
 * RELEASE_ALL command once the last local reference is gone (or on destruction).
 *
 * NOTE(review): $this->locksHeld is not declared in this class; it is
 * presumably inherited from LockManager. It is used here as
 * (path => (lock type => reference count)).
 */
class LSLockManager extends LockManager {
	/**
	 * Mapping of lock types to the type actually requested from the servers.
	 * Every type resolves to SH or EX; LOCK_UW is treated as LOCK_SH.
	 * NOTE(review): presumably applied by the parent class before doLock()
	 * and doUnlock() are called - confirm against LockManager.
	 */
	protected $lockTypeMap = array(
		self::LOCK_SH => self::LOCK_SH,
		self::LOCK_UW => self::LOCK_SH,
		self::LOCK_EX => self::LOCK_EX
	);

	protected $lockServers; // (server name => server config array)
	protected $srvsByBucket; // (bucket index => (lsrv1, lsrv2, ...))

	protected $conns = array(); // (server name => open socket resource)

	protected $connTimeout; // float number of seconds
	protected $session = ''; // random SHA-1 derived string, in base 36

	/**
	 * Construct a new instance from configuration.
	 *
	 * $config keys read here:
	 *   - 'lockServers'  : (server name => server config array); each server
	 *                      config is later read for 'host', 'port' and 'authKey'.
	 *   - 'srvsByBucket' : list of buckets, each a list of server names.
	 *                      Non-array entries are discarded and the list is
	 *                      re-indexed from zero.
	 *   - 'connTimeout'  : optional connect/read timeout in seconds (default 3).
	 *
	 * @param $config Array
	 */
	public function __construct( array $config ) {
		$this->lockServers = $config['lockServers'];
		// Sanitize srvsByBucket config to prevent PHP errors, then re-index so
		// that bucket numbers are consecutive (getBucketFromKey() relies on it)
		$this->srvsByBucket = array_values(
			array_filter( $config['srvsByBucket'], 'is_array' )
		);

		$this->connTimeout = isset( $config['connTimeout'] )
			? $config['connTimeout']
			: 3; // use some sane amount

		// Build a random per-instance session identifier
		$seed = '';
		for ( $i = 0; $i < 5; $i++ ) {
			$seed .= mt_rand( 0, 2147483647 );
		}
		$this->session = wfBaseConvert( sha1( $seed ), 16, 36, 31 );
	}

	/**
	 * Lock the given paths with the given lock type.
	 *
	 * Paths already held with this type (or held exclusively) only get their
	 * local reference count adjusted; all other paths are grouped by bucket and
	 * requested from the servers. If any bucket cannot be locked, everything
	 * done by this call is rolled back, including the reference count changes
	 * made for paths that were already held, and a fatal status is returned.
	 *
	 * @param $paths Array List of paths to lock
	 * @param $type integer LockManager::LOCK_* constant
	 * @return Status
	 */
	protected function doLock( array $paths, $type ) {
		$status = Status::newGood();

		$pathsToLock = array(); // (bucket => paths needing a server-side lock)
		$refCountedPaths = array(); // paths that only had their local count bumped
		// Get locks that need to be acquired (buckets => locks)...
		foreach ( $paths as $path ) {
			if ( isset( $this->locksHeld[$path][$type] ) ) {
				++$this->locksHeld[$path][$type];
				$refCountedPaths[] = $path;
			} elseif ( isset( $this->locksHeld[$path][self::LOCK_EX] ) ) {
				// An exclusive lock is already held; no server request needed
				$this->locksHeld[$path][$type] = 1;
				$refCountedPaths[] = $path;
			} else {
				$bucket = $this->getBucketFromKey( $path );
				$pathsToLock[$bucket][] = $path;
			}
		}

		$lockedPaths = array(); // files locked in this attempt
		// Attempt to acquire these locks...
		foreach ( $pathsToLock as $bucket => $bucketPaths ) {
			// Try to acquire the locks for this bucket
			$res = $this->doLockingRequestAll( $bucket, $bucketPaths, $type );
			if ( $res !== true ) {
				// Either the resources are locked by another process
				// ('cantacquire') or too few servers could be reached.
				// Abort and undo everything this call did. The already-held
				// paths are included so that their reference counts do not
				// stay inflated after a failed call.
				foreach ( $bucketPaths as $path ) {
					$status->fatal( 'lockmanager-fail-acquirelock', $path );
				}
				$status->merge( $this->doUnlock(
					array_merge( $refCountedPaths, $lockedPaths ), $type
				) );
				return $status;
			}
			// Record these locks as active
			foreach ( $bucketPaths as $path ) {
				$this->locksHeld[$path][$type] = 1; // locked
			}
			// Keep track of what locks were made in this attempt
			$lockedPaths = array_merge( $lockedPaths, $bucketPaths );
		}

		return $status;
	}

	/**
	 * Drop one local reference of the given type for each path.
	 *
	 * A warning is added for every path that is not currently held with this
	 * type. Nothing is sent to the servers until no locks at all remain held,
	 * at which point releaseLocks() is called.
	 *
	 * @param $paths Array List of paths to unlock
	 * @param $type integer LockManager::LOCK_* constant
	 * @return Status
	 */
	protected function doUnlock( array $paths, $type ) {
		$status = Status::newGood();

		foreach ( $paths as $path ) {
			if ( !isset( $this->locksHeld[$path] ) ) {
				$status->warning( 'lockmanager-notlocked', $path );
			} elseif ( !isset( $this->locksHeld[$path][$type] ) ) {
				$status->warning( 'lockmanager-notlocked', $path );
			} else {
				--$this->locksHeld[$path][$type];
				if ( $this->locksHeld[$path][$type] <= 0 ) {
					unset( $this->locksHeld[$path][$type] );
				}
				if ( !count( $this->locksHeld[$path] ) ) {
					unset( $this->locksHeld[$path] ); // no SH or EX locks left for key
				}
			}
		}

		// Reference count the locks held and release locks when zero
		if ( !count( $this->locksHeld ) ) {
			$status->merge( $this->releaseLocks() );
		}

		return $status;
	}

	/**
	 * Ask a single lock server to acquire locks on the given paths.
	 *
	 * Lock types other than SH and EX are treated as trivially successful and
	 * no command is sent for them.
	 *
	 * @param $lockSrv string Server name
	 * @param $paths Array
	 * @param $type integer LockManager::LOCK_* constant
	 * @return bool True only if the server answered 'ACQUIRED'
	 */
	protected function doLockingRequest( $lockSrv, array $paths, $type ) {
		if ( $type == self::LOCK_SH ) { // reader locks
			$type = 'SH';
		} elseif ( $type == self::LOCK_EX ) { // writer locks
			$type = 'EX';
		} else {
			return true; // ok...
		}

		// Send out the command and get the response...
		$keys = array_unique( array_map( 'LockManager::sha1Base36', $paths ) );
		$response = $this->sendCommand( $lockSrv, 'ACQUIRE', $type, $keys );

		return ( $response === 'ACQUIRED' );
	}

	/**
	 * Send a command line to a lock server and read back one response line.
	 *
	 * The line written is "<session>:<key>:<action>:<type>:<values>\n", where
	 * <values> is the '|'-joined value list and <key> is the SHA-1 of
	 * session, action, type, values and the server's configured 'authKey'
	 * concatenated in that order.
	 *
	 * @param $lockSrv string Server name
	 * @param $action string
	 * @param $type string
	 * @param $values Array
	 * @return string|bool Trimmed response line; false if there is no
	 *   connection or the write/read failed
	 */
	protected function sendCommand( $lockSrv, $action, $type, $values ) {
		$conn = $this->getConnection( $lockSrv );
		if ( !$conn ) {
			return false; // no connection
		}
		$authKey = $this->lockServers[$lockSrv]['authKey'];
		// Build of the command as a flat string...
		$values = implode( '|', $values );
		$key = sha1( $this->session . $action . $type . $values . $authKey );
		// Send out the command...
		if ( fwrite( $conn, "{$this->session}:$key:$action:$type:$values\n" ) === false ) {
			return false;
		}
		// Get the response...
		$response = fgets( $conn );
		if ( $response === false ) {
			return false;
		}
		return trim( $response );
	}

	/**
	 * Attempt to acquire locks on a quorum of the peers serving a bucket.
	 *
	 * Peers are asked in order. A peer that can be reached but does not grant
	 * the lock vetoes the whole request. A peer that cannot be connected to
	 * can neither grant nor veto: it only reduces the number of votes still
	 * available, so the lock can still be obtained from a reachable majority.
	 *
	 * @param $bucket integer Index into $this->srvsByBucket
	 * @param $paths Array List of resource keys to lock
	 * @param $type integer LockManager::LOCK_* constant
	 * @return bool|string True on success; 'cantacquire' if a reachable peer
	 *   refused; 'srverrors' if too few peers were reachable for a quorum
	 */
	protected function doLockingRequestAll( $bucket, array $paths, $type ) {
		$yesVotes = 0; // locks made on trustable servers
		$votesLeft = count( $this->srvsByBucket[$bucket] ); // remaining peers
		$quorum = floor( $votesLeft / 2 + 1 ); // simple majority
		// Get votes for each peer, in order, until we have enough...
		foreach ( $this->srvsByBucket[$bucket] as $lockSrv ) {
			if ( !$this->getConnection( $lockSrv ) ) {
				// Server down? It contributes no vote either way.
				--$votesLeft;
			} elseif ( !$this->doLockingRequest( $lockSrv, $paths, $type ) ) {
				return 'cantacquire'; // vetoed; resource locked
			} else {
				++$yesVotes; // success for this peer
				if ( $yesVotes >= $quorum ) {
					return true; // lock obtained
				}
				--$votesLeft;
			}
			if ( $quorum - $yesVotes > $votesLeft ) {
				break; // short-circuit: the quorum can no longer be reached
			}
		}
		// At this point, we must not have met the quorum
		return 'srverrors'; // not enough votes to ensure correctness
	}

	/**
	 * Get (or lazily open) the socket connection to a lock server.
	 *
	 * Successful connections are cached in $this->conns; failed attempts are
	 * not cached, so a later call tries to connect again. The configured
	 * timeout is used both for connecting and as the stream read timeout.
	 *
	 * @param $lockSrv string Server name
	 * @return resource|null Null if the connection could not be opened
	 */
	protected function getConnection( $lockSrv ) {
		if ( !isset( $this->conns[$lockSrv] ) ) {
			$cfg = $this->lockServers[$lockSrv];
			wfSuppressWarnings();
			$errno = $errstr = '';
			$conn = fsockopen( $cfg['host'], $cfg['port'], $errno, $errstr, $this->connTimeout );
			wfRestoreWarnings();
			if ( $conn === false ) {
				return null;
			}
			// Split the float timeout into whole seconds and microseconds
			$sec = floor( $this->connTimeout );
			$usec = floor( ( $this->connTimeout - $sec ) * 1e6 );
			stream_set_timeout( $conn, $sec, $usec );
			$this->conns[$lockSrv] = $conn;
		}
		return $this->conns[$lockSrv];
	}

	/**
	 * Send RELEASE_ALL to every server that has an open connection.
	 *
	 * A fatal message is added for each server that does not answer
	 * 'RELEASED_ALL'.
	 *
	 * @return Status
	 */
	protected function releaseLocks() {
		$status = Status::newGood();
		foreach ( $this->conns as $lockSrv => $conn ) {
			$response = $this->sendCommand( $lockSrv, 'RELEASE_ALL', '', array() );
			if ( $response !== 'RELEASED_ALL' ) {
				$status->fatal( 'lockmanager-fail-svr-release', $lockSrv );
			}
		}
		return $status;
	}

	/**
	 * Get the bucket index for a resource path.
	 *
	 * The first two hex digits (8 bits) of the path's SHA-1 are taken modulo
	 * the number of configured buckets.
	 *
	 * @param $path string
	 * @return integer
	 */
	protected function getBucketFromKey( $path ) {
		$prefix = substr( sha1( $path ), 0, 2 ); // first 2 hex chars (8 bits)
		return intval( base_convert( $prefix, 16, 10 ) ) % count( $this->srvsByBucket );
	}

	/**
	 * Make sure remaining locks get released and close all open sockets.
	 */
	public function __destruct() {
		$this->releaseLocks();
		foreach ( $this->conns as $conn ) {
			fclose( $conn );
		}
	}
}