[ Index ]

PHP Cross Reference of MediaWiki-1.24.0

title

Body

[close]

/includes/db/ -> LoadBalancer.php (source)

   1  <?php
   2  /**
   3   * Database load balancing.
   4   *
   5   * This program is free software; you can redistribute it and/or modify
   6   * it under the terms of the GNU General Public License as published by
   7   * the Free Software Foundation; either version 2 of the License, or
   8   * (at your option) any later version.
   9   *
  10   * This program is distributed in the hope that it will be useful,
  11   * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13   * GNU General Public License for more details.
  14   *
  15   * You should have received a copy of the GNU General Public License along
  16   * with this program; if not, write to the Free Software Foundation, Inc.,
  17   * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
  18   * http://www.gnu.org/copyleft/gpl.html
  19   *
  20   * @file
  21   * @ingroup Database
  22   */
  23  
  24  /**
  25   * Database load balancing object
  26   *
  27   * @todo document
  28   * @ingroup Database
  29   */
  30  class LoadBalancer {
  31      /** @var array Map of (server index => server config array) */
  32      private $mServers;
  33      /** @var array Map of (local/foreignUsed/foreignFree => server index => DatabaseBase array) */
  34      private $mConns;
  35      /** @var array Map of (server index => weight) */
  36      private $mLoads;
  37      /** @var array Map of (group => server index => weight) */
  38      private $mGroupLoads;
  39      /** @var bool Whether to disregard slave lag as a factor in slave selection */
  40      private $mAllowLagged;
  41      /** @var integer Seconds to spend waiting on slave lag to resolve */
  42      private $mWaitTimeout;
  43  
  44      /** @var array LBFactory information */
  45      private $mParentInfo;
  46      /** @var string The LoadMonitor subclass name */
  47      private $mLoadMonitorClass;
  48      /** @var LoadMonitor */
  49      private $mLoadMonitor;
  50  
  51      /** @var bool|DatabaseBase Database connection that caused a problem */
  52      private $mErrorConnection;
  53      /** @var integer The generic (not query grouped) slave index (of $mServers) */
  54      private $mReadIndex;
  55      /** @var bool|DBMasterPos False if not set */
  56      private $mWaitForPos;
  57      /** @var bool Whether the generic reader fell back to a lagged slave */
  58      private $mLaggedSlaveMode;
  59      /** @var string The last DB selection or connection error */
  60      private $mLastError = 'Unknown error';
  61      /** @var array Process cache of LoadMonitor::getLagTimes() */
  62      private $mLagTimes;
  63  
  64      /**
  65       * @param array $params Array with keys:
  66       *   servers           Required. Array of server info structures.
  67       *   loadMonitor       Name of a class used to fetch server lag and load.
  68       * @throws MWException
  69       */
  70  	function __construct( $params ) {
  71          if ( !isset( $params['servers'] ) ) {
  72              throw new MWException( __CLASS__ . ': missing servers parameter' );
  73          }
  74          $this->mServers = $params['servers'];
  75          $this->mWaitTimeout = 10;
  76  
  77          $this->mReadIndex = -1;
  78          $this->mWriteIndex = -1;
  79          $this->mConns = array(
  80              'local' => array(),
  81              'foreignUsed' => array(),
  82              'foreignFree' => array() );
  83          $this->mLoads = array();
  84          $this->mWaitForPos = false;
  85          $this->mLaggedSlaveMode = false;
  86          $this->mErrorConnection = false;
  87          $this->mAllowLagged = false;
  88  
  89          if ( isset( $params['loadMonitor'] ) ) {
  90              $this->mLoadMonitorClass = $params['loadMonitor'];
  91          } else {
  92              $master = reset( $params['servers'] );
  93              if ( isset( $master['type'] ) && $master['type'] === 'mysql' ) {
  94                  $this->mLoadMonitorClass = 'LoadMonitorMySQL';
  95              } else {
  96                  $this->mLoadMonitorClass = 'LoadMonitorNull';
  97              }
  98          }
  99  
 100          foreach ( $params['servers'] as $i => $server ) {
 101              $this->mLoads[$i] = $server['load'];
 102              if ( isset( $server['groupLoads'] ) ) {
 103                  foreach ( $server['groupLoads'] as $group => $ratio ) {
 104                      if ( !isset( $this->mGroupLoads[$group] ) ) {
 105                          $this->mGroupLoads[$group] = array();
 106                      }
 107                      $this->mGroupLoads[$group][$i] = $ratio;
 108                  }
 109              }
 110          }
 111      }
 112  
 113      /**
 114       * Get a LoadMonitor instance
 115       *
 116       * @return LoadMonitor
 117       */
 118  	function getLoadMonitor() {
 119          if ( !isset( $this->mLoadMonitor ) ) {
 120              $class = $this->mLoadMonitorClass;
 121              $this->mLoadMonitor = new $class( $this );
 122          }
 123  
 124          return $this->mLoadMonitor;
 125      }
 126  
 127      /**
 128       * Get or set arbitrary data used by the parent object, usually an LBFactory
 129       * @param mixed $x
 130       * @return mixed
 131       */
 132  	function parentInfo( $x = null ) {
 133          return wfSetVar( $this->mParentInfo, $x );
 134      }
 135  
 136      /**
 137       * Given an array of non-normalised probabilities, this function will select
 138       * an element and return the appropriate key
 139       *
 140       * @deprecated since 1.21, use ArrayUtils::pickRandom()
 141       *
 142       * @param array $weights
 143       * @return bool|int|string
 144       */
 145  	function pickRandom( $weights ) {
 146          return ArrayUtils::pickRandom( $weights );
 147      }
 148  
 149      /**
 150       * @param array $loads
 151       * @param bool|string $wiki Wiki to get non-lagged for
 152       * @return bool|int|string
 153       */
 154  	function getRandomNonLagged( $loads, $wiki = false ) {
 155          # Unset excessively lagged servers
 156          $lags = $this->getLagTimes( $wiki );
 157          foreach ( $lags as $i => $lag ) {
 158              if ( $i != 0 ) {
 159                  if ( $lag === false ) {
 160                      wfDebugLog( 'replication', "Server #$i is not replicating" );
 161                      unset( $loads[$i] );
 162                  } elseif ( isset( $this->mServers[$i]['max lag'] ) && $lag > $this->mServers[$i]['max lag'] ) {
 163                      wfDebugLog( 'replication', "Server #$i is excessively lagged ($lag seconds)" );
 164                      unset( $loads[$i] );
 165                  }
 166              }
 167          }
 168  
 169          # Find out if all the slaves with non-zero load are lagged
 170          $sum = 0;
 171          foreach ( $loads as $load ) {
 172              $sum += $load;
 173          }
 174          if ( $sum == 0 ) {
 175              # No appropriate DB servers except maybe the master and some slaves with zero load
 176              # Do NOT use the master
 177              # Instead, this function will return false, triggering read-only mode,
 178              # and a lagged slave will be used instead.
 179              return false;
 180          }
 181  
 182          if ( count( $loads ) == 0 ) {
 183              return false;
 184          }
 185  
 186          #wfDebugLog( 'connect', var_export( $loads, true ) );
 187  
 188          # Return a random representative of the remainder
 189          return ArrayUtils::pickRandom( $loads );
 190      }
 191  
 192      /**
 193       * Get the index of the reader connection, which may be a slave
 194       * This takes into account load ratios and lag times. It should
 195       * always return a consistent index during a given invocation
 196       *
 197       * Side effect: opens connections to databases
 198       * @param bool|string $group
 199       * @param bool|string $wiki
 200       * @throws MWException
 201       * @return bool|int|string
 202       */
 203  	function getReaderIndex( $group = false, $wiki = false ) {
 204          global $wgReadOnly, $wgDBtype;
 205  
 206          # @todo FIXME: For now, only go through all this for mysql databases
 207          if ( $wgDBtype != 'mysql' ) {
 208              return $this->getWriterIndex();
 209          }
 210  
 211          if ( count( $this->mServers ) == 1 ) {
 212              # Skip the load balancing if there's only one server
 213              return 0;
 214          } elseif ( $group === false && $this->mReadIndex >= 0 ) {
 215              # Shortcut if generic reader exists already
 216              return $this->mReadIndex;
 217          }
 218  
 219          $section = new ProfileSection( __METHOD__ );
 220  
 221          # Find the relevant load array
 222          if ( $group !== false ) {
 223              if ( isset( $this->mGroupLoads[$group] ) ) {
 224                  $nonErrorLoads = $this->mGroupLoads[$group];
 225              } else {
 226                  # No loads for this group, return false and the caller can use some other group
 227                  wfDebug( __METHOD__ . ": no loads for group $group\n" );
 228  
 229                  return false;
 230              }
 231          } else {
 232              $nonErrorLoads = $this->mLoads;
 233          }
 234  
 235          if ( !count( $nonErrorLoads ) ) {
 236              throw new MWException( "Empty server array given to LoadBalancer" );
 237          }
 238  
 239          # Scale the configured load ratios according to the dynamic load (if the load monitor supports it)
 240          $this->getLoadMonitor()->scaleLoads( $nonErrorLoads, $group, $wiki );
 241  
 242          $laggedSlaveMode = false;
 243  
 244          # No server found yet
 245          $i = false;
 246          # First try quickly looking through the available servers for a server that
 247          # meets our criteria
 248          $currentLoads = $nonErrorLoads;
 249          while ( count( $currentLoads ) ) {
 250              if ( $wgReadOnly || $this->mAllowLagged || $laggedSlaveMode ) {
 251                  $i = ArrayUtils::pickRandom( $currentLoads );
 252              } else {
 253                  $i = $this->getRandomNonLagged( $currentLoads, $wiki );
 254                  if ( $i === false && count( $currentLoads ) != 0 ) {
 255                      # All slaves lagged. Switch to read-only mode
 256                      wfDebugLog( 'replication', "All slaves lagged. Switch to read-only mode" );
 257                      $wgReadOnly = 'The database has been automatically locked ' .
 258                          'while the slave database servers catch up to the master';
 259                      $i = ArrayUtils::pickRandom( $currentLoads );
 260                      $laggedSlaveMode = true;
 261                  }
 262              }
 263  
 264              if ( $i === false ) {
 265                  # pickRandom() returned false
 266                  # This is permanent and means the configuration or the load monitor
 267                  # wants us to return false.
 268                  wfDebugLog( 'connect', __METHOD__ . ": pickRandom() returned false" );
 269  
 270                  return false;
 271              }
 272  
 273              wfDebugLog( 'connect', __METHOD__ .
 274                  ": Using reader #$i: {$this->mServers[$i]['host']}..." );
 275  
 276              $conn = $this->openConnection( $i, $wiki );
 277              if ( !$conn ) {
 278                  wfDebugLog( 'connect', __METHOD__ . ": Failed connecting to $i/$wiki" );
 279                  unset( $nonErrorLoads[$i] );
 280                  unset( $currentLoads[$i] );
 281                  $i = false;
 282                  continue;
 283              }
 284  
 285              // Decrement reference counter, we are finished with this connection.
 286              // It will be incremented for the caller later.
 287              if ( $wiki !== false ) {
 288                  $this->reuseConnection( $conn );
 289              }
 290  
 291              # Return this server
 292              break;
 293          }
 294  
 295          # If all servers were down, quit now
 296          if ( !count( $nonErrorLoads ) ) {
 297              wfDebugLog( 'connect', "All servers down" );
 298          }
 299  
 300          if ( $i !== false ) {
 301              # Slave connection successful
 302              # Wait for the session master pos for a short time
 303              if ( $this->mWaitForPos && $i > 0 ) {
 304                  if ( !$this->doWait( $i ) ) {
 305                      $this->mServers[$i]['slave pos'] = $conn->getSlavePos();
 306                  }
 307              }
 308              if ( $this->mReadIndex <= 0 && $this->mLoads[$i] > 0 && $group !== false ) {
 309                  $this->mReadIndex = $i;
 310              }
 311          }
 312  
 313          return $i;
 314      }
 315  
 316      /**
 317       * Wait for a specified number of microseconds, and return the period waited
 318       * @param int $t
 319       * @return int
 320       */
 321  	function sleep( $t ) {
 322          wfProfileIn( __METHOD__ );
 323          wfDebug( __METHOD__ . ": waiting $t us\n" );
 324          usleep( $t );
 325          wfProfileOut( __METHOD__ );
 326  
 327          return $t;
 328      }
 329  
 330      /**
 331       * Set the master wait position
 332       * If a DB_SLAVE connection has been opened already, waits
 333       * Otherwise sets a variable telling it to wait if such a connection is opened
 334       * @param DBMasterPos $pos
 335       */
 336  	public function waitFor( $pos ) {
 337          wfProfileIn( __METHOD__ );
 338          $this->mWaitForPos = $pos;
 339          $i = $this->mReadIndex;
 340  
 341          if ( $i > 0 ) {
 342              if ( !$this->doWait( $i ) ) {
 343                  $this->mServers[$i]['slave pos'] = $this->getAnyOpenConnection( $i )->getSlavePos();
 344                  $this->mLaggedSlaveMode = true;
 345              }
 346          }
 347          wfProfileOut( __METHOD__ );
 348      }
 349  
 350      /**
 351       * Set the master wait position and wait for ALL slaves to catch up to it
 352       * @param DBMasterPos $pos
 353       * @param int $timeout Max seconds to wait; default is mWaitTimeout
 354       * @return bool Success (able to connect and no timeouts reached)
 355       */
 356  	public function waitForAll( $pos, $timeout = null ) {
 357          wfProfileIn( __METHOD__ );
 358          $this->mWaitForPos = $pos;
 359          $serverCount = count( $this->mServers );
 360  
 361          $ok = true;
 362          for ( $i = 1; $i < $serverCount; $i++ ) {
 363              if ( $this->mLoads[$i] > 0 ) {
 364                  $ok = $this->doWait( $i, true, $timeout ) && $ok;
 365              }
 366          }
 367          wfProfileOut( __METHOD__ );
 368  
 369          return $ok;
 370      }
 371  
 372      /**
 373       * Get any open connection to a given server index, local or foreign
 374       * Returns false if there is no connection open
 375       *
 376       * @param int $i
 377       * @return DatabaseBase|bool False on failure
 378       */
 379  	function getAnyOpenConnection( $i ) {
 380          foreach ( $this->mConns as $conns ) {
 381              if ( !empty( $conns[$i] ) ) {
 382                  return reset( $conns[$i] );
 383              }
 384          }
 385  
 386          return false;
 387      }
 388  
 389      /**
 390       * Wait for a given slave to catch up to the master pos stored in $this
 391       * @param int $index Server index
 392       * @param bool $open Check the server even if a new connection has to be made
 393       * @param int $timeout Max seconds to wait; default is mWaitTimeout
 394       * @return bool
 395       */
 396  	protected function doWait( $index, $open = false, $timeout = null ) {
 397          # Find a connection to wait on
 398          $conn = $this->getAnyOpenConnection( $index );
 399          if ( !$conn ) {
 400              if ( !$open ) {
 401                  wfDebug( __METHOD__ . ": no connection open\n" );
 402  
 403                  return false;
 404              } else {
 405                  $conn = $this->openConnection( $index, '' );
 406                  if ( !$conn ) {
 407                      wfDebug( __METHOD__ . ": failed to open connection\n" );
 408  
 409                      return false;
 410                  }
 411              }
 412          }
 413  
 414          wfDebug( __METHOD__ . ": Waiting for slave #$index to catch up...\n" );
 415          $timeout = $timeout ?: $this->mWaitTimeout;
 416          $result = $conn->masterPosWait( $this->mWaitForPos, $timeout );
 417  
 418          if ( $result == -1 || is_null( $result ) ) {
 419              # Timed out waiting for slave, use master instead
 420              wfDebug( __METHOD__ . ": Timed out waiting for slave #$index pos {$this->mWaitForPos}\n" );
 421  
 422              return false;
 423          } else {
 424              wfDebug( __METHOD__ . ": Done\n" );
 425  
 426              return true;
 427          }
 428      }
 429  
 430      /**
 431       * Get a connection by index
 432       * This is the main entry point for this class.
 433       *
 434       * @param int $i Server index
 435       * @param array $groups Query groups
 436       * @param bool|string $wiki Wiki ID
 437       *
 438       * @throws MWException
 439       * @return DatabaseBase
 440       */
 441      public function &getConnection( $i, $groups = array(), $wiki = false ) {
 442          wfProfileIn( __METHOD__ );
 443  
 444          if ( $i === null || $i === false ) {
 445              wfProfileOut( __METHOD__ );
 446              throw new MWException( 'Attempt to call ' . __METHOD__ .
 447                  ' with invalid server index' );
 448          }
 449  
 450          if ( $wiki === wfWikiID() ) {
 451              $wiki = false;
 452          }
 453  
 454          # Query groups
 455          if ( $i == DB_MASTER ) {
 456              $i = $this->getWriterIndex();
 457          } elseif ( !is_array( $groups ) ) {
 458              $groupIndex = $this->getReaderIndex( $groups, $wiki );
 459              if ( $groupIndex !== false ) {
 460                  $serverName = $this->getServerName( $groupIndex );
 461                  wfDebug( __METHOD__ . ": using server $serverName for group $groups\n" );
 462                  $i = $groupIndex;
 463              }
 464          } else {
 465              foreach ( $groups as $group ) {
 466                  $groupIndex = $this->getReaderIndex( $group, $wiki );
 467                  if ( $groupIndex !== false ) {
 468                      $serverName = $this->getServerName( $groupIndex );
 469                      wfDebug( __METHOD__ . ": using server $serverName for group $group\n" );
 470                      $i = $groupIndex;
 471                      break;
 472                  }
 473              }
 474          }
 475  
 476          # Operation-based index
 477          if ( $i == DB_SLAVE ) {
 478              $this->mLastError = 'Unknown error'; // reset error string
 479              $i = $this->getReaderIndex( false, $wiki );
 480              # Couldn't find a working server in getReaderIndex()?
 481              if ( $i === false ) {
 482                  $this->mLastError = 'No working slave server: ' . $this->mLastError;
 483                  wfProfileOut( __METHOD__ );
 484  
 485                  return $this->reportConnectionError();
 486              }
 487          }
 488  
 489          # Now we have an explicit index into the servers array
 490          $conn = $this->openConnection( $i, $wiki );
 491          if ( !$conn ) {
 492              wfProfileOut( __METHOD__ );
 493  
 494              return $this->reportConnectionError();
 495          }
 496  
 497          wfProfileOut( __METHOD__ );
 498  
 499          return $conn;
 500      }
 501  
 502      /**
 503       * Mark a foreign connection as being available for reuse under a different
 504       * DB name or prefix. This mechanism is reference-counted, and must be called
 505       * the same number of times as getConnection() to work.
 506       *
 507       * @param DatabaseBase $conn
 508       * @throws MWException
 509       */
 510  	public function reuseConnection( $conn ) {
 511          $serverIndex = $conn->getLBInfo( 'serverIndex' );
 512          $refCount = $conn->getLBInfo( 'foreignPoolRefCount' );
 513          if ( $serverIndex === null || $refCount === null ) {
 514              wfDebug( __METHOD__ . ": this connection was not opened as a foreign connection\n" );
 515  
 516              /**
 517               * This can happen in code like:
 518               *   foreach ( $dbs as $db ) {
 519               *     $conn = $lb->getConnection( DB_SLAVE, array(), $db );
 520               *     ...
 521               *     $lb->reuseConnection( $conn );
 522               *   }
 523               * When a connection to the local DB is opened in this way, reuseConnection()
 524               * should be ignored
 525               */
 526  
 527              return;
 528          }
 529  
 530          $dbName = $conn->getDBname();
 531          $prefix = $conn->tablePrefix();
 532          if ( strval( $prefix ) !== '' ) {
 533              $wiki = "$dbName-$prefix";
 534          } else {
 535              $wiki = $dbName;
 536          }
 537          if ( $this->mConns['foreignUsed'][$serverIndex][$wiki] !== $conn ) {
 538              throw new MWException( __METHOD__ . ": connection not found, has " .
 539                  "the connection been freed already?" );
 540          }
 541          $conn->setLBInfo( 'foreignPoolRefCount', --$refCount );
 542          if ( $refCount <= 0 ) {
 543              $this->mConns['foreignFree'][$serverIndex][$wiki] = $conn;
 544              unset( $this->mConns['foreignUsed'][$serverIndex][$wiki] );
 545              wfDebug( __METHOD__ . ": freed connection $serverIndex/$wiki\n" );
 546          } else {
 547              wfDebug( __METHOD__ . ": reference count for $serverIndex/$wiki reduced to $refCount\n" );
 548          }
 549      }
 550  
 551      /**
 552       * Get a database connection handle reference
 553       *
  554       * The handle's methods simply wrap those of a DatabaseBase handle
 555       *
 556       * @see LoadBalancer::getConnection() for parameter information
 557       *
 558       * @param int $db
 559       * @param mixed $groups
 560       * @param bool|string $wiki
 561       * @return DBConnRef
 562       */
 563  	public function getConnectionRef( $db, $groups = array(), $wiki = false ) {
 564          return new DBConnRef( $this, $this->getConnection( $db, $groups, $wiki ) );
 565      }
 566  
 567      /**
 568       * Get a database connection handle reference without connecting yet
 569       *
  570       * The handle's methods simply wrap those of a DatabaseBase handle
 571       *
 572       * @see LoadBalancer::getConnection() for parameter information
 573       *
 574       * @param int $db
 575       * @param mixed $groups
 576       * @param bool|string $wiki
 577       * @return DBConnRef
 578       */
 579  	public function getLazyConnectionRef( $db, $groups = array(), $wiki = false ) {
 580          return new DBConnRef( $this, array( $db, $groups, $wiki ) );
 581      }
 582  
 583      /**
 584       * Open a connection to the server given by the specified index
 585       * Index must be an actual index into the array.
 586       * If the server is already open, returns it.
 587       *
 588       * On error, returns false, and the connection which caused the
 589       * error will be available via $this->mErrorConnection.
 590       *
 591       * @param int $i Server index
 592       * @param bool|string $wiki Wiki ID to open
  593       * @return DatabaseBase|bool False on failure
 594       *
 595       * @access private
 596       */
 597  	function openConnection( $i, $wiki = false ) {
 598          wfProfileIn( __METHOD__ );
 599          if ( $wiki !== false ) {
 600              $conn = $this->openForeignConnection( $i, $wiki );
 601              wfProfileOut( __METHOD__ );
 602  
 603              return $conn;
 604          }
 605          if ( isset( $this->mConns['local'][$i][0] ) ) {
 606              $conn = $this->mConns['local'][$i][0];
 607          } else {
 608              $server = $this->mServers[$i];
 609              $server['serverIndex'] = $i;
 610              $conn = $this->reallyOpenConnection( $server, false );
 611              if ( $conn->isOpen() ) {
 612                  wfDebug( "Connected to database $i at {$this->mServers[$i]['host']}\n" );
 613                  $this->mConns['local'][$i][0] = $conn;
 614              } else {
 615                  wfDebug( "Failed to connect to database $i at {$this->mServers[$i]['host']}\n" );
 616                  $this->mErrorConnection = $conn;
 617                  $conn = false;
 618              }
 619          }
 620          wfProfileOut( __METHOD__ );
 621  
 622          return $conn;
 623      }
 624  
 625      /**
 626       * Open a connection to a foreign DB, or return one if it is already open.
 627       *
 628       * Increments a reference count on the returned connection which locks the
 629       * connection to the requested wiki. This reference count can be
 630       * decremented by calling reuseConnection().
 631       *
 632       * If a connection is open to the appropriate server already, but with the wrong
 633       * database, it will be switched to the right database and returned, as long as
 634       * it has been freed first with reuseConnection().
 635       *
 636       * On error, returns false, and the connection which caused the
 637       * error will be available via $this->mErrorConnection.
 638       *
 639       * @param int $i Server index
 640       * @param string $wiki Wiki ID to open
  641       * @return DatabaseBase|bool False on failure
 642       */
 643  	function openForeignConnection( $i, $wiki ) {
 644          wfProfileIn( __METHOD__ );
 645          list( $dbName, $prefix ) = wfSplitWikiID( $wiki );
 646          if ( isset( $this->mConns['foreignUsed'][$i][$wiki] ) ) {
 647              // Reuse an already-used connection
 648              $conn = $this->mConns['foreignUsed'][$i][$wiki];
 649              wfDebug( __METHOD__ . ": reusing connection $i/$wiki\n" );
 650          } elseif ( isset( $this->mConns['foreignFree'][$i][$wiki] ) ) {
 651              // Reuse a free connection for the same wiki
 652              $conn = $this->mConns['foreignFree'][$i][$wiki];
 653              unset( $this->mConns['foreignFree'][$i][$wiki] );
 654              $this->mConns['foreignUsed'][$i][$wiki] = $conn;
 655              wfDebug( __METHOD__ . ": reusing free connection $i/$wiki\n" );
 656          } elseif ( !empty( $this->mConns['foreignFree'][$i] ) ) {
 657              // Reuse a connection from another wiki
 658              $conn = reset( $this->mConns['foreignFree'][$i] );
 659              $oldWiki = key( $this->mConns['foreignFree'][$i] );
 660  
 661              if ( !$conn->selectDB( $dbName ) ) {
 662                  $this->mLastError = "Error selecting database $dbName on server " .
 663                      $conn->getServer() . " from client host " . wfHostname() . "\n";
 664                  $this->mErrorConnection = $conn;
 665                  $conn = false;
 666              } else {
 667                  $conn->tablePrefix( $prefix );
 668                  unset( $this->mConns['foreignFree'][$i][$oldWiki] );
 669                  $this->mConns['foreignUsed'][$i][$wiki] = $conn;
 670                  wfDebug( __METHOD__ . ": reusing free connection from $oldWiki for $wiki\n" );
 671              }
 672          } else {
 673              // Open a new connection
 674              $server = $this->mServers[$i];
 675              $server['serverIndex'] = $i;
 676              $server['foreignPoolRefCount'] = 0;
 677              $server['foreign'] = true;
 678              $conn = $this->reallyOpenConnection( $server, $dbName );
 679              if ( !$conn->isOpen() ) {
 680                  wfDebug( __METHOD__ . ": error opening connection for $i/$wiki\n" );
 681                  $this->mErrorConnection = $conn;
 682                  $conn = false;
 683              } else {
 684                  $conn->tablePrefix( $prefix );
 685                  $this->mConns['foreignUsed'][$i][$wiki] = $conn;
 686                  wfDebug( __METHOD__ . ": opened new connection for $i/$wiki\n" );
 687              }
 688          }
 689  
 690          // Increment reference count
 691          if ( $conn ) {
 692              $refCount = $conn->getLBInfo( 'foreignPoolRefCount' );
 693              $conn->setLBInfo( 'foreignPoolRefCount', $refCount + 1 );
 694          }
 695          wfProfileOut( __METHOD__ );
 696  
 697          return $conn;
 698      }
 699  
 700      /**
 701       * Test if the specified index represents an open connection
 702       *
 703       * @param int $index Server index
 704       * @access private
 705       * @return bool
 706       */
 707  	function isOpen( $index ) {
 708          if ( !is_integer( $index ) ) {
 709              return false;
 710          }
 711  
 712          return (bool)$this->getAnyOpenConnection( $index );
 713      }
 714  
 715      /**
 716       * Really opens a connection. Uncached.
 717       * Returns a Database object whether or not the connection was successful.
 718       * @access private
 719       *
 720       * @param array $server
  721       * @param bool|string $dbNameOverride Database name to use instead of $server['dbname'], or false
 722       * @throws MWException
 723       * @return DatabaseBase
 724       */
 725  	function reallyOpenConnection( $server, $dbNameOverride = false ) {
 726          if ( !is_array( $server ) ) {
 727              throw new MWException( 'You must update your load-balancing configuration. ' .
 728                  'See DefaultSettings.php entry for $wgDBservers.' );
 729          }
 730  
 731          if ( $dbNameOverride !== false ) {
 732              $server['dbname'] = $dbNameOverride;
 733          }
 734  
 735          # Create object
 736          try {
 737              $db = DatabaseBase::factory( $server['type'], $server );
 738          } catch ( DBConnectionError $e ) {
 739              // FIXME: This is probably the ugliest thing I have ever done to
 740              // PHP. I'm half-expecting it to segfault, just out of disgust. -- TS
 741              $db = $e->db;
 742          }
 743  
 744          $db->setLBInfo( $server );
 745          if ( isset( $server['fakeSlaveLag'] ) ) {
 746              $db->setFakeSlaveLag( $server['fakeSlaveLag'] );
 747          }
 748          if ( isset( $server['fakeMaster'] ) ) {
 749              $db->setFakeMaster( true );
 750          }
 751  
 752          return $db;
 753      }
 754  
 755      /**
 756       * @throws DBConnectionError
 757       * @return bool
 758       */
 759  	private function reportConnectionError() {
 760          $conn = $this->mErrorConnection; // The connection which caused the error
 761  
 762          if ( !is_object( $conn ) ) {
 763              // No last connection, probably due to all servers being too busy
 764              wfLogDBError( "LB failure with no last connection. Connection error: {$this->mLastError}" );
 765  
 766              // If all servers were busy, mLastError will contain something sensible
 767              throw new DBConnectionError( null, $this->mLastError );
 768          } else {
 769              $server = $conn->getProperty( 'mServer' );
 770              wfLogDBError( "Connection error: {$this->mLastError} ({$server})" );
 771              $conn->reportConnectionError( "{$this->mLastError} ({$server})" ); // throws DBConnectionError
 772          }
 773  
 774          return false; /* not reached */
 775      }
 776  
 777      /**
 778       * @return int
 779       */
 780  	function getWriterIndex() {
 781          return 0;
 782      }
 783  
 784      /**
 785       * Returns true if the specified index is a valid server index
 786       *
 787       * @param string $i
 788       * @return bool
 789       */
 790  	function haveIndex( $i ) {
 791          return array_key_exists( $i, $this->mServers );
 792      }
 793  
 794      /**
 795       * Returns true if the specified index is valid and has non-zero load
 796       *
 797       * @param string $i
 798       * @return bool
 799       */
 800  	function isNonZeroLoad( $i ) {
 801          return array_key_exists( $i, $this->mServers ) && $this->mLoads[$i] != 0;
 802      }
 803  
 804      /**
 805       * Get the number of defined servers (not the number of open connections)
 806       *
 807       * @return int
 808       */
 809  	function getServerCount() {
 810          return count( $this->mServers );
 811      }
 812  
 813      /**
 814       * Get the host name or IP address of the server with the specified index
 815       * Prefer a readable name if available.
 816       * @param string $i
 817       * @return string
 818       */
 819  	function getServerName( $i ) {
 820          if ( isset( $this->mServers[$i]['hostName'] ) ) {
 821              return $this->mServers[$i]['hostName'];
 822          } elseif ( isset( $this->mServers[$i]['host'] ) ) {
 823              return $this->mServers[$i]['host'];
 824          } else {
 825              return '';
 826          }
 827      }
 828  
 829      /**
 830       * Return the server info structure for a given index, or false if the index is invalid.
 831       * @param int $i
 832       * @return array|bool
 833       */
 834  	function getServerInfo( $i ) {
 835          if ( isset( $this->mServers[$i] ) ) {
 836              return $this->mServers[$i];
 837          } else {
 838              return false;
 839          }
 840      }
 841  
 842      /**
 843       * Sets the server info structure for the given index. Entry at index $i
 844       * is created if it doesn't exist
 845       * @param int $i
 846       * @param array $serverInfo
 847       */
 848  	function setServerInfo( $i, $serverInfo ) {
 849          $this->mServers[$i] = $serverInfo;
 850      }
 851  
 852      /**
 853       * Get the current master position for chronology control purposes
 854       * @return mixed
 855       */
 856  	function getMasterPos() {
 857          # If this entire request was served from a slave without opening a connection to the
 858          # master (however unlikely that may be), then we can fetch the position from the slave.
 859          $masterConn = $this->getAnyOpenConnection( 0 );
 860          if ( !$masterConn ) {
 861              $serverCount = count( $this->mServers );
 862              for ( $i = 1; $i < $serverCount; $i++ ) {
 863                  $conn = $this->getAnyOpenConnection( $i );
 864                  if ( $conn ) {
 865                      wfDebug( "Master pos fetched from slave\n" );
 866  
 867                      return $conn->getSlavePos();
 868                  }
 869              }
 870          } else {
 871              wfDebug( "Master pos fetched from master\n" );
 872  
 873              return $masterConn->getMasterPos();
 874          }
 875  
 876          return false;
 877      }
 878  
 879      /**
 880       * Close all open connections
 881       */
 882  	function closeAll() {
 883          foreach ( $this->mConns as $conns2 ) {
 884              foreach ( $conns2 as $conns3 ) {
 885                  /** @var DatabaseBase $conn */
 886                  foreach ( $conns3 as $conn ) {
 887                      $conn->close();
 888                  }
 889              }
 890          }
 891          $this->mConns = array(
 892              'local' => array(),
 893              'foreignFree' => array(),
 894              'foreignUsed' => array(),
 895          );
 896      }
 897  
 898      /**
 899       * Close a connection
 900       * Using this function makes sure the LoadBalancer knows the connection is closed.
 901       * If you use $conn->close() directly, the load balancer won't update its state.
 902       * @param DatabaseBase $conn
 903       */
 904  	function closeConnection( $conn ) {
 905          $done = false;
 906          foreach ( $this->mConns as $i1 => $conns2 ) {
 907              foreach ( $conns2 as $i2 => $conns3 ) {
 908                  foreach ( $conns3 as $i3 => $candidateConn ) {
 909                      if ( $conn === $candidateConn ) {
 910                          $conn->close();
 911                          unset( $this->mConns[$i1][$i2][$i3] );
 912                          $done = true;
 913                          break;
 914                      }
 915                  }
 916              }
 917          }
 918          if ( !$done ) {
 919              $conn->close();
 920          }
 921      }
 922  
 923      /**
 924       * Commit transactions on all open connections
 925       */
 926  	function commitAll() {
 927          foreach ( $this->mConns as $conns2 ) {
 928              foreach ( $conns2 as $conns3 ) {
 929                  /** @var DatabaseBase[] $conns3 */
 930                  foreach ( $conns3 as $conn ) {
 931                      if ( $conn->trxLevel() ) {
 932                          $conn->commit( __METHOD__, 'flush' );
 933                      }
 934                  }
 935              }
 936          }
 937      }
 938  
 939      /**
 940       *  Issue COMMIT only on master, only if queries were done on connection
 941       */
 942  	function commitMasterChanges() {
 943          // Always 0, but who knows.. :)
 944          $masterIndex = $this->getWriterIndex();
 945          foreach ( $this->mConns as $conns2 ) {
 946              if ( empty( $conns2[$masterIndex] ) ) {
 947                  continue;
 948              }
 949              /** @var DatabaseBase $conn */
 950              foreach ( $conns2[$masterIndex] as $conn ) {
 951                  if ( $conn->trxLevel() && $conn->writesOrCallbacksPending() ) {
 952                      $conn->commit( __METHOD__, 'flush' );
 953                  }
 954              }
 955          }
 956      }
 957  
 958      /**
 959       * Issue ROLLBACK only on master, only if queries were done on connection
 960       * @since 1.23
 961       */
 962  	function rollbackMasterChanges() {
 963          // Always 0, but who knows.. :)
 964          $masterIndex = $this->getWriterIndex();
 965          foreach ( $this->mConns as $conns2 ) {
 966              if ( empty( $conns2[$masterIndex] ) ) {
 967                  continue;
 968              }
 969              /** @var DatabaseBase $conn */
 970              foreach ( $conns2[$masterIndex] as $conn ) {
 971                  if ( $conn->trxLevel() && $conn->writesOrCallbacksPending() ) {
 972                      $conn->rollback( __METHOD__, 'flush' );
 973                  }
 974              }
 975          }
 976      }
 977  
 978      /**
 979       * @return bool Whether a master connection is already open
 980       * @since 1.24
 981       */
 982  	function hasMasterConnection() {
 983          return $this->isOpen( $this->getWriterIndex() );
 984      }
 985  
 986      /**
 987       * Determine if there are any pending changes that need to be rolled back
 988       * or committed.
 989       * @since 1.23
 990       * @return bool
 991       */
 992  	function hasMasterChanges() {
 993          // Always 0, but who knows.. :)
 994          $masterIndex = $this->getWriterIndex();
 995          foreach ( $this->mConns as $conns2 ) {
 996              if ( empty( $conns2[$masterIndex] ) ) {
 997                  continue;
 998              }
 999              /** @var DatabaseBase $conn */
1000              foreach ( $conns2[$masterIndex] as $conn ) {
1001                  if ( $conn->trxLevel() && $conn->writesOrCallbacksPending() ) {
1002                      return true;
1003                  }
1004              }
1005          }
1006          return false;
1007      }
1008  
1009      /**
1010       * @param mixed $value
1011       * @return mixed
1012       */
1013  	function waitTimeout( $value = null ) {
1014          return wfSetVar( $this->mWaitTimeout, $value );
1015      }
1016  
1017      /**
1018       * @return bool
1019       */
1020  	function getLaggedSlaveMode() {
1021          return $this->mLaggedSlaveMode;
1022      }
1023  
1024      /**
1025       * Disables/enables lag checks
1026       * @param null|bool $mode
1027       * @return bool
1028       */
1029  	function allowLagged( $mode = null ) {
1030          if ( $mode === null ) {
1031              return $this->mAllowLagged;
1032          }
1033          $this->mAllowLagged = $mode;
1034  
1035          return $this->mAllowLagged;
1036      }
1037  
1038      /**
1039       * @return bool
1040       */
1041  	function pingAll() {
1042          $success = true;
1043          foreach ( $this->mConns as $conns2 ) {
1044              foreach ( $conns2 as $conns3 ) {
1045                  /** @var DatabaseBase[] $conns3 */
1046                  foreach ( $conns3 as $conn ) {
1047                      if ( !$conn->ping() ) {
1048                          $success = false;
1049                      }
1050                  }
1051              }
1052          }
1053  
1054          return $success;
1055      }
1056  
1057      /**
1058       * Call a function with each open connection object
1059       * @param callable $callback
1060       * @param array $params
1061       */
1062  	function forEachOpenConnection( $callback, $params = array() ) {
1063          foreach ( $this->mConns as $conns2 ) {
1064              foreach ( $conns2 as $conns3 ) {
1065                  foreach ( $conns3 as $conn ) {
1066                      $mergedParams = array_merge( array( $conn ), $params );
1067                      call_user_func_array( $callback, $mergedParams );
1068                  }
1069              }
1070          }
1071      }
1072  
1073      /**
1074       * Get the hostname and lag time of the most-lagged slave.
1075       * This is useful for maintenance scripts that need to throttle their updates.
1076       * May attempt to open connections to slaves on the default DB. If there is
1077       * no lag, the maximum lag will be reported as -1.
1078       *
1079       * @param bool|string $wiki Wiki ID, or false for the default database
1080       * @return array ( host, max lag, index of max lagged host )
1081       */
1082  	function getMaxLag( $wiki = false ) {
1083          $maxLag = -1;
1084          $host = '';
1085          $maxIndex = 0;
1086  
1087          if ( $this->getServerCount() <= 1 ) { // no replication = no lag
1088              return array( $host, $maxLag, $maxIndex );
1089          }
1090  
1091          // Try to get the max lag info from the server cache
1092          $key = 'loadbalancer:maxlag:cluster:' . $this->mServers[0]['host'];
1093          $cache = ObjectCache::newAccelerator( array(), 'hash' );
1094          $maxLagInfo = $cache->get( $key ); // (host, lag, index)
1095  
1096          // Fallback to connecting to each slave and getting the lag
1097          if ( !$maxLagInfo ) {
1098              foreach ( $this->mServers as $i => $conn ) {
1099                  if ( $i == $this->getWriterIndex() ) {
1100                      continue; // nothing to check
1101                  }
1102                  $conn = false;
1103                  if ( $wiki === false ) {
1104                      $conn = $this->getAnyOpenConnection( $i );
1105                  }
1106                  if ( !$conn ) {
1107                      $conn = $this->openConnection( $i, $wiki );
1108                  }
1109                  if ( !$conn ) {
1110                      continue;
1111                  }
1112                  $lag = $conn->getLag();
1113                  if ( $lag > $maxLag ) {
1114                      $maxLag = $lag;
1115                      $host = $this->mServers[$i]['host'];
1116                      $maxIndex = $i;
1117                  }
1118              }
1119              $maxLagInfo = array( $host, $maxLag, $maxIndex );
1120              $cache->set( $key, $maxLagInfo, 5 );
1121          }
1122  
1123          return $maxLagInfo;
1124      }
1125  
1126      /**
1127       * Get lag time for each server
1128       * Results are cached for a short time in memcached, and indefinitely in the process cache
1129       *
1130       * @param string|bool $wiki
1131       * @return array
1132       */
1133  	function getLagTimes( $wiki = false ) {
1134          # Try process cache
1135          if ( isset( $this->mLagTimes ) ) {
1136              return $this->mLagTimes;
1137          }
1138          if ( $this->getServerCount() == 1 ) {
1139              # No replication
1140              $this->mLagTimes = array( 0 => 0 );
1141          } else {
1142              # Send the request to the load monitor
1143              $this->mLagTimes = $this->getLoadMonitor()->getLagTimes(
1144                  array_keys( $this->mServers ), $wiki );
1145          }
1146  
1147          return $this->mLagTimes;
1148      }
1149  
1150      /**
1151       * Get the lag in seconds for a given connection, or zero if this load
1152       * balancer does not have replication enabled.
1153       *
1154       * This should be used in preference to Database::getLag() in cases where
1155       * replication may not be in use, since there is no way to determine if
1156       * replication is in use at the connection level without running
1157       * potentially restricted queries such as SHOW SLAVE STATUS. Using this
1158       * function instead of Database::getLag() avoids a fatal error in this
1159       * case on many installations.
1160       *
1161       * @param DatabaseBase $conn
1162       * @return int
1163       */
1164  	function safeGetLag( $conn ) {
1165          if ( $this->getServerCount() == 1 ) {
1166              return 0;
1167          } else {
1168              return $conn->getLag();
1169          }
1170      }
1171  
1172      /**
1173       * Clear the cache for getLagTimes
1174       */
1175  	function clearLagTimeCache() {
1176          $this->mLagTimes = null;
1177      }
1178  }
1179  
/**
 * Helper class to handle automatically marking connections as reusable (via RAII pattern)
 * as well as handling deferring the actual network connection until the handle is used
 *
 * @ingroup Database
 * @since 1.22
 */
class DBConnRef implements IDatabase {
    /** @var LoadBalancer Load balancer the connection is obtained from and handed back to */
    protected $lb;

    /** @var DatabaseBase|null Live connection, or null while it is still deferred */
    protected $conn;

    /** @var array|null (server index, group, wiki ID) used to obtain the connection lazily */
    protected $params;

    /**
     * @param LoadBalancer $lb
     * @param DatabaseBase|array $conn Connection or (server index, group, wiki ID) array
     */
    public function __construct( LoadBalancer $lb, $conn ) {
        $this->lb = $lb;

        $isLiveConnection = $conn instanceof DatabaseBase;
        if ( !$isLiveConnection ) {
            // Keep the lookup arguments; the connection is fetched on first use
            $this->params = $conn;
        } else {
            $this->conn = $conn;
        }
    }

    /**
     * Forward a method call to the underlying connection, obtaining it from
     * the load balancer first if it was deferred.
     *
     * @param string $name Method name
     * @param array $arguments Method arguments
     * @return mixed Whatever the forwarded method returns
     */
    public function __call( $name, $arguments ) {
        if ( $this->conn === null ) {
            list( $index, $groups, $wiki ) = $this->params;
            $this->conn = $this->lb->getConnection( $index, $groups, $wiki );
        }

        $target = array( $this->conn, $name );

        return call_user_func_array( $target, $arguments );
    }

    /**
     * Hand the connection back to the load balancer for reuse, if one was
     * ever obtained.
     */
    public function __destruct() {
        if ( $this->conn === null ) {
            return;
        }

        $this->lb->reuseConnection( $this->conn );
    }
}


Generated: Fri Nov 28 14:03:12 2014 Cross-referenced by PHPXref 0.7.1