Header And Logo

PostgreSQL
| The world's most advanced open source database.

pg_receivexlog.c

Go to the documentation of this file.
00001 /*-------------------------------------------------------------------------
00002  *
00003  * pg_receivexlog.c - receive streaming transaction log data and write it
00004  *                    to a local file.
00005  *
00006  * Author: Magnus Hagander <magnus@hagander.net>
00007  *
00008  * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
00009  *
00010  * IDENTIFICATION
00011  *        src/bin/pg_basebackup/pg_receivexlog.c
00012  *-------------------------------------------------------------------------
00013  */
00014 
#include "postgres_fe.h"

#include <dirent.h>
#include <limits.h>
#include <signal.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

#include "libpq-fe.h"
#include "access/xlog_internal.h"
#include "getopt_long.h"

#include "receivelog.h"
#include "streamutil.h"
00029 
00030 
/* Time to sleep between reconnection attempts, in seconds */
#define RECONNECT_SLEEP_TIME 5

/* Global options */
char       *basedir = NULL;     /* directory to receive WAL files into (-D) */
int         verbose = 0;        /* verbosity level, bumped once per -v */
int         noloop = 0;         /* nonzero: exit instead of reconnecting (-n) */
int         standby_message_timeout = 10 * 1000;        /* 10 sec = default */
volatile bool time_to_abort = false;    /* set by the SIGINT handler */
00040 
00041 
00042 static void usage(void);
00043 static XLogRecPtr FindStreamingStart(uint32 *tli);
00044 static void StreamLog();
00045 static bool stop_streaming(XLogRecPtr segendpos, uint32 timeline,
00046                bool segment_finished);
00047 
00048 static void
00049 usage(void)
00050 {
00051     printf(_("%s receives PostgreSQL streaming transaction logs.\n\n"),
00052            progname);
00053     printf(_("Usage:\n"));
00054     printf(_("  %s [OPTION]...\n"), progname);
00055     printf(_("\nOptions:\n"));
00056     printf(_("  -D, --directory=DIR    receive transaction log files into this directory\n"));
00057     printf(_("  -n, --no-loop          do not loop on connection lost\n"));
00058     printf(_("  -v, --verbose          output verbose messages\n"));
00059     printf(_("  -V, --version          output version information, then exit\n"));
00060     printf(_("  -?, --help             show this help, then exit\n"));
00061     printf(_("\nConnection options:\n"));
00062     printf(_("  -d, --dbname=CONNSTR   connection string\n"));
00063     printf(_("  -h, --host=HOSTNAME    database server host or socket directory\n"));
00064     printf(_("  -p, --port=PORT        database server port number\n"));
00065     printf(_("  -s, --status-interval=INTERVAL\n"
00066              "                         time between status packets sent to server (in seconds)\n"));
00067     printf(_("  -U, --username=NAME    connect as specified database user\n"));
00068     printf(_("  -w, --no-password      never prompt for password\n"));
00069     printf(_("  -W, --password         force password prompt (should happen automatically)\n"));
00070     printf(_("\nReport bugs to <[email protected]>.\n"));
00071 }
00072 
00073 static bool
00074 stop_streaming(XLogRecPtr xlogpos, uint32 timeline, bool segment_finished)
00075 {
00076     static uint32 prevtimeline = 0;
00077     static XLogRecPtr prevpos = InvalidXLogRecPtr;
00078 
00079     /* we assume that we get called once at the end of each segment */
00080     if (verbose && segment_finished)
00081         fprintf(stderr, _("%s: finished segment at %X/%X (timeline %u)\n"),
00082                 progname, (uint32) (xlogpos >> 32), (uint32) xlogpos,
00083                 timeline);
00084 
00085     /*
00086      * Note that we report the previous, not current, position here. That's
00087      * the exact location where the timeline switch happend. After the switch,
00088      * we restart streaming from the beginning of the segment, so xlogpos can
00089      * smaller than prevpos if we just switched to new timeline.
00090      */
00091     if (prevtimeline != 0 && prevtimeline != timeline)
00092         fprintf(stderr, _("%s: switched to timeline %u at %X/%X\n"),
00093                 progname, timeline,
00094                 (uint32) (prevpos >> 32), (uint32) prevpos);
00095 
00096     prevtimeline = timeline;
00097     prevpos = xlogpos;
00098 
00099     if (time_to_abort)
00100     {
00101         fprintf(stderr, _("%s: received interrupt signal, exiting\n"),
00102                 progname);
00103         return true;
00104     }
00105     return false;
00106 }
00107 
00108 /*
00109  * Determine starting location for streaming, based on any existing xlog
00110  * segments in the directory. We start at the end of the last one that is
00111  * complete (size matches XLogSegSize), on the timeline with highest ID.
00112  *
00113  * If there are no WAL files in the directory, returns InvalidXLogRecPtr.
00114  */
00115 static XLogRecPtr
00116 FindStreamingStart(uint32 *tli)
00117 {
00118     DIR        *dir;
00119     struct dirent *dirent;
00120     XLogSegNo   high_segno = 0;
00121     uint32      high_tli = 0;
00122 
00123     dir = opendir(basedir);
00124     if (dir == NULL)
00125     {
00126         fprintf(stderr, _("%s: could not open directory \"%s\": %s\n"),
00127                 progname, basedir, strerror(errno));
00128         disconnect_and_exit(1);
00129     }
00130 
00131     while ((dirent = readdir(dir)) != NULL)
00132     {
00133         char        fullpath[MAXPGPATH];
00134         struct stat statbuf;
00135         uint32      tli;
00136         unsigned int log,
00137                     seg;
00138         XLogSegNo   segno;
00139 
00140         /*
00141          * Check if the filename looks like an xlog file, or a .partial file.
00142          * Xlog files are always 24 characters, and .partial files are 32
00143          * characters.
00144          */
00145         if (strlen(dirent->d_name) != 24 ||
00146             !strspn(dirent->d_name, "0123456789ABCDEF") == 24)
00147             continue;
00148 
00149         /*
00150          * Looks like an xlog file. Parse its position.
00151          */
00152         if (sscanf(dirent->d_name, "%08X%08X%08X", &tli, &log, &seg) != 3)
00153         {
00154             fprintf(stderr,
00155                  _("%s: could not parse transaction log file name \"%s\"\n"),
00156                     progname, dirent->d_name);
00157             disconnect_and_exit(1);
00158         }
00159         segno = ((uint64) log) << 32 | seg;
00160 
00161         /* Check if this is a completed segment or not */
00162         snprintf(fullpath, sizeof(fullpath), "%s/%s", basedir, dirent->d_name);
00163         if (stat(fullpath, &statbuf) != 0)
00164         {
00165             fprintf(stderr, _("%s: could not stat file \"%s\": %s\n"),
00166                     progname, fullpath, strerror(errno));
00167             disconnect_and_exit(1);
00168         }
00169 
00170         if (statbuf.st_size == XLOG_SEG_SIZE)
00171         {
00172             /* Completed segment */
00173             if (segno > high_segno || (segno == high_segno && tli > high_tli))
00174             {
00175                 high_segno = segno;
00176                 high_tli = tli;
00177                 continue;
00178             }
00179         }
00180         else
00181         {
00182             fprintf(stderr,
00183               _("%s: segment file \"%s\" has incorrect size %d, skipping\n"),
00184                     progname, dirent->d_name, (int) statbuf.st_size);
00185             continue;
00186         }
00187     }
00188 
00189     closedir(dir);
00190 
00191     if (high_segno > 0)
00192     {
00193         XLogRecPtr  high_ptr;
00194 
00195         /*
00196          * Move the starting pointer to the start of the next segment, since
00197          * the highest one we've seen was completed.
00198          */
00199         high_segno++;
00200 
00201         XLogSegNoOffsetToRecPtr(high_segno, 0, high_ptr);
00202 
00203         *tli = high_tli;
00204         return high_ptr;
00205     }
00206     else
00207         return InvalidXLogRecPtr;
00208 }
00209 
00210 /*
00211  * Start the log streaming
00212  */
00213 static void
00214 StreamLog(void)
00215 {
00216     PGresult   *res;
00217     XLogRecPtr  startpos;
00218     uint32      starttli;
00219     XLogRecPtr  serverpos;
00220     uint32      servertli;
00221     uint32      hi,
00222                 lo;
00223 
00224     /*
00225      * Connect in replication mode to the server
00226      */
00227     conn = GetConnection();
00228     if (!conn)
00229         /* Error message already written in GetConnection() */
00230         return;
00231 
00232     if (!CheckServerVersionForStreaming(conn))
00233     {
00234         /*
00235          * Error message already written in CheckServerVersionForStreaming().
00236          * There's no hope of recovering from a version mismatch, so don't
00237          * retry.
00238          */
00239         disconnect_and_exit(1);
00240     }
00241 
00242     /*
00243      * Run IDENTIFY_SYSTEM so we can get the timeline and current xlog
00244      * position.
00245      */
00246     res = PQexec(conn, "IDENTIFY_SYSTEM");
00247     if (PQresultStatus(res) != PGRES_TUPLES_OK)
00248     {
00249         fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
00250                 progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
00251         disconnect_and_exit(1);
00252     }
00253     if (PQntuples(res) != 1 || PQnfields(res) != 3)
00254     {
00255         fprintf(stderr,
00256                 _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d fields\n"),
00257                 progname, PQntuples(res), PQnfields(res), 1, 3);
00258         disconnect_and_exit(1);
00259     }
00260     servertli = atoi(PQgetvalue(res, 0, 1));
00261     if (sscanf(PQgetvalue(res, 0, 2), "%X/%X", &hi, &lo) != 2)
00262     {
00263         fprintf(stderr,
00264                 _("%s: could not parse transaction log location \"%s\"\n"),
00265                 progname, PQgetvalue(res, 0, 2));
00266         disconnect_and_exit(1);
00267     }
00268     serverpos = ((uint64) hi) << 32 | lo;
00269     PQclear(res);
00270 
00271     /*
00272      * Figure out where to start streaming.
00273      */
00274     startpos = FindStreamingStart(&starttli);
00275     if (startpos == InvalidXLogRecPtr)
00276     {
00277         startpos = serverpos;
00278         starttli = servertli;
00279     }
00280 
00281     /*
00282      * Always start streaming at the beginning of a segment
00283      */
00284     startpos -= startpos % XLOG_SEG_SIZE;
00285 
00286     /*
00287      * Start the replication
00288      */
00289     if (verbose)
00290         fprintf(stderr,
00291                 _("%s: starting log streaming at %X/%X (timeline %u)\n"),
00292                 progname, (uint32) (startpos >> 32), (uint32) startpos,
00293                 starttli);
00294 
00295     ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
00296                       stop_streaming, standby_message_timeout, ".partial");
00297 
00298     PQfinish(conn);
00299 }
00300 
/*
 * SIGINT handler: just set the time_to_abort flag, which the streaming
 * callback and the main loop check, so that the program exits at the next
 * possible moment.  Not compiled on Windows.
 */
#ifndef WIN32

static void
sigint_handler(int signum)
{
    time_to_abort = true;
}
#endif
00313 
00314 int
00315 main(int argc, char **argv)
00316 {
00317     static struct option long_options[] = {
00318         {"help", no_argument, NULL, '?'},
00319         {"version", no_argument, NULL, 'V'},
00320         {"directory", required_argument, NULL, 'D'},
00321         {"dbname", required_argument, NULL, 'd'},
00322         {"host", required_argument, NULL, 'h'},
00323         {"port", required_argument, NULL, 'p'},
00324         {"username", required_argument, NULL, 'U'},
00325         {"no-loop", no_argument, NULL, 'n'},
00326         {"no-password", no_argument, NULL, 'w'},
00327         {"password", no_argument, NULL, 'W'},
00328         {"status-interval", required_argument, NULL, 's'},
00329         {"verbose", no_argument, NULL, 'v'},
00330         {NULL, 0, NULL, 0}
00331     };
00332 
00333     int         c;
00334     int         option_index;
00335 
00336     progname = get_progname(argv[0]);
00337     set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_receivexlog"));
00338 
00339     if (argc > 1)
00340     {
00341         if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
00342         {
00343             usage();
00344             exit(0);
00345         }
00346         else if (strcmp(argv[1], "-V") == 0 ||
00347                  strcmp(argv[1], "--version") == 0)
00348         {
00349             puts("pg_receivexlog (PostgreSQL) " PG_VERSION);
00350             exit(0);
00351         }
00352     }
00353 
00354     while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:nwWv",
00355                             long_options, &option_index)) != -1)
00356     {
00357         switch (c)
00358         {
00359             case 'D':
00360                 basedir = pg_strdup(optarg);
00361                 break;
00362             case 'd':
00363                 connection_string = pg_strdup(optarg);
00364                 break;
00365             case 'h':
00366                 dbhost = pg_strdup(optarg);
00367                 break;
00368             case 'p':
00369                 if (atoi(optarg) <= 0)
00370                 {
00371                     fprintf(stderr, _("%s: invalid port number \"%s\"\n"),
00372                             progname, optarg);
00373                     exit(1);
00374                 }
00375                 dbport = pg_strdup(optarg);
00376                 break;
00377             case 'U':
00378                 dbuser = pg_strdup(optarg);
00379                 break;
00380             case 'w':
00381                 dbgetpassword = -1;
00382                 break;
00383             case 'W':
00384                 dbgetpassword = 1;
00385                 break;
00386             case 's':
00387                 standby_message_timeout = atoi(optarg) * 1000;
00388                 if (standby_message_timeout < 0)
00389                 {
00390                     fprintf(stderr, _("%s: invalid status interval \"%s\"\n"),
00391                             progname, optarg);
00392                     exit(1);
00393                 }
00394                 break;
00395             case 'n':
00396                 noloop = 1;
00397                 break;
00398             case 'v':
00399                 verbose++;
00400                 break;
00401             default:
00402 
00403                 /*
00404                  * getopt_long already emitted a complaint
00405                  */
00406                 fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
00407                         progname);
00408                 exit(1);
00409         }
00410     }
00411 
00412     /*
00413      * Any non-option arguments?
00414      */
00415     if (optind < argc)
00416     {
00417         fprintf(stderr,
00418                 _("%s: too many command-line arguments (first is \"%s\")\n"),
00419                 progname, argv[optind]);
00420         fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
00421                 progname);
00422         exit(1);
00423     }
00424 
00425     /*
00426      * Required arguments
00427      */
00428     if (basedir == NULL)
00429     {
00430         fprintf(stderr, _("%s: no target directory specified\n"), progname);
00431         fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
00432                 progname);
00433         exit(1);
00434     }
00435 
00436 #ifndef WIN32
00437     pqsignal(SIGINT, sigint_handler);
00438 #endif
00439 
00440     while (true)
00441     {
00442         StreamLog();
00443         if (time_to_abort)
00444         {
00445             /*
00446              * We've been Ctrl-C'ed. That's not an error, so exit without an
00447              * errorcode.
00448              */
00449             exit(0);
00450         }
00451         else if (noloop)
00452         {
00453             fprintf(stderr, _("%s: disconnected\n"), progname);
00454             exit(1);
00455         }
00456         else
00457         {
00458             fprintf(stderr,
00459                     /* translator: check source for value for %d */
00460                     _("%s: disconnected; waiting %d seconds to try again\n"),
00461                     progname, RECONNECT_SLEEP_TIME);
00462             pg_usleep(RECONNECT_SLEEP_TIME * 1000000);
00463         }
00464     }
00465 }