MediaWiki
REL1_19
|
00001 <?php 00002 00019 class DBLockManager extends LockManager { 00021 protected $dbServers; // (DB name => server config array) 00023 protected $dbsByBucket; // (bucket index => (ldb1, ldb2, ...)) 00025 protected $statusCache; 00026 00027 protected $lockExpiry; // integer number of seconds 00028 protected $safeDelay; // integer number of seconds 00029 00030 protected $session = 0; // random integer 00032 protected $conns = array(); 00033 00057 public function __construct( array $config ) { 00058 $this->dbServers = isset( $config['dbServers'] ) 00059 ? $config['dbServers'] 00060 : array(); // likely just using 'localDBMaster' 00061 // Sanitize dbsByBucket config to prevent PHP errors 00062 $this->dbsByBucket = array_filter( $config['dbsByBucket'], 'is_array' ); 00063 $this->dbsByBucket = array_values( $this->dbsByBucket ); // consecutive 00064 00065 if ( isset( $config['lockExpiry'] ) ) { 00066 $this->lockExpiry = $config['lockExpiry']; 00067 } else { 00068 $met = ini_get( 'max_execution_time' ); 00069 $this->lockExpiry = $met ? $met : 60; // use some sane amount if 0 00070 } 00071 $this->safeDelay = ( $this->lockExpiry <= 0 ) 00072 ? 60 // pick a safe-ish number to match DB timeout default 00073 : $this->lockExpiry; // cover worst case 00074 00075 foreach ( $this->dbsByBucket as $bucket ) { 00076 if ( count( $bucket ) > 1 ) { 00077 // Tracks peers that couldn't be queried recently to avoid lengthy 00078 // connection timeouts. This is useless if each bucket has one peer. 00079 $this->statusCache = wfGetMainCache(); 00080 break; 00081 } 00082 } 00083 00084 $this->session = ''; 00085 for ( $i = 0; $i < 5; $i++ ) { 00086 $this->session .= mt_rand( 0, 2147483647 ); 00087 } 00088 $this->session = wfBaseConvert( sha1( $this->session ), 16, 36, 31 ); 00089 } 00090 00094 protected function doLock( array $paths, $type ) { 00095 $status = Status::newGood(); 00096 00097 $pathsToLock = array(); 00098 // Get locks that need to be acquired (buckets => locks)... 00099 foreach ( $paths as $path ) { 00100 if ( isset( $this->locksHeld[$path][$type] ) ) { 00101 ++$this->locksHeld[$path][$type]; 00102 } elseif ( isset( $this->locksHeld[$path][self::LOCK_EX] ) ) { 00103 $this->locksHeld[$path][$type] = 1; 00104 } else { 00105 $bucket = $this->getBucketFromKey( $path ); 00106 $pathsToLock[$bucket][] = $path; 00107 } 00108 } 00109 00110 $lockedPaths = array(); // files locked in this attempt 00111 // Attempt to acquire these locks... 00112 foreach ( $pathsToLock as $bucket => $paths ) { 00113 // Try to acquire the locks for this bucket 00114 $res = $this->doLockingQueryAll( $bucket, $paths, $type ); 00115 if ( $res === 'cantacquire' ) { 00116 // Resources already locked by another process. 00117 // Abort and unlock everything we just locked. 00118 foreach ( $paths as $path ) { 00119 $status->fatal( 'lockmanager-fail-acquirelock', $path ); 00120 } 00121 $status->merge( $this->doUnlock( $lockedPaths, $type ) ); 00122 return $status; 00123 } elseif ( $res !== true ) { 00124 // Couldn't contact any DBs for this bucket. 00125 // Abort and unlock everything we just locked. 00126 $status->fatal( 'lockmanager-fail-db-bucket', $bucket ); 00127 $status->merge( $this->doUnlock( $lockedPaths, $type ) ); 00128 return $status; 00129 } 00130 // Record these locks as active 00131 foreach ( $paths as $path ) { 00132 $this->locksHeld[$path][$type] = 1; // locked 00133 } 00134 // Keep track of what locks were made in this attempt 00135 $lockedPaths = array_merge( $lockedPaths, $paths ); 00136 } 00137 00138 return $status; 00139 } 00140 00144 protected function doUnlock( array $paths, $type ) { 00145 $status = Status::newGood(); 00146 00147 foreach ( $paths as $path ) { 00148 if ( !isset( $this->locksHeld[$path] ) ) { 00149 $status->warning( 'lockmanager-notlocked', $path ); 00150 } elseif ( !isset( $this->locksHeld[$path][$type] ) ) { 00151 $status->warning( 'lockmanager-notlocked', $path ); 00152 } else { 00153 --$this->locksHeld[$path][$type]; 00154 if ( $this->locksHeld[$path][$type] <= 0 ) { 00155 unset( $this->locksHeld[$path][$type] ); 00156 } 00157 if ( !count( $this->locksHeld[$path] ) ) { 00158 unset( $this->locksHeld[$path] ); // no SH or EX locks left for key 00159 } 00160 } 00161 } 00162 00163 // Reference count the locks held and COMMIT when zero 00164 if ( !count( $this->locksHeld ) ) { 00165 $status->merge( $this->finishLockTransactions() ); 00166 } 00167 00168 return $status; 00169 } 00170 00181 protected function doLockingQuery( $lockDb, array $paths, $type ) { 00182 if ( $type == self::LOCK_EX ) { // writer locks 00183 $db = $this->getConnection( $lockDb ); 00184 if ( !$db ) { 00185 return false; // bad config 00186 } 00187 $keys = array_unique( array_map( 'LockManager::sha1Base36', $paths ) ); 00188 # Build up values for INSERT clause 00189 $data = array(); 00190 foreach ( $keys as $key ) { 00191 $data[] = array( 'fle_key' => $key ); 00192 } 00193 # Wait on any existing writers and block new ones if we get in 00194 $db->insert( 'filelocks_exclusive', $data, __METHOD__ ); 00195 } 00196 return true; 00197 } 00198 00208 protected function doLockingQueryAll( $bucket, array $paths, $type ) { 00209 $yesVotes = 0; // locks made on trustable DBs 00210 $votesLeft = count( $this->dbsByBucket[$bucket] ); // remaining DBs 00211 $quorum = floor( $votesLeft/2 + 1 ); // simple majority 00212 // Get votes for each DB, in order, until we have enough... 00213 foreach ( $this->dbsByBucket[$bucket] as $lockDb ) { 00214 // Check that DB is not *known* to be down 00215 if ( $this->cacheCheckFailures( $lockDb ) ) { 00216 try { 00217 // Attempt to acquire the lock on this DB 00218 if ( !$this->doLockingQuery( $lockDb, $paths, $type ) ) { 00219 return 'cantacquire'; // vetoed; resource locked 00220 } 00221 ++$yesVotes; // success for this peer 00222 if ( $yesVotes >= $quorum ) { 00223 return true; // lock obtained 00224 } 00225 } catch ( DBConnectionError $e ) { 00226 $this->cacheRecordFailure( $lockDb ); 00227 } catch ( DBError $e ) { 00228 if ( $this->lastErrorIndicatesLocked( $lockDb ) ) { 00229 return 'cantacquire'; // vetoed; resource locked 00230 } 00231 } 00232 } 00233 --$votesLeft; 00234 $votesNeeded = $quorum - $yesVotes; 00235 if ( $votesNeeded > $votesLeft ) { 00236 // In "trust cache" mode we don't have to meet the quorum 00237 break; // short-circuit 00238 } 00239 } 00240 // At this point, we must not have meet the quorum 00241 return 'dberrors'; // not enough votes to ensure correctness 00242 } 00243 00251 protected function getConnection( $lockDb ) { 00252 if ( !isset( $this->conns[$lockDb] ) ) { 00253 $db = null; 00254 if ( $lockDb === 'localDBMaster' ) { 00255 $lb = wfGetLBFactory()->newMainLB(); 00256 $db = $lb->getConnection( DB_MASTER ); 00257 } elseif ( isset( $this->dbServers[$lockDb] ) ) { 00258 $config = $this->dbServers[$lockDb]; 00259 $db = DatabaseBase::factory( $config['type'], $config ); 00260 } 00261 if ( !$db ) { 00262 return null; // config error? 00263 } 00264 $this->conns[$lockDb] = $db; 00265 $this->conns[$lockDb]->clearFlag( DBO_TRX ); 00266 # If the connection drops, try to avoid letting the DB rollback 00267 # and release the locks before the file operations are finished. 00268 # This won't handle the case of DB server restarts however. 00269 $options = array(); 00270 if ( $this->lockExpiry > 0 ) { 00271 $options['connTimeout'] = $this->lockExpiry; 00272 } 00273 $this->conns[$lockDb]->setSessionOptions( $options ); 00274 $this->initConnection( $lockDb, $this->conns[$lockDb] ); 00275 } 00276 if ( !$this->conns[$lockDb]->trxLevel() ) { 00277 $this->conns[$lockDb]->begin(); // start transaction 00278 } 00279 return $this->conns[$lockDb]; 00280 } 00281 00290 protected function initConnection( $lockDb, DatabaseBase $db ) {} 00291 00298 protected function finishLockTransactions() { 00299 $status = Status::newGood(); 00300 foreach ( $this->conns as $lockDb => $db ) { 00301 if ( $db->trxLevel() ) { // in transaction 00302 try { 00303 $db->rollback(); // finish transaction and kill any rows 00304 } catch ( DBError $e ) { 00305 $status->fatal( 'lockmanager-fail-db-release', $lockDb ); 00306 } 00307 } 00308 } 00309 return $status; 00310 } 00311 00320 protected function lastErrorIndicatesLocked( $lockDb ) { 00321 if ( isset( $this->conns[$lockDb] ) ) { // sanity 00322 $db = $this->conns[$lockDb]; 00323 return ( $db->wasDeadlock() || $db->wasLockTimeout() ); 00324 } 00325 return false; 00326 } 00327 00335 protected function cacheCheckFailures( $lockDb ) { 00336 if ( $this->statusCache && $this->safeDelay > 0 ) { 00337 $path = $this->getMissKey( $lockDb ); 00338 $misses = $this->statusCache->get( $path ); 00339 return !$misses; 00340 } 00341 return true; 00342 } 00343 00350 protected function cacheRecordFailure( $lockDb ) { 00351 if ( $this->statusCache && $this->safeDelay > 0 ) { 00352 $path = $this->getMissKey( $lockDb ); 00353 $misses = $this->statusCache->get( $path ); 00354 if ( $misses ) { 00355 return $this->statusCache->incr( $path ); 00356 } else { 00357 return $this->statusCache->add( $path, 1, $this->safeDelay ); 00358 } 00359 } 00360 return true; 00361 } 00362 00369 protected function getMissKey( $lockDb ) { 00370 return 'lockmanager:querymisses:' . str_replace( ' ', '_', $lockDb ); 00371 } 00372 00380 protected function getBucketFromKey( $path ) { 00381 $prefix = substr( sha1( $path ), 0, 2 ); // first 2 hex chars (8 bits) 00382 return intval( base_convert( $prefix, 16, 10 ) ) % count( $this->dbsByBucket ); 00383 } 00384 00388 function __destruct() { 00389 foreach ( $this->conns as $lockDb => $db ) { 00390 if ( $db->trxLevel() ) { // in transaction 00391 try { 00392 $db->rollback(); // finish transaction and kill any rows 00393 } catch ( DBError $e ) { 00394 // oh well 00395 } 00396 } 00397 $db->close(); 00398 } 00399 } 00400 } 00401 00408 class MySqlLockManager extends DBLockManager { 00410 protected $lockTypeMap = array( 00411 self::LOCK_SH => self::LOCK_SH, 00412 self::LOCK_UW => self::LOCK_SH, 00413 self::LOCK_EX => self::LOCK_EX 00414 ); 00415 00416 protected function initConnection( $lockDb, DatabaseBase $db ) { 00417 # Let this transaction see lock rows from other transactions 00418 $db->query( "SET SESSION TRANSACTION ISOLATION LEVEL READ UNCOMMITTED;" ); 00419 } 00420 00421 protected function doLockingQuery( $lockDb, array $paths, $type ) { 00422 $db = $this->getConnection( $lockDb ); 00423 if ( !$db ) { 00424 return false; 00425 } 00426 $keys = array_unique( array_map( 'LockManager::sha1Base36', $paths ) ); 00427 # Build up values for INSERT clause 00428 $data = array(); 00429 foreach ( $keys as $key ) { 00430 $data[] = array( 'fls_key' => $key, 'fls_session' => $this->session ); 00431 } 00432 # Block new writers... 00433 $db->insert( 'filelocks_shared', $data, __METHOD__, array( 'IGNORE' ) ); 00434 # Actually do the locking queries... 00435 if ( $type == self::LOCK_SH ) { // reader locks 00436 # Bail if there are any existing writers... 00437 $blocked = $db->selectField( 'filelocks_exclusive', '1', 00438 array( 'fle_key' => $keys ), 00439 __METHOD__ 00440 ); 00441 # Prospective writers that haven't yet updated filelocks_exclusive 00442 # will recheck filelocks_shared after doing so and bail due to our entry. 00443 } else { // writer locks 00444 $encSession = $db->addQuotes( $this->session ); 00445 # Bail if there are any existing writers... 00446 # The may detect readers, but the safe check for them is below. 00447 # Note: if two writers come at the same time, both bail :) 00448 $blocked = $db->selectField( 'filelocks_shared', '1', 00449 array( 'fls_key' => $keys, "fls_session != $encSession" ), 00450 __METHOD__ 00451 ); 00452 if ( !$blocked ) { 00453 # Build up values for INSERT clause 00454 $data = array(); 00455 foreach ( $keys as $key ) { 00456 $data[] = array( 'fle_key' => $key ); 00457 } 00458 # Block new readers/writers... 00459 $db->insert( 'filelocks_exclusive', $data, __METHOD__ ); 00460 # Bail if there are any existing readers... 00461 $blocked = $db->selectField( 'filelocks_shared', '1', 00462 array( 'fls_key' => $keys, "fls_session != $encSession" ), 00463 __METHOD__ 00464 ); 00465 } 00466 } 00467 return !$blocked; 00468 } 00469 }