Header And Logo

PostgreSQL
| The world's most advanced open source database.

spgtextproc.c

Go to the documentation of this file.
00001 /*-------------------------------------------------------------------------
00002  *
00003  * spgtextproc.c
00004  *    implementation of compressed-suffix tree over text
00005  *
00006  *
00007  * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
00008  * Portions Copyright (c) 1994, Regents of the University of California
00009  *
00010  * IDENTIFICATION
00011  *          src/backend/access/spgist/spgtextproc.c
00012  *
00013  *-------------------------------------------------------------------------
00014  */
00015 #include "postgres.h"
00016 
00017 #include "access/spgist.h"
00018 #include "catalog/pg_type.h"
00019 #include "mb/pg_wchar.h"
00020 #include "utils/builtins.h"
00021 #include "utils/datum.h"
00022 #include "utils/pg_locale.h"
00023 
00024 
/*
 * Upper bound on the length of the prefix stored in an inner tuple.
 *
 * In the worst case, an inner tuple in a text suffix tree could have as many
 * as 256 nodes (one for each possible byte value), and each node can take 16
 * bytes on MAXALIGN=8 machines.  The whole inner tuple has to fit on an index
 * page of size BLCKSZ.  Rather than assuming we know the exact amount of
 * overhead imposed by page headers, tuple headers, etc, 100 bytes are
 * reserved for that (the actual overhead should be no more than 56 bytes at
 * this writing, so there is slop in this number).  Prefixes of up to
 * BLCKSZ - 256 * 16 - 100 bytes are therefore safe to create.
 *
 * Unfortunately, 256 * 16 is already 4K, so there is no safe prefix length
 * when BLCKSZ is less than 8K: it is always possible to get "SPGiST inner
 * tuple size exceeds maximum" if there are too many distinct next-byte
 * values at a given place in the tree.  Since use of nonstandard block sizes
 * appears to be negligible in the field, we just live with that fact for
 * now, and fall back to a max prefix size of 32 bytes when BLCKSZ is
 * configured smaller than default.
 */
#define SPGIST_MAX_PREFIX_LENGTH \
    Max((int) (BLCKSZ - 256 * 16 - 100), 32)
00042 
00043 /* Struct for sorting values in picksplit */
00044 typedef struct spgNodePtr
00045 {
00046     Datum       d;
00047     int         i;
00048     uint8       c;
00049 } spgNodePtr;
00050 
00051 
00052 Datum
00053 spg_text_config(PG_FUNCTION_ARGS)
00054 {
00055     /* spgConfigIn *cfgin = (spgConfigIn *) PG_GETARG_POINTER(0); */
00056     spgConfigOut *cfg = (spgConfigOut *) PG_GETARG_POINTER(1);
00057 
00058     cfg->prefixType = TEXTOID;
00059     cfg->labelType = CHAROID;
00060     cfg->canReturnData = true;
00061     cfg->longValuesOK = true;   /* suffixing will shorten long values */
00062     PG_RETURN_VOID();
00063 }
00064 
00065 /*
00066  * Form a text datum from the given not-necessarily-null-terminated string,
00067  * using short varlena header format if possible
00068  */
00069 static Datum
00070 formTextDatum(const char *data, int datalen)
00071 {
00072     char       *p;
00073 
00074     p = (char *) palloc(datalen + VARHDRSZ);
00075 
00076     if (datalen + VARHDRSZ_SHORT <= VARATT_SHORT_MAX)
00077     {
00078         SET_VARSIZE_SHORT(p, datalen + VARHDRSZ_SHORT);
00079         if (datalen)
00080             memcpy(p + VARHDRSZ_SHORT, data, datalen);
00081     }
00082     else
00083     {
00084         SET_VARSIZE(p, datalen + VARHDRSZ);
00085         memcpy(p + VARHDRSZ, data, datalen);
00086     }
00087 
00088     return PointerGetDatum(p);
00089 }
00090 
/*
 * Return the number of leading bytes that a and b have in common.
 *
 * lena and lenb are the lengths of a and b in bytes; neither string needs
 * to be null-terminated.  The result never exceeds the shorter length.
 */
static int
commonPrefix(const char *a, const char *b, int lena, int lenb)
{
    int         limit = (lena < lenb) ? lena : lenb;
    int         n;

    for (n = 0; n < limit; n++)
    {
        if (a[n] != b[n])
            break;
    }

    return n;
}
00108 
00109 /*
00110  * Binary search an array of uint8 datums for a match to c
00111  *
00112  * On success, *i gets the match location; on failure, it gets where to insert
00113  */
00114 static bool
00115 searchChar(Datum *nodeLabels, int nNodes, uint8 c, int *i)
00116 {
00117     int         StopLow = 0,
00118                 StopHigh = nNodes;
00119 
00120     while (StopLow < StopHigh)
00121     {
00122         int         StopMiddle = (StopLow + StopHigh) >> 1;
00123         uint8       middle = DatumGetUInt8(nodeLabels[StopMiddle]);
00124 
00125         if (c < middle)
00126             StopHigh = StopMiddle;
00127         else if (c > middle)
00128             StopLow = StopMiddle + 1;
00129         else
00130         {
00131             *i = StopMiddle;
00132             return true;
00133         }
00134     }
00135 
00136     *i = StopHigh;
00137     return false;
00138 }
00139 
00140 Datum
00141 spg_text_choose(PG_FUNCTION_ARGS)
00142 {
00143     spgChooseIn *in = (spgChooseIn *) PG_GETARG_POINTER(0);
00144     spgChooseOut *out = (spgChooseOut *) PG_GETARG_POINTER(1);
00145     text       *inText = DatumGetTextPP(in->datum);
00146     char       *inStr = VARDATA_ANY(inText);
00147     int         inSize = VARSIZE_ANY_EXHDR(inText);
00148     uint8       nodeChar = '\0';
00149     int         i = 0;
00150     int         commonLen = 0;
00151 
00152     /* Check for prefix match, set nodeChar to first byte after prefix */
00153     if (in->hasPrefix)
00154     {
00155         text       *prefixText = DatumGetTextPP(in->prefixDatum);
00156         char       *prefixStr = VARDATA_ANY(prefixText);
00157         int         prefixSize = VARSIZE_ANY_EXHDR(prefixText);
00158 
00159         commonLen = commonPrefix(inStr + in->level,
00160                                  prefixStr,
00161                                  inSize - in->level,
00162                                  prefixSize);
00163 
00164         if (commonLen == prefixSize)
00165         {
00166             if (inSize - in->level > commonLen)
00167                 nodeChar = *(uint8 *) (inStr + in->level + commonLen);
00168             else
00169                 nodeChar = '\0';
00170         }
00171         else
00172         {
00173             /* Must split tuple because incoming value doesn't match prefix */
00174             out->resultType = spgSplitTuple;
00175 
00176             if (commonLen == 0)
00177             {
00178                 out->result.splitTuple.prefixHasPrefix = false;
00179             }
00180             else
00181             {
00182                 out->result.splitTuple.prefixHasPrefix = true;
00183                 out->result.splitTuple.prefixPrefixDatum =
00184                     formTextDatum(prefixStr, commonLen);
00185             }
00186             out->result.splitTuple.nodeLabel =
00187                 UInt8GetDatum(*(prefixStr + commonLen));
00188 
00189             if (prefixSize - commonLen == 1)
00190             {
00191                 out->result.splitTuple.postfixHasPrefix = false;
00192             }
00193             else
00194             {
00195                 out->result.splitTuple.postfixHasPrefix = true;
00196                 out->result.splitTuple.postfixPrefixDatum =
00197                     formTextDatum(prefixStr + commonLen + 1,
00198                                   prefixSize - commonLen - 1);
00199             }
00200 
00201             PG_RETURN_VOID();
00202         }
00203     }
00204     else if (inSize > in->level)
00205     {
00206         nodeChar = *(uint8 *) (inStr + in->level);
00207     }
00208     else
00209     {
00210         nodeChar = '\0';
00211     }
00212 
00213     /* Look up nodeChar in the node label array */
00214     if (searchChar(in->nodeLabels, in->nNodes, nodeChar, &i))
00215     {
00216         /*
00217          * Descend to existing node.  (If in->allTheSame, the core code will
00218          * ignore our nodeN specification here, but that's OK.  We still have
00219          * to provide the correct levelAdd and restDatum values, and those are
00220          * the same regardless of which node gets chosen by core.)
00221          */
00222         out->resultType = spgMatchNode;
00223         out->result.matchNode.nodeN = i;
00224         out->result.matchNode.levelAdd = commonLen + 1;
00225         if (inSize - in->level - commonLen - 1 > 0)
00226             out->result.matchNode.restDatum =
00227                 formTextDatum(inStr + in->level + commonLen + 1,
00228                               inSize - in->level - commonLen - 1);
00229         else
00230             out->result.matchNode.restDatum =
00231                 formTextDatum(NULL, 0);
00232     }
00233     else if (in->allTheSame)
00234     {
00235         /*
00236          * Can't use AddNode action, so split the tuple.  The upper tuple has
00237          * the same prefix as before and uses an empty node label for the
00238          * lower tuple.  The lower tuple has no prefix and the same node
00239          * labels as the original tuple.
00240          */
00241         out->resultType = spgSplitTuple;
00242         out->result.splitTuple.prefixHasPrefix = in->hasPrefix;
00243         out->result.splitTuple.prefixPrefixDatum = in->prefixDatum;
00244         out->result.splitTuple.nodeLabel = UInt8GetDatum('\0');
00245         out->result.splitTuple.postfixHasPrefix = false;
00246     }
00247     else
00248     {
00249         /* Add a node for the not-previously-seen nodeChar value */
00250         out->resultType = spgAddNode;
00251         out->result.addNode.nodeLabel = UInt8GetDatum(nodeChar);
00252         out->result.addNode.nodeN = i;
00253     }
00254 
00255     PG_RETURN_VOID();
00256 }
00257 
00258 /* qsort comparator to sort spgNodePtr structs by "c" */
00259 static int
00260 cmpNodePtr(const void *a, const void *b)
00261 {
00262     const spgNodePtr *aa = (const spgNodePtr *) a;
00263     const spgNodePtr *bb = (const spgNodePtr *) b;
00264 
00265     if (aa->c == bb->c)
00266         return 0;
00267     else if (aa->c > bb->c)
00268         return 1;
00269     else
00270         return -1;
00271 }
00272 
00273 Datum
00274 spg_text_picksplit(PG_FUNCTION_ARGS)
00275 {
00276     spgPickSplitIn *in = (spgPickSplitIn *) PG_GETARG_POINTER(0);
00277     spgPickSplitOut *out = (spgPickSplitOut *) PG_GETARG_POINTER(1);
00278     text       *text0 = DatumGetTextPP(in->datums[0]);
00279     int         i,
00280                 commonLen;
00281     spgNodePtr *nodes;
00282 
00283     /* Identify longest common prefix, if any */
00284     commonLen = VARSIZE_ANY_EXHDR(text0);
00285     for (i = 1; i < in->nTuples && commonLen > 0; i++)
00286     {
00287         text       *texti = DatumGetTextPP(in->datums[i]);
00288         int         tmp = commonPrefix(VARDATA_ANY(text0),
00289                                        VARDATA_ANY(texti),
00290                                        VARSIZE_ANY_EXHDR(text0),
00291                                        VARSIZE_ANY_EXHDR(texti));
00292 
00293         if (tmp < commonLen)
00294             commonLen = tmp;
00295     }
00296 
00297     /*
00298      * Limit the prefix length, if necessary, to ensure that the resulting
00299      * inner tuple will fit on a page.
00300      */
00301     commonLen = Min(commonLen, SPGIST_MAX_PREFIX_LENGTH);
00302 
00303     /* Set node prefix to be that string, if it's not empty */
00304     if (commonLen == 0)
00305     {
00306         out->hasPrefix = false;
00307     }
00308     else
00309     {
00310         out->hasPrefix = true;
00311         out->prefixDatum = formTextDatum(VARDATA_ANY(text0), commonLen);
00312     }
00313 
00314     /* Extract the node label (first non-common byte) from each value */
00315     nodes = (spgNodePtr *) palloc(sizeof(spgNodePtr) * in->nTuples);
00316 
00317     for (i = 0; i < in->nTuples; i++)
00318     {
00319         text       *texti = DatumGetTextPP(in->datums[i]);
00320 
00321         if (commonLen < VARSIZE_ANY_EXHDR(texti))
00322             nodes[i].c = *(uint8 *) (VARDATA_ANY(texti) + commonLen);
00323         else
00324             nodes[i].c = '\0';  /* use \0 if string is all common */
00325         nodes[i].i = i;
00326         nodes[i].d = in->datums[i];
00327     }
00328 
00329     /*
00330      * Sort by label bytes so that we can group the values into nodes.  This
00331      * also ensures that the nodes are ordered by label value, allowing the
00332      * use of binary search in searchChar.
00333      */
00334     qsort(nodes, in->nTuples, sizeof(*nodes), cmpNodePtr);
00335 
00336     /* And emit results */
00337     out->nNodes = 0;
00338     out->nodeLabels = (Datum *) palloc(sizeof(Datum) * in->nTuples);
00339     out->mapTuplesToNodes = (int *) palloc(sizeof(int) * in->nTuples);
00340     out->leafTupleDatums = (Datum *) palloc(sizeof(Datum) * in->nTuples);
00341 
00342     for (i = 0; i < in->nTuples; i++)
00343     {
00344         text       *texti = DatumGetTextPP(nodes[i].d);
00345         Datum       leafD;
00346 
00347         if (i == 0 || nodes[i].c != nodes[i - 1].c)
00348         {
00349             out->nodeLabels[out->nNodes] = UInt8GetDatum(nodes[i].c);
00350             out->nNodes++;
00351         }
00352 
00353         if (commonLen < VARSIZE_ANY_EXHDR(texti))
00354             leafD = formTextDatum(VARDATA_ANY(texti) + commonLen + 1,
00355                                   VARSIZE_ANY_EXHDR(texti) - commonLen - 1);
00356         else
00357             leafD = formTextDatum(NULL, 0);
00358 
00359         out->leafTupleDatums[nodes[i].i] = leafD;
00360         out->mapTuplesToNodes[nodes[i].i] = out->nNodes - 1;
00361     }
00362 
00363     PG_RETURN_VOID();
00364 }
00365 
00366 Datum
00367 spg_text_inner_consistent(PG_FUNCTION_ARGS)
00368 {
00369     spgInnerConsistentIn *in = (spgInnerConsistentIn *) PG_GETARG_POINTER(0);
00370     spgInnerConsistentOut *out = (spgInnerConsistentOut *) PG_GETARG_POINTER(1);
00371     bool        collate_is_c = lc_collate_is_c(PG_GET_COLLATION());
00372     text       *reconstrText = NULL;
00373     int         maxReconstrLen = 0;
00374     text       *prefixText = NULL;
00375     int         prefixSize = 0;
00376     int         i;
00377 
00378     /*
00379      * Reconstruct values represented at this tuple, including parent data,
00380      * prefix of this tuple if any, and the node label if any.  in->level
00381      * should be the length of the previously reconstructed value, and the
00382      * number of bytes added here is prefixSize or prefixSize + 1.
00383      *
00384      * Note: we assume that in->reconstructedValue isn't toasted and doesn't
00385      * have a short varlena header.  This is okay because it must have been
00386      * created by a previous invocation of this routine, and we always emit
00387      * long-format reconstructed values.
00388      */
00389     Assert(in->level == 0 ? DatumGetPointer(in->reconstructedValue) == NULL :
00390     VARSIZE_ANY_EXHDR(DatumGetPointer(in->reconstructedValue)) == in->level);
00391 
00392     maxReconstrLen = in->level + 1;
00393     if (in->hasPrefix)
00394     {
00395         prefixText = DatumGetTextPP(in->prefixDatum);
00396         prefixSize = VARSIZE_ANY_EXHDR(prefixText);
00397         maxReconstrLen += prefixSize;
00398     }
00399 
00400     reconstrText = palloc(VARHDRSZ + maxReconstrLen);
00401     SET_VARSIZE(reconstrText, VARHDRSZ + maxReconstrLen);
00402 
00403     if (in->level)
00404         memcpy(VARDATA(reconstrText),
00405                VARDATA(DatumGetPointer(in->reconstructedValue)),
00406                in->level);
00407     if (prefixSize)
00408         memcpy(((char *) VARDATA(reconstrText)) + in->level,
00409                VARDATA_ANY(prefixText),
00410                prefixSize);
00411     /* last byte of reconstrText will be filled in below */
00412 
00413     /*
00414      * Scan the child nodes.  For each one, complete the reconstructed value
00415      * and see if it's consistent with the query.  If so, emit an entry into
00416      * the output arrays.
00417      */
00418     out->nodeNumbers = (int *) palloc(sizeof(int) * in->nNodes);
00419     out->levelAdds = (int *) palloc(sizeof(int) * in->nNodes);
00420     out->reconstructedValues = (Datum *) palloc(sizeof(Datum) * in->nNodes);
00421     out->nNodes = 0;
00422 
00423     for (i = 0; i < in->nNodes; i++)
00424     {
00425         uint8       nodeChar = DatumGetUInt8(in->nodeLabels[i]);
00426         int         thisLen;
00427         bool        res = true;
00428         int         j;
00429 
00430         /* If nodeChar is zero, don't include it in data */
00431         if (nodeChar == '\0')
00432             thisLen = maxReconstrLen - 1;
00433         else
00434         {
00435             ((char *) VARDATA(reconstrText))[maxReconstrLen - 1] = nodeChar;
00436             thisLen = maxReconstrLen;
00437         }
00438 
00439         for (j = 0; j < in->nkeys; j++)
00440         {
00441             StrategyNumber strategy = in->scankeys[j].sk_strategy;
00442             text       *inText;
00443             int         inSize;
00444             int         r;
00445 
00446             /*
00447              * If it's a collation-aware operator, but the collation is C, we
00448              * can treat it as non-collation-aware.  With non-C collation we
00449              * need to traverse whole tree :-( so there's no point in making
00450              * any check here.
00451              */
00452             if (strategy > 10)
00453             {
00454                 if (collate_is_c)
00455                     strategy -= 10;
00456                 else
00457                     continue;
00458             }
00459 
00460             inText = DatumGetTextPP(in->scankeys[j].sk_argument);
00461             inSize = VARSIZE_ANY_EXHDR(inText);
00462 
00463             r = memcmp(VARDATA(reconstrText), VARDATA_ANY(inText),
00464                        Min(inSize, thisLen));
00465 
00466             switch (strategy)
00467             {
00468                 case BTLessStrategyNumber:
00469                 case BTLessEqualStrategyNumber:
00470                     if (r > 0)
00471                         res = false;
00472                     break;
00473                 case BTEqualStrategyNumber:
00474                     if (r != 0 || inSize < thisLen)
00475                         res = false;
00476                     break;
00477                 case BTGreaterEqualStrategyNumber:
00478                 case BTGreaterStrategyNumber:
00479                     if (r < 0)
00480                         res = false;
00481                     break;
00482                 default:
00483                     elog(ERROR, "unrecognized strategy number: %d",
00484                          in->scankeys[j].sk_strategy);
00485                     break;
00486             }
00487 
00488             if (!res)
00489                 break;          /* no need to consider remaining conditions */
00490         }
00491 
00492         if (res)
00493         {
00494             out->nodeNumbers[out->nNodes] = i;
00495             out->levelAdds[out->nNodes] = thisLen - in->level;
00496             SET_VARSIZE(reconstrText, VARHDRSZ + thisLen);
00497             out->reconstructedValues[out->nNodes] =
00498                 datumCopy(PointerGetDatum(reconstrText), false, -1);
00499             out->nNodes++;
00500         }
00501     }
00502 
00503     PG_RETURN_VOID();
00504 }
00505 
00506 Datum
00507 spg_text_leaf_consistent(PG_FUNCTION_ARGS)
00508 {
00509     spgLeafConsistentIn *in = (spgLeafConsistentIn *) PG_GETARG_POINTER(0);
00510     spgLeafConsistentOut *out = (spgLeafConsistentOut *) PG_GETARG_POINTER(1);
00511     int         level = in->level;
00512     text       *leafValue,
00513                *reconstrValue = NULL;
00514     char       *fullValue;
00515     int         fullLen;
00516     bool        res;
00517     int         j;
00518 
00519     /* all tests are exact */
00520     out->recheck = false;
00521 
00522     leafValue = DatumGetTextPP(in->leafDatum);
00523 
00524     if (DatumGetPointer(in->reconstructedValue))
00525         reconstrValue = DatumGetTextP(in->reconstructedValue);
00526 
00527     Assert(level == 0 ? reconstrValue == NULL :
00528            VARSIZE_ANY_EXHDR(reconstrValue) == level);
00529 
00530     /* Reconstruct the full string represented by this leaf tuple */
00531     fullLen = level + VARSIZE_ANY_EXHDR(leafValue);
00532     if (VARSIZE_ANY_EXHDR(leafValue) == 0 && level > 0)
00533     {
00534         fullValue = VARDATA(reconstrValue);
00535         out->leafValue = PointerGetDatum(reconstrValue);
00536     }
00537     else
00538     {
00539         text       *fullText = palloc(VARHDRSZ + fullLen);
00540 
00541         SET_VARSIZE(fullText, VARHDRSZ + fullLen);
00542         fullValue = VARDATA(fullText);
00543         if (level)
00544             memcpy(fullValue, VARDATA(reconstrValue), level);
00545         if (VARSIZE_ANY_EXHDR(leafValue) > 0)
00546             memcpy(fullValue + level, VARDATA_ANY(leafValue),
00547                    VARSIZE_ANY_EXHDR(leafValue));
00548         out->leafValue = PointerGetDatum(fullText);
00549     }
00550 
00551     /* Perform the required comparison(s) */
00552     res = true;
00553     for (j = 0; j < in->nkeys; j++)
00554     {
00555         StrategyNumber strategy = in->scankeys[j].sk_strategy;
00556         text       *query = DatumGetTextPP(in->scankeys[j].sk_argument);
00557         int         queryLen = VARSIZE_ANY_EXHDR(query);
00558         int         r;
00559 
00560         if (strategy > 10)
00561         {
00562             /* Collation-aware comparison */
00563             strategy -= 10;
00564 
00565             /* If asserts enabled, verify encoding of reconstructed string */
00566             Assert(pg_verifymbstr(fullValue, fullLen, false));
00567 
00568             r = varstr_cmp(fullValue, Min(queryLen, fullLen),
00569                            VARDATA_ANY(query), Min(queryLen, fullLen),
00570                            PG_GET_COLLATION());
00571         }
00572         else
00573         {
00574             /* Non-collation-aware comparison */
00575             r = memcmp(fullValue, VARDATA_ANY(query), Min(queryLen, fullLen));
00576         }
00577 
00578         if (r == 0)
00579         {
00580             if (queryLen > fullLen)
00581                 r = -1;
00582             else if (queryLen < fullLen)
00583                 r = 1;
00584         }
00585 
00586         switch (strategy)
00587         {
00588             case BTLessStrategyNumber:
00589                 res = (r < 0);
00590                 break;
00591             case BTLessEqualStrategyNumber:
00592                 res = (r <= 0);
00593                 break;
00594             case BTEqualStrategyNumber:
00595                 res = (r == 0);
00596                 break;
00597             case BTGreaterEqualStrategyNumber:
00598                 res = (r >= 0);
00599                 break;
00600             case BTGreaterStrategyNumber:
00601                 res = (r > 0);
00602                 break;
00603             default:
00604                 elog(ERROR, "unrecognized strategy number: %d",
00605                      in->scankeys[j].sk_strategy);
00606                 res = false;
00607                 break;
00608         }
00609 
00610         if (!res)
00611             break;              /* no need to consider remaining conditions */
00612     }
00613 
00614     PG_RETURN_BOOL(res);
00615 }