MediaWiki  REL1_23
DatabasePostgres.php
Go to the documentation of this file.
00001 <?php
00024 class PostgresField implements Field {
00025     private $name, $tablename, $type, $nullable, $max_length, $deferred, $deferrable, $conname,
00026         $has_default, $default;
00027 
00034     static function fromText( $db, $table, $field ) {
00035         $q = <<<SQL
00036 SELECT
00037  attnotnull, attlen, conname AS conname,
00038  atthasdef,
00039  adsrc,
00040  COALESCE(condeferred, 'f') AS deferred,
00041  COALESCE(condeferrable, 'f') AS deferrable,
00042  CASE WHEN typname = 'int2' THEN 'smallint'
00043   WHEN typname = 'int4' THEN 'integer'
00044   WHEN typname = 'int8' THEN 'bigint'
00045   WHEN typname = 'bpchar' THEN 'char'
00046  ELSE typname END AS typname
00047 FROM pg_class c
00048 JOIN pg_namespace n ON (n.oid = c.relnamespace)
00049 JOIN pg_attribute a ON (a.attrelid = c.oid)
00050 JOIN pg_type t ON (t.oid = a.atttypid)
00051 LEFT JOIN pg_constraint o ON (o.conrelid = c.oid AND a.attnum = ANY(o.conkey) AND o.contype = 'f')
00052 LEFT JOIN pg_attrdef d on c.oid=d.adrelid and a.attnum=d.adnum
00053 WHERE relkind = 'r'
00054 AND nspname=%s
00055 AND relname=%s
00056 AND attname=%s;
00057 SQL;
00058 
00059         $table = $db->tableName( $table, 'raw' );
00060         $res = $db->query(
00061             sprintf( $q,
00062                 $db->addQuotes( $db->getCoreSchema() ),
00063                 $db->addQuotes( $table ),
00064                 $db->addQuotes( $field )
00065             )
00066         );
00067         $row = $db->fetchObject( $res );
00068         if ( !$row ) {
00069             return null;
00070         }
00071         $n = new PostgresField;
00072         $n->type = $row->typname;
00073         $n->nullable = ( $row->attnotnull == 'f' );
00074         $n->name = $field;
00075         $n->tablename = $table;
00076         $n->max_length = $row->attlen;
00077         $n->deferrable = ( $row->deferrable == 't' );
00078         $n->deferred = ( $row->deferred == 't' );
00079         $n->conname = $row->conname;
00080         $n->has_default = ( $row->atthasdef === 't' );
00081         $n->default = $row->adsrc;
00082 
00083         return $n;
00084     }
00085 
00086     function name() {
00087         return $this->name;
00088     }
00089 
00090     function tableName() {
00091         return $this->tablename;
00092     }
00093 
00094     function type() {
00095         return $this->type;
00096     }
00097 
00098     function isNullable() {
00099         return $this->nullable;
00100     }
00101 
00102     function maxLength() {
00103         return $this->max_length;
00104     }
00105 
00106     function is_deferrable() {
00107         return $this->deferrable;
00108     }
00109 
00110     function is_deferred() {
00111         return $this->deferred;
00112     }
00113 
00114     function conname() {
00115         return $this->conname;
00116     }
00117 
00121     function defaultValue() {
00122         if ( $this->has_default ) {
00123             return $this->default;
00124         } else {
00125             return false;
00126         }
00127     }
00128 }
00129 
00137 class PostgresTransactionState {
00138     private static $WATCHED = array(
00139         array(
00140             "desc" => "%s: Connection state changed from %s -> %s\n",
00141             "states" => array(
00142                 PGSQL_CONNECTION_OK => "OK",
00143                 PGSQL_CONNECTION_BAD => "BAD"
00144             )
00145         ),
00146         array(
00147             "desc" => "%s: Transaction state changed from %s -> %s\n",
00148             "states" => array(
00149                 PGSQL_TRANSACTION_IDLE => "IDLE",
00150                 PGSQL_TRANSACTION_ACTIVE => "ACTIVE",
00151                 PGSQL_TRANSACTION_INTRANS => "TRANS",
00152                 PGSQL_TRANSACTION_INERROR => "ERROR",
00153                 PGSQL_TRANSACTION_UNKNOWN => "UNKNOWN"
00154             )
00155         )
00156     );
00157 
00159     private $mNewState;
00160 
00162     private $mCurrentState;
00163 
00164     public function __construct( $conn ) {
00165         $this->mConn = $conn;
00166         $this->update();
00167         $this->mCurrentState = $this->mNewState;
00168     }
00169 
00170     public function update() {
00171         $this->mNewState = array(
00172             pg_connection_status( $this->mConn ),
00173             pg_transaction_status( $this->mConn )
00174         );
00175     }
00176 
00177     public function check() {
00178         global $wgDebugDBTransactions;
00179         $this->update();
00180         if ( $wgDebugDBTransactions ) {
00181             if ( $this->mCurrentState !== $this->mNewState ) {
00182                 $old = reset( $this->mCurrentState );
00183                 $new = reset( $this->mNewState );
00184                 foreach ( self::$WATCHED as $watched ) {
00185                     if ( $old !== $new ) {
00186                         $this->log_changed( $old, $new, $watched );
00187                     }
00188                     $old = next( $this->mCurrentState );
00189                     $new = next( $this->mNewState );
00190                 }
00191             }
00192         }
00193         $this->mCurrentState = $this->mNewState;
00194     }
00195 
00196     protected function describe_changed( $status, $desc_table ) {
00197         if ( isset( $desc_table[$status] ) ) {
00198             return $desc_table[$status];
00199         } else {
00200             return "STATUS " . $status;
00201         }
00202     }
00203 
00204     protected function log_changed( $old, $new, $watched ) {
00205         wfDebug( sprintf( $watched["desc"],
00206             $this->mConn,
00207             $this->describe_changed( $old, $watched["states"] ),
00208             $this->describe_changed( $new, $watched["states"] )
00209         ) );
00210     }
00211 }
00212 
00218 class SavepointPostgres {
00220     protected $dbw;
00221     protected $id;
00222     protected $didbegin;
00223 
00228     public function __construct( $dbw, $id ) {
00229         $this->dbw = $dbw;
00230         $this->id = $id;
00231         $this->didbegin = false;
00232         /* If we are not in a transaction, we need to be for savepoint trickery */
00233         if ( !$dbw->trxLevel() ) {
00234             $dbw->begin( "FOR SAVEPOINT" );
00235             $this->didbegin = true;
00236         }
00237     }
00238 
00239     public function __destruct() {
00240         if ( $this->didbegin ) {
00241             $this->dbw->rollback();
00242             $this->didbegin = false;
00243         }
00244     }
00245 
00246     public function commit() {
00247         if ( $this->didbegin ) {
00248             $this->dbw->commit();
00249             $this->didbegin = false;
00250         }
00251     }
00252 
00253     protected function query( $keyword, $msg_ok, $msg_failed ) {
00254         global $wgDebugDBTransactions;
00255         if ( $this->dbw->doQuery( $keyword . " " . $this->id ) !== false ) {
00256             if ( $wgDebugDBTransactions ) {
00257                 wfDebug( sprintf( $msg_ok, $this->id ) );
00258             }
00259         } else {
00260             wfDebug( sprintf( $msg_failed, $this->id ) );
00261         }
00262     }
00263 
00264     public function savepoint() {
00265         $this->query( "SAVEPOINT",
00266             "Transaction state: savepoint \"%s\" established.\n",
00267             "Transaction state: establishment of savepoint \"%s\" FAILED.\n"
00268         );
00269     }
00270 
00271     public function release() {
00272         $this->query( "RELEASE",
00273             "Transaction state: savepoint \"%s\" released.\n",
00274             "Transaction state: release of savepoint \"%s\" FAILED.\n"
00275         );
00276     }
00277 
00278     public function rollback() {
00279         $this->query( "ROLLBACK TO",
00280             "Transaction state: savepoint \"%s\" rolled back.\n",
00281             "Transaction state: rollback of savepoint \"%s\" FAILED.\n"
00282         );
00283     }
00284 
00285     public function __toString() {
00286         return (string)$this->id;
00287     }
00288 }
00289 
00293 class DatabasePostgres extends DatabaseBase {
00295     protected $mLastResult = null;
00296 
00298     protected $mAffectedRows = null;
00299 
00301     private $mInsertId = null;
00302 
00304     private $numericVersion = null;
00305 
00307     private $connectString;
00308 
00310     private $mTransactionState;
00311 
00313     private $mCoreSchema;
00314 
00315     function getType() {
00316         return 'postgres';
00317     }
00318 
00319     function cascadingDeletes() {
00320         return true;
00321     }
00322 
00323     function cleanupTriggers() {
00324         return true;
00325     }
00326 
00327     function strictIPs() {
00328         return true;
00329     }
00330 
00331     function realTimestamps() {
00332         return true;
00333     }
00334 
00335     function implicitGroupby() {
00336         return false;
00337     }
00338 
00339     function implicitOrderby() {
00340         return false;
00341     }
00342 
00343     function searchableIPs() {
00344         return true;
00345     }
00346 
00347     function functionalIndexes() {
00348         return true;
00349     }
00350 
00351     function hasConstraint( $name ) {
00352         $sql = "SELECT 1 FROM pg_catalog.pg_constraint c, pg_catalog.pg_namespace n " .
00353             "WHERE c.connamespace = n.oid AND conname = '" .
00354             pg_escape_string( $this->mConn, $name ) . "' AND n.nspname = '" .
00355             pg_escape_string( $this->mConn, $this->getCoreSchema() ) . "'";
00356         $res = $this->doQuery( $sql );
00357 
00358         return $this->numRows( $res );
00359     }
00360 
00370     function open( $server, $user, $password, $dbName ) {
00371         # Test for Postgres support, to avoid suppressed fatal error
00372         if ( !function_exists( 'pg_connect' ) ) {
00373             throw new DBConnectionError(
00374                 $this,
00375                 "Postgres functions missing, have you compiled PHP with the --with-pgsql\n" .
00376                 "option? (Note: if you recently installed PHP, you may need to restart your\n" .
00377                 "webserver and database)\n"
00378             );
00379         }
00380 
00381         global $wgDBport;
00382 
00383         if ( !strlen( $user ) ) { # e.g. the class is being loaded
00384             return null;
00385         }
00386 
00387         $this->mServer = $server;
00388         $port = $wgDBport;
00389         $this->mUser = $user;
00390         $this->mPassword = $password;
00391         $this->mDBname = $dbName;
00392 
00393         $connectVars = array(
00394             'dbname' => $dbName,
00395             'user' => $user,
00396             'password' => $password
00397         );
00398         if ( $server != false && $server != '' ) {
00399             $connectVars['host'] = $server;
00400         }
00401         if ( $port != false && $port != '' ) {
00402             $connectVars['port'] = $port;
00403         }
00404         if ( $this->mFlags & DBO_SSL ) {
00405             $connectVars['sslmode'] = 1;
00406         }
00407 
00408         $this->connectString = $this->makeConnectionString( $connectVars, PGSQL_CONNECT_FORCE_NEW );
00409         $this->close();
00410         $this->installErrorHandler();
00411 
00412         try {
00413             $this->mConn = pg_connect( $this->connectString );
00414         } catch ( Exception $ex ) {
00415             $this->restoreErrorHandler();
00416             throw $ex;
00417         }
00418 
00419         $phpError = $this->restoreErrorHandler();
00420 
00421         if ( !$this->mConn ) {
00422             wfDebug( "DB connection error\n" );
00423             wfDebug( "Server: $server, Database: $dbName, User: $user, Password: " .
00424                 substr( $password, 0, 3 ) . "...\n" );
00425             wfDebug( $this->lastError() . "\n" );
00426             throw new DBConnectionError( $this, str_replace( "\n", ' ', $phpError ) );
00427         }
00428 
00429         $this->mOpened = true;
00430         $this->mTransactionState = new PostgresTransactionState( $this->mConn );
00431 
00432         global $wgCommandLineMode;
00433         # If called from the command-line (e.g. importDump), only show errors
00434         if ( $wgCommandLineMode ) {
00435             $this->doQuery( "SET client_min_messages = 'ERROR'" );
00436         }
00437 
00438         $this->query( "SET client_encoding='UTF8'", __METHOD__ );
00439         $this->query( "SET datestyle = 'ISO, YMD'", __METHOD__ );
00440         $this->query( "SET timezone = 'GMT'", __METHOD__ );
00441         $this->query( "SET standard_conforming_strings = on", __METHOD__ );
00442         if ( $this->getServerVersion() >= 9.0 ) {
00443             $this->query( "SET bytea_output = 'escape'", __METHOD__ ); // PHP bug 53127
00444         }
00445 
00446         global $wgDBmwschema;
00447         $this->determineCoreSchema( $wgDBmwschema );
00448 
00449         return $this->mConn;
00450     }
00451 
00458     function selectDB( $db ) {
00459         if ( $this->mDBname !== $db ) {
00460             return (bool)$this->open( $this->mServer, $this->mUser, $this->mPassword, $db );
00461         } else {
00462             return true;
00463         }
00464     }
00465 
00466     function makeConnectionString( $vars ) {
00467         $s = '';
00468         foreach ( $vars as $name => $value ) {
00469             $s .= "$name='" . str_replace( "'", "\\'", $value ) . "' ";
00470         }
00471 
00472         return $s;
00473     }
00474 
00480     protected function closeConnection() {
00481         return pg_close( $this->mConn );
00482     }
00483 
00484     public function doQuery( $sql ) {
00485         if ( function_exists( 'mb_convert_encoding' ) ) {
00486             $sql = mb_convert_encoding( $sql, 'UTF-8' );
00487         }
00488         $this->mTransactionState->check();
00489         if ( pg_send_query( $this->mConn, $sql ) === false ) {
00490             throw new DBUnexpectedError( $this, "Unable to post new query to PostgreSQL\n" );
00491         }
00492         $this->mLastResult = pg_get_result( $this->mConn );
00493         $this->mTransactionState->check();
00494         $this->mAffectedRows = null;
00495         if ( pg_result_error( $this->mLastResult ) ) {
00496             return false;
00497         }
00498 
00499         return $this->mLastResult;
00500     }
00501 
00502     protected function dumpError() {
00503         $diags = array(
00504             PGSQL_DIAG_SEVERITY,
00505             PGSQL_DIAG_SQLSTATE,
00506             PGSQL_DIAG_MESSAGE_PRIMARY,
00507             PGSQL_DIAG_MESSAGE_DETAIL,
00508             PGSQL_DIAG_MESSAGE_HINT,
00509             PGSQL_DIAG_STATEMENT_POSITION,
00510             PGSQL_DIAG_INTERNAL_POSITION,
00511             PGSQL_DIAG_INTERNAL_QUERY,
00512             PGSQL_DIAG_CONTEXT,
00513             PGSQL_DIAG_SOURCE_FILE,
00514             PGSQL_DIAG_SOURCE_LINE,
00515             PGSQL_DIAG_SOURCE_FUNCTION
00516         );
00517         foreach ( $diags as $d ) {
00518             wfDebug( sprintf( "PgSQL ERROR(%d): %s\n",
00519                 $d, pg_result_error_field( $this->mLastResult, $d ) ) );
00520         }
00521     }
00522 
00523     function reportQueryError( $error, $errno, $sql, $fname, $tempIgnore = false ) {
00524         /* Transaction stays in the ERROR state until rolledback */
00525         if ( $tempIgnore ) {
00526             /* Check for constraint violation */
00527             if ( $errno === '23505' ) {
00528                 parent::reportQueryError( $error, $errno, $sql, $fname, $tempIgnore );
00529 
00530                 return;
00531             }
00532         }
00533         /* Don't ignore serious errors */
00534         $this->rollback( __METHOD__ );
00535         parent::reportQueryError( $error, $errno, $sql, $fname, false );
00536     }
00537 
00538     function queryIgnore( $sql, $fname = __METHOD__ ) {
00539         return $this->query( $sql, $fname, true );
00540     }
00541 
00546     function freeResult( $res ) {
00547         if ( $res instanceof ResultWrapper ) {
00548             $res = $res->result;
00549         }
00550         wfSuppressWarnings();
00551         $ok = pg_free_result( $res );
00552         wfRestoreWarnings();
00553         if ( !$ok ) {
00554             throw new DBUnexpectedError( $this, "Unable to free Postgres result\n" );
00555         }
00556     }
00557 
00563     function fetchObject( $res ) {
00564         if ( $res instanceof ResultWrapper ) {
00565             $res = $res->result;
00566         }
00567         wfSuppressWarnings();
00568         $row = pg_fetch_object( $res );
00569         wfRestoreWarnings();
00570         # @todo FIXME: HACK HACK HACK HACK debug
00571 
00572         # @todo hashar: not sure if the following test really trigger if the object
00573         #          fetching failed.
00574         if ( pg_last_error( $this->mConn ) ) {
00575             throw new DBUnexpectedError(
00576                 $this,
00577                 'SQL error: ' . htmlspecialchars( pg_last_error( $this->mConn ) )
00578             );
00579         }
00580 
00581         return $row;
00582     }
00583 
00584     function fetchRow( $res ) {
00585         if ( $res instanceof ResultWrapper ) {
00586             $res = $res->result;
00587         }
00588         wfSuppressWarnings();
00589         $row = pg_fetch_array( $res );
00590         wfRestoreWarnings();
00591         if ( pg_last_error( $this->mConn ) ) {
00592             throw new DBUnexpectedError(
00593                 $this,
00594                 'SQL error: ' . htmlspecialchars( pg_last_error( $this->mConn ) )
00595             );
00596         }
00597 
00598         return $row;
00599     }
00600 
00601     function numRows( $res ) {
00602         if ( $res instanceof ResultWrapper ) {
00603             $res = $res->result;
00604         }
00605         wfSuppressWarnings();
00606         $n = pg_num_rows( $res );
00607         wfRestoreWarnings();
00608         if ( pg_last_error( $this->mConn ) ) {
00609             throw new DBUnexpectedError(
00610                 $this,
00611                 'SQL error: ' . htmlspecialchars( pg_last_error( $this->mConn ) )
00612             );
00613         }
00614 
00615         return $n;
00616     }
00617 
00618     function numFields( $res ) {
00619         if ( $res instanceof ResultWrapper ) {
00620             $res = $res->result;
00621         }
00622 
00623         return pg_num_fields( $res );
00624     }
00625 
00626     function fieldName( $res, $n ) {
00627         if ( $res instanceof ResultWrapper ) {
00628             $res = $res->result;
00629         }
00630 
00631         return pg_field_name( $res, $n );
00632     }
00633 
00640     function insertId() {
00641         return $this->mInsertId;
00642     }
00643 
00649     function dataSeek( $res, $row ) {
00650         if ( $res instanceof ResultWrapper ) {
00651             $res = $res->result;
00652         }
00653 
00654         return pg_result_seek( $res, $row );
00655     }
00656 
00657     function lastError() {
00658         if ( $this->mConn ) {
00659             if ( $this->mLastResult ) {
00660                 return pg_result_error( $this->mLastResult );
00661             } else {
00662                 return pg_last_error();
00663             }
00664         } else {
00665             return 'No database connection';
00666         }
00667     }
00668 
00669     function lastErrno() {
00670         if ( $this->mLastResult ) {
00671             return pg_result_error_field( $this->mLastResult, PGSQL_DIAG_SQLSTATE );
00672         } else {
00673             return false;
00674         }
00675     }
00676 
00677     function affectedRows() {
00678         if ( !is_null( $this->mAffectedRows ) ) {
00679             // Forced result for simulated queries
00680             return $this->mAffectedRows;
00681         }
00682         if ( empty( $this->mLastResult ) ) {
00683             return 0;
00684         }
00685 
00686         return pg_affected_rows( $this->mLastResult );
00687     }
00688 
00703     function estimateRowCount( $table, $vars = '*', $conds = '',
00704         $fname = __METHOD__, $options = array()
00705     ) {
00706         $options['EXPLAIN'] = true;
00707         $res = $this->select( $table, $vars, $conds, $fname, $options );
00708         $rows = -1;
00709         if ( $res ) {
00710             $row = $this->fetchRow( $res );
00711             $count = array();
00712             if ( preg_match( '/rows=(\d+)/', $row[0], $count ) ) {
00713                 $rows = $count[1];
00714             }
00715         }
00716 
00717         return $rows;
00718     }
00719 
00729     function indexInfo( $table, $index, $fname = __METHOD__ ) {
00730         $sql = "SELECT indexname FROM pg_indexes WHERE tablename='$table'";
00731         $res = $this->query( $sql, $fname );
00732         if ( !$res ) {
00733             return null;
00734         }
00735         foreach ( $res as $row ) {
00736             if ( $row->indexname == $this->indexName( $index ) ) {
00737                 return $row;
00738             }
00739         }
00740 
00741         return false;
00742     }
00743 
00752     function indexAttributes( $index, $schema = false ) {
00753         if ( $schema === false ) {
00754             $schema = $this->getCoreSchema();
00755         }
00756         /*
00757          * A subquery would be not needed if we didn't care about the order
00758          * of attributes, but we do
00759          */
00760         $sql = <<<__INDEXATTR__
00761 
00762             SELECT opcname,
00763                 attname,
00764                 i.indoption[s.g] as option,
00765                 pg_am.amname
00766             FROM
00767                 (SELECT generate_series(array_lower(isub.indkey,1), array_upper(isub.indkey,1)) AS g
00768                     FROM
00769                         pg_index isub
00770                     JOIN pg_class cis
00771                         ON cis.oid=isub.indexrelid
00772                     JOIN pg_namespace ns
00773                         ON cis.relnamespace = ns.oid
00774                     WHERE cis.relname='$index' AND ns.nspname='$schema') AS s,
00775                 pg_attribute,
00776                 pg_opclass opcls,
00777                 pg_am,
00778                 pg_class ci
00779                 JOIN pg_index i
00780                     ON ci.oid=i.indexrelid
00781                 JOIN pg_class ct
00782                     ON ct.oid = i.indrelid
00783                 JOIN pg_namespace n
00784                     ON ci.relnamespace = n.oid
00785                 WHERE
00786                     ci.relname='$index' AND n.nspname='$schema'
00787                     AND attrelid = ct.oid
00788                     AND i.indkey[s.g] = attnum
00789                     AND i.indclass[s.g] = opcls.oid
00790                     AND pg_am.oid = opcls.opcmethod
00791 __INDEXATTR__;
00792         $res = $this->query( $sql, __METHOD__ );
00793         $a = array();
00794         if ( $res ) {
00795             foreach ( $res as $row ) {
00796                 $a[] = array(
00797                     $row->attname,
00798                     $row->opcname,
00799                     $row->amname,
00800                     $row->option );
00801             }
00802         } else {
00803             return null;
00804         }
00805 
00806         return $a;
00807     }
00808 
00809     function indexUnique( $table, $index, $fname = __METHOD__ ) {
00810         $sql = "SELECT indexname FROM pg_indexes WHERE tablename='{$table}'" .
00811             " AND indexdef LIKE 'CREATE UNIQUE%(" .
00812             $this->strencode( $this->indexName( $index ) ) .
00813             ")'";
00814         $res = $this->query( $sql, $fname );
00815         if ( !$res ) {
00816             return null;
00817         }
00818 
00819         return $res->numRows() > 0;
00820     }
00821 
00832     function selectSQLText( $table, $vars, $conds = '', $fname = __METHOD__,
00833         $options = array(), $join_conds = array()
00834     ) {
00835         if ( is_array( $options ) ) {
00836             $forUpdateKey = array_search( 'FOR UPDATE', $options );
00837             if ( $forUpdateKey !== false && $join_conds ) {
00838                 unset( $options[$forUpdateKey] );
00839 
00840                 foreach ( $join_conds as $table_cond => $join_cond ) {
00841                     if ( 0 === preg_match( '/^(?:LEFT|RIGHT|FULL)(?: OUTER)? JOIN$/i', $join_cond[0] ) ) {
00842                         $options['FOR UPDATE'][] = $table_cond;
00843                     }
00844                 }
00845             }
00846 
00847             if ( isset( $options['ORDER BY'] ) && $options['ORDER BY'] == 'NULL' ) {
00848                 unset( $options['ORDER BY'] );
00849             }
00850         }
00851 
00852         return parent::selectSQLText( $table, $vars, $conds, $fname, $options, $join_conds );
00853     }
00854 
00867     function insert( $table, $args, $fname = __METHOD__, $options = array() ) {
00868         if ( !count( $args ) ) {
00869             return true;
00870         }
00871 
00872         $table = $this->tableName( $table );
00873         if ( !isset( $this->numericVersion ) ) {
00874             $this->getServerVersion();
00875         }
00876 
00877         if ( !is_array( $options ) ) {
00878             $options = array( $options );
00879         }
00880 
00881         if ( isset( $args[0] ) && is_array( $args[0] ) ) {
00882             $multi = true;
00883             $keys = array_keys( $args[0] );
00884         } else {
00885             $multi = false;
00886             $keys = array_keys( $args );
00887         }
00888 
00889         // If IGNORE is set, we use savepoints to emulate mysql's behavior
00890         $savepoint = null;
00891         if ( in_array( 'IGNORE', $options ) ) {
00892             $savepoint = new SavepointPostgres( $this, 'mw' );
00893             $olde = error_reporting( 0 );
00894             // For future use, we may want to track the number of actual inserts
00895             // Right now, insert (all writes) simply return true/false
00896             $numrowsinserted = 0;
00897         }
00898 
00899         $sql = "INSERT INTO $table (" . implode( ',', $keys ) . ') VALUES ';
00900 
00901         if ( $multi ) {
00902             if ( $this->numericVersion >= 8.2 && !$savepoint ) {
00903                 $first = true;
00904                 foreach ( $args as $row ) {
00905                     if ( $first ) {
00906                         $first = false;
00907                     } else {
00908                         $sql .= ',';
00909                     }
00910                     $sql .= '(' . $this->makeList( $row ) . ')';
00911                 }
00912                 $res = (bool)$this->query( $sql, $fname, $savepoint );
00913             } else {
00914                 $res = true;
00915                 $origsql = $sql;
00916                 foreach ( $args as $row ) {
00917                     $tempsql = $origsql;
00918                     $tempsql .= '(' . $this->makeList( $row ) . ')';
00919 
00920                     if ( $savepoint ) {
00921                         $savepoint->savepoint();
00922                     }
00923 
00924                     $tempres = (bool)$this->query( $tempsql, $fname, $savepoint );
00925 
00926                     if ( $savepoint ) {
00927                         $bar = pg_last_error();
00928                         if ( $bar != false ) {
00929                             $savepoint->rollback();
00930                         } else {
00931                             $savepoint->release();
00932                             $numrowsinserted++;
00933                         }
00934                     }
00935 
00936                     // If any of them fail, we fail overall for this function call
00937                     // Note that this will be ignored if IGNORE is set
00938                     if ( !$tempres ) {
00939                         $res = false;
00940                     }
00941                 }
00942             }
00943         } else {
00944             // Not multi, just a lone insert
00945             if ( $savepoint ) {
00946                 $savepoint->savepoint();
00947             }
00948 
00949             $sql .= '(' . $this->makeList( $args ) . ')';
00950             $res = (bool)$this->query( $sql, $fname, $savepoint );
00951             if ( $savepoint ) {
00952                 $bar = pg_last_error();
00953                 if ( $bar != false ) {
00954                     $savepoint->rollback();
00955                 } else {
00956                     $savepoint->release();
00957                     $numrowsinserted++;
00958                 }
00959             }
00960         }
00961         if ( $savepoint ) {
00962             error_reporting( $olde );
00963             $savepoint->commit();
00964 
00965             // Set the affected row count for the whole operation
00966             $this->mAffectedRows = $numrowsinserted;
00967 
00968             // IGNORE always returns true
00969             return true;
00970         }
00971 
00972         return $res;
00973     }
00974 
00993     function insertSelect( $destTable, $srcTable, $varMap, $conds, $fname = __METHOD__,
00994         $insertOptions = array(), $selectOptions = array() ) {
00995         $destTable = $this->tableName( $destTable );
00996 
00997         if ( !is_array( $insertOptions ) ) {
00998             $insertOptions = array( $insertOptions );
00999         }
01000 
01001         /*
01002          * If IGNORE is set, we use savepoints to emulate mysql's behavior
01003          * Ignore LOW PRIORITY option, since it is MySQL-specific
01004          */
01005         $savepoint = null;
01006         if ( in_array( 'IGNORE', $insertOptions ) ) {
01007             $savepoint = new SavepointPostgres( $this, 'mw' );
01008             $olde = error_reporting( 0 );
01009             $numrowsinserted = 0;
01010             $savepoint->savepoint();
01011         }
01012 
01013         if ( !is_array( $selectOptions ) ) {
01014             $selectOptions = array( $selectOptions );
01015         }
01016         list( $startOpts, $useIndex, $tailOpts ) = $this->makeSelectOptions( $selectOptions );
01017         if ( is_array( $srcTable ) ) {
01018             $srcTable = implode( ',', array_map( array( &$this, 'tableName' ), $srcTable ) );
01019         } else {
01020             $srcTable = $this->tableName( $srcTable );
01021         }
01022 
01023         $sql = "INSERT INTO $destTable (" . implode( ',', array_keys( $varMap ) ) . ')' .
01024             " SELECT $startOpts " . implode( ',', $varMap ) .
01025             " FROM $srcTable $useIndex";
01026 
01027         if ( $conds != '*' ) {
01028             $sql .= ' WHERE ' . $this->makeList( $conds, LIST_AND );
01029         }
01030 
01031         $sql .= " $tailOpts";
01032 
01033         $res = (bool)$this->query( $sql, $fname, $savepoint );
01034         if ( $savepoint ) {
01035             $bar = pg_last_error();
01036             if ( $bar != false ) {
01037                 $savepoint->rollback();
01038             } else {
01039                 $savepoint->release();
01040                 $numrowsinserted++;
01041             }
01042             error_reporting( $olde );
01043             $savepoint->commit();
01044 
01045             // Set the affected row count for the whole operation
01046             $this->mAffectedRows = $numrowsinserted;
01047 
01048             // IGNORE always returns true
01049             return true;
01050         }
01051 
01052         return $res;
01053     }
01054 
01055     function tableName( $name, $format = 'quoted' ) {
01056         # Replace reserved words with better ones
01057         switch ( $name ) {
01058             case 'user':
01059                 return $this->realTableName( 'mwuser', $format );
01060             case 'text':
01061                 return $this->realTableName( 'pagecontent', $format );
01062             default:
01063                 return $this->realTableName( $name, $format );
01064         }
01065     }
01066 
01067     /* Don't cheat on installer */
01068     function realTableName( $name, $format = 'quoted' ) {
01069         return parent::tableName( $name, $format );
01070     }
01071 
01078     function nextSequenceValue( $seqName ) {
01079         $safeseq = str_replace( "'", "''", $seqName );
01080         $res = $this->query( "SELECT nextval('$safeseq')" );
01081         $row = $this->fetchRow( $res );
01082         $this->mInsertId = $row[0];
01083 
01084         return $this->mInsertId;
01085     }
01086 
01093     function currentSequenceValue( $seqName ) {
01094         $safeseq = str_replace( "'", "''", $seqName );
01095         $res = $this->query( "SELECT currval('$safeseq')" );
01096         $row = $this->fetchRow( $res );
01097         $currval = $row[0];
01098 
01099         return $currval;
01100     }
01101 
01102     # Returns the size of a text field, or -1 for "unlimited"
01103     function textFieldSize( $table, $field ) {
01104         $table = $this->tableName( $table );
01105         $sql = "SELECT t.typname as ftype,a.atttypmod as size
01106             FROM pg_class c, pg_attribute a, pg_type t
01107             WHERE relname='$table' AND a.attrelid=c.oid AND
01108                 a.atttypid=t.oid and a.attname='$field'";
01109         $res = $this->query( $sql );
01110         $row = $this->fetchObject( $res );
01111         if ( $row->ftype == 'varchar' ) {
01112             $size = $row->size - 4;
01113         } else {
01114             $size = $row->size;
01115         }
01116 
01117         return $size;
01118     }
01119 
01120     function limitResult( $sql, $limit, $offset = false ) {
01121         return "$sql LIMIT $limit " . ( is_numeric( $offset ) ? " OFFSET {$offset} " : '' );
01122     }
01123 
01124     function wasDeadlock() {
01125         return $this->lastErrno() == '40P01';
01126     }
01127 
01128     function duplicateTableStructure( $oldName, $newName, $temporary = false, $fname = __METHOD__ ) {
01129         $newName = $this->addIdentifierQuotes( $newName );
01130         $oldName = $this->addIdentifierQuotes( $oldName );
01131 
01132         return $this->query( 'CREATE ' . ( $temporary ? 'TEMPORARY ' : '' ) . " TABLE $newName " .
01133             "(LIKE $oldName INCLUDING DEFAULTS)", $fname );
01134     }
01135 
01136     function listTables( $prefix = null, $fname = __METHOD__ ) {
01137         $eschema = $this->addQuotes( $this->getCoreSchema() );
01138         $result = $this->query( "SELECT tablename FROM pg_tables WHERE schemaname = $eschema", $fname );
01139         $endArray = array();
01140 
01141         foreach ( $result as $table ) {
01142             $vars = get_object_vars( $table );
01143             $table = array_pop( $vars );
01144             if ( !$prefix || strpos( $table, $prefix ) === 0 ) {
01145                 $endArray[] = $table;
01146             }
01147         }
01148 
01149         return $endArray;
01150     }
01151 
01152     function timestamp( $ts = 0 ) {
01153         return wfTimestamp( TS_POSTGRES, $ts );
01154     }
01155 
01156     /*
01157      * Posted by cc[plus]php[at]c2se[dot]com on 25-Mar-2009 09:12
01158      * to http://www.php.net/manual/en/ref.pgsql.php
01159      *
01160      * Parsing a postgres array can be a tricky problem, he's my
01161      * take on this, it handles multi-dimensional arrays plus
01162      * escaping using a nasty regexp to determine the limits of each
01163      * data-item.
01164      *
01165      * This should really be handled by PHP PostgreSQL module
01166      *
01167      * @since 1.19
01168      * @param string $text Postgreql array returned in a text form like {a,b}
01169      * @param string $output
01170      * @param int $limit
01171      * @param int $offset
01172      * @return string
01173      */
01174     function pg_array_parse( $text, &$output, $limit = false, $offset = 1 ) {
01175         if ( false === $limit ) {
01176             $limit = strlen( $text ) - 1;
01177             $output = array();
01178         }
01179         if ( '{}' == $text ) {
01180             return $output;
01181         }
01182         do {
01183             if ( '{' != $text[$offset] ) {
01184                 preg_match( "/(\\{?\"([^\"\\\\]|\\\\.)*\"|[^,{}]+)+([,}]+)/",
01185                     $text, $match, 0, $offset );
01186                 $offset += strlen( $match[0] );
01187                 $output[] = ( '"' != $match[1][0]
01188                     ? $match[1]
01189                     : stripcslashes( substr( $match[1], 1, -1 ) ) );
01190                 if ( '},' == $match[3] ) {
01191                     return $output;
01192                 }
01193             } else {
01194                 $offset = $this->pg_array_parse( $text, $output, $limit, $offset + 1 );
01195             }
01196         } while ( $limit > $offset );
01197 
01198         return $output;
01199     }
01200 
01204     public function aggregateValue( $valuedata, $valuename = 'value' ) {
01205         return $valuedata;
01206     }
01207 
01211     public function getSoftwareLink() {
01212         return '[{{int:version-db-postgres-url}} PostgreSQL]';
01213     }
01214 
01222     function getCurrentSchema() {
01223         $res = $this->query( "SELECT current_schema()", __METHOD__ );
01224         $row = $this->fetchRow( $res );
01225 
01226         return $row[0];
01227     }
01228 
01239     function getSchemas() {
01240         $res = $this->query( "SELECT current_schemas(false)", __METHOD__ );
01241         $row = $this->fetchRow( $res );
01242         $schemas = array();
01243 
01244         /* PHP pgsql support does not support array type, "{a,b}" string is returned */
01245 
01246         return $this->pg_array_parse( $row[0], $schemas );
01247     }
01248 
01258     function getSearchPath() {
01259         $res = $this->query( "SHOW search_path", __METHOD__ );
01260         $row = $this->fetchRow( $res );
01261 
01262         /* PostgreSQL returns SHOW values as strings */
01263 
01264         return explode( ",", $row[0] );
01265     }
01266 
01274     function setSearchPath( $search_path ) {
01275         $this->query( "SET search_path = " . implode( ", ", $search_path ) );
01276     }
01277 
01292     function determineCoreSchema( $desiredSchema ) {
01293         $this->begin( __METHOD__ );
01294         if ( $this->schemaExists( $desiredSchema ) ) {
01295             if ( in_array( $desiredSchema, $this->getSchemas() ) ) {
01296                 $this->mCoreSchema = $desiredSchema;
01297                 wfDebug( "Schema \"" . $desiredSchema . "\" already in the search path\n" );
01298             } else {
01304                 $search_path = $this->getSearchPath();
01305                 array_unshift( $search_path,
01306                     $this->addIdentifierQuotes( $desiredSchema ) );
01307                 $this->setSearchPath( $search_path );
01308                 $this->mCoreSchema = $desiredSchema;
01309                 wfDebug( "Schema \"" . $desiredSchema . "\" added to the search path\n" );
01310             }
01311         } else {
01312             $this->mCoreSchema = $this->getCurrentSchema();
01313             wfDebug( "Schema \"" . $desiredSchema . "\" not found, using current \"" .
01314                 $this->mCoreSchema . "\"\n" );
01315         }
01316         /* Commit SET otherwise it will be rollbacked on error or IGNORE SELECT */
01317         $this->commit( __METHOD__ );
01318     }
01319 
01326     function getCoreSchema() {
01327         return $this->mCoreSchema;
01328     }
01329 
01333     function getServerVersion() {
01334         if ( !isset( $this->numericVersion ) ) {
01335             $versionInfo = pg_version( $this->mConn );
01336             if ( version_compare( $versionInfo['client'], '7.4.0', 'lt' ) ) {
01337                 // Old client, abort install
01338                 $this->numericVersion = '7.3 or earlier';
01339             } elseif ( isset( $versionInfo['server'] ) ) {
01340                 // Normal client
01341                 $this->numericVersion = $versionInfo['server'];
01342             } else {
01343                 // Bug 16937: broken pgsql extension from PHP<5.3
01344                 $this->numericVersion = pg_parameter_status( $this->mConn, 'server_version' );
01345             }
01346         }
01347 
01348         return $this->numericVersion;
01349     }
01350 
01359     function relationExists( $table, $types, $schema = false ) {
01360         if ( !is_array( $types ) ) {
01361             $types = array( $types );
01362         }
01363         if ( !$schema ) {
01364             $schema = $this->getCoreSchema();
01365         }
01366         $table = $this->realTableName( $table, 'raw' );
01367         $etable = $this->addQuotes( $table );
01368         $eschema = $this->addQuotes( $schema );
01369         $sql = "SELECT 1 FROM pg_catalog.pg_class c, pg_catalog.pg_namespace n "
01370             . "WHERE c.relnamespace = n.oid AND c.relname = $etable AND n.nspname = $eschema "
01371             . "AND c.relkind IN ('" . implode( "','", $types ) . "')";
01372         $res = $this->query( $sql );
01373         $count = $res ? $res->numRows() : 0;
01374 
01375         return (bool)$count;
01376     }
01377 
01386     function tableExists( $table, $fname = __METHOD__, $schema = false ) {
01387         return $this->relationExists( $table, array( 'r', 'v' ), $schema );
01388     }
01389 
01390     function sequenceExists( $sequence, $schema = false ) {
01391         return $this->relationExists( $sequence, 'S', $schema );
01392     }
01393 
01394     function triggerExists( $table, $trigger ) {
01395         $q = <<<SQL
01396     SELECT 1 FROM pg_class, pg_namespace, pg_trigger
01397         WHERE relnamespace=pg_namespace.oid AND relkind='r'
01398               AND tgrelid=pg_class.oid
01399               AND nspname=%s AND relname=%s AND tgname=%s
01400 SQL;
01401         $res = $this->query(
01402             sprintf(
01403                 $q,
01404                 $this->addQuotes( $this->getCoreSchema() ),
01405                 $this->addQuotes( $table ),
01406                 $this->addQuotes( $trigger )
01407             )
01408         );
01409         if ( !$res ) {
01410             return null;
01411         }
01412         $rows = $res->numRows();
01413 
01414         return $rows;
01415     }
01416 
01417     function ruleExists( $table, $rule ) {
01418         $exists = $this->selectField( 'pg_rules', 'rulename',
01419             array(
01420                 'rulename' => $rule,
01421                 'tablename' => $table,
01422                 'schemaname' => $this->getCoreSchema()
01423             )
01424         );
01425 
01426         return $exists === $rule;
01427     }
01428 
01429     function constraintExists( $table, $constraint ) {
01430         $sql = sprintf( "SELECT 1 FROM information_schema.table_constraints " .
01431             "WHERE constraint_schema = %s AND table_name = %s AND constraint_name = %s",
01432             $this->addQuotes( $this->getCoreSchema() ),
01433             $this->addQuotes( $table ),
01434             $this->addQuotes( $constraint )
01435         );
01436         $res = $this->query( $sql );
01437         if ( !$res ) {
01438             return null;
01439         }
01440         $rows = $res->numRows();
01441 
01442         return $rows;
01443     }
01444 
01450     function schemaExists( $schema ) {
01451         $exists = $this->selectField( '"pg_catalog"."pg_namespace"', 1,
01452             array( 'nspname' => $schema ), __METHOD__ );
01453 
01454         return (bool)$exists;
01455     }
01456 
01462     function roleExists( $roleName ) {
01463         $exists = $this->selectField( '"pg_catalog"."pg_roles"', 1,
01464             array( 'rolname' => $roleName ), __METHOD__ );
01465 
01466         return (bool)$exists;
01467     }
01468 
01469     function fieldInfo( $table, $field ) {
01470         return PostgresField::fromText( $this, $table, $field );
01471     }
01472 
01479     function fieldType( $res, $index ) {
01480         if ( $res instanceof ResultWrapper ) {
01481             $res = $res->result;
01482         }
01483 
01484         return pg_field_type( $res, $index );
01485     }
01486 
01491     function encodeBlob( $b ) {
01492         return new Blob( pg_escape_bytea( $this->mConn, $b ) );
01493     }
01494 
01495     function decodeBlob( $b ) {
01496         if ( $b instanceof Blob ) {
01497             $b = $b->fetch();
01498         }
01499 
01500         return pg_unescape_bytea( $b );
01501     }
01502 
01503     function strencode( $s ) { # Should not be called by us
01504         return pg_escape_string( $this->mConn, $s );
01505     }
01506 
01511     function addQuotes( $s ) {
01512         if ( is_null( $s ) ) {
01513             return 'NULL';
01514         } elseif ( is_bool( $s ) ) {
01515             return intval( $s );
01516         } elseif ( $s instanceof Blob ) {
01517             return "'" . $s->fetch( $s ) . "'";
01518         }
01519 
01520         return "'" . pg_escape_string( $this->mConn, $s ) . "'";
01521     }
01522 
01530     protected function replaceVars( $ins ) {
01531         $ins = parent::replaceVars( $ins );
01532 
01533         if ( $this->numericVersion >= 8.3 ) {
01534             // Thanks for not providing backwards-compatibility, 8.3
01535             $ins = preg_replace( "/to_tsvector\s*\(\s*'default'\s*,/", 'to_tsvector(', $ins );
01536         }
01537 
01538         if ( $this->numericVersion <= 8.1 ) { // Our minimum version
01539             $ins = str_replace( 'USING gin', 'USING gist', $ins );
01540         }
01541 
01542         return $ins;
01543     }
01544 
01552     function makeSelectOptions( $options ) {
01553         $preLimitTail = $postLimitTail = '';
01554         $startOpts = $useIndex = '';
01555 
01556         $noKeyOptions = array();
01557         foreach ( $options as $key => $option ) {
01558             if ( is_numeric( $key ) ) {
01559                 $noKeyOptions[$option] = true;
01560             }
01561         }
01562 
01563         $preLimitTail .= $this->makeGroupByWithHaving( $options );
01564 
01565         $preLimitTail .= $this->makeOrderBy( $options );
01566 
01567         //if ( isset( $options['LIMIT'] ) ) {
01568         //  $tailOpts .= $this->limitResult( '', $options['LIMIT'],
01569         //      isset( $options['OFFSET'] ) ? $options['OFFSET']
01570         //      : false );
01571         //}
01572 
01573         if ( isset( $options['FOR UPDATE'] ) ) {
01574             $postLimitTail .= ' FOR UPDATE OF ' . implode( ', ', $options['FOR UPDATE'] );
01575         } elseif ( isset( $noKeyOptions['FOR UPDATE'] ) ) {
01576             $postLimitTail .= ' FOR UPDATE';
01577         }
01578 
01579         if ( isset( $noKeyOptions['DISTINCT'] ) || isset( $noKeyOptions['DISTINCTROW'] ) ) {
01580             $startOpts .= 'DISTINCT';
01581         }
01582 
01583         return array( $startOpts, $useIndex, $preLimitTail, $postLimitTail );
01584     }
01585 
01586     function getDBname() {
01587         return $this->mDBname;
01588     }
01589 
01590     function getServer() {
01591         return $this->mServer;
01592     }
01593 
01594     function buildConcat( $stringList ) {
01595         return implode( ' || ', $stringList );
01596     }
01597 
01598     public function buildGroupConcatField(
01599         $delimiter, $table, $field, $conds = '', $options = array(), $join_conds = array()
01600     ) {
01601         $fld = "array_to_string(array_agg($field)," . $this->addQuotes( $delimiter ) . ')';
01602 
01603         return '(' . $this->selectSQLText( $table, $fld, $conds, null, array(), $join_conds ) . ')';
01604     }
01605 
01606     public function getSearchEngine() {
01607         return 'SearchPostgres';
01608     }
01609 
01610     public function streamStatementEnd( &$sql, &$newLine ) {
01611         # Allow dollar quoting for function declarations
01612         if ( substr( $newLine, 0, 4 ) == '$mw$' ) {
01613             if ( $this->delimiter ) {
01614                 $this->delimiter = false;
01615             } else {
01616                 $this->delimiter = ';';
01617             }
01618         }
01619 
01620         return parent::streamStatementEnd( $sql, $newLine );
01621     }
01622 
01632     public function lockIsFree( $lockName, $method ) {
01633         $key = $this->addQuotes( $this->bigintFromLockName( $lockName ) );
01634         $result = $this->query( "SELECT (CASE(pg_try_advisory_lock($key))
01635             WHEN 'f' THEN 'f' ELSE pg_advisory_unlock($key) END) AS lockstatus", $method );
01636         $row = $this->fetchObject( $result );
01637 
01638         return ( $row->lockstatus === 't' );
01639     }
01640 
01648     public function lock( $lockName, $method, $timeout = 5 ) {
01649         $key = $this->addQuotes( $this->bigintFromLockName( $lockName ) );
01650         for ( $attempts = 1; $attempts <= $timeout; ++$attempts ) {
01651             $result = $this->query(
01652                 "SELECT pg_try_advisory_lock($key) AS lockstatus", $method );
01653             $row = $this->fetchObject( $result );
01654             if ( $row->lockstatus === 't' ) {
01655                 return true;
01656             } else {
01657                 sleep( 1 );
01658             }
01659         }
01660         wfDebug( __METHOD__ . " failed to acquire lock\n" );
01661 
01662         return false;
01663     }
01664 
01672     public function unlock( $lockName, $method ) {
01673         $key = $this->addQuotes( $this->bigintFromLockName( $lockName ) );
01674         $result = $this->query( "SELECT pg_advisory_unlock($key) as lockstatus", $method );
01675         $row = $this->fetchObject( $result );
01676 
01677         return ( $row->lockstatus === 't' );
01678     }
01679 
01684     private function bigintFromLockName( $lockName ) {
01685         return wfBaseConvert( substr( sha1( $lockName ), 0, 15 ), 16, 10 );
01686     }
01687 } // end DatabasePostgres class