Header And Logo

PostgreSQL
| The world's most advanced open source database.

pg_backup_archiver.c

Go to the documentation of this file.
00001 /*-------------------------------------------------------------------------
00002  *
00003  * pg_backup_archiver.c
00004  *
00005  *  Private implementation of the archiver routines.
00006  *
00007  *  See the headers to pg_restore for more details.
00008  *
00009  * Copyright (c) 2000, Philip Warner
00010  *  Rights are granted to use this software in any way so long
00011  *  as this notice is not removed.
00012  *
00013  *  The author is not responsible for loss or damages that may
00014  *  result from its use.
00015  *
00016  *
00017  * IDENTIFICATION
00018  *      src/bin/pg_dump/pg_backup_archiver.c
00019  *
00020  *-------------------------------------------------------------------------
00021  */
00022 
00023 #include "pg_backup_db.h"
00024 #include "pg_backup_utils.h"
00025 #include "parallel.h"
00026 
00027 #include <ctype.h>
00028 #include <fcntl.h>
00029 #include <unistd.h>
00030 #include <sys/stat.h>
00031 #include <sys/types.h>
00032 #include <sys/wait.h>
00033 
00034 #ifdef WIN32
00035 #include <io.h>
00036 #endif
00037 
00038 #include "libpq/libpq-fs.h"
00039 
00040 #define TEXT_DUMP_HEADER "--\n-- PostgreSQL database dump\n--\n\n"
00041 #define TEXT_DUMPALL_HEADER "--\n-- PostgreSQL database cluster dump\n--\n\n"
00042 
00043 /* state needed to save/restore an archive's output target */
00044 typedef struct _outputContext
00045 {
00046     void       *OF;
00047     int         gzOut;
00048 } OutputContext;
00049 
00050 /* translator: this is a module name */
00051 static const char *modulename = gettext_noop("archiver");
00052 
00053 
00054 static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt,
00055      const int compression, ArchiveMode mode, SetupWorkerPtr setupWorkerPtr);
00056 static void _getObjectDescription(PQExpBuffer buf, TocEntry *te,
00057                       ArchiveHandle *AH);
00058 static void _printTocEntry(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt, bool isData, bool acl_pass);
00059 static char *replace_line_endings(const char *str);
00060 static void _doSetFixedOutputState(ArchiveHandle *AH);
00061 static void _doSetSessionAuth(ArchiveHandle *AH, const char *user);
00062 static void _doSetWithOids(ArchiveHandle *AH, const bool withOids);
00063 static void _reconnectToDB(ArchiveHandle *AH, const char *dbname);
00064 static void _becomeUser(ArchiveHandle *AH, const char *user);
00065 static void _becomeOwner(ArchiveHandle *AH, TocEntry *te);
00066 static void _selectOutputSchema(ArchiveHandle *AH, const char *schemaName);
00067 static void _selectTablespace(ArchiveHandle *AH, const char *tablespace);
00068 static void processEncodingEntry(ArchiveHandle *AH, TocEntry *te);
00069 static void processStdStringsEntry(ArchiveHandle *AH, TocEntry *te);
00070 static teReqs _tocEntryRequired(TocEntry *te, teSection curSection, RestoreOptions *ropt);
00071 static bool _tocEntryIsACL(TocEntry *te);
00072 static void _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt);
00073 static void _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt);
00074 static void buildTocEntryArrays(ArchiveHandle *AH);
00075 static void _moveBefore(ArchiveHandle *AH, TocEntry *pos, TocEntry *te);
00076 static int  _discoverArchiveFormat(ArchiveHandle *AH);
00077 
00078 static int  RestoringToDB(ArchiveHandle *AH);
00079 static void dump_lo_buf(ArchiveHandle *AH);
00080 static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim);
00081 static void SetOutput(ArchiveHandle *AH, const char *filename, int compression);
00082 static OutputContext SaveOutput(ArchiveHandle *AH);
00083 static void RestoreOutput(ArchiveHandle *AH, OutputContext savedContext);
00084 
00085 static int restore_toc_entry(ArchiveHandle *AH, TocEntry *te,
00086                   RestoreOptions *ropt, bool is_parallel);
00087 static void restore_toc_entries_prefork(ArchiveHandle *AH);
00088 static void restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
00089                              TocEntry *pending_list);
00090 static void restore_toc_entries_postfork(ArchiveHandle *AH, TocEntry *pending_list);
00091 static void par_list_header_init(TocEntry *l);
00092 static void par_list_append(TocEntry *l, TocEntry *te);
00093 static void par_list_remove(TocEntry *te);
00094 static TocEntry *get_next_work_item(ArchiveHandle *AH,
00095                    TocEntry *ready_list,
00096                    ParallelState *pstate);
00097 static void mark_work_done(ArchiveHandle *AH, TocEntry *ready_list,
00098                int worker, int status,
00099                ParallelState *pstate);
00100 static void fix_dependencies(ArchiveHandle *AH);
00101 static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2);
00102 static void repoint_table_dependencies(ArchiveHandle *AH);
00103 static void identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te);
00104 static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
00105                     TocEntry *ready_list);
00106 static void mark_create_done(ArchiveHandle *AH, TocEntry *te);
00107 static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te);
00108 
00109 /*
00110  *  Wrapper functions.
00111  *
00112  *  The objective it to make writing new formats and dumpers as simple
00113  *  as possible, if necessary at the expense of extra function calls etc.
00114  *
00115  */
00116 
00117 /*
00118  * The dump worker setup needs lots of knowledge of the internals of pg_dump,
00119  * so It's defined in pg_dump.c and passed into OpenArchive. The restore worker
00120  * setup doesn't need to know anything much, so it's defined here.
00121  */
00122 static void
00123 setupRestoreWorker(Archive *AHX, RestoreOptions *ropt)
00124 {
00125     ArchiveHandle *AH = (ArchiveHandle *) AHX;
00126 
00127     (AH->ReopenPtr) (AH);
00128 }
00129 
00130 
00131 /* Create a new archive */
00132 /* Public */
00133 Archive *
00134 CreateArchive(const char *FileSpec, const ArchiveFormat fmt,
00135      const int compression, ArchiveMode mode, SetupWorkerPtr setupDumpWorker)
00136 
00137 {
00138     ArchiveHandle *AH = _allocAH(FileSpec, fmt, compression, mode, setupDumpWorker);
00139 
00140     return (Archive *) AH;
00141 }
00142 
00143 /* Open an existing archive */
00144 /* Public */
00145 Archive *
00146 OpenArchive(const char *FileSpec, const ArchiveFormat fmt)
00147 {
00148     ArchiveHandle *AH = _allocAH(FileSpec, fmt, 0, archModeRead, setupRestoreWorker);
00149 
00150     return (Archive *) AH;
00151 }
00152 
00153 /* Public */
00154 void
00155 CloseArchive(Archive *AHX)
00156 {
00157     int         res = 0;
00158     ArchiveHandle *AH = (ArchiveHandle *) AHX;
00159 
00160     (*AH->ClosePtr) (AH);
00161 
00162     /* Close the output */
00163     if (AH->gzOut)
00164         res = GZCLOSE(AH->OF);
00165     else if (AH->OF != stdout)
00166         res = fclose(AH->OF);
00167 
00168     if (res != 0)
00169         exit_horribly(modulename, "could not close output file: %s\n",
00170                       strerror(errno));
00171 }
00172 
00173 /* Public */
00174 void
00175 SetArchiveRestoreOptions(Archive *AHX, RestoreOptions *ropt)
00176 {
00177     ArchiveHandle *AH = (ArchiveHandle *) AHX;
00178     TocEntry   *te;
00179     teSection   curSection;
00180 
00181     /* Save options for later access */
00182     AH->ropt = ropt;
00183 
00184     /* Decide which TOC entries will be dumped/restored, and mark them */
00185     curSection = SECTION_PRE_DATA;
00186     for (te = AH->toc->next; te != AH->toc; te = te->next)
00187     {
00188         /*
00189          * When writing an archive, we also take this opportunity to check
00190          * that we have generated the entries in a sane order that respects
00191          * the section divisions.  When reading, don't complain, since buggy
00192          * old versions of pg_dump might generate out-of-order archives.
00193          */
00194         if (AH->mode != archModeRead)
00195         {
00196             switch (te->section)
00197             {
00198                 case SECTION_NONE:
00199                     /* ok to be anywhere */
00200                     break;
00201                 case SECTION_PRE_DATA:
00202                     if (curSection != SECTION_PRE_DATA)
00203                         write_msg(modulename,
00204                                   "WARNING: archive items not in correct section order\n");
00205                     break;
00206                 case SECTION_DATA:
00207                     if (curSection == SECTION_POST_DATA)
00208                         write_msg(modulename,
00209                                   "WARNING: archive items not in correct section order\n");
00210                     break;
00211                 case SECTION_POST_DATA:
00212                     /* ok no matter which section we were in */
00213                     break;
00214                 default:
00215                     exit_horribly(modulename, "unexpected section code %d\n",
00216                                   (int) te->section);
00217                     break;
00218             }
00219         }
00220 
00221         if (te->section != SECTION_NONE)
00222             curSection = te->section;
00223 
00224         te->reqs = _tocEntryRequired(te, curSection, ropt);
00225     }
00226 }
00227 
00228 /* Public */
00229 void
00230 RestoreArchive(Archive *AHX)
00231 {
00232     ArchiveHandle *AH = (ArchiveHandle *) AHX;
00233     RestoreOptions *ropt = AH->ropt;
00234     bool        parallel_mode;
00235     TocEntry   *te;
00236     OutputContext sav;
00237 
00238     AH->stage = STAGE_INITIALIZING;
00239 
00240     /*
00241      * Check for nonsensical option combinations.
00242      *
00243      * -C is not compatible with -1, because we can't create a database inside
00244      * a transaction block.
00245      */
00246     if (ropt->createDB && ropt->single_txn)
00247         exit_horribly(modulename, "-C and -1 are incompatible options\n");
00248 
00249     /*
00250      * If we're going to do parallel restore, there are some restrictions.
00251      */
00252     parallel_mode = (AH->public.numWorkers > 1 && ropt->useDB);
00253     if (parallel_mode)
00254     {
00255         /* We haven't got round to making this work for all archive formats */
00256         if (AH->ClonePtr == NULL || AH->ReopenPtr == NULL)
00257             exit_horribly(modulename, "parallel restore is not supported with this archive file format\n");
00258 
00259         /* Doesn't work if the archive represents dependencies as OIDs */
00260         if (AH->version < K_VERS_1_8)
00261             exit_horribly(modulename, "parallel restore is not supported with archives made by pre-8.0 pg_dump\n");
00262 
00263         /*
00264          * It's also not gonna work if we can't reopen the input file, so
00265          * let's try that immediately.
00266          */
00267         (AH->ReopenPtr) (AH);
00268     }
00269 
00270     /*
00271      * Make sure we won't need (de)compression we haven't got
00272      */
00273 #ifndef HAVE_LIBZ
00274     if (AH->compression != 0 && AH->PrintTocDataPtr !=NULL)
00275     {
00276         for (te = AH->toc->next; te != AH->toc; te = te->next)
00277         {
00278             if (te->hadDumper && (te->reqs & REQ_DATA) != 0)
00279                 exit_horribly(modulename, "cannot restore from compressed archive (compression not supported in this installation)\n");
00280         }
00281     }
00282 #endif
00283 
00284     /*
00285      * Prepare index arrays, so we can assume we have them throughout restore.
00286      * It's possible we already did this, though.
00287      */
00288     if (AH->tocsByDumpId == NULL)
00289         buildTocEntryArrays(AH);
00290 
00291     /*
00292      * If we're using a DB connection, then connect it.
00293      */
00294     if (ropt->useDB)
00295     {
00296         ahlog(AH, 1, "connecting to database for restore\n");
00297         if (AH->version < K_VERS_1_3)
00298             exit_horribly(modulename, "direct database connections are not supported in pre-1.3 archives\n");
00299 
00300         /* XXX Should get this from the archive */
00301         AHX->minRemoteVersion = 070100;
00302         AHX->maxRemoteVersion = 999999;
00303 
00304         ConnectDatabase(AHX, ropt->dbname,
00305                         ropt->pghost, ropt->pgport, ropt->username,
00306                         ropt->promptPassword);
00307 
00308         /*
00309          * If we're talking to the DB directly, don't send comments since they
00310          * obscure SQL when displaying errors
00311          */
00312         AH->noTocComments = 1;
00313     }
00314 
00315     /*
00316      * Work out if we have an implied data-only restore. This can happen if
00317      * the dump was data only or if the user has used a toc list to exclude
00318      * all of the schema data. All we do is look for schema entries - if none
00319      * are found then we set the dataOnly flag.
00320      *
00321      * We could scan for wanted TABLE entries, but that is not the same as
00322      * dataOnly. At this stage, it seems unnecessary (6-Mar-2001).
00323      */
00324     if (!ropt->dataOnly)
00325     {
00326         int         impliedDataOnly = 1;
00327 
00328         for (te = AH->toc->next; te != AH->toc; te = te->next)
00329         {
00330             if ((te->reqs & REQ_SCHEMA) != 0)
00331             {                   /* It's schema, and it's wanted */
00332                 impliedDataOnly = 0;
00333                 break;
00334             }
00335         }
00336         if (impliedDataOnly)
00337         {
00338             ropt->dataOnly = impliedDataOnly;
00339             ahlog(AH, 1, "implied data-only restore\n");
00340         }
00341     }
00342 
00343     /*
00344      * Setup the output file if necessary.
00345      */
00346     sav = SaveOutput(AH);
00347     if (ropt->filename || ropt->compression)
00348         SetOutput(AH, ropt->filename, ropt->compression);
00349 
00350     ahprintf(AH, "--\n-- PostgreSQL database dump\n--\n\n");
00351 
00352     if (AH->public.verbose)
00353     {
00354         if (AH->archiveRemoteVersion)
00355             ahprintf(AH, "-- Dumped from database version %s\n",
00356                      AH->archiveRemoteVersion);
00357         if (AH->archiveDumpVersion)
00358             ahprintf(AH, "-- Dumped by pg_dump version %s\n",
00359                      AH->archiveDumpVersion);
00360         dumpTimestamp(AH, "Started on", AH->createDate);
00361     }
00362 
00363     if (ropt->single_txn)
00364     {
00365         if (AH->connection)
00366             StartTransaction(AH);
00367         else
00368             ahprintf(AH, "BEGIN;\n\n");
00369     }
00370 
00371     /*
00372      * Establish important parameter values right away.
00373      */
00374     _doSetFixedOutputState(AH);
00375 
00376     AH->stage = STAGE_PROCESSING;
00377 
00378     /*
00379      * Drop the items at the start, in reverse order
00380      */
00381     if (ropt->dropSchema)
00382     {
00383         for (te = AH->toc->prev; te != AH->toc; te = te->prev)
00384         {
00385             AH->currentTE = te;
00386 
00387             /*
00388              * In createDB mode, issue a DROP *only* for the database as a
00389              * whole.  Issuing drops against anything else would be wrong,
00390              * because at this point we're connected to the wrong database.
00391              * Conversely, if we're not in createDB mode, we'd better not
00392              * issue a DROP against the database at all.
00393              */
00394             if (ropt->createDB)
00395             {
00396                 if (strcmp(te->desc, "DATABASE") != 0)
00397                     continue;
00398             }
00399             else
00400             {
00401                 if (strcmp(te->desc, "DATABASE") == 0)
00402                     continue;
00403             }
00404 
00405             /* Otherwise, drop anything that's selected and has a dropStmt */
00406             if (((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0) && te->dropStmt)
00407             {
00408                 ahlog(AH, 1, "dropping %s %s\n", te->desc, te->tag);
00409                 /* Select owner and schema as necessary */
00410                 _becomeOwner(AH, te);
00411                 _selectOutputSchema(AH, te->namespace);
00412                 /* Drop it */
00413                 ahprintf(AH, "%s", te->dropStmt);
00414             }
00415         }
00416 
00417         /*
00418          * _selectOutputSchema may have set currSchema to reflect the effect
00419          * of a "SET search_path" command it emitted.  However, by now we may
00420          * have dropped that schema; or it might not have existed in the first
00421          * place.  In either case the effective value of search_path will not
00422          * be what we think.  Forcibly reset currSchema so that we will
00423          * re-establish the search_path setting when needed (after creating
00424          * the schema).
00425          *
00426          * If we treated users as pg_dump'able objects then we'd need to reset
00427          * currUser here too.
00428          */
00429         if (AH->currSchema)
00430             free(AH->currSchema);
00431         AH->currSchema = NULL;
00432     }
00433 
00434     /*
00435      * In serial mode, we now process each non-ACL TOC entry.
00436      *
00437      * In parallel mode, turn control over to the parallel-restore logic.
00438      */
00439     if (parallel_mode)
00440     {
00441         ParallelState *pstate;
00442         TocEntry    pending_list;
00443 
00444         par_list_header_init(&pending_list);
00445 
00446         /* This runs PRE_DATA items and then disconnects from the database */
00447         restore_toc_entries_prefork(AH);
00448         Assert(AH->connection == NULL);
00449 
00450         /* ParallelBackupStart() will actually fork the processes */
00451         pstate = ParallelBackupStart(AH, ropt);
00452         restore_toc_entries_parallel(AH, pstate, &pending_list);
00453         ParallelBackupEnd(AH, pstate);
00454 
00455         /* reconnect the master and see if we missed something */
00456         restore_toc_entries_postfork(AH, &pending_list);
00457         Assert(AH->connection != NULL);
00458     }
00459     else
00460     {
00461         for (te = AH->toc->next; te != AH->toc; te = te->next)
00462             (void) restore_toc_entry(AH, te, ropt, false);
00463     }
00464 
00465     /*
00466      * Scan TOC again to output ownership commands and ACLs
00467      */
00468     for (te = AH->toc->next; te != AH->toc; te = te->next)
00469     {
00470         AH->currentTE = te;
00471 
00472         /* Both schema and data objects might now have ownership/ACLs */
00473         if ((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0)
00474         {
00475             ahlog(AH, 1, "setting owner and privileges for %s %s\n",
00476                   te->desc, te->tag);
00477             _printTocEntry(AH, te, ropt, false, true);
00478         }
00479     }
00480 
00481     if (ropt->single_txn)
00482     {
00483         if (AH->connection)
00484             CommitTransaction(AH);
00485         else
00486             ahprintf(AH, "COMMIT;\n\n");
00487     }
00488 
00489     if (AH->public.verbose)
00490         dumpTimestamp(AH, "Completed on", time(NULL));
00491 
00492     ahprintf(AH, "--\n-- PostgreSQL database dump complete\n--\n\n");
00493 
00494     /*
00495      * Clean up & we're done.
00496      */
00497     AH->stage = STAGE_FINALIZING;
00498 
00499     if (ropt->filename || ropt->compression)
00500         RestoreOutput(AH, sav);
00501 
00502     if (ropt->useDB)
00503         DisconnectDatabase(&AH->public);
00504 }
00505 
00506 /*
00507  * Restore a single TOC item.  Used in both parallel and non-parallel restore;
00508  * is_parallel is true if we are in a worker child process.
00509  *
00510  * Returns 0 normally, but WORKER_CREATE_DONE or WORKER_INHIBIT_DATA if
00511  * the parallel parent has to make the corresponding status update.
00512  */
00513 static int
00514 restore_toc_entry(ArchiveHandle *AH, TocEntry *te,
00515                   RestoreOptions *ropt, bool is_parallel)
00516 {
00517     int         status = WORKER_OK;
00518     teReqs      reqs;
00519     bool        defnDumped;
00520 
00521     AH->currentTE = te;
00522 
00523     /* Work out what, if anything, we want from this entry */
00524     if (_tocEntryIsACL(te))
00525         reqs = 0;               /* ACLs are never restored here */
00526     else
00527         reqs = te->reqs;
00528 
00529     /*
00530      * Ignore DATABASE entry unless we should create it.  We must check this
00531      * here, not in _tocEntryRequired, because the createDB option should not
00532      * affect emitting a DATABASE entry to an archive file.
00533      */
00534     if (!ropt->createDB && strcmp(te->desc, "DATABASE") == 0)
00535         reqs = 0;
00536 
00537     /* Dump any relevant dump warnings to stderr */
00538     if (!ropt->suppressDumpWarnings && strcmp(te->desc, "WARNING") == 0)
00539     {
00540         if (!ropt->dataOnly && te->defn != NULL && strlen(te->defn) != 0)
00541             write_msg(modulename, "warning from original dump file: %s\n", te->defn);
00542         else if (te->copyStmt != NULL && strlen(te->copyStmt) != 0)
00543             write_msg(modulename, "warning from original dump file: %s\n", te->copyStmt);
00544     }
00545 
00546     defnDumped = false;
00547 
00548     if ((reqs & REQ_SCHEMA) != 0)       /* We want the schema */
00549     {
00550         ahlog(AH, 1, "creating %s %s\n", te->desc, te->tag);
00551 
00552         _printTocEntry(AH, te, ropt, false, false);
00553         defnDumped = true;
00554 
00555         if (strcmp(te->desc, "TABLE") == 0)
00556         {
00557             if (AH->lastErrorTE == te)
00558             {
00559                 /*
00560                  * We failed to create the table. If
00561                  * --no-data-for-failed-tables was given, mark the
00562                  * corresponding TABLE DATA to be ignored.
00563                  *
00564                  * In the parallel case this must be done in the parent, so we
00565                  * just set the return value.
00566                  */
00567                 if (ropt->noDataForFailedTables)
00568                 {
00569                     if (is_parallel)
00570                         status = WORKER_INHIBIT_DATA;
00571                     else
00572                         inhibit_data_for_failed_table(AH, te);
00573                 }
00574             }
00575             else
00576             {
00577                 /*
00578                  * We created the table successfully.  Mark the corresponding
00579                  * TABLE DATA for possible truncation.
00580                  *
00581                  * In the parallel case this must be done in the parent, so we
00582                  * just set the return value.
00583                  */
00584                 if (is_parallel)
00585                     status = WORKER_CREATE_DONE;
00586                 else
00587                     mark_create_done(AH, te);
00588             }
00589         }
00590 
00591         /* If we created a DB, connect to it... */
00592         if (strcmp(te->desc, "DATABASE") == 0)
00593         {
00594             ahlog(AH, 1, "connecting to new database \"%s\"\n", te->tag);
00595             _reconnectToDB(AH, te->tag);
00596             ropt->dbname = pg_strdup(te->tag);
00597         }
00598     }
00599 
00600     /*
00601      * If we have a data component, then process it
00602      */
00603     if ((reqs & REQ_DATA) != 0)
00604     {
00605         /*
00606          * hadDumper will be set if there is genuine data component for this
00607          * node. Otherwise, we need to check the defn field for statements
00608          * that need to be executed in data-only restores.
00609          */
00610         if (te->hadDumper)
00611         {
00612             /*
00613              * If we can output the data, then restore it.
00614              */
00615             if (AH->PrintTocDataPtr !=NULL)
00616             {
00617                 _printTocEntry(AH, te, ropt, true, false);
00618 
00619                 if (strcmp(te->desc, "BLOBS") == 0 ||
00620                     strcmp(te->desc, "BLOB COMMENTS") == 0)
00621                 {
00622                     ahlog(AH, 1, "processing %s\n", te->desc);
00623 
00624                     _selectOutputSchema(AH, "pg_catalog");
00625 
00626                     (*AH->PrintTocDataPtr) (AH, te, ropt);
00627                 }
00628                 else
00629                 {
00630                     _disableTriggersIfNecessary(AH, te, ropt);
00631 
00632                     /* Select owner and schema as necessary */
00633                     _becomeOwner(AH, te);
00634                     _selectOutputSchema(AH, te->namespace);
00635 
00636                     ahlog(AH, 1, "processing data for table \"%s\"\n",
00637                           te->tag);
00638 
00639                     /*
00640                      * In parallel restore, if we created the table earlier in
00641                      * the run then we wrap the COPY in a transaction and
00642                      * precede it with a TRUNCATE.  If archiving is not on
00643                      * this prevents WAL-logging the COPY.  This obtains a
00644                      * speedup similar to that from using single_txn mode in
00645                      * non-parallel restores.
00646                      */
00647                     if (is_parallel && te->created)
00648                     {
00649                         /*
00650                          * Parallel restore is always talking directly to a
00651                          * server, so no need to see if we should issue BEGIN.
00652                          */
00653                         StartTransaction(AH);
00654 
00655                         /*
00656                          * If the server version is >= 8.4, make sure we issue
00657                          * TRUNCATE with ONLY so that child tables are not
00658                          * wiped.
00659                          */
00660                         ahprintf(AH, "TRUNCATE TABLE %s%s;\n\n",
00661                                  (PQserverVersion(AH->connection) >= 80400 ?
00662                                   "ONLY " : ""),
00663                                  fmtId(te->tag));
00664                     }
00665 
00666                     /*
00667                      * If we have a copy statement, use it.
00668                      */
00669                     if (te->copyStmt && strlen(te->copyStmt) > 0)
00670                     {
00671                         ahprintf(AH, "%s", te->copyStmt);
00672                         AH->outputKind = OUTPUT_COPYDATA;
00673                     }
00674                     else
00675                         AH->outputKind = OUTPUT_OTHERDATA;
00676 
00677                     (*AH->PrintTocDataPtr) (AH, te, ropt);
00678 
00679                     /*
00680                      * Terminate COPY if needed.
00681                      */
00682                     if (AH->outputKind == OUTPUT_COPYDATA &&
00683                         RestoringToDB(AH))
00684                         EndDBCopyMode(AH, te);
00685                     AH->outputKind = OUTPUT_SQLCMDS;
00686 
00687                     /* close out the transaction started above */
00688                     if (is_parallel && te->created)
00689                         CommitTransaction(AH);
00690 
00691                     _enableTriggersIfNecessary(AH, te, ropt);
00692                 }
00693             }
00694         }
00695         else if (!defnDumped)
00696         {
00697             /* If we haven't already dumped the defn part, do so now */
00698             ahlog(AH, 1, "executing %s %s\n", te->desc, te->tag);
00699             _printTocEntry(AH, te, ropt, false, false);
00700         }
00701     }
00702 
00703     if (AH->public.n_errors > 0 && status == WORKER_OK)
00704         status = WORKER_IGNORED_ERRORS;
00705 
00706     return status;
00707 }
00708 
00709 /*
00710  * Allocate a new RestoreOptions block.
00711  * This is mainly so we can initialize it, but also for future expansion,
00712  */
00713 RestoreOptions *
00714 NewRestoreOptions(void)
00715 {
00716     RestoreOptions *opts;
00717 
00718     opts = (RestoreOptions *) pg_malloc0(sizeof(RestoreOptions));
00719 
00720     /* set any fields that shouldn't default to zeroes */
00721     opts->format = archUnknown;
00722     opts->promptPassword = TRI_DEFAULT;
00723     opts->dumpSections = DUMP_UNSECTIONED;
00724 
00725     return opts;
00726 }
00727 
00728 static void
00729 _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt)
00730 {
00731     /* This hack is only needed in a data-only restore */
00732     if (!ropt->dataOnly || !ropt->disable_triggers)
00733         return;
00734 
00735     ahlog(AH, 1, "disabling triggers for %s\n", te->tag);
00736 
00737     /*
00738      * Become superuser if possible, since they are the only ones who can
00739      * disable constraint triggers.  If -S was not given, assume the initial
00740      * user identity is a superuser.  (XXX would it be better to become the
00741      * table owner?)
00742      */
00743     _becomeUser(AH, ropt->superuser);
00744 
00745     /*
00746      * Disable them.
00747      */
00748     _selectOutputSchema(AH, te->namespace);
00749 
00750     ahprintf(AH, "ALTER TABLE %s DISABLE TRIGGER ALL;\n\n",
00751              fmtId(te->tag));
00752 }
00753 
00754 static void
00755 _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt)
00756 {
00757     /* This hack is only needed in a data-only restore */
00758     if (!ropt->dataOnly || !ropt->disable_triggers)
00759         return;
00760 
00761     ahlog(AH, 1, "enabling triggers for %s\n", te->tag);
00762 
00763     /*
00764      * Become superuser if possible, since they are the only ones who can
00765      * disable constraint triggers.  If -S was not given, assume the initial
00766      * user identity is a superuser.  (XXX would it be better to become the
00767      * table owner?)
00768      */
00769     _becomeUser(AH, ropt->superuser);
00770 
00771     /*
00772      * Enable them.
00773      */
00774     _selectOutputSchema(AH, te->namespace);
00775 
00776     ahprintf(AH, "ALTER TABLE %s ENABLE TRIGGER ALL;\n\n",
00777              fmtId(te->tag));
00778 }
00779 
00780 /*
00781  * This is a routine that is part of the dumper interface, hence the 'Archive*' parameter.
00782  */
00783 
00784 /* Public */
00785 size_t
00786 WriteData(Archive *AHX, const void *data, size_t dLen)
00787 {
00788     ArchiveHandle *AH = (ArchiveHandle *) AHX;
00789 
00790     if (!AH->currToc)
00791         exit_horribly(modulename, "internal error -- WriteData cannot be called outside the context of a DataDumper routine\n");
00792 
00793     return (*AH->WriteDataPtr) (AH, data, dLen);
00794 }
00795 
00796 /*
00797  * Create a new TOC entry. The TOC was designed as a TOC, but is now the
00798  * repository for all metadata. But the name has stuck.
00799  */
00800 
00801 /* Public */
00802 void
00803 ArchiveEntry(Archive *AHX,
00804              CatalogId catalogId, DumpId dumpId,
00805              const char *tag,
00806              const char *namespace,
00807              const char *tablespace,
00808              const char *owner, bool withOids,
00809              const char *desc, teSection section,
00810              const char *defn,
00811              const char *dropStmt, const char *copyStmt,
00812              const DumpId *deps, int nDeps,
00813              DataDumperPtr dumpFn, void *dumpArg)
00814 {
00815     ArchiveHandle *AH = (ArchiveHandle *) AHX;
00816     TocEntry   *newToc;
00817 
00818     newToc = (TocEntry *) pg_malloc0(sizeof(TocEntry));
00819 
00820     AH->tocCount++;
00821     if (dumpId > AH->maxDumpId)
00822         AH->maxDumpId = dumpId;
00823 
00824     newToc->prev = AH->toc->prev;
00825     newToc->next = AH->toc;
00826     AH->toc->prev->next = newToc;
00827     AH->toc->prev = newToc;
00828 
00829     newToc->catalogId = catalogId;
00830     newToc->dumpId = dumpId;
00831     newToc->section = section;
00832 
00833     newToc->tag = pg_strdup(tag);
00834     newToc->namespace = namespace ? pg_strdup(namespace) : NULL;
00835     newToc->tablespace = tablespace ? pg_strdup(tablespace) : NULL;
00836     newToc->owner = pg_strdup(owner);
00837     newToc->withOids = withOids;
00838     newToc->desc = pg_strdup(desc);
00839     newToc->defn = pg_strdup(defn);
00840     newToc->dropStmt = pg_strdup(dropStmt);
00841     newToc->copyStmt = copyStmt ? pg_strdup(copyStmt) : NULL;
00842 
00843     if (nDeps > 0)
00844     {
00845         newToc->dependencies = (DumpId *) pg_malloc(nDeps * sizeof(DumpId));
00846         memcpy(newToc->dependencies, deps, nDeps * sizeof(DumpId));
00847         newToc->nDeps = nDeps;
00848     }
00849     else
00850     {
00851         newToc->dependencies = NULL;
00852         newToc->nDeps = 0;
00853     }
00854 
00855     newToc->dataDumper = dumpFn;
00856     newToc->dataDumperArg = dumpArg;
00857     newToc->hadDumper = dumpFn ? true : false;
00858 
00859     newToc->formatData = NULL;
00860 
00861     if (AH->ArchiveEntryPtr !=NULL)
00862         (*AH->ArchiveEntryPtr) (AH, newToc);
00863 }
00864 
00865 /* Public */
00866 void
00867 PrintTOCSummary(Archive *AHX, RestoreOptions *ropt)
00868 {
00869     ArchiveHandle *AH = (ArchiveHandle *) AHX;
00870     TocEntry   *te;
00871     teSection   curSection;
00872     OutputContext sav;
00873     const char *fmtName;
00874 
00875     sav = SaveOutput(AH);
00876     if (ropt->filename)
00877         SetOutput(AH, ropt->filename, 0 /* no compression */ );
00878 
00879     ahprintf(AH, ";\n; Archive created at %s", ctime(&AH->createDate));
00880     ahprintf(AH, ";     dbname: %s\n;     TOC Entries: %d\n;     Compression: %d\n",
00881              AH->archdbname, AH->tocCount, AH->compression);
00882 
00883     switch (AH->format)
00884     {
00885         case archCustom:
00886             fmtName = "CUSTOM";
00887             break;
00888         case archTar:
00889             fmtName = "TAR";
00890             break;
00891         default:
00892             fmtName = "UNKNOWN";
00893     }
00894 
00895     ahprintf(AH, ";     Dump Version: %d.%d-%d\n", AH->vmaj, AH->vmin, AH->vrev);
00896     ahprintf(AH, ";     Format: %s\n", fmtName);
00897     ahprintf(AH, ";     Integer: %d bytes\n", (int) AH->intSize);
00898     ahprintf(AH, ";     Offset: %d bytes\n", (int) AH->offSize);
00899     if (AH->archiveRemoteVersion)
00900         ahprintf(AH, ";     Dumped from database version: %s\n",
00901                  AH->archiveRemoteVersion);
00902     if (AH->archiveDumpVersion)
00903         ahprintf(AH, ";     Dumped by pg_dump version: %s\n",
00904                  AH->archiveDumpVersion);
00905 
00906     ahprintf(AH, ";\n;\n; Selected TOC Entries:\n;\n");
00907 
00908     curSection = SECTION_PRE_DATA;
00909     for (te = AH->toc->next; te != AH->toc; te = te->next)
00910     {
00911         if (te->section != SECTION_NONE)
00912             curSection = te->section;
00913         if (ropt->verbose ||
00914             (_tocEntryRequired(te, curSection, ropt) & (REQ_SCHEMA | REQ_DATA)) != 0)
00915             ahprintf(AH, "%d; %u %u %s %s %s %s\n", te->dumpId,
00916                      te->catalogId.tableoid, te->catalogId.oid,
00917                      te->desc, te->namespace ? te->namespace : "-",
00918                      te->tag, te->owner);
00919         if (ropt->verbose && te->nDeps > 0)
00920         {
00921             int         i;
00922 
00923             ahprintf(AH, ";\tdepends on:");
00924             for (i = 0; i < te->nDeps; i++)
00925                 ahprintf(AH, " %d", te->dependencies[i]);
00926             ahprintf(AH, "\n");
00927         }
00928     }
00929 
00930     if (ropt->filename)
00931         RestoreOutput(AH, sav);
00932 }
00933 
00934 /***********
00935  * BLOB Archival
00936  ***********/
00937 
00938 /* Called by a dumper to signal start of a BLOB */
00939 int
00940 StartBlob(Archive *AHX, Oid oid)
00941 {
00942     ArchiveHandle *AH = (ArchiveHandle *) AHX;
00943 
00944     if (!AH->StartBlobPtr)
00945         exit_horribly(modulename, "large-object output not supported in chosen format\n");
00946 
00947     (*AH->StartBlobPtr) (AH, AH->currToc, oid);
00948 
00949     return 1;
00950 }
00951 
00952 /* Called by a dumper to signal end of a BLOB */
00953 int
00954 EndBlob(Archive *AHX, Oid oid)
00955 {
00956     ArchiveHandle *AH = (ArchiveHandle *) AHX;
00957 
00958     if (AH->EndBlobPtr)
00959         (*AH->EndBlobPtr) (AH, AH->currToc, oid);
00960 
00961     return 1;
00962 }
00963 
00964 /**********
00965  * BLOB Restoration
00966  **********/
00967 
00968 /*
00969  * Called by a format handler before any blobs are restored
00970  */
00971 void
00972 StartRestoreBlobs(ArchiveHandle *AH)
00973 {
00974     if (!AH->ropt->single_txn)
00975     {
00976         if (AH->connection)
00977             StartTransaction(AH);
00978         else
00979             ahprintf(AH, "BEGIN;\n\n");
00980     }
00981 
00982     AH->blobCount = 0;
00983 }
00984 
00985 /*
00986  * Called by a format handler after all blobs are restored
00987  */
00988 void
00989 EndRestoreBlobs(ArchiveHandle *AH)
00990 {
00991     if (!AH->ropt->single_txn)
00992     {
00993         if (AH->connection)
00994             CommitTransaction(AH);
00995         else
00996             ahprintf(AH, "COMMIT;\n\n");
00997     }
00998 
00999     ahlog(AH, 1, ngettext("restored %d large object\n",
01000                           "restored %d large objects\n",
01001                           AH->blobCount),
01002           AH->blobCount);
01003 }
01004 
01005 
01006 /*
01007  * Called by a format handler to initiate restoration of a blob
01008  */
01009 void
01010 StartRestoreBlob(ArchiveHandle *AH, Oid oid, bool drop)
01011 {
01012     bool        old_blob_style = (AH->version < K_VERS_1_12);
01013     Oid         loOid;
01014 
01015     AH->blobCount++;
01016 
01017     /* Initialize the LO Buffer */
01018     AH->lo_buf_used = 0;
01019 
01020     ahlog(AH, 1, "restoring large object with OID %u\n", oid);
01021 
01022     /* With an old archive we must do drop and create logic here */
01023     if (old_blob_style && drop)
01024         DropBlobIfExists(AH, oid);
01025 
01026     if (AH->connection)
01027     {
01028         if (old_blob_style)
01029         {
01030             loOid = lo_create(AH->connection, oid);
01031             if (loOid == 0 || loOid != oid)
01032                 exit_horribly(modulename, "could not create large object %u: %s",
01033                               oid, PQerrorMessage(AH->connection));
01034         }
01035         AH->loFd = lo_open(AH->connection, oid, INV_WRITE);
01036         if (AH->loFd == -1)
01037             exit_horribly(modulename, "could not open large object %u: %s",
01038                           oid, PQerrorMessage(AH->connection));
01039     }
01040     else
01041     {
01042         if (old_blob_style)
01043             ahprintf(AH, "SELECT pg_catalog.lo_open(pg_catalog.lo_create('%u'), %d);\n",
01044                      oid, INV_WRITE);
01045         else
01046             ahprintf(AH, "SELECT pg_catalog.lo_open('%u', %d);\n",
01047                      oid, INV_WRITE);
01048     }
01049 
01050     AH->writingBlob = 1;
01051 }
01052 
01053 void
01054 EndRestoreBlob(ArchiveHandle *AH, Oid oid)
01055 {
01056     if (AH->lo_buf_used > 0)
01057     {
01058         /* Write remaining bytes from the LO buffer */
01059         dump_lo_buf(AH);
01060     }
01061 
01062     AH->writingBlob = 0;
01063 
01064     if (AH->connection)
01065     {
01066         lo_close(AH->connection, AH->loFd);
01067         AH->loFd = -1;
01068     }
01069     else
01070     {
01071         ahprintf(AH, "SELECT pg_catalog.lo_close(0);\n\n");
01072     }
01073 }
01074 
01075 /***********
01076  * Sorting and Reordering
01077  ***********/
01078 
01079 void
01080 SortTocFromFile(Archive *AHX, RestoreOptions *ropt)
01081 {
01082     ArchiveHandle *AH = (ArchiveHandle *) AHX;
01083     FILE       *fh;
01084     char        buf[100];
01085     bool        incomplete_line;
01086 
01087     /* Allocate space for the 'wanted' array, and init it */
01088     ropt->idWanted = (bool *) pg_malloc(sizeof(bool) * AH->maxDumpId);
01089     memset(ropt->idWanted, 0, sizeof(bool) * AH->maxDumpId);
01090 
01091     /* Setup the file */
01092     fh = fopen(ropt->tocFile, PG_BINARY_R);
01093     if (!fh)
01094         exit_horribly(modulename, "could not open TOC file \"%s\": %s\n",
01095                       ropt->tocFile, strerror(errno));
01096 
01097     incomplete_line = false;
01098     while (fgets(buf, sizeof(buf), fh) != NULL)
01099     {
01100         bool        prev_incomplete_line = incomplete_line;
01101         int         buflen;
01102         char       *cmnt;
01103         char       *endptr;
01104         DumpId      id;
01105         TocEntry   *te;
01106 
01107         /*
01108          * Some lines in the file might be longer than sizeof(buf).  This is
01109          * no problem, since we only care about the leading numeric ID which
01110          * can be at most a few characters; but we have to skip continuation
01111          * bufferloads when processing a long line.
01112          */
01113         buflen = strlen(buf);
01114         if (buflen > 0 && buf[buflen - 1] == '\n')
01115             incomplete_line = false;
01116         else
01117             incomplete_line = true;
01118         if (prev_incomplete_line)
01119             continue;
01120 
01121         /* Truncate line at comment, if any */
01122         cmnt = strchr(buf, ';');
01123         if (cmnt != NULL)
01124             cmnt[0] = '\0';
01125 
01126         /* Ignore if all blank */
01127         if (strspn(buf, " \t\r\n") == strlen(buf))
01128             continue;
01129 
01130         /* Get an ID, check it's valid and not already seen */
01131         id = strtol(buf, &endptr, 10);
01132         if (endptr == buf || id <= 0 || id > AH->maxDumpId ||
01133             ropt->idWanted[id - 1])
01134         {
01135             write_msg(modulename, "WARNING: line ignored: %s\n", buf);
01136             continue;
01137         }
01138 
01139         /* Find TOC entry */
01140         te = getTocEntryByDumpId(AH, id);
01141         if (!te)
01142             exit_horribly(modulename, "could not find entry for ID %d\n",
01143                           id);
01144 
01145         /* Mark it wanted */
01146         ropt->idWanted[id - 1] = true;
01147 
01148         /*
01149          * Move each item to the end of the list as it is selected, so that
01150          * they are placed in the desired order.  Any unwanted items will end
01151          * up at the front of the list, which may seem unintuitive but it's
01152          * what we need.  In an ordinary serial restore that makes no
01153          * difference, but in a parallel restore we need to mark unrestored
01154          * items' dependencies as satisfied before we start examining
01155          * restorable items.  Otherwise they could have surprising
01156          * side-effects on the order in which restorable items actually get
01157          * restored.
01158          */
01159         _moveBefore(AH, AH->toc, te);
01160     }
01161 
01162     if (fclose(fh) != 0)
01163         exit_horribly(modulename, "could not close TOC file: %s\n",
01164                       strerror(errno));
01165 }
01166 
01167 /**********************
01168  * 'Convenience functions that look like standard IO functions
01169  * for writing data when in dump mode.
01170  **********************/
01171 
01172 /* Public */
01173 int
01174 archputs(const char *s, Archive *AH)
01175 {
01176     return WriteData(AH, s, strlen(s));
01177 }
01178 
01179 /* Public */
01180 int
01181 archprintf(Archive *AH, const char *fmt,...)
01182 {
01183     char       *p = NULL;
01184     va_list     ap;
01185     int         bSize = strlen(fmt) + 256;
01186     int         cnt = -1;
01187 
01188     /*
01189      * This is paranoid: deal with the possibility that vsnprintf is willing
01190      * to ignore trailing null or returns > 0 even if string does not fit. It
01191      * may be the case that it returns cnt = bufsize
01192      */
01193     while (cnt < 0 || cnt >= (bSize - 1))
01194     {
01195         if (p != NULL)
01196             free(p);
01197         bSize *= 2;
01198         p = (char *) pg_malloc(bSize);
01199         va_start(ap, fmt);
01200         cnt = vsnprintf(p, bSize, fmt, ap);
01201         va_end(ap);
01202     }
01203     WriteData(AH, p, cnt);
01204     free(p);
01205     return cnt;
01206 }
01207 
01208 
01209 /*******************************
01210  * Stuff below here should be 'private' to the archiver routines
01211  *******************************/
01212 
01213 static void
01214 SetOutput(ArchiveHandle *AH, const char *filename, int compression)
01215 {
01216     int         fn;
01217 
01218     if (filename)
01219         fn = -1;
01220     else if (AH->FH)
01221         fn = fileno(AH->FH);
01222     else if (AH->fSpec)
01223     {
01224         fn = -1;
01225         filename = AH->fSpec;
01226     }
01227     else
01228         fn = fileno(stdout);
01229 
01230     /* If compression explicitly requested, use gzopen */
01231 #ifdef HAVE_LIBZ
01232     if (compression != 0)
01233     {
01234         char        fmode[10];
01235 
01236         /* Don't use PG_BINARY_x since this is zlib */
01237         sprintf(fmode, "wb%d", compression);
01238         if (fn >= 0)
01239             AH->OF = gzdopen(dup(fn), fmode);
01240         else
01241             AH->OF = gzopen(filename, fmode);
01242         AH->gzOut = 1;
01243     }
01244     else
01245 #endif
01246     {                           /* Use fopen */
01247         if (AH->mode == archModeAppend)
01248         {
01249             if (fn >= 0)
01250                 AH->OF = fdopen(dup(fn), PG_BINARY_A);
01251             else
01252                 AH->OF = fopen(filename, PG_BINARY_A);
01253         }
01254         else
01255         {
01256             if (fn >= 0)
01257                 AH->OF = fdopen(dup(fn), PG_BINARY_W);
01258             else
01259                 AH->OF = fopen(filename, PG_BINARY_W);
01260         }
01261         AH->gzOut = 0;
01262     }
01263 
01264     if (!AH->OF)
01265     {
01266         if (filename)
01267             exit_horribly(modulename, "could not open output file \"%s\": %s\n",
01268                           filename, strerror(errno));
01269         else
01270             exit_horribly(modulename, "could not open output file: %s\n",
01271                           strerror(errno));
01272     }
01273 }
01274 
01275 static OutputContext
01276 SaveOutput(ArchiveHandle *AH)
01277 {
01278     OutputContext sav;
01279 
01280     sav.OF = AH->OF;
01281     sav.gzOut = AH->gzOut;
01282 
01283     return sav;
01284 }
01285 
01286 static void
01287 RestoreOutput(ArchiveHandle *AH, OutputContext savedContext)
01288 {
01289     int         res;
01290 
01291     if (AH->gzOut)
01292         res = GZCLOSE(AH->OF);
01293     else
01294         res = fclose(AH->OF);
01295 
01296     if (res != 0)
01297         exit_horribly(modulename, "could not close output file: %s\n",
01298                       strerror(errno));
01299 
01300     AH->gzOut = savedContext.gzOut;
01301     AH->OF = savedContext.OF;
01302 }
01303 
01304 
01305 
01306 /*
01307  *  Print formatted text to the output file (usually stdout).
01308  */
01309 int
01310 ahprintf(ArchiveHandle *AH, const char *fmt,...)
01311 {
01312     char       *p = NULL;
01313     va_list     ap;
01314     int         bSize = strlen(fmt) + 256;      /* Usually enough */
01315     int         cnt = -1;
01316 
01317     /*
01318      * This is paranoid: deal with the possibility that vsnprintf is willing
01319      * to ignore trailing null or returns > 0 even if string does not fit. It
01320      * may be the case that it returns cnt = bufsize.
01321      */
01322     while (cnt < 0 || cnt >= (bSize - 1))
01323     {
01324         if (p != NULL)
01325             free(p);
01326         bSize *= 2;
01327         p = (char *) pg_malloc(bSize);
01328         va_start(ap, fmt);
01329         cnt = vsnprintf(p, bSize, fmt, ap);
01330         va_end(ap);
01331     }
01332     ahwrite(p, 1, cnt, AH);
01333     free(p);
01334     return cnt;
01335 }
01336 
01337 void
01338 ahlog(ArchiveHandle *AH, int level, const char *fmt,...)
01339 {
01340     va_list     ap;
01341 
01342     if (AH->debugLevel < level && (!AH->public.verbose || level > 1))
01343         return;
01344 
01345     va_start(ap, fmt);
01346     vwrite_msg(NULL, fmt, ap);
01347     va_end(ap);
01348 }
01349 
01350 /*
01351  * Single place for logic which says 'We are restoring to a direct DB connection'.
01352  */
01353 static int
01354 RestoringToDB(ArchiveHandle *AH)
01355 {
01356     return (AH->ropt && AH->ropt->useDB && AH->connection);
01357 }
01358 
01359 /*
01360  * Dump the current contents of the LO data buffer while writing a BLOB
01361  */
01362 static void
01363 dump_lo_buf(ArchiveHandle *AH)
01364 {
01365     if (AH->connection)
01366     {
01367         size_t      res;
01368 
01369         res = lo_write(AH->connection, AH->loFd, AH->lo_buf, AH->lo_buf_used);
01370         ahlog(AH, 5, ngettext("wrote %lu byte of large object data (result = %lu)\n",
01371                      "wrote %lu bytes of large object data (result = %lu)\n",
01372                               AH->lo_buf_used),
01373               (unsigned long) AH->lo_buf_used, (unsigned long) res);
01374         if (res != AH->lo_buf_used)
01375             exit_horribly(modulename,
01376             "could not write to large object (result: %lu, expected: %lu)\n",
01377                        (unsigned long) res, (unsigned long) AH->lo_buf_used);
01378     }
01379     else
01380     {
01381         PQExpBuffer buf = createPQExpBuffer();
01382 
01383         appendByteaLiteralAHX(buf,
01384                               (const unsigned char *) AH->lo_buf,
01385                               AH->lo_buf_used,
01386                               AH);
01387 
01388         /* Hack: turn off writingBlob so ahwrite doesn't recurse to here */
01389         AH->writingBlob = 0;
01390         ahprintf(AH, "SELECT pg_catalog.lowrite(0, %s);\n", buf->data);
01391         AH->writingBlob = 1;
01392 
01393         destroyPQExpBuffer(buf);
01394     }
01395     AH->lo_buf_used = 0;
01396 }
01397 
01398 
01399 /*
01400  *  Write buffer to the output file (usually stdout). This is used for
01401  *  outputting 'restore' scripts etc. It is even possible for an archive
01402  *  format to create a custom output routine to 'fake' a restore if it
01403  *  wants to generate a script (see TAR output).
01404  */
01405 int
01406 ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH)
01407 {
01408     size_t      res;
01409 
01410     if (AH->writingBlob)
01411     {
01412         size_t      remaining = size * nmemb;
01413 
01414         while (AH->lo_buf_used + remaining > AH->lo_buf_size)
01415         {
01416             size_t      avail = AH->lo_buf_size - AH->lo_buf_used;
01417 
01418             memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, avail);
01419             ptr = (const void *) ((const char *) ptr + avail);
01420             remaining -= avail;
01421             AH->lo_buf_used += avail;
01422             dump_lo_buf(AH);
01423         }
01424 
01425         memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, remaining);
01426         AH->lo_buf_used += remaining;
01427 
01428         return size * nmemb;
01429     }
01430     else if (AH->gzOut)
01431     {
01432         res = GZWRITE(ptr, size, nmemb, AH->OF);
01433         if (res != (nmemb * size))
01434             exit_horribly(modulename, "could not write to output file: %s\n", strerror(errno));
01435         return res;
01436     }
01437     else if (AH->CustomOutPtr)
01438     {
01439         res = AH->CustomOutPtr (AH, ptr, size * nmemb);
01440 
01441         if (res != (nmemb * size))
01442             exit_horribly(modulename, "could not write to custom output routine\n");
01443         return res;
01444     }
01445     else
01446     {
01447         /*
01448          * If we're doing a restore, and it's direct to DB, and we're
01449          * connected then send it to the DB.
01450          */
01451         if (RestoringToDB(AH))
01452             return ExecuteSqlCommandBuf(AH, (const char *) ptr, size * nmemb);
01453         else
01454         {
01455             res = fwrite(ptr, size, nmemb, AH->OF);
01456             if (res != nmemb)
01457                 exit_horribly(modulename, "could not write to output file: %s\n",
01458                               strerror(errno));
01459             return res;
01460         }
01461     }
01462 }
01463 
01464 /* on some error, we may decide to go on... */
01465 void
01466 warn_or_exit_horribly(ArchiveHandle *AH,
01467                       const char *modulename, const char *fmt,...)
01468 {
01469     va_list     ap;
01470 
01471     switch (AH->stage)
01472     {
01473 
01474         case STAGE_NONE:
01475             /* Do nothing special */
01476             break;
01477 
01478         case STAGE_INITIALIZING:
01479             if (AH->stage != AH->lastErrorStage)
01480                 write_msg(modulename, "Error while INITIALIZING:\n");
01481             break;
01482 
01483         case STAGE_PROCESSING:
01484             if (AH->stage != AH->lastErrorStage)
01485                 write_msg(modulename, "Error while PROCESSING TOC:\n");
01486             break;
01487 
01488         case STAGE_FINALIZING:
01489             if (AH->stage != AH->lastErrorStage)
01490                 write_msg(modulename, "Error while FINALIZING:\n");
01491             break;
01492     }
01493     if (AH->currentTE != NULL && AH->currentTE != AH->lastErrorTE)
01494     {
01495         write_msg(modulename, "Error from TOC entry %d; %u %u %s %s %s\n",
01496                   AH->currentTE->dumpId,
01497              AH->currentTE->catalogId.tableoid, AH->currentTE->catalogId.oid,
01498               AH->currentTE->desc, AH->currentTE->tag, AH->currentTE->owner);
01499     }
01500     AH->lastErrorStage = AH->stage;
01501     AH->lastErrorTE = AH->currentTE;
01502 
01503     va_start(ap, fmt);
01504     vwrite_msg(modulename, fmt, ap);
01505     va_end(ap);
01506 
01507     if (AH->public.exit_on_error)
01508         exit_nicely(1);
01509     else
01510         AH->public.n_errors++;
01511 }
01512 
01513 #ifdef NOT_USED
01514 
01515 static void
01516 _moveAfter(ArchiveHandle *AH, TocEntry *pos, TocEntry *te)
01517 {
01518     /* Unlink te from list */
01519     te->prev->next = te->next;
01520     te->next->prev = te->prev;
01521 
01522     /* and insert it after "pos" */
01523     te->prev = pos;
01524     te->next = pos->next;
01525     pos->next->prev = te;
01526     pos->next = te;
01527 }
01528 #endif
01529 
01530 static void
01531 _moveBefore(ArchiveHandle *AH, TocEntry *pos, TocEntry *te)
01532 {
01533     /* Unlink te from list */
01534     te->prev->next = te->next;
01535     te->next->prev = te->prev;
01536 
01537     /* and insert it before "pos" */
01538     te->prev = pos->prev;
01539     te->next = pos;
01540     pos->prev->next = te;
01541     pos->prev = te;
01542 }
01543 
01544 /*
01545  * Build index arrays for the TOC list
01546  *
01547  * This should be invoked only after we have created or read in all the TOC
01548  * items.
01549  *
01550  * The arrays are indexed by dump ID (so entry zero is unused).  Note that the
01551  * array entries run only up to maxDumpId.  We might see dependency dump IDs
01552  * beyond that (if the dump was partial); so always check the array bound
01553  * before trying to touch an array entry.
01554  */
01555 static void
01556 buildTocEntryArrays(ArchiveHandle *AH)
01557 {
01558     DumpId      maxDumpId = AH->maxDumpId;
01559     TocEntry   *te;
01560 
01561     AH->tocsByDumpId = (TocEntry **) pg_malloc0((maxDumpId + 1) * sizeof(TocEntry *));
01562     AH->tableDataId = (DumpId *) pg_malloc0((maxDumpId + 1) * sizeof(DumpId));
01563 
01564     for (te = AH->toc->next; te != AH->toc; te = te->next)
01565     {
01566         /* this check is purely paranoia, maxDumpId should be correct */
01567         if (te->dumpId <= 0 || te->dumpId > maxDumpId)
01568             exit_horribly(modulename, "bad dumpId\n");
01569 
01570         /* tocsByDumpId indexes all TOCs by their dump ID */
01571         AH->tocsByDumpId[te->dumpId] = te;
01572 
01573         /*
01574          * tableDataId provides the TABLE DATA item's dump ID for each TABLE
01575          * TOC entry that has a DATA item.  We compute this by reversing the
01576          * TABLE DATA item's dependency, knowing that a TABLE DATA item has
01577          * just one dependency and it is the TABLE item.
01578          */
01579         if (strcmp(te->desc, "TABLE DATA") == 0 && te->nDeps > 0)
01580         {
01581             DumpId      tableId = te->dependencies[0];
01582 
01583             /*
01584              * The TABLE item might not have been in the archive, if this was
01585              * a data-only dump; but its dump ID should be less than its data
01586              * item's dump ID, so there should be a place for it in the array.
01587              */
01588             if (tableId <= 0 || tableId > maxDumpId)
01589                 exit_horribly(modulename, "bad table dumpId for TABLE DATA item\n");
01590 
01591             AH->tableDataId[tableId] = te->dumpId;
01592         }
01593     }
01594 }
01595 
01596 TocEntry *
01597 getTocEntryByDumpId(ArchiveHandle *AH, DumpId id)
01598 {
01599     /* build index arrays if we didn't already */
01600     if (AH->tocsByDumpId == NULL)
01601         buildTocEntryArrays(AH);
01602 
01603     if (id > 0 && id <= AH->maxDumpId)
01604         return AH->tocsByDumpId[id];
01605 
01606     return NULL;
01607 }
01608 
01609 teReqs
01610 TocIDRequired(ArchiveHandle *AH, DumpId id)
01611 {
01612     TocEntry   *te = getTocEntryByDumpId(AH, id);
01613 
01614     if (!te)
01615         return 0;
01616 
01617     return te->reqs;
01618 }
01619 
01620 size_t
01621 WriteOffset(ArchiveHandle *AH, pgoff_t o, int wasSet)
01622 {
01623     int         off;
01624 
01625     /* Save the flag */
01626     (*AH->WriteBytePtr) (AH, wasSet);
01627 
01628     /* Write out pgoff_t smallest byte first, prevents endian mismatch */
01629     for (off = 0; off < sizeof(pgoff_t); off++)
01630     {
01631         (*AH->WriteBytePtr) (AH, o & 0xFF);
01632         o >>= 8;
01633     }
01634     return sizeof(pgoff_t) + 1;
01635 }
01636 
01637 int
01638 ReadOffset(ArchiveHandle *AH, pgoff_t * o)
01639 {
01640     int         i;
01641     int         off;
01642     int         offsetFlg;
01643 
01644     /* Initialize to zero */
01645     *o = 0;
01646 
01647     /* Check for old version */
01648     if (AH->version < K_VERS_1_7)
01649     {
01650         /* Prior versions wrote offsets using WriteInt */
01651         i = ReadInt(AH);
01652         /* -1 means not set */
01653         if (i < 0)
01654             return K_OFFSET_POS_NOT_SET;
01655         else if (i == 0)
01656             return K_OFFSET_NO_DATA;
01657 
01658         /* Cast to pgoff_t because it was written as an int. */
01659         *o = (pgoff_t) i;
01660         return K_OFFSET_POS_SET;
01661     }
01662 
01663     /*
01664      * Read the flag indicating the state of the data pointer. Check if valid
01665      * and die if not.
01666      *
01667      * This used to be handled by a negative or zero pointer, now we use an
01668      * extra byte specifically for the state.
01669      */
01670     offsetFlg = (*AH->ReadBytePtr) (AH) & 0xFF;
01671 
01672     switch (offsetFlg)
01673     {
01674         case K_OFFSET_POS_NOT_SET:
01675         case K_OFFSET_NO_DATA:
01676         case K_OFFSET_POS_SET:
01677 
01678             break;
01679 
01680         default:
01681             exit_horribly(modulename, "unexpected data offset flag %d\n", offsetFlg);
01682     }
01683 
01684     /*
01685      * Read the bytes
01686      */
01687     for (off = 0; off < AH->offSize; off++)
01688     {
01689         if (off < sizeof(pgoff_t))
01690             *o |= ((pgoff_t) ((*AH->ReadBytePtr) (AH))) << (off * 8);
01691         else
01692         {
01693             if ((*AH->ReadBytePtr) (AH) != 0)
01694                 exit_horribly(modulename, "file offset in dump file is too large\n");
01695         }
01696     }
01697 
01698     return offsetFlg;
01699 }
01700 
01701 size_t
01702 WriteInt(ArchiveHandle *AH, int i)
01703 {
01704     int         b;
01705 
01706     /*
01707      * This is a bit yucky, but I don't want to make the binary format very
01708      * dependent on representation, and not knowing much about it, I write out
01709      * a sign byte. If you change this, don't forget to change the file
01710      * version #, and modify readInt to read the new format AS WELL AS the old
01711      * formats.
01712      */
01713 
01714     /* SIGN byte */
01715     if (i < 0)
01716     {
01717         (*AH->WriteBytePtr) (AH, 1);
01718         i = -i;
01719     }
01720     else
01721         (*AH->WriteBytePtr) (AH, 0);
01722 
01723     for (b = 0; b < AH->intSize; b++)
01724     {
01725         (*AH->WriteBytePtr) (AH, i & 0xFF);
01726         i >>= 8;
01727     }
01728 
01729     return AH->intSize + 1;
01730 }
01731 
01732 int
01733 ReadInt(ArchiveHandle *AH)
01734 {
01735     int         res = 0;
01736     int         bv,
01737                 b;
01738     int         sign = 0;       /* Default positive */
01739     int         bitShift = 0;
01740 
01741     if (AH->version > K_VERS_1_0)
01742         /* Read a sign byte */
01743         sign = (*AH->ReadBytePtr) (AH);
01744 
01745     for (b = 0; b < AH->intSize; b++)
01746     {
01747         bv = (*AH->ReadBytePtr) (AH) & 0xFF;
01748         if (bv != 0)
01749             res = res + (bv << bitShift);
01750         bitShift += 8;
01751     }
01752 
01753     if (sign)
01754         res = -res;
01755 
01756     return res;
01757 }
01758 
01759 size_t
01760 WriteStr(ArchiveHandle *AH, const char *c)
01761 {
01762     size_t      res;
01763 
01764     if (c)
01765     {
01766         res = WriteInt(AH, strlen(c));
01767         res += (*AH->WriteBufPtr) (AH, c, strlen(c));
01768     }
01769     else
01770         res = WriteInt(AH, -1);
01771 
01772     return res;
01773 }
01774 
01775 char *
01776 ReadStr(ArchiveHandle *AH)
01777 {
01778     char       *buf;
01779     int         l;
01780 
01781     l = ReadInt(AH);
01782     if (l < 0)
01783         buf = NULL;
01784     else
01785     {
01786         buf = (char *) pg_malloc(l + 1);
01787         if ((*AH->ReadBufPtr) (AH, (void *) buf, l) != l)
01788             exit_horribly(modulename, "unexpected end of file\n");
01789 
01790         buf[l] = '\0';
01791     }
01792 
01793     return buf;
01794 }
01795 
01796 static int
01797 _discoverArchiveFormat(ArchiveHandle *AH)
01798 {
01799     FILE       *fh;
01800     char        sig[6];         /* More than enough */
01801     size_t      cnt;
01802     int         wantClose = 0;
01803 
01804 #if 0
01805     write_msg(modulename, "attempting to ascertain archive format\n");
01806 #endif
01807 
01808     if (AH->lookahead)
01809         free(AH->lookahead);
01810 
01811     AH->lookaheadSize = 512;
01812     AH->lookahead = pg_malloc0(512);
01813     AH->lookaheadLen = 0;
01814     AH->lookaheadPos = 0;
01815 
01816     if (AH->fSpec)
01817     {
01818         struct stat st;
01819 
01820         wantClose = 1;
01821 
01822         /*
01823          * Check if the specified archive is a directory. If so, check if
01824          * there's a "toc.dat" (or "toc.dat.gz") file in it.
01825          */
01826         if (stat(AH->fSpec, &st) == 0 && S_ISDIR(st.st_mode))
01827         {
01828             char        buf[MAXPGPATH];
01829 
01830             if (snprintf(buf, MAXPGPATH, "%s/toc.dat", AH->fSpec) >= MAXPGPATH)
01831                 exit_horribly(modulename, "directory name too long: \"%s\"\n",
01832                               AH->fSpec);
01833             if (stat(buf, &st) == 0 && S_ISREG(st.st_mode))
01834             {
01835                 AH->format = archDirectory;
01836                 return AH->format;
01837             }
01838 
01839 #ifdef HAVE_LIBZ
01840             if (snprintf(buf, MAXPGPATH, "%s/toc.dat.gz", AH->fSpec) >= MAXPGPATH)
01841                 exit_horribly(modulename, "directory name too long: \"%s\"\n",
01842                               AH->fSpec);
01843             if (stat(buf, &st) == 0 && S_ISREG(st.st_mode))
01844             {
01845                 AH->format = archDirectory;
01846                 return AH->format;
01847             }
01848 #endif
01849             exit_horribly(modulename, "directory \"%s\" does not appear to be a valid archive (\"toc.dat\" does not exist)\n",
01850                           AH->fSpec);
01851             fh = NULL;          /* keep compiler quiet */
01852         }
01853         else
01854         {
01855             fh = fopen(AH->fSpec, PG_BINARY_R);
01856             if (!fh)
01857                 exit_horribly(modulename, "could not open input file \"%s\": %s\n",
01858                               AH->fSpec, strerror(errno));
01859         }
01860     }
01861     else
01862     {
01863         fh = stdin;
01864         if (!fh)
01865             exit_horribly(modulename, "could not open input file: %s\n",
01866                           strerror(errno));
01867     }
01868 
01869     cnt = fread(sig, 1, 5, fh);
01870 
01871     if (cnt != 5)
01872     {
01873         if (ferror(fh))
01874             exit_horribly(modulename, "could not read input file: %s\n", strerror(errno));
01875         else
01876             exit_horribly(modulename, "input file is too short (read %lu, expected 5)\n",
01877                           (unsigned long) cnt);
01878     }
01879 
01880     /* Save it, just in case we need it later */
01881     strncpy(&AH->lookahead[0], sig, 5);
01882     AH->lookaheadLen = 5;
01883 
01884     if (strncmp(sig, "PGDMP", 5) == 0)
01885     {
01886         /*
01887          * Finish reading (most of) a custom-format header.
01888          *
01889          * NB: this code must agree with ReadHead().
01890          */
01891         AH->vmaj = fgetc(fh);
01892         AH->vmin = fgetc(fh);
01893 
01894         /* Save these too... */
01895         AH->lookahead[AH->lookaheadLen++] = AH->vmaj;
01896         AH->lookahead[AH->lookaheadLen++] = AH->vmin;
01897 
01898         /* Check header version; varies from V1.0 */
01899         if (AH->vmaj > 1 || ((AH->vmaj == 1) && (AH->vmin > 0)))        /* Version > 1.0 */
01900         {
01901             AH->vrev = fgetc(fh);
01902             AH->lookahead[AH->lookaheadLen++] = AH->vrev;
01903         }
01904         else
01905             AH->vrev = 0;
01906 
01907         /* Make a convenient integer <maj><min><rev>00 */
01908         AH->version = ((AH->vmaj * 256 + AH->vmin) * 256 + AH->vrev) * 256 + 0;
01909 
01910         AH->intSize = fgetc(fh);
01911         AH->lookahead[AH->lookaheadLen++] = AH->intSize;
01912 
01913         if (AH->version >= K_VERS_1_7)
01914         {
01915             AH->offSize = fgetc(fh);
01916             AH->lookahead[AH->lookaheadLen++] = AH->offSize;
01917         }
01918         else
01919             AH->offSize = AH->intSize;
01920 
01921         AH->format = fgetc(fh);
01922         AH->lookahead[AH->lookaheadLen++] = AH->format;
01923     }
01924     else
01925     {
01926         /*
01927          * *Maybe* we have a tar archive format file or a text dump ... So,
01928          * read first 512 byte header...
01929          */
01930         cnt = fread(&AH->lookahead[AH->lookaheadLen], 1, 512 - AH->lookaheadLen, fh);
01931         AH->lookaheadLen += cnt;
01932 
01933         if (AH->lookaheadLen >= strlen(TEXT_DUMPALL_HEADER) &&
01934             (strncmp(AH->lookahead, TEXT_DUMP_HEADER, strlen(TEXT_DUMP_HEADER)) == 0 ||
01935              strncmp(AH->lookahead, TEXT_DUMPALL_HEADER, strlen(TEXT_DUMPALL_HEADER)) == 0))
01936         {
01937             /*
01938              * looks like it's probably a text format dump. so suggest they
01939              * try psql
01940              */
01941             exit_horribly(modulename, "input file appears to be a text format dump. Please use psql.\n");
01942         }
01943 
01944         if (AH->lookaheadLen != 512)
01945             exit_horribly(modulename, "input file does not appear to be a valid archive (too short?)\n");
01946 
01947         if (!isValidTarHeader(AH->lookahead))
01948             exit_horribly(modulename, "input file does not appear to be a valid archive\n");
01949 
01950         AH->format = archTar;
01951     }
01952 
01953     /* If we can't seek, then mark the header as read */
01954     if (fseeko(fh, 0, SEEK_SET) != 0)
01955     {
01956         /*
01957          * NOTE: Formats that use the lookahead buffer can unset this in their
01958          * Init routine.
01959          */
01960         AH->readHeader = 1;
01961     }
01962     else
01963         AH->lookaheadLen = 0;   /* Don't bother since we've reset the file */
01964 
01965     /* Close the file */
01966     if (wantClose)
01967         if (fclose(fh) != 0)
01968             exit_horribly(modulename, "could not close input file: %s\n",
01969                           strerror(errno));
01970 
01971     return AH->format;
01972 }
01973 
01974 
01975 /*
01976  * Allocate an archive handle
01977  */
01978 static ArchiveHandle *
01979 _allocAH(const char *FileSpec, const ArchiveFormat fmt,
01980       const int compression, ArchiveMode mode, SetupWorkerPtr setupWorkerPtr)
01981 {
01982     ArchiveHandle *AH;
01983 
01984 #if 0
01985     write_msg(modulename, "allocating AH for %s, format %d\n", FileSpec, fmt);
01986 #endif
01987 
01988     AH = (ArchiveHandle *) pg_malloc0(sizeof(ArchiveHandle));
01989 
01990     /* AH->debugLevel = 100; */
01991 
01992     AH->vmaj = K_VERS_MAJOR;
01993     AH->vmin = K_VERS_MINOR;
01994     AH->vrev = K_VERS_REV;
01995 
01996     /* Make a convenient integer <maj><min><rev>00 */
01997     AH->version = ((AH->vmaj * 256 + AH->vmin) * 256 + AH->vrev) * 256 + 0;
01998 
01999     /* initialize for backwards compatible string processing */
02000     AH->public.encoding = 0;    /* PG_SQL_ASCII */
02001     AH->public.std_strings = false;
02002 
02003     /* sql error handling */
02004     AH->public.exit_on_error = true;
02005     AH->public.n_errors = 0;
02006 
02007     AH->archiveDumpVersion = PG_VERSION;
02008 
02009     AH->createDate = time(NULL);
02010 
02011     AH->intSize = sizeof(int);
02012     AH->offSize = sizeof(pgoff_t);
02013     if (FileSpec)
02014     {
02015         AH->fSpec = pg_strdup(FileSpec);
02016 
02017         /*
02018          * Not used; maybe later....
02019          *
02020          * AH->workDir = pg_strdup(FileSpec); for(i=strlen(FileSpec) ; i > 0 ;
02021          * i--) if (AH->workDir[i-1] == '/')
02022          */
02023     }
02024     else
02025         AH->fSpec = NULL;
02026 
02027     AH->currUser = NULL;        /* unknown */
02028     AH->currSchema = NULL;      /* ditto */
02029     AH->currTablespace = NULL;  /* ditto */
02030     AH->currWithOids = -1;      /* force SET */
02031 
02032     AH->toc = (TocEntry *) pg_malloc0(sizeof(TocEntry));
02033 
02034     AH->toc->next = AH->toc;
02035     AH->toc->prev = AH->toc;
02036 
02037     AH->mode = mode;
02038     AH->compression = compression;
02039 
02040     memset(&(AH->sqlparse), 0, sizeof(AH->sqlparse));
02041 
02042     /* Open stdout with no compression for AH output handle */
02043     AH->gzOut = 0;
02044     AH->OF = stdout;
02045 
02046     /*
02047      * On Windows, we need to use binary mode to read/write non-text archive
02048      * formats.  Force stdin/stdout into binary mode if that is what we are
02049      * using.
02050      */
02051 #ifdef WIN32
02052     if (fmt != archNull &&
02053         (AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0))
02054     {
02055         if (mode == archModeWrite)
02056             setmode(fileno(stdout), O_BINARY);
02057         else
02058             setmode(fileno(stdin), O_BINARY);
02059     }
02060 #endif
02061 
02062     AH->SetupWorkerPtr = setupWorkerPtr;
02063 
02064     if (fmt == archUnknown)
02065         AH->format = _discoverArchiveFormat(AH);
02066     else
02067         AH->format = fmt;
02068 
02069     AH->promptPassword = TRI_DEFAULT;
02070 
02071     switch (AH->format)
02072     {
02073         case archCustom:
02074             InitArchiveFmt_Custom(AH);
02075             break;
02076 
02077         case archNull:
02078             InitArchiveFmt_Null(AH);
02079             break;
02080 
02081         case archDirectory:
02082             InitArchiveFmt_Directory(AH);
02083             break;
02084 
02085         case archTar:
02086             InitArchiveFmt_Tar(AH);
02087             break;
02088 
02089         default:
02090             exit_horribly(modulename, "unrecognized file format \"%d\"\n", fmt);
02091     }
02092 
02093     return AH;
02094 }
02095 
02096 void
02097 WriteDataChunks(ArchiveHandle *AH, ParallelState *pstate)
02098 {
02099     TocEntry   *te;
02100 
02101     for (te = AH->toc->next; te != AH->toc; te = te->next)
02102     {
02103         if (!te->dataDumper)
02104             continue;
02105 
02106         if ((te->reqs & REQ_DATA) == 0)
02107             continue;
02108 
02109         if (pstate && pstate->numWorkers > 1)
02110         {
02111             /*
02112              * If we are in a parallel backup, then we are always the master
02113              * process.
02114              */
02115             EnsureIdleWorker(AH, pstate);
02116             Assert(GetIdleWorker(pstate) != NO_SLOT);
02117             DispatchJobForTocEntry(AH, pstate, te, ACT_DUMP);
02118         }
02119         else
02120             WriteDataChunksForTocEntry(AH, te);
02121     }
02122     EnsureWorkersFinished(AH, pstate);
02123 }
02124 
02125 void
02126 WriteDataChunksForTocEntry(ArchiveHandle *AH, TocEntry *te)
02127 {
02128     StartDataPtr startPtr;
02129     EndDataPtr  endPtr;
02130 
02131     AH->currToc = te;
02132 
02133     if (strcmp(te->desc, "BLOBS") == 0)
02134     {
02135         startPtr = AH->StartBlobsPtr;
02136         endPtr = AH->EndBlobsPtr;
02137     }
02138     else
02139     {
02140         startPtr = AH->StartDataPtr;
02141         endPtr = AH->EndDataPtr;
02142     }
02143 
02144     if (startPtr != NULL)
02145         (*startPtr) (AH, te);
02146 
02147     /*
02148      * The user-provided DataDumper routine needs to call AH->WriteData
02149      */
02150     (*te->dataDumper) ((Archive *) AH, te->dataDumperArg);
02151 
02152     if (endPtr != NULL)
02153         (*endPtr) (AH, te);
02154 
02155     AH->currToc = NULL;
02156 }
02157 
02158 void
02159 WriteToc(ArchiveHandle *AH)
02160 {
02161     TocEntry   *te;
02162     char        workbuf[32];
02163     int         tocCount;
02164     int         i;
02165 
02166     /* count entries that will actually be dumped */
02167     tocCount = 0;
02168     for (te = AH->toc->next; te != AH->toc; te = te->next)
02169     {
02170         if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_SPECIAL)) != 0)
02171             tocCount++;
02172     }
02173 
02174     /* printf("%d TOC Entries to save\n", tocCount); */
02175 
02176     WriteInt(AH, tocCount);
02177 
02178     for (te = AH->toc->next; te != AH->toc; te = te->next)
02179     {
02180         if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_SPECIAL)) == 0)
02181             continue;
02182 
02183         WriteInt(AH, te->dumpId);
02184         WriteInt(AH, te->dataDumper ? 1 : 0);
02185 
02186         /* OID is recorded as a string for historical reasons */
02187         sprintf(workbuf, "%u", te->catalogId.tableoid);
02188         WriteStr(AH, workbuf);
02189         sprintf(workbuf, "%u", te->catalogId.oid);
02190         WriteStr(AH, workbuf);
02191 
02192         WriteStr(AH, te->tag);
02193         WriteStr(AH, te->desc);
02194         WriteInt(AH, te->section);
02195         WriteStr(AH, te->defn);
02196         WriteStr(AH, te->dropStmt);
02197         WriteStr(AH, te->copyStmt);
02198         WriteStr(AH, te->namespace);
02199         WriteStr(AH, te->tablespace);
02200         WriteStr(AH, te->owner);
02201         WriteStr(AH, te->withOids ? "true" : "false");
02202 
02203         /* Dump list of dependencies */
02204         for (i = 0; i < te->nDeps; i++)
02205         {
02206             sprintf(workbuf, "%d", te->dependencies[i]);
02207             WriteStr(AH, workbuf);
02208         }
02209         WriteStr(AH, NULL);     /* Terminate List */
02210 
02211         if (AH->WriteExtraTocPtr)
02212             (*AH->WriteExtraTocPtr) (AH, te);
02213     }
02214 }
02215 
02216 void
02217 ReadToc(ArchiveHandle *AH)
02218 {
02219     int         i;
02220     char       *tmp;
02221     DumpId     *deps;
02222     int         depIdx;
02223     int         depSize;
02224     TocEntry   *te;
02225 
02226     AH->tocCount = ReadInt(AH);
02227     AH->maxDumpId = 0;
02228 
02229     for (i = 0; i < AH->tocCount; i++)
02230     {
02231         te = (TocEntry *) pg_malloc0(sizeof(TocEntry));
02232         te->dumpId = ReadInt(AH);
02233 
02234         if (te->dumpId > AH->maxDumpId)
02235             AH->maxDumpId = te->dumpId;
02236 
02237         /* Sanity check */
02238         if (te->dumpId <= 0)
02239             exit_horribly(modulename,
02240                        "entry ID %d out of range -- perhaps a corrupt TOC\n",
02241                           te->dumpId);
02242 
02243         te->hadDumper = ReadInt(AH);
02244 
02245         if (AH->version >= K_VERS_1_8)
02246         {
02247             tmp = ReadStr(AH);
02248             sscanf(tmp, "%u", &te->catalogId.tableoid);
02249             free(tmp);
02250         }
02251         else
02252             te->catalogId.tableoid = InvalidOid;
02253         tmp = ReadStr(AH);
02254         sscanf(tmp, "%u", &te->catalogId.oid);
02255         free(tmp);
02256 
02257         te->tag = ReadStr(AH);
02258         te->desc = ReadStr(AH);
02259 
02260         if (AH->version >= K_VERS_1_11)
02261         {
02262             te->section = ReadInt(AH);
02263         }
02264         else
02265         {
02266             /*
02267              * Rules for pre-8.4 archives wherein pg_dump hasn't classified
02268              * the entries into sections.  This list need not cover entry
02269              * types added later than 8.4.
02270              */
02271             if (strcmp(te->desc, "COMMENT") == 0 ||
02272                 strcmp(te->desc, "ACL") == 0 ||
02273                 strcmp(te->desc, "ACL LANGUAGE") == 0)
02274                 te->section = SECTION_NONE;
02275             else if (strcmp(te->desc, "TABLE DATA") == 0 ||
02276                      strcmp(te->desc, "BLOBS") == 0 ||
02277                      strcmp(te->desc, "BLOB COMMENTS") == 0)
02278                 te->section = SECTION_DATA;
02279             else if (strcmp(te->desc, "CONSTRAINT") == 0 ||
02280                      strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
02281                      strcmp(te->desc, "FK CONSTRAINT") == 0 ||
02282                      strcmp(te->desc, "INDEX") == 0 ||
02283                      strcmp(te->desc, "RULE") == 0 ||
02284                      strcmp(te->desc, "TRIGGER") == 0)
02285                 te->section = SECTION_POST_DATA;
02286             else
02287                 te->section = SECTION_PRE_DATA;
02288         }
02289 
02290         te->defn = ReadStr(AH);
02291         te->dropStmt = ReadStr(AH);
02292 
02293         if (AH->version >= K_VERS_1_3)
02294             te->copyStmt = ReadStr(AH);
02295 
02296         if (AH->version >= K_VERS_1_6)
02297             te->namespace = ReadStr(AH);
02298 
02299         if (AH->version >= K_VERS_1_10)
02300             te->tablespace = ReadStr(AH);
02301 
02302         te->owner = ReadStr(AH);
02303         if (AH->version >= K_VERS_1_9)
02304         {
02305             if (strcmp(ReadStr(AH), "true") == 0)
02306                 te->withOids = true;
02307             else
02308                 te->withOids = false;
02309         }
02310         else
02311             te->withOids = true;
02312 
02313         /* Read TOC entry dependencies */
02314         if (AH->version >= K_VERS_1_5)
02315         {
02316             depSize = 100;
02317             deps = (DumpId *) pg_malloc(sizeof(DumpId) * depSize);
02318             depIdx = 0;
02319             for (;;)
02320             {
02321                 tmp = ReadStr(AH);
02322                 if (!tmp)
02323                     break;      /* end of list */
02324                 if (depIdx >= depSize)
02325                 {
02326                     depSize *= 2;
02327                     deps = (DumpId *) pg_realloc(deps, sizeof(DumpId) * depSize);
02328                 }
02329                 sscanf(tmp, "%d", &deps[depIdx]);
02330                 free(tmp);
02331                 depIdx++;
02332             }
02333 
02334             if (depIdx > 0)     /* We have a non-null entry */
02335             {
02336                 deps = (DumpId *) pg_realloc(deps, sizeof(DumpId) * depIdx);
02337                 te->dependencies = deps;
02338                 te->nDeps = depIdx;
02339             }
02340             else
02341             {
02342                 free(deps);
02343                 te->dependencies = NULL;
02344                 te->nDeps = 0;
02345             }
02346         }
02347         else
02348         {
02349             te->dependencies = NULL;
02350             te->nDeps = 0;
02351         }
02352 
02353         if (AH->ReadExtraTocPtr)
02354             (*AH->ReadExtraTocPtr) (AH, te);
02355 
02356         ahlog(AH, 3, "read TOC entry %d (ID %d) for %s %s\n",
02357               i, te->dumpId, te->desc, te->tag);
02358 
02359         /* link completed entry into TOC circular list */
02360         te->prev = AH->toc->prev;
02361         AH->toc->prev->next = te;
02362         AH->toc->prev = te;
02363         te->next = AH->toc;
02364 
02365         /* special processing immediately upon read for some items */
02366         if (strcmp(te->desc, "ENCODING") == 0)
02367             processEncodingEntry(AH, te);
02368         else if (strcmp(te->desc, "STDSTRINGS") == 0)
02369             processStdStringsEntry(AH, te);
02370     }
02371 }
02372 
02373 static void
02374 processEncodingEntry(ArchiveHandle *AH, TocEntry *te)
02375 {
02376     /* te->defn should have the form SET client_encoding = 'foo'; */
02377     char       *defn = pg_strdup(te->defn);
02378     char       *ptr1;
02379     char       *ptr2 = NULL;
02380     int         encoding;
02381 
02382     ptr1 = strchr(defn, '\'');
02383     if (ptr1)
02384         ptr2 = strchr(++ptr1, '\'');
02385     if (ptr2)
02386     {
02387         *ptr2 = '\0';
02388         encoding = pg_char_to_encoding(ptr1);
02389         if (encoding < 0)
02390             exit_horribly(modulename, "unrecognized encoding \"%s\"\n",
02391                           ptr1);
02392         AH->public.encoding = encoding;
02393     }
02394     else
02395         exit_horribly(modulename, "invalid ENCODING item: %s\n",
02396                       te->defn);
02397 
02398     free(defn);
02399 }
02400 
02401 static void
02402 processStdStringsEntry(ArchiveHandle *AH, TocEntry *te)
02403 {
02404     /* te->defn should have the form SET standard_conforming_strings = 'x'; */
02405     char       *ptr1;
02406 
02407     ptr1 = strchr(te->defn, '\'');
02408     if (ptr1 && strncmp(ptr1, "'on'", 4) == 0)
02409         AH->public.std_strings = true;
02410     else if (ptr1 && strncmp(ptr1, "'off'", 5) == 0)
02411         AH->public.std_strings = false;
02412     else
02413         exit_horribly(modulename, "invalid STDSTRINGS item: %s\n",
02414                       te->defn);
02415 }
02416 
02417 static teReqs
02418 _tocEntryRequired(TocEntry *te, teSection curSection, RestoreOptions *ropt)
02419 {
02420     teReqs      res = REQ_SCHEMA | REQ_DATA;
02421 
02422     /* ENCODING and STDSTRINGS items are treated specially */
02423     if (strcmp(te->desc, "ENCODING") == 0 ||
02424         strcmp(te->desc, "STDSTRINGS") == 0)
02425         return REQ_SPECIAL;
02426 
02427     /* If it's an ACL, maybe ignore it */
02428     if (ropt->aclsSkip && _tocEntryIsACL(te))
02429         return 0;
02430 
02431     /* If it's security labels, maybe ignore it */
02432     if (ropt->no_security_labels && strcmp(te->desc, "SECURITY LABEL") == 0)
02433         return 0;
02434 
02435     /* Ignore it if section is not to be dumped/restored */
02436     switch (curSection)
02437     {
02438         case SECTION_PRE_DATA:
02439             if (!(ropt->dumpSections & DUMP_PRE_DATA))
02440                 return 0;
02441             break;
02442         case SECTION_DATA:
02443             if (!(ropt->dumpSections & DUMP_DATA))
02444                 return 0;
02445             break;
02446         case SECTION_POST_DATA:
02447             if (!(ropt->dumpSections & DUMP_POST_DATA))
02448                 return 0;
02449             break;
02450         default:
02451             /* shouldn't get here, really, but ignore it */
02452             return 0;
02453     }
02454 
02455     /* Check options for selective dump/restore */
02456     if (ropt->schemaNames)
02457     {
02458         /* If no namespace is specified, it means all. */
02459         if (!te->namespace)
02460             return 0;
02461         if (strcmp(ropt->schemaNames, te->namespace) != 0)
02462             return 0;
02463     }
02464 
02465     if (ropt->selTypes)
02466     {
02467         if (strcmp(te->desc, "TABLE") == 0 ||
02468             strcmp(te->desc, "TABLE DATA") == 0)
02469         {
02470             if (!ropt->selTable)
02471                 return 0;
02472             if (ropt->tableNames.head != NULL && (!(simple_string_list_member(&ropt->tableNames, te->tag))))
02473                 return 0;
02474         }
02475         else if (strcmp(te->desc, "INDEX") == 0)
02476         {
02477             if (!ropt->selIndex)
02478                 return 0;
02479             if (ropt->indexNames && strcmp(ropt->indexNames, te->tag) != 0)
02480                 return 0;
02481         }
02482         else if (strcmp(te->desc, "FUNCTION") == 0)
02483         {
02484             if (!ropt->selFunction)
02485                 return 0;
02486             if (ropt->functionNames && strcmp(ropt->functionNames, te->tag) != 0)
02487                 return 0;
02488         }
02489         else if (strcmp(te->desc, "TRIGGER") == 0)
02490         {
02491             if (!ropt->selTrigger)
02492                 return 0;
02493             if (ropt->triggerNames && strcmp(ropt->triggerNames, te->tag) != 0)
02494                 return 0;
02495         }
02496         else
02497             return 0;
02498     }
02499 
02500     /*
02501      * Check if we had a dataDumper. Indicates if the entry is schema or data
02502      */
02503     if (!te->hadDumper)
02504     {
02505         /*
02506          * Special Case: If 'SEQUENCE SET' or anything to do with BLOBs, then
02507          * it is considered a data entry.  We don't need to check for the
02508          * BLOBS entry or old-style BLOB COMMENTS, because they will have
02509          * hadDumper = true ... but we do need to check new-style BLOB
02510          * comments.
02511          */
02512         if (strcmp(te->desc, "SEQUENCE SET") == 0 ||
02513             strcmp(te->desc, "BLOB") == 0 ||
02514             (strcmp(te->desc, "ACL") == 0 &&
02515              strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
02516             (strcmp(te->desc, "COMMENT") == 0 &&
02517              strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
02518             (strcmp(te->desc, "SECURITY LABEL") == 0 &&
02519              strncmp(te->tag, "LARGE OBJECT ", 13) == 0))
02520             res = res & REQ_DATA;
02521         else
02522             res = res & ~REQ_DATA;
02523     }
02524 
02525     /*
02526      * Special case: <Init> type with <Max OID> tag; this is obsolete and we
02527      * always ignore it.
02528      */
02529     if ((strcmp(te->desc, "<Init>") == 0) && (strcmp(te->tag, "Max OID") == 0))
02530         return 0;
02531 
02532     /* Mask it if we only want schema */
02533     if (ropt->schemaOnly)
02534         res = res & REQ_SCHEMA;
02535 
02536     /* Mask it if we only want data */
02537     if (ropt->dataOnly)
02538         res = res & REQ_DATA;
02539 
02540     /* Mask it if we don't have a schema contribution */
02541     if (!te->defn || strlen(te->defn) == 0)
02542         res = res & ~REQ_SCHEMA;
02543 
02544     /* Finally, if there's a per-ID filter, limit based on that as well */
02545     if (ropt->idWanted && !ropt->idWanted[te->dumpId - 1])
02546         return 0;
02547 
02548     return res;
02549 }
02550 
02551 /*
02552  * Identify TOC entries that are ACLs.
02553  */
02554 static bool
02555 _tocEntryIsACL(TocEntry *te)
02556 {
02557     /* "ACL LANGUAGE" was a crock emitted only in PG 7.4 */
02558     if (strcmp(te->desc, "ACL") == 0 ||
02559         strcmp(te->desc, "ACL LANGUAGE") == 0 ||
02560         strcmp(te->desc, "DEFAULT ACL") == 0)
02561         return true;
02562     return false;
02563 }
02564 
02565 /*
02566  * Issue SET commands for parameters that we want to have set the same way
02567  * at all times during execution of a restore script.
02568  */
02569 static void
02570 _doSetFixedOutputState(ArchiveHandle *AH)
02571 {
02572     /* Disable statement_timeout since restore is probably slow */
02573     ahprintf(AH, "SET statement_timeout = 0;\n");
02574 
02575     /* Likewise for lock_timeout */
02576     ahprintf(AH, "SET lock_timeout = 0;\n");
02577 
02578     /* Select the correct character set encoding */
02579     ahprintf(AH, "SET client_encoding = '%s';\n",
02580              pg_encoding_to_char(AH->public.encoding));
02581 
02582     /* Select the correct string literal syntax */
02583     ahprintf(AH, "SET standard_conforming_strings = %s;\n",
02584              AH->public.std_strings ? "on" : "off");
02585 
02586     /* Select the role to be used during restore */
02587     if (AH->ropt && AH->ropt->use_role)
02588         ahprintf(AH, "SET ROLE %s;\n", fmtId(AH->ropt->use_role));
02589 
02590     /* Make sure function checking is disabled */
02591     ahprintf(AH, "SET check_function_bodies = false;\n");
02592 
02593     /* Avoid annoying notices etc */
02594     ahprintf(AH, "SET client_min_messages = warning;\n");
02595     if (!AH->public.std_strings)
02596         ahprintf(AH, "SET escape_string_warning = off;\n");
02597 
02598     ahprintf(AH, "\n");
02599 }
02600 
02601 /*
02602  * Issue a SET SESSION AUTHORIZATION command.  Caller is responsible
02603  * for updating state if appropriate.  If user is NULL or an empty string,
02604  * the specification DEFAULT will be used.
02605  */
02606 static void
02607 _doSetSessionAuth(ArchiveHandle *AH, const char *user)
02608 {
02609     PQExpBuffer cmd = createPQExpBuffer();
02610 
02611     appendPQExpBuffer(cmd, "SET SESSION AUTHORIZATION ");
02612 
02613     /*
02614      * SQL requires a string literal here.  Might as well be correct.
02615      */
02616     if (user && *user)
02617         appendStringLiteralAHX(cmd, user, AH);
02618     else
02619         appendPQExpBuffer(cmd, "DEFAULT");
02620     appendPQExpBuffer(cmd, ";");
02621 
02622     if (RestoringToDB(AH))
02623     {
02624         PGresult   *res;
02625 
02626         res = PQexec(AH->connection, cmd->data);
02627 
02628         if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
02629             /* NOT warn_or_exit_horribly... use -O instead to skip this. */
02630             exit_horribly(modulename, "could not set session user to \"%s\": %s",
02631                           user, PQerrorMessage(AH->connection));
02632 
02633         PQclear(res);
02634     }
02635     else
02636         ahprintf(AH, "%s\n\n", cmd->data);
02637 
02638     destroyPQExpBuffer(cmd);
02639 }
02640 
02641 
02642 /*
02643  * Issue a SET default_with_oids command.  Caller is responsible
02644  * for updating state if appropriate.
02645  */
02646 static void
02647 _doSetWithOids(ArchiveHandle *AH, const bool withOids)
02648 {
02649     PQExpBuffer cmd = createPQExpBuffer();
02650 
02651     appendPQExpBuffer(cmd, "SET default_with_oids = %s;", withOids ?
02652                       "true" : "false");
02653 
02654     if (RestoringToDB(AH))
02655     {
02656         PGresult   *res;
02657 
02658         res = PQexec(AH->connection, cmd->data);
02659 
02660         if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
02661             warn_or_exit_horribly(AH, modulename,
02662                                   "could not set default_with_oids: %s",
02663                                   PQerrorMessage(AH->connection));
02664 
02665         PQclear(res);
02666     }
02667     else
02668         ahprintf(AH, "%s\n\n", cmd->data);
02669 
02670     destroyPQExpBuffer(cmd);
02671 }
02672 
02673 
02674 /*
02675  * Issue the commands to connect to the specified database.
02676  *
02677  * If we're currently restoring right into a database, this will
02678  * actually establish a connection. Otherwise it puts a \connect into
02679  * the script output.
02680  *
02681  * NULL dbname implies reconnecting to the current DB (pretty useless).
02682  */
02683 static void
02684 _reconnectToDB(ArchiveHandle *AH, const char *dbname)
02685 {
02686     if (RestoringToDB(AH))
02687         ReconnectToServer(AH, dbname, NULL);
02688     else
02689     {
02690         PQExpBuffer qry = createPQExpBuffer();
02691 
02692         appendPQExpBuffer(qry, "\\connect %s\n\n",
02693                           dbname ? fmtId(dbname) : "-");
02694         ahprintf(AH, "%s", qry->data);
02695         destroyPQExpBuffer(qry);
02696     }
02697 
02698     /*
02699      * NOTE: currUser keeps track of what the imaginary session user in our
02700      * script is.  It's now effectively reset to the original userID.
02701      */
02702     if (AH->currUser)
02703         free(AH->currUser);
02704     AH->currUser = NULL;
02705 
02706     /* don't assume we still know the output schema, tablespace, etc either */
02707     if (AH->currSchema)
02708         free(AH->currSchema);
02709     AH->currSchema = NULL;
02710     if (AH->currTablespace)
02711         free(AH->currTablespace);
02712     AH->currTablespace = NULL;
02713     AH->currWithOids = -1;
02714 
02715     /* re-establish fixed state */
02716     _doSetFixedOutputState(AH);
02717 }
02718 
02719 /*
02720  * Become the specified user, and update state to avoid redundant commands
02721  *
02722  * NULL or empty argument is taken to mean restoring the session default
02723  */
02724 static void
02725 _becomeUser(ArchiveHandle *AH, const char *user)
02726 {
02727     if (!user)
02728         user = "";              /* avoid null pointers */
02729 
02730     if (AH->currUser && strcmp(AH->currUser, user) == 0)
02731         return;                 /* no need to do anything */
02732 
02733     _doSetSessionAuth(AH, user);
02734 
02735     /*
02736      * NOTE: currUser keeps track of what the imaginary session user in our
02737      * script is
02738      */
02739     if (AH->currUser)
02740         free(AH->currUser);
02741     AH->currUser = pg_strdup(user);
02742 }
02743 
02744 /*
02745  * Become the owner of the given TOC entry object.  If
02746  * changes in ownership are not allowed, this doesn't do anything.
02747  */
02748 static void
02749 _becomeOwner(ArchiveHandle *AH, TocEntry *te)
02750 {
02751     if (AH->ropt && (AH->ropt->noOwner || !AH->ropt->use_setsessauth))
02752         return;
02753 
02754     _becomeUser(AH, te->owner);
02755 }
02756 
02757 
02758 /*
02759  * Set the proper default_with_oids value for the table.
02760  */
02761 static void
02762 _setWithOids(ArchiveHandle *AH, TocEntry *te)
02763 {
02764     if (AH->currWithOids != te->withOids)
02765     {
02766         _doSetWithOids(AH, te->withOids);
02767         AH->currWithOids = te->withOids;
02768     }
02769 }
02770 
02771 
02772 /*
02773  * Issue the commands to select the specified schema as the current schema
02774  * in the target database.
02775  */
02776 static void
02777 _selectOutputSchema(ArchiveHandle *AH, const char *schemaName)
02778 {
02779     PQExpBuffer qry;
02780 
02781     if (!schemaName || *schemaName == '\0' ||
02782         (AH->currSchema && strcmp(AH->currSchema, schemaName) == 0))
02783         return;                 /* no need to do anything */
02784 
02785     qry = createPQExpBuffer();
02786 
02787     appendPQExpBuffer(qry, "SET search_path = %s",
02788                       fmtId(schemaName));
02789     if (strcmp(schemaName, "pg_catalog") != 0)
02790         appendPQExpBuffer(qry, ", pg_catalog");
02791 
02792     if (RestoringToDB(AH))
02793     {
02794         PGresult   *res;
02795 
02796         res = PQexec(AH->connection, qry->data);
02797 
02798         if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
02799             warn_or_exit_horribly(AH, modulename,
02800                                   "could not set search_path to \"%s\": %s",
02801                                   schemaName, PQerrorMessage(AH->connection));
02802 
02803         PQclear(res);
02804     }
02805     else
02806         ahprintf(AH, "%s;\n\n", qry->data);
02807 
02808     if (AH->currSchema)
02809         free(AH->currSchema);
02810     AH->currSchema = pg_strdup(schemaName);
02811 
02812     destroyPQExpBuffer(qry);
02813 }
02814 
02815 /*
02816  * Issue the commands to select the specified tablespace as the current one
02817  * in the target database.
02818  */
02819 static void
02820 _selectTablespace(ArchiveHandle *AH, const char *tablespace)
02821 {
02822     PQExpBuffer qry;
02823     const char *want,
02824                *have;
02825 
02826     /* do nothing in --no-tablespaces mode */
02827     if (AH->ropt->noTablespace)
02828         return;
02829 
02830     have = AH->currTablespace;
02831     want = tablespace;
02832 
02833     /* no need to do anything for non-tablespace object */
02834     if (!want)
02835         return;
02836 
02837     if (have && strcmp(want, have) == 0)
02838         return;                 /* no need to do anything */
02839 
02840     qry = createPQExpBuffer();
02841 
02842     if (strcmp(want, "") == 0)
02843     {
02844         /* We want the tablespace to be the database's default */
02845         appendPQExpBuffer(qry, "SET default_tablespace = ''");
02846     }
02847     else
02848     {
02849         /* We want an explicit tablespace */
02850         appendPQExpBuffer(qry, "SET default_tablespace = %s", fmtId(want));
02851     }
02852 
02853     if (RestoringToDB(AH))
02854     {
02855         PGresult   *res;
02856 
02857         res = PQexec(AH->connection, qry->data);
02858 
02859         if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
02860             warn_or_exit_horribly(AH, modulename,
02861                                 "could not set default_tablespace to %s: %s",
02862                                 fmtId(want), PQerrorMessage(AH->connection));
02863 
02864         PQclear(res);
02865     }
02866     else
02867         ahprintf(AH, "%s;\n\n", qry->data);
02868 
02869     if (AH->currTablespace)
02870         free(AH->currTablespace);
02871     AH->currTablespace = pg_strdup(want);
02872 
02873     destroyPQExpBuffer(qry);
02874 }
02875 
02876 /*
02877  * Extract an object description for a TOC entry, and append it to buf.
02878  *
02879  * This is not quite as general as it may seem, since it really only
02880  * handles constructing the right thing to put into ALTER ... OWNER TO.
02881  *
02882  * The whole thing is pretty grotty, but we are kind of stuck since the
02883  * information used is all that's available in older dump files.
02884  */
02885 static void
02886 _getObjectDescription(PQExpBuffer buf, TocEntry *te, ArchiveHandle *AH)
02887 {
02888     const char *type = te->desc;
02889 
02890     /* Use ALTER TABLE for views and sequences */
02891     if (strcmp(type, "VIEW") == 0 || strcmp(type, "SEQUENCE") == 0 ||
02892         strcmp(type, "MATERIALIZED VIEW") == 0)
02893         type = "TABLE";
02894 
02895     /* objects named by a schema and name */
02896     if (strcmp(type, "COLLATION") == 0 ||
02897         strcmp(type, "CONVERSION") == 0 ||
02898         strcmp(type, "DOMAIN") == 0 ||
02899         strcmp(type, "TABLE") == 0 ||
02900         strcmp(type, "TYPE") == 0 ||
02901         strcmp(type, "FOREIGN TABLE") == 0 ||
02902         strcmp(type, "TEXT SEARCH DICTIONARY") == 0 ||
02903         strcmp(type, "TEXT SEARCH CONFIGURATION") == 0)
02904     {
02905         appendPQExpBuffer(buf, "%s ", type);
02906         if (te->namespace && te->namespace[0])  /* is null pre-7.3 */
02907             appendPQExpBuffer(buf, "%s.", fmtId(te->namespace));
02908 
02909         /*
02910          * Pre-7.3 pg_dump would sometimes (not always) put a fmtId'd name
02911          * into te->tag for an index. This check is heuristic, so make its
02912          * scope as narrow as possible.
02913          */
02914         if (AH->version < K_VERS_1_7 &&
02915             te->tag[0] == '"' &&
02916             te->tag[strlen(te->tag) - 1] == '"' &&
02917             strcmp(type, "INDEX") == 0)
02918             appendPQExpBuffer(buf, "%s", te->tag);
02919         else
02920             appendPQExpBuffer(buf, "%s", fmtId(te->tag));
02921         return;
02922     }
02923 
02924     /* objects named by just a name */
02925     if (strcmp(type, "DATABASE") == 0 ||
02926         strcmp(type, "PROCEDURAL LANGUAGE") == 0 ||
02927         strcmp(type, "SCHEMA") == 0 ||
02928         strcmp(type, "FOREIGN DATA WRAPPER") == 0 ||
02929         strcmp(type, "SERVER") == 0 ||
02930         strcmp(type, "USER MAPPING") == 0)
02931     {
02932         appendPQExpBuffer(buf, "%s %s", type, fmtId(te->tag));
02933         return;
02934     }
02935 
02936     /* BLOBs just have a name, but it's numeric so must not use fmtId */
02937     if (strcmp(type, "BLOB") == 0)
02938     {
02939         appendPQExpBuffer(buf, "LARGE OBJECT %s", te->tag);
02940         return;
02941     }
02942 
02943     /*
02944      * These object types require additional decoration.  Fortunately, the
02945      * information needed is exactly what's in the DROP command.
02946      */
02947     if (strcmp(type, "AGGREGATE") == 0 ||
02948         strcmp(type, "FUNCTION") == 0 ||
02949         strcmp(type, "OPERATOR") == 0 ||
02950         strcmp(type, "OPERATOR CLASS") == 0 ||
02951         strcmp(type, "OPERATOR FAMILY") == 0)
02952     {
02953         /* Chop "DROP " off the front and make a modifiable copy */
02954         char       *first = pg_strdup(te->dropStmt + 5);
02955         char       *last;
02956 
02957         /* point to last character in string */
02958         last = first + strlen(first) - 1;
02959 
02960         /* Strip off any ';' or '\n' at the end */
02961         while (last >= first && (*last == '\n' || *last == ';'))
02962             last--;
02963         *(last + 1) = '\0';
02964 
02965         appendPQExpBufferStr(buf, first);
02966 
02967         free(first);
02968         return;
02969     }
02970 
02971     write_msg(modulename, "WARNING: don't know how to set owner for object type %s\n",
02972               type);
02973 }
02974 
02975 static void
02976 _printTocEntry(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt, bool isData, bool acl_pass)
02977 {
02978     /* ACLs are dumped only during acl pass */
02979     if (acl_pass)
02980     {
02981         if (!_tocEntryIsACL(te))
02982             return;
02983     }
02984     else
02985     {
02986         if (_tocEntryIsACL(te))
02987             return;
02988     }
02989 
02990     /*
02991      * Avoid dumping the public schema, as it will already be created ...
02992      * unless we are using --clean mode, in which case it's been deleted and
02993      * we'd better recreate it.  Likewise for its comment, if any.
02994      */
02995     if (!ropt->dropSchema)
02996     {
02997         if (strcmp(te->desc, "SCHEMA") == 0 &&
02998             strcmp(te->tag, "public") == 0)
02999             return;
03000         /* The comment restore would require super-user privs, so avoid it. */
03001         if (strcmp(te->desc, "COMMENT") == 0 &&
03002             strcmp(te->tag, "SCHEMA public") == 0)
03003             return;
03004     }
03005 
03006     /* Select owner, schema, and tablespace as necessary */
03007     _becomeOwner(AH, te);
03008     _selectOutputSchema(AH, te->namespace);
03009     _selectTablespace(AH, te->tablespace);
03010 
03011     /* Set up OID mode too */
03012     if (strcmp(te->desc, "TABLE") == 0)
03013         _setWithOids(AH, te);
03014 
03015     /* Emit header comment for item */
03016     if (!AH->noTocComments)
03017     {
03018         const char *pfx;
03019         char       *sanitized_name;
03020         char       *sanitized_schema;
03021         char       *sanitized_owner;
03022 
03023         if (isData)
03024             pfx = "Data for ";
03025         else
03026             pfx = "";
03027 
03028         ahprintf(AH, "--\n");
03029         if (AH->public.verbose)
03030         {
03031             ahprintf(AH, "-- TOC entry %d (class %u OID %u)\n",
03032                      te->dumpId, te->catalogId.tableoid, te->catalogId.oid);
03033             if (te->nDeps > 0)
03034             {
03035                 int         i;
03036 
03037                 ahprintf(AH, "-- Dependencies:");
03038                 for (i = 0; i < te->nDeps; i++)
03039                     ahprintf(AH, " %d", te->dependencies[i]);
03040                 ahprintf(AH, "\n");
03041             }
03042         }
03043 
03044         /*
03045          * Zap any line endings embedded in user-supplied fields, to prevent
03046          * corruption of the dump (which could, in the worst case, present an
03047          * SQL injection vulnerability if someone were to incautiously load a
03048          * dump containing objects with maliciously crafted names).
03049          */
03050         sanitized_name = replace_line_endings(te->tag);
03051         if (te->namespace)
03052             sanitized_schema = replace_line_endings(te->namespace);
03053         else
03054             sanitized_schema = pg_strdup("-");
03055         if (!ropt->noOwner)
03056             sanitized_owner = replace_line_endings(te->owner);
03057         else
03058             sanitized_owner = pg_strdup("-");
03059 
03060         ahprintf(AH, "-- %sName: %s; Type: %s; Schema: %s; Owner: %s",
03061                  pfx, sanitized_name, te->desc, sanitized_schema,
03062                  sanitized_owner);
03063 
03064         free(sanitized_name);
03065         free(sanitized_schema);
03066         free(sanitized_owner);
03067 
03068         if (te->tablespace && !ropt->noTablespace)
03069         {
03070             char       *sanitized_tablespace;
03071 
03072             sanitized_tablespace = replace_line_endings(te->tablespace);
03073             ahprintf(AH, "; Tablespace: %s", sanitized_tablespace);
03074             free(sanitized_tablespace);
03075         }
03076         ahprintf(AH, "\n");
03077 
03078         if (AH->PrintExtraTocPtr !=NULL)
03079             (*AH->PrintExtraTocPtr) (AH, te);
03080         ahprintf(AH, "--\n\n");
03081     }
03082 
03083     /*
03084      * Actually print the definition.
03085      *
03086      * Really crude hack for suppressing AUTHORIZATION clause that old pg_dump
03087      * versions put into CREATE SCHEMA.  We have to do this when --no-owner
03088      * mode is selected.  This is ugly, but I see no other good way ...
03089      */
03090     if (ropt->noOwner && strcmp(te->desc, "SCHEMA") == 0)
03091     {
03092         ahprintf(AH, "CREATE SCHEMA %s;\n\n\n", fmtId(te->tag));
03093     }
03094     else
03095     {
03096         if (strlen(te->defn) > 0)
03097             ahprintf(AH, "%s\n\n", te->defn);
03098     }
03099 
03100     /*
03101      * If we aren't using SET SESSION AUTH to determine ownership, we must
03102      * instead issue an ALTER OWNER command.  We assume that anything without
03103      * a DROP command is not a separately ownable object.  All the categories
03104      * with DROP commands must appear in one list or the other.
03105      */
03106     if (!ropt->noOwner && !ropt->use_setsessauth &&
03107         strlen(te->owner) > 0 && strlen(te->dropStmt) > 0)
03108     {
03109         if (strcmp(te->desc, "AGGREGATE") == 0 ||
03110             strcmp(te->desc, "BLOB") == 0 ||
03111             strcmp(te->desc, "COLLATION") == 0 ||
03112             strcmp(te->desc, "CONVERSION") == 0 ||
03113             strcmp(te->desc, "DATABASE") == 0 ||
03114             strcmp(te->desc, "DOMAIN") == 0 ||
03115             strcmp(te->desc, "FUNCTION") == 0 ||
03116             strcmp(te->desc, "OPERATOR") == 0 ||
03117             strcmp(te->desc, "OPERATOR CLASS") == 0 ||
03118             strcmp(te->desc, "OPERATOR FAMILY") == 0 ||
03119             strcmp(te->desc, "PROCEDURAL LANGUAGE") == 0 ||
03120             strcmp(te->desc, "SCHEMA") == 0 ||
03121             strcmp(te->desc, "TABLE") == 0 ||
03122             strcmp(te->desc, "TYPE") == 0 ||
03123             strcmp(te->desc, "VIEW") == 0 ||
03124             strcmp(te->desc, "MATERIALIZED VIEW") == 0 ||
03125             strcmp(te->desc, "SEQUENCE") == 0 ||
03126             strcmp(te->desc, "FOREIGN TABLE") == 0 ||
03127             strcmp(te->desc, "TEXT SEARCH DICTIONARY") == 0 ||
03128             strcmp(te->desc, "TEXT SEARCH CONFIGURATION") == 0 ||
03129             strcmp(te->desc, "FOREIGN DATA WRAPPER") == 0 ||
03130             strcmp(te->desc, "SERVER") == 0)
03131         {
03132             PQExpBuffer temp = createPQExpBuffer();
03133 
03134             appendPQExpBuffer(temp, "ALTER ");
03135             _getObjectDescription(temp, te, AH);
03136             appendPQExpBuffer(temp, " OWNER TO %s;", fmtId(te->owner));
03137             ahprintf(AH, "%s\n\n", temp->data);
03138             destroyPQExpBuffer(temp);
03139         }
03140         else if (strcmp(te->desc, "CAST") == 0 ||
03141                  strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
03142                  strcmp(te->desc, "CONSTRAINT") == 0 ||
03143                  strcmp(te->desc, "DEFAULT") == 0 ||
03144                  strcmp(te->desc, "FK CONSTRAINT") == 0 ||
03145                  strcmp(te->desc, "INDEX") == 0 ||
03146                  strcmp(te->desc, "RULE") == 0 ||
03147                  strcmp(te->desc, "TRIGGER") == 0 ||
03148                  strcmp(te->desc, "USER MAPPING") == 0)
03149         {
03150             /* these object types don't have separate owners */
03151         }
03152         else
03153         {
03154             write_msg(modulename, "WARNING: don't know how to set owner for object type %s\n",
03155                       te->desc);
03156         }
03157     }
03158 
03159     /*
03160      * If it's an ACL entry, it might contain SET SESSION AUTHORIZATION
03161      * commands, so we can no longer assume we know the current auth setting.
03162      */
03163     if (acl_pass)
03164     {
03165         if (AH->currUser)
03166             free(AH->currUser);
03167         AH->currUser = NULL;
03168     }
03169 }
03170 
03171 /*
03172  * Sanitize a string to be included in an SQL comment, by replacing any
03173  * newlines with spaces.
03174  */
03175 static char *
03176 replace_line_endings(const char *str)
03177 {
03178     char       *result;
03179     char       *s;
03180 
03181     result = pg_strdup(str);
03182 
03183     for (s = result; *s != '\0'; s++)
03184     {
03185         if (*s == '\n' || *s == '\r')
03186             *s = ' ';
03187     }
03188 
03189     return result;
03190 }
03191 
03192 void
03193 WriteHead(ArchiveHandle *AH)
03194 {
03195     struct tm   crtm;
03196 
03197     (*AH->WriteBufPtr) (AH, "PGDMP", 5);        /* Magic code */
03198     (*AH->WriteBytePtr) (AH, AH->vmaj);
03199     (*AH->WriteBytePtr) (AH, AH->vmin);
03200     (*AH->WriteBytePtr) (AH, AH->vrev);
03201     (*AH->WriteBytePtr) (AH, AH->intSize);
03202     (*AH->WriteBytePtr) (AH, AH->offSize);
03203     (*AH->WriteBytePtr) (AH, AH->format);
03204 
03205 #ifndef HAVE_LIBZ
03206     if (AH->compression != 0)
03207         write_msg(modulename, "WARNING: requested compression not available in this "
03208                   "installation -- archive will be uncompressed\n");
03209 
03210     AH->compression = 0;
03211 #endif
03212 
03213     WriteInt(AH, AH->compression);
03214 
03215     crtm = *localtime(&AH->createDate);
03216     WriteInt(AH, crtm.tm_sec);
03217     WriteInt(AH, crtm.tm_min);
03218     WriteInt(AH, crtm.tm_hour);
03219     WriteInt(AH, crtm.tm_mday);
03220     WriteInt(AH, crtm.tm_mon);
03221     WriteInt(AH, crtm.tm_year);
03222     WriteInt(AH, crtm.tm_isdst);
03223     WriteStr(AH, PQdb(AH->connection));
03224     WriteStr(AH, AH->public.remoteVersionStr);
03225     WriteStr(AH, PG_VERSION);
03226 }
03227 
03228 void
03229 ReadHead(ArchiveHandle *AH)
03230 {
03231     char        tmpMag[7];
03232     int         fmt;
03233     struct tm   crtm;
03234 
03235     /*
03236      * If we haven't already read the header, do so.
03237      *
03238      * NB: this code must agree with _discoverArchiveFormat().  Maybe find a
03239      * way to unify the cases?
03240      */
03241     if (!AH->readHeader)
03242     {
03243         if ((*AH->ReadBufPtr) (AH, tmpMag, 5) != 5)
03244             exit_horribly(modulename, "unexpected end of file\n");
03245 
03246         if (strncmp(tmpMag, "PGDMP", 5) != 0)
03247             exit_horribly(modulename, "did not find magic string in file header\n");
03248 
03249         AH->vmaj = (*AH->ReadBytePtr) (AH);
03250         AH->vmin = (*AH->ReadBytePtr) (AH);
03251 
03252         if (AH->vmaj > 1 || ((AH->vmaj == 1) && (AH->vmin > 0)))        /* Version > 1.0 */
03253             AH->vrev = (*AH->ReadBytePtr) (AH);
03254         else
03255             AH->vrev = 0;
03256 
03257         AH->version = ((AH->vmaj * 256 + AH->vmin) * 256 + AH->vrev) * 256 + 0;
03258 
03259         if (AH->version < K_VERS_1_0 || AH->version > K_VERS_MAX)
03260             exit_horribly(modulename, "unsupported version (%d.%d) in file header\n",
03261                           AH->vmaj, AH->vmin);
03262 
03263         AH->intSize = (*AH->ReadBytePtr) (AH);
03264         if (AH->intSize > 32)
03265             exit_horribly(modulename, "sanity check on integer size (%lu) failed\n",
03266                           (unsigned long) AH->intSize);
03267 
03268         if (AH->intSize > sizeof(int))
03269             write_msg(modulename, "WARNING: archive was made on a machine with larger integers, some operations might fail\n");
03270 
03271         if (AH->version >= K_VERS_1_7)
03272             AH->offSize = (*AH->ReadBytePtr) (AH);
03273         else
03274             AH->offSize = AH->intSize;
03275 
03276         fmt = (*AH->ReadBytePtr) (AH);
03277 
03278         if (AH->format != fmt)
03279             exit_horribly(modulename, "expected format (%d) differs from format found in file (%d)\n",
03280                           AH->format, fmt);
03281     }
03282 
03283     if (AH->version >= K_VERS_1_2)
03284     {
03285         if (AH->version < K_VERS_1_4)
03286             AH->compression = (*AH->ReadBytePtr) (AH);
03287         else
03288             AH->compression = ReadInt(AH);
03289     }
03290     else
03291         AH->compression = Z_DEFAULT_COMPRESSION;
03292 
03293 #ifndef HAVE_LIBZ
03294     if (AH->compression != 0)
03295         write_msg(modulename, "WARNING: archive is compressed, but this installation does not support compression -- no data will be available\n");
03296 #endif
03297 
03298     if (AH->version >= K_VERS_1_4)
03299     {
03300         crtm.tm_sec = ReadInt(AH);
03301         crtm.tm_min = ReadInt(AH);
03302         crtm.tm_hour = ReadInt(AH);
03303         crtm.tm_mday = ReadInt(AH);
03304         crtm.tm_mon = ReadInt(AH);
03305         crtm.tm_year = ReadInt(AH);
03306         crtm.tm_isdst = ReadInt(AH);
03307 
03308         AH->archdbname = ReadStr(AH);
03309 
03310         AH->createDate = mktime(&crtm);
03311 
03312         if (AH->createDate == (time_t) -1)
03313             write_msg(modulename, "WARNING: invalid creation date in header\n");
03314     }
03315 
03316     if (AH->version >= K_VERS_1_10)
03317     {
03318         AH->archiveRemoteVersion = ReadStr(AH);
03319         AH->archiveDumpVersion = ReadStr(AH);
03320     }
03321 }
03322 
03323 
03324 /*
03325  * checkSeek
03326  *    check to see if ftell/fseek can be performed.
03327  */
03328 bool
03329 checkSeek(FILE *fp)
03330 {
03331     pgoff_t     tpos;
03332 
03333     /*
03334      * If pgoff_t is wider than long, we must have "real" fseeko and not an
03335      * emulation using fseek.  Otherwise report no seek capability.
03336      */
03337 #ifndef HAVE_FSEEKO
03338     if (sizeof(pgoff_t) > sizeof(long))
03339         return false;
03340 #endif
03341 
03342     /* Check that ftello works on this file */
03343     errno = 0;
03344     tpos = ftello(fp);
03345     if (errno)
03346         return false;
03347 
03348     /*
03349      * Check that fseeko(SEEK_SET) works, too.  NB: we used to try to test
03350      * this with fseeko(fp, 0, SEEK_CUR).  But some platforms treat that as a
03351      * successful no-op even on files that are otherwise unseekable.
03352      */
03353     if (fseeko(fp, tpos, SEEK_SET) != 0)
03354         return false;
03355 
03356     return true;
03357 }
03358 
03359 
03360 /*
03361  * dumpTimestamp
03362  */
03363 static void
03364 dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim)
03365 {
03366     char        buf[256];
03367 
03368     /*
03369      * We don't print the timezone on Win32, because the names are long and
03370      * localized, which means they may contain characters in various random
03371      * encodings; this has been seen to cause encoding errors when reading the
03372      * dump script.
03373      */
03374     if (strftime(buf, sizeof(buf),
03375 #ifndef WIN32
03376                  "%Y-%m-%d %H:%M:%S %Z",
03377 #else
03378                  "%Y-%m-%d %H:%M:%S",
03379 #endif
03380                  localtime(&tim)) != 0)
03381         ahprintf(AH, "-- %s %s\n\n", msg, buf);
03382 }
03383 
03384 /*
03385  * Main engine for parallel restore.
03386  *
03387  * Work is done in three phases.
03388  * First we process all SECTION_PRE_DATA tocEntries, in a single connection,
03389  * just as for a standard restore.  Second we process the remaining non-ACL
03390  * steps in parallel worker children (threads on Windows, processes on Unix),
03391  * each of which connects separately to the database.  Finally we process all
03392  * the ACL entries in a single connection (that happens back in
03393  * RestoreArchive).
03394  */
03395 static void
03396 restore_toc_entries_prefork(ArchiveHandle *AH)
03397 {
03398     RestoreOptions *ropt = AH->ropt;
03399     bool        skipped_some;
03400     TocEntry   *next_work_item;
03401 
03402     ahlog(AH, 2, "entering restore_toc_entries_prefork\n");
03403 
03404     /* Adjust dependency information */
03405     fix_dependencies(AH);
03406 
03407     /*
03408      * Do all the early stuff in a single connection in the parent. There's no
03409      * great point in running it in parallel, in fact it will actually run
03410      * faster in a single connection because we avoid all the connection and
03411      * setup overhead.  Also, pre-9.2 pg_dump versions were not very good
03412      * about showing all the dependencies of SECTION_PRE_DATA items, so we do
03413      * not risk trying to process them out-of-order.
03414      *
03415      * Note: as of 9.2, it should be guaranteed that all PRE_DATA items appear
03416      * before DATA items, and all DATA items before POST_DATA items.  That is
03417      * not certain to be true in older archives, though, so this loop is coded
03418      * to not assume it.
03419      */
03420     skipped_some = false;
03421     for (next_work_item = AH->toc->next; next_work_item != AH->toc; next_work_item = next_work_item->next)
03422     {
03423         /* NB: process-or-continue logic must be the inverse of loop below */
03424         if (next_work_item->section != SECTION_PRE_DATA)
03425         {
03426             /* DATA and POST_DATA items are just ignored for now */
03427             if (next_work_item->section == SECTION_DATA ||
03428                 next_work_item->section == SECTION_POST_DATA)
03429             {
03430                 skipped_some = true;
03431                 continue;
03432             }
03433             else
03434             {
03435                 /*
03436                  * SECTION_NONE items, such as comments, can be processed now
03437                  * if we are still in the PRE_DATA part of the archive.  Once
03438                  * we've skipped any items, we have to consider whether the
03439                  * comment's dependencies are satisfied, so skip it for now.
03440                  */
03441                 if (skipped_some)
03442                     continue;
03443             }
03444         }
03445 
03446         ahlog(AH, 1, "processing item %d %s %s\n",
03447               next_work_item->dumpId,
03448               next_work_item->desc, next_work_item->tag);
03449 
03450         (void) restore_toc_entry(AH, next_work_item, ropt, false);
03451 
03452         /* there should be no touch of ready_list here, so pass NULL */
03453         reduce_dependencies(AH, next_work_item, NULL);
03454     }
03455 
03456     /*
03457      * Now close parent connection in prep for parallel steps.  We do this
03458      * mainly to ensure that we don't exceed the specified number of parallel
03459      * connections.
03460      */
03461     DisconnectDatabase(&AH->public);
03462 
03463     /* blow away any transient state from the old connection */
03464     if (AH->currUser)
03465         free(AH->currUser);
03466     AH->currUser = NULL;
03467     if (AH->currSchema)
03468         free(AH->currSchema);
03469     AH->currSchema = NULL;
03470     if (AH->currTablespace)
03471         free(AH->currTablespace);
03472     AH->currTablespace = NULL;
03473     AH->currWithOids = -1;
03474 }
03475 
03476 /*
03477  * Main engine for parallel restore.
03478  *
03479  * Work is done in three phases.
03480  * First we process all SECTION_PRE_DATA tocEntries, in a single connection,
03481  * just as for a standard restore. This is done in restore_toc_entries_prefork().
03482  * Second we process the remaining non-ACL steps in parallel worker children
03483  * (threads on Windows, processes on Unix), these fork off and set up their
03484  * connections before we call restore_toc_entries_parallel_forked.
03485  * Finally we process all the ACL entries in a single connection (that happens
03486  * back in RestoreArchive).
03487  */
03488 static void
03489 restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
03490                              TocEntry *pending_list)
03491 {
03492     int         work_status;
03493     bool        skipped_some;
03494     TocEntry    ready_list;
03495     TocEntry   *next_work_item;
03496     int         ret_child;
03497 
03498     ahlog(AH, 2, "entering restore_toc_entries_parallel\n");
03499 
03500     /*
03501      * Initialize the lists of ready items, the list for pending items has
03502      * already been initialized in the caller.  After this setup, the pending
03503      * list is everything that needs to be done but is blocked by one or more
03504      * dependencies, while the ready list contains items that have no
03505      * remaining dependencies. Note: we don't yet filter out entries that
03506      * aren't going to be restored. They might participate in dependency
03507      * chains connecting entries that should be restored, so we treat them as
03508      * live until we actually process them.
03509      */
03510     par_list_header_init(&ready_list);
03511     skipped_some = false;
03512     for (next_work_item = AH->toc->next; next_work_item != AH->toc; next_work_item = next_work_item->next)
03513     {
03514         /* NB: process-or-continue logic must be the inverse of loop above */
03515         if (next_work_item->section == SECTION_PRE_DATA)
03516         {
03517             /* All PRE_DATA items were dealt with above */
03518             continue;
03519         }
03520         if (next_work_item->section == SECTION_DATA ||
03521             next_work_item->section == SECTION_POST_DATA)
03522         {
03523             /* set this flag at same point that previous loop did */
03524             skipped_some = true;
03525         }
03526         else
03527         {
03528             /* SECTION_NONE items must be processed if previous loop didn't */
03529             if (!skipped_some)
03530                 continue;
03531         }
03532 
03533         if (next_work_item->depCount > 0)
03534             par_list_append(pending_list, next_work_item);
03535         else
03536             par_list_append(&ready_list, next_work_item);
03537     }
03538 
03539     /*
03540      * main parent loop
03541      *
03542      * Keep going until there is no worker still running AND there is no work
03543      * left to be done.
03544      */
03545 
03546     ahlog(AH, 1, "entering main parallel loop\n");
03547 
03548     while ((next_work_item = get_next_work_item(AH, &ready_list, pstate)) != NULL ||
03549            !IsEveryWorkerIdle(pstate))
03550     {
03551         if (next_work_item != NULL)
03552         {
03553             /* If not to be restored, don't waste time launching a worker */
03554             if ((next_work_item->reqs & (REQ_SCHEMA | REQ_DATA)) == 0 ||
03555                 _tocEntryIsACL(next_work_item))
03556             {
03557                 ahlog(AH, 1, "skipping item %d %s %s\n",
03558                       next_work_item->dumpId,
03559                       next_work_item->desc, next_work_item->tag);
03560 
03561                 par_list_remove(next_work_item);
03562                 reduce_dependencies(AH, next_work_item, &ready_list);
03563 
03564                 continue;
03565             }
03566 
03567             ahlog(AH, 1, "launching item %d %s %s\n",
03568                   next_work_item->dumpId,
03569                   next_work_item->desc, next_work_item->tag);
03570 
03571             par_list_remove(next_work_item);
03572 
03573             Assert(GetIdleWorker(pstate) != NO_SLOT);
03574             DispatchJobForTocEntry(AH, pstate, next_work_item, ACT_RESTORE);
03575         }
03576         else
03577         {
03578             /* at least one child is working and we have nothing ready. */
03579             Assert(!IsEveryWorkerIdle(pstate));
03580         }
03581 
03582         for (;;)
03583         {
03584             int         nTerm = 0;
03585 
03586             /*
03587              * In order to reduce dependencies as soon as possible and
03588              * especially to reap the status of workers who are working on
03589              * items that pending items depend on, we do a non-blocking check
03590              * for ended workers first.
03591              *
03592              * However, if we do not have any other work items currently that
03593              * workers can work on, we do not busy-loop here but instead
03594              * really wait for at least one worker to terminate. Hence we call
03595              * ListenToWorkers(..., ..., do_wait = true) in this case.
03596              */
03597             ListenToWorkers(AH, pstate, !next_work_item);
03598 
03599             while ((ret_child = ReapWorkerStatus(pstate, &work_status)) != NO_SLOT)
03600             {
03601                 nTerm++;
03602                 mark_work_done(AH, &ready_list, ret_child, work_status, pstate);
03603             }
03604 
03605             /*
03606              * We need to make sure that we have an idle worker before
03607              * re-running the loop. If nTerm > 0 we already have that (quick
03608              * check).
03609              */
03610             if (nTerm > 0)
03611                 break;
03612 
03613             /* if nobody terminated, explicitly check for an idle worker */
03614             if (GetIdleWorker(pstate) != NO_SLOT)
03615                 break;
03616 
03617             /*
03618              * If we have no idle worker, read the result of one or more
03619              * workers and loop the loop to call ReapWorkerStatus() on them.
03620              */
03621             ListenToWorkers(AH, pstate, true);
03622         }
03623     }
03624 
03625     ahlog(AH, 1, "finished main parallel loop\n");
03626 }
03627 
03628 static void
03629 restore_toc_entries_postfork(ArchiveHandle *AH, TocEntry *pending_list)
03630 {
03631     RestoreOptions *ropt = AH->ropt;
03632     TocEntry   *te;
03633 
03634     ahlog(AH, 2, "entering restore_toc_entries_postfork\n");
03635 
03636     /*
03637      * Now reconnect the single parent connection.
03638      */
03639     ConnectDatabase((Archive *) AH, ropt->dbname,
03640                     ropt->pghost, ropt->pgport, ropt->username,
03641                     ropt->promptPassword);
03642 
03643     _doSetFixedOutputState(AH);
03644 
03645     /*
03646      * Make sure there is no non-ACL work left due to, say, circular
03647      * dependencies, or some other pathological condition. If so, do it in the
03648      * single parent connection.
03649      */
03650     for (te = pending_list->par_next; te != pending_list; te = te->par_next)
03651     {
03652         ahlog(AH, 1, "processing missed item %d %s %s\n",
03653               te->dumpId, te->desc, te->tag);
03654         (void) restore_toc_entry(AH, te, ropt, false);
03655     }
03656 
03657     /* The ACLs will be handled back in RestoreArchive. */
03658 }
03659 
03660 /*
03661  * Check if te1 has an exclusive lock requirement for an item that te2 also
03662  * requires, whether or not te2's requirement is for an exclusive lock.
03663  */
03664 static bool
03665 has_lock_conflicts(TocEntry *te1, TocEntry *te2)
03666 {
03667     int         j,
03668                 k;
03669 
03670     for (j = 0; j < te1->nLockDeps; j++)
03671     {
03672         for (k = 0; k < te2->nDeps; k++)
03673         {
03674             if (te1->lockDeps[j] == te2->dependencies[k])
03675                 return true;
03676         }
03677     }
03678     return false;
03679 }
03680 
03681 
03682 /*
03683  * Initialize the header of a parallel-processing list.
03684  *
03685  * These are circular lists with a dummy TocEntry as header, just like the
03686  * main TOC list; but we use separate list links so that an entry can be in
03687  * the main TOC list as well as in a parallel-processing list.
03688  */
03689 static void
03690 par_list_header_init(TocEntry *l)
03691 {
03692     l->par_prev = l->par_next = l;
03693 }
03694 
03695 /* Append te to the end of the parallel-processing list headed by l */
03696 static void
03697 par_list_append(TocEntry *l, TocEntry *te)
03698 {
03699     te->par_prev = l->par_prev;
03700     l->par_prev->par_next = te;
03701     l->par_prev = te;
03702     te->par_next = l;
03703 }
03704 
03705 /* Remove te from whatever parallel-processing list it's in */
03706 static void
03707 par_list_remove(TocEntry *te)
03708 {
03709     te->par_prev->par_next = te->par_next;
03710     te->par_next->par_prev = te->par_prev;
03711     te->par_prev = NULL;
03712     te->par_next = NULL;
03713 }
03714 
03715 
03716 /*
03717  * Find the next work item (if any) that is capable of being run now.
03718  *
03719  * To qualify, the item must have no remaining dependencies
03720  * and no requirements for locks that are incompatible with
03721  * items currently running.  Items in the ready_list are known to have
03722  * no remaining dependencies, but we have to check for lock conflicts.
03723  *
03724  * Note that the returned item has *not* been removed from ready_list.
03725  * The caller must do that after successfully dispatching the item.
03726  *
03727  * pref_non_data is for an alternative selection algorithm that gives
03728  * preference to non-data items if there is already a data load running.
03729  * It is currently disabled.
03730  */
03731 static TocEntry *
03732 get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list,
03733                    ParallelState *pstate)
03734 {
03735     bool        pref_non_data = false;  /* or get from AH->ropt */
03736     TocEntry   *data_te = NULL;
03737     TocEntry   *te;
03738     int         i,
03739                 k;
03740 
03741     /*
03742      * Bogus heuristics for pref_non_data
03743      */
03744     if (pref_non_data)
03745     {
03746         int         count = 0;
03747 
03748         for (k = 0; k < pstate->numWorkers; k++)
03749             if (pstate->parallelSlot[k].args->te != NULL &&
03750                 pstate->parallelSlot[k].args->te->section == SECTION_DATA)
03751                 count++;
03752         if (pstate->numWorkers == 0 || count * 4 < pstate->numWorkers)
03753             pref_non_data = false;
03754     }
03755 
03756     /*
03757      * Search the ready_list until we find a suitable item.
03758      */
03759     for (te = ready_list->par_next; te != ready_list; te = te->par_next)
03760     {
03761         bool        conflicts = false;
03762 
03763         /*
03764          * Check to see if the item would need exclusive lock on something
03765          * that a currently running item also needs lock on, or vice versa. If
03766          * so, we don't want to schedule them together.
03767          */
03768         for (i = 0; i < pstate->numWorkers && !conflicts; i++)
03769         {
03770             TocEntry   *running_te;
03771 
03772             if (pstate->parallelSlot[i].workerStatus != WRKR_WORKING)
03773                 continue;
03774             running_te = pstate->parallelSlot[i].args->te;
03775 
03776             if (has_lock_conflicts(te, running_te) ||
03777                 has_lock_conflicts(running_te, te))
03778             {
03779                 conflicts = true;
03780                 break;
03781             }
03782         }
03783 
03784         if (conflicts)
03785             continue;
03786 
03787         if (pref_non_data && te->section == SECTION_DATA)
03788         {
03789             if (data_te == NULL)
03790                 data_te = te;
03791             continue;
03792         }
03793 
03794         /* passed all tests, so this item can run */
03795         return te;
03796     }
03797 
03798     if (data_te != NULL)
03799         return data_te;
03800 
03801     ahlog(AH, 2, "no item ready\n");
03802     return NULL;
03803 }
03804 
03805 
03806 /*
03807  * Restore a single TOC item in parallel with others
03808  *
03809  * this is run in the worker, i.e. in a thread (Windows) or a separate process
03810  * (everything else). A worker process executes several such work items during
03811  * a parallel backup or restore. Once we terminate here and report back that
03812  * our work is finished, the master process will assign us a new work item.
03813  */
03814 int
03815 parallel_restore(ParallelArgs * args)
03816 {
03817     ArchiveHandle *AH = args->AH;
03818     TocEntry   *te = args->te;
03819     RestoreOptions *ropt = AH->ropt;
03820     int         status;
03821 
03822     _doSetFixedOutputState(AH);
03823 
03824     Assert(AH->connection != NULL);
03825 
03826     AH->public.n_errors = 0;
03827 
03828     /* Restore the TOC item */
03829     status = restore_toc_entry(AH, te, ropt, true);
03830 
03831     return status;
03832 }
03833 
03834 
03835 /*
03836  * Housekeeping to be done after a step has been parallel restored.
03837  *
03838  * Clear the appropriate slot, free all the extra memory we allocated,
03839  * update status, and reduce the dependency count of any dependent items.
03840  */
03841 static void
03842 mark_work_done(ArchiveHandle *AH, TocEntry *ready_list,
03843                int worker, int status,
03844                ParallelState *pstate)
03845 {
03846     TocEntry   *te = NULL;
03847 
03848     te = pstate->parallelSlot[worker].args->te;
03849 
03850     if (te == NULL)
03851         exit_horribly(modulename, "could not find slot of finished worker\n");
03852 
03853     ahlog(AH, 1, "finished item %d %s %s\n",
03854           te->dumpId, te->desc, te->tag);
03855 
03856     if (status == WORKER_CREATE_DONE)
03857         mark_create_done(AH, te);
03858     else if (status == WORKER_INHIBIT_DATA)
03859     {
03860         inhibit_data_for_failed_table(AH, te);
03861         AH->public.n_errors++;
03862     }
03863     else if (status == WORKER_IGNORED_ERRORS)
03864         AH->public.n_errors++;
03865     else if (status != 0)
03866         exit_horribly(modulename, "worker process failed: exit code %d\n",
03867                       status);
03868 
03869     reduce_dependencies(AH, te, ready_list);
03870 }
03871 
03872 
03873 /*
03874  * Process the dependency information into a form useful for parallel restore.
03875  *
03876  * This function takes care of fixing up some missing or badly designed
03877  * dependencies, and then prepares subsidiary data structures that will be
03878  * used in the main parallel-restore logic, including:
03879  * 1. We build the revDeps[] arrays of incoming dependency dumpIds.
03880  * 2. We set up depCount fields that are the number of as-yet-unprocessed
03881  * dependencies for each TOC entry.
03882  *
03883  * We also identify locking dependencies so that we can avoid trying to
03884  * schedule conflicting items at the same time.
03885  */
03886 static void
03887 fix_dependencies(ArchiveHandle *AH)
03888 {
03889     TocEntry   *te;
03890     int         i;
03891 
03892     /*
03893      * Initialize the depCount/revDeps/nRevDeps fields, and make sure the TOC
03894      * items are marked as not being in any parallel-processing list.
03895      */
03896     for (te = AH->toc->next; te != AH->toc; te = te->next)
03897     {
03898         te->depCount = te->nDeps;
03899         te->revDeps = NULL;
03900         te->nRevDeps = 0;
03901         te->par_prev = NULL;
03902         te->par_next = NULL;
03903     }
03904 
03905     /*
03906      * POST_DATA items that are shown as depending on a table need to be
03907      * re-pointed to depend on that table's data, instead.  This ensures they
03908      * won't get scheduled until the data has been loaded.
03909      */
03910     repoint_table_dependencies(AH);
03911 
03912     /*
03913      * Pre-8.4 versions of pg_dump neglected to set up a dependency from BLOB
03914      * COMMENTS to BLOBS.  Cope.  (We assume there's only one BLOBS and only
03915      * one BLOB COMMENTS in such files.)
03916      */
03917     if (AH->version < K_VERS_1_11)
03918     {
03919         for (te = AH->toc->next; te != AH->toc; te = te->next)
03920         {
03921             if (strcmp(te->desc, "BLOB COMMENTS") == 0 && te->nDeps == 0)
03922             {
03923                 TocEntry   *te2;
03924 
03925                 for (te2 = AH->toc->next; te2 != AH->toc; te2 = te2->next)
03926                 {
03927                     if (strcmp(te2->desc, "BLOBS") == 0)
03928                     {
03929                         te->dependencies = (DumpId *) pg_malloc(sizeof(DumpId));
03930                         te->dependencies[0] = te2->dumpId;
03931                         te->nDeps++;
03932                         te->depCount++;
03933                         break;
03934                     }
03935                 }
03936                 break;
03937             }
03938         }
03939     }
03940 
03941     /*
03942      * At this point we start to build the revDeps reverse-dependency arrays,
03943      * so all changes of dependencies must be complete.
03944      */
03945 
03946     /*
03947      * Count the incoming dependencies for each item.  Also, it is possible
03948      * that the dependencies list items that are not in the archive at all
03949      * (that should not happen in 9.2 and later, but is highly likely in older
03950      * archives).  Subtract such items from the depCounts.
03951      */
03952     for (te = AH->toc->next; te != AH->toc; te = te->next)
03953     {
03954         for (i = 0; i < te->nDeps; i++)
03955         {
03956             DumpId      depid = te->dependencies[i];
03957 
03958             if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL)
03959                 AH->tocsByDumpId[depid]->nRevDeps++;
03960             else
03961                 te->depCount--;
03962         }
03963     }
03964 
03965     /*
03966      * Allocate space for revDeps[] arrays, and reset nRevDeps so we can use
03967      * it as a counter below.
03968      */
03969     for (te = AH->toc->next; te != AH->toc; te = te->next)
03970     {
03971         if (te->nRevDeps > 0)
03972             te->revDeps = (DumpId *) pg_malloc(te->nRevDeps * sizeof(DumpId));
03973         te->nRevDeps = 0;
03974     }
03975 
03976     /*
03977      * Build the revDeps[] arrays of incoming-dependency dumpIds.  This had
03978      * better agree with the loops above.
03979      */
03980     for (te = AH->toc->next; te != AH->toc; te = te->next)
03981     {
03982         for (i = 0; i < te->nDeps; i++)
03983         {
03984             DumpId      depid = te->dependencies[i];
03985 
03986             if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL)
03987             {
03988                 TocEntry   *otherte = AH->tocsByDumpId[depid];
03989 
03990                 otherte->revDeps[otherte->nRevDeps++] = te->dumpId;
03991             }
03992         }
03993     }
03994 
03995     /*
03996      * Lastly, work out the locking dependencies.
03997      */
03998     for (te = AH->toc->next; te != AH->toc; te = te->next)
03999     {
04000         te->lockDeps = NULL;
04001         te->nLockDeps = 0;
04002         identify_locking_dependencies(AH, te);
04003     }
04004 }
04005 
04006 /*
04007  * Change dependencies on table items to depend on table data items instead,
04008  * but only in POST_DATA items.
04009  */
04010 static void
04011 repoint_table_dependencies(ArchiveHandle *AH)
04012 {
04013     TocEntry   *te;
04014     int         i;
04015     DumpId      olddep;
04016 
04017     for (te = AH->toc->next; te != AH->toc; te = te->next)
04018     {
04019         if (te->section != SECTION_POST_DATA)
04020             continue;
04021         for (i = 0; i < te->nDeps; i++)
04022         {
04023             olddep = te->dependencies[i];
04024             if (olddep <= AH->maxDumpId &&
04025                 AH->tableDataId[olddep] != 0)
04026             {
04027                 te->dependencies[i] = AH->tableDataId[olddep];
04028                 ahlog(AH, 2, "transferring dependency %d -> %d to %d\n",
04029                       te->dumpId, olddep, AH->tableDataId[olddep]);
04030             }
04031         }
04032     }
04033 }
04034 
04035 /*
04036  * Identify which objects we'll need exclusive lock on in order to restore
04037  * the given TOC entry (*other* than the one identified by the TOC entry
04038  * itself).  Record their dump IDs in the entry's lockDeps[] array.
04039  */
04040 static void
04041 identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te)
04042 {
04043     DumpId     *lockids;
04044     int         nlockids;
04045     int         i;
04046 
04047     /* Quick exit if no dependencies at all */
04048     if (te->nDeps == 0)
04049         return;
04050 
04051     /* Exit if this entry doesn't need exclusive lock on other objects */
04052     if (!(strcmp(te->desc, "CONSTRAINT") == 0 ||
04053           strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
04054           strcmp(te->desc, "FK CONSTRAINT") == 0 ||
04055           strcmp(te->desc, "RULE") == 0 ||
04056           strcmp(te->desc, "TRIGGER") == 0))
04057         return;
04058 
04059     /*
04060      * We assume the item requires exclusive lock on each TABLE DATA item
04061      * listed among its dependencies.  (This was originally a dependency on
04062      * the TABLE, but fix_dependencies repointed it to the data item. Note
04063      * that all the entry types we are interested in here are POST_DATA, so
04064      * they will all have been changed this way.)
04065      */
04066     lockids = (DumpId *) pg_malloc(te->nDeps * sizeof(DumpId));
04067     nlockids = 0;
04068     for (i = 0; i < te->nDeps; i++)
04069     {
04070         DumpId      depid = te->dependencies[i];
04071 
04072         if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL &&
04073             strcmp(AH->tocsByDumpId[depid]->desc, "TABLE DATA") == 0)
04074             lockids[nlockids++] = depid;
04075     }
04076 
04077     if (nlockids == 0)
04078     {
04079         free(lockids);
04080         return;
04081     }
04082 
04083     te->lockDeps = pg_realloc(lockids, nlockids * sizeof(DumpId));
04084     te->nLockDeps = nlockids;
04085 }
04086 
04087 /*
04088  * Remove the specified TOC entry from the depCounts of items that depend on
04089  * it, thereby possibly making them ready-to-run.  Any pending item that
04090  * becomes ready should be moved to the ready list.
04091  */
04092 static void
04093 reduce_dependencies(ArchiveHandle *AH, TocEntry *te, TocEntry *ready_list)
04094 {
04095     int         i;
04096 
04097     ahlog(AH, 2, "reducing dependencies for %d\n", te->dumpId);
04098 
04099     for (i = 0; i < te->nRevDeps; i++)
04100     {
04101         TocEntry   *otherte = AH->tocsByDumpId[te->revDeps[i]];
04102 
04103         otherte->depCount--;
04104         if (otherte->depCount == 0 && otherte->par_prev != NULL)
04105         {
04106             /* It must be in the pending list, so remove it ... */
04107             par_list_remove(otherte);
04108             /* ... and add to ready_list */
04109             par_list_append(ready_list, otherte);
04110         }
04111     }
04112 }
04113 
04114 /*
04115  * Set the created flag on the DATA member corresponding to the given
04116  * TABLE member
04117  */
04118 static void
04119 mark_create_done(ArchiveHandle *AH, TocEntry *te)
04120 {
04121     if (AH->tableDataId[te->dumpId] != 0)
04122     {
04123         TocEntry   *ted = AH->tocsByDumpId[AH->tableDataId[te->dumpId]];
04124 
04125         ted->created = true;
04126     }
04127 }
04128 
04129 /*
04130  * Mark the DATA member corresponding to the given TABLE member
04131  * as not wanted
04132  */
04133 static void
04134 inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te)
04135 {
04136     ahlog(AH, 1, "table \"%s\" could not be created, will not restore its data\n",
04137           te->tag);
04138 
04139     if (AH->tableDataId[te->dumpId] != 0)
04140     {
04141         TocEntry   *ted = AH->tocsByDumpId[AH->tableDataId[te->dumpId]];
04142 
04143         ted->reqs = 0;
04144     }
04145 }
04146 
04147 /*
04148  * Clone and de-clone routines used in parallel restoration.
04149  *
04150  * Enough of the structure is cloned to ensure that there is no
04151  * conflict between different threads each with their own clone.
04152  */
04153 ArchiveHandle *
04154 CloneArchive(ArchiveHandle *AH)
04155 {
04156     ArchiveHandle *clone;
04157 
04158     /* Make a "flat" copy */
04159     clone = (ArchiveHandle *) pg_malloc(sizeof(ArchiveHandle));
04160     memcpy(clone, AH, sizeof(ArchiveHandle));
04161 
04162     /* Handle format-independent fields */
04163     memset(&(clone->sqlparse), 0, sizeof(clone->sqlparse));
04164 
04165     /* The clone will have its own connection, so disregard connection state */
04166     clone->connection = NULL;
04167     clone->currUser = NULL;
04168     clone->currSchema = NULL;
04169     clone->currTablespace = NULL;
04170     clone->currWithOids = -1;
04171 
04172     /* savedPassword must be local in case we change it while connecting */
04173     if (clone->savedPassword)
04174         clone->savedPassword = pg_strdup(clone->savedPassword);
04175 
04176     /* clone has its own error count, too */
04177     clone->public.n_errors = 0;
04178 
04179     /*
04180      * Connect our new clone object to the database: In parallel restore the
04181      * parent is already disconnected, because we can connect the worker
04182      * processes independently to the database (no snapshot sync required). In
04183      * parallel backup we clone the parent's existing connection.
04184      */
04185     if (AH->mode == archModeRead)
04186     {
04187         RestoreOptions *ropt = AH->ropt;
04188 
04189         Assert(AH->connection == NULL);
04190         /* this also sets clone->connection */
04191         ConnectDatabase((Archive *) clone, ropt->dbname,
04192                         ropt->pghost, ropt->pgport, ropt->username,
04193                         ropt->promptPassword);
04194     }
04195     else
04196     {
04197         char       *dbname;
04198         char       *pghost;
04199         char       *pgport;
04200         char       *username;
04201         const char *encname;
04202 
04203         Assert(AH->connection != NULL);
04204 
04205         /*
04206          * Even though we are technically accessing the parent's database
04207          * object here, these functions are fine to be called like that
04208          * because all just return a pointer and do not actually send/receive
04209          * any data to/from the database.
04210          */
04211         dbname = PQdb(AH->connection);
04212         pghost = PQhost(AH->connection);
04213         pgport = PQport(AH->connection);
04214         username = PQuser(AH->connection);
04215         encname = pg_encoding_to_char(AH->public.encoding);
04216 
04217         /* this also sets clone->connection */
04218         ConnectDatabase((Archive *) clone, dbname, pghost, pgport, username, TRI_NO);
04219 
04220         /*
04221          * Set the same encoding, whatever we set here is what we got from
04222          * pg_encoding_to_char(), so we really shouldn't run into an error
04223          * setting that very same value. Also see the comment in
04224          * SetupConnection().
04225          */
04226         PQsetClientEncoding(clone->connection, encname);
04227     }
04228 
04229     /* Let the format-specific code have a chance too */
04230     (clone->ClonePtr) (clone);
04231 
04232     Assert(clone->connection != NULL);
04233     return clone;
04234 }
04235 
04236 /*
04237  * Release clone-local storage.
04238  *
04239  * Note: we assume any clone-local connection was already closed.
04240  */
04241 void
04242 DeCloneArchive(ArchiveHandle *AH)
04243 {
04244     /* Clear format-specific state */
04245     (AH->DeClonePtr) (AH);
04246 
04247     /* Clear state allocated by CloneArchive */
04248     if (AH->sqlparse.curCmd)
04249         destroyPQExpBuffer(AH->sqlparse.curCmd);
04250 
04251     /* Clear any connection-local state */
04252     if (AH->currUser)
04253         free(AH->currUser);
04254     if (AH->currSchema)
04255         free(AH->currSchema);
04256     if (AH->currTablespace)
04257         free(AH->currTablespace);
04258     if (AH->savedPassword)
04259         free(AH->savedPassword);
04260 
04261     free(AH);
04262 }