00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015 #include "postgres.h"
00016
00017 #include "access/gist_private.h"
00018 #include "access/relscan.h"
00019 #include "miscadmin.h"
00020 #include "pgstat.h"
00021 #include "utils/builtins.h"
00022 #include "utils/memutils.h"
00023 #include "utils/rel.h"
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047 static bool
00048 gistindex_keytest(IndexScanDesc scan,
00049 IndexTuple tuple,
00050 Page page,
00051 OffsetNumber offset,
00052 bool *recheck_p)
00053 {
00054 GISTScanOpaque so = (GISTScanOpaque) scan->opaque;
00055 GISTSTATE *giststate = so->giststate;
00056 ScanKey key = scan->keyData;
00057 int keySize = scan->numberOfKeys;
00058 double *distance_p;
00059 Relation r = scan->indexRelation;
00060
00061 *recheck_p = false;
00062
00063
00064
00065
00066
00067
00068 if (GistTupleIsInvalid(tuple))
00069 {
00070 int i;
00071
00072 if (GistPageIsLeaf(page))
00073 elog(ERROR, "invalid GiST tuple found on leaf page");
00074 for (i = 0; i < scan->numberOfOrderBys; i++)
00075 so->distances[i] = -get_float8_infinity();
00076 return true;
00077 }
00078
00079
00080 while (keySize > 0)
00081 {
00082 Datum datum;
00083 bool isNull;
00084
00085 datum = index_getattr(tuple,
00086 key->sk_attno,
00087 giststate->tupdesc,
00088 &isNull);
00089
00090 if (key->sk_flags & SK_ISNULL)
00091 {
00092
00093
00094
00095
00096
00097
00098 if (key->sk_flags & SK_SEARCHNULL)
00099 {
00100 if (GistPageIsLeaf(page) && !isNull)
00101 return false;
00102 }
00103 else
00104 {
00105 Assert(key->sk_flags & SK_SEARCHNOTNULL);
00106 if (isNull)
00107 return false;
00108 }
00109 }
00110 else if (isNull)
00111 {
00112 return false;
00113 }
00114 else
00115 {
00116 Datum test;
00117 bool recheck;
00118 GISTENTRY de;
00119
00120 gistdentryinit(giststate, key->sk_attno - 1, &de,
00121 datum, r, page, offset,
00122 FALSE, isNull);
00123
00124
00125
00126
00127
00128
00129
00130
00131
00132
00133
00134
00135
00136
00137 recheck = true;
00138
00139 test = FunctionCall5Coll(&key->sk_func,
00140 key->sk_collation,
00141 PointerGetDatum(&de),
00142 key->sk_argument,
00143 Int32GetDatum(key->sk_strategy),
00144 ObjectIdGetDatum(key->sk_subtype),
00145 PointerGetDatum(&recheck));
00146
00147 if (!DatumGetBool(test))
00148 return false;
00149 *recheck_p |= recheck;
00150 }
00151
00152 key++;
00153 keySize--;
00154 }
00155
00156
00157 key = scan->orderByData;
00158 distance_p = so->distances;
00159 keySize = scan->numberOfOrderBys;
00160 while (keySize > 0)
00161 {
00162 Datum datum;
00163 bool isNull;
00164
00165 datum = index_getattr(tuple,
00166 key->sk_attno,
00167 giststate->tupdesc,
00168 &isNull);
00169
00170 if ((key->sk_flags & SK_ISNULL) || isNull)
00171 {
00172
00173 *distance_p = get_float8_infinity();
00174 }
00175 else
00176 {
00177 Datum dist;
00178 GISTENTRY de;
00179
00180 gistdentryinit(giststate, key->sk_attno - 1, &de,
00181 datum, r, page, offset,
00182 FALSE, isNull);
00183
00184
00185
00186
00187
00188
00189
00190
00191
00192
00193
00194
00195
00196
00197
00198 dist = FunctionCall4Coll(&key->sk_func,
00199 key->sk_collation,
00200 PointerGetDatum(&de),
00201 key->sk_argument,
00202 Int32GetDatum(key->sk_strategy),
00203 ObjectIdGetDatum(key->sk_subtype));
00204
00205 *distance_p = DatumGetFloat8(dist);
00206 }
00207
00208 key++;
00209 distance_p++;
00210 keySize--;
00211 }
00212
00213 return true;
00214 }
00215
00216
00217
00218
00219
00220
00221
00222
00223
00224
00225
00226
00227
00228
00229
00230
00231
00232
00233
00234
00235
00236 static void
00237 gistScanPage(IndexScanDesc scan, GISTSearchItem *pageItem, double *myDistances,
00238 TIDBitmap *tbm, int64 *ntids)
00239 {
00240 GISTScanOpaque so = (GISTScanOpaque) scan->opaque;
00241 Buffer buffer;
00242 Page page;
00243 GISTPageOpaque opaque;
00244 OffsetNumber maxoff;
00245 OffsetNumber i;
00246 GISTSearchTreeItem *tmpItem = so->tmpTreeItem;
00247 bool isNew;
00248 MemoryContext oldcxt;
00249
00250 Assert(!GISTSearchItemIsHeap(*pageItem));
00251
00252 buffer = ReadBuffer(scan->indexRelation, pageItem->blkno);
00253 LockBuffer(buffer, GIST_SHARE);
00254 gistcheckpage(scan->indexRelation, buffer);
00255 page = BufferGetPage(buffer);
00256 opaque = GistPageGetOpaque(page);
00257
00258
00259
00260
00261
00262
00263
00264 if (!XLogRecPtrIsInvalid(pageItem->data.parentlsn) &&
00265 (GistFollowRight(page) ||
00266 pageItem->data.parentlsn < GistPageGetNSN(page)) &&
00267 opaque->rightlink != InvalidBlockNumber )
00268 {
00269
00270 GISTSearchItem *item;
00271
00272
00273 Assert(myDistances != NULL);
00274
00275 oldcxt = MemoryContextSwitchTo(so->queueCxt);
00276
00277
00278 item = palloc(sizeof(GISTSearchItem));
00279 item->next = NULL;
00280 item->blkno = opaque->rightlink;
00281 item->data.parentlsn = pageItem->data.parentlsn;
00282
00283
00284 tmpItem->head = item;
00285 tmpItem->lastHeap = NULL;
00286 memcpy(tmpItem->distances, myDistances,
00287 sizeof(double) * scan->numberOfOrderBys);
00288
00289 (void) rb_insert(so->queue, (RBNode *) tmpItem, &isNew);
00290
00291 MemoryContextSwitchTo(oldcxt);
00292 }
00293
00294 so->nPageData = so->curPageData = 0;
00295
00296
00297
00298
00299 maxoff = PageGetMaxOffsetNumber(page);
00300 for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
00301 {
00302 IndexTuple it = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
00303 bool match;
00304 bool recheck;
00305
00306
00307
00308
00309
00310 oldcxt = MemoryContextSwitchTo(so->giststate->tempCxt);
00311
00312 match = gistindex_keytest(scan, it, page, i, &recheck);
00313
00314 MemoryContextSwitchTo(oldcxt);
00315 MemoryContextReset(so->giststate->tempCxt);
00316
00317
00318 if (!match)
00319 continue;
00320
00321 if (tbm && GistPageIsLeaf(page))
00322 {
00323
00324
00325
00326
00327 tbm_add_tuples(tbm, &it->t_tid, 1, recheck);
00328 (*ntids)++;
00329 }
00330 else if (scan->numberOfOrderBys == 0 && GistPageIsLeaf(page))
00331 {
00332
00333
00334
00335 so->pageData[so->nPageData].heapPtr = it->t_tid;
00336 so->pageData[so->nPageData].recheck = recheck;
00337 so->nPageData++;
00338 }
00339 else
00340 {
00341
00342
00343
00344
00345
00346 GISTSearchItem *item;
00347
00348 oldcxt = MemoryContextSwitchTo(so->queueCxt);
00349
00350
00351 item = palloc(sizeof(GISTSearchItem));
00352 item->next = NULL;
00353
00354 if (GistPageIsLeaf(page))
00355 {
00356
00357 item->blkno = InvalidBlockNumber;
00358 item->data.heap.heapPtr = it->t_tid;
00359 item->data.heap.recheck = recheck;
00360 }
00361 else
00362 {
00363
00364 item->blkno = ItemPointerGetBlockNumber(&it->t_tid);
00365
00366
00367
00368
00369
00370 item->data.parentlsn = BufferGetLSNAtomic(buffer);
00371 }
00372
00373
00374 tmpItem->head = item;
00375 tmpItem->lastHeap = GISTSearchItemIsHeap(*item) ? item : NULL;
00376 memcpy(tmpItem->distances, so->distances,
00377 sizeof(double) * scan->numberOfOrderBys);
00378
00379 (void) rb_insert(so->queue, (RBNode *) tmpItem, &isNew);
00380
00381 MemoryContextSwitchTo(oldcxt);
00382 }
00383 }
00384
00385 UnlockReleaseBuffer(buffer);
00386 }
00387
00388
00389
00390
00391
00392
00393
00394
00395
00396
00397 static GISTSearchItem *
00398 getNextGISTSearchItem(GISTScanOpaque so)
00399 {
00400 for (;;)
00401 {
00402 GISTSearchItem *item;
00403
00404
00405 if (so->curTreeItem == NULL)
00406 {
00407 so->curTreeItem = (GISTSearchTreeItem *) rb_leftmost(so->queue);
00408
00409 if (so->curTreeItem == NULL)
00410 break;
00411 }
00412
00413 item = so->curTreeItem->head;
00414 if (item != NULL)
00415 {
00416
00417 so->curTreeItem->head = item->next;
00418 if (item == so->curTreeItem->lastHeap)
00419 so->curTreeItem->lastHeap = NULL;
00420
00421 return item;
00422 }
00423
00424
00425 rb_delete(so->queue, (RBNode *) so->curTreeItem);
00426 so->curTreeItem = NULL;
00427 }
00428
00429 return NULL;
00430 }
00431
00432
00433
00434
00435 static bool
00436 getNextNearest(IndexScanDesc scan)
00437 {
00438 GISTScanOpaque so = (GISTScanOpaque) scan->opaque;
00439 bool res = false;
00440
00441 do
00442 {
00443 GISTSearchItem *item = getNextGISTSearchItem(so);
00444
00445 if (!item)
00446 break;
00447
00448 if (GISTSearchItemIsHeap(*item))
00449 {
00450
00451 scan->xs_ctup.t_self = item->data.heap.heapPtr;
00452 scan->xs_recheck = item->data.heap.recheck;
00453 res = true;
00454 }
00455 else
00456 {
00457
00458 CHECK_FOR_INTERRUPTS();
00459
00460 gistScanPage(scan, item, so->curTreeItem->distances, NULL, NULL);
00461 }
00462
00463 pfree(item);
00464 } while (!res);
00465
00466 return res;
00467 }
00468
00469
00470
00471
00472 Datum
00473 gistgettuple(PG_FUNCTION_ARGS)
00474 {
00475 IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
00476 ScanDirection dir = (ScanDirection) PG_GETARG_INT32(1);
00477 GISTScanOpaque so = (GISTScanOpaque) scan->opaque;
00478
00479 if (dir != ForwardScanDirection)
00480 elog(ERROR, "GiST only supports forward scan direction");
00481
00482 if (!so->qual_ok)
00483 PG_RETURN_BOOL(false);
00484
00485 if (so->firstCall)
00486 {
00487
00488 GISTSearchItem fakeItem;
00489
00490 pgstat_count_index_scan(scan->indexRelation);
00491
00492 so->firstCall = false;
00493 so->curTreeItem = NULL;
00494 so->curPageData = so->nPageData = 0;
00495
00496 fakeItem.blkno = GIST_ROOT_BLKNO;
00497 memset(&fakeItem.data.parentlsn, 0, sizeof(GistNSN));
00498 gistScanPage(scan, &fakeItem, NULL, NULL, NULL);
00499 }
00500
00501 if (scan->numberOfOrderBys > 0)
00502 {
00503
00504 PG_RETURN_BOOL(getNextNearest(scan));
00505 }
00506 else
00507 {
00508
00509 for (;;)
00510 {
00511 if (so->curPageData < so->nPageData)
00512 {
00513
00514 scan->xs_ctup.t_self = so->pageData[so->curPageData].heapPtr;
00515 scan->xs_recheck = so->pageData[so->curPageData].recheck;
00516 so->curPageData++;
00517 PG_RETURN_BOOL(true);
00518 }
00519
00520
00521 do
00522 {
00523 GISTSearchItem *item = getNextGISTSearchItem(so);
00524
00525 if (!item)
00526 PG_RETURN_BOOL(false);
00527
00528 CHECK_FOR_INTERRUPTS();
00529
00530
00531
00532
00533
00534
00535
00536 gistScanPage(scan, item, so->curTreeItem->distances, NULL, NULL);
00537
00538 pfree(item);
00539 } while (so->nPageData == 0);
00540 }
00541 }
00542 }
00543
00544
00545
00546
00547 Datum
00548 gistgetbitmap(PG_FUNCTION_ARGS)
00549 {
00550 IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
00551 TIDBitmap *tbm = (TIDBitmap *) PG_GETARG_POINTER(1);
00552 GISTScanOpaque so = (GISTScanOpaque) scan->opaque;
00553 int64 ntids = 0;
00554 GISTSearchItem fakeItem;
00555
00556 if (!so->qual_ok)
00557 PG_RETURN_INT64(0);
00558
00559 pgstat_count_index_scan(scan->indexRelation);
00560
00561
00562 so->curTreeItem = NULL;
00563 so->curPageData = so->nPageData = 0;
00564
00565 fakeItem.blkno = GIST_ROOT_BLKNO;
00566 memset(&fakeItem.data.parentlsn, 0, sizeof(GistNSN));
00567 gistScanPage(scan, &fakeItem, NULL, tbm, &ntids);
00568
00569
00570
00571
00572
00573 for (;;)
00574 {
00575 GISTSearchItem *item = getNextGISTSearchItem(so);
00576
00577 if (!item)
00578 break;
00579
00580 CHECK_FOR_INTERRUPTS();
00581
00582 gistScanPage(scan, item, so->curTreeItem->distances, tbm, &ntids);
00583
00584 pfree(item);
00585 }
00586
00587 PG_RETURN_INT64(ntids);
00588 }