Header And Logo

PostgreSQL
| The world's most advanced open source database.

Data Structures | Typedefs | Functions

xlogreader.h File Reference

#include "access/xlog_internal.h"
Include dependency graph for xlogreader.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  XLogReaderState

Typedefs

typedef struct XLogReaderState XLogReaderState
typedef int(* XLogPageReadCB )(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *readBuf, TimeLineID *pageTLI)

Functions

XLogReaderState * XLogReaderAllocate (XLogPageReadCB pagereadfunc, void *private_data)
void XLogReaderFree (XLogReaderState *state)
struct XLogRecord * XLogReadRecord (XLogReaderState *state, XLogRecPtr recptr, char **errormsg)

Typedef Documentation

typedef int(* XLogPageReadCB)(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *readBuf, TimeLineID *pageTLI)

Definition at line 27 of file xlogreader.h.

typedef struct XLogReaderState XLogReaderState

Definition at line 24 of file xlogreader.h.


Function Documentation

XLogReaderState* XLogReaderAllocate ( XLogPageReadCB  pagereadfunc,
void *  private_data 
)

Definition at line 65 of file xlogreader.c.

References allocate_recordbuf(), AssertArg, XLogReaderState::errormsg_buf, free, malloc, MAX_ERRORMSG_LEN, MemSet, NULL, XLogReaderState::private_data, XLogReaderState::read_page, and XLogReaderState::readBuf.

Referenced by main(), and StartupXLOG().

{
    /*
     * Build a new reader: allocate and zero the state struct, then attach
     * the page buffer, the error-message buffer and an initial record
     * buffer.  Returns the new reader, or NULL if any allocation fails (in
     * which case everything allocated so far has been released again).
     */
    XLogReaderState *reader;

    AssertArg(pagereadfunc != NULL);

    reader = (XLogReaderState *) malloc(sizeof(XLogReaderState));
    if (reader == NULL)
        return NULL;

    /*
     * Zeroing the struct takes care of every field not set explicitly
     * below: system_identifier, ReadRecPtr, EndRecPtr, readSegNo, readOff,
     * readLen and readPageTLI.
     */
    MemSet(reader, 0, sizeof(XLogReaderState));

    /*
     * The page buffer is heap-allocated rather than a static array for two
     * reasons: (1) backends that never instantiate a reader do not pay for
     * the storage; (2) a static char array has no guaranteed alignment,
     * whereas malloc() hands back MAXALIGN'd memory.
     */
    reader->readBuf = (char *) malloc(XLOG_BLCKSZ);
    if (reader->readBuf == NULL)
        goto fail_state;

    reader->read_page = pagereadfunc;
    reader->private_data = private_data;

    /* One extra byte for the terminating NUL; start with an empty message. */
    reader->errormsg_buf = malloc(MAX_ERRORMSG_LEN + 1);
    if (reader->errormsg_buf == NULL)
        goto fail_readbuf;
    reader->errormsg_buf[0] = '\0';

    /*
     * Start with a record buffer of minimal size; it can be enlarged later
     * if necessary.
     */
    if (!allocate_recordbuf(reader, 0))
        goto fail_errormsg;

    return reader;

    /* Unwind in reverse order of allocation. */
fail_errormsg:
    free(reader->errormsg_buf);
fail_readbuf:
    free(reader->readBuf);
fail_state:
    free(reader);
    return NULL;
}

void XLogReaderFree ( XLogReaderState *  state  ) 

Definition at line 120 of file xlogreader.c.

References XLogReaderState::errormsg_buf, free, XLogReaderState::readBuf, and XLogReaderState::readRecordBuf.

Referenced by main(), and StartupXLOG().

{
    /*
     * Release every buffer hanging off the reader, then the reader itself.
     * The caller must not use 'state' afterwards.
     *
     * No NULL guard is needed around readRecordBuf: free(NULL) is defined
     * to do nothing.
     */
    free(state->errormsg_buf);
    free(state->readRecordBuf);
    free(state->readBuf);
    free(state);
}

struct XLogRecord* XLogReadRecord ( XLogReaderState *  state,
XLogRecPtr  recptr,
char **  errormsg 
) [read]

Definition at line 178 of file xlogreader.c.

References allocate_recordbuf(), Assert, XLogReaderState::currRecPtr, XLogReaderState::EndRecPtr, XLogReaderState::errormsg_buf, InvalidXLogRecPtr, MAXALIGN, Min, XLogReaderState::readBuf, XLogReaderState::readLen, XLogReaderState::readOff, readOff, ReadPageInternal(), XLogReaderState::readRecordBuf, XLogReaderState::readRecordBufSize, XLogReaderState::ReadRecPtr, XLogReaderState::readSegNo, report_invalid_record(), SizeOfXLogRecord, SizeOfXLogShortPHD, ValidXLogRecord(), ValidXLogRecordHeader(), XLogRecord::xl_info, XLogRecord::xl_rmid, XLogRecord::xl_tot_len, XLOG_SWITCH, XLogPageHeaderSize, XLogSegSize, XLP_FIRST_IS_CONTRECORD, XLogPageHeaderData::xlp_info, XLogPageHeaderData::xlp_rem_len, and XRecOffIsValid.

Referenced by main(), and ReadRecord().

{
    /*
     * Read a single WAL record.
     *
     * If RecPtr is InvalidXLogRecPtr, reading resumes at state->EndRecPtr;
     * otherwise RecPtr must already be a valid record start position.
     *
     * On success, returns a pointer to the record (held in reader-owned
     * buffers) and updates state->ReadRecPtr and state->EndRecPtr.  On
     * failure, returns NULL, invalidates the cached page information, and
     * points *errormsg at state->errormsg_buf if a message was recorded
     * (otherwise *errormsg is left NULL).
     *
     * NOTE(review): this body spells the record-pointer argument "RecPtr",
     * while the prototype shown for this function spells it "recptr" --
     * confirm against the definition in xlogreader.c.
     */
    XLogRecord *record;
    XLogRecPtr  targetPagePtr;
    bool        randAccess = false;
    uint32      len,
                total_len;
    uint32      targetRecOff;
    uint32      pageHeaderSize;
    bool        gotheader;
    int         readOff;

    /* reset error state */
    *errormsg = NULL;
    state->errormsg_buf[0] = '\0';

    if (RecPtr == InvalidXLogRecPtr)
    {
        RecPtr = state->EndRecPtr;

        if (state->ReadRecPtr == InvalidXLogRecPtr)
            randAccess = true;

        /*
         * RecPtr is pointing to end+1 of the previous WAL record.  If we're
         * at a page boundary, no more records can fit on the current page. We
         * must skip over the page header, but we can't do that until we've
         * read in the page, since the header size is variable.
         */
    }
    else
    {
        /*
         * In this case, the passed-in record pointer should already be
         * pointing to a valid record starting position.
         */
        Assert(XRecOffIsValid(RecPtr));
        randAccess = true;      /* allow readPageTLI to go backwards too */
    }

    state->currRecPtr = RecPtr;

    /* Split RecPtr into the start of its page and the offset within it. */
    targetPagePtr = RecPtr - (RecPtr % XLOG_BLCKSZ);
    targetRecOff = RecPtr % XLOG_BLCKSZ;

    /*
     * Read the page containing the record into state->readBuf. Request
     * enough bytes to cover the whole record header, or at least the part of
     * it that fits on the same page.
     */
    readOff = ReadPageInternal(state,
                               targetPagePtr,
                          Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ));
    if (readOff < 0)
        goto err;

    /*
     * ReadPageInternal always returns at least the page header, so we can
     * examine it now.
     */
    pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
    if (targetRecOff == 0)
    {
        /*
         * At page start, so skip over page header.
         */
        RecPtr += pageHeaderSize;
        targetRecOff = pageHeaderSize;
    }
    else if (targetRecOff < pageHeaderSize)
    {
        report_invalid_record(state, "invalid record offset at %X/%X",
                              (uint32) (RecPtr >> 32), (uint32) RecPtr);
        goto err;
    }

    if ((((XLogPageHeader) state->readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
        targetRecOff == pageHeaderSize)
    {
        report_invalid_record(state, "contrecord is requested by %X/%X",
                              (uint32) (RecPtr >> 32), (uint32) RecPtr);
        goto err;
    }

    /* ReadPageInternal has verified the page header */
    Assert(pageHeaderSize <= readOff);

    /*
     * Read the record length.
     *
     * NB: Even though we use an XLogRecord pointer here, the whole record
     * header might not fit on this page. xl_tot_len is the first field of the
     * struct, so it must be on this page (the records are MAXALIGNed), but we
     * cannot access any other fields until we've verified that we got the
     * whole header.
     */
    record = (XLogRecord *) (state->readBuf + RecPtr % XLOG_BLCKSZ);
    total_len = record->xl_tot_len;

    /*
     * If the whole record header is on this page, validate it immediately.
     * Otherwise do just a basic sanity check on xl_tot_len, and validate the
     * rest of the header after reading it from the next page.  The xl_tot_len
     * check is necessary here to ensure that we enter the "Need to reassemble
     * record" code path below; otherwise we might fail to apply
     * ValidXLogRecordHeader at all.
     */
    if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord)
    {
        if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr, record,
                                   randAccess))
            goto err;
        gotheader = true;
    }
    else
    {
        /* XXX: more validation should be done here */
        if (total_len < SizeOfXLogRecord)
        {
            report_invalid_record(state, "invalid record length at %X/%X",
                                  (uint32) (RecPtr >> 32), (uint32) RecPtr);
            goto err;
        }
        gotheader = false;
    }

    /*
     * Enlarge readRecordBuf as needed.
     */
    if (total_len > state->readRecordBufSize &&
        !allocate_recordbuf(state, total_len))
    {
        /* We treat this as a "bogus data" condition */
        report_invalid_record(state, "record length %u at %X/%X too long",
                              total_len,
                              (uint32) (RecPtr >> 32), (uint32) RecPtr);
        goto err;
    }

    /* Number of bytes from the record start to the end of this page. */
    len = XLOG_BLCKSZ - RecPtr % XLOG_BLCKSZ;
    if (total_len > len)
    {
        /* Need to reassemble record */
        char       *contdata;
        XLogPageHeader pageHeader;
        char       *buffer;
        uint32      gotlen;

        /* Copy the first fragment of the record from the first page. */
        memcpy(state->readRecordBuf,
               state->readBuf + RecPtr % XLOG_BLCKSZ, len);
        buffer = state->readRecordBuf + len;
        gotlen = len;

        do
        {
            /* Calculate pointer to beginning of next page */
            targetPagePtr += XLOG_BLCKSZ;

            /* Wait for the next page to become available */
            readOff = ReadPageInternal(state, targetPagePtr,
                                 Min(total_len - gotlen + SizeOfXLogShortPHD,
                                     XLOG_BLCKSZ));

            if (readOff < 0)
                goto err;

            Assert(SizeOfXLogShortPHD <= readOff);

            /* Check that the continuation on next page looks valid */
            pageHeader = (XLogPageHeader) state->readBuf;
            if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD))
            {
                report_invalid_record(state,
                                      "there is no contrecord flag at %X/%X",
                                   (uint32) (RecPtr >> 32), (uint32) RecPtr);
                goto err;
            }

            /*
             * Cross-check that xlp_rem_len agrees with how much of the record
             * we expect there to be left.
             */
            if (pageHeader->xlp_rem_len == 0 ||
                total_len != (pageHeader->xlp_rem_len + gotlen))
            {
                report_invalid_record(state,
                                      "invalid contrecord length %u at %X/%X",
                                      pageHeader->xlp_rem_len,
                                   (uint32) (RecPtr >> 32), (uint32) RecPtr);
                goto err;
            }

            /* Append the continuation from this page to the buffer */
            pageHeaderSize = XLogPageHeaderSize(pageHeader);

            /*
             * The header may be larger than what has been read so far; if
             * so, read more.  A failed read must not be ignored, or we would
             * go on to copy from a page buffer that was not read
             * successfully.
             */
            if (readOff < pageHeaderSize)
            {
                readOff = ReadPageInternal(state, targetPagePtr,
                                           pageHeaderSize);
                if (readOff < 0)
                    goto err;
            }

            Assert(pageHeaderSize <= readOff);

            /* Take whatever remains of the record, capped at one page. */
            contdata = (char *) state->readBuf + pageHeaderSize;
            len = XLOG_BLCKSZ - pageHeaderSize;
            if (pageHeader->xlp_rem_len < len)
                len = pageHeader->xlp_rem_len;

            /* Make sure the bytes we are about to copy have been read. */
            if (readOff < pageHeaderSize + len)
            {
                readOff = ReadPageInternal(state, targetPagePtr,
                                           pageHeaderSize + len);
                if (readOff < 0)
                    goto err;
            }

            memcpy(buffer, (char *) contdata, len);
            buffer += len;
            gotlen += len;

            /* If we just reassembled the record header, validate it. */
            if (!gotheader)
            {
                record = (XLogRecord *) state->readRecordBuf;
                if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr,
                                           record, randAccess))
                    goto err;
                gotheader = true;
            }
        } while (gotlen < total_len);

        Assert(gotheader);

        record = (XLogRecord *) state->readRecordBuf;
        if (!ValidXLogRecord(state, record, RecPtr))
            goto err;

        /* The record ends on the last page read, after its header. */
        pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
        state->ReadRecPtr = RecPtr;
        state->EndRecPtr = targetPagePtr + pageHeaderSize
            + MAXALIGN(pageHeader->xlp_rem_len);
    }
    else
    {
        /* Wait for the record data to become available */
        readOff = ReadPageInternal(state, targetPagePtr,
                                 Min(targetRecOff + total_len, XLOG_BLCKSZ));
        if (readOff < 0)
            goto err;

        /* Record does not cross a page boundary */
        if (!ValidXLogRecord(state, record, RecPtr))
            goto err;

        state->EndRecPtr = RecPtr + MAXALIGN(total_len);

        state->ReadRecPtr = RecPtr;
        memcpy(state->readRecordBuf, record, total_len);
    }

    /*
     * Special processing if it's an XLOG SWITCH record
     */
    if (record->xl_rmid == RM_XLOG_ID && record->xl_info == XLOG_SWITCH)
    {
        /* Pretend it extends to end of segment */
        state->EndRecPtr += XLogSegSize - 1;
        state->EndRecPtr -= state->EndRecPtr % XLogSegSize;
    }

    return record;

err:

    /*
     * Invalidate the xlog page we've cached. We might read from a different
     * source after failure.
     */
    state->readSegNo = 0;
    state->readOff = 0;
    state->readLen = 0;

    /* Only hand back a message if one was actually recorded. */
    if (state->errormsg_buf[0] != '\0')
        *errormsg = state->errormsg_buf;

    return NULL;
}