MediaWiki  REL1_23
LoadBalancer.php
Go to the documentation of this file.
00001 <?php
00030 class LoadBalancer {
00031     private $mServers, $mConns, $mLoads, $mGroupLoads;
00032 
00034     private $mErrorConnection;
00035     private $mReadIndex, $mAllowLagged;
00036 
00038     private $mWaitForPos;
00039 
00040     private $mWaitTimeout;
00041     private $mLaggedSlaveMode, $mLastError = 'Unknown error';
00042     private $mParentInfo, $mLagTimes;
00043     private $mLoadMonitorClass, $mLoadMonitor;
00044 
00052     function __construct( $params ) {
00053         if ( !isset( $params['servers'] ) ) {
00054             throw new MWException( __CLASS__ . ': missing servers parameter' );
00055         }
00056         $this->mServers = $params['servers'];
00057 
00058         if ( isset( $params['waitTimeout'] ) ) {
00059             $this->mWaitTimeout = $params['waitTimeout'];
00060         } else {
00061             $this->mWaitTimeout = 10;
00062         }
00063 
00064         $this->mReadIndex = -1;
00065         $this->mWriteIndex = -1;
00066         $this->mConns = array(
00067             'local' => array(),
00068             'foreignUsed' => array(),
00069             'foreignFree' => array() );
00070         $this->mLoads = array();
00071         $this->mWaitForPos = false;
00072         $this->mLaggedSlaveMode = false;
00073         $this->mErrorConnection = false;
00074         $this->mAllowLagged = false;
00075 
00076         if ( isset( $params['loadMonitor'] ) ) {
00077             $this->mLoadMonitorClass = $params['loadMonitor'];
00078         } else {
00079             $master = reset( $params['servers'] );
00080             if ( isset( $master['type'] ) && $master['type'] === 'mysql' ) {
00081                 $this->mLoadMonitorClass = 'LoadMonitorMySQL';
00082             } else {
00083                 $this->mLoadMonitorClass = 'LoadMonitorNull';
00084             }
00085         }
00086 
00087         foreach ( $params['servers'] as $i => $server ) {
00088             $this->mLoads[$i] = $server['load'];
00089             if ( isset( $server['groupLoads'] ) ) {
00090                 foreach ( $server['groupLoads'] as $group => $ratio ) {
00091                     if ( !isset( $this->mGroupLoads[$group] ) ) {
00092                         $this->mGroupLoads[$group] = array();
00093                     }
00094                     $this->mGroupLoads[$group][$i] = $ratio;
00095                 }
00096             }
00097         }
00098     }
00099 
00105     function getLoadMonitor() {
00106         if ( !isset( $this->mLoadMonitor ) ) {
00107             $class = $this->mLoadMonitorClass;
00108             $this->mLoadMonitor = new $class( $this );
00109         }
00110 
00111         return $this->mLoadMonitor;
00112     }
00113 
00119     function parentInfo( $x = null ) {
00120         return wfSetVar( $this->mParentInfo, $x );
00121     }
00122 
00132     function pickRandom( $weights ) {
00133         return ArrayUtils::pickRandom( $weights );
00134     }
00135 
00141     function getRandomNonLagged( $loads, $wiki = false ) {
00142         # Unset excessively lagged servers
00143         $lags = $this->getLagTimes( $wiki );
00144         foreach ( $lags as $i => $lag ) {
00145             if ( $i != 0 ) {
00146                 if ( $lag === false ) {
00147                     wfDebugLog( 'replication', "Server #$i is not replicating" );
00148                     unset( $loads[$i] );
00149                 } elseif ( isset( $this->mServers[$i]['max lag'] ) && $lag > $this->mServers[$i]['max lag'] ) {
00150                     wfDebugLog( 'replication', "Server #$i is excessively lagged ($lag seconds)" );
00151                     unset( $loads[$i] );
00152                 }
00153             }
00154         }
00155 
00156         # Find out if all the slaves with non-zero load are lagged
00157         $sum = 0;
00158         foreach ( $loads as $load ) {
00159             $sum += $load;
00160         }
00161         if ( $sum == 0 ) {
00162             # No appropriate DB servers except maybe the master and some slaves with zero load
00163             # Do NOT use the master
00164             # Instead, this function will return false, triggering read-only mode,
00165             # and a lagged slave will be used instead.
00166             return false;
00167         }
00168 
00169         if ( count( $loads ) == 0 ) {
00170             return false;
00171         }
00172 
00173         #wfDebugLog( 'connect', var_export( $loads, true ) );
00174 
00175         # Return a random representative of the remainder
00176         return ArrayUtils::pickRandom( $loads );
00177     }
00178 
00190     function getReaderIndex( $group = false, $wiki = false ) {
00191         global $wgReadOnly, $wgDBtype;
00192 
00193         # @todo FIXME: For now, only go through all this for mysql databases
00194         if ( $wgDBtype != 'mysql' ) {
00195             return $this->getWriterIndex();
00196         }
00197 
00198         if ( count( $this->mServers ) == 1 ) {
00199             # Skip the load balancing if there's only one server
00200             return 0;
00201         } elseif ( $group === false && $this->mReadIndex >= 0 ) {
00202             # Shortcut if generic reader exists already
00203             return $this->mReadIndex;
00204         }
00205 
00206         $section = new ProfileSection( __METHOD__ );
00207 
00208         # Find the relevant load array
00209         if ( $group !== false ) {
00210             if ( isset( $this->mGroupLoads[$group] ) ) {
00211                 $nonErrorLoads = $this->mGroupLoads[$group];
00212             } else {
00213                 # No loads for this group, return false and the caller can use some other group
00214                 wfDebug( __METHOD__ . ": no loads for group $group\n" );
00215 
00216                 return false;
00217             }
00218         } else {
00219             $nonErrorLoads = $this->mLoads;
00220         }
00221 
00222         if ( !count( $nonErrorLoads ) ) {
00223             throw new MWException( "Empty server array given to LoadBalancer" );
00224         }
00225 
00226         # Scale the configured load ratios according to the dynamic load (if the load monitor supports it)
00227         $this->getLoadMonitor()->scaleLoads( $nonErrorLoads, $group, $wiki );
00228 
00229         $laggedSlaveMode = false;
00230 
00231         # No server found yet
00232         $i = false;
00233         # First try quickly looking through the available servers for a server that
00234         # meets our criteria
00235         $currentLoads = $nonErrorLoads;
00236         while ( count( $currentLoads ) ) {
00237             if ( $wgReadOnly || $this->mAllowLagged || $laggedSlaveMode ) {
00238                 $i = ArrayUtils::pickRandom( $currentLoads );
00239             } else {
00240                 $i = $this->getRandomNonLagged( $currentLoads, $wiki );
00241                 if ( $i === false && count( $currentLoads ) != 0 ) {
00242                     # All slaves lagged. Switch to read-only mode
00243                     wfDebugLog( 'replication', "All slaves lagged. Switch to read-only mode" );
00244                     $wgReadOnly = 'The database has been automatically locked ' .
00245                         'while the slave database servers catch up to the master';
00246                     $i = ArrayUtils::pickRandom( $currentLoads );
00247                     $laggedSlaveMode = true;
00248                 }
00249             }
00250 
00251             if ( $i === false ) {
00252                 # pickRandom() returned false
00253                 # This is permanent and means the configuration or the load monitor
00254                 # wants us to return false.
00255                 wfDebugLog( 'connect', __METHOD__ . ": pickRandom() returned false" );
00256 
00257                 return false;
00258             }
00259 
00260             wfDebugLog( 'connect', __METHOD__ .
00261                 ": Using reader #$i: {$this->mServers[$i]['host']}..." );
00262 
00263             $conn = $this->openConnection( $i, $wiki );
00264             if ( !$conn ) {
00265                 wfDebugLog( 'connect', __METHOD__ . ": Failed connecting to $i/$wiki" );
00266                 unset( $nonErrorLoads[$i] );
00267                 unset( $currentLoads[$i] );
00268                 $i = false;
00269                 continue;
00270             }
00271 
00272             // Decrement reference counter, we are finished with this connection.
00273             // It will be incremented for the caller later.
00274             if ( $wiki !== false ) {
00275                 $this->reuseConnection( $conn );
00276             }
00277 
00278             # Return this server
00279             break;
00280         }
00281 
00282         # If all servers were down, quit now
00283         if ( !count( $nonErrorLoads ) ) {
00284             wfDebugLog( 'connect', "All servers down" );
00285         }
00286 
00287         if ( $i !== false ) {
00288             # Slave connection successful
00289             # Wait for the session master pos for a short time
00290             if ( $this->mWaitForPos && $i > 0 ) {
00291                 if ( !$this->doWait( $i ) ) {
00292                     $this->mServers[$i]['slave pos'] = $conn->getSlavePos();
00293                 }
00294             }
00295             if ( $this->mReadIndex <= 0 && $this->mLoads[$i] > 0 && $group !== false ) {
00296                 $this->mReadIndex = $i;
00297             }
00298         }
00299 
00300         return $i;
00301     }
00302 
00308     function sleep( $t ) {
00309         wfProfileIn( __METHOD__ );
00310         wfDebug( __METHOD__ . ": waiting $t us\n" );
00311         usleep( $t );
00312         wfProfileOut( __METHOD__ );
00313 
00314         return $t;
00315     }
00316 
00323     public function waitFor( $pos ) {
00324         wfProfileIn( __METHOD__ );
00325         $this->mWaitForPos = $pos;
00326         $i = $this->mReadIndex;
00327 
00328         if ( $i > 0 ) {
00329             if ( !$this->doWait( $i ) ) {
00330                 $this->mServers[$i]['slave pos'] = $this->getAnyOpenConnection( $i )->getSlavePos();
00331                 $this->mLaggedSlaveMode = true;
00332             }
00333         }
00334         wfProfileOut( __METHOD__ );
00335     }
00336 
00341     public function waitForAll( $pos ) {
00342         wfProfileIn( __METHOD__ );
00343         $this->mWaitForPos = $pos;
00344         $serverCount = count( $this->mServers );
00345         for ( $i = 1; $i < $serverCount; $i++ ) {
00346             if ( $this->mLoads[$i] > 0 ) {
00347                 $this->doWait( $i, true );
00348             }
00349         }
00350         wfProfileOut( __METHOD__ );
00351     }
00352 
00360     function getAnyOpenConnection( $i ) {
00361         foreach ( $this->mConns as $conns ) {
00362             if ( !empty( $conns[$i] ) ) {
00363                 return reset( $conns[$i] );
00364             }
00365         }
00366 
00367         return false;
00368     }
00369 
00376     protected function doWait( $index, $open = false ) {
00377         # Find a connection to wait on
00378         $conn = $this->getAnyOpenConnection( $index );
00379         if ( !$conn ) {
00380             if ( !$open ) {
00381                 wfDebug( __METHOD__ . ": no connection open\n" );
00382 
00383                 return false;
00384             } else {
00385                 $conn = $this->openConnection( $index, '' );
00386                 if ( !$conn ) {
00387                     wfDebug( __METHOD__ . ": failed to open connection\n" );
00388 
00389                     return false;
00390                 }
00391             }
00392         }
00393 
00394         wfDebug( __METHOD__ . ": Waiting for slave #$index to catch up...\n" );
00395         $result = $conn->masterPosWait( $this->mWaitForPos, $this->mWaitTimeout );
00396 
00397         if ( $result == -1 || is_null( $result ) ) {
00398             # Timed out waiting for slave, use master instead
00399             wfDebug( __METHOD__ . ": Timed out waiting for slave #$index pos {$this->mWaitForPos}\n" );
00400 
00401             return false;
00402         } else {
00403             wfDebug( __METHOD__ . ": Done\n" );
00404 
00405             return true;
00406         }
00407     }
00408 
00420     public function &getConnection( $i, $groups = array(), $wiki = false ) {
00421         wfProfileIn( __METHOD__ );
00422 
00423         if ( $i == DB_LAST ) {
00424             wfProfileOut( __METHOD__ );
00425             throw new MWException( 'Attempt to call ' . __METHOD__ .
00426                 ' with deprecated server index DB_LAST' );
00427         } elseif ( $i === null || $i === false ) {
00428             wfProfileOut( __METHOD__ );
00429             throw new MWException( 'Attempt to call ' . __METHOD__ .
00430                 ' with invalid server index' );
00431         }
00432 
00433         if ( $wiki === wfWikiID() ) {
00434             $wiki = false;
00435         }
00436 
00437         # Query groups
00438         if ( $i == DB_MASTER ) {
00439             $i = $this->getWriterIndex();
00440         } elseif ( !is_array( $groups ) ) {
00441             $groupIndex = $this->getReaderIndex( $groups, $wiki );
00442             if ( $groupIndex !== false ) {
00443                 $serverName = $this->getServerName( $groupIndex );
00444                 wfDebug( __METHOD__ . ": using server $serverName for group $groups\n" );
00445                 $i = $groupIndex;
00446             }
00447         } else {
00448             foreach ( $groups as $group ) {
00449                 $groupIndex = $this->getReaderIndex( $group, $wiki );
00450                 if ( $groupIndex !== false ) {
00451                     $serverName = $this->getServerName( $groupIndex );
00452                     wfDebug( __METHOD__ . ": using server $serverName for group $group\n" );
00453                     $i = $groupIndex;
00454                     break;
00455                 }
00456             }
00457         }
00458 
00459         # Operation-based index
00460         if ( $i == DB_SLAVE ) {
00461             $this->mLastError = 'Unknown error'; // reset error string
00462             $i = $this->getReaderIndex( false, $wiki );
00463             # Couldn't find a working server in getReaderIndex()?
00464             if ( $i === false ) {
00465                 $this->mLastError = 'No working slave server: ' . $this->mLastError;
00466                 wfProfileOut( __METHOD__ );
00467 
00468                 return $this->reportConnectionError();
00469             }
00470         }
00471 
00472         # Now we have an explicit index into the servers array
00473         $conn = $this->openConnection( $i, $wiki );
00474         if ( !$conn ) {
00475             wfProfileOut( __METHOD__ );
00476 
00477             return $this->reportConnectionError();
00478         }
00479 
00480         wfProfileOut( __METHOD__ );
00481 
00482         return $conn;
00483     }
00484 
00493     public function reuseConnection( $conn ) {
00494         $serverIndex = $conn->getLBInfo( 'serverIndex' );
00495         $refCount = $conn->getLBInfo( 'foreignPoolRefCount' );
00496         if ( $serverIndex === null || $refCount === null ) {
00497             wfDebug( __METHOD__ . ": this connection was not opened as a foreign connection\n" );
00498 
00510             return;
00511         }
00512 
00513         $dbName = $conn->getDBname();
00514         $prefix = $conn->tablePrefix();
00515         if ( strval( $prefix ) !== '' ) {
00516             $wiki = "$dbName-$prefix";
00517         } else {
00518             $wiki = $dbName;
00519         }
00520         if ( $this->mConns['foreignUsed'][$serverIndex][$wiki] !== $conn ) {
00521             throw new MWException( __METHOD__ . ": connection not found, has " .
00522                 "the connection been freed already?" );
00523         }
00524         $conn->setLBInfo( 'foreignPoolRefCount', --$refCount );
00525         if ( $refCount <= 0 ) {
00526             $this->mConns['foreignFree'][$serverIndex][$wiki] = $conn;
00527             unset( $this->mConns['foreignUsed'][$serverIndex][$wiki] );
00528             wfDebug( __METHOD__ . ": freed connection $serverIndex/$wiki\n" );
00529         } else {
00530             wfDebug( __METHOD__ . ": reference count for $serverIndex/$wiki reduced to $refCount\n" );
00531         }
00532     }
00533 
00546     public function getConnectionRef( $db, $groups = array(), $wiki = false ) {
00547         return new DBConnRef( $this, $this->getConnection( $db, $groups, $wiki ) );
00548     }
00549 
00562     public function getLazyConnectionRef( $db, $groups = array(), $wiki = false ) {
00563         return new DBConnRef( $this, array( $db, $groups, $wiki ) );
00564     }
00565 
00580     function openConnection( $i, $wiki = false ) {
00581         wfProfileIn( __METHOD__ );
00582         if ( $wiki !== false ) {
00583             $conn = $this->openForeignConnection( $i, $wiki );
00584             wfProfileOut( __METHOD__ );
00585 
00586             return $conn;
00587         }
00588         if ( isset( $this->mConns['local'][$i][0] ) ) {
00589             $conn = $this->mConns['local'][$i][0];
00590         } else {
00591             $server = $this->mServers[$i];
00592             $server['serverIndex'] = $i;
00593             $conn = $this->reallyOpenConnection( $server, false );
00594             if ( $conn->isOpen() ) {
00595                 wfDebug( "Connected to database $i at {$this->mServers[$i]['host']}\n" );
00596                 $this->mConns['local'][$i][0] = $conn;
00597             } else {
00598                 wfDebug( "Failed to connect to database $i at {$this->mServers[$i]['host']}\n" );
00599                 $this->mErrorConnection = $conn;
00600                 $conn = false;
00601             }
00602         }
00603         wfProfileOut( __METHOD__ );
00604 
00605         return $conn;
00606     }
00607 
00626     function openForeignConnection( $i, $wiki ) {
00627         wfProfileIn( __METHOD__ );
00628         list( $dbName, $prefix ) = wfSplitWikiID( $wiki );
00629         if ( isset( $this->mConns['foreignUsed'][$i][$wiki] ) ) {
00630             // Reuse an already-used connection
00631             $conn = $this->mConns['foreignUsed'][$i][$wiki];
00632             wfDebug( __METHOD__ . ": reusing connection $i/$wiki\n" );
00633         } elseif ( isset( $this->mConns['foreignFree'][$i][$wiki] ) ) {
00634             // Reuse a free connection for the same wiki
00635             $conn = $this->mConns['foreignFree'][$i][$wiki];
00636             unset( $this->mConns['foreignFree'][$i][$wiki] );
00637             $this->mConns['foreignUsed'][$i][$wiki] = $conn;
00638             wfDebug( __METHOD__ . ": reusing free connection $i/$wiki\n" );
00639         } elseif ( !empty( $this->mConns['foreignFree'][$i] ) ) {
00640             // Reuse a connection from another wiki
00641             $conn = reset( $this->mConns['foreignFree'][$i] );
00642             $oldWiki = key( $this->mConns['foreignFree'][$i] );
00643 
00644             if ( !$conn->selectDB( $dbName ) ) {
00645                 $this->mLastError = "Error selecting database $dbName on server " .
00646                     $conn->getServer() . " from client host " . wfHostname() . "\n";
00647                 $this->mErrorConnection = $conn;
00648                 $conn = false;
00649             } else {
00650                 $conn->tablePrefix( $prefix );
00651                 unset( $this->mConns['foreignFree'][$i][$oldWiki] );
00652                 $this->mConns['foreignUsed'][$i][$wiki] = $conn;
00653                 wfDebug( __METHOD__ . ": reusing free connection from $oldWiki for $wiki\n" );
00654             }
00655         } else {
00656             // Open a new connection
00657             $server = $this->mServers[$i];
00658             $server['serverIndex'] = $i;
00659             $server['foreignPoolRefCount'] = 0;
00660             $server['foreign'] = true;
00661             $conn = $this->reallyOpenConnection( $server, $dbName );
00662             if ( !$conn->isOpen() ) {
00663                 wfDebug( __METHOD__ . ": error opening connection for $i/$wiki\n" );
00664                 $this->mErrorConnection = $conn;
00665                 $conn = false;
00666             } else {
00667                 $conn->tablePrefix( $prefix );
00668                 $this->mConns['foreignUsed'][$i][$wiki] = $conn;
00669                 wfDebug( __METHOD__ . ": opened new connection for $i/$wiki\n" );
00670             }
00671         }
00672 
00673         // Increment reference count
00674         if ( $conn ) {
00675             $refCount = $conn->getLBInfo( 'foreignPoolRefCount' );
00676             $conn->setLBInfo( 'foreignPoolRefCount', $refCount + 1 );
00677         }
00678         wfProfileOut( __METHOD__ );
00679 
00680         return $conn;
00681     }
00682 
00690     function isOpen( $index ) {
00691         if ( !is_integer( $index ) ) {
00692             return false;
00693         }
00694 
00695         return (bool)$this->getAnyOpenConnection( $index );
00696     }
00697 
00708     function reallyOpenConnection( $server, $dbNameOverride = false ) {
00709         if ( !is_array( $server ) ) {
00710             throw new MWException( 'You must update your load-balancing configuration. ' .
00711                 'See DefaultSettings.php entry for $wgDBservers.' );
00712         }
00713 
00714         if ( $dbNameOverride !== false ) {
00715             $server['dbname'] = $dbNameOverride;
00716         }
00717 
00718         # Create object
00719         try {
00720             $db = DatabaseBase::factory( $server['type'], $server );
00721         } catch ( DBConnectionError $e ) {
00722             // FIXME: This is probably the ugliest thing I have ever done to
00723             // PHP. I'm half-expecting it to segfault, just out of disgust. -- TS
00724             $db = $e->db;
00725         }
00726 
00727         $db->setLBInfo( $server );
00728         if ( isset( $server['fakeSlaveLag'] ) ) {
00729             $db->setFakeSlaveLag( $server['fakeSlaveLag'] );
00730         }
00731         if ( isset( $server['fakeMaster'] ) ) {
00732             $db->setFakeMaster( true );
00733         }
00734 
00735         return $db;
00736     }
00737 
00742     private function reportConnectionError() {
00743         $conn = $this->mErrorConnection; // The connection which caused the error
00744 
00745         if ( !is_object( $conn ) ) {
00746             // No last connection, probably due to all servers being too busy
00747             wfLogDBError( "LB failure with no last connection. Connection error: {$this->mLastError}" );
00748 
00749             // If all servers were busy, mLastError will contain something sensible
00750             throw new DBConnectionError( null, $this->mLastError );
00751         } else {
00752             $server = $conn->getProperty( 'mServer' );
00753             wfLogDBError( "Connection error: {$this->mLastError} ({$server})" );
00754             $conn->reportConnectionError( "{$this->mLastError} ({$server})" ); // throws DBConnectionError
00755         }
00756 
00757         return false; /* not reached */
00758     }
00759 
00763     function getWriterIndex() {
00764         return 0;
00765     }
00766 
00773     function haveIndex( $i ) {
00774         return array_key_exists( $i, $this->mServers );
00775     }
00776 
00783     function isNonZeroLoad( $i ) {
00784         return array_key_exists( $i, $this->mServers ) && $this->mLoads[$i] != 0;
00785     }
00786 
00792     function getServerCount() {
00793         return count( $this->mServers );
00794     }
00795 
00802     function getServerName( $i ) {
00803         if ( isset( $this->mServers[$i]['hostName'] ) ) {
00804             return $this->mServers[$i]['hostName'];
00805         } elseif ( isset( $this->mServers[$i]['host'] ) ) {
00806             return $this->mServers[$i]['host'];
00807         } else {
00808             return '';
00809         }
00810     }
00811 
00817     function getServerInfo( $i ) {
00818         if ( isset( $this->mServers[$i] ) ) {
00819             return $this->mServers[$i];
00820         } else {
00821             return false;
00822         }
00823     }
00824 
00831     function setServerInfo( $i, $serverInfo ) {
00832         $this->mServers[$i] = $serverInfo;
00833     }
00834 
00839     function getMasterPos() {
00840         # If this entire request was served from a slave without opening a connection to the
00841         # master (however unlikely that may be), then we can fetch the position from the slave.
00842         $masterConn = $this->getAnyOpenConnection( 0 );
00843         if ( !$masterConn ) {
00844             $serverCount = count( $this->mServers );
00845             for ( $i = 1; $i < $serverCount; $i++ ) {
00846                 $conn = $this->getAnyOpenConnection( $i );
00847                 if ( $conn ) {
00848                     wfDebug( "Master pos fetched from slave\n" );
00849 
00850                     return $conn->getSlavePos();
00851                 }
00852             }
00853         } else {
00854             wfDebug( "Master pos fetched from master\n" );
00855 
00856             return $masterConn->getMasterPos();
00857         }
00858 
00859         return false;
00860     }
00861 
00865     function closeAll() {
00866         foreach ( $this->mConns as $conns2 ) {
00867             foreach ( $conns2 as $conns3 ) {
00869                 foreach ( $conns3 as $conn ) {
00870                     $conn->close();
00871                 }
00872             }
00873         }
00874         $this->mConns = array(
00875             'local' => array(),
00876             'foreignFree' => array(),
00877             'foreignUsed' => array(),
00878         );
00879     }
00880 
00887     function closeConnecton( $conn ) {
00888         wfDeprecated( __METHOD__, '1.18' );
00889         $this->closeConnection( $conn );
00890     }
00891 
00898     function closeConnection( $conn ) {
00899         $done = false;
00900         foreach ( $this->mConns as $i1 => $conns2 ) {
00901             foreach ( $conns2 as $i2 => $conns3 ) {
00902                 foreach ( $conns3 as $i3 => $candidateConn ) {
00903                     if ( $conn === $candidateConn ) {
00904                         $conn->close();
00905                         unset( $this->mConns[$i1][$i2][$i3] );
00906                         $done = true;
00907                         break;
00908                     }
00909                 }
00910             }
00911         }
00912         if ( !$done ) {
00913             $conn->close();
00914         }
00915     }
00916 
00920     function commitAll() {
00921         foreach ( $this->mConns as $conns2 ) {
00922             foreach ( $conns2 as $conns3 ) {
00924                 foreach ( $conns3 as $conn ) {
00925                     if ( $conn->trxLevel() ) {
00926                         $conn->commit( __METHOD__, 'flush' );
00927                     }
00928                 }
00929             }
00930         }
00931     }
00932 
00936     function commitMasterChanges() {
00937         // Always 0, but who knows.. :)
00938         $masterIndex = $this->getWriterIndex();
00939         foreach ( $this->mConns as $conns2 ) {
00940             if ( empty( $conns2[$masterIndex] ) ) {
00941                 continue;
00942             }
00944             foreach ( $conns2[$masterIndex] as $conn ) {
00945                 if ( $conn->trxLevel() && $conn->writesOrCallbacksPending() ) {
00946                     $conn->commit( __METHOD__, 'flush' );
00947                 }
00948             }
00949         }
00950     }
00951 
00956     function rollbackMasterChanges() {
00957         // Always 0, but who knows.. :)
00958         $masterIndex = $this->getWriterIndex();
00959         foreach ( $this->mConns as $conns2 ) {
00960             if ( empty( $conns2[$masterIndex] ) ) {
00961                 continue;
00962             }
00964             foreach ( $conns2[$masterIndex] as $conn ) {
00965                 if ( $conn->trxLevel() && $conn->writesOrCallbacksPending() ) {
00966                     $conn->rollback( __METHOD__, 'flush' );
00967                 }
00968             }
00969         }
00970     }
00971 
00978     function hasMasterChanges() {
00979         // Always 0, but who knows.. :)
00980         $masterIndex = $this->getWriterIndex();
00981         foreach ( $this->mConns as $conns2 ) {
00982             if ( empty( $conns2[$masterIndex] ) ) {
00983                 continue;
00984             }
00986             foreach ( $conns2[$masterIndex] as $conn ) {
00987                 if ( $conn->trxLevel() && $conn->writesOrCallbacksPending() ) {
00988                     return true;
00989                 }
00990             }
00991         }
00992         return false;
00993     }
00994 
00999     function waitTimeout( $value = null ) {
01000         return wfSetVar( $this->mWaitTimeout, $value );
01001     }
01002 
01006     function getLaggedSlaveMode() {
01007         return $this->mLaggedSlaveMode;
01008     }
01009 
01015     function allowLagged( $mode = null ) {
01016         if ( $mode === null ) {
01017             return $this->mAllowLagged;
01018         }
01019         $this->mAllowLagged = $mode;
01020 
01021         return $this->mAllowLagged;
01022     }
01023 
01027     function pingAll() {
01028         $success = true;
01029         foreach ( $this->mConns as $conns2 ) {
01030             foreach ( $conns2 as $conns3 ) {
01032                 foreach ( $conns3 as $conn ) {
01033                     if ( !$conn->ping() ) {
01034                         $success = false;
01035                     }
01036                 }
01037             }
01038         }
01039 
01040         return $success;
01041     }
01042 
01048     function forEachOpenConnection( $callback, $params = array() ) {
01049         foreach ( $this->mConns as $conns2 ) {
01050             foreach ( $conns2 as $conns3 ) {
01051                 foreach ( $conns3 as $conn ) {
01052                     $mergedParams = array_merge( array( $conn ), $params );
01053                     call_user_func_array( $callback, $mergedParams );
01054                 }
01055             }
01056         }
01057     }
01058 
01068     function getMaxLag( $wiki = false ) {
01069         $maxLag = -1;
01070         $host = '';
01071         $maxIndex = 0;
01072         if ( $this->getServerCount() > 1 ) { // no replication = no lag
01073             foreach ( $this->mServers as $i => $conn ) {
01074                 $conn = false;
01075                 if ( $wiki === false ) {
01076                     $conn = $this->getAnyOpenConnection( $i );
01077                 }
01078                 if ( !$conn ) {
01079                     $conn = $this->openConnection( $i, $wiki );
01080                 }
01081                 if ( !$conn ) {
01082                     continue;
01083                 }
01084                 $lag = $conn->getLag();
01085                 if ( $lag > $maxLag ) {
01086                     $maxLag = $lag;
01087                     $host = $this->mServers[$i]['host'];
01088                     $maxIndex = $i;
01089                 }
01090             }
01091         }
01092 
01093         return array( $host, $maxLag, $maxIndex );
01094     }
01095 
01103     function getLagTimes( $wiki = false ) {
01104         # Try process cache
01105         if ( isset( $this->mLagTimes ) ) {
01106             return $this->mLagTimes;
01107         }
01108         if ( $this->getServerCount() == 1 ) {
01109             # No replication
01110             $this->mLagTimes = array( 0 => 0 );
01111         } else {
01112             # Send the request to the load monitor
01113             $this->mLagTimes = $this->getLoadMonitor()->getLagTimes(
01114                 array_keys( $this->mServers ), $wiki );
01115         }
01116 
01117         return $this->mLagTimes;
01118     }
01119 
01134     function safeGetLag( $conn ) {
01135         if ( $this->getServerCount() == 1 ) {
01136             return 0;
01137         } else {
01138             return $conn->getLag();
01139         }
01140     }
01141 
01145     function clearLagTimeCache() {
01146         $this->mLagTimes = null;
01147     }
01148 }
01149 
01157 class DBConnRef implements IDatabase {
01159     protected $lb;
01160 
01162     protected $conn;
01163 
01165     protected $params;
01166 
01171     public function __construct( LoadBalancer $lb, $conn ) {
01172         $this->lb = $lb;
01173         if ( $conn instanceof DatabaseBase ) {
01174             $this->conn = $conn;
01175         } else {
01176             $this->params = $conn;
01177         }
01178     }
01179 
01180     public function __call( $name, $arguments ) {
01181         if ( $this->conn === null ) {
01182             list( $db, $groups, $wiki ) = $this->params;
01183             $this->conn = $this->lb->getConnection( $db, $groups, $wiki );
01184         }
01185 
01186         return call_user_func_array( array( $this->conn, $name ), $arguments );
01187     }
01188 
01189     function __destruct() {
01190         if ( $this->conn !== null ) {
01191             $this->lb->reuseConnection( $this->conn );
01192         }
01193     }
01194 }