Header And Logo

PostgreSQL
| The world's most advanced open source database.

xlogreader.c

Go to the documentation of this file.
00001 /*-------------------------------------------------------------------------
00002  *
00003  * xlogreader.c
00004  *      Generic XLog reading facility
00005  *
00006  * Portions Copyright (c) 2013, PostgreSQL Global Development Group
00007  *
00008  * IDENTIFICATION
00009  *      src/backend/access/transam/xlogreader.c
00010  *
00011  * NOTES
00012  *      See xlogreader.h for more notes on this facility.
00013  *-------------------------------------------------------------------------
00014  */
00015 
00016 #include "postgres.h"
00017 
00018 #include "access/transam.h"
00019 #include "access/xlog.h"
00020 #include "access/xlog_internal.h"
00021 #include "access/xlogreader.h"
00022 #include "catalog/pg_control.h"
00023 
00024 static bool allocate_recordbuf(XLogReaderState *state, uint32 reclength);
00025 
00026 static bool ValidXLogPageHeader(XLogReaderState *state, XLogRecPtr recptr,
00027                     XLogPageHeader hdr);
00028 static bool ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
00029                  XLogRecPtr PrevRecPtr, XLogRecord *record, bool randAccess);
00030 static bool ValidXLogRecord(XLogReaderState *state, XLogRecord *record,
00031                 XLogRecPtr recptr);
00032 static int ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr,
00033                  int reqLen);
00034 static void
00035 report_invalid_record(XLogReaderState *state, const char *fmt,...)
00036 /* This extension allows gcc to check the format string for consistency with
00037    the supplied arguments. */
00038 __attribute__((format(PG_PRINTF_ATTRIBUTE, 2, 3)));
00039 
00040 /* size of the buffer allocated for error message. */
00041 #define MAX_ERRORMSG_LEN 1000
00042 
00043 /*
00044  * Construct a string in state->errormsg_buf explaining what's wrong with
00045  * the current record being read.
00046  */
00047 static void
00048 report_invalid_record(XLogReaderState *state, const char *fmt,...)
00049 {
00050     va_list     args;
00051 
00052     fmt = _(fmt);
00053 
00054     va_start(args, fmt);
00055     vsnprintf(state->errormsg_buf, MAX_ERRORMSG_LEN, fmt, args);
00056     va_end(args);
00057 }
00058 
00059 /*
00060  * Allocate and initialize a new XLogReader.
00061  *
00062  * Returns NULL if the xlogreader couldn't be allocated.
00063  */
00064 XLogReaderState *
00065 XLogReaderAllocate(XLogPageReadCB pagereadfunc, void *private_data)
00066 {
00067     XLogReaderState *state;
00068 
00069     AssertArg(pagereadfunc != NULL);
00070 
00071     state = (XLogReaderState *) malloc(sizeof(XLogReaderState));
00072     if (!state)
00073         return NULL;
00074     MemSet(state, 0, sizeof(XLogReaderState));
00075 
00076     /*
00077      * Permanently allocate readBuf.  We do it this way, rather than just
00078      * making a static array, for two reasons: (1) no need to waste the
00079      * storage in most instantiations of the backend; (2) a static char array
00080      * isn't guaranteed to have any particular alignment, whereas malloc()
00081      * will provide MAXALIGN'd storage.
00082      */
00083     state->readBuf = (char *) malloc(XLOG_BLCKSZ);
00084     if (!state->readBuf)
00085     {
00086         free(state);
00087         return NULL;
00088     }
00089 
00090     state->read_page = pagereadfunc;
00091     /* system_identifier initialized to zeroes above */
00092     state->private_data = private_data;
00093     /* ReadRecPtr and EndRecPtr initialized to zeroes above */
00094     /* readSegNo, readOff, readLen, readPageTLI initialized to zeroes above */
00095     state->errormsg_buf = malloc(MAX_ERRORMSG_LEN + 1);
00096     if (!state->errormsg_buf)
00097     {
00098         free(state->readBuf);
00099         free(state);
00100         return NULL;
00101     }
00102     state->errormsg_buf[0] = '\0';
00103 
00104     /*
00105      * Allocate an initial readRecordBuf of minimal size, which can later be
00106      * enlarged if necessary.
00107      */
00108     if (!allocate_recordbuf(state, 0))
00109     {
00110         free(state->errormsg_buf);
00111         free(state->readBuf);
00112         free(state);
00113         return NULL;
00114     }
00115 
00116     return state;
00117 }
00118 
00119 void
00120 XLogReaderFree(XLogReaderState *state)
00121 {
00122     free(state->errormsg_buf);
00123     if (state->readRecordBuf)
00124         free(state->readRecordBuf);
00125     free(state->readBuf);
00126     free(state);
00127 }
00128 
00129 /*
00130  * Allocate readRecordBuf to fit a record of at least the given length.
00131  * Returns true if successful, false if out of memory.
00132  *
00133  * readRecordBufSize is set to the new buffer size.
00134  *
00135  * To avoid useless small increases, round its size to a multiple of
00136  * XLOG_BLCKSZ, and make sure it's at least 5*Max(BLCKSZ, XLOG_BLCKSZ) to start
00137  * with.  (That is enough for all "normal" records, but very large commit or
00138  * abort records might need more space.)
00139  */
00140 static bool
00141 allocate_recordbuf(XLogReaderState *state, uint32 reclength)
00142 {
00143     uint32      newSize = reclength;
00144 
00145     newSize += XLOG_BLCKSZ - (newSize % XLOG_BLCKSZ);
00146     newSize = Max(newSize, 5 * Max(BLCKSZ, XLOG_BLCKSZ));
00147 
00148     if (state->readRecordBuf)
00149         free(state->readRecordBuf);
00150     state->readRecordBuf = (char *) malloc(newSize);
00151     if (!state->readRecordBuf)
00152     {
00153         state->readRecordBufSize = 0;
00154         return false;
00155     }
00156 
00157     state->readRecordBufSize = newSize;
00158     return true;
00159 }
00160 
00161 /*
00162  * Attempt to read an XLOG record.
00163  *
00164  * If RecPtr is not NULL, try to read a record at that position.  Otherwise
00165  * try to read a record just after the last one previously read.
00166  *
00167  * If the read_page callback fails to read the requested data, NULL is
00168  * returned.  The callback is expected to have reported the error; errormsg
00169  * is set to NULL.
00170  *
00171  * If the reading fails for some other reason, NULL is also returned, and
00172  * *errormsg is set to a string with details of the failure.
00173  *
00174  * The returned pointer (or *errormsg) points to an internal buffer that's
00175  * valid until the next call to XLogReadRecord.
00176  */
00177 XLogRecord *
00178 XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
00179 {
00180     XLogRecord *record;
00181     XLogRecPtr  targetPagePtr;
00182     bool        randAccess = false;
00183     uint32      len,
00184                 total_len;
00185     uint32      targetRecOff;
00186     uint32      pageHeaderSize;
00187     bool        gotheader;
00188     int         readOff;
00189 
00190     /* reset error state */
00191     *errormsg = NULL;
00192     state->errormsg_buf[0] = '\0';
00193 
00194     if (RecPtr == InvalidXLogRecPtr)
00195     {
00196         RecPtr = state->EndRecPtr;
00197 
00198         if (state->ReadRecPtr == InvalidXLogRecPtr)
00199             randAccess = true;
00200 
00201         /*
00202          * RecPtr is pointing to end+1 of the previous WAL record.  If we're
00203          * at a page boundary, no more records can fit on the current page. We
00204          * must skip over the page header, but we can't do that until we've
00205          * read in the page, since the header size is variable.
00206          */
00207     }
00208     else
00209     {
00210         /*
00211          * In this case, the passed-in record pointer should already be
00212          * pointing to a valid record starting position.
00213          */
00214         Assert(XRecOffIsValid(RecPtr));
00215         randAccess = true;      /* allow readPageTLI to go backwards too */
00216     }
00217 
00218     state->currRecPtr = RecPtr;
00219 
00220     targetPagePtr = RecPtr - (RecPtr % XLOG_BLCKSZ);
00221     targetRecOff = RecPtr % XLOG_BLCKSZ;
00222 
00223     /*
00224      * Read the page containing the record into state->readBuf. Request
00225      * enough byte to cover the whole record header, or at least the part of
00226      * it that fits on the same page.
00227      */
00228     readOff = ReadPageInternal(state,
00229                                targetPagePtr,
00230                           Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ));
00231     if (readOff < 0)
00232         goto err;
00233 
00234     /*
00235      * ReadPageInternal always returns at least the page header, so we can
00236      * examine it now.
00237      */
00238     pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
00239     if (targetRecOff == 0)
00240     {
00241         /*
00242          * At page start, so skip over page header.
00243          */
00244         RecPtr += pageHeaderSize;
00245         targetRecOff = pageHeaderSize;
00246     }
00247     else if (targetRecOff < pageHeaderSize)
00248     {
00249         report_invalid_record(state, "invalid record offset at %X/%X",
00250                               (uint32) (RecPtr >> 32), (uint32) RecPtr);
00251         goto err;
00252     }
00253 
00254     if ((((XLogPageHeader) state->readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
00255         targetRecOff == pageHeaderSize)
00256     {
00257         report_invalid_record(state, "contrecord is requested by %X/%X",
00258                               (uint32) (RecPtr >> 32), (uint32) RecPtr);
00259         goto err;
00260     }
00261 
00262     /* ReadPageInternal has verified the page header */
00263     Assert(pageHeaderSize <= readOff);
00264 
00265     /*
00266      * Read the record length.
00267      *
00268      * NB: Even though we use an XLogRecord pointer here, the whole record
00269      * header might not fit on this page. xl_tot_len is the first field of the
00270      * struct, so it must be on this page (the records are MAXALIGNed), but we
00271      * cannot access any other fields until we've verified that we got the
00272      * whole header.
00273      */
00274     record = (XLogRecord *) (state->readBuf + RecPtr % XLOG_BLCKSZ);
00275     total_len = record->xl_tot_len;
00276 
00277     /*
00278      * If the whole record header is on this page, validate it immediately.
00279      * Otherwise do just a basic sanity check on xl_tot_len, and validate the
00280      * rest of the header after reading it from the next page.  The xl_tot_len
00281      * check is necessary here to ensure that we enter the "Need to reassemble
00282      * record" code path below; otherwise we might fail to apply
00283      * ValidXLogRecordHeader at all.
00284      */
00285     if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord)
00286     {
00287         if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr, record,
00288                                    randAccess))
00289             goto err;
00290         gotheader = true;
00291     }
00292     else
00293     {
00294         /* XXX: more validation should be done here */
00295         if (total_len < SizeOfXLogRecord)
00296         {
00297             report_invalid_record(state, "invalid record length at %X/%X",
00298                                   (uint32) (RecPtr >> 32), (uint32) RecPtr);
00299             goto err;
00300         }
00301         gotheader = false;
00302     }
00303 
00304     /*
00305      * Enlarge readRecordBuf as needed.
00306      */
00307     if (total_len > state->readRecordBufSize &&
00308         !allocate_recordbuf(state, total_len))
00309     {
00310         /* We treat this as a "bogus data" condition */
00311         report_invalid_record(state, "record length %u at %X/%X too long",
00312                               total_len,
00313                               (uint32) (RecPtr >> 32), (uint32) RecPtr);
00314         goto err;
00315     }
00316 
00317     len = XLOG_BLCKSZ - RecPtr % XLOG_BLCKSZ;
00318     if (total_len > len)
00319     {
00320         /* Need to reassemble record */
00321         char       *contdata;
00322         XLogPageHeader pageHeader;
00323         char       *buffer;
00324         uint32      gotlen;
00325 
00326         /* Copy the first fragment of the record from the first page. */
00327         memcpy(state->readRecordBuf,
00328                state->readBuf + RecPtr % XLOG_BLCKSZ, len);
00329         buffer = state->readRecordBuf + len;
00330         gotlen = len;
00331 
00332         do
00333         {
00334             /* Calculate pointer to beginning of next page */
00335             targetPagePtr += XLOG_BLCKSZ;
00336 
00337             /* Wait for the next page to become available */
00338             readOff = ReadPageInternal(state, targetPagePtr,
00339                                  Min(total_len - gotlen + SizeOfXLogShortPHD,
00340                                      XLOG_BLCKSZ));
00341 
00342             if (readOff < 0)
00343                 goto err;
00344 
00345             Assert(SizeOfXLogShortPHD <= readOff);
00346 
00347             /* Check that the continuation on next page looks valid */
00348             pageHeader = (XLogPageHeader) state->readBuf;
00349             if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD))
00350             {
00351                 report_invalid_record(state,
00352                                       "there is no contrecord flag at %X/%X",
00353                                    (uint32) (RecPtr >> 32), (uint32) RecPtr);
00354                 goto err;
00355             }
00356 
00357             /*
00358              * Cross-check that xlp_rem_len agrees with how much of the record
00359              * we expect there to be left.
00360              */
00361             if (pageHeader->xlp_rem_len == 0 ||
00362                 total_len != (pageHeader->xlp_rem_len + gotlen))
00363             {
00364                 report_invalid_record(state,
00365                                       "invalid contrecord length %u at %X/%X",
00366                                       pageHeader->xlp_rem_len,
00367                                    (uint32) (RecPtr >> 32), (uint32) RecPtr);
00368                 goto err;
00369             }
00370 
00371             /* Append the continuation from this page to the buffer */
00372             pageHeaderSize = XLogPageHeaderSize(pageHeader);
00373 
00374             if (readOff < pageHeaderSize)
00375                 readOff = ReadPageInternal(state, targetPagePtr,
00376                                            pageHeaderSize);
00377 
00378             Assert(pageHeaderSize <= readOff);
00379 
00380             contdata = (char *) state->readBuf + pageHeaderSize;
00381             len = XLOG_BLCKSZ - pageHeaderSize;
00382             if (pageHeader->xlp_rem_len < len)
00383                 len = pageHeader->xlp_rem_len;
00384 
00385             if (readOff < pageHeaderSize + len)
00386                 readOff = ReadPageInternal(state, targetPagePtr,
00387                                            pageHeaderSize + len);
00388 
00389             memcpy(buffer, (char *) contdata, len);
00390             buffer += len;
00391             gotlen += len;
00392 
00393             /* If we just reassembled the record header, validate it. */
00394             if (!gotheader)
00395             {
00396                 record = (XLogRecord *) state->readRecordBuf;
00397                 if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr,
00398                                            record, randAccess))
00399                     goto err;
00400                 gotheader = true;
00401             }
00402         } while (gotlen < total_len);
00403 
00404         Assert(gotheader);
00405 
00406         record = (XLogRecord *) state->readRecordBuf;
00407         if (!ValidXLogRecord(state, record, RecPtr))
00408             goto err;
00409 
00410         pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
00411         state->ReadRecPtr = RecPtr;
00412         state->EndRecPtr = targetPagePtr + pageHeaderSize
00413             + MAXALIGN(pageHeader->xlp_rem_len);
00414     }
00415     else
00416     {
00417         /* Wait for the record data to become available */
00418         readOff = ReadPageInternal(state, targetPagePtr,
00419                                  Min(targetRecOff + total_len, XLOG_BLCKSZ));
00420         if (readOff < 0)
00421             goto err;
00422 
00423         /* Record does not cross a page boundary */
00424         if (!ValidXLogRecord(state, record, RecPtr))
00425             goto err;
00426 
00427         state->EndRecPtr = RecPtr + MAXALIGN(total_len);
00428 
00429         state->ReadRecPtr = RecPtr;
00430         memcpy(state->readRecordBuf, record, total_len);
00431     }
00432 
00433     /*
00434      * Special processing if it's an XLOG SWITCH record
00435      */
00436     if (record->xl_rmid == RM_XLOG_ID && record->xl_info == XLOG_SWITCH)
00437     {
00438         /* Pretend it extends to end of segment */
00439         state->EndRecPtr += XLogSegSize - 1;
00440         state->EndRecPtr -= state->EndRecPtr % XLogSegSize;
00441     }
00442 
00443     return record;
00444 
00445 err:
00446 
00447     /*
00448      * Invalidate the xlog page we've cached. We might read from a different
00449      * source after failure.
00450      */
00451     state->readSegNo = 0;
00452     state->readOff = 0;
00453     state->readLen = 0;
00454 
00455     if (state->errormsg_buf[0] != '\0')
00456         *errormsg = state->errormsg_buf;
00457 
00458     return NULL;
00459 }
00460 
00461 /*
00462  * Read a single xlog page including at least [pageptr, reqLen] of valid data
00463  * via the read_page() callback.
00464  *
00465  * Returns -1 if the required page cannot be read for some reason; errormsg_buf
00466  * is set in that case (unless the error occurs in the read_page callback).
00467  *
00468  * We fetch the page from a reader-local cache if we know we have the required
00469  * data and if there hasn't been any error since caching the data.
00470  */
00471 static int
00472 ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
00473 {
00474     int         readLen;
00475     uint32      targetPageOff;
00476     XLogSegNo   targetSegNo;
00477     XLogPageHeader hdr;
00478 
00479     Assert((pageptr % XLOG_BLCKSZ) == 0);
00480 
00481     XLByteToSeg(pageptr, targetSegNo);
00482     targetPageOff = (pageptr % XLogSegSize);
00483 
00484     /* check whether we have all the requested data already */
00485     if (targetSegNo == state->readSegNo && targetPageOff == state->readOff &&
00486         reqLen < state->readLen)
00487         return state->readLen;
00488 
00489     /*
00490      * Data is not in our buffer.
00491      *
00492      * Every time we actually read the page, even if we looked at parts of it
00493      * before, we need to do verification as the read_page callback might now
00494      * be rereading data from a different source.
00495      *
00496      * Whenever switching to a new WAL segment, we read the first page of the
00497      * file and validate its header, even if that's not where the target
00498      * record is.  This is so that we can check the additional identification
00499      * info that is present in the first page's "long" header.
00500      */
00501     if (targetSegNo != state->readSegNo && targetPageOff != 0)
00502     {
00503         XLogPageHeader hdr;
00504         XLogRecPtr  targetSegmentPtr = pageptr - targetPageOff;
00505 
00506         readLen = state->read_page(state, targetSegmentPtr, XLOG_BLCKSZ,
00507                                    state->currRecPtr,
00508                                    state->readBuf, &state->readPageTLI);
00509         if (readLen < 0)
00510             goto err;
00511 
00512         /* we can be sure to have enough WAL available, we scrolled back */
00513         Assert(readLen == XLOG_BLCKSZ);
00514 
00515         hdr = (XLogPageHeader) state->readBuf;
00516 
00517         if (!ValidXLogPageHeader(state, targetSegmentPtr, hdr))
00518             goto err;
00519     }
00520 
00521     /*
00522      * First, read the requested data length, but at least a short page header
00523      * so that we can validate it.
00524      */
00525     readLen = state->read_page(state, pageptr, Max(reqLen, SizeOfXLogShortPHD),
00526                                state->currRecPtr,
00527                                state->readBuf, &state->readPageTLI);
00528     if (readLen < 0)
00529         goto err;
00530 
00531     Assert(readLen <= XLOG_BLCKSZ);
00532 
00533     /* Do we have enough data to check the header length? */
00534     if (readLen <= SizeOfXLogShortPHD)
00535         goto err;
00536 
00537     Assert(readLen >= reqLen);
00538 
00539     hdr = (XLogPageHeader) state->readBuf;
00540 
00541     /* still not enough */
00542     if (readLen < XLogPageHeaderSize(hdr))
00543     {
00544         readLen = state->read_page(state, pageptr, XLogPageHeaderSize(hdr),
00545                                    state->currRecPtr,
00546                                    state->readBuf, &state->readPageTLI);
00547         if (readLen < 0)
00548             goto err;
00549     }
00550 
00551     /*
00552      * Now that we know we have the full header, validate it.
00553      */
00554     if (!ValidXLogPageHeader(state, pageptr, hdr))
00555         goto err;
00556 
00557     /* update cache information */
00558     state->readSegNo = targetSegNo;
00559     state->readOff = targetPageOff;
00560     state->readLen = readLen;
00561 
00562     return readLen;
00563 
00564 err:
00565     state->readSegNo = 0;
00566     state->readOff = 0;
00567     state->readLen = 0;
00568     return -1;
00569 }
00570 
00571 /*
00572  * Validate an XLOG record header.
00573  *
00574  * This is just a convenience subroutine to avoid duplicated code in
00575  * XLogReadRecord.  It's not intended for use from anywhere else.
00576  */
00577 static bool
00578 ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
00579                       XLogRecPtr PrevRecPtr, XLogRecord *record,
00580                       bool randAccess)
00581 {
00582     /*
00583      * xl_len == 0 is bad data for everything except XLOG SWITCH, where it is
00584      * required.
00585      */
00586     if (record->xl_rmid == RM_XLOG_ID && record->xl_info == XLOG_SWITCH)
00587     {
00588         if (record->xl_len != 0)
00589         {
00590             report_invalid_record(state,
00591                                   "invalid xlog switch record at %X/%X",
00592                                   (uint32) (RecPtr >> 32), (uint32) RecPtr);
00593             return false;
00594         }
00595     }
00596     else if (record->xl_len == 0)
00597     {
00598         report_invalid_record(state,
00599                               "record with zero length at %X/%X",
00600                               (uint32) (RecPtr >> 32), (uint32) RecPtr);
00601         return false;
00602     }
00603     if (record->xl_tot_len < SizeOfXLogRecord + record->xl_len ||
00604         record->xl_tot_len > SizeOfXLogRecord + record->xl_len +
00605         XLR_MAX_BKP_BLOCKS * (sizeof(BkpBlock) + BLCKSZ))
00606     {
00607         report_invalid_record(state,
00608                               "invalid record length at %X/%X",
00609                               (uint32) (RecPtr >> 32), (uint32) RecPtr);
00610         return false;
00611     }
00612     if (record->xl_rmid > RM_MAX_ID)
00613     {
00614         report_invalid_record(state,
00615                               "invalid resource manager ID %u at %X/%X",
00616                               record->xl_rmid, (uint32) (RecPtr >> 32),
00617                               (uint32) RecPtr);
00618         return false;
00619     }
00620     if (randAccess)
00621     {
00622         /*
00623          * We can't exactly verify the prev-link, but surely it should be less
00624          * than the record's own address.
00625          */
00626         if (!(record->xl_prev < RecPtr))
00627         {
00628             report_invalid_record(state,
00629                             "record with incorrect prev-link %X/%X at %X/%X",
00630                                   (uint32) (record->xl_prev >> 32),
00631                                   (uint32) record->xl_prev,
00632                                   (uint32) (RecPtr >> 32), (uint32) RecPtr);
00633             return false;
00634         }
00635     }
00636     else
00637     {
00638         /*
00639          * Record's prev-link should exactly match our previous location. This
00640          * check guards against torn WAL pages where a stale but valid-looking
00641          * WAL record starts on a sector boundary.
00642          */
00643         if (record->xl_prev != PrevRecPtr)
00644         {
00645             report_invalid_record(state,
00646                             "record with incorrect prev-link %X/%X at %X/%X",
00647                                   (uint32) (record->xl_prev >> 32),
00648                                   (uint32) record->xl_prev,
00649                                   (uint32) (RecPtr >> 32), (uint32) RecPtr);
00650             return false;
00651         }
00652     }
00653 
00654     return true;
00655 }
00656 
00657 
00658 /*
00659  * CRC-check an XLOG record.  We do not believe the contents of an XLOG
00660  * record (other than to the minimal extent of computing the amount of
00661  * data to read in) until we've checked the CRCs.
00662  *
00663  * We assume all of the record (that is, xl_tot_len bytes) has been read
00664  * into memory at *record.  Also, ValidXLogRecordHeader() has accepted the
00665  * record's header, which means in particular that xl_tot_len is at least
00666  * SizeOfXlogRecord, so it is safe to fetch xl_len.
00667  */
00668 static bool
00669 ValidXLogRecord(XLogReaderState *state, XLogRecord *record, XLogRecPtr recptr)
00670 {
00671     pg_crc32    crc;
00672     int         i;
00673     uint32      len = record->xl_len;
00674     BkpBlock    bkpb;
00675     char       *blk;
00676     size_t      remaining = record->xl_tot_len;
00677 
00678     /* First the rmgr data */
00679     if (remaining < SizeOfXLogRecord + len)
00680     {
00681         /* ValidXLogRecordHeader() should've caught this already... */
00682         report_invalid_record(state, "invalid record length at %X/%X",
00683                               (uint32) (recptr >> 32), (uint32) recptr);
00684         return false;
00685     }
00686     remaining -= SizeOfXLogRecord + len;
00687     INIT_CRC32(crc);
00688     COMP_CRC32(crc, XLogRecGetData(record), len);
00689 
00690     /* Add in the backup blocks, if any */
00691     blk = (char *) XLogRecGetData(record) + len;
00692     for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
00693     {
00694         uint32      blen;
00695 
00696         if (!(record->xl_info & XLR_BKP_BLOCK(i)))
00697             continue;
00698 
00699         if (remaining < sizeof(BkpBlock))
00700         {
00701             report_invalid_record(state,
00702                               "invalid backup block size in record at %X/%X",
00703                                   (uint32) (recptr >> 32), (uint32) recptr);
00704             return false;
00705         }
00706         memcpy(&bkpb, blk, sizeof(BkpBlock));
00707 
00708         if (bkpb.hole_offset + bkpb.hole_length > BLCKSZ)
00709         {
00710             report_invalid_record(state,
00711                                   "incorrect hole size in record at %X/%X",
00712                                   (uint32) (recptr >> 32), (uint32) recptr);
00713             return false;
00714         }
00715         blen = sizeof(BkpBlock) + BLCKSZ - bkpb.hole_length;
00716 
00717         if (remaining < blen)
00718         {
00719             report_invalid_record(state,
00720                               "invalid backup block size in record at %X/%X",
00721                                   (uint32) (recptr >> 32), (uint32) recptr);
00722             return false;
00723         }
00724         remaining -= blen;
00725         COMP_CRC32(crc, blk, blen);
00726         blk += blen;
00727     }
00728 
00729     /* Check that xl_tot_len agrees with our calculation */
00730     if (remaining != 0)
00731     {
00732         report_invalid_record(state,
00733                               "incorrect total length in record at %X/%X",
00734                               (uint32) (recptr >> 32), (uint32) recptr);
00735         return false;
00736     }
00737 
00738     /* Finally include the record header */
00739     COMP_CRC32(crc, (char *) record, offsetof(XLogRecord, xl_crc));
00740     FIN_CRC32(crc);
00741 
00742     if (!EQ_CRC32(record->xl_crc, crc))
00743     {
00744         report_invalid_record(state,
00745                "incorrect resource manager data checksum in record at %X/%X",
00746                               (uint32) (recptr >> 32), (uint32) recptr);
00747         return false;
00748     }
00749 
00750     return true;
00751 }
00752 
00753 /*
00754  * Validate a page header
00755  */
00756 static bool
00757 ValidXLogPageHeader(XLogReaderState *state, XLogRecPtr recptr,
00758                     XLogPageHeader hdr)
00759 {
00760     XLogRecPtr  recaddr;
00761     XLogSegNo   segno;
00762     int32       offset;
00763 
00764     Assert((recptr % XLOG_BLCKSZ) == 0);
00765 
00766     XLByteToSeg(recptr, segno);
00767     offset = recptr % XLogSegSize;
00768 
00769     XLogSegNoOffsetToRecPtr(segno, offset, recaddr);
00770 
00771     if (hdr->xlp_magic != XLOG_PAGE_MAGIC)
00772     {
00773         char        fname[MAXFNAMELEN];
00774 
00775         XLogFileName(fname, state->readPageTLI, segno);
00776 
00777         report_invalid_record(state,
00778                     "invalid magic number %04X in log segment %s, offset %u",
00779                               hdr->xlp_magic,
00780                               fname,
00781                               offset);
00782         return false;
00783     }
00784 
00785     if ((hdr->xlp_info & ~XLP_ALL_FLAGS) != 0)
00786     {
00787         char        fname[MAXFNAMELEN];
00788 
00789         XLogFileName(fname, state->readPageTLI, segno);
00790 
00791         report_invalid_record(state,
00792                        "invalid info bits %04X in log segment %s, offset %u",
00793                               hdr->xlp_info,
00794                               fname,
00795                               offset);
00796         return false;
00797     }
00798 
00799     if (hdr->xlp_info & XLP_LONG_HEADER)
00800     {
00801         XLogLongPageHeader longhdr = (XLogLongPageHeader) hdr;
00802 
00803         if (state->system_identifier &&
00804             longhdr->xlp_sysid != state->system_identifier)
00805         {
00806             char        fhdrident_str[32];
00807             char        sysident_str[32];
00808 
00809             /*
00810              * Format sysids separately to keep platform-dependent format code
00811              * out of the translatable message string.
00812              */
00813             snprintf(fhdrident_str, sizeof(fhdrident_str), UINT64_FORMAT,
00814                      longhdr->xlp_sysid);
00815             snprintf(sysident_str, sizeof(sysident_str), UINT64_FORMAT,
00816                      state->system_identifier);
00817             report_invalid_record(state,
00818                                   "WAL file is from different database system: WAL file database system identifier is %s, pg_control database system identifier is %s.",
00819                                   fhdrident_str, sysident_str);
00820             return false;
00821         }
00822         else if (longhdr->xlp_seg_size != XLogSegSize)
00823         {
00824             report_invalid_record(state,
00825                                   "WAL file is from different database system: Incorrect XLOG_SEG_SIZE in page header.");
00826             return false;
00827         }
00828         else if (longhdr->xlp_xlog_blcksz != XLOG_BLCKSZ)
00829         {
00830             report_invalid_record(state,
00831                                   "WAL file is from different database system: Incorrect XLOG_BLCKSZ in page header.");
00832             return false;
00833         }
00834     }
00835     else if (offset == 0)
00836     {
00837         char        fname[MAXFNAMELEN];
00838 
00839         XLogFileName(fname, state->readPageTLI, segno);
00840 
00841         /* hmm, first page of file doesn't have a long header? */
00842         report_invalid_record(state,
00843                        "invalid info bits %04X in log segment %s, offset %u",
00844                               hdr->xlp_info,
00845                               fname,
00846                               offset);
00847         return false;
00848     }
00849 
00850     if (hdr->xlp_pageaddr != recaddr)
00851     {
00852         char        fname[MAXFNAMELEN];
00853 
00854         XLogFileName(fname, state->readPageTLI, segno);
00855 
00856         report_invalid_record(state,
00857                     "unexpected pageaddr %X/%X in log segment %s, offset %u",
00858               (uint32) (hdr->xlp_pageaddr >> 32), (uint32) hdr->xlp_pageaddr,
00859                               fname,
00860                               offset);
00861         return false;
00862     }
00863 
00864     /*
00865      * Since child timelines are always assigned a TLI greater than their
00866      * immediate parent's TLI, we should never see TLI go backwards across
00867      * successive pages of a consistent WAL sequence.
00868      *
00869      * Sometimes we re-read a segment that's already been (partially) read. So
00870      * we only verify TLIs for pages that are later than the last remembered
00871      * LSN.
00872      */
00873     if (recptr > state->latestPagePtr)
00874     {
00875         if (hdr->xlp_tli < state->latestPageTLI)
00876         {
00877             char        fname[MAXFNAMELEN];
00878 
00879             XLogFileName(fname, state->readPageTLI, segno);
00880 
00881             report_invalid_record(state,
00882                                   "out-of-sequence timeline ID %u (after %u) in log segment %s, offset %u",
00883                                   hdr->xlp_tli,
00884                                   state->latestPageTLI,
00885                                   fname,
00886                                   offset);
00887             return false;
00888         }
00889     }
00890     state->latestPagePtr = recptr;
00891     state->latestPageTLI = hdr->xlp_tli;
00892 
00893     return true;
00894 }
00895 
00896 #ifdef FRONTEND
00897 /*
00898  * Functions that are currently not needed in the backend, but are better
00899  * implemented inside xlogreader.c because of the internal facilities available
00900  * here.
00901  */
00902 
00903 /*
00904  * Find the first record with at an lsn >= RecPtr.
00905  *
00906  * Useful for checking wether RecPtr is a valid xlog address for reading and to
00907  * find the first valid address after some address when dumping records for
00908  * debugging purposes.
00909  */
00910 XLogRecPtr
00911 XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
00912 {
00913     XLogReaderState saved_state = *state;
00914     XLogRecPtr  targetPagePtr;
00915     XLogRecPtr  tmpRecPtr;
00916     int         targetRecOff;
00917     XLogRecPtr  found = InvalidXLogRecPtr;
00918     uint32      pageHeaderSize;
00919     XLogPageHeader header;
00920     XLogRecord *record;
00921     int         readLen;
00922     char       *errormsg;
00923 
00924     Assert(!XLogRecPtrIsInvalid(RecPtr));
00925 
00926     targetRecOff = RecPtr % XLOG_BLCKSZ;
00927 
00928     /* scroll back to page boundary */
00929     targetPagePtr = RecPtr - targetRecOff;
00930 
00931     /* Read the page containing the record */
00932     readLen = ReadPageInternal(state, targetPagePtr, targetRecOff);
00933     if (readLen < 0)
00934         goto err;
00935 
00936     header = (XLogPageHeader) state->readBuf;
00937 
00938     pageHeaderSize = XLogPageHeaderSize(header);
00939 
00940     /* make sure we have enough data for the page header */
00941     readLen = ReadPageInternal(state, targetPagePtr, pageHeaderSize);
00942     if (readLen < 0)
00943         goto err;
00944 
00945     /* skip over potential continuation data */
00946     if (header->xlp_info & XLP_FIRST_IS_CONTRECORD)
00947     {
00948         /* record headers are MAXALIGN'ed */
00949         tmpRecPtr = targetPagePtr + pageHeaderSize
00950             + MAXALIGN(header->xlp_rem_len);
00951     }
00952     else
00953     {
00954         tmpRecPtr = targetPagePtr + pageHeaderSize;
00955     }
00956 
00957     /*
00958      * we know now that tmpRecPtr is an address pointing to a valid XLogRecord
00959      * because either we're at the first record after the beginning of a page
00960      * or we just jumped over the remaining data of a continuation.
00961      */
00962     while ((record = XLogReadRecord(state, tmpRecPtr, &errormsg)))
00963     {
00964         /* continue after the record */
00965         tmpRecPtr = InvalidXLogRecPtr;
00966 
00967         /* past the record we've found, break out */
00968         if (RecPtr <= state->ReadRecPtr)
00969         {
00970             found = state->ReadRecPtr;
00971             goto out;
00972         }
00973     }
00974 
00975 err:
00976 out:
00977     /* Reset state to what we had before finding the record */
00978     state->readSegNo = 0;
00979     state->readOff = 0;
00980     state->readLen = 0;
00981     state->ReadRecPtr = saved_state.ReadRecPtr;
00982     state->EndRecPtr = saved_state.EndRecPtr;
00983 
00984     return found;
00985 }
00986 
00987 #endif   /* FRONTEND */