Header And Logo

PostgreSQL
| The world's most advanced open source database.

txid.c

Go to the documentation of this file.
00001 /*-------------------------------------------------------------------------
00002  * txid.c
00003  *
00004  *  Export internal transaction IDs to user level.
00005  *
00006  * Note that only top-level transaction IDs are ever converted to TXID.
00007  * This is important because TXIDs frequently persist beyond the global
00008  * xmin horizon, or may even be shipped to other machines, so we cannot
00009  * rely on being able to correlate subtransaction IDs with their parents
00010  * via functions such as SubTransGetTopmostTransaction().
00011  *
00012  *
00013  *  Copyright (c) 2003-2013, PostgreSQL Global Development Group
00014  *  Author: Jan Wieck, Afilias USA INC.
00015  *  64-bit txids: Marko Kreen, Skype Technologies
00016  *
00017  *  src/backend/utils/adt/txid.c
00018  *
00019  *-------------------------------------------------------------------------
00020  */
00021 
00022 #include "postgres.h"
00023 
00024 #include "access/transam.h"
00025 #include "access/xact.h"
00026 #include "funcapi.h"
00027 #include "miscadmin.h"
00028 #include "libpq/pqformat.h"
00029 #include "utils/builtins.h"
00030 #include "utils/snapmgr.h"
00031 
00032 
00033 /* txid will be signed int8 in database, so must limit to 63 bits */
00034 #define MAX_TXID   UINT64CONST(0x7FFFFFFFFFFFFFFF)
00035 
00036 /* Use unsigned variant internally */
00037 typedef uint64 txid;
00038 
00039 /* sprintf format code for uint64 */
00040 #define TXID_FMT UINT64_FORMAT
00041 
00042 /*
00043  * If defined, use bsearch() function for searching for txids in snapshots
00044  * that have more than the specified number of values.
00045  */
00046 #define USE_BSEARCH_IF_NXIP_GREATER 30
00047 
00048 
00049 /*
00050  * Snapshot containing 8byte txids.
00051  */
00052 typedef struct
00053 {
00054     /*
00055      * 4-byte length hdr, should not be touched directly.
00056      *
00057      * Explicit embedding is ok as we want always correct alignment anyway.
00058      */
00059     int32       __varsz;
00060 
00061     uint32      nxip;           /* number of txids in xip array */
00062     txid        xmin;
00063     txid        xmax;
00064     txid        xip[1];         /* in-progress txids, xmin <= xip[i] < xmax */
00065 } TxidSnapshot;
00066 
/* Total byte size of a TxidSnapshot that holds nxip in-progress txids. */
#define TXID_SNAPSHOT_SIZE(nxip) (offsetof(TxidSnapshot, xip) + sizeof(txid) * (nxip))
00069 
00070 /*
00071  * Epoch values from xact.c
00072  */
00073 typedef struct
00074 {
00075     TransactionId last_xid;
00076     uint32      epoch;
00077 } TxidEpoch;
00078 
00079 
00080 /*
00081  * Fetch epoch data from xact.c.
00082  */
00083 static void
00084 load_xid_epoch(TxidEpoch *state)
00085 {
00086     GetNextXidAndEpoch(&state->last_xid, &state->epoch);
00087 }
00088 
00089 /*
00090  * do a TransactionId -> txid conversion for an XID near the given epoch
00091  */
00092 static txid
00093 convert_xid(TransactionId xid, const TxidEpoch *state)
00094 {
00095     uint64      epoch;
00096 
00097     /* return special xid's as-is */
00098     if (!TransactionIdIsNormal(xid))
00099         return (txid) xid;
00100 
00101     /* xid can be on either side when near wrap-around */
00102     epoch = (uint64) state->epoch;
00103     if (xid > state->last_xid &&
00104         TransactionIdPrecedes(xid, state->last_xid))
00105         epoch--;
00106     else if (xid < state->last_xid &&
00107              TransactionIdFollows(xid, state->last_xid))
00108         epoch++;
00109 
00110     return (epoch << 32) | xid;
00111 }
00112 
00113 /*
00114  * txid comparator for qsort/bsearch
00115  */
00116 static int
00117 cmp_txid(const void *aa, const void *bb)
00118 {
00119     txid        a = *(const txid *) aa;
00120     txid        b = *(const txid *) bb;
00121 
00122     if (a < b)
00123         return -1;
00124     if (a > b)
00125         return 1;
00126     return 0;
00127 }
00128 
00129 /*
00130  * sort a snapshot's txids, so we can use bsearch() later.
00131  *
00132  * For consistency of on-disk representation, we always sort even if bsearch
00133  * will not be used.
00134  */
00135 static void
00136 sort_snapshot(TxidSnapshot *snap)
00137 {
00138     if (snap->nxip > 1)
00139         qsort(snap->xip, snap->nxip, sizeof(txid), cmp_txid);
00140 }
00141 
00142 /*
00143  * check txid visibility.
00144  */
00145 static bool
00146 is_visible_txid(txid value, const TxidSnapshot *snap)
00147 {
00148     if (value < snap->xmin)
00149         return true;
00150     else if (value >= snap->xmax)
00151         return false;
00152 #ifdef USE_BSEARCH_IF_NXIP_GREATER
00153     else if (snap->nxip > USE_BSEARCH_IF_NXIP_GREATER)
00154     {
00155         void       *res;
00156 
00157         res = bsearch(&value, snap->xip, snap->nxip, sizeof(txid), cmp_txid);
00158         /* if found, transaction is still in progress */
00159         return (res) ? false : true;
00160     }
00161 #endif
00162     else
00163     {
00164         uint32      i;
00165 
00166         for (i = 0; i < snap->nxip; i++)
00167         {
00168             if (value == snap->xip[i])
00169                 return false;
00170         }
00171         return true;
00172     }
00173 }
00174 
00175 /*
00176  * helper functions to use StringInfo for TxidSnapshot creation.
00177  */
00178 
00179 static StringInfo
00180 buf_init(txid xmin, txid xmax)
00181 {
00182     TxidSnapshot snap;
00183     StringInfo  buf;
00184 
00185     snap.xmin = xmin;
00186     snap.xmax = xmax;
00187     snap.nxip = 0;
00188 
00189     buf = makeStringInfo();
00190     appendBinaryStringInfo(buf, (char *) &snap, TXID_SNAPSHOT_SIZE(0));
00191     return buf;
00192 }
00193 
00194 static void
00195 buf_add_txid(StringInfo buf, txid xid)
00196 {
00197     TxidSnapshot *snap = (TxidSnapshot *) buf->data;
00198 
00199     /* do this before possible realloc */
00200     snap->nxip++;
00201 
00202     appendBinaryStringInfo(buf, (char *) &xid, sizeof(xid));
00203 }
00204 
00205 static TxidSnapshot *
00206 buf_finalize(StringInfo buf)
00207 {
00208     TxidSnapshot *snap = (TxidSnapshot *) buf->data;
00209 
00210     SET_VARSIZE(snap, buf->len);
00211 
00212     /* buf is not needed anymore */
00213     buf->data = NULL;
00214     pfree(buf);
00215 
00216     return snap;
00217 }
00218 
00219 /*
00220  * simple number parser.
00221  *
00222  * We return 0 on error, which is invalid value for txid.
00223  */
00224 static txid
00225 str2txid(const char *s, const char **endp)
00226 {
00227     txid        val = 0;
00228     txid        cutoff = MAX_TXID / 10;
00229     txid        cutlim = MAX_TXID % 10;
00230 
00231     for (; *s; s++)
00232     {
00233         unsigned    d;
00234 
00235         if (*s < '0' || *s > '9')
00236             break;
00237         d = *s - '0';
00238 
00239         /*
00240          * check for overflow
00241          */
00242         if (val > cutoff || (val == cutoff && d > cutlim))
00243         {
00244             val = 0;
00245             break;
00246         }
00247 
00248         val = val * 10 + d;
00249     }
00250     if (endp)
00251         *endp = s;
00252     return val;
00253 }
00254 
00255 /*
00256  * parse snapshot from cstring
00257  */
00258 static TxidSnapshot *
00259 parse_snapshot(const char *str)
00260 {
00261     txid        xmin;
00262     txid        xmax;
00263     txid        last_val = 0,
00264                 val;
00265     const char *str_start = str;
00266     const char *endp;
00267     StringInfo  buf;
00268 
00269     xmin = str2txid(str, &endp);
00270     if (*endp != ':')
00271         goto bad_format;
00272     str = endp + 1;
00273 
00274     xmax = str2txid(str, &endp);
00275     if (*endp != ':')
00276         goto bad_format;
00277     str = endp + 1;
00278 
00279     /* it should look sane */
00280     if (xmin == 0 || xmax == 0 || xmin > xmax)
00281         goto bad_format;
00282 
00283     /* allocate buffer */
00284     buf = buf_init(xmin, xmax);
00285 
00286     /* loop over values */
00287     while (*str != '\0')
00288     {
00289         /* read next value */
00290         val = str2txid(str, &endp);
00291         str = endp;
00292 
00293         /* require the input to be in order */
00294         if (val < xmin || val >= xmax || val <= last_val)
00295             goto bad_format;
00296 
00297         buf_add_txid(buf, val);
00298         last_val = val;
00299 
00300         if (*str == ',')
00301             str++;
00302         else if (*str != '\0')
00303             goto bad_format;
00304     }
00305 
00306     return buf_finalize(buf);
00307 
00308 bad_format:
00309     elog(ERROR, "invalid input for txid_snapshot: \"%s\"", str_start);
00310     return NULL;
00311 }
00312 
00313 /*
00314  * Public functions.
00315  *
00316  * txid_current() and txid_current_snapshot() are the only ones that
00317  * communicate with core xid machinery.  All the others work on data
00318  * returned by them.
00319  */
00320 
00321 /*
00322  * txid_current() returns int8
00323  *
00324  *  Return the current toplevel transaction ID as TXID
00325  *  If the current transaction does not have one, one is assigned.
00326  */
00327 Datum
00328 txid_current(PG_FUNCTION_ARGS)
00329 {
00330     txid        val;
00331     TxidEpoch   state;
00332 
00333     /*
00334      * Must prevent during recovery because if an xid is not assigned we try
00335      * to assign one, which would fail. Programs already rely on this function
00336      * to always return a valid current xid, so we should not change this to
00337      * return NULL or similar invalid xid.
00338      */
00339     PreventCommandDuringRecovery("txid_current()");
00340 
00341     load_xid_epoch(&state);
00342 
00343     val = convert_xid(GetTopTransactionId(), &state);
00344 
00345     PG_RETURN_INT64(val);
00346 }
00347 
00348 /*
00349  * txid_current_snapshot() returns txid_snapshot
00350  *
00351  *      Return current snapshot in TXID format
00352  *
00353  * Note that only top-transaction XIDs are included in the snapshot.
00354  */
00355 Datum
00356 txid_current_snapshot(PG_FUNCTION_ARGS)
00357 {
00358     TxidSnapshot *snap;
00359     uint32      nxip,
00360                 i,
00361                 size;
00362     TxidEpoch   state;
00363     Snapshot    cur;
00364 
00365     cur = GetActiveSnapshot();
00366     if (cur == NULL)
00367         elog(ERROR, "no active snapshot set");
00368 
00369     load_xid_epoch(&state);
00370 
00371     /* allocate */
00372     nxip = cur->xcnt;
00373     size = TXID_SNAPSHOT_SIZE(nxip);
00374     snap = palloc(size);
00375     SET_VARSIZE(snap, size);
00376 
00377     /* fill */
00378     snap->xmin = convert_xid(cur->xmin, &state);
00379     snap->xmax = convert_xid(cur->xmax, &state);
00380     snap->nxip = nxip;
00381     for (i = 0; i < nxip; i++)
00382         snap->xip[i] = convert_xid(cur->xip[i], &state);
00383 
00384     /* we want them guaranteed to be in ascending order */
00385     sort_snapshot(snap);
00386 
00387     PG_RETURN_POINTER(snap);
00388 }
00389 
00390 /*
00391  * txid_snapshot_in(cstring) returns txid_snapshot
00392  *
00393  *      input function for type txid_snapshot
00394  */
00395 Datum
00396 txid_snapshot_in(PG_FUNCTION_ARGS)
00397 {
00398     char       *str = PG_GETARG_CSTRING(0);
00399     TxidSnapshot *snap;
00400 
00401     snap = parse_snapshot(str);
00402 
00403     PG_RETURN_POINTER(snap);
00404 }
00405 
00406 /*
00407  * txid_snapshot_out(txid_snapshot) returns cstring
00408  *
00409  *      output function for type txid_snapshot
00410  */
00411 Datum
00412 txid_snapshot_out(PG_FUNCTION_ARGS)
00413 {
00414     TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
00415     StringInfoData str;
00416     uint32      i;
00417 
00418     initStringInfo(&str);
00419 
00420     appendStringInfo(&str, TXID_FMT ":", snap->xmin);
00421     appendStringInfo(&str, TXID_FMT ":", snap->xmax);
00422 
00423     for (i = 0; i < snap->nxip; i++)
00424     {
00425         if (i > 0)
00426             appendStringInfoChar(&str, ',');
00427         appendStringInfo(&str, TXID_FMT, snap->xip[i]);
00428     }
00429 
00430     PG_RETURN_CSTRING(str.data);
00431 }
00432 
00433 /*
00434  * txid_snapshot_recv(internal) returns txid_snapshot
00435  *
00436  *      binary input function for type txid_snapshot
00437  *
00438  *      format: int4 nxip, int8 xmin, int8 xmax, int8 xip
00439  */
00440 Datum
00441 txid_snapshot_recv(PG_FUNCTION_ARGS)
00442 {
00443     StringInfo  buf = (StringInfo) PG_GETARG_POINTER(0);
00444     TxidSnapshot *snap;
00445     txid        last = 0;
00446     int         nxip;
00447     int         i;
00448     int         avail;
00449     int         expect;
00450     txid        xmin,
00451                 xmax;
00452 
00453     /*
00454      * load nxip and check for nonsense.
00455      *
00456      * (nxip > avail) check is against int overflows in 'expect'.
00457      */
00458     nxip = pq_getmsgint(buf, 4);
00459     avail = buf->len - buf->cursor;
00460     expect = 8 + 8 + nxip * 8;
00461     if (nxip < 0 || nxip > avail || expect > avail)
00462         goto bad_format;
00463 
00464     xmin = pq_getmsgint64(buf);
00465     xmax = pq_getmsgint64(buf);
00466     if (xmin == 0 || xmax == 0 || xmin > xmax || xmax > MAX_TXID)
00467         goto bad_format;
00468 
00469     snap = palloc(TXID_SNAPSHOT_SIZE(nxip));
00470     snap->xmin = xmin;
00471     snap->xmax = xmax;
00472     snap->nxip = nxip;
00473     SET_VARSIZE(snap, TXID_SNAPSHOT_SIZE(nxip));
00474 
00475     for (i = 0; i < nxip; i++)
00476     {
00477         txid        cur = pq_getmsgint64(buf);
00478 
00479         if (cur <= last || cur < xmin || cur >= xmax)
00480             goto bad_format;
00481         snap->xip[i] = cur;
00482         last = cur;
00483     }
00484     PG_RETURN_POINTER(snap);
00485 
00486 bad_format:
00487     elog(ERROR, "invalid snapshot data");
00488     return (Datum) NULL;
00489 }
00490 
00491 /*
00492  * txid_snapshot_send(txid_snapshot) returns bytea
00493  *
00494  *      binary output function for type txid_snapshot
00495  *
00496  *      format: int4 nxip, int8 xmin, int8 xmax, int8 xip
00497  */
00498 Datum
00499 txid_snapshot_send(PG_FUNCTION_ARGS)
00500 {
00501     TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
00502     StringInfoData buf;
00503     uint32      i;
00504 
00505     pq_begintypsend(&buf);
00506     pq_sendint(&buf, snap->nxip, 4);
00507     pq_sendint64(&buf, snap->xmin);
00508     pq_sendint64(&buf, snap->xmax);
00509     for (i = 0; i < snap->nxip; i++)
00510         pq_sendint64(&buf, snap->xip[i]);
00511     PG_RETURN_BYTEA_P(pq_endtypsend(&buf));
00512 }
00513 
00514 /*
00515  * txid_visible_in_snapshot(int8, txid_snapshot) returns bool
00516  *
00517  *      is txid visible in snapshot ?
00518  */
00519 Datum
00520 txid_visible_in_snapshot(PG_FUNCTION_ARGS)
00521 {
00522     txid        value = PG_GETARG_INT64(0);
00523     TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(1);
00524 
00525     PG_RETURN_BOOL(is_visible_txid(value, snap));
00526 }
00527 
00528 /*
00529  * txid_snapshot_xmin(txid_snapshot) returns int8
00530  *
00531  *      return snapshot's xmin
00532  */
00533 Datum
00534 txid_snapshot_xmin(PG_FUNCTION_ARGS)
00535 {
00536     TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
00537 
00538     PG_RETURN_INT64(snap->xmin);
00539 }
00540 
00541 /*
00542  * txid_snapshot_xmax(txid_snapshot) returns int8
00543  *
00544  *      return snapshot's xmax
00545  */
00546 Datum
00547 txid_snapshot_xmax(PG_FUNCTION_ARGS)
00548 {
00549     TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
00550 
00551     PG_RETURN_INT64(snap->xmax);
00552 }
00553 
00554 /*
00555  * txid_snapshot_xip(txid_snapshot) returns setof int8
00556  *
00557  *      return in-progress TXIDs in snapshot.
00558  */
00559 Datum
00560 txid_snapshot_xip(PG_FUNCTION_ARGS)
00561 {
00562     FuncCallContext *fctx;
00563     TxidSnapshot *snap;
00564     txid        value;
00565 
00566     /* on first call initialize snap_state and get copy of snapshot */
00567     if (SRF_IS_FIRSTCALL())
00568     {
00569         TxidSnapshot *arg = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
00570 
00571         fctx = SRF_FIRSTCALL_INIT();
00572 
00573         /* make a copy of user snapshot */
00574         snap = MemoryContextAlloc(fctx->multi_call_memory_ctx, VARSIZE(arg));
00575         memcpy(snap, arg, VARSIZE(arg));
00576 
00577         fctx->user_fctx = snap;
00578     }
00579 
00580     /* return values one-by-one */
00581     fctx = SRF_PERCALL_SETUP();
00582     snap = fctx->user_fctx;
00583     if (fctx->call_cntr < snap->nxip)
00584     {
00585         value = snap->xip[fctx->call_cntr];
00586         SRF_RETURN_NEXT(fctx, Int64GetDatum(value));
00587     }
00588     else
00589     {
00590         SRF_RETURN_DONE(fctx);
00591     }
00592 }