MediaWiki
REL1_22
|
00001 <?php 00030 class LoadBalancer { 00031 private $mServers, $mConns, $mLoads, $mGroupLoads; 00032 private $mErrorConnection; 00033 private $mReadIndex, $mAllowLagged; 00034 private $mWaitForPos, $mWaitTimeout; 00035 private $mLaggedSlaveMode, $mLastError = 'Unknown error'; 00036 private $mParentInfo, $mLagTimes; 00037 private $mLoadMonitorClass, $mLoadMonitor; 00038 00046 function __construct( $params ) { 00047 if ( !isset( $params['servers'] ) ) { 00048 throw new MWException( __CLASS__ . ': missing servers parameter' ); 00049 } 00050 $this->mServers = $params['servers']; 00051 00052 if ( isset( $params['waitTimeout'] ) ) { 00053 $this->mWaitTimeout = $params['waitTimeout']; 00054 } else { 00055 $this->mWaitTimeout = 10; 00056 } 00057 00058 $this->mReadIndex = -1; 00059 $this->mWriteIndex = -1; 00060 $this->mConns = array( 00061 'local' => array(), 00062 'foreignUsed' => array(), 00063 'foreignFree' => array() ); 00064 $this->mLoads = array(); 00065 $this->mWaitForPos = false; 00066 $this->mLaggedSlaveMode = false; 00067 $this->mErrorConnection = false; 00068 $this->mAllowLagged = false; 00069 00070 if ( isset( $params['loadMonitor'] ) ) { 00071 $this->mLoadMonitorClass = $params['loadMonitor']; 00072 } else { 00073 $master = reset( $params['servers'] ); 00074 if ( isset( $master['type'] ) && $master['type'] === 'mysql' ) { 00075 $this->mLoadMonitorClass = 'LoadMonitor_MySQL'; 00076 } else { 00077 $this->mLoadMonitorClass = 'LoadMonitor_Null'; 00078 } 00079 } 00080 00081 foreach ( $params['servers'] as $i => $server ) { 00082 $this->mLoads[$i] = $server['load']; 00083 if ( isset( $server['groupLoads'] ) ) { 00084 foreach ( $server['groupLoads'] as $group => $ratio ) { 00085 if ( !isset( $this->mGroupLoads[$group] ) ) { 00086 $this->mGroupLoads[$group] = array(); 00087 } 00088 $this->mGroupLoads[$group][$i] = $ratio; 00089 } 00090 } 00091 } 00092 } 00093 00099 function getLoadMonitor() { 00100 if ( !isset( $this->mLoadMonitor ) ) { 00101 $class = $this->mLoadMonitorClass; 00102 $this->mLoadMonitor = new $class( $this ); 00103 } 00104 return $this->mLoadMonitor; 00105 } 00106 00112 function parentInfo( $x = null ) { 00113 return wfSetVar( $this->mParentInfo, $x ); 00114 } 00115 00126 function pickRandom( $weights ) { 00127 return ArrayUtils::pickRandom( $weights ); 00128 } 00129 00135 function getRandomNonLagged( $loads, $wiki = false ) { 00136 # Unset excessively lagged servers 00137 $lags = $this->getLagTimes( $wiki ); 00138 foreach ( $lags as $i => $lag ) { 00139 if ( $i != 0 ) { 00140 if ( $lag === false ) { 00141 wfDebugLog( 'replication', "Server #$i is not replicating\n" ); 00142 unset( $loads[$i] ); 00143 } elseif ( isset( $this->mServers[$i]['max lag'] ) && $lag > $this->mServers[$i]['max lag'] ) { 00144 wfDebugLog( 'replication', "Server #$i is excessively lagged ($lag seconds)\n" ); 00145 unset( $loads[$i] ); 00146 } 00147 } 00148 } 00149 00150 # Find out if all the slaves with non-zero load are lagged 00151 $sum = 0; 00152 foreach ( $loads as $load ) { 00153 $sum += $load; 00154 } 00155 if ( $sum == 0 ) { 00156 # No appropriate DB servers except maybe the master and some slaves with zero load 00157 # Do NOT use the master 00158 # Instead, this function will return false, triggering read-only mode, 00159 # and a lagged slave will be used instead. 00160 return false; 00161 } 00162 00163 if ( count( $loads ) == 0 ) { 00164 return false; 00165 } 00166 00167 #wfDebugLog( 'connect', var_export( $loads, true ) ); 00168 00169 # Return a random representative of the remainder 00170 return ArrayUtils::pickRandom( $loads ); 00171 } 00172 00184 function getReaderIndex( $group = false, $wiki = false ) { 00185 global $wgReadOnly, $wgDBClusterTimeout, $wgDBAvgStatusPoll, $wgDBtype; 00186 00187 # @todo FIXME: For now, only go through all this for mysql databases 00188 if ( $wgDBtype != 'mysql' ) { 00189 return $this->getWriterIndex(); 00190 } 00191 00192 if ( count( $this->mServers ) == 1 ) { 00193 # Skip the load balancing if there's only one server 00194 return 0; 00195 } elseif ( $group === false and $this->mReadIndex >= 0 ) { 00196 # Shortcut if generic reader exists already 00197 return $this->mReadIndex; 00198 } 00199 00200 wfProfileIn( __METHOD__ ); 00201 00202 $totalElapsed = 0; 00203 00204 # convert from seconds to microseconds 00205 $timeout = $wgDBClusterTimeout * 1e6; 00206 00207 # Find the relevant load array 00208 if ( $group !== false ) { 00209 if ( isset( $this->mGroupLoads[$group] ) ) { 00210 $nonErrorLoads = $this->mGroupLoads[$group]; 00211 } else { 00212 # No loads for this group, return false and the caller can use some other group 00213 wfDebug( __METHOD__ . ": no loads for group $group\n" ); 00214 wfProfileOut( __METHOD__ ); 00215 return false; 00216 } 00217 } else { 00218 $nonErrorLoads = $this->mLoads; 00219 } 00220 00221 if ( !$nonErrorLoads ) { 00222 wfProfileOut( __METHOD__ ); 00223 throw new MWException( "Empty server array given to LoadBalancer" ); 00224 } 00225 00226 # Scale the configured load ratios according to the dynamic load (if the load monitor supports it) 00227 $this->getLoadMonitor()->scaleLoads( $nonErrorLoads, $group, $wiki ); 00228 00229 $laggedSlaveMode = false; 00230 00231 # First try quickly looking through the available servers for a server that 00232 # meets our criteria 00233 do { 00234 $totalThreadsConnected = 0; 00235 $overloadedServers = 0; 00236 $currentLoads = $nonErrorLoads; 00237 while ( count( $currentLoads ) ) { 00238 if ( $wgReadOnly || $this->mAllowLagged || $laggedSlaveMode ) { 00239 $i = ArrayUtils::pickRandom( $currentLoads ); 00240 } else { 00241 $i = $this->getRandomNonLagged( $currentLoads, $wiki ); 00242 if ( $i === false && count( $currentLoads ) != 0 ) { 00243 # All slaves lagged. Switch to read-only mode 00244 wfDebugLog( 'replication', "All slaves lagged. Switch to read-only mode\n" ); 00245 $wgReadOnly = 'The database has been automatically locked ' . 00246 'while the slave database servers catch up to the master'; 00247 $i = ArrayUtils::pickRandom( $currentLoads ); 00248 $laggedSlaveMode = true; 00249 } 00250 } 00251 00252 if ( $i === false ) { 00253 # pickRandom() returned false 00254 # This is permanent and means the configuration or the load monitor 00255 # wants us to return false. 00256 wfDebugLog( 'connect', __METHOD__ . ": pickRandom() returned false\n" ); 00257 wfProfileOut( __METHOD__ ); 00258 return false; 00259 } 00260 00261 wfDebugLog( 'connect', __METHOD__ . ": Using reader #$i: {$this->mServers[$i]['host']}...\n" ); 00262 $conn = $this->openConnection( $i, $wiki ); 00263 00264 if ( !$conn ) { 00265 wfDebugLog( 'connect', __METHOD__ . ": Failed connecting to $i/$wiki\n" ); 00266 unset( $nonErrorLoads[$i] ); 00267 unset( $currentLoads[$i] ); 00268 continue; 00269 } 00270 00271 // Perform post-connection backoff 00272 $threshold = isset( $this->mServers[$i]['max threads'] ) 00273 ? $this->mServers[$i]['max threads'] : false; 00274 $backoff = $this->getLoadMonitor()->postConnectionBackoff( $conn, $threshold ); 00275 00276 // Decrement reference counter, we are finished with this connection. 00277 // It will be incremented for the caller later. 00278 if ( $wiki !== false ) { 00279 $this->reuseConnection( $conn ); 00280 } 00281 00282 if ( $backoff ) { 00283 # Post-connection overload, don't use this server for now 00284 $totalThreadsConnected += $backoff; 00285 $overloadedServers++; 00286 unset( $currentLoads[$i] ); 00287 } else { 00288 # Return this server 00289 break 2; 00290 } 00291 } 00292 00293 # No server found yet 00294 $i = false; 00295 00296 # If all servers were down, quit now 00297 if ( !count( $nonErrorLoads ) ) { 00298 wfDebugLog( 'connect', "All servers down\n" ); 00299 break; 00300 } 00301 00302 # Some servers must have been overloaded 00303 if ( $overloadedServers == 0 ) { 00304 throw new MWException( __METHOD__ . ": unexpectedly found no overloaded servers" ); 00305 } 00306 # Back off for a while 00307 # Scale the sleep time by the number of connected threads, to produce a 00308 # roughly constant global poll rate 00309 $avgThreads = $totalThreadsConnected / $overloadedServers; 00310 $totalElapsed += $this->sleep( $wgDBAvgStatusPoll * $avgThreads ); 00311 } while ( $totalElapsed < $timeout ); 00312 00313 if ( $totalElapsed >= $timeout ) { 00314 wfDebugLog( 'connect', "All servers busy\n" ); 00315 $this->mErrorConnection = false; 00316 $this->mLastError = 'All servers busy'; 00317 } 00318 00319 if ( $i !== false ) { 00320 # Slave connection successful 00321 # Wait for the session master pos for a short time 00322 if ( $this->mWaitForPos && $i > 0 ) { 00323 if ( !$this->doWait( $i ) ) { 00324 $this->mServers[$i]['slave pos'] = $conn->getSlavePos(); 00325 } 00326 } 00327 if ( $this->mReadIndex <= 0 && $this->mLoads[$i] > 0 && $i !== false ) { 00328 $this->mReadIndex = $i; 00329 } 00330 } 00331 wfProfileOut( __METHOD__ ); 00332 return $i; 00333 } 00334 00340 function sleep( $t ) { 00341 wfProfileIn( __METHOD__ ); 00342 wfDebug( __METHOD__ . ": waiting $t us\n" ); 00343 usleep( $t ); 00344 wfProfileOut( __METHOD__ ); 00345 return $t; 00346 } 00347 00354 public function waitFor( $pos ) { 00355 wfProfileIn( __METHOD__ ); 00356 $this->mWaitForPos = $pos; 00357 $i = $this->mReadIndex; 00358 00359 if ( $i > 0 ) { 00360 if ( !$this->doWait( $i ) ) { 00361 $this->mServers[$i]['slave pos'] = $this->getAnyOpenConnection( $i )->getSlavePos(); 00362 $this->mLaggedSlaveMode = true; 00363 } 00364 } 00365 wfProfileOut( __METHOD__ ); 00366 } 00367 00372 public function waitForAll( $pos ) { 00373 wfProfileIn( __METHOD__ ); 00374 $this->mWaitForPos = $pos; 00375 for ( $i = 1; $i < count( $this->mServers ); $i++ ) { 00376 $this->doWait( $i, true ); 00377 } 00378 wfProfileOut( __METHOD__ ); 00379 } 00380 00388 function getAnyOpenConnection( $i ) { 00389 foreach ( $this->mConns as $conns ) { 00390 if ( !empty( $conns[$i] ) ) { 00391 return reset( $conns[$i] ); 00392 } 00393 } 00394 return false; 00395 } 00396 00403 protected function doWait( $index, $open = false ) { 00404 # Find a connection to wait on 00405 $conn = $this->getAnyOpenConnection( $index ); 00406 if ( !$conn ) { 00407 if ( !$open ) { 00408 wfDebug( __METHOD__ . ": no connection open\n" ); 00409 return false; 00410 } else { 00411 $conn = $this->openConnection( $index, '' ); 00412 if ( !$conn ) { 00413 wfDebug( __METHOD__ . ": failed to open connection\n" ); 00414 return false; 00415 } 00416 } 00417 } 00418 00419 wfDebug( __METHOD__ . ": Waiting for slave #$index to catch up...\n" ); 00420 $result = $conn->masterPosWait( $this->mWaitForPos, $this->mWaitTimeout ); 00421 00422 if ( $result == -1 || is_null( $result ) ) { 00423 # Timed out waiting for slave, use master instead 00424 wfDebug( __METHOD__ . ": Timed out waiting for slave #$index pos {$this->mWaitForPos}\n" ); 00425 return false; 00426 } else { 00427 wfDebug( __METHOD__ . ": Done\n" ); 00428 return true; 00429 } 00430 } 00431 00443 public function &getConnection( $i, $groups = array(), $wiki = false ) { 00444 wfProfileIn( __METHOD__ ); 00445 00446 if ( $i == DB_LAST ) { 00447 wfProfileOut( __METHOD__ ); 00448 throw new MWException( 'Attempt to call ' . __METHOD__ . ' with deprecated server index DB_LAST' ); 00449 } elseif ( $i === null || $i === false ) { 00450 wfProfileOut( __METHOD__ ); 00451 throw new MWException( 'Attempt to call ' . __METHOD__ . ' with invalid server index' ); 00452 } 00453 00454 if ( $wiki === wfWikiID() ) { 00455 $wiki = false; 00456 } 00457 00458 # Query groups 00459 if ( $i == DB_MASTER ) { 00460 $i = $this->getWriterIndex(); 00461 } elseif ( !is_array( $groups ) ) { 00462 $groupIndex = $this->getReaderIndex( $groups, $wiki ); 00463 if ( $groupIndex !== false ) { 00464 $serverName = $this->getServerName( $groupIndex ); 00465 wfDebug( __METHOD__ . ": using server $serverName for group $groups\n" ); 00466 $i = $groupIndex; 00467 } 00468 } else { 00469 foreach ( $groups as $group ) { 00470 $groupIndex = $this->getReaderIndex( $group, $wiki ); 00471 if ( $groupIndex !== false ) { 00472 $serverName = $this->getServerName( $groupIndex ); 00473 wfDebug( __METHOD__ . ": using server $serverName for group $group\n" ); 00474 $i = $groupIndex; 00475 break; 00476 } 00477 } 00478 } 00479 00480 # Operation-based index 00481 if ( $i == DB_SLAVE ) { 00482 $this->mLastError = 'Unknown error'; // reset error string 00483 $i = $this->getReaderIndex( false, $wiki ); 00484 # Couldn't find a working server in getReaderIndex()? 00485 if ( $i === false ) { 00486 $this->mLastError = 'No working slave server: ' . $this->mLastError; 00487 wfProfileOut( __METHOD__ ); 00488 return $this->reportConnectionError(); 00489 } 00490 } 00491 00492 # Now we have an explicit index into the servers array 00493 $conn = $this->openConnection( $i, $wiki ); 00494 if ( !$conn ) { 00495 wfProfileOut( __METHOD__ ); 00496 return $this->reportConnectionError(); 00497 } 00498 00499 wfProfileOut( __METHOD__ ); 00500 return $conn; 00501 } 00502 00511 public function reuseConnection( $conn ) { 00512 $serverIndex = $conn->getLBInfo( 'serverIndex' ); 00513 $refCount = $conn->getLBInfo( 'foreignPoolRefCount' ); 00514 $dbName = $conn->getDBname(); 00515 $prefix = $conn->tablePrefix(); 00516 if ( strval( $prefix ) !== '' ) { 00517 $wiki = "$dbName-$prefix"; 00518 } else { 00519 $wiki = $dbName; 00520 } 00521 if ( $serverIndex === null || $refCount === null ) { 00522 wfDebug( __METHOD__ . ": this connection was not opened as a foreign connection\n" ); 00533 return; 00534 } 00535 if ( $this->mConns['foreignUsed'][$serverIndex][$wiki] !== $conn ) { 00536 throw new MWException( __METHOD__ . ": connection not found, has the connection been freed already?" ); 00537 } 00538 $conn->setLBInfo( 'foreignPoolRefCount', --$refCount ); 00539 if ( $refCount <= 0 ) { 00540 $this->mConns['foreignFree'][$serverIndex][$wiki] = $conn; 00541 unset( $this->mConns['foreignUsed'][$serverIndex][$wiki] ); 00542 wfDebug( __METHOD__ . ": freed connection $serverIndex/$wiki\n" ); 00543 } else { 00544 wfDebug( __METHOD__ . ": reference count for $serverIndex/$wiki reduced to $refCount\n" ); 00545 } 00546 } 00547 00560 public function getConnectionRef( $db, $groups = array(), $wiki = false ) { 00561 return new DBConnRef( $this, $this->getConnection( $db, $groups, $wiki ) ); 00562 } 00563 00576 public function getLazyConnectionRef( $db, $groups = array(), $wiki = false ) { 00577 return new DBConnRef( $this, array( $db, $groups, $wiki ) ); 00578 } 00579 00594 function openConnection( $i, $wiki = false ) { 00595 wfProfileIn( __METHOD__ ); 00596 if ( $wiki !== false ) { 00597 $conn = $this->openForeignConnection( $i, $wiki ); 00598 wfProfileOut( __METHOD__ ); 00599 return $conn; 00600 } 00601 if ( isset( $this->mConns['local'][$i][0] ) ) { 00602 $conn = $this->mConns['local'][$i][0]; 00603 } else { 00604 $server = $this->mServers[$i]; 00605 $server['serverIndex'] = $i; 00606 $conn = $this->reallyOpenConnection( $server, false ); 00607 if ( $conn->isOpen() ) { 00608 wfDebug( "Connected to database $i at {$this->mServers[$i]['host']}\n" ); 00609 $this->mConns['local'][$i][0] = $conn; 00610 } else { 00611 wfDebug( "Failed to connect to database $i at {$this->mServers[$i]['host']}\n" ); 00612 $this->mErrorConnection = $conn; 00613 $conn = false; 00614 } 00615 } 00616 wfProfileOut( __METHOD__ ); 00617 return $conn; 00618 } 00619 00638 function openForeignConnection( $i, $wiki ) { 00639 wfProfileIn( __METHOD__ ); 00640 list( $dbName, $prefix ) = wfSplitWikiID( $wiki ); 00641 if ( isset( $this->mConns['foreignUsed'][$i][$wiki] ) ) { 00642 // Reuse an already-used connection 00643 $conn = $this->mConns['foreignUsed'][$i][$wiki]; 00644 wfDebug( __METHOD__ . ": reusing connection $i/$wiki\n" ); 00645 } elseif ( isset( $this->mConns['foreignFree'][$i][$wiki] ) ) { 00646 // Reuse a free connection for the same wiki 00647 $conn = $this->mConns['foreignFree'][$i][$wiki]; 00648 unset( $this->mConns['foreignFree'][$i][$wiki] ); 00649 $this->mConns['foreignUsed'][$i][$wiki] = $conn; 00650 wfDebug( __METHOD__ . ": reusing free connection $i/$wiki\n" ); 00651 } elseif ( !empty( $this->mConns['foreignFree'][$i] ) ) { 00652 // Reuse a connection from another wiki 00653 $conn = reset( $this->mConns['foreignFree'][$i] ); 00654 $oldWiki = key( $this->mConns['foreignFree'][$i] ); 00655 00656 if ( !$conn->selectDB( $dbName ) ) { 00657 $this->mLastError = "Error selecting database $dbName on server " . 00658 $conn->getServer() . " from client host " . wfHostname() . "\n"; 00659 $this->mErrorConnection = $conn; 00660 $conn = false; 00661 } else { 00662 $conn->tablePrefix( $prefix ); 00663 unset( $this->mConns['foreignFree'][$i][$oldWiki] ); 00664 $this->mConns['foreignUsed'][$i][$wiki] = $conn; 00665 wfDebug( __METHOD__ . ": reusing free connection from $oldWiki for $wiki\n" ); 00666 } 00667 } else { 00668 // Open a new connection 00669 $server = $this->mServers[$i]; 00670 $server['serverIndex'] = $i; 00671 $server['foreignPoolRefCount'] = 0; 00672 $server['foreign'] = true; 00673 $conn = $this->reallyOpenConnection( $server, $dbName ); 00674 if ( !$conn->isOpen() ) { 00675 wfDebug( __METHOD__ . ": error opening connection for $i/$wiki\n" ); 00676 $this->mErrorConnection = $conn; 00677 $conn = false; 00678 } else { 00679 $conn->tablePrefix( $prefix ); 00680 $this->mConns['foreignUsed'][$i][$wiki] = $conn; 00681 wfDebug( __METHOD__ . ": opened new connection for $i/$wiki\n" ); 00682 } 00683 } 00684 00685 // Increment reference count 00686 if ( $conn ) { 00687 $refCount = $conn->getLBInfo( 'foreignPoolRefCount' ); 00688 $conn->setLBInfo( 'foreignPoolRefCount', $refCount + 1 ); 00689 } 00690 wfProfileOut( __METHOD__ ); 00691 return $conn; 00692 } 00693 00701 function isOpen( $index ) { 00702 if ( !is_integer( $index ) ) { 00703 return false; 00704 } 00705 return (bool)$this->getAnyOpenConnection( $index ); 00706 } 00707 00718 function reallyOpenConnection( $server, $dbNameOverride = false ) { 00719 if ( !is_array( $server ) ) { 00720 throw new MWException( 'You must update your load-balancing configuration. ' . 00721 'See DefaultSettings.php entry for $wgDBservers.' ); 00722 } 00723 00724 if ( $dbNameOverride !== false ) { 00725 $server['dbname'] = $dbNameOverride; 00726 } 00727 00728 # Create object 00729 try { 00730 $db = DatabaseBase::factory( $server['type'], $server ); 00731 } catch ( DBConnectionError $e ) { 00732 // FIXME: This is probably the ugliest thing I have ever done to 00733 // PHP. I'm half-expecting it to segfault, just out of disgust. -- TS 00734 $db = $e->db; 00735 } 00736 00737 $db->setLBInfo( $server ); 00738 if ( isset( $server['fakeSlaveLag'] ) ) { 00739 $db->setFakeSlaveLag( $server['fakeSlaveLag'] ); 00740 } 00741 if ( isset( $server['fakeMaster'] ) ) { 00742 $db->setFakeMaster( true ); 00743 } 00744 return $db; 00745 } 00746 00751 private function reportConnectionError() { 00752 $conn = $this->mErrorConnection; // The connection which caused the error 00753 00754 if ( !is_object( $conn ) ) { 00755 // No last connection, probably due to all servers being too busy 00756 wfLogDBError( "LB failure with no last connection. Connection error: {$this->mLastError}\n" ); 00757 00758 // If all servers were busy, mLastError will contain something sensible 00759 throw new DBConnectionError( null, $this->mLastError ); 00760 } else { 00761 $server = $conn->getProperty( 'mServer' ); 00762 wfLogDBError( "Connection error: {$this->mLastError} ({$server})\n" ); 00763 $conn->reportConnectionError( "{$this->mLastError} ({$server})" ); // throws DBConnectionError 00764 } 00765 return false; /* not reached */ 00766 } 00767 00771 function getWriterIndex() { 00772 return 0; 00773 } 00774 00781 function haveIndex( $i ) { 00782 return array_key_exists( $i, $this->mServers ); 00783 } 00784 00791 function isNonZeroLoad( $i ) { 00792 return array_key_exists( $i, $this->mServers ) && $this->mLoads[$i] != 0; 00793 } 00794 00800 function getServerCount() { 00801 return count( $this->mServers ); 00802 } 00803 00810 function getServerName( $i ) { 00811 if ( isset( $this->mServers[$i]['hostName'] ) ) { 00812 return $this->mServers[$i]['hostName']; 00813 } elseif ( isset( $this->mServers[$i]['host'] ) ) { 00814 return $this->mServers[$i]['host']; 00815 } else { 00816 return ''; 00817 } 00818 } 00819 00825 function getServerInfo( $i ) { 00826 if ( isset( $this->mServers[$i] ) ) { 00827 return $this->mServers[$i]; 00828 } else { 00829 return false; 00830 } 00831 } 00832 00838 function setServerInfo( $i, $serverInfo ) { 00839 $this->mServers[$i] = $serverInfo; 00840 } 00841 00846 function getMasterPos() { 00847 # If this entire request was served from a slave without opening a connection to the 00848 # master (however unlikely that may be), then we can fetch the position from the slave. 00849 $masterConn = $this->getAnyOpenConnection( 0 ); 00850 if ( !$masterConn ) { 00851 for ( $i = 1; $i < count( $this->mServers ); $i++ ) { 00852 $conn = $this->getAnyOpenConnection( $i ); 00853 if ( $conn ) { 00854 wfDebug( "Master pos fetched from slave\n" ); 00855 return $conn->getSlavePos(); 00856 } 00857 } 00858 } else { 00859 wfDebug( "Master pos fetched from master\n" ); 00860 return $masterConn->getMasterPos(); 00861 } 00862 return false; 00863 } 00864 00868 function closeAll() { 00869 foreach ( $this->mConns as $conns2 ) { 00870 foreach ( $conns2 as $conns3 ) { 00871 foreach ( $conns3 as $conn ) { 00872 $conn->close(); 00873 } 00874 } 00875 } 00876 $this->mConns = array( 00877 'local' => array(), 00878 'foreignFree' => array(), 00879 'foreignUsed' => array(), 00880 ); 00881 } 00882 00889 function closeConnecton( $conn ) { 00890 wfDeprecated( __METHOD__, '1.18' ); 00891 $this->closeConnection( $conn ); 00892 } 00893 00900 function closeConnection( $conn ) { 00901 $done = false; 00902 foreach ( $this->mConns as $i1 => $conns2 ) { 00903 foreach ( $conns2 as $i2 => $conns3 ) { 00904 foreach ( $conns3 as $i3 => $candidateConn ) { 00905 if ( $conn === $candidateConn ) { 00906 $conn->close(); 00907 unset( $this->mConns[$i1][$i2][$i3] ); 00908 $done = true; 00909 break; 00910 } 00911 } 00912 } 00913 } 00914 if ( !$done ) { 00915 $conn->close(); 00916 } 00917 } 00918 00922 function commitAll() { 00923 foreach ( $this->mConns as $conns2 ) { 00924 foreach ( $conns2 as $conns3 ) { 00925 foreach ( $conns3 as $conn ) { 00926 if ( $conn->trxLevel() ) { 00927 $conn->commit( __METHOD__, 'flush' ); 00928 } 00929 } 00930 } 00931 } 00932 } 00933 00937 function commitMasterChanges() { 00938 // Always 0, but who knows.. :) 00939 $masterIndex = $this->getWriterIndex(); 00940 foreach ( $this->mConns as $conns2 ) { 00941 if ( empty( $conns2[$masterIndex] ) ) { 00942 continue; 00943 } 00944 foreach ( $conns2[$masterIndex] as $conn ) { 00945 if ( $conn->trxLevel() && $conn->writesOrCallbacksPending() ) { 00946 $conn->commit( __METHOD__, 'flush' ); 00947 } 00948 } 00949 } 00950 } 00951 00956 function waitTimeout( $value = null ) { 00957 return wfSetVar( $this->mWaitTimeout, $value ); 00958 } 00959 00963 function getLaggedSlaveMode() { 00964 return $this->mLaggedSlaveMode; 00965 } 00966 00972 function allowLagged( $mode = null ) { 00973 if ( $mode === null ) { 00974 return $this->mAllowLagged; 00975 } 00976 $this->mAllowLagged = $mode; 00977 return $this->mAllowLagged; 00978 } 00979 00983 function pingAll() { 00984 $success = true; 00985 foreach ( $this->mConns as $conns2 ) { 00986 foreach ( $conns2 as $conns3 ) { 00987 foreach ( $conns3 as $conn ) { 00988 if ( !$conn->ping() ) { 00989 $success = false; 00990 } 00991 } 00992 } 00993 } 00994 return $success; 00995 } 00996 01002 function forEachOpenConnection( $callback, $params = array() ) { 01003 foreach ( $this->mConns as $conns2 ) { 01004 foreach ( $conns2 as $conns3 ) { 01005 foreach ( $conns3 as $conn ) { 01006 $mergedParams = array_merge( array( $conn ), $params ); 01007 call_user_func_array( $callback, $mergedParams ); 01008 } 01009 } 01010 } 01011 } 01012 01023 function getMaxLag( $wiki = false ) { 01024 $maxLag = -1; 01025 $host = ''; 01026 $maxIndex = 0; 01027 if ( $this->getServerCount() > 1 ) { // no replication = no lag 01028 foreach ( $this->mServers as $i => $conn ) { 01029 $conn = false; 01030 if ( $wiki === false ) { 01031 $conn = $this->getAnyOpenConnection( $i ); 01032 } 01033 if ( !$conn ) { 01034 $conn = $this->openConnection( $i, $wiki ); 01035 } 01036 if ( !$conn ) { 01037 continue; 01038 } 01039 $lag = $conn->getLag(); 01040 if ( $lag > $maxLag ) { 01041 $maxLag = $lag; 01042 $host = $this->mServers[$i]['host']; 01043 $maxIndex = $i; 01044 } 01045 } 01046 } 01047 return array( $host, $maxLag, $maxIndex ); 01048 } 01049 01058 function getLagTimes( $wiki = false ) { 01059 # Try process cache 01060 if ( isset( $this->mLagTimes ) ) { 01061 return $this->mLagTimes; 01062 } 01063 if ( $this->getServerCount() == 1 ) { 01064 # No replication 01065 $this->mLagTimes = array( 0 => 0 ); 01066 } else { 01067 # Send the request to the load monitor 01068 $this->mLagTimes = $this->getLoadMonitor()->getLagTimes( 01069 array_keys( $this->mServers ), $wiki ); 01070 } 01071 return $this->mLagTimes; 01072 } 01073 01089 function safeGetLag( $conn ) { 01090 if ( $this->getServerCount() == 1 ) { 01091 return 0; 01092 } else { 01093 return $conn->getLag(); 01094 } 01095 } 01096 01100 function clearLagTimeCache() { 01101 $this->mLagTimes = null; 01102 } 01103 } 01104 01112 class DBConnRef implements IDatabase { 01114 protected $lb; 01116 protected $conn; 01118 protected $params; 01119 01124 public function __construct( LoadBalancer $lb, $conn ) { 01125 $this->lb = $lb; 01126 if ( $conn instanceof DatabaseBase ) { 01127 $this->conn = $conn; 01128 } else { 01129 $this->params = $conn; 01130 } 01131 } 01132 01133 public function __call( $name, $arguments ) { 01134 if ( $this->conn === null ) { 01135 list( $db, $groups, $wiki ) = $this->params; 01136 $this->conn = $this->lb->getConnection( $db, $groups, $wiki ); 01137 } 01138 return call_user_func_array( array( $this->conn, $name ), $arguments ); 01139 } 01140 01141 function __destruct() { 01142 if ( $this->conn !== null ) { 01143 $this->lb->reuseConnection( $this->conn ); 01144 } 01145 } 01146 }