00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068
00069
00070
00071
00072
00073
00074
00075
00076
00077
00078
00079
00080
00081
00082
00083
00084
00085
00086
00087
00088
00089
00090
00091
00092
00093
00094
00095
00096
00097
00098
00099
00100
00101
00102
00103
00104
00105
00106
00107
00108
00109
00110
00111
00112
00113
00114
00115
00116
00117
00118
00119
00120
00121
00122
00123
00124
00125
00126
00127
00128
00129
00130
00131
00132
00133
00134
00135
00136
00137
00138
00139
00140
00141
00142
00143
00144
00145
00146
00147
00148
00149
00150
00151
00152
00153
00154
00155
00156
00157
00158
00159
00160
00161
00162
00163
00164
00165
00166
00167
00168
00169
00170
00171
00172
00173
00174
00175
00176
00177
00178
00179
00180
00181
00182
00183
00184 #include "postgres.h"
00185
00186 #include "access/htup_details.h"
00187 #include "access/slru.h"
00188 #include "access/subtrans.h"
00189 #include "access/transam.h"
00190 #include "access/twophase.h"
00191 #include "access/twophase_rmgr.h"
00192 #include "access/xact.h"
00193 #include "miscadmin.h"
00194 #include "storage/bufmgr.h"
00195 #include "storage/predicate.h"
00196 #include "storage/predicate_internals.h"
00197 #include "storage/proc.h"
00198 #include "storage/procarray.h"
00199 #include "utils/rel.h"
00200 #include "utils/snapmgr.h"
00201 #include "utils/tqual.h"
00202
00203
00204
00205
00206
00207
00208
00209
00210
00211
00212
00213
00214
00215
00216
00217
/*
 * TargetTagIsCoveredBy
 *		Evaluates to true when a lock on covering_target subsumes a lock on
 *		covered_target.  All of the following must hold:
 *		  - both tags name the same database and the same relation;
 *		  - covering_target carries no tuple offset (InvalidOffsetNumber);
 *		  - either covered_target carries a tuple offset and both tags name
 *			the same page, or covering_target carries no page
 *			(InvalidBlockNumber) while covered_target does.
 *
 * Each argument is expanded several times, so callers must pass expressions
 * without side effects.
 */
#define TargetTagIsCoveredBy(covered_target, covering_target) \
	((GET_PREDICATELOCKTARGETTAG_DB(covered_target) == \
	  GET_PREDICATELOCKTARGETTAG_DB(covering_target)) \
	 && (GET_PREDICATELOCKTARGETTAG_RELATION(covered_target) == \
		 GET_PREDICATELOCKTARGETTAG_RELATION(covering_target)) \
	 && (GET_PREDICATELOCKTARGETTAG_OFFSET(covering_target) == \
		 InvalidOffsetNumber) \
	 && (((GET_PREDICATELOCKTARGETTAG_OFFSET(covered_target) != \
		   InvalidOffsetNumber) \
		  && (GET_PREDICATELOCKTARGETTAG_PAGE(covered_target) == \
			  GET_PREDICATELOCKTARGETTAG_PAGE(covering_target))) \
		 || ((GET_PREDICATELOCKTARGETTAG_PAGE(covering_target) == \
			  InvalidBlockNumber) \
			 && (GET_PREDICATELOCKTARGETTAG_PAGE(covered_target) != \
				 InvalidBlockNumber))))
00233
00234
00235
00236
00237
00238
00239
00240
/*
 * Partition helpers: map a predicate-lock hash code to its partition number
 * (hash code modulo NUM_PREDICATELOCK_PARTITIONS) and to the LWLock id that
 * corresponds to that partition, counted from FirstPredicateLockMgrLock.
 */
#define PredicateLockHashPartition(hashcode) \
	((hashcode) % NUM_PREDICATELOCK_PARTITIONS)
#define PredicateLockHashPartitionLock(hashcode) \
	((LWLockId) (PredicateLockHashPartition(hashcode) + \
				 FirstPredicateLockMgrLock))

/*
 * Number of entries used to size the predicate lock target hash table:
 * max_predicate_locks_per_xact * (MaxBackends + max_prepared_xacts), computed
 * with the overflow-checking size helpers.
 */
#define NPREDICATELOCKTARGETENTS() \
	mul_size(max_predicate_locks_per_xact, \
			 add_size(MaxBackends, max_prepared_xacts))

/* True while sxact's finishedLink is linked into some queue. */
#define SxactIsOnFinishedList(sxact) \
	(!SHMQueueIsDetached(&(sxact)->finishedLink))
00250
00251
00252
00253
00254
00255
00256
00257
00258
/*
 * Flag tests on a SERIALIZABLEXACT.  Every macro below evaluates to nonzero
 * exactly when the named SXACT_FLAG_* bit is set in (sxact)->flags.
 */
#define SxactTestFlag(sxact, flagbit) \
	(((sxact)->flags & (flagbit)) != 0)

#define SxactIsCommitted(sxact) \
	SxactTestFlag(sxact, SXACT_FLAG_COMMITTED)
#define SxactIsPrepared(sxact) \
	SxactTestFlag(sxact, SXACT_FLAG_PREPARED)
#define SxactIsRolledBack(sxact) \
	SxactTestFlag(sxact, SXACT_FLAG_ROLLED_BACK)
#define SxactIsDoomed(sxact) \
	SxactTestFlag(sxact, SXACT_FLAG_DOOMED)
#define SxactIsReadOnly(sxact) \
	SxactTestFlag(sxact, SXACT_FLAG_READ_ONLY)
#define SxactHasSummaryConflictIn(sxact) \
	SxactTestFlag(sxact, SXACT_FLAG_SUMMARY_CONFLICT_IN)
#define SxactHasSummaryConflictOut(sxact) \
	SxactTestFlag(sxact, SXACT_FLAG_SUMMARY_CONFLICT_OUT)

#define SxactHasConflictOut(sxact) \
	SxactTestFlag(sxact, SXACT_FLAG_CONFLICT_OUT)
#define SxactIsDeferrableWaiting(sxact) \
	SxactTestFlag(sxact, SXACT_FLAG_DEFERRABLE_WAITING)
#define SxactIsROSafe(sxact) \
	SxactTestFlag(sxact, SXACT_FLAG_RO_SAFE)
#define SxactIsROUnsafe(sxact) \
	SxactTestFlag(sxact, SXACT_FLAG_RO_UNSAFE)
00275
00276
00277
00278
00279
00280
00281
00282
00283
/*
 * Hash code of a PREDICATELOCKTARGETTAG: tag_hash() over the whole tag
 * structure.  The argument is a pointer to the tag.
 */
#define PredicateLockTargetTagHashCode(predicatelocktargettag) \
	(tag_hash((predicatelocktargettag), \
			  sizeof(PREDICATELOCKTARGETTAG)))
00286
00287
00288
00289
00290
00291
00292
00293
00294
00295
00296
/*
 * Derive the hash code of a PREDICATELOCKTAG from the already-computed hash
 * code of its target.  The address of the owning transaction (myXact),
 * truncated to uint32, is shifted left by LOG2_NUM_PREDICATELOCK_PARTITIONS
 * bits and XORed into targethash, so the low-order
 * LOG2_NUM_PREDICATELOCK_PARTITIONS bits of the result equal those of
 * targethash.  (Presumably this keeps a lock in the same partition as its
 * target; that relies on NUM_PREDICATELOCK_PARTITIONS being
 * 1 << LOG2_NUM_PREDICATELOCK_PARTITIONS -- confirm in the header.)
 */
#define PredicateLockHashCodeFromTargetHashCode(predicatelocktag, targethash) \
	((targethash) ^ \
	 (((uint32) PointerGetDatum((predicatelocktag)->myXact)) \
	  << LOG2_NUM_PREDICATELOCK_PARTITIONS))
00300
00301
00302
00303
00304
00305 static SlruCtlData OldSerXidSlruCtlData;
00306
00307 #define OldSerXidSlruCtl (&OldSerXidSlruCtlData)
00308
00309 #define OLDSERXID_PAGESIZE BLCKSZ
00310 #define OLDSERXID_ENTRYSIZE sizeof(SerCommitSeqNo)
00311 #define OLDSERXID_ENTRIESPERPAGE (OLDSERXID_PAGESIZE / OLDSERXID_ENTRYSIZE)
00312
00313
00314
00315
00316
00317 #define OLDSERXID_MAX_PAGE Min(SLRU_PAGES_PER_SEGMENT * 0x10000 - 1, \
00318 (MaxTransactionId) / OLDSERXID_ENTRIESPERPAGE)
00319
00320 #define OldSerXidNextPage(page) (((page) >= OLDSERXID_MAX_PAGE) ? 0 : (page) + 1)
00321
00322 #define OldSerXidValue(slotno, xid) (*((SerCommitSeqNo *) \
00323 (OldSerXidSlruCtl->shared->page_buffer[slotno] + \
00324 ((((uint32) (xid)) % OLDSERXID_ENTRIESPERPAGE) * OLDSERXID_ENTRYSIZE))))
00325
00326 #define OldSerXidPage(xid) ((((uint32) (xid)) / OLDSERXID_ENTRIESPERPAGE) % (OLDSERXID_MAX_PAGE + 1))
00327 #define OldSerXidSegment(page) ((page) / SLRU_PAGES_PER_SEGMENT)
00328
00329 typedef struct OldSerXidControlData
00330 {
00331 int headPage;
00332 TransactionId headXid;
00333 TransactionId tailXid;
00334 bool warningIssued;
00335 } OldSerXidControlData;
00336
00337 typedef struct OldSerXidControlData *OldSerXidControl;
00338
00339 static OldSerXidControl oldSerXidControl;
00340
00341
00342
00343
00344
00345
00346
00347 static SERIALIZABLEXACT *OldCommittedSxact;
00348
00349
00350
00351 int max_predicate_locks_per_xact;
00352
00353
00354
00355
00356
00357
00358
00359
00360
00361
00362 static PredXactList PredXact;
00363
00364
00365
00366
00367
00368 static RWConflictPoolHeader RWConflictPool;
00369
00370
00371
00372
00373
00374 static HTAB *SerializableXidHash;
00375 static HTAB *PredicateLockTargetHash;
00376 static HTAB *PredicateLockHash;
00377 static SHM_QUEUE *FinishedSerializableTransactions;
00378
00379
00380
00381
00382
00383
00384 static const PREDICATELOCKTARGETTAG ScratchTargetTag = {0, 0, 0, 0, 0};
00385 static uint32 ScratchTargetTagHash;
00386 static int ScratchPartitionLock;
00387
00388
00389
00390
00391
00392 static HTAB *LocalPredicateLockHash = NULL;
00393
00394
00395
00396
00397
00398
00399 static SERIALIZABLEXACT *MySerializableXact = InvalidSerializableXact;
00400 static bool MyXactDidWrite = false;
00401
00402
00403
00404 static SERIALIZABLEXACT *CreatePredXact(void);
00405 static void ReleasePredXact(SERIALIZABLEXACT *sxact);
00406 static SERIALIZABLEXACT *FirstPredXact(void);
00407 static SERIALIZABLEXACT *NextPredXact(SERIALIZABLEXACT *sxact);
00408
00409 static bool RWConflictExists(const SERIALIZABLEXACT *reader, const SERIALIZABLEXACT *writer);
00410 static void SetRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer);
00411 static void SetPossibleUnsafeConflict(SERIALIZABLEXACT *roXact, SERIALIZABLEXACT *activeXact);
00412 static void ReleaseRWConflict(RWConflict conflict);
00413 static void FlagSxactUnsafe(SERIALIZABLEXACT *sxact);
00414
00415 static bool OldSerXidPagePrecedesLogically(int p, int q);
00416 static void OldSerXidInit(void);
00417 static void OldSerXidAdd(TransactionId xid, SerCommitSeqNo minConflictCommitSeqNo);
00418 static SerCommitSeqNo OldSerXidGetMinConflictCommitSeqNo(TransactionId xid);
00419 static void OldSerXidSetActiveSerXmin(TransactionId xid);
00420
00421 static uint32 predicatelock_hash(const void *key, Size keysize);
00422 static void SummarizeOldestCommittedSxact(void);
00423 static Snapshot GetSafeSnapshot(Snapshot snapshot);
00424 static Snapshot GetSerializableTransactionSnapshotInt(Snapshot snapshot,
00425 TransactionId sourcexid);
00426 static bool PredicateLockExists(const PREDICATELOCKTARGETTAG *targettag);
00427 static bool GetParentPredicateLockTag(const PREDICATELOCKTARGETTAG *tag,
00428 PREDICATELOCKTARGETTAG *parent);
00429 static bool CoarserLockCovers(const PREDICATELOCKTARGETTAG *newtargettag);
00430 static void RemoveScratchTarget(bool lockheld);
00431 static void RestoreScratchTarget(bool lockheld);
00432 static void RemoveTargetIfNoLongerUsed(PREDICATELOCKTARGET *target,
00433 uint32 targettaghash);
00434 static void DeleteChildTargetLocks(const PREDICATELOCKTARGETTAG *newtargettag);
00435 static int PredicateLockPromotionThreshold(const PREDICATELOCKTARGETTAG *tag);
00436 static bool CheckAndPromotePredicateLockRequest(const PREDICATELOCKTARGETTAG *reqtag);
00437 static void DecrementParentLocks(const PREDICATELOCKTARGETTAG *targettag);
00438 static void CreatePredicateLock(const PREDICATELOCKTARGETTAG *targettag,
00439 uint32 targettaghash,
00440 SERIALIZABLEXACT *sxact);
00441 static void DeleteLockTarget(PREDICATELOCKTARGET *target, uint32 targettaghash);
00442 static bool TransferPredicateLocksToNewTarget(PREDICATELOCKTARGETTAG oldtargettag,
00443 PREDICATELOCKTARGETTAG newtargettag,
00444 bool removeOld);
00445 static void PredicateLockAcquire(const PREDICATELOCKTARGETTAG *targettag);
00446 static void DropAllPredicateLocksFromTable(Relation relation,
00447 bool transfer);
00448 static void SetNewSxactGlobalXmin(void);
00449 static void ClearOldPredicateLocks(void);
00450 static void ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial,
00451 bool summarize);
00452 static bool XidIsConcurrent(TransactionId xid);
00453 static void CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag);
00454 static void FlagRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer);
00455 static void OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader,
00456 SERIALIZABLEXACT *writer);
00457
00458
00459
00460
00461
00462
00463
00464
00465 static inline bool
00466 PredicateLockingNeededForRelation(Relation relation)
00467 {
00468 return !(relation->rd_id < FirstBootstrapObjectId ||
00469 RelationUsesLocalBuffers(relation) ||
00470 relation->rd_rel->relkind == RELKIND_MATVIEW);
00471 }
00472
00473
00474
00475
00476
00477
00478
00479
00480
00481
00482
00483
00484 static inline bool
00485 SerializationNeededForRead(Relation relation, Snapshot snapshot)
00486 {
00487
00488 if (MySerializableXact == InvalidSerializableXact)
00489 return false;
00490
00491
00492
00493
00494
00495
00496
00497
00498 if (!IsMVCCSnapshot(snapshot))
00499 return false;
00500
00501
00502
00503
00504
00505
00506
00507
00508
00509
00510
00511 if (SxactIsROSafe(MySerializableXact))
00512 {
00513 ReleasePredicateLocks(false);
00514 return false;
00515 }
00516
00517
00518 if (!PredicateLockingNeededForRelation(relation))
00519 return false;
00520
00521 return true;
00522 }
00523
00524
00525
00526
00527
00528 static inline bool
00529 SerializationNeededForWrite(Relation relation)
00530 {
00531
00532 if (MySerializableXact == InvalidSerializableXact)
00533 return false;
00534
00535
00536 if (!PredicateLockingNeededForRelation(relation))
00537 return false;
00538
00539 return true;
00540 }
00541
00542
00543
00544
00545
00546
00547
00548
00549
00550 static SERIALIZABLEXACT *
00551 CreatePredXact(void)
00552 {
00553 PredXactListElement ptle;
00554
00555 ptle = (PredXactListElement)
00556 SHMQueueNext(&PredXact->availableList,
00557 &PredXact->availableList,
00558 offsetof(PredXactListElementData, link));
00559 if (!ptle)
00560 return NULL;
00561
00562 SHMQueueDelete(&ptle->link);
00563 SHMQueueInsertBefore(&PredXact->activeList, &ptle->link);
00564 return &ptle->sxact;
00565 }
00566
00567 static void
00568 ReleasePredXact(SERIALIZABLEXACT *sxact)
00569 {
00570 PredXactListElement ptle;
00571
00572 Assert(ShmemAddrIsValid(sxact));
00573
00574 ptle = (PredXactListElement)
00575 (((char *) sxact)
00576 - offsetof(PredXactListElementData, sxact)
00577 + offsetof(PredXactListElementData, link));
00578 SHMQueueDelete(&ptle->link);
00579 SHMQueueInsertBefore(&PredXact->availableList, &ptle->link);
00580 }
00581
00582 static SERIALIZABLEXACT *
00583 FirstPredXact(void)
00584 {
00585 PredXactListElement ptle;
00586
00587 ptle = (PredXactListElement)
00588 SHMQueueNext(&PredXact->activeList,
00589 &PredXact->activeList,
00590 offsetof(PredXactListElementData, link));
00591 if (!ptle)
00592 return NULL;
00593
00594 return &ptle->sxact;
00595 }
00596
00597 static SERIALIZABLEXACT *
00598 NextPredXact(SERIALIZABLEXACT *sxact)
00599 {
00600 PredXactListElement ptle;
00601
00602 Assert(ShmemAddrIsValid(sxact));
00603
00604 ptle = (PredXactListElement)
00605 (((char *) sxact)
00606 - offsetof(PredXactListElementData, sxact)
00607 + offsetof(PredXactListElementData, link));
00608 ptle = (PredXactListElement)
00609 SHMQueueNext(&PredXact->activeList,
00610 &ptle->link,
00611 offsetof(PredXactListElementData, link));
00612 if (!ptle)
00613 return NULL;
00614
00615 return &ptle->sxact;
00616 }
00617
00618
00619
00620
00621
00622
00623 static bool
00624 RWConflictExists(const SERIALIZABLEXACT *reader, const SERIALIZABLEXACT *writer)
00625 {
00626 RWConflict conflict;
00627
00628 Assert(reader != writer);
00629
00630
00631 if (SxactIsDoomed(reader)
00632 || SxactIsDoomed(writer)
00633 || SHMQueueEmpty(&reader->outConflicts)
00634 || SHMQueueEmpty(&writer->inConflicts))
00635 return false;
00636
00637
00638 conflict = (RWConflict)
00639 SHMQueueNext(&reader->outConflicts,
00640 &reader->outConflicts,
00641 offsetof(RWConflictData, outLink));
00642 while (conflict)
00643 {
00644 if (conflict->sxactIn == writer)
00645 return true;
00646 conflict = (RWConflict)
00647 SHMQueueNext(&reader->outConflicts,
00648 &conflict->outLink,
00649 offsetof(RWConflictData, outLink));
00650 }
00651
00652
00653 return false;
00654 }
00655
00656 static void
00657 SetRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer)
00658 {
00659 RWConflict conflict;
00660
00661 Assert(reader != writer);
00662 Assert(!RWConflictExists(reader, writer));
00663
00664 conflict = (RWConflict)
00665 SHMQueueNext(&RWConflictPool->availableList,
00666 &RWConflictPool->availableList,
00667 offsetof(RWConflictData, outLink));
00668 if (!conflict)
00669 ereport(ERROR,
00670 (errcode(ERRCODE_OUT_OF_MEMORY),
00671 errmsg("not enough elements in RWConflictPool to record a read/write conflict"),
00672 errhint("You might need to run fewer transactions at a time or increase max_connections.")));
00673
00674 SHMQueueDelete(&conflict->outLink);
00675
00676 conflict->sxactOut = reader;
00677 conflict->sxactIn = writer;
00678 SHMQueueInsertBefore(&reader->outConflicts, &conflict->outLink);
00679 SHMQueueInsertBefore(&writer->inConflicts, &conflict->inLink);
00680 }
00681
00682 static void
00683 SetPossibleUnsafeConflict(SERIALIZABLEXACT *roXact,
00684 SERIALIZABLEXACT *activeXact)
00685 {
00686 RWConflict conflict;
00687
00688 Assert(roXact != activeXact);
00689 Assert(SxactIsReadOnly(roXact));
00690 Assert(!SxactIsReadOnly(activeXact));
00691
00692 conflict = (RWConflict)
00693 SHMQueueNext(&RWConflictPool->availableList,
00694 &RWConflictPool->availableList,
00695 offsetof(RWConflictData, outLink));
00696 if (!conflict)
00697 ereport(ERROR,
00698 (errcode(ERRCODE_OUT_OF_MEMORY),
00699 errmsg("not enough elements in RWConflictPool to record a potential read/write conflict"),
00700 errhint("You might need to run fewer transactions at a time or increase max_connections.")));
00701
00702 SHMQueueDelete(&conflict->outLink);
00703
00704 conflict->sxactOut = activeXact;
00705 conflict->sxactIn = roXact;
00706 SHMQueueInsertBefore(&activeXact->possibleUnsafeConflicts,
00707 &conflict->outLink);
00708 SHMQueueInsertBefore(&roXact->possibleUnsafeConflicts,
00709 &conflict->inLink);
00710 }
00711
00712 static void
00713 ReleaseRWConflict(RWConflict conflict)
00714 {
00715 SHMQueueDelete(&conflict->inLink);
00716 SHMQueueDelete(&conflict->outLink);
00717 SHMQueueInsertBefore(&RWConflictPool->availableList, &conflict->outLink);
00718 }
00719
00720 static void
00721 FlagSxactUnsafe(SERIALIZABLEXACT *sxact)
00722 {
00723 RWConflict conflict,
00724 nextConflict;
00725
00726 Assert(SxactIsReadOnly(sxact));
00727 Assert(!SxactIsROSafe(sxact));
00728
00729 sxact->flags |= SXACT_FLAG_RO_UNSAFE;
00730
00731
00732
00733
00734
00735 conflict = (RWConflict)
00736 SHMQueueNext(&sxact->possibleUnsafeConflicts,
00737 &sxact->possibleUnsafeConflicts,
00738 offsetof(RWConflictData, inLink));
00739 while (conflict)
00740 {
00741 nextConflict = (RWConflict)
00742 SHMQueueNext(&sxact->possibleUnsafeConflicts,
00743 &conflict->inLink,
00744 offsetof(RWConflictData, inLink));
00745
00746 Assert(!SxactIsReadOnly(conflict->sxactOut));
00747 Assert(sxact == conflict->sxactIn);
00748
00749 ReleaseRWConflict(conflict);
00750
00751 conflict = nextConflict;
00752 }
00753 }
00754
00755
00756
00757
00758
00759
00760
00761 static bool
00762 OldSerXidPagePrecedesLogically(int p, int q)
00763 {
00764 int diff;
00765
00766
00767
00768
00769
00770 Assert(p >= 0 && p <= OLDSERXID_MAX_PAGE);
00771 Assert(q >= 0 && q <= OLDSERXID_MAX_PAGE);
00772
00773 diff = p - q;
00774 if (diff >= ((OLDSERXID_MAX_PAGE + 1) / 2))
00775 diff -= OLDSERXID_MAX_PAGE + 1;
00776 else if (diff < -((int) (OLDSERXID_MAX_PAGE + 1) / 2))
00777 diff += OLDSERXID_MAX_PAGE + 1;
00778 return diff < 0;
00779 }
00780
00781
00782
00783
00784 static void
00785 OldSerXidInit(void)
00786 {
00787 bool found;
00788
00789
00790
00791
00792 OldSerXidSlruCtl->PagePrecedes = OldSerXidPagePrecedesLogically;
00793 SimpleLruInit(OldSerXidSlruCtl, "OldSerXid SLRU Ctl",
00794 NUM_OLDSERXID_BUFFERS, 0, OldSerXidLock, "pg_serial");
00795
00796 OldSerXidSlruCtl->do_fsync = false;
00797
00798
00799
00800
00801 oldSerXidControl = (OldSerXidControl)
00802 ShmemInitStruct("OldSerXidControlData", sizeof(OldSerXidControlData), &found);
00803
00804 if (!found)
00805 {
00806
00807
00808
00809 oldSerXidControl->headPage = -1;
00810 oldSerXidControl->headXid = InvalidTransactionId;
00811 oldSerXidControl->tailXid = InvalidTransactionId;
00812 oldSerXidControl->warningIssued = false;
00813 }
00814 }
00815
00816
00817
00818
00819
00820
00821 static void
00822 OldSerXidAdd(TransactionId xid, SerCommitSeqNo minConflictCommitSeqNo)
00823 {
00824 TransactionId tailXid;
00825 int targetPage;
00826 int slotno;
00827 int firstZeroPage;
00828 bool isNewPage;
00829
00830 Assert(TransactionIdIsValid(xid));
00831
00832 targetPage = OldSerXidPage(xid);
00833
00834 LWLockAcquire(OldSerXidLock, LW_EXCLUSIVE);
00835
00836
00837
00838
00839
00840
00841 tailXid = oldSerXidControl->tailXid;
00842 Assert(TransactionIdIsValid(tailXid));
00843
00844
00845
00846
00847
00848
00849
00850 if (oldSerXidControl->headPage < 0)
00851 {
00852 firstZeroPage = OldSerXidPage(tailXid);
00853 isNewPage = true;
00854 }
00855 else
00856 {
00857 firstZeroPage = OldSerXidNextPage(oldSerXidControl->headPage);
00858 isNewPage = OldSerXidPagePrecedesLogically(oldSerXidControl->headPage,
00859 targetPage);
00860 }
00861
00862 if (!TransactionIdIsValid(oldSerXidControl->headXid)
00863 || TransactionIdFollows(xid, oldSerXidControl->headXid))
00864 oldSerXidControl->headXid = xid;
00865 if (isNewPage)
00866 oldSerXidControl->headPage = targetPage;
00867
00868
00869
00870
00871
00872
00873
00874
00875
00876
00877
00878
00879
00880
00881
00882
00883 if (oldSerXidControl->warningIssued)
00884 {
00885 TransactionId lowWatermark;
00886
00887 lowWatermark = tailXid + 800000000;
00888 if (lowWatermark < FirstNormalTransactionId)
00889 lowWatermark = FirstNormalTransactionId;
00890 if (TransactionIdPrecedes(xid, lowWatermark))
00891 oldSerXidControl->warningIssued = false;
00892 }
00893 else
00894 {
00895 TransactionId highWatermark;
00896
00897 highWatermark = tailXid + 1000000000;
00898 if (highWatermark < FirstNormalTransactionId)
00899 highWatermark = FirstNormalTransactionId;
00900 if (TransactionIdFollows(xid, highWatermark))
00901 {
00902 oldSerXidControl->warningIssued = true;
00903 ereport(WARNING,
00904 (errmsg("memory for serializable conflict tracking is nearly exhausted"),
00905 errhint("There might be an idle transaction or a forgotten prepared transaction causing this.")));
00906 }
00907 }
00908
00909 if (isNewPage)
00910 {
00911
00912 while (firstZeroPage != targetPage)
00913 {
00914 (void) SimpleLruZeroPage(OldSerXidSlruCtl, firstZeroPage);
00915 firstZeroPage = OldSerXidNextPage(firstZeroPage);
00916 }
00917 slotno = SimpleLruZeroPage(OldSerXidSlruCtl, targetPage);
00918 }
00919 else
00920 slotno = SimpleLruReadPage(OldSerXidSlruCtl, targetPage, true, xid);
00921
00922 OldSerXidValue(slotno, xid) = minConflictCommitSeqNo;
00923 OldSerXidSlruCtl->shared->page_dirty[slotno] = true;
00924
00925 LWLockRelease(OldSerXidLock);
00926 }
00927
00928
00929
00930
00931
00932
00933 static SerCommitSeqNo
00934 OldSerXidGetMinConflictCommitSeqNo(TransactionId xid)
00935 {
00936 TransactionId headXid;
00937 TransactionId tailXid;
00938 SerCommitSeqNo val;
00939 int slotno;
00940
00941 Assert(TransactionIdIsValid(xid));
00942
00943 LWLockAcquire(OldSerXidLock, LW_SHARED);
00944 headXid = oldSerXidControl->headXid;
00945 tailXid = oldSerXidControl->tailXid;
00946 LWLockRelease(OldSerXidLock);
00947
00948 if (!TransactionIdIsValid(headXid))
00949 return 0;
00950
00951 Assert(TransactionIdIsValid(tailXid));
00952
00953 if (TransactionIdPrecedes(xid, tailXid)
00954 || TransactionIdFollows(xid, headXid))
00955 return 0;
00956
00957
00958
00959
00960
00961 slotno = SimpleLruReadPage_ReadOnly(OldSerXidSlruCtl,
00962 OldSerXidPage(xid), xid);
00963 val = OldSerXidValue(slotno, xid);
00964 LWLockRelease(OldSerXidLock);
00965 return val;
00966 }
00967
00968
00969
00970
00971
00972
00973
00974 static void
00975 OldSerXidSetActiveSerXmin(TransactionId xid)
00976 {
00977 LWLockAcquire(OldSerXidLock, LW_EXCLUSIVE);
00978
00979
00980
00981
00982
00983
00984
00985 if (!TransactionIdIsValid(xid))
00986 {
00987 oldSerXidControl->tailXid = InvalidTransactionId;
00988 oldSerXidControl->headXid = InvalidTransactionId;
00989 LWLockRelease(OldSerXidLock);
00990 return;
00991 }
00992
00993
00994
00995
00996
00997
00998
00999 if (RecoveryInProgress())
01000 {
01001 Assert(oldSerXidControl->headPage < 0);
01002 if (!TransactionIdIsValid(oldSerXidControl->tailXid)
01003 || TransactionIdPrecedes(xid, oldSerXidControl->tailXid))
01004 {
01005 oldSerXidControl->tailXid = xid;
01006 }
01007 LWLockRelease(OldSerXidLock);
01008 return;
01009 }
01010
01011 Assert(!TransactionIdIsValid(oldSerXidControl->tailXid)
01012 || TransactionIdFollows(xid, oldSerXidControl->tailXid));
01013
01014 oldSerXidControl->tailXid = xid;
01015
01016 LWLockRelease(OldSerXidLock);
01017 }
01018
01019
01020
01021
01022
01023
01024
01025 void
01026 CheckPointPredicate(void)
01027 {
01028 int tailPage;
01029
01030 LWLockAcquire(OldSerXidLock, LW_EXCLUSIVE);
01031
01032
01033 if (oldSerXidControl->headPage < 0)
01034 {
01035 LWLockRelease(OldSerXidLock);
01036 return;
01037 }
01038
01039 if (TransactionIdIsValid(oldSerXidControl->tailXid))
01040 {
01041
01042 tailPage = OldSerXidPage(oldSerXidControl->tailXid);
01043 }
01044 else
01045 {
01046
01047
01048
01049
01050
01051
01052
01053
01054
01055
01056 tailPage = oldSerXidControl->headPage;
01057 oldSerXidControl->headPage = -1;
01058 }
01059
01060 LWLockRelease(OldSerXidLock);
01061
01062
01063 SimpleLruTruncate(OldSerXidSlruCtl, tailPage);
01064
01065
01066
01067
01068
01069
01070
01071
01072
01073
01074
01075 SimpleLruFlush(OldSerXidSlruCtl, true);
01076 }
01077
01078
01079
01080
01081
01082
01083
01084
01085
01086
01087
01088
01089
01090 void
01091 InitPredicateLocks(void)
01092 {
01093 HASHCTL info;
01094 int hash_flags;
01095 long max_table_size;
01096 Size requestSize;
01097 bool found;
01098
01099
01100
01101
01102
01103 max_table_size = NPREDICATELOCKTARGETENTS();
01104
01105
01106
01107
01108
01109 MemSet(&info, 0, sizeof(info));
01110 info.keysize = sizeof(PREDICATELOCKTARGETTAG);
01111 info.entrysize = sizeof(PREDICATELOCKTARGET);
01112 info.hash = tag_hash;
01113 info.num_partitions = NUM_PREDICATELOCK_PARTITIONS;
01114 hash_flags = (HASH_ELEM | HASH_FUNCTION | HASH_PARTITION | HASH_FIXED_SIZE);
01115
01116 PredicateLockTargetHash = ShmemInitHash("PREDICATELOCKTARGET hash",
01117 max_table_size,
01118 max_table_size,
01119 &info,
01120 hash_flags);
01121
01122
01123 max_table_size *= 2;
01124
01125
01126
01127
01128
01129
01130
01131 hash_search(PredicateLockTargetHash, &ScratchTargetTag, HASH_ENTER, NULL);
01132
01133
01134
01135
01136
01137 MemSet(&info, 0, sizeof(info));
01138 info.keysize = sizeof(PREDICATELOCKTAG);
01139 info.entrysize = sizeof(PREDICATELOCK);
01140 info.hash = predicatelock_hash;
01141 info.num_partitions = NUM_PREDICATELOCK_PARTITIONS;
01142 hash_flags = (HASH_ELEM | HASH_FUNCTION | HASH_PARTITION | HASH_FIXED_SIZE);
01143
01144 PredicateLockHash = ShmemInitHash("PREDICATELOCK hash",
01145 max_table_size,
01146 max_table_size,
01147 &info,
01148 hash_flags);
01149
01150
01151
01152
01153
01154 max_table_size = (MaxBackends + max_prepared_xacts);
01155
01156
01157
01158
01159
01160
01161
01162
01163
01164 max_table_size *= 10;
01165
01166 PredXact = ShmemInitStruct("PredXactList",
01167 PredXactListDataSize,
01168 &found);
01169 if (!found)
01170 {
01171 int i;
01172
01173 SHMQueueInit(&PredXact->availableList);
01174 SHMQueueInit(&PredXact->activeList);
01175 PredXact->SxactGlobalXmin = InvalidTransactionId;
01176 PredXact->SxactGlobalXminCount = 0;
01177 PredXact->WritableSxactCount = 0;
01178 PredXact->LastSxactCommitSeqNo = FirstNormalSerCommitSeqNo - 1;
01179 PredXact->CanPartialClearThrough = 0;
01180 PredXact->HavePartialClearedThrough = 0;
01181 requestSize = mul_size((Size) max_table_size,
01182 PredXactListElementDataSize);
01183 PredXact->element = ShmemAlloc(requestSize);
01184 if (PredXact->element == NULL)
01185 ereport(ERROR,
01186 (errcode(ERRCODE_OUT_OF_MEMORY),
01187 errmsg("not enough shared memory for elements of data structure"
01188 " \"%s\" (%lu bytes requested)",
01189 "PredXactList", (unsigned long) requestSize)));
01190
01191 memset(PredXact->element, 0, requestSize);
01192 for (i = 0; i < max_table_size; i++)
01193 {
01194 SHMQueueInsertBefore(&(PredXact->availableList),
01195 &(PredXact->element[i].link));
01196 }
01197 PredXact->OldCommittedSxact = CreatePredXact();
01198 SetInvalidVirtualTransactionId(PredXact->OldCommittedSxact->vxid);
01199 PredXact->OldCommittedSxact->prepareSeqNo = 0;
01200 PredXact->OldCommittedSxact->commitSeqNo = 0;
01201 PredXact->OldCommittedSxact->SeqNo.lastCommitBeforeSnapshot = 0;
01202 SHMQueueInit(&PredXact->OldCommittedSxact->outConflicts);
01203 SHMQueueInit(&PredXact->OldCommittedSxact->inConflicts);
01204 SHMQueueInit(&PredXact->OldCommittedSxact->predicateLocks);
01205 SHMQueueInit(&PredXact->OldCommittedSxact->finishedLink);
01206 SHMQueueInit(&PredXact->OldCommittedSxact->possibleUnsafeConflicts);
01207 PredXact->OldCommittedSxact->topXid = InvalidTransactionId;
01208 PredXact->OldCommittedSxact->finishedBefore = InvalidTransactionId;
01209 PredXact->OldCommittedSxact->xmin = InvalidTransactionId;
01210 PredXact->OldCommittedSxact->flags = SXACT_FLAG_COMMITTED;
01211 PredXact->OldCommittedSxact->pid = 0;
01212 }
01213
01214 OldCommittedSxact = PredXact->OldCommittedSxact;
01215
01216
01217
01218
01219
01220 MemSet(&info, 0, sizeof(info));
01221 info.keysize = sizeof(SERIALIZABLEXIDTAG);
01222 info.entrysize = sizeof(SERIALIZABLEXID);
01223 info.hash = tag_hash;
01224 hash_flags = (HASH_ELEM | HASH_FUNCTION | HASH_FIXED_SIZE);
01225
01226 SerializableXidHash = ShmemInitHash("SERIALIZABLEXID hash",
01227 max_table_size,
01228 max_table_size,
01229 &info,
01230 hash_flags);
01231
01232
01233
01234
01235
01236
01237
01238
01239
01240
01241
01242
01243 max_table_size *= 5;
01244
01245 RWConflictPool = ShmemInitStruct("RWConflictPool",
01246 RWConflictPoolHeaderDataSize,
01247 &found);
01248 if (!found)
01249 {
01250 int i;
01251
01252 SHMQueueInit(&RWConflictPool->availableList);
01253 requestSize = mul_size((Size) max_table_size,
01254 RWConflictDataSize);
01255 RWConflictPool->element = ShmemAlloc(requestSize);
01256 if (RWConflictPool->element == NULL)
01257 ereport(ERROR,
01258 (errcode(ERRCODE_OUT_OF_MEMORY),
01259 errmsg("not enough shared memory for elements of data structure"
01260 " \"%s\" (%lu bytes requested)",
01261 "RWConflictPool", (unsigned long) requestSize)));
01262
01263 memset(RWConflictPool->element, 0, requestSize);
01264 for (i = 0; i < max_table_size; i++)
01265 {
01266 SHMQueueInsertBefore(&(RWConflictPool->availableList),
01267 &(RWConflictPool->element[i].outLink));
01268 }
01269 }
01270
01271
01272
01273
01274
01275 FinishedSerializableTransactions = (SHM_QUEUE *)
01276 ShmemInitStruct("FinishedSerializableTransactions",
01277 sizeof(SHM_QUEUE),
01278 &found);
01279 if (!found)
01280 SHMQueueInit(FinishedSerializableTransactions);
01281
01282
01283
01284
01285
01286 OldSerXidInit();
01287
01288
01289 ScratchTargetTagHash = PredicateLockTargetTagHashCode(&ScratchTargetTag);
01290 ScratchPartitionLock = PredicateLockHashPartitionLock(ScratchTargetTagHash);
01291 }
01292
01293
01294
01295
01296 Size
01297 PredicateLockShmemSize(void)
01298 {
01299 Size size = 0;
01300 long max_table_size;
01301
01302
01303 max_table_size = NPREDICATELOCKTARGETENTS();
01304 size = add_size(size, hash_estimate_size(max_table_size,
01305 sizeof(PREDICATELOCKTARGET)));
01306
01307
01308 max_table_size *= 2;
01309 size = add_size(size, hash_estimate_size(max_table_size,
01310 sizeof(PREDICATELOCK)));
01311
01312
01313
01314
01315
01316 size = add_size(size, size / 10);
01317
01318
01319 max_table_size = MaxBackends + max_prepared_xacts;
01320 max_table_size *= 10;
01321 size = add_size(size, PredXactListDataSize);
01322 size = add_size(size, mul_size((Size) max_table_size,
01323 PredXactListElementDataSize));
01324
01325
01326 size = add_size(size, hash_estimate_size(max_table_size,
01327 sizeof(SERIALIZABLEXID)));
01328
01329
01330 max_table_size *= 5;
01331 size = add_size(size, RWConflictPoolHeaderDataSize);
01332 size = add_size(size, mul_size((Size) max_table_size,
01333 RWConflictDataSize));
01334
01335
01336 size = add_size(size, sizeof(SHM_QUEUE));
01337
01338
01339 size = add_size(size, sizeof(OldSerXidControlData));
01340 size = add_size(size, SimpleLruShmemSize(NUM_OLDSERXID_BUFFERS, 0));
01341
01342 return size;
01343 }
01344
01345
01346
01347
01348
01349
01350
01351
01352
01353
01354
01355
01356
01357
01358 static uint32
01359 predicatelock_hash(const void *key, Size keysize)
01360 {
01361 const PREDICATELOCKTAG *predicatelocktag = (const PREDICATELOCKTAG *) key;
01362 uint32 targethash;
01363
01364 Assert(keysize == sizeof(PREDICATELOCKTAG));
01365
01366
01367 targethash = PredicateLockTargetTagHashCode(&predicatelocktag->myTarget->tag);
01368
01369 return PredicateLockHashCodeFromTargetHashCode(predicatelocktag, targethash);
01370 }
01371
01372
01373
01374
01375
01376
01377
01378
01379
01380
01381
01382
01383
01384 PredicateLockData *
01385 GetPredicateLockStatusData(void)
01386 {
01387 PredicateLockData *data;
01388 int i;
01389 int els,
01390 el;
01391 HASH_SEQ_STATUS seqstat;
01392 PREDICATELOCK *predlock;
01393
01394 data = (PredicateLockData *) palloc(sizeof(PredicateLockData));
01395
01396
01397
01398
01399
01400 for (i = 0; i < NUM_PREDICATELOCK_PARTITIONS; i++)
01401 LWLockAcquire(FirstPredicateLockMgrLock + i, LW_SHARED);
01402 LWLockAcquire(SerializableXactHashLock, LW_SHARED);
01403
01404
01405 els = hash_get_num_entries(PredicateLockHash);
01406 data->nelements = els;
01407 data->locktags = (PREDICATELOCKTARGETTAG *)
01408 palloc(sizeof(PREDICATELOCKTARGETTAG) * els);
01409 data->xacts = (SERIALIZABLEXACT *)
01410 palloc(sizeof(SERIALIZABLEXACT) * els);
01411
01412
01413
01414 hash_seq_init(&seqstat, PredicateLockHash);
01415
01416 el = 0;
01417
01418 while ((predlock = (PREDICATELOCK *) hash_seq_search(&seqstat)))
01419 {
01420 data->locktags[el] = predlock->tag.myTarget->tag;
01421 data->xacts[el] = *predlock->tag.myXact;
01422 el++;
01423 }
01424
01425 Assert(el == els);
01426
01427
01428 LWLockRelease(SerializableXactHashLock);
01429 for (i = NUM_PREDICATELOCK_PARTITIONS - 1; i >= 0; i--)
01430 LWLockRelease(FirstPredicateLockMgrLock + i);
01431
01432 return data;
01433 }
01434
01435
01436
01437
01438
01439
01440
01441
01442 static void
01443 SummarizeOldestCommittedSxact(void)
01444 {
01445 SERIALIZABLEXACT *sxact;
01446
01447 LWLockAcquire(SerializableFinishedListLock, LW_EXCLUSIVE);
01448
01449
01450
01451
01452
01453
01454
01455
01456
01457
01458
01459 if (SHMQueueEmpty(FinishedSerializableTransactions))
01460 {
01461 LWLockRelease(SerializableFinishedListLock);
01462 return;
01463 }
01464
01465
01466
01467
01468
01469 sxact = (SERIALIZABLEXACT *)
01470 SHMQueueNext(FinishedSerializableTransactions,
01471 FinishedSerializableTransactions,
01472 offsetof(SERIALIZABLEXACT, finishedLink));
01473 SHMQueueDelete(&(sxact->finishedLink));
01474
01475
01476 if (TransactionIdIsValid(sxact->topXid) && !SxactIsReadOnly(sxact))
01477 OldSerXidAdd(sxact->topXid, SxactHasConflictOut(sxact)
01478 ? sxact->SeqNo.earliestOutConflictCommit : InvalidSerCommitSeqNo);
01479
01480
01481 ReleaseOneSerializableXact(sxact, false, true);
01482
01483 LWLockRelease(SerializableFinishedListLock);
01484 }
01485
01486
01487
01488
01489
01490
01491
01492
01493
01494
01495
01496
01497
01498
01499 static Snapshot
01500 GetSafeSnapshot(Snapshot origSnapshot)
01501 {
01502 Snapshot snapshot;
01503
01504 Assert(XactReadOnly && XactDeferrable);
01505
01506 while (true)
01507 {
01508
01509
01510
01511
01512
01513
01514 snapshot = GetSerializableTransactionSnapshotInt(origSnapshot,
01515 InvalidTransactionId);
01516
01517 if (MySerializableXact == InvalidSerializableXact)
01518 return snapshot;
01519
01520 LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
01521
01522
01523
01524
01525
01526 MySerializableXact->flags |= SXACT_FLAG_DEFERRABLE_WAITING;
01527 while (!(SHMQueueEmpty(&MySerializableXact->possibleUnsafeConflicts) ||
01528 SxactIsROUnsafe(MySerializableXact)))
01529 {
01530 LWLockRelease(SerializableXactHashLock);
01531 ProcWaitForSignal();
01532 LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
01533 }
01534 MySerializableXact->flags &= ~SXACT_FLAG_DEFERRABLE_WAITING;
01535
01536 if (!SxactIsROUnsafe(MySerializableXact))
01537 {
01538 LWLockRelease(SerializableXactHashLock);
01539 break;
01540 }
01541
01542 LWLockRelease(SerializableXactHashLock);
01543
01544
01545 ereport(DEBUG2,
01546 (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
01547 errmsg("deferrable snapshot was unsafe; trying a new one")));
01548 ReleasePredicateLocks(false);
01549 }
01550
01551
01552
01553
01554 Assert(SxactIsROSafe(MySerializableXact));
01555 ReleasePredicateLocks(false);
01556
01557 return snapshot;
01558 }
01559
01560
01561
01562
01563
01564
01565
01566
01567
01568
01569
01570
01571 Snapshot
01572 GetSerializableTransactionSnapshot(Snapshot snapshot)
01573 {
01574 Assert(IsolationIsSerializable());
01575
01576
01577
01578
01579
01580
01581
01582 if (RecoveryInProgress())
01583 ereport(ERROR,
01584 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
01585 errmsg("cannot use serializable mode in a hot standby"),
01586 errdetail("\"default_transaction_isolation\" is set to \"serializable\"."),
01587 errhint("You can use \"SET default_transaction_isolation = 'repeatable read'\" to change the default.")));
01588
01589
01590
01591
01592
01593
01594 if (XactReadOnly && XactDeferrable)
01595 return GetSafeSnapshot(snapshot);
01596
01597 return GetSerializableTransactionSnapshotInt(snapshot,
01598 InvalidTransactionId);
01599 }
01600
01601
01602
01603
01604
01605
01606
01607
01608
01609
01610
01611 void
01612 SetSerializableTransactionSnapshot(Snapshot snapshot,
01613 TransactionId sourcexid)
01614 {
01615 Assert(IsolationIsSerializable());
01616
01617
01618
01619
01620
01621
01622
01623 if (XactReadOnly && XactDeferrable)
01624 ereport(ERROR,
01625 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
01626 errmsg("a snapshot-importing transaction must not be READ ONLY DEFERRABLE")));
01627
01628 (void) GetSerializableTransactionSnapshotInt(snapshot, sourcexid);
01629 }
01630
01631
01632
01633
01634
01635
01636
01637
01638
01639
01640 static Snapshot
01641 GetSerializableTransactionSnapshotInt(Snapshot snapshot,
01642 TransactionId sourcexid)
01643 {
01644 PGPROC *proc;
01645 VirtualTransactionId vxid;
01646 SERIALIZABLEXACT *sxact,
01647 *othersxact;
01648 HASHCTL hash_ctl;
01649
01650
01651 Assert(MySerializableXact == InvalidSerializableXact);
01652
01653 Assert(!RecoveryInProgress());
01654
01655 proc = MyProc;
01656 Assert(proc != NULL);
01657 GET_VXID_FROM_PGPROC(vxid, *proc);
01658
01659
01660
01661
01662
01663
01664
01665
01666
01667
01668
01669
01670
01671 #ifdef TEST_OLDSERXID
01672 SummarizeOldestCommittedSxact();
01673 #endif
01674 LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
01675 do
01676 {
01677 sxact = CreatePredXact();
01678
01679 if (!sxact)
01680 {
01681 LWLockRelease(SerializableXactHashLock);
01682 SummarizeOldestCommittedSxact();
01683 LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
01684 }
01685 } while (!sxact);
01686
01687
01688 if (!TransactionIdIsValid(sourcexid))
01689 snapshot = GetSnapshotData(snapshot);
01690 else if (!ProcArrayInstallImportedXmin(snapshot->xmin, sourcexid))
01691 {
01692 ReleasePredXact(sxact);
01693 LWLockRelease(SerializableXactHashLock);
01694 ereport(ERROR,
01695 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
01696 errmsg("could not import the requested snapshot"),
01697 errdetail("The source transaction %u is not running anymore.",
01698 sourcexid)));
01699 }
01700
01701
01702
01703
01704
01705
01706
01707
01708
01709
01710
01711
01712
01713 if (XactReadOnly && PredXact->WritableSxactCount == 0)
01714 {
01715 ReleasePredXact(sxact);
01716 LWLockRelease(SerializableXactHashLock);
01717 return snapshot;
01718 }
01719
01720
01721 if (!TransactionIdIsValid(PredXact->SxactGlobalXmin))
01722 {
01723 Assert(PredXact->SxactGlobalXminCount == 0);
01724 PredXact->SxactGlobalXmin = snapshot->xmin;
01725 PredXact->SxactGlobalXminCount = 1;
01726 OldSerXidSetActiveSerXmin(snapshot->xmin);
01727 }
01728 else if (TransactionIdEquals(snapshot->xmin, PredXact->SxactGlobalXmin))
01729 {
01730 Assert(PredXact->SxactGlobalXminCount > 0);
01731 PredXact->SxactGlobalXminCount++;
01732 }
01733 else
01734 {
01735 Assert(TransactionIdFollows(snapshot->xmin, PredXact->SxactGlobalXmin));
01736 }
01737
01738
01739 sxact->vxid = vxid;
01740 sxact->SeqNo.lastCommitBeforeSnapshot = PredXact->LastSxactCommitSeqNo;
01741 sxact->prepareSeqNo = InvalidSerCommitSeqNo;
01742 sxact->commitSeqNo = InvalidSerCommitSeqNo;
01743 SHMQueueInit(&(sxact->outConflicts));
01744 SHMQueueInit(&(sxact->inConflicts));
01745 SHMQueueInit(&(sxact->possibleUnsafeConflicts));
01746 sxact->topXid = GetTopTransactionIdIfAny();
01747 sxact->finishedBefore = InvalidTransactionId;
01748 sxact->xmin = snapshot->xmin;
01749 sxact->pid = MyProcPid;
01750 SHMQueueInit(&(sxact->predicateLocks));
01751 SHMQueueElemInit(&(sxact->finishedLink));
01752 sxact->flags = 0;
01753 if (XactReadOnly)
01754 {
01755 sxact->flags |= SXACT_FLAG_READ_ONLY;
01756
01757
01758
01759
01760
01761
01762
01763 for (othersxact = FirstPredXact();
01764 othersxact != NULL;
01765 othersxact = NextPredXact(othersxact))
01766 {
01767 if (!SxactIsCommitted(othersxact)
01768 && !SxactIsDoomed(othersxact)
01769 && !SxactIsReadOnly(othersxact))
01770 {
01771 SetPossibleUnsafeConflict(sxact, othersxact);
01772 }
01773 }
01774 }
01775 else
01776 {
01777 ++(PredXact->WritableSxactCount);
01778 Assert(PredXact->WritableSxactCount <=
01779 (MaxBackends + max_prepared_xacts));
01780 }
01781
01782 MySerializableXact = sxact;
01783 MyXactDidWrite = false;
01784
01785 LWLockRelease(SerializableXactHashLock);
01786
01787
01788 Assert(LocalPredicateLockHash == NULL);
01789 MemSet(&hash_ctl, 0, sizeof(hash_ctl));
01790 hash_ctl.keysize = sizeof(PREDICATELOCKTARGETTAG);
01791 hash_ctl.entrysize = sizeof(LOCALPREDICATELOCK);
01792 hash_ctl.hash = tag_hash;
01793 LocalPredicateLockHash = hash_create("Local predicate lock",
01794 max_predicate_locks_per_xact,
01795 &hash_ctl,
01796 HASH_ELEM | HASH_FUNCTION);
01797
01798 return snapshot;
01799 }
01800
01801
01802
01803
01804
01805 void
01806 RegisterPredicateLockingXid(TransactionId xid)
01807 {
01808 SERIALIZABLEXIDTAG sxidtag;
01809 SERIALIZABLEXID *sxid;
01810 bool found;
01811
01812
01813
01814
01815
01816 if (MySerializableXact == InvalidSerializableXact)
01817 return;
01818
01819
01820 Assert(TransactionIdIsValid(xid));
01821
01822 LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
01823
01824
01825 Assert(MySerializableXact->topXid == InvalidTransactionId);
01826
01827 MySerializableXact->topXid = xid;
01828
01829 sxidtag.xid = xid;
01830 sxid = (SERIALIZABLEXID *) hash_search(SerializableXidHash,
01831 &sxidtag,
01832 HASH_ENTER, &found);
01833 Assert(!found);
01834
01835
01836 sxid->myXact = MySerializableXact;
01837 LWLockRelease(SerializableXactHashLock);
01838 }
01839
01840
01841
01842
01843
01844
01845
01846
01847
01848
01849
01850
01851
01852
01853
01854 bool
01855 PageIsPredicateLocked(Relation relation, BlockNumber blkno)
01856 {
01857 PREDICATELOCKTARGETTAG targettag;
01858 uint32 targettaghash;
01859 LWLockId partitionLock;
01860 PREDICATELOCKTARGET *target;
01861
01862 SET_PREDICATELOCKTARGETTAG_PAGE(targettag,
01863 relation->rd_node.dbNode,
01864 relation->rd_id,
01865 blkno);
01866
01867 targettaghash = PredicateLockTargetTagHashCode(&targettag);
01868 partitionLock = PredicateLockHashPartitionLock(targettaghash);
01869 LWLockAcquire(partitionLock, LW_SHARED);
01870 target = (PREDICATELOCKTARGET *)
01871 hash_search_with_hash_value(PredicateLockTargetHash,
01872 &targettag, targettaghash,
01873 HASH_FIND, NULL);
01874 LWLockRelease(partitionLock);
01875
01876 return (target != NULL);
01877 }
01878
01879
01880
01881
01882
01883
01884
01885
01886
01887
01888
01889
01890
01891 static bool
01892 PredicateLockExists(const PREDICATELOCKTARGETTAG *targettag)
01893 {
01894 LOCALPREDICATELOCK *lock;
01895
01896
01897 lock = (LOCALPREDICATELOCK *) hash_search(LocalPredicateLockHash,
01898 targettag,
01899 HASH_FIND, NULL);
01900
01901 if (!lock)
01902 return false;
01903
01904
01905
01906
01907
01908 return lock->held;
01909 }
01910
01911
01912
01913
01914
01915
01916
01917
01918 static bool
01919 GetParentPredicateLockTag(const PREDICATELOCKTARGETTAG *tag,
01920 PREDICATELOCKTARGETTAG *parent)
01921 {
01922 switch (GET_PREDICATELOCKTARGETTAG_TYPE(*tag))
01923 {
01924 case PREDLOCKTAG_RELATION:
01925
01926 return false;
01927
01928 case PREDLOCKTAG_PAGE:
01929
01930 SET_PREDICATELOCKTARGETTAG_RELATION(*parent,
01931 GET_PREDICATELOCKTARGETTAG_DB(*tag),
01932 GET_PREDICATELOCKTARGETTAG_RELATION(*tag));
01933
01934 return true;
01935
01936 case PREDLOCKTAG_TUPLE:
01937
01938 SET_PREDICATELOCKTARGETTAG_PAGE(*parent,
01939 GET_PREDICATELOCKTARGETTAG_DB(*tag),
01940 GET_PREDICATELOCKTARGETTAG_RELATION(*tag),
01941 GET_PREDICATELOCKTARGETTAG_PAGE(*tag));
01942 return true;
01943 }
01944
01945
01946 Assert(false);
01947 return false;
01948 }
01949
01950
01951
01952
01953
01954
01955
01956
01957 static bool
01958 CoarserLockCovers(const PREDICATELOCKTARGETTAG *newtargettag)
01959 {
01960 PREDICATELOCKTARGETTAG targettag,
01961 parenttag;
01962
01963 targettag = *newtargettag;
01964
01965
01966 while (GetParentPredicateLockTag(&targettag, &parenttag))
01967 {
01968 targettag = parenttag;
01969 if (PredicateLockExists(&targettag))
01970 return true;
01971 }
01972
01973
01974 return false;
01975 }
01976
01977
01978
01979
01980
01981
01982
01983
01984
01985
01986 static void
01987 RemoveScratchTarget(bool lockheld)
01988 {
01989 bool found;
01990
01991 Assert(LWLockHeldByMe(SerializablePredicateLockListLock));
01992
01993 if (!lockheld)
01994 LWLockAcquire(ScratchPartitionLock, LW_EXCLUSIVE);
01995 hash_search_with_hash_value(PredicateLockTargetHash,
01996 &ScratchTargetTag,
01997 ScratchTargetTagHash,
01998 HASH_REMOVE, &found);
01999 Assert(found);
02000 if (!lockheld)
02001 LWLockRelease(ScratchPartitionLock);
02002 }
02003
02004
02005
02006
02007 static void
02008 RestoreScratchTarget(bool lockheld)
02009 {
02010 bool found;
02011
02012 Assert(LWLockHeldByMe(SerializablePredicateLockListLock));
02013
02014 if (!lockheld)
02015 LWLockAcquire(ScratchPartitionLock, LW_EXCLUSIVE);
02016 hash_search_with_hash_value(PredicateLockTargetHash,
02017 &ScratchTargetTag,
02018 ScratchTargetTagHash,
02019 HASH_ENTER, &found);
02020 Assert(!found);
02021 if (!lockheld)
02022 LWLockRelease(ScratchPartitionLock);
02023 }
02024
02025
02026
02027
02028
02029 static void
02030 RemoveTargetIfNoLongerUsed(PREDICATELOCKTARGET *target, uint32 targettaghash)
02031 {
02032 PREDICATELOCKTARGET *rmtarget PG_USED_FOR_ASSERTS_ONLY;
02033
02034 Assert(LWLockHeldByMe(SerializablePredicateLockListLock));
02035
02036
02037 if (!SHMQueueEmpty(&target->predicateLocks))
02038 return;
02039
02040
02041 rmtarget = hash_search_with_hash_value(PredicateLockTargetHash,
02042 &target->tag,
02043 targettaghash,
02044 HASH_REMOVE, NULL);
02045 Assert(rmtarget == target);
02046 }
02047
02048
02049
02050
02051
02052
02053
02054
02055
02056
02057
02058 static void
02059 DeleteChildTargetLocks(const PREDICATELOCKTARGETTAG *newtargettag)
02060 {
02061 SERIALIZABLEXACT *sxact;
02062 PREDICATELOCK *predlock;
02063
02064 LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
02065 sxact = MySerializableXact;
02066 predlock = (PREDICATELOCK *)
02067 SHMQueueNext(&(sxact->predicateLocks),
02068 &(sxact->predicateLocks),
02069 offsetof(PREDICATELOCK, xactLink));
02070 while (predlock)
02071 {
02072 SHM_QUEUE *predlocksxactlink;
02073 PREDICATELOCK *nextpredlock;
02074 PREDICATELOCKTAG oldlocktag;
02075 PREDICATELOCKTARGET *oldtarget;
02076 PREDICATELOCKTARGETTAG oldtargettag;
02077
02078 predlocksxactlink = &(predlock->xactLink);
02079 nextpredlock = (PREDICATELOCK *)
02080 SHMQueueNext(&(sxact->predicateLocks),
02081 predlocksxactlink,
02082 offsetof(PREDICATELOCK, xactLink));
02083
02084 oldlocktag = predlock->tag;
02085 Assert(oldlocktag.myXact == sxact);
02086 oldtarget = oldlocktag.myTarget;
02087 oldtargettag = oldtarget->tag;
02088
02089 if (TargetTagIsCoveredBy(oldtargettag, *newtargettag))
02090 {
02091 uint32 oldtargettaghash;
02092 LWLockId partitionLock;
02093 PREDICATELOCK *rmpredlock PG_USED_FOR_ASSERTS_ONLY;
02094
02095 oldtargettaghash = PredicateLockTargetTagHashCode(&oldtargettag);
02096 partitionLock = PredicateLockHashPartitionLock(oldtargettaghash);
02097
02098 LWLockAcquire(partitionLock, LW_EXCLUSIVE);
02099
02100 SHMQueueDelete(predlocksxactlink);
02101 SHMQueueDelete(&(predlock->targetLink));
02102 rmpredlock = hash_search_with_hash_value
02103 (PredicateLockHash,
02104 &oldlocktag,
02105 PredicateLockHashCodeFromTargetHashCode(&oldlocktag,
02106 oldtargettaghash),
02107 HASH_REMOVE, NULL);
02108 Assert(rmpredlock == predlock);
02109
02110 RemoveTargetIfNoLongerUsed(oldtarget, oldtargettaghash);
02111
02112 LWLockRelease(partitionLock);
02113
02114 DecrementParentLocks(&oldtargettag);
02115 }
02116
02117 predlock = nextpredlock;
02118 }
02119 LWLockRelease(SerializablePredicateLockListLock);
02120 }
02121
02122
02123
02124
02125
02126
02127
02128
02129
02130
02131
02132
02133
02134
02135 static int
02136 PredicateLockPromotionThreshold(const PREDICATELOCKTARGETTAG *tag)
02137 {
02138 switch (GET_PREDICATELOCKTARGETTAG_TYPE(*tag))
02139 {
02140 case PREDLOCKTAG_RELATION:
02141 return max_predicate_locks_per_xact / 2;
02142
02143 case PREDLOCKTAG_PAGE:
02144 return 3;
02145
02146 case PREDLOCKTAG_TUPLE:
02147
02148
02149
02150
02151
02152 Assert(false);
02153 return 0;
02154 }
02155
02156
02157 Assert(false);
02158 return 0;
02159 }
02160
02161
02162
02163
02164
02165
02166
02167
02168
02169 static bool
02170 CheckAndPromotePredicateLockRequest(const PREDICATELOCKTARGETTAG *reqtag)
02171 {
02172 PREDICATELOCKTARGETTAG targettag,
02173 nexttag,
02174 promotiontag;
02175 LOCALPREDICATELOCK *parentlock;
02176 bool found,
02177 promote;
02178
02179 promote = false;
02180
02181 targettag = *reqtag;
02182
02183
02184 while (GetParentPredicateLockTag(&targettag, &nexttag))
02185 {
02186 targettag = nexttag;
02187 parentlock = (LOCALPREDICATELOCK *) hash_search(LocalPredicateLockHash,
02188 &targettag,
02189 HASH_ENTER,
02190 &found);
02191 if (!found)
02192 {
02193 parentlock->held = false;
02194 parentlock->childLocks = 1;
02195 }
02196 else
02197 parentlock->childLocks++;
02198
02199 if (parentlock->childLocks >=
02200 PredicateLockPromotionThreshold(&targettag))
02201 {
02202
02203
02204
02205
02206
02207
02208 promotiontag = targettag;
02209 promote = true;
02210 }
02211 }
02212
02213 if (promote)
02214 {
02215
02216 PredicateLockAcquire(&promotiontag);
02217 return true;
02218 }
02219 else
02220 return false;
02221 }
02222
02223
02224
02225
02226
02227
02228
02229
02230
02231
02232
02233
02234 static void
02235 DecrementParentLocks(const PREDICATELOCKTARGETTAG *targettag)
02236 {
02237 PREDICATELOCKTARGETTAG parenttag,
02238 nexttag;
02239
02240 parenttag = *targettag;
02241
02242 while (GetParentPredicateLockTag(&parenttag, &nexttag))
02243 {
02244 uint32 targettaghash;
02245 LOCALPREDICATELOCK *parentlock,
02246 *rmlock PG_USED_FOR_ASSERTS_ONLY;
02247
02248 parenttag = nexttag;
02249 targettaghash = PredicateLockTargetTagHashCode(&parenttag);
02250 parentlock = (LOCALPREDICATELOCK *)
02251 hash_search_with_hash_value(LocalPredicateLockHash,
02252 &parenttag, targettaghash,
02253 HASH_FIND, NULL);
02254
02255
02256
02257
02258
02259
02260 if (parentlock == NULL)
02261 continue;
02262
02263 parentlock->childLocks--;
02264
02265
02266
02267
02268
02269
02270 if (parentlock->childLocks < 0)
02271 {
02272 Assert(parentlock->held);
02273 parentlock->childLocks = 0;
02274 }
02275
02276 if ((parentlock->childLocks == 0) && (!parentlock->held))
02277 {
02278 rmlock = (LOCALPREDICATELOCK *)
02279 hash_search_with_hash_value(LocalPredicateLockHash,
02280 &parenttag, targettaghash,
02281 HASH_REMOVE, NULL);
02282 Assert(rmlock == parentlock);
02283 }
02284 }
02285 }
02286
02287
02288
02289
02290
02291
02292
02293
02294
02295
02296 static void
02297 CreatePredicateLock(const PREDICATELOCKTARGETTAG *targettag,
02298 uint32 targettaghash,
02299 SERIALIZABLEXACT *sxact)
02300 {
02301 PREDICATELOCKTARGET *target;
02302 PREDICATELOCKTAG locktag;
02303 PREDICATELOCK *lock;
02304 LWLockId partitionLock;
02305 bool found;
02306
02307 partitionLock = PredicateLockHashPartitionLock(targettaghash);
02308
02309 LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
02310 LWLockAcquire(partitionLock, LW_EXCLUSIVE);
02311
02312
02313 target = (PREDICATELOCKTARGET *)
02314 hash_search_with_hash_value(PredicateLockTargetHash,
02315 targettag, targettaghash,
02316 HASH_ENTER_NULL, &found);
02317 if (!target)
02318 ereport(ERROR,
02319 (errcode(ERRCODE_OUT_OF_MEMORY),
02320 errmsg("out of shared memory"),
02321 errhint("You might need to increase max_pred_locks_per_transaction.")));
02322 if (!found)
02323 SHMQueueInit(&(target->predicateLocks));
02324
02325
02326 locktag.myTarget = target;
02327 locktag.myXact = sxact;
02328 lock = (PREDICATELOCK *)
02329 hash_search_with_hash_value(PredicateLockHash, &locktag,
02330 PredicateLockHashCodeFromTargetHashCode(&locktag, targettaghash),
02331 HASH_ENTER_NULL, &found);
02332 if (!lock)
02333 ereport(ERROR,
02334 (errcode(ERRCODE_OUT_OF_MEMORY),
02335 errmsg("out of shared memory"),
02336 errhint("You might need to increase max_pred_locks_per_transaction.")));
02337
02338 if (!found)
02339 {
02340 SHMQueueInsertBefore(&(target->predicateLocks), &(lock->targetLink));
02341 SHMQueueInsertBefore(&(sxact->predicateLocks),
02342 &(lock->xactLink));
02343 lock->commitSeqNo = InvalidSerCommitSeqNo;
02344 }
02345
02346 LWLockRelease(partitionLock);
02347 LWLockRelease(SerializablePredicateLockListLock);
02348 }
02349
02350
02351
02352
02353
02354
02355
02356
02357 static void
02358 PredicateLockAcquire(const PREDICATELOCKTARGETTAG *targettag)
02359 {
02360 uint32 targettaghash;
02361 bool found;
02362 LOCALPREDICATELOCK *locallock;
02363
02364
02365 if (PredicateLockExists(targettag))
02366 return;
02367
02368 if (CoarserLockCovers(targettag))
02369 return;
02370
02371
02372 targettaghash = PredicateLockTargetTagHashCode(targettag);
02373
02374
02375 locallock = (LOCALPREDICATELOCK *)
02376 hash_search_with_hash_value(LocalPredicateLockHash,
02377 targettag, targettaghash,
02378 HASH_ENTER, &found);
02379 locallock->held = true;
02380 if (!found)
02381 locallock->childLocks = 0;
02382
02383
02384 CreatePredicateLock(targettag, targettaghash, MySerializableXact);
02385
02386
02387
02388
02389
02390
02391 if (CheckAndPromotePredicateLockRequest(targettag))
02392 {
02393
02394
02395
02396
02397
02398 }
02399 else
02400 {
02401
02402 if (GET_PREDICATELOCKTARGETTAG_TYPE(*targettag) != PREDLOCKTAG_TUPLE)
02403 DeleteChildTargetLocks(targettag);
02404 }
02405 }
02406
02407
02408
02409
02410
02411
02412
02413
02414
02415
02416 void
02417 PredicateLockRelation(Relation relation, Snapshot snapshot)
02418 {
02419 PREDICATELOCKTARGETTAG tag;
02420
02421 if (!SerializationNeededForRead(relation, snapshot))
02422 return;
02423
02424 SET_PREDICATELOCKTARGETTAG_RELATION(tag,
02425 relation->rd_node.dbNode,
02426 relation->rd_id);
02427 PredicateLockAcquire(&tag);
02428 }
02429
02430
02431
02432
02433
02434
02435
02436
02437
02438
02439 void
02440 PredicateLockPage(Relation relation, BlockNumber blkno, Snapshot snapshot)
02441 {
02442 PREDICATELOCKTARGETTAG tag;
02443
02444 if (!SerializationNeededForRead(relation, snapshot))
02445 return;
02446
02447 SET_PREDICATELOCKTARGETTAG_PAGE(tag,
02448 relation->rd_node.dbNode,
02449 relation->rd_id,
02450 blkno);
02451 PredicateLockAcquire(&tag);
02452 }
02453
02454
02455
02456
02457
02458
02459
02460
02461 void
02462 PredicateLockTuple(Relation relation, HeapTuple tuple, Snapshot snapshot)
02463 {
02464 PREDICATELOCKTARGETTAG tag;
02465 ItemPointer tid;
02466 TransactionId targetxmin;
02467
02468 if (!SerializationNeededForRead(relation, snapshot))
02469 return;
02470
02471
02472
02473
02474 if (relation->rd_index == NULL)
02475 {
02476 TransactionId myxid;
02477
02478 targetxmin = HeapTupleHeaderGetXmin(tuple->t_data);
02479
02480 myxid = GetTopTransactionIdIfAny();
02481 if (TransactionIdIsValid(myxid))
02482 {
02483 if (TransactionIdFollowsOrEquals(targetxmin, TransactionXmin))
02484 {
02485 TransactionId xid = SubTransGetTopmostTransaction(targetxmin);
02486
02487 if (TransactionIdEquals(xid, myxid))
02488 {
02489
02490 return;
02491 }
02492 }
02493 }
02494 }
02495 else
02496 targetxmin = InvalidTransactionId;
02497
02498
02499
02500
02501
02502
02503
02504 SET_PREDICATELOCKTARGETTAG_RELATION(tag,
02505 relation->rd_node.dbNode,
02506 relation->rd_id);
02507 if (PredicateLockExists(&tag))
02508 return;
02509
02510 tid = &(tuple->t_self);
02511 SET_PREDICATELOCKTARGETTAG_TUPLE(tag,
02512 relation->rd_node.dbNode,
02513 relation->rd_id,
02514 ItemPointerGetBlockNumber(tid),
02515 ItemPointerGetOffsetNumber(tid),
02516 targetxmin);
02517 PredicateLockAcquire(&tag);
02518 }
02519
02520
02521
02522
02523
02524
02525
02526
02527
02528
02529 static void
02530 DeleteLockTarget(PREDICATELOCKTARGET *target, uint32 targettaghash)
02531 {
02532 PREDICATELOCK *predlock;
02533 SHM_QUEUE *predlocktargetlink;
02534 PREDICATELOCK *nextpredlock;
02535 bool found;
02536
02537 Assert(LWLockHeldByMe(SerializablePredicateLockListLock));
02538 Assert(LWLockHeldByMe(PredicateLockHashPartitionLock(targettaghash)));
02539
02540 predlock = (PREDICATELOCK *)
02541 SHMQueueNext(&(target->predicateLocks),
02542 &(target->predicateLocks),
02543 offsetof(PREDICATELOCK, targetLink));
02544 LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
02545 while (predlock)
02546 {
02547 predlocktargetlink = &(predlock->targetLink);
02548 nextpredlock = (PREDICATELOCK *)
02549 SHMQueueNext(&(target->predicateLocks),
02550 predlocktargetlink,
02551 offsetof(PREDICATELOCK, targetLink));
02552
02553 SHMQueueDelete(&(predlock->xactLink));
02554 SHMQueueDelete(&(predlock->targetLink));
02555
02556 hash_search_with_hash_value
02557 (PredicateLockHash,
02558 &predlock->tag,
02559 PredicateLockHashCodeFromTargetHashCode(&predlock->tag,
02560 targettaghash),
02561 HASH_REMOVE, &found);
02562 Assert(found);
02563
02564 predlock = nextpredlock;
02565 }
02566 LWLockRelease(SerializableXactHashLock);
02567
02568
02569 RemoveTargetIfNoLongerUsed(target, targettaghash);
02570 }
02571
02572
02573
02574
02575
02576
02577
02578
02579
02580
02581
02582
02583
02584
02585
02586
02587
02588
02589
02590
02591
02592
02593
02594
02595
02596
02597
02598
02599 static bool
02600 TransferPredicateLocksToNewTarget(PREDICATELOCKTARGETTAG oldtargettag,
02601 PREDICATELOCKTARGETTAG newtargettag,
02602 bool removeOld)
02603 {
02604 uint32 oldtargettaghash;
02605 LWLockId oldpartitionLock;
02606 PREDICATELOCKTARGET *oldtarget;
02607 uint32 newtargettaghash;
02608 LWLockId newpartitionLock;
02609 bool found;
02610 bool outOfShmem = false;
02611
02612 Assert(LWLockHeldByMe(SerializablePredicateLockListLock));
02613
02614 oldtargettaghash = PredicateLockTargetTagHashCode(&oldtargettag);
02615 newtargettaghash = PredicateLockTargetTagHashCode(&newtargettag);
02616 oldpartitionLock = PredicateLockHashPartitionLock(oldtargettaghash);
02617 newpartitionLock = PredicateLockHashPartitionLock(newtargettaghash);
02618
02619 if (removeOld)
02620 {
02621
02622
02623
02624
02625 RemoveScratchTarget(false);
02626 }
02627
02628
02629
02630
02631
02632
02633 if (oldpartitionLock < newpartitionLock)
02634 {
02635 LWLockAcquire(oldpartitionLock,
02636 (removeOld ? LW_EXCLUSIVE : LW_SHARED));
02637 LWLockAcquire(newpartitionLock, LW_EXCLUSIVE);
02638 }
02639 else if (oldpartitionLock > newpartitionLock)
02640 {
02641 LWLockAcquire(newpartitionLock, LW_EXCLUSIVE);
02642 LWLockAcquire(oldpartitionLock,
02643 (removeOld ? LW_EXCLUSIVE : LW_SHARED));
02644 }
02645 else
02646 LWLockAcquire(newpartitionLock, LW_EXCLUSIVE);
02647
02648
02649
02650
02651
02652
02653
02654 oldtarget = hash_search_with_hash_value(PredicateLockTargetHash,
02655 &oldtargettag,
02656 oldtargettaghash,
02657 HASH_FIND, NULL);
02658
02659 if (oldtarget)
02660 {
02661 PREDICATELOCKTARGET *newtarget;
02662 PREDICATELOCK *oldpredlock;
02663 PREDICATELOCKTAG newpredlocktag;
02664
02665 newtarget = hash_search_with_hash_value(PredicateLockTargetHash,
02666 &newtargettag,
02667 newtargettaghash,
02668 HASH_ENTER_NULL, &found);
02669
02670 if (!newtarget)
02671 {
02672
02673 outOfShmem = true;
02674 goto exit;
02675 }
02676
02677
02678 if (!found)
02679 SHMQueueInit(&(newtarget->predicateLocks));
02680
02681 newpredlocktag.myTarget = newtarget;
02682
02683
02684
02685
02686
02687 oldpredlock = (PREDICATELOCK *)
02688 SHMQueueNext(&(oldtarget->predicateLocks),
02689 &(oldtarget->predicateLocks),
02690 offsetof(PREDICATELOCK, targetLink));
02691 LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
02692 while (oldpredlock)
02693 {
02694 SHM_QUEUE *predlocktargetlink;
02695 PREDICATELOCK *nextpredlock;
02696 PREDICATELOCK *newpredlock;
02697 SerCommitSeqNo oldCommitSeqNo = oldpredlock->commitSeqNo;
02698
02699 predlocktargetlink = &(oldpredlock->targetLink);
02700 nextpredlock = (PREDICATELOCK *)
02701 SHMQueueNext(&(oldtarget->predicateLocks),
02702 predlocktargetlink,
02703 offsetof(PREDICATELOCK, targetLink));
02704 newpredlocktag.myXact = oldpredlock->tag.myXact;
02705
02706 if (removeOld)
02707 {
02708 SHMQueueDelete(&(oldpredlock->xactLink));
02709 SHMQueueDelete(&(oldpredlock->targetLink));
02710
02711 hash_search_with_hash_value
02712 (PredicateLockHash,
02713 &oldpredlock->tag,
02714 PredicateLockHashCodeFromTargetHashCode(&oldpredlock->tag,
02715 oldtargettaghash),
02716 HASH_REMOVE, &found);
02717 Assert(found);
02718 }
02719
02720 newpredlock = (PREDICATELOCK *)
02721 hash_search_with_hash_value(PredicateLockHash,
02722 &newpredlocktag,
02723 PredicateLockHashCodeFromTargetHashCode(&newpredlocktag,
02724 newtargettaghash),
02725 HASH_ENTER_NULL,
02726 &found);
02727 if (!newpredlock)
02728 {
02729
02730 LWLockRelease(SerializableXactHashLock);
02731 DeleteLockTarget(newtarget, newtargettaghash);
02732 outOfShmem = true;
02733 goto exit;
02734 }
02735 if (!found)
02736 {
02737 SHMQueueInsertBefore(&(newtarget->predicateLocks),
02738 &(newpredlock->targetLink));
02739 SHMQueueInsertBefore(&(newpredlocktag.myXact->predicateLocks),
02740 &(newpredlock->xactLink));
02741 newpredlock->commitSeqNo = oldCommitSeqNo;
02742 }
02743 else
02744 {
02745 if (newpredlock->commitSeqNo < oldCommitSeqNo)
02746 newpredlock->commitSeqNo = oldCommitSeqNo;
02747 }
02748
02749 Assert(newpredlock->commitSeqNo != 0);
02750 Assert((newpredlock->commitSeqNo == InvalidSerCommitSeqNo)
02751 || (newpredlock->tag.myXact == OldCommittedSxact));
02752
02753 oldpredlock = nextpredlock;
02754 }
02755 LWLockRelease(SerializableXactHashLock);
02756
02757 if (removeOld)
02758 {
02759 Assert(SHMQueueEmpty(&oldtarget->predicateLocks));
02760 RemoveTargetIfNoLongerUsed(oldtarget, oldtargettaghash);
02761 }
02762 }
02763
02764
02765 exit:
02766
02767 if (oldpartitionLock < newpartitionLock)
02768 {
02769 LWLockRelease(newpartitionLock);
02770 LWLockRelease(oldpartitionLock);
02771 }
02772 else if (oldpartitionLock > newpartitionLock)
02773 {
02774 LWLockRelease(oldpartitionLock);
02775 LWLockRelease(newpartitionLock);
02776 }
02777 else
02778 LWLockRelease(newpartitionLock);
02779
02780 if (removeOld)
02781 {
02782
02783 Assert(!outOfShmem);
02784
02785
02786 RestoreScratchTarget(false);
02787 }
02788
02789 return !outOfShmem;
02790 }
02791
02792
02793
02794
02795
02796
02797
02798
02799
02800
02801
02802
02803
02804
02805
02806
02807
02808
02809
02810
02811
02812
02813
02814
02815 static void
02816 DropAllPredicateLocksFromTable(Relation relation, bool transfer)
02817 {
02818 HASH_SEQ_STATUS seqstat;
02819 PREDICATELOCKTARGET *oldtarget;
02820 PREDICATELOCKTARGET *heaptarget;
02821 Oid dbId;
02822 Oid relId;
02823 Oid heapId;
02824 int i;
02825 bool isIndex;
02826 bool found;
02827 uint32 heaptargettaghash;
02828
02829
02830
02831
02832
02833
02834
02835 if (!TransactionIdIsValid(PredXact->SxactGlobalXmin))
02836 return;
02837
02838 if (!PredicateLockingNeededForRelation(relation))
02839 return;
02840
02841 dbId = relation->rd_node.dbNode;
02842 relId = relation->rd_id;
02843 if (relation->rd_index == NULL)
02844 {
02845 isIndex = false;
02846 heapId = relId;
02847 }
02848 else
02849 {
02850 isIndex = true;
02851 heapId = relation->rd_index->indrelid;
02852 }
02853 Assert(heapId != InvalidOid);
02854 Assert(transfer || !isIndex);
02855
02856
02857
02858 heaptargettaghash = 0;
02859 heaptarget = NULL;
02860
02861
02862 LWLockAcquire(SerializablePredicateLockListLock, LW_EXCLUSIVE);
02863 for (i = 0; i < NUM_PREDICATELOCK_PARTITIONS; i++)
02864 LWLockAcquire(FirstPredicateLockMgrLock + i, LW_EXCLUSIVE);
02865 LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
02866
02867
02868
02869
02870
02871 if (transfer)
02872 RemoveScratchTarget(true);
02873
02874
02875 hash_seq_init(&seqstat, PredicateLockTargetHash);
02876
02877 while ((oldtarget = (PREDICATELOCKTARGET *) hash_seq_search(&seqstat)))
02878 {
02879 PREDICATELOCK *oldpredlock;
02880
02881
02882
02883
02884 if (GET_PREDICATELOCKTARGETTAG_RELATION(oldtarget->tag) != relId)
02885 continue;
02886 if (GET_PREDICATELOCKTARGETTAG_DB(oldtarget->tag) != dbId)
02887 continue;
02888 if (transfer && !isIndex
02889 && GET_PREDICATELOCKTARGETTAG_TYPE(oldtarget->tag) == PREDLOCKTAG_RELATION)
02890 continue;
02891
02892
02893
02894
02895
02896
02897
02898
02899
02900
02901
02902
02903 if (transfer && heaptarget == NULL)
02904 {
02905 PREDICATELOCKTARGETTAG heaptargettag;
02906
02907 SET_PREDICATELOCKTARGETTAG_RELATION(heaptargettag, dbId, heapId);
02908 heaptargettaghash = PredicateLockTargetTagHashCode(&heaptargettag);
02909 heaptarget = hash_search_with_hash_value(PredicateLockTargetHash,
02910 &heaptargettag,
02911 heaptargettaghash,
02912 HASH_ENTER, &found);
02913 if (!found)
02914 SHMQueueInit(&heaptarget->predicateLocks);
02915 }
02916
02917
02918
02919
02920
02921 oldpredlock = (PREDICATELOCK *)
02922 SHMQueueNext(&(oldtarget->predicateLocks),
02923 &(oldtarget->predicateLocks),
02924 offsetof(PREDICATELOCK, targetLink));
02925 while (oldpredlock)
02926 {
02927 PREDICATELOCK *nextpredlock;
02928 PREDICATELOCK *newpredlock;
02929 SerCommitSeqNo oldCommitSeqNo;
02930 SERIALIZABLEXACT *oldXact;
02931
02932 nextpredlock = (PREDICATELOCK *)
02933 SHMQueueNext(&(oldtarget->predicateLocks),
02934 &(oldpredlock->targetLink),
02935 offsetof(PREDICATELOCK, targetLink));
02936
02937
02938
02939
02940
02941 oldCommitSeqNo = oldpredlock->commitSeqNo;
02942 oldXact = oldpredlock->tag.myXact;
02943
02944 SHMQueueDelete(&(oldpredlock->xactLink));
02945
02946
02947
02948
02949
02950 hash_search(PredicateLockHash,
02951 &oldpredlock->tag,
02952 HASH_REMOVE, &found);
02953 Assert(found);
02954
02955 if (transfer)
02956 {
02957 PREDICATELOCKTAG newpredlocktag;
02958
02959 newpredlocktag.myTarget = heaptarget;
02960 newpredlocktag.myXact = oldXact;
02961 newpredlock = (PREDICATELOCK *)
02962 hash_search_with_hash_value(PredicateLockHash,
02963 &newpredlocktag,
02964 PredicateLockHashCodeFromTargetHashCode(&newpredlocktag,
02965 heaptargettaghash),
02966 HASH_ENTER,
02967 &found);
02968 if (!found)
02969 {
02970 SHMQueueInsertBefore(&(heaptarget->predicateLocks),
02971 &(newpredlock->targetLink));
02972 SHMQueueInsertBefore(&(newpredlocktag.myXact->predicateLocks),
02973 &(newpredlock->xactLink));
02974 newpredlock->commitSeqNo = oldCommitSeqNo;
02975 }
02976 else
02977 {
02978 if (newpredlock->commitSeqNo < oldCommitSeqNo)
02979 newpredlock->commitSeqNo = oldCommitSeqNo;
02980 }
02981
02982 Assert(newpredlock->commitSeqNo != 0);
02983 Assert((newpredlock->commitSeqNo == InvalidSerCommitSeqNo)
02984 || (newpredlock->tag.myXact == OldCommittedSxact));
02985 }
02986
02987 oldpredlock = nextpredlock;
02988 }
02989
02990 hash_search(PredicateLockTargetHash, &oldtarget->tag, HASH_REMOVE,
02991 &found);
02992 Assert(found);
02993 }
02994
02995
02996 if (transfer)
02997 RestoreScratchTarget(true);
02998
02999
03000 LWLockRelease(SerializableXactHashLock);
03001 for (i = NUM_PREDICATELOCK_PARTITIONS - 1; i >= 0; i--)
03002 LWLockRelease(FirstPredicateLockMgrLock + i);
03003 LWLockRelease(SerializablePredicateLockListLock);
03004 }
03005
03006
03007
03008
03009
03010
03011 void
03012 TransferPredicateLocksToHeapRelation(Relation relation)
03013 {
03014 DropAllPredicateLocksFromTable(relation, true);
03015 }
03016
03017
03018
03019
03020
03021
03022
03023
03024
03025
03026
03027
03028
03029
03030
03031
03032 void
03033 PredicateLockPageSplit(Relation relation, BlockNumber oldblkno,
03034 BlockNumber newblkno)
03035 {
03036 PREDICATELOCKTARGETTAG oldtargettag;
03037 PREDICATELOCKTARGETTAG newtargettag;
03038 bool success;
03039
03040
03041
03042
03043
03044
03045
03046
03047
03048
03049
03050 if (!TransactionIdIsValid(PredXact->SxactGlobalXmin))
03051 return;
03052
03053 if (!PredicateLockingNeededForRelation(relation))
03054 return;
03055
03056 Assert(oldblkno != newblkno);
03057 Assert(BlockNumberIsValid(oldblkno));
03058 Assert(BlockNumberIsValid(newblkno));
03059
03060 SET_PREDICATELOCKTARGETTAG_PAGE(oldtargettag,
03061 relation->rd_node.dbNode,
03062 relation->rd_id,
03063 oldblkno);
03064 SET_PREDICATELOCKTARGETTAG_PAGE(newtargettag,
03065 relation->rd_node.dbNode,
03066 relation->rd_id,
03067 newblkno);
03068
03069 LWLockAcquire(SerializablePredicateLockListLock, LW_EXCLUSIVE);
03070
03071
03072
03073
03074
03075 success = TransferPredicateLocksToNewTarget(oldtargettag,
03076 newtargettag,
03077 false);
03078
03079 if (!success)
03080 {
03081
03082
03083
03084
03085
03086
03087 success = GetParentPredicateLockTag(&oldtargettag,
03088 &newtargettag);
03089 Assert(success);
03090
03091
03092
03093
03094
03095
03096
03097
03098
03099 success = TransferPredicateLocksToNewTarget(oldtargettag,
03100 newtargettag,
03101 true);
03102 Assert(success);
03103 }
03104
03105 LWLockRelease(SerializablePredicateLockListLock);
03106 }
03107
03108
03109
03110
03111
03112
03113
03114
03115
03116
03117 void
03118 PredicateLockPageCombine(Relation relation, BlockNumber oldblkno,
03119 BlockNumber newblkno)
03120 {
03121
03122
03123
03124
03125
03126
03127
03128
03129
03130
03131
03132 PredicateLockPageSplit(relation, oldblkno, newblkno);
03133 }
03134
03135
03136
03137
03138
03139 static void
03140 SetNewSxactGlobalXmin(void)
03141 {
03142 SERIALIZABLEXACT *sxact;
03143
03144 Assert(LWLockHeldByMe(SerializableXactHashLock));
03145
03146 PredXact->SxactGlobalXmin = InvalidTransactionId;
03147 PredXact->SxactGlobalXminCount = 0;
03148
03149 for (sxact = FirstPredXact(); sxact != NULL; sxact = NextPredXact(sxact))
03150 {
03151 if (!SxactIsRolledBack(sxact)
03152 && !SxactIsCommitted(sxact)
03153 && sxact != OldCommittedSxact)
03154 {
03155 Assert(sxact->xmin != InvalidTransactionId);
03156 if (!TransactionIdIsValid(PredXact->SxactGlobalXmin)
03157 || TransactionIdPrecedes(sxact->xmin,
03158 PredXact->SxactGlobalXmin))
03159 {
03160 PredXact->SxactGlobalXmin = sxact->xmin;
03161 PredXact->SxactGlobalXminCount = 1;
03162 }
03163 else if (TransactionIdEquals(sxact->xmin,
03164 PredXact->SxactGlobalXmin))
03165 PredXact->SxactGlobalXminCount++;
03166 }
03167 }
03168
03169 OldSerXidSetActiveSerXmin(PredXact->SxactGlobalXmin);
03170 }
03171
03172
03173
03174
03175
03176
03177
03178
03179
03180
03181
03182
03183
03184
03185
03186
03187
03188
03189 void
03190 ReleasePredicateLocks(bool isCommit)
03191 {
03192 bool needToClear;
03193 RWConflict conflict,
03194 nextConflict,
03195 possibleUnsafeConflict;
03196 SERIALIZABLEXACT *roXact;
03197
03198
03199
03200
03201
03202
03203
03204
03205
03206
03207 bool topLevelIsDeclaredReadOnly;
03208
03209 if (MySerializableXact == InvalidSerializableXact)
03210 {
03211 Assert(LocalPredicateLockHash == NULL);
03212 return;
03213 }
03214
03215 Assert(!isCommit || SxactIsPrepared(MySerializableXact));
03216 Assert(!isCommit || !SxactIsDoomed(MySerializableXact));
03217 Assert(!SxactIsCommitted(MySerializableXact));
03218 Assert(!SxactIsRolledBack(MySerializableXact));
03219
03220
03221 if (MySerializableXact->pid != 0)
03222 Assert(IsolationIsSerializable());
03223
03224
03225 Assert(!SxactIsOnFinishedList(MySerializableXact));
03226
03227 topLevelIsDeclaredReadOnly = SxactIsReadOnly(MySerializableXact);
03228
03229 LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
03230
03231
03232
03233
03234
03235
03236
03237
03238
03239
03240
03241
03242 MySerializableXact->finishedBefore = ShmemVariableCache->nextXid;
03243
03244
03245
03246
03247
03248 if (isCommit)
03249 {
03250 MySerializableXact->flags |= SXACT_FLAG_COMMITTED;
03251 MySerializableXact->commitSeqNo = ++(PredXact->LastSxactCommitSeqNo);
03252
03253 if (!MyXactDidWrite)
03254 MySerializableXact->flags |= SXACT_FLAG_READ_ONLY;
03255 }
03256 else
03257 {
03258
03259
03260
03261
03262
03263
03264
03265
03266
03267
03268
03269
03270 MySerializableXact->flags |= SXACT_FLAG_DOOMED;
03271 MySerializableXact->flags |= SXACT_FLAG_ROLLED_BACK;
03272
03273
03274
03275
03276
03277
03278
03279 MySerializableXact->flags &= ~SXACT_FLAG_PREPARED;
03280 }
03281
03282 if (!topLevelIsDeclaredReadOnly)
03283 {
03284 Assert(PredXact->WritableSxactCount > 0);
03285 if (--(PredXact->WritableSxactCount) == 0)
03286 {
03287
03288
03289
03290
03291
03292
03293
03294
03295
03296 PredXact->CanPartialClearThrough = PredXact->LastSxactCommitSeqNo;
03297 }
03298 }
03299 else
03300 {
03301
03302
03303
03304
03305
03306 possibleUnsafeConflict = (RWConflict)
03307 SHMQueueNext(&MySerializableXact->possibleUnsafeConflicts,
03308 &MySerializableXact->possibleUnsafeConflicts,
03309 offsetof(RWConflictData, inLink));
03310 while (possibleUnsafeConflict)
03311 {
03312 nextConflict = (RWConflict)
03313 SHMQueueNext(&MySerializableXact->possibleUnsafeConflicts,
03314 &possibleUnsafeConflict->inLink,
03315 offsetof(RWConflictData, inLink));
03316
03317 Assert(!SxactIsReadOnly(possibleUnsafeConflict->sxactOut));
03318 Assert(MySerializableXact == possibleUnsafeConflict->sxactIn);
03319
03320 ReleaseRWConflict(possibleUnsafeConflict);
03321
03322 possibleUnsafeConflict = nextConflict;
03323 }
03324 }
03325
03326
03327 if (isCommit
03328 && !SxactIsReadOnly(MySerializableXact)
03329 && SxactHasSummaryConflictOut(MySerializableXact))
03330 {
03331
03332
03333
03334
03335 MySerializableXact->SeqNo.earliestOutConflictCommit =
03336 FirstNormalSerCommitSeqNo;
03337 MySerializableXact->flags |= SXACT_FLAG_CONFLICT_OUT;
03338 }
03339
03340
03341
03342
03343
03344
03345 conflict = (RWConflict)
03346 SHMQueueNext(&MySerializableXact->outConflicts,
03347 &MySerializableXact->outConflicts,
03348 offsetof(RWConflictData, outLink));
03349 while (conflict)
03350 {
03351 nextConflict = (RWConflict)
03352 SHMQueueNext(&MySerializableXact->outConflicts,
03353 &conflict->outLink,
03354 offsetof(RWConflictData, outLink));
03355
03356 if (isCommit
03357 && !SxactIsReadOnly(MySerializableXact)
03358 && SxactIsCommitted(conflict->sxactIn))
03359 {
03360 if ((MySerializableXact->flags & SXACT_FLAG_CONFLICT_OUT) == 0
03361 || conflict->sxactIn->prepareSeqNo < MySerializableXact->SeqNo.earliestOutConflictCommit)
03362 MySerializableXact->SeqNo.earliestOutConflictCommit = conflict->sxactIn->prepareSeqNo;
03363 MySerializableXact->flags |= SXACT_FLAG_CONFLICT_OUT;
03364 }
03365
03366 if (!isCommit
03367 || SxactIsCommitted(conflict->sxactIn)
03368 || (conflict->sxactIn->SeqNo.lastCommitBeforeSnapshot >= PredXact->LastSxactCommitSeqNo))
03369 ReleaseRWConflict(conflict);
03370
03371 conflict = nextConflict;
03372 }
03373
03374
03375
03376
03377
03378 conflict = (RWConflict)
03379 SHMQueueNext(&MySerializableXact->inConflicts,
03380 &MySerializableXact->inConflicts,
03381 offsetof(RWConflictData, inLink));
03382 while (conflict)
03383 {
03384 nextConflict = (RWConflict)
03385 SHMQueueNext(&MySerializableXact->inConflicts,
03386 &conflict->inLink,
03387 offsetof(RWConflictData, inLink));
03388
03389 if (!isCommit
03390 || SxactIsCommitted(conflict->sxactOut)
03391 || SxactIsReadOnly(conflict->sxactOut))
03392 ReleaseRWConflict(conflict);
03393
03394 conflict = nextConflict;
03395 }
03396
03397 if (!topLevelIsDeclaredReadOnly)
03398 {
03399
03400
03401
03402
03403
03404
03405 possibleUnsafeConflict = (RWConflict)
03406 SHMQueueNext(&MySerializableXact->possibleUnsafeConflicts,
03407 &MySerializableXact->possibleUnsafeConflicts,
03408 offsetof(RWConflictData, outLink));
03409 while (possibleUnsafeConflict)
03410 {
03411 nextConflict = (RWConflict)
03412 SHMQueueNext(&MySerializableXact->possibleUnsafeConflicts,
03413 &possibleUnsafeConflict->outLink,
03414 offsetof(RWConflictData, outLink));
03415
03416 roXact = possibleUnsafeConflict->sxactIn;
03417 Assert(MySerializableXact == possibleUnsafeConflict->sxactOut);
03418 Assert(SxactIsReadOnly(roXact));
03419
03420
03421 if (isCommit
03422 && MyXactDidWrite
03423 && SxactHasConflictOut(MySerializableXact)
03424 && (MySerializableXact->SeqNo.earliestOutConflictCommit
03425 <= roXact->SeqNo.lastCommitBeforeSnapshot))
03426 {
03427
03428
03429
03430
03431 FlagSxactUnsafe(roXact);
03432 }
03433 else
03434 {
03435 ReleaseRWConflict(possibleUnsafeConflict);
03436
03437
03438
03439
03440
03441
03442 if (SHMQueueEmpty(&roXact->possibleUnsafeConflicts))
03443 roXact->flags |= SXACT_FLAG_RO_SAFE;
03444 }
03445
03446
03447
03448
03449
03450 if (SxactIsDeferrableWaiting(roXact) &&
03451 (SxactIsROUnsafe(roXact) || SxactIsROSafe(roXact)))
03452 ProcSendSignal(roXact->pid);
03453
03454 possibleUnsafeConflict = nextConflict;
03455 }
03456 }
03457
03458
03459
03460
03461
03462
03463
03464
03465 needToClear = false;
03466 if (TransactionIdEquals(MySerializableXact->xmin, PredXact->SxactGlobalXmin))
03467 {
03468 Assert(PredXact->SxactGlobalXminCount > 0);
03469 if (--(PredXact->SxactGlobalXminCount) == 0)
03470 {
03471 SetNewSxactGlobalXmin();
03472 needToClear = true;
03473 }
03474 }
03475
03476 LWLockRelease(SerializableXactHashLock);
03477
03478 LWLockAcquire(SerializableFinishedListLock, LW_EXCLUSIVE);
03479
03480
03481 if (isCommit)
03482 SHMQueueInsertBefore(FinishedSerializableTransactions,
03483 &MySerializableXact->finishedLink);
03484
03485 if (!isCommit)
03486 ReleaseOneSerializableXact(MySerializableXact, false, false);
03487
03488 LWLockRelease(SerializableFinishedListLock);
03489
03490 if (needToClear)
03491 ClearOldPredicateLocks();
03492
03493 MySerializableXact = InvalidSerializableXact;
03494 MyXactDidWrite = false;
03495
03496
03497 if (LocalPredicateLockHash != NULL)
03498 {
03499 hash_destroy(LocalPredicateLockHash);
03500 LocalPredicateLockHash = NULL;
03501 }
03502 }
03503
03504
03505
03506
03507
03508 static void
03509 ClearOldPredicateLocks(void)
03510 {
03511 SERIALIZABLEXACT *finishedSxact;
03512 PREDICATELOCK *predlock;
03513
03514
03515
03516
03517
03518 LWLockAcquire(SerializableFinishedListLock, LW_EXCLUSIVE);
03519 finishedSxact = (SERIALIZABLEXACT *)
03520 SHMQueueNext(FinishedSerializableTransactions,
03521 FinishedSerializableTransactions,
03522 offsetof(SERIALIZABLEXACT, finishedLink));
03523 LWLockAcquire(SerializableXactHashLock, LW_SHARED);
03524 while (finishedSxact)
03525 {
03526 SERIALIZABLEXACT *nextSxact;
03527
03528 nextSxact = (SERIALIZABLEXACT *)
03529 SHMQueueNext(FinishedSerializableTransactions,
03530 &(finishedSxact->finishedLink),
03531 offsetof(SERIALIZABLEXACT, finishedLink));
03532 if (!TransactionIdIsValid(PredXact->SxactGlobalXmin)
03533 || TransactionIdPrecedesOrEquals(finishedSxact->finishedBefore,
03534 PredXact->SxactGlobalXmin))
03535 {
03536
03537
03538
03539
03540 LWLockRelease(SerializableXactHashLock);
03541 SHMQueueDelete(&(finishedSxact->finishedLink));
03542 ReleaseOneSerializableXact(finishedSxact, false, false);
03543 LWLockAcquire(SerializableXactHashLock, LW_SHARED);
03544 }
03545 else if (finishedSxact->commitSeqNo > PredXact->HavePartialClearedThrough
03546 && finishedSxact->commitSeqNo <= PredXact->CanPartialClearThrough)
03547 {
03548
03549
03550
03551
03552
03553 LWLockRelease(SerializableXactHashLock);
03554
03555 if (SxactIsReadOnly(finishedSxact))
03556 {
03557
03558 SHMQueueDelete(&(finishedSxact->finishedLink));
03559 ReleaseOneSerializableXact(finishedSxact, false, false);
03560 }
03561 else
03562 {
03563
03564
03565
03566
03567
03568 ReleaseOneSerializableXact(finishedSxact, true, false);
03569 }
03570
03571 PredXact->HavePartialClearedThrough = finishedSxact->commitSeqNo;
03572 LWLockAcquire(SerializableXactHashLock, LW_SHARED);
03573 }
03574 else
03575 {
03576
03577 break;
03578 }
03579 finishedSxact = nextSxact;
03580 }
03581 LWLockRelease(SerializableXactHashLock);
03582
03583
03584
03585
03586 LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
03587 predlock = (PREDICATELOCK *)
03588 SHMQueueNext(&OldCommittedSxact->predicateLocks,
03589 &OldCommittedSxact->predicateLocks,
03590 offsetof(PREDICATELOCK, xactLink));
03591 while (predlock)
03592 {
03593 PREDICATELOCK *nextpredlock;
03594 bool canDoPartialCleanup;
03595
03596 nextpredlock = (PREDICATELOCK *)
03597 SHMQueueNext(&OldCommittedSxact->predicateLocks,
03598 &predlock->xactLink,
03599 offsetof(PREDICATELOCK, xactLink));
03600
03601 LWLockAcquire(SerializableXactHashLock, LW_SHARED);
03602 Assert(predlock->commitSeqNo != 0);
03603 Assert(predlock->commitSeqNo != InvalidSerCommitSeqNo);
03604 canDoPartialCleanup = (predlock->commitSeqNo <= PredXact->CanPartialClearThrough);
03605 LWLockRelease(SerializableXactHashLock);
03606
03607
03608
03609
03610
03611 if (canDoPartialCleanup)
03612 {
03613 PREDICATELOCKTAG tag;
03614 PREDICATELOCKTARGET *target;
03615 PREDICATELOCKTARGETTAG targettag;
03616 uint32 targettaghash;
03617 LWLockId partitionLock;
03618
03619 tag = predlock->tag;
03620 target = tag.myTarget;
03621 targettag = target->tag;
03622 targettaghash = PredicateLockTargetTagHashCode(&targettag);
03623 partitionLock = PredicateLockHashPartitionLock(targettaghash);
03624
03625 LWLockAcquire(partitionLock, LW_EXCLUSIVE);
03626
03627 SHMQueueDelete(&(predlock->targetLink));
03628 SHMQueueDelete(&(predlock->xactLink));
03629
03630 hash_search_with_hash_value(PredicateLockHash, &tag,
03631 PredicateLockHashCodeFromTargetHashCode(&tag,
03632 targettaghash),
03633 HASH_REMOVE, NULL);
03634 RemoveTargetIfNoLongerUsed(target, targettaghash);
03635
03636 LWLockRelease(partitionLock);
03637 }
03638
03639 predlock = nextpredlock;
03640 }
03641
03642 LWLockRelease(SerializablePredicateLockListLock);
03643 LWLockRelease(SerializableFinishedListLock);
03644 }
03645
03646
03647
03648
03649
03650
03651
03652
03653
03654
03655
03656
03657
03658
03659
03660
03661
03662
03663
03664
03665 static void
03666 ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial,
03667 bool summarize)
03668 {
03669 PREDICATELOCK *predlock;
03670 SERIALIZABLEXIDTAG sxidtag;
03671 RWConflict conflict,
03672 nextConflict;
03673
03674 Assert(sxact != NULL);
03675 Assert(SxactIsRolledBack(sxact) || SxactIsCommitted(sxact));
03676 Assert(partial || !SxactIsOnFinishedList(sxact));
03677 Assert(LWLockHeldByMe(SerializableFinishedListLock));
03678
03679
03680
03681
03682
03683 LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
03684 predlock = (PREDICATELOCK *)
03685 SHMQueueNext(&(sxact->predicateLocks),
03686 &(sxact->predicateLocks),
03687 offsetof(PREDICATELOCK, xactLink));
03688 while (predlock)
03689 {
03690 PREDICATELOCK *nextpredlock;
03691 PREDICATELOCKTAG tag;
03692 SHM_QUEUE *targetLink;
03693 PREDICATELOCKTARGET *target;
03694 PREDICATELOCKTARGETTAG targettag;
03695 uint32 targettaghash;
03696 LWLockId partitionLock;
03697
03698 nextpredlock = (PREDICATELOCK *)
03699 SHMQueueNext(&(sxact->predicateLocks),
03700 &(predlock->xactLink),
03701 offsetof(PREDICATELOCK, xactLink));
03702
03703 tag = predlock->tag;
03704 targetLink = &(predlock->targetLink);
03705 target = tag.myTarget;
03706 targettag = target->tag;
03707 targettaghash = PredicateLockTargetTagHashCode(&targettag);
03708 partitionLock = PredicateLockHashPartitionLock(targettaghash);
03709
03710 LWLockAcquire(partitionLock, LW_EXCLUSIVE);
03711
03712 SHMQueueDelete(targetLink);
03713
03714 hash_search_with_hash_value(PredicateLockHash, &tag,
03715 PredicateLockHashCodeFromTargetHashCode(&tag,
03716 targettaghash),
03717 HASH_REMOVE, NULL);
03718 if (summarize)
03719 {
03720 bool found;
03721
03722
03723 tag.myXact = OldCommittedSxact;
03724 predlock = hash_search_with_hash_value(PredicateLockHash, &tag,
03725 PredicateLockHashCodeFromTargetHashCode(&tag,
03726 targettaghash),
03727 HASH_ENTER_NULL, &found);
03728 if (!predlock)
03729 ereport(ERROR,
03730 (errcode(ERRCODE_OUT_OF_MEMORY),
03731 errmsg("out of shared memory"),
03732 errhint("You might need to increase max_pred_locks_per_transaction.")));
03733 if (found)
03734 {
03735 Assert(predlock->commitSeqNo != 0);
03736 Assert(predlock->commitSeqNo != InvalidSerCommitSeqNo);
03737 if (predlock->commitSeqNo < sxact->commitSeqNo)
03738 predlock->commitSeqNo = sxact->commitSeqNo;
03739 }
03740 else
03741 {
03742 SHMQueueInsertBefore(&(target->predicateLocks),
03743 &(predlock->targetLink));
03744 SHMQueueInsertBefore(&(OldCommittedSxact->predicateLocks),
03745 &(predlock->xactLink));
03746 predlock->commitSeqNo = sxact->commitSeqNo;
03747 }
03748 }
03749 else
03750 RemoveTargetIfNoLongerUsed(target, targettaghash);
03751
03752 LWLockRelease(partitionLock);
03753
03754 predlock = nextpredlock;
03755 }
03756
03757
03758
03759
03760
03761 SHMQueueInit(&sxact->predicateLocks);
03762
03763 LWLockRelease(SerializablePredicateLockListLock);
03764
03765 sxidtag.xid = sxact->topXid;
03766 LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
03767
03768
03769 if (!partial)
03770 {
03771 conflict = (RWConflict)
03772 SHMQueueNext(&sxact->outConflicts,
03773 &sxact->outConflicts,
03774 offsetof(RWConflictData, outLink));
03775 while (conflict)
03776 {
03777 nextConflict = (RWConflict)
03778 SHMQueueNext(&sxact->outConflicts,
03779 &conflict->outLink,
03780 offsetof(RWConflictData, outLink));
03781 if (summarize)
03782 conflict->sxactIn->flags |= SXACT_FLAG_SUMMARY_CONFLICT_IN;
03783 ReleaseRWConflict(conflict);
03784 conflict = nextConflict;
03785 }
03786 }
03787
03788
03789 conflict = (RWConflict)
03790 SHMQueueNext(&sxact->inConflicts,
03791 &sxact->inConflicts,
03792 offsetof(RWConflictData, inLink));
03793 while (conflict)
03794 {
03795 nextConflict = (RWConflict)
03796 SHMQueueNext(&sxact->inConflicts,
03797 &conflict->inLink,
03798 offsetof(RWConflictData, inLink));
03799 if (summarize)
03800 conflict->sxactOut->flags |= SXACT_FLAG_SUMMARY_CONFLICT_OUT;
03801 ReleaseRWConflict(conflict);
03802 conflict = nextConflict;
03803 }
03804
03805
03806 if (!partial)
03807 {
03808 if (sxidtag.xid != InvalidTransactionId)
03809 hash_search(SerializableXidHash, &sxidtag, HASH_REMOVE, NULL);
03810 ReleasePredXact(sxact);
03811 }
03812
03813 LWLockRelease(SerializableXactHashLock);
03814 }
03815
03816
03817
03818
03819
03820
03821
03822
03823
03824 static bool
03825 XidIsConcurrent(TransactionId xid)
03826 {
03827 Snapshot snap;
03828 uint32 i;
03829
03830 Assert(TransactionIdIsValid(xid));
03831 Assert(!TransactionIdEquals(xid, GetTopTransactionIdIfAny()));
03832
03833 snap = GetTransactionSnapshot();
03834
03835 if (TransactionIdPrecedes(xid, snap->xmin))
03836 return false;
03837
03838 if (TransactionIdFollowsOrEquals(xid, snap->xmax))
03839 return true;
03840
03841 for (i = 0; i < snap->xcnt; i++)
03842 {
03843 if (xid == snap->xip[i])
03844 return true;
03845 }
03846
03847 return false;
03848 }
03849
03850
03851
03852
03853
03854
03855
03856
03857
03858
03859
03860
03861
03862
03863
03864
03865
03866
03867 void
03868 CheckForSerializableConflictOut(bool visible, Relation relation,
03869 HeapTuple tuple, Buffer buffer,
03870 Snapshot snapshot)
03871 {
03872 TransactionId xid;
03873 SERIALIZABLEXIDTAG sxidtag;
03874 SERIALIZABLEXID *sxid;
03875 SERIALIZABLEXACT *sxact;
03876 HTSV_Result htsvResult;
03877
03878 if (!SerializationNeededForRead(relation, snapshot))
03879 return;
03880
03881
03882 if (SxactIsDoomed(MySerializableXact))
03883 {
03884 ereport(ERROR,
03885 (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
03886 errmsg("could not serialize access due to read/write dependencies among transactions"),
03887 errdetail_internal("Reason code: Canceled on identification as a pivot, during conflict out checking."),
03888 errhint("The transaction might succeed if retried.")));
03889 }
03890
03891
03892
03893
03894
03895
03896
03897
03898 htsvResult = HeapTupleSatisfiesVacuum(tuple->t_data, TransactionXmin, buffer);
03899 switch (htsvResult)
03900 {
03901 case HEAPTUPLE_LIVE:
03902 if (visible)
03903 return;
03904 xid = HeapTupleHeaderGetXmin(tuple->t_data);
03905 break;
03906 case HEAPTUPLE_RECENTLY_DEAD:
03907 if (!visible)
03908 return;
03909 xid = HeapTupleHeaderGetUpdateXid(tuple->t_data);
03910 break;
03911 case HEAPTUPLE_DELETE_IN_PROGRESS:
03912 xid = HeapTupleHeaderGetUpdateXid(tuple->t_data);
03913 break;
03914 case HEAPTUPLE_INSERT_IN_PROGRESS:
03915 xid = HeapTupleHeaderGetXmin(tuple->t_data);
03916 break;
03917 case HEAPTUPLE_DEAD:
03918 return;
03919 default:
03920
03921
03922
03923
03924
03925
03926 elog(ERROR, "unrecognized return value from HeapTupleSatisfiesVacuum: %u", htsvResult);
03927
03928
03929
03930
03931
03932
03933
03934 xid = InvalidTransactionId;
03935 }
03936 Assert(TransactionIdIsValid(xid));
03937 Assert(TransactionIdFollowsOrEquals(xid, TransactionXmin));
03938
03939
03940
03941
03942
03943 if (TransactionIdEquals(xid, GetTopTransactionIdIfAny()))
03944 return;
03945 xid = SubTransGetTopmostTransaction(xid);
03946 if (TransactionIdPrecedes(xid, TransactionXmin))
03947 return;
03948 if (TransactionIdEquals(xid, GetTopTransactionIdIfAny()))
03949 return;
03950
03951
03952
03953
03954 sxidtag.xid = xid;
03955 LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
03956 sxid = (SERIALIZABLEXID *)
03957 hash_search(SerializableXidHash, &sxidtag, HASH_FIND, NULL);
03958 if (!sxid)
03959 {
03960
03961
03962
03963
03964 SerCommitSeqNo conflictCommitSeqNo;
03965
03966 conflictCommitSeqNo = OldSerXidGetMinConflictCommitSeqNo(xid);
03967 if (conflictCommitSeqNo != 0)
03968 {
03969 if (conflictCommitSeqNo != InvalidSerCommitSeqNo
03970 && (!SxactIsReadOnly(MySerializableXact)
03971 || conflictCommitSeqNo
03972 <= MySerializableXact->SeqNo.lastCommitBeforeSnapshot))
03973 ereport(ERROR,
03974 (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
03975 errmsg("could not serialize access due to read/write dependencies among transactions"),
03976 errdetail_internal("Reason code: Canceled on conflict out to old pivot %u.", xid),
03977 errhint("The transaction might succeed if retried.")));
03978
03979 if (SxactHasSummaryConflictIn(MySerializableXact)
03980 || !SHMQueueEmpty(&MySerializableXact->inConflicts))
03981 ereport(ERROR,
03982 (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
03983 errmsg("could not serialize access due to read/write dependencies among transactions"),
03984 errdetail_internal("Reason code: Canceled on identification as a pivot, with conflict out to old committed transaction %u.", xid),
03985 errhint("The transaction might succeed if retried.")));
03986
03987 MySerializableXact->flags |= SXACT_FLAG_SUMMARY_CONFLICT_OUT;
03988 }
03989
03990
03991 LWLockRelease(SerializableXactHashLock);
03992 return;
03993 }
03994 sxact = sxid->myXact;
03995 Assert(TransactionIdEquals(sxact->topXid, xid));
03996 if (sxact == MySerializableXact || SxactIsDoomed(sxact))
03997 {
03998
03999 LWLockRelease(SerializableXactHashLock);
04000 return;
04001 }
04002
04003
04004
04005
04006
04007
04008
04009 if (SxactHasSummaryConflictOut(sxact))
04010 {
04011 if (!SxactIsPrepared(sxact))
04012 {
04013 sxact->flags |= SXACT_FLAG_DOOMED;
04014 LWLockRelease(SerializableXactHashLock);
04015 return;
04016 }
04017 else
04018 {
04019 LWLockRelease(SerializableXactHashLock);
04020 ereport(ERROR,
04021 (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
04022 errmsg("could not serialize access due to read/write dependencies among transactions"),
04023 errdetail_internal("Reason code: Canceled on conflict out to old pivot."),
04024 errhint("The transaction might succeed if retried.")));
04025 }
04026 }
04027
04028
04029
04030
04031
04032
04033 if (SxactIsReadOnly(MySerializableXact)
04034 && SxactIsCommitted(sxact)
04035 && !SxactHasSummaryConflictOut(sxact)
04036 && (!SxactHasConflictOut(sxact)
04037 || MySerializableXact->SeqNo.lastCommitBeforeSnapshot < sxact->SeqNo.earliestOutConflictCommit))
04038 {
04039
04040 LWLockRelease(SerializableXactHashLock);
04041 return;
04042 }
04043
04044 if (!XidIsConcurrent(xid))
04045 {
04046
04047 LWLockRelease(SerializableXactHashLock);
04048 return;
04049 }
04050
04051 if (RWConflictExists(MySerializableXact, sxact))
04052 {
04053
04054 LWLockRelease(SerializableXactHashLock);
04055 return;
04056 }
04057
04058
04059
04060
04061
04062 FlagRWConflict(MySerializableXact, sxact);
04063 LWLockRelease(SerializableXactHashLock);
04064 }
04065
04066
04067
04068
04069
04070 static void
04071 CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag)
04072 {
04073 uint32 targettaghash;
04074 LWLockId partitionLock;
04075 PREDICATELOCKTARGET *target;
04076 PREDICATELOCK *predlock;
04077 PREDICATELOCK *mypredlock = NULL;
04078 PREDICATELOCKTAG mypredlocktag;
04079
04080 Assert(MySerializableXact != InvalidSerializableXact);
04081
04082
04083
04084
04085 targettaghash = PredicateLockTargetTagHashCode(targettag);
04086 partitionLock = PredicateLockHashPartitionLock(targettaghash);
04087 LWLockAcquire(partitionLock, LW_SHARED);
04088 target = (PREDICATELOCKTARGET *)
04089 hash_search_with_hash_value(PredicateLockTargetHash,
04090 targettag, targettaghash,
04091 HASH_FIND, NULL);
04092 if (!target)
04093 {
04094
04095 LWLockRelease(partitionLock);
04096 return;
04097 }
04098
04099
04100
04101
04102
04103 predlock = (PREDICATELOCK *)
04104 SHMQueueNext(&(target->predicateLocks),
04105 &(target->predicateLocks),
04106 offsetof(PREDICATELOCK, targetLink));
04107 LWLockAcquire(SerializableXactHashLock, LW_SHARED);
04108 while (predlock)
04109 {
04110 SHM_QUEUE *predlocktargetlink;
04111 PREDICATELOCK *nextpredlock;
04112 SERIALIZABLEXACT *sxact;
04113
04114 predlocktargetlink = &(predlock->targetLink);
04115 nextpredlock = (PREDICATELOCK *)
04116 SHMQueueNext(&(target->predicateLocks),
04117 predlocktargetlink,
04118 offsetof(PREDICATELOCK, targetLink));
04119
04120 sxact = predlock->tag.myXact;
04121 if (sxact == MySerializableXact)
04122 {
04123
04124
04125
04126
04127
04128
04129
04130
04131
04132
04133 if (!IsSubTransaction()
04134 && GET_PREDICATELOCKTARGETTAG_OFFSET(*targettag))
04135 {
04136 mypredlock = predlock;
04137 mypredlocktag = predlock->tag;
04138 }
04139 }
04140 else if (!SxactIsDoomed(sxact)
04141 && (!SxactIsCommitted(sxact)
04142 || TransactionIdPrecedes(GetTransactionSnapshot()->xmin,
04143 sxact->finishedBefore))
04144 && !RWConflictExists(sxact, MySerializableXact))
04145 {
04146 LWLockRelease(SerializableXactHashLock);
04147 LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
04148
04149
04150
04151
04152
04153 if (!SxactIsDoomed(sxact)
04154 && (!SxactIsCommitted(sxact)
04155 || TransactionIdPrecedes(GetTransactionSnapshot()->xmin,
04156 sxact->finishedBefore))
04157 && !RWConflictExists(sxact, MySerializableXact))
04158 {
04159 FlagRWConflict(sxact, MySerializableXact);
04160 }
04161
04162 LWLockRelease(SerializableXactHashLock);
04163 LWLockAcquire(SerializableXactHashLock, LW_SHARED);
04164 }
04165
04166 predlock = nextpredlock;
04167 }
04168 LWLockRelease(SerializableXactHashLock);
04169 LWLockRelease(partitionLock);
04170
04171
04172
04173
04174
04175
04176
04177
04178
04179 if (mypredlock != NULL)
04180 {
04181 uint32 predlockhashcode;
04182 PREDICATELOCK *rmpredlock;
04183
04184 LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
04185 LWLockAcquire(partitionLock, LW_EXCLUSIVE);
04186 LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
04187
04188
04189
04190
04191
04192
04193 predlockhashcode = PredicateLockHashCodeFromTargetHashCode
04194 (&mypredlocktag, targettaghash);
04195 rmpredlock = (PREDICATELOCK *)
04196 hash_search_with_hash_value(PredicateLockHash,
04197 &mypredlocktag,
04198 predlockhashcode,
04199 HASH_FIND, NULL);
04200 if (rmpredlock != NULL)
04201 {
04202 Assert(rmpredlock == mypredlock);
04203
04204 SHMQueueDelete(&(mypredlock->targetLink));
04205 SHMQueueDelete(&(mypredlock->xactLink));
04206
04207 rmpredlock = (PREDICATELOCK *)
04208 hash_search_with_hash_value(PredicateLockHash,
04209 &mypredlocktag,
04210 predlockhashcode,
04211 HASH_REMOVE, NULL);
04212 Assert(rmpredlock == mypredlock);
04213
04214 RemoveTargetIfNoLongerUsed(target, targettaghash);
04215 }
04216
04217 LWLockRelease(SerializableXactHashLock);
04218 LWLockRelease(partitionLock);
04219 LWLockRelease(SerializablePredicateLockListLock);
04220
04221 if (rmpredlock != NULL)
04222 {
04223
04224
04225
04226
04227
04228 hash_search_with_hash_value(LocalPredicateLockHash,
04229 targettag, targettaghash,
04230 HASH_REMOVE, NULL);
04231
04232 DecrementParentLocks(targettag);
04233 }
04234 }
04235 }
04236
04237
04238
04239
04240
04241
04242
04243
04244
04245
04246
04247
04248 void
04249 CheckForSerializableConflictIn(Relation relation, HeapTuple tuple,
04250 Buffer buffer)
04251 {
04252 PREDICATELOCKTARGETTAG targettag;
04253
04254 if (!SerializationNeededForWrite(relation))
04255 return;
04256
04257
04258 if (SxactIsDoomed(MySerializableXact))
04259 ereport(ERROR,
04260 (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
04261 errmsg("could not serialize access due to read/write dependencies among transactions"),
04262 errdetail_internal("Reason code: Canceled on identification as a pivot, during conflict in checking."),
04263 errhint("The transaction might succeed if retried.")));
04264
04265
04266
04267
04268
04269 MyXactDidWrite = true;
04270
04271
04272
04273
04274
04275
04276
04277
04278
04279
04280 if (tuple != NULL)
04281 {
04282 SET_PREDICATELOCKTARGETTAG_TUPLE(targettag,
04283 relation->rd_node.dbNode,
04284 relation->rd_id,
04285 ItemPointerGetBlockNumber(&(tuple->t_data->t_ctid)),
04286 ItemPointerGetOffsetNumber(&(tuple->t_data->t_ctid)),
04287 HeapTupleHeaderGetXmin(tuple->t_data));
04288 CheckTargetForConflictsIn(&targettag);
04289 }
04290
04291 if (BufferIsValid(buffer))
04292 {
04293 SET_PREDICATELOCKTARGETTAG_PAGE(targettag,
04294 relation->rd_node.dbNode,
04295 relation->rd_id,
04296 BufferGetBlockNumber(buffer));
04297 CheckTargetForConflictsIn(&targettag);
04298 }
04299
04300 SET_PREDICATELOCKTARGETTAG_RELATION(targettag,
04301 relation->rd_node.dbNode,
04302 relation->rd_id);
04303 CheckTargetForConflictsIn(&targettag);
04304 }
04305
04306
04307
04308
04309
04310
04311
04312
04313
04314
04315
04316
04317
04318
04319
04320
04321
04322
04323
04324
04325
04326
04327
04328
04329
04330
04331
04332
04333 void
04334 CheckTableForSerializableConflictIn(Relation relation)
04335 {
04336 HASH_SEQ_STATUS seqstat;
04337 PREDICATELOCKTARGET *target;
04338 Oid dbId;
04339 Oid heapId;
04340 int i;
04341
04342
04343
04344
04345
04346
04347
04348 if (!TransactionIdIsValid(PredXact->SxactGlobalXmin))
04349 return;
04350
04351 if (!SerializationNeededForWrite(relation))
04352 return;
04353
04354
04355
04356
04357
04358 MyXactDidWrite = true;
04359
04360 Assert(relation->rd_index == NULL);
04361
04362 dbId = relation->rd_node.dbNode;
04363 heapId = relation->rd_id;
04364
04365 LWLockAcquire(SerializablePredicateLockListLock, LW_EXCLUSIVE);
04366 for (i = 0; i < NUM_PREDICATELOCK_PARTITIONS; i++)
04367 LWLockAcquire(FirstPredicateLockMgrLock + i, LW_SHARED);
04368 LWLockAcquire(SerializableXactHashLock, LW_SHARED);
04369
04370
04371 hash_seq_init(&seqstat, PredicateLockTargetHash);
04372
04373 while ((target = (PREDICATELOCKTARGET *) hash_seq_search(&seqstat)))
04374 {
04375 PREDICATELOCK *predlock;
04376
04377
04378
04379
04380 if (GET_PREDICATELOCKTARGETTAG_RELATION(target->tag) != heapId)
04381 continue;
04382 if (GET_PREDICATELOCKTARGETTAG_DB(target->tag) != dbId)
04383 continue;
04384
04385
04386
04387
04388 predlock = (PREDICATELOCK *)
04389 SHMQueueNext(&(target->predicateLocks),
04390 &(target->predicateLocks),
04391 offsetof(PREDICATELOCK, targetLink));
04392 while (predlock)
04393 {
04394 PREDICATELOCK *nextpredlock;
04395
04396 nextpredlock = (PREDICATELOCK *)
04397 SHMQueueNext(&(target->predicateLocks),
04398 &(predlock->targetLink),
04399 offsetof(PREDICATELOCK, targetLink));
04400
04401 if (predlock->tag.myXact != MySerializableXact
04402 && !RWConflictExists(predlock->tag.myXact, MySerializableXact))
04403 {
04404 FlagRWConflict(predlock->tag.myXact, MySerializableXact);
04405 }
04406
04407 predlock = nextpredlock;
04408 }
04409 }
04410
04411
04412 LWLockRelease(SerializableXactHashLock);
04413 for (i = NUM_PREDICATELOCK_PARTITIONS - 1; i >= 0; i--)
04414 LWLockRelease(FirstPredicateLockMgrLock + i);
04415 LWLockRelease(SerializablePredicateLockListLock);
04416 }
04417
04418
04419
04420
04421
04422
04423
04424
04425 static void
04426 FlagRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer)
04427 {
04428 Assert(reader != writer);
04429
04430
04431 OnConflict_CheckForSerializationFailure(reader, writer);
04432
04433
04434 if (reader == OldCommittedSxact)
04435 writer->flags |= SXACT_FLAG_SUMMARY_CONFLICT_IN;
04436 else if (writer == OldCommittedSxact)
04437 reader->flags |= SXACT_FLAG_SUMMARY_CONFLICT_OUT;
04438 else
04439 SetRWConflict(reader, writer);
04440 }
04441
04442
04443
04444
04445
04446
04447
04448
04449
04450
04451
04452
04453
04454
04455
04456
04457
04458
04459
04460 static void
04461 OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader,
04462 SERIALIZABLEXACT *writer)
04463 {
04464 bool failure;
04465 RWConflict conflict;
04466
04467 Assert(LWLockHeldByMe(SerializableXactHashLock));
04468
04469 failure = false;
04470
04471
04472
04473
04474
04475
04476
04477
04478
04479
04480
04481
04482 if (SxactIsCommitted(writer)
04483 && (SxactHasConflictOut(writer) || SxactHasSummaryConflictOut(writer)))
04484 failure = true;
04485
04486
04487
04488
04489
04490
04491
04492
04493
04494
04495
04496
04497
04498
04499
04500
04501
04502
04503
04504
04505 if (!failure)
04506 {
04507 if (SxactHasSummaryConflictOut(writer))
04508 {
04509 failure = true;
04510 conflict = NULL;
04511 }
04512 else
04513 conflict = (RWConflict)
04514 SHMQueueNext(&writer->outConflicts,
04515 &writer->outConflicts,
04516 offsetof(RWConflictData, outLink));
04517 while (conflict)
04518 {
04519 SERIALIZABLEXACT *t2 = conflict->sxactIn;
04520
04521 if (SxactIsPrepared(t2)
04522 && (!SxactIsCommitted(reader)
04523 || t2->prepareSeqNo <= reader->commitSeqNo)
04524 && (!SxactIsCommitted(writer)
04525 || t2->prepareSeqNo <= writer->commitSeqNo)
04526 && (!SxactIsReadOnly(reader)
04527 || t2->prepareSeqNo <= reader->SeqNo.lastCommitBeforeSnapshot))
04528 {
04529 failure = true;
04530 break;
04531 }
04532 conflict = (RWConflict)
04533 SHMQueueNext(&writer->outConflicts,
04534 &conflict->outLink,
04535 offsetof(RWConflictData, outLink));
04536 }
04537 }
04538
04539
04540
04541
04542
04543
04544
04545
04546
04547
04548
04549
04550
04551
04552 if (!failure && SxactIsPrepared(writer) && !SxactIsReadOnly(reader))
04553 {
04554 if (SxactHasSummaryConflictIn(reader))
04555 {
04556 failure = true;
04557 conflict = NULL;
04558 }
04559 else
04560 conflict = (RWConflict)
04561 SHMQueueNext(&reader->inConflicts,
04562 &reader->inConflicts,
04563 offsetof(RWConflictData, inLink));
04564 while (conflict)
04565 {
04566 SERIALIZABLEXACT *t0 = conflict->sxactOut;
04567
04568 if (!SxactIsDoomed(t0)
04569 && (!SxactIsCommitted(t0)
04570 || t0->commitSeqNo >= writer->prepareSeqNo)
04571 && (!SxactIsReadOnly(t0)
04572 || t0->SeqNo.lastCommitBeforeSnapshot >= writer->prepareSeqNo))
04573 {
04574 failure = true;
04575 break;
04576 }
04577 conflict = (RWConflict)
04578 SHMQueueNext(&reader->inConflicts,
04579 &conflict->inLink,
04580 offsetof(RWConflictData, inLink));
04581 }
04582 }
04583
04584 if (failure)
04585 {
04586
04587
04588
04589
04590
04591
04592
04593
04594 if (MySerializableXact == writer)
04595 {
04596 LWLockRelease(SerializableXactHashLock);
04597 ereport(ERROR,
04598 (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
04599 errmsg("could not serialize access due to read/write dependencies among transactions"),
04600 errdetail_internal("Reason code: Canceled on identification as a pivot, during write."),
04601 errhint("The transaction might succeed if retried.")));
04602 }
04603 else if (SxactIsPrepared(writer))
04604 {
04605 LWLockRelease(SerializableXactHashLock);
04606
04607
04608 Assert(MySerializableXact == reader);
04609 ereport(ERROR,
04610 (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
04611 errmsg("could not serialize access due to read/write dependencies among transactions"),
04612 errdetail_internal("Reason code: Canceled on conflict out to pivot %u, during read.", writer->topXid),
04613 errhint("The transaction might succeed if retried.")));
04614 }
04615 writer->flags |= SXACT_FLAG_DOOMED;
04616 }
04617 }
04618
04619
04620
04621
04622
04623
04624
04625
04626
04627
04628
04629
04630
04631
04632
04633
04634
04635 void
04636 PreCommit_CheckForSerializationFailure(void)
04637 {
04638 RWConflict nearConflict;
04639
04640 if (MySerializableXact == InvalidSerializableXact)
04641 return;
04642
04643 Assert(IsolationIsSerializable());
04644
04645 LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
04646
04647
04648 if (SxactIsDoomed(MySerializableXact))
04649 {
04650 LWLockRelease(SerializableXactHashLock);
04651 ereport(ERROR,
04652 (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
04653 errmsg("could not serialize access due to read/write dependencies among transactions"),
04654 errdetail_internal("Reason code: Canceled on identification as a pivot, during commit attempt."),
04655 errhint("The transaction might succeed if retried.")));
04656 }
04657
04658 nearConflict = (RWConflict)
04659 SHMQueueNext(&MySerializableXact->inConflicts,
04660 &MySerializableXact->inConflicts,
04661 offsetof(RWConflictData, inLink));
04662 while (nearConflict)
04663 {
04664 if (!SxactIsCommitted(nearConflict->sxactOut)
04665 && !SxactIsDoomed(nearConflict->sxactOut))
04666 {
04667 RWConflict farConflict;
04668
04669 farConflict = (RWConflict)
04670 SHMQueueNext(&nearConflict->sxactOut->inConflicts,
04671 &nearConflict->sxactOut->inConflicts,
04672 offsetof(RWConflictData, inLink));
04673 while (farConflict)
04674 {
04675 if (farConflict->sxactOut == MySerializableXact
04676 || (!SxactIsCommitted(farConflict->sxactOut)
04677 && !SxactIsReadOnly(farConflict->sxactOut)
04678 && !SxactIsDoomed(farConflict->sxactOut)))
04679 {
04680
04681
04682
04683
04684
04685
04686 if (SxactIsPrepared(nearConflict->sxactOut))
04687 {
04688 LWLockRelease(SerializableXactHashLock);
04689 ereport(ERROR,
04690 (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
04691 errmsg("could not serialize access due to read/write dependencies among transactions"),
04692 errdetail_internal("Reason code: Canceled on commit attempt with conflict in from prepared pivot."),
04693 errhint("The transaction might succeed if retried.")));
04694 }
04695 nearConflict->sxactOut->flags |= SXACT_FLAG_DOOMED;
04696 break;
04697 }
04698 farConflict = (RWConflict)
04699 SHMQueueNext(&nearConflict->sxactOut->inConflicts,
04700 &farConflict->inLink,
04701 offsetof(RWConflictData, inLink));
04702 }
04703 }
04704
04705 nearConflict = (RWConflict)
04706 SHMQueueNext(&MySerializableXact->inConflicts,
04707 &nearConflict->inLink,
04708 offsetof(RWConflictData, inLink));
04709 }
04710
04711 MySerializableXact->prepareSeqNo = ++(PredXact->LastSxactCommitSeqNo);
04712 MySerializableXact->flags |= SXACT_FLAG_PREPARED;
04713
04714 LWLockRelease(SerializableXactHashLock);
04715 }
04716
04717
04718
04719
04720
04721
04722
04723
04724
04725
04726
04727
04728 void
04729 AtPrepare_PredicateLocks(void)
04730 {
04731 PREDICATELOCK *predlock;
04732 SERIALIZABLEXACT *sxact;
04733 TwoPhasePredicateRecord record;
04734 TwoPhasePredicateXactRecord *xactRecord;
04735 TwoPhasePredicateLockRecord *lockRecord;
04736
04737 sxact = MySerializableXact;
04738 xactRecord = &(record.data.xactRecord);
04739 lockRecord = &(record.data.lockRecord);
04740
04741 if (MySerializableXact == InvalidSerializableXact)
04742 return;
04743
04744
04745 record.type = TWOPHASEPREDICATERECORD_XACT;
04746 xactRecord->xmin = MySerializableXact->xmin;
04747 xactRecord->flags = MySerializableXact->flags;
04748
04749
04750
04751
04752
04753
04754
04755
04756 RegisterTwoPhaseRecord(TWOPHASE_RM_PREDICATELOCK_ID, 0,
04757 &record, sizeof(record));
04758
04759
04760
04761
04762
04763
04764
04765
04766 LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
04767
04768 predlock = (PREDICATELOCK *)
04769 SHMQueueNext(&(sxact->predicateLocks),
04770 &(sxact->predicateLocks),
04771 offsetof(PREDICATELOCK, xactLink));
04772
04773 while (predlock != NULL)
04774 {
04775 record.type = TWOPHASEPREDICATERECORD_LOCK;
04776 lockRecord->target = predlock->tag.myTarget->tag;
04777
04778 RegisterTwoPhaseRecord(TWOPHASE_RM_PREDICATELOCK_ID, 0,
04779 &record, sizeof(record));
04780
04781 predlock = (PREDICATELOCK *)
04782 SHMQueueNext(&(sxact->predicateLocks),
04783 &(predlock->xactLink),
04784 offsetof(PREDICATELOCK, xactLink));
04785 }
04786
04787 LWLockRelease(SerializablePredicateLockListLock);
04788 }
04789
04790
04791
04792
04793
04794
04795
04796
04797 void
04798 PostPrepare_PredicateLocks(TransactionId xid)
04799 {
04800 if (MySerializableXact == InvalidSerializableXact)
04801 return;
04802
04803 Assert(SxactIsPrepared(MySerializableXact));
04804
04805 MySerializableXact->pid = 0;
04806
04807 hash_destroy(LocalPredicateLockHash);
04808 LocalPredicateLockHash = NULL;
04809
04810 MySerializableXact = InvalidSerializableXact;
04811 MyXactDidWrite = false;
04812 }
04813
04814
04815
04816
04817
04818
04819 void
04820 PredicateLockTwoPhaseFinish(TransactionId xid, bool isCommit)
04821 {
04822 SERIALIZABLEXID *sxid;
04823 SERIALIZABLEXIDTAG sxidtag;
04824
04825 sxidtag.xid = xid;
04826
04827 LWLockAcquire(SerializableXactHashLock, LW_SHARED);
04828 sxid = (SERIALIZABLEXID *)
04829 hash_search(SerializableXidHash, &sxidtag, HASH_FIND, NULL);
04830 LWLockRelease(SerializableXactHashLock);
04831
04832
04833 if (sxid == NULL)
04834 return;
04835
04836
04837 MySerializableXact = sxid->myXact;
04838 MyXactDidWrite = true;
04839
04840 ReleasePredicateLocks(isCommit);
04841 }
04842
04843
04844
04845
04846 void
04847 predicatelock_twophase_recover(TransactionId xid, uint16 info,
04848 void *recdata, uint32 len)
04849 {
04850 TwoPhasePredicateRecord *record;
04851
04852 Assert(len == sizeof(TwoPhasePredicateRecord));
04853
04854 record = (TwoPhasePredicateRecord *) recdata;
04855
04856 Assert((record->type == TWOPHASEPREDICATERECORD_XACT) ||
04857 (record->type == TWOPHASEPREDICATERECORD_LOCK));
04858
04859 if (record->type == TWOPHASEPREDICATERECORD_XACT)
04860 {
04861
04862 TwoPhasePredicateXactRecord *xactRecord;
04863 SERIALIZABLEXACT *sxact;
04864 SERIALIZABLEXID *sxid;
04865 SERIALIZABLEXIDTAG sxidtag;
04866 bool found;
04867
04868 xactRecord = (TwoPhasePredicateXactRecord *) &record->data.xactRecord;
04869
04870 LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
04871 sxact = CreatePredXact();
04872 if (!sxact)
04873 ereport(ERROR,
04874 (errcode(ERRCODE_OUT_OF_MEMORY),
04875 errmsg("out of shared memory")));
04876
04877
04878 sxact->vxid.backendId = InvalidBackendId;
04879 sxact->vxid.localTransactionId = (LocalTransactionId) xid;
04880 sxact->pid = 0;
04881
04882
04883 sxact->prepareSeqNo = RecoverySerCommitSeqNo;
04884 sxact->commitSeqNo = InvalidSerCommitSeqNo;
04885 sxact->finishedBefore = InvalidTransactionId;
04886
04887 sxact->SeqNo.lastCommitBeforeSnapshot = RecoverySerCommitSeqNo;
04888
04889
04890
04891
04892
04893
04894 SHMQueueInit(&(sxact->possibleUnsafeConflicts));
04895
04896 SHMQueueInit(&(sxact->predicateLocks));
04897 SHMQueueElemInit(&(sxact->finishedLink));
04898
04899 sxact->topXid = xid;
04900 sxact->xmin = xactRecord->xmin;
04901 sxact->flags = xactRecord->flags;
04902 Assert(SxactIsPrepared(sxact));
04903 if (!SxactIsReadOnly(sxact))
04904 {
04905 ++(PredXact->WritableSxactCount);
04906 Assert(PredXact->WritableSxactCount <=
04907 (MaxBackends + max_prepared_xacts));
04908 }
04909
04910
04911
04912
04913
04914
04915 SHMQueueInit(&(sxact->outConflicts));
04916 SHMQueueInit(&(sxact->inConflicts));
04917 sxact->flags |= SXACT_FLAG_SUMMARY_CONFLICT_IN;
04918 sxact->flags |= SXACT_FLAG_SUMMARY_CONFLICT_OUT;
04919
04920
04921 sxidtag.xid = xid;
04922 sxid = (SERIALIZABLEXID *) hash_search(SerializableXidHash,
04923 &sxidtag,
04924 HASH_ENTER, &found);
04925 Assert(sxid != NULL);
04926 Assert(!found);
04927 sxid->myXact = (SERIALIZABLEXACT *) sxact;
04928
04929
04930
04931
04932
04933
04934
04935
04936 if ((!TransactionIdIsValid(PredXact->SxactGlobalXmin)) ||
04937 (TransactionIdFollows(PredXact->SxactGlobalXmin, sxact->xmin)))
04938 {
04939 PredXact->SxactGlobalXmin = sxact->xmin;
04940 PredXact->SxactGlobalXminCount = 1;
04941 OldSerXidSetActiveSerXmin(sxact->xmin);
04942 }
04943 else if (TransactionIdEquals(sxact->xmin, PredXact->SxactGlobalXmin))
04944 {
04945 Assert(PredXact->SxactGlobalXminCount > 0);
04946 PredXact->SxactGlobalXminCount++;
04947 }
04948
04949 LWLockRelease(SerializableXactHashLock);
04950 }
04951 else if (record->type == TWOPHASEPREDICATERECORD_LOCK)
04952 {
04953
04954 TwoPhasePredicateLockRecord *lockRecord;
04955 SERIALIZABLEXID *sxid;
04956 SERIALIZABLEXACT *sxact;
04957 SERIALIZABLEXIDTAG sxidtag;
04958 uint32 targettaghash;
04959
04960 lockRecord = (TwoPhasePredicateLockRecord *) &record->data.lockRecord;
04961 targettaghash = PredicateLockTargetTagHashCode(&lockRecord->target);
04962
04963 LWLockAcquire(SerializableXactHashLock, LW_SHARED);
04964 sxidtag.xid = xid;
04965 sxid = (SERIALIZABLEXID *)
04966 hash_search(SerializableXidHash, &sxidtag, HASH_FIND, NULL);
04967 LWLockRelease(SerializableXactHashLock);
04968
04969 Assert(sxid != NULL);
04970 sxact = sxid->myXact;
04971 Assert(sxact != InvalidSerializableXact);
04972
04973 CreatePredicateLock(&lockRecord->target, targettaghash, sxact);
04974 }
04975 }