MediaWiki
REL1_20
|
<?php

/**
 * Object cache backend that keeps its entries in one or more SQL tables.
 *
 * Each entry is a row with a 'keyname', a serialized (and, when zlib is
 * available, deflated) 'value' blob and an 'exptime' timestamp. Keys can
 * optionally be spread over several identically-structured tables ("shards").
 *
 * Database errors are caught and logged by the public methods, which then
 * report failure through their return value (createTables() is the exception
 * and lets errors propagate).
 */
class SqlBagOStuff extends BagOStuff {
	/** @var LoadBalancer Load balancer created by getDB(), when a dedicated connection is used */
	public $lb;

	/** @var DatabaseBase Connection handle, lazily initialised by getDB() */
	public $db;

	/** @var array|null Server connection info taken from $params['server'], if given */
	public $serverInfo;

	/** @var int UNIX timestamp of the last purge triggered by garbageCollect() */
	public $lastExpireAll = 0;

	/** @var int Purge expired rows on roughly one in this many getMulti() calls; 0 disables */
	public $purgePeriod = 100;

	/** @var int Number of tables the keys are distributed over (always >= 1) */
	public $shards = 1;

	/** @var string Table name, or table name prefix when more than one shard is used */
	public $tableName = 'objectcache';

	/** @var int UNIX timestamp of the most recent connection failure */
	protected $connFailureTime = 0;

	/** @var DBConnectionError|null Exception of the most recent connection failure */
	protected $connFailureError;

	/**
	 * Constructor. Recognised keys of $params:
	 *   - server:      Connection info array for a dedicated server. Its 'load'
	 *                  entry is forced to 1. If omitted, getDB() picks a
	 *                  connection to the main database.
	 *   - purgePeriod: See $purgePeriod. Converted with intval().
	 *   - tableName:   See $tableName.
	 *   - shards:      See $shards. Converted with intval(); values below 1
	 *                  are treated as 1.
	 *
	 * @param $params array
	 */
	public function __construct( $params ) {
		if ( isset( $params['server'] ) ) {
			$this->serverInfo = $params['server'];
			$this->serverInfo['load'] = 1;
		}
		if ( isset( $params['purgePeriod'] ) ) {
			$this->purgePeriod = intval( $params['purgePeriod'] );
		}
		if ( isset( $params['tableName'] ) ) {
			$this->tableName = $params['tableName'];
		}
		if ( isset( $params['shards'] ) ) {
			// A shard count below 1 would make every per-shard loop (keys(),
			// deleteAll(), expiry purging) skip the table that get()/set()
			// still use, so never go below a single shard.
			$this->shards = max( 1, intval( $params['shards'] ) );
		}
	}

	/**
	 * Get the connection used for all cache queries, opening it on first use.
	 *
	 * @return DatabaseBase
	 * @throws DBConnectionError Re-thrown without a new connection attempt
	 *   while a connection failure recorded by handleReadError() or
	 *   handleWriteError() is less than 60 seconds old.
	 */
	protected function getDB() {
		global $wgDebugDBTransactions;

		# Don't keep timing out trying to connect for each call if the DB is down
		if ( $this->connFailureError && ( time() - $this->connFailureTime ) < 60 ) {
			throw $this->connFailureError;
		}

		if ( !isset( $this->db ) ) {
			# If server connection info was given, use that
			if ( $this->serverInfo ) {
				if ( $wgDebugDBTransactions ) {
					wfDebug( "Using provided serverInfo for SqlBagOStuff\n" );
				}
				$this->lb = new LoadBalancer( array(
					'servers' => array( $this->serverInfo ) ) );
				$this->db = $this->lb->getConnection( DB_MASTER );
				$this->db->clearFlag( DBO_TRX );
			} else {
				/*
				 * We must keep a separate connection to MySQL in order to avoid
				 * deadlocks. SQLite, however, behaves the opposite way, and
				 * PostgreSQL needs to know whether we are in a transaction or
				 * not, so those share the main connection.
				 */
				if ( wfGetDB( DB_MASTER )->getType() == 'mysql' ) {
					$this->lb = wfGetLBFactory()->newMainLB();
					$this->db = $this->lb->getConnection( DB_MASTER );
					$this->db->clearFlag( DBO_TRX ); // auto-commit mode
				} else {
					$this->db = wfGetDB( DB_MASTER );
				}
			}
			if ( $wgDebugDBTransactions ) {
				wfDebug( sprintf( "Connection %s will be used for SqlBagOStuff\n", $this->db ) );
			}
		}

		return $this->db;
	}

	/**
	 * Get the name of the table that holds the given key.
	 *
	 * With more than one shard, the shard index is derived from the first
	 * 8 hex digits of the key's MD5 hash, masked to 31 bits.
	 *
	 * @param $key string
	 * @return string
	 */
	protected function getTableByKey( $key ) {
		if ( $this->shards > 1 ) {
			$hash = hexdec( substr( md5( $key ), 0, 8 ) ) & 0x7fffffff;
			return $this->getTableByShard( $hash % $this->shards );
		} else {
			return $this->tableName;
		}
	}

	/**
	 * Get the table name for a shard index.
	 *
	 * With more than one shard, the index is appended to the base table name,
	 * zero-padded to the number of digits of the highest shard index.
	 *
	 * @param $index int
	 * @return string
	 */
	protected function getTableByShard( $index ) {
		if ( $this->shards > 1 ) {
			$decimals = strlen( $this->shards - 1 );
			return $this->tableName .
				sprintf( "%0{$decimals}d", $index );
		} else {
			return $this->tableName;
		}
	}

	/**
	 * Fetch a single value.
	 *
	 * @param $key string
	 * @return mixed The stored value, or false if getMulti() produced no
	 *   entry for the key or reported a miss
	 */
	public function get( $key ) {
		$values = $this->getMulti( array( $key ) );
		return array_key_exists( $key, $values ) ? $values[$key] : false;
	}

	/**
	 * Fetch several values, using one SELECT per table involved.
	 *
	 * Rows found to be expired are deleted and reported as misses.
	 *
	 * @param $keys array List of key names
	 * @return array Map of (key => value); missing or expired keys map to
	 *   false. If a database error interrupts the lookup, only the entries
	 *   resolved up to that point are present.
	 */
	public function getMulti( array $keys ) {
		$values = array(); // array of (key => value)

		try {
			$db = $this->getDB();
			$keysByTableName = array();
			foreach ( $keys as $key ) {
				$tableName = $this->getTableByKey( $key );
				if ( !isset( $keysByTableName[$tableName] ) ) {
					$keysByTableName[$tableName] = array();
				}
				$keysByTableName[$tableName][] = $key;
			}

			$this->garbageCollect(); // expire old entries if any

			$dataRows = array();
			foreach ( $keysByTableName as $tableName => $tableKeys ) {
				$res = $db->select( $tableName,
					array( 'keyname', 'value', 'exptime' ),
					array( 'keyname' => $tableKeys ),
					__METHOD__ );
				foreach ( $res as $row ) {
					$dataRows[$row->keyname] = $row;
				}
			}

			foreach ( $keys as $key ) {
				if ( isset( $dataRows[$key] ) ) { // HIT?
					$row = $dataRows[$key];
					$this->debug( "get: retrieved data; expiry time is " . $row->exptime );
					if ( $this->isExpired( $row->exptime ) ) { // MISS
						$this->debug( "get: key has expired, deleting" );
						try {
							$db->begin( __METHOD__ );
							# Put the expiry time in the WHERE condition to avoid deleting a
							# newly-inserted value
							$db->delete( $this->getTableByKey( $key ),
								array( 'keyname' => $key, 'exptime' => $row->exptime ),
								__METHOD__ );
							$db->commit( __METHOD__ );
						} catch ( DBQueryError $e ) {
							// A failed cleanup must not turn the read into an error
							$this->handleWriteError( $e );
						}
						$values[$key] = false;
					} else { // HIT
						$values[$key] = $this->unserialize( $db->decodeBlob( $row->value ) );
					}
				} else { // MISS
					$values[$key] = false;
					$this->debug( 'get: no matching rows' );
				}
			}
		} catch ( DBError $e ) {
			$this->handleReadError( $e );
		}

		return $values;
	}

	/**
	 * Store a value.
	 *
	 * @param $key string
	 * @param $value mixed
	 * @param $exptime int Expiry: 0 (or negative) for the maximum date, a
	 *   value below 3.16e8 (~10 years) is taken as seconds from now, anything
	 *   larger as an absolute UNIX timestamp
	 * @return bool False if a database error occurred, true otherwise
	 */
	public function set( $key, $value, $exptime = 0 ) {
		try {
			$db = $this->getDB();
			$exptime = intval( $exptime );

			if ( $exptime < 0 ) {
				$exptime = 0;
			}

			if ( $exptime == 0 ) {
				$encExpiry = $this->getMaxDateTime();
			} else {
				if ( $exptime < 3.16e8 ) { # ~10 years
					$exptime += time();
				}

				$encExpiry = $db->timestamp( $exptime );
			}
			$db->begin( __METHOD__ );
			// (bug 24425) use a replace if the db supports it instead of
			// delete/insert to avoid clashes with conflicting keynames
			$db->replace(
				$this->getTableByKey( $key ),
				array( 'keyname' ),
				array(
					'keyname' => $key,
					'value' => $db->encodeBlob( $this->serialize( $value ) ),
					'exptime' => $encExpiry
				), __METHOD__ );
			$db->commit( __METHOD__ );
		} catch ( DBError $e ) {
			$this->handleWriteError( $e );
			return false;
		}

		return true;
	}

	/**
	 * Delete a key.
	 *
	 * @param $key string
	 * @param $time int Not used by this implementation
	 * @return bool False if a database error occurred, true otherwise
	 */
	public function delete( $key, $time = 0 ) {
		try {
			$db = $this->getDB();
			$db->begin( __METHOD__ );
			$db->delete(
				$this->getTableByKey( $key ),
				array( 'keyname' => $key ),
				__METHOD__ );
			$db->commit( __METHOD__ );
		} catch ( DBError $e ) {
			$this->handleWriteError( $e );
			return false;
		}

		return true;
	}

	/**
	 * Add $step to the integer stored under a key, keeping its expiry time.
	 *
	 * The row is selected FOR UPDATE, deleted and re-inserted with the new
	 * value inside one transaction.
	 *
	 * @param $key string
	 * @param $step int
	 * @return int|null The new value, or null if the key is missing or
	 *   expired, if the re-insert affected no rows, or on a database error
	 */
	public function incr( $key, $step = 1 ) {
		try {
			$db = $this->getDB();
			$tableName = $this->getTableByKey( $key );
			$step = intval( $step );
			$db->begin( __METHOD__ );
			$row = $db->selectRow(
				$tableName,
				array( 'value', 'exptime' ),
				array( 'keyname' => $key ),
				__METHOD__,
				array( 'FOR UPDATE' ) );
			if ( $row === false ) {
				// Missing
				$db->commit( __METHOD__ );

				return null;
			}
			$db->delete( $tableName, array( 'keyname' => $key ), __METHOD__ );
			if ( $this->isExpired( $row->exptime ) ) {
				// Expired, do not reinsert
				$db->commit( __METHOD__ );

				return null;
			}

			$oldValue = intval( $this->unserialize( $db->decodeBlob( $row->value ) ) );
			$newValue = $oldValue + $step;
			$db->insert( $tableName,
				array(
					'keyname' => $key,
					'value' => $db->encodeBlob( $this->serialize( $newValue ) ),
					'exptime' => $row->exptime
				), __METHOD__, 'IGNORE' );

			if ( $db->affectedRows() == 0 ) {
				// Race condition. See bug 28611
				$newValue = null;
			}
			$db->commit( __METHOD__ );
		} catch ( DBError $e ) {
			$this->handleWriteError( $e );
			return null;
		}

		return $newValue;
	}

	/**
	 * List the key names stored in all shards.
	 *
	 * @return array Key names collected before any database error occurred
	 */
	public function keys() {
		$result = array();

		try {
			$db = $this->getDB();
			for ( $i = 0; $i < $this->shards; $i++ ) {
				$res = $db->select( $this->getTableByShard( $i ),
					array( 'keyname' ), false, __METHOD__ );
				foreach ( $res as $row ) {
					$result[] = $row->keyname;
				}
			}
		} catch ( DBError $e ) {
			$this->handleReadError( $e );
		}

		return $result;
	}

	/**
	 * Check whether a stored expiry time lies in the past.
	 *
	 * The value returned by getMaxDateTime() is never considered expired.
	 *
	 * @param $exptime string Expiry time as read from the 'exptime' column
	 * @return bool
	 */
	protected function isExpired( $exptime ) {
		return $exptime != $this->getMaxDateTime() && wfTimestamp( TS_UNIX, $exptime ) < time();
	}

	/**
	 * Get the database-formatted timestamp that set() stores for entries
	 * without an expiry: 0x7fffffff, or 1 << 62 once the current time has
	 * passed 0x7fffffff.
	 *
	 * @return string
	 */
	protected function getMaxDateTime() {
		if ( time() > 0x7fffffff ) {
			return $this->getDB()->timestamp( 1 << 62 );
		} else {
			return $this->getDB()->timestamp( 0x7fffffff );
		}
	}

	/**
	 * Occasionally purge expired rows; called on every getMulti().
	 *
	 * Does nothing if $purgePeriod is 0. Otherwise runs expireAll() on about
	 * one in $purgePeriod calls, and not again until more than a second has
	 * passed since the previous purge.
	 */
	protected function garbageCollect() {
		if ( !$this->purgePeriod ) {
			// Disabled
			return;
		}
		// Only purge on one in every $this->purgePeriod requests.
		if ( $this->purgePeriod !== 1 && mt_rand( 0, $this->purgePeriod - 1 ) ) {
			return;
		}
		$now = time();
		// Avoid repeating the delete within a few seconds
		if ( $now > ( $this->lastExpireAll + 1 ) ) {
			$this->lastExpireAll = $now;
			$this->expireAll();
		}
	}

	/**
	 * Delete every entry whose expiry time is before the current time.
	 */
	public function expireAll() {
		$this->deleteObjectsExpiringBefore( wfTimestampNow() );
	}

	/**
	 * Delete entries that expire before the given time, in batches of up to
	 * 100 rows per shard, ordered by expiry time.
	 *
	 * @param $timestamp string|int Timestamp accepted by wfTimestamp()
	 * @param $progressCallback callback|bool If set, called after every batch
	 *   with an estimated completion percentage. The estimate compares the
	 *   expiry time reached so far with the span between the oldest expiry
	 *   seen and $timestamp.
	 * @return bool False if a database error occurred, true otherwise
	 */
	public function deleteObjectsExpiringBefore( $timestamp, $progressCallback = false ) {
		try {
			$db = $this->getDB();
			$dbTimestamp = $db->timestamp( $timestamp );
			$totalSeconds = false;
			$baseConds = array( 'exptime < ' . $db->addQuotes( $dbTimestamp ) );
			for ( $i = 0; $i < $this->shards; $i++ ) {
				$maxExpTime = false;
				while ( true ) {
					$conds = $baseConds;
					if ( $maxExpTime !== false ) {
						// Continue after the newest expiry time of the previous batch
						$conds[] = 'exptime > ' . $db->addQuotes( $maxExpTime );
					}
					$rows = $db->select(
						$this->getTableByShard( $i ),
						array( 'keyname', 'exptime' ),
						$conds,
						__METHOD__,
						array( 'LIMIT' => 100, 'ORDER BY' => 'exptime' ) );
					if ( !$rows->numRows() ) {
						break;
					}
					$keys = array();
					$row = $rows->current();
					$minExpTime = $row->exptime;
					if ( $totalSeconds === false ) {
						$totalSeconds = wfTimestamp( TS_UNIX, $timestamp )
							- wfTimestamp( TS_UNIX, $minExpTime );
					}
					foreach ( $rows as $row ) {
						$keys[] = $row->keyname;
						$maxExpTime = $row->exptime;
					}

					$db->begin( __METHOD__ );
					// The exptime bounds are repeated here so that a key which was
					// rewritten with a later expiry since the SELECT is not deleted
					$db->delete(
						$this->getTableByShard( $i ),
						array(
							'exptime >= ' . $db->addQuotes( $minExpTime ),
							'exptime < ' . $db->addQuotes( $dbTimestamp ),
							'keyname' => $keys
						),
						__METHOD__ );
					$db->commit( __METHOD__ );

					if ( $progressCallback ) {
						if ( intval( $totalSeconds ) === 0 ) {
							$percent = 0;
						} else {
							$remainingSeconds = wfTimestamp( TS_UNIX, $timestamp )
								- wfTimestamp( TS_UNIX, $maxExpTime );
							if ( $remainingSeconds > $totalSeconds ) {
								$totalSeconds = $remainingSeconds;
							}
							// Report the fraction already processed, not the fraction
							// remaining, so the percentage rises as the purge advances
							$processedSeconds = $totalSeconds - $remainingSeconds;
							$percent = ( $i + $processedSeconds / $totalSeconds )
								/ $this->shards * 100;
						}
						call_user_func( $progressCallback, $percent );
					}
				}
			}
		} catch ( DBError $e ) {
			$this->handleWriteError( $e );
			return false;
		}

		return true;
	}

	/**
	 * Delete all rows from every shard.
	 *
	 * @return bool False if a database error occurred, true otherwise
	 */
	public function deleteAll() {
		try {
			$db = $this->getDB();
			for ( $i = 0; $i < $this->shards; $i++ ) {
				$db->begin( __METHOD__ );
				$db->delete( $this->getTableByShard( $i ), '*', __METHOD__ );
				$db->commit( __METHOD__ );
			}
		} catch ( DBError $e ) {
			$this->handleWriteError( $e );
			return false;
		}

		return true;
	}

	/**
	 * Serialize a value for storage, deflating the result when gzdeflate()
	 * is available.
	 *
	 * @param $data mixed Taken by reference; the body does not modify it
	 * @return string
	 */
	protected function serialize( &$data ) {
		$serial = serialize( $data );

		if ( function_exists( 'gzdeflate' ) ) {
			return gzdeflate( $serial );
		} else {
			return $serial;
		}
	}

	/**
	 * Reverse serialize().
	 *
	 * If gzinflate() is available it is tried first; when inflating fails,
	 * the input is unserialized as it is.
	 *
	 * @param $serial string
	 * @return mixed
	 */
	protected function unserialize( $serial ) {
		if ( function_exists( 'gzinflate' ) ) {
			wfSuppressWarnings();
			$decomp = gzinflate( $serial );
			wfRestoreWarnings();

			if ( false !== $decomp ) {
				$serial = $decomp;
			}
		}

		$ret = unserialize( $serial );

		return $ret;
	}

	/**
	 * Log a database error raised by a read operation and carry on.
	 *
	 * Connection errors are remembered so that getDB() can fail fast for the
	 * next 60 seconds.
	 *
	 * @param $exception DBError
	 */
	protected function handleReadError( DBError $exception ) {
		if ( $exception instanceof DBConnectionError ) {
			$this->connFailureTime = time();
			$this->connFailureError = $exception;
		}
		wfDebugLog( 'SQLBagOStuff', "DBError: {$exception->getMessage()}" );
		if ( $this->db ) {
			wfDebug( __METHOD__ . ": ignoring query error\n" );
		} else {
			wfDebug( __METHOD__ . ": ignoring connection error\n" );
		}
	}

	/**
	 * Log a database error raised by a write operation and carry on.
	 *
	 * Connection errors are remembered so that getDB() can fail fast for the
	 * next 60 seconds. If the connection reports a read-only error, a
	 * rollback is attempted; errors from that rollback are ignored.
	 *
	 * @param $exception DBError
	 */
	protected function handleWriteError( DBError $exception ) {
		if ( $exception instanceof DBConnectionError ) {
			$this->connFailureTime = time();
			$this->connFailureError = $exception;
		}
		if ( $this->db && $this->db->wasReadOnlyError() ) {
			try {
				$this->db->rollback( __METHOD__ );
			} catch ( DBError $e ) {
				// Best effort only: nothing more can be done if the rollback fails
			}
		}
		wfDebugLog( 'SQLBagOStuff', "DBError: {$exception->getMessage()}" );
		if ( $this->db ) {
			wfDebug( __METHOD__ . ": ignoring query error\n" );
		} else {
			wfDebug( __METHOD__ . ": ignoring connection error\n" );
		}
	}

	/**
	 * Create the shard tables, each with the structure of the 'objectcache'
	 * table (CREATE TABLE ... LIKE).
	 *
	 * Database errors are not caught here.
	 *
	 * @throws MWException If the server is not MySQL or is older than 4.1.0
	 */
	public function createTables() {
		$db = $this->getDB();
		if ( $db->getType() !== 'mysql'
			|| version_compare( $db->getServerVersion(), '4.1.0', '<' ) )
		{
			throw new MWException( __METHOD__ . ' is not supported on this DB server' );
		}

		for ( $i = 0; $i < $this->shards; $i++ ) {
			$db->begin( __METHOD__ );
			$db->query(
				'CREATE TABLE ' . $db->tableName( $this->getTableByShard( $i ) ) .
				' LIKE ' . $db->tableName( 'objectcache' ),
				__METHOD__ );
			$db->commit( __METHOD__ );
		}
	}
}

/**
 * Empty subclass of SqlBagOStuff; it adds no behaviour of its own.
 * NOTE(review): presumably kept as a backwards-compatibility alias - confirm.
 */
class MediaWikiBagOStuff extends SqlBagOStuff { }