00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 #include "postgres.h"
00037
00038 #include <fcntl.h>
00039 #include <sys/stat.h>
00040 #include <sys/types.h>
00041 #include <time.h>
00042 #include <unistd.h>
00043
00044 #include "access/htup_details.h"
00045 #include "access/subtrans.h"
00046 #include "access/transam.h"
00047 #include "access/twophase.h"
00048 #include "access/twophase_rmgr.h"
00049 #include "access/xact.h"
00050 #include "access/xlogutils.h"
00051 #include "catalog/pg_type.h"
00052 #include "catalog/storage.h"
00053 #include "funcapi.h"
00054 #include "miscadmin.h"
00055 #include "pg_trace.h"
00056 #include "pgstat.h"
00057 #include "replication/walsender.h"
00058 #include "replication/syncrep.h"
00059 #include "storage/fd.h"
00060 #include "storage/predicate.h"
00061 #include "storage/proc.h"
00062 #include "storage/procarray.h"
00063 #include "storage/sinvaladt.h"
00064 #include "storage/smgr.h"
00065 #include "utils/builtins.h"
00066 #include "utils/memutils.h"
00067 #include "utils/timestamp.h"
00068
00069
00070
00071
00072
/*
 * Relative directory in which two-phase state files are created; the
 * TwoPhaseFilePath macro builds file names underneath it.
 */
#define TWOPHASE_DIR "pg_twophase"

/*
 * Capacity of the prepared-transaction table.  A value of zero makes
 * MarkAsPreparing() refuse to prepare anything; the hint it emits refers to
 * this setting as "max_prepared_transactions".
 */
int         max_prepared_xacts = 0;
00077
00078
00079
00080
00081
00082
00083
00084
00085
00086
00087
00088
00089
00090
00091
00092
00093
00094
00095
00096
00097
00098
00099
00100
00101
00102
00103
00104
00105
00106
00107 #define GIDSIZE 200
00108
00109 typedef struct GlobalTransactionData
00110 {
00111 GlobalTransaction next;
00112 int pgprocno;
00113 BackendId dummyBackendId;
00114 TimestampTz prepared_at;
00115 XLogRecPtr prepare_lsn;
00116 Oid owner;
00117 TransactionId locking_xid;
00118 bool valid;
00119 char gid[GIDSIZE];
00120 } GlobalTransactionData;
00121
00122
00123
00124
00125
00126 typedef struct TwoPhaseStateData
00127 {
00128
00129 GlobalTransaction freeGXacts;
00130
00131
00132 int numPrepXacts;
00133
00134
00135
00136
00137
00138 GlobalTransaction prepXacts[1];
00139 } TwoPhaseStateData;
00140
00141 static TwoPhaseStateData *TwoPhaseState;
00142
00143
00144 static void RecordTransactionCommitPrepared(TransactionId xid,
00145 int nchildren,
00146 TransactionId *children,
00147 int nrels,
00148 RelFileNode *rels,
00149 int ninvalmsgs,
00150 SharedInvalidationMessage *invalmsgs,
00151 bool initfileinval);
00152 static void RecordTransactionAbortPrepared(TransactionId xid,
00153 int nchildren,
00154 TransactionId *children,
00155 int nrels,
00156 RelFileNode *rels);
00157 static void ProcessRecords(char *bufptr, TransactionId xid,
00158 const TwoPhaseCallback callbacks[]);
00159
00160
00161
00162
00163
00164 Size
00165 TwoPhaseShmemSize(void)
00166 {
00167 Size size;
00168
00169
00170 size = offsetof(TwoPhaseStateData, prepXacts);
00171 size = add_size(size, mul_size(max_prepared_xacts,
00172 sizeof(GlobalTransaction)));
00173 size = MAXALIGN(size);
00174 size = add_size(size, mul_size(max_prepared_xacts,
00175 sizeof(GlobalTransactionData)));
00176
00177 return size;
00178 }
00179
00180 void
00181 TwoPhaseShmemInit(void)
00182 {
00183 bool found;
00184
00185 TwoPhaseState = ShmemInitStruct("Prepared Transaction Table",
00186 TwoPhaseShmemSize(),
00187 &found);
00188 if (!IsUnderPostmaster)
00189 {
00190 GlobalTransaction gxacts;
00191 int i;
00192
00193 Assert(!found);
00194 TwoPhaseState->freeGXacts = NULL;
00195 TwoPhaseState->numPrepXacts = 0;
00196
00197
00198
00199
00200 gxacts = (GlobalTransaction)
00201 ((char *) TwoPhaseState +
00202 MAXALIGN(offsetof(TwoPhaseStateData, prepXacts) +
00203 sizeof(GlobalTransaction) * max_prepared_xacts));
00204 for (i = 0; i < max_prepared_xacts; i++)
00205 {
00206
00207 gxacts[i].next = TwoPhaseState->freeGXacts;
00208 TwoPhaseState->freeGXacts = &gxacts[i];
00209
00210
00211 gxacts[i].pgprocno = PreparedXactProcs[i].pgprocno;
00212
00213
00214
00215
00216
00217
00218
00219
00220
00221
00222
00223
00224
00225 gxacts[i].dummyBackendId = MaxBackends + 1 + i;
00226 }
00227 }
00228 else
00229 Assert(found);
00230 }
00231
00232
00233
00234
00235
00236
00237
00238
00239
00240
00241 GlobalTransaction
00242 MarkAsPreparing(TransactionId xid, const char *gid,
00243 TimestampTz prepared_at, Oid owner, Oid databaseid)
00244 {
00245 GlobalTransaction gxact;
00246 PGPROC *proc;
00247 PGXACT *pgxact;
00248 int i;
00249
00250 if (strlen(gid) >= GIDSIZE)
00251 ereport(ERROR,
00252 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
00253 errmsg("transaction identifier \"%s\" is too long",
00254 gid)));
00255
00256
00257 if (max_prepared_xacts == 0)
00258 ereport(ERROR,
00259 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
00260 errmsg("prepared transactions are disabled"),
00261 errhint("Set max_prepared_transactions to a nonzero value.")));
00262
00263 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
00264
00265
00266
00267
00268
00269
00270 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
00271 {
00272 gxact = TwoPhaseState->prepXacts[i];
00273 if (!gxact->valid && !TransactionIdIsActive(gxact->locking_xid))
00274 {
00275
00276 TwoPhaseState->numPrepXacts--;
00277 TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts];
00278
00279 gxact->next = TwoPhaseState->freeGXacts;
00280 TwoPhaseState->freeGXacts = gxact;
00281
00282 i--;
00283 }
00284 }
00285
00286
00287 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
00288 {
00289 gxact = TwoPhaseState->prepXacts[i];
00290 if (strcmp(gxact->gid, gid) == 0)
00291 {
00292 ereport(ERROR,
00293 (errcode(ERRCODE_DUPLICATE_OBJECT),
00294 errmsg("transaction identifier \"%s\" is already in use",
00295 gid)));
00296 }
00297 }
00298
00299
00300 if (TwoPhaseState->freeGXacts == NULL)
00301 ereport(ERROR,
00302 (errcode(ERRCODE_OUT_OF_MEMORY),
00303 errmsg("maximum number of prepared transactions reached"),
00304 errhint("Increase max_prepared_transactions (currently %d).",
00305 max_prepared_xacts)));
00306 gxact = TwoPhaseState->freeGXacts;
00307 TwoPhaseState->freeGXacts = gxact->next;
00308
00309 proc = &ProcGlobal->allProcs[gxact->pgprocno];
00310 pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
00311
00312
00313 MemSet(proc, 0, sizeof(PGPROC));
00314 proc->pgprocno = gxact->pgprocno;
00315 SHMQueueElemInit(&(proc->links));
00316 proc->waitStatus = STATUS_OK;
00317
00318 proc->lxid = (LocalTransactionId) xid;
00319 pgxact->xid = xid;
00320 pgxact->xmin = InvalidTransactionId;
00321 pgxact->delayChkpt = false;
00322 pgxact->vacuumFlags = 0;
00323 proc->pid = 0;
00324 proc->backendId = InvalidBackendId;
00325 proc->databaseId = databaseid;
00326 proc->roleId = owner;
00327 proc->lwWaiting = false;
00328 proc->lwWaitMode = 0;
00329 proc->lwWaitLink = NULL;
00330 proc->waitLock = NULL;
00331 proc->waitProcLock = NULL;
00332 for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
00333 SHMQueueInit(&(proc->myProcLocks[i]));
00334
00335 pgxact->overflowed = false;
00336 pgxact->nxids = 0;
00337
00338 gxact->prepared_at = prepared_at;
00339
00340 gxact->prepare_lsn = 0;
00341 gxact->owner = owner;
00342 gxact->locking_xid = xid;
00343 gxact->valid = false;
00344 strcpy(gxact->gid, gid);
00345
00346
00347 Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
00348 TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
00349
00350 LWLockRelease(TwoPhaseStateLock);
00351
00352 return gxact;
00353 }
00354
00355
00356
00357
00358
00359
00360
00361
00362 static void
00363 GXactLoadSubxactData(GlobalTransaction gxact, int nsubxacts,
00364 TransactionId *children)
00365 {
00366 PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno];
00367 PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
00368
00369
00370 if (nsubxacts > PGPROC_MAX_CACHED_SUBXIDS)
00371 {
00372 pgxact->overflowed = true;
00373 nsubxacts = PGPROC_MAX_CACHED_SUBXIDS;
00374 }
00375 if (nsubxacts > 0)
00376 {
00377 memcpy(proc->subxids.xids, children,
00378 nsubxacts * sizeof(TransactionId));
00379 pgxact->nxids = nsubxacts;
00380 }
00381 }
00382
00383
00384
00385
00386
00387 static void
00388 MarkAsPrepared(GlobalTransaction gxact)
00389 {
00390
00391 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
00392 Assert(!gxact->valid);
00393 gxact->valid = true;
00394 LWLockRelease(TwoPhaseStateLock);
00395
00396
00397
00398
00399
00400 ProcArrayAdd(&ProcGlobal->allProcs[gxact->pgprocno]);
00401 }
00402
00403
00404
00405
00406
00407 static GlobalTransaction
00408 LockGXact(const char *gid, Oid user)
00409 {
00410 int i;
00411
00412 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
00413
00414 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
00415 {
00416 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
00417 PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno];
00418
00419
00420 if (!gxact->valid)
00421 continue;
00422 if (strcmp(gxact->gid, gid) != 0)
00423 continue;
00424
00425
00426 if (TransactionIdIsValid(gxact->locking_xid))
00427 {
00428 if (TransactionIdIsActive(gxact->locking_xid))
00429 ereport(ERROR,
00430 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
00431 errmsg("prepared transaction with identifier \"%s\" is busy",
00432 gid)));
00433 gxact->locking_xid = InvalidTransactionId;
00434 }
00435
00436 if (user != gxact->owner && !superuser_arg(user))
00437 ereport(ERROR,
00438 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
00439 errmsg("permission denied to finish prepared transaction"),
00440 errhint("Must be superuser or the user that prepared the transaction.")));
00441
00442
00443
00444
00445
00446
00447
00448 if (MyDatabaseId != proc->databaseId)
00449 ereport(ERROR,
00450 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
00451 errmsg("prepared transaction belongs to another database"),
00452 errhint("Connect to the database where the transaction was prepared to finish it.")));
00453
00454
00455 gxact->locking_xid = GetTopTransactionId();
00456
00457 LWLockRelease(TwoPhaseStateLock);
00458
00459 return gxact;
00460 }
00461
00462 LWLockRelease(TwoPhaseStateLock);
00463
00464 ereport(ERROR,
00465 (errcode(ERRCODE_UNDEFINED_OBJECT),
00466 errmsg("prepared transaction with identifier \"%s\" does not exist",
00467 gid)));
00468
00469
00470 return NULL;
00471 }
00472
00473
00474
00475
00476
00477
00478
00479 static void
00480 RemoveGXact(GlobalTransaction gxact)
00481 {
00482 int i;
00483
00484 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
00485
00486 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
00487 {
00488 if (gxact == TwoPhaseState->prepXacts[i])
00489 {
00490
00491 TwoPhaseState->numPrepXacts--;
00492 TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts];
00493
00494
00495 gxact->next = TwoPhaseState->freeGXacts;
00496 TwoPhaseState->freeGXacts = gxact;
00497
00498 LWLockRelease(TwoPhaseStateLock);
00499
00500 return;
00501 }
00502 }
00503
00504 LWLockRelease(TwoPhaseStateLock);
00505
00506 elog(ERROR, "failed to find %p in GlobalTransaction array", gxact);
00507 }
00508
00509
00510
00511
00512
00513
00514
00515
00516
00517
00518
00519 static bool
00520 TransactionIdIsPrepared(TransactionId xid)
00521 {
00522 bool result = false;
00523 int i;
00524
00525 LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
00526
00527 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
00528 {
00529 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
00530 PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
00531
00532 if (gxact->valid && pgxact->xid == xid)
00533 {
00534 result = true;
00535 break;
00536 }
00537 }
00538
00539 LWLockRelease(TwoPhaseStateLock);
00540
00541 return result;
00542 }
00543
00544
00545
00546
00547
00548
00549
00550
00551
00552
00553
00554
00555
00556 static int
00557 GetPreparedTransactionList(GlobalTransaction *gxacts)
00558 {
00559 GlobalTransaction array;
00560 int num;
00561 int i;
00562
00563 LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
00564
00565 if (TwoPhaseState->numPrepXacts == 0)
00566 {
00567 LWLockRelease(TwoPhaseStateLock);
00568
00569 *gxacts = NULL;
00570 return 0;
00571 }
00572
00573 num = TwoPhaseState->numPrepXacts;
00574 array = (GlobalTransaction) palloc(sizeof(GlobalTransactionData) * num);
00575 *gxacts = array;
00576 for (i = 0; i < num; i++)
00577 memcpy(array + i, TwoPhaseState->prepXacts[i],
00578 sizeof(GlobalTransactionData));
00579
00580 LWLockRelease(TwoPhaseStateLock);
00581
00582 return num;
00583 }
00584
00585
00586
00587 typedef struct
00588 {
00589 GlobalTransaction array;
00590 int ngxacts;
00591 int currIdx;
00592 } Working_State;
00593
00594
00595
00596
00597
00598
00599
00600
00601 Datum
00602 pg_prepared_xact(PG_FUNCTION_ARGS)
00603 {
00604 FuncCallContext *funcctx;
00605 Working_State *status;
00606
00607 if (SRF_IS_FIRSTCALL())
00608 {
00609 TupleDesc tupdesc;
00610 MemoryContext oldcontext;
00611
00612
00613 funcctx = SRF_FIRSTCALL_INIT();
00614
00615
00616
00617
00618 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
00619
00620
00621
00622 tupdesc = CreateTemplateTupleDesc(5, false);
00623 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "transaction",
00624 XIDOID, -1, 0);
00625 TupleDescInitEntry(tupdesc, (AttrNumber) 2, "gid",
00626 TEXTOID, -1, 0);
00627 TupleDescInitEntry(tupdesc, (AttrNumber) 3, "prepared",
00628 TIMESTAMPTZOID, -1, 0);
00629 TupleDescInitEntry(tupdesc, (AttrNumber) 4, "ownerid",
00630 OIDOID, -1, 0);
00631 TupleDescInitEntry(tupdesc, (AttrNumber) 5, "dbid",
00632 OIDOID, -1, 0);
00633
00634 funcctx->tuple_desc = BlessTupleDesc(tupdesc);
00635
00636
00637
00638
00639
00640 status = (Working_State *) palloc(sizeof(Working_State));
00641 funcctx->user_fctx = (void *) status;
00642
00643 status->ngxacts = GetPreparedTransactionList(&status->array);
00644 status->currIdx = 0;
00645
00646 MemoryContextSwitchTo(oldcontext);
00647 }
00648
00649 funcctx = SRF_PERCALL_SETUP();
00650 status = (Working_State *) funcctx->user_fctx;
00651
00652 while (status->array != NULL && status->currIdx < status->ngxacts)
00653 {
00654 GlobalTransaction gxact = &status->array[status->currIdx++];
00655 PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno];
00656 PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
00657 Datum values[5];
00658 bool nulls[5];
00659 HeapTuple tuple;
00660 Datum result;
00661
00662 if (!gxact->valid)
00663 continue;
00664
00665
00666
00667
00668 MemSet(values, 0, sizeof(values));
00669 MemSet(nulls, 0, sizeof(nulls));
00670
00671 values[0] = TransactionIdGetDatum(pgxact->xid);
00672 values[1] = CStringGetTextDatum(gxact->gid);
00673 values[2] = TimestampTzGetDatum(gxact->prepared_at);
00674 values[3] = ObjectIdGetDatum(gxact->owner);
00675 values[4] = ObjectIdGetDatum(proc->databaseId);
00676
00677 tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
00678 result = HeapTupleGetDatum(tuple);
00679 SRF_RETURN_NEXT(funcctx, result);
00680 }
00681
00682 SRF_RETURN_DONE(funcctx);
00683 }
00684
00685
00686
00687
00688
00689
00690 static GlobalTransaction
00691 TwoPhaseGetGXact(TransactionId xid)
00692 {
00693 GlobalTransaction result = NULL;
00694 int i;
00695
00696 static TransactionId cached_xid = InvalidTransactionId;
00697 static GlobalTransaction cached_gxact = NULL;
00698
00699
00700
00701
00702
00703 if (xid == cached_xid)
00704 return cached_gxact;
00705
00706 LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
00707
00708 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
00709 {
00710 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
00711 PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
00712
00713 if (pgxact->xid == xid)
00714 {
00715 result = gxact;
00716 break;
00717 }
00718 }
00719
00720 LWLockRelease(TwoPhaseStateLock);
00721
00722 if (result == NULL)
00723 elog(ERROR, "failed to find GlobalTransaction for xid %u", xid);
00724
00725 cached_xid = xid;
00726 cached_gxact = result;
00727
00728 return result;
00729 }
00730
00731
00732
00733
00734
00735
00736
00737
00738
00739 BackendId
00740 TwoPhaseGetDummyBackendId(TransactionId xid)
00741 {
00742 GlobalTransaction gxact = TwoPhaseGetGXact(xid);
00743
00744 return gxact->dummyBackendId;
00745 }
00746
00747
00748
00749
00750
00751 PGPROC *
00752 TwoPhaseGetDummyProc(TransactionId xid)
00753 {
00754 GlobalTransaction gxact = TwoPhaseGetGXact(xid);
00755
00756 return &ProcGlobal->allProcs[gxact->pgprocno];
00757 }
00758
00759
00760
00761
00762
/*
 * Build the state file name for "xid" into "path", which must be a buffer
 * of at least MAXPGPATH bytes: TWOPHASE_DIR, a slash, and the xid printed
 * as eight upper-case hex digits.
 */
#define TwoPhaseFilePath(path, xid) \
    snprintf(path, MAXPGPATH, TWOPHASE_DIR "/%08X", xid)
00765
00766
00767
00768
00769
00770
00771
00772
00773
00774
00775
00776
00777
00778
00779
00780
00781
00782
00783
00784
00785 #define TWOPHASE_MAGIC 0x57F94532
00786
00787 typedef struct TwoPhaseFileHeader
00788 {
00789 uint32 magic;
00790 uint32 total_len;
00791 TransactionId xid;
00792 Oid database;
00793 TimestampTz prepared_at;
00794 Oid owner;
00795 int32 nsubxacts;
00796 int32 ncommitrels;
00797 int32 nabortrels;
00798 int32 ninvalmsgs;
00799 bool initfileinval;
00800 char gid[GIDSIZE];
00801 } TwoPhaseFileHeader;
00802
00803
00804
00805
00806
00807
00808
00809 typedef struct TwoPhaseRecordOnDisk
00810 {
00811 uint32 len;
00812 TwoPhaseRmgrId rmid;
00813 uint16 info;
00814 } TwoPhaseRecordOnDisk;
00815
00816
00817
00818
00819
00820
00821
00822 static struct xllist
00823 {
00824 XLogRecData *head;
00825 XLogRecData *tail;
00826 uint32 bytes_free;
00827 uint32 total_len;
00828 } records;
00829
00830
00831
00832
00833
00834
00835
00836
00837
00838
00839 static void
00840 save_state_data(const void *data, uint32 len)
00841 {
00842 uint32 padlen = MAXALIGN(len);
00843
00844 if (padlen > records.bytes_free)
00845 {
00846 records.tail->next = palloc0(sizeof(XLogRecData));
00847 records.tail = records.tail->next;
00848 records.tail->buffer = InvalidBuffer;
00849 records.tail->len = 0;
00850 records.tail->next = NULL;
00851
00852 records.bytes_free = Max(padlen, 512);
00853 records.tail->data = palloc(records.bytes_free);
00854 }
00855
00856 memcpy(((char *) records.tail->data) + records.tail->len, data, len);
00857 records.tail->len += padlen;
00858 records.bytes_free -= padlen;
00859 records.total_len += padlen;
00860 }
00861
00862
00863
00864
00865
00866
00867 void
00868 StartPrepare(GlobalTransaction gxact)
00869 {
00870 PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno];
00871 PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
00872 TransactionId xid = pgxact->xid;
00873 TwoPhaseFileHeader hdr;
00874 TransactionId *children;
00875 RelFileNode *commitrels;
00876 RelFileNode *abortrels;
00877 SharedInvalidationMessage *invalmsgs;
00878
00879
00880 records.head = palloc0(sizeof(XLogRecData));
00881 records.head->buffer = InvalidBuffer;
00882 records.head->len = 0;
00883 records.head->next = NULL;
00884
00885 records.bytes_free = Max(sizeof(TwoPhaseFileHeader), 512);
00886 records.head->data = palloc(records.bytes_free);
00887
00888 records.tail = records.head;
00889
00890 records.total_len = 0;
00891
00892
00893 hdr.magic = TWOPHASE_MAGIC;
00894 hdr.total_len = 0;
00895 hdr.xid = xid;
00896 hdr.database = proc->databaseId;
00897 hdr.prepared_at = gxact->prepared_at;
00898 hdr.owner = gxact->owner;
00899 hdr.nsubxacts = xactGetCommittedChildren(&children);
00900 hdr.ncommitrels = smgrGetPendingDeletes(true, &commitrels);
00901 hdr.nabortrels = smgrGetPendingDeletes(false, &abortrels);
00902 hdr.ninvalmsgs = xactGetCommittedInvalidationMessages(&invalmsgs,
00903 &hdr.initfileinval);
00904 StrNCpy(hdr.gid, gxact->gid, GIDSIZE);
00905
00906 save_state_data(&hdr, sizeof(TwoPhaseFileHeader));
00907
00908
00909
00910
00911
00912 if (hdr.nsubxacts > 0)
00913 {
00914 save_state_data(children, hdr.nsubxacts * sizeof(TransactionId));
00915
00916 GXactLoadSubxactData(gxact, hdr.nsubxacts, children);
00917 }
00918 if (hdr.ncommitrels > 0)
00919 {
00920 save_state_data(commitrels, hdr.ncommitrels * sizeof(RelFileNode));
00921 pfree(commitrels);
00922 }
00923 if (hdr.nabortrels > 0)
00924 {
00925 save_state_data(abortrels, hdr.nabortrels * sizeof(RelFileNode));
00926 pfree(abortrels);
00927 }
00928 if (hdr.ninvalmsgs > 0)
00929 {
00930 save_state_data(invalmsgs,
00931 hdr.ninvalmsgs * sizeof(SharedInvalidationMessage));
00932 pfree(invalmsgs);
00933 }
00934 }
00935
00936
00937
00938
00939
00940
00941 void
00942 EndPrepare(GlobalTransaction gxact)
00943 {
00944 PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
00945 TransactionId xid = pgxact->xid;
00946 TwoPhaseFileHeader *hdr;
00947 char path[MAXPGPATH];
00948 XLogRecData *record;
00949 pg_crc32 statefile_crc;
00950 pg_crc32 bogus_crc;
00951 int fd;
00952
00953
00954 RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0,
00955 NULL, 0);
00956
00957
00958 hdr = (TwoPhaseFileHeader *) records.head->data;
00959 Assert(hdr->magic == TWOPHASE_MAGIC);
00960 hdr->total_len = records.total_len + sizeof(pg_crc32);
00961
00962
00963
00964
00965
00966 if (hdr->total_len > MaxAllocSize)
00967 ereport(ERROR,
00968 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
00969 errmsg("two-phase state file maximum length exceeded")));
00970
00971
00972
00973
00974 TwoPhaseFilePath(path, xid);
00975
00976 fd = OpenTransientFile(path,
00977 O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
00978 S_IRUSR | S_IWUSR);
00979 if (fd < 0)
00980 ereport(ERROR,
00981 (errcode_for_file_access(),
00982 errmsg("could not create two-phase state file \"%s\": %m",
00983 path)));
00984
00985
00986 INIT_CRC32(statefile_crc);
00987
00988 for (record = records.head; record != NULL; record = record->next)
00989 {
00990 COMP_CRC32(statefile_crc, record->data, record->len);
00991 if ((write(fd, record->data, record->len)) != record->len)
00992 {
00993 CloseTransientFile(fd);
00994 ereport(ERROR,
00995 (errcode_for_file_access(),
00996 errmsg("could not write two-phase state file: %m")));
00997 }
00998 }
00999
01000 FIN_CRC32(statefile_crc);
01001
01002
01003
01004
01005
01006 bogus_crc = ~statefile_crc;
01007
01008 if ((write(fd, &bogus_crc, sizeof(pg_crc32))) != sizeof(pg_crc32))
01009 {
01010 CloseTransientFile(fd);
01011 ereport(ERROR,
01012 (errcode_for_file_access(),
01013 errmsg("could not write two-phase state file: %m")));
01014 }
01015
01016
01017 if (lseek(fd, -((off_t) sizeof(pg_crc32)), SEEK_CUR) < 0)
01018 {
01019 CloseTransientFile(fd);
01020 ereport(ERROR,
01021 (errcode_for_file_access(),
01022 errmsg("could not seek in two-phase state file: %m")));
01023 }
01024
01025
01026
01027
01028
01029
01030
01031
01032
01033
01034
01035
01036
01037
01038
01039
01040
01041
01042
01043
01044
01045
01046 START_CRIT_SECTION();
01047
01048 MyPgXact->delayChkpt = true;
01049
01050 gxact->prepare_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE,
01051 records.head);
01052 XLogFlush(gxact->prepare_lsn);
01053
01054
01055
01056
01057 if ((write(fd, &statefile_crc, sizeof(pg_crc32))) != sizeof(pg_crc32))
01058 {
01059 CloseTransientFile(fd);
01060 ereport(ERROR,
01061 (errcode_for_file_access(),
01062 errmsg("could not write two-phase state file: %m")));
01063 }
01064
01065 if (CloseTransientFile(fd) != 0)
01066 ereport(ERROR,
01067 (errcode_for_file_access(),
01068 errmsg("could not close two-phase state file: %m")));
01069
01070
01071
01072
01073
01074
01075
01076
01077
01078
01079
01080
01081
01082 MarkAsPrepared(gxact);
01083
01084
01085
01086
01087
01088
01089 MyPgXact->delayChkpt = false;
01090
01091 END_CRIT_SECTION();
01092
01093
01094
01095
01096
01097
01098
01099 SyncRepWaitForLSN(gxact->prepare_lsn);
01100
01101 records.tail = records.head = NULL;
01102 }
01103
01104
01105
01106
01107 void
01108 RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info,
01109 const void *data, uint32 len)
01110 {
01111 TwoPhaseRecordOnDisk record;
01112
01113 record.rmid = rmid;
01114 record.info = info;
01115 record.len = len;
01116 save_state_data(&record, sizeof(TwoPhaseRecordOnDisk));
01117 if (len > 0)
01118 save_state_data(data, len);
01119 }
01120
01121
01122
01123
01124
01125
01126
01127
01128 static char *
01129 ReadTwoPhaseFile(TransactionId xid, bool give_warnings)
01130 {
01131 char path[MAXPGPATH];
01132 char *buf;
01133 TwoPhaseFileHeader *hdr;
01134 int fd;
01135 struct stat stat;
01136 uint32 crc_offset;
01137 pg_crc32 calc_crc,
01138 file_crc;
01139
01140 TwoPhaseFilePath(path, xid);
01141
01142 fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
01143 if (fd < 0)
01144 {
01145 if (give_warnings)
01146 ereport(WARNING,
01147 (errcode_for_file_access(),
01148 errmsg("could not open two-phase state file \"%s\": %m",
01149 path)));
01150 return NULL;
01151 }
01152
01153
01154
01155
01156
01157
01158
01159 if (fstat(fd, &stat))
01160 {
01161 CloseTransientFile(fd);
01162 if (give_warnings)
01163 ereport(WARNING,
01164 (errcode_for_file_access(),
01165 errmsg("could not stat two-phase state file \"%s\": %m",
01166 path)));
01167 return NULL;
01168 }
01169
01170 if (stat.st_size < (MAXALIGN(sizeof(TwoPhaseFileHeader)) +
01171 MAXALIGN(sizeof(TwoPhaseRecordOnDisk)) +
01172 sizeof(pg_crc32)) ||
01173 stat.st_size > MaxAllocSize)
01174 {
01175 CloseTransientFile(fd);
01176 return NULL;
01177 }
01178
01179 crc_offset = stat.st_size - sizeof(pg_crc32);
01180 if (crc_offset != MAXALIGN(crc_offset))
01181 {
01182 CloseTransientFile(fd);
01183 return NULL;
01184 }
01185
01186
01187
01188
01189 buf = (char *) palloc(stat.st_size);
01190
01191 if (read(fd, buf, stat.st_size) != stat.st_size)
01192 {
01193 CloseTransientFile(fd);
01194 if (give_warnings)
01195 ereport(WARNING,
01196 (errcode_for_file_access(),
01197 errmsg("could not read two-phase state file \"%s\": %m",
01198 path)));
01199 pfree(buf);
01200 return NULL;
01201 }
01202
01203 CloseTransientFile(fd);
01204
01205 hdr = (TwoPhaseFileHeader *) buf;
01206 if (hdr->magic != TWOPHASE_MAGIC || hdr->total_len != stat.st_size)
01207 {
01208 pfree(buf);
01209 return NULL;
01210 }
01211
01212 INIT_CRC32(calc_crc);
01213 COMP_CRC32(calc_crc, buf, crc_offset);
01214 FIN_CRC32(calc_crc);
01215
01216 file_crc = *((pg_crc32 *) (buf + crc_offset));
01217
01218 if (!EQ_CRC32(calc_crc, file_crc))
01219 {
01220 pfree(buf);
01221 return NULL;
01222 }
01223
01224 return buf;
01225 }
01226
01227
01228
01229
01230 bool
01231 StandbyTransactionIdIsPrepared(TransactionId xid)
01232 {
01233 char *buf;
01234 TwoPhaseFileHeader *hdr;
01235 bool result;
01236
01237 Assert(TransactionIdIsValid(xid));
01238
01239 if (max_prepared_xacts <= 0)
01240 return false;
01241
01242
01243 buf = ReadTwoPhaseFile(xid, false);
01244 if (buf == NULL)
01245 return false;
01246
01247
01248 hdr = (TwoPhaseFileHeader *) buf;
01249 result = TransactionIdEquals(hdr->xid, xid);
01250 pfree(buf);
01251
01252 return result;
01253 }
01254
01255
01256
01257
01258 void
01259 FinishPreparedTransaction(const char *gid, bool isCommit)
01260 {
01261 GlobalTransaction gxact;
01262 PGPROC *proc;
01263 PGXACT *pgxact;
01264 TransactionId xid;
01265 char *buf;
01266 char *bufptr;
01267 TwoPhaseFileHeader *hdr;
01268 TransactionId latestXid;
01269 TransactionId *children;
01270 RelFileNode *commitrels;
01271 RelFileNode *abortrels;
01272 RelFileNode *delrels;
01273 int ndelrels;
01274 SharedInvalidationMessage *invalmsgs;
01275 int i;
01276
01277
01278
01279
01280
01281 gxact = LockGXact(gid, GetUserId());
01282 proc = &ProcGlobal->allProcs[gxact->pgprocno];
01283 pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
01284 xid = pgxact->xid;
01285
01286
01287
01288
01289 buf = ReadTwoPhaseFile(xid, true);
01290 if (buf == NULL)
01291 ereport(ERROR,
01292 (errcode(ERRCODE_DATA_CORRUPTED),
01293 errmsg("two-phase state file for transaction %u is corrupt",
01294 xid)));
01295
01296
01297
01298
01299 hdr = (TwoPhaseFileHeader *) buf;
01300 Assert(TransactionIdEquals(hdr->xid, xid));
01301 bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
01302 children = (TransactionId *) bufptr;
01303 bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
01304 commitrels = (RelFileNode *) bufptr;
01305 bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
01306 abortrels = (RelFileNode *) bufptr;
01307 bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
01308 invalmsgs = (SharedInvalidationMessage *) bufptr;
01309 bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
01310
01311
01312 latestXid = TransactionIdLatest(xid, hdr->nsubxacts, children);
01313
01314
01315
01316
01317
01318
01319
01320
01321
01322 if (isCommit)
01323 RecordTransactionCommitPrepared(xid,
01324 hdr->nsubxacts, children,
01325 hdr->ncommitrels, commitrels,
01326 hdr->ninvalmsgs, invalmsgs,
01327 hdr->initfileinval);
01328 else
01329 RecordTransactionAbortPrepared(xid,
01330 hdr->nsubxacts, children,
01331 hdr->nabortrels, abortrels);
01332
01333 ProcArrayRemove(proc, latestXid);
01334
01335
01336
01337
01338
01339
01340
01341
01342 gxact->valid = false;
01343
01344
01345
01346
01347
01348
01349
01350
01351 if (isCommit)
01352 {
01353 delrels = commitrels;
01354 ndelrels = hdr->ncommitrels;
01355 }
01356 else
01357 {
01358 delrels = abortrels;
01359 ndelrels = hdr->nabortrels;
01360 }
01361 for (i = 0; i < ndelrels; i++)
01362 {
01363 SMgrRelation srel = smgropen(delrels[i], InvalidBackendId);
01364
01365 smgrdounlink(srel, false);
01366 smgrclose(srel);
01367 }
01368
01369
01370
01371
01372
01373
01374
01375 if (hdr->initfileinval)
01376 RelationCacheInitFilePreInvalidate();
01377 SendSharedInvalidMessages(invalmsgs, hdr->ninvalmsgs);
01378 if (hdr->initfileinval)
01379 RelationCacheInitFilePostInvalidate();
01380
01381
01382 if (isCommit)
01383 ProcessRecords(bufptr, xid, twophase_postcommit_callbacks);
01384 else
01385 ProcessRecords(bufptr, xid, twophase_postabort_callbacks);
01386
01387 PredicateLockTwoPhaseFinish(xid, isCommit);
01388
01389
01390 AtEOXact_PgStat(isCommit);
01391
01392
01393
01394
01395 RemoveTwoPhaseFile(xid, true);
01396
01397 RemoveGXact(gxact);
01398
01399 pfree(buf);
01400 }
01401
01402
01403
01404
01405
01406 static void
01407 ProcessRecords(char *bufptr, TransactionId xid,
01408 const TwoPhaseCallback callbacks[])
01409 {
01410 for (;;)
01411 {
01412 TwoPhaseRecordOnDisk *record = (TwoPhaseRecordOnDisk *) bufptr;
01413
01414 Assert(record->rmid <= TWOPHASE_RM_MAX_ID);
01415 if (record->rmid == TWOPHASE_RM_END_ID)
01416 break;
01417
01418 bufptr += MAXALIGN(sizeof(TwoPhaseRecordOnDisk));
01419
01420 if (callbacks[record->rmid] != NULL)
01421 callbacks[record->rmid] (xid, record->info,
01422 (void *) bufptr, record->len);
01423
01424 bufptr += MAXALIGN(record->len);
01425 }
01426 }
01427
01428
01429
01430
01431
01432
01433
01434 void
01435 RemoveTwoPhaseFile(TransactionId xid, bool giveWarning)
01436 {
01437 char path[MAXPGPATH];
01438
01439 TwoPhaseFilePath(path, xid);
01440 if (unlink(path))
01441 if (errno != ENOENT || giveWarning)
01442 ereport(WARNING,
01443 (errcode_for_file_access(),
01444 errmsg("could not remove two-phase state file \"%s\": %m",
01445 path)));
01446 }
01447
01448
01449
01450
01451
01452
01453 void
01454 RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
01455 {
01456 char path[MAXPGPATH];
01457 pg_crc32 statefile_crc;
01458 int fd;
01459
01460
01461 INIT_CRC32(statefile_crc);
01462 COMP_CRC32(statefile_crc, content, len);
01463 FIN_CRC32(statefile_crc);
01464
01465 TwoPhaseFilePath(path, xid);
01466
01467 fd = OpenTransientFile(path,
01468 O_CREAT | O_TRUNC | O_WRONLY | PG_BINARY,
01469 S_IRUSR | S_IWUSR);
01470 if (fd < 0)
01471 ereport(ERROR,
01472 (errcode_for_file_access(),
01473 errmsg("could not recreate two-phase state file \"%s\": %m",
01474 path)));
01475
01476
01477 if (write(fd, content, len) != len)
01478 {
01479 CloseTransientFile(fd);
01480 ereport(ERROR,
01481 (errcode_for_file_access(),
01482 errmsg("could not write two-phase state file: %m")));
01483 }
01484 if (write(fd, &statefile_crc, sizeof(pg_crc32)) != sizeof(pg_crc32))
01485 {
01486 CloseTransientFile(fd);
01487 ereport(ERROR,
01488 (errcode_for_file_access(),
01489 errmsg("could not write two-phase state file: %m")));
01490 }
01491
01492
01493
01494
01495
01496 if (pg_fsync(fd) != 0)
01497 {
01498 CloseTransientFile(fd);
01499 ereport(ERROR,
01500 (errcode_for_file_access(),
01501 errmsg("could not fsync two-phase state file: %m")));
01502 }
01503
01504 if (CloseTransientFile(fd) != 0)
01505 ereport(ERROR,
01506 (errcode_for_file_access(),
01507 errmsg("could not close two-phase state file: %m")));
01508 }
01509
01510
01511
01512
01513
01514
01515
01516
01517
01518
01519
01520
01521
01522
01523
01524
01525
01526
01527 void
01528 CheckPointTwoPhase(XLogRecPtr redo_horizon)
01529 {
01530 TransactionId *xids;
01531 int nxids;
01532 char path[MAXPGPATH];
01533 int i;
01534
01535
01536
01537
01538
01539
01540
01541
01542
01543
01544
01545
01546 if (max_prepared_xacts <= 0)
01547 return;
01548
01549 TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_START();
01550
01551 xids = (TransactionId *) palloc(max_prepared_xacts * sizeof(TransactionId));
01552 nxids = 0;
01553
01554 LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
01555
01556 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
01557 {
01558 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
01559 PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
01560
01561 if (gxact->valid &&
01562 gxact->prepare_lsn <= redo_horizon)
01563 xids[nxids++] = pgxact->xid;
01564 }
01565
01566 LWLockRelease(TwoPhaseStateLock);
01567
01568 for (i = 0; i < nxids; i++)
01569 {
01570 TransactionId xid = xids[i];
01571 int fd;
01572
01573 TwoPhaseFilePath(path, xid);
01574
01575 fd = OpenTransientFile(path, O_RDWR | PG_BINARY, 0);
01576 if (fd < 0)
01577 {
01578 if (errno == ENOENT)
01579 {
01580
01581 if (!TransactionIdIsPrepared(xid))
01582 continue;
01583
01584 errno = ENOENT;
01585 }
01586 ereport(ERROR,
01587 (errcode_for_file_access(),
01588 errmsg("could not open two-phase state file \"%s\": %m",
01589 path)));
01590 }
01591
01592 if (pg_fsync(fd) != 0)
01593 {
01594 CloseTransientFile(fd);
01595 ereport(ERROR,
01596 (errcode_for_file_access(),
01597 errmsg("could not fsync two-phase state file \"%s\": %m",
01598 path)));
01599 }
01600
01601 if (CloseTransientFile(fd) != 0)
01602 ereport(ERROR,
01603 (errcode_for_file_access(),
01604 errmsg("could not close two-phase state file \"%s\": %m",
01605 path)));
01606 }
01607
01608 pfree(xids);
01609
01610 TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_DONE();
01611 }
01612
01613
01614
01615
01616
01617
01618
01619
01620
01621
01622
01623
01624
01625
01626
01627
01628
01629
01630
01631
01632
01633
01634
01635
01636
01637
01638
01639
01640 TransactionId
01641 PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
01642 {
01643 TransactionId origNextXid = ShmemVariableCache->nextXid;
01644 TransactionId result = origNextXid;
01645 DIR *cldir;
01646 struct dirent *clde;
01647 TransactionId *xids = NULL;
01648 int nxids = 0;
01649 int allocsize = 0;
01650
01651 cldir = AllocateDir(TWOPHASE_DIR);
01652 while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
01653 {
01654 if (strlen(clde->d_name) == 8 &&
01655 strspn(clde->d_name, "0123456789ABCDEF") == 8)
01656 {
01657 TransactionId xid;
01658 char *buf;
01659 TwoPhaseFileHeader *hdr;
01660 TransactionId *subxids;
01661 int i;
01662
01663 xid = (TransactionId) strtoul(clde->d_name, NULL, 16);
01664
01665
01666 if (TransactionIdFollowsOrEquals(xid, origNextXid))
01667 {
01668 ereport(WARNING,
01669 (errmsg("removing future two-phase state file \"%s\"",
01670 clde->d_name)));
01671 RemoveTwoPhaseFile(xid, true);
01672 continue;
01673 }
01674
01675
01676
01677
01678
01679
01680
01681 buf = ReadTwoPhaseFile(xid, true);
01682 if (buf == NULL)
01683 {
01684 ereport(WARNING,
01685 (errmsg("removing corrupt two-phase state file \"%s\"",
01686 clde->d_name)));
01687 RemoveTwoPhaseFile(xid, true);
01688 continue;
01689 }
01690
01691
01692 hdr = (TwoPhaseFileHeader *) buf;
01693 if (!TransactionIdEquals(hdr->xid, xid))
01694 {
01695 ereport(WARNING,
01696 (errmsg("removing corrupt two-phase state file \"%s\"",
01697 clde->d_name)));
01698 RemoveTwoPhaseFile(xid, true);
01699 pfree(buf);
01700 continue;
01701 }
01702
01703
01704
01705
01706
01707 if (TransactionIdPrecedes(xid, result))
01708 result = xid;
01709
01710
01711
01712
01713
01714
01715
01716
01717
01718 subxids = (TransactionId *)
01719 (buf + MAXALIGN(sizeof(TwoPhaseFileHeader)));
01720 for (i = 0; i < hdr->nsubxacts; i++)
01721 {
01722 TransactionId subxid = subxids[i];
01723
01724 Assert(TransactionIdFollows(subxid, xid));
01725 if (TransactionIdFollowsOrEquals(subxid,
01726 ShmemVariableCache->nextXid))
01727 {
01728 LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
01729 ShmemVariableCache->nextXid = subxid;
01730 TransactionIdAdvance(ShmemVariableCache->nextXid);
01731 LWLockRelease(XidGenLock);
01732 }
01733 }
01734
01735
01736 if (xids_p)
01737 {
01738 if (nxids == allocsize)
01739 {
01740 if (nxids == 0)
01741 {
01742 allocsize = 10;
01743 xids = palloc(allocsize * sizeof(TransactionId));
01744 }
01745 else
01746 {
01747 allocsize = allocsize * 2;
01748 xids = repalloc(xids, allocsize * sizeof(TransactionId));
01749 }
01750 }
01751 xids[nxids++] = xid;
01752 }
01753
01754 pfree(buf);
01755 }
01756 }
01757 FreeDir(cldir);
01758
01759 if (xids_p)
01760 {
01761 *xids_p = xids;
01762 *nxids_p = nxids;
01763 }
01764
01765 return result;
01766 }
01767
01768
01769
01770
01771
01772
01773
01774
01775
01776
01777
01778
01779
01780 void
01781 StandbyRecoverPreparedTransactions(bool overwriteOK)
01782 {
01783 DIR *cldir;
01784 struct dirent *clde;
01785
01786 cldir = AllocateDir(TWOPHASE_DIR);
01787 while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
01788 {
01789 if (strlen(clde->d_name) == 8 &&
01790 strspn(clde->d_name, "0123456789ABCDEF") == 8)
01791 {
01792 TransactionId xid;
01793 char *buf;
01794 TwoPhaseFileHeader *hdr;
01795 TransactionId *subxids;
01796 int i;
01797
01798 xid = (TransactionId) strtoul(clde->d_name, NULL, 16);
01799
01800
01801 if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
01802 {
01803 ereport(WARNING,
01804 (errmsg("removing stale two-phase state file \"%s\"",
01805 clde->d_name)));
01806 RemoveTwoPhaseFile(xid, true);
01807 continue;
01808 }
01809
01810
01811 buf = ReadTwoPhaseFile(xid, true);
01812 if (buf == NULL)
01813 {
01814 ereport(WARNING,
01815 (errmsg("removing corrupt two-phase state file \"%s\"",
01816 clde->d_name)));
01817 RemoveTwoPhaseFile(xid, true);
01818 continue;
01819 }
01820
01821
01822 hdr = (TwoPhaseFileHeader *) buf;
01823 if (!TransactionIdEquals(hdr->xid, xid))
01824 {
01825 ereport(WARNING,
01826 (errmsg("removing corrupt two-phase state file \"%s\"",
01827 clde->d_name)));
01828 RemoveTwoPhaseFile(xid, true);
01829 pfree(buf);
01830 continue;
01831 }
01832
01833
01834
01835
01836
01837 subxids = (TransactionId *)
01838 (buf + MAXALIGN(sizeof(TwoPhaseFileHeader)));
01839 for (i = 0; i < hdr->nsubxacts; i++)
01840 {
01841 TransactionId subxid = subxids[i];
01842
01843 Assert(TransactionIdFollows(subxid, xid));
01844 SubTransSetParent(xid, subxid, overwriteOK);
01845 }
01846 }
01847 }
01848 FreeDir(cldir);
01849 }
01850
01851
01852
01853
01854
01855
01856
01857
01858 void
01859 RecoverPreparedTransactions(void)
01860 {
01861 char dir[MAXPGPATH];
01862 DIR *cldir;
01863 struct dirent *clde;
01864 bool overwriteOK = false;
01865
01866 snprintf(dir, MAXPGPATH, "%s", TWOPHASE_DIR);
01867
01868 cldir = AllocateDir(dir);
01869 while ((clde = ReadDir(cldir, dir)) != NULL)
01870 {
01871 if (strlen(clde->d_name) == 8 &&
01872 strspn(clde->d_name, "0123456789ABCDEF") == 8)
01873 {
01874 TransactionId xid;
01875 char *buf;
01876 char *bufptr;
01877 TwoPhaseFileHeader *hdr;
01878 TransactionId *subxids;
01879 GlobalTransaction gxact;
01880 int i;
01881
01882 xid = (TransactionId) strtoul(clde->d_name, NULL, 16);
01883
01884
01885 if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
01886 {
01887 ereport(WARNING,
01888 (errmsg("removing stale two-phase state file \"%s\"",
01889 clde->d_name)));
01890 RemoveTwoPhaseFile(xid, true);
01891 continue;
01892 }
01893
01894
01895 buf = ReadTwoPhaseFile(xid, true);
01896 if (buf == NULL)
01897 {
01898 ereport(WARNING,
01899 (errmsg("removing corrupt two-phase state file \"%s\"",
01900 clde->d_name)));
01901 RemoveTwoPhaseFile(xid, true);
01902 continue;
01903 }
01904
01905 ereport(LOG,
01906 (errmsg("recovering prepared transaction %u", xid)));
01907
01908
01909 hdr = (TwoPhaseFileHeader *) buf;
01910 Assert(TransactionIdEquals(hdr->xid, xid));
01911 bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
01912 subxids = (TransactionId *) bufptr;
01913 bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
01914 bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
01915 bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
01916 bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
01917
01918
01919
01920
01921
01922
01923 if (InHotStandby && hdr->nsubxacts >= PGPROC_MAX_CACHED_SUBXIDS)
01924 overwriteOK = true;
01925
01926
01927
01928
01929
01930
01931
01932
01933 for (i = 0; i < hdr->nsubxacts; i++)
01934 SubTransSetParent(subxids[i], xid, overwriteOK);
01935
01936
01937
01938
01939
01940
01941
01942
01943
01944
01945
01946 gxact = MarkAsPreparing(xid, hdr->gid,
01947 hdr->prepared_at,
01948 hdr->owner, hdr->database);
01949 GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids);
01950 MarkAsPrepared(gxact);
01951
01952
01953
01954
01955 ProcessRecords(bufptr, xid, twophase_recover_callbacks);
01956
01957
01958
01959
01960
01961
01962 if (InHotStandby)
01963 StandbyReleaseLockTree(xid, hdr->nsubxacts, subxids);
01964
01965 pfree(buf);
01966 }
01967 }
01968 FreeDir(cldir);
01969 }
01970
01971
01972
01973
01974
01975
01976
01977
01978
01979
01980 static void
01981 RecordTransactionCommitPrepared(TransactionId xid,
01982 int nchildren,
01983 TransactionId *children,
01984 int nrels,
01985 RelFileNode *rels,
01986 int ninvalmsgs,
01987 SharedInvalidationMessage *invalmsgs,
01988 bool initfileinval)
01989 {
01990 XLogRecData rdata[4];
01991 int lastrdata = 0;
01992 xl_xact_commit_prepared xlrec;
01993 XLogRecPtr recptr;
01994
01995 START_CRIT_SECTION();
01996
01997
01998 MyPgXact->delayChkpt = true;
01999
02000
02001 xlrec.xid = xid;
02002 xlrec.crec.xact_time = GetCurrentTimestamp();
02003 xlrec.crec.xinfo = initfileinval ? XACT_COMPLETION_UPDATE_RELCACHE_FILE : 0;
02004 xlrec.crec.nmsgs = 0;
02005 xlrec.crec.nrels = nrels;
02006 xlrec.crec.nsubxacts = nchildren;
02007 xlrec.crec.nmsgs = ninvalmsgs;
02008
02009 rdata[0].data = (char *) (&xlrec);
02010 rdata[0].len = MinSizeOfXactCommitPrepared;
02011 rdata[0].buffer = InvalidBuffer;
02012
02013 if (nrels > 0)
02014 {
02015 rdata[0].next = &(rdata[1]);
02016 rdata[1].data = (char *) rels;
02017 rdata[1].len = nrels * sizeof(RelFileNode);
02018 rdata[1].buffer = InvalidBuffer;
02019 lastrdata = 1;
02020 }
02021
02022 if (nchildren > 0)
02023 {
02024 rdata[lastrdata].next = &(rdata[2]);
02025 rdata[2].data = (char *) children;
02026 rdata[2].len = nchildren * sizeof(TransactionId);
02027 rdata[2].buffer = InvalidBuffer;
02028 lastrdata = 2;
02029 }
02030
02031 if (ninvalmsgs > 0)
02032 {
02033 rdata[lastrdata].next = &(rdata[3]);
02034 rdata[3].data = (char *) invalmsgs;
02035 rdata[3].len = ninvalmsgs * sizeof(SharedInvalidationMessage);
02036 rdata[3].buffer = InvalidBuffer;
02037 lastrdata = 3;
02038 }
02039 rdata[lastrdata].next = NULL;
02040
02041 recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT_PREPARED, rdata);
02042
02043
02044
02045
02046
02047
02048
02049
02050 XLogFlush(recptr);
02051
02052
02053 TransactionIdCommitTree(xid, nchildren, children);
02054
02055
02056 MyPgXact->delayChkpt = false;
02057
02058 END_CRIT_SECTION();
02059
02060
02061
02062
02063
02064
02065
02066 SyncRepWaitForLSN(recptr);
02067 }
02068
02069
02070
02071
02072
02073
02074
02075
02076
02077 static void
02078 RecordTransactionAbortPrepared(TransactionId xid,
02079 int nchildren,
02080 TransactionId *children,
02081 int nrels,
02082 RelFileNode *rels)
02083 {
02084 XLogRecData rdata[3];
02085 int lastrdata = 0;
02086 xl_xact_abort_prepared xlrec;
02087 XLogRecPtr recptr;
02088
02089
02090
02091
02092
02093 if (TransactionIdDidCommit(xid))
02094 elog(PANIC, "cannot abort transaction %u, it was already committed",
02095 xid);
02096
02097 START_CRIT_SECTION();
02098
02099
02100 xlrec.xid = xid;
02101 xlrec.arec.xact_time = GetCurrentTimestamp();
02102 xlrec.arec.nrels = nrels;
02103 xlrec.arec.nsubxacts = nchildren;
02104 rdata[0].data = (char *) (&xlrec);
02105 rdata[0].len = MinSizeOfXactAbortPrepared;
02106 rdata[0].buffer = InvalidBuffer;
02107
02108 if (nrels > 0)
02109 {
02110 rdata[0].next = &(rdata[1]);
02111 rdata[1].data = (char *) rels;
02112 rdata[1].len = nrels * sizeof(RelFileNode);
02113 rdata[1].buffer = InvalidBuffer;
02114 lastrdata = 1;
02115 }
02116
02117 if (nchildren > 0)
02118 {
02119 rdata[lastrdata].next = &(rdata[2]);
02120 rdata[2].data = (char *) children;
02121 rdata[2].len = nchildren * sizeof(TransactionId);
02122 rdata[2].buffer = InvalidBuffer;
02123 lastrdata = 2;
02124 }
02125 rdata[lastrdata].next = NULL;
02126
02127 recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_ABORT_PREPARED, rdata);
02128
02129
02130 XLogFlush(recptr);
02131
02132
02133
02134
02135
02136 TransactionIdAbortTree(xid, nchildren, children);
02137
02138 END_CRIT_SECTION();
02139
02140
02141
02142
02143
02144
02145
02146 SyncRepWaitForLSN(recptr);
02147 }