00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013 #include "postgres.h"
00014
00015 #include "postgres_fdw.h"
00016
00017 #include "access/xact.h"
00018 #include "mb/pg_wchar.h"
00019 #include "miscadmin.h"
00020 #include "utils/hsearch.h"
00021 #include "utils/memutils.h"
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038 typedef struct ConnCacheKey
00039 {
00040 Oid serverid;
00041 Oid userid;
00042 } ConnCacheKey;
00043
00044 typedef struct ConnCacheEntry
00045 {
00046 ConnCacheKey key;
00047 PGconn *conn;
00048 int xact_depth;
00049
00050 bool have_prep_stmt;
00051 bool have_error;
00052 } ConnCacheEntry;
00053
00054
00055
00056
00057 static HTAB *ConnectionHash = NULL;
00058
00059
00060 static unsigned int cursor_number = 0;
00061 static unsigned int prep_stmt_number = 0;
00062
00063
00064 static bool xact_got_connection = false;
00065
00066
00067 static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
00068 static void check_conn_params(const char **keywords, const char **values);
00069 static void configure_remote_session(PGconn *conn);
00070 static void do_sql_command(PGconn *conn, const char *sql);
00071 static void begin_remote_xact(ConnCacheEntry *entry);
00072 static void pgfdw_xact_callback(XactEvent event, void *arg);
00073 static void pgfdw_subxact_callback(SubXactEvent event,
00074 SubTransactionId mySubid,
00075 SubTransactionId parentSubid,
00076 void *arg);
00077
00078
00079
00080
00081
00082
00083
00084
00085
00086
00087
00088
00089
00090
00091
00092
00093
00094
00095
00096 PGconn *
00097 GetConnection(ForeignServer *server, UserMapping *user,
00098 bool will_prep_stmt)
00099 {
00100 bool found;
00101 ConnCacheEntry *entry;
00102 ConnCacheKey key;
00103
00104
00105 if (ConnectionHash == NULL)
00106 {
00107 HASHCTL ctl;
00108
00109 MemSet(&ctl, 0, sizeof(ctl));
00110 ctl.keysize = sizeof(ConnCacheKey);
00111 ctl.entrysize = sizeof(ConnCacheEntry);
00112 ctl.hash = tag_hash;
00113
00114 ctl.hcxt = CacheMemoryContext;
00115 ConnectionHash = hash_create("postgres_fdw connections", 8,
00116 &ctl,
00117 HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
00118
00119
00120
00121
00122
00123 RegisterXactCallback(pgfdw_xact_callback, NULL);
00124 RegisterSubXactCallback(pgfdw_subxact_callback, NULL);
00125 }
00126
00127
00128 xact_got_connection = true;
00129
00130
00131 key.serverid = server->serverid;
00132 key.userid = user->userid;
00133
00134
00135
00136
00137 entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
00138 if (!found)
00139 {
00140
00141 entry->conn = NULL;
00142 entry->xact_depth = 0;
00143 entry->have_prep_stmt = false;
00144 entry->have_error = false;
00145 }
00146
00147
00148
00149
00150
00151
00152
00153
00154
00155
00156
00157
00158 if (entry->conn == NULL)
00159 {
00160 entry->xact_depth = 0;
00161 entry->have_prep_stmt = false;
00162 entry->have_error = false;
00163 entry->conn = connect_pg_server(server, user);
00164 elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\"",
00165 entry->conn, server->servername);
00166 }
00167
00168
00169
00170
00171 begin_remote_xact(entry);
00172
00173
00174 entry->have_prep_stmt |= will_prep_stmt;
00175
00176 return entry->conn;
00177 }
00178
00179
00180
00181
00182 static PGconn *
00183 connect_pg_server(ForeignServer *server, UserMapping *user)
00184 {
00185 PGconn *volatile conn = NULL;
00186
00187
00188
00189
00190 PG_TRY();
00191 {
00192 const char **keywords;
00193 const char **values;
00194 int n;
00195
00196
00197
00198
00199
00200
00201
00202 n = list_length(server->options) + list_length(user->options) + 3;
00203 keywords = (const char **) palloc(n * sizeof(char *));
00204 values = (const char **) palloc(n * sizeof(char *));
00205
00206 n = 0;
00207 n += ExtractConnectionOptions(server->options,
00208 keywords + n, values + n);
00209 n += ExtractConnectionOptions(user->options,
00210 keywords + n, values + n);
00211
00212
00213 keywords[n] = "fallback_application_name";
00214 values[n] = "postgres_fdw";
00215 n++;
00216
00217
00218 keywords[n] = "client_encoding";
00219 values[n] = GetDatabaseEncodingName();
00220 n++;
00221
00222 keywords[n] = values[n] = NULL;
00223
00224
00225 check_conn_params(keywords, values);
00226
00227 conn = PQconnectdbParams(keywords, values, false);
00228 if (!conn || PQstatus(conn) != CONNECTION_OK)
00229 {
00230 char *connmessage;
00231 int msglen;
00232
00233
00234 connmessage = pstrdup(PQerrorMessage(conn));
00235 msglen = strlen(connmessage);
00236 if (msglen > 0 && connmessage[msglen - 1] == '\n')
00237 connmessage[msglen - 1] = '\0';
00238 ereport(ERROR,
00239 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
00240 errmsg("could not connect to server \"%s\"",
00241 server->servername),
00242 errdetail_internal("%s", connmessage)));
00243 }
00244
00245
00246
00247
00248
00249
00250 if (!superuser() && !PQconnectionUsedPassword(conn))
00251 ereport(ERROR,
00252 (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
00253 errmsg("password is required"),
00254 errdetail("Non-superuser cannot connect if the server does not request a password."),
00255 errhint("Target server's authentication method must be changed.")));
00256
00257
00258 configure_remote_session(conn);
00259
00260 pfree(keywords);
00261 pfree(values);
00262 }
00263 PG_CATCH();
00264 {
00265
00266 if (conn)
00267 PQfinish(conn);
00268 PG_RE_THROW();
00269 }
00270 PG_END_TRY();
00271
00272 return conn;
00273 }
00274
00275
00276
00277
00278
00279
00280
00281
00282 static void
00283 check_conn_params(const char **keywords, const char **values)
00284 {
00285 int i;
00286
00287
00288 if (superuser())
00289 return;
00290
00291
00292 for (i = 0; keywords[i] != NULL; i++)
00293 {
00294 if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
00295 return;
00296 }
00297
00298 ereport(ERROR,
00299 (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
00300 errmsg("password is required"),
00301 errdetail("Non-superusers must provide a password in the user mapping.")));
00302 }
00303
00304
00305
00306
00307
00308
00309
00310
00311
00312
00313
00314
00315 static void
00316 configure_remote_session(PGconn *conn)
00317 {
00318 int remoteversion = PQserverVersion(conn);
00319
00320
00321 do_sql_command(conn, "SET search_path = pg_catalog");
00322
00323
00324
00325
00326
00327
00328
00329
00330
00331
00332 do_sql_command(conn, "SET timezone = 'UTC'");
00333
00334
00335
00336
00337
00338
00339 do_sql_command(conn, "SET datestyle = ISO");
00340 if (remoteversion >= 80400)
00341 do_sql_command(conn, "SET intervalstyle = postgres");
00342 if (remoteversion >= 90000)
00343 do_sql_command(conn, "SET extra_float_digits = 3");
00344 else
00345 do_sql_command(conn, "SET extra_float_digits = 2");
00346 }
00347
00348
00349
00350
00351 static void
00352 do_sql_command(PGconn *conn, const char *sql)
00353 {
00354 PGresult *res;
00355
00356 res = PQexec(conn, sql);
00357 if (PQresultStatus(res) != PGRES_COMMAND_OK)
00358 pgfdw_report_error(ERROR, res, true, sql);
00359 PQclear(res);
00360 }
00361
00362
00363
00364
00365
00366
00367
00368
00369
00370
00371
00372 static void
00373 begin_remote_xact(ConnCacheEntry *entry)
00374 {
00375 int curlevel = GetCurrentTransactionNestLevel();
00376
00377
00378 if (entry->xact_depth <= 0)
00379 {
00380 const char *sql;
00381
00382 elog(DEBUG3, "starting remote transaction on connection %p",
00383 entry->conn);
00384
00385 if (IsolationIsSerializable())
00386 sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
00387 else
00388 sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ";
00389 do_sql_command(entry->conn, sql);
00390 entry->xact_depth = 1;
00391 }
00392
00393
00394
00395
00396
00397
00398 while (entry->xact_depth < curlevel)
00399 {
00400 char sql[64];
00401
00402 snprintf(sql, sizeof(sql), "SAVEPOINT s%d", entry->xact_depth + 1);
00403 do_sql_command(entry->conn, sql);
00404 entry->xact_depth++;
00405 }
00406 }
00407
00408
00409
00410
00411 void
00412 ReleaseConnection(PGconn *conn)
00413 {
00414
00415
00416
00417
00418
00419 }
00420
00421
00422
00423
00424
00425
00426
00427
00428
00429
00430
00431
00432 unsigned int
00433 GetCursorNumber(PGconn *conn)
00434 {
00435 return ++cursor_number;
00436 }
00437
00438
00439
00440
00441
00442
00443
00444
00445
00446 unsigned int
00447 GetPrepStmtNumber(PGconn *conn)
00448 {
00449 return ++prep_stmt_number;
00450 }
00451
00452
00453
00454
00455
00456
00457
00458
00459
00460
00461
00462
00463
00464 void
00465 pgfdw_report_error(int elevel, PGresult *res, bool clear, const char *sql)
00466 {
00467
00468 PG_TRY();
00469 {
00470 char *diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
00471 char *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
00472 char *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
00473 char *message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
00474 char *message_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
00475 int sqlstate;
00476
00477 if (diag_sqlstate)
00478 sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
00479 diag_sqlstate[1],
00480 diag_sqlstate[2],
00481 diag_sqlstate[3],
00482 diag_sqlstate[4]);
00483 else
00484 sqlstate = ERRCODE_CONNECTION_FAILURE;
00485
00486 ereport(elevel,
00487 (errcode(sqlstate),
00488 message_primary ? errmsg_internal("%s", message_primary) :
00489 errmsg("unknown error"),
00490 message_detail ? errdetail_internal("%s", message_detail) : 0,
00491 message_hint ? errhint("%s", message_hint) : 0,
00492 message_context ? errcontext("%s", message_context) : 0,
00493 sql ? errcontext("Remote SQL command: %s", sql) : 0));
00494 }
00495 PG_CATCH();
00496 {
00497 if (clear)
00498 PQclear(res);
00499 PG_RE_THROW();
00500 }
00501 PG_END_TRY();
00502 if (clear)
00503 PQclear(res);
00504 }
00505
00506
00507
00508
00509 static void
00510 pgfdw_xact_callback(XactEvent event, void *arg)
00511 {
00512 HASH_SEQ_STATUS scan;
00513 ConnCacheEntry *entry;
00514
00515
00516 if (!xact_got_connection)
00517 return;
00518
00519
00520
00521
00522
00523 hash_seq_init(&scan, ConnectionHash);
00524 while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
00525 {
00526 PGresult *res;
00527
00528
00529 if (entry->conn == NULL || entry->xact_depth == 0)
00530 continue;
00531
00532 elog(DEBUG3, "closing remote transaction on connection %p",
00533 entry->conn);
00534
00535 switch (event)
00536 {
00537 case XACT_EVENT_PRE_COMMIT:
00538
00539 do_sql_command(entry->conn, "COMMIT TRANSACTION");
00540
00541
00542
00543
00544
00545
00546
00547
00548
00549
00550
00551
00552
00553
00554
00555 if (entry->have_prep_stmt && entry->have_error)
00556 {
00557 res = PQexec(entry->conn, "DEALLOCATE ALL");
00558 PQclear(res);
00559 }
00560 entry->have_prep_stmt = false;
00561 entry->have_error = false;
00562 break;
00563 case XACT_EVENT_PRE_PREPARE:
00564
00565
00566
00567
00568
00569
00570
00571
00572
00573
00574 ereport(ERROR,
00575 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
00576 errmsg("cannot prepare a transaction that modified remote tables")));
00577 break;
00578 case XACT_EVENT_COMMIT:
00579 case XACT_EVENT_PREPARE:
00580
00581 elog(ERROR, "missed cleaning up connection during pre-commit");
00582 break;
00583 case XACT_EVENT_ABORT:
00584
00585 entry->have_error = true;
00586
00587 res = PQexec(entry->conn, "ABORT TRANSACTION");
00588
00589 if (PQresultStatus(res) != PGRES_COMMAND_OK)
00590 pgfdw_report_error(WARNING, res, true,
00591 "ABORT TRANSACTION");
00592 else
00593 {
00594 PQclear(res);
00595
00596 if (entry->have_prep_stmt && entry->have_error)
00597 {
00598 res = PQexec(entry->conn, "DEALLOCATE ALL");
00599 PQclear(res);
00600 }
00601 entry->have_prep_stmt = false;
00602 entry->have_error = false;
00603 }
00604 break;
00605 }
00606
00607
00608 entry->xact_depth = 0;
00609
00610
00611
00612
00613
00614 if (PQstatus(entry->conn) != CONNECTION_OK ||
00615 PQtransactionStatus(entry->conn) != PQTRANS_IDLE)
00616 {
00617 elog(DEBUG3, "discarding connection %p", entry->conn);
00618 PQfinish(entry->conn);
00619 entry->conn = NULL;
00620 }
00621 }
00622
00623
00624
00625
00626
00627
00628 xact_got_connection = false;
00629
00630
00631 cursor_number = 0;
00632 }
00633
00634
00635
00636
00637 static void
00638 pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
00639 SubTransactionId parentSubid, void *arg)
00640 {
00641 HASH_SEQ_STATUS scan;
00642 ConnCacheEntry *entry;
00643 int curlevel;
00644
00645
00646 if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB ||
00647 event == SUBXACT_EVENT_ABORT_SUB))
00648 return;
00649
00650
00651 if (!xact_got_connection)
00652 return;
00653
00654
00655
00656
00657
00658 curlevel = GetCurrentTransactionNestLevel();
00659 hash_seq_init(&scan, ConnectionHash);
00660 while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
00661 {
00662 PGresult *res;
00663 char sql[100];
00664
00665
00666
00667
00668
00669 if (entry->conn == NULL || entry->xact_depth < curlevel)
00670 continue;
00671
00672 if (entry->xact_depth > curlevel)
00673 elog(ERROR, "missed cleaning up remote subtransaction at level %d",
00674 entry->xact_depth);
00675
00676 if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
00677 {
00678
00679 snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
00680 do_sql_command(entry->conn, sql);
00681 }
00682 else
00683 {
00684
00685 entry->have_error = true;
00686
00687 snprintf(sql, sizeof(sql),
00688 "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d",
00689 curlevel, curlevel);
00690 res = PQexec(entry->conn, sql);
00691 if (PQresultStatus(res) != PGRES_COMMAND_OK)
00692 pgfdw_report_error(WARNING, res, true, sql);
00693 else
00694 PQclear(res);
00695 }
00696
00697
00698 entry->xact_depth--;
00699 }
00700 }