Header And Logo

PostgreSQL
| The world's most advanced open source database.

receivelog.c

Go to the documentation of this file.
00001 /*-------------------------------------------------------------------------
00002  *
00003  * receivelog.c - receive transaction log files using the streaming
00004  *                replication protocol.
00005  *
00006  * Author: Magnus Hagander <[email protected]>
00007  *
00008  * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
00009  *
00010  * IDENTIFICATION
00011  *        src/bin/pg_basebackup/receivelog.c
00012  *-------------------------------------------------------------------------
00013  */
00014 #include "postgres_fe.h"
00015 
00016 #include <sys/stat.h>
00017 #include <sys/time.h>
00018 #include <sys/types.h>
00019 #include <unistd.h>
00020 /* for ntohl/htonl */
00021 #include <netinet/in.h>
00022 #include <arpa/inet.h>
00023 
00024 #include "libpq-fe.h"
00025 #include "access/xlog_internal.h"
00026 
00027 #include "receivelog.h"
00028 #include "streamutil.h"
00029 
00030 
00031 /* fd and filename for currently open WAL file */
00032 static int  walfile = -1;
00033 static char current_walfile_name[MAXPGPATH] = "";
00034 
00035 static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
00036                  uint32 timeline, char *basedir,
00037                  stream_stop_callback stream_stop, int standby_message_timeout,
00038                  char *partial_suffix, XLogRecPtr *stoppos);
00039 
00040 /*
00041  * Open a new WAL file in the specified directory.
00042  *
00043  * The file will be padded to 16Mb with zeroes. The base filename (without
00044  * partial_suffix) is stored in current_walfile_name.
00045  */
00046 static bool
00047 open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir,
00048              char *partial_suffix)
00049 {
00050     int         f;
00051     char        fn[MAXPGPATH];
00052     struct stat statbuf;
00053     char       *zerobuf;
00054     int         bytes;
00055     XLogSegNo   segno;
00056 
00057     XLByteToSeg(startpoint, segno);
00058     XLogFileName(current_walfile_name, timeline, segno);
00059 
00060     snprintf(fn, sizeof(fn), "%s/%s%s", basedir, current_walfile_name,
00061              partial_suffix ? partial_suffix : "");
00062     f = open(fn, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
00063     if (f == -1)
00064     {
00065         fprintf(stderr,
00066                 _("%s: could not open transaction log file \"%s\": %s\n"),
00067                 progname, fn, strerror(errno));
00068         return false;
00069     }
00070 
00071     /*
00072      * Verify that the file is either empty (just created), or a complete
00073      * XLogSegSize segment. Anything in between indicates a corrupt file.
00074      */
00075     if (fstat(f, &statbuf) != 0)
00076     {
00077         fprintf(stderr,
00078                 _("%s: could not stat transaction log file \"%s\": %s\n"),
00079                 progname, fn, strerror(errno));
00080         close(f);
00081         return false;
00082     }
00083     if (statbuf.st_size == XLogSegSize)
00084     {
00085         /* File is open and ready to use */
00086         walfile = f;
00087         return true;
00088     }
00089     if (statbuf.st_size != 0)
00090     {
00091         fprintf(stderr,
00092                 _("%s: transaction log file \"%s\" has %d bytes, should be 0 or %d\n"),
00093                 progname, fn, (int) statbuf.st_size, XLogSegSize);
00094         close(f);
00095         return false;
00096     }
00097 
00098     /* New, empty, file. So pad it to 16Mb with zeroes */
00099     zerobuf = pg_malloc0(XLOG_BLCKSZ);
00100     for (bytes = 0; bytes < XLogSegSize; bytes += XLOG_BLCKSZ)
00101     {
00102         if (write(f, zerobuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
00103         {
00104             fprintf(stderr,
00105                     _("%s: could not pad transaction log file \"%s\": %s\n"),
00106                     progname, fn, strerror(errno));
00107             free(zerobuf);
00108             close(f);
00109             unlink(fn);
00110             return false;
00111         }
00112     }
00113     free(zerobuf);
00114 
00115     if (lseek(f, SEEK_SET, 0) != 0)
00116     {
00117         fprintf(stderr,
00118                 _("%s: could not seek to beginning of transaction log file \"%s\": %s\n"),
00119                 progname, fn, strerror(errno));
00120         close(f);
00121         return false;
00122     }
00123     walfile = f;
00124     return true;
00125 }
00126 
00127 /*
00128  * Close the current WAL file (if open), and rename it to the correct
00129  * filename if it's complete. On failure, prints an error message to stderr
00130  * and returns false, otherwise returns true.
00131  */
00132 static bool
00133 close_walfile(char *basedir, char *partial_suffix)
00134 {
00135     off_t       currpos;
00136 
00137     if (walfile == -1)
00138         return true;
00139 
00140     currpos = lseek(walfile, 0, SEEK_CUR);
00141     if (currpos == -1)
00142     {
00143         fprintf(stderr,
00144              _("%s: could not determine seek position in file \"%s\": %s\n"),
00145                 progname, current_walfile_name, strerror(errno));
00146         return false;
00147     }
00148 
00149     if (fsync(walfile) != 0)
00150     {
00151         fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
00152                 progname, current_walfile_name, strerror(errno));
00153         return false;
00154     }
00155 
00156     if (close(walfile) != 0)
00157     {
00158         fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
00159                 progname, current_walfile_name, strerror(errno));
00160         walfile = -1;
00161         return false;
00162     }
00163     walfile = -1;
00164 
00165     /*
00166      * Rename the .partial file only if we've completed writing the whole
00167      * segment or segment_complete is true.
00168      */
00169     if (currpos == XLOG_SEG_SIZE && partial_suffix)
00170     {
00171         char        oldfn[MAXPGPATH];
00172         char        newfn[MAXPGPATH];
00173 
00174         snprintf(oldfn, sizeof(oldfn), "%s/%s%s", basedir, current_walfile_name, partial_suffix);
00175         snprintf(newfn, sizeof(newfn), "%s/%s", basedir, current_walfile_name);
00176         if (rename(oldfn, newfn) != 0)
00177         {
00178             fprintf(stderr, _("%s: could not rename file \"%s\": %s\n"),
00179                     progname, current_walfile_name, strerror(errno));
00180             return false;
00181         }
00182     }
00183     else if (partial_suffix)
00184         fprintf(stderr,
00185                 _("%s: not renaming \"%s%s\", segment is not complete\n"),
00186                 progname, current_walfile_name, partial_suffix);
00187 
00188     return true;
00189 }
00190 
00191 
00192 /*
00193  * Local version of GetCurrentTimestamp(), since we are not linked with
00194  * backend code. The protocol always uses integer timestamps, regardless of
00195  * server setting.
00196  */
00197 static int64
00198 localGetCurrentTimestamp(void)
00199 {
00200     int64 result;
00201     struct timeval tp;
00202 
00203     gettimeofday(&tp, NULL);
00204 
00205     result = (int64) tp.tv_sec -
00206         ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY);
00207 
00208     result = (result * USECS_PER_SEC) + tp.tv_usec;
00209 
00210     return result;
00211 }
00212 
00213 /*
00214  * Local version of TimestampDifference(), since we are not linked with
00215  * backend code.
00216  */
00217 static void
00218 localTimestampDifference(int64 start_time, int64 stop_time,
00219                          long *secs, int *microsecs)
00220 {
00221     int64 diff = stop_time - start_time;
00222 
00223     if (diff <= 0)
00224     {
00225         *secs = 0;
00226         *microsecs = 0;
00227     }
00228     else
00229     {
00230         *secs = (long) (diff / USECS_PER_SEC);
00231         *microsecs = (int) (diff % USECS_PER_SEC);
00232     }
00233 }
00234 
00235 /*
00236  * Local version of TimestampDifferenceExceeds(), since we are not
00237  * linked with backend code.
00238  */
00239 static bool
00240 localTimestampDifferenceExceeds(int64 start_time,
00241                                 int64 stop_time,
00242                                 int msec)
00243 {
00244     int64 diff = stop_time - start_time;
00245 
00246     return (diff >= msec * INT64CONST(1000));
00247 }
00248 
00249 /*
00250  * Check if a timeline history file exists.
00251  */
00252 static bool
00253 existsTimeLineHistoryFile(char *basedir, TimeLineID tli)
00254 {
00255     char        path[MAXPGPATH];
00256     char        histfname[MAXFNAMELEN];
00257     int         fd;
00258 
00259     /*
00260      * Timeline 1 never has a history file. We treat that as if it existed,
00261      * since we never need to stream it.
00262      */
00263     if (tli == 1)
00264         return true;
00265 
00266     TLHistoryFileName(histfname, tli);
00267 
00268     snprintf(path, sizeof(path), "%s/%s", basedir, histfname);
00269 
00270     fd = open(path, O_RDONLY | PG_BINARY, 0);
00271     if (fd < 0)
00272     {
00273         if (errno != ENOENT)
00274             fprintf(stderr, _("%s: could not open timeline history file \"%s\": %s\n"),
00275                     progname, path, strerror(errno));
00276         return false;
00277     }
00278     else
00279     {
00280         close(fd);
00281         return true;
00282     }
00283 }
00284 
00285 static bool
00286 writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename, char *content)
00287 {
00288     int         size = strlen(content);
00289     char        path[MAXPGPATH];
00290     char        tmppath[MAXPGPATH];
00291     char        histfname[MAXFNAMELEN];
00292     int         fd;
00293 
00294     /*
00295      * Check that the server's idea of how timeline history files should be
00296      * named matches ours.
00297      */
00298     TLHistoryFileName(histfname, tli);
00299     if (strcmp(histfname, filename) != 0)
00300     {
00301         fprintf(stderr, _("%s: server reported unexpected history file name for timeline %u: %s\n"),
00302                 progname, tli, filename);
00303         return false;
00304     }
00305 
00306     /*
00307      * Write into a temp file name.
00308      */
00309     snprintf(tmppath, MAXPGPATH,  "%s.tmp", path);
00310 
00311     unlink(tmppath);
00312 
00313     fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
00314     if (fd < 0)
00315     {
00316         fprintf(stderr, _("%s: could not create timeline history file \"%s\": %s\n"),
00317                 progname, tmppath, strerror(errno));
00318         return false;
00319     }
00320 
00321     errno = 0;
00322     if ((int) write(fd, content, size) != size)
00323     {
00324         int         save_errno = errno;
00325 
00326         /*
00327          * If we fail to make the file, delete it to release disk space
00328          */
00329         unlink(tmppath);
00330         errno = save_errno;
00331 
00332         fprintf(stderr, _("%s: could not write timeline history file \"%s\": %s\n"),
00333                 progname, tmppath, strerror(errno));
00334         return false;
00335     }
00336 
00337     if (fsync(fd) != 0)
00338     {
00339         fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
00340                 progname, tmppath, strerror(errno));
00341         return false;
00342     }
00343 
00344     if (close(fd) != 0)
00345     {
00346         fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
00347                 progname, tmppath, strerror(errno));
00348         return false;
00349     }
00350 
00351     /*
00352      * Now move the completed history file into place with its final name.
00353      */
00354 
00355     snprintf(path, sizeof(path), "%s/%s", basedir, histfname);
00356     if (rename(tmppath, path) < 0)
00357     {
00358         fprintf(stderr, _("%s: could not rename file \"%s\" to \"%s\": %s\n"),
00359                 progname, tmppath, path, strerror(errno));
00360         return false;
00361     }
00362 
00363     return true;
00364 }
00365 
00366 /*
00367  * Converts an int64 to network byte order.
00368  */
00369 static void
00370 sendint64(int64 i, char *buf)
00371 {
00372     uint32      n32;
00373 
00374     /* High order half first, since we're doing MSB-first */
00375     n32 = (uint32) (i >> 32);
00376     n32 = htonl(n32);
00377     memcpy(&buf[0], &n32, 4);
00378 
00379     /* Now the low order half */
00380     n32 = (uint32) i;
00381     n32 = htonl(n32);
00382     memcpy(&buf[4], &n32, 4);
00383 }
00384 
00385 /*
00386  * Converts an int64 from network byte order to native format.
00387  */
00388 static int64
00389 recvint64(char *buf)
00390 {
00391     int64       result;
00392     uint32      h32;
00393     uint32      l32;
00394 
00395     memcpy(&h32, buf, 4);
00396     memcpy(&l32, buf + 4, 4);
00397     h32 = ntohl(h32);
00398     l32 = ntohl(l32);
00399 
00400     result = h32;
00401     result <<= 32;
00402     result |= l32;
00403 
00404     return result;
00405 }
00406 
00407 /*
00408  * Send a Standby Status Update message to server.
00409  */
00410 static bool
00411 sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now, bool replyRequested)
00412 {
00413     char        replybuf[1 + 8 + 8 + 8 + 8 + 1];
00414     int         len = 0;
00415 
00416     replybuf[len] = 'r';
00417     len += 1;
00418     sendint64(blockpos, &replybuf[len]);            /* write */
00419     len += 8;
00420     sendint64(InvalidXLogRecPtr, &replybuf[len]);   /* flush */
00421     len += 8;
00422     sendint64(InvalidXLogRecPtr, &replybuf[len]);   /* apply */
00423     len += 8;
00424     sendint64(now, &replybuf[len]);                 /* sendTime */
00425     len += 8;
00426     replybuf[len] = replyRequested ? 1 : 0;         /* replyRequested */
00427     len += 1;
00428 
00429     if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
00430     {
00431         fprintf(stderr, _("%s: could not send feedback packet: %s"),
00432                 progname, PQerrorMessage(conn));
00433         return false;
00434     }
00435 
00436     return true;
00437 }
00438 
00439 /*
00440  * Check that the server version we're connected to is supported by
00441  * ReceiveXlogStream().
00442  *
00443  * If it's not, an error message is printed to stderr, and false is returned.
00444  */
00445 bool
00446 CheckServerVersionForStreaming(PGconn *conn)
00447 {
00448     int         minServerMajor,
00449                 maxServerMajor;
00450     int         serverMajor;
00451 
00452     /*
00453      * The message format used in streaming replication changed in 9.3, so we
00454      * cannot stream from older servers. And we don't support servers newer
00455      * than the client; it might work, but we don't know, so err on the safe
00456      * side.
00457      */
00458     minServerMajor = 903;
00459     maxServerMajor = PG_VERSION_NUM / 100;
00460     serverMajor = PQserverVersion(conn) / 100;
00461     if (serverMajor < minServerMajor || serverMajor > maxServerMajor)
00462     {
00463         const char *serverver = PQparameterStatus(conn, "server_version");
00464         fprintf(stderr, _("%s: incompatible server version %s; streaming is only supported with server version %s\n"),
00465                 progname,
00466                 serverver ? serverver : "'unknown'",
00467                 "9.3");
00468         return false;
00469     }
00470     return true;
00471 }
00472 
00473 /*
00474  * Receive a log stream starting at the specified position.
00475  *
00476  * If sysidentifier is specified, validate that both the system
00477  * identifier and the timeline matches the specified ones
00478  * (by sending an extra IDENTIFY_SYSTEM command)
00479  *
00480  * All received segments will be written to the directory
00481  * specified by basedir. This will also fetch any missing timeline history
00482  * files.
00483  *
00484  * The stream_stop callback will be called every time data
00485  * is received, and whenever a segment is completed. If it returns
00486  * true, the streaming will stop and the function
00487  * return. As long as it returns false, streaming will continue
00488  * indefinitely.
00489  *
00490  * standby_message_timeout controls how often we send a message
00491  * back to the master letting it know our progress, in seconds.
00492  * This message will only contain the write location, and never
00493  * flush or replay.
00494  *
00495  * If 'partial_suffix' is not NULL, files are initially created with the
00496  * given suffix, and the suffix is removed once the file is finished. That
00497  * allows you to tell the difference between partial and completed files,
00498  * so that you can continue later where you left.
00499  *
00500  * Note: The log position *must* be at a log segment start!
00501  */
00502 bool
00503 ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
00504                   char *sysidentifier, char *basedir,
00505                   stream_stop_callback stream_stop,
00506                   int standby_message_timeout, char *partial_suffix)
00507 {
00508     char        query[128];
00509     PGresult   *res;
00510     XLogRecPtr  stoppos;
00511 
00512     /*
00513      * The caller should've checked the server version already, but doesn't do
00514      * any harm to check it here too.
00515      */
00516     if (!CheckServerVersionForStreaming(conn))
00517         return false;
00518 
00519     if (sysidentifier != NULL)
00520     {
00521         /* Validate system identifier hasn't changed */
00522         res = PQexec(conn, "IDENTIFY_SYSTEM");
00523         if (PQresultStatus(res) != PGRES_TUPLES_OK)
00524         {
00525             fprintf(stderr,
00526                     _("%s: could not send replication command \"%s\": %s"),
00527                     progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
00528             PQclear(res);
00529             return false;
00530         }
00531         if (PQnfields(res) != 3 || PQntuples(res) != 1)
00532         {
00533             fprintf(stderr,
00534                     _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d fields\n"),
00535                     progname, PQntuples(res), PQnfields(res), 1, 3);
00536             PQclear(res);
00537             return false;
00538         }
00539         if (strcmp(sysidentifier, PQgetvalue(res, 0, 0)) != 0)
00540         {
00541             fprintf(stderr,
00542                     _("%s: system identifier does not match between base backup and streaming connection\n"),
00543                     progname);
00544             PQclear(res);
00545             return false;
00546         }
00547         if (timeline > atoi(PQgetvalue(res, 0, 1)))
00548         {
00549             fprintf(stderr,
00550                     _("%s: starting timeline %u is not present in the server\n"),
00551                     progname, timeline);
00552             PQclear(res);
00553             return false;
00554         }
00555         PQclear(res);
00556     }
00557 
00558     while (1)
00559     {
00560         /*
00561          * Fetch the timeline history file for this timeline, if we don't
00562          * have it already.
00563          */
00564         if (!existsTimeLineHistoryFile(basedir, timeline))
00565         {
00566             snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", timeline);
00567             res = PQexec(conn, query);
00568             if (PQresultStatus(res) != PGRES_TUPLES_OK)
00569             {
00570                 /* FIXME: we might send it ok, but get an error */
00571                 fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
00572                         progname, "TIMELINE_HISTORY", PQresultErrorMessage(res));
00573                 PQclear(res);
00574                 return false;
00575             }
00576 
00577             /*
00578              * The response to TIMELINE_HISTORY is a single row result set
00579              * with two fields: filename and content
00580              */
00581             if (PQnfields(res) != 2 || PQntuples(res) != 1)
00582             {
00583                 fprintf(stderr,
00584                         _("%s: unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields\n"),
00585                     progname, PQntuples(res), PQnfields(res), 1, 2);
00586             }
00587 
00588             /* Write the history file to disk */
00589             writeTimeLineHistoryFile(basedir, timeline,
00590                                      PQgetvalue(res, 0, 0),
00591                                      PQgetvalue(res, 0, 1));
00592 
00593             PQclear(res);
00594         }
00595 
00596         /*
00597          * Before we start streaming from the requested location, check
00598          * if the callback tells us to stop here.
00599          */
00600         if (stream_stop(startpos, timeline, false))
00601             return true;
00602 
00603         /* Initiate the replication stream at specified location */
00604         snprintf(query, sizeof(query), "START_REPLICATION %X/%X TIMELINE %u",
00605                  (uint32) (startpos >> 32), (uint32) startpos,
00606                  timeline);
00607         res = PQexec(conn, query);
00608         if (PQresultStatus(res) != PGRES_COPY_BOTH)
00609         {
00610             fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
00611                     progname, "START_REPLICATION", PQresultErrorMessage(res));
00612             PQclear(res);
00613             return false;
00614         }
00615         PQclear(res);
00616 
00617         /* Stream the WAL */
00618         res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
00619                                standby_message_timeout, partial_suffix,
00620                                &stoppos);
00621         if (res == NULL)
00622             goto error;
00623 
00624         /*
00625          * Streaming finished.
00626          *
00627          * There are two possible reasons for that: a controlled shutdown,
00628          * or we reached the end of the current timeline. In case of
00629          * end-of-timeline, the server sends a result set after Copy has
00630          * finished, containing the next timeline's ID. Read that, and
00631          * restart streaming from the next timeline.
00632          */
00633 
00634         if (PQresultStatus(res) == PGRES_TUPLES_OK)
00635         {
00636             /*
00637              * End-of-timeline. Read the next timeline's ID.
00638              */
00639             uint32      newtimeline;
00640 
00641             newtimeline = atoi(PQgetvalue(res, 0, 0));
00642             PQclear(res);
00643 
00644             if (newtimeline <= timeline)
00645             {
00646                 /* shouldn't happen */
00647                 fprintf(stderr,
00648                         "server reported unexpected next timeline %u, following timeline %u\n",
00649                         newtimeline, timeline);
00650                 goto error;
00651             }
00652 
00653             /* Read the final result, which should be CommandComplete. */
00654             res = PQgetResult(conn);
00655             if (PQresultStatus(res) != PGRES_COMMAND_OK)
00656             {
00657                 fprintf(stderr,
00658                         _("%s: unexpected termination of replication stream: %s"),
00659                         progname, PQresultErrorMessage(res));
00660                 goto error;
00661             }
00662             PQclear(res);
00663 
00664             /*
00665              * Loop back to start streaming from the new timeline.
00666              * Always start streaming at the beginning of a segment.
00667              */
00668             timeline = newtimeline;
00669             startpos = stoppos - (stoppos % XLOG_SEG_SIZE);
00670             continue;
00671         }
00672         else if (PQresultStatus(res) == PGRES_COMMAND_OK)
00673         {
00674             /*
00675              * End of replication (ie. controlled shut down of the server).
00676              *
00677              * Check if the callback thinks it's OK to stop here. If not,
00678              * complain.
00679              */
00680             if (stream_stop(stoppos, timeline, false))
00681                 return true;
00682             else
00683             {
00684                 fprintf(stderr, _("%s: replication stream was terminated before stop point\n"),
00685                         progname);
00686                 goto error;
00687             }
00688         }
00689         else
00690         {
00691             /* Server returned an error. */
00692             fprintf(stderr,
00693                     _("%s: unexpected termination of replication stream: %s"),
00694                     progname, PQresultErrorMessage(res));
00695             goto error;
00696         }
00697     }
00698 
00699 error:
00700     if (walfile != -1 && close(walfile) != 0)
00701         fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
00702                 progname, current_walfile_name, strerror(errno));
00703     walfile = -1;
00704     return false;
00705 }
00706 
00707 /*
00708  * The main loop of ReceiveXlogStream. Handles the COPY stream after
00709  * initiating streaming with the START_REPLICATION command.
00710  *
00711  * If the COPY ends (not necessarily successfully) due to a message from the
00712  * server, returns a PGresult and sets *stoppos to the last byte written.
00713  * On any other sort of error, returns NULL.
00714  */
00715 static PGresult *
00716 HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
00717                  char *basedir, stream_stop_callback stream_stop,
00718                  int standby_message_timeout, char *partial_suffix,
00719                  XLogRecPtr *stoppos)
00720 {
00721     char       *copybuf = NULL;
00722     int64       last_status = -1;
00723     XLogRecPtr  blockpos = startpos;
00724     bool        still_sending = true;
00725 
00726     while (1)
00727     {
00728         int         r;
00729         int         xlogoff;
00730         int         bytes_left;
00731         int         bytes_written;
00732         int64       now;
00733         int         hdr_len;
00734 
00735         if (copybuf != NULL)
00736         {
00737             PQfreemem(copybuf);
00738             copybuf = NULL;
00739         }
00740 
00741         /*
00742          * Check if we should continue streaming, or abort at this point.
00743          */
00744         if (still_sending && stream_stop(blockpos, timeline, false))
00745         {
00746             if (!close_walfile(basedir, partial_suffix))
00747             {
00748                 /* Potential error message is written by close_walfile */
00749                 goto error;
00750             }
00751             if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
00752             {
00753                 fprintf(stderr, _("%s: could not send copy-end packet: %s"),
00754                         progname, PQerrorMessage(conn));
00755                 goto error;
00756             }
00757             still_sending = false;
00758         }
00759 
00760         /*
00761          * Potentially send a status message to the master
00762          */
00763         now = localGetCurrentTimestamp();
00764         if (still_sending && standby_message_timeout > 0 &&
00765             localTimestampDifferenceExceeds(last_status, now,
00766                                             standby_message_timeout))
00767         {
00768             /* Time to send feedback! */
00769             if (!sendFeedback(conn, blockpos, now, false))
00770                 goto error;
00771             last_status = now;
00772         }
00773 
00774         r = PQgetCopyData(conn, &copybuf, 1);
00775         if (r == 0)
00776         {
00777             /*
00778              * No data available. Wait for some to appear, but not longer
00779              * than the specified timeout, so that we can ping the server.
00780              */
00781             fd_set      input_mask;
00782             struct timeval timeout;
00783             struct timeval *timeoutptr;
00784 
00785             FD_ZERO(&input_mask);
00786             FD_SET(PQsocket(conn), &input_mask);
00787             if (standby_message_timeout && still_sending)
00788             {
00789                 int64       targettime;
00790                 long        secs;
00791                 int         usecs;
00792 
00793                 targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000);
00794                 localTimestampDifference(now,
00795                                          targettime,
00796                                          &secs,
00797                                          &usecs);
00798                 if (secs <= 0)
00799                     timeout.tv_sec = 1; /* Always sleep at least 1 sec */
00800                 else
00801                     timeout.tv_sec = secs;
00802                 timeout.tv_usec = usecs;
00803                 timeoutptr = &timeout;
00804             }
00805             else
00806                 timeoutptr = NULL;
00807 
00808             r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
00809             if (r == 0 || (r < 0 && errno == EINTR))
00810             {
00811                 /*
00812                  * Got a timeout or signal. Continue the loop and either
00813                  * deliver a status packet to the server or just go back
00814                  * into blocking.
00815                  */
00816                 continue;
00817             }
00818             else if (r < 0)
00819             {
00820                 fprintf(stderr, _("%s: select() failed: %s\n"),
00821                         progname, strerror(errno));
00822                 goto error;
00823             }
00824             /* Else there is actually data on the socket */
00825             if (PQconsumeInput(conn) == 0)
00826             {
00827                 fprintf(stderr,
00828                         _("%s: could not receive data from WAL stream: %s"),
00829                         progname, PQerrorMessage(conn));
00830                 goto error;
00831             }
00832             continue;
00833         }
00834         if (r == -1)
00835         {
00836             PGresult   *res = PQgetResult(conn);
00837 
00838             /*
00839              * The server closed its end of the copy stream.  If we haven't
00840              * closed ours already, we need to do so now, unless the server
00841              * threw an error, in which case we don't.
00842              */
00843             if (still_sending)
00844             {
00845                 if (!close_walfile(basedir, partial_suffix))
00846                 {
00847                     /* Error message written in close_walfile() */
00848                     goto error;
00849                 }
00850                 if (PQresultStatus(res) == PGRES_COPY_IN)
00851                 {
00852                     if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
00853                     {
00854                         fprintf(stderr,
00855                                 _("%s: could not send copy-end packet: %s"),
00856                                 progname, PQerrorMessage(conn));
00857                         goto error;
00858                     }
00859                     res = PQgetResult(conn);
00860                 }
00861                 still_sending = false;
00862             }
00863             if (copybuf != NULL)
00864                 PQfreemem(copybuf);
00865             *stoppos = blockpos;
00866             return res;
00867         }
00868         if (r == -2)
00869         {
00870             fprintf(stderr, _("%s: could not read COPY data: %s"),
00871                     progname, PQerrorMessage(conn));
00872             goto error;
00873         }
00874 
00875         /* Check the message type. */
00876         if (copybuf[0] == 'k')
00877         {
00878             int     pos;
00879             bool    replyRequested;
00880 
00881             /*
00882              * Parse the keepalive message, enclosed in the CopyData message.
00883              * We just check if the server requested a reply, and ignore the
00884              * rest.
00885              */
00886             pos = 1;    /* skip msgtype 'k' */
00887             pos += 8;   /* skip walEnd */
00888             pos += 8;   /* skip sendTime */
00889 
00890             if (r < pos + 1)
00891             {
00892                 fprintf(stderr, _("%s: streaming header too small: %d\n"),
00893                         progname, r);
00894                 goto error;
00895             }
00896             replyRequested = copybuf[pos];
00897 
00898             /* If the server requested an immediate reply, send one. */
00899             if (replyRequested && still_sending)
00900             {
00901                 now = localGetCurrentTimestamp();
00902                 if (!sendFeedback(conn, blockpos, now, false))
00903                     goto error;
00904                 last_status = now;
00905             }
00906         }
00907         else if (copybuf[0] == 'w')
00908         {
00909             /*
00910              * Once we've decided we don't want to receive any more, just
00911              * ignore any subsequent XLogData messages.
00912              */
00913             if (!still_sending)
00914                 continue;
00915 
00916             /*
00917              * Read the header of the XLogData message, enclosed in the
00918              * CopyData message. We only need the WAL location field
00919              * (dataStart), the rest of the header is ignored.
00920              */
00921             hdr_len = 1;    /* msgtype 'w' */
00922             hdr_len += 8;   /* dataStart */
00923             hdr_len += 8;   /* walEnd */
00924             hdr_len += 8;   /* sendTime */
00925             if (r < hdr_len + 1)
00926             {
00927                 fprintf(stderr, _("%s: streaming header too small: %d\n"),
00928                         progname, r);
00929                 goto error;
00930             }
00931             blockpos = recvint64(&copybuf[1]);
00932 
00933             /* Extract WAL location for this block */
00934             xlogoff = blockpos % XLOG_SEG_SIZE;
00935 
00936             /*
00937              * Verify that the initial location in the stream matches where
00938              * we think we are.
00939              */
00940             if (walfile == -1)
00941             {
00942                 /* No file open yet */
00943                 if (xlogoff != 0)
00944                 {
00945                     fprintf(stderr,
00946                             _("%s: received transaction log record for offset %u with no file open\n"),
00947                             progname, xlogoff);
00948                     goto error;
00949                 }
00950             }
00951             else
00952             {
00953                 /* More data in existing segment */
00954                 /* XXX: store seek value don't reseek all the time */
00955                 if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
00956                 {
00957                     fprintf(stderr,
00958                             _("%s: got WAL data offset %08x, expected %08x\n"),
00959                             progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
00960                     goto error;
00961                 }
00962             }
00963 
00964             bytes_left = r - hdr_len;
00965             bytes_written = 0;
00966 
00967             while (bytes_left)
00968             {
00969                 int         bytes_to_write;
00970 
00971                 /*
00972                  * If crossing a WAL boundary, only write up until we reach
00973                  * XLOG_SEG_SIZE.
00974                  */
00975                 if (xlogoff + bytes_left > XLOG_SEG_SIZE)
00976                     bytes_to_write = XLOG_SEG_SIZE - xlogoff;
00977                 else
00978                     bytes_to_write = bytes_left;
00979 
00980                 if (walfile == -1)
00981                 {
00982                     if (!open_walfile(blockpos, timeline,
00983                                       basedir, partial_suffix))
00984                     {
00985                         /* Error logged by open_walfile */
00986                         goto error;
00987                     }
00988                 }
00989 
00990                 if (write(walfile,
00991                           copybuf + hdr_len + bytes_written,
00992                           bytes_to_write) != bytes_to_write)
00993                 {
00994                     fprintf(stderr,
00995                             _("%s: could not write %u bytes to WAL file \"%s\": %s\n"),
00996                             progname, bytes_to_write, current_walfile_name,
00997                             strerror(errno));
00998                     goto error;
00999                 }
01000 
01001                 /* Write was successful, advance our position */
01002                 bytes_written += bytes_to_write;
01003                 bytes_left -= bytes_to_write;
01004                 blockpos += bytes_to_write;
01005                 xlogoff += bytes_to_write;
01006 
01007                 /* Did we reach the end of a WAL segment? */
01008                 if (blockpos % XLOG_SEG_SIZE == 0)
01009                 {
01010                     if (!close_walfile(basedir, partial_suffix))
01011                         /* Error message written in close_walfile() */
01012                         goto error;
01013 
01014                     xlogoff = 0;
01015 
01016                     if (still_sending && stream_stop(blockpos, timeline, false))
01017                     {
01018                         if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
01019                         {
01020                             fprintf(stderr, _("%s: could not send copy-end packet: %s"),
01021                                     progname, PQerrorMessage(conn));
01022                             goto error;
01023                         }
01024                         still_sending = false;
01025                         break; /* ignore the rest of this XLogData packet */
01026                     }
01027                 }
01028             }
01029             /* No more data left to write, receive next copy packet */
01030         }
01031         else
01032         {
01033             fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
01034                     progname, copybuf[0]);
01035             goto error;
01036         }
01037     }
01038 
01039 error:
01040     if (copybuf != NULL)
01041         PQfreemem(copybuf);
01042     return NULL;
01043 }