[ Index ] |
PHP Cross Reference of MediaWiki-1.24.0 |
[Summary view] [Print] [Text view]
<?php
/**
 * Database load balancing.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 * http://www.gnu.org/copyleft/gpl.html
 *
 * @file
 * @ingroup Database
 */

/**
 * Database load balancing object
 *
 * Holds the configured server list (index 0 is treated as the master, see
 * getWriterIndex()), picks a server index for reads based on configured load
 * weights and lag, and keeps three pools of open connections:
 *   - 'local':       connections opened without a wiki ID
 *   - 'foreignUsed': connections to a specific wiki that are currently referenced
 *   - 'foreignFree': connections to a specific wiki that have been released
 *                    with reuseConnection() and may be switched to another wiki
 *
 * @ingroup Database
 */
class LoadBalancer {
	/** @var array Map of (server index => server config array) */
	private $mServers;
	/** @var array Map of (local/foreignUsed/foreignFree => server index => DatabaseBase array) */
	private $mConns;
	/** @var array Map of (server index => weight) */
	private $mLoads;
	/** @var array Map of (group => server index => weight) */
	private $mGroupLoads;
	/** @var bool Whether to disregard slave lag as a factor in slave selection */
	private $mAllowLagged;
	/** @var integer Seconds to spend waiting on slave lag to resolve */
	private $mWaitTimeout;

	/** @var array LBFactory information */
	private $mParentInfo;
	/** @var string The LoadMonitor subclass name */
	private $mLoadMonitorClass;
	/** @var LoadMonitor */
	private $mLoadMonitor;

	/** @var bool|DatabaseBase Database connection that caused a problem */
	private $mErrorConnection;
	/** @var integer The generic (not query grouped) slave index (of $mServers) */
	private $mReadIndex;
	/** @var bool|DBMasterPos False if not set */
	private $mWaitForPos;
	/** @var bool Whether the generic reader fell back to a lagged slave */
	private $mLaggedSlaveMode;
	/** @var string The last DB selection or connection error */
	private $mLastError = 'Unknown error';
	/** @var array Process cache of LoadMonitor::getLagTimes() */
	private $mLagTimes;

	/**
	 * @param array $params Array with keys:
	 *   servers      Required. Array of server info structures.
	 *   loadMonitor  Name of a class used to fetch server lag and load.
	 * @throws MWException If the 'servers' key is missing
	 */
	function __construct( $params ) {
		if ( !isset( $params['servers'] ) ) {
			throw new MWException( __CLASS__ . ': missing servers parameter' );
		}
		$this->mServers = $params['servers'];
		$this->mWaitTimeout = 10;

		$this->mReadIndex = -1;
		// NOTE(review): mWriteIndex is not declared as a property and is not read
		// anywhere in this class (getWriterIndex() returns a constant). It is kept
		// as-is so that any outside reader of the property keeps working.
		$this->mWriteIndex = -1;
		$this->mConns = array(
			'local' => array(),
			'foreignUsed' => array(),
			'foreignFree' => array() );
		$this->mLoads = array();
		// Start from an empty map so that later lookups never operate on null
		$this->mGroupLoads = array();
		$this->mWaitForPos = false;
		$this->mLaggedSlaveMode = false;
		$this->mErrorConnection = false;
		$this->mAllowLagged = false;

		if ( isset( $params['loadMonitor'] ) ) {
			$this->mLoadMonitorClass = $params['loadMonitor'];
		} else {
			// No monitor configured: choose one from the type of the first server
			$master = reset( $params['servers'] );
			if ( isset( $master['type'] ) && $master['type'] === 'mysql' ) {
				$this->mLoadMonitorClass = 'LoadMonitorMySQL';
			} else {
				$this->mLoadMonitorClass = 'LoadMonitorNull';
			}
		}

		// Index the generic and per-group load weights by server index
		foreach ( $params['servers'] as $i => $server ) {
			$this->mLoads[$i] = $server['load'];
			if ( isset( $server['groupLoads'] ) ) {
				foreach ( $server['groupLoads'] as $group => $ratio ) {
					if ( !isset( $this->mGroupLoads[$group] ) ) {
						$this->mGroupLoads[$group] = array();
					}
					$this->mGroupLoads[$group][$i] = $ratio;
				}
			}
		}
	}

	/**
	 * Get a LoadMonitor instance
	 *
	 * The instance is created on first use from the class name chosen in the
	 * constructor and is then reused.
	 *
	 * @return LoadMonitor
	 */
	function getLoadMonitor() {
		if ( !isset( $this->mLoadMonitor ) ) {
			$class = $this->mLoadMonitorClass;
			$this->mLoadMonitor = new $class( $this );
		}

		return $this->mLoadMonitor;
	}

	/**
	 * Get or set arbitrary data used by the parent object, usually an LBFactory
	 * @param mixed $x
	 * @return mixed
	 */
	function parentInfo( $x = null ) {
		return wfSetVar( $this->mParentInfo, $x );
	}

	/**
	 * Given an array of non-normalised probabilities, this function will select
	 * an element and return the appropriate key
	 *
	 * @deprecated since 1.21, use ArrayUtils::pickRandom()
	 *
	 * @param array $weights
	 * @return bool|int|string
	 */
	function pickRandom( $weights ) {
		return ArrayUtils::pickRandom( $weights );
	}

	/**
	 * Pick a random server from $loads after removing the slaves that are not
	 * replicating or that exceed their configured 'max lag'.
	 *
	 * Server index 0 is never removed by the lag filter.
	 *
	 * @param array $loads Map of (server index => weight)
	 * @param bool|string $wiki Wiki to get non-lagged for
	 * @return bool|int|string Server index, or false if no server with
	 *   non-zero load is left
	 */
	function getRandomNonLagged( $loads, $wiki = false ) {
		# Unset excessively lagged servers
		$lags = $this->getLagTimes( $wiki );
		foreach ( $lags as $i => $lag ) {
			if ( $i != 0 ) {
				if ( $lag === false ) {
					wfDebugLog( 'replication', "Server #$i is not replicating" );
					unset( $loads[$i] );
				} elseif ( isset( $this->mServers[$i]['max lag'] ) && $lag > $this->mServers[$i]['max lag'] ) {
					wfDebugLog( 'replication', "Server #$i is excessively lagged ($lag seconds)" );
					unset( $loads[$i] );
				}
			}
		}

		# Find out if all the slaves with non-zero load are lagged.
		# An empty array also sums to 0, so this covers "nothing left" as well.
		if ( array_sum( $loads ) == 0 ) {
			# No appropriate DB servers except maybe the master and some slaves with zero load
			# Do NOT use the master
			# Instead, this function will return false, triggering read-only mode,
			# and a lagged slave will be used instead.
			return false;
		}

		#wfDebugLog( 'connect', var_export( $loads, true ) );

		# Return a random representative of the remainder
		return ArrayUtils::pickRandom( $loads );
	}

	/**
	 * Get the index of the reader connection, which may be a slave
	 * This takes into account load ratios and lag times. It should
	 * always return a consistent index during a given invocation
	 *
	 * The index chosen for the generic reader ($group === false) is remembered
	 * in mReadIndex and returned directly on later generic calls. Indexes
	 * chosen for a query group are not remembered, so that a group-specific
	 * server never becomes the generic reader.
	 *
	 * Side effect: opens connections to databases
	 * Side effect: may set $wgReadOnly when every candidate slave is lagged
	 *
	 * @param bool|string $group Query group, or false for the generic reader
	 * @param bool|string $wiki Wiki ID, or false for the current wiki
	 * @throws MWException If the relevant load array is empty
	 * @return bool|int|string Server index, or false if no server could be
	 *   selected (unknown group, or no candidate could be picked/connected)
	 */
	function getReaderIndex( $group = false, $wiki = false ) {
		global $wgReadOnly, $wgDBtype;

		# @todo FIXME: For now, only go through all this for mysql databases
		if ( $wgDBtype != 'mysql' ) {
			return $this->getWriterIndex();
		}

		if ( count( $this->mServers ) == 1 ) {
			# Skip the load balancing if there's only one server
			return 0;
		} elseif ( $group === false && $this->mReadIndex >= 0 ) {
			# Shortcut if generic reader exists already
			return $this->mReadIndex;
		}

		// Held in a local until the method returns.
		// NOTE(review): presumably the object profiles this scope via its lifetime; confirm.
		$section = new ProfileSection( __METHOD__ );

		# Find the relevant load array
		if ( $group !== false ) {
			if ( isset( $this->mGroupLoads[$group] ) ) {
				$nonErrorLoads = $this->mGroupLoads[$group];
			} else {
				# No loads for this group, return false and the caller can use some other group
				wfDebug( __METHOD__ . ": no loads for group $group\n" );

				return false;
			}
		} else {
			$nonErrorLoads = $this->mLoads;
		}

		if ( !count( $nonErrorLoads ) ) {
			throw new MWException( "Empty server array given to LoadBalancer" );
		}

		# Scale the configured load ratios according to the dynamic load (if the load monitor supports it)
		$this->getLoadMonitor()->scaleLoads( $nonErrorLoads, $group, $wiki );

		$laggedSlaveMode = false;

		# No server found yet
		$i = false;
		# First try quickly looking through the available servers for a server that
		# meets our criteria
		$currentLoads = $nonErrorLoads;
		while ( count( $currentLoads ) ) {
			if ( $wgReadOnly || $this->mAllowLagged || $laggedSlaveMode ) {
				$i = ArrayUtils::pickRandom( $currentLoads );
			} else {
				$i = $this->getRandomNonLagged( $currentLoads, $wiki );
				if ( $i === false && count( $currentLoads ) != 0 ) {
					# All slaves lagged. Switch to read-only mode
					wfDebugLog( 'replication', "All slaves lagged. Switch to read-only mode" );
					$wgReadOnly = 'The database has been automatically locked ' .
						'while the slave database servers catch up to the master';
					$i = ArrayUtils::pickRandom( $currentLoads );
					$laggedSlaveMode = true;
				}
			}

			if ( $i === false ) {
				# pickRandom() returned false
				# This is permanent and means the configuration or the load monitor
				# wants us to return false.
				wfDebugLog( 'connect', __METHOD__ . ": pickRandom() returned false" );

				return false;
			}

			wfDebugLog( 'connect', __METHOD__ .
				": Using reader #$i: {$this->mServers[$i]['host']}..." );

			$conn = $this->openConnection( $i, $wiki );
			if ( !$conn ) {
				# Drop the unreachable server from both candidate lists and try again
				wfDebugLog( 'connect', __METHOD__ . ": Failed connecting to $i/$wiki" );
				unset( $nonErrorLoads[$i] );
				unset( $currentLoads[$i] );
				$i = false;
				continue;
			}

			// Decrement reference counter, we are finished with this connection.
			// It will be incremented for the caller later.
			if ( $wiki !== false ) {
				$this->reuseConnection( $conn );
			}

			# Return this server
			break;
		}

		# If all servers were down, quit now
		if ( !count( $nonErrorLoads ) ) {
			wfDebugLog( 'connect', "All servers down" );
		}

		if ( $i !== false ) {
			# Slave connection successful
			# Wait for the session master pos for a short time
			if ( $this->mWaitForPos && $i > 0 ) {
				if ( !$this->doWait( $i ) ) {
					$this->mServers[$i]['slave pos'] = $conn->getSlavePos();
				}
			}
			# Remember the choice only for the generic reader. The shortcut at the
			# top of this method serves mReadIndex to generic callers only, so a
			# group-specific pick must not be stored here.
			if ( $this->mReadIndex <= 0 && $this->mLoads[$i] > 0 && $group === false ) {
				$this->mReadIndex = $i;
			}
		}

		return $i;
	}

	/**
	 * Wait for a specified number of microseconds, and return the period waited
	 * @param int $t
	 * @return int
	 */
	function sleep( $t ) {
		wfProfileIn( __METHOD__ );
		wfDebug( __METHOD__ . ": waiting $t us\n" );
		usleep( $t );
		wfProfileOut( __METHOD__ );

		return $t;
	}

	/**
	 * Set the master wait position
	 * If a DB_SLAVE connection has been opened already, waits
	 * Otherwise sets a variable telling it to wait if such a connection is opened
	 *
	 * If the wait fails, lagged slave mode is turned on and, when a connection
	 * to the reader is still open, its current position is recorded in the
	 * server info under 'slave pos'.
	 *
	 * @param DBMasterPos $pos
	 */
	public function waitFor( $pos ) {
		wfProfileIn( __METHOD__ );
		$this->mWaitForPos = $pos;
		$i = $this->mReadIndex;

		if ( $i > 0 ) {
			if ( !$this->doWait( $i ) ) {
				// doWait() also fails when no connection to the reader is open
				// (e.g. it was closed since it was selected), so the connection
				// has to be checked before it is used.
				$conn = $this->getAnyOpenConnection( $i );
				if ( $conn ) {
					$this->mServers[$i]['slave pos'] = $conn->getSlavePos();
				}
				$this->mLaggedSlaveMode = true;
			}
		}
		wfProfileOut( __METHOD__ );
	}

	/**
	 * Set the master wait position and wait for ALL slaves to catch up to it
	 *
	 * Only servers with index >= 1 and a generic load above zero are waited on;
	 * connections are opened where needed.
	 *
	 * @param DBMasterPos $pos
	 * @param int $timeout Max seconds to wait; default is mWaitTimeout
	 * @return bool Success (able to connect and no timeouts reached)
	 */
	public function waitForAll( $pos, $timeout = null ) {
		wfProfileIn( __METHOD__ );
		$this->mWaitForPos = $pos;
		$serverCount = count( $this->mServers );

		$ok = true;
		for ( $i = 1; $i < $serverCount; $i++ ) {
			if ( $this->mLoads[$i] > 0 ) {
				// doWait() is evaluated first so every slave is waited on even
				// after an earlier failure
				$ok = $this->doWait( $i, true, $timeout ) && $ok;
			}
		}
		wfProfileOut( __METHOD__ );

		return $ok;
	}

	/**
	 * Get any open connection to a given server index, local or foreign
	 * Returns false if there is no connection open
	 *
	 * @param int $i
	 * @return DatabaseBase|bool False on failure
	 */
	function getAnyOpenConnection( $i ) {
		foreach ( $this->mConns as $conns ) {
			if ( !empty( $conns[$i] ) ) {
				return reset( $conns[$i] );
			}
		}

		return false;
	}

	/**
	 * Wait for a given slave to catch up to the master pos stored in $this
	 * @param int $index Server index
	 * @param bool $open Check the server even if a new connection has to be made
	 * @param int $timeout Max seconds to wait; default is mWaitTimeout
	 *   (a falsy value such as 0 or null also selects the default)
	 * @return bool True if the slave caught up; false on timeout or when no
	 *   connection is available
	 */
	protected function doWait( $index, $open = false, $timeout = null ) {
		# Find a connection to wait on
		$conn = $this->getAnyOpenConnection( $index );
		if ( !$conn ) {
			if ( !$open ) {
				wfDebug( __METHOD__ . ": no connection open\n" );

				return false;
			} else {
				$conn = $this->openConnection( $index, '' );
				if ( !$conn ) {
					wfDebug( __METHOD__ . ": failed to open connection\n" );

					return false;
				}
			}
		}

		wfDebug( __METHOD__ . ": Waiting for slave #$index to catch up...\n" );
		$timeout = $timeout ?: $this->mWaitTimeout;
		$result = $conn->masterPosWait( $this->mWaitForPos, $timeout );

		if ( $result == -1 || is_null( $result ) ) {
			# Timed out waiting for slave, use master instead
			wfDebug( __METHOD__ . ": Timed out waiting for slave #$index pos {$this->mWaitForPos}\n" );

			return false;
		} else {
			wfDebug( __METHOD__ . ": Done\n" );

			return true;
		}
	}

	/**
	 * Get a connection by index
	 * This is the main entry point for this class.
	 *
	 * @param int $i Server index, or DB_MASTER / DB_SLAVE
	 * @param array $groups Query groups (a single non-array group is accepted too)
	 * @param bool|string $wiki Wiki ID
	 *
	 * @throws MWException If $i is null or false
	 * @throws DBConnectionError If no usable server could be connected to
	 * @return DatabaseBase
	 */
	public function &getConnection( $i, $groups = array(), $wiki = false ) {
		wfProfileIn( __METHOD__ );

		if ( $i === null || $i === false ) {
			wfProfileOut( __METHOD__ );
			throw new MWException( 'Attempt to call ' . __METHOD__ .
				' with invalid server index' );
		}

		// The current wiki is handled through the local pool
		if ( $wiki === wfWikiID() ) {
			$wiki = false;
		}

		# Query groups
		if ( $i == DB_MASTER ) {
			$i = $this->getWriterIndex();
		} elseif ( !is_array( $groups ) ) {
			$groupIndex = $this->getReaderIndex( $groups, $wiki );
			if ( $groupIndex !== false ) {
				$serverName = $this->getServerName( $groupIndex );
				wfDebug( __METHOD__ . ": using server $serverName for group $groups\n" );
				$i = $groupIndex;
			}
		} else {
			// Use the first group that yields a server
			foreach ( $groups as $group ) {
				$groupIndex = $this->getReaderIndex( $group, $wiki );
				if ( $groupIndex !== false ) {
					$serverName = $this->getServerName( $groupIndex );
					wfDebug( __METHOD__ . ": using server $serverName for group $group\n" );
					$i = $groupIndex;
					break;
				}
			}
		}

		# Operation-based index
		if ( $i == DB_SLAVE ) {
			$this->mLastError = 'Unknown error'; // reset error string
			$i = $this->getReaderIndex( false, $wiki );
			# Couldn't find a working server in getReaderIndex()?
			if ( $i === false ) {
				$this->mLastError = 'No working slave server: ' . $this->mLastError;
				wfProfileOut( __METHOD__ );

				return $this->reportConnectionError();
			}
		}

		# Now we have an explicit index into the servers array
		$conn = $this->openConnection( $i, $wiki );
		if ( !$conn ) {
			wfProfileOut( __METHOD__ );

			return $this->reportConnectionError();
		}

		wfProfileOut( __METHOD__ );

		return $conn;
	}

	/**
	 * Mark a foreign connection as being available for reuse under a different
	 * DB name or prefix. This mechanism is reference-counted, and must be called
	 * the same number of times as getConnection() to work.
	 *
	 * @param DatabaseBase $conn
	 * @throws MWException If the connection is not in the pool of used foreign
	 *   connections under its current DB name and prefix
	 */
	public function reuseConnection( $conn ) {
		$serverIndex = $conn->getLBInfo( 'serverIndex' );
		$refCount = $conn->getLBInfo( 'foreignPoolRefCount' );
		if ( $serverIndex === null || $refCount === null ) {
			wfDebug( __METHOD__ . ": this connection was not opened as a foreign connection\n" );

			/**
			 * This can happen in code like:
			 *   foreach ( $dbs as $db ) {
			 *     $conn = $lb->getConnection( DB_SLAVE, array(), $db );
			 *     ...
			 *     $lb->reuseConnection( $conn );
			 *   }
			 * When a connection to the local DB is opened in this way, reuseConnection()
			 * should be ignored
			 */

			return;
		}

		// Rebuild the pool key the connection was stored under
		$dbName = $conn->getDBname();
		$prefix = $conn->tablePrefix();
		if ( strval( $prefix ) !== '' ) {
			$wiki = "$dbName-$prefix";
		} else {
			$wiki = $dbName;
		}
		// isset() first: a missing entry must lead to the exception below
		// without raising an undefined index notice on the way
		if ( !isset( $this->mConns['foreignUsed'][$serverIndex][$wiki] )
			|| $this->mConns['foreignUsed'][$serverIndex][$wiki] !== $conn
		) {
			throw new MWException( __METHOD__ . ": connection not found, has " .
				"the connection been freed already?" );
		}
		$conn->setLBInfo( 'foreignPoolRefCount', --$refCount );
		if ( $refCount <= 0 ) {
			$this->mConns['foreignFree'][$serverIndex][$wiki] = $conn;
			unset( $this->mConns['foreignUsed'][$serverIndex][$wiki] );
			wfDebug( __METHOD__ . ": freed connection $serverIndex/$wiki\n" );
		} else {
			wfDebug( __METHOD__ . ": reference count for $serverIndex/$wiki reduced to $refCount\n" );
		}
	}

	/**
	 * Get a database connection handle reference
	 *
	 * The handle's methods simply wrap those of a DatabaseBase handle
	 *
	 * @see LoadBalancer::getConnection() for parameter information
	 *
	 * @param int $db
	 * @param mixed $groups
	 * @param bool|string $wiki
	 * @return DBConnRef
	 */
	public function getConnectionRef( $db, $groups = array(), $wiki = false ) {
		return new DBConnRef( $this, $this->getConnection( $db, $groups, $wiki ) );
	}

	/**
	 * Get a database connection handle reference without connecting yet
	 *
	 * The handle's methods simply wrap those of a DatabaseBase handle
	 *
	 * @see LoadBalancer::getConnection() for parameter information
	 *
	 * @param int $db
	 * @param mixed $groups
	 * @param bool|string $wiki
	 * @return DBConnRef
	 */
	public function getLazyConnectionRef( $db, $groups = array(), $wiki = false ) {
		return new DBConnRef( $this, array( $db, $groups, $wiki ) );
	}

	/**
	 * Open a connection to the server given by the specified index
	 * Index must be an actual index into the array.
	 * If the server is already open, returns it.
	 *
	 * On error, returns false, and the connection which caused the
	 * error will be available via $this->mErrorConnection.
	 *
	 * @param int $i Server index
	 * @param bool|string $wiki Wiki ID to open
	 * @return DatabaseBase|bool False on failure
	 *
	 * @access private
	 */
	function openConnection( $i, $wiki = false ) {
		wfProfileIn( __METHOD__ );
		if ( $wiki !== false ) {
			// Any explicit wiki ID goes through the foreign connection pools
			$conn = $this->openForeignConnection( $i, $wiki );
			wfProfileOut( __METHOD__ );

			return $conn;
		}
		if ( isset( $this->mConns['local'][$i][0] ) ) {
			$conn = $this->mConns['local'][$i][0];
		} else {
			$server = $this->mServers[$i];
			$server['serverIndex'] = $i;
			$conn = $this->reallyOpenConnection( $server, false );
			if ( $conn->isOpen() ) {
				wfDebug( "Connected to database $i at {$this->mServers[$i]['host']}\n" );
				$this->mConns['local'][$i][0] = $conn;
			} else {
				wfDebug( "Failed to connect to database $i at {$this->mServers[$i]['host']}\n" );
				$this->mErrorConnection = $conn;
				$conn = false;
			}
		}
		wfProfileOut( __METHOD__ );

		return $conn;
	}

	/**
	 * Open a connection to a foreign DB, or return one if it is already open.
	 *
	 * Increments a reference count on the returned connection which locks the
	 * connection to the requested wiki. This reference count can be
	 * decremented by calling reuseConnection().
	 *
	 * If a connection is open to the appropriate server already, but with the wrong
	 * database, it will be switched to the right database and returned, as long as
	 * it has been freed first with reuseConnection().
	 *
	 * On error, returns false, and the connection which caused the
	 * error will be available via $this->mErrorConnection.
	 *
	 * @param int $i Server index
	 * @param string $wiki Wiki ID to open
	 * @return DatabaseBase|bool False on failure
	 */
	function openForeignConnection( $i, $wiki ) {
		wfProfileIn( __METHOD__ );
		list( $dbName, $prefix ) = wfSplitWikiID( $wiki );
		if ( isset( $this->mConns['foreignUsed'][$i][$wiki] ) ) {
			// Reuse an already-used connection
			$conn = $this->mConns['foreignUsed'][$i][$wiki];
			wfDebug( __METHOD__ . ": reusing connection $i/$wiki\n" );
		} elseif ( isset( $this->mConns['foreignFree'][$i][$wiki] ) ) {
			// Reuse a free connection for the same wiki
			$conn = $this->mConns['foreignFree'][$i][$wiki];
			unset( $this->mConns['foreignFree'][$i][$wiki] );
			$this->mConns['foreignUsed'][$i][$wiki] = $conn;
			wfDebug( __METHOD__ . ": reusing free connection $i/$wiki\n" );
		} elseif ( !empty( $this->mConns['foreignFree'][$i] ) ) {
			// Reuse a connection from another wiki
			$conn = reset( $this->mConns['foreignFree'][$i] );
			$oldWiki = key( $this->mConns['foreignFree'][$i] );

			if ( !$conn->selectDB( $dbName ) ) {
				$this->mLastError = "Error selecting database $dbName on server " .
					$conn->getServer() . " from client host " . wfHostname() . "\n";
				$this->mErrorConnection = $conn;
				$conn = false;
			} else {
				// Move the connection from the free pool to the used pool
				// under the key of the wiki it now serves
				$conn->tablePrefix( $prefix );
				unset( $this->mConns['foreignFree'][$i][$oldWiki] );
				$this->mConns['foreignUsed'][$i][$wiki] = $conn;
				wfDebug( __METHOD__ . ": reusing free connection from $oldWiki for $wiki\n" );
			}
		} else {
			// Open a new connection
			$server = $this->mServers[$i];
			$server['serverIndex'] = $i;
			$server['foreignPoolRefCount'] = 0;
			$server['foreign'] = true;
			$conn = $this->reallyOpenConnection( $server, $dbName );
			if ( !$conn->isOpen() ) {
				wfDebug( __METHOD__ . ": error opening connection for $i/$wiki\n" );
				$this->mErrorConnection = $conn;
				$conn = false;
			} else {
				$conn->tablePrefix( $prefix );
				$this->mConns['foreignUsed'][$i][$wiki] = $conn;
				wfDebug( __METHOD__ . ": opened new connection for $i/$wiki\n" );
			}
		}

		// Increment reference count
		if ( $conn ) {
			$refCount = $conn->getLBInfo( 'foreignPoolRefCount' );
			$conn->setLBInfo( 'foreignPoolRefCount', $refCount + 1 );
		}
		wfProfileOut( __METHOD__ );

		return $conn;
	}

	/**
	 * Test if the specified index represents an open connection
	 *
	 * @param int $index Server index
	 * @access private
	 * @return bool False for non-integer indexes or when no connection is open
	 */
	function isOpen( $index ) {
		if ( !is_integer( $index ) ) {
			return false;
		}

		return (bool)$this->getAnyOpenConnection( $index );
	}

	/**
	 * Really opens a connection. Uncached.
	 * Returns a Database object whether or not the connection was successful.
	 * @access private
	 *
	 * @param array $server Server info structure
	 * @param bool|string $dbNameOverride DB name to use instead of the
	 *   configured one, or false to keep the configured one
	 * @throws MWException If $server is not an array
	 * @return DatabaseBase
	 */
	function reallyOpenConnection( $server, $dbNameOverride = false ) {
		if ( !is_array( $server ) ) {
			throw new MWException( 'You must update your load-balancing configuration. ' .
				'See DefaultSettings.php entry for $wgDBservers.' );
		}

		if ( $dbNameOverride !== false ) {
			$server['dbname'] = $dbNameOverride;
		}

		# Create object
		try {
			$db = DatabaseBase::factory( $server['type'], $server );
		} catch ( DBConnectionError $e ) {
			// FIXME: This is probably the ugliest thing I have ever done to
			// PHP. I'm half-expecting it to segfault, just out of disgust. -- TS
			$db = $e->db;
		}

		$db->setLBInfo( $server );
		if ( isset( $server['fakeSlaveLag'] ) ) {
			$db->setFakeSlaveLag( $server['fakeSlaveLag'] );
		}
		if ( isset( $server['fakeMaster'] ) ) {
			$db->setFakeMaster( true );
		}

		return $db;
	}

	/**
	 * Log the last connection error and raise it as an exception.
	 *
	 * @throws DBConnectionError
	 * @return bool
	 */
	private function reportConnectionError() {
		$conn = $this->mErrorConnection; // The connection which caused the error

		if ( !is_object( $conn ) ) {
			// No last connection, probably due to all servers being too busy
			wfLogDBError( "LB failure with no last connection. Connection error: {$this->mLastError}" );

			// If all servers were busy, mLastError will contain something sensible
			throw new DBConnectionError( null, $this->mLastError );
		} else {
			$server = $conn->getProperty( 'mServer' );
			wfLogDBError( "Connection error: {$this->mLastError} ({$server})" );
			$conn->reportConnectionError( "{$this->mLastError} ({$server})" ); // throws DBConnectionError
		}

		return false; /* not reached */
	}

	/**
	 * Get the index of the server used for writes (the master)
	 *
	 * @return int
	 */
	function getWriterIndex() {
		return 0;
	}

	/**
	 * Returns true if the specified index is a valid server index
	 *
	 * @param string $i
	 * @return bool
	 */
	function haveIndex( $i ) {
		return array_key_exists( $i, $this->mServers );
	}

	/**
	 * Returns true if the specified index is valid and has non-zero load
	 *
	 * @param string $i
	 * @return bool
	 */
	function isNonZeroLoad( $i ) {
		return array_key_exists( $i, $this->mServers ) && $this->mLoads[$i] != 0;
	}

	/**
	 * Get the number of defined servers (not the number of open connections)
	 *
	 * @return int
	 */
	function getServerCount() {
		return count( $this->mServers );
	}

	/**
	 * Get the host name or IP address of the server with the specified index
	 * Prefer a readable name if available.
	 * @param string $i
	 * @return string Empty string if neither 'hostName' nor 'host' is set
	 */
	function getServerName( $i ) {
		if ( isset( $this->mServers[$i]['hostName'] ) ) {
			return $this->mServers[$i]['hostName'];
		} elseif ( isset( $this->mServers[$i]['host'] ) ) {
			return $this->mServers[$i]['host'];
		} else {
			return '';
		}
	}

	/**
	 * Return the server info structure for a given index, or false if the index is invalid.
	 * @param int $i
	 * @return array|bool
	 */
	function getServerInfo( $i ) {
		if ( isset( $this->mServers[$i] ) ) {
			return $this->mServers[$i];
		} else {
			return false;
		}
	}

	/**
	 * Sets the server info structure for the given index. Entry at index $i
	 * is created if it doesn't exist
	 * @param int $i
	 * @param array $serverInfo
	 */
	function setServerInfo( $i, $serverInfo ) {
		$this->mServers[$i] = $serverInfo;
	}

	/**
	 * Get the current master position for chronology control purposes
	 * @return mixed The position, or false if no connection is open at all
	 */
	function getMasterPos() {
		# If this entire request was served from a slave without opening a connection to the
		# master (however unlikely that may be), then we can fetch the position from the slave.
		$masterConn = $this->getAnyOpenConnection( 0 );
		if ( !$masterConn ) {
			$serverCount = count( $this->mServers );
			for ( $i = 1; $i < $serverCount; $i++ ) {
				$conn = $this->getAnyOpenConnection( $i );
				if ( $conn ) {
					wfDebug( "Master pos fetched from slave\n" );

					return $conn->getSlavePos();
				}
			}
		} else {
			wfDebug( "Master pos fetched from master\n" );

			return $masterConn->getMasterPos();
		}

		return false;
	}

	/**
	 * Close all open connections
	 */
	function closeAll() {
		foreach ( $this->mConns as $conns2 ) {
			foreach ( $conns2 as $conns3 ) {
				/** @var DatabaseBase $conn */
				foreach ( $conns3 as $conn ) {
					$conn->close();
				}
			}
		}
		$this->mConns = array(
			'local' => array(),
			'foreignFree' => array(),
			'foreignUsed' => array(),
		);
	}

	/**
	 * Close a connection
	 * Using this function makes sure the LoadBalancer knows the connection is closed.
	 * If you use $conn->close() directly, the load balancer won't update its state.
	 * @param DatabaseBase $conn
	 */
	function closeConnection( $conn ) {
		$done = false;
		foreach ( $this->mConns as $i1 => $conns2 ) {
			foreach ( $conns2 as $i2 => $conns3 ) {
				foreach ( $conns3 as $i3 => $candidateConn ) {
					if ( $conn === $candidateConn ) {
						$conn->close();
						unset( $this->mConns[$i1][$i2][$i3] );
						$done = true;
						// Leaves only the innermost loop; the outer loops keep scanning
						break;
					}
				}
			}
		}
		// Not tracked by this load balancer: close it anyway
		if ( !$done ) {
			$conn->close();
		}
	}

	/**
	 * Commit transactions on all open connections
	 */
	function commitAll() {
		foreach ( $this->mConns as $conns2 ) {
			foreach ( $conns2 as $conns3 ) {
				/** @var DatabaseBase[] $conns3 */
				foreach ( $conns3 as $conn ) {
					if ( $conn->trxLevel() ) {
						$conn->commit( __METHOD__, 'flush' );
					}
				}
			}
		}
	}

	/**
	 * Issue COMMIT only on master, only if queries were done on connection
	 */
	function commitMasterChanges() {
		// Always 0, but who knows.. :)
		$masterIndex = $this->getWriterIndex();
		foreach ( $this->mConns as $conns2 ) {
			if ( empty( $conns2[$masterIndex] ) ) {
				continue;
			}
			/** @var DatabaseBase $conn */
			foreach ( $conns2[$masterIndex] as $conn ) {
				if ( $conn->trxLevel() && $conn->writesOrCallbacksPending() ) {
					$conn->commit( __METHOD__, 'flush' );
				}
			}
		}
	}

	/**
	 * Issue ROLLBACK only on master, only if queries were done on connection
	 * @since 1.23
	 */
	function rollbackMasterChanges() {
		// Always 0, but who knows.. :)
		$masterIndex = $this->getWriterIndex();
		foreach ( $this->mConns as $conns2 ) {
			if ( empty( $conns2[$masterIndex] ) ) {
				continue;
			}
			/** @var DatabaseBase $conn */
			foreach ( $conns2[$masterIndex] as $conn ) {
				if ( $conn->trxLevel() && $conn->writesOrCallbacksPending() ) {
					$conn->rollback( __METHOD__, 'flush' );
				}
			}
		}
	}

	/**
	 * @return bool Whether a master connection is already open
	 * @since 1.24
	 */
	function hasMasterConnection() {
		return $this->isOpen( $this->getWriterIndex() );
	}

	/**
	 * Determine if there are any pending changes that need to be rolled back
	 * or committed.
	 * @since 1.23
	 * @return bool
	 */
	function hasMasterChanges() {
		// Always 0, but who knows.. :)
		$masterIndex = $this->getWriterIndex();
		foreach ( $this->mConns as $conns2 ) {
			if ( empty( $conns2[$masterIndex] ) ) {
				continue;
			}
			/** @var DatabaseBase $conn */
			foreach ( $conns2[$masterIndex] as $conn ) {
				if ( $conn->trxLevel() && $conn->writesOrCallbacksPending() ) {
					return true;
				}
			}
		}
		return false;
	}

	/**
	 * Get or set the number of seconds to wait for slave lag to resolve
	 *
	 * @param mixed $value
	 * @return mixed
	 */
	function waitTimeout( $value = null ) {
		return wfSetVar( $this->mWaitTimeout, $value );
	}

	/**
	 * @return bool
	 */
	function getLaggedSlaveMode() {
		return $this->mLaggedSlaveMode;
	}

	/**
	 * Disables/enables lag checks
	 * @param null|bool $mode New setting, or null to only read the current one
	 * @return bool The setting in effect after the call
	 */
	function allowLagged( $mode = null ) {
		if ( $mode === null ) {
			return $this->mAllowLagged;
		}
		$this->mAllowLagged = $mode;

		return $this->mAllowLagged;
	}

	/**
	 * Ping every open connection
	 *
	 * @return bool True if every ping succeeded
	 */
	function pingAll() {
		$success = true;
		foreach ( $this->mConns as $conns2 ) {
			foreach ( $conns2 as $conns3 ) {
				/** @var DatabaseBase[] $conns3 */
				foreach ( $conns3 as $conn ) {
					if ( !$conn->ping() ) {
						$success = false;
					}
				}
			}
		}

		return $success;
	}

	/**
	 * Call a function with each open connection object
	 * @param callable $callback Called with the connection as first argument,
	 *   followed by $params
	 * @param array $params
	 */
	function forEachOpenConnection( $callback, $params = array() ) {
		foreach ( $this->mConns as $conns2 ) {
			foreach ( $conns2 as $conns3 ) {
				foreach ( $conns3 as $conn ) {
					$mergedParams = array_merge( array( $conn ), $params );
					call_user_func_array( $callback, $mergedParams );
				}
			}
		}
	}

	/**
	 * Get the hostname and lag time of the most-lagged slave.
	 * This is useful for maintenance scripts that need to throttle their updates.
	 * May attempt to open connections to slaves on the default DB. If there is
	 * no lag, the maximum lag will be reported as -1.
	 *
	 * @param bool|string $wiki Wiki ID, or false for the default database
	 * @return array ( host, max lag, index of max lagged host )
	 */
	function getMaxLag( $wiki = false ) {
		$maxLag = -1;
		$host = '';
		$maxIndex = 0;

		if ( $this->getServerCount() <= 1 ) { // no replication = no lag
			return array( $host, $maxLag, $maxIndex );
		}

		// Try to get the max lag info from the server cache
		$key = 'loadbalancer:maxlag:cluster:' . $this->mServers[0]['host'];
		$cache = ObjectCache::newAccelerator( array(), 'hash' );
		$maxLagInfo = $cache->get( $key ); // (host, lag, index)

		// Fallback to connecting to each slave and getting the lag
		if ( !$maxLagInfo ) {
			foreach ( array_keys( $this->mServers ) as $i ) {
				if ( $i == $this->getWriterIndex() ) {
					continue; // nothing to check
				}
				// Prefer a connection that is already open when the default DB is wanted
				$conn = false;
				if ( $wiki === false ) {
					$conn = $this->getAnyOpenConnection( $i );
				}
				if ( !$conn ) {
					$conn = $this->openConnection( $i, $wiki );
				}
				if ( !$conn ) {
					continue;
				}
				$lag = $conn->getLag();
				if ( $lag > $maxLag ) {
					$maxLag = $lag;
					$host = $this->mServers[$i]['host'];
					$maxIndex = $i;
				}
			}
			$maxLagInfo = array( $host, $maxLag, $maxIndex );
			// NOTE(review): the third argument is presumably an expiry in seconds; confirm
			$cache->set( $key, $maxLagInfo, 5 );
		}

		return $maxLagInfo;
	}

	/**
	 * Get lag time for each server
	 *
	 * The result is kept in a process cache until clearLagTimeCache() is called.
	 * With a single server no load monitor is consulted and the lag is reported
	 * as 0. Note that the process cache is not keyed by $wiki.
	 *
	 * @param string|bool $wiki
	 * @return array Map of (server index => lag)
	 */
	function getLagTimes( $wiki = false ) {
		# Try process cache
		if ( isset( $this->mLagTimes ) ) {
			return $this->mLagTimes;
		}
		if ( $this->getServerCount() == 1 ) {
			# No replication
			$this->mLagTimes = array( 0 => 0 );
		} else {
			# Send the request to the load monitor
			$this->mLagTimes = $this->getLoadMonitor()->getLagTimes(
				array_keys( $this->mServers ), $wiki );
		}

		return $this->mLagTimes;
	}

	/**
	 * Get the lag in seconds for a given connection, or zero if this load
	 * balancer does not have replication enabled.
	 *
	 * This should be used in preference to Database::getLag() in cases where
	 * replication may not be in use, since there is no way to determine if
	 * replication is in use at the connection level without running
	 * potentially restricted queries such as SHOW SLAVE STATUS. Using this
	 * function instead of Database::getLag() avoids a fatal error in this
	 * case on many installations.
	 *
	 * @param DatabaseBase $conn
	 * @return int
	 */
	function safeGetLag( $conn ) {
		if ( $this->getServerCount() == 1 ) {
			return 0;
		} else {
			return $conn->getLag();
		}
	}

	/**
	 * Clear the cache for getLagTimes
	 */
	function clearLagTimeCache() {
		$this->mLagTimes = null;
	}
}

/**
 * Helper class to handle automatically marking connectons as reusable (via RAII pattern)
 * as well handling deferring the actual network connection until the handle is used
 *
 * @ingroup Database
 * @since 1.22
 */
class DBConnRef implements IDatabase {
	/** @var LoadBalancer */
	protected $lb;

	/** @var DatabaseBase|null */
	protected $conn;

	/** @var array|null */
	protected $params;

	/**
	 * @param LoadBalancer $lb
	 * @param DatabaseBase|array $conn Connection or (server index, group, wiki ID) array
	 */
	public function __construct( LoadBalancer $lb, $conn ) {
		$this->lb = $lb;
		if ( $conn instanceof DatabaseBase ) {
			$this->conn = $conn;
		} else {
			$this->params = $conn;
		}
	}

	public function __call( $name, $arguments ) {
		if ( $this->conn === null ) {
			list( $db, $groups, $wiki ) = $this->params;
			$this->conn = $this->lb->getConnection( $db, $groups, $wiki );
		}

		return call_user_func_array( array( $this->conn, $name ), $arguments );
	}

	function __destruct() {
		if ( $this->conn !== null ) {
$this->lb->reuseConnection( $this->conn ); 1222 } 1223 } 1224 }
title
Description
Body
title
Description
Body
title
Description
Body
title
Body
Generated: Fri Nov 28 14:03:12 2014 | Cross-referenced by PHPXref 0.7.1 |