Header And Logo

PostgreSQL
| The world's most advanced open source database.

indexcmds.c

Go to the documentation of this file.
00001 /*-------------------------------------------------------------------------
00002  *
00003  * indexcmds.c
00004  *    POSTGRES define and remove index code.
00005  *
00006  * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
00007  * Portions Copyright (c) 1994, Regents of the University of California
00008  *
00009  *
00010  * IDENTIFICATION
00011  *    src/backend/commands/indexcmds.c
00012  *
00013  *-------------------------------------------------------------------------
00014  */
00015 
00016 #include "postgres.h"
00017 
00018 #include "access/htup_details.h"
00019 #include "access/reloptions.h"
00020 #include "access/xact.h"
00021 #include "catalog/catalog.h"
00022 #include "catalog/index.h"
00023 #include "catalog/indexing.h"
00024 #include "catalog/pg_opclass.h"
00025 #include "catalog/pg_opfamily.h"
00026 #include "catalog/pg_tablespace.h"
00027 #include "catalog/pg_type.h"
00028 #include "commands/comment.h"
00029 #include "commands/dbcommands.h"
00030 #include "commands/defrem.h"
00031 #include "commands/tablecmds.h"
00032 #include "commands/tablespace.h"
00033 #include "mb/pg_wchar.h"
00034 #include "miscadmin.h"
00035 #include "nodes/nodeFuncs.h"
00036 #include "optimizer/clauses.h"
00037 #include "optimizer/planner.h"
00038 #include "parser/parse_coerce.h"
00039 #include "parser/parse_func.h"
00040 #include "parser/parse_oper.h"
00041 #include "storage/lmgr.h"
00042 #include "storage/proc.h"
00043 #include "storage/procarray.h"
00044 #include "utils/acl.h"
00045 #include "utils/builtins.h"
00046 #include "utils/fmgroids.h"
00047 #include "utils/inval.h"
00048 #include "utils/lsyscache.h"
00049 #include "utils/memutils.h"
00050 #include "utils/snapmgr.h"
00051 #include "utils/syscache.h"
00052 #include "utils/tqual.h"
00053 
00054 
00055 /* non-export function prototypes (all static, i.e. private to this file) */
00056 static void CheckPredicate(Expr *predicate);    /* DefineIndex uses this to validate a WHERE clause */
00057 static void ComputeIndexAttrs(IndexInfo *indexInfo,
00058                   Oid *typeOidP,
00059                   Oid *collationOidP,
00060                   Oid *classOidP,
00061                   int16 *colOptionP,
00062                   List *attList,
00063                   List *exclusionOpNames,
00064                   Oid relId,
00065                   char *accessMethodName, Oid accessMethodId,
00066                   bool amcanorder,
00067                   bool isconstraint);   /* computes opclasses, collations and exclusion operators for an index definition */
00068 static Oid GetIndexOpClass(List *opclass, Oid attrType,
00069                 char *accessMethodName, Oid accessMethodId);    /* NOTE(review): no caller in this part of the listing; presumably resolves an opclass for a column type -- confirm */
00070 static char *ChooseIndexName(const char *tabname, Oid namespaceId,
00071                 List *colnames, List *exclusionOpNames,
00072                 bool primary, bool isconstraint);   /* DefineIndex uses this when the statement gives no index name */
00073 static char *ChooseIndexNameAddition(List *colnames);   /* NOTE(review): no caller in this part of the listing; purpose not confirmed */
00074 static List *ChooseIndexColumnNames(List *indexElems);  /* DefineIndex uses this to choose the index column names */
00075 static void RangeVarCallbackForReindexIndex(const RangeVar *relation,
00076                                 Oid relId, Oid oldRelId, void *arg);    /* NOTE(review): no caller in this part of the listing; presumably a RangeVar lookup callback for REINDEX INDEX -- confirm */
00077 
00078 /*
00079  * CheckIndexCompatible
00080  *      Determine whether an existing index definition is compatible with a
00081  *      prospective index definition, such that the existing index storage
00082  *      could become the storage of the new index, avoiding a rebuild.
00083  * 'oldId': OID of the existing index that the new definition is compared to.
00084  * 'heapRelation': the relation the index would apply to.
00085  * 'accessMethodName': name of the AM to use.
00086  * 'attributeList': a list of IndexElem specifying columns and expressions
00087  *      to index on.
00088  * 'exclusionOpNames': list of names of exclusion-constraint operators,
00089  *      or NIL if not an exclusion constraint.
00090  *
00091  * This is tailored to the needs of ALTER TABLE ALTER TYPE, which recreates
00092  * any indexes that depended on a changing column from their pg_get_indexdef
00093  * or pg_get_constraintdef definitions.  We omit some of the sanity checks of
00094  * DefineIndex.  We assume that the old and new indexes have the same number
00095  * of columns and that if one has an expression column or predicate, both do.
00096  * Errors arising from the attribute list still apply.
00097  *
00098  * Most column type changes that can skip a table rewrite do not invalidate
00099  * indexes.  We acknowledge this when all operator classes, collations and
00100  * exclusion operators match.  Though we could further permit intra-opfamily
00101  * changes for btree and hash indexes, that adds subtle complexity with no
00102  * concrete benefit for core types.
00103  *
00104  * When a comparison or exclusion operator has a polymorphic input type, the
00105  * actual input types must also match.  This defends against the possibility
00106  * that operators could vary behavior in response to get_fn_expr_argtype().
00107  * At present, this hazard is theoretical: check_exclusion_constraint() and
00108  * all core index access methods decline to set fn_expr for such calls.
00109  *
00110  * We do not yet implement a test to verify compatibility of expression
00111  * columns or predicates, so assume any such index is incompatible.
00112  */
00113 bool                                /* true if compatible, false otherwise */
00114 CheckIndexCompatible(Oid oldId,
00115                      RangeVar *heapRelation,
00116                      char *accessMethodName,
00117                      List *attributeList,
00118                      List *exclusionOpNames)
00119 {
00120     bool        isconstraint;       /* always false here; see below */
00121     Oid        *typeObjectId;       /* new definition: per-column type OIDs */
00122     Oid        *collationObjectId;  /* new definition: per-column collations */
00123     Oid        *classObjectId;      /* new definition: per-column opclasses */
00124     Oid         accessMethodId;
00125     Oid         relationId;
00126     HeapTuple   tuple;              /* first the pg_am row, later the pg_index row */
00127     Form_pg_index indexForm;
00128     Form_pg_am  accessMethodForm;
00129     bool        amcanorder;
00130     int16      *coloptions;         /* passed to ComputeIndexAttrs; not examined here */
00131     IndexInfo  *indexInfo;
00132     int         numberOfAttributes; /* number of columns in the new definition */
00133     int         old_natts;          /* number of columns in the old index (indnatts) */
00134     bool        isnull;
00135     bool        ret = true;
00136     oidvector  *old_indclass;       /* old index's opclasses, from its pg_index row */
00137     oidvector  *old_indcollation;   /* old index's collations, from its pg_index row */
00138     Relation    irel;               /* the old index, opened to read its column types */
00139     int         i;
00140     Datum       d;
00141 
00142     /* Caller should already have the relation locked in some way, hence NoLock. */
00143     relationId = RangeVarGetRelid(heapRelation, NoLock, false);
00144 
00145     /*
00146      * We can pretend isconstraint = false unconditionally.  It only serves to
00147      * decide the text of an error message that should never happen for us.
00148      */
00149     isconstraint = false;
00150 
00151     numberOfAttributes = list_length(attributeList);
00152     Assert(numberOfAttributes > 0);
00153     Assert(numberOfAttributes <= INDEX_MAX_KEYS);
00154 
00155     /* look up the access method by name; it is an error if it does not exist */
00156     tuple = SearchSysCache1(AMNAME, PointerGetDatum(accessMethodName));
00157     if (!HeapTupleIsValid(tuple))
00158         ereport(ERROR,
00159                 (errcode(ERRCODE_UNDEFINED_OBJECT),
00160                  errmsg("access method \"%s\" does not exist",
00161                         accessMethodName)));
00162     accessMethodId = HeapTupleGetOid(tuple);
00163     accessMethodForm = (Form_pg_am) GETSTRUCT(tuple);
00164     amcanorder = accessMethodForm->amcanorder;
00165     ReleaseSysCache(tuple);         /* the fields we need were copied out above */
00166 
00167     /*
00168      * Compute the operator classes, collations, and exclusion operators for
00169      * the new index, so we can test whether it's compatible with the existing
00170      * one.  Note that ComputeIndexAttrs might fail here, but that's OK:
00171      * DefineIndex would have called this function with the same arguments
00172      * later on, and it would have failed then anyway.
00173      */
00174     indexInfo = makeNode(IndexInfo);
00175     indexInfo->ii_Expressions = NIL;
00176     indexInfo->ii_ExpressionsState = NIL;
00177     indexInfo->ii_PredicateState = NIL;
00178     indexInfo->ii_ExclusionOps = NULL;
00179     indexInfo->ii_ExclusionProcs = NULL;
00180     indexInfo->ii_ExclusionStrats = NULL;
00181     typeObjectId = (Oid *) palloc(numberOfAttributes * sizeof(Oid));
00182     collationObjectId = (Oid *) palloc(numberOfAttributes * sizeof(Oid));
00183     classObjectId = (Oid *) palloc(numberOfAttributes * sizeof(Oid));
00184     coloptions = (int16 *) palloc(numberOfAttributes * sizeof(int16));
00185     ComputeIndexAttrs(indexInfo,
00186                       typeObjectId, collationObjectId, classObjectId,
00187                       coloptions, attributeList,
00188                       exclusionOpNames, relationId,
00189                       accessMethodName, accessMethodId,
00190                       amcanorder, isconstraint);
00191 
00192 
00193     /* Get the soon-obsolete pg_index tuple, i.e. the old index's catalog row. */
00194     tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(oldId));
00195     if (!HeapTupleIsValid(tuple))
00196         elog(ERROR, "cache lookup failed for index %u", oldId);
00197     indexForm = (Form_pg_index) GETSTRUCT(tuple);
00198 
00199     /*
00200      * We don't assess expressions or predicates; assume incompatibility.
00201      * Also, if the index is invalid for any reason, treat it as incompatible.
00202      */
00203     if (!(heap_attisnull(tuple, Anum_pg_index_indpred) &&
00204           heap_attisnull(tuple, Anum_pg_index_indexprs) &&
00205           IndexIsValid(indexForm)))
00206     {
00207         ReleaseSysCache(tuple);
00208         return false;
00209     }
00210 
00211     /* Any change in operator class or collation breaks compatibility. */
00212     old_natts = indexForm->indnatts;
00213     Assert(old_natts == numberOfAttributes);    /* the memcmp lengths below rely on this */
00214 
00215     d = SysCacheGetAttr(INDEXRELID, tuple, Anum_pg_index_indcollation, &isnull);
00216     Assert(!isnull);
00217     old_indcollation = (oidvector *) DatumGetPointer(d);
00218 
00219     d = SysCacheGetAttr(INDEXRELID, tuple, Anum_pg_index_indclass, &isnull);
00220     Assert(!isnull);
00221     old_indclass = (oidvector *) DatumGetPointer(d);
00222 
00223     ret = (memcmp(old_indclass->values, classObjectId,
00224                   old_natts * sizeof(Oid)) == 0 &&
00225            memcmp(old_indcollation->values, collationObjectId,
00226                   old_natts * sizeof(Oid)) == 0);
00227 
00228     ReleaseSysCache(tuple);         /* old_indclass and old_indcollation are not used after this */
00229 
00230     if (!ret)
00231         return false;               /* the index has not been opened yet, so nothing to close */
00232 
00233     /* For polymorphic opcintype, column type changes break compatibility. */
00234     irel = index_open(oldId, AccessShareLock);  /* caller probably has a lock */
00235     for (i = 0; i < old_natts; i++)
00236     {
00237         if (IsPolymorphicType(get_opclass_input_type(classObjectId[i])) &&
00238             irel->rd_att->attrs[i]->atttypid != typeObjectId[i])
00239         {
00240             ret = false;
00241             break;
00242         }
00243     }
00244 
00245     /* Any change in exclusion operator selections breaks compatibility. */
00246     if (ret && indexInfo->ii_ExclusionOps != NULL)
00247     {
00248         Oid        *old_operators,
00249                    *old_procs;
00250         uint16     *old_strats;
00251 
00252         RelationGetExclusionInfo(irel, &old_operators, &old_procs, &old_strats);
00253         ret = memcmp(old_operators, indexInfo->ii_ExclusionOps,
00254                      old_natts * sizeof(Oid)) == 0;
00255 
00256         /* Require an exact input type match for polymorphic operators. */
00257         if (ret)
00258         {
00259             for (i = 0; i < old_natts && ret; i++)
00260             {
00261                 Oid         left,
00262                             right;
00263 
00264                 op_input_types(indexInfo->ii_ExclusionOps[i], &left, &right);
00265                 if ((IsPolymorphicType(left) || IsPolymorphicType(right)) &&
00266                     irel->rd_att->attrs[i]->atttypid != typeObjectId[i])
00267                 {
00268                     ret = false;
00269                     break;
00270                 }
00271             }
00272         }
00273     }
00274 
00275     index_close(irel, NoLock);      /* NoLock: the lock taken by index_open above is kept */
00276     return ret;
00277 }
00278 
00279 /*
00280  * DefineIndex
00281  *      Creates a new index.
00282  *
00283  * 'stmt': IndexStmt describing the properties of the new index.
00284  * 'indexRelationId': normally InvalidOid, but during bootstrap can be
00285  *      nonzero to specify a preselected OID for the index.
00286  * 'is_alter_table': this is due to an ALTER rather than a CREATE operation.
00287  * 'check_rights': check for CREATE rights in the namespace.  (This should
00288  *      be true except when ALTER is deleting/recreating an index.)
00289  * 'skip_build': make the catalog entries but leave the index file empty;
00290  *      it will be filled later.
00291  * 'quiet': suppress the NOTICE chatter ordinarily provided for constraints.
00292  *
00293  * Returns the OID of the created index.
00294  */
00295 Oid
00296 DefineIndex(IndexStmt *stmt,
00297             Oid indexRelationId,
00298             bool is_alter_table,
00299             bool check_rights,
00300             bool skip_build,
00301             bool quiet)
00302 {
00303     char       *indexRelationName;
00304     char       *accessMethodName;
00305     Oid        *typeObjectId;
00306     Oid        *collationObjectId;
00307     Oid        *classObjectId;
00308     Oid         accessMethodId;
00309     Oid         relationId;
00310     Oid         namespaceId;
00311     Oid         tablespaceId;
00312     List       *indexColNames;
00313     Relation    rel;
00314     Relation    indexRelation;
00315     HeapTuple   tuple;
00316     Form_pg_am  accessMethodForm;
00317     bool        amcanorder;
00318     RegProcedure amoptions;
00319     Datum       reloptions;
00320     int16      *coloptions;
00321     IndexInfo  *indexInfo;
00322     int         numberOfAttributes;
00323     TransactionId limitXmin;
00324     VirtualTransactionId *old_lockholders;
00325     VirtualTransactionId *old_snapshots;
00326     int         n_old_snapshots;
00327     LockRelId   heaprelid;
00328     LOCKTAG     heaplocktag;
00329     Snapshot    snapshot;
00330     int         i;
00331 
00332     /*
00333      * count attributes in index
00334      */
00335     numberOfAttributes = list_length(stmt->indexParams);
00336     if (numberOfAttributes <= 0)
00337         ereport(ERROR,
00338                 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
00339                  errmsg("must specify at least one column")));
00340     if (numberOfAttributes > INDEX_MAX_KEYS)
00341         ereport(ERROR,
00342                 (errcode(ERRCODE_TOO_MANY_COLUMNS),
00343                  errmsg("cannot use more than %d columns in an index",
00344                         INDEX_MAX_KEYS)));
00345 
00346     /*
00347      * Open heap relation, acquire a suitable lock on it, remember its OID
00348      *
00349      * Only SELECT ... FOR UPDATE/SHARE are allowed while doing a standard
00350      * index build; but for concurrent builds we allow INSERT/UPDATE/DELETE
00351      * (but not VACUUM).
00352      */
00353     rel = heap_openrv(stmt->relation,
00354                       (stmt->concurrent ? ShareUpdateExclusiveLock : ShareLock));
00355 
00356     relationId = RelationGetRelid(rel);
00357     namespaceId = RelationGetNamespace(rel);
00358 
00359     if (rel->rd_rel->relkind != RELKIND_RELATION &&
00360         rel->rd_rel->relkind != RELKIND_MATVIEW)
00361     {
00362         if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
00363 
00364             /*
00365              * Custom error message for FOREIGN TABLE since the term is close
00366              * to a regular table and can confuse the user.
00367              */
00368             ereport(ERROR,
00369                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
00370                      errmsg("cannot create index on foreign table \"%s\"",
00371                             RelationGetRelationName(rel))));
00372         else
00373             ereport(ERROR,
00374                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
00375                      errmsg("\"%s\" is not a table",
00376                             RelationGetRelationName(rel))));
00377     }
00378 
00379     /*
00380      * Don't try to CREATE INDEX on temp tables of other backends.
00381      */
00382     if (RELATION_IS_OTHER_TEMP(rel))
00383         ereport(ERROR,
00384                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
00385                  errmsg("cannot create indexes on temporary tables of other sessions")));
00386 
00387     /*
00388      * Verify we (still) have CREATE rights in the rel's namespace.
00389      * (Presumably we did when the rel was created, but maybe not anymore.)
00390      * Skip check if caller doesn't want it.  Also skip check if
00391      * bootstrapping, since permissions machinery may not be working yet.
00392      */
00393     if (check_rights && !IsBootstrapProcessingMode())
00394     {
00395         AclResult   aclresult;
00396 
00397         aclresult = pg_namespace_aclcheck(namespaceId, GetUserId(),
00398                                           ACL_CREATE);
00399         if (aclresult != ACLCHECK_OK)
00400             aclcheck_error(aclresult, ACL_KIND_NAMESPACE,
00401                            get_namespace_name(namespaceId));
00402     }
00403 
00404     /*
00405      * Select tablespace to use.  If not specified, use default tablespace
00406      * (which may in turn default to database's default).
00407      */
00408     if (stmt->tableSpace)
00409     {
00410         tablespaceId = get_tablespace_oid(stmt->tableSpace, false);
00411     }
00412     else
00413     {
00414         tablespaceId = GetDefaultTablespace(rel->rd_rel->relpersistence);
00415         /* note InvalidOid is OK in this case */
00416     }
00417 
00418     /* Check permissions except when using database's default */
00419     if (OidIsValid(tablespaceId) && tablespaceId != MyDatabaseTableSpace)
00420     {
00421         AclResult   aclresult;
00422 
00423         aclresult = pg_tablespace_aclcheck(tablespaceId, GetUserId(),
00424                                            ACL_CREATE);
00425         if (aclresult != ACLCHECK_OK)
00426             aclcheck_error(aclresult, ACL_KIND_TABLESPACE,
00427                            get_tablespace_name(tablespaceId));
00428     }
00429 
00430     /*
00431      * Force shared indexes into the pg_global tablespace.  This is a bit of a
00432      * hack but seems simpler than marking them in the BKI commands.  On the
00433      * other hand, if it's not shared, don't allow it to be placed there.
00434      */
00435     if (rel->rd_rel->relisshared)
00436         tablespaceId = GLOBALTABLESPACE_OID;
00437     else if (tablespaceId == GLOBALTABLESPACE_OID)
00438         ereport(ERROR,
00439                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
00440                  errmsg("only shared relations can be placed in pg_global tablespace")));
00441 
00442     /*
00443      * Choose the index column names.
00444      */
00445     indexColNames = ChooseIndexColumnNames(stmt->indexParams);
00446 
00447     /*
00448      * Select name for index if caller didn't specify
00449      */
00450     indexRelationName = stmt->idxname;
00451     if (indexRelationName == NULL)
00452         indexRelationName = ChooseIndexName(RelationGetRelationName(rel),
00453                                             namespaceId,
00454                                             indexColNames,
00455                                             stmt->excludeOpNames,
00456                                             stmt->primary,
00457                                             stmt->isconstraint);
00458 
00459     /*
00460      * look up the access method, verify it can handle the requested features
00461      */
00462     accessMethodName = stmt->accessMethod;
00463     tuple = SearchSysCache1(AMNAME, PointerGetDatum(accessMethodName));
00464     if (!HeapTupleIsValid(tuple))
00465     {
00466         /*
00467          * Hack to provide more-or-less-transparent updating of old RTREE
00468          * indexes to GiST: if RTREE is requested and not found, use GIST.
00469          */
00470         if (strcmp(accessMethodName, "rtree") == 0)
00471         {
00472             ereport(NOTICE,
00473                     (errmsg("substituting access method \"gist\" for obsolete method \"rtree\"")));
00474             accessMethodName = "gist";
00475             tuple = SearchSysCache1(AMNAME, PointerGetDatum(accessMethodName));
00476         }
00477 
00478         if (!HeapTupleIsValid(tuple))
00479             ereport(ERROR,
00480                     (errcode(ERRCODE_UNDEFINED_OBJECT),
00481                      errmsg("access method \"%s\" does not exist",
00482                             accessMethodName)));
00483     }
00484     accessMethodId = HeapTupleGetOid(tuple);
00485     accessMethodForm = (Form_pg_am) GETSTRUCT(tuple);
00486 
00487     if (stmt->unique && !accessMethodForm->amcanunique)
00488         ereport(ERROR,
00489                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
00490                errmsg("access method \"%s\" does not support unique indexes",
00491                       accessMethodName)));
00492     if (numberOfAttributes > 1 && !accessMethodForm->amcanmulticol)
00493         ereport(ERROR,
00494                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
00495           errmsg("access method \"%s\" does not support multicolumn indexes",
00496                  accessMethodName)));
00497     if (stmt->excludeOpNames && !OidIsValid(accessMethodForm->amgettuple))
00498         ereport(ERROR,
00499                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
00500         errmsg("access method \"%s\" does not support exclusion constraints",
00501                accessMethodName)));
00502 
00503     amcanorder = accessMethodForm->amcanorder;
00504     amoptions = accessMethodForm->amoptions;
00505 
00506     ReleaseSysCache(tuple);
00507 
00508     /*
00509      * Validate predicate, if given
00510      */
00511     if (stmt->whereClause)
00512         CheckPredicate((Expr *) stmt->whereClause);
00513 
00514     /*
00515      * Parse AM-specific options, convert to text array form, validate.
00516      */
00517     reloptions = transformRelOptions((Datum) 0, stmt->options,
00518                                      NULL, NULL, false, false);
00519 
00520     (void) index_reloptions(amoptions, reloptions, true);
00521 
00522     /*
00523      * Prepare arguments for index_create, primarily an IndexInfo structure.
00524      * Note that ii_Predicate must be in implicit-AND format.
00525      */
00526     indexInfo = makeNode(IndexInfo);
00527     indexInfo->ii_NumIndexAttrs = numberOfAttributes;
00528     indexInfo->ii_Expressions = NIL;    /* for now */
00529     indexInfo->ii_ExpressionsState = NIL;
00530     indexInfo->ii_Predicate = make_ands_implicit((Expr *) stmt->whereClause);
00531     indexInfo->ii_PredicateState = NIL;
00532     indexInfo->ii_ExclusionOps = NULL;
00533     indexInfo->ii_ExclusionProcs = NULL;
00534     indexInfo->ii_ExclusionStrats = NULL;
00535     indexInfo->ii_Unique = stmt->unique;
00536     /* In a concurrent build, mark it not-ready-for-inserts */
00537     indexInfo->ii_ReadyForInserts = !stmt->concurrent;
00538     indexInfo->ii_Concurrent = stmt->concurrent;
00539     indexInfo->ii_BrokenHotChain = false;
00540 
00541     typeObjectId = (Oid *) palloc(numberOfAttributes * sizeof(Oid));
00542     collationObjectId = (Oid *) palloc(numberOfAttributes * sizeof(Oid));
00543     classObjectId = (Oid *) palloc(numberOfAttributes * sizeof(Oid));
00544     coloptions = (int16 *) palloc(numberOfAttributes * sizeof(int16));
00545     ComputeIndexAttrs(indexInfo,
00546                       typeObjectId, collationObjectId, classObjectId,
00547                       coloptions, stmt->indexParams,
00548                       stmt->excludeOpNames, relationId,
00549                       accessMethodName, accessMethodId,
00550                       amcanorder, stmt->isconstraint);
00551 
00552     /*
00553      * Extra checks when creating a PRIMARY KEY index.
00554      */
00555     if (stmt->primary)
00556         index_check_primary_key(rel, indexInfo, is_alter_table);
00557 
00558     /*
00559      * Report index creation if appropriate (delay this till after most of the
00560      * error checks)
00561      */
00562     if (stmt->isconstraint && !quiet)
00563     {
00564         const char *constraint_type;
00565 
00566         if (stmt->primary)
00567             constraint_type = "PRIMARY KEY";
00568         else if (stmt->unique)
00569             constraint_type = "UNIQUE";
00570         else if (stmt->excludeOpNames != NIL)
00571             constraint_type = "EXCLUDE";
00572         else
00573         {
00574             elog(ERROR, "unknown constraint type");
00575             constraint_type = NULL;     /* keep compiler quiet */
00576         }
00577 
00578         ereport(DEBUG1,
00579           (errmsg("%s %s will create implicit index \"%s\" for table \"%s\"",
00580                   is_alter_table ? "ALTER TABLE / ADD" : "CREATE TABLE /",
00581                   constraint_type,
00582                   indexRelationName, RelationGetRelationName(rel))));
00583     }
00584 
00585     /*
00586      * A valid stmt->oldNode implies that we already have a built form of the
00587      * index.  The caller should also decline any index build.
00588      */
00589     Assert(!OidIsValid(stmt->oldNode) || (skip_build && !stmt->concurrent));
00590 
00591     /*
00592      * Make the catalog entries for the index, including constraints. Then, if
00593      * not skip_build || concurrent, actually build the index.
00594      */
00595     indexRelationId =
00596         index_create(rel, indexRelationName, indexRelationId, stmt->oldNode,
00597                      indexInfo, indexColNames,
00598                      accessMethodId, tablespaceId,
00599                      collationObjectId, classObjectId,
00600                      coloptions, reloptions, stmt->primary,
00601                      stmt->isconstraint, stmt->deferrable, stmt->initdeferred,
00602                      allowSystemTableMods,
00603                      skip_build || stmt->concurrent,
00604                      stmt->concurrent, !check_rights);
00605 
00606     /* Add any requested comment */
00607     if (stmt->idxcomment != NULL)
00608         CreateComments(indexRelationId, RelationRelationId, 0,
00609                        stmt->idxcomment);
00610 
00611     if (!stmt->concurrent)
00612     {
00613         /* Close the heap and we're done, in the non-concurrent case */
00614         heap_close(rel, NoLock);
00615         return indexRelationId;
00616     }
00617 
00618     /* save lockrelid and locktag for below, then close rel */
00619     heaprelid = rel->rd_lockInfo.lockRelId;
00620     SET_LOCKTAG_RELATION(heaplocktag, heaprelid.dbId, heaprelid.relId);
00621     heap_close(rel, NoLock);
00622 
00623     /*
00624      * For a concurrent build, it's important to make the catalog entries
00625      * visible to other transactions before we start to build the index. That
00626      * will prevent them from making incompatible HOT updates.  The new index
00627      * will be marked not indisready and not indisvalid, so that no one else
00628      * tries to either insert into it or use it for queries.
00629      *
00630      * We must commit our current transaction so that the index becomes
00631      * visible; then start another.  Note that all the data structures we just
00632      * built are lost in the commit.  The only data we keep past here are the
00633      * relation IDs.
00634      *
00635      * Before committing, get a session-level lock on the table, to ensure
00636      * that neither it nor the index can be dropped before we finish. This
00637      * cannot block, even if someone else is waiting for access, because we
00638      * already have the same lock within our transaction.
00639      *
00640      * Note: we don't currently bother with a session lock on the index,
00641      * because there are no operations that could change its state while we
00642      * hold lock on the parent table.  This might need to change later.
00643      */
00644     LockRelationIdForSession(&heaprelid, ShareUpdateExclusiveLock);
00645 
00646     PopActiveSnapshot();
00647     CommitTransactionCommand();
00648     StartTransactionCommand();
00649 
00650     /*
00651      * Phase 2 of concurrent index build (see comments for validate_index()
00652      * for an overview of how this works)
00653      *
00654      * Now we must wait until no running transaction could have the table open
00655      * with the old list of indexes.  To do this, inquire which xacts
00656      * currently would conflict with ShareLock on the table -- ie, which ones
00657      * have a lock that permits writing the table.  Then wait for each of
00658      * these xacts to commit or abort.  Note we do not need to worry about
00659      * xacts that open the table for writing after this point; they will see
00660      * the new index when they open it.
00661      *
00662      * Note: the reason we use actual lock acquisition here, rather than just
00663      * checking the ProcArray and sleeping, is that deadlock is possible if
00664      * one of the transactions in question is blocked trying to acquire an
00665      * exclusive lock on our table.  The lock code will detect deadlock and
00666      * error out properly.
00667      *
00668      * Note: GetLockConflicts() never reports our own xid, hence we need not
00669      * check for that.  Also, prepared xacts are not reported, which is fine
00670      * since they certainly aren't going to do anything more.
00671      */
00672     old_lockholders = GetLockConflicts(&heaplocktag, ShareLock);
00673 
00674     while (VirtualTransactionIdIsValid(*old_lockholders))
00675     {
00676         VirtualXactLock(*old_lockholders, true);
00677         old_lockholders++;
00678     }
00679 
00680     /*
00681      * At this moment we are sure that there are no transactions with the
00682      * table open for write that don't have this new index in their list of
00683      * indexes.  We have waited out all the existing transactions and any new
00684      * transaction will have the new index in its list, but the index is still
00685      * marked as "not-ready-for-inserts".  The index is consulted while
00686      * deciding HOT-safety though.  This arrangement ensures that no new HOT
00687      * chains can be created where the new tuple and the old tuple in the
00688      * chain have different index keys.
00689      *
00690      * We now take a new snapshot, and build the index using all tuples that
00691      * are visible in this snapshot.  We can be sure that any HOT updates to
00692      * these tuples will be compatible with the index, since any updates made
00693      * by transactions that didn't know about the index are now committed or
00694      * rolled back.  Thus, each visible tuple is either the end of its
00695      * HOT-chain or the extension of the chain is HOT-safe for this index.
00696      */
00697 
00698     /* Open and lock the parent heap relation */
00699     rel = heap_openrv(stmt->relation, ShareUpdateExclusiveLock);
00700 
00701     /* And the target index relation */
00702     indexRelation = index_open(indexRelationId, RowExclusiveLock);
00703 
00704     /* Set ActiveSnapshot since functions in the indexes may need it */
00705     PushActiveSnapshot(GetTransactionSnapshot());
00706 
00707     /* We have to re-build the IndexInfo struct, since it was lost in commit */
00708     indexInfo = BuildIndexInfo(indexRelation);
00709     Assert(!indexInfo->ii_ReadyForInserts);
00710     indexInfo->ii_Concurrent = true;
00711     indexInfo->ii_BrokenHotChain = false;
00712 
00713     /* Now build the index */
00714     index_build(rel, indexRelation, indexInfo, stmt->primary, false);
00715 
00716     /* Close both the relations, but keep the locks */
00717     heap_close(rel, NoLock);
00718     index_close(indexRelation, NoLock);
00719 
00720     /*
00721      * Update the pg_index row to mark the index as ready for inserts. Once we
00722      * commit this transaction, any new transactions that open the table must
00723      * insert new entries into the index for insertions and non-HOT updates.
00724      */
00725     index_set_state_flags(indexRelationId, INDEX_CREATE_SET_READY);
00726 
00727     /* we can do away with our snapshot */
00728     PopActiveSnapshot();
00729 
00730     /*
00731      * Commit this transaction to make the indisready update visible.
00732      */
00733     CommitTransactionCommand();
00734     StartTransactionCommand();
00735 
00736     /*
00737      * Phase 3 of concurrent index build
00738      *
00739      * We once again wait until no transaction can have the table open with
00740      * the index marked as read-only for updates.
00741      */
00742     old_lockholders = GetLockConflicts(&heaplocktag, ShareLock);
00743 
00744     while (VirtualTransactionIdIsValid(*old_lockholders))
00745     {
00746         VirtualXactLock(*old_lockholders, true);
00747         old_lockholders++;
00748     }
00749 
00750     /*
00751      * Now take the "reference snapshot" that will be used by validate_index()
00752      * to filter candidate tuples.  Beware!  There might still be snapshots in
00753      * use that treat some transaction as in-progress that our reference
00754      * snapshot treats as committed.  If such a recently-committed transaction
00755      * deleted tuples in the table, we will not include them in the index; yet
00756      * those transactions which see the deleting one as still-in-progress will
00757      * expect such tuples to be there once we mark the index as valid.
00758      *
00759      * We solve this by waiting for all endangered transactions to exit before
00760      * we mark the index as valid.
00761      *
00762      * We also set ActiveSnapshot to this snap, since functions in indexes may
00763      * need a snapshot.
00764      */
00765     snapshot = RegisterSnapshot(GetTransactionSnapshot());
00766     PushActiveSnapshot(snapshot);
00767 
00768     /*
00769      * Scan the index and the heap, insert any missing index entries.
00770      */
00771     validate_index(relationId, indexRelationId, snapshot);
00772 
00773     /*
00774      * Drop the reference snapshot.  We must do this before waiting out other
00775      * snapshot holders, else we will deadlock against other processes also
00776      * doing CREATE INDEX CONCURRENTLY, which would see our snapshot as one
00777      * they must wait for.  But first, save the snapshot's xmin to use as
00778      * limitXmin for GetCurrentVirtualXIDs().
00779      */
00780     limitXmin = snapshot->xmin;
00781 
00782     PopActiveSnapshot();
00783     UnregisterSnapshot(snapshot);
00784 
00785     /*
00786      * The index is now valid in the sense that it contains all currently
00787      * interesting tuples.  But since it might not contain tuples deleted just
00788      * before the reference snap was taken, we have to wait out any
00789      * transactions that might have older snapshots.  Obtain a list of VXIDs
00790      * of such transactions, and wait for them individually.
00791      *
00792      * We can exclude any running transactions that have xmin > the xmin of
00793      * our reference snapshot; their oldest snapshot must be newer than ours.
00794      * We can also exclude any transactions that have xmin = zero, since they
00795      * evidently have no live snapshot at all (and any one they might be in
00796      * process of taking is certainly newer than ours).  Transactions in other
00797      * DBs can be ignored too, since they'll never even be able to see this
00798      * index.
00799      *
00800      * We can also exclude autovacuum processes and processes running manual
00801      * lazy VACUUMs, because they won't be fazed by missing index entries
00802      * either.  (Manual ANALYZEs, however, can't be excluded because they
00803      * might be within transactions that are going to do arbitrary operations
00804      * later.)
00805      *
00806      * Also, GetCurrentVirtualXIDs never reports our own vxid, so we need not
00807      * check for that.
00808      *
00809      * If a process goes idle-in-transaction with xmin zero, we do not need to
00810      * wait for it anymore, per the above argument.  We do not have the
00811      * infrastructure right now to stop waiting if that happens, but we can at
00812      * least avoid the folly of waiting when it is idle at the time we would
00813      * begin to wait.  We do this by repeatedly rechecking the output of
00814      * GetCurrentVirtualXIDs.  If, during any iteration, a particular vxid
00815      * doesn't show up in the output, we know we can forget about it.
00816      */
00817     old_snapshots = GetCurrentVirtualXIDs(limitXmin, true, false,
00818                                           PROC_IS_AUTOVACUUM | PROC_IN_VACUUM,
00819                                           &n_old_snapshots);
00820 
00821     for (i = 0; i < n_old_snapshots; i++)
00822     {
00823         if (!VirtualTransactionIdIsValid(old_snapshots[i]))
00824             continue;           /* found uninteresting in previous cycle */
00825 
00826         if (i > 0)
00827         {
00828             /* see if anything's changed ... */
00829             VirtualTransactionId *newer_snapshots;
00830             int         n_newer_snapshots;
00831             int         j;
00832             int         k;
00833 
00834             newer_snapshots = GetCurrentVirtualXIDs(limitXmin,
00835                                                     true, false,
00836                                          PROC_IS_AUTOVACUUM | PROC_IN_VACUUM,
00837                                                     &n_newer_snapshots);
00838             for (j = i; j < n_old_snapshots; j++)
00839             {
00840                 if (!VirtualTransactionIdIsValid(old_snapshots[j]))
00841                     continue;   /* found uninteresting in previous cycle */
00842                 for (k = 0; k < n_newer_snapshots; k++)
00843                 {
00844                     if (VirtualTransactionIdEquals(old_snapshots[j],
00845                                                    newer_snapshots[k]))
00846                         break;
00847                 }
00848                 if (k >= n_newer_snapshots)     /* not there anymore */
00849                     SetInvalidVirtualTransactionId(old_snapshots[j]);
00850             }
00851             pfree(newer_snapshots);
00852         }
00853 
00854         if (VirtualTransactionIdIsValid(old_snapshots[i]))
00855             VirtualXactLock(old_snapshots[i], true);
00856     }
00857 
00858     /*
00859      * Index can now be marked valid -- update its pg_index entry
00860      */
00861     index_set_state_flags(indexRelationId, INDEX_CREATE_SET_VALID);
00862 
00863     /*
00864      * The pg_index update will cause backends (including this one) to update
00865      * relcache entries for the index itself, but we should also send a
00866      * relcache inval on the parent table to force replanning of cached plans.
00867      * Otherwise existing sessions might fail to use the new index where it
00868      * would be useful.  (Note that our earlier commits did not create reasons
00869      * to replan; so relcache flush on the index itself was sufficient.)
00870      */
00871     CacheInvalidateRelcacheByRelid(heaprelid.relId);
00872 
00873     /*
00874      * Last thing to do is release the session-level lock on the parent table.
00875      */
00876     UnlockRelationIdForSession(&heaprelid, ShareUpdateExclusiveLock);
00877 
00878     return indexRelationId;
00879 }
00880 
00881 
00882 /*
00883  * CheckMutability
00884  *      Test whether given expression is mutable
00885  */
00886 static bool
00887 CheckMutability(Expr *expr)
00888 {
00889     /*
00890      * First run the expression through the planner.  This has a couple of
00891      * important consequences.  First, function default arguments will get
00892      * inserted, which may affect volatility (consider "default now()").
00893      * Second, inline-able functions will get inlined, which may allow us to
00894      * conclude that the function is really less volatile than it's marked. As
00895      * an example, polymorphic functions must be marked with the most volatile
00896      * behavior that they have for any input type, but once we inline the
00897      * function we may be able to conclude that it's not so volatile for the
00898      * particular input type we're dealing with.
00899      *
00900      * We assume here that expression_planner() won't scribble on its input.
00901      */
00902     expr = expression_planner(expr);
00903 
00904     /* Now we can search for non-immutable functions */
00905     return contain_mutable_functions((Node *) expr);
00906 }
00907 
00908 
00909 /*
00910  * CheckPredicate
00911  *      Checks that the given partial-index predicate is valid.
00912  *
00913  * This used to also constrain the form of the predicate to forms that
00914  * indxpath.c could do something with.  However, that seems overly
00915  * restrictive.  One useful application of partial indexes is to apply
00916  * a UNIQUE constraint across a subset of a table, and in that scenario
00917  * any evaluatable predicate will work.  So accept any predicate here
00918  * (except ones requiring a plan), and let indxpath.c fend for itself.
00919  */
00920 static void
00921 CheckPredicate(Expr *predicate)
00922 {
00923     /*
00924      * transformExpr() should have already rejected subqueries, aggregates,
00925      * and window functions, based on the EXPR_KIND_ for a predicate.
00926      */
00927 
00928     /*
00929      * A predicate using mutable functions is probably wrong, for the same
00930      * reasons that we don't allow an index expression to use one.
00931      */
00932     if (CheckMutability(predicate))
00933         ereport(ERROR,
00934                 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
00935            errmsg("functions in index predicate must be marked IMMUTABLE")));
00936 }
00937 
00938 /*
00939  * Compute per-index-column information, including indexed column numbers
00940  * or index expressions, opclasses, and indoptions.
00941  */
00942 static void
00943 ComputeIndexAttrs(IndexInfo *indexInfo,
00944                   Oid *typeOidP,
00945                   Oid *collationOidP,
00946                   Oid *classOidP,
00947                   int16 *colOptionP,
00948                   List *attList,    /* list of IndexElem's */
00949                   List *exclusionOpNames,
00950                   Oid relId,
00951                   char *accessMethodName,
00952                   Oid accessMethodId,
00953                   bool amcanorder,
00954                   bool isconstraint)
00955 {
00956     ListCell   *nextExclOp;
00957     ListCell   *lc;
00958     int         attn;
00959 
00960     /* Allocate space for exclusion operator info, if needed */
00961     if (exclusionOpNames)
00962     {
00963         int         ncols = list_length(attList);
00964 
00965         Assert(list_length(exclusionOpNames) == ncols);
00966         indexInfo->ii_ExclusionOps = (Oid *) palloc(sizeof(Oid) * ncols);
00967         indexInfo->ii_ExclusionProcs = (Oid *) palloc(sizeof(Oid) * ncols);
00968         indexInfo->ii_ExclusionStrats = (uint16 *) palloc(sizeof(uint16) * ncols);
00969         nextExclOp = list_head(exclusionOpNames);
00970     }
00971     else
00972         nextExclOp = NULL;
00973 
00974     /*
00975      * process attributeList
00976      */
00977     attn = 0;
00978     foreach(lc, attList)
00979     {
00980         IndexElem  *attribute = (IndexElem *) lfirst(lc);
00981         Oid         atttype;
00982         Oid         attcollation;
00983 
00984         /*
00985          * Process the column-or-expression to be indexed.
00986          */
00987         if (attribute->name != NULL)
00988         {
00989             /* Simple index attribute */
00990             HeapTuple   atttuple;
00991             Form_pg_attribute attform;
00992 
00993             Assert(attribute->expr == NULL);
00994             atttuple = SearchSysCacheAttName(relId, attribute->name);
00995             if (!HeapTupleIsValid(atttuple))
00996             {
00997                 /* difference in error message spellings is historical */
00998                 if (isconstraint)
00999                     ereport(ERROR,
01000                             (errcode(ERRCODE_UNDEFINED_COLUMN),
01001                           errmsg("column \"%s\" named in key does not exist",
01002                                  attribute->name)));
01003                 else
01004                     ereport(ERROR,
01005                             (errcode(ERRCODE_UNDEFINED_COLUMN),
01006                              errmsg("column \"%s\" does not exist",
01007                                     attribute->name)));
01008             }
01009             attform = (Form_pg_attribute) GETSTRUCT(atttuple);
01010             indexInfo->ii_KeyAttrNumbers[attn] = attform->attnum;
01011             atttype = attform->atttypid;
01012             attcollation = attform->attcollation;
01013             ReleaseSysCache(atttuple);
01014         }
01015         else
01016         {
01017             /* Index expression */
01018             Node       *expr = attribute->expr;
01019 
01020             Assert(expr != NULL);
01021             atttype = exprType(expr);
01022             attcollation = exprCollation(expr);
01023 
01024             /*
01025              * Strip any top-level COLLATE clause.  This ensures that we treat
01026              * "x COLLATE y" and "(x COLLATE y)" alike.
01027              */
01028             while (IsA(expr, CollateExpr))
01029                 expr = (Node *) ((CollateExpr *) expr)->arg;
01030 
01031             if (IsA(expr, Var) &&
01032                 ((Var *) expr)->varattno != InvalidAttrNumber)
01033             {
01034                 /*
01035                  * User wrote "(column)" or "(column COLLATE something)".
01036                  * Treat it like simple attribute anyway.
01037                  */
01038                 indexInfo->ii_KeyAttrNumbers[attn] = ((Var *) expr)->varattno;
01039             }
01040             else
01041             {
01042                 indexInfo->ii_KeyAttrNumbers[attn] = 0; /* marks expression */
01043                 indexInfo->ii_Expressions = lappend(indexInfo->ii_Expressions,
01044                                                     expr);
01045 
01046                 /*
01047                  * transformExpr() should have already rejected subqueries,
01048                  * aggregates, and window functions, based on the EXPR_KIND_
01049                  * for an index expression.
01050                  */
01051 
01052                 /*
01053                  * A expression using mutable functions is probably wrong,
01054                  * since if you aren't going to get the same result for the
01055                  * same data every time, it's not clear what the index entries
01056                  * mean at all.
01057                  */
01058                 if (CheckMutability((Expr *) expr))
01059                     ereport(ERROR,
01060                             (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
01061                              errmsg("functions in index expression must be marked IMMUTABLE")));
01062             }
01063         }
01064 
01065         typeOidP[attn] = atttype;
01066 
01067         /*
01068          * Apply collation override if any
01069          */
01070         if (attribute->collation)
01071             attcollation = get_collation_oid(attribute->collation, false);
01072 
01073         /*
01074          * Check we have a collation iff it's a collatable type.  The only
01075          * expected failures here are (1) COLLATE applied to a noncollatable
01076          * type, or (2) index expression had an unresolved collation.  But we
01077          * might as well code this to be a complete consistency check.
01078          */
01079         if (type_is_collatable(atttype))
01080         {
01081             if (!OidIsValid(attcollation))
01082                 ereport(ERROR,
01083                         (errcode(ERRCODE_INDETERMINATE_COLLATION),
01084                          errmsg("could not determine which collation to use for index expression"),
01085                          errhint("Use the COLLATE clause to set the collation explicitly.")));
01086         }
01087         else
01088         {
01089             if (OidIsValid(attcollation))
01090                 ereport(ERROR,
01091                         (errcode(ERRCODE_DATATYPE_MISMATCH),
01092                          errmsg("collations are not supported by type %s",
01093                                 format_type_be(atttype))));
01094         }
01095 
01096         collationOidP[attn] = attcollation;
01097 
01098         /*
01099          * Identify the opclass to use.
01100          */
01101         classOidP[attn] = GetIndexOpClass(attribute->opclass,
01102                                           atttype,
01103                                           accessMethodName,
01104                                           accessMethodId);
01105 
01106         /*
01107          * Identify the exclusion operator, if any.
01108          */
01109         if (nextExclOp)
01110         {
01111             List       *opname = (List *) lfirst(nextExclOp);
01112             Oid         opid;
01113             Oid         opfamily;
01114             int         strat;
01115 
01116             /*
01117              * Find the operator --- it must accept the column datatype
01118              * without runtime coercion (but binary compatibility is OK)
01119              */
01120             opid = compatible_oper_opid(opname, atttype, atttype, false);
01121 
01122             /*
01123              * Only allow commutative operators to be used in exclusion
01124              * constraints. If X conflicts with Y, but Y does not conflict
01125              * with X, bad things will happen.
01126              */
01127             if (get_commutator(opid) != opid)
01128                 ereport(ERROR,
01129                         (errcode(ERRCODE_WRONG_OBJECT_TYPE),
01130                          errmsg("operator %s is not commutative",
01131                                 format_operator(opid)),
01132                          errdetail("Only commutative operators can be used in exclusion constraints.")));
01133 
01134             /*
01135              * Operator must be a member of the right opfamily, too
01136              */
01137             opfamily = get_opclass_family(classOidP[attn]);
01138             strat = get_op_opfamily_strategy(opid, opfamily);
01139             if (strat == 0)
01140             {
01141                 HeapTuple   opftuple;
01142                 Form_pg_opfamily opfform;
01143 
01144                 /*
01145                  * attribute->opclass might not explicitly name the opfamily,
01146                  * so fetch the name of the selected opfamily for use in the
01147                  * error message.
01148                  */
01149                 opftuple = SearchSysCache1(OPFAMILYOID,
01150                                            ObjectIdGetDatum(opfamily));
01151                 if (!HeapTupleIsValid(opftuple))
01152                     elog(ERROR, "cache lookup failed for opfamily %u",
01153                          opfamily);
01154                 opfform = (Form_pg_opfamily) GETSTRUCT(opftuple);
01155 
01156                 ereport(ERROR,
01157                         (errcode(ERRCODE_WRONG_OBJECT_TYPE),
01158                          errmsg("operator %s is not a member of operator family \"%s\"",
01159                                 format_operator(opid),
01160                                 NameStr(opfform->opfname)),
01161                          errdetail("The exclusion operator must be related to the index operator class for the constraint.")));
01162             }
01163 
01164             indexInfo->ii_ExclusionOps[attn] = opid;
01165             indexInfo->ii_ExclusionProcs[attn] = get_opcode(opid);
01166             indexInfo->ii_ExclusionStrats[attn] = strat;
01167             nextExclOp = lnext(nextExclOp);
01168         }
01169 
01170         /*
01171          * Set up the per-column options (indoption field).  For now, this is
01172          * zero for any un-ordered index, while ordered indexes have DESC and
01173          * NULLS FIRST/LAST options.
01174          */
01175         colOptionP[attn] = 0;
01176         if (amcanorder)
01177         {
01178             /* default ordering is ASC */
01179             if (attribute->ordering == SORTBY_DESC)
01180                 colOptionP[attn] |= INDOPTION_DESC;
01181             /* default null ordering is LAST for ASC, FIRST for DESC */
01182             if (attribute->nulls_ordering == SORTBY_NULLS_DEFAULT)
01183             {
01184                 if (attribute->ordering == SORTBY_DESC)
01185                     colOptionP[attn] |= INDOPTION_NULLS_FIRST;
01186             }
01187             else if (attribute->nulls_ordering == SORTBY_NULLS_FIRST)
01188                 colOptionP[attn] |= INDOPTION_NULLS_FIRST;
01189         }
01190         else
01191         {
01192             /* index AM does not support ordering */
01193             if (attribute->ordering != SORTBY_DEFAULT)
01194                 ereport(ERROR,
01195                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
01196                          errmsg("access method \"%s\" does not support ASC/DESC options",
01197                                 accessMethodName)));
01198             if (attribute->nulls_ordering != SORTBY_NULLS_DEFAULT)
01199                 ereport(ERROR,
01200                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
01201                          errmsg("access method \"%s\" does not support NULLS FIRST/LAST options",
01202                                 accessMethodName)));
01203         }
01204 
01205         attn++;
01206     }
01207 }
01208 
01209 /*
01210  * Resolve possibly-defaulted operator class specification
01211  */
01212 static Oid
01213 GetIndexOpClass(List *opclass, Oid attrType,
01214                 char *accessMethodName, Oid accessMethodId)
01215 {
01216     char       *schemaname;
01217     char       *opcname;
01218     HeapTuple   tuple;
01219     Oid         opClassId,
01220                 opInputType;
01221 
01222     /*
01223      * Release 7.0 removed network_ops, timespan_ops, and datetime_ops, so we
01224      * ignore those opclass names so the default *_ops is used.  This can be
01225      * removed in some later release.  bjm 2000/02/07
01226      *
01227      * Release 7.1 removes lztext_ops, so suppress that too for a while.  tgl
01228      * 2000/07/30
01229      *
01230      * Release 7.2 renames timestamp_ops to timestamptz_ops, so suppress that
01231      * too for awhile.  I'm starting to think we need a better approach. tgl
01232      * 2000/10/01
01233      *
01234      * Release 8.0 removes bigbox_ops (which was dead code for a long while
01235      * anyway).  tgl 2003/11/11
01236      */
01237     if (list_length(opclass) == 1)
01238     {
01239         char       *claname = strVal(linitial(opclass));
01240 
01241         if (strcmp(claname, "network_ops") == 0 ||
01242             strcmp(claname, "timespan_ops") == 0 ||
01243             strcmp(claname, "datetime_ops") == 0 ||
01244             strcmp(claname, "lztext_ops") == 0 ||
01245             strcmp(claname, "timestamp_ops") == 0 ||
01246             strcmp(claname, "bigbox_ops") == 0)
01247             opclass = NIL;
01248     }
01249 
01250     if (opclass == NIL)
01251     {
01252         /* no operator class specified, so find the default */
01253         opClassId = GetDefaultOpClass(attrType, accessMethodId);
01254         if (!OidIsValid(opClassId))
01255             ereport(ERROR,
01256                     (errcode(ERRCODE_UNDEFINED_OBJECT),
01257                      errmsg("data type %s has no default operator class for access method \"%s\"",
01258                             format_type_be(attrType), accessMethodName),
01259                      errhint("You must specify an operator class for the index or define a default operator class for the data type.")));
01260         return opClassId;
01261     }
01262 
01263     /*
01264      * Specific opclass name given, so look up the opclass.
01265      */
01266 
01267     /* deconstruct the name list */
01268     DeconstructQualifiedName(opclass, &schemaname, &opcname);
01269 
01270     if (schemaname)
01271     {
01272         /* Look in specific schema only */
01273         Oid         namespaceId;
01274 
01275         namespaceId = LookupExplicitNamespace(schemaname, false);
01276         tuple = SearchSysCache3(CLAAMNAMENSP,
01277                                 ObjectIdGetDatum(accessMethodId),
01278                                 PointerGetDatum(opcname),
01279                                 ObjectIdGetDatum(namespaceId));
01280     }
01281     else
01282     {
01283         /* Unqualified opclass name, so search the search path */
01284         opClassId = OpclassnameGetOpcid(accessMethodId, opcname);
01285         if (!OidIsValid(opClassId))
01286             ereport(ERROR,
01287                     (errcode(ERRCODE_UNDEFINED_OBJECT),
01288                      errmsg("operator class \"%s\" does not exist for access method \"%s\"",
01289                             opcname, accessMethodName)));
01290         tuple = SearchSysCache1(CLAOID, ObjectIdGetDatum(opClassId));
01291     }
01292 
01293     if (!HeapTupleIsValid(tuple))
01294         ereport(ERROR,
01295                 (errcode(ERRCODE_UNDEFINED_OBJECT),
01296                  errmsg("operator class \"%s\" does not exist for access method \"%s\"",
01297                         NameListToString(opclass), accessMethodName)));
01298 
01299     /*
01300      * Verify that the index operator class accepts this datatype.  Note we
01301      * will accept binary compatibility.
01302      */
01303     opClassId = HeapTupleGetOid(tuple);
01304     opInputType = ((Form_pg_opclass) GETSTRUCT(tuple))->opcintype;
01305 
01306     if (!IsBinaryCoercible(attrType, opInputType))
01307         ereport(ERROR,
01308                 (errcode(ERRCODE_DATATYPE_MISMATCH),
01309                  errmsg("operator class \"%s\" does not accept data type %s",
01310                       NameListToString(opclass), format_type_be(attrType))));
01311 
01312     ReleaseSysCache(tuple);
01313 
01314     return opClassId;
01315 }
01316 
01317 /*
01318  * GetDefaultOpClass
01319  *
01320  * Given the OIDs of a datatype and an access method, find the default
01321  * operator class, if any.  Returns InvalidOid if there is none.
01322  */
01323 Oid
01324 GetDefaultOpClass(Oid type_id, Oid am_id)
01325 {
01326     Oid         result = InvalidOid;
01327     int         nexact = 0;
01328     int         ncompatible = 0;
01329     int         ncompatiblepreferred = 0;
01330     Relation    rel;
01331     ScanKeyData skey[1];
01332     SysScanDesc scan;
01333     HeapTuple   tup;
01334     TYPCATEGORY tcategory;
01335 
01336     /* If it's a domain, look at the base type instead */
01337     type_id = getBaseType(type_id);
01338 
01339     tcategory = TypeCategory(type_id);
01340 
01341     /*
01342      * We scan through all the opclasses available for the access method,
01343      * looking for one that is marked default and matches the target type
01344      * (either exactly or binary-compatibly, but prefer an exact match).
01345      *
01346      * We could find more than one binary-compatible match.  If just one is
01347      * for a preferred type, use that one; otherwise we fail, forcing the user
01348      * to specify which one he wants.  (The preferred-type special case is a
01349      * kluge for varchar: it's binary-compatible to both text and bpchar, so
01350      * we need a tiebreaker.)  If we find more than one exact match, then
01351      * someone put bogus entries in pg_opclass.
01352      */
01353     rel = heap_open(OperatorClassRelationId, AccessShareLock);
01354 
01355     ScanKeyInit(&skey[0],
01356                 Anum_pg_opclass_opcmethod,
01357                 BTEqualStrategyNumber, F_OIDEQ,
01358                 ObjectIdGetDatum(am_id));
01359 
01360     scan = systable_beginscan(rel, OpclassAmNameNspIndexId, true,
01361                               SnapshotNow, 1, skey);
01362 
01363     while (HeapTupleIsValid(tup = systable_getnext(scan)))
01364     {
01365         Form_pg_opclass opclass = (Form_pg_opclass) GETSTRUCT(tup);
01366 
01367         /* ignore altogether if not a default opclass */
01368         if (!opclass->opcdefault)
01369             continue;
01370         if (opclass->opcintype == type_id)
01371         {
01372             nexact++;
01373             result = HeapTupleGetOid(tup);
01374         }
01375         else if (nexact == 0 &&
01376                  IsBinaryCoercible(type_id, opclass->opcintype))
01377         {
01378             if (IsPreferredType(tcategory, opclass->opcintype))
01379             {
01380                 ncompatiblepreferred++;
01381                 result = HeapTupleGetOid(tup);
01382             }
01383             else if (ncompatiblepreferred == 0)
01384             {
01385                 ncompatible++;
01386                 result = HeapTupleGetOid(tup);
01387             }
01388         }
01389     }
01390 
01391     systable_endscan(scan);
01392 
01393     heap_close(rel, AccessShareLock);
01394 
01395     /* raise error if pg_opclass contains inconsistent data */
01396     if (nexact > 1)
01397         ereport(ERROR,
01398                 (errcode(ERRCODE_DUPLICATE_OBJECT),
01399         errmsg("there are multiple default operator classes for data type %s",
01400                format_type_be(type_id))));
01401 
01402     if (nexact == 1 ||
01403         ncompatiblepreferred == 1 ||
01404         (ncompatiblepreferred == 0 && ncompatible == 1))
01405         return result;
01406 
01407     return InvalidOid;
01408 }
01409 
01410 /*
01411  *  makeObjectName()
01412  *
01413  *  Create a name for an implicitly created index, sequence, constraint, etc.
01414  *
01415  *  The parameters are typically: the original table name, the original field
01416  *  name, and a "type" string (such as "seq" or "pkey").    The field name
01417  *  and/or type can be NULL if not relevant.
01418  *
01419  *  The result is a palloc'd string.
01420  *
01421  *  The basic result we want is "name1_name2_label", omitting "_name2" or
01422  *  "_label" when those parameters are NULL.  However, we must generate
01423  *  a name with less than NAMEDATALEN characters!  So, we truncate one or
01424  *  both names if necessary to make a short-enough string.  The label part
01425  *  is never truncated (so it had better be reasonably short).
01426  *
01427  *  The caller is responsible for checking uniqueness of the generated
01428  *  name and retrying as needed; retrying will be done by altering the
01429  *  "label" string (which is why we never truncate that part).
01430  */
01431 char *
01432 makeObjectName(const char *name1, const char *name2, const char *label)
01433 {
01434     char       *name;
01435     int         overhead = 0;   /* chars needed for label and underscores */
01436     int         availchars;     /* chars available for name(s) */
01437     int         name1chars;     /* chars allocated to name1 */
01438     int         name2chars;     /* chars allocated to name2 */
01439     int         ndx;
01440 
01441     name1chars = strlen(name1);
01442     if (name2)
01443     {
01444         name2chars = strlen(name2);
01445         overhead++;             /* allow for separating underscore */
01446     }
01447     else
01448         name2chars = 0;
01449     if (label)
01450         overhead += strlen(label) + 1;
01451 
01452     availchars = NAMEDATALEN - 1 - overhead;
01453     Assert(availchars > 0);     /* else caller chose a bad label */
01454 
01455     /*
01456      * If we must truncate,  preferentially truncate the longer name. This
01457      * logic could be expressed without a loop, but it's simple and obvious as
01458      * a loop.
01459      */
01460     while (name1chars + name2chars > availchars)
01461     {
01462         if (name1chars > name2chars)
01463             name1chars--;
01464         else
01465             name2chars--;
01466     }
01467 
01468     name1chars = pg_mbcliplen(name1, name1chars, name1chars);
01469     if (name2)
01470         name2chars = pg_mbcliplen(name2, name2chars, name2chars);
01471 
01472     /* Now construct the string using the chosen lengths */
01473     name = palloc(name1chars + name2chars + overhead + 1);
01474     memcpy(name, name1, name1chars);
01475     ndx = name1chars;
01476     if (name2)
01477     {
01478         name[ndx++] = '_';
01479         memcpy(name + ndx, name2, name2chars);
01480         ndx += name2chars;
01481     }
01482     if (label)
01483     {
01484         name[ndx++] = '_';
01485         strcpy(name + ndx, label);
01486     }
01487     else
01488         name[ndx] = '\0';
01489 
01490     return name;
01491 }
01492 
01493 /*
01494  * Select a nonconflicting name for a new relation.  This is ordinarily
01495  * used to choose index names (which is why it's here) but it can also
01496  * be used for sequences, or any autogenerated relation kind.
01497  *
01498  * name1, name2, and label are used the same way as for makeObjectName(),
01499  * except that the label can't be NULL; digits will be appended to the label
01500  * if needed to create a name that is unique within the specified namespace.
01501  *
01502  * Note: it is theoretically possible to get a collision anyway, if someone
01503  * else chooses the same name concurrently.  This is fairly unlikely to be
01504  * a problem in practice, especially if one is holding an exclusive lock on
01505  * the relation identified by name1.  However, if choosing multiple names
01506  * within a single command, you'd better create the new object and do
01507  * CommandCounterIncrement before choosing the next one!
01508  *
01509  * Returns a palloc'd string.
01510  */
01511 char *
01512 ChooseRelationName(const char *name1, const char *name2,
01513                    const char *label, Oid namespaceid)
01514 {
01515     int         pass = 0;
01516     char       *relname = NULL;
01517     char        modlabel[NAMEDATALEN];
01518 
01519     /* try the unmodified label first */
01520     StrNCpy(modlabel, label, sizeof(modlabel));
01521 
01522     for (;;)
01523     {
01524         relname = makeObjectName(name1, name2, modlabel);
01525 
01526         if (!OidIsValid(get_relname_relid(relname, namespaceid)))
01527             break;
01528 
01529         /* found a conflict, so try a new name component */
01530         pfree(relname);
01531         snprintf(modlabel, sizeof(modlabel), "%s%d", label, ++pass);
01532     }
01533 
01534     return relname;
01535 }
01536 
01537 /*
01538  * Select the name to be used for an index.
01539  *
01540  * The argument list is pretty ad-hoc :-(
01541  */
01542 static char *
01543 ChooseIndexName(const char *tabname, Oid namespaceId,
01544                 List *colnames, List *exclusionOpNames,
01545                 bool primary, bool isconstraint)
01546 {
01547     char       *indexname;
01548 
01549     if (primary)
01550     {
01551         /* the primary key's name does not depend on the specific column(s) */
01552         indexname = ChooseRelationName(tabname,
01553                                        NULL,
01554                                        "pkey",
01555                                        namespaceId);
01556     }
01557     else if (exclusionOpNames != NIL)
01558     {
01559         indexname = ChooseRelationName(tabname,
01560                                        ChooseIndexNameAddition(colnames),
01561                                        "excl",
01562                                        namespaceId);
01563     }
01564     else if (isconstraint)
01565     {
01566         indexname = ChooseRelationName(tabname,
01567                                        ChooseIndexNameAddition(colnames),
01568                                        "key",
01569                                        namespaceId);
01570     }
01571     else
01572     {
01573         indexname = ChooseRelationName(tabname,
01574                                        ChooseIndexNameAddition(colnames),
01575                                        "idx",
01576                                        namespaceId);
01577     }
01578 
01579     return indexname;
01580 }
01581 
01582 /*
01583  * Generate "name2" for a new index given the list of column names for it
01584  * (as produced by ChooseIndexColumnNames).  This will be passed to
01585  * ChooseRelationName along with the parent table name and a suitable label.
01586  *
01587  * We know that less than NAMEDATALEN characters will actually be used,
01588  * so we can truncate the result once we've generated that many.
01589  */
01590 static char *
01591 ChooseIndexNameAddition(List *colnames)
01592 {
01593     char        buf[NAMEDATALEN * 2];
01594     int         buflen = 0;
01595     ListCell   *lc;
01596 
01597     buf[0] = '\0';
01598     foreach(lc, colnames)
01599     {
01600         const char *name = (const char *) lfirst(lc);
01601 
01602         if (buflen > 0)
01603             buf[buflen++] = '_';    /* insert _ between names */
01604 
01605         /*
01606          * At this point we have buflen <= NAMEDATALEN.  name should be less
01607          * than NAMEDATALEN already, but use strlcpy for paranoia.
01608          */
01609         strlcpy(buf + buflen, name, NAMEDATALEN);
01610         buflen += strlen(buf + buflen);
01611         if (buflen >= NAMEDATALEN)
01612             break;
01613     }
01614     return pstrdup(buf);
01615 }
01616 
01617 /*
01618  * Select the actual names to be used for the columns of an index, given the
01619  * list of IndexElems for the columns.  This is mostly about ensuring the
01620  * names are unique so we don't get a conflicting-attribute-names error.
01621  *
01622  * Returns a List of plain strings (char *, not String nodes).
01623  */
01624 static List *
01625 ChooseIndexColumnNames(List *indexElems)
01626 {
01627     List       *result = NIL;
01628     ListCell   *lc;
01629 
01630     foreach(lc, indexElems)
01631     {
01632         IndexElem  *ielem = (IndexElem *) lfirst(lc);
01633         const char *origname;
01634         const char *curname;
01635         int         i;
01636         char        buf[NAMEDATALEN];
01637 
01638         /* Get the preliminary name from the IndexElem */
01639         if (ielem->indexcolname)
01640             origname = ielem->indexcolname;     /* caller-specified name */
01641         else if (ielem->name)
01642             origname = ielem->name;     /* simple column reference */
01643         else
01644             origname = "expr";  /* default name for expression */
01645 
01646         /* If it conflicts with any previous column, tweak it */
01647         curname = origname;
01648         for (i = 1;; i++)
01649         {
01650             ListCell   *lc2;
01651             char        nbuf[32];
01652             int         nlen;
01653 
01654             foreach(lc2, result)
01655             {
01656                 if (strcmp(curname, (char *) lfirst(lc2)) == 0)
01657                     break;
01658             }
01659             if (lc2 == NULL)
01660                 break;          /* found nonconflicting name */
01661 
01662             sprintf(nbuf, "%d", i);
01663 
01664             /* Ensure generated names are shorter than NAMEDATALEN */
01665             nlen = pg_mbcliplen(origname, strlen(origname),
01666                                 NAMEDATALEN - 1 - strlen(nbuf));
01667             memcpy(buf, origname, nlen);
01668             strcpy(buf + nlen, nbuf);
01669             curname = buf;
01670         }
01671 
01672         /* And attach to the result list */
01673         result = lappend(result, pstrdup(curname));
01674     }
01675     return result;
01676 }
01677 
01678 /*
01679  * ReindexIndex
01680  *      Recreate a specific index.
01681  */
01682 Oid
01683 ReindexIndex(RangeVar *indexRelation)
01684 {
01685     Oid         indOid;
01686     Oid         heapOid = InvalidOid;
01687 
01688     /* lock level used here should match index lock reindex_index() */
01689     indOid = RangeVarGetRelidExtended(indexRelation, AccessExclusiveLock,
01690                                       false, false,
01691                                       RangeVarCallbackForReindexIndex,
01692                                       (void *) &heapOid);
01693 
01694     reindex_index(indOid, false);
01695 
01696     return indOid;
01697 }
01698 
01699 /*
01700  * Check permissions on table before acquiring relation lock; also lock
01701  * the heap before the RangeVarGetRelidExtended takes the index lock, to avoid
01702  * deadlocks.
01703  */
01704 static void
01705 RangeVarCallbackForReindexIndex(const RangeVar *relation,
01706                                 Oid relId, Oid oldRelId, void *arg)
01707 {
01708     char        relkind;
01709     Oid        *heapOid = (Oid *) arg;
01710 
01711     /*
01712      * If we previously locked some other index's heap, and the name we're
01713      * looking up no longer refers to that relation, release the now-useless
01714      * lock.
01715      */
01716     if (relId != oldRelId && OidIsValid(oldRelId))
01717     {
01718         /* lock level here should match reindex_index() heap lock */
01719         UnlockRelationOid(*heapOid, ShareLock);
01720         *heapOid = InvalidOid;
01721     }
01722 
01723     /* If the relation does not exist, there's nothing more to do. */
01724     if (!OidIsValid(relId))
01725         return;
01726 
01727     /*
01728      * If the relation does exist, check whether it's an index.  But note that
01729      * the relation might have been dropped between the time we did the name
01730      * lookup and now.  In that case, there's nothing to do.
01731      */
01732     relkind = get_rel_relkind(relId);
01733     if (!relkind)
01734         return;
01735     if (relkind != RELKIND_INDEX)
01736         ereport(ERROR,
01737                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
01738                  errmsg("\"%s\" is not an index", relation->relname)));
01739 
01740     /* Check permissions */
01741     if (!pg_class_ownercheck(relId, GetUserId()))
01742         aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, relation->relname);
01743 
01744     /* Lock heap before index to avoid deadlock. */
01745     if (relId != oldRelId)
01746     {
01747         /*
01748          * Lock level here should match reindex_index() heap lock. If the OID
01749          * isn't valid, it means the index as concurrently dropped, which is
01750          * not a problem for us; just return normally.
01751          */
01752         *heapOid = IndexGetRelation(relId, true);
01753         if (OidIsValid(*heapOid))
01754             LockRelationOid(*heapOid, ShareLock);
01755     }
01756 }
01757 
01758 /*
01759  * ReindexTable
01760  *      Recreate all indexes of a table (and of its toast table, if any)
01761  */
01762 Oid
01763 ReindexTable(RangeVar *relation)
01764 {
01765     Oid         heapOid;
01766 
01767     /* The lock level used here should match reindex_relation(). */
01768     heapOid = RangeVarGetRelidExtended(relation, ShareLock, false, false,
01769                                        RangeVarCallbackOwnsTable, NULL);
01770 
01771     if (!reindex_relation(heapOid, REINDEX_REL_PROCESS_TOAST))
01772         ereport(NOTICE,
01773                 (errmsg("table \"%s\" has no indexes",
01774                         relation->relname)));
01775 
01776     return heapOid;
01777 }
01778 
01779 /*
01780  * ReindexDatabase
01781  *      Recreate indexes of a database.
01782  *
01783  * To reduce the probability of deadlocks, each table is reindexed in a
01784  * separate transaction, so we can release the lock on it right away.
01785  * That means this must not be called within a user transaction block!
01786  */
01787 Oid
01788 ReindexDatabase(const char *databaseName, bool do_system, bool do_user)
01789 {
01790     Relation    relationRelation;
01791     HeapScanDesc scan;
01792     HeapTuple   tuple;
01793     MemoryContext private_context;
01794     MemoryContext old;
01795     List       *relids = NIL;
01796     ListCell   *l;
01797 
01798     AssertArg(databaseName);
01799 
01800     if (strcmp(databaseName, get_database_name(MyDatabaseId)) != 0)
01801         ereport(ERROR,
01802                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
01803                  errmsg("can only reindex the currently open database")));
01804 
01805     if (!pg_database_ownercheck(MyDatabaseId, GetUserId()))
01806         aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
01807                        databaseName);
01808 
01809     /*
01810      * Create a memory context that will survive forced transaction commits we
01811      * do below.  Since it is a child of PortalContext, it will go away
01812      * eventually even if we suffer an error; there's no need for special
01813      * abort cleanup logic.
01814      */
01815     private_context = AllocSetContextCreate(PortalContext,
01816                                             "ReindexDatabase",
01817                                             ALLOCSET_DEFAULT_MINSIZE,
01818                                             ALLOCSET_DEFAULT_INITSIZE,
01819                                             ALLOCSET_DEFAULT_MAXSIZE);
01820 
01821     /*
01822      * We always want to reindex pg_class first.  This ensures that if there
01823      * is any corruption in pg_class' indexes, they will be fixed before we
01824      * process any other tables.  This is critical because reindexing itself
01825      * will try to update pg_class.
01826      */
01827     if (do_system)
01828     {
01829         old = MemoryContextSwitchTo(private_context);
01830         relids = lappend_oid(relids, RelationRelationId);
01831         MemoryContextSwitchTo(old);
01832     }
01833 
01834     /*
01835      * Scan pg_class to build a list of the relations we need to reindex.
01836      *
01837      * We only consider plain relations here (toast rels will be processed
01838      * indirectly by reindex_relation).
01839      */
01840     relationRelation = heap_open(RelationRelationId, AccessShareLock);
01841     scan = heap_beginscan(relationRelation, SnapshotNow, 0, NULL);
01842     while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
01843     {
01844         Form_pg_class classtuple = (Form_pg_class) GETSTRUCT(tuple);
01845 
01846         if (classtuple->relkind != RELKIND_RELATION &&
01847             classtuple->relkind != RELKIND_MATVIEW)
01848             continue;
01849 
01850         /* Skip temp tables of other backends; we can't reindex them at all */
01851         if (classtuple->relpersistence == RELPERSISTENCE_TEMP &&
01852             !isTempNamespace(classtuple->relnamespace))
01853             continue;
01854 
01855         /* Check user/system classification, and optionally skip */
01856         if (IsSystemClass(classtuple))
01857         {
01858             if (!do_system)
01859                 continue;
01860         }
01861         else
01862         {
01863             if (!do_user)
01864                 continue;
01865         }
01866 
01867         if (HeapTupleGetOid(tuple) == RelationRelationId)
01868             continue;           /* got it already */
01869 
01870         old = MemoryContextSwitchTo(private_context);
01871         relids = lappend_oid(relids, HeapTupleGetOid(tuple));
01872         MemoryContextSwitchTo(old);
01873     }
01874     heap_endscan(scan);
01875     heap_close(relationRelation, AccessShareLock);
01876 
01877     /* Now reindex each rel in a separate transaction */
01878     PopActiveSnapshot();
01879     CommitTransactionCommand();
01880     foreach(l, relids)
01881     {
01882         Oid         relid = lfirst_oid(l);
01883 
01884         StartTransactionCommand();
01885         /* functions in indexes may want a snapshot set */
01886         PushActiveSnapshot(GetTransactionSnapshot());
01887         if (reindex_relation(relid, REINDEX_REL_PROCESS_TOAST))
01888             ereport(NOTICE,
01889                     (errmsg("table \"%s.%s\" was reindexed",
01890                             get_namespace_name(get_rel_namespace(relid)),
01891                             get_rel_name(relid))));
01892         PopActiveSnapshot();
01893         CommitTransactionCommand();
01894     }
01895     StartTransactionCommand();
01896 
01897     MemoryContextDelete(private_context);
01898 
01899     return MyDatabaseId;
01900 }