MediaWiki
REL1_23
|
<?php
/**
 * Database load balancing.
 *
 * @file
 * @ingroup Database
 */

/**
 * Database load balancing object.
 *
 * Keeps a list of configured servers (index 0 is returned by
 * getWriterIndex() and is treated as the master), picks a reader according to
 * configured load ratios and replication lag, and pools the connections it
 * opens so they can be reused.
 *
 * @ingroup Database
 */
class LoadBalancer {
	/**
	 * $mServers:    server configuration arrays, keyed by server index
	 * $mConns:      connection pools, keyed by 'local', 'foreignUsed' and 'foreignFree'
	 * $mLoads:      configured load ratio of each server, keyed by server index
	 * $mGroupLoads: load ratios per query group: array( group => array( index => ratio ) )
	 */
	private $mServers, $mConns, $mLoads, $mGroupLoads;

	/** Connection object of the last failed connection attempt, or false */
	private $mErrorConnection;

	/**
	 * $mReadIndex:   cached index of the generic reader, -1 until one was picked
	 * $mAllowLagged: if true, lagged slaves may be picked by getReaderIndex()
	 */
	private $mReadIndex, $mAllowLagged;

	/** Master position to wait for before reading from a slave, or false */
	private $mWaitForPos;

	/** Timeout passed to masterPosWait() in doWait() */
	private $mWaitTimeout;

	/**
	 * $mLaggedSlaveMode: set to true by waitFor() when the reader did not catch up
	 * $mLastError:       description of the last error, used by reportConnectionError()
	 */
	private $mLaggedSlaveMode, $mLastError = 'Unknown error';

	/**
	 * $mParentInfo: opaque value read and written through parentInfo()
	 * $mLagTimes:   process cache for getLagTimes(), null when not filled
	 */
	private $mParentInfo, $mLagTimes;

	/**
	 * $mLoadMonitorClass: class name of the load monitor
	 * $mLoadMonitor:      lazily created instance of that class
	 */
	private $mLoadMonitorClass, $mLoadMonitor;

	/**
	 * @param array $params Array with keys:
	 *   - servers: Required. Array of server info structures; each one must
	 *     have a 'load' key and may have a 'groupLoads' key.
	 *   - waitTimeout: Optional. Timeout used when waiting for a slave,
	 *     defaults to 10.
	 *   - loadMonitor: Optional. Name of a class used to fetch server lag and
	 *     load. Defaults to LoadMonitorMySQL when the first server has
	 *     type 'mysql', LoadMonitorNull otherwise.
	 * @throws MWException When the 'servers' key is missing
	 */
	function __construct( $params ) {
		if ( !isset( $params['servers'] ) ) {
			throw new MWException( __CLASS__ . ': missing servers parameter' );
		}
		$this->mServers = $params['servers'];

		if ( isset( $params['waitTimeout'] ) ) {
			$this->mWaitTimeout = $params['waitTimeout'];
		} else {
			$this->mWaitTimeout = 10;
		}

		$this->mReadIndex = -1;
		// NOTE(review): mWriteIndex is not a declared property and nothing in this
		// file reads it; the assignment is kept so that external code that might
		// inspect it keeps working.
		$this->mWriteIndex = -1;
		$this->mConns = array(
			'local' => array(),
			'foreignUsed' => array(),
			'foreignFree' => array() );
		$this->mLoads = array();
		$this->mWaitForPos = false;
		$this->mLaggedSlaveMode = false;
		$this->mErrorConnection = false;
		$this->mAllowLagged = false;

		if ( isset( $params['loadMonitor'] ) ) {
			$this->mLoadMonitorClass = $params['loadMonitor'];
		} else {
			// The first configured server decides the default monitor class
			$master = reset( $params['servers'] );
			if ( isset( $master['type'] ) && $master['type'] === 'mysql' ) {
				$this->mLoadMonitorClass = 'LoadMonitorMySQL';
			} else {
				$this->mLoadMonitorClass = 'LoadMonitorNull';
			}
		}

		foreach ( $params['servers'] as $i => $server ) {
			$this->mLoads[$i] = $server['load'];
			if ( isset( $server['groupLoads'] ) ) {
				foreach ( $server['groupLoads'] as $group => $ratio ) {
					if ( !isset( $this->mGroupLoads[$group] ) ) {
						$this->mGroupLoads[$group] = array();
					}
					$this->mGroupLoads[$group][$i] = $ratio;
				}
			}
		}
	}

	/**
	 * Get the load monitor, creating it from the configured class on first use.
	 *
	 * @return object Instance of the configured load monitor class
	 */
	function getLoadMonitor() {
		if ( !isset( $this->mLoadMonitor ) ) {
			$class = $this->mLoadMonitorClass;
			$this->mLoadMonitor = new $class( $this );
		}

		return $this->mLoadMonitor;
	}

	/**
	 * Get or set the arbitrary "parent info" value (see wfSetVar()).
	 *
	 * @param mixed $x New value, or null to leave the value unchanged
	 * @return mixed Result of wfSetVar()
	 */
	function parentInfo( $x = null ) {
		return wfSetVar( $this->mParentInfo, $x );
	}

	/**
	 * Pick a key from an array of weights; thin wrapper around
	 * ArrayUtils::pickRandom().
	 *
	 * @param array $weights Array of key => weight
	 * @return mixed Result of ArrayUtils::pickRandom()
	 */
	function pickRandom( $weights ) {
		return ArrayUtils::pickRandom( $weights );
	}

	/**
	 * Pick a random server index from $loads, ignoring excessively lagged
	 * and non-replicating servers.
	 *
	 * @param array $loads Array of server index => load ratio
	 * @param bool|string $wiki Wiki to get lag times for, or false
	 * @return bool|int|string Server index, or false when no suitable server remains
	 */
	function getRandomNonLagged( $loads, $wiki = false ) {
		# Unset excessively lagged servers
		$lags = $this->getLagTimes( $wiki );
		foreach ( $lags as $i => $lag ) {
			// Index 0 is never removed here, whatever its reported lag is
			if ( $i != 0 ) {
				if ( $lag === false ) {
					wfDebugLog( 'replication', "Server #$i is not replicating" );
					unset( $loads[$i] );
				} elseif ( isset( $this->mServers[$i]['max lag'] ) && $lag > $this->mServers[$i]['max lag'] ) {
					wfDebugLog( 'replication', "Server #$i is excessively lagged ($lag seconds)" );
					unset( $loads[$i] );
				}
			}
		}

		# Find out if all the slaves with non-zero load are lagged
		$sum = 0;
		foreach ( $loads as $load ) {
			$sum += $load;
		}
		if ( $sum == 0 ) {
			# No appropriate DB servers except maybe the master and some slaves with zero load
			# Do NOT use the master
			# Instead, this function will return false, triggering read-only mode,
			# and a lagged slave will be used instead.
			return false;
		}

		if ( count( $loads ) == 0 ) {
			return false;
		}

		#wfDebugLog( 'connect', var_export( $loads, true ) );

		# Return a random representative of the remainder
		return ArrayUtils::pickRandom( $loads );
	}

	/**
	 * Get the index of the reader connection, which may be a slave.
	 *
	 * Takes replication lag into account unless lagged servers are allowed or
	 * $wgReadOnly is set. If every candidate is lagged, $wgReadOnly is set to
	 * an explanatory message and a lagged server is used.
	 *
	 * Side effect: opens connections, and caches the chosen index as the
	 * generic reader when no group was requested.
	 *
	 * @param bool|string $group Query group, or false for the generic reader
	 * @param bool|string $wiki Wiki ID, or false for the current wiki
	 * @throws MWException When the relevant load array is empty
	 * @return bool|int|string Server index, or false on failure
	 */
	function getReaderIndex( $group = false, $wiki = false ) {
		global $wgReadOnly, $wgDBtype;

		# @todo FIXME: For now, only go through all this for mysql databases
		if ( $wgDBtype != 'mysql' ) {
			return $this->getWriterIndex();
		}

		if ( count( $this->mServers ) == 1 ) {
			# Skip the load balancing if there's only one server
			return 0;
		} elseif ( $group === false && $this->mReadIndex >= 0 ) {
			# Shortcut if generic reader exists already
			return $this->mReadIndex;
		}

		$section = new ProfileSection( __METHOD__ );

		# Find the relevant load array
		if ( $group !== false ) {
			if ( isset( $this->mGroupLoads[$group] ) ) {
				$nonErrorLoads = $this->mGroupLoads[$group];
			} else {
				# No loads for this group, return false and the caller can use some other group
				wfDebug( __METHOD__ . ": no loads for group $group\n" );

				return false;
			}
		} else {
			$nonErrorLoads = $this->mLoads;
		}

		if ( !count( $nonErrorLoads ) ) {
			throw new MWException( "Empty server array given to LoadBalancer" );
		}

		# Scale the configured load ratios according to the dynamic load (if the load monitor supports it)
		$this->getLoadMonitor()->scaleLoads( $nonErrorLoads, $group, $wiki );

		$laggedSlaveMode = false;

		# No server found yet
		$i = false;
		# First try quickly looking through the available servers for a server that
		# meets our criteria
		$currentLoads = $nonErrorLoads;
		while ( count( $currentLoads ) ) {
			if ( $wgReadOnly || $this->mAllowLagged || $laggedSlaveMode ) {
				$i = ArrayUtils::pickRandom( $currentLoads );
			} else {
				$i = $this->getRandomNonLagged( $currentLoads, $wiki );
				if ( $i === false && count( $currentLoads ) != 0 ) {
					# All slaves lagged. Switch to read-only mode
					wfDebugLog( 'replication', "All slaves lagged. Switch to read-only mode" );
					$wgReadOnly = 'The database has been automatically locked ' .
						'while the slave database servers catch up to the master';
					$i = ArrayUtils::pickRandom( $currentLoads );
					$laggedSlaveMode = true;
				}
			}

			if ( $i === false ) {
				# pickRandom() returned false
				# This is permanent and means the configuration or the load monitor
				# wants us to return false.
				wfDebugLog( 'connect', __METHOD__ . ": pickRandom() returned false" );

				return false;
			}

			wfDebugLog( 'connect', __METHOD__ .
				": Using reader #$i: {$this->mServers[$i]['host']}..." );

			$conn = $this->openConnection( $i, $wiki );
			if ( !$conn ) {
				wfDebugLog( 'connect', __METHOD__ . ": Failed connecting to $i/$wiki" );
				// Drop the failed server and try again with the remaining ones
				unset( $nonErrorLoads[$i] );
				unset( $currentLoads[$i] );
				$i = false;
				continue;
			}

			// Decrement reference counter, we are finished with this connection.
			// It will be incremented for the caller later.
			if ( $wiki !== false ) {
				$this->reuseConnection( $conn );
			}

			# Return this server
			break;
		}

		# If all servers were down, quit now
		if ( !count( $nonErrorLoads ) ) {
			wfDebugLog( 'connect', "All servers down" );
		}

		if ( $i !== false ) {
			# Slave connection successful
			# Wait for the session master pos for a short time
			if ( $this->mWaitForPos && $i > 0 ) {
				if ( !$this->doWait( $i ) ) {
					$this->mServers[$i]['slave pos'] = $conn->getSlavePos();
				}
			}
			// Only a generic lookup may populate the generic reader cache. This
			// mirrors the shortcut at the top of this method; caching the result
			// of a group lookup would make later generic reads use a server that
			// was chosen by that group's load ratios.
			if ( $this->mReadIndex <= 0 && $this->mLoads[$i] > 0 && $group === false ) {
				$this->mReadIndex = $i;
			}
		}

		return $i;
	}

	/**
	 * Wait for a specified number of microseconds, and return the period waited.
	 *
	 * @param int $t Microseconds to sleep
	 * @return int The value of $t
	 */
	function sleep( $t ) {
		wfProfileIn( __METHOD__ );
		wfDebug( __METHOD__ . ": waiting $t us\n" );
		usleep( $t );
		wfProfileOut( __METHOD__ );

		return $t;
	}

	/**
	 * Set the master wait position.
	 *
	 * If a generic reader (index > 0) was already chosen, wait on it now. When
	 * that wait fails, record the reader's current slave position (if a
	 * connection to it is still open) and switch to lagged slave mode.
	 *
	 * @param mixed $pos Master position, passed on to masterPosWait()
	 */
	public function waitFor( $pos ) {
		wfProfileIn( __METHOD__ );
		$this->mWaitForPos = $pos;
		$i = $this->mReadIndex;

		if ( $i > 0 ) {
			if ( !$this->doWait( $i ) ) {
				// doWait() also fails when no connection to the reader is open;
				// getAnyOpenConnection() returns false in that case, so guard
				// the call instead of invoking a method on false.
				$conn = $this->getAnyOpenConnection( $i );
				if ( $conn ) {
					$this->mServers[$i]['slave pos'] = $conn->getSlavePos();
				}
				$this->mLaggedSlaveMode = true;
			}
		}
		wfProfileOut( __METHOD__ );
	}

	/**
	 * Set the master wait position and wait for ALL servers with a sequential
	 * index >= 1 and a load > 0 to catch up to it, opening connections as needed.
	 *
	 * @param mixed $pos Master position, passed on to masterPosWait()
	 */
	public function waitForAll( $pos ) {
		wfProfileIn( __METHOD__ );
		$this->mWaitForPos = $pos;
		$serverCount = count( $this->mServers );
		for ( $i = 1; $i < $serverCount; $i++ ) {
			if ( $this->mLoads[$i] > 0 ) {
				$this->doWait( $i, true );
			}
		}
		wfProfileOut( __METHOD__ );
	}

	/**
	 * Get any open connection to a given server index, local or foreign.
	 *
	 * @param int $i Server index
	 * @return object|bool The first pooled connection found, or false if there is none
	 */
	function getAnyOpenConnection( $i ) {
		foreach ( $this->mConns as $conns ) {
			if ( !empty( $conns[$i] ) ) {
				return reset( $conns[$i] );
			}
		}

		return false;
	}

	/**
	 * Wait for a given slave to catch up to the master position stored in
	 * $this->mWaitForPos.
	 *
	 * @param int $index Server index
	 * @param bool $open Whether to open a connection if none is open yet
	 * @return bool True when the slave caught up, false on timeout or when no
	 *   connection was available
	 */
	protected function doWait( $index, $open = false ) {
		# Find a connection to wait on
		$conn = $this->getAnyOpenConnection( $index );
		if ( !$conn ) {
			if ( !$open ) {
				wfDebug( __METHOD__ . ": no connection open\n" );

				return false;
			} else {
				// NOTE(review): '' !== false, so openConnection() takes the
				// foreign-connection path here; kept as in the original.
				$conn = $this->openConnection( $index, '' );
				if ( !$conn ) {
					wfDebug( __METHOD__ . ": failed to open connection\n" );

					return false;
				}
			}
		}

		wfDebug( __METHOD__ . ": Waiting for slave #$index to catch up...\n" );
		$result = $conn->masterPosWait( $this->mWaitForPos, $this->mWaitTimeout );

		if ( $result == -1 || is_null( $result ) ) {
			# Timed out waiting for slave, use master instead
			wfDebug( __METHOD__ . ": Timed out waiting for slave #$index pos {$this->mWaitForPos}\n" );

			return false;
		} else {
			wfDebug( __METHOD__ . ": Done\n" );

			return true;
		}
	}

	/**
	 * Get a connection by index.
	 *
	 * This is the main entry point for this class.
	 *
	 * @param int $i Server index, or DB_MASTER / DB_SLAVE
	 * @param array|string $groups Query group(s); the first group that yields
	 *   a reader index wins and overrides $i (ignored for DB_MASTER)
	 * @param bool|string $wiki Wiki ID, or false for the current wiki
	 *
	 * @throws MWException When $i is DB_LAST, null or false
	 * @return object|bool The connection; on failure reportConnectionError()
	 *   is called, which throws
	 */
	public function &getConnection( $i, $groups = array(), $wiki = false ) {
		wfProfileIn( __METHOD__ );

		if ( $i == DB_LAST ) {
			wfProfileOut( __METHOD__ );
			throw new MWException( 'Attempt to call ' . __METHOD__ .
				' with deprecated server index DB_LAST' );
		} elseif ( $i === null || $i === false ) {
			wfProfileOut( __METHOD__ );
			throw new MWException( 'Attempt to call ' . __METHOD__ .
				' with invalid server index' );
		}

		// The current wiki is handled as a local (non-foreign) connection
		if ( $wiki === wfWikiID() ) {
			$wiki = false;
		}

		# Query groups
		if ( $i == DB_MASTER ) {
			$i = $this->getWriterIndex();
		} elseif ( !is_array( $groups ) ) {
			$groupIndex = $this->getReaderIndex( $groups, $wiki );
			if ( $groupIndex !== false ) {
				$serverName = $this->getServerName( $groupIndex );
				wfDebug( __METHOD__ . ": using server $serverName for group $groups\n" );
				$i = $groupIndex;
			}
		} else {
			foreach ( $groups as $group ) {
				$groupIndex = $this->getReaderIndex( $group, $wiki );
				if ( $groupIndex !== false ) {
					$serverName = $this->getServerName( $groupIndex );
					wfDebug( __METHOD__ . ": using server $serverName for group $group\n" );
					$i = $groupIndex;
					break;
				}
			}
		}

		# Operation-based index
		if ( $i == DB_SLAVE ) {
			$this->mLastError = 'Unknown error'; // reset error string
			$i = $this->getReaderIndex( false, $wiki );
			# Couldn't find a working server in getReaderIndex()?
			if ( $i === false ) {
				$this->mLastError = 'No working slave server: ' . $this->mLastError;
				wfProfileOut( __METHOD__ );

				return $this->reportConnectionError();
			}
		}

		# Now we have an explicit index into the servers array
		$conn = $this->openConnection( $i, $wiki );
		if ( !$conn ) {
			wfProfileOut( __METHOD__ );

			return $this->reportConnectionError();
		}

		wfProfileOut( __METHOD__ );

		return $conn;
	}

	/**
	 * Mark a foreign connection as being available for reuse under a
	 * different DB name or prefix.
	 *
	 * Decrements the connection's reference count; when it reaches zero the
	 * connection is moved from the 'foreignUsed' pool to the 'foreignFree'
	 * pool. Connections that were not opened as foreign connections are
	 * ignored.
	 *
	 * @param object $conn Connection previously returned by this load balancer
	 * @throws MWException When the connection is not in the 'foreignUsed' pool
	 */
	public function reuseConnection( $conn ) {
		$serverIndex = $conn->getLBInfo( 'serverIndex' );
		$refCount = $conn->getLBInfo( 'foreignPoolRefCount' );
		if ( $serverIndex === null || $refCount === null ) {
			wfDebug( __METHOD__ . ": this connection was not opened as a foreign connection\n" );

			return;
		}

		// Rebuild the pool key ("dbname" or "dbname-prefix") from the connection
		$dbName = $conn->getDBname();
		$prefix = $conn->tablePrefix();
		if ( strval( $prefix ) !== '' ) {
			$wiki = "$dbName-$prefix";
		} else {
			$wiki = $dbName;
		}
		if ( $this->mConns['foreignUsed'][$serverIndex][$wiki] !== $conn ) {
			throw new MWException( __METHOD__ . ": connection not found, has " .
				"the connection been freed already?" );
		}
		$conn->setLBInfo( 'foreignPoolRefCount', --$refCount );
		if ( $refCount <= 0 ) {
			$this->mConns['foreignFree'][$serverIndex][$wiki] = $conn;
			unset( $this->mConns['foreignUsed'][$serverIndex][$wiki] );
			wfDebug( __METHOD__ . ": freed connection $serverIndex/$wiki\n" );
		} else {
			wfDebug( __METHOD__ . ": reference count for $serverIndex/$wiki reduced to $refCount\n" );
		}
	}

	/**
	 * Get a database connection handle reference.
	 *
	 * The connection is obtained immediately and wrapped in a DBConnRef, which
	 * calls reuseConnection() when it is destroyed.
	 *
	 * @see LoadBalancer::getConnection() for parameter information
	 *
	 * @param int $db
	 * @param array|string $groups
	 * @param bool|string $wiki
	 * @return DBConnRef
	 */
	public function getConnectionRef( $db, $groups = array(), $wiki = false ) {
		return new DBConnRef( $this, $this->getConnection( $db, $groups, $wiki ) );
	}

	/**
	 * Get a database connection handle reference without connecting yet.
	 *
	 * The DBConnRef only stores the parameters and calls getConnection() on
	 * its first method call.
	 *
	 * @see LoadBalancer::getConnection() for parameter information
	 *
	 * @param int $db
	 * @param array|string $groups
	 * @param bool|string $wiki
	 * @return DBConnRef
	 */
	public function getLazyConnectionRef( $db, $groups = array(), $wiki = false ) {
		return new DBConnRef( $this, array( $db, $groups, $wiki ) );
	}

	/**
	 * Open a connection to the server given by the specified index.
	 * Index must be an actual index into the server array.
	 *
	 * When $wiki is not false the work is delegated to
	 * openForeignConnection(); otherwise the single pooled local connection
	 * for the server is reused or created.
	 *
	 * @param int $i Server index
	 * @param bool|string $wiki Wiki ID to open, or false for the local wiki
	 * @return object|bool The connection, or false on failure (the failed
	 *   connection object is then kept in $this->mErrorConnection)
	 * @access private
	 */
	function openConnection( $i, $wiki = false ) {
		wfProfileIn( __METHOD__ );
		if ( $wiki !== false ) {
			$conn = $this->openForeignConnection( $i, $wiki );
			wfProfileOut( __METHOD__ );

			return $conn;
		}
		if ( isset( $this->mConns['local'][$i][0] ) ) {
			$conn = $this->mConns['local'][$i][0];
		} else {
			$server = $this->mServers[$i];
			$server['serverIndex'] = $i;
			$conn = $this->reallyOpenConnection( $server, false );
			if ( $conn->isOpen() ) {
				wfDebug( "Connected to database $i at {$this->mServers[$i]['host']}\n" );
				$this->mConns['local'][$i][0] = $conn;
			} else {
				wfDebug( "Failed to connect to database $i at {$this->mServers[$i]['host']}\n" );
				$this->mErrorConnection = $conn;
				$conn = false;
			}
		}
		wfProfileOut( __METHOD__ );

		return $conn;
	}

	/**
	 * Open a connection to a foreign DB, or return one if it is already open.
	 *
	 * Increments the connection's reference count; the caller is expected to
	 * hand the connection back through reuseConnection().
	 *
	 * Candidates are tried in this order:
	 *   - a connection already in use for the same wiki
	 *   - a free connection for the same wiki
	 *   - a free connection for another wiki on the same server, switched
	 *     over with selectDB()
	 *   - a newly opened connection
	 *
	 * On error, returns false and stores the failed connection in
	 * $this->mErrorConnection.
	 *
	 * @param int $i Server index
	 * @param string $wiki Wiki ID to open
	 * @return object|bool The connection, or false on failure
	 */
	function openForeignConnection( $i, $wiki ) {
		wfProfileIn( __METHOD__ );
		list( $dbName, $prefix ) = wfSplitWikiID( $wiki );
		if ( isset( $this->mConns['foreignUsed'][$i][$wiki] ) ) {
			// Reuse an already-used connection
			$conn = $this->mConns['foreignUsed'][$i][$wiki];
			wfDebug( __METHOD__ . ": reusing connection $i/$wiki\n" );
		} elseif ( isset( $this->mConns['foreignFree'][$i][$wiki] ) ) {
			// Reuse a free connection for the same wiki
			$conn = $this->mConns['foreignFree'][$i][$wiki];
			unset( $this->mConns['foreignFree'][$i][$wiki] );
			$this->mConns['foreignUsed'][$i][$wiki] = $conn;
			wfDebug( __METHOD__ . ": reusing free connection $i/$wiki\n" );
		} elseif ( !empty( $this->mConns['foreignFree'][$i] ) ) {
			// Reuse a connection from another wiki
			$conn = reset( $this->mConns['foreignFree'][$i] );
			$oldWiki = key( $this->mConns['foreignFree'][$i] );

			if ( !$conn->selectDB( $dbName ) ) {
				$this->mLastError = "Error selecting database $dbName on server " .
					$conn->getServer() . " from client host " . wfHostname() . "\n";
				$this->mErrorConnection = $conn;
				$conn = false;
			} else {
				$conn->tablePrefix( $prefix );
				unset( $this->mConns['foreignFree'][$i][$oldWiki] );
				$this->mConns['foreignUsed'][$i][$wiki] = $conn;
				wfDebug( __METHOD__ . ": reusing free connection from $oldWiki for $wiki\n" );
			}
		} else {
			// Open a new connection
			$server = $this->mServers[$i];
			$server['serverIndex'] = $i;
			$server['foreignPoolRefCount'] = 0;
			$server['foreign'] = true;
			$conn = $this->reallyOpenConnection( $server, $dbName );
			if ( !$conn->isOpen() ) {
				wfDebug( __METHOD__ . ": error opening connection for $i/$wiki\n" );
				$this->mErrorConnection = $conn;
				$conn = false;
			} else {
				$conn->tablePrefix( $prefix );
				$this->mConns['foreignUsed'][$i][$wiki] = $conn;
				wfDebug( __METHOD__ . ": opened new connection for $i/$wiki\n" );
			}
		}

		// Increment reference count
		if ( $conn ) {
			$refCount = $conn->getLBInfo( 'foreignPoolRefCount' );
			$conn->setLBInfo( 'foreignPoolRefCount', $refCount + 1 );
		}
		wfProfileOut( __METHOD__ );

		return $conn;
	}

	/**
	 * Test if the specified index has a pooled connection.
	 *
	 * @param int $index Server index
	 * @return bool False when $index is not an integer or no connection is pooled
	 * @access private
	 */
	function isOpen( $index ) {
		if ( !is_integer( $index ) ) {
			return false;
		}

		return (bool)$this->getAnyOpenConnection( $index );
	}

	/**
	 * Really opens a connection. Uncached.
	 * Returns a database object whether or not the connection was successful.
	 *
	 * @param array $server Server info structure; must have a 'type' key
	 * @param bool|string $dbNameOverride Database name to use instead of
	 *   $server['dbname'], or false to keep it
	 * @throws MWException When $server is not an array
	 * @return object The database object (check isOpen() on it)
	 * @access private
	 */
	function reallyOpenConnection( $server, $dbNameOverride = false ) {
		if ( !is_array( $server ) ) {
			throw new MWException( 'You must update your load-balancing configuration. ' .
				'See DefaultSettings.php entry for $wgDBservers.' );
		}

		if ( $dbNameOverride !== false ) {
			$server['dbname'] = $dbNameOverride;
		}

		# Create object
		try {
			$db = DatabaseBase::factory( $server['type'], $server );
		} catch ( DBConnectionError $e ) {
			// FIXME: This is probably the ugliest thing I have ever done to
			// PHP. I'm half-expecting it to segfault, just out of disgust. -- TS
			$db = $e->db;
		}

		// The whole server array (including serverIndex and, for foreign
		// connections, foreignPoolRefCount) becomes the connection's LB info
		$db->setLBInfo( $server );
		if ( isset( $server['fakeSlaveLag'] ) ) {
			$db->setFakeSlaveLag( $server['fakeSlaveLag'] );
		}
		if ( isset( $server['fakeMaster'] ) ) {
			$db->setFakeMaster( true );
		}

		return $db;
	}

	/**
	 * Log and report the last connection error.
	 *
	 * @throws DBConnectionError Directly when no failed connection object is
	 *   known, otherwise through the connection's reportConnectionError()
	 * @return bool Never actually returns
	 */
	private function reportConnectionError() {
		$conn = $this->mErrorConnection; // The connection which caused the error

		if ( !is_object( $conn ) ) {
			// No last connection, probably due to all servers being too busy
			wfLogDBError( "LB failure with no last connection. Connection error: {$this->mLastError}" );

			// If all servers were busy, mLastError will contain something sensible
			throw new DBConnectionError( null, $this->mLastError );
		} else {
			$server = $conn->getProperty( 'mServer' );
			wfLogDBError( "Connection error: {$this->mLastError} ({$server})" );
			$conn->reportConnectionError( "{$this->mLastError} ({$server})" ); // throws DBConnectionError
		}

		return false; /* not reached */
	}

	/**
	 * Get the index of the writer (master) server.
	 *
	 * @return int Always 0
	 */
	function getWriterIndex() {
		return 0;
	}

	/**
	 * Returns true if the specified index is a valid server index.
	 *
	 * @param int|string $i Server index
	 * @return bool
	 */
	function haveIndex( $i ) {
		return array_key_exists( $i, $this->mServers );
	}

	/**
	 * Returns true if the specified index is valid and has non-zero load.
	 *
	 * @param int|string $i Server index
	 * @return bool
	 */
	function isNonZeroLoad( $i ) {
		return array_key_exists( $i, $this->mServers ) && $this->mLoads[$i] != 0;
	}

	/**
	 * Get the number of defined servers (not the number of open connections).
	 *
	 * @return int
	 */
	function getServerCount() {
		return count( $this->mServers );
	}

	/**
	 * Get the host name or IP address of the server with the specified index.
	 * Prefers a nice name ('hostName') if one is set.
	 *
	 * @param int|string $i Server index
	 * @return string The name, or '' when neither 'hostName' nor 'host' is set
	 */
	function getServerName( $i ) {
		if ( isset( $this->mServers[$i]['hostName'] ) ) {
			return $this->mServers[$i]['hostName'];
		} elseif ( isset( $this->mServers[$i]['host'] ) ) {
			return $this->mServers[$i]['host'];
		} else {
			return '';
		}
	}

	/**
	 * Return the server info structure for a given index.
	 *
	 * @param int|string $i Server index
	 * @return array|bool The structure, or false if the index is invalid
	 */
	function getServerInfo( $i ) {
		if ( isset( $this->mServers[$i] ) ) {
			return $this->mServers[$i];
		} else {
			return false;
		}
	}

	/**
	 * Set the server info structure for the given index. Entry at index $i
	 * is created if it doesn't exist. Loads cached in the constructor are not
	 * updated by this method.
	 *
	 * @param int|string $i Server index
	 * @param array $serverInfo Server info structure
	 */
	function setServerInfo( $i, $serverInfo ) {
		$this->mServers[$i] = $serverInfo;
	}

	/**
	 * Get the current master position for chronology control purposes.
	 *
	 * @return mixed The position, or false when no connection is open at all
	 */
	function getMasterPos() {
		# If this entire request was served from a slave without opening a connection to the
		# master (however unlikely that may be), then we can fetch the position from the slave.
		$masterConn = $this->getAnyOpenConnection( 0 );
		if ( !$masterConn ) {
			$serverCount = count( $this->mServers );
			for ( $i = 1; $i < $serverCount; $i++ ) {
				$conn = $this->getAnyOpenConnection( $i );
				if ( $conn ) {
					wfDebug( "Master pos fetched from slave\n" );

					return $conn->getSlavePos();
				}
			}
		} else {
			wfDebug( "Master pos fetched from master\n" );

			return $masterConn->getMasterPos();
		}

		return false;
	}

	/**
	 * Close all pooled connections and reset the pools.
	 */
	function closeAll() {
		foreach ( $this->mConns as $conns2 ) {
			foreach ( $conns2 as $conns3 ) {
				foreach ( $conns3 as $conn ) {
					$conn->close();
				}
			}
		}
		$this->mConns = array(
			'local' => array(),
			'foreignFree' => array(),
			'foreignUsed' => array(),
		);
	}

	/**
	 * Deprecated, misspelled alias of closeConnection().
	 *
	 * @deprecated since 1.18
	 * @param object $conn Connection to close
	 */
	function closeConnecton( $conn ) {
		wfDeprecated( __METHOD__, '1.18' );
		$this->closeConnection( $conn );
	}

	/**
	 * Close a connection and remove it from the pools.
	 * A connection that is not pooled here is closed all the same.
	 *
	 * @param object $conn Connection to close
	 */
	function closeConnection( $conn ) {
		$done = false;
		foreach ( $this->mConns as $i1 => $conns2 ) {
			foreach ( $conns2 as $i2 => $conns3 ) {
				foreach ( $conns3 as $i3 => $candidateConn ) {
					if ( $conn === $candidateConn ) {
						$conn->close();
						unset( $this->mConns[$i1][$i2][$i3] );
						$done = true;
						// NOTE(review): leaves only the innermost loop; the
						// outer loops keep scanning the remaining pools.
						break;
					}
				}
			}
		}
		if ( !$done ) {
			$conn->close();
		}
	}

	/**
	 * Commit transactions on all pooled connections that have one open.
	 */
	function commitAll() {
		foreach ( $this->mConns as $conns2 ) {
			foreach ( $conns2 as $conns3 ) {
				foreach ( $conns3 as $conn ) {
					if ( $conn->trxLevel() ) {
						$conn->commit( __METHOD__, 'flush' );
					}
				}
			}
		}
	}

	/**
	 * Issue COMMIT only on master connections, and only on those with an open
	 * transaction that has writes or callbacks pending.
	 */
	function commitMasterChanges() {
		// Always 0, but who knows.. :)
		$masterIndex = $this->getWriterIndex();
		foreach ( $this->mConns as $conns2 ) {
			if ( empty( $conns2[$masterIndex] ) ) {
				continue;
			}
			foreach ( $conns2[$masterIndex] as $conn ) {
				if ( $conn->trxLevel() && $conn->writesOrCallbacksPending() ) {
					$conn->commit( __METHOD__, 'flush' );
				}
			}
		}
	}

	/**
	 * Issue ROLLBACK only on master connections, and only on those with an
	 * open transaction that has writes or callbacks pending.
	 */
	function rollbackMasterChanges() {
		// Always 0, but who knows.. :)
		$masterIndex = $this->getWriterIndex();
		foreach ( $this->mConns as $conns2 ) {
			if ( empty( $conns2[$masterIndex] ) ) {
				continue;
			}
			foreach ( $conns2[$masterIndex] as $conn ) {
				if ( $conn->trxLevel() && $conn->writesOrCallbacksPending() ) {
					$conn->rollback( __METHOD__, 'flush' );
				}
			}
		}
	}

	/**
	 * Determine if any master connection has an open transaction with writes
	 * or callbacks pending.
	 *
	 * @return bool
	 */
	function hasMasterChanges() {
		// Always 0, but who knows.. :)
		$masterIndex = $this->getWriterIndex();
		foreach ( $this->mConns as $conns2 ) {
			if ( empty( $conns2[$masterIndex] ) ) {
				continue;
			}
			foreach ( $conns2[$masterIndex] as $conn ) {
				if ( $conn->trxLevel() && $conn->writesOrCallbacksPending() ) {
					return true;
				}
			}
		}
		return false;
	}

	/**
	 * Get or set the slave wait timeout (see wfSetVar()).
	 *
	 * @param mixed $value New timeout, or null to leave it unchanged
	 * @return mixed Result of wfSetVar()
	 */
	function waitTimeout( $value = null ) {
		return wfSetVar( $this->mWaitTimeout, $value );
	}

	/**
	 * Whether waitFor() has switched this load balancer to lagged slave mode.
	 *
	 * @return bool
	 */
	function getLaggedSlaveMode() {
		return $this->mLaggedSlaveMode;
	}

	/**
	 * Get or set whether lagged slaves may be picked by getReaderIndex().
	 *
	 * @param null|bool $mode New value, or null to only read the current one
	 * @return bool The value in effect after the call
	 */
	function allowLagged( $mode = null ) {
		if ( $mode === null ) {
			return $this->mAllowLagged;
		}
		$this->mAllowLagged = $mode;

		return $this->mAllowLagged;
	}

	/**
	 * Ping every pooled connection.
	 *
	 * @return bool True only if every ping succeeded
	 */
	function pingAll() {
		$success = true;
		foreach ( $this->mConns as $conns2 ) {
			foreach ( $conns2 as $conns3 ) {
				foreach ( $conns3 as $conn ) {
					if ( !$conn->ping() ) {
						$success = false;
					}
				}
			}
		}

		return $success;
	}

	/**
	 * Call a function with each pooled connection object.
	 *
	 * @param callable $callback Called with the connection as first argument
	 * @param array $params Additional arguments appended after the connection
	 */
	function forEachOpenConnection( $callback, $params = array() ) {
		foreach ( $this->mConns as $conns2 ) {
			foreach ( $conns2 as $conns3 ) {
				foreach ( $conns3 as $conn ) {
					$mergedParams = array_merge( array( $conn ), $params );
					call_user_func_array( $callback, $mergedParams );
				}
			}
		}
	}

	/**
	 * Get the hostname and lag time of the most-lagged server.
	 * Connects to every server that has no usable pooled connection yet;
	 * servers that cannot be reached are skipped.
	 *
	 * NOTE(review): when $wiki is not false, connections come from
	 * openConnection() and are not handed back with reuseConnection() here.
	 *
	 * @param bool|string $wiki Wiki ID, or false for the default database
	 * @return array ( host, max lag, index of max lagged host ); the defaults
	 *   are ( '', -1, 0 ) when there is a single server or nothing was reachable
	 */
	function getMaxLag( $wiki = false ) {
		$maxLag = -1;
		$host = '';
		$maxIndex = 0;
		if ( $this->getServerCount() > 1 ) { // no replication = no lag
			foreach ( $this->mServers as $i => $conn ) {
				$conn = false;
				if ( $wiki === false ) {
					$conn = $this->getAnyOpenConnection( $i );
				}
				if ( !$conn ) {
					$conn = $this->openConnection( $i, $wiki );
				}
				if ( !$conn ) {
					continue;
				}
				$lag = $conn->getLag();
				if ( $lag > $maxLag ) {
					$maxLag = $lag;
					$host = $this->mServers[$i]['host'];
					$maxIndex = $i;
				}
			}
		}

		return array( $host, $maxLag, $maxIndex );
	}

	/**
	 * Get lag time for each server.
	 * Results are cached in this object until clearLagTimeCache() is called;
	 * note that the cache is not keyed by $wiki.
	 *
	 * @param bool|string $wiki Wiki ID passed to the load monitor, or false
	 * @return array Server index => lag
	 */
	function getLagTimes( $wiki = false ) {
		# Try process cache
		if ( isset( $this->mLagTimes ) ) {
			return $this->mLagTimes;
		}
		if ( $this->getServerCount() == 1 ) {
			# No replication
			$this->mLagTimes = array( 0 => 0 );
		} else {
			# Send the request to the load monitor
			$this->mLagTimes = $this->getLoadMonitor()->getLagTimes(
				array_keys( $this->mServers ), $wiki );
		}

		return $this->mLagTimes;
	}

	/**
	 * Get the lag for a given connection, short-circuiting to 0 when only one
	 * server is configured (no replication, hence no lag).
	 *
	 * @param object $conn Connection to query
	 * @return mixed 0 for a single-server setup, otherwise $conn->getLag()
	 */
	function safeGetLag( $conn ) {
		if ( $this->getServerCount() == 1 ) {
			return 0;
		} else {
			return $conn->getLag();
		}
	}

	/**
	 * Clear the cache used by getLagTimes().
	 */
	function clearLagTimeCache() {
		$this->mLagTimes = null;
	}
}

/**
 * Helper class to handle automatically marking connections as reusable (via
 * RAII pattern) as well as handling deferring the actual network connection
 * until the handle is used.
 *
 * @ingroup Database
 */
class DBConnRef implements IDatabase {
	/** @var LoadBalancer Load balancer the connection belongs to */
	protected $lb;

	/** @var DatabaseBase|null Live connection, or null while still deferred */
	protected $conn;

	/** @var array|null Arguments ( db, groups, wiki ) for LoadBalancer::getConnection() */
	protected $params;

	/**
	 * @param LoadBalancer $lb
	 * @param DatabaseBase|array $conn Connection, or the ( db, groups, wiki )
	 *   parameters to fetch one with later
	 */
	public function __construct( LoadBalancer $lb, $conn ) {
		$this->lb = $lb;
		if ( $conn instanceof DatabaseBase ) {
			$this->conn = $conn;
		} else {
			$this->params = $conn;
		}
	}

	/**
	 * Forward any method call to the underlying connection, connecting first
	 * if this is still a lazy reference.
	 *
	 * @param string $name Method name
	 * @param array $arguments Method arguments
	 * @return mixed Whatever the connection method returns
	 */
	public function __call( $name, $arguments ) {
		if ( $this->conn === null ) {
			list( $db, $groups, $wiki ) = $this->params;
			$this->conn = $this->lb->getConnection( $db, $groups, $wiki );
		}

		return call_user_func_array( array( $this->conn, $name ), $arguments );
	}

	/**
	 * Hand the connection back to the load balancer, if one was ever obtained.
	 */
	function __destruct() {
		if ( $this->conn !== null ) {
			$this->lb->reuseConnection( $this->conn );
		}
	}
}