Header And Logo

PostgreSQL
| The world's most advanced open source database.

twophase.c

Go to the documentation of this file.
00001 /*-------------------------------------------------------------------------
00002  *
00003  * twophase.c
00004  *      Two-phase commit support functions.
00005  *
00006  * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
00007  * Portions Copyright (c) 1994, Regents of the University of California
00008  *
00009  * IDENTIFICATION
00010  *      src/backend/access/transam/twophase.c
00011  *
00012  * NOTES
00013  *      Each global transaction is associated with a global transaction
00014  *      identifier (GID). The client assigns a GID to a postgres
00015  *      transaction with the PREPARE TRANSACTION command.
00016  *
00017  *      We keep all active global transactions in a shared memory array.
00018  *      When the PREPARE TRANSACTION command is issued, the GID is
00019  *      reserved for the transaction in the array. This is done before
00020  *      a WAL entry is made, because the reservation checks for duplicate
00021  *      GIDs and aborts the transaction if there already is a global
00022  *      transaction in prepared state with the same GID.
00023  *
00024  *      A global transaction (gxact) also has dummy PGXACT and PGPROC; this is
00025  *      what keeps the XID considered running by TransactionIdIsInProgress.
00026  *      It is also convenient as a PGPROC to hook the gxact's locks to.
00027  *
00028  *      In order to survive crashes and shutdowns, all prepared
00029  *      transactions must be stored in permanent storage. This includes
00030  *      locking information, pending notifications etc. All that state
00031  *      information is written to the per-transaction state file in
00032  *      the pg_twophase directory.
00033  *
00034  *-------------------------------------------------------------------------
00035  */
00036 #include "postgres.h"
00037 
00038 #include <fcntl.h>
00039 #include <sys/stat.h>
00040 #include <sys/types.h>
00041 #include <time.h>
00042 #include <unistd.h>
00043 
00044 #include "access/htup_details.h"
00045 #include "access/subtrans.h"
00046 #include "access/transam.h"
00047 #include "access/twophase.h"
00048 #include "access/twophase_rmgr.h"
00049 #include "access/xact.h"
00050 #include "access/xlogutils.h"
00051 #include "catalog/pg_type.h"
00052 #include "catalog/storage.h"
00053 #include "funcapi.h"
00054 #include "miscadmin.h"
00055 #include "pg_trace.h"
00056 #include "pgstat.h"
00057 #include "replication/walsender.h"
00058 #include "replication/syncrep.h"
00059 #include "storage/fd.h"
00060 #include "storage/predicate.h"
00061 #include "storage/proc.h"
00062 #include "storage/procarray.h"
00063 #include "storage/sinvaladt.h"
00064 #include "storage/smgr.h"
00065 #include "utils/builtins.h"
00066 #include "utils/memutils.h"
00067 #include "utils/timestamp.h"
00068 
00069 
/*
 * Name of the subdirectory of PGDATA in which two-phase commit state files
 * are kept.
 */
#define TWOPHASE_DIR "pg_twophase"

/*
 * GUC variable; its value cannot be changed once the server has started.
 */
int         max_prepared_xacts = 0;
00077 
00078 /*
00079  * This struct describes one global transaction that is in prepared state
00080  * or attempting to become prepared.
00081  *
00082  * The lifecycle of a global transaction is:
00083  *
00084  * 1. After checking that the requested GID is not in use, set up an
00085  * entry in the TwoPhaseState->prepXacts array with the correct XID and GID,
00086  * with locking_xid = my own XID and valid = false.
00087  *
00088  * 2. After successfully completing prepare, set valid = true and enter the
00089  * referenced PGPROC into the global ProcArray.
00090  *
00091  * 3. To begin COMMIT PREPARED or ROLLBACK PREPARED, check that the entry
00092  * is valid and its locking_xid is no longer active, then store my current
00093  * XID into locking_xid.  This prevents concurrent attempts to commit or
00094  * rollback the same prepared xact.
00095  *
00096  * 4. On completion of COMMIT PREPARED or ROLLBACK PREPARED, remove the entry
00097  * from the ProcArray and the TwoPhaseState->prepXacts array and return it to
00098  * the freelist.
00099  *
00100  * Note that if the preparing transaction fails between steps 1 and 2, the
00101  * entry will remain in prepXacts until recycled.  We can detect recyclable
00102  * entries by checking for valid = false and locking_xid no longer active.
00103  *
00104  * typedef struct GlobalTransactionData *GlobalTransaction appears in
00105  * twophase.h
00106  */
00107 #define GIDSIZE 200
00108 
00109 typedef struct GlobalTransactionData
00110 {
00111     GlobalTransaction next;     /* list link for free list */
00112     int         pgprocno;       /* ID of associated dummy PGPROC */
00113     BackendId   dummyBackendId; /* similar to backend id for backends */
00114     TimestampTz prepared_at;    /* time of preparation */
00115     XLogRecPtr  prepare_lsn;    /* XLOG offset of prepare record */
00116     Oid         owner;          /* ID of user that executed the xact */
00117     TransactionId locking_xid;  /* top-level XID of backend working on xact */
00118     bool        valid;          /* TRUE if fully prepared */
00119     char        gid[GIDSIZE];   /* The GID assigned to the prepared xact */
00120 }   GlobalTransactionData;
00121 
00122 /*
00123  * Two Phase Commit shared state.  Access to this struct is protected
00124  * by TwoPhaseStateLock.
00125  */
00126 typedef struct TwoPhaseStateData
00127 {
00128     /* Head of linked list of free GlobalTransactionData structs */
00129     GlobalTransaction freeGXacts;
00130 
00131     /* Number of valid prepXacts entries. */
00132     int         numPrepXacts;
00133 
00134     /*
00135      * There are max_prepared_xacts items in this array, but C wants a
00136      * fixed-size array.
00137      */
00138     GlobalTransaction prepXacts[1];     /* VARIABLE LENGTH ARRAY */
00139 } TwoPhaseStateData;            /* VARIABLE LENGTH STRUCT */
00140 
00141 static TwoPhaseStateData *TwoPhaseState;
00142 
00143 
00144 static void RecordTransactionCommitPrepared(TransactionId xid,
00145                                 int nchildren,
00146                                 TransactionId *children,
00147                                 int nrels,
00148                                 RelFileNode *rels,
00149                                 int ninvalmsgs,
00150                                 SharedInvalidationMessage *invalmsgs,
00151                                 bool initfileinval);
00152 static void RecordTransactionAbortPrepared(TransactionId xid,
00153                                int nchildren,
00154                                TransactionId *children,
00155                                int nrels,
00156                                RelFileNode *rels);
00157 static void ProcessRecords(char *bufptr, TransactionId xid,
00158                const TwoPhaseCallback callbacks[]);
00159 
00160 
00161 /*
00162  * Initialization of shared memory
00163  */
00164 Size
00165 TwoPhaseShmemSize(void)
00166 {
00167     Size        size;
00168 
00169     /* Need the fixed struct, the array of pointers, and the GTD structs */
00170     size = offsetof(TwoPhaseStateData, prepXacts);
00171     size = add_size(size, mul_size(max_prepared_xacts,
00172                                    sizeof(GlobalTransaction)));
00173     size = MAXALIGN(size);
00174     size = add_size(size, mul_size(max_prepared_xacts,
00175                                    sizeof(GlobalTransactionData)));
00176 
00177     return size;
00178 }
00179 
00180 void
00181 TwoPhaseShmemInit(void)
00182 {
00183     bool        found;
00184 
00185     TwoPhaseState = ShmemInitStruct("Prepared Transaction Table",
00186                                     TwoPhaseShmemSize(),
00187                                     &found);
00188     if (!IsUnderPostmaster)
00189     {
00190         GlobalTransaction gxacts;
00191         int         i;
00192 
00193         Assert(!found);
00194         TwoPhaseState->freeGXacts = NULL;
00195         TwoPhaseState->numPrepXacts = 0;
00196 
00197         /*
00198          * Initialize the linked list of free GlobalTransactionData structs
00199          */
00200         gxacts = (GlobalTransaction)
00201             ((char *) TwoPhaseState +
00202              MAXALIGN(offsetof(TwoPhaseStateData, prepXacts) +
00203                       sizeof(GlobalTransaction) * max_prepared_xacts));
00204         for (i = 0; i < max_prepared_xacts; i++)
00205         {
00206             /* insert into linked list */
00207             gxacts[i].next = TwoPhaseState->freeGXacts;
00208             TwoPhaseState->freeGXacts = &gxacts[i];
00209 
00210             /* associate it with a PGPROC assigned by InitProcGlobal */
00211             gxacts[i].pgprocno = PreparedXactProcs[i].pgprocno;
00212 
00213             /*
00214              * Assign a unique ID for each dummy proc, so that the range of
00215              * dummy backend IDs immediately follows the range of normal
00216              * backend IDs. We don't dare to assign a real backend ID to dummy
00217              * procs, because prepared transactions don't take part in cache
00218              * invalidation like a real backend ID would imply, but having a
00219              * unique ID for them is nevertheless handy. This arrangement
00220              * allows you to allocate an array of size (MaxBackends +
00221              * max_prepared_xacts + 1), and have a slot for every backend and
00222              * prepared transaction. Currently multixact.c uses that
00223              * technique.
00224              */
00225             gxacts[i].dummyBackendId = MaxBackends + 1 + i;
00226         }
00227     }
00228     else
00229         Assert(found);
00230 }
00231 
00232 
00233 /*
00234  * MarkAsPreparing
00235  *      Reserve the GID for the given transaction.
00236  *
00237  * Internally, this creates a gxact struct and puts it into the active array.
00238  * NOTE: this is also used when reloading a gxact after a crash; so avoid
00239  * assuming that we can use very much backend context.
00240  */
00241 GlobalTransaction
00242 MarkAsPreparing(TransactionId xid, const char *gid,
00243                 TimestampTz prepared_at, Oid owner, Oid databaseid)
00244 {
00245     GlobalTransaction gxact;
00246     PGPROC     *proc;
00247     PGXACT     *pgxact;
00248     int         i;
00249 
00250     if (strlen(gid) >= GIDSIZE)
00251         ereport(ERROR,
00252                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
00253                  errmsg("transaction identifier \"%s\" is too long",
00254                         gid)));
00255 
00256     /* fail immediately if feature is disabled */
00257     if (max_prepared_xacts == 0)
00258         ereport(ERROR,
00259                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
00260                  errmsg("prepared transactions are disabled"),
00261               errhint("Set max_prepared_transactions to a nonzero value.")));
00262 
00263     LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
00264 
00265     /*
00266      * First, find and recycle any gxacts that failed during prepare. We do
00267      * this partly to ensure we don't mistakenly say their GIDs are still
00268      * reserved, and partly so we don't fail on out-of-slots unnecessarily.
00269      */
00270     for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
00271     {
00272         gxact = TwoPhaseState->prepXacts[i];
00273         if (!gxact->valid && !TransactionIdIsActive(gxact->locking_xid))
00274         {
00275             /* It's dead Jim ... remove from the active array */
00276             TwoPhaseState->numPrepXacts--;
00277             TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts];
00278             /* and put it back in the freelist */
00279             gxact->next = TwoPhaseState->freeGXacts;
00280             TwoPhaseState->freeGXacts = gxact;
00281             /* Back up index count too, so we don't miss scanning one */
00282             i--;
00283         }
00284     }
00285 
00286     /* Check for conflicting GID */
00287     for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
00288     {
00289         gxact = TwoPhaseState->prepXacts[i];
00290         if (strcmp(gxact->gid, gid) == 0)
00291         {
00292             ereport(ERROR,
00293                     (errcode(ERRCODE_DUPLICATE_OBJECT),
00294                      errmsg("transaction identifier \"%s\" is already in use",
00295                             gid)));
00296         }
00297     }
00298 
00299     /* Get a free gxact from the freelist */
00300     if (TwoPhaseState->freeGXacts == NULL)
00301         ereport(ERROR,
00302                 (errcode(ERRCODE_OUT_OF_MEMORY),
00303                  errmsg("maximum number of prepared transactions reached"),
00304                  errhint("Increase max_prepared_transactions (currently %d).",
00305                          max_prepared_xacts)));
00306     gxact = TwoPhaseState->freeGXacts;
00307     TwoPhaseState->freeGXacts = gxact->next;
00308 
00309     proc = &ProcGlobal->allProcs[gxact->pgprocno];
00310     pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
00311 
00312     /* Initialize the PGPROC entry */
00313     MemSet(proc, 0, sizeof(PGPROC));
00314     proc->pgprocno = gxact->pgprocno;
00315     SHMQueueElemInit(&(proc->links));
00316     proc->waitStatus = STATUS_OK;
00317     /* We set up the gxact's VXID as InvalidBackendId/XID */
00318     proc->lxid = (LocalTransactionId) xid;
00319     pgxact->xid = xid;
00320     pgxact->xmin = InvalidTransactionId;
00321     pgxact->delayChkpt = false;
00322     pgxact->vacuumFlags = 0;
00323     proc->pid = 0;
00324     proc->backendId = InvalidBackendId;
00325     proc->databaseId = databaseid;
00326     proc->roleId = owner;
00327     proc->lwWaiting = false;
00328     proc->lwWaitMode = 0;
00329     proc->lwWaitLink = NULL;
00330     proc->waitLock = NULL;
00331     proc->waitProcLock = NULL;
00332     for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
00333         SHMQueueInit(&(proc->myProcLocks[i]));
00334     /* subxid data must be filled later by GXactLoadSubxactData */
00335     pgxact->overflowed = false;
00336     pgxact->nxids = 0;
00337 
00338     gxact->prepared_at = prepared_at;
00339     /* initialize LSN to 0 (start of WAL) */
00340     gxact->prepare_lsn = 0;
00341     gxact->owner = owner;
00342     gxact->locking_xid = xid;
00343     gxact->valid = false;
00344     strcpy(gxact->gid, gid);
00345 
00346     /* And insert it into the active array */
00347     Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
00348     TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
00349 
00350     LWLockRelease(TwoPhaseStateLock);
00351 
00352     return gxact;
00353 }
00354 
00355 /*
00356  * GXactLoadSubxactData
00357  *
00358  * If the transaction being persisted had any subtransactions, this must
00359  * be called before MarkAsPrepared() to load information into the dummy
00360  * PGPROC.
00361  */
00362 static void
00363 GXactLoadSubxactData(GlobalTransaction gxact, int nsubxacts,
00364                      TransactionId *children)
00365 {
00366     PGPROC     *proc = &ProcGlobal->allProcs[gxact->pgprocno];
00367     PGXACT     *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
00368 
00369     /* We need no extra lock since the GXACT isn't valid yet */
00370     if (nsubxacts > PGPROC_MAX_CACHED_SUBXIDS)
00371     {
00372         pgxact->overflowed = true;
00373         nsubxacts = PGPROC_MAX_CACHED_SUBXIDS;
00374     }
00375     if (nsubxacts > 0)
00376     {
00377         memcpy(proc->subxids.xids, children,
00378                nsubxacts * sizeof(TransactionId));
00379         pgxact->nxids = nsubxacts;
00380     }
00381 }
00382 
00383 /*
00384  * MarkAsPrepared
00385  *      Mark the GXACT as fully valid, and enter it into the global ProcArray.
00386  */
00387 static void
00388 MarkAsPrepared(GlobalTransaction gxact)
00389 {
00390     /* Lock here may be overkill, but I'm not convinced of that ... */
00391     LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
00392     Assert(!gxact->valid);
00393     gxact->valid = true;
00394     LWLockRelease(TwoPhaseStateLock);
00395 
00396     /*
00397      * Put it into the global ProcArray so TransactionIdIsInProgress considers
00398      * the XID as still running.
00399      */
00400     ProcArrayAdd(&ProcGlobal->allProcs[gxact->pgprocno]);
00401 }
00402 
00403 /*
00404  * LockGXact
00405  *      Locate the prepared transaction and mark it busy for COMMIT or PREPARE.
00406  */
00407 static GlobalTransaction
00408 LockGXact(const char *gid, Oid user)
00409 {
00410     int         i;
00411 
00412     LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
00413 
00414     for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
00415     {
00416         GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
00417         PGPROC     *proc = &ProcGlobal->allProcs[gxact->pgprocno];
00418 
00419         /* Ignore not-yet-valid GIDs */
00420         if (!gxact->valid)
00421             continue;
00422         if (strcmp(gxact->gid, gid) != 0)
00423             continue;
00424 
00425         /* Found it, but has someone else got it locked? */
00426         if (TransactionIdIsValid(gxact->locking_xid))
00427         {
00428             if (TransactionIdIsActive(gxact->locking_xid))
00429                 ereport(ERROR,
00430                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
00431                 errmsg("prepared transaction with identifier \"%s\" is busy",
00432                        gid)));
00433             gxact->locking_xid = InvalidTransactionId;
00434         }
00435 
00436         if (user != gxact->owner && !superuser_arg(user))
00437             ereport(ERROR,
00438                     (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
00439                   errmsg("permission denied to finish prepared transaction"),
00440                      errhint("Must be superuser or the user that prepared the transaction.")));
00441 
00442         /*
00443          * Note: it probably would be possible to allow committing from
00444          * another database; but at the moment NOTIFY is known not to work and
00445          * there may be some other issues as well.  Hence disallow until
00446          * someone gets motivated to make it work.
00447          */
00448         if (MyDatabaseId != proc->databaseId)
00449             ereport(ERROR,
00450                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
00451                   errmsg("prepared transaction belongs to another database"),
00452                      errhint("Connect to the database where the transaction was prepared to finish it.")));
00453 
00454         /* OK for me to lock it */
00455         gxact->locking_xid = GetTopTransactionId();
00456 
00457         LWLockRelease(TwoPhaseStateLock);
00458 
00459         return gxact;
00460     }
00461 
00462     LWLockRelease(TwoPhaseStateLock);
00463 
00464     ereport(ERROR,
00465             (errcode(ERRCODE_UNDEFINED_OBJECT),
00466          errmsg("prepared transaction with identifier \"%s\" does not exist",
00467                 gid)));
00468 
00469     /* NOTREACHED */
00470     return NULL;
00471 }
00472 
00473 /*
00474  * RemoveGXact
00475  *      Remove the prepared transaction from the shared memory array.
00476  *
00477  * NB: caller should have already removed it from ProcArray
00478  */
00479 static void
00480 RemoveGXact(GlobalTransaction gxact)
00481 {
00482     int         i;
00483 
00484     LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
00485 
00486     for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
00487     {
00488         if (gxact == TwoPhaseState->prepXacts[i])
00489         {
00490             /* remove from the active array */
00491             TwoPhaseState->numPrepXacts--;
00492             TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts];
00493 
00494             /* and put it back in the freelist */
00495             gxact->next = TwoPhaseState->freeGXacts;
00496             TwoPhaseState->freeGXacts = gxact;
00497 
00498             LWLockRelease(TwoPhaseStateLock);
00499 
00500             return;
00501         }
00502     }
00503 
00504     LWLockRelease(TwoPhaseStateLock);
00505 
00506     elog(ERROR, "failed to find %p in GlobalTransaction array", gxact);
00507 }
00508 
00509 /*
00510  * TransactionIdIsPrepared
00511  *      True iff transaction associated with the identifier is prepared
00512  *      for two-phase commit
00513  *
00514  * Note: only gxacts marked "valid" are considered; but notice we do not
00515  * check the locking status.
00516  *
00517  * This is not currently exported, because it is only needed internally.
00518  */
00519 static bool
00520 TransactionIdIsPrepared(TransactionId xid)
00521 {
00522     bool        result = false;
00523     int         i;
00524 
00525     LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
00526 
00527     for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
00528     {
00529         GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
00530         PGXACT     *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
00531 
00532         if (gxact->valid && pgxact->xid == xid)
00533         {
00534             result = true;
00535             break;
00536         }
00537     }
00538 
00539     LWLockRelease(TwoPhaseStateLock);
00540 
00541     return result;
00542 }
00543 
00544 /*
00545  * Returns an array of all prepared transactions for the user-level
00546  * function pg_prepared_xact.
00547  *
00548  * The returned array and all its elements are copies of internal data
00549  * structures, to minimize the time we need to hold the TwoPhaseStateLock.
00550  *
00551  * WARNING -- we return even those transactions that are not fully prepared
00552  * yet.  The caller should filter them out if he doesn't want them.
00553  *
00554  * The returned array is palloc'd.
00555  */
00556 static int
00557 GetPreparedTransactionList(GlobalTransaction *gxacts)
00558 {
00559     GlobalTransaction array;
00560     int         num;
00561     int         i;
00562 
00563     LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
00564 
00565     if (TwoPhaseState->numPrepXacts == 0)
00566     {
00567         LWLockRelease(TwoPhaseStateLock);
00568 
00569         *gxacts = NULL;
00570         return 0;
00571     }
00572 
00573     num = TwoPhaseState->numPrepXacts;
00574     array = (GlobalTransaction) palloc(sizeof(GlobalTransactionData) * num);
00575     *gxacts = array;
00576     for (i = 0; i < num; i++)
00577         memcpy(array + i, TwoPhaseState->prepXacts[i],
00578                sizeof(GlobalTransactionData));
00579 
00580     LWLockRelease(TwoPhaseStateLock);
00581 
00582     return num;
00583 }
00584 
00585 
00586 /* Working status for pg_prepared_xact */
00587 typedef struct
00588 {
00589     GlobalTransaction array;
00590     int         ngxacts;
00591     int         currIdx;
00592 } Working_State;
00593 
00594 /*
00595  * pg_prepared_xact
00596  *      Produce a view with one row per prepared transaction.
00597  *
00598  * This function is here so we don't have to export the
00599  * GlobalTransactionData struct definition.
00600  */
00601 Datum
00602 pg_prepared_xact(PG_FUNCTION_ARGS)
00603 {
00604     FuncCallContext *funcctx;
00605     Working_State *status;
00606 
00607     if (SRF_IS_FIRSTCALL())
00608     {
00609         TupleDesc   tupdesc;
00610         MemoryContext oldcontext;
00611 
00612         /* create a function context for cross-call persistence */
00613         funcctx = SRF_FIRSTCALL_INIT();
00614 
00615         /*
00616          * Switch to memory context appropriate for multiple function calls
00617          */
00618         oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
00619 
00620         /* build tupdesc for result tuples */
00621         /* this had better match pg_prepared_xacts view in system_views.sql */
00622         tupdesc = CreateTemplateTupleDesc(5, false);
00623         TupleDescInitEntry(tupdesc, (AttrNumber) 1, "transaction",
00624                            XIDOID, -1, 0);
00625         TupleDescInitEntry(tupdesc, (AttrNumber) 2, "gid",
00626                            TEXTOID, -1, 0);
00627         TupleDescInitEntry(tupdesc, (AttrNumber) 3, "prepared",
00628                            TIMESTAMPTZOID, -1, 0);
00629         TupleDescInitEntry(tupdesc, (AttrNumber) 4, "ownerid",
00630                            OIDOID, -1, 0);
00631         TupleDescInitEntry(tupdesc, (AttrNumber) 5, "dbid",
00632                            OIDOID, -1, 0);
00633 
00634         funcctx->tuple_desc = BlessTupleDesc(tupdesc);
00635 
00636         /*
00637          * Collect all the 2PC status information that we will format and send
00638          * out as a result set.
00639          */
00640         status = (Working_State *) palloc(sizeof(Working_State));
00641         funcctx->user_fctx = (void *) status;
00642 
00643         status->ngxacts = GetPreparedTransactionList(&status->array);
00644         status->currIdx = 0;
00645 
00646         MemoryContextSwitchTo(oldcontext);
00647     }
00648 
00649     funcctx = SRF_PERCALL_SETUP();
00650     status = (Working_State *) funcctx->user_fctx;
00651 
00652     while (status->array != NULL && status->currIdx < status->ngxacts)
00653     {
00654         GlobalTransaction gxact = &status->array[status->currIdx++];
00655         PGPROC     *proc = &ProcGlobal->allProcs[gxact->pgprocno];
00656         PGXACT     *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
00657         Datum       values[5];
00658         bool        nulls[5];
00659         HeapTuple   tuple;
00660         Datum       result;
00661 
00662         if (!gxact->valid)
00663             continue;
00664 
00665         /*
00666          * Form tuple with appropriate data.
00667          */
00668         MemSet(values, 0, sizeof(values));
00669         MemSet(nulls, 0, sizeof(nulls));
00670 
00671         values[0] = TransactionIdGetDatum(pgxact->xid);
00672         values[1] = CStringGetTextDatum(gxact->gid);
00673         values[2] = TimestampTzGetDatum(gxact->prepared_at);
00674         values[3] = ObjectIdGetDatum(gxact->owner);
00675         values[4] = ObjectIdGetDatum(proc->databaseId);
00676 
00677         tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
00678         result = HeapTupleGetDatum(tuple);
00679         SRF_RETURN_NEXT(funcctx, result);
00680     }
00681 
00682     SRF_RETURN_DONE(funcctx);
00683 }
00684 
00685 /*
00686  * TwoPhaseGetGXact
00687  *      Get the GlobalTransaction struct for a prepared transaction
00688  *      specified by XID
00689  */
00690 static GlobalTransaction
00691 TwoPhaseGetGXact(TransactionId xid)
00692 {
00693     GlobalTransaction result = NULL;
00694     int         i;
00695 
00696     static TransactionId cached_xid = InvalidTransactionId;
00697     static GlobalTransaction cached_gxact = NULL;
00698 
00699     /*
00700      * During a recovery, COMMIT PREPARED, or ABORT PREPARED, we'll be called
00701      * repeatedly for the same XID.  We can save work with a simple cache.
00702      */
00703     if (xid == cached_xid)
00704         return cached_gxact;
00705 
00706     LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
00707 
00708     for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
00709     {
00710         GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
00711         PGXACT     *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
00712 
00713         if (pgxact->xid == xid)
00714         {
00715             result = gxact;
00716             break;
00717         }
00718     }
00719 
00720     LWLockRelease(TwoPhaseStateLock);
00721 
00722     if (result == NULL)         /* should not happen */
00723         elog(ERROR, "failed to find GlobalTransaction for xid %u", xid);
00724 
00725     cached_xid = xid;
00726     cached_gxact = result;
00727 
00728     return result;
00729 }
00730 
00731 /*
00732  * TwoPhaseGetDummyProc
00733  *      Get the dummy backend ID for prepared transaction specified by XID
00734  *
00735  * Dummy backend IDs are similar to real backend IDs of real backends.
00736  * They start at MaxBackends + 1, and are unique across all currently active
00737  * real backends and prepared transactions.
00738  */
00739 BackendId
00740 TwoPhaseGetDummyBackendId(TransactionId xid)
00741 {
00742     GlobalTransaction gxact = TwoPhaseGetGXact(xid);
00743 
00744     return gxact->dummyBackendId;
00745 }
00746 
00747 /*
00748  * TwoPhaseGetDummyProc
00749  *      Get the PGPROC that represents a prepared transaction specified by XID
00750  */
00751 PGPROC *
00752 TwoPhaseGetDummyProc(TransactionId xid)
00753 {
00754     GlobalTransaction gxact = TwoPhaseGetGXact(xid);
00755 
00756     return &ProcGlobal->allProcs[gxact->pgprocno];
00757 }
00758 
00759 /************************************************************************/
00760 /* State file support                                                   */
00761 /************************************************************************/
00762 
/*
 * Write the name of the state file for "xid" into "path": the XID printed as
 * eight upper-case hex digits, inside TWOPHASE_DIR.  "path" is treated as a
 * buffer of MAXPGPATH bytes.
 */
#define TwoPhaseFilePath(path, xid) \
    snprintf(path, MAXPGPATH, TWOPHASE_DIR "/%08X", xid)
00765 
00766 /*
00767  * 2PC state file format:
00768  *
00769  *  1. TwoPhaseFileHeader
00770  *  2. TransactionId[] (subtransactions)
00771  *  3. RelFileNode[] (files to be deleted at commit)
00772  *  4. RelFileNode[] (files to be deleted at abort)
00773  *  5. SharedInvalidationMessage[] (inval messages to be sent at commit)
00774  *  6. TwoPhaseRecordOnDisk
00775  *  7. ...
00776  *  8. TwoPhaseRecordOnDisk (end sentinel, rmid == TWOPHASE_RM_END_ID)
00777  *  9. CRC32
00778  *
00779  * Each segment except the final CRC32 is MAXALIGN'd.
00780  */
00781 
00782 /*
00783  * Header for a 2PC state file
00784  */
00785 #define TWOPHASE_MAGIC  0x57F94532      /* format identifier */
00786 
00787 typedef struct TwoPhaseFileHeader
00788 {
00789     uint32      magic;          /* format identifier */
00790     uint32      total_len;      /* actual file length */
00791     TransactionId xid;          /* original transaction XID */
00792     Oid         database;       /* OID of database it was in */
00793     TimestampTz prepared_at;    /* time of preparation */
00794     Oid         owner;          /* user running the transaction */
00795     int32       nsubxacts;      /* number of following subxact XIDs */
00796     int32       ncommitrels;    /* number of delete-on-commit rels */
00797     int32       nabortrels;     /* number of delete-on-abort rels */
00798     int32       ninvalmsgs;     /* number of cache invalidation messages */
00799     bool        initfileinval;  /* does relcache init file need invalidation? */
00800     char        gid[GIDSIZE];   /* GID for transaction */
00801 } TwoPhaseFileHeader;
00802 
00803 /*
00804  * Header for each record in a state file
00805  *
00806  * NOTE: len counts only the rmgr data, not the TwoPhaseRecordOnDisk header.
00807  * The rmgr data will be stored starting on a MAXALIGN boundary.
00808  */
00809 typedef struct TwoPhaseRecordOnDisk
00810 {
00811     uint32      len;            /* length of rmgr data */
00812     TwoPhaseRmgrId rmid;        /* resource manager for this record */
00813     uint16      info;           /* flag bits for use by rmgr */
00814 } TwoPhaseRecordOnDisk;
00815 
00816 /*
00817  * During prepare, the state file is assembled in memory before writing it
00818  * to WAL and the actual state file.  We use a chain of XLogRecData blocks
00819  * so that we will be able to pass the state file contents directly to
00820  * XLogInsert.
 *
 * The chain is set up by StartPrepare, extended by save_state_data, and
 * its head/tail pointers are reset to NULL at the end of EndPrepare.
00821  */
00822 static struct xllist
00823 {
00824     XLogRecData *head;          /* first data block in the chain */
00825     XLogRecData *tail;          /* last block in chain */
00826     uint32      bytes_free;     /* free bytes left in tail block */
00827     uint32      total_len;      /* total data bytes in chain, padding included */
00828 }   records;
00829 
00830 
00831 /*
00832  * Append a block of data to records data structure.
00833  *
00834  * NB: each block is padded to a MAXALIGN multiple.  This must be
00835  * accounted for when the file is later read!
00836  *
00837  * The data is copied, so the caller is free to modify it afterwards.
00838  */
00839 static void
00840 save_state_data(const void *data, uint32 len)
00841 {
00842     uint32      padlen = MAXALIGN(len);
00843 
00844     if (padlen > records.bytes_free)
00845     {
00846         records.tail->next = palloc0(sizeof(XLogRecData));
00847         records.tail = records.tail->next;
00848         records.tail->buffer = InvalidBuffer;
00849         records.tail->len = 0;
00850         records.tail->next = NULL;
00851 
00852         records.bytes_free = Max(padlen, 512);
00853         records.tail->data = palloc(records.bytes_free);
00854     }
00855 
00856     memcpy(((char *) records.tail->data) + records.tail->len, data, len);
00857     records.tail->len += padlen;
00858     records.bytes_free -= padlen;
00859     records.total_len += padlen;
00860 }
00861 
00862 /*
00863  * Start preparing a state file.
00864  *
00865  * Initializes data structure and inserts the 2PC file header record.
00866  */
00867 void
00868 StartPrepare(GlobalTransaction gxact)
00869 {
00870     PGPROC     *proc = &ProcGlobal->allProcs[gxact->pgprocno];
00871     PGXACT     *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
00872     TransactionId xid = pgxact->xid;
00873     TwoPhaseFileHeader hdr;
00874     TransactionId *children;
00875     RelFileNode *commitrels;
00876     RelFileNode *abortrels;
00877     SharedInvalidationMessage *invalmsgs;
00878 
00879     /* Initialize linked list */
00880     records.head = palloc0(sizeof(XLogRecData));
00881     records.head->buffer = InvalidBuffer;
00882     records.head->len = 0;
00883     records.head->next = NULL;
00884 
00885     records.bytes_free = Max(sizeof(TwoPhaseFileHeader), 512);
00886     records.head->data = palloc(records.bytes_free);
00887 
00888     records.tail = records.head;
00889 
00890     records.total_len = 0;
00891 
00892     /* Create header */
00893     hdr.magic = TWOPHASE_MAGIC;
00894     hdr.total_len = 0;          /* EndPrepare will fill this in */
00895     hdr.xid = xid;
00896     hdr.database = proc->databaseId;
00897     hdr.prepared_at = gxact->prepared_at;
00898     hdr.owner = gxact->owner;
00899     hdr.nsubxacts = xactGetCommittedChildren(&children);
00900     hdr.ncommitrels = smgrGetPendingDeletes(true, &commitrels);
00901     hdr.nabortrels = smgrGetPendingDeletes(false, &abortrels);
00902     hdr.ninvalmsgs = xactGetCommittedInvalidationMessages(&invalmsgs,
00903                                                           &hdr.initfileinval);
00904     StrNCpy(hdr.gid, gxact->gid, GIDSIZE);
00905 
00906     save_state_data(&hdr, sizeof(TwoPhaseFileHeader));
00907 
00908     /*
00909      * Add the additional info about subxacts, deletable files and cache
00910      * invalidation messages.
00911      */
00912     if (hdr.nsubxacts > 0)
00913     {
00914         save_state_data(children, hdr.nsubxacts * sizeof(TransactionId));
00915         /* While we have the child-xact data, stuff it in the gxact too */
00916         GXactLoadSubxactData(gxact, hdr.nsubxacts, children);
00917     }
00918     if (hdr.ncommitrels > 0)
00919     {
00920         save_state_data(commitrels, hdr.ncommitrels * sizeof(RelFileNode));
00921         pfree(commitrels);
00922     }
00923     if (hdr.nabortrels > 0)
00924     {
00925         save_state_data(abortrels, hdr.nabortrels * sizeof(RelFileNode));
00926         pfree(abortrels);
00927     }
00928     if (hdr.ninvalmsgs > 0)
00929     {
00930         save_state_data(invalmsgs,
00931                         hdr.ninvalmsgs * sizeof(SharedInvalidationMessage));
00932         pfree(invalmsgs);
00933     }
00934 }
00935 
00936 /*
00937  * Finish preparing state file.
00938  *
00939  * Calculates CRC and writes state file to WAL and in pg_twophase directory.
00940  */
00941 void
00942 EndPrepare(GlobalTransaction gxact)
00943 {
00944     PGXACT     *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
00945     TransactionId xid = pgxact->xid;
00946     TwoPhaseFileHeader *hdr;
00947     char        path[MAXPGPATH];
00948     XLogRecData *record;
00949     pg_crc32    statefile_crc;
00950     pg_crc32    bogus_crc;
00951     int         fd;
00952 
00953     /* Add the end sentinel to the list of 2PC records */
00954     RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0,
00955                            NULL, 0);
00956 
00957     /* Go back and fill in total_len in the file header record */
00958     hdr = (TwoPhaseFileHeader *) records.head->data;
00959     Assert(hdr->magic == TWOPHASE_MAGIC);
00960     hdr->total_len = records.total_len + sizeof(pg_crc32);
00961 
00962     /*
00963      * If the file size exceeds MaxAllocSize, we won't be able to read it in
00964      * ReadTwoPhaseFile. Check for that now, rather than fail at commit time.
00965      */
00966     if (hdr->total_len > MaxAllocSize)
00967         ereport(ERROR,
00968                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
00969                  errmsg("two-phase state file maximum length exceeded")));
00970 
00971     /*
00972      * Create the 2PC state file.
00973      */
00974     TwoPhaseFilePath(path, xid);
00975 
00976     fd = OpenTransientFile(path,
00977                            O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
00978                            S_IRUSR | S_IWUSR);
00979     if (fd < 0)
00980         ereport(ERROR,
00981                 (errcode_for_file_access(),
00982                  errmsg("could not create two-phase state file \"%s\": %m",
00983                         path)));
00984 
00985     /* Write data to file, and calculate CRC as we pass over it */
00986     INIT_CRC32(statefile_crc);
00987 
00988     for (record = records.head; record != NULL; record = record->next)
00989     {
00990         COMP_CRC32(statefile_crc, record->data, record->len);
00991         if ((write(fd, record->data, record->len)) != record->len)
00992         {
00993             CloseTransientFile(fd);
00994             ereport(ERROR,
00995                     (errcode_for_file_access(),
00996                      errmsg("could not write two-phase state file: %m")));
00997         }
00998     }
00999 
01000     FIN_CRC32(statefile_crc);
01001 
01002     /*
01003      * Write a deliberately bogus CRC to the state file; this is just paranoia
01004      * to catch the case where four more bytes will run us out of disk space.
01005      */
01006     bogus_crc = ~statefile_crc;
01007 
01008     if ((write(fd, &bogus_crc, sizeof(pg_crc32))) != sizeof(pg_crc32))
01009     {
01010         CloseTransientFile(fd);
01011         ereport(ERROR,
01012                 (errcode_for_file_access(),
01013                  errmsg("could not write two-phase state file: %m")));
01014     }
01015 
01016     /* Back up to prepare for rewriting the CRC */
01017     if (lseek(fd, -((off_t) sizeof(pg_crc32)), SEEK_CUR) < 0)
01018     {
01019         CloseTransientFile(fd);
01020         ereport(ERROR,
01021                 (errcode_for_file_access(),
01022                  errmsg("could not seek in two-phase state file: %m")));
01023     }
01024 
01025     /*
01026      * The state file isn't valid yet, because we haven't written the correct
01027      * CRC yet.  Before we do that, insert entry in WAL and flush it to disk.
01028      *
01029      * Between the time we have written the WAL entry and the time we write
01030      * out the correct state file CRC, we have an inconsistency: the xact is
01031      * prepared according to WAL but not according to our on-disk state. We
01032      * use a critical section to force a PANIC if we are unable to complete
01033      * the write --- then, WAL replay should repair the inconsistency.  The
01034      * odds of a PANIC actually occurring should be very tiny given that we
01035      * were able to write the bogus CRC above.
01036      *
01037      * We have to set delayChkpt here, too; otherwise a checkpoint starting
01038      * immediately after the WAL record is inserted could complete without
01039      * fsync'ing our state file.  (This is essentially the same kind of race
01040      * condition as the COMMIT-to-clog-write case that RecordTransactionCommit
01041      * uses delayChkpt for; see notes there.)
01042      *
01043      * We save the PREPARE record's location in the gxact for later use by
01044      * CheckPointTwoPhase.
01045      */
01046     START_CRIT_SECTION();
01047 
01048     MyPgXact->delayChkpt = true;
01049 
01050     gxact->prepare_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE,
01051                                     records.head);
01052     XLogFlush(gxact->prepare_lsn);
01053 
01054     /* If we crash now, we have prepared: WAL replay will fix things */
01055 
01056     /* write correct CRC and close file */
01057     if ((write(fd, &statefile_crc, sizeof(pg_crc32))) != sizeof(pg_crc32))
01058     {
01059         CloseTransientFile(fd);
01060         ereport(ERROR,
01061                 (errcode_for_file_access(),
01062                  errmsg("could not write two-phase state file: %m")));
01063     }
01064 
01065     if (CloseTransientFile(fd) != 0)
01066         ereport(ERROR,
01067                 (errcode_for_file_access(),
01068                  errmsg("could not close two-phase state file: %m")));
01069 
01070     /*
01071      * Mark the prepared transaction as valid.  As soon as xact.c marks
01072      * MyPgXact as not running our XID (which it will do immediately after
01073      * this function returns), others can commit/rollback the xact.
01074      *
01075      * NB: a side effect of this is to make a dummy ProcArray entry for the
01076      * prepared XID.  This must happen before we clear the XID from MyPgXact,
01077      * else there is a window where the XID is not running according to
01078      * TransactionIdIsInProgress, and onlookers would be entitled to assume
01079      * the xact crashed.  Instead we have a window where the same XID appears
01080      * twice in ProcArray, which is OK.
01081      */
01082     MarkAsPrepared(gxact);
01083 
01084     /*
01085      * Now we can mark ourselves as out of the commit critical section: a
01086      * checkpoint starting after this will certainly see the gxact as a
01087      * candidate for fsyncing.
01088      */
01089     MyPgXact->delayChkpt = false;
01090 
01091     END_CRIT_SECTION();
01092 
01093     /*
01094      * Wait for synchronous replication, if required.
01095      *
01096      * Note that at this stage we have marked the prepare, but still show as
01097      * running in the procarray (twice!) and continue to hold locks.
01098      */
01099     SyncRepWaitForLSN(gxact->prepare_lsn);
01100 
01101     records.tail = records.head = NULL;
01102 }
01103 
01104 /*
01105  * Register a 2PC record to be written to state file.
01106  */
01107 void
01108 RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info,
01109                        const void *data, uint32 len)
01110 {
01111     TwoPhaseRecordOnDisk record;
01112 
01113     record.rmid = rmid;
01114     record.info = info;
01115     record.len = len;
01116     save_state_data(&record, sizeof(TwoPhaseRecordOnDisk));
01117     if (len > 0)
01118         save_state_data(data, len);
01119 }
01120 
01121 
01122 /*
01123  * Read and validate the state file for xid.
01124  *
01125  * If it looks OK (has a valid magic number and CRC), return the palloc'd
01126  * contents of the file.  Otherwise return NULL.
01127  */
01128 static char *
01129 ReadTwoPhaseFile(TransactionId xid, bool give_warnings)
01130 {
01131     char        path[MAXPGPATH];
01132     char       *buf;
01133     TwoPhaseFileHeader *hdr;
01134     int         fd;
01135     struct stat stat;
01136     uint32      crc_offset;
01137     pg_crc32    calc_crc,
01138                 file_crc;
01139 
01140     TwoPhaseFilePath(path, xid);
01141 
01142     fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
01143     if (fd < 0)
01144     {
01145         if (give_warnings)
01146             ereport(WARNING,
01147                     (errcode_for_file_access(),
01148                      errmsg("could not open two-phase state file \"%s\": %m",
01149                             path)));
01150         return NULL;
01151     }
01152 
01153     /*
01154      * Check file length.  We can determine a lower bound pretty easily. We
01155      * set an upper bound to avoid palloc() failure on a corrupt file, though
01156      * we can't guarantee that we won't get an out of memory error anyway,
01157      * even on a valid file.
01158      */
01159     if (fstat(fd, &stat))
01160     {
01161         CloseTransientFile(fd);
01162         if (give_warnings)
01163             ereport(WARNING,
01164                     (errcode_for_file_access(),
01165                      errmsg("could not stat two-phase state file \"%s\": %m",
01166                             path)));
01167         return NULL;
01168     }
01169 
01170     if (stat.st_size < (MAXALIGN(sizeof(TwoPhaseFileHeader)) +
01171                         MAXALIGN(sizeof(TwoPhaseRecordOnDisk)) +
01172                         sizeof(pg_crc32)) ||
01173         stat.st_size > MaxAllocSize)
01174     {
01175         CloseTransientFile(fd);
01176         return NULL;
01177     }
01178 
01179     crc_offset = stat.st_size - sizeof(pg_crc32);
01180     if (crc_offset != MAXALIGN(crc_offset))
01181     {
01182         CloseTransientFile(fd);
01183         return NULL;
01184     }
01185 
01186     /*
01187      * OK, slurp in the file.
01188      */
01189     buf = (char *) palloc(stat.st_size);
01190 
01191     if (read(fd, buf, stat.st_size) != stat.st_size)
01192     {
01193         CloseTransientFile(fd);
01194         if (give_warnings)
01195             ereport(WARNING,
01196                     (errcode_for_file_access(),
01197                      errmsg("could not read two-phase state file \"%s\": %m",
01198                             path)));
01199         pfree(buf);
01200         return NULL;
01201     }
01202 
01203     CloseTransientFile(fd);
01204 
01205     hdr = (TwoPhaseFileHeader *) buf;
01206     if (hdr->magic != TWOPHASE_MAGIC || hdr->total_len != stat.st_size)
01207     {
01208         pfree(buf);
01209         return NULL;
01210     }
01211 
01212     INIT_CRC32(calc_crc);
01213     COMP_CRC32(calc_crc, buf, crc_offset);
01214     FIN_CRC32(calc_crc);
01215 
01216     file_crc = *((pg_crc32 *) (buf + crc_offset));
01217 
01218     if (!EQ_CRC32(calc_crc, file_crc))
01219     {
01220         pfree(buf);
01221         return NULL;
01222     }
01223 
01224     return buf;
01225 }
01226 
01227 /*
01228  * Confirms an xid is prepared, during recovery
01229  */
01230 bool
01231 StandbyTransactionIdIsPrepared(TransactionId xid)
01232 {
01233     char       *buf;
01234     TwoPhaseFileHeader *hdr;
01235     bool        result;
01236 
01237     Assert(TransactionIdIsValid(xid));
01238 
01239     if (max_prepared_xacts <= 0)
01240         return false;           /* nothing to do */
01241 
01242     /* Read and validate file */
01243     buf = ReadTwoPhaseFile(xid, false);
01244     if (buf == NULL)
01245         return false;
01246 
01247     /* Check header also */
01248     hdr = (TwoPhaseFileHeader *) buf;
01249     result = TransactionIdEquals(hdr->xid, xid);
01250     pfree(buf);
01251 
01252     return result;
01253 }
01254 
01255 /*
01256  * FinishPreparedTransaction: execute COMMIT PREPARED or ROLLBACK PREPARED
01257  */
01258 void
01259 FinishPreparedTransaction(const char *gid, bool isCommit)
01260 {
01261     GlobalTransaction gxact;
01262     PGPROC     *proc;
01263     PGXACT     *pgxact;
01264     TransactionId xid;
01265     char       *buf;
01266     char       *bufptr;
01267     TwoPhaseFileHeader *hdr;
01268     TransactionId latestXid;
01269     TransactionId *children;
01270     RelFileNode *commitrels;
01271     RelFileNode *abortrels;
01272     RelFileNode *delrels;
01273     int         ndelrels;
01274     SharedInvalidationMessage *invalmsgs;
01275     int         i;
01276 
01277     /*
01278      * Validate the GID, and lock the GXACT to ensure that two backends do not
01279      * try to commit the same GID at once.
01280      */
01281     gxact = LockGXact(gid, GetUserId());
01282     proc = &ProcGlobal->allProcs[gxact->pgprocno];
01283     pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
01284     xid = pgxact->xid;
01285 
01286     /*
01287      * Read and validate the state file
01288      */
01289     buf = ReadTwoPhaseFile(xid, true);
01290     if (buf == NULL)
01291         ereport(ERROR,
01292                 (errcode(ERRCODE_DATA_CORRUPTED),
01293                  errmsg("two-phase state file for transaction %u is corrupt",
01294                         xid)));
01295 
01296     /*
01297      * Disassemble the header area
01298      */
01299     hdr = (TwoPhaseFileHeader *) buf;
01300     Assert(TransactionIdEquals(hdr->xid, xid));
01301     bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
01302     children = (TransactionId *) bufptr;
01303     bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
01304     commitrels = (RelFileNode *) bufptr;
01305     bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
01306     abortrels = (RelFileNode *) bufptr;
01307     bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
01308     invalmsgs = (SharedInvalidationMessage *) bufptr;
01309     bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
01310 
01311     /* compute latestXid among all children */
01312     latestXid = TransactionIdLatest(xid, hdr->nsubxacts, children);
01313 
01314     /*
01315      * The order of operations here is critical: make the XLOG entry for
01316      * commit or abort, then mark the transaction committed or aborted in
01317      * pg_clog, then remove its PGPROC from the global ProcArray (which means
01318      * TransactionIdIsInProgress will stop saying the prepared xact is in
01319      * progress), then run the post-commit or post-abort callbacks. The
01320      * callbacks will release the locks the transaction held.
01321      */
01322     if (isCommit)
01323         RecordTransactionCommitPrepared(xid,
01324                                         hdr->nsubxacts, children,
01325                                         hdr->ncommitrels, commitrels,
01326                                         hdr->ninvalmsgs, invalmsgs,
01327                                         hdr->initfileinval);
01328     else
01329         RecordTransactionAbortPrepared(xid,
01330                                        hdr->nsubxacts, children,
01331                                        hdr->nabortrels, abortrels);
01332 
01333     ProcArrayRemove(proc, latestXid);
01334 
01335     /*
01336      * In case we fail while running the callbacks, mark the gxact invalid so
01337      * no one else will try to commit/rollback, and so it can be recycled
01338      * properly later.  It is still locked by our XID so it won't go away yet.
01339      *
01340      * (We assume it's safe to do this without taking TwoPhaseStateLock.)
01341      */
01342     gxact->valid = false;
01343 
01344     /*
01345      * We have to remove any files that were supposed to be dropped. For
01346      * consistency with the regular xact.c code paths, must do this before
01347      * releasing locks, so do it before running the callbacks.
01348      *
01349      * NB: this code knows that we couldn't be dropping any temp rels ...
01350      */
01351     if (isCommit)
01352     {
01353         delrels = commitrels;
01354         ndelrels = hdr->ncommitrels;
01355     }
01356     else
01357     {
01358         delrels = abortrels;
01359         ndelrels = hdr->nabortrels;
01360     }
01361     for (i = 0; i < ndelrels; i++)
01362     {
01363         SMgrRelation srel = smgropen(delrels[i], InvalidBackendId);
01364 
01365         smgrdounlink(srel, false);
01366         smgrclose(srel);
01367     }
01368 
01369     /*
01370      * Handle cache invalidation messages.
01371      *
01372      * Relcache init file invalidation requires processing both before and
01373      * after we send the SI messages. See AtEOXact_Inval()
01374      */
01375     if (hdr->initfileinval)
01376         RelationCacheInitFilePreInvalidate();
01377     SendSharedInvalidMessages(invalmsgs, hdr->ninvalmsgs);
01378     if (hdr->initfileinval)
01379         RelationCacheInitFilePostInvalidate();
01380 
01381     /* And now do the callbacks */
01382     if (isCommit)
01383         ProcessRecords(bufptr, xid, twophase_postcommit_callbacks);
01384     else
01385         ProcessRecords(bufptr, xid, twophase_postabort_callbacks);
01386 
01387     PredicateLockTwoPhaseFinish(xid, isCommit);
01388 
01389     /* Count the prepared xact as committed or aborted */
01390     AtEOXact_PgStat(isCommit);
01391 
01392     /*
01393      * And now we can clean up our mess.
01394      */
01395     RemoveTwoPhaseFile(xid, true);
01396 
01397     RemoveGXact(gxact);
01398 
01399     pfree(buf);
01400 }
01401 
01402 /*
01403  * Scan a 2PC state file (already read into memory by ReadTwoPhaseFile)
01404  * and call the indicated callbacks for each 2PC record.
01405  */
01406 static void
01407 ProcessRecords(char *bufptr, TransactionId xid,
01408                const TwoPhaseCallback callbacks[])
01409 {
01410     for (;;)
01411     {
01412         TwoPhaseRecordOnDisk *record = (TwoPhaseRecordOnDisk *) bufptr;
01413 
01414         Assert(record->rmid <= TWOPHASE_RM_MAX_ID);
01415         if (record->rmid == TWOPHASE_RM_END_ID)
01416             break;
01417 
01418         bufptr += MAXALIGN(sizeof(TwoPhaseRecordOnDisk));
01419 
01420         if (callbacks[record->rmid] != NULL)
01421             callbacks[record->rmid] (xid, record->info,
01422                                      (void *) bufptr, record->len);
01423 
01424         bufptr += MAXALIGN(record->len);
01425     }
01426 }
01427 
01428 /*
01429  * Remove the 2PC file for the specified XID.
01430  *
01431  * If giveWarning is false, do not complain about file-not-present;
01432  * this is an expected case during WAL replay.
01433  */
01434 void
01435 RemoveTwoPhaseFile(TransactionId xid, bool giveWarning)
01436 {
01437     char        path[MAXPGPATH];
01438 
01439     TwoPhaseFilePath(path, xid);
01440     if (unlink(path))
01441         if (errno != ENOENT || giveWarning)
01442             ereport(WARNING,
01443                     (errcode_for_file_access(),
01444                    errmsg("could not remove two-phase state file \"%s\": %m",
01445                           path)));
01446 }
01447 
01448 /*
01449  * Recreates a state file. This is used in WAL replay.
01450  *
01451  * Note: content and len don't include CRC.
01452  */
01453 void
01454 RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
01455 {
01456     char        path[MAXPGPATH];
01457     pg_crc32    statefile_crc;
01458     int         fd;
01459 
01460     /* Recompute CRC */
01461     INIT_CRC32(statefile_crc);
01462     COMP_CRC32(statefile_crc, content, len);
01463     FIN_CRC32(statefile_crc);
01464 
01465     TwoPhaseFilePath(path, xid);
01466 
01467     fd = OpenTransientFile(path,
01468                            O_CREAT | O_TRUNC | O_WRONLY | PG_BINARY,
01469                            S_IRUSR | S_IWUSR);
01470     if (fd < 0)
01471         ereport(ERROR,
01472                 (errcode_for_file_access(),
01473                  errmsg("could not recreate two-phase state file \"%s\": %m",
01474                         path)));
01475 
01476     /* Write content and CRC */
01477     if (write(fd, content, len) != len)
01478     {
01479         CloseTransientFile(fd);
01480         ereport(ERROR,
01481                 (errcode_for_file_access(),
01482                  errmsg("could not write two-phase state file: %m")));
01483     }
01484     if (write(fd, &statefile_crc, sizeof(pg_crc32)) != sizeof(pg_crc32))
01485     {
01486         CloseTransientFile(fd);
01487         ereport(ERROR,
01488                 (errcode_for_file_access(),
01489                  errmsg("could not write two-phase state file: %m")));
01490     }
01491 
01492     /*
01493      * We must fsync the file because the end-of-replay checkpoint will not do
01494      * so, there being no GXACT in shared memory yet to tell it to.
01495      */
01496     if (pg_fsync(fd) != 0)
01497     {
01498         CloseTransientFile(fd);
01499         ereport(ERROR,
01500                 (errcode_for_file_access(),
01501                  errmsg("could not fsync two-phase state file: %m")));
01502     }
01503 
01504     if (CloseTransientFile(fd) != 0)
01505         ereport(ERROR,
01506                 (errcode_for_file_access(),
01507                  errmsg("could not close two-phase state file: %m")));
01508 }
01509 
01510 /*
01511  * CheckPointTwoPhase -- handle 2PC component of checkpointing.
01512  *
01513  * We must fsync the state file of any GXACT that is valid and has a PREPARE
01514  * LSN <= the checkpoint's redo horizon.  (If the gxact isn't valid yet or
01515  * has a later LSN, this checkpoint is not responsible for fsyncing it.)
01516  *
01517  * This is deliberately run as late as possible in the checkpoint sequence,
01518  * because GXACTs ordinarily have short lifespans, and so it is quite
01519  * possible that GXACTs that were valid at checkpoint start will no longer
01520  * exist if we wait a little bit.
01521  *
01522  * If a GXACT remains valid across multiple checkpoints, it'll be fsynced
01523  * each time.  This is considered unusual enough that we don't bother to
01524  * expend any extra code to avoid the redundant fsyncs.  (They should be
01525  * reasonably cheap anyway, since they won't cause I/O.)
01526  */
01527 void
01528 CheckPointTwoPhase(XLogRecPtr redo_horizon)
01529 {
01530     TransactionId *xids;
01531     int         nxids;
01532     char        path[MAXPGPATH];
01533     int         i;
01534 
01535     /*
01536      * We don't want to hold the TwoPhaseStateLock while doing I/O, so we grab
01537      * it just long enough to make a list of the XIDs that require fsyncing,
01538      * and then do the I/O afterwards.
01539      *
01540      * This approach creates a race condition: someone else could delete a
01541      * GXACT between the time we release TwoPhaseStateLock and the time we try
01542      * to open its state file.  We handle this by special-casing ENOENT
01543      * failures: if we see that, we verify that the GXACT is no longer valid,
01544      * and if so ignore the failure.
01545      */
01546     if (max_prepared_xacts <= 0)
01547         return;                 /* nothing to do */
01548 
01549     TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_START();
01550 
01551     xids = (TransactionId *) palloc(max_prepared_xacts * sizeof(TransactionId));
01552     nxids = 0;
01553 
01554     LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
01555 
01556     for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
01557     {
01558         GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
01559         PGXACT     *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
01560 
01561         if (gxact->valid &&
01562             gxact->prepare_lsn <= redo_horizon)
01563             xids[nxids++] = pgxact->xid;
01564     }
01565 
01566     LWLockRelease(TwoPhaseStateLock);
01567 
01568     for (i = 0; i < nxids; i++)
01569     {
01570         TransactionId xid = xids[i];
01571         int         fd;
01572 
01573         TwoPhaseFilePath(path, xid);
01574 
01575         fd = OpenTransientFile(path, O_RDWR | PG_BINARY, 0);
01576         if (fd < 0)
01577         {
01578             if (errno == ENOENT)
01579             {
01580                 /* OK if gxact is no longer valid */
01581                 if (!TransactionIdIsPrepared(xid))
01582                     continue;
01583                 /* Restore errno in case it was changed */
01584                 errno = ENOENT;
01585             }
01586             ereport(ERROR,
01587                     (errcode_for_file_access(),
01588                      errmsg("could not open two-phase state file \"%s\": %m",
01589                             path)));
01590         }
01591 
01592         if (pg_fsync(fd) != 0)
01593         {
01594             CloseTransientFile(fd);
01595             ereport(ERROR,
01596                     (errcode_for_file_access(),
01597                      errmsg("could not fsync two-phase state file \"%s\": %m",
01598                             path)));
01599         }
01600 
01601         if (CloseTransientFile(fd) != 0)
01602             ereport(ERROR,
01603                     (errcode_for_file_access(),
01604                      errmsg("could not close two-phase state file \"%s\": %m",
01605                             path)));
01606     }
01607 
01608     pfree(xids);
01609 
01610     TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_DONE();
01611 }
01612 
01613 /*
01614  * PrescanPreparedTransactions
01615  *
01616  * Scan the pg_twophase directory and determine the range of valid XIDs
01617  * present.  This is run during database startup, after we have completed
01618  * reading WAL.  ShmemVariableCache->nextXid has been set to one more than
01619  * the highest XID for which evidence exists in WAL.
01620  *
01621  * We throw away any prepared xacts with main XID beyond nextXid --- if any
01622  * are present, it suggests that the DBA has done a PITR recovery to an
01623  * earlier point in time without cleaning out pg_twophase.  We dare not
01624  * try to recover such prepared xacts since they likely depend on database
01625  * state that doesn't exist now.
01626  *
01627  * However, we will advance nextXid beyond any subxact XIDs belonging to
01628  * valid prepared xacts.  We need to do this since subxact commit doesn't
01629  * write a WAL entry, and so there might be no evidence in WAL of those
01630  * subxact XIDs.
01631  *
01632  * Our other responsibility is to determine and return the oldest valid XID
01633  * among the prepared xacts (if none, return ShmemVariableCache->nextXid).
01634  * This is needed to synchronize pg_subtrans startup properly.
01635  *
01636  * If xids_p and nxids_p are not NULL, pointer to a palloc'd array of all
01637  * top-level xids is stored in *xids_p. The number of entries in the array
01638  * is returned in *nxids_p.
01639  */
01640 TransactionId
01641 PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
01642 {
01643     TransactionId origNextXid = ShmemVariableCache->nextXid;
01644     TransactionId result = origNextXid;
01645     DIR        *cldir;
01646     struct dirent *clde;
01647     TransactionId *xids = NULL;
01648     int         nxids = 0;
01649     int         allocsize = 0;
01650 
01651     cldir = AllocateDir(TWOPHASE_DIR);
01652     while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
01653     {
01654         if (strlen(clde->d_name) == 8 &&
01655             strspn(clde->d_name, "0123456789ABCDEF") == 8)
01656         {
01657             TransactionId xid;
01658             char       *buf;
01659             TwoPhaseFileHeader *hdr;
01660             TransactionId *subxids;
01661             int         i;
01662 
01663             xid = (TransactionId) strtoul(clde->d_name, NULL, 16);
01664 
01665             /* Reject XID if too new */
01666             if (TransactionIdFollowsOrEquals(xid, origNextXid))
01667             {
01668                 ereport(WARNING,
01669                         (errmsg("removing future two-phase state file \"%s\"",
01670                                 clde->d_name)));
01671                 RemoveTwoPhaseFile(xid, true);
01672                 continue;
01673             }
01674 
01675             /*
01676              * Note: we can't check if already processed because clog
01677              * subsystem isn't up yet.
01678              */
01679 
01680             /* Read and validate file */
01681             buf = ReadTwoPhaseFile(xid, true);
01682             if (buf == NULL)
01683             {
01684                 ereport(WARNING,
01685                       (errmsg("removing corrupt two-phase state file \"%s\"",
01686                               clde->d_name)));
01687                 RemoveTwoPhaseFile(xid, true);
01688                 continue;
01689             }
01690 
01691             /* Deconstruct header */
01692             hdr = (TwoPhaseFileHeader *) buf;
01693             if (!TransactionIdEquals(hdr->xid, xid))
01694             {
01695                 ereport(WARNING,
01696                       (errmsg("removing corrupt two-phase state file \"%s\"",
01697                               clde->d_name)));
01698                 RemoveTwoPhaseFile(xid, true);
01699                 pfree(buf);
01700                 continue;
01701             }
01702 
01703             /*
01704              * OK, we think this file is valid.  Incorporate xid into the
01705              * running-minimum result.
01706              */
01707             if (TransactionIdPrecedes(xid, result))
01708                 result = xid;
01709 
01710             /*
01711              * Examine subtransaction XIDs ... they should all follow main
01712              * XID, and they may force us to advance nextXid.
01713              *
01714              * We don't expect anyone else to modify nextXid, hence we don't
01715              * need to hold a lock while examining it.  We still acquire the
01716              * lock to modify it, though.
01717              */
01718             subxids = (TransactionId *)
01719                 (buf + MAXALIGN(sizeof(TwoPhaseFileHeader)));
01720             for (i = 0; i < hdr->nsubxacts; i++)
01721             {
01722                 TransactionId subxid = subxids[i];
01723 
01724                 Assert(TransactionIdFollows(subxid, xid));
01725                 if (TransactionIdFollowsOrEquals(subxid,
01726                                                  ShmemVariableCache->nextXid))
01727                 {
01728                     LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
01729                     ShmemVariableCache->nextXid = subxid;
01730                     TransactionIdAdvance(ShmemVariableCache->nextXid);
01731                     LWLockRelease(XidGenLock);
01732                 }
01733             }
01734 
01735 
01736             if (xids_p)
01737             {
01738                 if (nxids == allocsize)
01739                 {
01740                     if (nxids == 0)
01741                     {
01742                         allocsize = 10;
01743                         xids = palloc(allocsize * sizeof(TransactionId));
01744                     }
01745                     else
01746                     {
01747                         allocsize = allocsize * 2;
01748                         xids = repalloc(xids, allocsize * sizeof(TransactionId));
01749                     }
01750                 }
01751                 xids[nxids++] = xid;
01752             }
01753 
01754             pfree(buf);
01755         }
01756     }
01757     FreeDir(cldir);
01758 
01759     if (xids_p)
01760     {
01761         *xids_p = xids;
01762         *nxids_p = nxids;
01763     }
01764 
01765     return result;
01766 }
01767 
01768 /*
01769  * StandbyRecoverPreparedTransactions
01770  *
01771  * Scan the pg_twophase directory and setup all the required information to
01772  * allow standby queries to treat prepared transactions as still active.
01773  * This is never called at the end of recovery - we use
01774  * RecoverPreparedTransactions() at that point.
01775  *
01776  * Currently we simply call SubTransSetParent() for any subxids of prepared
01777  * transactions. If overwriteOK is true, it's OK if some XIDs have already
01778  * been marked in pg_subtrans.
01779  */
01780 void
01781 StandbyRecoverPreparedTransactions(bool overwriteOK)
01782 {
01783     DIR        *cldir;
01784     struct dirent *clde;
01785 
01786     cldir = AllocateDir(TWOPHASE_DIR);
01787     while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
01788     {
01789         if (strlen(clde->d_name) == 8 &&
01790             strspn(clde->d_name, "0123456789ABCDEF") == 8)
01791         {
01792             TransactionId xid;
01793             char       *buf;
01794             TwoPhaseFileHeader *hdr;
01795             TransactionId *subxids;
01796             int         i;
01797 
01798             xid = (TransactionId) strtoul(clde->d_name, NULL, 16);
01799 
01800             /* Already processed? */
01801             if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
01802             {
01803                 ereport(WARNING,
01804                         (errmsg("removing stale two-phase state file \"%s\"",
01805                                 clde->d_name)));
01806                 RemoveTwoPhaseFile(xid, true);
01807                 continue;
01808             }
01809 
01810             /* Read and validate file */
01811             buf = ReadTwoPhaseFile(xid, true);
01812             if (buf == NULL)
01813             {
01814                 ereport(WARNING,
01815                       (errmsg("removing corrupt two-phase state file \"%s\"",
01816                               clde->d_name)));
01817                 RemoveTwoPhaseFile(xid, true);
01818                 continue;
01819             }
01820 
01821             /* Deconstruct header */
01822             hdr = (TwoPhaseFileHeader *) buf;
01823             if (!TransactionIdEquals(hdr->xid, xid))
01824             {
01825                 ereport(WARNING,
01826                       (errmsg("removing corrupt two-phase state file \"%s\"",
01827                               clde->d_name)));
01828                 RemoveTwoPhaseFile(xid, true);
01829                 pfree(buf);
01830                 continue;
01831             }
01832 
01833             /*
01834              * Examine subtransaction XIDs ... they should all follow main
01835              * XID.
01836              */
01837             subxids = (TransactionId *)
01838                 (buf + MAXALIGN(sizeof(TwoPhaseFileHeader)));
01839             for (i = 0; i < hdr->nsubxacts; i++)
01840             {
01841                 TransactionId subxid = subxids[i];
01842 
01843                 Assert(TransactionIdFollows(subxid, xid));
01844                 SubTransSetParent(xid, subxid, overwriteOK);
01845             }
01846         }
01847     }
01848     FreeDir(cldir);
01849 }
01850 
01851 /*
01852  * RecoverPreparedTransactions
01853  *
01854  * Scan the pg_twophase directory and reload shared-memory state for each
01855  * prepared transaction (reacquire locks, etc).  This is run during database
01856  * startup.
01857  */
01858 void
01859 RecoverPreparedTransactions(void)
01860 {
01861     char        dir[MAXPGPATH];
01862     DIR        *cldir;
01863     struct dirent *clde;
01864     bool        overwriteOK = false;
01865 
01866     snprintf(dir, MAXPGPATH, "%s", TWOPHASE_DIR);
01867 
01868     cldir = AllocateDir(dir);
01869     while ((clde = ReadDir(cldir, dir)) != NULL)
01870     {
01871         if (strlen(clde->d_name) == 8 &&
01872             strspn(clde->d_name, "0123456789ABCDEF") == 8)
01873         {
01874             TransactionId xid;
01875             char       *buf;
01876             char       *bufptr;
01877             TwoPhaseFileHeader *hdr;
01878             TransactionId *subxids;
01879             GlobalTransaction gxact;
01880             int         i;
01881 
01882             xid = (TransactionId) strtoul(clde->d_name, NULL, 16);
01883 
01884             /* Already processed? */
01885             if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
01886             {
01887                 ereport(WARNING,
01888                         (errmsg("removing stale two-phase state file \"%s\"",
01889                                 clde->d_name)));
01890                 RemoveTwoPhaseFile(xid, true);
01891                 continue;
01892             }
01893 
01894             /* Read and validate file */
01895             buf = ReadTwoPhaseFile(xid, true);
01896             if (buf == NULL)
01897             {
01898                 ereport(WARNING,
01899                       (errmsg("removing corrupt two-phase state file \"%s\"",
01900                               clde->d_name)));
01901                 RemoveTwoPhaseFile(xid, true);
01902                 continue;
01903             }
01904 
01905             ereport(LOG,
01906                     (errmsg("recovering prepared transaction %u", xid)));
01907 
01908             /* Deconstruct header */
01909             hdr = (TwoPhaseFileHeader *) buf;
01910             Assert(TransactionIdEquals(hdr->xid, xid));
01911             bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
01912             subxids = (TransactionId *) bufptr;
01913             bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
01914             bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
01915             bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
01916             bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
01917 
01918             /*
01919              * It's possible that SubTransSetParent has been set before, if
01920              * the prepared transaction generated xid assignment records. Test
01921              * here must match one used in AssignTransactionId().
01922              */
01923             if (InHotStandby && hdr->nsubxacts >= PGPROC_MAX_CACHED_SUBXIDS)
01924                 overwriteOK = true;
01925 
01926             /*
01927              * Reconstruct subtrans state for the transaction --- needed
01928              * because pg_subtrans is not preserved over a restart.  Note that
01929              * we are linking all the subtransactions directly to the
01930              * top-level XID; there may originally have been a more complex
01931              * hierarchy, but there's no need to restore that exactly.
01932              */
01933             for (i = 0; i < hdr->nsubxacts; i++)
01934                 SubTransSetParent(subxids[i], xid, overwriteOK);
01935 
01936             /*
01937              * Recreate its GXACT and dummy PGPROC
01938              *
01939              * Note: since we don't have the PREPARE record's WAL location at
01940              * hand, we leave prepare_lsn zeroes.  This means the GXACT will
01941              * be fsync'd on every future checkpoint.  We assume this
01942              * situation is infrequent enough that the performance cost is
01943              * negligible (especially since we know the state file has already
01944              * been fsynced).
01945              */
01946             gxact = MarkAsPreparing(xid, hdr->gid,
01947                                     hdr->prepared_at,
01948                                     hdr->owner, hdr->database);
01949             GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids);
01950             MarkAsPrepared(gxact);
01951 
01952             /*
01953              * Recover other state (notably locks) using resource managers
01954              */
01955             ProcessRecords(bufptr, xid, twophase_recover_callbacks);
01956 
01957             /*
01958              * Release locks held by the standby process after we process each
01959              * prepared transaction. As a result, we don't need too many
01960              * additional locks at any one time.
01961              */
01962             if (InHotStandby)
01963                 StandbyReleaseLockTree(xid, hdr->nsubxacts, subxids);
01964 
01965             pfree(buf);
01966         }
01967     }
01968     FreeDir(cldir);
01969 }
01970 
01971 /*
01972  *  RecordTransactionCommitPrepared
01973  *
01974  * This is basically the same as RecordTransactionCommit: in particular,
01975  * we must set the delayChkpt flag to avoid a race condition.
01976  *
01977  * We know the transaction made at least one XLOG entry (its PREPARE),
01978  * so it is never possible to optimize out the commit record.
01979  */
01980 static void
01981 RecordTransactionCommitPrepared(TransactionId xid,
01982                                 int nchildren,
01983                                 TransactionId *children,
01984                                 int nrels,
01985                                 RelFileNode *rels,
01986                                 int ninvalmsgs,
01987                                 SharedInvalidationMessage *invalmsgs,
01988                                 bool initfileinval)
01989 {
01990     XLogRecData rdata[4];
01991     int         lastrdata = 0;
01992     xl_xact_commit_prepared xlrec;
01993     XLogRecPtr  recptr;
01994 
01995     START_CRIT_SECTION();
01996 
01997     /* See notes in RecordTransactionCommit */
01998     MyPgXact->delayChkpt = true;
01999 
02000     /* Emit the XLOG commit record */
02001     xlrec.xid = xid;
02002     xlrec.crec.xact_time = GetCurrentTimestamp();
02003     xlrec.crec.xinfo = initfileinval ? XACT_COMPLETION_UPDATE_RELCACHE_FILE : 0;
02004     xlrec.crec.nmsgs = 0;
02005     xlrec.crec.nrels = nrels;
02006     xlrec.crec.nsubxacts = nchildren;
02007     xlrec.crec.nmsgs = ninvalmsgs;
02008 
02009     rdata[0].data = (char *) (&xlrec);
02010     rdata[0].len = MinSizeOfXactCommitPrepared;
02011     rdata[0].buffer = InvalidBuffer;
02012     /* dump rels to delete */
02013     if (nrels > 0)
02014     {
02015         rdata[0].next = &(rdata[1]);
02016         rdata[1].data = (char *) rels;
02017         rdata[1].len = nrels * sizeof(RelFileNode);
02018         rdata[1].buffer = InvalidBuffer;
02019         lastrdata = 1;
02020     }
02021     /* dump committed child Xids */
02022     if (nchildren > 0)
02023     {
02024         rdata[lastrdata].next = &(rdata[2]);
02025         rdata[2].data = (char *) children;
02026         rdata[2].len = nchildren * sizeof(TransactionId);
02027         rdata[2].buffer = InvalidBuffer;
02028         lastrdata = 2;
02029     }
02030     /* dump cache invalidation messages */
02031     if (ninvalmsgs > 0)
02032     {
02033         rdata[lastrdata].next = &(rdata[3]);
02034         rdata[3].data = (char *) invalmsgs;
02035         rdata[3].len = ninvalmsgs * sizeof(SharedInvalidationMessage);
02036         rdata[3].buffer = InvalidBuffer;
02037         lastrdata = 3;
02038     }
02039     rdata[lastrdata].next = NULL;
02040 
02041     recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT_PREPARED, rdata);
02042 
02043     /*
02044      * We don't currently try to sleep before flush here ... nor is there any
02045      * support for async commit of a prepared xact (the very idea is probably
02046      * a contradiction)
02047      */
02048 
02049     /* Flush XLOG to disk */
02050     XLogFlush(recptr);
02051 
02052     /* Mark the transaction committed in pg_clog */
02053     TransactionIdCommitTree(xid, nchildren, children);
02054 
02055     /* Checkpoint can proceed now */
02056     MyPgXact->delayChkpt = false;
02057 
02058     END_CRIT_SECTION();
02059 
02060     /*
02061      * Wait for synchronous replication, if required.
02062      *
02063      * Note that at this stage we have marked clog, but still show as running
02064      * in the procarray and continue to hold locks.
02065      */
02066     SyncRepWaitForLSN(recptr);
02067 }
02068 
02069 /*
02070  *  RecordTransactionAbortPrepared
02071  *
02072  * This is basically the same as RecordTransactionAbort.
02073  *
02074  * We know the transaction made at least one XLOG entry (its PREPARE),
02075  * so it is never possible to optimize out the abort record.
02076  */
02077 static void
02078 RecordTransactionAbortPrepared(TransactionId xid,
02079                                int nchildren,
02080                                TransactionId *children,
02081                                int nrels,
02082                                RelFileNode *rels)
02083 {
02084     XLogRecData rdata[3];
02085     int         lastrdata = 0;
02086     xl_xact_abort_prepared xlrec;
02087     XLogRecPtr  recptr;
02088 
02089     /*
02090      * Catch the scenario where we aborted partway through
02091      * RecordTransactionCommitPrepared ...
02092      */
02093     if (TransactionIdDidCommit(xid))
02094         elog(PANIC, "cannot abort transaction %u, it was already committed",
02095              xid);
02096 
02097     START_CRIT_SECTION();
02098 
02099     /* Emit the XLOG abort record */
02100     xlrec.xid = xid;
02101     xlrec.arec.xact_time = GetCurrentTimestamp();
02102     xlrec.arec.nrels = nrels;
02103     xlrec.arec.nsubxacts = nchildren;
02104     rdata[0].data = (char *) (&xlrec);
02105     rdata[0].len = MinSizeOfXactAbortPrepared;
02106     rdata[0].buffer = InvalidBuffer;
02107     /* dump rels to delete */
02108     if (nrels > 0)
02109     {
02110         rdata[0].next = &(rdata[1]);
02111         rdata[1].data = (char *) rels;
02112         rdata[1].len = nrels * sizeof(RelFileNode);
02113         rdata[1].buffer = InvalidBuffer;
02114         lastrdata = 1;
02115     }
02116     /* dump committed child Xids */
02117     if (nchildren > 0)
02118     {
02119         rdata[lastrdata].next = &(rdata[2]);
02120         rdata[2].data = (char *) children;
02121         rdata[2].len = nchildren * sizeof(TransactionId);
02122         rdata[2].buffer = InvalidBuffer;
02123         lastrdata = 2;
02124     }
02125     rdata[lastrdata].next = NULL;
02126 
02127     recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_ABORT_PREPARED, rdata);
02128 
02129     /* Always flush, since we're about to remove the 2PC state file */
02130     XLogFlush(recptr);
02131 
02132     /*
02133      * Mark the transaction aborted in clog.  This is not absolutely necessary
02134      * but we may as well do it while we are here.
02135      */
02136     TransactionIdAbortTree(xid, nchildren, children);
02137 
02138     END_CRIT_SECTION();
02139 
02140     /*
02141      * Wait for synchronous replication, if required.
02142      *
02143      * Note that at this stage we have marked clog, but still show as running
02144      * in the procarray and continue to hold locks.
02145      */
02146     SyncRepWaitForLSN(recptr);
02147 }