4 * Functions returning results from a remote database
6 * Joe Conway <mail@joeconway.com>
8 * Darko Prenosil <Darko.Prenosil@finteh.hr>
9 * Shridhar Daithankar <shridhar_daithankar@persistent.co.in>
12 * Copyright (c) 2001-2008, PostgreSQL Global Development Group
13 * ALL RIGHTS RESERVED;
15 * Permission to use, copy, modify, and distribute this software and its
16 * documentation for any purpose, without fee, and without a written agreement
17 * is hereby granted, provided that the above copyright notice and this
18 * paragraph and the following two paragraphs appear in all copies.
20 * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
21 * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
22 * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
23 * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
24 * POSSIBILITY OF SUCH DAMAGE.
26 * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
27 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
28 * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
29 * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
30 * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
40 #include "access/genam.h"
41 #include "access/heapam.h"
42 #include "access/tupdesc.h"
43 #include "catalog/indexing.h"
44 #include "catalog/namespace.h"
45 #include "catalog/pg_index.h"
46 #include "catalog/pg_type.h"
47 #include "executor/executor.h"
48 #include "executor/spi.h"
49 #include "lib/stringinfo.h"
50 #include "miscadmin.h"
51 #include "nodes/execnodes.h"
52 #include "nodes/nodes.h"
53 #include "nodes/pg_list.h"
54 #include "parser/parse_type.h"
55 #include "utils/acl.h"
56 #include "utils/array.h"
57 #include "utils/builtins.h"
58 #include "utils/dynahash.h"
59 #include "utils/fmgroids.h"
60 #include "utils/hsearch.h"
61 #include "utils/lsyscache.h"
62 #include "utils/memutils.h"
63 #include "utils/syscache.h"
64 #include "utils/tqual.h"
70 typedef struct remoteConn
72 PGconn
*conn
; /* Hold the remote connection */
73 int openCursorCount
; /* The number of open cursors */
74 bool newXactForCursor
; /* Opened a transaction for a cursor */
78 * Internal declarations
80 static Datum
dblink_record_internal(FunctionCallInfo fcinfo
, bool is_async
, bool do_get
);
81 static remoteConn
*getConnectionByName(const char *name
);
82 static HTAB
*createConnHash(void);
83 static void createNewConnection(const char *name
, remoteConn
* rconn
);
84 static void deleteConnection(const char *name
);
85 static char **get_pkey_attnames(Oid relid
, int16
*numatts
);
86 static char **get_text_array_contents(ArrayType
*array
, int *numitems
);
87 static char *get_sql_insert(Oid relid
, int2vector
*pkattnums
, int16 pknumatts
, char **src_pkattvals
, char **tgt_pkattvals
);
88 static char *get_sql_delete(Oid relid
, int2vector
*pkattnums
, int16 pknumatts
, char **tgt_pkattvals
);
89 static char *get_sql_update(Oid relid
, int2vector
*pkattnums
, int16 pknumatts
, char **src_pkattvals
, char **tgt_pkattvals
);
90 static char *quote_literal_cstr(char *rawstr
);
91 static char *quote_ident_cstr(char *rawstr
);
92 static int16
get_attnum_pk_pos(int2vector
*pkattnums
, int16 pknumatts
, int16 key
);
93 static HeapTuple
get_tuple_of_interest(Oid relid
, int2vector
*pkattnums
, int16 pknumatts
, char **src_pkattvals
);
94 static Oid
get_relid_from_relname(text
*relname_text
);
95 static char *generate_relation_name(Oid relid
);
96 static void dblink_connstr_check(const char *connstr
);
97 static void dblink_security_check(PGconn
*conn
, remoteConn
*rconn
);
98 static void dblink_res_error(const char *conname
, PGresult
*res
, const char *dblink_context_msg
, bool fail
);
101 static remoteConn
*pconn
= NULL
;
102 static HTAB
*remoteConnHash
= NULL
;
105 * Following is list that holds multiple remote connections.
106 * Calling convention of each dblink function changes to accept
107 * connection name as the first parameter. The connection list is
108 * much like ecpg e.g. a mapping between a name and a PGconn object.
111 typedef struct remoteConnHashEnt
113 char name
[NAMEDATALEN
];
/* initial number of connection hashes */
#define NUMCONN 16

/* general utility */

/* pfree() a pointer if it is set, then reset it to NULL */
#define xpfree(var_) \
	do { \
		if (var_ != NULL) \
		{ \
			pfree(var_); \
			var_ = NULL; \
		} \
	} while (0)

/* NULL-tolerant pstrdup(): var_c receives a copy of var_, or NULL */
#define xpstrdup(var_c, var_) \
	do { \
		if (var_ != NULL) \
			var_c = pstrdup(var_); \
		else \
			var_c = NULL; \
	} while (0)

/*
 * Raise an internal error built from the connection's current error
 * message.  Expects locals "msg", "conn" and "res" in the calling function.
 */
#define DBLINK_RES_INTERNALERROR(p2) \
	do { \
			msg = pstrdup(PQerrorMessage(conn)); \
			if (res) \
				PQclear(res); \
			elog(ERROR, "%s: %s", p2, msg); \
	} while (0)

/*
 * Report that the requested connection does not exist.  Expects a local
 * "conname" (may be NULL for the unnamed connection).
 */
#define DBLINK_CONN_NOT_AVAIL \
	do { \
		if(conname) \
			ereport(ERROR, \
					(errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST), \
					 errmsg("connection \"%s\" not available", conname))); \
		else \
			ereport(ERROR, \
					(errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST), \
					 errmsg("connection not available"))); \
	} while (0)

/*
 * Resolve argument 0 either as the name of an existing connection or, if no
 * such connection exists, as a connection string for a temporary connection.
 * In the latter case "freeconn" is set so the caller closes it afterwards.
 * Expects locals "rconn", "conn", "connstr", "msg" and "freeconn".
 */
#define DBLINK_GET_CONN \
	do { \
			char *conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(0)); \
			rconn = getConnectionByName(conname_or_str); \
			if(rconn) \
			{ \
				conn = rconn->conn; \
			} \
			else \
			{ \
				connstr = conname_or_str; \
				dblink_connstr_check(connstr); \
				conn = PQconnectdb(connstr); \
				if (PQstatus(conn) == CONNECTION_BAD) \
				{ \
					msg = pstrdup(PQerrorMessage(conn)); \
					PQfinish(conn); \
					ereport(ERROR, \
							(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION), \
							 errmsg("could not establish connection"), \
							 errdetail("%s", msg))); \
				} \
				dblink_security_check(conn, rconn); \
				freeconn = true; \
			} \
	} while (0)

/*
 * Resolve argument 0 strictly as the name of an existing connection; raise
 * an error otherwise.  Expects locals "rconn" and "conn".
 */
#define DBLINK_GET_NAMED_CONN \
	do { \
			char *conname = text_to_cstring(PG_GETARG_TEXT_PP(0)); \
			rconn = getConnectionByName(conname); \
			if(rconn) \
				conn = rconn->conn; \
			else \
				DBLINK_CONN_NOT_AVAIL; \
	} while (0)

/* Lazily create the state record of the unnamed connection */
#define DBLINK_INIT \
	do { \
			if (!pconn) \
			{ \
				pconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn)); \
				pconn->conn = NULL; \
				pconn->openCursorCount = 0; \
				pconn->newXactForCursor = FALSE; \
			} \
	} while (0)
207 * Create a persistent connection to another database
209 PG_FUNCTION_INFO_V1(dblink_connect
);
211 dblink_connect(PG_FUNCTION_ARGS
)
213 char *connstr
= NULL
;
214 char *connname
= NULL
;
216 MemoryContext oldcontext
;
218 remoteConn
*rconn
= NULL
;
224 connstr
= text_to_cstring(PG_GETARG_TEXT_PP(1));
225 connname
= text_to_cstring(PG_GETARG_TEXT_PP(0));
227 else if (PG_NARGS() == 1)
228 connstr
= text_to_cstring(PG_GETARG_TEXT_PP(0));
230 oldcontext
= MemoryContextSwitchTo(TopMemoryContext
);
233 rconn
= (remoteConn
*) palloc(sizeof(remoteConn
));
235 /* check password in connection string if not superuser */
236 dblink_connstr_check(connstr
);
237 conn
= PQconnectdb(connstr
);
239 MemoryContextSwitchTo(oldcontext
);
241 if (PQstatus(conn
) == CONNECTION_BAD
)
243 msg
= pstrdup(PQerrorMessage(conn
));
249 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION
),
250 errmsg("could not establish connection"),
251 errdetail("%s", msg
)));
254 /* check password actually used if not superuser */
255 dblink_security_check(conn
, rconn
);
260 createNewConnection(connname
, rconn
);
265 PG_RETURN_TEXT_P(cstring_to_text("OK"));
269 * Clear a persistent connection to another database
271 PG_FUNCTION_INFO_V1(dblink_disconnect
);
273 dblink_disconnect(PG_FUNCTION_ARGS
)
275 char *conname
= NULL
;
276 remoteConn
*rconn
= NULL
;
283 conname
= text_to_cstring(PG_GETARG_TEXT_PP(0));
284 rconn
= getConnectionByName(conname
);
292 DBLINK_CONN_NOT_AVAIL
;
297 deleteConnection(conname
);
303 PG_RETURN_TEXT_P(cstring_to_text("OK"));
307 * opens a cursor using a persistent connection
309 PG_FUNCTION_INFO_V1(dblink_open
);
311 dblink_open(PG_FUNCTION_ARGS
)
314 PGresult
*res
= NULL
;
316 char *curname
= NULL
;
318 char *conname
= NULL
;
320 remoteConn
*rconn
= NULL
;
321 bool fail
= true; /* default to backward compatible behavior */
324 initStringInfo(&buf
);
329 curname
= text_to_cstring(PG_GETARG_TEXT_PP(0));
330 sql
= text_to_cstring(PG_GETARG_TEXT_PP(1));
333 else if (PG_NARGS() == 3)
335 /* might be text,text,text or text,text,bool */
336 if (get_fn_expr_argtype(fcinfo
->flinfo
, 2) == BOOLOID
)
338 curname
= text_to_cstring(PG_GETARG_TEXT_PP(0));
339 sql
= text_to_cstring(PG_GETARG_TEXT_PP(1));
340 fail
= PG_GETARG_BOOL(2);
345 conname
= text_to_cstring(PG_GETARG_TEXT_PP(0));
346 curname
= text_to_cstring(PG_GETARG_TEXT_PP(1));
347 sql
= text_to_cstring(PG_GETARG_TEXT_PP(2));
348 rconn
= getConnectionByName(conname
);
351 else if (PG_NARGS() == 4)
353 /* text,text,text,bool */
354 conname
= text_to_cstring(PG_GETARG_TEXT_PP(0));
355 curname
= text_to_cstring(PG_GETARG_TEXT_PP(1));
356 sql
= text_to_cstring(PG_GETARG_TEXT_PP(2));
357 fail
= PG_GETARG_BOOL(3);
358 rconn
= getConnectionByName(conname
);
361 if (!rconn
|| !rconn
->conn
)
362 DBLINK_CONN_NOT_AVAIL
;
366 /* If we are not in a transaction, start one */
367 if (PQtransactionStatus(conn
) == PQTRANS_IDLE
)
369 res
= PQexec(conn
, "BEGIN");
370 if (PQresultStatus(res
) != PGRES_COMMAND_OK
)
371 DBLINK_RES_INTERNALERROR("begin error");
373 rconn
->newXactForCursor
= TRUE
;
376 * Since transaction state was IDLE, we force cursor count to
377 * initially be 0. This is needed as a previous ABORT might have wiped
378 * out our transaction without maintaining the cursor count for us.
380 rconn
->openCursorCount
= 0;
383 /* if we started a transaction, increment cursor count */
384 if (rconn
->newXactForCursor
)
385 (rconn
->openCursorCount
)++;
387 appendStringInfo(&buf
, "DECLARE %s CURSOR FOR %s", curname
, sql
);
388 res
= PQexec(conn
, buf
.data
);
389 if (!res
|| PQresultStatus(res
) != PGRES_COMMAND_OK
)
391 dblink_res_error(conname
, res
, "could not open cursor", fail
);
392 PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
396 PG_RETURN_TEXT_P(cstring_to_text("OK"));
402 PG_FUNCTION_INFO_V1(dblink_close
);
404 dblink_close(PG_FUNCTION_ARGS
)
407 PGresult
*res
= NULL
;
408 char *curname
= NULL
;
409 char *conname
= NULL
;
412 remoteConn
*rconn
= NULL
;
413 bool fail
= true; /* default to backward compatible behavior */
416 initStringInfo(&buf
);
421 curname
= text_to_cstring(PG_GETARG_TEXT_PP(0));
424 else if (PG_NARGS() == 2)
426 /* might be text,text or text,bool */
427 if (get_fn_expr_argtype(fcinfo
->flinfo
, 1) == BOOLOID
)
429 curname
= text_to_cstring(PG_GETARG_TEXT_PP(0));
430 fail
= PG_GETARG_BOOL(1);
435 conname
= text_to_cstring(PG_GETARG_TEXT_PP(0));
436 curname
= text_to_cstring(PG_GETARG_TEXT_PP(1));
437 rconn
= getConnectionByName(conname
);
443 conname
= text_to_cstring(PG_GETARG_TEXT_PP(0));
444 curname
= text_to_cstring(PG_GETARG_TEXT_PP(1));
445 fail
= PG_GETARG_BOOL(2);
446 rconn
= getConnectionByName(conname
);
449 if (!rconn
|| !rconn
->conn
)
450 DBLINK_CONN_NOT_AVAIL
;
454 appendStringInfo(&buf
, "CLOSE %s", curname
);
456 /* close the cursor */
457 res
= PQexec(conn
, buf
.data
);
458 if (!res
|| PQresultStatus(res
) != PGRES_COMMAND_OK
)
460 dblink_res_error(conname
, res
, "could not close cursor", fail
);
461 PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
466 /* if we started a transaction, decrement cursor count */
467 if (rconn
->newXactForCursor
)
469 (rconn
->openCursorCount
)--;
471 /* if count is zero, commit the transaction */
472 if (rconn
->openCursorCount
== 0)
474 rconn
->newXactForCursor
= FALSE
;
476 res
= PQexec(conn
, "COMMIT");
477 if (PQresultStatus(res
) != PGRES_COMMAND_OK
)
478 DBLINK_RES_INTERNALERROR("commit error");
483 PG_RETURN_TEXT_P(cstring_to_text("OK"));
487 * Fetch results from an open cursor
489 PG_FUNCTION_INFO_V1(dblink_fetch
);
491 dblink_fetch(PG_FUNCTION_ARGS
)
493 FuncCallContext
*funcctx
;
494 TupleDesc tupdesc
= NULL
;
497 AttInMetadata
*attinmeta
;
498 PGresult
*res
= NULL
;
499 MemoryContext oldcontext
;
500 char *conname
= NULL
;
501 remoteConn
*rconn
= NULL
;
505 /* stuff done only on the first call of the function */
506 if (SRF_IS_FIRSTCALL())
510 char *curname
= NULL
;
512 bool fail
= true; /* default to backward compatible */
516 /* text,text,int,bool */
517 conname
= text_to_cstring(PG_GETARG_TEXT_PP(0));
518 curname
= text_to_cstring(PG_GETARG_TEXT_PP(1));
519 howmany
= PG_GETARG_INT32(2);
520 fail
= PG_GETARG_BOOL(3);
522 rconn
= getConnectionByName(conname
);
526 else if (PG_NARGS() == 3)
528 /* text,text,int or text,int,bool */
529 if (get_fn_expr_argtype(fcinfo
->flinfo
, 2) == BOOLOID
)
531 curname
= text_to_cstring(PG_GETARG_TEXT_PP(0));
532 howmany
= PG_GETARG_INT32(1);
533 fail
= PG_GETARG_BOOL(2);
538 conname
= text_to_cstring(PG_GETARG_TEXT_PP(0));
539 curname
= text_to_cstring(PG_GETARG_TEXT_PP(1));
540 howmany
= PG_GETARG_INT32(2);
542 rconn
= getConnectionByName(conname
);
547 else if (PG_NARGS() == 2)
550 curname
= text_to_cstring(PG_GETARG_TEXT_PP(0));
551 howmany
= PG_GETARG_INT32(1);
556 DBLINK_CONN_NOT_AVAIL
;
558 initStringInfo(&buf
);
559 appendStringInfo(&buf
, "FETCH %d FROM %s", howmany
, curname
);
561 /* create a function context for cross-call persistence */
562 funcctx
= SRF_FIRSTCALL_INIT();
565 * switch to memory context appropriate for multiple function calls
567 oldcontext
= MemoryContextSwitchTo(funcctx
->multi_call_memory_ctx
);
569 res
= PQexec(conn
, buf
.data
);
571 (PQresultStatus(res
) != PGRES_COMMAND_OK
&&
572 PQresultStatus(res
) != PGRES_TUPLES_OK
))
574 dblink_res_error(conname
, res
, "could not fetch from cursor", fail
);
575 SRF_RETURN_DONE(funcctx
);
577 else if (PQresultStatus(res
) == PGRES_COMMAND_OK
)
579 /* cursor does not exist - closed already or bad name */
582 (errcode(ERRCODE_INVALID_CURSOR_NAME
),
583 errmsg("cursor \"%s\" does not exist", curname
)));
586 funcctx
->max_calls
= PQntuples(res
);
588 /* got results, keep track of them */
589 funcctx
->user_fctx
= res
;
591 /* get a tuple descriptor for our result type */
592 switch (get_call_result_type(fcinfo
, NULL
, &tupdesc
))
594 case TYPEFUNC_COMPOSITE
:
597 case TYPEFUNC_RECORD
:
598 /* failed to determine actual type of RECORD */
600 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED
),
601 errmsg("function returning record called in context "
602 "that cannot accept type record")));
605 /* result type isn't composite */
606 elog(ERROR
, "return type must be a row type");
610 /* make sure we have a persistent copy of the tupdesc */
611 tupdesc
= CreateTupleDescCopy(tupdesc
);
613 /* check result and tuple descriptor have the same number of columns */
614 if (PQnfields(res
) != tupdesc
->natts
)
616 (errcode(ERRCODE_DATATYPE_MISMATCH
),
617 errmsg("remote query result rowtype does not match "
618 "the specified FROM clause rowtype")));
620 /* fast track when no results */
621 if (funcctx
->max_calls
< 1)
625 SRF_RETURN_DONE(funcctx
);
628 /* store needed metadata for subsequent calls */
629 attinmeta
= TupleDescGetAttInMetadata(tupdesc
);
630 funcctx
->attinmeta
= attinmeta
;
632 MemoryContextSwitchTo(oldcontext
);
635 /* stuff done on every call of the function */
636 funcctx
= SRF_PERCALL_SETUP();
639 * initialize per-call variables
641 call_cntr
= funcctx
->call_cntr
;
642 max_calls
= funcctx
->max_calls
;
644 res
= (PGresult
*) funcctx
->user_fctx
;
645 attinmeta
= funcctx
->attinmeta
;
646 tupdesc
= attinmeta
->tupdesc
;
648 if (call_cntr
< max_calls
) /* do when there is more left to send */
654 int nfields
= PQnfields(res
);
656 values
= (char **) palloc(nfields
* sizeof(char *));
657 for (i
= 0; i
< nfields
; i
++)
659 if (PQgetisnull(res
, call_cntr
, i
) == 0)
660 values
[i
] = PQgetvalue(res
, call_cntr
, i
);
665 /* build the tuple */
666 tuple
= BuildTupleFromCStrings(attinmeta
, values
);
668 /* make the tuple into a datum */
669 result
= HeapTupleGetDatum(tuple
);
671 SRF_RETURN_NEXT(funcctx
, result
);
675 /* do when there is no more left */
677 SRF_RETURN_DONE(funcctx
);
682 * Note: this is the new preferred version of dblink
684 PG_FUNCTION_INFO_V1(dblink_record
);
686 dblink_record(PG_FUNCTION_ARGS
)
688 return dblink_record_internal(fcinfo
, false, false);
691 PG_FUNCTION_INFO_V1(dblink_send_query
);
693 dblink_send_query(PG_FUNCTION_ARGS
)
695 return dblink_record_internal(fcinfo
, true, false);
698 PG_FUNCTION_INFO_V1(dblink_get_result
);
700 dblink_get_result(PG_FUNCTION_ARGS
)
702 return dblink_record_internal(fcinfo
, true, true);
706 dblink_record_internal(FunctionCallInfo fcinfo
, bool is_async
, bool do_get
)
708 FuncCallContext
*funcctx
;
709 TupleDesc tupdesc
= NULL
;
712 AttInMetadata
*attinmeta
;
714 PGresult
*res
= NULL
;
715 bool is_sql_cmd
= false;
716 char *sql_cmd_status
= NULL
;
717 MemoryContext oldcontext
;
718 bool freeconn
= false;
722 /* stuff done only on the first call of the function */
723 if (SRF_IS_FIRSTCALL())
726 char *connstr
= NULL
;
728 char *conname
= NULL
;
729 remoteConn
*rconn
= NULL
;
730 bool fail
= true; /* default to backward compatible */
732 /* create a function context for cross-call persistence */
733 funcctx
= SRF_FIRSTCALL_INIT();
736 * switch to memory context appropriate for multiple function calls
738 oldcontext
= MemoryContextSwitchTo(funcctx
->multi_call_memory_ctx
);
746 sql
= text_to_cstring(PG_GETARG_TEXT_PP(1));
747 fail
= PG_GETARG_BOOL(2);
749 else if (PG_NARGS() == 2)
751 /* text,text or text,bool */
752 if (get_fn_expr_argtype(fcinfo
->flinfo
, 1) == BOOLOID
)
755 sql
= text_to_cstring(PG_GETARG_TEXT_PP(0));
756 fail
= PG_GETARG_BOOL(1);
761 sql
= text_to_cstring(PG_GETARG_TEXT_PP(1));
764 else if (PG_NARGS() == 1)
768 sql
= text_to_cstring(PG_GETARG_TEXT_PP(0));
771 /* shouldn't happen */
772 elog(ERROR
, "wrong number of arguments");
774 else if (is_async
&& do_get
)
776 /* get async result */
781 fail
= PG_GETARG_BOOL(2);
783 else if (PG_NARGS() == 1)
789 /* shouldn't happen */
790 elog(ERROR
, "wrong number of arguments");
794 /* send async query */
798 sql
= text_to_cstring(PG_GETARG_TEXT_PP(1));
801 /* shouldn't happen */
802 elog(ERROR
, "wrong number of arguments");
806 DBLINK_CONN_NOT_AVAIL
;
808 if (!is_async
|| (is_async
&& do_get
))
810 /* synchronous query, or async result retrieval */
812 res
= PQexec(conn
, sql
);
815 res
= PQgetResult(conn
);
816 /* NULL means we're all done with the async results */
818 SRF_RETURN_DONE(funcctx
);
822 (PQresultStatus(res
) != PGRES_COMMAND_OK
&&
823 PQresultStatus(res
) != PGRES_TUPLES_OK
))
825 dblink_res_error(conname
, res
, "could not execute query", fail
);
828 SRF_RETURN_DONE(funcctx
);
831 if (PQresultStatus(res
) == PGRES_COMMAND_OK
)
835 /* need a tuple descriptor representing one TEXT column */
836 tupdesc
= CreateTemplateTupleDesc(1, false);
837 TupleDescInitEntry(tupdesc
, (AttrNumber
) 1, "status",
841 * and save a copy of the command status string to return as
844 sql_cmd_status
= PQcmdStatus(res
);
845 funcctx
->max_calls
= 1;
848 funcctx
->max_calls
= PQntuples(res
);
850 /* got results, keep track of them */
851 funcctx
->user_fctx
= res
;
853 /* if needed, close the connection to the database and cleanup */
859 /* get a tuple descriptor for our result type */
860 switch (get_call_result_type(fcinfo
, NULL
, &tupdesc
))
862 case TYPEFUNC_COMPOSITE
:
865 case TYPEFUNC_RECORD
:
866 /* failed to determine actual type of RECORD */
868 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED
),
869 errmsg("function returning record called in context "
870 "that cannot accept type record")));
873 /* result type isn't composite */
874 elog(ERROR
, "return type must be a row type");
878 /* make sure we have a persistent copy of the tupdesc */
879 tupdesc
= CreateTupleDescCopy(tupdesc
);
883 * check result and tuple descriptor have the same number of
886 if (PQnfields(res
) != tupdesc
->natts
)
888 (errcode(ERRCODE_DATATYPE_MISMATCH
),
889 errmsg("remote query result rowtype does not match "
890 "the specified FROM clause rowtype")));
892 /* fast track when no results */
893 if (funcctx
->max_calls
< 1)
897 SRF_RETURN_DONE(funcctx
);
900 /* store needed metadata for subsequent calls */
901 attinmeta
= TupleDescGetAttInMetadata(tupdesc
);
902 funcctx
->attinmeta
= attinmeta
;
904 MemoryContextSwitchTo(oldcontext
);
908 /* async query send */
909 MemoryContextSwitchTo(oldcontext
);
910 PG_RETURN_INT32(PQsendQuery(conn
, sql
));
914 if (is_async
&& !do_get
)
916 /* async query send -- should not happen */
917 elog(ERROR
, "async query send called more than once");
921 /* stuff done on every call of the function */
922 funcctx
= SRF_PERCALL_SETUP();
925 * initialize per-call variables
927 call_cntr
= funcctx
->call_cntr
;
928 max_calls
= funcctx
->max_calls
;
930 res
= (PGresult
*) funcctx
->user_fctx
;
931 attinmeta
= funcctx
->attinmeta
;
932 tupdesc
= attinmeta
->tupdesc
;
934 if (call_cntr
< max_calls
) /* do when there is more left to send */
943 int nfields
= PQnfields(res
);
945 values
= (char **) palloc(nfields
* sizeof(char *));
946 for (i
= 0; i
< nfields
; i
++)
948 if (PQgetisnull(res
, call_cntr
, i
) == 0)
949 values
[i
] = PQgetvalue(res
, call_cntr
, i
);
956 values
= (char **) palloc(1 * sizeof(char *));
957 values
[0] = sql_cmd_status
;
960 /* build the tuple */
961 tuple
= BuildTupleFromCStrings(attinmeta
, values
);
963 /* make the tuple into a datum */
964 result
= HeapTupleGetDatum(tuple
);
966 SRF_RETURN_NEXT(funcctx
, result
);
970 /* do when there is no more left */
972 SRF_RETURN_DONE(funcctx
);
977 * List all open dblink connections by name.
978 * Returns an array of all connection names.
981 PG_FUNCTION_INFO_V1(dblink_get_connections
);
983 dblink_get_connections(PG_FUNCTION_ARGS
)
985 HASH_SEQ_STATUS status
;
986 remoteConnHashEnt
*hentry
;
987 ArrayBuildState
*astate
= NULL
;
991 hash_seq_init(&status
, remoteConnHash
);
992 while ((hentry
= (remoteConnHashEnt
*) hash_seq_search(&status
)) != NULL
)
994 /* stash away current value */
995 astate
= accumArrayResult(astate
,
996 CStringGetTextDatum(hentry
->name
),
997 false, TEXTOID
, CurrentMemoryContext
);
1002 PG_RETURN_ARRAYTYPE_P(makeArrayResult(astate
,
1003 CurrentMemoryContext
));
1009 * Checks if a given remote connection is busy
1011 * Returns 1 if the connection is busy, 0 otherwise
1013 * text connection_name - name of the connection to check
1016 PG_FUNCTION_INFO_V1(dblink_is_busy
);
1018 dblink_is_busy(PG_FUNCTION_ARGS
)
1020 PGconn
*conn
= NULL
;
1021 remoteConn
*rconn
= NULL
;
1024 DBLINK_GET_NAMED_CONN
;
1026 PQconsumeInput(conn
);
1027 PG_RETURN_INT32(PQisBusy(conn
));
1031 * Cancels a running request on a connection
1034 * "OK" if the cancel request has been sent correctly,
1035 * an error message otherwise
1038 * text connection_name - name of the connection to check
1041 PG_FUNCTION_INFO_V1(dblink_cancel_query
);
1043 dblink_cancel_query(PG_FUNCTION_ARGS
)
1046 PGconn
*conn
= NULL
;
1047 remoteConn
*rconn
= NULL
;
1052 DBLINK_GET_NAMED_CONN
;
1053 cancel
= PQgetCancel(conn
);
1055 res
= PQcancel(cancel
, errbuf
, 256);
1056 PQfreeCancel(cancel
);
1059 PG_RETURN_TEXT_P(cstring_to_text("OK"));
1061 PG_RETURN_TEXT_P(cstring_to_text(errbuf
));
1066 * Get error message from a connection
1069 * "OK" if no error, an error message otherwise
1072 * text connection_name - name of the connection to check
1075 PG_FUNCTION_INFO_V1(dblink_error_message
);
1077 dblink_error_message(PG_FUNCTION_ARGS
)
1080 PGconn
*conn
= NULL
;
1081 remoteConn
*rconn
= NULL
;
1084 DBLINK_GET_NAMED_CONN
;
1086 msg
= PQerrorMessage(conn
);
1087 if (msg
== NULL
|| msg
[0] == '\0')
1088 PG_RETURN_TEXT_P(cstring_to_text("OK"));
1090 PG_RETURN_TEXT_P(cstring_to_text(msg
));
1094 * Execute an SQL non-SELECT command
1096 PG_FUNCTION_INFO_V1(dblink_exec
);
1098 dblink_exec(PG_FUNCTION_ARGS
)
1101 PGresult
*res
= NULL
;
1102 text
*sql_cmd_status
= NULL
;
1103 TupleDesc tupdesc
= NULL
;
1104 PGconn
*conn
= NULL
;
1105 char *connstr
= NULL
;
1107 char *conname
= NULL
;
1108 remoteConn
*rconn
= NULL
;
1109 bool freeconn
= false;
1110 bool fail
= true; /* default to backward compatible behavior */
1114 if (PG_NARGS() == 3)
1116 /* must be text,text,bool */
1118 sql
= text_to_cstring(PG_GETARG_TEXT_PP(1));
1119 fail
= PG_GETARG_BOOL(2);
1121 else if (PG_NARGS() == 2)
1123 /* might be text,text or text,bool */
1124 if (get_fn_expr_argtype(fcinfo
->flinfo
, 1) == BOOLOID
)
1127 sql
= text_to_cstring(PG_GETARG_TEXT_PP(0));
1128 fail
= PG_GETARG_BOOL(1);
1133 sql
= text_to_cstring(PG_GETARG_TEXT_PP(1));
1136 else if (PG_NARGS() == 1)
1138 /* must be single text argument */
1140 sql
= text_to_cstring(PG_GETARG_TEXT_PP(0));
1143 /* shouldn't happen */
1144 elog(ERROR
, "wrong number of arguments");
1147 DBLINK_CONN_NOT_AVAIL
;
1149 res
= PQexec(conn
, sql
);
1151 (PQresultStatus(res
) != PGRES_COMMAND_OK
&&
1152 PQresultStatus(res
) != PGRES_TUPLES_OK
))
1154 dblink_res_error(conname
, res
, "could not execute command", fail
);
1156 /* need a tuple descriptor representing one TEXT column */
1157 tupdesc
= CreateTemplateTupleDesc(1, false);
1158 TupleDescInitEntry(tupdesc
, (AttrNumber
) 1, "status",
1162 * and save a copy of the command status string to return as our
1165 sql_cmd_status
= cstring_to_text("ERROR");
1167 else if (PQresultStatus(res
) == PGRES_COMMAND_OK
)
1169 /* need a tuple descriptor representing one TEXT column */
1170 tupdesc
= CreateTemplateTupleDesc(1, false);
1171 TupleDescInitEntry(tupdesc
, (AttrNumber
) 1, "status",
1175 * and save a copy of the command status string to return as our
1178 sql_cmd_status
= cstring_to_text(PQcmdStatus(res
));
1185 (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED
),
1186 errmsg("statement returning results not allowed")));
1189 /* if needed, close the connection to the database and cleanup */
1193 PG_RETURN_TEXT_P(sql_cmd_status
);
1200 * Return list of primary key fields for the supplied relation,
1201 * or NULL if none exists.
1203 PG_FUNCTION_INFO_V1(dblink_get_pkey
);
1205 dblink_get_pkey(PG_FUNCTION_ARGS
)
1210 FuncCallContext
*funcctx
;
1213 AttInMetadata
*attinmeta
;
1214 MemoryContext oldcontext
;
1216 /* stuff done only on the first call of the function */
1217 if (SRF_IS_FIRSTCALL())
1219 TupleDesc tupdesc
= NULL
;
1221 /* create a function context for cross-call persistence */
1222 funcctx
= SRF_FIRSTCALL_INIT();
1225 * switch to memory context appropriate for multiple function calls
1227 oldcontext
= MemoryContextSwitchTo(funcctx
->multi_call_memory_ctx
);
1229 /* convert relname to rel Oid */
1230 relid
= get_relid_from_relname(PG_GETARG_TEXT_P(0));
1231 if (!OidIsValid(relid
))
1233 (errcode(ERRCODE_UNDEFINED_TABLE
),
1234 errmsg("relation \"%s\" does not exist",
1235 text_to_cstring(PG_GETARG_TEXT_PP(0)))));
1238 * need a tuple descriptor representing one INT and one TEXT column
1240 tupdesc
= CreateTemplateTupleDesc(2, false);
1241 TupleDescInitEntry(tupdesc
, (AttrNumber
) 1, "position",
1243 TupleDescInitEntry(tupdesc
, (AttrNumber
) 2, "colname",
1247 * Generate attribute metadata needed later to produce tuples from raw
1250 attinmeta
= TupleDescGetAttInMetadata(tupdesc
);
1251 funcctx
->attinmeta
= attinmeta
;
1253 /* get an array of attnums */
1254 results
= get_pkey_attnames(relid
, &numatts
);
1256 if ((results
!= NULL
) && (numatts
> 0))
1258 funcctx
->max_calls
= numatts
;
1260 /* got results, keep track of them */
1261 funcctx
->user_fctx
= results
;
1264 /* fast track when no results */
1265 SRF_RETURN_DONE(funcctx
);
1267 MemoryContextSwitchTo(oldcontext
);
1270 /* stuff done on every call of the function */
1271 funcctx
= SRF_PERCALL_SETUP();
1274 * initialize per-call variables
1276 call_cntr
= funcctx
->call_cntr
;
1277 max_calls
= funcctx
->max_calls
;
1279 results
= (char **) funcctx
->user_fctx
;
1280 attinmeta
= funcctx
->attinmeta
;
1282 if (call_cntr
< max_calls
) /* do when there is more left to send */
1288 values
= (char **) palloc(2 * sizeof(char *));
1289 values
[0] = (char *) palloc(12); /* sign, 10 digits, '\0' */
1291 sprintf(values
[0], "%d", call_cntr
+ 1);
1293 values
[1] = results
[call_cntr
];
1295 /* build the tuple */
1296 tuple
= BuildTupleFromCStrings(attinmeta
, values
);
1298 /* make the tuple into a datum */
1299 result
= HeapTupleGetDatum(tuple
);
1301 SRF_RETURN_NEXT(funcctx
, result
);
1305 /* do when there is no more left */
1306 SRF_RETURN_DONE(funcctx
);
1312 * dblink_build_sql_insert
1314 * Used to generate an SQL insert statement
1315 * based on an existing tuple in a local relation.
1316 * This is useful for selectively replicating data
1317 * to another server via dblink.
1320 * <relname> - name of local table of interest
1321 * <pkattnums> - an int2vector of attnums which will be used
1322 * to identify the local tuple of interest
1323 * <pknumatts> - number of attnums in pkattnums
1324 * <src_pkattvals_arry> - text array of key values which will be used
1325 * to identify the local tuple of interest
1326 * <tgt_pkattvals_arry> - text array of key values which will be used
1327 * to build the string for execution remotely. These are substituted
1328 * for their counterparts in src_pkattvals_arry
1330 PG_FUNCTION_INFO_V1(dblink_build_sql_insert
);
1332 dblink_build_sql_insert(PG_FUNCTION_ARGS
)
1334 text
*relname_text
= PG_GETARG_TEXT_P(0);
1335 int2vector
*pkattnums
= (int2vector
*) PG_GETARG_POINTER(1);
1336 int32 pknumatts_tmp
= PG_GETARG_INT32(2);
1337 ArrayType
*src_pkattvals_arry
= PG_GETARG_ARRAYTYPE_P(3);
1338 ArrayType
*tgt_pkattvals_arry
= PG_GETARG_ARRAYTYPE_P(4);
1340 int16 pknumatts
= 0;
1341 char **src_pkattvals
;
1342 char **tgt_pkattvals
;
1348 * Convert relname to rel OID.
1350 relid
= get_relid_from_relname(relname_text
);
1351 if (!OidIsValid(relid
))
1353 (errcode(ERRCODE_UNDEFINED_TABLE
),
1354 errmsg("relation \"%s\" does not exist",
1355 text_to_cstring(relname_text
))));
1358 * There should be at least one key attribute
1360 if (pknumatts_tmp
<= 0)
1362 (errcode(ERRCODE_INVALID_PARAMETER_VALUE
),
1363 errmsg("number of key attributes must be > 0")));
1365 if (pknumatts_tmp
<= SHRT_MAX
)
1366 pknumatts
= pknumatts_tmp
;
1369 (errcode(ERRCODE_INVALID_PARAMETER_VALUE
),
1370 errmsg("input for number of primary key " \
1371 "attributes too large")));
1374 * Source array is made up of key values that will be used to locate the
1375 * tuple of interest from the local system.
1377 src_pkattvals
= get_text_array_contents(src_pkattvals_arry
, &src_nitems
);
1380 * There should be one source array key value for each key attnum
1382 if (src_nitems
!= pknumatts
)
1384 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR
),
1385 errmsg("source key array length must match number of key " \
1389 * Target array is made up of key values that will be used to build the
1390 * SQL string for use on the remote system.
1392 tgt_pkattvals
= get_text_array_contents(tgt_pkattvals_arry
, &tgt_nitems
);
1395 * There should be one target array key value for each key attnum
1397 if (tgt_nitems
!= pknumatts
)
1399 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR
),
1400 errmsg("target key array length must match number of key " \
1404 * Prep work is finally done. Go get the SQL string.
1406 sql
= get_sql_insert(relid
, pkattnums
, pknumatts
, src_pkattvals
, tgt_pkattvals
);
1411 PG_RETURN_TEXT_P(cstring_to_text(sql
));
1416 * dblink_build_sql_delete
1418 * Used to generate an SQL delete statement.
1419 * This is useful for selectively replicating a
1420 * delete to another server via dblink.
1423 * <relname> - name of remote table of interest
1424 * <pkattnums> - an int2vector of attnums which will be used
1425 * to identify the remote tuple of interest
1426 * <pknumatts> - number of attnums in pkattnums
1427 * <tgt_pkattvals_arry> - text array of key values which will be used
1428 * to build the string for execution remotely.
1430 PG_FUNCTION_INFO_V1(dblink_build_sql_delete
);
1432 dblink_build_sql_delete(PG_FUNCTION_ARGS
)
1434 text
*relname_text
= PG_GETARG_TEXT_P(0);
1435 int2vector
*pkattnums
= (int2vector
*) PG_GETARG_POINTER(1);
1436 int32 pknumatts_tmp
= PG_GETARG_INT32(2);
1437 ArrayType
*tgt_pkattvals_arry
= PG_GETARG_ARRAYTYPE_P(3);
1439 int16 pknumatts
= 0;
1440 char **tgt_pkattvals
;
1445 * Convert relname to rel OID.
1447 relid
= get_relid_from_relname(relname_text
);
1448 if (!OidIsValid(relid
))
1450 (errcode(ERRCODE_UNDEFINED_TABLE
),
1451 errmsg("relation \"%s\" does not exist",
1452 text_to_cstring(relname_text
))));
1455 * There should be at least one key attribute
1457 if (pknumatts_tmp
<= 0)
1459 (errcode(ERRCODE_INVALID_PARAMETER_VALUE
),
1460 errmsg("number of key attributes must be > 0")));
1462 if (pknumatts_tmp
<= SHRT_MAX
)
1463 pknumatts
= pknumatts_tmp
;
1466 (errcode(ERRCODE_INVALID_PARAMETER_VALUE
),
1467 errmsg("input for number of primary key " \
1468 "attributes too large")));
1471 * Target array is made up of key values that will be used to build the
1472 * SQL string for use on the remote system.
1474 tgt_pkattvals
= get_text_array_contents(tgt_pkattvals_arry
, &tgt_nitems
);
1477 * There should be one target array key value for each key attnum
1479 if (tgt_nitems
!= pknumatts
)
1481 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR
),
1482 errmsg("target key array length must match number of key " \
1486 * Prep work is finally done. Go get the SQL string.
1488 sql
= get_sql_delete(relid
, pkattnums
, pknumatts
, tgt_pkattvals
);
1493 PG_RETURN_TEXT_P(cstring_to_text(sql
));
1498 * dblink_build_sql_update
1500 * Used to generate an SQL update statement
1501 * based on an existing tuple in a local relation.
1502 * This is useful for selectively replicating data
1503 * to another server via dblink.
1506 * <relname> - name of local table of interest
1507 * <pkattnums> - an int2vector of attnums which will be used
1508 * to identify the local tuple of interest
1509 * <pknumatts> - number of attnums in pkattnums
1510 * <src_pkattvals_arry> - text array of key values which will be used
1511 * to identify the local tuple of interest
1512 * <tgt_pkattvals_arry> - text array of key values which will be used
1513 * to build the string for execution remotely. These are substituted
1514 * for their counterparts in src_pkattvals_arry
1516 PG_FUNCTION_INFO_V1(dblink_build_sql_update
);
1518 dblink_build_sql_update(PG_FUNCTION_ARGS
)
1520 text
*relname_text
= PG_GETARG_TEXT_P(0);
1521 int2vector
*pkattnums
= (int2vector
*) PG_GETARG_POINTER(1);
1522 int32 pknumatts_tmp
= PG_GETARG_INT32(2);
1523 ArrayType
*src_pkattvals_arry
= PG_GETARG_ARRAYTYPE_P(3);
1524 ArrayType
*tgt_pkattvals_arry
= PG_GETARG_ARRAYTYPE_P(4);
1526 int16 pknumatts
= 0;
1527 char **src_pkattvals
;
1528 char **tgt_pkattvals
;
1534 * Convert relname to rel OID.
1536 relid
= get_relid_from_relname(relname_text
);
1537 if (!OidIsValid(relid
))
1539 (errcode(ERRCODE_UNDEFINED_TABLE
),
1540 errmsg("relation \"%s\" does not exist",
1541 text_to_cstring(relname_text
))));
1544 * There should be one source array key values for each key attnum
1546 if (pknumatts_tmp
<= 0)
1548 (errcode(ERRCODE_INVALID_PARAMETER_VALUE
),
1549 errmsg("number of key attributes must be > 0")));
1551 if (pknumatts_tmp
<= SHRT_MAX
)
1552 pknumatts
= pknumatts_tmp
;
1555 (errcode(ERRCODE_INVALID_PARAMETER_VALUE
),
1556 errmsg("input for number of primary key " \
1557 "attributes too large")));
1560 * Source array is made up of key values that will be used to locate the
1561 * tuple of interest from the local system.
1563 src_pkattvals
= get_text_array_contents(src_pkattvals_arry
, &src_nitems
);
1566 * There should be one source array key value for each key attnum
1568 if (src_nitems
!= pknumatts
)
1570 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR
),
1571 errmsg("source key array length must match number of key " \
1575 * Target array is made up of key values that will be used to build the
1576 * SQL string for use on the remote system.
1578 tgt_pkattvals
= get_text_array_contents(tgt_pkattvals_arry
, &tgt_nitems
);
1581 * There should be one target array key value for each key attnum
1583 if (tgt_nitems
!= pknumatts
)
1585 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR
),
1586 errmsg("target key array length must match number of key " \
1590 * Prep work is finally done. Go get the SQL string.
1592 sql
= get_sql_update(relid
, pkattnums
, pknumatts
, src_pkattvals
, tgt_pkattvals
);
1597 PG_RETURN_TEXT_P(cstring_to_text(sql
));
/*************************************************************
 * internal functions
 */
1608 * Get the primary key attnames for the given relation.
1609 * Return NULL, and set numatts = 0, if no primary key exists.
1612 get_pkey_attnames(Oid relid
, int16
*numatts
)
1614 Relation indexRelation
;
1617 HeapTuple indexTuple
;
1619 char **result
= NULL
;
1622 AclResult aclresult
;
1624 /* initialize numatts to 0 in case no primary key exists */
1627 /* open relation using relid, check permissions, get tupdesc */
1628 rel
= relation_open(relid
, AccessShareLock
);
1630 aclresult
= pg_class_aclcheck(RelationGetRelid(rel
), GetUserId(),
1632 if (aclresult
!= ACLCHECK_OK
)
1633 aclcheck_error(aclresult
, ACL_KIND_CLASS
,
1634 RelationGetRelationName(rel
));
1636 tupdesc
= rel
->rd_att
;
1638 /* Prepare to scan pg_index for entries having indrelid = this rel. */
1639 indexRelation
= heap_open(IndexRelationId
, AccessShareLock
);
1641 Anum_pg_index_indrelid
,
1642 BTEqualStrategyNumber
, F_OIDEQ
,
1643 ObjectIdGetDatum(relid
));
1645 scan
= systable_beginscan(indexRelation
, IndexIndrelidIndexId
, true,
1646 SnapshotNow
, 1, &skey
);
1648 while (HeapTupleIsValid(indexTuple
= systable_getnext(scan
)))
1650 Form_pg_index index
= (Form_pg_index
) GETSTRUCT(indexTuple
);
1652 /* we're only interested if it is the primary key */
1653 if (index
->indisprimary
)
1655 *numatts
= index
->indnatts
;
1658 result
= (char **) palloc(*numatts
* sizeof(char *));
1660 for (i
= 0; i
< *numatts
; i
++)
1661 result
[i
] = SPI_fname(tupdesc
, index
->indkey
.values
[i
]);
1667 systable_endscan(scan
);
1668 heap_close(indexRelation
, AccessShareLock
);
1669 relation_close(rel
, AccessShareLock
);
1675 * Deconstruct a text[] into C-strings (note any NULL elements will be
1676 * returned as NULL pointers)
1679 get_text_array_contents(ArrayType
*array
, int *numitems
)
1681 int ndim
= ARR_NDIM(array
);
1682 int *dims
= ARR_DIMS(array
);
1693 Assert(ARR_ELEMTYPE(array
) == TEXTOID
);
1695 *numitems
= nitems
= ArrayGetNItems(ndim
, dims
);
1697 get_typlenbyvalalign(ARR_ELEMTYPE(array
),
1698 &typlen
, &typbyval
, &typalign
);
1700 values
= (char **) palloc(nitems
* sizeof(char *));
1702 ptr
= ARR_DATA_PTR(array
);
1703 bitmap
= ARR_NULLBITMAP(array
);
1706 for (i
= 0; i
< nitems
; i
++)
1708 if (bitmap
&& (*bitmap
& bitmask
) == 0)
1714 values
[i
] = TextDatumGetCString(PointerGetDatum(ptr
));
1715 ptr
= att_addlength_pointer(ptr
, typlen
, ptr
);
1716 ptr
= (char *) att_align_nominal(ptr
, typalign
);
1719 /* advance bitmap pointer if any */
1723 if (bitmask
== 0x100)
1735 get_sql_insert(Oid relid
, int2vector
*pkattnums
, int16 pknumatts
, char **src_pkattvals
, char **tgt_pkattvals
)
1748 initStringInfo(&buf
);
1750 /* get relation name including any needed schema prefix and quoting */
1751 relname
= generate_relation_name(relid
);
1754 * Open relation using relid
1756 rel
= relation_open(relid
, AccessShareLock
);
1757 tupdesc
= rel
->rd_att
;
1758 natts
= tupdesc
->natts
;
1760 tuple
= get_tuple_of_interest(relid
, pkattnums
, pknumatts
, src_pkattvals
);
1763 (errcode(ERRCODE_CARDINALITY_VIOLATION
),
1764 errmsg("source row not found")));
1766 appendStringInfo(&buf
, "INSERT INTO %s(", relname
);
1769 for (i
= 0; i
< natts
; i
++)
1771 if (tupdesc
->attrs
[i
]->attisdropped
)
1775 appendStringInfo(&buf
, ",");
1777 appendStringInfoString(&buf
,
1778 quote_ident_cstr(NameStr(tupdesc
->attrs
[i
]->attname
)));
1782 appendStringInfo(&buf
, ") VALUES(");
1785 * remember attvals are 1 based
1788 for (i
= 0; i
< natts
; i
++)
1790 if (tupdesc
->attrs
[i
]->attisdropped
)
1794 appendStringInfo(&buf
, ",");
1796 if (tgt_pkattvals
!= NULL
)
1797 key
= get_attnum_pk_pos(pkattnums
, pknumatts
, i
+ 1);
1802 val
= tgt_pkattvals
[key
] ? pstrdup(tgt_pkattvals
[key
]) : NULL
;
1804 val
= SPI_getvalue(tuple
, tupdesc
, i
+ 1);
1808 appendStringInfoString(&buf
, quote_literal_cstr(val
));
1812 appendStringInfo(&buf
, "NULL");
1815 appendStringInfo(&buf
, ")");
1817 relation_close(rel
, AccessShareLock
);
1822 get_sql_delete(Oid relid
, int2vector
*pkattnums
, int16 pknumatts
, char **tgt_pkattvals
)
1831 initStringInfo(&buf
);
1833 /* get relation name including any needed schema prefix and quoting */
1834 relname
= generate_relation_name(relid
);
1837 * Open relation using relid
1839 rel
= relation_open(relid
, AccessShareLock
);
1840 tupdesc
= rel
->rd_att
;
1841 natts
= tupdesc
->natts
;
1843 appendStringInfo(&buf
, "DELETE FROM %s WHERE ", relname
);
1844 for (i
= 0; i
< pknumatts
; i
++)
1846 int16 pkattnum
= pkattnums
->values
[i
];
1849 appendStringInfo(&buf
, " AND ");
1851 appendStringInfoString(&buf
,
1852 quote_ident_cstr(NameStr(tupdesc
->attrs
[pkattnum
- 1]->attname
)));
1854 if (tgt_pkattvals
== NULL
)
1855 /* internal error */
1856 elog(ERROR
, "target key array must not be NULL");
1858 if (tgt_pkattvals
[i
] != NULL
)
1859 appendStringInfo(&buf
, " = %s",
1860 quote_literal_cstr(tgt_pkattvals
[i
]));
1862 appendStringInfo(&buf
, " IS NULL");
1865 relation_close(rel
, AccessShareLock
);
1870 get_sql_update(Oid relid
, int2vector
*pkattnums
, int16 pknumatts
, char **src_pkattvals
, char **tgt_pkattvals
)
1883 initStringInfo(&buf
);
1885 /* get relation name including any needed schema prefix and quoting */
1886 relname
= generate_relation_name(relid
);
1889 * Open relation using relid
1891 rel
= relation_open(relid
, AccessShareLock
);
1892 tupdesc
= rel
->rd_att
;
1893 natts
= tupdesc
->natts
;
1895 tuple
= get_tuple_of_interest(relid
, pkattnums
, pknumatts
, src_pkattvals
);
1898 (errcode(ERRCODE_CARDINALITY_VIOLATION
),
1899 errmsg("source row not found")));
1901 appendStringInfo(&buf
, "UPDATE %s SET ", relname
);
1904 for (i
= 0; i
< natts
; i
++)
1906 if (tupdesc
->attrs
[i
]->attisdropped
)
1910 appendStringInfo(&buf
, ", ");
1912 appendStringInfo(&buf
, "%s = ",
1913 quote_ident_cstr(NameStr(tupdesc
->attrs
[i
]->attname
)));
1915 if (tgt_pkattvals
!= NULL
)
1916 key
= get_attnum_pk_pos(pkattnums
, pknumatts
, i
+ 1);
1921 val
= tgt_pkattvals
[key
] ? pstrdup(tgt_pkattvals
[key
]) : NULL
;
1923 val
= SPI_getvalue(tuple
, tupdesc
, i
+ 1);
1927 appendStringInfoString(&buf
, quote_literal_cstr(val
));
1931 appendStringInfoString(&buf
, "NULL");
1935 appendStringInfo(&buf
, " WHERE ");
1937 for (i
= 0; i
< pknumatts
; i
++)
1939 int16 pkattnum
= pkattnums
->values
[i
];
1942 appendStringInfo(&buf
, " AND ");
1944 appendStringInfo(&buf
, "%s",
1945 quote_ident_cstr(NameStr(tupdesc
->attrs
[pkattnum
- 1]->attname
)));
1947 if (tgt_pkattvals
!= NULL
)
1948 val
= tgt_pkattvals
[i
] ? pstrdup(tgt_pkattvals
[i
]) : NULL
;
1950 val
= SPI_getvalue(tuple
, tupdesc
, pkattnum
);
1954 appendStringInfo(&buf
, " = %s", quote_literal_cstr(val
));
1958 appendStringInfo(&buf
, " IS NULL");
1961 relation_close(rel
, AccessShareLock
);
1966 * Return a properly quoted literal value.
1967 * Uses quote_literal in quote.c
1970 quote_literal_cstr(char *rawstr
)
1976 rawstr_text
= cstring_to_text(rawstr
);
1977 result_text
= DatumGetTextP(DirectFunctionCall1(quote_literal
,
1978 PointerGetDatum(rawstr_text
)));
1979 result
= text_to_cstring(result_text
);
1985 * Return a properly quoted identifier.
1986 * Uses quote_ident in quote.c
1989 quote_ident_cstr(char *rawstr
)
1995 rawstr_text
= cstring_to_text(rawstr
);
1996 result_text
= DatumGetTextP(DirectFunctionCall1(quote_ident
,
1997 PointerGetDatum(rawstr_text
)));
1998 result
= text_to_cstring(result_text
);
2004 get_attnum_pk_pos(int2vector
*pkattnums
, int16 pknumatts
, int16 key
)
2009 * Not likely a long list anyway, so just scan for the value
2011 for (i
= 0; i
< pknumatts
; i
++)
2012 if (key
== pkattnums
->values
[i
])
2019 get_tuple_of_interest(Oid relid
, int2vector
*pkattnums
, int16 pknumatts
, char **src_pkattvals
)
2029 initStringInfo(&buf
);
2031 /* get relation name including any needed schema prefix and quoting */
2032 relname
= generate_relation_name(relid
);
2035 * Open relation using relid
2037 rel
= relation_open(relid
, AccessShareLock
);
2038 tupdesc
= CreateTupleDescCopy(rel
->rd_att
);
2039 relation_close(rel
, AccessShareLock
);
2042 * Connect to SPI manager
2044 if ((ret
= SPI_connect()) < 0)
2045 /* internal error */
2046 elog(ERROR
, "SPI connect failure - returned %d", ret
);
2049 * Build sql statement to look up tuple of interest Use src_pkattvals as
2052 appendStringInfo(&buf
, "SELECT * FROM %s WHERE ", relname
);
2054 for (i
= 0; i
< pknumatts
; i
++)
2056 int16 pkattnum
= pkattnums
->values
[i
];
2059 appendStringInfo(&buf
, " AND ");
2061 appendStringInfoString(&buf
,
2062 quote_ident_cstr(NameStr(tupdesc
->attrs
[pkattnum
- 1]->attname
)));
2064 if (src_pkattvals
[i
] != NULL
)
2065 appendStringInfo(&buf
, " = %s",
2066 quote_literal_cstr(src_pkattvals
[i
]));
2068 appendStringInfo(&buf
, " IS NULL");
2072 * Retrieve the desired tuple
2074 ret
= SPI_exec(buf
.data
, 0);
2078 * Only allow one qualifying tuple
2080 if ((ret
== SPI_OK_SELECT
) && (SPI_processed
> 1))
2082 (errcode(ERRCODE_CARDINALITY_VIOLATION
),
2083 errmsg("source criteria matched more than one record")));
2085 else if (ret
== SPI_OK_SELECT
&& SPI_processed
== 1)
2087 SPITupleTable
*tuptable
= SPI_tuptable
;
2089 tuple
= SPI_copytuple(tuptable
->vals
[0]);
2097 * no qualifying tuples
2105 * never reached, but keep compiler quiet
2111 get_relid_from_relname(text
*relname_text
)
2117 relvar
= makeRangeVarFromNameList(textToQualifiedNameList(relname_text
));
2118 rel
= heap_openrv(relvar
, AccessShareLock
);
2119 relid
= RelationGetRelid(rel
);
2120 relation_close(rel
, AccessShareLock
);
2126 * generate_relation_name - copied from ruleutils.c
2127 * Compute the name to display for a relation specified by OID
2129 * The result includes all necessary quoting and schema-prefixing.
2132 generate_relation_name(Oid relid
)
2135 Form_pg_class reltup
;
2139 tp
= SearchSysCache(RELOID
,
2140 ObjectIdGetDatum(relid
),
2142 if (!HeapTupleIsValid(tp
))
2143 elog(ERROR
, "cache lookup failed for relation %u", relid
);
2145 reltup
= (Form_pg_class
) GETSTRUCT(tp
);
2147 /* Qualify the name if not visible in search path */
2148 if (RelationIsVisible(relid
))
2151 nspname
= get_namespace_name(reltup
->relnamespace
);
2153 result
= quote_qualified_identifier(nspname
, NameStr(reltup
->relname
));
2155 ReleaseSysCache(tp
);
2162 getConnectionByName(const char *name
)
2164 remoteConnHashEnt
*hentry
;
2165 char key
[NAMEDATALEN
];
2167 if (!remoteConnHash
)
2168 remoteConnHash
= createConnHash();
2170 MemSet(key
, 0, NAMEDATALEN
);
2171 snprintf(key
, NAMEDATALEN
- 1, "%s", name
);
2172 hentry
= (remoteConnHashEnt
*) hash_search(remoteConnHash
,
2173 key
, HASH_FIND
, NULL
);
2176 return (hentry
->rconn
);
2182 createConnHash(void)
2186 ctl
.keysize
= NAMEDATALEN
;
2187 ctl
.entrysize
= sizeof(remoteConnHashEnt
);
2189 return hash_create("Remote Con hash", NUMCONN
, &ctl
, HASH_ELEM
);
2193 createNewConnection(const char *name
, remoteConn
* rconn
)
2195 remoteConnHashEnt
*hentry
;
2197 char key
[NAMEDATALEN
];
2199 if (!remoteConnHash
)
2200 remoteConnHash
= createConnHash();
2202 MemSet(key
, 0, NAMEDATALEN
);
2203 snprintf(key
, NAMEDATALEN
- 1, "%s", name
);
2204 hentry
= (remoteConnHashEnt
*) hash_search(remoteConnHash
, key
,
2205 HASH_ENTER
, &found
);
2209 (errcode(ERRCODE_DUPLICATE_OBJECT
),
2210 errmsg("duplicate connection name")));
2212 hentry
->rconn
= rconn
;
2213 strlcpy(hentry
->name
, name
, sizeof(hentry
->name
));
2217 deleteConnection(const char *name
)
2219 remoteConnHashEnt
*hentry
;
2221 char key
[NAMEDATALEN
];
2223 if (!remoteConnHash
)
2224 remoteConnHash
= createConnHash();
2226 MemSet(key
, 0, NAMEDATALEN
);
2227 snprintf(key
, NAMEDATALEN
- 1, "%s", name
);
2229 hentry
= (remoteConnHashEnt
*) hash_search(remoteConnHash
,
2230 key
, HASH_REMOVE
, &found
);
2234 (errcode(ERRCODE_UNDEFINED_OBJECT
),
2235 errmsg("undefined connection name")));
2240 dblink_security_check(PGconn
*conn
, remoteConn
*rconn
)
2244 if (!PQconnectionUsedPassword(conn
))
2251 (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED
),
2252 errmsg("password is required"),
2253 errdetail("Non-superuser cannot connect if the server does not request a password."),
2254 errhint("Target server's authentication method must be changed.")));
2260 * For non-superusers, insist that the connstr specify a password. This
2261 * prevents a password from being picked up from .pgpass, a service file,
2262 * the environment, etc. We don't want the postgres user's passwords
2263 * to be accessible to non-superusers.
2266 dblink_connstr_check(const char *connstr
)
2270 PQconninfoOption
*options
;
2271 PQconninfoOption
*option
;
2272 bool connstr_gives_password
= false;
2274 options
= PQconninfoParse(connstr
, NULL
);
2277 for (option
= options
; option
->keyword
!= NULL
; option
++)
2279 if (strcmp(option
->keyword
, "password") == 0)
2281 if (option
->val
!= NULL
&& option
->val
[0] != '\0')
2283 connstr_gives_password
= true;
2288 PQconninfoFree(options
);
2291 if (!connstr_gives_password
)
2293 (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED
),
2294 errmsg("password is required"),
2295 errdetail("Non-superusers must provide a password in the connection string.")));
2300 dblink_res_error(const char *conname
, PGresult
*res
, const char *dblink_context_msg
, bool fail
)
2303 char *pg_diag_sqlstate
= PQresultErrorField(res
, PG_DIAG_SQLSTATE
);
2304 char *pg_diag_message_primary
= PQresultErrorField(res
, PG_DIAG_MESSAGE_PRIMARY
);
2305 char *pg_diag_message_detail
= PQresultErrorField(res
, PG_DIAG_MESSAGE_DETAIL
);
2306 char *pg_diag_message_hint
= PQresultErrorField(res
, PG_DIAG_MESSAGE_HINT
);
2307 char *pg_diag_context
= PQresultErrorField(res
, PG_DIAG_CONTEXT
);
2309 char *message_primary
;
2310 char *message_detail
;
2312 char *message_context
;
2313 const char *dblink_context_conname
= "unnamed";
2320 if (pg_diag_sqlstate
)
2321 sqlstate
= MAKE_SQLSTATE(pg_diag_sqlstate
[0],
2322 pg_diag_sqlstate
[1],
2323 pg_diag_sqlstate
[2],
2324 pg_diag_sqlstate
[3],
2325 pg_diag_sqlstate
[4]);
2327 sqlstate
= ERRCODE_CONNECTION_FAILURE
;
2329 xpstrdup(message_primary
, pg_diag_message_primary
);
2330 xpstrdup(message_detail
, pg_diag_message_detail
);
2331 xpstrdup(message_hint
, pg_diag_message_hint
);
2332 xpstrdup(message_context
, pg_diag_context
);
2338 dblink_context_conname
= conname
;
2342 message_primary
? errmsg("%s", message_primary
) : errmsg("unknown error"),
2343 message_detail
? errdetail("%s", message_detail
) : 0,
2344 message_hint
? errhint("%s", message_hint
) : 0,
2345 message_context
? errcontext("%s", message_context
) : 0,
2346 errcontext("Error occurred on dblink connection named \"%s\": %s.",
2347 dblink_context_conname
, dblink_context_msg
)));