00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
#include "postgres_fe.h"

#include <dirent.h>
#include <limits.h>
#include <signal.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

#include "libpq-fe.h"
#include "access/xlog_internal.h"
#include "getopt_long.h"

#include "receivelog.h"
#include "streamutil.h"
00029
00030
00031
00032 #define RECONNECT_SLEEP_TIME 5
00033
00034
00035 char *basedir = NULL;
00036 int verbose = 0;
00037 int noloop = 0;
00038 int standby_message_timeout = 10 * 1000;
00039 volatile bool time_to_abort = false;
00040
00041
00042 static void usage(void);
00043 static XLogRecPtr FindStreamingStart(uint32 *tli);
00044 static void StreamLog();
00045 static bool stop_streaming(XLogRecPtr segendpos, uint32 timeline,
00046 bool segment_finished);
00047
00048 static void
00049 usage(void)
00050 {
00051 printf(_("%s receives PostgreSQL streaming transaction logs.\n\n"),
00052 progname);
00053 printf(_("Usage:\n"));
00054 printf(_(" %s [OPTION]...\n"), progname);
00055 printf(_("\nOptions:\n"));
00056 printf(_(" -D, --directory=DIR receive transaction log files into this directory\n"));
00057 printf(_(" -n, --no-loop do not loop on connection lost\n"));
00058 printf(_(" -v, --verbose output verbose messages\n"));
00059 printf(_(" -V, --version output version information, then exit\n"));
00060 printf(_(" -?, --help show this help, then exit\n"));
00061 printf(_("\nConnection options:\n"));
00062 printf(_(" -d, --dbname=CONNSTR connection string\n"));
00063 printf(_(" -h, --host=HOSTNAME database server host or socket directory\n"));
00064 printf(_(" -p, --port=PORT database server port number\n"));
00065 printf(_(" -s, --status-interval=INTERVAL\n"
00066 " time between status packets sent to server (in seconds)\n"));
00067 printf(_(" -U, --username=NAME connect as specified database user\n"));
00068 printf(_(" -w, --no-password never prompt for password\n"));
00069 printf(_(" -W, --password force password prompt (should happen automatically)\n"));
00070 printf(_("\nReport bugs to <[email protected]>.\n"));
00071 }
00072
00073 static bool
00074 stop_streaming(XLogRecPtr xlogpos, uint32 timeline, bool segment_finished)
00075 {
00076 static uint32 prevtimeline = 0;
00077 static XLogRecPtr prevpos = InvalidXLogRecPtr;
00078
00079
00080 if (verbose && segment_finished)
00081 fprintf(stderr, _("%s: finished segment at %X/%X (timeline %u)\n"),
00082 progname, (uint32) (xlogpos >> 32), (uint32) xlogpos,
00083 timeline);
00084
00085
00086
00087
00088
00089
00090
00091 if (prevtimeline != 0 && prevtimeline != timeline)
00092 fprintf(stderr, _("%s: switched to timeline %u at %X/%X\n"),
00093 progname, timeline,
00094 (uint32) (prevpos >> 32), (uint32) prevpos);
00095
00096 prevtimeline = timeline;
00097 prevpos = xlogpos;
00098
00099 if (time_to_abort)
00100 {
00101 fprintf(stderr, _("%s: received interrupt signal, exiting\n"),
00102 progname);
00103 return true;
00104 }
00105 return false;
00106 }
00107
00108
00109
00110
00111
00112
00113
00114
00115 static XLogRecPtr
00116 FindStreamingStart(uint32 *tli)
00117 {
00118 DIR *dir;
00119 struct dirent *dirent;
00120 XLogSegNo high_segno = 0;
00121 uint32 high_tli = 0;
00122
00123 dir = opendir(basedir);
00124 if (dir == NULL)
00125 {
00126 fprintf(stderr, _("%s: could not open directory \"%s\": %s\n"),
00127 progname, basedir, strerror(errno));
00128 disconnect_and_exit(1);
00129 }
00130
00131 while ((dirent = readdir(dir)) != NULL)
00132 {
00133 char fullpath[MAXPGPATH];
00134 struct stat statbuf;
00135 uint32 tli;
00136 unsigned int log,
00137 seg;
00138 XLogSegNo segno;
00139
00140
00141
00142
00143
00144
00145 if (strlen(dirent->d_name) != 24 ||
00146 !strspn(dirent->d_name, "0123456789ABCDEF") == 24)
00147 continue;
00148
00149
00150
00151
00152 if (sscanf(dirent->d_name, "%08X%08X%08X", &tli, &log, &seg) != 3)
00153 {
00154 fprintf(stderr,
00155 _("%s: could not parse transaction log file name \"%s\"\n"),
00156 progname, dirent->d_name);
00157 disconnect_and_exit(1);
00158 }
00159 segno = ((uint64) log) << 32 | seg;
00160
00161
00162 snprintf(fullpath, sizeof(fullpath), "%s/%s", basedir, dirent->d_name);
00163 if (stat(fullpath, &statbuf) != 0)
00164 {
00165 fprintf(stderr, _("%s: could not stat file \"%s\": %s\n"),
00166 progname, fullpath, strerror(errno));
00167 disconnect_and_exit(1);
00168 }
00169
00170 if (statbuf.st_size == XLOG_SEG_SIZE)
00171 {
00172
00173 if (segno > high_segno || (segno == high_segno && tli > high_tli))
00174 {
00175 high_segno = segno;
00176 high_tli = tli;
00177 continue;
00178 }
00179 }
00180 else
00181 {
00182 fprintf(stderr,
00183 _("%s: segment file \"%s\" has incorrect size %d, skipping\n"),
00184 progname, dirent->d_name, (int) statbuf.st_size);
00185 continue;
00186 }
00187 }
00188
00189 closedir(dir);
00190
00191 if (high_segno > 0)
00192 {
00193 XLogRecPtr high_ptr;
00194
00195
00196
00197
00198
00199 high_segno++;
00200
00201 XLogSegNoOffsetToRecPtr(high_segno, 0, high_ptr);
00202
00203 *tli = high_tli;
00204 return high_ptr;
00205 }
00206 else
00207 return InvalidXLogRecPtr;
00208 }
00209
00210
00211
00212
00213 static void
00214 StreamLog(void)
00215 {
00216 PGresult *res;
00217 XLogRecPtr startpos;
00218 uint32 starttli;
00219 XLogRecPtr serverpos;
00220 uint32 servertli;
00221 uint32 hi,
00222 lo;
00223
00224
00225
00226
00227 conn = GetConnection();
00228 if (!conn)
00229
00230 return;
00231
00232 if (!CheckServerVersionForStreaming(conn))
00233 {
00234
00235
00236
00237
00238
00239 disconnect_and_exit(1);
00240 }
00241
00242
00243
00244
00245
00246 res = PQexec(conn, "IDENTIFY_SYSTEM");
00247 if (PQresultStatus(res) != PGRES_TUPLES_OK)
00248 {
00249 fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
00250 progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
00251 disconnect_and_exit(1);
00252 }
00253 if (PQntuples(res) != 1 || PQnfields(res) != 3)
00254 {
00255 fprintf(stderr,
00256 _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d fields\n"),
00257 progname, PQntuples(res), PQnfields(res), 1, 3);
00258 disconnect_and_exit(1);
00259 }
00260 servertli = atoi(PQgetvalue(res, 0, 1));
00261 if (sscanf(PQgetvalue(res, 0, 2), "%X/%X", &hi, &lo) != 2)
00262 {
00263 fprintf(stderr,
00264 _("%s: could not parse transaction log location \"%s\"\n"),
00265 progname, PQgetvalue(res, 0, 2));
00266 disconnect_and_exit(1);
00267 }
00268 serverpos = ((uint64) hi) << 32 | lo;
00269 PQclear(res);
00270
00271
00272
00273
00274 startpos = FindStreamingStart(&starttli);
00275 if (startpos == InvalidXLogRecPtr)
00276 {
00277 startpos = serverpos;
00278 starttli = servertli;
00279 }
00280
00281
00282
00283
00284 startpos -= startpos % XLOG_SEG_SIZE;
00285
00286
00287
00288
00289 if (verbose)
00290 fprintf(stderr,
00291 _("%s: starting log streaming at %X/%X (timeline %u)\n"),
00292 progname, (uint32) (startpos >> 32), (uint32) startpos,
00293 starttli);
00294
00295 ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
00296 stop_streaming, standby_message_timeout, ".partial");
00297
00298 PQfinish(conn);
00299 }
00300
00301
00302
00303
00304
00305 #ifndef WIN32
00306
00307 static void
00308 sigint_handler(int signum)
00309 {
00310 time_to_abort = true;
00311 }
00312 #endif
00313
00314 int
00315 main(int argc, char **argv)
00316 {
00317 static struct option long_options[] = {
00318 {"help", no_argument, NULL, '?'},
00319 {"version", no_argument, NULL, 'V'},
00320 {"directory", required_argument, NULL, 'D'},
00321 {"dbname", required_argument, NULL, 'd'},
00322 {"host", required_argument, NULL, 'h'},
00323 {"port", required_argument, NULL, 'p'},
00324 {"username", required_argument, NULL, 'U'},
00325 {"no-loop", no_argument, NULL, 'n'},
00326 {"no-password", no_argument, NULL, 'w'},
00327 {"password", no_argument, NULL, 'W'},
00328 {"status-interval", required_argument, NULL, 's'},
00329 {"verbose", no_argument, NULL, 'v'},
00330 {NULL, 0, NULL, 0}
00331 };
00332
00333 int c;
00334 int option_index;
00335
00336 progname = get_progname(argv[0]);
00337 set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_receivexlog"));
00338
00339 if (argc > 1)
00340 {
00341 if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
00342 {
00343 usage();
00344 exit(0);
00345 }
00346 else if (strcmp(argv[1], "-V") == 0 ||
00347 strcmp(argv[1], "--version") == 0)
00348 {
00349 puts("pg_receivexlog (PostgreSQL) " PG_VERSION);
00350 exit(0);
00351 }
00352 }
00353
00354 while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:nwWv",
00355 long_options, &option_index)) != -1)
00356 {
00357 switch (c)
00358 {
00359 case 'D':
00360 basedir = pg_strdup(optarg);
00361 break;
00362 case 'd':
00363 connection_string = pg_strdup(optarg);
00364 break;
00365 case 'h':
00366 dbhost = pg_strdup(optarg);
00367 break;
00368 case 'p':
00369 if (atoi(optarg) <= 0)
00370 {
00371 fprintf(stderr, _("%s: invalid port number \"%s\"\n"),
00372 progname, optarg);
00373 exit(1);
00374 }
00375 dbport = pg_strdup(optarg);
00376 break;
00377 case 'U':
00378 dbuser = pg_strdup(optarg);
00379 break;
00380 case 'w':
00381 dbgetpassword = -1;
00382 break;
00383 case 'W':
00384 dbgetpassword = 1;
00385 break;
00386 case 's':
00387 standby_message_timeout = atoi(optarg) * 1000;
00388 if (standby_message_timeout < 0)
00389 {
00390 fprintf(stderr, _("%s: invalid status interval \"%s\"\n"),
00391 progname, optarg);
00392 exit(1);
00393 }
00394 break;
00395 case 'n':
00396 noloop = 1;
00397 break;
00398 case 'v':
00399 verbose++;
00400 break;
00401 default:
00402
00403
00404
00405
00406 fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
00407 progname);
00408 exit(1);
00409 }
00410 }
00411
00412
00413
00414
00415 if (optind < argc)
00416 {
00417 fprintf(stderr,
00418 _("%s: too many command-line arguments (first is \"%s\")\n"),
00419 progname, argv[optind]);
00420 fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
00421 progname);
00422 exit(1);
00423 }
00424
00425
00426
00427
00428 if (basedir == NULL)
00429 {
00430 fprintf(stderr, _("%s: no target directory specified\n"), progname);
00431 fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
00432 progname);
00433 exit(1);
00434 }
00435
00436 #ifndef WIN32
00437 pqsignal(SIGINT, sigint_handler);
00438 #endif
00439
00440 while (true)
00441 {
00442 StreamLog();
00443 if (time_to_abort)
00444 {
00445
00446
00447
00448
00449 exit(0);
00450 }
00451 else if (noloop)
00452 {
00453 fprintf(stderr, _("%s: disconnected\n"), progname);
00454 exit(1);
00455 }
00456 else
00457 {
00458 fprintf(stderr,
00459
00460 _("%s: disconnected; waiting %d seconds to try again\n"),
00461 progname, RECONNECT_SLEEP_TIME);
00462 pg_usleep(RECONNECT_SLEEP_TIME * 1000000);
00463 }
00464 }
00465 }