Header And Logo

PostgreSQL
| The world's most advanced open source database.

Functions | Variables

libpqwalreceiver.c File Reference

#include "postgres.h"
#include <unistd.h>
#include <sys/time.h>
#include "libpq-fe.h"
#include "access/xlog.h"
#include "miscadmin.h"
#include "replication/walreceiver.h"
#include "utils/builtins.h"
Include dependency graph for libpqwalreceiver.c:

Go to the source code of this file.

Functions

void _PG_init (void)
static void libpqrcv_connect (char *conninfo)
static void libpqrcv_identify_system (TimeLineID *primary_tli)
static void libpqrcv_readtimelinehistoryfile (TimeLineID tli, char **filename, char **content, int *len)
static bool libpqrcv_startstreaming (TimeLineID tli, XLogRecPtr startpoint)
static void libpqrcv_endstreaming (TimeLineID *next_tli)
static int libpqrcv_receive (int timeout, char **buffer)
static void libpqrcv_send (const char *buffer, int nbytes)
static void libpqrcv_disconnect (void)
static bool libpq_select (int timeout_ms)
static PGresultlibpqrcv_PQexec (const char *query)

Variables

 PG_MODULE_MAGIC
static PGconnstreamConn = NULL
static char * recvBuf = NULL

Function Documentation

void _PG_init ( void   ) 

Definition at line 66 of file libpqwalreceiver.c.

References elog, ERROR, NULL, walrcv_connect, walrcv_disconnect, walrcv_endstreaming, walrcv_identify_system, walrcv_readtimelinehistoryfile, walrcv_receive, walrcv_send, and walrcv_startstreaming.

{
    /* Tell walreceiver how to reach us */
    if (walrcv_connect != NULL || walrcv_identify_system != NULL ||
        walrcv_readtimelinehistoryfile != NULL ||
        walrcv_startstreaming != NULL || walrcv_endstreaming != NULL ||
        walrcv_receive != NULL || walrcv_send != NULL ||
        walrcv_disconnect != NULL)
        elog(ERROR, "libpqwalreceiver already loaded");
    walrcv_connect = libpqrcv_connect;
    walrcv_identify_system = libpqrcv_identify_system;
    walrcv_readtimelinehistoryfile = libpqrcv_readtimelinehistoryfile;
    walrcv_startstreaming = libpqrcv_startstreaming;
    walrcv_endstreaming = libpqrcv_endstreaming;
    walrcv_receive = libpqrcv_receive;
    walrcv_send = libpqrcv_send;
    walrcv_disconnect = libpqrcv_disconnect;
}

static bool libpq_select ( int  timeout_ms  )  [static]

Definition at line 304 of file libpqwalreceiver.c.

References Assert, EINTR, ereport, errcode_for_socket_access(), errmsg(), ERROR, NULL, PQsocket(), and select.

Referenced by libpqrcv_PQexec(), and libpqrcv_receive().

{
    int         ret;

    Assert(streamConn != NULL);
    if (PQsocket(streamConn) < 0)
        ereport(ERROR,
                (errcode_for_socket_access(),
                 errmsg("socket not open")));

    /* We use poll(2) if available, otherwise select(2) */
    {
#ifdef HAVE_POLL
        struct pollfd input_fd;

        input_fd.fd = PQsocket(streamConn);
        input_fd.events = POLLIN | POLLERR;
        input_fd.revents = 0;

        ret = poll(&input_fd, 1, timeout_ms);
#else                           /* !HAVE_POLL */

        fd_set      input_mask;
        struct timeval timeout;
        struct timeval *ptr_timeout;

        FD_ZERO(&input_mask);
        FD_SET(PQsocket(streamConn), &input_mask);

        if (timeout_ms < 0)
            ptr_timeout = NULL;
        else
        {
            timeout.tv_sec = timeout_ms / 1000;
            timeout.tv_usec = (timeout_ms % 1000) * 1000;
            ptr_timeout = &timeout;
        }

        ret = select(PQsocket(streamConn) + 1, &input_mask,
                     NULL, NULL, ptr_timeout);
#endif   /* HAVE_POLL */
    }

    if (ret == 0 || (ret < 0 && errno == EINTR))
        return false;
    if (ret < 0)
        ereport(ERROR,
                (errcode_for_socket_access(),
                 errmsg("select() failed: %m")));
    return true;
}

static void libpqrcv_connect ( char *  conninfo  )  [static]

Definition at line 89 of file libpqwalreceiver.c.

References CONNECTION_OK, ereport, errmsg(), ERROR, MAXCONNINFO, PQconnectdb(), PQerrorMessage(), PQstatus(), and snprintf().

{
    char        conninfo_repl[MAXCONNINFO + 75];

    /*
     * Connect using deliberately undocumented parameter: replication. The
     * database name is ignored by the server in replication mode, but specify
     * "replication" for .pgpass lookup.
     */
    snprintf(conninfo_repl, sizeof(conninfo_repl),
             "%s dbname=replication replication=true fallback_application_name=walreceiver",
             conninfo);

    streamConn = PQconnectdb(conninfo_repl);
    if (PQstatus(streamConn) != CONNECTION_OK)
        ereport(ERROR,
                (errmsg("could not connect to the primary server: %s",
                        PQerrorMessage(streamConn))));
}

static void libpqrcv_disconnect ( void   )  [static]

Definition at line 437 of file libpqwalreceiver.c.

References PQfinish().

static void libpqrcv_endstreaming ( TimeLineID next_tli  )  [static]

Definition at line 206 of file libpqwalreceiver.c.

References ereport, errmsg(), ERROR, NULL, pg_atoi(), PGRES_COMMAND_OK, PGRES_TUPLES_OK, PQclear(), PQerrorMessage(), PQflush(), PQgetResult(), PQgetvalue(), PQnfields(), PQntuples(), PQputCopyEnd(), and PQresultStatus().

{
    PGresult   *res;

    if (PQputCopyEnd(streamConn, NULL) <= 0 || PQflush(streamConn))
        ereport(ERROR,
                (errmsg("could not send end-of-streaming message to primary: %s",
                        PQerrorMessage(streamConn))));

    /*
     * After COPY is finished, we should receive a result set indicating the
     * next timeline's ID, or just CommandComplete if the server was shut down.
     *
     * If we had not yet received CopyDone from the backend, PGRES_COPY_IN
     * would also be possible. However, at the moment this function is only
     * called after receiving CopyDone from the backend - the walreceiver
     * never terminates replication on its own initiative.
     */
    res = PQgetResult(streamConn);
    if (PQresultStatus(res) == PGRES_TUPLES_OK)
    {
        /* Read the next timeline's ID */
        if (PQnfields(res) != 1 || PQntuples(res) != 1)
            ereport(ERROR,
                    (errmsg("unexpected result set after end-of-streaming")));
        *next_tli = pg_atoi(PQgetvalue(res, 0, 0), sizeof(uint32), 0);
        PQclear(res);

        /* the result set should be followed by CommandComplete */
        res = PQgetResult(streamConn);
    }
    else
        *next_tli = 0;

    if (PQresultStatus(res) != PGRES_COMMAND_OK)
        ereport(ERROR,
                (errmsg("error reading result of streaming command: %s",
                        PQerrorMessage(streamConn))));

    /* Verify that there are no more results */
    res = PQgetResult(streamConn);
    if (res != NULL)
        ereport(ERROR,
                (errmsg("unexpected result after CommandComplete: %s",
                        PQerrorMessage(streamConn))));
}

static void libpqrcv_identify_system ( TimeLineID primary_tli  )  [static]

Definition at line 114 of file libpqwalreceiver.c.

References ereport, errdetail(), errmsg(), ERROR, GetSystemIdentifier(), libpqrcv_PQexec(), pg_atoi(), PGRES_TUPLES_OK, PQclear(), PQerrorMessage(), PQgetvalue(), PQnfields(), PQntuples(), PQresultStatus(), and snprintf().

{
    PGresult   *res;
    char       *primary_sysid;
    char        standby_sysid[32];

    /*
     * Get the system identifier and timeline ID as a DataRow message from the
     * primary server.
     */
    res = libpqrcv_PQexec("IDENTIFY_SYSTEM");
    if (PQresultStatus(res) != PGRES_TUPLES_OK)
    {
        PQclear(res);
        ereport(ERROR,
                (errmsg("could not receive database system identifier and timeline ID from "
                        "the primary server: %s",
                        PQerrorMessage(streamConn))));
    }
    if (PQnfields(res) != 3 || PQntuples(res) != 1)
    {
        int         ntuples = PQntuples(res);
        int         nfields = PQnfields(res);

        PQclear(res);
        ereport(ERROR,
                (errmsg("invalid response from primary server"),
                 errdetail("Expected 1 tuple with 3 fields, got %d tuples with %d fields.",
                           ntuples, nfields)));
    }
    primary_sysid = PQgetvalue(res, 0, 0);
    *primary_tli = pg_atoi(PQgetvalue(res, 0, 1), 4, 0);

    /*
     * Confirm that the system identifier of the primary is the same as ours.
     */
    snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
             GetSystemIdentifier());
    if (strcmp(primary_sysid, standby_sysid) != 0)
    {
        PQclear(res);
        ereport(ERROR,
                (errmsg("database system identifier differs between the primary and standby"),
                 errdetail("The primary's identifier is %s, the standby's identifier is %s.",
                           primary_sysid, standby_sysid)));
    }
    PQclear(res);
}

static PGresult * libpqrcv_PQexec ( const char *  query  )  [static]

Definition at line 373 of file libpqwalreceiver.c.

References CONNECTION_BAD, libpq_select(), NULL, PGRES_COPY_BOTH, PGRES_COPY_IN, PGRES_COPY_OUT, PQclear(), PQconsumeInput(), PQgetResult(), PQisBusy(), PQresultStatus(), PQsendQuery(), and PQstatus().

Referenced by libpqrcv_identify_system(), libpqrcv_readtimelinehistoryfile(), and libpqrcv_startstreaming().

{
    PGresult   *result = NULL;
    PGresult   *lastResult = NULL;

    /*
     * PQexec() silently discards any prior query results on the connection.
     * This is not required for walreceiver since it's expected that walsender
     * won't generate any such junk results.
     */

    /*
     * Submit a query. Since we don't use non-blocking mode, this also can
     * block. But its risk is relatively small, so we ignore that for now.
     */
    if (!PQsendQuery(streamConn, query))
        return NULL;

    for (;;)
    {
        /*
         * Receive data until PQgetResult is ready to get the result without
         * blocking.
         */
        while (PQisBusy(streamConn))
        {
            /*
             * We don't need to break down the sleep into smaller increments,
             * and check for interrupts after each nap, since we can just
             * elog(FATAL) within SIGTERM signal handler if the signal arrives
             * in the middle of establishment of replication connection.
             */
            if (!libpq_select(-1))
                continue;       /* interrupted */
            if (PQconsumeInput(streamConn) == 0)
                return NULL;    /* trouble */
        }

        /*
         * Emulate the PQexec()'s behavior of returning the last result when
         * there are many. Since walsender will never generate multiple
         * results, we skip the concatenation of error messages.
         */
        result = PQgetResult(streamConn);
        if (result == NULL)
            break;              /* query is complete */

        PQclear(lastResult);
        lastResult = result;

        if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
            PQresultStatus(lastResult) == PGRES_COPY_OUT ||
            PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
            PQstatus(streamConn) == CONNECTION_BAD)
            break;
    }

    return lastResult;
}

static void libpqrcv_readtimelinehistoryfile ( TimeLineID  tli,
char **  filename,
char **  content,
int *  len 
) [static]

Definition at line 257 of file libpqwalreceiver.c.

References ereport, errdetail(), errmsg(), ERROR, libpqrcv_PQexec(), palloc(), PGRES_TUPLES_OK, PQclear(), PQerrorMessage(), PQgetlength(), PQgetvalue(), PQnfields(), PQntuples(), PQresultStatus(), pstrdup(), and snprintf().

{
    PGresult   *res;
    char        cmd[64];

    /*
     * Request the primary to send over the history file for given timeline.
     */
    snprintf(cmd, sizeof(cmd), "TIMELINE_HISTORY %u", tli);
    res = libpqrcv_PQexec(cmd);
    if (PQresultStatus(res) != PGRES_TUPLES_OK)
    {
        PQclear(res);
        ereport(ERROR,
                (errmsg("could not receive timeline history file from "
                        "the primary server: %s",
                        PQerrorMessage(streamConn))));
    }
    if (PQnfields(res) != 2 || PQntuples(res) != 1)
    {
        int         ntuples = PQntuples(res);
        int         nfields = PQnfields(res);

        PQclear(res);
        ereport(ERROR,
                (errmsg("invalid response from primary server"),
                 errdetail("Expected 1 tuple with 2 fields, got %d tuples with %d fields.",
                           ntuples, nfields)));
    }
    *filename = pstrdup(PQgetvalue(res, 0, 0));

    *len = PQgetlength(res, 0, 1);
    *content = palloc(*len);
    memcpy(*content, PQgetvalue(res, 0, 1), *len);
    PQclear(res);
}

static int libpqrcv_receive ( int  timeout,
char **  buffer 
) [static]

Definition at line 461 of file libpqwalreceiver.c.

References ereport, errmsg(), ERROR, libpq_select(), NULL, PGRES_COMMAND_OK, PGRES_COPY_IN, PQclear(), PQconsumeInput(), PQerrorMessage(), PQfreemem(), PQgetCopyData(), PQgetResult(), PQresultStatus(), and recvBuf.

{
    int         rawlen;

    if (recvBuf != NULL)
        PQfreemem(recvBuf);
    recvBuf = NULL;

    /* Try to receive a CopyData message */
    rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
    if (rawlen == 0)
    {
        /*
         * No data available yet. If the caller requested to block, wait for
         * more data to arrive.
         */
        if (timeout > 0)
        {
            if (!libpq_select(timeout))
                return 0;
        }

        if (PQconsumeInput(streamConn) == 0)
            ereport(ERROR,
                    (errmsg("could not receive data from WAL stream: %s",
                            PQerrorMessage(streamConn))));

        /* Now that we've consumed some input, try again */
        rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
        if (rawlen == 0)
            return 0;
    }
    if (rawlen == -1)           /* end-of-streaming or error */
    {
        PGresult   *res;

        res = PQgetResult(streamConn);
        if (PQresultStatus(res) == PGRES_COMMAND_OK ||
            PQresultStatus(res) == PGRES_COPY_IN)
        {
            PQclear(res);
            return -1;
        }
        else
        {
            PQclear(res);
            ereport(ERROR,
                    (errmsg("could not receive data from WAL stream: %s",
                            PQerrorMessage(streamConn))));
        }
    }
    if (rawlen < -1)
        ereport(ERROR,
                (errmsg("could not receive data from WAL stream: %s",
                        PQerrorMessage(streamConn))));

    /* Return received messages to caller */
    *buffer = recvBuf;
    return rawlen;
}

static void libpqrcv_send ( const char *  buffer,
int  nbytes 
) [static]

Definition at line 528 of file libpqwalreceiver.c.

References ereport, errmsg(), ERROR, PQerrorMessage(), PQflush(), and PQputCopyData().

{
    if (PQputCopyData(streamConn, buffer, nbytes) <= 0 ||
        PQflush(streamConn))
        ereport(ERROR,
                (errmsg("could not send data to WAL stream: %s",
                        PQerrorMessage(streamConn))));
}

static bool libpqrcv_startstreaming ( TimeLineID  tli,
XLogRecPtr  startpoint 
) [static]

Definition at line 174 of file libpqwalreceiver.c.

References ereport, errmsg(), ERROR, libpqrcv_PQexec(), PGRES_COMMAND_OK, PGRES_COPY_BOTH, PQclear(), PQerrorMessage(), PQresultStatus(), and snprintf().

{
    char        cmd[64];
    PGresult   *res;

    /* Start streaming from the point requested by startup process */
    snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X TIMELINE %u",
             (uint32) (startpoint >> 32), (uint32) startpoint,
             tli);
    res = libpqrcv_PQexec(cmd);

    if (PQresultStatus(res) == PGRES_COMMAND_OK)
    {
        PQclear(res);
        return false;
    }
    else if (PQresultStatus(res) != PGRES_COPY_BOTH)
    {
        PQclear(res);
        ereport(ERROR,
                (errmsg("could not start WAL streaming: %s",
                        PQerrorMessage(streamConn))));
    }
    PQclear(res);
    return true;
}


Variable Documentation

Definition at line 38 of file libpqwalreceiver.c.

char* recvBuf = NULL [static]

Definition at line 46 of file libpqwalreceiver.c.

Referenced by libpqrcv_receive().

PGconn* streamConn = NULL [static]

Definition at line 43 of file libpqwalreceiver.c.