Header And Logo

PostgreSQL
| The world's most advanced open source database.

pg_inherits.c

Go to the documentation of this file.
00001 /*-------------------------------------------------------------------------
00002  *
00003  * pg_inherits.c
00004  *    routines to support manipulation of the pg_inherits relation
00005  *
00006  * Note: currently, this module only contains inquiry functions; the actual
00007  * creation and deletion of pg_inherits entries is done in tablecmds.c.
00008  * Perhaps someday that code should be moved here, but it'd have to be
00009  * disentangled from other stuff such as pg_depend updates.
00010  *
00011  * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
00012  * Portions Copyright (c) 1994, Regents of the University of California
00013  *
00014  *
00015  * IDENTIFICATION
00016  *    src/backend/catalog/pg_inherits.c
00017  *
00018  *-------------------------------------------------------------------------
00019  */
00020 #include "postgres.h"
00021 
00022 #include "access/genam.h"
00023 #include "access/heapam.h"
00024 #include "access/htup_details.h"
00025 #include "catalog/indexing.h"
00026 #include "catalog/pg_inherits.h"
00027 #include "catalog/pg_inherits_fn.h"
00028 #include "parser/parse_type.h"
00029 #include "storage/lmgr.h"
00030 #include "utils/fmgroids.h"
00031 #include "utils/syscache.h"
00032 #include "utils/tqual.h"
00033 
/* Comparator handed to qsort() when sorting arrays of Oid; defined below. */
static int oid_cmp(const void *p1, const void *p2);
00035 
00036 
00037 /*
00038  * find_inheritance_children
00039  *
00040  * Returns a list containing the OIDs of all relations which
00041  * inherit *directly* from the relation with OID 'parentrelId'.
00042  *
00043  * The specified lock type is acquired on each child relation (but not on the
00044  * given rel; caller should already have locked it).  If lockmode is NoLock
00045  * then no locks are acquired, but caller must beware of race conditions
00046  * against possible DROPs of child relations.
00047  */
00048 List *
00049 find_inheritance_children(Oid parentrelId, LOCKMODE lockmode)
00050 {
00051     List       *list = NIL;
00052     Relation    relation;
00053     SysScanDesc scan;
00054     ScanKeyData key[1];
00055     HeapTuple   inheritsTuple;
00056     Oid         inhrelid;
00057     Oid        *oidarr;
00058     int         maxoids,
00059                 numoids,
00060                 i;
00061 
00062     /*
00063      * Can skip the scan if pg_class shows the relation has never had a
00064      * subclass.
00065      */
00066     if (!has_subclass(parentrelId))
00067         return NIL;
00068 
00069     /*
00070      * Scan pg_inherits and build a working array of subclass OIDs.
00071      */
00072     maxoids = 32;
00073     oidarr = (Oid *) palloc(maxoids * sizeof(Oid));
00074     numoids = 0;
00075 
00076     relation = heap_open(InheritsRelationId, AccessShareLock);
00077 
00078     ScanKeyInit(&key[0],
00079                 Anum_pg_inherits_inhparent,
00080                 BTEqualStrategyNumber, F_OIDEQ,
00081                 ObjectIdGetDatum(parentrelId));
00082 
00083     scan = systable_beginscan(relation, InheritsParentIndexId, true,
00084                               SnapshotNow, 1, key);
00085 
00086     while ((inheritsTuple = systable_getnext(scan)) != NULL)
00087     {
00088         inhrelid = ((Form_pg_inherits) GETSTRUCT(inheritsTuple))->inhrelid;
00089         if (numoids >= maxoids)
00090         {
00091             maxoids *= 2;
00092             oidarr = (Oid *) repalloc(oidarr, maxoids * sizeof(Oid));
00093         }
00094         oidarr[numoids++] = inhrelid;
00095     }
00096 
00097     systable_endscan(scan);
00098 
00099     heap_close(relation, AccessShareLock);
00100 
00101     /*
00102      * If we found more than one child, sort them by OID.  This ensures
00103      * reasonably consistent behavior regardless of the vagaries of an
00104      * indexscan.  This is important since we need to be sure all backends
00105      * lock children in the same order to avoid needless deadlocks.
00106      */
00107     if (numoids > 1)
00108         qsort(oidarr, numoids, sizeof(Oid), oid_cmp);
00109 
00110     /*
00111      * Acquire locks and build the result list.
00112      */
00113     for (i = 0; i < numoids; i++)
00114     {
00115         inhrelid = oidarr[i];
00116 
00117         if (lockmode != NoLock)
00118         {
00119             /* Get the lock to synchronize against concurrent drop */
00120             LockRelationOid(inhrelid, lockmode);
00121 
00122             /*
00123              * Now that we have the lock, double-check to see if the relation
00124              * really exists or not.  If not, assume it was dropped while we
00125              * waited to acquire lock, and ignore it.
00126              */
00127             if (!SearchSysCacheExists1(RELOID, ObjectIdGetDatum(inhrelid)))
00128             {
00129                 /* Release useless lock */
00130                 UnlockRelationOid(inhrelid, lockmode);
00131                 /* And ignore this relation */
00132                 continue;
00133             }
00134         }
00135 
00136         list = lappend_oid(list, inhrelid);
00137     }
00138 
00139     pfree(oidarr);
00140 
00141     return list;
00142 }
00143 
00144 
00145 /*
00146  * find_all_inheritors -
00147  *      Returns a list of relation OIDs including the given rel plus
00148  *      all relations that inherit from it, directly or indirectly.
00149  *      Optionally, it also returns the number of parents found for
00150  *      each such relation within the inheritance tree rooted at the
00151  *      given rel.
00152  *
00153  * The specified lock type is acquired on all child relations (but not on the
00154  * given rel; caller should already have locked it).  If lockmode is NoLock
00155  * then no locks are acquired, but caller must beware of race conditions
00156  * against possible DROPs of child relations.
00157  */
00158 List *
00159 find_all_inheritors(Oid parentrelId, LOCKMODE lockmode, List **numparents)
00160 {
00161     List       *rels_list,
00162                *rel_numparents;
00163     ListCell   *l;
00164 
00165     /*
00166      * We build a list starting with the given rel and adding all direct and
00167      * indirect children.  We can use a single list as both the record of
00168      * already-found rels and the agenda of rels yet to be scanned for more
00169      * children.  This is a bit tricky but works because the foreach() macro
00170      * doesn't fetch the next list element until the bottom of the loop.
00171      */
00172     rels_list = list_make1_oid(parentrelId);
00173     rel_numparents = list_make1_int(0);
00174 
00175     foreach(l, rels_list)
00176     {
00177         Oid         currentrel = lfirst_oid(l);
00178         List       *currentchildren;
00179         ListCell   *lc;
00180 
00181         /* Get the direct children of this rel */
00182         currentchildren = find_inheritance_children(currentrel, lockmode);
00183 
00184         /*
00185          * Add to the queue only those children not already seen. This avoids
00186          * making duplicate entries in case of multiple inheritance paths from
00187          * the same parent.  (It'll also keep us from getting into an infinite
00188          * loop, though theoretically there can't be any cycles in the
00189          * inheritance graph anyway.)
00190          */
00191         foreach(lc, currentchildren)
00192         {
00193             Oid         child_oid = lfirst_oid(lc);
00194             bool        found = false;
00195             ListCell   *lo;
00196             ListCell   *li;
00197 
00198             /* if the rel is already there, bump number-of-parents counter */
00199             forboth(lo, rels_list, li, rel_numparents)
00200             {
00201                 if (lfirst_oid(lo) == child_oid)
00202                 {
00203                     lfirst_int(li)++;
00204                     found = true;
00205                     break;
00206                 }
00207             }
00208 
00209             /* if it's not there, add it. expect 1 parent, initially. */
00210             if (!found)
00211             {
00212                 rels_list = lappend_oid(rels_list, child_oid);
00213                 rel_numparents = lappend_int(rel_numparents, 1);
00214             }
00215         }
00216     }
00217 
00218     if (numparents)
00219         *numparents = rel_numparents;
00220     else
00221         list_free(rel_numparents);
00222     return rels_list;
00223 }
00224 
00225 
00226 /*
00227  * has_subclass - does this relation have any children?
00228  *
00229  * In the current implementation, has_subclass returns whether a
00230  * particular class *might* have a subclass. It will not return the
00231  * correct result if a class had a subclass which was later dropped.
00232  * This is because relhassubclass in pg_class is not updated immediately
00233  * when a subclass is dropped, primarily because of concurrency concerns.
00234  *
00235  * Currently has_subclass is only used as an efficiency hack to skip
00236  * unnecessary inheritance searches, so this is OK.  Note that ANALYZE
00237  * on a childless table will clean up the obsolete relhassubclass flag.
00238  *
00239  * Although this doesn't actually touch pg_inherits, it seems reasonable
00240  * to keep it here since it's normally used with the other routines here.
00241  */
00242 bool
00243 has_subclass(Oid relationId)
00244 {
00245     HeapTuple   tuple;
00246     bool        result;
00247 
00248     tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relationId));
00249     if (!HeapTupleIsValid(tuple))
00250         elog(ERROR, "cache lookup failed for relation %u", relationId);
00251 
00252     result = ((Form_pg_class) GETSTRUCT(tuple))->relhassubclass;
00253     ReleaseSysCache(tuple);
00254     return result;
00255 }
00256 
00257 
00258 /*
00259  * Given two type OIDs, determine whether the first is a complex type
00260  * (class type) that inherits from the second.
00261  */
00262 bool
00263 typeInheritsFrom(Oid subclassTypeId, Oid superclassTypeId)
00264 {
00265     bool        result = false;
00266     Oid         subclassRelid;
00267     Oid         superclassRelid;
00268     Relation    inhrel;
00269     List       *visited,
00270                *queue;
00271     ListCell   *queue_item;
00272 
00273     /* We need to work with the associated relation OIDs */
00274     subclassRelid = typeidTypeRelid(subclassTypeId);
00275     if (subclassRelid == InvalidOid)
00276         return false;           /* not a complex type */
00277     superclassRelid = typeidTypeRelid(superclassTypeId);
00278     if (superclassRelid == InvalidOid)
00279         return false;           /* not a complex type */
00280 
00281     /* No point in searching if the superclass has no subclasses */
00282     if (!has_subclass(superclassRelid))
00283         return false;
00284 
00285     /*
00286      * Begin the search at the relation itself, so add its relid to the queue.
00287      */
00288     queue = list_make1_oid(subclassRelid);
00289     visited = NIL;
00290 
00291     inhrel = heap_open(InheritsRelationId, AccessShareLock);
00292 
00293     /*
00294      * Use queue to do a breadth-first traversal of the inheritance graph from
00295      * the relid supplied up to the root.  Notice that we append to the queue
00296      * inside the loop --- this is okay because the foreach() macro doesn't
00297      * advance queue_item until the next loop iteration begins.
00298      */
00299     foreach(queue_item, queue)
00300     {
00301         Oid         this_relid = lfirst_oid(queue_item);
00302         ScanKeyData skey;
00303         SysScanDesc inhscan;
00304         HeapTuple   inhtup;
00305 
00306         /*
00307          * If we've seen this relid already, skip it.  This avoids extra work
00308          * in multiple-inheritance scenarios, and also protects us from an
00309          * infinite loop in case there is a cycle in pg_inherits (though
00310          * theoretically that shouldn't happen).
00311          */
00312         if (list_member_oid(visited, this_relid))
00313             continue;
00314 
00315         /*
00316          * Okay, this is a not-yet-seen relid. Add it to the list of
00317          * already-visited OIDs, then find all the types this relid inherits
00318          * from and add them to the queue.
00319          */
00320         visited = lappend_oid(visited, this_relid);
00321 
00322         ScanKeyInit(&skey,
00323                     Anum_pg_inherits_inhrelid,
00324                     BTEqualStrategyNumber, F_OIDEQ,
00325                     ObjectIdGetDatum(this_relid));
00326 
00327         inhscan = systable_beginscan(inhrel, InheritsRelidSeqnoIndexId, true,
00328                                      SnapshotNow, 1, &skey);
00329 
00330         while ((inhtup = systable_getnext(inhscan)) != NULL)
00331         {
00332             Form_pg_inherits inh = (Form_pg_inherits) GETSTRUCT(inhtup);
00333             Oid         inhparent = inh->inhparent;
00334 
00335             /* If this is the target superclass, we're done */
00336             if (inhparent == superclassRelid)
00337             {
00338                 result = true;
00339                 break;
00340             }
00341 
00342             /* Else add to queue */
00343             queue = lappend_oid(queue, inhparent);
00344         }
00345 
00346         systable_endscan(inhscan);
00347 
00348         if (result)
00349             break;
00350     }
00351 
00352     /* clean up ... */
00353     heap_close(inhrel, AccessShareLock);
00354 
00355     list_free(visited);
00356     list_free(queue);
00357 
00358     return result;
00359 }
00360 
00361 
00362 /* qsort comparison function */
00363 static int
00364 oid_cmp(const void *p1, const void *p2)
00365 {
00366     Oid         v1 = *((const Oid *) p1);
00367     Oid         v2 = *((const Oid *) p2);
00368 
00369     if (v1 < v2)
00370         return -1;
00371     if (v1 > v2)
00372         return 1;
00373     return 0;
00374 }