1 /*-------------------------------------------------------------------------
4 * Foreign-data wrapper for remote PostgreSQL servers
6 * Portions Copyright (c) 2012-2023, PostgreSQL Global Development Group
9 * contrib/postgres_fdw/postgres_fdw.c
11 *-------------------------------------------------------------------------
17 #include "access/htup_details.h"
18 #include "access/sysattr.h"
19 #include "access/table.h"
20 #include "catalog/pg_class.h"
21 #include "catalog/pg_opfamily.h"
22 #include "commands/defrem.h"
23 #include "commands/explain.h"
24 #include "commands/vacuum.h"
25 #include "executor/execAsync.h"
26 #include "foreign/fdwapi.h"
28 #include "miscadmin.h"
29 #include "nodes/makefuncs.h"
30 #include "nodes/nodeFuncs.h"
31 #include "optimizer/appendinfo.h"
32 #include "optimizer/clauses.h"
33 #include "optimizer/cost.h"
34 #include "optimizer/inherit.h"
35 #include "optimizer/optimizer.h"
36 #include "optimizer/pathnode.h"
37 #include "optimizer/paths.h"
38 #include "optimizer/planmain.h"
39 #include "optimizer/prep.h"
40 #include "optimizer/restrictinfo.h"
41 #include "optimizer/tlist.h"
42 #include "parser/parsetree.h"
43 #include "postgres_fdw.h"
44 #include "storage/latch.h"
45 #include "utils/builtins.h"
46 #include "utils/float.h"
47 #include "utils/guc.h"
48 #include "utils/lsyscache.h"
49 #include "utils/memutils.h"
50 #include "utils/rel.h"
51 #include "utils/sampling.h"
52 #include "utils/selfuncs.h"
/*
 * Built-in cost defaults used by postgres_fdw planning.
 */

/* CPU cost charged once to start up a foreign query. */
#define DEFAULT_FDW_STARTUP_COST 100.0

/* CPU cost charged per row, on top of cpu_tuple_cost. */
#define DEFAULT_FDW_TUPLE_COST 0.01

/* Without remote estimates, a sort is assumed to cost 20% extra. */
#define DEFAULT_FDW_SORT_MULTIPLIER 1.2
/*
 * Indexes of FDW-private information stored in fdw_private lists.
 *
 * These items are indexed with the enum FdwScanPrivateIndex, so an item
 * can be fetched with list_nth().  For example, to get the SELECT statement:
 *		sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
 *
 * Enumerator values are list positions, so their order must not change.
 */
enum FdwScanPrivateIndex
{
	/* SQL statement to execute remotely (as a String node) */
	FdwScanPrivateSelectSql,
	/* Integer list of attribute numbers retrieved by the SELECT */
	FdwScanPrivateRetrievedAttrs,
	/* Integer representing the desired fetch_size */
	FdwScanPrivateFetchSize,

	/*
	 * String describing join i.e. names of relations being joined and types
	 * of join, added when the scan is join
	 */
	FdwScanPrivateRelations
};
/*
 * Similarly, this enum describes what's kept in the fdw_private list for
 * a ModifyTable node referencing a postgres_fdw foreign table.  We store:
 *
 * 1) INSERT/UPDATE/DELETE statement text to be sent to the remote server
 * 2) Integer list of target attribute numbers for INSERT/UPDATE
 * 3) Length till the end of VALUES clause for INSERT
 *	  (-1 for a DELETE/UPDATE)
 * 4) Boolean flag showing if the remote query has a RETURNING clause
 * 5) Integer list of attribute numbers retrieved by RETURNING, if any
 *
 * Each enumerator is the list position of the matching item above, so there
 * must be exactly one enumerator per item, in the same order.
 */
enum FdwModifyPrivateIndex
{
	/* SQL statement to execute remotely (as a String node) */
	FdwModifyPrivateUpdateSql,
	/* Integer list of target attribute numbers for INSERT/UPDATE */
	FdwModifyPrivateTargetAttnums,
	/* Length till the end of VALUES clause (as an Integer node) */
	FdwModifyPrivateLen,
	/* has-returning flag (as a Boolean node) */
	FdwModifyPrivateHasReturning,
	/* Integer list of attribute numbers retrieved by RETURNING */
	FdwModifyPrivateRetrievedAttrs
};
/*
 * Similarly, this enum describes what's kept in the fdw_private list for
 * a ForeignScan node that modifies a foreign table directly.  We store:
 *
 * 1) UPDATE/DELETE statement text to be sent to the remote server
 * 2) Boolean flag showing if the remote query has a RETURNING clause
 * 3) Integer list of attribute numbers retrieved by RETURNING, if any
 * 4) Boolean flag showing if we set the command es_processed
 *
 * Enumerator values are list positions, so their order must not change.
 */
enum FdwDirectModifyPrivateIndex
{
	/* SQL statement to execute remotely (as a String node) */
	FdwDirectModifyPrivateUpdateSql,
	/* has-returning flag (as a Boolean node) */
	FdwDirectModifyPrivateHasReturning,
	/* Integer list of attribute numbers retrieved by RETURNING */
	FdwDirectModifyPrivateRetrievedAttrs,
	/* set-processed flag (as a Boolean node) */
	FdwDirectModifyPrivateSetProcessed
};
136 * Execution state of a foreign scan using postgres_fdw.
138 typedef struct PgFdwScanState
140 Relation rel
; /* relcache entry for the foreign table. NULL
141 * for a foreign join scan. */
142 TupleDesc tupdesc
; /* tuple descriptor of scan */
143 AttInMetadata
*attinmeta
; /* attribute datatype conversion metadata */
145 /* extracted fdw_private data */
146 char *query
; /* text of SELECT command */
147 List
*retrieved_attrs
; /* list of retrieved attribute numbers */
149 /* for remote query execution */
150 PGconn
*conn
; /* connection for the scan */
151 PgFdwConnState
*conn_state
; /* extra per-connection state */
152 unsigned int cursor_number
; /* quasi-unique ID for my cursor */
153 bool cursor_exists
; /* have we created the cursor? */
154 int numParams
; /* number of parameters passed to query */
155 FmgrInfo
*param_flinfo
; /* output conversion functions for them */
156 List
*param_exprs
; /* executable expressions for param values */
157 const char **param_values
; /* textual values of query parameters */
159 /* for storing result tuples */
160 HeapTuple
*tuples
; /* array of currently-retrieved tuples */
161 int num_tuples
; /* # of tuples in array */
162 int next_tuple
; /* index of next one to return */
164 /* batch-level state, for optimizing rewinds and avoiding useless fetch */
165 int fetch_ct_2
; /* Min(# of fetches done, 2) */
166 bool eof_reached
; /* true if last fetch reached EOF */
168 /* for asynchronous execution */
169 bool async_capable
; /* engage asynchronous-capable logic? */
171 /* working memory contexts */
172 MemoryContext batch_cxt
; /* context holding current batch of tuples */
173 MemoryContext temp_cxt
; /* context for per-tuple temporary data */
175 int fetch_size
; /* number of tuples per fetch */
179 * Execution state of a foreign insert/update/delete operation.
181 typedef struct PgFdwModifyState
183 Relation rel
; /* relcache entry for the foreign table */
184 AttInMetadata
*attinmeta
; /* attribute datatype conversion metadata */
186 /* for remote query execution */
187 PGconn
*conn
; /* connection for the scan */
188 PgFdwConnState
*conn_state
; /* extra per-connection state */
189 char *p_name
; /* name of prepared statement, if created */
191 /* extracted fdw_private data */
192 char *query
; /* text of INSERT/UPDATE/DELETE command */
193 char *orig_query
; /* original text of INSERT command */
194 List
*target_attrs
; /* list of target attribute numbers */
195 int values_end
; /* length up to the end of VALUES */
196 int batch_size
; /* value of FDW option "batch_size" */
197 bool has_returning
; /* is there a RETURNING clause? */
198 List
*retrieved_attrs
; /* attr numbers retrieved by RETURNING */
200 /* info about parameters for prepared statement */
201 AttrNumber ctidAttno
; /* attnum of input resjunk ctid column */
202 int p_nums
; /* number of parameters to transmit */
203 FmgrInfo
*p_flinfo
; /* output conversion functions for them */
205 /* batch operation stuff */
206 int num_slots
; /* number of slots to insert */
208 /* working memory context */
209 MemoryContext temp_cxt
; /* context for per-tuple temporary data */
211 /* for update row movement if subplan result rel */
212 struct PgFdwModifyState
*aux_fmstate
; /* foreign-insert state, if
217 * Execution state of a foreign scan that modifies a foreign table directly.
219 typedef struct PgFdwDirectModifyState
221 Relation rel
; /* relcache entry for the foreign table */
222 AttInMetadata
*attinmeta
; /* attribute datatype conversion metadata */
224 /* extracted fdw_private data */
225 char *query
; /* text of UPDATE/DELETE command */
226 bool has_returning
; /* is there a RETURNING clause? */
227 List
*retrieved_attrs
; /* attr numbers retrieved by RETURNING */
228 bool set_processed
; /* do we set the command es_processed? */
230 /* for remote query execution */
231 PGconn
*conn
; /* connection for the update */
232 PgFdwConnState
*conn_state
; /* extra per-connection state */
233 int numParams
; /* number of parameters passed to query */
234 FmgrInfo
*param_flinfo
; /* output conversion functions for them */
235 List
*param_exprs
; /* executable expressions for param values */
236 const char **param_values
; /* textual values of query parameters */
238 /* for storing result tuples */
239 PGresult
*result
; /* result for query */
240 int num_tuples
; /* # of result tuples */
241 int next_tuple
; /* index of next one to return */
242 Relation resultRel
; /* relcache entry for the target relation */
243 AttrNumber
*attnoMap
; /* array of attnums of input user columns */
244 AttrNumber ctidAttno
; /* attnum of input ctid column */
245 AttrNumber oidAttno
; /* attnum of input oid column */
246 bool hasSystemCols
; /* are there system columns of resultRel? */
248 /* working memory context */
249 MemoryContext temp_cxt
; /* context for per-tuple temporary data */
250 } PgFdwDirectModifyState
;
253 * Workspace for analyzing a foreign table.
255 typedef struct PgFdwAnalyzeState
257 Relation rel
; /* relcache entry for the foreign table */
258 AttInMetadata
*attinmeta
; /* attribute datatype conversion metadata */
259 List
*retrieved_attrs
; /* attr numbers retrieved by query */
261 /* collected sample rows */
262 HeapTuple
*rows
; /* array of size targrows */
263 int targrows
; /* target # of sample rows */
264 int numrows
; /* # of sample rows collected */
266 /* for random sampling */
267 double samplerows
; /* # of rows fetched */
268 double rowstoskip
; /* # of rows to skip before next sample */
269 ReservoirStateData rstate
; /* state for reservoir sampling */
271 /* working memory contexts */
272 MemoryContext anl_cxt
; /* context for per-analyze lifespan data */
273 MemoryContext temp_cxt
; /* context for per-tuple temporary data */
/*
 * This enum describes what's kept in the fdw_private list for a ForeignPath.
 * We store:
 *
 * 1) Boolean flag showing if the remote query has the final sort
 * 2) Boolean flag showing if the remote query has the LIMIT clause
 *
 * Enumerator values are list positions, so their order must not change.
 */
enum FdwPathPrivateIndex
{
	/* has-final-sort flag (as a Boolean node) */
	FdwPathPrivateHasFinalSort,
	/* has-limit flag (as a Boolean node) */
	FdwPathPrivateHasLimit
};
291 /* Struct for extra information passed to estimate_path_cost_size() */
300 } PgFdwPathExtraData
;
303 * Identify the attribute where data conversion fails.
305 typedef struct ConversionLocation
307 AttrNumber cur_attno
; /* attribute number being processed, or 0 */
308 Relation rel
; /* foreign table being processed, or NULL */
309 ForeignScanState
*fsstate
; /* plan node being processed, or NULL */
310 } ConversionLocation
;
312 /* Callback argument for ec_member_matches_foreign */
315 Expr
*current
; /* current expr, or NULL if not yet found */
316 List
*already_used
; /* expressions already dealt with */
317 } ec_member_foreign_arg
;
/*
 * SQL-callable entry points (version-1 calling convention)
 */
PG_FUNCTION_INFO_V1(postgres_fdw_handler);
325 * FDW callback routines
327 static void postgresGetForeignRelSize(PlannerInfo
*root
,
330 static void postgresGetForeignPaths(PlannerInfo
*root
,
333 static ForeignScan
*postgresGetForeignPlan(PlannerInfo
*root
,
334 RelOptInfo
*foreignrel
,
336 ForeignPath
*best_path
,
340 static void postgresBeginForeignScan(ForeignScanState
*node
, int eflags
);
341 static TupleTableSlot
*postgresIterateForeignScan(ForeignScanState
*node
);
342 static void postgresReScanForeignScan(ForeignScanState
*node
);
343 static void postgresEndForeignScan(ForeignScanState
*node
);
344 static void postgresAddForeignUpdateTargets(PlannerInfo
*root
,
346 RangeTblEntry
*target_rte
,
347 Relation target_relation
);
348 static List
*postgresPlanForeignModify(PlannerInfo
*root
,
350 Index resultRelation
,
352 static void postgresBeginForeignModify(ModifyTableState
*mtstate
,
353 ResultRelInfo
*resultRelInfo
,
357 static TupleTableSlot
*postgresExecForeignInsert(EState
*estate
,
358 ResultRelInfo
*resultRelInfo
,
359 TupleTableSlot
*slot
,
360 TupleTableSlot
*planSlot
);
361 static TupleTableSlot
**postgresExecForeignBatchInsert(EState
*estate
,
362 ResultRelInfo
*resultRelInfo
,
363 TupleTableSlot
**slots
,
364 TupleTableSlot
**planSlots
,
366 static int postgresGetForeignModifyBatchSize(ResultRelInfo
*resultRelInfo
);
367 static TupleTableSlot
*postgresExecForeignUpdate(EState
*estate
,
368 ResultRelInfo
*resultRelInfo
,
369 TupleTableSlot
*slot
,
370 TupleTableSlot
*planSlot
);
371 static TupleTableSlot
*postgresExecForeignDelete(EState
*estate
,
372 ResultRelInfo
*resultRelInfo
,
373 TupleTableSlot
*slot
,
374 TupleTableSlot
*planSlot
);
375 static void postgresEndForeignModify(EState
*estate
,
376 ResultRelInfo
*resultRelInfo
);
377 static void postgresBeginForeignInsert(ModifyTableState
*mtstate
,
378 ResultRelInfo
*resultRelInfo
);
379 static void postgresEndForeignInsert(EState
*estate
,
380 ResultRelInfo
*resultRelInfo
);
381 static int postgresIsForeignRelUpdatable(Relation rel
);
382 static bool postgresPlanDirectModify(PlannerInfo
*root
,
384 Index resultRelation
,
386 static void postgresBeginDirectModify(ForeignScanState
*node
, int eflags
);
387 static TupleTableSlot
*postgresIterateDirectModify(ForeignScanState
*node
);
388 static void postgresEndDirectModify(ForeignScanState
*node
);
389 static void postgresExplainForeignScan(ForeignScanState
*node
,
391 static void postgresExplainForeignModify(ModifyTableState
*mtstate
,
392 ResultRelInfo
*rinfo
,
396 static void postgresExplainDirectModify(ForeignScanState
*node
,
398 static void postgresExecForeignTruncate(List
*rels
,
399 DropBehavior behavior
,
401 static bool postgresAnalyzeForeignTable(Relation relation
,
402 AcquireSampleRowsFunc
*func
,
403 BlockNumber
*totalpages
);
404 static List
*postgresImportForeignSchema(ImportForeignSchemaStmt
*stmt
,
406 static void postgresGetForeignJoinPaths(PlannerInfo
*root
,
408 RelOptInfo
*outerrel
,
409 RelOptInfo
*innerrel
,
411 JoinPathExtraData
*extra
);
412 static bool postgresRecheckForeignScan(ForeignScanState
*node
,
413 TupleTableSlot
*slot
);
414 static void postgresGetForeignUpperPaths(PlannerInfo
*root
,
415 UpperRelationKind stage
,
416 RelOptInfo
*input_rel
,
417 RelOptInfo
*output_rel
,
419 static bool postgresIsForeignPathAsyncCapable(ForeignPath
*path
);
420 static void postgresForeignAsyncRequest(AsyncRequest
*areq
);
421 static void postgresForeignAsyncConfigureWait(AsyncRequest
*areq
);
422 static void postgresForeignAsyncNotify(AsyncRequest
*areq
);
427 static void estimate_path_cost_size(PlannerInfo
*root
,
428 RelOptInfo
*foreignrel
,
429 List
*param_join_conds
,
431 PgFdwPathExtraData
*fpextra
,
432 double *p_rows
, int *p_width
,
433 Cost
*p_startup_cost
, Cost
*p_total_cost
);
434 static void get_remote_estimate(const char *sql
,
440 static void adjust_foreign_grouping_path_cost(PlannerInfo
*root
,
442 double retrieved_rows
,
445 Cost
*p_startup_cost
,
447 static bool ec_member_matches_foreign(PlannerInfo
*root
, RelOptInfo
*rel
,
448 EquivalenceClass
*ec
, EquivalenceMember
*em
,
450 static void create_cursor(ForeignScanState
*node
);
451 static void fetch_more_data(ForeignScanState
*node
);
452 static void close_cursor(PGconn
*conn
, unsigned int cursor_number
,
453 PgFdwConnState
*conn_state
);
454 static PgFdwModifyState
*create_foreign_modify(EState
*estate
,
456 ResultRelInfo
*resultRelInfo
,
463 List
*retrieved_attrs
);
464 static TupleTableSlot
**execute_foreign_modify(EState
*estate
,
465 ResultRelInfo
*resultRelInfo
,
467 TupleTableSlot
**slots
,
468 TupleTableSlot
**planSlots
,
470 static void prepare_foreign_modify(PgFdwModifyState
*fmstate
);
471 static const char **convert_prep_stmt_params(PgFdwModifyState
*fmstate
,
473 TupleTableSlot
**slots
,
475 static void store_returning_result(PgFdwModifyState
*fmstate
,
476 TupleTableSlot
*slot
, PGresult
*res
);
477 static void finish_foreign_modify(PgFdwModifyState
*fmstate
);
478 static void deallocate_query(PgFdwModifyState
*fmstate
);
479 static List
*build_remote_returning(Index rtindex
, Relation rel
,
480 List
*returningList
);
481 static void rebuild_fdw_scan_tlist(ForeignScan
*fscan
, List
*tlist
);
482 static void execute_dml_stmt(ForeignScanState
*node
);
483 static TupleTableSlot
*get_returning_data(ForeignScanState
*node
);
484 static void init_returning_filter(PgFdwDirectModifyState
*dmstate
,
485 List
*fdw_scan_tlist
,
487 static TupleTableSlot
*apply_returning_filter(PgFdwDirectModifyState
*dmstate
,
488 ResultRelInfo
*resultRelInfo
,
489 TupleTableSlot
*slot
,
491 static void prepare_query_params(PlanState
*node
,
494 FmgrInfo
**param_flinfo
,
496 const char ***param_values
);
497 static void process_query_params(ExprContext
*econtext
,
498 FmgrInfo
*param_flinfo
,
500 const char **param_values
);
501 static int postgresAcquireSampleRowsFunc(Relation relation
, int elevel
,
502 HeapTuple
*rows
, int targrows
,
504 double *totaldeadrows
);
505 static void analyze_row_processor(PGresult
*res
, int row
,
506 PgFdwAnalyzeState
*astate
);
507 static void produce_tuple_asynchronously(AsyncRequest
*areq
, bool fetch
);
508 static void fetch_more_data_begin(AsyncRequest
*areq
);
509 static void complete_pending_request(AsyncRequest
*areq
);
510 static HeapTuple
make_tuple_from_result_row(PGresult
*res
,
513 AttInMetadata
*attinmeta
,
514 List
*retrieved_attrs
,
515 ForeignScanState
*fsstate
,
516 MemoryContext temp_context
);
517 static void conversion_error_callback(void *arg
);
518 static bool foreign_join_ok(PlannerInfo
*root
, RelOptInfo
*joinrel
,
519 JoinType jointype
, RelOptInfo
*outerrel
, RelOptInfo
*innerrel
,
520 JoinPathExtraData
*extra
);
521 static bool foreign_grouping_ok(PlannerInfo
*root
, RelOptInfo
*grouped_rel
,
523 static List
*get_useful_pathkeys_for_relation(PlannerInfo
*root
,
525 static List
*get_useful_ecs_for_relation(PlannerInfo
*root
, RelOptInfo
*rel
);
526 static void add_paths_with_pathkeys_for_rel(PlannerInfo
*root
, RelOptInfo
*rel
,
527 Path
*epq_path
, List
*restrictlist
);
528 static void add_foreign_grouping_paths(PlannerInfo
*root
,
529 RelOptInfo
*input_rel
,
530 RelOptInfo
*grouped_rel
,
531 GroupPathExtraData
*extra
);
532 static void add_foreign_ordered_paths(PlannerInfo
*root
,
533 RelOptInfo
*input_rel
,
534 RelOptInfo
*ordered_rel
);
535 static void add_foreign_final_paths(PlannerInfo
*root
,
536 RelOptInfo
*input_rel
,
537 RelOptInfo
*final_rel
,
538 FinalPathExtraData
*extra
);
539 static void apply_server_options(PgFdwRelationInfo
*fpinfo
);
540 static void apply_table_options(PgFdwRelationInfo
*fpinfo
);
541 static void merge_fdw_options(PgFdwRelationInfo
*fpinfo
,
542 const PgFdwRelationInfo
*fpinfo_o
,
543 const PgFdwRelationInfo
*fpinfo_i
);
544 static int get_batch_size_option(Relation rel
);
548 * Foreign-data wrapper handler function: return a struct with pointers
549 * to my callback routines.
552 postgres_fdw_handler(PG_FUNCTION_ARGS
)
554 FdwRoutine
*routine
= makeNode(FdwRoutine
);
556 /* Functions for scanning foreign tables */
557 routine
->GetForeignRelSize
= postgresGetForeignRelSize
;
558 routine
->GetForeignPaths
= postgresGetForeignPaths
;
559 routine
->GetForeignPlan
= postgresGetForeignPlan
;
560 routine
->BeginForeignScan
= postgresBeginForeignScan
;
561 routine
->IterateForeignScan
= postgresIterateForeignScan
;
562 routine
->ReScanForeignScan
= postgresReScanForeignScan
;
563 routine
->EndForeignScan
= postgresEndForeignScan
;
565 /* Functions for updating foreign tables */
566 routine
->AddForeignUpdateTargets
= postgresAddForeignUpdateTargets
;
567 routine
->PlanForeignModify
= postgresPlanForeignModify
;
568 routine
->BeginForeignModify
= postgresBeginForeignModify
;
569 routine
->ExecForeignInsert
= postgresExecForeignInsert
;
570 routine
->ExecForeignBatchInsert
= postgresExecForeignBatchInsert
;
571 routine
->GetForeignModifyBatchSize
= postgresGetForeignModifyBatchSize
;
572 routine
->ExecForeignUpdate
= postgresExecForeignUpdate
;
573 routine
->ExecForeignDelete
= postgresExecForeignDelete
;
574 routine
->EndForeignModify
= postgresEndForeignModify
;
575 routine
->BeginForeignInsert
= postgresBeginForeignInsert
;
576 routine
->EndForeignInsert
= postgresEndForeignInsert
;
577 routine
->IsForeignRelUpdatable
= postgresIsForeignRelUpdatable
;
578 routine
->PlanDirectModify
= postgresPlanDirectModify
;
579 routine
->BeginDirectModify
= postgresBeginDirectModify
;
580 routine
->IterateDirectModify
= postgresIterateDirectModify
;
581 routine
->EndDirectModify
= postgresEndDirectModify
;
583 /* Function for EvalPlanQual rechecks */
584 routine
->RecheckForeignScan
= postgresRecheckForeignScan
;
585 /* Support functions for EXPLAIN */
586 routine
->ExplainForeignScan
= postgresExplainForeignScan
;
587 routine
->ExplainForeignModify
= postgresExplainForeignModify
;
588 routine
->ExplainDirectModify
= postgresExplainDirectModify
;
590 /* Support function for TRUNCATE */
591 routine
->ExecForeignTruncate
= postgresExecForeignTruncate
;
593 /* Support functions for ANALYZE */
594 routine
->AnalyzeForeignTable
= postgresAnalyzeForeignTable
;
596 /* Support functions for IMPORT FOREIGN SCHEMA */
597 routine
->ImportForeignSchema
= postgresImportForeignSchema
;
599 /* Support functions for join push-down */
600 routine
->GetForeignJoinPaths
= postgresGetForeignJoinPaths
;
602 /* Support functions for upper relation push-down */
603 routine
->GetForeignUpperPaths
= postgresGetForeignUpperPaths
;
605 /* Support functions for asynchronous execution */
606 routine
->IsForeignPathAsyncCapable
= postgresIsForeignPathAsyncCapable
;
607 routine
->ForeignAsyncRequest
= postgresForeignAsyncRequest
;
608 routine
->ForeignAsyncConfigureWait
= postgresForeignAsyncConfigureWait
;
609 routine
->ForeignAsyncNotify
= postgresForeignAsyncNotify
;
611 PG_RETURN_POINTER(routine
);
615 * postgresGetForeignRelSize
616 * Estimate # of rows and width of the result of the scan
618 * We should consider the effect of all baserestrictinfo clauses here, but
619 * not any join clauses.
622 postgresGetForeignRelSize(PlannerInfo
*root
,
626 PgFdwRelationInfo
*fpinfo
;
630 * We use PgFdwRelationInfo to pass various information to subsequent
633 fpinfo
= (PgFdwRelationInfo
*) palloc0(sizeof(PgFdwRelationInfo
));
634 baserel
->fdw_private
= (void *) fpinfo
;
636 /* Base foreign tables need to be pushed down always. */
637 fpinfo
->pushdown_safe
= true;
639 /* Look up foreign-table catalog info. */
640 fpinfo
->table
= GetForeignTable(foreigntableid
);
641 fpinfo
->server
= GetForeignServer(fpinfo
->table
->serverid
);
644 * Extract user-settable option values. Note that per-table settings of
645 * use_remote_estimate, fetch_size and async_capable override per-server
646 * settings of them, respectively.
648 fpinfo
->use_remote_estimate
= false;
649 fpinfo
->fdw_startup_cost
= DEFAULT_FDW_STARTUP_COST
;
650 fpinfo
->fdw_tuple_cost
= DEFAULT_FDW_TUPLE_COST
;
651 fpinfo
->shippable_extensions
= NIL
;
652 fpinfo
->fetch_size
= 100;
653 fpinfo
->async_capable
= false;
655 apply_server_options(fpinfo
);
656 apply_table_options(fpinfo
);
659 * If the table or the server is configured to use remote estimates,
660 * identify which user to do remote access as during planning. This
661 * should match what ExecCheckPermissions() does. If we fail due to lack
662 * of permissions, the query would have failed at runtime anyway.
664 if (fpinfo
->use_remote_estimate
)
668 userid
= OidIsValid(baserel
->userid
) ? baserel
->userid
: GetUserId();
669 fpinfo
->user
= GetUserMapping(userid
, fpinfo
->server
->serverid
);
675 * Identify which baserestrictinfo clauses can be sent to the remote
676 * server and which can't.
678 classifyConditions(root
, baserel
, baserel
->baserestrictinfo
,
679 &fpinfo
->remote_conds
, &fpinfo
->local_conds
);
682 * Identify which attributes will need to be retrieved from the remote
683 * server. These include all attrs needed for joins or final output, plus
684 * all attrs used in the local_conds. (Note: if we end up using a
685 * parameterized scan, it's possible that some of the join clauses will be
686 * sent to the remote and thus we wouldn't really need to retrieve the
687 * columns used in them. Doesn't seem worth detecting that case though.)
689 fpinfo
->attrs_used
= NULL
;
690 pull_varattnos((Node
*) baserel
->reltarget
->exprs
, baserel
->relid
,
691 &fpinfo
->attrs_used
);
692 foreach(lc
, fpinfo
->local_conds
)
694 RestrictInfo
*rinfo
= lfirst_node(RestrictInfo
, lc
);
696 pull_varattnos((Node
*) rinfo
->clause
, baserel
->relid
,
697 &fpinfo
->attrs_used
);
701 * Compute the selectivity and cost of the local_conds, so we don't have
702 * to do it over again for each path. The best we can do for these
703 * conditions is to estimate selectivity on the basis of local statistics.
705 fpinfo
->local_conds_sel
= clauselist_selectivity(root
,
711 cost_qual_eval(&fpinfo
->local_conds_cost
, fpinfo
->local_conds
, root
);
714 * Set # of retrieved rows and cached relation costs to some negative
715 * value, so that we can detect when they are set to some sensible values,
716 * during one (usually the first) of the calls to estimate_path_cost_size.
718 fpinfo
->retrieved_rows
= -1;
719 fpinfo
->rel_startup_cost
= -1;
720 fpinfo
->rel_total_cost
= -1;
723 * If the table or the server is configured to use remote estimates,
724 * connect to the foreign server and execute EXPLAIN to estimate the
725 * number of rows selected by the restriction clauses, as well as the
726 * average row width. Otherwise, estimate using whatever statistics we
727 * have locally, in a way similar to ordinary tables.
729 if (fpinfo
->use_remote_estimate
)
732 * Get cost/size estimates with help of remote server. Save the
733 * values in fpinfo so we don't need to do it again to generate the
734 * basic foreign path.
736 estimate_path_cost_size(root
, baserel
, NIL
, NIL
, NULL
,
737 &fpinfo
->rows
, &fpinfo
->width
,
738 &fpinfo
->startup_cost
, &fpinfo
->total_cost
);
740 /* Report estimated baserel size to planner. */
741 baserel
->rows
= fpinfo
->rows
;
742 baserel
->reltarget
->width
= fpinfo
->width
;
747 * If the foreign table has never been ANALYZEd, it will have
748 * reltuples < 0, meaning "unknown". We can't do much if we're not
749 * allowed to consult the remote server, but we can use a hack similar
750 * to plancat.c's treatment of empty relations: use a minimum size
751 * estimate of 10 pages, and divide by the column-datatype-based width
752 * estimate to get the corresponding number of tuples.
754 if (baserel
->tuples
< 0)
758 (10 * BLCKSZ
) / (baserel
->reltarget
->width
+
759 MAXALIGN(SizeofHeapTupleHeader
));
762 /* Estimate baserel size as best we can with local statistics. */
763 set_baserel_size_estimates(root
, baserel
);
765 /* Fill in basically-bogus cost estimates for use later. */
766 estimate_path_cost_size(root
, baserel
, NIL
, NIL
, NULL
,
767 &fpinfo
->rows
, &fpinfo
->width
,
768 &fpinfo
->startup_cost
, &fpinfo
->total_cost
);
772 * fpinfo->relation_name gets the numeric rangetable index of the foreign
773 * table RTE. (If this query gets EXPLAIN'd, we'll convert that to a
774 * human-readable string at that time.)
776 fpinfo
->relation_name
= psprintf("%u", baserel
->relid
);
778 /* No outer and inner relations. */
779 fpinfo
->make_outerrel_subquery
= false;
780 fpinfo
->make_innerrel_subquery
= false;
781 fpinfo
->lower_subquery_rels
= NULL
;
782 /* Set the relation index. */
783 fpinfo
->relation_index
= baserel
->relid
;
787 * get_useful_ecs_for_relation
788 * Determine which EquivalenceClasses might be involved in useful
789 * orderings of this relation.
791 * This function is in some respects a mirror image of the core function
792 * pathkeys_useful_for_merging: for a regular table, we know what indexes
793 * we have and want to test whether any of them are useful. For a foreign
794 * table, we don't know what indexes are present on the remote side but
795 * want to speculate about which ones we'd like to use if they existed.
797 * This function returns a list of potentially-useful equivalence classes,
798 * but it does not guarantee that an EquivalenceMember exists which contains
799 * Vars only from the given relation. For example, given ft1 JOIN t1 ON
800 * ft1.x + t1.x = 0, this function will say that the equivalence class
801 * containing ft1.x + t1.x is potentially useful. Supposing ft1 is remote and
802 * t1 is local (or on a different server), it will turn out that no useful
803 * ORDER BY clause can be generated. It's not our job to figure that out
804 * here; we're only interested in identifying relevant ECs.
807 get_useful_ecs_for_relation(PlannerInfo
*root
, RelOptInfo
*rel
)
809 List
*useful_eclass_list
= NIL
;
814 * First, consider whether any active EC is potentially useful for a merge
815 * join against this relation.
817 if (rel
->has_eclass_joins
)
819 foreach(lc
, root
->eq_classes
)
821 EquivalenceClass
*cur_ec
= (EquivalenceClass
*) lfirst(lc
);
823 if (eclass_useful_for_merging(root
, cur_ec
, rel
))
824 useful_eclass_list
= lappend(useful_eclass_list
, cur_ec
);
829 * Next, consider whether there are any non-EC derivable join clauses that
830 * are merge-joinable. If the joininfo list is empty, we can exit
833 if (rel
->joininfo
== NIL
)
834 return useful_eclass_list
;
836 /* If this is a child rel, we must use the topmost parent rel to search. */
837 if (IS_OTHER_REL(rel
))
839 Assert(!bms_is_empty(rel
->top_parent_relids
));
840 relids
= rel
->top_parent_relids
;
843 relids
= rel
->relids
;
845 /* Check each join clause in turn. */
846 foreach(lc
, rel
->joininfo
)
848 RestrictInfo
*restrictinfo
= (RestrictInfo
*) lfirst(lc
);
850 /* Consider only mergejoinable clauses */
851 if (restrictinfo
->mergeopfamilies
== NIL
)
854 /* Make sure we've got canonical ECs. */
855 update_mergeclause_eclasses(root
, restrictinfo
);
858 * restrictinfo->mergeopfamilies != NIL is sufficient to guarantee
859 * that left_ec and right_ec will be initialized, per comments in
860 * distribute_qual_to_rels.
862 * We want to identify which side of this merge-joinable clause
863 * contains columns from the relation produced by this RelOptInfo. We
864 * test for overlap, not containment, because there could be extra
865 * relations on either side. For example, suppose we've got something
866 * like ((A JOIN B ON A.x = B.x) JOIN C ON A.y = C.y) LEFT JOIN D ON
867 * A.y = D.y. The input rel might be the joinrel between A and B, and
868 * we'll consider the join clause A.y = D.y. relids contains a
869 * relation not involved in the join class (B) and the equivalence
870 * class for the left-hand side of the clause contains a relation not
871 * involved in the input rel (C). Despite the fact that we have only
872 * overlap and not containment in either direction, A.y is potentially
873 * useful as a sort column.
875 * Note that it's even possible that relids overlaps neither side of
876 * the join clause. For example, consider A LEFT JOIN B ON A.x = B.x
877 * AND A.x = 1. The clause A.x = 1 will appear in B's joininfo list,
878 * but overlaps neither side of B. In that case, we just skip this
879 * join clause, since it doesn't suggest a useful sort order for this
882 if (bms_overlap(relids
, restrictinfo
->right_ec
->ec_relids
))
883 useful_eclass_list
= list_append_unique_ptr(useful_eclass_list
,
884 restrictinfo
->right_ec
);
885 else if (bms_overlap(relids
, restrictinfo
->left_ec
->ec_relids
))
886 useful_eclass_list
= list_append_unique_ptr(useful_eclass_list
,
887 restrictinfo
->left_ec
);
890 return useful_eclass_list
;
894 * get_useful_pathkeys_for_relation
895 * Determine which orderings of a relation might be useful.
897 * Getting data in sorted order can be useful either because the requested
898 * order matches the final output ordering for the overall query we're
899 * planning, or because it enables an efficient merge join. Here, we try
900 * to figure out which pathkeys to consider.
903 get_useful_pathkeys_for_relation(PlannerInfo
*root
, RelOptInfo
*rel
)
905 List
*useful_pathkeys_list
= NIL
;
906 List
*useful_eclass_list
;
907 PgFdwRelationInfo
*fpinfo
= (PgFdwRelationInfo
*) rel
->fdw_private
;
908 EquivalenceClass
*query_ec
= NULL
;
912 * Pushing the query_pathkeys to the remote server is always worth
913 * considering, because it might let us avoid a local sort.
915 fpinfo
->qp_is_pushdown_safe
= false;
916 if (root
->query_pathkeys
)
918 bool query_pathkeys_ok
= true;
920 foreach(lc
, root
->query_pathkeys
)
922 PathKey
*pathkey
= (PathKey
*) lfirst(lc
);
925 * The planner and executor don't have any clever strategy for
926 * taking data sorted by a prefix of the query's pathkeys and
927 * getting it to be sorted by all of those pathkeys. We'll just
928 * end up resorting the entire data set. So, unless we can push
929 * down all of the query pathkeys, forget it.
931 if (!is_foreign_pathkey(root
, rel
, pathkey
))
933 query_pathkeys_ok
= false;
938 if (query_pathkeys_ok
)
940 useful_pathkeys_list
= list_make1(list_copy(root
->query_pathkeys
));
941 fpinfo
->qp_is_pushdown_safe
= true;
946 * Even if we're not using remote estimates, having the remote side do the
947 * sort generally won't be any worse than doing it locally, and it might
948 * be much better if the remote side can generate data in the right order
949 * without needing a sort at all. However, what we're going to do next is
950 * try to generate pathkeys that seem promising for possible merge joins,
951 * and that's more speculative. A wrong choice might hurt quite a bit, so
952 * bail out if we can't use remote estimates.
954 if (!fpinfo
->use_remote_estimate
)
955 return useful_pathkeys_list
;
957 /* Get the list of interesting EquivalenceClasses. */
958 useful_eclass_list
= get_useful_ecs_for_relation(root
, rel
);
960 /* Extract unique EC for query, if any, so we don't consider it again. */
961 if (list_length(root
->query_pathkeys
) == 1)
963 PathKey
*query_pathkey
= linitial(root
->query_pathkeys
);
965 query_ec
= query_pathkey
->pk_eclass
;
969 * As a heuristic, the only pathkeys we consider here are those of length
970 * one. It's surely possible to consider more, but since each one we
971 * choose to consider will generate a round-trip to the remote side, we
972 * need to be a bit cautious here. It would sure be nice to have a local
973 * cache of information about remote index definitions...
975 foreach(lc
, useful_eclass_list
)
977 EquivalenceClass
*cur_ec
= lfirst(lc
);
980 /* If redundant with what we did above, skip it. */
981 if (cur_ec
== query_ec
)
984 /* Can't push down the sort if the EC's opfamily is not shippable. */
985 if (!is_shippable(linitial_oid(cur_ec
->ec_opfamilies
),
986 OperatorFamilyRelationId
, fpinfo
))
989 /* If no pushable expression for this rel, skip it. */
990 if (find_em_for_rel(root
, cur_ec
, rel
) == NULL
)
993 /* Looks like we can generate a pathkey, so let's do it. */
994 pathkey
= make_canonical_pathkey(root
, cur_ec
,
995 linitial_oid(cur_ec
->ec_opfamilies
),
996 BTLessStrategyNumber
,
998 useful_pathkeys_list
= lappend(useful_pathkeys_list
,
999 list_make1(pathkey
));
1002 return useful_pathkeys_list
;
1006 * postgresGetForeignPaths
1007 * Create possible scan paths for a scan on the foreign table
1010 postgresGetForeignPaths(PlannerInfo
*root
,
1011 RelOptInfo
*baserel
,
1014 PgFdwRelationInfo
*fpinfo
= (PgFdwRelationInfo
*) baserel
->fdw_private
;
1020 * Create simplest ForeignScan path node and add it to baserel. This path
1021 * corresponds to SeqScan path of regular tables (though depending on what
1022 * baserestrict conditions we were able to send to remote, there might
1023 * actually be an indexscan happening there). We already did all the work
1024 * to estimate cost and size of this path.
1026 * Although this path uses no join clauses, it could still have required
1027 * parameterization due to LATERAL refs in its tlist.
1029 path
= create_foreignscan_path(root
, baserel
,
1030 NULL
, /* default pathtarget */
1032 fpinfo
->startup_cost
,
1034 NIL
, /* no pathkeys */
1035 baserel
->lateral_relids
,
1036 NULL
, /* no extra plan */
1037 NIL
, /* no fdw_restrictinfo list */
1038 NIL
); /* no fdw_private list */
1039 add_path(baserel
, (Path
*) path
);
1041 /* Add paths with pathkeys */
1042 add_paths_with_pathkeys_for_rel(root
, baserel
, NULL
, NIL
);
1045 * If we're not using remote estimates, stop here. We have no way to
1046 * estimate whether any join clauses would be worth sending across, so
1047 * don't bother building parameterized paths.
1049 if (!fpinfo
->use_remote_estimate
)
1053 * Thumb through all join clauses for the rel to identify which outer
1054 * relations could supply one or more safe-to-send-to-remote join clauses.
1055 * We'll build a parameterized path for each such outer relation.
1057 * It's convenient to manage this by representing each candidate outer
1058 * relation by the ParamPathInfo node for it. We can then use the
1059 * ppi_clauses list in the ParamPathInfo node directly as a list of the
1060 * interesting join clauses for that rel. This takes care of the
1061 * possibility that there are multiple safe join clauses for such a rel,
1062 * and also ensures that we account for unsafe join clauses that we'll
1063 * still have to enforce locally (since the parameterized-path machinery
1064 * insists that we handle all movable clauses).
1067 foreach(lc
, baserel
->joininfo
)
1069 RestrictInfo
*rinfo
= (RestrictInfo
*) lfirst(lc
);
1070 Relids required_outer
;
1071 ParamPathInfo
*param_info
;
1073 /* Check if clause can be moved to this rel */
1074 if (!join_clause_is_movable_to(rinfo
, baserel
))
1077 /* See if it is safe to send to remote */
1078 if (!is_foreign_expr(root
, baserel
, rinfo
->clause
))
1081 /* Calculate required outer rels for the resulting path */
1082 required_outer
= bms_union(rinfo
->clause_relids
,
1083 baserel
->lateral_relids
);
1084 /* We do not want the foreign rel itself listed in required_outer */
1085 required_outer
= bms_del_member(required_outer
, baserel
->relid
);
1088 * required_outer probably can't be empty here, but if it were, we
1089 * couldn't make a parameterized path.
1091 if (bms_is_empty(required_outer
))
1094 /* Get the ParamPathInfo */
1095 param_info
= get_baserel_parampathinfo(root
, baserel
,
1097 Assert(param_info
!= NULL
);
1100 * Add it to list unless we already have it. Testing pointer equality
1101 * is OK since get_baserel_parampathinfo won't make duplicates.
1103 ppi_list
= list_append_unique_ptr(ppi_list
, param_info
);
1107 * The above scan examined only "generic" join clauses, not those that
1108 * were absorbed into EquivalenceClauses. See if we can make anything out
1109 * of EquivalenceClauses.
1111 if (baserel
->has_eclass_joins
)
1114 * We repeatedly scan the eclass list looking for column references
1115 * (or expressions) belonging to the foreign rel. Each time we find
1116 * one, we generate a list of equivalence joinclauses for it, and then
1117 * see if any are safe to send to the remote. Repeat till there are
1118 * no more candidate EC members.
1120 ec_member_foreign_arg arg
;
1122 arg
.already_used
= NIL
;
1127 /* Make clauses, skipping any that join to lateral_referencers */
1129 clauses
= generate_implied_equalities_for_column(root
,
1131 ec_member_matches_foreign
,
1133 baserel
->lateral_referencers
);
1135 /* Done if there are no more expressions in the foreign rel */
1136 if (arg
.current
== NULL
)
1138 Assert(clauses
== NIL
);
1142 /* Scan the extracted join clauses */
1143 foreach(lc
, clauses
)
1145 RestrictInfo
*rinfo
= (RestrictInfo
*) lfirst(lc
);
1146 Relids required_outer
;
1147 ParamPathInfo
*param_info
;
1149 /* Check if clause can be moved to this rel */
1150 if (!join_clause_is_movable_to(rinfo
, baserel
))
1153 /* See if it is safe to send to remote */
1154 if (!is_foreign_expr(root
, baserel
, rinfo
->clause
))
1157 /* Calculate required outer rels for the resulting path */
1158 required_outer
= bms_union(rinfo
->clause_relids
,
1159 baserel
->lateral_relids
);
1160 required_outer
= bms_del_member(required_outer
, baserel
->relid
);
1161 if (bms_is_empty(required_outer
))
1164 /* Get the ParamPathInfo */
1165 param_info
= get_baserel_parampathinfo(root
, baserel
,
1167 Assert(param_info
!= NULL
);
1169 /* Add it to list unless we already have it */
1170 ppi_list
= list_append_unique_ptr(ppi_list
, param_info
);
1173 /* Try again, now ignoring the expression we found this time */
1174 arg
.already_used
= lappend(arg
.already_used
, arg
.current
);
1179 * Now build a path for each useful outer relation.
1181 foreach(lc
, ppi_list
)
1183 ParamPathInfo
*param_info
= (ParamPathInfo
*) lfirst(lc
);
1189 /* Get a cost estimate from the remote */
1190 estimate_path_cost_size(root
, baserel
,
1191 param_info
->ppi_clauses
, NIL
, NULL
,
1193 &startup_cost
, &total_cost
);
1196 * ppi_rows currently won't get looked at by anything, but still we
1197 * may as well ensure that it matches our idea of the rowcount.
1199 param_info
->ppi_rows
= rows
;
1202 path
= create_foreignscan_path(root
, baserel
,
1203 NULL
, /* default pathtarget */
1207 NIL
, /* no pathkeys */
1208 param_info
->ppi_req_outer
,
1210 NIL
, /* no fdw_restrictinfo list */
1211 NIL
); /* no fdw_private list */
1212 add_path(baserel
, (Path
*) path
);
1217 * postgresGetForeignPlan
1218 * Create ForeignScan plan node which implements selected best path
1220 static ForeignScan
*
1221 postgresGetForeignPlan(PlannerInfo
*root
,
1222 RelOptInfo
*foreignrel
,
1224 ForeignPath
*best_path
,
1229 PgFdwRelationInfo
*fpinfo
= (PgFdwRelationInfo
*) foreignrel
->fdw_private
;
1232 List
*remote_exprs
= NIL
;
1233 List
*local_exprs
= NIL
;
1234 List
*params_list
= NIL
;
1235 List
*fdw_scan_tlist
= NIL
;
1236 List
*fdw_recheck_quals
= NIL
;
1237 List
*retrieved_attrs
;
1239 bool has_final_sort
= false;
1240 bool has_limit
= false;
1244 * Get FDW private data created by postgresGetForeignUpperPaths(), if any.
1246 if (best_path
->fdw_private
)
1248 has_final_sort
= boolVal(list_nth(best_path
->fdw_private
,
1249 FdwPathPrivateHasFinalSort
));
1250 has_limit
= boolVal(list_nth(best_path
->fdw_private
,
1251 FdwPathPrivateHasLimit
));
1254 if (IS_SIMPLE_REL(foreignrel
))
1257 * For base relations, set scan_relid as the relid of the relation.
1259 scan_relid
= foreignrel
->relid
;
1262 * In a base-relation scan, we must apply the given scan_clauses.
1264 * Separate the scan_clauses into those that can be executed remotely
1265 * and those that can't. baserestrictinfo clauses that were
1266 * previously determined to be safe or unsafe by classifyConditions
1267 * are found in fpinfo->remote_conds and fpinfo->local_conds. Anything
1268 * else in the scan_clauses list will be a join clause, which we have
1269 * to check for remote-safety.
1271 * Note: the join clauses we see here should be the exact same ones
1272 * previously examined by postgresGetForeignPaths. Possibly it'd be
1273 * worth passing forward the classification work done then, rather
1274 * than repeating it here.
1276 * This code must match "extract_actual_clauses(scan_clauses, false)"
1277 * except for the additional decision about remote versus local
1280 foreach(lc
, scan_clauses
)
1282 RestrictInfo
*rinfo
= lfirst_node(RestrictInfo
, lc
);
1284 /* Ignore any pseudoconstants, they're dealt with elsewhere */
1285 if (rinfo
->pseudoconstant
)
1288 if (list_member_ptr(fpinfo
->remote_conds
, rinfo
))
1289 remote_exprs
= lappend(remote_exprs
, rinfo
->clause
);
1290 else if (list_member_ptr(fpinfo
->local_conds
, rinfo
))
1291 local_exprs
= lappend(local_exprs
, rinfo
->clause
);
1292 else if (is_foreign_expr(root
, foreignrel
, rinfo
->clause
))
1293 remote_exprs
= lappend(remote_exprs
, rinfo
->clause
);
1295 local_exprs
= lappend(local_exprs
, rinfo
->clause
);
1299 * For a base-relation scan, we have to support EPQ recheck, which
1300 * should recheck all the remote quals.
1302 fdw_recheck_quals
= remote_exprs
;
1307 * Join relation or upper relation - set scan_relid to 0.
1312 * For a join rel, baserestrictinfo is NIL and we are not considering
1313 * parameterization right now, so there should be no scan_clauses for
1314 * a joinrel or an upper rel either.
1316 Assert(!scan_clauses
);
1319 * Instead we get the conditions to apply from the fdw_private
1322 remote_exprs
= extract_actual_clauses(fpinfo
->remote_conds
, false);
1323 local_exprs
= extract_actual_clauses(fpinfo
->local_conds
, false);
1326 * We leave fdw_recheck_quals empty in this case, since we never need
1327 * to apply EPQ recheck clauses. In the case of a joinrel, EPQ
1328 * recheck is handled elsewhere --- see postgresGetForeignJoinPaths().
1329 * If we're planning an upperrel (ie, remote grouping or aggregation)
1330 * then there's no EPQ to do because SELECT FOR UPDATE wouldn't be
1331 * allowed, and indeed we *can't* put the remote clauses into
1332 * fdw_recheck_quals because the unaggregated Vars won't be available
1336 /* Build the list of columns to be fetched from the foreign server. */
1337 fdw_scan_tlist
= build_tlist_to_deparse(foreignrel
);
1340 * Ensure that the outer plan produces a tuple whose descriptor
1341 * matches our scan tuple slot. Also, remove the local conditions
1342 * from outer plan's quals, lest they be evaluated twice, once by the
1343 * local plan and once by the scan.
1348 * Right now, we only consider grouping and aggregation beyond
1349 * joins. Queries involving aggregates or grouping do not require
1350 * EPQ mechanism, hence should not have an outer plan here.
1352 Assert(!IS_UPPER_REL(foreignrel
));
1355 * First, update the plan's qual list if possible. In some cases
1356 * the quals might be enforced below the topmost plan level, in
1357 * which case we'll fail to remove them; it's not worth working
1360 foreach(lc
, local_exprs
)
1362 Node
*qual
= lfirst(lc
);
1364 outer_plan
->qual
= list_delete(outer_plan
->qual
, qual
);
1367 * For an inner join the local conditions of foreign scan plan
1368 * can be part of the joinquals as well. (They might also be
1369 * in the mergequals or hashquals, but we can't touch those
1370 * without breaking the plan.)
1372 if (IsA(outer_plan
, NestLoop
) ||
1373 IsA(outer_plan
, MergeJoin
) ||
1374 IsA(outer_plan
, HashJoin
))
1376 Join
*join_plan
= (Join
*) outer_plan
;
1378 if (join_plan
->jointype
== JOIN_INNER
)
1379 join_plan
->joinqual
= list_delete(join_plan
->joinqual
,
1385 * Now fix the subplan's tlist --- this might result in inserting
1386 * a Result node atop the plan tree.
1388 outer_plan
= change_plan_targetlist(outer_plan
, fdw_scan_tlist
,
1389 best_path
->path
.parallel_safe
);
1394 * Build the query string to be sent for execution, and identify
1395 * expressions to be sent as parameters.
1397 initStringInfo(&sql
);
1398 deparseSelectStmtForRel(&sql
, root
, foreignrel
, fdw_scan_tlist
,
1399 remote_exprs
, best_path
->path
.pathkeys
,
1400 has_final_sort
, has_limit
, false,
1401 &retrieved_attrs
, ¶ms_list
);
1403 /* Remember remote_exprs for possible use by postgresPlanDirectModify */
1404 fpinfo
->final_remote_exprs
= remote_exprs
;
1407 * Build the fdw_private list that will be available to the executor.
1408 * Items in the list must match order in enum FdwScanPrivateIndex.
1410 fdw_private
= list_make3(makeString(sql
.data
),
1412 makeInteger(fpinfo
->fetch_size
));
1413 if (IS_JOIN_REL(foreignrel
) || IS_UPPER_REL(foreignrel
))
1414 fdw_private
= lappend(fdw_private
,
1415 makeString(fpinfo
->relation_name
));
1418 * Create the ForeignScan node for the given relation.
1420 * Note that the remote parameter expressions are stored in the fdw_exprs
1421 * field of the finished plan node; we can't keep them in private state
1422 * because then they wouldn't be subject to later planner processing.
1424 return make_foreignscan(tlist
,
1435 * Construct a tuple descriptor for the scan tuples handled by a foreign join.
1438 get_tupdesc_for_join_scan_tuples(ForeignScanState
*node
)
1440 ForeignScan
*fsplan
= (ForeignScan
*) node
->ss
.ps
.plan
;
1441 EState
*estate
= node
->ss
.ps
.state
;
1445 * The core code has already set up a scan tuple slot based on
1446 * fsplan->fdw_scan_tlist, and this slot's tupdesc is mostly good enough,
1447 * but there's one case where it isn't. If we have any whole-row row
1448 * identifier Vars, they may have vartype RECORD, and we need to replace
1449 * that with the associated table's actual composite type. This ensures
1450 * that when we read those ROW() expression values from the remote server,
1451 * we can convert them to a composite type the local server knows.
1453 tupdesc
= CreateTupleDescCopy(node
->ss
.ss_ScanTupleSlot
->tts_tupleDescriptor
);
1454 for (int i
= 0; i
< tupdesc
->natts
; i
++)
1456 Form_pg_attribute att
= TupleDescAttr(tupdesc
, i
);
1461 /* Nothing to do if it's not a generic RECORD attribute */
1462 if (att
->atttypid
!= RECORDOID
|| att
->atttypmod
>= 0)
1466 * If we can't identify the referenced table, do nothing. This'll
1467 * likely lead to failure later, but perhaps we can muddle through.
1469 var
= (Var
*) list_nth_node(TargetEntry
, fsplan
->fdw_scan_tlist
,
1471 if (!IsA(var
, Var
) || var
->varattno
!= 0)
1473 rte
= list_nth(estate
->es_range_table
, var
->varno
- 1);
1474 if (rte
->rtekind
!= RTE_RELATION
)
1476 reltype
= get_rel_type_id(rte
->relid
);
1477 if (!OidIsValid(reltype
))
1479 att
->atttypid
= reltype
;
1480 /* shouldn't need to change anything else */
1486 * postgresBeginForeignScan
1487 * Initiate an executor scan of a foreign PostgreSQL table.
1490 postgresBeginForeignScan(ForeignScanState
*node
, int eflags
)
1492 ForeignScan
*fsplan
= (ForeignScan
*) node
->ss
.ps
.plan
;
1493 EState
*estate
= node
->ss
.ps
.state
;
1494 PgFdwScanState
*fsstate
;
1497 ForeignTable
*table
;
1503 * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL.
1505 if (eflags
& EXEC_FLAG_EXPLAIN_ONLY
)
1509 * We'll save private state in node->fdw_state.
1511 fsstate
= (PgFdwScanState
*) palloc0(sizeof(PgFdwScanState
));
1512 node
->fdw_state
= (void *) fsstate
;
1515 * Identify which user to do the remote access as. This should match what
1516 * ExecCheckPermissions() does.
1518 userid
= OidIsValid(fsplan
->checkAsUser
) ? fsplan
->checkAsUser
: GetUserId();
1519 if (fsplan
->scan
.scanrelid
> 0)
1520 rtindex
= fsplan
->scan
.scanrelid
;
1522 rtindex
= bms_next_member(fsplan
->fs_base_relids
, -1);
1523 rte
= exec_rt_fetch(rtindex
, estate
);
1525 /* Get info about foreign table. */
1526 table
= GetForeignTable(rte
->relid
);
1527 user
= GetUserMapping(userid
, table
->serverid
);
1530 * Get connection to the foreign server. Connection manager will
1531 * establish new connection if necessary.
1533 fsstate
->conn
= GetConnection(user
, false, &fsstate
->conn_state
);
1535 /* Assign a unique ID for my cursor */
1536 fsstate
->cursor_number
= GetCursorNumber(fsstate
->conn
);
1537 fsstate
->cursor_exists
= false;
1539 /* Get private info created by planner functions. */
1540 fsstate
->query
= strVal(list_nth(fsplan
->fdw_private
,
1541 FdwScanPrivateSelectSql
));
1542 fsstate
->retrieved_attrs
= (List
*) list_nth(fsplan
->fdw_private
,
1543 FdwScanPrivateRetrievedAttrs
);
1544 fsstate
->fetch_size
= intVal(list_nth(fsplan
->fdw_private
,
1545 FdwScanPrivateFetchSize
));
1547 /* Create contexts for batches of tuples and per-tuple temp workspace. */
1548 fsstate
->batch_cxt
= AllocSetContextCreate(estate
->es_query_cxt
,
1549 "postgres_fdw tuple data",
1550 ALLOCSET_DEFAULT_SIZES
);
1551 fsstate
->temp_cxt
= AllocSetContextCreate(estate
->es_query_cxt
,
1552 "postgres_fdw temporary data",
1553 ALLOCSET_SMALL_SIZES
);
1556 * Get info we'll need for converting data fetched from the foreign server
1557 * into local representation and error reporting during that process.
1559 if (fsplan
->scan
.scanrelid
> 0)
1561 fsstate
->rel
= node
->ss
.ss_currentRelation
;
1562 fsstate
->tupdesc
= RelationGetDescr(fsstate
->rel
);
1566 fsstate
->rel
= NULL
;
1567 fsstate
->tupdesc
= get_tupdesc_for_join_scan_tuples(node
);
1570 fsstate
->attinmeta
= TupleDescGetAttInMetadata(fsstate
->tupdesc
);
1573 * Prepare for processing of parameters used in remote query, if any.
1575 numParams
= list_length(fsplan
->fdw_exprs
);
1576 fsstate
->numParams
= numParams
;
1578 prepare_query_params((PlanState
*) node
,
1581 &fsstate
->param_flinfo
,
1582 &fsstate
->param_exprs
,
1583 &fsstate
->param_values
);
1585 /* Set the async-capable flag */
1586 fsstate
->async_capable
= node
->ss
.ps
.async_capable
;
1590 * postgresIterateForeignScan
1591 * Retrieve next row from the result set, or clear tuple slot to indicate
1594 static TupleTableSlot
*
1595 postgresIterateForeignScan(ForeignScanState
*node
)
1597 PgFdwScanState
*fsstate
= (PgFdwScanState
*) node
->fdw_state
;
1598 TupleTableSlot
*slot
= node
->ss
.ss_ScanTupleSlot
;
1601 * In sync mode, if this is the first call after Begin or ReScan, we need
1602 * to create the cursor on the remote side. In async mode, we would have
1603 * already created the cursor before we get here, even if this is the
1604 * first call after Begin or ReScan.
1606 if (!fsstate
->cursor_exists
)
1607 create_cursor(node
);
1610 * Get some more tuples, if we've run out.
1612 if (fsstate
->next_tuple
>= fsstate
->num_tuples
)
1614 /* In async mode, just clear tuple slot. */
1615 if (fsstate
->async_capable
)
1616 return ExecClearTuple(slot
);
1617 /* No point in another fetch if we already detected EOF, though. */
1618 if (!fsstate
->eof_reached
)
1619 fetch_more_data(node
);
1620 /* If we didn't get any tuples, must be end of data. */
1621 if (fsstate
->next_tuple
>= fsstate
->num_tuples
)
1622 return ExecClearTuple(slot
);
1626 * Return the next tuple.
1628 ExecStoreHeapTuple(fsstate
->tuples
[fsstate
->next_tuple
++],
1636 * postgresReScanForeignScan
1640 postgresReScanForeignScan(ForeignScanState
*node
)
1642 PgFdwScanState
*fsstate
= (PgFdwScanState
*) node
->fdw_state
;
1646 /* If we haven't created the cursor yet, nothing to do. */
1647 if (!fsstate
->cursor_exists
)
1651 * If the node is async-capable, and an asynchronous fetch for it has
1652 * begun, the asynchronous fetch might not have yet completed. Check if
1653 * the node is async-capable, and an asynchronous fetch for it is still in
1654 * progress; if so, complete the asynchronous fetch before restarting the
1657 if (fsstate
->async_capable
&&
1658 fsstate
->conn_state
->pendingAreq
&&
1659 fsstate
->conn_state
->pendingAreq
->requestee
== (PlanState
*) node
)
1660 fetch_more_data(node
);
1663 * If any internal parameters affecting this node have changed, we'd
1664 * better destroy and recreate the cursor. Otherwise, rewinding it should
1665 * be good enough. If we've only fetched zero or one batch, we needn't
1666 * even rewind the cursor, just rescan what we have.
1668 if (node
->ss
.ps
.chgParam
!= NULL
)
1670 fsstate
->cursor_exists
= false;
1671 snprintf(sql
, sizeof(sql
), "CLOSE c%u",
1672 fsstate
->cursor_number
);
1674 else if (fsstate
->fetch_ct_2
> 1)
1676 snprintf(sql
, sizeof(sql
), "MOVE BACKWARD ALL IN c%u",
1677 fsstate
->cursor_number
);
1681 /* Easy: just rescan what we already have in memory, if anything */
1682 fsstate
->next_tuple
= 0;
1687 * We don't use a PG_TRY block here, so be careful not to throw error
1688 * without releasing the PGresult.
1690 res
= pgfdw_exec_query(fsstate
->conn
, sql
, fsstate
->conn_state
);
1691 if (PQresultStatus(res
) != PGRES_COMMAND_OK
)
1692 pgfdw_report_error(ERROR
, res
, fsstate
->conn
, true, sql
);
1695 /* Now force a fresh FETCH. */
1696 fsstate
->tuples
= NULL
;
1697 fsstate
->num_tuples
= 0;
1698 fsstate
->next_tuple
= 0;
1699 fsstate
->fetch_ct_2
= 0;
1700 fsstate
->eof_reached
= false;
1704 * postgresEndForeignScan
1705 * Finish scanning foreign table and dispose objects used for this scan
1708 postgresEndForeignScan(ForeignScanState
*node
)
1710 PgFdwScanState
*fsstate
= (PgFdwScanState
*) node
->fdw_state
;
1712 /* if fsstate is NULL, we are in EXPLAIN; nothing to do */
1713 if (fsstate
== NULL
)
1716 /* Close the cursor if open, to prevent accumulation of cursors */
1717 if (fsstate
->cursor_exists
)
1718 close_cursor(fsstate
->conn
, fsstate
->cursor_number
,
1719 fsstate
->conn_state
);
1721 /* Release remote connection */
1722 ReleaseConnection(fsstate
->conn
);
1723 fsstate
->conn
= NULL
;
1725 /* MemoryContexts will be deleted automatically. */
1729 * postgresAddForeignUpdateTargets
1730 * Add resjunk column(s) needed for update/delete on a foreign table
1733 postgresAddForeignUpdateTargets(PlannerInfo
*root
,
1735 RangeTblEntry
*target_rte
,
1736 Relation target_relation
)
1741 * In postgres_fdw, what we need is the ctid, same as for a regular table.
1744 /* Make a Var representing the desired value */
1745 var
= makeVar(rtindex
,
1746 SelfItemPointerAttributeNumber
,
1752 /* Register it as a row-identity column needed by this target rel */
1753 add_row_identity_var(root
, var
, rtindex
, "ctid");
1757 * postgresPlanForeignModify
1758 * Plan an insert/update/delete operation on a foreign table
1761 postgresPlanForeignModify(PlannerInfo
*root
,
1763 Index resultRelation
,
1766 CmdType operation
= plan
->operation
;
1767 RangeTblEntry
*rte
= planner_rt_fetch(resultRelation
, root
);
1770 List
*targetAttrs
= NIL
;
1771 List
*withCheckOptionList
= NIL
;
1772 List
*returningList
= NIL
;
1773 List
*retrieved_attrs
= NIL
;
1774 bool doNothing
= false;
1775 int values_end_len
= -1;
1777 initStringInfo(&sql
);
1780 * Core code already has some lock on each rel being planned, so we can
1783 rel
= table_open(rte
->relid
, NoLock
);
1786 * In an INSERT, we transmit all columns that are defined in the foreign
1787 * table. In an UPDATE, if there are BEFORE ROW UPDATE triggers on the
1788 * foreign table, we transmit all columns like INSERT; else we transmit
1789 * only columns that were explicitly targets of the UPDATE, so as to avoid
1790 * unnecessary data transmission. (We can't do that for INSERT since we
1791 * would miss sending default values for columns not listed in the source
1792 * statement, and for UPDATE if there are BEFORE ROW UPDATE triggers since
1793 * those triggers might change values for non-target columns, in which
1794 * case we would miss sending changed values for those columns.)
1796 if (operation
== CMD_INSERT
||
1797 (operation
== CMD_UPDATE
&&
1799 rel
->trigdesc
->trig_update_before_row
))
1801 TupleDesc tupdesc
= RelationGetDescr(rel
);
1804 for (attnum
= 1; attnum
<= tupdesc
->natts
; attnum
++)
1806 Form_pg_attribute attr
= TupleDescAttr(tupdesc
, attnum
- 1);
1808 if (!attr
->attisdropped
)
1809 targetAttrs
= lappend_int(targetAttrs
, attnum
);
1812 else if (operation
== CMD_UPDATE
)
1815 RelOptInfo
*rel
= find_base_rel(root
, resultRelation
);
1816 Bitmapset
*allUpdatedCols
= get_rel_all_updated_cols(root
, rel
);
1819 while ((col
= bms_next_member(allUpdatedCols
, col
)) >= 0)
1821 /* bit numbers are offset by FirstLowInvalidHeapAttributeNumber */
1822 AttrNumber attno
= col
+ FirstLowInvalidHeapAttributeNumber
;
1824 if (attno
<= InvalidAttrNumber
) /* shouldn't happen */
1825 elog(ERROR
, "system-column update is not supported");
1826 targetAttrs
= lappend_int(targetAttrs
, attno
);
1831 * Extract the relevant WITH CHECK OPTION list if any.
1833 if (plan
->withCheckOptionLists
)
1834 withCheckOptionList
= (List
*) list_nth(plan
->withCheckOptionLists
,
1838 * Extract the relevant RETURNING list if any.
1840 if (plan
->returningLists
)
1841 returningList
= (List
*) list_nth(plan
->returningLists
, subplan_index
);
1844 * ON CONFLICT DO UPDATE and DO NOTHING case with inference specification
1845 * should have already been rejected in the optimizer, as presently there
1846 * is no way to recognize an arbiter index on a foreign table. Only DO
1847 * NOTHING is supported without an inference specification.
1849 if (plan
->onConflictAction
== ONCONFLICT_NOTHING
)
1851 else if (plan
->onConflictAction
!= ONCONFLICT_NONE
)
1852 elog(ERROR
, "unexpected ON CONFLICT specification: %d",
1853 (int) plan
->onConflictAction
);
1856 * Construct the SQL command string.
1861 deparseInsertSql(&sql
, rte
, resultRelation
, rel
,
1862 targetAttrs
, doNothing
,
1863 withCheckOptionList
, returningList
,
1864 &retrieved_attrs
, &values_end_len
);
1867 deparseUpdateSql(&sql
, rte
, resultRelation
, rel
,
1869 withCheckOptionList
, returningList
,
1873 deparseDeleteSql(&sql
, rte
, resultRelation
, rel
,
1878 elog(ERROR
, "unexpected operation: %d", (int) operation
);
1882 table_close(rel
, NoLock
);
1885 * Build the fdw_private list that will be available to the executor.
1886 * Items in the list must match enum FdwModifyPrivateIndex, above.
1888 return list_make5(makeString(sql
.data
),
1890 makeInteger(values_end_len
),
1891 makeBoolean((retrieved_attrs
!= NIL
)),
1896 * postgresBeginForeignModify
1897 * Begin an insert/update/delete operation on a foreign table
1900 postgresBeginForeignModify(ModifyTableState
*mtstate
,
1901 ResultRelInfo
*resultRelInfo
,
1906 PgFdwModifyState
*fmstate
;
1911 List
*retrieved_attrs
;
1915 * Do nothing in EXPLAIN (no ANALYZE) case. resultRelInfo->ri_FdwState
1918 if (eflags
& EXEC_FLAG_EXPLAIN_ONLY
)
1921 /* Deconstruct fdw_private data. */
1922 query
= strVal(list_nth(fdw_private
,
1923 FdwModifyPrivateUpdateSql
));
1924 target_attrs
= (List
*) list_nth(fdw_private
,
1925 FdwModifyPrivateTargetAttnums
);
1926 values_end_len
= intVal(list_nth(fdw_private
,
1927 FdwModifyPrivateLen
));
1928 has_returning
= boolVal(list_nth(fdw_private
,
1929 FdwModifyPrivateHasReturning
));
1930 retrieved_attrs
= (List
*) list_nth(fdw_private
,
1931 FdwModifyPrivateRetrievedAttrs
);
1934 rte
= exec_rt_fetch(resultRelInfo
->ri_RangeTableIndex
,
1937 /* Construct an execution state. */
1938 fmstate
= create_foreign_modify(mtstate
->ps
.state
,
1942 outerPlanState(mtstate
)->plan
,
1949 resultRelInfo
->ri_FdwState
= fmstate
;
1953 * postgresExecForeignInsert
1954 * Insert one row into a foreign table
1956 static TupleTableSlot
*
1957 postgresExecForeignInsert(EState
*estate
,
1958 ResultRelInfo
*resultRelInfo
,
1959 TupleTableSlot
*slot
,
1960 TupleTableSlot
*planSlot
)
1962 PgFdwModifyState
*fmstate
= (PgFdwModifyState
*) resultRelInfo
->ri_FdwState
;
1963 TupleTableSlot
**rslot
;
1967 * If the fmstate has aux_fmstate set, use the aux_fmstate (see
1968 * postgresBeginForeignInsert())
1970 if (fmstate
->aux_fmstate
)
1971 resultRelInfo
->ri_FdwState
= fmstate
->aux_fmstate
;
1972 rslot
= execute_foreign_modify(estate
, resultRelInfo
, CMD_INSERT
,
1973 &slot
, &planSlot
, &numSlots
);
1974 /* Revert that change */
1975 if (fmstate
->aux_fmstate
)
1976 resultRelInfo
->ri_FdwState
= fmstate
;
1978 return rslot
? *rslot
: NULL
;
1982 * postgresExecForeignBatchInsert
1983 * Insert multiple rows into a foreign table
1985 static TupleTableSlot
**
1986 postgresExecForeignBatchInsert(EState
*estate
,
1987 ResultRelInfo
*resultRelInfo
,
1988 TupleTableSlot
**slots
,
1989 TupleTableSlot
**planSlots
,
1992 PgFdwModifyState
*fmstate
= (PgFdwModifyState
*) resultRelInfo
->ri_FdwState
;
1993 TupleTableSlot
**rslot
;
1996 * If the fmstate has aux_fmstate set, use the aux_fmstate (see
1997 * postgresBeginForeignInsert())
1999 if (fmstate
->aux_fmstate
)
2000 resultRelInfo
->ri_FdwState
= fmstate
->aux_fmstate
;
2001 rslot
= execute_foreign_modify(estate
, resultRelInfo
, CMD_INSERT
,
2002 slots
, planSlots
, numSlots
);
2003 /* Revert that change */
2004 if (fmstate
->aux_fmstate
)
2005 resultRelInfo
->ri_FdwState
= fmstate
;
2011 * postgresGetForeignModifyBatchSize
2012 * Determine the maximum number of tuples that can be inserted in bulk
2014 * Returns the batch size specified for server or table. When batching is not
2015 * allowed (e.g. for tables with BEFORE/AFTER ROW triggers or with RETURNING
2016 * clause), returns 1.
2019 postgresGetForeignModifyBatchSize(ResultRelInfo
*resultRelInfo
)
2022 PgFdwModifyState
*fmstate
= (PgFdwModifyState
*) resultRelInfo
->ri_FdwState
;
2024 /* should be called only once */
2025 Assert(resultRelInfo
->ri_BatchSize
== 0);
2028 * Should never get called when the insert is being performed on a table
2029 * that is also among the target relations of an UPDATE operation, because
2030 * postgresBeginForeignInsert() currently rejects such insert attempts.
2032 Assert(fmstate
== NULL
|| fmstate
->aux_fmstate
== NULL
);
2035 * In EXPLAIN without ANALYZE, ri_FdwState is NULL, so we have to lookup
2036 * the option directly in server/table options. Otherwise just use the
2037 * value we determined earlier.
2040 batch_size
= fmstate
->batch_size
;
2042 batch_size
= get_batch_size_option(resultRelInfo
->ri_RelationDesc
);
2045 * Disable batching when we have to use RETURNING, there are any
2046 * BEFORE/AFTER ROW INSERT triggers on the foreign table, or there are any
2047 * WITH CHECK OPTION constraints from parent views.
2049 * When there are any BEFORE ROW INSERT triggers on the table, we can't
2050 * support it, because such triggers might query the table we're inserting
2051 * into and act differently if the tuples that have already been processed
2052 * and prepared for insertion are not there.
2054 if (resultRelInfo
->ri_projectReturning
!= NULL
||
2055 resultRelInfo
->ri_WithCheckOptions
!= NIL
||
2056 (resultRelInfo
->ri_TrigDesc
&&
2057 (resultRelInfo
->ri_TrigDesc
->trig_insert_before_row
||
2058 resultRelInfo
->ri_TrigDesc
->trig_insert_after_row
)))
2062 * If the foreign table has no columns, disable batching as the INSERT
2063 * syntax doesn't allow batching multiple empty rows into a zero-column
2064 * table in a single statement. This is needed for COPY FROM, in which
2065 * case fmstate must be non-NULL.
2067 if (fmstate
&& list_length(fmstate
->target_attrs
) == 0)
2071 * Otherwise use the batch size specified for server/table. The number of
2072 * parameters in a batch is limited to 65535 (uint16), so make sure we
2073 * don't exceed this limit by using the maximum batch_size possible.
2075 if (fmstate
&& fmstate
->p_nums
> 0)
2076 batch_size
= Min(batch_size
, PQ_QUERY_PARAM_MAX_LIMIT
/ fmstate
->p_nums
);
2082 * postgresExecForeignUpdate
2083 * Update one row in a foreign table
2085 static TupleTableSlot
*
2086 postgresExecForeignUpdate(EState
*estate
,
2087 ResultRelInfo
*resultRelInfo
,
2088 TupleTableSlot
*slot
,
2089 TupleTableSlot
*planSlot
)
2091 TupleTableSlot
**rslot
;
2094 rslot
= execute_foreign_modify(estate
, resultRelInfo
, CMD_UPDATE
,
2095 &slot
, &planSlot
, &numSlots
);
2097 return rslot
? rslot
[0] : NULL
;
2101 * postgresExecForeignDelete
2102 * Delete one row from a foreign table
2104 static TupleTableSlot
*
2105 postgresExecForeignDelete(EState
*estate
,
2106 ResultRelInfo
*resultRelInfo
,
2107 TupleTableSlot
*slot
,
2108 TupleTableSlot
*planSlot
)
2110 TupleTableSlot
**rslot
;
2113 rslot
= execute_foreign_modify(estate
, resultRelInfo
, CMD_DELETE
,
2114 &slot
, &planSlot
, &numSlots
);
2116 return rslot
? rslot
[0] : NULL
;
2120 * postgresEndForeignModify
2121 * Finish an insert/update/delete operation on a foreign table
2124 postgresEndForeignModify(EState
*estate
,
2125 ResultRelInfo
*resultRelInfo
)
2127 PgFdwModifyState
*fmstate
= (PgFdwModifyState
*) resultRelInfo
->ri_FdwState
;
2129 /* If fmstate is NULL, we are in EXPLAIN; nothing to do */
2130 if (fmstate
== NULL
)
2133 /* Destroy the execution state */
2134 finish_foreign_modify(fmstate
);
2138 * postgresBeginForeignInsert
2139 * Begin an insert operation on a foreign table
2142 postgresBeginForeignInsert(ModifyTableState
*mtstate
,
2143 ResultRelInfo
*resultRelInfo
)
2145 PgFdwModifyState
*fmstate
;
2146 ModifyTable
*plan
= castNode(ModifyTable
, mtstate
->ps
.plan
);
2147 EState
*estate
= mtstate
->ps
.state
;
2148 Index resultRelation
;
2149 Relation rel
= resultRelInfo
->ri_RelationDesc
;
2151 TupleDesc tupdesc
= RelationGetDescr(rel
);
2155 List
*targetAttrs
= NIL
;
2156 List
*retrieved_attrs
= NIL
;
2157 bool doNothing
= false;
2160 * If the foreign table we are about to insert routed rows into is also an
2161 * UPDATE subplan result rel that will be updated later, proceeding with
2162 * the INSERT will result in the later UPDATE incorrectly modifying those
2163 * routed rows, so prevent the INSERT --- it would be nice if we could
2164 * handle this case; but for now, throw an error for safety.
2166 if (plan
&& plan
->operation
== CMD_UPDATE
&&
2167 (resultRelInfo
->ri_usesFdwDirectModify
||
2168 resultRelInfo
->ri_FdwState
))
2170 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED
),
2171 errmsg("cannot route tuples into foreign table to be updated \"%s\"",
2172 RelationGetRelationName(rel
))));
2174 initStringInfo(&sql
);
2176 /* We transmit all columns that are defined in the foreign table. */
2177 for (attnum
= 1; attnum
<= tupdesc
->natts
; attnum
++)
2179 Form_pg_attribute attr
= TupleDescAttr(tupdesc
, attnum
- 1);
2181 if (!attr
->attisdropped
)
2182 targetAttrs
= lappend_int(targetAttrs
, attnum
);
2185 /* Check if we add the ON CONFLICT clause to the remote query. */
2188 OnConflictAction onConflictAction
= plan
->onConflictAction
;
2190 /* We only support DO NOTHING without an inference specification. */
2191 if (onConflictAction
== ONCONFLICT_NOTHING
)
2193 else if (onConflictAction
!= ONCONFLICT_NONE
)
2194 elog(ERROR
, "unexpected ON CONFLICT specification: %d",
2195 (int) onConflictAction
);
2199 * If the foreign table is a partition that doesn't have a corresponding
2200 * RTE entry, we need to create a new RTE describing the foreign table for
2201 * use by deparseInsertSql and create_foreign_modify() below, after first
2202 * copying the parent's RTE and modifying some fields to describe the
2203 * foreign partition to work on. However, if this is invoked by UPDATE,
2204 * the existing RTE may already correspond to this partition if it is one
2205 * of the UPDATE subplan target rels; in that case, we can just use the
2206 * existing RTE as-is.
2208 if (resultRelInfo
->ri_RangeTableIndex
== 0)
2210 ResultRelInfo
*rootResultRelInfo
= resultRelInfo
->ri_RootResultRelInfo
;
2212 rte
= exec_rt_fetch(rootResultRelInfo
->ri_RangeTableIndex
, estate
);
2213 rte
= copyObject(rte
);
2214 rte
->relid
= RelationGetRelid(rel
);
2215 rte
->relkind
= RELKIND_FOREIGN_TABLE
;
2218 * For UPDATE, we must use the RT index of the first subplan target
2219 * rel's RTE, because the core code would have built expressions for
2220 * the partition, such as RETURNING, using that RT index as varno of
2221 * Vars contained in those expressions.
2223 if (plan
&& plan
->operation
== CMD_UPDATE
&&
2224 rootResultRelInfo
->ri_RangeTableIndex
== plan
->rootRelation
)
2225 resultRelation
= mtstate
->resultRelInfo
[0].ri_RangeTableIndex
;
2227 resultRelation
= rootResultRelInfo
->ri_RangeTableIndex
;
2231 resultRelation
= resultRelInfo
->ri_RangeTableIndex
;
2232 rte
= exec_rt_fetch(resultRelation
, estate
);
2235 /* Construct the SQL command string. */
2236 deparseInsertSql(&sql
, rte
, resultRelation
, rel
, targetAttrs
, doNothing
,
2237 resultRelInfo
->ri_WithCheckOptions
,
2238 resultRelInfo
->ri_returningList
,
2239 &retrieved_attrs
, &values_end_len
);
2241 /* Construct an execution state. */
2242 fmstate
= create_foreign_modify(mtstate
->ps
.state
,
2250 retrieved_attrs
!= NIL
,
2254 * If the given resultRelInfo already has PgFdwModifyState set, it means
2255 * the foreign table is an UPDATE subplan result rel; in which case, store
2256 * the resulting state into the aux_fmstate of the PgFdwModifyState.
2258 if (resultRelInfo
->ri_FdwState
)
2260 Assert(plan
&& plan
->operation
== CMD_UPDATE
);
2261 Assert(resultRelInfo
->ri_usesFdwDirectModify
== false);
2262 ((PgFdwModifyState
*) resultRelInfo
->ri_FdwState
)->aux_fmstate
= fmstate
;
2265 resultRelInfo
->ri_FdwState
= fmstate
;
2269 * postgresEndForeignInsert
2270 * Finish an insert operation on a foreign table
2273 postgresEndForeignInsert(EState
*estate
,
2274 ResultRelInfo
*resultRelInfo
)
2276 PgFdwModifyState
*fmstate
= (PgFdwModifyState
*) resultRelInfo
->ri_FdwState
;
2278 Assert(fmstate
!= NULL
);
2281 * If the fmstate has aux_fmstate set, get the aux_fmstate (see
2282 * postgresBeginForeignInsert())
2284 if (fmstate
->aux_fmstate
)
2285 fmstate
= fmstate
->aux_fmstate
;
2287 /* Destroy the execution state */
2288 finish_foreign_modify(fmstate
);
2292 * postgresIsForeignRelUpdatable
2293 * Determine whether a foreign table supports INSERT, UPDATE and/or
2297 postgresIsForeignRelUpdatable(Relation rel
)
2300 ForeignTable
*table
;
2301 ForeignServer
*server
;
2305 * By default, all postgres_fdw foreign tables are assumed updatable. This
2306 * can be overridden by a per-server setting, which in turn can be
2307 * overridden by a per-table setting.
2311 table
= GetForeignTable(RelationGetRelid(rel
));
2312 server
= GetForeignServer(table
->serverid
);
2314 foreach(lc
, server
->options
)
2316 DefElem
*def
= (DefElem
*) lfirst(lc
);
2318 if (strcmp(def
->defname
, "updatable") == 0)
2319 updatable
= defGetBoolean(def
);
2321 foreach(lc
, table
->options
)
2323 DefElem
*def
= (DefElem
*) lfirst(lc
);
2325 if (strcmp(def
->defname
, "updatable") == 0)
2326 updatable
= defGetBoolean(def
);
2330 * Currently "updatable" means support for INSERT, UPDATE and DELETE.
2333 (1 << CMD_INSERT
) | (1 << CMD_UPDATE
) | (1 << CMD_DELETE
) : 0;
2337 * postgresRecheckForeignScan
2338 * Execute a local join execution plan for a foreign join
2341 postgresRecheckForeignScan(ForeignScanState
*node
, TupleTableSlot
*slot
)
2343 Index scanrelid
= ((Scan
*) node
->ss
.ps
.plan
)->scanrelid
;
2344 PlanState
*outerPlan
= outerPlanState(node
);
2345 TupleTableSlot
*result
;
2347 /* For base foreign relations, it suffices to set fdw_recheck_quals */
2351 Assert(outerPlan
!= NULL
);
2353 /* Execute a local join execution plan */
2354 result
= ExecProcNode(outerPlan
);
2355 if (TupIsNull(result
))
2358 /* Store result in the given slot */
2359 ExecCopySlot(slot
, result
);
2365 * find_modifytable_subplan
2366 * Helper routine for postgresPlanDirectModify to find the
2367 * ModifyTable subplan node that scans the specified RTI.
2369 * Returns NULL if the subplan couldn't be identified. That's not a fatal
2370 * error condition, we just abandon trying to do the update directly.
2372 static ForeignScan
*
2373 find_modifytable_subplan(PlannerInfo
*root
,
2378 Plan
*subplan
= outerPlan(plan
);
2381 * The cases we support are (1) the desired ForeignScan is the immediate
2382 * child of ModifyTable, or (2) it is the subplan_index'th child of an
2383 * Append node that is the immediate child of ModifyTable. There is no
2384 * point in looking further down, as that would mean that local joins are
2385 * involved, so we can't do the update directly.
2387 * There could be a Result atop the Append too, acting to compute the
2388 * UPDATE targetlist values. We ignore that here; the tlist will be
2389 * checked by our caller.
2391 * In principle we could examine all the children of the Append, but it's
2392 * currently unlikely that the core planner would generate such a plan
2393 * with the children out-of-order. Moreover, such a search risks costing
2394 * O(N^2) time when there are a lot of children.
2396 if (IsA(subplan
, Append
))
2398 Append
*appendplan
= (Append
*) subplan
;
2400 if (subplan_index
< list_length(appendplan
->appendplans
))
2401 subplan
= (Plan
*) list_nth(appendplan
->appendplans
, subplan_index
);
2403 else if (IsA(subplan
, Result
) &&
2404 outerPlan(subplan
) != NULL
&&
2405 IsA(outerPlan(subplan
), Append
))
2407 Append
*appendplan
= (Append
*) outerPlan(subplan
);
2409 if (subplan_index
< list_length(appendplan
->appendplans
))
2410 subplan
= (Plan
*) list_nth(appendplan
->appendplans
, subplan_index
);
2413 /* Now, have we got a ForeignScan on the desired rel? */
2414 if (IsA(subplan
, ForeignScan
))
2416 ForeignScan
*fscan
= (ForeignScan
*) subplan
;
2418 if (bms_is_member(rtindex
, fscan
->fs_base_relids
))
2426 * postgresPlanDirectModify
2427 * Consider a direct foreign table modification
2429 * Decide whether it is safe to modify a foreign table directly, and if so,
2430 * rewrite subplan accordingly.
2433 postgresPlanDirectModify(PlannerInfo
*root
,
2435 Index resultRelation
,
2438 CmdType operation
= plan
->operation
;
2439 RelOptInfo
*foreignrel
;
2441 PgFdwRelationInfo
*fpinfo
;
2445 List
*processed_tlist
= NIL
;
2446 List
*targetAttrs
= NIL
;
2448 List
*params_list
= NIL
;
2449 List
*returningList
= NIL
;
2450 List
*retrieved_attrs
= NIL
;
2453 * Decide whether it is safe to modify a foreign table directly.
2457 * The table modification must be an UPDATE or DELETE.
2459 if (operation
!= CMD_UPDATE
&& operation
!= CMD_DELETE
)
2463 * Try to locate the ForeignScan subplan that's scanning resultRelation.
2465 fscan
= find_modifytable_subplan(root
, plan
, resultRelation
, subplan_index
);
2470 * It's unsafe to modify a foreign table directly if there are any quals
2471 * that should be evaluated locally.
2473 if (fscan
->scan
.plan
.qual
!= NIL
)
2476 /* Safe to fetch data about the target foreign rel */
2477 if (fscan
->scan
.scanrelid
== 0)
2479 foreignrel
= find_join_rel(root
, fscan
->fs_relids
);
2480 /* We should have a rel for this foreign join. */
2484 foreignrel
= root
->simple_rel_array
[resultRelation
];
2485 rte
= root
->simple_rte_array
[resultRelation
];
2486 fpinfo
= (PgFdwRelationInfo
*) foreignrel
->fdw_private
;
2489 * It's unsafe to update a foreign table directly, if any expressions to
2490 * assign to the target columns are unsafe to evaluate remotely.
2492 if (operation
== CMD_UPDATE
)
2498 * The expressions of concern are the first N columns of the processed
2499 * targetlist, where N is the length of the rel's update_colnos.
2501 get_translated_update_targetlist(root
, resultRelation
,
2502 &processed_tlist
, &targetAttrs
);
2503 forboth(lc
, processed_tlist
, lc2
, targetAttrs
)
2505 TargetEntry
*tle
= lfirst_node(TargetEntry
, lc
);
2506 AttrNumber attno
= lfirst_int(lc2
);
2508 /* update's new-value expressions shouldn't be resjunk */
2509 Assert(!tle
->resjunk
);
2511 if (attno
<= InvalidAttrNumber
) /* shouldn't happen */
2512 elog(ERROR
, "system-column update is not supported");
2514 if (!is_foreign_expr(root
, foreignrel
, (Expr
*) tle
->expr
))
2520 * Ok, rewrite subplan so as to modify the foreign table directly.
2522 initStringInfo(&sql
);
2525 * Core code already has some lock on each rel being planned, so we can
2528 rel
= table_open(rte
->relid
, NoLock
);
2531 * Recall the qual clauses that must be evaluated remotely. (These are
2532 * bare clauses not RestrictInfos, but deparse.c's appendConditions()
2535 remote_exprs
= fpinfo
->final_remote_exprs
;
2538 * Extract the relevant RETURNING list if any.
2540 if (plan
->returningLists
)
2542 returningList
= (List
*) list_nth(plan
->returningLists
, subplan_index
);
2545 * When performing an UPDATE/DELETE .. RETURNING on a join directly,
2546 * we fetch from the foreign server any Vars specified in RETURNING
2547 * that refer not only to the target relation but to non-target
2548 * relations. So we'll deparse them into the RETURNING clause of the
2549 * remote query; use a targetlist consisting of them instead, which
2550 * will be adjusted to be new fdw_scan_tlist of the foreign-scan plan
2553 if (fscan
->scan
.scanrelid
== 0)
2554 returningList
= build_remote_returning(resultRelation
, rel
,
2559 * Construct the SQL command string.
2564 deparseDirectUpdateSql(&sql
, root
, resultRelation
, rel
,
2568 remote_exprs
, ¶ms_list
,
2569 returningList
, &retrieved_attrs
);
2572 deparseDirectDeleteSql(&sql
, root
, resultRelation
, rel
,
2574 remote_exprs
, ¶ms_list
,
2575 returningList
, &retrieved_attrs
);
2578 elog(ERROR
, "unexpected operation: %d", (int) operation
);
2583 * Update the operation and target relation info.
2585 fscan
->operation
= operation
;
2586 fscan
->resultRelation
= resultRelation
;
2589 * Update the fdw_exprs list that will be available to the executor.
2591 fscan
->fdw_exprs
= params_list
;
2594 * Update the fdw_private list that will be available to the executor.
2595 * Items in the list must match enum FdwDirectModifyPrivateIndex, above.
2597 fscan
->fdw_private
= list_make4(makeString(sql
.data
),
2598 makeBoolean((retrieved_attrs
!= NIL
)),
2600 makeBoolean(plan
->canSetTag
));
2603 * Update the foreign-join-related fields.
2605 if (fscan
->scan
.scanrelid
== 0)
2607 /* No need for the outer subplan. */
2608 fscan
->scan
.plan
.lefttree
= NULL
;
2610 /* Build new fdw_scan_tlist if UPDATE/DELETE .. RETURNING. */
2612 rebuild_fdw_scan_tlist(fscan
, returningList
);
2616 * Finally, unset the async-capable flag if it is set, as we currently
2617 * don't support asynchronous execution of direct modifications.
2619 if (fscan
->scan
.plan
.async_capable
)
2620 fscan
->scan
.plan
.async_capable
= false;
2622 table_close(rel
, NoLock
);
2627 * postgresBeginDirectModify
2628 * Prepare a direct foreign table modification
2631 postgresBeginDirectModify(ForeignScanState
*node
, int eflags
)
2633 ForeignScan
*fsplan
= (ForeignScan
*) node
->ss
.ps
.plan
;
2634 EState
*estate
= node
->ss
.ps
.state
;
2635 PgFdwDirectModifyState
*dmstate
;
2638 ForeignTable
*table
;
2643 * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL.
2645 if (eflags
& EXEC_FLAG_EXPLAIN_ONLY
)
2649 * We'll save private state in node->fdw_state.
2651 dmstate
= (PgFdwDirectModifyState
*) palloc0(sizeof(PgFdwDirectModifyState
));
2652 node
->fdw_state
= (void *) dmstate
;
2655 * Identify which user to do the remote access as. This should match what
2656 * ExecCheckPermissions() does.
2658 userid
= OidIsValid(fsplan
->checkAsUser
) ? fsplan
->checkAsUser
: GetUserId();
2660 /* Get info about foreign table. */
2661 rtindex
= node
->resultRelInfo
->ri_RangeTableIndex
;
2662 if (fsplan
->scan
.scanrelid
== 0)
2663 dmstate
->rel
= ExecOpenScanRelation(estate
, rtindex
, eflags
);
2665 dmstate
->rel
= node
->ss
.ss_currentRelation
;
2666 table
= GetForeignTable(RelationGetRelid(dmstate
->rel
));
2667 user
= GetUserMapping(userid
, table
->serverid
);
2670 * Get connection to the foreign server. Connection manager will
2671 * establish new connection if necessary.
2673 dmstate
->conn
= GetConnection(user
, false, &dmstate
->conn_state
);
2675 /* Update the foreign-join-related fields. */
2676 if (fsplan
->scan
.scanrelid
== 0)
2678 /* Save info about foreign table. */
2679 dmstate
->resultRel
= dmstate
->rel
;
2682 * Set dmstate->rel to NULL to teach get_returning_data() and
2683 * make_tuple_from_result_row() that columns fetched from the remote
2684 * server are described by fdw_scan_tlist of the foreign-scan plan
2685 * node, not the tuple descriptor for the target relation.
2687 dmstate
->rel
= NULL
;
2690 /* Initialize state variable */
2691 dmstate
->num_tuples
= -1; /* -1 means not set yet */
2693 /* Get private info created by planner functions. */
2694 dmstate
->query
= strVal(list_nth(fsplan
->fdw_private
,
2695 FdwDirectModifyPrivateUpdateSql
));
2696 dmstate
->has_returning
= boolVal(list_nth(fsplan
->fdw_private
,
2697 FdwDirectModifyPrivateHasReturning
));
2698 dmstate
->retrieved_attrs
= (List
*) list_nth(fsplan
->fdw_private
,
2699 FdwDirectModifyPrivateRetrievedAttrs
);
2700 dmstate
->set_processed
= boolVal(list_nth(fsplan
->fdw_private
,
2701 FdwDirectModifyPrivateSetProcessed
));
2703 /* Create context for per-tuple temp workspace. */
2704 dmstate
->temp_cxt
= AllocSetContextCreate(estate
->es_query_cxt
,
2705 "postgres_fdw temporary data",
2706 ALLOCSET_SMALL_SIZES
);
2708 /* Prepare for input conversion of RETURNING results. */
2709 if (dmstate
->has_returning
)
2713 if (fsplan
->scan
.scanrelid
== 0)
2714 tupdesc
= get_tupdesc_for_join_scan_tuples(node
);
2716 tupdesc
= RelationGetDescr(dmstate
->rel
);
2718 dmstate
->attinmeta
= TupleDescGetAttInMetadata(tupdesc
);
2721 * When performing an UPDATE/DELETE .. RETURNING on a join directly,
2722 * initialize a filter to extract an updated/deleted tuple from a scan
2725 if (fsplan
->scan
.scanrelid
== 0)
2726 init_returning_filter(dmstate
, fsplan
->fdw_scan_tlist
, rtindex
);
2730 * Prepare for processing of parameters used in remote query, if any.
2732 numParams
= list_length(fsplan
->fdw_exprs
);
2733 dmstate
->numParams
= numParams
;
2735 prepare_query_params((PlanState
*) node
,
2738 &dmstate
->param_flinfo
,
2739 &dmstate
->param_exprs
,
2740 &dmstate
->param_values
);
2744 * postgresIterateDirectModify
2745 * Execute a direct foreign table modification
2747 static TupleTableSlot
*
2748 postgresIterateDirectModify(ForeignScanState
*node
)
2750 PgFdwDirectModifyState
*dmstate
= (PgFdwDirectModifyState
*) node
->fdw_state
;
2751 EState
*estate
= node
->ss
.ps
.state
;
2752 ResultRelInfo
*resultRelInfo
= node
->resultRelInfo
;
2755 * If this is the first call after Begin, execute the statement.
2757 if (dmstate
->num_tuples
== -1)
2758 execute_dml_stmt(node
);
2761 * If the local query doesn't specify RETURNING, just clear tuple slot.
2763 if (!resultRelInfo
->ri_projectReturning
)
2765 TupleTableSlot
*slot
= node
->ss
.ss_ScanTupleSlot
;
2766 Instrumentation
*instr
= node
->ss
.ps
.instrument
;
2768 Assert(!dmstate
->has_returning
);
2770 /* Increment the command es_processed count if necessary. */
2771 if (dmstate
->set_processed
)
2772 estate
->es_processed
+= dmstate
->num_tuples
;
2774 /* Increment the tuple count for EXPLAIN ANALYZE if necessary. */
2776 instr
->tuplecount
+= dmstate
->num_tuples
;
2778 return ExecClearTuple(slot
);
2782 * Get the next RETURNING tuple.
2784 return get_returning_data(node
);
2788 * postgresEndDirectModify
2789 * Finish a direct foreign table modification
2792 postgresEndDirectModify(ForeignScanState
*node
)
2794 PgFdwDirectModifyState
*dmstate
= (PgFdwDirectModifyState
*) node
->fdw_state
;
2796 /* if dmstate is NULL, we are in EXPLAIN; nothing to do */
2797 if (dmstate
== NULL
)
2800 /* Release PGresult */
2801 PQclear(dmstate
->result
);
2803 /* Release remote connection */
2804 ReleaseConnection(dmstate
->conn
);
2805 dmstate
->conn
= NULL
;
2807 /* MemoryContext will be deleted automatically. */
2811 * postgresExplainForeignScan
2812 * Produce extra output for EXPLAIN of a ForeignScan on a foreign table
2815 postgresExplainForeignScan(ForeignScanState
*node
, ExplainState
*es
)
2817 ForeignScan
*plan
= castNode(ForeignScan
, node
->ss
.ps
.plan
);
2818 List
*fdw_private
= plan
->fdw_private
;
2821 * Identify foreign scans that are really joins or upper relations. The
2822 * input looks something like "(1) LEFT JOIN (2)", and we must replace the
2823 * digit string(s), which are RT indexes, with the correct relation names.
2824 * We do that here, not when the plan is created, because we can't know
2825 * what aliases ruleutils.c will assign at plan creation time.
2827 if (list_length(fdw_private
) > FdwScanPrivateRelations
)
2829 StringInfo relations
;
2835 rawrelations
= strVal(list_nth(fdw_private
, FdwScanPrivateRelations
));
2838 * A difficulty with using a string representation of RT indexes is
2839 * that setrefs.c won't update the string when flattening the
2840 * rangetable. To find out what rtoffset was applied, identify the
2841 * minimum RT index appearing in the string and compare it to the
2842 * minimum member of plan->fs_base_relids. (We expect all the relids
2843 * in the join will have been offset by the same amount; the Asserts
2844 * below should catch it if that ever changes.)
2850 if (isdigit((unsigned char) *ptr
))
2852 int rti
= strtol(ptr
, &ptr
, 10);
2860 rtoffset
= bms_next_member(plan
->fs_base_relids
, -1) - minrti
;
2862 /* Now we can translate the string */
2863 relations
= makeStringInfo();
2867 if (isdigit((unsigned char) *ptr
))
2869 int rti
= strtol(ptr
, &ptr
, 10);
2875 Assert(bms_is_member(rti
, plan
->fs_base_relids
));
2876 rte
= rt_fetch(rti
, es
->rtable
);
2877 Assert(rte
->rtekind
== RTE_RELATION
);
2878 /* This logic should agree with explain.c's ExplainTargetRel */
2879 relname
= get_rel_name(rte
->relid
);
2884 namespace = get_namespace_name_or_temp(get_rel_namespace(rte
->relid
));
2885 appendStringInfo(relations
, "%s.%s",
2886 quote_identifier(namespace),
2887 quote_identifier(relname
));
2890 appendStringInfoString(relations
,
2891 quote_identifier(relname
));
2892 refname
= (char *) list_nth(es
->rtable_names
, rti
- 1);
2893 if (refname
== NULL
)
2894 refname
= rte
->eref
->aliasname
;
2895 if (strcmp(refname
, relname
) != 0)
2896 appendStringInfo(relations
, " %s",
2897 quote_identifier(refname
));
2900 appendStringInfoChar(relations
, *ptr
++);
2902 ExplainPropertyText("Relations", relations
->data
, es
);
2906 * Add remote query, when VERBOSE option is specified.
2912 sql
= strVal(list_nth(fdw_private
, FdwScanPrivateSelectSql
));
2913 ExplainPropertyText("Remote SQL", sql
, es
);
2918 * postgresExplainForeignModify
2919 * Produce extra output for EXPLAIN of a ModifyTable on a foreign table
2922 postgresExplainForeignModify(ModifyTableState
*mtstate
,
2923 ResultRelInfo
*rinfo
,
2930 char *sql
= strVal(list_nth(fdw_private
,
2931 FdwModifyPrivateUpdateSql
));
2933 ExplainPropertyText("Remote SQL", sql
, es
);
2936 * For INSERT we should always have batch size >= 1, but UPDATE and
2937 * DELETE don't support batching so don't show the property.
2939 if (rinfo
->ri_BatchSize
> 0)
2940 ExplainPropertyInteger("Batch Size", NULL
, rinfo
->ri_BatchSize
, es
);
2945 * postgresExplainDirectModify
2946 * Produce extra output for EXPLAIN of a ForeignScan that modifies a
2947 * foreign table directly
2950 postgresExplainDirectModify(ForeignScanState
*node
, ExplainState
*es
)
2957 fdw_private
= ((ForeignScan
*) node
->ss
.ps
.plan
)->fdw_private
;
2958 sql
= strVal(list_nth(fdw_private
, FdwDirectModifyPrivateUpdateSql
));
2959 ExplainPropertyText("Remote SQL", sql
, es
);
2964 * postgresExecForeignTruncate
2965 * Truncate one or more foreign tables
2968 postgresExecForeignTruncate(List
*rels
,
2969 DropBehavior behavior
,
2972 Oid serverid
= InvalidOid
;
2973 UserMapping
*user
= NULL
;
2974 PGconn
*conn
= NULL
;
2977 bool server_truncatable
= true;
2980 * By default, all postgres_fdw foreign tables are assumed truncatable.
2981 * This can be overridden by a per-server setting, which in turn can be
2982 * overridden by a per-table setting.
2986 ForeignServer
*server
= NULL
;
2987 Relation rel
= lfirst(lc
);
2988 ForeignTable
*table
= GetForeignTable(RelationGetRelid(rel
));
2993 * First time through, determine whether the foreign server allows
2994 * truncates. Since all specified foreign tables are assumed to belong
2995 * to the same foreign server, this result can be used for other
2998 if (!OidIsValid(serverid
))
3000 serverid
= table
->serverid
;
3001 server
= GetForeignServer(serverid
);
3003 foreach(cell
, server
->options
)
3005 DefElem
*defel
= (DefElem
*) lfirst(cell
);
3007 if (strcmp(defel
->defname
, "truncatable") == 0)
3009 server_truncatable
= defGetBoolean(defel
);
3016 * Confirm that all specified foreign tables belong to the same
3019 Assert(table
->serverid
== serverid
);
3021 /* Determine whether this foreign table allows truncations */
3022 truncatable
= server_truncatable
;
3023 foreach(cell
, table
->options
)
3025 DefElem
*defel
= (DefElem
*) lfirst(cell
);
3027 if (strcmp(defel
->defname
, "truncatable") == 0)
3029 truncatable
= defGetBoolean(defel
);
3036 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE
),
3037 errmsg("foreign table \"%s\" does not allow truncates",
3038 RelationGetRelationName(rel
))));
3040 Assert(OidIsValid(serverid
));
3043 * Get connection to the foreign server. Connection manager will
3044 * establish new connection if necessary.
3046 user
= GetUserMapping(GetUserId(), serverid
);
3047 conn
= GetConnection(user
, false, NULL
);
3049 /* Construct the TRUNCATE command string */
3050 initStringInfo(&sql
);
3051 deparseTruncateSql(&sql
, rels
, behavior
, restart_seqs
);
3053 /* Issue the TRUNCATE command to remote server */
3054 do_sql_command(conn
, sql
.data
);
3060 * estimate_path_cost_size
3061 * Get cost and size estimates for a foreign scan on given foreign relation
3062 * either a base relation or a join between foreign relations or an upper
3063 * relation containing foreign relations.
3065 * param_join_conds are the parameterization clauses with outer relations.
3066 * pathkeys specify the expected sort order if any for given path being costed.
3067 * fpextra specifies additional post-scan/join-processing steps such as the
3068 * final sort and the LIMIT restriction.
3070 * The function returns the cost and size estimates in p_rows, p_width,
3071 * p_startup_cost and p_total_cost variables.
3074 estimate_path_cost_size(PlannerInfo
*root
,
3075 RelOptInfo
*foreignrel
,
3076 List
*param_join_conds
,
3078 PgFdwPathExtraData
*fpextra
,
3079 double *p_rows
, int *p_width
,
3080 Cost
*p_startup_cost
, Cost
*p_total_cost
)
3082 PgFdwRelationInfo
*fpinfo
= (PgFdwRelationInfo
*) foreignrel
->fdw_private
;
3084 double retrieved_rows
;
3089 /* Make sure the core code has set up the relation's reltarget */
3090 Assert(foreignrel
->reltarget
);
3093 * If the table or the server is configured to use remote estimates,
3094 * connect to the foreign server and execute EXPLAIN to estimate the
3095 * number of rows selected by the restriction+join clauses. Otherwise,
3096 * estimate rows using whatever statistics we have locally, in a way
3097 * similar to ordinary tables.
3099 if (fpinfo
->use_remote_estimate
)
3101 List
*remote_param_join_conds
;
3102 List
*local_param_join_conds
;
3105 Selectivity local_sel
;
3106 QualCost local_cost
;
3107 List
*fdw_scan_tlist
= NIL
;
3110 /* Required only to be passed to deparseSelectStmtForRel */
3111 List
*retrieved_attrs
;
3114 * param_join_conds might contain both clauses that are safe to send
3115 * across, and clauses that aren't.
3117 classifyConditions(root
, foreignrel
, param_join_conds
,
3118 &remote_param_join_conds
, &local_param_join_conds
);
3120 /* Build the list of columns to be fetched from the foreign server. */
3121 if (IS_JOIN_REL(foreignrel
) || IS_UPPER_REL(foreignrel
))
3122 fdw_scan_tlist
= build_tlist_to_deparse(foreignrel
);
3124 fdw_scan_tlist
= NIL
;
3127 * The complete list of remote conditions includes everything from
3128 * baserestrictinfo plus any extra join_conds relevant to this
3131 remote_conds
= list_concat(remote_param_join_conds
,
3132 fpinfo
->remote_conds
);
3135 * Construct EXPLAIN query including the desired SELECT, FROM, and
3136 * WHERE clauses. Params and other-relation Vars are replaced by dummy
3137 * values, so don't request params_list.
3139 initStringInfo(&sql
);
3140 appendStringInfoString(&sql
, "EXPLAIN ");
3141 deparseSelectStmtForRel(&sql
, root
, foreignrel
, fdw_scan_tlist
,
3142 remote_conds
, pathkeys
,
3143 fpextra
? fpextra
->has_final_sort
: false,
3144 fpextra
? fpextra
->has_limit
: false,
3145 false, &retrieved_attrs
, NULL
);
3147 /* Get the remote estimate */
3148 conn
= GetConnection(fpinfo
->user
, false, NULL
);
3149 get_remote_estimate(sql
.data
, conn
, &rows
, &width
,
3150 &startup_cost
, &total_cost
);
3151 ReleaseConnection(conn
);
3153 retrieved_rows
= rows
;
3155 /* Factor in the selectivity of the locally-checked quals */
3156 local_sel
= clauselist_selectivity(root
,
3157 local_param_join_conds
,
3161 local_sel
*= fpinfo
->local_conds_sel
;
3163 rows
= clamp_row_est(rows
* local_sel
);
3165 /* Add in the eval cost of the locally-checked quals */
3166 startup_cost
+= fpinfo
->local_conds_cost
.startup
;
3167 total_cost
+= fpinfo
->local_conds_cost
.per_tuple
* retrieved_rows
;
3168 cost_qual_eval(&local_cost
, local_param_join_conds
, root
);
3169 startup_cost
+= local_cost
.startup
;
3170 total_cost
+= local_cost
.per_tuple
* retrieved_rows
;
3173 * Add in tlist eval cost for each output row. In case of an
3174 * aggregate, some of the tlist expressions such as grouping
3175 * expressions will be evaluated remotely, so adjust the costs.
3177 startup_cost
+= foreignrel
->reltarget
->cost
.startup
;
3178 total_cost
+= foreignrel
->reltarget
->cost
.startup
;
3179 total_cost
+= foreignrel
->reltarget
->cost
.per_tuple
* rows
;
3180 if (IS_UPPER_REL(foreignrel
))
3182 QualCost tlist_cost
;
3184 cost_qual_eval(&tlist_cost
, fdw_scan_tlist
, root
);
3185 startup_cost
-= tlist_cost
.startup
;
3186 total_cost
-= tlist_cost
.startup
;
3187 total_cost
-= tlist_cost
.per_tuple
* rows
;
3195 * We don't support join conditions in this mode (hence, no
3196 * parameterized paths can be made).
3198 Assert(param_join_conds
== NIL
);
3201 * We will come here again and again with different set of pathkeys or
3202 * additional post-scan/join-processing steps that caller wants to
3203 * cost. We don't need to calculate the cost/size estimates for the
3204 * underlying scan, join, or grouping each time. Instead, use those
3205 * estimates if we have cached them already.
3207 if (fpinfo
->rel_startup_cost
>= 0 && fpinfo
->rel_total_cost
>= 0)
3209 Assert(fpinfo
->retrieved_rows
>= 0);
3211 rows
= fpinfo
->rows
;
3212 retrieved_rows
= fpinfo
->retrieved_rows
;
3213 width
= fpinfo
->width
;
3214 startup_cost
= fpinfo
->rel_startup_cost
;
3215 run_cost
= fpinfo
->rel_total_cost
- fpinfo
->rel_startup_cost
;
3218 * If we estimate the costs of a foreign scan or a foreign join
3219 * with additional post-scan/join-processing steps, the scan or
3220 * join costs obtained from the cache wouldn't yet contain the
3221 * eval costs for the final scan/join target, which would've been
3222 * updated by apply_scanjoin_target_to_paths(); add the eval costs
3225 if (fpextra
&& !IS_UPPER_REL(foreignrel
))
3227 /* Shouldn't get here unless we have LIMIT */
3228 Assert(fpextra
->has_limit
);
3229 Assert(foreignrel
->reloptkind
== RELOPT_BASEREL
||
3230 foreignrel
->reloptkind
== RELOPT_JOINREL
);
3231 startup_cost
+= foreignrel
->reltarget
->cost
.startup
;
3232 run_cost
+= foreignrel
->reltarget
->cost
.per_tuple
* rows
;
3235 else if (IS_JOIN_REL(foreignrel
))
3237 PgFdwRelationInfo
*fpinfo_i
;
3238 PgFdwRelationInfo
*fpinfo_o
;
3240 QualCost remote_conds_cost
;
3243 /* Use rows/width estimates made by the core code. */
3244 rows
= foreignrel
->rows
;
3245 width
= foreignrel
->reltarget
->width
;
3247 /* For join we expect inner and outer relations set */
3248 Assert(fpinfo
->innerrel
&& fpinfo
->outerrel
);
3250 fpinfo_i
= (PgFdwRelationInfo
*) fpinfo
->innerrel
->fdw_private
;
3251 fpinfo_o
= (PgFdwRelationInfo
*) fpinfo
->outerrel
->fdw_private
;
3253 /* Estimate of number of rows in cross product */
3254 nrows
= fpinfo_i
->rows
* fpinfo_o
->rows
;
3257 * Back into an estimate of the number of retrieved rows. Just in
3258 * case this is nuts, clamp to at most nrows.
3260 retrieved_rows
= clamp_row_est(rows
/ fpinfo
->local_conds_sel
);
3261 retrieved_rows
= Min(retrieved_rows
, nrows
);
3264 * The cost of foreign join is estimated as cost of generating
3265 * rows for the joining relations + cost for applying quals on the
3270 * Calculate the cost of clauses pushed down to the foreign server
3272 cost_qual_eval(&remote_conds_cost
, fpinfo
->remote_conds
, root
);
3273 /* Calculate the cost of applying join clauses */
3274 cost_qual_eval(&join_cost
, fpinfo
->joinclauses
, root
);
3277 * Startup cost includes startup cost of joining relations and the
3278 * startup cost for join and other clauses. We do not include the
3279 * startup cost specific to join strategy (e.g. setting up hash
3280 * tables) since we do not know what strategy the foreign server
3283 startup_cost
= fpinfo_i
->rel_startup_cost
+ fpinfo_o
->rel_startup_cost
;
3284 startup_cost
+= join_cost
.startup
;
3285 startup_cost
+= remote_conds_cost
.startup
;
3286 startup_cost
+= fpinfo
->local_conds_cost
.startup
;
3289 * Run time cost includes:
3291 * 1. Run time cost (total_cost - startup_cost) of relations being
3294 * 2. Run time cost of applying join clauses on the cross product
3295 * of the joining relations.
3297 * 3. Run time cost of applying pushed down other clauses on the
3300 * 4. Run time cost of applying nonpushable other clauses locally
3301 * on the result fetched from the foreign server.
3303 run_cost
= fpinfo_i
->rel_total_cost
- fpinfo_i
->rel_startup_cost
;
3304 run_cost
+= fpinfo_o
->rel_total_cost
- fpinfo_o
->rel_startup_cost
;
3305 run_cost
+= nrows
* join_cost
.per_tuple
;
3306 nrows
= clamp_row_est(nrows
* fpinfo
->joinclause_sel
);
3307 run_cost
+= nrows
* remote_conds_cost
.per_tuple
;
3308 run_cost
+= fpinfo
->local_conds_cost
.per_tuple
* retrieved_rows
;
3310 /* Add in tlist eval cost for each output row */
3311 startup_cost
+= foreignrel
->reltarget
->cost
.startup
;
3312 run_cost
+= foreignrel
->reltarget
->cost
.per_tuple
* rows
;
3314 else if (IS_UPPER_REL(foreignrel
))
3316 RelOptInfo
*outerrel
= fpinfo
->outerrel
;
3317 PgFdwRelationInfo
*ofpinfo
;
3318 AggClauseCosts aggcosts
;
3321 double numGroups
= 1;
3323 /* The upper relation should have its outer relation set */
3325 /* and that outer relation should have its reltarget set */
3326 Assert(outerrel
->reltarget
);
3329 * This cost model is mixture of costing done for sorted and
3330 * hashed aggregates in cost_agg(). We are not sure which
3331 * strategy will be considered at remote side, thus for
3332 * simplicity, we put all startup related costs in startup_cost
3333 * and all finalization and run cost are added in total_cost.
3336 ofpinfo
= (PgFdwRelationInfo
*) outerrel
->fdw_private
;
3338 /* Get rows from input rel */
3339 input_rows
= ofpinfo
->rows
;
3341 /* Collect statistics about aggregates for estimating costs. */
3342 MemSet(&aggcosts
, 0, sizeof(AggClauseCosts
));
3343 if (root
->parse
->hasAggs
)
3345 get_agg_clause_costs(root
, AGGSPLIT_SIMPLE
, &aggcosts
);
3348 /* Get number of grouping columns and possible number of groups */
3349 numGroupCols
= list_length(root
->processed_groupClause
);
3350 numGroups
= estimate_num_groups(root
,
3351 get_sortgrouplist_exprs(root
->processed_groupClause
,
3352 fpinfo
->grouped_tlist
),
3353 input_rows
, NULL
, NULL
);
3356 * Get the retrieved_rows and rows estimates. If there are HAVING
3357 * quals, account for their selectivity.
3359 if (root
->hasHavingQual
)
3361 /* Factor in the selectivity of the remotely-checked quals */
3363 clamp_row_est(numGroups
*
3364 clauselist_selectivity(root
,
3365 fpinfo
->remote_conds
,
3369 /* Factor in the selectivity of the locally-checked quals */
3370 rows
= clamp_row_est(retrieved_rows
* fpinfo
->local_conds_sel
);
3374 rows
= retrieved_rows
= numGroups
;
3377 /* Use width estimate made by the core code. */
3378 width
= foreignrel
->reltarget
->width
;
3381 * Startup cost includes:
3382 * 1. Startup cost for underneath input relation, adjusted for
3383 * tlist replacement by apply_scanjoin_target_to_paths()
3384 * 2. Cost of performing aggregation, per cost_agg()
3387 startup_cost
= ofpinfo
->rel_startup_cost
;
3388 startup_cost
+= outerrel
->reltarget
->cost
.startup
;
3389 startup_cost
+= aggcosts
.transCost
.startup
;
3390 startup_cost
+= aggcosts
.transCost
.per_tuple
* input_rows
;
3391 startup_cost
+= aggcosts
.finalCost
.startup
;
3392 startup_cost
+= (cpu_operator_cost
* numGroupCols
) * input_rows
;
3395 * Run time cost includes:
3396 * 1. Run time cost of underneath input relation, adjusted for
3397 * tlist replacement by apply_scanjoin_target_to_paths()
3398 * 2. Run time cost of performing aggregation, per cost_agg()
3401 run_cost
= ofpinfo
->rel_total_cost
- ofpinfo
->rel_startup_cost
;
3402 run_cost
+= outerrel
->reltarget
->cost
.per_tuple
* input_rows
;
3403 run_cost
+= aggcosts
.finalCost
.per_tuple
* numGroups
;
3404 run_cost
+= cpu_tuple_cost
* numGroups
;
3406 /* Account for the eval cost of HAVING quals, if any */
3407 if (root
->hasHavingQual
)
3409 QualCost remote_cost
;
3411 /* Add in the eval cost of the remotely-checked quals */
3412 cost_qual_eval(&remote_cost
, fpinfo
->remote_conds
, root
);
3413 startup_cost
+= remote_cost
.startup
;
3414 run_cost
+= remote_cost
.per_tuple
* numGroups
;
3415 /* Add in the eval cost of the locally-checked quals */
3416 startup_cost
+= fpinfo
->local_conds_cost
.startup
;
3417 run_cost
+= fpinfo
->local_conds_cost
.per_tuple
* retrieved_rows
;
3420 /* Add in tlist eval cost for each output row */
3421 startup_cost
+= foreignrel
->reltarget
->cost
.startup
;
3422 run_cost
+= foreignrel
->reltarget
->cost
.per_tuple
* rows
;
3428 /* Use rows/width estimates made by set_baserel_size_estimates. */
3429 rows
= foreignrel
->rows
;
3430 width
= foreignrel
->reltarget
->width
;
3433 * Back into an estimate of the number of retrieved rows. Just in
3434 * case this is nuts, clamp to at most foreignrel->tuples.
3436 retrieved_rows
= clamp_row_est(rows
/ fpinfo
->local_conds_sel
);
3437 retrieved_rows
= Min(retrieved_rows
, foreignrel
->tuples
);
3440 * Cost as though this were a seqscan, which is pessimistic. We
3441 * effectively imagine the local_conds are being evaluated
3446 run_cost
+= seq_page_cost
* foreignrel
->pages
;
3448 startup_cost
+= foreignrel
->baserestrictcost
.startup
;
3449 cpu_per_tuple
= cpu_tuple_cost
+ foreignrel
->baserestrictcost
.per_tuple
;
3450 run_cost
+= cpu_per_tuple
* foreignrel
->tuples
;
3452 /* Add in tlist eval cost for each output row */
3453 startup_cost
+= foreignrel
->reltarget
->cost
.startup
;
3454 run_cost
+= foreignrel
->reltarget
->cost
.per_tuple
* rows
;
3458 * Without remote estimates, we have no real way to estimate the cost
3459 * of generating sorted output. It could be free if the query plan
3460 * the remote side would have chosen generates properly-sorted output
3461 * anyway, but in most cases it will cost something. Estimate a value
3462 * high enough that we won't pick the sorted path when the ordering
3463 * isn't locally useful, but low enough that we'll err on the side of
3464 * pushing down the ORDER BY clause when it's useful to do so.
3466 if (pathkeys
!= NIL
)
3468 if (IS_UPPER_REL(foreignrel
))
3470 Assert(foreignrel
->reloptkind
== RELOPT_UPPER_REL
&&
3471 fpinfo
->stage
== UPPERREL_GROUP_AGG
);
3472 adjust_foreign_grouping_path_cost(root
, pathkeys
,
3473 retrieved_rows
, width
,
3474 fpextra
->limit_tuples
,
3475 &startup_cost
, &run_cost
);
3479 startup_cost
*= DEFAULT_FDW_SORT_MULTIPLIER
;
3480 run_cost
*= DEFAULT_FDW_SORT_MULTIPLIER
;
3484 total_cost
= startup_cost
+ run_cost
;
3486 /* Adjust the cost estimates if we have LIMIT */
3487 if (fpextra
&& fpextra
->has_limit
)
3489 adjust_limit_rows_costs(&rows
, &startup_cost
, &total_cost
,
3490 fpextra
->offset_est
, fpextra
->count_est
);
3491 retrieved_rows
= rows
;
3496 * If this includes the final sort step, the given target, which will be
3497 * applied to the resulting path, might have different expressions from
3498 * the foreignrel's reltarget (see make_sort_input_target()); adjust tlist
3501 if (fpextra
&& fpextra
->has_final_sort
&&
3502 fpextra
->target
!= foreignrel
->reltarget
)
3504 QualCost oldcost
= foreignrel
->reltarget
->cost
;
3505 QualCost newcost
= fpextra
->target
->cost
;
3507 startup_cost
+= newcost
.startup
- oldcost
.startup
;
3508 total_cost
+= newcost
.startup
- oldcost
.startup
;
3509 total_cost
+= (newcost
.per_tuple
- oldcost
.per_tuple
) * rows
;
3513 * Cache the retrieved rows and cost estimates for scans, joins, or
3514 * groupings without any parameterization, pathkeys, or additional
3515 * post-scan/join-processing steps, before adding the costs for
3516 * transferring data from the foreign server. These estimates are useful
3517 * for costing remote joins involving this relation or costing other
3518 * remote operations on this relation such as remote sorts and remote
3519 * LIMIT restrictions, when the costs can not be obtained from the foreign
3520 * server. This function will be called at least once for every foreign
3521 * relation without any parameterization, pathkeys, or additional
3522 * post-scan/join-processing steps.
3524 if (pathkeys
== NIL
&& param_join_conds
== NIL
&& fpextra
== NULL
)
3526 fpinfo
->retrieved_rows
= retrieved_rows
;
3527 fpinfo
->rel_startup_cost
= startup_cost
;
3528 fpinfo
->rel_total_cost
= total_cost
;
3532 * Add some additional cost factors to account for connection overhead
3533 * (fdw_startup_cost), transferring data across the network
3534 * (fdw_tuple_cost per retrieved row), and local manipulation of the data
3535 * (cpu_tuple_cost per retrieved row).
3537 startup_cost
+= fpinfo
->fdw_startup_cost
;
3538 total_cost
+= fpinfo
->fdw_startup_cost
;
3539 total_cost
+= fpinfo
->fdw_tuple_cost
* retrieved_rows
;
3540 total_cost
+= cpu_tuple_cost
* retrieved_rows
;
3543 * If we have LIMIT, we should prefer performing the restriction remotely
3544 * rather than locally, as the former avoids extra row fetches from the
3545 * remote that the latter might cause. But since the core code doesn't
3546 * account for such fetches when estimating the costs of the local
3547 * restriction (see create_limit_path()), there would be no difference
3548 * between the costs of the local restriction and the costs of the remote
3549 * restriction estimated above if we don't use remote estimates (except
3550 * for the case where the foreignrel is a grouping relation, the given
3551 * pathkeys is not NIL, and the effects of a bounded sort for that rel is
3552 * accounted for in costing the remote restriction). Tweak the costs of
3553 * the remote restriction to ensure we'll prefer it if LIMIT is a useful
3556 if (!fpinfo
->use_remote_estimate
&&
3557 fpextra
&& fpextra
->has_limit
&&
3558 fpextra
->limit_tuples
> 0 &&
3559 fpextra
->limit_tuples
< fpinfo
->rows
)
3561 Assert(fpinfo
->rows
> 0);
3562 total_cost
-= (total_cost
- startup_cost
) * 0.05 *
3563 (fpinfo
->rows
- fpextra
->limit_tuples
) / fpinfo
->rows
;
3566 /* Return results. */
3569 *p_startup_cost
= startup_cost
;
3570 *p_total_cost
= total_cost
;
3574 * Estimate costs of executing a SQL statement remotely.
3575 * The given "sql" must be an EXPLAIN command.
3578 get_remote_estimate(const char *sql
, PGconn
*conn
,
3579 double *rows
, int *width
,
3580 Cost
*startup_cost
, Cost
*total_cost
)
3582 PGresult
*volatile res
= NULL
;
3584 /* PGresult must be released before leaving this function. */
3592 * Execute EXPLAIN remotely.
3594 res
= pgfdw_exec_query(conn
, sql
, NULL
);
3595 if (PQresultStatus(res
) != PGRES_TUPLES_OK
)
3596 pgfdw_report_error(ERROR
, res
, conn
, false, sql
);
3599 * Extract cost numbers for topmost plan node. Note we search for a
3600 * left paren from the end of the line to avoid being confused by
3601 * other uses of parentheses.
3603 line
= PQgetvalue(res
, 0, 0);
3604 p
= strrchr(line
, '(');
3606 elog(ERROR
, "could not interpret EXPLAIN output: \"%s\"", line
);
3607 n
= sscanf(p
, "(cost=%lf..%lf rows=%lf width=%d)",
3608 startup_cost
, total_cost
, rows
, width
);
3610 elog(ERROR
, "could not interpret EXPLAIN output: \"%s\"", line
);
3620 * Adjust the cost estimates of a foreign grouping path to include the cost of
3621 * generating properly-sorted output.
3624 adjust_foreign_grouping_path_cost(PlannerInfo
*root
,
3626 double retrieved_rows
,
3628 double limit_tuples
,
3629 Cost
*p_startup_cost
,
3633 * If the GROUP BY clause isn't sort-able, the plan chosen by the remote
3634 * side is unlikely to generate properly-sorted output, so it would need
3635 * an explicit sort; adjust the given costs with cost_sort(). Likewise,
3636 * if the GROUP BY clause is sort-able but isn't a superset of the given
3637 * pathkeys, adjust the costs with that function. Otherwise, adjust the
3638 * costs by applying the same heuristic as for the scan or join case.
3640 if (!grouping_is_sortable(root
->processed_groupClause
) ||
3641 !pathkeys_contained_in(pathkeys
, root
->group_pathkeys
))
3643 Path sort_path
; /* dummy for result of cost_sort */
3645 cost_sort(&sort_path
,
3648 *p_startup_cost
+ *p_run_cost
,
3655 *p_startup_cost
= sort_path
.startup_cost
;
3656 *p_run_cost
= sort_path
.total_cost
- sort_path
.startup_cost
;
3661 * The default extra cost seems too large for foreign-grouping cases;
3662 * add 1/4th of that default.
3664 double sort_multiplier
= 1.0 + (DEFAULT_FDW_SORT_MULTIPLIER
3667 *p_startup_cost
*= sort_multiplier
;
3668 *p_run_cost
*= sort_multiplier
;
3673 * Detect whether we want to process an EquivalenceClass member.
3675 * This is a callback for use by generate_implied_equalities_for_column.
3678 ec_member_matches_foreign(PlannerInfo
*root
, RelOptInfo
*rel
,
3679 EquivalenceClass
*ec
, EquivalenceMember
*em
,
3682 ec_member_foreign_arg
*state
= (ec_member_foreign_arg
*) arg
;
3683 Expr
*expr
= em
->em_expr
;
3686 * If we've identified what we're processing in the current scan, we only
3687 * want to match that expression.
3689 if (state
->current
!= NULL
)
3690 return equal(expr
, state
->current
);
3693 * Otherwise, ignore anything we've already processed.
3695 if (list_member(state
->already_used
, expr
))
3698 /* This is the new target to process. */
3699 state
->current
= expr
;
3704 * Create cursor for node's query with current parameter values.
3707 create_cursor(ForeignScanState
*node
)
3709 PgFdwScanState
*fsstate
= (PgFdwScanState
*) node
->fdw_state
;
3710 ExprContext
*econtext
= node
->ss
.ps
.ps_ExprContext
;
3711 int numParams
= fsstate
->numParams
;
3712 const char **values
= fsstate
->param_values
;
3713 PGconn
*conn
= fsstate
->conn
;
3717 /* First, process a pending asynchronous request, if any. */
3718 if (fsstate
->conn_state
->pendingAreq
)
3719 process_pending_request(fsstate
->conn_state
->pendingAreq
);
3722 * Construct array of query parameter values in text format. We do the
3723 * conversions in the short-lived per-tuple context, so as not to cause a
3724 * memory leak over repeated scans.
3728 MemoryContext oldcontext
;
3730 oldcontext
= MemoryContextSwitchTo(econtext
->ecxt_per_tuple_memory
);
3732 process_query_params(econtext
,
3733 fsstate
->param_flinfo
,
3734 fsstate
->param_exprs
,
3737 MemoryContextSwitchTo(oldcontext
);
3740 /* Construct the DECLARE CURSOR command */
3741 initStringInfo(&buf
);
3742 appendStringInfo(&buf
, "DECLARE c%u CURSOR FOR\n%s",
3743 fsstate
->cursor_number
, fsstate
->query
);
3746 * Notice that we pass NULL for paramTypes, thus forcing the remote server
3747 * to infer types for all parameters. Since we explicitly cast every
3748 * parameter (see deparse.c), the "inference" is trivial and will produce
3749 * the desired result. This allows us to avoid assuming that the remote
3750 * server has the same OIDs we do for the parameters' types.
3752 if (!PQsendQueryParams(conn
, buf
.data
, numParams
,
3753 NULL
, values
, NULL
, NULL
, 0))
3754 pgfdw_report_error(ERROR
, NULL
, conn
, false, buf
.data
);
3757 * Get the result, and check for success.
3759 * We don't use a PG_TRY block here, so be careful not to throw error
3760 * without releasing the PGresult.
3762 res
= pgfdw_get_result(conn
, buf
.data
);
3763 if (PQresultStatus(res
) != PGRES_COMMAND_OK
)
3764 pgfdw_report_error(ERROR
, res
, conn
, true, fsstate
->query
);
3767 /* Mark the cursor as created, and show no tuples have been retrieved */
3768 fsstate
->cursor_exists
= true;
3769 fsstate
->tuples
= NULL
;
3770 fsstate
->num_tuples
= 0;
3771 fsstate
->next_tuple
= 0;
3772 fsstate
->fetch_ct_2
= 0;
3773 fsstate
->eof_reached
= false;
3780 * Fetch some more rows from the node's cursor.
3783 fetch_more_data(ForeignScanState
*node
)
3785 PgFdwScanState
*fsstate
= (PgFdwScanState
*) node
->fdw_state
;
3786 PGresult
*volatile res
= NULL
;
3787 MemoryContext oldcontext
;
3790 * We'll store the tuples in the batch_cxt. First, flush the previous
3793 fsstate
->tuples
= NULL
;
3794 MemoryContextReset(fsstate
->batch_cxt
);
3795 oldcontext
= MemoryContextSwitchTo(fsstate
->batch_cxt
);
3797 /* PGresult must be released before leaving this function. */
3800 PGconn
*conn
= fsstate
->conn
;
3804 if (fsstate
->async_capable
)
3806 Assert(fsstate
->conn_state
->pendingAreq
);
3809 * The query was already sent by an earlier call to
3810 * fetch_more_data_begin. So now we just fetch the result.
3812 res
= pgfdw_get_result(conn
, fsstate
->query
);
3813 /* On error, report the original query, not the FETCH. */
3814 if (PQresultStatus(res
) != PGRES_TUPLES_OK
)
3815 pgfdw_report_error(ERROR
, res
, conn
, false, fsstate
->query
);
3817 /* Reset per-connection state */
3818 fsstate
->conn_state
->pendingAreq
= NULL
;
3824 /* This is a regular synchronous fetch. */
3825 snprintf(sql
, sizeof(sql
), "FETCH %d FROM c%u",
3826 fsstate
->fetch_size
, fsstate
->cursor_number
);
3828 res
= pgfdw_exec_query(conn
, sql
, fsstate
->conn_state
);
3829 /* On error, report the original query, not the FETCH. */
3830 if (PQresultStatus(res
) != PGRES_TUPLES_OK
)
3831 pgfdw_report_error(ERROR
, res
, conn
, false, fsstate
->query
);
3834 /* Convert the data into HeapTuples */
3835 numrows
= PQntuples(res
);
3836 fsstate
->tuples
= (HeapTuple
*) palloc0(numrows
* sizeof(HeapTuple
));
3837 fsstate
->num_tuples
= numrows
;
3838 fsstate
->next_tuple
= 0;
3840 for (i
= 0; i
< numrows
; i
++)
3842 Assert(IsA(node
->ss
.ps
.plan
, ForeignScan
));
3844 fsstate
->tuples
[i
] =
3845 make_tuple_from_result_row(res
, i
,
3848 fsstate
->retrieved_attrs
,
3853 /* Update fetch_ct_2 */
3854 if (fsstate
->fetch_ct_2
< 2)
3855 fsstate
->fetch_ct_2
++;
3857 /* Must be EOF if we didn't get as many tuples as we asked for. */
3858 fsstate
->eof_reached
= (numrows
< fsstate
->fetch_size
);
3866 MemoryContextSwitchTo(oldcontext
);
3870 * Force assorted GUC parameters to settings that ensure that we'll output
3871 * data values in a form that is unambiguous to the remote server.
3873 * This is rather expensive and annoying to do once per row, but there's
3874 * little choice if we want to be sure values are transmitted accurately;
3875 * we can't leave the settings in place between rows for fear of affecting
3876 * user-visible computations.
3878 * We use the equivalent of a function SET option to allow the settings to
3879 * persist only until the caller calls reset_transmission_modes(). If an
3880 * error is thrown in between, guc.c will take care of undoing the settings.
3882 * The return value is the nestlevel that must be passed to
3883 * reset_transmission_modes() to undo things.
3886 set_transmission_modes(void)
3888 int nestlevel
= NewGUCNestLevel();
3891 * The values set here should match what pg_dump does. See also
3892 * configure_remote_session in connection.c.
3894 if (DateStyle
!= USE_ISO_DATES
)
3895 (void) set_config_option("datestyle", "ISO",
3896 PGC_USERSET
, PGC_S_SESSION
,
3897 GUC_ACTION_SAVE
, true, 0, false);
3898 if (IntervalStyle
!= INTSTYLE_POSTGRES
)
3899 (void) set_config_option("intervalstyle", "postgres",
3900 PGC_USERSET
, PGC_S_SESSION
,
3901 GUC_ACTION_SAVE
, true, 0, false);
3902 if (extra_float_digits
< 3)
3903 (void) set_config_option("extra_float_digits", "3",
3904 PGC_USERSET
, PGC_S_SESSION
,
3905 GUC_ACTION_SAVE
, true, 0, false);
3908 * In addition force restrictive search_path, in case there are any
3909 * regproc or similar constants to be printed.
3911 (void) set_config_option("search_path", "pg_catalog",
3912 PGC_USERSET
, PGC_S_SESSION
,
3913 GUC_ACTION_SAVE
, true, 0, false);
3919 * Undo the effects of set_transmission_modes().
3922 reset_transmission_modes(int nestlevel
)
3924 AtEOXact_GUC(true, nestlevel
);
3928 * Utility routine to close a cursor.
3931 close_cursor(PGconn
*conn
, unsigned int cursor_number
,
3932 PgFdwConnState
*conn_state
)
3937 snprintf(sql
, sizeof(sql
), "CLOSE c%u", cursor_number
);
3940 * We don't use a PG_TRY block here, so be careful not to throw error
3941 * without releasing the PGresult.
3943 res
= pgfdw_exec_query(conn
, sql
, conn_state
);
3944 if (PQresultStatus(res
) != PGRES_COMMAND_OK
)
3945 pgfdw_report_error(ERROR
, res
, conn
, true, sql
);
3950 * create_foreign_modify
3951 * Construct an execution state of a foreign insert/update/delete
3954 static PgFdwModifyState
*
3955 create_foreign_modify(EState
*estate
,
3957 ResultRelInfo
*resultRelInfo
,
3964 List
*retrieved_attrs
)
3966 PgFdwModifyState
*fmstate
;
3967 Relation rel
= resultRelInfo
->ri_RelationDesc
;
3968 TupleDesc tupdesc
= RelationGetDescr(rel
);
3970 ForeignTable
*table
;
3972 AttrNumber n_params
;
3977 /* Begin constructing PgFdwModifyState. */
3978 fmstate
= (PgFdwModifyState
*) palloc0(sizeof(PgFdwModifyState
));
3981 /* Identify which user to do the remote access as. */
3982 userid
= ExecGetResultRelCheckAsUser(resultRelInfo
, estate
);
3984 /* Get info about foreign table. */
3985 table
= GetForeignTable(RelationGetRelid(rel
));
3986 user
= GetUserMapping(userid
, table
->serverid
);
3988 /* Open connection; report that we'll create a prepared statement. */
3989 fmstate
->conn
= GetConnection(user
, true, &fmstate
->conn_state
);
3990 fmstate
->p_name
= NULL
; /* prepared statement not made yet */
3992 /* Set up remote query information. */
3993 fmstate
->query
= query
;
3994 if (operation
== CMD_INSERT
)
3996 fmstate
->query
= pstrdup(fmstate
->query
);
3997 fmstate
->orig_query
= pstrdup(fmstate
->query
);
3999 fmstate
->target_attrs
= target_attrs
;
4000 fmstate
->values_end
= values_end
;
4001 fmstate
->has_returning
= has_returning
;
4002 fmstate
->retrieved_attrs
= retrieved_attrs
;
4004 /* Create context for per-tuple temp workspace. */
4005 fmstate
->temp_cxt
= AllocSetContextCreate(estate
->es_query_cxt
,
4006 "postgres_fdw temporary data",
4007 ALLOCSET_SMALL_SIZES
);
4009 /* Prepare for input conversion of RETURNING results. */
4010 if (fmstate
->has_returning
)
4011 fmstate
->attinmeta
= TupleDescGetAttInMetadata(tupdesc
);
4013 /* Prepare for output conversion of parameters used in prepared stmt. */
4014 n_params
= list_length(fmstate
->target_attrs
) + 1;
4015 fmstate
->p_flinfo
= (FmgrInfo
*) palloc0(sizeof(FmgrInfo
) * n_params
);
4016 fmstate
->p_nums
= 0;
4018 if (operation
== CMD_UPDATE
|| operation
== CMD_DELETE
)
4020 Assert(subplan
!= NULL
);
4022 /* Find the ctid resjunk column in the subplan's result */
4023 fmstate
->ctidAttno
= ExecFindJunkAttributeInTlist(subplan
->targetlist
,
4025 if (!AttributeNumberIsValid(fmstate
->ctidAttno
))
4026 elog(ERROR
, "could not find junk ctid column");
4028 /* First transmittable parameter will be ctid */
4029 getTypeOutputInfo(TIDOID
, &typefnoid
, &isvarlena
);
4030 fmgr_info(typefnoid
, &fmstate
->p_flinfo
[fmstate
->p_nums
]);
4034 if (operation
== CMD_INSERT
|| operation
== CMD_UPDATE
)
4036 /* Set up for remaining transmittable parameters */
4037 foreach(lc
, fmstate
->target_attrs
)
4039 int attnum
= lfirst_int(lc
);
4040 Form_pg_attribute attr
= TupleDescAttr(tupdesc
, attnum
- 1);
4042 Assert(!attr
->attisdropped
);
4044 /* Ignore generated columns; they are set to DEFAULT */
4045 if (attr
->attgenerated
)
4047 getTypeOutputInfo(attr
->atttypid
, &typefnoid
, &isvarlena
);
4048 fmgr_info(typefnoid
, &fmstate
->p_flinfo
[fmstate
->p_nums
]);
4053 Assert(fmstate
->p_nums
<= n_params
);
4055 /* Set batch_size from foreign server/table options. */
4056 if (operation
== CMD_INSERT
)
4057 fmstate
->batch_size
= get_batch_size_option(rel
);
4059 fmstate
->num_slots
= 1;
4061 /* Initialize auxiliary state */
4062 fmstate
->aux_fmstate
= NULL
;
4068 * execute_foreign_modify
4069 * Perform foreign-table modification as required, and fetch RETURNING
4070 * result if any. (This is the shared guts of postgresExecForeignInsert,
4071 * postgresExecForeignBatchInsert, postgresExecForeignUpdate, and
4072 * postgresExecForeignDelete.)
4074 static TupleTableSlot
**
4075 execute_foreign_modify(EState
*estate
,
4076 ResultRelInfo
*resultRelInfo
,
4078 TupleTableSlot
**slots
,
4079 TupleTableSlot
**planSlots
,
4082 PgFdwModifyState
*fmstate
= (PgFdwModifyState
*) resultRelInfo
->ri_FdwState
;
4083 ItemPointer ctid
= NULL
;
4084 const char **p_values
;
4089 /* The operation should be INSERT, UPDATE, or DELETE */
4090 Assert(operation
== CMD_INSERT
||
4091 operation
== CMD_UPDATE
||
4092 operation
== CMD_DELETE
);
4094 /* First, process a pending asynchronous request, if any. */
4095 if (fmstate
->conn_state
->pendingAreq
)
4096 process_pending_request(fmstate
->conn_state
->pendingAreq
);
4099 * If the existing query was deparsed and prepared for a different number
4100 * of rows, rebuild it for the proper number.
4102 if (operation
== CMD_INSERT
&& fmstate
->num_slots
!= *numSlots
)
4104 /* Destroy the prepared statement created previously */
4105 if (fmstate
->p_name
)
4106 deallocate_query(fmstate
);
4108 /* Build INSERT string with numSlots records in its VALUES clause. */
4109 initStringInfo(&sql
);
4110 rebuildInsertSql(&sql
, fmstate
->rel
,
4111 fmstate
->orig_query
, fmstate
->target_attrs
,
4112 fmstate
->values_end
, fmstate
->p_nums
,
4114 pfree(fmstate
->query
);
4115 fmstate
->query
= sql
.data
;
4116 fmstate
->num_slots
= *numSlots
;
4119 /* Set up the prepared statement on the remote server, if we didn't yet */
4120 if (!fmstate
->p_name
)
4121 prepare_foreign_modify(fmstate
);
4124 * For UPDATE/DELETE, get the ctid that was passed up as a resjunk column
4126 if (operation
== CMD_UPDATE
|| operation
== CMD_DELETE
)
4131 datum
= ExecGetJunkAttribute(planSlots
[0],
4134 /* shouldn't ever get a null result... */
4136 elog(ERROR
, "ctid is NULL");
4137 ctid
= (ItemPointer
) DatumGetPointer(datum
);
4140 /* Convert parameters needed by prepared statement to text form */
4141 p_values
= convert_prep_stmt_params(fmstate
, ctid
, slots
, *numSlots
);
4144 * Execute the prepared statement.
4146 if (!PQsendQueryPrepared(fmstate
->conn
,
4148 fmstate
->p_nums
* (*numSlots
),
4153 pgfdw_report_error(ERROR
, NULL
, fmstate
->conn
, false, fmstate
->query
);
4156 * Get the result, and check for success.
4158 * We don't use a PG_TRY block here, so be careful not to throw error
4159 * without releasing the PGresult.
4161 res
= pgfdw_get_result(fmstate
->conn
, fmstate
->query
);
4162 if (PQresultStatus(res
) !=
4163 (fmstate
->has_returning
? PGRES_TUPLES_OK
: PGRES_COMMAND_OK
))
4164 pgfdw_report_error(ERROR
, res
, fmstate
->conn
, true, fmstate
->query
);
4166 /* Check number of rows affected, and fetch RETURNING tuple if any */
4167 if (fmstate
->has_returning
)
4169 Assert(*numSlots
== 1);
4170 n_rows
= PQntuples(res
);
4172 store_returning_result(fmstate
, slots
[0], res
);
4175 n_rows
= atoi(PQcmdTuples(res
));
4180 MemoryContextReset(fmstate
->temp_cxt
);
4185 * Return NULL if nothing was inserted/updated/deleted on the remote end
4187 return (n_rows
> 0) ? slots
: NULL
;
4191 * prepare_foreign_modify
4192 * Establish a prepared statement for execution of INSERT/UPDATE/DELETE
4195 prepare_foreign_modify(PgFdwModifyState
*fmstate
)
4197 char prep_name
[NAMEDATALEN
];
4202 * The caller would already have processed a pending asynchronous request
4203 * if any, so no need to do it here.
4206 /* Construct name we'll use for the prepared statement. */
4207 snprintf(prep_name
, sizeof(prep_name
), "pgsql_fdw_prep_%u",
4208 GetPrepStmtNumber(fmstate
->conn
));
4209 p_name
= pstrdup(prep_name
);
4212 * We intentionally do not specify parameter types here, but leave the
4213 * remote server to derive them by default. This avoids possible problems
4214 * with the remote server using different type OIDs than we do. All of
4215 * the prepared statements we use in this module are simple enough that
4216 * the remote server will make the right choices.
4218 if (!PQsendPrepare(fmstate
->conn
,
4223 pgfdw_report_error(ERROR
, NULL
, fmstate
->conn
, false, fmstate
->query
);
4226 * Get the result, and check for success.
4228 * We don't use a PG_TRY block here, so be careful not to throw error
4229 * without releasing the PGresult.
4231 res
= pgfdw_get_result(fmstate
->conn
, fmstate
->query
);
4232 if (PQresultStatus(res
) != PGRES_COMMAND_OK
)
4233 pgfdw_report_error(ERROR
, res
, fmstate
->conn
, true, fmstate
->query
);
4236 /* This action shows that the prepare has been done. */
4237 fmstate
->p_name
= p_name
;
4241 * convert_prep_stmt_params
4242 * Create array of text strings representing parameter values
4244 * tupleid is ctid to send, or NULL if none
4245 * slot is slot to get remaining parameters from, or NULL if none
4247 * Data is constructed in temp_cxt; caller should reset that after use.
4249 static const char **
4250 convert_prep_stmt_params(PgFdwModifyState
*fmstate
,
4251 ItemPointer tupleid
,
4252 TupleTableSlot
**slots
,
4255 const char **p_values
;
4259 MemoryContext oldcontext
;
4261 oldcontext
= MemoryContextSwitchTo(fmstate
->temp_cxt
);
4263 p_values
= (const char **) palloc(sizeof(char *) * fmstate
->p_nums
* numSlots
);
4265 /* ctid is provided only for UPDATE/DELETE, which don't allow batching */
4266 Assert(!(tupleid
!= NULL
&& numSlots
> 1));
4268 /* 1st parameter should be ctid, if it's in use */
4269 if (tupleid
!= NULL
)
4271 Assert(numSlots
== 1);
4272 /* don't need set_transmission_modes for TID output */
4273 p_values
[pindex
] = OutputFunctionCall(&fmstate
->p_flinfo
[pindex
],
4274 PointerGetDatum(tupleid
));
4278 /* get following parameters from slots */
4279 if (slots
!= NULL
&& fmstate
->target_attrs
!= NIL
)
4281 TupleDesc tupdesc
= RelationGetDescr(fmstate
->rel
);
4285 nestlevel
= set_transmission_modes();
4287 for (i
= 0; i
< numSlots
; i
++)
4289 j
= (tupleid
!= NULL
) ? 1 : 0;
4290 foreach(lc
, fmstate
->target_attrs
)
4292 int attnum
= lfirst_int(lc
);
4293 Form_pg_attribute attr
= TupleDescAttr(tupdesc
, attnum
- 1);
4297 /* Ignore generated columns; they are set to DEFAULT */
4298 if (attr
->attgenerated
)
4300 value
= slot_getattr(slots
[i
], attnum
, &isnull
);
4302 p_values
[pindex
] = NULL
;
4304 p_values
[pindex
] = OutputFunctionCall(&fmstate
->p_flinfo
[j
],
4311 reset_transmission_modes(nestlevel
);
4314 Assert(pindex
== fmstate
->p_nums
* numSlots
);
4316 MemoryContextSwitchTo(oldcontext
);
4322 * store_returning_result
4323 * Store the result of a RETURNING clause
4325 * On error, be sure to release the PGresult on the way out. Callers do not
4326 * have PG_TRY blocks to ensure this happens.
4329 store_returning_result(PgFdwModifyState
*fmstate
,
4330 TupleTableSlot
*slot
, PGresult
*res
)
4336 newtup
= make_tuple_from_result_row(res
, 0,
4339 fmstate
->retrieved_attrs
,
4344 * The returning slot will not necessarily be suitable to store
4345 * heaptuples directly, so allow for conversion.
4347 ExecForceStoreHeapTuple(newtup
, slot
, true);
4358 * finish_foreign_modify
4359 * Release resources for a foreign insert/update/delete operation
4362 finish_foreign_modify(PgFdwModifyState
*fmstate
)
4364 Assert(fmstate
!= NULL
);
4366 /* If we created a prepared statement, destroy it */
4367 deallocate_query(fmstate
);
4369 /* Release remote connection */
4370 ReleaseConnection(fmstate
->conn
);
4371 fmstate
->conn
= NULL
;
4376 * Deallocate a prepared statement for a foreign insert/update/delete
4380 deallocate_query(PgFdwModifyState
*fmstate
)
4385 /* do nothing if the query is not allocated */
4386 if (!fmstate
->p_name
)
4389 snprintf(sql
, sizeof(sql
), "DEALLOCATE %s", fmstate
->p_name
);
4392 * We don't use a PG_TRY block here, so be careful not to throw error
4393 * without releasing the PGresult.
4395 res
= pgfdw_exec_query(fmstate
->conn
, sql
, fmstate
->conn_state
);
4396 if (PQresultStatus(res
) != PGRES_COMMAND_OK
)
4397 pgfdw_report_error(ERROR
, res
, fmstate
->conn
, true, sql
);
4399 pfree(fmstate
->p_name
);
4400 fmstate
->p_name
= NULL
;
4404 * build_remote_returning
4405 * Build a RETURNING targetlist of a remote query for performing an
4406 * UPDATE/DELETE .. RETURNING on a join directly
4409 build_remote_returning(Index rtindex
, Relation rel
, List
*returningList
)
4411 bool have_wholerow
= false;
4416 Assert(returningList
);
4418 vars
= pull_var_clause((Node
*) returningList
, PVC_INCLUDE_PLACEHOLDERS
);
4421 * If there's a whole-row reference to the target relation, then we'll
4422 * need all the columns of the relation.
4426 Var
*var
= (Var
*) lfirst(lc
);
4428 if (IsA(var
, Var
) &&
4429 var
->varno
== rtindex
&&
4430 var
->varattno
== InvalidAttrNumber
)
4432 have_wholerow
= true;
4439 TupleDesc tupdesc
= RelationGetDescr(rel
);
4442 for (i
= 1; i
<= tupdesc
->natts
; i
++)
4444 Form_pg_attribute attr
= TupleDescAttr(tupdesc
, i
- 1);
4447 /* Ignore dropped attributes. */
4448 if (attr
->attisdropped
)
4451 var
= makeVar(rtindex
,
4458 tlist
= lappend(tlist
,
4459 makeTargetEntry((Expr
*) var
,
4460 list_length(tlist
) + 1,
4466 /* Now add any remaining columns to tlist. */
4469 Var
*var
= (Var
*) lfirst(lc
);
4472 * No need for whole-row references to the target relation. We don't
4473 * need system columns other than ctid and oid either, since those are
4476 if (IsA(var
, Var
) &&
4477 var
->varno
== rtindex
&&
4478 var
->varattno
<= InvalidAttrNumber
&&
4479 var
->varattno
!= SelfItemPointerAttributeNumber
)
4480 continue; /* don't need it */
4482 if (tlist_member((Expr
*) var
, tlist
))
4483 continue; /* already got it */
4485 tlist
= lappend(tlist
,
4486 makeTargetEntry((Expr
*) var
,
4487 list_length(tlist
) + 1,
4498 * rebuild_fdw_scan_tlist
4499 * Build new fdw_scan_tlist of given foreign-scan plan node from given
4502 * There might be columns that the fdw_scan_tlist of the given foreign-scan
4503 * plan node contains that the given tlist doesn't. The fdw_scan_tlist would
4504 * have contained resjunk columns such as 'ctid' of the target relation and
4505 * 'wholerow' of non-target relations, but the tlist might not contain them,
4506 * for example. So, adjust the tlist so it contains all the columns specified
4507 * in the fdw_scan_tlist; else setrefs.c will get confused.
4510 rebuild_fdw_scan_tlist(ForeignScan
*fscan
, List
*tlist
)
4512 List
*new_tlist
= tlist
;
4513 List
*old_tlist
= fscan
->fdw_scan_tlist
;
4516 foreach(lc
, old_tlist
)
4518 TargetEntry
*tle
= (TargetEntry
*) lfirst(lc
);
4520 if (tlist_member(tle
->expr
, new_tlist
))
4521 continue; /* already got it */
4523 new_tlist
= lappend(new_tlist
,
4524 makeTargetEntry(tle
->expr
,
4525 list_length(new_tlist
) + 1,
4529 fscan
->fdw_scan_tlist
= new_tlist
;
4533 * Execute a direct UPDATE/DELETE statement.
4536 execute_dml_stmt(ForeignScanState
*node
)
4538 PgFdwDirectModifyState
*dmstate
= (PgFdwDirectModifyState
*) node
->fdw_state
;
4539 ExprContext
*econtext
= node
->ss
.ps
.ps_ExprContext
;
4540 int numParams
= dmstate
->numParams
;
4541 const char **values
= dmstate
->param_values
;
4543 /* First, process a pending asynchronous request, if any. */
4544 if (dmstate
->conn_state
->pendingAreq
)
4545 process_pending_request(dmstate
->conn_state
->pendingAreq
);
4548 * Construct array of query parameter values in text format.
4551 process_query_params(econtext
,
4552 dmstate
->param_flinfo
,
4553 dmstate
->param_exprs
,
4557 * Notice that we pass NULL for paramTypes, thus forcing the remote server
4558 * to infer types for all parameters. Since we explicitly cast every
4559 * parameter (see deparse.c), the "inference" is trivial and will produce
4560 * the desired result. This allows us to avoid assuming that the remote
4561 * server has the same OIDs we do for the parameters' types.
4563 if (!PQsendQueryParams(dmstate
->conn
, dmstate
->query
, numParams
,
4564 NULL
, values
, NULL
, NULL
, 0))
4565 pgfdw_report_error(ERROR
, NULL
, dmstate
->conn
, false, dmstate
->query
);
4568 * Get the result, and check for success.
4570 * We don't use a PG_TRY block here, so be careful not to throw error
4571 * without releasing the PGresult.
4573 dmstate
->result
= pgfdw_get_result(dmstate
->conn
, dmstate
->query
);
4574 if (PQresultStatus(dmstate
->result
) !=
4575 (dmstate
->has_returning
? PGRES_TUPLES_OK
: PGRES_COMMAND_OK
))
4576 pgfdw_report_error(ERROR
, dmstate
->result
, dmstate
->conn
, true,
4579 /* Get the number of rows affected. */
4580 if (dmstate
->has_returning
)
4581 dmstate
->num_tuples
= PQntuples(dmstate
->result
);
4583 dmstate
->num_tuples
= atoi(PQcmdTuples(dmstate
->result
));
4587 * Get the result of a RETURNING clause.
4589 static TupleTableSlot
*
4590 get_returning_data(ForeignScanState
*node
)
4592 PgFdwDirectModifyState
*dmstate
= (PgFdwDirectModifyState
*) node
->fdw_state
;
4593 EState
*estate
= node
->ss
.ps
.state
;
4594 ResultRelInfo
*resultRelInfo
= node
->resultRelInfo
;
4595 TupleTableSlot
*slot
= node
->ss
.ss_ScanTupleSlot
;
4596 TupleTableSlot
*resultSlot
;
4598 Assert(resultRelInfo
->ri_projectReturning
);
4600 /* If we didn't get any tuples, must be end of data. */
4601 if (dmstate
->next_tuple
>= dmstate
->num_tuples
)
4602 return ExecClearTuple(slot
);
4604 /* Increment the command es_processed count if necessary. */
4605 if (dmstate
->set_processed
)
4606 estate
->es_processed
+= 1;
4609 * Store a RETURNING tuple. If has_returning is false, just emit a dummy
4610 * tuple. (has_returning is false when the local query is of the form
4611 * "UPDATE/DELETE .. RETURNING 1" for example.)
4613 if (!dmstate
->has_returning
)
4615 ExecStoreAllNullTuple(slot
);
4621 * On error, be sure to release the PGresult on the way out. Callers
4622 * do not have PG_TRY blocks to ensure this happens.
4628 newtup
= make_tuple_from_result_row(dmstate
->result
,
4629 dmstate
->next_tuple
,
4632 dmstate
->retrieved_attrs
,
4635 ExecStoreHeapTuple(newtup
, slot
, false);
4639 PQclear(dmstate
->result
);
4644 /* Get the updated/deleted tuple. */
4648 resultSlot
= apply_returning_filter(dmstate
, resultRelInfo
, slot
, estate
);
4650 dmstate
->next_tuple
++;
4652 /* Make slot available for evaluation of the local query RETURNING list. */
4653 resultRelInfo
->ri_projectReturning
->pi_exprContext
->ecxt_scantuple
=
4660 * Initialize a filter to extract an updated/deleted tuple from a scan tuple.
4663 init_returning_filter(PgFdwDirectModifyState
*dmstate
,
4664 List
*fdw_scan_tlist
,
4667 TupleDesc resultTupType
= RelationGetDescr(dmstate
->resultRel
);
4672 * Calculate the mapping between the fdw_scan_tlist's entries and the
4673 * result tuple's attributes.
4675 * The "map" is an array of indexes of the result tuple's attributes in
4676 * fdw_scan_tlist, i.e., one entry for every attribute of the result
4677 * tuple. We store zero for any attributes that don't have the
4678 * corresponding entries in that list, marking that a NULL is needed in
4681 * Also get the indexes of the entries for ctid and oid if any.
4683 dmstate
->attnoMap
= (AttrNumber
*)
4684 palloc0(resultTupType
->natts
* sizeof(AttrNumber
));
4686 dmstate
->ctidAttno
= dmstate
->oidAttno
= 0;
4689 dmstate
->hasSystemCols
= false;
4690 foreach(lc
, fdw_scan_tlist
)
4692 TargetEntry
*tle
= (TargetEntry
*) lfirst(lc
);
4693 Var
*var
= (Var
*) tle
->expr
;
4695 Assert(IsA(var
, Var
));
4698 * If the Var is a column of the target relation to be retrieved from
4699 * the foreign server, get the index of the entry.
4701 if (var
->varno
== rtindex
&&
4702 list_member_int(dmstate
->retrieved_attrs
, i
))
4704 int attrno
= var
->varattno
;
4709 * We don't retrieve system columns other than ctid and oid.
4711 if (attrno
== SelfItemPointerAttributeNumber
)
4712 dmstate
->ctidAttno
= i
;
4715 dmstate
->hasSystemCols
= true;
4720 * We don't retrieve whole-row references to the target
4725 dmstate
->attnoMap
[attrno
- 1] = i
;
4733 * Extract and return an updated/deleted tuple from a scan tuple.
4735 static TupleTableSlot
*
4736 apply_returning_filter(PgFdwDirectModifyState
*dmstate
,
4737 ResultRelInfo
*resultRelInfo
,
4738 TupleTableSlot
*slot
,
4741 TupleDesc resultTupType
= RelationGetDescr(dmstate
->resultRel
);
4742 TupleTableSlot
*resultSlot
;
4750 * Use the return tuple slot as a place to store the result tuple.
4752 resultSlot
= ExecGetReturningSlot(estate
, resultRelInfo
);
4755 * Extract all the values of the scan tuple.
4757 slot_getallattrs(slot
);
4758 old_values
= slot
->tts_values
;
4759 old_isnull
= slot
->tts_isnull
;
4762 * Prepare to build the result tuple.
4764 ExecClearTuple(resultSlot
);
4765 values
= resultSlot
->tts_values
;
4766 isnull
= resultSlot
->tts_isnull
;
4769 * Transpose data into proper fields of the result tuple.
4771 for (i
= 0; i
< resultTupType
->natts
; i
++)
4773 int j
= dmstate
->attnoMap
[i
];
4777 values
[i
] = (Datum
) 0;
4782 values
[i
] = old_values
[j
- 1];
4783 isnull
[i
] = old_isnull
[j
- 1];
4788 * Build the virtual tuple.
4790 ExecStoreVirtualTuple(resultSlot
);
4793 * If we have any system columns to return, materialize a heap tuple in
4794 * the slot from column values set above and install system columns in
4797 if (dmstate
->hasSystemCols
)
4799 HeapTuple resultTup
= ExecFetchSlotHeapTuple(resultSlot
, true, NULL
);
4802 if (dmstate
->ctidAttno
)
4804 ItemPointer ctid
= NULL
;
4806 ctid
= (ItemPointer
) DatumGetPointer(old_values
[dmstate
->ctidAttno
- 1]);
4807 resultTup
->t_self
= *ctid
;
4811 * And remaining columns
4813 * Note: since we currently don't allow the target relation to appear
4814 * on the nullable side of an outer join, any system columns wouldn't
4817 * Note: no need to care about tableoid here because it will be
4818 * initialized in ExecProcessReturning().
4820 HeapTupleHeaderSetXmin(resultTup
->t_data
, InvalidTransactionId
);
4821 HeapTupleHeaderSetXmax(resultTup
->t_data
, InvalidTransactionId
);
4822 HeapTupleHeaderSetCmin(resultTup
->t_data
, InvalidTransactionId
);
4826 * And return the result tuple.
4832 * Prepare for processing of parameters used in remote query.
4835 prepare_query_params(PlanState
*node
,
4838 FmgrInfo
**param_flinfo
,
4840 const char ***param_values
)
4845 Assert(numParams
> 0);
4847 /* Prepare for output conversion of parameters used in remote query. */
4848 *param_flinfo
= (FmgrInfo
*) palloc0(sizeof(FmgrInfo
) * numParams
);
4851 foreach(lc
, fdw_exprs
)
4853 Node
*param_expr
= (Node
*) lfirst(lc
);
4857 getTypeOutputInfo(exprType(param_expr
), &typefnoid
, &isvarlena
);
4858 fmgr_info(typefnoid
, &(*param_flinfo
)[i
]);
4863 * Prepare remote-parameter expressions for evaluation. (Note: in
4864 * practice, we expect that all these expressions will be just Params, so
4865 * we could possibly do something more efficient than using the full
4866 * expression-eval machinery for this. But probably there would be little
4867 * benefit, and it'd require postgres_fdw to know more than is desirable
4868 * about Param evaluation.)
4870 *param_exprs
= ExecInitExprList(fdw_exprs
, node
);
4872 /* Allocate buffer for text form of query parameters. */
4873 *param_values
= (const char **) palloc0(numParams
* sizeof(char *));
4877 * Construct array of query parameter values in text format.
4880 process_query_params(ExprContext
*econtext
,
4881 FmgrInfo
*param_flinfo
,
4883 const char **param_values
)
4889 nestlevel
= set_transmission_modes();
4892 foreach(lc
, param_exprs
)
4894 ExprState
*expr_state
= (ExprState
*) lfirst(lc
);
4898 /* Evaluate the parameter expression */
4899 expr_value
= ExecEvalExpr(expr_state
, econtext
, &isNull
);
4902 * Get string representation of each parameter value by invoking
4903 * type-specific output function, unless the value is null.
4906 param_values
[i
] = NULL
;
4908 param_values
[i
] = OutputFunctionCall(¶m_flinfo
[i
], expr_value
);
4913 reset_transmission_modes(nestlevel
);
4917 * postgresAnalyzeForeignTable
4918 * Test whether analyzing this foreign table is supported
4921 postgresAnalyzeForeignTable(Relation relation
,
4922 AcquireSampleRowsFunc
*func
,
4923 BlockNumber
*totalpages
)
4925 ForeignTable
*table
;
4929 PGresult
*volatile res
= NULL
;
4931 /* Return the row-analysis function pointer */
4932 *func
= postgresAcquireSampleRowsFunc
;
4935 * Now we have to get the number of pages. It's annoying that the ANALYZE
4936 * API requires us to return that now, because it forces some duplication
4937 * of effort between this routine and postgresAcquireSampleRowsFunc. But
4938 * it's probably not worth redefining that API at this point.
4942 * Get the connection to use. We do the remote access as the table's
4943 * owner, even if the ANALYZE was started by some other user.
4945 table
= GetForeignTable(RelationGetRelid(relation
));
4946 user
= GetUserMapping(relation
->rd_rel
->relowner
, table
->serverid
);
4947 conn
= GetConnection(user
, false, NULL
);
4950 * Construct command to get page count for relation.
4952 initStringInfo(&sql
);
4953 deparseAnalyzeSizeSql(&sql
, relation
);
4955 /* In what follows, do not risk leaking any PGresults. */
4958 res
= pgfdw_exec_query(conn
, sql
.data
, NULL
);
4959 if (PQresultStatus(res
) != PGRES_TUPLES_OK
)
4960 pgfdw_report_error(ERROR
, res
, conn
, false, sql
.data
);
4962 if (PQntuples(res
) != 1 || PQnfields(res
) != 1)
4963 elog(ERROR
, "unexpected result from deparseAnalyzeSizeSql query");
4964 *totalpages
= strtoul(PQgetvalue(res
, 0, 0), NULL
, 10);
4972 ReleaseConnection(conn
);
4978 * postgresGetAnalyzeInfoForForeignTable
4979 * Count tuples in foreign table (just get pg_class.reltuples).
4981 * can_tablesample determines if the remote relation supports acquiring the
4982 * sample using TABLESAMPLE.
4985 postgresGetAnalyzeInfoForForeignTable(Relation relation
, bool *can_tablesample
)
4987 ForeignTable
*table
;
4991 PGresult
*volatile res
= NULL
;
4992 volatile double reltuples
= -1;
4993 volatile char relkind
= 0;
4995 /* assume the remote relation does not support TABLESAMPLE */
4996 *can_tablesample
= false;
4999 * Get the connection to use. We do the remote access as the table's
5000 * owner, even if the ANALYZE was started by some other user.
5002 table
= GetForeignTable(RelationGetRelid(relation
));
5003 user
= GetUserMapping(relation
->rd_rel
->relowner
, table
->serverid
);
5004 conn
= GetConnection(user
, false, NULL
);
5007 * Construct command to get page count for relation.
5009 initStringInfo(&sql
);
5010 deparseAnalyzeInfoSql(&sql
, relation
);
5012 /* In what follows, do not risk leaking any PGresults. */
5015 res
= pgfdw_exec_query(conn
, sql
.data
, NULL
);
5016 if (PQresultStatus(res
) != PGRES_TUPLES_OK
)
5017 pgfdw_report_error(ERROR
, res
, conn
, false, sql
.data
);
5019 if (PQntuples(res
) != 1 || PQnfields(res
) != 2)
5020 elog(ERROR
, "unexpected result from deparseAnalyzeInfoSql query");
5021 reltuples
= strtod(PQgetvalue(res
, 0, 0), NULL
);
5022 relkind
= *(PQgetvalue(res
, 0, 1));
5031 ReleaseConnection(conn
);
5033 /* TABLESAMPLE is supported only for regular tables and matviews */
5034 *can_tablesample
= (relkind
== RELKIND_RELATION
||
5035 relkind
== RELKIND_MATVIEW
||
5036 relkind
== RELKIND_PARTITIONED_TABLE
);
5042 * Acquire a random sample of rows from foreign table managed by postgres_fdw.
5044 * Selected rows are returned in the caller-allocated array rows[],
5045 * which must have at least targrows entries.
5046 * The actual number of rows selected is returned as the function result.
5047 * We also count the total number of rows in the table and return it into
5048 * *totalrows. Note that *totaldeadrows is always set to 0.
5050 * Note that the returned list of rows is not always in order by physical
5051 * position in the table. Therefore, correlation estimates derived later
5052 * may be meaningless, but it's OK because we don't use the estimates
5053 * currently (the planner only pays attention to correlation for indexscans).
5056 postgresAcquireSampleRowsFunc(Relation relation
, int elevel
,
5057 HeapTuple
*rows
, int targrows
,
5059 double *totaldeadrows
)
5061 PgFdwAnalyzeState astate
;
5062 ForeignTable
*table
;
5063 ForeignServer
*server
;
5066 int server_version_num
;
5067 PgFdwSamplingMethod method
= ANALYZE_SAMPLE_AUTO
; /* auto is default */
5068 double sample_frac
= -1.0;
5070 unsigned int cursor_number
;
5072 PGresult
*volatile res
= NULL
;
5075 /* Initialize workspace state */
5076 astate
.rel
= relation
;
5077 astate
.attinmeta
= TupleDescGetAttInMetadata(RelationGetDescr(relation
));
5080 astate
.targrows
= targrows
;
5082 astate
.samplerows
= 0;
5083 astate
.rowstoskip
= -1; /* -1 means not set yet */
5084 reservoir_init_selection_state(&astate
.rstate
, targrows
);
5086 /* Remember ANALYZE context, and create a per-tuple temp context */
5087 astate
.anl_cxt
= CurrentMemoryContext
;
5088 astate
.temp_cxt
= AllocSetContextCreate(CurrentMemoryContext
,
5089 "postgres_fdw temporary data",
5090 ALLOCSET_SMALL_SIZES
);
5093 * Get the connection to use. We do the remote access as the table's
5094 * owner, even if the ANALYZE was started by some other user.
5096 table
= GetForeignTable(RelationGetRelid(relation
));
5097 server
= GetForeignServer(table
->serverid
);
5098 user
= GetUserMapping(relation
->rd_rel
->relowner
, table
->serverid
);
5099 conn
= GetConnection(user
, false, NULL
);
5101 /* We'll need server version, so fetch it now. */
5102 server_version_num
= PQserverVersion(conn
);
5105 * What sampling method should we use?
5107 foreach(lc
, server
->options
)
5109 DefElem
*def
= (DefElem
*) lfirst(lc
);
5111 if (strcmp(def
->defname
, "analyze_sampling") == 0)
5113 char *value
= defGetString(def
);
5115 if (strcmp(value
, "off") == 0)
5116 method
= ANALYZE_SAMPLE_OFF
;
5117 else if (strcmp(value
, "auto") == 0)
5118 method
= ANALYZE_SAMPLE_AUTO
;
5119 else if (strcmp(value
, "random") == 0)
5120 method
= ANALYZE_SAMPLE_RANDOM
;
5121 else if (strcmp(value
, "system") == 0)
5122 method
= ANALYZE_SAMPLE_SYSTEM
;
5123 else if (strcmp(value
, "bernoulli") == 0)
5124 method
= ANALYZE_SAMPLE_BERNOULLI
;
5130 foreach(lc
, table
->options
)
5132 DefElem
*def
= (DefElem
*) lfirst(lc
);
5134 if (strcmp(def
->defname
, "analyze_sampling") == 0)
5136 char *value
= defGetString(def
);
5138 if (strcmp(value
, "off") == 0)
5139 method
= ANALYZE_SAMPLE_OFF
;
5140 else if (strcmp(value
, "auto") == 0)
5141 method
= ANALYZE_SAMPLE_AUTO
;
5142 else if (strcmp(value
, "random") == 0)
5143 method
= ANALYZE_SAMPLE_RANDOM
;
5144 else if (strcmp(value
, "system") == 0)
5145 method
= ANALYZE_SAMPLE_SYSTEM
;
5146 else if (strcmp(value
, "bernoulli") == 0)
5147 method
= ANALYZE_SAMPLE_BERNOULLI
;
5154 * Error-out if explicitly required one of the TABLESAMPLE methods, but
5155 * the server does not support it.
5157 if ((server_version_num
< 95000) &&
5158 (method
== ANALYZE_SAMPLE_SYSTEM
||
5159 method
== ANALYZE_SAMPLE_BERNOULLI
))
5161 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED
),
5162 errmsg("remote server does not support TABLESAMPLE feature")));
5165 * If we've decided to do remote sampling, calculate the sampling rate. We
5166 * need to get the number of tuples from the remote server, but skip that
5167 * network round-trip if not needed.
5169 if (method
!= ANALYZE_SAMPLE_OFF
)
5171 bool can_tablesample
;
5173 reltuples
= postgresGetAnalyzeInfoForForeignTable(relation
,
5177 * Make sure we're not choosing TABLESAMPLE when the remote relation
5178 * does not support that. But only do this for "auto" - if the user
5179 * explicitly requested BERNOULLI/SYSTEM, it's better to fail.
5181 if (!can_tablesample
&& (method
== ANALYZE_SAMPLE_AUTO
))
5182 method
= ANALYZE_SAMPLE_RANDOM
;
5185 * Remote's reltuples could be 0 or -1 if the table has never been
5186 * vacuumed/analyzed. In that case, disable sampling after all.
5188 if ((reltuples
<= 0) || (targrows
>= reltuples
))
5189 method
= ANALYZE_SAMPLE_OFF
;
5193 * All supported sampling methods require sampling rate, not
5194 * target rows directly, so we calculate that using the remote
5195 * reltuples value. That's imperfect, because it might be off a
5196 * good deal, but that's not something we can (or should) address
5199 * If reltuples is too low (i.e. when table grew), we'll end up
5200 * sampling more rows - but then we'll apply the local sampling,
5201 * so we get the expected sample size. This is the same outcome as
5202 * without remote sampling.
5204 * If reltuples is too high (e.g. after bulk DELETE), we will end
5205 * up sampling too few rows.
5207 * We can't really do much better here - we could try sampling a
5208 * bit more rows, but we don't know how off the reltuples value is
5209 * so how much is "a bit more"?
5211 * Furthermore, the targrows value for partitions is determined
5212 * based on table size (relpages), which can be off in different
5213 * ways too. Adjusting the sampling rate here might make the issue
5216 sample_frac
= targrows
/ reltuples
;
5219 * We should never get sampling rate outside the valid range
5220 * (between 0.0 and 1.0), because those cases should be covered by
5221 * the previous branch that sets ANALYZE_SAMPLE_OFF.
5223 Assert(sample_frac
>= 0.0 && sample_frac
<= 1.0);
5228 * For "auto" method, pick the one we believe is best. For servers with
5229 * TABLESAMPLE support we pick BERNOULLI, for old servers we fall-back to
5230 * random() to at least reduce network transfer.
5232 if (method
== ANALYZE_SAMPLE_AUTO
)
5234 if (server_version_num
< 95000)
5235 method
= ANALYZE_SAMPLE_RANDOM
;
5237 method
= ANALYZE_SAMPLE_BERNOULLI
;
5241 * Construct cursor that retrieves whole rows from remote.
5243 cursor_number
= GetCursorNumber(conn
);
5244 initStringInfo(&sql
);
5245 appendStringInfo(&sql
, "DECLARE c%u CURSOR FOR ", cursor_number
);
5247 deparseAnalyzeSql(&sql
, relation
, method
, sample_frac
, &astate
.retrieved_attrs
);
5249 /* In what follows, do not risk leaking any PGresults. */
5255 res
= pgfdw_exec_query(conn
, sql
.data
, NULL
);
5256 if (PQresultStatus(res
) != PGRES_COMMAND_OK
)
5257 pgfdw_report_error(ERROR
, res
, conn
, false, sql
.data
);
5262 * Determine the fetch size. The default is arbitrary, but shouldn't
5266 foreach(lc
, server
->options
)
5268 DefElem
*def
= (DefElem
*) lfirst(lc
);
5270 if (strcmp(def
->defname
, "fetch_size") == 0)
5272 (void) parse_int(defGetString(def
), &fetch_size
, 0, NULL
);
5276 foreach(lc
, table
->options
)
5278 DefElem
*def
= (DefElem
*) lfirst(lc
);
5280 if (strcmp(def
->defname
, "fetch_size") == 0)
5282 (void) parse_int(defGetString(def
), &fetch_size
, 0, NULL
);
5287 /* Construct command to fetch rows from remote. */
5288 snprintf(fetch_sql
, sizeof(fetch_sql
), "FETCH %d FROM c%u",
5289 fetch_size
, cursor_number
);
5291 /* Retrieve and process rows a batch at a time. */
5297 /* Allow users to cancel long query */
5298 CHECK_FOR_INTERRUPTS();
5301 * XXX possible future improvement: if rowstoskip is large, we
5302 * could issue a MOVE rather than physically fetching the rows,
5303 * then just adjust rowstoskip and samplerows appropriately.
5306 /* Fetch some rows */
5307 res
= pgfdw_exec_query(conn
, fetch_sql
, NULL
);
5308 /* On error, report the original query, not the FETCH. */
5309 if (PQresultStatus(res
) != PGRES_TUPLES_OK
)
5310 pgfdw_report_error(ERROR
, res
, conn
, false, sql
.data
);
5312 /* Process whatever we got. */
5313 numrows
= PQntuples(res
);
5314 for (i
= 0; i
< numrows
; i
++)
5315 analyze_row_processor(res
, i
, &astate
);
5320 /* Must be EOF if we didn't get all the rows requested. */
5321 if (numrows
< fetch_size
)
5325 /* Close the cursor, just to be tidy. */
5326 close_cursor(conn
, cursor_number
, NULL
);
5335 ReleaseConnection(conn
);
5337 /* We assume that we have no dead tuple. */
5338 *totaldeadrows
= 0.0;
5341 * Without sampling, we've retrieved all living tuples from foreign
5342 * server, so report that as totalrows. Otherwise use the reltuples
5343 * estimate we got from the remote side.
5345 if (method
== ANALYZE_SAMPLE_OFF
)
5346 *totalrows
= astate
.samplerows
;
5348 *totalrows
= reltuples
;
5351 * Emit some interesting relation info
5354 (errmsg("\"%s\": table contains %.0f rows, %d rows in sample",
5355 RelationGetRelationName(relation
),
5356 *totalrows
, astate
.numrows
)));
5358 return astate
.numrows
;
5362 * Collect sample rows from the result of query.
5363 * - Use all tuples in sample until target # of samples are collected.
5364 * - Subsequently, replace already-sampled tuples randomly.
5367 analyze_row_processor(PGresult
*res
, int row
, PgFdwAnalyzeState
*astate
)
5369 int targrows
= astate
->targrows
;
5370 int pos
; /* array index to store tuple in */
5371 MemoryContext oldcontext
;
5373 /* Always increment sample row counter. */
5374 astate
->samplerows
+= 1;
5377 * Determine the slot where this sample row should be stored. Set pos to
5378 * negative value to indicate the row should be skipped.
5380 if (astate
->numrows
< targrows
)
5382 /* First targrows rows are always included into the sample */
5383 pos
= astate
->numrows
++;
5388 * Now we start replacing tuples in the sample until we reach the end
5389 * of the relation. Same algorithm as in acquire_sample_rows in
5390 * analyze.c; see Jeff Vitter's paper.
5392 if (astate
->rowstoskip
< 0)
5393 astate
->rowstoskip
= reservoir_get_next_S(&astate
->rstate
, astate
->samplerows
, targrows
);
5395 if (astate
->rowstoskip
<= 0)
5397 /* Choose a random reservoir element to replace. */
5398 pos
= (int) (targrows
* sampler_random_fract(&astate
->rstate
.randstate
));
5399 Assert(pos
>= 0 && pos
< targrows
);
5400 heap_freetuple(astate
->rows
[pos
]);
5404 /* Skip this tuple. */
5408 astate
->rowstoskip
-= 1;
5414 * Create sample tuple from current result row, and store it in the
5415 * position determined above. The tuple has to be created in anl_cxt.
5417 oldcontext
= MemoryContextSwitchTo(astate
->anl_cxt
);
5419 astate
->rows
[pos
] = make_tuple_from_result_row(res
, row
,
5422 astate
->retrieved_attrs
,
5426 MemoryContextSwitchTo(oldcontext
);
5431 * Import a foreign schema
5434 postgresImportForeignSchema(ImportForeignSchemaStmt
*stmt
, Oid serverOid
)
5436 List
*commands
= NIL
;
5437 bool import_collate
= true;
5438 bool import_default
= false;
5439 bool import_generated
= true;
5440 bool import_not_null
= true;
5441 ForeignServer
*server
;
5442 UserMapping
*mapping
;
5445 PGresult
*volatile res
= NULL
;
5450 /* Parse statement options */
5451 foreach(lc
, stmt
->options
)
5453 DefElem
*def
= (DefElem
*) lfirst(lc
);
5455 if (strcmp(def
->defname
, "import_collate") == 0)
5456 import_collate
= defGetBoolean(def
);
5457 else if (strcmp(def
->defname
, "import_default") == 0)
5458 import_default
= defGetBoolean(def
);
5459 else if (strcmp(def
->defname
, "import_generated") == 0)
5460 import_generated
= defGetBoolean(def
);
5461 else if (strcmp(def
->defname
, "import_not_null") == 0)
5462 import_not_null
= defGetBoolean(def
);
5465 (errcode(ERRCODE_FDW_INVALID_OPTION_NAME
),
5466 errmsg("invalid option \"%s\"", def
->defname
)));
5470 * Get connection to the foreign server. Connection manager will
5471 * establish new connection if necessary.
5473 server
= GetForeignServer(serverOid
);
5474 mapping
= GetUserMapping(GetUserId(), server
->serverid
);
5475 conn
= GetConnection(mapping
, false, NULL
);
5477 /* Don't attempt to import collation if remote server hasn't got it */
5478 if (PQserverVersion(conn
) < 90100)
5479 import_collate
= false;
5481 /* Create workspace for strings */
5482 initStringInfo(&buf
);
5484 /* In what follows, do not risk leaking any PGresults. */
5487 /* Check that the schema really exists */
5488 appendStringInfoString(&buf
, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ");
5489 deparseStringLiteral(&buf
, stmt
->remote_schema
);
5491 res
= pgfdw_exec_query(conn
, buf
.data
, NULL
);
5492 if (PQresultStatus(res
) != PGRES_TUPLES_OK
)
5493 pgfdw_report_error(ERROR
, res
, conn
, false, buf
.data
);
5495 if (PQntuples(res
) != 1)
5497 (errcode(ERRCODE_FDW_SCHEMA_NOT_FOUND
),
5498 errmsg("schema \"%s\" is not present on foreign server \"%s\"",
5499 stmt
->remote_schema
, server
->servername
)));
5503 resetStringInfo(&buf
);
5506 * Fetch all table data from this schema, possibly restricted by
5507 * EXCEPT or LIMIT TO. (We don't actually need to pay any attention
5508 * to EXCEPT/LIMIT TO here, because the core code will filter the
5509 * statements we return according to those lists anyway. But it
5510 * should save a few cycles to not process excluded tables in the
5513 * Import table data for partitions only when they are explicitly
5514 * specified in LIMIT TO clause. Otherwise ignore them and only
5515 * include the definitions of the root partitioned tables to allow
5516 * access to the complete remote data set locally in the schema
5519 * Note: because we run the connection with search_path restricted to
5520 * pg_catalog, the format_type() and pg_get_expr() outputs will always
5521 * include a schema name for types/functions in other schemas, which
5524 appendStringInfoString(&buf
,
5527 " format_type(atttypid, atttypmod), "
5529 " pg_get_expr(adbin, adrelid), ");
5531 /* Generated columns are supported since Postgres 12 */
5532 if (PQserverVersion(conn
) >= 120000)
5533 appendStringInfoString(&buf
,
5536 appendStringInfoString(&buf
,
5540 appendStringInfoString(&buf
,
5542 " collnsp.nspname ");
5544 appendStringInfoString(&buf
,
5547 appendStringInfoString(&buf
,
5549 " JOIN pg_namespace n ON "
5550 " relnamespace = n.oid "
5551 " LEFT JOIN pg_attribute a ON "
5552 " attrelid = c.oid AND attnum > 0 "
5553 " AND NOT attisdropped "
5554 " LEFT JOIN pg_attrdef ad ON "
5555 " adrelid = c.oid AND adnum = attnum ");
5558 appendStringInfoString(&buf
,
5559 " LEFT JOIN pg_collation coll ON "
5560 " coll.oid = attcollation "
5561 " LEFT JOIN pg_namespace collnsp ON "
5562 " collnsp.oid = collnamespace ");
5564 appendStringInfoString(&buf
,
5565 "WHERE c.relkind IN ("
5566 CppAsString2(RELKIND_RELATION
) ","
5567 CppAsString2(RELKIND_VIEW
) ","
5568 CppAsString2(RELKIND_FOREIGN_TABLE
) ","
5569 CppAsString2(RELKIND_MATVIEW
) ","
5570 CppAsString2(RELKIND_PARTITIONED_TABLE
) ") "
5571 " AND n.nspname = ");
5572 deparseStringLiteral(&buf
, stmt
->remote_schema
);
5574 /* Partitions are supported since Postgres 10 */
5575 if (PQserverVersion(conn
) >= 100000 &&
5576 stmt
->list_type
!= FDW_IMPORT_SCHEMA_LIMIT_TO
)
5577 appendStringInfoString(&buf
, " AND NOT c.relispartition ");
5579 /* Apply restrictions for LIMIT TO and EXCEPT */
5580 if (stmt
->list_type
== FDW_IMPORT_SCHEMA_LIMIT_TO
||
5581 stmt
->list_type
== FDW_IMPORT_SCHEMA_EXCEPT
)
5583 bool first_item
= true;
5585 appendStringInfoString(&buf
, " AND c.relname ");
5586 if (stmt
->list_type
== FDW_IMPORT_SCHEMA_EXCEPT
)
5587 appendStringInfoString(&buf
, "NOT ");
5588 appendStringInfoString(&buf
, "IN (");
5590 /* Append list of table names within IN clause */
5591 foreach(lc
, stmt
->table_list
)
5593 RangeVar
*rv
= (RangeVar
*) lfirst(lc
);
5598 appendStringInfoString(&buf
, ", ");
5599 deparseStringLiteral(&buf
, rv
->relname
);
5601 appendStringInfoChar(&buf
, ')');
5604 /* Append ORDER BY at the end of query to ensure output ordering */
5605 appendStringInfoString(&buf
, " ORDER BY c.relname, a.attnum");
5607 /* Fetch the data */
5608 res
= pgfdw_exec_query(conn
, buf
.data
, NULL
);
5609 if (PQresultStatus(res
) != PGRES_TUPLES_OK
)
5610 pgfdw_report_error(ERROR
, res
, conn
, false, buf
.data
);
5612 /* Process results */
5613 numrows
= PQntuples(res
);
5614 /* note: incrementation of i happens in inner loop's while() test */
5615 for (i
= 0; i
< numrows
;)
5617 char *tablename
= PQgetvalue(res
, i
, 0);
5618 bool first_item
= true;
5620 resetStringInfo(&buf
);
5621 appendStringInfo(&buf
, "CREATE FOREIGN TABLE %s (\n",
5622 quote_identifier(tablename
));
5624 /* Scan all rows for this table */
5633 char *collnamespace
;
5635 /* If table has no columns, we'll see nulls here */
5636 if (PQgetisnull(res
, i
, 1))
5639 attname
= PQgetvalue(res
, i
, 1);
5640 typename
= PQgetvalue(res
, i
, 2);
5641 attnotnull
= PQgetvalue(res
, i
, 3);
5642 attdefault
= PQgetisnull(res
, i
, 4) ? (char *) NULL
:
5643 PQgetvalue(res
, i
, 4);
5644 attgenerated
= PQgetisnull(res
, i
, 5) ? (char *) NULL
:
5645 PQgetvalue(res
, i
, 5);
5646 collname
= PQgetisnull(res
, i
, 6) ? (char *) NULL
:
5647 PQgetvalue(res
, i
, 6);
5648 collnamespace
= PQgetisnull(res
, i
, 7) ? (char *) NULL
:
5649 PQgetvalue(res
, i
, 7);
5654 appendStringInfoString(&buf
, ",\n");
5656 /* Print column name and type */
5657 appendStringInfo(&buf
, " %s %s",
5658 quote_identifier(attname
),
5662 * Add column_name option so that renaming the foreign table's
5663 * column doesn't break the association to the underlying
5666 appendStringInfoString(&buf
, " OPTIONS (column_name ");
5667 deparseStringLiteral(&buf
, attname
);
5668 appendStringInfoChar(&buf
, ')');
5670 /* Add COLLATE if needed */
5671 if (import_collate
&& collname
!= NULL
&& collnamespace
!= NULL
)
5672 appendStringInfo(&buf
, " COLLATE %s.%s",
5673 quote_identifier(collnamespace
),
5674 quote_identifier(collname
));
5676 /* Add DEFAULT if needed */
5677 if (import_default
&& attdefault
!= NULL
&&
5678 (!attgenerated
|| !attgenerated
[0]))
5679 appendStringInfo(&buf
, " DEFAULT %s", attdefault
);
5681 /* Add GENERATED if needed */
5682 if (import_generated
&& attgenerated
!= NULL
&&
5683 attgenerated
[0] == ATTRIBUTE_GENERATED_STORED
)
5685 Assert(attdefault
!= NULL
);
5686 appendStringInfo(&buf
,
5687 " GENERATED ALWAYS AS (%s) STORED",
5691 /* Add NOT NULL if needed */
5692 if (import_not_null
&& attnotnull
[0] == 't')
5693 appendStringInfoString(&buf
, " NOT NULL");
5695 while (++i
< numrows
&&
5696 strcmp(PQgetvalue(res
, i
, 0), tablename
) == 0);
5699 * Add server name and table-level options. We specify remote
5700 * schema and table name as options (the latter to ensure that
5701 * renaming the foreign table doesn't break the association).
5703 appendStringInfo(&buf
, "\n) SERVER %s\nOPTIONS (",
5704 quote_identifier(server
->servername
));
5706 appendStringInfoString(&buf
, "schema_name ");
5707 deparseStringLiteral(&buf
, stmt
->remote_schema
);
5708 appendStringInfoString(&buf
, ", table_name ");
5709 deparseStringLiteral(&buf
, tablename
);
5711 appendStringInfoString(&buf
, ");");
5713 commands
= lappend(commands
, pstrdup(buf
.data
));
5722 ReleaseConnection(conn
);
5728 * Assess whether the join between inner and outer relations can be pushed down
5729 * to the foreign server. As a side effect, save information we obtain in this
5730 * function to PgFdwRelationInfo passed in.
5733 foreign_join_ok(PlannerInfo
*root
, RelOptInfo
*joinrel
, JoinType jointype
,
5734 RelOptInfo
*outerrel
, RelOptInfo
*innerrel
,
5735 JoinPathExtraData
*extra
)
5737 PgFdwRelationInfo
*fpinfo
;
5738 PgFdwRelationInfo
*fpinfo_o
;
5739 PgFdwRelationInfo
*fpinfo_i
;
5744 * We support pushing down INNER, LEFT, RIGHT and FULL OUTER joins.
5745 * Constructing queries representing SEMI and ANTI joins is hard, hence
5746 * not considered right now.
5748 if (jointype
!= JOIN_INNER
&& jointype
!= JOIN_LEFT
&&
5749 jointype
!= JOIN_RIGHT
&& jointype
!= JOIN_FULL
)
5753 * If either of the joining relations is marked as unsafe to pushdown, the
5754 * join can not be pushed down.
5756 fpinfo
= (PgFdwRelationInfo
*) joinrel
->fdw_private
;
5757 fpinfo_o
= (PgFdwRelationInfo
*) outerrel
->fdw_private
;
5758 fpinfo_i
= (PgFdwRelationInfo
*) innerrel
->fdw_private
;
5759 if (!fpinfo_o
|| !fpinfo_o
->pushdown_safe
||
5760 !fpinfo_i
|| !fpinfo_i
->pushdown_safe
)
5764 * If joining relations have local conditions, those conditions are
5765 * required to be applied before joining the relations. Hence the join can
5766 * not be pushed down.
5768 if (fpinfo_o
->local_conds
|| fpinfo_i
->local_conds
)
5772 * Merge FDW options. We might be tempted to do this after we have deemed
5773 * the foreign join to be OK. But we must do this beforehand so that we
5774 * know which quals can be evaluated on the foreign server, which might
5775 * depend on shippable_extensions.
5777 fpinfo
->server
= fpinfo_o
->server
;
5778 merge_fdw_options(fpinfo
, fpinfo_o
, fpinfo_i
);
5781 * Separate restrict list into join quals and pushed-down (other) quals.
5783 * Join quals belonging to an outer join must all be shippable, else we
5784 * cannot execute the join remotely. Add such quals to 'joinclauses'.
5786 * Add other quals to fpinfo->remote_conds if they are shippable, else to
5787 * fpinfo->local_conds. In an inner join it's okay to execute conditions
5788 * either locally or remotely; the same is true for pushed-down conditions
5791 * Note we might return failure after having already scribbled on
5792 * fpinfo->remote_conds and fpinfo->local_conds. That's okay because we
5793 * won't consult those lists again if we deem the join unshippable.
5796 foreach(lc
, extra
->restrictlist
)
5798 RestrictInfo
*rinfo
= lfirst_node(RestrictInfo
, lc
);
5799 bool is_remote_clause
= is_foreign_expr(root
, joinrel
,
5802 if (IS_OUTER_JOIN(jointype
) &&
5803 !RINFO_IS_PUSHED_DOWN(rinfo
, joinrel
->relids
))
5805 if (!is_remote_clause
)
5807 joinclauses
= lappend(joinclauses
, rinfo
);
5811 if (is_remote_clause
)
5812 fpinfo
->remote_conds
= lappend(fpinfo
->remote_conds
, rinfo
);
5814 fpinfo
->local_conds
= lappend(fpinfo
->local_conds
, rinfo
);
5819 * deparseExplicitTargetList() isn't smart enough to handle anything other
5820 * than a Var. In particular, if there's some PlaceHolderVar that would
5821 * need to be evaluated within this join tree (because there's an upper
5822 * reference to a quantity that may go to NULL as a result of an outer
5823 * join), then we can't try to push the join down because we'll fail when
5824 * we get to deparseExplicitTargetList(). However, a PlaceHolderVar that
5825 * needs to be evaluated *at the top* of this join tree is OK, because we
5826 * can do that locally after fetching the results from the remote side.
5828 foreach(lc
, root
->placeholder_list
)
5830 PlaceHolderInfo
*phinfo
= lfirst(lc
);
5833 /* PlaceHolderInfo refers to parent relids, not child relids. */
5834 relids
= IS_OTHER_REL(joinrel
) ?
5835 joinrel
->top_parent_relids
: joinrel
->relids
;
5837 if (bms_is_subset(phinfo
->ph_eval_at
, relids
) &&
5838 bms_nonempty_difference(relids
, phinfo
->ph_eval_at
))
5842 /* Save the join clauses, for later use. */
5843 fpinfo
->joinclauses
= joinclauses
;
5845 fpinfo
->outerrel
= outerrel
;
5846 fpinfo
->innerrel
= innerrel
;
5847 fpinfo
->jointype
= jointype
;
5850 * By default, both the input relations are not required to be deparsed as
5851 * subqueries, but there might be some relations covered by the input
5852 * relations that are required to be deparsed as subqueries, so save the
5853 * relids of those relations for later use by the deparser.
5855 fpinfo
->make_outerrel_subquery
= false;
5856 fpinfo
->make_innerrel_subquery
= false;
5857 Assert(bms_is_subset(fpinfo_o
->lower_subquery_rels
, outerrel
->relids
));
5858 Assert(bms_is_subset(fpinfo_i
->lower_subquery_rels
, innerrel
->relids
));
5859 fpinfo
->lower_subquery_rels
= bms_union(fpinfo_o
->lower_subquery_rels
,
5860 fpinfo_i
->lower_subquery_rels
);
5863 * Pull the other remote conditions from the joining relations into join
5864 * clauses or other remote clauses (remote_conds) of this relation
5865 * wherever possible. This avoids building subqueries at every join step.
5867 * For an inner join, clauses from both the relations are added to the
5868 * other remote clauses. For LEFT and RIGHT OUTER join, the clauses from
5869 * the outer side are added to remote_conds since those can be evaluated
5870 * after the join is evaluated. The clauses from inner side are added to
5871 * the joinclauses, since they need to be evaluated while constructing the
5874 * For a FULL OUTER JOIN, the other clauses from either relation can not
5875 * be added to the joinclauses or remote_conds, since each relation acts
5876 * as an outer relation for the other.
5878 * The joining sides can not have local conditions, thus no need to test
5879 * shippability of the clauses being pulled up.
5884 fpinfo
->remote_conds
= list_concat(fpinfo
->remote_conds
,
5885 fpinfo_i
->remote_conds
);
5886 fpinfo
->remote_conds
= list_concat(fpinfo
->remote_conds
,
5887 fpinfo_o
->remote_conds
);
5891 fpinfo
->joinclauses
= list_concat(fpinfo
->joinclauses
,
5892 fpinfo_i
->remote_conds
);
5893 fpinfo
->remote_conds
= list_concat(fpinfo
->remote_conds
,
5894 fpinfo_o
->remote_conds
);
5898 fpinfo
->joinclauses
= list_concat(fpinfo
->joinclauses
,
5899 fpinfo_o
->remote_conds
);
5900 fpinfo
->remote_conds
= list_concat(fpinfo
->remote_conds
,
5901 fpinfo_i
->remote_conds
);
5907 * In this case, if any of the input relations has conditions, we
5908 * need to deparse that relation as a subquery so that the
5909 * conditions can be evaluated before the join. Remember it in
5910 * the fpinfo of this relation so that the deparser can take
5911 * appropriate action. Also, save the relids of base relations
5912 * covered by that relation for later use by the deparser.
5914 if (fpinfo_o
->remote_conds
)
5916 fpinfo
->make_outerrel_subquery
= true;
5917 fpinfo
->lower_subquery_rels
=
5918 bms_add_members(fpinfo
->lower_subquery_rels
,
5921 if (fpinfo_i
->remote_conds
)
5923 fpinfo
->make_innerrel_subquery
= true;
5924 fpinfo
->lower_subquery_rels
=
5925 bms_add_members(fpinfo
->lower_subquery_rels
,
5931 /* Should not happen, we have just checked this above */
5932 elog(ERROR
, "unsupported join type %d", jointype
);
5936 * For an inner join, all restrictions can be treated alike. Treating the
5937 * pushed down conditions as join conditions allows a top level full outer
5938 * join to be deparsed without requiring subqueries.
5940 if (jointype
== JOIN_INNER
)
5942 Assert(!fpinfo
->joinclauses
);
5943 fpinfo
->joinclauses
= fpinfo
->remote_conds
;
5944 fpinfo
->remote_conds
= NIL
;
5947 /* Mark that this join can be pushed down safely */
5948 fpinfo
->pushdown_safe
= true;
5950 /* Get user mapping */
5951 if (fpinfo
->use_remote_estimate
)
5953 if (fpinfo_o
->use_remote_estimate
)
5954 fpinfo
->user
= fpinfo_o
->user
;
5956 fpinfo
->user
= fpinfo_i
->user
;
5959 fpinfo
->user
= NULL
;
5962 * Set # of retrieved rows and cached relation costs to some negative
5963 * value, so that we can detect when they are set to some sensible values,
5964 * during one (usually the first) of the calls to estimate_path_cost_size.
5966 fpinfo
->retrieved_rows
= -1;
5967 fpinfo
->rel_startup_cost
= -1;
5968 fpinfo
->rel_total_cost
= -1;
5971 * Set the string describing this join relation to be used in EXPLAIN
5972 * output of corresponding ForeignScan. Note that the decoration we add
5973 * to the base relation names mustn't include any digits, or it'll confuse
5974 * postgresExplainForeignScan.
5976 fpinfo
->relation_name
= psprintf("(%s) %s JOIN (%s)",
5977 fpinfo_o
->relation_name
,
5978 get_jointype_name(fpinfo
->jointype
),
5979 fpinfo_i
->relation_name
);
5982 * Set the relation index. This is defined as the position of this
5983 * joinrel in the join_rel_list list plus the length of the rtable list.
5984 * Note that since this joinrel is at the end of the join_rel_list list
5985 * when we are called, we can get the position by list_length.
5987 Assert(fpinfo
->relation_index
== 0); /* shouldn't be set yet */
5988 fpinfo
->relation_index
=
5989 list_length(root
->parse
->rtable
) + list_length(root
->join_rel_list
);
5995 add_paths_with_pathkeys_for_rel(PlannerInfo
*root
, RelOptInfo
*rel
,
5996 Path
*epq_path
, List
*restrictlist
)
5998 List
*useful_pathkeys_list
= NIL
; /* List of all pathkeys */
6001 useful_pathkeys_list
= get_useful_pathkeys_for_relation(root
, rel
);
6004 * Before creating sorted paths, arrange for the passed-in EPQ path, if
6005 * any, to return columns needed by the parent ForeignScan node so that
6006 * they will propagate up through Sort nodes injected below, if necessary.
6008 if (epq_path
!= NULL
&& useful_pathkeys_list
!= NIL
)
6010 PgFdwRelationInfo
*fpinfo
= (PgFdwRelationInfo
*) rel
->fdw_private
;
6011 PathTarget
*target
= copy_pathtarget(epq_path
->pathtarget
);
6013 /* Include columns required for evaluating PHVs in the tlist. */
6014 add_new_columns_to_pathtarget(target
,
6015 pull_var_clause((Node
*) target
->exprs
,
6016 PVC_RECURSE_PLACEHOLDERS
));
6018 /* Include columns required for evaluating the local conditions. */
6019 foreach(lc
, fpinfo
->local_conds
)
6021 RestrictInfo
*rinfo
= lfirst_node(RestrictInfo
, lc
);
6023 add_new_columns_to_pathtarget(target
,
6024 pull_var_clause((Node
*) rinfo
->clause
,
6025 PVC_RECURSE_PLACEHOLDERS
));
6029 * If we have added any new columns, adjust the tlist of the EPQ path.
6031 * Note: the plan created using this path will only be used to execute
6032 * EPQ checks, where accuracy of the plan cost and width estimates
6033 * would not be important, so we do not do set_pathtarget_cost_width()
6034 * for the new pathtarget here. See also postgresGetForeignPlan().
6036 if (list_length(target
->exprs
) > list_length(epq_path
->pathtarget
->exprs
))
6038 /* The EPQ path is a join path, so it is projection-capable. */
6039 Assert(is_projection_capable_path(epq_path
));
6042 * Use create_projection_path() here, so as to avoid modifying it
6045 epq_path
= (Path
*) create_projection_path(root
,
6052 /* Create one path for each set of pathkeys we found above. */
6053 foreach(lc
, useful_pathkeys_list
)
6059 List
*useful_pathkeys
= lfirst(lc
);
6060 Path
*sorted_epq_path
;
6062 estimate_path_cost_size(root
, rel
, NIL
, useful_pathkeys
, NULL
,
6063 &rows
, &width
, &startup_cost
, &total_cost
);
6066 * The EPQ path must be at least as well sorted as the path itself, in
6067 * case it gets used as input to a mergejoin.
6069 sorted_epq_path
= epq_path
;
6070 if (sorted_epq_path
!= NULL
&&
6071 !pathkeys_contained_in(useful_pathkeys
,
6072 sorted_epq_path
->pathkeys
))
6073 sorted_epq_path
= (Path
*)
6074 create_sort_path(root
,
6080 if (IS_SIMPLE_REL(rel
))
6081 add_path(rel
, (Path
*)
6082 create_foreignscan_path(root
, rel
,
6088 rel
->lateral_relids
,
6090 NIL
, /* no fdw_restrictinfo
6094 add_path(rel
, (Path
*)
6095 create_foreign_join_path(root
, rel
,
6101 rel
->lateral_relids
,
6109 * Parse options from foreign server and apply them to fpinfo.
6111 * New options might also require tweaking merge_fdw_options().
6114 apply_server_options(PgFdwRelationInfo
*fpinfo
)
6118 foreach(lc
, fpinfo
->server
->options
)
6120 DefElem
*def
= (DefElem
*) lfirst(lc
);
6122 if (strcmp(def
->defname
, "use_remote_estimate") == 0)
6123 fpinfo
->use_remote_estimate
= defGetBoolean(def
);
6124 else if (strcmp(def
->defname
, "fdw_startup_cost") == 0)
6125 (void) parse_real(defGetString(def
), &fpinfo
->fdw_startup_cost
, 0,
6127 else if (strcmp(def
->defname
, "fdw_tuple_cost") == 0)
6128 (void) parse_real(defGetString(def
), &fpinfo
->fdw_tuple_cost
, 0,
6130 else if (strcmp(def
->defname
, "extensions") == 0)
6131 fpinfo
->shippable_extensions
=
6132 ExtractExtensionList(defGetString(def
), false);
6133 else if (strcmp(def
->defname
, "fetch_size") == 0)
6134 (void) parse_int(defGetString(def
), &fpinfo
->fetch_size
, 0, NULL
);
6135 else if (strcmp(def
->defname
, "async_capable") == 0)
6136 fpinfo
->async_capable
= defGetBoolean(def
);
6141 * Parse options from foreign table and apply them to fpinfo.
6143 * New options might also require tweaking merge_fdw_options().
6146 apply_table_options(PgFdwRelationInfo
*fpinfo
)
6150 foreach(lc
, fpinfo
->table
->options
)
6152 DefElem
*def
= (DefElem
*) lfirst(lc
);
6154 if (strcmp(def
->defname
, "use_remote_estimate") == 0)
6155 fpinfo
->use_remote_estimate
= defGetBoolean(def
);
6156 else if (strcmp(def
->defname
, "fetch_size") == 0)
6157 (void) parse_int(defGetString(def
), &fpinfo
->fetch_size
, 0, NULL
);
6158 else if (strcmp(def
->defname
, "async_capable") == 0)
6159 fpinfo
->async_capable
= defGetBoolean(def
);
6164 * Merge FDW options from input relations into a new set of options for a join
6167 * For a join relation, FDW-specific information about the inner and outer
6168 * relations is provided using fpinfo_i and fpinfo_o. For an upper relation,
6169 * fpinfo_o provides the information for the input relation; fpinfo_i is
6173 merge_fdw_options(PgFdwRelationInfo
*fpinfo
,
6174 const PgFdwRelationInfo
*fpinfo_o
,
6175 const PgFdwRelationInfo
*fpinfo_i
)
6177 /* We must always have fpinfo_o. */
6180 /* fpinfo_i may be NULL, but if present the servers must both match. */
6182 fpinfo_i
->server
->serverid
== fpinfo_o
->server
->serverid
);
6185 * Copy the server specific FDW options. (For a join, both relations come
6186 * from the same server, so the server options should have the same value
6187 * for both relations.)
6189 fpinfo
->fdw_startup_cost
= fpinfo_o
->fdw_startup_cost
;
6190 fpinfo
->fdw_tuple_cost
= fpinfo_o
->fdw_tuple_cost
;
6191 fpinfo
->shippable_extensions
= fpinfo_o
->shippable_extensions
;
6192 fpinfo
->use_remote_estimate
= fpinfo_o
->use_remote_estimate
;
6193 fpinfo
->fetch_size
= fpinfo_o
->fetch_size
;
6194 fpinfo
->async_capable
= fpinfo_o
->async_capable
;
6196 /* Merge the table level options from either side of the join. */
6200 * We'll prefer to use remote estimates for this join if any table
6201 * from either side of the join is using remote estimates. This is
6202 * most likely going to be preferred since they're already willing to
6203 * pay the price of a round trip to get the remote EXPLAIN. In any
6204 * case it's not entirely clear how we might otherwise handle this
6207 fpinfo
->use_remote_estimate
= fpinfo_o
->use_remote_estimate
||
6208 fpinfo_i
->use_remote_estimate
;
6211 * Set fetch size to maximum of the joining sides, since we are
6212 * expecting the rows returned by the join to be proportional to the
6215 fpinfo
->fetch_size
= Max(fpinfo_o
->fetch_size
, fpinfo_i
->fetch_size
);
6218 * We'll prefer to consider this join async-capable if any table from
6219 * either side of the join is considered async-capable. This would be
6220 * reasonable because in that case the foreign server would have its
6221 * own resources to scan that table asynchronously, and the join could
6222 * also be computed asynchronously using the resources.
6224 fpinfo
->async_capable
= fpinfo_o
->async_capable
||
6225 fpinfo_i
->async_capable
;
6230 * postgresGetForeignJoinPaths
6231 * Add possible ForeignPath to joinrel, if join is safe to push down.
6234 postgresGetForeignJoinPaths(PlannerInfo
*root
,
6235 RelOptInfo
*joinrel
,
6236 RelOptInfo
*outerrel
,
6237 RelOptInfo
*innerrel
,
6239 JoinPathExtraData
*extra
)
6241 PgFdwRelationInfo
*fpinfo
;
6242 ForeignPath
*joinpath
;
6247 Path
*epq_path
; /* Path to create plan to be executed when
6248 * EvalPlanQual gets triggered. */
6251 * Skip if this join combination has been considered already.
6253 if (joinrel
->fdw_private
)
6257 * This code does not work for joins with lateral references, since those
6258 * must have parameterized paths, which we don't generate yet.
6260 if (!bms_is_empty(joinrel
->lateral_relids
))
6264 * Create unfinished PgFdwRelationInfo entry which is used to indicate
6265 * that the join relation is already considered, so that we won't waste
6266 * time in judging safety of join pushdown and adding the same paths again
6267 * if found safe. Once we know that this join can be pushed down, we fill
6270 fpinfo
= (PgFdwRelationInfo
*) palloc0(sizeof(PgFdwRelationInfo
));
6271 fpinfo
->pushdown_safe
= false;
6272 joinrel
->fdw_private
= fpinfo
;
6273 /* attrs_used is only for base relations. */
6274 fpinfo
->attrs_used
= NULL
;
6277 * If there is a possibility that EvalPlanQual will be executed, we need
6278 * to be able to reconstruct the row using scans of the base relations.
6279 * GetExistingLocalJoinPath will find a suitable path for this purpose in
6280 * the path list of the joinrel, if one exists. We must be careful to
6281 * call it before adding any ForeignPath, since the ForeignPath might
6282 * dominate the only suitable local path available. We also do it before
6283 * calling foreign_join_ok(), since that function updates fpinfo and marks
6284 * it as pushable if the join is found to be pushable.
6286 if (root
->parse
->commandType
== CMD_DELETE
||
6287 root
->parse
->commandType
== CMD_UPDATE
||
6290 epq_path
= GetExistingLocalJoinPath(joinrel
);
6293 elog(DEBUG3
, "could not push down foreign join because a local path suitable for EPQ checks was not found");
6300 if (!foreign_join_ok(root
, joinrel
, jointype
, outerrel
, innerrel
, extra
))
6302 /* Free path required for EPQ if we copied one; we don't need it now */
6309 * Compute the selectivity and cost of the local_conds, so we don't have
6310 * to do it over again for each path. The best we can do for these
6311 * conditions is to estimate selectivity on the basis of local statistics.
6312 * The local conditions are applied after the join has been computed on
6313 * the remote side like quals in WHERE clause, so pass jointype as
6316 fpinfo
->local_conds_sel
= clauselist_selectivity(root
,
6317 fpinfo
->local_conds
,
6321 cost_qual_eval(&fpinfo
->local_conds_cost
, fpinfo
->local_conds
, root
);
6324 * If we are going to estimate costs locally, estimate the join clause
6325 * selectivity here while we have special join info.
6327 if (!fpinfo
->use_remote_estimate
)
6328 fpinfo
->joinclause_sel
= clauselist_selectivity(root
, fpinfo
->joinclauses
,
6329 0, fpinfo
->jointype
,
6332 /* Estimate costs for bare join relation */
6333 estimate_path_cost_size(root
, joinrel
, NIL
, NIL
, NULL
,
6334 &rows
, &width
, &startup_cost
, &total_cost
);
6335 /* Now update this information in the joinrel */
6336 joinrel
->rows
= rows
;
6337 joinrel
->reltarget
->width
= width
;
6338 fpinfo
->rows
= rows
;
6339 fpinfo
->width
= width
;
6340 fpinfo
->startup_cost
= startup_cost
;
6341 fpinfo
->total_cost
= total_cost
;
6344 * Create a new join path and add it to the joinrel which represents a
6345 * join between foreign tables.
6347 joinpath
= create_foreign_join_path(root
,
6349 NULL
, /* default pathtarget */
6353 NIL
, /* no pathkeys */
6354 joinrel
->lateral_relids
,
6356 extra
->restrictlist
,
6357 NIL
); /* no fdw_private */
6359 /* Add generated path into joinrel by add_path(). */
6360 add_path(joinrel
, (Path
*) joinpath
);
6362 /* Consider pathkeys for the join relation */
6363 add_paths_with_pathkeys_for_rel(root
, joinrel
, epq_path
,
6364 extra
->restrictlist
);
6366 /* XXX Consider parameterized paths for the join relation */
6370 * Assess whether the aggregation, grouping and having operations can be pushed
6371 * down to the foreign server. As a side effect, save information we obtain in
6372 * this function to PgFdwRelationInfo of the input relation.
6375 foreign_grouping_ok(PlannerInfo
*root
, RelOptInfo
*grouped_rel
,
6378 Query
*query
= root
->parse
;
6379 PgFdwRelationInfo
*fpinfo
= (PgFdwRelationInfo
*) grouped_rel
->fdw_private
;
6380 PathTarget
*grouping_target
= grouped_rel
->reltarget
;
6381 PgFdwRelationInfo
*ofpinfo
;
6386 /* We currently don't support pushing Grouping Sets. */
6387 if (query
->groupingSets
)
6390 /* Get the fpinfo of the underlying scan relation. */
6391 ofpinfo
= (PgFdwRelationInfo
*) fpinfo
->outerrel
->fdw_private
;
6394 * If underlying scan relation has any local conditions, those conditions
6395 * are required to be applied before performing aggregation. Hence the
6396 * aggregate cannot be pushed down.
6398 if (ofpinfo
->local_conds
)
6402 * Examine grouping expressions, as well as other expressions we'd need to
6403 * compute, and check whether they are safe to push down to the foreign
6404 * server. All GROUP BY expressions will be part of the grouping target
6405 * and thus there is no need to search for them separately. Add grouping
6406 * expressions into target list which will be passed to foreign server.
6408 * A tricky fine point is that we must not put any expression into the
6409 * target list that is just a foreign param (that is, something that
6410 * deparse.c would conclude has to be sent to the foreign server). If we
6411 * do, the expression will also appear in the fdw_exprs list of the plan
6412 * node, and setrefs.c will get confused and decide that the fdw_exprs
6413 * entry is actually a reference to the fdw_scan_tlist entry, resulting in
6414 * a broken plan. Somewhat oddly, it's OK if the expression contains such
6415 * a node, as long as it's not at top level; then no match is possible.
6418 foreach(lc
, grouping_target
->exprs
)
6420 Expr
*expr
= (Expr
*) lfirst(lc
);
6421 Index sgref
= get_pathtarget_sortgroupref(grouping_target
, i
);
6425 * Check whether this expression is part of GROUP BY clause. Note we
6426 * check the whole GROUP BY clause not just processed_groupClause,
6427 * because we will ship all of it, cf. appendGroupByClause.
6429 if (sgref
&& get_sortgroupref_clause_noerr(sgref
, query
->groupClause
))
6434 * If any GROUP BY expression is not shippable, then we cannot
6435 * push down aggregation to the foreign server.
6437 if (!is_foreign_expr(root
, grouped_rel
, expr
))
6441 * If it would be a foreign param, we can't put it into the tlist,
6442 * so we have to fail.
6444 if (is_foreign_param(root
, grouped_rel
, expr
))
6448 * Pushable, so add to tlist. We need to create a TLE for this
6449 * expression and apply the sortgroupref to it. We cannot use
6450 * add_to_flat_tlist() here because that avoids making duplicate
6451 * entries in the tlist. If there are duplicate entries with
6452 * distinct sortgrouprefs, we have to duplicate that situation in
6455 tle
= makeTargetEntry(expr
, list_length(tlist
) + 1, NULL
, false);
6456 tle
->ressortgroupref
= sgref
;
6457 tlist
= lappend(tlist
, tle
);
6462 * Non-grouping expression we need to compute. Can we ship it
6463 * as-is to the foreign server?
6465 if (is_foreign_expr(root
, grouped_rel
, expr
) &&
6466 !is_foreign_param(root
, grouped_rel
, expr
))
6468 /* Yes, so add to tlist as-is; OK to suppress duplicates */
6469 tlist
= add_to_flat_tlist(tlist
, list_make1(expr
));
6473 /* Not pushable as a whole; extract its Vars and aggregates */
6476 aggvars
= pull_var_clause((Node
*) expr
,
6477 PVC_INCLUDE_AGGREGATES
);
6480 * If any aggregate expression is not shippable, then we
6481 * cannot push down aggregation to the foreign server. (We
6482 * don't have to check is_foreign_param, since that certainly
6483 * won't return true for any such expression.)
6485 if (!is_foreign_expr(root
, grouped_rel
, (Expr
*) aggvars
))
6489 * Add aggregates, if any, into the targetlist. Plain Vars
6490 * outside an aggregate can be ignored, because they should be
6491 * either same as some GROUP BY column or part of some GROUP
6492 * BY expression. In either case, they are already part of
6493 * the targetlist and thus no need to add them again. In fact
6494 * including plain Vars in the tlist when they do not match a
6495 * GROUP BY column would cause the foreign server to complain
6496 * that the shipped query is invalid.
6500 Expr
*aggref
= (Expr
*) lfirst(l
);
6502 if (IsA(aggref
, Aggref
))
6503 tlist
= add_to_flat_tlist(tlist
, list_make1(aggref
));
6512 * Classify the pushable and non-pushable HAVING clauses and save them in
6513 * remote_conds and local_conds of the grouped rel's fpinfo.
6517 foreach(lc
, (List
*) havingQual
)
6519 Expr
*expr
= (Expr
*) lfirst(lc
);
6520 RestrictInfo
*rinfo
;
6523 * Currently, the core code doesn't wrap havingQuals in
6524 * RestrictInfos, so we must make our own.
6526 Assert(!IsA(expr
, RestrictInfo
));
6527 rinfo
= make_restrictinfo(root
,
6533 root
->qual_security_level
,
6534 grouped_rel
->relids
,
6537 if (is_foreign_expr(root
, grouped_rel
, expr
))
6538 fpinfo
->remote_conds
= lappend(fpinfo
->remote_conds
, rinfo
);
6540 fpinfo
->local_conds
= lappend(fpinfo
->local_conds
, rinfo
);
6545 * If there are any local conditions, pull Vars and aggregates from it and
6546 * check whether they are safe to pushdown or not.
6548 if (fpinfo
->local_conds
)
6550 List
*aggvars
= NIL
;
6552 foreach(lc
, fpinfo
->local_conds
)
6554 RestrictInfo
*rinfo
= lfirst_node(RestrictInfo
, lc
);
6556 aggvars
= list_concat(aggvars
,
6557 pull_var_clause((Node
*) rinfo
->clause
,
6558 PVC_INCLUDE_AGGREGATES
));
6561 foreach(lc
, aggvars
)
6563 Expr
*expr
= (Expr
*) lfirst(lc
);
6566 * If aggregates within local conditions are not safe to push
6567 * down, then we cannot push down the query. Vars are already
6568 * part of GROUP BY clause which are checked above, so no need to
6569 * access them again here. Again, we need not check
6570 * is_foreign_param for a foreign aggregate.
6572 if (IsA(expr
, Aggref
))
6574 if (!is_foreign_expr(root
, grouped_rel
, expr
))
6577 tlist
= add_to_flat_tlist(tlist
, list_make1(expr
));
6582 /* Store generated targetlist */
6583 fpinfo
->grouped_tlist
= tlist
;
6585 /* Safe to pushdown */
6586 fpinfo
->pushdown_safe
= true;
6589 * Set # of retrieved rows and cached relation costs to some negative
6590 * value, so that we can detect when they are set to some sensible values,
6591 * during one (usually the first) of the calls to estimate_path_cost_size.
6593 fpinfo
->retrieved_rows
= -1;
6594 fpinfo
->rel_startup_cost
= -1;
6595 fpinfo
->rel_total_cost
= -1;
6598 * Set the string describing this grouped relation to be used in EXPLAIN
6599 * output of corresponding ForeignScan. Note that the decoration we add
6600 * to the base relation name mustn't include any digits, or it'll confuse
6601 * postgresExplainForeignScan.
6603 fpinfo
->relation_name
= psprintf("Aggregate on (%s)",
6604 ofpinfo
->relation_name
);
6610 * postgresGetForeignUpperPaths
6611 * Add paths for post-join operations like aggregation, grouping etc. if
6612 * corresponding operations are safe to push down.
6615 postgresGetForeignUpperPaths(PlannerInfo
*root
, UpperRelationKind stage
,
6616 RelOptInfo
*input_rel
, RelOptInfo
*output_rel
,
6619 PgFdwRelationInfo
*fpinfo
;
6622 * If input rel is not safe to pushdown, then simply return as we cannot
6623 * perform any post-join operations on the foreign server.
6625 if (!input_rel
->fdw_private
||
6626 !((PgFdwRelationInfo
*) input_rel
->fdw_private
)->pushdown_safe
)
6629 /* Ignore stages we don't support; and skip any duplicate calls. */
6630 if ((stage
!= UPPERREL_GROUP_AGG
&&
6631 stage
!= UPPERREL_ORDERED
&&
6632 stage
!= UPPERREL_FINAL
) ||
6633 output_rel
->fdw_private
)
6636 fpinfo
= (PgFdwRelationInfo
*) palloc0(sizeof(PgFdwRelationInfo
));
6637 fpinfo
->pushdown_safe
= false;
6638 fpinfo
->stage
= stage
;
6639 output_rel
->fdw_private
= fpinfo
;
6643 case UPPERREL_GROUP_AGG
:
6644 add_foreign_grouping_paths(root
, input_rel
, output_rel
,
6645 (GroupPathExtraData
*) extra
);
6647 case UPPERREL_ORDERED
:
6648 add_foreign_ordered_paths(root
, input_rel
, output_rel
);
6650 case UPPERREL_FINAL
:
6651 add_foreign_final_paths(root
, input_rel
, output_rel
,
6652 (FinalPathExtraData
*) extra
);
6655 elog(ERROR
, "unexpected upper relation: %d", (int) stage
);
6661 * add_foreign_grouping_paths
6662 * Add foreign path for grouping and/or aggregation.
6664 * Given input_rel represents the underlying scan. The paths are added to the
6665 * given grouped_rel.
6668 add_foreign_grouping_paths(PlannerInfo
*root
, RelOptInfo
*input_rel
,
6669 RelOptInfo
*grouped_rel
,
6670 GroupPathExtraData
*extra
)
6672 Query
*parse
= root
->parse
;
6673 PgFdwRelationInfo
*ifpinfo
= input_rel
->fdw_private
;
6674 PgFdwRelationInfo
*fpinfo
= grouped_rel
->fdw_private
;
6675 ForeignPath
*grouppath
;
6681 /* Nothing to be done, if there is no grouping or aggregation required. */
6682 if (!parse
->groupClause
&& !parse
->groupingSets
&& !parse
->hasAggs
&&
6683 !root
->hasHavingQual
)
6686 Assert(extra
->patype
== PARTITIONWISE_AGGREGATE_NONE
||
6687 extra
->patype
== PARTITIONWISE_AGGREGATE_FULL
);
6689 /* save the input_rel as outerrel in fpinfo */
6690 fpinfo
->outerrel
= input_rel
;
6693 * Copy foreign table, foreign server, user mapping, FDW options etc.
6694 * details from the input relation's fpinfo.
6696 fpinfo
->table
= ifpinfo
->table
;
6697 fpinfo
->server
= ifpinfo
->server
;
6698 fpinfo
->user
= ifpinfo
->user
;
6699 merge_fdw_options(fpinfo
, ifpinfo
, NULL
);
6702 * Assess if it is safe to push down aggregation and grouping.
6704 * Use HAVING qual from extra. In case of child partition, it will have
6707 if (!foreign_grouping_ok(root
, grouped_rel
, extra
->havingQual
))
6711 * Compute the selectivity and cost of the local_conds, so we don't have
6712 * to do it over again for each path. (Currently we create just a single
6713 * path here, but in future it would be possible that we build more paths
6714 * such as pre-sorted paths as in postgresGetForeignPaths and
6715 * postgresGetForeignJoinPaths.) The best we can do for these conditions
6716 * is to estimate selectivity on the basis of local statistics.
6718 fpinfo
->local_conds_sel
= clauselist_selectivity(root
,
6719 fpinfo
->local_conds
,
6724 cost_qual_eval(&fpinfo
->local_conds_cost
, fpinfo
->local_conds
, root
);
6726 /* Estimate the cost of push down */
6727 estimate_path_cost_size(root
, grouped_rel
, NIL
, NIL
, NULL
,
6728 &rows
, &width
, &startup_cost
, &total_cost
);
6730 /* Now update this information in the fpinfo */
6731 fpinfo
->rows
= rows
;
6732 fpinfo
->width
= width
;
6733 fpinfo
->startup_cost
= startup_cost
;
6734 fpinfo
->total_cost
= total_cost
;
6736 /* Create and add foreign path to the grouping relation. */
6737 grouppath
= create_foreign_upper_path(root
,
6739 grouped_rel
->reltarget
,
6743 NIL
, /* no pathkeys */
6745 NIL
, /* no fdw_restrictinfo list */
6746 NIL
); /* no fdw_private */
6748 /* Add generated path into grouped_rel by add_path(). */
6749 add_path(grouped_rel
, (Path
*) grouppath
);
6753 * add_foreign_ordered_paths
6754 * Add foreign paths for performing the final sort remotely.
6756 * Given input_rel contains the source-data Paths. The paths are added to the
6757 * given ordered_rel.
6760 add_foreign_ordered_paths(PlannerInfo
*root
, RelOptInfo
*input_rel
,
6761 RelOptInfo
*ordered_rel
)
6763 Query
*parse
= root
->parse
;
6764 PgFdwRelationInfo
*ifpinfo
= input_rel
->fdw_private
;
6765 PgFdwRelationInfo
*fpinfo
= ordered_rel
->fdw_private
;
6766 PgFdwPathExtraData
*fpextra
;
6772 ForeignPath
*ordered_path
;
6775 /* Shouldn't get here unless the query has ORDER BY */
6776 Assert(parse
->sortClause
);
6778 /* We don't support cases where there are any SRFs in the targetlist */
6779 if (parse
->hasTargetSRFs
)
6782 /* Save the input_rel as outerrel in fpinfo */
6783 fpinfo
->outerrel
= input_rel
;
6786 * Copy foreign table, foreign server, user mapping, FDW options etc.
6787 * details from the input relation's fpinfo.
6789 fpinfo
->table
= ifpinfo
->table
;
6790 fpinfo
->server
= ifpinfo
->server
;
6791 fpinfo
->user
= ifpinfo
->user
;
6792 merge_fdw_options(fpinfo
, ifpinfo
, NULL
);
6795 * If the input_rel is a base or join relation, we would already have
6796 * considered pushing down the final sort to the remote server when
6797 * creating pre-sorted foreign paths for that relation, because the
6798 * query_pathkeys is set to the root->sort_pathkeys in that case (see
6799 * standard_qp_callback()).
6801 if (input_rel
->reloptkind
== RELOPT_BASEREL
||
6802 input_rel
->reloptkind
== RELOPT_JOINREL
)
6804 Assert(root
->query_pathkeys
== root
->sort_pathkeys
);
6806 /* Safe to push down if the query_pathkeys is safe to push down */
6807 fpinfo
->pushdown_safe
= ifpinfo
->qp_is_pushdown_safe
;
6812 /* The input_rel should be a grouping relation */
6813 Assert(input_rel
->reloptkind
== RELOPT_UPPER_REL
&&
6814 ifpinfo
->stage
== UPPERREL_GROUP_AGG
);
6817 * We try to create a path below by extending a simple foreign path for
6818 * the underlying grouping relation to perform the final sort remotely,
6819 * which is stored into the fdw_private list of the resulting path.
6822 /* Assess if it is safe to push down the final sort */
6823 foreach(lc
, root
->sort_pathkeys
)
6825 PathKey
*pathkey
= (PathKey
*) lfirst(lc
);
6826 EquivalenceClass
*pathkey_ec
= pathkey
->pk_eclass
;
6829 * is_foreign_expr would detect volatile expressions as well, but
6830 * checking ec_has_volatile here saves some cycles.
6832 if (pathkey_ec
->ec_has_volatile
)
6836 * Can't push down the sort if pathkey's opfamily is not shippable.
6838 if (!is_shippable(pathkey
->pk_opfamily
, OperatorFamilyRelationId
,
6843 * The EC must contain a shippable EM that is computed in input_rel's
6844 * reltarget, else we can't push down the sort.
6846 if (find_em_for_rel_target(root
,
6852 /* Safe to push down */
6853 fpinfo
->pushdown_safe
= true;
6855 /* Construct PgFdwPathExtraData */
6856 fpextra
= (PgFdwPathExtraData
*) palloc0(sizeof(PgFdwPathExtraData
));
6857 fpextra
->target
= root
->upper_targets
[UPPERREL_ORDERED
];
6858 fpextra
->has_final_sort
= true;
6860 /* Estimate the costs of performing the final sort remotely */
6861 estimate_path_cost_size(root
, input_rel
, NIL
, root
->sort_pathkeys
, fpextra
,
6862 &rows
, &width
, &startup_cost
, &total_cost
);
6865 * Build the fdw_private list that will be used by postgresGetForeignPlan.
6866 * Items in the list must match order in enum FdwPathPrivateIndex.
6868 fdw_private
= list_make2(makeBoolean(true), makeBoolean(false));
6870 /* Create foreign ordering path */
6871 ordered_path
= create_foreign_upper_path(root
,
6873 root
->upper_targets
[UPPERREL_ORDERED
],
6877 root
->sort_pathkeys
,
6878 NULL
, /* no extra plan */
6879 NIL
, /* no fdw_restrictinfo
6883 /* and add it to the ordered_rel */
6884 add_path(ordered_rel
, (Path
*) ordered_path
);
6888 * add_foreign_final_paths
6889 * Add foreign paths for performing the final processing remotely.
6891 * Given input_rel contains the source-data Paths. The paths are added to the
6895 add_foreign_final_paths(PlannerInfo
*root
, RelOptInfo
*input_rel
,
6896 RelOptInfo
*final_rel
,
6897 FinalPathExtraData
*extra
)
6899 Query
*parse
= root
->parse
;
6900 PgFdwRelationInfo
*ifpinfo
= (PgFdwRelationInfo
*) input_rel
->fdw_private
;
6901 PgFdwRelationInfo
*fpinfo
= (PgFdwRelationInfo
*) final_rel
->fdw_private
;
6902 bool has_final_sort
= false;
6903 List
*pathkeys
= NIL
;
6904 PgFdwPathExtraData
*fpextra
;
6905 bool save_use_remote_estimate
= false;
6911 ForeignPath
*final_path
;
6914 * Currently, we only support this for SELECT commands
6916 if (parse
->commandType
!= CMD_SELECT
)
6920 * No work if there is no FOR UPDATE/SHARE clause and if there is no need
6921 * to add a LIMIT node
6923 if (!parse
->rowMarks
&& !extra
->limit_needed
)
6926 /* We don't support cases where there are any SRFs in the targetlist */
6927 if (parse
->hasTargetSRFs
)
6930 /* Save the input_rel as outerrel in fpinfo */
6931 fpinfo
->outerrel
= input_rel
;
6934 * Copy foreign table, foreign server, user mapping, FDW options etc.
6935 * details from the input relation's fpinfo.
6937 fpinfo
->table
= ifpinfo
->table
;
6938 fpinfo
->server
= ifpinfo
->server
;
6939 fpinfo
->user
= ifpinfo
->user
;
6940 merge_fdw_options(fpinfo
, ifpinfo
, NULL
);
6943 * If there is no need to add a LIMIT node, there might be a ForeignPath
6944 * in the input_rel's pathlist that implements all behavior of the query.
6945 * Note: we would already have accounted for the query's FOR UPDATE/SHARE
6946 * (if any) before we get here.
6948 if (!extra
->limit_needed
)
6952 Assert(parse
->rowMarks
);
6955 * Grouping and aggregation are not supported with FOR UPDATE/SHARE,
6956 * so the input_rel should be a base, join, or ordered relation; and
6957 * if it's an ordered relation, its input relation should be a base or
6960 Assert(input_rel
->reloptkind
== RELOPT_BASEREL
||
6961 input_rel
->reloptkind
== RELOPT_JOINREL
||
6962 (input_rel
->reloptkind
== RELOPT_UPPER_REL
&&
6963 ifpinfo
->stage
== UPPERREL_ORDERED
&&
6964 (ifpinfo
->outerrel
->reloptkind
== RELOPT_BASEREL
||
6965 ifpinfo
->outerrel
->reloptkind
== RELOPT_JOINREL
)));
6967 foreach(lc
, input_rel
->pathlist
)
6969 Path
*path
= (Path
*) lfirst(lc
);
6972 * apply_scanjoin_target_to_paths() uses create_projection_path()
6973 * to adjust each of its input paths if needed, whereas
6974 * create_ordered_paths() uses apply_projection_to_path() to do
6975 * that. So the former might have put a ProjectionPath on top of
6976 * the ForeignPath; look through ProjectionPath and see if the
6977 * path underneath it is ForeignPath.
6979 if (IsA(path
, ForeignPath
) ||
6980 (IsA(path
, ProjectionPath
) &&
6981 IsA(((ProjectionPath
*) path
)->subpath
, ForeignPath
)))
6984 * Create foreign final path; this gets rid of a
6985 * no-longer-needed outer plan (if any), which makes the
6986 * EXPLAIN output look cleaner
6988 final_path
= create_foreign_upper_path(root
,
6995 NULL
, /* no extra plan */
6996 NIL
, /* no fdw_restrictinfo
6998 NIL
); /* no fdw_private */
7000 /* and add it to the final_rel */
7001 add_path(final_rel
, (Path
*) final_path
);
7003 /* Safe to push down */
7004 fpinfo
->pushdown_safe
= true;
7011 * If we get here it means no ForeignPaths; since we would already
7012 * have considered pushing down all operations for the query to the
7013 * remote server, give up on it.
7018 Assert(extra
->limit_needed
);
7021 * If the input_rel is an ordered relation, replace the input_rel with its
7024 if (input_rel
->reloptkind
== RELOPT_UPPER_REL
&&
7025 ifpinfo
->stage
== UPPERREL_ORDERED
)
7027 input_rel
= ifpinfo
->outerrel
;
7028 ifpinfo
= (PgFdwRelationInfo
*) input_rel
->fdw_private
;
7029 has_final_sort
= true;
7030 pathkeys
= root
->sort_pathkeys
;
7033 /* The input_rel should be a base, join, or grouping relation */
7034 Assert(input_rel
->reloptkind
== RELOPT_BASEREL
||
7035 input_rel
->reloptkind
== RELOPT_JOINREL
||
7036 (input_rel
->reloptkind
== RELOPT_UPPER_REL
&&
7037 ifpinfo
->stage
== UPPERREL_GROUP_AGG
));
7040 * We try to create a path below by extending a simple foreign path for
7041 * the underlying base, join, or grouping relation to perform the final
7042 * sort (if has_final_sort) and the LIMIT restriction remotely, which is
7043 * stored into the fdw_private list of the resulting path. (We
7044 * re-estimate the costs of sorting the underlying relation, if
7049 * Assess if it is safe to push down the LIMIT and OFFSET to the remote
7054 * If the underlying relation has any local conditions, the LIMIT/OFFSET
7055 * cannot be pushed down.
7057 if (ifpinfo
->local_conds
)
7061 * Also, the LIMIT/OFFSET cannot be pushed down, if their expressions are
7062 * not safe to remote.
7064 if (!is_foreign_expr(root
, input_rel
, (Expr
*) parse
->limitOffset
) ||
7065 !is_foreign_expr(root
, input_rel
, (Expr
*) parse
->limitCount
))
7068 /* Safe to push down */
7069 fpinfo
->pushdown_safe
= true;
7071 /* Construct PgFdwPathExtraData */
7072 fpextra
= (PgFdwPathExtraData
*) palloc0(sizeof(PgFdwPathExtraData
));
7073 fpextra
->target
= root
->upper_targets
[UPPERREL_FINAL
];
7074 fpextra
->has_final_sort
= has_final_sort
;
7075 fpextra
->has_limit
= extra
->limit_needed
;
7076 fpextra
->limit_tuples
= extra
->limit_tuples
;
7077 fpextra
->count_est
= extra
->count_est
;
7078 fpextra
->offset_est
= extra
->offset_est
;
7081 * Estimate the costs of performing the final sort and the LIMIT
7082 * restriction remotely. If has_final_sort is false, we wouldn't need to
7083 * execute EXPLAIN anymore if use_remote_estimate, since the costs can be
7084 * roughly estimated using the costs we already have for the underlying
7085 * relation, in the same way as when use_remote_estimate is false. Since
7086 * it's pretty expensive to execute EXPLAIN, force use_remote_estimate to
7087 * false in that case.
7089 if (!fpextra
->has_final_sort
)
7091 save_use_remote_estimate
= ifpinfo
->use_remote_estimate
;
7092 ifpinfo
->use_remote_estimate
= false;
7094 estimate_path_cost_size(root
, input_rel
, NIL
, pathkeys
, fpextra
,
7095 &rows
, &width
, &startup_cost
, &total_cost
);
7096 if (!fpextra
->has_final_sort
)
7097 ifpinfo
->use_remote_estimate
= save_use_remote_estimate
;
7100 * Build the fdw_private list that will be used by postgresGetForeignPlan.
7101 * Items in the list must match order in enum FdwPathPrivateIndex.
7103 fdw_private
= list_make2(makeBoolean(has_final_sort
),
7104 makeBoolean(extra
->limit_needed
));
7107 * Create foreign final path; this gets rid of a no-longer-needed outer
7108 * plan (if any), which makes the EXPLAIN output look cleaner
7110 final_path
= create_foreign_upper_path(root
,
7112 root
->upper_targets
[UPPERREL_FINAL
],
7117 NULL
, /* no extra plan */
7118 NIL
, /* no fdw_restrictinfo list */
7121 /* and add it to the final_rel */
7122 add_path(final_rel
, (Path
*) final_path
);
7126 * postgresIsForeignPathAsyncCapable
7127 * Check whether a given ForeignPath node is async-capable.
7130 postgresIsForeignPathAsyncCapable(ForeignPath
*path
)
7132 RelOptInfo
*rel
= ((Path
*) path
)->parent
;
7133 PgFdwRelationInfo
*fpinfo
= (PgFdwRelationInfo
*) rel
->fdw_private
;
7135 return fpinfo
->async_capable
;
7139 * postgresForeignAsyncRequest
7140 * Asynchronously request next tuple from a foreign PostgreSQL table.
7143 postgresForeignAsyncRequest(AsyncRequest
*areq
)
7145 produce_tuple_asynchronously(areq
, true);
7149 * postgresForeignAsyncConfigureWait
7150 * Configure a file descriptor event for which we wish to wait.
7153 postgresForeignAsyncConfigureWait(AsyncRequest
*areq
)
7155 ForeignScanState
*node
= (ForeignScanState
*) areq
->requestee
;
7156 PgFdwScanState
*fsstate
= (PgFdwScanState
*) node
->fdw_state
;
7157 AsyncRequest
*pendingAreq
= fsstate
->conn_state
->pendingAreq
;
7158 AppendState
*requestor
= (AppendState
*) areq
->requestor
;
7159 WaitEventSet
*set
= requestor
->as_eventset
;
7161 /* This should not be called unless callback_pending */
7162 Assert(areq
->callback_pending
);
7165 * If process_pending_request() has been invoked on the given request
7166 * before we get here, we might have some tuples already; in which case
7167 * complete the request
7169 if (fsstate
->next_tuple
< fsstate
->num_tuples
)
7171 complete_pending_request(areq
);
7172 if (areq
->request_complete
)
7174 Assert(areq
->callback_pending
);
7177 /* We must have run out of tuples */
7178 Assert(fsstate
->next_tuple
>= fsstate
->num_tuples
);
7180 /* The core code would have registered postmaster death event */
7181 Assert(GetNumRegisteredWaitEvents(set
) >= 1);
7183 /* Begin an asynchronous data fetch if not already done */
7185 fetch_more_data_begin(areq
);
7186 else if (pendingAreq
->requestor
!= areq
->requestor
)
7189 * This is the case when the in-process request was made by another
7190 * Append. Note that it might be useless to process the request,
7191 * because the query might not need tuples from that Append anymore.
7192 * If there are any child subplans of the same parent that are ready
7193 * for new requests, skip the given request. Likewise, if there are
7194 * any configured events other than the postmaster death event, skip
7195 * it. Otherwise, process the in-process request, then begin a fetch
7196 * to configure the event below, because we might otherwise end up
7197 * with no configured events other than the postmaster death event.
7199 if (!bms_is_empty(requestor
->as_needrequest
))
7201 if (GetNumRegisteredWaitEvents(set
) > 1)
7203 process_pending_request(pendingAreq
);
7204 fetch_more_data_begin(areq
);
7206 else if (pendingAreq
->requestee
!= areq
->requestee
)
7209 * This is the case when the in-process request was made by the same
7210 * parent but for a different child. Since we configure only the
7211 * event for the request made for that child, skip the given request.
7216 Assert(pendingAreq
== areq
);
7218 AddWaitEventToSet(set
, WL_SOCKET_READABLE
, PQsocket(fsstate
->conn
),
7223 * postgresForeignAsyncNotify
7224 * Fetch some more tuples from a file descriptor that becomes ready,
7225 * requesting next tuple.
7228 postgresForeignAsyncNotify(AsyncRequest
*areq
)
7230 ForeignScanState
*node
= (ForeignScanState
*) areq
->requestee
;
7231 PgFdwScanState
*fsstate
= (PgFdwScanState
*) node
->fdw_state
;
7233 /* The core code would have initialized the callback_pending flag */
7234 Assert(!areq
->callback_pending
);
7237 * If process_pending_request() has been invoked on the given request
7238 * before we get here, we might have some tuples already; in which case
7239 * produce the next tuple
7241 if (fsstate
->next_tuple
< fsstate
->num_tuples
)
7243 produce_tuple_asynchronously(areq
, true);
7247 /* We must have run out of tuples */
7248 Assert(fsstate
->next_tuple
>= fsstate
->num_tuples
);
7250 /* The request should be currently in-process */
7251 Assert(fsstate
->conn_state
->pendingAreq
== areq
);
7253 /* On error, report the original query, not the FETCH. */
7254 if (!PQconsumeInput(fsstate
->conn
))
7255 pgfdw_report_error(ERROR
, NULL
, fsstate
->conn
, false, fsstate
->query
);
7257 fetch_more_data(node
);
7259 produce_tuple_asynchronously(areq
, true);
7263 * Asynchronously produce next tuple from a foreign PostgreSQL table.
7266 produce_tuple_asynchronously(AsyncRequest
*areq
, bool fetch
)
7268 ForeignScanState
*node
= (ForeignScanState
*) areq
->requestee
;
7269 PgFdwScanState
*fsstate
= (PgFdwScanState
*) node
->fdw_state
;
7270 AsyncRequest
*pendingAreq
= fsstate
->conn_state
->pendingAreq
;
7271 TupleTableSlot
*result
;
7273 /* This should not be called if the request is currently in-process */
7274 Assert(areq
!= pendingAreq
);
7276 /* Fetch some more tuples, if we've run out */
7277 if (fsstate
->next_tuple
>= fsstate
->num_tuples
)
7279 /* No point in another fetch if we already detected EOF, though */
7280 if (!fsstate
->eof_reached
)
7282 /* Mark the request as pending for a callback */
7283 ExecAsyncRequestPending(areq
);
7284 /* Begin another fetch if requested and if no pending request */
7285 if (fetch
&& !pendingAreq
)
7286 fetch_more_data_begin(areq
);
7290 /* There's nothing more to do; just return a NULL pointer */
7292 /* Mark the request as complete */
7293 ExecAsyncRequestDone(areq
, result
);
7298 /* Get a tuple from the ForeignScan node */
7299 result
= areq
->requestee
->ExecProcNodeReal(areq
->requestee
);
7300 if (!TupIsNull(result
))
7302 /* Mark the request as complete */
7303 ExecAsyncRequestDone(areq
, result
);
7307 /* We must have run out of tuples */
7308 Assert(fsstate
->next_tuple
>= fsstate
->num_tuples
);
7310 /* Fetch some more tuples, if we've not detected EOF yet */
7311 if (!fsstate
->eof_reached
)
7313 /* Mark the request as pending for a callback */
7314 ExecAsyncRequestPending(areq
);
7315 /* Begin another fetch if requested and if no pending request */
7316 if (fetch
&& !pendingAreq
)
7317 fetch_more_data_begin(areq
);
7321 /* There's nothing more to do; just return a NULL pointer */
7323 /* Mark the request as complete */
7324 ExecAsyncRequestDone(areq
, result
);
7329 * Begin an asynchronous data fetch.
7331 * Note: this function assumes there is no currently-in-progress asynchronous
7334 * Note: fetch_more_data must be called to fetch the result.
7337 fetch_more_data_begin(AsyncRequest
*areq
)
7339 ForeignScanState
*node
= (ForeignScanState
*) areq
->requestee
;
7340 PgFdwScanState
*fsstate
= (PgFdwScanState
*) node
->fdw_state
;
7343 Assert(!fsstate
->conn_state
->pendingAreq
);
7345 /* Create the cursor synchronously. */
7346 if (!fsstate
->cursor_exists
)
7347 create_cursor(node
);
7349 /* We will send this query, but not wait for the response. */
7350 snprintf(sql
, sizeof(sql
), "FETCH %d FROM c%u",
7351 fsstate
->fetch_size
, fsstate
->cursor_number
);
7353 if (!PQsendQuery(fsstate
->conn
, sql
))
7354 pgfdw_report_error(ERROR
, NULL
, fsstate
->conn
, false, fsstate
->query
);
7356 /* Remember that the request is in process */
7357 fsstate
->conn_state
->pendingAreq
= areq
;
7361 * Process a pending asynchronous request.
7364 process_pending_request(AsyncRequest
*areq
)
7366 ForeignScanState
*node
= (ForeignScanState
*) areq
->requestee
;
7367 PgFdwScanState
*fsstate
= (PgFdwScanState
*) node
->fdw_state
;
7369 /* The request would have been pending for a callback */
7370 Assert(areq
->callback_pending
);
7372 /* The request should be currently in-process */
7373 Assert(fsstate
->conn_state
->pendingAreq
== areq
);
7375 fetch_more_data(node
);
7378 * If we didn't get any tuples, must be end of data; complete the request
7379 * now. Otherwise, we postpone completing the request until we are called
7380 * from postgresForeignAsyncConfigureWait()/postgresForeignAsyncNotify().
7382 if (fsstate
->next_tuple
>= fsstate
->num_tuples
)
7384 /* Unlike AsyncNotify, we unset callback_pending ourselves */
7385 areq
->callback_pending
= false;
7386 /* Mark the request as complete */
7387 ExecAsyncRequestDone(areq
, NULL
);
7388 /* Unlike AsyncNotify, we call ExecAsyncResponse ourselves */
7389 ExecAsyncResponse(areq
);
7394 * Complete a pending asynchronous request.
7397 complete_pending_request(AsyncRequest
*areq
)
7399 /* The request would have been pending for a callback */
7400 Assert(areq
->callback_pending
);
7402 /* Unlike AsyncNotify, we unset callback_pending ourselves */
7403 areq
->callback_pending
= false;
7405 /* We begin a fetch afterwards if necessary; don't fetch */
7406 produce_tuple_asynchronously(areq
, false);
7408 /* Unlike AsyncNotify, we call ExecAsyncResponse ourselves */
7409 ExecAsyncResponse(areq
);
7411 /* Also, we do instrumentation ourselves, if required */
7412 if (areq
->requestee
->instrument
)
7413 InstrUpdateTupleCount(areq
->requestee
->instrument
,
7414 TupIsNull(areq
->result
) ? 0.0 : 1.0);
7418 * Create a tuple from the specified row of the PGresult.
7420 * rel is the local representation of the foreign table, attinmeta is
7421 * conversion data for the rel's tupdesc, and retrieved_attrs is an
7422 * integer list of the table column numbers present in the PGresult.
7423 * fsstate is the ForeignScan plan node's execution state.
7424 * temp_context is a working context that can be reset after each tuple.
7426 * Note: either rel or fsstate, but not both, can be NULL. rel is NULL
7427 * if we're processing a remote join, while fsstate is NULL in a non-query
7428 * context such as ANALYZE, or if we're processing a non-scan query node.
7431 make_tuple_from_result_row(PGresult
*res
,
7434 AttInMetadata
*attinmeta
,
7435 List
*retrieved_attrs
,
7436 ForeignScanState
*fsstate
,
7437 MemoryContext temp_context
)
7443 ItemPointer ctid
= NULL
;
7444 ConversionLocation errpos
;
7445 ErrorContextCallback errcallback
;
7446 MemoryContext oldcontext
;
7450 Assert(row
< PQntuples(res
));
7453 * Do the following work in a temp context that we reset after each tuple.
7454 * This cleans up not only the data we have direct access to, but any
7455 * cruft the I/O functions might leak.
7457 oldcontext
= MemoryContextSwitchTo(temp_context
);
7460 * Get the tuple descriptor for the row. Use the rel's tupdesc if rel is
7461 * provided, otherwise look to the scan node's ScanTupleSlot.
7464 tupdesc
= RelationGetDescr(rel
);
7468 tupdesc
= fsstate
->ss
.ss_ScanTupleSlot
->tts_tupleDescriptor
;
7471 values
= (Datum
*) palloc0(tupdesc
->natts
* sizeof(Datum
));
7472 nulls
= (bool *) palloc(tupdesc
->natts
* sizeof(bool));
7473 /* Initialize to nulls for any columns not present in result */
7474 memset(nulls
, true, tupdesc
->natts
* sizeof(bool));
7477 * Set up and install callback to report where conversion error occurs.
7479 errpos
.cur_attno
= 0;
7481 errpos
.fsstate
= fsstate
;
7482 errcallback
.callback
= conversion_error_callback
;
7483 errcallback
.arg
= (void *) &errpos
;
7484 errcallback
.previous
= error_context_stack
;
7485 error_context_stack
= &errcallback
;
7488 * i indexes columns in the relation, j indexes columns in the PGresult.
7491 foreach(lc
, retrieved_attrs
)
7493 int i
= lfirst_int(lc
);
7496 /* fetch next column's textual value */
7497 if (PQgetisnull(res
, row
, j
))
7500 valstr
= PQgetvalue(res
, row
, j
);
7503 * convert value to internal representation
7505 * Note: we ignore system columns other than ctid and oid in result
7507 errpos
.cur_attno
= i
;
7510 /* ordinary column */
7511 Assert(i
<= tupdesc
->natts
);
7512 nulls
[i
- 1] = (valstr
== NULL
);
7513 /* Apply the input function even to nulls, to support domains */
7514 values
[i
- 1] = InputFunctionCall(&attinmeta
->attinfuncs
[i
- 1],
7516 attinmeta
->attioparams
[i
- 1],
7517 attinmeta
->atttypmods
[i
- 1]);
7519 else if (i
== SelfItemPointerAttributeNumber
)
7526 datum
= DirectFunctionCall1(tidin
, CStringGetDatum(valstr
));
7527 ctid
= (ItemPointer
) DatumGetPointer(datum
);
7530 errpos
.cur_attno
= 0;
7535 /* Uninstall error context callback. */
7536 error_context_stack
= errcallback
.previous
;
7539 * Check we got the expected number of columns. Note: j == 0 and
7540 * PQnfields == 1 is expected, since deparse emits a NULL if no columns.
7542 if (j
> 0 && j
!= PQnfields(res
))
7543 elog(ERROR
, "remote query result does not match the foreign table");
7546 * Build the result tuple in caller's memory context.
7548 MemoryContextSwitchTo(oldcontext
);
7550 tuple
= heap_form_tuple(tupdesc
, values
, nulls
);
7553 * If we have a CTID to return, install it in both t_self and t_ctid.
7554 * t_self is the normal place, but if the tuple is converted to a
7555 * composite Datum, t_self will be lost; setting t_ctid allows CTID to be
7556 * preserved during EvalPlanQual re-evaluations (see ROW_MARK_COPY code).
7559 tuple
->t_self
= tuple
->t_data
->t_ctid
= *ctid
;
7562 * Stomp on the xmin, xmax, and cmin fields from the tuple created by
7563 * heap_form_tuple. heap_form_tuple actually creates the tuple with
7564 * DatumTupleFields, not HeapTupleFields, but the executor expects
7565 * HeapTupleFields and will happily extract system columns on that
7566 * assumption. If we don't do this then, for example, the tuple length
7567 * ends up in the xmin field, which isn't what we want.
7569 HeapTupleHeaderSetXmax(tuple
->t_data
, InvalidTransactionId
);
7570 HeapTupleHeaderSetXmin(tuple
->t_data
, InvalidTransactionId
);
7571 HeapTupleHeaderSetCmin(tuple
->t_data
, InvalidTransactionId
);
7574 MemoryContextReset(temp_context
);
7580 * Callback function which is called when error occurs during column value
7581 * conversion. Print names of column and relation.
7583 * Note that this function mustn't do any catalog lookups, since we are in
7584 * an already-failed transaction. Fortunately, we can get the needed info
7585 * from the relation or the query's rangetable instead.
7588 conversion_error_callback(void *arg
)
7590 ConversionLocation
*errpos
= (ConversionLocation
*) arg
;
7591 Relation rel
= errpos
->rel
;
7592 ForeignScanState
*fsstate
= errpos
->fsstate
;
7593 const char *attname
= NULL
;
7594 const char *relname
= NULL
;
7595 bool is_wholerow
= false;
7598 * If we're in a scan node, always use aliases from the rangetable, for
7599 * consistency between the simple-relation and remote-join cases. Look at
7600 * the relation's tupdesc only if we're not in a scan node.
7604 /* ForeignScan case */
7605 ForeignScan
*fsplan
= castNode(ForeignScan
, fsstate
->ss
.ps
.plan
);
7607 AttrNumber colno
= 0;
7609 if (fsplan
->scan
.scanrelid
> 0)
7611 /* error occurred in a scan against a foreign table */
7612 varno
= fsplan
->scan
.scanrelid
;
7613 colno
= errpos
->cur_attno
;
7617 /* error occurred in a scan against a foreign join */
7620 tle
= list_nth_node(TargetEntry
, fsplan
->fdw_scan_tlist
,
7621 errpos
->cur_attno
- 1);
7624 * Target list can have Vars and expressions. For Vars, we can
7625 * get some information, however for expressions we can't. Thus
7626 * for expressions, just show generic context message.
7628 if (IsA(tle
->expr
, Var
))
7630 Var
*var
= (Var
*) tle
->expr
;
7633 colno
= var
->varattno
;
7639 EState
*estate
= fsstate
->ss
.ps
.state
;
7640 RangeTblEntry
*rte
= exec_rt_fetch(varno
, estate
);
7642 relname
= rte
->eref
->aliasname
;
7646 else if (colno
> 0 && colno
<= list_length(rte
->eref
->colnames
))
7647 attname
= strVal(list_nth(rte
->eref
->colnames
, colno
- 1));
7648 else if (colno
== SelfItemPointerAttributeNumber
)
7654 /* Non-ForeignScan case (we should always have a rel here) */
7655 TupleDesc tupdesc
= RelationGetDescr(rel
);
7657 relname
= RelationGetRelationName(rel
);
7658 if (errpos
->cur_attno
> 0 && errpos
->cur_attno
<= tupdesc
->natts
)
7660 Form_pg_attribute attr
= TupleDescAttr(tupdesc
,
7661 errpos
->cur_attno
- 1);
7663 attname
= NameStr(attr
->attname
);
7665 else if (errpos
->cur_attno
== SelfItemPointerAttributeNumber
)
7669 if (relname
&& is_wholerow
)
7670 errcontext("whole-row reference to foreign table \"%s\"", relname
);
7671 else if (relname
&& attname
)
7672 errcontext("column \"%s\" of foreign table \"%s\"", attname
, relname
);
7674 errcontext("processing expression at position %d in select list",
7679 * Given an EquivalenceClass and a foreign relation, find an EC member
7680 * that can be used to sort the relation remotely according to a pathkey
7683 * If there is more than one suitable candidate, return an arbitrary
7684 * one of them. If there is none, return NULL.
7686 * This checks that the EC member expression uses only Vars from the given
7687 * rel and is shippable. Caller must separately verify that the pathkey's
7688 * ordering operator is shippable.
7691 find_em_for_rel(PlannerInfo
*root
, EquivalenceClass
*ec
, RelOptInfo
*rel
)
7695 foreach(lc
, ec
->ec_members
)
7697 EquivalenceMember
*em
= (EquivalenceMember
*) lfirst(lc
);
7700 * Note we require !bms_is_empty, else we'd accept constant
7701 * expressions which are not suitable for the purpose.
7703 if (bms_is_subset(em
->em_relids
, rel
->relids
) &&
7704 !bms_is_empty(em
->em_relids
) &&
7705 is_foreign_expr(root
, rel
, em
->em_expr
))
7713 * Find an EquivalenceClass member that is to be computed as a sort column
7714 * in the given rel's reltarget, and is shippable.
7716 * If there is more than one suitable candidate, return an arbitrary
7717 * one of them. If there is none, return NULL.
7719 * This checks that the EC member expression uses only Vars from the given
7720 * rel and is shippable. Caller must separately verify that the pathkey's
7721 * ordering operator is shippable.
7724 find_em_for_rel_target(PlannerInfo
*root
, EquivalenceClass
*ec
,
7727 PathTarget
*target
= rel
->reltarget
;
7732 foreach(lc1
, target
->exprs
)
7734 Expr
*expr
= (Expr
*) lfirst(lc1
);
7735 Index sgref
= get_pathtarget_sortgroupref(target
, i
);
7738 /* Ignore non-sort expressions */
7740 get_sortgroupref_clause_noerr(sgref
,
7741 root
->parse
->sortClause
) == NULL
)
7747 /* We ignore binary-compatible relabeling on both ends */
7748 while (expr
&& IsA(expr
, RelabelType
))
7749 expr
= ((RelabelType
*) expr
)->arg
;
7751 /* Locate an EquivalenceClass member matching this expr, if any */
7752 foreach(lc2
, ec
->ec_members
)
7754 EquivalenceMember
*em
= (EquivalenceMember
*) lfirst(lc2
);
7757 /* Don't match constants */
7758 if (em
->em_is_const
)
7761 /* Ignore child members */
7762 if (em
->em_is_child
)
7765 /* Match if same expression (after stripping relabel) */
7766 em_expr
= em
->em_expr
;
7767 while (em_expr
&& IsA(em_expr
, RelabelType
))
7768 em_expr
= ((RelabelType
*) em_expr
)->arg
;
7770 if (!equal(em_expr
, expr
))
7773 /* Check that expression (including relabels!) is shippable */
7774 if (is_foreign_expr(root
, rel
, em
->em_expr
))
7785 * Determine batch size for a given foreign table. The option specified for
7786 * a table has precedence.
7789 get_batch_size_option(Relation rel
)
7791 Oid foreigntableid
= RelationGetRelid(rel
);
7792 ForeignTable
*table
;
7793 ForeignServer
*server
;
7797 /* we use 1 by default, which means "no batching" */
7801 * Load options for table and server. We append server options after table
7802 * options, because table options take precedence.
7804 table
= GetForeignTable(foreigntableid
);
7805 server
= GetForeignServer(table
->serverid
);
7808 options
= list_concat(options
, table
->options
);
7809 options
= list_concat(options
, server
->options
);
7811 /* See if either table or server specifies batch_size. */
7812 foreach(lc
, options
)
7814 DefElem
*def
= (DefElem
*) lfirst(lc
);
7816 if (strcmp(def
->defname
, "batch_size") == 0)
7818 (void) parse_int(defGetString(def
), &batch_size
, 0, NULL
);