00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "postgres.h"
00025
00026 #include "access/reloptions.h"
00027 #include "access/htup_details.h"
00028 #include "access/sysattr.h"
00029 #include "access/xact.h"
00030 #include "catalog/toasting.h"
00031 #include "commands/createas.h"
00032 #include "commands/matview.h"
00033 #include "commands/prepare.h"
00034 #include "commands/tablecmds.h"
00035 #include "commands/view.h"
00036 #include "parser/parse_clause.h"
00037 #include "rewrite/rewriteHandler.h"
00038 #include "storage/smgr.h"
00039 #include "tcop/tcopprot.h"
00040 #include "utils/builtins.h"
00041 #include "utils/lsyscache.h"
00042 #include "utils/rel.h"
00043 #include "utils/snapmgr.h"
00044
00045
00046 typedef struct
00047 {
00048 DestReceiver pub;
00049 IntoClause *into;
00050
00051 Relation rel;
00052 CommandId output_cid;
00053 int hi_options;
00054 BulkInsertState bistate;
00055 } DR_intorel;
00056
00057 static void intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo);
00058 static void intorel_receive(TupleTableSlot *slot, DestReceiver *self);
00059 static void intorel_shutdown(DestReceiver *self);
00060 static void intorel_destroy(DestReceiver *self);
00061
00062
00063
00064
00065
00066 void
00067 ExecCreateTableAs(CreateTableAsStmt *stmt, const char *queryString,
00068 ParamListInfo params, char *completionTag)
00069 {
00070 Query *query = (Query *) stmt->query;
00071 IntoClause *into = stmt->into;
00072 DestReceiver *dest;
00073 List *rewritten;
00074 PlannedStmt *plan;
00075 QueryDesc *queryDesc;
00076 ScanDirection dir;
00077
00078
00079
00080
00081 dest = CreateIntoRelDestReceiver(into);
00082
00083
00084
00085
00086
00087 Assert(IsA(query, Query));
00088 if (query->commandType == CMD_UTILITY &&
00089 IsA(query->utilityStmt, ExecuteStmt))
00090 {
00091 ExecuteStmt *estmt = (ExecuteStmt *) query->utilityStmt;
00092
00093 ExecuteQuery(estmt, into, queryString, params, dest, completionTag);
00094
00095 return;
00096 }
00097 Assert(query->commandType == CMD_SELECT);
00098
00099
00100
00101
00102
00103
00104
00105
00106
00107
00108
00109
00110 rewritten = QueryRewrite((Query *) copyObject(query));
00111
00112
00113 if (list_length(rewritten) != 1)
00114 elog(ERROR, "unexpected rewrite result for CREATE TABLE AS SELECT");
00115 query = (Query *) linitial(rewritten);
00116 Assert(query->commandType == CMD_SELECT);
00117
00118
00119 plan = pg_plan_query(query, 0, params);
00120
00121
00122
00123
00124
00125
00126
00127
00128 PushCopiedSnapshot(GetActiveSnapshot());
00129 UpdateActiveSnapshotCommandId();
00130
00131
00132 queryDesc = CreateQueryDesc(plan, queryString,
00133 GetActiveSnapshot(), InvalidSnapshot,
00134 dest, params, 0);
00135
00136
00137 ExecutorStart(queryDesc, GetIntoRelEFlags(into));
00138
00139
00140
00141
00142
00143 if (into->skipData)
00144 dir = NoMovementScanDirection;
00145 else
00146 dir = ForwardScanDirection;
00147
00148
00149 ExecutorRun(queryDesc, dir, 0L);
00150
00151
00152 if (completionTag)
00153 snprintf(completionTag, COMPLETION_TAG_BUFSIZE,
00154 "SELECT %u", queryDesc->estate->es_processed);
00155
00156
00157 ExecutorFinish(queryDesc);
00158 ExecutorEnd(queryDesc);
00159
00160 FreeQueryDesc(queryDesc);
00161
00162 PopActiveSnapshot();
00163 }
00164
00165
00166
00167
00168
00169
00170
00171
00172
00173 int
00174 GetIntoRelEFlags(IntoClause *intoClause)
00175 {
00176 int flags;
00177
00178
00179
00180
00181
00182
00183
00184
00185 if (interpretOidsOption(intoClause->options,
00186 (intoClause->viewQuery == NULL)))
00187 flags = EXEC_FLAG_WITH_OIDS;
00188 else
00189 flags = EXEC_FLAG_WITHOUT_OIDS;
00190
00191 if (intoClause->skipData)
00192 flags |= EXEC_FLAG_WITH_NO_DATA;
00193
00194 return flags;
00195 }
00196
00197
00198
00199
00200
00201
00202
00203
00204 DestReceiver *
00205 CreateIntoRelDestReceiver(IntoClause *intoClause)
00206 {
00207 DR_intorel *self = (DR_intorel *) palloc0(sizeof(DR_intorel));
00208
00209 self->pub.receiveSlot = intorel_receive;
00210 self->pub.rStartup = intorel_startup;
00211 self->pub.rShutdown = intorel_shutdown;
00212 self->pub.rDestroy = intorel_destroy;
00213 self->pub.mydest = DestIntoRel;
00214 self->into = intoClause;
00215
00216
00217 return (DestReceiver *) self;
00218 }
00219
00220
00221
00222
00223 static void
00224 intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
00225 {
00226 DR_intorel *myState = (DR_intorel *) self;
00227 IntoClause *into = myState->into;
00228 bool is_matview;
00229 char relkind;
00230 CreateStmt *create;
00231 Oid intoRelationId;
00232 Relation intoRelationDesc;
00233 RangeTblEntry *rte;
00234 Datum toast_options;
00235 ListCell *lc;
00236 int attnum;
00237 static char *validnsps[] = HEAP_RELOPT_NAMESPACES;
00238
00239 Assert(into != NULL);
00240
00241
00242 is_matview = (into->viewQuery != NULL);
00243 relkind = is_matview ? RELKIND_MATVIEW : RELKIND_RELATION;
00244
00245
00246
00247
00248
00249 create = makeNode(CreateStmt);
00250 create->relation = into->rel;
00251 create->tableElts = NIL;
00252 create->inhRelations = NIL;
00253 create->ofTypename = NULL;
00254 create->constraints = NIL;
00255 create->options = into->options;
00256 create->oncommit = into->onCommit;
00257 create->tablespacename = into->tableSpaceName;
00258 create->if_not_exists = false;
00259
00260
00261
00262
00263
00264
00265
00266 lc = list_head(into->colNames);
00267 for (attnum = 0; attnum < typeinfo->natts; attnum++)
00268 {
00269 Form_pg_attribute attribute = typeinfo->attrs[attnum];
00270 ColumnDef *col = makeNode(ColumnDef);
00271 TypeName *coltype = makeNode(TypeName);
00272
00273 if (lc)
00274 {
00275 col->colname = strVal(lfirst(lc));
00276 lc = lnext(lc);
00277 }
00278 else
00279 col->colname = NameStr(attribute->attname);
00280 col->typeName = coltype;
00281 col->inhcount = 0;
00282 col->is_local = true;
00283 col->is_not_null = false;
00284 col->is_from_type = false;
00285 col->storage = 0;
00286 col->raw_default = NULL;
00287 col->cooked_default = NULL;
00288 col->collClause = NULL;
00289 col->collOid = attribute->attcollation;
00290 col->constraints = NIL;
00291 col->fdwoptions = NIL;
00292
00293 coltype->names = NIL;
00294 coltype->typeOid = attribute->atttypid;
00295 coltype->setof = false;
00296 coltype->pct_type = false;
00297 coltype->typmods = NIL;
00298 coltype->typemod = attribute->atttypmod;
00299 coltype->arrayBounds = NIL;
00300 coltype->location = -1;
00301
00302
00303
00304
00305
00306
00307
00308 if (!OidIsValid(col->collOid) &&
00309 type_is_collatable(coltype->typeOid))
00310 ereport(ERROR,
00311 (errcode(ERRCODE_INDETERMINATE_COLLATION),
00312 errmsg("no collation was derived for column \"%s\" with collatable type %s",
00313 col->colname, format_type_be(coltype->typeOid)),
00314 errhint("Use the COLLATE clause to set the collation explicitly.")));
00315
00316 create->tableElts = lappend(create->tableElts, col);
00317 }
00318
00319 if (lc != NULL)
00320 ereport(ERROR,
00321 (errcode(ERRCODE_SYNTAX_ERROR),
00322 errmsg("too many column names were specified")));
00323
00324
00325
00326
00327 intoRelationId = DefineRelation(create, relkind, InvalidOid);
00328
00329
00330
00331
00332
00333
00334 CommandCounterIncrement();
00335
00336
00337 toast_options = transformRelOptions((Datum) 0,
00338 create->options,
00339 "toast",
00340 validnsps,
00341 true, false);
00342
00343 (void) heap_reloptions(RELKIND_TOASTVALUE, toast_options, true);
00344
00345 AlterTableCreateToastTable(intoRelationId, toast_options);
00346
00347
00348 if (is_matview)
00349 {
00350
00351 Query *query = (Query *) copyObject(into->viewQuery);
00352
00353 StoreViewQuery(intoRelationId, query, false);
00354 CommandCounterIncrement();
00355 }
00356
00357
00358
00359
00360 intoRelationDesc = heap_open(intoRelationId, AccessExclusiveLock);
00361
00362 if (is_matview && !into->skipData)
00363
00364 SetMatViewToPopulated(intoRelationDesc);
00365
00366
00367
00368
00369
00370
00371
00372 rte = makeNode(RangeTblEntry);
00373 rte->rtekind = RTE_RELATION;
00374 rte->relid = intoRelationId;
00375 rte->relkind = relkind;
00376 rte->requiredPerms = ACL_INSERT;
00377
00378 for (attnum = 1; attnum <= intoRelationDesc->rd_att->natts; attnum++)
00379 rte->modifiedCols = bms_add_member(rte->modifiedCols,
00380 attnum - FirstLowInvalidHeapAttributeNumber);
00381
00382 ExecCheckRTPerms(list_make1(rte), true);
00383
00384
00385
00386
00387 myState->rel = intoRelationDesc;
00388 myState->output_cid = GetCurrentCommandId(true);
00389
00390
00391
00392
00393
00394 myState->hi_options = HEAP_INSERT_SKIP_FSM |
00395 (XLogIsNeeded() ? 0 : HEAP_INSERT_SKIP_WAL);
00396 myState->bistate = GetBulkInsertState();
00397
00398
00399 Assert(RelationGetTargetBlock(intoRelationDesc) == InvalidBlockNumber);
00400 }
00401
00402
00403
00404
00405 static void
00406 intorel_receive(TupleTableSlot *slot, DestReceiver *self)
00407 {
00408 DR_intorel *myState = (DR_intorel *) self;
00409 HeapTuple tuple;
00410
00411
00412
00413
00414
00415 tuple = ExecMaterializeSlot(slot);
00416
00417
00418
00419
00420 if (myState->rel->rd_rel->relhasoids)
00421 HeapTupleSetOid(tuple, InvalidOid);
00422
00423 heap_insert(myState->rel,
00424 tuple,
00425 myState->output_cid,
00426 myState->hi_options,
00427 myState->bistate);
00428
00429
00430 }
00431
00432
00433
00434
00435 static void
00436 intorel_shutdown(DestReceiver *self)
00437 {
00438 DR_intorel *myState = (DR_intorel *) self;
00439
00440 FreeBulkInsertState(myState->bistate);
00441
00442
00443 if (myState->hi_options & HEAP_INSERT_SKIP_WAL)
00444 heap_sync(myState->rel);
00445
00446
00447 heap_close(myState->rel, NoLock);
00448 myState->rel = NULL;
00449 }
00450
00451
00452
00453
00454 static void
00455 intorel_destroy(DestReceiver *self)
00456 {
00457 pfree(self);
00458 }