00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015 #include "postgres.h"
00016
00017 #include "access/gin_private.h"
00018 #include "miscadmin.h"
00019 #include "utils/rel.h"
00020
00021
00022
00023
00024 static int
00025 ginTraverseLock(Buffer buffer, bool searchMode)
00026 {
00027 Page page;
00028 int access = GIN_SHARE;
00029
00030 LockBuffer(buffer, GIN_SHARE);
00031 page = BufferGetPage(buffer);
00032 if (GinPageIsLeaf(page))
00033 {
00034 if (searchMode == FALSE)
00035 {
00036
00037 LockBuffer(buffer, GIN_UNLOCK);
00038 LockBuffer(buffer, GIN_EXCLUSIVE);
00039
00040
00041 if (!GinPageIsLeaf(page))
00042 {
00043
00044 LockBuffer(buffer, GIN_UNLOCK);
00045 LockBuffer(buffer, GIN_SHARE);
00046 }
00047 else
00048 access = GIN_EXCLUSIVE;
00049 }
00050 }
00051
00052 return access;
00053 }
00054
00055 GinBtreeStack *
00056 ginPrepareFindLeafPage(GinBtree btree, BlockNumber blkno)
00057 {
00058 GinBtreeStack *stack = (GinBtreeStack *) palloc(sizeof(GinBtreeStack));
00059
00060 stack->blkno = blkno;
00061 stack->buffer = ReadBuffer(btree->index, stack->blkno);
00062 stack->parent = NULL;
00063 stack->predictNumber = 1;
00064
00065 ginTraverseLock(stack->buffer, btree->searchMode);
00066
00067 return stack;
00068 }
00069
00070
00071
00072
00073 GinBtreeStack *
00074 ginFindLeafPage(GinBtree btree, GinBtreeStack *stack)
00075 {
00076 bool isfirst = TRUE;
00077 BlockNumber rootBlkno;
00078
00079 if (!stack)
00080 stack = ginPrepareFindLeafPage(btree, GIN_ROOT_BLKNO);
00081 rootBlkno = stack->blkno;
00082
00083 for (;;)
00084 {
00085 Page page;
00086 BlockNumber child;
00087 int access = GIN_SHARE;
00088
00089 stack->off = InvalidOffsetNumber;
00090
00091 page = BufferGetPage(stack->buffer);
00092
00093 if (isfirst)
00094 {
00095 if (GinPageIsLeaf(page) && !btree->searchMode)
00096 access = GIN_EXCLUSIVE;
00097 isfirst = FALSE;
00098 }
00099 else
00100 access = ginTraverseLock(stack->buffer, btree->searchMode);
00101
00102
00103
00104
00105
00106 while (btree->fullScan == FALSE && stack->blkno != rootBlkno &&
00107 btree->isMoveRight(btree, page))
00108 {
00109 BlockNumber rightlink = GinPageGetOpaque(page)->rightlink;
00110
00111 if (rightlink == InvalidBlockNumber)
00112
00113 break;
00114
00115 stack->blkno = rightlink;
00116 LockBuffer(stack->buffer, GIN_UNLOCK);
00117 stack->buffer = ReleaseAndReadBuffer(stack->buffer, btree->index, stack->blkno);
00118 LockBuffer(stack->buffer, access);
00119 page = BufferGetPage(stack->buffer);
00120 }
00121
00122 if (GinPageIsLeaf(page))
00123 return stack;
00124
00125
00126 child = btree->findChildPage(btree, stack);
00127
00128 LockBuffer(stack->buffer, GIN_UNLOCK);
00129 Assert(child != InvalidBlockNumber);
00130 Assert(stack->blkno != child);
00131
00132 if (btree->searchMode)
00133 {
00134
00135 stack->blkno = child;
00136 stack->buffer = ReleaseAndReadBuffer(stack->buffer, btree->index, stack->blkno);
00137 }
00138 else
00139 {
00140 GinBtreeStack *ptr = (GinBtreeStack *) palloc(sizeof(GinBtreeStack));
00141
00142 ptr->parent = stack;
00143 stack = ptr;
00144 stack->blkno = child;
00145 stack->buffer = ReadBuffer(btree->index, stack->blkno);
00146 stack->predictNumber = 1;
00147 }
00148 }
00149 }
00150
00151 void
00152 freeGinBtreeStack(GinBtreeStack *stack)
00153 {
00154 while (stack)
00155 {
00156 GinBtreeStack *tmp = stack->parent;
00157
00158 if (stack->buffer != InvalidBuffer)
00159 ReleaseBuffer(stack->buffer);
00160
00161 pfree(stack);
00162 stack = tmp;
00163 }
00164 }
00165
00166
00167
00168
00169
00170
00171
00172 void
00173 ginFindParents(GinBtree btree, GinBtreeStack *stack,
00174 BlockNumber rootBlkno)
00175 {
00176
00177 Page page;
00178 Buffer buffer;
00179 BlockNumber blkno,
00180 leftmostBlkno;
00181 OffsetNumber offset;
00182 GinBtreeStack *root = stack->parent;
00183 GinBtreeStack *ptr;
00184
00185 if (!root)
00186 {
00187
00188 root = (GinBtreeStack *) palloc(sizeof(GinBtreeStack));
00189 root->blkno = rootBlkno;
00190 root->buffer = ReadBuffer(btree->index, rootBlkno);
00191 LockBuffer(root->buffer, GIN_EXCLUSIVE);
00192 root->parent = NULL;
00193 }
00194 else
00195 {
00196
00197
00198
00199
00200 while (root->parent)
00201 {
00202 ReleaseBuffer(root->buffer);
00203 root = root->parent;
00204 }
00205
00206 Assert(root->blkno == rootBlkno);
00207 Assert(BufferGetBlockNumber(root->buffer) == rootBlkno);
00208 LockBuffer(root->buffer, GIN_EXCLUSIVE);
00209 }
00210 root->off = InvalidOffsetNumber;
00211
00212 page = BufferGetPage(root->buffer);
00213 Assert(!GinPageIsLeaf(page));
00214
00215
00216 if ((root->off = btree->findChildPtr(btree, page, stack->blkno, InvalidOffsetNumber)) != InvalidOffsetNumber)
00217 {
00218 stack->parent = root;
00219 return;
00220 }
00221
00222 leftmostBlkno = blkno = btree->getLeftMostPage(btree, page);
00223 LockBuffer(root->buffer, GIN_UNLOCK);
00224 Assert(blkno != InvalidBlockNumber);
00225
00226 for (;;)
00227 {
00228 buffer = ReadBuffer(btree->index, blkno);
00229 LockBuffer(buffer, GIN_EXCLUSIVE);
00230 page = BufferGetPage(buffer);
00231 if (GinPageIsLeaf(page))
00232 elog(ERROR, "Lost path");
00233
00234 leftmostBlkno = btree->getLeftMostPage(btree, page);
00235
00236 while ((offset = btree->findChildPtr(btree, page, stack->blkno, InvalidOffsetNumber)) == InvalidOffsetNumber)
00237 {
00238 blkno = GinPageGetOpaque(page)->rightlink;
00239 LockBuffer(buffer, GIN_UNLOCK);
00240 ReleaseBuffer(buffer);
00241 if (blkno == InvalidBlockNumber)
00242 break;
00243 buffer = ReadBuffer(btree->index, blkno);
00244 LockBuffer(buffer, GIN_EXCLUSIVE);
00245 page = BufferGetPage(buffer);
00246 }
00247
00248 if (blkno != InvalidBlockNumber)
00249 {
00250 ptr = (GinBtreeStack *) palloc(sizeof(GinBtreeStack));
00251 ptr->blkno = blkno;
00252 ptr->buffer = buffer;
00253 ptr->parent = root;
00254
00255 ptr->off = offset;
00256 stack->parent = ptr;
00257 return;
00258 }
00259
00260 blkno = leftmostBlkno;
00261 }
00262 }
00263
00264
00265
00266
00267
00268
00269
00270
00271
00272 void
00273 ginInsertValue(GinBtree btree, GinBtreeStack *stack, GinStatsData *buildStats)
00274 {
00275 GinBtreeStack *parent;
00276 BlockNumber rootBlkno;
00277 Page page,
00278 rpage,
00279 lpage;
00280
00281
00282 Assert(stack != NULL);
00283 parent = stack;
00284 while (parent->parent)
00285 parent = parent->parent;
00286 rootBlkno = parent->blkno;
00287 Assert(BlockNumberIsValid(rootBlkno));
00288
00289
00290 for (;;)
00291 {
00292 XLogRecData *rdata;
00293 BlockNumber savedRightLink;
00294
00295 page = BufferGetPage(stack->buffer);
00296 savedRightLink = GinPageGetOpaque(page)->rightlink;
00297
00298 if (btree->isEnoughSpace(btree, stack->buffer, stack->off))
00299 {
00300 START_CRIT_SECTION();
00301 btree->placeToPage(btree, stack->buffer, stack->off, &rdata);
00302
00303 MarkBufferDirty(stack->buffer);
00304
00305 if (RelationNeedsWAL(btree->index))
00306 {
00307 XLogRecPtr recptr;
00308
00309 recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT, rdata);
00310 PageSetLSN(page, recptr);
00311 }
00312
00313 LockBuffer(stack->buffer, GIN_UNLOCK);
00314 END_CRIT_SECTION();
00315
00316 freeGinBtreeStack(stack);
00317
00318 return;
00319 }
00320 else
00321 {
00322 Buffer rbuffer = GinNewBuffer(btree->index);
00323 Page newlpage;
00324
00325
00326
00327
00328
00329 newlpage = btree->splitPage(btree, stack->buffer, rbuffer, stack->off, &rdata);
00330
00331 ((ginxlogSplit *) (rdata->data))->rootBlkno = rootBlkno;
00332
00333
00334 if (buildStats)
00335 {
00336 if (btree->isData)
00337 buildStats->nDataPages++;
00338 else
00339 buildStats->nEntryPages++;
00340 }
00341
00342 parent = stack->parent;
00343
00344 if (parent == NULL)
00345 {
00346
00347
00348
00349
00350 Buffer lbuffer = GinNewBuffer(btree->index);
00351
00352 ((ginxlogSplit *) (rdata->data))->isRootSplit = TRUE;
00353 ((ginxlogSplit *) (rdata->data))->rrlink = InvalidBlockNumber;
00354
00355 page = BufferGetPage(stack->buffer);
00356 lpage = BufferGetPage(lbuffer);
00357 rpage = BufferGetPage(rbuffer);
00358
00359 GinPageGetOpaque(rpage)->rightlink = InvalidBlockNumber;
00360 GinPageGetOpaque(newlpage)->rightlink = BufferGetBlockNumber(rbuffer);
00361 ((ginxlogSplit *) (rdata->data))->lblkno = BufferGetBlockNumber(lbuffer);
00362
00363 START_CRIT_SECTION();
00364
00365 GinInitBuffer(stack->buffer, GinPageGetOpaque(newlpage)->flags & ~GIN_LEAF);
00366 PageRestoreTempPage(newlpage, lpage);
00367 btree->fillRoot(btree, stack->buffer, lbuffer, rbuffer);
00368
00369 MarkBufferDirty(rbuffer);
00370 MarkBufferDirty(lbuffer);
00371 MarkBufferDirty(stack->buffer);
00372
00373 if (RelationNeedsWAL(btree->index))
00374 {
00375 XLogRecPtr recptr;
00376
00377 recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_SPLIT, rdata);
00378 PageSetLSN(page, recptr);
00379 PageSetLSN(lpage, recptr);
00380 PageSetLSN(rpage, recptr);
00381 }
00382
00383 UnlockReleaseBuffer(rbuffer);
00384 UnlockReleaseBuffer(lbuffer);
00385 LockBuffer(stack->buffer, GIN_UNLOCK);
00386 END_CRIT_SECTION();
00387
00388 freeGinBtreeStack(stack);
00389
00390
00391 if (buildStats)
00392 {
00393 if (btree->isData)
00394 buildStats->nDataPages++;
00395 else
00396 buildStats->nEntryPages++;
00397 }
00398
00399 return;
00400 }
00401 else
00402 {
00403
00404 ((ginxlogSplit *) (rdata->data))->isRootSplit = FALSE;
00405 ((ginxlogSplit *) (rdata->data))->rrlink = savedRightLink;
00406
00407 lpage = BufferGetPage(stack->buffer);
00408 rpage = BufferGetPage(rbuffer);
00409
00410 GinPageGetOpaque(rpage)->rightlink = savedRightLink;
00411 GinPageGetOpaque(newlpage)->rightlink = BufferGetBlockNumber(rbuffer);
00412
00413 START_CRIT_SECTION();
00414 PageRestoreTempPage(newlpage, lpage);
00415
00416 MarkBufferDirty(rbuffer);
00417 MarkBufferDirty(stack->buffer);
00418
00419 if (RelationNeedsWAL(btree->index))
00420 {
00421 XLogRecPtr recptr;
00422
00423 recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_SPLIT, rdata);
00424 PageSetLSN(lpage, recptr);
00425 PageSetLSN(rpage, recptr);
00426 }
00427 UnlockReleaseBuffer(rbuffer);
00428 END_CRIT_SECTION();
00429 }
00430 }
00431
00432 btree->isDelete = FALSE;
00433
00434
00435 LockBuffer(parent->buffer, GIN_EXCLUSIVE);
00436
00437
00438 page = BufferGetPage(parent->buffer);
00439 while ((parent->off = btree->findChildPtr(btree, page, stack->blkno, parent->off)) == InvalidOffsetNumber)
00440 {
00441 BlockNumber rightlink = GinPageGetOpaque(page)->rightlink;
00442
00443 LockBuffer(parent->buffer, GIN_UNLOCK);
00444
00445 if (rightlink == InvalidBlockNumber)
00446 {
00447
00448
00449
00450
00451 ginFindParents(btree, stack, rootBlkno);
00452 parent = stack->parent;
00453 Assert(parent != NULL);
00454 break;
00455 }
00456
00457 parent->blkno = rightlink;
00458 parent->buffer = ReleaseAndReadBuffer(parent->buffer, btree->index, parent->blkno);
00459 LockBuffer(parent->buffer, GIN_EXCLUSIVE);
00460 page = BufferGetPage(parent->buffer);
00461 }
00462
00463 UnlockReleaseBuffer(stack->buffer);
00464 pfree(stack);
00465 stack = parent;
00466 }
00467 }