00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 #include "postgres.h"
00018
00019 #include <unistd.h>
00020 #include <sys/time.h>
00021
00022 #include "libpq-fe.h"
00023 #include "access/xlog.h"
00024 #include "miscadmin.h"
00025 #include "replication/walreceiver.h"
00026 #include "utils/builtins.h"
00027
00028 #ifdef HAVE_POLL_H
00029 #include <poll.h>
00030 #endif
00031 #ifdef HAVE_SYS_POLL_H
00032 #include <sys/poll.h>
00033 #endif
00034 #ifdef HAVE_SYS_SELECT_H
00035 #include <sys/select.h>
00036 #endif
00037
00038 PG_MODULE_MAGIC;
00039
00040 void _PG_init(void);
00041
00042
00043 static PGconn *streamConn = NULL;
00044
00045
00046 static char *recvBuf = NULL;
00047
00048
00049 static void libpqrcv_connect(char *conninfo);
00050 static void libpqrcv_identify_system(TimeLineID *primary_tli);
00051 static void libpqrcv_readtimelinehistoryfile(TimeLineID tli, char **filename, char **content, int *len);
00052 static bool libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint);
00053 static void libpqrcv_endstreaming(TimeLineID *next_tli);
00054 static int libpqrcv_receive(int timeout, char **buffer);
00055 static void libpqrcv_send(const char *buffer, int nbytes);
00056 static void libpqrcv_disconnect(void);
00057
00058
00059 static bool libpq_select(int timeout_ms);
00060 static PGresult *libpqrcv_PQexec(const char *query);
00061
00062
00063
00064
00065 void
00066 _PG_init(void)
00067 {
00068
00069 if (walrcv_connect != NULL || walrcv_identify_system != NULL ||
00070 walrcv_readtimelinehistoryfile != NULL ||
00071 walrcv_startstreaming != NULL || walrcv_endstreaming != NULL ||
00072 walrcv_receive != NULL || walrcv_send != NULL ||
00073 walrcv_disconnect != NULL)
00074 elog(ERROR, "libpqwalreceiver already loaded");
00075 walrcv_connect = libpqrcv_connect;
00076 walrcv_identify_system = libpqrcv_identify_system;
00077 walrcv_readtimelinehistoryfile = libpqrcv_readtimelinehistoryfile;
00078 walrcv_startstreaming = libpqrcv_startstreaming;
00079 walrcv_endstreaming = libpqrcv_endstreaming;
00080 walrcv_receive = libpqrcv_receive;
00081 walrcv_send = libpqrcv_send;
00082 walrcv_disconnect = libpqrcv_disconnect;
00083 }
00084
00085
00086
00087
00088 static void
00089 libpqrcv_connect(char *conninfo)
00090 {
00091 char conninfo_repl[MAXCONNINFO + 75];
00092
00093
00094
00095
00096
00097
00098 snprintf(conninfo_repl, sizeof(conninfo_repl),
00099 "%s dbname=replication replication=true fallback_application_name=walreceiver",
00100 conninfo);
00101
00102 streamConn = PQconnectdb(conninfo_repl);
00103 if (PQstatus(streamConn) != CONNECTION_OK)
00104 ereport(ERROR,
00105 (errmsg("could not connect to the primary server: %s",
00106 PQerrorMessage(streamConn))));
00107 }
00108
00109
00110
00111
00112
00113 static void
00114 libpqrcv_identify_system(TimeLineID *primary_tli)
00115 {
00116 PGresult *res;
00117 char *primary_sysid;
00118 char standby_sysid[32];
00119
00120
00121
00122
00123
00124 res = libpqrcv_PQexec("IDENTIFY_SYSTEM");
00125 if (PQresultStatus(res) != PGRES_TUPLES_OK)
00126 {
00127 PQclear(res);
00128 ereport(ERROR,
00129 (errmsg("could not receive database system identifier and timeline ID from "
00130 "the primary server: %s",
00131 PQerrorMessage(streamConn))));
00132 }
00133 if (PQnfields(res) != 3 || PQntuples(res) != 1)
00134 {
00135 int ntuples = PQntuples(res);
00136 int nfields = PQnfields(res);
00137
00138 PQclear(res);
00139 ereport(ERROR,
00140 (errmsg("invalid response from primary server"),
00141 errdetail("Expected 1 tuple with 3 fields, got %d tuples with %d fields.",
00142 ntuples, nfields)));
00143 }
00144 primary_sysid = PQgetvalue(res, 0, 0);
00145 *primary_tli = pg_atoi(PQgetvalue(res, 0, 1), 4, 0);
00146
00147
00148
00149
00150 snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
00151 GetSystemIdentifier());
00152 if (strcmp(primary_sysid, standby_sysid) != 0)
00153 {
00154 PQclear(res);
00155 ereport(ERROR,
00156 (errmsg("database system identifier differs between the primary and standby"),
00157 errdetail("The primary's identifier is %s, the standby's identifier is %s.",
00158 primary_sysid, standby_sysid)));
00159 }
00160 PQclear(res);
00161 }
00162
00163
00164
00165
00166
00167
00168
00169
00170
00171
00172
00173 static bool
00174 libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint)
00175 {
00176 char cmd[64];
00177 PGresult *res;
00178
00179
00180 snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X TIMELINE %u",
00181 (uint32) (startpoint >> 32), (uint32) startpoint,
00182 tli);
00183 res = libpqrcv_PQexec(cmd);
00184
00185 if (PQresultStatus(res) == PGRES_COMMAND_OK)
00186 {
00187 PQclear(res);
00188 return false;
00189 }
00190 else if (PQresultStatus(res) != PGRES_COPY_BOTH)
00191 {
00192 PQclear(res);
00193 ereport(ERROR,
00194 (errmsg("could not start WAL streaming: %s",
00195 PQerrorMessage(streamConn))));
00196 }
00197 PQclear(res);
00198 return true;
00199 }
00200
00201
00202
00203
00204
00205 static void
00206 libpqrcv_endstreaming(TimeLineID *next_tli)
00207 {
00208 PGresult *res;
00209
00210 if (PQputCopyEnd(streamConn, NULL) <= 0 || PQflush(streamConn))
00211 ereport(ERROR,
00212 (errmsg("could not send end-of-streaming message to primary: %s",
00213 PQerrorMessage(streamConn))));
00214
00215
00216
00217
00218
00219
00220
00221
00222
00223
00224 res = PQgetResult(streamConn);
00225 if (PQresultStatus(res) == PGRES_TUPLES_OK)
00226 {
00227
00228 if (PQnfields(res) != 1 || PQntuples(res) != 1)
00229 ereport(ERROR,
00230 (errmsg("unexpected result set after end-of-streaming")));
00231 *next_tli = pg_atoi(PQgetvalue(res, 0, 0), sizeof(uint32), 0);
00232 PQclear(res);
00233
00234
00235 res = PQgetResult(streamConn);
00236 }
00237 else
00238 *next_tli = 0;
00239
00240 if (PQresultStatus(res) != PGRES_COMMAND_OK)
00241 ereport(ERROR,
00242 (errmsg("error reading result of streaming command: %s",
00243 PQerrorMessage(streamConn))));
00244
00245
00246 res = PQgetResult(streamConn);
00247 if (res != NULL)
00248 ereport(ERROR,
00249 (errmsg("unexpected result after CommandComplete: %s",
00250 PQerrorMessage(streamConn))));
00251 }
00252
00253
00254
00255
00256 static void
00257 libpqrcv_readtimelinehistoryfile(TimeLineID tli,
00258 char **filename, char **content, int *len)
00259 {
00260 PGresult *res;
00261 char cmd[64];
00262
00263
00264
00265
00266 snprintf(cmd, sizeof(cmd), "TIMELINE_HISTORY %u", tli);
00267 res = libpqrcv_PQexec(cmd);
00268 if (PQresultStatus(res) != PGRES_TUPLES_OK)
00269 {
00270 PQclear(res);
00271 ereport(ERROR,
00272 (errmsg("could not receive timeline history file from "
00273 "the primary server: %s",
00274 PQerrorMessage(streamConn))));
00275 }
00276 if (PQnfields(res) != 2 || PQntuples(res) != 1)
00277 {
00278 int ntuples = PQntuples(res);
00279 int nfields = PQnfields(res);
00280
00281 PQclear(res);
00282 ereport(ERROR,
00283 (errmsg("invalid response from primary server"),
00284 errdetail("Expected 1 tuple with 2 fields, got %d tuples with %d fields.",
00285 ntuples, nfields)));
00286 }
00287 *filename = pstrdup(PQgetvalue(res, 0, 0));
00288
00289 *len = PQgetlength(res, 0, 1);
00290 *content = palloc(*len);
00291 memcpy(*content, PQgetvalue(res, 0, 1), *len);
00292 PQclear(res);
00293 }
00294
00295
00296
00297
00298
00299
00300
00301
00302
00303 static bool
00304 libpq_select(int timeout_ms)
00305 {
00306 int ret;
00307
00308 Assert(streamConn != NULL);
00309 if (PQsocket(streamConn) < 0)
00310 ereport(ERROR,
00311 (errcode_for_socket_access(),
00312 errmsg("socket not open")));
00313
00314
00315 {
00316 #ifdef HAVE_POLL
00317 struct pollfd input_fd;
00318
00319 input_fd.fd = PQsocket(streamConn);
00320 input_fd.events = POLLIN | POLLERR;
00321 input_fd.revents = 0;
00322
00323 ret = poll(&input_fd, 1, timeout_ms);
00324 #else
00325
00326 fd_set input_mask;
00327 struct timeval timeout;
00328 struct timeval *ptr_timeout;
00329
00330 FD_ZERO(&input_mask);
00331 FD_SET(PQsocket(streamConn), &input_mask);
00332
00333 if (timeout_ms < 0)
00334 ptr_timeout = NULL;
00335 else
00336 {
00337 timeout.tv_sec = timeout_ms / 1000;
00338 timeout.tv_usec = (timeout_ms % 1000) * 1000;
00339 ptr_timeout = &timeout;
00340 }
00341
00342 ret = select(PQsocket(streamConn) + 1, &input_mask,
00343 NULL, NULL, ptr_timeout);
00344 #endif
00345 }
00346
00347 if (ret == 0 || (ret < 0 && errno == EINTR))
00348 return false;
00349 if (ret < 0)
00350 ereport(ERROR,
00351 (errcode_for_socket_access(),
00352 errmsg("select() failed: %m")));
00353 return true;
00354 }
00355
00356
00357
00358
00359
00360
00361
00362
00363
00364
00365
00366
00367
00368
00369
00370
00371
00372 static PGresult *
00373 libpqrcv_PQexec(const char *query)
00374 {
00375 PGresult *result = NULL;
00376 PGresult *lastResult = NULL;
00377
00378
00379
00380
00381
00382
00383
00384
00385
00386
00387
00388 if (!PQsendQuery(streamConn, query))
00389 return NULL;
00390
00391 for (;;)
00392 {
00393
00394
00395
00396
00397 while (PQisBusy(streamConn))
00398 {
00399
00400
00401
00402
00403
00404
00405 if (!libpq_select(-1))
00406 continue;
00407 if (PQconsumeInput(streamConn) == 0)
00408 return NULL;
00409 }
00410
00411
00412
00413
00414
00415
00416 result = PQgetResult(streamConn);
00417 if (result == NULL)
00418 break;
00419
00420 PQclear(lastResult);
00421 lastResult = result;
00422
00423 if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
00424 PQresultStatus(lastResult) == PGRES_COPY_OUT ||
00425 PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
00426 PQstatus(streamConn) == CONNECTION_BAD)
00427 break;
00428 }
00429
00430 return lastResult;
00431 }
00432
00433
00434
00435
00436 static void
00437 libpqrcv_disconnect(void)
00438 {
00439 PQfinish(streamConn);
00440 streamConn = NULL;
00441 }
00442
00443
00444
00445
00446
00447
00448
00449
00450
00451
00452
00453
00454
00455
00456
00457
00458
00459
00460 static int
00461 libpqrcv_receive(int timeout, char **buffer)
00462 {
00463 int rawlen;
00464
00465 if (recvBuf != NULL)
00466 PQfreemem(recvBuf);
00467 recvBuf = NULL;
00468
00469
00470 rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
00471 if (rawlen == 0)
00472 {
00473
00474
00475
00476
00477 if (timeout > 0)
00478 {
00479 if (!libpq_select(timeout))
00480 return 0;
00481 }
00482
00483 if (PQconsumeInput(streamConn) == 0)
00484 ereport(ERROR,
00485 (errmsg("could not receive data from WAL stream: %s",
00486 PQerrorMessage(streamConn))));
00487
00488
00489 rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
00490 if (rawlen == 0)
00491 return 0;
00492 }
00493 if (rawlen == -1)
00494 {
00495 PGresult *res;
00496
00497 res = PQgetResult(streamConn);
00498 if (PQresultStatus(res) == PGRES_COMMAND_OK ||
00499 PQresultStatus(res) == PGRES_COPY_IN)
00500 {
00501 PQclear(res);
00502 return -1;
00503 }
00504 else
00505 {
00506 PQclear(res);
00507 ereport(ERROR,
00508 (errmsg("could not receive data from WAL stream: %s",
00509 PQerrorMessage(streamConn))));
00510 }
00511 }
00512 if (rawlen < -1)
00513 ereport(ERROR,
00514 (errmsg("could not receive data from WAL stream: %s",
00515 PQerrorMessage(streamConn))));
00516
00517
00518 *buffer = recvBuf;
00519 return rawlen;
00520 }
00521
00522
00523
00524
00525
00526
00527 static void
00528 libpqrcv_send(const char *buffer, int nbytes)
00529 {
00530 if (PQputCopyData(streamConn, buffer, nbytes) <= 0 ||
00531 PQflush(streamConn))
00532 ereport(ERROR,
00533 (errmsg("could not send data to WAL stream: %s",
00534 PQerrorMessage(streamConn))));
00535 }