Header And Logo

PostgreSQL
| The world's most advanced open source database.

compress_io.c

Go to the documentation of this file.
00001 /*-------------------------------------------------------------------------
00002  *
00003  * compress_io.c
00004  *   Routines for archivers to write an uncompressed or compressed data
00005  *   stream.
00006  *
00007  * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
00008  * Portions Copyright (c) 1994, Regents of the University of California
00009  *
00010  * This file includes two APIs for dealing with compressed data. The first
00011  * provides more flexibility, using callbacks to read/write data from the
00012  * underlying stream. The second API is a wrapper around fopen/gzopen and
00013  * friends, providing an interface similar to those, but abstracts away
00014  * the possible compression. Both APIs use libz for the compression, but
00015  * the second API uses gzip headers, so the resulting files can be easily
00016  * manipulated with the gzip utility.
00017  *
00018  * Compressor API
00019  * --------------
00020  *
00021  *  The interface for writing to an archive consists of three functions:
00022  *  AllocateCompressor, WriteDataToArchive and EndCompressor. First you call
00023  *  AllocateCompressor, then write all the data by calling WriteDataToArchive
00024  *  as many times as needed, and finally EndCompressor. WriteDataToArchive
00025  *  and EndCompressor will call the WriteFunc that was provided to
00026  *  AllocateCompressor for each chunk of compressed data.
00027  *
00028  *  The interface for reading an archive consists of just one function:
00029  *  ReadDataFromArchive. ReadDataFromArchive reads the whole compressed input
00030  *  stream, by repeatedly calling the given ReadFunc. ReadFunc returns the
00031  *  compressed data one chunk at a time, and ReadDataFromArchive decompresses it
00032  *  and passes the decompressed data to ahwrite(), until ReadFunc returns 0
00033  *  to signal EOF.
00034  *
00035  *  The interface is the same for compressed and uncompressed streams.
00036  *
00037  * Compressed stream API
00038  * ----------------------
00039  *
00040  *  The compressed stream API is a wrapper around the C standard fopen() and
00041  *  libz's gzopen() APIs. It allows you to use the same functions for
00042  *  compressed and uncompressed streams. cfopen_read() first tries to open
00043  *  the file with given name, and if it fails, it tries to open the same
00044  *  file with the .gz suffix. cfopen_write() opens a file for writing, an
00045  *  extra argument specifies if the file should be compressed, and adds the
00046  *  .gz suffix to the filename if so. This allows you to easily handle both
00047  *  compressed and uncompressed files.
00048  *
00049  * IDENTIFICATION
00050  *     src/bin/pg_dump/compress_io.c
00051  *
00052  *-------------------------------------------------------------------------
00053  */
00054 
00055 #include "compress_io.h"
00056 #include "pg_backup_utils.h"
00057 #include "parallel.h"
00058 
00059 /*----------------------
00060  * Compressor API
00061  *----------------------
00062  */
00063 
00064 /* typedef appears in compress_io.h */
00065 struct CompressorState
00066 {
00067     CompressionAlgorithm comprAlg;
00068     WriteFunc   writeF;
00069 
00070 #ifdef HAVE_LIBZ
00071     z_streamp   zp;
00072     char       *zlibOut;
00073     size_t      zlibOutSize;
00074 #endif
00075 };
00076 
00077 /* translator: this is a module name */
00078 static const char *modulename = gettext_noop("compress_io");
00079 
00080 static void ParseCompressionOption(int compression, CompressionAlgorithm *alg,
00081                        int *level);
00082 
00083 /* Routines that support zlib compressed data I/O */
00084 #ifdef HAVE_LIBZ
00085 static void InitCompressorZlib(CompressorState *cs, int level);
00086 static void DeflateCompressorZlib(ArchiveHandle *AH, CompressorState *cs,
00087                       bool flush);
00088 static void ReadDataFromArchiveZlib(ArchiveHandle *AH, ReadFunc readF);
00089 static size_t WriteDataToArchiveZlib(ArchiveHandle *AH, CompressorState *cs,
00090                        const char *data, size_t dLen);
00091 static void EndCompressorZlib(ArchiveHandle *AH, CompressorState *cs);
00092 #endif
00093 
00094 /* Routines that support uncompressed data I/O */
00095 static void ReadDataFromArchiveNone(ArchiveHandle *AH, ReadFunc readF);
00096 static size_t WriteDataToArchiveNone(ArchiveHandle *AH, CompressorState *cs,
00097                        const char *data, size_t dLen);
00098 
00099 /*
00100  * Interprets a numeric 'compression' value. The algorithm implied by the
00101  * value (zlib or none at the moment), is returned in *alg, and the
00102  * zlib compression level in *level.
00103  */
00104 static void
00105 ParseCompressionOption(int compression, CompressionAlgorithm *alg, int *level)
00106 {
00107     if (compression == Z_DEFAULT_COMPRESSION ||
00108         (compression > 0 && compression <= 9))
00109         *alg = COMPR_ALG_LIBZ;
00110     else if (compression == 0)
00111         *alg = COMPR_ALG_NONE;
00112     else
00113     {
00114         exit_horribly(modulename, "invalid compression code: %d\n",
00115                       compression);
00116         *alg = COMPR_ALG_NONE;  /* keep compiler quiet */
00117     }
00118 
00119     /* The level is just the passed-in value. */
00120     if (level)
00121         *level = compression;
00122 }
00123 
00124 /* Public interface routines */
00125 
00126 /* Allocate a new compressor */
00127 CompressorState *
00128 AllocateCompressor(int compression, WriteFunc writeF)
00129 {
00130     CompressorState *cs;
00131     CompressionAlgorithm alg;
00132     int         level;
00133 
00134     ParseCompressionOption(compression, &alg, &level);
00135 
00136 #ifndef HAVE_LIBZ
00137     if (alg == COMPR_ALG_LIBZ)
00138         exit_horribly(modulename, "not built with zlib support\n");
00139 #endif
00140 
00141     cs = (CompressorState *) pg_malloc0(sizeof(CompressorState));
00142     cs->writeF = writeF;
00143     cs->comprAlg = alg;
00144 
00145     /*
00146      * Perform compression algorithm specific initialization.
00147      */
00148 #ifdef HAVE_LIBZ
00149     if (alg == COMPR_ALG_LIBZ)
00150         InitCompressorZlib(cs, level);
00151 #endif
00152 
00153     return cs;
00154 }
00155 
00156 /*
00157  * Read all compressed data from the input stream (via readF) and print it
00158  * out with ahwrite().
00159  */
00160 void
00161 ReadDataFromArchive(ArchiveHandle *AH, int compression, ReadFunc readF)
00162 {
00163     CompressionAlgorithm alg;
00164 
00165     ParseCompressionOption(compression, &alg, NULL);
00166 
00167     if (alg == COMPR_ALG_NONE)
00168         ReadDataFromArchiveNone(AH, readF);
00169     if (alg == COMPR_ALG_LIBZ)
00170     {
00171 #ifdef HAVE_LIBZ
00172         ReadDataFromArchiveZlib(AH, readF);
00173 #else
00174         exit_horribly(modulename, "not built with zlib support\n");
00175 #endif
00176     }
00177 }
00178 
00179 /*
00180  * Compress and write data to the output stream (via writeF).
00181  */
00182 size_t
00183 WriteDataToArchive(ArchiveHandle *AH, CompressorState *cs,
00184                    const void *data, size_t dLen)
00185 {
00186     /* Are we aborting? */
00187     checkAborting(AH);
00188 
00189     switch (cs->comprAlg)
00190     {
00191         case COMPR_ALG_LIBZ:
00192 #ifdef HAVE_LIBZ
00193             return WriteDataToArchiveZlib(AH, cs, data, dLen);
00194 #else
00195             exit_horribly(modulename, "not built with zlib support\n");
00196 #endif
00197         case COMPR_ALG_NONE:
00198             return WriteDataToArchiveNone(AH, cs, data, dLen);
00199     }
00200     return 0;                   /* keep compiler quiet */
00201 }
00202 
00203 /*
00204  * Terminate compression library context and flush its buffers.
00205  */
00206 void
00207 EndCompressor(ArchiveHandle *AH, CompressorState *cs)
00208 {
00209 #ifdef HAVE_LIBZ
00210     if (cs->comprAlg == COMPR_ALG_LIBZ)
00211         EndCompressorZlib(AH, cs);
00212 #endif
00213     free(cs);
00214 }
00215 
00216 /* Private routines, specific to each compression method. */
00217 
00218 #ifdef HAVE_LIBZ
00219 /*
00220  * Functions for zlib compressed output.
00221  */
00222 
00223 static void
00224 InitCompressorZlib(CompressorState *cs, int level)
00225 {
00226     z_streamp   zp;
00227 
00228     zp = cs->zp = (z_streamp) pg_malloc(sizeof(z_stream));
00229     zp->zalloc = Z_NULL;
00230     zp->zfree = Z_NULL;
00231     zp->opaque = Z_NULL;
00232 
00233     /*
00234      * zlibOutSize is the buffer size we tell zlib it can output to.  We
00235      * actually allocate one extra byte because some routines want to append a
00236      * trailing zero byte to the zlib output.
00237      */
00238     cs->zlibOut = (char *) pg_malloc(ZLIB_OUT_SIZE + 1);
00239     cs->zlibOutSize = ZLIB_OUT_SIZE;
00240 
00241     if (deflateInit(zp, level) != Z_OK)
00242         exit_horribly(modulename,
00243                       "could not initialize compression library: %s\n",
00244                       zp->msg);
00245 
00246     /* Just be paranoid - maybe End is called after Start, with no Write */
00247     zp->next_out = (void *) cs->zlibOut;
00248     zp->avail_out = cs->zlibOutSize;
00249 }
00250 
00251 static void
00252 EndCompressorZlib(ArchiveHandle *AH, CompressorState *cs)
00253 {
00254     z_streamp   zp = cs->zp;
00255 
00256     zp->next_in = NULL;
00257     zp->avail_in = 0;
00258 
00259     /* Flush any remaining data from zlib buffer */
00260     DeflateCompressorZlib(AH, cs, true);
00261 
00262     if (deflateEnd(zp) != Z_OK)
00263         exit_horribly(modulename,
00264                       "could not close compression stream: %s\n", zp->msg);
00265 
00266     free(cs->zlibOut);
00267     free(cs->zp);
00268 }
00269 
00270 static void
00271 DeflateCompressorZlib(ArchiveHandle *AH, CompressorState *cs, bool flush)
00272 {
00273     z_streamp   zp = cs->zp;
00274     char       *out = cs->zlibOut;
00275     int         res = Z_OK;
00276 
00277     while (cs->zp->avail_in != 0 || flush)
00278     {
00279         res = deflate(zp, flush ? Z_FINISH : Z_NO_FLUSH);
00280         if (res == Z_STREAM_ERROR)
00281             exit_horribly(modulename,
00282                           "could not compress data: %s\n", zp->msg);
00283         if ((flush && (zp->avail_out < cs->zlibOutSize))
00284             || (zp->avail_out == 0)
00285             || (zp->avail_in != 0)
00286             )
00287         {
00288             /*
00289              * Extra paranoia: avoid zero-length chunks, since a zero length
00290              * chunk is the EOF marker in the custom format. This should never
00291              * happen but...
00292              */
00293             if (zp->avail_out < cs->zlibOutSize)
00294             {
00295                 /*
00296                  * Any write function shoud do its own error checking but to
00297                  * make sure we do a check here as well...
00298                  */
00299                 size_t      len = cs->zlibOutSize - zp->avail_out;
00300 
00301                 if (cs->writeF(AH, out, len) != len)
00302                     exit_horribly(modulename,
00303                                   "could not write to output file: %s\n",
00304                                   strerror(errno));
00305             }
00306             zp->next_out = (void *) out;
00307             zp->avail_out = cs->zlibOutSize;
00308         }
00309 
00310         if (res == Z_STREAM_END)
00311             break;
00312     }
00313 }
00314 
00315 static size_t
00316 WriteDataToArchiveZlib(ArchiveHandle *AH, CompressorState *cs,
00317                        const char *data, size_t dLen)
00318 {
00319     cs->zp->next_in = (void *) data;
00320     cs->zp->avail_in = dLen;
00321     DeflateCompressorZlib(AH, cs, false);
00322 
00323     /*
00324      * we have either succeeded in writing dLen bytes or we have called
00325      * exit_horribly()
00326      */
00327     return dLen;
00328 }
00329 
00330 static void
00331 ReadDataFromArchiveZlib(ArchiveHandle *AH, ReadFunc readF)
00332 {
00333     z_streamp   zp;
00334     char       *out;
00335     int         res = Z_OK;
00336     size_t      cnt;
00337     char       *buf;
00338     size_t      buflen;
00339 
00340     zp = (z_streamp) pg_malloc(sizeof(z_stream));
00341     zp->zalloc = Z_NULL;
00342     zp->zfree = Z_NULL;
00343     zp->opaque = Z_NULL;
00344 
00345     buf = pg_malloc(ZLIB_IN_SIZE);
00346     buflen = ZLIB_IN_SIZE;
00347 
00348     out = pg_malloc(ZLIB_OUT_SIZE + 1);
00349 
00350     if (inflateInit(zp) != Z_OK)
00351         exit_horribly(modulename,
00352                       "could not initialize compression library: %s\n",
00353                       zp->msg);
00354 
00355     /* no minimal chunk size for zlib */
00356     while ((cnt = readF(AH, &buf, &buflen)))
00357     {
00358         /* Are we aborting? */
00359         checkAborting(AH);
00360 
00361         zp->next_in = (void *) buf;
00362         zp->avail_in = cnt;
00363 
00364         while (zp->avail_in > 0)
00365         {
00366             zp->next_out = (void *) out;
00367             zp->avail_out = ZLIB_OUT_SIZE;
00368 
00369             res = inflate(zp, 0);
00370             if (res != Z_OK && res != Z_STREAM_END)
00371                 exit_horribly(modulename,
00372                               "could not uncompress data: %s\n", zp->msg);
00373 
00374             out[ZLIB_OUT_SIZE - zp->avail_out] = '\0';
00375             ahwrite(out, 1, ZLIB_OUT_SIZE - zp->avail_out, AH);
00376         }
00377     }
00378 
00379     zp->next_in = NULL;
00380     zp->avail_in = 0;
00381     while (res != Z_STREAM_END)
00382     {
00383         zp->next_out = (void *) out;
00384         zp->avail_out = ZLIB_OUT_SIZE;
00385         res = inflate(zp, 0);
00386         if (res != Z_OK && res != Z_STREAM_END)
00387             exit_horribly(modulename,
00388                           "could not uncompress data: %s\n", zp->msg);
00389 
00390         out[ZLIB_OUT_SIZE - zp->avail_out] = '\0';
00391         ahwrite(out, 1, ZLIB_OUT_SIZE - zp->avail_out, AH);
00392     }
00393 
00394     if (inflateEnd(zp) != Z_OK)
00395         exit_horribly(modulename,
00396                       "could not close compression library: %s\n", zp->msg);
00397 
00398     free(buf);
00399     free(out);
00400     free(zp);
00401 }
00402 #endif   /* HAVE_LIBZ */
00403 
00404 
00405 /*
00406  * Functions for uncompressed output.
00407  */
00408 
00409 static void
00410 ReadDataFromArchiveNone(ArchiveHandle *AH, ReadFunc readF)
00411 {
00412     size_t      cnt;
00413     char       *buf;
00414     size_t      buflen;
00415 
00416     buf = pg_malloc(ZLIB_OUT_SIZE);
00417     buflen = ZLIB_OUT_SIZE;
00418 
00419     while ((cnt = readF(AH, &buf, &buflen)))
00420     {
00421         /* Are we aborting? */
00422         checkAborting(AH);
00423 
00424         ahwrite(buf, 1, cnt, AH);
00425     }
00426 
00427     free(buf);
00428 }
00429 
00430 static size_t
00431 WriteDataToArchiveNone(ArchiveHandle *AH, CompressorState *cs,
00432                        const char *data, size_t dLen)
00433 {
00434     /*
00435      * Any write function should do its own error checking but to make sure we
00436      * do a check here as well...
00437      */
00438     if (cs->writeF(AH, data, dLen) != dLen)
00439         exit_horribly(modulename,
00440                       "could not write to output file: %s\n",
00441                       strerror(errno));
00442     return dLen;
00443 }
00444 
00445 
00446 /*----------------------
00447  * Compressed stream API
00448  *----------------------
00449  */
00450 
/*
 * An open stream of the compressed stream API.  It wraps either a plain
 * FILE pointer or, for gzip-compressed files, a gzFile; cfopen() sets the
 * one that is not in use to NULL.  Callers treat this as opaque.
 */
struct cfp
{
    FILE       *uncompressedfp;     /* set when opened with fopen() */
#ifdef HAVE_LIBZ
    gzFile      compressedfp;       /* set when opened with gzopen() */
#endif
};
00462 
00463 #ifdef HAVE_LIBZ
00464 static int  hasSuffix(const char *filename, const char *suffix);
00465 #endif
00466 
00467 /*
00468  * Open a file for reading. 'path' is the file to open, and 'mode' should
00469  * be either "r" or "rb".
00470  *
00471  * If the file at 'path' does not exist, we append the ".gz" suffix (if 'path'
00472  * doesn't already have it) and try again. So if you pass "foo" as 'path',
00473  * this will open either "foo" or "foo.gz".
00474  */
00475 cfp *
00476 cfopen_read(const char *path, const char *mode)
00477 {
00478     cfp        *fp;
00479 
00480 #ifdef HAVE_LIBZ
00481     if (hasSuffix(path, ".gz"))
00482         fp = cfopen(path, mode, 1);
00483     else
00484 #endif
00485     {
00486         fp = cfopen(path, mode, 0);
00487 #ifdef HAVE_LIBZ
00488         if (fp == NULL)
00489         {
00490             int         fnamelen = strlen(path) + 4;
00491             char       *fname = pg_malloc(fnamelen);
00492 
00493             snprintf(fname, fnamelen, "%s%s", path, ".gz");
00494             fp = cfopen(fname, mode, 1);
00495             free(fname);
00496         }
00497 #endif
00498     }
00499     return fp;
00500 }
00501 
00502 /*
00503  * Open a file for writing. 'path' indicates the path name, and 'mode' must
00504  * be a filemode as accepted by fopen() and gzopen() that indicates writing
00505  * ("w", "wb", "a", or "ab").
00506  *
00507  * If 'compression' is non-zero, a gzip compressed stream is opened, and
00508  * and 'compression' indicates the compression level used. The ".gz" suffix
00509  * is automatically added to 'path' in that case.
00510  */
00511 cfp *
00512 cfopen_write(const char *path, const char *mode, int compression)
00513 {
00514     cfp        *fp;
00515 
00516     if (compression == 0)
00517         fp = cfopen(path, mode, 0);
00518     else
00519     {
00520 #ifdef HAVE_LIBZ
00521         int         fnamelen = strlen(path) + 4;
00522         char       *fname = pg_malloc(fnamelen);
00523 
00524         snprintf(fname, fnamelen, "%s%s", path, ".gz");
00525         fp = cfopen(fname, mode, 1);
00526         free(fname);
00527 #else
00528         exit_horribly(modulename, "not built with zlib support\n");
00529         fp = NULL;              /* keep compiler quiet */
00530 #endif
00531     }
00532     return fp;
00533 }
00534 
00535 /*
00536  * Opens file 'path' in 'mode'. If 'compression' is non-zero, the file
00537  * is opened with libz gzopen(), otherwise with plain fopen()
00538  */
00539 cfp *
00540 cfopen(const char *path, const char *mode, int compression)
00541 {
00542     cfp        *fp = pg_malloc(sizeof(cfp));
00543 
00544     if (compression != 0)
00545     {
00546 #ifdef HAVE_LIBZ
00547         fp->compressedfp = gzopen(path, mode);
00548         fp->uncompressedfp = NULL;
00549         if (fp->compressedfp == NULL)
00550         {
00551             free(fp);
00552             fp = NULL;
00553         }
00554 #else
00555         exit_horribly(modulename, "not built with zlib support\n");
00556 #endif
00557     }
00558     else
00559     {
00560 #ifdef HAVE_LIBZ
00561         fp->compressedfp = NULL;
00562 #endif
00563         fp->uncompressedfp = fopen(path, mode);
00564         if (fp->uncompressedfp == NULL)
00565         {
00566             free(fp);
00567             fp = NULL;
00568         }
00569     }
00570 
00571     return fp;
00572 }
00573 
00574 
00575 int
00576 cfread(void *ptr, int size, cfp *fp)
00577 {
00578 #ifdef HAVE_LIBZ
00579     if (fp->compressedfp)
00580         return gzread(fp->compressedfp, ptr, size);
00581     else
00582 #endif
00583         return fread(ptr, 1, size, fp->uncompressedfp);
00584 }
00585 
00586 int
00587 cfwrite(const void *ptr, int size, cfp *fp)
00588 {
00589 #ifdef HAVE_LIBZ
00590     if (fp->compressedfp)
00591         return gzwrite(fp->compressedfp, ptr, size);
00592     else
00593 #endif
00594         return fwrite(ptr, 1, size, fp->uncompressedfp);
00595 }
00596 
00597 int
00598 cfgetc(cfp *fp)
00599 {
00600 #ifdef HAVE_LIBZ
00601     if (fp->compressedfp)
00602         return gzgetc(fp->compressedfp);
00603     else
00604 #endif
00605         return fgetc(fp->uncompressedfp);
00606 }
00607 
00608 char *
00609 cfgets(cfp *fp, char *buf, int len)
00610 {
00611 #ifdef HAVE_LIBZ
00612     if (fp->compressedfp)
00613         return gzgets(fp->compressedfp, buf, len);
00614     else
00615 #endif
00616         return fgets(buf, len, fp->uncompressedfp);
00617 }
00618 
00619 int
00620 cfclose(cfp *fp)
00621 {
00622     int         result;
00623 
00624     if (fp == NULL)
00625     {
00626         errno = EBADF;
00627         return EOF;
00628     }
00629 #ifdef HAVE_LIBZ
00630     if (fp->compressedfp)
00631     {
00632         result = gzclose(fp->compressedfp);
00633         fp->compressedfp = NULL;
00634     }
00635     else
00636 #endif
00637     {
00638         result = fclose(fp->uncompressedfp);
00639         fp->uncompressedfp = NULL;
00640     }
00641     free(fp);
00642 
00643     return result;
00644 }
00645 
00646 int
00647 cfeof(cfp *fp)
00648 {
00649 #ifdef HAVE_LIBZ
00650     if (fp->compressedfp)
00651         return gzeof(fp->compressedfp);
00652     else
00653 #endif
00654         return feof(fp->uncompressedfp);
00655 }
00656 
00657 #ifdef HAVE_LIBZ
/*
 * Return 1 if 'filename' ends with 'suffix', 0 otherwise.  The comparison
 * is byte-wise and case-sensitive; an empty suffix always matches.
 *
 * Lengths are kept as size_t so that strlen() results are never narrowed
 * into an int before being used as an offset.
 */
static int
hasSuffix(const char *filename, const char *suffix)
{
    size_t      filenamelen = strlen(filename);
    size_t      suffixlen = strlen(suffix);

    /* A name shorter than the suffix cannot end with it. */
    if (filenamelen < suffixlen)
        return 0;

    return memcmp(filename + (filenamelen - suffixlen),
                  suffix,
                  suffixlen) == 0;
}
00671 
00672 #endif