Header And Logo

PostgreSQL
| The world's most advanced open source database.

heap.c

Go to the documentation of this file.
00001 /*-------------------------------------------------------------------------
00002  *
00003  * heap.c
00004  *    code to create and destroy POSTGRES heap relations
00005  *
00006  * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
00007  * Portions Copyright (c) 1994, Regents of the University of California
00008  *
00009  *
00010  * IDENTIFICATION
00011  *    src/backend/catalog/heap.c
00012  *
00013  *
00014  * INTERFACE ROUTINES
00015  *      heap_create()           - Create an uncataloged heap relation
00016  *      heap_create_with_catalog() - Create a cataloged relation
00017  *      heap_drop_with_catalog() - Removes named relation from catalogs
00018  *
00019  * NOTES
00020  *    this code taken from access/heap/create.c, which contains
00021  *    the old heap_create_with_catalog, amcreate, and amdestroy.
00022  *    those routines will soon call these routines using the function
00023  *    manager,
00024  *    just like the poorly named "NewXXX" routines do.  The
00025  *    "New" routines are all going to die soon, once and for all!
00026  *      -cim 1/13/91
00027  *
00028  *-------------------------------------------------------------------------
00029  */
00030 #include "postgres.h"
00031 
00032 #include "access/htup_details.h"
00033 #include "access/multixact.h"
00034 #include "access/sysattr.h"
00035 #include "access/transam.h"
00036 #include "access/xact.h"
00037 #include "catalog/catalog.h"
00038 #include "catalog/dependency.h"
00039 #include "catalog/heap.h"
00040 #include "catalog/index.h"
00041 #include "catalog/objectaccess.h"
00042 #include "catalog/pg_attrdef.h"
00043 #include "catalog/pg_collation.h"
00044 #include "catalog/pg_constraint.h"
00045 #include "catalog/pg_foreign_table.h"
00046 #include "catalog/pg_inherits.h"
00047 #include "catalog/pg_namespace.h"
00048 #include "catalog/pg_statistic.h"
00049 #include "catalog/pg_tablespace.h"
00050 #include "catalog/pg_type.h"
00051 #include "catalog/pg_type_fn.h"
00052 #include "catalog/storage.h"
00053 #include "catalog/storage_xlog.h"
00054 #include "commands/tablecmds.h"
00055 #include "commands/typecmds.h"
00056 #include "miscadmin.h"
00057 #include "nodes/nodeFuncs.h"
00058 #include "optimizer/var.h"
00059 #include "parser/parse_coerce.h"
00060 #include "parser/parse_collate.h"
00061 #include "parser/parse_expr.h"
00062 #include "parser/parse_relation.h"
00063 #include "storage/predicate.h"
00064 #include "storage/smgr.h"
00065 #include "utils/acl.h"
00066 #include "utils/builtins.h"
00067 #include "utils/fmgroids.h"
00068 #include "utils/inval.h"
00069 #include "utils/lsyscache.h"
00070 #include "utils/rel.h"
00071 #include "utils/snapmgr.h"
00072 #include "utils/syscache.h"
00073 #include "utils/tqual.h"
00074 
00075 
00076 /* Potentially set by contrib/pg_upgrade_support functions */
00077 Oid         binary_upgrade_next_heap_pg_class_oid = InvalidOid;
00078 Oid         binary_upgrade_next_toast_pg_class_oid = InvalidOid;
00079 
00080 static void AddNewRelationTuple(Relation pg_class_desc,
00081                     Relation new_rel_desc,
00082                     Oid new_rel_oid,
00083                     Oid new_type_oid,
00084                     Oid reloftype,
00085                     Oid relowner,
00086                     char relkind,
00087                     Datum relacl,
00088                     Datum reloptions);
00089 static Oid AddNewRelationType(const char *typeName,
00090                    Oid typeNamespace,
00091                    Oid new_rel_oid,
00092                    char new_rel_kind,
00093                    Oid ownerid,
00094                    Oid new_row_type,
00095                    Oid new_array_type);
00096 static void RelationRemoveInheritance(Oid relid);
00097 static void StoreRelCheck(Relation rel, char *ccname, Node *expr,
00098               bool is_validated, bool is_local, int inhcount,
00099               bool is_no_inherit, bool is_internal);
00100 static void StoreConstraints(Relation rel, List *cooked_constraints,
00101                              bool is_internal);
00102 static bool MergeWithExistingConstraint(Relation rel, char *ccname, Node *expr,
00103                             bool allow_merge, bool is_local,
00104                             bool is_no_inherit);
00105 static void SetRelationNumChecks(Relation rel, int numchecks);
00106 static Node *cookConstraint(ParseState *pstate,
00107                Node *raw_constraint,
00108                char *relname);
00109 static List *insert_ordered_unique_oid(List *list, Oid datum);
00110 
00111 
00112 /* ----------------------------------------------------------------
00113  *              XXX UGLY HARD CODED BADNESS FOLLOWS XXX
00114  *
00115  *      these should all be moved to someplace in the lib/catalog
00116  *      module, if not obliterated first.
00117  * ----------------------------------------------------------------
00118  */
00119 
00120 
00121 /*
00122  * Note:
00123  *      Should the system special case these attributes in the future?
00124  *      Advantage:  consume much less space in the ATTRIBUTE relation.
00125  *      Disadvantage:  special cases will be all over the place.
00126  */
00127 
00128 /*
00129  * The initializers below do not include trailing variable length fields,
00130  * but that's OK - we're never going to reference anything beyond the
00131  * fixed-size portion of the structure anyway.
00132  */
00133 
00134 static FormData_pg_attribute a1 = {
00135     0, {"ctid"}, TIDOID, 0, sizeof(ItemPointerData),
00136     SelfItemPointerAttributeNumber, 0, -1, -1,
00137     false, 'p', 's', true, false, false, true, 0
00138 };
00139 
00140 static FormData_pg_attribute a2 = {
00141     0, {"oid"}, OIDOID, 0, sizeof(Oid),
00142     ObjectIdAttributeNumber, 0, -1, -1,
00143     true, 'p', 'i', true, false, false, true, 0
00144 };
00145 
00146 static FormData_pg_attribute a3 = {
00147     0, {"xmin"}, XIDOID, 0, sizeof(TransactionId),
00148     MinTransactionIdAttributeNumber, 0, -1, -1,
00149     true, 'p', 'i', true, false, false, true, 0
00150 };
00151 
00152 static FormData_pg_attribute a4 = {
00153     0, {"cmin"}, CIDOID, 0, sizeof(CommandId),
00154     MinCommandIdAttributeNumber, 0, -1, -1,
00155     true, 'p', 'i', true, false, false, true, 0
00156 };
00157 
00158 static FormData_pg_attribute a5 = {
00159     0, {"xmax"}, XIDOID, 0, sizeof(TransactionId),
00160     MaxTransactionIdAttributeNumber, 0, -1, -1,
00161     true, 'p', 'i', true, false, false, true, 0
00162 };
00163 
00164 static FormData_pg_attribute a6 = {
00165     0, {"cmax"}, CIDOID, 0, sizeof(CommandId),
00166     MaxCommandIdAttributeNumber, 0, -1, -1,
00167     true, 'p', 'i', true, false, false, true, 0
00168 };
00169 
00170 /*
00171  * We decided to call this attribute "tableoid" rather than say
00172  * "classoid" on the basis that in the future there may be more than one
00173  * table of a particular class/type. In any case table is still the word
00174  * used in SQL.
00175  */
00176 static FormData_pg_attribute a7 = {
00177     0, {"tableoid"}, OIDOID, 0, sizeof(Oid),
00178     TableOidAttributeNumber, 0, -1, -1,
00179     true, 'p', 'i', true, false, false, true, 0
00180 };
00181 
00182 static const Form_pg_attribute SysAtt[] = {&a1, &a2, &a3, &a4, &a5, &a6, &a7};
00183 
00184 /*
00185  * This function returns a Form_pg_attribute pointer for a system attribute.
00186  * Note that we elog if the presented attno is invalid, which would only
00187  * happen if there's a problem upstream.
00188  */
00189 Form_pg_attribute
00190 SystemAttributeDefinition(AttrNumber attno, bool relhasoids)
00191 {
00192     if (attno >= 0 || attno < -(int) lengthof(SysAtt))
00193         elog(ERROR, "invalid system attribute number %d", attno);
00194     if (attno == ObjectIdAttributeNumber && !relhasoids)
00195         elog(ERROR, "invalid system attribute number %d", attno);
00196     return SysAtt[-attno - 1];
00197 }
00198 
00199 /*
00200  * If the given name is a system attribute name, return a Form_pg_attribute
00201  * pointer for a prototype definition.  If not, return NULL.
00202  */
00203 Form_pg_attribute
00204 SystemAttributeByName(const char *attname, bool relhasoids)
00205 {
00206     int         j;
00207 
00208     for (j = 0; j < (int) lengthof(SysAtt); j++)
00209     {
00210         Form_pg_attribute att = SysAtt[j];
00211 
00212         if (relhasoids || att->attnum != ObjectIdAttributeNumber)
00213         {
00214             if (strcmp(NameStr(att->attname), attname) == 0)
00215                 return att;
00216         }
00217     }
00218 
00219     return NULL;
00220 }
00221 
00222 
00223 /* ----------------------------------------------------------------
00224  *              XXX END OF UGLY HARD CODED BADNESS XXX
00225  * ---------------------------------------------------------------- */
00226 
00227 
00228 /* ----------------------------------------------------------------
00229  *      heap_create     - Create an uncataloged heap relation
00230  *
00231  *      Note API change: the caller must now always provide the OID
00232  *      to use for the relation.  The relfilenode may (and, normally,
00233  *      should) be left unspecified.
00234  *
00235  *      rel->rd_rel is initialized by RelationBuildLocalRelation,
00236  *      and is mostly zeroes at return.
00237  * ----------------------------------------------------------------
00238  */
00239 Relation
00240 heap_create(const char *relname,
00241             Oid relnamespace,
00242             Oid reltablespace,
00243             Oid relid,
00244             Oid relfilenode,
00245             TupleDesc tupDesc,
00246             char relkind,
00247             char relpersistence,
00248             bool shared_relation,
00249             bool mapped_relation)
00250 {
00251     bool        create_storage;
00252     Relation    rel;
00253 
00254     /* The caller must have provided an OID for the relation. */
00255     Assert(OidIsValid(relid));
00256 
00257     /*
00258      * Decide if we need storage or not, and handle a couple other special
00259      * cases for particular relkinds.
00260      */
00261     switch (relkind)
00262     {
00263         case RELKIND_VIEW:
00264         case RELKIND_COMPOSITE_TYPE:
00265         case RELKIND_FOREIGN_TABLE:
00266             create_storage = false;
00267 
00268             /*
00269              * Force reltablespace to zero if the relation has no physical
00270              * storage.  This is mainly just for cleanliness' sake.
00271              */
00272             reltablespace = InvalidOid;
00273             break;
00274         case RELKIND_SEQUENCE:
00275             create_storage = true;
00276 
00277             /*
00278              * Force reltablespace to zero for sequences, since we don't
00279              * support moving them around into different tablespaces.
00280              */
00281             reltablespace = InvalidOid;
00282             break;
00283         default:
00284             create_storage = true;
00285             break;
00286     }
00287 
00288     /*
00289      * Unless otherwise requested, the physical ID (relfilenode) is initially
00290      * the same as the logical ID (OID).  When the caller did specify a
00291      * relfilenode, it already exists; do not attempt to create it.
00292      */
00293     if (OidIsValid(relfilenode))
00294         create_storage = false;
00295     else
00296         relfilenode = relid;
00297 
00298     /*
00299      * Never allow a pg_class entry to explicitly specify the database's
00300      * default tablespace in reltablespace; force it to zero instead. This
00301      * ensures that if the database is cloned with a different default
00302      * tablespace, the pg_class entry will still match where CREATE DATABASE
00303      * will put the physically copied relation.
00304      *
00305      * Yes, this is a bit of a hack.
00306      */
00307     if (reltablespace == MyDatabaseTableSpace)
00308         reltablespace = InvalidOid;
00309 
00310     /*
00311      * build the relcache entry.
00312      */
00313     rel = RelationBuildLocalRelation(relname,
00314                                      relnamespace,
00315                                      tupDesc,
00316                                      relid,
00317                                      relfilenode,
00318                                      reltablespace,
00319                                      shared_relation,
00320                                      mapped_relation,
00321                                      relpersistence,
00322                                      relkind);
00323 
00324     /*
00325      * Have the storage manager create the relation's disk file, if needed.
00326      *
00327      * We only create the main fork here, other forks will be created on
00328      * demand.
00329      */
00330     if (create_storage)
00331     {
00332         RelationOpenSmgr(rel);
00333         RelationCreateStorage(rel->rd_node, relpersistence);
00334     }
00335 
00336     return rel;
00337 }
00338 
00339 /* ----------------------------------------------------------------
00340  *      heap_create_with_catalog        - Create a cataloged relation
00341  *
00342  *      this is done in multiple steps:
00343  *
00344  *      1) CheckAttributeNamesTypes() is used to make certain the tuple
00345  *         descriptor contains a valid set of attribute names and types
00346  *
00347  *      2) pg_class is opened and get_relname_relid()
00348  *         performs a scan to ensure that no relation with the
00349  *         same name already exists.
00350  *
00351  *      3) heap_create() is called to create the new relation on disk.
00352  *
00353  *      4) TypeCreate() is called to define a new type corresponding
00354  *         to the new relation.
00355  *
00356  *      5) AddNewRelationTuple() is called to register the
00357  *         relation in pg_class.
00358  *
00359  *      6) AddNewAttributeTuples() is called to register the
00360  *         new relation's schema in pg_attribute.
00361  *
00362  *      7) StoreConstraints is called ()        - vadim 08/22/97
00363  *
00364  *      8) the relations are closed and the new relation's oid
00365  *         is returned.
00366  *
00367  * ----------------------------------------------------------------
00368  */
00369 
00370 /* --------------------------------
00371  *      CheckAttributeNamesTypes
00372  *
00373  *      this is used to make certain the tuple descriptor contains a
00374  *      valid set of attribute names and datatypes.  a problem simply
00375  *      generates ereport(ERROR) which aborts the current transaction.
00376  * --------------------------------
00377  */
00378 void
00379 CheckAttributeNamesTypes(TupleDesc tupdesc, char relkind,
00380                          bool allow_system_table_mods)
00381 {
00382     int         i;
00383     int         j;
00384     int         natts = tupdesc->natts;
00385 
00386     /* Sanity check on column count */
00387     if (natts < 0 || natts > MaxHeapAttributeNumber)
00388         ereport(ERROR,
00389                 (errcode(ERRCODE_TOO_MANY_COLUMNS),
00390                  errmsg("tables can have at most %d columns",
00391                         MaxHeapAttributeNumber)));
00392 
00393     /*
00394      * first check for collision with system attribute names
00395      *
00396      * Skip this for a view or type relation, since those don't have system
00397      * attributes.
00398      */
00399     if (relkind != RELKIND_VIEW && relkind != RELKIND_COMPOSITE_TYPE)
00400     {
00401         for (i = 0; i < natts; i++)
00402         {
00403             if (SystemAttributeByName(NameStr(tupdesc->attrs[i]->attname),
00404                                       tupdesc->tdhasoid) != NULL)
00405                 ereport(ERROR,
00406                         (errcode(ERRCODE_DUPLICATE_COLUMN),
00407                          errmsg("column name \"%s\" conflicts with a system column name",
00408                                 NameStr(tupdesc->attrs[i]->attname))));
00409         }
00410     }
00411 
00412     /*
00413      * next check for repeated attribute names
00414      */
00415     for (i = 1; i < natts; i++)
00416     {
00417         for (j = 0; j < i; j++)
00418         {
00419             if (strcmp(NameStr(tupdesc->attrs[j]->attname),
00420                        NameStr(tupdesc->attrs[i]->attname)) == 0)
00421                 ereport(ERROR,
00422                         (errcode(ERRCODE_DUPLICATE_COLUMN),
00423                          errmsg("column name \"%s\" specified more than once",
00424                                 NameStr(tupdesc->attrs[j]->attname))));
00425         }
00426     }
00427 
00428     /*
00429      * next check the attribute types
00430      */
00431     for (i = 0; i < natts; i++)
00432     {
00433         CheckAttributeType(NameStr(tupdesc->attrs[i]->attname),
00434                            tupdesc->attrs[i]->atttypid,
00435                            tupdesc->attrs[i]->attcollation,
00436                            NIL, /* assume we're creating a new rowtype */
00437                            allow_system_table_mods);
00438     }
00439 }
00440 
00441 /* --------------------------------
00442  *      CheckAttributeType
00443  *
00444  *      Verify that the proposed datatype of an attribute is legal.
00445  *      This is needed mainly because there are types (and pseudo-types)
00446  *      in the catalogs that we do not support as elements of real tuples.
00447  *      We also check some other properties required of a table column.
00448  *
00449  * If the attribute is being proposed for addition to an existing table or
00450  * composite type, pass a one-element list of the rowtype OID as
00451  * containing_rowtypes.  When checking a to-be-created rowtype, it's
00452  * sufficient to pass NIL, because there could not be any recursive reference
00453  * to a not-yet-existing rowtype.
00454  * --------------------------------
00455  */
00456 void
00457 CheckAttributeType(const char *attname,
00458                    Oid atttypid, Oid attcollation,
00459                    List *containing_rowtypes,
00460                    bool allow_system_table_mods)
00461 {
00462     char        att_typtype = get_typtype(atttypid);
00463     Oid         att_typelem;
00464 
00465     if (atttypid == UNKNOWNOID)
00466     {
00467         /*
00468          * Warn user, but don't fail, if column to be created has UNKNOWN type
00469          * (usually as a result of a 'retrieve into' - jolly)
00470          */
00471         ereport(WARNING,
00472                 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
00473                  errmsg("column \"%s\" has type \"unknown\"", attname),
00474                  errdetail("Proceeding with relation creation anyway.")));
00475     }
00476     else if (att_typtype == TYPTYPE_PSEUDO)
00477     {
00478         /*
00479          * Refuse any attempt to create a pseudo-type column, except for a
00480          * special hack for pg_statistic: allow ANYARRAY when modifying system
00481          * catalogs (this allows creating pg_statistic and cloning it during
00482          * VACUUM FULL)
00483          */
00484         if (atttypid != ANYARRAYOID || !allow_system_table_mods)
00485             ereport(ERROR,
00486                     (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
00487                      errmsg("column \"%s\" has pseudo-type %s",
00488                             attname, format_type_be(atttypid))));
00489     }
00490     else if (att_typtype == TYPTYPE_DOMAIN)
00491     {
00492         /*
00493          * If it's a domain, recurse to check its base type.
00494          */
00495         CheckAttributeType(attname, getBaseType(atttypid), attcollation,
00496                            containing_rowtypes,
00497                            allow_system_table_mods);
00498     }
00499     else if (att_typtype == TYPTYPE_COMPOSITE)
00500     {
00501         /*
00502          * For a composite type, recurse into its attributes.
00503          */
00504         Relation    relation;
00505         TupleDesc   tupdesc;
00506         int         i;
00507 
00508         /*
00509          * Check for self-containment.  Eventually we might be able to allow
00510          * this (just return without complaint, if so) but it's not clear how
00511          * many other places would require anti-recursion defenses before it
00512          * would be safe to allow tables to contain their own rowtype.
00513          */
00514         if (list_member_oid(containing_rowtypes, atttypid))
00515             ereport(ERROR,
00516                     (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
00517                 errmsg("composite type %s cannot be made a member of itself",
00518                        format_type_be(atttypid))));
00519 
00520         containing_rowtypes = lcons_oid(atttypid, containing_rowtypes);
00521 
00522         relation = relation_open(get_typ_typrelid(atttypid), AccessShareLock);
00523 
00524         tupdesc = RelationGetDescr(relation);
00525 
00526         for (i = 0; i < tupdesc->natts; i++)
00527         {
00528             Form_pg_attribute attr = tupdesc->attrs[i];
00529 
00530             if (attr->attisdropped)
00531                 continue;
00532             CheckAttributeType(NameStr(attr->attname),
00533                                attr->atttypid, attr->attcollation,
00534                                containing_rowtypes,
00535                                allow_system_table_mods);
00536         }
00537 
00538         relation_close(relation, AccessShareLock);
00539 
00540         containing_rowtypes = list_delete_first(containing_rowtypes);
00541     }
00542     else if (OidIsValid((att_typelem = get_element_type(atttypid))))
00543     {
00544         /*
00545          * Must recurse into array types, too, in case they are composite.
00546          */
00547         CheckAttributeType(attname, att_typelem, attcollation,
00548                            containing_rowtypes,
00549                            allow_system_table_mods);
00550     }
00551 
00552     /*
00553      * This might not be strictly invalid per SQL standard, but it is pretty
00554      * useless, and it cannot be dumped, so we must disallow it.
00555      */
00556     if (!OidIsValid(attcollation) && type_is_collatable(atttypid))
00557         ereport(ERROR,
00558                 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
00559                  errmsg("no collation was derived for column \"%s\" with collatable type %s",
00560                         attname, format_type_be(atttypid)),
00561         errhint("Use the COLLATE clause to set the collation explicitly.")));
00562 }
00563 
00564 /*
00565  * InsertPgAttributeTuple
00566  *      Construct and insert a new tuple in pg_attribute.
00567  *
00568  * Caller has already opened and locked pg_attribute.  new_attribute is the
00569  * attribute to insert (but we ignore attacl and attoptions, which are always
00570  * initialized to NULL).
00571  *
00572  * indstate is the index state for CatalogIndexInsert.  It can be passed as
00573  * NULL, in which case we'll fetch the necessary info.  (Don't do this when
00574  * inserting multiple attributes, because it's a tad more expensive.)
00575  */
00576 void
00577 InsertPgAttributeTuple(Relation pg_attribute_rel,
00578                        Form_pg_attribute new_attribute,
00579                        CatalogIndexState indstate)
00580 {
00581     Datum       values[Natts_pg_attribute];
00582     bool        nulls[Natts_pg_attribute];
00583     HeapTuple   tup;
00584 
00585     /* This is a tad tedious, but way cleaner than what we used to do... */
00586     memset(values, 0, sizeof(values));
00587     memset(nulls, false, sizeof(nulls));
00588 
00589     values[Anum_pg_attribute_attrelid - 1] = ObjectIdGetDatum(new_attribute->attrelid);
00590     values[Anum_pg_attribute_attname - 1] = NameGetDatum(&new_attribute->attname);
00591     values[Anum_pg_attribute_atttypid - 1] = ObjectIdGetDatum(new_attribute->atttypid);
00592     values[Anum_pg_attribute_attstattarget - 1] = Int32GetDatum(new_attribute->attstattarget);
00593     values[Anum_pg_attribute_attlen - 1] = Int16GetDatum(new_attribute->attlen);
00594     values[Anum_pg_attribute_attnum - 1] = Int16GetDatum(new_attribute->attnum);
00595     values[Anum_pg_attribute_attndims - 1] = Int32GetDatum(new_attribute->attndims);
00596     values[Anum_pg_attribute_attcacheoff - 1] = Int32GetDatum(new_attribute->attcacheoff);
00597     values[Anum_pg_attribute_atttypmod - 1] = Int32GetDatum(new_attribute->atttypmod);
00598     values[Anum_pg_attribute_attbyval - 1] = BoolGetDatum(new_attribute->attbyval);
00599     values[Anum_pg_attribute_attstorage - 1] = CharGetDatum(new_attribute->attstorage);
00600     values[Anum_pg_attribute_attalign - 1] = CharGetDatum(new_attribute->attalign);
00601     values[Anum_pg_attribute_attnotnull - 1] = BoolGetDatum(new_attribute->attnotnull);
00602     values[Anum_pg_attribute_atthasdef - 1] = BoolGetDatum(new_attribute->atthasdef);
00603     values[Anum_pg_attribute_attisdropped - 1] = BoolGetDatum(new_attribute->attisdropped);
00604     values[Anum_pg_attribute_attislocal - 1] = BoolGetDatum(new_attribute->attislocal);
00605     values[Anum_pg_attribute_attinhcount - 1] = Int32GetDatum(new_attribute->attinhcount);
00606     values[Anum_pg_attribute_attcollation - 1] = ObjectIdGetDatum(new_attribute->attcollation);
00607 
00608     /* start out with empty permissions and empty options */
00609     nulls[Anum_pg_attribute_attacl - 1] = true;
00610     nulls[Anum_pg_attribute_attoptions - 1] = true;
00611     nulls[Anum_pg_attribute_attfdwoptions - 1] = true;
00612 
00613     tup = heap_form_tuple(RelationGetDescr(pg_attribute_rel), values, nulls);
00614 
00615     /* finally insert the new tuple, update the indexes, and clean up */
00616     simple_heap_insert(pg_attribute_rel, tup);
00617 
00618     if (indstate != NULL)
00619         CatalogIndexInsert(indstate, tup);
00620     else
00621         CatalogUpdateIndexes(pg_attribute_rel, tup);
00622 
00623     heap_freetuple(tup);
00624 }
00625 
00626 /* --------------------------------
00627  *      AddNewAttributeTuples
00628  *
00629  *      this registers the new relation's schema by adding
00630  *      tuples to pg_attribute.
00631  * --------------------------------
00632  */
00633 static void
00634 AddNewAttributeTuples(Oid new_rel_oid,
00635                       TupleDesc tupdesc,
00636                       char relkind,
00637                       bool oidislocal,
00638                       int oidinhcount)
00639 {
00640     Form_pg_attribute attr;
00641     int         i;
00642     Relation    rel;
00643     CatalogIndexState indstate;
00644     int         natts = tupdesc->natts;
00645     ObjectAddress myself,
00646                 referenced;
00647 
00648     /*
00649      * open pg_attribute and its indexes.
00650      */
00651     rel = heap_open(AttributeRelationId, RowExclusiveLock);
00652 
00653     indstate = CatalogOpenIndexes(rel);
00654 
00655     /*
00656      * First we add the user attributes.  This is also a convenient place to
00657      * add dependencies on their datatypes and collations.
00658      */
00659     for (i = 0; i < natts; i++)
00660     {
00661         attr = tupdesc->attrs[i];
00662         /* Fill in the correct relation OID */
00663         attr->attrelid = new_rel_oid;
00664         /* Make sure these are OK, too */
00665         attr->attstattarget = -1;
00666         attr->attcacheoff = -1;
00667 
00668         InsertPgAttributeTuple(rel, attr, indstate);
00669 
00670         /* Add dependency info */
00671         myself.classId = RelationRelationId;
00672         myself.objectId = new_rel_oid;
00673         myself.objectSubId = i + 1;
00674         referenced.classId = TypeRelationId;
00675         referenced.objectId = attr->atttypid;
00676         referenced.objectSubId = 0;
00677         recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
00678 
00679         /* The default collation is pinned, so don't bother recording it */
00680         if (OidIsValid(attr->attcollation) &&
00681             attr->attcollation != DEFAULT_COLLATION_OID)
00682         {
00683             referenced.classId = CollationRelationId;
00684             referenced.objectId = attr->attcollation;
00685             referenced.objectSubId = 0;
00686             recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
00687         }
00688     }
00689 
00690     /*
00691      * Next we add the system attributes.  Skip OID if rel has no OIDs. Skip
00692      * all for a view or type relation.  We don't bother with making datatype
00693      * dependencies here, since presumably all these types are pinned.
00694      */
00695     if (relkind != RELKIND_VIEW && relkind != RELKIND_COMPOSITE_TYPE)
00696     {
00697         for (i = 0; i < (int) lengthof(SysAtt); i++)
00698         {
00699             FormData_pg_attribute attStruct;
00700 
00701             /* skip OID where appropriate */
00702             if (!tupdesc->tdhasoid &&
00703                 SysAtt[i]->attnum == ObjectIdAttributeNumber)
00704                 continue;
00705 
00706             memcpy(&attStruct, (char *) SysAtt[i], sizeof(FormData_pg_attribute));
00707 
00708             /* Fill in the correct relation OID in the copied tuple */
00709             attStruct.attrelid = new_rel_oid;
00710 
00711             /* Fill in correct inheritance info for the OID column */
00712             if (attStruct.attnum == ObjectIdAttributeNumber)
00713             {
00714                 attStruct.attislocal = oidislocal;
00715                 attStruct.attinhcount = oidinhcount;
00716             }
00717 
00718             InsertPgAttributeTuple(rel, &attStruct, indstate);
00719         }
00720     }
00721 
00722     /*
00723      * clean up
00724      */
00725     CatalogCloseIndexes(indstate);
00726 
00727     heap_close(rel, RowExclusiveLock);
00728 }
00729 
00730 /* --------------------------------
00731  *      InsertPgClassTuple
00732  *
00733  *      Construct and insert a new tuple in pg_class.
00734  *
00735  * Caller has already opened and locked pg_class.
00736  * Tuple data is taken from new_rel_desc->rd_rel, except for the
00737  * variable-width fields which are not present in a cached reldesc.
00738  * relacl and reloptions are passed in Datum form (to avoid having
00739  * to reference the data types in heap.h).  Pass (Datum) 0 to set them
00740  * to NULL.
00741  * --------------------------------
00742  */
00743 void
00744 InsertPgClassTuple(Relation pg_class_desc,
00745                    Relation new_rel_desc,
00746                    Oid new_rel_oid,
00747                    Datum relacl,
00748                    Datum reloptions)
00749 {
00750     Form_pg_class rd_rel = new_rel_desc->rd_rel;
00751     Datum       values[Natts_pg_class];
00752     bool        nulls[Natts_pg_class];
00753     HeapTuple   tup;
00754 
00755     /* This is a tad tedious, but way cleaner than what we used to do... */
00756     memset(values, 0, sizeof(values));
00757     memset(nulls, false, sizeof(nulls));
00758 
00759     values[Anum_pg_class_relname - 1] = NameGetDatum(&rd_rel->relname);
00760     values[Anum_pg_class_relnamespace - 1] = ObjectIdGetDatum(rd_rel->relnamespace);
00761     values[Anum_pg_class_reltype - 1] = ObjectIdGetDatum(rd_rel->reltype);
00762     values[Anum_pg_class_reloftype - 1] = ObjectIdGetDatum(rd_rel->reloftype);
00763     values[Anum_pg_class_relowner - 1] = ObjectIdGetDatum(rd_rel->relowner);
00764     values[Anum_pg_class_relam - 1] = ObjectIdGetDatum(rd_rel->relam);
00765     values[Anum_pg_class_relfilenode - 1] = ObjectIdGetDatum(rd_rel->relfilenode);
00766     values[Anum_pg_class_reltablespace - 1] = ObjectIdGetDatum(rd_rel->reltablespace);
00767     values[Anum_pg_class_relpages - 1] = Int32GetDatum(rd_rel->relpages);
00768     values[Anum_pg_class_reltuples - 1] = Float4GetDatum(rd_rel->reltuples);
00769     values[Anum_pg_class_relallvisible - 1] = Int32GetDatum(rd_rel->relallvisible);
00770     values[Anum_pg_class_reltoastrelid - 1] = ObjectIdGetDatum(rd_rel->reltoastrelid);
00771     values[Anum_pg_class_reltoastidxid - 1] = ObjectIdGetDatum(rd_rel->reltoastidxid);
00772     values[Anum_pg_class_relhasindex - 1] = BoolGetDatum(rd_rel->relhasindex);
00773     values[Anum_pg_class_relisshared - 1] = BoolGetDatum(rd_rel->relisshared);
00774     values[Anum_pg_class_relpersistence - 1] = CharGetDatum(rd_rel->relpersistence);
00775     values[Anum_pg_class_relkind - 1] = CharGetDatum(rd_rel->relkind);
00776     values[Anum_pg_class_relnatts - 1] = Int16GetDatum(rd_rel->relnatts);
00777     values[Anum_pg_class_relchecks - 1] = Int16GetDatum(rd_rel->relchecks);
00778     values[Anum_pg_class_relhasoids - 1] = BoolGetDatum(rd_rel->relhasoids);
00779     values[Anum_pg_class_relhaspkey - 1] = BoolGetDatum(rd_rel->relhaspkey);
00780     values[Anum_pg_class_relhasrules - 1] = BoolGetDatum(rd_rel->relhasrules);
00781     values[Anum_pg_class_relhastriggers - 1] = BoolGetDatum(rd_rel->relhastriggers);
00782     values[Anum_pg_class_relhassubclass - 1] = BoolGetDatum(rd_rel->relhassubclass);
00783     values[Anum_pg_class_relfrozenxid - 1] = TransactionIdGetDatum(rd_rel->relfrozenxid);
00784     values[Anum_pg_class_relminmxid - 1] = MultiXactIdGetDatum(rd_rel->relminmxid);
00785     if (relacl != (Datum) 0)
00786         values[Anum_pg_class_relacl - 1] = relacl;
00787     else
00788         nulls[Anum_pg_class_relacl - 1] = true;
00789     if (reloptions != (Datum) 0)
00790         values[Anum_pg_class_reloptions - 1] = reloptions;
00791     else
00792         nulls[Anum_pg_class_reloptions - 1] = true;
00793 
00794     tup = heap_form_tuple(RelationGetDescr(pg_class_desc), values, nulls);
00795 
00796     /*
00797      * The new tuple must have the oid already chosen for the rel.  Sure would
00798      * be embarrassing to do this sort of thing in polite company.
00799      */
00800     HeapTupleSetOid(tup, new_rel_oid);
00801 
00802     /* finally insert the new tuple, update the indexes, and clean up */
00803     simple_heap_insert(pg_class_desc, tup);
00804 
00805     CatalogUpdateIndexes(pg_class_desc, tup);
00806 
00807     heap_freetuple(tup);
00808 }
00809 
00810 /* --------------------------------
00811  *      AddNewRelationTuple
00812  *
00813  *      this registers the new relation in the catalogs by
00814  *      adding a tuple to pg_class.
00815  * --------------------------------
00816  */
00817 static void
00818 AddNewRelationTuple(Relation pg_class_desc,
00819                     Relation new_rel_desc,
00820                     Oid new_rel_oid,
00821                     Oid new_type_oid,
00822                     Oid reloftype,
00823                     Oid relowner,
00824                     char relkind,
00825                     Datum relacl,
00826                     Datum reloptions)
00827 {
00828     Form_pg_class new_rel_reltup;
00829 
00830     /*
00831      * first we update some of the information in our uncataloged relation's
00832      * relation descriptor.
00833      */
00834     new_rel_reltup = new_rel_desc->rd_rel;
00835 
00836     switch (relkind)
00837     {
00838         case RELKIND_RELATION:
00839         case RELKIND_MATVIEW:
00840         case RELKIND_INDEX:
00841         case RELKIND_TOASTVALUE:
00842             /* The relation is real, but as yet empty */
00843             new_rel_reltup->relpages = 0;
00844             new_rel_reltup->reltuples = 0;
00845             new_rel_reltup->relallvisible = 0;
00846             break;
00847         case RELKIND_SEQUENCE:
00848             /* Sequences always have a known size */
00849             new_rel_reltup->relpages = 1;
00850             new_rel_reltup->reltuples = 1;
00851             new_rel_reltup->relallvisible = 0;
00852             break;
00853         default:
00854             /* Views, etc, have no disk storage */
00855             new_rel_reltup->relpages = 0;
00856             new_rel_reltup->reltuples = 0;
00857             new_rel_reltup->relallvisible = 0;
00858             break;
00859     }
00860 
00861     /* Initialize relfrozenxid and relminmxid */
00862     if (relkind == RELKIND_RELATION ||
00863         relkind == RELKIND_MATVIEW ||
00864         relkind == RELKIND_TOASTVALUE)
00865     {
00866         /*
00867          * Initialize to the minimum XID that could put tuples in the table.
00868          * We know that no xacts older than RecentXmin are still running, so
00869          * that will do.
00870          */
00871         new_rel_reltup->relfrozenxid = RecentXmin;
00872         /*
00873          * Similarly, initialize the minimum Multixact to the first value that
00874          * could possibly be stored in tuples in the table.  Running
00875          * transactions could reuse values from their local cache, so we are
00876          * careful to consider all currently running multis.
00877          *
00878          * XXX this could be refined further, but is it worth the hassle?
00879          */
00880         new_rel_reltup->relminmxid = GetOldestMultiXactId();
00881     }
00882     else
00883     {
00884         /*
00885          * Other relation types will not contain XIDs, so set relfrozenxid to
00886          * InvalidTransactionId.  (Note: a sequence does contain a tuple, but
00887          * we force its xmin to be FrozenTransactionId always; see
00888          * commands/sequence.c.)
00889          */
00890         new_rel_reltup->relfrozenxid = InvalidTransactionId;
00891         new_rel_reltup->relfrozenxid = InvalidMultiXactId;
00892     }
00893 
00894     new_rel_reltup->relowner = relowner;
00895     new_rel_reltup->reltype = new_type_oid;
00896     new_rel_reltup->reloftype = reloftype;
00897 
00898     new_rel_desc->rd_att->tdtypeid = new_type_oid;
00899 
00900     /* Now build and insert the tuple */
00901     InsertPgClassTuple(pg_class_desc, new_rel_desc, new_rel_oid,
00902                        relacl, reloptions);
00903 }
00904 
00905 
00906 /* --------------------------------
00907  *      AddNewRelationType -
00908  *
00909  *      define a composite type corresponding to the new relation
00910  * --------------------------------
00911  */
00912 static Oid
00913 AddNewRelationType(const char *typeName,
00914                    Oid typeNamespace,
00915                    Oid new_rel_oid,
00916                    char new_rel_kind,
00917                    Oid ownerid,
00918                    Oid new_row_type,
00919                    Oid new_array_type)
00920 {
00921     return
00922         TypeCreate(new_row_type,    /* optional predetermined OID */
00923                    typeName,    /* type name */
00924                    typeNamespace,       /* type namespace */
00925                    new_rel_oid, /* relation oid */
00926                    new_rel_kind,    /* relation kind */
00927                    ownerid,     /* owner's ID */
00928                    -1,          /* internal size (varlena) */
00929                    TYPTYPE_COMPOSITE,   /* type-type (composite) */
00930                    TYPCATEGORY_COMPOSITE,       /* type-category (ditto) */
00931                    false,       /* composite types are never preferred */
00932                    DEFAULT_TYPDELIM,    /* default array delimiter */
00933                    F_RECORD_IN, /* input procedure */
00934                    F_RECORD_OUT,    /* output procedure */
00935                    F_RECORD_RECV,       /* receive procedure */
00936                    F_RECORD_SEND,       /* send procedure */
00937                    InvalidOid,  /* typmodin procedure - none */
00938                    InvalidOid,  /* typmodout procedure - none */
00939                    InvalidOid,  /* analyze procedure - default */
00940                    InvalidOid,  /* array element type - irrelevant */
00941                    false,       /* this is not an array type */
00942                    new_array_type,      /* array type if any */
00943                    InvalidOid,  /* domain base type - irrelevant */
00944                    NULL,        /* default value - none */
00945                    NULL,        /* default binary representation */
00946                    false,       /* passed by reference */
00947                    'd',         /* alignment - must be the largest! */
00948                    'x',         /* fully TOASTable */
00949                    -1,          /* typmod */
00950                    0,           /* array dimensions for typBaseType */
00951                    false,       /* Type NOT NULL */
00952                    InvalidOid); /* rowtypes never have a collation */
00953 }
00954 
00955 /* --------------------------------
00956  *      heap_create_with_catalog
00957  *
00958  *      creates a new cataloged relation.  see comments above.
00959  *
00960  * Arguments:
00961  *  relname: name to give to new rel
00962  *  relnamespace: OID of namespace it goes in
00963  *  reltablespace: OID of tablespace it goes in
00964  *  relid: OID to assign to new rel, or InvalidOid to select a new OID
00965  *  reltypeid: OID to assign to rel's rowtype, or InvalidOid to select one
00966  *  reloftypeid: if a typed table, OID of underlying type; else InvalidOid
00967  *  ownerid: OID of new rel's owner
00968  *  tupdesc: tuple descriptor (source of column definitions)
00969  *  cooked_constraints: list of precooked check constraints and defaults
00970  *  relkind: relkind for new rel
00971  *  relpersistence: rel's persistence status (permanent, temp, or unlogged)
00972  *  shared_relation: TRUE if it's to be a shared relation
00973  *  mapped_relation: TRUE if the relation will use the relfilenode map
00974  *  oidislocal: TRUE if oid column (if any) should be marked attislocal
00975  *  oidinhcount: attinhcount to assign to oid column (if any)
00976  *  oncommit: ON COMMIT marking (only relevant if it's a temp table)
00977  *  reloptions: reloptions in Datum form, or (Datum) 0 if none
00978  *  use_user_acl: TRUE if should look for user-defined default permissions;
00979  *      if FALSE, relacl is always set NULL
00980  *  allow_system_table_mods: TRUE to allow creation in system namespaces
00981  *
00982  * Returns the OID of the new relation
00983  * --------------------------------
00984  */
00985 Oid
00986 heap_create_with_catalog(const char *relname,
00987                          Oid relnamespace,
00988                          Oid reltablespace,
00989                          Oid relid,
00990                          Oid reltypeid,
00991                          Oid reloftypeid,
00992                          Oid ownerid,
00993                          TupleDesc tupdesc,
00994                          List *cooked_constraints,
00995                          char relkind,
00996                          char relpersistence,
00997                          bool shared_relation,
00998                          bool mapped_relation,
00999                          bool oidislocal,
01000                          int oidinhcount,
01001                          OnCommitAction oncommit,
01002                          Datum reloptions,
01003                          bool use_user_acl,
01004                          bool allow_system_table_mods,
01005                          bool is_internal)
01006 {
01007     Relation    pg_class_desc;
01008     Relation    new_rel_desc;
01009     Acl        *relacl;
01010     Oid         existing_relid;
01011     Oid         old_type_oid;
01012     Oid         new_type_oid;
01013     Oid         new_array_oid = InvalidOid;
01014 
01015     pg_class_desc = heap_open(RelationRelationId, RowExclusiveLock);
01016 
01017     /*
01018      * sanity checks
01019      */
01020     Assert(IsNormalProcessingMode() || IsBootstrapProcessingMode());
01021 
01022     CheckAttributeNamesTypes(tupdesc, relkind, allow_system_table_mods);
01023 
01024     /*
01025      * This would fail later on anyway, if the relation already exists.  But
01026      * by catching it here we can emit a nicer error message.
01027      */
01028     existing_relid = get_relname_relid(relname, relnamespace);
01029     if (existing_relid != InvalidOid)
01030         ereport(ERROR,
01031                 (errcode(ERRCODE_DUPLICATE_TABLE),
01032                  errmsg("relation \"%s\" already exists", relname)));
01033 
01034     /*
01035      * Since we are going to create a rowtype as well, also check for
01036      * collision with an existing type name.  If there is one and it's an
01037      * autogenerated array, we can rename it out of the way; otherwise we can
01038      * at least give a good error message.
01039      */
01040     old_type_oid = GetSysCacheOid2(TYPENAMENSP,
01041                                    CStringGetDatum(relname),
01042                                    ObjectIdGetDatum(relnamespace));
01043     if (OidIsValid(old_type_oid))
01044     {
01045         if (!moveArrayTypeName(old_type_oid, relname, relnamespace))
01046             ereport(ERROR,
01047                     (errcode(ERRCODE_DUPLICATE_OBJECT),
01048                      errmsg("type \"%s\" already exists", relname),
01049                errhint("A relation has an associated type of the same name, "
01050                        "so you must use a name that doesn't conflict "
01051                        "with any existing type.")));
01052     }
01053 
01054     /*
01055      * Shared relations must be in pg_global (last-ditch check)
01056      */
01057     if (shared_relation && reltablespace != GLOBALTABLESPACE_OID)
01058         elog(ERROR, "shared relations must be placed in pg_global tablespace");
01059 
01060     /*
01061      * Allocate an OID for the relation, unless we were told what to use.
01062      *
01063      * The OID will be the relfilenode as well, so make sure it doesn't
01064      * collide with either pg_class OIDs or existing physical files.
01065      */
01066     if (!OidIsValid(relid))
01067     {
01068         /*
01069          * Use binary-upgrade override for pg_class.oid/relfilenode, if
01070          * supplied.
01071          */
01072         if (IsBinaryUpgrade &&
01073             OidIsValid(binary_upgrade_next_heap_pg_class_oid) &&
01074             (relkind == RELKIND_RELATION || relkind == RELKIND_SEQUENCE ||
01075              relkind == RELKIND_VIEW || relkind == RELKIND_MATVIEW ||
01076              relkind == RELKIND_COMPOSITE_TYPE || relkind == RELKIND_FOREIGN_TABLE))
01077         {
01078             relid = binary_upgrade_next_heap_pg_class_oid;
01079             binary_upgrade_next_heap_pg_class_oid = InvalidOid;
01080         }
01081         else if (IsBinaryUpgrade &&
01082                  OidIsValid(binary_upgrade_next_toast_pg_class_oid) &&
01083                  relkind == RELKIND_TOASTVALUE)
01084         {
01085             relid = binary_upgrade_next_toast_pg_class_oid;
01086             binary_upgrade_next_toast_pg_class_oid = InvalidOid;
01087         }
01088         else
01089             relid = GetNewRelFileNode(reltablespace, pg_class_desc,
01090                                       relpersistence);
01091     }
01092 
01093     /*
01094      * Determine the relation's initial permissions.
01095      */
01096     if (use_user_acl)
01097     {
01098         switch (relkind)
01099         {
01100             case RELKIND_RELATION:
01101             case RELKIND_VIEW:
01102             case RELKIND_MATVIEW:
01103             case RELKIND_FOREIGN_TABLE:
01104                 relacl = get_user_default_acl(ACL_OBJECT_RELATION, ownerid,
01105                                               relnamespace);
01106                 break;
01107             case RELKIND_SEQUENCE:
01108                 relacl = get_user_default_acl(ACL_OBJECT_SEQUENCE, ownerid,
01109                                               relnamespace);
01110                 break;
01111             default:
01112                 relacl = NULL;
01113                 break;
01114         }
01115     }
01116     else
01117         relacl = NULL;
01118 
01119     /*
01120      * Create the relcache entry (mostly dummy at this point) and the physical
01121      * disk file.  (If we fail further down, it's the smgr's responsibility to
01122      * remove the disk file again.)
01123      */
01124     new_rel_desc = heap_create(relname,
01125                                relnamespace,
01126                                reltablespace,
01127                                relid,
01128                                InvalidOid,
01129                                tupdesc,
01130                                relkind,
01131                                relpersistence,
01132                                shared_relation,
01133                                mapped_relation);
01134 
01135     Assert(relid == RelationGetRelid(new_rel_desc));
01136 
01137     /*
01138      * Decide whether to create an array type over the relation's rowtype. We
01139      * do not create any array types for system catalogs (ie, those made
01140      * during initdb).  We create array types for regular relations, views,
01141      * composite types and foreign tables ... but not, eg, for toast tables or
01142      * sequences.
01143      */
01144     if (IsUnderPostmaster && (relkind == RELKIND_RELATION ||
01145                               relkind == RELKIND_VIEW ||
01146                               relkind == RELKIND_MATVIEW ||
01147                               relkind == RELKIND_FOREIGN_TABLE ||
01148                               relkind == RELKIND_COMPOSITE_TYPE))
01149         new_array_oid = AssignTypeArrayOid();
01150 
01151     /*
01152      * Since defining a relation also defines a complex type, we add a new
01153      * system type corresponding to the new relation.  The OID of the type can
01154      * be preselected by the caller, but if reltypeid is InvalidOid, we'll
01155      * generate a new OID for it.
01156      *
01157      * NOTE: we could get a unique-index failure here, in case someone else is
01158      * creating the same type name in parallel but hadn't committed yet when
01159      * we checked for a duplicate name above.
01160      */
01161     new_type_oid = AddNewRelationType(relname,
01162                                       relnamespace,
01163                                       relid,
01164                                       relkind,
01165                                       ownerid,
01166                                       reltypeid,
01167                                       new_array_oid);
01168 
01169     /*
01170      * Now make the array type if wanted.
01171      */
01172     if (OidIsValid(new_array_oid))
01173     {
01174         char       *relarrayname;
01175 
01176         relarrayname = makeArrayTypeName(relname, relnamespace);
01177 
01178         TypeCreate(new_array_oid,       /* force the type's OID to this */
01179                    relarrayname,    /* Array type name */
01180                    relnamespace,    /* Same namespace as parent */
01181                    InvalidOid,  /* Not composite, no relationOid */
01182                    0,           /* relkind, also N/A here */
01183                    ownerid,     /* owner's ID */
01184                    -1,          /* Internal size (varlena) */
01185                    TYPTYPE_BASE,    /* Not composite - typelem is */
01186                    TYPCATEGORY_ARRAY,   /* type-category (array) */
01187                    false,       /* array types are never preferred */
01188                    DEFAULT_TYPDELIM,    /* default array delimiter */
01189                    F_ARRAY_IN,  /* array input proc */
01190                    F_ARRAY_OUT, /* array output proc */
01191                    F_ARRAY_RECV,    /* array recv (bin) proc */
01192                    F_ARRAY_SEND,    /* array send (bin) proc */
01193                    InvalidOid,  /* typmodin procedure - none */
01194                    InvalidOid,  /* typmodout procedure - none */
01195                    F_ARRAY_TYPANALYZE,  /* array analyze procedure */
01196                    new_type_oid,    /* array element type - the rowtype */
01197                    true,        /* yes, this is an array type */
01198                    InvalidOid,  /* this has no array type */
01199                    InvalidOid,  /* domain base type - irrelevant */
01200                    NULL,        /* default value - none */
01201                    NULL,        /* default binary representation */
01202                    false,       /* passed by reference */
01203                    'd',         /* alignment - must be the largest! */
01204                    'x',         /* fully TOASTable */
01205                    -1,          /* typmod */
01206                    0,           /* array dimensions for typBaseType */
01207                    false,       /* Type NOT NULL */
01208                    InvalidOid); /* rowtypes never have a collation */
01209 
01210         pfree(relarrayname);
01211     }
01212 
01213     /*
01214      * now create an entry in pg_class for the relation.
01215      *
01216      * NOTE: we could get a unique-index failure here, in case someone else is
01217      * creating the same relation name in parallel but hadn't committed yet
01218      * when we checked for a duplicate name above.
01219      */
01220     AddNewRelationTuple(pg_class_desc,
01221                         new_rel_desc,
01222                         relid,
01223                         new_type_oid,
01224                         reloftypeid,
01225                         ownerid,
01226                         relkind,
01227                         PointerGetDatum(relacl),
01228                         reloptions);
01229 
01230     /*
01231      * now add tuples to pg_attribute for the attributes in our new relation.
01232      */
01233     AddNewAttributeTuples(relid, new_rel_desc->rd_att, relkind,
01234                           oidislocal, oidinhcount);
01235 
01236     /*
01237      * Make a dependency link to force the relation to be deleted if its
01238      * namespace is.  Also make a dependency link to its owner, as well as
01239      * dependencies for any roles mentioned in the default ACL.
01240      *
01241      * For composite types, these dependencies are tracked for the pg_type
01242      * entry, so we needn't record them here.  Likewise, TOAST tables don't
01243      * need a namespace dependency (they live in a pinned namespace) nor an
01244      * owner dependency (they depend indirectly through the parent table), nor
01245      * should they have any ACL entries.  The same applies for extension
01246      * dependencies.
01247      *
01248      * If it's a temp table, we do not make it an extension member; this
01249      * prevents the unintuitive result that deletion of the temp table at
01250      * session end would make the whole extension go away.
01251      *
01252      * Also, skip this in bootstrap mode, since we don't make dependencies
01253      * while bootstrapping.
01254      */
01255     if (relkind != RELKIND_COMPOSITE_TYPE &&
01256         relkind != RELKIND_TOASTVALUE &&
01257         !IsBootstrapProcessingMode())
01258     {
01259         ObjectAddress myself,
01260                     referenced;
01261 
01262         myself.classId = RelationRelationId;
01263         myself.objectId = relid;
01264         myself.objectSubId = 0;
01265         referenced.classId = NamespaceRelationId;
01266         referenced.objectId = relnamespace;
01267         referenced.objectSubId = 0;
01268         recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
01269 
01270         recordDependencyOnOwner(RelationRelationId, relid, ownerid);
01271 
01272         if (relpersistence != RELPERSISTENCE_TEMP)
01273             recordDependencyOnCurrentExtension(&myself, false);
01274 
01275         if (reloftypeid)
01276         {
01277             referenced.classId = TypeRelationId;
01278             referenced.objectId = reloftypeid;
01279             referenced.objectSubId = 0;
01280             recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
01281         }
01282 
01283         if (relacl != NULL)
01284         {
01285             int         nnewmembers;
01286             Oid        *newmembers;
01287 
01288             nnewmembers = aclmembers(relacl, &newmembers);
01289             updateAclDependencies(RelationRelationId, relid, 0,
01290                                   ownerid,
01291                                   0, NULL,
01292                                   nnewmembers, newmembers);
01293         }
01294     }
01295 
01296     /* Post creation hook for new relation */
01297     InvokeObjectPostCreateHookArg(RelationRelationId, relid, 0, is_internal);
01298 
01299     /*
01300      * Store any supplied constraints and defaults.
01301      *
01302      * NB: this may do a CommandCounterIncrement and rebuild the relcache
01303      * entry, so the relation must be valid and self-consistent at this point.
01304      * In particular, there are not yet constraints and defaults anywhere.
01305      */
01306     StoreConstraints(new_rel_desc, cooked_constraints, is_internal);
01307 
01308     /*
01309      * If there's a special on-commit action, remember it
01310      */
01311     if (oncommit != ONCOMMIT_NOOP)
01312         register_on_commit_action(relid, oncommit);
01313 
01314     if (relpersistence == RELPERSISTENCE_UNLOGGED)
01315     {
01316         Assert(relkind == RELKIND_RELATION || relkind == RELKIND_MATVIEW ||
01317                relkind == RELKIND_TOASTVALUE);
01318         heap_create_init_fork(new_rel_desc);
01319     }
01320 
01321     /*
01322      * ok, the relation has been cataloged, so close our relations and return
01323      * the OID of the newly created relation.
01324      */
01325     heap_close(new_rel_desc, NoLock);   /* do not unlock till end of xact */
01326     heap_close(pg_class_desc, RowExclusiveLock);
01327 
01328     return relid;
01329 }
01330 
01331 /*
01332  * Set up an init fork for an unlogged table so that it can be correctly
01333  * reinitialized on restart.  Since we're going to do an immediate sync, we
01334  * only need to xlog this if archiving or streaming is enabled.  And the
01335  * immediate sync is required, because otherwise there's no guarantee that
01336  * this will hit the disk before the next checkpoint moves the redo pointer.
01337  */
01338 void
01339 heap_create_init_fork(Relation rel)
01340 {
01341     RelationOpenSmgr(rel);
01342     smgrcreate(rel->rd_smgr, INIT_FORKNUM, false);
01343     if (XLogIsNeeded())
01344         log_smgrcreate(&rel->rd_smgr->smgr_rnode.node, INIT_FORKNUM);
01345     smgrimmedsync(rel->rd_smgr, INIT_FORKNUM);
01346 }
01347 
01348 /*
01349  * Check whether a materialized view is in an initial, unloaded state.
01350  *
01351  * The check here must match what is set up in heap_create_init_fork().
01352  * Currently the init fork is an empty file.  A missing heap is also
01353  * considered to be unloaded.
01354  */
01355 bool
01356 heap_is_matview_init_state(Relation rel)
01357 {
01358     Assert(rel->rd_rel->relkind == RELKIND_MATVIEW);
01359 
01360     RelationOpenSmgr(rel);
01361 
01362     if (!smgrexists(rel->rd_smgr, MAIN_FORKNUM))
01363         return true;
01364 
01365     return (smgrnblocks(rel->rd_smgr, MAIN_FORKNUM) < 1);
01366 }
01367 
01368 /*
01369  *      RelationRemoveInheritance
01370  *
01371  * Formerly, this routine checked for child relations and aborted the
01372  * deletion if any were found.  Now we rely on the dependency mechanism
01373  * to check for or delete child relations.  By the time we get here,
01374  * there are no children and we need only remove any pg_inherits rows
01375  * linking this relation to its parent(s).
01376  */
01377 static void
01378 RelationRemoveInheritance(Oid relid)
01379 {
01380     Relation    catalogRelation;
01381     SysScanDesc scan;
01382     ScanKeyData key;
01383     HeapTuple   tuple;
01384 
01385     catalogRelation = heap_open(InheritsRelationId, RowExclusiveLock);
01386 
01387     ScanKeyInit(&key,
01388                 Anum_pg_inherits_inhrelid,
01389                 BTEqualStrategyNumber, F_OIDEQ,
01390                 ObjectIdGetDatum(relid));
01391 
01392     scan = systable_beginscan(catalogRelation, InheritsRelidSeqnoIndexId, true,
01393                               SnapshotNow, 1, &key);
01394 
01395     while (HeapTupleIsValid(tuple = systable_getnext(scan)))
01396         simple_heap_delete(catalogRelation, &tuple->t_self);
01397 
01398     systable_endscan(scan);
01399     heap_close(catalogRelation, RowExclusiveLock);
01400 }
01401 
01402 /*
01403  *      DeleteRelationTuple
01404  *
01405  * Remove pg_class row for the given relid.
01406  *
01407  * Note: this is shared by relation deletion and index deletion.  It's
01408  * not intended for use anyplace else.
01409  */
01410 void
01411 DeleteRelationTuple(Oid relid)
01412 {
01413     Relation    pg_class_desc;
01414     HeapTuple   tup;
01415 
01416     /* Grab an appropriate lock on the pg_class relation */
01417     pg_class_desc = heap_open(RelationRelationId, RowExclusiveLock);
01418 
01419     tup = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
01420     if (!HeapTupleIsValid(tup))
01421         elog(ERROR, "cache lookup failed for relation %u", relid);
01422 
01423     /* delete the relation tuple from pg_class, and finish up */
01424     simple_heap_delete(pg_class_desc, &tup->t_self);
01425 
01426     ReleaseSysCache(tup);
01427 
01428     heap_close(pg_class_desc, RowExclusiveLock);
01429 }
01430 
01431 /*
01432  *      DeleteAttributeTuples
01433  *
01434  * Remove pg_attribute rows for the given relid.
01435  *
01436  * Note: this is shared by relation deletion and index deletion.  It's
01437  * not intended for use anyplace else.
01438  */
01439 void
01440 DeleteAttributeTuples(Oid relid)
01441 {
01442     Relation    attrel;
01443     SysScanDesc scan;
01444     ScanKeyData key[1];
01445     HeapTuple   atttup;
01446 
01447     /* Grab an appropriate lock on the pg_attribute relation */
01448     attrel = heap_open(AttributeRelationId, RowExclusiveLock);
01449 
01450     /* Use the index to scan only attributes of the target relation */
01451     ScanKeyInit(&key[0],
01452                 Anum_pg_attribute_attrelid,
01453                 BTEqualStrategyNumber, F_OIDEQ,
01454                 ObjectIdGetDatum(relid));
01455 
01456     scan = systable_beginscan(attrel, AttributeRelidNumIndexId, true,
01457                               SnapshotNow, 1, key);
01458 
01459     /* Delete all the matching tuples */
01460     while ((atttup = systable_getnext(scan)) != NULL)
01461         simple_heap_delete(attrel, &atttup->t_self);
01462 
01463     /* Clean up after the scan */
01464     systable_endscan(scan);
01465     heap_close(attrel, RowExclusiveLock);
01466 }
01467 
01468 /*
01469  *      DeleteSystemAttributeTuples
01470  *
01471  * Remove pg_attribute rows for system columns of the given relid.
01472  *
01473  * Note: this is only used when converting a table to a view.  Views don't
01474  * have system columns, so we should remove them from pg_attribute.
01475  */
01476 void
01477 DeleteSystemAttributeTuples(Oid relid)
01478 {
01479     Relation    attrel;
01480     SysScanDesc scan;
01481     ScanKeyData key[2];
01482     HeapTuple   atttup;
01483 
01484     /* Grab an appropriate lock on the pg_attribute relation */
01485     attrel = heap_open(AttributeRelationId, RowExclusiveLock);
01486 
01487     /* Use the index to scan only system attributes of the target relation */
01488     ScanKeyInit(&key[0],
01489                 Anum_pg_attribute_attrelid,
01490                 BTEqualStrategyNumber, F_OIDEQ,
01491                 ObjectIdGetDatum(relid));
01492     ScanKeyInit(&key[1],
01493                 Anum_pg_attribute_attnum,
01494                 BTLessEqualStrategyNumber, F_INT2LE,
01495                 Int16GetDatum(0));
01496 
01497     scan = systable_beginscan(attrel, AttributeRelidNumIndexId, true,
01498                               SnapshotNow, 2, key);
01499 
01500     /* Delete all the matching tuples */
01501     while ((atttup = systable_getnext(scan)) != NULL)
01502         simple_heap_delete(attrel, &atttup->t_self);
01503 
01504     /* Clean up after the scan */
01505     systable_endscan(scan);
01506     heap_close(attrel, RowExclusiveLock);
01507 }
01508 
01509 /*
01510  *      RemoveAttributeById
01511  *
01512  * This is the guts of ALTER TABLE DROP COLUMN: actually mark the attribute
01513  * deleted in pg_attribute.  We also remove pg_statistic entries for it.
01514  * (Everything else needed, such as getting rid of any pg_attrdef entry,
01515  * is handled by dependency.c.)
01516  */
01517 void
01518 RemoveAttributeById(Oid relid, AttrNumber attnum)
01519 {
01520     Relation    rel;
01521     Relation    attr_rel;
01522     HeapTuple   tuple;
01523     Form_pg_attribute attStruct;
01524     char        newattname[NAMEDATALEN];
01525 
01526     /*
01527      * Grab an exclusive lock on the target table, which we will NOT release
01528      * until end of transaction.  (In the simple case where we are directly
01529      * dropping this column, AlterTableDropColumn already did this ... but
01530      * when cascading from a drop of some other object, we may not have any
01531      * lock.)
01532      */
01533     rel = relation_open(relid, AccessExclusiveLock);
01534 
01535     attr_rel = heap_open(AttributeRelationId, RowExclusiveLock);
01536 
01537     tuple = SearchSysCacheCopy2(ATTNUM,
01538                                 ObjectIdGetDatum(relid),
01539                                 Int16GetDatum(attnum));
01540     if (!HeapTupleIsValid(tuple))       /* shouldn't happen */
01541         elog(ERROR, "cache lookup failed for attribute %d of relation %u",
01542              attnum, relid);
01543     attStruct = (Form_pg_attribute) GETSTRUCT(tuple);
01544 
01545     if (attnum < 0)
01546     {
01547         /* System attribute (probably OID) ... just delete the row */
01548 
01549         simple_heap_delete(attr_rel, &tuple->t_self);
01550     }
01551     else
01552     {
01553         /* Dropping user attributes is lots harder */
01554 
01555         /* Mark the attribute as dropped */
01556         attStruct->attisdropped = true;
01557 
01558         /*
01559          * Set the type OID to invalid.  A dropped attribute's type link
01560          * cannot be relied on (once the attribute is dropped, the type might
01561          * be too). Fortunately we do not need the type row --- the only
01562          * really essential information is the type's typlen and typalign,
01563          * which are preserved in the attribute's attlen and attalign.  We set
01564          * atttypid to zero here as a means of catching code that incorrectly
01565          * expects it to be valid.
01566          */
01567         attStruct->atttypid = InvalidOid;
01568 
01569         /* Remove any NOT NULL constraint the column may have */
01570         attStruct->attnotnull = false;
01571 
01572         /* We don't want to keep stats for it anymore */
01573         attStruct->attstattarget = 0;
01574 
01575         /*
01576          * Change the column name to something that isn't likely to conflict
01577          */
01578         snprintf(newattname, sizeof(newattname),
01579                  "........pg.dropped.%d........", attnum);
01580         namestrcpy(&(attStruct->attname), newattname);
01581 
01582         simple_heap_update(attr_rel, &tuple->t_self, tuple);
01583 
01584         /* keep the system catalog indexes current */
01585         CatalogUpdateIndexes(attr_rel, tuple);
01586     }
01587 
01588     /*
01589      * Because updating the pg_attribute row will trigger a relcache flush for
01590      * the target relation, we need not do anything else to notify other
01591      * backends of the change.
01592      */
01593 
01594     heap_close(attr_rel, RowExclusiveLock);
01595 
01596     if (attnum > 0)
01597         RemoveStatistics(relid, attnum);
01598 
01599     relation_close(rel, NoLock);
01600 }
01601 
01602 /*
01603  *      RemoveAttrDefault
01604  *
01605  * If the specified relation/attribute has a default, remove it.
01606  * (If no default, raise error if complain is true, else return quietly.)
01607  */
01608 void
01609 RemoveAttrDefault(Oid relid, AttrNumber attnum,
01610                   DropBehavior behavior, bool complain, bool internal)
01611 {
01612     Relation    attrdef_rel;
01613     ScanKeyData scankeys[2];
01614     SysScanDesc scan;
01615     HeapTuple   tuple;
01616     bool        found = false;
01617 
01618     attrdef_rel = heap_open(AttrDefaultRelationId, RowExclusiveLock);
01619 
01620     ScanKeyInit(&scankeys[0],
01621                 Anum_pg_attrdef_adrelid,
01622                 BTEqualStrategyNumber, F_OIDEQ,
01623                 ObjectIdGetDatum(relid));
01624     ScanKeyInit(&scankeys[1],
01625                 Anum_pg_attrdef_adnum,
01626                 BTEqualStrategyNumber, F_INT2EQ,
01627                 Int16GetDatum(attnum));
01628 
01629     scan = systable_beginscan(attrdef_rel, AttrDefaultIndexId, true,
01630                               SnapshotNow, 2, scankeys);
01631 
01632     /* There should be at most one matching tuple, but we loop anyway */
01633     while (HeapTupleIsValid(tuple = systable_getnext(scan)))
01634     {
01635         ObjectAddress object;
01636 
01637         object.classId = AttrDefaultRelationId;
01638         object.objectId = HeapTupleGetOid(tuple);
01639         object.objectSubId = 0;
01640 
01641         performDeletion(&object, behavior,
01642                         internal ? PERFORM_DELETION_INTERNAL : 0);
01643 
01644         found = true;
01645     }
01646 
01647     systable_endscan(scan);
01648     heap_close(attrdef_rel, RowExclusiveLock);
01649 
01650     if (complain && !found)
01651         elog(ERROR, "could not find attrdef tuple for relation %u attnum %d",
01652              relid, attnum);
01653 }
01654 
01655 /*
01656  *      RemoveAttrDefaultById
01657  *
01658  * Remove a pg_attrdef entry specified by OID.  This is the guts of
01659  * attribute-default removal.  Note it should be called via performDeletion,
01660  * not directly.
01661  */
01662 void
01663 RemoveAttrDefaultById(Oid attrdefId)
01664 {
01665     Relation    attrdef_rel;
01666     Relation    attr_rel;
01667     Relation    myrel;
01668     ScanKeyData scankeys[1];
01669     SysScanDesc scan;
01670     HeapTuple   tuple;
01671     Oid         myrelid;
01672     AttrNumber  myattnum;
01673 
01674     /* Grab an appropriate lock on the pg_attrdef relation */
01675     attrdef_rel = heap_open(AttrDefaultRelationId, RowExclusiveLock);
01676 
01677     /* Find the pg_attrdef tuple */
01678     ScanKeyInit(&scankeys[0],
01679                 ObjectIdAttributeNumber,
01680                 BTEqualStrategyNumber, F_OIDEQ,
01681                 ObjectIdGetDatum(attrdefId));
01682 
01683     scan = systable_beginscan(attrdef_rel, AttrDefaultOidIndexId, true,
01684                               SnapshotNow, 1, scankeys);
01685 
01686     tuple = systable_getnext(scan);
01687     if (!HeapTupleIsValid(tuple))
01688         elog(ERROR, "could not find tuple for attrdef %u", attrdefId);
01689 
01690     myrelid = ((Form_pg_attrdef) GETSTRUCT(tuple))->adrelid;
01691     myattnum = ((Form_pg_attrdef) GETSTRUCT(tuple))->adnum;
01692 
01693     /* Get an exclusive lock on the relation owning the attribute */
01694     myrel = relation_open(myrelid, AccessExclusiveLock);
01695 
01696     /* Now we can delete the pg_attrdef row */
01697     simple_heap_delete(attrdef_rel, &tuple->t_self);
01698 
01699     systable_endscan(scan);
01700     heap_close(attrdef_rel, RowExclusiveLock);
01701 
01702     /* Fix the pg_attribute row */
01703     attr_rel = heap_open(AttributeRelationId, RowExclusiveLock);
01704 
01705     tuple = SearchSysCacheCopy2(ATTNUM,
01706                                 ObjectIdGetDatum(myrelid),
01707                                 Int16GetDatum(myattnum));
01708     if (!HeapTupleIsValid(tuple))       /* shouldn't happen */
01709         elog(ERROR, "cache lookup failed for attribute %d of relation %u",
01710              myattnum, myrelid);
01711 
01712     ((Form_pg_attribute) GETSTRUCT(tuple))->atthasdef = false;
01713 
01714     simple_heap_update(attr_rel, &tuple->t_self, tuple);
01715 
01716     /* keep the system catalog indexes current */
01717     CatalogUpdateIndexes(attr_rel, tuple);
01718 
01719     /*
01720      * Our update of the pg_attribute row will force a relcache rebuild, so
01721      * there's nothing else to do here.
01722      */
01723     heap_close(attr_rel, RowExclusiveLock);
01724 
01725     /* Keep lock on attribute's rel until end of xact */
01726     relation_close(myrel, NoLock);
01727 }
01728 
01729 /*
01730  * heap_drop_with_catalog   - removes specified relation from catalogs
01731  *
01732  * Note that this routine is not responsible for dropping objects that are
01733  * linked to the pg_class entry via dependencies (for example, indexes and
01734  * constraints).  Those are deleted by the dependency-tracing logic in
01735  * dependency.c before control gets here.  In general, therefore, this routine
01736  * should never be called directly; go through performDeletion() instead.
01737  */
01738 void
01739 heap_drop_with_catalog(Oid relid)
01740 {
01741     Relation    rel;
01742 
01743     /*
01744      * Open and lock the relation.
01745      */
01746     rel = relation_open(relid, AccessExclusiveLock);
01747 
01748     /*
01749      * There can no longer be anyone *else* touching the relation, but we
01750      * might still have open queries or cursors, or pending trigger events, in
01751      * our own session.
01752      */
01753     CheckTableNotInUse(rel, "DROP TABLE");
01754 
01755     /*
01756      * This effectively deletes all rows in the table, and may be done in a
01757      * serializable transaction.  In that case we must record a rw-conflict in
01758      * to this transaction from each transaction holding a predicate lock on
01759      * the table.
01760      */
01761     CheckTableForSerializableConflictIn(rel);
01762 
01763     /*
01764      * Delete pg_foreign_table tuple first.
01765      */
01766     if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
01767     {
01768         Relation    rel;
01769         HeapTuple   tuple;
01770 
01771         rel = heap_open(ForeignTableRelationId, RowExclusiveLock);
01772 
01773         tuple = SearchSysCache1(FOREIGNTABLEREL, ObjectIdGetDatum(relid));
01774         if (!HeapTupleIsValid(tuple))
01775             elog(ERROR, "cache lookup failed for foreign table %u", relid);
01776 
01777         simple_heap_delete(rel, &tuple->t_self);
01778 
01779         ReleaseSysCache(tuple);
01780         heap_close(rel, RowExclusiveLock);
01781     }
01782 
01783     /*
01784      * Schedule unlinking of the relation's physical files at commit.
01785      */
01786     if (rel->rd_rel->relkind != RELKIND_VIEW &&
01787         rel->rd_rel->relkind != RELKIND_COMPOSITE_TYPE &&
01788         rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE)
01789     {
01790         RelationDropStorage(rel);
01791     }
01792 
01793     /*
01794      * Close relcache entry, but *keep* AccessExclusiveLock on the relation
01795      * until transaction commit.  This ensures no one else will try to do
01796      * something with the doomed relation.
01797      */
01798     relation_close(rel, NoLock);
01799 
01800     /*
01801      * Forget any ON COMMIT action for the rel
01802      */
01803     remove_on_commit_action(relid);
01804 
01805     /*
01806      * Flush the relation from the relcache.  We want to do this before
01807      * starting to remove catalog entries, just to be certain that no relcache
01808      * entry rebuild will happen partway through.  (That should not really
01809      * matter, since we don't do CommandCounterIncrement here, but let's be
01810      * safe.)
01811      */
01812     RelationForgetRelation(relid);
01813 
01814     /*
01815      * remove inheritance information
01816      */
01817     RelationRemoveInheritance(relid);
01818 
01819     /*
01820      * delete statistics
01821      */
01822     RemoveStatistics(relid, 0);
01823 
01824     /*
01825      * delete attribute tuples
01826      */
01827     DeleteAttributeTuples(relid);
01828 
01829     /*
01830      * delete relation tuple
01831      */
01832     DeleteRelationTuple(relid);
01833 }
01834 
01835 
01836 /*
01837  * Store a default expression for column attnum of relation rel.
01838  */
01839 void
01840 StoreAttrDefault(Relation rel, AttrNumber attnum,
01841                  Node *expr, bool is_internal)
01842 {
01843     char       *adbin;
01844     char       *adsrc;
01845     Relation    adrel;
01846     HeapTuple   tuple;
01847     Datum       values[4];
01848     static bool nulls[4] = {false, false, false, false};
01849     Relation    attrrel;
01850     HeapTuple   atttup;
01851     Form_pg_attribute attStruct;
01852     Oid         attrdefOid;
01853     ObjectAddress colobject,
01854                 defobject;
01855 
01856     /*
01857      * Flatten expression to string form for storage.
01858      */
01859     adbin = nodeToString(expr);
01860 
01861     /*
01862      * Also deparse it to form the mostly-obsolete adsrc field.
01863      */
01864     adsrc = deparse_expression(expr,
01865                             deparse_context_for(RelationGetRelationName(rel),
01866                                                 RelationGetRelid(rel)),
01867                                false, false);
01868 
01869     /*
01870      * Make the pg_attrdef entry.
01871      */
01872     values[Anum_pg_attrdef_adrelid - 1] = RelationGetRelid(rel);
01873     values[Anum_pg_attrdef_adnum - 1] = attnum;
01874     values[Anum_pg_attrdef_adbin - 1] = CStringGetTextDatum(adbin);
01875     values[Anum_pg_attrdef_adsrc - 1] = CStringGetTextDatum(adsrc);
01876 
01877     adrel = heap_open(AttrDefaultRelationId, RowExclusiveLock);
01878 
01879     tuple = heap_form_tuple(adrel->rd_att, values, nulls);
01880     attrdefOid = simple_heap_insert(adrel, tuple);
01881 
01882     CatalogUpdateIndexes(adrel, tuple);
01883 
01884     defobject.classId = AttrDefaultRelationId;
01885     defobject.objectId = attrdefOid;
01886     defobject.objectSubId = 0;
01887 
01888     heap_close(adrel, RowExclusiveLock);
01889 
01890     /* now can free some of the stuff allocated above */
01891     pfree(DatumGetPointer(values[Anum_pg_attrdef_adbin - 1]));
01892     pfree(DatumGetPointer(values[Anum_pg_attrdef_adsrc - 1]));
01893     heap_freetuple(tuple);
01894     pfree(adbin);
01895     pfree(adsrc);
01896 
01897     /*
01898      * Update the pg_attribute entry for the column to show that a default
01899      * exists.
01900      */
01901     attrrel = heap_open(AttributeRelationId, RowExclusiveLock);
01902     atttup = SearchSysCacheCopy2(ATTNUM,
01903                                  ObjectIdGetDatum(RelationGetRelid(rel)),
01904                                  Int16GetDatum(attnum));
01905     if (!HeapTupleIsValid(atttup))
01906         elog(ERROR, "cache lookup failed for attribute %d of relation %u",
01907              attnum, RelationGetRelid(rel));
01908     attStruct = (Form_pg_attribute) GETSTRUCT(atttup);
01909     if (!attStruct->atthasdef)
01910     {
01911         attStruct->atthasdef = true;
01912         simple_heap_update(attrrel, &atttup->t_self, atttup);
01913         /* keep catalog indexes current */
01914         CatalogUpdateIndexes(attrrel, atttup);
01915     }
01916     heap_close(attrrel, RowExclusiveLock);
01917     heap_freetuple(atttup);
01918 
01919     /*
01920      * Make a dependency so that the pg_attrdef entry goes away if the column
01921      * (or whole table) is deleted.
01922      */
01923     colobject.classId = RelationRelationId;
01924     colobject.objectId = RelationGetRelid(rel);
01925     colobject.objectSubId = attnum;
01926 
01927     recordDependencyOn(&defobject, &colobject, DEPENDENCY_AUTO);
01928 
01929     /*
01930      * Record dependencies on objects used in the expression, too.
01931      */
01932     recordDependencyOnExpr(&defobject, expr, NIL, DEPENDENCY_NORMAL);
01933 
01934     /*
01935      * Post creation hook for attribute defaults.
01936      *
01937      * XXX. ALTER TABLE ALTER COLUMN SET/DROP DEFAULT is implemented
01938      * with a couple of deletion/creation of the attribute's default entry,
01939      * so the callee should check existence of an older version of this
01940      * entry if it needs to distinguish.
01941      */
01942     InvokeObjectPostCreateHookArg(AttrDefaultRelationId,
01943                                   RelationGetRelid(rel), attnum, is_internal);
01944 }
01945 
01946 /*
01947  * Store a check-constraint expression for the given relation.
01948  *
01949  * Caller is responsible for updating the count of constraints
01950  * in the pg_class entry for the relation.
01951  */
01952 static void
01953 StoreRelCheck(Relation rel, char *ccname, Node *expr,
01954               bool is_validated, bool is_local, int inhcount,
01955               bool is_no_inherit, bool is_internal)
01956 {
01957     char       *ccbin;
01958     char       *ccsrc;
01959     List       *varList;
01960     int         keycount;
01961     int16      *attNos;
01962 
01963     /*
01964      * Flatten expression to string form for storage.
01965      */
01966     ccbin = nodeToString(expr);
01967 
01968     /*
01969      * Also deparse it to form the mostly-obsolete consrc field.
01970      */
01971     ccsrc = deparse_expression(expr,
01972                             deparse_context_for(RelationGetRelationName(rel),
01973                                                 RelationGetRelid(rel)),
01974                                false, false);
01975 
01976     /*
01977      * Find columns of rel that are used in expr
01978      *
01979      * NB: pull_var_clause is okay here only because we don't allow subselects
01980      * in check constraints; it would fail to examine the contents of
01981      * subselects.
01982      */
01983     varList = pull_var_clause(expr,
01984                               PVC_REJECT_AGGREGATES,
01985                               PVC_REJECT_PLACEHOLDERS);
01986     keycount = list_length(varList);
01987 
01988     if (keycount > 0)
01989     {
01990         ListCell   *vl;
01991         int         i = 0;
01992 
01993         attNos = (int16 *) palloc(keycount * sizeof(int16));
01994         foreach(vl, varList)
01995         {
01996             Var        *var = (Var *) lfirst(vl);
01997             int         j;
01998 
01999             for (j = 0; j < i; j++)
02000                 if (attNos[j] == var->varattno)
02001                     break;
02002             if (j == i)
02003                 attNos[i++] = var->varattno;
02004         }
02005         keycount = i;
02006     }
02007     else
02008         attNos = NULL;
02009 
02010     /*
02011      * Create the Check Constraint
02012      */
02013     CreateConstraintEntry(ccname,       /* Constraint Name */
02014                           RelationGetNamespace(rel),    /* namespace */
02015                           CONSTRAINT_CHECK,     /* Constraint Type */
02016                           false,    /* Is Deferrable */
02017                           false,    /* Is Deferred */
02018                           is_validated,
02019                           RelationGetRelid(rel),        /* relation */
02020                           attNos,       /* attrs in the constraint */
02021                           keycount,     /* # attrs in the constraint */
02022                           InvalidOid,   /* not a domain constraint */
02023                           InvalidOid,   /* no associated index */
02024                           InvalidOid,   /* Foreign key fields */
02025                           NULL,
02026                           NULL,
02027                           NULL,
02028                           NULL,
02029                           0,
02030                           ' ',
02031                           ' ',
02032                           ' ',
02033                           NULL, /* not an exclusion constraint */
02034                           expr, /* Tree form of check constraint */
02035                           ccbin,    /* Binary form of check constraint */
02036                           ccsrc,    /* Source form of check constraint */
02037                           is_local,     /* conislocal */
02038                           inhcount,     /* coninhcount */
02039                           is_no_inherit,        /* connoinherit */
02040                           is_internal); /* internally constructed? */
02041 
02042     pfree(ccbin);
02043     pfree(ccsrc);
02044 }
02045 
02046 /*
02047  * Store defaults and constraints (passed as a list of CookedConstraint).
02048  *
02049  * NOTE: only pre-cooked expressions will be passed this way, which is to
02050  * say expressions inherited from an existing relation.  Newly parsed
02051  * expressions can be added later, by direct calls to StoreAttrDefault
02052  * and StoreRelCheck (see AddRelationNewConstraints()).
02053  */
02054 static void
02055 StoreConstraints(Relation rel, List *cooked_constraints, bool is_internal)
02056 {
02057     int         numchecks = 0;
02058     ListCell   *lc;
02059 
02060     if (!cooked_constraints)
02061         return;                 /* nothing to do */
02062 
02063     /*
02064      * Deparsing of constraint expressions will fail unless the just-created
02065      * pg_attribute tuples for this relation are made visible.  So, bump the
02066      * command counter.  CAUTION: this will cause a relcache entry rebuild.
02067      */
02068     CommandCounterIncrement();
02069 
02070     foreach(lc, cooked_constraints)
02071     {
02072         CookedConstraint *con = (CookedConstraint *) lfirst(lc);
02073 
02074         switch (con->contype)
02075         {
02076             case CONSTR_DEFAULT:
02077                 StoreAttrDefault(rel, con->attnum, con->expr, is_internal);
02078                 break;
02079             case CONSTR_CHECK:
02080                 StoreRelCheck(rel, con->name, con->expr, !con->skip_validation,
02081                               con->is_local, con->inhcount,
02082                               con->is_no_inherit, is_internal);
02083                 numchecks++;
02084                 break;
02085             default:
02086                 elog(ERROR, "unrecognized constraint type: %d",
02087                      (int) con->contype);
02088         }
02089     }
02090 
02091     if (numchecks > 0)
02092         SetRelationNumChecks(rel, numchecks);
02093 }
02094 
02095 /*
02096  * AddRelationNewConstraints
02097  *
02098  * Add new column default expressions and/or constraint check expressions
02099  * to an existing relation.  This is defined to do both for efficiency in
02100  * DefineRelation, but of course you can do just one or the other by passing
02101  * empty lists.
02102  *
02103  * rel: relation to be modified
02104  * newColDefaults: list of RawColumnDefault structures
02105  * newConstraints: list of Constraint nodes
02106  * allow_merge: TRUE if check constraints may be merged with existing ones
02107  * is_local: TRUE if definition is local, FALSE if it's inherited
02108  * is_internal: TRUE if result of some internal process, not a user request
02109  *
02110  * All entries in newColDefaults will be processed.  Entries in newConstraints
02111  * will be processed only if they are CONSTR_CHECK type.
02112  *
02113  * Returns a list of CookedConstraint nodes that shows the cooked form of
02114  * the default and constraint expressions added to the relation.
02115  *
02116  * NB: caller should have opened rel with AccessExclusiveLock, and should
02117  * hold that lock till end of transaction.  Also, we assume the caller has
02118  * done a CommandCounterIncrement if necessary to make the relation's catalog
02119  * tuples visible.
02120  */
02121 List *
02122 AddRelationNewConstraints(Relation rel,
02123                           List *newColDefaults,
02124                           List *newConstraints,
02125                           bool allow_merge,
02126                           bool is_local,
02127                           bool is_internal)
02128 {
02129     List       *cookedConstraints = NIL;
02130     TupleDesc   tupleDesc;
02131     TupleConstr *oldconstr;
02132     int         numoldchecks;
02133     ParseState *pstate;
02134     RangeTblEntry *rte;
02135     int         numchecks;
02136     List       *checknames;
02137     ListCell   *cell;
02138     Node       *expr;
02139     CookedConstraint *cooked;
02140 
02141     /*
02142      * Get info about existing constraints.
02143      */
02144     tupleDesc = RelationGetDescr(rel);
02145     oldconstr = tupleDesc->constr;
02146     if (oldconstr)
02147         numoldchecks = oldconstr->num_check;
02148     else
02149         numoldchecks = 0;
02150 
02151     /*
02152      * Create a dummy ParseState and insert the target relation as its sole
02153      * rangetable entry.  We need a ParseState for transformExpr.
02154      */
02155     pstate = make_parsestate(NULL);
02156     rte = addRangeTableEntryForRelation(pstate,
02157                                         rel,
02158                                         NULL,
02159                                         false,
02160                                         true);
02161     addRTEtoQuery(pstate, rte, true, true, true);
02162 
02163     /*
02164      * Process column default expressions.
02165      */
02166     foreach(cell, newColDefaults)
02167     {
02168         RawColumnDefault *colDef = (RawColumnDefault *) lfirst(cell);
02169         Form_pg_attribute atp = rel->rd_att->attrs[colDef->attnum - 1];
02170 
02171         expr = cookDefault(pstate, colDef->raw_default,
02172                            atp->atttypid, atp->atttypmod,
02173                            NameStr(atp->attname));
02174 
02175         /*
02176          * If the expression is just a NULL constant, we do not bother to make
02177          * an explicit pg_attrdef entry, since the default behavior is
02178          * equivalent.
02179          *
02180          * Note a nonobvious property of this test: if the column is of a
02181          * domain type, what we'll get is not a bare null Const but a
02182          * CoerceToDomain expr, so we will not discard the default.  This is
02183          * critical because the column default needs to be retained to
02184          * override any default that the domain might have.
02185          */
02186         if (expr == NULL ||
02187             (IsA(expr, Const) &&((Const *) expr)->constisnull))
02188             continue;
02189 
02190         StoreAttrDefault(rel, colDef->attnum, expr, is_internal);
02191 
02192         cooked = (CookedConstraint *) palloc(sizeof(CookedConstraint));
02193         cooked->contype = CONSTR_DEFAULT;
02194         cooked->name = NULL;
02195         cooked->attnum = colDef->attnum;
02196         cooked->expr = expr;
02197         cooked->skip_validation = false;
02198         cooked->is_local = is_local;
02199         cooked->inhcount = is_local ? 0 : 1;
02200         cooked->is_no_inherit = false;
02201         cookedConstraints = lappend(cookedConstraints, cooked);
02202     }
02203 
02204     /*
02205      * Process constraint expressions.
02206      */
02207     numchecks = numoldchecks;
02208     checknames = NIL;
02209     foreach(cell, newConstraints)
02210     {
02211         Constraint *cdef = (Constraint *) lfirst(cell);
02212         char       *ccname;
02213 
02214         if (cdef->contype != CONSTR_CHECK)
02215             continue;
02216 
02217         if (cdef->raw_expr != NULL)
02218         {
02219             Assert(cdef->cooked_expr == NULL);
02220 
02221             /*
02222              * Transform raw parsetree to executable expression, and verify
02223              * it's valid as a CHECK constraint.
02224              */
02225             expr = cookConstraint(pstate, cdef->raw_expr,
02226                                   RelationGetRelationName(rel));
02227         }
02228         else
02229         {
02230             Assert(cdef->cooked_expr != NULL);
02231 
02232             /*
02233              * Here, we assume the parser will only pass us valid CHECK
02234              * expressions, so we do no particular checking.
02235              */
02236             expr = stringToNode(cdef->cooked_expr);
02237         }
02238 
02239         /*
02240          * Check name uniqueness, or generate a name if none was given.
02241          */
02242         if (cdef->conname != NULL)
02243         {
02244             ListCell   *cell2;
02245 
02246             ccname = cdef->conname;
02247             /* Check against other new constraints */
02248             /* Needed because we don't do CommandCounterIncrement in loop */
02249             foreach(cell2, checknames)
02250             {
02251                 if (strcmp((char *) lfirst(cell2), ccname) == 0)
02252                     ereport(ERROR,
02253                             (errcode(ERRCODE_DUPLICATE_OBJECT),
02254                              errmsg("check constraint \"%s\" already exists",
02255                                     ccname)));
02256             }
02257 
02258             /* save name for future checks */
02259             checknames = lappend(checknames, ccname);
02260 
02261             /*
02262              * Check against pre-existing constraints.  If we are allowed to
02263              * merge with an existing constraint, there's no more to do here.
02264              * (We omit the duplicate constraint from the result, which is
02265              * what ATAddCheckConstraint wants.)
02266              */
02267             if (MergeWithExistingConstraint(rel, ccname, expr,
02268                                             allow_merge, is_local,
02269                                             cdef->is_no_inherit))
02270                 continue;
02271         }
02272         else
02273         {
02274             /*
02275              * When generating a name, we want to create "tab_col_check" for a
02276              * column constraint and "tab_check" for a table constraint.  We
02277              * no longer have any info about the syntactic positioning of the
02278              * constraint phrase, so we approximate this by seeing whether the
02279              * expression references more than one column.  (If the user
02280              * played by the rules, the result is the same...)
02281              *
02282              * Note: pull_var_clause() doesn't descend into sublinks, but we
02283              * eliminated those above; and anyway this only needs to be an
02284              * approximate answer.
02285              */
02286             List       *vars;
02287             char       *colname;
02288 
02289             vars = pull_var_clause(expr,
02290                                    PVC_REJECT_AGGREGATES,
02291                                    PVC_REJECT_PLACEHOLDERS);
02292 
02293             /* eliminate duplicates */
02294             vars = list_union(NIL, vars);
02295 
02296             if (list_length(vars) == 1)
02297                 colname = get_attname(RelationGetRelid(rel),
02298                                       ((Var *) linitial(vars))->varattno);
02299             else
02300                 colname = NULL;
02301 
02302             ccname = ChooseConstraintName(RelationGetRelationName(rel),
02303                                           colname,
02304                                           "check",
02305                                           RelationGetNamespace(rel),
02306                                           checknames);
02307 
02308             /* save name for future checks */
02309             checknames = lappend(checknames, ccname);
02310         }
02311 
02312         /*
02313          * OK, store it.
02314          */
02315         StoreRelCheck(rel, ccname, expr, !cdef->skip_validation, is_local,
02316                       is_local ? 0 : 1, cdef->is_no_inherit, is_internal);
02317 
02318         numchecks++;
02319 
02320         cooked = (CookedConstraint *) palloc(sizeof(CookedConstraint));
02321         cooked->contype = CONSTR_CHECK;
02322         cooked->name = ccname;
02323         cooked->attnum = 0;
02324         cooked->expr = expr;
02325         cooked->skip_validation = cdef->skip_validation;
02326         cooked->is_local = is_local;
02327         cooked->inhcount = is_local ? 0 : 1;
02328         cooked->is_no_inherit = cdef->is_no_inherit;
02329         cookedConstraints = lappend(cookedConstraints, cooked);
02330     }
02331 
02332     /*
02333      * Update the count of constraints in the relation's pg_class tuple. We do
02334      * this even if there was no change, in order to ensure that an SI update
02335      * message is sent out for the pg_class tuple, which will force other
02336      * backends to rebuild their relcache entries for the rel. (This is
02337      * critical if we added defaults but not constraints.)
02338      */
02339     SetRelationNumChecks(rel, numchecks);
02340 
02341     return cookedConstraints;
02342 }
02343 
02344 /*
02345  * Check for a pre-existing check constraint that conflicts with a proposed
02346  * new one, and either adjust its conislocal/coninhcount settings or throw
02347  * error as needed.
02348  *
02349  * Returns TRUE if merged (constraint is a duplicate), or FALSE if it's
02350  * got a so-far-unique name, or throws error if conflict.
02351  *
02352  * XXX See MergeConstraintsIntoExisting too if you change this code.
02353  */
02354 static bool
02355 MergeWithExistingConstraint(Relation rel, char *ccname, Node *expr,
02356                             bool allow_merge, bool is_local,
02357                             bool is_no_inherit)
02358 {
02359     bool        found;
02360     Relation    conDesc;
02361     SysScanDesc conscan;
02362     ScanKeyData skey[2];
02363     HeapTuple   tup;
02364 
02365     /* Search for a pg_constraint entry with same name and relation */
02366     conDesc = heap_open(ConstraintRelationId, RowExclusiveLock);
02367 
02368     found = false;
02369 
02370     ScanKeyInit(&skey[0],
02371                 Anum_pg_constraint_conname,
02372                 BTEqualStrategyNumber, F_NAMEEQ,
02373                 CStringGetDatum(ccname));
02374 
02375     ScanKeyInit(&skey[1],
02376                 Anum_pg_constraint_connamespace,
02377                 BTEqualStrategyNumber, F_OIDEQ,
02378                 ObjectIdGetDatum(RelationGetNamespace(rel)));
02379 
02380     conscan = systable_beginscan(conDesc, ConstraintNameNspIndexId, true,
02381                                  SnapshotNow, 2, skey);
02382 
02383     while (HeapTupleIsValid(tup = systable_getnext(conscan)))
02384     {
02385         Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(tup);
02386 
02387         if (con->conrelid == RelationGetRelid(rel))
02388         {
02389             /* Found it.  Conflicts if not identical check constraint */
02390             if (con->contype == CONSTRAINT_CHECK)
02391             {
02392                 Datum       val;
02393                 bool        isnull;
02394 
02395                 val = fastgetattr(tup,
02396                                   Anum_pg_constraint_conbin,
02397                                   conDesc->rd_att, &isnull);
02398                 if (isnull)
02399                     elog(ERROR, "null conbin for rel %s",
02400                          RelationGetRelationName(rel));
02401                 if (equal(expr, stringToNode(TextDatumGetCString(val))))
02402                     found = true;
02403             }
02404             if (!found || !allow_merge)
02405                 ereport(ERROR,
02406                         (errcode(ERRCODE_DUPLICATE_OBJECT),
02407                 errmsg("constraint \"%s\" for relation \"%s\" already exists",
02408                        ccname, RelationGetRelationName(rel))));
02409 
02410             tup = heap_copytuple(tup);
02411             con = (Form_pg_constraint) GETSTRUCT(tup);
02412 
02413             /* If the constraint is "no inherit" then cannot merge */
02414             if (con->connoinherit)
02415                 ereport(ERROR,
02416                         (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
02417                          errmsg("constraint \"%s\" conflicts with non-inherited constraint on relation \"%s\"",
02418                                 ccname, RelationGetRelationName(rel))));
02419 
02420             if (is_local)
02421                 con->conislocal = true;
02422             else
02423                 con->coninhcount++;
02424             if (is_no_inherit)
02425             {
02426                 Assert(is_local);
02427                 con->connoinherit = true;
02428             }
02429             /* OK to update the tuple */
02430             ereport(NOTICE,
02431                (errmsg("merging constraint \"%s\" with inherited definition",
02432                        ccname)));
02433             simple_heap_update(conDesc, &tup->t_self, tup);
02434             CatalogUpdateIndexes(conDesc, tup);
02435             break;
02436         }
02437     }
02438 
02439     systable_endscan(conscan);
02440     heap_close(conDesc, RowExclusiveLock);
02441 
02442     return found;
02443 }
02444 
02445 /*
02446  * Update the count of constraints in the relation's pg_class tuple.
02447  *
02448  * Caller had better hold exclusive lock on the relation.
02449  *
02450  * An important side effect is that a SI update message will be sent out for
02451  * the pg_class tuple, which will force other backends to rebuild their
02452  * relcache entries for the rel.  Also, this backend will rebuild its
02453  * own relcache entry at the next CommandCounterIncrement.
02454  */
02455 static void
02456 SetRelationNumChecks(Relation rel, int numchecks)
02457 {
02458     Relation    relrel;
02459     HeapTuple   reltup;
02460     Form_pg_class relStruct;
02461 
02462     relrel = heap_open(RelationRelationId, RowExclusiveLock);
02463     reltup = SearchSysCacheCopy1(RELOID,
02464                                  ObjectIdGetDatum(RelationGetRelid(rel)));
02465     if (!HeapTupleIsValid(reltup))
02466         elog(ERROR, "cache lookup failed for relation %u",
02467              RelationGetRelid(rel));
02468     relStruct = (Form_pg_class) GETSTRUCT(reltup);
02469 
02470     if (relStruct->relchecks != numchecks)
02471     {
02472         relStruct->relchecks = numchecks;
02473 
02474         simple_heap_update(relrel, &reltup->t_self, reltup);
02475 
02476         /* keep catalog indexes current */
02477         CatalogUpdateIndexes(relrel, reltup);
02478     }
02479     else
02480     {
02481         /* Skip the disk update, but force relcache inval anyway */
02482         CacheInvalidateRelcache(rel);
02483     }
02484 
02485     heap_freetuple(reltup);
02486     heap_close(relrel, RowExclusiveLock);
02487 }
02488 
02489 /*
02490  * Take a raw default and convert it to a cooked format ready for
02491  * storage.
02492  *
02493  * Parse state should be set up to recognize any vars that might appear
02494  * in the expression.  (Even though we plan to reject vars, it's more
02495  * user-friendly to give the correct error message than "unknown var".)
02496  *
02497  * If atttypid is not InvalidOid, coerce the expression to the specified
02498  * type (and typmod atttypmod).   attname is only needed in this case:
02499  * it is used in the error message, if any.
02500  */
02501 Node *
02502 cookDefault(ParseState *pstate,
02503             Node *raw_default,
02504             Oid atttypid,
02505             int32 atttypmod,
02506             char *attname)
02507 {
02508     Node       *expr;
02509 
02510     Assert(raw_default != NULL);
02511 
02512     /*
02513      * Transform raw parsetree to executable expression.
02514      */
02515     expr = transformExpr(pstate, raw_default, EXPR_KIND_COLUMN_DEFAULT);
02516 
02517     /*
02518      * Make sure default expr does not refer to any vars (we need this check
02519      * since the pstate includes the target table).
02520      */
02521     if (contain_var_clause(expr))
02522         ereport(ERROR,
02523                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
02524               errmsg("cannot use column references in default expression")));
02525 
02526     /*
02527      * transformExpr() should have already rejected subqueries, aggregates,
02528      * and window functions, based on the EXPR_KIND_ for a default expression.
02529      *
02530      * It can't return a set either.
02531      */
02532     if (expression_returns_set(expr))
02533         ereport(ERROR,
02534                 (errcode(ERRCODE_DATATYPE_MISMATCH),
02535                  errmsg("default expression must not return a set")));
02536 
02537     /*
02538      * Coerce the expression to the correct type and typmod, if given. This
02539      * should match the parser's processing of non-defaulted expressions ---
02540      * see transformAssignedExpr().
02541      */
02542     if (OidIsValid(atttypid))
02543     {
02544         Oid         type_id = exprType(expr);
02545 
02546         expr = coerce_to_target_type(pstate, expr, type_id,
02547                                      atttypid, atttypmod,
02548                                      COERCION_ASSIGNMENT,
02549                                      COERCE_IMPLICIT_CAST,
02550                                      -1);
02551         if (expr == NULL)
02552             ereport(ERROR,
02553                     (errcode(ERRCODE_DATATYPE_MISMATCH),
02554                      errmsg("column \"%s\" is of type %s"
02555                             " but default expression is of type %s",
02556                             attname,
02557                             format_type_be(atttypid),
02558                             format_type_be(type_id)),
02559                errhint("You will need to rewrite or cast the expression.")));
02560     }
02561 
02562     /*
02563      * Finally, take care of collations in the finished expression.
02564      */
02565     assign_expr_collations(pstate, expr);
02566 
02567     return expr;
02568 }
02569 
02570 /*
02571  * Take a raw CHECK constraint expression and convert it to a cooked format
02572  * ready for storage.
02573  *
02574  * Parse state must be set up to recognize any vars that might appear
02575  * in the expression.
02576  */
02577 static Node *
02578 cookConstraint(ParseState *pstate,
02579                Node *raw_constraint,
02580                char *relname)
02581 {
02582     Node       *expr;
02583 
02584     /*
02585      * Transform raw parsetree to executable expression.
02586      */
02587     expr = transformExpr(pstate, raw_constraint, EXPR_KIND_CHECK_CONSTRAINT);
02588 
02589     /*
02590      * Make sure it yields a boolean result.
02591      */
02592     expr = coerce_to_boolean(pstate, expr, "CHECK");
02593 
02594     /*
02595      * Take care of collations.
02596      */
02597     assign_expr_collations(pstate, expr);
02598 
02599     /*
02600      * Make sure no outside relations are referred to (this is probably dead
02601      * code now that add_missing_from is history).
02602      */
02603     if (list_length(pstate->p_rtable) != 1)
02604         ereport(ERROR,
02605                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
02606             errmsg("only table \"%s\" can be referenced in check constraint",
02607                    relname)));
02608 
02609     return expr;
02610 }
02611 
02612 
02613 /*
02614  * RemoveStatistics --- remove entries in pg_statistic for a rel or column
02615  *
02616  * If attnum is zero, remove all entries for rel; else remove only the one(s)
02617  * for that column.
02618  */
02619 void
02620 RemoveStatistics(Oid relid, AttrNumber attnum)
02621 {
02622     Relation    pgstatistic;
02623     SysScanDesc scan;
02624     ScanKeyData key[2];
02625     int         nkeys;
02626     HeapTuple   tuple;
02627 
02628     pgstatistic = heap_open(StatisticRelationId, RowExclusiveLock);
02629 
02630     ScanKeyInit(&key[0],
02631                 Anum_pg_statistic_starelid,
02632                 BTEqualStrategyNumber, F_OIDEQ,
02633                 ObjectIdGetDatum(relid));
02634 
02635     if (attnum == 0)
02636         nkeys = 1;
02637     else
02638     {
02639         ScanKeyInit(&key[1],
02640                     Anum_pg_statistic_staattnum,
02641                     BTEqualStrategyNumber, F_INT2EQ,
02642                     Int16GetDatum(attnum));
02643         nkeys = 2;
02644     }
02645 
02646     scan = systable_beginscan(pgstatistic, StatisticRelidAttnumInhIndexId, true,
02647                               SnapshotNow, nkeys, key);
02648 
02649     /* we must loop even when attnum != 0, in case of inherited stats */
02650     while (HeapTupleIsValid(tuple = systable_getnext(scan)))
02651         simple_heap_delete(pgstatistic, &tuple->t_self);
02652 
02653     systable_endscan(scan);
02654 
02655     heap_close(pgstatistic, RowExclusiveLock);
02656 }
02657 
02658 
02659 /*
02660  * RelationTruncateIndexes - truncate all indexes associated
02661  * with the heap relation to zero tuples.
02662  *
02663  * The routine will truncate and then reconstruct the indexes on
02664  * the specified relation.  Caller must hold exclusive lock on rel.
02665  */
02666 static void
02667 RelationTruncateIndexes(Relation heapRelation)
02668 {
02669     ListCell   *indlist;
02670 
02671     /* Ask the relcache to produce a list of the indexes of the rel */
02672     foreach(indlist, RelationGetIndexList(heapRelation))
02673     {
02674         Oid         indexId = lfirst_oid(indlist);
02675         Relation    currentIndex;
02676         IndexInfo  *indexInfo;
02677 
02678         /* Open the index relation; use exclusive lock, just to be sure */
02679         currentIndex = index_open(indexId, AccessExclusiveLock);
02680 
02681         /* Fetch info needed for index_build */
02682         indexInfo = BuildIndexInfo(currentIndex);
02683 
02684         /*
02685          * Now truncate the actual file (and discard buffers).
02686          */
02687         RelationTruncate(currentIndex, 0);
02688 
02689         /* Initialize the index and rebuild */
02690         /* Note: we do not need to re-establish pkey setting */
02691         index_build(heapRelation, currentIndex, indexInfo, false, true);
02692 
02693         /* We're done with this index */
02694         index_close(currentIndex, NoLock);
02695     }
02696 }
02697 
02698 /*
02699  *   heap_truncate
02700  *
02701  *   This routine deletes all data within all the specified relations.
02702  *
02703  * This is not transaction-safe!  There is another, transaction-safe
02704  * implementation in commands/tablecmds.c.  We now use this only for
02705  * ON COMMIT truncation of temporary tables, where it doesn't matter.
02706  */
02707 void
02708 heap_truncate(List *relids)
02709 {
02710     List       *relations = NIL;
02711     ListCell   *cell;
02712 
02713     /* Open relations for processing, and grab exclusive access on each */
02714     foreach(cell, relids)
02715     {
02716         Oid         rid = lfirst_oid(cell);
02717         Relation    rel;
02718 
02719         rel = heap_open(rid, AccessExclusiveLock);
02720         relations = lappend(relations, rel);
02721     }
02722 
02723     /* Don't allow truncate on tables that are referenced by foreign keys */
02724     heap_truncate_check_FKs(relations, true);
02725 
02726     /* OK to do it */
02727     foreach(cell, relations)
02728     {
02729         Relation    rel = lfirst(cell);
02730 
02731         /* Truncate the relation */
02732         heap_truncate_one_rel(rel);
02733 
02734         /* Close the relation, but keep exclusive lock on it until commit */
02735         heap_close(rel, NoLock);
02736     }
02737 }
02738 
02739 /*
02740  *   heap_truncate_one_rel
02741  *
02742  *   This routine deletes all data within the specified relation.
02743  *
02744  * This is not transaction-safe, because the truncation is done immediately
02745  * and cannot be rolled back later.  Caller is responsible for having
02746  * checked permissions etc, and must have obtained AccessExclusiveLock.
02747  */
02748 void
02749 heap_truncate_one_rel(Relation rel)
02750 {
02751     Oid         toastrelid;
02752 
02753     /* Truncate the actual file (and discard buffers) */
02754     RelationTruncate(rel, 0);
02755 
02756     /* If the relation has indexes, truncate the indexes too */
02757     RelationTruncateIndexes(rel);
02758 
02759     /* If there is a toast table, truncate that too */
02760     toastrelid = rel->rd_rel->reltoastrelid;
02761     if (OidIsValid(toastrelid))
02762     {
02763         Relation    toastrel = heap_open(toastrelid, AccessExclusiveLock);
02764 
02765         RelationTruncate(toastrel, 0);
02766         RelationTruncateIndexes(toastrel);
02767         /* keep the lock... */
02768         heap_close(toastrel, NoLock);
02769     }
02770 }
02771 
02772 /*
02773  * heap_truncate_check_FKs
02774  *      Check for foreign keys referencing a list of relations that
02775  *      are to be truncated, and raise error if there are any
02776  *
02777  * We disallow such FKs (except self-referential ones) since the whole point
02778  * of TRUNCATE is to not scan the individual rows to be thrown away.
02779  *
02780  * This is split out so it can be shared by both implementations of truncate.
02781  * Caller should already hold a suitable lock on the relations.
02782  *
02783  * tempTables is only used to select an appropriate error message.
02784  */
02785 void
02786 heap_truncate_check_FKs(List *relations, bool tempTables)
02787 {
02788     List       *oids = NIL;
02789     List       *dependents;
02790     ListCell   *cell;
02791 
02792     /*
02793      * Build a list of OIDs of the interesting relations.
02794      *
02795      * If a relation has no triggers, then it can neither have FKs nor be
02796      * referenced by a FK from another table, so we can ignore it.
02797      */
02798     foreach(cell, relations)
02799     {
02800         Relation    rel = lfirst(cell);
02801 
02802         if (rel->rd_rel->relhastriggers)
02803             oids = lappend_oid(oids, RelationGetRelid(rel));
02804     }
02805 
02806     /*
02807      * Fast path: if no relation has triggers, none has FKs either.
02808      */
02809     if (oids == NIL)
02810         return;
02811 
02812     /*
02813      * Otherwise, must scan pg_constraint.  We make one pass with all the
02814      * relations considered; if this finds nothing, then all is well.
02815      */
02816     dependents = heap_truncate_find_FKs(oids);
02817     if (dependents == NIL)
02818         return;
02819 
02820     /*
02821      * Otherwise we repeat the scan once per relation to identify a particular
02822      * pair of relations to complain about.  This is pretty slow, but
02823      * performance shouldn't matter much in a failure path.  The reason for
02824      * doing things this way is to ensure that the message produced is not
02825      * dependent on chance row locations within pg_constraint.
02826      */
02827     foreach(cell, oids)
02828     {
02829         Oid         relid = lfirst_oid(cell);
02830         ListCell   *cell2;
02831 
02832         dependents = heap_truncate_find_FKs(list_make1_oid(relid));
02833 
02834         foreach(cell2, dependents)
02835         {
02836             Oid         relid2 = lfirst_oid(cell2);
02837 
02838             if (!list_member_oid(oids, relid2))
02839             {
02840                 char       *relname = get_rel_name(relid);
02841                 char       *relname2 = get_rel_name(relid2);
02842 
02843                 if (tempTables)
02844                     ereport(ERROR,
02845                             (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
02846                              errmsg("unsupported ON COMMIT and foreign key combination"),
02847                              errdetail("Table \"%s\" references \"%s\", but they do not have the same ON COMMIT setting.",
02848                                        relname2, relname)));
02849                 else
02850                     ereport(ERROR,
02851                             (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
02852                              errmsg("cannot truncate a table referenced in a foreign key constraint"),
02853                              errdetail("Table \"%s\" references \"%s\".",
02854                                        relname2, relname),
02855                            errhint("Truncate table \"%s\" at the same time, "
02856                                    "or use TRUNCATE ... CASCADE.",
02857                                    relname2)));
02858             }
02859         }
02860     }
02861 }
02862 
02863 /*
02864  * heap_truncate_find_FKs
02865  *      Find relations having foreign keys referencing any of the given rels
02866  *
02867  * Input and result are both lists of relation OIDs.  The result contains
02868  * no duplicates, does *not* include any rels that were already in the input
02869  * list, and is sorted in OID order.  (The last property is enforced mainly
02870  * to guarantee consistent behavior in the regression tests; we don't want
02871  * behavior to change depending on chance locations of rows in pg_constraint.)
02872  *
02873  * Note: caller should already have appropriate lock on all rels mentioned
02874  * in relationIds.  Since adding or dropping an FK requires exclusive lock
02875  * on both rels, this ensures that the answer will be stable.
02876  */
02877 List *
02878 heap_truncate_find_FKs(List *relationIds)
02879 {
02880     List       *result = NIL;
02881     Relation    fkeyRel;
02882     SysScanDesc fkeyScan;
02883     HeapTuple   tuple;
02884 
02885     /*
02886      * Must scan pg_constraint.  Right now, it is a seqscan because there is
02887      * no available index on confrelid.
02888      */
02889     fkeyRel = heap_open(ConstraintRelationId, AccessShareLock);
02890 
02891     fkeyScan = systable_beginscan(fkeyRel, InvalidOid, false,
02892                                   SnapshotNow, 0, NULL);
02893 
02894     while (HeapTupleIsValid(tuple = systable_getnext(fkeyScan)))
02895     {
02896         Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(tuple);
02897 
02898         /* Not a foreign key */
02899         if (con->contype != CONSTRAINT_FOREIGN)
02900             continue;
02901 
02902         /* Not referencing one of our list of tables */
02903         if (!list_member_oid(relationIds, con->confrelid))
02904             continue;
02905 
02906         /* Add referencer unless already in input or result list */
02907         if (!list_member_oid(relationIds, con->conrelid))
02908             result = insert_ordered_unique_oid(result, con->conrelid);
02909     }
02910 
02911     systable_endscan(fkeyScan);
02912     heap_close(fkeyRel, AccessShareLock);
02913 
02914     return result;
02915 }
02916 
02917 /*
02918  * insert_ordered_unique_oid
02919  *      Insert a new Oid into a sorted list of Oids, preserving ordering,
02920  *      and eliminating duplicates
02921  *
02922  * Building the ordered list this way is O(N^2), but with a pretty small
02923  * constant, so for the number of entries we expect it will probably be
02924  * faster than trying to apply qsort().  It seems unlikely someone would be
02925  * trying to truncate a table with thousands of dependent tables ...
02926  */
02927 static List *
02928 insert_ordered_unique_oid(List *list, Oid datum)
02929 {
02930     ListCell   *prev;
02931 
02932     /* Does the datum belong at the front? */
02933     if (list == NIL || datum < linitial_oid(list))
02934         return lcons_oid(datum, list);
02935     /* Does it match the first entry? */
02936     if (datum == linitial_oid(list))
02937         return list;            /* duplicate, so don't insert */
02938     /* No, so find the entry it belongs after */
02939     prev = list_head(list);
02940     for (;;)
02941     {
02942         ListCell   *curr = lnext(prev);
02943 
02944         if (curr == NULL || datum < lfirst_oid(curr))
02945             break;              /* it belongs after 'prev', before 'curr' */
02946 
02947         if (datum == lfirst_oid(curr))
02948             return list;        /* duplicate, so don't insert */
02949 
02950         prev = curr;
02951     }
02952     /* Insert datum into list after 'prev' */
02953     lappend_cell_oid(list, prev, datum);
02954     return list;
02955 }