00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #include "postgres.h"
00023
00024 #include "access/transam.h"
00025 #include "access/xact.h"
00026 #include "funcapi.h"
00027 #include "miscadmin.h"
00028 #include "libpq/pqformat.h"
00029 #include "utils/builtins.h"
00030 #include "utils/snapmgr.h"
00031
00032
00033
00034 #define MAX_TXID UINT64CONST(0x7FFFFFFFFFFFFFFF)
00035
00036
00037 typedef uint64 txid;
00038
00039
00040 #define TXID_FMT UINT64_FORMAT
00041
00042
00043
00044
00045
00046 #define USE_BSEARCH_IF_NXIP_GREATER 30
00047
00048
00049
00050
00051
00052 typedef struct
00053 {
00054
00055
00056
00057
00058
00059 int32 __varsz;
00060
00061 uint32 nxip;
00062 txid xmin;
00063 txid xmax;
00064 txid xip[1];
00065 } TxidSnapshot;
00066
00067 #define TXID_SNAPSHOT_SIZE(nxip) \
00068 (offsetof(TxidSnapshot, xip) + sizeof(txid) * (nxip))
00069
00070
00071
00072
00073 typedef struct
00074 {
00075 TransactionId last_xid;
00076 uint32 epoch;
00077 } TxidEpoch;
00078
00079
00080
00081
00082
00083 static void
00084 load_xid_epoch(TxidEpoch *state)
00085 {
00086 GetNextXidAndEpoch(&state->last_xid, &state->epoch);
00087 }
00088
00089
00090
00091
00092 static txid
00093 convert_xid(TransactionId xid, const TxidEpoch *state)
00094 {
00095 uint64 epoch;
00096
00097
00098 if (!TransactionIdIsNormal(xid))
00099 return (txid) xid;
00100
00101
00102 epoch = (uint64) state->epoch;
00103 if (xid > state->last_xid &&
00104 TransactionIdPrecedes(xid, state->last_xid))
00105 epoch--;
00106 else if (xid < state->last_xid &&
00107 TransactionIdFollows(xid, state->last_xid))
00108 epoch++;
00109
00110 return (epoch << 32) | xid;
00111 }
00112
00113
00114
00115
00116 static int
00117 cmp_txid(const void *aa, const void *bb)
00118 {
00119 txid a = *(const txid *) aa;
00120 txid b = *(const txid *) bb;
00121
00122 if (a < b)
00123 return -1;
00124 if (a > b)
00125 return 1;
00126 return 0;
00127 }
00128
00129
00130
00131
00132
00133
00134
00135 static void
00136 sort_snapshot(TxidSnapshot *snap)
00137 {
00138 if (snap->nxip > 1)
00139 qsort(snap->xip, snap->nxip, sizeof(txid), cmp_txid);
00140 }
00141
00142
00143
00144
00145 static bool
00146 is_visible_txid(txid value, const TxidSnapshot *snap)
00147 {
00148 if (value < snap->xmin)
00149 return true;
00150 else if (value >= snap->xmax)
00151 return false;
00152 #ifdef USE_BSEARCH_IF_NXIP_GREATER
00153 else if (snap->nxip > USE_BSEARCH_IF_NXIP_GREATER)
00154 {
00155 void *res;
00156
00157 res = bsearch(&value, snap->xip, snap->nxip, sizeof(txid), cmp_txid);
00158
00159 return (res) ? false : true;
00160 }
00161 #endif
00162 else
00163 {
00164 uint32 i;
00165
00166 for (i = 0; i < snap->nxip; i++)
00167 {
00168 if (value == snap->xip[i])
00169 return false;
00170 }
00171 return true;
00172 }
00173 }
00174
00175
00176
00177
00178
00179 static StringInfo
00180 buf_init(txid xmin, txid xmax)
00181 {
00182 TxidSnapshot snap;
00183 StringInfo buf;
00184
00185 snap.xmin = xmin;
00186 snap.xmax = xmax;
00187 snap.nxip = 0;
00188
00189 buf = makeStringInfo();
00190 appendBinaryStringInfo(buf, (char *) &snap, TXID_SNAPSHOT_SIZE(0));
00191 return buf;
00192 }
00193
00194 static void
00195 buf_add_txid(StringInfo buf, txid xid)
00196 {
00197 TxidSnapshot *snap = (TxidSnapshot *) buf->data;
00198
00199
00200 snap->nxip++;
00201
00202 appendBinaryStringInfo(buf, (char *) &xid, sizeof(xid));
00203 }
00204
00205 static TxidSnapshot *
00206 buf_finalize(StringInfo buf)
00207 {
00208 TxidSnapshot *snap = (TxidSnapshot *) buf->data;
00209
00210 SET_VARSIZE(snap, buf->len);
00211
00212
00213 buf->data = NULL;
00214 pfree(buf);
00215
00216 return snap;
00217 }
00218
00219
00220
00221
00222
00223
00224 static txid
00225 str2txid(const char *s, const char **endp)
00226 {
00227 txid val = 0;
00228 txid cutoff = MAX_TXID / 10;
00229 txid cutlim = MAX_TXID % 10;
00230
00231 for (; *s; s++)
00232 {
00233 unsigned d;
00234
00235 if (*s < '0' || *s > '9')
00236 break;
00237 d = *s - '0';
00238
00239
00240
00241
00242 if (val > cutoff || (val == cutoff && d > cutlim))
00243 {
00244 val = 0;
00245 break;
00246 }
00247
00248 val = val * 10 + d;
00249 }
00250 if (endp)
00251 *endp = s;
00252 return val;
00253 }
00254
00255
00256
00257
00258 static TxidSnapshot *
00259 parse_snapshot(const char *str)
00260 {
00261 txid xmin;
00262 txid xmax;
00263 txid last_val = 0,
00264 val;
00265 const char *str_start = str;
00266 const char *endp;
00267 StringInfo buf;
00268
00269 xmin = str2txid(str, &endp);
00270 if (*endp != ':')
00271 goto bad_format;
00272 str = endp + 1;
00273
00274 xmax = str2txid(str, &endp);
00275 if (*endp != ':')
00276 goto bad_format;
00277 str = endp + 1;
00278
00279
00280 if (xmin == 0 || xmax == 0 || xmin > xmax)
00281 goto bad_format;
00282
00283
00284 buf = buf_init(xmin, xmax);
00285
00286
00287 while (*str != '\0')
00288 {
00289
00290 val = str2txid(str, &endp);
00291 str = endp;
00292
00293
00294 if (val < xmin || val >= xmax || val <= last_val)
00295 goto bad_format;
00296
00297 buf_add_txid(buf, val);
00298 last_val = val;
00299
00300 if (*str == ',')
00301 str++;
00302 else if (*str != '\0')
00303 goto bad_format;
00304 }
00305
00306 return buf_finalize(buf);
00307
00308 bad_format:
00309 elog(ERROR, "invalid input for txid_snapshot: \"%s\"", str_start);
00310 return NULL;
00311 }
00312
00313
00314
00315
00316
00317
00318
00319
00320
00321
00322
00323
00324
00325
00326
00327 Datum
00328 txid_current(PG_FUNCTION_ARGS)
00329 {
00330 txid val;
00331 TxidEpoch state;
00332
00333
00334
00335
00336
00337
00338
00339 PreventCommandDuringRecovery("txid_current()");
00340
00341 load_xid_epoch(&state);
00342
00343 val = convert_xid(GetTopTransactionId(), &state);
00344
00345 PG_RETURN_INT64(val);
00346 }
00347
00348
00349
00350
00351
00352
00353
00354
00355 Datum
00356 txid_current_snapshot(PG_FUNCTION_ARGS)
00357 {
00358 TxidSnapshot *snap;
00359 uint32 nxip,
00360 i,
00361 size;
00362 TxidEpoch state;
00363 Snapshot cur;
00364
00365 cur = GetActiveSnapshot();
00366 if (cur == NULL)
00367 elog(ERROR, "no active snapshot set");
00368
00369 load_xid_epoch(&state);
00370
00371
00372 nxip = cur->xcnt;
00373 size = TXID_SNAPSHOT_SIZE(nxip);
00374 snap = palloc(size);
00375 SET_VARSIZE(snap, size);
00376
00377
00378 snap->xmin = convert_xid(cur->xmin, &state);
00379 snap->xmax = convert_xid(cur->xmax, &state);
00380 snap->nxip = nxip;
00381 for (i = 0; i < nxip; i++)
00382 snap->xip[i] = convert_xid(cur->xip[i], &state);
00383
00384
00385 sort_snapshot(snap);
00386
00387 PG_RETURN_POINTER(snap);
00388 }
00389
00390
00391
00392
00393
00394
00395 Datum
00396 txid_snapshot_in(PG_FUNCTION_ARGS)
00397 {
00398 char *str = PG_GETARG_CSTRING(0);
00399 TxidSnapshot *snap;
00400
00401 snap = parse_snapshot(str);
00402
00403 PG_RETURN_POINTER(snap);
00404 }
00405
00406
00407
00408
00409
00410
00411 Datum
00412 txid_snapshot_out(PG_FUNCTION_ARGS)
00413 {
00414 TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
00415 StringInfoData str;
00416 uint32 i;
00417
00418 initStringInfo(&str);
00419
00420 appendStringInfo(&str, TXID_FMT ":", snap->xmin);
00421 appendStringInfo(&str, TXID_FMT ":", snap->xmax);
00422
00423 for (i = 0; i < snap->nxip; i++)
00424 {
00425 if (i > 0)
00426 appendStringInfoChar(&str, ',');
00427 appendStringInfo(&str, TXID_FMT, snap->xip[i]);
00428 }
00429
00430 PG_RETURN_CSTRING(str.data);
00431 }
00432
00433
00434
00435
00436
00437
00438
00439
00440 Datum
00441 txid_snapshot_recv(PG_FUNCTION_ARGS)
00442 {
00443 StringInfo buf = (StringInfo) PG_GETARG_POINTER(0);
00444 TxidSnapshot *snap;
00445 txid last = 0;
00446 int nxip;
00447 int i;
00448 int avail;
00449 int expect;
00450 txid xmin,
00451 xmax;
00452
00453
00454
00455
00456
00457
00458 nxip = pq_getmsgint(buf, 4);
00459 avail = buf->len - buf->cursor;
00460 expect = 8 + 8 + nxip * 8;
00461 if (nxip < 0 || nxip > avail || expect > avail)
00462 goto bad_format;
00463
00464 xmin = pq_getmsgint64(buf);
00465 xmax = pq_getmsgint64(buf);
00466 if (xmin == 0 || xmax == 0 || xmin > xmax || xmax > MAX_TXID)
00467 goto bad_format;
00468
00469 snap = palloc(TXID_SNAPSHOT_SIZE(nxip));
00470 snap->xmin = xmin;
00471 snap->xmax = xmax;
00472 snap->nxip = nxip;
00473 SET_VARSIZE(snap, TXID_SNAPSHOT_SIZE(nxip));
00474
00475 for (i = 0; i < nxip; i++)
00476 {
00477 txid cur = pq_getmsgint64(buf);
00478
00479 if (cur <= last || cur < xmin || cur >= xmax)
00480 goto bad_format;
00481 snap->xip[i] = cur;
00482 last = cur;
00483 }
00484 PG_RETURN_POINTER(snap);
00485
00486 bad_format:
00487 elog(ERROR, "invalid snapshot data");
00488 return (Datum) NULL;
00489 }
00490
00491
00492
00493
00494
00495
00496
00497
00498 Datum
00499 txid_snapshot_send(PG_FUNCTION_ARGS)
00500 {
00501 TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
00502 StringInfoData buf;
00503 uint32 i;
00504
00505 pq_begintypsend(&buf);
00506 pq_sendint(&buf, snap->nxip, 4);
00507 pq_sendint64(&buf, snap->xmin);
00508 pq_sendint64(&buf, snap->xmax);
00509 for (i = 0; i < snap->nxip; i++)
00510 pq_sendint64(&buf, snap->xip[i]);
00511 PG_RETURN_BYTEA_P(pq_endtypsend(&buf));
00512 }
00513
00514
00515
00516
00517
00518
00519 Datum
00520 txid_visible_in_snapshot(PG_FUNCTION_ARGS)
00521 {
00522 txid value = PG_GETARG_INT64(0);
00523 TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(1);
00524
00525 PG_RETURN_BOOL(is_visible_txid(value, snap));
00526 }
00527
00528
00529
00530
00531
00532
00533 Datum
00534 txid_snapshot_xmin(PG_FUNCTION_ARGS)
00535 {
00536 TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
00537
00538 PG_RETURN_INT64(snap->xmin);
00539 }
00540
00541
00542
00543
00544
00545
00546 Datum
00547 txid_snapshot_xmax(PG_FUNCTION_ARGS)
00548 {
00549 TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
00550
00551 PG_RETURN_INT64(snap->xmax);
00552 }
00553
00554
00555
00556
00557
00558
00559 Datum
00560 txid_snapshot_xip(PG_FUNCTION_ARGS)
00561 {
00562 FuncCallContext *fctx;
00563 TxidSnapshot *snap;
00564 txid value;
00565
00566
00567 if (SRF_IS_FIRSTCALL())
00568 {
00569 TxidSnapshot *arg = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
00570
00571 fctx = SRF_FIRSTCALL_INIT();
00572
00573
00574 snap = MemoryContextAlloc(fctx->multi_call_memory_ctx, VARSIZE(arg));
00575 memcpy(snap, arg, VARSIZE(arg));
00576
00577 fctx->user_fctx = snap;
00578 }
00579
00580
00581 fctx = SRF_PERCALL_SETUP();
00582 snap = fctx->user_fctx;
00583 if (fctx->call_cntr < snap->nxip)
00584 {
00585 value = snap->xip[fctx->call_cntr];
00586 SRF_RETURN_NEXT(fctx, Int64GetDatum(value));
00587 }
00588 else
00589 {
00590 SRF_RETURN_DONE(fctx);
00591 }
00592 }