MediaWiki
REL1_23
|
<?php
/**
 * Object caching using a SQL database.
 *
 * @file
 */

/**
 * BagOStuff implementation that stores cache entries as rows of a SQL table
 * (columns: keyname, value, exptime).
 *
 * Entries can be spread over several servers (picked per key with
 * ArrayUtils::consistentHashSort()) and over several table shards per server
 * (picked per key from an MD5-based hash).
 */
class SqlBagOStuff extends BagOStuff {
	/** @var mixed Load balancer created by getDB() for the dedicated MySQL connection */
	public $lb;

	/** @var array|bool Connection parameter arrays, one per server; false to use the main DB */
	public $serverInfos;

	/** @var array Server labels (host name or "#<index>") used for consistent hashing */
	public $serverNames;

	/** @var int Number of servers the cache is spread over */
	public $numServers;

	/** @var array Open connection handles, indexed by server index */
	public $conns;

	/** @var int UNIX timestamp of the last expireAll() run triggered by garbageCollect() */
	public $lastExpireAll = 0;

	/** @var int Purge expired rows on roughly one in this many reads; 0 disables purging */
	public $purgePeriod = 100;

	/** @var int Number of table shards per server */
	public $shards = 1;

	/** @var string Table name, or table name prefix when more than one shard is used */
	public $tableName = 'objectcache';

	/** @var array UNIX timestamps of the last connection failure, indexed by server index */
	protected $connFailureTimes = array(); // UNIX timestamps

	/** @var array Exceptions of the last connection failure, indexed by server index */
	protected $connFailureErrors = array(); // exceptions

	/**
	 * Constructor. Recognised keys of $params:
	 *   - servers:     Array of connection parameter arrays, one per server. Each is
	 *                  passed to DatabaseBase::factory(); 'type' defaults to 'mysql'.
	 *   - server:      A single connection parameter array (used if 'servers' is unset).
	 *                  If neither is given, the wiki's main database is used.
	 *   - purgePeriod: See $purgePeriod.
	 *   - tableName:   See $tableName.
	 *   - shards:      See $shards.
	 *
	 * @param array $params
	 */
	public function __construct( $params ) {
		if ( isset( $params['servers'] ) ) {
			$this->serverInfos = $params['servers'];
			$this->numServers = count( $this->serverInfos );
			$this->serverNames = array();
			foreach ( $this->serverInfos as $i => $info ) {
				$this->serverNames[$i] = isset( $info['host'] ) ? $info['host'] : "#$i";
			}
		} elseif ( isset( $params['server'] ) ) {
			$this->serverInfos = array( $params['server'] );
			$this->numServers = count( $this->serverInfos );
		} else {
			$this->serverInfos = false;
			$this->numServers = 1;
		}
		if ( isset( $params['purgePeriod'] ) ) {
			$this->purgePeriod = intval( $params['purgePeriod'] );
		}
		if ( isset( $params['tableName'] ) ) {
			$this->tableName = $params['tableName'];
		}
		if ( isset( $params['shards'] ) ) {
			$this->shards = intval( $params['shards'] );
		}
	}

	/**
	 * Get a connection to the specified server, opening and caching it on first use.
	 *
	 * @param int $serverIndex
	 * @return DatabaseBase
	 * @throws MWException If $serverIndex is out of range
	 * @throws DBError The remembered connection error, if this server failed
	 *   less than 60 seconds ago
	 */
	protected function getDB( $serverIndex ) {
		global $wgDebugDBTransactions;

		if ( !isset( $this->conns[$serverIndex] ) ) {
			if ( $serverIndex >= $this->numServers ) {
				throw new MWException( __METHOD__ . ": Invalid server index \"$serverIndex\"" );
			}

			# Don't keep timing out trying to connect for each call if the DB is down
			if ( isset( $this->connFailureErrors[$serverIndex] )
				&& ( time() - $this->connFailureTimes[$serverIndex] ) < 60
			) {
				throw $this->connFailureErrors[$serverIndex];
			}

			# If server connection info was given, use that
			if ( $this->serverInfos ) {
				if ( $wgDebugDBTransactions ) {
					wfDebug( "Using provided serverInfo for SqlBagOStuff\n" );
				}
				$info = $this->serverInfos[$serverIndex];
				$type = isset( $info['type'] ) ? $info['type'] : 'mysql';
				$host = isset( $info['host'] ) ? $info['host'] : '[unknown]';
				wfDebug( __CLASS__ . ": connecting to $host\n" );
				$db = DatabaseBase::factory( $type, $info );
				$db->clearFlag( DBO_TRX );
			} else {
				/*
				 * We must keep a separate connection to MySQL in order to avoid
				 * deadlocks. SQLite, however, has the opposite behavior, and
				 * PostgreSQL needs to know whether we are in a transaction or not,
				 * so those share the main connection.
				 */
				if ( wfGetDB( DB_MASTER )->getType() == 'mysql' ) {
					$this->lb = wfGetLBFactory()->newMainLB();
					$db = $this->lb->getConnection( DB_MASTER );
					$db->clearFlag( DBO_TRX ); // auto-commit mode
				} else {
					$db = wfGetDB( DB_MASTER );
				}
			}
			if ( $wgDebugDBTransactions ) {
				wfDebug( sprintf( "Connection %s will be used for SqlBagOStuff\n", $db ) );
			}
			$this->conns[$serverIndex] = $db;
		}

		return $this->conns[$serverIndex];
	}

	/**
	 * Get the server index and table name for a given key.
	 *
	 * @param string $key
	 * @return array Two-element array: ( server index, table name )
	 */
	protected function getTableByKey( $key ) {
		if ( $this->shards > 1 ) {
			// First 32 bits of the MD5, masked to a non-negative 31-bit integer
			$hash = hexdec( substr( md5( $key ), 0, 8 ) ) & 0x7fffffff;
			$tableIndex = $hash % $this->shards;
		} else {
			$tableIndex = 0;
		}
		if ( $this->numServers > 1 ) {
			$sortedServers = $this->serverNames;
			ArrayUtils::consistentHashSort( $sortedServers, $key );
			// The array key of the first server after sorting is the server index
			reset( $sortedServers );
			$serverIndex = key( $sortedServers );
		} else {
			$serverIndex = 0;
		}
		return array( $serverIndex, $this->getTableNameByShard( $tableIndex ) );
	}

	/**
	 * Get the table name for a given shard index.
	 *
	 * With more than one shard, the zero-padded index is appended to the base
	 * table name; the padding width is the number of digits of (shards - 1).
	 *
	 * @param int $index
	 * @return string
	 */
	protected function getTableNameByShard( $index ) {
		if ( $this->shards > 1 ) {
			$decimals = strlen( $this->shards - 1 );
			return $this->tableName .
				sprintf( "%0{$decimals}d", $index );
		} else {
			return $this->tableName;
		}
	}

	/**
	 * Fetch a single value.
	 *
	 * @param string $key
	 * @param mixed $casToken [optional] Set to the fetched value when getMulti()
	 *   returned an entry for the key
	 * @return mixed The stored value, or false if it was not found
	 */
	public function get( $key, &$casToken = null ) {
		$values = $this->getMulti( array( $key ) );
		if ( array_key_exists( $key, $values ) ) {
			$casToken = $values[$key];
			return $values[$key];
		}
		return false;
	}

	/**
	 * Fetch several values at once. Expired rows that are encountered are deleted.
	 *
	 * @param array $keys
	 * @return array Map of (key => value); the value is false for keys that are
	 *   missing, expired, or could not be processed due to a database error
	 */
	public function getMulti( array $keys ) {
		$values = array(); // array of (key => value)

		$keysByTable = array();
		foreach ( $keys as $key ) {
			list( $serverIndex, $tableName ) = $this->getTableByKey( $key );
			$keysByTable[$serverIndex][$tableName][] = $key;
		}

		$this->garbageCollect(); // expire old entries if any

		$dataRows = array();
		foreach ( $keysByTable as $serverIndex => $serverKeys ) {
			try {
				$db = $this->getDB( $serverIndex );
				foreach ( $serverKeys as $tableName => $tableKeys ) {
					$res = $db->select( $tableName,
						array( 'keyname', 'value', 'exptime' ),
						array( 'keyname' => $tableKeys ),
						__METHOD__ );
					if ( $res === false ) {
						continue;
					}
					foreach ( $res as $row ) {
						// Remember where the row came from for the expiry handling below
						$row->serverIndex = $serverIndex;
						$row->tableName = $tableName;
						$dataRows[$row->keyname] = $row;
					}
				}
			} catch ( DBError $e ) {
				$this->handleReadError( $e, $serverIndex );
			}
		}

		foreach ( $keys as $key ) {
			if ( isset( $dataRows[$key] ) ) { // HIT?
				$row = $dataRows[$key];
				$this->debug( "get: retrieved data; expiry time is " . $row->exptime );
				try {
					$db = $this->getDB( $row->serverIndex );
					if ( $this->isExpired( $db, $row->exptime ) ) { // MISS
						$this->debug( "get: key has expired, deleting" );
						$db->commit( __METHOD__, 'flush' );
						# Put the expiry time in the WHERE condition to avoid deleting a
						# newly-inserted value
						$db->delete( $row->tableName,
							array( 'keyname' => $key, 'exptime' => $row->exptime ),
							__METHOD__ );
						$db->commit( __METHOD__, 'flush' );
						$values[$key] = false;
					} else { // HIT
						$values[$key] = $this->unserialize( $db->decodeBlob( $row->value ) );
					}
				} catch ( DBError $e ) {
					// Catch every DBError (like the other write paths do) and report
					// a miss, so that every requested key is present in the result
					$this->handleWriteError( $e, $row->serverIndex );
					$values[$key] = false;
				}
			} else { // MISS
				$values[$key] = false;
				$this->debug( 'get: no matching rows' );
			}
		}

		return $values;
	}

	/**
	 * Store a value, replacing any existing row for the key.
	 *
	 * @param string $key
	 * @param mixed $value
	 * @param int $exptime 0 (or negative) for no expiry; values below 3.16e8
	 *   are treated as seconds from now, larger values as absolute UNIX timestamps
	 * @return bool False if a database error occurred, true otherwise
	 */
	public function set( $key, $value, $exptime = 0 ) {
		list( $serverIndex, $tableName ) = $this->getTableByKey( $key );
		try {
			$db = $this->getDB( $serverIndex );
			$exptime = intval( $exptime );

			if ( $exptime < 0 ) {
				$exptime = 0;
			}

			if ( $exptime == 0 ) {
				$encExpiry = $this->getMaxDateTime( $db );
			} else {
				if ( $exptime < 3.16e8 ) { # ~10 years
					$exptime += time();
				}

				$encExpiry = $db->timestamp( $exptime );
			}
			$db->commit( __METHOD__, 'flush' );
			// (bug 24425) use a replace if the db supports it instead of
			// delete/insert to avoid clashes with conflicting keynames
			$db->replace(
				$tableName,
				array( 'keyname' ),
				array(
					'keyname' => $key,
					'value' => $db->encodeBlob( $this->serialize( $value ) ),
					'exptime' => $encExpiry
				), __METHOD__ );
			$db->commit( __METHOD__, 'flush' );
		} catch ( DBError $e ) {
			$this->handleWriteError( $e, $serverIndex );
			return false;
		}

		return true;
	}

	/**
	 * Check-and-set: store $value only if the row for $key still holds $casToken.
	 *
	 * @param mixed $casToken The value previously fetched for the key
	 * @param string $key
	 * @param mixed $value
	 * @param int $exptime Same semantics as in set()
	 * @return bool True if a row was updated; false if nothing matched or a
	 *   database error occurred
	 */
	public function cas( $casToken, $key, $value, $exptime = 0 ) {
		list( $serverIndex, $tableName ) = $this->getTableByKey( $key );
		try {
			$db = $this->getDB( $serverIndex );
			$exptime = intval( $exptime );

			if ( $exptime < 0 ) {
				$exptime = 0;
			}

			if ( $exptime == 0 ) {
				$encExpiry = $this->getMaxDateTime( $db );
			} else {
				if ( $exptime < 3.16e8 ) { # ~10 years
					$exptime += time();
				}
				$encExpiry = $db->timestamp( $exptime );
			}
			$db->commit( __METHOD__, 'flush' );
			// Conditional update: the WHERE clause only matches a row whose stored
			// blob still equals the serialized CAS token
			$db->update(
				$tableName,
				array(
					'keyname' => $key,
					'value' => $db->encodeBlob( $this->serialize( $value ) ),
					'exptime' => $encExpiry
				),
				array(
					'keyname' => $key,
					'value' => $db->encodeBlob( $this->serialize( $casToken ) )
				),
				__METHOD__
			);
			// Read the row count right after the UPDATE: commit() may run another
			// statement on this connection, after which the count would no longer
			// describe the update
			$success = (bool)$db->affectedRows();
			$db->commit( __METHOD__, 'flush' );
		} catch ( DBError $e ) {
			// DBError rather than DBQueryError: getDB() can throw connection
			// errors too, and set()/delete()/incr() handle those the same way
			$this->handleWriteError( $e, $serverIndex );

			return false;
		}

		return $success;
	}

	/**
	 * Delete the row for a key.
	 *
	 * @param string $key
	 * @param int $time Not used by this implementation
	 * @return bool False if a database error occurred, true otherwise
	 */
	public function delete( $key, $time = 0 ) {
		list( $serverIndex, $tableName ) = $this->getTableByKey( $key );
		try {
			$db = $this->getDB( $serverIndex );
			$db->commit( __METHOD__, 'flush' );
			$db->delete(
				$tableName,
				array( 'keyname' => $key ),
				__METHOD__ );
			$db->commit( __METHOD__, 'flush' );
		} catch ( DBError $e ) {
			$this->handleWriteError( $e, $serverIndex );
			return false;
		}

		return true;
	}

	/**
	 * Increase the stored integer value of a key by $step.
	 *
	 * The row is selected FOR UPDATE, deleted, and re-inserted with the new
	 * value and the original expiry time.
	 *
	 * @param string $key
	 * @param int $step
	 * @return int|null The new value, or null if the key is missing or expired,
	 *   the re-insert affected no rows, or a database error occurred
	 */
	public function incr( $key, $step = 1 ) {
		list( $serverIndex, $tableName ) = $this->getTableByKey( $key );
		try {
			$db = $this->getDB( $serverIndex );
			$step = intval( $step );
			$db->commit( __METHOD__, 'flush' );
			$row = $db->selectRow(
				$tableName,
				array( 'value', 'exptime' ),
				array( 'keyname' => $key ),
				__METHOD__,
				array( 'FOR UPDATE' ) );
			if ( $row === false ) {
				// Missing
				$db->commit( __METHOD__, 'flush' );

				return null;
			}
			$db->delete( $tableName, array( 'keyname' => $key ), __METHOD__ );
			if ( $this->isExpired( $db, $row->exptime ) ) {
				// Expired, do not reinsert
				$db->commit( __METHOD__, 'flush' );

				return null;
			}

			$oldValue = intval( $this->unserialize( $db->decodeBlob( $row->value ) ) );
			$newValue = $oldValue + $step;
			$db->insert( $tableName,
				array(
					'keyname' => $key,
					'value' => $db->encodeBlob( $this->serialize( $newValue ) ),
					'exptime' => $row->exptime
				), __METHOD__, 'IGNORE' );

			if ( $db->affectedRows() == 0 ) {
				// Race condition. See bug 28611
				$newValue = null;
			}
			$db->commit( __METHOD__, 'flush' );
		} catch ( DBError $e ) {
			$this->handleWriteError( $e, $serverIndex );
			return null;
		}

		return $newValue;
	}

	/**
	 * Check whether an expiry timestamp lies in the past.
	 *
	 * The "maximum" timestamp from getMaxDateTime() never counts as expired.
	 *
	 * @param DatabaseBase $db
	 * @param string $exptime Expiry time as stored in the exptime column
	 * @return bool
	 */
	protected function isExpired( $db, $exptime ) {
		return $exptime != $this->getMaxDateTime( $db ) && wfTimestamp( TS_UNIX, $exptime ) < time();
	}

	/**
	 * Get the database timestamp used for entries that never expire.
	 *
	 * @param DatabaseBase $db
	 * @return string
	 */
	protected function getMaxDateTime( $db ) {
		if ( time() > 0x7fffffff ) {
			// Past the 32-bit UNIX time limit; use a far larger sentinel instead
			return $db->timestamp( 1 << 62 );
		} else {
			return $db->timestamp( 0x7fffffff );
		}
	}

	/**
	 * Occasionally purge expired rows: on average once per $purgePeriod calls,
	 * and not again within the second following a purge by this instance.
	 */
	protected function garbageCollect() {
		if ( !$this->purgePeriod ) {
			// Disabled
			return;
		}
		// Only purge on one in every $this->purgePeriod requests.
		if ( $this->purgePeriod !== 1 && mt_rand( 0, $this->purgePeriod - 1 ) ) {
			return;
		}
		$now = time();
		// Avoid repeating the delete within a few seconds
		if ( $now > ( $this->lastExpireAll + 1 ) ) {
			$this->lastExpireAll = $now;
			$this->expireAll();
		}
	}

	/**
	 * Delete all rows whose expiry time is before the current time.
	 */
	public function expireAll() {
		$this->deleteObjectsExpiringBefore( wfTimestampNow() );
	}

	/**
	 * Delete rows with an expiry time before $timestamp, on every server and
	 * shard, in batches of up to 100 rows ordered by expiry time.
	 *
	 * @param string $timestamp Timestamp in a format accepted by wfTimestamp()
	 * @param bool|callable $progressCallback If set, called after each batch
	 *   with the estimated completion percentage (0 to 100)
	 * @return bool False if a database error occurred (processing stops at that
	 *   server), true otherwise
	 */
	public function deleteObjectsExpiringBefore( $timestamp, $progressCallback = false ) {
		for ( $serverIndex = 0; $serverIndex < $this->numServers; $serverIndex++ ) {
			try {
				$db = $this->getDB( $serverIndex );
				$dbTimestamp = $db->timestamp( $timestamp );
				$totalSeconds = false;
				$baseConds = array( 'exptime < ' . $db->addQuotes( $dbTimestamp ) );
				for ( $i = 0; $i < $this->shards; $i++ ) {
					$maxExpTime = false;
					while ( true ) {
						$conds = $baseConds;
						if ( $maxExpTime !== false ) {
							// Continue after the newest expiry time of the previous batch
							$conds[] = 'exptime > ' . $db->addQuotes( $maxExpTime );
						}
						$rows = $db->select(
							$this->getTableNameByShard( $i ),
							array( 'keyname', 'exptime' ),
							$conds,
							__METHOD__,
							array( 'LIMIT' => 100, 'ORDER BY' => 'exptime' ) );
						if ( $rows === false || !$rows->numRows() ) {
							break;
						}
						$keys = array();
						$row = $rows->current();
						$minExpTime = $row->exptime;
						if ( $totalSeconds === false ) {
							// Time span between the oldest expiry seen and the cut-off;
							// used as the denominator of the progress estimate
							$totalSeconds = wfTimestamp( TS_UNIX, $timestamp )
								- wfTimestamp( TS_UNIX, $minExpTime );
						}
						foreach ( $rows as $row ) {
							$keys[] = $row->keyname;
							$maxExpTime = $row->exptime;
						}

						$db->commit( __METHOD__, 'flush' );
						$db->delete(
							$this->getTableNameByShard( $i ),
							array(
								'exptime >= ' . $db->addQuotes( $minExpTime ),
								'exptime < ' . $db->addQuotes( $dbTimestamp ),
								'keyname' => $keys
							),
							__METHOD__ );
						$db->commit( __METHOD__, 'flush' );

						if ( $progressCallback ) {
							if ( intval( $totalSeconds ) === 0 ) {
								$percent = 0;
							} else {
								$remainingSeconds = wfTimestamp( TS_UNIX, $timestamp )
									- wfTimestamp( TS_UNIX, $maxExpTime );
								if ( $remainingSeconds > $totalSeconds ) {
									$totalSeconds = $remainingSeconds;
								}
								// Progress is the part of the time span already
								// covered, not the part that is still left
								$processedSeconds = $totalSeconds - $remainingSeconds;
								$percent = ( $i + $processedSeconds / $totalSeconds )
									/ $this->shards * 100;
							}
							// Scale the per-server percentage into the overall range
							$percent = ( $percent / $this->numServers )
								+ ( $serverIndex / $this->numServers * 100 );
							call_user_func( $progressCallback, $percent );
						}
					}
				}
			} catch ( DBError $e ) {
				$this->handleWriteError( $e, $serverIndex );
				return false;
			}
		}
		return true;
	}

	/**
	 * Delete every row from every shard table on every server.
	 *
	 * @return bool False if a database error occurred (processing stops at that
	 *   server), true otherwise
	 */
	public function deleteAll() {
		for ( $serverIndex = 0; $serverIndex < $this->numServers; $serverIndex++ ) {
			try {
				$db = $this->getDB( $serverIndex );
				for ( $i = 0; $i < $this->shards; $i++ ) {
					$db->commit( __METHOD__, 'flush' );
					$db->delete( $this->getTableNameByShard( $i ), '*', __METHOD__ );
					$db->commit( __METHOD__, 'flush' );
				}
			} catch ( DBError $e ) {
				$this->handleWriteError( $e, $serverIndex );
				return false;
			}
		}
		return true;
	}

	/**
	 * Serialize a value for storage, compressing it with gzdeflate() when that
	 * function is available.
	 *
	 * @param mixed $data Taken by reference; not modified here
	 * @return string
	 */
	protected function serialize( &$data ) {
		$serial = serialize( $data );

		if ( function_exists( 'gzdeflate' ) ) {
			return gzdeflate( $serial );
		} else {
			return $serial;
		}
	}

	/**
	 * Unserialize a stored value, inflating it first if it is compressed.
	 *
	 * NOTE(review): this calls PHP's unserialize() on the stored blob, so the
	 * cache table contents must be trusted.
	 *
	 * @param string $serial
	 * @return mixed
	 */
	protected function unserialize( $serial ) {
		if ( function_exists( 'gzinflate' ) ) {
			// gzinflate() warns on data that is not compressed; fall back to
			// treating the input as a plain serialized string in that case
			wfSuppressWarnings();
			$decomp = gzinflate( $serial );
			wfRestoreWarnings();

			if ( false !== $decomp ) {
				$serial = $decomp;
			}
		}

		$ret = unserialize( $serial );

		return $ret;
	}

	/**
	 * Handle a DBError which occurred during a read operation: mark the server
	 * as down on connection errors, log the error and record it as the last error.
	 *
	 * @param DBError $exception
	 * @param int $serverIndex
	 */
	protected function handleReadError( DBError $exception, $serverIndex ) {
		if ( $exception instanceof DBConnectionError ) {
			$this->markServerDown( $exception, $serverIndex );
		}
		wfDebugLog( 'SQLBagOStuff', "DBError: {$exception->getMessage()}" );
		if ( $exception instanceof DBConnectionError ) {
			$this->setLastError( BagOStuff::ERR_UNREACHABLE );
			wfDebug( __METHOD__ . ": ignoring connection error\n" );
		} else {
			$this->setLastError( BagOStuff::ERR_UNEXPECTED );
			wfDebug( __METHOD__ . ": ignoring query error\n" );
		}
	}

	/**
	 * Handle a DBError which occurred during a write operation: as for read
	 * errors, plus a rollback attempt if the database reported a read-only error.
	 *
	 * @param DBError $exception
	 * @param int $serverIndex
	 */
	protected function handleWriteError( DBError $exception, $serverIndex ) {
		if ( $exception instanceof DBConnectionError ) {
			$this->markServerDown( $exception, $serverIndex );
		}
		if ( $exception->db && $exception->db->wasReadOnlyError() ) {
			try {
				$exception->db->rollback( __METHOD__ );
			} catch ( DBError $e ) {
				// Best effort only: a failing rollback is deliberately ignored
			}
		}
		wfDebugLog( 'SQLBagOStuff', "DBError: {$exception->getMessage()}" );
		if ( $exception instanceof DBConnectionError ) {
			$this->setLastError( BagOStuff::ERR_UNREACHABLE );
			wfDebug( __METHOD__ . ": ignoring connection error\n" );
		} else {
			$this->setLastError( BagOStuff::ERR_UNEXPECTED );
			wfDebug( __METHOD__ . ": ignoring query error\n" );
		}
	}

	/**
	 * Mark a server as down for 60 seconds, remembering the exception so that
	 * getDB() can re-throw it instead of trying to reconnect.
	 *
	 * @param Exception $exception
	 * @param int $serverIndex
	 */
	protected function markServerDown( $exception, $serverIndex ) {
		if ( isset( $this->connFailureTimes[$serverIndex] ) ) {
			if ( time() - $this->connFailureTimes[$serverIndex] >= 60 ) {
				// The previous down period is over; start a new one below
				unset( $this->connFailureTimes[$serverIndex] );
				unset( $this->connFailureErrors[$serverIndex] );
			} else {
				wfDebug( __METHOD__ . ": Server #$serverIndex already down\n" );
				return;
			}
		}
		$now = time();
		wfDebug( __METHOD__ . ": Server #$serverIndex down until " . ( $now + 60 ) . "\n" );
		$this->connFailureTimes[$serverIndex] = $now;
		$this->connFailureErrors[$serverIndex] = $exception;
	}

	/**
	 * Create the shard tables on all servers, each as a copy of the structure
	 * of the 'objectcache' table (CREATE TABLE ... LIKE). MySQL only.
	 *
	 * @throws MWException If a server is not of type 'mysql'
	 */
	public function createTables() {
		for ( $serverIndex = 0; $serverIndex < $this->numServers; $serverIndex++ ) {
			$db = $this->getDB( $serverIndex );
			if ( $db->getType() !== 'mysql' ) {
				throw new MWException( __METHOD__ . ' is not supported on this DB server' );
			}

			for ( $i = 0; $i < $this->shards; $i++ ) {
				$db->commit( __METHOD__, 'flush' );
				$db->query(
					'CREATE TABLE ' . $db->tableName( $this->getTableNameByShard( $i ) ) .
					' LIKE ' . $db->tableName( 'objectcache' ),
					__METHOD__ );
				$db->commit( __METHOD__, 'flush' );
			}
		}
	}
}

/**
 * Empty subclass of SqlBagOStuff; it adds no behavior of its own.
 * NOTE(review): presumably kept as a backwards-compatibility alias — confirm.
 */
class MediaWikiBagOStuff extends SqlBagOStuff { }