What is (logical) Changeset Extraction

  1. some sessions perform DML
  2. another session gets all those changes in an ordered fashion

What can we do with it

High Level Architecture

[Figure: high-level architecture diagram]

Management of changeset extraction

Demonstration of the SQL Interface #1

postgres=# SELECT * FROM init_logical_replication('my-slot-name', 'test_decoding');
   slotname   | xlog_position
--------------+---------------
 my-slot-name | 0/194F158
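
The xlog_position above is a WAL location printed as two hexadecimal numbers: the high and the low 32 bits of a 64-bit position. A minimal sketch of how a client could compare such positions; the helper names parse_lsn and format_lsn are made up for this illustration and are not part of PostgreSQL:

```python
def parse_lsn(text: str) -> int:
    """Convert an 'X/Y' WAL position into a single 64-bit integer."""
    hi, lo = text.split("/")
    return (int(hi, 16) << 32) | int(lo, 16)


def format_lsn(value: int) -> str:
    """Inverse of parse_lsn: render a 64-bit position as 'X/Y' in upper-case hex."""
    return f"{value >> 32:X}/{value & 0xFFFFFFFF:X}"


# Positions taken from the demonstration slides.
slot_start = parse_lsn("0/194F158")
first_change = parse_lsn("0/19B9758")

# Later WAL positions compare greater than earlier ones.
print(first_change > slot_start)
print(format_lsn(first_change))
```

Comparing the integers rather than the strings matters once the low half has a different number of hex digits.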

Demonstration of the SQL Interface #2

postgres=# CREATE TABLE foo(id serial primary key, data int);
postgres=# INSERT INTO foo(data) VALUES (1),(5);

Demonstration of the SQL Interface #3

postgres=# SELECT * FROM start_logical_replication('my-slot-name', 'now');
 location  | xid |                     data
-----------+-----+----------------------------------------------
 0/19B9758 | 832 | BEGIN 832
 0/19B9758 | 832 | COMMIT 832
 0/19C0E90 | 833 | BEGIN 833
 0/19C0E90 | 833 | table "foo": INSERT: id[int4]:1 data[int4]:1
 0/19C0F58 | 833 | table "foo": INSERT: id[int4]:2 data[int4]:5
 0/19C0E90 | 833 | COMMIT 833
(6 rows)
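
The data column is plain text produced by the test_decoding plugin. A rough sketch of turning the INSERT rows shown above into a structure; the regular expressions are inferred only from this sample output, so quoted or multi-word values and the UPDATE/DELETE layouts are deliberately not handled:

```python
import re

# Patterns inferred from the sample rows above (an assumption, not a spec).
CHANGE_RE = re.compile(r'^table "(?P<table>[^"]+)": (?P<action>INSERT): (?P<cols>.*)$')
COLUMN_RE = re.compile(r'(?P<name>\w+)\[(?P<type>\w+)\]:(?P<value>\S+)')


def parse_change(line: str):
    """Return a dict for an INSERT row, or None for anything else (BEGIN, COMMIT, ...)."""
    match = CHANGE_RE.match(line.strip())
    if match is None:
        return None
    columns = {
        col.group("name"): (col.group("type"), col.group("value"))
        for col in COLUMN_RE.finditer(match.group("cols"))
    }
    return {
        "table": match.group("table"),
        "action": match.group("action"),
        "columns": columns,
    }


print(parse_change('table "foo": INSERT: id[int4]:2 data[int4]:5'))
print(parse_change("BEGIN 833"))
```

Values are kept as strings next to their type name; converting them is left to the consumer.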

Demonstration of the SQL Interface #4

postgres=# INSERT INTO foo(data) VALUES (6);
postgres=# DROP TABLE foo;

postgres=# SELECT * FROM start_logical_replication('my-slot-name', 'now', 'hide-xids', '1');
 location  | xid |                     data
-----------+-----+----------------------------------------------
 0/19C1210 | 834 | BEGIN
 0/19C1210 | 834 | table "foo": INSERT: id[int4]:3 data[int4]:6
 0/19C1210 | 834 | COMMIT
...

Demonstration of the SQL Interface #5

postgres=# SELECT stop_logical_replication('my-slot-name');

Demonstration of the streaming interface

$ pg_receivellog                                \
 -d "host=/tmp port=5440 dbname=postgres"       \
 --slot my-slot-name                            \
 -f /dev/stdout                                 \
 --init --start
WARNING:  Initiating logical rep from 0/19D3498
WARNING:  Starting logical replication
BEGIN 839
table "foo": UPDATE: old-pkey: id[int4]:1 new-tuple: id[int4]:-1 data[int4]:-1
COMMIT 839
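
A consumer of this stream usually wants whole transactions rather than single lines. A small sketch, assuming the text format shown above (BEGIN, change lines, COMMIT); group_transactions is a hypothetical helper, not something shipped with the receiver program:

```python
def group_transactions(lines):
    """Yield (xid, [change lines]) for every BEGIN ... COMMIT block in the stream."""
    xid, changes = None, None
    for raw in lines:
        line = raw.strip()
        if line.startswith("BEGIN"):
            parts = line.split()
            # With the hide-xids option the xid is absent, so keep None then.
            xid = int(parts[1]) if len(parts) > 1 else None
            changes = []
        elif line.startswith("COMMIT"):
            if changes is not None:
                yield xid, changes
            changes = None
        elif changes is not None:
            # Only lines inside a transaction are collected; others are skipped.
            changes.append(line)


stream = [
    "BEGIN 839",
    'table "foo": UPDATE: old-pkey: id[int4]:1 new-tuple: id[int4]:-1 data[int4]:-1',
    "COMMIT 839",
]
for xid, changes in group_transactions(stream):
    print(xid, len(changes))
```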

What does an Output Plugin look like: #1

What does an Output Plugin look like: #2

void
pg_decode_commit_txn(LogicalDecodingContext * ctx, ReorderBufferTXN * txn,
                     XLogRecPtr commit_lsn)
{
        TestDecodingData *data = ctx->output_plugin_private;

        AssertVariableIsOfType(&pg_decode_commit_txn, LogicalDecodeCommitCB);

        ctx->prepare_write(ctx, txn->lsn, txn->xid);
        if (data->include_xids)
                appendStringInfo(ctx->out, "COMMIT %u", txn->xid);
        else
                appendStringInfoString(ctx->out, "COMMIT");
        ctx->write(ctx, txn->lsn, txn->xid);
}

What does an Output Plugin look like: #3

void
pg_decode_change(LogicalDecodingContext * ctx, ReorderBufferTXN * txn,
                 Relation relation, ReorderBufferChange * change)
{
        ctx->prepare_write(ctx, change->lsn, txn->xid);
        ...
        switch (change->action)
        {
                case REORDER_BUFFER_CHANGE_INSERT:
                    ...
                case REORDER_BUFFER_CHANGE_UPDATE:
                    ...
                case REORDER_BUFFER_CHANGE_DELETE:
                    ...
        }
        ...
        ctx->write(ctx, change->lsn, txn->xid);
}

What does an Output Plugin look like: #4

case REORDER_BUFFER_CHANGE_INSERT:
        appendStringInfoString(ctx->out, " INSERT:");
        tuple_to_stringinfo(ctx->out, tupdesc, &change->newtuple->tuple);

Initial Clone

psql "host=/tmp port=5440 dbname=postgres replication=1"
postgres=# INIT_LOGICAL_REPLICATION my_slot test_decoding;
 replication_id | consistent_point | snapshot_name |    plugin
----------------+------------------+---------------+---------------
 my_slot        | 0/19E6908        | 00000348-1    | test_decoding
postgres=# BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ READ ONLY;
postgres=# SET TRANSACTION SNAPSHOT '00000348-1';
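
The exported snapshot shows the database exactly as of the consistent point, so a clone can copy the tables under that snapshot and then replay the changes streamed from the slot. A toy sketch of the "copy, then replay" idea with an in-memory table; the change dictionaries, the apply_change helper and the use of "id" as the primary key are assumptions made for illustration only:

```python
def apply_change(table: dict, change: dict) -> None:
    """Apply one decoded change to a table modelled as {primary key: row}."""
    action = change["action"]
    if action == "INSERT":
        row = change["row"]
        table[row["id"]] = row
    elif action == "UPDATE":
        # The old key may differ from the new one, so remove before re-adding.
        table.pop(change["old_key"], None)
        row = change["row"]
        table[row["id"]] = row
    elif action == "DELETE":
        table.pop(change["old_key"], None)
    else:
        raise ValueError(f"unknown action: {action}")


# Step 1: rows copied while using the exported snapshot.
clone = {1: {"id": 1, "data": 1}, 2: {"id": 2, "data": 5}}

# Step 2: changes streamed from the slot after the consistent point.
stream = [
    {"action": "INSERT", "row": {"id": 3, "data": 6}},
    {"action": "UPDATE", "old_key": 1, "row": {"id": -1, "data": -1}},
]
for change in stream:
    apply_change(clone, change)

print(sorted(clone))
```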

How does it work: WAL #1

lsn: 0/17A89B0
tx: 687
rmgr: Heap : insert
rel 1663/12032/11782; tid 41/35
DATA

How does it work: WAL #2

S1: BEGIN;
S2: BEGIN;
S1: SAVEPOINT s1s1;
S1: INSERT INTO foo(data) VALUES (1);
S2: INSERT INTO foo(data) VALUES (2);
S1: RELEASE SAVEPOINT s1s1;
S1: INSERT INTO foo(data) VALUES (3);
S2: COMMIT;
S1: COMMIT;

How does it work: WAL #3

rmgr: Heap        tx:        704, desc: insert: lsn: 0/018EBFD8; rel 1663/12051/16950; tid 0/5
rmgr: Heap        tx:        705, desc: insert: lsn: 0/018EC020; rel 1663/12051/16950; tid 0/6
rmgr: Heap        tx:        703, desc: insert: lsn: 0/018EC180; rel 1663/12051/16950; tid 0/7
rmgr: Transaction tx:        705, desc: lsn: 0/018EC330; commit
rmgr: Transaction tx:        703, desc: lsn: 0/018EC370; commit; subxacts: 704

How does it work: Reordering of Changes
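
The WAL interleaves records of concurrent transactions, while consumers receive complete transactions in commit order. A simplified model of that reordering, fed with the five records from the previous slide: changes are buffered per xid, subtransactions are folded into their parent at commit, and the result is sorted by LSN. The record dictionaries and the reorder function are inventions for this sketch; aborts, spilling to disk and everything else the real reorder buffer has to do are left out:

```python
from collections import defaultdict


def reorder(wal_records):
    """Buffer changes per xid and release them as whole transactions at commit."""
    pending = defaultdict(list)  # xid -> [(lsn, description)]
    output = []
    for rec in wal_records:
        if rec["kind"] == "change":
            pending[rec["xid"]].append((rec["lsn"], rec["desc"]))
        elif rec["kind"] == "commit":
            changes = pending.pop(rec["xid"], [])
            # Fold the changes of committed subtransactions into the parent.
            for sub in rec.get("subxacts", ()):
                changes.extend(pending.pop(sub, []))
            changes.sort()  # restore WAL (LSN) order inside the transaction
            output.append((rec["xid"], [desc for _, desc in changes]))
    return output


wal = [
    {"kind": "change", "xid": 704, "lsn": 0x018EBFD8, "desc": "insert tid 0/5"},
    {"kind": "change", "xid": 705, "lsn": 0x018EC020, "desc": "insert tid 0/6"},
    {"kind": "change", "xid": 703, "lsn": 0x018EC180, "desc": "insert tid 0/7"},
    {"kind": "commit", "xid": 705, "lsn": 0x018EC330},
    {"kind": "commit", "xid": 703, "lsn": 0x018EC370, "subxacts": [704]},
]
for xid, changes in reorder(wal):
    print(xid, changes)
```

Transaction 705 comes out first because it committed first, and 703 carries the insert made by its subtransaction 704.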

How does it work: Timetravel Snapshots #1

CREATE TABLE foo(id serial, data int);
INSERT INTO foo(data) VALUES(1);
ALTER TABLE foo ALTER COLUMN data TYPE text;
INSERT INTO foo(data) VALUES('1');
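
The two INSERTs above have to be decoded with different table definitions: the first with data as an integer, the second with data as text. Decoding therefore needs to see the catalog as it looked when each WAL record was written. A toy model of that lookup, assuming a list of table definitions keyed by the position of the DDL that created them; the CatalogHistory class and the positions 100 to 400 are invented for this sketch, and the real mechanism reads the system catalogs through historic snapshots rather than keeping such a list:

```python
import bisect


class CatalogHistory:
    """Remember table definitions by the WAL position at which they took effect."""

    def __init__(self):
        self._lsns = []         # positions, assumed to be recorded in increasing order
        self._definitions = []  # column name -> type name, one entry per position

    def record(self, lsn, columns):
        self._lsns.append(lsn)
        self._definitions.append(dict(columns))

    def as_of(self, lsn):
        """Return the definition that was valid at the given position."""
        index = bisect.bisect_right(self._lsns, lsn) - 1
        if index < 0:
            raise LookupError("table did not exist at this position")
        return self._definitions[index]


history = CatalogHistory()
history.record(100, {"id": "int4", "data": "int4"})  # CREATE TABLE
history.record(300, {"id": "int4", "data": "text"})  # ALTER TABLE ... TYPE text

print(history.as_of(200)["data"])  # definition used for the first INSERT
print(history.as_of(400)["data"])  # definition used for the second INSERT
```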

How does it work: Timetravel Snapshots #2

I have discovered a truly remarkable proof of this theorem which this margin is too small to contain.

— Pierre de Fermat

Future Work #1 Commit Timestamps

postgres=# SHOW track_commit_timestamp ;
 track_commit_timestamp
------------------------
 on

postgres=# SELECT txid_current();

postgres=# SELECT pg_get_transaction_committime('695'::xid);
 pg_get_transaction_committime
-------------------------------
 2013-05-23 06:58:43.10488+02

Future Work #2 Sequence AM & Distributed Sequences

CREATE EXTENSION bdr;
CREATE SEQUENCE test3 USING bdr WITH (bdr.batch_size = 1000);

Future Work #3 Multimaster w. Conflict Resolution

Info

Thanks to