MediaWiki
REL1_19
|
00001 <?php 00015 class LoadBalancer { 00016 private $mServers, $mConns, $mLoads, $mGroupLoads; 00017 private $mErrorConnection; 00018 private $mReadIndex, $mAllowLagged; 00019 private $mWaitForPos, $mWaitTimeout; 00020 private $mLaggedSlaveMode, $mLastError = 'Unknown error'; 00021 private $mParentInfo, $mLagTimes; 00022 private $mLoadMonitorClass, $mLoadMonitor; 00023 00030 function __construct( $params ) { 00031 if ( !isset( $params['servers'] ) ) { 00032 throw new MWException( __CLASS__.': missing servers parameter' ); 00033 } 00034 $this->mServers = $params['servers']; 00035 00036 if ( isset( $params['waitTimeout'] ) ) { 00037 $this->mWaitTimeout = $params['waitTimeout']; 00038 } else { 00039 $this->mWaitTimeout = 10; 00040 } 00041 00042 $this->mReadIndex = -1; 00043 $this->mWriteIndex = -1; 00044 $this->mConns = array( 00045 'local' => array(), 00046 'foreignUsed' => array(), 00047 'foreignFree' => array() ); 00048 $this->mLoads = array(); 00049 $this->mWaitForPos = false; 00050 $this->mLaggedSlaveMode = false; 00051 $this->mErrorConnection = false; 00052 $this->mAllowLagged = false; 00053 00054 if ( isset( $params['loadMonitor'] ) ) { 00055 $this->mLoadMonitorClass = $params['loadMonitor']; 00056 } else { 00057 $master = reset( $params['servers'] ); 00058 if ( isset( $master['type'] ) && $master['type'] === 'mysql' ) { 00059 $this->mLoadMonitorClass = 'LoadMonitor_MySQL'; 00060 } else { 00061 $this->mLoadMonitorClass = 'LoadMonitor_Null'; 00062 } 00063 } 00064 00065 foreach( $params['servers'] as $i => $server ) { 00066 $this->mLoads[$i] = $server['load']; 00067 if ( isset( $server['groupLoads'] ) ) { 00068 foreach ( $server['groupLoads'] as $group => $ratio ) { 00069 if ( !isset( $this->mGroupLoads[$group] ) ) { 00070 $this->mGroupLoads[$group] = array(); 00071 } 00072 $this->mGroupLoads[$group][$i] = $ratio; 00073 } 00074 } 00075 } 00076 } 00077 00083 function getLoadMonitor() { 00084 if ( !isset( $this->mLoadMonitor ) ) { 00085 $class = $this->mLoadMonitorClass; 00086 $this->mLoadMonitor = new $class( $this ); 00087 } 00088 return $this->mLoadMonitor; 00089 } 00090 00096 function parentInfo( $x = null ) { 00097 return wfSetVar( $this->mParentInfo, $x ); 00098 } 00099 00108 function pickRandom( $weights ) { 00109 if ( !is_array( $weights ) || count( $weights ) == 0 ) { 00110 return false; 00111 } 00112 00113 $sum = array_sum( $weights ); 00114 if ( $sum == 0 ) { 00115 # No loads on any of them 00116 # In previous versions, this triggered an unweighted random selection, 00117 # but this feature has been removed as of April 2006 to allow for strict 00118 # separation of query groups. 00119 return false; 00120 } 00121 $max = mt_getrandmax(); 00122 $rand = mt_rand( 0, $max ) / $max * $sum; 00123 00124 $sum = 0; 00125 foreach ( $weights as $i => $w ) { 00126 $sum += $w; 00127 if ( $sum >= $rand ) { 00128 break; 00129 } 00130 } 00131 return $i; 00132 } 00133 00139 function getRandomNonLagged( $loads, $wiki = false ) { 00140 # Unset excessively lagged servers 00141 $lags = $this->getLagTimes( $wiki ); 00142 foreach ( $lags as $i => $lag ) { 00143 if ( $i != 0 ) { 00144 if ( $lag === false ) { 00145 wfDebugLog( 'replication', "Server #$i is not replicating\n" ); 00146 unset( $loads[$i] ); 00147 } elseif ( isset( $this->mServers[$i]['max lag'] ) && $lag > $this->mServers[$i]['max lag'] ) { 00148 wfDebugLog( 'replication', "Server #$i is excessively lagged ($lag seconds)\n" ); 00149 unset( $loads[$i] ); 00150 } 00151 } 00152 } 00153 00154 # Find out if all the slaves with non-zero load are lagged 00155 $sum = 0; 00156 foreach ( $loads as $load ) { 00157 $sum += $load; 00158 } 00159 if ( $sum == 0 ) { 00160 # No appropriate DB servers except maybe the master and some slaves with zero load 00161 # Do NOT use the master 00162 # Instead, this function will return false, triggering read-only mode, 00163 # and a lagged slave will be used instead. 00164 return false; 00165 } 00166 00167 if ( count( $loads ) == 0 ) { 00168 return false; 00169 } 00170 00171 #wfDebugLog( 'connect', var_export( $loads, true ) ); 00172 00173 # Return a random representative of the remainder 00174 return $this->pickRandom( $loads ); 00175 } 00176 00187 function getReaderIndex( $group = false, $wiki = false ) { 00188 global $wgReadOnly, $wgDBClusterTimeout, $wgDBAvgStatusPoll, $wgDBtype; 00189 00190 # @todo FIXME: For now, only go through all this for mysql databases 00191 if ($wgDBtype != 'mysql') { 00192 return $this->getWriterIndex(); 00193 } 00194 00195 if ( count( $this->mServers ) == 1 ) { 00196 # Skip the load balancing if there's only one server 00197 return 0; 00198 } elseif ( $group === false and $this->mReadIndex >= 0 ) { 00199 # Shortcut if generic reader exists already 00200 return $this->mReadIndex; 00201 } 00202 00203 wfProfileIn( __METHOD__ ); 00204 00205 $totalElapsed = 0; 00206 00207 # convert from seconds to microseconds 00208 $timeout = $wgDBClusterTimeout * 1e6; 00209 00210 # Find the relevant load array 00211 if ( $group !== false ) { 00212 if ( isset( $this->mGroupLoads[$group] ) ) { 00213 $nonErrorLoads = $this->mGroupLoads[$group]; 00214 } else { 00215 # No loads for this group, return false and the caller can use some other group 00216 wfDebug( __METHOD__.": no loads for group $group\n" ); 00217 wfProfileOut( __METHOD__ ); 00218 return false; 00219 } 00220 } else { 00221 $nonErrorLoads = $this->mLoads; 00222 } 00223 00224 if ( !$nonErrorLoads ) { 00225 throw new MWException( "Empty server array given to LoadBalancer" ); 00226 } 00227 00228 # Scale the configured load ratios according to the dynamic load (if the load monitor supports it) 00229 $this->getLoadMonitor()->scaleLoads( $nonErrorLoads, $group, $wiki ); 00230 00231 $laggedSlaveMode = false; 00232 00233 # First try quickly looking through the available servers for a server that 00234 # meets our criteria 00235 do { 00236 $totalThreadsConnected = 0; 00237 $overloadedServers = 0; 00238 $currentLoads = $nonErrorLoads; 00239 while ( count( $currentLoads ) ) { 00240 if ( $wgReadOnly || $this->mAllowLagged || $laggedSlaveMode ) { 00241 $i = $this->pickRandom( $currentLoads ); 00242 } else { 00243 $i = $this->getRandomNonLagged( $currentLoads, $wiki ); 00244 if ( $i === false && count( $currentLoads ) != 0 ) { 00245 # All slaves lagged. Switch to read-only mode 00246 wfDebugLog( 'replication', "All slaves lagged. Switch to read-only mode\n" ); 00247 $wgReadOnly = 'The database has been automatically locked ' . 00248 'while the slave database servers catch up to the master'; 00249 $i = $this->pickRandom( $currentLoads ); 00250 $laggedSlaveMode = true; 00251 } 00252 } 00253 00254 if ( $i === false ) { 00255 # pickRandom() returned false 00256 # This is permanent and means the configuration or the load monitor 00257 # wants us to return false. 00258 wfDebugLog( 'connect', __METHOD__.": pickRandom() returned false\n" ); 00259 wfProfileOut( __METHOD__ ); 00260 return false; 00261 } 00262 00263 wfDebugLog( 'connect', __METHOD__.": Using reader #$i: {$this->mServers[$i]['host']}...\n" ); 00264 $conn = $this->openConnection( $i, $wiki ); 00265 00266 if ( !$conn ) { 00267 wfDebugLog( 'connect', __METHOD__.": Failed connecting to $i/$wiki\n" ); 00268 unset( $nonErrorLoads[$i] ); 00269 unset( $currentLoads[$i] ); 00270 continue; 00271 } 00272 00273 // Perform post-connection backoff 00274 $threshold = isset( $this->mServers[$i]['max threads'] ) 00275 ? $this->mServers[$i]['max threads'] : false; 00276 $backoff = $this->getLoadMonitor()->postConnectionBackoff( $conn, $threshold ); 00277 00278 // Decrement reference counter, we are finished with this connection. 00279 // It will be incremented for the caller later. 00280 if ( $wiki !== false ) { 00281 $this->reuseConnection( $conn ); 00282 } 00283 00284 if ( $backoff ) { 00285 # Post-connection overload, don't use this server for now 00286 $totalThreadsConnected += $backoff; 00287 $overloadedServers++; 00288 unset( $currentLoads[$i] ); 00289 } else { 00290 # Return this server 00291 break 2; 00292 } 00293 } 00294 00295 # No server found yet 00296 $i = false; 00297 00298 # If all servers were down, quit now 00299 if ( !count( $nonErrorLoads ) ) { 00300 wfDebugLog( 'connect', "All servers down\n" ); 00301 break; 00302 } 00303 00304 # Some servers must have been overloaded 00305 if ( $overloadedServers == 0 ) { 00306 throw new MWException( __METHOD__.": unexpectedly found no overloaded servers" ); 00307 } 00308 # Back off for a while 00309 # Scale the sleep time by the number of connected threads, to produce a 00310 # roughly constant global poll rate 00311 $avgThreads = $totalThreadsConnected / $overloadedServers; 00312 $totalElapsed += $this->sleep( $wgDBAvgStatusPoll * $avgThreads ); 00313 } while ( $totalElapsed < $timeout ); 00314 00315 if ( $totalElapsed >= $timeout ) { 00316 wfDebugLog( 'connect', "All servers busy\n" ); 00317 $this->mErrorConnection = false; 00318 $this->mLastError = 'All servers busy'; 00319 } 00320 00321 if ( $i !== false ) { 00322 # Slave connection successful 00323 # Wait for the session master pos for a short time 00324 if ( $this->mWaitForPos && $i > 0 ) { 00325 if ( !$this->doWait( $i ) ) { 00326 $this->mServers[$i]['slave pos'] = $conn->getSlavePos(); 00327 } 00328 } 00329 if ( $this->mReadIndex <=0 && $this->mLoads[$i]>0 && $i !== false ) { 00330 $this->mReadIndex = $i; 00331 } 00332 } 00333 wfProfileOut( __METHOD__ ); 00334 return $i; 00335 } 00336 00342 function sleep( $t ) { 00343 wfProfileIn( __METHOD__ ); 00344 wfDebug( __METHOD__.": waiting $t us\n" ); 00345 usleep( $t ); 00346 wfProfileOut( __METHOD__ ); 00347 return $t; 00348 } 00349 00356 public function waitFor( $pos ) { 00357 wfProfileIn( __METHOD__ ); 00358 $this->mWaitForPos = $pos; 00359 $i = $this->mReadIndex; 00360 00361 if ( $i > 0 ) { 00362 if ( !$this->doWait( $i ) ) { 00363 $this->mServers[$i]['slave pos'] = $this->getAnyOpenConnection( $i )->getSlavePos(); 00364 $this->mLaggedSlaveMode = true; 00365 } 00366 } 00367 wfProfileOut( __METHOD__ ); 00368 } 00369 00374 public function waitForAll( $pos ) { 00375 wfProfileIn( __METHOD__ ); 00376 $this->mWaitForPos = $pos; 00377 for ( $i = 1; $i < count( $this->mServers ); $i++ ) { 00378 $this->doWait( $i , true ); 00379 } 00380 wfProfileOut( __METHOD__ ); 00381 } 00382 00390 function getAnyOpenConnection( $i ) { 00391 foreach ( $this->mConns as $conns ) { 00392 if ( !empty( $conns[$i] ) ) { 00393 return reset( $conns[$i] ); 00394 } 00395 } 00396 return false; 00397 } 00398 00405 function doWait( $index, $open = false ) { 00406 # Find a connection to wait on 00407 $conn = $this->getAnyOpenConnection( $index ); 00408 if ( !$conn ) { 00409 if ( !$open ) { 00410 wfDebug( __METHOD__ . ": no connection open\n" ); 00411 return false; 00412 } else { 00413 $conn = $this->openConnection( $index ); 00414 if ( !$conn ) { 00415 wfDebug( __METHOD__ . ": failed to open connection\n" ); 00416 return false; 00417 } 00418 } 00419 } 00420 00421 wfDebug( __METHOD__.": Waiting for slave #$index to catch up...\n" ); 00422 $result = $conn->masterPosWait( $this->mWaitForPos, $this->mWaitTimeout ); 00423 00424 if ( $result == -1 || is_null( $result ) ) { 00425 # Timed out waiting for slave, use master instead 00426 wfDebug( __METHOD__.": Timed out waiting for slave #$index pos {$this->mWaitForPos}\n" ); 00427 return false; 00428 } else { 00429 wfDebug( __METHOD__.": Done\n" ); 00430 return true; 00431 } 00432 } 00433 00444 public function &getConnection( $i, $groups = array(), $wiki = false ) { 00445 wfProfileIn( __METHOD__ ); 00446 00447 if ( $i == DB_LAST ) { 00448 throw new MWException( 'Attempt to call ' . __METHOD__ . ' with deprecated server index DB_LAST' ); 00449 } elseif ( $i === null || $i === false ) { 00450 throw new MWException( 'Attempt to call ' . __METHOD__ . ' with invalid server index' ); 00451 } 00452 00453 if ( $wiki === wfWikiID() ) { 00454 $wiki = false; 00455 } 00456 00457 # Query groups 00458 if ( $i == DB_MASTER ) { 00459 $i = $this->getWriterIndex(); 00460 } elseif ( !is_array( $groups ) ) { 00461 $groupIndex = $this->getReaderIndex( $groups, $wiki ); 00462 if ( $groupIndex !== false ) { 00463 $serverName = $this->getServerName( $groupIndex ); 00464 wfDebug( __METHOD__.": using server $serverName for group $groups\n" ); 00465 $i = $groupIndex; 00466 } 00467 } else { 00468 foreach ( $groups as $group ) { 00469 $groupIndex = $this->getReaderIndex( $group, $wiki ); 00470 if ( $groupIndex !== false ) { 00471 $serverName = $this->getServerName( $groupIndex ); 00472 wfDebug( __METHOD__.": using server $serverName for group $group\n" ); 00473 $i = $groupIndex; 00474 break; 00475 } 00476 } 00477 } 00478 00479 # Operation-based index 00480 if ( $i == DB_SLAVE ) { 00481 $i = $this->getReaderIndex( false, $wiki ); 00482 # Couldn't find a working server in getReaderIndex()? 00483 if ( $i === false ) { 00484 $this->mLastError = 'No working slave server: ' . $this->mLastError; 00485 $this->reportConnectionError( $this->mErrorConnection ); 00486 wfProfileOut( __METHOD__ ); 00487 return false; 00488 } 00489 } 00490 00491 # Now we have an explicit index into the servers array 00492 $conn = $this->openConnection( $i, $wiki ); 00493 if ( !$conn ) { 00494 $this->reportConnectionError( $this->mErrorConnection ); 00495 } 00496 00497 wfProfileOut( __METHOD__ ); 00498 return $conn; 00499 } 00500 00508 public function reuseConnection( $conn ) { 00509 $serverIndex = $conn->getLBInfo('serverIndex'); 00510 $refCount = $conn->getLBInfo('foreignPoolRefCount'); 00511 $dbName = $conn->getDBname(); 00512 $prefix = $conn->tablePrefix(); 00513 if ( strval( $prefix ) !== '' ) { 00514 $wiki = "$dbName-$prefix"; 00515 } else { 00516 $wiki = $dbName; 00517 } 00518 if ( $serverIndex === null || $refCount === null ) { 00519 wfDebug( __METHOD__.": this connection was not opened as a foreign connection\n" ); 00530 return; 00531 } 00532 if ( $this->mConns['foreignUsed'][$serverIndex][$wiki] !== $conn ) { 00533 throw new MWException( __METHOD__.": connection not found, has the connection been freed already?" ); 00534 } 00535 $conn->setLBInfo( 'foreignPoolRefCount', --$refCount ); 00536 if ( $refCount <= 0 ) { 00537 $this->mConns['foreignFree'][$serverIndex][$wiki] = $conn; 00538 unset( $this->mConns['foreignUsed'][$serverIndex][$wiki] ); 00539 wfDebug( __METHOD__.": freed connection $serverIndex/$wiki\n" ); 00540 } else { 00541 wfDebug( __METHOD__.": reference count for $serverIndex/$wiki reduced to $refCount\n" ); 00542 } 00543 } 00544 00559 function openConnection( $i, $wiki = false ) { 00560 wfProfileIn( __METHOD__ ); 00561 if ( $wiki !== false ) { 00562 $conn = $this->openForeignConnection( $i, $wiki ); 00563 wfProfileOut( __METHOD__); 00564 return $conn; 00565 } 00566 if ( isset( $this->mConns['local'][$i][0] ) ) { 00567 $conn = $this->mConns['local'][$i][0]; 00568 } else { 00569 $server = $this->mServers[$i]; 00570 $server['serverIndex'] = $i; 00571 $conn = $this->reallyOpenConnection( $server, false ); 00572 if ( $conn->isOpen() ) { 00573 $this->mConns['local'][$i][0] = $conn; 00574 } else { 00575 wfDebug( "Failed to connect to database $i at {$this->mServers[$i]['host']}\n" ); 00576 $this->mErrorConnection = $conn; 00577 $conn = false; 00578 } 00579 } 00580 wfProfileOut( __METHOD__ ); 00581 return $conn; 00582 } 00583 00602 function openForeignConnection( $i, $wiki ) { 00603 wfProfileIn(__METHOD__); 00604 list( $dbName, $prefix ) = wfSplitWikiID( $wiki ); 00605 if ( isset( $this->mConns['foreignUsed'][$i][$wiki] ) ) { 00606 // Reuse an already-used connection 00607 $conn = $this->mConns['foreignUsed'][$i][$wiki]; 00608 wfDebug( __METHOD__.": reusing connection $i/$wiki\n" ); 00609 } elseif ( isset( $this->mConns['foreignFree'][$i][$wiki] ) ) { 00610 // Reuse a free connection for the same wiki 00611 $conn = $this->mConns['foreignFree'][$i][$wiki]; 00612 unset( $this->mConns['foreignFree'][$i][$wiki] ); 00613 $this->mConns['foreignUsed'][$i][$wiki] = $conn; 00614 wfDebug( __METHOD__.": reusing free connection $i/$wiki\n" ); 00615 } elseif ( !empty( $this->mConns['foreignFree'][$i] ) ) { 00616 // Reuse a connection from another wiki 00617 $conn = reset( $this->mConns['foreignFree'][$i] ); 00618 $oldWiki = key( $this->mConns['foreignFree'][$i] ); 00619 00620 if ( !$conn->selectDB( $dbName ) ) { 00621 $this->mLastError = "Error selecting database $dbName on server " . 00622 $conn->getServer() . " from client host " . wfHostname() . "\n"; 00623 $this->mErrorConnection = $conn; 00624 $conn = false; 00625 } else { 00626 $conn->tablePrefix( $prefix ); 00627 unset( $this->mConns['foreignFree'][$i][$oldWiki] ); 00628 $this->mConns['foreignUsed'][$i][$wiki] = $conn; 00629 wfDebug( __METHOD__.": reusing free connection from $oldWiki for $wiki\n" ); 00630 } 00631 } else { 00632 // Open a new connection 00633 $server = $this->mServers[$i]; 00634 $server['serverIndex'] = $i; 00635 $server['foreignPoolRefCount'] = 0; 00636 $conn = $this->reallyOpenConnection( $server, $dbName ); 00637 if ( !$conn->isOpen() ) { 00638 wfDebug( __METHOD__.": error opening connection for $i/$wiki\n" ); 00639 $this->mErrorConnection = $conn; 00640 $conn = false; 00641 } else { 00642 $conn->tablePrefix( $prefix ); 00643 $this->mConns['foreignUsed'][$i][$wiki] = $conn; 00644 wfDebug( __METHOD__.": opened new connection for $i/$wiki\n" ); 00645 } 00646 } 00647 00648 // Increment reference count 00649 if ( $conn ) { 00650 $refCount = $conn->getLBInfo( 'foreignPoolRefCount' ); 00651 $conn->setLBInfo( 'foreignPoolRefCount', $refCount + 1 ); 00652 } 00653 wfProfileOut(__METHOD__); 00654 return $conn; 00655 } 00656 00664 function isOpen( $index ) { 00665 if( !is_integer( $index ) ) { 00666 return false; 00667 } 00668 return (bool)$this->getAnyOpenConnection( $index ); 00669 } 00670 00680 function reallyOpenConnection( $server, $dbNameOverride = false ) { 00681 if( !is_array( $server ) ) { 00682 throw new MWException( 'You must update your load-balancing configuration. ' . 00683 'See DefaultSettings.php entry for $wgDBservers.' ); 00684 } 00685 00686 $host = $server['host']; 00687 $dbname = $server['dbname']; 00688 00689 if ( $dbNameOverride !== false ) { 00690 $server['dbname'] = $dbname = $dbNameOverride; 00691 } 00692 00693 # Create object 00694 wfDebug( "Connecting to $host $dbname...\n" ); 00695 try { 00696 $db = DatabaseBase::factory( $server['type'], $server ); 00697 } catch ( DBConnectionError $e ) { 00698 // FIXME: This is probably the ugliest thing I have ever done to 00699 // PHP. I'm half-expecting it to segfault, just out of disgust. -- TS 00700 $db = $e->db; 00701 } 00702 00703 if ( $db->isOpen() ) { 00704 wfDebug( "Connected to $host $dbname.\n" ); 00705 } else { 00706 wfDebug( "Connection failed to $host $dbname.\n" ); 00707 } 00708 $db->setLBInfo( $server ); 00709 if ( isset( $server['fakeSlaveLag'] ) ) { 00710 $db->setFakeSlaveLag( $server['fakeSlaveLag'] ); 00711 } 00712 if ( isset( $server['fakeMaster'] ) ) { 00713 $db->setFakeMaster( true ); 00714 } 00715 return $db; 00716 } 00717 00722 function reportConnectionError( &$conn ) { 00723 wfProfileIn( __METHOD__ ); 00724 00725 if ( !is_object( $conn ) ) { 00726 // No last connection, probably due to all servers being too busy 00727 wfLogDBError( "LB failure with no last connection. Connection error: {$this->mLastError}\n" ); 00728 $conn = new Database; 00729 // If all servers were busy, mLastError will contain something sensible 00730 throw new DBConnectionError( $conn, $this->mLastError ); 00731 } else { 00732 $server = $conn->getProperty( 'mServer' ); 00733 wfLogDBError( "Connection error: {$this->mLastError} ({$server})\n" ); 00734 $conn->reportConnectionError( "{$this->mLastError} ({$server})" ); 00735 } 00736 wfProfileOut( __METHOD__ ); 00737 } 00738 00742 function getWriterIndex() { 00743 return 0; 00744 } 00745 00752 function haveIndex( $i ) { 00753 return array_key_exists( $i, $this->mServers ); 00754 } 00755 00762 function isNonZeroLoad( $i ) { 00763 return array_key_exists( $i, $this->mServers ) && $this->mLoads[$i] != 0; 00764 } 00765 00771 function getServerCount() { 00772 return count( $this->mServers ); 00773 } 00774 00781 function getServerName( $i ) { 00782 if ( isset( $this->mServers[$i]['hostName'] ) ) { 00783 return $this->mServers[$i]['hostName']; 00784 } elseif ( isset( $this->mServers[$i]['host'] ) ) { 00785 return $this->mServers[$i]['host']; 00786 } else { 00787 return ''; 00788 } 00789 } 00790 00796 function getServerInfo( $i ) { 00797 if ( isset( $this->mServers[$i] ) ) { 00798 return $this->mServers[$i]; 00799 } else { 00800 return false; 00801 } 00802 } 00803 00809 function setServerInfo( $i, $serverInfo ) { 00810 $this->mServers[$i] = $serverInfo; 00811 } 00812 00817 function getMasterPos() { 00818 # If this entire request was served from a slave without opening a connection to the 00819 # master (however unlikely that may be), then we can fetch the position from the slave. 00820 $masterConn = $this->getAnyOpenConnection( 0 ); 00821 if ( !$masterConn ) { 00822 for ( $i = 1; $i < count( $this->mServers ); $i++ ) { 00823 $conn = $this->getAnyOpenConnection( $i ); 00824 if ( $conn ) { 00825 wfDebug( "Master pos fetched from slave\n" ); 00826 return $conn->getSlavePos(); 00827 } 00828 } 00829 } else { 00830 wfDebug( "Master pos fetched from master\n" ); 00831 return $masterConn->getMasterPos(); 00832 } 00833 return false; 00834 } 00835 00839 function closeAll() { 00840 foreach ( $this->mConns as $conns2 ) { 00841 foreach ( $conns2 as $conns3 ) { 00842 foreach ( $conns3 as $conn ) { 00843 $conn->close(); 00844 } 00845 } 00846 } 00847 $this->mConns = array( 00848 'local' => array(), 00849 'foreignFree' => array(), 00850 'foreignUsed' => array(), 00851 ); 00852 } 00853 00860 function closeConnecton( $conn ) { 00861 wfDeprecated( __METHOD__, '1.18' ); 00862 $this->closeConnection( $conn ); 00863 } 00864 00871 function closeConnection( $conn ) { 00872 $done = false; 00873 foreach ( $this->mConns as $i1 => $conns2 ) { 00874 foreach ( $conns2 as $i2 => $conns3 ) { 00875 foreach ( $conns3 as $i3 => $candidateConn ) { 00876 if ( $conn === $candidateConn ) { 00877 $conn->close(); 00878 unset( $this->mConns[$i1][$i2][$i3] ); 00879 $done = true; 00880 break; 00881 } 00882 } 00883 } 00884 } 00885 if ( !$done ) { 00886 $conn->close(); 00887 } 00888 } 00889 00893 function commitAll() { 00894 foreach ( $this->mConns as $conns2 ) { 00895 foreach ( $conns2 as $conns3 ) { 00896 foreach ( $conns3 as $conn ) { 00897 $conn->commit(); 00898 } 00899 } 00900 } 00901 } 00902 00906 function commitMasterChanges() { 00907 // Always 0, but who knows.. :) 00908 $masterIndex = $this->getWriterIndex(); 00909 foreach ( $this->mConns as $conns2 ) { 00910 if ( empty( $conns2[$masterIndex] ) ) { 00911 continue; 00912 } 00913 foreach ( $conns2[$masterIndex] as $conn ) { 00914 if ( $conn->doneWrites() ) { 00915 $conn->commit( __METHOD__ ); 00916 } 00917 } 00918 } 00919 } 00920 00925 function waitTimeout( $value = null ) { 00926 return wfSetVar( $this->mWaitTimeout, $value ); 00927 } 00928 00932 function getLaggedSlaveMode() { 00933 return $this->mLaggedSlaveMode; 00934 } 00935 00941 function allowLagged( $mode = null ) { 00942 if ( $mode === null) { 00943 return $this->mAllowLagged; 00944 } 00945 $this->mAllowLagged = $mode; 00946 } 00947 00951 function pingAll() { 00952 $success = true; 00953 foreach ( $this->mConns as $conns2 ) { 00954 foreach ( $conns2 as $conns3 ) { 00955 foreach ( $conns3 as $conn ) { 00956 if ( !$conn->ping() ) { 00957 $success = false; 00958 } 00959 } 00960 } 00961 } 00962 return $success; 00963 } 00964 00970 function forEachOpenConnection( $callback, $params = array() ) { 00971 foreach ( $this->mConns as $conns2 ) { 00972 foreach ( $conns2 as $conns3 ) { 00973 foreach ( $conns3 as $conn ) { 00974 $mergedParams = array_merge( array( $conn ), $params ); 00975 call_user_func_array( $callback, $mergedParams ); 00976 } 00977 } 00978 } 00979 } 00980 00991 function getMaxLag( $wiki = false ) { 00992 $maxLag = -1; 00993 $host = ''; 00994 $maxIndex = 0; 00995 if ( $this->getServerCount() > 1 ) { // no replication = no lag 00996 foreach ( $this->mServers as $i => $conn ) { 00997 $conn = false; 00998 if ( $wiki === false ) { 00999 $conn = $this->getAnyOpenConnection( $i ); 01000 } 01001 if ( !$conn ) { 01002 $conn = $this->openConnection( $i, $wiki ); 01003 } 01004 if ( !$conn ) { 01005 continue; 01006 } 01007 $lag = $conn->getLag(); 01008 if ( $lag > $maxLag ) { 01009 $maxLag = $lag; 01010 $host = $this->mServers[$i]['host']; 01011 $maxIndex = $i; 01012 } 01013 } 01014 } 01015 return array( $host, $maxLag, $maxIndex ); 01016 } 01017 01026 function getLagTimes( $wiki = false ) { 01027 # Try process cache 01028 if ( isset( $this->mLagTimes ) ) { 01029 return $this->mLagTimes; 01030 } 01031 if ( $this->getServerCount() == 1 ) { 01032 # No replication 01033 $this->mLagTimes = array( 0 => 0 ); 01034 } else { 01035 # Send the request to the load monitor 01036 $this->mLagTimes = $this->getLoadMonitor()->getLagTimes( 01037 array_keys( $this->mServers ), $wiki ); 01038 } 01039 return $this->mLagTimes; 01040 } 01041 01057 function safeGetLag( $conn ) { 01058 if ( $this->getServerCount() == 1 ) { 01059 return 0; 01060 } else { 01061 return $conn->getLag(); 01062 } 01063 } 01064 01068 function clearLagTimeCache() { 01069 $this->mLagTimes = null; 01070 } 01071 }