Header And Logo

PostgreSQL
| The world's most advanced open source database.

relcache.c

Go to the documentation of this file.
00001 /*-------------------------------------------------------------------------
00002  *
00003  * relcache.c
00004  *    POSTGRES relation descriptor cache code
00005  *
00006  * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
00007  * Portions Copyright (c) 1994, Regents of the University of California
00008  *
00009  *
00010  * IDENTIFICATION
00011  *    src/backend/utils/cache/relcache.c
00012  *
00013  *-------------------------------------------------------------------------
00014  */
00015 /*
00016  * INTERFACE ROUTINES
00017  *      RelationCacheInitialize         - initialize relcache (to empty)
00018  *      RelationCacheInitializePhase2   - initialize shared-catalog entries
00019  *      RelationCacheInitializePhase3   - finish initializing relcache
00020  *      RelationIdGetRelation           - get a reldesc by relation id
00021  *      RelationClose                   - close an open relation
00022  *
00023  * NOTES
00024  *      The following code contains many undocumented hacks.  Please be
00025  *      careful....
00026  */
00027 #include "postgres.h"
00028 
00029 #include <sys/file.h>
00030 #include <fcntl.h>
00031 #include <unistd.h>
00032 
00033 #include "access/htup_details.h"
00034 #include "access/multixact.h"
00035 #include "access/reloptions.h"
00036 #include "access/sysattr.h"
00037 #include "access/transam.h"
00038 #include "access/xact.h"
00039 #include "catalog/catalog.h"
00040 #include "catalog/heap.h"
00041 #include "catalog/index.h"
00042 #include "catalog/indexing.h"
00043 #include "catalog/namespace.h"
00044 #include "catalog/pg_amproc.h"
00045 #include "catalog/pg_attrdef.h"
00046 #include "catalog/pg_authid.h"
00047 #include "catalog/pg_auth_members.h"
00048 #include "catalog/pg_constraint.h"
00049 #include "catalog/pg_database.h"
00050 #include "catalog/pg_namespace.h"
00051 #include "catalog/pg_opclass.h"
00052 #include "catalog/pg_proc.h"
00053 #include "catalog/pg_rewrite.h"
00054 #include "catalog/pg_tablespace.h"
00055 #include "catalog/pg_trigger.h"
00056 #include "catalog/pg_type.h"
00057 #include "catalog/schemapg.h"
00058 #include "catalog/storage.h"
00059 #include "commands/trigger.h"
00060 #include "common/relpath.h"
00061 #include "miscadmin.h"
00062 #include "optimizer/clauses.h"
00063 #include "optimizer/planmain.h"
00064 #include "optimizer/prep.h"
00065 #include "optimizer/var.h"
00066 #include "rewrite/rewriteDefine.h"
00067 #include "storage/lmgr.h"
00068 #include "storage/smgr.h"
00069 #include "utils/array.h"
00070 #include "utils/builtins.h"
00071 #include "utils/fmgroids.h"
00072 #include "utils/inval.h"
00073 #include "utils/lsyscache.h"
00074 #include "utils/memutils.h"
00075 #include "utils/relmapper.h"
00076 #include "utils/resowner_private.h"
00077 #include "utils/syscache.h"
00078 #include "utils/tqual.h"
00079 
00080 
00081 /*
00082  *      Name of the relcache init file(s), which exist to speed up backend startup.
00083  */
00084 #define RELCACHE_INIT_FILENAME  "pg_internal.init"
00085
00086 #define RELCACHE_INIT_FILEMAGIC     0x573266    /* version ID value; NOTE(review): presumably compared when an init file is loaded -- confirm */
00087 
00088 /*
00089  *      Hardcoded tuple descriptors: Natts_* FormData_pg_attribute entries each; contents generated by genbki.pl
00090  */
00091 static const FormData_pg_attribute Desc_pg_class[Natts_pg_class] = {Schema_pg_class};
00092 static const FormData_pg_attribute Desc_pg_attribute[Natts_pg_attribute] = {Schema_pg_attribute};
00093 static const FormData_pg_attribute Desc_pg_proc[Natts_pg_proc] = {Schema_pg_proc};
00094 static const FormData_pg_attribute Desc_pg_type[Natts_pg_type] = {Schema_pg_type};
00095 static const FormData_pg_attribute Desc_pg_database[Natts_pg_database] = {Schema_pg_database};
00096 static const FormData_pg_attribute Desc_pg_authid[Natts_pg_authid] = {Schema_pg_authid};
00097 static const FormData_pg_attribute Desc_pg_auth_members[Natts_pg_auth_members] = {Schema_pg_auth_members};
00098 static const FormData_pg_attribute Desc_pg_index[Natts_pg_index] = {Schema_pg_index};
00099 
00100 /*
00101  *      Hash table that indexes the relation cache
00102  *
00103  *      We used to index the cache by both name and OID, but now there
00104  *      is only an index by OID.
00105  */
00106 typedef struct relidcacheent
00107 {
00108     Oid         reloid;         /* relation OID; the lookup macros search by rd_id */
00109     Relation    reldesc;        /* the cached relation descriptor */
00110 } RelIdCacheEnt;
00111
00112 static HTAB *RelationIdCache;   /* accessed through the RelationCache* macros */
00113 
00114 /*
00115  * This flag is false until we have prepared the critical relcache entries
00116  * that are needed to do indexscans on the tables read by relcache building.
00117  */
00118 bool        criticalRelcachesBuilt = false;     /* not static: visible outside this file */
00119
00120 /*
00121  * This flag is false until we have prepared the critical relcache entries
00122  * for shared catalogs (which are the tables needed for login).
00123  */
00124 bool        criticalSharedRelcachesBuilt = false;   /* not static: visible outside this file */
00125
00126 /*
00127  * This counter counts relcache inval events received since backend startup
00128  * (but only for rels that are actually in cache).  Presently, we use it only
00129  * to detect whether data about to be written by write_relcache_init_file()
00130  * might already be obsolete.
00131  */
00132 static long relcacheInvalsReceived = 0L;
00133
00134 /*
00135  * This list remembers the OIDs of the non-shared relations cached in the
00136  * database's local relcache init file.  Note that there is no corresponding
00137  * list for the shared relcache init file, for reasons explained in the
00138  * comments for RelationCacheInitFileRemove.
00139  */
00140 static List *initFileRelationIds = NIL;     /* starts out empty */
00141 
00142 /*
00143  * eoxact_list[] stores the OIDs of relations that (might) need AtEOXact
00144  * cleanup work.  This list intentionally has limited size; if it overflows,
00145  * we fall back to scanning the whole hashtable.  There is no value in a very
00146  * large list because (1) at some point, a hash_seq_search scan is faster than
00147  * retail lookups, and (2) the value of this is to reduce EOXact work for
00148  * short transactions, which can't have dirtied all that many tables anyway.
00149  * EOXactListAdd() appends rel->rd_id, or sets eoxact_list_overflowed if the list
00150  * is full.  It does not prevent duplicate entries, so cleanup must be idempotent.
00151  */
00152 #define MAX_EOXACT_LIST 32
00153 static Oid  eoxact_list[MAX_EOXACT_LIST];
00154 static int  eoxact_list_len = 0;            /* number of valid entries in eoxact_list[] */
00155 static bool eoxact_list_overflowed = false; /* set once an add found the list full */
00156
00157 #define EOXactListAdd(rel) \
00158     do { \
00159         if (eoxact_list_len < MAX_EOXACT_LIST) \
00160             eoxact_list[eoxact_list_len++] = (rel)->rd_id; \
00161         else \
00162             eoxact_list_overflowed = true; \
00163     } while (0)
00164 
00165 
/*
 *      Macros to manipulate the lookup hashtable (RelationIdCache).
 *
 *      Each macro expands to one do { ... } while(0) statement, so it can be
 *      used wherever a single statement is expected.  Every macro argument
 *      is parenthesized in the expansion, so any pointer-valued expression
 *      (not just a plain variable name) may be passed.
 *
 *      The expansions declare block-local temporaries named idhentry, hentry
 *      and found; do not pass argument expressions that mention identifiers
 *      with those names.
 */

/*
 * RelationCacheInsert
 *      Enter RELATION into the hashtable, keyed by its rd_id.  If an entry
 *      for that OID already exists, its reldesc pointer is silently
 *      overwritten.  RELATION is evaluated twice.
 */
#define RelationCacheInsert(RELATION)   \
do { \
    RelIdCacheEnt *idhentry; bool found; \
    idhentry = (RelIdCacheEnt *) hash_search(RelationIdCache, \
                                             (void *) &((RELATION)->rd_id), \
                                             HASH_ENTER, &found); \
    /* used to give notice if found -- now just keep quiet */ \
    idhentry->reldesc = (RELATION); \
} while(0)

/*
 * RelationIdCacheLookup
 *      Look up the relation OID held in the lvalue ID (its address is
 *      taken) and assign the cached descriptor, or NULL if there is no
 *      entry, to the lvalue RELATION.
 */
#define RelationIdCacheLookup(ID, RELATION) \
do { \
    RelIdCacheEnt *hentry; \
    hentry = (RelIdCacheEnt *) hash_search(RelationIdCache, \
                                           (void *) &(ID), \
                                           HASH_FIND, NULL); \
    if (hentry) \
        (RELATION) = hentry->reldesc; \
    else \
        (RELATION) = NULL; \
} while(0)

/*
 * RelationCacheDelete
 *      Remove the hashtable entry keyed by RELATION's rd_id.  A missing
 *      entry is reported with a WARNING rather than treated as an error.
 */
#define RelationCacheDelete(RELATION) \
do { \
    RelIdCacheEnt *idhentry; \
    idhentry = (RelIdCacheEnt *) hash_search(RelationIdCache, \
                                             (void *) &((RELATION)->rd_id), \
                                             HASH_REMOVE, NULL); \
    if (idhentry == NULL) \
        elog(WARNING, "trying to delete a rd_id reldesc that does not exist"); \
} while(0)
00200 
00201 
00202 /*
00203  * Special cache for opclass-related information
00204  *
00205  * Note: only default support procs get cached, ie, those with
00206  * lefttype = righttype = opcintype.
00207  */
00208 typedef struct opclasscacheent
00209 {
00210     Oid         opclassoid;     /* lookup key: OID of opclass */
00211     bool        valid;          /* set TRUE after successful fill-in */
00212     StrategyNumber numSupport;  /* max # of support procs (from pg_am) */
00213     Oid         opcfamily;      /* OID of opclass's family */
00214     Oid         opcintype;      /* OID of opclass's declared input type */
00215     RegProcedure *supportProcs; /* OIDs of support procedures; NOTE(review): presumably numSupport entries -- confirm */
00216 } OpClassCacheEnt;
00217
00218 static HTAB *OpClassCache = NULL;   /* NULL until the hash table has been created */
00219 
00220 
00221 /* non-export function prototypes */
00222 
00223 static void RelationDestroyRelation(Relation relation);
00224 static void RelationClearRelation(Relation relation, bool rebuild);
00225 
00226 static void RelationReloadIndexInfo(Relation relation);
00227 static void RelationFlushRelation(Relation relation);
00228 static void AtEOXact_cleanup(Relation relation, bool isCommit);
00229 static void AtEOSubXact_cleanup(Relation relation, bool isCommit,
00230                     SubTransactionId mySubid, SubTransactionId parentSubid);
00231 static bool load_relcache_init_file(bool shared);
00232 static void write_relcache_init_file(bool shared);
00233 static void write_item(const void *data, Size len, FILE *fp);
00234 
00235 static void formrdesc(const char *relationName, Oid relationReltype,
00236           bool isshared, bool hasoids,
00237           int natts, const FormData_pg_attribute *attrs);
00238 
00239 static HeapTuple ScanPgRelation(Oid targetRelId, bool indexOK);
00240 static Relation AllocateRelationDesc(Form_pg_class relp);
00241 static void RelationParseRelOptions(Relation relation, HeapTuple tuple);
00242 static void RelationBuildTupleDesc(Relation relation);
00243 static Relation RelationBuildDesc(Oid targetRelId, bool insertIt);
00244 static void RelationInitPhysicalAddr(Relation relation);
00245 static void load_critical_index(Oid indexoid, Oid heapoid);
00246 static TupleDesc GetPgClassDescriptor(void);
00247 static TupleDesc GetPgIndexDescriptor(void);
00248 static void AttrDefaultFetch(Relation relation);
00249 static void CheckConstraintFetch(Relation relation);
00250 static List *insert_ordered_oid(List *list, Oid datum);
00251 static void IndexSupportInitialize(oidvector *indclass,
00252                        RegProcedure *indexSupport,
00253                        Oid *opFamily,
00254                        Oid *opcInType,
00255                        StrategyNumber maxSupportNumber,
00256                        AttrNumber maxAttributeNumber);
00257 static OpClassCacheEnt *LookupOpclassInfo(Oid operatorClassOid,
00258                   StrategyNumber numSupport);
00259 static void RelationCacheInitFileRemoveInDir(const char *tblspcpath);
00260 static void unlink_initfile(const char *initfilename);
00261 
00262 
00263 /*
00264  *      ScanPgRelation
00265  *
00266  *      This is used by RelationBuildDesc to find a pg_class
00267  *      tuple matching targetRelId.  The caller must hold at least
00268  *      AccessShareLock on the target relid to prevent concurrent-update
00269  *      scenarios --- else our SnapshotNow scan might fail to find any
00270  *      version that it thinks is live.
00271  *
00272  *      NB: the returned tuple has been copied into palloc'd storage
00273  *      and must eventually be freed with heap_freetuple.
00274  */
00275 static HeapTuple
00276 ScanPgRelation(Oid targetRelId, bool indexOK)
00277 {
00278     HeapTuple   pg_class_tuple;
00279     Relation    pg_class_desc;
00280     SysScanDesc pg_class_scan;
00281     ScanKeyData key[1];
00282 
00283     /*
00284      * If something goes wrong during backend startup, we might find ourselves
00285      * trying to read pg_class before we've selected a database.  That ain't
00286      * gonna work, so bail out with a useful error message.  If this happens,
00287      * it probably means a relcache entry that needs to be nailed isn't.
00288      */
00289     if (!OidIsValid(MyDatabaseId))
00290         elog(FATAL, "cannot read pg_class without having selected a database");
00291 
00292     /*
00293      * form a scan key
00294      */
00295     ScanKeyInit(&key[0],
00296                 ObjectIdAttributeNumber,
00297                 BTEqualStrategyNumber, F_OIDEQ,
00298                 ObjectIdGetDatum(targetRelId));
00299 
00300     /*
00301      * Open pg_class and fetch a tuple.  Force heap scan if we haven't yet
00302      * built the critical relcache entries (this includes initdb and startup
00303      * without a pg_internal.init file).  The caller can also force a heap
00304      * scan by setting indexOK == false.
00305      */
00306     pg_class_desc = heap_open(RelationRelationId, AccessShareLock);
00307     pg_class_scan = systable_beginscan(pg_class_desc, ClassOidIndexId,
00308                                        indexOK && criticalRelcachesBuilt,
00309                                        SnapshotNow,
00310                                        1, key);
00311 
00312     pg_class_tuple = systable_getnext(pg_class_scan);
00313 
00314     /*
00315      * Must copy tuple before releasing buffer.
00316      */
00317     if (HeapTupleIsValid(pg_class_tuple))
00318         pg_class_tuple = heap_copytuple(pg_class_tuple);
00319 
00320     /* all done */
00321     systable_endscan(pg_class_scan);
00322     heap_close(pg_class_desc, AccessShareLock);
00323 
00324     return pg_class_tuple;
00325 }
00326 
00327 /*
00328  *      AllocateRelationDesc
00329  *
00330  *      This is used to allocate memory for a new relation descriptor
00331  *      and initialize the rd_rel field from the given pg_class tuple.
00332  */
00333 static Relation
00334 AllocateRelationDesc(Form_pg_class relp)
00335 {
00336     Relation    relation;
00337     MemoryContext oldcxt;
00338     Form_pg_class relationForm;
00339 
00340     /* Relcache entries must live in CacheMemoryContext */
00341     oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
00342 
00343     /*
00344      * allocate and zero space for new relation descriptor
00345      */
00346     relation = (Relation) palloc0(sizeof(RelationData));
00347 
00348     /* make sure relation is marked as having no open file yet */
00349     relation->rd_smgr = NULL;
00350 
00351     /*
00352      * Copy the relation tuple form
00353      *
00354      * We only allocate space for the fixed fields, ie, CLASS_TUPLE_SIZE. The
00355      * variable-length fields (relacl, reloptions) are NOT stored in the
00356      * relcache --- there'd be little point in it, since we don't copy the
00357      * tuple's nulls bitmap and hence wouldn't know if the values are valid.
00358      * Bottom line is that relacl *cannot* be retrieved from the relcache. Get
00359      * it from the syscache if you need it.  The same goes for the original
00360      * form of reloptions (however, we do store the parsed form of reloptions
00361      * in rd_options).
00362      */
00363     relationForm = (Form_pg_class) palloc(CLASS_TUPLE_SIZE);
00364 
00365     memcpy(relationForm, relp, CLASS_TUPLE_SIZE);
00366 
00367     /* initialize relation tuple form */
00368     relation->rd_rel = relationForm;
00369 
00370     /* and allocate attribute tuple form storage */
00371     relation->rd_att = CreateTemplateTupleDesc(relationForm->relnatts,
00372                                                relationForm->relhasoids);
00373     /* which we mark as a reference-counted tupdesc */
00374     relation->rd_att->tdrefcount = 1;
00375 
00376     MemoryContextSwitchTo(oldcxt);
00377 
00378     return relation;
00379 }
00380 
00381 /*
00382  * RelationParseRelOptions
00383  *      Convert pg_class.reloptions into pre-parsed rd_options
00384  *
00385  * tuple is the real pg_class tuple (not rd_rel!) for relation
00386  *
00387  * Note: rd_rel and (if an index) rd_am must be valid already
00388  */
00389 static void
00390 RelationParseRelOptions(Relation relation, HeapTuple tuple)
00391 {
00392     bytea      *options;
00393 
00394     relation->rd_options = NULL;
00395 
00396     /* Fall out if relkind should not have options */
00397     switch (relation->rd_rel->relkind)
00398     {
00399         case RELKIND_RELATION:
00400         case RELKIND_TOASTVALUE:
00401         case RELKIND_INDEX:
00402         case RELKIND_VIEW:
00403         case RELKIND_MATVIEW:
00404             break;
00405         default:
00406             return;
00407     }
00408 
00409     /*
00410      * Fetch reloptions from tuple; have to use a hardwired descriptor because
00411      * we might not have any other for pg_class yet (consider executing this
00412      * code for pg_class itself)
00413      */
00414     options = extractRelOptions(tuple,
00415                                 GetPgClassDescriptor(),
00416                                 relation->rd_rel->relkind == RELKIND_INDEX ?
00417                                 relation->rd_am->amoptions : InvalidOid);
00418 
00419     /*
00420      * Copy parsed data into CacheMemoryContext.  To guard against the
00421      * possibility of leaks in the reloptions code, we want to do the actual
00422      * parsing in the caller's memory context and copy the results into
00423      * CacheMemoryContext after the fact.
00424      */
00425     if (options)
00426     {
00427         relation->rd_options = MemoryContextAlloc(CacheMemoryContext,
00428                                                   VARSIZE(options));
00429         memcpy(relation->rd_options, options, VARSIZE(options));
00430         pfree(options);
00431     }
00432 }
00433 
00434 /*
00435  *      RelationBuildTupleDesc
00436  *
00437  *      Form the relation's tuple descriptor from information in
00438  *      the pg_attribute, pg_attrdef & pg_constraint system catalogs.
00439  */
00440 static void
00441 RelationBuildTupleDesc(Relation relation)
00442 {
00443     HeapTuple   pg_attribute_tuple;
00444     Relation    pg_attribute_desc;
00445     SysScanDesc pg_attribute_scan;
00446     ScanKeyData skey[2];
00447     int         need;
00448     TupleConstr *constr;
00449     AttrDefault *attrdef = NULL;
00450     int         ndef = 0;
00451 
00452     /* copy some fields from pg_class row to rd_att */
00453     relation->rd_att->tdtypeid = relation->rd_rel->reltype;
00454     relation->rd_att->tdtypmod = -1;    /* unnecessary, but... */
00455     relation->rd_att->tdhasoid = relation->rd_rel->relhasoids;
00456 
00457     constr = (TupleConstr *) MemoryContextAlloc(CacheMemoryContext,
00458                                                 sizeof(TupleConstr));
00459     constr->has_not_null = false;
00460 
00461     /*
00462      * Form a scan key that selects only user attributes (attnum > 0).
00463      * (Eliminating system attribute rows at the index level is lots faster
00464      * than fetching them.)
00465      */
00466     ScanKeyInit(&skey[0],
00467                 Anum_pg_attribute_attrelid,
00468                 BTEqualStrategyNumber, F_OIDEQ,
00469                 ObjectIdGetDatum(RelationGetRelid(relation)));
00470     ScanKeyInit(&skey[1],
00471                 Anum_pg_attribute_attnum,
00472                 BTGreaterStrategyNumber, F_INT2GT,
00473                 Int16GetDatum(0));
00474 
00475     /*
00476      * Open pg_attribute and begin a scan.  Force heap scan if we haven't yet
00477      * built the critical relcache entries (this includes initdb and startup
00478      * without a pg_internal.init file).
00479      */
00480     pg_attribute_desc = heap_open(AttributeRelationId, AccessShareLock);
00481     pg_attribute_scan = systable_beginscan(pg_attribute_desc,
00482                                            AttributeRelidNumIndexId,
00483                                            criticalRelcachesBuilt,
00484                                            SnapshotNow,
00485                                            2, skey);
00486 
00487     /*
00488      * add attribute data to relation->rd_att
00489      */
00490     need = relation->rd_rel->relnatts;
00491 
00492     while (HeapTupleIsValid(pg_attribute_tuple = systable_getnext(pg_attribute_scan)))
00493     {
00494         Form_pg_attribute attp;
00495 
00496         attp = (Form_pg_attribute) GETSTRUCT(pg_attribute_tuple);
00497 
00498         if (attp->attnum <= 0 ||
00499             attp->attnum > relation->rd_rel->relnatts)
00500             elog(ERROR, "invalid attribute number %d for %s",
00501                  attp->attnum, RelationGetRelationName(relation));
00502 
00503         memcpy(relation->rd_att->attrs[attp->attnum - 1],
00504                attp,
00505                ATTRIBUTE_FIXED_PART_SIZE);
00506 
00507         /* Update constraint/default info */
00508         if (attp->attnotnull)
00509             constr->has_not_null = true;
00510 
00511         if (attp->atthasdef)
00512         {
00513             if (attrdef == NULL)
00514                 attrdef = (AttrDefault *)
00515                     MemoryContextAllocZero(CacheMemoryContext,
00516                                            relation->rd_rel->relnatts *
00517                                            sizeof(AttrDefault));
00518             attrdef[ndef].adnum = attp->attnum;
00519             attrdef[ndef].adbin = NULL;
00520             ndef++;
00521         }
00522         need--;
00523         if (need == 0)
00524             break;
00525     }
00526 
00527     /*
00528      * end the scan and close the attribute relation
00529      */
00530     systable_endscan(pg_attribute_scan);
00531     heap_close(pg_attribute_desc, AccessShareLock);
00532 
00533     if (need != 0)
00534         elog(ERROR, "catalog is missing %d attribute(s) for relid %u",
00535              need, RelationGetRelid(relation));
00536 
00537     /*
00538      * The attcacheoff values we read from pg_attribute should all be -1
00539      * ("unknown").  Verify this if assert checking is on.  They will be
00540      * computed when and if needed during tuple access.
00541      */
00542 #ifdef USE_ASSERT_CHECKING
00543     {
00544         int         i;
00545 
00546         for (i = 0; i < relation->rd_rel->relnatts; i++)
00547             Assert(relation->rd_att->attrs[i]->attcacheoff == -1);
00548     }
00549 #endif
00550 
00551     /*
00552      * However, we can easily set the attcacheoff value for the first
00553      * attribute: it must be zero.  This eliminates the need for special cases
00554      * for attnum=1 that used to exist in fastgetattr() and index_getattr().
00555      */
00556     if (relation->rd_rel->relnatts > 0)
00557         relation->rd_att->attrs[0]->attcacheoff = 0;
00558 
00559     /*
00560      * Set up constraint/default info
00561      */
00562     if (constr->has_not_null || ndef > 0 || relation->rd_rel->relchecks)
00563     {
00564         relation->rd_att->constr = constr;
00565 
00566         if (ndef > 0)           /* DEFAULTs */
00567         {
00568             if (ndef < relation->rd_rel->relnatts)
00569                 constr->defval = (AttrDefault *)
00570                     repalloc(attrdef, ndef * sizeof(AttrDefault));
00571             else
00572                 constr->defval = attrdef;
00573             constr->num_defval = ndef;
00574             AttrDefaultFetch(relation);
00575         }
00576         else
00577             constr->num_defval = 0;
00578 
00579         if (relation->rd_rel->relchecks > 0)    /* CHECKs */
00580         {
00581             constr->num_check = relation->rd_rel->relchecks;
00582             constr->check = (ConstrCheck *)
00583                 MemoryContextAllocZero(CacheMemoryContext,
00584                                     constr->num_check * sizeof(ConstrCheck));
00585             CheckConstraintFetch(relation);
00586         }
00587         else
00588             constr->num_check = 0;
00589     }
00590     else
00591     {
00592         pfree(constr);
00593         relation->rd_att->constr = NULL;
00594     }
00595 }
00596 
00597 /*
00598  *      RelationBuildRuleLock
00599  *
00600  *      Form the relation's rewrite rules from information in
00601  *      the pg_rewrite system catalog.
00602  *
00603  * Note: The rule parsetrees are potentially very complex node structures.
00604  * To allow these trees to be freed when the relcache entry is flushed,
00605  * we make a private memory context to hold the RuleLock information for
00606  * each relcache entry that has associated rules.  The context is used
00607  * just for rule info, not for any other subsidiary data of the relcache
00608  * entry, because that keeps the update logic in RelationClearRelation()
00609  * manageable.  The other subsidiary data structures are simple enough
00610  * to be easy to free explicitly, anyway.
00611  */
00612 static void
00613 RelationBuildRuleLock(Relation relation)
00614 {
00615     MemoryContext rulescxt;
00616     MemoryContext oldcxt;
00617     HeapTuple   rewrite_tuple;
00618     Relation    rewrite_desc;
00619     TupleDesc   rewrite_tupdesc;
00620     SysScanDesc rewrite_scan;
00621     ScanKeyData key;
00622     RuleLock   *rulelock;
00623     int         numlocks;
00624     RewriteRule **rules;
00625     int         maxlocks;
00626 
00627     /*
00628      * Make the private context.  Parameters are set on the assumption that
00629      * it'll probably not contain much data.
00630      */
00631     rulescxt = AllocSetContextCreate(CacheMemoryContext,
00632                                      RelationGetRelationName(relation),
00633                                      ALLOCSET_SMALL_MINSIZE,
00634                                      ALLOCSET_SMALL_INITSIZE,
00635                                      ALLOCSET_SMALL_MAXSIZE);
00636     relation->rd_rulescxt = rulescxt;
00637 
00638     /*
00639      * allocate an array to hold the rewrite rules (the array is extended if
00640      * necessary)
00641      */
00642     maxlocks = 4;
00643     rules = (RewriteRule **)
00644         MemoryContextAlloc(rulescxt, sizeof(RewriteRule *) * maxlocks);
00645     numlocks = 0;
00646 
00647     /*
00648      * form a scan key
00649      */
00650     ScanKeyInit(&key,
00651                 Anum_pg_rewrite_ev_class,
00652                 BTEqualStrategyNumber, F_OIDEQ,
00653                 ObjectIdGetDatum(RelationGetRelid(relation)));
00654 
00655     /*
00656      * open pg_rewrite and begin a scan
00657      *
00658      * Note: since we scan the rules using RewriteRelRulenameIndexId, we will
00659      * be reading the rules in name order, except possibly during
00660      * emergency-recovery operations (ie, IgnoreSystemIndexes). This in turn
00661      * ensures that rules will be fired in name order.
00662      */
00663     rewrite_desc = heap_open(RewriteRelationId, AccessShareLock);
00664     rewrite_tupdesc = RelationGetDescr(rewrite_desc);
00665     rewrite_scan = systable_beginscan(rewrite_desc,
00666                                       RewriteRelRulenameIndexId,
00667                                       true, SnapshotNow,
00668                                       1, &key);
00669 
00670     while (HeapTupleIsValid(rewrite_tuple = systable_getnext(rewrite_scan)))
00671     {
00672         Form_pg_rewrite rewrite_form = (Form_pg_rewrite) GETSTRUCT(rewrite_tuple);
00673         bool        isnull;
00674         Datum       rule_datum;
00675         char       *rule_str;
00676         RewriteRule *rule;
00677 
00678         rule = (RewriteRule *) MemoryContextAlloc(rulescxt,
00679                                                   sizeof(RewriteRule));
00680 
00681         rule->ruleId = HeapTupleGetOid(rewrite_tuple);
00682 
00683         rule->event = rewrite_form->ev_type - '0';
00684         rule->attrno = rewrite_form->ev_attr;
00685         rule->enabled = rewrite_form->ev_enabled;
00686         rule->isInstead = rewrite_form->is_instead;
00687 
00688         /*
00689          * Must use heap_getattr to fetch ev_action and ev_qual.  Also, the
00690          * rule strings are often large enough to be toasted.  To avoid
00691          * leaking memory in the caller's context, do the detoasting here so
00692          * we can free the detoasted version.
00693          */
00694         rule_datum = heap_getattr(rewrite_tuple,
00695                                   Anum_pg_rewrite_ev_action,
00696                                   rewrite_tupdesc,
00697                                   &isnull);
00698         Assert(!isnull);
00699         rule_str = TextDatumGetCString(rule_datum);
00700         oldcxt = MemoryContextSwitchTo(rulescxt);
00701         rule->actions = (List *) stringToNode(rule_str);
00702         MemoryContextSwitchTo(oldcxt);
00703         pfree(rule_str);
00704 
00705         rule_datum = heap_getattr(rewrite_tuple,
00706                                   Anum_pg_rewrite_ev_qual,
00707                                   rewrite_tupdesc,
00708                                   &isnull);
00709         Assert(!isnull);
00710         rule_str = TextDatumGetCString(rule_datum);
00711         oldcxt = MemoryContextSwitchTo(rulescxt);
00712         rule->qual = (Node *) stringToNode(rule_str);
00713         MemoryContextSwitchTo(oldcxt);
00714         pfree(rule_str);
00715 
00716         /*
00717          * We want the rule's table references to be checked as though by the
00718          * table owner, not the user referencing the rule.  Therefore, scan
00719          * through the rule's actions and set the checkAsUser field on all
00720          * rtable entries.  We have to look at the qual as well, in case it
00721          * contains sublinks.
00722          *
00723          * The reason for doing this when the rule is loaded, rather than when
00724          * it is stored, is that otherwise ALTER TABLE OWNER would have to
00725          * grovel through stored rules to update checkAsUser fields. Scanning
00726          * the rule tree during load is relatively cheap (compared to
00727          * constructing it in the first place), so we do it here.
00728          */
00729         setRuleCheckAsUser((Node *) rule->actions, relation->rd_rel->relowner);
00730         setRuleCheckAsUser(rule->qual, relation->rd_rel->relowner);
00731 
00732         if (numlocks >= maxlocks)
00733         {
00734             maxlocks *= 2;
00735             rules = (RewriteRule **)
00736                 repalloc(rules, sizeof(RewriteRule *) * maxlocks);
00737         }
00738         rules[numlocks++] = rule;
00739     }
00740 
00741     /*
00742      * end the scan and close the attribute relation
00743      */
00744     systable_endscan(rewrite_scan);
00745     heap_close(rewrite_desc, AccessShareLock);
00746 
00747     /*
00748      * there might not be any rules (if relhasrules is out-of-date)
00749      */
00750     if (numlocks == 0)
00751     {
00752         relation->rd_rules = NULL;
00753         relation->rd_rulescxt = NULL;
00754         MemoryContextDelete(rulescxt);
00755         return;
00756     }
00757 
00758     /*
00759      * form a RuleLock and insert into relation
00760      */
00761     rulelock = (RuleLock *) MemoryContextAlloc(rulescxt, sizeof(RuleLock));
00762     rulelock->numLocks = numlocks;
00763     rulelock->rules = rules;
00764 
00765     relation->rd_rules = rulelock;
00766 }
00767 
00768 /*
00769  *      equalRuleLocks
00770  *
00771  *      Determine whether two RuleLocks are equivalent
00772  *
00773  *      Probably this should be in the rules code someplace...
00774  */
00775 static bool
00776 equalRuleLocks(RuleLock *rlock1, RuleLock *rlock2)
00777 {
00778     int         i;
00779 
00780     /*
00781      * As of 7.3 we assume the rule ordering is repeatable, because
00782      * RelationBuildRuleLock should read 'em in a consistent order.  So just
00783      * compare corresponding slots.
00784      */
00785     if (rlock1 != NULL)
00786     {
00787         if (rlock2 == NULL)
00788             return false;
00789         if (rlock1->numLocks != rlock2->numLocks)
00790             return false;
00791         for (i = 0; i < rlock1->numLocks; i++)
00792         {
00793             RewriteRule *rule1 = rlock1->rules[i];
00794             RewriteRule *rule2 = rlock2->rules[i];
00795 
00796             if (rule1->ruleId != rule2->ruleId)
00797                 return false;
00798             if (rule1->event != rule2->event)
00799                 return false;
00800             if (rule1->attrno != rule2->attrno)
00801                 return false;
00802             if (rule1->enabled != rule2->enabled)
00803                 return false;
00804             if (rule1->isInstead != rule2->isInstead)
00805                 return false;
00806             if (!equal(rule1->qual, rule2->qual))
00807                 return false;
00808             if (!equal(rule1->actions, rule2->actions))
00809                 return false;
00810         }
00811     }
00812     else if (rlock2 != NULL)
00813         return false;
00814     return true;
00815 }
00816 
00817 
00818 /*
00819  *      RelationBuildDesc
00820  *
00821  *      Build a relation descriptor.  The caller must hold at least
00822  *      AccessShareLock on the target relid.
00823  *
00824  *      The new descriptor is inserted into the hash table if insertIt is true.
00825  *
00826  *      Returns NULL if no pg_class row could be found for the given relid
00827  *      (suggesting we are trying to access a just-deleted relation).
00828  *      Any other error is reported via elog.
00829  */
00830 static Relation
00831 RelationBuildDesc(Oid targetRelId, bool insertIt)
00832 {
00833     Relation    relation;
00834     Oid         relid;
00835     HeapTuple   pg_class_tuple;
00836     Form_pg_class relp;
00837 
00838     /*
00839      * find the tuple in pg_class corresponding to the given relation id
00840      */
00841     pg_class_tuple = ScanPgRelation(targetRelId, true);
00842 
00843     /*
00844      * if no such tuple exists, return NULL
00845      */
00846     if (!HeapTupleIsValid(pg_class_tuple))
00847         return NULL;
00848 
00849     /*
00850      * get information from the pg_class_tuple
00851      */
00852     relid = HeapTupleGetOid(pg_class_tuple);
00853     relp = (Form_pg_class) GETSTRUCT(pg_class_tuple);
00854     Assert(relid == targetRelId);
00855 
00856     /*
00857      * allocate storage for the relation descriptor, and copy pg_class_tuple
00858      * to relation->rd_rel.
00859      */
00860     relation = AllocateRelationDesc(relp);
00861 
00862     /*
00863      * initialize the relation's relation id (relation->rd_id)
00864      */
00865     RelationGetRelid(relation) = relid;
00866 
00867     /*
00868      * normal relations are not nailed into the cache; nor can a pre-existing
00869      * relation be new.  It could be temp though.  (Actually, it could be new
00870      * too, but it's okay to forget that fact if forced to flush the entry.)
00871      */
00872     relation->rd_refcnt = 0;
00873     relation->rd_isnailed = false;
00874     relation->rd_createSubid = InvalidSubTransactionId;
00875     relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
00876     switch (relation->rd_rel->relpersistence)
00877     {
00878         case RELPERSISTENCE_UNLOGGED:
00879         case RELPERSISTENCE_PERMANENT:
00880             relation->rd_backend = InvalidBackendId;
00881             relation->rd_islocaltemp = false;
00882             break;
00883         case RELPERSISTENCE_TEMP:
00884             if (isTempOrToastNamespace(relation->rd_rel->relnamespace))
00885             {
00886                 relation->rd_backend = MyBackendId;
00887                 relation->rd_islocaltemp = true;
00888             }
00889             else
00890             {
00891                 /*
00892                  * If it's a temp table, but not one of ours, we have to use
00893                  * the slow, grotty method to figure out the owning backend.
00894                  *
00895                  * Note: it's possible that rd_backend gets set to MyBackendId
00896                  * here, in case we are looking at a pg_class entry left over
00897                  * from a crashed backend that coincidentally had the same
00898                  * BackendId we're using.  We should *not* consider such a
00899                  * table to be "ours"; this is why we need the separate
00900                  * rd_islocaltemp flag.  The pg_class entry will get flushed
00901                  * if/when we clean out the corresponding temp table namespace
00902                  * in preparation for using it.
00903                  */
00904                 relation->rd_backend =
00905                     GetTempNamespaceBackendId(relation->rd_rel->relnamespace);
00906                 Assert(relation->rd_backend != InvalidBackendId);
00907                 relation->rd_islocaltemp = false;
00908             }
00909             break;
00910         default:
00911             elog(ERROR, "invalid relpersistence: %c",
00912                  relation->rd_rel->relpersistence);
00913             break;
00914     }
00915 
00916     /*
00917      * initialize the tuple descriptor (relation->rd_att).
00918      */
00919     RelationBuildTupleDesc(relation);
00920 
00921     /*
00922      * Fetch rules and triggers that affect this relation
00923      */
00924     if (relation->rd_rel->relhasrules)
00925         RelationBuildRuleLock(relation);
00926     else
00927     {
00928         relation->rd_rules = NULL;
00929         relation->rd_rulescxt = NULL;
00930     }
00931 
00932     if (relation->rd_rel->relhastriggers)
00933         RelationBuildTriggers(relation);
00934     else
00935         relation->trigdesc = NULL;
00936 
00937     /*
00938      * if it's an index, initialize index-related information
00939      */
00940     if (OidIsValid(relation->rd_rel->relam))
00941         RelationInitIndexAccessInfo(relation);
00942 
00943     /* extract reloptions if any */
00944     RelationParseRelOptions(relation, pg_class_tuple);
00945 
00946     /*
00947      * initialize the relation lock manager information
00948      */
00949     RelationInitLockInfo(relation);     /* see lmgr.c */
00950 
00951     /*
00952      * initialize physical addressing information for the relation
00953      */
00954     RelationInitPhysicalAddr(relation);
00955 
00956     /* make sure relation is marked as having no open file yet */
00957     relation->rd_smgr = NULL;
00958 
00959     if (relation->rd_rel->relkind == RELKIND_MATVIEW &&
00960         heap_is_matview_init_state(relation))
00961         relation->rd_ispopulated = false;
00962     else
00963         relation->rd_ispopulated = true;
00964 
00965     /*
00966      * now we can free the memory allocated for pg_class_tuple
00967      */
00968     heap_freetuple(pg_class_tuple);
00969 
00970     /*
00971      * Insert newly created relation into relcache hash table, if requested.
00972      */
00973     if (insertIt)
00974         RelationCacheInsert(relation);
00975 
00976     /* It's fully valid */
00977     relation->rd_isvalid = true;
00978 
00979     return relation;
00980 }
00981 
00982 /*
00983  * Initialize the physical addressing info (RelFileNode) for a relcache entry
00984  *
00985  * Note: at the physical level, relations in the pg_global tablespace must
00986  * be treated as shared, even if relisshared isn't set.  Hence we do not
00987  * look at relisshared here.
00988  */
00989 static void
00990 RelationInitPhysicalAddr(Relation relation)
00991 {
00992     if (relation->rd_rel->reltablespace)
00993         relation->rd_node.spcNode = relation->rd_rel->reltablespace;
00994     else
00995         relation->rd_node.spcNode = MyDatabaseTableSpace;
00996     if (relation->rd_node.spcNode == GLOBALTABLESPACE_OID)
00997         relation->rd_node.dbNode = InvalidOid;
00998     else
00999         relation->rd_node.dbNode = MyDatabaseId;
01000     if (relation->rd_rel->relfilenode)
01001         relation->rd_node.relNode = relation->rd_rel->relfilenode;
01002     else
01003     {
01004         /* Consult the relation mapper */
01005         relation->rd_node.relNode =
01006             RelationMapOidToFilenode(relation->rd_id,
01007                                      relation->rd_rel->relisshared);
01008         if (!OidIsValid(relation->rd_node.relNode))
01009             elog(ERROR, "could not find relation mapping for relation \"%s\", OID %u",
01010                  RelationGetRelationName(relation), relation->rd_id);
01011     }
01012 }
01013 
01014 /*
01015  * Initialize index-access-method support data for an index relation
01016  */
01017 void
01018 RelationInitIndexAccessInfo(Relation relation)
01019 {
01020     HeapTuple   tuple;
01021     Form_pg_am  aform;
01022     Datum       indcollDatum;
01023     Datum       indclassDatum;
01024     Datum       indoptionDatum;
01025     bool        isnull;
01026     oidvector  *indcoll;
01027     oidvector  *indclass;
01028     int2vector *indoption;
01029     MemoryContext indexcxt;
01030     MemoryContext oldcontext;
01031     int         natts;
01032     uint16      amsupport;
01033 
01034     /*
01035      * Make a copy of the pg_index entry for the index.  Since pg_index
01036      * contains variable-length and possibly-null fields, we have to do this
01037      * honestly rather than just treating it as a Form_pg_index struct.
01038      */
01039     tuple = SearchSysCache1(INDEXRELID,
01040                             ObjectIdGetDatum(RelationGetRelid(relation)));
01041     if (!HeapTupleIsValid(tuple))
01042         elog(ERROR, "cache lookup failed for index %u",
01043              RelationGetRelid(relation));
01044     oldcontext = MemoryContextSwitchTo(CacheMemoryContext);
01045     relation->rd_indextuple = heap_copytuple(tuple);
01046     relation->rd_index = (Form_pg_index) GETSTRUCT(relation->rd_indextuple);
01047     MemoryContextSwitchTo(oldcontext);
01048     ReleaseSysCache(tuple);
01049 
01050     /*
01051      * Make a copy of the pg_am entry for the index's access method
01052      */
01053     tuple = SearchSysCache1(AMOID, ObjectIdGetDatum(relation->rd_rel->relam));
01054     if (!HeapTupleIsValid(tuple))
01055         elog(ERROR, "cache lookup failed for access method %u",
01056              relation->rd_rel->relam);
01057     aform = (Form_pg_am) MemoryContextAlloc(CacheMemoryContext, sizeof *aform);
01058     memcpy(aform, GETSTRUCT(tuple), sizeof *aform);
01059     ReleaseSysCache(tuple);
01060     relation->rd_am = aform;
01061 
01062     natts = relation->rd_rel->relnatts;
01063     if (natts != relation->rd_index->indnatts)
01064         elog(ERROR, "relnatts disagrees with indnatts for index %u",
01065              RelationGetRelid(relation));
01066     amsupport = aform->amsupport;
01067 
01068     /*
01069      * Make the private context to hold index access info.  The reason we need
01070      * a context, and not just a couple of pallocs, is so that we won't leak
01071      * any subsidiary info attached to fmgr lookup records.
01072      *
01073      * Context parameters are set on the assumption that it'll probably not
01074      * contain much data.
01075      */
01076     indexcxt = AllocSetContextCreate(CacheMemoryContext,
01077                                      RelationGetRelationName(relation),
01078                                      ALLOCSET_SMALL_MINSIZE,
01079                                      ALLOCSET_SMALL_INITSIZE,
01080                                      ALLOCSET_SMALL_MAXSIZE);
01081     relation->rd_indexcxt = indexcxt;
01082 
01083     /*
01084      * Allocate arrays to hold data
01085      */
01086     relation->rd_aminfo = (RelationAmInfo *)
01087         MemoryContextAllocZero(indexcxt, sizeof(RelationAmInfo));
01088 
01089     relation->rd_opfamily = (Oid *)
01090         MemoryContextAllocZero(indexcxt, natts * sizeof(Oid));
01091     relation->rd_opcintype = (Oid *)
01092         MemoryContextAllocZero(indexcxt, natts * sizeof(Oid));
01093 
01094     if (amsupport > 0)
01095     {
01096         int         nsupport = natts * amsupport;
01097 
01098         relation->rd_support = (RegProcedure *)
01099             MemoryContextAllocZero(indexcxt, nsupport * sizeof(RegProcedure));
01100         relation->rd_supportinfo = (FmgrInfo *)
01101             MemoryContextAllocZero(indexcxt, nsupport * sizeof(FmgrInfo));
01102     }
01103     else
01104     {
01105         relation->rd_support = NULL;
01106         relation->rd_supportinfo = NULL;
01107     }
01108 
01109     relation->rd_indcollation = (Oid *)
01110         MemoryContextAllocZero(indexcxt, natts * sizeof(Oid));
01111 
01112     relation->rd_indoption = (int16 *)
01113         MemoryContextAllocZero(indexcxt, natts * sizeof(int16));
01114 
01115     /*
01116      * indcollation cannot be referenced directly through the C struct,
01117      * because it comes after the variable-width indkey field.  Must extract
01118      * the datum the hard way...
01119      */
01120     indcollDatum = fastgetattr(relation->rd_indextuple,
01121                                Anum_pg_index_indcollation,
01122                                GetPgIndexDescriptor(),
01123                                &isnull);
01124     Assert(!isnull);
01125     indcoll = (oidvector *) DatumGetPointer(indcollDatum);
01126     memcpy(relation->rd_indcollation, indcoll->values, natts * sizeof(Oid));
01127 
01128     /*
01129      * indclass cannot be referenced directly through the C struct, because it
01130      * comes after the variable-width indkey field.  Must extract the datum
01131      * the hard way...
01132      */
01133     indclassDatum = fastgetattr(relation->rd_indextuple,
01134                                 Anum_pg_index_indclass,
01135                                 GetPgIndexDescriptor(),
01136                                 &isnull);
01137     Assert(!isnull);
01138     indclass = (oidvector *) DatumGetPointer(indclassDatum);
01139 
01140     /*
01141      * Fill the support procedure OID array, as well as the info about
01142      * opfamilies and opclass input types.  (aminfo and supportinfo are left
01143      * as zeroes, and are filled on-the-fly when used)
01144      */
01145     IndexSupportInitialize(indclass, relation->rd_support,
01146                            relation->rd_opfamily, relation->rd_opcintype,
01147                            amsupport, natts);
01148 
01149     /*
01150      * Similarly extract indoption and copy it to the cache entry
01151      */
01152     indoptionDatum = fastgetattr(relation->rd_indextuple,
01153                                  Anum_pg_index_indoption,
01154                                  GetPgIndexDescriptor(),
01155                                  &isnull);
01156     Assert(!isnull);
01157     indoption = (int2vector *) DatumGetPointer(indoptionDatum);
01158     memcpy(relation->rd_indoption, indoption->values, natts * sizeof(int16));
01159 
01160     /*
01161      * expressions, predicate, exclusion caches will be filled later
01162      */
01163     relation->rd_indexprs = NIL;
01164     relation->rd_indpred = NIL;
01165     relation->rd_exclops = NULL;
01166     relation->rd_exclprocs = NULL;
01167     relation->rd_exclstrats = NULL;
01168     relation->rd_amcache = NULL;
01169 }
01170 
01171 /*
01172  * IndexSupportInitialize
01173  *      Initializes an index's cached opclass information,
01174  *      given the index's pg_index.indclass entry.
01175  *
01176  * Data is returned into *indexSupport, *opFamily, and *opcInType,
01177  * which are arrays allocated by the caller.
01178  *
01179  * The caller also passes maxSupportNumber and maxAttributeNumber, since these
01180  * indicate the size of the arrays it has allocated --- but in practice these
01181  * numbers must always match those obtainable from the system catalog entries
01182  * for the index and access method.
01183  */
01184 static void
01185 IndexSupportInitialize(oidvector *indclass,
01186                        RegProcedure *indexSupport,
01187                        Oid *opFamily,
01188                        Oid *opcInType,
01189                        StrategyNumber maxSupportNumber,
01190                        AttrNumber maxAttributeNumber)
01191 {
01192     int         attIndex;
01193 
01194     for (attIndex = 0; attIndex < maxAttributeNumber; attIndex++)
01195     {
01196         OpClassCacheEnt *opcentry;
01197 
01198         if (!OidIsValid(indclass->values[attIndex]))
01199             elog(ERROR, "bogus pg_index tuple");
01200 
01201         /* look up the info for this opclass, using a cache */
01202         opcentry = LookupOpclassInfo(indclass->values[attIndex],
01203                                      maxSupportNumber);
01204 
01205         /* copy cached data into relcache entry */
01206         opFamily[attIndex] = opcentry->opcfamily;
01207         opcInType[attIndex] = opcentry->opcintype;
01208         if (maxSupportNumber > 0)
01209             memcpy(&indexSupport[attIndex * maxSupportNumber],
01210                    opcentry->supportProcs,
01211                    maxSupportNumber * sizeof(RegProcedure));
01212     }
01213 }
01214 
01215 /*
01216  * LookupOpclassInfo
01217  *
01218  * This routine maintains a per-opclass cache of the information needed
01219  * by IndexSupportInitialize().  This is more efficient than relying on
01220  * the catalog cache, because we can load all the info about a particular
01221  * opclass in a single indexscan of pg_amproc.
01222  *
01223  * The information from pg_am about expected range of support function
01224  * numbers is passed in, rather than being looked up, mainly because the
01225  * caller will have it already.
01226  *
01227  * Note there is no provision for flushing the cache.  This is OK at the
01228  * moment because there is no way to ALTER any interesting properties of an
01229  * existing opclass --- all you can do is drop it, which will result in
01230  * a useless but harmless dead entry in the cache.  To support altering
01231  * opclass membership (not the same as opfamily membership!), we'd need to
01232  * be able to flush this cache as well as the contents of relcache entries
01233  * for indexes.
01234  */
01235 static OpClassCacheEnt *
01236 LookupOpclassInfo(Oid operatorClassOid,
01237                   StrategyNumber numSupport)
01238 {
01239     OpClassCacheEnt *opcentry;
01240     bool        found;
01241     Relation    rel;
01242     SysScanDesc scan;
01243     ScanKeyData skey[3];
01244     HeapTuple   htup;
01245     bool        indexOK;
01246 
01247     if (OpClassCache == NULL)
01248     {
01249         /* First time through: initialize the opclass cache */
01250         HASHCTL     ctl;
01251 
01252         MemSet(&ctl, 0, sizeof(ctl));
01253         ctl.keysize = sizeof(Oid);
01254         ctl.entrysize = sizeof(OpClassCacheEnt);
01255         ctl.hash = oid_hash;
01256         OpClassCache = hash_create("Operator class cache", 64,
01257                                    &ctl, HASH_ELEM | HASH_FUNCTION);
01258 
01259         /* Also make sure CacheMemoryContext exists */
01260         if (!CacheMemoryContext)
01261             CreateCacheMemoryContext();
01262     }
01263 
01264     opcentry = (OpClassCacheEnt *) hash_search(OpClassCache,
01265                                                (void *) &operatorClassOid,
01266                                                HASH_ENTER, &found);
01267 
01268     if (!found)
01269     {
01270         /* Need to allocate memory for new entry */
01271         opcentry->valid = false;    /* until known OK */
01272         opcentry->numSupport = numSupport;
01273 
01274         if (numSupport > 0)
01275             opcentry->supportProcs = (RegProcedure *)
01276                 MemoryContextAllocZero(CacheMemoryContext,
01277                                        numSupport * sizeof(RegProcedure));
01278         else
01279             opcentry->supportProcs = NULL;
01280     }
01281     else
01282     {
01283         Assert(numSupport == opcentry->numSupport);
01284     }
01285 
01286     /*
01287      * When testing for cache-flush hazards, we intentionally disable the
01288      * operator class cache and force reloading of the info on each call. This
01289      * is helpful because we want to test the case where a cache flush occurs
01290      * while we are loading the info, and it's very hard to provoke that if
01291      * this happens only once per opclass per backend.
01292      */
01293 #if defined(CLOBBER_CACHE_ALWAYS)
01294     opcentry->valid = false;
01295 #endif
01296 
01297     if (opcentry->valid)
01298         return opcentry;
01299 
01300     /*
01301      * Need to fill in new entry.
01302      *
01303      * To avoid infinite recursion during startup, force heap scans if we're
01304      * looking up info for the opclasses used by the indexes we would like to
01305      * reference here.
01306      */
01307     indexOK = criticalRelcachesBuilt ||
01308         (operatorClassOid != OID_BTREE_OPS_OID &&
01309          operatorClassOid != INT2_BTREE_OPS_OID);
01310 
01311     /*
01312      * We have to fetch the pg_opclass row to determine its opfamily and
01313      * opcintype, which are needed to look up related operators and functions.
01314      * It'd be convenient to use the syscache here, but that probably doesn't
01315      * work while bootstrapping.
01316      */
01317     ScanKeyInit(&skey[0],
01318                 ObjectIdAttributeNumber,
01319                 BTEqualStrategyNumber, F_OIDEQ,
01320                 ObjectIdGetDatum(operatorClassOid));
01321     rel = heap_open(OperatorClassRelationId, AccessShareLock);
01322     scan = systable_beginscan(rel, OpclassOidIndexId, indexOK,
01323                               SnapshotNow, 1, skey);
01324 
01325     if (HeapTupleIsValid(htup = systable_getnext(scan)))
01326     {
01327         Form_pg_opclass opclassform = (Form_pg_opclass) GETSTRUCT(htup);
01328 
01329         opcentry->opcfamily = opclassform->opcfamily;
01330         opcentry->opcintype = opclassform->opcintype;
01331     }
01332     else
01333         elog(ERROR, "could not find tuple for opclass %u", operatorClassOid);
01334 
01335     systable_endscan(scan);
01336     heap_close(rel, AccessShareLock);
01337 
01338     /*
01339      * Scan pg_amproc to obtain support procs for the opclass.  We only fetch
01340      * the default ones (those with lefttype = righttype = opcintype).
01341      */
01342     if (numSupport > 0)
01343     {
01344         ScanKeyInit(&skey[0],
01345                     Anum_pg_amproc_amprocfamily,
01346                     BTEqualStrategyNumber, F_OIDEQ,
01347                     ObjectIdGetDatum(opcentry->opcfamily));
01348         ScanKeyInit(&skey[1],
01349                     Anum_pg_amproc_amproclefttype,
01350                     BTEqualStrategyNumber, F_OIDEQ,
01351                     ObjectIdGetDatum(opcentry->opcintype));
01352         ScanKeyInit(&skey[2],
01353                     Anum_pg_amproc_amprocrighttype,
01354                     BTEqualStrategyNumber, F_OIDEQ,
01355                     ObjectIdGetDatum(opcentry->opcintype));
01356         rel = heap_open(AccessMethodProcedureRelationId, AccessShareLock);
01357         scan = systable_beginscan(rel, AccessMethodProcedureIndexId, indexOK,
01358                                   SnapshotNow, 3, skey);
01359 
01360         while (HeapTupleIsValid(htup = systable_getnext(scan)))
01361         {
01362             Form_pg_amproc amprocform = (Form_pg_amproc) GETSTRUCT(htup);
01363 
01364             if (amprocform->amprocnum <= 0 ||
01365                 (StrategyNumber) amprocform->amprocnum > numSupport)
01366                 elog(ERROR, "invalid amproc number %d for opclass %u",
01367                      amprocform->amprocnum, operatorClassOid);
01368 
01369             opcentry->supportProcs[amprocform->amprocnum - 1] =
01370                 amprocform->amproc;
01371         }
01372 
01373         systable_endscan(scan);
01374         heap_close(rel, AccessShareLock);
01375     }
01376 
01377     opcentry->valid = true;
01378     return opcentry;
01379 }
01380 
01381 
01382 /*
01383  *      formrdesc
01384  *
01385  *      This is a special cut-down version of RelationBuildDesc(),
01386  *      used while initializing the relcache.
01387  *      The relation descriptor is built just from the supplied parameters,
01388  *      without actually looking at any system table entries.  We cheat
01389  *      quite a lot since we only need to work for a few basic system
01390  *      catalogs.
01391  *
01392  * formrdesc is currently used for: pg_database, pg_authid, pg_auth_members,
01393  * pg_class, pg_attribute, pg_proc, and pg_type
01394  * (see RelationCacheInitializePhase2/3).
01395  *
01396  * Note that these catalogs can't have constraints (except attnotnull),
01397  * default values, rules, or triggers, since we don't cope with any of that.
01398  * (Well, actually, this only matters for properties that need to be valid
01399  * during bootstrap or before RelationCacheInitializePhase3 runs, and none of
01400  * these properties matter then...)
01401  *
01402  * NOTE: we assume we are already switched into CacheMemoryContext.
01403  */
01404 static void
01405 formrdesc(const char *relationName, Oid relationReltype,
01406           bool isshared, bool hasoids,
01407           int natts, const FormData_pg_attribute *attrs)
01408 {
01409     Relation    relation;
01410     int         i;
01411     bool        has_not_null;
01412 
01413     /*
01414      * allocate new relation desc, clear all fields of reldesc
01415      */
01416     relation = (Relation) palloc0(sizeof(RelationData));
01417 
01418     /* make sure relation is marked as having no open file yet */
01419     relation->rd_smgr = NULL;
01420 
01421     /*
01422      * initialize reference count: 1 because it is nailed in cache
01423      */
01424     relation->rd_refcnt = 1;
01425 
01426     /*
01427      * all entries built with this routine are nailed-in-cache; none are for
01428      * new or temp relations.
01429      */
01430     relation->rd_isnailed = true;
01431     relation->rd_createSubid = InvalidSubTransactionId;
01432     relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
01433     relation->rd_backend = InvalidBackendId;
01434     relation->rd_islocaltemp = false;
01435 
01436     /*
01437      * initialize relation tuple form
01438      *
01439      * The data we insert here is pretty incomplete/bogus, but it'll serve to
01440      * get us launched.  RelationCacheInitializePhase3() will read the real
01441      * data from pg_class and replace what we've done here.  Note in
01442      * particular that relowner is left as zero; this cues
01443      * RelationCacheInitializePhase3 that the real data isn't there yet.
01444      */
01445     relation->rd_rel = (Form_pg_class) palloc0(CLASS_TUPLE_SIZE);
01446 
01447     namestrcpy(&relation->rd_rel->relname, relationName);
01448     relation->rd_rel->relnamespace = PG_CATALOG_NAMESPACE;
01449     relation->rd_rel->reltype = relationReltype;
01450 
01451     /*
01452      * It's important to distinguish between shared and non-shared relations,
01453      * even at bootstrap time, to make sure we know where they are stored.
01454      */
01455     relation->rd_rel->relisshared = isshared;
01456     if (isshared)
01457         relation->rd_rel->reltablespace = GLOBALTABLESPACE_OID;
01458 
01459     /* formrdesc is used only for permanent relations */
01460     relation->rd_rel->relpersistence = RELPERSISTENCE_PERMANENT;
01461 
01462     relation->rd_rel->relpages = 0;
01463     relation->rd_rel->reltuples = 0;
01464     relation->rd_rel->relallvisible = 0;
01465     relation->rd_rel->relkind = RELKIND_RELATION;
01466     relation->rd_rel->relhasoids = hasoids;
01467     relation->rd_rel->relnatts = (int16) natts;
01468 
01469     /*
01470      * initialize attribute tuple form
01471      *
01472      * Unlike the case with the relation tuple, this data had better be right
01473      * because it will never be replaced.  The data comes from
01474      * src/include/catalog/ headers via genbki.pl.
01475      */
01476     relation->rd_att = CreateTemplateTupleDesc(natts, hasoids);
01477     relation->rd_att->tdrefcount = 1;   /* mark as refcounted */
01478 
01479     relation->rd_att->tdtypeid = relationReltype;
01480     relation->rd_att->tdtypmod = -1;    /* unnecessary, but... */
01481 
01482     /*
01483      * initialize tuple desc info
01484      */
01485     has_not_null = false;
01486     for (i = 0; i < natts; i++)
01487     {
01488         memcpy(relation->rd_att->attrs[i],
01489                &attrs[i],
01490                ATTRIBUTE_FIXED_PART_SIZE);
01491         has_not_null |= attrs[i].attnotnull;
01492         /* make sure attcacheoff is valid */
01493         relation->rd_att->attrs[i]->attcacheoff = -1;
01494     }
01495 
01496     /* initialize first attribute's attcacheoff, cf RelationBuildTupleDesc */
01497     relation->rd_att->attrs[0]->attcacheoff = 0;
01498 
01499     /* mark not-null status */
01500     if (has_not_null)
01501     {
01502         TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));
01503 
01504         constr->has_not_null = true;
01505         relation->rd_att->constr = constr;
01506     }
01507 
01508     /*
01509      * initialize relation id from info in att array (my, this is ugly)
01510      */
01511     RelationGetRelid(relation) = relation->rd_att->attrs[0]->attrelid;
01512 
01513     /*
01514      * All relations made with formrdesc are mapped.  This is necessarily so
01515      * because there is no other way to know what filenode they currently
01516      * have.  In bootstrap mode, add them to the initial relation mapper data,
01517      * specifying that the initial filenode is the same as the OID.
01518      */
01519     relation->rd_rel->relfilenode = InvalidOid;
01520     if (IsBootstrapProcessingMode())
01521         RelationMapUpdateMap(RelationGetRelid(relation),
01522                              RelationGetRelid(relation),
01523                              isshared, true);
01524 
01525     /*
01526      * initialize the relation lock manager information
01527      */
01528     RelationInitLockInfo(relation);     /* see lmgr.c */
01529 
01530     /*
01531      * initialize physical addressing information for the relation
01532      */
01533     RelationInitPhysicalAddr(relation);
01534     relation->rd_ispopulated = true;
01535 
01536     /*
01537      * initialize the rel-has-index flag, using hardwired knowledge
01538      */
01539     if (IsBootstrapProcessingMode())
01540     {
01541         /* In bootstrap mode, we have no indexes */
01542         relation->rd_rel->relhasindex = false;
01543     }
01544     else
01545     {
01546         /* Otherwise, all the rels formrdesc is used for have indexes */
01547         relation->rd_rel->relhasindex = true;
01548     }
01549 
01550     /*
01551      * add new reldesc to relcache
01552      */
01553     RelationCacheInsert(relation);
01554 
01555     /* It's fully valid */
01556     relation->rd_isvalid = true;
01557 }
01558 
01559 
01560 /* ----------------------------------------------------------------
01561  *               Relation Descriptor Lookup Interface
01562  * ----------------------------------------------------------------
01563  */
01564 
01565 /*
01566  *      RelationIdGetRelation
01567  *
01568  *      Lookup a reldesc by OID; make one if not already in cache.
01569  *
01570  *      Returns NULL if no pg_class row could be found for the given relid
01571  *      (suggesting we are trying to access a just-deleted relation).
01572  *      Any other error is reported via elog.
01573  *
01574  *      NB: caller should already have at least AccessShareLock on the
01575  *      relation ID, else there are nasty race conditions.
01576  *
01577  *      NB: relation ref count is incremented, or set to 1 if new entry.
01578  *      Caller should eventually decrement count.  (Usually,
01579  *      that happens by calling RelationClose().)
01580  */
01581 Relation
01582 RelationIdGetRelation(Oid relationId)
01583 {
01584     Relation    rd;
01585 
01586     /*
01587      * first try to find reldesc in the cache
01588      */
01589     RelationIdCacheLookup(relationId, rd);
01590 
01591     if (RelationIsValid(rd))
01592     {
01593         RelationIncrementReferenceCount(rd);
01594         /* revalidate cache entry if necessary */
01595         if (!rd->rd_isvalid)
01596         {
01597             /*
01598              * Indexes only have a limited number of possible schema changes,
01599              * and we don't want to use the full-blown procedure because it's
01600              * a headache for indexes that reload itself depends on.
01601              */
01602             if (rd->rd_rel->relkind == RELKIND_INDEX)
01603                 RelationReloadIndexInfo(rd);
01604             else
01605                 RelationClearRelation(rd, true);
01606         }
01607         return rd;
01608     }
01609 
01610     /*
01611      * no reldesc in the cache, so have RelationBuildDesc() build one and add
01612      * it.
01613      */
01614     rd = RelationBuildDesc(relationId, true);
01615     if (RelationIsValid(rd))
01616         RelationIncrementReferenceCount(rd);
01617     return rd;
01618 }
01619 
01620 /* ----------------------------------------------------------------
01621  *              cache invalidation support routines
01622  * ----------------------------------------------------------------
01623  */
01624 
01625 /*
01626  * RelationIncrementReferenceCount
01627  *      Increments relation reference count.
01628  *
01629  * Note: bootstrap mode has its own weird ideas about relation refcount
01630  * behavior; we ought to fix it someday, but for now, just disable
01631  * reference count ownership tracking in bootstrap mode.
01632  */
01633 void
01634 RelationIncrementReferenceCount(Relation rel)
01635 {
01636     ResourceOwnerEnlargeRelationRefs(CurrentResourceOwner);
01637     rel->rd_refcnt += 1;
01638     if (!IsBootstrapProcessingMode())
01639         ResourceOwnerRememberRelationRef(CurrentResourceOwner, rel);
01640 }
01641 
01642 /*
01643  * RelationDecrementReferenceCount
01644  *      Decrements relation reference count.
01645  */
01646 void
01647 RelationDecrementReferenceCount(Relation rel)
01648 {
01649     Assert(rel->rd_refcnt > 0);
01650     rel->rd_refcnt -= 1;
01651     if (!IsBootstrapProcessingMode())
01652         ResourceOwnerForgetRelationRef(CurrentResourceOwner, rel);
01653 }
01654 
01655 /*
01656  * RelationClose - close an open relation
01657  *
01658  *  Actually, we just decrement the refcount.
01659  *
01660  *  NOTE: if compiled with -DRELCACHE_FORCE_RELEASE then relcache entries
01661  *  will be freed as soon as their refcount goes to zero.  In combination
01662  *  with aset.c's CLOBBER_FREED_MEMORY option, this provides a good test
01663  *  to catch references to already-released relcache entries.  It slows
01664  *  things down quite a bit, however.
01665  */
01666 void
01667 RelationClose(Relation relation)
01668 {
01669     /* Note: no locking manipulations needed */
01670     RelationDecrementReferenceCount(relation);
01671 
01672 #ifdef RELCACHE_FORCE_RELEASE
01673     if (RelationHasReferenceCountZero(relation) &&
01674         relation->rd_createSubid == InvalidSubTransactionId &&
01675         relation->rd_newRelfilenodeSubid == InvalidSubTransactionId)
01676         RelationClearRelation(relation, false);
01677 #endif
01678 }
01679 
01680 /*
01681  * RelationReloadIndexInfo - reload minimal information for an open index
01682  *
01683  *  This function is used only for indexes.  A relcache inval on an index
01684  *  can mean that its pg_class or pg_index row changed.  There are only
01685  *  very limited changes that are allowed to an existing index's schema,
01686  *  so we can update the relcache entry without a complete rebuild; which
01687  *  is fortunate because we can't rebuild an index entry that is "nailed"
01688  *  and/or in active use.  We support full replacement of the pg_class row,
01689  *  as well as updates of a few simple fields of the pg_index row.
01690  *
01691  *  We can't necessarily reread the catalog rows right away; we might be
01692  *  in a failed transaction when we receive the SI notification.  If so,
01693  *  RelationClearRelation just marks the entry as invalid by setting
01694  *  rd_isvalid to false.  This routine is called to fix the entry when it
01695  *  is next needed.
01696  *
01697  *  We assume that at the time we are called, we have at least AccessShareLock
01698  *  on the target index.  (Note: in the calls from RelationClearRelation,
01699  *  this is legitimate because we know the rel has positive refcount.)
01700  *
01701  *  If the target index is an index on pg_class or pg_index, we'd better have
01702  *  previously gotten at least AccessShareLock on its underlying catalog,
01703  *  else we are at risk of deadlock against someone trying to exclusive-lock
01704  *  the heap and index in that order.  This is ensured in current usage by
01705  *  only applying this to indexes being opened or having positive refcount.
01706  */
01707 static void
01708 RelationReloadIndexInfo(Relation relation)
01709 {
01710     bool        indexOK;
01711     HeapTuple   pg_class_tuple;
01712     Form_pg_class relp;
01713 
01714     /* Should be called only for invalidated indexes */
01715     Assert(relation->rd_rel->relkind == RELKIND_INDEX &&
01716            !relation->rd_isvalid);
01717     /* Should be closed at smgr level */
01718     Assert(relation->rd_smgr == NULL);
01719 
01720     /* Must free any AM cached data upon relcache flush */
01721     if (relation->rd_amcache)
01722         pfree(relation->rd_amcache);
01723     relation->rd_amcache = NULL;
01724 
01725     /*
01726      * If it's a shared index, we might be called before backend startup has
01727      * finished selecting a database, in which case we have no way to read
01728      * pg_class yet.  However, a shared index can never have any significant
01729      * schema updates, so it's okay to ignore the invalidation signal.  Just
01730      * mark it valid and return without doing anything more.
01731      */
01732     if (relation->rd_rel->relisshared && !criticalRelcachesBuilt)
01733     {
01734         relation->rd_isvalid = true;
01735         return;
01736     }
01737 
01738     /*
01739      * Read the pg_class row
01740      *
01741      * Don't try to use an indexscan of pg_class_oid_index to reload the info
01742      * for pg_class_oid_index ...
01743      */
01744     indexOK = (RelationGetRelid(relation) != ClassOidIndexId);
01745     pg_class_tuple = ScanPgRelation(RelationGetRelid(relation), indexOK);
01746     if (!HeapTupleIsValid(pg_class_tuple))
01747         elog(ERROR, "could not find pg_class tuple for index %u",
01748              RelationGetRelid(relation));
01749     relp = (Form_pg_class) GETSTRUCT(pg_class_tuple);
01750     memcpy(relation->rd_rel, relp, CLASS_TUPLE_SIZE);
01751     /* Reload reloptions in case they changed */
01752     if (relation->rd_options)
01753         pfree(relation->rd_options);
01754     RelationParseRelOptions(relation, pg_class_tuple);
01755     /* done with pg_class tuple */
01756     heap_freetuple(pg_class_tuple);
01757     /* We must recalculate physical address in case it changed */
01758     RelationInitPhysicalAddr(relation);
01759     relation->rd_ispopulated = true;
01760 
01761     /*
01762      * For a non-system index, there are fields of the pg_index row that are
01763      * allowed to change, so re-read that row and update the relcache entry.
01764      * Most of the info derived from pg_index (such as support function lookup
01765      * info) cannot change, and indeed the whole point of this routine is to
01766      * update the relcache entry without clobbering that data; so wholesale
01767      * replacement is not appropriate.
01768      */
01769     if (!IsSystemRelation(relation))
01770     {
01771         HeapTuple   tuple;
01772         Form_pg_index index;
01773 
01774         tuple = SearchSysCache1(INDEXRELID,
01775                                 ObjectIdGetDatum(RelationGetRelid(relation)));
01776         if (!HeapTupleIsValid(tuple))
01777             elog(ERROR, "cache lookup failed for index %u",
01778                  RelationGetRelid(relation));
01779         index = (Form_pg_index) GETSTRUCT(tuple);
01780 
01781         /*
01782          * Basically, let's just copy all the bool fields.  There are one or
01783          * two of these that can't actually change in the current code, but
01784          * it's not worth it to track exactly which ones they are.  None of
01785          * the array fields are allowed to change, though.
01786          */
01787         relation->rd_index->indisunique = index->indisunique;
01788         relation->rd_index->indisprimary = index->indisprimary;
01789         relation->rd_index->indisexclusion = index->indisexclusion;
01790         relation->rd_index->indimmediate = index->indimmediate;
01791         relation->rd_index->indisclustered = index->indisclustered;
01792         relation->rd_index->indisvalid = index->indisvalid;
01793         relation->rd_index->indcheckxmin = index->indcheckxmin;
01794         relation->rd_index->indisready = index->indisready;
01795         relation->rd_index->indislive = index->indislive;
01796 
01797         /* Copy xmin too, as that is needed to make sense of indcheckxmin */
01798         HeapTupleHeaderSetXmin(relation->rd_indextuple->t_data,
01799                                HeapTupleHeaderGetXmin(tuple->t_data));
01800 
01801         ReleaseSysCache(tuple);
01802     }
01803 
01804     /* Okay, now it's valid again */
01805     relation->rd_isvalid = true;
01806 }
01807 
01808 /*
01809  * RelationDestroyRelation
01810  *
01811  *  Physically delete a relation cache entry and all subsidiary data.
01812  *  Caller must already have unhooked the entry from the hash table.
01813  */
01814 static void
01815 RelationDestroyRelation(Relation relation)
01816 {
01817     Assert(RelationHasReferenceCountZero(relation));
01818 
01819     /*
01820      * Make sure smgr and lower levels close the relation's files, if they
01821      * weren't closed already.  (This was probably done by caller, but let's
01822      * just be real sure.)
01823      */
01824     RelationCloseSmgr(relation);
01825 
01826     /*
01827      * Free all the subsidiary data structures of the relcache entry, then the
01828      * entry itself.
01829      */
01830     if (relation->rd_rel)
01831         pfree(relation->rd_rel);
01832     /* can't use DecrTupleDescRefCount here */
01833     Assert(relation->rd_att->tdrefcount > 0);
01834     if (--relation->rd_att->tdrefcount == 0)
01835         FreeTupleDesc(relation->rd_att);
01836     list_free(relation->rd_indexlist);
01837     bms_free(relation->rd_indexattr);
01838     FreeTriggerDesc(relation->trigdesc);
01839     if (relation->rd_options)
01840         pfree(relation->rd_options);
01841     if (relation->rd_indextuple)
01842         pfree(relation->rd_indextuple);
01843     if (relation->rd_am)
01844         pfree(relation->rd_am);
01845     if (relation->rd_indexcxt)
01846         MemoryContextDelete(relation->rd_indexcxt);
01847     if (relation->rd_rulescxt)
01848         MemoryContextDelete(relation->rd_rulescxt);
01849     if (relation->rd_fdwroutine)
01850         pfree(relation->rd_fdwroutine);
01851     pfree(relation);
01852 }
01853 
01854 /*
01855  * RelationClearRelation
01856  *
01857  *   Physically blow away a relation cache entry, or reset it and rebuild
01858  *   it from scratch (that is, from catalog entries).  The latter path is
01859  *   used when we are notified of a change to an open relation (one with
01860  *   refcount > 0).
01861  *
01862  *   NB: when rebuilding, we'd better hold some lock on the relation,
01863  *   else the catalog data we need to read could be changing under us.
01864  *   Also, a rel to be rebuilt had better have refcnt > 0.  This is because
01865  *   an sinval reset could happen while we're accessing the catalogs, and
01866  *   the rel would get blown away underneath us by RelationCacheInvalidate
01867  *   if it has zero refcnt.
01868  *
01869  *   The "rebuild" parameter is redundant in current usage because it has
01870  *   to match the relation's refcnt status, but we keep it as a crosscheck
01871  *   that we're doing what the caller expects.
01872  */
01873 static void
01874 RelationClearRelation(Relation relation, bool rebuild)
01875 {
01876     /*
01877      * As per notes above, a rel to be rebuilt MUST have refcnt > 0; while of
01878      * course it would be a bad idea to blow away one with nonzero refcnt.
01879      */
01880     Assert(rebuild ?
01881            !RelationHasReferenceCountZero(relation) :
01882            RelationHasReferenceCountZero(relation));
01883 
01884     /*
01885      * Make sure smgr and lower levels close the relation's files, if they
01886      * weren't closed already.  If the relation is not getting deleted, the
01887      * next smgr access should reopen the files automatically.  This ensures
01888      * that the low-level file access state is updated after, say, a vacuum
01889      * truncation.
01890      */
01891     RelationCloseSmgr(relation);
01892 
01893     /*
01894      * Never, never ever blow away a nailed-in system relation, because we'd
01895      * be unable to recover.  However, we must redo RelationInitPhysicalAddr
01896      * in case it is a mapped relation whose mapping changed.
01897      *
01898      * If it's a nailed index, then we need to re-read the pg_class row to see
01899      * if its relfilenode changed.  We can't necessarily do that here, because
01900      * we might be in a failed transaction.  We assume it's okay to do it if
01901      * there are open references to the relcache entry (cf notes for
01902      * AtEOXact_RelationCache).  Otherwise just mark the entry as possibly
01903      * invalid, and it'll be fixed when next opened.
01904      */
01905     if (relation->rd_isnailed)
01906     {
01907         RelationInitPhysicalAddr(relation);
01908         if (relation->rd_rel->relkind == RELKIND_MATVIEW &&
01909             heap_is_matview_init_state(relation))
01910             relation->rd_ispopulated = false;
01911         else
01912             relation->rd_ispopulated = true;
01913 
01914         if (relation->rd_rel->relkind == RELKIND_INDEX)
01915         {
01916             relation->rd_isvalid = false;       /* needs to be revalidated */
01917             if (relation->rd_refcnt > 1)
01918                 RelationReloadIndexInfo(relation);
01919         }
01920         return;
01921     }
01922 
01923     /*
01924      * Even non-system indexes should not be blown away if they are open and
01925      * have valid index support information.  This avoids problems with active
01926      * use of the index support information.  As with nailed indexes, we
01927      * re-read the pg_class row to handle possible physical relocation of the
01928      * index, and we check for pg_index updates too.
01929      */
01930     if (relation->rd_rel->relkind == RELKIND_INDEX &&
01931         relation->rd_refcnt > 0 &&
01932         relation->rd_indexcxt != NULL)
01933     {
01934         relation->rd_isvalid = false;   /* needs to be revalidated */
01935         RelationReloadIndexInfo(relation);
01936         return;
01937     }
01938 
01939     /* Mark it invalid until we've finished rebuild */
01940     relation->rd_isvalid = false;
01941 
01942     /*
01943      * If we're really done with the relcache entry, blow it away. But if
01944      * someone is still using it, reconstruct the whole deal without moving
01945      * the physical RelationData record (so that the someone's pointer is
01946      * still valid).
01947      */
01948     if (!rebuild)
01949     {
01950         /* Remove it from the hash table */
01951         RelationCacheDelete(relation);
01952 
01953         /* And release storage */
01954         RelationDestroyRelation(relation);
01955     }
01956     else
01957     {
01958         /*
01959          * Our strategy for rebuilding an open relcache entry is to build a
01960          * new entry from scratch, swap its contents with the old entry, and
01961          * finally delete the new entry (along with any infrastructure swapped
01962          * over from the old entry).  This is to avoid trouble in case an
01963          * error causes us to lose control partway through.  The old entry
01964          * will still be marked !rd_isvalid, so we'll try to rebuild it again
01965          * on next access.  Meanwhile it's not any less valid than it was
01966          * before, so any code that might expect to continue accessing it
01967          * isn't hurt by the rebuild failure.  (Consider for example a
01968          * subtransaction that ALTERs a table and then gets canceled partway
01969          * through the cache entry rebuild.  The outer transaction should
01970          * still see the not-modified cache entry as valid.)  The worst
01971          * consequence of an error is leaking the necessarily-unreferenced new
01972          * entry, and this shouldn't happen often enough for that to be a big
01973          * problem.
01974          *
01975          * When rebuilding an open relcache entry, we must preserve ref count,
01976          * rd_createSubid/rd_newRelfilenodeSubid, and rd_toastoid state.  Also
01977          * attempt to preserve the pg_class entry (rd_rel), tupledesc, and
01978          * rewrite-rule substructures in place, because various places assume
01979          * that these structures won't move while they are working with an
01980          * open relcache entry.  (Note: the refcount mechanism for tupledescs
01981          * might someday allow us to remove this hack for the tupledesc.)
01982          *
01983          * Note that this process does not touch CurrentResourceOwner; which
01984          * is good because whatever ref counts the entry may have do not
01985          * necessarily belong to that resource owner.
01986          */
01987         Relation    newrel;
01988         Oid         save_relid = RelationGetRelid(relation);
01989         bool        keep_tupdesc;
01990         bool        keep_rules;
01991 
01992         /* Build temporary entry, but don't link it into hashtable */
01993         newrel = RelationBuildDesc(save_relid, false);
01994         if (newrel == NULL)
01995         {
01996             /* Should only get here if relation was deleted */
01997             RelationCacheDelete(relation);
01998             RelationDestroyRelation(relation);
01999             elog(ERROR, "relation %u deleted while still in use", save_relid);
02000         }
02001 
02002         keep_tupdesc = equalTupleDescs(relation->rd_att, newrel->rd_att);
02003         keep_rules = equalRuleLocks(relation->rd_rules, newrel->rd_rules);
02004 
02005         /*
02006          * Perform swapping of the relcache entry contents.  Within this
02007          * process the old entry is momentarily invalid, so there *must* be no
02008          * possibility of CHECK_FOR_INTERRUPTS within this sequence. Do it in
02009          * all-in-line code for safety.
02010          *
02011          * Since the vast majority of fields should be swapped, our method is
02012          * to swap the whole structures and then re-swap those few fields we
02013          * didn't want swapped.
02014          */
02015 #define SWAPFIELD(fldtype, fldname) \
02016         do { \
02017             fldtype _tmp = newrel->fldname; \
02018             newrel->fldname = relation->fldname; \
02019             relation->fldname = _tmp; \
02020         } while (0)
02021 
02022         /* swap all Relation struct fields */
02023         {
02024             RelationData tmpstruct;
02025 
02026             memcpy(&tmpstruct, newrel, sizeof(RelationData));
02027             memcpy(newrel, relation, sizeof(RelationData));
02028             memcpy(relation, &tmpstruct, sizeof(RelationData));
02029         }
02030 
02031         /* rd_smgr must not be swapped, due to back-links from smgr level */
02032         SWAPFIELD(SMgrRelation, rd_smgr);
02033         /* rd_refcnt must be preserved */
02034         SWAPFIELD(int, rd_refcnt);
02035         /* isnailed shouldn't change */
02036         Assert(newrel->rd_isnailed == relation->rd_isnailed);
02037         /* creation sub-XIDs must be preserved */
02038         SWAPFIELD(SubTransactionId, rd_createSubid);
02039         SWAPFIELD(SubTransactionId, rd_newRelfilenodeSubid);
02040         /* un-swap rd_rel pointers, swap contents instead */
02041         SWAPFIELD(Form_pg_class, rd_rel);
02042         /* ... but actually, we don't have to update newrel->rd_rel */
02043         memcpy(relation->rd_rel, newrel->rd_rel, CLASS_TUPLE_SIZE);
02044         /* preserve old tupledesc and rules if no logical change */
02045         if (keep_tupdesc)
02046             SWAPFIELD(TupleDesc, rd_att);
02047         if (keep_rules)
02048         {
02049             SWAPFIELD(RuleLock *, rd_rules);
02050             SWAPFIELD(MemoryContext, rd_rulescxt);
02051         }
02052         /* toast OID override must be preserved */
02053         SWAPFIELD(Oid, rd_toastoid);
02054         /* pgstat_info must be preserved */
02055         SWAPFIELD(struct PgStat_TableStatus *, pgstat_info);
02056 
02057 #undef SWAPFIELD
02058 
02059         /* And now we can throw away the temporary entry */
02060         RelationDestroyRelation(newrel);
02061     }
02062 }
02063 
02064 /*
02065  * RelationFlushRelation
02066  *
02067  *   Rebuild the relation if it is open (refcount > 0), else blow it away.
02068  */
02069 static void
02070 RelationFlushRelation(Relation relation)
02071 {
02072     if (relation->rd_createSubid != InvalidSubTransactionId ||
02073         relation->rd_newRelfilenodeSubid != InvalidSubTransactionId)
02074     {
02075         /*
02076          * New relcache entries are always rebuilt, not flushed; else we'd
02077          * forget the "new" status of the relation, which is a useful
02078          * optimization to have.  Ditto for the new-relfilenode status.
02079          *
02080          * The rel could have zero refcnt here, so temporarily increment the
02081          * refcnt to ensure it's safe to rebuild it.  We can assume that the
02082          * current transaction has some lock on the rel already.
02083          */
02084         RelationIncrementReferenceCount(relation);
02085         RelationClearRelation(relation, true);
02086         RelationDecrementReferenceCount(relation);
02087     }
02088     else
02089     {
02090         /*
02091          * Pre-existing rels can be dropped from the relcache if not open.
02092          */
02093         bool        rebuild = !RelationHasReferenceCountZero(relation);
02094 
02095         RelationClearRelation(relation, rebuild);
02096     }
02097 }
02098 
02099 /*
02100  * RelationForgetRelation - unconditionally remove a relcache entry
02101  *
02102  *         External interface for destroying a relcache entry when we
02103  *         drop the relation.
02104  */
02105 void
02106 RelationForgetRelation(Oid rid)
02107 {
02108     Relation    relation;
02109 
02110     RelationIdCacheLookup(rid, relation);
02111 
02112     if (!PointerIsValid(relation))
02113         return;                 /* not in cache, nothing to do */
02114 
02115     if (!RelationHasReferenceCountZero(relation))
02116         elog(ERROR, "relation %u is still open", rid);
02117 
02118     /* Unconditionally destroy the relcache entry */
02119     RelationClearRelation(relation, false);
02120 }
02121 
02122 /*
02123  *      RelationCacheInvalidateEntry
02124  *
02125  *      This routine is invoked for SI cache flush messages.
02126  *
02127  * Any relcache entry matching the relid must be flushed.  (Note: caller has
02128  * already determined that the relid belongs to our database or is a shared
02129  * relation.)
02130  *
02131  * We used to skip local relations, on the grounds that they could
02132  * not be targets of cross-backend SI update messages; but it seems
02133  * safer to process them, so that our *own* SI update messages will
02134  * have the same effects during CommandCounterIncrement for both
02135  * local and nonlocal relations.
02136  */
02137 void
02138 RelationCacheInvalidateEntry(Oid relationId)
02139 {
02140     Relation    relation;
02141 
02142     RelationIdCacheLookup(relationId, relation);
02143 
02144     if (PointerIsValid(relation))
02145     {
02146         relcacheInvalsReceived++;
02147         RelationFlushRelation(relation);
02148     }
02149 }
02150 
02151 /*
02152  * RelationCacheInvalidate
02153  *   Blow away cached relation descriptors that have zero reference counts,
02154  *   and rebuild those with positive reference counts.  Also reset the smgr
02155  *   relation cache and re-read relation mapping data.
02156  *
02157  *   This is currently used only to recover from SI message buffer overflow,
02158  *   so we do not touch new-in-transaction relations; they cannot be targets
02159  *   of cross-backend SI updates (and our own updates now go through a
02160  *   separate linked list that isn't limited by the SI message buffer size).
02161  *   Likewise, we need not discard new-relfilenode-in-transaction hints,
02162  *   since any invalidation of those would be a local event.
02163  *
02164  *   We do this in two phases: the first pass deletes deletable items, and
02165  *   the second one rebuilds the rebuildable items.  This is essential for
02166  *   safety, because hash_seq_search only copes with concurrent deletion of
02167  *   the element it is currently visiting.  If a second SI overflow were to
02168  *   occur while we are walking the table, resulting in recursive entry to
02169  *   this routine, we could crash because the inner invocation blows away
02170  *   the entry next to be visited by the outer scan.  But this way is OK,
02171  *   because (a) during the first pass we won't process any more SI messages,
02172  *   so hash_seq_search will complete safely; (b) during the second pass we
02173  *   only hold onto pointers to nondeletable entries.
02174  *
02175  *   The two-phase approach also makes it easy to update relfilenodes for
02176  *   mapped relations before we do anything else, and to ensure that the
02177  *   second pass processes nailed-in-cache items before other nondeletable
02178  *   items.  This should ensure that system catalogs are up to date before
02179  *   we attempt to use them to reload information about other open relations.
02180  */
02181 void
02182 RelationCacheInvalidate(void)
02183 {
02184     HASH_SEQ_STATUS status;
02185     RelIdCacheEnt *idhentry;
02186     Relation    relation;
02187     List       *rebuildFirstList = NIL;
02188     List       *rebuildList = NIL;
02189     ListCell   *l;
02190 
02191     /*
02192      * Reload relation mapping data before starting to reconstruct cache.
02193      */
02194     RelationMapInvalidateAll();
02195 
02196     /* Phase 1 */
02197     hash_seq_init(&status, RelationIdCache);
02198 
02199     while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
02200     {
02201         relation = idhentry->reldesc;
02202 
02203         /* Must close all smgr references to avoid leaving dangling ptrs */
02204         RelationCloseSmgr(relation);
02205 
02206         /*
02207          * Ignore new relations; no other backend will manipulate them before
02208          * we commit.  Likewise, before replacing a relation's relfilenode, we
02209          * shall have acquired AccessExclusiveLock and drained any applicable
02210          * pending invalidations.
02211          */
02212         if (relation->rd_createSubid != InvalidSubTransactionId ||
02213             relation->rd_newRelfilenodeSubid != InvalidSubTransactionId)
02214             continue;
02215 
02216         relcacheInvalsReceived++;
02217 
02218         if (RelationHasReferenceCountZero(relation))
02219         {
02220             /* Delete this entry immediately */
02221             Assert(!relation->rd_isnailed);
02222             RelationClearRelation(relation, false);
02223         }
02224         else
02225         {
02226             /*
02227              * If it's a mapped relation, immediately update its rd_node in
02228              * case its relfilenode changed.  We must do this during phase 1
02229              * in case the relation is consulted during rebuild of other
02230              * relcache entries in phase 2.  It's safe since consulting the
02231              * map doesn't involve any access to relcache entries.
02232              */
02233             if (RelationIsMapped(relation))
02234                 RelationInitPhysicalAddr(relation);
02235 
02236             /*
02237              * Add this entry to list of stuff to rebuild in second pass.
02238              * pg_class goes to the front of rebuildFirstList while
02239              * pg_class_oid_index goes to the back of rebuildFirstList, so
02240              * they are done first and second respectively.  Other nailed
02241              * relations go to the front of rebuildList, so they'll be done
02242              * next in no particular order; and everything else goes to the
02243              * back of rebuildList.
02244              */
02245             if (RelationGetRelid(relation) == RelationRelationId)
02246                 rebuildFirstList = lcons(relation, rebuildFirstList);
02247             else if (RelationGetRelid(relation) == ClassOidIndexId)
02248                 rebuildFirstList = lappend(rebuildFirstList, relation);
02249             else if (relation->rd_isnailed)
02250                 rebuildList = lcons(relation, rebuildList);
02251             else
02252                 rebuildList = lappend(rebuildList, relation);
02253         }
02254     }
02255 
02256     /*
02257      * Now zap any remaining smgr cache entries.  This must happen before we
02258      * start to rebuild entries, since that may involve catalog fetches which
02259      * will re-open catalog files.
02260      */
02261     smgrcloseall();
02262 
02263     /* Phase 2: rebuild the items found to need rebuild in phase 1 */
02264     foreach(l, rebuildFirstList)
02265     {
02266         relation = (Relation) lfirst(l);
02267         RelationClearRelation(relation, true);
02268     }
02269     list_free(rebuildFirstList);
02270     foreach(l, rebuildList)
02271     {
02272         relation = (Relation) lfirst(l);
02273         RelationClearRelation(relation, true);
02274     }
02275     list_free(rebuildList);
02276 }
02277 
02278 /*
02279  * RelationCloseSmgrByOid - close a relcache entry's smgr link
02280  *
02281  * Needed in some cases where we are changing a relation's physical mapping.
02282  * The link will be automatically reopened on next use.
02283  */
02284 void
02285 RelationCloseSmgrByOid(Oid relationId)
02286 {
02287     Relation    relation;
02288 
02289     RelationIdCacheLookup(relationId, relation);
02290 
02291     if (!PointerIsValid(relation))
02292         return;                 /* not in cache, nothing to do */
02293 
02294     RelationCloseSmgr(relation);
02295 }
02296 
02297 /*
02298  * AtEOXact_RelationCache
02299  *
02300  *  Clean up the relcache at main-transaction commit or abort.
02301  *
02302  * Note: this must be called *before* processing invalidation messages.
02303  * In the case of abort, we don't want to try to rebuild any invalidated
02304  * cache entries (since we can't safely do database accesses).  Therefore
02305  * we must reset refcnts before handling pending invalidations.
02306  *
02307  * As of PostgreSQL 8.1, relcache refcnts should get released by the
02308  * ResourceOwner mechanism.  This routine just does a debugging
02309  * cross-check that no pins remain.  However, we also need to do special
02310  * cleanup when the current transaction created any relations or made use
02311  * of forced index lists.
02312  */
02313 void
02314 AtEOXact_RelationCache(bool isCommit)
02315 {
02316     HASH_SEQ_STATUS status;
02317     RelIdCacheEnt *idhentry;
02318     int         i;
02319 
02320     /*
02321      * Unless the eoxact_list[] overflowed, we only need to examine the rels
02322      * listed in it.  Otherwise fall back on a hash_seq_search scan.
02323      *
02324      * For simplicity, eoxact_list[] entries are not deleted till end of
02325      * top-level transaction, even though we could remove them at
02326      * subtransaction end in some cases, or remove relations from the list if
02327      * they are cleared for other reasons.  Therefore we should expect the
02328      * case that list entries are not found in the hashtable; if not, there's
02329      * nothing to do for them.
02330      */
02331     if (eoxact_list_overflowed)
02332     {
02333         hash_seq_init(&status, RelationIdCache);
02334         while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
02335         {
02336             AtEOXact_cleanup(idhentry->reldesc, isCommit);
02337         }
02338     }
02339     else
02340     {
02341         for (i = 0; i < eoxact_list_len; i++)
02342         {
02343             idhentry = (RelIdCacheEnt *) hash_search(RelationIdCache,
02344                                                      (void *) &eoxact_list[i],
02345                                                      HASH_FIND,
02346                                                      NULL);
02347             if (idhentry != NULL)
02348                 AtEOXact_cleanup(idhentry->reldesc, isCommit);
02349         }
02350     }
02351 
02352     /* Now we're out of the transaction and can clear the list */
02353     eoxact_list_len = 0;
02354     eoxact_list_overflowed = false;
02355 }
02356 
02357 /*
02358  * AtEOXact_cleanup
02359  *
02360  *  Clean up a single rel at main-transaction commit or abort
02361  *
02362  * NB: this processing must be idempotent, because EOXactListAdd() doesn't
02363  * bother to prevent duplicate entries in eoxact_list[].
02364  */
02365 static void
02366 AtEOXact_cleanup(Relation relation, bool isCommit)
02367 {
02368         /*
02369          * The relcache entry's ref count should be back to its normal
02370          * not-in-a-transaction state: 0 unless it's nailed in cache.
02371          *
02372          * In bootstrap mode, this is NOT true, so don't check it --- the
02373          * bootstrap code expects relations to stay open across start/commit
02374          * transaction calls.  (That seems bogus, but it's not worth fixing.)
02375          *
02376          * Note: ideally this check would be applied to every relcache entry,
02377          * not just those that have eoxact work to do.  But it's not worth
02378          * forcing a scan of the whole relcache just for this.  (Moreover,
02379          * doing so would mean that assert-enabled testing never tests the
02380          * hash_search code path above, which seems a bad idea.)
02381          */
02382 #ifdef USE_ASSERT_CHECKING
02383         if (!IsBootstrapProcessingMode())
02384         {
02385             int         expected_refcnt;
02386 
02387             expected_refcnt = relation->rd_isnailed ? 1 : 0;
02388             Assert(relation->rd_refcnt == expected_refcnt);
02389         }
02390 #endif
02391 
02392         /*
02393          * Is it a relation created in the current transaction?
02394          *
02395          * During commit, reset the flag to zero, since we are now out of the
02396          * creating transaction.  During abort, simply delete the relcache
02397          * entry --- it isn't interesting any longer.  (NOTE: if we have
02398          * forgotten the new-ness of a new relation due to a forced cache
02399          * flush, the entry will get deleted anyway by shared-cache-inval
02400          * processing of the aborted pg_class insertion.)
02401          */
02402         if (relation->rd_createSubid != InvalidSubTransactionId)
02403         {
02404             if (isCommit)
02405                 relation->rd_createSubid = InvalidSubTransactionId;
02406             else
02407             {
02408                 RelationClearRelation(relation, false);
02409                 return;
02410             }
02411         }
02412 
02413         /*
02414          * Likewise, reset the hint about the relfilenode being new.
02415          */
02416         relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
02417 
02418         /*
02419          * Flush any temporary index list.
02420          */
02421         if (relation->rd_indexvalid == 2)
02422         {
02423             list_free(relation->rd_indexlist);
02424             relation->rd_indexlist = NIL;
02425             relation->rd_oidindex = InvalidOid;
02426             relation->rd_indexvalid = 0;
02427         }
02428 }
02429 
02430 /*
02431  * AtEOSubXact_RelationCache
02432  *
02433  *  Clean up the relcache at sub-transaction commit or abort.
02434  *
02435  * Note: this must be called *before* processing invalidation messages.
02436  */
02437 void
02438 AtEOSubXact_RelationCache(bool isCommit, SubTransactionId mySubid,
02439                           SubTransactionId parentSubid)
02440 {
02441     HASH_SEQ_STATUS status;
02442     RelIdCacheEnt *idhentry;
02443     int         i;
02444 
02445     /*
02446      * Unless the eoxact_list[] overflowed, we only need to examine the rels
02447      * listed in it.  Otherwise fall back on a hash_seq_search scan.  Same
02448      * logic as in AtEOXact_RelationCache.
02449      */
02450     if (eoxact_list_overflowed)
02451     {
02452         hash_seq_init(&status, RelationIdCache);
02453         while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
02454         {
02455             AtEOSubXact_cleanup(idhentry->reldesc, isCommit,
02456                                 mySubid, parentSubid);
02457         }
02458     }
02459     else
02460     {
02461         for (i = 0; i < eoxact_list_len; i++)
02462         {
02463             idhentry = (RelIdCacheEnt *) hash_search(RelationIdCache,
02464                                                      (void *) &eoxact_list[i],
02465                                                      HASH_FIND,
02466                                                      NULL);
02467             if (idhentry != NULL)
02468                 AtEOSubXact_cleanup(idhentry->reldesc, isCommit,
02469                                     mySubid, parentSubid);
02470         }
02471     }
02472 
02473     /* Don't reset the list; we still need more cleanup later */
02474 }
02475 
02476 /*
02477  * AtEOSubXact_cleanup
02478  *
02479  *  Clean up a single rel at subtransaction commit or abort
02480  *
02481  * NB: this processing must be idempotent, because EOXactListAdd() doesn't
02482  * bother to prevent duplicate entries in eoxact_list[].
02483  */
02484 static void
02485 AtEOSubXact_cleanup(Relation relation, bool isCommit,
02486                     SubTransactionId mySubid, SubTransactionId parentSubid)
02487 {
02488         /*
02489          * Is it a relation created in the current subtransaction?
02490          *
02491          * During subcommit, mark it as belonging to the parent, instead.
02492          * During subabort, simply delete the relcache entry.
02493          */
02494         if (relation->rd_createSubid == mySubid)
02495         {
02496             if (isCommit)
02497                 relation->rd_createSubid = parentSubid;
02498             else
02499             {
02500                 RelationClearRelation(relation, false);
02501                 return;
02502             }
02503         }
02504 
02505         /*
02506          * Likewise, update or drop any new-relfilenode-in-subtransaction
02507          * hint.
02508          */
02509         if (relation->rd_newRelfilenodeSubid == mySubid)
02510         {
02511             if (isCommit)
02512                 relation->rd_newRelfilenodeSubid = parentSubid;
02513             else
02514                 relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
02515         }
02516 
02517         /*
02518          * Flush any temporary index list.
02519          */
02520         if (relation->rd_indexvalid == 2)
02521         {
02522             list_free(relation->rd_indexlist);
02523             relation->rd_indexlist = NIL;
02524             relation->rd_oidindex = InvalidOid;
02525             relation->rd_indexvalid = 0;
02526         }
02527 }
02528 
02529 
02530 /*
02531  *      RelationBuildLocalRelation
02532  *          Build a relcache entry for an about-to-be-created relation,
02533  *          and enter it into the relcache.
02534  */
02535 Relation
02536 RelationBuildLocalRelation(const char *relname,
02537                            Oid relnamespace,
02538                            TupleDesc tupDesc,
02539                            Oid relid,
02540                            Oid relfilenode,
02541                            Oid reltablespace,
02542                            bool shared_relation,
02543                            bool mapped_relation,
02544                            char relpersistence,
02545                            char relkind)
02546 {
02547     Relation    rel;
02548     MemoryContext oldcxt;
02549     int         natts = tupDesc->natts;
02550     int         i;
02551     bool        has_not_null;
02552     bool        nailit;
02553 
02554     AssertArg(natts >= 0);
02555 
02556     /*
02557      * check for creation of a rel that must be nailed in cache.
02558      *
02559      * XXX this list had better match the relations specially handled in
02560      * RelationCacheInitializePhase2/3.
02561      */
02562     switch (relid)
02563     {
02564         case DatabaseRelationId:
02565         case AuthIdRelationId:
02566         case AuthMemRelationId:
02567         case RelationRelationId:
02568         case AttributeRelationId:
02569         case ProcedureRelationId:
02570         case TypeRelationId:
02571             nailit = true;
02572             break;
02573         default:
02574             nailit = false;
02575             break;
02576     }
02577 
02578     /*
02579      * check that hardwired list of shared rels matches what's in the
02580      * bootstrap .bki file.  If you get a failure here during initdb, you
02581      * probably need to fix IsSharedRelation() to match whatever you've done
02582      * to the set of shared relations.
02583      */
02584     if (shared_relation != IsSharedRelation(relid))
02585         elog(ERROR, "shared_relation flag for \"%s\" does not match IsSharedRelation(%u)",
02586              relname, relid);
02587 
02588     /* Shared relations had better be mapped, too */
02589     Assert(mapped_relation || !shared_relation);
02590 
02591     /*
02592      * switch to the cache context to create the relcache entry.
02593      */
02594     if (!CacheMemoryContext)
02595         CreateCacheMemoryContext();
02596 
02597     oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
02598 
02599     /*
02600      * allocate a new relation descriptor and fill in basic state fields.
02601      */
02602     rel = (Relation) palloc0(sizeof(RelationData));
02603 
02604     /* make sure relation is marked as having no open file yet */
02605     rel->rd_smgr = NULL;
02606 
02607     /* mark it nailed if appropriate */
02608     rel->rd_isnailed = nailit;
02609 
02610     rel->rd_refcnt = nailit ? 1 : 0;
02611 
02612     /* it's being created in this transaction */
02613     rel->rd_createSubid = GetCurrentSubTransactionId();
02614     rel->rd_newRelfilenodeSubid = InvalidSubTransactionId;
02615 
02616     /*
02617      * create a new tuple descriptor from the one passed in.  We do this
02618      * partly to copy it into the cache context, and partly because the new
02619      * relation can't have any defaults or constraints yet; they have to be
02620      * added in later steps, because they require additions to multiple system
02621      * catalogs.  We can copy attnotnull constraints here, however.
02622      */
02623     rel->rd_att = CreateTupleDescCopy(tupDesc);
02624     rel->rd_att->tdrefcount = 1;    /* mark as refcounted */
02625     has_not_null = false;
02626     for (i = 0; i < natts; i++)
02627     {
02628         rel->rd_att->attrs[i]->attnotnull = tupDesc->attrs[i]->attnotnull;
02629         has_not_null |= tupDesc->attrs[i]->attnotnull;
02630     }
02631 
02632     if (has_not_null)
02633     {
02634         TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));
02635 
02636         constr->has_not_null = true;
02637         rel->rd_att->constr = constr;
02638     }
02639 
02640     /*
02641      * initialize relation tuple form (caller may add/override data later)
02642      */
02643     rel->rd_rel = (Form_pg_class) palloc0(CLASS_TUPLE_SIZE);
02644 
02645     namestrcpy(&rel->rd_rel->relname, relname);
02646     rel->rd_rel->relnamespace = relnamespace;
02647 
02648     rel->rd_rel->relkind = relkind;
02649     rel->rd_rel->relhasoids = rel->rd_att->tdhasoid;
02650     rel->rd_rel->relnatts = natts;
02651     rel->rd_rel->reltype = InvalidOid;
02652     /* needed when bootstrapping: */
02653     rel->rd_rel->relowner = BOOTSTRAP_SUPERUSERID;
02654 
02655     /* set up persistence and relcache fields dependent on it */
02656     rel->rd_rel->relpersistence = relpersistence;
02657     switch (relpersistence)
02658     {
02659         case RELPERSISTENCE_UNLOGGED:
02660         case RELPERSISTENCE_PERMANENT:
02661             rel->rd_backend = InvalidBackendId;
02662             rel->rd_islocaltemp = false;
02663             break;
02664         case RELPERSISTENCE_TEMP:
02665             Assert(isTempOrToastNamespace(relnamespace));
02666             rel->rd_backend = MyBackendId;
02667             rel->rd_islocaltemp = true;
02668             break;
02669         default:
02670             elog(ERROR, "invalid relpersistence: %c", relpersistence);
02671             break;
02672     }
02673 
02674     /*
02675      * Insert relation physical and logical identifiers (OIDs) into the right
02676      * places.  For a mapped relation, we set relfilenode to zero and rely on
02677      * RelationInitPhysicalAddr to consult the map.
02678      */
02679     rel->rd_rel->relisshared = shared_relation;
02680 
02681     RelationGetRelid(rel) = relid;
02682 
02683     for (i = 0; i < natts; i++)
02684         rel->rd_att->attrs[i]->attrelid = relid;
02685 
02686     rel->rd_rel->reltablespace = reltablespace;
02687 
02688     if (mapped_relation)
02689     {
02690         rel->rd_rel->relfilenode = InvalidOid;
02691         /* Add it to the active mapping information */
02692         RelationMapUpdateMap(relid, relfilenode, shared_relation, true);
02693     }
02694     else
02695         rel->rd_rel->relfilenode = relfilenode;
02696 
02697     RelationInitLockInfo(rel);  /* see lmgr.c */
02698 
02699     RelationInitPhysicalAddr(rel);
02700 
02701     /* materialized view not initially scannable */
02702     if (relkind == RELKIND_MATVIEW)
02703         rel->rd_ispopulated = false;
02704     else
02705         rel->rd_ispopulated = true;
02706 
02707     /*
02708      * Okay to insert into the relcache hash tables.
02709      */
02710     RelationCacheInsert(rel);
02711 
02712     /*
02713      * Flag relation as needing eoxact cleanup (to clear rd_createSubid).
02714      * We can't do this before storing relid in it.
02715      */
02716     EOXactListAdd(rel);
02717 
02718     /*
02719      * done building relcache entry.
02720      */
02721     MemoryContextSwitchTo(oldcxt);
02722 
02723     /* It's fully valid */
02724     rel->rd_isvalid = true;
02725 
02726     /*
02727      * Caller expects us to pin the returned entry.
02728      */
02729     RelationIncrementReferenceCount(rel);
02730 
02731     return rel;
02732 }
02733 
02734 
02735 /*
02736  * RelationSetNewRelfilenode
02737  *
02738  * Assign a new relfilenode (physical file name) to the relation.
02739  *
02740  * This allows a full rewrite of the relation to be done with transactional
02741  * safety (since the filenode assignment can be rolled back).  Note however
02742  * that there is no simple way to access the relation's old data for the
02743  * remainder of the current transaction.  This limits the usefulness to cases
02744  * such as TRUNCATE or rebuilding an index from scratch.
02745  *
02746  * Caller must already hold exclusive lock on the relation.
02747  *
02748  * The relation is marked with relfrozenxid = freezeXid (InvalidTransactionId
02749  * must be passed for indexes and sequences).  This should be a lower bound on
02750  * the XIDs that will be put into the new relation contents.
02751  */
02752 void
02753 RelationSetNewRelfilenode(Relation relation, TransactionId freezeXid,
02754                           MultiXactId minmulti)
02755 {
02756     Oid         newrelfilenode;
02757     RelFileNodeBackend newrnode;
02758     Relation    pg_class;
02759     HeapTuple   tuple;
02760     Form_pg_class classform;
02761 
02762     /* Indexes, sequences must have Invalid frozenxid; other rels must not */
02763     Assert((relation->rd_rel->relkind == RELKIND_INDEX ||
02764             relation->rd_rel->relkind == RELKIND_SEQUENCE) ?
02765            freezeXid == InvalidTransactionId :
02766            TransactionIdIsNormal(freezeXid));
02767     Assert(TransactionIdIsNormal(freezeXid) == MultiXactIdIsValid(minmulti));
02768 
02769     /* Allocate a new relfilenode */
02770     newrelfilenode = GetNewRelFileNode(relation->rd_rel->reltablespace, NULL,
02771                                        relation->rd_rel->relpersistence);
02772 
02773     /*
02774      * Get a writable copy of the pg_class tuple for the given relation.
02775      */
02776     pg_class = heap_open(RelationRelationId, RowExclusiveLock);
02777 
02778     tuple = SearchSysCacheCopy1(RELOID,
02779                                 ObjectIdGetDatum(RelationGetRelid(relation)));
02780     if (!HeapTupleIsValid(tuple))
02781         elog(ERROR, "could not find tuple for relation %u",
02782              RelationGetRelid(relation));
02783     classform = (Form_pg_class) GETSTRUCT(tuple);
02784 
02785     /*
02786      * Create storage for the main fork of the new relfilenode.
02787      *
02788      * NOTE: any conflict in relfilenode value will be caught here, if
02789      * GetNewRelFileNode messes up for any reason.
02790      */
02791     newrnode.node = relation->rd_node;
02792     newrnode.node.relNode = newrelfilenode;
02793     newrnode.backend = relation->rd_backend;
02794     RelationCreateStorage(newrnode.node, relation->rd_rel->relpersistence);
02795     smgrclosenode(newrnode);
02796 
02797     /*
02798      * Schedule unlinking of the old storage at transaction commit.
02799      */
02800     RelationDropStorage(relation);
02801 
02802     /*
02803      * Now update the pg_class row.  However, if we're dealing with a mapped
02804      * index, pg_class.relfilenode doesn't change; instead we have to send the
02805      * update to the relation mapper.
02806      */
02807     if (RelationIsMapped(relation))
02808         RelationMapUpdateMap(RelationGetRelid(relation),
02809                              newrelfilenode,
02810                              relation->rd_rel->relisshared,
02811                              false);
02812     else
02813         classform->relfilenode = newrelfilenode;
02814 
02815     /* These changes are safe even for a mapped relation */
02816     if (relation->rd_rel->relkind != RELKIND_SEQUENCE)
02817     {
02818         classform->relpages = 0;    /* it's empty until further notice */
02819         classform->reltuples = 0;
02820         classform->relallvisible = 0;
02821     }
02822     classform->relfrozenxid = freezeXid;
02823     classform->relminmxid = minmulti;
02824 
02825     simple_heap_update(pg_class, &tuple->t_self, tuple);
02826     CatalogUpdateIndexes(pg_class, tuple);
02827 
02828     heap_freetuple(tuple);
02829 
02830     heap_close(pg_class, RowExclusiveLock);
02831 
02832     /*
02833      * Make the pg_class row change visible, as well as the relation map
02834      * change if any.  This will cause the relcache entry to get updated, too.
02835      */
02836     CommandCounterIncrement();
02837 
02838     /*
02839      * Mark the rel as having been given a new relfilenode in the current
02840      * (sub) transaction.  This is a hint that can be used to optimize later
02841      * operations on the rel in the same transaction.
02842      */
02843     relation->rd_newRelfilenodeSubid = GetCurrentSubTransactionId();
02844 
02845     /* Flag relation as needing eoxact cleanup (to remove the hint) */
02846     EOXactListAdd(relation);
02847 }
02848 
02849 
02850 /*
02851  *      RelationCacheInitialize
02852  *
02853  *      This initializes the relation descriptor cache.  At the time
02854  *      that this is invoked, we can't do database access yet (mainly
02855  *      because the transaction subsystem is not up); all we are doing
02856  *      is making an empty cache hashtable.  This must be done before
02857  *      starting the initialization transaction, because otherwise
02858  *      AtEOXact_RelationCache would crash if that transaction aborts
02859  *      before we can get the relcache set up.
02860  */
02861 
02862 #define INITRELCACHESIZE        400
02863 
02864 void
02865 RelationCacheInitialize(void)
02866 {
02867     HASHCTL     ctl;
02868 
02869     /*
02870      * make sure cache memory context exists
02871      */
02872     if (!CacheMemoryContext)
02873         CreateCacheMemoryContext();
02874 
02875     /*
02876      * create hashtable that indexes the relcache
02877      */
02878     MemSet(&ctl, 0, sizeof(ctl));
02879     ctl.keysize = sizeof(Oid);
02880     ctl.entrysize = sizeof(RelIdCacheEnt);
02881     ctl.hash = oid_hash;
02882     RelationIdCache = hash_create("Relcache by OID", INITRELCACHESIZE,
02883                                   &ctl, HASH_ELEM | HASH_FUNCTION);
02884 
02885     /*
02886      * relation mapper needs to be initialized too
02887      */
02888     RelationMapInitialize();
02889 }
02890 
02891 /*
02892  *      RelationCacheInitializePhase2
02893  *
02894  *      This is called to prepare for access to shared catalogs during startup.
02895  *      We must at least set up nailed reldescs for pg_database, pg_authid,
02896  *      and pg_auth_members.  Ideally we'd like to have reldescs for their
02897  *      indexes, too.  We attempt to load this information from the shared
02898  *      relcache init file.  If that's missing or broken, just make phony
02899  *      entries for the catalogs themselves.  RelationCacheInitializePhase3
02900  *      will clean up as needed.
02901  */
02902 void
02903 RelationCacheInitializePhase2(void)
02904 {
02905     MemoryContext oldcxt;
02906 
02907     /*
02908      * relation mapper needs initialized too
02909      */
02910     RelationMapInitializePhase2();
02911 
02912     /*
02913      * In bootstrap mode, the shared catalogs aren't there yet anyway, so do
02914      * nothing.
02915      */
02916     if (IsBootstrapProcessingMode())
02917         return;
02918 
02919     /*
02920      * switch to cache memory context
02921      */
02922     oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
02923 
02924     /*
02925      * Try to load the shared relcache cache file.  If unsuccessful, bootstrap
02926      * the cache with pre-made descriptors for the critical shared catalogs.
02927      */
02928     if (!load_relcache_init_file(true))
02929     {
02930         formrdesc("pg_database", DatabaseRelation_Rowtype_Id, true,
02931                   true, Natts_pg_database, Desc_pg_database);
02932         formrdesc("pg_authid", AuthIdRelation_Rowtype_Id, true,
02933                   true, Natts_pg_authid, Desc_pg_authid);
02934         formrdesc("pg_auth_members", AuthMemRelation_Rowtype_Id, true,
02935                   false, Natts_pg_auth_members, Desc_pg_auth_members);
02936 
02937 #define NUM_CRITICAL_SHARED_RELS    3   /* fix if you change list above */
02938     }
02939 
02940     MemoryContextSwitchTo(oldcxt);
02941 }
02942 
02943 /*
02944  *      RelationCacheInitializePhase3
02945  *
02946  *      This is called as soon as the catcache and transaction system
02947  *      are functional and we have determined MyDatabaseId.  At this point
02948  *      we can actually read data from the database's system catalogs.
02949  *      We first try to read pre-computed relcache entries from the local
02950  *      relcache init file.  If that's missing or broken, make phony entries
02951  *      for the minimum set of nailed-in-cache relations.  Then (unless
02952  *      bootstrapping) make sure we have entries for the critical system
02953  *      indexes.  Once we've done all this, we have enough infrastructure to
02954  *      open any system catalog or use any catcache.  The last step is to
02955  *      rewrite the cache files if needed.
02956  */
02957 void
02958 RelationCacheInitializePhase3(void)
02959 {
02960     HASH_SEQ_STATUS status;
02961     RelIdCacheEnt *idhentry;
02962     MemoryContext oldcxt;
02963     bool        needNewCacheFile = !criticalSharedRelcachesBuilt;
02964 
02965     /*
02966      * relation mapper needs initialized too
02967      */
02968     RelationMapInitializePhase3();
02969 
02970     /*
02971      * switch to cache memory context
02972      */
02973     oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
02974 
02975     /*
02976      * Try to load the local relcache cache file.  If unsuccessful, bootstrap
02977      * the cache with pre-made descriptors for the critical "nailed-in" system
02978      * catalogs.
02979      */
02980     if (IsBootstrapProcessingMode() ||
02981         !load_relcache_init_file(false))
02982     {
02983         needNewCacheFile = true;
02984 
02985         formrdesc("pg_class", RelationRelation_Rowtype_Id, false,
02986                   true, Natts_pg_class, Desc_pg_class);
02987         formrdesc("pg_attribute", AttributeRelation_Rowtype_Id, false,
02988                   false, Natts_pg_attribute, Desc_pg_attribute);
02989         formrdesc("pg_proc", ProcedureRelation_Rowtype_Id, false,
02990                   true, Natts_pg_proc, Desc_pg_proc);
02991         formrdesc("pg_type", TypeRelation_Rowtype_Id, false,
02992                   true, Natts_pg_type, Desc_pg_type);
02993 
02994 #define NUM_CRITICAL_LOCAL_RELS 4       /* fix if you change list above */
02995     }
02996 
02997     MemoryContextSwitchTo(oldcxt);
02998 
02999     /* In bootstrap mode, the faked-up formrdesc info is all we'll have */
03000     if (IsBootstrapProcessingMode())
03001         return;
03002 
03003     /*
03004      * If we didn't get the critical system indexes loaded into relcache, do
03005      * so now.  These are critical because the catcache and/or opclass cache
03006      * depend on them for fetches done during relcache load.  Thus, we have an
03007      * infinite-recursion problem.  We can break the recursion by doing
03008      * heapscans instead of indexscans at certain key spots. To avoid hobbling
03009      * performance, we only want to do that until we have the critical indexes
03010      * loaded into relcache.  Thus, the flag criticalRelcachesBuilt is used to
03011      * decide whether to do heapscan or indexscan at the key spots, and we set
03012      * it true after we've loaded the critical indexes.
03013      *
03014      * The critical indexes are marked as "nailed in cache", partly to make it
03015      * easy for load_relcache_init_file to count them, but mainly because we
03016      * cannot flush and rebuild them once we've set criticalRelcachesBuilt to
03017      * true.  (NOTE: perhaps it would be possible to reload them by
03018      * temporarily setting criticalRelcachesBuilt to false again.  For now,
03019      * though, we just nail 'em in.)
03020      *
03021      * RewriteRelRulenameIndexId and TriggerRelidNameIndexId are not critical
03022      * in the same way as the others, because the critical catalogs don't
03023      * (currently) have any rules or triggers, and so these indexes can be
03024      * rebuilt without inducing recursion.  However they are used during
03025      * relcache load when a rel does have rules or triggers, so we choose to
03026      * nail them for performance reasons.
03027      */
03028     if (!criticalRelcachesBuilt)
03029     {
03030         load_critical_index(ClassOidIndexId,
03031                             RelationRelationId);
03032         load_critical_index(AttributeRelidNumIndexId,
03033                             AttributeRelationId);
03034         load_critical_index(IndexRelidIndexId,
03035                             IndexRelationId);
03036         load_critical_index(OpclassOidIndexId,
03037                             OperatorClassRelationId);
03038         load_critical_index(AccessMethodProcedureIndexId,
03039                             AccessMethodProcedureRelationId);
03040         load_critical_index(RewriteRelRulenameIndexId,
03041                             RewriteRelationId);
03042         load_critical_index(TriggerRelidNameIndexId,
03043                             TriggerRelationId);
03044 
03045 #define NUM_CRITICAL_LOCAL_INDEXES  7   /* fix if you change list above */
03046 
03047         criticalRelcachesBuilt = true;
03048     }
03049 
03050     /*
03051      * Process critical shared indexes too.
03052      *
03053      * DatabaseNameIndexId isn't critical for relcache loading, but rather for
03054      * initial lookup of MyDatabaseId, without which we'll never find any
03055      * non-shared catalogs at all.  Autovacuum calls InitPostgres with a
03056      * database OID, so it instead depends on DatabaseOidIndexId.  We also
03057      * need to nail up some indexes on pg_authid and pg_auth_members for use
03058      * during client authentication.
03059      */
03060     if (!criticalSharedRelcachesBuilt)
03061     {
03062         load_critical_index(DatabaseNameIndexId,
03063                             DatabaseRelationId);
03064         load_critical_index(DatabaseOidIndexId,
03065                             DatabaseRelationId);
03066         load_critical_index(AuthIdRolnameIndexId,
03067                             AuthIdRelationId);
03068         load_critical_index(AuthIdOidIndexId,
03069                             AuthIdRelationId);
03070         load_critical_index(AuthMemMemRoleIndexId,
03071                             AuthMemRelationId);
03072 
03073 #define NUM_CRITICAL_SHARED_INDEXES 5   /* fix if you change list above */
03074 
03075         criticalSharedRelcachesBuilt = true;
03076     }
03077 
03078     /*
03079      * Now, scan all the relcache entries and update anything that might be
03080      * wrong in the results from formrdesc or the relcache cache file. If we
03081      * faked up relcache entries using formrdesc, then read the real pg_class
03082      * rows and replace the fake entries with them. Also, if any of the
03083      * relcache entries have rules or triggers, load that info the hard way
03084      * since it isn't recorded in the cache file.
03085      *
03086      * Whenever we access the catalogs to read data, there is a possibility of
03087      * a shared-inval cache flush causing relcache entries to be removed.
03088      * Since hash_seq_search only guarantees to still work after the *current*
03089      * entry is removed, it's unsafe to continue the hashtable scan afterward.
03090      * We handle this by restarting the scan from scratch after each access.
03091      * This is theoretically O(N^2), but the number of entries that actually
03092      * need to be fixed is small enough that it doesn't matter.
03093      */
03094     hash_seq_init(&status, RelationIdCache);
03095 
03096     while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
03097     {
03098         Relation    relation = idhentry->reldesc;
03099         bool        restart = false;
03100 
03101         /*
03102          * Make sure *this* entry doesn't get flushed while we work with it.
03103          */
03104         RelationIncrementReferenceCount(relation);
03105 
03106         /*
03107          * If it's a faked-up entry, read the real pg_class tuple.
03108          */
03109         if (relation->rd_rel->relowner == InvalidOid)
03110         {
03111             HeapTuple   htup;
03112             Form_pg_class relp;
03113 
03114             htup = SearchSysCache1(RELOID,
03115                                ObjectIdGetDatum(RelationGetRelid(relation)));
03116             if (!HeapTupleIsValid(htup))
03117                 elog(FATAL, "cache lookup failed for relation %u",
03118                      RelationGetRelid(relation));
03119             relp = (Form_pg_class) GETSTRUCT(htup);
03120 
03121             /*
03122              * Copy tuple to relation->rd_rel. (See notes in
03123              * AllocateRelationDesc())
03124              */
03125             memcpy((char *) relation->rd_rel, (char *) relp, CLASS_TUPLE_SIZE);
03126 
03127             /* Update rd_options while we have the tuple */
03128             if (relation->rd_options)
03129                 pfree(relation->rd_options);
03130             RelationParseRelOptions(relation, htup);
03131 
03132             /*
03133              * Check the values in rd_att were set up correctly.  (We cannot
03134              * just copy them over now: formrdesc must have set up the rd_att
03135              * data correctly to start with, because it may already have been
03136              * copied into one or more catcache entries.)
03137              */
03138             Assert(relation->rd_att->tdtypeid == relp->reltype);
03139             Assert(relation->rd_att->tdtypmod == -1);
03140             Assert(relation->rd_att->tdhasoid == relp->relhasoids);
03141 
03142             ReleaseSysCache(htup);
03143 
03144             /* relowner had better be OK now, else we'll loop forever */
03145             if (relation->rd_rel->relowner == InvalidOid)
03146                 elog(ERROR, "invalid relowner in pg_class entry for \"%s\"",
03147                      RelationGetRelationName(relation));
03148 
03149             restart = true;
03150         }
03151 
03152         /*
03153          * Fix data that isn't saved in relcache cache file.
03154          *
03155          * relhasrules or relhastriggers could possibly be wrong or out of
03156          * date.  If we don't actually find any rules or triggers, clear the
03157          * local copy of the flag so that we don't get into an infinite loop
03158          * here.  We don't make any attempt to fix the pg_class entry, though.
03159          */
03160         if (relation->rd_rel->relhasrules && relation->rd_rules == NULL)
03161         {
03162             RelationBuildRuleLock(relation);
03163             if (relation->rd_rules == NULL)
03164                 relation->rd_rel->relhasrules = false;
03165             restart = true;
03166         }
03167         if (relation->rd_rel->relhastriggers && relation->trigdesc == NULL)
03168         {
03169             RelationBuildTriggers(relation);
03170             if (relation->trigdesc == NULL)
03171                 relation->rd_rel->relhastriggers = false;
03172             restart = true;
03173         }
03174 
03175         /* Release hold on the relation */
03176         RelationDecrementReferenceCount(relation);
03177 
03178         /* Now, restart the hashtable scan if needed */
03179         if (restart)
03180         {
03181             hash_seq_term(&status);
03182             hash_seq_init(&status, RelationIdCache);
03183         }
03184     }
03185 
03186     /*
03187      * Lastly, write out new relcache cache files if needed.  We don't bother
03188      * to distinguish cases where only one of the two needs an update.
03189      */
03190     if (needNewCacheFile)
03191     {
03192         /*
03193          * Force all the catcaches to finish initializing and thereby open the
03194          * catalogs and indexes they use.  This will preload the relcache with
03195          * entries for all the most important system catalogs and indexes, so
03196          * that the init files will be most useful for future backends.
03197          */
03198         InitCatalogCachePhase2();
03199 
03200         /* reset initFileRelationIds list; we'll fill it during write */
03201         initFileRelationIds = NIL;
03202 
03203         /* now write the files */
03204         write_relcache_init_file(true);
03205         write_relcache_init_file(false);
03206     }
03207 }
03208 
03209 /*
03210  * Load one critical system index into the relcache
03211  *
03212  * indexoid is the OID of the target index, heapoid is the OID of the catalog
03213  * it belongs to.
       *
       * The descriptor is built with RelationBuildDesc() while AccessShareLock is
       * held on both the catalog and the index.  The resulting entry is marked
       * nailed and given a reference count of 1.  Both locks are released again
       * before returning.
       *
       * There is no return value.  If the descriptor cannot be built, the
       * failure is reported at PANIC level.
03214  */
03215 static void
03216 load_critical_index(Oid indexoid, Oid heapoid)
03217 {
03218     Relation    ird;
03219 
03220     /*
03221      * We must lock the underlying catalog before locking the index to avoid
03222      * deadlock, since RelationBuildDesc might well need to read the catalog,
03223      * and if anyone else is exclusive-locking this catalog and index they'll
03224      * be doing it in that order.
03225      */
03226     LockRelationOid(heapoid, AccessShareLock);
03227     LockRelationOid(indexoid, AccessShareLock);
          /*
           * NOTE(review): the "true" argument presumably asks RelationBuildDesc
           * to enter the new descriptor into the relcache -- confirm against its
           * definition, which is outside this excerpt.
           */
03228     ird = RelationBuildDesc(indexoid, true);
03229     if (ird == NULL)
03230         elog(PANIC, "could not open critical system index %u", indexoid);
          /* Nail the entry and give it its single reference count */
03231     ird->rd_isnailed = true;
03232     ird->rd_refcnt = 1;
          /* Release the locks in the reverse of the order they were taken */
03233     UnlockRelationOid(indexoid, AccessShareLock);
03234     UnlockRelationOid(heapoid, AccessShareLock);
03235 }
03236 
03237 /*
03238  * GetPgClassDescriptor -- get a predefined tuple descriptor for pg_class
03239  * GetPgIndexDescriptor -- get a predefined tuple descriptor for pg_index
03240  *
03241  * We need this kluge because we have to be able to access non-fixed-width
03242  * fields of pg_class and pg_index before we have the standard catalog caches
03243  * available.  We use predefined data that's set up in just the same way as
03244  * the bootstrapped reldescs used by formrdesc().  The resulting tupdesc is
03245  * not 100% kosher: it does not have the correct rowtype OID in tdtypeid, nor
03246  * does it have a TupleConstr field.  But it's good enough for the purpose of
03247  * extracting fields.
03248  */
03249 static TupleDesc
03250 BuildHardcodedDescriptor(int natts, const FormData_pg_attribute *attrs,
03251                          bool hasoids)
03252 {
03253     TupleDesc   result;
03254     MemoryContext oldcxt;
03255     int         i;
03256 
03257     oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
03258 
03259     result = CreateTemplateTupleDesc(natts, hasoids);
03260     result->tdtypeid = RECORDOID;       /* not right, but we don't care */
03261     result->tdtypmod = -1;
03262 
03263     for (i = 0; i < natts; i++)
03264     {
03265         memcpy(result->attrs[i], &attrs[i], ATTRIBUTE_FIXED_PART_SIZE);
03266         /* make sure attcacheoff is valid */
03267         result->attrs[i]->attcacheoff = -1;
03268     }
03269 
03270     /* initialize first attribute's attcacheoff, cf RelationBuildTupleDesc */
03271     result->attrs[0]->attcacheoff = 0;
03272 
03273     /* Note: we don't bother to set up a TupleConstr entry */
03274 
03275     MemoryContextSwitchTo(oldcxt);
03276 
03277     return result;
03278 }
03279 
03280 static TupleDesc
03281 GetPgClassDescriptor(void)
03282 {
03283     static TupleDesc pgclassdesc = NULL;
03284 
03285     /* Already done? */
03286     if (pgclassdesc == NULL)
03287         pgclassdesc = BuildHardcodedDescriptor(Natts_pg_class,
03288                                                Desc_pg_class,
03289                                                true);
03290 
03291     return pgclassdesc;
03292 }
03293 
03294 static TupleDesc
03295 GetPgIndexDescriptor(void)
03296 {
03297     static TupleDesc pgindexdesc = NULL;
03298 
03299     /* Already done? */
03300     if (pgindexdesc == NULL)
03301         pgindexdesc = BuildHardcodedDescriptor(Natts_pg_index,
03302                                                Desc_pg_index,
03303                                                false);
03304 
03305     return pgindexdesc;
03306 }
03307 
03308 /*
03309  * Load any default attribute value definitions for the relation.
03310  */
03311 static void
03312 AttrDefaultFetch(Relation relation)
03313 {
03314     AttrDefault *attrdef = relation->rd_att->constr->defval;
03315     int         ndef = relation->rd_att->constr->num_defval;
03316     Relation    adrel;
03317     SysScanDesc adscan;
03318     ScanKeyData skey;
03319     HeapTuple   htup;
03320     Datum       val;
03321     bool        isnull;
03322     int         found;
03323     int         i;
03324 
03325     ScanKeyInit(&skey,
03326                 Anum_pg_attrdef_adrelid,
03327                 BTEqualStrategyNumber, F_OIDEQ,
03328                 ObjectIdGetDatum(RelationGetRelid(relation)));
03329 
03330     adrel = heap_open(AttrDefaultRelationId, AccessShareLock);
03331     adscan = systable_beginscan(adrel, AttrDefaultIndexId, true,
03332                                 SnapshotNow, 1, &skey);
03333     found = 0;
03334 
03335     while (HeapTupleIsValid(htup = systable_getnext(adscan)))
03336     {
03337         Form_pg_attrdef adform = (Form_pg_attrdef) GETSTRUCT(htup);
03338 
03339         for (i = 0; i < ndef; i++)
03340         {
03341             if (adform->adnum != attrdef[i].adnum)
03342                 continue;
03343             if (attrdef[i].adbin != NULL)
03344                 elog(WARNING, "multiple attrdef records found for attr %s of rel %s",
03345                 NameStr(relation->rd_att->attrs[adform->adnum - 1]->attname),
03346                      RelationGetRelationName(relation));
03347             else
03348                 found++;
03349 
03350             val = fastgetattr(htup,
03351                               Anum_pg_attrdef_adbin,
03352                               adrel->rd_att, &isnull);
03353             if (isnull)
03354                 elog(WARNING, "null adbin for attr %s of rel %s",
03355                 NameStr(relation->rd_att->attrs[adform->adnum - 1]->attname),
03356                      RelationGetRelationName(relation));
03357             else
03358                 attrdef[i].adbin = MemoryContextStrdup(CacheMemoryContext,
03359                                                    TextDatumGetCString(val));
03360             break;
03361         }
03362 
03363         if (i >= ndef)
03364             elog(WARNING, "unexpected attrdef record found for attr %d of rel %s",
03365                  adform->adnum, RelationGetRelationName(relation));
03366     }
03367 
03368     systable_endscan(adscan);
03369     heap_close(adrel, AccessShareLock);
03370 
03371     if (found != ndef)
03372         elog(WARNING, "%d attrdef record(s) missing for rel %s",
03373              ndef - found, RelationGetRelationName(relation));
03374 }
03375 
03376 /*
03377  * Load any check constraints for the relation.
03378  */
03379 static void
03380 CheckConstraintFetch(Relation relation)
03381 {
03382     ConstrCheck *check = relation->rd_att->constr->check;
03383     int         ncheck = relation->rd_att->constr->num_check;
03384     Relation    conrel;
03385     SysScanDesc conscan;
03386     ScanKeyData skey[1];
03387     HeapTuple   htup;
03388     Datum       val;
03389     bool        isnull;
03390     int         found = 0;
03391 
03392     ScanKeyInit(&skey[0],
03393                 Anum_pg_constraint_conrelid,
03394                 BTEqualStrategyNumber, F_OIDEQ,
03395                 ObjectIdGetDatum(RelationGetRelid(relation)));
03396 
03397     conrel = heap_open(ConstraintRelationId, AccessShareLock);
03398     conscan = systable_beginscan(conrel, ConstraintRelidIndexId, true,
03399                                  SnapshotNow, 1, skey);
03400 
03401     while (HeapTupleIsValid(htup = systable_getnext(conscan)))
03402     {
03403         Form_pg_constraint conform = (Form_pg_constraint) GETSTRUCT(htup);
03404 
03405         /* We want check constraints only */
03406         if (conform->contype != CONSTRAINT_CHECK)
03407             continue;
03408 
03409         if (found >= ncheck)
03410             elog(ERROR, "unexpected constraint record found for rel %s",
03411                  RelationGetRelationName(relation));
03412 
03413         check[found].ccvalid = conform->convalidated;
03414         check[found].ccnoinherit = conform->connoinherit;
03415         check[found].ccname = MemoryContextStrdup(CacheMemoryContext,
03416                                                   NameStr(conform->conname));
03417 
03418         /* Grab and test conbin is actually set */
03419         val = fastgetattr(htup,
03420                           Anum_pg_constraint_conbin,
03421                           conrel->rd_att, &isnull);
03422         if (isnull)
03423             elog(ERROR, "null conbin for rel %s",
03424                  RelationGetRelationName(relation));
03425 
03426         check[found].ccbin = MemoryContextStrdup(CacheMemoryContext,
03427                                                  TextDatumGetCString(val));
03428         found++;
03429     }
03430 
03431     systable_endscan(conscan);
03432     heap_close(conrel, AccessShareLock);
03433 
03434     if (found != ncheck)
03435         elog(ERROR, "%d constraint record(s) missing for rel %s",
03436              ncheck - found, RelationGetRelationName(relation));
03437 }
03438 
03439 /*
03440  * RelationGetIndexList -- get a list of OIDs of indexes on this relation
03441  *
03442  * The index list is created only if someone requests it.  We scan pg_index
03443  * to find relevant indexes, and add the list to the relcache entry so that
03444  * we won't have to compute it again.  Note that shared cache inval of a
03445  * relcache entry will delete the old list and set rd_indexvalid to 0,
03446  * so that we must recompute the index list on next request.  This handles
03447  * creation or deletion of an index.
03448  *
03449  * Indexes that are marked not IndexIsLive are omitted from the returned list.
03450  * Such indexes are expected to be dropped momentarily, and should not be
03451  * touched at all by any caller of this function.
03452  *
03453  * The returned list is guaranteed to be sorted in order by OID.  This is
03454  * needed by the executor, since for index types that we obtain exclusive
03455  * locks on when updating the index, all backends must lock the indexes in
03456  * the same order or we will get deadlocks (see ExecOpenIndices()).  Any
03457  * consistent ordering would do, but ordering by OID is easy.
03458  *
03459  * Since shared cache inval causes the relcache's copy of the list to go away,
03460  * we return a copy of the list palloc'd in the caller's context.  The caller
03461  * may list_free() the returned list after scanning it. This is necessary
03462  * since the caller will typically be doing syscache lookups on the relevant
03463  * indexes, and syscache lookup could cause SI messages to be processed!
03464  *
03465  * We also update rd_oidindex, which this module treats as effectively part
03466  * of the index list.  rd_oidindex is valid when rd_indexvalid isn't zero;
03467  * it is the pg_class OID of a unique index on OID when the relation has one,
03468  * and InvalidOid if there is no such index.
03469  */
03470 List *
03471 RelationGetIndexList(Relation relation)
03472 {
03473     Relation    indrel;
03474     SysScanDesc indscan;
03475     ScanKeyData skey;
03476     HeapTuple   htup;
03477     List       *result;
03478     Oid         oidIndex;
03479     MemoryContext oldcxt;
03480 
03481     /* Quick exit if we already computed the list. */
03482     if (relation->rd_indexvalid != 0)
03483         return list_copy(relation->rd_indexlist);
03484 
03485     /*
03486      * We build the list we intend to return (in the caller's context) while
03487      * doing the scan.  After successfully completing the scan, we copy that
03488      * list into the relcache entry.  This avoids cache-context memory leakage
03489      * if we get some sort of error partway through.
03490      */
03491     result = NIL;
03492     oidIndex = InvalidOid;
03493 
03494     /* Prepare to scan pg_index for entries having indrelid = this rel. */
03495     ScanKeyInit(&skey,
03496                 Anum_pg_index_indrelid,
03497                 BTEqualStrategyNumber, F_OIDEQ,
03498                 ObjectIdGetDatum(RelationGetRelid(relation)));
03499 
03500     indrel = heap_open(IndexRelationId, AccessShareLock);
03501     indscan = systable_beginscan(indrel, IndexIndrelidIndexId, true,
03502                                  SnapshotNow, 1, &skey);
03503 
03504     while (HeapTupleIsValid(htup = systable_getnext(indscan)))
03505     {
03506         Form_pg_index index = (Form_pg_index) GETSTRUCT(htup);
03507         Datum       indclassDatum;
03508         oidvector  *indclass;
03509         bool        isnull;
03510 
03511         /*
03512          * Ignore any indexes that are currently being dropped.  This will
03513          * prevent them from being searched, inserted into, or considered in
03514          * HOT-safety decisions.  It's unsafe to touch such an index at all
03515          * since its catalog entries could disappear at any instant.
03516          */
03517         if (!IndexIsLive(index))
03518             continue;
03519 
03520         /* Add index's OID to result list in the proper order */
03521         result = insert_ordered_oid(result, index->indexrelid);
03522 
03523         /*
03524          * indclass cannot be referenced directly through the C struct,
03525          * because it comes after the variable-width indkey field.  Must
03526          * extract the datum the hard way...
03527          */
03528         indclassDatum = heap_getattr(htup,
03529                                      Anum_pg_index_indclass,
03530                                      GetPgIndexDescriptor(),
03531                                      &isnull);
03532         Assert(!isnull);
03533         indclass = (oidvector *) DatumGetPointer(indclassDatum);
03534 
03535         /* Check to see if it is a unique, non-partial btree index on OID */
03536         if (IndexIsValid(index) &&
03537             index->indnatts == 1 &&
03538             index->indisunique && index->indimmediate &&
03539             index->indkey.values[0] == ObjectIdAttributeNumber &&
03540             indclass->values[0] == OID_BTREE_OPS_OID &&
03541             heap_attisnull(htup, Anum_pg_index_indpred))
03542             oidIndex = index->indexrelid;
03543     }
03544 
03545     systable_endscan(indscan);
03546     heap_close(indrel, AccessShareLock);
03547 
03548     /* Now save a copy of the completed list in the relcache entry. */
03549     oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
03550     relation->rd_indexlist = list_copy(result);
03551     relation->rd_oidindex = oidIndex;
03552     relation->rd_indexvalid = 1;
03553     MemoryContextSwitchTo(oldcxt);
03554 
03555     return result;
03556 }
03557 
03558 /*
03559  * insert_ordered_oid
03560  *      Insert a new Oid into a sorted list of Oids, preserving ordering
03561  *
03562  * Building the ordered list this way is O(N^2), but with a pretty small
03563  * constant, so for the number of entries we expect it will probably be
03564  * faster than trying to apply qsort().  Most tables don't have very many
03565  * indexes...
03566  */
03567 static List *
03568 insert_ordered_oid(List *list, Oid datum)
03569 {
03570     ListCell   *prev;
03571 
03572     /* Does the datum belong at the front? */
03573     if (list == NIL || datum < linitial_oid(list))
03574         return lcons_oid(datum, list);
03575     /* No, so find the entry it belongs after */
03576     prev = list_head(list);
03577     for (;;)
03578     {
03579         ListCell   *curr = lnext(prev);
03580 
03581         if (curr == NULL || datum < lfirst_oid(curr))
03582             break;              /* it belongs after 'prev', before 'curr' */
03583 
03584         prev = curr;
03585     }
03586     /* Insert datum into list after 'prev' */
03587     lappend_cell_oid(list, prev, datum);
03588     return list;
03589 }
03590 
03591 /*
03592  * RelationSetIndexList -- externally force the index list contents
03593  *
03594  * This is used to temporarily override what we think the set of valid
03595  * indexes is (including the presence or absence of an OID index).
03596  * The forcing will be valid only until transaction commit or abort.
03597  *
03598  * This should only be applied to nailed relations, because in a non-nailed
03599  * relation the hacked index list could be lost at any time due to SI
03600  * messages.  In practice it is only used on pg_class (see REINDEX).
03601  *
03602  * It is up to the caller to make sure the given list is correctly ordered.
03603  *
03604  * We deliberately do not change rd_indexattr here: even when operating
03605  * with a temporary partial index list, HOT-update decisions must be made
03606  * correctly with respect to the full index set.  It is up to the caller
03607  * to ensure that a correct rd_indexattr set has been cached before first
03608  * calling RelationSetIndexList; else a subsequent inquiry might cause a
03609  * wrong rd_indexattr set to get computed and cached.
       *
       * Parameters:
       *  relation - relcache entry to modify; asserted to be nailed
       *  indexIds - index OID list to install; it is copied into
       *             CacheMemoryContext, so the caller's list itself is not
       *             retained by the relcache entry
       *  oidIndex - value to store in rd_oidindex
       *
       * On return rd_indexvalid is 2, which marks the list as forced.
03610  */
03611 void
03612 RelationSetIndexList(Relation relation, List *indexIds, Oid oidIndex)
03613 {
03614     MemoryContext oldcxt;
03615 
03616     Assert(relation->rd_isnailed);
03617     /* Copy the list into the cache context (could fail for lack of mem) */
          /* The copy is made before the old list is freed just below */
03618     oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
03619     indexIds = list_copy(indexIds);
03620     MemoryContextSwitchTo(oldcxt);
03621     /* Okay to replace old list */
03622     list_free(relation->rd_indexlist);
03623     relation->rd_indexlist = indexIds;
03624     relation->rd_oidindex = oidIndex;
03625     relation->rd_indexvalid = 2;    /* mark list as forced */
03626     /* Flag relation as needing eoxact cleanup (to reset the list) */
03627     EOXactListAdd(relation);
03628 }
03629 
03630 /*
03631  * RelationGetOidIndex -- get the pg_class OID of the relation's OID index
03632  *
03633  * Returns InvalidOid if there is no such index.
03634  */
03635 Oid
03636 RelationGetOidIndex(Relation relation)
03637 {
03638     List       *ilist;
03639 
03640     /*
03641      * If relation doesn't have OIDs at all, caller is probably confused. (We
03642      * could just silently return InvalidOid, but it seems better to throw an
03643      * assertion.)
03644      */
03645     Assert(relation->rd_rel->relhasoids);
03646 
03647     if (relation->rd_indexvalid == 0)
03648     {
03649         /* RelationGetIndexList does the heavy lifting. */
03650         ilist = RelationGetIndexList(relation);
03651         list_free(ilist);
03652         Assert(relation->rd_indexvalid != 0);
03653     }
03654 
03655     return relation->rd_oidindex;
03656 }
03657 
03658 /*
03659  * RelationGetIndexExpressions -- get the index expressions for an index
03660  *
03661  * We cache the result of transforming pg_index.indexprs into a node tree.
03662  * If the rel is not an index or has no expressional columns, we return NIL.
03663  * Otherwise, the returned tree is copied into the caller's memory context.
03664  * (We don't want to return a pointer to the relcache copy, since it could
03665  * disappear due to relcache invalidation.)
03666  */
03667 List *
03668 RelationGetIndexExpressions(Relation relation)
03669 {
03670     List       *result;
03671     Datum       exprsDatum;
03672     bool        isnull;
03673     char       *exprsString;
03674     MemoryContext oldcxt;
03675 
03676     /* Quick exit if we already computed the result. */
03677     if (relation->rd_indexprs)
03678         return (List *) copyObject(relation->rd_indexprs);
03679 
03680     /* Quick exit if there is nothing to do. */
03681     if (relation->rd_indextuple == NULL ||
03682         heap_attisnull(relation->rd_indextuple, Anum_pg_index_indexprs))
03683         return NIL;
03684 
03685     /*
03686      * We build the tree we intend to return in the caller's context. After
03687      * successfully completing the work, we copy it into the relcache entry.
03688      * This avoids problems if we get some sort of error partway through.
03689      */
03690     exprsDatum = heap_getattr(relation->rd_indextuple,
03691                               Anum_pg_index_indexprs,
03692                               GetPgIndexDescriptor(),
03693                               &isnull);
03694     Assert(!isnull);
03695     exprsString = TextDatumGetCString(exprsDatum);
03696     result = (List *) stringToNode(exprsString);
03697     pfree(exprsString);
03698 
03699     /*
03700      * Run the expressions through eval_const_expressions. This is not just an
03701      * optimization, but is necessary, because the planner will be comparing
03702      * them to similarly-processed qual clauses, and may fail to detect valid
03703      * matches without this.  We don't bother with canonicalize_qual, however.
03704      */
03705     result = (List *) eval_const_expressions(NULL, (Node *) result);
03706 
03707     /* May as well fix opfuncids too */
03708     fix_opfuncids((Node *) result);
03709 
03710     /* Now save a copy of the completed tree in the relcache entry. */
03711     oldcxt = MemoryContextSwitchTo(relation->rd_indexcxt);
03712     relation->rd_indexprs = (List *) copyObject(result);
03713     MemoryContextSwitchTo(oldcxt);
03714 
03715     return result;
03716 }
03717 
03718 /*
03719  * RelationGetIndexPredicate -- get the index predicate for an index
03720  *
03721  * We cache the result of transforming pg_index.indpred into an implicit-AND
03722  * node tree (suitable for ExecQual).
03723  * If the rel is not an index or has no predicate, we return NIL.
03724  * Otherwise, the returned tree is copied into the caller's memory context.
03725  * (We don't want to return a pointer to the relcache copy, since it could
03726  * disappear due to relcache invalidation.)
03727  */
03728 List *
03729 RelationGetIndexPredicate(Relation relation)
03730 {
03731     List       *result;
03732     Datum       predDatum;
03733     bool        isnull;
03734     char       *predString;
03735     MemoryContext oldcxt;
03736 
03737     /* Quick exit if we already computed the result. */
03738     if (relation->rd_indpred)
03739         return (List *) copyObject(relation->rd_indpred);
03740 
03741     /* Quick exit if there is nothing to do. */
03742     if (relation->rd_indextuple == NULL ||
03743         heap_attisnull(relation->rd_indextuple, Anum_pg_index_indpred))
03744         return NIL;
03745 
03746     /*
03747      * We build the tree we intend to return in the caller's context. After
03748      * successfully completing the work, we copy it into the relcache entry.
03749      * This avoids problems if we get some sort of error partway through.
03750      */
03751     predDatum = heap_getattr(relation->rd_indextuple,
03752                              Anum_pg_index_indpred,
03753                              GetPgIndexDescriptor(),
03754                              &isnull);
03755     Assert(!isnull);
03756     predString = TextDatumGetCString(predDatum);
03757     result = (List *) stringToNode(predString);
03758     pfree(predString);
03759 
03760     /*
03761      * Run the expression through const-simplification and canonicalization.
03762      * This is not just an optimization, but is necessary, because the planner
03763      * will be comparing it to similarly-processed qual clauses, and may fail
03764      * to detect valid matches without this.  This must match the processing
03765      * done to qual clauses in preprocess_expression()!  (We can skip the
03766      * stuff involving subqueries, however, since we don't allow any in index
03767      * predicates.)
03768      */
03769     result = (List *) eval_const_expressions(NULL, (Node *) result);
03770 
03771     result = (List *) canonicalize_qual((Expr *) result);
03772 
03773     /* Also convert to implicit-AND format */
03774     result = make_ands_implicit((Expr *) result);
03775 
03776     /* May as well fix opfuncids too */
03777     fix_opfuncids((Node *) result);
03778 
03779     /* Now save a copy of the completed tree in the relcache entry. */
03780     oldcxt = MemoryContextSwitchTo(relation->rd_indexcxt);
03781     relation->rd_indpred = (List *) copyObject(result);
03782     MemoryContextSwitchTo(oldcxt);
03783 
03784     return result;
03785 }
03786 
03787 /*
03788  * RelationGetIndexAttrBitmap -- get a bitmap of index attribute numbers
03789  *
03790  * The result has a bit set for each attribute used anywhere in the index
03791  * definitions of all the indexes on this relation.  (This includes not only
03792  * simple index keys, but attributes used in expressions and partial-index
03793  * predicates.)
03794  *
03795  * If "keyAttrs" is true, only attributes that can be referenced by foreign
03796  * keys are considered.
03797  *
03798  * Attribute numbers are offset by FirstLowInvalidHeapAttributeNumber so that
03799  * we can include system attributes (e.g., OID) in the bitmap representation.
03800  *
03801  * Caller had better hold at least RowExclusiveLock on the target relation
03802  * to ensure that it has a stable set of indexes.  This also makes it safe
03803  * (deadlock-free) for us to take locks on the relation's indexes.
03804  *
03805  * The returned result is palloc'd in the caller's memory context and should
03806  * be bms_free'd when not needed anymore.
03807  */
03808 Bitmapset *
03809 RelationGetIndexAttrBitmap(Relation relation, bool keyAttrs)
03810 {
03811     Bitmapset  *indexattrs;
03812     Bitmapset  *uindexattrs;
03813     List       *indexoidlist;
03814     ListCell   *l;
03815     MemoryContext oldcxt;
03816 
03817     /* Quick exit if we already computed the result. */
03818     if (relation->rd_indexattr != NULL)
03819         return bms_copy(keyAttrs ? relation->rd_keyattr : relation->rd_indexattr);
03820 
03821     /* Fast path if definitely no indexes */
03822     if (!RelationGetForm(relation)->relhasindex)
03823         return NULL;
03824 
03825     /*
03826      * Get cached list of index OIDs
03827      */
03828     indexoidlist = RelationGetIndexList(relation);
03829 
03830     /* Fall out if no indexes (but relhasindex was set) */
03831     if (indexoidlist == NIL)
03832         return NULL;
03833 
03834     /*
03835      * For each index, add referenced attributes to indexattrs.
03836      *
03837      * Note: we consider all indexes returned by RelationGetIndexList, even if
03838      * they are not indisready or indisvalid.  This is important because an
03839      * index for which CREATE INDEX CONCURRENTLY has just started must be
03840      * included in HOT-safety decisions (see README.HOT).  If a DROP INDEX
03841      * CONCURRENTLY is far enough along that we should ignore the index, it
03842      * won't be returned at all by RelationGetIndexList.
03843      */
03844     indexattrs = NULL;
03845     uindexattrs = NULL;
03846     foreach(l, indexoidlist)
03847     {
03848         Oid         indexOid = lfirst_oid(l);
03849         Relation    indexDesc;
03850         IndexInfo  *indexInfo;
03851         int         i;
03852         bool        isKey;
03853 
03854         indexDesc = index_open(indexOid, AccessShareLock);
03855 
03856         /* Extract index key information from the index's pg_index row */
03857         indexInfo = BuildIndexInfo(indexDesc);
03858 
03859         /* Can this index be referenced by a foreign key? */
03860         isKey = indexInfo->ii_Unique &&
03861                 indexInfo->ii_Expressions == NIL &&
03862                 indexInfo->ii_Predicate == NIL;
03863 
03864         /* Collect simple attribute references */
03865         for (i = 0; i < indexInfo->ii_NumIndexAttrs; i++)
03866         {
03867             int         attrnum = indexInfo->ii_KeyAttrNumbers[i];
03868 
03869             if (attrnum != 0)
03870             {
03871                 indexattrs = bms_add_member(indexattrs,
03872                                attrnum - FirstLowInvalidHeapAttributeNumber);
03873                 if (isKey)
03874                     uindexattrs = bms_add_member(uindexattrs,
03875                                                  attrnum - FirstLowInvalidHeapAttributeNumber);
03876             }
03877         }
03878 
03879         /* Collect all attributes used in expressions, too */
03880         pull_varattnos((Node *) indexInfo->ii_Expressions, 1, &indexattrs);
03881 
03882         /* Collect all attributes in the index predicate, too */
03883         pull_varattnos((Node *) indexInfo->ii_Predicate, 1, &indexattrs);
03884 
03885         index_close(indexDesc, AccessShareLock);
03886     }
03887 
03888     list_free(indexoidlist);
03889 
03890     /* Now save a copy of the bitmap in the relcache entry. */
03891     oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
03892     relation->rd_indexattr = bms_copy(indexattrs);
03893     relation->rd_keyattr = bms_copy(uindexattrs);
03894     MemoryContextSwitchTo(oldcxt);
03895 
03896     /* We return our original working copy for caller to play with */
03897     return keyAttrs ? uindexattrs : indexattrs;
03898 }
03899 
03900 /*
03901  * RelationGetExclusionInfo -- get info about index's exclusion constraint
03902  *
03903  * This should be called only for an index that is known to have an
03904  * associated exclusion constraint.  It returns arrays (palloc'd in caller's
03905  * context) of the exclusion operator OIDs, their underlying functions'
03906  * OIDs, and their strategy numbers in the index's opclasses.  We cache
03907  * all this information since it requires a fair amount of work to get.
03908  */
03909 void
03910 RelationGetExclusionInfo(Relation indexRelation,
03911                          Oid **operators,
03912                          Oid **procs,
03913                          uint16 **strategies)
03914 {
03915     int         ncols = indexRelation->rd_rel->relnatts;
03916     Oid        *ops;
03917     Oid        *funcs;
03918     uint16     *strats;
03919     Relation    conrel;
03920     SysScanDesc conscan;
03921     ScanKeyData skey[1];
03922     HeapTuple   htup;
03923     bool        found;
03924     MemoryContext oldcxt;
03925     int         i;
03926 
03927     /* Allocate result space in caller context */
03928     *operators = ops = (Oid *) palloc(sizeof(Oid) * ncols);
03929     *procs = funcs = (Oid *) palloc(sizeof(Oid) * ncols);
03930     *strategies = strats = (uint16 *) palloc(sizeof(uint16) * ncols);
03931 
03932     /* Quick exit if we have the data cached already */
03933     if (indexRelation->rd_exclstrats != NULL)
03934     {
03935         memcpy(ops, indexRelation->rd_exclops, sizeof(Oid) * ncols);
03936         memcpy(funcs, indexRelation->rd_exclprocs, sizeof(Oid) * ncols);
03937         memcpy(strats, indexRelation->rd_exclstrats, sizeof(uint16) * ncols);
03938         return;
03939     }
03940 
03941     /*
03942      * Search pg_constraint for the constraint associated with the index. To
03943      * make this not too painfully slow, we use the index on conrelid; that
03944      * will hold the parent relation's OID not the index's own OID.
03945      */
03946     ScanKeyInit(&skey[0],
03947                 Anum_pg_constraint_conrelid,
03948                 BTEqualStrategyNumber, F_OIDEQ,
03949                 ObjectIdGetDatum(indexRelation->rd_index->indrelid));
03950 
03951     conrel = heap_open(ConstraintRelationId, AccessShareLock);
03952     conscan = systable_beginscan(conrel, ConstraintRelidIndexId, true,
03953                                  SnapshotNow, 1, skey);
03954     found = false;
03955 
03956     while (HeapTupleIsValid(htup = systable_getnext(conscan)))
03957     {
03958         Form_pg_constraint conform = (Form_pg_constraint) GETSTRUCT(htup);
03959         Datum       val;
03960         bool        isnull;
03961         ArrayType  *arr;
03962         int         nelem;
03963 
03964         /* We want the exclusion constraint owning the index */
03965         if (conform->contype != CONSTRAINT_EXCLUSION ||
03966             conform->conindid != RelationGetRelid(indexRelation))
03967             continue;
03968 
03969         /* There should be only one */
03970         if (found)
03971             elog(ERROR, "unexpected exclusion constraint record found for rel %s",
03972                  RelationGetRelationName(indexRelation));
03973         found = true;
03974 
03975         /* Extract the operator OIDS from conexclop */
03976         val = fastgetattr(htup,
03977                           Anum_pg_constraint_conexclop,
03978                           conrel->rd_att, &isnull);
03979         if (isnull)
03980             elog(ERROR, "null conexclop for rel %s",
03981                  RelationGetRelationName(indexRelation));
03982 
03983         arr = DatumGetArrayTypeP(val);  /* ensure not toasted */
03984         nelem = ARR_DIMS(arr)[0];
03985         if (ARR_NDIM(arr) != 1 ||
03986             nelem != ncols ||
03987             ARR_HASNULL(arr) ||
03988             ARR_ELEMTYPE(arr) != OIDOID)
03989             elog(ERROR, "conexclop is not a 1-D Oid array");
03990 
03991         memcpy(ops, ARR_DATA_PTR(arr), sizeof(Oid) * ncols);
03992     }
03993 
03994     systable_endscan(conscan);
03995     heap_close(conrel, AccessShareLock);
03996 
03997     if (!found)
03998         elog(ERROR, "exclusion constraint record missing for rel %s",
03999              RelationGetRelationName(indexRelation));
04000 
04001     /* We need the func OIDs and strategy numbers too */
04002     for (i = 0; i < ncols; i++)
04003     {
04004         funcs[i] = get_opcode(ops[i]);
04005         strats[i] = get_op_opfamily_strategy(ops[i],
04006                                              indexRelation->rd_opfamily[i]);
04007         /* shouldn't fail, since it was checked at index creation */
04008         if (strats[i] == InvalidStrategy)
04009             elog(ERROR, "could not find strategy for operator %u in family %u",
04010                  ops[i], indexRelation->rd_opfamily[i]);
04011     }
04012 
04013     /* Save a copy of the results in the relcache entry. */
04014     oldcxt = MemoryContextSwitchTo(indexRelation->rd_indexcxt);
04015     indexRelation->rd_exclops = (Oid *) palloc(sizeof(Oid) * ncols);
04016     indexRelation->rd_exclprocs = (Oid *) palloc(sizeof(Oid) * ncols);
04017     indexRelation->rd_exclstrats = (uint16 *) palloc(sizeof(uint16) * ncols);
04018     memcpy(indexRelation->rd_exclops, ops, sizeof(Oid) * ncols);
04019     memcpy(indexRelation->rd_exclprocs, funcs, sizeof(Oid) * ncols);
04020     memcpy(indexRelation->rd_exclstrats, strats, sizeof(uint16) * ncols);
04021     MemoryContextSwitchTo(oldcxt);
04022 }
04023 
04024 
04025 /*
04026  * Routines to support ereport() reports of relation-related errors
04027  *
04028  * These could have been put into elog.c, but it seems like a module layering
04029  * violation to have elog.c calling relcache or syscache stuff --- and we
04030  * definitely don't want elog.h including rel.h.  So we put them here.
04031  */
04032 
04033 /*
04034  * errtable --- stores schema_name and table_name of a table
04035  * within the current errordata.
04036  */
04037 int
04038 errtable(Relation rel)
04039 {
04040     err_generic_string(PG_DIAG_SCHEMA_NAME,
04041                        get_namespace_name(RelationGetNamespace(rel)));
04042     err_generic_string(PG_DIAG_TABLE_NAME, RelationGetRelationName(rel));
04043 
04044     return 0;           /* return value does not matter */
04045 }
04046 
04047 /*
04048  * errtablecol --- stores schema_name, table_name and column_name
04049  * of a table column within the current errordata.
04050  *
04051  * The column is specified by attribute number --- for most callers, this is
04052  * easier and less error-prone than getting the column name for themselves.
04053  */
04054 int
04055 errtablecol(Relation rel, int attnum)
04056 {
04057     TupleDesc   reldesc = RelationGetDescr(rel);
04058     const char *colname;
04059 
04060     /* Use reldesc if it's a user attribute, else consult the catalogs */
04061     if (attnum > 0 && attnum <= reldesc->natts)
04062         colname = NameStr(reldesc->attrs[attnum - 1]->attname);
04063     else
04064         colname = get_relid_attribute_name(RelationGetRelid(rel), attnum);
04065 
04066     return errtablecolname(rel, colname);
04067 }
04068 
04069 /*
04070  * errtablecolname --- stores schema_name, table_name and column_name
04071  * of a table column within the current errordata, where the column name is
04072  * given directly rather than extracted from the relation's catalog data.
04073  *
04074  * Don't use this directly unless errtablecol() is inconvenient for some
04075  * reason.  This might possibly be needed during intermediate states in ALTER
04076  * TABLE, for instance.
04077  */
04078 int
04079 errtablecolname(Relation rel, const char *colname)
04080 {
04081     errtable(rel);
04082     err_generic_string(PG_DIAG_COLUMN_NAME, colname);
04083 
04084     return 0;           /* return value does not matter */
04085 }
04086 
04087 /*
04088  * errtableconstraint --- stores schema_name, table_name and constraint_name
04089  * of a table-related constraint within the current errordata.
04090  */
04091 int
04092 errtableconstraint(Relation rel, const char *conname)
04093 {
04094     errtable(rel);
04095     err_generic_string(PG_DIAG_CONSTRAINT_NAME, conname);
04096 
04097     return 0;           /* return value does not matter */
04098 }
04099 
04100 
04101 /*
04102  *  load_relcache_init_file, write_relcache_init_file
04103  *
04104  *      In late 1992, we started regularly having databases with more than
04105  *      a thousand classes in them.  With this number of classes, it became
04106  *      critical to do indexed lookups on the system catalogs.
04107  *
04108  *      Bootstrapping these lookups is very hard.  We want to be able to
04109  *      use an index on pg_attribute, for example, but in order to do so,
04110  *      we must have read pg_attribute for the attributes in the index,
04111  *      which implies that we need to use the index.
04112  *
04113  *      In order to get around the problem, we do the following:
04114  *
04115  *         +  When the database system is initialized (at initdb time), we
04116  *            don't use indexes.  We do sequential scans.
04117  *
04118  *         +  When the backend is started up in normal mode, we load an image
04119  *            of the appropriate relation descriptors, in internal format,
04120  *            from an initialization file in the data/base/... directory.
04121  *
04122  *         +  If the initialization file isn't there, then we create the
04123  *            relation descriptors using sequential scans and write 'em to
04124  *            the initialization file for use by subsequent backends.
04125  *
04126  *      As of Postgres 9.0, there is one local initialization file in each
04127  *      database, plus one shared initialization file for shared catalogs.
04128  *
04129  *      We could dispense with the initialization files and just build the
04130  *      critical reldescs the hard way on every backend startup, but that
04131  *      slows down backend startup noticeably.
04132  *
04133  *      We can in fact go further, and save more relcache entries than
04134  *      just the ones that are absolutely critical; this allows us to speed
04135  *      up backend startup by not having to build such entries the hard way.
04136  *      Presently, all the catalog and index entries that are referred to
04137  *      by catcaches are stored in the initialization files.
04138  *
04139  *      The same mechanism that detects when catcache and relcache entries
04140  *      need to be invalidated (due to catalog updates) also arranges to
04141  *      unlink the initialization files when the contents may be out of date.
04142  *      The files will then be rebuilt during the next backend startup.
04143  */
04144 
04145 /*
04146  * load_relcache_init_file -- attempt to load cache from the shared
04147  * or local cache init file
04148  *
04149  * If successful, return TRUE and set criticalRelcachesBuilt or
04150  * criticalSharedRelcachesBuilt to true.
04151  * If not successful, return FALSE.
04152  *
04153  * NOTE: we assume we are already switched into CacheMemoryContext.
04154  */
04155 static bool
04156 load_relcache_init_file(bool shared)
04157 {
04158     FILE       *fp;
04159     char        initfilename[MAXPGPATH];
04160     Relation   *rels;
04161     int         relno,
04162                 num_rels,
04163                 max_rels,
04164                 nailed_rels,
04165                 nailed_indexes,
04166                 magic;
04167     int         i;
04168 
04169     if (shared)
04170         snprintf(initfilename, sizeof(initfilename), "global/%s",
04171                  RELCACHE_INIT_FILENAME);
04172     else
04173         snprintf(initfilename, sizeof(initfilename), "%s/%s",
04174                  DatabasePath, RELCACHE_INIT_FILENAME);
04175 
04176     fp = AllocateFile(initfilename, PG_BINARY_R);
04177     if (fp == NULL)
04178         return false;
04179 
04180     /*
04181      * Read the index relcache entries from the file.  Note we will not enter
04182      * any of them into the cache if the read fails partway through; this
04183      * helps to guard against broken init files.
04184      */
04185     max_rels = 100;
04186     rels = (Relation *) palloc(max_rels * sizeof(Relation));
04187     num_rels = 0;
04188     nailed_rels = nailed_indexes = 0;
04189 
04190     /* check for correct magic number (compatible version) */
04191     if (fread(&magic, 1, sizeof(magic), fp) != sizeof(magic))
04192         goto read_failed;
04193     if (magic != RELCACHE_INIT_FILEMAGIC)
04194         goto read_failed;
04195 
04196     for (relno = 0;; relno++)
04197     {
04198         Size        len;
04199         size_t      nread;
04200         Relation    rel;
04201         Form_pg_class relform;
04202         bool        has_not_null;
04203 
04204         /* first read the relation descriptor length */
04205         nread = fread(&len, 1, sizeof(len), fp);
04206         if (nread != sizeof(len))
04207         {
04208             if (nread == 0)
04209                 break;          /* end of file */
04210             goto read_failed;
04211         }
04212 
04213         /* safety check for incompatible relcache layout */
04214         if (len != sizeof(RelationData))
04215             goto read_failed;
04216 
04217         /* allocate another relcache header */
04218         if (num_rels >= max_rels)
04219         {
04220             max_rels *= 2;
04221             rels = (Relation *) repalloc(rels, max_rels * sizeof(Relation));
04222         }
04223 
04224         rel = rels[num_rels++] = (Relation) palloc(len);
04225 
04226         /* then, read the Relation structure */
04227         if (fread(rel, 1, len, fp) != len)
04228             goto read_failed;
04229 
04230         /* next read the relation tuple form */
04231         if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
04232             goto read_failed;
04233 
04234         relform = (Form_pg_class) palloc(len);
04235         if (fread(relform, 1, len, fp) != len)
04236             goto read_failed;
04237 
04238         rel->rd_rel = relform;
04239 
04240         /* initialize attribute tuple forms */
04241         rel->rd_att = CreateTemplateTupleDesc(relform->relnatts,
04242                                               relform->relhasoids);
04243         rel->rd_att->tdrefcount = 1;    /* mark as refcounted */
04244 
04245         rel->rd_att->tdtypeid = relform->reltype;
04246         rel->rd_att->tdtypmod = -1;     /* unnecessary, but... */
04247 
04248         /* next read all the attribute tuple form data entries */
04249         has_not_null = false;
04250         for (i = 0; i < relform->relnatts; i++)
04251         {
04252             if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
04253                 goto read_failed;
04254             if (len != ATTRIBUTE_FIXED_PART_SIZE)
04255                 goto read_failed;
04256             if (fread(rel->rd_att->attrs[i], 1, len, fp) != len)
04257                 goto read_failed;
04258 
04259             has_not_null |= rel->rd_att->attrs[i]->attnotnull;
04260         }
04261 
04262         /* next read the access method specific field */
04263         if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
04264             goto read_failed;
04265         if (len > 0)
04266         {
04267             rel->rd_options = palloc(len);
04268             if (fread(rel->rd_options, 1, len, fp) != len)
04269                 goto read_failed;
04270             if (len != VARSIZE(rel->rd_options))
04271                 goto read_failed;       /* sanity check */
04272         }
04273         else
04274         {
04275             rel->rd_options = NULL;
04276         }
04277 
04278         /* mark not-null status */
04279         if (has_not_null)
04280         {
04281             TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));
04282 
04283             constr->has_not_null = true;
04284             rel->rd_att->constr = constr;
04285         }
04286 
04287         /* If it's an index, there's more to do */
04288         if (rel->rd_rel->relkind == RELKIND_INDEX)
04289         {
04290             Form_pg_am  am;
04291             MemoryContext indexcxt;
04292             Oid        *opfamily;
04293             Oid        *opcintype;
04294             RegProcedure *support;
04295             int         nsupport;
04296             int16      *indoption;
04297             Oid        *indcollation;
04298 
04299             /* Count nailed indexes to ensure we have 'em all */
04300             if (rel->rd_isnailed)
04301                 nailed_indexes++;
04302 
04303             /* next, read the pg_index tuple */
04304             if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
04305                 goto read_failed;
04306 
04307             rel->rd_indextuple = (HeapTuple) palloc(len);
04308             if (fread(rel->rd_indextuple, 1, len, fp) != len)
04309                 goto read_failed;
04310 
04311             /* Fix up internal pointers in the tuple -- see heap_copytuple */
04312             rel->rd_indextuple->t_data = (HeapTupleHeader) ((char *) rel->rd_indextuple + HEAPTUPLESIZE);
04313             rel->rd_index = (Form_pg_index) GETSTRUCT(rel->rd_indextuple);
04314 
04315             /* next, read the access method tuple form */
04316             if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
04317                 goto read_failed;
04318 
04319             am = (Form_pg_am) palloc(len);
04320             if (fread(am, 1, len, fp) != len)
04321                 goto read_failed;
04322             rel->rd_am = am;
04323 
04324             /*
04325              * prepare index info context --- parameters should match
04326              * RelationInitIndexAccessInfo
04327              */
04328             indexcxt = AllocSetContextCreate(CacheMemoryContext,
04329                                              RelationGetRelationName(rel),
04330                                              ALLOCSET_SMALL_MINSIZE,
04331                                              ALLOCSET_SMALL_INITSIZE,
04332                                              ALLOCSET_SMALL_MAXSIZE);
04333             rel->rd_indexcxt = indexcxt;
04334 
04335             /* next, read the vector of opfamily OIDs */
04336             if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
04337                 goto read_failed;
04338 
04339             opfamily = (Oid *) MemoryContextAlloc(indexcxt, len);
04340             if (fread(opfamily, 1, len, fp) != len)
04341                 goto read_failed;
04342 
04343             rel->rd_opfamily = opfamily;
04344 
04345             /* next, read the vector of opcintype OIDs */
04346             if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
04347                 goto read_failed;
04348 
04349             opcintype = (Oid *) MemoryContextAlloc(indexcxt, len);
04350             if (fread(opcintype, 1, len, fp) != len)
04351                 goto read_failed;
04352 
04353             rel->rd_opcintype = opcintype;
04354 
04355             /* next, read the vector of support procedure OIDs */
04356             if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
04357                 goto read_failed;
04358             support = (RegProcedure *) MemoryContextAlloc(indexcxt, len);
04359             if (fread(support, 1, len, fp) != len)
04360                 goto read_failed;
04361 
04362             rel->rd_support = support;
04363 
04364             /* next, read the vector of collation OIDs */
04365             if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
04366                 goto read_failed;
04367 
04368             indcollation = (Oid *) MemoryContextAlloc(indexcxt, len);
04369             if (fread(indcollation, 1, len, fp) != len)
04370                 goto read_failed;
04371 
04372             rel->rd_indcollation = indcollation;
04373 
04374             /* finally, read the vector of indoption values */
04375             if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
04376                 goto read_failed;
04377 
04378             indoption = (int16 *) MemoryContextAlloc(indexcxt, len);
04379             if (fread(indoption, 1, len, fp) != len)
04380                 goto read_failed;
04381 
04382             rel->rd_indoption = indoption;
04383 
04384             /* set up zeroed fmgr-info vectors */
04385             rel->rd_aminfo = (RelationAmInfo *)
04386                 MemoryContextAllocZero(indexcxt, sizeof(RelationAmInfo));
04387             nsupport = relform->relnatts * am->amsupport;
04388             rel->rd_supportinfo = (FmgrInfo *)
04389                 MemoryContextAllocZero(indexcxt, nsupport * sizeof(FmgrInfo));
04390         }
04391         else
04392         {
04393             /* Count nailed rels to ensure we have 'em all */
04394             if (rel->rd_isnailed)
04395                 nailed_rels++;
04396 
04397             Assert(rel->rd_index == NULL);
04398             Assert(rel->rd_indextuple == NULL);
04399             Assert(rel->rd_am == NULL);
04400             Assert(rel->rd_indexcxt == NULL);
04401             Assert(rel->rd_aminfo == NULL);
04402             Assert(rel->rd_opfamily == NULL);
04403             Assert(rel->rd_opcintype == NULL);
04404             Assert(rel->rd_support == NULL);
04405             Assert(rel->rd_supportinfo == NULL);
04406             Assert(rel->rd_indoption == NULL);
04407             Assert(rel->rd_indcollation == NULL);
04408         }
04409 
04410         /*
04411          * Rules and triggers are not saved (mainly because the internal
04412          * format is complex and subject to change).  They must be rebuilt if
04413          * needed by RelationCacheInitializePhase3.  This is not expected to
04414          * be a big performance hit since few system catalogs have such. Ditto
04415          * for index expressions, predicates, exclusion info, and FDW info.
04416          */
04417         rel->rd_rules = NULL;
04418         rel->rd_rulescxt = NULL;
04419         rel->trigdesc = NULL;
04420         rel->rd_indexprs = NIL;
04421         rel->rd_indpred = NIL;
04422         rel->rd_exclops = NULL;
04423         rel->rd_exclprocs = NULL;
04424         rel->rd_exclstrats = NULL;
04425         rel->rd_fdwroutine = NULL;
04426 
04427         /*
04428          * Reset transient-state fields in the relcache entry
04429          */
04430         rel->rd_smgr = NULL;
04431         if (rel->rd_isnailed)
04432             rel->rd_refcnt = 1;
04433         else
04434             rel->rd_refcnt = 0;
04435         rel->rd_indexvalid = 0;
04436         rel->rd_indexlist = NIL;
04437         rel->rd_indexattr = NULL;
04438         rel->rd_oidindex = InvalidOid;
04439         rel->rd_createSubid = InvalidSubTransactionId;
04440         rel->rd_newRelfilenodeSubid = InvalidSubTransactionId;
04441         rel->rd_amcache = NULL;
04442         MemSet(&rel->pgstat_info, 0, sizeof(rel->pgstat_info));
04443 
04444         /*
04445          * Recompute lock and physical addressing info.  This is needed in
04446          * case the pg_internal.init file was copied from some other database
04447          * by CREATE DATABASE.
04448          */
04449         RelationInitLockInfo(rel);
04450         RelationInitPhysicalAddr(rel);
04451         if (rel->rd_rel->relkind == RELKIND_MATVIEW &&
04452             heap_is_matview_init_state(rel))
04453             rel->rd_ispopulated = false;
04454         else
04455             rel->rd_ispopulated = true;
04456     }
04457 
04458     /*
04459      * We reached the end of the init file without apparent problem. Did we
04460      * get the right number of nailed items?  (This is a useful crosscheck in
04461      * case the set of critical rels or indexes changes.)
04462      */
04463     if (shared)
04464     {
04465         if (nailed_rels != NUM_CRITICAL_SHARED_RELS ||
04466             nailed_indexes != NUM_CRITICAL_SHARED_INDEXES)
04467             goto read_failed;
04468     }
04469     else
04470     {
04471         if (nailed_rels != NUM_CRITICAL_LOCAL_RELS ||
04472             nailed_indexes != NUM_CRITICAL_LOCAL_INDEXES)
04473             goto read_failed;
04474     }
04475 
04476     /*
04477      * OK, all appears well.
04478      *
04479      * Now insert all the new relcache entries into the cache.
04480      */
04481     for (relno = 0; relno < num_rels; relno++)
04482     {
04483         RelationCacheInsert(rels[relno]);
04484         /* also make a list of their OIDs, for RelationIdIsInInitFile */
04485         if (!shared)
04486             initFileRelationIds = lcons_oid(RelationGetRelid(rels[relno]),
04487                                             initFileRelationIds);
04488     }
04489 
04490     pfree(rels);
04491     FreeFile(fp);
04492 
04493     if (shared)
04494         criticalSharedRelcachesBuilt = true;
04495     else
04496         criticalRelcachesBuilt = true;
04497     return true;
04498 
04499     /*
04500      * init file is broken, so do it the hard way.  We don't bother trying to
04501      * free the clutter we just allocated; it's not in the relcache so it
04502      * won't hurt.
04503      */
04504 read_failed:
04505     pfree(rels);
04506     FreeFile(fp);
04507 
04508     return false;
04509 }
04510 
04511 /*
04512  * Write out a new initialization file with the current contents
04513  * of the relcache (either shared rels or local rels, as indicated).
04514  */
04515 static void
04516 write_relcache_init_file(bool shared)
04517 {
04518     FILE       *fp;
04519     char        tempfilename[MAXPGPATH];
04520     char        finalfilename[MAXPGPATH];
04521     int         magic;
04522     HASH_SEQ_STATUS status;
04523     RelIdCacheEnt *idhentry;
04524     MemoryContext oldcxt;
04525     int         i;
04526 
04527     /*
04528      * We must write a temporary file and rename it into place. Otherwise,
04529      * another backend starting at about the same time might crash trying to
04530      * read the partially-complete file.
04531      */
04532     if (shared)
04533     {
04534         snprintf(tempfilename, sizeof(tempfilename), "global/%s.%d",
04535                  RELCACHE_INIT_FILENAME, MyProcPid);
04536         snprintf(finalfilename, sizeof(finalfilename), "global/%s",
04537                  RELCACHE_INIT_FILENAME);
04538     }
04539     else
04540     {
04541         snprintf(tempfilename, sizeof(tempfilename), "%s/%s.%d",
04542                  DatabasePath, RELCACHE_INIT_FILENAME, MyProcPid);
04543         snprintf(finalfilename, sizeof(finalfilename), "%s/%s",
04544                  DatabasePath, RELCACHE_INIT_FILENAME);
04545     }
04546 
04547     unlink(tempfilename);       /* in case it exists w/wrong permissions */
04548 
04549     fp = AllocateFile(tempfilename, PG_BINARY_W);
04550     if (fp == NULL)
04551     {
04552         /*
04553          * We used to consider this a fatal error, but we might as well
04554          * continue with backend startup ...
04555          */
04556         ereport(WARNING,
04557                 (errcode_for_file_access(),
04558                  errmsg("could not create relation-cache initialization file \"%s\": %m",
04559                         tempfilename),
04560               errdetail("Continuing anyway, but there's something wrong.")));
04561         return;
04562     }
04563 
04564     /*
04565      * Write a magic number to serve as a file version identifier.  We can
04566      * change the magic number whenever the relcache layout changes.
04567      */
04568     magic = RELCACHE_INIT_FILEMAGIC;
04569     if (fwrite(&magic, 1, sizeof(magic), fp) != sizeof(magic))
04570         elog(FATAL, "could not write init file");
04571 
04572     /*
04573      * Write all the appropriate reldescs (in no particular order).
04574      */
04575     hash_seq_init(&status, RelationIdCache);
04576 
04577     while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
04578     {
04579         Relation    rel = idhentry->reldesc;
04580         Form_pg_class relform = rel->rd_rel;
04581 
04582         /* ignore if not correct group */
04583         if (relform->relisshared != shared)
04584             continue;
04585 
04586         /* first write the relcache entry proper */
04587         write_item(rel, sizeof(RelationData), fp);
04588 
04589         /* next write the relation tuple form */
04590         write_item(relform, CLASS_TUPLE_SIZE, fp);
04591 
04592         /* next, do all the attribute tuple form data entries */
04593         for (i = 0; i < relform->relnatts; i++)
04594         {
04595             write_item(rel->rd_att->attrs[i], ATTRIBUTE_FIXED_PART_SIZE, fp);
04596         }
04597 
04598         /* next, do the access method specific field */
04599         write_item(rel->rd_options,
04600                    (rel->rd_options ? VARSIZE(rel->rd_options) : 0),
04601                    fp);
04602 
04603         /* If it's an index, there's more to do */
04604         if (rel->rd_rel->relkind == RELKIND_INDEX)
04605         {
04606             Form_pg_am  am = rel->rd_am;
04607 
04608             /* write the pg_index tuple */
04609             /* we assume this was created by heap_copytuple! */
04610             write_item(rel->rd_indextuple,
04611                        HEAPTUPLESIZE + rel->rd_indextuple->t_len,
04612                        fp);
04613 
04614             /* next, write the access method tuple form */
04615             write_item(am, sizeof(FormData_pg_am), fp);
04616 
04617             /* next, write the vector of opfamily OIDs */
04618             write_item(rel->rd_opfamily,
04619                        relform->relnatts * sizeof(Oid),
04620                        fp);
04621 
04622             /* next, write the vector of opcintype OIDs */
04623             write_item(rel->rd_opcintype,
04624                        relform->relnatts * sizeof(Oid),
04625                        fp);
04626 
04627             /* next, write the vector of support procedure OIDs */
04628             write_item(rel->rd_support,
04629                   relform->relnatts * (am->amsupport * sizeof(RegProcedure)),
04630                        fp);
04631 
04632             /* next, write the vector of collation OIDs */
04633             write_item(rel->rd_indcollation,
04634                        relform->relnatts * sizeof(Oid),
04635                        fp);
04636 
04637             /* finally, write the vector of indoption values */
04638             write_item(rel->rd_indoption,
04639                        relform->relnatts * sizeof(int16),
04640                        fp);
04641         }
04642 
04643         /* also make a list of their OIDs, for RelationIdIsInInitFile */
04644         if (!shared)
04645         {
04646             oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
04647             initFileRelationIds = lcons_oid(RelationGetRelid(rel),
04648                                             initFileRelationIds);
04649             MemoryContextSwitchTo(oldcxt);
04650         }
04651     }
04652 
04653     if (FreeFile(fp))
04654         elog(FATAL, "could not write init file");
04655 
04656     /*
04657      * Now we have to check whether the data we've so painstakingly
04658      * accumulated is already obsolete due to someone else's just-committed
04659      * catalog changes.  If so, we just delete the temp file and leave it to
04660      * the next backend to try again.  (Our own relcache entries will be
04661      * updated by SI message processing, but we can't be sure whether what we
04662      * wrote out was up-to-date.)
04663      *
04664      * This mustn't run concurrently with the code that unlinks an init file
04665      * and sends SI messages, so grab a serialization lock for the duration.
04666      */
04667     LWLockAcquire(RelCacheInitLock, LW_EXCLUSIVE);
04668 
04669     /* Make sure we have seen all incoming SI messages */
04670     AcceptInvalidationMessages();
04671 
04672     /*
04673      * If we have received any SI relcache invals since backend start, assume
04674      * we may have written out-of-date data.
04675      */
04676     if (relcacheInvalsReceived == 0L)
04677     {
04678         /*
04679          * OK, rename the temp file to its final name, deleting any
04680          * previously-existing init file.
04681          *
04682          * Note: a failure here is possible under Cygwin, if some other
04683          * backend is holding open an unlinked-but-not-yet-gone init file. So
04684          * treat this as a noncritical failure; just remove the useless temp
04685          * file on failure.
04686          */
04687         if (rename(tempfilename, finalfilename) < 0)
04688             unlink(tempfilename);
04689     }
04690     else
04691     {
04692         /* Delete the already-obsolete temp file */
04693         unlink(tempfilename);
04694     }
04695 
04696     LWLockRelease(RelCacheInitLock);
04697 }
04698 
04699 /* write a chunk of data preceded by its length */
04700 static void
04701 write_item(const void *data, Size len, FILE *fp)
04702 {
04703     if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
04704         elog(FATAL, "could not write init file");
04705     if (fwrite(data, 1, len, fp) != len)
04706         elog(FATAL, "could not write init file");
04707 }
04708 
04709 /*
04710  * Detect whether a given relation (identified by OID) is one of the ones
04711  * we store in the local relcache init file.
04712  *
04713  * Note that we effectively assume that all backends running in a database
04714  * would choose to store the same set of relations in the init file;
04715  * otherwise there are cases where we'd fail to detect the need for an init
04716  * file invalidation.  This does not seem likely to be a problem in practice.
04717  */
04718 bool
04719 RelationIdIsInInitFile(Oid relationId)
04720 {
04721     return list_member_oid(initFileRelationIds, relationId);
04722 }
04723 
04724 /*
04725  * Invalidate (remove) the init file during commit of a transaction that
04726  * changed one or more of the relation cache entries that are kept in the
04727  * local init file.
04728  *
04729  * To be safe against concurrent inspection or rewriting of the init file,
04730  * we must take RelCacheInitLock, then remove the old init file, then send
04731  * the SI messages that include relcache inval for such relations, and then
04732  * release RelCacheInitLock.  This serializes the whole affair against
04733  * write_relcache_init_file, so that we can be sure that any other process
04734  * that's concurrently trying to create a new init file won't move an
04735  * already-stale version into place after we unlink.  Also, because we unlink
04736  * before sending the SI messages, a backend that's currently starting cannot
04737  * read the now-obsolete init file and then miss the SI messages that will
04738  * force it to update its relcache entries.  (This works because the backend
04739  * startup sequence gets into the sinval array before trying to load the init
04740  * file.)
04741  *
04742  * We take the lock and do the unlink in RelationCacheInitFilePreInvalidate,
04743  * then release the lock in RelationCacheInitFilePostInvalidate.  Caller must
04744  * send any pending SI messages between those calls.
04745  *
04746  * Notice this deals only with the local init file, not the shared init file.
04747  * The reason is that there can never be a "significant" change to the
04748  * relcache entry of a shared relation; the most that could happen is
04749  * updates of noncritical fields such as relpages/reltuples.  So, while
04750  * it's worth updating the shared init file from time to time, it can never
04751  * be invalid enough to make it necessary to remove it.
04752  */
04753 void
04754 RelationCacheInitFilePreInvalidate(void)
04755 {
04756     char        initfilename[MAXPGPATH];
04757 
04758     snprintf(initfilename, sizeof(initfilename), "%s/%s",
04759              DatabasePath, RELCACHE_INIT_FILENAME);
04760 
04761     LWLockAcquire(RelCacheInitLock, LW_EXCLUSIVE);
04762 
04763     if (unlink(initfilename) < 0)
04764     {
04765         /*
04766          * The file might not be there if no backend has been started since
04767          * the last removal.  But complain about failures other than ENOENT.
04768          * Fortunately, it's not too late to abort the transaction if we can't
04769          * get rid of the would-be-obsolete init file.
04770          */
04771         if (errno != ENOENT)
04772             ereport(ERROR,
04773                     (errcode_for_file_access(),
04774                      errmsg("could not remove cache file \"%s\": %m",
04775                             initfilename)));
04776     }
04777 }
04778 
04779 void
04780 RelationCacheInitFilePostInvalidate(void)
04781 {
04782     LWLockRelease(RelCacheInitLock);
04783 }
04784 
04785 /*
04786  * Remove the init files during postmaster startup.
04787  *
04788  * We used to keep the init files across restarts, but that is unsafe in PITR
04789  * scenarios, and even in simple crash-recovery cases there are windows for
04790  * the init files to become out-of-sync with the database.  So now we just
04791  * remove them during startup and expect the first backend launch to rebuild
04792  * them.  Of course, this has to happen in each database of the cluster.
04793  */
04794 void
04795 RelationCacheInitFileRemove(void)
04796 {
04797     const char *tblspcdir = "pg_tblspc";
04798     DIR        *dir;
04799     struct dirent *de;
04800     char        path[MAXPGPATH];
04801 
04802     /*
04803      * We zap the shared cache file too.  In theory it can't get out of sync
04804      * enough to be a problem, but in data-corruption cases, who knows ...
04805      */
04806     snprintf(path, sizeof(path), "global/%s",
04807              RELCACHE_INIT_FILENAME);
04808     unlink_initfile(path);
04809 
04810     /* Scan everything in the default tablespace */
04811     RelationCacheInitFileRemoveInDir("base");
04812 
04813     /* Scan the tablespace link directory to find non-default tablespaces */
04814     dir = AllocateDir(tblspcdir);
04815     if (dir == NULL)
04816     {
04817         elog(LOG, "could not open tablespace link directory \"%s\": %m",
04818              tblspcdir);
04819         return;
04820     }
04821 
04822     while ((de = ReadDir(dir, tblspcdir)) != NULL)
04823     {
04824         if (strspn(de->d_name, "0123456789") == strlen(de->d_name))
04825         {
04826             /* Scan the tablespace dir for per-database dirs */
04827             snprintf(path, sizeof(path), "%s/%s/%s",
04828                      tblspcdir, de->d_name, TABLESPACE_VERSION_DIRECTORY);
04829             RelationCacheInitFileRemoveInDir(path);
04830         }
04831     }
04832 
04833     FreeDir(dir);
04834 }
04835 
04836 /* Process one per-tablespace directory for RelationCacheInitFileRemove */
04837 static void
04838 RelationCacheInitFileRemoveInDir(const char *tblspcpath)
04839 {
04840     DIR        *dir;
04841     struct dirent *de;
04842     char        initfilename[MAXPGPATH];
04843 
04844     /* Scan the tablespace directory to find per-database directories */
04845     dir = AllocateDir(tblspcpath);
04846     if (dir == NULL)
04847     {
04848         elog(LOG, "could not open tablespace directory \"%s\": %m",
04849              tblspcpath);
04850         return;
04851     }
04852 
04853     while ((de = ReadDir(dir, tblspcpath)) != NULL)
04854     {
04855         if (strspn(de->d_name, "0123456789") == strlen(de->d_name))
04856         {
04857             /* Try to remove the init file in each database */
04858             snprintf(initfilename, sizeof(initfilename), "%s/%s/%s",
04859                      tblspcpath, de->d_name, RELCACHE_INIT_FILENAME);
04860             unlink_initfile(initfilename);
04861         }
04862     }
04863 
04864     FreeDir(dir);
04865 }
04866 
04867 static void
04868 unlink_initfile(const char *initfilename)
04869 {
04870     if (unlink(initfilename) < 0)
04871     {
04872         /* It might not be there, but log any error other than ENOENT */
04873         if (errno != ENOENT)
04874             elog(LOG, "could not remove cache file \"%s\": %m", initfilename);
04875     }
04876 }