00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013 #include "pg_backup_db.h"
00014 #include "pg_backup_utils.h"
00015 #include "dumputils.h"
00016 #include "parallel.h"
00017
00018 #include <unistd.h>
00019 #include <ctype.h>
00020 #ifdef HAVE_TERMIOS_H
00021 #include <termios.h>
00022 #endif
00023
00024
00025 #define DB_MAX_ERR_STMT 128
00026
00027
00028 static const char *modulename = gettext_noop("archiver (db)");
00029
00030 static void _check_database_version(ArchiveHandle *AH);
00031 static PGconn *_connectDB(ArchiveHandle *AH, const char *newdbname, const char *newUser);
00032 static void notice_processor(void *arg, const char *message);
00033
00034 static void
00035 _check_database_version(ArchiveHandle *AH)
00036 {
00037 const char *remoteversion_str;
00038 int remoteversion;
00039
00040 remoteversion_str = PQparameterStatus(AH->connection, "server_version");
00041 remoteversion = PQserverVersion(AH->connection);
00042 if (remoteversion == 0 || !remoteversion_str)
00043 exit_horribly(modulename, "could not get server_version from libpq\n");
00044
00045 AH->public.remoteVersionStr = pg_strdup(remoteversion_str);
00046 AH->public.remoteVersion = remoteversion;
00047 if (!AH->archiveRemoteVersion)
00048 AH->archiveRemoteVersion = AH->public.remoteVersionStr;
00049
00050 if (remoteversion != PG_VERSION_NUM
00051 && (remoteversion < AH->public.minRemoteVersion ||
00052 remoteversion > AH->public.maxRemoteVersion))
00053 {
00054 write_msg(NULL, "server version: %s; %s version: %s\n",
00055 remoteversion_str, progname, PG_VERSION);
00056 exit_horribly(NULL, "aborting because of server version mismatch\n");
00057 }
00058 }
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068
00069 int
00070 ReconnectToServer(ArchiveHandle *AH, const char *dbname, const char *username)
00071 {
00072 PGconn *newConn;
00073 const char *newdbname;
00074 const char *newusername;
00075
00076 if (!dbname)
00077 newdbname = PQdb(AH->connection);
00078 else
00079 newdbname = dbname;
00080
00081 if (!username)
00082 newusername = PQuser(AH->connection);
00083 else
00084 newusername = username;
00085
00086
00087 if (strcmp(newdbname, PQdb(AH->connection)) == 0 &&
00088 strcmp(newusername, PQuser(AH->connection)) == 0)
00089 return 1;
00090
00091 newConn = _connectDB(AH, newdbname, newusername);
00092
00093 PQfinish(AH->connection);
00094 AH->connection = newConn;
00095
00096 return 1;
00097 }
00098
00099
00100
00101
00102
00103
00104
00105
00106
00107
00108 static PGconn *
00109 _connectDB(ArchiveHandle *AH, const char *reqdb, const char *requser)
00110 {
00111 PGconn *newConn;
00112 const char *newdb;
00113 const char *newuser;
00114 char *password = AH->savedPassword;
00115 bool new_pass;
00116
00117 if (!reqdb)
00118 newdb = PQdb(AH->connection);
00119 else
00120 newdb = reqdb;
00121
00122 if (!requser || strlen(requser) == 0)
00123 newuser = PQuser(AH->connection);
00124 else
00125 newuser = requser;
00126
00127 ahlog(AH, 1, "connecting to database \"%s\" as user \"%s\"\n",
00128 newdb, newuser);
00129
00130 if (AH->promptPassword == TRI_YES && password == NULL)
00131 {
00132 password = simple_prompt("Password: ", 100, false);
00133 if (password == NULL)
00134 exit_horribly(modulename, "out of memory\n");
00135 }
00136
00137 do
00138 {
00139 #define PARAMS_ARRAY_SIZE 7
00140 const char **keywords = pg_malloc(PARAMS_ARRAY_SIZE * sizeof(*keywords));
00141 const char **values = pg_malloc(PARAMS_ARRAY_SIZE * sizeof(*values));
00142
00143 keywords[0] = "host";
00144 values[0] = PQhost(AH->connection);
00145 keywords[1] = "port";
00146 values[1] = PQport(AH->connection);
00147 keywords[2] = "user";
00148 values[2] = newuser;
00149 keywords[3] = "password";
00150 values[3] = password;
00151 keywords[4] = "dbname";
00152 values[4] = newdb;
00153 keywords[5] = "fallback_application_name";
00154 values[5] = progname;
00155 keywords[6] = NULL;
00156 values[6] = NULL;
00157
00158 new_pass = false;
00159 newConn = PQconnectdbParams(keywords, values, true);
00160
00161 free(keywords);
00162 free(values);
00163
00164 if (!newConn)
00165 exit_horribly(modulename, "failed to reconnect to database\n");
00166
00167 if (PQstatus(newConn) == CONNECTION_BAD)
00168 {
00169 if (!PQconnectionNeedsPassword(newConn))
00170 exit_horribly(modulename, "could not reconnect to database: %s",
00171 PQerrorMessage(newConn));
00172 PQfinish(newConn);
00173
00174 if (password)
00175 fprintf(stderr, "Password incorrect\n");
00176
00177 fprintf(stderr, "Connecting to %s as %s\n",
00178 newdb, newuser);
00179
00180 if (password)
00181 free(password);
00182
00183 if (AH->promptPassword != TRI_NO)
00184 password = simple_prompt("Password: ", 100, false);
00185 else
00186 exit_horribly(modulename, "connection needs password\n");
00187
00188 if (password == NULL)
00189 exit_horribly(modulename, "out of memory\n");
00190 new_pass = true;
00191 }
00192 } while (new_pass);
00193
00194 AH->savedPassword = password;
00195
00196
00197 _check_database_version(AH);
00198
00199 PQsetNoticeProcessor(newConn, notice_processor, NULL);
00200
00201 return newConn;
00202 }
00203
00204
00205
00206
00207
00208
00209
00210
00211
00212
00213
00214 void
00215 ConnectDatabase(Archive *AHX,
00216 const char *dbname,
00217 const char *pghost,
00218 const char *pgport,
00219 const char *username,
00220 enum trivalue prompt_password)
00221 {
00222 ArchiveHandle *AH = (ArchiveHandle *) AHX;
00223 char *password = AH->savedPassword;
00224 bool new_pass;
00225
00226 if (AH->connection)
00227 exit_horribly(modulename, "already connected to a database\n");
00228
00229 if (prompt_password == TRI_YES && password == NULL)
00230 {
00231 password = simple_prompt("Password: ", 100, false);
00232 if (password == NULL)
00233 exit_horribly(modulename, "out of memory\n");
00234 }
00235 AH->promptPassword = prompt_password;
00236
00237
00238
00239
00240
00241 do
00242 {
00243 #define PARAMS_ARRAY_SIZE 7
00244 const char **keywords = pg_malloc(PARAMS_ARRAY_SIZE * sizeof(*keywords));
00245 const char **values = pg_malloc(PARAMS_ARRAY_SIZE * sizeof(*values));
00246
00247 keywords[0] = "host";
00248 values[0] = pghost;
00249 keywords[1] = "port";
00250 values[1] = pgport;
00251 keywords[2] = "user";
00252 values[2] = username;
00253 keywords[3] = "password";
00254 values[3] = password;
00255 keywords[4] = "dbname";
00256 values[4] = dbname;
00257 keywords[5] = "fallback_application_name";
00258 values[5] = progname;
00259 keywords[6] = NULL;
00260 values[6] = NULL;
00261
00262 new_pass = false;
00263 AH->connection = PQconnectdbParams(keywords, values, true);
00264
00265 free(keywords);
00266 free(values);
00267
00268 if (!AH->connection)
00269 exit_horribly(modulename, "failed to connect to database\n");
00270
00271 if (PQstatus(AH->connection) == CONNECTION_BAD &&
00272 PQconnectionNeedsPassword(AH->connection) &&
00273 password == NULL &&
00274 prompt_password != TRI_NO)
00275 {
00276 PQfinish(AH->connection);
00277 password = simple_prompt("Password: ", 100, false);
00278 if (password == NULL)
00279 exit_horribly(modulename, "out of memory\n");
00280 new_pass = true;
00281 }
00282 } while (new_pass);
00283
00284 AH->savedPassword = password;
00285
00286
00287 if (PQstatus(AH->connection) == CONNECTION_BAD)
00288 exit_horribly(modulename, "connection to database \"%s\" failed: %s",
00289 PQdb(AH->connection) ? PQdb(AH->connection) : "",
00290 PQerrorMessage(AH->connection));
00291
00292
00293 _check_database_version(AH);
00294
00295 PQsetNoticeProcessor(AH->connection, notice_processor, NULL);
00296 }
00297
00298
00299
00300
00301
00302 void
00303 DisconnectDatabase(Archive *AHX)
00304 {
00305 ArchiveHandle *AH = (ArchiveHandle *) AHX;
00306 PGcancel *cancel;
00307 char errbuf[1];
00308
00309 if (!AH->connection)
00310 return;
00311
00312 if (PQtransactionStatus(AH->connection) == PQTRANS_ACTIVE)
00313 {
00314 if ((cancel = PQgetCancel(AH->connection)))
00315 {
00316 PQcancel(cancel, errbuf, sizeof(errbuf));
00317 PQfreeCancel(cancel);
00318 }
00319 }
00320
00321 PQfinish(AH->connection);
00322 AH->connection = NULL;
00323 }
00324
00325 PGconn *
00326 GetConnection(Archive *AHX)
00327 {
00328 ArchiveHandle *AH = (ArchiveHandle *) AHX;
00329
00330 return AH->connection;
00331 }
00332
/*
 * Notice processor installed on connections opened by this file: forwards
 * the notice text unchanged to write_msg.  The 'arg' pointer is not used.
 */
static void
notice_processor(void *arg, const char *message)
{
    (void) arg;

    write_msg(NULL, "%s", message);
}
00338
00339
00340 static void
00341 die_on_query_failure(ArchiveHandle *AH, const char *modulename, const char *query)
00342 {
00343 write_msg(modulename, "query failed: %s",
00344 PQerrorMessage(AH->connection));
00345 exit_horribly(modulename, "query was: %s\n", query);
00346 }
00347
00348 void
00349 ExecuteSqlStatement(Archive *AHX, const char *query)
00350 {
00351 ArchiveHandle *AH = (ArchiveHandle *) AHX;
00352 PGresult *res;
00353
00354 res = PQexec(AH->connection, query);
00355 if (PQresultStatus(res) != PGRES_COMMAND_OK)
00356 die_on_query_failure(AH, modulename, query);
00357 PQclear(res);
00358 }
00359
00360 PGresult *
00361 ExecuteSqlQuery(Archive *AHX, const char *query, ExecStatusType status)
00362 {
00363 ArchiveHandle *AH = (ArchiveHandle *) AHX;
00364 PGresult *res;
00365
00366 res = PQexec(AH->connection, query);
00367 if (PQresultStatus(res) != status)
00368 die_on_query_failure(AH, modulename, query);
00369 return res;
00370 }
00371
00372
00373
00374
00375
00376 static void
00377 ExecuteSqlCommand(ArchiveHandle *AH, const char *qry, const char *desc)
00378 {
00379 PGconn *conn = AH->connection;
00380 PGresult *res;
00381 char errStmt[DB_MAX_ERR_STMT];
00382
00383 #ifdef NOT_USED
00384 fprintf(stderr, "Executing: '%s'\n\n", qry);
00385 #endif
00386 res = PQexec(conn, qry);
00387
00388 switch (PQresultStatus(res))
00389 {
00390 case PGRES_COMMAND_OK:
00391 case PGRES_TUPLES_OK:
00392 case PGRES_EMPTY_QUERY:
00393
00394 break;
00395 case PGRES_COPY_IN:
00396
00397 AH->pgCopyIn = true;
00398 break;
00399 default:
00400
00401 strncpy(errStmt, qry, DB_MAX_ERR_STMT);
00402 if (errStmt[DB_MAX_ERR_STMT - 1] != '\0')
00403 {
00404 errStmt[DB_MAX_ERR_STMT - 4] = '.';
00405 errStmt[DB_MAX_ERR_STMT - 3] = '.';
00406 errStmt[DB_MAX_ERR_STMT - 2] = '.';
00407 errStmt[DB_MAX_ERR_STMT - 1] = '\0';
00408 }
00409 warn_or_exit_horribly(AH, modulename, "%s: %s Command was: %s\n",
00410 desc, PQerrorMessage(conn), errStmt);
00411 break;
00412 }
00413
00414 PQclear(res);
00415 }
00416
00417
00418
00419
00420
00421
00422
00423
00424
00425
00426
00427
00428
00429
00430
00431
00432 static void
00433 ExecuteInsertCommands(ArchiveHandle *AH, const char *buf, size_t bufLen)
00434 {
00435 const char *qry = buf;
00436 const char *eos = buf + bufLen;
00437
00438
00439 if (AH->sqlparse.curCmd == NULL)
00440 AH->sqlparse.curCmd = createPQExpBuffer();
00441
00442 for (; qry < eos; qry++)
00443 {
00444 char ch = *qry;
00445
00446
00447 if (!(ch == '\n' && AH->sqlparse.curCmd->len == 0))
00448 appendPQExpBufferChar(AH->sqlparse.curCmd, ch);
00449
00450 switch (AH->sqlparse.state)
00451 {
00452 case SQL_SCAN:
00453 if (ch == ';')
00454 {
00455
00456
00457
00458
00459 ExecuteSqlCommand(AH, AH->sqlparse.curCmd->data,
00460 "could not execute query");
00461 resetPQExpBuffer(AH->sqlparse.curCmd);
00462 }
00463 else if (ch == '\'')
00464 {
00465 AH->sqlparse.state = SQL_IN_SINGLE_QUOTE;
00466 AH->sqlparse.backSlash = false;
00467 }
00468 else if (ch == '"')
00469 {
00470 AH->sqlparse.state = SQL_IN_DOUBLE_QUOTE;
00471 }
00472 break;
00473
00474 case SQL_IN_SINGLE_QUOTE:
00475
00476 if (ch == '\'' && !AH->sqlparse.backSlash)
00477 AH->sqlparse.state = SQL_SCAN;
00478 else if (ch == '\\' && !AH->public.std_strings)
00479 AH->sqlparse.backSlash = !AH->sqlparse.backSlash;
00480 else
00481 AH->sqlparse.backSlash = false;
00482 break;
00483
00484 case SQL_IN_DOUBLE_QUOTE:
00485
00486 if (ch == '"')
00487 AH->sqlparse.state = SQL_SCAN;
00488 break;
00489 }
00490 }
00491 }
00492
00493
00494
00495
00496
00497 int
00498 ExecuteSqlCommandBuf(ArchiveHandle *AH, const char *buf, size_t bufLen)
00499 {
00500 if (AH->outputKind == OUTPUT_COPYDATA)
00501 {
00502
00503
00504
00505
00506
00507
00508
00509 if (AH->pgCopyIn &&
00510 PQputCopyData(AH->connection, buf, bufLen) <= 0)
00511 exit_horribly(modulename, "error returned by PQputCopyData: %s",
00512 PQerrorMessage(AH->connection));
00513 }
00514 else if (AH->outputKind == OUTPUT_OTHERDATA)
00515 {
00516
00517
00518
00519 ExecuteInsertCommands(AH, buf, bufLen);
00520 }
00521 else
00522 {
00523
00524
00525
00526
00527
00528
00529
00530 if (buf[bufLen] == '\0')
00531 ExecuteSqlCommand(AH, buf, "could not execute query");
00532 else
00533 {
00534 char *str = (char *) pg_malloc(bufLen + 1);
00535
00536 memcpy(str, buf, bufLen);
00537 str[bufLen] = '\0';
00538 ExecuteSqlCommand(AH, str, "could not execute query");
00539 free(str);
00540 }
00541 }
00542
00543 return 1;
00544 }
00545
00546
00547
00548
00549 void
00550 EndDBCopyMode(ArchiveHandle *AH, TocEntry *te)
00551 {
00552 if (AH->pgCopyIn)
00553 {
00554 PGresult *res;
00555
00556 if (PQputCopyEnd(AH->connection, NULL) <= 0)
00557 exit_horribly(modulename, "error returned by PQputCopyEnd: %s",
00558 PQerrorMessage(AH->connection));
00559
00560
00561 res = PQgetResult(AH->connection);
00562 if (PQresultStatus(res) != PGRES_COMMAND_OK)
00563 warn_or_exit_horribly(AH, modulename, "COPY failed for table \"%s\": %s",
00564 te->tag, PQerrorMessage(AH->connection));
00565 PQclear(res);
00566
00567 AH->pgCopyIn = false;
00568 }
00569 }
00570
00571 void
00572 StartTransaction(ArchiveHandle *AH)
00573 {
00574 ExecuteSqlCommand(AH, "BEGIN", "could not start database transaction");
00575 }
00576
00577 void
00578 CommitTransaction(ArchiveHandle *AH)
00579 {
00580 ExecuteSqlCommand(AH, "COMMIT", "could not commit database transaction");
00581 }
00582
00583 void
00584 DropBlobIfExists(ArchiveHandle *AH, Oid oid)
00585 {
00586
00587
00588
00589
00590 if (AH->connection == NULL ||
00591 PQserverVersion(AH->connection) >= 90000)
00592 {
00593 ahprintf(AH,
00594 "SELECT pg_catalog.lo_unlink(oid) "
00595 "FROM pg_catalog.pg_largeobject_metadata "
00596 "WHERE oid = '%u';\n",
00597 oid);
00598 }
00599 else
00600 {
00601
00602 ahprintf(AH,
00603 "SELECT CASE WHEN EXISTS("
00604 "SELECT 1 FROM pg_catalog.pg_largeobject WHERE loid = '%u'"
00605 ") THEN pg_catalog.lo_unlink('%u') END;\n",
00606 oid, oid);
00607 }
00608 }