#include "access/xlog_internal.h"

Go to the source code of this file.
Data Structures | |
| struct | XLogReaderState |
Typedefs | |
| typedef struct XLogReaderState | XLogReaderState |
| typedef int(* | XLogPageReadCB )(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *readBuf, TimeLineID *pageTLI) |
Functions | |
| XLogReaderState * | XLogReaderAllocate (XLogPageReadCB pagereadfunc, void *private_data) |
| void | XLogReaderFree (XLogReaderState *state) |
| struct XLogRecord * | XLogReadRecord (XLogReaderState *state, XLogRecPtr recptr, char **errormsg) |
| typedef int(* XLogPageReadCB)(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *readBuf, TimeLineID *pageTLI) |
Definition at line 27 of file xlogreader.h.
| typedef struct XLogReaderState XLogReaderState |
Definition at line 24 of file xlogreader.h.
| XLogReaderState* XLogReaderAllocate | ( | XLogPageReadCB | pagereadfunc, | |
| void * | private_data | |||
| ) |
Definition at line 65 of file xlogreader.c.
References allocate_recordbuf(), AssertArg, XLogReaderState::errormsg_buf, free, malloc, MAX_ERRORMSG_LEN, MemSet, NULL, XLogReaderState::private_data, XLogReaderState::read_page, and XLogReaderState::readBuf.
Referenced by main(), and StartupXLOG().
{
	/*
	 * Build a new reader: a zero-filled XLogReaderState plus its page buffer,
	 * error-message buffer and initial record buffer.
	 *
	 * pagereadfunc must be non-NULL; it is stored as the page-read callback.
	 * private_data is stored as-is in the state.
	 *
	 * Returns the new state, or NULL if any allocation fails (everything
	 * allocated up to that point is released first).
	 */
	XLogReaderState *reader;

	AssertArg(pagereadfunc != NULL);

	reader = (XLogReaderState *) malloc(sizeof(XLogReaderState));
	if (reader == NULL)
		return NULL;

	/*
	 * Zero the whole struct.  This is what initializes system_identifier,
	 * ReadRecPtr, EndRecPtr, readSegNo, readOff, readLen and readPageTLI;
	 * only the fields assigned explicitly below get non-zero values.
	 */
	MemSet(reader, 0, sizeof(XLogReaderState));
	reader->read_page = pagereadfunc;
	reader->private_data = private_data;

	/*
	 * The page buffer is heap-allocated rather than a static array: most
	 * backend instances never need it, and malloc() hands back MAXALIGN'd
	 * storage whereas a static char array has no alignment guarantee.
	 */
	reader->readBuf = (char *) malloc(XLOG_BLCKSZ);
	if (reader->readBuf == NULL)
		goto fail_reader;

	/* Room for MAX_ERRORMSG_LEN characters plus the terminating NUL. */
	reader->errormsg_buf = malloc(MAX_ERRORMSG_LEN + 1);
	if (reader->errormsg_buf == NULL)
		goto fail_readbuf;
	reader->errormsg_buf[0] = '\0';

	/*
	 * Start with a minimally sized record buffer; it can be enlarged later
	 * when a bigger record is encountered.
	 */
	if (!allocate_recordbuf(reader, 0))
		goto fail_errormsg;

	return reader;

	/* Unwind in reverse order of allocation. */
fail_errormsg:
	free(reader->errormsg_buf);
fail_readbuf:
	free(reader->readBuf);
fail_reader:
	free(reader);
	return NULL;
}
| void XLogReaderFree | ( | XLogReaderState * | state | ) |
Definition at line 120 of file xlogreader.c.
References XLogReaderState::errormsg_buf, free, XLogReaderState::readBuf, and XLogReaderState::readRecordBuf.
Referenced by main(), and StartupXLOG().
{
	/*
	 * Release every buffer owned by the reader, then the reader itself.
	 * The buffers are independent of each other, so the order among them
	 * does not matter; the state struct must go last.
	 */

	/* The record buffer is the only one tested for NULL before freeing. */
	if (state->readRecordBuf != NULL)
		free(state->readRecordBuf);

	free(state->readBuf);
	free(state->errormsg_buf);

	free(state);
}
| struct XLogRecord* XLogReadRecord | ( | XLogReaderState * | state, | |
| XLogRecPtr | recptr, | |||
| char ** | errormsg | |||
| ) | [read] |
Definition at line 178 of file xlogreader.c.
References allocate_recordbuf(), Assert, XLogReaderState::currRecPtr, XLogReaderState::EndRecPtr, XLogReaderState::errormsg_buf, InvalidXLogRecPtr, MAXALIGN, Min, XLogReaderState::readBuf, XLogReaderState::readLen, XLogReaderState::readOff, readOff, ReadPageInternal(), XLogReaderState::readRecordBuf, XLogReaderState::readRecordBufSize, XLogReaderState::ReadRecPtr, XLogReaderState::readSegNo, report_invalid_record(), SizeOfXLogRecord, SizeOfXLogShortPHD, ValidXLogRecord(), ValidXLogRecordHeader(), XLogRecord::xl_info, XLogRecord::xl_rmid, XLogRecord::xl_tot_len, XLOG_SWITCH, XLogPageHeaderSize, XLogSegSize, XLP_FIRST_IS_CONTRECORD, XLogPageHeaderData::xlp_info, XLogPageHeaderData::xlp_rem_len, and XRecOffIsValid.
Referenced by main(), and ReadRecord().
{
	/*
	 * Read and validate one WAL record.
	 *
	 * If RecPtr is InvalidXLogRecPtr, reading continues at state->EndRecPtr
	 * (the end+1 position of the previously returned record); otherwise
	 * RecPtr must already point at a valid record start.
	 *
	 * On success, state->ReadRecPtr and state->EndRecPtr are updated and a
	 * pointer to the record is returned.  Note which buffer that pointer
	 * refers to: a record that crosses a page boundary is reassembled in,
	 * and returned from, state->readRecordBuf; a record that fits on one
	 * page is copied into state->readRecordBuf as well, but the pointer
	 * returned is the one into the page buffer state->readBuf.
	 *
	 * On failure, the cached page information is reset, NULL is returned,
	 * and *errormsg is pointed at state->errormsg_buf if a message was
	 * recorded there (otherwise *errormsg stays NULL).
	 */
	XLogRecord *record;
	XLogRecPtr	targetPagePtr;
	bool		randAccess = false;
	uint32		len,
				total_len;
	uint32		targetRecOff;
	uint32		pageHeaderSize;
	bool		gotheader;
	int			readOff;

	/* reset error state */
	*errormsg = NULL;
	state->errormsg_buf[0] = '\0';

	if (RecPtr == InvalidXLogRecPtr)
	{
		RecPtr = state->EndRecPtr;

		/* Nothing has been read yet, so treat this as a random access. */
		if (state->ReadRecPtr == InvalidXLogRecPtr)
			randAccess = true;

		/*
		 * RecPtr is pointing to end+1 of the previous WAL record.  If we're
		 * at a page boundary, no more records can fit on the current page. We
		 * must skip over the page header, but we can't do that until we've
		 * read in the page, since the header size is variable.
		 */
	}
	else
	{
		/*
		 * In this case, the passed-in record pointer should already be
		 * pointing to a valid record starting position.
		 */
		Assert(XRecOffIsValid(RecPtr));
		randAccess = true;		/* allow readPageTLI to go backwards too */
	}

	state->currRecPtr = RecPtr;

	/* Split the position into page start and offset within the page. */
	targetPagePtr = RecPtr - (RecPtr % XLOG_BLCKSZ);
	targetRecOff = RecPtr % XLOG_BLCKSZ;

	/*
	 * Read the page containing the record into state->readBuf.  Request
	 * enough bytes to cover the whole record header, or at least the part of
	 * it that fits on the same page.
	 */
	readOff = ReadPageInternal(state,
							   targetPagePtr,
							   Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ));
	if (readOff < 0)
		goto err;

	/*
	 * ReadPageInternal always returns at least the page header, so we can
	 * examine it now.
	 */
	pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
	if (targetRecOff == 0)
	{
		/*
		 * At page start, so skip over page header.
		 */
		RecPtr += pageHeaderSize;
		targetRecOff = pageHeaderSize;
	}
	else if (targetRecOff < pageHeaderSize)
	{
		/* The requested position lies inside the page header. */
		report_invalid_record(state, "invalid record offset at %X/%X",
							  (uint32) (RecPtr >> 32), (uint32) RecPtr);
		goto err;
	}

	/*
	 * If the page starts with the continuation of an earlier record, no
	 * record can begin directly after the page header.
	 */
	if ((((XLogPageHeader) state->readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
		targetRecOff == pageHeaderSize)
	{
		report_invalid_record(state, "contrecord is requested by %X/%X",
							  (uint32) (RecPtr >> 32), (uint32) RecPtr);
		goto err;
	}

	/* ReadPageInternal has verified the page header */
	Assert(pageHeaderSize <= readOff);

	/*
	 * Read the record length.
	 *
	 * NB: Even though we use an XLogRecord pointer here, the whole record
	 * header might not fit on this page. xl_tot_len is the first field of the
	 * struct, so it must be on this page (the records are MAXALIGNed), but we
	 * cannot access any other fields until we've verified that we got the
	 * whole header.
	 */
	record = (XLogRecord *) (state->readBuf + RecPtr % XLOG_BLCKSZ);
	total_len = record->xl_tot_len;

	/*
	 * If the whole record header is on this page, validate it immediately.
	 * Otherwise do just a basic sanity check on xl_tot_len, and validate the
	 * rest of the header after reading it from the next page.  The xl_tot_len
	 * check is necessary here to ensure that we enter the "Need to reassemble
	 * record" code path below; otherwise we might fail to apply
	 * ValidXLogRecordHeader at all.
	 */
	if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord)
	{
		if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr, record,
								   randAccess))
			goto err;
		gotheader = true;
	}
	else
	{
		/* XXX: more validation should be done here */
		if (total_len < SizeOfXLogRecord)
		{
			report_invalid_record(state, "invalid record length at %X/%X",
								  (uint32) (RecPtr >> 32), (uint32) RecPtr);
			goto err;
		}
		gotheader = false;
	}

	/*
	 * Enlarge readRecordBuf as needed.
	 */
	if (total_len > state->readRecordBufSize &&
		!allocate_recordbuf(state, total_len))
	{
		/* We treat this as a "bogus data" condition */
		report_invalid_record(state, "record length %u at %X/%X too long",
							  total_len,
							  (uint32) (RecPtr >> 32), (uint32) RecPtr);
		goto err;
	}

	/* Number of bytes from the record start to the end of this page. */
	len = XLOG_BLCKSZ - RecPtr % XLOG_BLCKSZ;
	if (total_len > len)
	{
		/* Need to reassemble record */
		char	   *contdata;
		XLogPageHeader pageHeader;
		char	   *buffer;
		uint32		gotlen;

		/* Copy the first fragment of the record from the first page. */
		memcpy(state->readRecordBuf,
			   state->readBuf + RecPtr % XLOG_BLCKSZ, len);
		buffer = state->readRecordBuf + len;
		gotlen = len;

		do
		{
			/* Calculate pointer to beginning of next page */
			targetPagePtr += XLOG_BLCKSZ;

			/* Wait for the next page to become available */
			readOff = ReadPageInternal(state, targetPagePtr,
									   Min(total_len - gotlen + SizeOfXLogShortPHD,
										   XLOG_BLCKSZ));

			if (readOff < 0)
				goto err;

			Assert(SizeOfXLogShortPHD <= readOff);

			/* Check that the continuation on next page looks valid */
			pageHeader = (XLogPageHeader) state->readBuf;
			if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD))
			{
				report_invalid_record(state,
									  "there is no contrecord flag at %X/%X",
									  (uint32) (RecPtr >> 32), (uint32) RecPtr);
				goto err;
			}

			/*
			 * Cross-check that xlp_rem_len agrees with how much of the record
			 * we expect there to be left.
			 */
			if (pageHeader->xlp_rem_len == 0 ||
				total_len != (pageHeader->xlp_rem_len + gotlen))
			{
				report_invalid_record(state,
									  "invalid contrecord length %u at %X/%X",
									  pageHeader->xlp_rem_len,
									  (uint32) (RecPtr >> 32), (uint32) RecPtr);
				goto err;
			}

			/* Append the continuation from this page to the buffer */
			pageHeaderSize = XLogPageHeaderSize(pageHeader);

			if (readOff < pageHeaderSize)
			{
				readOff = ReadPageInternal(state, targetPagePtr,
										   pageHeaderSize);

				/*
				 * A failed re-read must not fall through to the copy below;
				 * handle it like every other ReadPageInternal failure.
				 */
				if (readOff < 0)
					goto err;
			}

			Assert(pageHeaderSize <= readOff);

			contdata = (char *) state->readBuf + pageHeaderSize;

			/* Take what is left of the record, capped at this page's payload. */
			len = XLOG_BLCKSZ - pageHeaderSize;
			if (pageHeader->xlp_rem_len < len)
				len = pageHeader->xlp_rem_len;

			if (readOff < pageHeaderSize + len)
			{
				readOff = ReadPageInternal(state, targetPagePtr,
										   pageHeaderSize + len);

				/* Same as above: never copy from a page we failed to read. */
				if (readOff < 0)
					goto err;
			}

			memcpy(buffer, (char *) contdata, len);
			buffer += len;
			gotlen += len;

			/* If we just reassembled the record header, validate it. */
			if (!gotheader)
			{
				record = (XLogRecord *) state->readRecordBuf;
				if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr,
										   record, randAccess))
					goto err;
				gotheader = true;
			}
		} while (gotlen < total_len);

		Assert(gotheader);

		record = (XLogRecord *) state->readRecordBuf;
		if (!ValidXLogRecord(state, record, RecPtr))
			goto err;

		/*
		 * The record ends on the last page read: just past that page's
		 * header plus the (MAXALIGNed) remaining length stored there.
		 */
		pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
		state->ReadRecPtr = RecPtr;
		state->EndRecPtr = targetPagePtr + pageHeaderSize
			+ MAXALIGN(pageHeader->xlp_rem_len);
	}
	else
	{
		/* Wait for the record data to become available */
		readOff = ReadPageInternal(state, targetPagePtr,
								   Min(targetRecOff + total_len, XLOG_BLCKSZ));
		if (readOff < 0)
			goto err;

		/* Record does not cross a page boundary */
		if (!ValidXLogRecord(state, record, RecPtr))
			goto err;

		state->EndRecPtr = RecPtr + MAXALIGN(total_len);

		state->ReadRecPtr = RecPtr;

		/*
		 * Keep a copy in readRecordBuf; note that "record", which is what
		 * gets returned, still points into readBuf.
		 */
		memcpy(state->readRecordBuf, record, total_len);
	}

	/*
	 * Special processing if it's an XLOG SWITCH record
	 */
	if (record->xl_rmid == RM_XLOG_ID && record->xl_info == XLOG_SWITCH)
	{
		/* Pretend it extends to end of segment */
		state->EndRecPtr += XLogSegSize - 1;
		state->EndRecPtr -= state->EndRecPtr % XLogSegSize;
	}

	return record;

err:

	/*
	 * Invalidate the xlog page we've cached. We might read from a different
	 * source after failure.
	 */
	state->readSegNo = 0;
	state->readOff = 0;
	state->readLen = 0;

	/* Hand the message to the caller only if one was actually recorded. */
	if (state->errormsg_buf[0] != '\0')
		*errormsg = state->errormsg_buf;

	return NULL;
}
1.7.1