MediaWiki  REL1_24
SqlBagOStuff.php
Go to the documentation of this file.
00001 <?php
00029 class SqlBagOStuff extends BagOStuff {
00031     protected $lb;
00032 
00033     protected $serverInfos;
00034 
00036     protected $serverNames;
00037 
00039     protected $numServers;
00040 
00042     protected $conns;
00043 
00045     protected $lastExpireAll = 0;
00046 
00048     protected $purgePeriod = 100;
00049 
00051     protected $shards = 1;
00052 
00054     protected $tableName = 'objectcache';
00055 
00057     protected $connFailureTimes = array();
00058 
00060     protected $connFailureErrors = array();
00061 
00090     public function __construct( $params ) {
00091         if ( isset( $params['servers'] ) ) {
00092             $this->serverInfos = $params['servers'];
00093             $this->numServers = count( $this->serverInfos );
00094             $this->serverNames = array();
00095             foreach ( $this->serverInfos as $i => $info ) {
00096                 $this->serverNames[$i] = isset( $info['host'] ) ? $info['host'] : "#$i";
00097             }
00098         } elseif ( isset( $params['server'] ) ) {
00099             $this->serverInfos = array( $params['server'] );
00100             $this->numServers = count( $this->serverInfos );
00101         } else {
00102             $this->serverInfos = false;
00103             $this->numServers = 1;
00104         }
00105         if ( isset( $params['purgePeriod'] ) ) {
00106             $this->purgePeriod = intval( $params['purgePeriod'] );
00107         }
00108         if ( isset( $params['tableName'] ) ) {
00109             $this->tableName = $params['tableName'];
00110         }
00111         if ( isset( $params['shards'] ) ) {
00112             $this->shards = intval( $params['shards'] );
00113         }
00114     }
00115 
00122     protected function getDB( $serverIndex ) {
00123         global $wgDebugDBTransactions;
00124 
00125         if ( !isset( $this->conns[$serverIndex] ) ) {
00126             if ( $serverIndex >= $this->numServers ) {
00127                 throw new MWException( __METHOD__ . ": Invalid server index \"$serverIndex\"" );
00128             }
00129 
00130             # Don't keep timing out trying to connect for each call if the DB is down
00131             if ( isset( $this->connFailureErrors[$serverIndex] )
00132                 && ( time() - $this->connFailureTimes[$serverIndex] ) < 60
00133             ) {
00134                 throw $this->connFailureErrors[$serverIndex];
00135             }
00136 
00137             # If server connection info was given, use that
00138             if ( $this->serverInfos ) {
00139                 if ( $wgDebugDBTransactions ) {
00140                     wfDebug( "Using provided serverInfo for SqlBagOStuff\n" );
00141                 }
00142                 $info = $this->serverInfos[$serverIndex];
00143                 $type = isset( $info['type'] ) ? $info['type'] : 'mysql';
00144                 $host = isset( $info['host'] ) ? $info['host'] : '[unknown]';
00145                 wfDebug( __CLASS__ . ": connecting to $host\n" );
00146                 $db = DatabaseBase::factory( $type, $info );
00147                 $db->clearFlag( DBO_TRX );
00148             } else {
00149                 /*
00150                  * We must keep a separate connection to MySQL in order to avoid deadlocks
00151                  * However, SQLite has an opposite behavior. And PostgreSQL needs to know
00152                  * if we are in transaction or no
00153                  */
00154                 if ( wfGetDB( DB_MASTER )->getType() == 'mysql' ) {
00155                     $this->lb = wfGetLBFactory()->newMainLB();
00156                     $db = $this->lb->getConnection( DB_MASTER );
00157                     $db->clearFlag( DBO_TRX ); // auto-commit mode
00158                 } else {
00159                     $db = wfGetDB( DB_MASTER );
00160                 }
00161             }
00162             if ( $wgDebugDBTransactions ) {
00163                 wfDebug( sprintf( "Connection %s will be used for SqlBagOStuff\n", $db ) );
00164             }
00165             $this->conns[$serverIndex] = $db;
00166         }
00167 
00168         return $this->conns[$serverIndex];
00169     }
00170 
00176     protected function getTableByKey( $key ) {
00177         if ( $this->shards > 1 ) {
00178             $hash = hexdec( substr( md5( $key ), 0, 8 ) ) & 0x7fffffff;
00179             $tableIndex = $hash % $this->shards;
00180         } else {
00181             $tableIndex = 0;
00182         }
00183         if ( $this->numServers > 1 ) {
00184             $sortedServers = $this->serverNames;
00185             ArrayUtils::consistentHashSort( $sortedServers, $key );
00186             reset( $sortedServers );
00187             $serverIndex = key( $sortedServers );
00188         } else {
00189             $serverIndex = 0;
00190         }
00191         return array( $serverIndex, $this->getTableNameByShard( $tableIndex ) );
00192     }
00193 
00199     protected function getTableNameByShard( $index ) {
00200         if ( $this->shards > 1 ) {
00201             $decimals = strlen( $this->shards - 1 );
00202             return $this->tableName .
00203                 sprintf( "%0{$decimals}d", $index );
00204         } else {
00205             return $this->tableName;
00206         }
00207     }
00208 
00214     public function get( $key, &$casToken = null ) {
00215         $values = $this->getMulti( array( $key ) );
00216         if ( array_key_exists( $key, $values ) ) {
00217             $casToken = $values[$key];
00218             return $values[$key];
00219         }
00220         return false;
00221     }
00222 
00227     public function getMulti( array $keys ) {
00228         $values = array(); // array of (key => value)
00229 
00230         $keysByTable = array();
00231         foreach ( $keys as $key ) {
00232             list( $serverIndex, $tableName ) = $this->getTableByKey( $key );
00233             $keysByTable[$serverIndex][$tableName][] = $key;
00234         }
00235 
00236         $this->garbageCollect(); // expire old entries if any
00237 
00238         $dataRows = array();
00239         foreach ( $keysByTable as $serverIndex => $serverKeys ) {
00240             try {
00241                 $db = $this->getDB( $serverIndex );
00242                 foreach ( $serverKeys as $tableName => $tableKeys ) {
00243                     $res = $db->select( $tableName,
00244                         array( 'keyname', 'value', 'exptime' ),
00245                         array( 'keyname' => $tableKeys ),
00246                         __METHOD__,
00247                         // Approximate write-on-the-fly BagOStuff API via blocking.
00248                         // This approximation fails if a ROLLBACK happens (which is rare).
00249                         // We do not want to flush the TRX as that can break callers.
00250                         $db->trxLevel() ? array( 'LOCK IN SHARE MODE' ) : array()
00251                     );
00252                     if ( $res === false ) {
00253                         continue;
00254                     }
00255                     foreach ( $res as $row ) {
00256                         $row->serverIndex = $serverIndex;
00257                         $row->tableName = $tableName;
00258                         $dataRows[$row->keyname] = $row;
00259                     }
00260                 }
00261             } catch ( DBError $e ) {
00262                 $this->handleReadError( $e, $serverIndex );
00263             }
00264         }
00265 
00266         foreach ( $keys as $key ) {
00267             if ( isset( $dataRows[$key] ) ) { // HIT?
00268                 $row = $dataRows[$key];
00269                 $this->debug( "get: retrieved data; expiry time is " . $row->exptime );
00270                 try {
00271                     $db = $this->getDB( $row->serverIndex );
00272                     if ( $this->isExpired( $db, $row->exptime ) ) { // MISS
00273                         $this->debug( "get: key has expired, deleting" );
00274                         # Put the expiry time in the WHERE condition to avoid deleting a
00275                         # newly-inserted value
00276                         $db->delete( $row->tableName,
00277                             array( 'keyname' => $key, 'exptime' => $row->exptime ),
00278                             __METHOD__ );
00279                     } else { // HIT
00280                         $values[$key] = $this->unserialize( $db->decodeBlob( $row->value ) );
00281                     }
00282                 } catch ( DBQueryError $e ) {
00283                     $this->handleWriteError( $e, $row->serverIndex );
00284                 }
00285             } else { // MISS
00286                 $this->debug( 'get: no matching rows' );
00287             }
00288         }
00289 
00290         return $values;
00291     }
00292 
00298     public function setMulti( array $data, $expiry = 0 ) {
00299         $keysByTable = array();
00300         foreach ( $data as $key => $value ) {
00301             list( $serverIndex, $tableName ) = $this->getTableByKey( $key );
00302             $keysByTable[$serverIndex][$tableName][] = $key;
00303         }
00304 
00305         $this->garbageCollect(); // expire old entries if any
00306 
00307         $result = true;
00308         $exptime = (int)$expiry;
00309         foreach ( $keysByTable as $serverIndex => $serverKeys ) {
00310             try {
00311                 $db = $this->getDB( $serverIndex );
00312             } catch ( DBError $e ) {
00313                 $this->handleWriteError( $e, $serverIndex );
00314                 $result = false;
00315                 continue;
00316             }
00317 
00318             if ( $exptime < 0 ) {
00319                 $exptime = 0;
00320             }
00321 
00322             if ( $exptime == 0 ) {
00323                 $encExpiry = $this->getMaxDateTime( $db );
00324             } else {
00325                 if ( $exptime < 3.16e8 ) { # ~10 years
00326                     $exptime += time();
00327                 }
00328                 $encExpiry = $db->timestamp( $exptime );
00329             }
00330             foreach ( $serverKeys as $tableName => $tableKeys ) {
00331                 $rows = array();
00332                 foreach ( $tableKeys as $key ) {
00333                     $rows[] = array(
00334                         'keyname' => $key,
00335                         'value' => $db->encodeBlob( $this->serialize( $data[$key] ) ),
00336                         'exptime' => $encExpiry,
00337                     );
00338                 }
00339 
00340                 try {
00341                     $db->replace(
00342                         $tableName,
00343                         array( 'keyname' ),
00344                         $rows,
00345                         __METHOD__
00346                     );
00347                 } catch ( DBError $e ) {
00348                     $this->handleWriteError( $e, $serverIndex );
00349                     $result = false;
00350                 }
00351 
00352             }
00353 
00354         }
00355 
00356         return $result;
00357     }
00358 
00359 
00360 
00367     public function set( $key, $value, $exptime = 0 ) {
00368         list( $serverIndex, $tableName ) = $this->getTableByKey( $key );
00369         try {
00370             $db = $this->getDB( $serverIndex );
00371             $exptime = intval( $exptime );
00372 
00373             if ( $exptime < 0 ) {
00374                 $exptime = 0;
00375             }
00376 
00377             if ( $exptime == 0 ) {
00378                 $encExpiry = $this->getMaxDateTime( $db );
00379             } else {
00380                 if ( $exptime < 3.16e8 ) { # ~10 years
00381                     $exptime += time();
00382                 }
00383 
00384                 $encExpiry = $db->timestamp( $exptime );
00385             }
00386             // (bug 24425) use a replace if the db supports it instead of
00387             // delete/insert to avoid clashes with conflicting keynames
00388             $db->replace(
00389                 $tableName,
00390                 array( 'keyname' ),
00391                 array(
00392                     'keyname' => $key,
00393                     'value' => $db->encodeBlob( $this->serialize( $value ) ),
00394                     'exptime' => $encExpiry
00395                 ), __METHOD__ );
00396         } catch ( DBError $e ) {
00397             $this->handleWriteError( $e, $serverIndex );
00398             return false;
00399         }
00400 
00401         return true;
00402     }
00403 
00411     public function cas( $casToken, $key, $value, $exptime = 0 ) {
00412         list( $serverIndex, $tableName ) = $this->getTableByKey( $key );
00413         try {
00414             $db = $this->getDB( $serverIndex );
00415             $exptime = intval( $exptime );
00416 
00417             if ( $exptime < 0 ) {
00418                 $exptime = 0;
00419             }
00420 
00421             if ( $exptime == 0 ) {
00422                 $encExpiry = $this->getMaxDateTime( $db );
00423             } else {
00424                 if ( $exptime < 3.16e8 ) { # ~10 years
00425                     $exptime += time();
00426                 }
00427                 $encExpiry = $db->timestamp( $exptime );
00428             }
00429             // (bug 24425) use a replace if the db supports it instead of
00430             // delete/insert to avoid clashes with conflicting keynames
00431             $db->update(
00432                 $tableName,
00433                 array(
00434                     'keyname' => $key,
00435                     'value' => $db->encodeBlob( $this->serialize( $value ) ),
00436                     'exptime' => $encExpiry
00437                 ),
00438                 array(
00439                     'keyname' => $key,
00440                     'value' => $db->encodeBlob( $this->serialize( $casToken ) )
00441                 ),
00442                 __METHOD__
00443             );
00444         } catch ( DBQueryError $e ) {
00445             $this->handleWriteError( $e, $serverIndex );
00446 
00447             return false;
00448         }
00449 
00450         return (bool)$db->affectedRows();
00451     }
00452 
00458     public function delete( $key, $time = 0 ) {
00459         list( $serverIndex, $tableName ) = $this->getTableByKey( $key );
00460         try {
00461             $db = $this->getDB( $serverIndex );
00462             $db->delete(
00463                 $tableName,
00464                 array( 'keyname' => $key ),
00465                 __METHOD__ );
00466         } catch ( DBError $e ) {
00467             $this->handleWriteError( $e, $serverIndex );
00468             return false;
00469         }
00470 
00471         return true;
00472     }
00473 
00479     public function incr( $key, $step = 1 ) {
00480         list( $serverIndex, $tableName ) = $this->getTableByKey( $key );
00481         try {
00482             $db = $this->getDB( $serverIndex );
00483             $step = intval( $step );
00484             $row = $db->selectRow(
00485                 $tableName,
00486                 array( 'value', 'exptime' ),
00487                 array( 'keyname' => $key ),
00488                 __METHOD__,
00489                 array( 'FOR UPDATE' ) );
00490             if ( $row === false ) {
00491                 // Missing
00492 
00493                 return null;
00494             }
00495             $db->delete( $tableName, array( 'keyname' => $key ), __METHOD__ );
00496             if ( $this->isExpired( $db, $row->exptime ) ) {
00497                 // Expired, do not reinsert
00498 
00499                 return null;
00500             }
00501 
00502             $oldValue = intval( $this->unserialize( $db->decodeBlob( $row->value ) ) );
00503             $newValue = $oldValue + $step;
00504             $db->insert( $tableName,
00505                 array(
00506                     'keyname' => $key,
00507                     'value' => $db->encodeBlob( $this->serialize( $newValue ) ),
00508                     'exptime' => $row->exptime
00509                 ), __METHOD__, 'IGNORE' );
00510 
00511             if ( $db->affectedRows() == 0 ) {
00512                 // Race condition. See bug 28611
00513                 $newValue = null;
00514             }
00515         } catch ( DBError $e ) {
00516             $this->handleWriteError( $e, $serverIndex );
00517             return null;
00518         }
00519 
00520         return $newValue;
00521     }
00522 
00528     protected function isExpired( $db, $exptime ) {
00529         return $exptime != $this->getMaxDateTime( $db ) && wfTimestamp( TS_UNIX, $exptime ) < time();
00530     }
00531 
00536     protected function getMaxDateTime( $db ) {
00537         if ( time() > 0x7fffffff ) {
00538             return $db->timestamp( 1 << 62 );
00539         } else {
00540             return $db->timestamp( 0x7fffffff );
00541         }
00542     }
00543 
00544     protected function garbageCollect() {
00545         if ( !$this->purgePeriod ) {
00546             // Disabled
00547             return;
00548         }
00549         // Only purge on one in every $this->purgePeriod requests.
00550         if ( $this->purgePeriod !== 1 && mt_rand( 0, $this->purgePeriod - 1 ) ) {
00551             return;
00552         }
00553         $now = time();
00554         // Avoid repeating the delete within a few seconds
00555         if ( $now > ( $this->lastExpireAll + 1 ) ) {
00556             $this->lastExpireAll = $now;
00557             $this->expireAll();
00558         }
00559     }
00560 
00561     public function expireAll() {
00562         $this->deleteObjectsExpiringBefore( wfTimestampNow() );
00563     }
00564 
00571     public function deleteObjectsExpiringBefore( $timestamp, $progressCallback = false ) {
00572         for ( $serverIndex = 0; $serverIndex < $this->numServers; $serverIndex++ ) {
00573             try {
00574                 $db = $this->getDB( $serverIndex );
00575                 $dbTimestamp = $db->timestamp( $timestamp );
00576                 $totalSeconds = false;
00577                 $baseConds = array( 'exptime < ' . $db->addQuotes( $dbTimestamp ) );
00578                 for ( $i = 0; $i < $this->shards; $i++ ) {
00579                     $maxExpTime = false;
00580                     while ( true ) {
00581                         $conds = $baseConds;
00582                         if ( $maxExpTime !== false ) {
00583                             $conds[] = 'exptime > ' . $db->addQuotes( $maxExpTime );
00584                         }
00585                         $rows = $db->select(
00586                             $this->getTableNameByShard( $i ),
00587                             array( 'keyname', 'exptime' ),
00588                             $conds,
00589                             __METHOD__,
00590                             array( 'LIMIT' => 100, 'ORDER BY' => 'exptime' ) );
00591                         if ( $rows === false || !$rows->numRows() ) {
00592                             break;
00593                         }
00594                         $keys = array();
00595                         $row = $rows->current();
00596                         $minExpTime = $row->exptime;
00597                         if ( $totalSeconds === false ) {
00598                             $totalSeconds = wfTimestamp( TS_UNIX, $timestamp )
00599                                 - wfTimestamp( TS_UNIX, $minExpTime );
00600                         }
00601                         foreach ( $rows as $row ) {
00602                             $keys[] = $row->keyname;
00603                             $maxExpTime = $row->exptime;
00604                         }
00605 
00606                         $db->delete(
00607                             $this->getTableNameByShard( $i ),
00608                             array(
00609                                 'exptime >= ' . $db->addQuotes( $minExpTime ),
00610                                 'exptime < ' . $db->addQuotes( $dbTimestamp ),
00611                                 'keyname' => $keys
00612                             ),
00613                             __METHOD__ );
00614 
00615                         if ( $progressCallback ) {
00616                             if ( intval( $totalSeconds ) === 0 ) {
00617                                 $percent = 0;
00618                             } else {
00619                                 $remainingSeconds = wfTimestamp( TS_UNIX, $timestamp )
00620                                     - wfTimestamp( TS_UNIX, $maxExpTime );
00621                                 if ( $remainingSeconds > $totalSeconds ) {
00622                                     $totalSeconds = $remainingSeconds;
00623                                 }
00624                                 $percent = ( $i + $remainingSeconds / $totalSeconds )
00625                                     / $this->shards * 100;
00626                             }
00627                             $percent = ( $percent / $this->numServers )
00628                                 + ( $serverIndex / $this->numServers * 100 );
00629                             call_user_func( $progressCallback, $percent );
00630                         }
00631                     }
00632                 }
00633             } catch ( DBError $e ) {
00634                 $this->handleWriteError( $e, $serverIndex );
00635                 return false;
00636             }
00637         }
00638         return true;
00639     }
00640 
00641     public function deleteAll() {
00642         for ( $serverIndex = 0; $serverIndex < $this->numServers; $serverIndex++ ) {
00643             try {
00644                 $db = $this->getDB( $serverIndex );
00645                 for ( $i = 0; $i < $this->shards; $i++ ) {
00646                     $db->delete( $this->getTableNameByShard( $i ), '*', __METHOD__ );
00647                 }
00648             } catch ( DBError $e ) {
00649                 $this->handleWriteError( $e, $serverIndex );
00650                 return false;
00651             }
00652         }
00653         return true;
00654     }
00655 
00664     protected function serialize( &$data ) {
00665         $serial = serialize( $data );
00666 
00667         if ( function_exists( 'gzdeflate' ) ) {
00668             return gzdeflate( $serial );
00669         } else {
00670             return $serial;
00671         }
00672     }
00673 
00679     protected function unserialize( $serial ) {
00680         if ( function_exists( 'gzinflate' ) ) {
00681             wfSuppressWarnings();
00682             $decomp = gzinflate( $serial );
00683             wfRestoreWarnings();
00684 
00685             if ( false !== $decomp ) {
00686                 $serial = $decomp;
00687             }
00688         }
00689 
00690         $ret = unserialize( $serial );
00691 
00692         return $ret;
00693     }
00694 
00701     protected function handleReadError( DBError $exception, $serverIndex ) {
00702         if ( $exception instanceof DBConnectionError ) {
00703             $this->markServerDown( $exception, $serverIndex );
00704         }
00705         wfDebugLog( 'SQLBagOStuff', "DBError: {$exception->getMessage()}" );
00706         if ( $exception instanceof DBConnectionError ) {
00707             $this->setLastError( BagOStuff::ERR_UNREACHABLE );
00708             wfDebug( __METHOD__ . ": ignoring connection error\n" );
00709         } else {
00710             $this->setLastError( BagOStuff::ERR_UNEXPECTED );
00711             wfDebug( __METHOD__ . ": ignoring query error\n" );
00712         }
00713     }
00714 
00721     protected function handleWriteError( DBError $exception, $serverIndex ) {
00722         if ( $exception instanceof DBConnectionError ) {
00723             $this->markServerDown( $exception, $serverIndex );
00724         }
00725         if ( $exception->db && $exception->db->wasReadOnlyError() ) {
00726             try {
00727                 $exception->db->rollback( __METHOD__ );
00728             } catch ( DBError $e ) {
00729             }
00730         }
00731         wfDebugLog( 'SQLBagOStuff', "DBError: {$exception->getMessage()}" );
00732         if ( $exception instanceof DBConnectionError ) {
00733             $this->setLastError( BagOStuff::ERR_UNREACHABLE );
00734             wfDebug( __METHOD__ . ": ignoring connection error\n" );
00735         } else {
00736             $this->setLastError( BagOStuff::ERR_UNEXPECTED );
00737             wfDebug( __METHOD__ . ": ignoring query error\n" );
00738         }
00739     }
00740 
00747     protected function markServerDown( $exception, $serverIndex ) {
00748         if ( isset( $this->connFailureTimes[$serverIndex] ) ) {
00749             if ( time() - $this->connFailureTimes[$serverIndex] >= 60 ) {
00750                 unset( $this->connFailureTimes[$serverIndex] );
00751                 unset( $this->connFailureErrors[$serverIndex] );
00752             } else {
00753                 wfDebug( __METHOD__ . ": Server #$serverIndex already down\n" );
00754                 return;
00755             }
00756         }
00757         $now = time();
00758         wfDebug( __METHOD__ . ": Server #$serverIndex down until " . ( $now + 60 ) . "\n" );
00759         $this->connFailureTimes[$serverIndex] = $now;
00760         $this->connFailureErrors[$serverIndex] = $exception;
00761     }
00762 
00766     public function createTables() {
00767         for ( $serverIndex = 0; $serverIndex < $this->numServers; $serverIndex++ ) {
00768             $db = $this->getDB( $serverIndex );
00769             if ( $db->getType() !== 'mysql' ) {
00770                 throw new MWException( __METHOD__ . ' is not supported on this DB server' );
00771             }
00772 
00773             for ( $i = 0; $i < $this->shards; $i++ ) {
00774                 $db->query(
00775                     'CREATE TABLE ' . $db->tableName( $this->getTableNameByShard( $i ) ) .
00776                     ' LIKE ' . $db->tableName( 'objectcache' ),
00777                     __METHOD__ );
00778             }
00779         }
00780     }
00781 }
00782 
/**
 * Empty subclass of SqlBagOStuff: it adds no members and overrides nothing,
 * so it behaves exactly like its parent under a second class name.
 * NOTE(review): presumably kept as an alias for older configuration that
 * refers to this class name -- confirm before removing.
 */
class MediaWikiBagOStuff extends SqlBagOStuff {
}