Header And Logo

PostgreSQL
| The world's most advanced open source database.

nbtutils.c

Go to the documentation of this file.
00001 /*-------------------------------------------------------------------------
00002  *
00003  * nbtutils.c
00004  *    Utility code for Postgres btree implementation.
00005  *
00006  * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
00007  * Portions Copyright (c) 1994, Regents of the University of California
00008  *
00009  *
00010  * IDENTIFICATION
00011  *    src/backend/access/nbtree/nbtutils.c
00012  *
00013  *-------------------------------------------------------------------------
00014  */
00015 
00016 #include "postgres.h"
00017 
00018 #include <time.h>
00019 
00020 #include "access/nbtree.h"
00021 #include "access/reloptions.h"
00022 #include "access/relscan.h"
00023 #include "miscadmin.h"
00024 #include "utils/array.h"
00025 #include "utils/lsyscache.h"
00026 #include "utils/memutils.h"
00027 #include "utils/rel.h"
00028 
00029 
00030 typedef struct BTSortArrayContext
00031 {
00032     FmgrInfo    flinfo;
00033     Oid         collation;
00034     bool        reverse;
00035 } BTSortArrayContext;
00036 
00037 static Datum _bt_find_extreme_element(IndexScanDesc scan, ScanKey skey,
00038                          StrategyNumber strat,
00039                          Datum *elems, int nelems);
00040 static int _bt_sort_array_elements(IndexScanDesc scan, ScanKey skey,
00041                         bool reverse,
00042                         Datum *elems, int nelems);
00043 static int  _bt_compare_array_elements(const void *a, const void *b, void *arg);
00044 static bool _bt_compare_scankey_args(IndexScanDesc scan, ScanKey op,
00045                          ScanKey leftarg, ScanKey rightarg,
00046                          bool *result);
00047 static bool _bt_fix_scankey_strategy(ScanKey skey, int16 *indoption);
00048 static void _bt_mark_scankey_required(ScanKey skey);
00049 static bool _bt_check_rowcompare(ScanKey skey,
00050                      IndexTuple tuple, TupleDesc tupdesc,
00051                      ScanDirection dir, bool *continuescan);
00052 
00053 
00054 /*
00055  * _bt_mkscankey
00056  *      Build an insertion scan key that contains comparison data from itup
00057  *      as well as comparator routines appropriate to the key datatypes.
00058  *
00059  *      The result is intended for use with _bt_compare().
00060  */
00061 ScanKey
00062 _bt_mkscankey(Relation rel, IndexTuple itup)
00063 {
00064     ScanKey     skey;
00065     TupleDesc   itupdesc;
00066     int         natts;
00067     int16      *indoption;
00068     int         i;
00069 
00070     itupdesc = RelationGetDescr(rel);
00071     natts = RelationGetNumberOfAttributes(rel);
00072     indoption = rel->rd_indoption;
00073 
00074     skey = (ScanKey) palloc(natts * sizeof(ScanKeyData));
00075 
00076     for (i = 0; i < natts; i++)
00077     {
00078         FmgrInfo   *procinfo;
00079         Datum       arg;
00080         bool        null;
00081         int         flags;
00082 
00083         /*
00084          * We can use the cached (default) support procs since no cross-type
00085          * comparison can be needed.
00086          */
00087         procinfo = index_getprocinfo(rel, i + 1, BTORDER_PROC);
00088         arg = index_getattr(itup, i + 1, itupdesc, &null);
00089         flags = (null ? SK_ISNULL : 0) | (indoption[i] << SK_BT_INDOPTION_SHIFT);
00090         ScanKeyEntryInitializeWithInfo(&skey[i],
00091                                        flags,
00092                                        (AttrNumber) (i + 1),
00093                                        InvalidStrategy,
00094                                        InvalidOid,
00095                                        rel->rd_indcollation[i],
00096                                        procinfo,
00097                                        arg);
00098     }
00099 
00100     return skey;
00101 }
00102 
00103 /*
00104  * _bt_mkscankey_nodata
00105  *      Build an insertion scan key that contains 3-way comparator routines
00106  *      appropriate to the key datatypes, but no comparison data.  The
00107  *      comparison data ultimately used must match the key datatypes.
00108  *
00109  *      The result cannot be used with _bt_compare(), unless comparison
00110  *      data is first stored into the key entries.  Currently this
00111  *      routine is only called by nbtsort.c and tuplesort.c, which have
00112  *      their own comparison routines.
00113  */
00114 ScanKey
00115 _bt_mkscankey_nodata(Relation rel)
00116 {
00117     ScanKey     skey;
00118     int         natts;
00119     int16      *indoption;
00120     int         i;
00121 
00122     natts = RelationGetNumberOfAttributes(rel);
00123     indoption = rel->rd_indoption;
00124 
00125     skey = (ScanKey) palloc(natts * sizeof(ScanKeyData));
00126 
00127     for (i = 0; i < natts; i++)
00128     {
00129         FmgrInfo   *procinfo;
00130         int         flags;
00131 
00132         /*
00133          * We can use the cached (default) support procs since no cross-type
00134          * comparison can be needed.
00135          */
00136         procinfo = index_getprocinfo(rel, i + 1, BTORDER_PROC);
00137         flags = SK_ISNULL | (indoption[i] << SK_BT_INDOPTION_SHIFT);
00138         ScanKeyEntryInitializeWithInfo(&skey[i],
00139                                        flags,
00140                                        (AttrNumber) (i + 1),
00141                                        InvalidStrategy,
00142                                        InvalidOid,
00143                                        rel->rd_indcollation[i],
00144                                        procinfo,
00145                                        (Datum) 0);
00146     }
00147 
00148     return skey;
00149 }
00150 
00151 /*
00152  * free a scan key made by either _bt_mkscankey or _bt_mkscankey_nodata.
00153  */
00154 void
00155 _bt_freeskey(ScanKey skey)
00156 {
00157     pfree(skey);
00158 }
00159 
00160 /*
00161  * free a retracement stack made by _bt_search.
00162  */
00163 void
00164 _bt_freestack(BTStack stack)
00165 {
00166     BTStack     ostack;
00167 
00168     while (stack != NULL)
00169     {
00170         ostack = stack;
00171         stack = stack->bts_parent;
00172         pfree(ostack);
00173     }
00174 }
00175 
00176 
00177 /*
00178  *  _bt_preprocess_array_keys() -- Preprocess SK_SEARCHARRAY scan keys
00179  *
00180  * If there are any SK_SEARCHARRAY scan keys, deconstruct the array(s) and
00181  * set up BTArrayKeyInfo info for each one that is an equality-type key.
00182  * Prepare modified scan keys in so->arrayKeyData, which will hold the current
00183  * array elements during each primitive indexscan operation.  For inequality
00184  * array keys, it's sufficient to find the extreme element value and replace
00185  * the whole array with that scalar value.
00186  *
00187  * Note: the reason we need so->arrayKeyData, rather than just scribbling
00188  * on scan->keyData, is that callers are permitted to call btrescan without
00189  * supplying a new set of scankey data.
00190  */
00191 void
00192 _bt_preprocess_array_keys(IndexScanDesc scan)
00193 {
00194     BTScanOpaque so = (BTScanOpaque) scan->opaque;
00195     int         numberOfKeys = scan->numberOfKeys;
00196     int16      *indoption = scan->indexRelation->rd_indoption;
00197     int         numArrayKeys;
00198     ScanKey     cur;
00199     int         i;
00200     MemoryContext oldContext;
00201 
00202     /* Quick check to see if there are any array keys */
00203     numArrayKeys = 0;
00204     for (i = 0; i < numberOfKeys; i++)
00205     {
00206         cur = &scan->keyData[i];
00207         if (cur->sk_flags & SK_SEARCHARRAY)
00208         {
00209             numArrayKeys++;
00210             Assert(!(cur->sk_flags & (SK_ROW_HEADER | SK_SEARCHNULL | SK_SEARCHNOTNULL)));
00211             /* If any arrays are null as a whole, we can quit right now. */
00212             if (cur->sk_flags & SK_ISNULL)
00213             {
00214                 so->numArrayKeys = -1;
00215                 so->arrayKeyData = NULL;
00216                 return;
00217             }
00218         }
00219     }
00220 
00221     /* Quit if nothing to do. */
00222     if (numArrayKeys == 0)
00223     {
00224         so->numArrayKeys = 0;
00225         so->arrayKeyData = NULL;
00226         return;
00227     }
00228 
00229     /*
00230      * Make a scan-lifespan context to hold array-associated data, or reset it
00231      * if we already have one from a previous rescan cycle.
00232      */
00233     if (so->arrayContext == NULL)
00234         so->arrayContext = AllocSetContextCreate(CurrentMemoryContext,
00235                                                  "BTree Array Context",
00236                                                  ALLOCSET_SMALL_MINSIZE,
00237                                                  ALLOCSET_SMALL_INITSIZE,
00238                                                  ALLOCSET_SMALL_MAXSIZE);
00239     else
00240         MemoryContextReset(so->arrayContext);
00241 
00242     oldContext = MemoryContextSwitchTo(so->arrayContext);
00243 
00244     /* Create modifiable copy of scan->keyData in the workspace context */
00245     so->arrayKeyData = (ScanKey) palloc(scan->numberOfKeys * sizeof(ScanKeyData));
00246     memcpy(so->arrayKeyData,
00247            scan->keyData,
00248            scan->numberOfKeys * sizeof(ScanKeyData));
00249 
00250     /* Allocate space for per-array data in the workspace context */
00251     so->arrayKeys = (BTArrayKeyInfo *) palloc0(numArrayKeys * sizeof(BTArrayKeyInfo));
00252 
00253     /* Now process each array key */
00254     numArrayKeys = 0;
00255     for (i = 0; i < numberOfKeys; i++)
00256     {
00257         ArrayType  *arrayval;
00258         int16       elmlen;
00259         bool        elmbyval;
00260         char        elmalign;
00261         int         num_elems;
00262         Datum      *elem_values;
00263         bool       *elem_nulls;
00264         int         num_nonnulls;
00265         int         j;
00266 
00267         cur = &so->arrayKeyData[i];
00268         if (!(cur->sk_flags & SK_SEARCHARRAY))
00269             continue;
00270 
00271         /*
00272          * First, deconstruct the array into elements.  Anything allocated
00273          * here (including a possibly detoasted array value) is in the
00274          * workspace context.
00275          */
00276         arrayval = DatumGetArrayTypeP(cur->sk_argument);
00277         /* We could cache this data, but not clear it's worth it */
00278         get_typlenbyvalalign(ARR_ELEMTYPE(arrayval),
00279                              &elmlen, &elmbyval, &elmalign);
00280         deconstruct_array(arrayval,
00281                           ARR_ELEMTYPE(arrayval),
00282                           elmlen, elmbyval, elmalign,
00283                           &elem_values, &elem_nulls, &num_elems);
00284 
00285         /*
00286          * Compress out any null elements.  We can ignore them since we assume
00287          * all btree operators are strict.
00288          */
00289         num_nonnulls = 0;
00290         for (j = 0; j < num_elems; j++)
00291         {
00292             if (!elem_nulls[j])
00293                 elem_values[num_nonnulls++] = elem_values[j];
00294         }
00295 
00296         /* We could pfree(elem_nulls) now, but not worth the cycles */
00297 
00298         /* If there's no non-nulls, the scan qual is unsatisfiable */
00299         if (num_nonnulls == 0)
00300         {
00301             numArrayKeys = -1;
00302             break;
00303         }
00304 
00305         /*
00306          * If the comparison operator is not equality, then the array qual
00307          * degenerates to a simple comparison against the smallest or largest
00308          * non-null array element, as appropriate.
00309          */
00310         switch (cur->sk_strategy)
00311         {
00312             case BTLessStrategyNumber:
00313             case BTLessEqualStrategyNumber:
00314                 cur->sk_argument =
00315                     _bt_find_extreme_element(scan, cur,
00316                                              BTGreaterStrategyNumber,
00317                                              elem_values, num_nonnulls);
00318                 continue;
00319             case BTEqualStrategyNumber:
00320                 /* proceed with rest of loop */
00321                 break;
00322             case BTGreaterEqualStrategyNumber:
00323             case BTGreaterStrategyNumber:
00324                 cur->sk_argument =
00325                     _bt_find_extreme_element(scan, cur,
00326                                              BTLessStrategyNumber,
00327                                              elem_values, num_nonnulls);
00328                 continue;
00329             default:
00330                 elog(ERROR, "unrecognized StrategyNumber: %d",
00331                      (int) cur->sk_strategy);
00332                 break;
00333         }
00334 
00335         /*
00336          * Sort the non-null elements and eliminate any duplicates.  We must
00337          * sort in the same ordering used by the index column, so that the
00338          * successive primitive indexscans produce data in index order.
00339          */
00340         num_elems = _bt_sort_array_elements(scan, cur,
00341                         (indoption[cur->sk_attno - 1] & INDOPTION_DESC) != 0,
00342                                             elem_values, num_nonnulls);
00343 
00344         /*
00345          * And set up the BTArrayKeyInfo data.
00346          */
00347         so->arrayKeys[numArrayKeys].scan_key = i;
00348         so->arrayKeys[numArrayKeys].num_elems = num_elems;
00349         so->arrayKeys[numArrayKeys].elem_values = elem_values;
00350         numArrayKeys++;
00351     }
00352 
00353     so->numArrayKeys = numArrayKeys;
00354 
00355     MemoryContextSwitchTo(oldContext);
00356 }
00357 
00358 /*
00359  * _bt_find_extreme_element() -- get least or greatest array element
00360  *
00361  * scan and skey identify the index column, whose opfamily determines the
00362  * comparison semantics.  strat should be BTLessStrategyNumber to get the
00363  * least element, or BTGreaterStrategyNumber to get the greatest.
00364  */
00365 static Datum
00366 _bt_find_extreme_element(IndexScanDesc scan, ScanKey skey,
00367                          StrategyNumber strat,
00368                          Datum *elems, int nelems)
00369 {
00370     Relation    rel = scan->indexRelation;
00371     Oid         elemtype,
00372                 cmp_op;
00373     RegProcedure cmp_proc;
00374     FmgrInfo    flinfo;
00375     Datum       result;
00376     int         i;
00377 
00378     /*
00379      * Determine the nominal datatype of the array elements.  We have to
00380      * support the convention that sk_subtype == InvalidOid means the opclass
00381      * input type; this is a hack to simplify life for ScanKeyInit().
00382      */
00383     elemtype = skey->sk_subtype;
00384     if (elemtype == InvalidOid)
00385         elemtype = rel->rd_opcintype[skey->sk_attno - 1];
00386 
00387     /*
00388      * Look up the appropriate comparison operator in the opfamily.
00389      *
00390      * Note: it's possible that this would fail, if the opfamily is
00391      * incomplete, but it seems quite unlikely that an opfamily would omit
00392      * non-cross-type comparison operators for any datatype that it supports
00393      * at all.
00394      */
00395     cmp_op = get_opfamily_member(rel->rd_opfamily[skey->sk_attno - 1],
00396                                  elemtype,
00397                                  elemtype,
00398                                  strat);
00399     if (!OidIsValid(cmp_op))
00400         elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
00401              strat, elemtype, elemtype,
00402              rel->rd_opfamily[skey->sk_attno - 1]);
00403     cmp_proc = get_opcode(cmp_op);
00404     if (!RegProcedureIsValid(cmp_proc))
00405         elog(ERROR, "missing oprcode for operator %u", cmp_op);
00406 
00407     fmgr_info(cmp_proc, &flinfo);
00408 
00409     Assert(nelems > 0);
00410     result = elems[0];
00411     for (i = 1; i < nelems; i++)
00412     {
00413         if (DatumGetBool(FunctionCall2Coll(&flinfo,
00414                                            skey->sk_collation,
00415                                            elems[i],
00416                                            result)))
00417             result = elems[i];
00418     }
00419 
00420     return result;
00421 }
00422 
00423 /*
00424  * _bt_sort_array_elements() -- sort and de-dup array elements
00425  *
00426  * The array elements are sorted in-place, and the new number of elements
00427  * after duplicate removal is returned.
00428  *
00429  * scan and skey identify the index column, whose opfamily determines the
00430  * comparison semantics.  If reverse is true, we sort in descending order.
00431  */
00432 static int
00433 _bt_sort_array_elements(IndexScanDesc scan, ScanKey skey,
00434                         bool reverse,
00435                         Datum *elems, int nelems)
00436 {
00437     Relation    rel = scan->indexRelation;
00438     Oid         elemtype;
00439     RegProcedure cmp_proc;
00440     BTSortArrayContext cxt;
00441     int         last_non_dup;
00442     int         i;
00443 
00444     if (nelems <= 1)
00445         return nelems;          /* no work to do */
00446 
00447     /*
00448      * Determine the nominal datatype of the array elements.  We have to
00449      * support the convention that sk_subtype == InvalidOid means the opclass
00450      * input type; this is a hack to simplify life for ScanKeyInit().
00451      */
00452     elemtype = skey->sk_subtype;
00453     if (elemtype == InvalidOid)
00454         elemtype = rel->rd_opcintype[skey->sk_attno - 1];
00455 
00456     /*
00457      * Look up the appropriate comparison function in the opfamily.
00458      *
00459      * Note: it's possible that this would fail, if the opfamily is
00460      * incomplete, but it seems quite unlikely that an opfamily would omit
00461      * non-cross-type support functions for any datatype that it supports at
00462      * all.
00463      */
00464     cmp_proc = get_opfamily_proc(rel->rd_opfamily[skey->sk_attno - 1],
00465                                  elemtype,
00466                                  elemtype,
00467                                  BTORDER_PROC);
00468     if (!RegProcedureIsValid(cmp_proc))
00469         elog(ERROR, "missing support function %d(%u,%u) in opfamily %u",
00470              BTORDER_PROC, elemtype, elemtype,
00471              rel->rd_opfamily[skey->sk_attno - 1]);
00472 
00473     /* Sort the array elements */
00474     fmgr_info(cmp_proc, &cxt.flinfo);
00475     cxt.collation = skey->sk_collation;
00476     cxt.reverse = reverse;
00477     qsort_arg((void *) elems, nelems, sizeof(Datum),
00478               _bt_compare_array_elements, (void *) &cxt);
00479 
00480     /* Now scan the sorted elements and remove duplicates */
00481     last_non_dup = 0;
00482     for (i = 1; i < nelems; i++)
00483     {
00484         int32       compare;
00485 
00486         compare = DatumGetInt32(FunctionCall2Coll(&cxt.flinfo,
00487                                                   cxt.collation,
00488                                                   elems[last_non_dup],
00489                                                   elems[i]));
00490         if (compare != 0)
00491             elems[++last_non_dup] = elems[i];
00492     }
00493 
00494     return last_non_dup + 1;
00495 }
00496 
00497 /*
00498  * qsort_arg comparator for sorting array elements
00499  */
00500 static int
00501 _bt_compare_array_elements(const void *a, const void *b, void *arg)
00502 {
00503     Datum       da = *((const Datum *) a);
00504     Datum       db = *((const Datum *) b);
00505     BTSortArrayContext *cxt = (BTSortArrayContext *) arg;
00506     int32       compare;
00507 
00508     compare = DatumGetInt32(FunctionCall2Coll(&cxt->flinfo,
00509                                               cxt->collation,
00510                                               da, db));
00511     if (cxt->reverse)
00512         compare = -compare;
00513     return compare;
00514 }
00515 
00516 /*
00517  * _bt_start_array_keys() -- Initialize array keys at start of a scan
00518  *
00519  * Set up the cur_elem counters and fill in the first sk_argument value for
00520  * each array scankey.  We can't do this until we know the scan direction.
00521  */
00522 void
00523 _bt_start_array_keys(IndexScanDesc scan, ScanDirection dir)
00524 {
00525     BTScanOpaque so = (BTScanOpaque) scan->opaque;
00526     int         i;
00527 
00528     for (i = 0; i < so->numArrayKeys; i++)
00529     {
00530         BTArrayKeyInfo *curArrayKey = &so->arrayKeys[i];
00531         ScanKey     skey = &so->arrayKeyData[curArrayKey->scan_key];
00532 
00533         Assert(curArrayKey->num_elems > 0);
00534         if (ScanDirectionIsBackward(dir))
00535             curArrayKey->cur_elem = curArrayKey->num_elems - 1;
00536         else
00537             curArrayKey->cur_elem = 0;
00538         skey->sk_argument = curArrayKey->elem_values[curArrayKey->cur_elem];
00539     }
00540 }
00541 
00542 /*
00543  * _bt_advance_array_keys() -- Advance to next set of array elements
00544  *
00545  * Returns TRUE if there is another set of values to consider, FALSE if not.
00546  * On TRUE result, the scankeys are initialized with the next set of values.
00547  */
00548 bool
00549 _bt_advance_array_keys(IndexScanDesc scan, ScanDirection dir)
00550 {
00551     BTScanOpaque so = (BTScanOpaque) scan->opaque;
00552     bool        found = false;
00553     int         i;
00554 
00555     /*
00556      * We must advance the last array key most quickly, since it will
00557      * correspond to the lowest-order index column among the available
00558      * qualifications. This is necessary to ensure correct ordering of output
00559      * when there are multiple array keys.
00560      */
00561     for (i = so->numArrayKeys - 1; i >= 0; i--)
00562     {
00563         BTArrayKeyInfo *curArrayKey = &so->arrayKeys[i];
00564         ScanKey     skey = &so->arrayKeyData[curArrayKey->scan_key];
00565         int         cur_elem = curArrayKey->cur_elem;
00566         int         num_elems = curArrayKey->num_elems;
00567 
00568         if (ScanDirectionIsBackward(dir))
00569         {
00570             if (--cur_elem < 0)
00571             {
00572                 cur_elem = num_elems - 1;
00573                 found = false;  /* need to advance next array key */
00574             }
00575             else
00576                 found = true;
00577         }
00578         else
00579         {
00580             if (++cur_elem >= num_elems)
00581             {
00582                 cur_elem = 0;
00583                 found = false;  /* need to advance next array key */
00584             }
00585             else
00586                 found = true;
00587         }
00588 
00589         curArrayKey->cur_elem = cur_elem;
00590         skey->sk_argument = curArrayKey->elem_values[cur_elem];
00591         if (found)
00592             break;
00593     }
00594 
00595     return found;
00596 }
00597 
00598 /*
00599  * _bt_mark_array_keys() -- Handle array keys during btmarkpos
00600  *
00601  * Save the current state of the array keys as the "mark" position.
00602  */
00603 void
00604 _bt_mark_array_keys(IndexScanDesc scan)
00605 {
00606     BTScanOpaque so = (BTScanOpaque) scan->opaque;
00607     int         i;
00608 
00609     for (i = 0; i < so->numArrayKeys; i++)
00610     {
00611         BTArrayKeyInfo *curArrayKey = &so->arrayKeys[i];
00612 
00613         curArrayKey->mark_elem = curArrayKey->cur_elem;
00614     }
00615 }
00616 
00617 /*
00618  * _bt_restore_array_keys() -- Handle array keys during btrestrpos
00619  *
00620  * Restore the array keys to where they were when the mark was set.
00621  */
00622 void
00623 _bt_restore_array_keys(IndexScanDesc scan)
00624 {
00625     BTScanOpaque so = (BTScanOpaque) scan->opaque;
00626     bool        changed = false;
00627     int         i;
00628 
00629     /* Restore each array key to its position when the mark was set */
00630     for (i = 0; i < so->numArrayKeys; i++)
00631     {
00632         BTArrayKeyInfo *curArrayKey = &so->arrayKeys[i];
00633         ScanKey     skey = &so->arrayKeyData[curArrayKey->scan_key];
00634         int         mark_elem = curArrayKey->mark_elem;
00635 
00636         if (curArrayKey->cur_elem != mark_elem)
00637         {
00638             curArrayKey->cur_elem = mark_elem;
00639             skey->sk_argument = curArrayKey->elem_values[mark_elem];
00640             changed = true;
00641         }
00642     }
00643 
00644     /*
00645      * If we changed any keys, we must redo _bt_preprocess_keys.  That might
00646      * sound like overkill, but in cases with multiple keys per index column
00647      * it seems necessary to do the full set of pushups.
00648      */
00649     if (changed)
00650     {
00651         _bt_preprocess_keys(scan);
00652         /* The mark should have been set on a consistent set of keys... */
00653         Assert(so->qual_ok);
00654     }
00655 }
00656 
00657 
00658 /*
00659  *  _bt_preprocess_keys() -- Preprocess scan keys
00660  *
00661  * The given search-type keys (in scan->keyData[] or so->arrayKeyData[])
00662  * are copied to so->keyData[] with possible transformation.
00663  * scan->numberOfKeys is the number of input keys, so->numberOfKeys gets
00664  * the number of output keys (possibly less, never greater).
00665  *
00666  * The output keys are marked with additional sk_flag bits beyond the
00667  * system-standard bits supplied by the caller.  The DESC and NULLS_FIRST
00668  * indoption bits for the relevant index attribute are copied into the flags.
00669  * Also, for a DESC column, we commute (flip) all the sk_strategy numbers
00670  * so that the index sorts in the desired direction.
00671  *
00672  * One key purpose of this routine is to discover which scan keys must be
00673  * satisfied to continue the scan.  It also attempts to eliminate redundant
00674  * keys and detect contradictory keys.  (If the index opfamily provides
00675  * incomplete sets of cross-type operators, we may fail to detect redundant
00676  * or contradictory keys, but we can survive that.)
00677  *
00678  * The output keys must be sorted by index attribute.  Presently we expect
00679  * (but verify) that the input keys are already so sorted --- this is done
00680  * by match_clauses_to_index() in indxpath.c.  Some reordering of the keys
00681  * within each attribute may be done as a byproduct of the processing here,
00682  * but no other code depends on that.
00683  *
00684  * The output keys are marked with flags SK_BT_REQFWD and/or SK_BT_REQBKWD
00685  * if they must be satisfied in order to continue the scan forward or backward
00686  * respectively.  _bt_checkkeys uses these flags.  For example, if the quals
00687  * are "x = 1 AND y < 4 AND z < 5", then _bt_checkkeys will reject a tuple
00688  * (1,2,7), but we must continue the scan in case there are tuples (1,3,z).
00689  * But once we reach tuples like (1,4,z) we can stop scanning because no
00690  * later tuples could match.  This is reflected by marking the x and y keys,
00691  * but not the z key, with SK_BT_REQFWD.  In general, the keys for leading
00692  * attributes with "=" keys are marked both SK_BT_REQFWD and SK_BT_REQBKWD.
00693  * For the first attribute without an "=" key, any "<" and "<=" keys are
00694  * marked SK_BT_REQFWD while any ">" and ">=" keys are marked SK_BT_REQBKWD.
00695  * This can be seen to be correct by considering the above example.  Note
00696  * in particular that if there are no keys for a given attribute, the keys for
00697  * subsequent attributes can never be required; for instance "WHERE y = 4"
00698  * requires a full-index scan.
00699  *
00700  * If possible, redundant keys are eliminated: we keep only the tightest
00701  * >/>= bound and the tightest </<= bound, and if there's an = key then
00702  * that's the only one returned.  (So, we return either a single = key,
00703  * or one or two boundary-condition keys for each attr.)  However, if we
00704  * cannot compare two keys for lack of a suitable cross-type operator,
00705  * we cannot eliminate either.  If there are two such keys of the same
00706  * operator strategy, the second one is just pushed into the output array
00707  * without further processing here.  We may also emit both >/>= or both
00708  * </<= keys if we can't compare them.  The logic about required keys still
00709  * works if we don't eliminate redundant keys.
00710  *
00711  * Note that one reason we need direction-sensitive required-key flags is
00712  * precisely that we may not be able to eliminate redundant keys.  Suppose
00713  * we have "x > 4::int AND x > 10::bigint", and we are unable to determine
00714  * which key is more restrictive for lack of a suitable cross-type operator.
00715  * _bt_first will arbitrarily pick one of the keys to do the initial
00716  * positioning with.  If it picks x > 4, then the x > 10 condition will fail
00717  * until we reach index entries > 10; but we can't stop the scan just because
00718  * x > 10 is failing.  On the other hand, if we are scanning backwards, then
00719  * failure of either key is indeed enough to stop the scan.  (In general, when
00720  * inequality keys are present, the initial-positioning code only promises to
00721  * position before the first possible match, not exactly at the first match,
00722  * for a forward scan; or after the last match for a backward scan.)
00723  *
00724  * As a byproduct of this work, we can detect contradictory quals such
00725  * as "x = 1 AND x > 2".  If we see that, we return so->qual_ok = FALSE,
00726  * indicating the scan need not be run at all since no tuples can match.
00727  * (In this case we do not bother completing the output key array!)
00728  * Again, missing cross-type operators might cause us to fail to prove the
00729  * quals contradictory when they really are, but the scan will work correctly.
00730  *
00731  * Row comparison keys are currently also treated without any smarts:
00732  * we just transfer them into the preprocessed array without any
00733  * editorialization.  We can treat them the same as an ordinary inequality
00734  * comparison on the row's first index column, for the purposes of the logic
00735  * about required keys.
00736  *
00737  * Note: the reason we have to copy the preprocessed scan keys into private
00738  * storage is that we are modifying the array based on comparisons of the
00739  * key argument values, which could change on a rescan or after moving to
00740  * new elements of array keys.  Therefore we can't overwrite the source data.
00741  */
00742 void
00743 _bt_preprocess_keys(IndexScanDesc scan)
00744 {
          /*
           * Overview: read the scan's input keys, boil them down per index
           * attribute, and write the surviving keys into so->keyData.  The
           * results are reported through so->numberOfKeys and so->qual_ok.
           * In the multi-key path, every early "return" taken after setting
           * so->qual_ok = false leaves so->numberOfKeys at zero.
           */
00745     BTScanOpaque so = (BTScanOpaque) scan->opaque;
00746     int         numberOfKeys = scan->numberOfKeys;  /* # of input keys */
00747     int16      *indoption = scan->indexRelation->rd_indoption;
00748     int         new_numberOfKeys;   /* # of keys emitted to outkeys[] */
00749     int         numberOfEqualCols;  /* # of attrs seen that have an "=" key */
00750     ScanKey     inkeys;     /* array the input keys are read from */
00751     ScanKey     outkeys;    /* so->keyData, receives the output keys */
00752     ScanKey     cur;        /* input key currently being examined */
00753     ScanKey     xform[BTMaxStrategyNumber];
00754     bool        test_result;    /* set by _bt_compare_scankey_args */
00755     int         i,
00756                 j;
00757     AttrNumber  attno;      /* index attribute currently being processed */
00758 
00759     /* initialize result variables */
00760     so->qual_ok = true;
00761     so->numberOfKeys = 0;
00762 
00763     if (numberOfKeys < 1)
00764         return;                 /* done if qual-less scan */
00765 
00766     /*
00767      * Read so->arrayKeyData if array keys are present, else scan->keyData
00768      */
00769     if (so->arrayKeyData != NULL)
00770         inkeys = so->arrayKeyData;
00771     else
00772         inkeys = scan->keyData;
00773 
00774     outkeys = so->keyData;
00775     cur = &inkeys[0];
00776     /* we check that input keys are correctly ordered */
00777     if (cur->sk_attno < 1)
00778         elog(ERROR, "btree index keys must be ordered by attribute");
00779 
00780     /* We can short-circuit most of the work if there's just one key */
00781     if (numberOfKeys == 1)
00782     {
00783         /* Apply indoption to scankey (might change sk_strategy!) */
00784         if (!_bt_fix_scankey_strategy(cur, indoption))
00785             so->qual_ok = false;
              /*
               * Note that the key is copied to the output array (and counted)
               * even when qual_ok has just been cleared above.
               */
00786         memcpy(outkeys, cur, sizeof(ScanKeyData));
00787         so->numberOfKeys = 1;
00788         /* We can mark the qual as required if it's for first index col */
00789         if (cur->sk_attno == 1)
00790             _bt_mark_scankey_required(outkeys);
00791         return;
00792     }
00793 
00794     /*
00795      * Otherwise, do the full set of pushups.
00796      */
00797     new_numberOfKeys = 0;
00798     numberOfEqualCols = 0;
00799 
00800     /*
00801      * Initialize for processing of keys for attr 1.
00802      *
00803      * xform[i] points to the currently best scan key of strategy type i+1; it
00804      * is NULL if we haven't yet found such a key for this attr.
00805      */
00806     attno = 1;
00807     memset(xform, 0, sizeof(xform));
00808 
00809     /*
00810      * Loop iterates from 0 to numberOfKeys inclusive; we use the last pass to
00811      * handle after-last-key processing.  Actual exit from the loop is at the
00812      * "break" statement below.
00813      */
00814     for (i = 0;; cur++, i++)
00815     {
00816         if (i < numberOfKeys)
00817         {
00818             /* Apply indoption to scankey (might change sk_strategy!) */
00819             if (!_bt_fix_scankey_strategy(cur, indoption))
00820             {
00821                 /* NULL can't be matched, so give up */
00822                 so->qual_ok = false;
00823                 return;
00824             }
00825         }
00826 
00827         /*
00828          * If we are at the end of the keys for a particular attr, finish up
00829          * processing and emit the cleaned-up keys.
00830          */
00831         if (i == numberOfKeys || cur->sk_attno != attno)
00832         {
                  /*
                   * Snapshot the count before this attr's own "=" key (if any)
                   * is added to it below: whether this attr's keys are required
                   * depends only on the attrs before it.
                   */
00833             int         priorNumberOfEqualCols = numberOfEqualCols;
00834 
00835             /* check input keys are correctly ordered */
00836             if (i < numberOfKeys && cur->sk_attno < attno)
00837                 elog(ERROR, "btree index keys must be ordered by attribute");
00838 
00839             /*
00840              * If = has been specified, all other keys can be eliminated as
00841              * redundant.  If we have a case like key = 1 AND key > 2, we can
00842              * set qual_ok to false and abandon further processing.
00843              *
00844              * We also have to deal with the case of "key IS NULL", which is
00845              * unsatisfiable in combination with any other index condition. By
00846              * the time we get here, that's been classified as an equality
00847              * check, and we've rejected any combination of it with a regular
00848              * equality condition; but not with other types of conditions.
00849              */
00850             if (xform[BTEqualStrategyNumber - 1])
00851             {
00852                 ScanKey     eq = xform[BTEqualStrategyNumber - 1];
00853 
                      /* j visits xform[] slots BTMaxStrategyNumber - 1 down to 0 */
00854                 for (j = BTMaxStrategyNumber; --j >= 0;)
00855                 {
00856                     ScanKey     chk = xform[j];
00857 
                          /* skip empty slots and the equality slot itself */
00858                     if (!chk || j == (BTEqualStrategyNumber - 1))
00859                         continue;
00860 
00861                     if (eq->sk_flags & SK_SEARCHNULL)
00862                     {
00863                         /* IS NULL is contradictory to anything else */
00864                         so->qual_ok = false;
00865                         return;
00866                     }
00867 
                          /*
                           * Evaluate "eq's argument OP chk's argument", where OP
                           * is chk's operator: does the equality constant
                           * satisfy the other qual?
                           */
00868                     if (_bt_compare_scankey_args(scan, chk, eq, chk,
00869                                                  &test_result))
00870                     {
00871                         if (!test_result)
00872                         {
00873                             /* keys proven mutually contradictory */
00874                             so->qual_ok = false;
00875                             return;
00876                         }
00877                         /* else discard the redundant non-equality key */
00878                         xform[j] = NULL;
00879                     }
00880                     /* else, cannot determine redundancy, keep both keys */
00881                 }
00882                 /* track number of attrs for which we have "=" keys */
00883                 numberOfEqualCols++;
00884             }
00885 
00886             /* try to keep only one of <, <= */
00887             if (xform[BTLessStrategyNumber - 1]
00888                 && xform[BTLessEqualStrategyNumber - 1])
00889             {
00890                 ScanKey     lt = xform[BTLessStrategyNumber - 1];
00891                 ScanKey     le = xform[BTLessEqualStrategyNumber - 1];
00892 
                      /*
                       * Evaluate "lt's argument OP le's argument" with le's
                       * operator as OP.  If the comparison can't be made, both
                       * keys are kept.
                       */
00893                 if (_bt_compare_scankey_args(scan, le, lt, le,
00894                                              &test_result))
00895                 {
00896                     if (test_result)
00897                         xform[BTLessEqualStrategyNumber - 1] = NULL;
00898                     else
00899                         xform[BTLessStrategyNumber - 1] = NULL;
00900                 }
00901             }
00902 
00903             /* try to keep only one of >, >= */
00904             if (xform[BTGreaterStrategyNumber - 1]
00905                 && xform[BTGreaterEqualStrategyNumber - 1])
00906             {
00907                 ScanKey     gt = xform[BTGreaterStrategyNumber - 1];
00908                 ScanKey     ge = xform[BTGreaterEqualStrategyNumber - 1];
00909 
                      /*
                       * Evaluate "gt's argument OP ge's argument" with ge's
                       * operator as OP.  If the comparison can't be made, both
                       * keys are kept.
                       */
00910                 if (_bt_compare_scankey_args(scan, ge, gt, ge,
00911                                              &test_result))
00912                 {
00913                     if (test_result)
00914                         xform[BTGreaterEqualStrategyNumber - 1] = NULL;
00915                     else
00916                         xform[BTGreaterStrategyNumber - 1] = NULL;
00917                 }
00918             }
00919 
00920             /*
00921              * Emit the cleaned-up keys into the outkeys[] array, and then
00922              * mark them if they are required.  They are required (possibly
00923              * only in one direction) if all attrs before this one had "=".
00924              */
00925             for (j = BTMaxStrategyNumber; --j >= 0;)
00926             {
00927                 if (xform[j])
00928                 {
00929                     ScanKey     outkey = &outkeys[new_numberOfKeys++];
00930 
00931                     memcpy(outkey, xform[j], sizeof(ScanKeyData));
00932                     if (priorNumberOfEqualCols == attno - 1)
00933                         _bt_mark_scankey_required(outkey);
00934                 }
00935             }
00936 
00937             /*
00938              * Exit loop here if done.
00939              */
00940             if (i == numberOfKeys)
00941                 break;
00942 
00943             /* Re-initialize for new attno */
00944             attno = cur->sk_attno;
00945             memset(xform, 0, sizeof(xform));
00946         }
00947 
00948         /*
                   * check strategy this key's operator corresponds to; xform[] is
                   * indexed by strategy number minus one
                   */
00949         j = cur->sk_strategy - 1;
00950 
00951         /* if row comparison, push it directly to the output array */
00952         if (cur->sk_flags & SK_ROW_HEADER)
00953         {
00954             ScanKey     outkey = &outkeys[new_numberOfKeys++];
00955 
00956             memcpy(outkey, cur, sizeof(ScanKeyData));
00957             if (numberOfEqualCols == attno - 1)
00958                 _bt_mark_scankey_required(outkey);
00959 
00960             /*
00961              * We don't support RowCompare using equality; such a qual would
00962              * mess up the numberOfEqualCols tracking.
00963              */
00964             Assert(j != (BTEqualStrategyNumber - 1));
00965             continue;
00966         }
00967 
00968         /* have we seen one of these before? */
00969         if (xform[j] == NULL)
00970         {
00971             /* nope, so remember this scankey */
00972             xform[j] = cur;
00973         }
00974         else
00975         {
00976             /*
                       * yup, keep only the more restrictive key: evaluate "cur's
                       * argument OP xform[j]'s argument" with cur's operator as OP
                       */
00977             if (_bt_compare_scankey_args(scan, cur, cur, xform[j],
00978                                          &test_result))
00979             {
00980                 if (test_result)
00981                     xform[j] = cur;
00982                 else if (j == (BTEqualStrategyNumber - 1))
00983                 {
00984                     /* key == a && key == b, but a != b */
00985                     so->qual_ok = false;
00986                     return;
00987                 }
00988                 /* else old key is more restrictive, keep it */
00989             }
00990             else
00991             {
00992                 /*
00993                  * We can't determine which key is more restrictive.  Keep the
00994                  * previous one in xform[j] and push this one directly to the
00995                  * output array.
00996                  */
00997                 ScanKey     outkey = &outkeys[new_numberOfKeys++];
00998 
00999                 memcpy(outkey, cur, sizeof(ScanKeyData));
01000                 if (numberOfEqualCols == attno - 1)
01001                     _bt_mark_scankey_required(outkey);
01002             }
01003         }
01004     }
01005 
          /* publish the number of keys written to so->keyData */
01006     so->numberOfKeys = new_numberOfKeys;
01007 }
01008 
01009 /*
01010  * Compare two scankey values using a specified operator.
01011  *
01012  * The test we want to perform is logically "leftarg op rightarg", where
01013  * leftarg and rightarg are the sk_argument values in those ScanKeys, and
01014  * the comparison operator is the one in the op ScanKey.  However, in
01015  * cross-data-type situations we may need to look up the correct operator in
01016  * the index's opfamily: it is the one having amopstrategy = op->sk_strategy
01017  * and amoplefttype/amoprighttype equal to the two argument datatypes.
01018  *
01019  * If the opfamily doesn't supply a complete set of cross-type operators we
01020  * may not be able to make the comparison.  If we can make the comparison
01021  * we store the operator result in *result and return TRUE.  We return FALSE
01022  * if the comparison could not be made.
01023  *
01024  * Note: op always points at the same ScanKey as either leftarg or rightarg.
01025  * Since we don't scribble on the scankeys, this aliasing should cause no
01026  * trouble.
01027  *
01028  * Note: this routine needs to be insensitive to any DESC option applied
01029  * to the index column.  For example, "x < 4" is a tighter constraint than
01030  * "x < 5" regardless of which way the index is sorted.
01031  */
01032 static bool
01033 _bt_compare_scankey_args(IndexScanDesc scan, ScanKey op,
01034                          ScanKey leftarg, ScanKey rightarg,
01035                          bool *result)
01036 {
01037     Relation    rel = scan->indexRelation;
01038     Oid         lefttype,
01039                 righttype,
01040                 optype,
01041                 opcintype,
01042                 cmp_op;
01043     StrategyNumber strat;
01044 
01045     /*
01046      * First, deal with cases where one or both args are NULL.  This should
01047      * only happen when the scankeys represent IS NULL/NOT NULL conditions.
01048      */
01049     if ((leftarg->sk_flags | rightarg->sk_flags) & SK_ISNULL)
01050     {
01051         bool        leftnull,
01052                     rightnull;
01053 
01054         if (leftarg->sk_flags & SK_ISNULL)
01055         {
01056             Assert(leftarg->sk_flags & (SK_SEARCHNULL | SK_SEARCHNOTNULL));
01057             leftnull = true;
01058         }
01059         else
01060             leftnull = false;
01061         if (rightarg->sk_flags & SK_ISNULL)
01062         {
01063             Assert(rightarg->sk_flags & (SK_SEARCHNULL | SK_SEARCHNOTNULL));
01064             rightnull = true;
01065         }
01066         else
01067             rightnull = false;
01068 
01069         /*
01070          * We treat NULL as either greater than or less than all other values.
01071          * Since true > false, the tests below work correctly for NULLS LAST
01072          * logic.  If the index is NULLS FIRST, we need to flip the strategy.
01073          */
01074         strat = op->sk_strategy;
01075         if (op->sk_flags & SK_BT_NULLS_FIRST)
01076             strat = BTCommuteStrategyNumber(strat);
01077 
01078         switch (strat)
01079         {
01080             case BTLessStrategyNumber:
01081                 *result = (leftnull < rightnull);
01082                 break;
01083             case BTLessEqualStrategyNumber:
01084                 *result = (leftnull <= rightnull);
01085                 break;
01086             case BTEqualStrategyNumber:
01087                 *result = (leftnull == rightnull);
01088                 break;
01089             case BTGreaterEqualStrategyNumber:
01090                 *result = (leftnull >= rightnull);
01091                 break;
01092             case BTGreaterStrategyNumber:
01093                 *result = (leftnull > rightnull);
01094                 break;
01095             default:
01096                 elog(ERROR, "unrecognized StrategyNumber: %d", (int) strat);
01097                 *result = false;    /* keep compiler quiet */
01098                 break;
01099         }
01100         return true;
01101     }
01102 
01103     /*
01104      * The opfamily we need to worry about is identified by the index column.
01105      */
01106     Assert(leftarg->sk_attno == rightarg->sk_attno);
01107 
01108     opcintype = rel->rd_opcintype[leftarg->sk_attno - 1];
01109 
01110     /*
01111      * Determine the actual datatypes of the ScanKey arguments.  We have to
01112      * support the convention that sk_subtype == InvalidOid means the opclass
01113      * input type; this is a hack to simplify life for ScanKeyInit().
01114      */
01115     lefttype = leftarg->sk_subtype;
01116     if (lefttype == InvalidOid)
01117         lefttype = opcintype;
01118     righttype = rightarg->sk_subtype;
01119     if (righttype == InvalidOid)
01120         righttype = opcintype;
01121     optype = op->sk_subtype;
01122     if (optype == InvalidOid)
01123         optype = opcintype;
01124 
01125     /*
01126      * If leftarg and rightarg match the types expected for the "op" scankey,
01127      * we can use its already-looked-up comparison function.
01128      */
01129     if (lefttype == opcintype && righttype == optype)
01130     {
01131         *result = DatumGetBool(FunctionCall2Coll(&op->sk_func,
01132                                                  op->sk_collation,
01133                                                  leftarg->sk_argument,
01134                                                  rightarg->sk_argument));
01135         return true;
01136     }
01137 
01138     /*
01139      * Otherwise, we need to go to the syscache to find the appropriate
01140      * operator.  (This cannot result in infinite recursion, since no
01141      * indexscan initiated by syscache lookup will use cross-data-type
01142      * operators.)
01143      *
01144      * If the sk_strategy was flipped by _bt_fix_scankey_strategy, we have to
01145      * un-flip it to get the correct opfamily member.
01146      */
01147     strat = op->sk_strategy;
01148     if (op->sk_flags & SK_BT_DESC)
01149         strat = BTCommuteStrategyNumber(strat);
01150 
01151     cmp_op = get_opfamily_member(rel->rd_opfamily[leftarg->sk_attno - 1],
01152                                  lefttype,
01153                                  righttype,
01154                                  strat);
01155     if (OidIsValid(cmp_op))
01156     {
01157         RegProcedure cmp_proc = get_opcode(cmp_op);
01158 
01159         if (RegProcedureIsValid(cmp_proc))
01160         {
01161             *result = DatumGetBool(OidFunctionCall2Coll(cmp_proc,
01162                                                         op->sk_collation,
01163                                                         leftarg->sk_argument,
01164                                                      rightarg->sk_argument));
01165             return true;
01166         }
01167     }
01168 
01169     /* Can't make the comparison */
01170     *result = false;            /* suppress compiler warnings */
01171     return false;
01172 }
01173 
01174 /*
01175  * Adjust a scankey's strategy and flags setting as needed for indoptions.
01176  *
01177  * We copy the appropriate indoption value into the scankey sk_flags
01178  * (shifting to avoid clobbering system-defined flag bits).  Also, if
01179  * the DESC option is set, commute (flip) the operator strategy number.
01180  *
01181  * A secondary purpose is to check for IS NULL/NOT NULL scankeys and set up
01182  * the strategy field correctly for them.
01183  *
01184  * Lastly, for ordinary scankeys (not IS NULL/NOT NULL), we check for a
01185  * NULL comparison value.  Since all btree operators are assumed strict,
01186  * a NULL means that the qual cannot be satisfied.  We return TRUE if the
01187  * comparison value isn't NULL, or FALSE if the scan should be abandoned.
01188  *
01189  * This function is applied to the *input* scankey structure; therefore
01190  * on a rescan we will be looking at already-processed scankeys.  Hence
01191  * we have to be careful not to re-commute the strategy if we already did it.
01192  * It's a bit ugly to modify the caller's copy of the scankey but in practice
01193  * there shouldn't be any problem, since the index's indoptions are certainly
01194  * not going to change while the scankey survives.
01195  */
01196 static bool
01197 _bt_fix_scankey_strategy(ScanKey skey, int16 *indoption)
01198 {
01199     int         addflags;
01200 
01201     addflags = indoption[skey->sk_attno - 1] << SK_BT_INDOPTION_SHIFT;
01202 
01203     /*
01204      * We treat all btree operators as strict (even if they're not so marked
01205      * in pg_proc). This means that it is impossible for an operator condition
01206      * with a NULL comparison constant to succeed, and we can reject it right
01207      * away.
01208      *
01209      * However, we now also support "x IS NULL" clauses as search conditions,
01210      * so in that case keep going. The planner has not filled in any
01211      * particular strategy in this case, so set it to BTEqualStrategyNumber
01212      * --- we can treat IS NULL as an equality operator for purposes of search
01213      * strategy.
01214      *
01215      * Likewise, "x IS NOT NULL" is supported.  We treat that as either "less
01216      * than NULL" in a NULLS LAST index, or "greater than NULL" in a NULLS
01217      * FIRST index.
01218      *
01219      * Note: someday we might have to fill in sk_collation from the index
01220      * column's collation.  At the moment this is a non-issue because we'll
01221      * never actually call the comparison operator on a NULL.
01222      */
01223     if (skey->sk_flags & SK_ISNULL)
01224     {
01225         /* SK_ISNULL shouldn't be set in a row header scankey */
01226         Assert(!(skey->sk_flags & SK_ROW_HEADER));
01227 
01228         /* Set indoption flags in scankey (might be done already) */
01229         skey->sk_flags |= addflags;
01230 
01231         /* Set correct strategy for IS NULL or NOT NULL search */
01232         if (skey->sk_flags & SK_SEARCHNULL)
01233         {
01234             skey->sk_strategy = BTEqualStrategyNumber;
01235             skey->sk_subtype = InvalidOid;
01236             skey->sk_collation = InvalidOid;
01237         }
01238         else if (skey->sk_flags & SK_SEARCHNOTNULL)
01239         {
01240             if (skey->sk_flags & SK_BT_NULLS_FIRST)
01241                 skey->sk_strategy = BTGreaterStrategyNumber;
01242             else
01243                 skey->sk_strategy = BTLessStrategyNumber;
01244             skey->sk_subtype = InvalidOid;
01245             skey->sk_collation = InvalidOid;
01246         }
01247         else
01248         {
01249             /* regular qual, so it cannot be satisfied */
01250             return false;
01251         }
01252 
01253         /* Needn't do the rest */
01254         return true;
01255     }
01256 
01257     /* Adjust strategy for DESC, if we didn't already */
01258     if ((addflags & SK_BT_DESC) && !(skey->sk_flags & SK_BT_DESC))
01259         skey->sk_strategy = BTCommuteStrategyNumber(skey->sk_strategy);
01260     skey->sk_flags |= addflags;
01261 
01262     /* If it's a row header, fix row member flags and strategies similarly */
01263     if (skey->sk_flags & SK_ROW_HEADER)
01264     {
01265         ScanKey     subkey = (ScanKey) DatumGetPointer(skey->sk_argument);
01266 
01267         for (;;)
01268         {
01269             Assert(subkey->sk_flags & SK_ROW_MEMBER);
01270             addflags = indoption[subkey->sk_attno - 1] << SK_BT_INDOPTION_SHIFT;
01271             if ((addflags & SK_BT_DESC) && !(subkey->sk_flags & SK_BT_DESC))
01272                 subkey->sk_strategy = BTCommuteStrategyNumber(subkey->sk_strategy);
01273             subkey->sk_flags |= addflags;
01274             if (subkey->sk_flags & SK_ROW_END)
01275                 break;
01276             subkey++;
01277         }
01278     }
01279 
01280     return true;
01281 }
01282 
01283 /*
01284  * Mark a scankey as "required to continue the scan".
01285  *
01286  * Depending on the operator type, the key may be required for both scan
01287  * directions or just one.  Also, if the key is a row comparison header,
01288  * we have to mark the appropriate subsidiary ScanKeys as required.  In
01289  * such cases, the first subsidiary key is required, but subsequent ones
01290  * are required only as long as they correspond to successive index columns
01291  * and match the leading column as to sort direction.
01292  * Otherwise the row comparison ordering is different from the index ordering
01293  * and so we can't stop the scan on the basis of those lower-order columns.
01294  *
01295  * Note: when we set required-key flag bits in a subsidiary scankey, we are
01296  * scribbling on a data structure belonging to the index AM's caller, not on
01297  * our private copy.  This should be OK because the marking will not change
01298  * from scan to scan within a query, and so we'd just re-mark the same way
01299  * anyway on a rescan.  Something to keep an eye on though.
01300  */
01301 static void
01302 _bt_mark_scankey_required(ScanKey skey)
01303 {
01304     int         addflags;
01305 
01306     switch (skey->sk_strategy)
01307     {
01308         case BTLessStrategyNumber:
01309         case BTLessEqualStrategyNumber:
01310             addflags = SK_BT_REQFWD;
01311             break;
01312         case BTEqualStrategyNumber:
01313             addflags = SK_BT_REQFWD | SK_BT_REQBKWD;
01314             break;
01315         case BTGreaterEqualStrategyNumber:
01316         case BTGreaterStrategyNumber:
01317             addflags = SK_BT_REQBKWD;
01318             break;
01319         default:
01320             elog(ERROR, "unrecognized StrategyNumber: %d",
01321                  (int) skey->sk_strategy);
01322             addflags = 0;       /* keep compiler quiet */
01323             break;
01324     }
01325 
01326     skey->sk_flags |= addflags;
01327 
01328     if (skey->sk_flags & SK_ROW_HEADER)
01329     {
01330         ScanKey     subkey = (ScanKey) DatumGetPointer(skey->sk_argument);
01331         AttrNumber  attno = skey->sk_attno;
01332 
01333         /* First subkey should be same as the header says */
01334         Assert(subkey->sk_attno == attno);
01335 
01336         for (;;)
01337         {
01338             Assert(subkey->sk_flags & SK_ROW_MEMBER);
01339             if (subkey->sk_attno != attno)
01340                 break;          /* non-adjacent key, so not required */
01341             if (subkey->sk_strategy != skey->sk_strategy)
01342                 break;          /* wrong direction, so not required */
01343             subkey->sk_flags |= addflags;
01344             if (subkey->sk_flags & SK_ROW_END)
01345                 break;
01346             subkey++;
01347             attno++;
01348         }
01349     }
01350 }
01351 
01352 /*
01353  * Test whether an indextuple satisfies all the scankey conditions.
01354  *
01355  * If so, return the address of the index tuple on the index page.
01356  * If not, return NULL.
01357  *
01358  * If the tuple fails to pass the qual, we also determine whether there's
01359  * any need to continue the scan beyond this tuple, and set *continuescan
01360  * accordingly.  See comments for _bt_preprocess_keys(), above, about how
01361  * this is done.
01362  *
01363  * scan: index scan descriptor (containing a search-type scankey)
01364  * page: buffer page containing index tuple
01365  * offnum: offset number of index tuple (must be a valid item!)
01366  * dir: direction we are scanning in
01367  * continuescan: output parameter (will be set correctly in all cases)
01368  *
01369  * Caller must hold pin and lock on the index page.
01370  */
01371 IndexTuple
01372 _bt_checkkeys(IndexScanDesc scan,
01373               Page page, OffsetNumber offnum,
01374               ScanDirection dir, bool *continuescan)
01375 {
01376     ItemId      iid = PageGetItemId(page, offnum);
01377     bool        tuple_alive;
01378     IndexTuple  tuple;
01379     TupleDesc   tupdesc;
01380     BTScanOpaque so;
01381     int         keysz;
01382     int         ikey;
01383     ScanKey     key;
01384 
01385     *continuescan = true;       /* default assumption */
01386 
01387     /*
01388      * If the scan specifies not to return killed tuples, then we treat a
01389      * killed tuple as not passing the qual.  Most of the time, it's a win to
01390      * not bother examining the tuple's index keys, but just return
01391      * immediately with continuescan = true to proceed to the next tuple.
01392      * However, if this is the last tuple on the page, we should check the
01393      * index keys to prevent uselessly advancing to the next page.
01394      */
01395     if (scan->ignore_killed_tuples && ItemIdIsDead(iid))
01396     {
01397         /* return immediately if there are more tuples on the page */
01398         if (ScanDirectionIsForward(dir))
01399         {
01400             if (offnum < PageGetMaxOffsetNumber(page))
01401                 return NULL;
01402         }
01403         else
01404         {
01405             BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
01406 
01407             if (offnum > P_FIRSTDATAKEY(opaque))
01408                 return NULL;
01409         }
01410 
01411         /*
01412          * OK, we want to check the keys so we can set continuescan correctly,
01413          * but we'll return NULL even if the tuple passes the key tests.
01414          */
01415         tuple_alive = false;
01416     }
01417     else
01418         tuple_alive = true;
01419 
01420     tuple = (IndexTuple) PageGetItem(page, iid);
01421 
01422     tupdesc = RelationGetDescr(scan->indexRelation);
01423     so = (BTScanOpaque) scan->opaque;
01424     keysz = so->numberOfKeys;
01425 
01426     for (key = so->keyData, ikey = 0; ikey < keysz; key++, ikey++)
01427     {
01428         Datum       datum;
01429         bool        isNull;
01430         Datum       test;
01431 
01432         /* row-comparison keys need special processing */
01433         if (key->sk_flags & SK_ROW_HEADER)
01434         {
01435             if (_bt_check_rowcompare(key, tuple, tupdesc, dir, continuescan))
01436                 continue;
01437             return NULL;
01438         }
01439 
01440         datum = index_getattr(tuple,
01441                               key->sk_attno,
01442                               tupdesc,
01443                               &isNull);
01444 
01445         if (key->sk_flags & SK_ISNULL)
01446         {
01447             /* Handle IS NULL/NOT NULL tests */
01448             if (key->sk_flags & SK_SEARCHNULL)
01449             {
01450                 if (isNull)
01451                     continue;   /* tuple satisfies this qual */
01452             }
01453             else
01454             {
01455                 Assert(key->sk_flags & SK_SEARCHNOTNULL);
01456                 if (!isNull)
01457                     continue;   /* tuple satisfies this qual */
01458             }
01459 
01460             /*
01461              * Tuple fails this qual.  If it's a required qual for the current
01462              * scan direction, then we can conclude no further tuples will
01463              * pass, either.
01464              */
01465             if ((key->sk_flags & SK_BT_REQFWD) &&
01466                 ScanDirectionIsForward(dir))
01467                 *continuescan = false;
01468             else if ((key->sk_flags & SK_BT_REQBKWD) &&
01469                      ScanDirectionIsBackward(dir))
01470                 *continuescan = false;
01471 
01472             /*
01473              * In any case, this indextuple doesn't match the qual.
01474              */
01475             return NULL;
01476         }
01477 
01478         if (isNull)
01479         {
01480             if (key->sk_flags & SK_BT_NULLS_FIRST)
01481             {
01482                 /*
01483                  * Since NULLs are sorted before non-NULLs, we know we have
01484                  * reached the lower limit of the range of values for this
01485                  * index attr.  On a backward scan, we can stop if this qual
01486                  * is one of the "must match" subset.  We can stop regardless
01487                  * of whether the qual is > or <, so long as it's required,
01488                  * because it's not possible for any future tuples to pass. On
01489                  * a forward scan, however, we must keep going, because we may
01490                  * have initially positioned to the start of the index.
01491                  */
01492                 if ((key->sk_flags & (SK_BT_REQFWD | SK_BT_REQBKWD)) &&
01493                     ScanDirectionIsBackward(dir))
01494                     *continuescan = false;
01495             }
01496             else
01497             {
01498                 /*
01499                  * Since NULLs are sorted after non-NULLs, we know we have
01500                  * reached the upper limit of the range of values for this
01501                  * index attr.  On a forward scan, we can stop if this qual is
01502                  * one of the "must match" subset.  We can stop regardless of
01503                  * whether the qual is > or <, so long as it's required,
01504                  * because it's not possible for any future tuples to pass. On
01505                  * a backward scan, however, we must keep going, because we
01506                  * may have initially positioned to the end of the index.
01507                  */
01508                 if ((key->sk_flags & (SK_BT_REQFWD | SK_BT_REQBKWD)) &&
01509                     ScanDirectionIsForward(dir))
01510                     *continuescan = false;
01511             }
01512 
01513             /*
01514              * In any case, this indextuple doesn't match the qual.
01515              */
01516             return NULL;
01517         }
01518 
01519         test = FunctionCall2Coll(&key->sk_func, key->sk_collation,
01520                                  datum, key->sk_argument);
01521 
01522         if (!DatumGetBool(test))
01523         {
01524             /*
01525              * Tuple fails this qual.  If it's a required qual for the current
01526              * scan direction, then we can conclude no further tuples will
01527              * pass, either.
01528              *
01529              * Note: because we stop the scan as soon as any required equality
01530              * qual fails, it is critical that equality quals be used for the
01531              * initial positioning in _bt_first() when they are available. See
01532              * comments in _bt_first().
01533              */
01534             if ((key->sk_flags & SK_BT_REQFWD) &&
01535                 ScanDirectionIsForward(dir))
01536                 *continuescan = false;
01537             else if ((key->sk_flags & SK_BT_REQBKWD) &&
01538                      ScanDirectionIsBackward(dir))
01539                 *continuescan = false;
01540 
01541             /*
01542              * In any case, this indextuple doesn't match the qual.
01543              */
01544             return NULL;
01545         }
01546     }
01547 
01548     /* Check for failure due to it being a killed tuple. */
01549     if (!tuple_alive)
01550         return NULL;
01551 
01552     /* If we get here, the tuple passes all index quals. */
01553     return tuple;
01554 }
01555 
01556 /*
01557  * Test whether an indextuple satisfies a row-comparison scan condition.
01558  *
01559  * Return true if so, false if not.  If not, also clear *continuescan if
01560  * it's not possible for any future tuples in the current scan direction
01561  * to pass the qual.
01562  *
01563  * This is a subroutine for _bt_checkkeys, which see for more info.
01564  */
01565 static bool
01566 _bt_check_rowcompare(ScanKey skey, IndexTuple tuple, TupleDesc tupdesc,
01567                      ScanDirection dir, bool *continuescan)
01568 {
01569     ScanKey     subkey = (ScanKey) DatumGetPointer(skey->sk_argument);
01570     int32       cmpresult = 0;
01571     bool        result;
01572 
01573     /* First subkey should be same as the header says */
01574     Assert(subkey->sk_attno == skey->sk_attno);
01575 
01576     /* Loop over columns of the row condition */
01577     for (;;)
01578     {
01579         Datum       datum;
01580         bool        isNull;
01581 
01582         Assert(subkey->sk_flags & SK_ROW_MEMBER);
01583 
01584         datum = index_getattr(tuple,
01585                               subkey->sk_attno,
01586                               tupdesc,
01587                               &isNull);
01588 
01589         if (isNull)
01590         {
01591             if (subkey->sk_flags & SK_BT_NULLS_FIRST)
01592             {
01593                 /*
01594                  * Since NULLs are sorted before non-NULLs, we know we have
01595                  * reached the lower limit of the range of values for this
01596                  * index attr.  On a backward scan, we can stop if this qual
01597                  * is one of the "must match" subset.  We can stop regardless
01598                  * of whether the qual is > or <, so long as it's required,
01599                  * because it's not possible for any future tuples to pass. On
01600                  * a forward scan, however, we must keep going, because we may
01601                  * have initially positioned to the start of the index.
01602                  */
01603                 if ((subkey->sk_flags & (SK_BT_REQFWD | SK_BT_REQBKWD)) &&
01604                     ScanDirectionIsBackward(dir))
01605                     *continuescan = false;
01606             }
01607             else
01608             {
01609                 /*
01610                  * Since NULLs are sorted after non-NULLs, we know we have
01611                  * reached the upper limit of the range of values for this
01612                  * index attr.  On a forward scan, we can stop if this qual is
01613                  * one of the "must match" subset.  We can stop regardless of
01614                  * whether the qual is > or <, so long as it's required,
01615                  * because it's not possible for any future tuples to pass. On
01616                  * a backward scan, however, we must keep going, because we
01617                  * may have initially positioned to the end of the index.
01618                  */
01619                 if ((subkey->sk_flags & (SK_BT_REQFWD | SK_BT_REQBKWD)) &&
01620                     ScanDirectionIsForward(dir))
01621                     *continuescan = false;
01622             }
01623 
01624             /*
01625              * In any case, this indextuple doesn't match the qual.
01626              */
01627             return false;
01628         }
01629 
01630         if (subkey->sk_flags & SK_ISNULL)
01631         {
01632             /*
01633              * Unlike the simple-scankey case, this isn't a disallowed case.
01634              * But it can never match.  If all the earlier row comparison
01635              * columns are required for the scan direction, we can stop the
01636              * scan, because there can't be another tuple that will succeed.
01637              */
01638             if (subkey != (ScanKey) DatumGetPointer(skey->sk_argument))
01639                 subkey--;
01640             if ((subkey->sk_flags & SK_BT_REQFWD) &&
01641                 ScanDirectionIsForward(dir))
01642                 *continuescan = false;
01643             else if ((subkey->sk_flags & SK_BT_REQBKWD) &&
01644                      ScanDirectionIsBackward(dir))
01645                 *continuescan = false;
01646             return false;
01647         }
01648 
01649         /* Perform the test --- three-way comparison not bool operator */
01650         cmpresult = DatumGetInt32(FunctionCall2Coll(&subkey->sk_func,
01651                                                     subkey->sk_collation,
01652                                                     datum,
01653                                                     subkey->sk_argument));
01654 
01655         if (subkey->sk_flags & SK_BT_DESC)
01656             cmpresult = -cmpresult;
01657 
01658         /* Done comparing if unequal, else advance to next column */
01659         if (cmpresult != 0)
01660             break;
01661 
01662         if (subkey->sk_flags & SK_ROW_END)
01663             break;
01664         subkey++;
01665     }
01666 
01667     /*
01668      * At this point cmpresult indicates the overall result of the row
01669      * comparison, and subkey points to the deciding column (or the last
01670      * column if the result is "=").
01671      */
01672     switch (subkey->sk_strategy)
01673     {
01674             /* EQ and NE cases aren't allowed here */
01675         case BTLessStrategyNumber:
01676             result = (cmpresult < 0);
01677             break;
01678         case BTLessEqualStrategyNumber:
01679             result = (cmpresult <= 0);
01680             break;
01681         case BTGreaterEqualStrategyNumber:
01682             result = (cmpresult >= 0);
01683             break;
01684         case BTGreaterStrategyNumber:
01685             result = (cmpresult > 0);
01686             break;
01687         default:
01688             elog(ERROR, "unrecognized RowCompareType: %d",
01689                  (int) subkey->sk_strategy);
01690             result = 0;         /* keep compiler quiet */
01691             break;
01692     }
01693 
01694     if (!result)
01695     {
01696         /*
01697          * Tuple fails this qual.  If it's a required qual for the current
01698          * scan direction, then we can conclude no further tuples will pass,
01699          * either.  Note we have to look at the deciding column, not
01700          * necessarily the first or last column of the row condition.
01701          */
01702         if ((subkey->sk_flags & SK_BT_REQFWD) &&
01703             ScanDirectionIsForward(dir))
01704             *continuescan = false;
01705         else if ((subkey->sk_flags & SK_BT_REQBKWD) &&
01706                  ScanDirectionIsBackward(dir))
01707             *continuescan = false;
01708     }
01709 
01710     return result;
01711 }
01712 
01713 /*
01714  * _bt_killitems - set LP_DEAD state for items an indexscan caller has
01715  * told us were killed
01716  *
01717  * scan->so contains information about the current page and killed tuples
01718  * thereon (generally, this should only be called if so->numKilled > 0).
01719  *
01720  * The caller must have pin on so->currPos.buf, but may or may not have
01721  * read-lock, as indicated by haveLock.  Note that we assume read-lock
01722  * is sufficient for setting LP_DEAD status (which is only a hint).
01723  *
01724  * We match items by heap TID before assuming they are the right ones to
01725  * delete.  We cope with cases where items have moved right due to insertions.
01726  * If an item has moved off the current page due to a split, we'll fail to
01727  * find it and do nothing (this is not an error case --- we assume the item
01728  * will eventually get marked in a future indexscan).  Note that because we
01729  * hold pin on the target page continuously from initially reading the items
01730  * until applying this function, VACUUM cannot have deleted any items from
01731  * the page, and so there is no need to search left from the recorded offset.
01732  * (This observation also guarantees that the item is still the right one
01733  * to delete, which might otherwise be questionable since heap TIDs can get
01734  * recycled.)
01735  */
01736 void
01737 _bt_killitems(IndexScanDesc scan, bool haveLock)
01738 {
01739     BTScanOpaque so = (BTScanOpaque) scan->opaque;
01740     Page        page;
01741     BTPageOpaque opaque;
01742     OffsetNumber minoff;
01743     OffsetNumber maxoff;
01744     int         i;
01745     bool        killedsomething = false;
01746 
01747     Assert(BufferIsValid(so->currPos.buf));
01748 
01749     if (!haveLock)
01750         LockBuffer(so->currPos.buf, BT_READ);
01751 
01752     page = BufferGetPage(so->currPos.buf);
01753     opaque = (BTPageOpaque) PageGetSpecialPointer(page);
01754     minoff = P_FIRSTDATAKEY(opaque);
01755     maxoff = PageGetMaxOffsetNumber(page);
01756 
01757     for (i = 0; i < so->numKilled; i++)
01758     {
01759         int         itemIndex = so->killedItems[i];
01760         BTScanPosItem *kitem = &so->currPos.items[itemIndex];
01761         OffsetNumber offnum = kitem->indexOffset;
01762 
01763         Assert(itemIndex >= so->currPos.firstItem &&
01764                itemIndex <= so->currPos.lastItem);
01765         if (offnum < minoff)
01766             continue;           /* pure paranoia */
01767         while (offnum <= maxoff)
01768         {
01769             ItemId      iid = PageGetItemId(page, offnum);
01770             IndexTuple  ituple = (IndexTuple) PageGetItem(page, iid);
01771 
01772             if (ItemPointerEquals(&ituple->t_tid, &kitem->heapTid))
01773             {
01774                 /* found the item */
01775                 ItemIdMarkDead(iid);
01776                 killedsomething = true;
01777                 break;          /* out of inner search loop */
01778             }
01779             offnum = OffsetNumberNext(offnum);
01780         }
01781     }
01782 
01783     /*
01784      * Since this can be redone later if needed, mark as dirty hint.
01785      *
01786      * Whenever we mark anything LP_DEAD, we also set the page's
01787      * BTP_HAS_GARBAGE flag, which is likewise just a hint.
01788      */
01789     if (killedsomething)
01790     {
01791         opaque->btpo_flags |= BTP_HAS_GARBAGE;
01792         MarkBufferDirtyHint(so->currPos.buf);
01793     }
01794 
01795     if (!haveLock)
01796         LockBuffer(so->currPos.buf, BUFFER_LOCK_UNLOCK);
01797 
01798     /*
01799      * Always reset the scan state, so we don't look for same items on other
01800      * pages.
01801      */
01802     so->numKilled = 0;
01803 }
01804 
01805 
01806 /*
01807  * The following routines manage a shared-memory area in which we track
01808  * assignment of "vacuum cycle IDs" to currently-active btree vacuuming
01809  * operations.  There is a single counter which increments each time we
01810  * start a vacuum to assign it a cycle ID.  Since multiple vacuums could
01811  * be active concurrently, we have to track the cycle ID for each active
01812  * vacuum; this requires at most MaxBackends entries (usually far fewer).
01813  * We assume at most one vacuum can be active for a given index.
01814  *
01815  * Access to the shared memory area is controlled by BtreeVacuumLock.
01816  * In principle we could use a separate lmgr locktag for each index,
01817  * but a single LWLock is much cheaper, and given the short time that
01818  * the lock is ever held, the concurrency hit should be minimal.
01819  */
01820 
01821 typedef struct BTOneVacInfo
01822 {
01823     LockRelId   relid;          /* global identifier of an index */
01824     BTCycleId   cycleid;        /* cycle ID for its active VACUUM */
01825 } BTOneVacInfo;
01826 
01827 typedef struct BTVacInfo
01828 {
01829     BTCycleId   cycle_ctr;      /* cycle ID most recently assigned */
01830     int         num_vacuums;    /* number of currently active VACUUMs */
01831     int         max_vacuums;    /* allocated length of vacuums[] array */
01832     BTOneVacInfo vacuums[1];    /* VARIABLE LENGTH ARRAY */
01833 } BTVacInfo;
01834 
01835 static BTVacInfo *btvacinfo;
01836 
01837 
01838 /*
01839  * _bt_vacuum_cycleid --- get the active vacuum cycle ID for an index,
01840  *      or zero if there is no active VACUUM
01841  *
01842  * Note: for correct interlocking, the caller must already hold pin and
01843  * exclusive lock on each buffer it will store the cycle ID into.  This
01844  * ensures that even if a VACUUM starts immediately afterwards, it cannot
01845  * process those pages until the page split is complete.
01846  */
01847 BTCycleId
01848 _bt_vacuum_cycleid(Relation rel)
01849 {
01850     BTCycleId   result = 0;
01851     int         i;
01852 
01853     /* Share lock is enough since this is a read-only operation */
01854     LWLockAcquire(BtreeVacuumLock, LW_SHARED);
01855 
01856     for (i = 0; i < btvacinfo->num_vacuums; i++)
01857     {
01858         BTOneVacInfo *vac = &btvacinfo->vacuums[i];
01859 
01860         if (vac->relid.relId == rel->rd_lockInfo.lockRelId.relId &&
01861             vac->relid.dbId == rel->rd_lockInfo.lockRelId.dbId)
01862         {
01863             result = vac->cycleid;
01864             break;
01865         }
01866     }
01867 
01868     LWLockRelease(BtreeVacuumLock);
01869     return result;
01870 }
01871 
01872 /*
01873  * _bt_start_vacuum --- assign a cycle ID to a just-starting VACUUM operation
01874  *
01875  * Note: the caller must guarantee that it will eventually call
01876  * _bt_end_vacuum, else we'll permanently leak an array slot.  To ensure
01877  * that this happens even in elog(FATAL) scenarios, the appropriate coding
01878  * is not just a PG_TRY, but
01879  *      PG_ENSURE_ERROR_CLEANUP(_bt_end_vacuum_callback, PointerGetDatum(rel))
01880  */
01881 BTCycleId
01882 _bt_start_vacuum(Relation rel)
01883 {
01884     BTCycleId   result;
01885     int         i;
01886     BTOneVacInfo *vac;
01887 
01888     LWLockAcquire(BtreeVacuumLock, LW_EXCLUSIVE);
01889 
01890     /*
01891      * Assign the next cycle ID, being careful to avoid zero as well as the
01892      * reserved high values.
01893      */
01894     result = ++(btvacinfo->cycle_ctr);
01895     if (result == 0 || result > MAX_BT_CYCLE_ID)
01896         result = btvacinfo->cycle_ctr = 1;
01897 
01898     /* Let's just make sure there's no entry already for this index */
01899     for (i = 0; i < btvacinfo->num_vacuums; i++)
01900     {
01901         vac = &btvacinfo->vacuums[i];
01902         if (vac->relid.relId == rel->rd_lockInfo.lockRelId.relId &&
01903             vac->relid.dbId == rel->rd_lockInfo.lockRelId.dbId)
01904         {
01905             /*
01906              * Unlike most places in the backend, we have to explicitly
01907              * release our LWLock before throwing an error.  This is because
01908              * we expect _bt_end_vacuum() to be called before transaction
01909              * abort cleanup can run to release LWLocks.
01910              */
01911             LWLockRelease(BtreeVacuumLock);
01912             elog(ERROR, "multiple active vacuums for index \"%s\"",
01913                  RelationGetRelationName(rel));
01914         }
01915     }
01916 
01917     /* OK, add an entry */
01918     if (btvacinfo->num_vacuums >= btvacinfo->max_vacuums)
01919     {
01920         LWLockRelease(BtreeVacuumLock);
01921         elog(ERROR, "out of btvacinfo slots");
01922     }
01923     vac = &btvacinfo->vacuums[btvacinfo->num_vacuums];
01924     vac->relid = rel->rd_lockInfo.lockRelId;
01925     vac->cycleid = result;
01926     btvacinfo->num_vacuums++;
01927 
01928     LWLockRelease(BtreeVacuumLock);
01929     return result;
01930 }
01931 
01932 /*
01933  * _bt_end_vacuum --- mark a btree VACUUM operation as done
01934  *
01935  * Note: this is deliberately coded not to complain if no entry is found;
01936  * this allows the caller to put PG_TRY around the start_vacuum operation.
01937  */
01938 void
01939 _bt_end_vacuum(Relation rel)
01940 {
01941     int         i;
01942 
01943     LWLockAcquire(BtreeVacuumLock, LW_EXCLUSIVE);
01944 
01945     /* Find the array entry */
01946     for (i = 0; i < btvacinfo->num_vacuums; i++)
01947     {
01948         BTOneVacInfo *vac = &btvacinfo->vacuums[i];
01949 
01950         if (vac->relid.relId == rel->rd_lockInfo.lockRelId.relId &&
01951             vac->relid.dbId == rel->rd_lockInfo.lockRelId.dbId)
01952         {
01953             /* Remove it by shifting down the last entry */
01954             *vac = btvacinfo->vacuums[btvacinfo->num_vacuums - 1];
01955             btvacinfo->num_vacuums--;
01956             break;
01957         }
01958     }
01959 
01960     LWLockRelease(BtreeVacuumLock);
01961 }
01962 
01963 /*
01964  * _bt_end_vacuum wrapped as an on_shmem_exit callback function
01965  */
01966 void
01967 _bt_end_vacuum_callback(int code, Datum arg)
01968 {
01969     _bt_end_vacuum((Relation) DatumGetPointer(arg));
01970 }
01971 
01972 /*
01973  * BTreeShmemSize --- report amount of shared memory space needed
01974  */
01975 Size
01976 BTreeShmemSize(void)
01977 {
01978     Size        size;
01979 
01980     size = offsetof(BTVacInfo, vacuums[0]);
01981     size = add_size(size, mul_size(MaxBackends, sizeof(BTOneVacInfo)));
01982     return size;
01983 }
01984 
01985 /*
01986  * BTreeShmemInit --- initialize this module's shared memory
01987  */
01988 void
01989 BTreeShmemInit(void)
01990 {
01991     bool        found;
01992 
01993     btvacinfo = (BTVacInfo *) ShmemInitStruct("BTree Vacuum State",
01994                                               BTreeShmemSize(),
01995                                               &found);
01996 
01997     if (!IsUnderPostmaster)
01998     {
01999         /* Initialize shared memory area */
02000         Assert(!found);
02001 
02002         /*
02003          * It doesn't really matter what the cycle counter starts at, but
02004          * having it always start the same doesn't seem good.  Seed with
02005          * low-order bits of time() instead.
02006          */
02007         btvacinfo->cycle_ctr = (BTCycleId) time(NULL);
02008 
02009         btvacinfo->num_vacuums = 0;
02010         btvacinfo->max_vacuums = MaxBackends;
02011     }
02012     else
02013         Assert(found);
02014 }
02015 
02016 Datum
02017 btoptions(PG_FUNCTION_ARGS)
02018 {
02019     Datum       reloptions = PG_GETARG_DATUM(0);
02020     bool        validate = PG_GETARG_BOOL(1);
02021     bytea      *result;
02022 
02023     result = default_reloptions(reloptions, validate, RELOPT_KIND_BTREE);
02024     if (result)
02025         PG_RETURN_BYTEA_P(result);
02026     PG_RETURN_NULL();
02027 }