Header And Logo

PostgreSQL
| The world's most advanced open source database.

libpqwalreceiver.c

Go to the documentation of this file.
00001 /*-------------------------------------------------------------------------
00002  *
00003  * libpqwalreceiver.c
00004  *
00005  * This file contains the libpq-specific parts of walreceiver. It's
00006  * loaded as a dynamic module to avoid linking the main server binary with
00007  * libpq.
00008  *
00009  * Portions Copyright (c) 2010-2013, PostgreSQL Global Development Group
00010  *
00011  *
00012  * IDENTIFICATION
00013  *    src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
00014  *
00015  *-------------------------------------------------------------------------
00016  */
00017 #include "postgres.h"
00018 
00019 #include <unistd.h>
00020 #include <sys/time.h>
00021 
00022 #include "libpq-fe.h"
00023 #include "access/xlog.h"
00024 #include "miscadmin.h"
00025 #include "replication/walreceiver.h"
00026 #include "utils/builtins.h"
00027 
00028 #ifdef HAVE_POLL_H
00029 #include <poll.h>
00030 #endif
00031 #ifdef HAVE_SYS_POLL_H
00032 #include <sys/poll.h>
00033 #endif
00034 #ifdef HAVE_SYS_SELECT_H
00035 #include <sys/select.h>
00036 #endif
00037 
00038 PG_MODULE_MAGIC;
00039 
00040 void        _PG_init(void);
00041 
00042 /* Current connection to the primary, if any */
00043 static PGconn *streamConn = NULL;
00044 
00045 /* Buffer for currently read records */
00046 static char *recvBuf = NULL;
00047 
00048 /* Prototypes for interface functions */
00049 static void libpqrcv_connect(char *conninfo);
00050 static void libpqrcv_identify_system(TimeLineID *primary_tli);
00051 static void libpqrcv_readtimelinehistoryfile(TimeLineID tli, char **filename, char **content, int *len);
00052 static bool libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint);
00053 static void libpqrcv_endstreaming(TimeLineID *next_tli);
00054 static int libpqrcv_receive(int timeout, char **buffer);
00055 static void libpqrcv_send(const char *buffer, int nbytes);
00056 static void libpqrcv_disconnect(void);
00057 
00058 /* Prototypes for private functions */
00059 static bool libpq_select(int timeout_ms);
00060 static PGresult *libpqrcv_PQexec(const char *query);
00061 
00062 /*
00063  * Module load callback
00064  */
00065 void
00066 _PG_init(void)
00067 {
00068     /* Tell walreceiver how to reach us */
00069     if (walrcv_connect != NULL || walrcv_identify_system != NULL ||
00070         walrcv_readtimelinehistoryfile != NULL ||
00071         walrcv_startstreaming != NULL || walrcv_endstreaming != NULL ||
00072         walrcv_receive != NULL || walrcv_send != NULL ||
00073         walrcv_disconnect != NULL)
00074         elog(ERROR, "libpqwalreceiver already loaded");
00075     walrcv_connect = libpqrcv_connect;
00076     walrcv_identify_system = libpqrcv_identify_system;
00077     walrcv_readtimelinehistoryfile = libpqrcv_readtimelinehistoryfile;
00078     walrcv_startstreaming = libpqrcv_startstreaming;
00079     walrcv_endstreaming = libpqrcv_endstreaming;
00080     walrcv_receive = libpqrcv_receive;
00081     walrcv_send = libpqrcv_send;
00082     walrcv_disconnect = libpqrcv_disconnect;
00083 }
00084 
00085 /*
00086  * Establish the connection to the primary server for XLOG streaming
00087  */
00088 static void
00089 libpqrcv_connect(char *conninfo)
00090 {
00091     char        conninfo_repl[MAXCONNINFO + 75];
00092 
00093     /*
00094      * Connect using deliberately undocumented parameter: replication. The
00095      * database name is ignored by the server in replication mode, but specify
00096      * "replication" for .pgpass lookup.
00097      */
00098     snprintf(conninfo_repl, sizeof(conninfo_repl),
00099              "%s dbname=replication replication=true fallback_application_name=walreceiver",
00100              conninfo);
00101 
00102     streamConn = PQconnectdb(conninfo_repl);
00103     if (PQstatus(streamConn) != CONNECTION_OK)
00104         ereport(ERROR,
00105                 (errmsg("could not connect to the primary server: %s",
00106                         PQerrorMessage(streamConn))));
00107 }
00108 
00109 /*
00110  * Check that primary's system identifier matches ours, and fetch the current
00111  * timeline ID of the primary.
00112  */
00113 static void
00114 libpqrcv_identify_system(TimeLineID *primary_tli)
00115 {
00116     PGresult   *res;
00117     char       *primary_sysid;
00118     char        standby_sysid[32];
00119 
00120     /*
00121      * Get the system identifier and timeline ID as a DataRow message from the
00122      * primary server.
00123      */
00124     res = libpqrcv_PQexec("IDENTIFY_SYSTEM");
00125     if (PQresultStatus(res) != PGRES_TUPLES_OK)
00126     {
00127         PQclear(res);
00128         ereport(ERROR,
00129                 (errmsg("could not receive database system identifier and timeline ID from "
00130                         "the primary server: %s",
00131                         PQerrorMessage(streamConn))));
00132     }
00133     if (PQnfields(res) != 3 || PQntuples(res) != 1)
00134     {
00135         int         ntuples = PQntuples(res);
00136         int         nfields = PQnfields(res);
00137 
00138         PQclear(res);
00139         ereport(ERROR,
00140                 (errmsg("invalid response from primary server"),
00141                  errdetail("Expected 1 tuple with 3 fields, got %d tuples with %d fields.",
00142                            ntuples, nfields)));
00143     }
00144     primary_sysid = PQgetvalue(res, 0, 0);
00145     *primary_tli = pg_atoi(PQgetvalue(res, 0, 1), 4, 0);
00146 
00147     /*
00148      * Confirm that the system identifier of the primary is the same as ours.
00149      */
00150     snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
00151              GetSystemIdentifier());
00152     if (strcmp(primary_sysid, standby_sysid) != 0)
00153     {
00154         PQclear(res);
00155         ereport(ERROR,
00156                 (errmsg("database system identifier differs between the primary and standby"),
00157                  errdetail("The primary's identifier is %s, the standby's identifier is %s.",
00158                            primary_sysid, standby_sysid)));
00159     }
00160     PQclear(res);
00161 }
00162 
00163 /*
00164  * Start streaming WAL data from given startpoint and timeline.
00165  *
00166  * Returns true if we switched successfully to copy-both mode. False
00167  * means the server received the command and executed it successfully, but
00168  * didn't switch to copy-mode.  That means that there was no WAL on the
00169  * requested timeline and starting point, because the server switched to
00170  * another timeline at or before the requested starting point. On failure,
00171  * throws an ERROR.
00172  */
00173 static bool
00174 libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint)
00175 {
00176     char        cmd[64];
00177     PGresult   *res;
00178 
00179     /* Start streaming from the point requested by startup process */
00180     snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X TIMELINE %u",
00181              (uint32) (startpoint >> 32), (uint32) startpoint,
00182              tli);
00183     res = libpqrcv_PQexec(cmd);
00184 
00185     if (PQresultStatus(res) == PGRES_COMMAND_OK)
00186     {
00187         PQclear(res);
00188         return false;
00189     }
00190     else if (PQresultStatus(res) != PGRES_COPY_BOTH)
00191     {
00192         PQclear(res);
00193         ereport(ERROR,
00194                 (errmsg("could not start WAL streaming: %s",
00195                         PQerrorMessage(streamConn))));
00196     }
00197     PQclear(res);
00198     return true;
00199 }
00200 
00201 /*
00202  * Stop streaming WAL data. Returns the next timeline's ID in *next_tli, as
00203  * reported by the server, or 0 if it did not report it.
00204  */
00205 static void
00206 libpqrcv_endstreaming(TimeLineID *next_tli)
00207 {
00208     PGresult   *res;
00209 
00210     if (PQputCopyEnd(streamConn, NULL) <= 0 || PQflush(streamConn))
00211         ereport(ERROR,
00212                 (errmsg("could not send end-of-streaming message to primary: %s",
00213                         PQerrorMessage(streamConn))));
00214 
00215     /*
00216      * After COPY is finished, we should receive a result set indicating the
00217      * next timeline's ID, or just CommandComplete if the server was shut down.
00218      *
00219      * If we had not yet received CopyDone from the backend, PGRES_COPY_IN
00220      * would also be possible. However, at the moment this function is only
00221      * called after receiving CopyDone from the backend - the walreceiver
00222      * never terminates replication on its own initiative.
00223      */
00224     res = PQgetResult(streamConn);
00225     if (PQresultStatus(res) == PGRES_TUPLES_OK)
00226     {
00227         /* Read the next timeline's ID */
00228         if (PQnfields(res) != 1 || PQntuples(res) != 1)
00229             ereport(ERROR,
00230                     (errmsg("unexpected result set after end-of-streaming")));
00231         *next_tli = pg_atoi(PQgetvalue(res, 0, 0), sizeof(uint32), 0);
00232         PQclear(res);
00233 
00234         /* the result set should be followed by CommandComplete */
00235         res = PQgetResult(streamConn);
00236     }
00237     else
00238         *next_tli = 0;
00239 
00240     if (PQresultStatus(res) != PGRES_COMMAND_OK)
00241         ereport(ERROR,
00242                 (errmsg("error reading result of streaming command: %s",
00243                         PQerrorMessage(streamConn))));
00244 
00245     /* Verify that there are no more results */
00246     res = PQgetResult(streamConn);
00247     if (res != NULL)
00248         ereport(ERROR,
00249                 (errmsg("unexpected result after CommandComplete: %s",
00250                         PQerrorMessage(streamConn))));
00251 }
00252 
00253 /*
00254  * Fetch the timeline history file for 'tli' from primary.
00255  */
00256 static void
00257 libpqrcv_readtimelinehistoryfile(TimeLineID tli,
00258                                  char **filename, char **content, int *len)
00259 {
00260     PGresult   *res;
00261     char        cmd[64];
00262 
00263     /*
00264      * Request the primary to send over the history file for given timeline.
00265      */
00266     snprintf(cmd, sizeof(cmd), "TIMELINE_HISTORY %u", tli);
00267     res = libpqrcv_PQexec(cmd);
00268     if (PQresultStatus(res) != PGRES_TUPLES_OK)
00269     {
00270         PQclear(res);
00271         ereport(ERROR,
00272                 (errmsg("could not receive timeline history file from "
00273                         "the primary server: %s",
00274                         PQerrorMessage(streamConn))));
00275     }
00276     if (PQnfields(res) != 2 || PQntuples(res) != 1)
00277     {
00278         int         ntuples = PQntuples(res);
00279         int         nfields = PQnfields(res);
00280 
00281         PQclear(res);
00282         ereport(ERROR,
00283                 (errmsg("invalid response from primary server"),
00284                  errdetail("Expected 1 tuple with 2 fields, got %d tuples with %d fields.",
00285                            ntuples, nfields)));
00286     }
00287     *filename = pstrdup(PQgetvalue(res, 0, 0));
00288 
00289     *len = PQgetlength(res, 0, 1);
00290     *content = palloc(*len);
00291     memcpy(*content, PQgetvalue(res, 0, 1), *len);
00292     PQclear(res);
00293 }
00294 
00295 /*
00296  * Wait until we can read WAL stream, or timeout.
00297  *
00298  * Returns true if data has become available for reading, false if timed out
00299  * or interrupted by signal.
00300  *
00301  * This is based on pqSocketCheck.
00302  */
00303 static bool
00304 libpq_select(int timeout_ms)
00305 {
00306     int         ret;
00307 
00308     Assert(streamConn != NULL);
00309     if (PQsocket(streamConn) < 0)
00310         ereport(ERROR,
00311                 (errcode_for_socket_access(),
00312                  errmsg("socket not open")));
00313 
00314     /* We use poll(2) if available, otherwise select(2) */
00315     {
00316 #ifdef HAVE_POLL
00317         struct pollfd input_fd;
00318 
00319         input_fd.fd = PQsocket(streamConn);
00320         input_fd.events = POLLIN | POLLERR;
00321         input_fd.revents = 0;
00322 
00323         ret = poll(&input_fd, 1, timeout_ms);
00324 #else                           /* !HAVE_POLL */
00325 
00326         fd_set      input_mask;
00327         struct timeval timeout;
00328         struct timeval *ptr_timeout;
00329 
00330         FD_ZERO(&input_mask);
00331         FD_SET(PQsocket(streamConn), &input_mask);
00332 
00333         if (timeout_ms < 0)
00334             ptr_timeout = NULL;
00335         else
00336         {
00337             timeout.tv_sec = timeout_ms / 1000;
00338             timeout.tv_usec = (timeout_ms % 1000) * 1000;
00339             ptr_timeout = &timeout;
00340         }
00341 
00342         ret = select(PQsocket(streamConn) + 1, &input_mask,
00343                      NULL, NULL, ptr_timeout);
00344 #endif   /* HAVE_POLL */
00345     }
00346 
00347     if (ret == 0 || (ret < 0 && errno == EINTR))
00348         return false;
00349     if (ret < 0)
00350         ereport(ERROR,
00351                 (errcode_for_socket_access(),
00352                  errmsg("select() failed: %m")));
00353     return true;
00354 }
00355 
00356 /*
00357  * Send a query and wait for the results by using the asynchronous libpq
00358  * functions and the backend version of select().
00359  *
00360  * We must not use the regular blocking libpq functions like PQexec()
00361  * since they are uninterruptible by signals on some platforms, such as
00362  * Windows.
00363  *
00364  * We must also not use vanilla select() here since it cannot handle the
00365  * signal emulation layer on Windows.
00366  *
00367  * The function is modeled on PQexec() in libpq, but only implements
00368  * those parts that are in use in the walreceiver.
00369  *
00370  * Queries are always executed on the connection in streamConn.
00371  */
00372 static PGresult *
00373 libpqrcv_PQexec(const char *query)
00374 {
00375     PGresult   *result = NULL;
00376     PGresult   *lastResult = NULL;
00377 
00378     /*
00379      * PQexec() silently discards any prior query results on the connection.
00380      * This is not required for walreceiver since it's expected that walsender
00381      * won't generate any such junk results.
00382      */
00383 
00384     /*
00385      * Submit a query. Since we don't use non-blocking mode, this also can
00386      * block. But its risk is relatively small, so we ignore that for now.
00387      */
00388     if (!PQsendQuery(streamConn, query))
00389         return NULL;
00390 
00391     for (;;)
00392     {
00393         /*
00394          * Receive data until PQgetResult is ready to get the result without
00395          * blocking.
00396          */
00397         while (PQisBusy(streamConn))
00398         {
00399             /*
00400              * We don't need to break down the sleep into smaller increments,
00401              * and check for interrupts after each nap, since we can just
00402              * elog(FATAL) within SIGTERM signal handler if the signal arrives
00403              * in the middle of establishment of replication connection.
00404              */
00405             if (!libpq_select(-1))
00406                 continue;       /* interrupted */
00407             if (PQconsumeInput(streamConn) == 0)
00408                 return NULL;    /* trouble */
00409         }
00410 
00411         /*
00412          * Emulate the PQexec()'s behavior of returning the last result when
00413          * there are many. Since walsender will never generate multiple
00414          * results, we skip the concatenation of error messages.
00415          */
00416         result = PQgetResult(streamConn);
00417         if (result == NULL)
00418             break;              /* query is complete */
00419 
00420         PQclear(lastResult);
00421         lastResult = result;
00422 
00423         if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
00424             PQresultStatus(lastResult) == PGRES_COPY_OUT ||
00425             PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
00426             PQstatus(streamConn) == CONNECTION_BAD)
00427             break;
00428     }
00429 
00430     return lastResult;
00431 }
00432 
00433 /*
00434  * Disconnect connection to primary, if any.
00435  */
00436 static void
00437 libpqrcv_disconnect(void)
00438 {
00439     PQfinish(streamConn);
00440     streamConn = NULL;
00441 }
00442 
00443 /*
00444  * Receive a message available from XLOG stream, blocking for
00445  * maximum of 'timeout' ms.
00446  *
00447  * Returns:
00448  *
00449  *   If data was received, returns the length of the data. *buffer is set to
00450  *   point to a buffer holding the received message. The buffer is only valid
00451  *   until the next libpqrcv_* call.
00452  *
00453  *   0 if no data was available within timeout, or wait was interrupted
00454  *   by signal.
00455  *
00456  *   -1 if the server ended the COPY.
00457  *
00458  * ereports on error.
00459  */
00460 static int
00461 libpqrcv_receive(int timeout, char **buffer)
00462 {
00463     int         rawlen;
00464 
00465     if (recvBuf != NULL)
00466         PQfreemem(recvBuf);
00467     recvBuf = NULL;
00468 
00469     /* Try to receive a CopyData message */
00470     rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
00471     if (rawlen == 0)
00472     {
00473         /*
00474          * No data available yet. If the caller requested to block, wait for
00475          * more data to arrive.
00476          */
00477         if (timeout > 0)
00478         {
00479             if (!libpq_select(timeout))
00480                 return 0;
00481         }
00482 
00483         if (PQconsumeInput(streamConn) == 0)
00484             ereport(ERROR,
00485                     (errmsg("could not receive data from WAL stream: %s",
00486                             PQerrorMessage(streamConn))));
00487 
00488         /* Now that we've consumed some input, try again */
00489         rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
00490         if (rawlen == 0)
00491             return 0;
00492     }
00493     if (rawlen == -1)           /* end-of-streaming or error */
00494     {
00495         PGresult   *res;
00496 
00497         res = PQgetResult(streamConn);
00498         if (PQresultStatus(res) == PGRES_COMMAND_OK ||
00499             PQresultStatus(res) == PGRES_COPY_IN)
00500         {
00501             PQclear(res);
00502             return -1;
00503         }
00504         else
00505         {
00506             PQclear(res);
00507             ereport(ERROR,
00508                     (errmsg("could not receive data from WAL stream: %s",
00509                             PQerrorMessage(streamConn))));
00510         }
00511     }
00512     if (rawlen < -1)
00513         ereport(ERROR,
00514                 (errmsg("could not receive data from WAL stream: %s",
00515                         PQerrorMessage(streamConn))));
00516 
00517     /* Return received messages to caller */
00518     *buffer = recvBuf;
00519     return rawlen;
00520 }
00521 
00522 /*
00523  * Send a message to XLOG stream.
00524  *
00525  * ereports on error.
00526  */
00527 static void
00528 libpqrcv_send(const char *buffer, int nbytes)
00529 {
00530     if (PQputCopyData(streamConn, buffer, nbytes) <= 0 ||
00531         PQflush(streamConn))
00532         ereport(ERROR,
00533                 (errmsg("could not send data to WAL stream: %s",
00534                         PQerrorMessage(streamConn))));
00535 }