MediaWiki  REL1_24
DatabasePostgres.php
Go to the documentation of this file.
00001 <?php
00024 class PostgresField implements Field {
00025     private $name, $tablename, $type, $nullable, $max_length, $deferred, $deferrable, $conname,
00026         $has_default, $default;
00027 
00034     static function fromText( $db, $table, $field ) {
00035         $q = <<<SQL
00036 SELECT
00037  attnotnull, attlen, conname AS conname,
00038  atthasdef,
00039  adsrc,
00040  COALESCE(condeferred, 'f') AS deferred,
00041  COALESCE(condeferrable, 'f') AS deferrable,
00042  CASE WHEN typname = 'int2' THEN 'smallint'
00043   WHEN typname = 'int4' THEN 'integer'
00044   WHEN typname = 'int8' THEN 'bigint'
00045   WHEN typname = 'bpchar' THEN 'char'
00046  ELSE typname END AS typname
00047 FROM pg_class c
00048 JOIN pg_namespace n ON (n.oid = c.relnamespace)
00049 JOIN pg_attribute a ON (a.attrelid = c.oid)
00050 JOIN pg_type t ON (t.oid = a.atttypid)
00051 LEFT JOIN pg_constraint o ON (o.conrelid = c.oid AND a.attnum = ANY(o.conkey) AND o.contype = 'f')
00052 LEFT JOIN pg_attrdef d on c.oid=d.adrelid and a.attnum=d.adnum
00053 WHERE relkind = 'r'
00054 AND nspname=%s
00055 AND relname=%s
00056 AND attname=%s;
00057 SQL;
00058 
00059         $table = $db->tableName( $table, 'raw' );
00060         $res = $db->query(
00061             sprintf( $q,
00062                 $db->addQuotes( $db->getCoreSchema() ),
00063                 $db->addQuotes( $table ),
00064                 $db->addQuotes( $field )
00065             )
00066         );
00067         $row = $db->fetchObject( $res );
00068         if ( !$row ) {
00069             return null;
00070         }
00071         $n = new PostgresField;
00072         $n->type = $row->typname;
00073         $n->nullable = ( $row->attnotnull == 'f' );
00074         $n->name = $field;
00075         $n->tablename = $table;
00076         $n->max_length = $row->attlen;
00077         $n->deferrable = ( $row->deferrable == 't' );
00078         $n->deferred = ( $row->deferred == 't' );
00079         $n->conname = $row->conname;
00080         $n->has_default = ( $row->atthasdef === 't' );
00081         $n->default = $row->adsrc;
00082 
00083         return $n;
00084     }
00085 
00086     function name() {
00087         return $this->name;
00088     }
00089 
00090     function tableName() {
00091         return $this->tablename;
00092     }
00093 
00094     function type() {
00095         return $this->type;
00096     }
00097 
00098     function isNullable() {
00099         return $this->nullable;
00100     }
00101 
00102     function maxLength() {
00103         return $this->max_length;
00104     }
00105 
00106     function is_deferrable() {
00107         return $this->deferrable;
00108     }
00109 
00110     function is_deferred() {
00111         return $this->deferred;
00112     }
00113 
00114     function conname() {
00115         return $this->conname;
00116     }
00117 
00122     function defaultValue() {
00123         if ( $this->has_default ) {
00124             return $this->default;
00125         } else {
00126             return false;
00127         }
00128     }
00129 }
00130 
00138 class PostgresTransactionState {
00139     private static $WATCHED = array(
00140         array(
00141             "desc" => "%s: Connection state changed from %s -> %s\n",
00142             "states" => array(
00143                 PGSQL_CONNECTION_OK => "OK",
00144                 PGSQL_CONNECTION_BAD => "BAD"
00145             )
00146         ),
00147         array(
00148             "desc" => "%s: Transaction state changed from %s -> %s\n",
00149             "states" => array(
00150                 PGSQL_TRANSACTION_IDLE => "IDLE",
00151                 PGSQL_TRANSACTION_ACTIVE => "ACTIVE",
00152                 PGSQL_TRANSACTION_INTRANS => "TRANS",
00153                 PGSQL_TRANSACTION_INERROR => "ERROR",
00154                 PGSQL_TRANSACTION_UNKNOWN => "UNKNOWN"
00155             )
00156         )
00157     );
00158 
00160     private $mNewState;
00161 
00163     private $mCurrentState;
00164 
00165     public function __construct( $conn ) {
00166         $this->mConn = $conn;
00167         $this->update();
00168         $this->mCurrentState = $this->mNewState;
00169     }
00170 
00171     public function update() {
00172         $this->mNewState = array(
00173             pg_connection_status( $this->mConn ),
00174             pg_transaction_status( $this->mConn )
00175         );
00176     }
00177 
00178     public function check() {
00179         global $wgDebugDBTransactions;
00180         $this->update();
00181         if ( $wgDebugDBTransactions ) {
00182             if ( $this->mCurrentState !== $this->mNewState ) {
00183                 $old = reset( $this->mCurrentState );
00184                 $new = reset( $this->mNewState );
00185                 foreach ( self::$WATCHED as $watched ) {
00186                     if ( $old !== $new ) {
00187                         $this->log_changed( $old, $new, $watched );
00188                     }
00189                     $old = next( $this->mCurrentState );
00190                     $new = next( $this->mNewState );
00191                 }
00192             }
00193         }
00194         $this->mCurrentState = $this->mNewState;
00195     }
00196 
00197     protected function describe_changed( $status, $desc_table ) {
00198         if ( isset( $desc_table[$status] ) ) {
00199             return $desc_table[$status];
00200         } else {
00201             return "STATUS " . $status;
00202         }
00203     }
00204 
00205     protected function log_changed( $old, $new, $watched ) {
00206         wfDebug( sprintf( $watched["desc"],
00207             $this->mConn,
00208             $this->describe_changed( $old, $watched["states"] ),
00209             $this->describe_changed( $new, $watched["states"] )
00210         ) );
00211     }
00212 }
00213 
00219 class SavepointPostgres {
00221     protected $dbw;
00222     protected $id;
00223     protected $didbegin;
00224 
00229     public function __construct( $dbw, $id ) {
00230         $this->dbw = $dbw;
00231         $this->id = $id;
00232         $this->didbegin = false;
00233         /* If we are not in a transaction, we need to be for savepoint trickery */
00234         if ( !$dbw->trxLevel() ) {
00235             $dbw->begin( "FOR SAVEPOINT" );
00236             $this->didbegin = true;
00237         }
00238     }
00239 
00240     public function __destruct() {
00241         if ( $this->didbegin ) {
00242             $this->dbw->rollback();
00243             $this->didbegin = false;
00244         }
00245     }
00246 
00247     public function commit() {
00248         if ( $this->didbegin ) {
00249             $this->dbw->commit();
00250             $this->didbegin = false;
00251         }
00252     }
00253 
00254     protected function query( $keyword, $msg_ok, $msg_failed ) {
00255         global $wgDebugDBTransactions;
00256         if ( $this->dbw->doQuery( $keyword . " " . $this->id ) !== false ) {
00257             if ( $wgDebugDBTransactions ) {
00258                 wfDebug( sprintf( $msg_ok, $this->id ) );
00259             }
00260         } else {
00261             wfDebug( sprintf( $msg_failed, $this->id ) );
00262         }
00263     }
00264 
00265     public function savepoint() {
00266         $this->query( "SAVEPOINT",
00267             "Transaction state: savepoint \"%s\" established.\n",
00268             "Transaction state: establishment of savepoint \"%s\" FAILED.\n"
00269         );
00270     }
00271 
00272     public function release() {
00273         $this->query( "RELEASE",
00274             "Transaction state: savepoint \"%s\" released.\n",
00275             "Transaction state: release of savepoint \"%s\" FAILED.\n"
00276         );
00277     }
00278 
00279     public function rollback() {
00280         $this->query( "ROLLBACK TO",
00281             "Transaction state: savepoint \"%s\" rolled back.\n",
00282             "Transaction state: rollback of savepoint \"%s\" FAILED.\n"
00283         );
00284     }
00285 
00286     public function __toString() {
00287         return (string)$this->id;
00288     }
00289 }
00290 
00294 class DatabasePostgres extends DatabaseBase {
00296     protected $mLastResult = null;
00297 
00299     protected $mAffectedRows = null;
00300 
00302     private $mInsertId = null;
00303 
00305     private $numericVersion = null;
00306 
00308     private $connectString;
00309 
00311     private $mTransactionState;
00312 
00314     private $mCoreSchema;
00315 
00316     function getType() {
00317         return 'postgres';
00318     }
00319 
00320     function cascadingDeletes() {
00321         return true;
00322     }
00323 
00324     function cleanupTriggers() {
00325         return true;
00326     }
00327 
00328     function strictIPs() {
00329         return true;
00330     }
00331 
00332     function realTimestamps() {
00333         return true;
00334     }
00335 
00336     function implicitGroupby() {
00337         return false;
00338     }
00339 
00340     function implicitOrderby() {
00341         return false;
00342     }
00343 
00344     function searchableIPs() {
00345         return true;
00346     }
00347 
00348     function functionalIndexes() {
00349         return true;
00350     }
00351 
00352     function hasConstraint( $name ) {
00353         $sql = "SELECT 1 FROM pg_catalog.pg_constraint c, pg_catalog.pg_namespace n " .
00354             "WHERE c.connamespace = n.oid AND conname = '" .
00355             pg_escape_string( $this->mConn, $name ) . "' AND n.nspname = '" .
00356             pg_escape_string( $this->mConn, $this->getCoreSchema() ) . "'";
00357         $res = $this->doQuery( $sql );
00358 
00359         return $this->numRows( $res );
00360     }
00361 
00371     function open( $server, $user, $password, $dbName ) {
00372         # Test for Postgres support, to avoid suppressed fatal error
00373         if ( !function_exists( 'pg_connect' ) ) {
00374             throw new DBConnectionError(
00375                 $this,
00376                 "Postgres functions missing, have you compiled PHP with the --with-pgsql\n" .
00377                 "option? (Note: if you recently installed PHP, you may need to restart your\n" .
00378                 "webserver and database)\n"
00379             );
00380         }
00381 
00382         global $wgDBport;
00383 
00384         if ( !strlen( $user ) ) { # e.g. the class is being loaded
00385             return null;
00386         }
00387 
00388         $this->mServer = $server;
00389         $port = $wgDBport;
00390         $this->mUser = $user;
00391         $this->mPassword = $password;
00392         $this->mDBname = $dbName;
00393 
00394         $connectVars = array(
00395             'dbname' => $dbName,
00396             'user' => $user,
00397             'password' => $password
00398         );
00399         if ( $server != false && $server != '' ) {
00400             $connectVars['host'] = $server;
00401         }
00402         if ( $port != false && $port != '' ) {
00403             $connectVars['port'] = $port;
00404         }
00405         if ( $this->mFlags & DBO_SSL ) {
00406             $connectVars['sslmode'] = 1;
00407         }
00408 
00409         $this->connectString = $this->makeConnectionString( $connectVars, PGSQL_CONNECT_FORCE_NEW );
00410         $this->close();
00411         $this->installErrorHandler();
00412 
00413         try {
00414             $this->mConn = pg_connect( $this->connectString );
00415         } catch ( Exception $ex ) {
00416             $this->restoreErrorHandler();
00417             throw $ex;
00418         }
00419 
00420         $phpError = $this->restoreErrorHandler();
00421 
00422         if ( !$this->mConn ) {
00423             wfDebug( "DB connection error\n" );
00424             wfDebug( "Server: $server, Database: $dbName, User: $user, Password: " .
00425                 substr( $password, 0, 3 ) . "...\n" );
00426             wfDebug( $this->lastError() . "\n" );
00427             throw new DBConnectionError( $this, str_replace( "\n", ' ', $phpError ) );
00428         }
00429 
00430         $this->mOpened = true;
00431         $this->mTransactionState = new PostgresTransactionState( $this->mConn );
00432 
00433         global $wgCommandLineMode;
00434         # If called from the command-line (e.g. importDump), only show errors
00435         if ( $wgCommandLineMode ) {
00436             $this->doQuery( "SET client_min_messages = 'ERROR'" );
00437         }
00438 
00439         $this->query( "SET client_encoding='UTF8'", __METHOD__ );
00440         $this->query( "SET datestyle = 'ISO, YMD'", __METHOD__ );
00441         $this->query( "SET timezone = 'GMT'", __METHOD__ );
00442         $this->query( "SET standard_conforming_strings = on", __METHOD__ );
00443         if ( $this->getServerVersion() >= 9.0 ) {
00444             $this->query( "SET bytea_output = 'escape'", __METHOD__ ); // PHP bug 53127
00445         }
00446 
00447         global $wgDBmwschema;
00448         $this->determineCoreSchema( $wgDBmwschema );
00449 
00450         return $this->mConn;
00451     }
00452 
00459     function selectDB( $db ) {
00460         if ( $this->mDBname !== $db ) {
00461             return (bool)$this->open( $this->mServer, $this->mUser, $this->mPassword, $db );
00462         } else {
00463             return true;
00464         }
00465     }
00466 
00467     function makeConnectionString( $vars ) {
00468         $s = '';
00469         foreach ( $vars as $name => $value ) {
00470             $s .= "$name='" . str_replace( "'", "\\'", $value ) . "' ";
00471         }
00472 
00473         return $s;
00474     }
00475 
00481     protected function closeConnection() {
00482         return pg_close( $this->mConn );
00483     }
00484 
00485     public function doQuery( $sql ) {
00486         if ( function_exists( 'mb_convert_encoding' ) ) {
00487             $sql = mb_convert_encoding( $sql, 'UTF-8' );
00488         }
00489         $this->mTransactionState->check();
00490         if ( pg_send_query( $this->mConn, $sql ) === false ) {
00491             throw new DBUnexpectedError( $this, "Unable to post new query to PostgreSQL\n" );
00492         }
00493         $this->mLastResult = pg_get_result( $this->mConn );
00494         $this->mTransactionState->check();
00495         $this->mAffectedRows = null;
00496         if ( pg_result_error( $this->mLastResult ) ) {
00497             return false;
00498         }
00499 
00500         return $this->mLastResult;
00501     }
00502 
00503     protected function dumpError() {
00504         $diags = array(
00505             PGSQL_DIAG_SEVERITY,
00506             PGSQL_DIAG_SQLSTATE,
00507             PGSQL_DIAG_MESSAGE_PRIMARY,
00508             PGSQL_DIAG_MESSAGE_DETAIL,
00509             PGSQL_DIAG_MESSAGE_HINT,
00510             PGSQL_DIAG_STATEMENT_POSITION,
00511             PGSQL_DIAG_INTERNAL_POSITION,
00512             PGSQL_DIAG_INTERNAL_QUERY,
00513             PGSQL_DIAG_CONTEXT,
00514             PGSQL_DIAG_SOURCE_FILE,
00515             PGSQL_DIAG_SOURCE_LINE,
00516             PGSQL_DIAG_SOURCE_FUNCTION
00517         );
00518         foreach ( $diags as $d ) {
00519             wfDebug( sprintf( "PgSQL ERROR(%d): %s\n",
00520                 $d, pg_result_error_field( $this->mLastResult, $d ) ) );
00521         }
00522     }
00523 
00524     function reportQueryError( $error, $errno, $sql, $fname, $tempIgnore = false ) {
00525         if ( $tempIgnore ) {
00526             /* Check for constraint violation */
00527             if ( $errno === '23505' ) {
00528                 parent::reportQueryError( $error, $errno, $sql, $fname, $tempIgnore );
00529 
00530                 return;
00531             }
00532         }
00533         /* Transaction stays in the ERROR state until rolledback */
00534         if ( $this->mTrxLevel ) {
00535             $this->rollback( __METHOD__ );
00536         };
00537         parent::reportQueryError( $error, $errno, $sql, $fname, false );
00538     }
00539 
00540     function queryIgnore( $sql, $fname = __METHOD__ ) {
00541         return $this->query( $sql, $fname, true );
00542     }
00543 
00548     function freeResult( $res ) {
00549         if ( $res instanceof ResultWrapper ) {
00550             $res = $res->result;
00551         }
00552         wfSuppressWarnings();
00553         $ok = pg_free_result( $res );
00554         wfRestoreWarnings();
00555         if ( !$ok ) {
00556             throw new DBUnexpectedError( $this, "Unable to free Postgres result\n" );
00557         }
00558     }
00559 
00565     function fetchObject( $res ) {
00566         if ( $res instanceof ResultWrapper ) {
00567             $res = $res->result;
00568         }
00569         wfSuppressWarnings();
00570         $row = pg_fetch_object( $res );
00571         wfRestoreWarnings();
00572         # @todo FIXME: HACK HACK HACK HACK debug
00573 
00574         # @todo hashar: not sure if the following test really trigger if the object
00575         #          fetching failed.
00576         if ( pg_last_error( $this->mConn ) ) {
00577             throw new DBUnexpectedError(
00578                 $this,
00579                 'SQL error: ' . htmlspecialchars( pg_last_error( $this->mConn ) )
00580             );
00581         }
00582 
00583         return $row;
00584     }
00585 
00586     function fetchRow( $res ) {
00587         if ( $res instanceof ResultWrapper ) {
00588             $res = $res->result;
00589         }
00590         wfSuppressWarnings();
00591         $row = pg_fetch_array( $res );
00592         wfRestoreWarnings();
00593         if ( pg_last_error( $this->mConn ) ) {
00594             throw new DBUnexpectedError(
00595                 $this,
00596                 'SQL error: ' . htmlspecialchars( pg_last_error( $this->mConn ) )
00597             );
00598         }
00599 
00600         return $row;
00601     }
00602 
00603     function numRows( $res ) {
00604         if ( $res instanceof ResultWrapper ) {
00605             $res = $res->result;
00606         }
00607         wfSuppressWarnings();
00608         $n = pg_num_rows( $res );
00609         wfRestoreWarnings();
00610         if ( pg_last_error( $this->mConn ) ) {
00611             throw new DBUnexpectedError(
00612                 $this,
00613                 'SQL error: ' . htmlspecialchars( pg_last_error( $this->mConn ) )
00614             );
00615         }
00616 
00617         return $n;
00618     }
00619 
00620     function numFields( $res ) {
00621         if ( $res instanceof ResultWrapper ) {
00622             $res = $res->result;
00623         }
00624 
00625         return pg_num_fields( $res );
00626     }
00627 
00628     function fieldName( $res, $n ) {
00629         if ( $res instanceof ResultWrapper ) {
00630             $res = $res->result;
00631         }
00632 
00633         return pg_field_name( $res, $n );
00634     }
00635 
00642     function insertId() {
00643         return $this->mInsertId;
00644     }
00645 
00651     function dataSeek( $res, $row ) {
00652         if ( $res instanceof ResultWrapper ) {
00653             $res = $res->result;
00654         }
00655 
00656         return pg_result_seek( $res, $row );
00657     }
00658 
00659     function lastError() {
00660         if ( $this->mConn ) {
00661             if ( $this->mLastResult ) {
00662                 return pg_result_error( $this->mLastResult );
00663             } else {
00664                 return pg_last_error();
00665             }
00666         } else {
00667             return 'No database connection';
00668         }
00669     }
00670 
00671     function lastErrno() {
00672         if ( $this->mLastResult ) {
00673             return pg_result_error_field( $this->mLastResult, PGSQL_DIAG_SQLSTATE );
00674         } else {
00675             return false;
00676         }
00677     }
00678 
00679     function affectedRows() {
00680         if ( !is_null( $this->mAffectedRows ) ) {
00681             // Forced result for simulated queries
00682             return $this->mAffectedRows;
00683         }
00684         if ( empty( $this->mLastResult ) ) {
00685             return 0;
00686         }
00687 
00688         return pg_affected_rows( $this->mLastResult );
00689     }
00690 
00705     function estimateRowCount( $table, $vars = '*', $conds = '',
00706         $fname = __METHOD__, $options = array()
00707     ) {
00708         $options['EXPLAIN'] = true;
00709         $res = $this->select( $table, $vars, $conds, $fname, $options );
00710         $rows = -1;
00711         if ( $res ) {
00712             $row = $this->fetchRow( $res );
00713             $count = array();
00714             if ( preg_match( '/rows=(\d+)/', $row[0], $count ) ) {
00715                 $rows = $count[1];
00716             }
00717         }
00718 
00719         return $rows;
00720     }
00721 
00731     function indexInfo( $table, $index, $fname = __METHOD__ ) {
00732         $sql = "SELECT indexname FROM pg_indexes WHERE tablename='$table'";
00733         $res = $this->query( $sql, $fname );
00734         if ( !$res ) {
00735             return null;
00736         }
00737         foreach ( $res as $row ) {
00738             if ( $row->indexname == $this->indexName( $index ) ) {
00739                 return $row;
00740             }
00741         }
00742 
00743         return false;
00744     }
00745 
00754     function indexAttributes( $index, $schema = false ) {
00755         if ( $schema === false ) {
00756             $schema = $this->getCoreSchema();
00757         }
00758         /*
00759          * A subquery would be not needed if we didn't care about the order
00760          * of attributes, but we do
00761          */
00762         $sql = <<<__INDEXATTR__
00763 
00764             SELECT opcname,
00765                 attname,
00766                 i.indoption[s.g] as option,
00767                 pg_am.amname
00768             FROM
00769                 (SELECT generate_series(array_lower(isub.indkey,1), array_upper(isub.indkey,1)) AS g
00770                     FROM
00771                         pg_index isub
00772                     JOIN pg_class cis
00773                         ON cis.oid=isub.indexrelid
00774                     JOIN pg_namespace ns
00775                         ON cis.relnamespace = ns.oid
00776                     WHERE cis.relname='$index' AND ns.nspname='$schema') AS s,
00777                 pg_attribute,
00778                 pg_opclass opcls,
00779                 pg_am,
00780                 pg_class ci
00781                 JOIN pg_index i
00782                     ON ci.oid=i.indexrelid
00783                 JOIN pg_class ct
00784                     ON ct.oid = i.indrelid
00785                 JOIN pg_namespace n
00786                     ON ci.relnamespace = n.oid
00787                 WHERE
00788                     ci.relname='$index' AND n.nspname='$schema'
00789                     AND attrelid = ct.oid
00790                     AND i.indkey[s.g] = attnum
00791                     AND i.indclass[s.g] = opcls.oid
00792                     AND pg_am.oid = opcls.opcmethod
00793 __INDEXATTR__;
00794         $res = $this->query( $sql, __METHOD__ );
00795         $a = array();
00796         if ( $res ) {
00797             foreach ( $res as $row ) {
00798                 $a[] = array(
00799                     $row->attname,
00800                     $row->opcname,
00801                     $row->amname,
00802                     $row->option );
00803             }
00804         } else {
00805             return null;
00806         }
00807 
00808         return $a;
00809     }
00810 
00811     function indexUnique( $table, $index, $fname = __METHOD__ ) {
00812         $sql = "SELECT indexname FROM pg_indexes WHERE tablename='{$table}'" .
00813             " AND indexdef LIKE 'CREATE UNIQUE%(" .
00814             $this->strencode( $this->indexName( $index ) ) .
00815             ")'";
00816         $res = $this->query( $sql, $fname );
00817         if ( !$res ) {
00818             return null;
00819         }
00820 
00821         return $res->numRows() > 0;
00822     }
00823 
00835     function selectSQLText( $table, $vars, $conds = '', $fname = __METHOD__,
00836         $options = array(), $join_conds = array()
00837     ) {
00838         if ( is_array( $options ) ) {
00839             $forUpdateKey = array_search( 'FOR UPDATE', $options, true );
00840             if ( $forUpdateKey !== false && $join_conds ) {
00841                 unset( $options[$forUpdateKey] );
00842 
00843                 foreach ( $join_conds as $table_cond => $join_cond ) {
00844                     if ( 0 === preg_match( '/^(?:LEFT|RIGHT|FULL)(?: OUTER)? JOIN$/i', $join_cond[0] ) ) {
00845                         $options['FOR UPDATE'][] = $table_cond;
00846                     }
00847                 }
00848             }
00849 
00850             if ( isset( $options['ORDER BY'] ) && $options['ORDER BY'] == 'NULL' ) {
00851                 unset( $options['ORDER BY'] );
00852             }
00853         }
00854 
00855         return parent::selectSQLText( $table, $vars, $conds, $fname, $options, $join_conds );
00856     }
00857 
00870     function insert( $table, $args, $fname = __METHOD__, $options = array() ) {
00871         if ( !count( $args ) ) {
00872             return true;
00873         }
00874 
00875         $table = $this->tableName( $table );
00876         if ( !isset( $this->numericVersion ) ) {
00877             $this->getServerVersion();
00878         }
00879 
00880         if ( !is_array( $options ) ) {
00881             $options = array( $options );
00882         }
00883 
00884         if ( isset( $args[0] ) && is_array( $args[0] ) ) {
00885             $multi = true;
00886             $keys = array_keys( $args[0] );
00887         } else {
00888             $multi = false;
00889             $keys = array_keys( $args );
00890         }
00891 
00892         // If IGNORE is set, we use savepoints to emulate mysql's behavior
00893         $savepoint = null;
00894         if ( in_array( 'IGNORE', $options ) ) {
00895             $savepoint = new SavepointPostgres( $this, 'mw' );
00896             $olde = error_reporting( 0 );
00897             // For future use, we may want to track the number of actual inserts
00898             // Right now, insert (all writes) simply return true/false
00899             $numrowsinserted = 0;
00900         }
00901 
00902         $sql = "INSERT INTO $table (" . implode( ',', $keys ) . ') VALUES ';
00903 
00904         if ( $multi ) {
00905             if ( $this->numericVersion >= 8.2 && !$savepoint ) {
00906                 $first = true;
00907                 foreach ( $args as $row ) {
00908                     if ( $first ) {
00909                         $first = false;
00910                     } else {
00911                         $sql .= ',';
00912                     }
00913                     $sql .= '(' . $this->makeList( $row ) . ')';
00914                 }
00915                 $res = (bool)$this->query( $sql, $fname, $savepoint );
00916             } else {
00917                 $res = true;
00918                 $origsql = $sql;
00919                 foreach ( $args as $row ) {
00920                     $tempsql = $origsql;
00921                     $tempsql .= '(' . $this->makeList( $row ) . ')';
00922 
00923                     if ( $savepoint ) {
00924                         $savepoint->savepoint();
00925                     }
00926 
00927                     $tempres = (bool)$this->query( $tempsql, $fname, $savepoint );
00928 
00929                     if ( $savepoint ) {
00930                         $bar = pg_last_error();
00931                         if ( $bar != false ) {
00932                             $savepoint->rollback();
00933                         } else {
00934                             $savepoint->release();
00935                             $numrowsinserted++;
00936                         }
00937                     }
00938 
00939                     // If any of them fail, we fail overall for this function call
00940                     // Note that this will be ignored if IGNORE is set
00941                     if ( !$tempres ) {
00942                         $res = false;
00943                     }
00944                 }
00945             }
00946         } else {
00947             // Not multi, just a lone insert
00948             if ( $savepoint ) {
00949                 $savepoint->savepoint();
00950             }
00951 
00952             $sql .= '(' . $this->makeList( $args ) . ')';
00953             $res = (bool)$this->query( $sql, $fname, $savepoint );
00954             if ( $savepoint ) {
00955                 $bar = pg_last_error();
00956                 if ( $bar != false ) {
00957                     $savepoint->rollback();
00958                 } else {
00959                     $savepoint->release();
00960                     $numrowsinserted++;
00961                 }
00962             }
00963         }
00964         if ( $savepoint ) {
00965             error_reporting( $olde );
00966             $savepoint->commit();
00967 
00968             // Set the affected row count for the whole operation
00969             $this->mAffectedRows = $numrowsinserted;
00970 
00971             // IGNORE always returns true
00972             return true;
00973         }
00974 
00975         return $res;
00976     }
00977 
00996     function insertSelect( $destTable, $srcTable, $varMap, $conds, $fname = __METHOD__,
00997         $insertOptions = array(), $selectOptions = array() ) {
00998         $destTable = $this->tableName( $destTable );
00999 
01000         if ( !is_array( $insertOptions ) ) {
01001             $insertOptions = array( $insertOptions );
01002         }
01003 
01004         /*
01005          * If IGNORE is set, we use savepoints to emulate mysql's behavior
01006          * Ignore LOW PRIORITY option, since it is MySQL-specific
01007          */
01008         $savepoint = null;
01009         if ( in_array( 'IGNORE', $insertOptions ) ) {
01010             $savepoint = new SavepointPostgres( $this, 'mw' );
01011             $olde = error_reporting( 0 );
01012             $numrowsinserted = 0;
01013             $savepoint->savepoint();
01014         }
01015 
01016         if ( !is_array( $selectOptions ) ) {
01017             $selectOptions = array( $selectOptions );
01018         }
01019         list( $startOpts, $useIndex, $tailOpts ) = $this->makeSelectOptions( $selectOptions );
01020         if ( is_array( $srcTable ) ) {
01021             $srcTable = implode( ',', array_map( array( &$this, 'tableName' ), $srcTable ) );
01022         } else {
01023             $srcTable = $this->tableName( $srcTable );
01024         }
01025 
01026         $sql = "INSERT INTO $destTable (" . implode( ',', array_keys( $varMap ) ) . ')' .
01027             " SELECT $startOpts " . implode( ',', $varMap ) .
01028             " FROM $srcTable $useIndex";
01029 
01030         if ( $conds != '*' ) {
01031             $sql .= ' WHERE ' . $this->makeList( $conds, LIST_AND );
01032         }
01033 
01034         $sql .= " $tailOpts";
01035 
01036         $res = (bool)$this->query( $sql, $fname, $savepoint );
01037         if ( $savepoint ) {
01038             $bar = pg_last_error();
01039             if ( $bar != false ) {
01040                 $savepoint->rollback();
01041             } else {
01042                 $savepoint->release();
01043                 $numrowsinserted++;
01044             }
01045             error_reporting( $olde );
01046             $savepoint->commit();
01047 
01048             // Set the affected row count for the whole operation
01049             $this->mAffectedRows = $numrowsinserted;
01050 
01051             // IGNORE always returns true
01052             return true;
01053         }
01054 
01055         return $res;
01056     }
01057 
01058     function tableName( $name, $format = 'quoted' ) {
01059         # Replace reserved words with better ones
01060         switch ( $name ) {
01061             case 'user':
01062                 return $this->realTableName( 'mwuser', $format );
01063             case 'text':
01064                 return $this->realTableName( 'pagecontent', $format );
01065             default:
01066                 return $this->realTableName( $name, $format );
01067         }
01068     }
01069 
01070     /* Don't cheat on installer */
01071     function realTableName( $name, $format = 'quoted' ) {
01072         return parent::tableName( $name, $format );
01073     }
01074 
01081     function nextSequenceValue( $seqName ) {
01082         $safeseq = str_replace( "'", "''", $seqName );
01083         $res = $this->query( "SELECT nextval('$safeseq')" );
01084         $row = $this->fetchRow( $res );
01085         $this->mInsertId = $row[0];
01086 
01087         return $this->mInsertId;
01088     }
01089 
01096     function currentSequenceValue( $seqName ) {
01097         $safeseq = str_replace( "'", "''", $seqName );
01098         $res = $this->query( "SELECT currval('$safeseq')" );
01099         $row = $this->fetchRow( $res );
01100         $currval = $row[0];
01101 
01102         return $currval;
01103     }
01104 
01105     # Returns the size of a text field, or -1 for "unlimited"
01106     function textFieldSize( $table, $field ) {
01107         $table = $this->tableName( $table );
01108         $sql = "SELECT t.typname as ftype,a.atttypmod as size
01109             FROM pg_class c, pg_attribute a, pg_type t
01110             WHERE relname='$table' AND a.attrelid=c.oid AND
01111                 a.atttypid=t.oid and a.attname='$field'";
01112         $res = $this->query( $sql );
01113         $row = $this->fetchObject( $res );
01114         if ( $row->ftype == 'varchar' ) {
01115             $size = $row->size - 4;
01116         } else {
01117             $size = $row->size;
01118         }
01119 
01120         return $size;
01121     }
01122 
01123     function limitResult( $sql, $limit, $offset = false ) {
01124         return "$sql LIMIT $limit " . ( is_numeric( $offset ) ? " OFFSET {$offset} " : '' );
01125     }
01126 
01127     function wasDeadlock() {
01128         return $this->lastErrno() == '40P01';
01129     }
01130 
01131     function duplicateTableStructure( $oldName, $newName, $temporary = false, $fname = __METHOD__ ) {
01132         $newName = $this->addIdentifierQuotes( $newName );
01133         $oldName = $this->addIdentifierQuotes( $oldName );
01134 
01135         return $this->query( 'CREATE ' . ( $temporary ? 'TEMPORARY ' : '' ) . " TABLE $newName " .
01136             "(LIKE $oldName INCLUDING DEFAULTS)", $fname );
01137     }
01138 
01139     function listTables( $prefix = null, $fname = __METHOD__ ) {
01140         $eschema = $this->addQuotes( $this->getCoreSchema() );
01141         $result = $this->query( "SELECT tablename FROM pg_tables WHERE schemaname = $eschema", $fname );
01142         $endArray = array();
01143 
01144         foreach ( $result as $table ) {
01145             $vars = get_object_vars( $table );
01146             $table = array_pop( $vars );
01147             if ( !$prefix || strpos( $table, $prefix ) === 0 ) {
01148                 $endArray[] = $table;
01149             }
01150         }
01151 
01152         return $endArray;
01153     }
01154 
01155     function timestamp( $ts = 0 ) {
01156         return wfTimestamp( TS_POSTGRES, $ts );
01157     }
01158 
01177     function pg_array_parse( $text, &$output, $limit = false, $offset = 1 ) {
01178         if ( false === $limit ) {
01179             $limit = strlen( $text ) - 1;
01180             $output = array();
01181         }
01182         if ( '{}' == $text ) {
01183             return $output;
01184         }
01185         do {
01186             if ( '{' != $text[$offset] ) {
01187                 preg_match( "/(\\{?\"([^\"\\\\]|\\\\.)*\"|[^,{}]+)+([,}]+)/",
01188                     $text, $match, 0, $offset );
01189                 $offset += strlen( $match[0] );
01190                 $output[] = ( '"' != $match[1][0]
01191                     ? $match[1]
01192                     : stripcslashes( substr( $match[1], 1, -1 ) ) );
01193                 if ( '},' == $match[3] ) {
01194                     return $output;
01195                 }
01196             } else {
01197                 $offset = $this->pg_array_parse( $text, $output, $limit, $offset + 1 );
01198             }
01199         } while ( $limit > $offset );
01200 
01201         return $output;
01202     }
01203 
01210     public function aggregateValue( $valuedata, $valuename = 'value' ) {
01211         return $valuedata;
01212     }
01213 
01217     public function getSoftwareLink() {
01218         return '[{{int:version-db-postgres-url}} PostgreSQL]';
01219     }
01220 
01228     function getCurrentSchema() {
01229         $res = $this->query( "SELECT current_schema()", __METHOD__ );
01230         $row = $this->fetchRow( $res );
01231 
01232         return $row[0];
01233     }
01234 
01245     function getSchemas() {
01246         $res = $this->query( "SELECT current_schemas(false)", __METHOD__ );
01247         $row = $this->fetchRow( $res );
01248         $schemas = array();
01249 
01250         /* PHP pgsql support does not support array type, "{a,b}" string is returned */
01251 
01252         return $this->pg_array_parse( $row[0], $schemas );
01253     }
01254 
01264     function getSearchPath() {
01265         $res = $this->query( "SHOW search_path", __METHOD__ );
01266         $row = $this->fetchRow( $res );
01267 
01268         /* PostgreSQL returns SHOW values as strings */
01269 
01270         return explode( ",", $row[0] );
01271     }
01272 
01280     function setSearchPath( $search_path ) {
01281         $this->query( "SET search_path = " . implode( ", ", $search_path ) );
01282     }
01283 
01298     function determineCoreSchema( $desiredSchema ) {
01299         $this->begin( __METHOD__ );
01300         if ( $this->schemaExists( $desiredSchema ) ) {
01301             if ( in_array( $desiredSchema, $this->getSchemas() ) ) {
01302                 $this->mCoreSchema = $desiredSchema;
01303                 wfDebug( "Schema \"" . $desiredSchema . "\" already in the search path\n" );
01304             } else {
01310                 $search_path = $this->getSearchPath();
01311                 array_unshift( $search_path,
01312                     $this->addIdentifierQuotes( $desiredSchema ) );
01313                 $this->setSearchPath( $search_path );
01314                 $this->mCoreSchema = $desiredSchema;
01315                 wfDebug( "Schema \"" . $desiredSchema . "\" added to the search path\n" );
01316             }
01317         } else {
01318             $this->mCoreSchema = $this->getCurrentSchema();
01319             wfDebug( "Schema \"" . $desiredSchema . "\" not found, using current \"" .
01320                 $this->mCoreSchema . "\"\n" );
01321         }
01322         /* Commit SET otherwise it will be rollbacked on error or IGNORE SELECT */
01323         $this->commit( __METHOD__ );
01324     }
01325 
01332     function getCoreSchema() {
01333         return $this->mCoreSchema;
01334     }
01335 
01339     function getServerVersion() {
01340         if ( !isset( $this->numericVersion ) ) {
01341             $versionInfo = pg_version( $this->mConn );
01342             if ( version_compare( $versionInfo['client'], '7.4.0', 'lt' ) ) {
01343                 // Old client, abort install
01344                 $this->numericVersion = '7.3 or earlier';
01345             } elseif ( isset( $versionInfo['server'] ) ) {
01346                 // Normal client
01347                 $this->numericVersion = $versionInfo['server'];
01348             } else {
01349                 // Bug 16937: broken pgsql extension from PHP<5.3
01350                 $this->numericVersion = pg_parameter_status( $this->mConn, 'server_version' );
01351             }
01352         }
01353 
01354         return $this->numericVersion;
01355     }
01356 
01365     function relationExists( $table, $types, $schema = false ) {
01366         if ( !is_array( $types ) ) {
01367             $types = array( $types );
01368         }
01369         if ( !$schema ) {
01370             $schema = $this->getCoreSchema();
01371         }
01372         $table = $this->realTableName( $table, 'raw' );
01373         $etable = $this->addQuotes( $table );
01374         $eschema = $this->addQuotes( $schema );
01375         $sql = "SELECT 1 FROM pg_catalog.pg_class c, pg_catalog.pg_namespace n "
01376             . "WHERE c.relnamespace = n.oid AND c.relname = $etable AND n.nspname = $eschema "
01377             . "AND c.relkind IN ('" . implode( "','", $types ) . "')";
01378         $res = $this->query( $sql );
01379         $count = $res ? $res->numRows() : 0;
01380 
01381         return (bool)$count;
01382     }
01383 
01392     function tableExists( $table, $fname = __METHOD__, $schema = false ) {
01393         return $this->relationExists( $table, array( 'r', 'v' ), $schema );
01394     }
01395 
01396     function sequenceExists( $sequence, $schema = false ) {
01397         return $this->relationExists( $sequence, 'S', $schema );
01398     }
01399 
01400     function triggerExists( $table, $trigger ) {
01401         $q = <<<SQL
01402     SELECT 1 FROM pg_class, pg_namespace, pg_trigger
01403         WHERE relnamespace=pg_namespace.oid AND relkind='r'
01404               AND tgrelid=pg_class.oid
01405               AND nspname=%s AND relname=%s AND tgname=%s
01406 SQL;
01407         $res = $this->query(
01408             sprintf(
01409                 $q,
01410                 $this->addQuotes( $this->getCoreSchema() ),
01411                 $this->addQuotes( $table ),
01412                 $this->addQuotes( $trigger )
01413             )
01414         );
01415         if ( !$res ) {
01416             return null;
01417         }
01418         $rows = $res->numRows();
01419 
01420         return $rows;
01421     }
01422 
01423     function ruleExists( $table, $rule ) {
01424         $exists = $this->selectField( 'pg_rules', 'rulename',
01425             array(
01426                 'rulename' => $rule,
01427                 'tablename' => $table,
01428                 'schemaname' => $this->getCoreSchema()
01429             )
01430         );
01431 
01432         return $exists === $rule;
01433     }
01434 
01435     function constraintExists( $table, $constraint ) {
01436         $sql = sprintf( "SELECT 1 FROM information_schema.table_constraints " .
01437             "WHERE constraint_schema = %s AND table_name = %s AND constraint_name = %s",
01438             $this->addQuotes( $this->getCoreSchema() ),
01439             $this->addQuotes( $table ),
01440             $this->addQuotes( $constraint )
01441         );
01442         $res = $this->query( $sql );
01443         if ( !$res ) {
01444             return null;
01445         }
01446         $rows = $res->numRows();
01447 
01448         return $rows;
01449     }
01450 
01456     function schemaExists( $schema ) {
01457         $exists = $this->selectField( '"pg_catalog"."pg_namespace"', 1,
01458             array( 'nspname' => $schema ), __METHOD__ );
01459 
01460         return (bool)$exists;
01461     }
01462 
01468     function roleExists( $roleName ) {
01469         $exists = $this->selectField( '"pg_catalog"."pg_roles"', 1,
01470             array( 'rolname' => $roleName ), __METHOD__ );
01471 
01472         return (bool)$exists;
01473     }
01474 
01475     function fieldInfo( $table, $field ) {
01476         return PostgresField::fromText( $this, $table, $field );
01477     }
01478 
01485     function fieldType( $res, $index ) {
01486         if ( $res instanceof ResultWrapper ) {
01487             $res = $res->result;
01488         }
01489 
01490         return pg_field_type( $res, $index );
01491     }
01492 
01497     function encodeBlob( $b ) {
01498         return new Blob( pg_escape_bytea( $this->mConn, $b ) );
01499     }
01500 
01501     function decodeBlob( $b ) {
01502         if ( $b instanceof Blob ) {
01503             $b = $b->fetch();
01504         }
01505 
01506         return pg_unescape_bytea( $b );
01507     }
01508 
01509     function strencode( $s ) { # Should not be called by us
01510         return pg_escape_string( $this->mConn, $s );
01511     }
01512 
01517     function addQuotes( $s ) {
01518         if ( is_null( $s ) ) {
01519             return 'NULL';
01520         } elseif ( is_bool( $s ) ) {
01521             return intval( $s );
01522         } elseif ( $s instanceof Blob ) {
01523             return "'" . $s->fetch( $s ) . "'";
01524         }
01525 
01526         return "'" . pg_escape_string( $this->mConn, $s ) . "'";
01527     }
01528 
01536     protected function replaceVars( $ins ) {
01537         $ins = parent::replaceVars( $ins );
01538 
01539         if ( $this->numericVersion >= 8.3 ) {
01540             // Thanks for not providing backwards-compatibility, 8.3
01541             $ins = preg_replace( "/to_tsvector\s*\(\s*'default'\s*,/", 'to_tsvector(', $ins );
01542         }
01543 
01544         if ( $this->numericVersion <= 8.1 ) { // Our minimum version
01545             $ins = str_replace( 'USING gin', 'USING gist', $ins );
01546         }
01547 
01548         return $ins;
01549     }
01550 
01558     function makeSelectOptions( $options ) {
01559         $preLimitTail = $postLimitTail = '';
01560         $startOpts = $useIndex = '';
01561 
01562         $noKeyOptions = array();
01563         foreach ( $options as $key => $option ) {
01564             if ( is_numeric( $key ) ) {
01565                 $noKeyOptions[$option] = true;
01566             }
01567         }
01568 
01569         $preLimitTail .= $this->makeGroupByWithHaving( $options );
01570 
01571         $preLimitTail .= $this->makeOrderBy( $options );
01572 
01573         //if ( isset( $options['LIMIT'] ) ) {
01574         //  $tailOpts .= $this->limitResult( '', $options['LIMIT'],
01575         //      isset( $options['OFFSET'] ) ? $options['OFFSET']
01576         //      : false );
01577         //}
01578 
01579         if ( isset( $options['FOR UPDATE'] ) ) {
01580             $postLimitTail .= ' FOR UPDATE OF ' .
01581                 implode( ', ', array_map( array( &$this, 'tableName' ), $options['FOR UPDATE'] ) );
01582         } elseif ( isset( $noKeyOptions['FOR UPDATE'] ) ) {
01583             $postLimitTail .= ' FOR UPDATE';
01584         }
01585 
01586         if ( isset( $noKeyOptions['DISTINCT'] ) || isset( $noKeyOptions['DISTINCTROW'] ) ) {
01587             $startOpts .= 'DISTINCT';
01588         }
01589 
01590         return array( $startOpts, $useIndex, $preLimitTail, $postLimitTail );
01591     }
01592 
01593     function getDBname() {
01594         return $this->mDBname;
01595     }
01596 
01597     function getServer() {
01598         return $this->mServer;
01599     }
01600 
01601     function buildConcat( $stringList ) {
01602         return implode( ' || ', $stringList );
01603     }
01604 
01605     public function buildGroupConcatField(
01606         $delimiter, $table, $field, $conds = '', $options = array(), $join_conds = array()
01607     ) {
01608         $fld = "array_to_string(array_agg($field)," . $this->addQuotes( $delimiter ) . ')';
01609 
01610         return '(' . $this->selectSQLText( $table, $fld, $conds, null, array(), $join_conds ) . ')';
01611     }
01612 
01613     public function getSearchEngine() {
01614         return 'SearchPostgres';
01615     }
01616 
01617     public function streamStatementEnd( &$sql, &$newLine ) {
01618         # Allow dollar quoting for function declarations
01619         if ( substr( $newLine, 0, 4 ) == '$mw$' ) {
01620             if ( $this->delimiter ) {
01621                 $this->delimiter = false;
01622             } else {
01623                 $this->delimiter = ';';
01624             }
01625         }
01626 
01627         return parent::streamStatementEnd( $sql, $newLine );
01628     }
01629 
01639     public function lockIsFree( $lockName, $method ) {
01640         $key = $this->addQuotes( $this->bigintFromLockName( $lockName ) );
01641         $result = $this->query( "SELECT (CASE(pg_try_advisory_lock($key))
01642             WHEN 'f' THEN 'f' ELSE pg_advisory_unlock($key) END) AS lockstatus", $method );
01643         $row = $this->fetchObject( $result );
01644 
01645         return ( $row->lockstatus === 't' );
01646     }
01647 
01655     public function lock( $lockName, $method, $timeout = 5 ) {
01656         $key = $this->addQuotes( $this->bigintFromLockName( $lockName ) );
01657         for ( $attempts = 1; $attempts <= $timeout; ++$attempts ) {
01658             $result = $this->query(
01659                 "SELECT pg_try_advisory_lock($key) AS lockstatus", $method );
01660             $row = $this->fetchObject( $result );
01661             if ( $row->lockstatus === 't' ) {
01662                 return true;
01663             } else {
01664                 sleep( 1 );
01665             }
01666         }
01667         wfDebug( __METHOD__ . " failed to acquire lock\n" );
01668 
01669         return false;
01670     }
01671 
01679     public function unlock( $lockName, $method ) {
01680         $key = $this->addQuotes( $this->bigintFromLockName( $lockName ) );
01681         $result = $this->query( "SELECT pg_advisory_unlock($key) as lockstatus", $method );
01682         $row = $this->fetchObject( $result );
01683 
01684         return ( $row->lockstatus === 't' );
01685     }
01686 
01691     private function bigintFromLockName( $lockName ) {
01692         return wfBaseConvert( substr( sha1( $lockName ), 0, 15 ), 16, 10 );
01693     }
01694 } // end DatabasePostgres class