00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015 #include "postgres.h"
00016
00017 #include "access/heapam_xlog.h"
00018 #include "access/multixact.h"
00019 #include "access/relscan.h"
00020 #include "access/xact.h"
00021 #include "catalog/catalog.h"
00022 #include "catalog/heap.h"
00023 #include "catalog/namespace.h"
00024 #include "commands/cluster.h"
00025 #include "commands/matview.h"
00026 #include "commands/tablecmds.h"
00027 #include "executor/executor.h"
00028 #include "miscadmin.h"
00029 #include "rewrite/rewriteHandler.h"
00030 #include "storage/lmgr.h"
00031 #include "storage/smgr.h"
00032 #include "tcop/tcopprot.h"
00033 #include "utils/snapmgr.h"
00034
00035
00036 typedef struct
00037 {
00038 DestReceiver pub;
00039 Oid transientoid;
00040
00041 Relation transientrel;
00042 CommandId output_cid;
00043 int hi_options;
00044 BulkInsertState bistate;
00045 } DR_transientrel;
00046
00047 static void transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo);
00048 static void transientrel_receive(TupleTableSlot *slot, DestReceiver *self);
00049 static void transientrel_shutdown(DestReceiver *self);
00050 static void transientrel_destroy(DestReceiver *self);
00051 static void refresh_matview_datafill(DestReceiver *dest, Query *query,
00052 const char *queryString);
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063 void
00064 SetMatViewToPopulated(Relation relation)
00065 {
00066 Page page;
00067
00068 Assert(relation->rd_rel->relkind == RELKIND_MATVIEW);
00069 Assert(relation->rd_ispopulated == false);
00070
00071 page = (Page) palloc(BLCKSZ);
00072 PageInit(page, BLCKSZ, 0);
00073
00074 if (RelationNeedsWAL(relation))
00075 log_newpage(&(relation->rd_node), MAIN_FORKNUM, 0, page);
00076
00077 RelationOpenSmgr(relation);
00078
00079 PageSetChecksumInplace(page, 0);
00080 smgrextend(relation->rd_smgr, MAIN_FORKNUM, 0, (char *) page, true);
00081
00082 pfree(page);
00083
00084 smgrimmedsync(relation->rd_smgr, MAIN_FORKNUM);
00085
00086 RelationCacheInvalidateEntry(relation->rd_id);
00087 }
00088
00089
00090
00091
00092
00093
00094
00095
00096
00097
00098
00099
00100
00101
00102
00103
00104
00105
00106
00107
00108
00109 void
00110 ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString,
00111 ParamListInfo params, char *completionTag)
00112 {
00113 Oid matviewOid;
00114 Relation matviewRel;
00115 RewriteRule *rule;
00116 List *actions;
00117 Query *dataQuery;
00118 Oid tableSpace;
00119 Oid OIDNewHeap;
00120 DestReceiver *dest;
00121
00122
00123
00124
00125 matviewOid = RangeVarGetRelidExtended(stmt->relation,
00126 AccessExclusiveLock, false, false,
00127 RangeVarCallbackOwnsTable, NULL);
00128 matviewRel = heap_open(matviewOid, NoLock);
00129
00130
00131 if (matviewRel->rd_rel->relkind != RELKIND_MATVIEW)
00132 ereport(ERROR,
00133 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
00134 errmsg("\"%s\" is not a materialized view",
00135 RelationGetRelationName(matviewRel))));
00136
00137
00138
00139
00140 Assert(!IsSystemRelation(matviewRel));
00141
00142 Assert(!matviewRel->rd_rel->relhasoids);
00143
00144
00145
00146
00147
00148 if (matviewRel->rd_rel->relhasrules == false ||
00149 matviewRel->rd_rules->numLocks < 1)
00150 elog(ERROR,
00151 "materialized view \"%s\" is missing rewrite information",
00152 RelationGetRelationName(matviewRel));
00153
00154 if (matviewRel->rd_rules->numLocks > 1)
00155 elog(ERROR,
00156 "materialized view \"%s\" has too many rules",
00157 RelationGetRelationName(matviewRel));
00158
00159 rule = matviewRel->rd_rules->rules[0];
00160 if (rule->event != CMD_SELECT || !(rule->isInstead))
00161 elog(ERROR,
00162 "the rule for materialized view \"%s\" is not a SELECT INSTEAD OF rule",
00163 RelationGetRelationName(matviewRel));
00164
00165 actions = rule->actions;
00166 if (list_length(actions) != 1)
00167 elog(ERROR,
00168 "the rule for materialized view \"%s\" is not a single action",
00169 RelationGetRelationName(matviewRel));
00170
00171
00172
00173
00174
00175 dataQuery = (Query *) linitial(actions);
00176 Assert(IsA(dataQuery, Query));
00177
00178
00179
00180
00181
00182
00183
00184
00185 CheckTableNotInUse(matviewRel, "REFRESH MATERIALIZED VIEW");
00186
00187 tableSpace = matviewRel->rd_rel->reltablespace;
00188
00189 heap_close(matviewRel, NoLock);
00190
00191
00192 OIDNewHeap = make_new_heap(matviewOid, tableSpace);
00193 dest = CreateTransientRelDestReceiver(OIDNewHeap);
00194
00195 if (!stmt->skipData)
00196 refresh_matview_datafill(dest, dataQuery, queryString);
00197
00198
00199
00200
00201
00202 finish_heap_swap(matviewOid, OIDNewHeap, false, false, true, true,
00203 RecentXmin, ReadNextMultiXactId());
00204
00205 RelationCacheInvalidateEntry(matviewOid);
00206 }
00207
00208
00209
00210
00211 static void
00212 refresh_matview_datafill(DestReceiver *dest, Query *query,
00213 const char *queryString)
00214 {
00215 List *rewritten;
00216 PlannedStmt *plan;
00217 QueryDesc *queryDesc;
00218
00219
00220 rewritten = QueryRewrite((Query *) copyObject(query));
00221
00222
00223 if (list_length(rewritten) != 1)
00224 elog(ERROR, "unexpected rewrite result for REFRESH MATERIALIZED VIEW");
00225 query = (Query *) linitial(rewritten);
00226
00227
00228 CHECK_FOR_INTERRUPTS();
00229
00230
00231 plan = pg_plan_query(query, 0, NULL);
00232
00233
00234
00235
00236
00237
00238
00239 PushCopiedSnapshot(GetActiveSnapshot());
00240 UpdateActiveSnapshotCommandId();
00241
00242
00243 queryDesc = CreateQueryDesc(plan, queryString,
00244 GetActiveSnapshot(), InvalidSnapshot,
00245 dest, NULL, 0);
00246
00247
00248 ExecutorStart(queryDesc, EXEC_FLAG_WITHOUT_OIDS);
00249
00250
00251 ExecutorRun(queryDesc, ForwardScanDirection, 0L);
00252
00253
00254 ExecutorFinish(queryDesc);
00255 ExecutorEnd(queryDesc);
00256
00257 FreeQueryDesc(queryDesc);
00258
00259 PopActiveSnapshot();
00260 }
00261
00262 DestReceiver *
00263 CreateTransientRelDestReceiver(Oid transientoid)
00264 {
00265 DR_transientrel *self = (DR_transientrel *) palloc0(sizeof(DR_transientrel));
00266
00267 self->pub.receiveSlot = transientrel_receive;
00268 self->pub.rStartup = transientrel_startup;
00269 self->pub.rShutdown = transientrel_shutdown;
00270 self->pub.rDestroy = transientrel_destroy;
00271 self->pub.mydest = DestTransientRel;
00272 self->transientoid = transientoid;
00273
00274 return (DestReceiver *) self;
00275 }
00276
00277
00278
00279
00280 static void
00281 transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
00282 {
00283 DR_transientrel *myState = (DR_transientrel *) self;
00284 Relation transientrel;
00285
00286 transientrel = heap_open(myState->transientoid, NoLock);
00287
00288
00289
00290
00291 myState->transientrel = transientrel;
00292 myState->output_cid = GetCurrentCommandId(true);
00293
00294
00295
00296
00297
00298 myState->hi_options = HEAP_INSERT_SKIP_FSM | HEAP_INSERT_FROZEN;
00299 if (!XLogIsNeeded())
00300 myState->hi_options |= HEAP_INSERT_SKIP_WAL;
00301 myState->bistate = GetBulkInsertState();
00302
00303 SetMatViewToPopulated(transientrel);
00304
00305
00306 Assert(RelationGetTargetBlock(transientrel) == InvalidBlockNumber);
00307 }
00308
00309
00310
00311
00312 static void
00313 transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
00314 {
00315 DR_transientrel *myState = (DR_transientrel *) self;
00316 HeapTuple tuple;
00317
00318
00319
00320
00321
00322 tuple = ExecMaterializeSlot(slot);
00323
00324 heap_insert(myState->transientrel,
00325 tuple,
00326 myState->output_cid,
00327 myState->hi_options,
00328 myState->bistate);
00329
00330
00331 }
00332
00333
00334
00335
00336 static void
00337 transientrel_shutdown(DestReceiver *self)
00338 {
00339 DR_transientrel *myState = (DR_transientrel *) self;
00340
00341 FreeBulkInsertState(myState->bistate);
00342
00343
00344 if (myState->hi_options & HEAP_INSERT_SKIP_WAL)
00345 heap_sync(myState->transientrel);
00346
00347
00348 heap_close(myState->transientrel, NoLock);
00349 myState->transientrel = NULL;
00350 }
00351
00352
00353
00354
00355 static void
00356 transientrel_destroy(DestReceiver *self)
00357 {
00358 pfree(self);
00359 }