MediaWiki
REL1_21
|
00001 <?php 00030 class LoadBalancer { 00031 private $mServers, $mConns, $mLoads, $mGroupLoads; 00032 private $mErrorConnection; 00033 private $mReadIndex, $mAllowLagged; 00034 private $mWaitForPos, $mWaitTimeout; 00035 private $mLaggedSlaveMode, $mLastError = 'Unknown error'; 00036 private $mParentInfo, $mLagTimes; 00037 private $mLoadMonitorClass, $mLoadMonitor; 00038 00046 function __construct( $params ) { 00047 if ( !isset( $params['servers'] ) ) { 00048 throw new MWException( __CLASS__ . ': missing servers parameter' ); 00049 } 00050 $this->mServers = $params['servers']; 00051 00052 if ( isset( $params['waitTimeout'] ) ) { 00053 $this->mWaitTimeout = $params['waitTimeout']; 00054 } else { 00055 $this->mWaitTimeout = 10; 00056 } 00057 00058 $this->mReadIndex = -1; 00059 $this->mWriteIndex = -1; 00060 $this->mConns = array( 00061 'local' => array(), 00062 'foreignUsed' => array(), 00063 'foreignFree' => array() ); 00064 $this->mLoads = array(); 00065 $this->mWaitForPos = false; 00066 $this->mLaggedSlaveMode = false; 00067 $this->mErrorConnection = false; 00068 $this->mAllowLagged = false; 00069 00070 if ( isset( $params['loadMonitor'] ) ) { 00071 $this->mLoadMonitorClass = $params['loadMonitor']; 00072 } else { 00073 $master = reset( $params['servers'] ); 00074 if ( isset( $master['type'] ) && $master['type'] === 'mysql' ) { 00075 $this->mLoadMonitorClass = 'LoadMonitor_MySQL'; 00076 } else { 00077 $this->mLoadMonitorClass = 'LoadMonitor_Null'; 00078 } 00079 } 00080 00081 foreach( $params['servers'] as $i => $server ) { 00082 $this->mLoads[$i] = $server['load']; 00083 if ( isset( $server['groupLoads'] ) ) { 00084 foreach ( $server['groupLoads'] as $group => $ratio ) { 00085 if ( !isset( $this->mGroupLoads[$group] ) ) { 00086 $this->mGroupLoads[$group] = array(); 00087 } 00088 $this->mGroupLoads[$group][$i] = $ratio; 00089 } 00090 } 00091 } 00092 } 00093 00099 function getLoadMonitor() { 00100 if ( !isset( $this->mLoadMonitor ) ) { 00101 $class = $this->mLoadMonitorClass; 00102 $this->mLoadMonitor = new $class( $this ); 00103 } 00104 return $this->mLoadMonitor; 00105 } 00106 00112 function parentInfo( $x = null ) { 00113 return wfSetVar( $this->mParentInfo, $x ); 00114 } 00115 00126 function pickRandom( $weights ) { 00127 return ArrayUtils::pickRandom( $weights ); 00128 } 00129 00135 function getRandomNonLagged( $loads, $wiki = false ) { 00136 # Unset excessively lagged servers 00137 $lags = $this->getLagTimes( $wiki ); 00138 foreach ( $lags as $i => $lag ) { 00139 if ( $i != 0 ) { 00140 if ( $lag === false ) { 00141 wfDebugLog( 'replication', "Server #$i is not replicating\n" ); 00142 unset( $loads[$i] ); 00143 } elseif ( isset( $this->mServers[$i]['max lag'] ) && $lag > $this->mServers[$i]['max lag'] ) { 00144 wfDebugLog( 'replication', "Server #$i is excessively lagged ($lag seconds)\n" ); 00145 unset( $loads[$i] ); 00146 } 00147 } 00148 } 00149 00150 # Find out if all the slaves with non-zero load are lagged 00151 $sum = 0; 00152 foreach ( $loads as $load ) { 00153 $sum += $load; 00154 } 00155 if ( $sum == 0 ) { 00156 # No appropriate DB servers except maybe the master and some slaves with zero load 00157 # Do NOT use the master 00158 # Instead, this function will return false, triggering read-only mode, 00159 # and a lagged slave will be used instead. 00160 return false; 00161 } 00162 00163 if ( count( $loads ) == 0 ) { 00164 return false; 00165 } 00166 00167 #wfDebugLog( 'connect', var_export( $loads, true ) ); 00168 00169 # Return a random representative of the remainder 00170 return $this->pickRandom( $loads ); 00171 } 00172 00184 function getReaderIndex( $group = false, $wiki = false ) { 00185 global $wgReadOnly, $wgDBClusterTimeout, $wgDBAvgStatusPoll, $wgDBtype; 00186 00187 # @todo FIXME: For now, only go through all this for mysql databases 00188 if ( $wgDBtype != 'mysql' ) { 00189 return $this->getWriterIndex(); 00190 } 00191 00192 if ( count( $this->mServers ) == 1 ) { 00193 # Skip the load balancing if there's only one server 00194 return 0; 00195 } elseif ( $group === false and $this->mReadIndex >= 0 ) { 00196 # Shortcut if generic reader exists already 00197 return $this->mReadIndex; 00198 } 00199 00200 wfProfileIn( __METHOD__ ); 00201 00202 $totalElapsed = 0; 00203 00204 # convert from seconds to microseconds 00205 $timeout = $wgDBClusterTimeout * 1e6; 00206 00207 # Find the relevant load array 00208 if ( $group !== false ) { 00209 if ( isset( $this->mGroupLoads[$group] ) ) { 00210 $nonErrorLoads = $this->mGroupLoads[$group]; 00211 } else { 00212 # No loads for this group, return false and the caller can use some other group 00213 wfDebug( __METHOD__ . ": no loads for group $group\n" ); 00214 wfProfileOut( __METHOD__ ); 00215 return false; 00216 } 00217 } else { 00218 $nonErrorLoads = $this->mLoads; 00219 } 00220 00221 if ( !$nonErrorLoads ) { 00222 throw new MWException( "Empty server array given to LoadBalancer" ); 00223 } 00224 00225 # Scale the configured load ratios according to the dynamic load (if the load monitor supports it) 00226 $this->getLoadMonitor()->scaleLoads( $nonErrorLoads, $group, $wiki ); 00227 00228 $laggedSlaveMode = false; 00229 00230 # First try quickly looking through the available servers for a server that 00231 # meets our criteria 00232 do { 00233 $totalThreadsConnected = 0; 00234 $overloadedServers = 0; 00235 $currentLoads = $nonErrorLoads; 00236 while ( count( $currentLoads ) ) { 00237 if ( $wgReadOnly || $this->mAllowLagged || $laggedSlaveMode ) { 00238 $i = $this->pickRandom( $currentLoads ); 00239 } else { 00240 $i = $this->getRandomNonLagged( $currentLoads, $wiki ); 00241 if ( $i === false && count( $currentLoads ) != 0 ) { 00242 # All slaves lagged. Switch to read-only mode 00243 wfDebugLog( 'replication', "All slaves lagged. Switch to read-only mode\n" ); 00244 $wgReadOnly = 'The database has been automatically locked ' . 00245 'while the slave database servers catch up to the master'; 00246 $i = $this->pickRandom( $currentLoads ); 00247 $laggedSlaveMode = true; 00248 } 00249 } 00250 00251 if ( $i === false ) { 00252 # pickRandom() returned false 00253 # This is permanent and means the configuration or the load monitor 00254 # wants us to return false. 00255 wfDebugLog( 'connect', __METHOD__ . ": pickRandom() returned false\n" ); 00256 wfProfileOut( __METHOD__ ); 00257 return false; 00258 } 00259 00260 wfDebugLog( 'connect', __METHOD__ . ": Using reader #$i: {$this->mServers[$i]['host']}...\n" ); 00261 $conn = $this->openConnection( $i, $wiki ); 00262 00263 if ( !$conn ) { 00264 wfDebugLog( 'connect', __METHOD__ . ": Failed connecting to $i/$wiki\n" ); 00265 unset( $nonErrorLoads[$i] ); 00266 unset( $currentLoads[$i] ); 00267 continue; 00268 } 00269 00270 // Perform post-connection backoff 00271 $threshold = isset( $this->mServers[$i]['max threads'] ) 00272 ? $this->mServers[$i]['max threads'] : false; 00273 $backoff = $this->getLoadMonitor()->postConnectionBackoff( $conn, $threshold ); 00274 00275 // Decrement reference counter, we are finished with this connection. 00276 // It will be incremented for the caller later. 00277 if ( $wiki !== false ) { 00278 $this->reuseConnection( $conn ); 00279 } 00280 00281 if ( $backoff ) { 00282 # Post-connection overload, don't use this server for now 00283 $totalThreadsConnected += $backoff; 00284 $overloadedServers++; 00285 unset( $currentLoads[$i] ); 00286 } else { 00287 # Return this server 00288 break 2; 00289 } 00290 } 00291 00292 # No server found yet 00293 $i = false; 00294 00295 # If all servers were down, quit now 00296 if ( !count( $nonErrorLoads ) ) { 00297 wfDebugLog( 'connect', "All servers down\n" ); 00298 break; 00299 } 00300 00301 # Some servers must have been overloaded 00302 if ( $overloadedServers == 0 ) { 00303 throw new MWException( __METHOD__ . ": unexpectedly found no overloaded servers" ); 00304 } 00305 # Back off for a while 00306 # Scale the sleep time by the number of connected threads, to produce a 00307 # roughly constant global poll rate 00308 $avgThreads = $totalThreadsConnected / $overloadedServers; 00309 $totalElapsed += $this->sleep( $wgDBAvgStatusPoll * $avgThreads ); 00310 } while ( $totalElapsed < $timeout ); 00311 00312 if ( $totalElapsed >= $timeout ) { 00313 wfDebugLog( 'connect', "All servers busy\n" ); 00314 $this->mErrorConnection = false; 00315 $this->mLastError = 'All servers busy'; 00316 } 00317 00318 if ( $i !== false ) { 00319 # Slave connection successful 00320 # Wait for the session master pos for a short time 00321 if ( $this->mWaitForPos && $i > 0 ) { 00322 if ( !$this->doWait( $i ) ) { 00323 $this->mServers[$i]['slave pos'] = $conn->getSlavePos(); 00324 } 00325 } 00326 if ( $this->mReadIndex <= 0 && $this->mLoads[$i] > 0 && $i !== false ) { 00327 $this->mReadIndex = $i; 00328 } 00329 } 00330 wfProfileOut( __METHOD__ ); 00331 return $i; 00332 } 00333 00339 function sleep( $t ) { 00340 wfProfileIn( __METHOD__ ); 00341 wfDebug( __METHOD__ . ": waiting $t us\n" ); 00342 usleep( $t ); 00343 wfProfileOut( __METHOD__ ); 00344 return $t; 00345 } 00346 00353 public function waitFor( $pos ) { 00354 wfProfileIn( __METHOD__ ); 00355 $this->mWaitForPos = $pos; 00356 $i = $this->mReadIndex; 00357 00358 if ( $i > 0 ) { 00359 if ( !$this->doWait( $i ) ) { 00360 $this->mServers[$i]['slave pos'] = $this->getAnyOpenConnection( $i )->getSlavePos(); 00361 $this->mLaggedSlaveMode = true; 00362 } 00363 } 00364 wfProfileOut( __METHOD__ ); 00365 } 00366 00371 public function waitForAll( $pos ) { 00372 wfProfileIn( __METHOD__ ); 00373 $this->mWaitForPos = $pos; 00374 for ( $i = 1; $i < count( $this->mServers ); $i++ ) { 00375 $this->doWait( $i, true ); 00376 } 00377 wfProfileOut( __METHOD__ ); 00378 } 00379 00387 function getAnyOpenConnection( $i ) { 00388 foreach ( $this->mConns as $conns ) { 00389 if ( !empty( $conns[$i] ) ) { 00390 return reset( $conns[$i] ); 00391 } 00392 } 00393 return false; 00394 } 00395 00402 function doWait( $index, $open = false ) { 00403 # Find a connection to wait on 00404 $conn = $this->getAnyOpenConnection( $index ); 00405 if ( !$conn ) { 00406 if ( !$open ) { 00407 wfDebug( __METHOD__ . ": no connection open\n" ); 00408 return false; 00409 } else { 00410 $conn = $this->openConnection( $index ); 00411 if ( !$conn ) { 00412 wfDebug( __METHOD__ . ": failed to open connection\n" ); 00413 return false; 00414 } 00415 } 00416 } 00417 00418 wfDebug( __METHOD__ . ": Waiting for slave #$index to catch up...\n" ); 00419 $result = $conn->masterPosWait( $this->mWaitForPos, $this->mWaitTimeout ); 00420 00421 if ( $result == -1 || is_null( $result ) ) { 00422 # Timed out waiting for slave, use master instead 00423 wfDebug( __METHOD__ . ": Timed out waiting for slave #$index pos {$this->mWaitForPos}\n" ); 00424 return false; 00425 } else { 00426 wfDebug( __METHOD__ . ": Done\n" ); 00427 return true; 00428 } 00429 } 00430 00442 public function &getConnection( $i, $groups = array(), $wiki = false ) { 00443 wfProfileIn( __METHOD__ ); 00444 00445 if ( $i == DB_LAST ) { 00446 throw new MWException( 'Attempt to call ' . __METHOD__ . ' with deprecated server index DB_LAST' ); 00447 } elseif ( $i === null || $i === false ) { 00448 throw new MWException( 'Attempt to call ' . __METHOD__ . ' with invalid server index' ); 00449 } 00450 00451 if ( $wiki === wfWikiID() ) { 00452 $wiki = false; 00453 } 00454 00455 # Query groups 00456 if ( $i == DB_MASTER ) { 00457 $i = $this->getWriterIndex(); 00458 } elseif ( !is_array( $groups ) ) { 00459 $groupIndex = $this->getReaderIndex( $groups, $wiki ); 00460 if ( $groupIndex !== false ) { 00461 $serverName = $this->getServerName( $groupIndex ); 00462 wfDebug( __METHOD__ . ": using server $serverName for group $groups\n" ); 00463 $i = $groupIndex; 00464 } 00465 } else { 00466 foreach ( $groups as $group ) { 00467 $groupIndex = $this->getReaderIndex( $group, $wiki ); 00468 if ( $groupIndex !== false ) { 00469 $serverName = $this->getServerName( $groupIndex ); 00470 wfDebug( __METHOD__ . ": using server $serverName for group $group\n" ); 00471 $i = $groupIndex; 00472 break; 00473 } 00474 } 00475 } 00476 00477 # Operation-based index 00478 if ( $i == DB_SLAVE ) { 00479 $i = $this->getReaderIndex( false, $wiki ); 00480 # Couldn't find a working server in getReaderIndex()? 00481 if ( $i === false ) { 00482 $this->mLastError = 'No working slave server: ' . $this->mLastError; 00483 wfProfileOut( __METHOD__ ); 00484 return $this->reportConnectionError(); 00485 } 00486 } 00487 00488 # Now we have an explicit index into the servers array 00489 $conn = $this->openConnection( $i, $wiki ); 00490 if ( !$conn ) { 00491 wfProfileOut( __METHOD__ ); 00492 return $this->reportConnectionError(); 00493 } 00494 00495 wfProfileOut( __METHOD__ ); 00496 return $conn; 00497 } 00498 00507 public function reuseConnection( $conn ) { 00508 $serverIndex = $conn->getLBInfo( 'serverIndex' ); 00509 $refCount = $conn->getLBInfo( 'foreignPoolRefCount' ); 00510 $dbName = $conn->getDBname(); 00511 $prefix = $conn->tablePrefix(); 00512 if ( strval( $prefix ) !== '' ) { 00513 $wiki = "$dbName-$prefix"; 00514 } else { 00515 $wiki = $dbName; 00516 } 00517 if ( $serverIndex === null || $refCount === null ) { 00518 wfDebug( __METHOD__ . ": this connection was not opened as a foreign connection\n" ); 00529 return; 00530 } 00531 if ( $this->mConns['foreignUsed'][$serverIndex][$wiki] !== $conn ) { 00532 throw new MWException( __METHOD__ . ": connection not found, has the connection been freed already?" ); 00533 } 00534 $conn->setLBInfo( 'foreignPoolRefCount', --$refCount ); 00535 if ( $refCount <= 0 ) { 00536 $this->mConns['foreignFree'][$serverIndex][$wiki] = $conn; 00537 unset( $this->mConns['foreignUsed'][$serverIndex][$wiki] ); 00538 wfDebug( __METHOD__ . ": freed connection $serverIndex/$wiki\n" ); 00539 } else { 00540 wfDebug( __METHOD__ . ": reference count for $serverIndex/$wiki reduced to $refCount\n" ); 00541 } 00542 } 00543 00558 function openConnection( $i, $wiki = false ) { 00559 wfProfileIn( __METHOD__ ); 00560 if ( $wiki !== false ) { 00561 $conn = $this->openForeignConnection( $i, $wiki ); 00562 wfProfileOut( __METHOD__ ); 00563 return $conn; 00564 } 00565 if ( isset( $this->mConns['local'][$i][0] ) ) { 00566 $conn = $this->mConns['local'][$i][0]; 00567 } else { 00568 $server = $this->mServers[$i]; 00569 $server['serverIndex'] = $i; 00570 $conn = $this->reallyOpenConnection( $server, false ); 00571 if ( $conn->isOpen() ) { 00572 wfDebug( "Connected to database $i at {$this->mServers[$i]['host']}\n" ); 00573 $this->mConns['local'][$i][0] = $conn; 00574 } else { 00575 wfDebug( "Failed to connect to database $i at {$this->mServers[$i]['host']}\n" ); 00576 $this->mErrorConnection = $conn; 00577 $conn = false; 00578 } 00579 } 00580 wfProfileOut( __METHOD__ ); 00581 return $conn; 00582 } 00583 00602 function openForeignConnection( $i, $wiki ) { 00603 wfProfileIn( __METHOD__ ); 00604 list( $dbName, $prefix ) = wfSplitWikiID( $wiki ); 00605 if ( isset( $this->mConns['foreignUsed'][$i][$wiki] ) ) { 00606 // Reuse an already-used connection 00607 $conn = $this->mConns['foreignUsed'][$i][$wiki]; 00608 wfDebug( __METHOD__ . ": reusing connection $i/$wiki\n" ); 00609 } elseif ( isset( $this->mConns['foreignFree'][$i][$wiki] ) ) { 00610 // Reuse a free connection for the same wiki 00611 $conn = $this->mConns['foreignFree'][$i][$wiki]; 00612 unset( $this->mConns['foreignFree'][$i][$wiki] ); 00613 $this->mConns['foreignUsed'][$i][$wiki] = $conn; 00614 wfDebug( __METHOD__ . ": reusing free connection $i/$wiki\n" ); 00615 } elseif ( !empty( $this->mConns['foreignFree'][$i] ) ) { 00616 // Reuse a connection from another wiki 00617 $conn = reset( $this->mConns['foreignFree'][$i] ); 00618 $oldWiki = key( $this->mConns['foreignFree'][$i] ); 00619 00620 if ( !$conn->selectDB( $dbName ) ) { 00621 $this->mLastError = "Error selecting database $dbName on server " . 00622 $conn->getServer() . " from client host " . wfHostname() . "\n"; 00623 $this->mErrorConnection = $conn; 00624 $conn = false; 00625 } else { 00626 $conn->tablePrefix( $prefix ); 00627 unset( $this->mConns['foreignFree'][$i][$oldWiki] ); 00628 $this->mConns['foreignUsed'][$i][$wiki] = $conn; 00629 wfDebug( __METHOD__ . ": reusing free connection from $oldWiki for $wiki\n" ); 00630 } 00631 } else { 00632 // Open a new connection 00633 $server = $this->mServers[$i]; 00634 $server['serverIndex'] = $i; 00635 $server['foreignPoolRefCount'] = 0; 00636 $conn = $this->reallyOpenConnection( $server, $dbName ); 00637 if ( !$conn->isOpen() ) { 00638 wfDebug( __METHOD__ . ": error opening connection for $i/$wiki\n" ); 00639 $this->mErrorConnection = $conn; 00640 $conn = false; 00641 } else { 00642 $conn->tablePrefix( $prefix ); 00643 $this->mConns['foreignUsed'][$i][$wiki] = $conn; 00644 wfDebug( __METHOD__ . ": opened new connection for $i/$wiki\n" ); 00645 } 00646 } 00647 00648 // Increment reference count 00649 if ( $conn ) { 00650 $refCount = $conn->getLBInfo( 'foreignPoolRefCount' ); 00651 $conn->setLBInfo( 'foreignPoolRefCount', $refCount + 1 ); 00652 } 00653 wfProfileOut( __METHOD__ ); 00654 return $conn; 00655 } 00656 00664 function isOpen( $index ) { 00665 if( !is_integer( $index ) ) { 00666 return false; 00667 } 00668 return (bool)$this->getAnyOpenConnection( $index ); 00669 } 00670 00681 function reallyOpenConnection( $server, $dbNameOverride = false ) { 00682 if( !is_array( $server ) ) { 00683 throw new MWException( 'You must update your load-balancing configuration. ' . 00684 'See DefaultSettings.php entry for $wgDBservers.' ); 00685 } 00686 00687 if ( $dbNameOverride !== false ) { 00688 $server['dbname'] = $dbNameOverride; 00689 } 00690 00691 # Create object 00692 try { 00693 $db = DatabaseBase::factory( $server['type'], $server ); 00694 } catch ( DBConnectionError $e ) { 00695 // FIXME: This is probably the ugliest thing I have ever done to 00696 // PHP. I'm half-expecting it to segfault, just out of disgust. -- TS 00697 $db = $e->db; 00698 } 00699 00700 $db->setLBInfo( $server ); 00701 if ( isset( $server['fakeSlaveLag'] ) ) { 00702 $db->setFakeSlaveLag( $server['fakeSlaveLag'] ); 00703 } 00704 if ( isset( $server['fakeMaster'] ) ) { 00705 $db->setFakeMaster( true ); 00706 } 00707 return $db; 00708 } 00709 00714 private function reportConnectionError() { 00715 $conn = $this->mErrorConnection; // The connection which caused the error 00716 00717 if ( !is_object( $conn ) ) { 00718 // No last connection, probably due to all servers being too busy 00719 wfLogDBError( "LB failure with no last connection. Connection error: {$this->mLastError}\n" ); 00720 00721 // If all servers were busy, mLastError will contain something sensible 00722 throw new DBConnectionError( null, $this->mLastError ); 00723 } else { 00724 $server = $conn->getProperty( 'mServer' ); 00725 wfLogDBError( "Connection error: {$this->mLastError} ({$server})\n" ); 00726 $conn->reportConnectionError( "{$this->mLastError} ({$server})" ); // throws DBConnectionError 00727 } 00728 return false; /* not reached */ 00729 } 00730 00734 function getWriterIndex() { 00735 return 0; 00736 } 00737 00744 function haveIndex( $i ) { 00745 return array_key_exists( $i, $this->mServers ); 00746 } 00747 00754 function isNonZeroLoad( $i ) { 00755 return array_key_exists( $i, $this->mServers ) && $this->mLoads[$i] != 0; 00756 } 00757 00763 function getServerCount() { 00764 return count( $this->mServers ); 00765 } 00766 00773 function getServerName( $i ) { 00774 if ( isset( $this->mServers[$i]['hostName'] ) ) { 00775 return $this->mServers[$i]['hostName']; 00776 } elseif ( isset( $this->mServers[$i]['host'] ) ) { 00777 return $this->mServers[$i]['host']; 00778 } else { 00779 return ''; 00780 } 00781 } 00782 00788 function getServerInfo( $i ) { 00789 if ( isset( $this->mServers[$i] ) ) { 00790 return $this->mServers[$i]; 00791 } else { 00792 return false; 00793 } 00794 } 00795 00801 function setServerInfo( $i, $serverInfo ) { 00802 $this->mServers[$i] = $serverInfo; 00803 } 00804 00809 function getMasterPos() { 00810 # If this entire request was served from a slave without opening a connection to the 00811 # master (however unlikely that may be), then we can fetch the position from the slave. 00812 $masterConn = $this->getAnyOpenConnection( 0 ); 00813 if ( !$masterConn ) { 00814 for ( $i = 1; $i < count( $this->mServers ); $i++ ) { 00815 $conn = $this->getAnyOpenConnection( $i ); 00816 if ( $conn ) { 00817 wfDebug( "Master pos fetched from slave\n" ); 00818 return $conn->getSlavePos(); 00819 } 00820 } 00821 } else { 00822 wfDebug( "Master pos fetched from master\n" ); 00823 return $masterConn->getMasterPos(); 00824 } 00825 return false; 00826 } 00827 00831 function closeAll() { 00832 foreach ( $this->mConns as $conns2 ) { 00833 foreach ( $conns2 as $conns3 ) { 00834 foreach ( $conns3 as $conn ) { 00835 $conn->close(); 00836 } 00837 } 00838 } 00839 $this->mConns = array( 00840 'local' => array(), 00841 'foreignFree' => array(), 00842 'foreignUsed' => array(), 00843 ); 00844 } 00845 00852 function closeConnecton( $conn ) { 00853 wfDeprecated( __METHOD__, '1.18' ); 00854 $this->closeConnection( $conn ); 00855 } 00856 00863 function closeConnection( $conn ) { 00864 $done = false; 00865 foreach ( $this->mConns as $i1 => $conns2 ) { 00866 foreach ( $conns2 as $i2 => $conns3 ) { 00867 foreach ( $conns3 as $i3 => $candidateConn ) { 00868 if ( $conn === $candidateConn ) { 00869 $conn->close(); 00870 unset( $this->mConns[$i1][$i2][$i3] ); 00871 $done = true; 00872 break; 00873 } 00874 } 00875 } 00876 } 00877 if ( !$done ) { 00878 $conn->close(); 00879 } 00880 } 00881 00885 function commitAll() { 00886 foreach ( $this->mConns as $conns2 ) { 00887 foreach ( $conns2 as $conns3 ) { 00888 foreach ( $conns3 as $conn ) { 00889 if ( $conn->trxLevel() ) { 00890 $conn->commit( __METHOD__, 'flush' ); 00891 } 00892 } 00893 } 00894 } 00895 } 00896 00900 function commitMasterChanges() { 00901 // Always 0, but who knows.. :) 00902 $masterIndex = $this->getWriterIndex(); 00903 foreach ( $this->mConns as $conns2 ) { 00904 if ( empty( $conns2[$masterIndex] ) ) { 00905 continue; 00906 } 00907 foreach ( $conns2[$masterIndex] as $conn ) { 00908 if ( $conn->trxLevel() && $conn->writesOrCallbacksPending() ) { 00909 $conn->commit( __METHOD__, 'flush' ); 00910 } 00911 } 00912 } 00913 } 00914 00919 function waitTimeout( $value = null ) { 00920 return wfSetVar( $this->mWaitTimeout, $value ); 00921 } 00922 00926 function getLaggedSlaveMode() { 00927 return $this->mLaggedSlaveMode; 00928 } 00929 00935 function allowLagged( $mode = null ) { 00936 if ( $mode === null ) { 00937 return $this->mAllowLagged; 00938 } 00939 $this->mAllowLagged = $mode; 00940 return $this->mAllowLagged; 00941 } 00942 00946 function pingAll() { 00947 $success = true; 00948 foreach ( $this->mConns as $conns2 ) { 00949 foreach ( $conns2 as $conns3 ) { 00950 foreach ( $conns3 as $conn ) { 00951 if ( !$conn->ping() ) { 00952 $success = false; 00953 } 00954 } 00955 } 00956 } 00957 return $success; 00958 } 00959 00965 function forEachOpenConnection( $callback, $params = array() ) { 00966 foreach ( $this->mConns as $conns2 ) { 00967 foreach ( $conns2 as $conns3 ) { 00968 foreach ( $conns3 as $conn ) { 00969 $mergedParams = array_merge( array( $conn ), $params ); 00970 call_user_func_array( $callback, $mergedParams ); 00971 } 00972 } 00973 } 00974 } 00975 00986 function getMaxLag( $wiki = false ) { 00987 $maxLag = -1; 00988 $host = ''; 00989 $maxIndex = 0; 00990 if ( $this->getServerCount() > 1 ) { // no replication = no lag 00991 foreach ( $this->mServers as $i => $conn ) { 00992 $conn = false; 00993 if ( $wiki === false ) { 00994 $conn = $this->getAnyOpenConnection( $i ); 00995 } 00996 if ( !$conn ) { 00997 $conn = $this->openConnection( $i, $wiki ); 00998 } 00999 if ( !$conn ) { 01000 continue; 01001 } 01002 $lag = $conn->getLag(); 01003 if ( $lag > $maxLag ) { 01004 $maxLag = $lag; 01005 $host = $this->mServers[$i]['host']; 01006 $maxIndex = $i; 01007 } 01008 } 01009 } 01010 return array( $host, $maxLag, $maxIndex ); 01011 } 01012 01021 function getLagTimes( $wiki = false ) { 01022 # Try process cache 01023 if ( isset( $this->mLagTimes ) ) { 01024 return $this->mLagTimes; 01025 } 01026 if ( $this->getServerCount() == 1 ) { 01027 # No replication 01028 $this->mLagTimes = array( 0 => 0 ); 01029 } else { 01030 # Send the request to the load monitor 01031 $this->mLagTimes = $this->getLoadMonitor()->getLagTimes( 01032 array_keys( $this->mServers ), $wiki ); 01033 } 01034 return $this->mLagTimes; 01035 } 01036 01052 function safeGetLag( $conn ) { 01053 if ( $this->getServerCount() == 1 ) { 01054 return 0; 01055 } else { 01056 return $conn->getLag(); 01057 } 01058 } 01059 01063 function clearLagTimeCache() { 01064 $this->mLagTimes = null; 01065 } 01066 }