4 * Functions returning results from a remote database
6 * Joe Conway <mail@joeconway.com>
8 * Darko Prenosil <Darko.Prenosil@finteh.hr>
9 * Shridhar Daithankar <shridhar_daithankar@persistent.co.in>
12 * Copyright (c) 2001-2009, PostgreSQL Global Development Group
13 * ALL RIGHTS RESERVED;
15 * Permission to use, copy, modify, and distribute this software and its
16 * documentation for any purpose, without fee, and without a written agreement
17 * is hereby granted, provided that the above copyright notice and this
18 * paragraph and the following two paragraphs appear in all copies.
20 * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
21 * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
22 * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
23 * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
24 * POSSIBILITY OF SUCH DAMAGE.
26 * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
27 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
28 * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
29 * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
30 * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
40 #include "access/genam.h"
41 #include "access/heapam.h"
42 #include "access/tupdesc.h"
43 #include "catalog/indexing.h"
44 #include "catalog/namespace.h"
45 #include "catalog/pg_index.h"
46 #include "catalog/pg_type.h"
47 #include "executor/executor.h"
48 #include "executor/spi.h"
49 #include "foreign/foreign.h"
50 #include "lib/stringinfo.h"
51 #include "mb/pg_wchar.h"
52 #include "miscadmin.h"
53 #include "nodes/execnodes.h"
54 #include "nodes/nodes.h"
55 #include "nodes/pg_list.h"
56 #include "parser/parse_type.h"
57 #include "utils/acl.h"
58 #include "utils/array.h"
59 #include "utils/builtins.h"
60 #include "utils/dynahash.h"
61 #include "utils/fmgroids.h"
62 #include "utils/hsearch.h"
63 #include "utils/lsyscache.h"
64 #include "utils/memutils.h"
65 #include "utils/syscache.h"
66 #include "utils/tqual.h"
72 typedef struct remoteConn
74 PGconn
*conn
; /* Hold the remote connection */
75 int openCursorCount
; /* The number of open cursors */
76 bool newXactForCursor
; /* Opened a transaction for a cursor */
80 * Internal declarations
82 static Datum
dblink_record_internal(FunctionCallInfo fcinfo
, bool is_async
);
83 static remoteConn
*getConnectionByName(const char *name
);
84 static HTAB
*createConnHash(void);
85 static void createNewConnection(const char *name
, remoteConn
*rconn
);
86 static void deleteConnection(const char *name
);
87 static char **get_pkey_attnames(Oid relid
, int16
*numatts
);
88 static char **get_text_array_contents(ArrayType
*array
, int *numitems
);
89 static char *get_sql_insert(Oid relid
, int2vector
*pkattnums
, int16 pknumatts
, char **src_pkattvals
, char **tgt_pkattvals
);
90 static char *get_sql_delete(Oid relid
, int2vector
*pkattnums
, int16 pknumatts
, char **tgt_pkattvals
);
91 static char *get_sql_update(Oid relid
, int2vector
*pkattnums
, int16 pknumatts
, char **src_pkattvals
, char **tgt_pkattvals
);
92 static char *quote_literal_cstr(char *rawstr
);
93 static char *quote_ident_cstr(char *rawstr
);
94 static int16
get_attnum_pk_pos(int2vector
*pkattnums
, int16 pknumatts
, int16 key
);
95 static HeapTuple
get_tuple_of_interest(Oid relid
, int2vector
*pkattnums
, int16 pknumatts
, char **src_pkattvals
);
96 static Oid
get_relid_from_relname(text
*relname_text
);
97 static char *generate_relation_name(Oid relid
);
98 static void dblink_connstr_check(const char *connstr
);
99 static void dblink_security_check(PGconn
*conn
, remoteConn
*rconn
);
100 static void dblink_res_error(const char *conname
, PGresult
*res
, const char *dblink_context_msg
, bool fail
);
101 static char *get_connect_string(const char *servername
);
102 static char *escape_param_str(const char *from
);
105 static remoteConn
*pconn
= NULL
;
106 static HTAB
*remoteConnHash
= NULL
;
109 * Following is list that holds multiple remote connections.
110 * Calling convention of each dblink function changes to accept
111 * connection name as the first parameter. The connection list is
112 * much like ecpg e.g. a mapping between a name and a PGconn object.
115 typedef struct remoteConnHashEnt
117 char name
[NAMEDATALEN
];
121 /* initial number of connection hashes */
124 /* general utility */
125 #define xpfree(var_) \
134 #define xpstrdup(var_c, var_) \
137 var_c = pstrdup(var_); \
142 #define DBLINK_RES_INTERNALERROR(p2) \
144 msg = pstrdup(PQerrorMessage(conn)); \
147 elog(ERROR, "%s: %s", p2, msg); \
150 #define DBLINK_CONN_NOT_AVAIL \
154 (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST), \
155 errmsg("connection \"%s\" not available", conname))); \
158 (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST), \
159 errmsg("connection not available"))); \
162 #define DBLINK_GET_CONN \
164 char *conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(0)); \
165 rconn = getConnectionByName(conname_or_str); \
168 conn = rconn->conn; \
172 connstr = get_connect_string(conname_or_str); \
173 if (connstr == NULL) \
175 connstr = conname_or_str; \
177 dblink_connstr_check(connstr); \
178 conn = PQconnectdb(connstr); \
179 if (PQstatus(conn) == CONNECTION_BAD) \
181 msg = pstrdup(PQerrorMessage(conn)); \
184 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION), \
185 errmsg("could not establish connection"), \
186 errdetail("%s", msg))); \
188 dblink_security_check(conn, rconn); \
189 PQsetClientEncoding(conn, GetDatabaseEncodingName()); \
194 #define DBLINK_GET_NAMED_CONN \
196 char *conname = text_to_cstring(PG_GETARG_TEXT_PP(0)); \
197 rconn = getConnectionByName(conname); \
199 conn = rconn->conn; \
201 DBLINK_CONN_NOT_AVAIL; \
204 #define DBLINK_INIT \
208 pconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn)); \
209 pconn->conn = NULL; \
210 pconn->openCursorCount = 0; \
211 pconn->newXactForCursor = FALSE; \
216 * Create a persistent connection to another database
218 PG_FUNCTION_INFO_V1(dblink_connect
);
220 dblink_connect(PG_FUNCTION_ARGS
)
222 char *conname_or_str
= NULL
;
223 char *connstr
= NULL
;
224 char *connname
= NULL
;
227 remoteConn
*rconn
= NULL
;
233 conname_or_str
= text_to_cstring(PG_GETARG_TEXT_PP(1));
234 connname
= text_to_cstring(PG_GETARG_TEXT_PP(0));
236 else if (PG_NARGS() == 1)
237 conname_or_str
= text_to_cstring(PG_GETARG_TEXT_PP(0));
240 rconn
= (remoteConn
*) MemoryContextAlloc(TopMemoryContext
,
243 /* first check for valid foreign data server */
244 connstr
= get_connect_string(conname_or_str
);
246 connstr
= conname_or_str
;
248 /* check password in connection string if not superuser */
249 dblink_connstr_check(connstr
);
250 conn
= PQconnectdb(connstr
);
252 if (PQstatus(conn
) == CONNECTION_BAD
)
254 msg
= pstrdup(PQerrorMessage(conn
));
260 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION
),
261 errmsg("could not establish connection"),
262 errdetail("%s", msg
)));
265 /* check password actually used if not superuser */
266 dblink_security_check(conn
, rconn
);
268 /* attempt to set client encoding to match server encoding */
269 PQsetClientEncoding(conn
, GetDatabaseEncodingName());
274 createNewConnection(connname
, rconn
);
279 PG_RETURN_TEXT_P(cstring_to_text("OK"));
283 * Clear a persistent connection to another database
285 PG_FUNCTION_INFO_V1(dblink_disconnect
);
287 dblink_disconnect(PG_FUNCTION_ARGS
)
289 char *conname
= NULL
;
290 remoteConn
*rconn
= NULL
;
297 conname
= text_to_cstring(PG_GETARG_TEXT_PP(0));
298 rconn
= getConnectionByName(conname
);
306 DBLINK_CONN_NOT_AVAIL
;
311 deleteConnection(conname
);
317 PG_RETURN_TEXT_P(cstring_to_text("OK"));
321 * opens a cursor using a persistent connection
323 PG_FUNCTION_INFO_V1(dblink_open
);
325 dblink_open(PG_FUNCTION_ARGS
)
328 PGresult
*res
= NULL
;
330 char *curname
= NULL
;
332 char *conname
= NULL
;
334 remoteConn
*rconn
= NULL
;
335 bool fail
= true; /* default to backward compatible behavior */
338 initStringInfo(&buf
);
343 curname
= text_to_cstring(PG_GETARG_TEXT_PP(0));
344 sql
= text_to_cstring(PG_GETARG_TEXT_PP(1));
347 else if (PG_NARGS() == 3)
349 /* might be text,text,text or text,text,bool */
350 if (get_fn_expr_argtype(fcinfo
->flinfo
, 2) == BOOLOID
)
352 curname
= text_to_cstring(PG_GETARG_TEXT_PP(0));
353 sql
= text_to_cstring(PG_GETARG_TEXT_PP(1));
354 fail
= PG_GETARG_BOOL(2);
359 conname
= text_to_cstring(PG_GETARG_TEXT_PP(0));
360 curname
= text_to_cstring(PG_GETARG_TEXT_PP(1));
361 sql
= text_to_cstring(PG_GETARG_TEXT_PP(2));
362 rconn
= getConnectionByName(conname
);
365 else if (PG_NARGS() == 4)
367 /* text,text,text,bool */
368 conname
= text_to_cstring(PG_GETARG_TEXT_PP(0));
369 curname
= text_to_cstring(PG_GETARG_TEXT_PP(1));
370 sql
= text_to_cstring(PG_GETARG_TEXT_PP(2));
371 fail
= PG_GETARG_BOOL(3);
372 rconn
= getConnectionByName(conname
);
375 if (!rconn
|| !rconn
->conn
)
376 DBLINK_CONN_NOT_AVAIL
;
380 /* If we are not in a transaction, start one */
381 if (PQtransactionStatus(conn
) == PQTRANS_IDLE
)
383 res
= PQexec(conn
, "BEGIN");
384 if (PQresultStatus(res
) != PGRES_COMMAND_OK
)
385 DBLINK_RES_INTERNALERROR("begin error");
387 rconn
->newXactForCursor
= TRUE
;
390 * Since transaction state was IDLE, we force cursor count to
391 * initially be 0. This is needed as a previous ABORT might have wiped
392 * out our transaction without maintaining the cursor count for us.
394 rconn
->openCursorCount
= 0;
397 /* if we started a transaction, increment cursor count */
398 if (rconn
->newXactForCursor
)
399 (rconn
->openCursorCount
)++;
401 appendStringInfo(&buf
, "DECLARE %s CURSOR FOR %s", curname
, sql
);
402 res
= PQexec(conn
, buf
.data
);
403 if (!res
|| PQresultStatus(res
) != PGRES_COMMAND_OK
)
405 dblink_res_error(conname
, res
, "could not open cursor", fail
);
406 PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
410 PG_RETURN_TEXT_P(cstring_to_text("OK"));
416 PG_FUNCTION_INFO_V1(dblink_close
);
418 dblink_close(PG_FUNCTION_ARGS
)
421 PGresult
*res
= NULL
;
422 char *curname
= NULL
;
423 char *conname
= NULL
;
426 remoteConn
*rconn
= NULL
;
427 bool fail
= true; /* default to backward compatible behavior */
430 initStringInfo(&buf
);
435 curname
= text_to_cstring(PG_GETARG_TEXT_PP(0));
438 else if (PG_NARGS() == 2)
440 /* might be text,text or text,bool */
441 if (get_fn_expr_argtype(fcinfo
->flinfo
, 1) == BOOLOID
)
443 curname
= text_to_cstring(PG_GETARG_TEXT_PP(0));
444 fail
= PG_GETARG_BOOL(1);
449 conname
= text_to_cstring(PG_GETARG_TEXT_PP(0));
450 curname
= text_to_cstring(PG_GETARG_TEXT_PP(1));
451 rconn
= getConnectionByName(conname
);
457 conname
= text_to_cstring(PG_GETARG_TEXT_PP(0));
458 curname
= text_to_cstring(PG_GETARG_TEXT_PP(1));
459 fail
= PG_GETARG_BOOL(2);
460 rconn
= getConnectionByName(conname
);
463 if (!rconn
|| !rconn
->conn
)
464 DBLINK_CONN_NOT_AVAIL
;
468 appendStringInfo(&buf
, "CLOSE %s", curname
);
470 /* close the cursor */
471 res
= PQexec(conn
, buf
.data
);
472 if (!res
|| PQresultStatus(res
) != PGRES_COMMAND_OK
)
474 dblink_res_error(conname
, res
, "could not close cursor", fail
);
475 PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
480 /* if we started a transaction, decrement cursor count */
481 if (rconn
->newXactForCursor
)
483 (rconn
->openCursorCount
)--;
485 /* if count is zero, commit the transaction */
486 if (rconn
->openCursorCount
== 0)
488 rconn
->newXactForCursor
= FALSE
;
490 res
= PQexec(conn
, "COMMIT");
491 if (PQresultStatus(res
) != PGRES_COMMAND_OK
)
492 DBLINK_RES_INTERNALERROR("commit error");
497 PG_RETURN_TEXT_P(cstring_to_text("OK"));
501 * Fetch results from an open cursor
503 PG_FUNCTION_INFO_V1(dblink_fetch
);
505 dblink_fetch(PG_FUNCTION_ARGS
)
507 FuncCallContext
*funcctx
;
508 TupleDesc tupdesc
= NULL
;
511 AttInMetadata
*attinmeta
;
512 PGresult
*res
= NULL
;
513 MemoryContext oldcontext
;
514 char *conname
= NULL
;
515 remoteConn
*rconn
= NULL
;
519 /* stuff done only on the first call of the function */
520 if (SRF_IS_FIRSTCALL())
524 char *curname
= NULL
;
526 bool fail
= true; /* default to backward compatible */
530 /* text,text,int,bool */
531 conname
= text_to_cstring(PG_GETARG_TEXT_PP(0));
532 curname
= text_to_cstring(PG_GETARG_TEXT_PP(1));
533 howmany
= PG_GETARG_INT32(2);
534 fail
= PG_GETARG_BOOL(3);
536 rconn
= getConnectionByName(conname
);
540 else if (PG_NARGS() == 3)
542 /* text,text,int or text,int,bool */
543 if (get_fn_expr_argtype(fcinfo
->flinfo
, 2) == BOOLOID
)
545 curname
= text_to_cstring(PG_GETARG_TEXT_PP(0));
546 howmany
= PG_GETARG_INT32(1);
547 fail
= PG_GETARG_BOOL(2);
552 conname
= text_to_cstring(PG_GETARG_TEXT_PP(0));
553 curname
= text_to_cstring(PG_GETARG_TEXT_PP(1));
554 howmany
= PG_GETARG_INT32(2);
556 rconn
= getConnectionByName(conname
);
561 else if (PG_NARGS() == 2)
564 curname
= text_to_cstring(PG_GETARG_TEXT_PP(0));
565 howmany
= PG_GETARG_INT32(1);
570 DBLINK_CONN_NOT_AVAIL
;
572 initStringInfo(&buf
);
573 appendStringInfo(&buf
, "FETCH %d FROM %s", howmany
, curname
);
575 /* create a function context for cross-call persistence */
576 funcctx
= SRF_FIRSTCALL_INIT();
579 * Try to execute the query. Note that since libpq uses malloc, the
580 * PGresult will be long-lived even though we are still in a
581 * short-lived memory context.
583 res
= PQexec(conn
, buf
.data
);
585 (PQresultStatus(res
) != PGRES_COMMAND_OK
&&
586 PQresultStatus(res
) != PGRES_TUPLES_OK
))
588 dblink_res_error(conname
, res
, "could not fetch from cursor", fail
);
589 SRF_RETURN_DONE(funcctx
);
591 else if (PQresultStatus(res
) == PGRES_COMMAND_OK
)
593 /* cursor does not exist - closed already or bad name */
596 (errcode(ERRCODE_INVALID_CURSOR_NAME
),
597 errmsg("cursor \"%s\" does not exist", curname
)));
600 funcctx
->max_calls
= PQntuples(res
);
602 /* got results, keep track of them */
603 funcctx
->user_fctx
= res
;
605 /* get a tuple descriptor for our result type */
606 switch (get_call_result_type(fcinfo
, NULL
, &tupdesc
))
608 case TYPEFUNC_COMPOSITE
:
611 case TYPEFUNC_RECORD
:
612 /* failed to determine actual type of RECORD */
614 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED
),
615 errmsg("function returning record called in context "
616 "that cannot accept type record")));
619 /* result type isn't composite */
620 elog(ERROR
, "return type must be a row type");
624 /* check result and tuple descriptor have the same number of columns */
625 if (PQnfields(res
) != tupdesc
->natts
)
627 (errcode(ERRCODE_DATATYPE_MISMATCH
),
628 errmsg("remote query result rowtype does not match "
629 "the specified FROM clause rowtype")));
632 * fast track when no results. We could exit earlier, but then we'd
633 * not report error if the result tuple type is wrong.
635 if (funcctx
->max_calls
< 1)
638 SRF_RETURN_DONE(funcctx
);
642 * switch to memory context appropriate for multiple function calls,
643 * so we can make long-lived copy of tupdesc etc
645 oldcontext
= MemoryContextSwitchTo(funcctx
->multi_call_memory_ctx
);
647 /* make sure we have a persistent copy of the tupdesc */
648 tupdesc
= CreateTupleDescCopy(tupdesc
);
650 /* store needed metadata for subsequent calls */
651 attinmeta
= TupleDescGetAttInMetadata(tupdesc
);
652 funcctx
->attinmeta
= attinmeta
;
654 MemoryContextSwitchTo(oldcontext
);
657 /* stuff done on every call of the function */
658 funcctx
= SRF_PERCALL_SETUP();
661 * initialize per-call variables
663 call_cntr
= funcctx
->call_cntr
;
664 max_calls
= funcctx
->max_calls
;
666 res
= (PGresult
*) funcctx
->user_fctx
;
667 attinmeta
= funcctx
->attinmeta
;
668 tupdesc
= attinmeta
->tupdesc
;
670 if (call_cntr
< max_calls
) /* do when there is more left to send */
676 int nfields
= PQnfields(res
);
678 values
= (char **) palloc(nfields
* sizeof(char *));
679 for (i
= 0; i
< nfields
; i
++)
681 if (PQgetisnull(res
, call_cntr
, i
) == 0)
682 values
[i
] = PQgetvalue(res
, call_cntr
, i
);
687 /* build the tuple */
688 tuple
= BuildTupleFromCStrings(attinmeta
, values
);
690 /* make the tuple into a datum */
691 result
= HeapTupleGetDatum(tuple
);
693 SRF_RETURN_NEXT(funcctx
, result
);
697 /* do when there is no more left */
699 SRF_RETURN_DONE(funcctx
);
704 * Note: this is the new preferred version of dblink
706 PG_FUNCTION_INFO_V1(dblink_record
);
708 dblink_record(PG_FUNCTION_ARGS
)
710 return dblink_record_internal(fcinfo
, false);
713 PG_FUNCTION_INFO_V1(dblink_send_query
);
715 dblink_send_query(PG_FUNCTION_ARGS
)
718 char *connstr
= NULL
;
720 remoteConn
*rconn
= NULL
;
722 bool freeconn
= false;
728 sql
= text_to_cstring(PG_GETARG_TEXT_PP(1));
731 /* shouldn't happen */
732 elog(ERROR
, "wrong number of arguments");
734 /* async query send */
735 retval
= PQsendQuery(conn
, sql
);
737 elog(NOTICE
, "%s", PQerrorMessage(conn
));
739 PG_RETURN_INT32(retval
);
742 PG_FUNCTION_INFO_V1(dblink_get_result
);
744 dblink_get_result(PG_FUNCTION_ARGS
)
746 return dblink_record_internal(fcinfo
, true);
750 dblink_record_internal(FunctionCallInfo fcinfo
, bool is_async
)
752 FuncCallContext
*funcctx
;
753 TupleDesc tupdesc
= NULL
;
756 AttInMetadata
*attinmeta
;
758 PGresult
*res
= NULL
;
759 bool is_sql_cmd
= false;
760 char *sql_cmd_status
= NULL
;
761 MemoryContext oldcontext
;
762 bool freeconn
= false;
766 /* stuff done only on the first call of the function */
767 if (SRF_IS_FIRSTCALL())
770 char *connstr
= NULL
;
772 char *conname
= NULL
;
773 remoteConn
*rconn
= NULL
;
774 bool fail
= true; /* default to backward compatible */
776 /* create a function context for cross-call persistence */
777 funcctx
= SRF_FIRSTCALL_INIT();
780 * switch to memory context appropriate for multiple function calls
782 oldcontext
= MemoryContextSwitchTo(funcctx
->multi_call_memory_ctx
);
790 sql
= text_to_cstring(PG_GETARG_TEXT_PP(1));
791 fail
= PG_GETARG_BOOL(2);
793 else if (PG_NARGS() == 2)
795 /* text,text or text,bool */
796 if (get_fn_expr_argtype(fcinfo
->flinfo
, 1) == BOOLOID
)
799 sql
= text_to_cstring(PG_GETARG_TEXT_PP(0));
800 fail
= PG_GETARG_BOOL(1);
805 sql
= text_to_cstring(PG_GETARG_TEXT_PP(1));
808 else if (PG_NARGS() == 1)
812 sql
= text_to_cstring(PG_GETARG_TEXT_PP(0));
815 /* shouldn't happen */
816 elog(ERROR
, "wrong number of arguments");
820 /* get async result */
825 fail
= PG_GETARG_BOOL(1);
827 else if (PG_NARGS() == 1)
833 /* shouldn't happen */
834 elog(ERROR
, "wrong number of arguments");
838 DBLINK_CONN_NOT_AVAIL
;
840 /* synchronous query, or async result retrieval */
842 res
= PQexec(conn
, sql
);
845 res
= PQgetResult(conn
);
846 /* NULL means we're all done with the async results */
849 MemoryContextSwitchTo(oldcontext
);
850 SRF_RETURN_DONE(funcctx
);
855 (PQresultStatus(res
) != PGRES_COMMAND_OK
&&
856 PQresultStatus(res
) != PGRES_TUPLES_OK
))
858 dblink_res_error(conname
, res
, "could not execute query", fail
);
861 MemoryContextSwitchTo(oldcontext
);
862 SRF_RETURN_DONE(funcctx
);
865 if (PQresultStatus(res
) == PGRES_COMMAND_OK
)
869 /* need a tuple descriptor representing one TEXT column */
870 tupdesc
= CreateTemplateTupleDesc(1, false);
871 TupleDescInitEntry(tupdesc
, (AttrNumber
) 1, "status",
875 * and save a copy of the command status string to return as our
878 sql_cmd_status
= PQcmdStatus(res
);
879 funcctx
->max_calls
= 1;
882 funcctx
->max_calls
= PQntuples(res
);
884 /* got results, keep track of them */
885 funcctx
->user_fctx
= res
;
887 /* if needed, close the connection to the database and cleanup */
893 /* get a tuple descriptor for our result type */
894 switch (get_call_result_type(fcinfo
, NULL
, &tupdesc
))
896 case TYPEFUNC_COMPOSITE
:
899 case TYPEFUNC_RECORD
:
900 /* failed to determine actual type of RECORD */
902 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED
),
903 errmsg("function returning record called in context "
904 "that cannot accept type record")));
907 /* result type isn't composite */
908 elog(ERROR
, "return type must be a row type");
912 /* make sure we have a persistent copy of the tupdesc */
913 tupdesc
= CreateTupleDescCopy(tupdesc
);
917 * check result and tuple descriptor have the same number of columns
919 if (PQnfields(res
) != tupdesc
->natts
)
921 (errcode(ERRCODE_DATATYPE_MISMATCH
),
922 errmsg("remote query result rowtype does not match "
923 "the specified FROM clause rowtype")));
925 /* fast track when no results */
926 if (funcctx
->max_calls
< 1)
930 MemoryContextSwitchTo(oldcontext
);
931 SRF_RETURN_DONE(funcctx
);
934 /* store needed metadata for subsequent calls */
935 attinmeta
= TupleDescGetAttInMetadata(tupdesc
);
936 funcctx
->attinmeta
= attinmeta
;
938 MemoryContextSwitchTo(oldcontext
);
942 /* stuff done on every call of the function */
943 funcctx
= SRF_PERCALL_SETUP();
946 * initialize per-call variables
948 call_cntr
= funcctx
->call_cntr
;
949 max_calls
= funcctx
->max_calls
;
951 res
= (PGresult
*) funcctx
->user_fctx
;
952 attinmeta
= funcctx
->attinmeta
;
953 tupdesc
= attinmeta
->tupdesc
;
955 if (call_cntr
< max_calls
) /* do when there is more left to send */
964 int nfields
= PQnfields(res
);
966 values
= (char **) palloc(nfields
* sizeof(char *));
967 for (i
= 0; i
< nfields
; i
++)
969 if (PQgetisnull(res
, call_cntr
, i
) == 0)
970 values
[i
] = PQgetvalue(res
, call_cntr
, i
);
977 values
= (char **) palloc(1 * sizeof(char *));
978 values
[0] = sql_cmd_status
;
981 /* build the tuple */
982 tuple
= BuildTupleFromCStrings(attinmeta
, values
);
984 /* make the tuple into a datum */
985 result
= HeapTupleGetDatum(tuple
);
987 SRF_RETURN_NEXT(funcctx
, result
);
991 /* do when there is no more left */
993 SRF_RETURN_DONE(funcctx
);
998 * List all open dblink connections by name.
999 * Returns an array of all connection names.
1002 PG_FUNCTION_INFO_V1(dblink_get_connections
);
1004 dblink_get_connections(PG_FUNCTION_ARGS
)
1006 HASH_SEQ_STATUS status
;
1007 remoteConnHashEnt
*hentry
;
1008 ArrayBuildState
*astate
= NULL
;
1012 hash_seq_init(&status
, remoteConnHash
);
1013 while ((hentry
= (remoteConnHashEnt
*) hash_seq_search(&status
)) != NULL
)
1015 /* stash away current value */
1016 astate
= accumArrayResult(astate
,
1017 CStringGetTextDatum(hentry
->name
),
1018 false, TEXTOID
, CurrentMemoryContext
);
1023 PG_RETURN_ARRAYTYPE_P(makeArrayResult(astate
,
1024 CurrentMemoryContext
));
1030 * Checks if a given remote connection is busy
1032 * Returns 1 if the connection is busy, 0 otherwise
1034 * text connection_name - name of the connection to check
1037 PG_FUNCTION_INFO_V1(dblink_is_busy
);
1039 dblink_is_busy(PG_FUNCTION_ARGS
)
1041 PGconn
*conn
= NULL
;
1042 remoteConn
*rconn
= NULL
;
1045 DBLINK_GET_NAMED_CONN
;
1047 PQconsumeInput(conn
);
1048 PG_RETURN_INT32(PQisBusy(conn
));
1052 * Cancels a running request on a connection
1055 * "OK" if the cancel request has been sent correctly,
1056 * an error message otherwise
1059 * text connection_name - name of the connection to check
1062 PG_FUNCTION_INFO_V1(dblink_cancel_query
);
1064 dblink_cancel_query(PG_FUNCTION_ARGS
)
1067 PGconn
*conn
= NULL
;
1068 remoteConn
*rconn
= NULL
;
1073 DBLINK_GET_NAMED_CONN
;
1074 cancel
= PQgetCancel(conn
);
1076 res
= PQcancel(cancel
, errbuf
, 256);
1077 PQfreeCancel(cancel
);
1080 PG_RETURN_TEXT_P(cstring_to_text("OK"));
1082 PG_RETURN_TEXT_P(cstring_to_text(errbuf
));
1087 * Get error message from a connection
1090 * "OK" if no error, an error message otherwise
1093 * text connection_name - name of the connection to check
1096 PG_FUNCTION_INFO_V1(dblink_error_message
);
1098 dblink_error_message(PG_FUNCTION_ARGS
)
1101 PGconn
*conn
= NULL
;
1102 remoteConn
*rconn
= NULL
;
1105 DBLINK_GET_NAMED_CONN
;
1107 msg
= PQerrorMessage(conn
);
1108 if (msg
== NULL
|| msg
[0] == '\0')
1109 PG_RETURN_TEXT_P(cstring_to_text("OK"));
1111 PG_RETURN_TEXT_P(cstring_to_text(msg
));
1115 * Execute an SQL non-SELECT command
1117 PG_FUNCTION_INFO_V1(dblink_exec
);
1119 dblink_exec(PG_FUNCTION_ARGS
)
1122 PGresult
*res
= NULL
;
1123 text
*sql_cmd_status
= NULL
;
1124 TupleDesc tupdesc
= NULL
;
1125 PGconn
*conn
= NULL
;
1126 char *connstr
= NULL
;
1128 char *conname
= NULL
;
1129 remoteConn
*rconn
= NULL
;
1130 bool freeconn
= false;
1131 bool fail
= true; /* default to backward compatible behavior */
1135 if (PG_NARGS() == 3)
1137 /* must be text,text,bool */
1139 sql
= text_to_cstring(PG_GETARG_TEXT_PP(1));
1140 fail
= PG_GETARG_BOOL(2);
1142 else if (PG_NARGS() == 2)
1144 /* might be text,text or text,bool */
1145 if (get_fn_expr_argtype(fcinfo
->flinfo
, 1) == BOOLOID
)
1148 sql
= text_to_cstring(PG_GETARG_TEXT_PP(0));
1149 fail
= PG_GETARG_BOOL(1);
1154 sql
= text_to_cstring(PG_GETARG_TEXT_PP(1));
1157 else if (PG_NARGS() == 1)
1159 /* must be single text argument */
1161 sql
= text_to_cstring(PG_GETARG_TEXT_PP(0));
1164 /* shouldn't happen */
1165 elog(ERROR
, "wrong number of arguments");
1168 DBLINK_CONN_NOT_AVAIL
;
1170 res
= PQexec(conn
, sql
);
1172 (PQresultStatus(res
) != PGRES_COMMAND_OK
&&
1173 PQresultStatus(res
) != PGRES_TUPLES_OK
))
1175 dblink_res_error(conname
, res
, "could not execute command", fail
);
1177 /* need a tuple descriptor representing one TEXT column */
1178 tupdesc
= CreateTemplateTupleDesc(1, false);
1179 TupleDescInitEntry(tupdesc
, (AttrNumber
) 1, "status",
1183 * and save a copy of the command status string to return as our
1186 sql_cmd_status
= cstring_to_text("ERROR");
1188 else if (PQresultStatus(res
) == PGRES_COMMAND_OK
)
1190 /* need a tuple descriptor representing one TEXT column */
1191 tupdesc
= CreateTemplateTupleDesc(1, false);
1192 TupleDescInitEntry(tupdesc
, (AttrNumber
) 1, "status",
1196 * and save a copy of the command status string to return as our
1199 sql_cmd_status
= cstring_to_text(PQcmdStatus(res
));
1206 (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED
),
1207 errmsg("statement returning results not allowed")));
1210 /* if needed, close the connection to the database and cleanup */
1214 PG_RETURN_TEXT_P(sql_cmd_status
);
1221 * Return list of primary key fields for the supplied relation,
1222 * or NULL if none exists.
1224 PG_FUNCTION_INFO_V1(dblink_get_pkey
);
1226 dblink_get_pkey(PG_FUNCTION_ARGS
)
1231 FuncCallContext
*funcctx
;
1234 AttInMetadata
*attinmeta
;
1235 MemoryContext oldcontext
;
1237 /* stuff done only on the first call of the function */
1238 if (SRF_IS_FIRSTCALL())
1240 TupleDesc tupdesc
= NULL
;
1242 /* create a function context for cross-call persistence */
1243 funcctx
= SRF_FIRSTCALL_INIT();
1246 * switch to memory context appropriate for multiple function calls
1248 oldcontext
= MemoryContextSwitchTo(funcctx
->multi_call_memory_ctx
);
1250 /* convert relname to rel Oid */
1251 relid
= get_relid_from_relname(PG_GETARG_TEXT_P(0));
1252 if (!OidIsValid(relid
))
1254 (errcode(ERRCODE_UNDEFINED_TABLE
),
1255 errmsg("relation \"%s\" does not exist",
1256 text_to_cstring(PG_GETARG_TEXT_PP(0)))));
1259 * need a tuple descriptor representing one INT and one TEXT column
1261 tupdesc
= CreateTemplateTupleDesc(2, false);
1262 TupleDescInitEntry(tupdesc
, (AttrNumber
) 1, "position",
1264 TupleDescInitEntry(tupdesc
, (AttrNumber
) 2, "colname",
1268 * Generate attribute metadata needed later to produce tuples from raw
1271 attinmeta
= TupleDescGetAttInMetadata(tupdesc
);
1272 funcctx
->attinmeta
= attinmeta
;
1274 /* get an array of attnums */
1275 results
= get_pkey_attnames(relid
, &numatts
);
1277 if ((results
!= NULL
) && (numatts
> 0))
1279 funcctx
->max_calls
= numatts
;
1281 /* got results, keep track of them */
1282 funcctx
->user_fctx
= results
;
1286 /* fast track when no results */
1287 MemoryContextSwitchTo(oldcontext
);
1288 SRF_RETURN_DONE(funcctx
);
1291 MemoryContextSwitchTo(oldcontext
);
1294 /* stuff done on every call of the function */
1295 funcctx
= SRF_PERCALL_SETUP();
1298 * initialize per-call variables
1300 call_cntr
= funcctx
->call_cntr
;
1301 max_calls
= funcctx
->max_calls
;
1303 results
= (char **) funcctx
->user_fctx
;
1304 attinmeta
= funcctx
->attinmeta
;
1306 if (call_cntr
< max_calls
) /* do when there is more left to send */
1312 values
= (char **) palloc(2 * sizeof(char *));
1313 values
[0] = (char *) palloc(12); /* sign, 10 digits, '\0' */
1315 sprintf(values
[0], "%d", call_cntr
+ 1);
1317 values
[1] = results
[call_cntr
];
1319 /* build the tuple */
1320 tuple
= BuildTupleFromCStrings(attinmeta
, values
);
1322 /* make the tuple into a datum */
1323 result
= HeapTupleGetDatum(tuple
);
1325 SRF_RETURN_NEXT(funcctx
, result
);
1329 /* do when there is no more left */
1330 SRF_RETURN_DONE(funcctx
);
1336 * dblink_build_sql_insert
1338 * Used to generate an SQL insert statement
1339 * based on an existing tuple in a local relation.
1340 * This is useful for selectively replicating data
1341 * to another server via dblink.
1344 * <relname> - name of local table of interest
1345 * <pkattnums> - an int2vector of attnums which will be used
1346 * to identify the local tuple of interest
1347 * <pknumatts> - number of attnums in pkattnums
1348 * <src_pkattvals_arry> - text array of key values which will be used
1349 * to identify the local tuple of interest
1350 * <tgt_pkattvals_arry> - text array of key values which will be used
1351 * to build the string for execution remotely. These are substituted
1352 * for their counterparts in src_pkattvals_arry
1354 PG_FUNCTION_INFO_V1(dblink_build_sql_insert
);
1356 dblink_build_sql_insert(PG_FUNCTION_ARGS
)
1358 text
*relname_text
= PG_GETARG_TEXT_P(0);
1359 int2vector
*pkattnums
= (int2vector
*) PG_GETARG_POINTER(1);
1360 int32 pknumatts_tmp
= PG_GETARG_INT32(2);
1361 ArrayType
*src_pkattvals_arry
= PG_GETARG_ARRAYTYPE_P(3);
1362 ArrayType
*tgt_pkattvals_arry
= PG_GETARG_ARRAYTYPE_P(4);
1364 int16 pknumatts
= 0;
1365 char **src_pkattvals
;
1366 char **tgt_pkattvals
;
1372 * Convert relname to rel OID.
1374 relid
= get_relid_from_relname(relname_text
);
1375 if (!OidIsValid(relid
))
1377 (errcode(ERRCODE_UNDEFINED_TABLE
),
1378 errmsg("relation \"%s\" does not exist",
1379 text_to_cstring(relname_text
))));
1382 * There should be at least one key attribute
1384 if (pknumatts_tmp
<= 0)
1386 (errcode(ERRCODE_INVALID_PARAMETER_VALUE
),
1387 errmsg("number of key attributes must be > 0")));
1389 if (pknumatts_tmp
<= SHRT_MAX
)
1390 pknumatts
= pknumatts_tmp
;
1393 (errcode(ERRCODE_INVALID_PARAMETER_VALUE
),
1394 errmsg("input for number of primary key " \
1395 "attributes too large")));
1398 * Source array is made up of key values that will be used to locate the
1399 * tuple of interest from the local system.
1401 src_pkattvals
= get_text_array_contents(src_pkattvals_arry
, &src_nitems
);
1404 * There should be one source array key value for each key attnum
1406 if (src_nitems
!= pknumatts
)
1408 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR
),
1409 errmsg("source key array length must match number of key " \
1413 * Target array is made up of key values that will be used to build the
1414 * SQL string for use on the remote system.
1416 tgt_pkattvals
= get_text_array_contents(tgt_pkattvals_arry
, &tgt_nitems
);
1419 * There should be one target array key value for each key attnum
1421 if (tgt_nitems
!= pknumatts
)
1423 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR
),
1424 errmsg("target key array length must match number of key " \
1428 * Prep work is finally done. Go get the SQL string.
1430 sql
= get_sql_insert(relid
, pkattnums
, pknumatts
, src_pkattvals
, tgt_pkattvals
);
1435 PG_RETURN_TEXT_P(cstring_to_text(sql
));
1440 * dblink_build_sql_delete
1442 * Used to generate an SQL delete statement.
1443 * This is useful for selectively replicating a
1444 * delete to another server via dblink.
1447 * <relname> - name of remote table of interest
1448 * <pkattnums> - an int2vector of attnums which will be used
1449 * to identify the remote tuple of interest
1450 * <pknumatts> - number of attnums in pkattnums
1451 * <tgt_pkattvals_arry> - text array of key values which will be used
1452 * to build the string for execution remotely.
1454 PG_FUNCTION_INFO_V1(dblink_build_sql_delete
);
1456 dblink_build_sql_delete(PG_FUNCTION_ARGS
)
1458 text
*relname_text
= PG_GETARG_TEXT_P(0);
1459 int2vector
*pkattnums
= (int2vector
*) PG_GETARG_POINTER(1);
1460 int32 pknumatts_tmp
= PG_GETARG_INT32(2);
1461 ArrayType
*tgt_pkattvals_arry
= PG_GETARG_ARRAYTYPE_P(3);
1463 int16 pknumatts
= 0;
1464 char **tgt_pkattvals
;
1469 * Convert relname to rel OID.
1471 relid
= get_relid_from_relname(relname_text
);
1472 if (!OidIsValid(relid
))
1474 (errcode(ERRCODE_UNDEFINED_TABLE
),
1475 errmsg("relation \"%s\" does not exist",
1476 text_to_cstring(relname_text
))));
1479 * There should be at least one key attribute
1481 if (pknumatts_tmp
<= 0)
1483 (errcode(ERRCODE_INVALID_PARAMETER_VALUE
),
1484 errmsg("number of key attributes must be > 0")));
1486 if (pknumatts_tmp
<= SHRT_MAX
)
1487 pknumatts
= pknumatts_tmp
;
1490 (errcode(ERRCODE_INVALID_PARAMETER_VALUE
),
1491 errmsg("input for number of primary key " \
1492 "attributes too large")));
1495 * Target array is made up of key values that will be used to build the
1496 * SQL string for use on the remote system.
1498 tgt_pkattvals
= get_text_array_contents(tgt_pkattvals_arry
, &tgt_nitems
);
1501 * There should be one target array key value for each key attnum
1503 if (tgt_nitems
!= pknumatts
)
1505 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR
),
1506 errmsg("target key array length must match number of key " \
1510 * Prep work is finally done. Go get the SQL string.
1512 sql
= get_sql_delete(relid
, pkattnums
, pknumatts
, tgt_pkattvals
);
1517 PG_RETURN_TEXT_P(cstring_to_text(sql
));
1522 * dblink_build_sql_update
1524 * Used to generate an SQL update statement
1525 * based on an existing tuple in a local relation.
1526 * This is useful for selectively replicating data
1527 * to another server via dblink.
1530 * <relname> - name of local table of interest
1531 * <pkattnums> - an int2vector of attnums which will be used
1532 * to identify the local tuple of interest
1533 * <pknumatts> - number of attnums in pkattnums
1534 * <src_pkattvals_arry> - text array of key values which will be used
1535 * to identify the local tuple of interest
1536 * <tgt_pkattvals_arry> - text array of key values which will be used
1537 * to build the string for execution remotely. These are substituted
1538 * for their counterparts in src_pkattvals_arry
1540 PG_FUNCTION_INFO_V1(dblink_build_sql_update
);
1542 dblink_build_sql_update(PG_FUNCTION_ARGS
)
1544 text
*relname_text
= PG_GETARG_TEXT_P(0);
1545 int2vector
*pkattnums
= (int2vector
*) PG_GETARG_POINTER(1);
1546 int32 pknumatts_tmp
= PG_GETARG_INT32(2);
1547 ArrayType
*src_pkattvals_arry
= PG_GETARG_ARRAYTYPE_P(3);
1548 ArrayType
*tgt_pkattvals_arry
= PG_GETARG_ARRAYTYPE_P(4);
1550 int16 pknumatts
= 0;
1551 char **src_pkattvals
;
1552 char **tgt_pkattvals
;
1558 * Convert relname to rel OID.
1560 relid
= get_relid_from_relname(relname_text
);
1561 if (!OidIsValid(relid
))
1563 (errcode(ERRCODE_UNDEFINED_TABLE
),
1564 errmsg("relation \"%s\" does not exist",
1565 text_to_cstring(relname_text
))));
1568 * There should be one source array key values for each key attnum
1570 if (pknumatts_tmp
<= 0)
1572 (errcode(ERRCODE_INVALID_PARAMETER_VALUE
),
1573 errmsg("number of key attributes must be > 0")));
1575 if (pknumatts_tmp
<= SHRT_MAX
)
1576 pknumatts
= pknumatts_tmp
;
1579 (errcode(ERRCODE_INVALID_PARAMETER_VALUE
),
1580 errmsg("input for number of primary key " \
1581 "attributes too large")));
1584 * Source array is made up of key values that will be used to locate the
1585 * tuple of interest from the local system.
1587 src_pkattvals
= get_text_array_contents(src_pkattvals_arry
, &src_nitems
);
1590 * There should be one source array key value for each key attnum
1592 if (src_nitems
!= pknumatts
)
1594 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR
),
1595 errmsg("source key array length must match number of key " \
1599 * Target array is made up of key values that will be used to build the
1600 * SQL string for use on the remote system.
1602 tgt_pkattvals
= get_text_array_contents(tgt_pkattvals_arry
, &tgt_nitems
);
1605 * There should be one target array key value for each key attnum
1607 if (tgt_nitems
!= pknumatts
)
1609 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR
),
1610 errmsg("target key array length must match number of key " \
1614 * Prep work is finally done. Go get the SQL string.
1616 sql
= get_sql_update(relid
, pkattnums
, pknumatts
, src_pkattvals
, tgt_pkattvals
);
1621 PG_RETURN_TEXT_P(cstring_to_text(sql
));
1625 * dblink_current_query
1626 * return the current query string
1627 * to allow its use in (among other things)
1630 PG_FUNCTION_INFO_V1(dblink_current_query
);
1632 dblink_current_query(PG_FUNCTION_ARGS
)
1634 /* This is now just an alias for the built-in function current_query() */
1635 PG_RETURN_DATUM(current_query(fcinfo
));
1638 /*************************************************************
1639 * internal functions
1646 * Get the primary key attnames for the given relation.
1647 * Return NULL, and set numatts = 0, if no primary key exists.
1650 get_pkey_attnames(Oid relid
, int16
*numatts
)
1652 Relation indexRelation
;
1655 HeapTuple indexTuple
;
1657 char **result
= NULL
;
1660 AclResult aclresult
;
1662 /* initialize numatts to 0 in case no primary key exists */
1665 /* open relation using relid, check permissions, get tupdesc */
1666 rel
= relation_open(relid
, AccessShareLock
);
1668 aclresult
= pg_class_aclcheck(RelationGetRelid(rel
), GetUserId(),
1670 if (aclresult
!= ACLCHECK_OK
)
1671 aclcheck_error(aclresult
, ACL_KIND_CLASS
,
1672 RelationGetRelationName(rel
));
1674 tupdesc
= rel
->rd_att
;
1676 /* Prepare to scan pg_index for entries having indrelid = this rel. */
1677 indexRelation
= heap_open(IndexRelationId
, AccessShareLock
);
1679 Anum_pg_index_indrelid
,
1680 BTEqualStrategyNumber
, F_OIDEQ
,
1681 ObjectIdGetDatum(relid
));
1683 scan
= systable_beginscan(indexRelation
, IndexIndrelidIndexId
, true,
1684 SnapshotNow
, 1, &skey
);
1686 while (HeapTupleIsValid(indexTuple
= systable_getnext(scan
)))
1688 Form_pg_index index
= (Form_pg_index
) GETSTRUCT(indexTuple
);
1690 /* we're only interested if it is the primary key */
1691 if (index
->indisprimary
)
1693 *numatts
= index
->indnatts
;
1696 result
= (char **) palloc(*numatts
* sizeof(char *));
1698 for (i
= 0; i
< *numatts
; i
++)
1699 result
[i
] = SPI_fname(tupdesc
, index
->indkey
.values
[i
]);
1705 systable_endscan(scan
);
1706 heap_close(indexRelation
, AccessShareLock
);
1707 relation_close(rel
, AccessShareLock
);
1713 * Deconstruct a text[] into C-strings (note any NULL elements will be
1714 * returned as NULL pointers)
1717 get_text_array_contents(ArrayType
*array
, int *numitems
)
1719 int ndim
= ARR_NDIM(array
);
1720 int *dims
= ARR_DIMS(array
);
1731 Assert(ARR_ELEMTYPE(array
) == TEXTOID
);
1733 *numitems
= nitems
= ArrayGetNItems(ndim
, dims
);
1735 get_typlenbyvalalign(ARR_ELEMTYPE(array
),
1736 &typlen
, &typbyval
, &typalign
);
1738 values
= (char **) palloc(nitems
* sizeof(char *));
1740 ptr
= ARR_DATA_PTR(array
);
1741 bitmap
= ARR_NULLBITMAP(array
);
1744 for (i
= 0; i
< nitems
; i
++)
1746 if (bitmap
&& (*bitmap
& bitmask
) == 0)
1752 values
[i
] = TextDatumGetCString(PointerGetDatum(ptr
));
1753 ptr
= att_addlength_pointer(ptr
, typlen
, ptr
);
1754 ptr
= (char *) att_align_nominal(ptr
, typalign
);
1757 /* advance bitmap pointer if any */
1761 if (bitmask
== 0x100)
1773 get_sql_insert(Oid relid
, int2vector
*pkattnums
, int16 pknumatts
, char **src_pkattvals
, char **tgt_pkattvals
)
1786 initStringInfo(&buf
);
1788 /* get relation name including any needed schema prefix and quoting */
1789 relname
= generate_relation_name(relid
);
1792 * Open relation using relid
1794 rel
= relation_open(relid
, AccessShareLock
);
1795 tupdesc
= rel
->rd_att
;
1796 natts
= tupdesc
->natts
;
1798 tuple
= get_tuple_of_interest(relid
, pkattnums
, pknumatts
, src_pkattvals
);
1801 (errcode(ERRCODE_CARDINALITY_VIOLATION
),
1802 errmsg("source row not found")));
1804 appendStringInfo(&buf
, "INSERT INTO %s(", relname
);
1807 for (i
= 0; i
< natts
; i
++)
1809 if (tupdesc
->attrs
[i
]->attisdropped
)
1813 appendStringInfo(&buf
, ",");
1815 appendStringInfoString(&buf
,
1816 quote_ident_cstr(NameStr(tupdesc
->attrs
[i
]->attname
)));
1820 appendStringInfo(&buf
, ") VALUES(");
1823 * remember attvals are 1 based
1826 for (i
= 0; i
< natts
; i
++)
1828 if (tupdesc
->attrs
[i
]->attisdropped
)
1832 appendStringInfo(&buf
, ",");
1834 if (tgt_pkattvals
!= NULL
)
1835 key
= get_attnum_pk_pos(pkattnums
, pknumatts
, i
+ 1);
1840 val
= tgt_pkattvals
[key
] ? pstrdup(tgt_pkattvals
[key
]) : NULL
;
1842 val
= SPI_getvalue(tuple
, tupdesc
, i
+ 1);
1846 appendStringInfoString(&buf
, quote_literal_cstr(val
));
1850 appendStringInfo(&buf
, "NULL");
1853 appendStringInfo(&buf
, ")");
1855 relation_close(rel
, AccessShareLock
);
1860 get_sql_delete(Oid relid
, int2vector
*pkattnums
, int16 pknumatts
, char **tgt_pkattvals
)
1869 initStringInfo(&buf
);
1871 /* get relation name including any needed schema prefix and quoting */
1872 relname
= generate_relation_name(relid
);
1875 * Open relation using relid
1877 rel
= relation_open(relid
, AccessShareLock
);
1878 tupdesc
= rel
->rd_att
;
1879 natts
= tupdesc
->natts
;
1881 appendStringInfo(&buf
, "DELETE FROM %s WHERE ", relname
);
1882 for (i
= 0; i
< pknumatts
; i
++)
1884 int16 pkattnum
= pkattnums
->values
[i
];
1887 appendStringInfo(&buf
, " AND ");
1889 appendStringInfoString(&buf
,
1890 quote_ident_cstr(NameStr(tupdesc
->attrs
[pkattnum
- 1]->attname
)));
1892 if (tgt_pkattvals
== NULL
)
1893 /* internal error */
1894 elog(ERROR
, "target key array must not be NULL");
1896 if (tgt_pkattvals
[i
] != NULL
)
1897 appendStringInfo(&buf
, " = %s",
1898 quote_literal_cstr(tgt_pkattvals
[i
]));
1900 appendStringInfo(&buf
, " IS NULL");
1903 relation_close(rel
, AccessShareLock
);
1908 get_sql_update(Oid relid
, int2vector
*pkattnums
, int16 pknumatts
, char **src_pkattvals
, char **tgt_pkattvals
)
1921 initStringInfo(&buf
);
1923 /* get relation name including any needed schema prefix and quoting */
1924 relname
= generate_relation_name(relid
);
1927 * Open relation using relid
1929 rel
= relation_open(relid
, AccessShareLock
);
1930 tupdesc
= rel
->rd_att
;
1931 natts
= tupdesc
->natts
;
1933 tuple
= get_tuple_of_interest(relid
, pkattnums
, pknumatts
, src_pkattvals
);
1936 (errcode(ERRCODE_CARDINALITY_VIOLATION
),
1937 errmsg("source row not found")));
1939 appendStringInfo(&buf
, "UPDATE %s SET ", relname
);
1942 for (i
= 0; i
< natts
; i
++)
1944 if (tupdesc
->attrs
[i
]->attisdropped
)
1948 appendStringInfo(&buf
, ", ");
1950 appendStringInfo(&buf
, "%s = ",
1951 quote_ident_cstr(NameStr(tupdesc
->attrs
[i
]->attname
)));
1953 if (tgt_pkattvals
!= NULL
)
1954 key
= get_attnum_pk_pos(pkattnums
, pknumatts
, i
+ 1);
1959 val
= tgt_pkattvals
[key
] ? pstrdup(tgt_pkattvals
[key
]) : NULL
;
1961 val
= SPI_getvalue(tuple
, tupdesc
, i
+ 1);
1965 appendStringInfoString(&buf
, quote_literal_cstr(val
));
1969 appendStringInfoString(&buf
, "NULL");
1973 appendStringInfo(&buf
, " WHERE ");
1975 for (i
= 0; i
< pknumatts
; i
++)
1977 int16 pkattnum
= pkattnums
->values
[i
];
1980 appendStringInfo(&buf
, " AND ");
1982 appendStringInfo(&buf
, "%s",
1983 quote_ident_cstr(NameStr(tupdesc
->attrs
[pkattnum
- 1]->attname
)));
1985 if (tgt_pkattvals
!= NULL
)
1986 val
= tgt_pkattvals
[i
] ? pstrdup(tgt_pkattvals
[i
]) : NULL
;
1988 val
= SPI_getvalue(tuple
, tupdesc
, pkattnum
);
1992 appendStringInfo(&buf
, " = %s", quote_literal_cstr(val
));
1996 appendStringInfo(&buf
, " IS NULL");
1999 relation_close(rel
, AccessShareLock
);
2004 * Return a properly quoted literal value.
2005 * Uses quote_literal in quote.c
2008 quote_literal_cstr(char *rawstr
)
2014 rawstr_text
= cstring_to_text(rawstr
);
2015 result_text
= DatumGetTextP(DirectFunctionCall1(quote_literal
,
2016 PointerGetDatum(rawstr_text
)));
2017 result
= text_to_cstring(result_text
);
2023 * Return a properly quoted identifier.
2024 * Uses quote_ident in quote.c
2027 quote_ident_cstr(char *rawstr
)
2033 rawstr_text
= cstring_to_text(rawstr
);
2034 result_text
= DatumGetTextP(DirectFunctionCall1(quote_ident
,
2035 PointerGetDatum(rawstr_text
)));
2036 result
= text_to_cstring(result_text
);
2042 get_attnum_pk_pos(int2vector
*pkattnums
, int16 pknumatts
, int16 key
)
2047 * Not likely a long list anyway, so just scan for the value
2049 for (i
= 0; i
< pknumatts
; i
++)
2050 if (key
== pkattnums
->values
[i
])
2057 get_tuple_of_interest(Oid relid
, int2vector
*pkattnums
, int16 pknumatts
, char **src_pkattvals
)
2067 initStringInfo(&buf
);
2069 /* get relation name including any needed schema prefix and quoting */
2070 relname
= generate_relation_name(relid
);
2073 * Open relation using relid
2075 rel
= relation_open(relid
, AccessShareLock
);
2076 tupdesc
= CreateTupleDescCopy(rel
->rd_att
);
2077 relation_close(rel
, AccessShareLock
);
2080 * Connect to SPI manager
2082 if ((ret
= SPI_connect()) < 0)
2083 /* internal error */
2084 elog(ERROR
, "SPI connect failure - returned %d", ret
);
2087 * Build sql statement to look up tuple of interest Use src_pkattvals as
2090 appendStringInfo(&buf
, "SELECT * FROM %s WHERE ", relname
);
2092 for (i
= 0; i
< pknumatts
; i
++)
2094 int16 pkattnum
= pkattnums
->values
[i
];
2097 appendStringInfo(&buf
, " AND ");
2099 appendStringInfoString(&buf
,
2100 quote_ident_cstr(NameStr(tupdesc
->attrs
[pkattnum
- 1]->attname
)));
2102 if (src_pkattvals
[i
] != NULL
)
2103 appendStringInfo(&buf
, " = %s",
2104 quote_literal_cstr(src_pkattvals
[i
]));
2106 appendStringInfo(&buf
, " IS NULL");
2110 * Retrieve the desired tuple
2112 ret
= SPI_exec(buf
.data
, 0);
2116 * Only allow one qualifying tuple
2118 if ((ret
== SPI_OK_SELECT
) && (SPI_processed
> 1))
2120 (errcode(ERRCODE_CARDINALITY_VIOLATION
),
2121 errmsg("source criteria matched more than one record")));
2123 else if (ret
== SPI_OK_SELECT
&& SPI_processed
== 1)
2125 SPITupleTable
*tuptable
= SPI_tuptable
;
2127 tuple
= SPI_copytuple(tuptable
->vals
[0]);
2135 * no qualifying tuples
2143 * never reached, but keep compiler quiet
2149 get_relid_from_relname(text
*relname_text
)
2155 relvar
= makeRangeVarFromNameList(textToQualifiedNameList(relname_text
));
2156 rel
= heap_openrv(relvar
, AccessShareLock
);
2157 relid
= RelationGetRelid(rel
);
2158 relation_close(rel
, AccessShareLock
);
2164 * generate_relation_name - copied from ruleutils.c
2165 * Compute the name to display for a relation specified by OID
2167 * The result includes all necessary quoting and schema-prefixing.
2170 generate_relation_name(Oid relid
)
2173 Form_pg_class reltup
;
2177 tp
= SearchSysCache(RELOID
,
2178 ObjectIdGetDatum(relid
),
2180 if (!HeapTupleIsValid(tp
))
2181 elog(ERROR
, "cache lookup failed for relation %u", relid
);
2183 reltup
= (Form_pg_class
) GETSTRUCT(tp
);
2185 /* Qualify the name if not visible in search path */
2186 if (RelationIsVisible(relid
))
2189 nspname
= get_namespace_name(reltup
->relnamespace
);
2191 result
= quote_qualified_identifier(nspname
, NameStr(reltup
->relname
));
2193 ReleaseSysCache(tp
);
2200 getConnectionByName(const char *name
)
2202 remoteConnHashEnt
*hentry
;
2203 char key
[NAMEDATALEN
];
2205 if (!remoteConnHash
)
2206 remoteConnHash
= createConnHash();
2208 MemSet(key
, 0, NAMEDATALEN
);
2209 snprintf(key
, NAMEDATALEN
- 1, "%s", name
);
2210 hentry
= (remoteConnHashEnt
*) hash_search(remoteConnHash
,
2211 key
, HASH_FIND
, NULL
);
2214 return (hentry
->rconn
);
2220 createConnHash(void)
2224 ctl
.keysize
= NAMEDATALEN
;
2225 ctl
.entrysize
= sizeof(remoteConnHashEnt
);
2227 return hash_create("Remote Con hash", NUMCONN
, &ctl
, HASH_ELEM
);
2231 createNewConnection(const char *name
, remoteConn
*rconn
)
2233 remoteConnHashEnt
*hentry
;
2235 char key
[NAMEDATALEN
];
2237 if (!remoteConnHash
)
2238 remoteConnHash
= createConnHash();
2240 MemSet(key
, 0, NAMEDATALEN
);
2241 snprintf(key
, NAMEDATALEN
- 1, "%s", name
);
2242 hentry
= (remoteConnHashEnt
*) hash_search(remoteConnHash
, key
,
2243 HASH_ENTER
, &found
);
2247 (errcode(ERRCODE_DUPLICATE_OBJECT
),
2248 errmsg("duplicate connection name")));
2250 hentry
->rconn
= rconn
;
2251 strlcpy(hentry
->name
, name
, sizeof(hentry
->name
));
2255 deleteConnection(const char *name
)
2257 remoteConnHashEnt
*hentry
;
2259 char key
[NAMEDATALEN
];
2261 if (!remoteConnHash
)
2262 remoteConnHash
= createConnHash();
2264 MemSet(key
, 0, NAMEDATALEN
);
2265 snprintf(key
, NAMEDATALEN
- 1, "%s", name
);
2267 hentry
= (remoteConnHashEnt
*) hash_search(remoteConnHash
,
2268 key
, HASH_REMOVE
, &found
);
2272 (errcode(ERRCODE_UNDEFINED_OBJECT
),
2273 errmsg("undefined connection name")));
2278 dblink_security_check(PGconn
*conn
, remoteConn
*rconn
)
2282 if (!PQconnectionUsedPassword(conn
))
2289 (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED
),
2290 errmsg("password is required"),
2291 errdetail("Non-superuser cannot connect if the server does not request a password."),
2292 errhint("Target server's authentication method must be changed.")));
2298 * For non-superusers, insist that the connstr specify a password. This
2299 * prevents a password from being picked up from .pgpass, a service file,
2300 * the environment, etc. We don't want the postgres user's passwords
2301 * to be accessible to non-superusers.
2304 dblink_connstr_check(const char *connstr
)
2308 PQconninfoOption
*options
;
2309 PQconninfoOption
*option
;
2310 bool connstr_gives_password
= false;
2312 options
= PQconninfoParse(connstr
, NULL
);
2315 for (option
= options
; option
->keyword
!= NULL
; option
++)
2317 if (strcmp(option
->keyword
, "password") == 0)
2319 if (option
->val
!= NULL
&& option
->val
[0] != '\0')
2321 connstr_gives_password
= true;
2326 PQconninfoFree(options
);
2329 if (!connstr_gives_password
)
2331 (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED
),
2332 errmsg("password is required"),
2333 errdetail("Non-superusers must provide a password in the connection string.")));
2338 dblink_res_error(const char *conname
, PGresult
*res
, const char *dblink_context_msg
, bool fail
)
2341 char *pg_diag_sqlstate
= PQresultErrorField(res
, PG_DIAG_SQLSTATE
);
2342 char *pg_diag_message_primary
= PQresultErrorField(res
, PG_DIAG_MESSAGE_PRIMARY
);
2343 char *pg_diag_message_detail
= PQresultErrorField(res
, PG_DIAG_MESSAGE_DETAIL
);
2344 char *pg_diag_message_hint
= PQresultErrorField(res
, PG_DIAG_MESSAGE_HINT
);
2345 char *pg_diag_context
= PQresultErrorField(res
, PG_DIAG_CONTEXT
);
2347 char *message_primary
;
2348 char *message_detail
;
2350 char *message_context
;
2351 const char *dblink_context_conname
= "unnamed";
2358 if (pg_diag_sqlstate
)
2359 sqlstate
= MAKE_SQLSTATE(pg_diag_sqlstate
[0],
2360 pg_diag_sqlstate
[1],
2361 pg_diag_sqlstate
[2],
2362 pg_diag_sqlstate
[3],
2363 pg_diag_sqlstate
[4]);
2365 sqlstate
= ERRCODE_CONNECTION_FAILURE
;
2367 xpstrdup(message_primary
, pg_diag_message_primary
);
2368 xpstrdup(message_detail
, pg_diag_message_detail
);
2369 xpstrdup(message_hint
, pg_diag_message_hint
);
2370 xpstrdup(message_context
, pg_diag_context
);
2376 dblink_context_conname
= conname
;
2380 message_primary
? errmsg("%s", message_primary
) : errmsg("unknown error"),
2381 message_detail
? errdetail("%s", message_detail
) : 0,
2382 message_hint
? errhint("%s", message_hint
) : 0,
2383 message_context
? errcontext("%s", message_context
) : 0,
2384 errcontext("Error occurred on dblink connection named \"%s\": %s.",
2385 dblink_context_conname
, dblink_context_msg
)));
2389 * Obtain connection string for a foreign server
2392 get_connect_string(const char *servername
)
2394 ForeignServer
*foreign_server
= NULL
;
2395 UserMapping
*user_mapping
;
2397 StringInfo buf
= makeStringInfo();
2398 ForeignDataWrapper
*fdw
;
2399 AclResult aclresult
;
2401 /* first gather the server connstr options */
2402 if (strlen(servername
) < NAMEDATALEN
)
2403 foreign_server
= GetForeignServerByName(servername
, true);
2407 Oid serverid
= foreign_server
->serverid
;
2408 Oid fdwid
= foreign_server
->fdwid
;
2409 Oid userid
= GetUserId();
2411 user_mapping
= GetUserMapping(userid
, serverid
);
2412 fdw
= GetForeignDataWrapper(fdwid
);
2414 /* Check permissions, user must have usage on the server. */
2415 aclresult
= pg_foreign_server_aclcheck(serverid
, userid
, ACL_USAGE
);
2416 if (aclresult
!= ACLCHECK_OK
)
2417 aclcheck_error(aclresult
, ACL_KIND_FOREIGN_SERVER
, foreign_server
->servername
);
2419 foreach(cell
, fdw
->options
)
2421 DefElem
*def
= lfirst(cell
);
2423 appendStringInfo(buf
, "%s='%s' ", def
->defname
,
2424 escape_param_str(strVal(def
->arg
)));
2427 foreach(cell
, foreign_server
->options
)
2429 DefElem
*def
= lfirst(cell
);
2431 appendStringInfo(buf
, "%s='%s' ", def
->defname
,
2432 escape_param_str(strVal(def
->arg
)));
2435 foreach(cell
, user_mapping
->options
)
2438 DefElem
*def
= lfirst(cell
);
2440 appendStringInfo(buf
, "%s='%s' ", def
->defname
,
2441 escape_param_str(strVal(def
->arg
)));
2451 * Escaping libpq connect parameter strings.
2453 * Replaces "'" with "\'" and "\" with "\\".
2456 escape_param_str(const char *str
)
2459 StringInfo buf
= makeStringInfo();
2461 for (cp
= str
; *cp
; cp
++)
2463 if (*cp
== '\\' || *cp
== '\'')
2464 appendStringInfoChar(buf
, '\\');
2465 appendStringInfoChar(buf
, *cp
);