Header And Logo

PostgreSQL
| The world's most advanced open source database.

dblink.c

Go to the documentation of this file.
00001 /*
00002  * dblink.c
00003  *
00004  * Functions returning results from a remote database
00005  *
00006  * Joe Conway <[email protected]>
00007  * And contributors:
00008  * Darko Prenosil <[email protected]>
00009  * Shridhar Daithankar <[email protected]>
00010  *
00011  * contrib/dblink/dblink.c
00012  * Copyright (c) 2001-2013, PostgreSQL Global Development Group
00013  * ALL RIGHTS RESERVED;
00014  *
00015  * Permission to use, copy, modify, and distribute this software and its
00016  * documentation for any purpose, without fee, and without a written agreement
00017  * is hereby granted, provided that the above copyright notice and this
00018  * paragraph and the following two paragraphs appear in all copies.
00019  *
00020  * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
00021  * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
00022  * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
00023  * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
00024  * POSSIBILITY OF SUCH DAMAGE.
00025  *
00026  * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
00027  * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
00028  * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS
00029  * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
00030  * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
00031  *
00032  */
00033 #include "postgres.h"
00034 
00035 #include <limits.h>
00036 
00037 #include "libpq-fe.h"
00038 
00039 #include "access/htup_details.h"
00040 #include "access/reloptions.h"
00041 #include "catalog/indexing.h"
00042 #include "catalog/namespace.h"
00043 #include "catalog/pg_foreign_server.h"
00044 #include "catalog/pg_type.h"
00045 #include "catalog/pg_user_mapping.h"
00046 #include "executor/spi.h"
00047 #include "foreign/foreign.h"
00048 #include "funcapi.h"
00049 #include "lib/stringinfo.h"
00050 #include "mb/pg_wchar.h"
00051 #include "miscadmin.h"
00052 #include "parser/scansup.h"
00053 #include "utils/acl.h"
00054 #include "utils/builtins.h"
00055 #include "utils/fmgroids.h"
00056 #include "utils/guc.h"
00057 #include "utils/lsyscache.h"
00058 #include "utils/memutils.h"
00059 #include "utils/rel.h"
00060 #include "utils/tqual.h"
00061 
00062 #include "dblink.h"
00063 
00064 PG_MODULE_MAGIC;
00065 
/*
 * State kept for one remote connection: the libpq handle plus the
 * bookkeeping dblink_open()/dblink_close() use to track a transaction that
 * dblink itself started on behalf of cursors.
 */
00066 typedef struct remoteConn
00067 {
00068     PGconn     *conn;           /* Hold the remote connection */
00069     int         openCursorCount;    /* The number of open cursors */
00070     bool        newXactForCursor;       /* Opened a transaction for a cursor */
00071 } remoteConn;
00072 
/*
 * Working state handed to storeQueryResult() and storeRow() (see their
 * prototypes in this file).
 *
 * NOTE(review): field roles below are inferred from names and types only;
 * the functions that use them are outside this view -- confirm there.
 *   fcinfo      - call info of the SQL function being executed
 *   tuplestore  - presumably the tuplestore receiving the result rows
 *   attinmeta   - presumably input-conversion metadata for the result rowtype
 *   tmpcontext  - presumably a scratch memory context
 *   cstrs       - presumably per-column C-string workspace
 */
00073 typedef struct storeInfo
00074 {
00075     FunctionCallInfo fcinfo;
00076     Tuplestorestate *tuplestore;
00077     AttInMetadata *attinmeta;
00078     MemoryContext tmpcontext;
00079     char      **cstrs;
00080     /* temp storage for results to avoid leaks on exception */
00081     PGresult   *last_res;
00082     PGresult   *cur_res;
00083 } storeInfo;
00084 
00085 /*
00086  * Internal declarations
00087  */
00088 static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
00089 static void prepTuplestoreResult(FunctionCallInfo fcinfo);
00090 static void materializeResult(FunctionCallInfo fcinfo, PGconn *conn,
00091                   PGresult *res);
00092 static void materializeQueryResult(FunctionCallInfo fcinfo,
00093                        PGconn *conn,
00094                        const char *conname,
00095                        const char *sql,
00096                        bool fail);
00097 static PGresult *storeQueryResult(storeInfo *sinfo, PGconn *conn, const char *sql);
00098 static void storeRow(storeInfo *sinfo, PGresult *res, bool first);
00099 static remoteConn *getConnectionByName(const char *name);
00100 static HTAB *createConnHash(void);
00101 static void createNewConnection(const char *name, remoteConn *rconn);
00102 static void deleteConnection(const char *name);
00103 static char **get_pkey_attnames(Relation rel, int16 *numatts);
00104 static char **get_text_array_contents(ArrayType *array, int *numitems);
00105 static char *get_sql_insert(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals);
00106 static char *get_sql_delete(Relation rel, int *pkattnums, int pknumatts, char **tgt_pkattvals);
00107 static char *get_sql_update(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals);
00108 static char *quote_ident_cstr(char *rawstr);
00109 static int  get_attnum_pk_pos(int *pkattnums, int pknumatts, int key);
00110 static HeapTuple get_tuple_of_interest(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals);
00111 static Relation get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode);
00112 static char *generate_relation_name(Relation rel);
00113 static void dblink_connstr_check(const char *connstr);
00114 static void dblink_security_check(PGconn *conn, remoteConn *rconn);
00115 static void dblink_res_error(const char *conname, PGresult *res, const char *dblink_context_msg, bool fail);
00116 static char *get_connect_string(const char *servername);
00117 static char *escape_param_str(const char *from);
00118 static void validate_pkattnums(Relation rel,
00119                    int2vector *pkattnums_arg, int32 pknumatts_arg,
00120                    int **pkattnums, int *pknumatts);
00121 static bool is_valid_dblink_option(const PQconninfoOption *options,
00122                        const char *option, Oid context);
00123 static int  applyRemoteGucs(PGconn *conn);
00124 static void restoreLocalGucs(int nestlevel);
00125 
00126 /* Global */
00127 static remoteConn *pconn = NULL;
00128 static HTAB *remoteConnHash = NULL;
00129 
00130 /*
00131  *  The following is a list that holds multiple remote connections.
00132  *  The calling convention of each dblink function changes to accept a
00133  *  connection name as the first parameter. The connection list is
00134  *  much like ecpg's, i.e. a mapping between a name and a PGconn object.
00135  */
00136 
/*
 * One name -> connection mapping: a fixed-size (NAMEDATALEN) connection
 * name and a pointer to the remoteConn it refers to.
 * NOTE(review): presumably the entry type of remoteConnHash -- confirm in
 * createConnHash().
 */
00137 typedef struct remoteConnHashEnt
00138 {
00139     char        name[NAMEDATALEN];
00140     remoteConn *rconn;
00141 } remoteConnHashEnt;
00142 
00143 /* initial number of connection hashes */
00144 #define NUMCONN 16
00145 
00146 /* general utility */
/*
 * xpfree(var_)
 *      pfree() the pointer held in var_ if it is non-NULL, then reset var_
 *      to NULL so it cannot be freed or dereferenced again by accident.
 *
 * var_ must be a modifiable lvalue.  It is evaluated more than once, so do
 * not pass an expression with side effects.  The argument is parenthesized
 * where it is compared and assigned so that compound lvalue expressions
 * expand safely.
 */
#define xpfree(var_) \
    do { \
        if ((var_) != NULL) \
        { \
            pfree(var_); \
            (var_) = NULL; \
        } \
    } while (0)
00155 
/*
 * xpstrdup(var_c, var_)
 *      NULL-tolerant pstrdup(): store a pstrdup() copy of var_ into var_c,
 *      or NULL when var_ itself is NULL.
 *
 * var_c must be a modifiable lvalue.  var_ is evaluated more than once, so
 * do not pass an expression with side effects.  Both arguments are
 * parenthesized where they are compared and assigned so that compound
 * expressions expand safely.
 */
#define xpstrdup(var_c, var_) \
    do { \
        if ((var_) != NULL) \
            (var_c) = pstrdup(var_); \
        else \
            (var_c) = NULL; \
    } while (0)
00163 
/*
 * DBLINK_RES_INTERNALERROR(p2)
 *      Report a failed remote command as elog(ERROR, "<p2>: <libpq message>").
 *
 * Relies on these locals being declared in the calling function:
 *      char     *msg   - receives a palloc'd copy of PQerrorMessage(conn)
 *      PGconn   *conn  - connection the command was sent on
 *      PGresult *res   - result to release; cleared only when non-NULL
 *
 * The error text is copied and the result cleared before elog(ERROR) is
 * raised.
 */
00164 #define DBLINK_RES_INTERNALERROR(p2) \
00165     do { \
00166             msg = pstrdup(PQerrorMessage(conn)); \
00167             if (res) \
00168                 PQclear(res); \
00169             elog(ERROR, "%s: %s", p2, msg); \
00170     } while (0)
00171 
/*
 * DBLINK_CONN_NOT_AVAIL
 *      Raise ERROR with ERRCODE_CONNECTION_DOES_NOT_EXIST.
 *
 * Relies on a local "conname" in the calling function: when it is non-NULL
 * the message names the connection, otherwise a generic message is used.
 */
00172 #define DBLINK_CONN_NOT_AVAIL \
00173     do { \
00174         if(conname) \
00175             ereport(ERROR, \
00176                     (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST), \
00177                      errmsg("connection \"%s\" not available", conname))); \
00178         else \
00179             ereport(ERROR, \
00180                     (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST), \
00181                      errmsg("connection not available"))); \
00182     } while (0)
00183 
/*
 * DBLINK_GET_CONN
 *      Resolve SQL argument 0 to a usable PGconn.
 *
 * Argument 0 is first looked up as the name of an existing connection via
 * getConnectionByName().  If found, "conn" and "conname" are set from it.
 * Otherwise a new connection is opened: get_connect_string() is tried on
 * the argument first, and when it returns NULL the argument text itself is
 * used as the connection string.  The string is checked with
 * dblink_connstr_check(), the connection is opened with PQconnectdb() and
 * checked with dblink_security_check(), the client encoding is set to the
 * database encoding name, and "freeconn" is set to true.
 *
 * A CONNECTION_BAD status closes the new connection and raises ERROR with
 * the libpq message as detail; "freeconn" is not set in that case.
 *
 * Relies on these locals being declared in the calling function (as well as
 * the fcinfo used by PG_GETARG_TEXT_PP):
 *      remoteConn *rconn, PGconn *conn, char *conname, char *connstr,
 *      char *msg, bool freeconn
 */
00184 #define DBLINK_GET_CONN \
00185     do { \
00186             char *conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(0)); \
00187             rconn = getConnectionByName(conname_or_str); \
00188             if (rconn) \
00189             { \
00190                 conn = rconn->conn; \
00191                 conname = conname_or_str; \
00192             } \
00193             else \
00194             { \
00195                 connstr = get_connect_string(conname_or_str); \
00196                 if (connstr == NULL) \
00197                 { \
00198                     connstr = conname_or_str; \
00199                 } \
00200                 dblink_connstr_check(connstr); \
00201                 conn = PQconnectdb(connstr); \
00202                 if (PQstatus(conn) == CONNECTION_BAD) \
00203                 { \
00204                     msg = pstrdup(PQerrorMessage(conn)); \
00205                     PQfinish(conn); \
00206                     ereport(ERROR, \
00207                             (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION), \
00208                              errmsg("could not establish connection"), \
00209                              errdetail_internal("%s", msg))); \
00210                 } \
00211                 dblink_security_check(conn, rconn); \
00212                 PQsetClientEncoding(conn, GetDatabaseEncodingName()); \
00213                 freeconn = true; \
00214             } \
00215     } while (0)
00216 
/*
 * DBLINK_GET_NAMED_CONN
 *      Resolve SQL argument 0 strictly as the name of an existing
 *      connection; unlike DBLINK_GET_CONN it never opens a new one.
 *
 * Sets "conname" to the argument text and "conn" to the stored PGconn, or
 * raises the DBLINK_CONN_NOT_AVAIL error when the name is unknown.
 *
 * Relies on these locals being declared in the calling function:
 *      char *conname, remoteConn *rconn, PGconn *conn
 */
00217 #define DBLINK_GET_NAMED_CONN \
00218     do { \
00219             conname = text_to_cstring(PG_GETARG_TEXT_PP(0)); \
00220             rconn = getConnectionByName(conname); \
00221             if (rconn) \
00222                 conn = rconn->conn; \
00223             else \
00224                 DBLINK_CONN_NOT_AVAIL; \
00225     } while (0)
00226 
/*
 * DBLINK_INIT
 *      Lazily create the file-level "pconn" record (the unnamed connection
 *      slot) the first time it is needed.
 *
 * The record is allocated in TopMemoryContext and starts with no
 * connection, a zero cursor count and no dblink-started transaction.  Once
 * pconn is non-NULL the macro does nothing.
 */
00227 #define DBLINK_INIT \
00228     do { \
00229             if (!pconn) \
00230             { \
00231                 pconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn)); \
00232                 pconn->conn = NULL; \
00233                 pconn->openCursorCount = 0; \
00234                 pconn->newXactForCursor = FALSE; \
00235             } \
00236     } while (0)
00237 
00238 /*
00239  * Create a persistent connection to another database
00240  */
00241 PG_FUNCTION_INFO_V1(dblink_connect);
00242 Datum
00243 dblink_connect(PG_FUNCTION_ARGS)
00244 {
00245     char       *conname_or_str = NULL;
00246     char       *connstr = NULL;
00247     char       *connname = NULL;
00248     char       *msg;
00249     PGconn     *conn = NULL;
00250     remoteConn *rconn = NULL;
00251 
00252     DBLINK_INIT;
00253 
00254     if (PG_NARGS() == 2)
00255     {
00256         conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(1));
00257         connname = text_to_cstring(PG_GETARG_TEXT_PP(0));
00258     }
00259     else if (PG_NARGS() == 1)
00260         conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(0));
00261 
00262     if (connname)
00263         rconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext,
00264                                                   sizeof(remoteConn));
00265 
00266     /* first check for valid foreign data server */
00267     connstr = get_connect_string(conname_or_str);
00268     if (connstr == NULL)
00269         connstr = conname_or_str;
00270 
00271     /* check password in connection string if not superuser */
00272     dblink_connstr_check(connstr);
00273     conn = PQconnectdb(connstr);
00274 
00275     if (PQstatus(conn) == CONNECTION_BAD)
00276     {
00277         msg = pstrdup(PQerrorMessage(conn));
00278         PQfinish(conn);
00279         if (rconn)
00280             pfree(rconn);
00281 
00282         ereport(ERROR,
00283                 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
00284                  errmsg("could not establish connection"),
00285                  errdetail_internal("%s", msg)));
00286     }
00287 
00288     /* check password actually used if not superuser */
00289     dblink_security_check(conn, rconn);
00290 
00291     /* attempt to set client encoding to match server encoding */
00292     PQsetClientEncoding(conn, GetDatabaseEncodingName());
00293 
00294     if (connname)
00295     {
00296         rconn->conn = conn;
00297         createNewConnection(connname, rconn);
00298     }
00299     else
00300         pconn->conn = conn;
00301 
00302     PG_RETURN_TEXT_P(cstring_to_text("OK"));
00303 }
00304 
00305 /*
00306  * Clear a persistent connection to another database
00307  */
00308 PG_FUNCTION_INFO_V1(dblink_disconnect);
00309 Datum
00310 dblink_disconnect(PG_FUNCTION_ARGS)
00311 {
00312     char       *conname = NULL;
00313     remoteConn *rconn = NULL;
00314     PGconn     *conn = NULL;
00315 
00316     DBLINK_INIT;
00317 
00318     if (PG_NARGS() == 1)
00319     {
00320         conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
00321         rconn = getConnectionByName(conname);
00322         if (rconn)
00323             conn = rconn->conn;
00324     }
00325     else
00326         conn = pconn->conn;
00327 
00328     if (!conn)
00329         DBLINK_CONN_NOT_AVAIL;
00330 
00331     PQfinish(conn);
00332     if (rconn)
00333     {
00334         deleteConnection(conname);
00335         pfree(rconn);
00336     }
00337     else
00338         pconn->conn = NULL;
00339 
00340     PG_RETURN_TEXT_P(cstring_to_text("OK"));
00341 }
00342 
00343 /*
00344  * opens a cursor using a persistent connection
00345  */
00346 PG_FUNCTION_INFO_V1(dblink_open);
00347 Datum
00348 dblink_open(PG_FUNCTION_ARGS)
00349 {
00350     char       *msg;
00351     PGresult   *res = NULL;
00352     PGconn     *conn = NULL;
00353     char       *curname = NULL;
00354     char       *sql = NULL;
00355     char       *conname = NULL;
00356     StringInfoData buf;
00357     remoteConn *rconn = NULL;
00358     bool        fail = true;    /* default to backward compatible behavior */
00359 
00360     DBLINK_INIT;
00361     initStringInfo(&buf);
00362 
00363     if (PG_NARGS() == 2)
00364     {
00365         /* text,text */
00366         curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
00367         sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
00368         rconn = pconn;
00369     }
00370     else if (PG_NARGS() == 3)
00371     {
00372         /* might be text,text,text or text,text,bool */
00373         if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
00374         {
00375             curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
00376             sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
00377             fail = PG_GETARG_BOOL(2);
00378             rconn = pconn;
00379         }
00380         else
00381         {
00382             conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
00383             curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
00384             sql = text_to_cstring(PG_GETARG_TEXT_PP(2));
00385             rconn = getConnectionByName(conname);
00386         }
00387     }
00388     else if (PG_NARGS() == 4)
00389     {
00390         /* text,text,text,bool */
00391         conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
00392         curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
00393         sql = text_to_cstring(PG_GETARG_TEXT_PP(2));
00394         fail = PG_GETARG_BOOL(3);
00395         rconn = getConnectionByName(conname);
00396     }
00397 
00398     if (!rconn || !rconn->conn)
00399         DBLINK_CONN_NOT_AVAIL;
00400     else
00401         conn = rconn->conn;
00402 
00403     /* If we are not in a transaction, start one */
00404     if (PQtransactionStatus(conn) == PQTRANS_IDLE)
00405     {
00406         res = PQexec(conn, "BEGIN");
00407         if (PQresultStatus(res) != PGRES_COMMAND_OK)
00408             DBLINK_RES_INTERNALERROR("begin error");
00409         PQclear(res);
00410         rconn->newXactForCursor = TRUE;
00411 
00412         /*
00413          * Since transaction state was IDLE, we force cursor count to
00414          * initially be 0. This is needed as a previous ABORT might have wiped
00415          * out our transaction without maintaining the cursor count for us.
00416          */
00417         rconn->openCursorCount = 0;
00418     }
00419 
00420     /* if we started a transaction, increment cursor count */
00421     if (rconn->newXactForCursor)
00422         (rconn->openCursorCount)++;
00423 
00424     appendStringInfo(&buf, "DECLARE %s CURSOR FOR %s", curname, sql);
00425     res = PQexec(conn, buf.data);
00426     if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
00427     {
00428         dblink_res_error(conname, res, "could not open cursor", fail);
00429         PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
00430     }
00431 
00432     PQclear(res);
00433     PG_RETURN_TEXT_P(cstring_to_text("OK"));
00434 }
00435 
00436 /*
00437  * closes a cursor
00438  */
00439 PG_FUNCTION_INFO_V1(dblink_close);
00440 Datum
00441 dblink_close(PG_FUNCTION_ARGS)
00442 {
00443     PGconn     *conn = NULL;
00444     PGresult   *res = NULL;
00445     char       *curname = NULL;
00446     char       *conname = NULL;
00447     StringInfoData buf;
00448     char       *msg;
00449     remoteConn *rconn = NULL;
00450     bool        fail = true;    /* default to backward compatible behavior */
00451 
00452     DBLINK_INIT;
00453     initStringInfo(&buf);
00454 
00455     if (PG_NARGS() == 1)
00456     {
00457         /* text */
00458         curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
00459         rconn = pconn;
00460     }
00461     else if (PG_NARGS() == 2)
00462     {
00463         /* might be text,text or text,bool */
00464         if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
00465         {
00466             curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
00467             fail = PG_GETARG_BOOL(1);
00468             rconn = pconn;
00469         }
00470         else
00471         {
00472             conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
00473             curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
00474             rconn = getConnectionByName(conname);
00475         }
00476     }
00477     if (PG_NARGS() == 3)
00478     {
00479         /* text,text,bool */
00480         conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
00481         curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
00482         fail = PG_GETARG_BOOL(2);
00483         rconn = getConnectionByName(conname);
00484     }
00485 
00486     if (!rconn || !rconn->conn)
00487         DBLINK_CONN_NOT_AVAIL;
00488     else
00489         conn = rconn->conn;
00490 
00491     appendStringInfo(&buf, "CLOSE %s", curname);
00492 
00493     /* close the cursor */
00494     res = PQexec(conn, buf.data);
00495     if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
00496     {
00497         dblink_res_error(conname, res, "could not close cursor", fail);
00498         PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
00499     }
00500 
00501     PQclear(res);
00502 
00503     /* if we started a transaction, decrement cursor count */
00504     if (rconn->newXactForCursor)
00505     {
00506         (rconn->openCursorCount)--;
00507 
00508         /* if count is zero, commit the transaction */
00509         if (rconn->openCursorCount == 0)
00510         {
00511             rconn->newXactForCursor = FALSE;
00512 
00513             res = PQexec(conn, "COMMIT");
00514             if (PQresultStatus(res) != PGRES_COMMAND_OK)
00515                 DBLINK_RES_INTERNALERROR("commit error");
00516             PQclear(res);
00517         }
00518     }
00519 
00520     PG_RETURN_TEXT_P(cstring_to_text("OK"));
00521 }
00522 
00523 /*
00524  * Fetch results from an open cursor
00525  */
00526 PG_FUNCTION_INFO_V1(dblink_fetch);
00527 Datum
00528 dblink_fetch(PG_FUNCTION_ARGS)
00529 {
00530     PGresult   *res = NULL;
00531     char       *conname = NULL;
00532     remoteConn *rconn = NULL;
00533     PGconn     *conn = NULL;
00534     StringInfoData buf;
00535     char       *curname = NULL;
00536     int         howmany = 0;
00537     bool        fail = true;    /* default to backward compatible */
00538 
00539     prepTuplestoreResult(fcinfo);
00540 
00541     DBLINK_INIT;
00542 
00543     if (PG_NARGS() == 4)
00544     {
00545         /* text,text,int,bool */
00546         conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
00547         curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
00548         howmany = PG_GETARG_INT32(2);
00549         fail = PG_GETARG_BOOL(3);
00550 
00551         rconn = getConnectionByName(conname);
00552         if (rconn)
00553             conn = rconn->conn;
00554     }
00555     else if (PG_NARGS() == 3)
00556     {
00557         /* text,text,int or text,int,bool */
00558         if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
00559         {
00560             curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
00561             howmany = PG_GETARG_INT32(1);
00562             fail = PG_GETARG_BOOL(2);
00563             conn = pconn->conn;
00564         }
00565         else
00566         {
00567             conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
00568             curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
00569             howmany = PG_GETARG_INT32(2);
00570 
00571             rconn = getConnectionByName(conname);
00572             if (rconn)
00573                 conn = rconn->conn;
00574         }
00575     }
00576     else if (PG_NARGS() == 2)
00577     {
00578         /* text,int */
00579         curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
00580         howmany = PG_GETARG_INT32(1);
00581         conn = pconn->conn;
00582     }
00583 
00584     if (!conn)
00585         DBLINK_CONN_NOT_AVAIL;
00586 
00587     initStringInfo(&buf);
00588     appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
00589 
00590     /*
00591      * Try to execute the query.  Note that since libpq uses malloc, the
00592      * PGresult will be long-lived even though we are still in a short-lived
00593      * memory context.
00594      */
00595     res = PQexec(conn, buf.data);
00596     if (!res ||
00597         (PQresultStatus(res) != PGRES_COMMAND_OK &&
00598          PQresultStatus(res) != PGRES_TUPLES_OK))
00599     {
00600         dblink_res_error(conname, res, "could not fetch from cursor", fail);
00601         return (Datum) 0;
00602     }
00603     else if (PQresultStatus(res) == PGRES_COMMAND_OK)
00604     {
00605         /* cursor does not exist - closed already or bad name */
00606         PQclear(res);
00607         ereport(ERROR,
00608                 (errcode(ERRCODE_INVALID_CURSOR_NAME),
00609                  errmsg("cursor \"%s\" does not exist", curname)));
00610     }
00611 
00612     materializeResult(fcinfo, conn, res);
00613     return (Datum) 0;
00614 }
00615 
00616 /*
00617  * Note: this is the new preferred version of dblink
00618  */
/*
 * SQL-callable entry point for running a query synchronously: delegates to
 * dblink_record_internal() with is_async = false and returns its result.
 */
00619 PG_FUNCTION_INFO_V1(dblink_record);
00620 Datum
00621 dblink_record(PG_FUNCTION_ARGS)
00622 {
00623     return dblink_record_internal(fcinfo, false);
00624 }
00625 
00626 PG_FUNCTION_INFO_V1(dblink_send_query);
00627 Datum
00628 dblink_send_query(PG_FUNCTION_ARGS)
00629 {
00630     char       *conname = NULL;
00631     PGconn     *conn = NULL;
00632     char       *sql = NULL;
00633     remoteConn *rconn = NULL;
00634     int         retval;
00635 
00636     if (PG_NARGS() == 2)
00637     {
00638         DBLINK_GET_NAMED_CONN;
00639         sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
00640     }
00641     else
00642         /* shouldn't happen */
00643         elog(ERROR, "wrong number of arguments");
00644 
00645     /* async query send */
00646     retval = PQsendQuery(conn, sql);
00647     if (retval != 1)
00648         elog(NOTICE, "could not send query: %s", PQerrorMessage(conn));
00649 
00650     PG_RETURN_INT32(retval);
00651 }
00652 
/*
 * SQL-callable entry point for collecting the result of an asynchronous
 * query: delegates to dblink_record_internal() with is_async = true and
 * returns its result.
 */
00653 PG_FUNCTION_INFO_V1(dblink_get_result);
00654 Datum
00655 dblink_get_result(PG_FUNCTION_ARGS)
00656 {
00657     return dblink_record_internal(fcinfo, true);
00658 }
00659 
/*
 * Shared implementation of dblink_record() and dblink_get_result().
 *
 * fcinfo    - call info of the SQL-callable wrapper
 * is_async  - false: run the query given in the arguments synchronously;
 *             true: collect the next result of an earlier asynchronous
 *             query on a named connection
 *
 * Synchronous argument lists: (text,text,bool), (text,text), (text,bool)
 * and (text).  Forms whose first of two text arguments names or describes a
 * connection go through DBLINK_GET_CONN; the others use the unnamed
 * connection.  Asynchronous argument lists: (text,bool) and (text), always
 * resolved with DBLINK_GET_NAMED_CONN.
 *
 * Rows are delivered through the tuplestore set up by
 * prepTuplestoreResult(), so the return value is always (Datum) 0.
 *
 * DBLINK_GET_CONN sets "freeconn" when it opens a connection for this call;
 * such a connection is closed both on normal exit and in the PG_CATCH
 * block before the error is re-thrown.  "conn" and "freeconn" are declared
 * volatile, presumably so values assigned inside PG_TRY are reliably
 * visible in PG_CATCH -- confirm against the PG_TRY documentation.
 */
00660 static Datum
00661 dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
00662 {
00663     PGconn     *volatile conn = NULL;
00664     volatile bool freeconn = false;
00665 
00666     prepTuplestoreResult(fcinfo);
00667 
00668     DBLINK_INIT;
00669 
00670     PG_TRY();
00671     {
00672         char       *msg;
00673         char       *connstr = NULL;
00674         char       *sql = NULL;
00675         char       *conname = NULL;
00676         remoteConn *rconn = NULL;
00677         bool        fail = true;    /* default to backward compatible */
00678 
00679         if (!is_async)
00680         {
00681             if (PG_NARGS() == 3)
00682             {
00683                 /* text,text,bool */
00684                 DBLINK_GET_CONN;
00685                 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
00686                 fail = PG_GETARG_BOOL(2);
00687             }
00688             else if (PG_NARGS() == 2)
00689             {
00690                 /* text,text or text,bool */
00691                 if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
00692                 {
00693                     conn = pconn->conn;
00694                     sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
00695                     fail = PG_GETARG_BOOL(1);
00696                 }
00697                 else
00698                 {
00699                     DBLINK_GET_CONN;
00700                     sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
00701                 }
00702             }
00703             else if (PG_NARGS() == 1)
00704             {
00705                 /* text */
00706                 conn = pconn->conn;
00707                 sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
00708             }
00709             else
00710                 /* shouldn't happen */
00711                 elog(ERROR, "wrong number of arguments");
00712         }
00713         else    /* is_async */
00714         {
00715             /* get async result */
00716             if (PG_NARGS() == 2)
00717             {
00718                 /* text,bool */
00719                 DBLINK_GET_NAMED_CONN;
00720                 fail = PG_GETARG_BOOL(1);
00721             }
00722             else if (PG_NARGS() == 1)
00723             {
00724                 /* text */
00725                 DBLINK_GET_NAMED_CONN;
00726             }
00727             else
00728                 /* shouldn't happen */
00729                 elog(ERROR, "wrong number of arguments");
00730         }
00731 
00732         if (!conn)
00733             DBLINK_CONN_NOT_AVAIL;
00734 
00735         if (!is_async)
00736         {
00737             /* synchronous query, use efficient tuple collection method */
00738             materializeQueryResult(fcinfo, conn, conname, sql, fail);
00739         }
00740         else
00741         {
00742             /* async result retrieval, do it the old way */
00743             PGresult   *res = PQgetResult(conn);
00744 
00745             /* NULL means we're all done with the async results */
00746             if (res)
00747             {
00748                 if (PQresultStatus(res) != PGRES_COMMAND_OK &&
00749                     PQresultStatus(res) != PGRES_TUPLES_OK)
00750                 {
00751                     dblink_res_error(conname, res, "could not execute query",
00752                                      fail);
00753                     /* if fail isn't set, we'll return an empty query result */
00754                 }
00755                 else
00756                 {
00757                     materializeResult(fcinfo, conn, res);
00758                 }
00759             }
00760         }
00761     }
00762     PG_CATCH();
00763     {
00764         /* if needed, close the connection to the database */
00765         if (freeconn)
00766             PQfinish(conn);
00767         PG_RE_THROW();
00768     }
00769     PG_END_TRY();
00770 
00771     /* if needed, close the connection to the database */
00772     if (freeconn)
00773         PQfinish(conn);
00774 
00775     return (Datum) 0;
00776 }
00777 
00778 /*
00779  * Verify function caller can handle a tuplestore result, and set up for that.
00780  *
00781  * Note: if the caller returns without actually creating a tuplestore, the
00782  * executor will treat the function result as an empty set.
00783  */
00784 static void
00785 prepTuplestoreResult(FunctionCallInfo fcinfo)
00786 {
00787     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
00788 
00789     /* check to see if query supports us returning a tuplestore */
00790     if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
00791         ereport(ERROR,
00792                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
00793                  errmsg("set-valued function called in context that cannot accept a set")));
00794     if (!(rsinfo->allowedModes & SFRM_Materialize))
00795         ereport(ERROR,
00796                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
00797                  errmsg("materialize mode required, but it is not allowed in this context")));
00798 
00799     /* let the executor know we're sending back a tuplestore */
00800     rsinfo->returnMode = SFRM_Materialize;
00801 
00802     /* caller must fill these to return a non-empty result */
00803     rsinfo->setResult = NULL;
00804     rsinfo->setDesc = NULL;
00805 }
00806 
00807 /*
00808  * Copy the contents of the PGresult into a tuplestore to be returned
00809  * as the result of the current function.
00810  * The PGresult will be released in this function.
00811  */
00812 static void
00813 materializeResult(FunctionCallInfo fcinfo, PGconn *conn, PGresult *res)
00814 {
00815     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
00816 
00817     /* prepTuplestoreResult must have been called previously */
00818     Assert(rsinfo->returnMode == SFRM_Materialize);
00819 
00820     PG_TRY();
00821     {
00822         TupleDesc   tupdesc;
00823         bool        is_sql_cmd;
00824         int         ntuples;
00825         int         nfields;
00826 
00827         if (PQresultStatus(res) == PGRES_COMMAND_OK)
00828         {
00829             is_sql_cmd = true;
00830 
00831             /*
00832              * need a tuple descriptor representing one TEXT column to return
00833              * the command status string as our result tuple
00834              */
00835             tupdesc = CreateTemplateTupleDesc(1, false);
00836             TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
00837                                TEXTOID, -1, 0);
00838             ntuples = 1;
00839             nfields = 1;
00840         }
00841         else
00842         {
00843             Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
00844 
00845             is_sql_cmd = false;
00846 
00847             /* get a tuple descriptor for our result type */
00848             switch (get_call_result_type(fcinfo, NULL, &tupdesc))
00849             {
00850                 case TYPEFUNC_COMPOSITE:
00851                     /* success */
00852                     break;
00853                 case TYPEFUNC_RECORD:
00854                     /* failed to determine actual type of RECORD */
00855                     ereport(ERROR,
00856                             (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
00857                         errmsg("function returning record called in context "
00858                                "that cannot accept type record")));
00859                     break;
00860                 default:
00861                     /* result type isn't composite */
00862                     elog(ERROR, "return type must be a row type");
00863                     break;
00864             }
00865 
00866             /* make sure we have a persistent copy of the tupdesc */
00867             tupdesc = CreateTupleDescCopy(tupdesc);
00868             ntuples = PQntuples(res);
00869             nfields = PQnfields(res);
00870         }
00871 
00872         /*
00873          * check result and tuple descriptor have the same number of columns
00874          */
00875         if (nfields != tupdesc->natts)
00876             ereport(ERROR,
00877                     (errcode(ERRCODE_DATATYPE_MISMATCH),
00878                      errmsg("remote query result rowtype does not match "
00879                             "the specified FROM clause rowtype")));
00880 
00881         if (ntuples > 0)
00882         {
00883             AttInMetadata *attinmeta;
00884             int         nestlevel = -1;
00885             Tuplestorestate *tupstore;
00886             MemoryContext oldcontext;
00887             int         row;
00888             char      **values;
00889 
00890             attinmeta = TupleDescGetAttInMetadata(tupdesc);
00891 
00892             /* Set GUCs to ensure we read GUC-sensitive data types correctly */
00893             if (!is_sql_cmd)
00894                 nestlevel = applyRemoteGucs(conn);
00895 
00896             oldcontext = MemoryContextSwitchTo(
00897                                     rsinfo->econtext->ecxt_per_query_memory);
00898             tupstore = tuplestore_begin_heap(true, false, work_mem);
00899             rsinfo->setResult = tupstore;
00900             rsinfo->setDesc = tupdesc;
00901             MemoryContextSwitchTo(oldcontext);
00902 
00903             values = (char **) palloc(nfields * sizeof(char *));
00904 
00905             /* put all tuples into the tuplestore */
00906             for (row = 0; row < ntuples; row++)
00907             {
00908                 HeapTuple   tuple;
00909 
00910                 if (!is_sql_cmd)
00911                 {
00912                     int         i;
00913 
00914                     for (i = 0; i < nfields; i++)
00915                     {
00916                         if (PQgetisnull(res, row, i))
00917                             values[i] = NULL;
00918                         else
00919                             values[i] = PQgetvalue(res, row, i);
00920                     }
00921                 }
00922                 else
00923                 {
00924                     values[0] = PQcmdStatus(res);
00925                 }
00926 
00927                 /* build the tuple and put it into the tuplestore. */
00928                 tuple = BuildTupleFromCStrings(attinmeta, values);
00929                 tuplestore_puttuple(tupstore, tuple);
00930             }
00931 
00932             /* clean up GUC settings, if we changed any */
00933             restoreLocalGucs(nestlevel);
00934 
00935             /* clean up and return the tuplestore */
00936             tuplestore_donestoring(tupstore);
00937         }
00938 
00939         PQclear(res);
00940     }
00941     PG_CATCH();
00942     {
00943         /* be sure to release the libpq result */
00944         PQclear(res);
00945         PG_RE_THROW();
00946     }
00947     PG_END_TRY();
00948 }
00949 
00950 /*
00951  * Execute the given SQL command and store its results into a tuplestore
00952  * to be returned as the result of the current function.
00953  *
00954  * This is equivalent to PQexec followed by materializeResult, but we make
00955  * use of libpq's single-row mode to avoid accumulating the whole result
00956  * inside libpq before it gets transferred to the tuplestore.
00957  */
00958 static void
00959 materializeQueryResult(FunctionCallInfo fcinfo,
00960                        PGconn *conn,
00961                        const char *conname,
00962                        const char *sql,
00963                        bool fail)
00964 {
00965     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
00966     PGresult   *volatile res = NULL;
00967     storeInfo   sinfo;
00968 
00969     /* prepTuplestoreResult must have been called previously */
00970     Assert(rsinfo->returnMode == SFRM_Materialize);
00971 
00972     /* initialize storeInfo to empty */
00973     memset(&sinfo, 0, sizeof(sinfo));
00974     sinfo.fcinfo = fcinfo;
00975 
00976     PG_TRY();
00977     {
00978         /* execute query, collecting any tuples into the tuplestore */
00979         res = storeQueryResult(&sinfo, conn, sql);
00980 
00981         if (!res ||
00982             (PQresultStatus(res) != PGRES_COMMAND_OK &&
00983              PQresultStatus(res) != PGRES_TUPLES_OK))
00984         {
00985             /*
00986              * dblink_res_error will clear the passed PGresult, so we need
00987              * this ugly dance to avoid doing so twice during error exit
00988              */
00989             PGresult   *res1 = res;
00990 
00991             res = NULL;
00992             dblink_res_error(conname, res1, "could not execute query", fail);
00993             /* if fail isn't set, we'll return an empty query result */
00994         }
00995         else if (PQresultStatus(res) == PGRES_COMMAND_OK)
00996         {
00997             /*
00998              * storeRow didn't get called, so we need to convert the command
00999              * status string to a tuple manually
01000              */
01001             TupleDesc   tupdesc;
01002             AttInMetadata *attinmeta;
01003             Tuplestorestate *tupstore;
01004             HeapTuple   tuple;
01005             char       *values[1];
01006             MemoryContext oldcontext;
01007 
01008             /*
01009              * need a tuple descriptor representing one TEXT column to return
01010              * the command status string as our result tuple
01011              */
01012             tupdesc = CreateTemplateTupleDesc(1, false);
01013             TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
01014                                TEXTOID, -1, 0);
01015             attinmeta = TupleDescGetAttInMetadata(tupdesc);
01016 
01017             oldcontext = MemoryContextSwitchTo(
01018                                     rsinfo->econtext->ecxt_per_query_memory);
01019             tupstore = tuplestore_begin_heap(true, false, work_mem);
01020             rsinfo->setResult = tupstore;
01021             rsinfo->setDesc = tupdesc;
01022             MemoryContextSwitchTo(oldcontext);
01023 
01024             values[0] = PQcmdStatus(res);
01025 
01026             /* build the tuple and put it into the tuplestore. */
01027             tuple = BuildTupleFromCStrings(attinmeta, values);
01028             tuplestore_puttuple(tupstore, tuple);
01029 
01030             PQclear(res);
01031             res = NULL;
01032         }
01033         else
01034         {
01035             Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
01036             /* storeRow should have created a tuplestore */
01037             Assert(rsinfo->setResult != NULL);
01038 
01039             PQclear(res);
01040             res = NULL;
01041         }
01042         PQclear(sinfo.last_res);
01043         sinfo.last_res = NULL;
01044         PQclear(sinfo.cur_res);
01045         sinfo.cur_res = NULL;
01046     }
01047     PG_CATCH();
01048     {
01049         /* be sure to release any libpq result we collected */
01050         PQclear(res);
01051         PQclear(sinfo.last_res);
01052         PQclear(sinfo.cur_res);
01053         /* and clear out any pending data in libpq */
01054         while ((res = PQgetResult(conn)) != NULL)
01055             PQclear(res);
01056         PG_RE_THROW();
01057     }
01058     PG_END_TRY();
01059 }
01060 
01061 /*
01062  * Execute query, and send any result rows to sinfo->tuplestore.
01063  */
01064 static PGresult *
01065 storeQueryResult(storeInfo *sinfo, PGconn *conn, const char *sql)
01066 {
01067     bool        first = true;
01068     int         nestlevel = -1;
01069     PGresult   *res;
01070 
01071     if (!PQsendQuery(conn, sql))
01072         elog(ERROR, "could not send query: %s", PQerrorMessage(conn));
01073 
01074     if (!PQsetSingleRowMode(conn))      /* shouldn't fail */
01075         elog(ERROR, "failed to set single-row mode for dblink query");
01076 
01077     for (;;)
01078     {
01079         CHECK_FOR_INTERRUPTS();
01080 
01081         sinfo->cur_res = PQgetResult(conn);
01082         if (!sinfo->cur_res)
01083             break;
01084 
01085         if (PQresultStatus(sinfo->cur_res) == PGRES_SINGLE_TUPLE)
01086         {
01087             /* got one row from possibly-bigger resultset */
01088 
01089             /*
01090              * Set GUCs to ensure we read GUC-sensitive data types correctly.
01091              * We shouldn't do this until we have a row in hand, to ensure
01092              * libpq has seen any earlier ParameterStatus protocol messages.
01093              */
01094             if (first && nestlevel < 0)
01095                 nestlevel = applyRemoteGucs(conn);
01096 
01097             storeRow(sinfo, sinfo->cur_res, first);
01098 
01099             PQclear(sinfo->cur_res);
01100             sinfo->cur_res = NULL;
01101             first = false;
01102         }
01103         else
01104         {
01105             /* if empty resultset, fill tuplestore header */
01106             if (first && PQresultStatus(sinfo->cur_res) == PGRES_TUPLES_OK)
01107                 storeRow(sinfo, sinfo->cur_res, first);
01108 
01109             /* store completed result at last_res */
01110             PQclear(sinfo->last_res);
01111             sinfo->last_res = sinfo->cur_res;
01112             sinfo->cur_res = NULL;
01113             first = true;
01114         }
01115     }
01116 
01117     /* clean up GUC settings, if we changed any */
01118     restoreLocalGucs(nestlevel);
01119 
01120     /* return last_res */
01121     res = sinfo->last_res;
01122     sinfo->last_res = NULL;
01123     return res;
01124 }
01125 
01126 /*
01127  * Send single row to sinfo->tuplestore.
01128  *
01129  * If "first" is true, create the tuplestore using PGresult's metadata
01130  * (in this case the PGresult might contain either zero or one row).
01131  */
01132 static void
01133 storeRow(storeInfo *sinfo, PGresult *res, bool first)
01134 {
01135     int         nfields = PQnfields(res);
01136     HeapTuple   tuple;
01137     int         i;
01138     MemoryContext oldcontext;
01139 
01140     if (first)
01141     {
01142         /* Prepare for new result set */
01143         ReturnSetInfo *rsinfo = (ReturnSetInfo *) sinfo->fcinfo->resultinfo;
01144         TupleDesc   tupdesc;
01145 
01146         /*
01147          * It's possible to get more than one result set if the query string
01148          * contained multiple SQL commands.  In that case, we follow PQexec's
01149          * traditional behavior of throwing away all but the last result.
01150          */
01151         if (sinfo->tuplestore)
01152             tuplestore_end(sinfo->tuplestore);
01153         sinfo->tuplestore = NULL;
01154 
01155         /* get a tuple descriptor for our result type */
01156         switch (get_call_result_type(sinfo->fcinfo, NULL, &tupdesc))
01157         {
01158             case TYPEFUNC_COMPOSITE:
01159                 /* success */
01160                 break;
01161             case TYPEFUNC_RECORD:
01162                 /* failed to determine actual type of RECORD */
01163                 ereport(ERROR,
01164                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
01165                          errmsg("function returning record called in context "
01166                                 "that cannot accept type record")));
01167                 break;
01168             default:
01169                 /* result type isn't composite */
01170                 elog(ERROR, "return type must be a row type");
01171                 break;
01172         }
01173 
01174         /* make sure we have a persistent copy of the tupdesc */
01175         tupdesc = CreateTupleDescCopy(tupdesc);
01176 
01177         /* check result and tuple descriptor have the same number of columns */
01178         if (nfields != tupdesc->natts)
01179             ereport(ERROR,
01180                     (errcode(ERRCODE_DATATYPE_MISMATCH),
01181                      errmsg("remote query result rowtype does not match "
01182                             "the specified FROM clause rowtype")));
01183 
01184         /* Prepare attinmeta for later data conversions */
01185         sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc);
01186 
01187         /* Create a new, empty tuplestore */
01188         oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
01189         sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem);
01190         rsinfo->setResult = sinfo->tuplestore;
01191         rsinfo->setDesc = tupdesc;
01192         MemoryContextSwitchTo(oldcontext);
01193 
01194         /* Done if empty resultset */
01195         if (PQntuples(res) == 0)
01196             return;
01197 
01198         /*
01199          * Set up sufficiently-wide string pointers array; this won't change
01200          * in size so it's easy to preallocate.
01201          */
01202         if (sinfo->cstrs)
01203             pfree(sinfo->cstrs);
01204         sinfo->cstrs = (char **) palloc(nfields * sizeof(char *));
01205 
01206         /* Create short-lived memory context for data conversions */
01207         if (!sinfo->tmpcontext)
01208             sinfo->tmpcontext =
01209                 AllocSetContextCreate(CurrentMemoryContext,
01210                                       "dblink temporary context",
01211                                       ALLOCSET_DEFAULT_MINSIZE,
01212                                       ALLOCSET_DEFAULT_INITSIZE,
01213                                       ALLOCSET_DEFAULT_MAXSIZE);
01214     }
01215 
01216     /* Should have a single-row result if we get here */
01217     Assert(PQntuples(res) == 1);
01218 
01219     /*
01220      * Do the following work in a temp context that we reset after each tuple.
01221      * This cleans up not only the data we have direct access to, but any
01222      * cruft the I/O functions might leak.
01223      */
01224     oldcontext = MemoryContextSwitchTo(sinfo->tmpcontext);
01225 
01226     /*
01227      * Fill cstrs with null-terminated strings of column values.
01228      */
01229     for (i = 0; i < nfields; i++)
01230     {
01231         if (PQgetisnull(res, 0, i))
01232             sinfo->cstrs[i] = NULL;
01233         else
01234             sinfo->cstrs[i] = PQgetvalue(res, 0, i);
01235     }
01236 
01237     /* Convert row to a tuple, and add it to the tuplestore */
01238     tuple = BuildTupleFromCStrings(sinfo->attinmeta, sinfo->cstrs);
01239 
01240     tuplestore_puttuple(sinfo->tuplestore, tuple);
01241 
01242     /* Clean up */
01243     MemoryContextSwitchTo(oldcontext);
01244     MemoryContextReset(sinfo->tmpcontext);
01245 }
01246 
01247 /*
01248  * List all open dblink connections by name.
01249  * Returns an array of all connection names.
01250  * Takes no params
01251  */
01252 PG_FUNCTION_INFO_V1(dblink_get_connections);
01253 Datum
01254 dblink_get_connections(PG_FUNCTION_ARGS)
01255 {
01256     HASH_SEQ_STATUS status;
01257     remoteConnHashEnt *hentry;
01258     ArrayBuildState *astate = NULL;
01259 
01260     if (remoteConnHash)
01261     {
01262         hash_seq_init(&status, remoteConnHash);
01263         while ((hentry = (remoteConnHashEnt *) hash_seq_search(&status)) != NULL)
01264         {
01265             /* stash away current value */
01266             astate = accumArrayResult(astate,
01267                                       CStringGetTextDatum(hentry->name),
01268                                       false, TEXTOID, CurrentMemoryContext);
01269         }
01270     }
01271 
01272     if (astate)
01273         PG_RETURN_ARRAYTYPE_P(makeArrayResult(astate,
01274                                               CurrentMemoryContext));
01275     else
01276         PG_RETURN_NULL();
01277 }
01278 
01279 /*
01280  * Checks if a given remote connection is busy
01281  *
01282  * Returns 1 if the connection is busy, 0 otherwise
01283  * Params:
01284  *  text connection_name - name of the connection to check
01285  *
01286  */
01287 PG_FUNCTION_INFO_V1(dblink_is_busy);
01288 Datum
01289 dblink_is_busy(PG_FUNCTION_ARGS)
01290 {
01291     char       *conname = NULL;
01292     PGconn     *conn = NULL;
01293     remoteConn *rconn = NULL;
01294 
01295     DBLINK_INIT;
01296     DBLINK_GET_NAMED_CONN;
01297 
01298     PQconsumeInput(conn);
01299     PG_RETURN_INT32(PQisBusy(conn));
01300 }
01301 
01302 /*
01303  * Cancels a running request on a connection
01304  *
01305  * Returns text:
01306  *  "OK" if the cancel request has been sent correctly,
01307  *      an error message otherwise
01308  *
01309  * Params:
01310  *  text connection_name - name of the connection to check
01311  *
01312  */
01313 PG_FUNCTION_INFO_V1(dblink_cancel_query);
01314 Datum
01315 dblink_cancel_query(PG_FUNCTION_ARGS)
01316 {
01317     int         res = 0;
01318     char       *conname = NULL;
01319     PGconn     *conn = NULL;
01320     remoteConn *rconn = NULL;
01321     PGcancel   *cancel;
01322     char        errbuf[256];
01323 
01324     DBLINK_INIT;
01325     DBLINK_GET_NAMED_CONN;
01326     cancel = PQgetCancel(conn);
01327 
01328     res = PQcancel(cancel, errbuf, 256);
01329     PQfreeCancel(cancel);
01330 
01331     if (res == 1)
01332         PG_RETURN_TEXT_P(cstring_to_text("OK"));
01333     else
01334         PG_RETURN_TEXT_P(cstring_to_text(errbuf));
01335 }
01336 
01337 
01338 /*
01339  * Get error message from a connection
01340  *
01341  * Returns text:
01342  *  "OK" if no error, an error message otherwise
01343  *
01344  * Params:
01345  *  text connection_name - name of the connection to check
01346  *
01347  */
01348 PG_FUNCTION_INFO_V1(dblink_error_message);
01349 Datum
01350 dblink_error_message(PG_FUNCTION_ARGS)
01351 {
01352     char       *msg;
01353     char       *conname = NULL;
01354     PGconn     *conn = NULL;
01355     remoteConn *rconn = NULL;
01356 
01357     DBLINK_INIT;
01358     DBLINK_GET_NAMED_CONN;
01359 
01360     msg = PQerrorMessage(conn);
01361     if (msg == NULL || msg[0] == '\0')
01362         PG_RETURN_TEXT_P(cstring_to_text("OK"));
01363     else
01364         PG_RETURN_TEXT_P(cstring_to_text(msg));
01365 }
01366 
01367 /*
01368  * Execute an SQL non-SELECT command
01369  */
01370 PG_FUNCTION_INFO_V1(dblink_exec);
01371 Datum
01372 dblink_exec(PG_FUNCTION_ARGS)
01373 {
01374     text       *volatile sql_cmd_status = NULL;
01375     PGconn     *volatile conn = NULL;
01376     volatile bool freeconn = false;
01377 
01378     DBLINK_INIT;
01379 
01380     PG_TRY();
01381     {
01382         char       *msg;
01383         PGresult   *res = NULL;
01384         char       *connstr = NULL;
01385         char       *sql = NULL;
01386         char       *conname = NULL;
01387         remoteConn *rconn = NULL;
01388         bool        fail = true;    /* default to backward compatible behavior */
01389 
01390         if (PG_NARGS() == 3)
01391         {
01392             /* must be text,text,bool */
01393             DBLINK_GET_CONN;
01394             sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
01395             fail = PG_GETARG_BOOL(2);
01396         }
01397         else if (PG_NARGS() == 2)
01398         {
01399             /* might be text,text or text,bool */
01400             if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
01401             {
01402                 conn = pconn->conn;
01403                 sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
01404                 fail = PG_GETARG_BOOL(1);
01405             }
01406             else
01407             {
01408                 DBLINK_GET_CONN;
01409                 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
01410             }
01411         }
01412         else if (PG_NARGS() == 1)
01413         {
01414             /* must be single text argument */
01415             conn = pconn->conn;
01416             sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
01417         }
01418         else
01419             /* shouldn't happen */
01420             elog(ERROR, "wrong number of arguments");
01421 
01422         if (!conn)
01423             DBLINK_CONN_NOT_AVAIL;
01424 
01425         res = PQexec(conn, sql);
01426         if (!res ||
01427             (PQresultStatus(res) != PGRES_COMMAND_OK &&
01428              PQresultStatus(res) != PGRES_TUPLES_OK))
01429         {
01430             dblink_res_error(conname, res, "could not execute command", fail);
01431 
01432             /*
01433              * and save a copy of the command status string to return as our
01434              * result tuple
01435              */
01436             sql_cmd_status = cstring_to_text("ERROR");
01437         }
01438         else if (PQresultStatus(res) == PGRES_COMMAND_OK)
01439         {
01440             /*
01441              * and save a copy of the command status string to return as our
01442              * result tuple
01443              */
01444             sql_cmd_status = cstring_to_text(PQcmdStatus(res));
01445             PQclear(res);
01446         }
01447         else
01448         {
01449             PQclear(res);
01450             ereport(ERROR,
01451                   (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
01452                    errmsg("statement returning results not allowed")));
01453         }
01454     }
01455     PG_CATCH();
01456     {
01457         /* if needed, close the connection to the database */
01458         if (freeconn)
01459             PQfinish(conn);
01460         PG_RE_THROW();
01461     }
01462     PG_END_TRY();
01463 
01464     /* if needed, close the connection to the database */
01465     if (freeconn)
01466         PQfinish(conn);
01467 
01468     PG_RETURN_TEXT_P(sql_cmd_status);
01469 }
01470 
01471 
01472 /*
01473  * dblink_get_pkey
01474  *
01475  * Return list of primary key fields for the supplied relation,
01476  * or NULL if none exists.
01477  */
01478 PG_FUNCTION_INFO_V1(dblink_get_pkey);
01479 Datum
01480 dblink_get_pkey(PG_FUNCTION_ARGS)
01481 {
01482     int16       numatts;
01483     char      **results;
01484     FuncCallContext *funcctx;
01485     int32       call_cntr;
01486     int32       max_calls;
01487     AttInMetadata *attinmeta;
01488     MemoryContext oldcontext;
01489 
01490     /* stuff done only on the first call of the function */
01491     if (SRF_IS_FIRSTCALL())
01492     {
01493         Relation    rel;
01494         TupleDesc   tupdesc;
01495 
01496         /* create a function context for cross-call persistence */
01497         funcctx = SRF_FIRSTCALL_INIT();
01498 
01499         /*
01500          * switch to memory context appropriate for multiple function calls
01501          */
01502         oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
01503 
01504         /* open target relation */
01505         rel = get_rel_from_relname(PG_GETARG_TEXT_P(0), AccessShareLock, ACL_SELECT);
01506 
01507         /* get the array of attnums */
01508         results = get_pkey_attnames(rel, &numatts);
01509 
01510         relation_close(rel, AccessShareLock);
01511 
01512         /*
01513          * need a tuple descriptor representing one INT and one TEXT column
01514          */
01515         tupdesc = CreateTemplateTupleDesc(2, false);
01516         TupleDescInitEntry(tupdesc, (AttrNumber) 1, "position",
01517                            INT4OID, -1, 0);
01518         TupleDescInitEntry(tupdesc, (AttrNumber) 2, "colname",
01519                            TEXTOID, -1, 0);
01520 
01521         /*
01522          * Generate attribute metadata needed later to produce tuples from raw
01523          * C strings
01524          */
01525         attinmeta = TupleDescGetAttInMetadata(tupdesc);
01526         funcctx->attinmeta = attinmeta;
01527 
01528         if ((results != NULL) && (numatts > 0))
01529         {
01530             funcctx->max_calls = numatts;
01531 
01532             /* got results, keep track of them */
01533             funcctx->user_fctx = results;
01534         }
01535         else
01536         {
01537             /* fast track when no results */
01538             MemoryContextSwitchTo(oldcontext);
01539             SRF_RETURN_DONE(funcctx);
01540         }
01541 
01542         MemoryContextSwitchTo(oldcontext);
01543     }
01544 
01545     /* stuff done on every call of the function */
01546     funcctx = SRF_PERCALL_SETUP();
01547 
01548     /*
01549      * initialize per-call variables
01550      */
01551     call_cntr = funcctx->call_cntr;
01552     max_calls = funcctx->max_calls;
01553 
01554     results = (char **) funcctx->user_fctx;
01555     attinmeta = funcctx->attinmeta;
01556 
01557     if (call_cntr < max_calls)  /* do when there is more left to send */
01558     {
01559         char      **values;
01560         HeapTuple   tuple;
01561         Datum       result;
01562 
01563         values = (char **) palloc(2 * sizeof(char *));
01564         values[0] = (char *) palloc(12);        /* sign, 10 digits, '\0' */
01565 
01566         sprintf(values[0], "%d", call_cntr + 1);
01567 
01568         values[1] = results[call_cntr];
01569 
01570         /* build the tuple */
01571         tuple = BuildTupleFromCStrings(attinmeta, values);
01572 
01573         /* make the tuple into a datum */
01574         result = HeapTupleGetDatum(tuple);
01575 
01576         SRF_RETURN_NEXT(funcctx, result);
01577     }
01578     else
01579     {
01580         /* do when there is no more left */
01581         SRF_RETURN_DONE(funcctx);
01582     }
01583 }
01584 
01585 
01586 /*
01587  * dblink_build_sql_insert
01588  *
01589  * Used to generate an SQL insert statement
01590  * based on an existing tuple in a local relation.
01591  * This is useful for selectively replicating data
01592  * to another server via dblink.
01593  *
01594  * API:
01595  * <relname> - name of local table of interest
01596  * <pkattnums> - an int2vector of attnums which will be used
01597  * to identify the local tuple of interest
01598  * <pknumatts> - number of attnums in pkattnums
01599  * <src_pkattvals_arry> - text array of key values which will be used
01600  * to identify the local tuple of interest
01601  * <tgt_pkattvals_arry> - text array of key values which will be used
01602  * to build the string for execution remotely. These are substituted
01603  * for their counterparts in src_pkattvals_arry
01604  */
01605 PG_FUNCTION_INFO_V1(dblink_build_sql_insert);
01606 Datum
01607 dblink_build_sql_insert(PG_FUNCTION_ARGS)
01608 {
01609     text       *relname_text = PG_GETARG_TEXT_P(0);
01610     int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
01611     int32       pknumatts_arg = PG_GETARG_INT32(2);
01612     ArrayType  *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
01613     ArrayType  *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
01614     Relation    rel;
01615     int        *pkattnums;
01616     int         pknumatts;
01617     char      **src_pkattvals;
01618     char      **tgt_pkattvals;
01619     int         src_nitems;
01620     int         tgt_nitems;
01621     char       *sql;
01622 
01623     /*
01624      * Open target relation.
01625      */
01626     rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
01627 
01628     /*
01629      * Process pkattnums argument.
01630      */
01631     validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
01632                        &pkattnums, &pknumatts);
01633 
01634     /*
01635      * Source array is made up of key values that will be used to locate the
01636      * tuple of interest from the local system.
01637      */
01638     src_pkattvals = get_text_array_contents(src_pkattvals_arry, &src_nitems);
01639 
01640     /*
01641      * There should be one source array key value for each key attnum
01642      */
01643     if (src_nitems != pknumatts)
01644         ereport(ERROR,
01645                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
01646                  errmsg("source key array length must match number of key " \
01647                         "attributes")));
01648 
01649     /*
01650      * Target array is made up of key values that will be used to build the
01651      * SQL string for use on the remote system.
01652      */
01653     tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
01654 
01655     /*
01656      * There should be one target array key value for each key attnum
01657      */
01658     if (tgt_nitems != pknumatts)
01659         ereport(ERROR,
01660                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
01661                  errmsg("target key array length must match number of key " \
01662                         "attributes")));
01663 
01664     /*
01665      * Prep work is finally done. Go get the SQL string.
01666      */
01667     sql = get_sql_insert(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
01668 
01669     /*
01670      * Now we can close the relation.
01671      */
01672     relation_close(rel, AccessShareLock);
01673 
01674     /*
01675      * And send it
01676      */
01677     PG_RETURN_TEXT_P(cstring_to_text(sql));
01678 }
01679 
01680 
01681 /*
01682  * dblink_build_sql_delete
01683  *
01684  * Used to generate an SQL delete statement.
01685  * This is useful for selectively replicating a
01686  * delete to another server via dblink.
01687  *
01688  * API:
01689  * <relname> - name of remote table of interest
01690  * <pkattnums> - an int2vector of attnums which will be used
01691  * to identify the remote tuple of interest
01692  * <pknumatts> - number of attnums in pkattnums
01693  * <tgt_pkattvals_arry> - text array of key values which will be used
01694  * to build the string for execution remotely.
01695  */
01696 PG_FUNCTION_INFO_V1(dblink_build_sql_delete);
01697 Datum
01698 dblink_build_sql_delete(PG_FUNCTION_ARGS)
01699 {
01700     text       *relname_text = PG_GETARG_TEXT_P(0);
01701     int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
01702     int32       pknumatts_arg = PG_GETARG_INT32(2);
01703     ArrayType  *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
01704     Relation    rel;
01705     int        *pkattnums;
01706     int         pknumatts;
01707     char      **tgt_pkattvals;
01708     int         tgt_nitems;
01709     char       *sql;
01710 
01711     /*
01712      * Open target relation.
01713      */
01714     rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
01715 
01716     /*
01717      * Process pkattnums argument.
01718      */
01719     validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
01720                        &pkattnums, &pknumatts);
01721 
01722     /*
01723      * Target array is made up of key values that will be used to build the
01724      * SQL string for use on the remote system.
01725      */
01726     tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
01727 
01728     /*
01729      * There should be one target array key value for each key attnum
01730      */
01731     if (tgt_nitems != pknumatts)
01732         ereport(ERROR,
01733                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
01734                  errmsg("target key array length must match number of key " \
01735                         "attributes")));
01736 
01737     /*
01738      * Prep work is finally done. Go get the SQL string.
01739      */
01740     sql = get_sql_delete(rel, pkattnums, pknumatts, tgt_pkattvals);
01741 
01742     /*
01743      * Now we can close the relation.
01744      */
01745     relation_close(rel, AccessShareLock);
01746 
01747     /*
01748      * And send it
01749      */
01750     PG_RETURN_TEXT_P(cstring_to_text(sql));
01751 }
01752 
01753 
01754 /*
01755  * dblink_build_sql_update
01756  *
01757  * Used to generate an SQL update statement
01758  * based on an existing tuple in a local relation.
01759  * This is useful for selectively replicating data
01760  * to another server via dblink.
01761  *
01762  * API:
01763  * <relname> - name of local table of interest
01764  * <pkattnums> - an int2vector of attnums which will be used
01765  * to identify the local tuple of interest
01766  * <pknumatts> - number of attnums in pkattnums
01767  * <src_pkattvals_arry> - text array of key values which will be used
01768  * to identify the local tuple of interest
01769  * <tgt_pkattvals_arry> - text array of key values which will be used
01770  * to build the string for execution remotely. These are substituted
01771  * for their counterparts in src_pkattvals_arry
01772  */
01773 PG_FUNCTION_INFO_V1(dblink_build_sql_update);
01774 Datum
01775 dblink_build_sql_update(PG_FUNCTION_ARGS)
01776 {
01777     text       *relname_text = PG_GETARG_TEXT_P(0);
01778     int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
01779     int32       pknumatts_arg = PG_GETARG_INT32(2);
01780     ArrayType  *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
01781     ArrayType  *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
01782     Relation    rel;
01783     int        *pkattnums;
01784     int         pknumatts;
01785     char      **src_pkattvals;
01786     char      **tgt_pkattvals;
01787     int         src_nitems;
01788     int         tgt_nitems;
01789     char       *sql;
01790 
01791     /*
01792      * Open target relation.
01793      */
01794     rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
01795 
01796     /*
01797      * Process pkattnums argument.
01798      */
01799     validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
01800                        &pkattnums, &pknumatts);
01801 
01802     /*
01803      * Source array is made up of key values that will be used to locate the
01804      * tuple of interest from the local system.
01805      */
01806     src_pkattvals = get_text_array_contents(src_pkattvals_arry, &src_nitems);
01807 
01808     /*
01809      * There should be one source array key value for each key attnum
01810      */
01811     if (src_nitems != pknumatts)
01812         ereport(ERROR,
01813                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
01814                  errmsg("source key array length must match number of key " \
01815                         "attributes")));
01816 
01817     /*
01818      * Target array is made up of key values that will be used to build the
01819      * SQL string for use on the remote system.
01820      */
01821     tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
01822 
01823     /*
01824      * There should be one target array key value for each key attnum
01825      */
01826     if (tgt_nitems != pknumatts)
01827         ereport(ERROR,
01828                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
01829                  errmsg("target key array length must match number of key " \
01830                         "attributes")));
01831 
01832     /*
01833      * Prep work is finally done. Go get the SQL string.
01834      */
01835     sql = get_sql_update(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
01836 
01837     /*
01838      * Now we can close the relation.
01839      */
01840     relation_close(rel, AccessShareLock);
01841 
01842     /*
01843      * And send it
01844      */
01845     PG_RETURN_TEXT_P(cstring_to_text(sql));
01846 }
01847 
01848 /*
01849  * dblink_current_query
01850  * return the current query string
01851  * to allow its use in (among other things)
01852  * rewrite rules
01853  */
01854 PG_FUNCTION_INFO_V1(dblink_current_query);
01855 Datum
01856 dblink_current_query(PG_FUNCTION_ARGS)
01857 {
01858     /* This is now just an alias for the built-in function current_query() */
01859     PG_RETURN_DATUM(current_query(fcinfo));
01860 }
01861 
01862 /*
01863  * Retrieve async notifications for a connection.
01864  *
01865  * Returns a setof record of notifications, or an empty set if none received.
01866  * Can optionally take a named connection as parameter, but uses the unnamed
01867  * connection per default.
01868  *
01869  */
01870 #define DBLINK_NOTIFY_COLS      3
01871 
01872 PG_FUNCTION_INFO_V1(dblink_get_notify);
01873 Datum
01874 dblink_get_notify(PG_FUNCTION_ARGS)
01875 {
01876     char       *conname = NULL;
01877     PGconn     *conn = NULL;
01878     remoteConn *rconn = NULL;
01879     PGnotify   *notify;
01880     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
01881     TupleDesc   tupdesc;
01882     Tuplestorestate *tupstore;
01883     MemoryContext per_query_ctx;
01884     MemoryContext oldcontext;
01885 
01886     prepTuplestoreResult(fcinfo);
01887 
01888     DBLINK_INIT;
01889     if (PG_NARGS() == 1)
01890         DBLINK_GET_NAMED_CONN;
01891     else
01892         conn = pconn->conn;
01893 
01894     /* create the tuplestore in per-query memory */
01895     per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
01896     oldcontext = MemoryContextSwitchTo(per_query_ctx);
01897 
01898     tupdesc = CreateTemplateTupleDesc(DBLINK_NOTIFY_COLS, false);
01899     TupleDescInitEntry(tupdesc, (AttrNumber) 1, "notify_name",
01900                        TEXTOID, -1, 0);
01901     TupleDescInitEntry(tupdesc, (AttrNumber) 2, "be_pid",
01902                        INT4OID, -1, 0);
01903     TupleDescInitEntry(tupdesc, (AttrNumber) 3, "extra",
01904                        TEXTOID, -1, 0);
01905 
01906     tupstore = tuplestore_begin_heap(true, false, work_mem);
01907     rsinfo->setResult = tupstore;
01908     rsinfo->setDesc = tupdesc;
01909 
01910     MemoryContextSwitchTo(oldcontext);
01911 
01912     PQconsumeInput(conn);
01913     while ((notify = PQnotifies(conn)) != NULL)
01914     {
01915         Datum       values[DBLINK_NOTIFY_COLS];
01916         bool        nulls[DBLINK_NOTIFY_COLS];
01917 
01918         memset(values, 0, sizeof(values));
01919         memset(nulls, 0, sizeof(nulls));
01920 
01921         if (notify->relname != NULL)
01922             values[0] = CStringGetTextDatum(notify->relname);
01923         else
01924             nulls[0] = true;
01925 
01926         values[1] = Int32GetDatum(notify->be_pid);
01927 
01928         if (notify->extra != NULL)
01929             values[2] = CStringGetTextDatum(notify->extra);
01930         else
01931             nulls[2] = true;
01932 
01933         tuplestore_putvalues(tupstore, tupdesc, values, nulls);
01934 
01935         PQfreemem(notify);
01936         PQconsumeInput(conn);
01937     }
01938 
01939     /* clean up and return the tuplestore */
01940     tuplestore_donestoring(tupstore);
01941 
01942     return (Datum) 0;
01943 }
01944 
01945 /*
01946  * Validate the options given to a dblink foreign server or user mapping.
01947  * Raise an error if any option is invalid.
01948  *
01949  * We just check the names of options here, so semantic errors in options,
01950  * such as invalid numeric format, will be detected at the attempt to connect.
01951  */
01952 PG_FUNCTION_INFO_V1(dblink_fdw_validator);
01953 Datum
01954 dblink_fdw_validator(PG_FUNCTION_ARGS)
01955 {
01956     List       *options_list = untransformRelOptions(PG_GETARG_DATUM(0));
01957     Oid         context = PG_GETARG_OID(1);
01958     ListCell   *cell;
01959 
01960     static const PQconninfoOption *options = NULL;
01961 
01962     /*
01963      * Get list of valid libpq options.
01964      *
01965      * To avoid unnecessary work, we get the list once and use it throughout
01966      * the lifetime of this backend process.  We don't need to care about
01967      * memory context issues, because PQconndefaults allocates with malloc.
01968      */
01969     if (!options)
01970     {
01971         options = PQconndefaults();
01972         if (!options)           /* assume reason for failure is OOM */
01973             ereport(ERROR,
01974                     (errcode(ERRCODE_FDW_OUT_OF_MEMORY),
01975                      errmsg("out of memory"),
01976                      errdetail("could not get libpq's default connection options")));
01977     }
01978 
01979     /* Validate each supplied option. */
01980     foreach(cell, options_list)
01981     {
01982         DefElem    *def = (DefElem *) lfirst(cell);
01983 
01984         if (!is_valid_dblink_option(options, def->defname, context))
01985         {
01986             /*
01987              * Unknown option, or invalid option for the context specified,
01988              * so complain about it.  Provide a hint with list of valid
01989              * options for the context.
01990              */
01991             StringInfoData buf;
01992             const PQconninfoOption *opt;
01993 
01994             initStringInfo(&buf);
01995             for (opt = options; opt->keyword; opt++)
01996             {
01997                 if (is_valid_dblink_option(options, opt->keyword, context))
01998                     appendStringInfo(&buf, "%s%s",
01999                                      (buf.len > 0) ? ", " : "",
02000                                      opt->keyword);
02001             }
02002             ereport(ERROR,
02003                     (errcode(ERRCODE_FDW_OPTION_NAME_NOT_FOUND),
02004                      errmsg("invalid option \"%s\"", def->defname),
02005                      errhint("Valid options in this context are: %s",
02006                              buf.data)));
02007         }
02008     }
02009 
02010     PG_RETURN_VOID();
02011 }
02012 
02013 
02014 /*************************************************************
02015  * internal functions
02016  */
02017 
02018 
02019 /*
02020  * get_pkey_attnames
02021  *
02022  * Get the primary key attnames for the given relation.
02023  * Return NULL, and set numatts = 0, if no primary key exists.
02024  */
02025 static char **
02026 get_pkey_attnames(Relation rel, int16 *numatts)
02027 {
02028     Relation    indexRelation;
02029     ScanKeyData skey;
02030     SysScanDesc scan;
02031     HeapTuple   indexTuple;
02032     int         i;
02033     char      **result = NULL;
02034     TupleDesc   tupdesc;
02035 
02036     /* initialize numatts to 0 in case no primary key exists */
02037     *numatts = 0;
02038 
02039     tupdesc = rel->rd_att;
02040 
02041     /* Prepare to scan pg_index for entries having indrelid = this rel. */
02042     indexRelation = heap_open(IndexRelationId, AccessShareLock);
02043     ScanKeyInit(&skey,
02044                 Anum_pg_index_indrelid,
02045                 BTEqualStrategyNumber, F_OIDEQ,
02046                 ObjectIdGetDatum(RelationGetRelid(rel)));
02047 
02048     scan = systable_beginscan(indexRelation, IndexIndrelidIndexId, true,
02049                               SnapshotNow, 1, &skey);
02050 
02051     while (HeapTupleIsValid(indexTuple = systable_getnext(scan)))
02052     {
02053         Form_pg_index index = (Form_pg_index) GETSTRUCT(indexTuple);
02054 
02055         /* we're only interested if it is the primary key */
02056         if (index->indisprimary)
02057         {
02058             *numatts = index->indnatts;
02059             if (*numatts > 0)
02060             {
02061                 result = (char **) palloc(*numatts * sizeof(char *));
02062 
02063                 for (i = 0; i < *numatts; i++)
02064                     result[i] = SPI_fname(tupdesc, index->indkey.values[i]);
02065             }
02066             break;
02067         }
02068     }
02069 
02070     systable_endscan(scan);
02071     heap_close(indexRelation, AccessShareLock);
02072 
02073     return result;
02074 }
02075 
02076 /*
02077  * Deconstruct a text[] into C-strings (note any NULL elements will be
02078  * returned as NULL pointers)
02079  */
02080 static char **
02081 get_text_array_contents(ArrayType *array, int *numitems)
02082 {
02083     int         ndim = ARR_NDIM(array);
02084     int        *dims = ARR_DIMS(array);
02085     int         nitems;
02086     int16       typlen;
02087     bool        typbyval;
02088     char        typalign;
02089     char      **values;
02090     char       *ptr;
02091     bits8      *bitmap;
02092     int         bitmask;
02093     int         i;
02094 
02095     Assert(ARR_ELEMTYPE(array) == TEXTOID);
02096 
02097     *numitems = nitems = ArrayGetNItems(ndim, dims);
02098 
02099     get_typlenbyvalalign(ARR_ELEMTYPE(array),
02100                          &typlen, &typbyval, &typalign);
02101 
02102     values = (char **) palloc(nitems * sizeof(char *));
02103 
02104     ptr = ARR_DATA_PTR(array);
02105     bitmap = ARR_NULLBITMAP(array);
02106     bitmask = 1;
02107 
02108     for (i = 0; i < nitems; i++)
02109     {
02110         if (bitmap && (*bitmap & bitmask) == 0)
02111         {
02112             values[i] = NULL;
02113         }
02114         else
02115         {
02116             values[i] = TextDatumGetCString(PointerGetDatum(ptr));
02117             ptr = att_addlength_pointer(ptr, typlen, ptr);
02118             ptr = (char *) att_align_nominal(ptr, typalign);
02119         }
02120 
02121         /* advance bitmap pointer if any */
02122         if (bitmap)
02123         {
02124             bitmask <<= 1;
02125             if (bitmask == 0x100)
02126             {
02127                 bitmap++;
02128                 bitmask = 1;
02129             }
02130         }
02131     }
02132 
02133     return values;
02134 }
02135 
02136 static char *
02137 get_sql_insert(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals)
02138 {
02139     char       *relname;
02140     HeapTuple   tuple;
02141     TupleDesc   tupdesc;
02142     int         natts;
02143     StringInfoData buf;
02144     char       *val;
02145     int         key;
02146     int         i;
02147     bool        needComma;
02148 
02149     initStringInfo(&buf);
02150 
02151     /* get relation name including any needed schema prefix and quoting */
02152     relname = generate_relation_name(rel);
02153 
02154     tupdesc = rel->rd_att;
02155     natts = tupdesc->natts;
02156 
02157     tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals);
02158     if (!tuple)
02159         ereport(ERROR,
02160                 (errcode(ERRCODE_CARDINALITY_VIOLATION),
02161                  errmsg("source row not found")));
02162 
02163     appendStringInfo(&buf, "INSERT INTO %s(", relname);
02164 
02165     needComma = false;
02166     for (i = 0; i < natts; i++)
02167     {
02168         if (tupdesc->attrs[i]->attisdropped)
02169             continue;
02170 
02171         if (needComma)
02172             appendStringInfo(&buf, ",");
02173 
02174         appendStringInfoString(&buf,
02175                       quote_ident_cstr(NameStr(tupdesc->attrs[i]->attname)));
02176         needComma = true;
02177     }
02178 
02179     appendStringInfo(&buf, ") VALUES(");
02180 
02181     /*
02182      * Note: i is physical column number (counting from 0).
02183      */
02184     needComma = false;
02185     for (i = 0; i < natts; i++)
02186     {
02187         if (tupdesc->attrs[i]->attisdropped)
02188             continue;
02189 
02190         if (needComma)
02191             appendStringInfo(&buf, ",");
02192 
02193         key = get_attnum_pk_pos(pkattnums, pknumatts, i);
02194 
02195         if (key >= 0)
02196             val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;
02197         else
02198             val = SPI_getvalue(tuple, tupdesc, i + 1);
02199 
02200         if (val != NULL)
02201         {
02202             appendStringInfoString(&buf, quote_literal_cstr(val));
02203             pfree(val);
02204         }
02205         else
02206             appendStringInfo(&buf, "NULL");
02207         needComma = true;
02208     }
02209     appendStringInfo(&buf, ")");
02210 
02211     return (buf.data);
02212 }
02213 
02214 static char *
02215 get_sql_delete(Relation rel, int *pkattnums, int pknumatts, char **tgt_pkattvals)
02216 {
02217     char       *relname;
02218     TupleDesc   tupdesc;
02219     StringInfoData buf;
02220     int         i;
02221 
02222     initStringInfo(&buf);
02223 
02224     /* get relation name including any needed schema prefix and quoting */
02225     relname = generate_relation_name(rel);
02226 
02227     tupdesc = rel->rd_att;
02228 
02229     appendStringInfo(&buf, "DELETE FROM %s WHERE ", relname);
02230     for (i = 0; i < pknumatts; i++)
02231     {
02232         int         pkattnum = pkattnums[i];
02233 
02234         if (i > 0)
02235             appendStringInfo(&buf, " AND ");
02236 
02237         appendStringInfoString(&buf,
02238                quote_ident_cstr(NameStr(tupdesc->attrs[pkattnum]->attname)));
02239 
02240         if (tgt_pkattvals[i] != NULL)
02241             appendStringInfo(&buf, " = %s",
02242                              quote_literal_cstr(tgt_pkattvals[i]));
02243         else
02244             appendStringInfo(&buf, " IS NULL");
02245     }
02246 
02247     return (buf.data);
02248 }
02249 
02250 static char *
02251 get_sql_update(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals)
02252 {
02253     char       *relname;
02254     HeapTuple   tuple;
02255     TupleDesc   tupdesc;
02256     int         natts;
02257     StringInfoData buf;
02258     char       *val;
02259     int         key;
02260     int         i;
02261     bool        needComma;
02262 
02263     initStringInfo(&buf);
02264 
02265     /* get relation name including any needed schema prefix and quoting */
02266     relname = generate_relation_name(rel);
02267 
02268     tupdesc = rel->rd_att;
02269     natts = tupdesc->natts;
02270 
02271     tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals);
02272     if (!tuple)
02273         ereport(ERROR,
02274                 (errcode(ERRCODE_CARDINALITY_VIOLATION),
02275                  errmsg("source row not found")));
02276 
02277     appendStringInfo(&buf, "UPDATE %s SET ", relname);
02278 
02279     /*
02280      * Note: i is physical column number (counting from 0).
02281      */
02282     needComma = false;
02283     for (i = 0; i < natts; i++)
02284     {
02285         if (tupdesc->attrs[i]->attisdropped)
02286             continue;
02287 
02288         if (needComma)
02289             appendStringInfo(&buf, ", ");
02290 
02291         appendStringInfo(&buf, "%s = ",
02292                       quote_ident_cstr(NameStr(tupdesc->attrs[i]->attname)));
02293 
02294         key = get_attnum_pk_pos(pkattnums, pknumatts, i);
02295 
02296         if (key >= 0)
02297             val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;
02298         else
02299             val = SPI_getvalue(tuple, tupdesc, i + 1);
02300 
02301         if (val != NULL)
02302         {
02303             appendStringInfoString(&buf, quote_literal_cstr(val));
02304             pfree(val);
02305         }
02306         else
02307             appendStringInfoString(&buf, "NULL");
02308         needComma = true;
02309     }
02310 
02311     appendStringInfo(&buf, " WHERE ");
02312 
02313     for (i = 0; i < pknumatts; i++)
02314     {
02315         int         pkattnum = pkattnums[i];
02316 
02317         if (i > 0)
02318             appendStringInfo(&buf, " AND ");
02319 
02320         appendStringInfo(&buf, "%s",
02321                quote_ident_cstr(NameStr(tupdesc->attrs[pkattnum]->attname)));
02322 
02323         val = tgt_pkattvals[i];
02324 
02325         if (val != NULL)
02326             appendStringInfo(&buf, " = %s", quote_literal_cstr(val));
02327         else
02328             appendStringInfo(&buf, " IS NULL");
02329     }
02330 
02331     return (buf.data);
02332 }
02333 
02334 /*
02335  * Return a properly quoted identifier.
02336  * Uses quote_ident in quote.c
02337  */
02338 static char *
02339 quote_ident_cstr(char *rawstr)
02340 {
02341     text       *rawstr_text;
02342     text       *result_text;
02343     char       *result;
02344 
02345     rawstr_text = cstring_to_text(rawstr);
02346     result_text = DatumGetTextP(DirectFunctionCall1(quote_ident,
02347                                               PointerGetDatum(rawstr_text)));
02348     result = text_to_cstring(result_text);
02349 
02350     return result;
02351 }
02352 
/*
 * Find the position of key within the first pknumatts entries of
 * pkattnums.
 *
 * Returns the index of the first matching entry, or -1 when key does not
 * occur in the list.
 */
static int
get_attnum_pk_pos(int *pkattnums, int pknumatts, int key)
{
    int         pos = 0;

    /* The list is expected to be short, so a linear search will do. */
    while (pos < pknumatts)
    {
        if (pkattnums[pos] == key)
            return pos;
        pos++;
    }

    return -1;
}
02367 
02368 static HeapTuple
02369 get_tuple_of_interest(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals)
02370 {
02371     char       *relname;
02372     TupleDesc   tupdesc;
02373     int         natts;
02374     StringInfoData buf;
02375     int         ret;
02376     HeapTuple   tuple;
02377     int         i;
02378 
02379     /*
02380      * Connect to SPI manager
02381      */
02382     if ((ret = SPI_connect()) < 0)
02383         /* internal error */
02384         elog(ERROR, "SPI connect failure - returned %d", ret);
02385 
02386     initStringInfo(&buf);
02387 
02388     /* get relation name including any needed schema prefix and quoting */
02389     relname = generate_relation_name(rel);
02390 
02391     tupdesc = rel->rd_att;
02392     natts = tupdesc->natts;
02393 
02394     /*
02395      * Build sql statement to look up tuple of interest, ie, the one matching
02396      * src_pkattvals.  We used to use "SELECT *" here, but it's simpler to
02397      * generate a result tuple that matches the table's physical structure,
02398      * with NULLs for any dropped columns.  Otherwise we have to deal with two
02399      * different tupdescs and everything's very confusing.
02400      */
02401     appendStringInfoString(&buf, "SELECT ");
02402 
02403     for (i = 0; i < natts; i++)
02404     {
02405         if (i > 0)
02406             appendStringInfoString(&buf, ", ");
02407 
02408         if (tupdesc->attrs[i]->attisdropped)
02409             appendStringInfoString(&buf, "NULL");
02410         else
02411             appendStringInfoString(&buf,
02412                       quote_ident_cstr(NameStr(tupdesc->attrs[i]->attname)));
02413     }
02414 
02415     appendStringInfo(&buf, " FROM %s WHERE ", relname);
02416 
02417     for (i = 0; i < pknumatts; i++)
02418     {
02419         int         pkattnum = pkattnums[i];
02420 
02421         if (i > 0)
02422             appendStringInfo(&buf, " AND ");
02423 
02424         appendStringInfoString(&buf,
02425                quote_ident_cstr(NameStr(tupdesc->attrs[pkattnum]->attname)));
02426 
02427         if (src_pkattvals[i] != NULL)
02428             appendStringInfo(&buf, " = %s",
02429                              quote_literal_cstr(src_pkattvals[i]));
02430         else
02431             appendStringInfo(&buf, " IS NULL");
02432     }
02433 
02434     /*
02435      * Retrieve the desired tuple
02436      */
02437     ret = SPI_exec(buf.data, 0);
02438     pfree(buf.data);
02439 
02440     /*
02441      * Only allow one qualifying tuple
02442      */
02443     if ((ret == SPI_OK_SELECT) && (SPI_processed > 1))
02444         ereport(ERROR,
02445                 (errcode(ERRCODE_CARDINALITY_VIOLATION),
02446                  errmsg("source criteria matched more than one record")));
02447 
02448     else if (ret == SPI_OK_SELECT && SPI_processed == 1)
02449     {
02450         SPITupleTable *tuptable = SPI_tuptable;
02451 
02452         tuple = SPI_copytuple(tuptable->vals[0]);
02453         SPI_finish();
02454 
02455         return tuple;
02456     }
02457     else
02458     {
02459         /*
02460          * no qualifying tuples
02461          */
02462         SPI_finish();
02463 
02464         return NULL;
02465     }
02466 
02467     /*
02468      * never reached, but keep compiler quiet
02469      */
02470     return NULL;
02471 }
02472 
02473 /*
02474  * Open the relation named by relname_text, acquire specified type of lock,
02475  * verify we have specified permissions.
02476  * Caller must close rel when done with it.
02477  */
02478 static Relation
02479 get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode)
02480 {
02481     RangeVar   *relvar;
02482     Relation    rel;
02483     AclResult   aclresult;
02484 
02485     relvar = makeRangeVarFromNameList(textToQualifiedNameList(relname_text));
02486     rel = heap_openrv(relvar, lockmode);
02487 
02488     aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
02489                                   aclmode);
02490     if (aclresult != ACLCHECK_OK)
02491         aclcheck_error(aclresult, ACL_KIND_CLASS,
02492                        RelationGetRelationName(rel));
02493 
02494     return rel;
02495 }
02496 
02497 /*
02498  * generate_relation_name - copied from ruleutils.c
02499  *      Compute the name to display for a relation
02500  *
02501  * The result includes all necessary quoting and schema-prefixing.
02502  */
02503 static char *
02504 generate_relation_name(Relation rel)
02505 {
02506     char       *nspname;
02507     char       *result;
02508 
02509     /* Qualify the name if not visible in search path */
02510     if (RelationIsVisible(RelationGetRelid(rel)))
02511         nspname = NULL;
02512     else
02513         nspname = get_namespace_name(rel->rd_rel->relnamespace);
02514 
02515     result = quote_qualified_identifier(nspname, RelationGetRelationName(rel));
02516 
02517     return result;
02518 }
02519 
02520 
02521 static remoteConn *
02522 getConnectionByName(const char *name)
02523 {
02524     remoteConnHashEnt *hentry;
02525     char       *key;
02526 
02527     if (!remoteConnHash)
02528         remoteConnHash = createConnHash();
02529 
02530     key = pstrdup(name);
02531     truncate_identifier(key, strlen(key), false);
02532     hentry = (remoteConnHashEnt *) hash_search(remoteConnHash,
02533                                                key, HASH_FIND, NULL);
02534 
02535     if (hentry)
02536         return (hentry->rconn);
02537 
02538     return (NULL);
02539 }
02540 
02541 static HTAB *
02542 createConnHash(void)
02543 {
02544     HASHCTL     ctl;
02545 
02546     ctl.keysize = NAMEDATALEN;
02547     ctl.entrysize = sizeof(remoteConnHashEnt);
02548 
02549     return hash_create("Remote Con hash", NUMCONN, &ctl, HASH_ELEM);
02550 }
02551 
02552 static void
02553 createNewConnection(const char *name, remoteConn *rconn)
02554 {
02555     remoteConnHashEnt *hentry;
02556     bool        found;
02557     char       *key;
02558 
02559     if (!remoteConnHash)
02560         remoteConnHash = createConnHash();
02561 
02562     key = pstrdup(name);
02563     truncate_identifier(key, strlen(key), true);
02564     hentry = (remoteConnHashEnt *) hash_search(remoteConnHash, key,
02565                                                HASH_ENTER, &found);
02566 
02567     if (found)
02568     {
02569         PQfinish(rconn->conn);
02570         pfree(rconn);
02571 
02572         ereport(ERROR,
02573                 (errcode(ERRCODE_DUPLICATE_OBJECT),
02574                  errmsg("duplicate connection name")));
02575     }
02576 
02577     hentry->rconn = rconn;
02578     strlcpy(hentry->name, name, sizeof(hentry->name));
02579 }
02580 
02581 static void
02582 deleteConnection(const char *name)
02583 {
02584     remoteConnHashEnt *hentry;
02585     bool        found;
02586     char       *key;
02587 
02588     if (!remoteConnHash)
02589         remoteConnHash = createConnHash();
02590 
02591     key = pstrdup(name);
02592     truncate_identifier(key, strlen(key), false);
02593     hentry = (remoteConnHashEnt *) hash_search(remoteConnHash,
02594                                                key, HASH_REMOVE, &found);
02595 
02596     if (!hentry)
02597         ereport(ERROR,
02598                 (errcode(ERRCODE_UNDEFINED_OBJECT),
02599                  errmsg("undefined connection name")));
02600 
02601 }
02602 
02603 static void
02604 dblink_security_check(PGconn *conn, remoteConn *rconn)
02605 {
02606     if (!superuser())
02607     {
02608         if (!PQconnectionUsedPassword(conn))
02609         {
02610             PQfinish(conn);
02611             if (rconn)
02612                 pfree(rconn);
02613 
02614             ereport(ERROR,
02615                   (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
02616                    errmsg("password is required"),
02617                    errdetail("Non-superuser cannot connect if the server does not request a password."),
02618                    errhint("Target server's authentication method must be changed.")));
02619         }
02620     }
02621 }
02622 
02623 /*
02624  * For non-superusers, insist that the connstr specify a password.  This
02625  * prevents a password from being picked up from .pgpass, a service file,
02626  * the environment, etc.  We don't want the postgres user's passwords
02627  * to be accessible to non-superusers.
02628  */
02629 static void
02630 dblink_connstr_check(const char *connstr)
02631 {
02632     if (!superuser())
02633     {
02634         PQconninfoOption *options;
02635         PQconninfoOption *option;
02636         bool        connstr_gives_password = false;
02637 
02638         options = PQconninfoParse(connstr, NULL);
02639         if (options)
02640         {
02641             for (option = options; option->keyword != NULL; option++)
02642             {
02643                 if (strcmp(option->keyword, "password") == 0)
02644                 {
02645                     if (option->val != NULL && option->val[0] != '\0')
02646                     {
02647                         connstr_gives_password = true;
02648                         break;
02649                     }
02650                 }
02651             }
02652             PQconninfoFree(options);
02653         }
02654 
02655         if (!connstr_gives_password)
02656             ereport(ERROR,
02657                   (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
02658                    errmsg("password is required"),
02659                    errdetail("Non-superusers must provide a password in the connection string.")));
02660     }
02661 }
02662 
02663 static void
02664 dblink_res_error(const char *conname, PGresult *res, const char *dblink_context_msg, bool fail)
02665 {
02666     int         level;
02667     char       *pg_diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
02668     char       *pg_diag_message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
02669     char       *pg_diag_message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
02670     char       *pg_diag_message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
02671     char       *pg_diag_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
02672     int         sqlstate;
02673     char       *message_primary;
02674     char       *message_detail;
02675     char       *message_hint;
02676     char       *message_context;
02677     const char *dblink_context_conname = "unnamed";
02678 
02679     if (fail)
02680         level = ERROR;
02681     else
02682         level = NOTICE;
02683 
02684     if (pg_diag_sqlstate)
02685         sqlstate = MAKE_SQLSTATE(pg_diag_sqlstate[0],
02686                                  pg_diag_sqlstate[1],
02687                                  pg_diag_sqlstate[2],
02688                                  pg_diag_sqlstate[3],
02689                                  pg_diag_sqlstate[4]);
02690     else
02691         sqlstate = ERRCODE_CONNECTION_FAILURE;
02692 
02693     xpstrdup(message_primary, pg_diag_message_primary);
02694     xpstrdup(message_detail, pg_diag_message_detail);
02695     xpstrdup(message_hint, pg_diag_message_hint);
02696     xpstrdup(message_context, pg_diag_context);
02697 
02698     if (res)
02699         PQclear(res);
02700 
02701     if (conname)
02702         dblink_context_conname = conname;
02703 
02704     ereport(level,
02705             (errcode(sqlstate),
02706              message_primary ? errmsg_internal("%s", message_primary) :
02707              errmsg("unknown error"),
02708              message_detail ? errdetail_internal("%s", message_detail) : 0,
02709              message_hint ? errhint("%s", message_hint) : 0,
02710              message_context ? errcontext("%s", message_context) : 0,
02711           errcontext("Error occurred on dblink connection named \"%s\": %s.",
02712                      dblink_context_conname, dblink_context_msg)));
02713 }
02714 
02715 /*
02716  * Obtain connection string for a foreign server
02717  */
02718 static char *
02719 get_connect_string(const char *servername)
02720 {
02721     ForeignServer *foreign_server = NULL;
02722     UserMapping *user_mapping;
02723     ListCell   *cell;
02724     StringInfo  buf = makeStringInfo();
02725     ForeignDataWrapper *fdw;
02726     AclResult   aclresult;
02727     char       *srvname;
02728 
02729     /* first gather the server connstr options */
02730     srvname = pstrdup(servername);
02731     truncate_identifier(srvname, strlen(srvname), false);
02732     foreign_server = GetForeignServerByName(srvname, true);
02733 
02734     if (foreign_server)
02735     {
02736         Oid         serverid = foreign_server->serverid;
02737         Oid         fdwid = foreign_server->fdwid;
02738         Oid         userid = GetUserId();
02739 
02740         user_mapping = GetUserMapping(userid, serverid);
02741         fdw = GetForeignDataWrapper(fdwid);
02742 
02743         /* Check permissions, user must have usage on the server. */
02744         aclresult = pg_foreign_server_aclcheck(serverid, userid, ACL_USAGE);
02745         if (aclresult != ACLCHECK_OK)
02746             aclcheck_error(aclresult, ACL_KIND_FOREIGN_SERVER, foreign_server->servername);
02747 
02748         foreach(cell, fdw->options)
02749         {
02750             DefElem    *def = lfirst(cell);
02751 
02752             appendStringInfo(buf, "%s='%s' ", def->defname,
02753                              escape_param_str(strVal(def->arg)));
02754         }
02755 
02756         foreach(cell, foreign_server->options)
02757         {
02758             DefElem    *def = lfirst(cell);
02759 
02760             appendStringInfo(buf, "%s='%s' ", def->defname,
02761                              escape_param_str(strVal(def->arg)));
02762         }
02763 
02764         foreach(cell, user_mapping->options)
02765         {
02766 
02767             DefElem    *def = lfirst(cell);
02768 
02769             appendStringInfo(buf, "%s='%s' ", def->defname,
02770                              escape_param_str(strVal(def->arg)));
02771         }
02772 
02773         return buf->data;
02774     }
02775     else
02776         return NULL;
02777 }
02778 
02779 /*
02780  * Escaping libpq connect parameter strings.
02781  *
02782  * Replaces "'" with "\'" and "\" with "\\".
02783  */
02784 static char *
02785 escape_param_str(const char *str)
02786 {
02787     const char *cp;
02788     StringInfo  buf = makeStringInfo();
02789 
02790     for (cp = str; *cp; cp++)
02791     {
02792         if (*cp == '\\' || *cp == '\'')
02793             appendStringInfoChar(buf, '\\');
02794         appendStringInfoChar(buf, *cp);
02795     }
02796 
02797     return buf->data;
02798 }
02799 
02800 /*
02801  * Validate the PK-attnums argument for dblink_build_sql_insert() and related
02802  * functions, and translate to the internal representation.
02803  *
02804  * The user supplies an int2vector of 1-based logical attnums, plus a count
02805  * argument (the need for the separate count argument is historical, but we
02806  * still check it).  We check that each attnum corresponds to a valid,
02807  * non-dropped attribute of the rel.  We do *not* prevent attnums from being
02808  * listed twice, though the actual use-case for such things is dubious.
02809  * Note that before Postgres 9.0, the user's attnums were interpreted as
02810  * physical not logical column numbers; this was changed for future-proofing.
02811  *
02812  * The internal representation is a palloc'd int array of 0-based physical
02813  * attnums.
02814  */
02815 static void
02816 validate_pkattnums(Relation rel,
02817                    int2vector *pkattnums_arg, int32 pknumatts_arg,
02818                    int **pkattnums, int *pknumatts)
02819 {
02820     TupleDesc   tupdesc = rel->rd_att;
02821     int         natts = tupdesc->natts;
02822     int         i;
02823 
02824     /* Don't take more array elements than there are */
02825     pknumatts_arg = Min(pknumatts_arg, pkattnums_arg->dim1);
02826 
02827     /* Must have at least one pk attnum selected */
02828     if (pknumatts_arg <= 0)
02829         ereport(ERROR,
02830                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
02831                  errmsg("number of key attributes must be > 0")));
02832 
02833     /* Allocate output array */
02834     *pkattnums = (int *) palloc(pknumatts_arg * sizeof(int));
02835     *pknumatts = pknumatts_arg;
02836 
02837     /* Validate attnums and convert to internal form */
02838     for (i = 0; i < pknumatts_arg; i++)
02839     {
02840         int         pkattnum = pkattnums_arg->values[i];
02841         int         lnum;
02842         int         j;
02843 
02844         /* Can throw error immediately if out of range */
02845         if (pkattnum <= 0 || pkattnum > natts)
02846             ereport(ERROR,
02847                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
02848                      errmsg("invalid attribute number %d", pkattnum)));
02849 
02850         /* Identify which physical column has this logical number */
02851         lnum = 0;
02852         for (j = 0; j < natts; j++)
02853         {
02854             /* dropped columns don't count */
02855             if (tupdesc->attrs[j]->attisdropped)
02856                 continue;
02857 
02858             if (++lnum == pkattnum)
02859                 break;
02860         }
02861 
02862         if (j < natts)
02863             (*pkattnums)[i] = j;
02864         else
02865             ereport(ERROR,
02866                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
02867                      errmsg("invalid attribute number %d", pkattnum)));
02868     }
02869 }
02870 
02871 /*
02872  * Check if the specified connection option is valid.
02873  *
02874  * We basically allow whatever libpq thinks is an option, with these
02875  * restrictions:
02876  *      debug options: disallowed
02877  *      "client_encoding": disallowed
02878  *      "user": valid only in USER MAPPING options
02879  *      secure options (eg password): valid only in USER MAPPING options
02880  *      others: valid only in FOREIGN SERVER options
02881  *
02882  * We disallow client_encoding because it would be overridden anyway via
02883  * PQclientEncoding; allowing it to be specified would merely promote
02884  * confusion.
02885  */
02886 static bool
02887 is_valid_dblink_option(const PQconninfoOption *options, const char *option,
02888                        Oid context)
02889 {
02890     const PQconninfoOption *opt;
02891 
02892     /* Look up the option in libpq result */
02893     for (opt = options; opt->keyword; opt++)
02894     {
02895         if (strcmp(opt->keyword, option) == 0)
02896             break;
02897     }
02898     if (opt->keyword == NULL)
02899         return false;
02900 
02901     /* Disallow debug options (particularly "replication") */
02902     if (strchr(opt->dispchar, 'D'))
02903         return false;
02904 
02905     /* Disallow "client_encoding" */
02906     if (strcmp(opt->keyword, "client_encoding") == 0)
02907         return false;
02908 
02909     /*
02910      * If the option is "user" or marked secure, it should be specified only
02911      * in USER MAPPING.  Others should be specified only in SERVER.
02912      */
02913     if (strcmp(opt->keyword, "user") == 0 || strchr(opt->dispchar, '*'))
02914     {
02915         if (context != UserMappingRelationId)
02916             return false;
02917     }
02918     else
02919     {
02920         if (context != ForeignServerRelationId)
02921             return false;
02922     }
02923 
02924     return true;
02925 }
02926 
02927 /*
02928  * Copy the remote session's values of GUCs that affect datatype I/O
02929  * and apply them locally in a new GUC nesting level.  Returns the new
02930  * nestlevel (which is needed by restoreLocalGucs to undo the settings),
02931  * or -1 if no new nestlevel was needed.
02932  *
02933  * We use the equivalent of a function SET option to allow the settings to
02934  * persist only until the caller calls restoreLocalGucs.  If an error is
02935  * thrown in between, guc.c will take care of undoing the settings.
02936  */
02937 static int
02938 applyRemoteGucs(PGconn *conn)
02939 {
02940     static const char *const GUCsAffectingIO[] = {
02941         "DateStyle",
02942         "IntervalStyle"
02943     };
02944 
02945     int         nestlevel = -1;
02946     int         i;
02947 
02948     for (i = 0; i < lengthof(GUCsAffectingIO); i++)
02949     {
02950         const char *gucName = GUCsAffectingIO[i];
02951         const char *remoteVal = PQparameterStatus(conn, gucName);
02952         const char *localVal;
02953 
02954         /*
02955          * If the remote server is pre-8.4, it won't have IntervalStyle, but
02956          * that's okay because its output format won't be ambiguous.  So just
02957          * skip the GUC if we don't get a value for it.  (We might eventually
02958          * need more complicated logic with remote-version checks here.)
02959          */
02960         if (remoteVal == NULL)
02961             continue;
02962 
02963         /*
02964          * Avoid GUC-setting overhead if the remote and local GUCs already
02965          * have the same value.
02966          */
02967         localVal = GetConfigOption(gucName, false, false);
02968         Assert(localVal != NULL);
02969 
02970         if (strcmp(remoteVal, localVal) == 0)
02971             continue;
02972 
02973         /* Create new GUC nest level if we didn't already */
02974         if (nestlevel < 0)
02975             nestlevel = NewGUCNestLevel();
02976 
02977         /* Apply the option (this will throw error on failure) */
02978         (void) set_config_option(gucName, remoteVal,
02979                                  PGC_USERSET, PGC_S_SESSION,
02980                                  GUC_ACTION_SAVE, true, 0);
02981     }
02982 
02983     return nestlevel;
02984 }
02985 
02986 /*
02987  * Restore local GUCs after they have been overlaid with remote settings.
02988  */
02989 static void
02990 restoreLocalGucs(int nestlevel)
02991 {
02992     /* Do nothing if no new nestlevel was created */
02993     if (nestlevel > 0)
02994         AtEOXact_GUC(true, nestlevel);
02995 }