MediaWiki
REL1_19
|
<?php

/**
 * Lock manager that acquires locks from a set of socket-based lock servers.
 *
 * Each path is hashed into a bucket, and every bucket is served by a list of
 * peer lock servers. A lock on a path is considered acquired once a simple
 * majority of the bucket's peers have granted it.
 *
 * Locks already held by this object are reference counted in
 * $this->locksHeld (not declared in this class; presumably inherited from
 * LockManager), so re-locking a held path does not contact the servers.
 * The servers are only told to release once no locks remain held at all.
 */
class LSLockManager extends LockManager {
	/** @var array Mapping of requested lock types to the types actually used */
	protected $lockTypeMap = array(
		self::LOCK_SH => self::LOCK_SH,
		self::LOCK_UW => self::LOCK_SH,
		self::LOCK_EX => self::LOCK_EX
	);

	/** @var array Map of (server name => server config array) */
	protected $lockServers;
	/** @var array Map of (bucket index => (lsrv1, lsrv2, ...)) */
	protected $srvsByBucket;

	/** @var array Map of (server name => open socket) */
	protected $conns = array();

	protected $connTimeout; // float number of seconds
	protected $session = ''; // random SHA-1 string

	/**
	 * Construct a new instance from configuration.
	 *
	 * Keys of $config read by this class:
	 *   'lockServers'  : map of server name => config array; each config
	 *                    array is read for 'host', 'port' and 'authKey'.
	 *   'srvsByBucket' : list of buckets, each a list of server names.
	 *                    Entries that are not arrays are discarded and the
	 *                    remaining buckets are renumbered from zero.
	 *   'connTimeout'  : optional connect/read timeout in seconds (default 3).
	 *
	 * @param $config Array
	 */
	public function __construct( array $config ) {
		$this->lockServers = $config['lockServers'];
		// Sanitize srvsByBucket config to prevent PHP errors
		$this->srvsByBucket = array_filter( $config['srvsByBucket'], 'is_array' );
		$this->srvsByBucket = array_values( $this->srvsByBucket ); // consecutive

		if ( isset( $config['connTimeout'] ) ) {
			$this->connTimeout = $config['connTimeout'];
		} else {
			$this->connTimeout = 3; // use some sane amount
		}

		// Build a random session identifier; it prefixes every command sent
		$this->session = '';
		for ( $i = 0; $i < 5; $i++ ) {
			$this->session .= mt_rand( 0, 2147483647 );
		}
		$this->session = wfBaseConvert( sha1( $this->session ), 16, 36, 31 );
	}

	/**
	 * Lock the given paths, all or nothing.
	 *
	 * Paths already held with this type (or held exclusively) only get their
	 * reference count adjusted. The rest are grouped by bucket and requested
	 * from the lock servers. If any bucket fails, every change made by this
	 * call (new locks and reference count bumps alike) is rolled back and a
	 * fatal status is returned.
	 *
	 * @param $paths Array List of paths
	 * @param $type integer LockManager::LOCK_* constant
	 * @return Status
	 */
	protected function doLock( array $paths, $type ) {
		$status = Status::newGood();

		$pathsToLock = array(); // (bucket => list of paths needing a server lock)
		$touchedPaths = array(); // paths whose locksHeld entry this call changed
		// Get locks that need to be acquired (buckets => locks)...
		foreach ( $paths as $path ) {
			if ( isset( $this->locksHeld[$path][$type] ) ) {
				++$this->locksHeld[$path][$type];
				$touchedPaths[] = $path;
			} elseif ( isset( $this->locksHeld[$path][self::LOCK_EX] ) ) {
				$this->locksHeld[$path][$type] = 1;
				$touchedPaths[] = $path;
			} else {
				$bucket = $this->getBucketFromKey( $path );
				$pathsToLock[$bucket][] = $path;
			}
		}

		// Attempt to acquire these locks...
		foreach ( $pathsToLock as $bucket => $bucketPaths ) {
			// Try to acquire the locks for this bucket
			$res = $this->doLockingRequestAll( $bucket, $bucketPaths, $type );
			if ( $res !== true ) {
				// Either the resources are locked by another process
				// ('cantacquire') or not enough servers could be reached
				// ('srverrors'). Abort and undo everything done so far,
				// including the reference counts bumped above, since the
				// caller is told that the whole request failed.
				foreach ( $bucketPaths as $path ) {
					$status->fatal( 'lockmanager-fail-acquirelock', $path );
				}
				$status->merge( $this->doUnlock( $touchedPaths, $type ) );
				return $status;
			}
			// Record these locks as active
			foreach ( $bucketPaths as $path ) {
				$this->locksHeld[$path][$type] = 1; // locked
			}
			// Keep track of what locks were made in this attempt
			$touchedPaths = array_merge( $touchedPaths, $bucketPaths );
		}

		return $status;
	}

	/**
	 * Unlock the given paths.
	 *
	 * Decrements the reference count of each path for the given type and
	 * forgets entries that reach zero. The lock servers are only contacted
	 * (with a release-everything command) once no locks are held at all.
	 * Paths that are not held with this type produce a warning.
	 *
	 * @param $paths Array List of paths
	 * @param $type integer LockManager::LOCK_* constant
	 * @return Status
	 */
	protected function doUnlock( array $paths, $type ) {
		$status = Status::newGood();

		foreach ( $paths as $path ) {
			// isset() on the nested key also covers a path with no entry at all
			if ( !isset( $this->locksHeld[$path][$type] ) ) {
				$status->warning( 'lockmanager-notlocked', $path );
				continue;
			}
			--$this->locksHeld[$path][$type];
			if ( $this->locksHeld[$path][$type] <= 0 ) {
				unset( $this->locksHeld[$path][$type] );
			}
			if ( !count( $this->locksHeld[$path] ) ) {
				unset( $this->locksHeld[$path] ); // no SH or EX locks left for key
			}
		}

		// Reference count the locks held and release locks when zero
		if ( !count( $this->locksHeld ) ) {
			$status->merge( $this->releaseLocks() );
		}

		return $status;
	}

	/**
	 * Ask a single lock server to lock the given paths.
	 *
	 * The paths are sent as unique LockManager::sha1Base36() keys in one
	 * ACQUIRE command. Lock types other than LOCK_SH and LOCK_EX are treated
	 * as trivially granted without contacting the server.
	 *
	 * @param $lockSrv string Lock server name
	 * @param $paths Array List of paths
	 * @param $type integer LockManager::LOCK_* constant
	 * @return bool True if the server answered 'ACQUIRED'
	 */
	protected function doLockingRequest( $lockSrv, array $paths, $type ) {
		if ( $type == self::LOCK_SH ) { // reader locks
			$type = 'SH';
		} elseif ( $type == self::LOCK_EX ) { // writer locks
			$type = 'EX';
		} else {
			return true; // ok...
		}

		// Send out the command and get the response...
		$keys = array_unique( array_map( 'LockManager::sha1Base36', $paths ) );
		$response = $this->sendCommand( $lockSrv, 'ACQUIRE', $type, $keys );

		return ( $response === 'ACQUIRED' );
	}

	/**
	 * Send a command to a lock server and read back a one-line response.
	 *
	 * The line written is "<session>:<key>:<action>:<type>:<values>\n",
	 * where <values> is the list joined with '|' and <key> is the SHA-1 of
	 * session, action, type, values and the server's 'authKey' concatenated.
	 *
	 * @param $lockSrv string Lock server name
	 * @param $action string Command name (this class sends 'ACQUIRE' and 'RELEASE_ALL')
	 * @param $type string Lock type string ('SH', 'EX' or '')
	 * @param $values Array List of values (lock keys)
	 * @return string|false Trimmed response line, or false if there is no
	 *   connection or the write/read failed
	 */
	protected function sendCommand( $lockSrv, $action, $type, $values ) {
		$conn = $this->getConnection( $lockSrv );
		if ( !$conn ) {
			return false; // no connection
		}
		$authKey = $this->lockServers[$lockSrv]['authKey'];
		// Build of the command as a flat string...
		$values = implode( '|', $values );
		$key = sha1( $this->session . $action . $type . $values . $authKey );
		// Send out the command...
		if ( fwrite( $conn, "{$this->session}:$key:$action:$type:$values\n" ) === false ) {
			return false;
		}
		// Get the response...
		$response = fgets( $conn );
		if ( $response === false ) {
			return false;
		}
		return trim( $response );
	}

	/**
	 * Try to lock the given paths on a majority of a bucket's lock servers.
	 *
	 * Peers are asked in order until a simple majority has granted the lock.
	 * A peer that can be reached but does not grant the lock vetoes the
	 * whole request. A peer that cannot be connected to is counted as a
	 * missing vote rather than a veto, so the request can still succeed
	 * while a minority of peers is down.
	 *
	 * @param $bucket integer Bucket index
	 * @param $paths Array List of paths
	 * @param $type integer LockManager::LOCK_* constant
	 * @return bool|string True on success, 'cantacquire' if a peer refused,
	 *   'srverrors' if too few peers could be reached to form a majority
	 */
	protected function doLockingRequestAll( $bucket, array $paths, $type ) {
		if ( !isset( $this->srvsByBucket[$bucket] ) ) {
			return 'srverrors'; // no peers are configured for this bucket
		}
		$yesVotes = 0; // locks made on trustable servers
		$votesLeft = count( $this->srvsByBucket[$bucket] ); // remaining peers
		$quorum = floor( $votesLeft / 2 + 1 ); // simple majority
		// Get votes for each peer, in order, until we have enough...
		foreach ( $this->srvsByBucket[$bucket] as $lockSrv ) {
			if ( $this->getConnection( $lockSrv ) ) {
				// Attempt to acquire the lock on this peer
				if ( !$this->doLockingRequest( $lockSrv, $paths, $type ) ) {
					return 'cantacquire'; // vetoed; resource locked
				}
				++$yesVotes; // success for this peer
				if ( $yesVotes >= $quorum ) {
					return true; // lock obtained
				}
			}
			// This peer has voted or is unreachable; either way it is used up
			--$votesLeft;
			$votesNeeded = $quorum - $yesVotes;
			if ( $votesNeeded > $votesLeft ) {
				break; // the remaining peers cannot complete the quorum
			}
		}
		// At this point, we must not have met the quorum
		return 'srverrors'; // not enough votes to ensure correctness
	}

	/**
	 * Get (opening it if needed) the socket connection to a lock server.
	 *
	 * Successful connections are cached in $this->conns and get a read
	 * timeout of $this->connTimeout seconds. Failed attempts are not cached,
	 * so a later call tries to connect again.
	 *
	 * @param $lockSrv string Lock server name
	 * @return resource|null Open socket, or null if the server is not
	 *   configured or the connection could not be established
	 */
	protected function getConnection( $lockSrv ) {
		if ( !isset( $this->conns[$lockSrv] ) ) {
			if ( !isset( $this->lockServers[$lockSrv] ) ) {
				return null; // bucket refers to a server with no configuration
			}
			$cfg = $this->lockServers[$lockSrv];
			wfSuppressWarnings();
			$errno = $errstr = '';
			$conn = fsockopen( $cfg['host'], $cfg['port'], $errno, $errstr, $this->connTimeout );
			wfRestoreWarnings();
			if ( $conn === false ) {
				return null;
			}
			// stream_set_timeout() takes whole seconds plus microseconds as integers
			$sec = (int)floor( $this->connTimeout );
			$usec = (int)floor( ( $this->connTimeout - floor( $this->connTimeout ) ) * 1e6 );
			stream_set_timeout( $conn, $sec, $usec );
			$this->conns[$lockSrv] = $conn;
		}
		return $this->conns[$lockSrv];
	}

	/**
	 * Tell every connected lock server to release all locks of this session.
	 *
	 * Only servers with a cached connection are contacted. A server that
	 * does not answer 'RELEASED_ALL' adds a fatal error to the status.
	 *
	 * @return Status
	 */
	protected function releaseLocks() {
		$status = Status::newGood();
		foreach ( $this->conns as $lockSrv => $conn ) {
			$response = $this->sendCommand( $lockSrv, 'RELEASE_ALL', '', array() );
			if ( $response !== 'RELEASED_ALL' ) {
				$status->fatal( 'lockmanager-fail-svr-release', $lockSrv );
			}
		}
		return $status;
	}

	/**
	 * Get the bucket index for a path.
	 *
	 * The first two hex digits of the path's SHA-1 (a value from 0 to 255)
	 * are reduced modulo the number of configured buckets.
	 *
	 * @param $path string
	 * @return integer Bucket index; 0 when no buckets are configured, in
	 *   which case doLockingRequestAll() reports the lock as unobtainable
	 */
	protected function getBucketFromKey( $path ) {
		$bucketCount = count( $this->srvsByBucket );
		if ( $bucketCount === 0 ) {
			return 0; // avoid a modulo by zero on an empty configuration
		}
		$prefix = substr( sha1( $path ), 0, 2 ); // first 2 hex chars (8 bits)
		return intval( base_convert( $prefix, 16, 10 ) ) % $bucketCount;
	}

	/**
	 * Release all locks of this session and close the server connections.
	 */
	function __destruct() {
		$this->releaseLocks();
		foreach ( $this->conns as $conn ) {
			fclose( $conn );
		}
	}
}