Fix xslt_process() to ensure that it inserts a NULL terminator after the
[PostgreSQL.git] / contrib / dblink / dblink.c
blob276c7e140058c468b74e87ca0dbffa98433193dc
1 /*
2 * dblink.c
4 * Functions returning results from a remote database
6 * Joe Conway <mail@joeconway.com>
7 * And contributors:
8 * Darko Prenosil <Darko.Prenosil@finteh.hr>
9 * Shridhar Daithankar <shridhar_daithankar@persistent.co.in>
11 * $PostgreSQL$
12 * Copyright (c) 2001-2009, PostgreSQL Global Development Group
13 * ALL RIGHTS RESERVED;
15 * Permission to use, copy, modify, and distribute this software and its
16 * documentation for any purpose, without fee, and without a written agreement
17 * is hereby granted, provided that the above copyright notice and this
18 * paragraph and the following two paragraphs appear in all copies.
20 * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
21 * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
22 * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
23 * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
24 * POSSIBILITY OF SUCH DAMAGE.
26 * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
27 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
28 * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
29 * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
30 * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
33 #include "postgres.h"
35 #include <limits.h>
37 #include "libpq-fe.h"
38 #include "fmgr.h"
39 #include "funcapi.h"
40 #include "access/genam.h"
41 #include "access/heapam.h"
42 #include "access/tupdesc.h"
43 #include "catalog/indexing.h"
44 #include "catalog/namespace.h"
45 #include "catalog/pg_index.h"
46 #include "catalog/pg_type.h"
47 #include "executor/executor.h"
48 #include "executor/spi.h"
49 #include "foreign/foreign.h"
50 #include "lib/stringinfo.h"
51 #include "mb/pg_wchar.h"
52 #include "miscadmin.h"
53 #include "nodes/execnodes.h"
54 #include "nodes/nodes.h"
55 #include "nodes/pg_list.h"
56 #include "parser/parse_type.h"
57 #include "utils/acl.h"
58 #include "utils/array.h"
59 #include "utils/builtins.h"
60 #include "utils/dynahash.h"
61 #include "utils/fmgroids.h"
62 #include "utils/hsearch.h"
63 #include "utils/lsyscache.h"
64 #include "utils/memutils.h"
65 #include "utils/syscache.h"
66 #include "utils/tqual.h"
68 #include "dblink.h"
70 PG_MODULE_MAGIC;
72 typedef struct remoteConn
74 PGconn *conn; /* Hold the remote connection */
75 int openCursorCount; /* The number of open cursors */
76 bool newXactForCursor; /* Opened a transaction for a cursor */
77 } remoteConn;
80 * Internal declarations
82 static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
83 static remoteConn *getConnectionByName(const char *name);
84 static HTAB *createConnHash(void);
85 static void createNewConnection(const char *name, remoteConn *rconn);
86 static void deleteConnection(const char *name);
87 static char **get_pkey_attnames(Oid relid, int16 *numatts);
88 static char **get_text_array_contents(ArrayType *array, int *numitems);
89 static char *get_sql_insert(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals);
90 static char *get_sql_delete(Oid relid, int2vector *pkattnums, int16 pknumatts, char **tgt_pkattvals);
91 static char *get_sql_update(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals);
92 static char *quote_literal_cstr(char *rawstr);
93 static char *quote_ident_cstr(char *rawstr);
94 static int16 get_attnum_pk_pos(int2vector *pkattnums, int16 pknumatts, int16 key);
95 static HeapTuple get_tuple_of_interest(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals);
96 static Oid get_relid_from_relname(text *relname_text);
97 static char *generate_relation_name(Oid relid);
98 static void dblink_connstr_check(const char *connstr);
99 static void dblink_security_check(PGconn *conn, remoteConn *rconn);
100 static void dblink_res_error(const char *conname, PGresult *res, const char *dblink_context_msg, bool fail);
101 static char *get_connect_string(const char *servername);
102 static char *escape_param_str(const char *from);
104 /* Global */
105 static remoteConn *pconn = NULL;
106 static HTAB *remoteConnHash = NULL;
109 * Following is list that holds multiple remote connections.
110 * Calling convention of each dblink function changes to accept
111 * connection name as the first parameter. The connection list is
112 * much like ecpg e.g. a mapping between a name and a PGconn object.
115 typedef struct remoteConnHashEnt
117 char name[NAMEDATALEN];
118 remoteConn *rconn;
119 } remoteConnHashEnt;
121 /* initial number of connection hashes */
122 #define NUMCONN 16
124 /* general utility */
125 #define xpfree(var_) \
126 do { \
127 if (var_ != NULL) \
129 pfree(var_); \
130 var_ = NULL; \
132 } while (0)
134 #define xpstrdup(var_c, var_) \
135 do { \
136 if (var_ != NULL) \
137 var_c = pstrdup(var_); \
138 else \
139 var_c = NULL; \
140 } while (0)
142 #define DBLINK_RES_INTERNALERROR(p2) \
143 do { \
144 msg = pstrdup(PQerrorMessage(conn)); \
145 if (res) \
146 PQclear(res); \
147 elog(ERROR, "%s: %s", p2, msg); \
148 } while (0)
150 #define DBLINK_CONN_NOT_AVAIL \
151 do { \
152 if(conname) \
153 ereport(ERROR, \
154 (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST), \
155 errmsg("connection \"%s\" not available", conname))); \
156 else \
157 ereport(ERROR, \
158 (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST), \
159 errmsg("connection not available"))); \
160 } while (0)
162 #define DBLINK_GET_CONN \
163 do { \
164 char *conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(0)); \
165 rconn = getConnectionByName(conname_or_str); \
166 if(rconn) \
168 conn = rconn->conn; \
170 else \
172 connstr = get_connect_string(conname_or_str); \
173 if (connstr == NULL) \
175 connstr = conname_or_str; \
177 dblink_connstr_check(connstr); \
178 conn = PQconnectdb(connstr); \
179 if (PQstatus(conn) == CONNECTION_BAD) \
181 msg = pstrdup(PQerrorMessage(conn)); \
182 PQfinish(conn); \
183 ereport(ERROR, \
184 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION), \
185 errmsg("could not establish connection"), \
186 errdetail("%s", msg))); \
188 dblink_security_check(conn, rconn); \
189 PQsetClientEncoding(conn, GetDatabaseEncodingName()); \
190 freeconn = true; \
192 } while (0)
194 #define DBLINK_GET_NAMED_CONN \
195 do { \
196 char *conname = text_to_cstring(PG_GETARG_TEXT_PP(0)); \
197 rconn = getConnectionByName(conname); \
198 if(rconn) \
199 conn = rconn->conn; \
200 else \
201 DBLINK_CONN_NOT_AVAIL; \
202 } while (0)
204 #define DBLINK_INIT \
205 do { \
206 if (!pconn) \
208 pconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn)); \
209 pconn->conn = NULL; \
210 pconn->openCursorCount = 0; \
211 pconn->newXactForCursor = FALSE; \
213 } while (0)
216 * Create a persistent connection to another database
218 PG_FUNCTION_INFO_V1(dblink_connect);
219 Datum
220 dblink_connect(PG_FUNCTION_ARGS)
222 char *conname_or_str = NULL;
223 char *connstr = NULL;
224 char *connname = NULL;
225 char *msg;
226 PGconn *conn = NULL;
227 remoteConn *rconn = NULL;
229 DBLINK_INIT;
231 if (PG_NARGS() == 2)
233 conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(1));
234 connname = text_to_cstring(PG_GETARG_TEXT_PP(0));
236 else if (PG_NARGS() == 1)
237 conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(0));
239 if (connname)
240 rconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext,
241 sizeof(remoteConn));
243 /* first check for valid foreign data server */
244 connstr = get_connect_string(conname_or_str);
245 if (connstr == NULL)
246 connstr = conname_or_str;
248 /* check password in connection string if not superuser */
249 dblink_connstr_check(connstr);
250 conn = PQconnectdb(connstr);
252 if (PQstatus(conn) == CONNECTION_BAD)
254 msg = pstrdup(PQerrorMessage(conn));
255 PQfinish(conn);
256 if (rconn)
257 pfree(rconn);
259 ereport(ERROR,
260 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
261 errmsg("could not establish connection"),
262 errdetail("%s", msg)));
265 /* check password actually used if not superuser */
266 dblink_security_check(conn, rconn);
268 /* attempt to set client encoding to match server encoding */
269 PQsetClientEncoding(conn, GetDatabaseEncodingName());
271 if (connname)
273 rconn->conn = conn;
274 createNewConnection(connname, rconn);
276 else
277 pconn->conn = conn;
279 PG_RETURN_TEXT_P(cstring_to_text("OK"));
283 * Clear a persistent connection to another database
285 PG_FUNCTION_INFO_V1(dblink_disconnect);
286 Datum
287 dblink_disconnect(PG_FUNCTION_ARGS)
289 char *conname = NULL;
290 remoteConn *rconn = NULL;
291 PGconn *conn = NULL;
293 DBLINK_INIT;
295 if (PG_NARGS() == 1)
297 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
298 rconn = getConnectionByName(conname);
299 if (rconn)
300 conn = rconn->conn;
302 else
303 conn = pconn->conn;
305 if (!conn)
306 DBLINK_CONN_NOT_AVAIL;
308 PQfinish(conn);
309 if (rconn)
311 deleteConnection(conname);
312 pfree(rconn);
314 else
315 pconn->conn = NULL;
317 PG_RETURN_TEXT_P(cstring_to_text("OK"));
321 * opens a cursor using a persistent connection
323 PG_FUNCTION_INFO_V1(dblink_open);
324 Datum
325 dblink_open(PG_FUNCTION_ARGS)
327 char *msg;
328 PGresult *res = NULL;
329 PGconn *conn = NULL;
330 char *curname = NULL;
331 char *sql = NULL;
332 char *conname = NULL;
333 StringInfoData buf;
334 remoteConn *rconn = NULL;
335 bool fail = true; /* default to backward compatible behavior */
337 DBLINK_INIT;
338 initStringInfo(&buf);
340 if (PG_NARGS() == 2)
342 /* text,text */
343 curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
344 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
345 rconn = pconn;
347 else if (PG_NARGS() == 3)
349 /* might be text,text,text or text,text,bool */
350 if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
352 curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
353 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
354 fail = PG_GETARG_BOOL(2);
355 rconn = pconn;
357 else
359 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
360 curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
361 sql = text_to_cstring(PG_GETARG_TEXT_PP(2));
362 rconn = getConnectionByName(conname);
365 else if (PG_NARGS() == 4)
367 /* text,text,text,bool */
368 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
369 curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
370 sql = text_to_cstring(PG_GETARG_TEXT_PP(2));
371 fail = PG_GETARG_BOOL(3);
372 rconn = getConnectionByName(conname);
375 if (!rconn || !rconn->conn)
376 DBLINK_CONN_NOT_AVAIL;
377 else
378 conn = rconn->conn;
380 /* If we are not in a transaction, start one */
381 if (PQtransactionStatus(conn) == PQTRANS_IDLE)
383 res = PQexec(conn, "BEGIN");
384 if (PQresultStatus(res) != PGRES_COMMAND_OK)
385 DBLINK_RES_INTERNALERROR("begin error");
386 PQclear(res);
387 rconn->newXactForCursor = TRUE;
390 * Since transaction state was IDLE, we force cursor count to
391 * initially be 0. This is needed as a previous ABORT might have wiped
392 * out our transaction without maintaining the cursor count for us.
394 rconn->openCursorCount = 0;
397 /* if we started a transaction, increment cursor count */
398 if (rconn->newXactForCursor)
399 (rconn->openCursorCount)++;
401 appendStringInfo(&buf, "DECLARE %s CURSOR FOR %s", curname, sql);
402 res = PQexec(conn, buf.data);
403 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
405 dblink_res_error(conname, res, "could not open cursor", fail);
406 PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
409 PQclear(res);
410 PG_RETURN_TEXT_P(cstring_to_text("OK"));
414 * closes a cursor
416 PG_FUNCTION_INFO_V1(dblink_close);
417 Datum
418 dblink_close(PG_FUNCTION_ARGS)
420 PGconn *conn = NULL;
421 PGresult *res = NULL;
422 char *curname = NULL;
423 char *conname = NULL;
424 StringInfoData buf;
425 char *msg;
426 remoteConn *rconn = NULL;
427 bool fail = true; /* default to backward compatible behavior */
429 DBLINK_INIT;
430 initStringInfo(&buf);
432 if (PG_NARGS() == 1)
434 /* text */
435 curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
436 rconn = pconn;
438 else if (PG_NARGS() == 2)
440 /* might be text,text or text,bool */
441 if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
443 curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
444 fail = PG_GETARG_BOOL(1);
445 rconn = pconn;
447 else
449 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
450 curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
451 rconn = getConnectionByName(conname);
454 if (PG_NARGS() == 3)
456 /* text,text,bool */
457 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
458 curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
459 fail = PG_GETARG_BOOL(2);
460 rconn = getConnectionByName(conname);
463 if (!rconn || !rconn->conn)
464 DBLINK_CONN_NOT_AVAIL;
465 else
466 conn = rconn->conn;
468 appendStringInfo(&buf, "CLOSE %s", curname);
470 /* close the cursor */
471 res = PQexec(conn, buf.data);
472 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
474 dblink_res_error(conname, res, "could not close cursor", fail);
475 PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
478 PQclear(res);
480 /* if we started a transaction, decrement cursor count */
481 if (rconn->newXactForCursor)
483 (rconn->openCursorCount)--;
485 /* if count is zero, commit the transaction */
486 if (rconn->openCursorCount == 0)
488 rconn->newXactForCursor = FALSE;
490 res = PQexec(conn, "COMMIT");
491 if (PQresultStatus(res) != PGRES_COMMAND_OK)
492 DBLINK_RES_INTERNALERROR("commit error");
493 PQclear(res);
497 PG_RETURN_TEXT_P(cstring_to_text("OK"));
501 * Fetch results from an open cursor
503 PG_FUNCTION_INFO_V1(dblink_fetch);
504 Datum
505 dblink_fetch(PG_FUNCTION_ARGS)
507 FuncCallContext *funcctx;
508 TupleDesc tupdesc = NULL;
509 int call_cntr;
510 int max_calls;
511 AttInMetadata *attinmeta;
512 PGresult *res = NULL;
513 MemoryContext oldcontext;
514 char *conname = NULL;
515 remoteConn *rconn = NULL;
517 DBLINK_INIT;
519 /* stuff done only on the first call of the function */
520 if (SRF_IS_FIRSTCALL())
522 PGconn *conn = NULL;
523 StringInfoData buf;
524 char *curname = NULL;
525 int howmany = 0;
526 bool fail = true; /* default to backward compatible */
528 if (PG_NARGS() == 4)
530 /* text,text,int,bool */
531 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
532 curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
533 howmany = PG_GETARG_INT32(2);
534 fail = PG_GETARG_BOOL(3);
536 rconn = getConnectionByName(conname);
537 if (rconn)
538 conn = rconn->conn;
540 else if (PG_NARGS() == 3)
542 /* text,text,int or text,int,bool */
543 if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
545 curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
546 howmany = PG_GETARG_INT32(1);
547 fail = PG_GETARG_BOOL(2);
548 conn = pconn->conn;
550 else
552 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
553 curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
554 howmany = PG_GETARG_INT32(2);
556 rconn = getConnectionByName(conname);
557 if (rconn)
558 conn = rconn->conn;
561 else if (PG_NARGS() == 2)
563 /* text,int */
564 curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
565 howmany = PG_GETARG_INT32(1);
566 conn = pconn->conn;
569 if (!conn)
570 DBLINK_CONN_NOT_AVAIL;
572 initStringInfo(&buf);
573 appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
575 /* create a function context for cross-call persistence */
576 funcctx = SRF_FIRSTCALL_INIT();
579 * Try to execute the query. Note that since libpq uses malloc, the
580 * PGresult will be long-lived even though we are still in a
581 * short-lived memory context.
583 res = PQexec(conn, buf.data);
584 if (!res ||
585 (PQresultStatus(res) != PGRES_COMMAND_OK &&
586 PQresultStatus(res) != PGRES_TUPLES_OK))
588 dblink_res_error(conname, res, "could not fetch from cursor", fail);
589 SRF_RETURN_DONE(funcctx);
591 else if (PQresultStatus(res) == PGRES_COMMAND_OK)
593 /* cursor does not exist - closed already or bad name */
594 PQclear(res);
595 ereport(ERROR,
596 (errcode(ERRCODE_INVALID_CURSOR_NAME),
597 errmsg("cursor \"%s\" does not exist", curname)));
600 funcctx->max_calls = PQntuples(res);
602 /* got results, keep track of them */
603 funcctx->user_fctx = res;
605 /* get a tuple descriptor for our result type */
606 switch (get_call_result_type(fcinfo, NULL, &tupdesc))
608 case TYPEFUNC_COMPOSITE:
609 /* success */
610 break;
611 case TYPEFUNC_RECORD:
612 /* failed to determine actual type of RECORD */
613 ereport(ERROR,
614 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
615 errmsg("function returning record called in context "
616 "that cannot accept type record")));
617 break;
618 default:
619 /* result type isn't composite */
620 elog(ERROR, "return type must be a row type");
621 break;
624 /* check result and tuple descriptor have the same number of columns */
625 if (PQnfields(res) != tupdesc->natts)
626 ereport(ERROR,
627 (errcode(ERRCODE_DATATYPE_MISMATCH),
628 errmsg("remote query result rowtype does not match "
629 "the specified FROM clause rowtype")));
632 * fast track when no results. We could exit earlier, but then we'd
633 * not report error if the result tuple type is wrong.
635 if (funcctx->max_calls < 1)
637 PQclear(res);
638 SRF_RETURN_DONE(funcctx);
642 * switch to memory context appropriate for multiple function calls,
643 * so we can make long-lived copy of tupdesc etc
645 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
647 /* make sure we have a persistent copy of the tupdesc */
648 tupdesc = CreateTupleDescCopy(tupdesc);
650 /* store needed metadata for subsequent calls */
651 attinmeta = TupleDescGetAttInMetadata(tupdesc);
652 funcctx->attinmeta = attinmeta;
654 MemoryContextSwitchTo(oldcontext);
657 /* stuff done on every call of the function */
658 funcctx = SRF_PERCALL_SETUP();
661 * initialize per-call variables
663 call_cntr = funcctx->call_cntr;
664 max_calls = funcctx->max_calls;
666 res = (PGresult *) funcctx->user_fctx;
667 attinmeta = funcctx->attinmeta;
668 tupdesc = attinmeta->tupdesc;
670 if (call_cntr < max_calls) /* do when there is more left to send */
672 char **values;
673 HeapTuple tuple;
674 Datum result;
675 int i;
676 int nfields = PQnfields(res);
678 values = (char **) palloc(nfields * sizeof(char *));
679 for (i = 0; i < nfields; i++)
681 if (PQgetisnull(res, call_cntr, i) == 0)
682 values[i] = PQgetvalue(res, call_cntr, i);
683 else
684 values[i] = NULL;
687 /* build the tuple */
688 tuple = BuildTupleFromCStrings(attinmeta, values);
690 /* make the tuple into a datum */
691 result = HeapTupleGetDatum(tuple);
693 SRF_RETURN_NEXT(funcctx, result);
695 else
697 /* do when there is no more left */
698 PQclear(res);
699 SRF_RETURN_DONE(funcctx);
704 * Note: this is the new preferred version of dblink
706 PG_FUNCTION_INFO_V1(dblink_record);
707 Datum
708 dblink_record(PG_FUNCTION_ARGS)
710 return dblink_record_internal(fcinfo, false);
713 PG_FUNCTION_INFO_V1(dblink_send_query);
714 Datum
715 dblink_send_query(PG_FUNCTION_ARGS)
717 PGconn *conn = NULL;
718 char *connstr = NULL;
719 char *sql = NULL;
720 remoteConn *rconn = NULL;
721 char *msg;
722 bool freeconn = false;
723 int retval;
725 if (PG_NARGS() == 2)
727 DBLINK_GET_CONN;
728 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
730 else
731 /* shouldn't happen */
732 elog(ERROR, "wrong number of arguments");
734 /* async query send */
735 retval = PQsendQuery(conn, sql);
736 if (retval != 1)
737 elog(NOTICE, "%s", PQerrorMessage(conn));
739 PG_RETURN_INT32(retval);
742 PG_FUNCTION_INFO_V1(dblink_get_result);
743 Datum
744 dblink_get_result(PG_FUNCTION_ARGS)
746 return dblink_record_internal(fcinfo, true);
749 static Datum
750 dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
752 FuncCallContext *funcctx;
753 TupleDesc tupdesc = NULL;
754 int call_cntr;
755 int max_calls;
756 AttInMetadata *attinmeta;
757 char *msg;
758 PGresult *res = NULL;
759 bool is_sql_cmd = false;
760 char *sql_cmd_status = NULL;
761 MemoryContext oldcontext;
762 bool freeconn = false;
764 DBLINK_INIT;
766 /* stuff done only on the first call of the function */
767 if (SRF_IS_FIRSTCALL())
769 PGconn *conn = NULL;
770 char *connstr = NULL;
771 char *sql = NULL;
772 char *conname = NULL;
773 remoteConn *rconn = NULL;
774 bool fail = true; /* default to backward compatible */
776 /* create a function context for cross-call persistence */
777 funcctx = SRF_FIRSTCALL_INIT();
780 * switch to memory context appropriate for multiple function calls
782 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
784 if (!is_async)
786 if (PG_NARGS() == 3)
788 /* text,text,bool */
789 DBLINK_GET_CONN;
790 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
791 fail = PG_GETARG_BOOL(2);
793 else if (PG_NARGS() == 2)
795 /* text,text or text,bool */
796 if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
798 conn = pconn->conn;
799 sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
800 fail = PG_GETARG_BOOL(1);
802 else
804 DBLINK_GET_CONN;
805 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
808 else if (PG_NARGS() == 1)
810 /* text */
811 conn = pconn->conn;
812 sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
814 else
815 /* shouldn't happen */
816 elog(ERROR, "wrong number of arguments");
818 else /* is_async */
820 /* get async result */
821 if (PG_NARGS() == 2)
823 /* text,bool */
824 DBLINK_GET_CONN;
825 fail = PG_GETARG_BOOL(1);
827 else if (PG_NARGS() == 1)
829 /* text */
830 DBLINK_GET_CONN;
832 else
833 /* shouldn't happen */
834 elog(ERROR, "wrong number of arguments");
837 if (!conn)
838 DBLINK_CONN_NOT_AVAIL;
840 /* synchronous query, or async result retrieval */
841 if (!is_async)
842 res = PQexec(conn, sql);
843 else
845 res = PQgetResult(conn);
846 /* NULL means we're all done with the async results */
847 if (!res)
849 MemoryContextSwitchTo(oldcontext);
850 SRF_RETURN_DONE(funcctx);
854 if (!res ||
855 (PQresultStatus(res) != PGRES_COMMAND_OK &&
856 PQresultStatus(res) != PGRES_TUPLES_OK))
858 dblink_res_error(conname, res, "could not execute query", fail);
859 if (freeconn)
860 PQfinish(conn);
861 MemoryContextSwitchTo(oldcontext);
862 SRF_RETURN_DONE(funcctx);
865 if (PQresultStatus(res) == PGRES_COMMAND_OK)
867 is_sql_cmd = true;
869 /* need a tuple descriptor representing one TEXT column */
870 tupdesc = CreateTemplateTupleDesc(1, false);
871 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
872 TEXTOID, -1, 0);
875 * and save a copy of the command status string to return as our
876 * result tuple
878 sql_cmd_status = PQcmdStatus(res);
879 funcctx->max_calls = 1;
881 else
882 funcctx->max_calls = PQntuples(res);
884 /* got results, keep track of them */
885 funcctx->user_fctx = res;
887 /* if needed, close the connection to the database and cleanup */
888 if (freeconn)
889 PQfinish(conn);
891 if (!is_sql_cmd)
893 /* get a tuple descriptor for our result type */
894 switch (get_call_result_type(fcinfo, NULL, &tupdesc))
896 case TYPEFUNC_COMPOSITE:
897 /* success */
898 break;
899 case TYPEFUNC_RECORD:
900 /* failed to determine actual type of RECORD */
901 ereport(ERROR,
902 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
903 errmsg("function returning record called in context "
904 "that cannot accept type record")));
905 break;
906 default:
907 /* result type isn't composite */
908 elog(ERROR, "return type must be a row type");
909 break;
912 /* make sure we have a persistent copy of the tupdesc */
913 tupdesc = CreateTupleDescCopy(tupdesc);
917 * check result and tuple descriptor have the same number of columns
919 if (PQnfields(res) != tupdesc->natts)
920 ereport(ERROR,
921 (errcode(ERRCODE_DATATYPE_MISMATCH),
922 errmsg("remote query result rowtype does not match "
923 "the specified FROM clause rowtype")));
925 /* fast track when no results */
926 if (funcctx->max_calls < 1)
928 if (res)
929 PQclear(res);
930 MemoryContextSwitchTo(oldcontext);
931 SRF_RETURN_DONE(funcctx);
934 /* store needed metadata for subsequent calls */
935 attinmeta = TupleDescGetAttInMetadata(tupdesc);
936 funcctx->attinmeta = attinmeta;
938 MemoryContextSwitchTo(oldcontext);
942 /* stuff done on every call of the function */
943 funcctx = SRF_PERCALL_SETUP();
946 * initialize per-call variables
948 call_cntr = funcctx->call_cntr;
949 max_calls = funcctx->max_calls;
951 res = (PGresult *) funcctx->user_fctx;
952 attinmeta = funcctx->attinmeta;
953 tupdesc = attinmeta->tupdesc;
955 if (call_cntr < max_calls) /* do when there is more left to send */
957 char **values;
958 HeapTuple tuple;
959 Datum result;
961 if (!is_sql_cmd)
963 int i;
964 int nfields = PQnfields(res);
966 values = (char **) palloc(nfields * sizeof(char *));
967 for (i = 0; i < nfields; i++)
969 if (PQgetisnull(res, call_cntr, i) == 0)
970 values[i] = PQgetvalue(res, call_cntr, i);
971 else
972 values[i] = NULL;
975 else
977 values = (char **) palloc(1 * sizeof(char *));
978 values[0] = sql_cmd_status;
981 /* build the tuple */
982 tuple = BuildTupleFromCStrings(attinmeta, values);
984 /* make the tuple into a datum */
985 result = HeapTupleGetDatum(tuple);
987 SRF_RETURN_NEXT(funcctx, result);
989 else
991 /* do when there is no more left */
992 PQclear(res);
993 SRF_RETURN_DONE(funcctx);
998 * List all open dblink connections by name.
999 * Returns an array of all connection names.
1000 * Takes no params
1002 PG_FUNCTION_INFO_V1(dblink_get_connections);
1003 Datum
1004 dblink_get_connections(PG_FUNCTION_ARGS)
1006 HASH_SEQ_STATUS status;
1007 remoteConnHashEnt *hentry;
1008 ArrayBuildState *astate = NULL;
1010 if (remoteConnHash)
1012 hash_seq_init(&status, remoteConnHash);
1013 while ((hentry = (remoteConnHashEnt *) hash_seq_search(&status)) != NULL)
1015 /* stash away current value */
1016 astate = accumArrayResult(astate,
1017 CStringGetTextDatum(hentry->name),
1018 false, TEXTOID, CurrentMemoryContext);
1022 if (astate)
1023 PG_RETURN_ARRAYTYPE_P(makeArrayResult(astate,
1024 CurrentMemoryContext));
1025 else
1026 PG_RETURN_NULL();
1030 * Checks if a given remote connection is busy
1032 * Returns 1 if the connection is busy, 0 otherwise
1033 * Params:
1034 * text connection_name - name of the connection to check
1037 PG_FUNCTION_INFO_V1(dblink_is_busy);
1038 Datum
1039 dblink_is_busy(PG_FUNCTION_ARGS)
1041 PGconn *conn = NULL;
1042 remoteConn *rconn = NULL;
1044 DBLINK_INIT;
1045 DBLINK_GET_NAMED_CONN;
1047 PQconsumeInput(conn);
1048 PG_RETURN_INT32(PQisBusy(conn));
1052 * Cancels a running request on a connection
1054 * Returns text:
1055 * "OK" if the cancel request has been sent correctly,
1056 * an error message otherwise
1058 * Params:
1059 * text connection_name - name of the connection to check
1062 PG_FUNCTION_INFO_V1(dblink_cancel_query);
1063 Datum
1064 dblink_cancel_query(PG_FUNCTION_ARGS)
1066 int res = 0;
1067 PGconn *conn = NULL;
1068 remoteConn *rconn = NULL;
1069 PGcancel *cancel;
1070 char errbuf[256];
1072 DBLINK_INIT;
1073 DBLINK_GET_NAMED_CONN;
1074 cancel = PQgetCancel(conn);
1076 res = PQcancel(cancel, errbuf, 256);
1077 PQfreeCancel(cancel);
1079 if (res == 1)
1080 PG_RETURN_TEXT_P(cstring_to_text("OK"));
1081 else
1082 PG_RETURN_TEXT_P(cstring_to_text(errbuf));
1087 * Get error message from a connection
1089 * Returns text:
1090 * "OK" if no error, an error message otherwise
1092 * Params:
1093 * text connection_name - name of the connection to check
1096 PG_FUNCTION_INFO_V1(dblink_error_message);
1097 Datum
1098 dblink_error_message(PG_FUNCTION_ARGS)
1100 char *msg;
1101 PGconn *conn = NULL;
1102 remoteConn *rconn = NULL;
1104 DBLINK_INIT;
1105 DBLINK_GET_NAMED_CONN;
1107 msg = PQerrorMessage(conn);
1108 if (msg == NULL || msg[0] == '\0')
1109 PG_RETURN_TEXT_P(cstring_to_text("OK"));
1110 else
1111 PG_RETURN_TEXT_P(cstring_to_text(msg));
1115 * Execute an SQL non-SELECT command
1117 PG_FUNCTION_INFO_V1(dblink_exec);
1118 Datum
1119 dblink_exec(PG_FUNCTION_ARGS)
1121 char *msg;
1122 PGresult *res = NULL;
1123 text *sql_cmd_status = NULL;
1124 TupleDesc tupdesc = NULL;
1125 PGconn *conn = NULL;
1126 char *connstr = NULL;
1127 char *sql = NULL;
1128 char *conname = NULL;
1129 remoteConn *rconn = NULL;
1130 bool freeconn = false;
1131 bool fail = true; /* default to backward compatible behavior */
1133 DBLINK_INIT;
1135 if (PG_NARGS() == 3)
1137 /* must be text,text,bool */
1138 DBLINK_GET_CONN;
1139 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
1140 fail = PG_GETARG_BOOL(2);
1142 else if (PG_NARGS() == 2)
1144 /* might be text,text or text,bool */
1145 if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
1147 conn = pconn->conn;
1148 sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
1149 fail = PG_GETARG_BOOL(1);
1151 else
1153 DBLINK_GET_CONN;
1154 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
1157 else if (PG_NARGS() == 1)
1159 /* must be single text argument */
1160 conn = pconn->conn;
1161 sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
1163 else
1164 /* shouldn't happen */
1165 elog(ERROR, "wrong number of arguments");
1167 if (!conn)
1168 DBLINK_CONN_NOT_AVAIL;
1170 res = PQexec(conn, sql);
1171 if (!res ||
1172 (PQresultStatus(res) != PGRES_COMMAND_OK &&
1173 PQresultStatus(res) != PGRES_TUPLES_OK))
1175 dblink_res_error(conname, res, "could not execute command", fail);
1177 /* need a tuple descriptor representing one TEXT column */
1178 tupdesc = CreateTemplateTupleDesc(1, false);
1179 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
1180 TEXTOID, -1, 0);
1183 * and save a copy of the command status string to return as our
1184 * result tuple
1186 sql_cmd_status = cstring_to_text("ERROR");
1188 else if (PQresultStatus(res) == PGRES_COMMAND_OK)
1190 /* need a tuple descriptor representing one TEXT column */
1191 tupdesc = CreateTemplateTupleDesc(1, false);
1192 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
1193 TEXTOID, -1, 0);
1196 * and save a copy of the command status string to return as our
1197 * result tuple
1199 sql_cmd_status = cstring_to_text(PQcmdStatus(res));
1200 PQclear(res);
1202 else
1204 PQclear(res);
1205 ereport(ERROR,
1206 (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
1207 errmsg("statement returning results not allowed")));
1210 /* if needed, close the connection to the database and cleanup */
1211 if (freeconn)
1212 PQfinish(conn);
1214 PG_RETURN_TEXT_P(sql_cmd_status);
1219 * dblink_get_pkey
1221 * Return list of primary key fields for the supplied relation,
1222 * or NULL if none exists.
1224 PG_FUNCTION_INFO_V1(dblink_get_pkey);
1225 Datum
1226 dblink_get_pkey(PG_FUNCTION_ARGS)
1228 int16 numatts;
1229 Oid relid;
1230 char **results;
1231 FuncCallContext *funcctx;
1232 int32 call_cntr;
1233 int32 max_calls;
1234 AttInMetadata *attinmeta;
1235 MemoryContext oldcontext;
1237 /* stuff done only on the first call of the function */
1238 if (SRF_IS_FIRSTCALL())
1240 TupleDesc tupdesc = NULL;
1242 /* create a function context for cross-call persistence */
1243 funcctx = SRF_FIRSTCALL_INIT();
1246 * switch to memory context appropriate for multiple function calls
1248 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
1250 /* convert relname to rel Oid */
1251 relid = get_relid_from_relname(PG_GETARG_TEXT_P(0));
1252 if (!OidIsValid(relid))
1253 ereport(ERROR,
1254 (errcode(ERRCODE_UNDEFINED_TABLE),
1255 errmsg("relation \"%s\" does not exist",
1256 text_to_cstring(PG_GETARG_TEXT_PP(0)))));
1259 * need a tuple descriptor representing one INT and one TEXT column
1261 tupdesc = CreateTemplateTupleDesc(2, false);
1262 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "position",
1263 INT4OID, -1, 0);
1264 TupleDescInitEntry(tupdesc, (AttrNumber) 2, "colname",
1265 TEXTOID, -1, 0);
1268 * Generate attribute metadata needed later to produce tuples from raw
1269 * C strings
1271 attinmeta = TupleDescGetAttInMetadata(tupdesc);
1272 funcctx->attinmeta = attinmeta;
1274 /* get an array of attnums */
1275 results = get_pkey_attnames(relid, &numatts);
1277 if ((results != NULL) && (numatts > 0))
1279 funcctx->max_calls = numatts;
1281 /* got results, keep track of them */
1282 funcctx->user_fctx = results;
1284 else
1286 /* fast track when no results */
1287 MemoryContextSwitchTo(oldcontext);
1288 SRF_RETURN_DONE(funcctx);
1291 MemoryContextSwitchTo(oldcontext);
1294 /* stuff done on every call of the function */
1295 funcctx = SRF_PERCALL_SETUP();
1298 * initialize per-call variables
1300 call_cntr = funcctx->call_cntr;
1301 max_calls = funcctx->max_calls;
1303 results = (char **) funcctx->user_fctx;
1304 attinmeta = funcctx->attinmeta;
1306 if (call_cntr < max_calls) /* do when there is more left to send */
1308 char **values;
1309 HeapTuple tuple;
1310 Datum result;
1312 values = (char **) palloc(2 * sizeof(char *));
1313 values[0] = (char *) palloc(12); /* sign, 10 digits, '\0' */
1315 sprintf(values[0], "%d", call_cntr + 1);
1317 values[1] = results[call_cntr];
1319 /* build the tuple */
1320 tuple = BuildTupleFromCStrings(attinmeta, values);
1322 /* make the tuple into a datum */
1323 result = HeapTupleGetDatum(tuple);
1325 SRF_RETURN_NEXT(funcctx, result);
1327 else
1329 /* do when there is no more left */
1330 SRF_RETURN_DONE(funcctx);
1336 * dblink_build_sql_insert
1338 * Used to generate an SQL insert statement
1339 * based on an existing tuple in a local relation.
1340 * This is useful for selectively replicating data
1341 * to another server via dblink.
1343 * API:
1344 * <relname> - name of local table of interest
1345 * <pkattnums> - an int2vector of attnums which will be used
1346 * to identify the local tuple of interest
1347 * <pknumatts> - number of attnums in pkattnums
1348 * <src_pkattvals_arry> - text array of key values which will be used
1349 * to identify the local tuple of interest
1350 * <tgt_pkattvals_arry> - text array of key values which will be used
1351 * to build the string for execution remotely. These are substituted
1352 * for their counterparts in src_pkattvals_arry
1354 PG_FUNCTION_INFO_V1(dblink_build_sql_insert);
1355 Datum
1356 dblink_build_sql_insert(PG_FUNCTION_ARGS)
1358 text *relname_text = PG_GETARG_TEXT_P(0);
1359 int2vector *pkattnums = (int2vector *) PG_GETARG_POINTER(1);
1360 int32 pknumatts_tmp = PG_GETARG_INT32(2);
1361 ArrayType *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
1362 ArrayType *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
1363 Oid relid;
1364 int16 pknumatts = 0;
1365 char **src_pkattvals;
1366 char **tgt_pkattvals;
1367 int src_nitems;
1368 int tgt_nitems;
1369 char *sql;
1372 * Convert relname to rel OID.
1374 relid = get_relid_from_relname(relname_text);
1375 if (!OidIsValid(relid))
1376 ereport(ERROR,
1377 (errcode(ERRCODE_UNDEFINED_TABLE),
1378 errmsg("relation \"%s\" does not exist",
1379 text_to_cstring(relname_text))));
1382 * There should be at least one key attribute
1384 if (pknumatts_tmp <= 0)
1385 ereport(ERROR,
1386 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1387 errmsg("number of key attributes must be > 0")));
1389 if (pknumatts_tmp <= SHRT_MAX)
1390 pknumatts = pknumatts_tmp;
1391 else
1392 ereport(ERROR,
1393 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1394 errmsg("input for number of primary key " \
1395 "attributes too large")));
1398 * Source array is made up of key values that will be used to locate the
1399 * tuple of interest from the local system.
1401 src_pkattvals = get_text_array_contents(src_pkattvals_arry, &src_nitems);
1404 * There should be one source array key value for each key attnum
1406 if (src_nitems != pknumatts)
1407 ereport(ERROR,
1408 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1409 errmsg("source key array length must match number of key " \
1410 "attributes")));
1413 * Target array is made up of key values that will be used to build the
1414 * SQL string for use on the remote system.
1416 tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
1419 * There should be one target array key value for each key attnum
1421 if (tgt_nitems != pknumatts)
1422 ereport(ERROR,
1423 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1424 errmsg("target key array length must match number of key " \
1425 "attributes")));
1428 * Prep work is finally done. Go get the SQL string.
1430 sql = get_sql_insert(relid, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
1433 * And send it
1435 PG_RETURN_TEXT_P(cstring_to_text(sql));
1440 * dblink_build_sql_delete
1442 * Used to generate an SQL delete statement.
1443 * This is useful for selectively replicating a
1444 * delete to another server via dblink.
1446 * API:
1447 * <relname> - name of remote table of interest
1448 * <pkattnums> - an int2vector of attnums which will be used
1449 * to identify the remote tuple of interest
1450 * <pknumatts> - number of attnums in pkattnums
1451 * <tgt_pkattvals_arry> - text array of key values which will be used
1452 * to build the string for execution remotely.
1454 PG_FUNCTION_INFO_V1(dblink_build_sql_delete);
1455 Datum
1456 dblink_build_sql_delete(PG_FUNCTION_ARGS)
1458 text *relname_text = PG_GETARG_TEXT_P(0);
1459 int2vector *pkattnums = (int2vector *) PG_GETARG_POINTER(1);
1460 int32 pknumatts_tmp = PG_GETARG_INT32(2);
1461 ArrayType *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
1462 Oid relid;
1463 int16 pknumatts = 0;
1464 char **tgt_pkattvals;
1465 int tgt_nitems;
1466 char *sql;
1469 * Convert relname to rel OID.
1471 relid = get_relid_from_relname(relname_text);
1472 if (!OidIsValid(relid))
1473 ereport(ERROR,
1474 (errcode(ERRCODE_UNDEFINED_TABLE),
1475 errmsg("relation \"%s\" does not exist",
1476 text_to_cstring(relname_text))));
1479 * There should be at least one key attribute
1481 if (pknumatts_tmp <= 0)
1482 ereport(ERROR,
1483 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1484 errmsg("number of key attributes must be > 0")));
1486 if (pknumatts_tmp <= SHRT_MAX)
1487 pknumatts = pknumatts_tmp;
1488 else
1489 ereport(ERROR,
1490 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1491 errmsg("input for number of primary key " \
1492 "attributes too large")));
1495 * Target array is made up of key values that will be used to build the
1496 * SQL string for use on the remote system.
1498 tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
1501 * There should be one target array key value for each key attnum
1503 if (tgt_nitems != pknumatts)
1504 ereport(ERROR,
1505 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1506 errmsg("target key array length must match number of key " \
1507 "attributes")));
1510 * Prep work is finally done. Go get the SQL string.
1512 sql = get_sql_delete(relid, pkattnums, pknumatts, tgt_pkattvals);
1515 * And send it
1517 PG_RETURN_TEXT_P(cstring_to_text(sql));
1522 * dblink_build_sql_update
1524 * Used to generate an SQL update statement
1525 * based on an existing tuple in a local relation.
1526 * This is useful for selectively replicating data
1527 * to another server via dblink.
1529 * API:
1530 * <relname> - name of local table of interest
1531 * <pkattnums> - an int2vector of attnums which will be used
1532 * to identify the local tuple of interest
1533 * <pknumatts> - number of attnums in pkattnums
1534 * <src_pkattvals_arry> - text array of key values which will be used
1535 * to identify the local tuple of interest
1536 * <tgt_pkattvals_arry> - text array of key values which will be used
1537 * to build the string for execution remotely. These are substituted
1538 * for their counterparts in src_pkattvals_arry
1540 PG_FUNCTION_INFO_V1(dblink_build_sql_update);
1541 Datum
1542 dblink_build_sql_update(PG_FUNCTION_ARGS)
1544 text *relname_text = PG_GETARG_TEXT_P(0);
1545 int2vector *pkattnums = (int2vector *) PG_GETARG_POINTER(1);
1546 int32 pknumatts_tmp = PG_GETARG_INT32(2);
1547 ArrayType *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
1548 ArrayType *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
1549 Oid relid;
1550 int16 pknumatts = 0;
1551 char **src_pkattvals;
1552 char **tgt_pkattvals;
1553 int src_nitems;
1554 int tgt_nitems;
1555 char *sql;
1558 * Convert relname to rel OID.
1560 relid = get_relid_from_relname(relname_text);
1561 if (!OidIsValid(relid))
1562 ereport(ERROR,
1563 (errcode(ERRCODE_UNDEFINED_TABLE),
1564 errmsg("relation \"%s\" does not exist",
1565 text_to_cstring(relname_text))));
1568 * There should be one source array key values for each key attnum
1570 if (pknumatts_tmp <= 0)
1571 ereport(ERROR,
1572 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1573 errmsg("number of key attributes must be > 0")));
1575 if (pknumatts_tmp <= SHRT_MAX)
1576 pknumatts = pknumatts_tmp;
1577 else
1578 ereport(ERROR,
1579 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1580 errmsg("input for number of primary key " \
1581 "attributes too large")));
1584 * Source array is made up of key values that will be used to locate the
1585 * tuple of interest from the local system.
1587 src_pkattvals = get_text_array_contents(src_pkattvals_arry, &src_nitems);
1590 * There should be one source array key value for each key attnum
1592 if (src_nitems != pknumatts)
1593 ereport(ERROR,
1594 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1595 errmsg("source key array length must match number of key " \
1596 "attributes")));
1599 * Target array is made up of key values that will be used to build the
1600 * SQL string for use on the remote system.
1602 tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
1605 * There should be one target array key value for each key attnum
1607 if (tgt_nitems != pknumatts)
1608 ereport(ERROR,
1609 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1610 errmsg("target key array length must match number of key " \
1611 "attributes")));
1614 * Prep work is finally done. Go get the SQL string.
1616 sql = get_sql_update(relid, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
1619 * And send it
1621 PG_RETURN_TEXT_P(cstring_to_text(sql));
1625 * dblink_current_query
1626 * return the current query string
1627 * to allow its use in (among other things)
1628 * rewrite rules
1630 PG_FUNCTION_INFO_V1(dblink_current_query);
1631 Datum
1632 dblink_current_query(PG_FUNCTION_ARGS)
1634 /* This is now just an alias for the built-in function current_query() */
1635 PG_RETURN_DATUM(current_query(fcinfo));
1638 /*************************************************************
1639 * internal functions
1644 * get_pkey_attnames
1646 * Get the primary key attnames for the given relation.
1647 * Return NULL, and set numatts = 0, if no primary key exists.
1649 static char **
1650 get_pkey_attnames(Oid relid, int16 *numatts)
1652 Relation indexRelation;
1653 ScanKeyData skey;
1654 SysScanDesc scan;
1655 HeapTuple indexTuple;
1656 int i;
1657 char **result = NULL;
1658 Relation rel;
1659 TupleDesc tupdesc;
1660 AclResult aclresult;
1662 /* initialize numatts to 0 in case no primary key exists */
1663 *numatts = 0;
1665 /* open relation using relid, check permissions, get tupdesc */
1666 rel = relation_open(relid, AccessShareLock);
1668 aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
1669 ACL_SELECT);
1670 if (aclresult != ACLCHECK_OK)
1671 aclcheck_error(aclresult, ACL_KIND_CLASS,
1672 RelationGetRelationName(rel));
1674 tupdesc = rel->rd_att;
1676 /* Prepare to scan pg_index for entries having indrelid = this rel. */
1677 indexRelation = heap_open(IndexRelationId, AccessShareLock);
1678 ScanKeyInit(&skey,
1679 Anum_pg_index_indrelid,
1680 BTEqualStrategyNumber, F_OIDEQ,
1681 ObjectIdGetDatum(relid));
1683 scan = systable_beginscan(indexRelation, IndexIndrelidIndexId, true,
1684 SnapshotNow, 1, &skey);
1686 while (HeapTupleIsValid(indexTuple = systable_getnext(scan)))
1688 Form_pg_index index = (Form_pg_index) GETSTRUCT(indexTuple);
1690 /* we're only interested if it is the primary key */
1691 if (index->indisprimary)
1693 *numatts = index->indnatts;
1694 if (*numatts > 0)
1696 result = (char **) palloc(*numatts * sizeof(char *));
1698 for (i = 0; i < *numatts; i++)
1699 result[i] = SPI_fname(tupdesc, index->indkey.values[i]);
1701 break;
1705 systable_endscan(scan);
1706 heap_close(indexRelation, AccessShareLock);
1707 relation_close(rel, AccessShareLock);
1709 return result;
1713 * Deconstruct a text[] into C-strings (note any NULL elements will be
1714 * returned as NULL pointers)
1716 static char **
1717 get_text_array_contents(ArrayType *array, int *numitems)
1719 int ndim = ARR_NDIM(array);
1720 int *dims = ARR_DIMS(array);
1721 int nitems;
1722 int16 typlen;
1723 bool typbyval;
1724 char typalign;
1725 char **values;
1726 char *ptr;
1727 bits8 *bitmap;
1728 int bitmask;
1729 int i;
1731 Assert(ARR_ELEMTYPE(array) == TEXTOID);
1733 *numitems = nitems = ArrayGetNItems(ndim, dims);
1735 get_typlenbyvalalign(ARR_ELEMTYPE(array),
1736 &typlen, &typbyval, &typalign);
1738 values = (char **) palloc(nitems * sizeof(char *));
1740 ptr = ARR_DATA_PTR(array);
1741 bitmap = ARR_NULLBITMAP(array);
1742 bitmask = 1;
1744 for (i = 0; i < nitems; i++)
1746 if (bitmap && (*bitmap & bitmask) == 0)
1748 values[i] = NULL;
1750 else
1752 values[i] = TextDatumGetCString(PointerGetDatum(ptr));
1753 ptr = att_addlength_pointer(ptr, typlen, ptr);
1754 ptr = (char *) att_align_nominal(ptr, typalign);
1757 /* advance bitmap pointer if any */
1758 if (bitmap)
1760 bitmask <<= 1;
1761 if (bitmask == 0x100)
1763 bitmap++;
1764 bitmask = 1;
1769 return values;
1772 static char *
1773 get_sql_insert(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals)
1775 Relation rel;
1776 char *relname;
1777 HeapTuple tuple;
1778 TupleDesc tupdesc;
1779 int natts;
1780 StringInfoData buf;
1781 char *val;
1782 int16 key;
1783 int i;
1784 bool needComma;
1786 initStringInfo(&buf);
1788 /* get relation name including any needed schema prefix and quoting */
1789 relname = generate_relation_name(relid);
1792 * Open relation using relid
1794 rel = relation_open(relid, AccessShareLock);
1795 tupdesc = rel->rd_att;
1796 natts = tupdesc->natts;
1798 tuple = get_tuple_of_interest(relid, pkattnums, pknumatts, src_pkattvals);
1799 if (!tuple)
1800 ereport(ERROR,
1801 (errcode(ERRCODE_CARDINALITY_VIOLATION),
1802 errmsg("source row not found")));
1804 appendStringInfo(&buf, "INSERT INTO %s(", relname);
1806 needComma = false;
1807 for (i = 0; i < natts; i++)
1809 if (tupdesc->attrs[i]->attisdropped)
1810 continue;
1812 if (needComma)
1813 appendStringInfo(&buf, ",");
1815 appendStringInfoString(&buf,
1816 quote_ident_cstr(NameStr(tupdesc->attrs[i]->attname)));
1817 needComma = true;
1820 appendStringInfo(&buf, ") VALUES(");
1823 * remember attvals are 1 based
1825 needComma = false;
1826 for (i = 0; i < natts; i++)
1828 if (tupdesc->attrs[i]->attisdropped)
1829 continue;
1831 if (needComma)
1832 appendStringInfo(&buf, ",");
1834 if (tgt_pkattvals != NULL)
1835 key = get_attnum_pk_pos(pkattnums, pknumatts, i + 1);
1836 else
1837 key = -1;
1839 if (key > -1)
1840 val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;
1841 else
1842 val = SPI_getvalue(tuple, tupdesc, i + 1);
1844 if (val != NULL)
1846 appendStringInfoString(&buf, quote_literal_cstr(val));
1847 pfree(val);
1849 else
1850 appendStringInfo(&buf, "NULL");
1851 needComma = true;
1853 appendStringInfo(&buf, ")");
1855 relation_close(rel, AccessShareLock);
1856 return (buf.data);
1859 static char *
1860 get_sql_delete(Oid relid, int2vector *pkattnums, int16 pknumatts, char **tgt_pkattvals)
1862 Relation rel;
1863 char *relname;
1864 TupleDesc tupdesc;
1865 int natts;
1866 StringInfoData buf;
1867 int i;
1869 initStringInfo(&buf);
1871 /* get relation name including any needed schema prefix and quoting */
1872 relname = generate_relation_name(relid);
1875 * Open relation using relid
1877 rel = relation_open(relid, AccessShareLock);
1878 tupdesc = rel->rd_att;
1879 natts = tupdesc->natts;
1881 appendStringInfo(&buf, "DELETE FROM %s WHERE ", relname);
1882 for (i = 0; i < pknumatts; i++)
1884 int16 pkattnum = pkattnums->values[i];
1886 if (i > 0)
1887 appendStringInfo(&buf, " AND ");
1889 appendStringInfoString(&buf,
1890 quote_ident_cstr(NameStr(tupdesc->attrs[pkattnum - 1]->attname)));
1892 if (tgt_pkattvals == NULL)
1893 /* internal error */
1894 elog(ERROR, "target key array must not be NULL");
1896 if (tgt_pkattvals[i] != NULL)
1897 appendStringInfo(&buf, " = %s",
1898 quote_literal_cstr(tgt_pkattvals[i]));
1899 else
1900 appendStringInfo(&buf, " IS NULL");
1903 relation_close(rel, AccessShareLock);
1904 return (buf.data);
1907 static char *
1908 get_sql_update(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals)
1910 Relation rel;
1911 char *relname;
1912 HeapTuple tuple;
1913 TupleDesc tupdesc;
1914 int natts;
1915 StringInfoData buf;
1916 char *val;
1917 int16 key;
1918 int i;
1919 bool needComma;
1921 initStringInfo(&buf);
1923 /* get relation name including any needed schema prefix and quoting */
1924 relname = generate_relation_name(relid);
1927 * Open relation using relid
1929 rel = relation_open(relid, AccessShareLock);
1930 tupdesc = rel->rd_att;
1931 natts = tupdesc->natts;
1933 tuple = get_tuple_of_interest(relid, pkattnums, pknumatts, src_pkattvals);
1934 if (!tuple)
1935 ereport(ERROR,
1936 (errcode(ERRCODE_CARDINALITY_VIOLATION),
1937 errmsg("source row not found")));
1939 appendStringInfo(&buf, "UPDATE %s SET ", relname);
1941 needComma = false;
1942 for (i = 0; i < natts; i++)
1944 if (tupdesc->attrs[i]->attisdropped)
1945 continue;
1947 if (needComma)
1948 appendStringInfo(&buf, ", ");
1950 appendStringInfo(&buf, "%s = ",
1951 quote_ident_cstr(NameStr(tupdesc->attrs[i]->attname)));
1953 if (tgt_pkattvals != NULL)
1954 key = get_attnum_pk_pos(pkattnums, pknumatts, i + 1);
1955 else
1956 key = -1;
1958 if (key > -1)
1959 val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;
1960 else
1961 val = SPI_getvalue(tuple, tupdesc, i + 1);
1963 if (val != NULL)
1965 appendStringInfoString(&buf, quote_literal_cstr(val));
1966 pfree(val);
1968 else
1969 appendStringInfoString(&buf, "NULL");
1970 needComma = true;
1973 appendStringInfo(&buf, " WHERE ");
1975 for (i = 0; i < pknumatts; i++)
1977 int16 pkattnum = pkattnums->values[i];
1979 if (i > 0)
1980 appendStringInfo(&buf, " AND ");
1982 appendStringInfo(&buf, "%s",
1983 quote_ident_cstr(NameStr(tupdesc->attrs[pkattnum - 1]->attname)));
1985 if (tgt_pkattvals != NULL)
1986 val = tgt_pkattvals[i] ? pstrdup(tgt_pkattvals[i]) : NULL;
1987 else
1988 val = SPI_getvalue(tuple, tupdesc, pkattnum);
1990 if (val != NULL)
1992 appendStringInfo(&buf, " = %s", quote_literal_cstr(val));
1993 pfree(val);
1995 else
1996 appendStringInfo(&buf, " IS NULL");
1999 relation_close(rel, AccessShareLock);
2000 return (buf.data);
2004 * Return a properly quoted literal value.
2005 * Uses quote_literal in quote.c
2007 static char *
2008 quote_literal_cstr(char *rawstr)
2010 text *rawstr_text;
2011 text *result_text;
2012 char *result;
2014 rawstr_text = cstring_to_text(rawstr);
2015 result_text = DatumGetTextP(DirectFunctionCall1(quote_literal,
2016 PointerGetDatum(rawstr_text)));
2017 result = text_to_cstring(result_text);
2019 return result;
2023 * Return a properly quoted identifier.
2024 * Uses quote_ident in quote.c
2026 static char *
2027 quote_ident_cstr(char *rawstr)
2029 text *rawstr_text;
2030 text *result_text;
2031 char *result;
2033 rawstr_text = cstring_to_text(rawstr);
2034 result_text = DatumGetTextP(DirectFunctionCall1(quote_ident,
2035 PointerGetDatum(rawstr_text)));
2036 result = text_to_cstring(result_text);
2038 return result;
2041 static int16
2042 get_attnum_pk_pos(int2vector *pkattnums, int16 pknumatts, int16 key)
2044 int i;
2047 * Not likely a long list anyway, so just scan for the value
2049 for (i = 0; i < pknumatts; i++)
2050 if (key == pkattnums->values[i])
2051 return i;
2053 return -1;
2056 static HeapTuple
2057 get_tuple_of_interest(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals)
2059 Relation rel;
2060 char *relname;
2061 TupleDesc tupdesc;
2062 StringInfoData buf;
2063 int ret;
2064 HeapTuple tuple;
2065 int i;
2067 initStringInfo(&buf);
2069 /* get relation name including any needed schema prefix and quoting */
2070 relname = generate_relation_name(relid);
2073 * Open relation using relid
2075 rel = relation_open(relid, AccessShareLock);
2076 tupdesc = CreateTupleDescCopy(rel->rd_att);
2077 relation_close(rel, AccessShareLock);
2080 * Connect to SPI manager
2082 if ((ret = SPI_connect()) < 0)
2083 /* internal error */
2084 elog(ERROR, "SPI connect failure - returned %d", ret);
2087 * Build sql statement to look up tuple of interest Use src_pkattvals as
2088 * the criteria.
2090 appendStringInfo(&buf, "SELECT * FROM %s WHERE ", relname);
2092 for (i = 0; i < pknumatts; i++)
2094 int16 pkattnum = pkattnums->values[i];
2096 if (i > 0)
2097 appendStringInfo(&buf, " AND ");
2099 appendStringInfoString(&buf,
2100 quote_ident_cstr(NameStr(tupdesc->attrs[pkattnum - 1]->attname)));
2102 if (src_pkattvals[i] != NULL)
2103 appendStringInfo(&buf, " = %s",
2104 quote_literal_cstr(src_pkattvals[i]));
2105 else
2106 appendStringInfo(&buf, " IS NULL");
2110 * Retrieve the desired tuple
2112 ret = SPI_exec(buf.data, 0);
2113 pfree(buf.data);
2116 * Only allow one qualifying tuple
2118 if ((ret == SPI_OK_SELECT) && (SPI_processed > 1))
2119 ereport(ERROR,
2120 (errcode(ERRCODE_CARDINALITY_VIOLATION),
2121 errmsg("source criteria matched more than one record")));
2123 else if (ret == SPI_OK_SELECT && SPI_processed == 1)
2125 SPITupleTable *tuptable = SPI_tuptable;
2127 tuple = SPI_copytuple(tuptable->vals[0]);
2128 SPI_finish();
2130 return tuple;
2132 else
2135 * no qualifying tuples
2137 SPI_finish();
2139 return NULL;
2143 * never reached, but keep compiler quiet
2145 return NULL;
2148 static Oid
2149 get_relid_from_relname(text *relname_text)
2151 RangeVar *relvar;
2152 Relation rel;
2153 Oid relid;
2155 relvar = makeRangeVarFromNameList(textToQualifiedNameList(relname_text));
2156 rel = heap_openrv(relvar, AccessShareLock);
2157 relid = RelationGetRelid(rel);
2158 relation_close(rel, AccessShareLock);
2160 return relid;
2164 * generate_relation_name - copied from ruleutils.c
2165 * Compute the name to display for a relation specified by OID
2167 * The result includes all necessary quoting and schema-prefixing.
2169 static char *
2170 generate_relation_name(Oid relid)
2172 HeapTuple tp;
2173 Form_pg_class reltup;
2174 char *nspname;
2175 char *result;
2177 tp = SearchSysCache(RELOID,
2178 ObjectIdGetDatum(relid),
2179 0, 0, 0);
2180 if (!HeapTupleIsValid(tp))
2181 elog(ERROR, "cache lookup failed for relation %u", relid);
2183 reltup = (Form_pg_class) GETSTRUCT(tp);
2185 /* Qualify the name if not visible in search path */
2186 if (RelationIsVisible(relid))
2187 nspname = NULL;
2188 else
2189 nspname = get_namespace_name(reltup->relnamespace);
2191 result = quote_qualified_identifier(nspname, NameStr(reltup->relname));
2193 ReleaseSysCache(tp);
2195 return result;
2199 static remoteConn *
2200 getConnectionByName(const char *name)
2202 remoteConnHashEnt *hentry;
2203 char key[NAMEDATALEN];
2205 if (!remoteConnHash)
2206 remoteConnHash = createConnHash();
2208 MemSet(key, 0, NAMEDATALEN);
2209 snprintf(key, NAMEDATALEN - 1, "%s", name);
2210 hentry = (remoteConnHashEnt *) hash_search(remoteConnHash,
2211 key, HASH_FIND, NULL);
2213 if (hentry)
2214 return (hentry->rconn);
2216 return (NULL);
2219 static HTAB *
2220 createConnHash(void)
2222 HASHCTL ctl;
2224 ctl.keysize = NAMEDATALEN;
2225 ctl.entrysize = sizeof(remoteConnHashEnt);
2227 return hash_create("Remote Con hash", NUMCONN, &ctl, HASH_ELEM);
2230 static void
2231 createNewConnection(const char *name, remoteConn *rconn)
2233 remoteConnHashEnt *hentry;
2234 bool found;
2235 char key[NAMEDATALEN];
2237 if (!remoteConnHash)
2238 remoteConnHash = createConnHash();
2240 MemSet(key, 0, NAMEDATALEN);
2241 snprintf(key, NAMEDATALEN - 1, "%s", name);
2242 hentry = (remoteConnHashEnt *) hash_search(remoteConnHash, key,
2243 HASH_ENTER, &found);
2245 if (found)
2246 ereport(ERROR,
2247 (errcode(ERRCODE_DUPLICATE_OBJECT),
2248 errmsg("duplicate connection name")));
2250 hentry->rconn = rconn;
2251 strlcpy(hentry->name, name, sizeof(hentry->name));
2254 static void
2255 deleteConnection(const char *name)
2257 remoteConnHashEnt *hentry;
2258 bool found;
2259 char key[NAMEDATALEN];
2261 if (!remoteConnHash)
2262 remoteConnHash = createConnHash();
2264 MemSet(key, 0, NAMEDATALEN);
2265 snprintf(key, NAMEDATALEN - 1, "%s", name);
2267 hentry = (remoteConnHashEnt *) hash_search(remoteConnHash,
2268 key, HASH_REMOVE, &found);
2270 if (!hentry)
2271 ereport(ERROR,
2272 (errcode(ERRCODE_UNDEFINED_OBJECT),
2273 errmsg("undefined connection name")));
2277 static void
2278 dblink_security_check(PGconn *conn, remoteConn *rconn)
2280 if (!superuser())
2282 if (!PQconnectionUsedPassword(conn))
2284 PQfinish(conn);
2285 if (rconn)
2286 pfree(rconn);
2288 ereport(ERROR,
2289 (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
2290 errmsg("password is required"),
2291 errdetail("Non-superuser cannot connect if the server does not request a password."),
2292 errhint("Target server's authentication method must be changed.")));
2298 * For non-superusers, insist that the connstr specify a password. This
2299 * prevents a password from being picked up from .pgpass, a service file,
2300 * the environment, etc. We don't want the postgres user's passwords
2301 * to be accessible to non-superusers.
2303 static void
2304 dblink_connstr_check(const char *connstr)
2306 if (!superuser())
2308 PQconninfoOption *options;
2309 PQconninfoOption *option;
2310 bool connstr_gives_password = false;
2312 options = PQconninfoParse(connstr, NULL);
2313 if (options)
2315 for (option = options; option->keyword != NULL; option++)
2317 if (strcmp(option->keyword, "password") == 0)
2319 if (option->val != NULL && option->val[0] != '\0')
2321 connstr_gives_password = true;
2322 break;
2326 PQconninfoFree(options);
2329 if (!connstr_gives_password)
2330 ereport(ERROR,
2331 (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
2332 errmsg("password is required"),
2333 errdetail("Non-superusers must provide a password in the connection string.")));
2337 static void
2338 dblink_res_error(const char *conname, PGresult *res, const char *dblink_context_msg, bool fail)
2340 int level;
2341 char *pg_diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
2342 char *pg_diag_message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
2343 char *pg_diag_message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
2344 char *pg_diag_message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
2345 char *pg_diag_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
2346 int sqlstate;
2347 char *message_primary;
2348 char *message_detail;
2349 char *message_hint;
2350 char *message_context;
2351 const char *dblink_context_conname = "unnamed";
2353 if (fail)
2354 level = ERROR;
2355 else
2356 level = NOTICE;
2358 if (pg_diag_sqlstate)
2359 sqlstate = MAKE_SQLSTATE(pg_diag_sqlstate[0],
2360 pg_diag_sqlstate[1],
2361 pg_diag_sqlstate[2],
2362 pg_diag_sqlstate[3],
2363 pg_diag_sqlstate[4]);
2364 else
2365 sqlstate = ERRCODE_CONNECTION_FAILURE;
2367 xpstrdup(message_primary, pg_diag_message_primary);
2368 xpstrdup(message_detail, pg_diag_message_detail);
2369 xpstrdup(message_hint, pg_diag_message_hint);
2370 xpstrdup(message_context, pg_diag_context);
2372 if (res)
2373 PQclear(res);
2375 if (conname)
2376 dblink_context_conname = conname;
2378 ereport(level,
2379 (errcode(sqlstate),
2380 message_primary ? errmsg("%s", message_primary) : errmsg("unknown error"),
2381 message_detail ? errdetail("%s", message_detail) : 0,
2382 message_hint ? errhint("%s", message_hint) : 0,
2383 message_context ? errcontext("%s", message_context) : 0,
2384 errcontext("Error occurred on dblink connection named \"%s\": %s.",
2385 dblink_context_conname, dblink_context_msg)));
2389 * Obtain connection string for a foreign server
2391 static char *
2392 get_connect_string(const char *servername)
2394 ForeignServer *foreign_server = NULL;
2395 UserMapping *user_mapping;
2396 ListCell *cell;
2397 StringInfo buf = makeStringInfo();
2398 ForeignDataWrapper *fdw;
2399 AclResult aclresult;
2401 /* first gather the server connstr options */
2402 if (strlen(servername) < NAMEDATALEN)
2403 foreign_server = GetForeignServerByName(servername, true);
2405 if (foreign_server)
2407 Oid serverid = foreign_server->serverid;
2408 Oid fdwid = foreign_server->fdwid;
2409 Oid userid = GetUserId();
2411 user_mapping = GetUserMapping(userid, serverid);
2412 fdw = GetForeignDataWrapper(fdwid);
2414 /* Check permissions, user must have usage on the server. */
2415 aclresult = pg_foreign_server_aclcheck(serverid, userid, ACL_USAGE);
2416 if (aclresult != ACLCHECK_OK)
2417 aclcheck_error(aclresult, ACL_KIND_FOREIGN_SERVER, foreign_server->servername);
2419 foreach(cell, fdw->options)
2421 DefElem *def = lfirst(cell);
2423 appendStringInfo(buf, "%s='%s' ", def->defname,
2424 escape_param_str(strVal(def->arg)));
2427 foreach(cell, foreign_server->options)
2429 DefElem *def = lfirst(cell);
2431 appendStringInfo(buf, "%s='%s' ", def->defname,
2432 escape_param_str(strVal(def->arg)));
2435 foreach(cell, user_mapping->options)
2438 DefElem *def = lfirst(cell);
2440 appendStringInfo(buf, "%s='%s' ", def->defname,
2441 escape_param_str(strVal(def->arg)));
2444 return buf->data;
2446 else
2447 return NULL;
2451 * Escaping libpq connect parameter strings.
2453 * Replaces "'" with "\'" and "\" with "\\".
2455 static char *
2456 escape_param_str(const char *str)
2458 const char *cp;
2459 StringInfo buf = makeStringInfo();
2461 for (cp = str; *cp; cp++)
2463 if (*cp == '\\' || *cp == '\'')
2464 appendStringInfoChar(buf, '\\');
2465 appendStringInfoChar(buf, *cp);
2468 return buf->data;