Header And Logo

PostgreSQL
| The world's most advanced open source database.

matview.c

Go to the documentation of this file.
00001 /*-------------------------------------------------------------------------
00002  *
00003  * matview.c
00004  *    materialized view support
00005  *
00006  * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
00007  * Portions Copyright (c) 1994, Regents of the University of California
00008  *
00009  *
00010  * IDENTIFICATION
00011  *    src/backend/commands/matview.c
00012  *
00013  *-------------------------------------------------------------------------
00014  */
00015 #include "postgres.h"
00016 
00017 #include "access/heapam_xlog.h"
00018 #include "access/multixact.h"
00019 #include "access/relscan.h"
00020 #include "access/xact.h"
00021 #include "catalog/catalog.h"
00022 #include "catalog/heap.h"
00023 #include "catalog/namespace.h"
00024 #include "commands/cluster.h"
00025 #include "commands/matview.h"
00026 #include "commands/tablecmds.h"
00027 #include "executor/executor.h"
00028 #include "miscadmin.h"
00029 #include "rewrite/rewriteHandler.h"
00030 #include "storage/lmgr.h"
00031 #include "storage/smgr.h"
00032 #include "tcop/tcopprot.h"
00033 #include "utils/snapmgr.h"
00034 
00035 
00036 typedef struct
00037 {
00038     DestReceiver pub;           /* publicly-known function pointers */
00039     Oid         transientoid;   /* OID of new heap into which to store */
00040     /* These fields are filled by transientrel_startup: */
00041     Relation    transientrel;   /* relation to write to */
00042     CommandId   output_cid;     /* cmin to insert in output tuples */
00043     int         hi_options;     /* heap_insert performance options */
00044     BulkInsertState bistate;    /* bulk insert state */
00045 } DR_transientrel;
00046 
00047 static void transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo);
00048 static void transientrel_receive(TupleTableSlot *slot, DestReceiver *self);
00049 static void transientrel_shutdown(DestReceiver *self);
00050 static void transientrel_destroy(DestReceiver *self);
00051 static void refresh_matview_datafill(DestReceiver *dest, Query *query,
00052                                      const char *queryString);
00053 
00054 /*
00055  * SetMatViewToPopulated
00056  *      Indicate that the materialized view has been populated by its query.
00057  *
00058  * NOTE: The heap starts out in a state that doesn't look scannable, and can
00059  * only transition from there to scannable at the time a new heap is created.
00060  *
00061  * NOTE: caller must be holding an appropriate lock on the relation.
00062  */
00063 void
00064 SetMatViewToPopulated(Relation relation)
00065 {
00066     Page        page;
00067 
00068     Assert(relation->rd_rel->relkind == RELKIND_MATVIEW);
00069     Assert(relation->rd_ispopulated == false);
00070 
00071     page = (Page) palloc(BLCKSZ);
00072     PageInit(page, BLCKSZ, 0);
00073 
00074     if (RelationNeedsWAL(relation))
00075         log_newpage(&(relation->rd_node), MAIN_FORKNUM, 0, page);
00076 
00077     RelationOpenSmgr(relation);
00078 
00079     PageSetChecksumInplace(page, 0);
00080     smgrextend(relation->rd_smgr, MAIN_FORKNUM, 0, (char *) page, true);
00081 
00082     pfree(page);
00083 
00084     smgrimmedsync(relation->rd_smgr, MAIN_FORKNUM);
00085 
00086     RelationCacheInvalidateEntry(relation->rd_id);
00087 }
00088 
00089 /*
00090  * ExecRefreshMatView -- execute a REFRESH MATERIALIZED VIEW command
00091  *
00092  * This refreshes the materialized view by creating a new table and swapping
00093  * the relfilenodes of the new table and the old materialized view, so the OID
00094  * of the original materialized view is preserved. Thus we do not lose GRANT
00095  * nor references to this materialized view.
00096  *
00097  * If WITH NO DATA was specified, this is effectively like a TRUNCATE;
00098  * otherwise it is like a TRUNCATE followed by an INSERT using the SELECT
00099  * statement associated with the materialized view.  The statement node's
00100  * skipData field is used to indicate that the clause was used.
00101  *
00102  * Indexes are rebuilt too, via REINDEX. Since we are effectively bulk-loading
00103  * the new heap, it's better to create the indexes afterwards than to fill them
00104  * incrementally while we load.
00105  *
00106  * The scannable state is changed based on whether the contents reflect the
00107  * result set of the materialized view's query.
00108  */
00109 void
00110 ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString,
00111                   ParamListInfo params, char *completionTag)
00112 {
00113     Oid         matviewOid;
00114     Relation    matviewRel;
00115     RewriteRule *rule;
00116     List       *actions;
00117     Query      *dataQuery;
00118     Oid         tableSpace;
00119     Oid         OIDNewHeap;
00120     DestReceiver *dest;
00121 
00122     /*
00123      * Get a lock until end of transaction.
00124      */
00125     matviewOid = RangeVarGetRelidExtended(stmt->relation,
00126                                            AccessExclusiveLock, false, false,
00127                                            RangeVarCallbackOwnsTable, NULL);
00128     matviewRel = heap_open(matviewOid, NoLock);
00129 
00130     /* Make sure it is a materialized view. */
00131     if (matviewRel->rd_rel->relkind != RELKIND_MATVIEW)
00132         ereport(ERROR,
00133                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
00134                  errmsg("\"%s\" is not a materialized view",
00135                         RelationGetRelationName(matviewRel))));
00136 
00137     /*
00138      * We're not using materialized views in the system catalogs.
00139      */
00140     Assert(!IsSystemRelation(matviewRel));
00141 
00142     Assert(!matviewRel->rd_rel->relhasoids);
00143 
00144     /*
00145      * Check that everything is correct for a refresh. Problems at this point
00146      * are internal errors, so elog is sufficient.
00147      */
00148     if (matviewRel->rd_rel->relhasrules == false ||
00149         matviewRel->rd_rules->numLocks < 1)
00150         elog(ERROR,
00151              "materialized view \"%s\" is missing rewrite information",
00152              RelationGetRelationName(matviewRel));
00153 
00154     if (matviewRel->rd_rules->numLocks > 1)
00155         elog(ERROR,
00156              "materialized view \"%s\" has too many rules",
00157              RelationGetRelationName(matviewRel));
00158 
00159     rule = matviewRel->rd_rules->rules[0];
00160     if (rule->event != CMD_SELECT || !(rule->isInstead))
00161         elog(ERROR,
00162              "the rule for materialized view \"%s\" is not a SELECT INSTEAD OF rule",
00163              RelationGetRelationName(matviewRel));
00164 
00165     actions = rule->actions;
00166     if (list_length(actions) != 1)
00167         elog(ERROR,
00168              "the rule for materialized view \"%s\" is not a single action",
00169              RelationGetRelationName(matviewRel));
00170 
00171     /*
00172      * The stored query was rewritten at the time of the MV definition, but
00173      * has not been scribbled on by the planner.
00174      */
00175     dataQuery = (Query *) linitial(actions);
00176     Assert(IsA(dataQuery, Query));
00177 
00178     /*
00179      * Check for active uses of the relation in the current transaction, such
00180      * as open scans.
00181      *
00182      * NB: We count on this to protect us against problems with refreshing the
00183      * data using HEAP_INSERT_FROZEN.
00184      */
00185     CheckTableNotInUse(matviewRel, "REFRESH MATERIALIZED VIEW");
00186 
00187     tableSpace = matviewRel->rd_rel->reltablespace;
00188 
00189     heap_close(matviewRel, NoLock);
00190 
00191     /* Create the transient table that will receive the regenerated data. */
00192     OIDNewHeap = make_new_heap(matviewOid, tableSpace);
00193     dest = CreateTransientRelDestReceiver(OIDNewHeap);
00194 
00195     if (!stmt->skipData)
00196         refresh_matview_datafill(dest, dataQuery, queryString);
00197 
00198     /*
00199      * Swap the physical files of the target and transient tables, then
00200      * rebuild the target's indexes and throw away the transient table.
00201      */
00202     finish_heap_swap(matviewOid, OIDNewHeap, false, false, true, true,
00203                      RecentXmin, ReadNextMultiXactId());
00204 
00205     RelationCacheInvalidateEntry(matviewOid);
00206 }
00207 
00208 /*
00209  * refresh_matview_datafill
00210  */
00211 static void
00212 refresh_matview_datafill(DestReceiver *dest, Query *query,
00213                          const char *queryString)
00214 {
00215     List       *rewritten;
00216     PlannedStmt *plan;
00217     QueryDesc  *queryDesc;
00218 
00219     /* Rewrite, copying the given Query to make sure it's not changed */
00220     rewritten = QueryRewrite((Query *) copyObject(query));
00221 
00222     /* SELECT should never rewrite to more or less than one SELECT query */
00223     if (list_length(rewritten) != 1)
00224         elog(ERROR, "unexpected rewrite result for REFRESH MATERIALIZED VIEW");
00225     query = (Query *) linitial(rewritten);
00226 
00227     /* Check for user-requested abort. */
00228     CHECK_FOR_INTERRUPTS();
00229 
00230     /* Plan the query which will generate data for the refresh. */
00231     plan = pg_plan_query(query, 0, NULL);
00232 
00233     /*
00234      * Use a snapshot with an updated command ID to ensure this query sees
00235      * results of any previously executed queries.  (This could only matter if
00236      * the planner executed an allegedly-stable function that changed the
00237      * database contents, but let's do it anyway to be safe.)
00238      */
00239     PushCopiedSnapshot(GetActiveSnapshot());
00240     UpdateActiveSnapshotCommandId();
00241 
00242     /* Create a QueryDesc, redirecting output to our tuple receiver */
00243     queryDesc = CreateQueryDesc(plan, queryString,
00244                                 GetActiveSnapshot(), InvalidSnapshot,
00245                                 dest, NULL, 0);
00246 
00247     /* call ExecutorStart to prepare the plan for execution */
00248     ExecutorStart(queryDesc, EXEC_FLAG_WITHOUT_OIDS);
00249 
00250     /* run the plan */
00251     ExecutorRun(queryDesc, ForwardScanDirection, 0L);
00252 
00253     /* and clean up */
00254     ExecutorFinish(queryDesc);
00255     ExecutorEnd(queryDesc);
00256 
00257     FreeQueryDesc(queryDesc);
00258 
00259     PopActiveSnapshot();
00260 }
00261 
00262 DestReceiver *
00263 CreateTransientRelDestReceiver(Oid transientoid)
00264 {
00265     DR_transientrel *self = (DR_transientrel *) palloc0(sizeof(DR_transientrel));
00266 
00267     self->pub.receiveSlot = transientrel_receive;
00268     self->pub.rStartup = transientrel_startup;
00269     self->pub.rShutdown = transientrel_shutdown;
00270     self->pub.rDestroy = transientrel_destroy;
00271     self->pub.mydest = DestTransientRel;
00272     self->transientoid = transientoid;
00273 
00274     return (DestReceiver *) self;
00275 }
00276 
00277 /*
00278  * transientrel_startup --- executor startup
00279  */
00280 static void
00281 transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
00282 {
00283     DR_transientrel *myState = (DR_transientrel *) self;
00284     Relation transientrel;
00285 
00286     transientrel = heap_open(myState->transientoid, NoLock);
00287 
00288     /*
00289      * Fill private fields of myState for use by later routines
00290      */
00291     myState->transientrel = transientrel;
00292     myState->output_cid = GetCurrentCommandId(true);
00293 
00294     /*
00295      * We can skip WAL-logging the insertions, unless PITR or streaming
00296      * replication is in use. We can skip the FSM in any case.
00297      */
00298     myState->hi_options = HEAP_INSERT_SKIP_FSM | HEAP_INSERT_FROZEN;
00299     if (!XLogIsNeeded())
00300         myState->hi_options |= HEAP_INSERT_SKIP_WAL;
00301     myState->bistate = GetBulkInsertState();
00302 
00303     SetMatViewToPopulated(transientrel);
00304 
00305     /* Not using WAL requires smgr_targblock be initially invalid */
00306     Assert(RelationGetTargetBlock(transientrel) == InvalidBlockNumber);
00307 }
00308 
00309 /*
00310  * transientrel_receive --- receive one tuple
00311  */
00312 static void
00313 transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
00314 {
00315     DR_transientrel *myState = (DR_transientrel *) self;
00316     HeapTuple   tuple;
00317 
00318     /*
00319      * get the heap tuple out of the tuple table slot, making sure we have a
00320      * writable copy
00321      */
00322     tuple = ExecMaterializeSlot(slot);
00323 
00324     heap_insert(myState->transientrel,
00325                 tuple,
00326                 myState->output_cid,
00327                 myState->hi_options,
00328                 myState->bistate);
00329 
00330     /* We know this is a newly created relation, so there are no indexes */
00331 }
00332 
00333 /*
00334  * transientrel_shutdown --- executor end
00335  */
00336 static void
00337 transientrel_shutdown(DestReceiver *self)
00338 {
00339     DR_transientrel *myState = (DR_transientrel *) self;
00340 
00341     FreeBulkInsertState(myState->bistate);
00342 
00343     /* If we skipped using WAL, must heap_sync before commit */
00344     if (myState->hi_options & HEAP_INSERT_SKIP_WAL)
00345         heap_sync(myState->transientrel);
00346 
00347     /* close transientrel, but keep lock until commit */
00348     heap_close(myState->transientrel, NoLock);
00349     myState->transientrel = NULL;
00350 }
00351 
00352 /*
00353  * transientrel_destroy --- release DestReceiver object
00354  */
00355 static void
00356 transientrel_destroy(DestReceiver *self)
00357 {
00358     pfree(self);
00359 }