00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016 #include "postgres.h"
00017
00018 #include "access/relscan.h"
00019 #include "access/spgist_private.h"
00020 #include "miscadmin.h"
00021 #include "storage/bufmgr.h"
00022 #include "utils/datum.h"
00023 #include "utils/memutils.h"
00024 #include "utils/rel.h"
00025
00026
/*
 * Callback type used by spgWalk() to report one matching leaf tuple.
 *
 * so        - scan state the result is recorded into
 * heapPtr   - heap TID taken from the matching leaf tuple
 * leafValue - value output by the leaf-consistent test ((Datum) 0 for nulls)
 * isnull    - true when the match came from a page that stores nulls
 * recheck   - recheck flag output by the leaf-consistent test
 */
typedef void (*storeRes_func) (SpGistScanOpaque so, ItemPointer heapPtr,
                               Datum leafValue, bool isnull, bool recheck);
00029
/*
 * One pending item on the scan stack (so->scanStack): a tuple location that
 * spgWalk() still has to visit, plus the traversal state that goes with it.
 */
typedef struct ScanStackEntry
{
    Datum       reconstructedValue; /* value handed to the consistent
                                     * functions as in.reconstructedValue;
                                     * (Datum) 0 when none was supplied */
    int         level;              /* parent's level plus any levelAdds
                                     * reported by inner_consistent; 0 for
                                     * the starting entries */
    ItemPointerData ptr;            /* block number and offset of the index
                                     * tuple to visit */
} ScanStackEntry;
00036
00037
00038
00039 static void
00040 freeScanStackEntry(SpGistScanOpaque so, ScanStackEntry *stackEntry)
00041 {
00042 if (!so->state.attType.attbyval &&
00043 DatumGetPointer(stackEntry->reconstructedValue) != NULL)
00044 pfree(DatumGetPointer(stackEntry->reconstructedValue));
00045 pfree(stackEntry);
00046 }
00047
00048
00049 static void
00050 freeScanStack(SpGistScanOpaque so)
00051 {
00052 ListCell *lc;
00053
00054 foreach(lc, so->scanStack)
00055 {
00056 freeScanStackEntry(so, (ScanStackEntry *) lfirst(lc));
00057 }
00058 list_free(so->scanStack);
00059 so->scanStack = NIL;
00060 }
00061
00062
00063
00064
00065
00066 static void
00067 resetSpGistScanOpaque(SpGistScanOpaque so)
00068 {
00069 ScanStackEntry *startEntry;
00070
00071 freeScanStack(so);
00072
00073 if (so->searchNulls)
00074 {
00075
00076 startEntry = (ScanStackEntry *) palloc0(sizeof(ScanStackEntry));
00077 ItemPointerSet(&startEntry->ptr, SPGIST_NULL_BLKNO, FirstOffsetNumber);
00078 so->scanStack = lappend(so->scanStack, startEntry);
00079 }
00080
00081 if (so->searchNonNulls)
00082 {
00083
00084 startEntry = (ScanStackEntry *) palloc0(sizeof(ScanStackEntry));
00085 ItemPointerSet(&startEntry->ptr, SPGIST_ROOT_BLKNO, FirstOffsetNumber);
00086 so->scanStack = lappend(so->scanStack, startEntry);
00087 }
00088
00089 if (so->want_itup)
00090 {
00091
00092 int i;
00093
00094 for (i = 0; i < so->nPtrs; i++)
00095 pfree(so->indexTups[i]);
00096 }
00097 so->iPtr = so->nPtrs = 0;
00098 }
00099
00100
00101
00102
00103
00104
00105
00106
00107
00108
00109
00110
00111 static void
00112 spgPrepareScanKeys(IndexScanDesc scan)
00113 {
00114 SpGistScanOpaque so = (SpGistScanOpaque) scan->opaque;
00115 bool qual_ok;
00116 bool haveIsNull;
00117 bool haveNotNull;
00118 int nkeys;
00119 int i;
00120
00121 if (scan->numberOfKeys <= 0)
00122 {
00123
00124 so->searchNulls = true;
00125 so->searchNonNulls = true;
00126 so->numberOfKeys = 0;
00127 return;
00128 }
00129
00130
00131 qual_ok = true;
00132 haveIsNull = haveNotNull = false;
00133 nkeys = 0;
00134 for (i = 0; i < scan->numberOfKeys; i++)
00135 {
00136 ScanKey skey = &scan->keyData[i];
00137
00138 if (skey->sk_flags & SK_SEARCHNULL)
00139 haveIsNull = true;
00140 else if (skey->sk_flags & SK_SEARCHNOTNULL)
00141 haveNotNull = true;
00142 else if (skey->sk_flags & SK_ISNULL)
00143 {
00144
00145 qual_ok = false;
00146 break;
00147 }
00148 else
00149 {
00150
00151 so->keyData[nkeys++] = *skey;
00152
00153 haveNotNull = true;
00154 }
00155 }
00156
00157
00158 if (haveIsNull && haveNotNull)
00159 qual_ok = false;
00160
00161
00162 if (qual_ok)
00163 {
00164 so->searchNulls = haveIsNull;
00165 so->searchNonNulls = haveNotNull;
00166 so->numberOfKeys = nkeys;
00167 }
00168 else
00169 {
00170 so->searchNulls = false;
00171 so->searchNonNulls = false;
00172 so->numberOfKeys = 0;
00173 }
00174 }
00175
00176 Datum
00177 spgbeginscan(PG_FUNCTION_ARGS)
00178 {
00179 Relation rel = (Relation) PG_GETARG_POINTER(0);
00180 int keysz = PG_GETARG_INT32(1);
00181
00182
00183 IndexScanDesc scan;
00184 SpGistScanOpaque so;
00185
00186 scan = RelationGetIndexScan(rel, keysz, 0);
00187
00188 so = (SpGistScanOpaque) palloc0(sizeof(SpGistScanOpaqueData));
00189 if (keysz > 0)
00190 so->keyData = (ScanKey) palloc(sizeof(ScanKeyData) * keysz);
00191 else
00192 so->keyData = NULL;
00193 initSpGistState(&so->state, scan->indexRelation);
00194 so->tempCxt = AllocSetContextCreate(CurrentMemoryContext,
00195 "SP-GiST search temporary context",
00196 ALLOCSET_DEFAULT_MINSIZE,
00197 ALLOCSET_DEFAULT_INITSIZE,
00198 ALLOCSET_DEFAULT_MAXSIZE);
00199
00200
00201 so->indexTupDesc = scan->xs_itupdesc = RelationGetDescr(rel);
00202
00203 scan->opaque = so;
00204
00205 PG_RETURN_POINTER(scan);
00206 }
00207
00208 Datum
00209 spgrescan(PG_FUNCTION_ARGS)
00210 {
00211 IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
00212 SpGistScanOpaque so = (SpGistScanOpaque) scan->opaque;
00213 ScanKey scankey = (ScanKey) PG_GETARG_POINTER(1);
00214
00215
00216 if (scankey && scan->numberOfKeys > 0)
00217 {
00218 memmove(scan->keyData, scankey,
00219 scan->numberOfKeys * sizeof(ScanKeyData));
00220 }
00221
00222
00223 spgPrepareScanKeys(scan);
00224
00225
00226 resetSpGistScanOpaque(so);
00227
00228 PG_RETURN_VOID();
00229 }
00230
00231 Datum
00232 spgendscan(PG_FUNCTION_ARGS)
00233 {
00234 IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
00235 SpGistScanOpaque so = (SpGistScanOpaque) scan->opaque;
00236
00237 MemoryContextDelete(so->tempCxt);
00238
00239 PG_RETURN_VOID();
00240 }
00241
/*
 * spgmarkpos -- mark-position entry point.
 *
 * Always raises an error: this access method does not implement
 * mark/restore.  The return statement is never reached when elog(ERROR)
 * does not return; it keeps the function well-formed.
 */
Datum
spgmarkpos(PG_FUNCTION_ARGS)
{
    elog(ERROR, "SPGiST does not support mark/restore");
    PG_RETURN_VOID();
}
00248
/*
 * spgrestrpos -- restore-position entry point.
 *
 * Always raises an error: this access method does not implement
 * mark/restore.  The return statement is never reached when elog(ERROR)
 * does not return; it keeps the function well-formed.
 */
Datum
spgrestrpos(PG_FUNCTION_ARGS)
{
    elog(ERROR, "SPGiST does not support mark/restore");
    PG_RETURN_VOID();
}
00255
00256
00257
00258
00259
00260
00261
00262 static bool
00263 spgLeafTest(Relation index, SpGistScanOpaque so,
00264 SpGistLeafTuple leafTuple, bool isnull,
00265 int level, Datum reconstructedValue,
00266 Datum *leafValue, bool *recheck)
00267 {
00268 bool result;
00269 Datum leafDatum;
00270 spgLeafConsistentIn in;
00271 spgLeafConsistentOut out;
00272 FmgrInfo *procinfo;
00273 MemoryContext oldCtx;
00274
00275 if (isnull)
00276 {
00277
00278 Assert(so->searchNulls);
00279 *leafValue = (Datum) 0;
00280 *recheck = false;
00281 return true;
00282 }
00283
00284 leafDatum = SGLTDATUM(leafTuple, &so->state);
00285
00286
00287 oldCtx = MemoryContextSwitchTo(so->tempCxt);
00288
00289 in.scankeys = so->keyData;
00290 in.nkeys = so->numberOfKeys;
00291 in.reconstructedValue = reconstructedValue;
00292 in.level = level;
00293 in.returnData = so->want_itup;
00294 in.leafDatum = leafDatum;
00295
00296 out.leafValue = (Datum) 0;
00297 out.recheck = false;
00298
00299 procinfo = index_getprocinfo(index, 1, SPGIST_LEAF_CONSISTENT_PROC);
00300 result = DatumGetBool(FunctionCall2Coll(procinfo,
00301 index->rd_indcollation[0],
00302 PointerGetDatum(&in),
00303 PointerGetDatum(&out)));
00304
00305 *leafValue = out.leafValue;
00306 *recheck = out.recheck;
00307
00308 MemoryContextSwitchTo(oldCtx);
00309
00310 return result;
00311 }
00312
00313
00314
00315
00316
00317
00318
00319
00320 static void
00321 spgWalk(Relation index, SpGistScanOpaque so, bool scanWholeIndex,
00322 storeRes_func storeRes)
00323 {
00324 Buffer buffer = InvalidBuffer;
00325 bool reportedSome = false;
00326
00327 while (scanWholeIndex || !reportedSome)
00328 {
00329 ScanStackEntry *stackEntry;
00330 BlockNumber blkno;
00331 OffsetNumber offset;
00332 Page page;
00333 bool isnull;
00334
00335
00336 if (so->scanStack == NIL)
00337 break;
00338
00339 stackEntry = (ScanStackEntry *) linitial(so->scanStack);
00340 so->scanStack = list_delete_first(so->scanStack);
00341
00342 redirect:
00343
00344 CHECK_FOR_INTERRUPTS();
00345
00346 blkno = ItemPointerGetBlockNumber(&stackEntry->ptr);
00347 offset = ItemPointerGetOffsetNumber(&stackEntry->ptr);
00348
00349 if (buffer == InvalidBuffer)
00350 {
00351 buffer = ReadBuffer(index, blkno);
00352 LockBuffer(buffer, BUFFER_LOCK_SHARE);
00353 }
00354 else if (blkno != BufferGetBlockNumber(buffer))
00355 {
00356 UnlockReleaseBuffer(buffer);
00357 buffer = ReadBuffer(index, blkno);
00358 LockBuffer(buffer, BUFFER_LOCK_SHARE);
00359 }
00360
00361
00362 page = BufferGetPage(buffer);
00363
00364 isnull = SpGistPageStoresNulls(page) ? true : false;
00365
00366 if (SpGistPageIsLeaf(page))
00367 {
00368 SpGistLeafTuple leafTuple;
00369 OffsetNumber max = PageGetMaxOffsetNumber(page);
00370 Datum leafValue = (Datum) 0;
00371 bool recheck = false;
00372
00373 if (SpGistBlockIsRoot(blkno))
00374 {
00375
00376 for (offset = FirstOffsetNumber; offset <= max; offset++)
00377 {
00378 leafTuple = (SpGistLeafTuple)
00379 PageGetItem(page, PageGetItemId(page, offset));
00380 if (leafTuple->tupstate != SPGIST_LIVE)
00381 {
00382
00383 elog(ERROR, "unexpected SPGiST tuple state: %d",
00384 leafTuple->tupstate);
00385 }
00386
00387 Assert(ItemPointerIsValid(&leafTuple->heapPtr));
00388 if (spgLeafTest(index, so,
00389 leafTuple, isnull,
00390 stackEntry->level,
00391 stackEntry->reconstructedValue,
00392 &leafValue,
00393 &recheck))
00394 {
00395 storeRes(so, &leafTuple->heapPtr,
00396 leafValue, isnull, recheck);
00397 reportedSome = true;
00398 }
00399 }
00400 }
00401 else
00402 {
00403
00404 while (offset != InvalidOffsetNumber)
00405 {
00406 Assert(offset >= FirstOffsetNumber && offset <= max);
00407 leafTuple = (SpGistLeafTuple)
00408 PageGetItem(page, PageGetItemId(page, offset));
00409 if (leafTuple->tupstate != SPGIST_LIVE)
00410 {
00411 if (leafTuple->tupstate == SPGIST_REDIRECT)
00412 {
00413
00414 Assert(offset == ItemPointerGetOffsetNumber(&stackEntry->ptr));
00415
00416 stackEntry->ptr = ((SpGistDeadTuple) leafTuple)->pointer;
00417 Assert(ItemPointerGetBlockNumber(&stackEntry->ptr) != SPGIST_METAPAGE_BLKNO);
00418 goto redirect;
00419 }
00420 if (leafTuple->tupstate == SPGIST_DEAD)
00421 {
00422
00423 Assert(offset == ItemPointerGetOffsetNumber(&stackEntry->ptr));
00424
00425 Assert(leafTuple->nextOffset == InvalidOffsetNumber);
00426 break;
00427 }
00428
00429 elog(ERROR, "unexpected SPGiST tuple state: %d",
00430 leafTuple->tupstate);
00431 }
00432
00433 Assert(ItemPointerIsValid(&leafTuple->heapPtr));
00434 if (spgLeafTest(index, so,
00435 leafTuple, isnull,
00436 stackEntry->level,
00437 stackEntry->reconstructedValue,
00438 &leafValue,
00439 &recheck))
00440 {
00441 storeRes(so, &leafTuple->heapPtr,
00442 leafValue, isnull, recheck);
00443 reportedSome = true;
00444 }
00445
00446 offset = leafTuple->nextOffset;
00447 }
00448 }
00449 }
00450 else
00451 {
00452 SpGistInnerTuple innerTuple;
00453 spgInnerConsistentIn in;
00454 spgInnerConsistentOut out;
00455 FmgrInfo *procinfo;
00456 SpGistNodeTuple *nodes;
00457 SpGistNodeTuple node;
00458 int i;
00459 MemoryContext oldCtx;
00460
00461 innerTuple = (SpGistInnerTuple) PageGetItem(page,
00462 PageGetItemId(page, offset));
00463
00464 if (innerTuple->tupstate != SPGIST_LIVE)
00465 {
00466 if (innerTuple->tupstate == SPGIST_REDIRECT)
00467 {
00468
00469 stackEntry->ptr = ((SpGistDeadTuple) innerTuple)->pointer;
00470 Assert(ItemPointerGetBlockNumber(&stackEntry->ptr) != SPGIST_METAPAGE_BLKNO);
00471 goto redirect;
00472 }
00473 elog(ERROR, "unexpected SPGiST tuple state: %d",
00474 innerTuple->tupstate);
00475 }
00476
00477
00478 oldCtx = MemoryContextSwitchTo(so->tempCxt);
00479
00480 in.scankeys = so->keyData;
00481 in.nkeys = so->numberOfKeys;
00482 in.reconstructedValue = stackEntry->reconstructedValue;
00483 in.level = stackEntry->level;
00484 in.returnData = so->want_itup;
00485 in.allTheSame = innerTuple->allTheSame;
00486 in.hasPrefix = (innerTuple->prefixSize > 0);
00487 in.prefixDatum = SGITDATUM(innerTuple, &so->state);
00488 in.nNodes = innerTuple->nNodes;
00489 in.nodeLabels = spgExtractNodeLabels(&so->state, innerTuple);
00490
00491
00492 nodes = (SpGistNodeTuple *) palloc(sizeof(SpGistNodeTuple) * in.nNodes);
00493 SGITITERATE(innerTuple, i, node)
00494 {
00495 nodes[i] = node;
00496 }
00497
00498 memset(&out, 0, sizeof(out));
00499
00500 if (!isnull)
00501 {
00502
00503 procinfo = index_getprocinfo(index, 1, SPGIST_INNER_CONSISTENT_PROC);
00504 FunctionCall2Coll(procinfo,
00505 index->rd_indcollation[0],
00506 PointerGetDatum(&in),
00507 PointerGetDatum(&out));
00508 }
00509 else
00510 {
00511
00512 out.nNodes = in.nNodes;
00513 out.nodeNumbers = (int *) palloc(sizeof(int) * in.nNodes);
00514 for (i = 0; i < in.nNodes; i++)
00515 out.nodeNumbers[i] = i;
00516 }
00517
00518 MemoryContextSwitchTo(oldCtx);
00519
00520
00521 if (innerTuple->allTheSame)
00522 if (out.nNodes != 0 && out.nNodes != in.nNodes)
00523 elog(ERROR, "inconsistent inner_consistent results for allTheSame inner tuple");
00524
00525 for (i = 0; i < out.nNodes; i++)
00526 {
00527 int nodeN = out.nodeNumbers[i];
00528
00529 Assert(nodeN >= 0 && nodeN < in.nNodes);
00530 if (ItemPointerIsValid(&nodes[nodeN]->t_tid))
00531 {
00532 ScanStackEntry *newEntry;
00533
00534
00535 newEntry = palloc(sizeof(ScanStackEntry));
00536 newEntry->ptr = nodes[nodeN]->t_tid;
00537 if (out.levelAdds)
00538 newEntry->level = stackEntry->level + out.levelAdds[i];
00539 else
00540 newEntry->level = stackEntry->level;
00541
00542 if (out.reconstructedValues)
00543 newEntry->reconstructedValue =
00544 datumCopy(out.reconstructedValues[i],
00545 so->state.attType.attbyval,
00546 so->state.attType.attlen);
00547 else
00548 newEntry->reconstructedValue = (Datum) 0;
00549
00550 so->scanStack = lcons(newEntry, so->scanStack);
00551 }
00552 }
00553 }
00554
00555
00556 freeScanStackEntry(so, stackEntry);
00557
00558 MemoryContextReset(so->tempCxt);
00559 }
00560
00561 if (buffer != InvalidBuffer)
00562 UnlockReleaseBuffer(buffer);
00563 }
00564
00565
00566 static void
00567 storeBitmap(SpGistScanOpaque so, ItemPointer heapPtr,
00568 Datum leafValue, bool isnull, bool recheck)
00569 {
00570 tbm_add_tuples(so->tbm, heapPtr, 1, recheck);
00571 so->ntids++;
00572 }
00573
00574 Datum
00575 spggetbitmap(PG_FUNCTION_ARGS)
00576 {
00577 IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
00578 TIDBitmap *tbm = (TIDBitmap *) PG_GETARG_POINTER(1);
00579 SpGistScanOpaque so = (SpGistScanOpaque) scan->opaque;
00580
00581
00582 so->want_itup = false;
00583
00584 so->tbm = tbm;
00585 so->ntids = 0;
00586
00587 spgWalk(scan->indexRelation, so, true, storeBitmap);
00588
00589 PG_RETURN_INT64(so->ntids);
00590 }
00591
00592
00593 static void
00594 storeGettuple(SpGistScanOpaque so, ItemPointer heapPtr,
00595 Datum leafValue, bool isnull, bool recheck)
00596 {
00597 Assert(so->nPtrs < MaxIndexTuplesPerPage);
00598 so->heapPtrs[so->nPtrs] = *heapPtr;
00599 so->recheck[so->nPtrs] = recheck;
00600 if (so->want_itup)
00601 {
00602
00603
00604
00605
00606 so->indexTups[so->nPtrs] = index_form_tuple(so->indexTupDesc,
00607 &leafValue,
00608 &isnull);
00609 }
00610 so->nPtrs++;
00611 }
00612
00613 Datum
00614 spggettuple(PG_FUNCTION_ARGS)
00615 {
00616 IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
00617 ScanDirection dir = (ScanDirection) PG_GETARG_INT32(1);
00618 SpGistScanOpaque so = (SpGistScanOpaque) scan->opaque;
00619
00620 if (dir != ForwardScanDirection)
00621 elog(ERROR, "SP-GiST only supports forward scan direction");
00622
00623
00624 so->want_itup = scan->xs_want_itup;
00625
00626 for (;;)
00627 {
00628 if (so->iPtr < so->nPtrs)
00629 {
00630
00631 scan->xs_ctup.t_self = so->heapPtrs[so->iPtr];
00632 scan->xs_recheck = so->recheck[so->iPtr];
00633 scan->xs_itup = so->indexTups[so->iPtr];
00634 so->iPtr++;
00635 PG_RETURN_BOOL(true);
00636 }
00637
00638 if (so->want_itup)
00639 {
00640
00641 int i;
00642
00643 for (i = 0; i < so->nPtrs; i++)
00644 pfree(so->indexTups[i]);
00645 }
00646 so->iPtr = so->nPtrs = 0;
00647
00648 spgWalk(scan->indexRelation, so, false, storeGettuple);
00649
00650 if (so->nPtrs == 0)
00651 break;
00652 }
00653
00654 PG_RETURN_BOOL(false);
00655 }
00656
00657 Datum
00658 spgcanreturn(PG_FUNCTION_ARGS)
00659 {
00660 Relation index = (Relation) PG_GETARG_POINTER(0);
00661 SpGistCache *cache;
00662
00663
00664 cache = spgGetCache(index);
00665
00666 PG_RETURN_BOOL(cache->config.canReturnData);
00667 }