00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 #include "compress_io.h"
00037 #include "pg_backup_utils.h"
00038 #include "parallel.h"
00039
00040 #include <dirent.h>
00041 #include <sys/stat.h>
00042
00043 typedef struct
00044 {
00045
00046
00047
00048
00049 char *directory;
00050
00051 cfp *dataFH;
00052
00053 cfp *blobsTocFH;
00054 ParallelState *pstate;
00055 } lclContext;
00056
/*
 * Private, per-TOC-entry state of the directory format, stored in
 * te->formatData.
 */
typedef struct
{
	/* File name relative to the archive directory; NULL if the entry has no file. */
	char	   *filename;
} lclTocEntry;
00061
00062
00063 static const char *modulename = gettext_noop("directory archiver");
00064
00065
00066 static void _ArchiveEntry(ArchiveHandle *AH, TocEntry *te);
00067 static void _StartData(ArchiveHandle *AH, TocEntry *te);
00068 static void _EndData(ArchiveHandle *AH, TocEntry *te);
00069 static size_t _WriteData(ArchiveHandle *AH, const void *data, size_t dLen);
00070 static int _WriteByte(ArchiveHandle *AH, const int i);
00071 static int _ReadByte(ArchiveHandle *);
00072 static size_t _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len);
00073 static size_t _ReadBuf(ArchiveHandle *AH, void *buf, size_t len);
00074 static void _CloseArchive(ArchiveHandle *AH);
00075 static void _ReopenArchive(ArchiveHandle *AH);
00076 static void _PrintTocData(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt);
00077
00078 static void _WriteExtraToc(ArchiveHandle *AH, TocEntry *te);
00079 static void _ReadExtraToc(ArchiveHandle *AH, TocEntry *te);
00080 static void _PrintExtraToc(ArchiveHandle *AH, TocEntry *te);
00081
00082 static void _StartBlobs(ArchiveHandle *AH, TocEntry *te);
00083 static void _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
00084 static void _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
00085 static void _EndBlobs(ArchiveHandle *AH, TocEntry *te);
00086 static void _LoadBlobs(ArchiveHandle *AH, RestoreOptions *ropt);
00087
00088 static void _Clone(ArchiveHandle *AH);
00089 static void _DeClone(ArchiveHandle *AH);
00090
00091 static char *_MasterStartParallelItem(ArchiveHandle *AH, TocEntry *te, T_Action act);
00092 static int _MasterEndParallelItem(ArchiveHandle *AH, TocEntry *te,
00093 const char *str, T_Action act);
00094 static char *_WorkerJobRestoreDirectory(ArchiveHandle *AH, TocEntry *te);
00095 static char *_WorkerJobDumpDirectory(ArchiveHandle *AH, TocEntry *te);
00096
00097 static void setFilePath(ArchiveHandle *AH, char *buf,
00098 const char *relativeFilename);
00099
00100
00101
00102
00103
00104
00105
00106
00107
00108
00109
00110 void
00111 InitArchiveFmt_Directory(ArchiveHandle *AH)
00112 {
00113 lclContext *ctx;
00114
00115
00116 AH->ArchiveEntryPtr = _ArchiveEntry;
00117 AH->StartDataPtr = _StartData;
00118 AH->WriteDataPtr = _WriteData;
00119 AH->EndDataPtr = _EndData;
00120 AH->WriteBytePtr = _WriteByte;
00121 AH->ReadBytePtr = _ReadByte;
00122 AH->WriteBufPtr = _WriteBuf;
00123 AH->ReadBufPtr = _ReadBuf;
00124 AH->ClosePtr = _CloseArchive;
00125 AH->ReopenPtr = _ReopenArchive;
00126 AH->PrintTocDataPtr = _PrintTocData;
00127 AH->ReadExtraTocPtr = _ReadExtraToc;
00128 AH->WriteExtraTocPtr = _WriteExtraToc;
00129 AH->PrintExtraTocPtr = _PrintExtraToc;
00130
00131 AH->StartBlobsPtr = _StartBlobs;
00132 AH->StartBlobPtr = _StartBlob;
00133 AH->EndBlobPtr = _EndBlob;
00134 AH->EndBlobsPtr = _EndBlobs;
00135
00136 AH->ClonePtr = _Clone;
00137 AH->DeClonePtr = _DeClone;
00138
00139 AH->WorkerJobRestorePtr = _WorkerJobRestoreDirectory;
00140 AH->WorkerJobDumpPtr = _WorkerJobDumpDirectory;
00141
00142 AH->MasterStartParallelItemPtr = _MasterStartParallelItem;
00143 AH->MasterEndParallelItemPtr = _MasterEndParallelItem;
00144
00145
00146 ctx = (lclContext *) pg_malloc0(sizeof(lclContext));
00147 AH->formatData = (void *) ctx;
00148
00149 ctx->dataFH = NULL;
00150 ctx->blobsTocFH = NULL;
00151
00152
00153 AH->lo_buf_size = LOBBUFSIZE;
00154 AH->lo_buf = (void *) pg_malloc(LOBBUFSIZE);
00155
00156
00157
00158
00159
00160 if (!AH->fSpec || strcmp(AH->fSpec, "") == 0)
00161 exit_horribly(modulename, "no output directory specified\n");
00162
00163 ctx->directory = AH->fSpec;
00164
00165 if (AH->mode == archModeWrite)
00166 {
00167 struct stat st;
00168 bool is_empty = false;
00169
00170
00171 if (stat(ctx->directory, &st) == 0 && S_ISDIR(st.st_mode))
00172 {
00173 DIR *dir = opendir(ctx->directory);
00174
00175 if (dir)
00176 {
00177 struct dirent *d;
00178
00179 is_empty = true;
00180 while ((d = readdir(dir)))
00181 {
00182 if (strcmp(d->d_name, ".") != 0 && strcmp(d->d_name, "..") != 0)
00183 {
00184 is_empty = false;
00185 break;
00186 }
00187 }
00188 closedir(dir);
00189 }
00190 }
00191
00192 if (!is_empty && mkdir(ctx->directory, 0700) < 0)
00193 exit_horribly(modulename, "could not create directory \"%s\": %s\n",
00194 ctx->directory, strerror(errno));
00195 }
00196 else
00197 {
00198 char fname[MAXPGPATH];
00199 cfp *tocFH;
00200
00201 setFilePath(AH, fname, "toc.dat");
00202
00203 tocFH = cfopen_read(fname, PG_BINARY_R);
00204 if (tocFH == NULL)
00205 exit_horribly(modulename,
00206 "could not open input file \"%s\": %s\n",
00207 fname, strerror(errno));
00208
00209 ctx->dataFH = tocFH;
00210
00211
00212
00213
00214
00215 AH->format = archTar;
00216 ReadHead(AH);
00217 AH->format = archDirectory;
00218 ReadToc(AH);
00219
00220
00221 if (cfclose(tocFH) != 0)
00222 exit_horribly(modulename, "could not close TOC file: %s\n",
00223 strerror(errno));
00224 ctx->dataFH = NULL;
00225 }
00226 }
00227
00228
00229
00230
00231
00232
00233 static void
00234 _ArchiveEntry(ArchiveHandle *AH, TocEntry *te)
00235 {
00236 lclTocEntry *tctx;
00237 char fn[MAXPGPATH];
00238
00239 tctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
00240 if (te->dataDumper)
00241 {
00242 snprintf(fn, MAXPGPATH, "%d.dat", te->dumpId);
00243 tctx->filename = pg_strdup(fn);
00244 }
00245 else if (strcmp(te->desc, "BLOBS") == 0)
00246 tctx->filename = pg_strdup("blobs.toc");
00247 else
00248 tctx->filename = NULL;
00249
00250 te->formatData = (void *) tctx;
00251 }
00252
00253
00254
00255
00256
00257
00258
00259
00260 static void
00261 _WriteExtraToc(ArchiveHandle *AH, TocEntry *te)
00262 {
00263 lclTocEntry *tctx = (lclTocEntry *) te->formatData;
00264
00265
00266
00267
00268
00269 if (tctx->filename)
00270 WriteStr(AH, tctx->filename);
00271 else
00272 WriteStr(AH, "");
00273 }
00274
00275
00276
00277
00278
00279
00280
00281 static void
00282 _ReadExtraToc(ArchiveHandle *AH, TocEntry *te)
00283 {
00284 lclTocEntry *tctx = (lclTocEntry *) te->formatData;
00285
00286 if (tctx == NULL)
00287 {
00288 tctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
00289 te->formatData = (void *) tctx;
00290 }
00291
00292 tctx->filename = ReadStr(AH);
00293 if (strlen(tctx->filename) == 0)
00294 {
00295 free(tctx->filename);
00296 tctx->filename = NULL;
00297 }
00298 }
00299
00300
00301
00302
00303
00304 static void
00305 _PrintExtraToc(ArchiveHandle *AH, TocEntry *te)
00306 {
00307 lclTocEntry *tctx = (lclTocEntry *) te->formatData;
00308
00309 if (AH->public.verbose && tctx->filename)
00310 ahprintf(AH, "-- File: %s\n", tctx->filename);
00311 }
00312
00313
00314
00315
00316
00317
00318
00319
00320
00321
00322 static void
00323 _StartData(ArchiveHandle *AH, TocEntry *te)
00324 {
00325 lclTocEntry *tctx = (lclTocEntry *) te->formatData;
00326 lclContext *ctx = (lclContext *) AH->formatData;
00327 char fname[MAXPGPATH];
00328
00329 setFilePath(AH, fname, tctx->filename);
00330
00331 ctx->dataFH = cfopen_write(fname, PG_BINARY_W, AH->compression);
00332 if (ctx->dataFH == NULL)
00333 exit_horribly(modulename, "could not open output file \"%s\": %s\n",
00334 fname, strerror(errno));
00335 }
00336
00337
00338
00339
00340
00341
00342
00343
00344
00345
00346 static size_t
00347 _WriteData(ArchiveHandle *AH, const void *data, size_t dLen)
00348 {
00349 lclContext *ctx = (lclContext *) AH->formatData;
00350
00351 if (dLen == 0)
00352 return 0;
00353
00354
00355 checkAborting(AH);
00356
00357 return cfwrite(data, dLen, ctx->dataFH);
00358 }
00359
00360
00361
00362
00363
00364
00365
00366 static void
00367 _EndData(ArchiveHandle *AH, TocEntry *te)
00368 {
00369 lclContext *ctx = (lclContext *) AH->formatData;
00370
00371
00372 cfclose(ctx->dataFH);
00373
00374 ctx->dataFH = NULL;
00375 }
00376
00377
00378
00379
00380 static void
00381 _PrintFileData(ArchiveHandle *AH, char *filename, RestoreOptions *ropt)
00382 {
00383 size_t cnt;
00384 char *buf;
00385 size_t buflen;
00386 cfp *cfp;
00387
00388 if (!filename)
00389 return;
00390
00391 cfp = cfopen_read(filename, PG_BINARY_R);
00392
00393 if (!cfp)
00394 exit_horribly(modulename, "could not open input file \"%s\": %s\n",
00395 filename, strerror(errno));
00396
00397 buf = pg_malloc(ZLIB_OUT_SIZE);
00398 buflen = ZLIB_OUT_SIZE;
00399
00400 while ((cnt = cfread(buf, buflen, cfp)))
00401 ahwrite(buf, 1, cnt, AH);
00402
00403 free(buf);
00404 if (cfclose(cfp) !=0)
00405 exit_horribly(modulename, "could not close data file: %s\n",
00406 strerror(errno));
00407 }
00408
00409
00410
00411
00412 static void
00413 _PrintTocData(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt)
00414 {
00415 lclTocEntry *tctx = (lclTocEntry *) te->formatData;
00416
00417 if (!tctx->filename)
00418 return;
00419
00420 if (strcmp(te->desc, "BLOBS") == 0)
00421 _LoadBlobs(AH, ropt);
00422 else
00423 {
00424 char fname[MAXPGPATH];
00425
00426 setFilePath(AH, fname, tctx->filename);
00427 _PrintFileData(AH, fname, ropt);
00428 }
00429 }
00430
00431 static void
00432 _LoadBlobs(ArchiveHandle *AH, RestoreOptions *ropt)
00433 {
00434 Oid oid;
00435 lclContext *ctx = (lclContext *) AH->formatData;
00436 char fname[MAXPGPATH];
00437 char line[MAXPGPATH];
00438
00439 StartRestoreBlobs(AH);
00440
00441 setFilePath(AH, fname, "blobs.toc");
00442
00443 ctx->blobsTocFH = cfopen_read(fname, PG_BINARY_R);
00444
00445 if (ctx->blobsTocFH == NULL)
00446 exit_horribly(modulename, "could not open large object TOC file \"%s\" for input: %s\n",
00447 fname, strerror(errno));
00448
00449
00450 while ((cfgets(ctx->blobsTocFH, line, MAXPGPATH)) != NULL)
00451 {
00452 char fname[MAXPGPATH];
00453 char path[MAXPGPATH];
00454
00455 if (sscanf(line, "%u %s\n", &oid, fname) != 2)
00456 exit_horribly(modulename, "invalid line in large object TOC file \"%s\": \"%s\"\n",
00457 fname, line);
00458
00459 StartRestoreBlob(AH, oid, ropt->dropSchema);
00460 snprintf(path, MAXPGPATH, "%s/%s", ctx->directory, fname);
00461 _PrintFileData(AH, path, ropt);
00462 EndRestoreBlob(AH, oid);
00463 }
00464 if (!cfeof(ctx->blobsTocFH))
00465 exit_horribly(modulename, "error reading large object TOC file \"%s\"\n",
00466 fname);
00467
00468 if (cfclose(ctx->blobsTocFH) != 0)
00469 exit_horribly(modulename, "could not close large object TOC file \"%s\": %s\n",
00470 fname, strerror(errno));
00471
00472 ctx->blobsTocFH = NULL;
00473
00474 EndRestoreBlobs(AH);
00475 }
00476
00477
00478
00479
00480
00481
00482
00483 static int
00484 _WriteByte(ArchiveHandle *AH, const int i)
00485 {
00486 unsigned char c = (unsigned char) i;
00487 lclContext *ctx = (lclContext *) AH->formatData;
00488
00489 if (cfwrite(&c, 1, ctx->dataFH) != 1)
00490 exit_horribly(modulename, "could not write byte\n");
00491
00492 return 1;
00493 }
00494
00495
00496
00497
00498
00499
00500
00501 static int
00502 _ReadByte(ArchiveHandle *AH)
00503 {
00504 lclContext *ctx = (lclContext *) AH->formatData;
00505 int res;
00506
00507 res = cfgetc(ctx->dataFH);
00508 if (res == EOF)
00509 exit_horribly(modulename, "unexpected end of file\n");
00510
00511 return res;
00512 }
00513
00514
00515
00516
00517
00518 static size_t
00519 _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len)
00520 {
00521 lclContext *ctx = (lclContext *) AH->formatData;
00522 size_t res;
00523
00524
00525 checkAborting(AH);
00526
00527 res = cfwrite(buf, len, ctx->dataFH);
00528 if (res != len)
00529 exit_horribly(modulename, "could not write to output file: %s\n",
00530 strerror(errno));
00531
00532 return res;
00533 }
00534
00535
00536
00537
00538
00539
00540 static size_t
00541 _ReadBuf(ArchiveHandle *AH, void *buf, size_t len)
00542 {
00543 lclContext *ctx = (lclContext *) AH->formatData;
00544 size_t res;
00545
00546 res = cfread(buf, len, ctx->dataFH);
00547
00548 return res;
00549 }
00550
00551
00552
00553
00554
00555
00556
00557
00558
00559
00560
00561
00562
00563 static void
00564 _CloseArchive(ArchiveHandle *AH)
00565 {
00566 lclContext *ctx = (lclContext *) AH->formatData;
00567
00568 if (AH->mode == archModeWrite)
00569 {
00570 cfp *tocFH;
00571 char fname[MAXPGPATH];
00572
00573 setFilePath(AH, fname, "toc.dat");
00574
00575
00576 ctx->pstate = ParallelBackupStart(AH, NULL);
00577
00578
00579 tocFH = cfopen_write(fname, PG_BINARY_W, 0);
00580 if (tocFH == NULL)
00581 exit_horribly(modulename, "could not open output file \"%s\": %s\n",
00582 fname, strerror(errno));
00583 ctx->dataFH = tocFH;
00584
00585
00586
00587
00588
00589
00590 AH->format = archTar;
00591 WriteHead(AH);
00592 AH->format = archDirectory;
00593 WriteToc(AH);
00594 if (cfclose(tocFH) != 0)
00595 exit_horribly(modulename, "could not close TOC file: %s\n",
00596 strerror(errno));
00597 WriteDataChunks(AH, ctx->pstate);
00598
00599 ParallelBackupEnd(AH, ctx->pstate);
00600 }
00601 AH->FH = NULL;
00602 }
00603
00604
00605
00606
00607 static void
00608 _ReopenArchive(ArchiveHandle *AH)
00609 {
00610
00611
00612
00613
00614
00615 }
00616
00617
00618
00619
00620
00621
00622
00623
00624
00625
00626
00627
00628 static void
00629 _StartBlobs(ArchiveHandle *AH, TocEntry *te)
00630 {
00631 lclContext *ctx = (lclContext *) AH->formatData;
00632 char fname[MAXPGPATH];
00633
00634 setFilePath(AH, fname, "blobs.toc");
00635
00636
00637 ctx->blobsTocFH = cfopen_write(fname, "ab", 0);
00638 if (ctx->blobsTocFH == NULL)
00639 exit_horribly(modulename, "could not open output file \"%s\": %s\n",
00640 fname, strerror(errno));
00641 }
00642
00643
00644
00645
00646
00647
00648 static void
00649 _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
00650 {
00651 lclContext *ctx = (lclContext *) AH->formatData;
00652 char fname[MAXPGPATH];
00653
00654 snprintf(fname, MAXPGPATH, "%s/blob_%u.dat", ctx->directory, oid);
00655
00656 ctx->dataFH = cfopen_write(fname, PG_BINARY_W, AH->compression);
00657
00658 if (ctx->dataFH == NULL)
00659 exit_horribly(modulename, "could not open output file \"%s\": %s\n",
00660 fname, strerror(errno));
00661 }
00662
00663
00664
00665
00666
00667
00668 static void
00669 _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
00670 {
00671 lclContext *ctx = (lclContext *) AH->formatData;
00672 char buf[50];
00673 int len;
00674
00675
00676 cfclose(ctx->dataFH);
00677 ctx->dataFH = NULL;
00678
00679
00680 len = snprintf(buf, sizeof(buf), "%u blob_%u.dat\n", oid, oid);
00681 if (cfwrite(buf, len, ctx->blobsTocFH) != len)
00682 exit_horribly(modulename, "could not write to blobs TOC file\n");
00683 }
00684
00685
00686
00687
00688
00689
00690 static void
00691 _EndBlobs(ArchiveHandle *AH, TocEntry *te)
00692 {
00693 lclContext *ctx = (lclContext *) AH->formatData;
00694
00695 cfclose(ctx->blobsTocFH);
00696 ctx->blobsTocFH = NULL;
00697 }
00698
00699
00700
00701
00702
00703
00704
00705 static void
00706 setFilePath(ArchiveHandle *AH, char *buf, const char *relativeFilename)
00707 {
00708 lclContext *ctx = (lclContext *) AH->formatData;
00709 char *dname;
00710
00711 dname = ctx->directory;
00712
00713 if (strlen(dname) + 1 + strlen(relativeFilename) + 1 > MAXPGPATH)
00714 exit_horribly(modulename, "file name too long: \"%s\"\n", dname);
00715
00716 strcpy(buf, dname);
00717 strcat(buf, "/");
00718 strcat(buf, relativeFilename);
00719 }
00720
00721
00722
00723
00724 static void
00725 _Clone(ArchiveHandle *AH)
00726 {
00727 lclContext *ctx = (lclContext *) AH->formatData;
00728
00729 AH->formatData = (lclContext *) pg_malloc(sizeof(lclContext));
00730 memcpy(AH->formatData, ctx, sizeof(lclContext));
00731 ctx = (lclContext *) AH->formatData;
00732
00733
00734
00735
00736
00737
00738
00739
00740
00741
00742
00743
00744 }
00745
00746 static void
00747 _DeClone(ArchiveHandle *AH)
00748 {
00749 lclContext *ctx = (lclContext *) AH->formatData;
00750
00751 free(ctx);
00752 }
00753
00754
00755
00756
00757
00758
00759 static char *
00760 _MasterStartParallelItem(ArchiveHandle *AH, TocEntry *te, T_Action act)
00761 {
00762
00763
00764
00765
00766 static char buf[64];
00767
00768 if (act == ACT_DUMP)
00769 snprintf(buf, sizeof(buf), "DUMP %d", te->dumpId);
00770 else if (act == ACT_RESTORE)
00771 snprintf(buf, sizeof(buf), "RESTORE %d", te->dumpId);
00772
00773 return buf;
00774 }
00775
00776
00777
00778
00779
00780
00781
00782
00783
00784
00785
00786
00787 static char *
00788 _WorkerJobDumpDirectory(ArchiveHandle *AH, TocEntry *te)
00789 {
00790
00791
00792
00793
00794 const int buflen = 64;
00795 char *buf = (char *) pg_malloc(buflen);
00796 lclTocEntry *tctx = (lclTocEntry *) te->formatData;
00797
00798
00799 if (!tctx)
00800 exit_horribly(modulename, "Error during backup\n");
00801
00802
00803
00804
00805
00806
00807 WriteDataChunksForTocEntry(AH, te);
00808
00809 snprintf(buf, buflen, "OK DUMP %d", te->dumpId);
00810
00811 return buf;
00812 }
00813
00814
00815
00816
00817
00818 static char *
00819 _WorkerJobRestoreDirectory(ArchiveHandle *AH, TocEntry *te)
00820 {
00821
00822
00823
00824
00825 const int buflen = 64;
00826 char *buf = (char *) pg_malloc(buflen);
00827 ParallelArgs pargs;
00828 int status;
00829
00830 pargs.AH = AH;
00831 pargs.te = te;
00832
00833 status = parallel_restore(&pargs);
00834
00835 snprintf(buf, buflen, "OK RESTORE %d %d %d", te->dumpId, status,
00836 status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0);
00837
00838 return buf;
00839 }
00840
00841
00842
00843
00844
00845
00846 static int
00847 _MasterEndParallelItem(ArchiveHandle *AH, TocEntry *te, const char *str, T_Action act)
00848 {
00849 DumpId dumpId;
00850 int nBytes,
00851 n_errors;
00852 int status = 0;
00853
00854 if (act == ACT_DUMP)
00855 {
00856 sscanf(str, "%u%n", &dumpId, &nBytes);
00857
00858 Assert(dumpId == te->dumpId);
00859 Assert(nBytes == strlen(str));
00860 }
00861 else if (act == ACT_RESTORE)
00862 {
00863 sscanf(str, "%u %u %u%n", &dumpId, &status, &n_errors, &nBytes);
00864
00865 Assert(dumpId == te->dumpId);
00866 Assert(nBytes == strlen(str));
00867
00868 AH->public.n_errors += n_errors;
00869 }
00870
00871 return status;
00872 }