#include "postgres.h"
#include "access/heapam_xlog.h"
#include "access/multixact.h"
#include "access/relscan.h"
#include "access/xact.h"
#include "catalog/catalog.h"
#include "catalog/heap.h"
#include "catalog/namespace.h"
#include "commands/cluster.h"
#include "commands/matview.h"
#include "commands/tablecmds.h"
#include "executor/executor.h"
#include "miscadmin.h"
#include "rewrite/rewriteHandler.h"
#include "storage/lmgr.h"
#include "storage/smgr.h"
#include "tcop/tcopprot.h"
#include "utils/snapmgr.h"
Go to the source code of this file.
Data Structures | |
struct | DR_transientrel |
Functions | |
static void | transientrel_startup (DestReceiver *self, int operation, TupleDesc typeinfo) |
static void | transientrel_receive (TupleTableSlot *slot, DestReceiver *self) |
static void | transientrel_shutdown (DestReceiver *self) |
static void | transientrel_destroy (DestReceiver *self) |
static void | refresh_matview_datafill (DestReceiver *dest, Query *query, const char *queryString) |
void | SetMatViewToPopulated (Relation relation) |
void | ExecRefreshMatView (RefreshMatViewStmt *stmt, const char *queryString, ParamListInfo params, char *completionTag) |
DestReceiver * | CreateTransientRelDestReceiver (Oid transientoid) |
DestReceiver *CreateTransientRelDestReceiver(Oid transientoid)
Definition at line 263 of file matview.c.
References palloc0().
Referenced by CreateDestReceiver(), and ExecRefreshMatView().
{
	/*
	 * Build a DestReceiver that routes tuples into the transient relation
	 * identified by transientoid.  The relation itself is not opened here;
	 * only its OID is recorded in the receiver.
	 */
	DR_transientrel *receiver;

	receiver = (DR_transientrel *) palloc0(sizeof(*receiver));

	/* Remember which relation this receiver is bound to. */
	receiver->transientoid = transientoid;

	/* Wire up the receiver callbacks and destination tag. */
	receiver->pub.mydest = DestTransientRel;
	receiver->pub.rStartup = transientrel_startup;
	receiver->pub.receiveSlot = transientrel_receive;
	receiver->pub.rShutdown = transientrel_shutdown;
	receiver->pub.rDestroy = transientrel_destroy;

	return (DestReceiver *) receiver;
}
void ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString, ParamListInfo params, char *completionTag)
Definition at line 110 of file matview.c.
References AccessExclusiveLock, RewriteRule::actions, Assert, CheckTableNotInUse(), CMD_SELECT, CreateTransientRelDestReceiver(), elog, ereport, errcode(), errmsg(), ERROR, RewriteRule::event, finish_heap_swap(), heap_close, heap_open(), IsA, RewriteRule::isInstead, IsSystemRelation(), linitial, list_length(), make_new_heap(), NoLock, NULL, RuleLock::numLocks, RangeVarCallbackOwnsTable(), RangeVarGetRelidExtended(), RelationData::rd_rel, RelationData::rd_rules, ReadNextMultiXactId(), RecentXmin, refresh_matview_datafill(), RefreshMatViewStmt::relation, RelationCacheInvalidateEntry(), RelationGetRelationName, RELKIND_MATVIEW, RuleLock::rules, and RefreshMatViewStmt::skipData.
Referenced by ProcessUtilitySlow().
{
	/*
	 * Execute REFRESH MATERIALIZED VIEW: validate the target and its stored
	 * rule, build a transient heap, optionally fill it from the view's
	 * query, then swap it into place.
	 */
	Oid			mvOid;
	Relation	matviewRel;
	const char *mvName;
	RewriteRule *selectRule;
	Query	   *dataQuery;
	Oid			targetTablespace;
	Oid			transientOid;
	DestReceiver *receiver;

	/*
	 * Look up the relation and take a lock that is held until end of
	 * transaction; the relation is then opened without further locking.
	 */
	mvOid = RangeVarGetRelidExtended(stmt->relation,
									 AccessExclusiveLock, false, false,
									 RangeVarCallbackOwnsTable, NULL);
	matviewRel = heap_open(mvOid, NoLock);
	mvName = RelationGetRelationName(matviewRel);

	/* The target must actually be a materialized view. */
	if (matviewRel->rd_rel->relkind != RELKIND_MATVIEW)
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("\"%s\" is not a materialized view",
						mvName)));

	/*
	 * We're not using materialized views in the system catalogs.
	 */
	Assert(!IsSystemRelation(matviewRel));

	Assert(!matviewRel->rd_rel->relhasoids);

	/*
	 * Sanity-check the rewrite rule backing the view.  Any failure here is
	 * an internal error, so plain elog is sufficient.
	 */
	if (!matviewRel->rd_rel->relhasrules ||
		matviewRel->rd_rules->numLocks < 1)
		elog(ERROR,
			 "materialized view \"%s\" is missing rewrite information",
			 mvName);

	if (matviewRel->rd_rules->numLocks > 1)
		elog(ERROR,
			 "materialized view \"%s\" has too many rules",
			 mvName);

	selectRule = matviewRel->rd_rules->rules[0];
	if (!(selectRule->event == CMD_SELECT && selectRule->isInstead))
		elog(ERROR,
			 "the rule for materialized view \"%s\" is not a SELECT INSTEAD OF rule",
			 mvName);

	if (list_length(selectRule->actions) != 1)
		elog(ERROR,
			 "the rule for materialized view \"%s\" is not a single action",
			 mvName);

	/*
	 * The stored query was rewritten when the materialized view was
	 * defined, but the planner has not modified it.
	 */
	dataQuery = (Query *) linitial(selectRule->actions);
	Assert(IsA(dataQuery, Query));

	/*
	 * Refuse to proceed if the relation is in active use (for example, by
	 * open scans) in the current transaction.
	 *
	 * NB: this check is also what protects us against problems with
	 * refreshing the data using HEAP_INSERT_FROZEN.
	 */
	CheckTableNotInUse(matviewRel, "REFRESH MATERIALIZED VIEW");

	/* Capture the tablespace before closing our relation reference. */
	targetTablespace = matviewRel->rd_rel->reltablespace;

	heap_close(matviewRel, NoLock);

	/* Create the transient table that will receive the regenerated data. */
	transientOid = make_new_heap(mvOid, targetTablespace);
	receiver = CreateTransientRelDestReceiver(transientOid);

	/* The data-generating query is run only when skipData is not set. */
	if (!stmt->skipData)
		refresh_matview_datafill(receiver, dataQuery, queryString);

	/*
	 * Swap the physical files of the target and transient tables, then
	 * rebuild the target's indexes and throw away the transient table.
	 */
	finish_heap_swap(mvOid, transientOid,
					 false, false, true, true,
					 RecentXmin, ReadNextMultiXactId());

	RelationCacheInvalidateEntry(mvOid);
}
static void refresh_matview_datafill(DestReceiver *dest, Query *query, const char *queryString)
Definition at line 212 of file matview.c.
References CHECK_FOR_INTERRUPTS, copyObject(), CreateQueryDesc(), elog, ERROR, EXEC_FLAG_WITHOUT_OIDS, ExecutorEnd(), ExecutorFinish(), ExecutorRun(), ExecutorStart(), ForwardScanDirection, FreeQueryDesc(), GetActiveSnapshot(), InvalidSnapshot, linitial, list_length(), NULL, pg_plan_query(), PopActiveSnapshot(), PushCopiedSnapshot(), QueryRewrite(), and UpdateActiveSnapshotCommandId().
Referenced by ExecRefreshMatView().
{
	/*
	 * Rewrite, plan and execute the materialized view's query, sending
	 * every result tuple to the supplied DestReceiver.
	 */
	List	   *rewriteResult;
	Query	   *selectQuery;
	PlannedStmt *plannedStmt;
	QueryDesc  *qdesc;

	/* Rewrite a copy, so the caller's Query is guaranteed untouched. */
	rewriteResult = QueryRewrite((Query *) copyObject(query));

	/* A SELECT must rewrite to exactly one query. */
	if (list_length(rewriteResult) != 1)
		elog(ERROR, "unexpected rewrite result for REFRESH MATERIALIZED VIEW");
	selectQuery = (Query *) linitial(rewriteResult);

	/* Give the user a chance to cancel before planning. */
	CHECK_FOR_INTERRUPTS();

	/* Plan the query which will generate data for the refresh. */
	plannedStmt = pg_plan_query(selectQuery, 0, NULL);

	/*
	 * Push a copy of the active snapshot and update its command ID, so the
	 * query sees the results of any previously executed queries.  (This
	 * could only matter if the planner executed an allegedly-stable
	 * function that changed the database contents, but do it anyway to be
	 * safe.)
	 */
	PushCopiedSnapshot(GetActiveSnapshot());
	UpdateActiveSnapshotCommandId();

	/* Build the QueryDesc with output redirected to our tuple receiver. */
	qdesc = CreateQueryDesc(plannedStmt, queryString,
							GetActiveSnapshot(), InvalidSnapshot,
							dest, NULL, 0);

	/* Prepare the plan for execution. */
	ExecutorStart(qdesc, EXEC_FLAG_WITHOUT_OIDS);

	/* Run the plan. */
	ExecutorRun(qdesc, ForwardScanDirection, 0L);

	/* Tear everything down in order. */
	ExecutorFinish(qdesc);
	ExecutorEnd(qdesc);

	FreeQueryDesc(qdesc);

	PopActiveSnapshot();
}
void SetMatViewToPopulated(Relation relation)
Definition at line 64 of file matview.c.
References Assert, log_newpage(), MAIN_FORKNUM, PageInit(), PageSetChecksumInplace(), palloc(), pfree(), RelationData::rd_id, RelationData::rd_ispopulated, RelationData::rd_node, RelationData::rd_rel, RelationData::rd_smgr, RelationCacheInvalidateEntry(), RelationNeedsWAL, RelationOpenSmgr, RELKIND_MATVIEW, smgrextend(), and smgrimmedsync().
Referenced by copy_heap_data(), intorel_startup(), and transientrel_startup().
{
	/*
	 * Mark a materialized view as populated by writing a single initialized
	 * page as block 0 of its main fork.
	 */
	Page		initPage;

	Assert(relation->rd_rel->relkind == RELKIND_MATVIEW);
	Assert(relation->rd_ispopulated == false);

	/* Build one initialized page in local memory. */
	initPage = (Page) palloc(BLCKSZ);
	PageInit(initPage, BLCKSZ, 0);

	/* WAL-log the new page first when the relation needs WAL. */
	if (RelationNeedsWAL(relation))
		log_newpage(&(relation->rd_node), MAIN_FORKNUM, 0, initPage);

	/* Set the checksum, then extend the main fork with the page. */
	RelationOpenSmgr(relation);
	PageSetChecksumInplace(initPage, 0);
	smgrextend(relation->rd_smgr, MAIN_FORKNUM, 0, (char *) initPage, true);

	pfree(initPage);

	/* Sync the main fork immediately. */
	smgrimmedsync(relation->rd_smgr, MAIN_FORKNUM);

	/* Invalidate the relcache entry for this relation. */
	RelationCacheInvalidateEntry(relation->rd_id);
}
static void transientrel_destroy(DestReceiver *self)
static void transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
Definition at line 313 of file matview.c.
References DR_transientrel::bistate, ExecMaterializeSlot(), heap_insert(), DR_transientrel::hi_options, DR_transientrel::output_cid, and DR_transientrel::transientrel.
{
	/*
	 * Receive one tuple from the executor and insert it into the transient
	 * relation.
	 */
	DR_transientrel *state = (DR_transientrel *) self;
	HeapTuple	heapTuple;

	/*
	 * Materialize the slot so that we hold a writable copy of the heap
	 * tuple.
	 */
	heapTuple = ExecMaterializeSlot(slot);

	/* Insert using the options and bulk-insert state set up at startup. */
	heap_insert(state->transientrel,
				heapTuple,
				state->output_cid,
				state->hi_options,
				state->bistate);

	/* The target is a newly created relation, so it has no indexes. */
}
static void transientrel_shutdown(DestReceiver *self)
Definition at line 337 of file matview.c.
References DR_transientrel::bistate, FreeBulkInsertState(), heap_close, HEAP_INSERT_SKIP_WAL, heap_sync(), DR_transientrel::hi_options, NoLock, and DR_transientrel::transientrel.
{
	/*
	 * Finish loading the transient relation: release the bulk-insert state,
	 * sync the heap if WAL was skipped, and close the relation.
	 */
	DR_transientrel *state = (DR_transientrel *) self;

	FreeBulkInsertState(state->bistate);

	/* Inserts that bypassed WAL must be heap_sync'd before commit. */
	if ((state->hi_options & HEAP_INSERT_SKIP_WAL) != 0)
		heap_sync(state->transientrel);

	/* Close the relation, but keep the lock until commit. */
	heap_close(state->transientrel, NoLock);
	state->transientrel = NULL;
}
static void transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
Definition at line 281 of file matview.c.
References Assert, DR_transientrel::bistate, GetBulkInsertState(), GetCurrentCommandId(), HEAP_INSERT_SKIP_FSM, heap_open(), DR_transientrel::hi_options, InvalidBlockNumber, NoLock, DR_transientrel::output_cid, RelationGetTargetBlock, SetMatViewToPopulated(), DR_transientrel::transientoid, DR_transientrel::transientrel, and XLogIsNeeded.
{
	/*
	 * Prepare the receiver for tuple insertion: open the transient
	 * relation, record the insertion parameters, and mark the relation as
	 * populated.
	 */
	DR_transientrel *state = (DR_transientrel *) self;
	Relation	transientrel;

	transientrel = heap_open(state->transientoid, NoLock);

	/* Stash what the receive/shutdown callbacks will need. */
	state->transientrel = transientrel;
	state->output_cid = GetCurrentCommandId(true);

	/*
	 * The FSM can be skipped in any case.  WAL-logging of the insertions
	 * can be skipped as well, unless PITR or streaming replication is in
	 * use.
	 */
	state->hi_options = HEAP_INSERT_SKIP_FSM | HEAP_INSERT_FROZEN |
		(XLogIsNeeded() ? 0 : HEAP_INSERT_SKIP_WAL);
	state->bistate = GetBulkInsertState();

	SetMatViewToPopulated(transientrel);

	/* Not using WAL requires smgr_targblock be initially invalid */
	Assert(RelationGetTargetBlock(transientrel) == InvalidBlockNumber);
}