00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014 #include "postgres_fe.h"
00015
00016 #include <sys/stat.h>
00017 #include <sys/time.h>
00018 #include <sys/types.h>
00019 #include <unistd.h>
00020
00021 #include <netinet/in.h>
00022 #include <arpa/inet.h>
00023
00024 #include "libpq-fe.h"
00025 #include "access/xlog_internal.h"
00026
00027 #include "receivelog.h"
00028 #include "streamutil.h"
00029
00030
00031
00032 static int walfile = -1;
00033 static char current_walfile_name[MAXPGPATH] = "";
00034
00035 static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
00036 uint32 timeline, char *basedir,
00037 stream_stop_callback stream_stop, int standby_message_timeout,
00038 char *partial_suffix, XLogRecPtr *stoppos);
00039
00040
00041
00042
00043
00044
00045
00046 static bool
00047 open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir,
00048 char *partial_suffix)
00049 {
00050 int f;
00051 char fn[MAXPGPATH];
00052 struct stat statbuf;
00053 char *zerobuf;
00054 int bytes;
00055 XLogSegNo segno;
00056
00057 XLByteToSeg(startpoint, segno);
00058 XLogFileName(current_walfile_name, timeline, segno);
00059
00060 snprintf(fn, sizeof(fn), "%s/%s%s", basedir, current_walfile_name,
00061 partial_suffix ? partial_suffix : "");
00062 f = open(fn, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
00063 if (f == -1)
00064 {
00065 fprintf(stderr,
00066 _("%s: could not open transaction log file \"%s\": %s\n"),
00067 progname, fn, strerror(errno));
00068 return false;
00069 }
00070
00071
00072
00073
00074
00075 if (fstat(f, &statbuf) != 0)
00076 {
00077 fprintf(stderr,
00078 _("%s: could not stat transaction log file \"%s\": %s\n"),
00079 progname, fn, strerror(errno));
00080 close(f);
00081 return false;
00082 }
00083 if (statbuf.st_size == XLogSegSize)
00084 {
00085
00086 walfile = f;
00087 return true;
00088 }
00089 if (statbuf.st_size != 0)
00090 {
00091 fprintf(stderr,
00092 _("%s: transaction log file \"%s\" has %d bytes, should be 0 or %d\n"),
00093 progname, fn, (int) statbuf.st_size, XLogSegSize);
00094 close(f);
00095 return false;
00096 }
00097
00098
00099 zerobuf = pg_malloc0(XLOG_BLCKSZ);
00100 for (bytes = 0; bytes < XLogSegSize; bytes += XLOG_BLCKSZ)
00101 {
00102 if (write(f, zerobuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
00103 {
00104 fprintf(stderr,
00105 _("%s: could not pad transaction log file \"%s\": %s\n"),
00106 progname, fn, strerror(errno));
00107 free(zerobuf);
00108 close(f);
00109 unlink(fn);
00110 return false;
00111 }
00112 }
00113 free(zerobuf);
00114
00115 if (lseek(f, SEEK_SET, 0) != 0)
00116 {
00117 fprintf(stderr,
00118 _("%s: could not seek to beginning of transaction log file \"%s\": %s\n"),
00119 progname, fn, strerror(errno));
00120 close(f);
00121 return false;
00122 }
00123 walfile = f;
00124 return true;
00125 }
00126
00127
00128
00129
00130
00131
00132 static bool
00133 close_walfile(char *basedir, char *partial_suffix)
00134 {
00135 off_t currpos;
00136
00137 if (walfile == -1)
00138 return true;
00139
00140 currpos = lseek(walfile, 0, SEEK_CUR);
00141 if (currpos == -1)
00142 {
00143 fprintf(stderr,
00144 _("%s: could not determine seek position in file \"%s\": %s\n"),
00145 progname, current_walfile_name, strerror(errno));
00146 return false;
00147 }
00148
00149 if (fsync(walfile) != 0)
00150 {
00151 fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
00152 progname, current_walfile_name, strerror(errno));
00153 return false;
00154 }
00155
00156 if (close(walfile) != 0)
00157 {
00158 fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
00159 progname, current_walfile_name, strerror(errno));
00160 walfile = -1;
00161 return false;
00162 }
00163 walfile = -1;
00164
00165
00166
00167
00168
00169 if (currpos == XLOG_SEG_SIZE && partial_suffix)
00170 {
00171 char oldfn[MAXPGPATH];
00172 char newfn[MAXPGPATH];
00173
00174 snprintf(oldfn, sizeof(oldfn), "%s/%s%s", basedir, current_walfile_name, partial_suffix);
00175 snprintf(newfn, sizeof(newfn), "%s/%s", basedir, current_walfile_name);
00176 if (rename(oldfn, newfn) != 0)
00177 {
00178 fprintf(stderr, _("%s: could not rename file \"%s\": %s\n"),
00179 progname, current_walfile_name, strerror(errno));
00180 return false;
00181 }
00182 }
00183 else if (partial_suffix)
00184 fprintf(stderr,
00185 _("%s: not renaming \"%s%s\", segment is not complete\n"),
00186 progname, current_walfile_name, partial_suffix);
00187
00188 return true;
00189 }
00190
00191
00192
00193
00194
00195
00196
00197 static int64
00198 localGetCurrentTimestamp(void)
00199 {
00200 int64 result;
00201 struct timeval tp;
00202
00203 gettimeofday(&tp, NULL);
00204
00205 result = (int64) tp.tv_sec -
00206 ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY);
00207
00208 result = (result * USECS_PER_SEC) + tp.tv_usec;
00209
00210 return result;
00211 }
00212
00213
00214
00215
00216
00217 static void
00218 localTimestampDifference(int64 start_time, int64 stop_time,
00219 long *secs, int *microsecs)
00220 {
00221 int64 diff = stop_time - start_time;
00222
00223 if (diff <= 0)
00224 {
00225 *secs = 0;
00226 *microsecs = 0;
00227 }
00228 else
00229 {
00230 *secs = (long) (diff / USECS_PER_SEC);
00231 *microsecs = (int) (diff % USECS_PER_SEC);
00232 }
00233 }
00234
00235
00236
00237
00238
00239 static bool
00240 localTimestampDifferenceExceeds(int64 start_time,
00241 int64 stop_time,
00242 int msec)
00243 {
00244 int64 diff = stop_time - start_time;
00245
00246 return (diff >= msec * INT64CONST(1000));
00247 }
00248
00249
00250
00251
00252 static bool
00253 existsTimeLineHistoryFile(char *basedir, TimeLineID tli)
00254 {
00255 char path[MAXPGPATH];
00256 char histfname[MAXFNAMELEN];
00257 int fd;
00258
00259
00260
00261
00262
00263 if (tli == 1)
00264 return true;
00265
00266 TLHistoryFileName(histfname, tli);
00267
00268 snprintf(path, sizeof(path), "%s/%s", basedir, histfname);
00269
00270 fd = open(path, O_RDONLY | PG_BINARY, 0);
00271 if (fd < 0)
00272 {
00273 if (errno != ENOENT)
00274 fprintf(stderr, _("%s: could not open timeline history file \"%s\": %s\n"),
00275 progname, path, strerror(errno));
00276 return false;
00277 }
00278 else
00279 {
00280 close(fd);
00281 return true;
00282 }
00283 }
00284
00285 static bool
00286 writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename, char *content)
00287 {
00288 int size = strlen(content);
00289 char path[MAXPGPATH];
00290 char tmppath[MAXPGPATH];
00291 char histfname[MAXFNAMELEN];
00292 int fd;
00293
00294
00295
00296
00297
00298 TLHistoryFileName(histfname, tli);
00299 if (strcmp(histfname, filename) != 0)
00300 {
00301 fprintf(stderr, _("%s: server reported unexpected history file name for timeline %u: %s\n"),
00302 progname, tli, filename);
00303 return false;
00304 }
00305
00306
00307
00308
00309 snprintf(tmppath, MAXPGPATH, "%s.tmp", path);
00310
00311 unlink(tmppath);
00312
00313 fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
00314 if (fd < 0)
00315 {
00316 fprintf(stderr, _("%s: could not create timeline history file \"%s\": %s\n"),
00317 progname, tmppath, strerror(errno));
00318 return false;
00319 }
00320
00321 errno = 0;
00322 if ((int) write(fd, content, size) != size)
00323 {
00324 int save_errno = errno;
00325
00326
00327
00328
00329 unlink(tmppath);
00330 errno = save_errno;
00331
00332 fprintf(stderr, _("%s: could not write timeline history file \"%s\": %s\n"),
00333 progname, tmppath, strerror(errno));
00334 return false;
00335 }
00336
00337 if (fsync(fd) != 0)
00338 {
00339 fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
00340 progname, tmppath, strerror(errno));
00341 return false;
00342 }
00343
00344 if (close(fd) != 0)
00345 {
00346 fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
00347 progname, tmppath, strerror(errno));
00348 return false;
00349 }
00350
00351
00352
00353
00354
00355 snprintf(path, sizeof(path), "%s/%s", basedir, histfname);
00356 if (rename(tmppath, path) < 0)
00357 {
00358 fprintf(stderr, _("%s: could not rename file \"%s\" to \"%s\": %s\n"),
00359 progname, tmppath, path, strerror(errno));
00360 return false;
00361 }
00362
00363 return true;
00364 }
00365
00366
00367
00368
00369 static void
00370 sendint64(int64 i, char *buf)
00371 {
00372 uint32 n32;
00373
00374
00375 n32 = (uint32) (i >> 32);
00376 n32 = htonl(n32);
00377 memcpy(&buf[0], &n32, 4);
00378
00379
00380 n32 = (uint32) i;
00381 n32 = htonl(n32);
00382 memcpy(&buf[4], &n32, 4);
00383 }
00384
00385
00386
00387
00388 static int64
00389 recvint64(char *buf)
00390 {
00391 int64 result;
00392 uint32 h32;
00393 uint32 l32;
00394
00395 memcpy(&h32, buf, 4);
00396 memcpy(&l32, buf + 4, 4);
00397 h32 = ntohl(h32);
00398 l32 = ntohl(l32);
00399
00400 result = h32;
00401 result <<= 32;
00402 result |= l32;
00403
00404 return result;
00405 }
00406
00407
00408
00409
00410 static bool
00411 sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now, bool replyRequested)
00412 {
00413 char replybuf[1 + 8 + 8 + 8 + 8 + 1];
00414 int len = 0;
00415
00416 replybuf[len] = 'r';
00417 len += 1;
00418 sendint64(blockpos, &replybuf[len]);
00419 len += 8;
00420 sendint64(InvalidXLogRecPtr, &replybuf[len]);
00421 len += 8;
00422 sendint64(InvalidXLogRecPtr, &replybuf[len]);
00423 len += 8;
00424 sendint64(now, &replybuf[len]);
00425 len += 8;
00426 replybuf[len] = replyRequested ? 1 : 0;
00427 len += 1;
00428
00429 if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
00430 {
00431 fprintf(stderr, _("%s: could not send feedback packet: %s"),
00432 progname, PQerrorMessage(conn));
00433 return false;
00434 }
00435
00436 return true;
00437 }
00438
00439
00440
00441
00442
00443
00444
00445 bool
00446 CheckServerVersionForStreaming(PGconn *conn)
00447 {
00448 int minServerMajor,
00449 maxServerMajor;
00450 int serverMajor;
00451
00452
00453
00454
00455
00456
00457
00458 minServerMajor = 903;
00459 maxServerMajor = PG_VERSION_NUM / 100;
00460 serverMajor = PQserverVersion(conn) / 100;
00461 if (serverMajor < minServerMajor || serverMajor > maxServerMajor)
00462 {
00463 const char *serverver = PQparameterStatus(conn, "server_version");
00464 fprintf(stderr, _("%s: incompatible server version %s; streaming is only supported with server version %s\n"),
00465 progname,
00466 serverver ? serverver : "'unknown'",
00467 "9.3");
00468 return false;
00469 }
00470 return true;
00471 }
00472
00473
00474
00475
00476
00477
00478
00479
00480
00481
00482
00483
00484
00485
00486
00487
00488
00489
00490
00491
00492
00493
00494
00495
00496
00497
00498
00499
00500
00501
00502 bool
00503 ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
00504 char *sysidentifier, char *basedir,
00505 stream_stop_callback stream_stop,
00506 int standby_message_timeout, char *partial_suffix)
00507 {
00508 char query[128];
00509 PGresult *res;
00510 XLogRecPtr stoppos;
00511
00512
00513
00514
00515
00516 if (!CheckServerVersionForStreaming(conn))
00517 return false;
00518
00519 if (sysidentifier != NULL)
00520 {
00521
00522 res = PQexec(conn, "IDENTIFY_SYSTEM");
00523 if (PQresultStatus(res) != PGRES_TUPLES_OK)
00524 {
00525 fprintf(stderr,
00526 _("%s: could not send replication command \"%s\": %s"),
00527 progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
00528 PQclear(res);
00529 return false;
00530 }
00531 if (PQnfields(res) != 3 || PQntuples(res) != 1)
00532 {
00533 fprintf(stderr,
00534 _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d fields\n"),
00535 progname, PQntuples(res), PQnfields(res), 1, 3);
00536 PQclear(res);
00537 return false;
00538 }
00539 if (strcmp(sysidentifier, PQgetvalue(res, 0, 0)) != 0)
00540 {
00541 fprintf(stderr,
00542 _("%s: system identifier does not match between base backup and streaming connection\n"),
00543 progname);
00544 PQclear(res);
00545 return false;
00546 }
00547 if (timeline > atoi(PQgetvalue(res, 0, 1)))
00548 {
00549 fprintf(stderr,
00550 _("%s: starting timeline %u is not present in the server\n"),
00551 progname, timeline);
00552 PQclear(res);
00553 return false;
00554 }
00555 PQclear(res);
00556 }
00557
00558 while (1)
00559 {
00560
00561
00562
00563
00564 if (!existsTimeLineHistoryFile(basedir, timeline))
00565 {
00566 snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", timeline);
00567 res = PQexec(conn, query);
00568 if (PQresultStatus(res) != PGRES_TUPLES_OK)
00569 {
00570
00571 fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
00572 progname, "TIMELINE_HISTORY", PQresultErrorMessage(res));
00573 PQclear(res);
00574 return false;
00575 }
00576
00577
00578
00579
00580
00581 if (PQnfields(res) != 2 || PQntuples(res) != 1)
00582 {
00583 fprintf(stderr,
00584 _("%s: unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields\n"),
00585 progname, PQntuples(res), PQnfields(res), 1, 2);
00586 }
00587
00588
00589 writeTimeLineHistoryFile(basedir, timeline,
00590 PQgetvalue(res, 0, 0),
00591 PQgetvalue(res, 0, 1));
00592
00593 PQclear(res);
00594 }
00595
00596
00597
00598
00599
00600 if (stream_stop(startpos, timeline, false))
00601 return true;
00602
00603
00604 snprintf(query, sizeof(query), "START_REPLICATION %X/%X TIMELINE %u",
00605 (uint32) (startpos >> 32), (uint32) startpos,
00606 timeline);
00607 res = PQexec(conn, query);
00608 if (PQresultStatus(res) != PGRES_COPY_BOTH)
00609 {
00610 fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
00611 progname, "START_REPLICATION", PQresultErrorMessage(res));
00612 PQclear(res);
00613 return false;
00614 }
00615 PQclear(res);
00616
00617
00618 res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
00619 standby_message_timeout, partial_suffix,
00620 &stoppos);
00621 if (res == NULL)
00622 goto error;
00623
00624
00625
00626
00627
00628
00629
00630
00631
00632
00633
00634 if (PQresultStatus(res) == PGRES_TUPLES_OK)
00635 {
00636
00637
00638
00639 uint32 newtimeline;
00640
00641 newtimeline = atoi(PQgetvalue(res, 0, 0));
00642 PQclear(res);
00643
00644 if (newtimeline <= timeline)
00645 {
00646
00647 fprintf(stderr,
00648 "server reported unexpected next timeline %u, following timeline %u\n",
00649 newtimeline, timeline);
00650 goto error;
00651 }
00652
00653
00654 res = PQgetResult(conn);
00655 if (PQresultStatus(res) != PGRES_COMMAND_OK)
00656 {
00657 fprintf(stderr,
00658 _("%s: unexpected termination of replication stream: %s"),
00659 progname, PQresultErrorMessage(res));
00660 goto error;
00661 }
00662 PQclear(res);
00663
00664
00665
00666
00667
00668 timeline = newtimeline;
00669 startpos = stoppos - (stoppos % XLOG_SEG_SIZE);
00670 continue;
00671 }
00672 else if (PQresultStatus(res) == PGRES_COMMAND_OK)
00673 {
00674
00675
00676
00677
00678
00679
00680 if (stream_stop(stoppos, timeline, false))
00681 return true;
00682 else
00683 {
00684 fprintf(stderr, _("%s: replication stream was terminated before stop point\n"),
00685 progname);
00686 goto error;
00687 }
00688 }
00689 else
00690 {
00691
00692 fprintf(stderr,
00693 _("%s: unexpected termination of replication stream: %s"),
00694 progname, PQresultErrorMessage(res));
00695 goto error;
00696 }
00697 }
00698
00699 error:
00700 if (walfile != -1 && close(walfile) != 0)
00701 fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
00702 progname, current_walfile_name, strerror(errno));
00703 walfile = -1;
00704 return false;
00705 }
00706
00707
00708
00709
00710
00711
00712
00713
00714
00715 static PGresult *
00716 HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
00717 char *basedir, stream_stop_callback stream_stop,
00718 int standby_message_timeout, char *partial_suffix,
00719 XLogRecPtr *stoppos)
00720 {
00721 char *copybuf = NULL;
00722 int64 last_status = -1;
00723 XLogRecPtr blockpos = startpos;
00724 bool still_sending = true;
00725
00726 while (1)
00727 {
00728 int r;
00729 int xlogoff;
00730 int bytes_left;
00731 int bytes_written;
00732 int64 now;
00733 int hdr_len;
00734
00735 if (copybuf != NULL)
00736 {
00737 PQfreemem(copybuf);
00738 copybuf = NULL;
00739 }
00740
00741
00742
00743
00744 if (still_sending && stream_stop(blockpos, timeline, false))
00745 {
00746 if (!close_walfile(basedir, partial_suffix))
00747 {
00748
00749 goto error;
00750 }
00751 if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
00752 {
00753 fprintf(stderr, _("%s: could not send copy-end packet: %s"),
00754 progname, PQerrorMessage(conn));
00755 goto error;
00756 }
00757 still_sending = false;
00758 }
00759
00760
00761
00762
00763 now = localGetCurrentTimestamp();
00764 if (still_sending && standby_message_timeout > 0 &&
00765 localTimestampDifferenceExceeds(last_status, now,
00766 standby_message_timeout))
00767 {
00768
00769 if (!sendFeedback(conn, blockpos, now, false))
00770 goto error;
00771 last_status = now;
00772 }
00773
00774 r = PQgetCopyData(conn, ©buf, 1);
00775 if (r == 0)
00776 {
00777
00778
00779
00780
00781 fd_set input_mask;
00782 struct timeval timeout;
00783 struct timeval *timeoutptr;
00784
00785 FD_ZERO(&input_mask);
00786 FD_SET(PQsocket(conn), &input_mask);
00787 if (standby_message_timeout && still_sending)
00788 {
00789 int64 targettime;
00790 long secs;
00791 int usecs;
00792
00793 targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000);
00794 localTimestampDifference(now,
00795 targettime,
00796 &secs,
00797 &usecs);
00798 if (secs <= 0)
00799 timeout.tv_sec = 1;
00800 else
00801 timeout.tv_sec = secs;
00802 timeout.tv_usec = usecs;
00803 timeoutptr = &timeout;
00804 }
00805 else
00806 timeoutptr = NULL;
00807
00808 r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
00809 if (r == 0 || (r < 0 && errno == EINTR))
00810 {
00811
00812
00813
00814
00815
00816 continue;
00817 }
00818 else if (r < 0)
00819 {
00820 fprintf(stderr, _("%s: select() failed: %s\n"),
00821 progname, strerror(errno));
00822 goto error;
00823 }
00824
00825 if (PQconsumeInput(conn) == 0)
00826 {
00827 fprintf(stderr,
00828 _("%s: could not receive data from WAL stream: %s"),
00829 progname, PQerrorMessage(conn));
00830 goto error;
00831 }
00832 continue;
00833 }
00834 if (r == -1)
00835 {
00836 PGresult *res = PQgetResult(conn);
00837
00838
00839
00840
00841
00842
00843 if (still_sending)
00844 {
00845 if (!close_walfile(basedir, partial_suffix))
00846 {
00847
00848 goto error;
00849 }
00850 if (PQresultStatus(res) == PGRES_COPY_IN)
00851 {
00852 if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
00853 {
00854 fprintf(stderr,
00855 _("%s: could not send copy-end packet: %s"),
00856 progname, PQerrorMessage(conn));
00857 goto error;
00858 }
00859 res = PQgetResult(conn);
00860 }
00861 still_sending = false;
00862 }
00863 if (copybuf != NULL)
00864 PQfreemem(copybuf);
00865 *stoppos = blockpos;
00866 return res;
00867 }
00868 if (r == -2)
00869 {
00870 fprintf(stderr, _("%s: could not read COPY data: %s"),
00871 progname, PQerrorMessage(conn));
00872 goto error;
00873 }
00874
00875
00876 if (copybuf[0] == 'k')
00877 {
00878 int pos;
00879 bool replyRequested;
00880
00881
00882
00883
00884
00885
00886 pos = 1;
00887 pos += 8;
00888 pos += 8;
00889
00890 if (r < pos + 1)
00891 {
00892 fprintf(stderr, _("%s: streaming header too small: %d\n"),
00893 progname, r);
00894 goto error;
00895 }
00896 replyRequested = copybuf[pos];
00897
00898
00899 if (replyRequested && still_sending)
00900 {
00901 now = localGetCurrentTimestamp();
00902 if (!sendFeedback(conn, blockpos, now, false))
00903 goto error;
00904 last_status = now;
00905 }
00906 }
00907 else if (copybuf[0] == 'w')
00908 {
00909
00910
00911
00912
00913 if (!still_sending)
00914 continue;
00915
00916
00917
00918
00919
00920
00921 hdr_len = 1;
00922 hdr_len += 8;
00923 hdr_len += 8;
00924 hdr_len += 8;
00925 if (r < hdr_len + 1)
00926 {
00927 fprintf(stderr, _("%s: streaming header too small: %d\n"),
00928 progname, r);
00929 goto error;
00930 }
00931 blockpos = recvint64(©buf[1]);
00932
00933
00934 xlogoff = blockpos % XLOG_SEG_SIZE;
00935
00936
00937
00938
00939
00940 if (walfile == -1)
00941 {
00942
00943 if (xlogoff != 0)
00944 {
00945 fprintf(stderr,
00946 _("%s: received transaction log record for offset %u with no file open\n"),
00947 progname, xlogoff);
00948 goto error;
00949 }
00950 }
00951 else
00952 {
00953
00954
00955 if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
00956 {
00957 fprintf(stderr,
00958 _("%s: got WAL data offset %08x, expected %08x\n"),
00959 progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
00960 goto error;
00961 }
00962 }
00963
00964 bytes_left = r - hdr_len;
00965 bytes_written = 0;
00966
00967 while (bytes_left)
00968 {
00969 int bytes_to_write;
00970
00971
00972
00973
00974
00975 if (xlogoff + bytes_left > XLOG_SEG_SIZE)
00976 bytes_to_write = XLOG_SEG_SIZE - xlogoff;
00977 else
00978 bytes_to_write = bytes_left;
00979
00980 if (walfile == -1)
00981 {
00982 if (!open_walfile(blockpos, timeline,
00983 basedir, partial_suffix))
00984 {
00985
00986 goto error;
00987 }
00988 }
00989
00990 if (write(walfile,
00991 copybuf + hdr_len + bytes_written,
00992 bytes_to_write) != bytes_to_write)
00993 {
00994 fprintf(stderr,
00995 _("%s: could not write %u bytes to WAL file \"%s\": %s\n"),
00996 progname, bytes_to_write, current_walfile_name,
00997 strerror(errno));
00998 goto error;
00999 }
01000
01001
01002 bytes_written += bytes_to_write;
01003 bytes_left -= bytes_to_write;
01004 blockpos += bytes_to_write;
01005 xlogoff += bytes_to_write;
01006
01007
01008 if (blockpos % XLOG_SEG_SIZE == 0)
01009 {
01010 if (!close_walfile(basedir, partial_suffix))
01011
01012 goto error;
01013
01014 xlogoff = 0;
01015
01016 if (still_sending && stream_stop(blockpos, timeline, false))
01017 {
01018 if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
01019 {
01020 fprintf(stderr, _("%s: could not send copy-end packet: %s"),
01021 progname, PQerrorMessage(conn));
01022 goto error;
01023 }
01024 still_sending = false;
01025 break;
01026 }
01027 }
01028 }
01029
01030 }
01031 else
01032 {
01033 fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
01034 progname, copybuf[0]);
01035 goto error;
01036 }
01037 }
01038
01039 error:
01040 if (copybuf != NULL)
01041 PQfreemem(copybuf);
01042 return NULL;
01043 }