00001 /*------------------------------------------------------------------------- 00002 * 00003 * hio.c 00004 * POSTGRES heap access method input/output code. 00005 * 00006 * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group 00007 * Portions Copyright (c) 1994, Regents of the University of California 00008 * 00009 * 00010 * IDENTIFICATION 00011 * src/backend/access/heap/hio.c 00012 * 00013 *------------------------------------------------------------------------- 00014 */ 00015 00016 #include "postgres.h" 00017 00018 #include "access/heapam.h" 00019 #include "access/hio.h" 00020 #include "access/htup_details.h" 00021 #include "access/visibilitymap.h" 00022 #include "storage/bufmgr.h" 00023 #include "storage/freespace.h" 00024 #include "storage/lmgr.h" 00025 #include "storage/smgr.h" 00026 00027 00028 /* 00029 * RelationPutHeapTuple - place tuple at specified page 00030 * 00031 * !!! EREPORT(ERROR) IS DISALLOWED HERE !!! Must PANIC on failure!!! 00032 * 00033 * Note - caller must hold BUFFER_LOCK_EXCLUSIVE on the buffer. 00034 */ 00035 void 00036 RelationPutHeapTuple(Relation relation, 00037 Buffer buffer, 00038 HeapTuple tuple) 00039 { 00040 Page pageHeader; 00041 OffsetNumber offnum; 00042 ItemId itemId; 00043 Item item; 00044 00045 /* Add the tuple to the page */ 00046 pageHeader = BufferGetPage(buffer); 00047 00048 offnum = PageAddItem(pageHeader, (Item) tuple->t_data, 00049 tuple->t_len, InvalidOffsetNumber, false, true); 00050 00051 if (offnum == InvalidOffsetNumber) 00052 elog(PANIC, "failed to add tuple to page"); 00053 00054 /* Update tuple->t_self to the actual position where it was stored */ 00055 ItemPointerSet(&(tuple->t_self), BufferGetBlockNumber(buffer), offnum); 00056 00057 /* Insert the correct position into CTID of the stored tuple, too */ 00058 itemId = PageGetItemId(pageHeader, offnum); 00059 item = PageGetItem(pageHeader, itemId); 00060 ((HeapTupleHeader) item)->t_ctid = tuple->t_self; 00061 } 00062 00063 /* 00064 * Read in a buffer, using bulk-insert strategy if bistate isn't NULL. 00065 */ 00066 static Buffer 00067 ReadBufferBI(Relation relation, BlockNumber targetBlock, 00068 BulkInsertState bistate) 00069 { 00070 Buffer buffer; 00071 00072 /* If not bulk-insert, exactly like ReadBuffer */ 00073 if (!bistate) 00074 return ReadBuffer(relation, targetBlock); 00075 00076 /* If we have the desired block already pinned, re-pin and return it */ 00077 if (bistate->current_buf != InvalidBuffer) 00078 { 00079 if (BufferGetBlockNumber(bistate->current_buf) == targetBlock) 00080 { 00081 IncrBufferRefCount(bistate->current_buf); 00082 return bistate->current_buf; 00083 } 00084 /* ... else drop the old buffer */ 00085 ReleaseBuffer(bistate->current_buf); 00086 bistate->current_buf = InvalidBuffer; 00087 } 00088 00089 /* Perform a read using the buffer strategy */ 00090 buffer = ReadBufferExtended(relation, MAIN_FORKNUM, targetBlock, 00091 RBM_NORMAL, bistate->strategy); 00092 00093 /* Save the selected block as target for future inserts */ 00094 IncrBufferRefCount(buffer); 00095 bistate->current_buf = buffer; 00096 00097 return buffer; 00098 } 00099 00100 /* 00101 * For each heap page which is all-visible, acquire a pin on the appropriate 00102 * visibility map page, if we haven't already got one. 00103 * 00104 * buffer2 may be InvalidBuffer, if only one buffer is involved. buffer1 00105 * must not be InvalidBuffer. If both buffers are specified, buffer1 must 00106 * be less than buffer2. 00107 */ 00108 static void 00109 GetVisibilityMapPins(Relation relation, Buffer buffer1, Buffer buffer2, 00110 BlockNumber block1, BlockNumber block2, 00111 Buffer *vmbuffer1, Buffer *vmbuffer2) 00112 { 00113 bool need_to_pin_buffer1; 00114 bool need_to_pin_buffer2; 00115 00116 Assert(BufferIsValid(buffer1)); 00117 Assert(buffer2 == InvalidBuffer || buffer1 <= buffer2); 00118 00119 while (1) 00120 { 00121 /* Figure out which pins we need but don't have. */ 00122 need_to_pin_buffer1 = PageIsAllVisible(BufferGetPage(buffer1)) 00123 && !visibilitymap_pin_ok(block1, *vmbuffer1); 00124 need_to_pin_buffer2 = buffer2 != InvalidBuffer 00125 && PageIsAllVisible(BufferGetPage(buffer2)) 00126 && !visibilitymap_pin_ok(block2, *vmbuffer2); 00127 if (!need_to_pin_buffer1 && !need_to_pin_buffer2) 00128 return; 00129 00130 /* We must unlock both buffers before doing any I/O. */ 00131 LockBuffer(buffer1, BUFFER_LOCK_UNLOCK); 00132 if (buffer2 != InvalidBuffer && buffer2 != buffer1) 00133 LockBuffer(buffer2, BUFFER_LOCK_UNLOCK); 00134 00135 /* Get pins. */ 00136 if (need_to_pin_buffer1) 00137 visibilitymap_pin(relation, block1, vmbuffer1); 00138 if (need_to_pin_buffer2) 00139 visibilitymap_pin(relation, block2, vmbuffer2); 00140 00141 /* Relock buffers. */ 00142 LockBuffer(buffer1, BUFFER_LOCK_EXCLUSIVE); 00143 if (buffer2 != InvalidBuffer && buffer2 != buffer1) 00144 LockBuffer(buffer2, BUFFER_LOCK_EXCLUSIVE); 00145 00146 /* 00147 * If there are two buffers involved and we pinned just one of them, 00148 * it's possible that the second one became all-visible while we were 00149 * busy pinning the first one. If it looks like that's a possible 00150 * scenario, we'll need to make a second pass through this loop. 00151 */ 00152 if (buffer2 == InvalidBuffer || buffer1 == buffer2 00153 || (need_to_pin_buffer1 && need_to_pin_buffer2)) 00154 break; 00155 } 00156 } 00157 00158 /* 00159 * RelationGetBufferForTuple 00160 * 00161 * Returns pinned and exclusive-locked buffer of a page in given relation 00162 * with free space >= given len. 00163 * 00164 * If otherBuffer is not InvalidBuffer, then it references a previously 00165 * pinned buffer of another page in the same relation; on return, this 00166 * buffer will also be exclusive-locked. (This case is used by heap_update; 00167 * the otherBuffer contains the tuple being updated.) 00168 * 00169 * The reason for passing otherBuffer is that if two backends are doing 00170 * concurrent heap_update operations, a deadlock could occur if they try 00171 * to lock the same two buffers in opposite orders. To ensure that this 00172 * can't happen, we impose the rule that buffers of a relation must be 00173 * locked in increasing page number order. This is most conveniently done 00174 * by having RelationGetBufferForTuple lock them both, with suitable care 00175 * for ordering. 00176 * 00177 * NOTE: it is unlikely, but not quite impossible, for otherBuffer to be the 00178 * same buffer we select for insertion of the new tuple (this could only 00179 * happen if space is freed in that page after heap_update finds there's not 00180 * enough there). In that case, the page will be pinned and locked only once. 00181 * 00182 * For the vmbuffer and vmbuffer_other arguments, we avoid deadlock by 00183 * locking them only after locking the corresponding heap page, and taking 00184 * no further lwlocks while they are locked. 00185 * 00186 * We normally use FSM to help us find free space. However, 00187 * if HEAP_INSERT_SKIP_FSM is specified, we just append a new empty page to 00188 * the end of the relation if the tuple won't fit on the current target page. 00189 * This can save some cycles when we know the relation is new and doesn't 00190 * contain useful amounts of free space. 00191 * 00192 * HEAP_INSERT_SKIP_FSM is also useful for non-WAL-logged additions to a 00193 * relation, if the caller holds exclusive lock and is careful to invalidate 00194 * relation's smgr_targblock before the first insertion --- that ensures that 00195 * all insertions will occur into newly added pages and not be intermixed 00196 * with tuples from other transactions. That way, a crash can't risk losing 00197 * any committed data of other transactions. (See heap_insert's comments 00198 * for additional constraints needed for safe usage of this behavior.) 00199 * 00200 * The caller can also provide a BulkInsertState object to optimize many 00201 * insertions into the same relation. This keeps a pin on the current 00202 * insertion target page (to save pin/unpin cycles) and also passes a 00203 * BULKWRITE buffer selection strategy object to the buffer manager. 00204 * Passing NULL for bistate selects the default behavior. 00205 * 00206 * We always try to avoid filling existing pages further than the fillfactor. 00207 * This is OK since this routine is not consulted when updating a tuple and 00208 * keeping it on the same page, which is the scenario fillfactor is meant 00209 * to reserve space for. 00210 * 00211 * ereport(ERROR) is allowed here, so this routine *must* be called 00212 * before any (unlogged) changes are made in buffer pool. 00213 */ 00214 Buffer 00215 RelationGetBufferForTuple(Relation relation, Size len, 00216 Buffer otherBuffer, int options, 00217 BulkInsertState bistate, 00218 Buffer *vmbuffer, Buffer *vmbuffer_other) 00219 { 00220 bool use_fsm = !(options & HEAP_INSERT_SKIP_FSM); 00221 Buffer buffer = InvalidBuffer; 00222 Page page; 00223 Size pageFreeSpace, 00224 saveFreeSpace; 00225 BlockNumber targetBlock, 00226 otherBlock; 00227 bool needLock; 00228 00229 len = MAXALIGN(len); /* be conservative */ 00230 00231 /* Bulk insert is not supported for updates, only inserts. */ 00232 Assert(otherBuffer == InvalidBuffer || !bistate); 00233 00234 /* 00235 * If we're gonna fail for oversize tuple, do it right away 00236 */ 00237 if (len > MaxHeapTupleSize) 00238 ereport(ERROR, 00239 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), 00240 errmsg("row is too big: size %lu, maximum size %lu", 00241 (unsigned long) len, 00242 (unsigned long) MaxHeapTupleSize))); 00243 00244 /* Compute desired extra freespace due to fillfactor option */ 00245 saveFreeSpace = RelationGetTargetPageFreeSpace(relation, 00246 HEAP_DEFAULT_FILLFACTOR); 00247 00248 if (otherBuffer != InvalidBuffer) 00249 otherBlock = BufferGetBlockNumber(otherBuffer); 00250 else 00251 otherBlock = InvalidBlockNumber; /* just to keep compiler quiet */ 00252 00253 /* 00254 * We first try to put the tuple on the same page we last inserted a tuple 00255 * on, as cached in the BulkInsertState or relcache entry. If that 00256 * doesn't work, we ask the Free Space Map to locate a suitable page. 00257 * Since the FSM's info might be out of date, we have to be prepared to 00258 * loop around and retry multiple times. (To insure this isn't an infinite 00259 * loop, we must update the FSM with the correct amount of free space on 00260 * each page that proves not to be suitable.) If the FSM has no record of 00261 * a page with enough free space, we give up and extend the relation. 00262 * 00263 * When use_fsm is false, we either put the tuple onto the existing target 00264 * page or extend the relation. 00265 */ 00266 if (len + saveFreeSpace > MaxHeapTupleSize) 00267 { 00268 /* can't fit, don't bother asking FSM */ 00269 targetBlock = InvalidBlockNumber; 00270 use_fsm = false; 00271 } 00272 else if (bistate && bistate->current_buf != InvalidBuffer) 00273 targetBlock = BufferGetBlockNumber(bistate->current_buf); 00274 else 00275 targetBlock = RelationGetTargetBlock(relation); 00276 00277 if (targetBlock == InvalidBlockNumber && use_fsm) 00278 { 00279 /* 00280 * We have no cached target page, so ask the FSM for an initial 00281 * target. 00282 */ 00283 targetBlock = GetPageWithFreeSpace(relation, len + saveFreeSpace); 00284 00285 /* 00286 * If the FSM knows nothing of the rel, try the last page before we 00287 * give up and extend. This avoids one-tuple-per-page syndrome during 00288 * bootstrapping or in a recently-started system. 00289 */ 00290 if (targetBlock == InvalidBlockNumber) 00291 { 00292 BlockNumber nblocks = RelationGetNumberOfBlocks(relation); 00293 00294 if (nblocks > 0) 00295 targetBlock = nblocks - 1; 00296 } 00297 } 00298 00299 while (targetBlock != InvalidBlockNumber) 00300 { 00301 /* 00302 * Read and exclusive-lock the target block, as well as the other 00303 * block if one was given, taking suitable care with lock ordering and 00304 * the possibility they are the same block. 00305 * 00306 * If the page-level all-visible flag is set, caller will need to 00307 * clear both that and the corresponding visibility map bit. However, 00308 * by the time we return, we'll have x-locked the buffer, and we don't 00309 * want to do any I/O while in that state. So we check the bit here 00310 * before taking the lock, and pin the page if it appears necessary. 00311 * Checking without the lock creates a risk of getting the wrong 00312 * answer, so we'll have to recheck after acquiring the lock. 00313 */ 00314 if (otherBuffer == InvalidBuffer) 00315 { 00316 /* easy case */ 00317 buffer = ReadBufferBI(relation, targetBlock, bistate); 00318 if (PageIsAllVisible(BufferGetPage(buffer))) 00319 visibilitymap_pin(relation, targetBlock, vmbuffer); 00320 LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); 00321 } 00322 else if (otherBlock == targetBlock) 00323 { 00324 /* also easy case */ 00325 buffer = otherBuffer; 00326 if (PageIsAllVisible(BufferGetPage(buffer))) 00327 visibilitymap_pin(relation, targetBlock, vmbuffer); 00328 LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); 00329 } 00330 else if (otherBlock < targetBlock) 00331 { 00332 /* lock other buffer first */ 00333 buffer = ReadBuffer(relation, targetBlock); 00334 if (PageIsAllVisible(BufferGetPage(buffer))) 00335 visibilitymap_pin(relation, targetBlock, vmbuffer); 00336 LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE); 00337 LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); 00338 } 00339 else 00340 { 00341 /* lock target buffer first */ 00342 buffer = ReadBuffer(relation, targetBlock); 00343 if (PageIsAllVisible(BufferGetPage(buffer))) 00344 visibilitymap_pin(relation, targetBlock, vmbuffer); 00345 LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); 00346 LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE); 00347 } 00348 00349 /* 00350 * We now have the target page (and the other buffer, if any) pinned 00351 * and locked. However, since our initial PageIsAllVisible checks 00352 * were performed before acquiring the lock, the results might now be 00353 * out of date, either for the selected victim buffer, or for the 00354 * other buffer passed by the caller. In that case, we'll need to 00355 * give up our locks, go get the pin(s) we failed to get earlier, and 00356 * re-lock. That's pretty painful, but hopefully shouldn't happen 00357 * often. 00358 * 00359 * Note that there's a small possibility that we didn't pin the page 00360 * above but still have the correct page pinned anyway, either because 00361 * we've already made a previous pass through this loop, or because 00362 * caller passed us the right page anyway. 00363 * 00364 * Note also that it's possible that by the time we get the pin and 00365 * retake the buffer locks, the visibility map bit will have been 00366 * cleared by some other backend anyway. In that case, we'll have 00367 * done a bit of extra work for no gain, but there's no real harm 00368 * done. 00369 */ 00370 if (otherBuffer == InvalidBuffer || buffer <= otherBuffer) 00371 GetVisibilityMapPins(relation, buffer, otherBuffer, 00372 targetBlock, otherBlock, vmbuffer, 00373 vmbuffer_other); 00374 else 00375 GetVisibilityMapPins(relation, otherBuffer, buffer, 00376 otherBlock, targetBlock, vmbuffer_other, 00377 vmbuffer); 00378 00379 /* 00380 * Now we can check to see if there's enough free space here. If so, 00381 * we're done. 00382 */ 00383 page = BufferGetPage(buffer); 00384 pageFreeSpace = PageGetHeapFreeSpace(page); 00385 if (len + saveFreeSpace <= pageFreeSpace) 00386 { 00387 /* use this page as future insert target, too */ 00388 RelationSetTargetBlock(relation, targetBlock); 00389 return buffer; 00390 } 00391 00392 /* 00393 * Not enough space, so we must give up our page locks and pin (if 00394 * any) and prepare to look elsewhere. We don't care which order we 00395 * unlock the two buffers in, so this can be slightly simpler than the 00396 * code above. 00397 */ 00398 LockBuffer(buffer, BUFFER_LOCK_UNLOCK); 00399 if (otherBuffer == InvalidBuffer) 00400 ReleaseBuffer(buffer); 00401 else if (otherBlock != targetBlock) 00402 { 00403 LockBuffer(otherBuffer, BUFFER_LOCK_UNLOCK); 00404 ReleaseBuffer(buffer); 00405 } 00406 00407 /* Without FSM, always fall out of the loop and extend */ 00408 if (!use_fsm) 00409 break; 00410 00411 /* 00412 * Update FSM as to condition of this page, and ask for another page 00413 * to try. 00414 */ 00415 targetBlock = RecordAndGetPageWithFreeSpace(relation, 00416 targetBlock, 00417 pageFreeSpace, 00418 len + saveFreeSpace); 00419 } 00420 00421 /* 00422 * Have to extend the relation. 00423 * 00424 * We have to use a lock to ensure no one else is extending the rel at the 00425 * same time, else we will both try to initialize the same new page. We 00426 * can skip locking for new or temp relations, however, since no one else 00427 * could be accessing them. 00428 */ 00429 needLock = !RELATION_IS_LOCAL(relation); 00430 00431 if (needLock) 00432 LockRelationForExtension(relation, ExclusiveLock); 00433 00434 /* 00435 * XXX This does an lseek - rather expensive - but at the moment it is the 00436 * only way to accurately determine how many blocks are in a relation. Is 00437 * it worth keeping an accurate file length in shared memory someplace, 00438 * rather than relying on the kernel to do it for us? 00439 */ 00440 buffer = ReadBufferBI(relation, P_NEW, bistate); 00441 00442 /* 00443 * We can be certain that locking the otherBuffer first is OK, since it 00444 * must have a lower page number. 00445 */ 00446 if (otherBuffer != InvalidBuffer) 00447 LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE); 00448 00449 /* 00450 * Now acquire lock on the new page. 00451 */ 00452 LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); 00453 00454 /* 00455 * Release the file-extension lock; it's now OK for someone else to extend 00456 * the relation some more. Note that we cannot release this lock before 00457 * we have buffer lock on the new page, or we risk a race condition 00458 * against vacuumlazy.c --- see comments therein. 00459 */ 00460 if (needLock) 00461 UnlockRelationForExtension(relation, ExclusiveLock); 00462 00463 /* 00464 * We need to initialize the empty new page. Double-check that it really 00465 * is empty (this should never happen, but if it does we don't want to 00466 * risk wiping out valid data). 00467 */ 00468 page = BufferGetPage(buffer); 00469 00470 if (!PageIsNew(page)) 00471 elog(ERROR, "page %u of relation \"%s\" should be empty but is not", 00472 BufferGetBlockNumber(buffer), 00473 RelationGetRelationName(relation)); 00474 00475 PageInit(page, BufferGetPageSize(buffer), 0); 00476 00477 if (len > PageGetHeapFreeSpace(page)) 00478 { 00479 /* We should not get here given the test at the top */ 00480 elog(PANIC, "tuple is too big: size %lu", (unsigned long) len); 00481 } 00482 00483 /* 00484 * Remember the new page as our target for future insertions. 00485 * 00486 * XXX should we enter the new page into the free space map immediately, 00487 * or just keep it for this backend's exclusive use in the short run 00488 * (until VACUUM sees it)? Seems to depend on whether you expect the 00489 * current backend to make more insertions or not, which is probably a 00490 * good bet most of the time. So for now, don't add it to FSM yet. 00491 */ 00492 RelationSetTargetBlock(relation, BufferGetBlockNumber(buffer)); 00493 00494 return buffer; 00495 }