#include "postgres.h"#include "access/heapam_xlog.h"#include "access/multixact.h"#include "access/relscan.h"#include "access/xact.h"#include "catalog/catalog.h"#include "catalog/heap.h"#include "catalog/namespace.h"#include "commands/cluster.h"#include "commands/matview.h"#include "commands/tablecmds.h"#include "executor/executor.h"#include "miscadmin.h"#include "rewrite/rewriteHandler.h"#include "storage/lmgr.h"#include "storage/smgr.h"#include "tcop/tcopprot.h"#include "utils/snapmgr.h"
Go to the source code of this file.
Data Structures | |
| struct | DR_transientrel |
Functions | |
| static void | transientrel_startup (DestReceiver *self, int operation, TupleDesc typeinfo) |
| static void | transientrel_receive (TupleTableSlot *slot, DestReceiver *self) |
| static void | transientrel_shutdown (DestReceiver *self) |
| static void | transientrel_destroy (DestReceiver *self) |
| static void | refresh_matview_datafill (DestReceiver *dest, Query *query, const char *queryString) |
| void | SetMatViewToPopulated (Relation relation) |
| void | ExecRefreshMatView (RefreshMatViewStmt *stmt, const char *queryString, ParamListInfo params, char *completionTag) |
| DestReceiver * | CreateTransientRelDestReceiver (Oid transientoid) |
| DestReceiver* CreateTransientRelDestReceiver | ( | Oid | transientoid | ) |
Definition at line 263 of file matview.c.
References palloc0().
Referenced by CreateDestReceiver(), and ExecRefreshMatView().
{
DR_transientrel *self = (DR_transientrel *) palloc0(sizeof(DR_transientrel));
self->pub.receiveSlot = transientrel_receive;
self->pub.rStartup = transientrel_startup;
self->pub.rShutdown = transientrel_shutdown;
self->pub.rDestroy = transientrel_destroy;
self->pub.mydest = DestTransientRel;
self->transientoid = transientoid;
return (DestReceiver *) self;
}
| void ExecRefreshMatView | ( | RefreshMatViewStmt * | stmt, | |
| const char * | queryString, | |||
| ParamListInfo | params, | |||
| char * | completionTag | |||
| ) |
Definition at line 110 of file matview.c.
References AccessExclusiveLock, RewriteRule::actions, Assert, CheckTableNotInUse(), CMD_SELECT, CreateTransientRelDestReceiver(), elog, ereport, errcode(), errmsg(), ERROR, RewriteRule::event, finish_heap_swap(), heap_close, heap_open(), IsA, RewriteRule::isInstead, IsSystemRelation(), linitial, list_length(), make_new_heap(), NoLock, NULL, RuleLock::numLocks, RangeVarCallbackOwnsTable(), RangeVarGetRelidExtended(), RelationData::rd_rel, RelationData::rd_rules, ReadNextMultiXactId(), RecentXmin, refresh_matview_datafill(), RefreshMatViewStmt::relation, RelationCacheInvalidateEntry(), RelationGetRelationName, RELKIND_MATVIEW, RuleLock::rules, and RefreshMatViewStmt::skipData.
Referenced by ProcessUtilitySlow().
{
Oid matviewOid;
Relation matviewRel;
RewriteRule *rule;
List *actions;
Query *dataQuery;
Oid tableSpace;
Oid OIDNewHeap;
DestReceiver *dest;
/*
* Get a lock until end of transaction.
*/
matviewOid = RangeVarGetRelidExtended(stmt->relation,
AccessExclusiveLock, false, false,
RangeVarCallbackOwnsTable, NULL);
matviewRel = heap_open(matviewOid, NoLock);
/* Make sure it is a materialized view. */
if (matviewRel->rd_rel->relkind != RELKIND_MATVIEW)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("\"%s\" is not a materialized view",
RelationGetRelationName(matviewRel))));
/*
* We're not using materialized views in the system catalogs.
*/
Assert(!IsSystemRelation(matviewRel));
Assert(!matviewRel->rd_rel->relhasoids);
/*
* Check that everything is correct for a refresh. Problems at this point
* are internal errors, so elog is sufficient.
*/
if (matviewRel->rd_rel->relhasrules == false ||
matviewRel->rd_rules->numLocks < 1)
elog(ERROR,
"materialized view \"%s\" is missing rewrite information",
RelationGetRelationName(matviewRel));
if (matviewRel->rd_rules->numLocks > 1)
elog(ERROR,
"materialized view \"%s\" has too many rules",
RelationGetRelationName(matviewRel));
rule = matviewRel->rd_rules->rules[0];
if (rule->event != CMD_SELECT || !(rule->isInstead))
elog(ERROR,
"the rule for materialized view \"%s\" is not a SELECT INSTEAD OF rule",
RelationGetRelationName(matviewRel));
actions = rule->actions;
if (list_length(actions) != 1)
elog(ERROR,
"the rule for materialized view \"%s\" is not a single action",
RelationGetRelationName(matviewRel));
/*
* The stored query was rewritten at the time of the MV definition, but
* has not been scribbled on by the planner.
*/
dataQuery = (Query *) linitial(actions);
Assert(IsA(dataQuery, Query));
/*
* Check for active uses of the relation in the current transaction, such
* as open scans.
*
* NB: We count on this to protect us against problems with refreshing the
* data using HEAP_INSERT_FROZEN.
*/
CheckTableNotInUse(matviewRel, "REFRESH MATERIALIZED VIEW");
tableSpace = matviewRel->rd_rel->reltablespace;
heap_close(matviewRel, NoLock);
/* Create the transient table that will receive the regenerated data. */
OIDNewHeap = make_new_heap(matviewOid, tableSpace);
dest = CreateTransientRelDestReceiver(OIDNewHeap);
if (!stmt->skipData)
refresh_matview_datafill(dest, dataQuery, queryString);
/*
* Swap the physical files of the target and transient tables, then
* rebuild the target's indexes and throw away the transient table.
*/
finish_heap_swap(matviewOid, OIDNewHeap, false, false, true, true,
RecentXmin, ReadNextMultiXactId());
RelationCacheInvalidateEntry(matviewOid);
}
| static void refresh_matview_datafill | ( | DestReceiver * | dest, | |
| Query * | query, | |||
| const char * | queryString | |||
| ) | [static] |
Definition at line 212 of file matview.c.
References CHECK_FOR_INTERRUPTS, copyObject(), CreateQueryDesc(), elog, ERROR, EXEC_FLAG_WITHOUT_OIDS, ExecutorEnd(), ExecutorFinish(), ExecutorRun(), ExecutorStart(), ForwardScanDirection, FreeQueryDesc(), GetActiveSnapshot(), InvalidSnapshot, linitial, list_length(), NULL, pg_plan_query(), PopActiveSnapshot(), PushCopiedSnapshot(), QueryRewrite(), and UpdateActiveSnapshotCommandId().
Referenced by ExecRefreshMatView().
{
List *rewritten;
PlannedStmt *plan;
QueryDesc *queryDesc;
/* Rewrite, copying the given Query to make sure it's not changed */
rewritten = QueryRewrite((Query *) copyObject(query));
/* SELECT should never rewrite to more or less than one SELECT query */
if (list_length(rewritten) != 1)
elog(ERROR, "unexpected rewrite result for REFRESH MATERIALIZED VIEW");
query = (Query *) linitial(rewritten);
/* Check for user-requested abort. */
CHECK_FOR_INTERRUPTS();
/* Plan the query which will generate data for the refresh. */
plan = pg_plan_query(query, 0, NULL);
/*
* Use a snapshot with an updated command ID to ensure this query sees
* results of any previously executed queries. (This could only matter if
* the planner executed an allegedly-stable function that changed the
* database contents, but let's do it anyway to be safe.)
*/
PushCopiedSnapshot(GetActiveSnapshot());
UpdateActiveSnapshotCommandId();
/* Create a QueryDesc, redirecting output to our tuple receiver */
queryDesc = CreateQueryDesc(plan, queryString,
GetActiveSnapshot(), InvalidSnapshot,
dest, NULL, 0);
/* call ExecutorStart to prepare the plan for execution */
ExecutorStart(queryDesc, EXEC_FLAG_WITHOUT_OIDS);
/* run the plan */
ExecutorRun(queryDesc, ForwardScanDirection, 0L);
/* and clean up */
ExecutorFinish(queryDesc);
ExecutorEnd(queryDesc);
FreeQueryDesc(queryDesc);
PopActiveSnapshot();
}
| void SetMatViewToPopulated | ( | Relation | relation | ) |
Definition at line 64 of file matview.c.
References Assert, log_newpage(), MAIN_FORKNUM, PageInit(), PageSetChecksumInplace(), palloc(), pfree(), RelationData::rd_id, RelationData::rd_ispopulated, RelationData::rd_node, RelationData::rd_rel, RelationData::rd_smgr, RelationCacheInvalidateEntry(), RelationNeedsWAL, RelationOpenSmgr, RELKIND_MATVIEW, smgrextend(), and smgrimmedsync().
Referenced by copy_heap_data(), intorel_startup(), and transientrel_startup().
{
Page page;
Assert(relation->rd_rel->relkind == RELKIND_MATVIEW);
Assert(relation->rd_ispopulated == false);
page = (Page) palloc(BLCKSZ);
PageInit(page, BLCKSZ, 0);
if (RelationNeedsWAL(relation))
log_newpage(&(relation->rd_node), MAIN_FORKNUM, 0, page);
RelationOpenSmgr(relation);
PageSetChecksumInplace(page, 0);
smgrextend(relation->rd_smgr, MAIN_FORKNUM, 0, (char *) page, true);
pfree(page);
smgrimmedsync(relation->rd_smgr, MAIN_FORKNUM);
RelationCacheInvalidateEntry(relation->rd_id);
}
| static void transientrel_destroy | ( | DestReceiver * | self | ) | [static] |
| static void transientrel_receive | ( | TupleTableSlot * | slot, | |
| DestReceiver * | self | |||
| ) | [static] |
Definition at line 313 of file matview.c.
References DR_transientrel::bistate, ExecMaterializeSlot(), heap_insert(), DR_transientrel::hi_options, DR_transientrel::output_cid, and DR_transientrel::transientrel.
{
DR_transientrel *myState = (DR_transientrel *) self;
HeapTuple tuple;
/*
* get the heap tuple out of the tuple table slot, making sure we have a
* writable copy
*/
tuple = ExecMaterializeSlot(slot);
heap_insert(myState->transientrel,
tuple,
myState->output_cid,
myState->hi_options,
myState->bistate);
/* We know this is a newly created relation, so there are no indexes */
}
| static void transientrel_shutdown | ( | DestReceiver * | self | ) | [static] |
Definition at line 337 of file matview.c.
References DR_transientrel::bistate, FreeBulkInsertState(), heap_close, HEAP_INSERT_SKIP_WAL, heap_sync(), DR_transientrel::hi_options, NoLock, and DR_transientrel::transientrel.
{
DR_transientrel *myState = (DR_transientrel *) self;
FreeBulkInsertState(myState->bistate);
/* If we skipped using WAL, must heap_sync before commit */
if (myState->hi_options & HEAP_INSERT_SKIP_WAL)
heap_sync(myState->transientrel);
/* close transientrel, but keep lock until commit */
heap_close(myState->transientrel, NoLock);
myState->transientrel = NULL;
}
| static void transientrel_startup | ( | DestReceiver * | self, | |
| int | operation, | |||
| TupleDesc | typeinfo | |||
| ) | [static] |
Definition at line 281 of file matview.c.
References Assert, DR_transientrel::bistate, GetBulkInsertState(), GetCurrentCommandId(), HEAP_INSERT_SKIP_FSM, heap_open(), DR_transientrel::hi_options, InvalidBlockNumber, NoLock, DR_transientrel::output_cid, RelationGetTargetBlock, SetMatViewToPopulated(), DR_transientrel::transientoid, DR_transientrel::transientrel, and XLogIsNeeded.
{
DR_transientrel *myState = (DR_transientrel *) self;
Relation transientrel;
transientrel = heap_open(myState->transientoid, NoLock);
/*
* Fill private fields of myState for use by later routines
*/
myState->transientrel = transientrel;
myState->output_cid = GetCurrentCommandId(true);
/*
* We can skip WAL-logging the insertions, unless PITR or streaming
* replication is in use. We can skip the FSM in any case.
*/
myState->hi_options = HEAP_INSERT_SKIP_FSM | HEAP_INSERT_FROZEN;
if (!XLogIsNeeded())
myState->hi_options |= HEAP_INSERT_SKIP_WAL;
myState->bistate = GetBulkInsertState();
SetMatViewToPopulated(transientrel);
/* Not using WAL requires smgr_targblock be initially invalid */
Assert(RelationGetTargetBlock(transientrel) == InvalidBlockNumber);
}
1.7.1