Header And Logo

PostgreSQL
| The world's most advanced open source database.

Data Structures | Defines | Typedefs | Functions | Variables

twophase.c File Reference

#include "postgres.h"
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>
#include "access/htup_details.h"
#include "access/subtrans.h"
#include "access/transam.h"
#include "access/twophase.h"
#include "access/twophase_rmgr.h"
#include "access/xact.h"
#include "access/xlogutils.h"
#include "catalog/pg_type.h"
#include "catalog/storage.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "pg_trace.h"
#include "pgstat.h"
#include "replication/walsender.h"
#include "replication/syncrep.h"
#include "storage/fd.h"
#include "storage/predicate.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/sinvaladt.h"
#include "storage/smgr.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
#include "utils/timestamp.h"
Include dependency graph for twophase.c:

Go to the source code of this file.

Data Structures

struct  GlobalTransactionData
struct  TwoPhaseStateData
struct  Working_State
struct  TwoPhaseFileHeader
struct  TwoPhaseRecordOnDisk
struct  xllist

Defines

#define TWOPHASE_DIR   "pg_twophase"
#define GIDSIZE   200
#define TwoPhaseFilePath(path, xid)   snprintf(path, MAXPGPATH, TWOPHASE_DIR "/%08X", xid)
#define TWOPHASE_MAGIC   0x57F94532

Typedefs

typedef struct GlobalTransactionData GlobalTransactionData
typedef struct TwoPhaseStateData TwoPhaseStateData
typedef struct TwoPhaseFileHeader TwoPhaseFileHeader
typedef struct TwoPhaseRecordOnDisk TwoPhaseRecordOnDisk

Functions

static void RecordTransactionCommitPrepared (TransactionId xid, int nchildren, TransactionId *children, int nrels, RelFileNode *rels, int ninvalmsgs, SharedInvalidationMessage *invalmsgs, bool initfileinval)
static void RecordTransactionAbortPrepared (TransactionId xid, int nchildren, TransactionId *children, int nrels, RelFileNode *rels)
static void ProcessRecords (char *bufptr, TransactionId xid, const TwoPhaseCallback callbacks[])
Size TwoPhaseShmemSize (void)
void TwoPhaseShmemInit (void)
GlobalTransaction MarkAsPreparing (TransactionId xid, const char *gid, TimestampTz prepared_at, Oid owner, Oid databaseid)
static void GXactLoadSubxactData (GlobalTransaction gxact, int nsubxacts, TransactionId *children)
static void MarkAsPrepared (GlobalTransaction gxact)
static GlobalTransaction LockGXact (const char *gid, Oid user)
static void RemoveGXact (GlobalTransaction gxact)
static bool TransactionIdIsPrepared (TransactionId xid)
static int GetPreparedTransactionList (GlobalTransaction *gxacts)
Datum pg_prepared_xact (PG_FUNCTION_ARGS)
static GlobalTransaction TwoPhaseGetGXact (TransactionId xid)
BackendId TwoPhaseGetDummyBackendId (TransactionId xid)
PGPROC * TwoPhaseGetDummyProc (TransactionId xid)
static void save_state_data (const void *data, uint32 len)
void StartPrepare (GlobalTransaction gxact)
void EndPrepare (GlobalTransaction gxact)
void RegisterTwoPhaseRecord (TwoPhaseRmgrId rmid, uint16 info, const void *data, uint32 len)
static char * ReadTwoPhaseFile (TransactionId xid, bool give_warnings)
bool StandbyTransactionIdIsPrepared (TransactionId xid)
void FinishPreparedTransaction (const char *gid, bool isCommit)
void RemoveTwoPhaseFile (TransactionId xid, bool giveWarning)
void RecreateTwoPhaseFile (TransactionId xid, void *content, int len)
void CheckPointTwoPhase (XLogRecPtr redo_horizon)
TransactionId PrescanPreparedTransactions (TransactionId **xids_p, int *nxids_p)
void StandbyRecoverPreparedTransactions (bool overwriteOK)
void RecoverPreparedTransactions (void)

Variables

int max_prepared_xacts = 0
static TwoPhaseStateData * TwoPhaseState
static struct xllist records

Define Documentation

#define GIDSIZE   200

Definition at line 107 of file twophase.c.

Referenced by MarkAsPreparing(), and StartPrepare().

#define TWOPHASE_DIR   "pg_twophase"
#define TWOPHASE_MAGIC   0x57F94532

Definition at line 785 of file twophase.c.

Referenced by EndPrepare(), and ReadTwoPhaseFile().

#define TwoPhaseFilePath (   path,
  xid 
)    snprintf(path, MAXPGPATH, TWOPHASE_DIR "/%08X", xid)

Typedef Documentation


Function Documentation

void CheckPointTwoPhase ( XLogRecPtr  redo_horizon  ) 

Definition at line 1528 of file twophase.c.

References PROC_HDR::allPgXact, CloseTransientFile(), ereport, errcode_for_file_access(), errmsg(), ERROR, i, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_prepared_xacts, TwoPhaseStateData::numPrepXacts, OpenTransientFile(), palloc(), pfree(), PG_BINARY, pg_fsync(), GlobalTransactionData::pgprocno, GlobalTransactionData::prepare_lsn, TwoPhaseStateData::prepXacts, ProcGlobal, TransactionIdIsPrepared(), TwoPhaseFilePath, TwoPhaseStateLock, GlobalTransactionData::valid, and PGXACT::xid.

Referenced by CheckPointGuts().

{
    TransactionId *xids;
    int         nxids;
    char        path[MAXPGPATH];
    int         i;

    /*
     * fsync the state files of all valid prepared transactions whose PREPARE
     * record is at or before redo_horizon.
     *
     * We don't want to hold the TwoPhaseStateLock while doing I/O, so we grab
     * it just long enough to make a list of the XIDs that require fsyncing,
     * and then do the I/O afterwards.
     *
     * This approach creates a race condition: someone else could delete a
     * GXACT between the time we release TwoPhaseStateLock and the time we try
     * to open its state file.  We handle this by special-casing ENOENT
     * failures: if we see that, we verify that the GXACT is no longer valid,
     * and if so ignore the failure.
     */
    if (max_prepared_xacts <= 0)
        return;                 /* nothing to do */

    TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_START();

    /* At most max_prepared_xacts entries can be collected below */
    xids = (TransactionId *) palloc(max_prepared_xacts * sizeof(TransactionId));
    nxids = 0;

    LWLockAcquire(TwoPhaseStateLock, LW_SHARED);

    for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
    {
        GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
        PGXACT     *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];

        if (gxact->valid &&
            gxact->prepare_lsn <= redo_horizon)
            xids[nxids++] = pgxact->xid;
    }

    LWLockRelease(TwoPhaseStateLock);

    for (i = 0; i < nxids; i++)
    {
        TransactionId xid = xids[i];
        int         fd;

        TwoPhaseFilePath(path, xid);

        fd = OpenTransientFile(path, O_RDWR | PG_BINARY, 0);
        if (fd < 0)
        {
            if (errno == ENOENT)
            {
                /* OK if gxact is no longer valid */
                if (!TransactionIdIsPrepared(xid))
                    continue;
                /* Restore errno in case it was changed */
                errno = ENOENT;
            }
            ereport(ERROR,
                    (errcode_for_file_access(),
                     errmsg("could not open two-phase state file \"%s\": %m",
                            path)));
        }

        if (pg_fsync(fd) != 0)
        {
            /*
             * Closing the file may overwrite errno; preserve the fsync
             * failure code so that %m reports the real cause.
             */
            int         save_errno = errno;

            CloseTransientFile(fd);
            errno = save_errno;
            ereport(ERROR,
                    (errcode_for_file_access(),
                     errmsg("could not fsync two-phase state file \"%s\": %m",
                            path)));
        }

        if (CloseTransientFile(fd) != 0)
            ereport(ERROR,
                    (errcode_for_file_access(),
                     errmsg("could not close two-phase state file \"%s\": %m",
                            path)));
    }

    pfree(xids);

    TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_DONE();
}

void EndPrepare ( GlobalTransaction  gxact  ) 

Definition at line 942 of file twophase.c.

References PROC_HDR::allPgXact, Assert, CloseTransientFile(), COMP_CRC32, XLogRecData::data, PGXACT::delayChkpt, END_CRIT_SECTION, ereport, errcode(), errcode_for_file_access(), errmsg(), ERROR, FIN_CRC32, xllist::head, INIT_CRC32, XLogRecData::len, TwoPhaseFileHeader::magic, MarkAsPrepared(), MaxAllocSize, MyPgXact, XLogRecData::next, NULL, OpenTransientFile(), PG_BINARY, GlobalTransactionData::pgprocno, GlobalTransactionData::prepare_lsn, ProcGlobal, records, RegisterTwoPhaseRecord(), START_CRIT_SECTION, SyncRepWaitForLSN(), xllist::tail, xllist::total_len, TwoPhaseFileHeader::total_len, TWOPHASE_MAGIC, TWOPHASE_RM_END_ID, TwoPhaseFilePath, write, PGXACT::xid, XLOG_XACT_PREPARE, XLogFlush(), and XLogInsert().

Referenced by PrepareTransaction().

{
    PGXACT     *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
    TransactionId xid = pgxact->xid;
    TwoPhaseFileHeader *hdr;
    char        path[MAXPGPATH];
    XLogRecData *record;
    pg_crc32    statefile_crc;
    pg_crc32    bogus_crc;
    int         fd;

    /*
     * Finish preparing the transaction: write the accumulated 2PC records to
     * the state file, insert and flush the PREPARE WAL record, and mark the
     * gxact as valid.
     */

    /* Add the end sentinel to the list of 2PC records */
    RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0,
                           NULL, 0);

    /* Go back and fill in total_len in the file header record */
    hdr = (TwoPhaseFileHeader *) records.head->data;
    Assert(hdr->magic == TWOPHASE_MAGIC);
    hdr->total_len = records.total_len + sizeof(pg_crc32);

    /*
     * If the file size exceeds MaxAllocSize, we won't be able to read it in
     * ReadTwoPhaseFile. Check for that now, rather than fail at commit time.
     */
    if (hdr->total_len > MaxAllocSize)
        ereport(ERROR,
                (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
                 errmsg("two-phase state file maximum length exceeded")));

    /*
     * Create the 2PC state file.
     */
    TwoPhaseFilePath(path, xid);

    fd = OpenTransientFile(path,
                           O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
                           S_IRUSR | S_IWUSR);
    if (fd < 0)
        ereport(ERROR,
                (errcode_for_file_access(),
                 errmsg("could not create two-phase state file \"%s\": %m",
                        path)));

    /* Write data to file, and calculate CRC as we pass over it */
    INIT_CRC32(statefile_crc);

    for (record = records.head; record != NULL; record = record->next)
    {
        COMP_CRC32(statefile_crc, record->data, record->len);

        /* Clear errno so a short write that leaves it unset is detectable */
        errno = 0;
        if ((write(fd, record->data, record->len)) != record->len)
        {
            /* if write didn't set errno, assume problem is no disk space */
            int         save_errno = errno ? errno : ENOSPC;

            /* Closing may overwrite errno; restore it for %m */
            CloseTransientFile(fd);
            errno = save_errno;
            ereport(ERROR,
                    (errcode_for_file_access(),
                     errmsg("could not write two-phase state file: %m")));
        }
    }

    FIN_CRC32(statefile_crc);

    /*
     * Write a deliberately bogus CRC to the state file; this is just paranoia
     * to catch the case where four more bytes will run us out of disk space.
     */
    bogus_crc = ~statefile_crc;

    errno = 0;
    if ((write(fd, &bogus_crc, sizeof(pg_crc32))) != sizeof(pg_crc32))
    {
        /* if write didn't set errno, assume problem is no disk space */
        int         save_errno = errno ? errno : ENOSPC;

        CloseTransientFile(fd);
        errno = save_errno;
        ereport(ERROR,
                (errcode_for_file_access(),
                 errmsg("could not write two-phase state file: %m")));
    }

    /* Back up to prepare for rewriting the CRC */
    if (lseek(fd, -((off_t) sizeof(pg_crc32)), SEEK_CUR) < 0)
    {
        /* Keep the lseek failure code across the close */
        int         save_errno = errno;

        CloseTransientFile(fd);
        errno = save_errno;
        ereport(ERROR,
                (errcode_for_file_access(),
                 errmsg("could not seek in two-phase state file: %m")));
    }

    /*
     * The state file isn't valid yet, because we haven't written the correct
     * CRC yet.  Before we do that, insert entry in WAL and flush it to disk.
     *
     * Between the time we have written the WAL entry and the time we write
     * out the correct state file CRC, we have an inconsistency: the xact is
     * prepared according to WAL but not according to our on-disk state. We
     * use a critical section to force a PANIC if we are unable to complete
     * the write --- then, WAL replay should repair the inconsistency.  The
     * odds of a PANIC actually occurring should be very tiny given that we
     * were able to write the bogus CRC above.
     *
     * We have to set delayChkpt here, too; otherwise a checkpoint starting
     * immediately after the WAL record is inserted could complete without
     * fsync'ing our state file.  (This is essentially the same kind of race
     * condition as the COMMIT-to-clog-write case that RecordTransactionCommit
     * uses delayChkpt for; see notes there.)
     *
     * We save the PREPARE record's location in the gxact for later use by
     * CheckPointTwoPhase.
     */
    START_CRIT_SECTION();

    MyPgXact->delayChkpt = true;

    gxact->prepare_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE,
                                    records.head);
    XLogFlush(gxact->prepare_lsn);

    /* If we crash now, we have prepared: WAL replay will fix things */

    /* write correct CRC and close file */
    errno = 0;
    if ((write(fd, &statefile_crc, sizeof(pg_crc32))) != sizeof(pg_crc32))
    {
        /* if write didn't set errno, assume problem is no disk space */
        int         save_errno = errno ? errno : ENOSPC;

        CloseTransientFile(fd);
        errno = save_errno;
        ereport(ERROR,
                (errcode_for_file_access(),
                 errmsg("could not write two-phase state file: %m")));
    }

    if (CloseTransientFile(fd) != 0)
        ereport(ERROR,
                (errcode_for_file_access(),
                 errmsg("could not close two-phase state file: %m")));

    /*
     * Mark the prepared transaction as valid.  As soon as xact.c marks
     * MyPgXact as not running our XID (which it will do immediately after
     * this function returns), others can commit/rollback the xact.
     *
     * NB: a side effect of this is to make a dummy ProcArray entry for the
     * prepared XID.  This must happen before we clear the XID from MyPgXact,
     * else there is a window where the XID is not running according to
     * TransactionIdIsInProgress, and onlookers would be entitled to assume
     * the xact crashed.  Instead we have a window where the same XID appears
     * twice in ProcArray, which is OK.
     */
    MarkAsPrepared(gxact);

    /*
     * Now we can mark ourselves as out of the commit critical section: a
     * checkpoint starting after this will certainly see the gxact as a
     * candidate for fsyncing.
     */
    MyPgXact->delayChkpt = false;

    END_CRIT_SECTION();

    /*
     * Wait for synchronous replication, if required.
     *
     * Note that at this stage we have marked the prepare, but still show as
     * running in the procarray (twice!) and continue to hold locks.
     */
    SyncRepWaitForLSN(gxact->prepare_lsn);

    /* Reset the record list now that it has been written out */
    records.tail = records.head = NULL;
}

void FinishPreparedTransaction ( const char *  gid,
bool  isCommit 
)

Definition at line 1259 of file twophase.c.

References PROC_HDR::allPgXact, PROC_HDR::allProcs, Assert, AtEOXact_PgStat(), buf, ereport, errcode(), errmsg(), ERROR, GetUserId(), i, TwoPhaseFileHeader::initfileinval, InvalidBackendId, LockGXact(), MAXALIGN, TwoPhaseFileHeader::nabortrels, TwoPhaseFileHeader::ncommitrels, TwoPhaseFileHeader::ninvalmsgs, TwoPhaseFileHeader::nsubxacts, NULL, pfree(), GlobalTransactionData::pgprocno, PredicateLockTwoPhaseFinish(), ProcArrayRemove(), ProcessRecords(), ProcGlobal, ReadTwoPhaseFile(), RecordTransactionAbortPrepared(), RecordTransactionCommitPrepared(), RelationCacheInitFilePostInvalidate(), RelationCacheInitFilePreInvalidate(), RemoveGXact(), RemoveTwoPhaseFile(), SendSharedInvalidMessages(), smgrclose(), smgrdounlink(), smgropen(), TransactionIdEquals, TransactionIdLatest(), twophase_postabort_callbacks, twophase_postcommit_callbacks, GlobalTransactionData::valid, TwoPhaseFileHeader::xid, and PGXACT::xid.

Referenced by standard_ProcessUtility().

{
    /*
     * Finish the prepared transaction identified by gid: commit it when
     * isCommit is true, otherwise roll it back.  The steps are: lock the
     * gxact, read its state file, record the commit/abort, remove the dummy
     * proc from the ProcArray, unlink dropped relations, send invalidation
     * messages, run the per-record callbacks, and finally remove the state
     * file and the gxact.
     */
    GlobalTransaction gxact;
    PGPROC     *proc;
    PGXACT     *pgxact;
    TransactionId xid;
    char       *buf;
    char       *bufptr;
    TwoPhaseFileHeader *hdr;
    TransactionId latestXid;
    TransactionId *children;
    RelFileNode *commitrels;
    RelFileNode *abortrels;
    RelFileNode *delrels;
    int         ndelrels;
    SharedInvalidationMessage *invalmsgs;
    int         i;

    /*
     * Validate the GID, and lock the GXACT to ensure that two backends do not
     * try to commit the same GID at once.
     */
    gxact = LockGXact(gid, GetUserId());
    proc = &ProcGlobal->allProcs[gxact->pgprocno];
    pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
    xid = pgxact->xid;

    /*
     * Read and validate the state file
     */
    buf = ReadTwoPhaseFile(xid, true);
    if (buf == NULL)
        ereport(ERROR,
                (errcode(ERRCODE_DATA_CORRUPTED),
                 errmsg("two-phase state file for transaction %u is corrupt",
                        xid)));

    /*
     * Disassemble the header area
     *
     * The header is followed by four MAXALIGN'd arrays, in this order:
     * subtransaction XIDs, relations to drop on commit, relations to drop on
     * abort, and invalidation messages.  After stepping over them, bufptr
     * points at the data handed to ProcessRecords below.
     */
    hdr = (TwoPhaseFileHeader *) buf;
    Assert(TransactionIdEquals(hdr->xid, xid));
    bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
    children = (TransactionId *) bufptr;
    bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
    commitrels = (RelFileNode *) bufptr;
    bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
    abortrels = (RelFileNode *) bufptr;
    bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
    invalmsgs = (SharedInvalidationMessage *) bufptr;
    bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));

    /* compute latestXid among all children */
    latestXid = TransactionIdLatest(xid, hdr->nsubxacts, children);

    /*
     * The order of operations here is critical: make the XLOG entry for
     * commit or abort, then mark the transaction committed or aborted in
     * pg_clog, then remove its PGPROC from the global ProcArray (which means
     * TransactionIdIsInProgress will stop saying the prepared xact is in
     * progress), then run the post-commit or post-abort callbacks. The
     * callbacks will release the locks the transaction held.
     */
    if (isCommit)
        RecordTransactionCommitPrepared(xid,
                                        hdr->nsubxacts, children,
                                        hdr->ncommitrels, commitrels,
                                        hdr->ninvalmsgs, invalmsgs,
                                        hdr->initfileinval);
    else
        RecordTransactionAbortPrepared(xid,
                                       hdr->nsubxacts, children,
                                       hdr->nabortrels, abortrels);

    ProcArrayRemove(proc, latestXid);

    /*
     * In case we fail while running the callbacks, mark the gxact invalid so
     * no one else will try to commit/rollback, and so it can be recycled
     * properly later.  It is still locked by our XID so it won't go away yet.
     *
     * (We assume it's safe to do this without taking TwoPhaseStateLock.)
     */
    gxact->valid = false;

    /*
     * We have to remove any files that were supposed to be dropped. For
     * consistency with the regular xact.c code paths, must do this before
     * releasing locks, so do it before running the callbacks.
     *
     * NB: this code knows that we couldn't be dropping any temp rels ...
     */
    if (isCommit)
    {
        delrels = commitrels;
        ndelrels = hdr->ncommitrels;
    }
    else
    {
        delrels = abortrels;
        ndelrels = hdr->nabortrels;
    }
    for (i = 0; i < ndelrels; i++)
    {
        SMgrRelation srel = smgropen(delrels[i], InvalidBackendId);

        smgrdounlink(srel, false);
        smgrclose(srel);
    }

    /*
     * Handle cache invalidation messages.
     *
     * Relcache init file invalidation requires processing both before and
     * after we send the SI messages. See AtEOXact_Inval()
     */
    if (hdr->initfileinval)
        RelationCacheInitFilePreInvalidate();
    SendSharedInvalidMessages(invalmsgs, hdr->ninvalmsgs);
    if (hdr->initfileinval)
        RelationCacheInitFilePostInvalidate();

    /* And now do the callbacks */
    if (isCommit)
        ProcessRecords(bufptr, xid, twophase_postcommit_callbacks);
    else
        ProcessRecords(bufptr, xid, twophase_postabort_callbacks);

    PredicateLockTwoPhaseFinish(xid, isCommit);

    /* Count the prepared xact as committed or aborted */
    AtEOXact_PgStat(isCommit);

    /*
     * And now we can clean up our mess.
     */
    RemoveTwoPhaseFile(xid, true);

    RemoveGXact(gxact);

    /*
     * hdr, children, commitrels, abortrels, invalmsgs and bufptr all point
     * into buf, so it is released only after their last use.
     */
    pfree(buf);
}

static int GetPreparedTransactionList ( GlobalTransaction *  gxacts  )  [static]

Definition at line 557 of file twophase.c.

References i, LW_SHARED, LWLockAcquire(), LWLockRelease(), TwoPhaseStateData::numPrepXacts, palloc(), TwoPhaseStateData::prepXacts, and TwoPhaseStateLock.

Referenced by pg_prepared_xact().

{
    GlobalTransaction snapshot = NULL;
    int         count;
    int         idx;

    /*
     * Copy every entry of the active gxact array into a freshly palloc'd
     * array while holding TwoPhaseStateLock in shared mode.  *gxacts receives
     * the copy (or NULL when there are no entries); the return value is the
     * number of entries copied.
     */
    LWLockAcquire(TwoPhaseStateLock, LW_SHARED);

    count = TwoPhaseState->numPrepXacts;
    if (count != 0)
    {
        snapshot = (GlobalTransaction) palloc(sizeof(GlobalTransactionData) * count);

        /* Entries are copied by value, so the result stays usable unlocked */
        for (idx = 0; idx < count; idx++)
            memcpy(&snapshot[idx], TwoPhaseState->prepXacts[idx],
                   sizeof(GlobalTransactionData));
    }

    LWLockRelease(TwoPhaseStateLock);

    *gxacts = snapshot;
    return count;
}

static void GXactLoadSubxactData ( GlobalTransaction  gxact,
int  nsubxacts,
TransactionId *  children 
) [static]

Definition at line 363 of file twophase.c.

References PROC_HDR::allPgXact, PROC_HDR::allProcs, PGXACT::nxids, PGXACT::overflowed, PGPROC_MAX_CACHED_SUBXIDS, GlobalTransactionData::pgprocno, ProcGlobal, PGPROC::subxids, and XidCache::xids.

Referenced by RecoverPreparedTransactions(), and StartPrepare().

{
    PGXACT     *dummy_xact = &ProcGlobal->allPgXact[gxact->pgprocno];
    PGPROC     *dummy_proc = &ProcGlobal->allProcs[gxact->pgprocno];
    int         ncached = nsubxacts;

    /*
     * Load the subtransaction XIDs into the gxact's dummy proc.  At most
     * PGPROC_MAX_CACHED_SUBXIDS of them are stored; if there are more, the
     * overflowed flag is set instead.
     *
     * We need no extra lock since the GXACT isn't valid yet.
     */
    if (ncached > PGPROC_MAX_CACHED_SUBXIDS)
    {
        dummy_xact->overflowed = true;
        ncached = PGPROC_MAX_CACHED_SUBXIDS;
    }

    /* Nothing to copy; leave nxids untouched */
    if (ncached <= 0)
        return;

    memcpy(dummy_proc->subxids.xids, children,
           ncached * sizeof(TransactionId));
    dummy_xact->nxids = ncached;
}

static GlobalTransaction LockGXact ( const char *  gid,
Oid  user 
) [static]

Definition at line 408 of file twophase.c.

References PROC_HDR::allProcs, PGPROC::databaseId, ereport, errcode(), errhint(), errmsg(), ERROR, GetTopTransactionId(), GlobalTransactionData::gid, i, GlobalTransactionData::locking_xid, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyDatabaseId, TwoPhaseStateData::numPrepXacts, GlobalTransactionData::owner, GlobalTransactionData::pgprocno, TwoPhaseStateData::prepXacts, ProcGlobal, superuser_arg(), TransactionIdIsActive(), TransactionIdIsValid, TwoPhaseStateLock, and GlobalTransactionData::valid.

Referenced by FinishPreparedTransaction().

{
    int         slot;

    /*
     * Find the valid gxact whose GID equals gid, check that the caller may
     * finish it, and mark it as locked by our top-level XID.  Raises an error
     * if no such gxact exists, if it is locked by an active transaction, if
     * the user is neither its owner nor a superuser, or if it belongs to
     * another database.
     */
    LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);

    for (slot = 0; slot < TwoPhaseState->numPrepXacts; slot++)
    {
        GlobalTransaction candidate = TwoPhaseState->prepXacts[slot];
        PGPROC     *dummy_proc;

        /* Skip not-yet-valid entries and entries with a different GID */
        if (!candidate->valid || strcmp(candidate->gid, gid) != 0)
            continue;

        /* This is the one; is it already locked by a live transaction? */
        if (TransactionIdIsValid(candidate->locking_xid))
        {
            if (TransactionIdIsActive(candidate->locking_xid))
                ereport(ERROR,
                        (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                         errmsg("prepared transaction with identifier \"%s\" is busy",
                                gid)));

            /* The previous locker is gone; clear its stale lock */
            candidate->locking_xid = InvalidTransactionId;
        }

        /* Only the owner or a superuser may finish the transaction */
        if (!(user == candidate->owner || superuser_arg(user)))
            ereport(ERROR,
                    (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
                     errmsg("permission denied to finish prepared transaction"),
                     errhint("Must be superuser or the user that prepared the transaction.")));

        /*
         * Finishing from a different database is disallowed: it might be
         * possible in principle, but NOTIFY is known not to work and other
         * issues may exist as well.
         */
        dummy_proc = &ProcGlobal->allProcs[candidate->pgprocno];
        if (dummy_proc->databaseId != MyDatabaseId)
            ereport(ERROR,
                    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                     errmsg("prepared transaction belongs to another database"),
                     errhint("Connect to the database where the transaction was prepared to finish it.")));

        /* All checks passed: take the lock and hand the entry back */
        candidate->locking_xid = GetTopTransactionId();

        LWLockRelease(TwoPhaseStateLock);

        return candidate;
    }

    /* Fell off the end of the array: no such GID */
    LWLockRelease(TwoPhaseStateLock);

    ereport(ERROR,
            (errcode(ERRCODE_UNDEFINED_OBJECT),
             errmsg("prepared transaction with identifier \"%s\" does not exist",
                    gid)));

    /* NOTREACHED */
    return NULL;
}

static void MarkAsPrepared ( GlobalTransaction  gxact  )  [static]

Definition at line 388 of file twophase.c.

References PROC_HDR::allProcs, Assert, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), GlobalTransactionData::pgprocno, ProcArrayAdd(), ProcGlobal, TwoPhaseStateLock, and GlobalTransactionData::valid.

Referenced by EndPrepare(), and RecoverPreparedTransactions().

{
    PGPROC     *dummy_proc = &ProcGlobal->allProcs[gxact->pgprocno];

    /*
     * Flip the gxact to valid.  Taking the lock for this may be more than is
     * strictly needed, but that has not been established.
     */
    LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
    Assert(!gxact->valid);
    gxact->valid = true;
    LWLockRelease(TwoPhaseStateLock);

    /*
     * Add the gxact's dummy proc to the global ProcArray, so that
     * TransactionIdIsInProgress keeps treating the XID as running.
     */
    ProcArrayAdd(dummy_proc);
}

GlobalTransaction MarkAsPreparing ( TransactionId  xid,
const char *  gid,
TimestampTz  prepared_at,
Oid  owner,
Oid  databaseid 
)

Definition at line 242 of file twophase.c.

References PROC_HDR::allPgXact, PROC_HDR::allProcs, Assert, PGPROC::backendId, PGPROC::databaseId, PGXACT::delayChkpt, ereport, errcode(), errhint(), errmsg(), ERROR, TwoPhaseStateData::freeGXacts, GlobalTransactionData::gid, GIDSIZE, i, PGPROC::links, GlobalTransactionData::locking_xid, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), PGPROC::lwWaiting, PGPROC::lwWaitLink, PGPROC::lwWaitMode, PGPROC::lxid, max_prepared_xacts, MemSet, PGPROC::myProcLocks, GlobalTransactionData::next, NULL, TwoPhaseStateData::numPrepXacts, PGXACT::nxids, PGXACT::overflowed, GlobalTransactionData::owner, PGPROC::pgprocno, GlobalTransactionData::pgprocno, PGPROC::pid, GlobalTransactionData::prepare_lsn, GlobalTransactionData::prepared_at, TwoPhaseStateData::prepXacts, ProcGlobal, PGPROC::roleId, SHMQueueElemInit(), SHMQueueInit(), TransactionIdIsActive(), TwoPhaseStateLock, PGXACT::vacuumFlags, GlobalTransactionData::valid, PGPROC::waitLock, PGPROC::waitProcLock, PGPROC::waitStatus, PGXACT::xid, and PGXACT::xmin.

Referenced by PrepareTransaction(), and RecoverPreparedTransactions().

{
    GlobalTransaction gxact;
    PGPROC     *proc;
    PGXACT     *pgxact;
    int         i;

    /*
     * Reserve the GID for a transaction that is about to be prepared: grab a
     * free gxact, initialize it and its dummy PGPROC/PGXACT, and add it to
     * the active array in the not-yet-valid state, locked by xid.
     */

    /* Reject GIDs of GIDSIZE or more characters before copying anything */
    if (strlen(gid) >= GIDSIZE)
        ereport(ERROR,
                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                 errmsg("transaction identifier \"%s\" is too long",
                        gid)));

    /* fail immediately if feature is disabled */
    if (max_prepared_xacts == 0)
        ereport(ERROR,
                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                 errmsg("prepared transactions are disabled"),
                 errhint("Set max_prepared_transactions to a nonzero value.")));

    LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);

    /*
     * Step one: recycle gxacts left behind by failed prepares (not valid,
     * and their locker is no longer active).  This keeps their GIDs from
     * looking reserved and avoids running out of slots unnecessarily.
     */
    i = 0;
    while (i < TwoPhaseState->numPrepXacts)
    {
        gxact = TwoPhaseState->prepXacts[i];

        if (gxact->valid || TransactionIdIsActive(gxact->locking_xid))
        {
            /* Still in use; move on to the next slot */
            i++;
            continue;
        }

        /* Dead entry: fill its slot with the last active entry ... */
        TwoPhaseState->numPrepXacts--;
        TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts];
        /* ... and push it onto the freelist */
        gxact->next = TwoPhaseState->freeGXacts;
        TwoPhaseState->freeGXacts = gxact;
        /* i is not advanced, so the entry just moved into slot i is examined */
    }

    /* Step two: the GID must not already be present in the active array */
    for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
    {
        if (strcmp(TwoPhaseState->prepXacts[i]->gid, gid) == 0)
            ereport(ERROR,
                    (errcode(ERRCODE_DUPLICATE_OBJECT),
                     errmsg("transaction identifier \"%s\" is already in use",
                            gid)));
    }

    /* Step three: pop a gxact off the freelist, failing if there is none */
    gxact = TwoPhaseState->freeGXacts;
    if (gxact == NULL)
        ereport(ERROR,
                (errcode(ERRCODE_OUT_OF_MEMORY),
                 errmsg("maximum number of prepared transactions reached"),
                 errhint("Increase max_prepared_transactions (currently %d).",
                         max_prepared_xacts)));
    TwoPhaseState->freeGXacts = gxact->next;

    proc = &ProcGlobal->allProcs[gxact->pgprocno];
    pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];

    /* Set up the dummy PGPROC, starting from an all-zeroes struct */
    MemSet(proc, 0, sizeof(PGPROC));
    proc->pgprocno = gxact->pgprocno;
    SHMQueueElemInit(&(proc->links));
    proc->waitStatus = STATUS_OK;
    /* The gxact's VXID is InvalidBackendId/XID */
    proc->lxid = (LocalTransactionId) xid;
    proc->backendId = InvalidBackendId;
    proc->pid = 0;
    proc->databaseId = databaseid;
    proc->roleId = owner;
    proc->lwWaiting = false;
    proc->lwWaitMode = 0;
    proc->lwWaitLink = NULL;
    proc->waitLock = NULL;
    proc->waitProcLock = NULL;
    for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
        SHMQueueInit(&(proc->myProcLocks[i]));

    /* Set up the matching PGXACT */
    pgxact->xid = xid;
    pgxact->xmin = InvalidTransactionId;
    pgxact->delayChkpt = false;
    pgxact->vacuumFlags = 0;
    /* subxid data must be filled later by GXactLoadSubxactData */
    pgxact->overflowed = false;
    pgxact->nxids = 0;

    /* Fill in the gxact itself; it stays invalid and locked by xid */
    strcpy(gxact->gid, gid);
    gxact->owner = owner;
    gxact->prepared_at = prepared_at;
    /* initialize LSN to 0 (start of WAL) */
    gxact->prepare_lsn = 0;
    gxact->locking_xid = xid;
    gxact->valid = false;

    /* Finally, append it to the active array */
    Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
    TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;

    LWLockRelease(TwoPhaseStateLock);

    return gxact;
}

Datum pg_prepared_xact ( PG_FUNCTION_ARGS   ) 

Definition at line 602 of file twophase.c.

References PROC_HDR::allPgXact, PROC_HDR::allProcs, Working_State::array, BlessTupleDesc(), CreateTemplateTupleDesc(), CStringGetTextDatum, Working_State::currIdx, PGPROC::databaseId, GetPreparedTransactionList(), GlobalTransactionData::gid, heap_form_tuple(), HeapTupleGetDatum, MemoryContextSwitchTo(), MemSet, FuncCallContext::multi_call_memory_ctx, Working_State::ngxacts, NULL, ObjectIdGetDatum, OIDOID, GlobalTransactionData::owner, palloc(), GlobalTransactionData::pgprocno, GlobalTransactionData::prepared_at, ProcGlobal, SRF_FIRSTCALL_INIT, SRF_IS_FIRSTCALL, SRF_PERCALL_SETUP, SRF_RETURN_DONE, SRF_RETURN_NEXT, TEXTOID, TimestampTzGetDatum, TIMESTAMPTZOID, TransactionIdGetDatum, FuncCallContext::tuple_desc, TupleDescInitEntry(), FuncCallContext::user_fctx, GlobalTransactionData::valid, values, PGXACT::xid, and XIDOID.

{
    FuncCallContext *funcctx;
    Working_State *state;

    /*
     * Set-returning function that emits one five-column row (transaction,
     * gid, prepared, ownerid, dbid) per valid prepared transaction.  The
     * list of gxacts is captured once, on the first call.
     */
    if (SRF_IS_FIRSTCALL())
    {
        MemoryContext callerctx;
        TupleDesc   desc;

        /* set up the context that persists across calls */
        funcctx = SRF_FIRSTCALL_INIT();

        /* everything allocated below must survive until the last call */
        callerctx = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);

        /*
         * Describe the result row; this had better match pg_prepared_xacts
         * view in system_views.sql
         */
        desc = CreateTemplateTupleDesc(5, false);
        TupleDescInitEntry(desc, (AttrNumber) 1, "transaction",
                           XIDOID, -1, 0);
        TupleDescInitEntry(desc, (AttrNumber) 2, "gid",
                           TEXTOID, -1, 0);
        TupleDescInitEntry(desc, (AttrNumber) 3, "prepared",
                           TIMESTAMPTZOID, -1, 0);
        TupleDescInitEntry(desc, (AttrNumber) 4, "ownerid",
                           OIDOID, -1, 0);
        TupleDescInitEntry(desc, (AttrNumber) 5, "dbid",
                           OIDOID, -1, 0);

        funcctx->tuple_desc = BlessTupleDesc(desc);

        /* Take the copy of the gxact array that later calls iterate over */
        state = (Working_State *) palloc(sizeof(Working_State));
        funcctx->user_fctx = (void *) state;

        state->ngxacts = GetPreparedTransactionList(&state->array);
        state->currIdx = 0;

        MemoryContextSwitchTo(callerctx);
    }

    funcctx = SRF_PERCALL_SETUP();
    state = (Working_State *) funcctx->user_fctx;

    /* Advance through the copied array, returning the next valid entry */
    while (state->array != NULL && state->currIdx < state->ngxacts)
    {
        GlobalTransaction entry = &state->array[state->currIdx++];

        if (entry->valid)
        {
            PGPROC     *dummy_proc = &ProcGlobal->allProcs[entry->pgprocno];
            PGXACT     *dummy_xact = &ProcGlobal->allPgXact[entry->pgprocno];
            Datum       values[5];
            bool        nulls[5];
            HeapTuple   tuple;
            Datum       result;

            /* No column is ever NULL */
            MemSet(values, 0, sizeof(values));
            MemSet(nulls, 0, sizeof(nulls));

            values[0] = TransactionIdGetDatum(dummy_xact->xid);
            values[1] = CStringGetTextDatum(entry->gid);
            values[2] = TimestampTzGetDatum(entry->prepared_at);
            values[3] = ObjectIdGetDatum(entry->owner);
            values[4] = ObjectIdGetDatum(dummy_proc->databaseId);

            tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
            result = HeapTupleGetDatum(tuple);
            SRF_RETURN_NEXT(funcctx, result);
        }
    }

    /* Array exhausted (or empty): end of result set */
    SRF_RETURN_DONE(funcctx);
}

TransactionId PrescanPreparedTransactions ( TransactionId **  xids_p,
int *  nxids_p 
)

Definition at line 1641 of file twophase.c.

References AllocateDir(), Assert, buf, dirent::d_name, ereport, errmsg(), FreeDir(), i, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MAXALIGN, VariableCacheData::nextXid, TwoPhaseFileHeader::nsubxacts, NULL, palloc(), pfree(), ReadDir(), ReadTwoPhaseFile(), RemoveTwoPhaseFile(), repalloc(), ShmemVariableCache, TransactionIdAdvance, TransactionIdEquals, TransactionIdFollows(), TransactionIdFollowsOrEquals(), TransactionIdPrecedes(), TWOPHASE_DIR, WARNING, TwoPhaseFileHeader::xid, and XidGenLock.

Referenced by StartupXLOG(), and xlog_redo().

{
    /*
     * Walk the two-phase state directory once, before the clog subsystem is
     * usable.  Files that are too new or fail validation are removed with a
     * WARNING.  For every valid file we:
     *   - fold its XID into the running minimum that becomes our result
     *     (starting from nextXid as sampled on entry),
     *   - push ShmemVariableCache->nextXid past any subtransaction XID that
     *     is not already below it,
     *   - and, when xids_p is non-NULL, remember the top-level XID in a
     *     palloc'd array handed back through *xids_p / *nxids_p.
     */
    const TransactionId startNextXid = ShmemVariableCache->nextXid;
    TransactionId oldestXid = startNextXid;
    TransactionId *collected = NULL;
    int         ncollected = 0;
    int         capacity = 0;
    DIR        *dirdesc;
    struct dirent *entry;

    dirdesc = AllocateDir(TWOPHASE_DIR);
    while ((entry = ReadDir(dirdesc, TWOPHASE_DIR)) != NULL)
    {
        TransactionId xid;
        char       *filebuf;
        TwoPhaseFileHeader *header;
        TransactionId *children;
        int         k;

        /* State files are named with exactly eight upper-case hex digits */
        if (strlen(entry->d_name) != 8 ||
            strspn(entry->d_name, "0123456789ABCDEF") != 8)
            continue;

        xid = (TransactionId) strtoul(entry->d_name, NULL, 16);

        /* An XID at or beyond the entry-time nextXid cannot be legitimate */
        if (TransactionIdFollowsOrEquals(xid, startNextXid))
        {
            ereport(WARNING,
                    (errmsg("removing future two-phase state file \"%s\"",
                            entry->d_name)));
            RemoveTwoPhaseFile(xid, true);
            continue;
        }

        /*
         * We cannot ask clog whether this XID was already resolved: clog is
         * not up yet at this point.
         */

        /* Load the file; NULL means it failed validation */
        filebuf = ReadTwoPhaseFile(xid, true);
        if (filebuf == NULL)
        {
            ereport(WARNING,
                    (errmsg("removing corrupt two-phase state file \"%s\"",
                            entry->d_name)));
            RemoveTwoPhaseFile(xid, true);
            continue;
        }

        /* The XID stored in the header must agree with the file name */
        header = (TwoPhaseFileHeader *) filebuf;
        if (!TransactionIdEquals(header->xid, xid))
        {
            ereport(WARNING,
                    (errmsg("removing corrupt two-phase state file \"%s\"",
                            entry->d_name)));
            RemoveTwoPhaseFile(xid, true);
            pfree(filebuf);
            continue;
        }

        /* File accepted: track the oldest XID seen so far */
        if (TransactionIdPrecedes(xid, oldestXid))
            oldestXid = xid;

        /*
         * The subtransaction XID array sits right after the (MAXALIGN'd)
         * header.  Each entry should follow the main XID and may require
         * nextXid to be advanced.
         *
         * Nobody else is expected to change nextXid here, so it is read
         * without a lock; the lock is still taken for the update itself.
         */
        children = (TransactionId *)
            (filebuf + MAXALIGN(sizeof(TwoPhaseFileHeader)));
        for (k = 0; k < header->nsubxacts; k++)
        {
            TransactionId child = children[k];

            Assert(TransactionIdFollows(child, xid));
            if (!TransactionIdFollowsOrEquals(child,
                                              ShmemVariableCache->nextXid))
                continue;

            LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
            ShmemVariableCache->nextXid = child;
            TransactionIdAdvance(ShmemVariableCache->nextXid);
            LWLockRelease(XidGenLock);
        }

        /* Remember the top-level XID if the caller wants the list */
        if (xids_p)
        {
            if (ncollected == capacity)
            {
                /* first allocation is 10 entries, then double as needed */
                if (capacity == 0)
                {
                    capacity = 10;
                    collected = palloc(capacity * sizeof(TransactionId));
                }
                else
                {
                    capacity = capacity * 2;
                    collected = repalloc(collected,
                                         capacity * sizeof(TransactionId));
                }
            }
            collected[ncollected] = xid;
            ncollected++;
        }

        pfree(filebuf);
    }
    FreeDir(dirdesc);

    if (xids_p)
    {
        *xids_p = collected;
        *nxids_p = ncollected;
    }

    return oldestXid;
}

static void ProcessRecords ( char *  bufptr,
TransactionId  xid,
const TwoPhaseCallback  callbacks[] 
) [static]

Definition at line 1407 of file twophase.c.

References Assert, TwoPhaseRecordOnDisk::info, TwoPhaseRecordOnDisk::len, MAXALIGN, NULL, TwoPhaseRecordOnDisk::rmid, TWOPHASE_RM_END_ID, and TWOPHASE_RM_MAX_ID.

Referenced by FinishPreparedTransaction(), and RecoverPreparedTransactions().

{
    /*
     * Step through the packed sequence of two-phase records starting at
     * bufptr.  Each record is a MAXALIGN'd TwoPhaseRecordOnDisk header
     * followed by MAXALIGN(len) bytes of payload; the sequence ends at a
     * record whose rmid is TWOPHASE_RM_END_ID.  For every record whose
     * resource manager has a non-NULL entry in callbacks[], that callback is
     * invoked with the record's info, payload pointer and length.
     */
    TwoPhaseRecordOnDisk *rec = (TwoPhaseRecordOnDisk *) bufptr;

    Assert(rec->rmid <= TWOPHASE_RM_MAX_ID);
    while (rec->rmid != TWOPHASE_RM_END_ID)
    {
        char       *payload = (char *) rec + MAXALIGN(sizeof(TwoPhaseRecordOnDisk));

        if (callbacks[rec->rmid] != NULL)
            callbacks[rec->rmid] (xid, rec->info, (void *) payload, rec->len);

        /* the next header follows the aligned payload */
        rec = (TwoPhaseRecordOnDisk *) (payload + MAXALIGN(rec->len));
        Assert(rec->rmid <= TWOPHASE_RM_MAX_ID);
    }
}

static char* ReadTwoPhaseFile ( TransactionId  xid,
bool  give_warnings 
) [static]

Definition at line 1129 of file twophase.c.

References buf, CloseTransientFile(), COMP_CRC32, EQ_CRC32, ereport, errcode_for_file_access(), errmsg(), FIN_CRC32, INIT_CRC32, TwoPhaseFileHeader::magic, MAXALIGN, MaxAllocSize, OpenTransientFile(), palloc(), pfree(), PG_BINARY, read, TwoPhaseFileHeader::total_len, TWOPHASE_MAGIC, TwoPhaseFilePath, and WARNING.

Referenced by FinishPreparedTransaction(), PrescanPreparedTransactions(), RecoverPreparedTransactions(), StandbyRecoverPreparedTransactions(), and StandbyTransactionIdIsPrepared().

{
    /*
     * Read and validate the two-phase state file for 'xid'.
     *
     * Returns a palloc'd buffer holding the whole file (callers pfree it),
     * or NULL if the file cannot be opened, stat'ed or read, has an
     * implausible size, carries the wrong magic number or total_len, or
     * fails its CRC check.  When give_warnings is true, failures of
     * open/fstat/read are reported at WARNING level; the purely structural
     * validation failures return NULL silently in either case.
     *
     * errno is saved across CloseTransientFile() so that the "%m" in the
     * warning reflects the call that actually failed, and a short read
     * (which does not set errno) gets its own message instead of "%m".
     */
    char        path[MAXPGPATH];
    char       *buf;
    TwoPhaseFileHeader *hdr;
    int         fd;
    struct stat stat;
    uint32      crc_offset;
    pg_crc32    calc_crc,
                file_crc;
    int         nread;
    int         save_errno;

    TwoPhaseFilePath(path, xid);

    fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
    if (fd < 0)
    {
        if (give_warnings)
            ereport(WARNING,
                    (errcode_for_file_access(),
                     errmsg("could not open two-phase state file \"%s\": %m",
                            path)));
        return NULL;
    }

    /*
     * Check file length.  We can determine a lower bound pretty easily. We
     * set an upper bound to avoid palloc() failure on a corrupt file, though
     * we can't guarantee that we won't get an out of memory error anyway,
     * even on a valid file.
     */
    if (fstat(fd, &stat))
    {
        save_errno = errno;
        CloseTransientFile(fd);
        if (give_warnings)
        {
            /* restore the fstat() error; closing may have changed errno */
            errno = save_errno;
            ereport(WARNING,
                    (errcode_for_file_access(),
                     errmsg("could not stat two-phase state file \"%s\": %m",
                            path)));
        }
        return NULL;
    }

    if (stat.st_size < (MAXALIGN(sizeof(TwoPhaseFileHeader)) +
                        MAXALIGN(sizeof(TwoPhaseRecordOnDisk)) +
                        sizeof(pg_crc32)) ||
        stat.st_size > MaxAllocSize)
    {
        CloseTransientFile(fd);
        return NULL;
    }

    /* The trailing CRC must start on a MAXALIGN boundary */
    crc_offset = stat.st_size - sizeof(pg_crc32);
    if (crc_offset != MAXALIGN(crc_offset))
    {
        CloseTransientFile(fd);
        return NULL;
    }

    /*
     * OK, slurp in the file.
     */
    buf = (char *) palloc(stat.st_size);

    /* st_size was bounded by MaxAllocSize above, so it fits in an int */
    nread = read(fd, buf, stat.st_size);
    if (nread != stat.st_size)
    {
        save_errno = errno;
        CloseTransientFile(fd);
        if (give_warnings)
        {
            if (nread < 0)
            {
                /* genuine I/O error: report the errno left by read() */
                errno = save_errno;
                ereport(WARNING,
                        (errcode_for_file_access(),
                         errmsg("could not read two-phase state file \"%s\": %m",
                                path)));
            }
            else
            {
                /* short read: errno is meaningless, so don't print %m */
                ereport(WARNING,
                        (errmsg("could not read two-phase state file \"%s\": read %d of %d bytes",
                                path, nread, (int) stat.st_size)));
            }
        }
        pfree(buf);
        return NULL;
    }

    CloseTransientFile(fd);

    /* Header sanity: magic number and recorded length must match the file */
    hdr = (TwoPhaseFileHeader *) buf;
    if (hdr->magic != TWOPHASE_MAGIC || hdr->total_len != stat.st_size)
    {
        pfree(buf);
        return NULL;
    }

    /* The CRC covers everything up to, but not including, the stored CRC */
    INIT_CRC32(calc_crc);
    COMP_CRC32(calc_crc, buf, crc_offset);
    FIN_CRC32(calc_crc);

    file_crc = *((pg_crc32 *) (buf + crc_offset));

    if (!EQ_CRC32(calc_crc, file_crc))
    {
        pfree(buf);
        return NULL;
    }

    return buf;
}

static void RecordTransactionAbortPrepared ( TransactionId  xid,
int  nchildren,
TransactionId *  children,
int  nrels,
RelFileNode *  rels 
) [static]

Definition at line 2078 of file twophase.c.

References xl_xact_abort_prepared::arec, XLogRecData::buffer, XLogRecData::data, elog, END_CRIT_SECTION, GetCurrentTimestamp(), XLogRecData::len, XLogRecData::next, xl_xact_abort::nrels, xl_xact_abort::nsubxacts, PANIC, START_CRIT_SECTION, SyncRepWaitForLSN(), TransactionIdAbortTree(), TransactionIdDidCommit(), xl_xact_abort::xact_time, xl_xact_abort_prepared::xid, XLOG_XACT_ABORT_PREPARED, XLogFlush(), and XLogInsert().

Referenced by FinishPreparedTransaction().

{
    /*
     * Write and flush the XLOG_XACT_ABORT_PREPARED record for 'xid', then
     * mark the transaction tree aborted in clog.  The record consists of the
     * fixed-size xl_xact_abort_prepared part, optionally followed by the
     * nrels RelFileNodes in 'rels' and the nchildren XIDs in 'children'.
     * After leaving the critical section we wait for synchronous
     * replication of the record's LSN.
     */
    xl_xact_abort_prepared abortrec;
    XLogRecData chain[3];
    XLogRecData *tail = &chain[0];
    XLogRecPtr  lsn;

    /*
     * Guard against the case where we failed partway through
     * RecordTransactionCommitPrepared: a committed transaction must never
     * be aborted.
     */
    if (TransactionIdDidCommit(xid))
        elog(PANIC, "cannot abort transaction %u, it was already committed",
             xid);

    START_CRIT_SECTION();

    /* Fixed part of the abort record */
    abortrec.xid = xid;
    abortrec.arec.xact_time = GetCurrentTimestamp();
    abortrec.arec.nrels = nrels;
    abortrec.arec.nsubxacts = nchildren;

    chain[0].data = (char *) (&abortrec);
    chain[0].len = MinSizeOfXactAbortPrepared;
    chain[0].buffer = InvalidBuffer;

    /* Optional segment: relations to delete */
    if (nrels > 0)
    {
        chain[1].data = (char *) rels;
        chain[1].len = nrels * sizeof(RelFileNode);
        chain[1].buffer = InvalidBuffer;
        tail->next = &chain[1];
        tail = &chain[1];
    }

    /* Optional segment: child transaction IDs */
    if (nchildren > 0)
    {
        chain[2].data = (char *) children;
        chain[2].len = nchildren * sizeof(TransactionId);
        chain[2].buffer = InvalidBuffer;
        tail->next = &chain[2];
        tail = &chain[2];
    }
    tail->next = NULL;

    lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_ABORT_PREPARED, chain);

    /* The 2PC state file is about to be removed, so always flush */
    XLogFlush(lsn);

    /*
     * Record the abort in clog.  Not strictly required, but cheap to do
     * while we are here.
     */
    TransactionIdAbortTree(xid, nchildren, children);

    END_CRIT_SECTION();

    /*
     * Wait for synchronous replication, if required.
     *
     * At this point clog is marked, but we still appear as running in the
     * procarray and still hold our locks.
     */
    SyncRepWaitForLSN(lsn);
}

static void RecordTransactionCommitPrepared ( TransactionId  xid,
int  nchildren,
TransactionId *  children,
int  nrels,
RelFileNode *  rels,
int  ninvalmsgs,
SharedInvalidationMessage *  invalmsgs,
bool  initfileinval 
) [static]

Definition at line 1981 of file twophase.c.

References XLogRecData::buffer, xl_xact_commit_prepared::crec, XLogRecData::data, PGXACT::delayChkpt, END_CRIT_SECTION, GetCurrentTimestamp(), XLogRecData::len, MyPgXact, XLogRecData::next, xl_xact_commit::nmsgs, xl_xact_commit::nrels, xl_xact_commit::nsubxacts, START_CRIT_SECTION, SyncRepWaitForLSN(), TransactionIdCommitTree(), XACT_COMPLETION_UPDATE_RELCACHE_FILE, xl_xact_commit::xact_time, xl_xact_commit_prepared::xid, xl_xact_commit::xinfo, XLOG_XACT_COMMIT_PREPARED, XLogFlush(), and XLogInsert().

Referenced by FinishPreparedTransaction().

{
    /*
     * Write and flush the XLOG_XACT_COMMIT_PREPARED record for 'xid', then
     * mark the transaction tree committed in clog.  The record consists of
     * the fixed-size xl_xact_commit_prepared part, optionally followed by
     * the relations to delete, the child XIDs and the cache invalidation
     * messages.  MyPgXact->delayChkpt is held true from before the record is
     * inserted until after clog is updated.  After the critical section we
     * wait for synchronous replication of the record's LSN.
     */
    xl_xact_commit_prepared commitrec;
    XLogRecData chain[4];
    XLogRecData *tail = &chain[0];
    XLogRecPtr  lsn;

    START_CRIT_SECTION();

    /* See notes in RecordTransactionCommit */
    MyPgXact->delayChkpt = true;

    /* Fixed part of the commit record */
    commitrec.xid = xid;
    commitrec.crec.xact_time = GetCurrentTimestamp();
    if (initfileinval)
        commitrec.crec.xinfo = XACT_COMPLETION_UPDATE_RELCACHE_FILE;
    else
        commitrec.crec.xinfo = 0;
    commitrec.crec.nrels = nrels;
    commitrec.crec.nsubxacts = nchildren;
    commitrec.crec.nmsgs = ninvalmsgs;

    chain[0].data = (char *) (&commitrec);
    chain[0].len = MinSizeOfXactCommitPrepared;
    chain[0].buffer = InvalidBuffer;

    /* Optional segment: relations to delete */
    if (nrels > 0)
    {
        chain[1].data = (char *) rels;
        chain[1].len = nrels * sizeof(RelFileNode);
        chain[1].buffer = InvalidBuffer;
        tail->next = &chain[1];
        tail = &chain[1];
    }

    /* Optional segment: committed child transaction IDs */
    if (nchildren > 0)
    {
        chain[2].data = (char *) children;
        chain[2].len = nchildren * sizeof(TransactionId);
        chain[2].buffer = InvalidBuffer;
        tail->next = &chain[2];
        tail = &chain[2];
    }

    /* Optional segment: cache invalidation messages */
    if (ninvalmsgs > 0)
    {
        chain[3].data = (char *) invalmsgs;
        chain[3].len = ninvalmsgs * sizeof(SharedInvalidationMessage);
        chain[3].buffer = InvalidBuffer;
        tail->next = &chain[3];
        tail = &chain[3];
    }
    tail->next = NULL;

    lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT_PREPARED, chain);

    /*
     * No attempt is made to sleep before flushing, and there is no async
     * commit for prepared transactions (the idea is probably contradictory).
     */

    /* Flush XLOG to disk */
    XLogFlush(lsn);

    /* Mark the transaction committed in pg_clog */
    TransactionIdCommitTree(xid, nchildren, children);

    /* Checkpoint can proceed now */
    MyPgXact->delayChkpt = false;

    END_CRIT_SECTION();

    /*
     * Wait for synchronous replication, if required.
     *
     * At this point clog is marked, but we still appear as running in the
     * procarray and still hold our locks.
     */
    SyncRepWaitForLSN(lsn);
}

void RecoverPreparedTransactions ( void   ) 

Definition at line 1859 of file twophase.c.

References AllocateDir(), Assert, buf, dirent::d_name, TwoPhaseFileHeader::database, ereport, errmsg(), FreeDir(), TwoPhaseFileHeader::gid, GXactLoadSubxactData(), i, InHotStandby, LOG, MarkAsPrepared(), MarkAsPreparing(), MAXALIGN, MAXPGPATH, TwoPhaseFileHeader::nabortrels, TwoPhaseFileHeader::ncommitrels, TwoPhaseFileHeader::ninvalmsgs, TwoPhaseFileHeader::nsubxacts, NULL, TwoPhaseFileHeader::owner, pfree(), PGPROC_MAX_CACHED_SUBXIDS, TwoPhaseFileHeader::prepared_at, ProcessRecords(), ReadDir(), ReadTwoPhaseFile(), RemoveTwoPhaseFile(), snprintf(), StandbyReleaseLockTree(), SubTransSetParent(), TransactionIdDidAbort(), TransactionIdDidCommit(), TransactionIdEquals, TWOPHASE_DIR, twophase_recover_callbacks, WARNING, and TwoPhaseFileHeader::xid.

Referenced by StartupXLOG().

{
    /*
     * Scan TWOPHASE_DIR and rebuild in-memory state for every prepared
     * transaction whose state file is still valid.
     *
     * For each file named with exactly eight upper-case hex digits:
     *   - if the XID is already committed or aborted, the file is stale and
     *     is removed with a WARNING;
     *   - if ReadTwoPhaseFile() rejects it, it is removed as corrupt;
     *   - otherwise the subtransactions are re-linked to the top-level XID,
     *     a GXACT is recreated (MarkAsPreparing / GXactLoadSubxactData /
     *     MarkAsPrepared), and the per-resource-manager records are replayed
     *     through twophase_recover_callbacks.
     *
     * Takes no arguments and returns nothing.
     */
    char        dir[MAXPGPATH];
    DIR        *cldir;
    struct dirent *clde;
    /*
     * NOTE(review): overwriteOK is declared outside the loop and is only
     * ever set to true, so once one file sets it, it stays true for every
     * later file in the scan -- confirm that this is intended.
     */
    bool        overwriteOK = false;

    snprintf(dir, MAXPGPATH, "%s", TWOPHASE_DIR);

    cldir = AllocateDir(dir);
    while ((clde = ReadDir(cldir, dir)) != NULL)
    {
        /* Only consider names that are exactly 8 upper-case hex digits */
        if (strlen(clde->d_name) == 8 &&
            strspn(clde->d_name, "0123456789ABCDEF") == 8)
        {
            TransactionId xid;
            char       *buf;
            char       *bufptr;
            TwoPhaseFileHeader *hdr;
            TransactionId *subxids;
            GlobalTransaction gxact;
            int         i;

            /* The file name is the XID in hexadecimal */
            xid = (TransactionId) strtoul(clde->d_name, NULL, 16);

            /* Already processed? */
            if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
            {
                ereport(WARNING,
                        (errmsg("removing stale two-phase state file \"%s\"",
                                clde->d_name)));
                RemoveTwoPhaseFile(xid, true);
                continue;
            }

            /* Read and validate file */
            buf = ReadTwoPhaseFile(xid, true);
            if (buf == NULL)
            {
                ereport(WARNING,
                      (errmsg("removing corrupt two-phase state file \"%s\"",
                              clde->d_name)));
                RemoveTwoPhaseFile(xid, true);
                continue;
            }

            ereport(LOG,
                    (errmsg("recovering prepared transaction %u", xid)));

            /*
             * Deconstruct header.  The subxid array follows the MAXALIGN'd
             * header; bufptr is then advanced past the subxids, commit rels,
             * abort rels and invalidation messages so that it points at the
             * first resource-manager record.
             */
            hdr = (TwoPhaseFileHeader *) buf;
            Assert(TransactionIdEquals(hdr->xid, xid));
            bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
            subxids = (TransactionId *) bufptr;
            bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
            bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
            bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
            bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));

            /*
             * It's possible that SubTransSetParent has been set before, if
             * the prepared transaction generated xid assignment records. Test
             * here must match one used in AssignTransactionId().
             */
            if (InHotStandby && hdr->nsubxacts >= PGPROC_MAX_CACHED_SUBXIDS)
                overwriteOK = true;

            /*
             * Reconstruct subtrans state for the transaction --- needed
             * because pg_subtrans is not preserved over a restart.  Note that
             * we are linking all the subtransactions directly to the
             * top-level XID; there may originally have been a more complex
             * hierarchy, but there's no need to restore that exactly.
             */
            for (i = 0; i < hdr->nsubxacts; i++)
                SubTransSetParent(subxids[i], xid, overwriteOK);

            /*
             * Recreate its GXACT and dummy PGPROC
             *
             * Note: since we don't have the PREPARE record's WAL location at
             * hand, we leave prepare_lsn zeroes.  This means the GXACT will
             * be fsync'd on every future checkpoint.  We assume this
             * situation is infrequent enough that the performance cost is
             * negligible (especially since we know the state file has already
             * been fsynced).
             */
            gxact = MarkAsPreparing(xid, hdr->gid,
                                    hdr->prepared_at,
                                    hdr->owner, hdr->database);
            GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids);
            MarkAsPrepared(gxact);

            /*
             * Recover other state (notably locks) using resource managers
             */
            ProcessRecords(bufptr, xid, twophase_recover_callbacks);

            /*
             * Release locks held by the standby process after we process each
             * prepared transaction. As a result, we don't need too many
             * additional locks at any one time.
             */
            if (InHotStandby)
                StandbyReleaseLockTree(xid, hdr->nsubxacts, subxids);

            /* Done with the file image; subxids and hdr point into it */
            pfree(buf);
        }
    }
    FreeDir(cldir);
}

void RecreateTwoPhaseFile ( TransactionId  xid,
void *  content,
int  len 
)

Definition at line 1454 of file twophase.c.

References CloseTransientFile(), COMP_CRC32, ereport, errcode_for_file_access(), errmsg(), ERROR, FIN_CRC32, INIT_CRC32, OpenTransientFile(), PG_BINARY, pg_fsync(), TwoPhaseFilePath, and write.

Referenced by xact_redo().

{
    /*
     * (Re)write the two-phase state file for 'xid' from 'content' ('len'
     * bytes), appending a freshly computed CRC, and fsync it.
     *
     * Any failure to open, write, fsync or close the file raises ERROR.
     * errno is captured before CloseTransientFile() runs so that "%m"
     * reports the real cause; a short write that leaves errno unset is
     * reported as ENOSPC.
     */
    char        path[MAXPGPATH];
    pg_crc32    statefile_crc;
    int         fd;

    /* Recompute CRC */
    INIT_CRC32(statefile_crc);
    COMP_CRC32(statefile_crc, content, len);
    FIN_CRC32(statefile_crc);

    TwoPhaseFilePath(path, xid);

    fd = OpenTransientFile(path,
                           O_CREAT | O_TRUNC | O_WRONLY | PG_BINARY,
                           S_IRUSR | S_IWUSR);
    if (fd < 0)
        ereport(ERROR,
                (errcode_for_file_access(),
                 errmsg("could not recreate two-phase state file \"%s\": %m",
                        path)));

    /* Write content and CRC */
    errno = 0;
    if (write(fd, content, len) != len)
    {
        int         save_errno = errno;

        CloseTransientFile(fd);
        /* if write didn't set errno, assume problem is no disk space */
        errno = save_errno ? save_errno : ENOSPC;
        ereport(ERROR,
                (errcode_for_file_access(),
                 errmsg("could not write two-phase state file: %m")));
    }
    errno = 0;
    if (write(fd, &statefile_crc, sizeof(pg_crc32)) != sizeof(pg_crc32))
    {
        int         save_errno = errno;

        CloseTransientFile(fd);
        /* if write didn't set errno, assume problem is no disk space */
        errno = save_errno ? save_errno : ENOSPC;
        ereport(ERROR,
                (errcode_for_file_access(),
                 errmsg("could not write two-phase state file: %m")));
    }

    /*
     * We must fsync the file because the end-of-replay checkpoint will not do
     * so, there being no GXACT in shared memory yet to tell it to.
     */
    if (pg_fsync(fd) != 0)
    {
        int         save_errno = errno;

        CloseTransientFile(fd);
        /* report the fsync failure, not whatever close left in errno */
        errno = save_errno;
        ereport(ERROR,
                (errcode_for_file_access(),
                 errmsg("could not fsync two-phase state file: %m")));
    }

    if (CloseTransientFile(fd) != 0)
        ereport(ERROR,
                (errcode_for_file_access(),
                 errmsg("could not close two-phase state file: %m")));
}

void RegisterTwoPhaseRecord ( TwoPhaseRmgrId  rmid,
uint16  info,
const void *  data,
uint32  len 
)

Definition at line 1108 of file twophase.c.

References TwoPhaseRecordOnDisk::info, TwoPhaseRecordOnDisk::len, TwoPhaseRecordOnDisk::rmid, and save_state_data().

Referenced by AtPrepare_Locks(), AtPrepare_MultiXact(), AtPrepare_PgStat(), AtPrepare_PredicateLocks(), and EndPrepare().

{
    /*
     * Append one resource-manager record to the state data being collected:
     * first a TwoPhaseRecordOnDisk header describing it, then (only when
     * len is non-zero) the 'len' bytes at 'data'.
     */
    TwoPhaseRecordOnDisk header;

    header.len = len;
    header.info = info;
    header.rmid = rmid;
    save_state_data(&header, sizeof(TwoPhaseRecordOnDisk));

    /* zero-length records carry a header only */
    if (len == 0)
        return;

    save_state_data(data, len);
}

static void RemoveGXact ( GlobalTransaction  gxact  )  [static]
void RemoveTwoPhaseFile ( TransactionId  xid,
bool  giveWarning 
)

Definition at line 1435 of file twophase.c.

References ereport, errcode_for_file_access(), errmsg(), TwoPhaseFilePath, unlink(), and WARNING.

Referenced by FinishPreparedTransaction(), PrescanPreparedTransactions(), RecoverPreparedTransactions(), StandbyRecoverPreparedTransactions(), and xact_redo().

{
    /*
     * Unlink the two-phase state file belonging to 'xid'.  A failed unlink
     * is reported at WARNING level, except that a missing file (ENOENT) is
     * silently ignored unless giveWarning is true.
     */
    char        statefile[MAXPGPATH];

    TwoPhaseFilePath(statefile, xid);

    if (unlink(statefile) == 0)
        return;

    /* a missing file is only worth a warning when the caller asked for it */
    if (errno == ENOENT && !giveWarning)
        return;

    ereport(WARNING,
            (errcode_for_file_access(),
             errmsg("could not remove two-phase state file \"%s\": %m",
                    statefile)));
}

static void save_state_data ( const void *  data,
uint32  len 
) [static]
void StandbyRecoverPreparedTransactions ( bool  overwriteOK  ) 

Definition at line 1781 of file twophase.c.

References AllocateDir(), Assert, buf, dirent::d_name, ereport, errmsg(), FreeDir(), i, MAXALIGN, TwoPhaseFileHeader::nsubxacts, NULL, pfree(), ReadDir(), ReadTwoPhaseFile(), RemoveTwoPhaseFile(), SubTransSetParent(), TransactionIdDidAbort(), TransactionIdDidCommit(), TransactionIdEquals, TransactionIdFollows(), TWOPHASE_DIR, WARNING, and TwoPhaseFileHeader::xid.

Referenced by StartupXLOG(), and xlog_redo().

{
    /*
     * Scan TWOPHASE_DIR and, for every valid prepared-transaction state
     * file, record each subtransaction's parent as the top-level XID via
     * SubTransSetParent().  Stale files (XID already committed or aborted)
     * and corrupt files are removed with a WARNING.
     *
     * overwriteOK is passed straight through to SubTransSetParent().
     */
    DIR        *cldir;
    struct dirent *clde;

    cldir = AllocateDir(TWOPHASE_DIR);
    while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
    {
        /* Only consider names that are exactly 8 upper-case hex digits */
        if (strlen(clde->d_name) == 8 &&
            strspn(clde->d_name, "0123456789ABCDEF") == 8)
        {
            TransactionId xid;
            char       *buf;
            TwoPhaseFileHeader *hdr;
            TransactionId *subxids;
            int         i;

            xid = (TransactionId) strtoul(clde->d_name, NULL, 16);

            /* Already processed? */
            if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
            {
                ereport(WARNING,
                        (errmsg("removing stale two-phase state file \"%s\"",
                                clde->d_name)));
                RemoveTwoPhaseFile(xid, true);
                continue;
            }

            /* Read and validate file */
            buf = ReadTwoPhaseFile(xid, true);
            if (buf == NULL)
            {
                ereport(WARNING,
                      (errmsg("removing corrupt two-phase state file \"%s\"",
                              clde->d_name)));
                RemoveTwoPhaseFile(xid, true);
                continue;
            }

            /* Deconstruct header */
            hdr = (TwoPhaseFileHeader *) buf;
            if (!TransactionIdEquals(hdr->xid, xid))
            {
                ereport(WARNING,
                      (errmsg("removing corrupt two-phase state file \"%s\"",
                              clde->d_name)));
                RemoveTwoPhaseFile(xid, true);
                pfree(buf);
                continue;
            }

            /*
             * Examine subtransaction XIDs ... they should all follow main
             * XID.
             */
            subxids = (TransactionId *)
                (buf + MAXALIGN(sizeof(TwoPhaseFileHeader)));
            for (i = 0; i < hdr->nsubxacts; i++)
            {
                TransactionId subxid = subxids[i];

                Assert(TransactionIdFollows(subxid, xid));

                /*
                 * The child XID goes first and its parent second, matching
                 * the call in RecoverPreparedTransactions().
                 */
                SubTransSetParent(subxid, xid, overwriteOK);
            }

            /* Release the file image; nothing points into it any more */
            pfree(buf);
        }
    }
    FreeDir(cldir);
}

bool StandbyTransactionIdIsPrepared ( TransactionId  xid  ) 

Definition at line 1231 of file twophase.c.

References Assert, buf, max_prepared_xacts, NULL, pfree(), ReadTwoPhaseFile(), TransactionIdEquals, TransactionIdIsValid, and TwoPhaseFileHeader::xid.

Referenced by KnownAssignedXidsRemovePreceding(), and StandbyReleaseOldLocks().

{
    /*
     * Report whether a valid two-phase state file exists for 'xid': the
     * file must pass ReadTwoPhaseFile() validation and its header XID must
     * equal 'xid'.  Returns false without touching the disk when
     * max_prepared_xacts is not positive.  No warnings are emitted for an
     * unreadable file.
     */
    char       *filebuf;
    bool        matches;

    Assert(TransactionIdIsValid(xid));

    /* nothing to do */
    if (max_prepared_xacts <= 0)
        return false;

    /* Read and validate file */
    filebuf = ReadTwoPhaseFile(xid, false);
    if (filebuf == NULL)
        return false;

    /* The header must name the same transaction */
    matches = TransactionIdEquals(((TwoPhaseFileHeader *) filebuf)->xid, xid);
    pfree(filebuf);

    return matches;
}

void StartPrepare ( GlobalTransaction  gxact  ) 

Definition at line 868 of file twophase.c.

References PROC_HDR::allPgXact, PROC_HDR::allProcs, XLogRecData::buffer, xllist::bytes_free, XLogRecData::data, TwoPhaseFileHeader::database, PGPROC::databaseId, GlobalTransactionData::gid, TwoPhaseFileHeader::gid, GIDSIZE, GXactLoadSubxactData(), xllist::head, TwoPhaseFileHeader::initfileinval, XLogRecData::len, TwoPhaseFileHeader::magic, Max, TwoPhaseFileHeader::nabortrels, TwoPhaseFileHeader::ncommitrels, XLogRecData::next, TwoPhaseFileHeader::ninvalmsgs, TwoPhaseFileHeader::nsubxacts, GlobalTransactionData::owner, TwoPhaseFileHeader::owner, palloc(), palloc0(), pfree(), GlobalTransactionData::pgprocno, GlobalTransactionData::prepared_at, TwoPhaseFileHeader::prepared_at, ProcGlobal, records, save_state_data(), smgrGetPendingDeletes(), StrNCpy, xllist::tail, TwoPhaseFileHeader::total_len, xllist::total_len, xactGetCommittedChildren(), xactGetCommittedInvalidationMessages(), TwoPhaseFileHeader::xid, and PGXACT::xid.

Referenced by PrepareTransaction().

{
    /*
     * Begin collecting the two-phase state data for 'gxact'.
     *
     * Resets the module-level 'records' list to a single XLogRecData with a
     * freshly palloc'd data buffer, builds the TwoPhaseFileHeader, and
     * appends -- in this order -- the header, the committed child XIDs, the
     * commit-time and abort-time pending relation deletes, and the
     * invalidation messages, each through save_state_data().
     * RecoverPreparedTransactions() steps over the sections in the same
     * order when it parses the file.
     */
    PGPROC     *proc = &ProcGlobal->allProcs[gxact->pgprocno];
    PGXACT     *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
    TransactionId xid = pgxact->xid;
    TwoPhaseFileHeader hdr;
    TransactionId *children;
    RelFileNode *commitrels;
    RelFileNode *abortrels;
    SharedInvalidationMessage *invalmsgs;

    /* Initialize linked list */
    records.head = palloc0(sizeof(XLogRecData));
    records.head->buffer = InvalidBuffer;
    records.head->len = 0;
    records.head->next = NULL;

    /* First buffer holds at least the header, and never less than 512 bytes */
    records.bytes_free = Max(sizeof(TwoPhaseFileHeader), 512);
    records.head->data = palloc(records.bytes_free);

    records.tail = records.head;

    records.total_len = 0;

    /* Create header */
    hdr.magic = TWOPHASE_MAGIC;
    hdr.total_len = 0;          /* EndPrepare will fill this in */
    hdr.xid = xid;
    hdr.database = proc->databaseId;
    hdr.prepared_at = gxact->prepared_at;
    hdr.owner = gxact->owner;
    hdr.nsubxacts = xactGetCommittedChildren(&children);
    hdr.ncommitrels = smgrGetPendingDeletes(true, &commitrels);
    hdr.nabortrels = smgrGetPendingDeletes(false, &abortrels);
    hdr.ninvalmsgs = xactGetCommittedInvalidationMessages(&invalmsgs,
                                                          &hdr.initfileinval);
    StrNCpy(hdr.gid, gxact->gid, GIDSIZE);

    save_state_data(&hdr, sizeof(TwoPhaseFileHeader));

    /*
     * Add the additional info about subxacts, deletable files and cache
     * invalidation messages.
     */
    if (hdr.nsubxacts > 0)
    {
        save_state_data(children, hdr.nsubxacts * sizeof(TransactionId));
        /* While we have the child-xact data, stuff it in the gxact too */
        GXactLoadSubxactData(gxact, hdr.nsubxacts, children);
        /*
         * NOTE(review): unlike the arrays below, 'children' is not pfree'd
         * here -- presumably xactGetCommittedChildren() returns storage it
         * still owns; confirm against that function.
         */
    }
    if (hdr.ncommitrels > 0)
    {
        save_state_data(commitrels, hdr.ncommitrels * sizeof(RelFileNode));
        pfree(commitrels);
    }
    if (hdr.nabortrels > 0)
    {
        save_state_data(abortrels, hdr.nabortrels * sizeof(RelFileNode));
        pfree(abortrels);
    }
    if (hdr.ninvalmsgs > 0)
    {
        save_state_data(invalmsgs,
                        hdr.ninvalmsgs * sizeof(SharedInvalidationMessage));
        pfree(invalmsgs);
    }
}

static bool TransactionIdIsPrepared ( TransactionId  xid  )  [static]
BackendId TwoPhaseGetDummyBackendId ( TransactionId  xid  ) 
PGPROC* TwoPhaseGetDummyProc ( TransactionId  xid  ) 
static GlobalTransaction TwoPhaseGetGXact ( TransactionId  xid  )  [static]

Definition at line 691 of file twophase.c.

References PROC_HDR::allPgXact, elog, ERROR, i, LW_SHARED, LWLockAcquire(), LWLockRelease(), NULL, TwoPhaseStateData::numPrepXacts, GlobalTransactionData::pgprocno, TwoPhaseStateData::prepXacts, ProcGlobal, TwoPhaseStateLock, and PGXACT::xid.

Referenced by TwoPhaseGetDummyBackendId(), and TwoPhaseGetDummyProc().

{
    /*
     * Look up the GlobalTransaction whose dummy PGXACT carries 'xid'.
     * Raises ERROR if no prepared transaction matches.  The most recent
     * successful lookup is remembered in function-local statics and
     * returned directly when the same XID is requested again.
     */
    static TransactionId last_xid = InvalidTransactionId;
    static GlobalTransaction last_gxact = NULL;

    GlobalTransaction found = NULL;
    int         slot;

    /*
     * During recovery, COMMIT PREPARED or ABORT PREPARED we are called many
     * times in a row for one XID, so a one-entry cache saves the search.
     */
    if (xid == last_xid)
        return last_gxact;

    LWLockAcquire(TwoPhaseStateLock, LW_SHARED);

    for (slot = 0; found == NULL && slot < TwoPhaseState->numPrepXacts; slot++)
    {
        GlobalTransaction candidate = TwoPhaseState->prepXacts[slot];

        if (ProcGlobal->allPgXact[candidate->pgprocno].xid == xid)
            found = candidate;
    }

    LWLockRelease(TwoPhaseStateLock);

    if (found == NULL)          /* should not happen */
        elog(ERROR, "failed to find GlobalTransaction for xid %u", xid);

    /* remember this hit for the next call */
    last_xid = xid;
    last_gxact = found;

    return found;
}

void TwoPhaseShmemInit ( void   ) 

Definition at line 181 of file twophase.c.

References Assert, GlobalTransactionData::dummyBackendId, TwoPhaseStateData::freeGXacts, i, IsUnderPostmaster, max_prepared_xacts, MAXALIGN, MaxBackends, GlobalTransactionData::next, TwoPhaseStateData::numPrepXacts, offsetof, PGPROC::pgprocno, GlobalTransactionData::pgprocno, PreparedXactProcs, ShmemInitStruct(), and TwoPhaseShmemSize().

Referenced by CreateSharedMemoryAndSemaphores().

{
    bool        found;
    GlobalTransaction gxacts;
    int         i;

    /* Create the prepared-transaction table, or attach to the existing one. */
    TwoPhaseState = ShmemInitStruct("Prepared Transaction Table",
                                    TwoPhaseShmemSize(),
                                    &found);

    /* Under the postmaster the table must already exist; nothing to set up. */
    if (IsUnderPostmaster)
    {
        Assert(found);
        return;
    }

    /* Otherwise we are the one creating it, so it must be brand new. */
    Assert(!found);
    TwoPhaseState->numPrepXacts = 0;
    TwoPhaseState->freeGXacts = NULL;

    /*
     * The GlobalTransactionData structs live right after the fixed header
     * and the prepXacts pointer array, rounded up with MAXALIGN.
     */
    gxacts = (GlobalTransaction)
        ((char *) TwoPhaseState +
         MAXALIGN(offsetof(TwoPhaseStateData, prepXacts) +
                  sizeof(GlobalTransaction) * max_prepared_xacts));

    /* Thread every struct onto the free list, filling in its identity. */
    for (i = 0; i < max_prepared_xacts; i++)
    {
        GlobalTransaction gxact = &gxacts[i];

        /* Pair this entry with a PGPROC assigned by InitProcGlobal. */
        gxact->pgprocno = PreparedXactProcs[i].pgprocno;

        /*
         * Give each dummy proc a unique ID in the range directly above the
         * normal backend IDs.  It is deliberately not a real backend ID:
         * prepared transactions do not participate in cache invalidation
         * the way a real backend ID would imply.  A unique ID is still
         * useful, because an array of size (MaxBackends +
         * max_prepared_xacts + 1) then has one slot per backend and per
         * prepared transaction; multixact.c currently relies on that.
         */
        gxact->dummyBackendId = MaxBackends + 1 + i;

        /* Push onto the head of the free list. */
        gxact->next = TwoPhaseState->freeGXacts;
        TwoPhaseState->freeGXacts = gxact;
    }
}

Size TwoPhaseShmemSize ( void   ) 

Definition at line 165 of file twophase.c.

References add_size(), max_prepared_xacts, MAXALIGN, mul_size(), and offsetof.

Referenced by CreateSharedMemoryAndSemaphores(), and TwoPhaseShmemInit().

{
    Size        total;
    Size        ptr_array_bytes;
    Size        gxact_bytes;

    /*
     * Layout being sized: the fixed part of TwoPhaseStateData, then the
     * prepXacts pointer array, then (after MAXALIGN padding) the
     * GlobalTransactionData structs themselves.
     */
    total = offsetof(TwoPhaseStateData, prepXacts);

    /* One GlobalTransaction pointer per allowed prepared transaction. */
    ptr_array_bytes = mul_size(max_prepared_xacts, sizeof(GlobalTransaction));
    total = add_size(total, ptr_array_bytes);

    /* Round up so the struct area that follows starts aligned. */
    total = MAXALIGN(total);

    /* One GlobalTransactionData struct per allowed prepared transaction. */
    gxact_bytes = mul_size(max_prepared_xacts, sizeof(GlobalTransactionData));
    total = add_size(total, gxact_bytes);

    return total;
}


Variable Documentation

struct xllist records [static]

Definition at line 141 of file twophase.c.