00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033 #include "postgres.h"
00034
00035 #include <limits.h>
00036
00037 #include "libpq-fe.h"
00038
00039 #include "access/htup_details.h"
00040 #include "access/reloptions.h"
00041 #include "catalog/indexing.h"
00042 #include "catalog/namespace.h"
00043 #include "catalog/pg_foreign_server.h"
00044 #include "catalog/pg_type.h"
00045 #include "catalog/pg_user_mapping.h"
00046 #include "executor/spi.h"
00047 #include "foreign/foreign.h"
00048 #include "funcapi.h"
00049 #include "lib/stringinfo.h"
00050 #include "mb/pg_wchar.h"
00051 #include "miscadmin.h"
00052 #include "parser/scansup.h"
00053 #include "utils/acl.h"
00054 #include "utils/builtins.h"
00055 #include "utils/fmgroids.h"
00056 #include "utils/guc.h"
00057 #include "utils/lsyscache.h"
00058 #include "utils/memutils.h"
00059 #include "utils/rel.h"
00060 #include "utils/tqual.h"
00061
00062 #include "dblink.h"
00063
00064 PG_MODULE_MAGIC;
00065
00066 typedef struct remoteConn
00067 {
00068 PGconn *conn;
00069 int openCursorCount;
00070 bool newXactForCursor;
00071 } remoteConn;
00072
00073 typedef struct storeInfo
00074 {
00075 FunctionCallInfo fcinfo;
00076 Tuplestorestate *tuplestore;
00077 AttInMetadata *attinmeta;
00078 MemoryContext tmpcontext;
00079 char **cstrs;
00080
00081 PGresult *last_res;
00082 PGresult *cur_res;
00083 } storeInfo;
00084
00085
00086
00087
00088 static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
00089 static void prepTuplestoreResult(FunctionCallInfo fcinfo);
00090 static void materializeResult(FunctionCallInfo fcinfo, PGconn *conn,
00091 PGresult *res);
00092 static void materializeQueryResult(FunctionCallInfo fcinfo,
00093 PGconn *conn,
00094 const char *conname,
00095 const char *sql,
00096 bool fail);
00097 static PGresult *storeQueryResult(storeInfo *sinfo, PGconn *conn, const char *sql);
00098 static void storeRow(storeInfo *sinfo, PGresult *res, bool first);
00099 static remoteConn *getConnectionByName(const char *name);
00100 static HTAB *createConnHash(void);
00101 static void createNewConnection(const char *name, remoteConn *rconn);
00102 static void deleteConnection(const char *name);
00103 static char **get_pkey_attnames(Relation rel, int16 *numatts);
00104 static char **get_text_array_contents(ArrayType *array, int *numitems);
00105 static char *get_sql_insert(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals);
00106 static char *get_sql_delete(Relation rel, int *pkattnums, int pknumatts, char **tgt_pkattvals);
00107 static char *get_sql_update(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals);
00108 static char *quote_ident_cstr(char *rawstr);
00109 static int get_attnum_pk_pos(int *pkattnums, int pknumatts, int key);
00110 static HeapTuple get_tuple_of_interest(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals);
00111 static Relation get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode);
00112 static char *generate_relation_name(Relation rel);
00113 static void dblink_connstr_check(const char *connstr);
00114 static void dblink_security_check(PGconn *conn, remoteConn *rconn);
00115 static void dblink_res_error(const char *conname, PGresult *res, const char *dblink_context_msg, bool fail);
00116 static char *get_connect_string(const char *servername);
00117 static char *escape_param_str(const char *from);
00118 static void validate_pkattnums(Relation rel,
00119 int2vector *pkattnums_arg, int32 pknumatts_arg,
00120 int **pkattnums, int *pknumatts);
00121 static bool is_valid_dblink_option(const PQconninfoOption *options,
00122 const char *option, Oid context);
00123 static int applyRemoteGucs(PGconn *conn);
00124 static void restoreLocalGucs(int nestlevel);
00125
00126
00127 static remoteConn *pconn = NULL;
00128 static HTAB *remoteConnHash = NULL;
00129
00130
00131
00132
00133
00134
00135
00136
00137 typedef struct remoteConnHashEnt
00138 {
00139 char name[NAMEDATALEN];
00140 remoteConn *rconn;
00141 } remoteConnHashEnt;
00142
00143
/*
 * NOTE(review): presumably the initial size hint for remoteConnHash; its
 * use is outside this part of the file -- confirm against createConnHash.
 */
#define NUMCONN		16
00145
00146
/*
 * xpfree: pfree() the pointer variable var_ if it is non-NULL, then reset
 * it to NULL.  var_ must be an lvalue; it is evaluated more than once.
 */
#define xpfree(var_) \
	do { \
		if ((var_) != NULL) \
		{ \
			pfree(var_); \
			(var_) = NULL; \
		} \
	} while (0)
00155
/*
 * xpstrdup: set var_c to a pstrdup() copy of var_, or to NULL when var_
 * is NULL.  var_ is evaluated more than once.
 */
#define xpstrdup(var_c, var_) \
	do { \
		if ((var_) == NULL) \
			(var_c) = NULL; \
		else \
			(var_c) = pstrdup(var_); \
	} while (0)
00163
/*
 * DBLINK_RES_INTERNALERROR: report the connection's current libpq error
 * message via elog(ERROR), prefixed with p2.
 *
 * Expects "msg", "conn" and "res" to be in scope at the point of use.
 * The message is copied before res is cleared.
 */
#define DBLINK_RES_INTERNALERROR(p2) \
	do { \
		msg = pstrdup(PQerrorMessage(conn)); \
		if (res != NULL) \
			PQclear(res); \
		elog(ERROR, "%s: %s", p2, msg); \
	} while (0)
00171
/*
 * DBLINK_CONN_NOT_AVAIL: raise ERRCODE_CONNECTION_DOES_NOT_EXIST.  The
 * message names the connection when "conname" (which must be in scope at
 * the point of use) is non-NULL.
 */
#define DBLINK_CONN_NOT_AVAIL \
	do { \
		if (conname == NULL) \
			ereport(ERROR, \
					(errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST), \
					 errmsg("connection not available"))); \
		else \
			ereport(ERROR, \
					(errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST), \
					 errmsg("connection \"%s\" not available", conname))); \
	} while (0)
00183
/*
 * DBLINK_GET_CONN: resolve argument 0 to a libpq connection.
 *
 * The argument is first looked up as the name of an existing named
 * connection.  If there is none, it is passed to get_connect_string() and,
 * when that returns NULL, used directly as a connection string; a new
 * connection is then opened and "freeconn" is set so the caller closes it.
 *
 * Expects "rconn", "conn", "conname", "connstr", "msg" and "freeconn" to be
 * in scope at the point of use, inside a function using PG_FUNCTION_ARGS.
 */
#define DBLINK_GET_CONN \
	do { \
		char *name_or_connstr = text_to_cstring(PG_GETARG_TEXT_PP(0)); \
		rconn = getConnectionByName(name_or_connstr); \
		if (rconn != NULL) \
		{ \
			conn = rconn->conn; \
			conname = name_or_connstr; \
		} \
		else \
		{ \
			connstr = get_connect_string(name_or_connstr); \
			if (connstr == NULL) \
				connstr = name_or_connstr; \
			dblink_connstr_check(connstr); \
			conn = PQconnectdb(connstr); \
			if (PQstatus(conn) == CONNECTION_BAD) \
			{ \
				msg = pstrdup(PQerrorMessage(conn)); \
				PQfinish(conn); \
				ereport(ERROR, \
						(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION), \
						 errmsg("could not establish connection"), \
						 errdetail_internal("%s", msg))); \
			} \
			dblink_security_check(conn, rconn); \
			PQsetClientEncoding(conn, GetDatabaseEncodingName()); \
			freeconn = true; \
		} \
	} while (0)
00216
/*
 * DBLINK_GET_NAMED_CONN: resolve argument 0 as the name of an existing
 * named connection; raise DBLINK_CONN_NOT_AVAIL if there is none.
 *
 * Expects "conname", "rconn" and "conn" to be in scope at the point of use.
 */
#define DBLINK_GET_NAMED_CONN \
	do { \
		conname = text_to_cstring(PG_GETARG_TEXT_PP(0)); \
		rconn = getConnectionByName(conname); \
		if (rconn == NULL) \
			DBLINK_CONN_NOT_AVAIL; \
		else \
			conn = rconn->conn; \
	} while (0)
00226
/*
 * DBLINK_INIT: on first use, allocate the unnamed-connection state "pconn"
 * in TopMemoryContext and initialize it as "not connected".
 */
#define DBLINK_INIT \
	do { \
		if (pconn == NULL) \
		{ \
			pconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, \
													  sizeof(remoteConn)); \
			pconn->conn = NULL; \
			pconn->openCursorCount = 0; \
			pconn->newXactForCursor = FALSE; \
		} \
	} while (0)
00237
00238
00239
00240
00241 PG_FUNCTION_INFO_V1(dblink_connect);
00242 Datum
00243 dblink_connect(PG_FUNCTION_ARGS)
00244 {
00245 char *conname_or_str = NULL;
00246 char *connstr = NULL;
00247 char *connname = NULL;
00248 char *msg;
00249 PGconn *conn = NULL;
00250 remoteConn *rconn = NULL;
00251
00252 DBLINK_INIT;
00253
00254 if (PG_NARGS() == 2)
00255 {
00256 conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(1));
00257 connname = text_to_cstring(PG_GETARG_TEXT_PP(0));
00258 }
00259 else if (PG_NARGS() == 1)
00260 conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(0));
00261
00262 if (connname)
00263 rconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext,
00264 sizeof(remoteConn));
00265
00266
00267 connstr = get_connect_string(conname_or_str);
00268 if (connstr == NULL)
00269 connstr = conname_or_str;
00270
00271
00272 dblink_connstr_check(connstr);
00273 conn = PQconnectdb(connstr);
00274
00275 if (PQstatus(conn) == CONNECTION_BAD)
00276 {
00277 msg = pstrdup(PQerrorMessage(conn));
00278 PQfinish(conn);
00279 if (rconn)
00280 pfree(rconn);
00281
00282 ereport(ERROR,
00283 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
00284 errmsg("could not establish connection"),
00285 errdetail_internal("%s", msg)));
00286 }
00287
00288
00289 dblink_security_check(conn, rconn);
00290
00291
00292 PQsetClientEncoding(conn, GetDatabaseEncodingName());
00293
00294 if (connname)
00295 {
00296 rconn->conn = conn;
00297 createNewConnection(connname, rconn);
00298 }
00299 else
00300 pconn->conn = conn;
00301
00302 PG_RETURN_TEXT_P(cstring_to_text("OK"));
00303 }
00304
00305
00306
00307
00308 PG_FUNCTION_INFO_V1(dblink_disconnect);
00309 Datum
00310 dblink_disconnect(PG_FUNCTION_ARGS)
00311 {
00312 char *conname = NULL;
00313 remoteConn *rconn = NULL;
00314 PGconn *conn = NULL;
00315
00316 DBLINK_INIT;
00317
00318 if (PG_NARGS() == 1)
00319 {
00320 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
00321 rconn = getConnectionByName(conname);
00322 if (rconn)
00323 conn = rconn->conn;
00324 }
00325 else
00326 conn = pconn->conn;
00327
00328 if (!conn)
00329 DBLINK_CONN_NOT_AVAIL;
00330
00331 PQfinish(conn);
00332 if (rconn)
00333 {
00334 deleteConnection(conname);
00335 pfree(rconn);
00336 }
00337 else
00338 pconn->conn = NULL;
00339
00340 PG_RETURN_TEXT_P(cstring_to_text("OK"));
00341 }
00342
00343
00344
00345
00346 PG_FUNCTION_INFO_V1(dblink_open);
00347 Datum
00348 dblink_open(PG_FUNCTION_ARGS)
00349 {
00350 char *msg;
00351 PGresult *res = NULL;
00352 PGconn *conn = NULL;
00353 char *curname = NULL;
00354 char *sql = NULL;
00355 char *conname = NULL;
00356 StringInfoData buf;
00357 remoteConn *rconn = NULL;
00358 bool fail = true;
00359
00360 DBLINK_INIT;
00361 initStringInfo(&buf);
00362
00363 if (PG_NARGS() == 2)
00364 {
00365
00366 curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
00367 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
00368 rconn = pconn;
00369 }
00370 else if (PG_NARGS() == 3)
00371 {
00372
00373 if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
00374 {
00375 curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
00376 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
00377 fail = PG_GETARG_BOOL(2);
00378 rconn = pconn;
00379 }
00380 else
00381 {
00382 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
00383 curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
00384 sql = text_to_cstring(PG_GETARG_TEXT_PP(2));
00385 rconn = getConnectionByName(conname);
00386 }
00387 }
00388 else if (PG_NARGS() == 4)
00389 {
00390
00391 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
00392 curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
00393 sql = text_to_cstring(PG_GETARG_TEXT_PP(2));
00394 fail = PG_GETARG_BOOL(3);
00395 rconn = getConnectionByName(conname);
00396 }
00397
00398 if (!rconn || !rconn->conn)
00399 DBLINK_CONN_NOT_AVAIL;
00400 else
00401 conn = rconn->conn;
00402
00403
00404 if (PQtransactionStatus(conn) == PQTRANS_IDLE)
00405 {
00406 res = PQexec(conn, "BEGIN");
00407 if (PQresultStatus(res) != PGRES_COMMAND_OK)
00408 DBLINK_RES_INTERNALERROR("begin error");
00409 PQclear(res);
00410 rconn->newXactForCursor = TRUE;
00411
00412
00413
00414
00415
00416
00417 rconn->openCursorCount = 0;
00418 }
00419
00420
00421 if (rconn->newXactForCursor)
00422 (rconn->openCursorCount)++;
00423
00424 appendStringInfo(&buf, "DECLARE %s CURSOR FOR %s", curname, sql);
00425 res = PQexec(conn, buf.data);
00426 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
00427 {
00428 dblink_res_error(conname, res, "could not open cursor", fail);
00429 PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
00430 }
00431
00432 PQclear(res);
00433 PG_RETURN_TEXT_P(cstring_to_text("OK"));
00434 }
00435
00436
00437
00438
00439 PG_FUNCTION_INFO_V1(dblink_close);
00440 Datum
00441 dblink_close(PG_FUNCTION_ARGS)
00442 {
00443 PGconn *conn = NULL;
00444 PGresult *res = NULL;
00445 char *curname = NULL;
00446 char *conname = NULL;
00447 StringInfoData buf;
00448 char *msg;
00449 remoteConn *rconn = NULL;
00450 bool fail = true;
00451
00452 DBLINK_INIT;
00453 initStringInfo(&buf);
00454
00455 if (PG_NARGS() == 1)
00456 {
00457
00458 curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
00459 rconn = pconn;
00460 }
00461 else if (PG_NARGS() == 2)
00462 {
00463
00464 if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
00465 {
00466 curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
00467 fail = PG_GETARG_BOOL(1);
00468 rconn = pconn;
00469 }
00470 else
00471 {
00472 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
00473 curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
00474 rconn = getConnectionByName(conname);
00475 }
00476 }
00477 if (PG_NARGS() == 3)
00478 {
00479
00480 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
00481 curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
00482 fail = PG_GETARG_BOOL(2);
00483 rconn = getConnectionByName(conname);
00484 }
00485
00486 if (!rconn || !rconn->conn)
00487 DBLINK_CONN_NOT_AVAIL;
00488 else
00489 conn = rconn->conn;
00490
00491 appendStringInfo(&buf, "CLOSE %s", curname);
00492
00493
00494 res = PQexec(conn, buf.data);
00495 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
00496 {
00497 dblink_res_error(conname, res, "could not close cursor", fail);
00498 PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
00499 }
00500
00501 PQclear(res);
00502
00503
00504 if (rconn->newXactForCursor)
00505 {
00506 (rconn->openCursorCount)--;
00507
00508
00509 if (rconn->openCursorCount == 0)
00510 {
00511 rconn->newXactForCursor = FALSE;
00512
00513 res = PQexec(conn, "COMMIT");
00514 if (PQresultStatus(res) != PGRES_COMMAND_OK)
00515 DBLINK_RES_INTERNALERROR("commit error");
00516 PQclear(res);
00517 }
00518 }
00519
00520 PG_RETURN_TEXT_P(cstring_to_text("OK"));
00521 }
00522
00523
00524
00525
00526 PG_FUNCTION_INFO_V1(dblink_fetch);
00527 Datum
00528 dblink_fetch(PG_FUNCTION_ARGS)
00529 {
00530 PGresult *res = NULL;
00531 char *conname = NULL;
00532 remoteConn *rconn = NULL;
00533 PGconn *conn = NULL;
00534 StringInfoData buf;
00535 char *curname = NULL;
00536 int howmany = 0;
00537 bool fail = true;
00538
00539 prepTuplestoreResult(fcinfo);
00540
00541 DBLINK_INIT;
00542
00543 if (PG_NARGS() == 4)
00544 {
00545
00546 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
00547 curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
00548 howmany = PG_GETARG_INT32(2);
00549 fail = PG_GETARG_BOOL(3);
00550
00551 rconn = getConnectionByName(conname);
00552 if (rconn)
00553 conn = rconn->conn;
00554 }
00555 else if (PG_NARGS() == 3)
00556 {
00557
00558 if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
00559 {
00560 curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
00561 howmany = PG_GETARG_INT32(1);
00562 fail = PG_GETARG_BOOL(2);
00563 conn = pconn->conn;
00564 }
00565 else
00566 {
00567 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
00568 curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
00569 howmany = PG_GETARG_INT32(2);
00570
00571 rconn = getConnectionByName(conname);
00572 if (rconn)
00573 conn = rconn->conn;
00574 }
00575 }
00576 else if (PG_NARGS() == 2)
00577 {
00578
00579 curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
00580 howmany = PG_GETARG_INT32(1);
00581 conn = pconn->conn;
00582 }
00583
00584 if (!conn)
00585 DBLINK_CONN_NOT_AVAIL;
00586
00587 initStringInfo(&buf);
00588 appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
00589
00590
00591
00592
00593
00594
00595 res = PQexec(conn, buf.data);
00596 if (!res ||
00597 (PQresultStatus(res) != PGRES_COMMAND_OK &&
00598 PQresultStatus(res) != PGRES_TUPLES_OK))
00599 {
00600 dblink_res_error(conname, res, "could not fetch from cursor", fail);
00601 return (Datum) 0;
00602 }
00603 else if (PQresultStatus(res) == PGRES_COMMAND_OK)
00604 {
00605
00606 PQclear(res);
00607 ereport(ERROR,
00608 (errcode(ERRCODE_INVALID_CURSOR_NAME),
00609 errmsg("cursor \"%s\" does not exist", curname)));
00610 }
00611
00612 materializeResult(fcinfo, conn, res);
00613 return (Datum) 0;
00614 }
00615
00616
00617
00618
00619 PG_FUNCTION_INFO_V1(dblink_record);
00620 Datum
00621 dblink_record(PG_FUNCTION_ARGS)
00622 {
00623 return dblink_record_internal(fcinfo, false);
00624 }
00625
00626 PG_FUNCTION_INFO_V1(dblink_send_query);
00627 Datum
00628 dblink_send_query(PG_FUNCTION_ARGS)
00629 {
00630 char *conname = NULL;
00631 PGconn *conn = NULL;
00632 char *sql = NULL;
00633 remoteConn *rconn = NULL;
00634 int retval;
00635
00636 if (PG_NARGS() == 2)
00637 {
00638 DBLINK_GET_NAMED_CONN;
00639 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
00640 }
00641 else
00642
00643 elog(ERROR, "wrong number of arguments");
00644
00645
00646 retval = PQsendQuery(conn, sql);
00647 if (retval != 1)
00648 elog(NOTICE, "could not send query: %s", PQerrorMessage(conn));
00649
00650 PG_RETURN_INT32(retval);
00651 }
00652
00653 PG_FUNCTION_INFO_V1(dblink_get_result);
00654 Datum
00655 dblink_get_result(PG_FUNCTION_ARGS)
00656 {
00657 return dblink_record_internal(fcinfo, true);
00658 }
00659
00660 static Datum
00661 dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
00662 {
00663 PGconn *volatile conn = NULL;
00664 volatile bool freeconn = false;
00665
00666 prepTuplestoreResult(fcinfo);
00667
00668 DBLINK_INIT;
00669
00670 PG_TRY();
00671 {
00672 char *msg;
00673 char *connstr = NULL;
00674 char *sql = NULL;
00675 char *conname = NULL;
00676 remoteConn *rconn = NULL;
00677 bool fail = true;
00678
00679 if (!is_async)
00680 {
00681 if (PG_NARGS() == 3)
00682 {
00683
00684 DBLINK_GET_CONN;
00685 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
00686 fail = PG_GETARG_BOOL(2);
00687 }
00688 else if (PG_NARGS() == 2)
00689 {
00690
00691 if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
00692 {
00693 conn = pconn->conn;
00694 sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
00695 fail = PG_GETARG_BOOL(1);
00696 }
00697 else
00698 {
00699 DBLINK_GET_CONN;
00700 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
00701 }
00702 }
00703 else if (PG_NARGS() == 1)
00704 {
00705
00706 conn = pconn->conn;
00707 sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
00708 }
00709 else
00710
00711 elog(ERROR, "wrong number of arguments");
00712 }
00713 else
00714 {
00715
00716 if (PG_NARGS() == 2)
00717 {
00718
00719 DBLINK_GET_NAMED_CONN;
00720 fail = PG_GETARG_BOOL(1);
00721 }
00722 else if (PG_NARGS() == 1)
00723 {
00724
00725 DBLINK_GET_NAMED_CONN;
00726 }
00727 else
00728
00729 elog(ERROR, "wrong number of arguments");
00730 }
00731
00732 if (!conn)
00733 DBLINK_CONN_NOT_AVAIL;
00734
00735 if (!is_async)
00736 {
00737
00738 materializeQueryResult(fcinfo, conn, conname, sql, fail);
00739 }
00740 else
00741 {
00742
00743 PGresult *res = PQgetResult(conn);
00744
00745
00746 if (res)
00747 {
00748 if (PQresultStatus(res) != PGRES_COMMAND_OK &&
00749 PQresultStatus(res) != PGRES_TUPLES_OK)
00750 {
00751 dblink_res_error(conname, res, "could not execute query",
00752 fail);
00753
00754 }
00755 else
00756 {
00757 materializeResult(fcinfo, conn, res);
00758 }
00759 }
00760 }
00761 }
00762 PG_CATCH();
00763 {
00764
00765 if (freeconn)
00766 PQfinish(conn);
00767 PG_RE_THROW();
00768 }
00769 PG_END_TRY();
00770
00771
00772 if (freeconn)
00773 PQfinish(conn);
00774
00775 return (Datum) 0;
00776 }
00777
00778
00779
00780
00781
00782
00783
00784 static void
00785 prepTuplestoreResult(FunctionCallInfo fcinfo)
00786 {
00787 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
00788
00789
00790 if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
00791 ereport(ERROR,
00792 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
00793 errmsg("set-valued function called in context that cannot accept a set")));
00794 if (!(rsinfo->allowedModes & SFRM_Materialize))
00795 ereport(ERROR,
00796 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
00797 errmsg("materialize mode required, but it is not allowed in this context")));
00798
00799
00800 rsinfo->returnMode = SFRM_Materialize;
00801
00802
00803 rsinfo->setResult = NULL;
00804 rsinfo->setDesc = NULL;
00805 }
00806
00807
00808
00809
00810
00811
00812 static void
00813 materializeResult(FunctionCallInfo fcinfo, PGconn *conn, PGresult *res)
00814 {
00815 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
00816
00817
00818 Assert(rsinfo->returnMode == SFRM_Materialize);
00819
00820 PG_TRY();
00821 {
00822 TupleDesc tupdesc;
00823 bool is_sql_cmd;
00824 int ntuples;
00825 int nfields;
00826
00827 if (PQresultStatus(res) == PGRES_COMMAND_OK)
00828 {
00829 is_sql_cmd = true;
00830
00831
00832
00833
00834
00835 tupdesc = CreateTemplateTupleDesc(1, false);
00836 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
00837 TEXTOID, -1, 0);
00838 ntuples = 1;
00839 nfields = 1;
00840 }
00841 else
00842 {
00843 Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
00844
00845 is_sql_cmd = false;
00846
00847
00848 switch (get_call_result_type(fcinfo, NULL, &tupdesc))
00849 {
00850 case TYPEFUNC_COMPOSITE:
00851
00852 break;
00853 case TYPEFUNC_RECORD:
00854
00855 ereport(ERROR,
00856 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
00857 errmsg("function returning record called in context "
00858 "that cannot accept type record")));
00859 break;
00860 default:
00861
00862 elog(ERROR, "return type must be a row type");
00863 break;
00864 }
00865
00866
00867 tupdesc = CreateTupleDescCopy(tupdesc);
00868 ntuples = PQntuples(res);
00869 nfields = PQnfields(res);
00870 }
00871
00872
00873
00874
00875 if (nfields != tupdesc->natts)
00876 ereport(ERROR,
00877 (errcode(ERRCODE_DATATYPE_MISMATCH),
00878 errmsg("remote query result rowtype does not match "
00879 "the specified FROM clause rowtype")));
00880
00881 if (ntuples > 0)
00882 {
00883 AttInMetadata *attinmeta;
00884 int nestlevel = -1;
00885 Tuplestorestate *tupstore;
00886 MemoryContext oldcontext;
00887 int row;
00888 char **values;
00889
00890 attinmeta = TupleDescGetAttInMetadata(tupdesc);
00891
00892
00893 if (!is_sql_cmd)
00894 nestlevel = applyRemoteGucs(conn);
00895
00896 oldcontext = MemoryContextSwitchTo(
00897 rsinfo->econtext->ecxt_per_query_memory);
00898 tupstore = tuplestore_begin_heap(true, false, work_mem);
00899 rsinfo->setResult = tupstore;
00900 rsinfo->setDesc = tupdesc;
00901 MemoryContextSwitchTo(oldcontext);
00902
00903 values = (char **) palloc(nfields * sizeof(char *));
00904
00905
00906 for (row = 0; row < ntuples; row++)
00907 {
00908 HeapTuple tuple;
00909
00910 if (!is_sql_cmd)
00911 {
00912 int i;
00913
00914 for (i = 0; i < nfields; i++)
00915 {
00916 if (PQgetisnull(res, row, i))
00917 values[i] = NULL;
00918 else
00919 values[i] = PQgetvalue(res, row, i);
00920 }
00921 }
00922 else
00923 {
00924 values[0] = PQcmdStatus(res);
00925 }
00926
00927
00928 tuple = BuildTupleFromCStrings(attinmeta, values);
00929 tuplestore_puttuple(tupstore, tuple);
00930 }
00931
00932
00933 restoreLocalGucs(nestlevel);
00934
00935
00936 tuplestore_donestoring(tupstore);
00937 }
00938
00939 PQclear(res);
00940 }
00941 PG_CATCH();
00942 {
00943
00944 PQclear(res);
00945 PG_RE_THROW();
00946 }
00947 PG_END_TRY();
00948 }
00949
00950
00951
00952
00953
00954
00955
00956
00957
00958 static void
00959 materializeQueryResult(FunctionCallInfo fcinfo,
00960 PGconn *conn,
00961 const char *conname,
00962 const char *sql,
00963 bool fail)
00964 {
00965 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
00966 PGresult *volatile res = NULL;
00967 storeInfo sinfo;
00968
00969
00970 Assert(rsinfo->returnMode == SFRM_Materialize);
00971
00972
00973 memset(&sinfo, 0, sizeof(sinfo));
00974 sinfo.fcinfo = fcinfo;
00975
00976 PG_TRY();
00977 {
00978
00979 res = storeQueryResult(&sinfo, conn, sql);
00980
00981 if (!res ||
00982 (PQresultStatus(res) != PGRES_COMMAND_OK &&
00983 PQresultStatus(res) != PGRES_TUPLES_OK))
00984 {
00985
00986
00987
00988
00989 PGresult *res1 = res;
00990
00991 res = NULL;
00992 dblink_res_error(conname, res1, "could not execute query", fail);
00993
00994 }
00995 else if (PQresultStatus(res) == PGRES_COMMAND_OK)
00996 {
00997
00998
00999
01000
01001 TupleDesc tupdesc;
01002 AttInMetadata *attinmeta;
01003 Tuplestorestate *tupstore;
01004 HeapTuple tuple;
01005 char *values[1];
01006 MemoryContext oldcontext;
01007
01008
01009
01010
01011
01012 tupdesc = CreateTemplateTupleDesc(1, false);
01013 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
01014 TEXTOID, -1, 0);
01015 attinmeta = TupleDescGetAttInMetadata(tupdesc);
01016
01017 oldcontext = MemoryContextSwitchTo(
01018 rsinfo->econtext->ecxt_per_query_memory);
01019 tupstore = tuplestore_begin_heap(true, false, work_mem);
01020 rsinfo->setResult = tupstore;
01021 rsinfo->setDesc = tupdesc;
01022 MemoryContextSwitchTo(oldcontext);
01023
01024 values[0] = PQcmdStatus(res);
01025
01026
01027 tuple = BuildTupleFromCStrings(attinmeta, values);
01028 tuplestore_puttuple(tupstore, tuple);
01029
01030 PQclear(res);
01031 res = NULL;
01032 }
01033 else
01034 {
01035 Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
01036
01037 Assert(rsinfo->setResult != NULL);
01038
01039 PQclear(res);
01040 res = NULL;
01041 }
01042 PQclear(sinfo.last_res);
01043 sinfo.last_res = NULL;
01044 PQclear(sinfo.cur_res);
01045 sinfo.cur_res = NULL;
01046 }
01047 PG_CATCH();
01048 {
01049
01050 PQclear(res);
01051 PQclear(sinfo.last_res);
01052 PQclear(sinfo.cur_res);
01053
01054 while ((res = PQgetResult(conn)) != NULL)
01055 PQclear(res);
01056 PG_RE_THROW();
01057 }
01058 PG_END_TRY();
01059 }
01060
01061
01062
01063
01064 static PGresult *
01065 storeQueryResult(storeInfo *sinfo, PGconn *conn, const char *sql)
01066 {
01067 bool first = true;
01068 int nestlevel = -1;
01069 PGresult *res;
01070
01071 if (!PQsendQuery(conn, sql))
01072 elog(ERROR, "could not send query: %s", PQerrorMessage(conn));
01073
01074 if (!PQsetSingleRowMode(conn))
01075 elog(ERROR, "failed to set single-row mode for dblink query");
01076
01077 for (;;)
01078 {
01079 CHECK_FOR_INTERRUPTS();
01080
01081 sinfo->cur_res = PQgetResult(conn);
01082 if (!sinfo->cur_res)
01083 break;
01084
01085 if (PQresultStatus(sinfo->cur_res) == PGRES_SINGLE_TUPLE)
01086 {
01087
01088
01089
01090
01091
01092
01093
01094 if (first && nestlevel < 0)
01095 nestlevel = applyRemoteGucs(conn);
01096
01097 storeRow(sinfo, sinfo->cur_res, first);
01098
01099 PQclear(sinfo->cur_res);
01100 sinfo->cur_res = NULL;
01101 first = false;
01102 }
01103 else
01104 {
01105
01106 if (first && PQresultStatus(sinfo->cur_res) == PGRES_TUPLES_OK)
01107 storeRow(sinfo, sinfo->cur_res, first);
01108
01109
01110 PQclear(sinfo->last_res);
01111 sinfo->last_res = sinfo->cur_res;
01112 sinfo->cur_res = NULL;
01113 first = true;
01114 }
01115 }
01116
01117
01118 restoreLocalGucs(nestlevel);
01119
01120
01121 res = sinfo->last_res;
01122 sinfo->last_res = NULL;
01123 return res;
01124 }
01125
01126
01127
01128
01129
01130
01131
01132 static void
01133 storeRow(storeInfo *sinfo, PGresult *res, bool first)
01134 {
01135 int nfields = PQnfields(res);
01136 HeapTuple tuple;
01137 int i;
01138 MemoryContext oldcontext;
01139
01140 if (first)
01141 {
01142
01143 ReturnSetInfo *rsinfo = (ReturnSetInfo *) sinfo->fcinfo->resultinfo;
01144 TupleDesc tupdesc;
01145
01146
01147
01148
01149
01150
01151 if (sinfo->tuplestore)
01152 tuplestore_end(sinfo->tuplestore);
01153 sinfo->tuplestore = NULL;
01154
01155
01156 switch (get_call_result_type(sinfo->fcinfo, NULL, &tupdesc))
01157 {
01158 case TYPEFUNC_COMPOSITE:
01159
01160 break;
01161 case TYPEFUNC_RECORD:
01162
01163 ereport(ERROR,
01164 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
01165 errmsg("function returning record called in context "
01166 "that cannot accept type record")));
01167 break;
01168 default:
01169
01170 elog(ERROR, "return type must be a row type");
01171 break;
01172 }
01173
01174
01175 tupdesc = CreateTupleDescCopy(tupdesc);
01176
01177
01178 if (nfields != tupdesc->natts)
01179 ereport(ERROR,
01180 (errcode(ERRCODE_DATATYPE_MISMATCH),
01181 errmsg("remote query result rowtype does not match "
01182 "the specified FROM clause rowtype")));
01183
01184
01185 sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc);
01186
01187
01188 oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
01189 sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem);
01190 rsinfo->setResult = sinfo->tuplestore;
01191 rsinfo->setDesc = tupdesc;
01192 MemoryContextSwitchTo(oldcontext);
01193
01194
01195 if (PQntuples(res) == 0)
01196 return;
01197
01198
01199
01200
01201
01202 if (sinfo->cstrs)
01203 pfree(sinfo->cstrs);
01204 sinfo->cstrs = (char **) palloc(nfields * sizeof(char *));
01205
01206
01207 if (!sinfo->tmpcontext)
01208 sinfo->tmpcontext =
01209 AllocSetContextCreate(CurrentMemoryContext,
01210 "dblink temporary context",
01211 ALLOCSET_DEFAULT_MINSIZE,
01212 ALLOCSET_DEFAULT_INITSIZE,
01213 ALLOCSET_DEFAULT_MAXSIZE);
01214 }
01215
01216
01217 Assert(PQntuples(res) == 1);
01218
01219
01220
01221
01222
01223
01224 oldcontext = MemoryContextSwitchTo(sinfo->tmpcontext);
01225
01226
01227
01228
01229 for (i = 0; i < nfields; i++)
01230 {
01231 if (PQgetisnull(res, 0, i))
01232 sinfo->cstrs[i] = NULL;
01233 else
01234 sinfo->cstrs[i] = PQgetvalue(res, 0, i);
01235 }
01236
01237
01238 tuple = BuildTupleFromCStrings(sinfo->attinmeta, sinfo->cstrs);
01239
01240 tuplestore_puttuple(sinfo->tuplestore, tuple);
01241
01242
01243 MemoryContextSwitchTo(oldcontext);
01244 MemoryContextReset(sinfo->tmpcontext);
01245 }
01246
01247
01248
01249
01250
01251
01252 PG_FUNCTION_INFO_V1(dblink_get_connections);
01253 Datum
01254 dblink_get_connections(PG_FUNCTION_ARGS)
01255 {
01256 HASH_SEQ_STATUS status;
01257 remoteConnHashEnt *hentry;
01258 ArrayBuildState *astate = NULL;
01259
01260 if (remoteConnHash)
01261 {
01262 hash_seq_init(&status, remoteConnHash);
01263 while ((hentry = (remoteConnHashEnt *) hash_seq_search(&status)) != NULL)
01264 {
01265
01266 astate = accumArrayResult(astate,
01267 CStringGetTextDatum(hentry->name),
01268 false, TEXTOID, CurrentMemoryContext);
01269 }
01270 }
01271
01272 if (astate)
01273 PG_RETURN_ARRAYTYPE_P(makeArrayResult(astate,
01274 CurrentMemoryContext));
01275 else
01276 PG_RETURN_NULL();
01277 }
01278
01279
01280
01281
01282
01283
01284
01285
01286
01287 PG_FUNCTION_INFO_V1(dblink_is_busy);
01288 Datum
01289 dblink_is_busy(PG_FUNCTION_ARGS)
01290 {
01291 char *conname = NULL;
01292 PGconn *conn = NULL;
01293 remoteConn *rconn = NULL;
01294
01295 DBLINK_INIT;
01296 DBLINK_GET_NAMED_CONN;
01297
01298 PQconsumeInput(conn);
01299 PG_RETURN_INT32(PQisBusy(conn));
01300 }
01301
01302
01303
01304
01305
01306
01307
01308
01309
01310
01311
01312
01313 PG_FUNCTION_INFO_V1(dblink_cancel_query);
01314 Datum
01315 dblink_cancel_query(PG_FUNCTION_ARGS)
01316 {
01317 int res = 0;
01318 char *conname = NULL;
01319 PGconn *conn = NULL;
01320 remoteConn *rconn = NULL;
01321 PGcancel *cancel;
01322 char errbuf[256];
01323
01324 DBLINK_INIT;
01325 DBLINK_GET_NAMED_CONN;
01326 cancel = PQgetCancel(conn);
01327
01328 res = PQcancel(cancel, errbuf, 256);
01329 PQfreeCancel(cancel);
01330
01331 if (res == 1)
01332 PG_RETURN_TEXT_P(cstring_to_text("OK"));
01333 else
01334 PG_RETURN_TEXT_P(cstring_to_text(errbuf));
01335 }
01336
01337
01338
01339
01340
01341
01342
01343
01344
01345
01346
01347
01348 PG_FUNCTION_INFO_V1(dblink_error_message);
01349 Datum
01350 dblink_error_message(PG_FUNCTION_ARGS)
01351 {
01352 char *msg;
01353 char *conname = NULL;
01354 PGconn *conn = NULL;
01355 remoteConn *rconn = NULL;
01356
01357 DBLINK_INIT;
01358 DBLINK_GET_NAMED_CONN;
01359
01360 msg = PQerrorMessage(conn);
01361 if (msg == NULL || msg[0] == '\0')
01362 PG_RETURN_TEXT_P(cstring_to_text("OK"));
01363 else
01364 PG_RETURN_TEXT_P(cstring_to_text(msg));
01365 }
01366
01367
01368
01369
01370 PG_FUNCTION_INFO_V1(dblink_exec);
01371 Datum
01372 dblink_exec(PG_FUNCTION_ARGS)
01373 {
01374 text *volatile sql_cmd_status = NULL;
01375 PGconn *volatile conn = NULL;
01376 volatile bool freeconn = false;
01377
01378 DBLINK_INIT;
01379
01380 PG_TRY();
01381 {
01382 char *msg;
01383 PGresult *res = NULL;
01384 char *connstr = NULL;
01385 char *sql = NULL;
01386 char *conname = NULL;
01387 remoteConn *rconn = NULL;
01388 bool fail = true;
01389
01390 if (PG_NARGS() == 3)
01391 {
01392
01393 DBLINK_GET_CONN;
01394 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
01395 fail = PG_GETARG_BOOL(2);
01396 }
01397 else if (PG_NARGS() == 2)
01398 {
01399
01400 if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
01401 {
01402 conn = pconn->conn;
01403 sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
01404 fail = PG_GETARG_BOOL(1);
01405 }
01406 else
01407 {
01408 DBLINK_GET_CONN;
01409 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
01410 }
01411 }
01412 else if (PG_NARGS() == 1)
01413 {
01414
01415 conn = pconn->conn;
01416 sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
01417 }
01418 else
01419
01420 elog(ERROR, "wrong number of arguments");
01421
01422 if (!conn)
01423 DBLINK_CONN_NOT_AVAIL;
01424
01425 res = PQexec(conn, sql);
01426 if (!res ||
01427 (PQresultStatus(res) != PGRES_COMMAND_OK &&
01428 PQresultStatus(res) != PGRES_TUPLES_OK))
01429 {
01430 dblink_res_error(conname, res, "could not execute command", fail);
01431
01432
01433
01434
01435
01436 sql_cmd_status = cstring_to_text("ERROR");
01437 }
01438 else if (PQresultStatus(res) == PGRES_COMMAND_OK)
01439 {
01440
01441
01442
01443
01444 sql_cmd_status = cstring_to_text(PQcmdStatus(res));
01445 PQclear(res);
01446 }
01447 else
01448 {
01449 PQclear(res);
01450 ereport(ERROR,
01451 (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
01452 errmsg("statement returning results not allowed")));
01453 }
01454 }
01455 PG_CATCH();
01456 {
01457
01458 if (freeconn)
01459 PQfinish(conn);
01460 PG_RE_THROW();
01461 }
01462 PG_END_TRY();
01463
01464
01465 if (freeconn)
01466 PQfinish(conn);
01467
01468 PG_RETURN_TEXT_P(sql_cmd_status);
01469 }
01470
01471
01472
01473
01474
01475
01476
01477
01478 PG_FUNCTION_INFO_V1(dblink_get_pkey);
01479 Datum
01480 dblink_get_pkey(PG_FUNCTION_ARGS)
01481 {
01482 int16 numatts;
01483 char **results;
01484 FuncCallContext *funcctx;
01485 int32 call_cntr;
01486 int32 max_calls;
01487 AttInMetadata *attinmeta;
01488 MemoryContext oldcontext;
01489
01490
01491 if (SRF_IS_FIRSTCALL())
01492 {
01493 Relation rel;
01494 TupleDesc tupdesc;
01495
01496
01497 funcctx = SRF_FIRSTCALL_INIT();
01498
01499
01500
01501
01502 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
01503
01504
01505 rel = get_rel_from_relname(PG_GETARG_TEXT_P(0), AccessShareLock, ACL_SELECT);
01506
01507
01508 results = get_pkey_attnames(rel, &numatts);
01509
01510 relation_close(rel, AccessShareLock);
01511
01512
01513
01514
01515 tupdesc = CreateTemplateTupleDesc(2, false);
01516 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "position",
01517 INT4OID, -1, 0);
01518 TupleDescInitEntry(tupdesc, (AttrNumber) 2, "colname",
01519 TEXTOID, -1, 0);
01520
01521
01522
01523
01524
01525 attinmeta = TupleDescGetAttInMetadata(tupdesc);
01526 funcctx->attinmeta = attinmeta;
01527
01528 if ((results != NULL) && (numatts > 0))
01529 {
01530 funcctx->max_calls = numatts;
01531
01532
01533 funcctx->user_fctx = results;
01534 }
01535 else
01536 {
01537
01538 MemoryContextSwitchTo(oldcontext);
01539 SRF_RETURN_DONE(funcctx);
01540 }
01541
01542 MemoryContextSwitchTo(oldcontext);
01543 }
01544
01545
01546 funcctx = SRF_PERCALL_SETUP();
01547
01548
01549
01550
01551 call_cntr = funcctx->call_cntr;
01552 max_calls = funcctx->max_calls;
01553
01554 results = (char **) funcctx->user_fctx;
01555 attinmeta = funcctx->attinmeta;
01556
01557 if (call_cntr < max_calls)
01558 {
01559 char **values;
01560 HeapTuple tuple;
01561 Datum result;
01562
01563 values = (char **) palloc(2 * sizeof(char *));
01564 values[0] = (char *) palloc(12);
01565
01566 sprintf(values[0], "%d", call_cntr + 1);
01567
01568 values[1] = results[call_cntr];
01569
01570
01571 tuple = BuildTupleFromCStrings(attinmeta, values);
01572
01573
01574 result = HeapTupleGetDatum(tuple);
01575
01576 SRF_RETURN_NEXT(funcctx, result);
01577 }
01578 else
01579 {
01580
01581 SRF_RETURN_DONE(funcctx);
01582 }
01583 }
01584
01585
01586
01587
01588
01589
01590
01591
01592
01593
01594
01595
01596
01597
01598
01599
01600
01601
01602
01603
01604
01605 PG_FUNCTION_INFO_V1(dblink_build_sql_insert);
01606 Datum
01607 dblink_build_sql_insert(PG_FUNCTION_ARGS)
01608 {
01609 text *relname_text = PG_GETARG_TEXT_P(0);
01610 int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
01611 int32 pknumatts_arg = PG_GETARG_INT32(2);
01612 ArrayType *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
01613 ArrayType *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
01614 Relation rel;
01615 int *pkattnums;
01616 int pknumatts;
01617 char **src_pkattvals;
01618 char **tgt_pkattvals;
01619 int src_nitems;
01620 int tgt_nitems;
01621 char *sql;
01622
01623
01624
01625
01626 rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
01627
01628
01629
01630
01631 validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
01632 &pkattnums, &pknumatts);
01633
01634
01635
01636
01637
01638 src_pkattvals = get_text_array_contents(src_pkattvals_arry, &src_nitems);
01639
01640
01641
01642
01643 if (src_nitems != pknumatts)
01644 ereport(ERROR,
01645 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
01646 errmsg("source key array length must match number of key " \
01647 "attributes")));
01648
01649
01650
01651
01652
01653 tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
01654
01655
01656
01657
01658 if (tgt_nitems != pknumatts)
01659 ereport(ERROR,
01660 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
01661 errmsg("target key array length must match number of key " \
01662 "attributes")));
01663
01664
01665
01666
01667 sql = get_sql_insert(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
01668
01669
01670
01671
01672 relation_close(rel, AccessShareLock);
01673
01674
01675
01676
01677 PG_RETURN_TEXT_P(cstring_to_text(sql));
01678 }
01679
01680
01681
01682
01683
01684
01685
01686
01687
01688
01689
01690
01691
01692
01693
01694
01695
01696 PG_FUNCTION_INFO_V1(dblink_build_sql_delete);
01697 Datum
01698 dblink_build_sql_delete(PG_FUNCTION_ARGS)
01699 {
01700 text *relname_text = PG_GETARG_TEXT_P(0);
01701 int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
01702 int32 pknumatts_arg = PG_GETARG_INT32(2);
01703 ArrayType *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
01704 Relation rel;
01705 int *pkattnums;
01706 int pknumatts;
01707 char **tgt_pkattvals;
01708 int tgt_nitems;
01709 char *sql;
01710
01711
01712
01713
01714 rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
01715
01716
01717
01718
01719 validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
01720 &pkattnums, &pknumatts);
01721
01722
01723
01724
01725
01726 tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
01727
01728
01729
01730
01731 if (tgt_nitems != pknumatts)
01732 ereport(ERROR,
01733 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
01734 errmsg("target key array length must match number of key " \
01735 "attributes")));
01736
01737
01738
01739
01740 sql = get_sql_delete(rel, pkattnums, pknumatts, tgt_pkattvals);
01741
01742
01743
01744
01745 relation_close(rel, AccessShareLock);
01746
01747
01748
01749
01750 PG_RETURN_TEXT_P(cstring_to_text(sql));
01751 }
01752
01753
01754
01755
01756
01757
01758
01759
01760
01761
01762
01763
01764
01765
01766
01767
01768
01769
01770
01771
01772
01773 PG_FUNCTION_INFO_V1(dblink_build_sql_update);
01774 Datum
01775 dblink_build_sql_update(PG_FUNCTION_ARGS)
01776 {
01777 text *relname_text = PG_GETARG_TEXT_P(0);
01778 int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
01779 int32 pknumatts_arg = PG_GETARG_INT32(2);
01780 ArrayType *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
01781 ArrayType *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
01782 Relation rel;
01783 int *pkattnums;
01784 int pknumatts;
01785 char **src_pkattvals;
01786 char **tgt_pkattvals;
01787 int src_nitems;
01788 int tgt_nitems;
01789 char *sql;
01790
01791
01792
01793
01794 rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
01795
01796
01797
01798
01799 validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
01800 &pkattnums, &pknumatts);
01801
01802
01803
01804
01805
01806 src_pkattvals = get_text_array_contents(src_pkattvals_arry, &src_nitems);
01807
01808
01809
01810
01811 if (src_nitems != pknumatts)
01812 ereport(ERROR,
01813 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
01814 errmsg("source key array length must match number of key " \
01815 "attributes")));
01816
01817
01818
01819
01820
01821 tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
01822
01823
01824
01825
01826 if (tgt_nitems != pknumatts)
01827 ereport(ERROR,
01828 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
01829 errmsg("target key array length must match number of key " \
01830 "attributes")));
01831
01832
01833
01834
01835 sql = get_sql_update(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
01836
01837
01838
01839
01840 relation_close(rel, AccessShareLock);
01841
01842
01843
01844
01845 PG_RETURN_TEXT_P(cstring_to_text(sql));
01846 }
01847
01848
01849
01850
01851
01852
01853
01854 PG_FUNCTION_INFO_V1(dblink_current_query);
01855 Datum
01856 dblink_current_query(PG_FUNCTION_ARGS)
01857 {
01858
01859 PG_RETURN_DATUM(current_query(fcinfo));
01860 }
01861
01862
01863
01864
01865
01866
01867
01868
01869
01870 #define DBLINK_NOTIFY_COLS 3
01871
01872 PG_FUNCTION_INFO_V1(dblink_get_notify);
01873 Datum
01874 dblink_get_notify(PG_FUNCTION_ARGS)
01875 {
01876 char *conname = NULL;
01877 PGconn *conn = NULL;
01878 remoteConn *rconn = NULL;
01879 PGnotify *notify;
01880 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
01881 TupleDesc tupdesc;
01882 Tuplestorestate *tupstore;
01883 MemoryContext per_query_ctx;
01884 MemoryContext oldcontext;
01885
01886 prepTuplestoreResult(fcinfo);
01887
01888 DBLINK_INIT;
01889 if (PG_NARGS() == 1)
01890 DBLINK_GET_NAMED_CONN;
01891 else
01892 conn = pconn->conn;
01893
01894
01895 per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
01896 oldcontext = MemoryContextSwitchTo(per_query_ctx);
01897
01898 tupdesc = CreateTemplateTupleDesc(DBLINK_NOTIFY_COLS, false);
01899 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "notify_name",
01900 TEXTOID, -1, 0);
01901 TupleDescInitEntry(tupdesc, (AttrNumber) 2, "be_pid",
01902 INT4OID, -1, 0);
01903 TupleDescInitEntry(tupdesc, (AttrNumber) 3, "extra",
01904 TEXTOID, -1, 0);
01905
01906 tupstore = tuplestore_begin_heap(true, false, work_mem);
01907 rsinfo->setResult = tupstore;
01908 rsinfo->setDesc = tupdesc;
01909
01910 MemoryContextSwitchTo(oldcontext);
01911
01912 PQconsumeInput(conn);
01913 while ((notify = PQnotifies(conn)) != NULL)
01914 {
01915 Datum values[DBLINK_NOTIFY_COLS];
01916 bool nulls[DBLINK_NOTIFY_COLS];
01917
01918 memset(values, 0, sizeof(values));
01919 memset(nulls, 0, sizeof(nulls));
01920
01921 if (notify->relname != NULL)
01922 values[0] = CStringGetTextDatum(notify->relname);
01923 else
01924 nulls[0] = true;
01925
01926 values[1] = Int32GetDatum(notify->be_pid);
01927
01928 if (notify->extra != NULL)
01929 values[2] = CStringGetTextDatum(notify->extra);
01930 else
01931 nulls[2] = true;
01932
01933 tuplestore_putvalues(tupstore, tupdesc, values, nulls);
01934
01935 PQfreemem(notify);
01936 PQconsumeInput(conn);
01937 }
01938
01939
01940 tuplestore_donestoring(tupstore);
01941
01942 return (Datum) 0;
01943 }
01944
01945
01946
01947
01948
01949
01950
01951
01952 PG_FUNCTION_INFO_V1(dblink_fdw_validator);
01953 Datum
01954 dblink_fdw_validator(PG_FUNCTION_ARGS)
01955 {
01956 List *options_list = untransformRelOptions(PG_GETARG_DATUM(0));
01957 Oid context = PG_GETARG_OID(1);
01958 ListCell *cell;
01959
01960 static const PQconninfoOption *options = NULL;
01961
01962
01963
01964
01965
01966
01967
01968
01969 if (!options)
01970 {
01971 options = PQconndefaults();
01972 if (!options)
01973 ereport(ERROR,
01974 (errcode(ERRCODE_FDW_OUT_OF_MEMORY),
01975 errmsg("out of memory"),
01976 errdetail("could not get libpq's default connection options")));
01977 }
01978
01979
01980 foreach(cell, options_list)
01981 {
01982 DefElem *def = (DefElem *) lfirst(cell);
01983
01984 if (!is_valid_dblink_option(options, def->defname, context))
01985 {
01986
01987
01988
01989
01990
01991 StringInfoData buf;
01992 const PQconninfoOption *opt;
01993
01994 initStringInfo(&buf);
01995 for (opt = options; opt->keyword; opt++)
01996 {
01997 if (is_valid_dblink_option(options, opt->keyword, context))
01998 appendStringInfo(&buf, "%s%s",
01999 (buf.len > 0) ? ", " : "",
02000 opt->keyword);
02001 }
02002 ereport(ERROR,
02003 (errcode(ERRCODE_FDW_OPTION_NAME_NOT_FOUND),
02004 errmsg("invalid option \"%s\"", def->defname),
02005 errhint("Valid options in this context are: %s",
02006 buf.data)));
02007 }
02008 }
02009
02010 PG_RETURN_VOID();
02011 }
02012
02013
02014
02015
02016
02017
02018
02019
02020
02021
02022
02023
02024
02025 static char **
02026 get_pkey_attnames(Relation rel, int16 *numatts)
02027 {
02028 Relation indexRelation;
02029 ScanKeyData skey;
02030 SysScanDesc scan;
02031 HeapTuple indexTuple;
02032 int i;
02033 char **result = NULL;
02034 TupleDesc tupdesc;
02035
02036
02037 *numatts = 0;
02038
02039 tupdesc = rel->rd_att;
02040
02041
02042 indexRelation = heap_open(IndexRelationId, AccessShareLock);
02043 ScanKeyInit(&skey,
02044 Anum_pg_index_indrelid,
02045 BTEqualStrategyNumber, F_OIDEQ,
02046 ObjectIdGetDatum(RelationGetRelid(rel)));
02047
02048 scan = systable_beginscan(indexRelation, IndexIndrelidIndexId, true,
02049 SnapshotNow, 1, &skey);
02050
02051 while (HeapTupleIsValid(indexTuple = systable_getnext(scan)))
02052 {
02053 Form_pg_index index = (Form_pg_index) GETSTRUCT(indexTuple);
02054
02055
02056 if (index->indisprimary)
02057 {
02058 *numatts = index->indnatts;
02059 if (*numatts > 0)
02060 {
02061 result = (char **) palloc(*numatts * sizeof(char *));
02062
02063 for (i = 0; i < *numatts; i++)
02064 result[i] = SPI_fname(tupdesc, index->indkey.values[i]);
02065 }
02066 break;
02067 }
02068 }
02069
02070 systable_endscan(scan);
02071 heap_close(indexRelation, AccessShareLock);
02072
02073 return result;
02074 }
02075
02076
02077
02078
02079
02080 static char **
02081 get_text_array_contents(ArrayType *array, int *numitems)
02082 {
02083 int ndim = ARR_NDIM(array);
02084 int *dims = ARR_DIMS(array);
02085 int nitems;
02086 int16 typlen;
02087 bool typbyval;
02088 char typalign;
02089 char **values;
02090 char *ptr;
02091 bits8 *bitmap;
02092 int bitmask;
02093 int i;
02094
02095 Assert(ARR_ELEMTYPE(array) == TEXTOID);
02096
02097 *numitems = nitems = ArrayGetNItems(ndim, dims);
02098
02099 get_typlenbyvalalign(ARR_ELEMTYPE(array),
02100 &typlen, &typbyval, &typalign);
02101
02102 values = (char **) palloc(nitems * sizeof(char *));
02103
02104 ptr = ARR_DATA_PTR(array);
02105 bitmap = ARR_NULLBITMAP(array);
02106 bitmask = 1;
02107
02108 for (i = 0; i < nitems; i++)
02109 {
02110 if (bitmap && (*bitmap & bitmask) == 0)
02111 {
02112 values[i] = NULL;
02113 }
02114 else
02115 {
02116 values[i] = TextDatumGetCString(PointerGetDatum(ptr));
02117 ptr = att_addlength_pointer(ptr, typlen, ptr);
02118 ptr = (char *) att_align_nominal(ptr, typalign);
02119 }
02120
02121
02122 if (bitmap)
02123 {
02124 bitmask <<= 1;
02125 if (bitmask == 0x100)
02126 {
02127 bitmap++;
02128 bitmask = 1;
02129 }
02130 }
02131 }
02132
02133 return values;
02134 }
02135
02136 static char *
02137 get_sql_insert(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals)
02138 {
02139 char *relname;
02140 HeapTuple tuple;
02141 TupleDesc tupdesc;
02142 int natts;
02143 StringInfoData buf;
02144 char *val;
02145 int key;
02146 int i;
02147 bool needComma;
02148
02149 initStringInfo(&buf);
02150
02151
02152 relname = generate_relation_name(rel);
02153
02154 tupdesc = rel->rd_att;
02155 natts = tupdesc->natts;
02156
02157 tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals);
02158 if (!tuple)
02159 ereport(ERROR,
02160 (errcode(ERRCODE_CARDINALITY_VIOLATION),
02161 errmsg("source row not found")));
02162
02163 appendStringInfo(&buf, "INSERT INTO %s(", relname);
02164
02165 needComma = false;
02166 for (i = 0; i < natts; i++)
02167 {
02168 if (tupdesc->attrs[i]->attisdropped)
02169 continue;
02170
02171 if (needComma)
02172 appendStringInfo(&buf, ",");
02173
02174 appendStringInfoString(&buf,
02175 quote_ident_cstr(NameStr(tupdesc->attrs[i]->attname)));
02176 needComma = true;
02177 }
02178
02179 appendStringInfo(&buf, ") VALUES(");
02180
02181
02182
02183
02184 needComma = false;
02185 for (i = 0; i < natts; i++)
02186 {
02187 if (tupdesc->attrs[i]->attisdropped)
02188 continue;
02189
02190 if (needComma)
02191 appendStringInfo(&buf, ",");
02192
02193 key = get_attnum_pk_pos(pkattnums, pknumatts, i);
02194
02195 if (key >= 0)
02196 val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;
02197 else
02198 val = SPI_getvalue(tuple, tupdesc, i + 1);
02199
02200 if (val != NULL)
02201 {
02202 appendStringInfoString(&buf, quote_literal_cstr(val));
02203 pfree(val);
02204 }
02205 else
02206 appendStringInfo(&buf, "NULL");
02207 needComma = true;
02208 }
02209 appendStringInfo(&buf, ")");
02210
02211 return (buf.data);
02212 }
02213
02214 static char *
02215 get_sql_delete(Relation rel, int *pkattnums, int pknumatts, char **tgt_pkattvals)
02216 {
02217 char *relname;
02218 TupleDesc tupdesc;
02219 StringInfoData buf;
02220 int i;
02221
02222 initStringInfo(&buf);
02223
02224
02225 relname = generate_relation_name(rel);
02226
02227 tupdesc = rel->rd_att;
02228
02229 appendStringInfo(&buf, "DELETE FROM %s WHERE ", relname);
02230 for (i = 0; i < pknumatts; i++)
02231 {
02232 int pkattnum = pkattnums[i];
02233
02234 if (i > 0)
02235 appendStringInfo(&buf, " AND ");
02236
02237 appendStringInfoString(&buf,
02238 quote_ident_cstr(NameStr(tupdesc->attrs[pkattnum]->attname)));
02239
02240 if (tgt_pkattvals[i] != NULL)
02241 appendStringInfo(&buf, " = %s",
02242 quote_literal_cstr(tgt_pkattvals[i]));
02243 else
02244 appendStringInfo(&buf, " IS NULL");
02245 }
02246
02247 return (buf.data);
02248 }
02249
02250 static char *
02251 get_sql_update(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals)
02252 {
02253 char *relname;
02254 HeapTuple tuple;
02255 TupleDesc tupdesc;
02256 int natts;
02257 StringInfoData buf;
02258 char *val;
02259 int key;
02260 int i;
02261 bool needComma;
02262
02263 initStringInfo(&buf);
02264
02265
02266 relname = generate_relation_name(rel);
02267
02268 tupdesc = rel->rd_att;
02269 natts = tupdesc->natts;
02270
02271 tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals);
02272 if (!tuple)
02273 ereport(ERROR,
02274 (errcode(ERRCODE_CARDINALITY_VIOLATION),
02275 errmsg("source row not found")));
02276
02277 appendStringInfo(&buf, "UPDATE %s SET ", relname);
02278
02279
02280
02281
02282 needComma = false;
02283 for (i = 0; i < natts; i++)
02284 {
02285 if (tupdesc->attrs[i]->attisdropped)
02286 continue;
02287
02288 if (needComma)
02289 appendStringInfo(&buf, ", ");
02290
02291 appendStringInfo(&buf, "%s = ",
02292 quote_ident_cstr(NameStr(tupdesc->attrs[i]->attname)));
02293
02294 key = get_attnum_pk_pos(pkattnums, pknumatts, i);
02295
02296 if (key >= 0)
02297 val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;
02298 else
02299 val = SPI_getvalue(tuple, tupdesc, i + 1);
02300
02301 if (val != NULL)
02302 {
02303 appendStringInfoString(&buf, quote_literal_cstr(val));
02304 pfree(val);
02305 }
02306 else
02307 appendStringInfoString(&buf, "NULL");
02308 needComma = true;
02309 }
02310
02311 appendStringInfo(&buf, " WHERE ");
02312
02313 for (i = 0; i < pknumatts; i++)
02314 {
02315 int pkattnum = pkattnums[i];
02316
02317 if (i > 0)
02318 appendStringInfo(&buf, " AND ");
02319
02320 appendStringInfo(&buf, "%s",
02321 quote_ident_cstr(NameStr(tupdesc->attrs[pkattnum]->attname)));
02322
02323 val = tgt_pkattvals[i];
02324
02325 if (val != NULL)
02326 appendStringInfo(&buf, " = %s", quote_literal_cstr(val));
02327 else
02328 appendStringInfo(&buf, " IS NULL");
02329 }
02330
02331 return (buf.data);
02332 }
02333
02334
02335
02336
02337
02338 static char *
02339 quote_ident_cstr(char *rawstr)
02340 {
02341 text *rawstr_text;
02342 text *result_text;
02343 char *result;
02344
02345 rawstr_text = cstring_to_text(rawstr);
02346 result_text = DatumGetTextP(DirectFunctionCall1(quote_ident,
02347 PointerGetDatum(rawstr_text)));
02348 result = text_to_cstring(result_text);
02349
02350 return result;
02351 }
02352
/*
 * get_attnum_pk_pos
 *
 * Linear search: return the index of the first element of
 * pkattnums[0 .. pknumatts-1] equal to key, or -1 when there is none.
 */
static int
get_attnum_pk_pos(int *pkattnums, int pknumatts, int key)
{
	int			pos = 0;

	while (pos < pknumatts)
	{
		if (pkattnums[pos] == key)
			return pos;
		pos++;
	}

	return -1;
}
02367
02368 static HeapTuple
02369 get_tuple_of_interest(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals)
02370 {
02371 char *relname;
02372 TupleDesc tupdesc;
02373 int natts;
02374 StringInfoData buf;
02375 int ret;
02376 HeapTuple tuple;
02377 int i;
02378
02379
02380
02381
02382 if ((ret = SPI_connect()) < 0)
02383
02384 elog(ERROR, "SPI connect failure - returned %d", ret);
02385
02386 initStringInfo(&buf);
02387
02388
02389 relname = generate_relation_name(rel);
02390
02391 tupdesc = rel->rd_att;
02392 natts = tupdesc->natts;
02393
02394
02395
02396
02397
02398
02399
02400
02401 appendStringInfoString(&buf, "SELECT ");
02402
02403 for (i = 0; i < natts; i++)
02404 {
02405 if (i > 0)
02406 appendStringInfoString(&buf, ", ");
02407
02408 if (tupdesc->attrs[i]->attisdropped)
02409 appendStringInfoString(&buf, "NULL");
02410 else
02411 appendStringInfoString(&buf,
02412 quote_ident_cstr(NameStr(tupdesc->attrs[i]->attname)));
02413 }
02414
02415 appendStringInfo(&buf, " FROM %s WHERE ", relname);
02416
02417 for (i = 0; i < pknumatts; i++)
02418 {
02419 int pkattnum = pkattnums[i];
02420
02421 if (i > 0)
02422 appendStringInfo(&buf, " AND ");
02423
02424 appendStringInfoString(&buf,
02425 quote_ident_cstr(NameStr(tupdesc->attrs[pkattnum]->attname)));
02426
02427 if (src_pkattvals[i] != NULL)
02428 appendStringInfo(&buf, " = %s",
02429 quote_literal_cstr(src_pkattvals[i]));
02430 else
02431 appendStringInfo(&buf, " IS NULL");
02432 }
02433
02434
02435
02436
02437 ret = SPI_exec(buf.data, 0);
02438 pfree(buf.data);
02439
02440
02441
02442
02443 if ((ret == SPI_OK_SELECT) && (SPI_processed > 1))
02444 ereport(ERROR,
02445 (errcode(ERRCODE_CARDINALITY_VIOLATION),
02446 errmsg("source criteria matched more than one record")));
02447
02448 else if (ret == SPI_OK_SELECT && SPI_processed == 1)
02449 {
02450 SPITupleTable *tuptable = SPI_tuptable;
02451
02452 tuple = SPI_copytuple(tuptable->vals[0]);
02453 SPI_finish();
02454
02455 return tuple;
02456 }
02457 else
02458 {
02459
02460
02461
02462 SPI_finish();
02463
02464 return NULL;
02465 }
02466
02467
02468
02469
02470 return NULL;
02471 }
02472
02473
02474
02475
02476
02477
02478 static Relation
02479 get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode)
02480 {
02481 RangeVar *relvar;
02482 Relation rel;
02483 AclResult aclresult;
02484
02485 relvar = makeRangeVarFromNameList(textToQualifiedNameList(relname_text));
02486 rel = heap_openrv(relvar, lockmode);
02487
02488 aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
02489 aclmode);
02490 if (aclresult != ACLCHECK_OK)
02491 aclcheck_error(aclresult, ACL_KIND_CLASS,
02492 RelationGetRelationName(rel));
02493
02494 return rel;
02495 }
02496
02497
02498
02499
02500
02501
02502
02503 static char *
02504 generate_relation_name(Relation rel)
02505 {
02506 char *nspname;
02507 char *result;
02508
02509
02510 if (RelationIsVisible(RelationGetRelid(rel)))
02511 nspname = NULL;
02512 else
02513 nspname = get_namespace_name(rel->rd_rel->relnamespace);
02514
02515 result = quote_qualified_identifier(nspname, RelationGetRelationName(rel));
02516
02517 return result;
02518 }
02519
02520
02521 static remoteConn *
02522 getConnectionByName(const char *name)
02523 {
02524 remoteConnHashEnt *hentry;
02525 char *key;
02526
02527 if (!remoteConnHash)
02528 remoteConnHash = createConnHash();
02529
02530 key = pstrdup(name);
02531 truncate_identifier(key, strlen(key), false);
02532 hentry = (remoteConnHashEnt *) hash_search(remoteConnHash,
02533 key, HASH_FIND, NULL);
02534
02535 if (hentry)
02536 return (hentry->rconn);
02537
02538 return (NULL);
02539 }
02540
02541 static HTAB *
02542 createConnHash(void)
02543 {
02544 HASHCTL ctl;
02545
02546 ctl.keysize = NAMEDATALEN;
02547 ctl.entrysize = sizeof(remoteConnHashEnt);
02548
02549 return hash_create("Remote Con hash", NUMCONN, &ctl, HASH_ELEM);
02550 }
02551
02552 static void
02553 createNewConnection(const char *name, remoteConn *rconn)
02554 {
02555 remoteConnHashEnt *hentry;
02556 bool found;
02557 char *key;
02558
02559 if (!remoteConnHash)
02560 remoteConnHash = createConnHash();
02561
02562 key = pstrdup(name);
02563 truncate_identifier(key, strlen(key), true);
02564 hentry = (remoteConnHashEnt *) hash_search(remoteConnHash, key,
02565 HASH_ENTER, &found);
02566
02567 if (found)
02568 {
02569 PQfinish(rconn->conn);
02570 pfree(rconn);
02571
02572 ereport(ERROR,
02573 (errcode(ERRCODE_DUPLICATE_OBJECT),
02574 errmsg("duplicate connection name")));
02575 }
02576
02577 hentry->rconn = rconn;
02578 strlcpy(hentry->name, name, sizeof(hentry->name));
02579 }
02580
02581 static void
02582 deleteConnection(const char *name)
02583 {
02584 remoteConnHashEnt *hentry;
02585 bool found;
02586 char *key;
02587
02588 if (!remoteConnHash)
02589 remoteConnHash = createConnHash();
02590
02591 key = pstrdup(name);
02592 truncate_identifier(key, strlen(key), false);
02593 hentry = (remoteConnHashEnt *) hash_search(remoteConnHash,
02594 key, HASH_REMOVE, &found);
02595
02596 if (!hentry)
02597 ereport(ERROR,
02598 (errcode(ERRCODE_UNDEFINED_OBJECT),
02599 errmsg("undefined connection name")));
02600
02601 }
02602
02603 static void
02604 dblink_security_check(PGconn *conn, remoteConn *rconn)
02605 {
02606 if (!superuser())
02607 {
02608 if (!PQconnectionUsedPassword(conn))
02609 {
02610 PQfinish(conn);
02611 if (rconn)
02612 pfree(rconn);
02613
02614 ereport(ERROR,
02615 (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
02616 errmsg("password is required"),
02617 errdetail("Non-superuser cannot connect if the server does not request a password."),
02618 errhint("Target server's authentication method must be changed.")));
02619 }
02620 }
02621 }
02622
02623
02624
02625
02626
02627
02628
02629 static void
02630 dblink_connstr_check(const char *connstr)
02631 {
02632 if (!superuser())
02633 {
02634 PQconninfoOption *options;
02635 PQconninfoOption *option;
02636 bool connstr_gives_password = false;
02637
02638 options = PQconninfoParse(connstr, NULL);
02639 if (options)
02640 {
02641 for (option = options; option->keyword != NULL; option++)
02642 {
02643 if (strcmp(option->keyword, "password") == 0)
02644 {
02645 if (option->val != NULL && option->val[0] != '\0')
02646 {
02647 connstr_gives_password = true;
02648 break;
02649 }
02650 }
02651 }
02652 PQconninfoFree(options);
02653 }
02654
02655 if (!connstr_gives_password)
02656 ereport(ERROR,
02657 (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
02658 errmsg("password is required"),
02659 errdetail("Non-superusers must provide a password in the connection string.")));
02660 }
02661 }
02662
02663 static void
02664 dblink_res_error(const char *conname, PGresult *res, const char *dblink_context_msg, bool fail)
02665 {
02666 int level;
02667 char *pg_diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
02668 char *pg_diag_message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
02669 char *pg_diag_message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
02670 char *pg_diag_message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
02671 char *pg_diag_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
02672 int sqlstate;
02673 char *message_primary;
02674 char *message_detail;
02675 char *message_hint;
02676 char *message_context;
02677 const char *dblink_context_conname = "unnamed";
02678
02679 if (fail)
02680 level = ERROR;
02681 else
02682 level = NOTICE;
02683
02684 if (pg_diag_sqlstate)
02685 sqlstate = MAKE_SQLSTATE(pg_diag_sqlstate[0],
02686 pg_diag_sqlstate[1],
02687 pg_diag_sqlstate[2],
02688 pg_diag_sqlstate[3],
02689 pg_diag_sqlstate[4]);
02690 else
02691 sqlstate = ERRCODE_CONNECTION_FAILURE;
02692
02693 xpstrdup(message_primary, pg_diag_message_primary);
02694 xpstrdup(message_detail, pg_diag_message_detail);
02695 xpstrdup(message_hint, pg_diag_message_hint);
02696 xpstrdup(message_context, pg_diag_context);
02697
02698 if (res)
02699 PQclear(res);
02700
02701 if (conname)
02702 dblink_context_conname = conname;
02703
02704 ereport(level,
02705 (errcode(sqlstate),
02706 message_primary ? errmsg_internal("%s", message_primary) :
02707 errmsg("unknown error"),
02708 message_detail ? errdetail_internal("%s", message_detail) : 0,
02709 message_hint ? errhint("%s", message_hint) : 0,
02710 message_context ? errcontext("%s", message_context) : 0,
02711 errcontext("Error occurred on dblink connection named \"%s\": %s.",
02712 dblink_context_conname, dblink_context_msg)));
02713 }
02714
02715
02716
02717
02718 static char *
02719 get_connect_string(const char *servername)
02720 {
02721 ForeignServer *foreign_server = NULL;
02722 UserMapping *user_mapping;
02723 ListCell *cell;
02724 StringInfo buf = makeStringInfo();
02725 ForeignDataWrapper *fdw;
02726 AclResult aclresult;
02727 char *srvname;
02728
02729
02730 srvname = pstrdup(servername);
02731 truncate_identifier(srvname, strlen(srvname), false);
02732 foreign_server = GetForeignServerByName(srvname, true);
02733
02734 if (foreign_server)
02735 {
02736 Oid serverid = foreign_server->serverid;
02737 Oid fdwid = foreign_server->fdwid;
02738 Oid userid = GetUserId();
02739
02740 user_mapping = GetUserMapping(userid, serverid);
02741 fdw = GetForeignDataWrapper(fdwid);
02742
02743
02744 aclresult = pg_foreign_server_aclcheck(serverid, userid, ACL_USAGE);
02745 if (aclresult != ACLCHECK_OK)
02746 aclcheck_error(aclresult, ACL_KIND_FOREIGN_SERVER, foreign_server->servername);
02747
02748 foreach(cell, fdw->options)
02749 {
02750 DefElem *def = lfirst(cell);
02751
02752 appendStringInfo(buf, "%s='%s' ", def->defname,
02753 escape_param_str(strVal(def->arg)));
02754 }
02755
02756 foreach(cell, foreign_server->options)
02757 {
02758 DefElem *def = lfirst(cell);
02759
02760 appendStringInfo(buf, "%s='%s' ", def->defname,
02761 escape_param_str(strVal(def->arg)));
02762 }
02763
02764 foreach(cell, user_mapping->options)
02765 {
02766
02767 DefElem *def = lfirst(cell);
02768
02769 appendStringInfo(buf, "%s='%s' ", def->defname,
02770 escape_param_str(strVal(def->arg)));
02771 }
02772
02773 return buf->data;
02774 }
02775 else
02776 return NULL;
02777 }
02778
02779
02780
02781
02782
02783
02784 static char *
02785 escape_param_str(const char *str)
02786 {
02787 const char *cp;
02788 StringInfo buf = makeStringInfo();
02789
02790 for (cp = str; *cp; cp++)
02791 {
02792 if (*cp == '\\' || *cp == '\'')
02793 appendStringInfoChar(buf, '\\');
02794 appendStringInfoChar(buf, *cp);
02795 }
02796
02797 return buf->data;
02798 }
02799
02800
02801
02802
02803
02804
02805
02806
02807
02808
02809
02810
02811
02812
02813
02814
02815 static void
02816 validate_pkattnums(Relation rel,
02817 int2vector *pkattnums_arg, int32 pknumatts_arg,
02818 int **pkattnums, int *pknumatts)
02819 {
02820 TupleDesc tupdesc = rel->rd_att;
02821 int natts = tupdesc->natts;
02822 int i;
02823
02824
02825 pknumatts_arg = Min(pknumatts_arg, pkattnums_arg->dim1);
02826
02827
02828 if (pknumatts_arg <= 0)
02829 ereport(ERROR,
02830 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
02831 errmsg("number of key attributes must be > 0")));
02832
02833
02834 *pkattnums = (int *) palloc(pknumatts_arg * sizeof(int));
02835 *pknumatts = pknumatts_arg;
02836
02837
02838 for (i = 0; i < pknumatts_arg; i++)
02839 {
02840 int pkattnum = pkattnums_arg->values[i];
02841 int lnum;
02842 int j;
02843
02844
02845 if (pkattnum <= 0 || pkattnum > natts)
02846 ereport(ERROR,
02847 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
02848 errmsg("invalid attribute number %d", pkattnum)));
02849
02850
02851 lnum = 0;
02852 for (j = 0; j < natts; j++)
02853 {
02854
02855 if (tupdesc->attrs[j]->attisdropped)
02856 continue;
02857
02858 if (++lnum == pkattnum)
02859 break;
02860 }
02861
02862 if (j < natts)
02863 (*pkattnums)[i] = j;
02864 else
02865 ereport(ERROR,
02866 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
02867 errmsg("invalid attribute number %d", pkattnum)));
02868 }
02869 }
02870
02871
02872
02873
02874
02875
02876
02877
02878
02879
02880
02881
02882
02883
02884
02885
02886 static bool
02887 is_valid_dblink_option(const PQconninfoOption *options, const char *option,
02888 Oid context)
02889 {
02890 const PQconninfoOption *opt;
02891
02892
02893 for (opt = options; opt->keyword; opt++)
02894 {
02895 if (strcmp(opt->keyword, option) == 0)
02896 break;
02897 }
02898 if (opt->keyword == NULL)
02899 return false;
02900
02901
02902 if (strchr(opt->dispchar, 'D'))
02903 return false;
02904
02905
02906 if (strcmp(opt->keyword, "client_encoding") == 0)
02907 return false;
02908
02909
02910
02911
02912
02913 if (strcmp(opt->keyword, "user") == 0 || strchr(opt->dispchar, '*'))
02914 {
02915 if (context != UserMappingRelationId)
02916 return false;
02917 }
02918 else
02919 {
02920 if (context != ForeignServerRelationId)
02921 return false;
02922 }
02923
02924 return true;
02925 }
02926
02927
02928
02929
02930
02931
02932
02933
02934
02935
02936
02937 static int
02938 applyRemoteGucs(PGconn *conn)
02939 {
02940 static const char *const GUCsAffectingIO[] = {
02941 "DateStyle",
02942 "IntervalStyle"
02943 };
02944
02945 int nestlevel = -1;
02946 int i;
02947
02948 for (i = 0; i < lengthof(GUCsAffectingIO); i++)
02949 {
02950 const char *gucName = GUCsAffectingIO[i];
02951 const char *remoteVal = PQparameterStatus(conn, gucName);
02952 const char *localVal;
02953
02954
02955
02956
02957
02958
02959
02960 if (remoteVal == NULL)
02961 continue;
02962
02963
02964
02965
02966
02967 localVal = GetConfigOption(gucName, false, false);
02968 Assert(localVal != NULL);
02969
02970 if (strcmp(remoteVal, localVal) == 0)
02971 continue;
02972
02973
02974 if (nestlevel < 0)
02975 nestlevel = NewGUCNestLevel();
02976
02977
02978 (void) set_config_option(gucName, remoteVal,
02979 PGC_USERSET, PGC_S_SESSION,
02980 GUC_ACTION_SAVE, true, 0);
02981 }
02982
02983 return nestlevel;
02984 }
02985
02986
02987
02988
02989 static void
02990 restoreLocalGucs(int nestlevel)
02991 {
02992
02993 if (nestlevel > 0)
02994 AtEOXact_GUC(true, nestlevel);
02995 }