MediaWiki
REL1_24
|
<?php

/**
 * Database load balancing object.
 *
 * Holds the configured server list, picks a server index for reads
 * (getReaderIndex) and writes (getWriterIndex), and keeps a pool of the
 * connections it has opened, keyed by server index and, for foreign wikis,
 * by wiki ID.
 */
class LoadBalancer {
	/** @var array Map of (server index => server config array), from $params['servers'] */
	private $mServers;

	/**
	 * @var array Connection pool with three top-level keys:
	 *  - 'local':       (server index => array( 0 => connection ))
	 *  - 'foreignUsed': (server index => (wiki ID => connection)), ref count > 0
	 *  - 'foreignFree': (server index => (wiki ID => connection)), ref count <= 0
	 */
	private $mConns;

	/** @var array Map of (server index => configured 'load' value) */
	private $mLoads;

	/** @var array Map of (group name => (server index => load ratio)) */
	private $mGroupLoads;

	/** @var bool When true, getReaderIndex() skips the replication lag check */
	private $mAllowLagged;

	/** @var int Default timeout handed to masterPosWait() (presumably seconds - confirm) */
	private $mWaitTimeout;

	/** @var mixed Opaque value read/written through parentInfo() */
	private $mParentInfo;

	/** @var string Name of the load monitor class to instantiate */
	private $mLoadMonitorClass;

	/** @var object|null Load monitor instance, created lazily by getLoadMonitor() */
	private $mLoadMonitor;

	/** @var object|bool Connection object that caused the last error, or false */
	private $mErrorConnection;

	/** @var int Cached generic (non-group) reader index into $mServers; -1 if not chosen yet */
	private $mReadIndex;

	/** @var mixed Master position to wait for on slaves, or false if none */
	private $mWaitForPos;

	/** @var bool Set to true by waitFor() when the chosen slave failed to catch up */
	private $mLaggedSlaveMode;

	/** @var string Last error message, used when reporting connection errors */
	private $mLastError = 'Unknown error';

	/** @var array|null Process cache of (server index => lag), see getLagTimes() */
	private $mLagTimes;

	/**
	 * @param array $params Array with keys:
	 *  - servers: required; map of (server index => server config array).
	 *    Each server array must have a 'load' key and may have 'groupLoads'
	 *    (map of group => ratio).
	 *  - loadMonitor: optional; name of the load monitor class. When omitted,
	 *    'LoadMonitorMySQL' is used if the first server has type 'mysql',
	 *    otherwise 'LoadMonitorNull'.
	 * @throws MWException If the 'servers' key is missing
	 */
	function __construct( $params ) {
		if ( !isset( $params['servers'] ) ) {
			throw new MWException( __CLASS__ . ': missing servers parameter' );
		}
		$this->mServers = $params['servers'];
		$this->mWaitTimeout = 10;

		$this->mReadIndex = -1;
		// NOTE(review): mWriteIndex is not declared on the class and nothing in
		// this file reads it; kept as-is so the object shape does not change.
		$this->mWriteIndex = -1;
		$this->mConns = array(
			'local' => array(),
			'foreignUsed' => array(),
			'foreignFree' => array() );
		$this->mLoads = array();
		// Start from an empty map so the property is always an array, even
		// when no server declares any 'groupLoads'.
		$this->mGroupLoads = array();
		$this->mWaitForPos = false;
		$this->mLaggedSlaveMode = false;
		$this->mErrorConnection = false;
		$this->mAllowLagged = false;

		if ( isset( $params['loadMonitor'] ) ) {
			$this->mLoadMonitorClass = $params['loadMonitor'];
		} else {
			// The first configured server decides the default monitor class
			$master = reset( $params['servers'] );
			if ( isset( $master['type'] ) && $master['type'] === 'mysql' ) {
				$this->mLoadMonitorClass = 'LoadMonitorMySQL';
			} else {
				$this->mLoadMonitorClass = 'LoadMonitorNull';
			}
		}

		foreach ( $params['servers'] as $i => $server ) {
			$this->mLoads[$i] = $server['load'];
			if ( isset( $server['groupLoads'] ) ) {
				foreach ( $server['groupLoads'] as $group => $ratio ) {
					if ( !isset( $this->mGroupLoads[$group] ) ) {
						$this->mGroupLoads[$group] = array();
					}
					$this->mGroupLoads[$group][$i] = $ratio;
				}
			}
		}
	}

	/**
	 * Get the load monitor object, instantiating it on first use.
	 *
	 * @return object Instance of the class named by $mLoadMonitorClass
	 */
	function getLoadMonitor() {
		if ( !isset( $this->mLoadMonitor ) ) {
			$class = $this->mLoadMonitorClass;
			$this->mLoadMonitor = new $class( $this );
		}

		return $this->mLoadMonitor;
	}

	/**
	 * Get or set the parent info value.
	 *
	 * @param mixed $x New value; passed straight to wfSetVar()
	 * @return mixed Whatever wfSetVar() returns
	 */
	function parentInfo( $x = null ) {
		return wfSetVar( $this->mParentInfo, $x );
	}

	/**
	 * Pick a random key from a weights array.
	 * Thin wrapper around ArrayUtils::pickRandom().
	 *
	 * @param array $weights
	 * @return mixed Result of ArrayUtils::pickRandom()
	 */
	function pickRandom( $weights ) {
		return ArrayUtils::pickRandom( $weights );
	}

	/**
	 * Pick a random server from $loads after dropping slaves that are not
	 * replicating or exceed their configured 'max lag'.
	 *
	 * @param array $loads Map of (server index => load)
	 * @param bool|string $wiki Wiki ID passed on to getLagTimes()
	 * @return mixed Chosen server index, or false if the remaining loads sum to zero
	 */
	function getRandomNonLagged( $loads, $wiki = false ) {
		# Unset excessively lagged servers
		$lags = $this->getLagTimes( $wiki );
		foreach ( $lags as $i => $lag ) {
			if ( $i != 0 ) {
				if ( $lag === false ) {
					wfDebugLog( 'replication', "Server #$i is not replicating" );
					unset( $loads[$i] );
				} elseif ( isset( $this->mServers[$i]['max lag'] )
					&& $lag > $this->mServers[$i]['max lag']
				) {
					wfDebugLog( 'replication', "Server #$i is excessively lagged ($lag seconds)" );
					unset( $loads[$i] );
				}
			}
		}

		# Find out if all the slaves with non-zero load are lagged.
		# An empty array also sums to zero, so no separate emptiness check is needed.
		if ( array_sum( $loads ) == 0 ) {
			# No appropriate DB servers except maybe the master and some slaves with zero load
			# Do NOT use the master
			# Instead, this function will return false, triggering read-only mode,
			# and a lagged slave will be used instead.
			return false;
		}

		#wfDebugLog( 'connect', var_export( $loads, true ) );

		# Return a random representative of the remainder
		return ArrayUtils::pickRandom( $loads );
	}

	/**
	 * Get the index of the reader connection, which may be a slave.
	 *
	 * Loops over the candidate servers, picking one at random and trying to
	 * connect, until a connection succeeds or no candidates remain. If every
	 * candidate is lagged, $wgReadOnly is set and a lagged server is used.
	 * A successful pick for the generic group ($group === false) is cached
	 * in $mReadIndex and returned directly on later generic calls.
	 *
	 * @param bool|string $group Query group, or false for the generic reader
	 * @param bool|string $wiki Wiki ID, or false for the current wiki
	 * @throws MWException If the relevant load array is empty
	 * @return bool|int Server index, or false if none could be chosen
	 */
	function getReaderIndex( $group = false, $wiki = false ) {
		global $wgReadOnly, $wgDBtype;

		# @todo FIXME: For now, only go through all this for mysql databases
		if ( $wgDBtype != 'mysql' ) {
			return $this->getWriterIndex();
		}

		if ( count( $this->mServers ) == 1 ) {
			# Skip the load balancing if there's only one server
			return 0;
		} elseif ( $group === false && $this->mReadIndex >= 0 ) {
			# Shortcut if generic reader exists already
			return $this->mReadIndex;
		}

		$section = new ProfileSection( __METHOD__ );

		# Find the relevant load array
		if ( $group !== false ) {
			if ( isset( $this->mGroupLoads[$group] ) ) {
				$nonErrorLoads = $this->mGroupLoads[$group];
			} else {
				# No loads for this group, return false and the caller can use some other group
				wfDebug( __METHOD__ . ": no loads for group $group\n" );

				return false;
			}
		} else {
			$nonErrorLoads = $this->mLoads;
		}

		if ( !count( $nonErrorLoads ) ) {
			throw new MWException( "Empty server array given to LoadBalancer" );
		}

		# Scale the configured load ratios according to the dynamic load
		# (if the load monitor supports it)
		$this->getLoadMonitor()->scaleLoads( $nonErrorLoads, $group, $wiki );

		$laggedSlaveMode = false;

		# No server found yet
		$i = false;
		# First try quickly looking through the available servers for a server that
		# meets our criteria
		$currentLoads = $nonErrorLoads;
		while ( count( $currentLoads ) ) {
			if ( $wgReadOnly || $this->mAllowLagged || $laggedSlaveMode ) {
				$i = ArrayUtils::pickRandom( $currentLoads );
			} else {
				$i = $this->getRandomNonLagged( $currentLoads, $wiki );
				if ( $i === false && count( $currentLoads ) != 0 ) {
					# All slaves lagged. Switch to read-only mode
					wfDebugLog( 'replication', "All slaves lagged. Switch to read-only mode" );
					$wgReadOnly = 'The database has been automatically locked ' .
						'while the slave database servers catch up to the master';
					$i = ArrayUtils::pickRandom( $currentLoads );
					$laggedSlaveMode = true;
				}
			}

			if ( $i === false ) {
				# pickRandom() returned false
				# This is permanent and means the configuration or the load monitor
				# wants us to return false.
				wfDebugLog( 'connect', __METHOD__ . ": pickRandom() returned false" );

				return false;
			}

			wfDebugLog( 'connect', __METHOD__ .
				": Using reader #$i: {$this->mServers[$i]['host']}..." );

			$conn = $this->openConnection( $i, $wiki );
			if ( !$conn ) {
				wfDebugLog( 'connect', __METHOD__ . ": Failed connecting to $i/$wiki" );
				unset( $nonErrorLoads[$i] );
				unset( $currentLoads[$i] );
				$i = false;
				continue;
			}

			// Decrement reference counter, we are finished with this connection.
			// It will be incremented for the caller later.
			if ( $wiki !== false ) {
				$this->reuseConnection( $conn );
			}

			# Return this server
			break;
		}

		# If all servers were down, quit now
		if ( !count( $nonErrorLoads ) ) {
			wfDebugLog( 'connect', "All servers down" );
		}

		if ( $i !== false ) {
			# Slave connection successful
			# Wait for the session master pos for a short time
			if ( $this->mWaitForPos && $i > 0 ) {
				if ( !$this->doWait( $i ) ) {
					$this->mServers[$i]['slave pos'] = $conn->getSlavePos();
				}
			}
			# Only remember the pick when it was made for the generic group:
			# the shortcut at the top of this method hands $mReadIndex back
			# exclusively to generic ($group === false) callers, so caching a
			# group-specific server here would leak it to them.
			if ( $this->mReadIndex <= 0 && $this->mLoads[$i] > 0 && $group === false ) {
				$this->mReadIndex = $i;
			}
		}

		return $i;
	}

	/**
	 * Sleep for the given number of microseconds (via usleep()).
	 *
	 * @param int $t Microseconds to sleep
	 * @return int The value of $t
	 */
	function sleep( $t ) {
		wfProfileIn( __METHOD__ );
		wfDebug( __METHOD__ . ": waiting $t us\n" );
		usleep( $t );
		wfProfileOut( __METHOD__ );

		return $t;
	}

	/**
	 * Set the master wait position.
	 *
	 * If a generic slave has already been chosen, wait for it to reach $pos;
	 * if the wait fails, record its slave position (when a connection is
	 * still open) and switch on lagged slave mode.
	 *
	 * @param mixed $pos Master position, stored in $mWaitForPos
	 */
	public function waitFor( $pos ) {
		wfProfileIn( __METHOD__ );
		$this->mWaitForPos = $pos;
		$i = $this->mReadIndex;

		if ( $i > 0 ) {
			if ( !$this->doWait( $i ) ) {
				// doWait() also fails when no connection to the server is open
				// (e.g. after closeAll()); getAnyOpenConnection() returns false
				// in that case, so guard before dereferencing it.
				$conn = $this->getAnyOpenConnection( $i );
				if ( $conn ) {
					$this->mServers[$i]['slave pos'] = $conn->getSlavePos();
				}
				$this->mLaggedSlaveMode = true;
			}
		}
		wfProfileOut( __METHOD__ );
	}

	/**
	 * Set the master wait position and wait for every slave with a
	 * non-zero load to reach it, opening connections as needed.
	 *
	 * @param mixed $pos Master position, stored in $mWaitForPos
	 * @param int|null $timeout Per-server timeout; falsy means use $mWaitTimeout
	 * @return bool True only if every wait succeeded
	 */
	public function waitForAll( $pos, $timeout = null ) {
		wfProfileIn( __METHOD__ );
		$this->mWaitForPos = $pos;
		$serverCount = count( $this->mServers );

		$ok = true;
		// Index 0 is skipped; getWriterIndex() returns 0 for the master
		for ( $i = 1; $i < $serverCount; $i++ ) {
			if ( $this->mLoads[$i] > 0 ) {
				$ok = $this->doWait( $i, true, $timeout ) && $ok;
			}
		}
		wfProfileOut( __METHOD__ );

		return $ok;
	}

	/**
	 * Get any open connection to a given server index, local or foreign.
	 *
	 * @param int $i Server index
	 * @return object|bool First pooled connection found, or false if none
	 */
	function getAnyOpenConnection( $i ) {
		foreach ( $this->mConns as $conns ) {
			if ( !empty( $conns[$i] ) ) {
				return reset( $conns[$i] );
			}
		}

		return false;
	}

	/**
	 * Wait for a given slave to catch up to the position in $mWaitForPos.
	 *
	 * @param int $index Server index
	 * @param bool $open Whether to open a connection if none is pooled
	 * @param int|null $timeout Timeout for masterPosWait(); falsy means use $mWaitTimeout
	 * @return bool False if no connection was available or the wait returned
	 *  -1 or null; true otherwise
	 */
	protected function doWait( $index, $open = false, $timeout = null ) {
		# Find a connection to wait on
		$conn = $this->getAnyOpenConnection( $index );
		if ( !$conn ) {
			if ( !$open ) {
				wfDebug( __METHOD__ . ": no connection open\n" );

				return false;
			} else {
				// '' is not false, so openConnection() routes this through
				// openForeignConnection()
				$conn = $this->openConnection( $index, '' );
				if ( !$conn ) {
					wfDebug( __METHOD__ . ": failed to open connection\n" );

					return false;
				}
			}
		}

		wfDebug( __METHOD__ . ": Waiting for slave #$index to catch up...\n" );
		$timeout = $timeout ?: $this->mWaitTimeout;
		$result = $conn->masterPosWait( $this->mWaitForPos, $timeout );

		if ( $result == -1 || is_null( $result ) ) {
			# Timed out waiting for slave, use master instead
			wfDebug( __METHOD__ . ": Timed out waiting for slave #$index pos {$this->mWaitForPos}\n" );

			return false;
		} else {
			wfDebug( __METHOD__ . ": Done\n" );

			return true;
		}
	}

	/**
	 * Get a connection by index.
	 *
	 * $i may be an explicit server index or one of DB_MASTER / DB_SLAVE.
	 * Query groups are tried first (unless DB_MASTER was requested); the
	 * first group that yields a reader index wins.
	 *
	 * @param int $i Server index, DB_MASTER or DB_SLAVE
	 * @param array|string $groups Query group(s)
	 * @param bool|string $wiki Wiki ID; false or the current wiki ID means local
	 * @throws MWException If $i is null or false
	 * @return object Connection; on failure reportConnectionError() throws
	 */
	public function &getConnection( $i, $groups = array(), $wiki = false ) {
		wfProfileIn( __METHOD__ );

		if ( $i === null || $i === false ) {
			wfProfileOut( __METHOD__ );
			throw new MWException( 'Attempt to call ' . __METHOD__ .
				' with invalid server index' );
		}

		if ( $wiki === wfWikiID() ) {
			$wiki = false;
		}

		# Query groups
		if ( $i == DB_MASTER ) {
			$i = $this->getWriterIndex();
		} elseif ( !is_array( $groups ) ) {
			$groupIndex = $this->getReaderIndex( $groups, $wiki );
			if ( $groupIndex !== false ) {
				$serverName = $this->getServerName( $groupIndex );
				wfDebug( __METHOD__ . ": using server $serverName for group $groups\n" );
				$i = $groupIndex;
			}
		} else {
			foreach ( $groups as $group ) {
				$groupIndex = $this->getReaderIndex( $group, $wiki );
				if ( $groupIndex !== false ) {
					$serverName = $this->getServerName( $groupIndex );
					wfDebug( __METHOD__ . ": using server $serverName for group $group\n" );
					$i = $groupIndex;
					break;
				}
			}
		}

		# Operation-based index
		if ( $i == DB_SLAVE ) {
			$this->mLastError = 'Unknown error'; // reset error string
			$i = $this->getReaderIndex( false, $wiki );
			# Couldn't find a working server in getReaderIndex()?
			if ( $i === false ) {
				$this->mLastError = 'No working slave server: ' . $this->mLastError;
				wfProfileOut( __METHOD__ );

				return $this->reportConnectionError();
			}
		}

		# Now we have an explicit index into the servers array
		$conn = $this->openConnection( $i, $wiki );
		if ( !$conn ) {
			wfProfileOut( __METHOD__ );

			return $this->reportConnectionError();
		}

		wfProfileOut( __METHOD__ );

		return $conn;
	}

	/**
	 * Mark a foreign connection as being available for reuse under a
	 * different DB name or prefix.
	 *
	 * Decrements the connection's 'foreignPoolRefCount'; when it reaches
	 * zero or below, the connection moves from the 'foreignUsed' pool to
	 * 'foreignFree'. Connections lacking the foreign-pool LB info are ignored.
	 *
	 * @param object $conn Connection previously handed out by this balancer
	 * @throws MWException If the connection is not in the 'foreignUsed' pool
	 */
	public function reuseConnection( $conn ) {
		$serverIndex = $conn->getLBInfo( 'serverIndex' );
		$refCount = $conn->getLBInfo( 'foreignPoolRefCount' );
		if ( $serverIndex === null || $refCount === null ) {
			wfDebug( __METHOD__ . ": this connection was not opened as a foreign connection\n" );

			return;
		}

		$dbName = $conn->getDBname();
		$prefix = $conn->tablePrefix();
		if ( strval( $prefix ) !== '' ) {
			$wiki = "$dbName-$prefix";
		} else {
			$wiki = $dbName;
		}
		// isset() first so a missing pool slot goes straight to the exception
		// instead of raising an undefined-index notice on the way there.
		if ( !isset( $this->mConns['foreignUsed'][$serverIndex][$wiki] )
			|| $this->mConns['foreignUsed'][$serverIndex][$wiki] !== $conn
		) {
			throw new MWException( __METHOD__ . ": connection not found, has " .
				"the connection been freed already?" );
		}
		$conn->setLBInfo( 'foreignPoolRefCount', --$refCount );
		if ( $refCount <= 0 ) {
			$this->mConns['foreignFree'][$serverIndex][$wiki] = $conn;
			unset( $this->mConns['foreignUsed'][$serverIndex][$wiki] );
			wfDebug( __METHOD__ . ": freed connection $serverIndex/$wiki\n" );
		} else {
			wfDebug( __METHOD__ . ": reference count for $serverIndex/$wiki reduced to $refCount\n" );
		}
	}

	/**
	 * Get a connection wrapped in a DBConnRef, whose destructor calls
	 * reuseConnection() on it.
	 *
	 * @param int $db Server index, DB_MASTER or DB_SLAVE
	 * @param array|string $groups Query group(s)
	 * @param bool|string $wiki Wiki ID
	 * @return DBConnRef
	 */
	public function getConnectionRef( $db, $groups = array(), $wiki = false ) {
		return new DBConnRef( $this, $this->getConnection( $db, $groups, $wiki ) );
	}

	/**
	 * Get a DBConnRef that defers getConnection() until the first method
	 * call made on it.
	 *
	 * @param int $db Server index, DB_MASTER or DB_SLAVE
	 * @param array|string $groups Query group(s)
	 * @param bool|string $wiki Wiki ID
	 * @return DBConnRef
	 */
	public function getLazyConnectionRef( $db, $groups = array(), $wiki = false ) {
		return new DBConnRef( $this, array( $db, $groups, $wiki ) );
	}

	/**
	 * Open a connection to the server given by the specified index.
	 *
	 * For a non-false $wiki this delegates to openForeignConnection().
	 * Otherwise the pooled local connection is reused if present; on a
	 * failed connect the connection object is kept in $mErrorConnection.
	 *
	 * @param int $i Server index
	 * @param bool|string $wiki Wiki ID, or false for the local connection
	 * @return object|bool Connection, or false on failure
	 */
	function openConnection( $i, $wiki = false ) {
		wfProfileIn( __METHOD__ );
		if ( $wiki !== false ) {
			$conn = $this->openForeignConnection( $i, $wiki );
			wfProfileOut( __METHOD__ );

			return $conn;
		}
		if ( isset( $this->mConns['local'][$i][0] ) ) {
			$conn = $this->mConns['local'][$i][0];
		} else {
			$server = $this->mServers[$i];
			$server['serverIndex'] = $i;
			$conn = $this->reallyOpenConnection( $server, false );
			if ( $conn->isOpen() ) {
				wfDebug( "Connected to database $i at {$this->mServers[$i]['host']}\n" );
				$this->mConns['local'][$i][0] = $conn;
			} else {
				wfDebug( "Failed to connect to database $i at {$this->mServers[$i]['host']}\n" );
				$this->mErrorConnection = $conn;
				$conn = false;
			}
		}
		wfProfileOut( __METHOD__ );

		return $conn;
	}

	/**
	 * Open a connection to a foreign wiki's database.
	 *
	 * Tries, in order: a connection already in use for this server/wiki;
	 * a free connection for the same wiki; a free connection for another
	 * wiki on the same server (switched with selectDB()); a new connection.
	 * On success the connection's 'foreignPoolRefCount' is incremented.
	 *
	 * @param int $i Server index
	 * @param string $wiki Wiki ID to open
	 * @return object|bool Connection, or false on failure
	 */
	function openForeignConnection( $i, $wiki ) {
		wfProfileIn( __METHOD__ );
		list( $dbName, $prefix ) = wfSplitWikiID( $wiki );
		if ( isset( $this->mConns['foreignUsed'][$i][$wiki] ) ) {
			// Reuse an already-used connection
			$conn = $this->mConns['foreignUsed'][$i][$wiki];
			wfDebug( __METHOD__ . ": reusing connection $i/$wiki\n" );
		} elseif ( isset( $this->mConns['foreignFree'][$i][$wiki] ) ) {
			// Reuse a free connection for the same wiki
			$conn = $this->mConns['foreignFree'][$i][$wiki];
			unset( $this->mConns['foreignFree'][$i][$wiki] );
			$this->mConns['foreignUsed'][$i][$wiki] = $conn;
			wfDebug( __METHOD__ . ": reusing free connection $i/$wiki\n" );
		} elseif ( !empty( $this->mConns['foreignFree'][$i] ) ) {
			// Reuse a connection from another wiki
			$conn = reset( $this->mConns['foreignFree'][$i] );
			$oldWiki = key( $this->mConns['foreignFree'][$i] );

			if ( !$conn->selectDB( $dbName ) ) {
				$this->mLastError = "Error selecting database $dbName on server " .
					$conn->getServer() . " from client host " . wfHostname() . "\n";
				$this->mErrorConnection = $conn;
				$conn = false;
			} else {
				$conn->tablePrefix( $prefix );
				unset( $this->mConns['foreignFree'][$i][$oldWiki] );
				$this->mConns['foreignUsed'][$i][$wiki] = $conn;
				wfDebug( __METHOD__ . ": reusing free connection from $oldWiki for $wiki\n" );
			}
		} else {
			// Open a new connection
			$server = $this->mServers[$i];
			$server['serverIndex'] = $i;
			$server['foreignPoolRefCount'] = 0;
			$server['foreign'] = true;
			$conn = $this->reallyOpenConnection( $server, $dbName );
			if ( !$conn->isOpen() ) {
				wfDebug( __METHOD__ . ": error opening connection for $i/$wiki\n" );
				$this->mErrorConnection = $conn;
				$conn = false;
			} else {
				$conn->tablePrefix( $prefix );
				$this->mConns['foreignUsed'][$i][$wiki] = $conn;
				wfDebug( __METHOD__ . ": opened new connection for $i/$wiki\n" );
			}
		}

		// Increment reference count
		if ( $conn ) {
			$refCount = $conn->getLBInfo( 'foreignPoolRefCount' );
			$conn->setLBInfo( 'foreignPoolRefCount', $refCount + 1 );
		}
		wfProfileOut( __METHOD__ );

		return $conn;
	}

	/**
	 * Test if the specified index has a pooled connection.
	 *
	 * @param int $index Server index
	 * @return bool False for non-integer indexes or when nothing is pooled
	 */
	function isOpen( $index ) {
		if ( !is_integer( $index ) ) {
			return false;
		}

		return (bool)$this->getAnyOpenConnection( $index );
	}

	/**
	 * Really opens a connection. Does not check the pool and does not add
	 * the result to it.
	 *
	 * If DatabaseBase::factory() throws DBConnectionError, the database
	 * object carried by the exception is used instead, so callers must
	 * check isOpen() on the result.
	 *
	 * @param array $server Server config array; must contain 'type'
	 * @param bool|string $dbNameOverride Replaces $server['dbname'] unless false
	 * @throws MWException If $server is not an array
	 * @return object Database object (possibly not open)
	 */
	function reallyOpenConnection( $server, $dbNameOverride = false ) {
		if ( !is_array( $server ) ) {
			throw new MWException( 'You must update your load-balancing configuration. ' .
				'See DefaultSettings.php entry for $wgDBservers.' );
		}

		if ( $dbNameOverride !== false ) {
			$server['dbname'] = $dbNameOverride;
		}

		# Create object
		try {
			$db = DatabaseBase::factory( $server['type'], $server );
		} catch ( DBConnectionError $e ) {
			// FIXME: This is probably the ugliest thing I have ever done to
			// PHP. I'm half-expecting it to segfault, just out of disgust. -- TS
			$db = $e->db;
		}

		$db->setLBInfo( $server );
		if ( isset( $server['fakeSlaveLag'] ) ) {
			$db->setFakeSlaveLag( $server['fakeSlaveLag'] );
		}
		if ( isset( $server['fakeMaster'] ) ) {
			$db->setFakeMaster( true );
		}

		return $db;
	}

	/**
	 * Log and report the last connection error.
	 *
	 * @throws DBConnectionError Directly when there is no error connection,
	 *  otherwise via the connection's own reportConnectionError()
	 * @return bool False (not reached)
	 */
	private function reportConnectionError() {
		$conn = $this->mErrorConnection; // The connection which caused the error

		if ( !is_object( $conn ) ) {
			// No last connection, probably due to all servers being too busy
			wfLogDBError( "LB failure with no last connection. Connection error: {$this->mLastError}" );

			// If all servers were busy, mLastError will contain something sensible
			throw new DBConnectionError( null, $this->mLastError );
		} else {
			$server = $conn->getProperty( 'mServer' );
			wfLogDBError( "Connection error: {$this->mLastError} ({$server})" );
			$conn->reportConnectionError( "{$this->mLastError} ({$server})" ); // throws DBConnectionError
		}

		return false; /* not reached */
	}

	/**
	 * Get the index of the server used for writes.
	 *
	 * @return int Always 0
	 */
	function getWriterIndex() {
		return 0;
	}

	/**
	 * Returns true if the specified index is a key of the server array.
	 *
	 * @param int|string $i Server index
	 * @return bool
	 */
	function haveIndex( $i ) {
		return array_key_exists( $i, $this->mServers );
	}

	/**
	 * Returns true if the specified index is a known server with non-zero load.
	 *
	 * @param int|string $i Server index
	 * @return bool
	 */
	function isNonZeroLoad( $i ) {
		return array_key_exists( $i, $this->mServers ) && $this->mLoads[$i] != 0;
	}

	/**
	 * Get the number of configured servers (master and slaves).
	 *
	 * @return int
	 */
	function getServerCount() {
		return count( $this->mServers );
	}

	/**
	 * Get the name of the server with the specified index.
	 * Prefers the 'hostName' key and falls back to 'host'.
	 *
	 * @param int $i Server index
	 * @return string Name, or '' if neither key is set
	 */
	function getServerName( $i ) {
		if ( isset( $this->mServers[$i]['hostName'] ) ) {
			return $this->mServers[$i]['hostName'];
		} elseif ( isset( $this->mServers[$i]['host'] ) ) {
			return $this->mServers[$i]['host'];
		} else {
			return '';
		}
	}

	/**
	 * Return the server config array for a given index.
	 *
	 * @param int $i Server index
	 * @return array|bool Config array, or false if the index is not set
	 */
	function getServerInfo( $i ) {
		if ( isset( $this->mServers[$i] ) ) {
			return $this->mServers[$i];
		} else {
			return false;
		}
	}

	/**
	 * Set the server config array for a given index. Overwrites any
	 * existing entry; already-open connections are not touched.
	 *
	 * @param int $i Server index
	 * @param array $serverInfo Server config array
	 */
	function setServerInfo( $i, $serverInfo ) {
		$this->mServers[$i] = $serverInfo;
	}

	/**
	 * Get the current master position.
	 *
	 * Uses the open master connection if there is one; otherwise falls back
	 * to the slave position of the first slave with an open connection.
	 *
	 * @return mixed Position, or false if no connection is open
	 */
	function getMasterPos() {
		# If this entire request was served from a slave without opening a connection to the
		# master (however unlikely that may be), then we can fetch the position from the slave.
		$masterConn = $this->getAnyOpenConnection( 0 );
		if ( !$masterConn ) {
			$serverCount = count( $this->mServers );
			for ( $i = 1; $i < $serverCount; $i++ ) {
				$conn = $this->getAnyOpenConnection( $i );
				if ( $conn ) {
					wfDebug( "Master pos fetched from slave\n" );

					return $conn->getSlavePos();
				}
			}
		} else {
			wfDebug( "Master pos fetched from master\n" );

			return $masterConn->getMasterPos();
		}

		return false;
	}

	/**
	 * Close every pooled connection and reset the pool to empty.
	 */
	function closeAll() {
		foreach ( $this->mConns as $conns2 ) {
			foreach ( $conns2 as $conns3 ) {
				foreach ( $conns3 as $conn ) {
					$conn->close();
				}
			}
		}
		$this->mConns = array(
			'local' => array(),
			'foreignFree' => array(),
			'foreignUsed' => array(),
		);
	}

	/**
	 * Close a connection and remove it from the pool.
	 * A connection that is not found in the pool is closed anyway.
	 *
	 * @param object $conn Connection to close
	 */
	function closeConnection( $conn ) {
		$done = false;
		foreach ( $this->mConns as $i1 => $conns2 ) {
			foreach ( $conns2 as $i2 => $conns3 ) {
				foreach ( $conns3 as $i3 => $candidateConn ) {
					if ( $conn === $candidateConn ) {
						$conn->close();
						unset( $this->mConns[$i1][$i2][$i3] );
						$done = true;
						// Leaves only the innermost loop; the outer loops keep scanning
						break;
					}
				}
			}
		}
		if ( !$done ) {
			$conn->close();
		}
	}

	/**
	 * Commit transactions on all pooled connections that have one open.
	 */
	function commitAll() {
		foreach ( $this->mConns as $conns2 ) {
			foreach ( $conns2 as $conns3 ) {
				foreach ( $conns3 as $conn ) {
					if ( $conn->trxLevel() ) {
						$conn->commit( __METHOD__, 'flush' );
					}
				}
			}
		}
	}

	/**
	 * Commit open transactions with pending writes or callbacks on the
	 * pooled master connections.
	 */
	function commitMasterChanges() {
		// Always 0, but who knows.. :)
		$masterIndex = $this->getWriterIndex();
		foreach ( $this->mConns as $conns2 ) {
			if ( empty( $conns2[$masterIndex] ) ) {
				continue;
			}
			foreach ( $conns2[$masterIndex] as $conn ) {
				if ( $conn->trxLevel() && $conn->writesOrCallbacksPending() ) {
					$conn->commit( __METHOD__, 'flush' );
				}
			}
		}
	}

	/**
	 * Roll back open transactions with pending writes or callbacks on the
	 * pooled master connections.
	 */
	function rollbackMasterChanges() {
		// Always 0, but who knows.. :)
		$masterIndex = $this->getWriterIndex();
		foreach ( $this->mConns as $conns2 ) {
			if ( empty( $conns2[$masterIndex] ) ) {
				continue;
			}
			foreach ( $conns2[$masterIndex] as $conn ) {
				if ( $conn->trxLevel() && $conn->writesOrCallbacksPending() ) {
					$conn->rollback( __METHOD__, 'flush' );
				}
			}
		}
	}

	/**
	 * Whether a connection to the master (writer index) is pooled.
	 *
	 * @return bool
	 */
	function hasMasterConnection() {
		return $this->isOpen( $this->getWriterIndex() );
	}

	/**
	 * Determine whether any pooled master connection has an open
	 * transaction with pending writes or callbacks.
	 *
	 * @return bool
	 */
	function hasMasterChanges() {
		// Always 0, but who knows.. :)
		$masterIndex = $this->getWriterIndex();
		foreach ( $this->mConns as $conns2 ) {
			if ( empty( $conns2[$masterIndex] ) ) {
				continue;
			}
			foreach ( $conns2[$masterIndex] as $conn ) {
				if ( $conn->trxLevel() && $conn->writesOrCallbacksPending() ) {
					return true;
				}
			}
		}
		return false;
	}

	/**
	 * Get or set the default wait timeout used by doWait().
	 *
	 * @param mixed $value New value; passed straight to wfSetVar()
	 * @return mixed Whatever wfSetVar() returns
	 */
	function waitTimeout( $value = null ) {
		return wfSetVar( $this->mWaitTimeout, $value );
	}

	/**
	 * Whether waitFor() has switched this balancer into lagged slave mode.
	 *
	 * @return bool
	 */
	function getLaggedSlaveMode() {
		return $this->mLaggedSlaveMode;
	}

	/**
	 * Get or set the "allow lagged" flag, which makes getReaderIndex()
	 * skip the replication lag check.
	 *
	 * @param null|bool $mode New value, or null to only read the flag
	 * @return bool Current value of the flag
	 */
	function allowLagged( $mode = null ) {
		if ( $mode === null ) {
			return $this->mAllowLagged;
		}
		$this->mAllowLagged = $mode;

		return $this->mAllowLagged;
	}

	/**
	 * Ping every pooled connection.
	 *
	 * @return bool True if every ping() call returned a truthy value
	 */
	function pingAll() {
		$success = true;
		foreach ( $this->mConns as $conns2 ) {
			foreach ( $conns2 as $conns3 ) {
				foreach ( $conns3 as $conn ) {
					if ( !$conn->ping() ) {
						$success = false;
					}
				}
			}
		}

		return $success;
	}

	/**
	 * Call a function with each pooled connection object as first argument.
	 *
	 * @param callable $callback
	 * @param array $params Extra arguments appended after the connection
	 */
	function forEachOpenConnection( $callback, $params = array() ) {
		foreach ( $this->mConns as $conns2 ) {
			foreach ( $conns2 as $conns3 ) {
				foreach ( $conns3 as $conn ) {
					$mergedParams = array_merge( array( $conn ), $params );
					call_user_func_array( $callback, $mergedParams );
				}
			}
		}
	}

	/**
	 * Get the hostname, lag and index of the most-lagged slave.
	 *
	 * The result is looked up in an accelerator cache first; on a miss each
	 * non-master server is queried with getLag() and the result is cached
	 * with a TTL argument of 5. With a single server, ( '', -1, 0 ) is
	 * returned without touching the cache.
	 *
	 * @param bool|string $wiki Wiki ID, or false for the default database
	 * @return array ( host, max lag, index of max lagged host )
	 */
	function getMaxLag( $wiki = false ) {
		$maxLag = -1;
		$host = '';
		$maxIndex = 0;

		if ( $this->getServerCount() <= 1 ) { // no replication = no lag
			return array( $host, $maxLag, $maxIndex );
		}

		// Try to get the max lag info from the server cache
		$key = 'loadbalancer:maxlag:cluster:' . $this->mServers[0]['host'];
		$cache = ObjectCache::newAccelerator( array(), 'hash' );
		$maxLagInfo = $cache->get( $key ); // (host, lag, index)

		// Fallback to connecting to each slave and getting the lag
		if ( !$maxLagInfo ) {
			foreach ( array_keys( $this->mServers ) as $i ) {
				if ( $i == $this->getWriterIndex() ) {
					continue; // nothing to check
				}
				$conn = false;
				if ( $wiki === false ) {
					$conn = $this->getAnyOpenConnection( $i );
				}
				if ( !$conn ) {
					$conn = $this->openConnection( $i, $wiki );
				}
				if ( !$conn ) {
					continue;
				}
				$lag = $conn->getLag();
				if ( $lag > $maxLag ) {
					$maxLag = $lag;
					$host = $this->mServers[$i]['host'];
					$maxIndex = $i;
				}
			}
			$maxLagInfo = array( $host, $maxLag, $maxIndex );
			$cache->set( $key, $maxLagInfo, 5 );
		}

		return $maxLagInfo;
	}

	/**
	 * Get lag time for each server.
	 *
	 * The result is cached on this object until clearLagTimeCache() is
	 * called. With a single server the result is array( 0 => 0 ); otherwise
	 * the load monitor's getLagTimes() is asked.
	 *
	 * @param bool|string $wiki Wiki ID passed to the load monitor
	 * @return array Map of (server index => lag)
	 */
	function getLagTimes( $wiki = false ) {
		# Try process cache
		if ( isset( $this->mLagTimes ) ) {
			return $this->mLagTimes;
		}
		if ( $this->getServerCount() == 1 ) {
			# No replication
			$this->mLagTimes = array( 0 => 0 );
		} else {
			# Send the request to the load monitor
			$this->mLagTimes = $this->getLoadMonitor()->getLagTimes(
				array_keys( $this->mServers ), $wiki );
		}

		return $this->mLagTimes;
	}

	/**
	 * Get the lag for a given connection, short-circuiting to 0 when only
	 * one server is configured (no replication).
	 *
	 * @param object $conn Connection exposing getLag()
	 * @return mixed 0, or the result of $conn->getLag()
	 */
	function safeGetLag( $conn ) {
		if ( $this->getServerCount() == 1 ) {
			return 0;
		} else {
			return $conn->getLag();
		}
	}

	/**
	 * Clear the process cache used by getLagTimes().
	 */
	function clearLagTimeCache() {
		$this->mLagTimes = null;
	}
}

/**
 * Helper class that proxies method calls to a database connection and hands
 * the connection back to its LoadBalancer (reuseConnection()) when destroyed.
 *
 * May be built around an existing connection, or around the parameters for
 * LoadBalancer::getConnection(), in which case the connection is obtained on
 * the first proxied method call.
 */
class DBConnRef implements IDatabase {
	/** @var LoadBalancer Balancer that owns the connection */
	protected $lb;

	/** @var DatabaseBase|null Live connection; null until resolved */
	protected $conn;

	/** @var array|null ( db, groups, wiki ) arguments for a deferred getConnection() */
	protected $params;

	/**
	 * @param LoadBalancer $lb
	 * @param DatabaseBase|array $conn Connection, or array( db, groups, wiki )
	 */
	public function __construct( LoadBalancer $lb, $conn ) {
		$this->lb = $lb;
		if ( $conn instanceof DatabaseBase ) {
			$this->conn = $conn;
		} else {
			$this->params = $conn;
		}
	}

	/**
	 * Forward any method call to the underlying connection, resolving it
	 * through the load balancer first if it was deferred.
	 *
	 * @param string $name Method name
	 * @param array $arguments Method arguments
	 * @return mixed Return value of the proxied call
	 */
	public function __call( $name, $arguments ) {
		if ( $this->conn === null ) {
			list( $db, $groups, $wiki ) = $this->params;
			$this->conn = $this->lb->getConnection( $db, $groups, $wiki );
		}

		return call_user_func_array( array( $this->conn, $name ), $arguments );
	}

	/**
	 * Release the connection, if one was ever resolved.
	 */
	function __destruct() {
		if ( $this->conn !== null ) {
			$this->lb->reuseConnection( $this->conn );
		}
	}
}