Header And Logo

PostgreSQL
| The world's most advanced open source database.

Typedefs | Functions | Variables

twophase.h File Reference

#include "access/xlogdefs.h"
#include "datatype/timestamp.h"
#include "storage/lock.h"
Include dependency graph for twophase.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Typedefs

typedef struct GlobalTransactionData * GlobalTransaction

Functions

Size TwoPhaseShmemSize (void)
void TwoPhaseShmemInit (void)
PGPROC * TwoPhaseGetDummyProc (TransactionId xid)
BackendId TwoPhaseGetDummyBackendId (TransactionId xid)
GlobalTransaction MarkAsPreparing (TransactionId xid, const char *gid, TimestampTz prepared_at, Oid owner, Oid databaseid)
void StartPrepare (GlobalTransaction gxact)
void EndPrepare (GlobalTransaction gxact)
bool StandbyTransactionIdIsPrepared (TransactionId xid)
TransactionId PrescanPreparedTransactions (TransactionId **xids_p, int *nxids_p)
void StandbyRecoverPreparedTransactions (bool overwriteOK)
void RecoverPreparedTransactions (void)
void RecreateTwoPhaseFile (TransactionId xid, void *content, int len)
void RemoveTwoPhaseFile (TransactionId xid, bool giveWarning)
void CheckPointTwoPhase (XLogRecPtr redo_horizon)
void FinishPreparedTransaction (const char *gid, bool isCommit)

Variables

int max_prepared_xacts

Typedef Documentation

typedef struct GlobalTransactionData * GlobalTransaction

Definition at line 25 of file twophase.h.


Function Documentation

void CheckPointTwoPhase ( XLogRecPtr  redo_horizon  ) 

Definition at line 1528 of file twophase.c.

References PROC_HDR::allPgXact, CloseTransientFile(), ereport, errcode_for_file_access(), errmsg(), ERROR, i, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_prepared_xacts, TwoPhaseStateData::numPrepXacts, OpenTransientFile(), palloc(), pfree(), PG_BINARY, pg_fsync(), GlobalTransactionData::pgprocno, GlobalTransactionData::prepare_lsn, TwoPhaseStateData::prepXacts, ProcGlobal, TransactionIdIsPrepared(), TwoPhaseFilePath, TwoPhaseStateLock, GlobalTransactionData::valid, and PGXACT::xid.

Referenced by CheckPointGuts().

{
    /*
     * Fsync the state files of all valid prepared transactions whose
     * prepare_lsn is at or before redo_horizon.
     *
     * Does nothing when max_prepared_xacts <= 0.  Raises ERROR if a state
     * file cannot be opened (other than a vanished file whose transaction is
     * no longer prepared), fsync'd, or closed.
     */
    TransactionId *xids;
    int         nxids;
    char        path[MAXPGPATH];
    int         i;

    /*
     * We don't want to hold the TwoPhaseStateLock while doing I/O, so we grab
     * it just long enough to make a list of the XIDs that require fsyncing,
     * and then do the I/O afterwards.
     *
     * This approach creates a race condition: someone else could delete a
     * GXACT between the time we release TwoPhaseStateLock and the time we try
     * to open its state file.  We handle this by special-casing ENOENT
     * failures: if we see that, we verify that the GXACT is no longer valid,
     * and if so ignore the failure.
     */
    if (max_prepared_xacts <= 0)
        return;                 /* nothing to do */

    TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_START();

    /* Worst case: every slot holds a transaction that needs fsyncing */
    xids = (TransactionId *) palloc(max_prepared_xacts * sizeof(TransactionId));
    nxids = 0;

    LWLockAcquire(TwoPhaseStateLock, LW_SHARED);

    for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
    {
        GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
        PGXACT     *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];

        if (gxact->valid &&
            gxact->prepare_lsn <= redo_horizon)
            xids[nxids++] = pgxact->xid;
    }

    LWLockRelease(TwoPhaseStateLock);

    for (i = 0; i < nxids; i++)
    {
        TransactionId xid = xids[i];
        int         fd;

        TwoPhaseFilePath(path, xid);

        fd = OpenTransientFile(path, O_RDWR | PG_BINARY, 0);
        if (fd < 0)
        {
            if (errno == ENOENT)
            {
                /* OK if gxact is no longer valid */
                if (!TransactionIdIsPrepared(xid))
                    continue;
                /* Restore errno in case it was changed */
                errno = ENOENT;
            }
            ereport(ERROR,
                    (errcode_for_file_access(),
                     errmsg("could not open two-phase state file \"%s\": %m",
                            path)));
        }

        if (pg_fsync(fd) != 0)
        {
            /*
             * Closing the file may overwrite errno; keep the value left by
             * the failed fsync so that %m below reports the real cause.
             */
            int         save_errno = errno;

            CloseTransientFile(fd);
            errno = save_errno;
            ereport(ERROR,
                    (errcode_for_file_access(),
                     errmsg("could not fsync two-phase state file \"%s\": %m",
                            path)));
        }

        if (CloseTransientFile(fd) != 0)
            ereport(ERROR,
                    (errcode_for_file_access(),
                     errmsg("could not close two-phase state file \"%s\": %m",
                            path)));
    }

    pfree(xids);

    TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_DONE();
}

void EndPrepare ( GlobalTransaction  gxact  ) 

Definition at line 942 of file twophase.c.

References PROC_HDR::allPgXact, Assert, CloseTransientFile(), COMP_CRC32, XLogRecData::data, PGXACT::delayChkpt, END_CRIT_SECTION, ereport, errcode(), errcode_for_file_access(), errmsg(), ERROR, FIN_CRC32, xllist::head, INIT_CRC32, XLogRecData::len, TwoPhaseFileHeader::magic, MarkAsPrepared(), MaxAllocSize, MyPgXact, XLogRecData::next, NULL, OpenTransientFile(), PG_BINARY, GlobalTransactionData::pgprocno, GlobalTransactionData::prepare_lsn, ProcGlobal, records, RegisterTwoPhaseRecord(), START_CRIT_SECTION, SyncRepWaitForLSN(), xllist::tail, xllist::total_len, TwoPhaseFileHeader::total_len, TWOPHASE_MAGIC, TWOPHASE_RM_END_ID, TwoPhaseFilePath, write, PGXACT::xid, XLOG_XACT_PREPARE, XLogFlush(), and XLogInsert().

Referenced by PrepareTransaction().

{
    /*
     * Finish the PREPARE of gxact: write the accumulated 2PC records to a new
     * state file, insert and flush the XLOG_XACT_PREPARE record, store the
     * correct CRC in the file, and mark the gxact as prepared.
     *
     * Raises ERROR if the state data exceeds MaxAllocSize or if creating,
     * writing, seeking in, or closing the state file fails.  Failures inside
     * the critical section are escalated as described in the comment there.
     */
    PGXACT     *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
    TransactionId xid = pgxact->xid;
    TwoPhaseFileHeader *hdr;
    char        path[MAXPGPATH];
    XLogRecData *record;
    pg_crc32    statefile_crc;
    pg_crc32    bogus_crc;
    int         fd;

    /* Add the end sentinel to the list of 2PC records */
    RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0,
                           NULL, 0);

    /* Go back and fill in total_len in the file header record */
    hdr = (TwoPhaseFileHeader *) records.head->data;
    Assert(hdr->magic == TWOPHASE_MAGIC);
    hdr->total_len = records.total_len + sizeof(pg_crc32);

    /*
     * If the file size exceeds MaxAllocSize, we won't be able to read it in
     * ReadTwoPhaseFile. Check for that now, rather than fail at commit time.
     */
    if (hdr->total_len > MaxAllocSize)
        ereport(ERROR,
                (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
                 errmsg("two-phase state file maximum length exceeded")));

    /*
     * Create the 2PC state file.
     */
    TwoPhaseFilePath(path, xid);

    fd = OpenTransientFile(path,
                           O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
                           S_IRUSR | S_IWUSR);
    if (fd < 0)
        ereport(ERROR,
                (errcode_for_file_access(),
                 errmsg("could not create two-phase state file \"%s\": %m",
                        path)));

    /* Write data to file, and calculate CRC as we pass over it */
    INIT_CRC32(statefile_crc);

    for (record = records.head; record != NULL; record = record->next)
    {
        COMP_CRC32(statefile_crc, record->data, record->len);

        /* Clear errno so a short write that leaves it unset is detectable */
        errno = 0;
        if ((write(fd, record->data, record->len)) != record->len)
        {
            /* CloseTransientFile may overwrite errno; keep write's value */
            int         save_errno = errno;

            CloseTransientFile(fd);

            /* if write didn't set errno, assume problem is no disk space */
            errno = save_errno ? save_errno : ENOSPC;
            ereport(ERROR,
                    (errcode_for_file_access(),
                     errmsg("could not write two-phase state file: %m")));
        }
    }

    FIN_CRC32(statefile_crc);

    /*
     * Write a deliberately bogus CRC to the state file; this is just paranoia
     * to catch the case where four more bytes will run us out of disk space.
     */
    bogus_crc = ~statefile_crc;

    errno = 0;
    if ((write(fd, &bogus_crc, sizeof(pg_crc32))) != sizeof(pg_crc32))
    {
        int         save_errno = errno;

        CloseTransientFile(fd);

        /* if write didn't set errno, assume problem is no disk space */
        errno = save_errno ? save_errno : ENOSPC;
        ereport(ERROR,
                (errcode_for_file_access(),
                 errmsg("could not write two-phase state file: %m")));
    }

    /* Back up to prepare for rewriting the CRC */
    if (lseek(fd, -((off_t) sizeof(pg_crc32)), SEEK_CUR) < 0)
    {
        /* Preserve lseek's errno across the close for the %m below */
        int         save_errno = errno;

        CloseTransientFile(fd);
        errno = save_errno;
        ereport(ERROR,
                (errcode_for_file_access(),
                 errmsg("could not seek in two-phase state file: %m")));
    }

    /*
     * The state file isn't valid yet, because we haven't written the correct
     * CRC yet.  Before we do that, insert entry in WAL and flush it to disk.
     *
     * Between the time we have written the WAL entry and the time we write
     * out the correct state file CRC, we have an inconsistency: the xact is
     * prepared according to WAL but not according to our on-disk state. We
     * use a critical section to force a PANIC if we are unable to complete
     * the write --- then, WAL replay should repair the inconsistency.  The
     * odds of a PANIC actually occurring should be very tiny given that we
     * were able to write the bogus CRC above.
     *
     * We have to set delayChkpt here, too; otherwise a checkpoint starting
     * immediately after the WAL record is inserted could complete without
     * fsync'ing our state file.  (This is essentially the same kind of race
     * condition as the COMMIT-to-clog-write case that RecordTransactionCommit
     * uses delayChkpt for; see notes there.)
     *
     * We save the PREPARE record's location in the gxact for later use by
     * CheckPointTwoPhase.
     */
    START_CRIT_SECTION();

    MyPgXact->delayChkpt = true;

    gxact->prepare_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE,
                                    records.head);
    XLogFlush(gxact->prepare_lsn);

    /* If we crash now, we have prepared: WAL replay will fix things */

    /* write correct CRC and close file */
    errno = 0;
    if ((write(fd, &statefile_crc, sizeof(pg_crc32))) != sizeof(pg_crc32))
    {
        int         save_errno = errno;

        CloseTransientFile(fd);

        /* if write didn't set errno, assume problem is no disk space */
        errno = save_errno ? save_errno : ENOSPC;
        ereport(ERROR,
                (errcode_for_file_access(),
                 errmsg("could not write two-phase state file: %m")));
    }

    if (CloseTransientFile(fd) != 0)
        ereport(ERROR,
                (errcode_for_file_access(),
                 errmsg("could not close two-phase state file: %m")));

    /*
     * Mark the prepared transaction as valid.  As soon as xact.c marks
     * MyPgXact as not running our XID (which it will do immediately after
     * this function returns), others can commit/rollback the xact.
     *
     * NB: a side effect of this is to make a dummy ProcArray entry for the
     * prepared XID.  This must happen before we clear the XID from MyPgXact,
     * else there is a window where the XID is not running according to
     * TransactionIdIsInProgress, and onlookers would be entitled to assume
     * the xact crashed.  Instead we have a window where the same XID appears
     * twice in ProcArray, which is OK.
     */
    MarkAsPrepared(gxact);

    /*
     * Now we can mark ourselves as out of the commit critical section: a
     * checkpoint starting after this will certainly see the gxact as a
     * candidate for fsyncing.
     */
    MyPgXact->delayChkpt = false;

    END_CRIT_SECTION();

    /*
     * Wait for synchronous replication, if required.
     *
     * Note that at this stage we have marked the prepare, but still show as
     * running in the procarray (twice!) and continue to hold locks.
     */
    SyncRepWaitForLSN(gxact->prepare_lsn);

    /* Reset the record list now that its contents have been written out */
    records.tail = records.head = NULL;
}

void FinishPreparedTransaction ( const char *  gid,
bool  isCommit 
)

Definition at line 1259 of file twophase.c.

References PROC_HDR::allPgXact, PROC_HDR::allProcs, Assert, AtEOXact_PgStat(), buf, ereport, errcode(), errmsg(), ERROR, GetUserId(), i, TwoPhaseFileHeader::initfileinval, InvalidBackendId, LockGXact(), MAXALIGN, TwoPhaseFileHeader::nabortrels, TwoPhaseFileHeader::ncommitrels, TwoPhaseFileHeader::ninvalmsgs, TwoPhaseFileHeader::nsubxacts, NULL, pfree(), GlobalTransactionData::pgprocno, PredicateLockTwoPhaseFinish(), ProcArrayRemove(), ProcessRecords(), ProcGlobal, ReadTwoPhaseFile(), RecordTransactionAbortPrepared(), RecordTransactionCommitPrepared(), RelationCacheInitFilePostInvalidate(), RelationCacheInitFilePreInvalidate(), RemoveGXact(), RemoveTwoPhaseFile(), SendSharedInvalidMessages(), smgrclose(), smgrdounlink(), smgropen(), TransactionIdEquals, TransactionIdLatest(), twophase_postabort_callbacks, twophase_postcommit_callbacks, GlobalTransactionData::valid, TwoPhaseFileHeader::xid, and PGXACT::xid.

Referenced by standard_ProcessUtility().

{
    /*
     * Commit (isCommit = true) or roll back (isCommit = false) the prepared
     * transaction whose global identifier is gid.
     *
     * Steps, in order: lock the GXACT, read its state file, write the
     * commit/abort record, remove the dummy PGPROC from the ProcArray, mark
     * the GXACT invalid, unlink the relation files listed for this outcome,
     * send the stored invalidation messages, run the post-commit/post-abort
     * callbacks, and finally remove the state file and the GXACT.
     *
     * Raises ERROR if ReadTwoPhaseFile() returns NULL for the state file.
     */
    GlobalTransaction gxact;
    PGPROC     *proc;
    PGXACT     *pgxact;
    TransactionId xid;
    char       *buf;
    char       *bufptr;
    TwoPhaseFileHeader *hdr;
    TransactionId latestXid;
    TransactionId *children;
    RelFileNode *commitrels;
    RelFileNode *abortrels;
    RelFileNode *delrels;
    int         ndelrels;
    SharedInvalidationMessage *invalmsgs;
    int         i;

    /*
     * Validate the GID, and lock the GXACT to ensure that two backends do not
     * try to commit the same GID at once.
     */
    gxact = LockGXact(gid, GetUserId());
    proc = &ProcGlobal->allProcs[gxact->pgprocno];
    pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
    xid = pgxact->xid;

    /*
     * Read and validate the state file
     */
    buf = ReadTwoPhaseFile(xid, true);
    if (buf == NULL)
        ereport(ERROR,
                (errcode(ERRCODE_DATA_CORRUPTED),
                 errmsg("two-phase state file for transaction %u is corrupt",
                        xid)));

    /*
     * Disassemble the header area
     *
     * The header is followed by four MAXALIGN'd arrays (subxact XIDs, commit
     * rels, abort rels, invalidation messages); each pointer below refers
     * directly into buf.
     */
    hdr = (TwoPhaseFileHeader *) buf;
    Assert(TransactionIdEquals(hdr->xid, xid));
    bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
    children = (TransactionId *) bufptr;
    bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
    commitrels = (RelFileNode *) bufptr;
    bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
    abortrels = (RelFileNode *) bufptr;
    bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
    invalmsgs = (SharedInvalidationMessage *) bufptr;
    bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
    /* bufptr now points past the arrays; it is handed to ProcessRecords below */

    /* compute latestXid among all children */
    latestXid = TransactionIdLatest(xid, hdr->nsubxacts, children);

    /*
     * The order of operations here is critical: make the XLOG entry for
     * commit or abort, then mark the transaction committed or aborted in
     * pg_clog, then remove its PGPROC from the global ProcArray (which means
     * TransactionIdIsInProgress will stop saying the prepared xact is in
     * progress), then run the post-commit or post-abort callbacks. The
     * callbacks will release the locks the transaction held.
     */
    if (isCommit)
        RecordTransactionCommitPrepared(xid,
                                        hdr->nsubxacts, children,
                                        hdr->ncommitrels, commitrels,
                                        hdr->ninvalmsgs, invalmsgs,
                                        hdr->initfileinval);
    else
        RecordTransactionAbortPrepared(xid,
                                       hdr->nsubxacts, children,
                                       hdr->nabortrels, abortrels);

    ProcArrayRemove(proc, latestXid);

    /*
     * In case we fail while running the callbacks, mark the gxact invalid so
     * no one else will try to commit/rollback, and so it can be recycled
     * properly later.  It is still locked by our XID so it won't go away yet.
     *
     * (We assume it's safe to do this without taking TwoPhaseStateLock.)
     */
    gxact->valid = false;

    /*
     * We have to remove any files that were supposed to be dropped. For
     * consistency with the regular xact.c code paths, must do this before
     * releasing locks, so do it before running the callbacks.
     *
     * NB: this code knows that we couldn't be dropping any temp rels ...
     */
    if (isCommit)
    {
        delrels = commitrels;
        ndelrels = hdr->ncommitrels;
    }
    else
    {
        delrels = abortrels;
        ndelrels = hdr->nabortrels;
    }
    for (i = 0; i < ndelrels; i++)
    {
        SMgrRelation srel = smgropen(delrels[i], InvalidBackendId);

        smgrdounlink(srel, false);
        smgrclose(srel);
    }

    /*
     * Handle cache invalidation messages.
     *
     * Relcache init file invalidation requires processing both before and
     * after we send the SI messages. See AtEOXact_Inval()
     */
    if (hdr->initfileinval)
        RelationCacheInitFilePreInvalidate();
    SendSharedInvalidMessages(invalmsgs, hdr->ninvalmsgs);
    if (hdr->initfileinval)
        RelationCacheInitFilePostInvalidate();

    /* And now do the callbacks */
    if (isCommit)
        ProcessRecords(bufptr, xid, twophase_postcommit_callbacks);
    else
        ProcessRecords(bufptr, xid, twophase_postabort_callbacks);

    PredicateLockTwoPhaseFinish(xid, isCommit);

    /* Count the prepared xact as committed or aborted */
    AtEOXact_PgStat(isCommit);

    /*
     * And now we can clean up our mess.
     */
    RemoveTwoPhaseFile(xid, true);

    RemoveGXact(gxact);

    /* hdr, children, commitrels, abortrels and invalmsgs all pointed into buf */
    pfree(buf);
}

GlobalTransaction MarkAsPreparing ( TransactionId  xid,
const char *  gid,
TimestampTz  prepared_at,
Oid  owner,
Oid  databaseid 
)

Definition at line 242 of file twophase.c.

References PROC_HDR::allPgXact, PROC_HDR::allProcs, Assert, PGPROC::backendId, PGPROC::databaseId, PGXACT::delayChkpt, ereport, errcode(), errhint(), errmsg(), ERROR, TwoPhaseStateData::freeGXacts, GlobalTransactionData::gid, GIDSIZE, i, PGPROC::links, GlobalTransactionData::locking_xid, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), PGPROC::lwWaiting, PGPROC::lwWaitLink, PGPROC::lwWaitMode, PGPROC::lxid, max_prepared_xacts, MemSet, PGPROC::myProcLocks, GlobalTransactionData::next, NULL, TwoPhaseStateData::numPrepXacts, PGXACT::nxids, PGXACT::overflowed, GlobalTransactionData::owner, PGPROC::pgprocno, GlobalTransactionData::pgprocno, PGPROC::pid, GlobalTransactionData::prepare_lsn, GlobalTransactionData::prepared_at, TwoPhaseStateData::prepXacts, ProcGlobal, PGPROC::roleId, SHMQueueElemInit(), SHMQueueInit(), TransactionIdIsActive(), TwoPhaseStateLock, PGXACT::vacuumFlags, GlobalTransactionData::valid, PGPROC::waitLock, PGPROC::waitProcLock, PGPROC::waitStatus, PGXACT::xid, and PGXACT::xmin.

Referenced by PrepareTransaction(), and RecoverPreparedTransactions().

{
    /*
     * Reserve gid for transaction xid: validate the identifier, claim a free
     * GXACT slot, initialize its dummy PGPROC/PGXACT entries and append the
     * slot to the active array.  The returned entry is not yet marked valid.
     *
     * Raises ERROR if gid is too long, if prepared transactions are disabled,
     * if gid is already in use, or if no free slot is available.
     */
    GlobalTransaction entry;
    PGPROC     *dummyProc;
    PGXACT     *dummyXact;
    int         idx;
    int         part;

    /* The identifier plus its terminator must fit in GIDSIZE bytes */
    if (strlen(gid) >= GIDSIZE)
        ereport(ERROR,
                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                 errmsg("transaction identifier \"%s\" is too long",
                        gid)));

    /* Bail out right away when the feature is switched off */
    if (max_prepared_xacts == 0)
        ereport(ERROR,
                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                 errmsg("prepared transactions are disabled"),
                 errhint("Set max_prepared_transactions to a nonzero value.")));

    LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);

    /*
     * Sweep the active array for entries left behind by a failed prepare
     * (not valid, and their locking XID is no longer active) and return them
     * to the freelist.  That frees both their GIDs and their slots.
     */
    idx = 0;
    while (idx < TwoPhaseState->numPrepXacts)
    {
        entry = TwoPhaseState->prepXacts[idx];

        if (entry->valid || TransactionIdIsActive(entry->locking_xid))
        {
            /* Still in use; move on to the next position */
            idx++;
            continue;
        }

        /*
         * Dead entry: overwrite it with the last active element.  idx is
         * deliberately not advanced, so the element just moved into this
         * position is examined on the next iteration.
         */
        TwoPhaseState->numPrepXacts--;
        TwoPhaseState->prepXacts[idx] =
            TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts];

        /* Push the dead entry onto the freelist */
        entry->next = TwoPhaseState->freeGXacts;
        TwoPhaseState->freeGXacts = entry;
    }

    /* Refuse a GID that any remaining active entry already uses */
    for (idx = 0; idx < TwoPhaseState->numPrepXacts; idx++)
    {
        if (strcmp(TwoPhaseState->prepXacts[idx]->gid, gid) != 0)
            continue;

        ereport(ERROR,
                (errcode(ERRCODE_DUPLICATE_OBJECT),
                 errmsg("transaction identifier \"%s\" is already in use",
                        gid)));
    }

    /* Pop a slot off the freelist, failing if there is none */
    entry = TwoPhaseState->freeGXacts;
    if (entry == NULL)
        ereport(ERROR,
                (errcode(ERRCODE_OUT_OF_MEMORY),
                 errmsg("maximum number of prepared transactions reached"),
                 errhint("Increase max_prepared_transactions (currently %d).",
                         max_prepared_xacts)));
    TwoPhaseState->freeGXacts = entry->next;

    dummyProc = &ProcGlobal->allProcs[entry->pgprocno];
    dummyXact = &ProcGlobal->allPgXact[entry->pgprocno];

    /* Start the dummy PGPROC from an all-zero state, then fill it in */
    MemSet(dummyProc, 0, sizeof(PGPROC));
    dummyProc->pgprocno = entry->pgprocno;
    SHMQueueElemInit(&dummyProc->links);
    dummyProc->waitStatus = STATUS_OK;
    /* The gxact's VXID is set up as InvalidBackendId/XID */
    dummyProc->lxid = (LocalTransactionId) xid;
    dummyProc->pid = 0;
    dummyProc->backendId = InvalidBackendId;
    dummyProc->databaseId = databaseid;
    dummyProc->roleId = owner;
    dummyProc->lwWaiting = false;
    dummyProc->lwWaitMode = 0;
    dummyProc->lwWaitLink = NULL;
    dummyProc->waitLock = NULL;
    dummyProc->waitProcLock = NULL;
    for (part = 0; part < NUM_LOCK_PARTITIONS; part++)
        SHMQueueInit(&dummyProc->myProcLocks[part]);

    /* Matching PGXACT entry; GXactLoadSubxactData fills in subxids later */
    dummyXact->xid = xid;
    dummyXact->xmin = InvalidTransactionId;
    dummyXact->delayChkpt = false;
    dummyXact->vacuumFlags = 0;
    dummyXact->overflowed = false;
    dummyXact->nxids = 0;

    /* GXACT bookkeeping */
    entry->prepared_at = prepared_at;
    entry->prepare_lsn = 0;     /* LSN 0 is the start of WAL */
    entry->owner = owner;
    entry->locking_xid = xid;
    entry->valid = false;
    strcpy(entry->gid, gid);

    /* Append the new entry to the active array */
    Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
    TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts] = entry;
    TwoPhaseState->numPrepXacts++;

    LWLockRelease(TwoPhaseStateLock);

    return entry;
}

TransactionId PrescanPreparedTransactions ( TransactionId **  xids_p,
int *  nxids_p 
)

Definition at line 1641 of file twophase.c.

References AllocateDir(), Assert, buf, dirent::d_name, ereport, errmsg(), FreeDir(), i, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MAXALIGN, VariableCacheData::nextXid, TwoPhaseFileHeader::nsubxacts, NULL, palloc(), pfree(), ReadDir(), ReadTwoPhaseFile(), RemoveTwoPhaseFile(), repalloc(), ShmemVariableCache, TransactionIdAdvance, TransactionIdEquals, TransactionIdFollows(), TransactionIdFollowsOrEquals(), TransactionIdPrecedes(), TWOPHASE_DIR, WARNING, TwoPhaseFileHeader::xid, and XidGenLock.

Referenced by StartupXLOG(), and xlog_redo().

{
    /*
     * Scan TWOPHASE_DIR for state files and return the oldest XID found among
     * the files accepted as valid, or the value ShmemVariableCache->nextXid
     * had on entry if none precedes it.
     *
     * Files whose XID is at or beyond that initial nextXid, and files that
     * fail to read or whose header XID does not match the file name, are
     * removed with a WARNING.  Subtransaction XIDs at or beyond the current
     * nextXid advance nextXid past them.
     *
     * If xids_p is not NULL, *xids_p and *nxids_p receive a palloc'd array of
     * the accepted top-level XIDs and its length (*xids_p is NULL when no
     * file was accepted).
     */
    TransactionId startNextXid = ShmemVariableCache->nextXid;
    TransactionId oldest = startNextXid;
    TransactionId *found = NULL;
    int         nfound = 0;
    int         capacity = 0;
    DIR        *dir;
    struct dirent *entry;

    dir = AllocateDir(TWOPHASE_DIR);
    while ((entry = ReadDir(dir, TWOPHASE_DIR)) != NULL)
    {
        const char *fname = entry->d_name;
        TransactionId xid;
        char       *buf;
        TwoPhaseFileHeader *hdr;
        TransactionId *subxids;
        int         i;

        /* Only names of exactly eight upper-case hex digits are state files */
        if (strlen(fname) != 8 ||
            strspn(fname, "0123456789ABCDEF") != 8)
            continue;

        xid = (TransactionId) strtoul(fname, NULL, 16);

        /* An XID at or past the original nextXid is from the future */
        if (TransactionIdFollowsOrEquals(xid, startNextXid))
        {
            ereport(WARNING,
                    (errmsg("removing future two-phase state file \"%s\"",
                            fname)));
            RemoveTwoPhaseFile(xid, true);
            continue;
        }

        /*
         * Note: we can't check if already processed because clog subsystem
         * isn't up yet.
         */

        /*
         * Read the file and check its header.  An unreadable file and a
         * header XID that disagrees with the file name are both treated as
         * corruption.
         */
        buf = ReadTwoPhaseFile(xid, true);
        hdr = (TwoPhaseFileHeader *) buf;
        if (hdr == NULL || !TransactionIdEquals(hdr->xid, xid))
        {
            ereport(WARNING,
                    (errmsg("removing corrupt two-phase state file \"%s\"",
                            fname)));
            RemoveTwoPhaseFile(xid, true);
            if (buf != NULL)
                pfree(buf);
            continue;
        }

        /* The file looks valid: fold its XID into the running minimum */
        if (TransactionIdPrecedes(xid, oldest))
            oldest = xid;

        /*
         * Walk the subtransaction XIDs stored right after the header.  They
         * should all follow the main XID, and any of them may force nextXid
         * forward.
         *
         * Nobody else is expected to modify nextXid, so it is read without a
         * lock; the lock is still taken to modify it.
         */
        subxids = (TransactionId *)
            (buf + MAXALIGN(sizeof(TwoPhaseFileHeader)));
        for (i = 0; i < hdr->nsubxacts; i++)
        {
            TransactionId subxid = subxids[i];

            Assert(TransactionIdFollows(subxid, xid));
            if (!TransactionIdFollowsOrEquals(subxid,
                                              ShmemVariableCache->nextXid))
                continue;

            LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
            ShmemVariableCache->nextXid = subxid;
            TransactionIdAdvance(ShmemVariableCache->nextXid);
            LWLockRelease(XidGenLock);
        }

        /* Collect the XID for the caller, growing the array as needed */
        if (xids_p)
        {
            if (nfound == capacity)
            {
                if (capacity == 0)
                {
                    /* first use: start with room for ten XIDs */
                    capacity = 10;
                    found = palloc(capacity * sizeof(TransactionId));
                }
                else
                {
                    /* full: double the capacity */
                    capacity *= 2;
                    found = repalloc(found, capacity * sizeof(TransactionId));
                }
            }
            found[nfound] = xid;
            nfound++;
        }

        pfree(buf);
    }
    FreeDir(dir);

    if (xids_p)
    {
        *xids_p = found;
        *nxids_p = nfound;
    }

    return oldest;
}

void RecoverPreparedTransactions ( void   ) 

Definition at line 1859 of file twophase.c.

References AllocateDir(), Assert, buf, dirent::d_name, TwoPhaseFileHeader::database, ereport, errmsg(), FreeDir(), TwoPhaseFileHeader::gid, GXactLoadSubxactData(), i, InHotStandby, LOG, MarkAsPrepared(), MarkAsPreparing(), MAXALIGN, MAXPGPATH, TwoPhaseFileHeader::nabortrels, TwoPhaseFileHeader::ncommitrels, TwoPhaseFileHeader::ninvalmsgs, TwoPhaseFileHeader::nsubxacts, NULL, TwoPhaseFileHeader::owner, pfree(), PGPROC_MAX_CACHED_SUBXIDS, TwoPhaseFileHeader::prepared_at, ProcessRecords(), ReadDir(), ReadTwoPhaseFile(), RemoveTwoPhaseFile(), snprintf(), StandbyReleaseLockTree(), SubTransSetParent(), TransactionIdDidAbort(), TransactionIdDidCommit(), TransactionIdEquals, TWOPHASE_DIR, twophase_recover_callbacks, WARNING, and TwoPhaseFileHeader::xid.

Referenced by StartupXLOG().

{
    /*
     * Scan TWOPHASE_DIR and, for every state file that is neither stale nor
     * corrupt, rebuild the in-memory state of that prepared transaction: its
     * subtransaction parent links, its GXACT and dummy PGPROC, and whatever
     * the twophase_recover_callbacks restore.
     *
     * Files whose XID already committed or aborted, and files that
     * ReadTwoPhaseFile() rejects, are removed with a WARNING.
     */
    char        dir[MAXPGPATH];
    DIR        *cldir;
    struct dirent *clde;
    /*
     * NOTE(review): overwriteOK is declared outside the loop and is only ever
     * set to true below, so once one file sets it, it stays true for every
     * file processed afterwards -- confirm that this is intended.
     */
    bool        overwriteOK = false;

    snprintf(dir, MAXPGPATH, "%s", TWOPHASE_DIR);

    cldir = AllocateDir(dir);
    while ((clde = ReadDir(cldir, dir)) != NULL)
    {
        /* Only names of exactly eight upper-case hex digits are considered */
        if (strlen(clde->d_name) == 8 &&
            strspn(clde->d_name, "0123456789ABCDEF") == 8)
        {
            TransactionId xid;
            char       *buf;
            char       *bufptr;
            TwoPhaseFileHeader *hdr;
            TransactionId *subxids;
            GlobalTransaction gxact;
            int         i;

            /* The file name is the XID in hexadecimal */
            xid = (TransactionId) strtoul(clde->d_name, NULL, 16);

            /* Already processed? */
            if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
            {
                ereport(WARNING,
                        (errmsg("removing stale two-phase state file \"%s\"",
                                clde->d_name)));
                RemoveTwoPhaseFile(xid, true);
                continue;
            }

            /* Read and validate file */
            buf = ReadTwoPhaseFile(xid, true);
            if (buf == NULL)
            {
                ereport(WARNING,
                      (errmsg("removing corrupt two-phase state file \"%s\"",
                              clde->d_name)));
                RemoveTwoPhaseFile(xid, true);
                continue;
            }

            ereport(LOG,
                    (errmsg("recovering prepared transaction %u", xid)));

            /*
             * Deconstruct header
             *
             * Only the subxact array is used here; the commit-rel, abort-rel
             * and invalidation-message arrays are skipped so that bufptr ends
             * up at the data passed to ProcessRecords below.
             */
            hdr = (TwoPhaseFileHeader *) buf;
            Assert(TransactionIdEquals(hdr->xid, xid));
            bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
            subxids = (TransactionId *) bufptr;
            bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
            bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
            bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
            bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));

            /*
             * It's possible that SubTransSetParent has been set before, if
             * the prepared transaction generated xid assignment records. Test
             * here must match one used in AssignTransactionId().
             */
            if (InHotStandby && hdr->nsubxacts >= PGPROC_MAX_CACHED_SUBXIDS)
                overwriteOK = true;

            /*
             * Reconstruct subtrans state for the transaction --- needed
             * because pg_subtrans is not preserved over a restart.  Note that
             * we are linking all the subtransactions directly to the
             * top-level XID; there may originally have been a more complex
             * hierarchy, but there's no need to restore that exactly.
             */
            for (i = 0; i < hdr->nsubxacts; i++)
                SubTransSetParent(subxids[i], xid, overwriteOK);

            /*
             * Recreate its GXACT and dummy PGPROC
             *
             * Note: since we don't have the PREPARE record's WAL location at
             * hand, we leave prepare_lsn zeroes.  This means the GXACT will
             * be fsync'd on every future checkpoint.  We assume this
             * situation is infrequent enough that the performance cost is
             * negligible (especially since we know the state file has already
             * been fsynced).
             */
            gxact = MarkAsPreparing(xid, hdr->gid,
                                    hdr->prepared_at,
                                    hdr->owner, hdr->database);
            GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids);
            MarkAsPrepared(gxact);

            /*
             * Recover other state (notably locks) using resource managers
             */
            ProcessRecords(bufptr, xid, twophase_recover_callbacks);

            /*
             * Release locks held by the standby process after we process each
             * prepared transaction. As a result, we don't need too many
             * additional locks at any one time.
             */
            if (InHotStandby)
                StandbyReleaseLockTree(xid, hdr->nsubxacts, subxids);

            /* hdr and subxids pointed into buf; neither is used past here */
            pfree(buf);
        }
    }
    FreeDir(cldir);
}

void RecreateTwoPhaseFile ( TransactionId  xid,
void *  content,
int  len 
)

Definition at line 1454 of file twophase.c.

References CloseTransientFile(), COMP_CRC32, ereport, errcode_for_file_access(), errmsg(), ERROR, FIN_CRC32, INIT_CRC32, OpenTransientFile(), PG_BINARY, pg_fsync(), TwoPhaseFilePath, and write.

Referenced by xact_redo().

{
    /*
     * Rewrite the two-phase state file for "xid" from the "len" bytes at
     * "content", append a freshly computed CRC, and fsync the result.
     *
     * Every failure is reported via ereport(ERROR).  On the write and fsync
     * failure paths the descriptor is closed before reporting; errno is
     * saved across that close so that the reported error describes the call
     * that actually failed rather than whatever CloseTransientFile() may
     * have left in errno.
     */
    char        path[MAXPGPATH];
    pg_crc32    statefile_crc;
    int         fd;
    int         save_errno;

    /* Recompute CRC */
    INIT_CRC32(statefile_crc);
    COMP_CRC32(statefile_crc, content, len);
    FIN_CRC32(statefile_crc);

    TwoPhaseFilePath(path, xid);

    fd = OpenTransientFile(path,
                           O_CREAT | O_TRUNC | O_WRONLY | PG_BINARY,
                           S_IRUSR | S_IWUSR);
    if (fd < 0)
        ereport(ERROR,
                (errcode_for_file_access(),
                 errmsg("could not recreate two-phase state file \"%s\": %m",
                        path)));

    /* Write content and CRC */
    errno = 0;                  /* so a short write is distinguishable below */
    if (write(fd, content, len) != len)
    {
        /* if write didn't set errno, assume problem is no disk space */
        save_errno = errno ? errno : ENOSPC;
        CloseTransientFile(fd);
        errno = save_errno;
        ereport(ERROR,
                (errcode_for_file_access(),
                 errmsg("could not write two-phase state file: %m")));
    }
    errno = 0;
    if (write(fd, &statefile_crc, sizeof(pg_crc32)) != sizeof(pg_crc32))
    {
        /* if write didn't set errno, assume problem is no disk space */
        save_errno = errno ? errno : ENOSPC;
        CloseTransientFile(fd);
        errno = save_errno;
        ereport(ERROR,
                (errcode_for_file_access(),
                 errmsg("could not write two-phase state file: %m")));
    }

    /*
     * We must fsync the file because the end-of-replay checkpoint will not do
     * so, there being no GXACT in shared memory yet to tell it to.
     */
    if (pg_fsync(fd) != 0)
    {
        /* keep the fsync failure code; the close below may overwrite errno */
        save_errno = errno;
        CloseTransientFile(fd);
        errno = save_errno;
        ereport(ERROR,
                (errcode_for_file_access(),
                 errmsg("could not fsync two-phase state file: %m")));
    }

    if (CloseTransientFile(fd) != 0)
        ereport(ERROR,
                (errcode_for_file_access(),
                 errmsg("could not close two-phase state file: %m")));
}

void RemoveTwoPhaseFile ( TransactionId  xid,
bool  giveWarning 
)

Definition at line 1435 of file twophase.c.

References ereport, errcode_for_file_access(), errmsg(), TwoPhaseFilePath, unlink(), and WARNING.

Referenced by FinishPreparedTransaction(), PrescanPreparedTransactions(), RecoverPreparedTransactions(), StandbyRecoverPreparedTransactions(), and xact_redo().

{
    /*
     * Unlink the two-phase state file belonging to "xid".
     *
     * A failed unlink is reported as a WARNING rather than an error.  When
     * giveWarning is false, a missing file (ENOENT) is passed over silently;
     * any other failure is always reported.
     */
    char        statepath[MAXPGPATH];

    TwoPhaseFilePath(statepath, xid);

    if (unlink(statepath) == 0)
        return;                 /* file removed, nothing to report */

    /* A file that was already gone only matters if the caller asked. */
    if (errno == ENOENT && !giveWarning)
        return;

    ereport(WARNING,
            (errcode_for_file_access(),
             errmsg("could not remove two-phase state file \"%s\": %m",
                    statepath)));
}

void StandbyRecoverPreparedTransactions ( bool  overwriteOK  ) 

Definition at line 1781 of file twophase.c.

References AllocateDir(), Assert, buf, dirent::d_name, ereport, errmsg(), FreeDir(), i, MAXALIGN, TwoPhaseFileHeader::nsubxacts, NULL, pfree(), ReadDir(), ReadTwoPhaseFile(), RemoveTwoPhaseFile(), SubTransSetParent(), TransactionIdDidAbort(), TransactionIdDidCommit(), TransactionIdEquals, TransactionIdFollows(), TWOPHASE_DIR, WARNING, and TwoPhaseFileHeader::xid.

Referenced by StartupXLOG(), and xlog_redo().

{
    /*
     * Scan TWOPHASE_DIR and, for each state file found, register every
     * subtransaction XID recorded in it as a child of the file's top-level
     * XID via SubTransSetParent().
     *
     * Only directory entries whose names are exactly eight upper-case hex
     * digits are considered; the name is parsed as the transaction's XID.
     * Files for transactions that already committed or aborted, files that
     * ReadTwoPhaseFile() rejects, and files whose header XID does not match
     * the file name are removed with a WARNING.
     *
     * overwriteOK is passed through unchanged to SubTransSetParent().
     */
    DIR        *cldir;
    struct dirent *clde;

    cldir = AllocateDir(TWOPHASE_DIR);
    while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
    {
        if (strlen(clde->d_name) == 8 &&
            strspn(clde->d_name, "0123456789ABCDEF") == 8)
        {
            TransactionId xid;
            char       *buf;
            TwoPhaseFileHeader *hdr;
            TransactionId *subxids;
            int         i;

            xid = (TransactionId) strtoul(clde->d_name, NULL, 16);

            /* Already processed? */
            if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
            {
                ereport(WARNING,
                        (errmsg("removing stale two-phase state file \"%s\"",
                                clde->d_name)));
                RemoveTwoPhaseFile(xid, true);
                continue;
            }

            /* Read and validate file */
            buf = ReadTwoPhaseFile(xid, true);
            if (buf == NULL)
            {
                ereport(WARNING,
                      (errmsg("removing corrupt two-phase state file \"%s\"",
                              clde->d_name)));
                RemoveTwoPhaseFile(xid, true);
                continue;
            }

            /* Deconstruct header */
            hdr = (TwoPhaseFileHeader *) buf;
            if (!TransactionIdEquals(hdr->xid, xid))
            {
                ereport(WARNING,
                      (errmsg("removing corrupt two-phase state file \"%s\"",
                              clde->d_name)));
                RemoveTwoPhaseFile(xid, true);
                pfree(buf);
                continue;
            }

            /*
             * Examine subtransaction XIDs ... they should all follow main
             * XID.
             */
            subxids = (TransactionId *)
                (buf + MAXALIGN(sizeof(TwoPhaseFileHeader)));
            for (i = 0; i < hdr->nsubxacts; i++)
            {
                TransactionId subxid = subxids[i];

                Assert(TransactionIdFollows(subxid, xid));

                /*
                 * The child XID comes first and its parent second: each
                 * subtransaction is linked directly to the top-level XID.
                 */
                SubTransSetParent(subxid, xid, overwriteOK);
            }

            /* subxids pointed into buf, which is no longer needed */
            pfree(buf);
        }
    }
    FreeDir(cldir);
}

bool StandbyTransactionIdIsPrepared ( TransactionId  xid  ) 

Definition at line 1231 of file twophase.c.

References Assert, buf, max_prepared_xacts, NULL, pfree(), ReadTwoPhaseFile(), TransactionIdEquals, TransactionIdIsValid, and TwoPhaseFileHeader::xid.

Referenced by KnownAssignedXidsRemovePreceding(), and StandbyReleaseOldLocks().

{
    /*
     * Report whether a two-phase state file for "xid" can be read and its
     * header names that same XID.
     *
     * Returns false without touching any file when max_prepared_xacts is
     * not positive, and false when ReadTwoPhaseFile() yields no buffer.
     */
    char       *statebuf;
    bool        xid_matches;

    Assert(TransactionIdIsValid(xid));

    /* nothing to do */
    if (max_prepared_xacts <= 0)
        return false;

    /* Read and validate file */
    statebuf = ReadTwoPhaseFile(xid, false);
    if (statebuf == NULL)
        return false;

    /* The header sits at the start of the buffer; check its XID also */
    xid_matches = TransactionIdEquals(((TwoPhaseFileHeader *) statebuf)->xid,
                                      xid);

    pfree(statebuf);
    return xid_matches;
}

void StartPrepare ( GlobalTransaction  gxact  ) 

Definition at line 868 of file twophase.c.

References PROC_HDR::allPgXact, PROC_HDR::allProcs, XLogRecData::buffer, xllist::bytes_free, XLogRecData::data, TwoPhaseFileHeader::database, PGPROC::databaseId, GlobalTransactionData::gid, TwoPhaseFileHeader::gid, GIDSIZE, GXactLoadSubxactData(), xllist::head, TwoPhaseFileHeader::initfileinval, XLogRecData::len, TwoPhaseFileHeader::magic, Max, TwoPhaseFileHeader::nabortrels, TwoPhaseFileHeader::ncommitrels, XLogRecData::next, TwoPhaseFileHeader::ninvalmsgs, TwoPhaseFileHeader::nsubxacts, GlobalTransactionData::owner, TwoPhaseFileHeader::owner, palloc(), palloc0(), pfree(), GlobalTransactionData::pgprocno, GlobalTransactionData::prepared_at, TwoPhaseFileHeader::prepared_at, ProcGlobal, records, save_state_data(), smgrGetPendingDeletes(), StrNCpy, xllist::tail, TwoPhaseFileHeader::total_len, xllist::total_len, xactGetCommittedChildren(), xactGetCommittedInvalidationMessages(), TwoPhaseFileHeader::xid, and PGXACT::xid.

Referenced by PrepareTransaction().

{
    /*
     * Begin assembling the two-phase state data for "gxact".
     *
     * Resets the module-level "records" list to a single empty chunk, then
     * appends, in this order: the file header, the subtransaction XIDs, the
     * relations pending delete at commit, the relations pending delete at
     * abort, and the invalidation messages.  Sections whose count is zero
     * are not written.  The header's total_len is left at zero here.
     */
    PGPROC     *dummyproc = &ProcGlobal->allProcs[gxact->pgprocno];
    PGXACT     *dummyxact = &ProcGlobal->allPgXact[gxact->pgprocno];
    TransactionId prepared_xid = dummyxact->xid;
    TwoPhaseFileHeader hdr;
    TransactionId *subxids;
    RelFileNode *rels_on_commit;
    RelFileNode *rels_on_abort;
    SharedInvalidationMessage *inval_msgs;

    /* Start the record list with one zeroed, empty chunk */
    records.head = palloc0(sizeof(XLogRecData));
    records.head->buffer = InvalidBuffer;
    records.head->len = 0;
    records.head->next = NULL;

    /* Give that chunk room for at least a header, and never under 512 bytes */
    records.bytes_free = Max(sizeof(TwoPhaseFileHeader), 512);
    records.head->data = palloc(records.bytes_free);

    /* A one-element list: tail and head coincide, nothing stored yet */
    records.tail = records.head;
    records.total_len = 0;

    /* Fill in the header */
    hdr.magic = TWOPHASE_MAGIC;
    hdr.total_len = 0;          /* EndPrepare will fill this in */
    hdr.xid = prepared_xid;
    hdr.database = dummyproc->databaseId;
    hdr.prepared_at = gxact->prepared_at;
    hdr.owner = gxact->owner;
    hdr.nsubxacts = xactGetCommittedChildren(&subxids);
    hdr.ncommitrels = smgrGetPendingDeletes(true, &rels_on_commit);
    hdr.nabortrels = smgrGetPendingDeletes(false, &rels_on_abort);
    hdr.ninvalmsgs = xactGetCommittedInvalidationMessages(&inval_msgs,
                                                          &hdr.initfileinval);
    StrNCpy(hdr.gid, gxact->gid, GIDSIZE);

    save_state_data(&hdr, sizeof(TwoPhaseFileHeader));

    /*
     * Now the variable-length sections: subxacts, deletable files and cache
     * invalidation messages.  Their order here is the order they are saved.
     */
    if (hdr.nsubxacts > 0)
    {
        save_state_data(subxids, hdr.nsubxacts * sizeof(TransactionId));

        /* While we have the child-xact data, stuff it in the gxact too */
        GXactLoadSubxactData(gxact, hdr.nsubxacts, subxids);
    }

    if (hdr.ncommitrels > 0)
    {
        save_state_data(rels_on_commit,
                        hdr.ncommitrels * sizeof(RelFileNode));
        pfree(rels_on_commit);
    }

    if (hdr.nabortrels > 0)
    {
        save_state_data(rels_on_abort,
                        hdr.nabortrels * sizeof(RelFileNode));
        pfree(rels_on_abort);
    }

    if (hdr.ninvalmsgs > 0)
    {
        save_state_data(inval_msgs,
                        hdr.ninvalmsgs * sizeof(SharedInvalidationMessage));
        pfree(inval_msgs);
    }
}

BackendId TwoPhaseGetDummyBackendId ( TransactionId  xid  ) 
PGPROC* TwoPhaseGetDummyProc ( TransactionId  xid  ) 
void TwoPhaseShmemInit ( void   ) 

Definition at line 181 of file twophase.c.

References Assert, GlobalTransactionData::dummyBackendId, TwoPhaseStateData::freeGXacts, i, IsUnderPostmaster, max_prepared_xacts, MAXALIGN, MaxBackends, GlobalTransactionData::next, TwoPhaseStateData::numPrepXacts, offsetof, PGPROC::pgprocno, GlobalTransactionData::pgprocno, PreparedXactProcs, ShmemInitStruct(), and TwoPhaseShmemSize().

Referenced by CreateSharedMemoryAndSemaphores().

{
    /*
     * Create or attach to the "Prepared Transaction Table" shared-memory
     * structure.
     *
     * When not running under the postmaster, the structure is expected to
     * be new and is initialized here: the free list is built from the
     * GlobalTransactionData structs laid out after the pointer array, and
     * each struct is given a PGPROC number and a dummy backend ID.  When
     * running under the postmaster, the structure is expected to exist
     * already and nothing else is done.
     */
    bool        already_exists;
    GlobalTransaction gxact_pool;
    int         slot;

    TwoPhaseState = ShmemInitStruct("Prepared Transaction Table",
                                    TwoPhaseShmemSize(),
                                    &already_exists);

    if (IsUnderPostmaster)
    {
        Assert(already_exists);
        return;
    }

    Assert(!already_exists);
    TwoPhaseState->freeGXacts = NULL;
    TwoPhaseState->numPrepXacts = 0;

    /*
     * The GlobalTransactionData structs begin at the first MAXALIGN'd
     * offset past the prepXacts pointer array.
     */
    gxact_pool = (GlobalTransaction)
        ((char *) TwoPhaseState +
         MAXALIGN(offsetof(TwoPhaseStateData, prepXacts) +
                  sizeof(GlobalTransaction) * max_prepared_xacts));

    for (slot = 0; slot < max_prepared_xacts; slot++)
    {
        GlobalTransaction gxact = &gxact_pool[slot];

        /* push onto the front of the free list */
        gxact->next = TwoPhaseState->freeGXacts;
        TwoPhaseState->freeGXacts = gxact;

        /* associate it with a PGPROC assigned by InitProcGlobal */
        gxact->pgprocno = PreparedXactProcs[slot].pgprocno;

        /*
         * Dummy procs get unique IDs in the range directly after the
         * normal backend IDs.  They are deliberately not real backend IDs:
         * prepared transactions don't take part in cache invalidation the
         * way a real backend ID would imply.  A unique ID is still useful,
         * since an array of size (MaxBackends + max_prepared_xacts + 1)
         * then has a slot for every backend and prepared transaction.
         * Currently multixact.c uses that technique.
         */
        gxact->dummyBackendId = MaxBackends + 1 + slot;
    }
}

Size TwoPhaseShmemSize ( void   ) 

Definition at line 165 of file twophase.c.

References add_size(), max_prepared_xacts, MAXALIGN, mul_size(), and offsetof.

Referenced by CreateSharedMemoryAndSemaphores(), and TwoPhaseShmemInit().

{
    /*
     * Compute the shared-memory size of the prepared-transaction table:
     * the fixed part of TwoPhaseStateData up to prepXacts, then
     * max_prepared_xacts GlobalTransaction pointers, padded with MAXALIGN,
     * then max_prepared_xacts GlobalTransactionData structs.
     */
    Size        ptr_array_bytes;
    Size        gxact_bytes;
    Size        total;

    /* fixed struct plus the array of pointers, rounded up */
    ptr_array_bytes = mul_size(max_prepared_xacts,
                               sizeof(GlobalTransaction));
    total = add_size(offsetof(TwoPhaseStateData, prepXacts),
                     ptr_array_bytes);
    total = MAXALIGN(total);

    /* followed by the GTD structs themselves */
    gxact_bytes = mul_size(max_prepared_xacts,
                           sizeof(GlobalTransactionData));

    return add_size(total, gxact_bytes);
}


Variable Documentation