MediaWiki
REL1_20
|
00001 <?php 00030 class LoadBalancer { 00031 private $mServers, $mConns, $mLoads, $mGroupLoads; 00032 private $mErrorConnection; 00033 private $mReadIndex, $mAllowLagged; 00034 private $mWaitForPos, $mWaitTimeout; 00035 private $mLaggedSlaveMode, $mLastError = 'Unknown error'; 00036 private $mParentInfo, $mLagTimes; 00037 private $mLoadMonitorClass, $mLoadMonitor; 00038 00045 function __construct( $params ) { 00046 if ( !isset( $params['servers'] ) ) { 00047 throw new MWException( __CLASS__.': missing servers parameter' ); 00048 } 00049 $this->mServers = $params['servers']; 00050 00051 if ( isset( $params['waitTimeout'] ) ) { 00052 $this->mWaitTimeout = $params['waitTimeout']; 00053 } else { 00054 $this->mWaitTimeout = 10; 00055 } 00056 00057 $this->mReadIndex = -1; 00058 $this->mWriteIndex = -1; 00059 $this->mConns = array( 00060 'local' => array(), 00061 'foreignUsed' => array(), 00062 'foreignFree' => array() ); 00063 $this->mLoads = array(); 00064 $this->mWaitForPos = false; 00065 $this->mLaggedSlaveMode = false; 00066 $this->mErrorConnection = false; 00067 $this->mAllowLagged = false; 00068 00069 if ( isset( $params['loadMonitor'] ) ) { 00070 $this->mLoadMonitorClass = $params['loadMonitor']; 00071 } else { 00072 $master = reset( $params['servers'] ); 00073 if ( isset( $master['type'] ) && $master['type'] === 'mysql' ) { 00074 $this->mLoadMonitorClass = 'LoadMonitor_MySQL'; 00075 } else { 00076 $this->mLoadMonitorClass = 'LoadMonitor_Null'; 00077 } 00078 } 00079 00080 foreach( $params['servers'] as $i => $server ) { 00081 $this->mLoads[$i] = $server['load']; 00082 if ( isset( $server['groupLoads'] ) ) { 00083 foreach ( $server['groupLoads'] as $group => $ratio ) { 00084 if ( !isset( $this->mGroupLoads[$group] ) ) { 00085 $this->mGroupLoads[$group] = array(); 00086 } 00087 $this->mGroupLoads[$group][$i] = $ratio; 00088 } 00089 } 00090 } 00091 } 00092 00098 function getLoadMonitor() { 00099 if ( !isset( $this->mLoadMonitor ) ) { 00100 $class = $this->mLoadMonitorClass; 00101 $this->mLoadMonitor = new $class( $this ); 00102 } 00103 return $this->mLoadMonitor; 00104 } 00105 00111 function parentInfo( $x = null ) { 00112 return wfSetVar( $this->mParentInfo, $x ); 00113 } 00114 00123 function pickRandom( $weights ) { 00124 if ( !is_array( $weights ) || count( $weights ) == 0 ) { 00125 return false; 00126 } 00127 00128 $sum = array_sum( $weights ); 00129 if ( $sum == 0 ) { 00130 # No loads on any of them 00131 # In previous versions, this triggered an unweighted random selection, 00132 # but this feature has been removed as of April 2006 to allow for strict 00133 # separation of query groups. 00134 return false; 00135 } 00136 $max = mt_getrandmax(); 00137 $rand = mt_rand( 0, $max ) / $max * $sum; 00138 00139 $sum = 0; 00140 foreach ( $weights as $i => $w ) { 00141 $sum += $w; 00142 if ( $sum >= $rand ) { 00143 break; 00144 } 00145 } 00146 return $i; 00147 } 00148 00154 function getRandomNonLagged( $loads, $wiki = false ) { 00155 # Unset excessively lagged servers 00156 $lags = $this->getLagTimes( $wiki ); 00157 foreach ( $lags as $i => $lag ) { 00158 if ( $i != 0 ) { 00159 if ( $lag === false ) { 00160 wfDebugLog( 'replication', "Server #$i is not replicating\n" ); 00161 unset( $loads[$i] ); 00162 } elseif ( isset( $this->mServers[$i]['max lag'] ) && $lag > $this->mServers[$i]['max lag'] ) { 00163 wfDebugLog( 'replication', "Server #$i is excessively lagged ($lag seconds)\n" ); 00164 unset( $loads[$i] ); 00165 } 00166 } 00167 } 00168 00169 # Find out if all the slaves with non-zero load are lagged 00170 $sum = 0; 00171 foreach ( $loads as $load ) { 00172 $sum += $load; 00173 } 00174 if ( $sum == 0 ) { 00175 # No appropriate DB servers except maybe the master and some slaves with zero load 00176 # Do NOT use the master 00177 # Instead, this function will return false, triggering read-only mode, 00178 # and a lagged slave will be used instead. 00179 return false; 00180 } 00181 00182 if ( count( $loads ) == 0 ) { 00183 return false; 00184 } 00185 00186 #wfDebugLog( 'connect', var_export( $loads, true ) ); 00187 00188 # Return a random representative of the remainder 00189 return $this->pickRandom( $loads ); 00190 } 00191 00202 function getReaderIndex( $group = false, $wiki = false ) { 00203 global $wgReadOnly, $wgDBClusterTimeout, $wgDBAvgStatusPoll, $wgDBtype; 00204 00205 # @todo FIXME: For now, only go through all this for mysql databases 00206 if ($wgDBtype != 'mysql') { 00207 return $this->getWriterIndex(); 00208 } 00209 00210 if ( count( $this->mServers ) == 1 ) { 00211 # Skip the load balancing if there's only one server 00212 return 0; 00213 } elseif ( $group === false and $this->mReadIndex >= 0 ) { 00214 # Shortcut if generic reader exists already 00215 return $this->mReadIndex; 00216 } 00217 00218 wfProfileIn( __METHOD__ ); 00219 00220 $totalElapsed = 0; 00221 00222 # convert from seconds to microseconds 00223 $timeout = $wgDBClusterTimeout * 1e6; 00224 00225 # Find the relevant load array 00226 if ( $group !== false ) { 00227 if ( isset( $this->mGroupLoads[$group] ) ) { 00228 $nonErrorLoads = $this->mGroupLoads[$group]; 00229 } else { 00230 # No loads for this group, return false and the caller can use some other group 00231 wfDebug( __METHOD__.": no loads for group $group\n" ); 00232 wfProfileOut( __METHOD__ ); 00233 return false; 00234 } 00235 } else { 00236 $nonErrorLoads = $this->mLoads; 00237 } 00238 00239 if ( !$nonErrorLoads ) { 00240 throw new MWException( "Empty server array given to LoadBalancer" ); 00241 } 00242 00243 # Scale the configured load ratios according to the dynamic load (if the load monitor supports it) 00244 $this->getLoadMonitor()->scaleLoads( $nonErrorLoads, $group, $wiki ); 00245 00246 $laggedSlaveMode = false; 00247 00248 # First try quickly looking through the available servers for a server that 00249 # meets our criteria 00250 do { 00251 $totalThreadsConnected = 0; 00252 $overloadedServers = 0; 00253 $currentLoads = $nonErrorLoads; 00254 while ( count( $currentLoads ) ) { 00255 if ( $wgReadOnly || $this->mAllowLagged || $laggedSlaveMode ) { 00256 $i = $this->pickRandom( $currentLoads ); 00257 } else { 00258 $i = $this->getRandomNonLagged( $currentLoads, $wiki ); 00259 if ( $i === false && count( $currentLoads ) != 0 ) { 00260 # All slaves lagged. Switch to read-only mode 00261 wfDebugLog( 'replication', "All slaves lagged. Switch to read-only mode\n" ); 00262 $wgReadOnly = 'The database has been automatically locked ' . 00263 'while the slave database servers catch up to the master'; 00264 $i = $this->pickRandom( $currentLoads ); 00265 $laggedSlaveMode = true; 00266 } 00267 } 00268 00269 if ( $i === false ) { 00270 # pickRandom() returned false 00271 # This is permanent and means the configuration or the load monitor 00272 # wants us to return false. 00273 wfDebugLog( 'connect', __METHOD__.": pickRandom() returned false\n" ); 00274 wfProfileOut( __METHOD__ ); 00275 return false; 00276 } 00277 00278 wfDebugLog( 'connect', __METHOD__.": Using reader #$i: {$this->mServers[$i]['host']}...\n" ); 00279 $conn = $this->openConnection( $i, $wiki ); 00280 00281 if ( !$conn ) { 00282 wfDebugLog( 'connect', __METHOD__.": Failed connecting to $i/$wiki\n" ); 00283 unset( $nonErrorLoads[$i] ); 00284 unset( $currentLoads[$i] ); 00285 continue; 00286 } 00287 00288 // Perform post-connection backoff 00289 $threshold = isset( $this->mServers[$i]['max threads'] ) 00290 ? $this->mServers[$i]['max threads'] : false; 00291 $backoff = $this->getLoadMonitor()->postConnectionBackoff( $conn, $threshold ); 00292 00293 // Decrement reference counter, we are finished with this connection. 00294 // It will be incremented for the caller later. 00295 if ( $wiki !== false ) { 00296 $this->reuseConnection( $conn ); 00297 } 00298 00299 if ( $backoff ) { 00300 # Post-connection overload, don't use this server for now 00301 $totalThreadsConnected += $backoff; 00302 $overloadedServers++; 00303 unset( $currentLoads[$i] ); 00304 } else { 00305 # Return this server 00306 break 2; 00307 } 00308 } 00309 00310 # No server found yet 00311 $i = false; 00312 00313 # If all servers were down, quit now 00314 if ( !count( $nonErrorLoads ) ) { 00315 wfDebugLog( 'connect', "All servers down\n" ); 00316 break; 00317 } 00318 00319 # Some servers must have been overloaded 00320 if ( $overloadedServers == 0 ) { 00321 throw new MWException( __METHOD__.": unexpectedly found no overloaded servers" ); 00322 } 00323 # Back off for a while 00324 # Scale the sleep time by the number of connected threads, to produce a 00325 # roughly constant global poll rate 00326 $avgThreads = $totalThreadsConnected / $overloadedServers; 00327 $totalElapsed += $this->sleep( $wgDBAvgStatusPoll * $avgThreads ); 00328 } while ( $totalElapsed < $timeout ); 00329 00330 if ( $totalElapsed >= $timeout ) { 00331 wfDebugLog( 'connect', "All servers busy\n" ); 00332 $this->mErrorConnection = false; 00333 $this->mLastError = 'All servers busy'; 00334 } 00335 00336 if ( $i !== false ) { 00337 # Slave connection successful 00338 # Wait for the session master pos for a short time 00339 if ( $this->mWaitForPos && $i > 0 ) { 00340 if ( !$this->doWait( $i ) ) { 00341 $this->mServers[$i]['slave pos'] = $conn->getSlavePos(); 00342 } 00343 } 00344 if ( $this->mReadIndex <=0 && $this->mLoads[$i]>0 && $i !== false ) { 00345 $this->mReadIndex = $i; 00346 } 00347 } 00348 wfProfileOut( __METHOD__ ); 00349 return $i; 00350 } 00351 00357 function sleep( $t ) { 00358 wfProfileIn( __METHOD__ ); 00359 wfDebug( __METHOD__.": waiting $t us\n" ); 00360 usleep( $t ); 00361 wfProfileOut( __METHOD__ ); 00362 return $t; 00363 } 00364 00371 public function waitFor( $pos ) { 00372 wfProfileIn( __METHOD__ ); 00373 $this->mWaitForPos = $pos; 00374 $i = $this->mReadIndex; 00375 00376 if ( $i > 0 ) { 00377 if ( !$this->doWait( $i ) ) { 00378 $this->mServers[$i]['slave pos'] = $this->getAnyOpenConnection( $i )->getSlavePos(); 00379 $this->mLaggedSlaveMode = true; 00380 } 00381 } 00382 wfProfileOut( __METHOD__ ); 00383 } 00384 00389 public function waitForAll( $pos ) { 00390 wfProfileIn( __METHOD__ ); 00391 $this->mWaitForPos = $pos; 00392 for ( $i = 1; $i < count( $this->mServers ); $i++ ) { 00393 $this->doWait( $i , true ); 00394 } 00395 wfProfileOut( __METHOD__ ); 00396 } 00397 00405 function getAnyOpenConnection( $i ) { 00406 foreach ( $this->mConns as $conns ) { 00407 if ( !empty( $conns[$i] ) ) { 00408 return reset( $conns[$i] ); 00409 } 00410 } 00411 return false; 00412 } 00413 00420 function doWait( $index, $open = false ) { 00421 # Find a connection to wait on 00422 $conn = $this->getAnyOpenConnection( $index ); 00423 if ( !$conn ) { 00424 if ( !$open ) { 00425 wfDebug( __METHOD__ . ": no connection open\n" ); 00426 return false; 00427 } else { 00428 $conn = $this->openConnection( $index ); 00429 if ( !$conn ) { 00430 wfDebug( __METHOD__ . ": failed to open connection\n" ); 00431 return false; 00432 } 00433 } 00434 } 00435 00436 wfDebug( __METHOD__.": Waiting for slave #$index to catch up...\n" ); 00437 $result = $conn->masterPosWait( $this->mWaitForPos, $this->mWaitTimeout ); 00438 00439 if ( $result == -1 || is_null( $result ) ) { 00440 # Timed out waiting for slave, use master instead 00441 wfDebug( __METHOD__.": Timed out waiting for slave #$index pos {$this->mWaitForPos}\n" ); 00442 return false; 00443 } else { 00444 wfDebug( __METHOD__.": Done\n" ); 00445 return true; 00446 } 00447 } 00448 00459 public function &getConnection( $i, $groups = array(), $wiki = false ) { 00460 wfProfileIn( __METHOD__ ); 00461 00462 if ( $i == DB_LAST ) { 00463 throw new MWException( 'Attempt to call ' . __METHOD__ . ' with deprecated server index DB_LAST' ); 00464 } elseif ( $i === null || $i === false ) { 00465 throw new MWException( 'Attempt to call ' . __METHOD__ . ' with invalid server index' ); 00466 } 00467 00468 if ( $wiki === wfWikiID() ) { 00469 $wiki = false; 00470 } 00471 00472 # Query groups 00473 if ( $i == DB_MASTER ) { 00474 $i = $this->getWriterIndex(); 00475 } elseif ( !is_array( $groups ) ) { 00476 $groupIndex = $this->getReaderIndex( $groups, $wiki ); 00477 if ( $groupIndex !== false ) { 00478 $serverName = $this->getServerName( $groupIndex ); 00479 wfDebug( __METHOD__.": using server $serverName for group $groups\n" ); 00480 $i = $groupIndex; 00481 } 00482 } else { 00483 foreach ( $groups as $group ) { 00484 $groupIndex = $this->getReaderIndex( $group, $wiki ); 00485 if ( $groupIndex !== false ) { 00486 $serverName = $this->getServerName( $groupIndex ); 00487 wfDebug( __METHOD__.": using server $serverName for group $group\n" ); 00488 $i = $groupIndex; 00489 break; 00490 } 00491 } 00492 } 00493 00494 # Operation-based index 00495 if ( $i == DB_SLAVE ) { 00496 $i = $this->getReaderIndex( false, $wiki ); 00497 # Couldn't find a working server in getReaderIndex()? 00498 if ( $i === false ) { 00499 $this->mLastError = 'No working slave server: ' . $this->mLastError; 00500 $this->reportConnectionError( $this->mErrorConnection ); 00501 wfProfileOut( __METHOD__ ); 00502 return false; 00503 } 00504 } 00505 00506 # Now we have an explicit index into the servers array 00507 $conn = $this->openConnection( $i, $wiki ); 00508 if ( !$conn ) { 00509 $this->reportConnectionError( $this->mErrorConnection ); 00510 } 00511 00512 wfProfileOut( __METHOD__ ); 00513 return $conn; 00514 } 00515 00523 public function reuseConnection( $conn ) { 00524 $serverIndex = $conn->getLBInfo('serverIndex'); 00525 $refCount = $conn->getLBInfo('foreignPoolRefCount'); 00526 $dbName = $conn->getDBname(); 00527 $prefix = $conn->tablePrefix(); 00528 if ( strval( $prefix ) !== '' ) { 00529 $wiki = "$dbName-$prefix"; 00530 } else { 00531 $wiki = $dbName; 00532 } 00533 if ( $serverIndex === null || $refCount === null ) { 00534 wfDebug( __METHOD__.": this connection was not opened as a foreign connection\n" ); 00545 return; 00546 } 00547 if ( $this->mConns['foreignUsed'][$serverIndex][$wiki] !== $conn ) { 00548 throw new MWException( __METHOD__.": connection not found, has the connection been freed already?" ); 00549 } 00550 $conn->setLBInfo( 'foreignPoolRefCount', --$refCount ); 00551 if ( $refCount <= 0 ) { 00552 $this->mConns['foreignFree'][$serverIndex][$wiki] = $conn; 00553 unset( $this->mConns['foreignUsed'][$serverIndex][$wiki] ); 00554 wfDebug( __METHOD__.": freed connection $serverIndex/$wiki\n" ); 00555 } else { 00556 wfDebug( __METHOD__.": reference count for $serverIndex/$wiki reduced to $refCount\n" ); 00557 } 00558 } 00559 00574 function openConnection( $i, $wiki = false ) { 00575 wfProfileIn( __METHOD__ ); 00576 if ( $wiki !== false ) { 00577 $conn = $this->openForeignConnection( $i, $wiki ); 00578 wfProfileOut( __METHOD__); 00579 return $conn; 00580 } 00581 if ( isset( $this->mConns['local'][$i][0] ) ) { 00582 $conn = $this->mConns['local'][$i][0]; 00583 } else { 00584 $server = $this->mServers[$i]; 00585 $server['serverIndex'] = $i; 00586 $conn = $this->reallyOpenConnection( $server, false ); 00587 if ( $conn->isOpen() ) { 00588 $this->mConns['local'][$i][0] = $conn; 00589 } else { 00590 wfDebug( "Failed to connect to database $i at {$this->mServers[$i]['host']}\n" ); 00591 $this->mErrorConnection = $conn; 00592 $conn = false; 00593 } 00594 } 00595 wfProfileOut( __METHOD__ ); 00596 return $conn; 00597 } 00598 00617 function openForeignConnection( $i, $wiki ) { 00618 wfProfileIn(__METHOD__); 00619 list( $dbName, $prefix ) = wfSplitWikiID( $wiki ); 00620 if ( isset( $this->mConns['foreignUsed'][$i][$wiki] ) ) { 00621 // Reuse an already-used connection 00622 $conn = $this->mConns['foreignUsed'][$i][$wiki]; 00623 wfDebug( __METHOD__.": reusing connection $i/$wiki\n" ); 00624 } elseif ( isset( $this->mConns['foreignFree'][$i][$wiki] ) ) { 00625 // Reuse a free connection for the same wiki 00626 $conn = $this->mConns['foreignFree'][$i][$wiki]; 00627 unset( $this->mConns['foreignFree'][$i][$wiki] ); 00628 $this->mConns['foreignUsed'][$i][$wiki] = $conn; 00629 wfDebug( __METHOD__.": reusing free connection $i/$wiki\n" ); 00630 } elseif ( !empty( $this->mConns['foreignFree'][$i] ) ) { 00631 // Reuse a connection from another wiki 00632 $conn = reset( $this->mConns['foreignFree'][$i] ); 00633 $oldWiki = key( $this->mConns['foreignFree'][$i] ); 00634 00635 if ( !$conn->selectDB( $dbName ) ) { 00636 $this->mLastError = "Error selecting database $dbName on server " . 00637 $conn->getServer() . " from client host " . wfHostname() . "\n"; 00638 $this->mErrorConnection = $conn; 00639 $conn = false; 00640 } else { 00641 $conn->tablePrefix( $prefix ); 00642 unset( $this->mConns['foreignFree'][$i][$oldWiki] ); 00643 $this->mConns['foreignUsed'][$i][$wiki] = $conn; 00644 wfDebug( __METHOD__.": reusing free connection from $oldWiki for $wiki\n" ); 00645 } 00646 } else { 00647 // Open a new connection 00648 $server = $this->mServers[$i]; 00649 $server['serverIndex'] = $i; 00650 $server['foreignPoolRefCount'] = 0; 00651 $conn = $this->reallyOpenConnection( $server, $dbName ); 00652 if ( !$conn->isOpen() ) { 00653 wfDebug( __METHOD__.": error opening connection for $i/$wiki\n" ); 00654 $this->mErrorConnection = $conn; 00655 $conn = false; 00656 } else { 00657 $conn->tablePrefix( $prefix ); 00658 $this->mConns['foreignUsed'][$i][$wiki] = $conn; 00659 wfDebug( __METHOD__.": opened new connection for $i/$wiki\n" ); 00660 } 00661 } 00662 00663 // Increment reference count 00664 if ( $conn ) { 00665 $refCount = $conn->getLBInfo( 'foreignPoolRefCount' ); 00666 $conn->setLBInfo( 'foreignPoolRefCount', $refCount + 1 ); 00667 } 00668 wfProfileOut(__METHOD__); 00669 return $conn; 00670 } 00671 00679 function isOpen( $index ) { 00680 if( !is_integer( $index ) ) { 00681 return false; 00682 } 00683 return (bool)$this->getAnyOpenConnection( $index ); 00684 } 00685 00695 function reallyOpenConnection( $server, $dbNameOverride = false ) { 00696 if( !is_array( $server ) ) { 00697 throw new MWException( 'You must update your load-balancing configuration. ' . 00698 'See DefaultSettings.php entry for $wgDBservers.' ); 00699 } 00700 00701 $host = $server['host']; 00702 $dbname = $server['dbname']; 00703 00704 if ( $dbNameOverride !== false ) { 00705 $server['dbname'] = $dbname = $dbNameOverride; 00706 } 00707 00708 # Create object 00709 wfDebug( "Connecting to $host $dbname...\n" ); 00710 try { 00711 $db = DatabaseBase::factory( $server['type'], $server ); 00712 } catch ( DBConnectionError $e ) { 00713 // FIXME: This is probably the ugliest thing I have ever done to 00714 // PHP. I'm half-expecting it to segfault, just out of disgust. -- TS 00715 $db = $e->db; 00716 } 00717 00718 if ( $db->isOpen() ) { 00719 wfDebug( "Connected to $host $dbname.\n" ); 00720 } else { 00721 wfDebug( "Connection failed to $host $dbname.\n" ); 00722 } 00723 $db->setLBInfo( $server ); 00724 if ( isset( $server['fakeSlaveLag'] ) ) { 00725 $db->setFakeSlaveLag( $server['fakeSlaveLag'] ); 00726 } 00727 if ( isset( $server['fakeMaster'] ) ) { 00728 $db->setFakeMaster( true ); 00729 } 00730 return $db; 00731 } 00732 00737 function reportConnectionError( &$conn ) { 00738 wfProfileIn( __METHOD__ ); 00739 00740 if ( !is_object( $conn ) ) { 00741 // No last connection, probably due to all servers being too busy 00742 wfLogDBError( "LB failure with no last connection. Connection error: {$this->mLastError}\n" ); 00743 $conn = new Database; 00744 // If all servers were busy, mLastError will contain something sensible 00745 throw new DBConnectionError( $conn, $this->mLastError ); 00746 } else { 00747 $server = $conn->getProperty( 'mServer' ); 00748 wfLogDBError( "Connection error: {$this->mLastError} ({$server})\n" ); 00749 $conn->reportConnectionError( "{$this->mLastError} ({$server})" ); 00750 } 00751 wfProfileOut( __METHOD__ ); 00752 } 00753 00757 function getWriterIndex() { 00758 return 0; 00759 } 00760 00767 function haveIndex( $i ) { 00768 return array_key_exists( $i, $this->mServers ); 00769 } 00770 00777 function isNonZeroLoad( $i ) { 00778 return array_key_exists( $i, $this->mServers ) && $this->mLoads[$i] != 0; 00779 } 00780 00786 function getServerCount() { 00787 return count( $this->mServers ); 00788 } 00789 00796 function getServerName( $i ) { 00797 if ( isset( $this->mServers[$i]['hostName'] ) ) { 00798 return $this->mServers[$i]['hostName']; 00799 } elseif ( isset( $this->mServers[$i]['host'] ) ) { 00800 return $this->mServers[$i]['host']; 00801 } else { 00802 return ''; 00803 } 00804 } 00805 00811 function getServerInfo( $i ) { 00812 if ( isset( $this->mServers[$i] ) ) { 00813 return $this->mServers[$i]; 00814 } else { 00815 return false; 00816 } 00817 } 00818 00824 function setServerInfo( $i, $serverInfo ) { 00825 $this->mServers[$i] = $serverInfo; 00826 } 00827 00832 function getMasterPos() { 00833 # If this entire request was served from a slave without opening a connection to the 00834 # master (however unlikely that may be), then we can fetch the position from the slave. 00835 $masterConn = $this->getAnyOpenConnection( 0 ); 00836 if ( !$masterConn ) { 00837 for ( $i = 1; $i < count( $this->mServers ); $i++ ) { 00838 $conn = $this->getAnyOpenConnection( $i ); 00839 if ( $conn ) { 00840 wfDebug( "Master pos fetched from slave\n" ); 00841 return $conn->getSlavePos(); 00842 } 00843 } 00844 } else { 00845 wfDebug( "Master pos fetched from master\n" ); 00846 return $masterConn->getMasterPos(); 00847 } 00848 return false; 00849 } 00850 00854 function closeAll() { 00855 foreach ( $this->mConns as $conns2 ) { 00856 foreach ( $conns2 as $conns3 ) { 00857 foreach ( $conns3 as $conn ) { 00858 $conn->close(); 00859 } 00860 } 00861 } 00862 $this->mConns = array( 00863 'local' => array(), 00864 'foreignFree' => array(), 00865 'foreignUsed' => array(), 00866 ); 00867 } 00868 00875 function closeConnecton( $conn ) { 00876 wfDeprecated( __METHOD__, '1.18' ); 00877 $this->closeConnection( $conn ); 00878 } 00879 00886 function closeConnection( $conn ) { 00887 $done = false; 00888 foreach ( $this->mConns as $i1 => $conns2 ) { 00889 foreach ( $conns2 as $i2 => $conns3 ) { 00890 foreach ( $conns3 as $i3 => $candidateConn ) { 00891 if ( $conn === $candidateConn ) { 00892 $conn->close(); 00893 unset( $this->mConns[$i1][$i2][$i3] ); 00894 $done = true; 00895 break; 00896 } 00897 } 00898 } 00899 } 00900 if ( !$done ) { 00901 $conn->close(); 00902 } 00903 } 00904 00908 function commitAll() { 00909 foreach ( $this->mConns as $conns2 ) { 00910 foreach ( $conns2 as $conns3 ) { 00911 foreach ( $conns3 as $conn ) { 00912 $conn->commit( __METHOD__ ); 00913 } 00914 } 00915 } 00916 } 00917 00921 function commitMasterChanges() { 00922 // Always 0, but who knows.. :) 00923 $masterIndex = $this->getWriterIndex(); 00924 foreach ( $this->mConns as $conns2 ) { 00925 if ( empty( $conns2[$masterIndex] ) ) { 00926 continue; 00927 } 00928 foreach ( $conns2[$masterIndex] as $conn ) { 00929 if ( $conn->writesOrCallbacksPending() ) { 00930 $conn->commit( __METHOD__ ); 00931 } 00932 } 00933 } 00934 } 00935 00940 function waitTimeout( $value = null ) { 00941 return wfSetVar( $this->mWaitTimeout, $value ); 00942 } 00943 00947 function getLaggedSlaveMode() { 00948 return $this->mLaggedSlaveMode; 00949 } 00950 00956 function allowLagged( $mode = null ) { 00957 if ( $mode === null) { 00958 return $this->mAllowLagged; 00959 } 00960 $this->mAllowLagged = $mode; 00961 } 00962 00966 function pingAll() { 00967 $success = true; 00968 foreach ( $this->mConns as $conns2 ) { 00969 foreach ( $conns2 as $conns3 ) { 00970 foreach ( $conns3 as $conn ) { 00971 if ( !$conn->ping() ) { 00972 $success = false; 00973 } 00974 } 00975 } 00976 } 00977 return $success; 00978 } 00979 00985 function forEachOpenConnection( $callback, $params = array() ) { 00986 foreach ( $this->mConns as $conns2 ) { 00987 foreach ( $conns2 as $conns3 ) { 00988 foreach ( $conns3 as $conn ) { 00989 $mergedParams = array_merge( array( $conn ), $params ); 00990 call_user_func_array( $callback, $mergedParams ); 00991 } 00992 } 00993 } 00994 } 00995 01006 function getMaxLag( $wiki = false ) { 01007 $maxLag = -1; 01008 $host = ''; 01009 $maxIndex = 0; 01010 if ( $this->getServerCount() > 1 ) { // no replication = no lag 01011 foreach ( $this->mServers as $i => $conn ) { 01012 $conn = false; 01013 if ( $wiki === false ) { 01014 $conn = $this->getAnyOpenConnection( $i ); 01015 } 01016 if ( !$conn ) { 01017 $conn = $this->openConnection( $i, $wiki ); 01018 } 01019 if ( !$conn ) { 01020 continue; 01021 } 01022 $lag = $conn->getLag(); 01023 if ( $lag > $maxLag ) { 01024 $maxLag = $lag; 01025 $host = $this->mServers[$i]['host']; 01026 $maxIndex = $i; 01027 } 01028 } 01029 } 01030 return array( $host, $maxLag, $maxIndex ); 01031 } 01032 01041 function getLagTimes( $wiki = false ) { 01042 # Try process cache 01043 if ( isset( $this->mLagTimes ) ) { 01044 return $this->mLagTimes; 01045 } 01046 if ( $this->getServerCount() == 1 ) { 01047 # No replication 01048 $this->mLagTimes = array( 0 => 0 ); 01049 } else { 01050 # Send the request to the load monitor 01051 $this->mLagTimes = $this->getLoadMonitor()->getLagTimes( 01052 array_keys( $this->mServers ), $wiki ); 01053 } 01054 return $this->mLagTimes; 01055 } 01056 01072 function safeGetLag( $conn ) { 01073 if ( $this->getServerCount() == 1 ) { 01074 return 0; 01075 } else { 01076 return $conn->getLag(); 01077 } 01078 } 01079 01083 function clearLagTimeCache() { 01084 $this->mLagTimes = null; 01085 } 01086 }