/*-------------------------------------------------------------------------
 *
 * connection.c
 *		  Connection management functions for postgres_fdw
 *
 * Portions Copyright (c) 2012-2025, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *		  contrib/postgres_fdw/connection.c
 *
 *-------------------------------------------------------------------------
 */
19 #include "access/xact.h"
20 #include "catalog/pg_user_mapping.h"
21 #include "commands/defrem.h"
22 #include "common/base64.h"
24 #include "libpq/libpq-be.h"
25 #include "libpq/libpq-be-fe-helpers.h"
26 #include "mb/pg_wchar.h"
27 #include "miscadmin.h"
29 #include "postgres_fdw.h"
30 #include "storage/latch.h"
31 #include "utils/builtins.h"
32 #include "utils/hsearch.h"
33 #include "utils/inval.h"
34 #include "utils/syscache.h"
37 * Connection cache hash table entry
39 * The lookup key in this hash table is the user mapping OID. We use just one
40 * connection per user mapping ID, which ensures that all the scans use the
41 * same snapshot during a query. Using the user mapping OID rather than
42 * the foreign server OID + user OID avoids creating multiple connections when
43 * the public user mapping applies to all user OIDs.
45 * The "conn" pointer can be NULL if we don't currently have a live connection.
46 * When we do have a connection, xact_depth tracks the current depth of
47 * transactions and subtransactions open on the remote side. We need to issue
48 * commands at the same nesting depth on the remote as we're executing at
49 * ourselves, so that rolling back a subtransaction will kill the right
50 * queries and not the wrong ones.
52 typedef Oid ConnCacheKey
;
54 typedef struct ConnCacheEntry
56 ConnCacheKey key
; /* hash key (must be first) */
57 PGconn
*conn
; /* connection to foreign server, or NULL */
58 /* Remaining fields are invalid when conn is NULL: */
59 int xact_depth
; /* 0 = no xact open, 1 = main xact open, 2 =
60 * one level of subxact open, etc */
61 bool have_prep_stmt
; /* have we prepared any stmts in this xact? */
62 bool have_error
; /* have any subxacts aborted in this xact? */
63 bool changing_xact_state
; /* xact state change in process */
64 bool parallel_commit
; /* do we commit (sub)xacts in parallel? */
65 bool parallel_abort
; /* do we abort (sub)xacts in parallel? */
66 bool invalidated
; /* true if reconnect is pending */
67 bool keep_connections
; /* setting value of keep_connections
69 Oid serverid
; /* foreign server OID used to get server name */
70 uint32 server_hashvalue
; /* hash value of foreign server OID */
71 uint32 mapping_hashvalue
; /* hash value of user mapping OID */
72 PgFdwConnState state
; /* extra per-connection state */
76 * Connection cache (initialized on first use)
78 static HTAB
*ConnectionHash
= NULL
;
80 /* for assigning cursor numbers and prepared statement numbers */
81 static unsigned int cursor_number
= 0;
82 static unsigned int prep_stmt_number
= 0;
84 /* tracks whether any work is needed in callback functions */
85 static bool xact_got_connection
= false;
87 /* custom wait event values, retrieved from shared memory */
88 static uint32 pgfdw_we_cleanup_result
= 0;
89 static uint32 pgfdw_we_connect
= 0;
90 static uint32 pgfdw_we_get_result
= 0;
/*
 * Milliseconds to wait to cancel an in-progress query or execute a cleanup
 * query; if it takes longer than 30 seconds to do these, we assume the
 * connection is dead.
 */
#define CONNECTION_CLEANUP_TIMEOUT	30000

/*
 * Milliseconds to wait before issuing another cancel request.  This covers
 * the race condition where the remote session ignored our cancel request
 * because it arrived while idle.
 */
#define RETRY_CANCEL_TIMEOUT	1000

/*
 * Macro for constructing abort command to be sent
 *
 * "sql" must be a char array (not a pointer), because its capacity is taken
 * with sizeof().  At top level the whole remote transaction is aborted;
 * otherwise only the savepoint matching the entry's current xact_depth is
 * rolled back and released.
 */
#define CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel) \
	do { \
		if (toplevel) \
			snprintf((sql), sizeof(sql), \
					 "ABORT TRANSACTION"); \
		else \
			snprintf((sql), sizeof(sql), \
					 "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d", \
					 (entry)->xact_depth, (entry)->xact_depth); \
	} while(0)
/*
 * Extension version number, for supporting older extension versions' objects
 */
/*
 * SQL-callable entry points exported by this file
 */
PG_FUNCTION_INFO_V1(postgres_fdw_get_connections);
PG_FUNCTION_INFO_V1(postgres_fdw_get_connections_1_2);
PG_FUNCTION_INFO_V1(postgres_fdw_disconnect);
PG_FUNCTION_INFO_V1(postgres_fdw_disconnect_all);
135 /* prototypes of private functions */
136 static void make_new_connection(ConnCacheEntry
*entry
, UserMapping
*user
);
137 static PGconn
*connect_pg_server(ForeignServer
*server
, UserMapping
*user
);
138 static void disconnect_pg_server(ConnCacheEntry
*entry
);
139 static void check_conn_params(const char **keywords
, const char **values
, UserMapping
*user
);
140 static void configure_remote_session(PGconn
*conn
);
141 static void do_sql_command_begin(PGconn
*conn
, const char *sql
);
142 static void do_sql_command_end(PGconn
*conn
, const char *sql
,
144 static void begin_remote_xact(ConnCacheEntry
*entry
);
145 static void pgfdw_xact_callback(XactEvent event
, void *arg
);
146 static void pgfdw_subxact_callback(SubXactEvent event
,
147 SubTransactionId mySubid
,
148 SubTransactionId parentSubid
,
150 static void pgfdw_inval_callback(Datum arg
, int cacheid
, uint32 hashvalue
);
151 static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry
*entry
);
152 static void pgfdw_reset_xact_state(ConnCacheEntry
*entry
, bool toplevel
);
153 static bool pgfdw_cancel_query(PGconn
*conn
);
154 static bool pgfdw_cancel_query_begin(PGconn
*conn
, TimestampTz endtime
);
155 static bool pgfdw_cancel_query_end(PGconn
*conn
, TimestampTz endtime
,
156 TimestampTz retrycanceltime
,
158 static bool pgfdw_exec_cleanup_query(PGconn
*conn
, const char *query
,
160 static bool pgfdw_exec_cleanup_query_begin(PGconn
*conn
, const char *query
);
161 static bool pgfdw_exec_cleanup_query_end(PGconn
*conn
, const char *query
,
165 static bool pgfdw_get_cleanup_result(PGconn
*conn
, TimestampTz endtime
,
166 TimestampTz retrycanceltime
,
167 PGresult
**result
, bool *timed_out
);
168 static void pgfdw_abort_cleanup(ConnCacheEntry
*entry
, bool toplevel
);
169 static bool pgfdw_abort_cleanup_begin(ConnCacheEntry
*entry
, bool toplevel
,
170 List
**pending_entries
,
171 List
**cancel_requested
);
172 static void pgfdw_finish_pre_commit_cleanup(List
*pending_entries
);
173 static void pgfdw_finish_pre_subcommit_cleanup(List
*pending_entries
,
175 static void pgfdw_finish_abort_cleanup(List
*pending_entries
,
176 List
*cancel_requested
,
178 static void pgfdw_security_check(const char **keywords
, const char **values
,
179 UserMapping
*user
, PGconn
*conn
);
180 static bool UserMappingPasswordRequired(UserMapping
*user
);
181 static bool UseScramPassthrough(ForeignServer
*server
, UserMapping
*user
);
182 static bool disconnect_cached_connections(Oid serverid
);
183 static void postgres_fdw_get_connections_internal(FunctionCallInfo fcinfo
,
184 enum pgfdwVersion api_version
);
185 static int pgfdw_conn_check(PGconn
*conn
);
186 static bool pgfdw_conn_checkable(void);
189 * Get a PGconn which can be used to execute queries on the remote PostgreSQL
190 * server with the user's authorization. A new connection is established
191 * if we don't already have a suitable one, and a transaction is opened at
192 * the right subtransaction nesting depth if we didn't do that already.
194 * will_prep_stmt must be true if caller intends to create any prepared
195 * statements. Since those don't go away automatically at transaction end
196 * (not even on error), we need this flag to cue manual cleanup.
198 * If state is not NULL, *state receives the per-connection state associated
202 GetConnection(UserMapping
*user
, bool will_prep_stmt
, PgFdwConnState
**state
)
206 ConnCacheEntry
*entry
;
208 MemoryContext ccxt
= CurrentMemoryContext
;
210 /* First time through, initialize connection cache hashtable */
211 if (ConnectionHash
== NULL
)
215 if (pgfdw_we_get_result
== 0)
216 pgfdw_we_get_result
=
217 WaitEventExtensionNew("PostgresFdwGetResult");
219 ctl
.keysize
= sizeof(ConnCacheKey
);
220 ctl
.entrysize
= sizeof(ConnCacheEntry
);
221 ConnectionHash
= hash_create("postgres_fdw connections", 8,
223 HASH_ELEM
| HASH_BLOBS
);
226 * Register some callback functions that manage connection cleanup.
227 * This should be done just once in each backend.
229 RegisterXactCallback(pgfdw_xact_callback
, NULL
);
230 RegisterSubXactCallback(pgfdw_subxact_callback
, NULL
);
231 CacheRegisterSyscacheCallback(FOREIGNSERVEROID
,
232 pgfdw_inval_callback
, (Datum
) 0);
233 CacheRegisterSyscacheCallback(USERMAPPINGOID
,
234 pgfdw_inval_callback
, (Datum
) 0);
237 /* Set flag that we did GetConnection during the current transaction */
238 xact_got_connection
= true;
240 /* Create hash key for the entry. Assume no pad bytes in key struct */
244 * Find or create cached entry for requested connection.
246 entry
= hash_search(ConnectionHash
, &key
, HASH_ENTER
, &found
);
250 * We need only clear "conn" here; remaining fields will be filled
251 * later when "conn" is set.
256 /* Reject further use of connections which failed abort cleanup. */
257 pgfdw_reject_incomplete_xact_state_change(entry
);
260 * If the connection needs to be remade due to invalidation, disconnect as
261 * soon as we're out of all transactions.
263 if (entry
->conn
!= NULL
&& entry
->invalidated
&& entry
->xact_depth
== 0)
265 elog(DEBUG3
, "closing connection %p for option changes to take effect",
267 disconnect_pg_server(entry
);
271 * If cache entry doesn't have a connection, we have to establish a new
272 * connection. (If connect_pg_server throws an error, the cache entry
273 * will remain in a valid empty state, ie conn == NULL.)
275 if (entry
->conn
== NULL
)
276 make_new_connection(entry
, user
);
279 * We check the health of the cached connection here when using it. In
280 * cases where we're out of all transactions, if a broken connection is
281 * detected, we try to reestablish a new connection later.
285 /* Process a pending asynchronous request if any. */
286 if (entry
->state
.pendingAreq
)
287 process_pending_request(entry
->state
.pendingAreq
);
288 /* Start a new transaction or subtransaction if needed. */
289 begin_remote_xact(entry
);
293 MemoryContext ecxt
= MemoryContextSwitchTo(ccxt
);
294 ErrorData
*errdata
= CopyErrorData();
297 * Determine whether to try to reestablish the connection.
299 * After a broken connection is detected in libpq, any error other
300 * than connection failure (e.g., out-of-memory) can be thrown
301 * somewhere between return from libpq and the expected ereport() call
302 * in pgfdw_report_error(). In this case, since PQstatus() indicates
303 * CONNECTION_BAD, checking only PQstatus() causes the false detection
304 * of connection failure. To avoid this, we also verify that the
305 * error's sqlstate is ERRCODE_CONNECTION_FAILURE. Note that also
306 * checking only the sqlstate can cause another false detection
307 * because pgfdw_report_error() may report ERRCODE_CONNECTION_FAILURE
308 * for any libpq-originated error condition.
310 if (errdata
->sqlerrcode
!= ERRCODE_CONNECTION_FAILURE
||
311 PQstatus(entry
->conn
) != CONNECTION_BAD
||
312 entry
->xact_depth
> 0)
314 MemoryContextSwitchTo(ecxt
);
318 /* Clean up the error state */
320 FreeErrorData(errdata
);
328 * If a broken connection is detected, disconnect it, reestablish a new
329 * connection and retry a new remote transaction. If connection failure is
330 * reported again, we give up getting a connection.
334 Assert(entry
->xact_depth
== 0);
337 (errmsg_internal("could not start remote transaction on connection %p",
339 errdetail_internal("%s", pchomp(PQerrorMessage(entry
->conn
))));
341 elog(DEBUG3
, "closing connection %p to reestablish a new one",
343 disconnect_pg_server(entry
);
345 make_new_connection(entry
, user
);
347 begin_remote_xact(entry
);
350 /* Remember if caller will prepare statements */
351 entry
->have_prep_stmt
|= will_prep_stmt
;
353 /* If caller needs access to the per-connection state, return it. */
355 *state
= &entry
->state
;
361 * Reset all transient state fields in the cached connection entry and
362 * establish new connection to the remote server.
365 make_new_connection(ConnCacheEntry
*entry
, UserMapping
*user
)
367 ForeignServer
*server
= GetForeignServer(user
->serverid
);
370 Assert(entry
->conn
== NULL
);
372 /* Reset all transient state fields, to be sure all are clean */
373 entry
->xact_depth
= 0;
374 entry
->have_prep_stmt
= false;
375 entry
->have_error
= false;
376 entry
->changing_xact_state
= false;
377 entry
->invalidated
= false;
378 entry
->serverid
= server
->serverid
;
379 entry
->server_hashvalue
=
380 GetSysCacheHashValue1(FOREIGNSERVEROID
,
381 ObjectIdGetDatum(server
->serverid
));
382 entry
->mapping_hashvalue
=
383 GetSysCacheHashValue1(USERMAPPINGOID
,
384 ObjectIdGetDatum(user
->umid
));
385 memset(&entry
->state
, 0, sizeof(entry
->state
));
388 * Determine whether to keep the connection that we're about to make here
389 * open even after the transaction using it ends, so that the subsequent
390 * transactions can re-use it.
392 * By default, all the connections to any foreign servers are kept open.
394 * Also determine whether to commit/abort (sub)transactions opened on the
395 * remote server in parallel at (sub)transaction end, which is disabled by
398 * Note: it's enough to determine these only when making a new connection
399 * because if these settings for it are changed, it will be closed and
402 entry
->keep_connections
= true;
403 entry
->parallel_commit
= false;
404 entry
->parallel_abort
= false;
405 foreach(lc
, server
->options
)
407 DefElem
*def
= (DefElem
*) lfirst(lc
);
409 if (strcmp(def
->defname
, "keep_connections") == 0)
410 entry
->keep_connections
= defGetBoolean(def
);
411 else if (strcmp(def
->defname
, "parallel_commit") == 0)
412 entry
->parallel_commit
= defGetBoolean(def
);
413 else if (strcmp(def
->defname
, "parallel_abort") == 0)
414 entry
->parallel_abort
= defGetBoolean(def
);
417 /* Now try to make the connection */
418 entry
->conn
= connect_pg_server(server
, user
);
420 elog(DEBUG3
, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
421 entry
->conn
, server
->servername
, user
->umid
, user
->userid
);
425 * Check that non-superuser has used password or delegated credentials
426 * to establish connection; otherwise, he's piggybacking on the
427 * postgres server's user identity. See also dblink_security_check()
428 * in contrib/dblink and check_conn_params.
431 pgfdw_security_check(const char **keywords
, const char **values
, UserMapping
*user
, PGconn
*conn
)
433 /* Superusers bypass the check */
434 if (superuser_arg(user
->userid
))
438 /* Connected via GSSAPI with delegated credentials- all good. */
439 if (PQconnectionUsedGSSAPI(conn
) && be_gssapi_get_delegation(MyProcPort
))
443 /* Ok if superuser set PW required false. */
444 if (!UserMappingPasswordRequired(user
))
447 /* Connected via PW, with PW required true, and provided non-empty PW. */
448 if (PQconnectionUsedPassword(conn
))
450 /* ok if params contain a non-empty password */
451 for (int i
= 0; keywords
[i
] != NULL
; i
++)
453 if (strcmp(keywords
[i
], "password") == 0 && values
[i
][0] != '\0')
459 (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED
),
460 errmsg("password or GSSAPI delegated credentials required"),
461 errdetail("Non-superuser cannot connect if the server does not request a password or use GSSAPI with delegated credentials."),
462 errhint("Target server's authentication method must be changed or password_required=false set in the user mapping attributes.")));
466 * Connect to remote server using specified server and user mapping properties.
469 connect_pg_server(ForeignServer
*server
, UserMapping
*user
)
471 PGconn
*volatile conn
= NULL
;
474 * Use PG_TRY block to ensure closing connection on error.
478 const char **keywords
;
480 char *appname
= NULL
;
484 * Construct connection params from generic options of ForeignServer
485 * and UserMapping. (Some of them might not be libpq options, in
486 * which case we'll just waste a few array slots.) Add 4 extra slots
487 * for application_name, fallback_application_name, client_encoding,
490 n
= list_length(server
->options
) + list_length(user
->options
) + 4 + 2;
491 keywords
= (const char **) palloc(n
* sizeof(char *));
492 values
= (const char **) palloc(n
* sizeof(char *));
495 n
+= ExtractConnectionOptions(server
->options
,
496 keywords
+ n
, values
+ n
);
497 n
+= ExtractConnectionOptions(user
->options
,
498 keywords
+ n
, values
+ n
);
501 * Use pgfdw_application_name as application_name if set.
503 * PQconnectdbParams() processes the parameter arrays from start to
504 * end. If any key word is repeated, the last value is used. Therefore
505 * note that pgfdw_application_name must be added to the arrays after
506 * options of ForeignServer are, so that it can override
507 * application_name set in ForeignServer.
509 if (pgfdw_application_name
&& *pgfdw_application_name
!= '\0')
511 keywords
[n
] = "application_name";
512 values
[n
] = pgfdw_application_name
;
517 * Search the parameter arrays to find application_name setting, and
518 * replace escape sequences in it with status information if found.
519 * The arrays are searched backwards because the last value is used if
520 * application_name is repeatedly set.
522 for (int i
= n
- 1; i
>= 0; i
--)
524 if (strcmp(keywords
[i
], "application_name") == 0 &&
525 *(values
[i
]) != '\0')
528 * Use this application_name setting if it's not empty string
529 * even after any escape sequences in it are replaced.
531 appname
= process_pgfdw_appname(values
[i
]);
532 if (appname
[0] != '\0')
539 * This empty application_name is not used, so we set
540 * values[i] to NULL and keep searching the array to find the
549 /* Use "postgres_fdw" as fallback_application_name */
550 keywords
[n
] = "fallback_application_name";
551 values
[n
] = "postgres_fdw";
554 /* Set client_encoding so that libpq can convert encoding properly. */
555 keywords
[n
] = "client_encoding";
556 values
[n
] = GetDatabaseEncodingName();
559 if (MyProcPort
->has_scram_keys
&& UseScramPassthrough(server
, user
))
564 keywords
[n
] = "scram_client_key";
565 len
= pg_b64_enc_len(sizeof(MyProcPort
->scram_ClientKey
));
566 /* don't forget the zero-terminator */
567 values
[n
] = palloc0(len
+ 1);
568 encoded_len
= pg_b64_encode((const char *) MyProcPort
->scram_ClientKey
,
569 sizeof(MyProcPort
->scram_ClientKey
),
570 (char *) values
[n
], len
);
572 elog(ERROR
, "could not encode SCRAM client key");
575 keywords
[n
] = "scram_server_key";
576 len
= pg_b64_enc_len(sizeof(MyProcPort
->scram_ServerKey
));
577 /* don't forget the zero-terminator */
578 values
[n
] = palloc0(len
+ 1);
579 encoded_len
= pg_b64_encode((const char *) MyProcPort
->scram_ServerKey
,
580 sizeof(MyProcPort
->scram_ServerKey
),
581 (char *) values
[n
], len
);
583 elog(ERROR
, "could not encode SCRAM server key");
587 keywords
[n
] = values
[n
] = NULL
;
590 * Verify the set of connection parameters only if scram pass-through
591 * is not being used because the password is not necessary.
593 if (!(MyProcPort
->has_scram_keys
&& UseScramPassthrough(server
, user
)))
594 check_conn_params(keywords
, values
, user
);
596 /* first time, allocate or get the custom wait event */
597 if (pgfdw_we_connect
== 0)
598 pgfdw_we_connect
= WaitEventExtensionNew("PostgresFdwConnect");
600 /* OK to make connection */
601 conn
= libpqsrv_connect_params(keywords
, values
,
602 false, /* expand_dbname */
605 if (!conn
|| PQstatus(conn
) != CONNECTION_OK
)
607 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION
),
608 errmsg("could not connect to server \"%s\"",
610 errdetail_internal("%s", pchomp(PQerrorMessage(conn
)))));
613 * Perform post-connection security checks only if scram pass-through
614 * is not being used because the password is not necessary.
616 if (!(MyProcPort
->has_scram_keys
&& UseScramPassthrough(server
, user
)))
617 pgfdw_security_check(keywords
, values
, user
, conn
);
619 /* Prepare new session for use */
620 configure_remote_session(conn
);
629 libpqsrv_disconnect(conn
);
638 * Disconnect any open connection for a connection cache entry.
641 disconnect_pg_server(ConnCacheEntry
*entry
)
643 if (entry
->conn
!= NULL
)
645 libpqsrv_disconnect(entry
->conn
);
651 * Return true if the password_required is defined and false for this user
652 * mapping, otherwise false. The mapping has been pre-validated.
655 UserMappingPasswordRequired(UserMapping
*user
)
659 foreach(cell
, user
->options
)
661 DefElem
*def
= (DefElem
*) lfirst(cell
);
663 if (strcmp(def
->defname
, "password_required") == 0)
664 return defGetBoolean(def
);
671 UseScramPassthrough(ForeignServer
*server
, UserMapping
*user
)
675 foreach(cell
, server
->options
)
677 DefElem
*def
= (DefElem
*) lfirst(cell
);
679 if (strcmp(def
->defname
, "use_scram_passthrough") == 0)
680 return defGetBoolean(def
);
683 foreach(cell
, user
->options
)
685 DefElem
*def
= (DefElem
*) lfirst(cell
);
687 if (strcmp(def
->defname
, "use_scram_passthrough") == 0)
688 return defGetBoolean(def
);
695 * For non-superusers, insist that the connstr specify a password or that the
696 * user provided their own GSSAPI delegated credentials. This
697 * prevents a password from being picked up from .pgpass, a service file, the
698 * environment, etc. We don't want the postgres user's passwords,
699 * certificates, etc to be accessible to non-superusers. (See also
700 * dblink_connstr_check in contrib/dblink.)
703 check_conn_params(const char **keywords
, const char **values
, UserMapping
*user
)
707 /* no check required if superuser */
708 if (superuser_arg(user
->userid
))
712 /* ok if the user provided their own delegated credentials */
713 if (be_gssapi_get_delegation(MyProcPort
))
717 /* ok if params contain a non-empty password */
718 for (i
= 0; keywords
[i
] != NULL
; i
++)
720 if (strcmp(keywords
[i
], "password") == 0 && values
[i
][0] != '\0')
724 /* ok if the superuser explicitly said so at user mapping creation time */
725 if (!UserMappingPasswordRequired(user
))
729 (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED
),
730 errmsg("password or GSSAPI delegated credentials required"),
731 errdetail("Non-superusers must delegate GSSAPI credentials, provide a password, or enable SCRAM pass-through in user mapping.")));
735 * Issue SET commands to make sure remote session is configured properly.
737 * We do this just once at connection, assuming nothing will change the
738 * values later. Since we'll never send volatile function calls to the
739 * remote, there shouldn't be any way to break this assumption from our end.
740 * It's possible to think of ways to break it at the remote end, eg making
741 * a foreign table point to a view that includes a set_config call ---
742 * but once you admit the possibility of a malicious view definition,
743 * there are any number of ways to break things.
746 configure_remote_session(PGconn
*conn
)
748 int remoteversion
= PQserverVersion(conn
);
750 /* Force the search path to contain only pg_catalog (see deparse.c) */
751 do_sql_command(conn
, "SET search_path = pg_catalog");
754 * Set remote timezone; this is basically just cosmetic, since all
755 * transmitted and returned timestamptzs should specify a zone explicitly
756 * anyway. However it makes the regression test outputs more predictable.
758 * We don't risk setting remote zone equal to ours, since the remote
759 * server might use a different timezone database. Instead, use GMT
760 * (quoted, because very old servers are picky about case). That's
761 * guaranteed to work regardless of the remote's timezone database,
762 * because pg_tzset() hard-wires it (at least in PG 9.2 and later).
764 do_sql_command(conn
, "SET timezone = 'GMT'");
767 * Set values needed to ensure unambiguous data output from remote. (This
768 * logic should match what pg_dump does. See also set_transmission_modes
769 * in postgres_fdw.c.)
771 do_sql_command(conn
, "SET datestyle = ISO");
772 if (remoteversion
>= 80400)
773 do_sql_command(conn
, "SET intervalstyle = postgres");
774 if (remoteversion
>= 90000)
775 do_sql_command(conn
, "SET extra_float_digits = 3");
777 do_sql_command(conn
, "SET extra_float_digits = 2");
781 * Convenience subroutine to issue a non-data-returning SQL command to remote
784 do_sql_command(PGconn
*conn
, const char *sql
)
786 do_sql_command_begin(conn
, sql
);
787 do_sql_command_end(conn
, sql
, false);
791 do_sql_command_begin(PGconn
*conn
, const char *sql
)
793 if (!PQsendQuery(conn
, sql
))
794 pgfdw_report_error(ERROR
, NULL
, conn
, false, sql
);
798 do_sql_command_end(PGconn
*conn
, const char *sql
, bool consume_input
)
803 * If requested, consume whatever data is available from the socket. (Note
804 * that if all data is available, this allows pgfdw_get_result to call
805 * PQgetResult without forcing the overhead of WaitLatchOrSocket, which
806 * would be large compared to the overhead of PQconsumeInput.)
808 if (consume_input
&& !PQconsumeInput(conn
))
809 pgfdw_report_error(ERROR
, NULL
, conn
, false, sql
);
810 res
= pgfdw_get_result(conn
);
811 if (PQresultStatus(res
) != PGRES_COMMAND_OK
)
812 pgfdw_report_error(ERROR
, res
, conn
, true, sql
);
817 * Start remote transaction or subtransaction, if needed.
819 * Note that we always use at least REPEATABLE READ in the remote session.
820 * This is so that, if a query initiates multiple scans of the same or
821 * different foreign tables, we will get snapshot-consistent results from
822 * those scans. A disadvantage is that we can't provide sane emulation of
823 * READ COMMITTED behavior --- it would be nice if we had some other way to
824 * control which remote queries share a snapshot.
827 begin_remote_xact(ConnCacheEntry
*entry
)
829 int curlevel
= GetCurrentTransactionNestLevel();
831 /* Start main transaction if we haven't yet */
832 if (entry
->xact_depth
<= 0)
836 elog(DEBUG3
, "starting remote transaction on connection %p",
839 if (IsolationIsSerializable())
840 sql
= "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
842 sql
= "START TRANSACTION ISOLATION LEVEL REPEATABLE READ";
843 entry
->changing_xact_state
= true;
844 do_sql_command(entry
->conn
, sql
);
845 entry
->xact_depth
= 1;
846 entry
->changing_xact_state
= false;
850 * If we're in a subtransaction, stack up savepoints to match our level.
851 * This ensures we can rollback just the desired effects when a
852 * subtransaction aborts.
854 while (entry
->xact_depth
< curlevel
)
858 snprintf(sql
, sizeof(sql
), "SAVEPOINT s%d", entry
->xact_depth
+ 1);
859 entry
->changing_xact_state
= true;
860 do_sql_command(entry
->conn
, sql
);
862 entry
->changing_xact_state
= false;
867 * Release connection reference count created by calling GetConnection.
870 ReleaseConnection(PGconn
*conn
)
873 * Currently, we don't actually track connection references because all
874 * cleanup is managed on a transaction or subtransaction basis instead. So
875 * there's nothing to do here.
880 * Assign a "unique" number for a cursor.
882 * These really only need to be unique per connection within a transaction.
883 * For the moment we ignore the per-connection point and assign them across
884 * all connections in the transaction, but we ask for the connection to be
885 * supplied in case we want to refine that.
887 * Note that even if wraparound happens in a very long transaction, actual
888 * collisions are highly improbable; just be sure to use %u not %d to print.
891 GetCursorNumber(PGconn
*conn
)
893 return ++cursor_number
;
897 * Assign a "unique" number for a prepared statement.
899 * This works much like GetCursorNumber, except that we never reset the counter
900 * within a session. That's because we can't be 100% sure we've gotten rid
901 * of all prepared statements on all connections, and it's not really worth
902 * increasing the risk of prepared-statement name collisions by resetting.
905 GetPrepStmtNumber(PGconn
*conn
)
907 return ++prep_stmt_number
;
911 * Submit a query and wait for the result.
913 * Since we don't use non-blocking mode, this can't process interrupts while
914 * pushing the query text to the server. That risk is relatively small, so we
915 * ignore that for now.
917 * Caller is responsible for the error handling on the result.
920 pgfdw_exec_query(PGconn
*conn
, const char *query
, PgFdwConnState
*state
)
922 /* First, process a pending asynchronous request, if any. */
923 if (state
&& state
->pendingAreq
)
924 process_pending_request(state
->pendingAreq
);
926 if (!PQsendQuery(conn
, query
))
928 return pgfdw_get_result(conn
);
932 * Wrap libpqsrv_get_result_last(), adding wait event.
934 * Caller is responsible for the error handling on the result.
937 pgfdw_get_result(PGconn
*conn
)
939 return libpqsrv_get_result_last(conn
, pgfdw_we_get_result
);
943 * Report an error we got from the remote server.
945 * elevel: error level to use (typically ERROR, but might be less)
946 * res: PGresult containing the error
947 * conn: connection we did the query on
948 * clear: if true, PQclear the result (otherwise caller will handle it)
949 * sql: NULL, or text of remote command we tried to execute
951 * Note: callers that choose not to throw ERROR for a remote error are
952 * responsible for making sure that the associated ConnCacheEntry gets
953 * marked with have_error = true.
956 pgfdw_report_error(int elevel
, PGresult
*res
, PGconn
*conn
,
957 bool clear
, const char *sql
)
959 /* If requested, PGresult must be released before leaving this function. */
962 char *diag_sqlstate
= PQresultErrorField(res
, PG_DIAG_SQLSTATE
);
963 char *message_primary
= PQresultErrorField(res
, PG_DIAG_MESSAGE_PRIMARY
);
964 char *message_detail
= PQresultErrorField(res
, PG_DIAG_MESSAGE_DETAIL
);
965 char *message_hint
= PQresultErrorField(res
, PG_DIAG_MESSAGE_HINT
);
966 char *message_context
= PQresultErrorField(res
, PG_DIAG_CONTEXT
);
970 sqlstate
= MAKE_SQLSTATE(diag_sqlstate
[0],
976 sqlstate
= ERRCODE_CONNECTION_FAILURE
;
979 * If we don't get a message from the PGresult, try the PGconn. This
980 * is needed because for connection-level failures, PQgetResult may
981 * just return NULL, not a PGresult at all.
983 if (message_primary
== NULL
)
984 message_primary
= pchomp(PQerrorMessage(conn
));
988 (message_primary
!= NULL
&& message_primary
[0] != '\0') ?
989 errmsg_internal("%s", message_primary
) :
990 errmsg("could not obtain message string for remote error"),
991 message_detail
? errdetail_internal("%s", message_detail
) : 0,
992 message_hint
? errhint("%s", message_hint
) : 0,
993 message_context
? errcontext("%s", message_context
) : 0,
994 sql
? errcontext("remote SQL command: %s", sql
) : 0));
1005 * pgfdw_xact_callback --- cleanup at main-transaction end.
1007 * This runs just late enough that it must not enter user-defined code
1008 * locally. (Entering such code on the remote side is fine. Its remote
1009 * COMMIT TRANSACTION may run deferred triggers.)
1012 pgfdw_xact_callback(XactEvent event
, void *arg
)
1014 HASH_SEQ_STATUS scan
;
1015 ConnCacheEntry
*entry
;
1016 List
*pending_entries
= NIL
;
1017 List
*cancel_requested
= NIL
;
1019 /* Quick exit if no connections were touched in this transaction. */
1020 if (!xact_got_connection
)
1024 * Scan all connection cache entries to find open remote transactions, and
1027 hash_seq_init(&scan
, ConnectionHash
);
1028 while ((entry
= (ConnCacheEntry
*) hash_seq_search(&scan
)))
1032 /* Ignore cache entry if no open connection right now */
1033 if (entry
->conn
== NULL
)
1036 /* If it has an open remote transaction, try to close it */
1037 if (entry
->xact_depth
> 0)
1039 elog(DEBUG3
, "closing remote transaction on connection %p",
1044 case XACT_EVENT_PARALLEL_PRE_COMMIT
:
1045 case XACT_EVENT_PRE_COMMIT
:
1048 * If abort cleanup previously failed for this connection,
1049 * we can't issue any more commands against it.
1051 pgfdw_reject_incomplete_xact_state_change(entry
);
1053 /* Commit all remote transactions during pre-commit */
1054 entry
->changing_xact_state
= true;
1055 if (entry
->parallel_commit
)
1057 do_sql_command_begin(entry
->conn
, "COMMIT TRANSACTION");
1058 pending_entries
= lappend(pending_entries
, entry
);
1061 do_sql_command(entry
->conn
, "COMMIT TRANSACTION");
1062 entry
->changing_xact_state
= false;
1065 * If there were any errors in subtransactions, and we
1066 * made prepared statements, do a DEALLOCATE ALL to make
1067 * sure we get rid of all prepared statements. This is
1068 * annoying and not terribly bulletproof, but it's
1069 * probably not worth trying harder.
1071 * DEALLOCATE ALL only exists in 8.3 and later, so this
1072 * constrains how old a server postgres_fdw can
1073 * communicate with. We intentionally ignore errors in
1074 * the DEALLOCATE, so that we can hobble along to some
1075 * extent with older servers (leaking prepared statements
1076 * as we go; but we don't really support update operations
1079 if (entry
->have_prep_stmt
&& entry
->have_error
)
1081 res
= pgfdw_exec_query(entry
->conn
, "DEALLOCATE ALL",
1085 entry
->have_prep_stmt
= false;
1086 entry
->have_error
= false;
1088 case XACT_EVENT_PRE_PREPARE
:
1091 * We disallow any remote transactions, since it's not
1092 * very reasonable to hold them open until the prepared
1093 * transaction is committed. For the moment, throw error
1094 * unconditionally; later we might allow read-only cases.
1095 * Note that the error will cause us to come right back
1096 * here with event == XACT_EVENT_ABORT, so we'll clean up
1097 * the connection state at that point.
1100 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED
),
1101 errmsg("cannot PREPARE a transaction that has operated on postgres_fdw foreign tables")));
1103 case XACT_EVENT_PARALLEL_COMMIT
:
1104 case XACT_EVENT_COMMIT
:
1105 case XACT_EVENT_PREPARE
:
1106 /* Pre-commit should have closed the open transaction */
1107 elog(ERROR
, "missed cleaning up connection during pre-commit");
1109 case XACT_EVENT_PARALLEL_ABORT
:
1110 case XACT_EVENT_ABORT
:
1111 /* Rollback all remote transactions during abort */
1112 if (entry
->parallel_abort
)
1114 if (pgfdw_abort_cleanup_begin(entry
, true,
1120 pgfdw_abort_cleanup(entry
, true);
1125 /* Reset state to show we're out of a transaction */
1126 pgfdw_reset_xact_state(entry
, true);
1129 /* If there are any pending connections, finish cleaning them up */
1130 if (pending_entries
|| cancel_requested
)
1132 if (event
== XACT_EVENT_PARALLEL_PRE_COMMIT
||
1133 event
== XACT_EVENT_PRE_COMMIT
)
1135 Assert(cancel_requested
== NIL
);
1136 pgfdw_finish_pre_commit_cleanup(pending_entries
);
1140 Assert(event
== XACT_EVENT_PARALLEL_ABORT
||
1141 event
== XACT_EVENT_ABORT
);
1142 pgfdw_finish_abort_cleanup(pending_entries
, cancel_requested
,
1148 * Regardless of the event type, we can now mark ourselves as out of the
1149 * transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
1150 * this saves a useless scan of the hashtable during COMMIT or PREPARE.)
1152 xact_got_connection
= false;
1154 /* Also reset cursor numbering for next transaction */
1159 * pgfdw_subxact_callback --- cleanup at subtransaction end.
1162 pgfdw_subxact_callback(SubXactEvent event
, SubTransactionId mySubid
,
1163 SubTransactionId parentSubid
, void *arg
)
1165 HASH_SEQ_STATUS scan
;
1166 ConnCacheEntry
*entry
;
1168 List
*pending_entries
= NIL
;
1169 List
*cancel_requested
= NIL
;
1171 /* Nothing to do at subxact start, nor after commit. */
1172 if (!(event
== SUBXACT_EVENT_PRE_COMMIT_SUB
||
1173 event
== SUBXACT_EVENT_ABORT_SUB
))
1176 /* Quick exit if no connections were touched in this transaction. */
1177 if (!xact_got_connection
)
1181 * Scan all connection cache entries to find open remote subtransactions
1182 * of the current level, and close them.
1184 curlevel
= GetCurrentTransactionNestLevel();
1185 hash_seq_init(&scan
, ConnectionHash
);
1186 while ((entry
= (ConnCacheEntry
*) hash_seq_search(&scan
)))
1191 * We only care about connections with open remote subtransactions of
1192 * the current level.
1194 if (entry
->conn
== NULL
|| entry
->xact_depth
< curlevel
)
1197 if (entry
->xact_depth
> curlevel
)
1198 elog(ERROR
, "missed cleaning up remote subtransaction at level %d",
1201 if (event
== SUBXACT_EVENT_PRE_COMMIT_SUB
)
1204 * If abort cleanup previously failed for this connection, we
1205 * can't issue any more commands against it.
1207 pgfdw_reject_incomplete_xact_state_change(entry
);
1209 /* Commit all remote subtransactions during pre-commit */
1210 snprintf(sql
, sizeof(sql
), "RELEASE SAVEPOINT s%d", curlevel
);
1211 entry
->changing_xact_state
= true;
1212 if (entry
->parallel_commit
)
1214 do_sql_command_begin(entry
->conn
, sql
);
1215 pending_entries
= lappend(pending_entries
, entry
);
1218 do_sql_command(entry
->conn
, sql
);
1219 entry
->changing_xact_state
= false;
1223 /* Rollback all remote subtransactions during abort */
1224 if (entry
->parallel_abort
)
1226 if (pgfdw_abort_cleanup_begin(entry
, false,
1232 pgfdw_abort_cleanup(entry
, false);
1235 /* OK, we're outta that level of subtransaction */
1236 pgfdw_reset_xact_state(entry
, false);
1239 /* If there are any pending connections, finish cleaning them up */
1240 if (pending_entries
|| cancel_requested
)
1242 if (event
== SUBXACT_EVENT_PRE_COMMIT_SUB
)
1244 Assert(cancel_requested
== NIL
);
1245 pgfdw_finish_pre_subcommit_cleanup(pending_entries
, curlevel
);
1249 Assert(event
== SUBXACT_EVENT_ABORT_SUB
);
1250 pgfdw_finish_abort_cleanup(pending_entries
, cancel_requested
,
1257 * Connection invalidation callback function
1259 * After a change to a pg_foreign_server or pg_user_mapping catalog entry,
1260 * close connections depending on that entry immediately if current transaction
1261 * has not used those connections yet. Otherwise, mark those connections as
1262 * invalid and then make pgfdw_xact_callback() close them at the end of current
1263 * transaction, since they cannot be closed in the midst of the transaction
1264 * using them. Closed connections will be remade at the next opportunity if
1267 * Although most cache invalidation callbacks blow away all the related stuff
1268 * regardless of the given hashvalue, connections are expensive enough that
1269 * it's worth trying to avoid that.
1271 * NB: We could avoid unnecessary disconnection more strictly by examining
1272 * individual option values, but it seems too much effort for the gain.
1275 pgfdw_inval_callback(Datum arg
, int cacheid
, uint32 hashvalue
)
1277 HASH_SEQ_STATUS scan
;
1278 ConnCacheEntry
*entry
;
1280 Assert(cacheid
== FOREIGNSERVEROID
|| cacheid
== USERMAPPINGOID
);
1282 /* ConnectionHash must exist already, if we're registered */
1283 hash_seq_init(&scan
, ConnectionHash
);
1284 while ((entry
= (ConnCacheEntry
*) hash_seq_search(&scan
)))
1286 /* Ignore invalid entries */
1287 if (entry
->conn
== NULL
)
1290 /* hashvalue == 0 means a cache reset, must clear all state */
1291 if (hashvalue
== 0 ||
1292 (cacheid
== FOREIGNSERVEROID
&&
1293 entry
->server_hashvalue
== hashvalue
) ||
1294 (cacheid
== USERMAPPINGOID
&&
1295 entry
->mapping_hashvalue
== hashvalue
))
1298 * Close the connection immediately if it's not used yet in this
1299 * transaction. Otherwise mark it as invalid so that
1300 * pgfdw_xact_callback() can close it at the end of this
1303 if (entry
->xact_depth
== 0)
1305 elog(DEBUG3
, "discarding connection %p", entry
->conn
);
1306 disconnect_pg_server(entry
);
1309 entry
->invalidated
= true;
1315 * Raise an error if the given connection cache entry is marked as being
1316 * in the middle of an xact state change. This should be called at which no
1317 * such change is expected to be in progress; if one is found to be in
1318 * progress, it means that we aborted in the middle of a previous state change
1319 * and now don't know what the remote transaction state actually is.
1320 * Such connections can't safely be further used. Re-establishing the
1321 * connection would change the snapshot and roll back any writes already
1322 * performed, so that's not an option, either. Thus, we must abort.
1325 pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry
*entry
)
1327 ForeignServer
*server
;
1329 /* nothing to do for inactive entries and entries of sane state */
1330 if (entry
->conn
== NULL
|| !entry
->changing_xact_state
)
1333 /* make sure this entry is inactive */
1334 disconnect_pg_server(entry
);
1336 /* find server name to be shown in the message below */
1337 server
= GetForeignServer(entry
->serverid
);
1340 (errcode(ERRCODE_CONNECTION_EXCEPTION
),
1341 errmsg("connection to server \"%s\" was lost",
1342 server
->servername
)));
1346 * Reset state to show we're out of a (sub)transaction.
1349 pgfdw_reset_xact_state(ConnCacheEntry
*entry
, bool toplevel
)
1353 /* Reset state to show we're out of a transaction */
1354 entry
->xact_depth
= 0;
1357 * If the connection isn't in a good idle state, it is marked as
1358 * invalid or keep_connections option of its server is disabled, then
1359 * discard it to recover. Next GetConnection will open a new
1362 if (PQstatus(entry
->conn
) != CONNECTION_OK
||
1363 PQtransactionStatus(entry
->conn
) != PQTRANS_IDLE
||
1364 entry
->changing_xact_state
||
1365 entry
->invalidated
||
1366 !entry
->keep_connections
)
1368 elog(DEBUG3
, "discarding connection %p", entry
->conn
);
1369 disconnect_pg_server(entry
);
1374 /* Reset state to show we're out of a subtransaction */
1375 entry
->xact_depth
--;
1380 * Cancel the currently-in-progress query (whose query text we do not have)
1381 * and ignore the result. Returns true if we successfully cancel the query
1382 * and discard any pending result, and false if not.
1384 * It's not a huge problem if we throw an ERROR here, but if we get into error
1385 * recursion trouble, we'll end up slamming the connection shut, which will
1386 * necessitate failing the entire toplevel transaction even if subtransactions
1387 * were used. Try to use WARNING where we can.
1389 * XXX: if the query was one sent by fetch_more_data_begin(), we could get the
1390 * query text from the pendingAreq saved in the per-connection state, then
1391 * report the query using it.
1394 pgfdw_cancel_query(PGconn
*conn
)
1396 TimestampTz now
= GetCurrentTimestamp();
1397 TimestampTz endtime
;
1398 TimestampTz retrycanceltime
;
1401 * If it takes too long to cancel the query and discard the result, assume
1402 * the connection is dead.
1404 endtime
= TimestampTzPlusMilliseconds(now
, CONNECTION_CLEANUP_TIMEOUT
);
1407 * Also, lose patience and re-issue the cancel request after a little bit.
1408 * (This serves to close some race conditions.)
1410 retrycanceltime
= TimestampTzPlusMilliseconds(now
, RETRY_CANCEL_TIMEOUT
);
1412 if (!pgfdw_cancel_query_begin(conn
, endtime
))
1414 return pgfdw_cancel_query_end(conn
, endtime
, retrycanceltime
, false);
1418 * Submit a cancel request to the given connection, waiting only until
1421 * We sleep interruptibly until we receive confirmation that the cancel
1422 * request has been accepted, and if it is, return true; if the timeout
1423 * lapses without that, or the request fails for whatever reason, return
1427 pgfdw_cancel_query_begin(PGconn
*conn
, TimestampTz endtime
)
1429 const char *errormsg
= libpqsrv_cancel(conn
, endtime
);
1431 if (errormsg
!= NULL
)
1433 errcode(ERRCODE_CONNECTION_FAILURE
),
1434 errmsg("could not send cancel request: %s", errormsg
));
1436 return errormsg
== NULL
;
1440 pgfdw_cancel_query_end(PGconn
*conn
, TimestampTz endtime
,
1441 TimestampTz retrycanceltime
, bool consume_input
)
1447 * If requested, consume whatever data is available from the socket. (Note
1448 * that if all data is available, this allows pgfdw_get_cleanup_result to
1449 * call PQgetResult without forcing the overhead of WaitLatchOrSocket,
1450 * which would be large compared to the overhead of PQconsumeInput.)
1452 if (consume_input
&& !PQconsumeInput(conn
))
1455 (errcode(ERRCODE_CONNECTION_FAILURE
),
1456 errmsg("could not get result of cancel request: %s",
1457 pchomp(PQerrorMessage(conn
)))));
1461 /* Get and discard the result of the query. */
1462 if (pgfdw_get_cleanup_result(conn
, endtime
, retrycanceltime
,
1463 &result
, &timed_out
))
1467 (errmsg("could not get result of cancel request due to timeout")));
1470 (errcode(ERRCODE_CONNECTION_FAILURE
),
1471 errmsg("could not get result of cancel request: %s",
1472 pchomp(PQerrorMessage(conn
)))));
1482 * Submit a query during (sub)abort cleanup and wait up to 30 seconds for the
1483 * result. If the query is executed without error, the return value is true.
1484 * If the query is executed successfully but returns an error, the return
1485 * value is true if and only if ignore_errors is set. If the query can't be
1486 * sent or times out, the return value is false.
1488 * It's not a huge problem if we throw an ERROR here, but if we get into error
1489 * recursion trouble, we'll end up slamming the connection shut, which will
1490 * necessitate failing the entire toplevel transaction even if subtransactions
1491 * were used. Try to use WARNING where we can.
1494 pgfdw_exec_cleanup_query(PGconn
*conn
, const char *query
, bool ignore_errors
)
1496 TimestampTz endtime
;
1499 * If it takes too long to execute a cleanup query, assume the connection
1500 * is dead. It's fairly likely that this is why we aborted in the first
1501 * place (e.g. statement timeout, user cancel), so the timeout shouldn't
1504 endtime
= TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
1505 CONNECTION_CLEANUP_TIMEOUT
);
1507 if (!pgfdw_exec_cleanup_query_begin(conn
, query
))
1509 return pgfdw_exec_cleanup_query_end(conn
, query
, endtime
,
1510 false, ignore_errors
);
1514 pgfdw_exec_cleanup_query_begin(PGconn
*conn
, const char *query
)
1516 Assert(query
!= NULL
);
1519 * Submit a query. Since we don't use non-blocking mode, this also can
1520 * block. But its risk is relatively small, so we ignore that for now.
1522 if (!PQsendQuery(conn
, query
))
1524 pgfdw_report_error(WARNING
, NULL
, conn
, false, query
);
1532 pgfdw_exec_cleanup_query_end(PGconn
*conn
, const char *query
,
1533 TimestampTz endtime
, bool consume_input
,
1539 Assert(query
!= NULL
);
1542 * If requested, consume whatever data is available from the socket. (Note
1543 * that if all data is available, this allows pgfdw_get_cleanup_result to
1544 * call PQgetResult without forcing the overhead of WaitLatchOrSocket,
1545 * which would be large compared to the overhead of PQconsumeInput.)
1547 if (consume_input
&& !PQconsumeInput(conn
))
1549 pgfdw_report_error(WARNING
, NULL
, conn
, false, query
);
1553 /* Get the result of the query. */
1554 if (pgfdw_get_cleanup_result(conn
, endtime
, endtime
, &result
, &timed_out
))
1558 (errmsg("could not get query result due to timeout"),
1559 errcontext("remote SQL command: %s", query
)));
1561 pgfdw_report_error(WARNING
, NULL
, conn
, false, query
);
1566 /* Issue a warning if not successful. */
1567 if (PQresultStatus(result
) != PGRES_COMMAND_OK
)
1569 pgfdw_report_error(WARNING
, result
, conn
, true, query
);
1570 return ignore_errors
;
1578 * Get, during abort cleanup, the result of a query that is in progress.
1579 * This might be a query that is being interrupted by a cancel request or by
1580 * transaction abort, or it might be a query that was initiated as part of
1581 * transaction abort to get the remote side back to the appropriate state.
1583 * endtime is the time at which we should give up and assume the remote side
1584 * is dead. retrycanceltime is the time at which we should issue a fresh
1585 * cancel request (pass the same value as endtime if this is not wanted).
1587 * Returns true if the timeout expired or connection trouble occurred,
1588 * false otherwise. Sets *result except in case of a true result.
1589 * Sets *timed_out to true only when the timeout expired.
1592 pgfdw_get_cleanup_result(PGconn
*conn
, TimestampTz endtime
,
1593 TimestampTz retrycanceltime
,
1597 volatile bool failed
= false;
1598 PGresult
*volatile last_res
= NULL
;
1603 /* In what follows, do not leak any PGresults on an error. */
1606 int canceldelta
= RETRY_CANCEL_TIMEOUT
* 2;
1612 while (PQisBusy(conn
))
1615 TimestampTz now
= GetCurrentTimestamp();
1618 /* If timeout has expired, give up. */
1626 /* If we need to re-issue the cancel request, do that. */
1627 if (now
>= retrycanceltime
)
1629 /* We ignore failure to issue the repeated request. */
1630 (void) libpqsrv_cancel(conn
, endtime
);
1632 /* Recompute "now" in case that took measurable time. */
1633 now
= GetCurrentTimestamp();
1635 /* Adjust re-cancel timeout in increasing steps. */
1636 retrycanceltime
= TimestampTzPlusMilliseconds(now
,
1638 canceldelta
+= canceldelta
;
1641 /* If timeout has expired, give up, else get sleep time. */
1642 cur_timeout
= TimestampDifferenceMilliseconds(now
,
1645 if (cur_timeout
<= 0)
1652 /* first time, allocate or get the custom wait event */
1653 if (pgfdw_we_cleanup_result
== 0)
1654 pgfdw_we_cleanup_result
= WaitEventExtensionNew("PostgresFdwCleanupResult");
1656 /* Sleep until there's something to do */
1657 wc
= WaitLatchOrSocket(MyLatch
,
1658 WL_LATCH_SET
| WL_SOCKET_READABLE
|
1659 WL_TIMEOUT
| WL_EXIT_ON_PM_DEATH
,
1661 cur_timeout
, pgfdw_we_cleanup_result
);
1662 ResetLatch(MyLatch
);
1664 CHECK_FOR_INTERRUPTS();
1666 /* Data available in socket? */
1667 if (wc
& WL_SOCKET_READABLE
)
1669 if (!PQconsumeInput(conn
))
1671 /* connection trouble */
1678 res
= PQgetResult(conn
);
1680 break; /* query is complete */
1702 * Abort remote transaction or subtransaction.
1704 * "toplevel" should be set to true if toplevel (main) transaction is
1705 * rollbacked, false otherwise.
1707 * Set entry->changing_xact_state to false on success, true on failure.
1710 pgfdw_abort_cleanup(ConnCacheEntry
*entry
, bool toplevel
)
1715 * Don't try to clean up the connection if we're already in error
1716 * recursion trouble.
1718 if (in_error_recursion_trouble())
1719 entry
->changing_xact_state
= true;
1722 * If connection is already unsalvageable, don't touch it further.
1724 if (entry
->changing_xact_state
)
1728 * Mark this connection as in the process of changing transaction state.
1730 entry
->changing_xact_state
= true;
1732 /* Assume we might have lost track of prepared statements */
1733 entry
->have_error
= true;
1736 * If a command has been submitted to the remote server by using an
1737 * asynchronous execution function, the command might not have yet
1738 * completed. Check to see if a command is still being processed by the
1739 * remote server, and if so, request cancellation of the command.
1741 if (PQtransactionStatus(entry
->conn
) == PQTRANS_ACTIVE
&&
1742 !pgfdw_cancel_query(entry
->conn
))
1743 return; /* Unable to cancel running query */
1745 CONSTRUCT_ABORT_COMMAND(sql
, entry
, toplevel
);
1746 if (!pgfdw_exec_cleanup_query(entry
->conn
, sql
, false))
1747 return; /* Unable to abort remote (sub)transaction */
1751 if (entry
->have_prep_stmt
&& entry
->have_error
&&
1752 !pgfdw_exec_cleanup_query(entry
->conn
,
1755 return; /* Trouble clearing prepared statements */
1757 entry
->have_prep_stmt
= false;
1758 entry
->have_error
= false;
1762 * If pendingAreq of the per-connection state is not NULL, it means that
1763 * an asynchronous fetch begun by fetch_more_data_begin() was not done
1764 * successfully and thus the per-connection state was not reset in
1765 * fetch_more_data(); in that case reset the per-connection state here.
1767 if (entry
->state
.pendingAreq
)
1768 memset(&entry
->state
, 0, sizeof(entry
->state
));
1770 /* Disarm changing_xact_state if it all worked */
1771 entry
->changing_xact_state
= false;
1775 * Like pgfdw_abort_cleanup, submit an abort command or cancel request, but
1776 * don't wait for the result.
1778 * Returns true if the abort command or cancel request is successfully issued,
1779 * false otherwise. If the abort command is successfully issued, the given
1780 * connection cache entry is appended to *pending_entries. Otherwise, if the
1781 * cancel request is successfully issued, it is appended to *cancel_requested.
1784 pgfdw_abort_cleanup_begin(ConnCacheEntry
*entry
, bool toplevel
,
1785 List
**pending_entries
, List
**cancel_requested
)
1788 * Don't try to clean up the connection if we're already in error
1789 * recursion trouble.
1791 if (in_error_recursion_trouble())
1792 entry
->changing_xact_state
= true;
1795 * If connection is already unsalvageable, don't touch it further.
1797 if (entry
->changing_xact_state
)
1801 * Mark this connection as in the process of changing transaction state.
1803 entry
->changing_xact_state
= true;
1805 /* Assume we might have lost track of prepared statements */
1806 entry
->have_error
= true;
1809 * If a command has been submitted to the remote server by using an
1810 * asynchronous execution function, the command might not have yet
1811 * completed. Check to see if a command is still being processed by the
1812 * remote server, and if so, request cancellation of the command.
1814 if (PQtransactionStatus(entry
->conn
) == PQTRANS_ACTIVE
)
1816 TimestampTz endtime
;
1818 endtime
= TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
1819 CONNECTION_CLEANUP_TIMEOUT
);
1820 if (!pgfdw_cancel_query_begin(entry
->conn
, endtime
))
1821 return false; /* Unable to cancel running query */
1822 *cancel_requested
= lappend(*cancel_requested
, entry
);
1828 CONSTRUCT_ABORT_COMMAND(sql
, entry
, toplevel
);
1829 if (!pgfdw_exec_cleanup_query_begin(entry
->conn
, sql
))
1830 return false; /* Unable to abort remote transaction */
1831 *pending_entries
= lappend(*pending_entries
, entry
);
1838 * Finish pre-commit cleanup of connections on each of which we've sent a
1839 * COMMIT command to the remote server.
1842 pgfdw_finish_pre_commit_cleanup(List
*pending_entries
)
1844 ConnCacheEntry
*entry
;
1845 List
*pending_deallocs
= NIL
;
1848 Assert(pending_entries
);
1851 * Get the result of the COMMIT command for each of the pending entries
1853 foreach(lc
, pending_entries
)
1855 entry
= (ConnCacheEntry
*) lfirst(lc
);
1857 Assert(entry
->changing_xact_state
);
1860 * We might already have received the result on the socket, so pass
1861 * consume_input=true to try to consume it first
1863 do_sql_command_end(entry
->conn
, "COMMIT TRANSACTION", true);
1864 entry
->changing_xact_state
= false;
1866 /* Do a DEALLOCATE ALL in parallel if needed */
1867 if (entry
->have_prep_stmt
&& entry
->have_error
)
1869 /* Ignore errors (see notes in pgfdw_xact_callback) */
1870 if (PQsendQuery(entry
->conn
, "DEALLOCATE ALL"))
1872 pending_deallocs
= lappend(pending_deallocs
, entry
);
1876 entry
->have_prep_stmt
= false;
1877 entry
->have_error
= false;
1879 pgfdw_reset_xact_state(entry
, true);
1882 /* No further work if no pending entries */
1883 if (!pending_deallocs
)
1887 * Get the result of the DEALLOCATE command for each of the pending
1890 foreach(lc
, pending_deallocs
)
1894 entry
= (ConnCacheEntry
*) lfirst(lc
);
1896 /* Ignore errors (see notes in pgfdw_xact_callback) */
1897 while ((res
= PQgetResult(entry
->conn
)) != NULL
)
1900 /* Stop if the connection is lost (else we'll loop infinitely) */
1901 if (PQstatus(entry
->conn
) == CONNECTION_BAD
)
1904 entry
->have_prep_stmt
= false;
1905 entry
->have_error
= false;
1907 pgfdw_reset_xact_state(entry
, true);
1912 * Finish pre-subcommit cleanup of connections on each of which we've sent a
1913 * RELEASE command to the remote server.
1916 pgfdw_finish_pre_subcommit_cleanup(List
*pending_entries
, int curlevel
)
1918 ConnCacheEntry
*entry
;
1922 Assert(pending_entries
);
1925 * Get the result of the RELEASE command for each of the pending entries
1927 snprintf(sql
, sizeof(sql
), "RELEASE SAVEPOINT s%d", curlevel
);
1928 foreach(lc
, pending_entries
)
1930 entry
= (ConnCacheEntry
*) lfirst(lc
);
1932 Assert(entry
->changing_xact_state
);
1935 * We might already have received the result on the socket, so pass
1936 * consume_input=true to try to consume it first
1938 do_sql_command_end(entry
->conn
, sql
, true);
1939 entry
->changing_xact_state
= false;
1941 pgfdw_reset_xact_state(entry
, false);
1946 * Finish abort cleanup of connections on each of which we've sent an abort
1947 * command or cancel request to the remote server.
1950 pgfdw_finish_abort_cleanup(List
*pending_entries
, List
*cancel_requested
,
1953 List
*pending_deallocs
= NIL
;
1957 * For each of the pending cancel requests (if any), get and discard the
1958 * result of the query, and submit an abort command to the remote server.
1960 if (cancel_requested
)
1962 foreach(lc
, cancel_requested
)
1964 ConnCacheEntry
*entry
= (ConnCacheEntry
*) lfirst(lc
);
1965 TimestampTz now
= GetCurrentTimestamp();
1966 TimestampTz endtime
;
1967 TimestampTz retrycanceltime
;
1970 Assert(entry
->changing_xact_state
);
1973 * Set end time. You might think we should do this before issuing
1974 * cancel request like in normal mode, but that is problematic,
1975 * because if, for example, it took longer than 30 seconds to
1976 * process the first few entries in the cancel_requested list, it
1977 * would cause a timeout error when processing each of the
1978 * remaining entries in the list, leading to slamming that entry's
1981 endtime
= TimestampTzPlusMilliseconds(now
,
1982 CONNECTION_CLEANUP_TIMEOUT
);
1983 retrycanceltime
= TimestampTzPlusMilliseconds(now
,
1984 RETRY_CANCEL_TIMEOUT
);
1986 if (!pgfdw_cancel_query_end(entry
->conn
, endtime
,
1987 retrycanceltime
, true))
1989 /* Unable to cancel running query */
1990 pgfdw_reset_xact_state(entry
, toplevel
);
1994 /* Send an abort command in parallel if needed */
1995 CONSTRUCT_ABORT_COMMAND(sql
, entry
, toplevel
);
1996 if (!pgfdw_exec_cleanup_query_begin(entry
->conn
, sql
))
1998 /* Unable to abort remote (sub)transaction */
1999 pgfdw_reset_xact_state(entry
, toplevel
);
2002 pending_entries
= lappend(pending_entries
, entry
);
2006 /* No further work if no pending entries */
2007 if (!pending_entries
)
2011 * Get the result of the abort command for each of the pending entries
2013 foreach(lc
, pending_entries
)
2015 ConnCacheEntry
*entry
= (ConnCacheEntry
*) lfirst(lc
);
2016 TimestampTz endtime
;
2019 Assert(entry
->changing_xact_state
);
2022 * Set end time. We do this now, not before issuing the command like
2023 * in normal mode, for the same reason as for the cancel_requested
2026 endtime
= TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
2027 CONNECTION_CLEANUP_TIMEOUT
);
2029 CONSTRUCT_ABORT_COMMAND(sql
, entry
, toplevel
);
2030 if (!pgfdw_exec_cleanup_query_end(entry
->conn
, sql
, endtime
,
2033 /* Unable to abort remote (sub)transaction */
2034 pgfdw_reset_xact_state(entry
, toplevel
);
2040 /* Do a DEALLOCATE ALL in parallel if needed */
2041 if (entry
->have_prep_stmt
&& entry
->have_error
)
2043 if (!pgfdw_exec_cleanup_query_begin(entry
->conn
,
2046 /* Trouble clearing prepared statements */
2047 pgfdw_reset_xact_state(entry
, toplevel
);
2050 pending_deallocs
= lappend(pending_deallocs
, entry
);
2053 entry
->have_prep_stmt
= false;
2054 entry
->have_error
= false;
2057 /* Reset the per-connection state if needed */
2058 if (entry
->state
.pendingAreq
)
2059 memset(&entry
->state
, 0, sizeof(entry
->state
));
2061 /* We're done with this entry; unset the changing_xact_state flag */
2062 entry
->changing_xact_state
= false;
2063 pgfdw_reset_xact_state(entry
, toplevel
);
2066 /* No further work if no pending entries */
2067 if (!pending_deallocs
)
2072 * Get the result of the DEALLOCATE command for each of the pending
2075 foreach(lc
, pending_deallocs
)
2077 ConnCacheEntry
*entry
= (ConnCacheEntry
*) lfirst(lc
);
2078 TimestampTz endtime
;
2080 Assert(entry
->changing_xact_state
);
2081 Assert(entry
->have_prep_stmt
);
2082 Assert(entry
->have_error
);
2085 * Set end time. We do this now, not before issuing the command like
2086 * in normal mode, for the same reason as for the cancel_requested
2089 endtime
= TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
2090 CONNECTION_CLEANUP_TIMEOUT
);
2092 if (!pgfdw_exec_cleanup_query_end(entry
->conn
, "DEALLOCATE ALL",
2093 endtime
, true, true))
2095 /* Trouble clearing prepared statements */
2096 pgfdw_reset_xact_state(entry
, toplevel
);
2099 entry
->have_prep_stmt
= false;
2100 entry
->have_error
= false;
2102 /* Reset the per-connection state if needed */
2103 if (entry
->state
.pendingAreq
)
2104 memset(&entry
->state
, 0, sizeof(entry
->state
));
2106 /* We're done with this entry; unset the changing_xact_state flag */
2107 entry
->changing_xact_state
= false;
2108 pgfdw_reset_xact_state(entry
, toplevel
);
/* Number of output arguments (columns) for various API versions */
#define POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_1	2
#define POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_2	5
#define POSTGRES_FDW_GET_CONNECTIONS_COLS	5	/* maximum of above */
2118 * Internal function used by postgres_fdw_get_connections variants.
2120 * For API version 1.1, this function takes no input parameter and
2121 * returns a set of records with the following values:
2123 * - server_name - server name of active connection. In case the foreign server
2124 * is dropped but still the connection is active, then the server name will
2125 * be NULL in output.
2126 * - valid - true/false representing whether the connection is valid or not.
2127 * Note that connections can become invalid in pgfdw_inval_callback.
2129 * For API version 1.2 and later, this function takes an input parameter
2130 * to check a connection status and returns the following
2131 * additional values along with the three values from version 1.1:
2133 * - user_name - the local user name of the active connection. In case the
2134 * user mapping is dropped but the connection is still active, then the
2135 * user name will be NULL in the output.
2136 * - used_in_xact - true if the connection is used in the current transaction.
2137 * - closed - true if the connection is closed.
2139 * No records are returned when there are no cached connections at all.
2142 postgres_fdw_get_connections_internal(FunctionCallInfo fcinfo
,
2143 enum pgfdwVersion api_version
)
2145 ReturnSetInfo
*rsinfo
= (ReturnSetInfo
*) fcinfo
->resultinfo
;
2146 HASH_SEQ_STATUS scan
;
2147 ConnCacheEntry
*entry
;
2149 InitMaterializedSRF(fcinfo
, 0);
2151 /* If cache doesn't exist, we return no records */
2152 if (!ConnectionHash
)
2155 /* Check we have the expected number of output arguments */
2156 switch (rsinfo
->setDesc
->natts
)
2158 case POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_1
:
2159 if (api_version
!= PGFDW_V1_1
)
2160 elog(ERROR
, "incorrect number of output arguments");
2162 case POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_2
:
2163 if (api_version
!= PGFDW_V1_2
)
2164 elog(ERROR
, "incorrect number of output arguments");
2167 elog(ERROR
, "incorrect number of output arguments");
2170 hash_seq_init(&scan
, ConnectionHash
);
2171 while ((entry
= (ConnCacheEntry
*) hash_seq_search(&scan
)))
2173 ForeignServer
*server
;
2174 Datum values
[POSTGRES_FDW_GET_CONNECTIONS_COLS
] = {0};
2175 bool nulls
[POSTGRES_FDW_GET_CONNECTIONS_COLS
] = {0};
2178 /* We only look for open remote connections */
2182 server
= GetForeignServerExtended(entry
->serverid
, FSV_MISSING_OK
);
2185 * The foreign server may have been dropped in current explicit
2186 * transaction. It is not possible to drop the server from another
2187 * session when the connection associated with it is in use in the
2188 * current transaction, if tried so, the drop query in another session
2189 * blocks until the current transaction finishes.
2191 * Even though the server is dropped in the current transaction, the
2192 * cache can still have associated active connection entry, say we
2193 * call such connections dangling. Since we can not fetch the server
2194 * name from system catalogs for dangling connections, instead we show
2195 * NULL value for server name in output.
2197 * We could have done better by storing the server name in the cache
2198 * entry instead of server oid so that it could be used in the output.
2199 * But the server name in each cache entry requires 64 bytes of
2200 * memory, which is huge, when there are many cached connections and
2201 * the use case i.e. dropping the foreign server within the explicit
2202 * current transaction seems rare. So, we chose to show NULL value for
2203 * server name in output.
2205 * Such dangling connections get closed either in next use or at the
2206 * end of current explicit transaction in pgfdw_xact_callback.
2211 * If the server has been dropped in the current explicit
2212 * transaction, then this entry would have been invalidated in
2213 * pgfdw_inval_callback at the end of drop server command. Note
2214 * that this connection would not have been closed in
2215 * pgfdw_inval_callback because it is still being used in the
2216 * current explicit transaction. So, assert that here.
2218 Assert(entry
->conn
&& entry
->xact_depth
> 0 && entry
->invalidated
);
2220 /* Show null, if no server name was found */
2224 values
[i
++] = CStringGetTextDatum(server
->servername
);
2226 if (api_version
>= PGFDW_V1_2
)
2230 /* Use the system cache to obtain the user mapping */
2231 tp
= SearchSysCache1(USERMAPPINGOID
, ObjectIdGetDatum(entry
->key
));
2234 * Just like in the foreign server case, user mappings can also be
2235 * dropped in the current explicit transaction. Therefore, the
2236 * similar check as in the server case is required.
2238 if (!HeapTupleIsValid(tp
))
2241 * If we reach here, this entry must have been invalidated in
2242 * pgfdw_inval_callback, same as in the server case.
2244 Assert(entry
->conn
&& entry
->xact_depth
> 0 &&
2245 entry
->invalidated
);
2253 userid
= ((Form_pg_user_mapping
) GETSTRUCT(tp
))->umuser
;
2254 values
[i
++] = CStringGetTextDatum(MappingUserName(userid
));
2255 ReleaseSysCache(tp
);
2259 values
[i
++] = BoolGetDatum(!entry
->invalidated
);
2261 if (api_version
>= PGFDW_V1_2
)
2263 bool check_conn
= PG_GETARG_BOOL(0);
2265 /* Is this connection used in the current transaction? */
2266 values
[i
++] = BoolGetDatum(entry
->xact_depth
> 0);
2269 * If a connection status check is requested and supported, return
2270 * whether the connection is closed. Otherwise, return NULL.
2272 if (check_conn
&& pgfdw_conn_checkable())
2273 values
[i
++] = BoolGetDatum(pgfdw_conn_check(entry
->conn
) != 0);
2278 tuplestore_putvalues(rsinfo
->setResult
, rsinfo
->setDesc
, values
, nulls
);
2283 * List active foreign server connections.
2285 * The SQL API of this function has changed multiple times, and will likely
2286 * do so again in future. To support the case where a newer version of this
2287 * loadable module is being used with an old SQL declaration of the function,
2288 * we continue to support the older API versions.
2291 postgres_fdw_get_connections_1_2(PG_FUNCTION_ARGS
)
2293 postgres_fdw_get_connections_internal(fcinfo
, PGFDW_V1_2
);
2299 postgres_fdw_get_connections(PG_FUNCTION_ARGS
)
2301 postgres_fdw_get_connections_internal(fcinfo
, PGFDW_V1_1
);
2307 * Disconnect the specified cached connections.
2309 * This function discards the open connections that are established by
2310 * postgres_fdw from the local session to the foreign server with
2311 * the given name. Note that there can be multiple connections to
2312 * the given server using different user mappings. If the connections
2313 * are used in the current local transaction, they are not disconnected
2314 * and warning messages are reported. This function returns true
2315 * if it disconnects at least one connection, otherwise false. If no
2316 * foreign server with the given name is found, an error is reported.
2319 postgres_fdw_disconnect(PG_FUNCTION_ARGS
)
2321 ForeignServer
*server
;
2324 servername
= text_to_cstring(PG_GETARG_TEXT_PP(0));
2325 server
= GetForeignServerByName(servername
, false);
2327 PG_RETURN_BOOL(disconnect_cached_connections(server
->serverid
));
2331 * Disconnect all the cached connections.
2333 * This function discards all the open connections that are established by
2334 * postgres_fdw from the local session to the foreign servers.
2335 * If the connections are used in the current local transaction, they are
2336 * not disconnected and warning messages are reported. This function
2337 * returns true if it disconnects at least one connection, otherwise false.
2340 postgres_fdw_disconnect_all(PG_FUNCTION_ARGS
)
2342 PG_RETURN_BOOL(disconnect_cached_connections(InvalidOid
));
2346 * Workhorse to disconnect cached connections.
2348 * This function scans all the connection cache entries and disconnects
2349 * the open connections whose foreign server OID matches with
2350 * the specified one. If InvalidOid is specified, it disconnects all
2351 * the cached connections.
2353 * This function emits a warning for each connection that's used in
2354 * the current transaction and doesn't close it. It returns true if
2355 * it disconnects at least one connection, otherwise false.
2357 * Note that this function disconnects even the connections that are
2358 * established by other users in the same local session using different
2359 * user mappings. This leads even non-superuser to be able to close
2360 * the connections established by superusers in the same local session.
2362 * XXX As of now we don't see any security risk doing this. But we should
2363 * set some restrictions on that, for example, prevent non-superuser
2364 * from closing the connections established by superusers even
2365 * in the same session?
2368 disconnect_cached_connections(Oid serverid
)
2370 HASH_SEQ_STATUS scan
;
2371 ConnCacheEntry
*entry
;
2372 bool all
= !OidIsValid(serverid
);
2373 bool result
= false;
2376 * Connection cache hashtable has not been initialized yet in this
2377 * session, so return false.
2379 if (!ConnectionHash
)
2382 hash_seq_init(&scan
, ConnectionHash
);
2383 while ((entry
= (ConnCacheEntry
*) hash_seq_search(&scan
)))
2385 /* Ignore cache entry if no open connection right now. */
2389 if (all
|| entry
->serverid
== serverid
)
2392 * Emit a warning because the connection to close is used in the
2393 * current transaction and cannot be disconnected right now.
2395 if (entry
->xact_depth
> 0)
2397 ForeignServer
*server
;
2399 server
= GetForeignServerExtended(entry
->serverid
,
2405 * If the foreign server was dropped while its connection
2406 * was used in the current transaction, the connection
2407 * must have been marked as invalid by
2408 * pgfdw_inval_callback at the end of DROP SERVER command.
2410 Assert(entry
->invalidated
);
2413 (errmsg("cannot close dropped server connection because it is still in use")));
2417 (errmsg("cannot close connection for server \"%s\" because it is still in use",
2418 server
->servername
)));
2422 elog(DEBUG3
, "discarding connection %p", entry
->conn
);
2423 disconnect_pg_server(entry
);
2433 * Check if the remote server closed the connection.
2435 * Returns 1 if the connection is closed, -1 if an error occurred,
2436 * and 0 if it's not closed or if the connection check is unavailable
2440 pgfdw_conn_check(PGconn
*conn
)
2442 int sock
= PQsocket(conn
);
2444 if (PQstatus(conn
) != CONNECTION_OK
|| sock
== -1)
2447 #if (defined(HAVE_POLL) && defined(POLLRDHUP))
2449 struct pollfd input_fd
;
2453 input_fd
.events
= POLLRDHUP
;
2454 input_fd
.revents
= 0;
2457 result
= poll(&input_fd
, 1, 0);
2458 while (result
< 0 && errno
== EINTR
);
2463 return (input_fd
.revents
&
2464 (POLLRDHUP
| POLLHUP
| POLLERR
| POLLNVAL
)) ? 1 : 0;
2472 * Check if connection status checking is available on this platform.
2474 * Returns true if available, false otherwise.
2477 pgfdw_conn_checkable(void)
2479 #if (defined(HAVE_POLL) && defined(POLLRDHUP))