Header And Logo

PostgreSQL
| The world's most advanced open source database.

pg_backup_custom.c

Go to the documentation of this file.
00001 /*-------------------------------------------------------------------------
00002  *
00003  * pg_backup_custom.c
00004  *
00005  *  Implements the custom output format.
00006  *
00007  *  The comments with the routined in this code are a good place to
00008  *  understand how to write a new format.
00009  *
00010  *  See the headers to pg_restore for more details.
00011  *
00012  * Copyright (c) 2000, Philip Warner
00013  *      Rights are granted to use this software in any way so long
00014  *      as this notice is not removed.
00015  *
00016  *  The author is not responsible for loss or damages that may
00017  *  and any liability will be limited to the time taken to fix any
00018  *  related bug.
00019  *
00020  *
00021  * IDENTIFICATION
00022  *      src/bin/pg_dump/pg_backup_custom.c
00023  *
00024  *-------------------------------------------------------------------------
00025  */
00026 
00027 #include "compress_io.h"
00028 #include "parallel.h"
00029 #include "pg_backup_utils.h"
00030 
00031 /*--------
00032  * Routines in the format interface
00033  *--------
00034  */
00035 
00036 static void _ArchiveEntry(ArchiveHandle *AH, TocEntry *te);
00037 static void _StartData(ArchiveHandle *AH, TocEntry *te);
00038 static size_t _WriteData(ArchiveHandle *AH, const void *data, size_t dLen);
00039 static void _EndData(ArchiveHandle *AH, TocEntry *te);
00040 static int  _WriteByte(ArchiveHandle *AH, const int i);
00041 static int  _ReadByte(ArchiveHandle *);
00042 static size_t _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len);
00043 static size_t _ReadBuf(ArchiveHandle *AH, void *buf, size_t len);
00044 static void _CloseArchive(ArchiveHandle *AH);
00045 static void _ReopenArchive(ArchiveHandle *AH);
00046 static void _PrintTocData(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt);
00047 static void _WriteExtraToc(ArchiveHandle *AH, TocEntry *te);
00048 static void _ReadExtraToc(ArchiveHandle *AH, TocEntry *te);
00049 static void _PrintExtraToc(ArchiveHandle *AH, TocEntry *te);
00050 
00051 static void _PrintData(ArchiveHandle *AH);
00052 static void _skipData(ArchiveHandle *AH);
00053 static void _skipBlobs(ArchiveHandle *AH);
00054 
00055 static void _StartBlobs(ArchiveHandle *AH, TocEntry *te);
00056 static void _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
00057 static void _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
00058 static void _EndBlobs(ArchiveHandle *AH, TocEntry *te);
00059 static void _LoadBlobs(ArchiveHandle *AH, bool drop);
00060 static void _Clone(ArchiveHandle *AH);
00061 static void _DeClone(ArchiveHandle *AH);
00062 
00063 static char *_MasterStartParallelItem(ArchiveHandle *AH, TocEntry *te, T_Action act);
00064 static int  _MasterEndParallelItem(ArchiveHandle *AH, TocEntry *te, const char *str, T_Action act);
00065 char       *_WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te);
00066 
00067 typedef struct
00068 {
00069     CompressorState *cs;
00070     int         hasSeek;
00071     pgoff_t     filePos;
00072     pgoff_t     dataStart;
00073 } lclContext;
00074 
00075 typedef struct
00076 {
00077     int         dataState;
00078     pgoff_t     dataPos;
00079 } lclTocEntry;
00080 
00081 
00082 /*------
00083  * Static declarations
00084  *------
00085  */
00086 static void _readBlockHeader(ArchiveHandle *AH, int *type, int *id);
00087 static pgoff_t _getFilePos(ArchiveHandle *AH, lclContext *ctx);
00088 
00089 static size_t _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len);
00090 static size_t _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen);
00091 
00092 /* translator: this is a module name */
00093 static const char *modulename = gettext_noop("custom archiver");
00094 
00095 
00096 
00097 /*
00098  *  Init routine required by ALL formats. This is a global routine
00099  *  and should be declared in pg_backup_archiver.h
00100  *
00101  *  It's task is to create any extra archive context (using AH->formatData),
00102  *  and to initialize the supported function pointers.
00103  *
00104  *  It should also prepare whatever it's input source is for reading/writing,
00105  *  and in the case of a read mode connection, it should load the Header & TOC.
00106  */
00107 void
00108 InitArchiveFmt_Custom(ArchiveHandle *AH)
00109 {
00110     lclContext *ctx;
00111 
00112     /* Assuming static functions, this can be copied for each format. */
00113     AH->ArchiveEntryPtr = _ArchiveEntry;
00114     AH->StartDataPtr = _StartData;
00115     AH->WriteDataPtr = _WriteData;
00116     AH->EndDataPtr = _EndData;
00117     AH->WriteBytePtr = _WriteByte;
00118     AH->ReadBytePtr = _ReadByte;
00119     AH->WriteBufPtr = _WriteBuf;
00120     AH->ReadBufPtr = _ReadBuf;
00121     AH->ClosePtr = _CloseArchive;
00122     AH->ReopenPtr = _ReopenArchive;
00123     AH->PrintTocDataPtr = _PrintTocData;
00124     AH->ReadExtraTocPtr = _ReadExtraToc;
00125     AH->WriteExtraTocPtr = _WriteExtraToc;
00126     AH->PrintExtraTocPtr = _PrintExtraToc;
00127 
00128     AH->StartBlobsPtr = _StartBlobs;
00129     AH->StartBlobPtr = _StartBlob;
00130     AH->EndBlobPtr = _EndBlob;
00131     AH->EndBlobsPtr = _EndBlobs;
00132     AH->ClonePtr = _Clone;
00133     AH->DeClonePtr = _DeClone;
00134 
00135     AH->MasterStartParallelItemPtr = _MasterStartParallelItem;
00136     AH->MasterEndParallelItemPtr = _MasterEndParallelItem;
00137 
00138     /* no parallel dump in the custom archive, only parallel restore */
00139     AH->WorkerJobDumpPtr = NULL;
00140     AH->WorkerJobRestorePtr = _WorkerJobRestoreCustom;
00141 
00142     /* Set up a private area. */
00143     ctx = (lclContext *) pg_malloc0(sizeof(lclContext));
00144     AH->formatData = (void *) ctx;
00145 
00146     /* Initialize LO buffering */
00147     AH->lo_buf_size = LOBBUFSIZE;
00148     AH->lo_buf = (void *) pg_malloc(LOBBUFSIZE);
00149 
00150     ctx->filePos = 0;
00151 
00152     /*
00153      * Now open the file
00154      */
00155     if (AH->mode == archModeWrite)
00156     {
00157         if (AH->fSpec && strcmp(AH->fSpec, "") != 0)
00158         {
00159             AH->FH = fopen(AH->fSpec, PG_BINARY_W);
00160             if (!AH->FH)
00161                 exit_horribly(modulename, "could not open output file \"%s\": %s\n",
00162                               AH->fSpec, strerror(errno));
00163         }
00164         else
00165         {
00166             AH->FH = stdout;
00167             if (!AH->FH)
00168                 exit_horribly(modulename, "could not open output file: %s\n",
00169                               strerror(errno));
00170         }
00171 
00172         ctx->hasSeek = checkSeek(AH->FH);
00173     }
00174     else
00175     {
00176         if (AH->fSpec && strcmp(AH->fSpec, "") != 0)
00177         {
00178             AH->FH = fopen(AH->fSpec, PG_BINARY_R);
00179             if (!AH->FH)
00180                 exit_horribly(modulename, "could not open input file \"%s\": %s\n",
00181                               AH->fSpec, strerror(errno));
00182         }
00183         else
00184         {
00185             AH->FH = stdin;
00186             if (!AH->FH)
00187                 exit_horribly(modulename, "could not open input file: %s\n",
00188                               strerror(errno));
00189         }
00190 
00191         ctx->hasSeek = checkSeek(AH->FH);
00192 
00193         ReadHead(AH);
00194         ReadToc(AH);
00195         ctx->dataStart = _getFilePos(AH, ctx);
00196     }
00197 
00198 }
00199 
00200 /*
00201  * Called by the Archiver when the dumper creates a new TOC entry.
00202  *
00203  * Optional.
00204  *
00205  * Set up extrac format-related TOC data.
00206 */
00207 static void
00208 _ArchiveEntry(ArchiveHandle *AH, TocEntry *te)
00209 {
00210     lclTocEntry *ctx;
00211 
00212     ctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
00213     if (te->dataDumper)
00214         ctx->dataState = K_OFFSET_POS_NOT_SET;
00215     else
00216         ctx->dataState = K_OFFSET_NO_DATA;
00217 
00218     te->formatData = (void *) ctx;
00219 }
00220 
00221 /*
00222  * Called by the Archiver to save any extra format-related TOC entry
00223  * data.
00224  *
00225  * Optional.
00226  *
00227  * Use the Archiver routines to write data - they are non-endian, and
00228  * maintain other important file information.
00229  */
00230 static void
00231 _WriteExtraToc(ArchiveHandle *AH, TocEntry *te)
00232 {
00233     lclTocEntry *ctx = (lclTocEntry *) te->formatData;
00234 
00235     WriteOffset(AH, ctx->dataPos, ctx->dataState);
00236 }
00237 
00238 /*
00239  * Called by the Archiver to read any extra format-related TOC data.
00240  *
00241  * Optional.
00242  *
00243  * Needs to match the order defined in _WriteExtraToc, and should also
00244  * use the Archiver input routines.
00245  */
00246 static void
00247 _ReadExtraToc(ArchiveHandle *AH, TocEntry *te)
00248 {
00249     lclTocEntry *ctx = (lclTocEntry *) te->formatData;
00250 
00251     if (ctx == NULL)
00252     {
00253         ctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
00254         te->formatData = (void *) ctx;
00255     }
00256 
00257     ctx->dataState = ReadOffset(AH, &(ctx->dataPos));
00258 
00259     /*
00260      * Prior to V1.7 (pg7.3), we dumped the data size as an int now we don't
00261      * dump it at all.
00262      */
00263     if (AH->version < K_VERS_1_7)
00264         ReadInt(AH);
00265 }
00266 
00267 /*
00268  * Called by the Archiver when restoring an archive to output a comment
00269  * that includes useful information about the TOC entry.
00270  *
00271  * Optional.
00272  *
00273  */
00274 static void
00275 _PrintExtraToc(ArchiveHandle *AH, TocEntry *te)
00276 {
00277     lclTocEntry *ctx = (lclTocEntry *) te->formatData;
00278 
00279     if (AH->public.verbose)
00280         ahprintf(AH, "-- Data Pos: " INT64_FORMAT "\n",
00281                  (int64) ctx->dataPos);
00282 }
00283 
00284 /*
00285  * Called by the archiver when saving TABLE DATA (not schema). This routine
00286  * should save whatever format-specific information is needed to read
00287  * the archive back.
00288  *
00289  * It is called just prior to the dumper's 'DataDumper' routine being called.
00290  *
00291  * Optional, but strongly recommended.
00292  *
00293  */
00294 static void
00295 _StartData(ArchiveHandle *AH, TocEntry *te)
00296 {
00297     lclContext *ctx = (lclContext *) AH->formatData;
00298     lclTocEntry *tctx = (lclTocEntry *) te->formatData;
00299 
00300     tctx->dataPos = _getFilePos(AH, ctx);
00301     tctx->dataState = K_OFFSET_POS_SET;
00302 
00303     _WriteByte(AH, BLK_DATA);   /* Block type */
00304     WriteInt(AH, te->dumpId);   /* For sanity check */
00305 
00306     ctx->cs = AllocateCompressor(AH->compression, _CustomWriteFunc);
00307 }
00308 
00309 /*
00310  * Called by archiver when dumper calls WriteData. This routine is
00311  * called for both BLOB and TABLE data; it is the responsibility of
00312  * the format to manage each kind of data using StartBlob/StartData.
00313  *
00314  * It should only be called from within a DataDumper routine.
00315  *
00316  * Mandatory.
00317  */
00318 static size_t
00319 _WriteData(ArchiveHandle *AH, const void *data, size_t dLen)
00320 {
00321     lclContext *ctx = (lclContext *) AH->formatData;
00322     CompressorState *cs = ctx->cs;
00323 
00324     if (dLen == 0)
00325         return 0;
00326 
00327     return WriteDataToArchive(AH, cs, data, dLen);
00328 }
00329 
00330 /*
00331  * Called by the archiver when a dumper's 'DataDumper' routine has
00332  * finished.
00333  *
00334  * Optional.
00335  *
00336  */
00337 static void
00338 _EndData(ArchiveHandle *AH, TocEntry *te)
00339 {
00340     lclContext *ctx = (lclContext *) AH->formatData;
00341 
00342     EndCompressor(AH, ctx->cs);
00343     /* Send the end marker */
00344     WriteInt(AH, 0);
00345 }
00346 
00347 /*
00348  * Called by the archiver when starting to save all BLOB DATA (not schema).
00349  * This routine should save whatever format-specific information is needed
00350  * to read the BLOBs back into memory.
00351  *
00352  * It is called just prior to the dumper's DataDumper routine.
00353  *
00354  * Optional, but strongly recommended.
00355  */
00356 static void
00357 _StartBlobs(ArchiveHandle *AH, TocEntry *te)
00358 {
00359     lclContext *ctx = (lclContext *) AH->formatData;
00360     lclTocEntry *tctx = (lclTocEntry *) te->formatData;
00361 
00362     tctx->dataPos = _getFilePos(AH, ctx);
00363     tctx->dataState = K_OFFSET_POS_SET;
00364 
00365     _WriteByte(AH, BLK_BLOBS);  /* Block type */
00366     WriteInt(AH, te->dumpId);   /* For sanity check */
00367 }
00368 
00369 /*
00370  * Called by the archiver when the dumper calls StartBlob.
00371  *
00372  * Mandatory.
00373  *
00374  * Must save the passed OID for retrieval at restore-time.
00375  */
00376 static void
00377 _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
00378 {
00379     lclContext *ctx = (lclContext *) AH->formatData;
00380 
00381     if (oid == 0)
00382         exit_horribly(modulename, "invalid OID for large object\n");
00383 
00384     WriteInt(AH, oid);
00385 
00386     ctx->cs = AllocateCompressor(AH->compression, _CustomWriteFunc);
00387 }
00388 
00389 /*
00390  * Called by the archiver when the dumper calls EndBlob.
00391  *
00392  * Optional.
00393  */
00394 static void
00395 _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
00396 {
00397     lclContext *ctx = (lclContext *) AH->formatData;
00398 
00399     EndCompressor(AH, ctx->cs);
00400     /* Send the end marker */
00401     WriteInt(AH, 0);
00402 }
00403 
00404 /*
00405  * Called by the archiver when finishing saving all BLOB DATA.
00406  *
00407  * Optional.
00408  */
00409 static void
00410 _EndBlobs(ArchiveHandle *AH, TocEntry *te)
00411 {
00412     /* Write out a fake zero OID to mark end-of-blobs. */
00413     WriteInt(AH, 0);
00414 }
00415 
00416 /*
00417  * Print data for a given TOC entry
00418  */
00419 static void
00420 _PrintTocData(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt)
00421 {
00422     lclContext *ctx = (lclContext *) AH->formatData;
00423     lclTocEntry *tctx = (lclTocEntry *) te->formatData;
00424     int         blkType;
00425     int         id;
00426 
00427     if (tctx->dataState == K_OFFSET_NO_DATA)
00428         return;
00429 
00430     if (!ctx->hasSeek || tctx->dataState == K_OFFSET_POS_NOT_SET)
00431     {
00432         /*
00433          * We cannot seek directly to the desired block.  Instead, skip over
00434          * block headers until we find the one we want.  This could fail if we
00435          * are asked to restore items out-of-order.
00436          */
00437         _readBlockHeader(AH, &blkType, &id);
00438 
00439         while (blkType != EOF && id != te->dumpId)
00440         {
00441             switch (blkType)
00442             {
00443                 case BLK_DATA:
00444                     _skipData(AH);
00445                     break;
00446 
00447                 case BLK_BLOBS:
00448                     _skipBlobs(AH);
00449                     break;
00450 
00451                 default:        /* Always have a default */
00452                     exit_horribly(modulename,
00453                                   "unrecognized data block type (%d) while searching archive\n",
00454                                   blkType);
00455                     break;
00456             }
00457             _readBlockHeader(AH, &blkType, &id);
00458         }
00459     }
00460     else
00461     {
00462         /* We can just seek to the place we need to be. */
00463         if (fseeko(AH->FH, tctx->dataPos, SEEK_SET) != 0)
00464             exit_horribly(modulename, "error during file seek: %s\n",
00465                           strerror(errno));
00466 
00467         _readBlockHeader(AH, &blkType, &id);
00468     }
00469 
00470     /* Produce suitable failure message if we fell off end of file */
00471     if (blkType == EOF)
00472     {
00473         if (tctx->dataState == K_OFFSET_POS_NOT_SET)
00474             exit_horribly(modulename, "could not find block ID %d in archive -- "
00475                           "possibly due to out-of-order restore request, "
00476                           "which cannot be handled due to lack of data offsets in archive\n",
00477                           te->dumpId);
00478         else if (!ctx->hasSeek)
00479             exit_horribly(modulename, "could not find block ID %d in archive -- "
00480                           "possibly due to out-of-order restore request, "
00481                   "which cannot be handled due to non-seekable input file\n",
00482                           te->dumpId);
00483         else    /* huh, the dataPos led us to EOF? */
00484             exit_horribly(modulename, "could not find block ID %d in archive -- "
00485                           "possibly corrupt archive\n",
00486                           te->dumpId);
00487     }
00488 
00489     /* Are we sane? */
00490     if (id != te->dumpId)
00491         exit_horribly(modulename, "found unexpected block ID (%d) when reading data -- expected %d\n",
00492                       id, te->dumpId);
00493 
00494     switch (blkType)
00495     {
00496         case BLK_DATA:
00497             _PrintData(AH);
00498             break;
00499 
00500         case BLK_BLOBS:
00501             _LoadBlobs(AH, ropt->dropSchema);
00502             break;
00503 
00504         default:                /* Always have a default */
00505             exit_horribly(modulename, "unrecognized data block type %d while restoring archive\n",
00506                           blkType);
00507             break;
00508     }
00509 }
00510 
00511 /*
00512  * Print data from current file position.
00513 */
00514 static void
00515 _PrintData(ArchiveHandle *AH)
00516 {
00517     ReadDataFromArchive(AH, AH->compression, _CustomReadFunc);
00518 }
00519 
00520 static void
00521 _LoadBlobs(ArchiveHandle *AH, bool drop)
00522 {
00523     Oid         oid;
00524 
00525     StartRestoreBlobs(AH);
00526 
00527     oid = ReadInt(AH);
00528     while (oid != 0)
00529     {
00530         StartRestoreBlob(AH, oid, drop);
00531         _PrintData(AH);
00532         EndRestoreBlob(AH, oid);
00533         oid = ReadInt(AH);
00534     }
00535 
00536     EndRestoreBlobs(AH);
00537 }
00538 
00539 /*
00540  * Skip the BLOBs from the current file position.
00541  * BLOBS are written sequentially as data blocks (see below).
00542  * Each BLOB is preceded by it's original OID.
00543  * A zero OID indicated the end of the BLOBS
00544  */
00545 static void
00546 _skipBlobs(ArchiveHandle *AH)
00547 {
00548     Oid         oid;
00549 
00550     oid = ReadInt(AH);
00551     while (oid != 0)
00552     {
00553         _skipData(AH);
00554         oid = ReadInt(AH);
00555     }
00556 }
00557 
00558 /*
00559  * Skip data from current file position.
00560  * Data blocks are formatted as an integer length, followed by data.
00561  * A zero length denoted the end of the block.
00562 */
00563 static void
00564 _skipData(ArchiveHandle *AH)
00565 {
00566     lclContext *ctx = (lclContext *) AH->formatData;
00567     size_t      blkLen;
00568     char       *buf = NULL;
00569     int         buflen = 0;
00570     size_t      cnt;
00571 
00572     blkLen = ReadInt(AH);
00573     while (blkLen != 0)
00574     {
00575         if (blkLen > buflen)
00576         {
00577             if (buf)
00578                 free(buf);
00579             buf = (char *) pg_malloc(blkLen);
00580             buflen = blkLen;
00581         }
00582         cnt = fread(buf, 1, blkLen, AH->FH);
00583         if (cnt != blkLen)
00584         {
00585             if (feof(AH->FH))
00586                 exit_horribly(modulename,
00587                             "could not read from input file: end of file\n");
00588             else
00589                 exit_horribly(modulename,
00590                     "could not read from input file: %s\n", strerror(errno));
00591         }
00592 
00593         ctx->filePos += blkLen;
00594 
00595         blkLen = ReadInt(AH);
00596     }
00597 
00598     if (buf)
00599         free(buf);
00600 }
00601 
00602 /*
00603  * Write a byte of data to the archive.
00604  *
00605  * Mandatory.
00606  *
00607  * Called by the archiver to do integer & byte output to the archive.
00608  */
00609 static int
00610 _WriteByte(ArchiveHandle *AH, const int i)
00611 {
00612     lclContext *ctx = (lclContext *) AH->formatData;
00613     int         res;
00614 
00615     res = fputc(i, AH->FH);
00616     if (res != EOF)
00617         ctx->filePos += 1;
00618     else
00619         exit_horribly(modulename, "could not write byte: %s\n", strerror(errno));
00620     return res;
00621 }
00622 
00623 /*
00624  * Read a byte of data from the archive.
00625  *
00626  * Mandatory
00627  *
00628  * Called by the archiver to read bytes & integers from the archive.
00629  * EOF should be treated as a fatal error.
00630  */
00631 static int
00632 _ReadByte(ArchiveHandle *AH)
00633 {
00634     lclContext *ctx = (lclContext *) AH->formatData;
00635     int         res;
00636 
00637     res = getc(AH->FH);
00638     if (res == EOF)
00639         exit_horribly(modulename, "unexpected end of file\n");
00640     ctx->filePos += 1;
00641     return res;
00642 }
00643 
00644 /*
00645  * Write a buffer of data to the archive.
00646  *
00647  * Mandatory.
00648  *
00649  * Called by the archiver to write a block of bytes to the archive.
00650  */
00651 static size_t
00652 _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len)
00653 {
00654     lclContext *ctx = (lclContext *) AH->formatData;
00655     size_t      res;
00656 
00657     res = fwrite(buf, 1, len, AH->FH);
00658 
00659     if (res != len)
00660         exit_horribly(modulename,
00661                     "could not write to output file: %s\n", strerror(errno));
00662 
00663     ctx->filePos += res;
00664     return res;
00665 }
00666 
00667 /*
00668  * Read a block of bytes from the archive.
00669  *
00670  * Mandatory.
00671  *
00672  * Called by the archiver to read a block of bytes from the archive
00673  */
00674 static size_t
00675 _ReadBuf(ArchiveHandle *AH, void *buf, size_t len)
00676 {
00677     lclContext *ctx = (lclContext *) AH->formatData;
00678     size_t      res;
00679 
00680     res = fread(buf, 1, len, AH->FH);
00681     ctx->filePos += res;
00682 
00683     return res;
00684 }
00685 
00686 /*
00687  * Close the archive.
00688  *
00689  * Mandatory.
00690  *
00691  * When writing the archive, this is the routine that actually starts
00692  * the process of saving it to files. No data should be written prior
00693  * to this point, since the user could sort the TOC after creating it.
00694  *
00695  * If an archive is to be written, this toutine must call:
00696  *      WriteHead           to save the archive header
00697  *      WriteToc            to save the TOC entries
00698  *      WriteDataChunks     to save all DATA & BLOBs.
00699  *
00700  */
00701 static void
00702 _CloseArchive(ArchiveHandle *AH)
00703 {
00704     lclContext *ctx = (lclContext *) AH->formatData;
00705     pgoff_t     tpos;
00706 
00707     if (AH->mode == archModeWrite)
00708     {
00709         WriteHead(AH);
00710         tpos = ftello(AH->FH);
00711         WriteToc(AH);
00712         ctx->dataStart = _getFilePos(AH, ctx);
00713         WriteDataChunks(AH, NULL);
00714 
00715         /*
00716          * If possible, re-write the TOC in order to update the data offset
00717          * information.  This is not essential, as pg_restore can cope in most
00718          * cases without it; but it can make pg_restore significantly faster
00719          * in some situations (especially parallel restore).
00720          */
00721         if (ctx->hasSeek &&
00722             fseeko(AH->FH, tpos, SEEK_SET) == 0)
00723             WriteToc(AH);
00724     }
00725 
00726     if (fclose(AH->FH) != 0)
00727         exit_horribly(modulename, "could not close archive file: %s\n", strerror(errno));
00728 
00729     AH->FH = NULL;
00730 }
00731 
00732 /*
00733  * Reopen the archive's file handle.
00734  *
00735  * We close the original file handle, except on Windows.  (The difference
00736  * is because on Windows, this is used within a multithreading context,
00737  * and we don't want a thread closing the parent file handle.)
00738  */
00739 static void
00740 _ReopenArchive(ArchiveHandle *AH)
00741 {
00742     lclContext *ctx = (lclContext *) AH->formatData;
00743     pgoff_t     tpos;
00744 
00745     if (AH->mode == archModeWrite)
00746         exit_horribly(modulename, "can only reopen input archives\n");
00747 
00748     /*
00749      * These two cases are user-facing errors since they represent unsupported
00750      * (but not invalid) use-cases.  Word the error messages appropriately.
00751      */
00752     if (AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0)
00753         exit_horribly(modulename, "parallel restore from standard input is not supported\n");
00754     if (!ctx->hasSeek)
00755         exit_horribly(modulename, "parallel restore from non-seekable file is not supported\n");
00756 
00757     errno = 0;
00758     tpos = ftello(AH->FH);
00759     if (errno)
00760         exit_horribly(modulename, "could not determine seek position in archive file: %s\n",
00761                       strerror(errno));
00762 
00763 #ifndef WIN32
00764     if (fclose(AH->FH) != 0)
00765         exit_horribly(modulename, "could not close archive file: %s\n",
00766                       strerror(errno));
00767 #endif
00768 
00769     AH->FH = fopen(AH->fSpec, PG_BINARY_R);
00770     if (!AH->FH)
00771         exit_horribly(modulename, "could not open input file \"%s\": %s\n",
00772                       AH->fSpec, strerror(errno));
00773 
00774     if (fseeko(AH->FH, tpos, SEEK_SET) != 0)
00775         exit_horribly(modulename, "could not set seek position in archive file: %s\n",
00776                       strerror(errno));
00777 }
00778 
00779 /*
00780  * Clone format-specific fields during parallel restoration.
00781  */
00782 static void
00783 _Clone(ArchiveHandle *AH)
00784 {
00785     lclContext *ctx = (lclContext *) AH->formatData;
00786 
00787     AH->formatData = (lclContext *) pg_malloc(sizeof(lclContext));
00788     memcpy(AH->formatData, ctx, sizeof(lclContext));
00789     ctx = (lclContext *) AH->formatData;
00790 
00791     /* sanity check, shouldn't happen */
00792     if (ctx->cs != NULL)
00793         exit_horribly(modulename, "compressor active\n");
00794 
00795     /*
00796      * Note: we do not make a local lo_buf because we expect at most one BLOBS
00797      * entry per archive, so no parallelism is possible.  Likewise,
00798      * TOC-entry-local state isn't an issue because any one TOC entry is
00799      * touched by just one worker child.
00800      */
00801 }
00802 
00803 static void
00804 _DeClone(ArchiveHandle *AH)
00805 {
00806     lclContext *ctx = (lclContext *) AH->formatData;
00807 
00808     free(ctx);
00809 }
00810 
00811 /*
00812  * This function is executed in the child of a parallel backup for the
00813  * custom format archive and dumps the actual data.
00814  */
00815 char *
00816 _WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te)
00817 {
00818     /*
00819      * short fixed-size string + some ID so far, this needs to be malloc'ed
00820      * instead of static because we work with threads on windows
00821      */
00822     const int   buflen = 64;
00823     char       *buf = (char *) pg_malloc(buflen);
00824     ParallelArgs pargs;
00825     int         status;
00826 
00827     pargs.AH = AH;
00828     pargs.te = te;
00829 
00830     status = parallel_restore(&pargs);
00831 
00832     snprintf(buf, buflen, "OK RESTORE %d %d %d", te->dumpId, status,
00833              status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0);
00834 
00835     return buf;
00836 }
00837 
00838 /*
00839  * This function is executed in the parent process. Depending on the desired
00840  * action (dump or restore) it creates a string that is understood by the
00841  * _WorkerJobDump /_WorkerJobRestore functions of the dump format.
00842  */
00843 static char *
00844 _MasterStartParallelItem(ArchiveHandle *AH, TocEntry *te, T_Action act)
00845 {
00846     /*
00847      * A static char is okay here, even on Windows because we call this
00848      * function only from one process (the master).
00849      */
00850     static char buf[64];        /* short fixed-size string + number */
00851 
00852     /* no parallel dump in the custom archive format */
00853     Assert(act == ACT_RESTORE);
00854 
00855     snprintf(buf, sizeof(buf), "RESTORE %d", te->dumpId);
00856 
00857     return buf;
00858 }
00859 
00860 /*
00861  * This function is executed in the parent process. It analyzes the response of
00862  * the _WorkerJobDump / _WorkerJobRestore functions of the dump format.
00863  */
00864 static int
00865 _MasterEndParallelItem(ArchiveHandle *AH, TocEntry *te, const char *str, T_Action act)
00866 {
00867     DumpId      dumpId;
00868     int         nBytes,
00869                 status,
00870                 n_errors;
00871 
00872     /* no parallel dump in the custom archive */
00873     Assert(act == ACT_RESTORE);
00874 
00875     sscanf(str, "%u %u %u%n", &dumpId, &status, &n_errors, &nBytes);
00876 
00877     Assert(nBytes == strlen(str));
00878     Assert(dumpId == te->dumpId);
00879 
00880     AH->public.n_errors += n_errors;
00881 
00882     return status;
00883 }
00884 
00885 /*--------------------------------------------------
00886  * END OF FORMAT CALLBACKS
00887  *--------------------------------------------------
00888  */
00889 
00890 /*
00891  * Get the current position in the archive file.
00892  */
00893 static pgoff_t
00894 _getFilePos(ArchiveHandle *AH, lclContext *ctx)
00895 {
00896     pgoff_t     pos;
00897 
00898     if (ctx->hasSeek)
00899     {
00900         pos = ftello(AH->FH);
00901         if (pos != ctx->filePos)
00902         {
00903             write_msg(modulename, "WARNING: ftell mismatch with expected position -- ftell used\n");
00904 
00905             /*
00906              * Prior to 1.7 (pg7.3) we relied on the internally maintained
00907              * pointer. Now we rely on ftello() always, unless the file has
00908              * been found to not support it.
00909              */
00910         }
00911     }
00912     else
00913         pos = ctx->filePos;
00914     return pos;
00915 }
00916 
00917 /*
00918  * Read a data block header. The format changed in V1.3, so we
00919  * centralize the code here for simplicity.  Returns *type = EOF
00920  * if at EOF.
00921  */
00922 static void
00923 _readBlockHeader(ArchiveHandle *AH, int *type, int *id)
00924 {
00925     lclContext *ctx = (lclContext *) AH->formatData;
00926     int         byt;
00927 
00928     /*
00929      * Note: if we are at EOF with a pre-1.3 input file, we'll exit_horribly
00930      * inside ReadInt rather than returning EOF.  It doesn't seem worth
00931      * jumping through hoops to deal with that case better, because no such
00932      * files are likely to exist in the wild: only some 7.1 development
00933      * versions of pg_dump ever generated such files.
00934      */
00935     if (AH->version < K_VERS_1_3)
00936         *type = BLK_DATA;
00937     else
00938     {
00939         byt = getc(AH->FH);
00940         *type = byt;
00941         if (byt == EOF)
00942         {
00943             *id = 0;            /* don't return an uninitialized value */
00944             return;
00945         }
00946         ctx->filePos += 1;
00947     }
00948 
00949     *id = ReadInt(AH);
00950 }
00951 
00952 /*
00953  * Callback function for WriteDataToArchive. Writes one block of (compressed)
00954  * data to the archive.
00955  */
00956 static size_t
00957 _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len)
00958 {
00959     /* never write 0-byte blocks (this should not happen) */
00960     if (len == 0)
00961         return 0;
00962 
00963     WriteInt(AH, len);
00964     return _WriteBuf(AH, buf, len);
00965 }
00966 
00967 /*
00968  * Callback function for ReadDataFromArchive. To keep things simple, we
00969  * always read one compressed block at a time.
00970  */
00971 static size_t
00972 _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen)
00973 {
00974     size_t      blkLen;
00975     size_t      cnt;
00976 
00977     /* Read length */
00978     blkLen = ReadInt(AH);
00979     if (blkLen == 0)
00980         return 0;
00981 
00982     /* If the caller's buffer is not large enough, allocate a bigger one */
00983     if (blkLen > *buflen)
00984     {
00985         free(*buf);
00986         *buf = (char *) pg_malloc(blkLen);
00987         *buflen = blkLen;
00988     }
00989 
00990     cnt = _ReadBuf(AH, *buf, blkLen);
00991     if (cnt != blkLen)
00992     {
00993         if (feof(AH->FH))
00994             exit_horribly(modulename,
00995                           "could not read from input file: end of file\n");
00996         else
00997             exit_horribly(modulename,
00998                     "could not read from input file: %s\n", strerror(errno));
00999     }
01000     return cnt;
01001 }