MediaWiki  REL1_22
DatabasePostgres.php
Go to the documentation of this file.
00001 <?php
00024 class PostgresField implements Field {
00025     private $name, $tablename, $type, $nullable, $max_length, $deferred, $deferrable, $conname,
00026         $has_default, $default;
00027 
00034     static function fromText( $db, $table, $field ) {
00035         $q = <<<SQL
00036 SELECT
00037  attnotnull, attlen, conname AS conname,
00038  atthasdef,
00039  adsrc,
00040  COALESCE(condeferred, 'f') AS deferred,
00041  COALESCE(condeferrable, 'f') AS deferrable,
00042  CASE WHEN typname = 'int2' THEN 'smallint'
00043   WHEN typname = 'int4' THEN 'integer'
00044   WHEN typname = 'int8' THEN 'bigint'
00045   WHEN typname = 'bpchar' THEN 'char'
00046  ELSE typname END AS typname
00047 FROM pg_class c
00048 JOIN pg_namespace n ON (n.oid = c.relnamespace)
00049 JOIN pg_attribute a ON (a.attrelid = c.oid)
00050 JOIN pg_type t ON (t.oid = a.atttypid)
00051 LEFT JOIN pg_constraint o ON (o.conrelid = c.oid AND a.attnum = ANY(o.conkey) AND o.contype = 'f')
00052 LEFT JOIN pg_attrdef d on c.oid=d.adrelid and a.attnum=d.adnum
00053 WHERE relkind = 'r'
00054 AND nspname=%s
00055 AND relname=%s
00056 AND attname=%s;
00057 SQL;
00058 
00059         $table = $db->tableName( $table, 'raw' );
00060         $res = $db->query(
00061             sprintf( $q,
00062                 $db->addQuotes( $db->getCoreSchema() ),
00063                 $db->addQuotes( $table ),
00064                 $db->addQuotes( $field )
00065             )
00066         );
00067         $row = $db->fetchObject( $res );
00068         if ( !$row ) {
00069             return null;
00070         }
00071         $n = new PostgresField;
00072         $n->type = $row->typname;
00073         $n->nullable = ( $row->attnotnull == 'f' );
00074         $n->name = $field;
00075         $n->tablename = $table;
00076         $n->max_length = $row->attlen;
00077         $n->deferrable = ( $row->deferrable == 't' );
00078         $n->deferred = ( $row->deferred == 't' );
00079         $n->conname = $row->conname;
00080         $n->has_default = ( $row->atthasdef === 't' );
00081         $n->default = $row->adsrc;
00082         return $n;
00083     }
00084 
00085     function name() {
00086         return $this->name;
00087     }
00088 
00089     function tableName() {
00090         return $this->tablename;
00091     }
00092 
00093     function type() {
00094         return $this->type;
00095     }
00096 
00097     function isNullable() {
00098         return $this->nullable;
00099     }
00100 
00101     function maxLength() {
00102         return $this->max_length;
00103     }
00104 
00105     function is_deferrable() {
00106         return $this->deferrable;
00107     }
00108 
00109     function is_deferred() {
00110         return $this->deferred;
00111     }
00112 
00113     function conname() {
00114         return $this->conname;
00115     }
00119     function defaultValue() {
00120         if ( $this->has_default ) {
00121             return $this->default;
00122         } else {
00123             return false;
00124         }
00125     }
00126 
00127 }
00128 
00136 class PostgresTransactionState {
00137 
00138     static $WATCHED = array(
00139         array(
00140             "desc" => "%s: Connection state changed from %s -> %s\n",
00141             "states" => array(
00142                 PGSQL_CONNECTION_OK => "OK",
00143                 PGSQL_CONNECTION_BAD => "BAD"
00144             )
00145         ),
00146         array(
00147             "desc" => "%s: Transaction state changed from %s -> %s\n",
00148             "states" => array(
00149                 PGSQL_TRANSACTION_IDLE => "IDLE",
00150                 PGSQL_TRANSACTION_ACTIVE => "ACTIVE",
00151                 PGSQL_TRANSACTION_INTRANS => "TRANS",
00152                 PGSQL_TRANSACTION_INERROR => "ERROR",
00153                 PGSQL_TRANSACTION_UNKNOWN => "UNKNOWN"
00154             )
00155         )
00156     );
00157 
00158     public function __construct( $conn ) {
00159         $this->mConn = $conn;
00160         $this->update();
00161         $this->mCurrentState = $this->mNewState;
00162     }
00163 
00164     public function update() {
00165         $this->mNewState = array(
00166             pg_connection_status( $this->mConn ),
00167             pg_transaction_status( $this->mConn )
00168         );
00169     }
00170 
00171     public function check() {
00172         global $wgDebugDBTransactions;
00173         $this->update();
00174         if ( $wgDebugDBTransactions ) {
00175             if ( $this->mCurrentState !== $this->mNewState ) {
00176                 $old = reset( $this->mCurrentState );
00177                 $new = reset( $this->mNewState );
00178                 foreach ( self::$WATCHED as $watched ) {
00179                     if ( $old !== $new ) {
00180                         $this->log_changed( $old, $new, $watched );
00181                     }
00182                     $old = next( $this->mCurrentState );
00183                     $new = next( $this->mNewState );
00184 
00185                 }
00186             }
00187         }
00188         $this->mCurrentState = $this->mNewState;
00189     }
00190 
00191     protected function describe_changed( $status, $desc_table ) {
00192         if ( isset( $desc_table[$status] ) ) {
00193             return $desc_table[$status];
00194         } else {
00195             return "STATUS " . $status;
00196         }
00197     }
00198 
00199     protected function log_changed( $old, $new, $watched ) {
00200         wfDebug( sprintf( $watched["desc"],
00201             $this->mConn,
00202             $this->describe_changed( $old, $watched["states"] ),
00203             $this->describe_changed( $new, $watched["states"] )
00204         ) );
00205     }
00206 }
00207 
00213 class SavepointPostgres {
00217     protected $dbw;
00218     protected $id;
00219     protected $didbegin;
00220 
00221     public function __construct( $dbw, $id ) {
00222         $this->dbw = $dbw;
00223         $this->id = $id;
00224         $this->didbegin = false;
00225         /* If we are not in a transaction, we need to be for savepoint trickery */
00226         if ( !$dbw->trxLevel() ) {
00227                 $dbw->begin( "FOR SAVEPOINT" );
00228                 $this->didbegin = true;
00229         }
00230     }
00231 
00232     public function __destruct() {
00233         if ( $this->didbegin ) {
00234             $this->dbw->rollback();
00235             $this->didbegin = false;
00236         }
00237     }
00238 
00239     public function commit() {
00240         if ( $this->didbegin ) {
00241             $this->dbw->commit();
00242             $this->didbegin = false;
00243         }
00244     }
00245 
00246     protected function query( $keyword, $msg_ok, $msg_failed ) {
00247         global $wgDebugDBTransactions;
00248         if ( $this->dbw->doQuery( $keyword . " " . $this->id ) !== false ) {
00249             if ( $wgDebugDBTransactions ) {
00250                 wfDebug( sprintf ( $msg_ok, $this->id ) );
00251             }
00252         } else {
00253             wfDebug( sprintf ( $msg_failed, $this->id ) );
00254         }
00255     }
00256 
00257     public function savepoint() {
00258         $this->query( "SAVEPOINT",
00259             "Transaction state: savepoint \"%s\" established.\n",
00260             "Transaction state: establishment of savepoint \"%s\" FAILED.\n"
00261         );
00262     }
00263 
00264     public function release() {
00265         $this->query( "RELEASE",
00266             "Transaction state: savepoint \"%s\" released.\n",
00267             "Transaction state: release of savepoint \"%s\" FAILED.\n"
00268         );
00269     }
00270 
00271     public function rollback() {
00272         $this->query( "ROLLBACK TO",
00273             "Transaction state: savepoint \"%s\" rolled back.\n",
00274             "Transaction state: rollback of savepoint \"%s\" FAILED.\n"
00275         );
00276     }
00277 
00278     public function __toString() {
00279         return (string)$this->id;
00280     }
00281 }
00282 
00286 class DatabasePostgres extends DatabaseBase {
00287     var $mInsertId = null;
00288     var $mLastResult = null;
00289     var $numeric_version = null;
00290     var $mAffectedRows = null;
00291 
00292     function getType() {
00293         return 'postgres';
00294     }
00295 
00296     function cascadingDeletes() {
00297         return true;
00298     }
00299     function cleanupTriggers() {
00300         return true;
00301     }
00302     function strictIPs() {
00303         return true;
00304     }
00305     function realTimestamps() {
00306         return true;
00307     }
00308     function implicitGroupby() {
00309         return false;
00310     }
00311     function implicitOrderby() {
00312         return false;
00313     }
00314     function searchableIPs() {
00315         return true;
00316     }
00317     function functionalIndexes() {
00318         return true;
00319     }
00320 
00321     function hasConstraint( $name ) {
00322         $SQL = "SELECT 1 FROM pg_catalog.pg_constraint c, pg_catalog.pg_namespace n WHERE c.connamespace = n.oid AND conname = '" .
00323                 pg_escape_string( $this->mConn, $name ) . "' AND n.nspname = '" . pg_escape_string( $this->mConn, $this->getCoreSchema() ) . "'";
00324         $res = $this->doQuery( $SQL );
00325         return $this->numRows( $res );
00326     }
00327 
00337     function open( $server, $user, $password, $dbName ) {
00338         # Test for Postgres support, to avoid suppressed fatal error
00339         if ( !function_exists( 'pg_connect' ) ) {
00340             throw new DBConnectionError( $this, "Postgres functions missing, have you compiled PHP with the --with-pgsql option?\n (Note: if you recently installed PHP, you may need to restart your webserver and database)\n" );
00341         }
00342 
00343         global $wgDBport;
00344 
00345         if ( !strlen( $user ) ) { # e.g. the class is being loaded
00346             return;
00347         }
00348 
00349         $this->mServer = $server;
00350         $port = $wgDBport;
00351         $this->mUser = $user;
00352         $this->mPassword = $password;
00353         $this->mDBname = $dbName;
00354 
00355         $connectVars = array(
00356             'dbname' => $dbName,
00357             'user' => $user,
00358             'password' => $password
00359         );
00360         if ( $server != false && $server != '' ) {
00361             $connectVars['host'] = $server;
00362         }
00363         if ( $port != false && $port != '' ) {
00364             $connectVars['port'] = $port;
00365         }
00366         if ( $this->mFlags & DBO_SSL ) {
00367             $connectVars['sslmode'] = 1;
00368         }
00369 
00370         $this->connectString = $this->makeConnectionString( $connectVars, PGSQL_CONNECT_FORCE_NEW );
00371         $this->close();
00372         $this->installErrorHandler();
00373         $this->mConn = pg_connect( $this->connectString );
00374         $phpError = $this->restoreErrorHandler();
00375 
00376         if ( !$this->mConn ) {
00377             wfDebug( "DB connection error\n" );
00378             wfDebug( "Server: $server, Database: $dbName, User: $user, Password: " . substr( $password, 0, 3 ) . "...\n" );
00379             wfDebug( $this->lastError() . "\n" );
00380             throw new DBConnectionError( $this, str_replace( "\n", ' ', $phpError ) );
00381         }
00382 
00383         $this->mOpened = true;
00384         $this->mTransactionState = new PostgresTransactionState( $this->mConn );
00385 
00386         global $wgCommandLineMode;
00387         # If called from the command-line (e.g. importDump), only show errors
00388         if ( $wgCommandLineMode ) {
00389             $this->doQuery( "SET client_min_messages = 'ERROR'" );
00390         }
00391 
00392         $this->query( "SET client_encoding='UTF8'", __METHOD__ );
00393         $this->query( "SET datestyle = 'ISO, YMD'", __METHOD__ );
00394         $this->query( "SET timezone = 'GMT'", __METHOD__ );
00395         $this->query( "SET standard_conforming_strings = on", __METHOD__ );
00396         if ( $this->getServerVersion() >= 9.0 ) {
00397             $this->query( "SET bytea_output = 'escape'", __METHOD__ ); // PHP bug 53127
00398         }
00399 
00400         global $wgDBmwschema;
00401         $this->determineCoreSchema( $wgDBmwschema );
00402 
00403         return $this->mConn;
00404     }
00405 
00411     function selectDB( $db ) {
00412         if ( $this->mDBname !== $db ) {
00413             return (bool)$this->open( $this->mServer, $this->mUser, $this->mPassword, $db );
00414         } else {
00415             return true;
00416         }
00417     }
00418 
00419     function makeConnectionString( $vars ) {
00420         $s = '';
00421         foreach ( $vars as $name => $value ) {
00422             $s .= "$name='" . str_replace( "'", "\\'", $value ) . "' ";
00423         }
00424         return $s;
00425     }
00426 
00432     protected function closeConnection() {
00433         return pg_close( $this->mConn );
00434     }
00435 
00436     public function doQuery( $sql ) {
00437         if ( function_exists( 'mb_convert_encoding' ) ) {
00438             $sql = mb_convert_encoding( $sql, 'UTF-8' );
00439         }
00440         $this->mTransactionState->check();
00441         if ( pg_send_query( $this->mConn, $sql ) === false ) {
00442             throw new DBUnexpectedError( $this, "Unable to post new query to PostgreSQL\n" );
00443         }
00444         $this->mLastResult = pg_get_result( $this->mConn );
00445         $this->mTransactionState->check();
00446         $this->mAffectedRows = null;
00447         if ( pg_result_error( $this->mLastResult ) ) {
00448             return false;
00449         }
00450         return $this->mLastResult;
00451     }
00452 
00453     protected function dumpError() {
00454         $diags = array( PGSQL_DIAG_SEVERITY,
00455                 PGSQL_DIAG_SQLSTATE,
00456                 PGSQL_DIAG_MESSAGE_PRIMARY,
00457                 PGSQL_DIAG_MESSAGE_DETAIL,
00458                 PGSQL_DIAG_MESSAGE_HINT,
00459                 PGSQL_DIAG_STATEMENT_POSITION,
00460                 PGSQL_DIAG_INTERNAL_POSITION,
00461                 PGSQL_DIAG_INTERNAL_QUERY,
00462                 PGSQL_DIAG_CONTEXT,
00463                 PGSQL_DIAG_SOURCE_FILE,
00464                 PGSQL_DIAG_SOURCE_LINE,
00465                 PGSQL_DIAG_SOURCE_FUNCTION );
00466         foreach ( $diags as $d ) {
00467             wfDebug( sprintf( "PgSQL ERROR(%d): %s\n", $d, pg_result_error_field( $this->mLastResult, $d ) ) );
00468         }
00469     }
00470 
00471     function reportQueryError( $error, $errno, $sql, $fname, $tempIgnore = false ) {
00472         /* Transaction stays in the ERROR state until rolledback */
00473         if ( $tempIgnore ) {
00474             /* Check for constraint violation */
00475             if ( $errno === '23505' ) {
00476                 parent::reportQueryError( $error, $errno, $sql, $fname, $tempIgnore );
00477                 return;
00478             }
00479         }
00480         /* Don't ignore serious errors */
00481         $this->rollback( __METHOD__ );
00482         parent::reportQueryError( $error, $errno, $sql, $fname, false );
00483     }
00484 
00485     function queryIgnore( $sql, $fname = __METHOD__ ) {
00486         return $this->query( $sql, $fname, true );
00487     }
00488 
00489     function freeResult( $res ) {
00490         if ( $res instanceof ResultWrapper ) {
00491             $res = $res->result;
00492         }
00493         wfSuppressWarnings();
00494         $ok = pg_free_result( $res );
00495         wfRestoreWarnings();
00496         if ( !$ok ) {
00497             throw new DBUnexpectedError( $this, "Unable to free Postgres result\n" );
00498         }
00499     }
00500 
00501     function fetchObject( $res ) {
00502         if ( $res instanceof ResultWrapper ) {
00503             $res = $res->result;
00504         }
00505         wfSuppressWarnings();
00506         $row = pg_fetch_object( $res );
00507         wfRestoreWarnings();
00508         # @todo FIXME: HACK HACK HACK HACK debug
00509 
00510         # @todo hashar: not sure if the following test really trigger if the object
00511         #          fetching failed.
00512         if ( pg_last_error( $this->mConn ) ) {
00513             throw new DBUnexpectedError( $this, 'SQL error: ' . htmlspecialchars( pg_last_error( $this->mConn ) ) );
00514         }
00515         return $row;
00516     }
00517 
00518     function fetchRow( $res ) {
00519         if ( $res instanceof ResultWrapper ) {
00520             $res = $res->result;
00521         }
00522         wfSuppressWarnings();
00523         $row = pg_fetch_array( $res );
00524         wfRestoreWarnings();
00525         if ( pg_last_error( $this->mConn ) ) {
00526             throw new DBUnexpectedError( $this, 'SQL error: ' . htmlspecialchars( pg_last_error( $this->mConn ) ) );
00527         }
00528         return $row;
00529     }
00530 
00531     function numRows( $res ) {
00532         if ( $res instanceof ResultWrapper ) {
00533             $res = $res->result;
00534         }
00535         wfSuppressWarnings();
00536         $n = pg_num_rows( $res );
00537         wfRestoreWarnings();
00538         if ( pg_last_error( $this->mConn ) ) {
00539             throw new DBUnexpectedError( $this, 'SQL error: ' . htmlspecialchars( pg_last_error( $this->mConn ) ) );
00540         }
00541         return $n;
00542     }
00543 
00544     function numFields( $res ) {
00545         if ( $res instanceof ResultWrapper ) {
00546             $res = $res->result;
00547         }
00548         return pg_num_fields( $res );
00549     }
00550 
00551     function fieldName( $res, $n ) {
00552         if ( $res instanceof ResultWrapper ) {
00553             $res = $res->result;
00554         }
00555         return pg_field_name( $res, $n );
00556     }
00557 
00564     function insertId() {
00565         return $this->mInsertId;
00566     }
00567 
00568     function dataSeek( $res, $row ) {
00569         if ( $res instanceof ResultWrapper ) {
00570             $res = $res->result;
00571         }
00572         return pg_result_seek( $res, $row );
00573     }
00574 
00575     function lastError() {
00576         if ( $this->mConn ) {
00577             if ( $this->mLastResult ) {
00578                 return pg_result_error( $this->mLastResult );
00579             } else {
00580                 return pg_last_error();
00581             }
00582         } else {
00583             return 'No database connection';
00584         }
00585     }
00586     function lastErrno() {
00587         if ( $this->mLastResult ) {
00588             return pg_result_error_field( $this->mLastResult, PGSQL_DIAG_SQLSTATE );
00589         } else {
00590             return false;
00591         }
00592     }
00593 
00594     function affectedRows() {
00595         if ( !is_null( $this->mAffectedRows ) ) {
00596             // Forced result for simulated queries
00597             return $this->mAffectedRows;
00598         }
00599         if ( empty( $this->mLastResult ) ) {
00600             return 0;
00601         }
00602         return pg_affected_rows( $this->mLastResult );
00603     }
00604 
00613     function estimateRowCount( $table, $vars = '*', $conds = '', $fname = __METHOD__, $options = array() ) {
00614         $options['EXPLAIN'] = true;
00615         $res = $this->select( $table, $vars, $conds, $fname, $options );
00616         $rows = -1;
00617         if ( $res ) {
00618             $row = $this->fetchRow( $res );
00619             $count = array();
00620             if ( preg_match( '/rows=(\d+)/', $row[0], $count ) ) {
00621                 $rows = $count[1];
00622             }
00623         }
00624         return $rows;
00625     }
00626 
00632     function indexInfo( $table, $index, $fname = __METHOD__ ) {
00633         $sql = "SELECT indexname FROM pg_indexes WHERE tablename='$table'";
00634         $res = $this->query( $sql, $fname );
00635         if ( !$res ) {
00636             return null;
00637         }
00638         foreach ( $res as $row ) {
00639             if ( $row->indexname == $this->indexName( $index ) ) {
00640                 return $row;
00641             }
00642         }
00643         return false;
00644     }
00645 
00652     function indexAttributes( $index, $schema = false ) {
00653         if ( $schema === false ) {
00654             $schema = $this->getCoreSchema();
00655         }
00656         /*
00657          * A subquery would be not needed if we didn't care about the order
00658          * of attributes, but we do
00659          */
00660         $sql = <<<__INDEXATTR__
00661 
00662             SELECT opcname,
00663                 attname,
00664                 i.indoption[s.g] as option,
00665                 pg_am.amname
00666             FROM
00667                 (SELECT generate_series(array_lower(isub.indkey,1), array_upper(isub.indkey,1)) AS g
00668                     FROM
00669                         pg_index isub
00670                     JOIN pg_class cis
00671                         ON cis.oid=isub.indexrelid
00672                     JOIN pg_namespace ns
00673                         ON cis.relnamespace = ns.oid
00674                     WHERE cis.relname='$index' AND ns.nspname='$schema') AS s,
00675                 pg_attribute,
00676                 pg_opclass opcls,
00677                 pg_am,
00678                 pg_class ci
00679                 JOIN pg_index i
00680                     ON ci.oid=i.indexrelid
00681                 JOIN pg_class ct
00682                     ON ct.oid = i.indrelid
00683                 JOIN pg_namespace n
00684                     ON ci.relnamespace = n.oid
00685                 WHERE
00686                     ci.relname='$index' AND n.nspname='$schema'
00687                     AND attrelid = ct.oid
00688                     AND i.indkey[s.g] = attnum
00689                     AND i.indclass[s.g] = opcls.oid
00690                     AND pg_am.oid = opcls.opcmethod
00691 __INDEXATTR__;
00692         $res = $this->query( $sql, __METHOD__ );
00693         $a = array();
00694         if ( $res ) {
00695             foreach ( $res as $row ) {
00696                 $a[] = array(
00697                     $row->attname,
00698                     $row->opcname,
00699                     $row->amname,
00700                     $row->option );
00701             }
00702         } else {
00703             return null;
00704         }
00705         return $a;
00706     }
00707 
00708     function indexUnique( $table, $index, $fname = __METHOD__ ) {
00709         $sql = "SELECT indexname FROM pg_indexes WHERE tablename='{$table}'" .
00710             " AND indexdef LIKE 'CREATE UNIQUE%(" .
00711             $this->strencode( $this->indexName( $index ) ) .
00712             ")'";
00713         $res = $this->query( $sql, $fname );
00714         if ( !$res ) {
00715             return null;
00716         }
00717         foreach ( $res as $row ) {
00718             return true;
00719         }
00720         return false;
00721     }
00722 
00731     function selectSQLText( $table, $vars, $conds = '', $fname = __METHOD__, $options = array(), $join_conds = array() ) {
00732         if ( is_array( $options ) ) {
00733             $forUpdateKey = array_search( 'FOR UPDATE', $options );
00734             if ( $forUpdateKey !== false && $join_conds ) {
00735                 unset( $options[$forUpdateKey] );
00736 
00737                 foreach ( $join_conds as $table_cond => $join_cond ) {
00738                     if ( 0 === preg_match( '/^(?:LEFT|RIGHT|FULL)(?: OUTER)? JOIN$/i', $join_cond[0] ) ) {
00739                         $options['FOR UPDATE'][] = $table_cond;
00740                     }
00741                 }
00742             }
00743         }
00744 
00745         return parent::selectSQLText( $table, $vars, $conds, $fname, $options, $join_conds );
00746     }
00747 
00761     function insert( $table, $args, $fname = __METHOD__, $options = array() ) {
00762         if ( !count( $args ) ) {
00763             return true;
00764         }
00765 
00766         $table = $this->tableName( $table );
00767         if ( !isset( $this->numeric_version ) ) {
00768             $this->getServerVersion();
00769         }
00770 
00771         if ( !is_array( $options ) ) {
00772             $options = array( $options );
00773         }
00774 
00775         if ( isset( $args[0] ) && is_array( $args[0] ) ) {
00776             $multi = true;
00777             $keys = array_keys( $args[0] );
00778         } else {
00779             $multi = false;
00780             $keys = array_keys( $args );
00781         }
00782 
00783         // If IGNORE is set, we use savepoints to emulate mysql's behavior
00784         $savepoint = null;
00785         if ( in_array( 'IGNORE', $options ) ) {
00786             $savepoint = new SavepointPostgres( $this, 'mw' );
00787             $olde = error_reporting( 0 );
00788             // For future use, we may want to track the number of actual inserts
00789             // Right now, insert (all writes) simply return true/false
00790             $numrowsinserted = 0;
00791         }
00792 
00793         $sql = "INSERT INTO $table (" . implode( ',', $keys ) . ') VALUES ';
00794 
00795         if ( $multi ) {
00796             if ( $this->numeric_version >= 8.2 && !$savepoint ) {
00797                 $first = true;
00798                 foreach ( $args as $row ) {
00799                     if ( $first ) {
00800                         $first = false;
00801                     } else {
00802                         $sql .= ',';
00803                     }
00804                     $sql .= '(' . $this->makeList( $row ) . ')';
00805                 }
00806                 $res = (bool)$this->query( $sql, $fname, $savepoint );
00807             } else {
00808                 $res = true;
00809                 $origsql = $sql;
00810                 foreach ( $args as $row ) {
00811                     $tempsql = $origsql;
00812                     $tempsql .= '(' . $this->makeList( $row ) . ')';
00813 
00814                     if ( $savepoint ) {
00815                         $savepoint->savepoint();
00816                     }
00817 
00818                     $tempres = (bool)$this->query( $tempsql, $fname, $savepoint );
00819 
00820                     if ( $savepoint ) {
00821                         $bar = pg_last_error();
00822                         if ( $bar != false ) {
00823                             $savepoint->rollback();
00824                         } else {
00825                             $savepoint->release();
00826                             $numrowsinserted++;
00827                         }
00828                     }
00829 
00830                     // If any of them fail, we fail overall for this function call
00831                     // Note that this will be ignored if IGNORE is set
00832                     if ( !$tempres ) {
00833                         $res = false;
00834                     }
00835                 }
00836             }
00837         } else {
00838             // Not multi, just a lone insert
00839             if ( $savepoint ) {
00840                 $savepoint->savepoint();
00841             }
00842 
00843             $sql .= '(' . $this->makeList( $args ) . ')';
00844             $res = (bool)$this->query( $sql, $fname, $savepoint );
00845             if ( $savepoint ) {
00846                 $bar = pg_last_error();
00847                 if ( $bar != false ) {
00848                     $savepoint->rollback();
00849                 } else {
00850                     $savepoint->release();
00851                     $numrowsinserted++;
00852                 }
00853             }
00854         }
00855         if ( $savepoint ) {
00856             $olde = error_reporting( $olde );
00857             $savepoint->commit();
00858 
00859             // Set the affected row count for the whole operation
00860             $this->mAffectedRows = $numrowsinserted;
00861 
00862             // IGNORE always returns true
00863             return true;
00864         }
00865 
00866         return $res;
00867     }
00868 
00878     function insertSelect( $destTable, $srcTable, $varMap, $conds, $fname = __METHOD__,
00879         $insertOptions = array(), $selectOptions = array() )
00880     {
00881         $destTable = $this->tableName( $destTable );
00882 
00883         if ( !is_array( $insertOptions ) ) {
00884             $insertOptions = array( $insertOptions );
00885         }
00886 
00887         /*
00888          * If IGNORE is set, we use savepoints to emulate mysql's behavior
00889          * Ignore LOW PRIORITY option, since it is MySQL-specific
00890          */
00891         $savepoint = null;
00892         if ( in_array( 'IGNORE', $insertOptions ) ) {
00893             $savepoint = new SavepointPostgres( $this, 'mw' );
00894             $olde = error_reporting( 0 );
00895             $numrowsinserted = 0;
00896             $savepoint->savepoint();
00897         }
00898 
00899         if ( !is_array( $selectOptions ) ) {
00900             $selectOptions = array( $selectOptions );
00901         }
00902         list( $startOpts, $useIndex, $tailOpts ) = $this->makeSelectOptions( $selectOptions );
00903         if ( is_array( $srcTable ) ) {
00904             $srcTable = implode( ',', array_map( array( &$this, 'tableName' ), $srcTable ) );
00905         } else {
00906             $srcTable = $this->tableName( $srcTable );
00907         }
00908 
00909         $sql = "INSERT INTO $destTable (" . implode( ',', array_keys( $varMap ) ) . ')' .
00910                 " SELECT $startOpts " . implode( ',', $varMap ) .
00911                 " FROM $srcTable $useIndex";
00912 
00913         if ( $conds != '*' ) {
00914             $sql .= ' WHERE ' . $this->makeList( $conds, LIST_AND );
00915         }
00916 
00917         $sql .= " $tailOpts";
00918 
00919         $res = (bool)$this->query( $sql, $fname, $savepoint );
00920         if ( $savepoint ) {
00921             $bar = pg_last_error();
00922             if ( $bar != false ) {
00923                 $savepoint->rollback();
00924             } else {
00925                 $savepoint->release();
00926                 $numrowsinserted++;
00927             }
00928             $olde = error_reporting( $olde );
00929             $savepoint->commit();
00930 
00931             // Set the affected row count for the whole operation
00932             $this->mAffectedRows = $numrowsinserted;
00933 
00934             // IGNORE always returns true
00935             return true;
00936         }
00937 
00938         return $res;
00939     }
00940 
00941     function tableName( $name, $format = 'quoted' ) {
00942         # Replace reserved words with better ones
00943         switch ( $name ) {
00944             case 'user':
00945                 return $this->realTableName( 'mwuser', $format );
00946             case 'text':
00947                 return $this->realTableName( 'pagecontent', $format );
00948             default:
00949                 return $this->realTableName( $name, $format );
00950         }
00951     }
00952 
00953     /* Don't cheat on installer */
00954     function realTableName( $name, $format = 'quoted' ) {
00955         return parent::tableName( $name, $format );
00956     }
00957 
00962     function nextSequenceValue( $seqName ) {
00963         $safeseq = str_replace( "'", "''", $seqName );
00964         $res = $this->query( "SELECT nextval('$safeseq')" );
00965         $row = $this->fetchRow( $res );
00966         $this->mInsertId = $row[0];
00967         return $this->mInsertId;
00968     }
00969 
00974     function currentSequenceValue( $seqName ) {
00975         $safeseq = str_replace( "'", "''", $seqName );
00976         $res = $this->query( "SELECT currval('$safeseq')" );
00977         $row = $this->fetchRow( $res );
00978         $currval = $row[0];
00979         return $currval;
00980     }
00981 
00982     # Returns the size of a text field, or -1 for "unlimited"
00983     function textFieldSize( $table, $field ) {
00984         $table = $this->tableName( $table );
00985         $sql = "SELECT t.typname as ftype,a.atttypmod as size
00986             FROM pg_class c, pg_attribute a, pg_type t
00987             WHERE relname='$table' AND a.attrelid=c.oid AND
00988                 a.atttypid=t.oid and a.attname='$field'";
00989         $res = $this->query( $sql );
00990         $row = $this->fetchObject( $res );
00991         if ( $row->ftype == 'varchar' ) {
00992             $size = $row->size - 4;
00993         } else {
00994             $size = $row->size;
00995         }
00996         return $size;
00997     }
00998 
00999     function limitResult( $sql, $limit, $offset = false ) {
01000         return "$sql LIMIT $limit " . ( is_numeric( $offset ) ? " OFFSET {$offset} " : '' );
01001     }
01002 
01003     function wasDeadlock() {
01004         return $this->lastErrno() == '40P01';
01005     }
01006 
01007     function duplicateTableStructure( $oldName, $newName, $temporary = false, $fname = __METHOD__ ) {
01008         $newName = $this->addIdentifierQuotes( $newName );
01009         $oldName = $this->addIdentifierQuotes( $oldName );
01010         return $this->query( 'CREATE ' . ( $temporary ? 'TEMPORARY ' : '' ) . " TABLE $newName (LIKE $oldName INCLUDING DEFAULTS)", $fname );
01011     }
01012 
01013     function listTables( $prefix = null, $fname = __METHOD__ ) {
01014         $eschema = $this->addQuotes( $this->getCoreSchema() );
01015         $result = $this->query( "SELECT tablename FROM pg_tables WHERE schemaname = $eschema", $fname );
01016         $endArray = array();
01017 
01018         foreach ( $result as $table ) {
01019             $vars = get_object_vars( $table );
01020             $table = array_pop( $vars );
01021             if ( !$prefix || strpos( $table, $prefix ) === 0 ) {
01022                 $endArray[] = $table;
01023             }
01024         }
01025 
01026         return $endArray;
01027     }
01028 
01029     function timestamp( $ts = 0 ) {
01030         return wfTimestamp( TS_POSTGRES, $ts );
01031     }
01032 
01033     /*
01034      * Posted by cc[plus]php[at]c2se[dot]com on 25-Mar-2009 09:12
01035      * to http://www.php.net/manual/en/ref.pgsql.php
01036      *
01037      * Parsing a postgres array can be a tricky problem, he's my
01038      * take on this, it handles multi-dimensional arrays plus
01039      * escaping using a nasty regexp to determine the limits of each
01040      * data-item.
01041      *
01042      * This should really be handled by PHP PostgreSQL module
01043      *
01044      * @since 1.19
01045      * @param $text   string: postgreql array returned in a text form like {a,b}
01046      * @param $output string
01047      * @param $limit  int
01048      * @param $offset int
01049      * @return string
01050      */
01051     function pg_array_parse( $text, &$output, $limit = false, $offset = 1 ) {
01052         if ( false === $limit ) {
01053             $limit = strlen( $text ) - 1;
01054             $output = array();
01055         }
01056         if ( '{}' == $text ) {
01057             return $output;
01058         }
01059         do {
01060             if ( '{' != $text[$offset] ) {
01061                 preg_match( "/(\\{?\"([^\"\\\\]|\\\\.)*\"|[^,{}]+)+([,}]+)/",
01062                     $text, $match, 0, $offset );
01063                 $offset += strlen( $match[0] );
01064                 $output[] = ( '"' != $match[1][0]
01065                         ? $match[1]
01066                         : stripcslashes( substr( $match[1], 1, -1 ) ) );
01067                 if ( '},' == $match[3] ) {
01068                     return $output;
01069                 }
01070             } else {
01071                 $offset = $this->pg_array_parse( $text, $output, $limit, $offset + 1 );
01072             }
01073         } while ( $limit > $offset );
01074         return $output;
01075     }
01076 
01080     public function aggregateValue( $valuedata, $valuename = 'value' ) {
01081         return $valuedata;
01082     }
01083 
01087     public function getSoftwareLink() {
01088         return '[{{int:version-db-postgres-url}} PostgreSQL]';
01089     }
01090 
01098     function getCurrentSchema() {
01099         $res = $this->query( "SELECT current_schema()", __METHOD__ );
01100         $row = $this->fetchRow( $res );
01101         return $row[0];
01102     }
01103 
01114     function getSchemas() {
01115         $res = $this->query( "SELECT current_schemas(false)", __METHOD__ );
01116         $row = $this->fetchRow( $res );
01117         $schemas = array();
01118         /* PHP pgsql support does not support array type, "{a,b}" string is returned */
01119         return $this->pg_array_parse( $row[0], $schemas );
01120     }
01121 
01131     function getSearchPath() {
01132         $res = $this->query( "SHOW search_path", __METHOD__ );
01133         $row = $this->fetchRow( $res );
01134         /* PostgreSQL returns SHOW values as strings */
01135         return explode( ",", $row[0] );
01136     }
01137 
01145     function setSearchPath( $search_path ) {
01146         $this->query( "SET search_path = " . implode( ", ", $search_path ) );
01147     }
01148 
01162     function determineCoreSchema( $desired_schema ) {
01163         $this->begin( __METHOD__ );
01164         if ( $this->schemaExists( $desired_schema ) ) {
01165             if ( in_array( $desired_schema, $this->getSchemas() ) ) {
01166                 $this->mCoreSchema = $desired_schema;
01167                 wfDebug( "Schema \"" . $desired_schema . "\" already in the search path\n" );
01168             } else {
01174                 $search_path = $this->getSearchPath();
01175                 array_unshift( $search_path,
01176                     $this->addIdentifierQuotes( $desired_schema ));
01177                 $this->setSearchPath( $search_path );
01178                 $this->mCoreSchema = $desired_schema;
01179                 wfDebug( "Schema \"" . $desired_schema . "\" added to the search path\n" );
01180             }
01181         } else {
01182             $this->mCoreSchema = $this->getCurrentSchema();
01183             wfDebug( "Schema \"" . $desired_schema . "\" not found, using current \"" . $this->mCoreSchema . "\"\n" );
01184         }
01185         /* Commit SET otherwise it will be rollbacked on error or IGNORE SELECT */
01186         $this->commit( __METHOD__ );
01187     }
01188 
01195     function getCoreSchema() {
01196         return $this->mCoreSchema;
01197     }
01198 
01202     function getServerVersion() {
01203         if ( !isset( $this->numeric_version ) ) {
01204             $versionInfo = pg_version( $this->mConn );
01205             if ( version_compare( $versionInfo['client'], '7.4.0', 'lt' ) ) {
01206                 // Old client, abort install
01207                 $this->numeric_version = '7.3 or earlier';
01208             } elseif ( isset( $versionInfo['server'] ) ) {
01209                 // Normal client
01210                 $this->numeric_version = $versionInfo['server'];
01211             } else {
01212                 // Bug 16937: broken pgsql extension from PHP<5.3
01213                 $this->numeric_version = pg_parameter_status( $this->mConn, 'server_version' );
01214             }
01215         }
01216         return $this->numeric_version;
01217     }
01218 
01224     function relationExists( $table, $types, $schema = false ) {
01225         if ( !is_array( $types ) ) {
01226             $types = array( $types );
01227         }
01228         if ( !$schema ) {
01229             $schema = $this->getCoreSchema();
01230         }
01231         $table = $this->realTableName( $table, 'raw' );
01232         $etable = $this->addQuotes( $table );
01233         $eschema = $this->addQuotes( $schema );
01234         $SQL = "SELECT 1 FROM pg_catalog.pg_class c, pg_catalog.pg_namespace n "
01235             . "WHERE c.relnamespace = n.oid AND c.relname = $etable AND n.nspname = $eschema "
01236             . "AND c.relkind IN ('" . implode( "','", $types ) . "')";
01237         $res = $this->query( $SQL );
01238         $count = $res ? $res->numRows() : 0;
01239         return (bool)$count;
01240     }
01241 
01247     function tableExists( $table, $fname = __METHOD__, $schema = false ) {
01248         return $this->relationExists( $table, array( 'r', 'v' ), $schema );
01249     }
01250 
01251     function sequenceExists( $sequence, $schema = false ) {
01252         return $this->relationExists( $sequence, 'S', $schema );
01253     }
01254 
01255     function triggerExists( $table, $trigger ) {
01256         $q = <<<SQL
01257     SELECT 1 FROM pg_class, pg_namespace, pg_trigger
01258         WHERE relnamespace=pg_namespace.oid AND relkind='r'
01259               AND tgrelid=pg_class.oid
01260               AND nspname=%s AND relname=%s AND tgname=%s
01261 SQL;
01262         $res = $this->query(
01263             sprintf(
01264                 $q,
01265                 $this->addQuotes( $this->getCoreSchema() ),
01266                 $this->addQuotes( $table ),
01267                 $this->addQuotes( $trigger )
01268             )
01269         );
01270         if ( !$res ) {
01271             return null;
01272         }
01273         $rows = $res->numRows();
01274         return $rows;
01275     }
01276 
01277     function ruleExists( $table, $rule ) {
01278         $exists = $this->selectField( 'pg_rules', 'rulename',
01279             array(
01280                 'rulename' => $rule,
01281                 'tablename' => $table,
01282                 'schemaname' => $this->getCoreSchema()
01283             )
01284         );
01285         return $exists === $rule;
01286     }
01287 
01288     function constraintExists( $table, $constraint ) {
01289         $SQL = sprintf( "SELECT 1 FROM information_schema.table_constraints " .
01290                 "WHERE constraint_schema = %s AND table_name = %s AND constraint_name = %s",
01291             $this->addQuotes( $this->getCoreSchema() ),
01292             $this->addQuotes( $table ),
01293             $this->addQuotes( $constraint )
01294         );
01295         $res = $this->query( $SQL );
01296         if ( !$res ) {
01297             return null;
01298         }
01299         $rows = $res->numRows();
01300         return $rows;
01301     }
01302 
01307     function schemaExists( $schema ) {
01308         $exists = $this->selectField( '"pg_catalog"."pg_namespace"', 1,
01309             array( 'nspname' => $schema ), __METHOD__ );
01310         return (bool)$exists;
01311     }
01312 
01317     function roleExists( $roleName ) {
01318         $exists = $this->selectField( '"pg_catalog"."pg_roles"', 1,
01319             array( 'rolname' => $roleName ), __METHOD__ );
01320         return (bool)$exists;
01321     }
01322 
01323     function fieldInfo( $table, $field ) {
01324         return PostgresField::fromText( $this, $table, $field );
01325     }
01326 
01331     function fieldType( $res, $index ) {
01332         if ( $res instanceof ResultWrapper ) {
01333             $res = $res->result;
01334         }
01335         return pg_field_type( $res, $index );
01336     }
01337 
01342     function encodeBlob( $b ) {
01343         return new Blob( pg_escape_bytea( $this->mConn, $b ) );
01344     }
01345 
01346     function decodeBlob( $b ) {
01347         if ( $b instanceof Blob ) {
01348             $b = $b->fetch();
01349         }
01350         return pg_unescape_bytea( $b );
01351     }
01352 
01353     function strencode( $s ) { # Should not be called by us
01354         return pg_escape_string( $this->mConn, $s );
01355     }
01356 
01361     function addQuotes( $s ) {
01362         if ( is_null( $s ) ) {
01363             return 'NULL';
01364         } elseif ( is_bool( $s ) ) {
01365             return intval( $s );
01366         } elseif ( $s instanceof Blob ) {
01367             return "'" . $s->fetch( $s ) . "'";
01368         }
01369         return "'" . pg_escape_string( $this->mConn, $s ) . "'";
01370     }
01371 
01382     protected function replaceVars( $ins ) {
01383         $ins = parent::replaceVars( $ins );
01384 
01385         if ( $this->numeric_version >= 8.3 ) {
01386             // Thanks for not providing backwards-compatibility, 8.3
01387             $ins = preg_replace( "/to_tsvector\s*\(\s*'default'\s*,/", 'to_tsvector(', $ins );
01388         }
01389 
01390         if ( $this->numeric_version <= 8.1 ) { // Our minimum version
01391             $ins = str_replace( 'USING gin', 'USING gist', $ins );
01392         }
01393 
01394         return $ins;
01395     }
01396 
01406     function makeSelectOptions( $options ) {
01407         $preLimitTail = $postLimitTail = '';
01408         $startOpts = $useIndex = '';
01409 
01410         $noKeyOptions = array();
01411         foreach ( $options as $key => $option ) {
01412             if ( is_numeric( $key ) ) {
01413                 $noKeyOptions[$option] = true;
01414             }
01415         }
01416 
01417         $preLimitTail .= $this->makeGroupByWithHaving( $options );
01418 
01419         $preLimitTail .= $this->makeOrderBy( $options );
01420 
01421         //if ( isset( $options['LIMIT'] ) ) {
01422         //  $tailOpts .= $this->limitResult( '', $options['LIMIT'],
01423         //      isset( $options['OFFSET'] ) ? $options['OFFSET']
01424         //      : false );
01425         //}
01426 
01427         if ( isset( $options['FOR UPDATE'] ) ) {
01428             $postLimitTail .= ' FOR UPDATE OF ' . implode( ', ', $options['FOR UPDATE'] );
01429         } else if ( isset( $noKeyOptions['FOR UPDATE'] ) ) {
01430             $postLimitTail .= ' FOR UPDATE';
01431         }
01432 
01433         if ( isset( $noKeyOptions['DISTINCT'] ) || isset( $noKeyOptions['DISTINCTROW'] ) ) {
01434             $startOpts .= 'DISTINCT';
01435         }
01436 
01437         return array( $startOpts, $useIndex, $preLimitTail, $postLimitTail );
01438     }
01439 
01440     function setFakeMaster( $enabled = true ) {
01441     }
01442 
01443     function getDBname() {
01444         return $this->mDBname;
01445     }
01446 
01447     function getServer() {
01448         return $this->mServer;
01449     }
01450 
01451     function buildConcat( $stringList ) {
01452         return implode( ' || ', $stringList );
01453     }
01454 
01455     public function getSearchEngine() {
01456         return 'SearchPostgres';
01457     }
01458 
01459     public function streamStatementEnd( &$sql, &$newLine ) {
01460         # Allow dollar quoting for function declarations
01461         if ( substr( $newLine, 0, 4 ) == '$mw$' ) {
01462             if ( $this->delimiter ) {
01463                 $this->delimiter = false;
01464             }
01465             else {
01466                 $this->delimiter = ';';
01467             }
01468         }
01469         return parent::streamStatementEnd( $sql, $newLine );
01470     }
01471 
01481     public function lockIsFree( $lockName, $method ) {
01482         $key = $this->addQuotes( $this->bigintFromLockName( $lockName ) );
01483         $result = $this->query( "SELECT (CASE(pg_try_advisory_lock($key))
01484             WHEN 'f' THEN 'f' ELSE pg_advisory_unlock($key) END) AS lockstatus", $method );
01485         $row = $this->fetchObject( $result );
01486         return ( $row->lockstatus === 't' );
01487     }
01488 
01496     public function lock( $lockName, $method, $timeout = 5 ) {
01497         $key = $this->addQuotes( $this->bigintFromLockName( $lockName ) );
01498         for ( $attempts = 1; $attempts <= $timeout; ++$attempts ) {
01499             $result = $this->query(
01500                 "SELECT pg_try_advisory_lock($key) AS lockstatus", $method );
01501             $row = $this->fetchObject( $result );
01502             if ( $row->lockstatus === 't' ) {
01503                 return true;
01504             } else {
01505                 sleep( 1 );
01506             }
01507         }
01508         wfDebug( __METHOD__ . " failed to acquire lock\n" );
01509         return false;
01510     }
01511 
01518     public function unlock( $lockName, $method ) {
01519         $key = $this->addQuotes( $this->bigintFromLockName( $lockName ) );
01520         $result = $this->query( "SELECT pg_advisory_unlock($key) as lockstatus", $method );
01521         $row = $this->fetchObject( $result );
01522         return ( $row->lockstatus === 't' );
01523     }
01524 
01529     private function bigintFromLockName( $lockName ) {
01530         return wfBaseConvert( substr( sha1( $lockName ), 0, 15 ), 16, 10 );
01531     }
01532 } // end DatabasePostgres class