Fix pg_dump bug in the database-level collation patch. "datcollate" and
[PostgreSQL.git] / contrib / dblink / dblink.c
blob7a9ddcdb5b7425225c2476e38576d24f73273c5b
/*
 * dblink.c
 *
 * Functions returning results from a remote database
 *
 * Joe Conway <mail@joeconway.com>
 * And contributors:
 * Darko Prenosil <Darko.Prenosil@finteh.hr>
 * Shridhar Daithankar <shridhar_daithankar@persistent.co.in>
 *
 * $PostgreSQL$
 * Copyright (c) 2001-2008, PostgreSQL Global Development Group
 * ALL RIGHTS RESERVED;
 *
 * Permission to use, copy, modify, and distribute this software and its
 * documentation for any purpose, without fee, and without a written agreement
 * is hereby granted, provided that the above copyright notice and this
 * paragraph and the following two paragraphs appear in all copies.
 *
 * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
 * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
 * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
 * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 *
 * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
 * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS
 * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
 * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
 *
 */
33 #include "postgres.h"
35 #include <limits.h>
37 #include "libpq-fe.h"
38 #include "fmgr.h"
39 #include "funcapi.h"
40 #include "access/genam.h"
41 #include "access/heapam.h"
42 #include "access/tupdesc.h"
43 #include "catalog/indexing.h"
44 #include "catalog/namespace.h"
45 #include "catalog/pg_index.h"
46 #include "catalog/pg_type.h"
47 #include "executor/executor.h"
48 #include "executor/spi.h"
49 #include "lib/stringinfo.h"
50 #include "miscadmin.h"
51 #include "nodes/execnodes.h"
52 #include "nodes/nodes.h"
53 #include "nodes/pg_list.h"
54 #include "parser/parse_type.h"
55 #include "utils/acl.h"
56 #include "utils/array.h"
57 #include "utils/builtins.h"
58 #include "utils/dynahash.h"
59 #include "utils/fmgroids.h"
60 #include "utils/hsearch.h"
61 #include "utils/lsyscache.h"
62 #include "utils/memutils.h"
63 #include "utils/syscache.h"
64 #include "utils/tqual.h"
66 #include "dblink.h"
68 PG_MODULE_MAGIC;
70 typedef struct remoteConn
72 PGconn *conn; /* Hold the remote connection */
73 int openCursorCount; /* The number of open cursors */
74 bool newXactForCursor; /* Opened a transaction for a cursor */
75 } remoteConn;
78 * Internal declarations
80 static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async, bool do_get);
81 static remoteConn *getConnectionByName(const char *name);
82 static HTAB *createConnHash(void);
83 static void createNewConnection(const char *name, remoteConn * rconn);
84 static void deleteConnection(const char *name);
85 static char **get_pkey_attnames(Oid relid, int16 *numatts);
86 static char **get_text_array_contents(ArrayType *array, int *numitems);
87 static char *get_sql_insert(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals);
88 static char *get_sql_delete(Oid relid, int2vector *pkattnums, int16 pknumatts, char **tgt_pkattvals);
89 static char *get_sql_update(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals);
90 static char *quote_literal_cstr(char *rawstr);
91 static char *quote_ident_cstr(char *rawstr);
92 static int16 get_attnum_pk_pos(int2vector *pkattnums, int16 pknumatts, int16 key);
93 static HeapTuple get_tuple_of_interest(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals);
94 static Oid get_relid_from_relname(text *relname_text);
95 static char *generate_relation_name(Oid relid);
96 static void dblink_connstr_check(const char *connstr);
97 static void dblink_security_check(PGconn *conn, remoteConn *rconn);
98 static void dblink_res_error(const char *conname, PGresult *res, const char *dblink_context_msg, bool fail);
100 /* Global */
101 static remoteConn *pconn = NULL;
102 static HTAB *remoteConnHash = NULL;
105 * Following is list that holds multiple remote connections.
106 * Calling convention of each dblink function changes to accept
107 * connection name as the first parameter. The connection list is
108 * much like ecpg e.g. a mapping between a name and a PGconn object.
111 typedef struct remoteConnHashEnt
113 char name[NAMEDATALEN];
114 remoteConn *rconn;
115 } remoteConnHashEnt;
/* initial number of connection hashes */
#define NUMCONN		16
/* general utility */

/* pfree var_ if it is set, then reset it to NULL; var_ must be an lvalue */
#define xpfree(var_) \
	do { \
		if ((var_) != NULL) \
		{ \
			pfree(var_); \
			(var_) = NULL; \
		} \
	} while (0)
/* NULL-tolerant pstrdup: var_c receives a copy of var_, or NULL */
#define xpstrdup(var_c, var_) \
	do { \
		if ((var_) == NULL) \
			(var_c) = NULL; \
		else \
			(var_c) = pstrdup(var_); \
	} while (0)
/*
 * Raise an ERROR built from the connection's current libpq error message,
 * prefixed with p2.  Expects "msg", "conn" and "res" in the caller's scope;
 * "res" is cleared first if set.
 */
#define DBLINK_RES_INTERNALERROR(p2) \
	do { \
		msg = pstrdup(PQerrorMessage(conn)); \
		if (res) \
			PQclear(res); \
		elog(ERROR, "%s: %s", p2, msg); \
	} while (0)
/*
 * Report a missing connection.  Expects "conname" in the caller's scope;
 * the name is included in the message when it is not NULL.
 */
#define DBLINK_CONN_NOT_AVAIL \
	do { \
		if (conname) \
			ereport(ERROR, \
					(errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST), \
					 errmsg("connection \"%s\" not available", conname))); \
		else \
			ereport(ERROR, \
					(errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST), \
					 errmsg("connection not available"))); \
	} while (0)
/*
 * Resolve argument 0 to a connection.  If it names an existing connection
 * that one is used; otherwise it is treated as a connection string and a new
 * connection is opened, with "freeconn" set so the caller closes it again.
 *
 * Expects "rconn", "conn", "connstr", "msg" and "freeconn" in the caller's
 * scope.
 */
#define DBLINK_GET_CONN \
	do { \
		char	   *conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(0)); \
		rconn = getConnectionByName(conname_or_str); \
		if (rconn) \
		{ \
			conn = rconn->conn; \
		} \
		else \
		{ \
			connstr = conname_or_str; \
			dblink_connstr_check(connstr); \
			conn = PQconnectdb(connstr); \
			if (PQstatus(conn) == CONNECTION_BAD) \
			{ \
				msg = pstrdup(PQerrorMessage(conn)); \
				PQfinish(conn); \
				ereport(ERROR, \
						(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION), \
						 errmsg("could not establish connection"), \
						 errdetail("%s", msg))); \
			} \
			dblink_security_check(conn, rconn); \
			freeconn = true; \
		} \
	} while (0)
/*
 * Resolve argument 0 strictly as the name of an existing connection and
 * raise an ERROR if there is none.  Expects "rconn" and "conn" in the
 * caller's scope; "conname" is declared locally inside the macro.
 */
#define DBLINK_GET_NAMED_CONN \
	do { \
		char	   *conname = text_to_cstring(PG_GETARG_TEXT_PP(0)); \
		rconn = getConnectionByName(conname); \
		if (rconn) \
			conn = rconn->conn; \
		else \
			DBLINK_CONN_NOT_AVAIL; \
	} while (0)
/*
 * Allocate and zero the unnamed connection's state on first use.  The
 * struct lives in TopMemoryContext.
 */
#define DBLINK_INIT \
	do { \
		if (!pconn) \
		{ \
			pconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn)); \
			pconn->conn = NULL; \
			pconn->openCursorCount = 0; \
			pconn->newXactForCursor = FALSE; \
		} \
	} while (0)
207 * Create a persistent connection to another database
209 PG_FUNCTION_INFO_V1(dblink_connect);
210 Datum
211 dblink_connect(PG_FUNCTION_ARGS)
213 char *connstr = NULL;
214 char *connname = NULL;
215 char *msg;
216 MemoryContext oldcontext;
217 PGconn *conn = NULL;
218 remoteConn *rconn = NULL;
220 DBLINK_INIT;
222 if (PG_NARGS() == 2)
224 connstr = text_to_cstring(PG_GETARG_TEXT_PP(1));
225 connname = text_to_cstring(PG_GETARG_TEXT_PP(0));
227 else if (PG_NARGS() == 1)
228 connstr = text_to_cstring(PG_GETARG_TEXT_PP(0));
230 oldcontext = MemoryContextSwitchTo(TopMemoryContext);
232 if (connname)
233 rconn = (remoteConn *) palloc(sizeof(remoteConn));
235 /* check password in connection string if not superuser */
236 dblink_connstr_check(connstr);
237 conn = PQconnectdb(connstr);
239 MemoryContextSwitchTo(oldcontext);
241 if (PQstatus(conn) == CONNECTION_BAD)
243 msg = pstrdup(PQerrorMessage(conn));
244 PQfinish(conn);
245 if (rconn)
246 pfree(rconn);
248 ereport(ERROR,
249 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
250 errmsg("could not establish connection"),
251 errdetail("%s", msg)));
254 /* check password actually used if not superuser */
255 dblink_security_check(conn, rconn);
257 if (connname)
259 rconn->conn = conn;
260 createNewConnection(connname, rconn);
262 else
263 pconn->conn = conn;
265 PG_RETURN_TEXT_P(cstring_to_text("OK"));
269 * Clear a persistent connection to another database
271 PG_FUNCTION_INFO_V1(dblink_disconnect);
272 Datum
273 dblink_disconnect(PG_FUNCTION_ARGS)
275 char *conname = NULL;
276 remoteConn *rconn = NULL;
277 PGconn *conn = NULL;
279 DBLINK_INIT;
281 if (PG_NARGS() == 1)
283 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
284 rconn = getConnectionByName(conname);
285 if (rconn)
286 conn = rconn->conn;
288 else
289 conn = pconn->conn;
291 if (!conn)
292 DBLINK_CONN_NOT_AVAIL;
294 PQfinish(conn);
295 if (rconn)
297 deleteConnection(conname);
298 pfree(rconn);
300 else
301 pconn->conn = NULL;
303 PG_RETURN_TEXT_P(cstring_to_text("OK"));
307 * opens a cursor using a persistent connection
309 PG_FUNCTION_INFO_V1(dblink_open);
310 Datum
311 dblink_open(PG_FUNCTION_ARGS)
313 char *msg;
314 PGresult *res = NULL;
315 PGconn *conn = NULL;
316 char *curname = NULL;
317 char *sql = NULL;
318 char *conname = NULL;
319 StringInfoData buf;
320 remoteConn *rconn = NULL;
321 bool fail = true; /* default to backward compatible behavior */
323 DBLINK_INIT;
324 initStringInfo(&buf);
326 if (PG_NARGS() == 2)
328 /* text,text */
329 curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
330 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
331 rconn = pconn;
333 else if (PG_NARGS() == 3)
335 /* might be text,text,text or text,text,bool */
336 if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
338 curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
339 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
340 fail = PG_GETARG_BOOL(2);
341 rconn = pconn;
343 else
345 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
346 curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
347 sql = text_to_cstring(PG_GETARG_TEXT_PP(2));
348 rconn = getConnectionByName(conname);
351 else if (PG_NARGS() == 4)
353 /* text,text,text,bool */
354 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
355 curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
356 sql = text_to_cstring(PG_GETARG_TEXT_PP(2));
357 fail = PG_GETARG_BOOL(3);
358 rconn = getConnectionByName(conname);
361 if (!rconn || !rconn->conn)
362 DBLINK_CONN_NOT_AVAIL;
363 else
364 conn = rconn->conn;
366 /* If we are not in a transaction, start one */
367 if (PQtransactionStatus(conn) == PQTRANS_IDLE)
369 res = PQexec(conn, "BEGIN");
370 if (PQresultStatus(res) != PGRES_COMMAND_OK)
371 DBLINK_RES_INTERNALERROR("begin error");
372 PQclear(res);
373 rconn->newXactForCursor = TRUE;
376 * Since transaction state was IDLE, we force cursor count to
377 * initially be 0. This is needed as a previous ABORT might have wiped
378 * out our transaction without maintaining the cursor count for us.
380 rconn->openCursorCount = 0;
383 /* if we started a transaction, increment cursor count */
384 if (rconn->newXactForCursor)
385 (rconn->openCursorCount)++;
387 appendStringInfo(&buf, "DECLARE %s CURSOR FOR %s", curname, sql);
388 res = PQexec(conn, buf.data);
389 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
391 dblink_res_error(conname, res, "could not open cursor", fail);
392 PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
395 PQclear(res);
396 PG_RETURN_TEXT_P(cstring_to_text("OK"));
400 * closes a cursor
402 PG_FUNCTION_INFO_V1(dblink_close);
403 Datum
404 dblink_close(PG_FUNCTION_ARGS)
406 PGconn *conn = NULL;
407 PGresult *res = NULL;
408 char *curname = NULL;
409 char *conname = NULL;
410 StringInfoData buf;
411 char *msg;
412 remoteConn *rconn = NULL;
413 bool fail = true; /* default to backward compatible behavior */
415 DBLINK_INIT;
416 initStringInfo(&buf);
418 if (PG_NARGS() == 1)
420 /* text */
421 curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
422 rconn = pconn;
424 else if (PG_NARGS() == 2)
426 /* might be text,text or text,bool */
427 if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
429 curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
430 fail = PG_GETARG_BOOL(1);
431 rconn = pconn;
433 else
435 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
436 curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
437 rconn = getConnectionByName(conname);
440 if (PG_NARGS() == 3)
442 /* text,text,bool */
443 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
444 curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
445 fail = PG_GETARG_BOOL(2);
446 rconn = getConnectionByName(conname);
449 if (!rconn || !rconn->conn)
450 DBLINK_CONN_NOT_AVAIL;
451 else
452 conn = rconn->conn;
454 appendStringInfo(&buf, "CLOSE %s", curname);
456 /* close the cursor */
457 res = PQexec(conn, buf.data);
458 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
460 dblink_res_error(conname, res, "could not close cursor", fail);
461 PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
464 PQclear(res);
466 /* if we started a transaction, decrement cursor count */
467 if (rconn->newXactForCursor)
469 (rconn->openCursorCount)--;
471 /* if count is zero, commit the transaction */
472 if (rconn->openCursorCount == 0)
474 rconn->newXactForCursor = FALSE;
476 res = PQexec(conn, "COMMIT");
477 if (PQresultStatus(res) != PGRES_COMMAND_OK)
478 DBLINK_RES_INTERNALERROR("commit error");
479 PQclear(res);
483 PG_RETURN_TEXT_P(cstring_to_text("OK"));
487 * Fetch results from an open cursor
489 PG_FUNCTION_INFO_V1(dblink_fetch);
490 Datum
491 dblink_fetch(PG_FUNCTION_ARGS)
493 FuncCallContext *funcctx;
494 TupleDesc tupdesc = NULL;
495 int call_cntr;
496 int max_calls;
497 AttInMetadata *attinmeta;
498 PGresult *res = NULL;
499 MemoryContext oldcontext;
500 char *conname = NULL;
501 remoteConn *rconn = NULL;
503 DBLINK_INIT;
505 /* stuff done only on the first call of the function */
506 if (SRF_IS_FIRSTCALL())
508 PGconn *conn = NULL;
509 StringInfoData buf;
510 char *curname = NULL;
511 int howmany = 0;
512 bool fail = true; /* default to backward compatible */
514 if (PG_NARGS() == 4)
516 /* text,text,int,bool */
517 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
518 curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
519 howmany = PG_GETARG_INT32(2);
520 fail = PG_GETARG_BOOL(3);
522 rconn = getConnectionByName(conname);
523 if (rconn)
524 conn = rconn->conn;
526 else if (PG_NARGS() == 3)
528 /* text,text,int or text,int,bool */
529 if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
531 curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
532 howmany = PG_GETARG_INT32(1);
533 fail = PG_GETARG_BOOL(2);
534 conn = pconn->conn;
536 else
538 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
539 curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
540 howmany = PG_GETARG_INT32(2);
542 rconn = getConnectionByName(conname);
543 if (rconn)
544 conn = rconn->conn;
547 else if (PG_NARGS() == 2)
549 /* text,int */
550 curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
551 howmany = PG_GETARG_INT32(1);
552 conn = pconn->conn;
555 if (!conn)
556 DBLINK_CONN_NOT_AVAIL;
558 initStringInfo(&buf);
559 appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
561 /* create a function context for cross-call persistence */
562 funcctx = SRF_FIRSTCALL_INIT();
565 * switch to memory context appropriate for multiple function calls
567 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
569 res = PQexec(conn, buf.data);
570 if (!res ||
571 (PQresultStatus(res) != PGRES_COMMAND_OK &&
572 PQresultStatus(res) != PGRES_TUPLES_OK))
574 dblink_res_error(conname, res, "could not fetch from cursor", fail);
575 SRF_RETURN_DONE(funcctx);
577 else if (PQresultStatus(res) == PGRES_COMMAND_OK)
579 /* cursor does not exist - closed already or bad name */
580 PQclear(res);
581 ereport(ERROR,
582 (errcode(ERRCODE_INVALID_CURSOR_NAME),
583 errmsg("cursor \"%s\" does not exist", curname)));
586 funcctx->max_calls = PQntuples(res);
588 /* got results, keep track of them */
589 funcctx->user_fctx = res;
591 /* get a tuple descriptor for our result type */
592 switch (get_call_result_type(fcinfo, NULL, &tupdesc))
594 case TYPEFUNC_COMPOSITE:
595 /* success */
596 break;
597 case TYPEFUNC_RECORD:
598 /* failed to determine actual type of RECORD */
599 ereport(ERROR,
600 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
601 errmsg("function returning record called in context "
602 "that cannot accept type record")));
603 break;
604 default:
605 /* result type isn't composite */
606 elog(ERROR, "return type must be a row type");
607 break;
610 /* make sure we have a persistent copy of the tupdesc */
611 tupdesc = CreateTupleDescCopy(tupdesc);
613 /* check result and tuple descriptor have the same number of columns */
614 if (PQnfields(res) != tupdesc->natts)
615 ereport(ERROR,
616 (errcode(ERRCODE_DATATYPE_MISMATCH),
617 errmsg("remote query result rowtype does not match "
618 "the specified FROM clause rowtype")));
620 /* fast track when no results */
621 if (funcctx->max_calls < 1)
623 if (res)
624 PQclear(res);
625 SRF_RETURN_DONE(funcctx);
628 /* store needed metadata for subsequent calls */
629 attinmeta = TupleDescGetAttInMetadata(tupdesc);
630 funcctx->attinmeta = attinmeta;
632 MemoryContextSwitchTo(oldcontext);
635 /* stuff done on every call of the function */
636 funcctx = SRF_PERCALL_SETUP();
639 * initialize per-call variables
641 call_cntr = funcctx->call_cntr;
642 max_calls = funcctx->max_calls;
644 res = (PGresult *) funcctx->user_fctx;
645 attinmeta = funcctx->attinmeta;
646 tupdesc = attinmeta->tupdesc;
648 if (call_cntr < max_calls) /* do when there is more left to send */
650 char **values;
651 HeapTuple tuple;
652 Datum result;
653 int i;
654 int nfields = PQnfields(res);
656 values = (char **) palloc(nfields * sizeof(char *));
657 for (i = 0; i < nfields; i++)
659 if (PQgetisnull(res, call_cntr, i) == 0)
660 values[i] = PQgetvalue(res, call_cntr, i);
661 else
662 values[i] = NULL;
665 /* build the tuple */
666 tuple = BuildTupleFromCStrings(attinmeta, values);
668 /* make the tuple into a datum */
669 result = HeapTupleGetDatum(tuple);
671 SRF_RETURN_NEXT(funcctx, result);
673 else
675 /* do when there is no more left */
676 PQclear(res);
677 SRF_RETURN_DONE(funcctx);
682 * Note: this is the new preferred version of dblink
684 PG_FUNCTION_INFO_V1(dblink_record);
685 Datum
686 dblink_record(PG_FUNCTION_ARGS)
688 return dblink_record_internal(fcinfo, false, false);
691 PG_FUNCTION_INFO_V1(dblink_send_query);
692 Datum
693 dblink_send_query(PG_FUNCTION_ARGS)
695 return dblink_record_internal(fcinfo, true, false);
698 PG_FUNCTION_INFO_V1(dblink_get_result);
699 Datum
700 dblink_get_result(PG_FUNCTION_ARGS)
702 return dblink_record_internal(fcinfo, true, true);
705 static Datum
706 dblink_record_internal(FunctionCallInfo fcinfo, bool is_async, bool do_get)
708 FuncCallContext *funcctx;
709 TupleDesc tupdesc = NULL;
710 int call_cntr;
711 int max_calls;
712 AttInMetadata *attinmeta;
713 char *msg;
714 PGresult *res = NULL;
715 bool is_sql_cmd = false;
716 char *sql_cmd_status = NULL;
717 MemoryContext oldcontext;
718 bool freeconn = false;
720 DBLINK_INIT;
722 /* stuff done only on the first call of the function */
723 if (SRF_IS_FIRSTCALL())
725 PGconn *conn = NULL;
726 char *connstr = NULL;
727 char *sql = NULL;
728 char *conname = NULL;
729 remoteConn *rconn = NULL;
730 bool fail = true; /* default to backward compatible */
732 /* create a function context for cross-call persistence */
733 funcctx = SRF_FIRSTCALL_INIT();
736 * switch to memory context appropriate for multiple function calls
738 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
740 if (!is_async)
742 if (PG_NARGS() == 3)
744 /* text,text,bool */
745 DBLINK_GET_CONN;
746 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
747 fail = PG_GETARG_BOOL(2);
749 else if (PG_NARGS() == 2)
751 /* text,text or text,bool */
752 if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
754 conn = pconn->conn;
755 sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
756 fail = PG_GETARG_BOOL(1);
758 else
760 DBLINK_GET_CONN;
761 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
764 else if (PG_NARGS() == 1)
766 /* text */
767 conn = pconn->conn;
768 sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
770 else
771 /* shouldn't happen */
772 elog(ERROR, "wrong number of arguments");
774 else if (is_async && do_get)
776 /* get async result */
777 if (PG_NARGS() == 2)
779 /* text,bool */
780 DBLINK_GET_CONN;
781 fail = PG_GETARG_BOOL(2);
783 else if (PG_NARGS() == 1)
785 /* text */
786 DBLINK_GET_CONN;
788 else
789 /* shouldn't happen */
790 elog(ERROR, "wrong number of arguments");
792 else
794 /* send async query */
795 if (PG_NARGS() == 2)
797 DBLINK_GET_CONN;
798 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
800 else
801 /* shouldn't happen */
802 elog(ERROR, "wrong number of arguments");
805 if (!conn)
806 DBLINK_CONN_NOT_AVAIL;
808 if (!is_async || (is_async && do_get))
810 /* synchronous query, or async result retrieval */
811 if (!is_async)
812 res = PQexec(conn, sql);
813 else
815 res = PQgetResult(conn);
816 /* NULL means we're all done with the async results */
817 if (!res)
818 SRF_RETURN_DONE(funcctx);
821 if (!res ||
822 (PQresultStatus(res) != PGRES_COMMAND_OK &&
823 PQresultStatus(res) != PGRES_TUPLES_OK))
825 dblink_res_error(conname, res, "could not execute query", fail);
826 if (freeconn)
827 PQfinish(conn);
828 SRF_RETURN_DONE(funcctx);
831 if (PQresultStatus(res) == PGRES_COMMAND_OK)
833 is_sql_cmd = true;
835 /* need a tuple descriptor representing one TEXT column */
836 tupdesc = CreateTemplateTupleDesc(1, false);
837 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
838 TEXTOID, -1, 0);
841 * and save a copy of the command status string to return as
842 * our result tuple
844 sql_cmd_status = PQcmdStatus(res);
845 funcctx->max_calls = 1;
847 else
848 funcctx->max_calls = PQntuples(res);
850 /* got results, keep track of them */
851 funcctx->user_fctx = res;
853 /* if needed, close the connection to the database and cleanup */
854 if (freeconn)
855 PQfinish(conn);
857 if (!is_sql_cmd)
859 /* get a tuple descriptor for our result type */
860 switch (get_call_result_type(fcinfo, NULL, &tupdesc))
862 case TYPEFUNC_COMPOSITE:
863 /* success */
864 break;
865 case TYPEFUNC_RECORD:
866 /* failed to determine actual type of RECORD */
867 ereport(ERROR,
868 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
869 errmsg("function returning record called in context "
870 "that cannot accept type record")));
871 break;
872 default:
873 /* result type isn't composite */
874 elog(ERROR, "return type must be a row type");
875 break;
878 /* make sure we have a persistent copy of the tupdesc */
879 tupdesc = CreateTupleDescCopy(tupdesc);
883 * check result and tuple descriptor have the same number of
884 * columns
886 if (PQnfields(res) != tupdesc->natts)
887 ereport(ERROR,
888 (errcode(ERRCODE_DATATYPE_MISMATCH),
889 errmsg("remote query result rowtype does not match "
890 "the specified FROM clause rowtype")));
892 /* fast track when no results */
893 if (funcctx->max_calls < 1)
895 if (res)
896 PQclear(res);
897 SRF_RETURN_DONE(funcctx);
900 /* store needed metadata for subsequent calls */
901 attinmeta = TupleDescGetAttInMetadata(tupdesc);
902 funcctx->attinmeta = attinmeta;
904 MemoryContextSwitchTo(oldcontext);
906 else
908 /* async query send */
909 MemoryContextSwitchTo(oldcontext);
910 PG_RETURN_INT32(PQsendQuery(conn, sql));
914 if (is_async && !do_get)
916 /* async query send -- should not happen */
917 elog(ERROR, "async query send called more than once");
921 /* stuff done on every call of the function */
922 funcctx = SRF_PERCALL_SETUP();
925 * initialize per-call variables
927 call_cntr = funcctx->call_cntr;
928 max_calls = funcctx->max_calls;
930 res = (PGresult *) funcctx->user_fctx;
931 attinmeta = funcctx->attinmeta;
932 tupdesc = attinmeta->tupdesc;
934 if (call_cntr < max_calls) /* do when there is more left to send */
936 char **values;
937 HeapTuple tuple;
938 Datum result;
940 if (!is_sql_cmd)
942 int i;
943 int nfields = PQnfields(res);
945 values = (char **) palloc(nfields * sizeof(char *));
946 for (i = 0; i < nfields; i++)
948 if (PQgetisnull(res, call_cntr, i) == 0)
949 values[i] = PQgetvalue(res, call_cntr, i);
950 else
951 values[i] = NULL;
954 else
956 values = (char **) palloc(1 * sizeof(char *));
957 values[0] = sql_cmd_status;
960 /* build the tuple */
961 tuple = BuildTupleFromCStrings(attinmeta, values);
963 /* make the tuple into a datum */
964 result = HeapTupleGetDatum(tuple);
966 SRF_RETURN_NEXT(funcctx, result);
968 else
970 /* do when there is no more left */
971 PQclear(res);
972 SRF_RETURN_DONE(funcctx);
977 * List all open dblink connections by name.
978 * Returns an array of all connection names.
979 * Takes no params
981 PG_FUNCTION_INFO_V1(dblink_get_connections);
982 Datum
983 dblink_get_connections(PG_FUNCTION_ARGS)
985 HASH_SEQ_STATUS status;
986 remoteConnHashEnt *hentry;
987 ArrayBuildState *astate = NULL;
989 if (remoteConnHash)
991 hash_seq_init(&status, remoteConnHash);
992 while ((hentry = (remoteConnHashEnt *) hash_seq_search(&status)) != NULL)
994 /* stash away current value */
995 astate = accumArrayResult(astate,
996 CStringGetTextDatum(hentry->name),
997 false, TEXTOID, CurrentMemoryContext);
1001 if (astate)
1002 PG_RETURN_ARRAYTYPE_P(makeArrayResult(astate,
1003 CurrentMemoryContext));
1004 else
1005 PG_RETURN_NULL();
1009 * Checks if a given remote connection is busy
1011 * Returns 1 if the connection is busy, 0 otherwise
1012 * Params:
1013 * text connection_name - name of the connection to check
1016 PG_FUNCTION_INFO_V1(dblink_is_busy);
1017 Datum
1018 dblink_is_busy(PG_FUNCTION_ARGS)
1020 PGconn *conn = NULL;
1021 remoteConn *rconn = NULL;
1023 DBLINK_INIT;
1024 DBLINK_GET_NAMED_CONN;
1026 PQconsumeInput(conn);
1027 PG_RETURN_INT32(PQisBusy(conn));
1031 * Cancels a running request on a connection
1033 * Returns text:
1034 * "OK" if the cancel request has been sent correctly,
1035 * an error message otherwise
1037 * Params:
1038 * text connection_name - name of the connection to check
1041 PG_FUNCTION_INFO_V1(dblink_cancel_query);
1042 Datum
1043 dblink_cancel_query(PG_FUNCTION_ARGS)
1045 int res = 0;
1046 PGconn *conn = NULL;
1047 remoteConn *rconn = NULL;
1048 PGcancel *cancel;
1049 char errbuf[256];
1051 DBLINK_INIT;
1052 DBLINK_GET_NAMED_CONN;
1053 cancel = PQgetCancel(conn);
1055 res = PQcancel(cancel, errbuf, 256);
1056 PQfreeCancel(cancel);
1058 if (res == 1)
1059 PG_RETURN_TEXT_P(cstring_to_text("OK"));
1060 else
1061 PG_RETURN_TEXT_P(cstring_to_text(errbuf));
1066 * Get error message from a connection
1068 * Returns text:
1069 * "OK" if no error, an error message otherwise
1071 * Params:
1072 * text connection_name - name of the connection to check
1075 PG_FUNCTION_INFO_V1(dblink_error_message);
1076 Datum
1077 dblink_error_message(PG_FUNCTION_ARGS)
1079 char *msg;
1080 PGconn *conn = NULL;
1081 remoteConn *rconn = NULL;
1083 DBLINK_INIT;
1084 DBLINK_GET_NAMED_CONN;
1086 msg = PQerrorMessage(conn);
1087 if (msg == NULL || msg[0] == '\0')
1088 PG_RETURN_TEXT_P(cstring_to_text("OK"));
1089 else
1090 PG_RETURN_TEXT_P(cstring_to_text(msg));
1094 * Execute an SQL non-SELECT command
1096 PG_FUNCTION_INFO_V1(dblink_exec);
1097 Datum
1098 dblink_exec(PG_FUNCTION_ARGS)
1100 char *msg;
1101 PGresult *res = NULL;
1102 text *sql_cmd_status = NULL;
1103 TupleDesc tupdesc = NULL;
1104 PGconn *conn = NULL;
1105 char *connstr = NULL;
1106 char *sql = NULL;
1107 char *conname = NULL;
1108 remoteConn *rconn = NULL;
1109 bool freeconn = false;
1110 bool fail = true; /* default to backward compatible behavior */
1112 DBLINK_INIT;
1114 if (PG_NARGS() == 3)
1116 /* must be text,text,bool */
1117 DBLINK_GET_CONN;
1118 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
1119 fail = PG_GETARG_BOOL(2);
1121 else if (PG_NARGS() == 2)
1123 /* might be text,text or text,bool */
1124 if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
1126 conn = pconn->conn;
1127 sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
1128 fail = PG_GETARG_BOOL(1);
1130 else
1132 DBLINK_GET_CONN;
1133 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
1136 else if (PG_NARGS() == 1)
1138 /* must be single text argument */
1139 conn = pconn->conn;
1140 sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
1142 else
1143 /* shouldn't happen */
1144 elog(ERROR, "wrong number of arguments");
1146 if (!conn)
1147 DBLINK_CONN_NOT_AVAIL;
1149 res = PQexec(conn, sql);
1150 if (!res ||
1151 (PQresultStatus(res) != PGRES_COMMAND_OK &&
1152 PQresultStatus(res) != PGRES_TUPLES_OK))
1154 dblink_res_error(conname, res, "could not execute command", fail);
1156 /* need a tuple descriptor representing one TEXT column */
1157 tupdesc = CreateTemplateTupleDesc(1, false);
1158 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
1159 TEXTOID, -1, 0);
1162 * and save a copy of the command status string to return as our
1163 * result tuple
1165 sql_cmd_status = cstring_to_text("ERROR");
1167 else if (PQresultStatus(res) == PGRES_COMMAND_OK)
1169 /* need a tuple descriptor representing one TEXT column */
1170 tupdesc = CreateTemplateTupleDesc(1, false);
1171 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
1172 TEXTOID, -1, 0);
1175 * and save a copy of the command status string to return as our
1176 * result tuple
1178 sql_cmd_status = cstring_to_text(PQcmdStatus(res));
1179 PQclear(res);
1181 else
1183 PQclear(res);
1184 ereport(ERROR,
1185 (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
1186 errmsg("statement returning results not allowed")));
1189 /* if needed, close the connection to the database and cleanup */
1190 if (freeconn)
1191 PQfinish(conn);
1193 PG_RETURN_TEXT_P(sql_cmd_status);
1198 * dblink_get_pkey
1200 * Return list of primary key fields for the supplied relation,
1201 * or NULL if none exists.
1203 PG_FUNCTION_INFO_V1(dblink_get_pkey);
1204 Datum
1205 dblink_get_pkey(PG_FUNCTION_ARGS)
1207 int16 numatts;
1208 Oid relid;
1209 char **results;
1210 FuncCallContext *funcctx;
1211 int32 call_cntr;
1212 int32 max_calls;
1213 AttInMetadata *attinmeta;
1214 MemoryContext oldcontext;
1216 /* stuff done only on the first call of the function */
1217 if (SRF_IS_FIRSTCALL())
1219 TupleDesc tupdesc = NULL;
1221 /* create a function context for cross-call persistence */
1222 funcctx = SRF_FIRSTCALL_INIT();
1225 * switch to memory context appropriate for multiple function calls
1227 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
1229 /* convert relname to rel Oid */
1230 relid = get_relid_from_relname(PG_GETARG_TEXT_P(0));
1231 if (!OidIsValid(relid))
1232 ereport(ERROR,
1233 (errcode(ERRCODE_UNDEFINED_TABLE),
1234 errmsg("relation \"%s\" does not exist",
1235 text_to_cstring(PG_GETARG_TEXT_PP(0)))));
1238 * need a tuple descriptor representing one INT and one TEXT column
1240 tupdesc = CreateTemplateTupleDesc(2, false);
1241 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "position",
1242 INT4OID, -1, 0);
1243 TupleDescInitEntry(tupdesc, (AttrNumber) 2, "colname",
1244 TEXTOID, -1, 0);
1247 * Generate attribute metadata needed later to produce tuples from raw
1248 * C strings
1250 attinmeta = TupleDescGetAttInMetadata(tupdesc);
1251 funcctx->attinmeta = attinmeta;
1253 /* get an array of attnums */
1254 results = get_pkey_attnames(relid, &numatts);
1256 if ((results != NULL) && (numatts > 0))
1258 funcctx->max_calls = numatts;
1260 /* got results, keep track of them */
1261 funcctx->user_fctx = results;
1263 else
1264 /* fast track when no results */
1265 SRF_RETURN_DONE(funcctx);
1267 MemoryContextSwitchTo(oldcontext);
1270 /* stuff done on every call of the function */
1271 funcctx = SRF_PERCALL_SETUP();
1274 * initialize per-call variables
1276 call_cntr = funcctx->call_cntr;
1277 max_calls = funcctx->max_calls;
1279 results = (char **) funcctx->user_fctx;
1280 attinmeta = funcctx->attinmeta;
1282 if (call_cntr < max_calls) /* do when there is more left to send */
1284 char **values;
1285 HeapTuple tuple;
1286 Datum result;
1288 values = (char **) palloc(2 * sizeof(char *));
1289 values[0] = (char *) palloc(12); /* sign, 10 digits, '\0' */
1291 sprintf(values[0], "%d", call_cntr + 1);
1293 values[1] = results[call_cntr];
1295 /* build the tuple */
1296 tuple = BuildTupleFromCStrings(attinmeta, values);
1298 /* make the tuple into a datum */
1299 result = HeapTupleGetDatum(tuple);
1301 SRF_RETURN_NEXT(funcctx, result);
1303 else
1305 /* do when there is no more left */
1306 SRF_RETURN_DONE(funcctx);
1312 * dblink_build_sql_insert
1314 * Used to generate an SQL insert statement
1315 * based on an existing tuple in a local relation.
1316 * This is useful for selectively replicating data
1317 * to another server via dblink.
1319 * API:
1320 * <relname> - name of local table of interest
1321 * <pkattnums> - an int2vector of attnums which will be used
1322 * to identify the local tuple of interest
1323 * <pknumatts> - number of attnums in pkattnums
1324 * <src_pkattvals_arry> - text array of key values which will be used
1325 * to identify the local tuple of interest
1326 * <tgt_pkattvals_arry> - text array of key values which will be used
1327 * to build the string for execution remotely. These are substituted
1328 * for their counterparts in src_pkattvals_arry
1330 PG_FUNCTION_INFO_V1(dblink_build_sql_insert);
1331 Datum
1332 dblink_build_sql_insert(PG_FUNCTION_ARGS)
1334 text *relname_text = PG_GETARG_TEXT_P(0);
1335 int2vector *pkattnums = (int2vector *) PG_GETARG_POINTER(1);
1336 int32 pknumatts_tmp = PG_GETARG_INT32(2);
1337 ArrayType *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
1338 ArrayType *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
1339 Oid relid;
1340 int16 pknumatts = 0;
1341 char **src_pkattvals;
1342 char **tgt_pkattvals;
1343 int src_nitems;
1344 int tgt_nitems;
1345 char *sql;
1348 * Convert relname to rel OID.
1350 relid = get_relid_from_relname(relname_text);
1351 if (!OidIsValid(relid))
1352 ereport(ERROR,
1353 (errcode(ERRCODE_UNDEFINED_TABLE),
1354 errmsg("relation \"%s\" does not exist",
1355 text_to_cstring(relname_text))));
1358 * There should be at least one key attribute
1360 if (pknumatts_tmp <= 0)
1361 ereport(ERROR,
1362 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1363 errmsg("number of key attributes must be > 0")));
1365 if (pknumatts_tmp <= SHRT_MAX)
1366 pknumatts = pknumatts_tmp;
1367 else
1368 ereport(ERROR,
1369 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1370 errmsg("input for number of primary key " \
1371 "attributes too large")));
1374 * Source array is made up of key values that will be used to locate the
1375 * tuple of interest from the local system.
1377 src_pkattvals = get_text_array_contents(src_pkattvals_arry, &src_nitems);
1380 * There should be one source array key value for each key attnum
1382 if (src_nitems != pknumatts)
1383 ereport(ERROR,
1384 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1385 errmsg("source key array length must match number of key " \
1386 "attributes")));
1389 * Target array is made up of key values that will be used to build the
1390 * SQL string for use on the remote system.
1392 tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
1395 * There should be one target array key value for each key attnum
1397 if (tgt_nitems != pknumatts)
1398 ereport(ERROR,
1399 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1400 errmsg("target key array length must match number of key " \
1401 "attributes")));
1404 * Prep work is finally done. Go get the SQL string.
1406 sql = get_sql_insert(relid, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
1409 * And send it
1411 PG_RETURN_TEXT_P(cstring_to_text(sql));
1416 * dblink_build_sql_delete
1418 * Used to generate an SQL delete statement.
1419 * This is useful for selectively replicating a
1420 * delete to another server via dblink.
1422 * API:
1423 * <relname> - name of remote table of interest
1424 * <pkattnums> - an int2vector of attnums which will be used
1425 * to identify the remote tuple of interest
1426 * <pknumatts> - number of attnums in pkattnums
1427 * <tgt_pkattvals_arry> - text array of key values which will be used
1428 * to build the string for execution remotely.
1430 PG_FUNCTION_INFO_V1(dblink_build_sql_delete);
1431 Datum
1432 dblink_build_sql_delete(PG_FUNCTION_ARGS)
1434 text *relname_text = PG_GETARG_TEXT_P(0);
1435 int2vector *pkattnums = (int2vector *) PG_GETARG_POINTER(1);
1436 int32 pknumatts_tmp = PG_GETARG_INT32(2);
1437 ArrayType *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
1438 Oid relid;
1439 int16 pknumatts = 0;
1440 char **tgt_pkattvals;
1441 int tgt_nitems;
1442 char *sql;
1445 * Convert relname to rel OID.
1447 relid = get_relid_from_relname(relname_text);
1448 if (!OidIsValid(relid))
1449 ereport(ERROR,
1450 (errcode(ERRCODE_UNDEFINED_TABLE),
1451 errmsg("relation \"%s\" does not exist",
1452 text_to_cstring(relname_text))));
1455 * There should be at least one key attribute
1457 if (pknumatts_tmp <= 0)
1458 ereport(ERROR,
1459 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1460 errmsg("number of key attributes must be > 0")));
1462 if (pknumatts_tmp <= SHRT_MAX)
1463 pknumatts = pknumatts_tmp;
1464 else
1465 ereport(ERROR,
1466 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1467 errmsg("input for number of primary key " \
1468 "attributes too large")));
1471 * Target array is made up of key values that will be used to build the
1472 * SQL string for use on the remote system.
1474 tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
1477 * There should be one target array key value for each key attnum
1479 if (tgt_nitems != pknumatts)
1480 ereport(ERROR,
1481 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1482 errmsg("target key array length must match number of key " \
1483 "attributes")));
1486 * Prep work is finally done. Go get the SQL string.
1488 sql = get_sql_delete(relid, pkattnums, pknumatts, tgt_pkattvals);
1491 * And send it
1493 PG_RETURN_TEXT_P(cstring_to_text(sql));
1498 * dblink_build_sql_update
1500 * Used to generate an SQL update statement
1501 * based on an existing tuple in a local relation.
1502 * This is useful for selectively replicating data
1503 * to another server via dblink.
1505 * API:
1506 * <relname> - name of local table of interest
1507 * <pkattnums> - an int2vector of attnums which will be used
1508 * to identify the local tuple of interest
1509 * <pknumatts> - number of attnums in pkattnums
1510 * <src_pkattvals_arry> - text array of key values which will be used
1511 * to identify the local tuple of interest
1512 * <tgt_pkattvals_arry> - text array of key values which will be used
1513 * to build the string for execution remotely. These are substituted
1514 * for their counterparts in src_pkattvals_arry
1516 PG_FUNCTION_INFO_V1(dblink_build_sql_update);
1517 Datum
1518 dblink_build_sql_update(PG_FUNCTION_ARGS)
1520 text *relname_text = PG_GETARG_TEXT_P(0);
1521 int2vector *pkattnums = (int2vector *) PG_GETARG_POINTER(1);
1522 int32 pknumatts_tmp = PG_GETARG_INT32(2);
1523 ArrayType *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
1524 ArrayType *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
1525 Oid relid;
1526 int16 pknumatts = 0;
1527 char **src_pkattvals;
1528 char **tgt_pkattvals;
1529 int src_nitems;
1530 int tgt_nitems;
1531 char *sql;
1534 * Convert relname to rel OID.
1536 relid = get_relid_from_relname(relname_text);
1537 if (!OidIsValid(relid))
1538 ereport(ERROR,
1539 (errcode(ERRCODE_UNDEFINED_TABLE),
1540 errmsg("relation \"%s\" does not exist",
1541 text_to_cstring(relname_text))));
1544 * There should be one source array key values for each key attnum
1546 if (pknumatts_tmp <= 0)
1547 ereport(ERROR,
1548 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1549 errmsg("number of key attributes must be > 0")));
1551 if (pknumatts_tmp <= SHRT_MAX)
1552 pknumatts = pknumatts_tmp;
1553 else
1554 ereport(ERROR,
1555 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1556 errmsg("input for number of primary key " \
1557 "attributes too large")));
1560 * Source array is made up of key values that will be used to locate the
1561 * tuple of interest from the local system.
1563 src_pkattvals = get_text_array_contents(src_pkattvals_arry, &src_nitems);
1566 * There should be one source array key value for each key attnum
1568 if (src_nitems != pknumatts)
1569 ereport(ERROR,
1570 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1571 errmsg("source key array length must match number of key " \
1572 "attributes")));
1575 * Target array is made up of key values that will be used to build the
1576 * SQL string for use on the remote system.
1578 tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
1581 * There should be one target array key value for each key attnum
1583 if (tgt_nitems != pknumatts)
1584 ereport(ERROR,
1585 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1586 errmsg("target key array length must match number of key " \
1587 "attributes")));
1590 * Prep work is finally done. Go get the SQL string.
1592 sql = get_sql_update(relid, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
1595 * And send it
1597 PG_RETURN_TEXT_P(cstring_to_text(sql));
/*************************************************************
 * internal functions
 */
1606 * get_pkey_attnames
1608 * Get the primary key attnames for the given relation.
1609 * Return NULL, and set numatts = 0, if no primary key exists.
1611 static char **
1612 get_pkey_attnames(Oid relid, int16 *numatts)
1614 Relation indexRelation;
1615 ScanKeyData skey;
1616 SysScanDesc scan;
1617 HeapTuple indexTuple;
1618 int i;
1619 char **result = NULL;
1620 Relation rel;
1621 TupleDesc tupdesc;
1622 AclResult aclresult;
1624 /* initialize numatts to 0 in case no primary key exists */
1625 *numatts = 0;
1627 /* open relation using relid, check permissions, get tupdesc */
1628 rel = relation_open(relid, AccessShareLock);
1630 aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
1631 ACL_SELECT);
1632 if (aclresult != ACLCHECK_OK)
1633 aclcheck_error(aclresult, ACL_KIND_CLASS,
1634 RelationGetRelationName(rel));
1636 tupdesc = rel->rd_att;
1638 /* Prepare to scan pg_index for entries having indrelid = this rel. */
1639 indexRelation = heap_open(IndexRelationId, AccessShareLock);
1640 ScanKeyInit(&skey,
1641 Anum_pg_index_indrelid,
1642 BTEqualStrategyNumber, F_OIDEQ,
1643 ObjectIdGetDatum(relid));
1645 scan = systable_beginscan(indexRelation, IndexIndrelidIndexId, true,
1646 SnapshotNow, 1, &skey);
1648 while (HeapTupleIsValid(indexTuple = systable_getnext(scan)))
1650 Form_pg_index index = (Form_pg_index) GETSTRUCT(indexTuple);
1652 /* we're only interested if it is the primary key */
1653 if (index->indisprimary)
1655 *numatts = index->indnatts;
1656 if (*numatts > 0)
1658 result = (char **) palloc(*numatts * sizeof(char *));
1660 for (i = 0; i < *numatts; i++)
1661 result[i] = SPI_fname(tupdesc, index->indkey.values[i]);
1663 break;
1667 systable_endscan(scan);
1668 heap_close(indexRelation, AccessShareLock);
1669 relation_close(rel, AccessShareLock);
1671 return result;
1675 * Deconstruct a text[] into C-strings (note any NULL elements will be
1676 * returned as NULL pointers)
1678 static char **
1679 get_text_array_contents(ArrayType *array, int *numitems)
1681 int ndim = ARR_NDIM(array);
1682 int *dims = ARR_DIMS(array);
1683 int nitems;
1684 int16 typlen;
1685 bool typbyval;
1686 char typalign;
1687 char **values;
1688 char *ptr;
1689 bits8 *bitmap;
1690 int bitmask;
1691 int i;
1693 Assert(ARR_ELEMTYPE(array) == TEXTOID);
1695 *numitems = nitems = ArrayGetNItems(ndim, dims);
1697 get_typlenbyvalalign(ARR_ELEMTYPE(array),
1698 &typlen, &typbyval, &typalign);
1700 values = (char **) palloc(nitems * sizeof(char *));
1702 ptr = ARR_DATA_PTR(array);
1703 bitmap = ARR_NULLBITMAP(array);
1704 bitmask = 1;
1706 for (i = 0; i < nitems; i++)
1708 if (bitmap && (*bitmap & bitmask) == 0)
1710 values[i] = NULL;
1712 else
1714 values[i] = TextDatumGetCString(PointerGetDatum(ptr));
1715 ptr = att_addlength_pointer(ptr, typlen, ptr);
1716 ptr = (char *) att_align_nominal(ptr, typalign);
1719 /* advance bitmap pointer if any */
1720 if (bitmap)
1722 bitmask <<= 1;
1723 if (bitmask == 0x100)
1725 bitmap++;
1726 bitmask = 1;
1731 return values;
1734 static char *
1735 get_sql_insert(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals)
1737 Relation rel;
1738 char *relname;
1739 HeapTuple tuple;
1740 TupleDesc tupdesc;
1741 int natts;
1742 StringInfoData buf;
1743 char *val;
1744 int16 key;
1745 int i;
1746 bool needComma;
1748 initStringInfo(&buf);
1750 /* get relation name including any needed schema prefix and quoting */
1751 relname = generate_relation_name(relid);
1754 * Open relation using relid
1756 rel = relation_open(relid, AccessShareLock);
1757 tupdesc = rel->rd_att;
1758 natts = tupdesc->natts;
1760 tuple = get_tuple_of_interest(relid, pkattnums, pknumatts, src_pkattvals);
1761 if (!tuple)
1762 ereport(ERROR,
1763 (errcode(ERRCODE_CARDINALITY_VIOLATION),
1764 errmsg("source row not found")));
1766 appendStringInfo(&buf, "INSERT INTO %s(", relname);
1768 needComma = false;
1769 for (i = 0; i < natts; i++)
1771 if (tupdesc->attrs[i]->attisdropped)
1772 continue;
1774 if (needComma)
1775 appendStringInfo(&buf, ",");
1777 appendStringInfoString(&buf,
1778 quote_ident_cstr(NameStr(tupdesc->attrs[i]->attname)));
1779 needComma = true;
1782 appendStringInfo(&buf, ") VALUES(");
1785 * remember attvals are 1 based
1787 needComma = false;
1788 for (i = 0; i < natts; i++)
1790 if (tupdesc->attrs[i]->attisdropped)
1791 continue;
1793 if (needComma)
1794 appendStringInfo(&buf, ",");
1796 if (tgt_pkattvals != NULL)
1797 key = get_attnum_pk_pos(pkattnums, pknumatts, i + 1);
1798 else
1799 key = -1;
1801 if (key > -1)
1802 val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;
1803 else
1804 val = SPI_getvalue(tuple, tupdesc, i + 1);
1806 if (val != NULL)
1808 appendStringInfoString(&buf, quote_literal_cstr(val));
1809 pfree(val);
1811 else
1812 appendStringInfo(&buf, "NULL");
1813 needComma = true;
1815 appendStringInfo(&buf, ")");
1817 relation_close(rel, AccessShareLock);
1818 return (buf.data);
1821 static char *
1822 get_sql_delete(Oid relid, int2vector *pkattnums, int16 pknumatts, char **tgt_pkattvals)
1824 Relation rel;
1825 char *relname;
1826 TupleDesc tupdesc;
1827 int natts;
1828 StringInfoData buf;
1829 int i;
1831 initStringInfo(&buf);
1833 /* get relation name including any needed schema prefix and quoting */
1834 relname = generate_relation_name(relid);
1837 * Open relation using relid
1839 rel = relation_open(relid, AccessShareLock);
1840 tupdesc = rel->rd_att;
1841 natts = tupdesc->natts;
1843 appendStringInfo(&buf, "DELETE FROM %s WHERE ", relname);
1844 for (i = 0; i < pknumatts; i++)
1846 int16 pkattnum = pkattnums->values[i];
1848 if (i > 0)
1849 appendStringInfo(&buf, " AND ");
1851 appendStringInfoString(&buf,
1852 quote_ident_cstr(NameStr(tupdesc->attrs[pkattnum - 1]->attname)));
1854 if (tgt_pkattvals == NULL)
1855 /* internal error */
1856 elog(ERROR, "target key array must not be NULL");
1858 if (tgt_pkattvals[i] != NULL)
1859 appendStringInfo(&buf, " = %s",
1860 quote_literal_cstr(tgt_pkattvals[i]));
1861 else
1862 appendStringInfo(&buf, " IS NULL");
1865 relation_close(rel, AccessShareLock);
1866 return (buf.data);
1869 static char *
1870 get_sql_update(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals)
1872 Relation rel;
1873 char *relname;
1874 HeapTuple tuple;
1875 TupleDesc tupdesc;
1876 int natts;
1877 StringInfoData buf;
1878 char *val;
1879 int16 key;
1880 int i;
1881 bool needComma;
1883 initStringInfo(&buf);
1885 /* get relation name including any needed schema prefix and quoting */
1886 relname = generate_relation_name(relid);
1889 * Open relation using relid
1891 rel = relation_open(relid, AccessShareLock);
1892 tupdesc = rel->rd_att;
1893 natts = tupdesc->natts;
1895 tuple = get_tuple_of_interest(relid, pkattnums, pknumatts, src_pkattvals);
1896 if (!tuple)
1897 ereport(ERROR,
1898 (errcode(ERRCODE_CARDINALITY_VIOLATION),
1899 errmsg("source row not found")));
1901 appendStringInfo(&buf, "UPDATE %s SET ", relname);
1903 needComma = false;
1904 for (i = 0; i < natts; i++)
1906 if (tupdesc->attrs[i]->attisdropped)
1907 continue;
1909 if (needComma)
1910 appendStringInfo(&buf, ", ");
1912 appendStringInfo(&buf, "%s = ",
1913 quote_ident_cstr(NameStr(tupdesc->attrs[i]->attname)));
1915 if (tgt_pkattvals != NULL)
1916 key = get_attnum_pk_pos(pkattnums, pknumatts, i + 1);
1917 else
1918 key = -1;
1920 if (key > -1)
1921 val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;
1922 else
1923 val = SPI_getvalue(tuple, tupdesc, i + 1);
1925 if (val != NULL)
1927 appendStringInfoString(&buf, quote_literal_cstr(val));
1928 pfree(val);
1930 else
1931 appendStringInfoString(&buf, "NULL");
1932 needComma = true;
1935 appendStringInfo(&buf, " WHERE ");
1937 for (i = 0; i < pknumatts; i++)
1939 int16 pkattnum = pkattnums->values[i];
1941 if (i > 0)
1942 appendStringInfo(&buf, " AND ");
1944 appendStringInfo(&buf, "%s",
1945 quote_ident_cstr(NameStr(tupdesc->attrs[pkattnum - 1]->attname)));
1947 if (tgt_pkattvals != NULL)
1948 val = tgt_pkattvals[i] ? pstrdup(tgt_pkattvals[i]) : NULL;
1949 else
1950 val = SPI_getvalue(tuple, tupdesc, pkattnum);
1952 if (val != NULL)
1954 appendStringInfo(&buf, " = %s", quote_literal_cstr(val));
1955 pfree(val);
1957 else
1958 appendStringInfo(&buf, " IS NULL");
1961 relation_close(rel, AccessShareLock);
1962 return (buf.data);
1966 * Return a properly quoted literal value.
1967 * Uses quote_literal in quote.c
1969 static char *
1970 quote_literal_cstr(char *rawstr)
1972 text *rawstr_text;
1973 text *result_text;
1974 char *result;
1976 rawstr_text = cstring_to_text(rawstr);
1977 result_text = DatumGetTextP(DirectFunctionCall1(quote_literal,
1978 PointerGetDatum(rawstr_text)));
1979 result = text_to_cstring(result_text);
1981 return result;
1985 * Return a properly quoted identifier.
1986 * Uses quote_ident in quote.c
1988 static char *
1989 quote_ident_cstr(char *rawstr)
1991 text *rawstr_text;
1992 text *result_text;
1993 char *result;
1995 rawstr_text = cstring_to_text(rawstr);
1996 result_text = DatumGetTextP(DirectFunctionCall1(quote_ident,
1997 PointerGetDatum(rawstr_text)));
1998 result = text_to_cstring(result_text);
2000 return result;
2003 static int16
2004 get_attnum_pk_pos(int2vector *pkattnums, int16 pknumatts, int16 key)
2006 int i;
2009 * Not likely a long list anyway, so just scan for the value
2011 for (i = 0; i < pknumatts; i++)
2012 if (key == pkattnums->values[i])
2013 return i;
2015 return -1;
2018 static HeapTuple
2019 get_tuple_of_interest(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals)
2021 Relation rel;
2022 char *relname;
2023 TupleDesc tupdesc;
2024 StringInfoData buf;
2025 int ret;
2026 HeapTuple tuple;
2027 int i;
2029 initStringInfo(&buf);
2031 /* get relation name including any needed schema prefix and quoting */
2032 relname = generate_relation_name(relid);
2035 * Open relation using relid
2037 rel = relation_open(relid, AccessShareLock);
2038 tupdesc = CreateTupleDescCopy(rel->rd_att);
2039 relation_close(rel, AccessShareLock);
2042 * Connect to SPI manager
2044 if ((ret = SPI_connect()) < 0)
2045 /* internal error */
2046 elog(ERROR, "SPI connect failure - returned %d", ret);
2049 * Build sql statement to look up tuple of interest Use src_pkattvals as
2050 * the criteria.
2052 appendStringInfo(&buf, "SELECT * FROM %s WHERE ", relname);
2054 for (i = 0; i < pknumatts; i++)
2056 int16 pkattnum = pkattnums->values[i];
2058 if (i > 0)
2059 appendStringInfo(&buf, " AND ");
2061 appendStringInfoString(&buf,
2062 quote_ident_cstr(NameStr(tupdesc->attrs[pkattnum - 1]->attname)));
2064 if (src_pkattvals[i] != NULL)
2065 appendStringInfo(&buf, " = %s",
2066 quote_literal_cstr(src_pkattvals[i]));
2067 else
2068 appendStringInfo(&buf, " IS NULL");
2072 * Retrieve the desired tuple
2074 ret = SPI_exec(buf.data, 0);
2075 pfree(buf.data);
2078 * Only allow one qualifying tuple
2080 if ((ret == SPI_OK_SELECT) && (SPI_processed > 1))
2081 ereport(ERROR,
2082 (errcode(ERRCODE_CARDINALITY_VIOLATION),
2083 errmsg("source criteria matched more than one record")));
2085 else if (ret == SPI_OK_SELECT && SPI_processed == 1)
2087 SPITupleTable *tuptable = SPI_tuptable;
2089 tuple = SPI_copytuple(tuptable->vals[0]);
2090 SPI_finish();
2092 return tuple;
2094 else
2097 * no qualifying tuples
2099 SPI_finish();
2101 return NULL;
2105 * never reached, but keep compiler quiet
2107 return NULL;
2110 static Oid
2111 get_relid_from_relname(text *relname_text)
2113 RangeVar *relvar;
2114 Relation rel;
2115 Oid relid;
2117 relvar = makeRangeVarFromNameList(textToQualifiedNameList(relname_text));
2118 rel = heap_openrv(relvar, AccessShareLock);
2119 relid = RelationGetRelid(rel);
2120 relation_close(rel, AccessShareLock);
2122 return relid;
2126 * generate_relation_name - copied from ruleutils.c
2127 * Compute the name to display for a relation specified by OID
2129 * The result includes all necessary quoting and schema-prefixing.
2131 static char *
2132 generate_relation_name(Oid relid)
2134 HeapTuple tp;
2135 Form_pg_class reltup;
2136 char *nspname;
2137 char *result;
2139 tp = SearchSysCache(RELOID,
2140 ObjectIdGetDatum(relid),
2141 0, 0, 0);
2142 if (!HeapTupleIsValid(tp))
2143 elog(ERROR, "cache lookup failed for relation %u", relid);
2145 reltup = (Form_pg_class) GETSTRUCT(tp);
2147 /* Qualify the name if not visible in search path */
2148 if (RelationIsVisible(relid))
2149 nspname = NULL;
2150 else
2151 nspname = get_namespace_name(reltup->relnamespace);
2153 result = quote_qualified_identifier(nspname, NameStr(reltup->relname));
2155 ReleaseSysCache(tp);
2157 return result;
2161 static remoteConn *
2162 getConnectionByName(const char *name)
2164 remoteConnHashEnt *hentry;
2165 char key[NAMEDATALEN];
2167 if (!remoteConnHash)
2168 remoteConnHash = createConnHash();
2170 MemSet(key, 0, NAMEDATALEN);
2171 snprintf(key, NAMEDATALEN - 1, "%s", name);
2172 hentry = (remoteConnHashEnt *) hash_search(remoteConnHash,
2173 key, HASH_FIND, NULL);
2175 if (hentry)
2176 return (hentry->rconn);
2178 return (NULL);
2181 static HTAB *
2182 createConnHash(void)
2184 HASHCTL ctl;
2186 ctl.keysize = NAMEDATALEN;
2187 ctl.entrysize = sizeof(remoteConnHashEnt);
2189 return hash_create("Remote Con hash", NUMCONN, &ctl, HASH_ELEM);
2192 static void
2193 createNewConnection(const char *name, remoteConn * rconn)
2195 remoteConnHashEnt *hentry;
2196 bool found;
2197 char key[NAMEDATALEN];
2199 if (!remoteConnHash)
2200 remoteConnHash = createConnHash();
2202 MemSet(key, 0, NAMEDATALEN);
2203 snprintf(key, NAMEDATALEN - 1, "%s", name);
2204 hentry = (remoteConnHashEnt *) hash_search(remoteConnHash, key,
2205 HASH_ENTER, &found);
2207 if (found)
2208 ereport(ERROR,
2209 (errcode(ERRCODE_DUPLICATE_OBJECT),
2210 errmsg("duplicate connection name")));
2212 hentry->rconn = rconn;
2213 strlcpy(hentry->name, name, sizeof(hentry->name));
2216 static void
2217 deleteConnection(const char *name)
2219 remoteConnHashEnt *hentry;
2220 bool found;
2221 char key[NAMEDATALEN];
2223 if (!remoteConnHash)
2224 remoteConnHash = createConnHash();
2226 MemSet(key, 0, NAMEDATALEN);
2227 snprintf(key, NAMEDATALEN - 1, "%s", name);
2229 hentry = (remoteConnHashEnt *) hash_search(remoteConnHash,
2230 key, HASH_REMOVE, &found);
2232 if (!hentry)
2233 ereport(ERROR,
2234 (errcode(ERRCODE_UNDEFINED_OBJECT),
2235 errmsg("undefined connection name")));
2239 static void
2240 dblink_security_check(PGconn *conn, remoteConn *rconn)
2242 if (!superuser())
2244 if (!PQconnectionUsedPassword(conn))
2246 PQfinish(conn);
2247 if (rconn)
2248 pfree(rconn);
2250 ereport(ERROR,
2251 (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
2252 errmsg("password is required"),
2253 errdetail("Non-superuser cannot connect if the server does not request a password."),
2254 errhint("Target server's authentication method must be changed.")));
2260 * For non-superusers, insist that the connstr specify a password. This
2261 * prevents a password from being picked up from .pgpass, a service file,
2262 * the environment, etc. We don't want the postgres user's passwords
2263 * to be accessible to non-superusers.
2265 static void
2266 dblink_connstr_check(const char *connstr)
2268 if (!superuser())
2270 PQconninfoOption *options;
2271 PQconninfoOption *option;
2272 bool connstr_gives_password = false;
2274 options = PQconninfoParse(connstr, NULL);
2275 if (options)
2277 for (option = options; option->keyword != NULL; option++)
2279 if (strcmp(option->keyword, "password") == 0)
2281 if (option->val != NULL && option->val[0] != '\0')
2283 connstr_gives_password = true;
2284 break;
2288 PQconninfoFree(options);
2291 if (!connstr_gives_password)
2292 ereport(ERROR,
2293 (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
2294 errmsg("password is required"),
2295 errdetail("Non-superusers must provide a password in the connection string.")));
2299 static void
2300 dblink_res_error(const char *conname, PGresult *res, const char *dblink_context_msg, bool fail)
2302 int level;
2303 char *pg_diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
2304 char *pg_diag_message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
2305 char *pg_diag_message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
2306 char *pg_diag_message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
2307 char *pg_diag_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
2308 int sqlstate;
2309 char *message_primary;
2310 char *message_detail;
2311 char *message_hint;
2312 char *message_context;
2313 const char *dblink_context_conname = "unnamed";
2315 if (fail)
2316 level = ERROR;
2317 else
2318 level = NOTICE;
2320 if (pg_diag_sqlstate)
2321 sqlstate = MAKE_SQLSTATE(pg_diag_sqlstate[0],
2322 pg_diag_sqlstate[1],
2323 pg_diag_sqlstate[2],
2324 pg_diag_sqlstate[3],
2325 pg_diag_sqlstate[4]);
2326 else
2327 sqlstate = ERRCODE_CONNECTION_FAILURE;
2329 xpstrdup(message_primary, pg_diag_message_primary);
2330 xpstrdup(message_detail, pg_diag_message_detail);
2331 xpstrdup(message_hint, pg_diag_message_hint);
2332 xpstrdup(message_context, pg_diag_context);
2334 if (res)
2335 PQclear(res);
2337 if (conname)
2338 dblink_context_conname = conname;
2340 ereport(level,
2341 (errcode(sqlstate),
2342 message_primary ? errmsg("%s", message_primary) : errmsg("unknown error"),
2343 message_detail ? errdetail("%s", message_detail) : 0,
2344 message_hint ? errhint("%s", message_hint) : 0,
2345 message_context ? errcontext("%s", message_context) : 0,
2346 errcontext("Error occurred on dblink connection named \"%s\": %s.",
2347 dblink_context_conname, dblink_context_msg)));