Header And Logo

PostgreSQL
| The world's most advanced open source database.

pg_backup_db.c

Go to the documentation of this file.
00001 /*-------------------------------------------------------------------------
00002  *
00003  * pg_backup_db.c
00004  *
00005  *  Implements the basic DB functions used by the archiver.
00006  *
00007  * IDENTIFICATION
00008  *    src/bin/pg_dump/pg_backup_db.c
00009  *
00010  *-------------------------------------------------------------------------
00011  */
00012 
00013 #include "pg_backup_db.h"
00014 #include "pg_backup_utils.h"
00015 #include "dumputils.h"
00016 #include "parallel.h"
00017 
00018 #include <unistd.h>
00019 #include <ctype.h>
00020 #ifdef HAVE_TERMIOS_H
00021 #include <termios.h>
00022 #endif
00023 
00024 
00025 #define DB_MAX_ERR_STMT 128
00026 
00027 /* translator: this is a module name */
00028 static const char *modulename = gettext_noop("archiver (db)");
00029 
00030 static void _check_database_version(ArchiveHandle *AH);
00031 static PGconn *_connectDB(ArchiveHandle *AH, const char *newdbname, const char *newUser);
00032 static void notice_processor(void *arg, const char *message);
00033 
00034 static void
00035 _check_database_version(ArchiveHandle *AH)
00036 {
00037     const char *remoteversion_str;
00038     int         remoteversion;
00039 
00040     remoteversion_str = PQparameterStatus(AH->connection, "server_version");
00041     remoteversion = PQserverVersion(AH->connection);
00042     if (remoteversion == 0 || !remoteversion_str)
00043         exit_horribly(modulename, "could not get server_version from libpq\n");
00044 
00045     AH->public.remoteVersionStr = pg_strdup(remoteversion_str);
00046     AH->public.remoteVersion = remoteversion;
00047     if (!AH->archiveRemoteVersion)
00048         AH->archiveRemoteVersion = AH->public.remoteVersionStr;
00049 
00050     if (remoteversion != PG_VERSION_NUM
00051         && (remoteversion < AH->public.minRemoteVersion ||
00052             remoteversion > AH->public.maxRemoteVersion))
00053     {
00054         write_msg(NULL, "server version: %s; %s version: %s\n",
00055                   remoteversion_str, progname, PG_VERSION);
00056         exit_horribly(NULL, "aborting because of server version mismatch\n");
00057     }
00058 }
00059 
00060 /*
00061  * Reconnect to the server.  If dbname is not NULL, use that database,
00062  * else the one associated with the archive handle.  If username is
00063  * not NULL, use that user name, else the one from the handle.  If
00064  * both the database and the user match the existing connection already,
00065  * nothing will be done.
00066  *
00067  * Returns 1 in any case.
00068  */
00069 int
00070 ReconnectToServer(ArchiveHandle *AH, const char *dbname, const char *username)
00071 {
00072     PGconn     *newConn;
00073     const char *newdbname;
00074     const char *newusername;
00075 
00076     if (!dbname)
00077         newdbname = PQdb(AH->connection);
00078     else
00079         newdbname = dbname;
00080 
00081     if (!username)
00082         newusername = PQuser(AH->connection);
00083     else
00084         newusername = username;
00085 
00086     /* Let's see if the request is already satisfied */
00087     if (strcmp(newdbname, PQdb(AH->connection)) == 0 &&
00088         strcmp(newusername, PQuser(AH->connection)) == 0)
00089         return 1;
00090 
00091     newConn = _connectDB(AH, newdbname, newusername);
00092 
00093     PQfinish(AH->connection);
00094     AH->connection = newConn;
00095 
00096     return 1;
00097 }
00098 
00099 /*
00100  * Connect to the db again.
00101  *
00102  * Note: it's not really all that sensible to use a single-entry password
00103  * cache if the username keeps changing.  In current usage, however, the
00104  * username never does change, so one savedPassword is sufficient.  We do
00105  * update the cache on the off chance that the password has changed since the
00106  * start of the run.
00107  */
00108 static PGconn *
00109 _connectDB(ArchiveHandle *AH, const char *reqdb, const char *requser)
00110 {
00111     PGconn     *newConn;
00112     const char *newdb;
00113     const char *newuser;
00114     char       *password = AH->savedPassword;
00115     bool        new_pass;
00116 
00117     if (!reqdb)
00118         newdb = PQdb(AH->connection);
00119     else
00120         newdb = reqdb;
00121 
00122     if (!requser || strlen(requser) == 0)
00123         newuser = PQuser(AH->connection);
00124     else
00125         newuser = requser;
00126 
00127     ahlog(AH, 1, "connecting to database \"%s\" as user \"%s\"\n",
00128           newdb, newuser);
00129 
00130     if (AH->promptPassword == TRI_YES && password == NULL)
00131     {
00132         password = simple_prompt("Password: ", 100, false);
00133         if (password == NULL)
00134             exit_horribly(modulename, "out of memory\n");
00135     }
00136 
00137     do
00138     {
00139 #define PARAMS_ARRAY_SIZE   7
00140         const char **keywords = pg_malloc(PARAMS_ARRAY_SIZE * sizeof(*keywords));
00141         const char **values = pg_malloc(PARAMS_ARRAY_SIZE * sizeof(*values));
00142 
00143         keywords[0] = "host";
00144         values[0] = PQhost(AH->connection);
00145         keywords[1] = "port";
00146         values[1] = PQport(AH->connection);
00147         keywords[2] = "user";
00148         values[2] = newuser;
00149         keywords[3] = "password";
00150         values[3] = password;
00151         keywords[4] = "dbname";
00152         values[4] = newdb;
00153         keywords[5] = "fallback_application_name";
00154         values[5] = progname;
00155         keywords[6] = NULL;
00156         values[6] = NULL;
00157 
00158         new_pass = false;
00159         newConn = PQconnectdbParams(keywords, values, true);
00160 
00161         free(keywords);
00162         free(values);
00163 
00164         if (!newConn)
00165             exit_horribly(modulename, "failed to reconnect to database\n");
00166 
00167         if (PQstatus(newConn) == CONNECTION_BAD)
00168         {
00169             if (!PQconnectionNeedsPassword(newConn))
00170                 exit_horribly(modulename, "could not reconnect to database: %s",
00171                               PQerrorMessage(newConn));
00172             PQfinish(newConn);
00173 
00174             if (password)
00175                 fprintf(stderr, "Password incorrect\n");
00176 
00177             fprintf(stderr, "Connecting to %s as %s\n",
00178                     newdb, newuser);
00179 
00180             if (password)
00181                 free(password);
00182 
00183             if (AH->promptPassword != TRI_NO)
00184                 password = simple_prompt("Password: ", 100, false);
00185             else
00186                 exit_horribly(modulename, "connection needs password\n");
00187 
00188             if (password == NULL)
00189                 exit_horribly(modulename, "out of memory\n");
00190             new_pass = true;
00191         }
00192     } while (new_pass);
00193 
00194     AH->savedPassword = password;
00195 
00196     /* check for version mismatch */
00197     _check_database_version(AH);
00198 
00199     PQsetNoticeProcessor(newConn, notice_processor, NULL);
00200 
00201     return newConn;
00202 }
00203 
00204 
00205 /*
00206  * Make a database connection with the given parameters.  The
00207  * connection handle is returned, the parameters are stored in AHX.
00208  * An interactive password prompt is automatically issued if required.
00209  *
00210  * Note: it's not really all that sensible to use a single-entry password
00211  * cache if the username keeps changing.  In current usage, however, the
00212  * username never does change, so one savedPassword is sufficient.
00213  */
00214 void
00215 ConnectDatabase(Archive *AHX,
00216                 const char *dbname,
00217                 const char *pghost,
00218                 const char *pgport,
00219                 const char *username,
00220                 enum trivalue prompt_password)
00221 {
00222     ArchiveHandle *AH = (ArchiveHandle *) AHX;
00223     char       *password = AH->savedPassword;
00224     bool        new_pass;
00225 
00226     if (AH->connection)
00227         exit_horribly(modulename, "already connected to a database\n");
00228 
00229     if (prompt_password == TRI_YES && password == NULL)
00230     {
00231         password = simple_prompt("Password: ", 100, false);
00232         if (password == NULL)
00233             exit_horribly(modulename, "out of memory\n");
00234     }
00235     AH->promptPassword = prompt_password;
00236 
00237     /*
00238      * Start the connection.  Loop until we have a password if requested by
00239      * backend.
00240      */
00241     do
00242     {
00243 #define PARAMS_ARRAY_SIZE   7
00244         const char **keywords = pg_malloc(PARAMS_ARRAY_SIZE * sizeof(*keywords));
00245         const char **values = pg_malloc(PARAMS_ARRAY_SIZE * sizeof(*values));
00246 
00247         keywords[0] = "host";
00248         values[0] = pghost;
00249         keywords[1] = "port";
00250         values[1] = pgport;
00251         keywords[2] = "user";
00252         values[2] = username;
00253         keywords[3] = "password";
00254         values[3] = password;
00255         keywords[4] = "dbname";
00256         values[4] = dbname;
00257         keywords[5] = "fallback_application_name";
00258         values[5] = progname;
00259         keywords[6] = NULL;
00260         values[6] = NULL;
00261 
00262         new_pass = false;
00263         AH->connection = PQconnectdbParams(keywords, values, true);
00264 
00265         free(keywords);
00266         free(values);
00267 
00268         if (!AH->connection)
00269             exit_horribly(modulename, "failed to connect to database\n");
00270 
00271         if (PQstatus(AH->connection) == CONNECTION_BAD &&
00272             PQconnectionNeedsPassword(AH->connection) &&
00273             password == NULL &&
00274             prompt_password != TRI_NO)
00275         {
00276             PQfinish(AH->connection);
00277             password = simple_prompt("Password: ", 100, false);
00278             if (password == NULL)
00279                 exit_horribly(modulename, "out of memory\n");
00280             new_pass = true;
00281         }
00282     } while (new_pass);
00283 
00284     AH->savedPassword = password;
00285 
00286     /* check to see that the backend connection was successfully made */
00287     if (PQstatus(AH->connection) == CONNECTION_BAD)
00288         exit_horribly(modulename, "connection to database \"%s\" failed: %s",
00289                       PQdb(AH->connection) ? PQdb(AH->connection) : "",
00290                       PQerrorMessage(AH->connection));
00291 
00292     /* check for version mismatch */
00293     _check_database_version(AH);
00294 
00295     PQsetNoticeProcessor(AH->connection, notice_processor, NULL);
00296 }
00297 
00298 /*
00299  * Close the connection to the database and also cancel off the query if we
00300  * have one running.
00301  */
00302 void
00303 DisconnectDatabase(Archive *AHX)
00304 {
00305     ArchiveHandle *AH = (ArchiveHandle *) AHX;
00306     PGcancel   *cancel;
00307     char        errbuf[1];
00308 
00309     if (!AH->connection)
00310         return;
00311 
00312     if (PQtransactionStatus(AH->connection) == PQTRANS_ACTIVE)
00313     {
00314         if ((cancel = PQgetCancel(AH->connection)))
00315         {
00316             PQcancel(cancel, errbuf, sizeof(errbuf));
00317             PQfreeCancel(cancel);
00318         }
00319     }
00320 
00321     PQfinish(AH->connection);
00322     AH->connection = NULL;
00323 }
00324 
00325 PGconn *
00326 GetConnection(Archive *AHX)
00327 {
00328     ArchiveHandle *AH = (ArchiveHandle *) AHX;
00329 
00330     return AH->connection;
00331 }
00332 
00333 static void
00334 notice_processor(void *arg, const char *message)
00335 {
00336     write_msg(NULL, "%s", message);
00337 }
00338 
00339 /* Like exit_horribly(), but with a complaint about a particular query. */
00340 static void
00341 die_on_query_failure(ArchiveHandle *AH, const char *modulename, const char *query)
00342 {
00343     write_msg(modulename, "query failed: %s",
00344               PQerrorMessage(AH->connection));
00345     exit_horribly(modulename, "query was: %s\n", query);
00346 }
00347 
00348 void
00349 ExecuteSqlStatement(Archive *AHX, const char *query)
00350 {
00351     ArchiveHandle *AH = (ArchiveHandle *) AHX;
00352     PGresult   *res;
00353 
00354     res = PQexec(AH->connection, query);
00355     if (PQresultStatus(res) != PGRES_COMMAND_OK)
00356         die_on_query_failure(AH, modulename, query);
00357     PQclear(res);
00358 }
00359 
00360 PGresult *
00361 ExecuteSqlQuery(Archive *AHX, const char *query, ExecStatusType status)
00362 {
00363     ArchiveHandle *AH = (ArchiveHandle *) AHX;
00364     PGresult   *res;
00365 
00366     res = PQexec(AH->connection, query);
00367     if (PQresultStatus(res) != status)
00368         die_on_query_failure(AH, modulename, query);
00369     return res;
00370 }
00371 
00372 /*
00373  * Convenience function to send a query.
00374  * Monitors result to detect COPY statements
00375  */
00376 static void
00377 ExecuteSqlCommand(ArchiveHandle *AH, const char *qry, const char *desc)
00378 {
00379     PGconn     *conn = AH->connection;
00380     PGresult   *res;
00381     char        errStmt[DB_MAX_ERR_STMT];
00382 
00383 #ifdef NOT_USED
00384     fprintf(stderr, "Executing: '%s'\n\n", qry);
00385 #endif
00386     res = PQexec(conn, qry);
00387 
00388     switch (PQresultStatus(res))
00389     {
00390         case PGRES_COMMAND_OK:
00391         case PGRES_TUPLES_OK:
00392         case PGRES_EMPTY_QUERY:
00393             /* A-OK */
00394             break;
00395         case PGRES_COPY_IN:
00396             /* Assume this is an expected result */
00397             AH->pgCopyIn = true;
00398             break;
00399         default:
00400             /* trouble */
00401             strncpy(errStmt, qry, DB_MAX_ERR_STMT);
00402             if (errStmt[DB_MAX_ERR_STMT - 1] != '\0')
00403             {
00404                 errStmt[DB_MAX_ERR_STMT - 4] = '.';
00405                 errStmt[DB_MAX_ERR_STMT - 3] = '.';
00406                 errStmt[DB_MAX_ERR_STMT - 2] = '.';
00407                 errStmt[DB_MAX_ERR_STMT - 1] = '\0';
00408             }
00409             warn_or_exit_horribly(AH, modulename, "%s: %s    Command was: %s\n",
00410                                   desc, PQerrorMessage(conn), errStmt);
00411             break;
00412     }
00413 
00414     PQclear(res);
00415 }
00416 
00417 
00418 /*
00419  * Process non-COPY table data (that is, INSERT commands).
00420  *
00421  * The commands have been run together as one long string for compressibility,
00422  * and we are receiving them in bufferloads with arbitrary boundaries, so we
00423  * have to locate command boundaries and save partial commands across calls.
00424  * All state must be kept in AH->sqlparse, not in local variables of this
00425  * routine.  We assume that AH->sqlparse was filled with zeroes when created.
00426  *
00427  * We have to lex the data to the extent of identifying literals and quoted
00428  * identifiers, so that we can recognize statement-terminating semicolons.
00429  * We assume that INSERT data will not contain SQL comments, E'' literals,
00430  * or dollar-quoted strings, so this is much simpler than a full SQL lexer.
00431  */
00432 static void
00433 ExecuteInsertCommands(ArchiveHandle *AH, const char *buf, size_t bufLen)
00434 {
00435     const char *qry = buf;
00436     const char *eos = buf + bufLen;
00437 
00438     /* initialize command buffer if first time through */
00439     if (AH->sqlparse.curCmd == NULL)
00440         AH->sqlparse.curCmd = createPQExpBuffer();
00441 
00442     for (; qry < eos; qry++)
00443     {
00444         char        ch = *qry;
00445 
00446         /* For neatness, we skip any newlines between commands */
00447         if (!(ch == '\n' && AH->sqlparse.curCmd->len == 0))
00448             appendPQExpBufferChar(AH->sqlparse.curCmd, ch);
00449 
00450         switch (AH->sqlparse.state)
00451         {
00452             case SQL_SCAN:      /* Default state == 0, set in _allocAH */
00453                 if (ch == ';')
00454                 {
00455                     /*
00456                      * We've found the end of a statement. Send it and reset
00457                      * the buffer.
00458                      */
00459                     ExecuteSqlCommand(AH, AH->sqlparse.curCmd->data,
00460                                       "could not execute query");
00461                     resetPQExpBuffer(AH->sqlparse.curCmd);
00462                 }
00463                 else if (ch == '\'')
00464                 {
00465                     AH->sqlparse.state = SQL_IN_SINGLE_QUOTE;
00466                     AH->sqlparse.backSlash = false;
00467                 }
00468                 else if (ch == '"')
00469                 {
00470                     AH->sqlparse.state = SQL_IN_DOUBLE_QUOTE;
00471                 }
00472                 break;
00473 
00474             case SQL_IN_SINGLE_QUOTE:
00475                 /* We needn't handle '' specially */
00476                 if (ch == '\'' && !AH->sqlparse.backSlash)
00477                     AH->sqlparse.state = SQL_SCAN;
00478                 else if (ch == '\\' && !AH->public.std_strings)
00479                     AH->sqlparse.backSlash = !AH->sqlparse.backSlash;
00480                 else
00481                     AH->sqlparse.backSlash = false;
00482                 break;
00483 
00484             case SQL_IN_DOUBLE_QUOTE:
00485                 /* We needn't handle "" specially */
00486                 if (ch == '"')
00487                     AH->sqlparse.state = SQL_SCAN;
00488                 break;
00489         }
00490     }
00491 }
00492 
00493 
00494 /*
00495  * Implement ahwrite() for direct-to-DB restore
00496  */
00497 int
00498 ExecuteSqlCommandBuf(ArchiveHandle *AH, const char *buf, size_t bufLen)
00499 {
00500     if (AH->outputKind == OUTPUT_COPYDATA)
00501     {
00502         /*
00503          * COPY data.
00504          *
00505          * We drop the data on the floor if libpq has failed to enter COPY
00506          * mode; this allows us to behave reasonably when trying to continue
00507          * after an error in a COPY command.
00508          */
00509         if (AH->pgCopyIn &&
00510             PQputCopyData(AH->connection, buf, bufLen) <= 0)
00511             exit_horribly(modulename, "error returned by PQputCopyData: %s",
00512                           PQerrorMessage(AH->connection));
00513     }
00514     else if (AH->outputKind == OUTPUT_OTHERDATA)
00515     {
00516         /*
00517          * Table data expressed as INSERT commands.
00518          */
00519         ExecuteInsertCommands(AH, buf, bufLen);
00520     }
00521     else
00522     {
00523         /*
00524          * General SQL commands; we assume that commands will not be split
00525          * across calls.
00526          *
00527          * In most cases the data passed to us will be a null-terminated
00528          * string, but if it's not, we have to add a trailing null.
00529          */
00530         if (buf[bufLen] == '\0')
00531             ExecuteSqlCommand(AH, buf, "could not execute query");
00532         else
00533         {
00534             char       *str = (char *) pg_malloc(bufLen + 1);
00535 
00536             memcpy(str, buf, bufLen);
00537             str[bufLen] = '\0';
00538             ExecuteSqlCommand(AH, str, "could not execute query");
00539             free(str);
00540         }
00541     }
00542 
00543     return 1;
00544 }
00545 
00546 /*
00547  * Terminate a COPY operation during direct-to-DB restore
00548  */
00549 void
00550 EndDBCopyMode(ArchiveHandle *AH, TocEntry *te)
00551 {
00552     if (AH->pgCopyIn)
00553     {
00554         PGresult   *res;
00555 
00556         if (PQputCopyEnd(AH->connection, NULL) <= 0)
00557             exit_horribly(modulename, "error returned by PQputCopyEnd: %s",
00558                           PQerrorMessage(AH->connection));
00559 
00560         /* Check command status and return to normal libpq state */
00561         res = PQgetResult(AH->connection);
00562         if (PQresultStatus(res) != PGRES_COMMAND_OK)
00563             warn_or_exit_horribly(AH, modulename, "COPY failed for table \"%s\": %s",
00564                                   te->tag, PQerrorMessage(AH->connection));
00565         PQclear(res);
00566 
00567         AH->pgCopyIn = false;
00568     }
00569 }
00570 
00571 void
00572 StartTransaction(ArchiveHandle *AH)
00573 {
00574     ExecuteSqlCommand(AH, "BEGIN", "could not start database transaction");
00575 }
00576 
00577 void
00578 CommitTransaction(ArchiveHandle *AH)
00579 {
00580     ExecuteSqlCommand(AH, "COMMIT", "could not commit database transaction");
00581 }
00582 
00583 void
00584 DropBlobIfExists(ArchiveHandle *AH, Oid oid)
00585 {
00586     /*
00587      * If we are not restoring to a direct database connection, we have to
00588      * guess about how to detect whether the blob exists.  Assume new-style.
00589      */
00590     if (AH->connection == NULL ||
00591         PQserverVersion(AH->connection) >= 90000)
00592     {
00593         ahprintf(AH,
00594                  "SELECT pg_catalog.lo_unlink(oid) "
00595                  "FROM pg_catalog.pg_largeobject_metadata "
00596                  "WHERE oid = '%u';\n",
00597                  oid);
00598     }
00599     else
00600     {
00601         /* Restoring to pre-9.0 server, so do it the old way */
00602         ahprintf(AH,
00603                  "SELECT CASE WHEN EXISTS("
00604                  "SELECT 1 FROM pg_catalog.pg_largeobject WHERE loid = '%u'"
00605                  ") THEN pg_catalog.lo_unlink('%u') END;\n",
00606                  oid, oid);
00607     }
00608 }